1、简介
1.1 介绍
一个有限的blocking queue由数组支持。 这个队列排列元素FIFO(先进先出)。 队列的头部是队列中最长的元素。 队列的尾部是队列中最短时间的元素。 新元素插入队列的尾部,队列检索操作获取队列头部的元素。 这是一个经典的“有界缓冲区”,其中固定大小的数组保存由生产者插入的元素并由消费者提取。 创建后,容量无法更改。 尝试put成满的队列的元件将导致在操作阻挡; 尝试take从空队列的元件将类似地阻塞。
此类支持可选的公平策略,用于订购等待的生产者和消费者线程。 默认情况下,此订单不能保证。 然而,以公平设置为true的队列以FIFO顺序授予线程访问权限。 公平性通常会降低吞吐量,但会降低变异性并避免饥饿。
该类及其迭代器实现了Collection和Iterator接口的所有可选方法。
1.2 类继承
java
.lang.Object
java
.util.AbstractCollection<E>
java
.util.AbstractQueue<E>
java
.util.concurrent.ArrayBlockingQueue<E>
1.3 构造
//有给定(固定)容量和默认访问策略的构造
ArrayBlockingQueue(int capacity)
//给定(固定)容量和指定访问策略的构造
ArrayBlockingQueue(int capacity, boolean fair)
//给定(固定)容量和指定访问策略和子集合的构造
ArrayBlockingQueue(int capacity, boolean fair, Collection
<? extends E> c)
1.4 主要方法
boolean add(E e)
在插入此队列的尾部,如果有可能立即这样做不超过该队列的容量,
true成功时 返回指定的元素 否则抛出 IllegalStateException如果此队列已满。
void clear()
从这个队列中原子地删除所有的元素。
boolean contains(Object o)
如果此队列包含指定的元素,则返回
true 。
int drainTo(Collection<?
super E> c)
从该队列中删除所有可用的元素,并将它们添加到给定的集合中。
int drainTo(Collection<?
super E> c,
int maxElements)
最多从该队列中删除给定数量的可用元素,并将它们添加到给定的集合中。
Iterator<E> iterator()
以适当的顺序返回该队列中的元素的迭代器。
boolean offer(E e)
如果可以在不超过队列容量的情况下立即将其指定的元素插入该队列的尾部,则在成功时
false如果该队列已满,则返回
true 。
boolean offer(E e,
long timeout, TimeUnit unit)
在该队列的尾部插入指定的元素,等待指定的等待时间,以使空间在队列已满时变为可用。
E peek()
检索但不删除此队列的头,如果此队列为空,则返回
null 。
E poll()
检索并删除此队列的头,如果此队列为空,则返回
null 。
E poll(
long timeout, TimeUnit unit)
检索并删除此队列的头,等待指定的等待时间(如有必要)使元素变为可用。
void put(E e)
在该队列的尾部插入指定的元素,如果队列已满,则等待空间变为可用。
int remainingCapacity()
返回此队列可以理想地(在没有内存或资源限制)的情况下接受而不阻止的附加元素数。
boolean remove(Object o)
从该队列中删除指定元素的单个实例(如果存在)。
2、源代码
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
final Object[] items;
int takeIndex;
int putIndex;
int count;
final ReentrantLock
lock;
private final Condition notEmpty;
private final Condition notFull;
final
int dec(
int i) {
return ((i ==
0) ? items.length : i) -
1;
}
final E itemAt(
int i) {
return (E) items[i];
}
private void enqueue(E x) {
final Object[] items =
this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex =
0;
count++;
notEmpty.signal();
}
private E
dequeue() {
final Object[] items =
this.items;
@SuppressWarnings(
"unchecked")
E x = (E) items[takeIndex];
items[takeIndex] =
null;
if (++takeIndex == items.length)
takeIndex =
0;
count--;
if (itrs !=
null)
itrs.elementDequeued();
notFull.signal();
return x;
}
void removeAt(final
int removeIndex) {
final Object[] items =
this.items;
if (removeIndex == takeIndex) {
items[takeIndex] =
null;
if (++takeIndex == items.length)
takeIndex =
0;
count--;
if (itrs !=
null)
itrs.elementDequeued();
}
else {
final
int putIndex =
this.putIndex;
for (
int i = removeIndex;;) {
int next = i +
1;
if (next == items.length)
next =
0;
if (next != putIndex) {
items[i] = items[next];
i = next;
}
else {
items[i] =
null;
this.putIndex = i;
break;
}
}
count--;
if (itrs !=
null)
itrs.removedAt(removeIndex);
}
notFull.signal();
}
public ArrayBlockingQueue(
int capacity) {
this(capacity,
false);
}
public ArrayBlockingQueue(
int capacity, boolean fair) {
if (capacity <=
0)
throw new IllegalArgumentException();
this.items =
new Object[capacity];
lock =
new ReentrantLock(fair);
notEmpty =
lock.newCondition();
notFull =
lock.newCondition();
}
public ArrayBlockingQueue(
int capacity, boolean fair,
Collection<? extends E> c) {
this(capacity, fair);
final ReentrantLock
lock =
this.
lock;
lock.
lock();
try {
int i =
0;
try {
for (E e : c) {
checkNotNull(e);
items[i++] = e;
}
}
catch (ArrayIndexOutOfBoundsException ex) {
throw new IllegalArgumentException();
}
count = i;
putIndex = (i == capacity) ?
0 : i;
}
finally {
lock.unlock();
}
}
public boolean
add(E e) {
return super.add(e);
}
public boolean
offer(E e) {
checkNotNull(e);
final ReentrantLock
lock =
this.
lock;
lock.
lock();
try {
if (count == items.length)
return false;
else {
enqueue(e);
return true;
}
}
finally {
lock.unlock();
}
}
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock
lock =
this.
lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.
await();
enqueue(e);
}
finally {
lock.unlock();
}
}
public boolean
offer(E e,
long timeout, TimeUnit unit)
throws InterruptedException {
checkNotNull(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock
lock =
this.
lock;
lock.lockInterruptibly();
try {
while (count == items.length) {
if (nanos <=
0)
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(e);
return true;
}
finally {
lock.unlock();
}
}
public E
poll() {
final ReentrantLock
lock =
this.
lock;
lock.
lock();
try {
return (count ==
0) ?
null : dequeue();
}
finally {
lock.unlock();
}
}
public E
take() throws InterruptedException {
final ReentrantLock
lock =
this.
lock;
lock.lockInterruptibly();
try {
while (count ==
0)
notEmpty.
await();
return dequeue();
}
finally {
lock.unlock();
}
}
public E
peek() {
final ReentrantLock
lock =
this.
lock;
lock.
lock();
try {
return itemAt(takeIndex);
}
finally {
lock.unlock();
}
}
public boolean
remove(Object o) {
if (o ==
null)
return false;
final Object[] items =
this.items;
final ReentrantLock
lock =
this.
lock;
lock.
lock();
try {
if (count >
0) {
final
int putIndex =
this.putIndex;
int i = takeIndex;
do {
if (o.equals(items[i])) {
removeAt(i);
return true;
}
if (++i == items.length)
i =
0;
}
while (i != putIndex);
}
return false;
}
finally {
lock.unlock();
}
}
}
2.3 总结
总的来说是一个生产者/消费者结构。
容器是使用数组实现的环形队列。增删改操作使用ReentrantLock同步
生产者/消费者使用条件锁notEmpty/notFull实现