从 Semaphore 探究 AQS 细节-共享锁篇
开篇语
如果说独占锁是恋爱脑,你不放手,我誓死追随;那么共享锁就是海王,在有限的资源内,来者不拒。共享锁是可以被多个线程同时拥有的锁,它不像独占锁一样,一次只能被一个线程拥有。
文章结构
- 介绍 Semaphore 的简单使用,并快速学习 Semaphore 是如何使用 AQS 实现同步器
- 如何加锁
- 如何解锁
- 细节深入讨论
Semaphore 简介
Semaphore 又名信号量,它的主要作用是提供多个资源供多个线程使用,当资源被获取完之后,就需要等待其他线程释放资源。
它主要用于严格控制指定数量线程访问同一组资源。比如:数据库连接池。
简单实用
public class SemaphoreTest {
public static void main(String[] args) {
// 提供10个资源,表示只要10个线程可以同时访问资源
Semaphore semaphore = new Semaphore(10);
new Thread(() -> {
semaphore.acquire();
System.out.println("获取一个资源");
semaphore.release();
}).start();
}
}
Semaphore 的使用与 ReentrantLock 非常类似,因此将不再过多讲解。
Semaphore 如何使用 AQS
Semaphore 并不直接继承 AbstractQueuedSynchronizer 类,而是定义了一个子类 Sync 去继承 AbstractQueuedSynchronizer,让子类去实现同步器。
Semaphore 实现了 Lock 接口,该接口定义了一个锁需要实现的方法。Semaphore 实现 Lock 接口中的方法都是通过 Sync 类中的方法来完成。
加锁
// 共享模式
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean interrupted = false;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
return;
}
}
if (shouldParkAfterFailedAcquire(p, node))
interrupted |= parkAndCheckInterrupt();
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
} finally {
if (interrupted)
selfInterrupt();
}
}
// 独占模式
final boolean acquireQueued(final Node node, int arg) {
boolean interrupted = false;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node))
interrupted |= parkAndCheckInterrupt();
}
} catch (Throwable t) {
cancelAcquire(node);
if (interrupted)
selfInterrupt();
throw t;
}
}
上述同时贴出了独占模式和共享模式获取锁的代码。上面是共享模式,下面是独占模式。通过对这两段代码的对比,可以发现其中只有一小部分不相同。我们主要就来看这不同的部分。
在共享模式下,执行 tryAcquireShared(arg);
后,会返回当前剩余可用的资源数量。如果数量大于等于 0,才会执行 setHeadAndPropagate
,这里是跟独占模式中的 setHead
方法存在不同之处的地方。我们来解析一下该方法。
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
- 将获取锁的节点设置为 head
- 如果可用资源(propagate)大于 0,或者 h 为 null,或者 h.waitStatus 小于 0 的时候,将会执行
doReleaseShared();
,该方法主要是唤醒后续线程。
这里有几个需要特别注意的地方:
- 为什么加锁成功,也需要执行
doReleaseShared()
唤醒后继线程? - h == null,在上面可以看到 h 已经被赋值为 head 了,为什么这里会出现 null 的情况?
- h.waitStatus < 0,明明在之前唤醒线程时,已经将 h 的 waitStatus 设置为 0,为什么这里会出现小于 0 的情况?
- 为什么需要检查两次 h 的状态?
以上几个问题的答案可以帮助我们理解 AQS 在共享模式下的运作方式。
为什么加锁成功,也需要执行 doReleaseShared()
唤醒后继线程?
在共享锁的情况下,是存在多个资源可供给线程使用的,因此对资源的使用应该持有乐观态度。如果当前线程成功获取了锁,那么说明,接下来可能还有资源可以继续使用,因此可以唤醒后继线程,让他们去尝试获取锁。
唤醒后继线程的地方有两个:
- 释放锁时
- 获取锁成功时
所以,在共享模式下会出现大量唤醒线程去尝试获取锁的操作,这样做的目的是为了让线程可以尽快的获取锁,避免出现存在空闲资源的情况下,队列中也有线程在等待被唤醒的情况。
在 h 已经被赋值为 head 的情况下,为什么 h 会为 null?
我们都知道,在共享模式下,是同时有多个线程可以成功获取资源的,每一次成功获取到资源,都会执行 setHeadAndPropagate
方法,将节点设置为 head。那么就会出现 head 刚设置完成,就被其他线程重新设置为新的 head。
举个例子:
- thread1 获取到资源,并且执行到
setHeadAndPropagate
的setHead
后,但是由于 thread1 的时间片已经使用完,thread1 丧失 CPU 使用权。此时 head = node1;thread1 中的 h = init node。此时,队列如下所示:
- 此时,有一个线程释放了线程,并且唤醒了 thread2。thread2 执行完 setHead 后,此时 thread2 的 h = node1;head = node2,此时 node1 已经没有被任何对象引用,垃圾回收器可以将它回收。队列此时如图所示:
- 如果在 thread2 执行 if 判断条件前,垃圾回收器将 node1 回收,那么 h 就会出现 null 的情况。
为什么需要检查两次 h 的状态?
我们首先需要知道一个前提,因为共享模式下是有多个线程可以获取到资源的,因此可能会出现下列情况:一个线程刚获取完线程,刚设置完 head后,马上又有其他线程获取了资源,并且修改了 head。因此 head 是经常在变化的,这里需要跟独占锁区分开来。
我们先来看一下,他在什么情况下才不会进入 if 语句里面执行。
- 首先 propagate 小于等于0,表示没有更多的资源可以提供给其他线程使用,因此不需要唤醒后继线程。
- h != null,因为在上面执行了 h = head,该语句表示 h 引用的对象可能没有变化。如果 h == null,则表示 head 已经被回收了,当前的 head 已经跟之前不同。
- h.waitStatus >= 0,h = 0 的情况表示,h 不需要通知后继线程或者是已经唤醒了后继线程;h 大于 0 的情况表示,h 已经被取消了。
通过当前 3 个条件,我们可以推出,没有更多的资源可以供其他线程使用;h = head,当前 head 可能没有被修改过;h 已经唤醒了线程。
- (h = head) == null,重新检查 head 此时引用的对象,并且判断 head 是否被修改过。
- h.waitStatus >= 0,h = 0 的情况表示,h 不需要通知后继线程或者是已经唤醒了后继线程;h 大于 0 的情况表示,h 已经被取消了。
做了这么多的判断,我们会发现都是为了确定 head 是否被改变。如果 head 没有被修改,那么 h != null,并且 h.waitStatus == 0。此时就不需要唤醒后继线程了。如果 head 被替换了,那么说明有资源被释放了,那么就需要尽快唤醒队列中的线程去换取资源。
h.waitStatus 为什么小于 0
解释这个问题,需要结合解锁的方法一起看。所以留在解锁时再解析。
解锁
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
- 首先尝试通过 CAS 设置 state,如果成功,则表示解锁成功。此时就需要执行
doReleaseShared
唤醒队列中的线程。 - 否则,返回 false。
我们着重需要来看一下 doReleaseShared
方法。
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!h.compareAndSetWaitStatus(0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
-
进入一个死循环,跳出循环的条件就是:在第三行执行 h = head,那么在执行 4-14 行之时,head 都没有发生变化,那么就可以跳出循环。
-
判断 h != null && h != tail,表示队列进行过初始化,并且队列中存在至少两个节点才会继续执行。
-
判断 h.waitStatus = signal,表示有后继节点需要通知,因此,首先修改 waitStatus 的值。这里会出现 waitStatus 修改失败的情况,因为会存在其他线程也在释放资源,并且先于该线程修改 waitStatus 的值为 0。如果遇到这种情况,那么则重新进入下一轮 for 循环。此时会遇到两种情况:
- 有其他线程获取了资源,并且修改了head ,此时 head 已经不是之前的 head 了。
- head 还是之前的 head,但是此时 waitStatus 的值为 0。因此会进入到 else if 中,将 waitStatus 设置为 PROPAGATE。这个状态的作用我们稍后会详细讲解。
-
如果 waitStatus 修改成功,那么直接执行
unparkSuccessor
唤醒后继线程。
PROPAGATE 状态的作用
PROPAGATE 状态的作用:主要是为了解决 BUG bugs.java.com/view_bug.do…
在引入 PROPAGATE 状态之前的代码是这样的:
private void setHeadAndPropagate(Node node, int propagate) {
setHead(node);
if (propagate > 0 && node.waitStatus != 0) {
Node s = node.next;
if (s == null || s.isShared())
unparkSuccessor(node);
}
}
public final boolean releaseShared(long arg){
if (tryReleaseShared(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0){
unparkSuccessor(h);
reture true
}
}
return false
}
我们来看一个例子:没有 PROPAGATE。
有两个线程,thread1、thread2 分别持有资源1、资源2,三个线程,thread3、thread4、thread5 正在队列中。
thread1 释放资源,并唤醒了 thread3。thread1 设置了 head.waitStatus = 0。thread3 执行了 tryAcquire 得到了锁,但是此时线程失去执行权。还未执行 setHeadAndPopagrate。此时,head 依然指向 init node,并且 waitStatus = 0。
thread2 释放资源,发现 head.waitStatus = 0,判断无需唤醒后续线程,执行结束。
thread3 继续执行,执行 setHeadAndPopagate,判断 popagate = 0,也不会去唤醒线程。
此时,我们会发现存在一个可用资源,但是却没有唤醒队列中的线程去获取节点。
现在我们来看一个有 PROPAGATE 状态,它是怎么样运行的。
有两个线程,thread1、thread2 分别持有资源1、资源2,三个线程,thread3、thread4、thread5 正在队列中。
thread1 释放资源,并唤醒了 thread3。thread1 设置了 head.waitStatus = 0。thread3 执行了 tryAcquire 得到了锁,但是此时线程失去执行权。还未执行 setHeadAndPopagrate。此时,head 依然指向 init node,并且 waitStatus = 0。
thread2 释放资源,发现 head.waitStatus = 0,则设置 head.waitStatus = PROPAGATE,然后执行结束。
thread3 继续执行,执行 setHeadAndPopagate,判断 popagate = 0,但是 h.ws = propagate < 0,因此会继续唤醒后继线程。
thread3 会调用 doReleaseShared。会发现当前 head.waitStatus = signal,因此唤醒 thread4 去获取资源。
h.waitStatus 为什么小于 0
此时我们再回来看这个问题,我们会发现在执行 doReleaseShared 的时候,可能会将 h.waitStatus 设置为 propagate,而 propagate = -3。这就是为什么 h.waitStatus 为什么小于 0 的原因。
总结
- 在共享模式下,head 是会时刻被替换的,因此需要考虑它可能会被垃圾回收。
- 为了加快唤醒线程的速度,在会获取资源成功和释放资源时都尝试唤醒线程。
- PROPAGATE 状态的作用是为了修复在上述情况下,可以解决线程 hung 住的问题。
转载自:https://juejin.cn/post/7203348539258519613