CyclicBarrier详解

# 简介

允许一组线程全部等待彼此到达公共屏障点的同步辅助工具。 CyclicBarriers在涉及必须偶尔相互等待的固定大小线程组的程序中很有用。之所以称为循环屏障,是因为它可以在等待线程被释放后重新使用。 CyclicBarrier支持可选的Runnable命令,该命令在每个障碍点运行一次,在派对中的最后一个线程到达之后,但在释放任何线程之前。此屏障操作对于在任何一方继续之前更新共享状态很有用

# 示例

private class SportsMan{
    private String name;
    private CyclicBarrier cyclicBarrier;

    public SportsMan(String name, CyclicBarrier cyclicBarrier) {
        this.name = name;
        this.cyclicBarrier = cyclicBarrier;
    }

    public void doSports(){
        try {
            System.out.println("运动员" + name + "开始热身");
            Thread.sleep(new Random().nextInt(10) * 100);
            cyclicBarrier.await();
            System.out.println("运动员" + name + "开始运动");
            Thread.sleep(new Random().nextInt(10) * 100);
            cyclicBarrier.await();
            System.out.println("运动员" + name + "开始拉伸");
            Thread.sleep(new Random().nextInt(10) * 100);
            cyclicBarrier.await();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
    public static void main(String[] args){
        CyclicBarrier cyclicBarrier = new CyclicBarrier(3,()->{
            System.out.println("当前阶段全部完成");
        });
        for (int i = 0; i < 3; i++) {
            SportsMan man = new SportsMan(String.valueOf(i),cyclicBarrier);
            new Thread(man::doSports).start();
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

运行结果:

运动员1开始热身
运动员0开始热身
运动员2开始热身
当前阶段全部完成
运动员2开始运动
运动员0开始运动
运动员1开始运动
当前阶段全部完成
运动员1开始拉伸
运动员0开始拉伸
运动员2开始拉伸
当前阶段全部完成
1
2
3
4
5
6
7
8
9
10
11
12

# 源码分析

# 类属性

//当最后一个线程到达屏障点时,会重置屏障点以备下次循环使用,而其尚未被唤醒的线程还处于旧的屏障处,这里通过每次生成新的对象来做屏障点的区分
private static class Generation {
    boolean broken = false;
}

//控制进入屏障点的锁
private final ReentrantLock lock = new ReentrantLock();
//在屏障点等待进入下次循环的条件
private final Condition trip = lock.newCondition();
//参与计数
private final int parties;
//全部到达屏障点后可以执行的操作
private final Runnable barrierCommand;
//当前循环
private Generation generation = new Generation();
//未到达屏障点的线程数
private int count;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

# 构造函数

支持设置一个Runnable命令,在全部到达屏障点时执行。


public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    this.parties = parties;
    this.count = parties;
    this.barrierCommand = barrierAction;
}

public CyclicBarrier(int parties) {
    this(parties, null);
}

1
2
3
4
5
6
7
8
9
10
11
12

# await

await提供无限期等待与限期等待两种方法,底层通过doAwait方法实现

private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
            TimeoutException {
    final ReentrantLock lock = this.lock;
    //进入屏障点前获取锁
    lock.lock();
    try {
        final Generation g = generation;
        //检查当前屏障是否被破坏
        if (g.broken)
            throw new BrokenBarrierException();

        if (Thread.interrupted()) {
            //线程中断,标记屏障破坏
            breakBarrier();
            throw new InterruptedException();
        }
        //到达屏障点,计数减1
        int index = --count;
        if (index == 0) {  // tripped
            //最后一个到达屏障点
            boolean ranAction = false;
            try {
                //如果有到达屏障点命令,执行命令
                final Runnable command = barrierCommand;
                if (command != null)
                    command.run();
                ranAction = true;
                //重置屏障点,下次循环使用
                nextGeneration();
                return 0;
            } finally {
                if (!ranAction)
                    //执行命令失败,破坏屏障
                    breakBarrier();
            }
        }

        // 不是最后一个到达屏障点的,那么需要循环等待
        for (;;) {
            try {
                if (!timed)
                    trip.await();
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                if (g == generation && ! g.broken) {
                    //出现异常,破坏屏障
                    breakBarrier();
                    throw ie;
                } else {
                    Thread.currentThread().interrupt();
                }
            }
            //屏障已被破坏 抛出异常
            if (g.broken)
                throw new BrokenBarrierException();
            //已生成下一次屏障
            if (g != generation)
                return index;

            if (timed && nanos <= 0L) {
                //超时,破坏屏障
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71

# nextGeneration

生成下一个版本,所有线程又可以重新进入屏障。

private void nextGeneration() {
    //唤醒其他等待线程
    trip.signalAll();
    //重置计数
    count = parties;
    generation = new Generation();
}
1
2
3
4
5
6
7

# 总结

CyclicBarrier是一个控制一组线程彼此等待完成事件的工具,并且在所有线程完成后,还可以再次让这组线程彼此等待完成下一件事。

CountDownLatch对比:

  1. CountDownLatch是一部分线程等待另一部分线程。
  2. CountDownLatch是一次性的,CyclicBarrier可重复使用。

有一点需要注意的是,在CyclicBarrier的重复使用过程中,每一次到达屏障点必须是固定数量的所有线程,才能继续重复下一次屏障,否则会阻塞或者超时。通俗一点说,就是这一组线程必须同进同退,不能说其中一个线程执行几轮后结束,剩余线程继续。

上次更新: 2023/04/09, 16:34:32
最近更新
01
docker-compose笔记
01-12
02
MySQL数据迁移
11-27
03
Docker部署服务,避免PID=1
11-27
更多文章>