CountDownLatch详解
# 简介
一种同步工具,允许一个或多个线程等待,直到其他线程中执行的一组操作完成。
初始化时给定一个计数。调用await
方法的线程会阻塞,直到其他线程调用countDown
方法将计数减为0,之后所有等待的线程会被唤醒,并且随后调用await
会立即返回。这是一个一次性的工具,计数不会被重置。如果想重置计数,重复调用,可以使用CyclicBarrier
。
CountDownLatch
是一个多功能同步工具,可以用于多种用途。初始化计数为1时可以作为一个简单的开关锁存器或门:调用await
的线程在大门外等待,直到一个线程调用countDown
打开大门。初始化计数为N
时,可以被用于使线程等待,直到N
个动作完成,或一个动作被执行N
次。
CountDownLatch
的一个有用的属性是,它不会阻塞调用countDown
的线程,它只是阻止任何线程继续通过 await
直到所有线程都可以通过。
# 示例
两个CountDownLatch
同步器,第一个是开始信号,用于驱动程序在准备好之前,阻塞所有工作线程的执行,第二个完成信号,允许驱动程序等待所有线程执行完成。
class Driver { // ...
void main() throws InterruptedException {
CountDownLatch startSignal = new CountDownLatch(1);
CountDownLatch doneSignal = new CountDownLatch(N);
for (int i = 0; i < N; ++i) // create and start threads
new Thread(new Worker(startSignal, doneSignal)).start();
doSomethingElse(); // don't let run yet
startSignal.countDown(); // let all threads proceed
doSomethingElse();
doneSignal.await(); // wait for all to finish
}
}
class Worker implements Runnable {
private final CountDownLatch startSignal;
private final CountDownLatch doneSignal;
Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
this.startSignal = startSignal;
this.doneSignal = doneSignal;
}
public void run() {
try {
startSignal.await();
doWork();
doneSignal.countDown();
} catch (InterruptedException ex) {
} // return;
}
void doWork() { //...}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# 源码分析
# Sync
CountDownLatch
内部类Sync
继承AbstractQueuedSynchronizer
,并重写tryAcquireShared
和tryReleaseShared
方法,用于控制线程间的同步。从方法名可以看出使用的是AQS
的共享锁,也就是可以多个线程在一个同步器上锁定,当满足条件时,唤醒
private final Sync sync;
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
//将计数个数设置为同步状态
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
//当计数为0时可以获取到锁,否则基于AQS框架,线程等待
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
//释放锁,扣减计数
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# await
CountDownLatch
提供了阻塞和计时阻塞两个方法。其原理都是直接使用AQS
框架提供的获取锁方法。
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
2
3
4
5
6
7
8
由于初始化时设置的计数大于0,所以获取锁时,调用内部类Sync
重写的tryAcquireShared
方法返回false
,相当于获取读锁失败线程等待。
# countDown
达到计数点,计数减1。
public void countDown() {
sync.releaseShared(1);
}
2
3
利用AQS
提供的释放共享锁原理:扣除共享锁计数,当计数扣除为0时,唤醒后续等待的线程。这里则是唤醒之前调用了CountDownLatch
的await
方法的线程,由于这些线程是获取读锁阻塞,所以会全部唤醒。
# 总结
CountDownLatch
基于AQS
读锁实现,在未达到计数次数的CountDown
之前,获取读锁是失败的,达到次数之后,计数减到0,利用AQS
的锁释放,唤醒后续等待的线程,由于都是读等待,会被全部唤醒。