【——】线程池实现

xiaoxiao2021-02-28  96

配置VS2013 使用pthread,参考: http://blog.csdn.net/k459905889/article/details/48676159

pthread函数使用,参考: http://blog.csdn.net/dreamintheworld/article/details/52577681

线程池,参考: http://www.cnblogs.com/li-daphne/p/5583224.html

基本思路:管理一个任务队列,一个线程队列,然后每次取一个任务分配给一个线程去做,循环往复

以下贴出9个文件的完整源码


Main.cpp:

#include <pthread.h> #include <stdio.h> #include <stdlib.h> #include <windows.h> #include <semaphore.h> #include <iostream> #include "MyThreadPool.h" #include "ThreadManage.h" #pragma comment(lib,"pthreadVC2.lib") void main() { CThreadManage* manage = new CThreadManage(10); for (int i = 0; i < 80; i++) { CYJob* job = new CYJob(); manage->Run(job, NULL); } for (int i = 0; i < 80; i++) { CXJob* job = new CXJob(); manage->Run(job, NULL); } for (int i = 0; i < 80; i++) { CYJob* job = new CYJob(); manage->Run(job, NULL); } Sleep(500000); manage->TerminateAll(); }

主函数示例了如何使用线程池。 CThreadManage(线程池管理类)是线程池对外接口。使用CThreadManage::Run()不断向线程池添加任务。


Job.h:

#ifndef THREADPOOL_Job #define THREADPOOL_Job #include <windows.h> #include "CWorkerThread.h" class MyThreadPool; class CWorkerThread; class CJob { private: int m_JobNo; //The num was assigned to the job char* m_JobName; //The job name CWorkerThread *m_pWorkThread; //The thread associated with the job public: CJob(void); virtual ~CJob(); int GetJobNo(void) const { return m_JobNo; } void SetJobNo(int jobno){ m_JobNo = jobno; } char* GetJobName(void) const { return m_JobName; } void SetJobName(char* jobname); CWorkerThread *GetWorkThread(void){ return m_pWorkThread; } void SetWorkThread(CWorkerThread *pWorkThread){ m_pWorkThread = pWorkThread; } virtual void Run(void *ptr) = 0; }; class CXJob :public CJob { public: CXJob(){ } ~CXJob(){} void Run(void* jobdata) { printf("The Job comes from CXJOB\n"); Sleep(5000); } }; class CYJob :public CJob { public: CYJob(){ } ~CYJob(){} void Run(void* jobdata) { printf("The Job comes from CYJob\n"); Sleep(1000); } }; #endif

Job.cpp:

#include "Job.h" CJob::CJob(void) :m_pWorkThread(NULL) , m_JobNo(0) , m_JobName(NULL) { } CJob::~CJob(){ if (NULL != m_JobName) free(m_JobName); } void CJob::SetJobName(char* jobname) { if (NULL != m_JobName) { free(m_JobName); m_JobName = NULL; } if (NULL != jobname) { m_JobName = (char*)malloc(strlen(jobname) + 1); strcpy(m_JobName, jobname); } }

Job类为任务类的虚基类,有虚函数:Run() CXJob、CYJob为具体任务类。 若要定义自己的任务,可继承Job类后重写Run()函数。


CWorkerThread.h:

#ifndef THREADPOOL_Thread #define THREADPOOL_Thread #include <windows.h> #include "MyThreadPool.h" class MyThreadPool; class CJob; class CWorkerThread { private: MyThreadPool* m_ThreadPool; CJob* m_Job; void* m_JobData; pthread_mutex_t m_VarMutex; bool m_IsEnd; pthread_t thread_; protected: public: sem_t m_JobSem; pthread_mutex_t m_WorkMutex; CWorkerThread(); virtual ~CWorkerThread(); static void* Run(void*); void SetJob(CJob* job, void* jobdata); CJob* GetJob(void){ return m_Job; } void SetThreadPool(MyThreadPool* thrpool); MyThreadPool* GetThreadPool(void){ return m_ThreadPool; } bool Start(void); bool Join(void); pthread_t GetThreadID(){ return thread_; }; }; #endif

