cc++ 线程池

xiaoxiao2021-02-28  60

概念

原理

线程池简单来说就是有一堆已经创建好的线程(最大数目一定),初始时他们都处于空闲状态,当有新的任务进来,从线程池中取出一个空闲的线程处理任务,然后当任务处理完成之后,该线程被重新放回到线程池中,供其他的任务使用,当线程池中的线程都在处理任务时,就没有空闲线程供使用,此时,若有新的任务产生,只能等待线程池中有线程结束任务空闲才能执行

为什么使用线程池

因为线程的创建、和清理都是需要耗费系统资源的。假设某个线程的创建、运行和销毁的时间分别为T1、T2、T3,当T1+T3的时间相对于T2不可忽略时,线程池的就有必要引入了,尤其是处理数百万级的高并发处理时。

线程池提升了多线程程序的性能,因为线程池里面的线程都是现成的而且能够重复使用,我们不需要临时创建大量线程,然后在任务结束时又销毁大量线程。一个理想的线程池能够合理地动态调节池内线程数量,既不会因为线程过少而导致大量任务堆积,也不会因为线程过多了而增加额外的系统开销。

优点

第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。 第二:提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。 第三:提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。但是要做到合理的利用线程池,必须对其原理了如指掌。

生产者与消费者模型

线程池的原理是一个非常典型的生产者消费者同步问题。线程池至少有两个主要动作,一个是主程序不定时地向线程池添加任务,另一个是线程池里的线程领取任务去执行。

主程序执行入队操作,把任务添加到一个队列里面;池子里的多个工作线程共同对这个队列试图执行出队操作,这里要保证同一时刻只有一个线程出队成功,抢夺到这个任务,其他线程继续共同试图出队抢夺下一个任务。所以在实现线程池之前,我们需要一个队列。

这里的生产者就是主程序,生产任务(增加任务),消费者就是工作线程,消费任务(执行、减少任务)。因为这里涉及到多个线程同时访问一个队列的问题,所以我们需要互斥锁来保护队列,同时还需要条件变量来处理主线程通知任务到达、工作线程抢夺任务的问题。

c 实现线程池

