[Linux]生产者消费者模型(基于BlockQueue的生产者消费者模型 | 基于环形队列的生产者消费者模型 | 信号量 )
创始人
2024-05-22 19:03:06
0

在这里插入图片描述

文章目录

  • 生产者消费者模型
    • 函数调用角度理解生产者消费者模型
    • 生活角度理解生产者消费者模型
    • 为什么要使用生产者消费者模型
    • 生产者消费者模型优点
    • 321原则
    • 基于BlockingQueue的生产者消费者模型
  • POSIX信号量
    • 回顾信号量概念
    • 信号量操作函数
    • 环形队列
    • 基于环形队列的生产者消费者模型

生产者消费者模型

函数调用角度理解生产者消费者模型

我们之前学习的函数调用:

在这里插入图片描述
在调用FunctionA函数内部调用FunctionB,只有在进入FunctionB函数内部后执行FunctionB的业务代码才可以返回到调用FunctionB函数之后的代码继续执行。FunctionA把数据交给FunctionB处理,这在单线程下是一个串行的过程。函数和函数之间的交互的本质上是数据通信。

如果将FunctionA交由线程A处理,FunctionB交由线程B处理,并内置一段缓冲区,Function
A往缓冲区写数据,FunctionB往缓冲区取走数据处理,这样用两个执行流和一段缓冲区就可以实现FunctionA和FunctionB并行执行。我们把这种场景就叫做生产者消费者模型。

在这里插入图片描述


生活角度理解生产者消费者模型

生活中的生产者消费者模型最典型的就是超市。站在超市的角度消费者就是普通老百姓,生产者就是各种供货商。

在这里插入图片描述

为什么要有超市?

  1. 提高效率。消费者和供应商无法直接交易,供应商的生产量动则成千上万,而消费者的需求并没有这么高,效率很低,这就需要超市来做一个中间件。超市的功能是收集需求,超市收集老百姓的需求,向各个供货商进大量的货物,足以养活供应商,也可以满足大部分老百姓的需求,超市的存在就大大减少了交易成本,这就大大提高了效率。
  2. 生产环节和消费环节进行解耦。当某一家矿泉水供应商倒闭了,对于普通老百姓短期来说是没有影响的,因为还可以购买超市的存货,这就实现了生产环节和消费环节的解耦,生产者和消费者互不影响。

为什么要使用生产者消费者模型

  • 生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。

生产者消费者模型优点

  • 解耦
  • 支持并发
  • 支持忙闲不均

321原则

3种关系:

  • 生产者 vs 生产者:竞争关系、互斥关系
  • 消费者 vs 消费者:竞争关系、互斥关系
  • 生产者 vs 消费者:互斥关系、同步关系

2种角色:

  • 生产者(执行流)
  • 消费者(执行流)

1个消费场所:

  • 缓冲区(内存空间、STL容器等)

在这里插入图片描述


基于BlockingQueue的生产者消费者模型

  • 在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)。

在这里插入图片描述
Makefile文件:

CpTest:CpTest.ccg++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:rm -f CpTest

BlockQueue.hpp文件:

#pragma once
#include 
#include 
#include namespace ns_blockqueue
{const int default_capacity = 5;template class BlockQueue{private:std::queue _bq;       // 阻塞队列int _capacity;           // 队列的容量上限pthread_mutex_t _mtx;    // 保护临界资源的互斥锁pthread_cond_t is_full;  // 队列满,消费者条件变量等待pthread_cond_t is_empty; // 队列空,生产者条件变量等待bool IsFull(){return _bq.size() == _capacity;}bool IsEmpty(){return _bq.size() == 0;}void LockQueue(){pthread_mutex_lock(&_mtx);}void UnlockQueue(){pthread_mutex_unlock(&_mtx);}void ProducterWait(){pthread_cond_wait(&is_empty, &_mtx);}void ConsumerWait(){pthread_cond_wait(&is_full, &_mtx);}void WakeupConsumer(){pthread_cond_signal(&is_full);}void WakeupProducter(){pthread_cond_signal(&is_empty);}public:BlockQueue(int capacity = default_capacity) : _capacity(capacity){pthread_mutex_init(&_mtx, nullptr);pthread_cond_init(&is_full, nullptr);pthread_cond_init(&is_empty, nullptr);}void Push(const T &in){LockQueue();// if (IsFull())while (IsFull()){ProducterWait();}_bq.push(in);if (_bq.size() > _capacity / 2)WakeupConsumer();UnlockQueue();}void Pop(T *out){LockQueue();while (IsEmpty()){ConsumerWait();}*out = _bq.front();_bq.pop();if (_bq.size() < _capacity / 2)WakeupProducter();UnlockQueue();}~BlockQueue(){pthread_mutex_destroy(&_mtx);pthread_cond_destroy(&is_full);pthread_cond_destroy(&is_empty);}};
}

Task.hpp:

#pragma once
#include 
#include 
#include namespace ns_task
{class Task{private:int _x;int _y;char _op;public:Task() {}Task(int x, int y, char op):_x(x), _y(y), _op(op) {}int Run(){int res = 0;switch(_op){case '+':res = _x + _y;break;case '-':res = _x - _y;break;case '*':res = _x * _y;break;case '/':res = _x / _y;break;case '%':res = _x % _y;break;default:break;}std::cout << "当前任务正在被: " << pthread_self() << " 处理: " << _x << _op << _y << "=" << res << std::endl;return res;}int operator()(){return Run();}~Task(){}};
}

CpTest.cc文件:

#include "BlockQueue.hpp"
#include "Task.hpp"
#include 
#include 
#include using namespace ns_blockqueue;
using namespace ns_task;
void *consumer(void *args)
{BlockQueue *bq = (BlockQueue *)args;while(true){Task t;bq->Pop(&t);t();// int data = 0;// bq->Pop(&data);// std::cout << "消费者消费数据:" << data << std::endl;}}void *producter(void *args)
{BlockQueue *bq = (BlockQueue *)args;std::string ops = "+-*/%";while(true){int x = rand()%10 + 1;int y = rand()%20 + 1;char op = ops[rand()%5];Task t(x, y, op);std::cout << "线程: " << pthread_self() <<  " 分发任务 " << x << op << y << "=?" << std::endl;bq->Push(t);sleep(1);// sleep(1);// int data = rand()%20 + 1;// bq->Push(data);   // std::cout << "生产者成产数据:" << data << std::endl;}
}int main()
{srand((long long)time(nullptr));BlockQueue *bq = new BlockQueue();pthread_t c, p;pthread_t c1, c2, c3;pthread_t p1, p2, p3;pthread_create(&c, nullptr, consumer, (void*)bq);// pthread_create(&c1, nullptr, consumer, (void*)bq);// pthread_create(&c2, nullptr, consumer, (void*)bq);// pthread_create(&c3, nullptr, consumer, (void*)bq);pthread_create(&p, nullptr, producter, (void*)bq);// pthread_create(&p1, nullptr, producter, (void*)bq);// pthread_create(&p2, nullptr, producter, (void*)bq);// pthread_create(&p3, nullptr, producter, (void*)bq);pthread_join(c, nullptr);pthread_join(p, nullptr);return 0;
}

在这里插入图片描述
Push函数中生产者判断是否为满时,用if进行条件判断是不太完善的,因为当满足为满条件后,生产者线程要进行挂起等待,但是如果挂起等待失败,如果生产者线程被伪唤醒,都会造成生产条件不具备,导致程序出错。所以用while循环判断是比较合理的,多次判断能避免以上的情况。

运行结果:

[cwx@VM-20-16-centos consumer_producter]$ make
g++ -o CpTest CpTest.cc -std=c++11 -lpthread
[cwx@VM-20-16-centos consumer_producter]$ ./CpTest 
线程: 140317604919040 分发任务 7*11=?
当前任务正在被: 140317613311744 处理: 7*11=77
线程: 140317604919040 分发任务 2-20=?
线程: 140317604919040 分发任务 10+7=?
线程: 140317604919040 分发任务 5%2=?
当前任务正在被: 140317613311744 处理: 2-20=-18
当前任务正在被: 140317613311744 处理: 10+7=17
当前任务正在被: 140317613311744 处理: 5%2=1

POSIX信号量

POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但POSIX可以用于线程间同步。

回顾信号量概念

  • 信号量的本质是一个计数器,用来描述临界资源中资源的数目(最多可以有多少资源分配给线程)。
  • 使用信号量就使得每个线程要访问临界资源就必须申请信号量,申请信号量的本质是预定资源,临界资源被拆分成一份一份的小资源,可以让多个线程同时访问临界资源,从而实现线程并发。

信号量操作函数

初始化信号量:

#include 原型:int sem_init(sem_t *sem, int pshared, unsigned int value);
参数:pshared:0表示线程间共享,非零表示进程间共享value:信号量初始值

销毁信号量:

#include 原型:int sem_destroy(sem_t *sem);

等待信号量:

#include 功能:	等待信号量,会将信号量的值减1
原型:int sem_wait(sem_t *sem); //P()

发布信号量:

#include 功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。
原型:int sem_post(sem_t *sem); //V()

在这里插入图片描述


环形队列

在这里插入图片描述

  • 环形队列实际上是数组的线性空间来实现的。

  • 当队列到了尾部,运用取模操作使之指向头部。

环形队列的判空判满条件:

  • front:指向环形队列的第一个元素,初始值为0
  • rear:指向环形队列的最后一个元素的后一个位置,初始值为0
  • 队列为空:front == rear
  • 队列为满:(rear+1) % size == front

基于环形队列的生产者消费者模型

在这里插入图片描述

基本实现思想:

  • 生产者最关心的是环形队列中的空的位置。
  • 消费者最关心的是环形队列中的数据。
  • 根据空满状态,决定生产者消费者谁先访问。
  • 生产者不能循环一圈超过消费者。
  • 消费者不能超过生产者。
  • 生产者和消费者并发执行。

伪代码:

在这里插入图片描述


实现代码:

Makefile文件:

ring_queue_test:ring_queue_test.ccg++ -o $@ $^ -std=c++11 -lpthread.PHONY:clean
clean:rm -rf ring_test

ring_queue.hpp文件:

#pragma once
#include 
#include 
#include // 基于环形队列的多生产者多消费者模型
namespace ns_ring_queue
{const int cap_default = 10;template class RingQueue{private:std::vector _ring_queue; // 环形队列int _cap;                   // 环形队列的容量sem_t _sem_blank;           // 空位置信号量sem_t _sem_data;            // 数据信号量int c_step;                 // 消费者下标int p_step;                 // 生产者下标pthread_mutex_t c_mtx;      // 保护消费者的临界资源pthread_mutex_t p_mtx;      // 保护生产者的临界资源public:RingQueue(int cap = cap_default) : _ring_queue(cap), _cap(cap){sem_init(&_sem_blank, 0, _cap);sem_init(&_sem_data, 0, 0);c_step = p_step = 0;pthread_mutex_init(&c_mtx, nullptr);pthread_mutex_init(&p_mtx, nullptr);}void Push(const T &in){sem_wait(&_sem_blank); //P(blank)pthread_mutex_lock(&p_mtx);_ring_queue[p_step] = in;p_step++; // 临界资源p_step %= _cap;pthread_mutex_unlock(&p_mtx);sem_post(&_sem_data); //V(data)}void Pop(T* out){sem_wait(&_sem_data); //P(data)pthread_mutex_lock(&c_mtx);*out = _ring_queue[c_step];c_step++; // 临界资源c_step %= _cap;pthread_mutex_unlock(&c_mtx);sem_post(&_sem_blank); //V(blank)}~RingQueue(){sem_destroy(&_sem_blank);sem_destroy(&_sem_data);pthread_mutex_destroy(&p_mtx);pthread_mutex_destroy(&c_mtx);}};
}

ring_queue_test.cc文件:

#include "ring_queue.hpp"
#include "Task.hpp"
#include 
#include 
#include 
#include using namespace ns_task;
using namespace ns_ring_queue;void* consumer(void* args)
{RingQueue* rq = (RingQueue*)args;while(true){Task t;rq->Pop(&t);t();// sleep(1);// int data;// rq->Pop(&data);// std::cout << "Thread:" << pthread_self() << "消费者消费数据:" << data << std::endl;// sleep(1);}
}void* producter(void* args)
{RingQueue* rq = (RingQueue*)args;while(true){int x = rand()%20 + 1;int y = rand()%10 + 1;char op = "+-*/%"[rand()%5];Task t(x, y, op);rq->Push(t);std::cout << "Producter Thread: " << pthread_self() << " Task: " << t.message() << std::endl; // int data = rand()%30 + 1;// rq->Push(data);// std::cout << "Thread:" << pthread_self() << "生产者生产数据:" << data << std::endl;}
}int main()
{srand((long long)time(nullptr));RingQueue* rq = new RingQueue();pthread_t c0,c1,c2,c3,p0,p1,p2;pthread_create(&c0, nullptr, consumer, (void*)rq);pthread_create(&c1, nullptr, consumer, (void*)rq);pthread_create(&c2, nullptr, consumer, (void*)rq);pthread_create(&c3, nullptr, consumer, (void*)rq);pthread_create(&p0, nullptr, producter, (void*)rq);pthread_create(&p1, nullptr, producter, (void*)rq);pthread_create(&p2, nullptr, producter, (void*)rq);pthread_join(c0, nullptr);pthread_join(c1, nullptr);pthread_join(c2, nullptr);pthread_join(c3, nullptr);pthread_join(p0, nullptr);pthread_join(p1, nullptr);pthread_join(p2, nullptr);return 0;
}

Task.hpp文件:

#pragma once
#include 
#include 
#include namespace ns_task
{class Task{private:int _x;int _y;char _op;public:Task() {}Task(int x, int y, char op):_x(x), _y(y), _op(op) {}std::string message(){std::string msg = std::to_string(_x);msg += _op;msg += std::to_string(_y);msg += "=?";return msg;}int Run(){int res = 0;switch(_op){case '+':res = _x + _y;break;case '-':res = _x - _y;break;case '*':res = _x * _y;break;case '/':res = _x / _y;break;case '%':res = _x % _y;break;default:break;}std::cout << "Consumer Thread: " << pthread_self() << " Task: " << _x << _op << _y << "=" << res << std::endl;return res;}int operator()(){return Run();}~Task(){}};
}

运行结果:

[cwx@VM-20-16-centos ring_queue]$ ./ring_queue_test 
Producter Thread: 139742694962944 Task: 12+4=?
Consumer Thread: 139742736926464 Task: 12+4=16
Producter Thread: 139742711748352 Task: 20%2=?
Producter Thread: 139742703355648 Task: 17/10=?
Consumer Thread: 139742745319168 Task: 20%2=0
Consumer Thread: 139742745319168 Task: 17/10=1

相关内容

热门资讯

我们的节日端午主持稿 我们的节日端午主持稿(通用12篇)  在当下社会,很多地方都会使用到主持稿,主持稿具有语言过度自然、...
婚礼主持词开场白   第一篇:  尊敬的各位来宾,  很高兴接受一对新人的委托,主持今天这场爱情的盛典,我是主持人张 ...
幸福额度简介及台词 幸福额度简介及台词  《幸福额度》简介  《幸福额度》由陈正道执导,林志玲、陈坤、廖凡主演的爱情电影...
舞蹈大赛主持词 舞蹈大赛主持词集合七篇  利用在中国拥有几千年文化的诗词能够有效提高主持词的感染力。在人们积极参与各...
团代会闭幕词 团代会闭幕词精选3篇  今天,CN人才网小编给大家分享的是团代会闭幕词精选3篇,希望对大家有帮助。 ...
职工代表大会主持词 职工代表大会主持词  ×××××第二十一届四次职工代表大会  开幕式主持词  各位代表:  根据大会...
婚礼主持词(精选10篇) 婚礼主持词(精选10篇)  主持词的格式及写法  1.标题  一般情况下,只需写明“主持词”即可,也...
农村婚庆司仪主持词 农村婚庆司仪主持词  主持词可以采用和历史文化有关的表述方法去写作以提升活动的文化内涵。在现今人们越...
泰坦尼克号英文台词 泰坦尼克号英文台词  导语:泰坦尼克号以1912年泰坦尼克号邮轮在其处女启航时触礁冰山而沉没的事件为...
宋小宝的小品烤串台词 宋小宝的小品烤串台词  宋小宝是我国著名的喜剧演员,下面一起来欣赏宋小宝的小品烤串台词!欢迎阅读! ...
年度总结大会主持稿 年度总结大会主持稿(通用9篇)  总结是事后对某一时期、某一项目或某些工作进行回顾和分析,从而做出带...
会晚宴主持词 主题:突破·创新主持:A: B: C: D:放音乐 暖场1 暖场2晚会开始前五分钟 此音乐完毕后主持...
新世纪福音战士渚薰的经典台词 新世纪福音战士渚薰的经典台词  新世纪福音战士渚薰是具有生命之果的使徒,和凌波丽一样,是一个人造人。...
运动会开幕式流程主持词 运动会开幕式流程主持词范文(精选7篇)  根据活动对象的不同,需要设置不同的主持词。在如今这个中国,...
妙手仁心1经典台词 妙手仁心1经典台词大全  1.如果两个人有共同的秘密,表示两人的关系已是非比寻常的了。  2.喜欢一...
新婚回门宴会主持词 新婚回门宴会主持词尊敬的各位来宾,女士们,先生们:  大家中午好!  受今天新婚回门庆典主人的委托,...
销售晨会主持词开场白 销售晨会主持词开场白   篇一:销售晨会主持词  销售晨会主持词  首先在这个阳光明媚,生机昂然的早...
婚礼喝交杯酒主持词 婚礼喝交杯酒主持词范文(精选6篇)  主持词的写作要突出活动的主旨并贯穿始终。随着中国在不断地进步,...
婚宴的致辞 婚宴的致辞合集15篇  在平日的学习、工作和生活里,大家都尝试过写致辞吧,致辞具有有张有弛、错落有致...
运动会方阵解说词 运动会方阵解说词运动会方阵解说词运动会方阵解说词范文一:1、国旗解说词。女:四位旗手手执鲜艳的五星红...