CWorkerThread.cpp:

#include "CWorkerThread.h" CWorkerThread::CWorkerThread() { m_Job = NULL; m_JobData = NULL; m_ThreadPool = NULL; m_IsEnd = false; pthread_mutex_init(&m_WorkMutex, NULL); pthread_mutex_init(&m_VarMutex, NULL); sem_init(&m_JobSem, 0, 0);//初始空闲信号量 } CWorkerThread::~CWorkerThread() { if (NULL != m_Job) delete m_Job; if (m_ThreadPool != NULL) delete m_ThreadPool; } void* CWorkerThread::Run(void * pthis) { //SetThreadState(THREAD_RUNNING); CWorkerThread* p = (CWorkerThread*)pthis; for (;;) { while (p->m_Job == NULL) sem_wait(&(p->m_JobSem));//没有任务时,等待任务信号量,挂起 p->m_Job->Run(p->m_JobData); p->m_Job->SetWorkThread(NULL); p->m_Job = NULL; p->m_ThreadPool->MoveToIdleList(p); if (p->m_ThreadPool->m_IdleList.size() > p->m_ThreadPool->GetAvailHighNum())//每次线程移到空闲链表后,需重新调整空闲线程数量 { p->m_ThreadPool->DeleteIdleThread(p->m_ThreadPool->m_IdleList.size() - p->m_ThreadPool->GetInitNum()); } pthread_mutex_unlock(&(p->m_WorkMutex));//解锁,可以添加新任务了 } } void CWorkerThread::SetJob(CJob* job, void* jobdata) { pthread_mutex_lock(&m_VarMutex); m_Job = job; m_JobData = jobdata; job->SetWorkThread(this); pthread_mutex_unlock(&m_VarMutex); sem_post(&m_JobSem);//添加新任务,线程从任务信号量等待中恢复 } void CWorkerThread::SetThreadPool(MyThreadPool* thrpool) { pthread_mutex_lock(&m_VarMutex); m_ThreadPool = thrpool; pthread_mutex_unlock(&m_VarMutex); } bool CWorkerThread::Start(void) { int res; res = pthread_create(&thread_, NULL, Run, (void*)this);//创建新线程 if (res) { printf("线程创建失败!\n"); exit(res); } } bool CWorkerThread::Join(void) { pthread_mutex_destroy(&m_WorkMutex); pthread_mutex_destroy(&m_VarMutex); sem_destroy(&m_JobSem); return pthread_cancel(thread_); }

CWorkerThread为工作线程类,对象由线程池创建管理,初始时其任务成员变量为空,故阻塞等待任务分配。

工作线程类真正运行在线程中的是一个静态成员函数:void* CWorkerThread::Run(void * pthis)。这个函数是个死循环,一直处于等待任务或者处理任务的状态。 由于是静态成员函数,需要将this指针作为参数传入,以访问工作线程对象的成员变量和其他成员函数。

添加新任务时,CThreadManage::Run()函数会调用MyThreadPool::AddJob()向线程池任务队列添加任务。添加后任务不一定立即被执行。

线程池有一个任务分发线程一直在运行,它调用MyThreadPool::Distribute_Job()分发任务队列中的任务。

MyThreadPool::Distribute_Job()会调用CWorkerThread::SetJob()为工作线程添加任务。并在CWorkerThread::SetJob()中唤醒了阻塞的工作线程。 将此工作线程放入忙碌线程链表中。

工作线程被唤醒后,执行获取的任务代码:Job::Run()。

工作线程完成任务后,将自己放回空闲线程链表中,并判断空闲线程个数,如果过多,则说明线程冗余,适量删除线程。

然后继续死循环,阻塞等待下一个任务到来。


MyThreadPool.h:

