老汉谈——线程池原理

xiaoxiao2021-02-28  49

线程池

JDK1.8官方介绍:

Thread pools address two different problems: they usually provide improved performance when executing large numbers of asynchronous tasks, due to reduced per-task invocation overhead, and they provide a means of bounding and managing the resources, including threads, consumed when executing a collection of tasks. Each ThreadPoolExecutor also maintains some basic statistics, such as the number of completed tasks.

线程池解决了两个问题:它们通常在执行大量异步任务时提供改进性能问题,由于减少了每个任务的资源开销,他们提供边界和管理资源的手段,包括线程,消费在执行任务的集合。每个ThreadPoolExecutor 维持一些基本的统计数据,比如完成任务数量。

线程池的原理

线程池本身是不创建和启动线程的,只有任务到达时才会,即当一个任务提交到线程池时,就会创建一个线程。但可以重写动态使用方法prestartCoreThread()或prestartCoreAllThread()。当线程数没有达到核心线程数corePoolSize时,就会创建一个新的线程,不需要排队。当线程数大于核心线程数corePoolSize时,就会放到workerQueue任务队列里等待。当核心线程数小于等于线程总数maximunPoolSize时,如果队列中有等待的线程,则线程池会用空余的线程来执行队列的线程;否则,线程池就会采用拒绝策略来终止新的任务。如果某个线程会话时间超出保持会话时间keepAliveTime时,就会终止当前线程任务,然后加入新的线程任务。可以使用setKeepAliveTime(long, java.util.concurrent.TimeUnit)动态改变参数。一般来说,这个活动策略是已经超出了corePoolSize的线程时才应用。但只要keepAliveTime非0时,就可以通过allowCoreThreadTimeOut(true)方法,使超时策略应用在核心线程中。

任务排队

-直接提交:工作队列WorkQueue默认是SychronousQueue。它继承了AbstractQueue,实现了BlockingQueue。他的处理任务容量是无边界的。 当运行的线程数大于或等于核心线程corePoolSize时,才会加入队列,但直接提交是同步阻塞的。从下图的工作原理可以看出,直接提交maximunPoolSize最好无限大即无容量大小。

/** - Creates a {@code SynchronousQueue} with nonfair access policy. */ public SynchronousQueue() { this(false); } /** - Creates a {@code SynchronousQueue} with the specified fairness policy. * - @param fair if true, waiting threads contend in FIFO order for - access; otherwise the order is unspecified. */ public SynchronousQueue(boolean fair) { //fair 公平性标识 // 公平性TrasferQueue、 非公平行TransferStack transferer = fair ? new TransferQueue() : new TransferStack(); }

线程池类:ThreadPoolExecutor 应用场景:交换任工作,生产者的线程消费和消费者的线程同步传递消息、时间或任务。简单的说,任务2必须依赖于任务1的信息, -无界队列:LinkedBlockingQueue。它保证所有的超出corePoolSize的线程都放入队列里。这样创建的线程不会超出corePoolSize.,因此maximumPoolSize值就无效了。 线程池类:newFixedThreadPool

- Creates a {@code LinkedBlockingQueue} with a capacity of - {@link Integer#MAX_VALUE}. */ public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } /** - Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity. * - @param capacity the capacity of this queue - @throws IllegalArgumentException if {@code capacity} is not greater - than zero */ public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node(null); } /** - Creates a {@code LinkedBlockingQueue} with a capacity of - {@link Integer#MAX_VALUE}, initially containing the elements of the - given collection, - added in traversal order of the collection's iterator. * - @param c the collection of elements to initially contain - @throws NullPointerException if the specified collection or any - of its elements are null */ public LinkedBlockingQueue(Collection c) { this(Integer.MAX_VALUE); final ReentrantLock putLock = this.putLock; putLock.lock(); // Never contended, but necessary for visibility try { int n = 0; for (E e : c) { if (e == null) throw new NullPointerException(); if (n == capacity) throw new IllegalStateException("Queue full"); enqueue(new Node(e)); ++n; } count.set(n); } finally { putLock.unlock(); } }

应用场景:适合任务之间互不影响 -有界队列:ArrayBlockingQueue。这是最复杂的一种。不推荐使用。当使用有限的MaximumPoolSize时,有界队列可以防止资源耗尽,但可能很难控制与调整。队列的容量是有限的。 一般使用大池小列或小池大列,前者会增加CPU使用率,但遇到不可接受的调度开销,也会降低吞吐量;后者会减少CPU使用率、操作系统资源和上下文的切换,但可能导致人工来降低吞吐量。

/** - Creates an {@code ArrayBlockingQueue} with the given (fixed) - capacity and default access policy. * - @param capacity the capacity of this queue - @throws IllegalArgumentException if {@code capacity < 1} */ public ArrayBlockingQueue(int capacity) { this(capacity, false); } /** - Creates an {@code ArrayBlockingQueue} with the given (fixed) - capacity and the specified access policy. * - @param capacity the capacity of this queue - @param fair if {@code true} then queue accesses for threads blocked - on insertion or removal, are processed in FIFO order; - if {@code false} the access order is unspecified. - @throws IllegalArgumentException if {@code capacity < 1} */ public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); } public ArrayBlockingQueue(int capacity, boolean fair, Collection c) { this(capacity, fair); final ReentrantLock lock = this.lock; lock.lock(); // Lock only for visibility, not mutual exclusion try { int i = 0; try { for (E e : c) { checkNotNull(e); items[i++] = e; } } catch (ArrayIndexOutOfBoundsException ex) { throw new IllegalArgumentException(); } count = i; putIndex = (i == capacity) ? 0 : i; } finally { lock.unlock(); }

}

从上可以看出:有界队列ArrayBlockingQueue和无界队列LinkedBlockingQueue都是使用ReentrantLock重入锁,而同步队列sychoronousQueue是使用公平性TransferQueue和非共性TransferStack,即转移数据队列和转移数据栈,从数据结构便知数据的顺序。
转载请注明原文地址: https://www.6miu.com/read-2631701.html

最新回复(0)