BlockingQueue源码剖析
参考ArrayBlockingQueue
核心组成取元素逻辑存元素逻辑inc DelayQueue
核心组成leader-follower模式takeput LinkedBlockingQueue
核心成员takeput PriorityBlockingQueue
核心成员takeput SynchronousQueue
BlockingQueue源码剖析
参考
英文文档 英文 jenkov 中文 defonds
package java.util.concurrent;
import java.util.Collection;
import java.util.Queue;
public interface BlockingQueue<E> extends Queue<E> {
boolean add(E e);
boolean offer(E e);
void put(E e) throws InterruptedException;
boolean offer(E e,
long timeout, TimeUnit unit)
throws InterruptedException;
E take() throws InterruptedException;
E poll(
long timeout, TimeUnit unit)
throws InterruptedException;
int remainingCapacity();
boolean remove(Object o);
public boolean contains(Object o);
int drainTo(Collection<?
super E> c);
int drainTo(Collection<?
super E> c,
int maxElements);
}
BlockingQueue的实现类有
ArrayBlockingQueueDelayQueueLinkedBlockingQueuePriorityBlockingQueueSynchronousQueue
ArrayBlockingQueue
转载地址
核心组成:
/** 底层维护队列元素的数组 */
final Object[] items;
/** 当读取元素时数组的下标(这里称为读下标) */
int takeIndex;
/** 添加元素时数组的下标 (这里称为写小标)*/
int putIndex;
/** 队列中的元素个数 */
int count;
/** 用于并发控制的工具类**/
final ReentrantLock lock;
/** 保证所有访问的主要的锁*/
final ReentrantLock lock;
/** 控制take操作时是否让线程等待 */
private final Condition notEmpty;
/** 控制put操作时是否让线程等待 */
private final Condition notFull;
取元素逻辑
在队列是空的时候需要等待到它不是空,同时take需要保证线程安全性。
public E
take() throws InterruptedException {
final ReentrantLock
lock =
this.
lock;
lock.lockInterruptibly();
try {
while (count ==
0)
notEmpty.
await();
return extract();
}
finally {
lock.unlock();
}
}
private E
extract() {
final Object[] items =
this.items;
E x =
this.<E>cast(items[takeIndex]);
items[takeIndex] =
null;
takeIndex = inc(takeIndex);
--count;
notFull.signal();
return x;
}
存元素逻辑
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock
lock =
this.
lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.
await();
insert(e);
}
finally {
lock.unlock();
}
}
private void insert(E x) {
items[putIndex] = x;
putIndex = inc(putIndex);
++count;
notEmpty.signal();
}
inc
在extract和insert的时候都用到了inc方法,可以看出数组空间被循环利用了,因此ArrayBlockingQueue其实是一个循环队列
final
int inc(
int i) {
return (++i == items.
length) ?
0 : i;
}
DelayQueue
进入DelayQueue的元素必须实现 java.util.concurrent.Delayed 接口:
public interface Delayed extends Comparable<Delayed< {
public long getDelay(TimeUnit timeUnit);
}
当getDelay方法返回延迟的是 0 或者负值时,将被认为过期,该元素将会在 DelayQueue 的下一次take被调用的时候被释放掉。
原理实现参考文档
核心组成
private final transient ReentrantLock lock =
new ReentrantLock();
private final PriorityQueue<E> q =
new PriorityQueue<E>();
private Thread leader =
null;
private final Condition available = lock.newCondition();
leader-follower模式
参考这篇文章
take
public E
take() throws InterruptedException {
final ReentrantLock
lock =
this.
lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first ==
null)
available.
await();
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <=
0)
return q.poll();
first =
null;
if (leader !=
null)
available.
await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
}
finally {
if (leader == thisThread)
leader =
null;
}
}
}
}
}
finally {
if (leader ==
null && q.peek() !=
null)
available.signal();
lock.unlock();
}
}
思考:采用leader/follower设计模式有什么好处?
不采用L/F模式,即直接将for循环里面first = null;之后的代码替换为available.awaitNanos(delay),无法避免频繁切换线程上下文的开销,比如以下场景: 线程A和B同时在awaitNanos,A等待了100ms,B等待了200ms,假设等待时限为300ms,线程B被唤醒,并在100ms内完成take()操作,100ms之后线程A被唤醒,发现首元素没有过期,因为之前过期的元素已经被线程B take出去了,就会继续等待。可见,线程A没有必要被唤醒,因为它付出了切换线程A上下文的代价。
采用L/F模式,可以避免这种开销,因为follower线程一直都在等待,不存在线程切入与切出上下文的问题。
put
public void put(E e) {
offer(e);
}
public boolean
offer(E e) {
final ReentrantLock
lock =
this.
lock;
lock.
lock();
try {
q.offer(e);
if (q.peek() == e) {
leader =
null;
available.signal();
}
return true;
}
finally {
lock.unlock();
}
}
LinkedBlockingQueue
可参考这篇文章
核心成员
static class Node<E> {
E item;
/**
* 指向下一个节点
* 指向this, 意思是下一个节点是head.next
* null, 没有实际的下一个节点
*/
Node<E> next;
Node(E x) { item = x; }
}
/** 队列的容量, 不提供容量就是Integer.MAX_VALUE*/
private final int capacity;
/**队列中元素的容量*/
private final AtomicInteger count =
new AtomicInteger();
/**
* 链表头结点引用.
* 恒等式: head.item == null
* 头结点不用于存储实际的元素
*/
transient Node<E> head;
/**
* 链表尾节点
* 恒等式: last.next == null
*/
private transient Node<E> last;
/** 保护take, poll等获取元素操作的锁*/
private final ReentrantLock takeLock =
new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();
/** 保护put, offer等存入元素操作的锁 */
private final ReentrantLock putLock =
new ReentrantLock();
private final Condition notFull = putLock.newCondition();
take
// 当队列非空的时候,取出链表首元素
public E
take()
throws InterruptedException {
E x;
int c = -
1;
final AtomicInteger count =
this.count;
final ReentrantLock takeLock =
this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() ==
0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c >
1)
notEmpty.signal();
}
finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
private E
dequeue() {
Node<E> h = head;
Node<E> first = h.next;
h.next = h;
head = first;
E x = first.item;
first.item =
null;
return x;
}
put
public void put(E e)
throws InterruptedException {
if (e ==
null)
throw new NullPointerException();
int c = -
1;
Node<E> node =
new Node<E>(e);
final ReentrantLock putLock =
this.putLock;
final AtomicInteger count =
this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c +
1 < capacity)
notFull.signal();
}
finally {
putLock.unlock();
}
if (c ==
0)
signalNotEmpty();
}
private void enqueue(Node<E> node) {
last = last.next = node;
}
private void signalNotEmpty() {
final ReentrantLock takeLock =
this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
}
finally {
takeLock.unlock();
}
}
PriorityBlockingQueue
核心成员
/**
* 用来表示优先级队列的平衡的二进制堆(最小堆):queue[n]的两个儿子是queue[2*n+1]和 queue[2*(n+1)]。
* 优先级队列根据comparator排序, 如果comparator是null的话,就根据元素自然顺序排序。
*/
private transient Object[] queue;
private transient int size;
private transient Comparator<?
super E> comparator;
private final ReentrantLock lock;
private final Condition notEmpty;
private transient volatile int allocationSpinLock;
private PriorityQueue<E> q;
take
public E
take() throws InterruptedException {
final ReentrantLock
lock =
this.
lock;
lock.lockInterruptibly();
E result;
try {
while ( (result = dequeue()) ==
null)
notEmpty.
await();
}
finally {
lock.unlock();
}
return result;
}
private E
dequeue() {
int n = size -
1;
if (n <
0)
return null;
else {
Object[] array = queue;
E result = (E) array[
0];
E x = (E) array[n];
array[n] =
null;
Comparator<? super E> cmp = comparator;
if (cmp ==
null)
siftDownComparable(
0, x, array, n);
else
siftDownUsingComparator(
0, x, array, n, cmp);
size = n;
return result;
}
}
put
public void put(E e) {
offer(e);
}
public boolean
offer(E e) {
if (e ==
null)
throw new NullPointerException();
final ReentrantLock
lock =
this.
lock;
lock.
lock();
int n, cap;
Object[] array;
while ((n = size) >= (cap = (array = queue).length))
tryGrow(array, cap);
try {
Comparator<? super E> cmp = comparator;
if (cmp ==
null)
siftUpComparable(n, e, array);
else
siftUpUsingComparator(n, e, array, cmp);
size = n +
1;
notEmpty.signal();
}
finally {
lock.unlock();
}
return true;
private void tryGrow(Object[] array,
int oldCap) {
lock.unlock();
Object[] newArray =
null;
if (allocationSpinLock ==
0 &&
UNSAFE.compareAndSwapInt(
this, allocationSpinLockOffset,
0,
1)) {
try {
int newCap = oldCap + ((oldCap <
64) ?
(oldCap +
2) :
(oldCap >>
1));
if (newCap - MAX_ARRAY_SIZE >
0) {
int minCap = oldCap +
1;
if (minCap <
0 || minCap > MAX_ARRAY_SIZE)
throw new OutOfMemoryError();
newCap = MAX_ARRAY_SIZE;
}
if (newCap > oldCap && queue == array)
newArray =
new Object[newCap];
}
finally {
allocationSpinLock =
0;
}
}
if (newArray ==
null)
Thread.
yield();
lock.
lock();
if (newArray !=
null && queue == array) {
queue = newArray;
System.arraycopy(array,
0, newArray,
0, oldCap);
}
}
SynchronousQueue
SynchronousQueue 是一个特殊的队列,它的内部同时只能够容纳单个元素。如果该队列已有一元素的话,试图向队列中插入一个新元素的线程将会阻塞,直到另一个线程将该元素从队列中抽走。同样,如果该队列为空,试图向队列中抽取一个元素的线程将会阻塞,直到另一个线程向队列中插入了一条新的元素。