#ifndef THREADPOOL_ThreadPool #define THREADPOOL_ThreadPool #include <assert.h> #include <windows.h> #include <vector> #include <semaphore.h> #include <pthread.h> #include "Job.h" #include "CWorkerThread.h" using namespace::std; class CWorkerThread; class CJob; class MyThreadPool { friend class CWorkerThread; private: unsigned int m_MaxNum; //线程数上限 the max thread num that can create at the same time unsigned int m_AvailLow; //空闲线程数下限 min num of idle thread that shoule kept unsigned int m_AvailHigh;//空闲线程数上限 The max num of idle thread that kept at the same time unsigned int m_AvailNum; //当前空闲线程数 the normal thread num of idle num; unsigned int m_InitNum; //初始线程数 Normal thread num; protected: CWorkerThread* GetIdleThread(void);//获取一个空闲线程 void AppendToIdleList(CWorkerThread* jobthread);//添加到空闲线程链表 void MoveToBusyList(CWorkerThread* idlethread);//移动到忙碌线程链表 void MoveToIdleList(CWorkerThread* busythread);//移动到空闲线程链表 void DeleteIdleThread(int num);//删除空闲线程 void CreateIdleThread(int num);//创建空闲线程 public: pthread_mutex_t m_BusyMutex; //when visit busy list,use m_BusyMutex to lock and unlock pthread_mutex_t m_IdleMutex; //when visit idle list,use m_IdleMutex to lock and unlock pthread_mutex_t m_VarMutex; pthread_mutex_t m_JobMutex; //任务队列互斥锁 pthread_cond_t m_BusyCond; //忙碌线程同步 m_BusyCond is used to sync busy thread list sem_t m_IdleSem; //m_IdleCond is used to sync idle thread list pthread_cond_t m_IdleJobCond; //m_JobCond is used to sync job list sem_t m_MaxNumSem; sem_t m_JobSem; //任务队列信号量 vector<CWorkerThread*> m_ThreadList; vector<CWorkerThread*> m_BusyList; //忙碌线程链表 Thread List vector<CWorkerThread*> m_IdleList; //空闲线程链表 Idle List vector<pair<CJob*, void*>> m_JobList; //任务队列:直接在主线程中分配任务会造成主线程阻塞,需要一个独立的线程用于不断分配任务。 MyThreadPool(); MyThreadPool(int initnum); virtual ~MyThreadPool(); void SetMaxNum(int maxnum){ m_MaxNum = maxnum; } int GetMaxNum(void){ return m_MaxNum; } void SetAvailLowNum(int minnum){ m_AvailLow = minnum; } int GetAvailLowNum(void){ return m_AvailLow; } void SetAvailHighNum(int highnum){ m_AvailHigh = highnum; } int GetAvailHighNum(void){ return m_AvailHigh; } int GetActualAvailNum(void){ return m_AvailNum; } int GetAllNum(void){ return m_ThreadList.size(); } int GetBusyNum(void){ return m_BusyList.size(); } void SetInitNum(int initnum){ m_InitNum = initnum; } int GetInitNum(void){ return m_InitNum; } void TerminateAll(void); bool AddJob(CJob* job, void* jobdata);//添加任务到任务队列 void Distribute_Job(CJob* job, void* jobdata);//分发任务给工作线程 static void* Job_distribut_thread(void *);//任务分发线程入口函数 pthread_t job_distribute_thread_handle;//任务分发线程标识 }; #endif

MyThreadPool.cpp:

#include "MyThreadPool.h" MyThreadPool::MyThreadPool() { m_MaxNum = 50; m_AvailLow = 5; m_InitNum = m_AvailNum = 10;//初始创建10个线程都是空闲线程 m_AvailHigh = 20; m_BusyList.clear(); m_IdleList.clear(); pthread_mutex_init(&m_BusyMutex, NULL); pthread_mutex_init(&m_IdleMutex, NULL); pthread_mutex_init(&m_VarMutex, NULL); pthread_mutex_init(&m_JobMutex, NULL); sem_init(&m_IdleSem, 0, 0);//初始空闲信号量 sem_init(&m_MaxNumSem, 0, 0);//初始空闲信号量 sem_init(&m_JobSem, 0, 0);//初始空闲信号量 for (int i = 0; i < m_InitNum; i++){ CWorkerThread* thr = new CWorkerThread(); AppendToIdleList(thr); //新线程全部添加到空闲线程列表中 thr->SetThreadPool(this); //在每个线程都保存线程池的指针 thr->Start(); //线程创建完成后启动,没有任务:挂起等待任务 begin the thread,the thread wait for job } pthread_create(&job_distribute_thread_handle, NULL, Job_distribut_thread, (void*)this);//启动任务分发线程 } MyThreadPool::MyThreadPool(int initnum) { assert(initnum > 0 && initnum <= 30); m_MaxNum = 30; m_AvailLow = initnum - 10 > 0 ? initnum - 10 : 3; m_InitNum = m_AvailNum = initnum; m_AvailHigh = initnum + 10; m_BusyList.clear(); m_IdleList.clear(); pthread_mutex_init(&m_BusyMutex, NULL); pthread_mutex_init(&m_IdleMutex, NULL); pthread_mutex_init(&m_VarMutex, NULL); pthread_mutex_init(&m_JobMutex, NULL); sem_init(&m_IdleSem, 0, 0);//初始空闲信号量 sem_init(&m_MaxNumSem, 0, 0);//初始空闲信号量 sem_init(&m_JobSem, 0, 0);//初始空闲信号量 for (int i = 0; i < m_InitNum; i++){ CWorkerThread* thr = new CWorkerThread(); AppendToIdleList(thr); //新线程全部添加到空闲线程列表中 thr->SetThreadPool(this); //在每个线程都保存线程池的指针 thr->Start(); //线程类创建完成后启动线程,初始时没有任务:挂起等待任务 } pthread_create(&job_distribute_thread_handle, NULL, Job_distribut_thread, (void*)this);//启动任务分发线程 } MyThreadPool::~MyThreadPool() { TerminateAll(); pthread_mutex_destroy(&m_BusyMutex);//销毁所有锁 pthread_mutex_destroy(&m_IdleMutex); pthread_mutex_destroy(&m_VarMutex); pthread_mutex_destroy(&m_JobMutex); sem_destroy(&m_IdleSem); sem_destroy(&m_MaxNumSem); sem_destroy(&m_JobSem); } void MyThreadPool::TerminateAll() { for (int i = 0; i < m_ThreadList.size(); i++) { CWorkerThread* thr = m_ThreadList[i]; thr->Join(); //每个工作线程都要结束 } pthread_cancel(job_distribute_thread_handle);//结束任务分发线程 return; } //获取一个空闲线程 CWorkerThread* MyThreadPool::GetIdleThread(void) { //生产者消费者模式: sem_wait(&m_IdleSem);//如果空闲线程为0,则等待空闲信号量,直到有新的空闲线程 pthread_mutex_lock(&m_IdleMutex); if (m_IdleList.size() > 0)//上锁后再次判断资源是否存在 { CWorkerThread* thr = (CWorkerThread*)m_IdleList.front(); printf("Get Idle thread %u\n", thr->GetThreadID()); pthread_mutex_unlock(&m_IdleMutex); return thr; } pthread_mutex_unlock(&m_IdleMutex); return NULL; } //add an idle thread to idle list //向线程链表和空闲线程链表添加新线程 void MyThreadPool::AppendToIdleList(CWorkerThread* jobthread) { pthread_mutex_lock(&m_IdleMutex); m_IdleList.push_back(jobthread); m_ThreadList.push_back(jobthread); sem_post(&m_IdleSem); //空闲信号量增加 pthread_mutex_unlock(&m_IdleMutex); } //move and idle thread to busy thread //将线程从空闲线程链表移动到忙碌线程链表 void MyThreadPool::MoveToBusyList(CWorkerThread* idlethread) { pthread_mutex_lock(&m_BusyMutex); m_BusyList.push_back(idlethread); m_AvailNum--; pthread_mutex_unlock(&m_BusyMutex); pthread_mutex_lock(&m_IdleMutex); vector<CWorkerThread*>::iterator pos; pos = find(m_IdleList.begin(), m_IdleList.end(), idlethread); if (pos != m_IdleList.end()) { m_IdleList.erase(pos); } pthread_mutex_unlock(&m_IdleMutex); } //将线程从忙碌线程链表移到空闲线程链表 void MyThreadPool::MoveToIdleList(CWorkerThread* busythread) { pthread_mutex_lock(&m_IdleMutex); m_IdleList.push_back(busythread); m_AvailNum++; pthread_mutex_unlock(&m_IdleMutex); pthread_mutex_lock(&m_BusyMutex); vector<CWorkerThread*>::iterator pos; pos = find(m_BusyList.begin(), m_BusyList.end(), busythread); if (pos != m_BusyList.end()) m_BusyList.erase(pos); pthread_mutex_unlock(&m_BusyMutex); sem_post(&m_IdleSem);//空闲信号量增加 sem_post(&m_MaxNumSem);//通知由于线程上限而等待的线程 //pthread_cond_signal(&m_IdleCond); //pthread_cond_signal(&m_MaxNumCond); } //create num idle thread and put them to idlelist void MyThreadPool::CreateIdleThread(int num) { for (int i = 0; i < num; i++){ CWorkerThread* thr = new CWorkerThread(); thr->SetThreadPool(this); AppendToIdleList(thr); pthread_mutex_lock(&m_VarMutex); m_AvailNum++; pthread_mutex_unlock(&m_VarMutex); thr->Start(); //begin the thread,the thread wait for job } } void MyThreadPool::DeleteIdleThread(int num) { printf("Enter into MyThreadPool::DeleteIdleThreadn"); pthread_mutex_lock(&m_IdleMutex); printf("Delete Num is %d\n", num); for (int i = 0; i < num; i++){ sem_wait(&m_IdleSem); CWorkerThread* thr; if (m_IdleList.size() > 0){ thr = (CWorkerThread*)m_IdleList.front(); printf("Get Idle thread %u\n", thr->GetThreadID()); } vector<CWorkerThread*>::iterator pos; pos = find(m_IdleList.begin(), m_IdleList.end(), thr); if (pos != m_IdleList.end()) m_IdleList.erase(pos); m_AvailNum--; printf("The idle thread available num:%d\n", m_AvailNum); printf("The idlelist num:%d\n", m_IdleList.size()); } pthread_mutex_unlock(&m_IdleMutex); } void MyThreadPool::Distribute_Job(CJob* job, void* jobdata) { assert(job != NULL); //if the busy thread num adds to m_MaxNum,so we should wait if (GetBusyNum() == m_MaxNum) sem_wait(&m_MaxNumSem);//忙碌线程数达到线程数上限,只能等待线程空闲,不能创建新线程 //m_MaxNumCond.Wait(); if (m_IdleList.size() < m_AvailLow)//空闲线程太少,在线程上限内,增加线程 { if (GetAllNum() + m_InitNum - m_IdleList.size() < m_MaxNum) CreateIdleThread(m_InitNum - m_IdleList.size()); else CreateIdleThread(m_MaxNum - GetAllNum()); } CWorkerThread* idlethr = GetIdleThread(); if (idlethr != NULL) { pthread_mutex_lock(&idlethr->m_WorkMutex);//为空闲线程添加新任务,上锁知道任务结束才解锁 MoveToBusyList(idlethr); idlethr->SetThreadPool(this); //线程创建时已经设置了 job->SetWorkThread(idlethr); //在任务中记录线程标识 printf("Job is set to thread %u\n", idlethr->GetThreadID()); idlethr->SetJob(job, jobdata); //向线程添加任务,会增加线程任务信号量 } } bool MyThreadPool::AddJob(CJob* job, void* jobdata) { pthread_mutex_lock(&m_JobMutex); m_JobList.push_back(pair<CJob*, void*>(job, jobdata)); //新任务添加到任务列表 pthread_mutex_unlock(&m_JobMutex); sem_post(&m_JobSem); //任务信号量增加,通知任务分发线程 return 1; } void * MyThreadPool::Job_distribut_thread(void* pthis) { MyThreadPool* p = (MyThreadPool*)pthis; while (1) { CJob* job; void* jobdata; sem_wait(&(p->m_JobSem)); pthread_mutex_lock(&(p->m_JobMutex)); if (!p->m_JobList.empty()) { job = ((p->m_JobList).begin())->first; jobdata = ((p->m_JobList).begin())->first; p->m_JobList.erase(p->m_JobList.begin());//消耗一个任务 } else { continue; } pthread_mutex_unlock(&(p->m_JobMutex)); p->Distribute_Job(job, jobdata);//分发任务给工作线程,即使阻塞,新任务也可以继续添加到任务队列中 } }

