我们之前学习的函数调用:
在调用FunctionA函数内部调用FunctionB,只有在进入FunctionB函数内部后执行FunctionB的业务代码才可以返回到调用FunctionB函数之后的代码继续执行。FunctionA把数据交给FunctionB处理,这在单线程下是一个串行的过程。函数和函数之间的交互的本质上是数据通信。
如果将FunctionA交由线程A处理,FunctionB交由线程B处理,并内置一段缓冲区,Function
A往缓冲区写数据,FunctionB往缓冲区取走数据处理,这样用两个执行流和一段缓冲区就可以实现FunctionA和FunctionB并行执行。我们把这种场景就叫做生产者消费者模型。
生活中的生产者消费者模型最典型的就是超市。站在超市的角度消费者就是普通老百姓,生产者就是各种供货商。
为什么要有超市?
3种关系:
2种角色:
1个消费场所:
Makefile文件:
CpTest:CpTest.ccg++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:rm -f CpTest
BlockQueue.hpp文件:
#pragma once
#include
#include
#include namespace ns_blockqueue
{const int default_capacity = 5;template class BlockQueue{private:std::queue _bq; // 阻塞队列int _capacity; // 队列的容量上限pthread_mutex_t _mtx; // 保护临界资源的互斥锁pthread_cond_t is_full; // 队列满,消费者条件变量等待pthread_cond_t is_empty; // 队列空,生产者条件变量等待bool IsFull(){return _bq.size() == _capacity;}bool IsEmpty(){return _bq.size() == 0;}void LockQueue(){pthread_mutex_lock(&_mtx);}void UnlockQueue(){pthread_mutex_unlock(&_mtx);}void ProducterWait(){pthread_cond_wait(&is_empty, &_mtx);}void ConsumerWait(){pthread_cond_wait(&is_full, &_mtx);}void WakeupConsumer(){pthread_cond_signal(&is_full);}void WakeupProducter(){pthread_cond_signal(&is_empty);}public:BlockQueue(int capacity = default_capacity) : _capacity(capacity){pthread_mutex_init(&_mtx, nullptr);pthread_cond_init(&is_full, nullptr);pthread_cond_init(&is_empty, nullptr);}void Push(const T &in){LockQueue();// if (IsFull())while (IsFull()){ProducterWait();}_bq.push(in);if (_bq.size() > _capacity / 2)WakeupConsumer();UnlockQueue();}void Pop(T *out){LockQueue();while (IsEmpty()){ConsumerWait();}*out = _bq.front();_bq.pop();if (_bq.size() < _capacity / 2)WakeupProducter();UnlockQueue();}~BlockQueue(){pthread_mutex_destroy(&_mtx);pthread_cond_destroy(&is_full);pthread_cond_destroy(&is_empty);}};
}
Task.hpp:
#pragma once
#include
#include
#include namespace ns_task
{class Task{private:int _x;int _y;char _op;public:Task() {}Task(int x, int y, char op):_x(x), _y(y), _op(op) {}int Run(){int res = 0;switch(_op){case '+':res = _x + _y;break;case '-':res = _x - _y;break;case '*':res = _x * _y;break;case '/':res = _x / _y;break;case '%':res = _x % _y;break;default:break;}std::cout << "当前任务正在被: " << pthread_self() << " 处理: " << _x << _op << _y << "=" << res << std::endl;return res;}int operator()(){return Run();}~Task(){}};
}
CpTest.cc文件:
#include "BlockQueue.hpp"
#include "Task.hpp"
#include
#include
#include using namespace ns_blockqueue;
using namespace ns_task;
void *consumer(void *args)
{BlockQueue *bq = (BlockQueue *)args;while(true){Task t;bq->Pop(&t);t();// int data = 0;// bq->Pop(&data);// std::cout << "消费者消费数据:" << data << std::endl;}}void *producter(void *args)
{BlockQueue *bq = (BlockQueue *)args;std::string ops = "+-*/%";while(true){int x = rand()%10 + 1;int y = rand()%20 + 1;char op = ops[rand()%5];Task t(x, y, op);std::cout << "线程: " << pthread_self() << " 分发任务 " << x << op << y << "=?" << std::endl;bq->Push(t);sleep(1);// sleep(1);// int data = rand()%20 + 1;// bq->Push(data); // std::cout << "生产者成产数据:" << data << std::endl;}
}int main()
{srand((long long)time(nullptr));BlockQueue *bq = new BlockQueue();pthread_t c, p;pthread_t c1, c2, c3;pthread_t p1, p2, p3;pthread_create(&c, nullptr, consumer, (void*)bq);// pthread_create(&c1, nullptr, consumer, (void*)bq);// pthread_create(&c2, nullptr, consumer, (void*)bq);// pthread_create(&c3, nullptr, consumer, (void*)bq);pthread_create(&p, nullptr, producter, (void*)bq);// pthread_create(&p1, nullptr, producter, (void*)bq);// pthread_create(&p2, nullptr, producter, (void*)bq);// pthread_create(&p3, nullptr, producter, (void*)bq);pthread_join(c, nullptr);pthread_join(p, nullptr);return 0;
}
Push函数中生产者判断是否为满时,用if进行条件判断是不太完善的,因为当满足为满条件后,生产者线程要进行挂起等待,但是如果挂起等待失败,如果生产者线程被伪唤醒,都会造成生产条件不具备,导致程序出错。所以用while循环判断是比较合理的,多次判断能避免以上的情况。
运行结果:
[cwx@VM-20-16-centos consumer_producter]$ make
g++ -o CpTest CpTest.cc -std=c++11 -lpthread
[cwx@VM-20-16-centos consumer_producter]$ ./CpTest
线程: 140317604919040 分发任务 7*11=?
当前任务正在被: 140317613311744 处理: 7*11=77
线程: 140317604919040 分发任务 2-20=?
线程: 140317604919040 分发任务 10+7=?
线程: 140317604919040 分发任务 5%2=?
当前任务正在被: 140317613311744 处理: 2-20=-18
当前任务正在被: 140317613311744 处理: 10+7=17
当前任务正在被: 140317613311744 处理: 5%2=1
POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但POSIX可以用于线程间同步。
初始化信号量:
#include 原型:int sem_init(sem_t *sem, int pshared, unsigned int value);
参数:pshared:0表示线程间共享,非零表示进程间共享value:信号量初始值
销毁信号量:
#include 原型:int sem_destroy(sem_t *sem);
等待信号量:
#include 功能: 等待信号量,会将信号量的值减1
原型:int sem_wait(sem_t *sem); //P()
发布信号量:
#include 功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。
原型:int sem_post(sem_t *sem); //V()
环形队列实际上是数组的线性空间来实现的。
当队列到了尾部,运用取模操作使之指向头部。
环形队列的判空判满条件:
基本实现思想:
伪代码:
实现代码:
Makefile文件:
ring_queue_test:ring_queue_test.ccg++ -o $@ $^ -std=c++11 -lpthread.PHONY:clean
clean:rm -rf ring_test
ring_queue.hpp文件:
#pragma once
#include
#include
#include // 基于环形队列的多生产者多消费者模型
namespace ns_ring_queue
{const int cap_default = 10;template class RingQueue{private:std::vector _ring_queue; // 环形队列int _cap; // 环形队列的容量sem_t _sem_blank; // 空位置信号量sem_t _sem_data; // 数据信号量int c_step; // 消费者下标int p_step; // 生产者下标pthread_mutex_t c_mtx; // 保护消费者的临界资源pthread_mutex_t p_mtx; // 保护生产者的临界资源public:RingQueue(int cap = cap_default) : _ring_queue(cap), _cap(cap){sem_init(&_sem_blank, 0, _cap);sem_init(&_sem_data, 0, 0);c_step = p_step = 0;pthread_mutex_init(&c_mtx, nullptr);pthread_mutex_init(&p_mtx, nullptr);}void Push(const T &in){sem_wait(&_sem_blank); //P(blank)pthread_mutex_lock(&p_mtx);_ring_queue[p_step] = in;p_step++; // 临界资源p_step %= _cap;pthread_mutex_unlock(&p_mtx);sem_post(&_sem_data); //V(data)}void Pop(T* out){sem_wait(&_sem_data); //P(data)pthread_mutex_lock(&c_mtx);*out = _ring_queue[c_step];c_step++; // 临界资源c_step %= _cap;pthread_mutex_unlock(&c_mtx);sem_post(&_sem_blank); //V(blank)}~RingQueue(){sem_destroy(&_sem_blank);sem_destroy(&_sem_data);pthread_mutex_destroy(&p_mtx);pthread_mutex_destroy(&c_mtx);}};
}
ring_queue_test.cc文件:
#include "ring_queue.hpp"
#include "Task.hpp"
#include
#include
#include
#include using namespace ns_task;
using namespace ns_ring_queue;void* consumer(void* args)
{RingQueue* rq = (RingQueue*)args;while(true){Task t;rq->Pop(&t);t();// sleep(1);// int data;// rq->Pop(&data);// std::cout << "Thread:" << pthread_self() << "消费者消费数据:" << data << std::endl;// sleep(1);}
}void* producter(void* args)
{RingQueue* rq = (RingQueue*)args;while(true){int x = rand()%20 + 1;int y = rand()%10 + 1;char op = "+-*/%"[rand()%5];Task t(x, y, op);rq->Push(t);std::cout << "Producter Thread: " << pthread_self() << " Task: " << t.message() << std::endl; // int data = rand()%30 + 1;// rq->Push(data);// std::cout << "Thread:" << pthread_self() << "生产者生产数据:" << data << std::endl;}
}int main()
{srand((long long)time(nullptr));RingQueue* rq = new RingQueue();pthread_t c0,c1,c2,c3,p0,p1,p2;pthread_create(&c0, nullptr, consumer, (void*)rq);pthread_create(&c1, nullptr, consumer, (void*)rq);pthread_create(&c2, nullptr, consumer, (void*)rq);pthread_create(&c3, nullptr, consumer, (void*)rq);pthread_create(&p0, nullptr, producter, (void*)rq);pthread_create(&p1, nullptr, producter, (void*)rq);pthread_create(&p2, nullptr, producter, (void*)rq);pthread_join(c0, nullptr);pthread_join(c1, nullptr);pthread_join(c2, nullptr);pthread_join(c3, nullptr);pthread_join(p0, nullptr);pthread_join(p1, nullptr);pthread_join(p2, nullptr);return 0;
}
Task.hpp文件:
#pragma once
#include
#include
#include namespace ns_task
{class Task{private:int _x;int _y;char _op;public:Task() {}Task(int x, int y, char op):_x(x), _y(y), _op(op) {}std::string message(){std::string msg = std::to_string(_x);msg += _op;msg += std::to_string(_y);msg += "=?";return msg;}int Run(){int res = 0;switch(_op){case '+':res = _x + _y;break;case '-':res = _x - _y;break;case '*':res = _x * _y;break;case '/':res = _x / _y;break;case '%':res = _x % _y;break;default:break;}std::cout << "Consumer Thread: " << pthread_self() << " Task: " << _x << _op << _y << "=" << res << std::endl;return res;}int operator()(){return Run();}~Task(){}};
}
运行结果:
[cwx@VM-20-16-centos ring_queue]$ ./ring_queue_test
Producter Thread: 139742694962944 Task: 12+4=?
Consumer Thread: 139742736926464 Task: 12+4=16
Producter Thread: 139742711748352 Task: 20%2=?
Producter Thread: 139742703355648 Task: 17/10=?
Consumer Thread: 139742745319168 Task: 20%2=0
Consumer Thread: 139742745319168 Task: 17/10=1
上一篇:C++ 深入理解模板实现多态思想