Java并发编程(六)《并发容器和框架》

xiaoxiao2021-02-27  225

Java并发编程六并发容器和框架1 map死循环2 ConcurrentHashMap分段锁线程安全map 21 数据结构 1 结构2 类图关系3 Segment分段锁4 实际存放元素的HashEntry数组 22 初始化24 定位Segment25 get 操作 251 get操作的高效252 key hash散列定位到具体的Segment桶253 从桶里取出数据 26 put 操作 261 定位Segment要将元素put到那个Segment桶里262 将元素加入Segment的HashEntry数组中263 如何扩容rehash 27 size 和 isEmpty操作 3 ConcurrentLinkedQueue非阻塞线程安全队列 31 offer入队cas32 poll出队cas 4 阻塞线程安全队列 41 ArrayBlockingQueue数组有界阻塞队列 1 数据结构2 offer入队3 超时offer入队4 poll出队5 poll超时出队6 put和take 42 LinkedBlockingQueue链表有界阻塞队列 1 数据结构1 offer2 超时offer3 poll4 超时poll5 put和take6 removeObject o和boolean containsObject o 43 PriorityBlockingQueue优先级无界阻塞队列44 DelayQueue延时优先级无界阻塞队列 1 数据结构2 offer2 poll3 put4 take5 实际运用 45 SynchronousQueue不存储元素的阻塞队列46 LinkedTransferQueue链表无界阻塞队列47 LinkedBlockingDueue链表双向阻塞队列48 阻塞队列原理49 ForkJoin 5 Queue队列 基本操作

Java并发编程(六)《并发容器和框架》

@(并发)

6.1 map死循环

在多线程的情况下,map的put操作会形成循环链表,导致get操作死循环(Infinite Loop),导致Cpu利用率接近100%

6.2 ConcurrentHashMap(分段锁线程安全map)

6.2.1. 数据结构

(1). 结构

(2). 类图关系

ConcurrentHashMap 组合了Segment数组 Segment 组合了HashEntery数组

(3). Segment分段锁

static final class Segment<K,V> extends ReentrantLock implements Serializable { /* * 当前每个Segment里实际包含HashEntry元素的数量 * 变量申明volatile ,它的写立马对后续的读可见 */ transient volatile int count; /* * 当Segment进行put、clear、remove操作时,modCount就+1 * 用来标记Segment容器是否变化 */ transient int modCount; /* * Segment容器中包含HashEntry的元素数量的阈值 * 当容器中元素数量超过threshold时,table将rehash重新散列 * 一般threshold=loadFactor * 容器容量(容器容量不等于包含HashEntry实际数量) */ transient int threshold; /* * Segment容器中包含HashEntry数组 * 变量申明volatile ,它的写立马对后续的读可见 */ transient volatile HashEntry<K,V>[] table; /* * 负载因子,通常等于0.75,最优 * 负载因子较小时,降低了冲突的可能性,链表长度减少,加快查询速度,但是所占空间加大,频繁rehash性能会降低; 负载因子较大时,元素分布比较紧凑,容易导致hash冲突,链表加长,访问更新速度较慢,但是空间利用率高。 */ final float loadFactor; }

注意: 1.Segment容器容量 : 表示HashEntry的数组的大小. 2.Segment的阈值:表示HashEntry的数组的大小 * 负载因子. 3.Segment的元素数量:HashEntry的数组放入的实际元素数量. 4.Segment实际是可重入锁ReentrantLock,具有锁的特性 .

(4). 实际存放元素的HashEntry数组

