ReentrantReadWriteLock源码分析

# 总览

public class ReentrantReadWriteLock implements ReadWriteLock{

    private final ReentrantReadWriteLock.ReadLock readerLock;

    private final ReentrantReadWriteLock.WriteLock writerLock;

    final Sync sync;

    public ReentrantReadWriteLock() {
        this(false);
    }

    public ReentrantReadWriteLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

由读锁和写锁组成,默认是非公平锁。

内部实现了锁同步器AbstractQueuedSynchronizer,有NonfairSyncFairSync(公平锁和非公平锁)两种形式。

内部类ReadLockWriteLock实现了Lock接口,提供加锁去锁方法。

# 锁同步器 AQS

abstract static class Sync extends AbstractQueuedSynchronizer {
    //读锁计数偏移量
    static final int SHARED_SHIFT   = 16;
    //便宜后的读锁计数单位
    static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
    // 最大数量 读锁写锁各一半
    static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
    // 低位全为1 便于获取写锁计数
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

    static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }

    static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

    static final class HoldCounter {
        int count = 0;
        // 使用线程tid 避免引用导致无法回收线程
        final long tid = getThreadId(Thread.currentThread());
    }

    static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> {
        // 重写ThreadLocal.initialValue方法,当调用get方法,map中没有对应值时的初始化
        public HoldCounter initialValue() {
            return new HoldCounter();
        }
    }
    // 当前线程持有可重入读锁的数量
    private transient ThreadLocalHoldCounter readHolds;

    //缓存上次加读锁线程锁的计数器 一定概率避免从ThreadLocalMap中查找
    private transient HoldCounter cachedHoldCounter;

    // 第一个成功获取读锁的线程 具体用途后面再看
    private transient Thread firstReader = null;
    private transient int firstReaderHoldCount;

