[Linux]生产者消费者模型(基于BlockQueue的生产者消费者模型 | 基于环形队列的生产者消费者模型 | 信号量 )
创始人
2024-05-22 19:03:06
0

在这里插入图片描述

文章目录

  • 生产者消费者模型
    • 函数调用角度理解生产者消费者模型
    • 生活角度理解生产者消费者模型
    • 为什么要使用生产者消费者模型
    • 生产者消费者模型优点
    • 321原则
    • 基于BlockingQueue的生产者消费者模型
  • POSIX信号量
    • 回顾信号量概念
    • 信号量操作函数
    • 环形队列
    • 基于环形队列的生产者消费者模型

生产者消费者模型

函数调用角度理解生产者消费者模型

我们之前学习的函数调用:

在这里插入图片描述
在调用FunctionA函数内部调用FunctionB,只有在进入FunctionB函数内部后执行FunctionB的业务代码才可以返回到调用FunctionB函数之后的代码继续执行。FunctionA把数据交给FunctionB处理,这在单线程下是一个串行的过程。函数和函数之间的交互的本质上是数据通信。

如果将FunctionA交由线程A处理,FunctionB交由线程B处理,并内置一段缓冲区,Function
A往缓冲区写数据,FunctionB往缓冲区取走数据处理,这样用两个执行流和一段缓冲区就可以实现FunctionA和FunctionB并行执行。我们把这种场景就叫做生产者消费者模型。

在这里插入图片描述


生活角度理解生产者消费者模型

生活中的生产者消费者模型最典型的就是超市。站在超市的角度消费者就是普通老百姓,生产者就是各种供货商。

在这里插入图片描述

为什么要有超市?

  1. 提高效率。消费者和供应商无法直接交易,供应商的生产量动则成千上万,而消费者的需求并没有这么高,效率很低,这就需要超市来做一个中间件。超市的功能是收集需求,超市收集老百姓的需求,向各个供货商进大量的货物,足以养活供应商,也可以满足大部分老百姓的需求,超市的存在就大大减少了交易成本,这就大大提高了效率。
  2. 生产环节和消费环节进行解耦。当某一家矿泉水供应商倒闭了,对于普通老百姓短期来说是没有影响的,因为还可以购买超市的存货,这就实现了生产环节和消费环节的解耦,生产者和消费者互不影响。

为什么要使用生产者消费者模型

  • 生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。

生产者消费者模型优点

  • 解耦
  • 支持并发
  • 支持忙闲不均

321原则

3种关系:

  • 生产者 vs 生产者:竞争关系、互斥关系
  • 消费者 vs 消费者:竞争关系、互斥关系
  • 生产者 vs 消费者:互斥关系、同步关系

2种角色:

  • 生产者(执行流)
  • 消费者(执行流)

1个消费场所:

  • 缓冲区(内存空间、STL容器等)

在这里插入图片描述


基于BlockingQueue的生产者消费者模型

  • 在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)。

在这里插入图片描述
Makefile文件:

CpTest:CpTest.ccg++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:rm -f CpTest

BlockQueue.hpp文件:

#pragma once
#include 
#include 
#include namespace ns_blockqueue
{const int default_capacity = 5;template class BlockQueue{private:std::queue _bq;       // 阻塞队列int _capacity;           // 队列的容量上限pthread_mutex_t _mtx;    // 保护临界资源的互斥锁pthread_cond_t is_full;  // 队列满,消费者条件变量等待pthread_cond_t is_empty; // 队列空,生产者条件变量等待bool IsFull(){return _bq.size() == _capacity;}bool IsEmpty(){return _bq.size() == 0;}void LockQueue(){pthread_mutex_lock(&_mtx);}void UnlockQueue(){pthread_mutex_unlock(&_mtx);}void ProducterWait(){pthread_cond_wait(&is_empty, &_mtx);}void ConsumerWait(){pthread_cond_wait(&is_full, &_mtx);}void WakeupConsumer(){pthread_cond_signal(&is_full);}void WakeupProducter(){pthread_cond_signal(&is_empty);}public:BlockQueue(int capacity = default_capacity) : _capacity(capacity){pthread_mutex_init(&_mtx, nullptr);pthread_cond_init(&is_full, nullptr);pthread_cond_init(&is_empty, nullptr);}void Push(const T &in){LockQueue();// if (IsFull())while (IsFull()){ProducterWait();}_bq.push(in);if (_bq.size() > _capacity / 2)WakeupConsumer();UnlockQueue();}void Pop(T *out){LockQueue();while (IsEmpty()){ConsumerWait();}*out = _bq.front();_bq.pop();if (_bq.size() < _capacity / 2)WakeupProducter();UnlockQueue();}~BlockQueue(){pthread_mutex_destroy(&_mtx);pthread_cond_destroy(&is_full);pthread_cond_destroy(&is_empty);}};
}

Task.hpp:

#pragma once
#include 
#include 
#include namespace ns_task
{class Task{private:int _x;int _y;char _op;public:Task() {}Task(int x, int y, char op):_x(x), _y(y), _op(op) {}int Run(){int res = 0;switch(_op){case '+':res = _x + _y;break;case '-':res = _x - _y;break;case '*':res = _x * _y;break;case '/':res = _x / _y;break;case '%':res = _x % _y;break;default:break;}std::cout << "当前任务正在被: " << pthread_self() << " 处理: " << _x << _op << _y << "=" << res << std::endl;return res;}int operator()(){return Run();}~Task(){}};
}

CpTest.cc文件:

#include "BlockQueue.hpp"
#include "Task.hpp"
#include 
#include 
#include using namespace ns_blockqueue;
using namespace ns_task;
void *consumer(void *args)
{BlockQueue *bq = (BlockQueue *)args;while(true){Task t;bq->Pop(&t);t();// int data = 0;// bq->Pop(&data);// std::cout << "消费者消费数据:" << data << std::endl;}}void *producter(void *args)
{BlockQueue *bq = (BlockQueue *)args;std::string ops = "+-*/%";while(true){int x = rand()%10 + 1;int y = rand()%20 + 1;char op = ops[rand()%5];Task t(x, y, op);std::cout << "线程: " << pthread_self() <<  " 分发任务 " << x << op << y << "=?" << std::endl;bq->Push(t);sleep(1);// sleep(1);// int data = rand()%20 + 1;// bq->Push(data);   // std::cout << "生产者成产数据:" << data << std::endl;}
}int main()
{srand((long long)time(nullptr));BlockQueue *bq = new BlockQueue();pthread_t c, p;pthread_t c1, c2, c3;pthread_t p1, p2, p3;pthread_create(&c, nullptr, consumer, (void*)bq);// pthread_create(&c1, nullptr, consumer, (void*)bq);// pthread_create(&c2, nullptr, consumer, (void*)bq);// pthread_create(&c3, nullptr, consumer, (void*)bq);pthread_create(&p, nullptr, producter, (void*)bq);// pthread_create(&p1, nullptr, producter, (void*)bq);// pthread_create(&p2, nullptr, producter, (void*)bq);// pthread_create(&p3, nullptr, producter, (void*)bq);pthread_join(c, nullptr);pthread_join(p, nullptr);return 0;
}

在这里插入图片描述
Push函数中生产者判断是否为满时,用if进行条件判断是不太完善的,因为当满足为满条件后,生产者线程要进行挂起等待,但是如果挂起等待失败,如果生产者线程被伪唤醒,都会造成生产条件不具备,导致程序出错。所以用while循环判断是比较合理的,多次判断能避免以上的情况。

运行结果:

[cwx@VM-20-16-centos consumer_producter]$ make
g++ -o CpTest CpTest.cc -std=c++11 -lpthread
[cwx@VM-20-16-centos consumer_producter]$ ./CpTest 
线程: 140317604919040 分发任务 7*11=?
当前任务正在被: 140317613311744 处理: 7*11=77
线程: 140317604919040 分发任务 2-20=?
线程: 140317604919040 分发任务 10+7=?
线程: 140317604919040 分发任务 5%2=?
当前任务正在被: 140317613311744 处理: 2-20=-18
当前任务正在被: 140317613311744 处理: 10+7=17
当前任务正在被: 140317613311744 处理: 5%2=1

POSIX信号量

POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但POSIX可以用于线程间同步。

回顾信号量概念

  • 信号量的本质是一个计数器,用来描述临界资源中资源的数目(最多可以有多少资源分配给线程)。
  • 使用信号量就使得每个线程要访问临界资源就必须申请信号量,申请信号量的本质是预定资源,临界资源被拆分成一份一份的小资源,可以让多个线程同时访问临界资源,从而实现线程并发。

信号量操作函数

初始化信号量:

#include 原型:int sem_init(sem_t *sem, int pshared, unsigned int value);
参数:pshared:0表示线程间共享,非零表示进程间共享value:信号量初始值

销毁信号量:

#include 原型:int sem_destroy(sem_t *sem);

等待信号量:

#include 功能:	等待信号量,会将信号量的值减1
原型:int sem_wait(sem_t *sem); //P()

发布信号量:

#include 功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。
原型:int sem_post(sem_t *sem); //V()

在这里插入图片描述


环形队列

在这里插入图片描述

  • 环形队列实际上是数组的线性空间来实现的。

  • 当队列到了尾部,运用取模操作使之指向头部。

环形队列的判空判满条件:

  • front:指向环形队列的第一个元素,初始值为0
  • rear:指向环形队列的最后一个元素的后一个位置,初始值为0
  • 队列为空:front == rear
  • 队列为满:(rear+1) % size == front

基于环形队列的生产者消费者模型

在这里插入图片描述

基本实现思想:

  • 生产者最关心的是环形队列中的空的位置。
  • 消费者最关心的是环形队列中的数据。
  • 根据空满状态,决定生产者消费者谁先访问。
  • 生产者不能循环一圈超过消费者。
  • 消费者不能超过生产者。
  • 生产者和消费者并发执行。

伪代码:

在这里插入图片描述


实现代码:

Makefile文件:

ring_queue_test:ring_queue_test.ccg++ -o $@ $^ -std=c++11 -lpthread.PHONY:clean
clean:rm -rf ring_test

ring_queue.hpp文件:

#pragma once
#include 
#include 
#include // 基于环形队列的多生产者多消费者模型
namespace ns_ring_queue
{const int cap_default = 10;template class RingQueue{private:std::vector _ring_queue; // 环形队列int _cap;                   // 环形队列的容量sem_t _sem_blank;           // 空位置信号量sem_t _sem_data;            // 数据信号量int c_step;                 // 消费者下标int p_step;                 // 生产者下标pthread_mutex_t c_mtx;      // 保护消费者的临界资源pthread_mutex_t p_mtx;      // 保护生产者的临界资源public:RingQueue(int cap = cap_default) : _ring_queue(cap), _cap(cap){sem_init(&_sem_blank, 0, _cap);sem_init(&_sem_data, 0, 0);c_step = p_step = 0;pthread_mutex_init(&c_mtx, nullptr);pthread_mutex_init(&p_mtx, nullptr);}void Push(const T &in){sem_wait(&_sem_blank); //P(blank)pthread_mutex_lock(&p_mtx);_ring_queue[p_step] = in;p_step++; // 临界资源p_step %= _cap;pthread_mutex_unlock(&p_mtx);sem_post(&_sem_data); //V(data)}void Pop(T* out){sem_wait(&_sem_data); //P(data)pthread_mutex_lock(&c_mtx);*out = _ring_queue[c_step];c_step++; // 临界资源c_step %= _cap;pthread_mutex_unlock(&c_mtx);sem_post(&_sem_blank); //V(blank)}~RingQueue(){sem_destroy(&_sem_blank);sem_destroy(&_sem_data);pthread_mutex_destroy(&p_mtx);pthread_mutex_destroy(&c_mtx);}};
}

ring_queue_test.cc文件:

#include "ring_queue.hpp"
#include "Task.hpp"
#include 
#include 
#include 
#include using namespace ns_task;
using namespace ns_ring_queue;void* consumer(void* args)
{RingQueue* rq = (RingQueue*)args;while(true){Task t;rq->Pop(&t);t();// sleep(1);// int data;// rq->Pop(&data);// std::cout << "Thread:" << pthread_self() << "消费者消费数据:" << data << std::endl;// sleep(1);}
}void* producter(void* args)
{RingQueue* rq = (RingQueue*)args;while(true){int x = rand()%20 + 1;int y = rand()%10 + 1;char op = "+-*/%"[rand()%5];Task t(x, y, op);rq->Push(t);std::cout << "Producter Thread: " << pthread_self() << " Task: " << t.message() << std::endl; // int data = rand()%30 + 1;// rq->Push(data);// std::cout << "Thread:" << pthread_self() << "生产者生产数据:" << data << std::endl;}
}int main()
{srand((long long)time(nullptr));RingQueue* rq = new RingQueue();pthread_t c0,c1,c2,c3,p0,p1,p2;pthread_create(&c0, nullptr, consumer, (void*)rq);pthread_create(&c1, nullptr, consumer, (void*)rq);pthread_create(&c2, nullptr, consumer, (void*)rq);pthread_create(&c3, nullptr, consumer, (void*)rq);pthread_create(&p0, nullptr, producter, (void*)rq);pthread_create(&p1, nullptr, producter, (void*)rq);pthread_create(&p2, nullptr, producter, (void*)rq);pthread_join(c0, nullptr);pthread_join(c1, nullptr);pthread_join(c2, nullptr);pthread_join(c3, nullptr);pthread_join(p0, nullptr);pthread_join(p1, nullptr);pthread_join(p2, nullptr);return 0;
}

Task.hpp文件:

#pragma once
#include 
#include 
#include namespace ns_task
{class Task{private:int _x;int _y;char _op;public:Task() {}Task(int x, int y, char op):_x(x), _y(y), _op(op) {}std::string message(){std::string msg = std::to_string(_x);msg += _op;msg += std::to_string(_y);msg += "=?";return msg;}int Run(){int res = 0;switch(_op){case '+':res = _x + _y;break;case '-':res = _x - _y;break;case '*':res = _x * _y;break;case '/':res = _x / _y;break;case '%':res = _x % _y;break;default:break;}std::cout << "Consumer Thread: " << pthread_self() << " Task: " << _x << _op << _y << "=" << res << std::endl;return res;}int operator()(){return Run();}~Task(){}};
}

运行结果:

[cwx@VM-20-16-centos ring_queue]$ ./ring_queue_test 
Producter Thread: 139742694962944 Task: 12+4=?
Consumer Thread: 139742736926464 Task: 12+4=16
Producter Thread: 139742711748352 Task: 20%2=?
Producter Thread: 139742703355648 Task: 17/10=?
Consumer Thread: 139742745319168 Task: 20%2=0
Consumer Thread: 139742745319168 Task: 17/10=1

相关内容

热门资讯

课题研究计划 课题研究计划  研究计划步骤  RP写作的第一步,就是Topic的确定。也就是你的所要研究的详细内容...
社团会长工作计划共41篇 社团会长工作计划 第一篇西北工业大学明德学院社团联合会,在学院领导的.关怀下,在院学生工作部和院团委...
呼吸内科实习总结 呼吸内科实习总结(精选7篇)  紧张又充实的实习生活又告一段落了,想必你的视野也得到了开拓,这个时候...
运动造型工作计划(通用18篇... 运动造型工作计划 第一篇一、赛事背景:为切实落实_中央_提出的学校教育树立“健康第一”的指导思想和贯...
综合素质测评自我总结 综合素质测评自我总结(精选6篇)  自我总结是个人在一个阶段对自己的学习或工作生活的自我总结,写自我...
检验士年终总结   2017检验士年终总结怎么写呢?那么下面是小编为大家搜集提供到的关于2017检验士年终总结范文,...
幼儿园小班个人工作计划 幼儿园小班个人工作计划(通用15篇)  时间一晃而过,我们又将迎来新的喜悦、新的收获,是时候开始写工...
小学电教工作总结 小学电教工作总结(精选4篇)  时间是箭,去来迅疾,一段时间的工作已经告一段落,回顾过去的工作,倍感...
四年一班假期班级工作总结 四年一班假期班级工作总结 愉快的寒假生活在欢声笑语中结束了,我们又迎来了紧张忙碌的新学年。孩子们经...
团工作总结范文优选43篇 团工作总结范文 第一篇英语儿童歌曲,因其充满童趣,富有动感,词句简单,内容生动,形式活泼等特点,深受...
vbse实训财务会计总结 vbse实训财务会计总结  总结是事后对某一阶段的学习、工作或其完成情况加以回顾和分析的一种书面材料...
幼儿园教师工作总结 【精华】幼儿园教师工作总结模板汇编8篇  总结就是把一个时段的学习、工作或其完成情况进行一次全面系统...
数控技师工作总结(6篇) 数控技师工作总结 第一篇本人于是20xx年在校就读数控专业,经过三年的学习已打下结实的基础,于20x...
团员民主评议总结 团员民主评议总结2篇  总结是事后对某一阶段的学习或工作情况作加以回顾检查并分析评价的书面材料,它可...
高二数学的知识点总结 高二数学的知识点总结  数学是我们学习中非常重要的一门课程,数学与我们的生活密切相关, 所以我们一定...
挖掘机销售人员的个人工作总结 挖掘机销售人员的个人工作总结  时间乘着年轮循序往前,一段时间的工作已经结束了,回顾过去的工作,倍感...
依法行政工作总结 依法行政工作总结(通用5篇)  时间总在不经意间匆匆溜走,我们的工作又告一段落了,回顾这段时间的工作...
教育技能实训总结范文通用14... 教育技能实训总结范文 第一篇通过这次的实训使我们学到了很多,同时也让我们意识到我们要学的更多。从程序...
双创工作总结共40篇 双创工作总结 第一篇省里要来检查“双创”工作,我们单位的领导也开会通知我们要打扫好自我科室的卫生,唉...
换届选举工作总结范文 换届选举工作总结范文  工作总结的定义  工作总结(Job Summary/Work Summary...