Condition 源码分析

Condition 结合 ReentrantLock 使用,实现线程竞争同一把锁,但是可以在不同条件下等待与唤醒的功能。

功能类似 synchronized 代码块中,锁对象可以使用 waitnotifynotifyALL 方法进行等待与唤醒操作,不过这些方法只能在锁对象上等待,而 Condition 可以创建多个条件。

# Condition 接口

public interface Condition {

    void await() throws InterruptedException;

    void awaitUninterruptibly();

    long awaitNanos(long nanosTimeout) throws InterruptedException;

    boolean await(long time, TimeUnit unit) throws InterruptedException;
  
    boolean awaitUntil(Date deadline) throws InterruptedException;

    void signal();

    void signalAll();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

接口主要定义了等待遇唤醒的方法。实现类是AbstractQueuedSynchronizer.ConditionObject

成员变量:

public class ConditionObject implements Condition, java.io.Serializable {
    /** First node of condition queue. */
    private transient Node firstWaiter;
    /** Last node of condition queue. */
    private transient Node lastWaiter;
}
1
2
3
4
5
6

内部使用AQS的Node对象保存线程信息,并按照等待顺序保存成链表结构,记录首尾节点信息。

# await 阻塞阶段

# condition的等待队列

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    //添加到该条件的等待队列中
    Node node = addConditionWaiter();
    //释放锁占用记录 
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        //阻塞 等待
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        // 处理中断
        reportInterruptAfterWait(interruptMode);
}

private Node addConditionWaiter() {
    Node t = lastWaiter;
    // If lastWaiter is cancelled, clean out.
    if (t != null && t.waitStatus != Node.CONDITION) {
        //清除链表中状态不为CONDITION的节点
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

# 释放锁

final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        // 获取AQS的锁计数
        int savedState = getState();
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            //释放失败 标记为CANCELLED状态
            node.waitStatus = Node.CANCELLED;
    }
}

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            // 当前线程是获取锁的,其他线程在AQS等待队列中,唤醒头部等待节点,防止信号量丢失
            unparkSuccessor(h);
        return true;
    }
    return false;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

tryRelease方法位于AbstractQueuedSynchronizer中,留给之类实现,以ReentrantLock.Sync为例:

protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    // 校验当前线程必须是获取锁的线程
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        //置空AQS独占线程,即释放锁
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14

从这里可以看出,await方法必须在lock.lock()lock.unlock()之间使用,即必须在获取锁时使用。否则会抛出IllegalMonitorStateException异常。

# 阻塞

while (!isOnSyncQueue(node)) {
    //阻塞 
    LockSupport.park(this);
    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
        break;
}
1
2
3
4
5
6

判断节点是否在AQS等待队列中,新建的节点首先会添加到condition的等待队列中,不在AQS的等待队列。

final boolean isOnSyncQueue(Node node) {
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    if (node.next != null) // If has successor, it must be on queue
        return true;
    return findNodeFromTail(node);
}
private boolean findNodeFromTail(Node node) {
    Node t = tail;
    for (;;) {
        if (t == node)
            return true;
        if (t == null)
            return false;
        t = t.prev;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

然后调用LockSupport.park(this);线程阻塞,等待signalsignalAll方法执行。

# signal 传递信号量

public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
                (first = firstWaiter) != null);
}

final boolean transferForSignal(Node node) {
    // 设置是失败 那么当前节点是CANCELLED状态,返回false 外部同步信号量给下一个节点
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
    // 加入同步队列 得到前驱节点
    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        // 前驱节点CANCELLED 唤醒当前线程 
        // 这里虽然唤醒了,但是节点不是同步队列的首节点,还是会被阻塞,这一步的意义是什么?
        LockSupport.unpark(node.thread);
    return true;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

signal方法将节点从条件队列转移到同步队列。如果前驱节点不是CANCELLED状态,不会唤醒线程。

也就是进入同步队列,等待其他拥有锁的线程执行完,在释放锁的方法中调用unparkSuccessor唤醒同步队列中的节点,当轮到这个转移节点时对应线程才会被唤醒。

需要注意的是这种转移节点的线程阻塞位置位于await方法的LockSupport.park(this);处。

# await 唤醒阶段

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        // 唤醒后从这里继续
        // 检查在阻塞阶段是否线程中断
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    //获取锁
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        // 处理中断
        reportInterruptAfterWait(interruptMode);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

如果线程是中断唤醒,那break跳出循环。

如果是其他线程释放锁后唤醒,此时节点位于同步队列首位,isOnSyncQueue方法为true,也会跳出循环。

中断时机有两种,一种是还在条件队列中被中断,一种是转移到同步队列后中断。

private int checkInterruptWhileWaiting(Node node) {
    return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0;
}

final boolean transferAfterCancelledWait(Node node) {
    // 状态还是CONDITION,说明在条件队列时中断
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
        //为啥还要加入同步队列呢?
        enq(node);
        return true;
    }
    //状态不是CONDITION了,但是还没加入同步队列 ,应该是signal方法还在执行enq 先让步,等会再看
    while (!isOnSyncQueue(node))
        Thread.yield();
    return false;
}        
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

在被唤醒后还会调用acquireQueued方法获取锁,

如果是其他线程释放锁后唤醒的该线程,那么节点位于队首,执行该方法直接获取锁,移除同步队列。

如果不是队首(中断唤醒、或者是在其他线程调用signal方法发现转移到同步队列后前驱节点CANCELLED唤醒),那么会再次阻塞。

这里有个疑问,中断唤醒后为直接给节点标记为CANCELLED就行了呢,为啥还要在同步队列上等待呢,甚至在条件队列时中断的还要继续转移到同步队列,然后再阻塞等待。

上次更新: 2023/04/09, 16:34:32
最近更新
01
docker-compose笔记
01-12
02
MySQL数据迁移
11-27
03
Docker部署服务,避免PID=1
11-27
更多文章>