AbstractQueuedSynchronizer源码分析

AQS 是一个用来构建锁和同步器的框架。

AQS核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制AQS是用CLH队列锁实现的,即将暂时获取不到锁的线程加入到队列中。

成员变量如下:

//队列头
private transient volatile Node head;
//队列尾
private transient volatile Node tail;
//资源锁定状态 同时也是加锁计数器
private volatile int state;
1
2
3
4
5
6

# Node对象存储线程节点信息

static final class Node {
    /** 共享模式 */
    static final Node SHARED = new Node();
    /** 独占模式 */
    static final Node EXCLUSIVE = null;

    /** 当前线程被取消 */
    static final int CANCELLED =  1;
    /** waitStatus value to indicate successor's thread needs unparking */
    static final int SIGNAL    = -1;
    /** 当前节点在等待condition,也就是在condition队列中 */
    static final int CONDITION = -2;
    /** 当前场景下后续的acquireShared能够得以执行 */
    static final int PROPAGATE = -3;

    //节点状态
    volatile int waitStatus;
    // 前驱节点
    volatile Node prev;

    //后继节点
    volatile Node next;

    //入队的线程
    volatile Thread thread;

    Node nextWaiter;
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

每个线程被阻塞的线程都会被封装成一个Node结点,放入队列。每个节点包含了一个Thread类型的引用,并且每个节点都存在一个状态,具体状态如下。

  • CANCELLED,值为1,表示当前的线程被取消。

  • SIGNAL,值为-1,表示当前节点的后继节点包含的线程需要运行,需要进行unpark操作。

  • CONDITION,值为-2,表示当前节点在等待condition,也就是在condition queue中。

  • PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行。