线程池有一个任务队列,一个任务分发线程。 任务分发线程在线程池初始化时启动,入口函数为: MyThreadPool::Job_distribut_thread(void* pthis)。该函数是一个死循环;在循环中,任务队列中的任务被不断分发给空闲线程链表中的线程。

如果任务队列为空,则线程阻塞等待新任务添加。

如果任务分发过程中,没有空闲线程可用,也会阻塞,等待忙碌线程空闲下来。但不会影响新任务添加到任务队列中。


线程池类还有两个链表,分别存放忙碌和空闲的工作线程。初始化时创建若干工作线程对象,放入空闲线程链表中。

添加新任务时,CThreadManage::Run()函数会调用MyThreadPool::AddJob()向线程池任务队列添加任务。添加后任务不一定立即被执行。

线程池有一个任务分发线程一直在运行,它调用MyThreadPool::Distribute_Job()分发任务队列中的任务。

MyThreadPool::Distribute_Job()会调用CWorkerThread::SetJob()为工作线程添加任务。并在CWorkerThread::SetJob()中唤醒了阻塞的工作线程。 还将此工作线程从空闲线程链表移到忙碌线程链表中。

其中在任务分发线程调用MyThreadPool::Distribute_Job()分发任务队列中的任务时,会先判断当前的忙碌线程个数是否达到线程个数上限(m_MaxNum),若已达到上限,说明所有线程都在忙碌且由于线程数已达到规定的上限,不能再增加新线程。此时阻塞等待,直到有忙碌线程空闲下来。

