Java线程池深入理解

xiaoxiao2021-02-28  16

摘要:本文主要讲了Java中线程池的使用方法,注意事项以及实现原理,对深入理解Java线程池有一定的帮助。

为什么需要使用线程池呢?笔者认为线程池除了可以将执行线程与主线程隔离,实现线程的异步执行外,还可以减少因为线程创建、切换、销毁所产生的资源开销。当然,线程的休眠和执行都需要一定的内容空间,使用过多的线程将会带来性能上的损失。所以,线程池中线程的数量需要进行合理的设置。

提交一个线程到线程池

建立ExecutorService线程池

ExecutorService executorService = Executors.newCachedThreadPool();

如果线程池可以占用整个服务器的资源,可以依据cpu的数量创建固定大小的线程池

int cpuNums = Runtime.getRuntime().availableProcessors(); //获取当前系统的CPU 数目 ExecutorService executorService =Executors.newFixedThreadPool(cpuNums * POOL_SIZE); //ExecutorService通常根据系统资源情况灵活定义线程池大小

定义线程类

class Handler implements Runnable{ }

调用线程池

executorService.execute(new Handler(...));

Executors创建的的几种ExecutorService线程池

  newCachedThreadPool -缓存型池子,先查看池中有没有以前建立的线程,如果有,就reuse.如果没有,就建一个新的线程加入池中-缓存型池子通常用于执行一些生存期很短的异步型任务,因此在一些面向连接的daemon型SERVER中用得不多。-能reuse的线程,必须是timeout IDLE内的池中线程,缺省timeout是60s,超过这个IDLE时长,线程实例将被终止及移出池。newFixedThreadPool

-newFixedThreadPool与cacheThreadPool都实例化自ExecutorPoolExecutor,只是初始化的参数不同。

-其独特之处:任意时间点,最多只能有固定数目的活动线程存在,此时如果有新的线程要建立,只能放在另外的队列中等待,直到当前的线程中某个线程终止直接被移出池子

newSingleThreadScheduledExecutor-单例线程,任意时间池中只能有一个线程-用的是和cache池和fixed池相同的底层池newScheduledThreadPool-调度型线程池-这个池子里的线程可以按schedule依次delay执行,或周期执行newWorkStealingPool-使用所有可用的处理器创建工作线程池作为其目标并行级别

线程池的一些常用方法

execute()

表示往线程池添加线程,有可能会立即运行,也有可能不会。无法预知线程何时开始,何时线束。

submit()

将线程放入线程池中,除了使用execute,也可以使用submit,它们两个的区别是一个使用有返回值,一个没有返回值。submit的方法很适应于生产者-消费者模式,通过和Future结合一起使用,可以起到如果线程没有返回结果,就阻塞当前线程等待线程 池结果返回。

主要有三种方法:

shutdown()

通常放在execute后面。如果调用 了这个方法,一方面,表明当前线程池已不再接收新添加的线程,新添加的线程会被拒绝执行。另一方面,表明当所有线程执行完毕后,回收线程池的资源。注意,它不会马上关闭线程池!

shutdownNow()

不管当前有没有线程在执行,马上关闭线程池!这个方法要小心使用,要不可能会引起系统异常!

ThreadPoolExecutor技术内幕

来看看Java中线程池的源码实现,其继承关系如下:

newCachedThreadPool

public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }

newFixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }

newSingleThreadExecutor

public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }

可以发现,其实它们调用的都是同一个接口ThreadPoolExecutor方法,只不过传入参数不一样而已。下面就来看看这个神秘的ThreadPoolExecutor。

首先来看看它的一些基本参数:

public class ThreadPoolExecutor extends AbstractExecutorService { //运行状态标志位 volatile int runState; static final int RUNNING = 0; static final int SHUTDOWN = 1; static final int STOP = 2; static final int TERMINATED = 3; //线程缓冲队列,当线程池线程运行超过一定线程时并满足一定的条件,待运行的线程会放入到这个队列 private final BlockingQueue<Runnable> workQueue; //重入锁,更新核心线程池大小、最大线程池大小时要加锁 private final ReentrantLock mainLock = new ReentrantLock(); //重入锁状态 private final Condition termination = mainLock.newCondition(); //工作都set集合 private final HashSet<Worker> workers = new HashSet<Worker>(); //线程执行完成后在线程池中的缓存时间 private volatile long keepAliveTime; //核心线程池大小 private volatile int corePoolSize; //最大线程池大小 private volatile int maximumPoolSize; //当前线程池在运行线程大小 private volatile int poolSize; //当缓冲队列也放不下线程时的拒绝策略 private volatile RejectedExecutionHandler handler; //线程工厂,用来创建线程 private volatile ThreadFactory threadFactory; //用来记录线程池中曾经出现过的最大线程数 private int largestPoolSize; //用来记录已经执行完毕的任务个数 private long completedTaskCount; ................ }

最终调用的构造方法:

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }

这里很简单,就是设置一下各个参数,并校验参数是否正确。

看看executor()方法的实现:

public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * 需要处理的三种情况: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }

里面的调用了addWorker()方法:

private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }

其实Work是真实去调用线程方法的地方,它是对Thread类的一个包装,每次Thread类调用其start方法时,就会调用到work的run方法。最终调用了runWorker()方法:

final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }

根据不同的Runnable实现类型调用不同的run()方法实现。

参考:

Java并发编程与技术内幕:线程池深入理解;

转载请注明原文地址: https://www.6miu.com/read-2250266.html

最新回复(0)