//condition.h #ifndef _CONDITION_H_ #define _CONDITION_H_ #include <pthread.h> //封装一个互斥量和条件变量作为状态 typedef struct condition { pthread_mutex_t pmutex; pthread_cond_t pcond; }condition_t; //对状态的操作函数 int condition_init(condition_t *cond); int condition_lock(condition_t *cond); int condition_unlock(condition_t *cond); int condition_wait(condition_t *cond); int condition_timedwait(condition_t *cond, const struct timespec *abstime); int condition_signal(condition_t* cond); int condition_broadcast(condition_t *cond); int condition_destroy(condition_t *cond); #endif //condition.c #include "condition.h" //初始化 int condition_init(condition_t *cond) { int status; if((status = pthread_mutex_init(&cond->pmutex, NULL))) return status; if((status = pthread_cond_init(&cond->pcond, NULL))) return status; return 0; } //加锁 int condition_lock(condition_t *cond) { return pthread_mutex_lock(&cond->pmutex); } //解锁 int condition_unlock(condition_t *cond) { return pthread_mutex_unlock(&cond->pmutex); } //等待 int condition_wait(condition_t *cond) { return pthread_cond_wait(&cond->pcond, &cond->pmutex); } //固定时间等待 int condition_timedwait(condition_t *cond, const struct timespec *abstime) { return pthread_cond_timedwait(&cond->pcond, &cond->pmutex, abstime); } //唤醒一个睡眠线程 int condition_signal(condition_t* cond) { return pthread_cond_signal(&cond->pcond); } //唤醒所有睡眠线程 int condition_broadcast(condition_t *cond) { return pthread_cond_broadcast(&cond->pcond); } //释放 int condition_destroy(condition_t *cond) { int status; if((status = pthread_mutex_destroy(&cond->pmutex))) return status; if((status = pthread_cond_destroy(&cond->pcond))) return status; return 0; } //threadpool.h #ifndef _THREAD_POOL_H_ #define _THREAD_POOL_H_ //线程池头文件 #include "condition.h" //封装线程池中的对象需要执行的任务对象 typedef struct task { void *(*run)(void *args); //函数指针,需要执行的任务 void *arg; //参数 struct task *next; //任务队列中下一个任务 }task_t; //下面是线程池结构体 typedef struct threadpool { condition_t ready; //状态量 task_t *first; //任务队列中第一个任务 task_t *last; //任务队列中最后一个任务 int counter; //线程池中已有线程数 int idle; //线程池中空闲线程数 int max_threads; //线程池最大线程数 int quit; //是否退出标志 }threadpool_t; //线程池初始化 void threadpool_init(threadpool_t *pool, int threads); //往线程池中加入任务 void threadpool_add_task(threadpool_t *pool, void *(*run)(void *arg), void *arg); //摧毁线程池 void threadpool_destroy(threadpool_t *pool); #endif //threadpool.c #include "threadpool.h" #include <stdlib.h> #include <stdio.h> #include <string.h> #include <errno.h> #include <time.h> //创建的线程执行 void *thread_routine(void *arg) { struct timespec abstime; int timeout; printf("thread %d is starting\n", (int)pthread_self()); threadpool_t *pool = (threadpool_t *)arg; while(1) { timeout = 0; //访问线程池之前需要加锁 condition_lock(&pool->ready); //空闲 pool->idle++; //等待队列有任务到来 或者 收到线程池销毁通知 while(pool->first == NULL && !pool->quit) { //否则线程阻塞等待 printf("thread %d is waiting\n", (int)pthread_self()); //获取从当前时间,并加上等待时间, 设置进程的超时睡眠时间 clock_gettime(CLOCK_REALTIME, &abstime); abstime.tv_sec += 2; int status; status = condition_timedwait(&pool->ready, &abstime); //该函数会解锁,允许其他线程访问,当被唤醒时,加锁 if(status == ETIMEDOUT) { printf("thread %d wait timed out\n", (int)pthread_self()); timeout = 1; break; } } pool->idle--; if(pool->first != NULL) { //取出等待队列最前的任务,移除任务,并执行任务 task_t *t = pool->first; pool->first = t->next; //由于任务执行需要消耗时间,先解锁让其他线程访问线程池 condition_unlock(&pool->ready); //执行任务 t->run(t->arg); //执行完任务释放内存 free(t); //重新加锁 condition_lock(&pool->ready); } //退出线程池 if(pool->quit && pool->first == NULL) { pool->counter--;//当前工作的线程数-1 //若线程池中没有线程,通知等待线程(主线程)全部任务已经完成 if(pool->counter == 0) { condition_signal(&pool->ready); } condition_unlock(&pool->ready); break; } //超时,跳出销毁线程 if(timeout == 1) { pool->counter--;//当前工作的线程数-1 condition_unlock(&pool->ready); break; } condition_unlock(&pool->ready); } printf("thread %d is exiting\n", (int)pthread_self()); return NULL; } //线程池初始化 void threadpool_init(threadpool_t *pool, int threads) { condition_init(&pool->ready); pool->first = NULL; pool->last =NULL; pool->counter =0; pool->idle =0; pool->max_threads = threads; pool->quit =0; } //增加一个任务到线程池 void threadpool_add_task(threadpool_t *pool, void *(*run)(void *arg), void *arg) { //产生一个新的任务 task_t *newtask = (task_t *)malloc(sizeof(task_t)); newtask->run = run; newtask->arg = arg; newtask->next=NULL;//新加的任务放在队列尾端 //线程池的状态被多个线程共享,操作前需要加锁 condition_lock(&pool->ready); if(pool->first == NULL)//第一个任务加入 { pool->first = newtask; } else { pool->last->next = newtask; } pool->last = newtask; //队列尾指向新加入的线程 //线程池中有线程空闲,唤醒 if(pool->idle > 0) { condition_signal(&pool->ready); } //当前线程池中线程个数没有达到设定的最大值,创建一个新的线程 else if(pool->counter < pool->max_threads) { pthread_t tid; pthread_create(&tid, NULL, thread_routine, pool); pool->counter++; } //结束,访问 condition_unlock(&pool->ready); } //线程池销毁 void threadpool_destroy(threadpool_t *pool) { //如果已经调用销毁,直接返回 if(pool->quit) { return; } //加锁 condition_lock(&pool->ready); //设置销毁标记为1 pool->quit = 1; //线程池中线程个数大于0 if(pool->counter > 0) { //对于等待的线程,发送信号唤醒 if(pool->idle > 0) { condition_broadcast(&pool->ready); } //正在执行任务的线程,等待他们结束任务 while(pool->counter) { condition_wait(&pool->ready); } } condition_unlock(&pool->ready); condition_destroy(&pool->ready); } //main.c #include "threadpool.h" #include <unistd.h> #include <stdlib.h> #include <stdio.h> void* mytask(void *arg) { printf("thread %d is working on task %d\n", (int)pthread_self(), *(int*)arg); sleep(1); free(arg); return NULL; } //测试代码 int main(void) { threadpool_t pool; //初始化线程池,最多三个线程 threadpool_init(&pool, 3); int i; //创建十个任务 for(i=0; i < 10; i++) { int *arg = (int*)malloc(sizeof(int)); *arg = i; threadpool_add_task(&pool, mytask, arg); } threadpool_destroy(&pool); return 0; }

