Phaser详解

# 简介

一个可重用的同步屏障,在功能上类似于 CyclicBarrierCountDownLatch,但支持更灵活的使用。

注册:不同于其他屏障不同,在该同步器上注册的数量是可以随时改变的。任务可以在任意时刻注册(使用registerbulkRegister方法或者构造函数初始化数量的形式),可以在任意到达时注销(使用arriveAndDeregister方法)。与大多数基础同步工具一样,注册合注销只是改动内部计数,不会建立内部记录,所以无法查到任务是否已经注册。(可以通过继承此类来实现这一功能。)

同步:与CyclicBarrier一样,Phaser可以重复等待。ArriveAndAwaitAdvance方法与CyclicBarrier.await类似。每一代Phaser都关联一个phaser number,从零开始,当所有任务到达时递增,在达到Integer.MAX_VALUE时归零。通过下面两类方法,phaser number可以独立控制到达和等待其他任务的动作。

  • Arrival(到达机制)

# 源码分析

# 核心参数

private volatile long state;

private static final int  PARTIES_SHIFT   = 16;
private static final int  PHASE_SHIFT     = 32;
//0-15位 记录未到达数量
private static final int  UNARRIVED_MASK  = 0xffff;      // to mask ints
//16-31位 记录总数量 用于进行下一轮等待
private static final long PARTIES_MASK    = 0xffff0000L; // to mask longs
private static final long COUNTS_MASK     = 0xffffffffL;
//终止位
private static final long TERMINATION_BIT = 1L << 63;

1
2
3
4
5
6
7
8
9
10
11
12

Phaser使用一个longstate值来标识内部状态:

  • 0-15位表示当前轮未到达数量;
  • 16-31位表示当前轮总数量;
  • 32-62位表示phase当前代;
  • 63位表示当前phaser的终止状态。

# 构造函数

public Phaser() {
    this(null, 0);
}

public Phaser(int parties) {
    this(null, parties);
}

public Phaser(Phaser parent) {
    this(parent, 0);
}

public Phaser(Phaser parent, int parties) {
    //低16位为未到达数量,超出报错
    if (parties >>> PARTIES_SHIFT != 0)
        throw new IllegalArgumentException("Illegal number of parties");
    int phase = 0;
    this.parent = parent;
    if (parent != null) {
        final Phaser root = parent.root;
        this.root = root;
        this.evenQ = root.evenQ;
        this.oddQ = root.oddQ;
        if (parties != 0)
            phase = parent.doRegister(1);
    }
    else {
        this.root = this;
        this.evenQ = new AtomicReference<QNode>();
        this.oddQ = new AtomicReference<QNode>();
    }
    this.state = (parties == 0) ? (long)EMPTY :
        ((long)phase << PHASE_SHIFT) |
        ((long)parties << PARTIES_SHIFT) |
        ((long)parties);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36

# doArrive

private int doArrive(int adjust) {
    final Phaser root = this.root;
    for (;;) {
        long s = (root == this) ? state : reconcileState();
        //当前进行的轮数
        int phase = (int)(s >>> PHASE_SHIFT);
        if (phase < 0)
            return phase;
        //取0-31位 其中低16未为未到达数量
        int counts = (int)s;
        int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
        if (unarrived <= 0)
            throw new IllegalStateException(badArrive(s));
        if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s-=adjust)) {
            if (unarrived == 1) {
                //扣减前未到达数量是1,扣减后,那么这一轮全部到达
                //获取下一轮phaser数量
                long n = s & PARTIES_MASK;  // base of next state
                //右移16位为下一轮未到达数量
                int nextUnarrived = (int)n >>> PARTIES_SHIFT;
                //判断是否是根
                if (root == this) {
                    //判断是否终止
                    if (onAdvance(phase, nextUnarrived))
                        n |= TERMINATION_BIT;
                    else if (nextUnarrived == 0)
                        n |= EMPTY;
                    else
                        n |= nextUnarrived;
                    int nextPhase = (phase + 1) & MAX_PHASE;
                    n |= (long)nextPhase << PHASE_SHIFT;
                    //更新状态
                    UNSAFE.compareAndSwapLong(this, stateOffset, s, n);
                    //唤醒等待节点
                    releaseWaiters(phase);
                }
                else if (nextUnarrived == 0) { // propagate deregistration
                    //子节点下一轮无计数了,通知父节点扣除
                    phase = parent.doArrive(ONE_DEREGISTER);
                    UNSAFE.compareAndSwapLong(this, stateOffset,
                                              s, s | EMPTY);
                }
                else
                    //通知父节点子节点到达
                    phase = parent.doArrive(ONE_ARRIVAL);
            }
            return phase;
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
上次更新: 2023/04/09, 16:34:32
最近更新
01
docker-compose笔记
01-12
02
MySQL数据迁移
11-27
03
Docker部署服务,避免PID=1
11-27
更多文章>