  • 值为0,表示当前节点在sync queue中,等待着获取锁

# 通过CAS修改节点状态

初始化关键字段在类中的偏移量

private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;

static {
    try {
        stateOffset = unsafe.objectFieldOffset
            (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
        headOffset = unsafe.objectFieldOffset
            (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
        tailOffset = unsafe.objectFieldOffset
            (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
        waitStatusOffset = unsafe.objectFieldOffset
            (Node.class.getDeclaredField("waitStatus"));
        nextOffset = unsafe.objectFieldOffset
            (Node.class.getDeclaredField("next"));

    } catch (Exception ex) { throw new Error(ex); }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

加锁状态、出队入队都是通过 CAS 机制完成。使用 Unsafe 类获取关键字段偏移量,通过 Unsafe 类提供的方法进行修改。

protected final boolean compareAndSetState(int expect, int update) {
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

private final boolean compareAndSetHead(Node update) {
    return unsafe.compareAndSwapObject(this, headOffset, null, update);
}

private static final boolean compareAndSetWaitStatus(Node node, int expect, int update) {
    return unsafe.compareAndSwapInt(node, waitStatusOffset, expect, update);
}
private static final boolean compareAndSetNext(Node node, Node expect, Node update) {
    return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14

# 抢占锁

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        // 独占模式 加入等待队列
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
1
2
3
4
5
6

tryAcquire方法留给子类实现,有公平锁与非公平锁的实现方式。

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    if (pred != null) {
        // 构建新的尾节点,前驱指向旧的尾节点
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    //循环CAS入队
    enq(node);
    return node;
}
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { 
            // 队列头是一个空Node
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            // 前驱节点
            final Node p = node.predecessor();
            // 前驱结点是head,则轮到当前节点去获取锁
            if (p == head && tryAcquire(arg)) {
                //获取锁成功,更新头节点
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 未轮到当前节点获取锁 或者获取锁失败 进入阻塞状态
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                // 除了unpark,中断也会会唤醒 park的线程,记录一下中断返回给调用方
                interrupted = true;
        }
    } finally {
        if (failed)
            //异常跳出循环才会走到这里
            cancelAcquire(node);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

未抢占成功的线程被阻塞,当再次被唤醒时,检查前驱节点是否是队首,如果是则说明轮到自己去抢占锁。抢占失败或未轮到自己,则重新进入阻塞状态。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        //前驱节点被设置为SIGNAL状态,说明前驱节点已经阻塞,那么当前线程也是可以阻塞的,直接返回true
        return true;
    if (ws > 0) {
        // CANCELLED  状态的从队列中去除
        // 这里不用考虑并发是因为节点只会在尾部追加  变为CANCELLED状态的节点不会再修改成其它状态
        // 所以不会有非CANCELLED节点被跳过
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        //Node节点waitStatus初始值是0 因为获取锁失败,将前驱节点状态改为SIGNAL 外部进行重试,在下次重试时可以返回true
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

申请锁出错,清除出等待队列。这里需要注意的是防止信号量丢失,导致无法唤醒等待队列中的线程节点。

private void cancelAcquire(Node node) {
    // Ignore if node doesn't exist
    if (node == null)
        return;

    node.thread = null;

    // 跳过取消状态的节点 找到有效的前驱结点
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;

    // 有效前驱节点的后继 后面修改用
    Node predNext = pred.next;
    // 变更状态不受其他线程影响
    // 其他线程在获取锁时 在shouldParkAfterFailedAcquire方法中,会自动将前驱节点跳过该节点
    node.waitStatus = Node.CANCELLED;

    // 当前节点是尾结点,那么后继可以置空了
    // 如果compareAndSetTail执行失败,说明有并发,尾结点不是当前节点了
    if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null);
    } else {
        // If successor needs signal, try to set pred's next-link
        // so it will get one. Otherwise wake it up to propagate.
        int ws;
        // 如果前驱节点等待获取信号量,则从等待队列中移除当前节点
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
                (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
            //这里为啥不更新next节点的前驱来加快GC呢?
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
            // 走入该分支 有两种情况
            // 1. 当前节点是队首待执行的节点,所以要唤醒后继节点
            // 2. 前驱节点也CANCEL了 (这种情况下,假如前驱节点是队首待执行节点,虽然在他的线程中也会走到这里,去唤醒后继节点,但是可能此时当前节点还未设为CANCEL,那么选中的后继节点就是当前节点了) 为了防止信号量丢失 也要唤醒后继节点
            unparkSuccessor(node);
        }

        node.next = node; // help GC
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45

唤醒后继节点

private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    // 如果后继节点也CANCEL了,那么就从尾部向前遍历 找排在最前的等待中的节点
    // 不能从前向后找, 因为节点已经CANCEL 可能后继节点也被置空了
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

# 可中断式抢占

区别点就是被中断唤醒的话直接抛出InterruptedException异常

private void doAcquireInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

# 计时等待式抢占

使用LockSupport的计时阻塞方法parkNanos,超时唤醒线程。

并且在循环中加入了计时判断,超时跳出循环,返回调用方抢占结果。

private boolean doAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L)
                //超时跳出循环
                return false;
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold)
                //计时阻塞
                LockSupport.parkNanos(this, nanosTimeout);
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

# 锁释放

tryRelease方法由子类实现,主要是扣减锁计数。

锁释放后,需要主动唤醒后继等待节点,传递信号量。

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        //不为0, 说明后继节点需要信号量,唤醒后继节点
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
1
2
3
4
5
6
7
8
9
10

这里需要注意一点,对于重入锁,会多次调用release方法,但只有最后一次锁计数为0时才真正需要唤醒后继节点。

ReentrantLock.Sync实现的tryRelease为例:

protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    // 锁计数为0 该方法才会返回true
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}
1
2
3
4
5
6
7
8
9
10
11
12
13

# AbstractQueuedLongSynchronizer

功能与 AbstractQueuedSynchronizer一致,只不过锁计数是long类型,可以支持更大数量。

上次更新: 2023/04/09, 16:34:32
最近更新
01
docker-compose笔记
01-12
02
MySQL数据迁移
11-27
03
Docker部署服务,避免PID=1
11-27
更多文章>