Java并发编程(八)《线程池》

xiaoxiao2021-02-27  188

Java并发编程八线程池 1 为什么使用线程池2 线程池是什么结构怎样的怎么工作 21 线程池模型22 线程池工作原理23 ThreadPoolExecutor源码分析 231 数据结构232 运行原理233 线程池全局锁mainLock234 任务队列workQueue235 工作者线程集合workers236 线程池运行状态runState 1 RUNNING2 SHUTDOWN3 STOP4 TERMINATED5 runState运行状态对比 3 线程池的使用 31 线程池的创建 1 构造方法2 corePoolSize核心线程数3 maximumPoolSize最大线程数4 keepAliveTimeunit线程活动保持时间5 workQueue线程安全的阻塞队列 6 threadFactory线程名称工厂7 RejectedExecutionHandler 任务饱和策略 32 向线程池提交任务 321 execute 322 submit 33 关闭线程池 331 shutdown332 shutdownNow333 isShutdown334 isTerminated 4 合理运用线程池 41 线程池的任务队列建议使用有界阻塞队列42 大量使用线程池时要对其监控

Java并发编程(八)《线程池》

@(并发)

8.1 为什么使用线程池?

线程池优点:

1.降低资源消耗。   通过重复利用已创建的线程不停工作,减少创建销毁线程的资源消耗。 2.提高响应的速度。   当线程池预热完毕后,新任务来直接添加到任务队列中,当任务到达,核心线程可以立即执行而不用等到创建线程 。 3.提高线程的可管理度。   线程是稀缺的资源,如果系统无限制的创建,不断会损耗资源,还会降低系统稳定性,而线程池将这些线程合理的管理了起来,统一分配、调优和监控。

8.2 线程池是什么,结构怎样的,怎么工作?

8.2.1 线程池模型

线程池组件: - 线程池管理器:负责启动关闭线程池、创建核心线程、添加任务到队列,监控线程池 - 阻塞任务队列:负责存放任务 - 一组工作者线程:负责从任务队列中取出任务来处理。

8.2.2 线程池工作原理

![Alt text](./线程池工作原理 1.png)

解析: 1. 若线程池里当前运行的线程数小于核心线程数时,则创建新线程处理任务。(预热) 2. 当运行的线程数大于或者等于核心线程数时,则将任务添加到阻塞队列中。 3. 当阻塞队列满时,则创建新的下线程直到最大线程数,来处理任务 4. 当运行的线程数到达最大线程数时,任务将被拒绝,执行RejectedExecutionHandler.rejectedExecution()方法

8.2.3 ThreadPoolExecutor源码分析

8.2.3.1 数据结构

public class ThreadPoolExecutor extends AbstractExecutorService { /** * Permission for checking shutdown */ private static final RuntimePermission shutdownPerm =new RuntimePermission("modifyThread"); /** * 线程池运行状态 */ volatile int runState; static final int RUNNING = 0; static final int SHUTDOWN = 1; static final int STOP = 2; static final int TERMINATED = 3; /* * 阻塞任务队列 */ private final BlockingQueue<Runnable> workQueue; /** * 持有锁,为了更新poolSize, corePoolSize,maximumPoolSize, runState, and workers set. */ private final ReentrantLock mainLock = new ReentrantLock(); /** * Wait condition to support awaitTermination */ private final Condition termination = mainLock.newCondition(); /** * 工作者线程集合 */ private final HashSet<Worker> workers = new HashSet<Worker>(); /** * 工作者线程超时等待任务队列不为空的时间 */ private volatile long keepAliveTime; /* * 若false,核心线程永远存活;true,核心线程只会超时等待,然后销毁 */ private volatile boolean allowCoreThreadTimeOut; /** * 核心线程数 volatile 具有并发可见性 */ private volatile int corePoolSize; /** * 最大线程数,volatile 具有并发可见性 */ private volatile int maximumPoolSize; /** * 当前线程数,volatile 具有并发可见性 */ private volatile int poolSize; /** * 饱和策略 */ private volatile RejectedExecutionHandler handler; /** * 线程池线程的创建都要通过threadFactory获取线程名称 */ private volatile ThreadFactory threadFactory; /** * 跟踪线程池达到的最大数。 */ private int largestPoolSize; /** * 统计线程池完成的任务数. 当一个工作者线程要结束被销毁时,将这个工作者线程执行的任务数加入到completedTaskCount * completedTaskCount += worker.completedTasks */ private long completedTaskCount; /** * 默认的饱和策略,直接抛异常 */ private static final RejectedExecutionHandler defaultHandler = new AbortPolicy(); }

