2、单生产者多消费者模式:
与单生产者和单消费者模型不同的是,单生产者-多消费者模型中可以允许多个消费者同时从产品库中取走产品。所以除了保护产品库在多个读写线程下互斥之外,还需要维护消费者取走产品的计数器,代码如下: 在多个消费者消费的情况下,我们要确保消费的个数不要超过仓库生产的个数,多个消费要确保消费计数器的互斥
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <deque>
using namespace std;
static const int kItemsToProduce =
20;
std::mutex stdoutMutex;
struct ItemRepository
{
deque<int> itemQueue;
int MaxSize =
10;
int itemCounter=
0;
std::mutex mtx;
std::mutex itemCounterMtx;
std::condition_variable repository_notFull;
std::condition_variable repository_notEmpty;
}gItemRepository;
typedef struct ItemRepository ItemRepository;
void ProduceItem(ItemRepository &itemRepo,
int item)
{
std::unique_lock<
std::mutex> lock(itemRepo.mtx);
itemRepo.repository_notFull.wait(lock, [&itemRepo] {
bool full = itemRepo.itemQueue.size() >= itemRepo.MaxSize;
if (full)
{
std::lock_guard<
std::mutex> lock(stdoutMutex);
cout <<
"仓库满了,生产者等待中..." <<
"thread id = " <<
std::this_thread::get_id() << endl;
}
return !full;
});
itemRepo.itemQueue.push_back(item);
itemRepo.repository_notEmpty.notify_all();
lock.unlock();
}
int ConsumeItem(ItemRepository &itemRepo)
{
int data;
std::unique_lock<
std::mutex> lock(itemRepo.mtx);
itemRepo.repository_notEmpty.wait(lock, [&itemRepo] {
bool empty = itemRepo.itemQueue.empty();
if (empty)
{
std::lock_guard<
std::mutex> lock(stdoutMutex);
cout <<
"仓库空了,消费者等待中..." <<
"thread id = " <<
std::this_thread::get_id() << endl;
}
return !empty;
});
data = itemRepo.itemQueue.front();
itemRepo.itemQueue.pop_front();
itemRepo.repository_notFull.notify_all();
lock.unlock();
return data;
}
void ProducerTask()
{
for (
int i =
1;i <= kItemsToProduce;++i)
{
ProduceItem(gItemRepository, i);
{
std::lock_guard<
std::mutex> lock(stdoutMutex);
cout <<
"Produce the " << i <<
" ^th item..." << endl;
}
}
{
std::lock_guard<
std::mutex> lock(stdoutMutex);
cout <<
"Producer Thread exit.... " << endl;
}
}
void ConsumerTask(
int th_ID)
{
bool readyToExit =
false;
while (
true)
{
this_thread::sleep_for(
std::chrono::seconds(
1));
std::unique_lock<
std::mutex> lock(gItemRepository.itemCounterMtx);
if (gItemRepository.itemCounter < kItemsToProduce)
{
int item = ConsumeItem(gItemRepository);
gItemRepository.itemCounter++;
{
std::lock_guard<
std::mutex> lock(stdoutMutex);
cout <<
"Consume Thread " <<th_ID<<
" the " <<item <<
"^th item..." << endl;
}
}
else
{
readyToExit =
true;
}
lock.unlock();
if (readyToExit)
break;
}
{
std::lock_guard<
std::mutex> lock(stdoutMutex);
cout <<
"Consumer Thread "<<th_ID<<
" exit...." << endl;
}
}
int main()
{
std::thread producer(ProducerTask);
std::thread consumer1(ConsumerTask,
1);
std::thread consumer2(ConsumerTask,
2);
std::thread consumer3(ConsumerTask,
3);
std::thread consumer4(ConsumerTask,
4);
producer.join();
consumer1.join();
consumer2.join();
consumer3.join();
consumer4.join();
system(
"pause");
return 0;
}
Produce
the 1 ^th
item...
Produce
the 2 ^th
item...
Produce
the 3 ^th
item...
Produce
the 4 ^th
item...
Produce
the 5 ^th
item...
Produce
the 6 ^th
item...
Produce
the 7 ^th
item...
Produce
the 8 ^th
item...
Produce
the 9 ^th
item...
Produce
the 10 ^th
item...
仓库满了,生产者等待中...thread
id =
140649595565824
Consume Thread
1 the 1^th
item...
Produce
the 11 ^th
item...
仓库满了,生产者等待中...thread
id =
140649595565824
Consume Thread
2 the 2^th
item...
Consume Thread
4 the 3^th
item...
Consume Thread
3 the 4^th
item...
Produce
the 12 ^th
item...
Produce
the 13 ^th
item...
Produce
the 14 ^th
item...
仓库满了,生产者等待中...thread
id =
140649595565824
Consume Thread
1 the 5^th
item...
Produce
the 15 ^th
item...
仓库满了,生产者等待中...thread
id =
140649595565824
Consume Thread
2 the 6^th
item...
Consume Thread
4 the 7^th
item...
Consume Thread
3 the 8^th
item...
Produce
the 16 ^th
item...
Produce
the 17 ^th
item...
Produce
the 18 ^th
item...
仓库满了,生产者等待中...thread
id =
140649595565824
Consume Thread
1 the 9^th
item...
Produce
the 19 ^th
item...
仓库满了,生产者等待中...thread
id =
140649595565824
Consume Thread
2 the 10^th
item...
Consume Thread
4 the 11^th
item...
Consume Thread
3 the 12^th
item...
Produce
the 20 ^th
item...
Producer Thread
exit....
Consume Thread
1 the 13^th
item...
Consume Thread
2 the 14^th
item...
Consume Thread
4 the 15^th
item...
Consume Thread
3 the 16^th
item...
Consume Thread
1 the 17^th
item...
Consume Thread
2 the 18^th
item...
Consume Thread
4 the 19^th
item...
Consume Thread
3 the 20^th
item...
Consumer Thread
1 exit....
Consumer Thread
2 exit....
Consumer Thread
4 exit....
Consumer Thread
3 exit....
real 0m6
.067s
user
0m0
.004s
sys
0m0
.052s
g++ main2
.cpp -
std=c++
11 -lpthread
time ./a
.out > my
.log