CyclicBarrier详解
# 简介
允许一组线程全部等待彼此到达公共屏障点的同步辅助工具。 CyclicBarriers
在涉及必须偶尔相互等待的固定大小线程组的程序中很有用。之所以称为循环屏障,是因为它可以在等待线程被释放后重新使用。
CyclicBarrier支持可选的Runnable命令,该命令在每个障碍点运行一次,在派对中的最后一个线程到达之后,但在释放任何线程之前。此屏障操作对于在任何一方继续之前更新共享状态很有用
# 示例
private class SportsMan{
private String name;
private CyclicBarrier cyclicBarrier;
public SportsMan(String name, CyclicBarrier cyclicBarrier) {
this.name = name;
this.cyclicBarrier = cyclicBarrier;
}
public void doSports(){
try {
System.out.println("运动员" + name + "开始热身");
Thread.sleep(new Random().nextInt(10) * 100);
cyclicBarrier.await();
System.out.println("运动员" + name + "开始运动");
Thread.sleep(new Random().nextInt(10) * 100);
cyclicBarrier.await();
System.out.println("运动员" + name + "开始拉伸");
Thread.sleep(new Random().nextInt(10) * 100);
cyclicBarrier.await();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public static void main(String[] args){
CyclicBarrier cyclicBarrier = new CyclicBarrier(3,()->{
System.out.println("当前阶段全部完成");
});
for (int i = 0; i < 3; i++) {
SportsMan man = new SportsMan(String.valueOf(i),cyclicBarrier);
new Thread(man::doSports).start();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
运行结果:
运动员1开始热身
运动员0开始热身
运动员2开始热身
当前阶段全部完成
运动员2开始运动
运动员0开始运动
运动员1开始运动
当前阶段全部完成
运动员1开始拉伸
运动员0开始拉伸
运动员2开始拉伸
当前阶段全部完成
1
2
3
4
5
6
7
8
9
10
11
12
2
3
4
5
6
7
8
9
10
11
12
# 源码分析
# 类属性
//当最后一个线程到达屏障点时,会重置屏障点以备下次循环使用,而其尚未被唤醒的线程还处于旧的屏障处,这里通过每次生成新的对象来做屏障点的区分
private static class Generation {
boolean broken = false;
}
//控制进入屏障点的锁
private final ReentrantLock lock = new ReentrantLock();
//在屏障点等待进入下次循环的条件
private final Condition trip = lock.newCondition();
//参与计数
private final int parties;
//全部到达屏障点后可以执行的操作
private final Runnable barrierCommand;
//当前循环
private Generation generation = new Generation();
//未到达屏障点的线程数
private int count;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 构造函数
支持设置一个Runnable
命令,在全部到达屏障点时执行。
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
public CyclicBarrier(int parties) {
this(parties, null);
}
1
2
3
4
5
6
7
8
9
10
11
12
2
3
4
5
6
7
8
9
10
11
12
# await
await
提供无限期等待与限期等待两种方法,底层通过doAwait
方法实现
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
//进入屏障点前获取锁
lock.lock();
try {
final Generation g = generation;
//检查当前屏障是否被破坏
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) {
//线程中断,标记屏障破坏
breakBarrier();
throw new InterruptedException();
}
//到达屏障点,计数减1
int index = --count;
if (index == 0) { // tripped
//最后一个到达屏障点
boolean ranAction = false;
try {
//如果有到达屏障点命令,执行命令
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
//重置屏障点,下次循环使用
nextGeneration();
return 0;
} finally {
if (!ranAction)
//执行命令失败,破坏屏障
breakBarrier();
}
}
// 不是最后一个到达屏障点的,那么需要循环等待
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
//出现异常,破坏屏障
breakBarrier();
throw ie;
} else {
Thread.currentThread().interrupt();
}
}
//屏障已被破坏 抛出异常
if (g.broken)
throw new BrokenBarrierException();
//已生成下一次屏障
if (g != generation)
return index;
if (timed && nanos <= 0L) {
//超时,破坏屏障
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# nextGeneration
生成下一个版本,所有线程又可以重新进入屏障。
private void nextGeneration() {
//唤醒其他等待线程
trip.signalAll();
//重置计数
count = parties;
generation = new Generation();
}
1
2
3
4
5
6
7
2
3
4
5
6
7
# 总结
CyclicBarrier
是一个控制一组线程彼此等待完成事件的工具,并且在所有线程完成后,还可以再次让这组线程彼此等待完成下一件事。
与CountDownLatch
对比:
CountDownLatch
是一部分线程等待另一部分线程。CountDownLatch
是一次性的,CyclicBarrier
可重复使用。
有一点需要注意的是,在CyclicBarrier
的重复使用过程中,每一次到达屏障点必须是固定数量的所有线程,才能继续重复下一次屏障,否则会阻塞或者超时。通俗一点说,就是这一组线程必须同进同退,不能说其中一个线程执行几轮后结束,剩余线程继续。
编辑 (opens new window)
上次更新: 2023/04/09, 16:34:32