Qt 实现的一个生产者消费者模式类

xiaoxiao2021-02-28  88

最近做公司项目遇到一个问题,加载20万的日志文件,解析后写入数据库中。做完后发现读文件、解析、写入数据依次搞下来,速度实在是太慢。 所以学习用多线程的来完成这个工作,考虑用生产者消费者模式来完成。 单生产者单消费者模式 头文件 class Repository { public: Repository(); /*! * brief 存入数据 * param str */ void AddData(QString str); /*! * brief 取数据 * return */ QString GetData(); private: std::condition_variable m_Queue_Not_Empty; //队列不满信号 std::condition_variable m_Queue_Not_Full; //队列不空信号 std::mutex m_Queue_Mutex; //队列锁 int m_nQueue_Max_Size; //队列最大长度 QQueue<QString> m_queue; //队列,也可以用其他容器、或者数组 }; cpp文件 #include "produce.h" Repository::Repository() { m_nQueue_Max_Size = 10; //默认最大长度为10,可根据需求修改 } void Repository::AddData(QString str) { std::unique_lock<std::mutex> lock(m_Queue_Mutex); //判断是否队列满 while (m_queue.count() > m_nQueue_Max_Size) { //等待信号触发,阻塞在此处。此时会释放m_Queue_Mutex锁, //其他线程可以获取m_Queue_Mutex m_Queue_Not_Full.wait(lock); } m_queue.enqueue(str); m_Queue_Not_Empty.notify_all(); lock.unlock(); //释放锁,也可以不调用,最后函数返回时也会释放 } QString Repository::GetData() { std::unique_lock<std::mutex> lock(m_Queue_Mutex); //判断是否队列满 while (m_queue.isEmpty()) { //等待信号触发,阻塞在此处。此时会释放m_Queue_Mutex锁, //其他线程可以获取m_Queue_Mutex m_Queue_Not_Empty.wait(lock); } QString str = m_queue.dequeue(); //获取数据 m_Queue_Not_Full.notify_all(); lock.unlock(); //释放锁,也可以不调用,最后函数返回时也会释放 return str; } mian 文件 #include "produce.h" int g_nProduceCount = 100; void produceTask(Repository* qR) { //模拟写入100行信息 for (int i=0; i<g_nProduceCount; i++) { //模拟读取文件,耗时100毫秒 std::this_thread::sleep_for(std::chrono::milliseconds(100)); QString str = QString("Produce %1").arg(i); qR->AddData(str); } } void conusemeTask(Repository* qR) { //模拟读取100行信息 for (int i=0; i<g_nProduceCount; i++) { QString str = qR->GetData(); //模拟处理文件,耗时100毫秒 std::this_thread::sleep_for(std::chrono::milliseconds(100)); } } int main(int argc, char *argv[]) { Repository re; std::thread Produce(produceTask, &re); std::thread Consume(conusemeTask, &re); Produce.join(); Consume.join(); } 写数据和读数据耗时都为100毫秒,结果如下图,基本是存入一个数据,取一个数据 设置conusemeTask()函数中处理耗时为200毫秒,即生产速度大于处理速度,如下图 疑问和遗留问题: 1. 如何停止conusemeTask线程,例子中是我知道有100行数据,读到100行我就停止了线程。实际中是不知道要处理多少行数据才停止的。      我想到一种处理方法是,在读取文件完成后,写入一个空QString,或者自己定义的一个字符串,在GetData这条信息后,停止线程。(只适合单生产者单消费者模式) void produceTask(Repository* qR) {     //模拟写入100行信息     for (int i=0; i<g_nProduceCount; i++)     {         //模拟读取文件,耗时100毫秒         std::this_thread::sleep_for(std::chrono::milliseconds(100));         QString str = QString("Produce %1").arg(i);         qR->AddData(str);     }     qR->AddData("File Read End"); } void conusemeTask(Repository* qR) {     while (1)     {         QString str = qR->GetData();         if (str == "File Read End")             break;         //模拟处理文件,耗时100毫秒         std::this_thread::sleep_for(std::chrono::milliseconds(200));     } }  第二种处理办法:在GetData(超时时间, QString& outData) 传递超时时间,返回值改给返回bool,,修改m_Queue_Not_Empty.wait()为如下 if (m_Queue_Not_Empty.wait_for(lock, std::chrono::milliseconds(5000)) == std::cv_status::timeout)         return false; conusemeTask()中如连续 qR->GetData(超时时间,outData);返回false即可停止线程 2. 消费者线程处理速度慢,肯定要多个消费者去处理数据。如何修改上面的代码满足单生产者多消费者模式。
转载请注明原文地址: https://www.6miu.com/read-53799.html

最新回复(0)