1)需要大量的线程来完成任务,且完成任务的时间比较短。 比如WEB服务器完成网页请求这样的任务,使用线程池技术是非常合适的。因为单个任务小,而任务数量巨大。但对于长时间的任务,比如一个Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了。
2)对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。
3)接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。
#ifndef THREADPOLL_H #define THREADPOLL_H //三种机制实现同步与互斥 mutex : lock unlocker cond : wait signal sem : wait post #include<list> #include<cstdio> #include<exception> #include<pthread.h> #include"locker.h" template<class T> class threadpool { public: threadpool(int thead_number = 8, int max_requests = 10000); ~threadpool(); bool append(T*request); //往请求队列中添加数据 private: static void * worker(void* arg); //线程工作运行的函数 void run(); private: int m_thread_number; //线程池中的线程数 int m_max_request; //请求队列中最大请求数 pthread_t * m_threads; //描述线程的数组,其大小为m_thread_number std::list<T*> m_workqueue;//请求队列 locker m_queuelocker; //保护请求队列的互斥锁 sem m_queuestat; //是否有任务需要处理 bool m_stop; //是否结束函数 }; template<class T> threadpool<T>::threadpool(int thread_number, int max_requests) : m_thread_number(thread_number), m_max_request(max_requests), m_stop(false), m_threds(NULL) { if ((thread_number <= 0) || (max_requests <= 0)) { throw std::exception(); } m_threads = new pthread_t[m_thread_number]; if (!m_threads) { throw std::exception(); } //创建thread_number个线程 for (int i = 0; i < thread_number; i++) { printf("create the %d th thread\n",i); if (pthread_create(m_threads + i, NULL, worker, this) != 0) { delete[] m_threads; throw std::exception(); } //线程分离 if (phtread_detach(m_threads[i])) { delete[] m_threads; throw std::exception(); } } } template<class T> threadpool<T>::~threadpool() { delete[] m_threads; m_stop = true; } template<class T> bool threadpool<T>::append(T*request) { //操作队列时,一定要上锁,因为所有的线程是共享进程空间的 m_queuelocker.lock(); if (m_workqueue.size() > m_max_request) { m_queuelocker.unlocker(); return false; } m_workqueue.push_back(request); m_queuelocker.unlocker(); m_queuestat.post(); return true; } template<class T> void* threadpool<T>::worker(void* arg) { threadpool* pool = (threadpool*)arg; pool->run(); return pool; } template<typename T> void threadpool<T>::run() { while (!m_stop) { m_queuestat.wait(); m_queuelocker.locker(); if (m_workqueue.empty()) { m_queuelocker.unlocker(); continue; } T* request = m_workqueue.front(); m_workqueue.pop_front(); m_queuelocker.unlock(); if (!request) { continue; } request->process(); } } #endiflocker.h
#ifndef LOCKER_H #define LOCKER_H #include<exception> #include<semaphore.h> #include<pthread.h> class sem { public: sem() { if (sem_init(&m_sem, 0, 0) != 0) { throw std::exception(); } } ~sem() { sem_destroy(&m_sem); } //等待信号量 bool wait() { return sem_wait(&m_sem)==0; } //增加信号量 bool post() { return sem_post(&m_sem)==0; } private: sem_t m_sem; }; //封装互斥锁 class locker { public: locker() { if (pthread_mutex_init(&m_mutex, NULL) != 0) { throw std::exception(); } } ~locker() { pthread_mutex_destroy(&m_mutex); } //获取互斥锁 bool lock() { return pthread_mutex_lock(&m_mutex)==0; } bool unlock() { return pthread_mutex_unlock(&m_mutex) == 0; } private: pthread_mutex_t m_mutex; }; //封装条件变量 class cond { public: //创建并初始化互斥锁 cond() { if (pthread_mutex_init(&m_mutex, NULL) != 0) { throw std::exception(); } if (pthread_cond_init(&m_cond, NULL) != 0) { //构造函数中一旦出现问题就应该立即释放已经成功分配了的资源 pthread_cond_destroy(&m_mutex); throw std::exception(); } } ~cond() { pthread_mutex_destroy(&m_mutex); pthread_cond_destroy(&m_cond); } //等待条件变量 bool wait() { int ret = 0; pthread_mutex_lock(&m_mutex); ret = pthread_cond_wait(&m_cond, &m_mutex); pthread_mutex_unlock(&m_mutex); return ret == 0; } //唤醒条件变量的线程 bool signal() { return pthread_cond_signal(&m_cond)==0; } private: pthread_mutex_t m_mutex; pthread_cond_t m_cond; }; #endif