概念
原理
线程池简单来说就是有一堆已经创建好的线程(最大数目一定),初始时他们都处于空闲状态,当有新的任务进来,从线程池中取出一个空闲的线程处理任务,然后当任务处理完成之后,该线程被重新放回到线程池中,供其他的任务使用,当线程池中的线程都在处理任务时,就没有空闲线程供使用,此时,若有新的任务产生,只能等待线程池中有线程结束任务空闲才能执行
为什么使用线程池
因为线程的创建、和清理都是需要耗费系统资源的。假设某个线程的创建、运行和销毁的时间分别为T1、T2、T3,当T1+T3的时间相对于T2不可忽略时,线程池的就有必要引入了,尤其是处理数百万级的高并发处理时。
线程池提升了多线程程序的性能,因为线程池里面的线程都是现成的而且能够重复使用,我们不需要临时创建大量线程,然后在任务结束时又销毁大量线程。一个理想的线程池能够合理地动态调节池内线程数量,既不会因为线程过少而导致大量任务堆积,也不会因为线程过多了而增加额外的系统开销。
优点
第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。 第二:提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。 第三:提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。但是要做到合理的利用线程池,必须对其原理了如指掌。
生产者与消费者模型
线程池的原理是一个非常典型的生产者消费者同步问题。线程池至少有两个主要动作,一个是主程序不定时地向线程池添加任务,另一个是线程池里的线程领取任务去执行。
主程序执行入队操作,把任务添加到一个队列里面;池子里的多个工作线程共同对这个队列试图执行出队操作,这里要保证同一时刻只有一个线程出队成功,抢夺到这个任务,其他线程继续共同试图出队抢夺下一个任务。所以在实现线程池之前,我们需要一个队列。
这里的生产者就是主程序,生产任务(增加任务),消费者就是工作线程,消费任务(执行、减少任务)。因为这里涉及到多个线程同时访问一个队列的问题,所以我们需要互斥锁来保护队列,同时还需要条件变量来处理主线程通知任务到达、工作线程抢夺任务的问题。
c 实现线程池
#ifndef _CONDITION_H_
#define _CONDITION_H_
#include <pthread.h>
typedef struct condition
{
pthread_mutex_t pmutex;
pthread_cond_t pcond;
}condition_t;
int condition_init(condition_t *cond);
int condition_lock(condition_t *cond);
int condition_unlock(condition_t *cond);
int condition_wait(condition_t *cond);
int condition_timedwait(condition_t *cond,
const struct timespec *abstime);
int condition_signal(condition_t* cond);
int condition_broadcast(condition_t *cond);
int condition_destroy(condition_t *cond);
#endif
#include "condition.h"
int condition_init(condition_t *cond)
{
int status;
if((status = pthread_mutex_init(&cond->pmutex, NULL)))
return status;
if((status = pthread_cond_init(&cond->pcond, NULL)))
return status;
return 0;
}
int condition_lock(condition_t *cond)
{
return pthread_mutex_lock(&cond->pmutex);
}
int condition_unlock(condition_t *cond)
{
return pthread_mutex_unlock(&cond->pmutex);
}
int condition_wait(condition_t *cond)
{
return pthread_cond_wait(&cond->pcond, &cond->pmutex);
}
int condition_timedwait(condition_t *cond,
const struct timespec *abstime)
{
return pthread_cond_timedwait(&cond->pcond, &cond->pmutex, abstime);
}
int condition_signal(condition_t* cond)
{
return pthread_cond_signal(&cond->pcond);
}
int condition_broadcast(condition_t *cond)
{
return pthread_cond_broadcast(&cond->pcond);
}
int condition_destroy(condition_t *cond)
{
int status;
if((status = pthread_mutex_destroy(&cond->pmutex)))
return status;
if((status = pthread_cond_destroy(&cond->pcond)))
return status;
return 0;
}
#ifndef _THREAD_POOL_H_
#define _THREAD_POOL_H_
#include "condition.h"
typedef struct task
{
void *(*run)(
void *args);
void *arg;
struct task *next;
}task_t;
typedef struct threadpool
{
condition_t ready;
task_t *first;
task_t *last;
int counter;
int idle;
int max_threads;
int quit;
}threadpool_t;
void threadpool_init(threadpool_t *pool,
int threads);
void threadpool_add_task(threadpool_t *pool,
void *(*run)(
void *arg),
void *arg);
void threadpool_destroy(threadpool_t *pool);
#endif
#include "threadpool.h"
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <time.h>
void *thread_routine(
void *arg)
{
struct timespec abstime;
int timeout;
printf(
"thread %d is starting\n", (
int)pthread_self());
threadpool_t *pool = (threadpool_t *)arg;
while(
1)
{
timeout =
0;
condition_lock(&pool->ready);
pool->idle++;
while(pool->first == NULL && !pool->quit)
{
printf(
"thread %d is waiting\n", (
int)pthread_self());
clock_gettime(CLOCK_REALTIME, &abstime);
abstime.tv_sec +=
2;
int status;
status = condition_timedwait(&pool->ready, &abstime);
if(status == ETIMEDOUT)
{
printf(
"thread %d wait timed out\n", (
int)pthread_self());
timeout =
1;
break;
}
}
pool->idle--;
if(pool->first != NULL)
{
task_t *t = pool->first;
pool->first = t->next;
condition_unlock(&pool->ready);
t->run(t->arg);
free(t);
condition_lock(&pool->ready);
}
if(pool->quit && pool->first == NULL)
{
pool->counter--;
if(pool->counter ==
0)
{
condition_signal(&pool->ready);
}
condition_unlock(&pool->ready);
break;
}
if(timeout ==
1)
{
pool->counter--;
condition_unlock(&pool->ready);
break;
}
condition_unlock(&pool->ready);
}
printf(
"thread %d is exiting\n", (
int)pthread_self());
return NULL;
}
void threadpool_init(threadpool_t *pool,
int threads)
{
condition_init(&pool->ready);
pool->first = NULL;
pool->last =NULL;
pool->counter =
0;
pool->idle =
0;
pool->max_threads = threads;
pool->quit =
0;
}
void threadpool_add_task(threadpool_t *pool,
void *(*run)(
void *arg),
void *arg)
{
task_t *newtask = (task_t *)
malloc(
sizeof(task_t));
newtask->run = run;
newtask->arg = arg;
newtask->next=NULL;
condition_lock(&pool->ready);
if(pool->first == NULL)
{
pool->first = newtask;
}
else
{
pool->last->next = newtask;
}
pool->last = newtask;
if(pool->idle >
0)
{
condition_signal(&pool->ready);
}
else if(pool->counter < pool->max_threads)
{
pthread_t tid;
pthread_create(&tid, NULL, thread_routine, pool);
pool->counter++;
}
condition_unlock(&pool->ready);
}
void threadpool_destroy(threadpool_t *pool)
{
if(pool->quit)
{
return;
}
condition_lock(&pool->ready);
pool->quit =
1;
if(pool->counter >
0)
{
if(pool->idle >
0)
{
condition_broadcast(&pool->ready);
}
while(pool->counter)
{
condition_wait(&pool->ready);
}
}
condition_unlock(&pool->ready);
condition_destroy(&pool->ready);
}
#include "threadpool.h"
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
void* mytask(
void *arg)
{
printf(
"thread %d is working on task %d\n", (
int)pthread_self(), *(
int*)arg);
sleep(
1);
free(arg);
return NULL;
}
int main(
void)
{
threadpool_t pool;
threadpool_init(&pool,
3);
int i;
for(i=
0; i <
10; i++)
{
int *arg = (
int*)
malloc(
sizeof(
int));
*arg = i;
threadpool_add_task(&pool, mytask, arg);
}
threadpool_destroy(&pool);
return 0;
}
注:pro文件添加LIBS += -lpthread
转载链接,附另一Github源码
c++ 实现线程池
一般来说实现一个线程池主要包括以下4个组成部分
线程管理器:用于创建并管理线程池工作线程:线程池中实际执行任务的线程。在初始化线程时会预先创建好固定数目的线程在池中,这些初始化的线程一般处于空闲状态。任务接口:每个任务必须实现的接口。当线程池的任务队列中有可执行任务时,被空间的工作线程调去执行(线程的闲与忙的状态是通过互斥量实现的),把任务抽象出来形成一个接口,可以做到线程池与具体的任务无关。任务队列:用来存放没有处理的任务。提供一种缓冲机制。实现这种结构有很多方法,常用的有队列和链表结构。流程图如下:
#ifndef __THREAD_POOL_H
#define __THREAD_POOL_H
#include <vector>
#include <string>
#include <pthread.h>
using namespace std;
class CTask
{
protected:
string m_strTaskName;
void* m_ptrData;
public:
CTask() =
default;
CTask(
string &taskName): m_strTaskName(taskName), m_ptrData(NULL) {}
virtual int Run() =
0;
void setData(
void* data);
virtual ~CTask() {}
};
class CThreadPool
{
private:
static vector<CTask*> m_vecTaskList;
static bool shutdown;
int m_iThreadNum;
pthread_t *pthread_id;
static pthread_mutex_t m_pthreadMutex;
static pthread_cond_t m_pthreadCond;
protected:
static void* ThreadFunc(
void *threadData);
static int MoveToIdle(pthread_t tid);
static int MoveToBusy(pthread_t tid);
int Create();
public:
CThreadPool(
int threadNum);
int AddTask(CTask *task);
int StopAll();
int getTaskSize();
};
#endif
#include "thread_pool.h"
#include <cstdio>
void CTask::setData(
void* data)
{
m_ptrData = data;
}
vector<CTask*> CThreadPool::m_vecTaskList;
bool CThreadPool::shutdown =
false;
pthread_mutex_t CThreadPool::m_pthreadMutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t CThreadPool::m_pthreadCond = PTHREAD_COND_INITIALIZER;
CThreadPool::CThreadPool(
int threadNum)
{
this->m_iThreadNum = threadNum;
printf(
"I will create %d threads.\n", threadNum);
Create();
}
void* CThreadPool::ThreadFunc(
void* threadData)
{
pthread_t tid = pthread_self();
while(
1)
{
pthread_mutex_lock(&m_pthreadMutex);
while(m_vecTaskList.size() ==
0 && !shutdown)
pthread_cond_wait(&m_pthreadCond, &m_pthreadMutex);
if(shutdown)
{
pthread_mutex_unlock(&m_pthreadMutex);
printf(
"[tid: %lu]\texit\n", pthread_self());
pthread_exit(NULL);
}
printf(
"[tid: %lu]\trun: ", tid);
vector<CTask*>::iterator iter = m_vecTaskList.begin();
CTask* task = *iter;
if(iter != m_vecTaskList.end())
{
task = *iter;
m_vecTaskList.erase(iter);
}
pthread_mutex_unlock(&m_pthreadMutex);
task->Run();
printf(
"[tid: %lu]\tidle\n", tid);
}
return (
void*)
0;
}
int CThreadPool::AddTask(CTask *task)
{
pthread_mutex_lock(&m_pthreadMutex);
m_vecTaskList.push_back(task);
pthread_mutex_unlock(&m_pthreadMutex);
pthread_cond_signal(&m_pthreadCond);
return 0;
}
int CThreadPool::Create()
{
pthread_id =
new pthread_t[m_iThreadNum];
for(
int i =
0; i < m_iThreadNum; i++)
pthread_create(&pthread_id[i], NULL, ThreadFunc, NULL);
return 0;
}
int CThreadPool::StopAll()
{
if(shutdown)
return -
1;
printf(
"Now I will end all threads!\n\n");
shutdown =
true;
pthread_cond_broadcast(&m_pthreadCond);
for(
int i =
0; i < m_iThreadNum; i++)
pthread_join(pthread_id[i], NULL);
delete[] pthread_id;
pthread_id = NULL;
pthread_mutex_destroy(&m_pthreadMutex);
pthread_cond_destroy(&m_pthreadCond);
return 0;
}
int CThreadPool::getTaskSize()
{
return m_vecTaskList.size();
}
#include "thread_pool.h"
#include <cstdio>
#include <stdlib.h>
#include <unistd.h>
class CMyTask:
public CTask
{
public:
CMyTask() =
default;
int Run()
{
printf(
"%s\n", (
char*)m_ptrData);
int x = rand()%
4 +
1;
sleep(x);
return 0;
}
~CMyTask() {}
};
int main()
{
CMyTask taskObj;
char szTmp[] =
"hello!";
taskObj.setData((
void*)szTmp);
CThreadPool threadpool(
5);
for(
int i =
0; i <
10; i++)
threadpool.AddTask(&taskObj);
while(
1)
{
printf(
"There are still %d tasks need to handle\n", threadpool.getTaskSize());
if(threadpool.getTaskSize()==
0) {
if(threadpool.StopAll() == -
1) {
printf(
"Thread pool clear, exit.\n");
exit(
0);
}
}
sleep(
2);
printf(
"2 seconds later...\n");
}
return 0;
}
转载链接
c++11实现线程池
注意在.pro文件中需要添加
QMAKE_CXXFLAGS
+= -std=c
++11 -pthread
LIBS
+= -pthread