ThreadPool,ObjectPool和任务控制

xiaoxiao2026-06-12  11

New a threadPool

ThreadPoolExecutor executorService = new ThreadPoolExecutor( poolCoreSize, poolMaxSize, timeout, java.util.concurrent.TimeUnit.MILLISECONDS, buff, handler);

 

poolcoreSize:  Pool至少运行的线程数量

poolMaxSize:   Pool最多运行的线程数量

timeout:空闲线程发呆时间,如果某线程发呆时间>timeout 而当前线程数量>poolcoreSize,就会终止该线程

buff:线程池所用的缓冲队列 实现BlockingQueue的那几个都能用,能设置定长和无限长度

handler: 当线程池线程数到达poolMaxSize buff 又满了,有新的任务丢到线程池的时候就会call这个hander进行处理.默认提供了几种处理方式,也可以自己实现,例如

public class PoolRejectHandler implements RejectedExecutionHandler { /* (non-Javadoc) * @see java.util.concurrent.RejectedExecutionHandler#rejectedExecution(java.lang.Runnable, java.util.concurrent.ThreadPoolExecutor) */ public void rejectedExecution(Runnable arg0, ThreadPoolExecutor arg1) { //log process is slower then produce //put back to inqueue BaseTask task = (BaseTask)arg0; //task.pushToInQueue(task.getInObject()); System.out.println("!!!!!!overload!!!"); try { task.getInQueue().put(task.getInObject()); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }

 这里我把这个task丢回一开始的任务队列了,这里可以实现自己的处理逻辑

 

New and excute  task

任何实现runnable 或者callable接口的类 都可以作为一个任务丢到线程池里面运行

executorService.execute(processTask);

 

ThreadPool 调度策略

条件: 线程数<poolcoreSize, buff未满,有新的任务提交   结果:新建线程处理这个任务

条件: 线程数>=poolcoreSize, buff未满,有新的任务提交  结果:把任务丢到buff对列中

条件: poolMaxSize>线程数>=poolcoreSize, buff满,有新的任务提交  结果:新建线程

条件: poolMaxSize=线程数>poolcoreSize, buff满,有新的任务提交  结果:调用handler处理这个任务

 

 

这里需要讨论的就是最后那个情况,虽然threadpool提供了handler来处理这种超出负荷的情况,但是如果能从源头就直接控制提交到线程池的任务数量,感觉上会好很多

 

 

这里我用到一个Manager类和ObjectPool来处理任务的提交

 

Manager类实现Runnable 和 DispatchListener 接口 ,在Runnable里面处理任务生成和分发,DispatchListener接口中实现任务对象的回收和结果收集(示意代码)

public class DispathManager implements Runnable, DispatchListener { public void run() { while (true) { try { // get task from taskobjectpool. will wait here if the active taskobjects reach the tasklimit // so we can control the limit of running task in the threadpool BaseTask processTask = (BaseTask) taskPool.borrowObject(); //set task params processTask.setListener(this); // put in threadpool executorService.execute(processTask); } catch (InterruptedException e) { break; } catch (Exception e) { // log dispatch queue exception e.printStackTrace(); } } } public void recycleTask(BaseTask baseTask) { // TODO Auto-generated method stub try { taskPool.returnObject(baseTask); } catch (Exception e) { e.printStackTrace(); } } }

 

然后在Task执行完毕的时候进行回收

public void run() { try { //task logic here // return task listener.recycleTask(this); } catch (Exception e) { listener.onTaskError(e, inObject); } }

 

这样就不必每次都重新new 这个task 对一些重型的task,会提高性能

 

这里用到了ObjectPool,是Apache Common下的管理ObjectPool的类,使用如下:

 

 

New a ObjectPool

ObjectPool taskPool = new GenericObjectPool(new TaskPoolFactory(taskClass), taskLimit, GenericObjectPool.WHEN_EXHAUSTED_BLOCK, -1);

 

其中TaskPoolFactory如下 taskClass是重配置文件读入的

public class TaskPoolFactory extends BasePoolableObjectFactory { /* (non-Javadoc) * @see org.apache.commons.pool.BasePoolableObjectFactory#makeObject() */ private Class c; public TaskPoolFactory(Class c) { super(); this.c = c; } @Override public Object makeObject() throws Exception { // TODO Auto-generated method stub return c.newInstance(); } }

 

taskLimit是允许借出的最大的数目(借出但是没有归还)

当借出的数目已经达到最大,这个时候还有借出请求 GenericObjectPool.WHEN_EXHAUSTED_BLOCK 就提供了策略(还有几种的) 这里设置为无限等待,也就是线程会wait在这里 直到有借出的task被归还

 

Objectpool使用参照Manager和task类

 

最后, 控制taskLimit的大小 就可以控制ThreadPool中处理任务的多少,也就是正在处理的task+buff里的task 必定<= taskLimit,这样就在源头控制了提交任务的数量,保证threadpool不会超负荷。

 

 

最后推荐大家的设置就是 ThreadPool buff设置为无限大 然后用poolcoresize和taskLimit来控制负荷能力(视不同机器设置不同的值)

 

 

 

欢迎大家拍砖...

转载请注明原文地址: https://www.6miu.com/read-5050003.html

最新回复(0)