    Sync() {
        readHolds = new ThreadLocalHoldCounter();
        //利用state是volatile的特性 避免重排序
        //从而确保readHolds初始化后再被其他线程使用
        setState(getState()); // ensures visibility of readHolds
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43

将读锁和写锁的计数分别设计在int型数据的高位和低位,这样就可以很方便的通过CAS方式修改计数避免加锁。

# 读锁

public static class ReadLock implements Lock {

    private final Sync sync;

    protected ReadLock(ReentrantReadWriteLock lock) {
        sync = lock.sync;
    }

    public void lock() {
        sync.acquireShared(1);
    }

    public void lockInterruptibly() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    public boolean tryLock() {
        return sync.tryReadLock();
    }

    public boolean tryLock(long timeout, TimeUnit unit)
            throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    public void unlock() {
        sync.releaseShared(1);
    }

    public Condition newCondition() {
        throw new UnsupportedOperationException();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

未实现newCondition方法,不支持条件等待操作。

# 申请锁

读锁的lock方法调用AbstractQueuedSynchronizer框架提供的模板方法acquireShared

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}
protected int tryAcquireShared(int arg) {
    throw new UnsupportedOperationException();
}
1
2
3
4
5
6
7

ReentrantReadWriteLock.Sync中实现了tryAcquireShared方法

protected final int tryAcquireShared(int unused) {
    Thread current = Thread.currentThread();
    int c = getState();
    // 已被其他线程加了写锁 返回-1
    if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current)
        return -1;
    // 获取已加读锁计数
    int r = sharedCount(c);
    // 这里的c是既包含读锁又包含写锁记数的,cas修改成功说明没有出现其他线程并发申请读锁或写锁
    if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) {
        if (r == 0) {
            // 读锁计数为0,说明是第一个获得读锁的线程
            firstReader = current;
            firstReaderHoldCount = 1;
        } else if (firstReader == current) {
            // 重入读锁
            firstReaderHoldCount++;
        } else {
            //其他线程获取读锁
            HoldCounter rh = cachedHoldCounter;
            // 对比缓存的读锁计数是不是当前线程的
            if (rh == null || rh.tid != getThreadId(current))
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0)
                // 缓存的是当前线程的锁计数器,说明之前获取过读锁,
                // 但是计数为0,说明之前获取的读锁已释放,那么ThreadLocal中应该已经删除了,这里重新添加进去
                // 可以认为是延迟删除 最大可能减少创建锁计数器的同时兼顾垃圾收集
                readHolds.set(rh);
            rh.count++;
        }
        return 1;
    }
    return fullTryAcquireShared(current);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

上面的获取读锁方法没有处理当前线程已获取写锁后的重入获取读锁,以及其他线程并发获取读锁导致CAS失败的情况。

完整版本的获取读锁方法是fullTryAcquireShared(为啥一开始不直接用这个方法呢?),提供循环CAS

final int fullTryAcquireShared(Thread current) {
    HoldCounter rh = null;
    for (;;) {
        int c = getState();
        if (exclusiveCount(c) != 0) {
            // 其他线程已获取写锁,当前线程获取读锁失败
            // 如果获取写锁的是当前线程,则可以重入获取读锁
            if (getExclusiveOwnerThread() != current)
                return -1;
        } else if (readerShouldBlock()) {
            // 并发情况下 仍有可能出现其他线程等待申请写锁 
            // 此时如果当前线程是第一个获取读锁的 则可重入
            if (firstReader == current) {
                // assert firstReaderHoldCount > 0;
            } else {
                if (rh == null) {
                    rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current)) {
                        rh = readHolds.get();
                        if (rh.count == 0)
                            readHolds.remove();
                    }
                }
                if (rh.count == 0)
                    return -1;
            }
        }
        if (sharedCount(c) == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            if (sharedCount(c) == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                if (rh == null)
                    rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
                cachedHoldCounter = rh; // cache for release
            }
            return 1;
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49

由源码可以发现,当前线程占有写锁时可以重入加读锁。有其他线程等待获取写锁时,如果当前线程是第一个加读锁的,那么可重入加读锁。

readerShouldBlock方法用于判断申请读锁的线程是否应该被阻塞,该方法在公平锁与非公平锁中的实现略有差异。

在公平锁中,调用的是AbstractQueuedSynchronizer.hasQueuedPredecessors方法

public final boolean hasQueuedPredecessors() {
    Node t = tail; 
    Node h = head;
    Node s;
    // h != t说明队列中有节点在等待 以这种情况为前提
    // 如果h.next为null,说明之前队列是空的,现在有一个节点在入队,刚刚将head初始化,节点自身还未加入,当前线程需要阻塞。
    // 如果h.next不为null,说明已有节点加入等待队列,如果该节点是当前线程,那么可重入,如果不是当前线程,那么就要阻塞了。
    return h != t && ((s = h.next) == null || s.thread != Thread.currentThread());
}
1
2
3
4
5
6
7
8
9

在非公平锁中,调用的是AbstractQueuedSynchronizer.apparentlyFirstQueuedIsExclusive方法

final boolean apparentlyFirstQueuedIsExclusive() {
    Node h, s;
    // 头尾节点都不为null 明确已有写线程节点在同步队列中
    return (h = head) != null && (s = h.next)  != null && !s.isShared() && s.thread != null;
}
1
2
3
4
5

对比可以看出,公平锁的读锁阻塞条件要宽一些,只要能判断出已有其他线程申请写锁,即使还未加入同步队列,那么当前线程申请读锁就得阻塞了。

而非公平锁则是在其他线程申请写锁的线程节点入队前,当前线程还可以获取读锁。

# 唤醒后续读节点

如果tryAcquireShared返回-1(未获取到读锁),那么回到AbstractQueuedSynchronizer的模板方法acquireShared,执行doAcquireShared方法

private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

该方法与AbstractQueuedSynchronizer.acquireQueued代码逻辑类似,获取锁失败会阻塞。不同的是当发现自己是首节点并获取到锁时,还会执行setHeadAndPropagate唤醒后续节点。

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; 
    setHead(node);
    // propagate是tryAcquireShared的返回结果 在读写锁中 申请到锁返回1,未申请到返回-1,所以进入这里时值是1
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        //下个等待线程也是读节点,需要唤醒
        if (s == null || s.isShared())
            doReleaseShared();
    }
}
private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        // 被唤醒的读节点线程可能已经改变了同步队列
        if (h == head)                   // loop if head changed
            break;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

读锁在释放时会引入一个新的状态Node.PROPAGATE,这个不是为读写锁而引入的,因为当读线程占有锁时,其他读线程可以直接获取锁,而不用进入同步队列,这时只有写线程申请锁会进入同步队列。而写线程节点必须等到读锁释放后才需要被唤醒。

引入Node.PROPAGATE主要是为了支持SemaphoreCountDownLatch功能,这个在后面阅读到源码时再分析。

# 释放锁

读锁的unlock方法调用AbstractQueuedSynchronizer提供的模板方法releaseShared

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}
1
2
3
4
5
6
7
protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();
    if (firstReader == current) {
        // assert firstReaderHoldCount > 0;
        if (firstReaderHoldCount == 1)
            firstReader = null;
        else
            firstReaderHoldCount--;
    } else {
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != getThreadId(current))
            rh = readHolds.get();
        int count = rh.count;
        if (count <= 1) {
            readHolds.remove();
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        --rh.count;
    }
    for (;;) {
        int c = getState();
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc))
            // Releasing the read lock has no effect on readers,
            // but it may allow waiting writers to proceed if
            // both read and write locks are now free.
            return nextc == 0;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

# 写锁

写锁的实现与ReentrantLock大致相同,都是使用了AbstractQueuedSynchronizer类的锁申请与释放方法。ReentrantLock源码分析 (opens new window)

上次更新: 2023/04/09, 16:34:32
最近更新
01
docker-compose笔记
01-12
02
MySQL数据迁移
11-27
03
Docker部署服务,避免PID=1
11-27
更多文章>