1.什么情况使用线程池
通常需要执行多个任务,每个任务的执行时间比较短。例如:listView,GridView,RecyclerView等,每个子条目进入到屏幕时会执行一个任务为子条目进行赋值,当进行滑动同时有多个子条目进入屏幕,每个子条目的具体内容需要网络请求,这时就需要使用线程池进行请求。
2.为什么使用线程池
new Thread(new Runnable(){ public void run(){ //do.sth } }).start();通常使用上述方法创建线程,它在任务结束后,GC会自动回收该线程,因此适用于并发不多的程序中;而如果应用程序需要开启大量的线程处理任务,上述方式将产生如下影响:
1.线程的创建和销毁都需要时间,当有大量的线程创建和销毁时,那么这些时间消耗将比较明显,导致性能缺失 2.大量的线程创建、执行、销毁是非常消耗CPU和内存的,这样将影响系统的吞吐量,导致性能极具下降,如果内存资源占用比较多,很可能会造成OOM 3.大量的线程创建和销毁容易导致GC频繁执行,容易导致页面卡顿针对上述问题,我们可以重用已有的线程,从而减少线程的创建,控制线程的数量。
线程池作用就是限制系统中执行线程的数量。根据系统的环境情况,可以自动或手动设置线程数量,达到运行的最佳效果;用线程池控制线程数量,其他线程排队等候。一个任务执行完毕,再从队列中取最前面的任务开始执行。若队列中没有等待线程,线程池的这一资源处于等待。当一个新任务需要运行时,如果线程池中有等待的工作线程,就可以开始运行了;否则进入等待队列。线程池的优点可以概括为:
1.重用线程池中的线程,避免因为线程的创建和销毁所带来的性能开销 2.能有效控制线程池的最大并发数,避免大量的线程之间因相互抢占系统资源而导致的阻塞现象 3.能够对线程进行简单的管理,并提供定时执行以及指定间隔循环执行的功能ThreadPoolExecutor
ThreadPoolExecutor是线程池的真正实现,它的构造函数提供了一系列参数配置线程池
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) corePoolSize 线程池的核心线程数,默认情况下,核心线程在线程池中一直存活,即使他们处于闲置状态,直至线程池销毁;而如果配置了allowCoreThreadTimeOut属性为true的话,那么空闲的核心线程也会被销毁了,当它空闲的时间超出了keepAliveTime这个参数规定的时间之后,它就会被销毁掉。 threadPoolExecutor.allowCoreThreadTimeOut(true) 值得注意的是,并不是说只有核心线程才能去执行任务,而是核心线程是最稳定的线程,在默认状态下,它们不会销毁,这样在新的任务需要执行的时候,就会很节省时间,所以核心线程数只需要保证大于0就可以了。 maximumPoolSize 线程池所能容纳的最大线程数,当活动线程数达到此数值后,后续的新任务会被阻塞 keepAliveTime 非核心线程闲置时的超时时长,超过这个时长,非核心线程就会被回收。当ThreadPoolExecuor的allowCoreThreadTimeOut属性为true时,keepAliveTime同样会作用于核心线程。 unit 用于指定keepAliveTime参数的时间单位,这是一个枚举,常用的有TimeUnit.MILLISECONDS(毫秒)、TimeUnit.SECONDS(秒)、TimeUnit.MINUTES(分钟) workQueue 线程池中的任务队列,通过线程池的execute方法提交的Runable对象会储存在这个参数中。 threadFactory 线程工厂,为线程池提供创建新线程的功能。ThreadFactory是一个接口,它只有一个方法Thread newThread(Runable r)。 RejectedExecutionHandler 当线程池无法执行新任务时,可能由于队列已满或者无法执行任务成功,这个时候ThreadPoolExector会调用handler的rejectedExecution方法来通知调用者,默认情况下rejectedExection方法会抛出一个RejectedExecutionException.ThreadPoolExectutor 执行任务时大致遵循以下原则:
1.如果线程池中的线程数量未达到核心线程的数量,那么会直接启动一个核心线程来执行任务 2.如果线程池中的线程数量已经达到或者超过核心线程数量,那么任务会被插入到任务队列中排队等待执行 3.如果步骤2中无法将任务插入到任务队列中,往往是由于队列已满,这时如果线程数量未达到线程池规定的最大值,那么会立即启动一个非核心线程来执行任务 4.如果步骤3中线程数量已经达到线程池规定的最大值,那么就拒绝执行此任务,ThreadPoolExecutor会调用RejectedExecutionHandler的rejectedExecution方法通知调用者线程池分类 FixThreadPool
public static ExecutorService newFixedThreadPool(int nThreads){ return new ThreadPoolExecutor(nThreads,nThreads,0L,TimeUnit.MILLISECOND S,new LinkedBlockingQueue<Runable>()); }通过它的构造方法可以看出,它是一种线程数量固定的线程池,它的核心线程和最大线程是相等的,即该线程池中的所有线程都是核心线程,当所有的线程都处于活动状态时,新任务都会处于等待状态,直到有线程空闲下来。由于FixedThreadPool只有核心线程并且这些核心线程不会被回收,这意味着它能更加快速的响应外界需求,它没有超时机制,任务队列是无边界的任务队列,也就是可以添加无上限的任务,但是都会排队执行。
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3); for(int i = 0 ;i<10;i++){ final int index = i; fixedThreadPool.execute(new Runnable() { @Override public void run() { String threadNmae = Thread.currentThread().getName(); Log.v("zxy", "线程:"+threadNmae+",正在执行第"+index+"个任务"); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } }); }可以看到一开始就会执行3个任务,而后面的7个任务都会进入等待状态,当有核心线程空闲时,就会从队列中按照FIFO(先进先出)的策略取出一个任务进行执行,所以除了前三个任务,剩下的任务是按照顺序执行的。
CachedThreadPool
public static ExecutorService newCachedThreadPool(){ return new ThreadPoolExecutor(0,Integer.MAX_VALUE,60L,TimeUnit.SECONDS,n ew SychronousQueue<Runnable>()); }通过它的实例方法可以看出它的核心线程数是0也就是说该线程池并没有核心线程,而它的最大线程数是int类型的的上限,那么我们可以理解为该线程池的最大线程数是没有上限的,也就是说可以无限的创建线程。那么当新任务向线程池中提交的时候,如果有空闲线程,就会把任务放到空闲线程中去,如果没有空闲线程,就会开启一个新的线程来执行此任务,线程池中的空闲线程有超时机制,这个超时时常是60秒,超过60秒闲置的线程就会被回收,此类线程池比较适合执行大量的耗时比较少的任务,当整个线程池都处于空闲状态,线程池中的线程都会超时而被停止,此时CachedThreadPool之中是没有任何线程的,它几乎不占用系统资源。
ExecutorService cacheThreadPool = Executors.newCachedThreadPool(); for(int i = 0;i<10;i++){ final int index = i; try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } cacheThreadPool.execute(new Runnable() { @Override public void run() { String threadName = Thread.currentThread().getName(); Log.v("TAG", "线程:" + threadName + ",正在执行第"+index + "个任务"); long time = index*500; try { Thread.sleep(time); } catch (InterruptedException e) { e.printStackTrace(); } } }); }ScheduledThreadPool
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize){ return new ScheduledThreadPoolExecutor(corePoolSize); } public ScheduledThreadPoolExecutor(int corePoolSize){ super(corePoolSize,Integer.MAX_VALUE,0,NANOSECONDS,new DelayedWorkQueue ()); }它的核心线程是固定的,而非核心线程没有限制,并且当非核心线程闲置时会被立即回收,ScheduledThreadPool这类线程主要用于执行定时任务和具有固定周期的重复任务,newScheduledThreadPool方法实现:
scheduledThreadPool = Executors.newScheduledThreadPool(3); scheduledThreadPool.scheduleAtFixedRate(new Runnable() { @Override public void run() { System.out.println("执行任务"); } },4,1, TimeUnit.SECONDS);我们设置了在提交任务时,需要延迟4s才会第一次执行,同时在任务执行完毕后每隔1s又会重复的执行一次该任务
SingleThreadExecutor
public static ExecutorService newSingleThreadExecutor(){ return new FinalzableDelegatedExecutorService(new ThreadPoolExecutor(1,1 ,0L,TimeUnit.MILLISECOUNDS,new LinkedBlockingQueue<Runable>())); }此线程池的核心线程和最大线程都是固定的且均为1,每次只能执行一个任务,多余的任务保存到一个队列中,等到执行完成后,再按照顺序依次执行队列中的任务。
ExecutorService singleThreadPool = Executors.newSingleThreadExecutor(); for(int i=0;i<10;i++){ final int index = i; singleThreadPool.execute(new Runnable() { @Override public void run() { string threadName = Thread.currentThread().getName(); Log.v("TAG", "线程:" + threadName + ",正在执行第" + index + " 个任务"); } }); }仔细观察可以发现上面不同的线程池,其实是有不同的队列,决定不同的线程池执行不同的功能是由不同的队列决定的,它是一个实现了BlockingQueue<Runable>对象,泛型设置是只可以存放Runable对象,在这个接口里规定了加入或取出的方法,其中常见的有
LinkedBlockingQueue:无界的队列-----FixedThreadPool,SingleThreadExecutor SynchronousQueue:直接提交的队列-----CachedThreadPool DelayedWorkQueue:等待队列-----ScheduledThreadPool PriorityBlockingQueue:优先级队列 ArrayBlockingQueue:有界的队列无界队列:使用无界队列(例如 LinkedBlockingQueue)将导致在所有 corePoolSize 线程都忙的情况下将新任务加入队列。这样,创建的线程就不会超过 corePoolSize,(因此,maximumPoolSize 的值也就无效了。),此队列按照FIFO(先进先出)策略,排列元素,当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用无界队列。
有界队列:当使用有限的 maximumPoolSizes 时,有界队列(如 ArrayBlockingQueue)有助于防止资源耗尽,但是可能较难调整和控制。队列大小和最大池大小可能需要相互折衷:使用大型队列和小型池可以最大限度地降低 CPU 使用率、操作系统资源和上下文切换开销,但是可能导致人工降低吞吐量。如果任务频繁阻塞(例如,如果它们是 I/O 边界),则系统可能为超过您许可的更多线程安排时间。使用小型队列通常要求较大的池大小,CPU 使用率较高,但是可能遇到不可接受的调度开销,这样也会降低吞吐量,此队列也按照FIFO(先进先出)策略,排列元素。
直接提交:这个队列会把任务直接提交给线程而不保持它们。在此,如果不存在可用于立即运行任务的线程,则试图把任务加入队列将失败,因此会构造一个新的线程。此策略可以避免在处理可能具有内部依赖性的请求集时出现锁。直接提交通常要求无界 maximumPoolSizes 以避免拒绝新提交的任务。当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。简单说来,这种队列就是没什么用,走个过场而已,所以使用这个队列的时候,线程池的最大线程数一般是无上限的,也就是int类型的最大值
优先级队列:这种队列在向线程池中提交任务的时候会检测每一个任务的优先级,会先把优先级高的任务扔到线程池中
等待队列:DelayQueue(基于PriorityQueue来实现的)是一个存放Delayed 元素的无界阻塞队列,只有在延迟期满时才能从中提取元素。该队列的头部是延迟期满后保存时间最长的 Delayed 元素。如果延迟都还没有期满,则队列没有头部,并且poll将返回null。当一个元素的 getDelay(TimeUnit.NANOSECONDS) 方法返回一个小于或等于零的值时,则出现期满,poll就以移除这个元素了。此队列不允许使用 null 元素。
自定义线程池:
自定义线程池主要取决于使用不同的BlockingQueue,以下是使用优先级队列的自定义线程池,首先我们写个抽象类:
public abstract class PriorityRunnable implements Runnable,Comparable<PriorityRunnable> { private int priority; public PriorityRunnable(int priority){ if(priority<0){ throw new IllegalArgumentException(); } this.priority = priority; } @Override public int compareTo(@NonNull PriorityRunnable another) { int num = another.getPriority()-priority; return num; } private int getPriority(){ return priority; } }首先创建一个实现Runnable接口的类,并实现Comparable接口,实现这个接口主要就是进行优先级的比较,来告诉队列到底谁大谁小,这里我们自己写了一个int类型的变量代表每一个任务的优先级,然后复写compareTo方法来写我们的比较条件,这里要注意的是,当我们自己去写的时候,不一定非要指定一个int类型的变量,也可以是其他的例如String等的,只要实现了Comparable接口就可以了。接下来我们就可以使用这个任务了,代码如下
ExecutorService priorityThreadPool = new ThreadPoolExecutor(3,3,0L,TimeUnit.SECONDS,new PriorityBlockingQueue<Runnable>()); for(int i = 1;i<=10;i++){ final int priority = i; priorityThreadPool.execute(new PriorityRunnable(priority) { @Override public void run() { String threadName = Thread.currentThread().getName(); Log.v("TAG", "线程:" + threadName + ",正在执行优先级为:" + priori ty + "的任务"); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } }); }我们这里创建的是核心线程为3的线程池,因此前三个任务就直接在线程池中运行,后面7个任务加入到优先级队列中排队,通过按照设置的比较方式进行比较,优先级高的会先执行任务。 优点:创建一个优先级线程池非常有用,它可以在线程池中线程数量不足或系统资源紧张时,优先处理我们想要先处理的任务,而优先级低的则放到后面再处理,这极大改善了系统默认线程池以FIFO方式处理任务的不灵活。
线程池的其他方法:
beforeExecute()--------任务执行前执行的方法 afterExecute() ---------任务执行结束后执行的方法 terminated() :线程池关闭后执行的方法 public class MyThreadPoolExecutor extends ThreadPoolExecutor { public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); } @Override protected void beforeExecute(Thread t, Runnable r) { super.beforeExecute(t, r); String threadName = t.getName(); Log.v("TAG", "线程:" + threadName + "准备执行任务!"); } @Override protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); String threadName = Thread.currentThread().getName(); Log.v("TAG", "线程:" + threadName + "任务执行结束!"); } @Override protected void terminated() { super.terminated(); Log.v("TAG", "线程池结束!"); } }在使用线程池时,任务的提交方式是由于队列决定的,但是任务队列里的方法比较多,复写起来比较麻烦,而且官方也建议我们使用它提供给我们的几种线程池,有时我们的队列需要动态的调整FIFO和LIFO的策略,或者我们提交的策略非常复杂,系统默认的满足不了我们怎么办,通常我们的做法是自己完全重新写一个任务队列,并不实现线程池的接口,也不作为任务队列参数放到线程池中,而我们所有的任务都会先提交到我们自己定义的这个队列中来,然后当线程池空闲的时候再提交到线程池中去。
public class MyThreadPool extends ThreadPoolExecutor { private volatile Semaphore semaphore; private List runnableList; private boolean flag; private LoopThread loopThread; private OutWay outWay; public MyThreadPool(int corePoolSize) { super(corePoolSize,corePoolSize,0L,TimeUnit.SECONDS,new LinkedBlock ingQueue<Runnable>()); semaphore = new Semaphore(corePoolSize); runnableList = new LinkedList(); flag = true; //默认先进后出 outWay = OutWay.FIFO; loopThread = new LoopThread(); loopThread.start(); } enum OutWay{ FIFO,LIFO; } @Override public synchronized void execute(Runnable command) { runnableList.add(command); if(runnableList.size()<2){ //如果这是队列中的第一个任务,那么就去唤醒轮询线程 synchronized (loopThread){ loopThread.notifyAll(); } } } //设置是FIFO/LIFO public void setWay(OutWay outWay){ this.outWay = outWay; } @Override protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); //释放信号量 semaphore.release(); } @Override protected void terminated() { super.terminated(); //关闭线程 flag = false; } class LoopThread extends Thread{ @Override public void run() { super.run(); while (flag){ if(runnableList.size() == 0){ try { //如果没有任务,线程就等待 synchronized (this){ wait(); } } catch (InterruptedException e) { e.printStackTrace(); } }else { try { //请求信号量 semaphore.acquire(); int index = runnableList.size(); switch (outWay){ case FIFO: //先进先出 index = 0; break; case LIFO: //先进后出 index = runnableList.size()-1; break; } //调用父类的添加方法,将任务添加到线程池中 MyThreadPool.super.execute((Runnable) runnableList. get(index)); runnableList.remove(index); } catch (InterruptedException e) { e.printStackTrace(); } } } } } } handler = new Handler(new Handler.Callback() { @Override public boolean handleMessage(Message msg) { mainTv.setText("任务:"+msg.what); return false; } }); myThreadPool = new MyThreadPool(1); for(int i = 0; i<100;i++){ final int index = i; myThreadPool.execute(new Runnable() { @Override public void run() { handler.sendEmptyMessage(index); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } }); } FIFO.setOnClickListener(new View.OnClickListener() { @Override public void onClick(View v) { myThreadPool.setWay(MyThreadPool.OutWay.FIFO); } }); LIFO.setOnClickListener(new View.OnClickListener() { @Override public void onClick(View v) { myThreadPool.setWay(MyThreadPool.OutWay.LIFO); } });