Condition 源码分析
Condition
结合 ReentrantLock
使用,实现线程竞争同一把锁,但是可以在不同条件下等待与唤醒的功能。
功能类似 synchronized
代码块中,锁对象可以使用 wait
、notify
、notifyALL
方法进行等待与唤醒操作,不过这些方法只能在锁对象上等待,而 Condition
可以创建多个条件。
# Condition 接口
public interface Condition {
void await() throws InterruptedException;
void awaitUninterruptibly();
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
boolean awaitUntil(Date deadline) throws InterruptedException;
void signal();
void signalAll();
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
接口主要定义了等待遇唤醒的方法。实现类是AbstractQueuedSynchronizer.ConditionObject
成员变量:
public class ConditionObject implements Condition, java.io.Serializable {
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
}
2
3
4
5
6
内部使用AQS的Node对象保存线程信息,并按照等待顺序保存成链表结构,记录首尾节点信息。
# await 阻塞阶段
# condition的等待队列
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//添加到该条件的等待队列中
Node node = addConditionWaiter();
//释放锁占用记录
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
//阻塞 等待
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
// 处理中断
reportInterruptAfterWait(interruptMode);
}
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
//清除链表中状态不为CONDITION的节点
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# 释放锁
final int fullyRelease(Node node) {
boolean failed = true;
try {
// 获取AQS的锁计数
int savedState = getState();
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
//释放失败 标记为CANCELLED状态
node.waitStatus = Node.CANCELLED;
}
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
// 当前线程是获取锁的,其他线程在AQS等待队列中,唤醒头部等待节点,防止信号量丢失
unparkSuccessor(h);
return true;
}
return false;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
tryRelease
方法位于AbstractQueuedSynchronizer
中,留给之类实现,以ReentrantLock.Sync
为例:
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
// 校验当前线程必须是获取锁的线程
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
//置空AQS独占线程,即释放锁
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
从这里可以看出,await
方法必须在lock.lock()
与lock.unlock()
之间使用,即必须在获取锁时使用。否则会抛出IllegalMonitorStateException
异常。
# 阻塞
while (!isOnSyncQueue(node)) {
//阻塞
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
2
3
4
5
6
判断节点是否在AQS等待队列中,新建的节点首先会添加到condition
的等待队列中,不在AQS的等待队列。
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) // If has successor, it must be on queue
return true;
return findNodeFromTail(node);
}
private boolean findNodeFromTail(Node node) {
Node t = tail;
for (;;) {
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
然后调用LockSupport.park(this);
线程阻塞,等待signal
或signalAll
方法执行。
# signal 传递信号量
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
// 设置是失败 那么当前节点是CANCELLED状态,返回false 外部同步信号量给下一个节点
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
// 加入同步队列 得到前驱节点
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
// 前驱节点CANCELLED 唤醒当前线程
// 这里虽然唤醒了,但是节点不是同步队列的首节点,还是会被阻塞,这一步的意义是什么?
LockSupport.unpark(node.thread);
return true;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
signal
方法将节点从条件队列转移到同步队列。如果前驱节点不是CANCELLED
状态,不会唤醒线程。
也就是进入同步队列,等待其他拥有锁的线程执行完,在释放锁的方法中调用unparkSuccessor
唤醒同步队列中的节点,当轮到这个转移节点时对应线程才会被唤醒。
需要注意的是这种转移节点的线程阻塞位置位于await
方法的LockSupport.park(this);
处。
# await 唤醒阶段
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
// 唤醒后从这里继续
// 检查在阻塞阶段是否线程中断
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//获取锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
// 处理中断
reportInterruptAfterWait(interruptMode);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
如果线程是中断唤醒,那break跳出循环。
如果是其他线程释放锁后唤醒,此时节点位于同步队列首位,isOnSyncQueue
方法为true,也会跳出循环。
中断时机有两种,一种是还在条件队列中被中断,一种是转移到同步队列后中断。
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0;
}
final boolean transferAfterCancelledWait(Node node) {
// 状态还是CONDITION,说明在条件队列时中断
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
//为啥还要加入同步队列呢?
enq(node);
return true;
}
//状态不是CONDITION了,但是还没加入同步队列 ,应该是signal方法还在执行enq 先让步,等会再看
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
在被唤醒后还会调用acquireQueued
方法获取锁,
如果是其他线程释放锁后唤醒的该线程,那么节点位于队首,执行该方法直接获取锁,移除同步队列。
如果不是队首(中断唤醒、或者是在其他线程调用signal
方法发现转移到同步队列后前驱节点CANCELLED
唤醒),那么会再次阻塞。
这里有个疑问,中断唤醒后为直接给节点标记为CANCELLED就行了呢,为啥还要在同步队列上等待呢,甚至在条件队列时中断的还要继续转移到同步队列,然后再阻塞等待。