并发编程基础-信号量机制
信号量(Semaphore)是一种控制多线程(进程)访问共享资源的同步机制,是由荷兰的Dijkstra大佬在1962年前后提出来的。
信号量的原理
信号量机制包含以下几个核心概念:
- 信号量S,整型变量,需要初始化值大于0
- P原语,荷兰语Prolaag(probeer te verlagen),表示减少信号量,该操作必须是原子的
- V原语,荷兰语Verhogen,表示增加信号量,该操作必须是原子的

从上图不难看出信号量的两个核心操作,P和V:
- P操作,原子减少S,然后如果
S < 0
,则阻塞当前线程 - V操作,原子增加S,然后如果
S <= 0
,则唤醒一个阻塞的线程
信号量一般被用来控制多线程对共享资源的访问,允许最多S个线程同时访问临界区,多于S个的线程会被P操作阻塞,直到有线程执行完临界区代码后,调用V操作唤醒。所以PV操作必须是成对出现的。
那么信号量可以用来干什么呢?
- 信号量似乎天生就是为限流而生的,我们可以很容易用信号量实现一个限流器。
- 信号量可以用来实现互斥锁,初始化信号量
S = 1
,这样就只能有一个线程能访问临界区。很明显这是一个不可重入的锁。 - 信号量甚至能够实现条件变量,比如阻塞队列
动手实现一个信号量
学习这些经典理论的时候,最好的办法还是用自己熟悉的编程语言实现一遍。Java并发包提供了一个信号量的java.util.concurrent.Semaphore
,是用AbstractQueuedSynchronizer
的共享模式实现的,以后会单独分析关于AQS相关的原理,这里不再展开描述,其核心思想是CAS。
下面是我用Java实现的一个简单的信号量,这里使用synchronized
来替代互斥锁
信号量实现
public class Semaphore {
/**
* 信号量S
*/
private int s;
public Semaphore(int s) {
this.s = s;
}
/**
* P原语,原子操作
* <p>
* S减decr,如果S小于0,阻塞当前线程
*/
public synchronized void p(int decr) {
s -= decr;
if (s < 0) {
try {
wait();
} catch (InterruptedException e) {
// ...
}
}
}
/**
* V原语,原子操作
* <p>
* S加incr,如果S小于等于0,唤醒一个等待中的线程
*/
public synchronized void v(int incr) {
s += incr;
if (s <= 0) {
notify();
}
}
}
用信号量限流
public class Limiter implements Executor {
private Semaphore semaphore;
public Limiter(int limit) {
semaphore = new Semaphore(limit);
}
public void execute(Runnable runnable) {
if (runnable != null) {
new Thread(() -> {
semaphore.p(1);
runnable.run();
semaphore.v(1);
}).start();
}
}
}
用信号量实现互斥锁
public class SemaphoreLock {
private Semaphore semaphore = new Semaphore(1);
public void lock() {
semaphore.p(1);
}
public void unlock() {
semaphore.v(1);
}
}
用信号量实现阻塞队列
实现阻塞队列需要两个信号量和一个锁(锁也可以用信号量代替)
public class SemaphoreBlockingQueue<T> {
private Semaphore notFull;
private Semaphore notEmpty;
private SemaphoreLock lock = new SemaphoreLock();
private Object[] table;
private int size;
public SemaphoreBlockingQueue(int cap) {
if (cap < 1) {
throw new IllegalArgumentException("capacity must be > 0");
}
notEmpty = new Semaphore(0);
notFull = new Semaphore(cap);
table = new Object[cap];
}
public void add(T t) {
// 如果队列是满的就会阻塞
notFull.p(1);
// lock保证队列的原子添加
lock.lock();
table[size++] = t;
lock.unlock();
// 唤醒一个阻塞在notEmpty的线程
notEmpty.v(1);
}
@SuppressWarnings("unchecked")
public T poll() {
T element;
// 如果队列是空就会阻塞
notEmpty.p(1);
// lock保证队列的原子删除
lock.lock();
element = (T) table[--size];
lock.unlock();
// 唤醒一个阻塞在notFull的线程
notFull.v(1);
return element;
}
}
转载自:https://juejin.cn/post/6844903822465187847