Semaphore详解

# 简介

计数信号量,从概念上讲,信号量维护一组许可。每个调用acquire的线程被阻塞直到许可可用,然后获取它。每次调用release都会添加一个许可,可能会释放一个阻塞的获取者。

许可证并不是一个实际对象,Semaphore只是记录可用数量,并相应的采取行动。

# 示例

public class SemaphoreTest {
    public static void main(String[] args) {
        int N = 8;
        Semaphore semaphore = new Semaphore(3);
        Random random = new Random();
        for (int i = 0; i < N; i++) {
            Thread t = new Thread(() -> {
                try {
                    semaphore.acquire();
                    System.out.println(Thread.currentThread().getName() + "开始工作");
                    Thread.sleep(random.nextInt(10) * 1000);
                    semaphore.release();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            t.setName("线程" + i);
            t.start();
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

# 源码分析

Semaphore也是基于AQS实现,内部类Sync继承AbstractQueuedSynchronizer,并且又实现了公平版本和非公平版本。

# Sync

abstract static class Sync extends AbstractQueuedSynchronizer {

    Sync(int permits) {
        //将许可数设置为同步器状态
        setState(permits);
    }

    final int getPermits() {
        return getState();
    }
    //非公平获取
    final int nonfairTryAcquireShared(int acquires) {
        for (;;) {
            int available = getState();
            //剩余许可数
            int remaining = available - acquires;
            if (remaining < 0 ||
                //剩余许可数>=0,跟新许可数
                compareAndSetState(available, remaining))
                return remaining;
        }
    }
    //释放锁,这里是增加许可数
    protected final boolean tryReleaseShared(int releases) {
        for (;;) {
            int current = getState();
            int next = current + releases;
            if (next < current) // overflow
                throw new Error("Maximum permit count exceeded");
            if (compareAndSetState(current, next))
                return true;
        }
    }
    //扣减许可数
    final void reducePermits(int reductions) {
        for (;;) {
            int current = getState();
            int next = current - reductions;
            if (next > current) // underflow
                throw new Error("Permit count underflow");
            if (compareAndSetState(current, next))
                return;
        }
    }
    //获取剩余全部许可
    final int drainPermits() {
        for (;;) {
            int current = getState();
            if (current == 0 || compareAndSetState(current, 0))
                return current;
        }
    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54

# NonfairSync

非公平策略,即每次获取许可都是直接抢占

static final class NonfairSync extends Sync {

    NonfairSync(int permits) {
        super(permits);
    }

    protected int tryAcquireShared(int acquires) {
        return nonfairTryAcquireShared(acquires);
    }
}
1
2
3
4
5
6
7
8
9
10

# FairSync

公平策略的不同之处是在竞争获取许可之前,会先看队列中是否已经有在等待的节点,如果有,则返回-1表示获取失败,进入等待队列。保证了先申请许可的先获取到。

static final class FairSync extends Sync {

    FairSync(int permits) {
        super(permits);
    }

    protected int tryAcquireShared(int acquires) {
        for (;;) {
            //查看等待队列头部是否已经有节点了
            if (hasQueuedPredecessors())
                return -1;
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

# Semaphore

Semaphore提供的方法都是直接调用AQS方法,不再一一详列。

方法 说明
acquire 获取许可,线程阻塞直到获取一个许可,或者被中断。被中断时抛出异常InterruptedException
acquire(permits) 同上,可以指定获取许可的数量。
acquireUninterruptibly 获取许可,线程阻塞直到获取一个许可。如果线程被中断,依旧阻塞,直到获取到许可,线程会被标记为中断状态。
acquireUninterruptibly(int permits) 同上,可以指定获取许可的数量。
tryAcquire 获取许可,如果可以获取到,返回true,否则返回false
tryAcquire(int permits) 同上,可以指定获取许可的数量。
tryAcquire(long timeout,TimeUnit unit) 在给定时间内获取许可,线程阻塞直到获取一个许可,或者被中断。被中断时抛出异常InterruptedException
tryAcquire(int permits, long timeout, TimeUnit unit) 同上,可以指定获取许可的数量。
release 释放许可,信号量加1。
release(int permits) 释放指定数量许可。
availablePermits 返回可用信号量。
reducePermits 减少信号量。
drainPermits 获取剩余全部许可。
上次更新: 2023/04/09, 16:34:32
最近更新
01
docker-compose笔记
01-12
02
MySQL数据迁移
11-27
03
Docker部署服务,避免PID=1
11-27
更多文章>