8.2.3.2 运行原理

向线程池提交任务主要是调用execute方法:

public void execute(Runnable command) { if (command == null) throw new NullPointerException(); //当线程数小于核心线程数时,直接创建线程执行任务。 if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) { //当线程数大于等于核心线程数或者线程创建失败时,将任务添加到队列中 if (runState == RUNNING && workQueue.offer(command)) { if (runState != RUNNING || poolSize == 0) ensureQueuedTaskHandled(command); } //当线程池不处于运行中状态或者任务队列已满无法入队,并且当前线程数小于最大线程数时,创建个新线程执行任务 else if (!addIfUnderMaximumPoolSize(command)) //若线程数大于最大线程数了,线程池已饱和,执行丢弃策略。 reject(command); // is shutdown or saturated } }

当线程数小于核心线程数时,直接创建新线程执行任务,同时:

将新的worker添加到线程池工作者线程集合set里poolSize当前线程数++统计线程池达到的最大线程数largestPoolSize /** * 创建新线程,addThread() * 这里要获取到线程池全局的锁mainLock,因为要对共享变量操作poolSize ,corePoolSize ,runState等 */ private boolean addIfUnderCorePoolSize(Runnable firstTask) { Thread t = null; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (poolSize < corePoolSize && runState == RUNNING) t = addThread(firstTask); } finally { mainLock.unlock(); } return t != null; } /** * 创建新线程: * 1.将任务包装成工作者线程Worker * 2.将新的Worker添加到线程池工作者线程set里 * 3.poolSize当前线程数++ * 4.统计线程池达到的最大线程数largestPoolSize */ private Thread addThread(Runnable firstTask) { //将任务包装成工作者线程Worker Worker w = new Worker(firstTask); Thread t = threadFactory.newThread(w); boolean workerStarted = false; if (t != null) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); w.thread = t; //将新的Worker添加到线程池工作者线程set里 workers.add(w); //poolSize当前线程数++ int nt = ++poolSize; //统计线程池达到的最大线程数largestPoolSize if (nt > largestPoolSize) largestPoolSize = nt; try { t.start(); workerStarted = true; } finally { if (!workerStarted) workers.remove(w); } } return t; }

创建好的工作者线程会循环获取工作队列中任务执行:

private final class Worker implements Runnable { /** * Main run loop */ public void run() { try { hasRun = true; Runnable task = firstTask; firstTask = null; //getTask() 循环获取工作队列中任务执行poll(),take() while (task != null || (task = getTask()) != null) { runTask(task); task = null; } } finally { workerDone(this); } } }

8.2.3.3 线程池全局锁mainLock

全局锁的申明: private final ReentrantLock mainLock = new ReentrantLock(); 当线程池创建新线程时,要更新线程池的poolSize工作线程数,largestPoolSize到达的最大线程数,将新创建的工作者线程woker加入到wokers(工作者集合中),此时对这些线程池持有的共享变量的修改,需要全局锁定: //当线程数<核心线程时,需要创建新线程,此时要加全局锁。 private boolean addIfUnderCorePoolSize(Runnable firstTask) { Thread t = null; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (poolSize < corePoolSize && runState == RUNNING) t = addThread(firstTask); } finally { mainLock.unlock(); } return t != null; }

8.2.3.4 任务队列workQueue

workQueue的申明: private final BlockingQueue<Runnable> workQueue; 生产–当线程池预热后,即运行的线程数>=核心线程数,此时添加的任务直接加入到任务队列中(workQueue.offer): //线程池添加Runnable 任务,调用execute public void execute(Runnable command) { //.....省略非关键代码 if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) { if (runState == RUNNING && workQueue.offer(command)) { //.....省略非关键代码 } //.....省略非关键代码 } } 消费–运行的工作者线程循环的从任务队列中poll或者take任务来消费: private final class Worker implements Runnable { public void run() { //.....省略非关键代码 //工作者线程循环的从任务队列中获取任务 while (task != null || (task = getTask()) != null) { runTask(task); } } } //从workQueue获取执行的任务 Runnable getTask() { //.....省略非关键代码 for (;;) { try { //若线程池SHUTDOWN状态,则不停poll取出任务,知道任务队列为空,线程关闭,线程池跟着关闭。 if (state == SHUTDOWN) // Help drain queue r = workQueue.poll(); //若线程池线程数大于基本的线程数,超时获取任务,若等待了keepAliveTime,队列为空,则该工作者线程会销毁结束 else if (poolSize > corePoolSize || allowCoreThreadTimeOut) r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS); else //若线程池线程数小于或等于基本的线程数(一般线程池就处于此状态),此时任务队列为空,工作者线程会一直阻塞在这里,不会关闭线程。 r = workQueue.take(); if (r != null) return r; } }

8.2.3.5 工作者线程集合workers

workers的申明: private final HashSet<Worker> workers = new HashSet<Worker>(); 添加–当创建新线程时,要包装成新的worker加入到HashSet workers中(workers.add): //线程池中添加新线程 private Thread addThread(Runnable firstTask) { Worker w = new Worker(firstTask); Thread t = threadFactory.newThread(w); boolean workerStarted = false; if (t != null) { //.....省略非关键代码 //新的worker加入到workers中 workers.add(w); //启动新线程 t.start(); //.....省略非关键代码 } return t; } 删除–当某个工作者线程关闭时,跳出不断获取任务队列元素的循环时,需要在workers将此woker去除掉remove : public void run() { try { //.....省略非关键代码 while (task != null || (task = getTask()) != null) { runTask(task); //.....省略非关键代码 } } finally { //工作者线程关闭 workerDone(this); } } /** * 工作者线程关闭: * 1.获取线程池全局锁 * 2.将该工作者完成的任务数加到线程池的完成任务数上 * 3.从线程池的工作者线程Set中移除 * 4.当前线程池工作者线程数减一 */ void workerDone(Worker w) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; workers.remove(w); if (--poolSize == 0) tryTerminate(); } finally { mainLock.unlock(); } }

8.2.3.6 线程池运行状态runState

(1) RUNNING

当线程池刚被创建时的状态:

volatile int runState; static final int RUNNING = 0;
(2) SHUTDOWN
static final int SHUTDOWN = 1; 当线程池处于SHUTDOWN状态时,此时线程池不在接收新任务: //线程池接受任务execute public void execute(Runnable command) { //.....省略非关键代码 if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) { //线程池处于SHUTDOWN状态时,此时线程池不在接收新任务 if (runState == RUNNING && workQueue.offer(command)) { if (runState != RUNNING || poolSize == 0) //.....省略非关键代码 } else if (!addIfUnderMaximumPoolSize(command)) //.....省略非关键代码 } } 当线程池处于SHUTDOWN状态时,工作者线程还可以获取处理任务: //工作者线程循环获取任务方法 Runnable getTask() { for (;;) { //.....省略非关键代码 int state = runState; if (state > SHUTDOWN) return null; Runnable r; //当线程池处于SHUTDOWN状态时,不断从任务队列获取任务,帮助消费掉任务,好让worker线程关闭 if (state == SHUTDOWN) // Help drain queue r = workQueue.poll(); //.....省略非关键代码 if (r != null) return r; //当线程池处于SHUTDOWN状态,此时任务队列被消费完为空了,该工作者线程可以退出关闭了 //当线程池处于SHUTDOWN状态,不断有工作中的worker关闭,最终导致所有工作者线程结束,线程池最终变为TERMINATED if (workerCanExit()) { if (runState >= SHUTDOWN) // Wake up others interruptIdleWorkers(); return null; } //.....省略非关键代码 } } } SHUTDOWN -> TERMINATED: 当线程池处于SHUTDOWN状态时,任务队列不在增加新任务,工作者线程消费完任务队列中所有任务后,会一个个关闭,最终workQueue任务队列为空&workers工作者线程集合为空,线程池变为TERMINATED状态。
(3) STOP
static final int STOP= 2; 当线程池处于STOP状态时,此时线程池不在接收新任务: //线程池接受任务execute public void execute(Runnable command) { //.....省略非关键代码 if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) { //线程池处于STOP状态时,此时线程池不在接收新任务 if (runState == RUNNING && workQueue.offer(command)) { if (runState != RUNNING || poolSize == 0) //.....省略非关键代码 } else if (!addIfUnderMaximumPoolSize(command)) //.....省略非关键代码 } } 当线程池处于STOP状态时,工作者线程立马停止,不在获取处理任务,任务丢弃掉: //工作者线程循环获取任务方法 Runnable getTask() { for (;;) { //.....省略非关键代码 int state = runState; //当线程池处于STOP状态时,直接返回空,结束工作者线程循环,立马关闭线程。 if (state > SHUTDOWN) return null; } } STOP-> TERMINATED: 当线程池处于STOP状态时,任务队列不在增加新任务,工作者立马停止一个个关闭,最终workers工作者线程集合为空,线程池变为TERMINATED状态。
(4) TERMINATED

当线程池中运行的线程数为0,并且此时线程池状态为STOP或者SHUTDOWN,线程池调用tryTerminate()方法,线程最终会变为TERMINATED状态:

//尝试结束线程池方法 //1.工作者线程关闭时并且此时poolSize==0,会调用 //2.shutdown()关闭线程池方法,会调用 //3.shutdownNow()关闭线程池方法,会调用 private void tryTerminate() { //当线程池中运行的线程数为0 if (poolSize == 0) { int state = runState; if (state < STOP && !workQueue.isEmpty()) { state = RUNNING; // disable termination check below addThread(null); } //并且此时线程池状态为STOP或者SHUTDOWN if (state == STOP || state == SHUTDOWN) { //设置线程池状态为TERMINATED runState = TERMINATED; termination.signalAll(); terminated(); } } }
(5) runState运行状态对比
RUNNING(工作中):线程池正常工作的状态。SHUTDOWN(关闭线程池):此时没法新增任务,但运行中的工作者线程仍然工作直到任务队列为空,worker线程关闭,当一个个worker线程关闭时,最终调用tryTerminate(),线程设置为TERMINATED状态。STOP(停止线程池):此时没法新增任务,工作者线程立马停止工作(放弃费任务队列),关闭掉,当一个个worker线程关闭时,最终调用tryTerminate(),线程设置为TERMINATED状态。TERMINATED(终结线程池):线程池的最终状态,当线程池中运行的线程数为0,并且此时线程池状态为STOP或者SHUTDOWN,线程池调用tryTerminate()方法,线程最终会变为TERMINATED状态。

8.3 线程池的使用

8.3.1 线程池的创建

(1) 构造方法

用ThreadPoolExecutor构造方法创建线程池:

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; } //声明一个线程池 public static void main(String[] args) throws ExecutionException, InterruptedException { ThreadPoolExecutor pool = new ThreadPoolExecutor(5, 200, 100, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(2000)); }

(2) corePoolSize(核心线程数)

当有一个任务提交给线程池时,若此时运行的线程数小于corePoolSize核心线程数,则创建线程。如果调用prestartAllCoreThreads()方法,线程池会提前创建并启动核心的基本线程。

public int prestartAllCoreThreads() { int n = 0; //不停的向线程中添加线程数,知道poolSize大于等于核心线程数 while (addIfUnderCorePoolSize(null)) ++n; return n; }

(3) maximumPoolSize(最大线程数)

线程池允许创建启动的最大线程数。由于生产者生产速度远远高于消费者消费速度时,会造成任务队列充满,若此时已创建的线程数<最大线程数,则线程池继续创建新的线程执行任务。(注意maximumPoolSize对于使用无界的任务队列的线程池无效)

(4) keepAliveTime,unit(线程活动保持时间)

当线程数大于核心线程数或者allowCoreThreadTimeOut为true(表示核心线程要超时等待了,不在一直阻塞take了),此时工作者线程向任务队列取任务时,若队列为空了直接返回,该工作者线程直接销毁,但是设置了keepAliveTime,可以让worker线程多等待keepAliveTime时间,保持一定的线程空闲活动时间,提高线程的利用率。

//线程池构造函数中,keepAliveTime计算单位纳秒 this.keepAliveTime = unit.toNanos(keepAliveTime); //每个工作者线程运行的run方法 public void run() { try { hasRun = true; Runnable task = firstTask; firstTask = null; while (task != null || (task = getTask()) != null) { runTask(task); task = null; } } finally { workerDone(this); } } //每个工作者不停的要从任务队列中获取任务 Runnable getTask() { for (;;) { try { int state = runState; if (state > SHUTDOWN) return null; Runnable r; if (state == SHUTDOWN) // Help drain queue r = workQueue.poll(); else if (poolSize > corePoolSize || allowCoreThreadTimeOut) //超时等待获取workQueue任务队列元素。 r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS); else r = workQueue.take(); if (r != null) return r; if (workerCanExit()) { if (runState >= SHUTDOWN) // Wake up others interruptIdleWorkers(); return null; } // Else retry } catch (InterruptedException ie) { // On interruption, re-check runState } } }

(5) workQueue(线程安全的阻塞队列 )

用于保存等待执行任务的阻塞队列。可以是下面几种阻塞队列: - ArrayListBlockingQueue:是一个由数组组成的有界阻塞队列,此队列按FIFO先进先出的顺序对元素进行排序。 - LinkedBlockingQueue:是一个由链表组成的有界阻塞队列,此队列按FIFO先进先出,吞吐量要高于ArrayListBlockingQueue。由静态工厂方法Executors.newFixedThreadPool( )使用了此队列。 - SynchronousQueue:一个不存储元素的阻塞队列,队列的每个插入操作必须要等到令一个元素的移除操作,否则插入操作一直处于等待状态,吞吐量要高于LinkedBlockingQueue。由静态工厂方法Executors.newCachedThreadPool( )使用了此队列。 - PriorityBlockingQueue:一个具有优先级的无界队列。 - DelayQueue:一个由优先级队列实现的无界延时阻塞队列。由静态工厂方法Executors.newScheduledThreadPool( )使用了此队列.

(6) threadFactory(线程名称工厂)

Worker w = new Worker(firstTask); Thread t = threadFactory.newThread(w);

当线程池要创建新的线程时,都需要调用ThreadFactory .newThread(Runnable r)方法创建线程,目的是创建名称有意义的线程。下面看下DefaultThreadFactory 的实现newThread。

static class DefaultThreadFactory implements ThreadFactory { //线程池序号原子序列,申明static,poolNumber无状态,所有对象共享此变量 static final AtomicInteger poolNumber = new AtomicInteger(1); final ThreadGroup group; //线程池中线程原子序列,有状态变量,每个ThreadFactory 对象不同 final AtomicInteger threadNumber = new AtomicInteger(1); final String namePrefix; //初始化线程池名称pool-1-thread- DefaultThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s != null)? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-"; } //不断的从线程池中创建线程pool-1-thread-1,pool-1-thread-2,pool-1-thread-3..... public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) t.setDaemon(false); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; } }

(7) RejectedExecutionHandler (任务饱和策略)

public void execute(Runnable command) { if (command == null) throw new NullPointerException(); if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) { if (runState == RUNNING && workQueue.offer(command)) { if (runState != RUNNING || poolSize == 0) ensureQueuedTaskHandled(command); } else if (!addIfUnderMaximumPoolSize(command)) //线程池已饱和,执行饱和策略reject reject(command); // is shutdown or saturated } } void reject(Runnable command) { handler.rejectedExecution(command, this); }

当任务队列已满,并且运行的线程数大于或者等于线程池的最大线程数时,此时线程池已饱和了无法接收外部的任务了,此时需要执行饱和策略。线程池默认AbortPolicy,直接抛出throw new RejectedExecutionException()抛异常。当然线程池提供一下四种策略:

AbortPolicy:直接抛出异常。 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException(); } CallerRunsPolicy:让当前提交任务的线程直接执行任务。 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } DiscardPolicy:不处理,丢弃掉。 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } DiscardOldestPolicy:将队头任务丢弃掉(最先提交的马上要出队的任务),在执行任务。 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } }

8.3.2 向线程池提交任务

8.3.2.1 execute( )

execute方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池的线程执行成功。

//1.声明一个线程池 ThreadPoolExecutor pool = new ThreadPoolExecutor(5, 200, 100, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable()); //2.提交不需要返回值的任务 pool.execute(new Runnable() { public void run() { System.out.println("poll execute"); } });

8.3.2.2 submit( )

submit方法用于提交有返回值的任务,线程池会返回一个代表异步计算的结果的FutureTask类型的对象(下面介绍),可以通过这个future对象判断任务是否执行成功,调用future.get()会阻塞当前线程直到任务完成,而使用get(long timeout, TimeUnit unit)方法则会阻塞当前线程一段时间,超时候方法立马返回,可能此时任务没有执行完。

Future<?> future = pool.submit(new Runnable() { public void run() { System.out.println("poll submit"); } }); //当任务异步执行完,future.get()方法才会返回 if(future.get()!=null){ System.out.println("end"); }

8.3.3 关闭线程池

注意:

通过调用线程池的shutdown和shutdownNow来关闭线程池。他们的原理是,遍历线程池中所有的工作者线程,然后一个个中断,所以无法响应中断的任务可能永远无法停止了。shutdown和shutdownNow的区别是:shutdown将线程池状态设置成SHUTDOWN状态,然后中断所有没有正在执行任务的的线程,其他运行中的线程保持继续工作;而shutdownNow将线程池状态设置成STOP状态,尝试停止中断所有正在执行或者暂停任务的线程,任务队列任务不在消费执行。shutdown和shutdownNow**最终会让线程池状态变为TERMINATED状态**。通常调用shutdown关闭线程池,如果任务不一定要求全部执行完,可以用shutdownNow。

8.3.3.1 shutdown()

public void shutdown() { SecurityManager security = System.getSecurityManager(); if (security != null) security.checkPermission(shutdownPerm); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (security != null) { // Check if caller can modify our threads for (Worker w : workers) security.checkAccess(w.thread); } int state = runState; //1.shutdown将线程池状态设置成SHUTDOWN状态 if (state < SHUTDOWN) runState = SHUTDOWN; try { //2.遍历线程池中所有的工作者线程,中断所有没有正在执行任务的的线程 for (Worker w : workers) { w.interruptIfIdle(); } } catch (SecurityException se) { // Try to back out runState = state; // tryTerminate() here would be a no-op throw se; } //3.尝试终结线程池 tryTerminate(); // Terminate now if pool and queue empty } finally { mainLock.unlock(); } }

8.3.3.2 shutdownNow()

public List<Runnable> shutdownNow() { SecurityManager security = System.getSecurityManager(); if (security != null) security.checkPermission(shutdownPerm); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (security != null) { // Check if caller can modify our threads for (Worker w : workers) security.checkAccess(w.thread); } //1.shutdownNow将线程池状态设置成STOP状态 int state = runState; if (state < STOP) runState = STOP; try { //2.遍历线程池中所有的工作者线程,中断所有正在执行或者暂停任务的的线程 for (Worker w : workers) { w.interruptNow(); } } catch (SecurityException se) { // Try to back out runState = state; // tryTerminate() here would be a no-op throw se; } List<Runnable> tasks = drainQueue(); //3.尝试终结线程池 tryTerminate(); // Terminate now if pool and queue empty return tasks; } finally { mainLock.unlock(); } }

8.3.3.3 isShutdown()

//判断线程池是否关闭,当线程池状态为不为RUNNING,返回true public boolean isShutdown() { return runState != RUNNING; }

8.3.3.4 isTerminated()

//判断线程池是否最终结束,当线程池状态为TERMINATED,表示当前线程池线程数为0并且之前线程池状态为STOP或者SHUTDOWN,返回true public boolean isTerminated() { return runState == TERMINATED; }

8.4 合理运用线程池

8.4.1 线程池的任务队列建议使用有界阻塞队列。

8.4.2 大量使用线程池时,要对其监控。

转载请注明原文地址: https://www.6miu.com/read-12530.html

最新回复(0)