1、简介
1.1 介绍
基于优先级堆的无限优先级queue 。 优先级队列的元素根据它们的有序natural ordering ,或由一个Comparator在队列构造的时候提供,这取决于所使用的构造方法。 优先队列不允许null元素。
1.2 构造
PriorityBlockingQueue()
PriorityBlockingQueue(Collection<? extends E> c)
PriorityBlockingQueue(int initialCapacity)
PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator)
1.3 继承
java
.lang.Object
java
.util.AbstractCollection<E>
java
.util.AbstractQueue<E>
java
.util.concurrent.PriorityBlockingQueue<E>
2、源代码
public class PriorityBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
private static final
int DEFAULT_INITIAL_CAPACITY =
11;
private static final
int MAX_ARRAY_SIZE = Integer.MAX_VALUE -
8;
private transient Object[] queue;
private transient
int size;
private transient Comparator<? super E> comparator;
private final ReentrantLock
lock;
private final Condition notEmpty;
private transient
volatile int allocationSpinLock;
private PriorityQueue<E> q;
public PriorityBlockingQueue() {
this(DEFAULT_INITIAL_CAPACITY,
null);
}
public PriorityBlockingQueue(
int initialCapacity) {
this(initialCapacity,
null);
}
public PriorityBlockingQueue(
int initialCapacity,
Comparator<? super E> comparator) {
if (initialCapacity <
1)
throw new IllegalArgumentException();
this.
lock =
new ReentrantLock();
this.notEmpty =
lock.newCondition();
this.comparator = comparator;
this.queue =
new Object[initialCapacity];
}
public PriorityBlockingQueue(Collection<? extends E> c) {
this.
lock =
new ReentrantLock();
this.notEmpty =
lock.newCondition();
boolean heapify =
true;
boolean screen =
true;
if (c instanceof SortedSet<?>) {
SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
this.comparator = (Comparator<? super E>) ss.comparator();
heapify =
false;
}
else if (c instanceof PriorityBlockingQueue<?>) {
PriorityBlockingQueue<? extends E> pq =
(PriorityBlockingQueue<? extends E>) c;
this.comparator = (Comparator<? super E>) pq.comparator();
screen =
false;
if (pq.getClass() == PriorityBlockingQueue.class)
heapify =
false;
}
Object[] a = c.toArray();
int n = a.length;
if (a.getClass() != Object[].class)
a = Arrays.copyOf(a, n, Object[].class);
if (screen && (n ==
1 ||
this.comparator !=
null)) {
for (
int i =
0; i < n; ++i)
if (a[i] ==
null)
throw new NullPointerException();
}
this.queue = a;
this.size = n;
if (heapify)
heapify();
}
private void tryGrow(Object[] array,
int oldCap) {
lock.unlock();
Object[] newArray =
null;
if (allocationSpinLock ==
0 &&
UNSAFE.compareAndSwapInt(
this, allocationSpinLockOffset,
0,
1)) {
try {
int newCap = oldCap + ((oldCap <
64) ?
(oldCap +
2) :
(oldCap >>
1));
if (newCap - MAX_ARRAY_SIZE >
0) {
int minCap = oldCap +
1;
if (minCap <
0 || minCap > MAX_ARRAY_SIZE)
throw new OutOfMemoryError();
newCap = MAX_ARRAY_SIZE;
}
if (newCap > oldCap && queue == array)
newArray =
new Object[newCap];
}
finally {
allocationSpinLock =
0;
}
}
if (newArray ==
null)
Thread.
yield();
lock.
lock();
if (newArray !=
null && queue == array) {
queue = newArray;
System.arraycopy(array,
0, newArray,
0, oldCap);
}
}
private E
dequeue() {
int n = size -
1;
if (n <
0)
return null;
else {
Object[] array = queue;
E result = (E) array[
0];
E x = (E) array[n];
array[n] =
null;
Comparator<? super E> cmp = comparator;
if (cmp ==
null)
siftDownComparable(
0, x, array, n);
else
siftDownUsingComparator(
0, x, array, n, cmp);
size = n;
return result;
}
}
private static <T>
void siftDownComparable(
int k, T x, Object[] array,
int n) {
if (n >
0) {
Comparable<? super T> key = (Comparable<? super T>)x;
int half = n >>>
1;
while (k < half) {
int child = (k <<
1) +
1;
Object c = array[child];
int right = child +
1;
if (right < n &&
((Comparable<? super T>) c).compareTo((T) array[right]) >
0)
c = array[child = right];
if (key.compareTo((T) c) <=
0)
break;
array[k] = c;
k = child;
}
array[k] = key;
}
}
private static <T>
void siftUpComparable(
int k, T x, Object[] array) {
Comparable<? super T> key = (Comparable<? super T>) x;
while (k >
0) {
int parent = (k -
1) >>>
1;
Object e = array[parent];
if (key.compareTo((T) e) >=
0)
break;
array[k] = e;
k = parent;
}
array[k] = key;
}
private static <T>
void siftUpUsingComparator(
int k, T x, Object[] array,
Comparator<? super T> cmp) {
while (k >
0) {
int parent = (k -
1) >>>
1;
Object e = array[parent];
if (cmp.compare(x, (T) e) >=
0)
break;
array[k] = e;
k = parent;
}
array[k] = x;
}
public boolean
offer(E e) {
if (e ==
null)
throw new NullPointerException();
final ReentrantLock
lock =
this.
lock;
lock.
lock();
int n, cap;
Object[] array;
while ((n = size) >= (cap = (array = queue).length))
tryGrow(array, cap);
try {
Comparator<? super E> cmp = comparator;
if (cmp ==
null)
siftUpComparable(n, e, array);
else
siftUpUsingComparator(n, e, array, cmp);
size = n +
1;
notEmpty.signal();
}
finally {
lock.unlock();
}
return true;
}
public E
poll() {
final ReentrantLock
lock =
this.
lock;
lock.
lock();
try {
return dequeue();
}
finally {
lock.unlock();
}
}
public E
take() throws InterruptedException {
final ReentrantLock
lock =
this.
lock;
lock.lockInterruptibly();
E result;
try {
while ( (result = dequeue()) ==
null)
notEmpty.
await();
}
finally {
lock.unlock();
}
return result;
}
public E
poll(
long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock
lock =
this.
lock;
lock.lockInterruptibly();
E result;
try {
while ( (result = dequeue()) ==
null && nanos >
0)
nanos = notEmpty.awaitNanos(nanos);
}
finally {
lock.unlock();
}
return result;
}
private int indexOf(Object o) {
if (o !=
null) {
Object[] array = queue;
int n = size;
for (
int i =
0; i < n; i++)
if (o.equals(array[i]))
return i;
}
return -
1;
}
private void removeAt(
int i) {
Object[] array = queue;
int n = size -
1;
if (n == i)
array[i] =
null;
else {
E moved = (E) array[n];
array[n] =
null;
Comparator<? super E> cmp = comparator;
if (cmp ==
null)
siftDownComparable(i, moved, array, n);
else
siftDownUsingComparator(i, moved, array, n, cmp);
if (array[i] == moved) {
if (cmp ==
null)
siftUpComparable(i, moved, array);
else
siftUpUsingComparator(i, moved, array, cmp);
}
}
size = n;
}
}
2.2 总结
PriorityBlockingQueue的实现是一个二叉堆,且大小可增长。
只有一个条件锁,如果队列为空,再获取数据时会阻塞
看源码前看懂二叉堆会轻松很多 http://www.cnblogs.com/skywang12345/p/3610187.html