线程池判断当前空闲线程的个数(m_AvailNum)是否在大于规定的空闲线程数下限(m_AvailLow)。 若m_AvailNum小于m_AvailLow,则说明线程不够用,在线程个数上限(m_MaxNum)范围内增加新线程。


ThreadManage.h:

#ifndef THREADPOOL_ThreadManage #define THREADPOOL_ThreadManage #include <windows.h> #include "MyThreadPool.h" class CThreadManage { private: MyThreadPool* m_Pool; int m_NumOfThread; protected: public: void SetParallelNum(int num); CThreadManage(); CThreadManage(int num); virtual ~CThreadManage(); void Run(CJob* job, void* jobdata); void TerminateAll(void); }; #endif

ThreadManage.cpp:

#include "ThreadManage.h" CThreadManage::CThreadManage(){ m_NumOfThread = 10; m_Pool = new MyThreadPool(m_NumOfThread); } CThreadManage::CThreadManage(int num){ m_NumOfThread = num; m_Pool = new MyThreadPool(m_NumOfThread); } CThreadManage::~CThreadManage(){ if (NULL != m_Pool) delete m_Pool; } void CThreadManage::SetParallelNum(int num){ m_NumOfThread = num; } void CThreadManage::Run(CJob* job, void* jobdata){ m_Pool->AddJob(job, jobdata); } void CThreadManage::TerminateAll(void){ m_Pool->TerminateAll(); }
转载请注明原文地址: https://www.6miu.com/read-63659.html

最新回复(0)