Semaphore详解
# 简介
计数信号量,从概念上讲,信号量维护一组许可。每个调用acquire
的线程被阻塞直到许可可用,然后获取它。每次调用release
都会添加一个许可,可能会释放一个阻塞的获取者。
许可证并不是一个实际对象,Semaphore
只是记录可用数量,并相应的采取行动。
# 示例
public class SemaphoreTest {
public static void main(String[] args) {
int N = 8;
Semaphore semaphore = new Semaphore(3);
Random random = new Random();
for (int i = 0; i < N; i++) {
Thread t = new Thread(() -> {
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + "开始工作");
Thread.sleep(random.nextInt(10) * 1000);
semaphore.release();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
t.setName("线程" + i);
t.start();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# 源码分析
Semaphore
也是基于AQS
实现,内部类Sync
继承AbstractQueuedSynchronizer
,并且又实现了公平版本和非公平版本。
# Sync
abstract static class Sync extends AbstractQueuedSynchronizer {
Sync(int permits) {
//将许可数设置为同步器状态
setState(permits);
}
final int getPermits() {
return getState();
}
//非公平获取
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
//剩余许可数
int remaining = available - acquires;
if (remaining < 0 ||
//剩余许可数>=0,跟新许可数
compareAndSetState(available, remaining))
return remaining;
}
}
//释放锁,这里是增加许可数
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
//扣减许可数
final void reducePermits(int reductions) {
for (;;) {
int current = getState();
int next = current - reductions;
if (next > current) // underflow
throw new Error("Permit count underflow");
if (compareAndSetState(current, next))
return;
}
}
//获取剩余全部许可
final int drainPermits() {
for (;;) {
int current = getState();
if (current == 0 || compareAndSetState(current, 0))
return current;
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# NonfairSync
非公平策略,即每次获取许可都是直接抢占
static final class NonfairSync extends Sync {
NonfairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
1
2
3
4
5
6
7
8
9
10
2
3
4
5
6
7
8
9
10
# FairSync
公平策略的不同之处是在竞争获取许可之前,会先看队列中是否已经有在等待的节点,如果有,则返回-1
表示获取失败,进入等待队列。保证了先申请许可的先获取到。
static final class FairSync extends Sync {
FairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
for (;;) {
//查看等待队列头部是否已经有节点了
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# Semaphore
Semaphore
提供的方法都是直接调用AQS
方法,不再一一详列。
方法 | 说明 |
---|---|
acquire | 获取许可,线程阻塞直到获取一个许可,或者被中断。被中断时抛出异常InterruptedException |
acquire(permits) | 同上,可以指定获取许可的数量。 |
acquireUninterruptibly | 获取许可,线程阻塞直到获取一个许可。如果线程被中断,依旧阻塞,直到获取到许可,线程会被标记为中断状态。 |
acquireUninterruptibly(int permits) | 同上,可以指定获取许可的数量。 |
tryAcquire | 获取许可,如果可以获取到,返回true ,否则返回false 。 |
tryAcquire(int permits) | 同上,可以指定获取许可的数量。 |
tryAcquire(long timeout,TimeUnit unit) | 在给定时间内获取许可,线程阻塞直到获取一个许可,或者被中断。被中断时抛出异常InterruptedException |
tryAcquire(int permits, long timeout, TimeUnit unit) | 同上,可以指定获取许可的数量。 |
release | 释放许可,信号量加1。 |
release(int permits) | 释放指定数量许可。 |
availablePermits | 返回可用信号量。 |
reducePermits | 减少信号量。 |
drainPermits | 获取剩余全部许可。 |
编辑 (opens new window)
上次更新: 2023/04/09, 16:34:32