New a threadPool
ThreadPoolExecutor executorService = new ThreadPoolExecutor( poolCoreSize, poolMaxSize, timeout, java.util.concurrent.TimeUnit.MILLISECONDS, buff, handler);
poolcoreSize: Pool至少运行的线程数量
poolMaxSize: Pool最多运行的线程数量
timeout:空闲线程发呆时间,如果某线程发呆时间>timeout 而当前线程数量>poolcoreSize,就会终止该线程
buff:线程池所用的缓冲队列 实现BlockingQueue的那几个都能用,能设置定长和无限长度
handler: 当线程池线程数到达poolMaxSize buff 又满了,有新的任务丢到线程池的时候就会call这个hander进行处理.默认提供了几种处理方式,也可以自己实现,例如
public class PoolRejectHandler implements RejectedExecutionHandler { /* (non-Javadoc) * @see java.util.concurrent.RejectedExecutionHandler#rejectedExecution(java.lang.Runnable, java.util.concurrent.ThreadPoolExecutor) */ public void rejectedExecution(Runnable arg0, ThreadPoolExecutor arg1) { //log process is slower then produce //put back to inqueue BaseTask task = (BaseTask)arg0; //task.pushToInQueue(task.getInObject()); System.out.println("!!!!!!overload!!!"); try { task.getInQueue().put(task.getInObject()); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }这里我把这个task丢回一开始的任务队列了,这里可以实现自己的处理逻辑
New and excute task
任何实现runnable 或者callable接口的类 都可以作为一个任务丢到线程池里面运行
executorService.execute(processTask);
ThreadPool 调度策略
条件: 线程数<poolcoreSize, buff未满,有新的任务提交 结果:新建线程处理这个任务
条件: 线程数>=poolcoreSize, buff未满,有新的任务提交 结果:把任务丢到buff对列中
条件: poolMaxSize>线程数>=poolcoreSize, buff满,有新的任务提交 结果:新建线程
条件: poolMaxSize=线程数>poolcoreSize, buff满,有新的任务提交 结果:调用handler处理这个任务
这里需要讨论的就是最后那个情况,虽然threadpool提供了handler来处理这种超出负荷的情况,但是如果能从源头就直接控制提交到线程池的任务数量,感觉上会好很多
这里我用到一个Manager类和ObjectPool来处理任务的提交
Manager类实现Runnable 和 DispatchListener 接口 ,在Runnable里面处理任务生成和分发,DispatchListener接口中实现任务对象的回收和结果收集(示意代码)
public class DispathManager implements Runnable, DispatchListener { public void run() { while (true) { try { // get task from taskobjectpool. will wait here if the active taskobjects reach the tasklimit // so we can control the limit of running task in the threadpool BaseTask processTask = (BaseTask) taskPool.borrowObject(); //set task params processTask.setListener(this); // put in threadpool executorService.execute(processTask); } catch (InterruptedException e) { break; } catch (Exception e) { // log dispatch queue exception e.printStackTrace(); } } } public void recycleTask(BaseTask baseTask) { // TODO Auto-generated method stub try { taskPool.returnObject(baseTask); } catch (Exception e) { e.printStackTrace(); } } }
然后在Task执行完毕的时候进行回收
public void run() { try { //task logic here // return task listener.recycleTask(this); } catch (Exception e) { listener.onTaskError(e, inObject); } }
这样就不必每次都重新new 这个task 对一些重型的task,会提高性能
这里用到了ObjectPool,是Apache Common下的管理ObjectPool的类,使用如下:
New a ObjectPool
ObjectPool taskPool = new GenericObjectPool(new TaskPoolFactory(taskClass), taskLimit, GenericObjectPool.WHEN_EXHAUSTED_BLOCK, -1);
其中TaskPoolFactory如下 taskClass是重配置文件读入的
public class TaskPoolFactory extends BasePoolableObjectFactory { /* (non-Javadoc) * @see org.apache.commons.pool.BasePoolableObjectFactory#makeObject() */ private Class c; public TaskPoolFactory(Class c) { super(); this.c = c; } @Override public Object makeObject() throws Exception { // TODO Auto-generated method stub return c.newInstance(); } }
taskLimit是允许借出的最大的数目(借出但是没有归还)
当借出的数目已经达到最大,这个时候还有借出请求 GenericObjectPool.WHEN_EXHAUSTED_BLOCK 就提供了策略(还有几种的) 这里设置为无限等待,也就是线程会wait在这里 直到有借出的task被归还
Objectpool使用参照Manager和task类
最后, 控制taskLimit的大小 就可以控制ThreadPool中处理任务的多少,也就是正在处理的task+buff里的task 必定<= taskLimit,这样就在源头控制了提交任务的数量,保证threadpool不会超负荷。
最后推荐大家的设置就是 ThreadPool buff设置为无限大 然后用poolcoresize和taskLimit来控制负荷能力(视不同机器设置不同的值)
欢迎大家拍砖...
