一看就懂的Semaphore源码分析
Semaphore(信号量)
其中和可重入锁相同,由公平锁和非公平锁,先从非公平锁谈起,默认使用的是非公平锁,如下例所示:
public class SemaphoreDemo {
public static void main(String[] args) {
// 允许有3个线程(有3个停车位)
Semaphore semaphore = new Semaphore(3); // 信号量
for (int i = 1; i <= 6; i++) {
new Thread(() -> {
try {
// 获得,假设如果线程已经满了,等待被释放为止
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + "抢到了车位!!!");
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName() + "离开车位!!!");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 将信号量释放,并唤醒等待的线程
semaphore.release();
}
}, String.valueOf(i)).start();
}
}
}
此处使用了常用的两个方法:acquire() 与 release() 方法,直接切入正题,看看源码中是如何设计的,首先看调用 acquire() 方法的流程:
调用acquire()方法,源码如下所示:
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
其中sync是Semaphore的内部类所定义的类型,其即成了AQS类,AQS是非常重要的类,是很多类的基础例如:ReentrantLock、CountDownLatch、ReentrantReadWriteLock等,好了不多说了,今天的正题是信号量。
再接着看acquireSharedInterruptibly(int arg) 方法
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 尝试获取共享锁
if (tryAcquireShared(arg) < 0) // 若返回的值小于0,表明许可证的数量已经被获取完毕
doAcquireSharedInterruptibly(arg);
}
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState(); // 获取state状态变量的值(即:许可证的数量)
int remaining = available - acquires; // 剩余许可证的数量
if (remaining < 0 || // 许可证获取完毕,返回remaining
compareAndSetState(available, remaining)) // 使用CAS设置许可证数量
return remaining;
}
}
当创建一个信号量对象时,其中传入的参数若为3,说明只有3个线程可以获取许可证,多余的线程无法获取许可证,必须等待拥有许可证的线程释放许可证,其他的线程才可以取争抢许可证,执行相应的任务。举个例子吧:若此时,有3个许可证,表示只有3个线程可以工作,若此时创建的线程多于3个(比如:6个),那么第4个、第5个、第6个线程,在前3个获得许可证的线程未释放许可证时,是无法获得许可证的,只能被阻塞,等待获取许可证。
在许可证数量被获取完毕后,接着进入doAcquireSharedInterruptibly(arg) 方法内,如下所示:
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// 此处是将线程信息封装为节点,加入到同步队列中,也就是AQS中的双端队列
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) { // 只有前驱节点是head节点的,才会被唤醒
int r = tryAcquireShared(arg); // 尝试获取许可证
if (r >= 0) {
setHeadAndPropagate(node, r); //
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) // if中实现的功能就是将线程挂起
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
以上的代码是为获取许可证的线程,其线程信息会被封装为节点,添加到双端队列尾部,具体队列如下图所示:
如图中所示:由于设置许可证的数量为3,那么在设置6个线程时,只有3个线程会获得,其他的三个线程,会被加入到同步队列中,排队等待,只有在调用了release()方法后,有线程被唤醒,才会尝试获取许可证。
接下来看下release()方法,是如何释放许可证,以及如何唤醒被挂起的线程。
public void release() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
// 尝试获取锁
if (tryReleaseShared(arg)) {
doReleaseShared(); // 释放锁
return true;
}
return false;
}
protected final boolean tryReleaseShared(int releases) {
for (;;) { // 死循环
int current = getState(); // 获取许可证的数量
int next = current + releases; // 加1,即表示释放了一个许可证
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next)) // CAS设置许可证的数量
return true;
}
}
在tryReleaseShared(int releases)方法中,CAS保证设置许可证的数量成功。设置成功后,return true,进入以下方法体内:
private void doReleaseShared() {
for (;;) { // 死循环
Node h = head;
if (h != null && h != tail) { // 至少存在两个节点
int ws = h.waitStatus; // 获取状态值
if (ws == Node.SIGNAL) { // h若是SIGNAL状态,那么其后继节点即为要唤醒的节点
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h); // 唤醒头节点的后继节点
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // 头结点未变,直接退出,头结点被改变的唯一条件是有其他的线程修改了头结点
break;
}
}
以上的代码:就是唤醒线程,一旦唤醒线程,在同步队列中排队的队首(不是头结点)线程就会获取许可证,获取成功后,就执行相应的代码。为什么是非公平锁,因为在对列中排队的线程,只有头结点的后继节点有资格可以获取锁,而在获取许可证时,有其他的线程(不是同步对列中的线程)进入,尝试去获取许可证,这两个线程都有可能获取到许可证,这就是非公平锁的特点。公平锁,就是只有队首的节点线程,可以获取许可证,有其他的线程在获取许可证时,会被加入到队尾,等待获取锁。嗯~~~,就是这样^_^。
转载自:https://juejin.cn/post/7042287897931677726