CountDownLatch 和CyclicBarrier 和 Semaphore 在使用的时候有类似,底层实现也很相似的。将分布介绍一下,
CountDownLatch 和lock的机制很相似,(参考lock:jdk 源码分析(7)java ReentrantLock结构)
相同点:
1)lock需要队列保存竞争的线程,
CountDownLatch需要队列(链表)保持等待的线程线程,
2)都需要park和unpark。
不同点:
1)lock (重入锁)是争论的谁能将state 变成1,而
CountDownLatch 是大家一起齐心协力将state 变成0,
2)
lock 是改变不了state的线程park,而
CountDownLatch 是需要等待的线程park,
3)
lock的upark 是另一个线程不再使用state时,选择队列的第一个线程park,而
CountDownLatch是其他线程一起将state 从一个非零值变成零后 park.
1)CountDownLatch
最后是改变
的AbstractQueuedSynchronizer的state 和lock 一样
public CountDownLatch(
int count) {
if (count <
0)
throw new IllegalArgumentException(
"count < 0")
;
this.
sync =
new Sync(count)
;
}
1)
CountDownLatch
await()方法主要调用这个方法,一般await用来暂停需要等待的线程(比如主线程)
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { //首先会添加到链表中,这里会记录当前线程,方法取消等待时能找到位置。
final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } //让当前线程进入等待,使用supportLock.park 方法
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
countDown 方法:
public final boolean releaseShared(int arg) { //这里主要是判断是否已经达release 要求,
if (tryReleaseShared(arg)) { //释放park
doReleaseShared(); return true; } return false; }
细看上面里面的实现:
tryReleaseShared : 每次会减少1,一直到==0 否则都是范围false,false 就不会执行
doReleaseShared
();
protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero //这里采用了循环,这是乐观锁的一种实现,一直尝试,直到成功,
for (;;) { int c = getState(); if (c == 0) return false; // 每次会减少1
int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } }
最后看看doReleaseShared()
private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases //h 里面保存了await暂停的线程,直接释放就可以了,完成了等待过程。
unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
CountDownLatch 解析过程结束: