PriorityBlockingQueue

xiaoxiao2021-02-28  129

1、简介

1.1 介绍

基于优先级堆的无限优先级queue 。 优先级队列的元素根据它们的有序natural ordering ,或由一个Comparator在队列构造的时候提供,这取决于所使用的构造方法。 优先队列不允许null元素。

1.2 构造

//创建一个PriorityBlockingQueue ,具有默认的初始容量(11),根据它们的自然排序对其元素进行排序 。 PriorityBlockingQueue() //创建一个 PriorityBlockingQueue集合中的元素的PriorityBlockingQueue。 PriorityBlockingQueue(Collection<? extends E> c) //创建具有PriorityBlockingQueue初始容量的PriorityBlockingQueue,根据它们的自然排序对其元素进行排序 。 PriorityBlockingQueue(int initialCapacity) //创建具有 PriorityBlockingQueue初始容量的PriorityBlockingQueue,根据指定的比较器对其元素进行排序 PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator)

1.3 继承

java.lang.Object java.util.AbstractCollection<E> java.util.AbstractQueue<E> java.util.concurrent.PriorityBlockingQueue<E>

2、源代码

public class PriorityBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { //默认长度 private static final int DEFAULT_INITIAL_CAPACITY = 11; //最大长度 private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; //容器 private transient Object[] queue; //元素个数 private transient int size; //比较器 private transient Comparator<? super E> comparator; private final ReentrantLock lock; //条件锁 private final Condition notEmpty; //自旋锁 private transient volatile int allocationSpinLock; //序列化/反序列化用 private PriorityQueue<E> q; //构造 public PriorityBlockingQueue() { this(DEFAULT_INITIAL_CAPACITY, null); } public PriorityBlockingQueue(int initialCapacity) { this(initialCapacity, null); } //真正的构造 public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator) { if (initialCapacity < 1) throw new IllegalArgumentException(); this.lock = new ReentrantLock(); this.notEmpty = lock.newCondition(); this.comparator = comparator; this.queue = new Object[initialCapacity]; } //带子集的构造 public PriorityBlockingQueue(Collection<? extends E> c) { this.lock = new ReentrantLock(); this.notEmpty = lock.newCondition(); boolean heapify = true; // true if not known to be in heap order //屏蔽null值标记 boolean screen = true; // true if must screen for nulls if (c instanceof SortedSet<?>) { SortedSet<? extends E> ss = (SortedSet<? extends E>) c; this.comparator = (Comparator<? super E>) ss.comparator(); heapify = false; } else if (c instanceof PriorityBlockingQueue<?>) { PriorityBlockingQueue<? extends E> pq = (PriorityBlockingQueue<? extends E>) c; this.comparator = (Comparator<? super E>) pq.comparator(); screen = false; if (pq.getClass() == PriorityBlockingQueue.class) // exact match heapify = false; } Object[] a = c.toArray(); int n = a.length; // If c.toArray incorrectly doesn't return Object[], copy it. //如果是基本类型 直接复制 if (a.getClass() != Object[].class) a = Arrays.copyOf(a, n, Object[].class); //检查空值 if (screen && (n == 1 || this.comparator != null)) { for (int i = 0; i < n; ++i) if (a[i] == null) throw new NullPointerException(); } this.queue = a; this.size = n; if (heapify) //建立堆结构 heapify(); } //尝试扩容 private void tryGrow(Object[] array, int oldCap) { //必须先释放锁 lock.unlock(); // must release and then re-acquire main lock Object[] newArray = null; //CAS操作 if (allocationSpinLock == 0 && UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)) { try { //原长度小于64 增长一倍 否则增长原来的二分之一 int newCap = oldCap + ((oldCap < 64) ? (oldCap + 2) : // grow faster if small (oldCap >> 1)); //防止过大 if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow int minCap = oldCap + 1; if (minCap < 0 || minCap > MAX_ARRAY_SIZE) throw new OutOfMemoryError(); newCap = MAX_ARRAY_SIZE; } //新建数组 if (newCap > oldCap && queue == array) newArray = new Object[newCap]; } finally { allocationSpinLock = 0; } } //如果有其他线程正在扩容 则让出 if (newArray == null) // back off if another thread is allocating Thread.yield(); //获取锁 lock.lock(); //如果原数组未改变 则将原数组内容拷贝到新数组 if (newArray != null && queue == array) { queue = newArray; System.arraycopy(array, 0, newArray, 0, oldCap); } } //出队 private E dequeue() { int n = size - 1; if (n < 0) return null; else { Object[] array = queue; //删除元素 E result = (E) array[0]; E x = (E) array[n]; array[n] = null; //调整二叉堆 Comparator<? super E> cmp = comparator; if (cmp == null) siftDownComparable(0, x, array, n); else siftDownUsingComparator(0, x, array, n, cmp); size = n; return result; } } //自上向下调整 private static <T> void siftDownComparable(int k, T x, Object[] array, int n) { if (n > 0) { Comparable<? super T> key = (Comparable<? super T>)x; int half = n >>> 1; // loop while a non-leaf while (k < half) { int child = (k << 1) + 1; // assume left child is least Object c = array[child]; int right = child + 1; if (right < n && ((Comparable<? super T>) c).compareTo((T) array[right]) > 0) c = array[child = right]; if (key.compareTo((T) c) <= 0) break; array[k] = c; k = child; } array[k] = key; } } //二叉堆的调整 private static <T> void siftUpComparable(int k, T x, Object[] array) { Comparable<? super T> key = (Comparable<? super T>) x; while (k > 0) { //计算父节点下标 int parent = (k - 1) >>> 1; Object e = array[parent]; if (key.compareTo((T) e) >= 0) break; array[k] = e; k = parent; } array[k] = key; } //二叉堆的调整 private static <T> void siftUpUsingComparator(int k, T x, Object[] array, Comparator<? super T> cmp) { while (k > 0) { int parent = (k - 1) >>> 1; Object e = array[parent]; if (cmp.compare(x, (T) e) >= 0) break; array[k] = e; k = parent; } array[k] = x; } //添加元素 public boolean offer(E e) { if (e == null) throw new NullPointerException(); final ReentrantLock lock = this.lock; lock.lock(); int n, cap; Object[] array; //自旋锁 while ((n = size) >= (cap = (array = queue).length)) tryGrow(array, cap); try { //添加完 调整堆 Comparator<? super E> cmp = comparator; if (cmp == null) siftUpComparable(n, e, array); else siftUpUsingComparator(n, e, array, cmp); size = n + 1; //唤醒消费者线程 notEmpty.signal(); } finally { lock.unlock(); } return true; } //出队 不会阻塞 public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { return dequeue(); } finally { lock.unlock(); } } //获取元素 public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); E result; try { //如果队列空 阻塞 while ( (result = dequeue()) == null) notEmpty.await(); } finally { lock.unlock(); } return result; } //带超时 public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); E result; try { while ( (result = dequeue()) == null && nanos > 0) nanos = notEmpty.awaitNanos(nanos); } finally { lock.unlock(); } return result; } //遍历 查找元素下标 private int indexOf(Object o) { if (o != null) { Object[] array = queue; int n = size; for (int i = 0; i < n; i++) if (o.equals(array[i])) return i; } return -1; } //移除指定元素 private void removeAt(int i) { Object[] array = queue; int n = size - 1; //如果是最后一个 直接移除 if (n == i) // removed last element array[i] = null; else { E moved = (E) array[n]; array[n] = null; Comparator<? super E> cmp = comparator; //将最后一个元素替换下标i的元素 if (cmp == null) siftDownComparable(i, moved, array, n); else siftDownUsingComparator(i, moved, array, n, cmp); //进行调整 if (array[i] == moved) { if (cmp == null) siftUpComparable(i, moved, array); else siftUpUsingComparator(i, moved, array, cmp); } } size = n; } }

2.2 总结

PriorityBlockingQueue的实现是一个二叉堆,且大小可增长。

只有一个条件锁,如果队列为空,再获取数据时会阻塞

看源码前看懂二叉堆会轻松很多 http://www.cnblogs.com/skywang12345/p/3610187.html

转载请注明原文地址: https://www.6miu.com/read-61251.html

最新回复(0)