Phaser详解
# 简介
一个可重用的同步屏障,在功能上类似于 CyclicBarrier
和 CountDownLatch
,但支持更灵活的使用。
注册:不同于其他屏障不同,在该同步器上注册的数量是可以随时改变的。任务可以在任意时刻注册(使用register
、bulkRegister
方法或者构造函数初始化数量的形式),可以在任意到达时注销(使用arriveAndDeregister
方法)。与大多数基础同步工具一样,注册合注销只是改动内部计数,不会建立内部记录,所以无法查到任务是否已经注册。(可以通过继承此类来实现这一功能。)
同步:与CyclicBarrier
一样,Phaser
可以重复等待。ArriveAndAwaitAdvance
方法与CyclicBarrier.await
类似。每一代Phaser
都关联一个phaser number
,从零开始,当所有任务到达时递增,在达到Integer.MAX_VALUE
时归零。通过下面两类方法,phaser number
可以独立控制到达和等待其他任务的动作。
Arrival
(到达机制)
# 源码分析
# 核心参数
private volatile long state;
private static final int PARTIES_SHIFT = 16;
private static final int PHASE_SHIFT = 32;
//0-15位 记录未到达数量
private static final int UNARRIVED_MASK = 0xffff; // to mask ints
//16-31位 记录总数量 用于进行下一轮等待
private static final long PARTIES_MASK = 0xffff0000L; // to mask longs
private static final long COUNTS_MASK = 0xffffffffL;
//终止位
private static final long TERMINATION_BIT = 1L << 63;
1
2
3
4
5
6
7
8
9
10
11
12
2
3
4
5
6
7
8
9
10
11
12
Phaser
使用一个long
型state
值来标识内部状态:
- 0-15位表示当前轮未到达数量;
- 16-31位表示当前轮总数量;
- 32-62位表示phase当前代;
- 63位表示当前phaser的终止状态。
# 构造函数
public Phaser() {
this(null, 0);
}
public Phaser(int parties) {
this(null, parties);
}
public Phaser(Phaser parent) {
this(parent, 0);
}
public Phaser(Phaser parent, int parties) {
//低16位为未到达数量,超出报错
if (parties >>> PARTIES_SHIFT != 0)
throw new IllegalArgumentException("Illegal number of parties");
int phase = 0;
this.parent = parent;
if (parent != null) {
final Phaser root = parent.root;
this.root = root;
this.evenQ = root.evenQ;
this.oddQ = root.oddQ;
if (parties != 0)
phase = parent.doRegister(1);
}
else {
this.root = this;
this.evenQ = new AtomicReference<QNode>();
this.oddQ = new AtomicReference<QNode>();
}
this.state = (parties == 0) ? (long)EMPTY :
((long)phase << PHASE_SHIFT) |
((long)parties << PARTIES_SHIFT) |
((long)parties);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# doArrive
private int doArrive(int adjust) {
final Phaser root = this.root;
for (;;) {
long s = (root == this) ? state : reconcileState();
//当前进行的轮数
int phase = (int)(s >>> PHASE_SHIFT);
if (phase < 0)
return phase;
//取0-31位 其中低16未为未到达数量
int counts = (int)s;
int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
if (unarrived <= 0)
throw new IllegalStateException(badArrive(s));
if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s-=adjust)) {
if (unarrived == 1) {
//扣减前未到达数量是1,扣减后,那么这一轮全部到达
//获取下一轮phaser数量
long n = s & PARTIES_MASK; // base of next state
//右移16位为下一轮未到达数量
int nextUnarrived = (int)n >>> PARTIES_SHIFT;
//判断是否是根
if (root == this) {
//判断是否终止
if (onAdvance(phase, nextUnarrived))
n |= TERMINATION_BIT;
else if (nextUnarrived == 0)
n |= EMPTY;
else
n |= nextUnarrived;
int nextPhase = (phase + 1) & MAX_PHASE;
n |= (long)nextPhase << PHASE_SHIFT;
//更新状态
UNSAFE.compareAndSwapLong(this, stateOffset, s, n);
//唤醒等待节点
releaseWaiters(phase);
}
else if (nextUnarrived == 0) { // propagate deregistration
//子节点下一轮无计数了,通知父节点扣除
phase = parent.doArrive(ONE_DEREGISTER);
UNSAFE.compareAndSwapLong(this, stateOffset,
s, s | EMPTY);
}
else
//通知父节点子节点到达
phase = parent.doArrive(ONE_ARRIVAL);
}
return phase;
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
编辑 (opens new window)
上次更新: 2023/04/09, 16:34:32