AbstractQueuedSynchronizer源码分析
AQS 是一个用来构建锁和同步器的框架。
AQS核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制AQS是用CLH队列锁实现的,即将暂时获取不到锁的线程加入到队列中。
成员变量如下:
//队列头
private transient volatile Node head;
//队列尾
private transient volatile Node tail;
//资源锁定状态 同时也是加锁计数器
private volatile int state;
2
3
4
5
6
# Node对象存储线程节点信息
static final class Node {
/** 共享模式 */
static final Node SHARED = new Node();
/** 独占模式 */
static final Node EXCLUSIVE = null;
/** 当前线程被取消 */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** 当前节点在等待condition,也就是在condition队列中 */
static final int CONDITION = -2;
/** 当前场景下后续的acquireShared能够得以执行 */
static final int PROPAGATE = -3;
//节点状态
volatile int waitStatus;
// 前驱节点
volatile Node prev;
//后继节点
volatile Node next;
//入队的线程
volatile Thread thread;
Node nextWaiter;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
每个线程被阻塞的线程都会被封装成一个Node结点,放入队列。每个节点包含了一个Thread类型的引用,并且每个节点都存在一个状态,具体状态如下。
CANCELLED
,值为1,表示当前的线程被取消。SIGNAL
,值为-1,表示当前节点的后继节点包含的线程需要运行,需要进行unpark操作。CONDITION
,值为-2,表示当前节点在等待condition,也就是在condition queue中。PROPAGATE
,值为-3,表示当前场景下后续的acquireShared能够得以执行。值为0,表示当前节点在sync queue中,等待着获取锁
# 通过CAS修改节点状态
初始化关键字段在类中的偏移量
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;
static {
try {
stateOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
headOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
tailOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
waitStatusOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("waitStatus"));
nextOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("next"));
} catch (Exception ex) { throw new Error(ex); }
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
加锁状态、出队入队都是通过 CAS 机制完成。使用 Unsafe 类获取关键字段偏移量,通过 Unsafe 类提供的方法进行修改。
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
private final boolean compareAndSetHead(Node update) {
return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
private static final boolean compareAndSetWaitStatus(Node node, int expect, int update) {
return unsafe.compareAndSwapInt(node, waitStatusOffset, expect, update);
}
private static final boolean compareAndSetNext(Node node, Node expect, Node update) {
return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
# 抢占锁
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
// 独占模式 加入等待队列
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
2
3
4
5
6
tryAcquire方法留给子类实现,有公平锁与非公平锁的实现方式。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
// 构建新的尾节点,前驱指向旧的尾节点
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//循环CAS入队
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
// 队列头是一个空Node
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 前驱节点
final Node p = node.predecessor();
// 前驱结点是head,则轮到当前节点去获取锁
if (p == head && tryAcquire(arg)) {
//获取锁成功,更新头节点
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 未轮到当前节点获取锁 或者获取锁失败 进入阻塞状态
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 除了unpark,中断也会会唤醒 park的线程,记录一下中断返回给调用方
interrupted = true;
}
} finally {
if (failed)
//异常跳出循环才会走到这里
cancelAcquire(node);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
未抢占成功的线程被阻塞,当再次被唤醒时,检查前驱节点是否是队首,如果是则说明轮到自己去抢占锁。抢占失败或未轮到自己,则重新进入阻塞状态。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
//前驱节点被设置为SIGNAL状态,说明前驱节点已经阻塞,那么当前线程也是可以阻塞的,直接返回true
return true;
if (ws > 0) {
// CANCELLED 状态的从队列中去除
// 这里不用考虑并发是因为节点只会在尾部追加 变为CANCELLED状态的节点不会再修改成其它状态
// 所以不会有非CANCELLED节点被跳过
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//Node节点waitStatus初始值是0 因为获取锁失败,将前驱节点状态改为SIGNAL 外部进行重试,在下次重试时可以返回true
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
申请锁出错,清除出等待队列。这里需要注意的是防止信号量丢失,导致无法唤醒等待队列中的线程节点。
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
node.thread = null;
// 跳过取消状态的节点 找到有效的前驱结点
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// 有效前驱节点的后继 后面修改用
Node predNext = pred.next;
// 变更状态不受其他线程影响
// 其他线程在获取锁时 在shouldParkAfterFailedAcquire方法中,会自动将前驱节点跳过该节点
node.waitStatus = Node.CANCELLED;
// 当前节点是尾结点,那么后继可以置空了
// 如果compareAndSetTail执行失败,说明有并发,尾结点不是当前节点了
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
// 如果前驱节点等待获取信号量,则从等待队列中移除当前节点
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
//这里为啥不更新next节点的前驱来加快GC呢?
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
// 走入该分支 有两种情况
// 1. 当前节点是队首待执行的节点,所以要唤醒后继节点
// 2. 前驱节点也CANCEL了 (这种情况下,假如前驱节点是队首待执行节点,虽然在他的线程中也会走到这里,去唤醒后继节点,但是可能此时当前节点还未设为CANCEL,那么选中的后继节点就是当前节点了) 为了防止信号量丢失 也要唤醒后继节点
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
唤醒后继节点
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 如果后继节点也CANCEL了,那么就从尾部向前遍历 找排在最前的等待中的节点
// 不能从前向后找, 因为节点已经CANCEL 可能后继节点也被置空了
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 可中断式抢占
区别点就是被中断唤醒的话直接抛出InterruptedException
异常
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 计时等待式抢占
使用LockSupport
的计时阻塞方法parkNanos
,超时唤醒线程。
并且在循环中加入了计时判断,超时跳出循环,返回调用方抢占结果。
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
//超时跳出循环
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
//计时阻塞
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# 锁释放
tryRelease
方法由子类实现,主要是扣减锁计数。
锁释放后,需要主动唤醒后继等待节点,传递信号量。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
//不为0, 说明后继节点需要信号量,唤醒后继节点
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
2
3
4
5
6
7
8
9
10
这里需要注意一点,对于重入锁,会多次调用release
方法,但只有最后一次锁计数为0时才真正需要唤醒后继节点。
以ReentrantLock.Sync
实现的tryRelease
为例:
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 锁计数为0 该方法才会返回true
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
2
3
4
5
6
7
8
9
10
11
12
13
# AbstractQueuedLongSynchronizer
功能与 AbstractQueuedSynchronizer
一致,只不过锁计数是long
类型,可以支持更大数量。