@(并发)
线程池优点:
1.降低资源消耗。 通过重复利用已创建的线程不停工作,减少创建销毁线程的资源消耗。 2.提高响应的速度。 当线程池预热完毕后,新任务来直接添加到任务队列中,当任务到达,核心线程可以立即执行而不用等到创建线程 。 3.提高线程的可管理度。 线程是稀缺的资源,如果系统无限制的创建,不断会损耗资源,还会降低系统稳定性,而线程池将这些线程合理的管理了起来,统一分配、调优和监控。
线程池组件: - 线程池管理器:负责启动关闭线程池、创建核心线程、添加任务到队列,监控线程池 - 阻塞任务队列:负责存放任务 - 一组工作者线程:负责从任务队列中取出任务来处理。

解析: 1. 若线程池里当前运行的线程数小于核心线程数时,则创建新线程处理任务。(预热) 2. 当运行的线程数大于或者等于核心线程数时,则将任务添加到阻塞队列中。 3. 当阻塞队列满时,则创建新的下线程直到最大线程数,来处理任务 4. 当运行的线程数到达最大线程数时,任务将被拒绝,执行RejectedExecutionHandler.rejectedExecution()方法
向线程池提交任务主要是调用execute方法:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); //当线程数小于核心线程数时,直接创建线程执行任务。 if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) { //当线程数大于等于核心线程数或者线程创建失败时,将任务添加到队列中 if (runState == RUNNING && workQueue.offer(command)) { if (runState != RUNNING || poolSize == 0) ensureQueuedTaskHandled(command); } //当线程池不处于运行中状态或者任务队列已满无法入队,并且当前线程数小于最大线程数时,创建个新线程执行任务 else if (!addIfUnderMaximumPoolSize(command)) //若线程数大于最大线程数了,线程池已饱和,执行丢弃策略。 reject(command); // is shutdown or saturated } }当线程数小于核心线程数时,直接创建新线程执行任务,同时:
将新的worker添加到线程池工作者线程集合set里poolSize当前线程数++统计线程池达到的最大线程数largestPoolSize /** * 创建新线程,addThread() * 这里要获取到线程池全局的锁mainLock,因为要对共享变量操作poolSize ,corePoolSize ,runState等 */ private boolean addIfUnderCorePoolSize(Runnable firstTask) { Thread t = null; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (poolSize < corePoolSize && runState == RUNNING) t = addThread(firstTask); } finally { mainLock.unlock(); } return t != null; } /** * 创建新线程: * 1.将任务包装成工作者线程Worker * 2.将新的Worker添加到线程池工作者线程set里 * 3.poolSize当前线程数++ * 4.统计线程池达到的最大线程数largestPoolSize */ private Thread addThread(Runnable firstTask) { //将任务包装成工作者线程Worker Worker w = new Worker(firstTask); Thread t = threadFactory.newThread(w); boolean workerStarted = false; if (t != null) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); w.thread = t; //将新的Worker添加到线程池工作者线程set里 workers.add(w); //poolSize当前线程数++ int nt = ++poolSize; //统计线程池达到的最大线程数largestPoolSize if (nt > largestPoolSize) largestPoolSize = nt; try { t.start(); workerStarted = true; } finally { if (!workerStarted) workers.remove(w); } } return t; }创建好的工作者线程会循环获取工作队列中任务执行:
private final class Worker implements Runnable { /** * Main run loop */ public void run() { try { hasRun = true; Runnable task = firstTask; firstTask = null; //getTask() 循环获取工作队列中任务执行poll(),take() while (task != null || (task = getTask()) != null) { runTask(task); task = null; } } finally { workerDone(this); } } }当线程池刚被创建时的状态:
volatile int runState; static final int RUNNING = 0;当线程池中运行的线程数为0,并且此时线程池状态为STOP或者SHUTDOWN,线程池调用tryTerminate()方法,线程最终会变为TERMINATED状态:
//尝试结束线程池方法 //1.工作者线程关闭时并且此时poolSize==0,会调用 //2.shutdown()关闭线程池方法,会调用 //3.shutdownNow()关闭线程池方法,会调用 private void tryTerminate() { //当线程池中运行的线程数为0 if (poolSize == 0) { int state = runState; if (state < STOP && !workQueue.isEmpty()) { state = RUNNING; // disable termination check below addThread(null); } //并且此时线程池状态为STOP或者SHUTDOWN if (state == STOP || state == SHUTDOWN) { //设置线程池状态为TERMINATED runState = TERMINATED; termination.signalAll(); terminated(); } } }用ThreadPoolExecutor构造方法创建线程池:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; } //声明一个线程池 public static void main(String[] args) throws ExecutionException, InterruptedException { ThreadPoolExecutor pool = new ThreadPoolExecutor(5, 200, 100, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(2000)); }当有一个任务提交给线程池时,若此时运行的线程数小于corePoolSize核心线程数,则创建线程。如果调用prestartAllCoreThreads()方法,线程池会提前创建并启动核心的基本线程。
public int prestartAllCoreThreads() { int n = 0; //不停的向线程中添加线程数,知道poolSize大于等于核心线程数 while (addIfUnderCorePoolSize(null)) ++n; return n; }线程池允许创建启动的最大线程数。由于生产者生产速度远远高于消费者消费速度时,会造成任务队列充满,若此时已创建的线程数<最大线程数,则线程池继续创建新的线程执行任务。(注意maximumPoolSize对于使用无界的任务队列的线程池无效)
当线程数大于核心线程数或者allowCoreThreadTimeOut为true(表示核心线程要超时等待了,不在一直阻塞take了),此时工作者线程向任务队列取任务时,若队列为空了直接返回,该工作者线程直接销毁,但是设置了keepAliveTime,可以让worker线程多等待keepAliveTime时间,保持一定的线程空闲活动时间,提高线程的利用率。
//线程池构造函数中,keepAliveTime计算单位纳秒 this.keepAliveTime = unit.toNanos(keepAliveTime); //每个工作者线程运行的run方法 public void run() { try { hasRun = true; Runnable task = firstTask; firstTask = null; while (task != null || (task = getTask()) != null) { runTask(task); task = null; } } finally { workerDone(this); } } //每个工作者不停的要从任务队列中获取任务 Runnable getTask() { for (;;) { try { int state = runState; if (state > SHUTDOWN) return null; Runnable r; if (state == SHUTDOWN) // Help drain queue r = workQueue.poll(); else if (poolSize > corePoolSize || allowCoreThreadTimeOut) //超时等待获取workQueue任务队列元素。 r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS); else r = workQueue.take(); if (r != null) return r; if (workerCanExit()) { if (runState >= SHUTDOWN) // Wake up others interruptIdleWorkers(); return null; } // Else retry } catch (InterruptedException ie) { // On interruption, re-check runState } } }用于保存等待执行任务的阻塞队列。可以是下面几种阻塞队列: - ArrayListBlockingQueue:是一个由数组组成的有界阻塞队列,此队列按FIFO先进先出的顺序对元素进行排序。 - LinkedBlockingQueue:是一个由链表组成的有界阻塞队列,此队列按FIFO先进先出,吞吐量要高于ArrayListBlockingQueue。由静态工厂方法Executors.newFixedThreadPool( )使用了此队列。 - SynchronousQueue:一个不存储元素的阻塞队列,队列的每个插入操作必须要等到令一个元素的移除操作,否则插入操作一直处于等待状态,吞吐量要高于LinkedBlockingQueue。由静态工厂方法Executors.newCachedThreadPool( )使用了此队列。 - PriorityBlockingQueue:一个具有优先级的无界队列。 - DelayQueue:一个由优先级队列实现的无界延时阻塞队列。由静态工厂方法Executors.newScheduledThreadPool( )使用了此队列.
当线程池要创建新的线程时,都需要调用ThreadFactory .newThread(Runnable r)方法创建线程,目的是创建名称有意义的线程。下面看下DefaultThreadFactory 的实现newThread。
static class DefaultThreadFactory implements ThreadFactory { //线程池序号原子序列,申明static,poolNumber无状态,所有对象共享此变量 static final AtomicInteger poolNumber = new AtomicInteger(1); final ThreadGroup group; //线程池中线程原子序列,有状态变量,每个ThreadFactory 对象不同 final AtomicInteger threadNumber = new AtomicInteger(1); final String namePrefix; //初始化线程池名称pool-1-thread- DefaultThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s != null)? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-"; } //不断的从线程池中创建线程pool-1-thread-1,pool-1-thread-2,pool-1-thread-3..... public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) t.setDaemon(false); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; } }当任务队列已满,并且运行的线程数大于或者等于线程池的最大线程数时,此时线程池已饱和了无法接收外部的任务了,此时需要执行饱和策略。线程池默认AbortPolicy,直接抛出throw new RejectedExecutionException()抛异常。当然线程池提供一下四种策略:
AbortPolicy:直接抛出异常。 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException(); } CallerRunsPolicy:让当前提交任务的线程直接执行任务。 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } DiscardPolicy:不处理,丢弃掉。 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } DiscardOldestPolicy:将队头任务丢弃掉(最先提交的马上要出队的任务),在执行任务。 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } }execute方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池的线程执行成功。
//1.声明一个线程池 ThreadPoolExecutor pool = new ThreadPoolExecutor(5, 200, 100, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable()); //2.提交不需要返回值的任务 pool.execute(new Runnable() { public void run() { System.out.println("poll execute"); } });submit方法用于提交有返回值的任务,线程池会返回一个代表异步计算的结果的FutureTask类型的对象(下面介绍),可以通过这个future对象判断任务是否执行成功,调用future.get()会阻塞当前线程直到任务完成,而使用get(long timeout, TimeUnit unit)方法则会阻塞当前线程一段时间,超时候方法立马返回,可能此时任务没有执行完。
Future<?> future = pool.submit(new Runnable() { public void run() { System.out.println("poll submit"); } }); //当任务异步执行完,future.get()方法才会返回 if(future.get()!=null){ System.out.println("end"); }注意:
通过调用线程池的shutdown和shutdownNow来关闭线程池。他们的原理是,遍历线程池中所有的工作者线程,然后一个个中断,所以无法响应中断的任务可能永远无法停止了。shutdown和shutdownNow的区别是:shutdown将线程池状态设置成SHUTDOWN状态,然后中断所有没有正在执行任务的的线程,其他运行中的线程保持继续工作;而shutdownNow将线程池状态设置成STOP状态,尝试停止中断所有正在执行或者暂停任务的线程,任务队列任务不在消费执行。shutdown和shutdownNow**最终会让线程池状态变为TERMINATED状态**。通常调用shutdown关闭线程池,如果任务不一定要求全部执行完,可以用shutdownNow。