注:pro文件添加LIBS += -lpthread

转载链接,附另一Github源码

c++ 实现线程池

一般来说实现一个线程池主要包括以下4个组成部分 线程管理器:用于创建并管理线程池工作线程:线程池中实际执行任务的线程。在初始化线程时会预先创建好固定数目的线程在池中,这些初始化的线程一般处于空闲状态。任务接口:每个任务必须实现的接口。当线程池的任务队列中有可执行任务时,被空间的工作线程调去执行(线程的闲与忙的状态是通过互斥量实现的),把任务抽象出来形成一个接口,可以做到线程池与具体的任务无关。任务队列:用来存放没有处理的任务。提供一种缓冲机制。实现这种结构有很多方法,常用的有队列和链表结构。流程图如下: //thread_pool.h #ifndef __THREAD_POOL_H #define __THREAD_POOL_H #include <vector> #include <string> #include <pthread.h> using namespace std; /*执行任务的类:设置任务数据并执行*/ class CTask { protected: string m_strTaskName; //任务的名称 void* m_ptrData; //要执行的任务的具体数据 public: CTask() = default; CTask(string &taskName): m_strTaskName(taskName), m_ptrData(NULL) {} virtual int Run() = 0; void setData(void* data); //设置任务数据 virtual ~CTask() {} }; /*线程池管理类*/ class CThreadPool { private: static vector<CTask*> m_vecTaskList; //任务列表 static bool shutdown; //线程退出标志 int m_iThreadNum; //线程池中启动的线程数 pthread_t *pthread_id; static pthread_mutex_t m_pthreadMutex; //线程同步锁 static pthread_cond_t m_pthreadCond; //线程同步条件变量 protected: static void* ThreadFunc(void *threadData); //新线程的线程回调函数 static int MoveToIdle(pthread_t tid); //线程执行结束后,把自己放入空闲线程中 static int MoveToBusy(pthread_t tid); //移入到忙碌线程中去 int Create(); //创建线程池中的线程 public: CThreadPool(int threadNum); int AddTask(CTask *task); //把任务添加到任务队列中 int StopAll(); //使线程池中的所有线程退出 int getTaskSize(); //获取当前任务队列中的任务数 }; #endif //thread_pool.cpp #include "thread_pool.h" #include <cstdio> void CTask::setData(void* data) { m_ptrData = data; } //静态成员初始化 vector<CTask*> CThreadPool::m_vecTaskList; bool CThreadPool::shutdown = false; pthread_mutex_t CThreadPool::m_pthreadMutex = PTHREAD_MUTEX_INITIALIZER; pthread_cond_t CThreadPool::m_pthreadCond = PTHREAD_COND_INITIALIZER; //线程管理类构造函数 CThreadPool::CThreadPool(int threadNum) { this->m_iThreadNum = threadNum; printf("I will create %d threads.\n", threadNum); Create(); } //线程回调函数 void* CThreadPool::ThreadFunc(void* threadData) { pthread_t tid = pthread_self(); while(1) { pthread_mutex_lock(&m_pthreadMutex); //如果队列为空,等待新任务进入任务队列 while(m_vecTaskList.size() == 0 && !shutdown) pthread_cond_wait(&m_pthreadCond, &m_pthreadMutex); //关闭线程 if(shutdown) { pthread_mutex_unlock(&m_pthreadMutex); printf("[tid: %lu]\texit\n", pthread_self()); pthread_exit(NULL); } printf("[tid: %lu]\trun: ", tid); vector<CTask*>::iterator iter = m_vecTaskList.begin(); //取出一个任务并处理之 CTask* task = *iter; if(iter != m_vecTaskList.end()) { task = *iter; m_vecTaskList.erase(iter); } pthread_mutex_unlock(&m_pthreadMutex); task->Run(); //执行任务 printf("[tid: %lu]\tidle\n", tid); } return (void*)0; } //往任务队列里添加任务并发出线程同步信号 int CThreadPool::AddTask(CTask *task) { pthread_mutex_lock(&m_pthreadMutex); m_vecTaskList.push_back(task); pthread_mutex_unlock(&m_pthreadMutex); pthread_cond_signal(&m_pthreadCond); return 0; } //创建线程 int CThreadPool::Create() { pthread_id = new pthread_t[m_iThreadNum]; for(int i = 0; i < m_iThreadNum; i++) pthread_create(&pthread_id[i], NULL, ThreadFunc, NULL); return 0; } //停止所有线程 int CThreadPool::StopAll() { //避免重复调用 if(shutdown) return -1; printf("Now I will end all threads!\n\n"); //唤醒所有等待进程,线程池也要销毁了 shutdown = true; pthread_cond_broadcast(&m_pthreadCond); //清楚僵尸 for(int i = 0; i < m_iThreadNum; i++) pthread_join(pthread_id[i], NULL); delete[] pthread_id; pthread_id = NULL; //销毁互斥量和条件变量 pthread_mutex_destroy(&m_pthreadMutex); pthread_cond_destroy(&m_pthreadCond); return 0; } //获取当前队列中的任务数 int CThreadPool::getTaskSize() { return m_vecTaskList.size(); } //main.cpp #include "thread_pool.h" #include <cstdio> #include <stdlib.h> #include <unistd.h> class CMyTask: public CTask { public: CMyTask() = default; int Run() { printf("%s\n", (char*)m_ptrData); int x = rand()%4 + 1; sleep(x); return 0; } ~CMyTask() {} }; int main() { CMyTask taskObj; char szTmp[] = "hello!"; taskObj.setData((void*)szTmp); CThreadPool threadpool(5); //线程池大小为5 for(int i = 0; i < 10; i++) threadpool.AddTask(&taskObj); while(1) { printf("There are still %d tasks need to handle\n", threadpool.getTaskSize()); //任务队列已没有任务了 if(threadpool.getTaskSize()==0) { //清除线程池 if(threadpool.StopAll() == -1) { printf("Thread pool clear, exit.\n"); exit(0); } } sleep(2); printf("2 seconds later...\n"); } return 0; }

转载链接

c++11实现线程池

注意在.pro文件中需要添加

QMAKE_CXXFLAGS += -std=c++11 -pthread LIBS += -pthread
转载请注明原文地址: https://www.6miu.com/read-2619857.html

最新回复(0)