static final class HashEntry<K,V> { /* * 放入map中元素的key, 声明final, key一旦初始化确定不能更改 */ final K key; /* * 放入map中元素的key的hash值 */ final int hash; /* * 放入map中元素的value */ volatile V value; // 当key的hash冲突时(hash值相同),通过next组成链表,此时查询速度较慢(相对于数组而言) final HashEntry<K,V> next; }

6.2.2. 初始化

/** * 初始化ConcurrentHashMap * (1) ConcurrentHashMap初始化容量:DEFAULT_INITIAL_CAPACITY=16 * (2) 默认负载因子:DEFAULT_LOAD_FACTOR=0.75 * (3) 默认并发度(同时可16个线程进行修改操作):DEFAULT_CONCURRENCY_LEVEL=16 */ public ConcurrentHashMap() { this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL); } /** * 1.初始化用于定位Segment锁位置的散列运算中的segmentShift,segmentMask(掩码) * 2.ConcurrentHashMap中包含的Segment数组,大小为ssize,同时初始化每个Segment中HashEntry数组 * 3.通过以上两步,默认情况下初始化成: * 包含有16个Segment分段锁,每个Segment有1个HashEntry的ConcurrentHashMap */ public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); if (concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS; //求出用于定位参与散列运算的位数sshift int sshift = 0; int ssize = 1; //求出初始化Segment的长度ssize ,即并发度 while (ssize < concurrencyLevel) { ++sshift; ssize <<= 1; } segmentShift = 32 - sshift; //求出参与散列运算的的掩码=散列槽的长度-1 segmentMask = ssize - 1; //初始化ConcurrentHashMap中包含的Segment数组,大小为ssize this.segments = Segment.newArray(ssize); if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY; /** * 计算每个Segment中容纳HashEntry数组的大小 * 规则:将Map设定的初始容量initialCapacity除以Segment分段锁的数量, * 求出每个Segment应该容纳多少HashEntry */ int c = initialCapacity / ssize; if (c * ssize < initialCapacity) ++c; int cap = 1; while (cap < c) cap <<= 1; //初始化每个Segment中HashEntry数组 for (int i = 0; i < this.segments.length; ++i) this.segments[i] = new Segment<K,V>(cap, loadFactor); }

6.2.4. 定位Segment

/** * 通过放入map中元素的key的hashcode再hash的值,取出高4位,同segmentMask掩码15进行与运算(相当于取摸), * 从而定位到具体的Segment槽slot位置 */ final Segment<K,V> segmentFor(int hash) { return segments[(hash >>> segmentShift) & segmentMask]; } private static int hash(int h) { // Spread bits to regularize both segment and index locations, // using variant of single-word Wang/Jenkins hash. h += (h << 15) ^ 0xffffcd7d; h ^= (h >>> 10); h += (h << 3); h ^= (h >>> 6); h += (h << 2) + (h << 14); return h ^ (h >>> 16); }

6.2.5. get 操作

6.2.5.1 get操作的高效

1. get操作不需要加锁。 2. 用volatile替代锁。 3. 将map中使用到共享变量统计Segment大小的count,以及存储值的HashEntry[]数组声明成volatile,使他们能在线程之间保持可见性,能多个线程同时读。 4. 之所以不会读到旧的值,这是由java内存模型的happen before原则决定的,对volatile字段的写先于读操作,即get操作总能拿到最新的值. 5.共享变量如下:

//The number of elements in this segment's region. transient volatile int count; // The per-segment table. transient volatile HashEntry<K,V>[] table; // HashEntry's value. volatile V value;

6.2.5.2 key hash散列定位到具体的Segment桶

public V get(Object key) { int hash = hash(key.hashCode()); return segmentFor(hash).get(key, hash); }

6.2.5.3 从桶里取出数据

V get(Object key, int hash) { /** * count表示:Segment桶里存储的HashEntry数量; * 申明volatile,读取的时候直接从主存取,写的时候直接写到主存里 */ if (count != 0) { // read-volatile //通过hash值找到对应的HashEntry链表的第一个元素 HashEntry<K,V> e = getFirst(hash); //遍历HashEntry e的链表 while (e != null) { //当hash以及key对应的值相等,即返回value if (e.hash == hash && key.equals(e.key)) { V v = e.value; if (v != null) return v; return readValueUnderLock(e); // recheck } e = e.next; } } return null; } /** * 通过hash值找到对应的HashEntry链表的第一个元素 */ HashEntry<K,V> getFirst(int hash) { //table(read-volatile) HashEntry<K,V>[] tab = table; //对table长度取摸,定位的存储位置 return tab[hash & (tab.length - 1)]; }

6.2.6. put 操作

6.2.6.1 定位Segment,要将元素put到那个Segment桶里

public V put(K key, V value) { if (value == null) throw new NullPointerException(); int hash = hash(key.hashCode()); //定位Segmnet,将元素放入其中 return segmentFor(hash).put(key, hash, value, false); }

6.2.6.2 将元素加入Segment的HashEntry数组中

/* *将元素put到对应的Segment的HashEntry[]数组内 */ V put(K key, int hash, V value, boolean onlyIfAbsent) { //put要对共享变量操作,这里加独占锁,只允许一个线程进入,保证线程安全 lock(); try { //获取此时HashEntry[]容纳的元素个数 int c = count; //HashEntry[]大小是否超过阈值threshold,若超过则Hash Table扩容 if (c++ > threshold) // ensure capacity rehash(); HashEntry<K,V>[] tab = table; int index = hash & (tab.length - 1); //通过放入元素的hash值与HashEntry[]长度取摸,定位到具体的HashEntry链表,取第一个元素 HashEntry<K,V> first = tab[index]; HashEntry<K,V> e = first; //遍历定位到具体的HashEntry链表,看put元素的key是否已经存在 while (e != null && (e.hash != hash || !key.equals(e.key))) e = e.next; V oldValue; //若put元素的key值存在,则新值替换旧值 if (e != null) { oldValue = e.value; if (!onlyIfAbsent) e.value = value; } //若put元素的key值不存在,将put的元素构成新HashEntry加入到链表头部,Segment操作变更modCount++,统计HashEntry[]元素个数count else { oldValue = null; ++modCount; tab[index] = new HashEntry<K,V>(key, hash, first, value); count = c; // write-volatile } return oldValue; } finally { //最后释放独占锁,唤醒同步队列中等待的线程 unlock(); } }

6.2.6.3 如何扩容rehash()

6.2.7. size 和 isEmpty操作

(1) 高并发下,判断CurrentHashMap是否为空,isEmpty的效率比size()==0效率高很多. (2) size方法:先尝试2次不锁住Segment的方式,统计每个Segment的count大小,若两次统计的过程中某Segment的modCount发生变化,则将所有的Segment全部锁住,统计count大小。

public int size() { final Segment<K,V>[] segments = this.segments; long sum = 0; long check = 0; int[] mc = new int[segments.length]; // Try a few times to get accurate count. On failure due to // continuous async changes in table, resort to locking. for (int k = 0; k < RETRIES_BEFORE_LOCK; ++k) { check = 0; sum = 0; int mcsum = 0; /** *统计每个segmnet的count,相加 */ for (int i = 0; i < segments.length; ++i) { sum += segments[i].count; mcsum += mc[i] = segments[i].modCount; } /** *看每个segmnet的modCount是否变化,若变化重新统计 */ if (mcsum != 0) { for (int i = 0; i < segments.length; ++i) { check += segments[i].count; if (mc[i] != segments[i].modCount) { check = -1; // force retry break; } } } if (check == sum) break; } if (check != sum) { // Resort to locking all segments sum = 0; /** *将segment全部锁住 */ for (int i = 0; i < segments.length; ++i) segments[i].lock(); for (int i = 0; i < segments.length; ++i) sum += segments[i].count; for (int i = 0; i < segments.length; ++i) segments[i].unlock(); } if (sum > Integer.MAX_VALUE) return Integer.MAX_VALUE; else return (int)sum; }

(3) isEmpty方法:判断ConcurrentHashMap是否为空,看其中每个segment的count是否都为0,若都为0了则再次校验每个segment的count是否都为0,每个segment的modCount是否有变化,若都没变则map为空.

public boolean isEmpty() { final Segment<K,V>[] segments = this.segments; /* * We keep track of per-segment modCounts to avoid ABA * problems in which an element in one segment was added and * in another removed during traversal, in which case the * table was never actually empty at any point. Note the * similar use of modCounts in the size() and containsValue() * methods, which are the only other methods also susceptible * to ABA problems. */ int[] mc = new int[segments.length]; int mcsum = 0; for (int i = 0; i < segments.length; ++i) { if (segments[i].count != 0) return false; else mcsum += mc[i] = segments[i].modCount; } // If mcsum happens to be zero, then we know we got a snapshot // before any modifications at all were made. This is // probably common enough to bother tracking. if (mcsum != 0) { for (int i = 0; i < segments.length; ++i) { if (segments[i].count != 0 || mc[i] != segments[i].modCount) return false; } } return true; }

6.3 ConcurrentLinkedQueue(非阻塞线程安全队列)

6.3.1 offer入队(cas)

6.3.2 poll出队(cas)

6.4 .阻塞线程安全队列

6.4.1 ArrayBlockingQueue(数组有界阻塞队列)

(1) 数据结构

/** * 存放任务的循环数组 */ final Object[] items; /** * 队列头部下标,for next take, poll, peek or remove */ int takeIndex; /** * 添加新元素的下标,队列尾部下标的next,for next put, offer or add */ int putIndex; /** *阻塞队列中当前容纳任务的数量 */ int count; /** * 独占锁,保护队列,让其线程安全 */ final ReentrantLock lock; /** * 队列同步器上的等待队列,等待队列不满 */ private final Condition notFull; /** * 队列同步器上的等待队列,等待队列不空 */ private final Condition notEmpty;

(2) offer()入队

offer操作:

第一个线程进来,加锁锁住整个队列,为了保证队列线程安全,方法只允许同一时间一个线程访问;若队列任务数量count已满,直接返回false;若不满,在队列尾部添加元素并且唤醒notEmpty等待队列头节点去竞争消费阻塞队列。线程释放独占锁。

代码分析:

//队尾入队 public boolean offer(E e) { //1.检查入队元素是否为空,若为空会抛出空指针异常 checkNotNull(e); final ReentrantLock lock = this.lock; //2.加锁,保证只有一个线程进入,其他线程在同步队列上等待 lock.lock(); try { //3.队列已满返回false,否则出队 if (count == items.length) return false; else { insert(e); return true; } } finally { //4.final块释放锁 lock.unlock(); } } //入队操作具体类 private void insert(E x) { //在循环数组尾部的next下标putIndex那,放入新元素 items[putIndex] = x; //修改putIndex+1,若已到数组最后一个元素,putIndex重0开始循环 putIndex = inc(putIndex); //元素容量+1 ++count; //此时队列中有新元素了,不在是空的了,通知notEmpty等待队列中的head节点继续消费了 notEmpty.signal(); } //Circularly decrement i final int inc(int i) { return (++i == items.length) ? 0 : i; }

(3) 超时offer()入队

当队队列满时无法入队,一般直接返回false入队失败,但是超时等待offer方法允许线程在(notFull Condition)上等待一段时间,若在规定的等待时间内,线程被poll出队操作唤醒并且能够得到锁,则可以继续入队。

代码分析:

//超时等待入队 public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { checkNotNull(e); long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) { if (nanos <= 0) return false; //等待nanos纳秒 nanos = notFull.awaitNanos(nanos); } insert(e); return true; } finally { lock.unlock(); } }

(4) poll()出队

直接上代码:

//出队 public E poll() { //1.加锁 final ReentrantLock lock = this.lock; lock.lock(); try { //2.若队列为空,直接返回空,否则返回队列头部元素 return (count == 0) ? null : extract(); } finally { //3.释放元素 lock.unlock(); } } //真实出队操作 private E extract() { final Object[] items = this.items; //将队头takeIndex数组元素出队 E x = this.<E>cast(items[takeIndex]); //help-gc items[takeIndex] = null; //takeIndex+1,若已到数组最后一个元素,takeIndex重0开始循环 takeIndex = inc(takeIndex); --count; //此时队列出队后,元素不满了,要通知notFull condition继续生产元素了 notFull.signal(); return x; } final int inc(int i) { return (++i == items.length) ? 0 : i; }

(5) poll()超时出队

若队列此时没有元素,可以在notEmpt condition上等待一段时间,若在规定的时间内有生产者线程signal,并且能够竞争到锁,允许其继续出队。

代码分析:

//超时出队 public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) { if (nanos <= 0) return null; //等待时间纳秒级别 nanos = notEmpty.awaitNanos(nanos); } return extract(); } finally { lock.unlock(); } }

(6) put()和take()

put操作同样是入队操作,当队列满时无法入队,线程会在notFull condition上一直等待阻塞住,直到出队线程通知。take操作同样是出队操作,当对列为空时无法出队,线程会在notEmpty condition一直等待阻塞住,直到入队线程通知。

代码分析:

public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) //队列满,线程等待 notFull.await(); insert(e); } finally { lock.unlock(); } } public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { //队列空,线程等待 while (count == 0) notEmpty.await(); return extract(); } finally { lock.unlock(); } }

6.4.2 LinkedBlockingQueue(链表有界阻塞队列)

(1) 数据结构

public class LinkedBlockingQueue<E>{ //队列容量 private final int capacity; //此时队列元素数量,因为入队出队可以并发进行,统计数量要用原子类型 private final AtomicInteger count = new AtomicInteger(0); //队头,实际头部的prev节点 private transient Node<E> head; //队尾,实际尾部的next节点 private transient Node<E> last; /** 入队持有锁,put, offer, etc */ private final ReentrantLock putLock = new ReentrantLock(); /** 入队时的等待队列 */ private final Condition notFull = putLock.newCondition(); /** 出队持有锁,take, poll, etc */ private final ReentrantLock takeLock = new ReentrantLock(); /** 出队时的等待队列 */ private final Condition notEmpty = takeLock.newCondition(); /** * 队列节点对象 */ static class Node<E> { //节点存放的具体元素 E item; /** * One of: * - the real successor Node * - this Node, meaning the successor is head.next * - null, meaning there is no successor (this is the last node) */ //当前节点的next Node<E> next; Node(E x) { item = x; } } }

(1) offer()

/** *入队操作,若队列已满则返回false,否则入队 *入队操作,只是对队尾节点last共享变量的操作,所以只要持有putLock就好 */ public boolean offer(E e) { if (e == null) throw new NullPointerException(); //1.获取队列元素数量 final AtomicInteger count = this.count; //2.若队列满,直接返回false,入队失败 if (count.get() == capacity) return false; int c = -1; //3.队列未满,可以入队了,将添加元素包装成节点元素node Node<E> node = new Node(e); final ReentrantLock putLock = this.putLock; //4.入队,将队列的尾节点last->next=当前node,所以要持有putLock putLock.lock(); try { if (count.get() < capacity) { enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } } finally { //5.释放putLoack putLock.unlock(); } if (c == 0) //6.若添加之前队列元素为空,此时需要通知消费者,即notEmpty等待队列,可以poll,take了 signalNotEmpty(); return c >= 0; } //入队真实操作,将新节点加到队列尾部,last指向当前节点 private void enqueue(Node<E> node) { // assert putLock.isHeldByCurrentThread(); // assert last.next == null; last = last.next = node; }

(2) 超时offer()

public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { if (e == null) throw new NullPointerException(); long nanos = unit.toNanos(timeout); int c = -1; final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { //当队列满时,线程可以等待一段时间,再次判断队列是否已满,尝试入队 while (count.get() == capacity) { if (nanos <= 0) return false; nanos = notFull.awaitNanos(nanos); } enqueue(new Node<E>(e)); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); return true; }

(3) poll()

/** *出队操作,若队列为空则返回null,否则可以出队 *出队操作,只是对队头节点head共享变量的操作,所以只要持有takeLock就好 */ public E poll() { final AtomicInteger count = this.count; //1.获取队列元素个数,若等于0,出队操作直接返回空 if (count.get() == 0) return null; E x = null; int c = -1; final ReentrantLock takeLock = this.takeLock; //2.尝试持有takeLock takeLock.lock(); try { //3.再次判断元素个数>0,出队 if (count.get() > 0) { x = dequeue(); //元素个数线程安全的+1 c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } } finally { //4.释放锁 takeLock.unlock(); } if (c == capacity) //5.若之前队列已满,这里出队了,此时队列不满了,要通知到notFull等待队列 signalNotFull(); return x; } //真实出队操作 private E dequeue() { // assert takeLock.isHeldByCurrentThread(); // assert head.item == null; //获取队头 Node<E> h = head; //实际出队的节点是head的next Node<E> first = h.next; h.next = h; // 将之前的head节点断开引用,help GC head = first; E x = first.item; first.item = null; return x; }

(4) 超时poll()

(5) put()和take()

put操作同样是入队操作,当队列满时无法入队,线程会在notFull condition上一直等待阻塞住,直到出队线程通知。take操作同样是出队操作,当对列为空时无法出队,线程会在notEmpty condition一直等待阻塞住,直到入队线程通知。

(6) remove(Object o)和boolean contains(Object o)

注意remove某个元素或者是否包含某个元素的时候,因为不知道元素是队头或者队尾,还是中间元素,所以要对putLock和takeLock都要加锁,才能进行操作。

6.4.3 PriorityBlockingQueue(优先级无界阻塞队列)

内部维护一个有顺序的object数组,默认按优先级升序排序(插入排序),出队直接取第一个元素。

6.4.4 DelayQueue(延时优先级无界阻塞队列)

(1) 数据结构

public class DelayQueue<E extends Delayed>{ /** 入队出队都需要持有这把锁 */ private final ReentrantLock lock= new ReentrantLock(); /** 用于根据delay时间排序的优先级队列,每次poll操作取的头结点 */ private final PriorityQueue<E> q = new PriorityQueue<E>(); /*优化阻塞通知的线程元素leader(Leader-Follower模式), *leader线程不为空,表示有线程在消费头结点,其他线程follower直接等待await。 */ private Thread leader = null; /*用于实现阻塞和通知的Condition对象 *等待的场景:队列为空、leader线程不为空(代表有线程在消费头结点)、头结点过期时间没到 *通知的场景:offer时新节点成为头节点、leader线程消费完头节点 */ private final Condition available = lock.newCondition(); } //延时队列中的元素必须继承这个Delayed,实现getDelay,Comparable方法 public interface Delayed extends Comparable<Delayed> { //用于获取元素的延时时间点,精确到纳秒级别 long getDelay(TimeUnit unit); }

(2) offer()

加锁将元素添加到优先级队列中若添加的元素经过优先级排序后排在队首设置leader为null,通知等待在avaliable上的头节点线程消费。释放锁 public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { q.offer(e); if (q.peek() == e) { leader = null; available.signal(); } return true; } finally { lock.unlock(); } }

(2) poll()

加锁查看优先级队列的头部。若头节点为空或者deplay延时时间没有到,则返回空,否则出队。释放锁 public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { E first = q.peek(); if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0) return null; else return q.poll(); } finally { lock.unlock(); } }

(3) put()

(4) take()

加锁查看队列的头节点,若为空则线程进入等待。获取头节点的delay,若delay到期了,则出队成功。判断leader 线程是否为空,若不为空表示已经有线程在等待消费了,这里直接阻塞线程。leader为空,获取当前线程,设置leader线程==当前线程,当前线程等待delay到期,在finally块中释放leader元素的引用.循环执行2-5如果leader为空并且优先级队列不为空的情况下(判断还有没有其他后续节点),调用signal通知其他的线程.释放锁。 public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { E first = q.peek(); if (first == null) available.await(); else { long delay = first.getDelay(TimeUnit.NANOSECONDS); if (delay <= 0) return q.poll(); else if (leader != null) available.await(); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && q.peek() != null) available.signal(); lock.unlock(); } }

(5) 实际运用

缓存过期健清除: 将cache中的设置了过期时间的元素放入一个DelayQueue中,后台有一个线程不停的poll头节点,将最早过期的元素从缓存中清除掉。

6.4.5 SynchronousQueue(不存储元素的阻塞队列)

6.4.6 LinkedTransferQueue(链表无界阻塞队列)

6.4.7 LinkedBlockingDueue(链表双向阻塞队列)

6.4.8 阻塞队列原理

同步队列器等待通知机制,阻塞队列和等待队列交互的结果。

6.4.9 Fork/Join

原理:工作窃取算法(双端队列),当一个线程工作结束,不会销毁而是取其他工作队列的尾部任务来继续执行。最大限度的利用线程,代码太复杂以后在研究。

6.5 Queue队列

基本操作

public interface Queue<E> extends Collection<E> { /** * 入队,如果队列已满,则抛出异常 */ boolean add(E e); /** * 入队,如果队列已满,则返回false,不抛异常 */ boolean offer(E e); /** * 出队,移除并返回队列头部的元素,如果队列为空,则抛出异常 */ E remove(); /** * 出队,移除并返回队列头部的元素,如果队列为空,则返回false,不抛异常 */ E poll(); /** * 出队,不移除只返回队列头部的元素,如果队列为空,则抛出异常 */ E element(); /** * 出队,不移除只返回队列头部的元素,如果队列为空,则返回false,不抛异常 */ E peek(); }
转载请注明原文地址: https://www.6miu.com/read-11735.html

最新回复(0)