上一个章节分析了CountDownLatch,
CountDownLatch 是等待所有线程都执行完后集合,而
CyclicBarrier 是大家一起准备,然后一起开始。比起玩网络游戏时,需要等待大家都准备好之后再开始,又比如,测试时需要测试并发量,如果直接用for 循环,然后start 线程,因为每个线程启动需要时间,没有在已经start后,在同时起来好,不过不管哪种方式都不太精确,因为底层还是串行的。只适合一般测试。
CyclicBarrier 和
CountDownLatch代码相似是
CyclicBarrier也是维护一个变量Count
,
1)
CyclicBarrier 自己维护变量个数count ,可以重置,然后继续下一个循环。这里的count 表示当前需要进程参与,parties 表示总共需要多少线程参于barrierCommand,是启动等待线程是会启动的线程。
当启动wait线程是,会重新将parties 赋值给count,重新开始下一轮。相信qq游戏120的比赛时就是这么做的。
public CyclicBarrier(
int parties
, Runnable barrierAction) {
if (parties <=
0)
throw new IllegalArgumentException()
;
this.
parties = parties
;
this.
count = parties
;
this.
barrierCommand = barrierAction
;
}
2)CyclicBarrier 里面使用lock 所以不需要自己存储线程,只需要统计个数就行。
private final Condition
trip =
lock.newCondition()
;
3)主要方法:
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); try { final Generation g = generation; if (g.broken) throw new BrokenBarrierException(); if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } int index = --count; //所有的都已经准备好了
if (index == 0) { // tripped boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; //这里里面会调用trip.signalAll,启动所有等待,并重新开始下一轮count
nextGeneration(); return 0; } finally { if (!ranAction) breakBarrier(); } } // loop until tripped, broken, interrupted, or timed out //如果没有将count==0 ,就会进入等待。
for (;;) { try { if (!timed) trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { ... } if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } }
上面的nextGeneration 代码:Generation默认break是false:
//这里里面会调用trip.signalAll,启动所有等待,并重新开始下一轮count
private void nextGeneration() {
// signal completion of last generation
trip.signalAll()
;
// set up next generation
count =
parties;
generation =
new Generation()
;
}
CyclicBarrier 的实现很简单,
1)定义两个变量
parties
和count,
2)每次线程将count减少1,如果减少完之后count!=0,进程竟然await 。如果count==0. signalAll所有线程,同时将count置为
parties,开始新的一轮
3)Generation是里面的一个标准,用于中断
CyclicBarrier等。所以新的一轮是会重新new 一个
Generation