C++11 <2>单生产者多消费者模式

xiaoxiao2021-02-28  7

2、单生产者多消费者模式:

与单生产者和单消费者模型不同的是,单生产者-多消费者模型中可以允许多个消费者同时从产品库中取走产品。所以除了保护产品库在多个读写线程下互斥之外,还需要维护消费者取走产品的计数器,代码如下: 在多个消费者消费的情况下,我们要确保消费的个数不要超过仓库生产的个数,多个消费要确保消费计数器的互斥 #include <iostream> #include <thread> #include <mutex> #include <condition_variable> #include <deque> using namespace std; static const int kItemsToProduce = 20;//定义生产者能够生产的最大产品个数 std::mutex stdoutMutex;//多线程标准输出 同步锁 struct ItemRepository { deque<int> itemQueue; // 这里用队列代表仓库缓冲区 int MaxSize = 10; // 仓库所容纳的产品最大个数 int itemCounter=0; std::mutex mtx; // 互斥量,保护产品缓冲区 std::mutex itemCounterMtx; std::condition_variable repository_notFull; // 条件变量, 指产品仓库缓冲区不为满 std::condition_variable repository_notEmpty; // 条件变量, 指产品仓库缓冲区不为空 }gItemRepository; // 产品库全局变量,生产者和消费者操作该变量. typedef struct ItemRepository ItemRepository; // 生产 产品 void ProduceItem(ItemRepository &itemRepo, int item) { std::unique_lock<std::mutex> lock(itemRepo.mtx); itemRepo.repository_notFull.wait(lock, [&itemRepo] { bool full = itemRepo.itemQueue.size() >= itemRepo.MaxSize; if (full) { std::lock_guard<std::mutex> lock(stdoutMutex); cout << "仓库满了,生产者等待中..." << "thread id = " << std::this_thread::get_id() << endl; } return !full; }); itemRepo.itemQueue.push_back(item); // 仓库放入产品 itemRepo.repository_notEmpty.notify_all(); // 通知消费者仓库不为空 lock.unlock(); // 释放锁 } // 消费 产品 int ConsumeItem(ItemRepository &itemRepo) { int data; std::unique_lock<std::mutex> lock(itemRepo.mtx); // 等待信号不为空的通知,wait 第二参数为true时 向下执行,否则一直等待 itemRepo.repository_notEmpty.wait(lock, [&itemRepo] { bool empty = itemRepo.itemQueue.empty(); if (empty) { std::lock_guard<std::mutex> lock(stdoutMutex); cout << "仓库空了,消费者等待中..." << "thread id = " << std::this_thread::get_id() << endl; } return !empty; }); data = itemRepo.itemQueue.front(); itemRepo.itemQueue.pop_front(); itemRepo.repository_notFull.notify_all(); lock.unlock(); return data; } // 生产者任务 void ProducerTask() { for (int i = 1;i <= kItemsToProduce;++i) { ProduceItem(gItemRepository, i); // 生产产品 { std::lock_guard<std::mutex> lock(stdoutMutex); cout << "Produce the " << i << " ^th item..." << endl; } } { std::lock_guard<std::mutex> lock(stdoutMutex); cout << "Producer Thread exit.... " << endl; } } // 消费者任务 void ConsumerTask(int th_ID) { bool readyToExit = false; while (true) { this_thread::sleep_for(std::chrono::seconds(1)); std::unique_lock<std::mutex> lock(gItemRepository.itemCounterMtx); // 仓库产品消费计数器保持多线程互斥 if (gItemRepository.itemCounter < kItemsToProduce) { int item = ConsumeItem(gItemRepository); // 消费产品 gItemRepository.itemCounter++; // 每消费一次进行计数器+1 { std::lock_guard<std::mutex> lock(stdoutMutex); cout << "Consume Thread " <<th_ID<<" the " <<item << "^th item..." << endl; } } else { readyToExit = true; } lock.unlock(); if (readyToExit) break; } { std::lock_guard<std::mutex> lock(stdoutMutex); cout << "Consumer Thread "<<th_ID<<" exit...." << endl; } } int main() { std::thread producer(ProducerTask); std::thread consumer1(ConsumerTask,1); std::thread consumer2(ConsumerTask,2); std::thread consumer3(ConsumerTask,3); std::thread consumer4(ConsumerTask,4); producer.join(); consumer1.join(); consumer2.join(); consumer3.join(); consumer4.join(); system("pause"); return 0; } Produce the 1 ^th item... Produce the 2 ^th item... Produce the 3 ^th item... Produce the 4 ^th item... Produce the 5 ^th item... Produce the 6 ^th item... Produce the 7 ^th item... Produce the 8 ^th item... Produce the 9 ^th item... Produce the 10 ^th item... 仓库满了,生产者等待中...thread id = 140649595565824 Consume Thread 1 the 1^th item... Produce the 11 ^th item... 仓库满了,生产者等待中...thread id = 140649595565824 Consume Thread 2 the 2^th item... Consume Thread 4 the 3^th item... Consume Thread 3 the 4^th item... Produce the 12 ^th item... Produce the 13 ^th item... Produce the 14 ^th item... 仓库满了,生产者等待中...thread id = 140649595565824 Consume Thread 1 the 5^th item... Produce the 15 ^th item... 仓库满了,生产者等待中...thread id = 140649595565824 Consume Thread 2 the 6^th item... Consume Thread 4 the 7^th item... Consume Thread 3 the 8^th item... Produce the 16 ^th item... Produce the 17 ^th item... Produce the 18 ^th item... 仓库满了,生产者等待中...thread id = 140649595565824 Consume Thread 1 the 9^th item... Produce the 19 ^th item... 仓库满了,生产者等待中...thread id = 140649595565824 Consume Thread 2 the 10^th item... Consume Thread 4 the 11^th item... Consume Thread 3 the 12^th item... Produce the 20 ^th item... Producer Thread exit.... Consume Thread 1 the 13^th item... Consume Thread 2 the 14^th item... Consume Thread 4 the 15^th item... Consume Thread 3 the 16^th item... Consume Thread 1 the 17^th item... Consume Thread 2 the 18^th item... Consume Thread 4 the 19^th item... Consume Thread 3 the 20^th item... Consumer Thread 1 exit.... Consumer Thread 2 exit.... Consumer Thread 4 exit.... Consumer Thread 3 exit.... real 0m6.067s user 0m0.004s sys 0m0.052s g++ main2.cpp -std=c++11 -lpthread time ./a.out > my.log
转载请注明原文地址: https://www.6miu.com/read-1950354.html

最新回复(0)