从 ReentrantLock 探究 AQS 细节 - 独占锁篇
开篇语
如何保障程序有序的运行,锁是永远都绕不开的一条拦路虎。即可以让程序有条不紊的运行,也可以让程序步履蹒跚。而今天,我们就来剥开 Java 中最常用的锁 ReentrantLock,研究它,参悟它。
文章结构
- 介绍 ReentrantLock 的简单使用,并快速学习 ReentrantLock 如何使用 AQS 实现同步器
- 详细剖析 CLH
- AQS 如何改造 CLH
- 介绍 CAS
- AQS 内部数据结构
- AQS 源码解析
- AQS 细节深入讨论
ReentrantLock 使用与解析
简单使用
import java.util.concurrent.locks.ReentrantLock;
public class ReentrantLockTest {
public static void main(String[] args) {
ReentrantLock reentrantLock = new ReentrantLock();
// 加锁
reentrantLock.lock();
// 同步代码块
System.out.println("获取锁成功");
// 释放锁
reentrantLock.unlock();
}
}
ReentrantLock 的使用非常简单,只需要创建一个 ReentrantLock 对象,获取锁调用 lock()
方法,释放锁调用 unlock()
方法。
与 Synchronized 在使用上的不同点:
- Synchronized 自动释放锁,ReentrantLock 手动释放锁
- Synchronized 可以使用于方法和代码块,ReentrantLock 只能使用于代码块
解析
ReentrantLock 并不直接继承 AbstractQueuedSynchronizer 类,而是定义了一个子类 Sync 去继承 AbstractQueuedSynchronizer,让子类去实现同步器。
ReentrantLock 实现了 Lock 接口,该接口定义了一个锁需要实现的方法。ReentrantLock 实现 Lock 接口中的方法都是通过 Sync 类中的方法来完成。
ReentrantLock 内部使用了一个抽象内部类 Sync 继承了 AbstractQueuedSynchronizer 类。NonfairSync 类继承 Sync,用于提供非公平锁的功能;FairSync 类继承 Sync,用于提供公平锁的功能。
public class ReentrantLock implements Lock, Serializable {
// 同步器
private final Sync sync;
// 内部抽象类
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
@ReservedStackAccess
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
@ReservedStackAccess
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
}
/**
* 实现非公平锁的类
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
/**
* 实现公平锁的类
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
@ReservedStackAccess
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
ReentrantLock 实现了 lock()
方法,暴露给外部获取锁;实现了 unlock()
方法,暴露给外部释放锁。
public void lock() {
sync.acquire(1);
}
public void unlock() {
sync.release(1);
}
我们可以看到这两个方法实际上都是调用了 Sync 类的方法,因此 ReentrantLock 本身并没有实现同步器的功能,而是交给内部类 Sync 类实现。ReentrantLock 通过调用 Sync 提供的同步方法实现了同步器的功能。
CAS
CAS 是 compareAndSwap 的缩写,中文译为:比较和交换。是现代 CPU 广泛支持的一种对内存中的共享数据进行操作的一种特殊指令,这个指令会对内存中的共享数据做原子读写操作。其作用是让CPU 比较内存中某个值是否和预期的值相同,如果相同则将这个值更新为新值,不相同则不做更新,以此达到同步的效果。
CAS 在 Java 中的使用非常多, java.util.concurrent.atomic 这个包下的类都是使用 CAS 来实现的。
示例
内存地址 | 0x01 | 0x02 | 0x03 |
---|---|---|---|
值 | 1 | 2 | 3 |
我们假设在内存 0x01 处存放 1,0x02 处存放 2,0x03 处存放 3。现在我们有一个线程 A,它要使用 CAS 操作修改 0x01 处的值为 4。此时,它只需要这样操作即可。
给 CAS 函数提供三个参数:内存地址,内存地址中存放的原始值,内存地址需要存放的新值。函数原型 cas(*address, oldValue, newValue)
。
更新成功
线程 A 执行 cas(0x01, 1, 4)
,系统会先取出内存地址为 0x01 的值,取出值为1,与传入的旧值 1 相同,此时就会将 4 写入到内存地址 0x01 处,内存变为:
内存地址 | 0x01 | 0x02 | 0x03 |
---|---|---|---|
值 | 4 | 2 | 3 |
更新失败
如果在线程 A 执行cas(0x01, 1, 4)
时,存在另外一个线程 B,它先于线程 A 修改了 0x01 处的值,它将值改为 5。此时内存为
内存地址 | 0x01 | 0x02 | 0x03 |
---|---|---|---|
值 | 5 | 2 | 3 |
然后线程 A 继续执行,它取出 0x01 处的值为 5,传入的旧值为 1,1 != 5,此次 cas 操作失败,不会将新值写入到 0x01 中。
ABA 问题
我们现在来看另外一种特殊情况。
线程 A 执行cas(0x01, 1, 4)
时,线程 A 被挂起。另外一个线程 B,它修改 0x01 处的值,它将值改为 5;此时又有一个线程 C,它执行 cas(0x01, 5, 1)
,将 0x01 处的值又修改为 1。
内存值变化过程如图所示:
此时,线程 A 继续执行,发现内存值与传入的旧值相同,因此将值替换为 4。
这就是著名的 ABA 问题。当一个线程修改了值后,又有另一个线程将值修改回来,这样对于其他线程来说,这个值就相当于没有修改过一样。如果要解决这个问题,我们可以给值加一个版本号 version。这个 version 代表修改的次数,每一次修改都会将版本号加1,用于告诉其他线程,这个值已经修改过了。此时,cas 要成功更新,则需要值和版本号都相等的情况下,才能更新。
小结
CAS 由 CPU 提供的指令实现,使用简单,无需频繁切换上下文,提高效率。
但是有以下几个缺点:
- 只能保证一个共享变量的读写。
- ABA问题。
- 如果自旋 CAS 长时间地不成功,则会给 CPU 带来非常大的开销。
CLH
自旋锁
在介绍 CLH 之前,我们先来了解一个经常出现在大家视野中的锁——自旋锁。顾名思义,它在获取不到锁的时候,会循环的尝试获取锁,直到获取到锁为止。
import java.util.concurrent.atomic.AtomicReference;
public class SpinLock {
private AtomicReference<Thread> reference = new AtomicReference<>();
public void lock(){
Thread thread = Thread.currentThread();
// 使用 CAS 设置线程,只有 CAS 成功才会获取锁,否则循环执行
while (!reference.compareAndSet(null, thread)){
}
}
public void unlock(){
Thread thread = Thread.currentThread();
// 只有获取到锁的线程才能解锁
reference.compareAndSet(thread, null);
}
}
上面是使用 Java 实现的一个简易自旋锁,它只使用了最简单的 CAS 操作即可完成。当 reference 引用的值为 null 时,表示还没有任何一个线程获取锁,此时通过 compareAndSet()
方法设置当前线程,如果成功,则表示获取锁成功,退出循环;否则,持续循环,直到 compareAndSet()
方法返回 true为止。
我们可以看到自旋锁实现简单,并且不会发生内核态和用户态的切换。但是,它却有几个缺点
- 锁饥饿。当锁竞争激烈的情况下,会发生一个线程从始至终
compareAndSet()
都无法设置成功,导致一直获取不到锁。 - 性能低。当锁竞争激烈的情况下,因为只有一个线程可以获取到锁,因此其他线程都将处于空转的状态。当持有锁时间越长,空转时间就越长。并且,因为他们都只访问同一个共享变量,当共享变量改变时,将导致所有线程的高速缓存都失效。
所以,自旋锁适用于锁竞争不激烈,锁持有时间短的场景。
关于高速缓冲,需要了解 CPU 的三级缓存,这里暂不涉及。
思考题🧐
如何改造上面的自旋锁代码,使其支持可重入?
CLH 锁
CLH 是对简单自旋锁的改进。它本质上也是一个自旋锁,但是它解决了锁饥饿和性能低的问题,提供了一个可扩展,性能高,并且公平的锁服务。
CLH 针对锁饥饿,它使用了基于队列的自旋锁,每个获取锁的线程都需要进入到队列中等候,并且自旋访问前一个线程是否已经释放了锁,如果未释放,则持续自旋。如果已经释放,那么则可以获取锁。通过队列的先进先服务原则,保证了每一个线程都能公平的获取到锁。
CLH 针对性能低,不再是所有线程都只访问同一个共享变量,而是在每一个线程的内部都会维护一个变量,这个变量用于表示该线程是否已经获取锁和释放锁。每一个线程都只需要自旋前一个线程的变量即可。因此,当变量的值发生变化时,只会影响后继线程,而不会影响队列中的所有线程,缩小了影响范围。
数据结构
CLH 使用隐式链表队列作为其数据结构。隐式链表的意思就是没有前驱指针和后继指针指向前一个节点和后一个节点,取而代之的是,直接在节点中记录前一个节点。
所有请求获取锁的线程,都会在进入到队列中,自旋访问前一个节点的状态变量,只有前一个节点释放锁时,当前节点才能获取锁。
CLH 本身包含一个空节点,每个 CLH 节点包含两个属性:请求锁的线程和持有锁的状态。空节点的属性线程值为 null,锁状态为释放锁。空节点使用 tail 指针引用。当有一个新的线程请求锁时,将会包装为一个新的 CLH 节点。并且使用 getAndSet 操作,返回当前 tail 指向的节点,并将新节点设置为 tail,此时新的 CLH 节点将会成为队尾节点。入队成功后,线程会轮询上一个节点的状态变量,当上一个节点释放锁后,它将得到这个锁。
- 初始化一个 CLH,此时内部只有一个空节点,线程为 null,锁状态为 false。(步骤一)
- 当线程 thread1 请求锁,包装为 CLH 节点,使用原子的 getAndSet 操作将该节点设置为 tail,并发现前一个节点的锁状态为 false,因此获取锁成功,进入临界区执行代码。(步骤二)
- 当 thread1 持有锁期间,thread2 请求锁,发现前一个节点的锁状态为 true,获取锁失败。自旋 thread1 节点的状态,等待其释放锁。(步骤三)
- thread1 释放锁,thread2 自旋发现前一个节点已经释放锁,获取锁成功。(步骤四)
- thread2 释放锁。(步骤五)
如果后续有新的线程请求锁,会从 thread2 节点后面开始重复上面的步骤。
Java 实现 CLH
import java.util.concurrent.atomic.AtomicReference;
public class CLHLock {
// 队尾节点
private final AtomicReference<Node> tail;
// 前一个节点
private ThreadLocal<Node> preNode;
// 当前节点
private ThreadLocal<Node> currNode;
public CLHLock() {
// 默认为 false 的节点
this.tail = new AtomicReference<>(new Node());
this.currNode = ThreadLocal.withInitial(Node::new);
this.preNode = new ThreadLocal<>();
}
// 节点
private static class Node {
private volatile boolean locked;
}
// 加锁方法
public void lock(){
Node node = currNode.get();
node.locked = true;
Node pre = this.tail.getAndSet(node);
preNode.set(pre);
while (pre.locked){
}
}
// 解锁方法
public void unlock(){
Node node = currNode.get();
node.locked = false;
currNode.set(new Node());
}
}
代码来源:zhuanlan.zhihu.com/p/161629590… 45 行做修改。
为什么需要使用 volatile 修饰 locked 字段?不使用不行吗?
因为在 Java 中,每个线程都有独立的工作内存,和一块所有线程共享的主内存。访问数据时,线程首先会从共享内存中将变量值复制到工作内存中,然后在工作内存中进行读写。因为工作内存是独立,当线程对变量进行修改时,其他线程是无法感知的,只有写回主内存时(事件通知),其他线程才会知道该变量值被修改了,然后会将工作内存中该变量的值废弃,重新从主内存中读取。
关于缓存失效,可以参考 MESI,提供一个模拟 MESI 的网站
使用 volatile 关键字,我们可以使得线程对变量的修改对于其他线程是可见的。而这个可见的意思就是写回主内存中。而 volatile 的之所以可以实现这个效果得益于 Java 中的 happen-before 原则。
happen-before 原则中有一条关于 volatile 变量规则 :对于 volatile 变量的写操作,先行发生于后续对这个 volatile 变量的读操作。这里的先行发生是时间意义上的先后。
所以,当一个线程解锁时,设置 node.locked = false
。当 volatile 变量 locked 写操作完成后,后续线程读取 locked 变量可以马上获取到最新值。
除了解决内存可见性之外,volatile 关键字还解决了另外一个问题:指令重排序。
大多数现代微处理器都会采用将指令乱序执行的方法,在条件允许的情况下,直接运行当前有能力立即执行的后续指令,避开获取下一条指令所需数据时造成的等待。通过乱序执行的技术,处理器可以大大提高执行效率。
换言之处理器只需要保证单线程内的最终结果是正确即可,因此为了避免指令和数据获取的时延,在不影响最终结果的情况下,会不按照代码顺序执行。
在 lock()
方法中,node.locked = true; Node pre = this.tail.getAndSet(node);
这两行代码是有可能被重排序的。即可能先执行 Node pre = this.tail.getAndSet(node);
,然后再执行node.locked = true;
。
这样就会出现另外一个情况,当执行完 Node pre = this.tail.getAndSet(node);
时,线程被挂起,此时 node 中 locked 的值为 false,并且已经被放入了队尾中。此时其他线程来请求锁,发现 tail 指向的 node 中的 locked 为 false,则直接获取锁成功,执行临界区代码。
这里造成了两个问题:
- 被挂起的线程永远都没有办法继续获取到锁,因为它已经被认为释放了锁。
- 后续执行都会因为这个出现计算错误。
volatile 会阻止这两行语句进行交换,因此避免了出现上述情况。
volatile 可以实现内存可见性和阻止指令重排序主要是依靠内存屏障来完成的。
tail 为什么使用 AtomicReference
使用 AtomicReference 的目的是保证 tail 的操作是原子性的。getAndSet 的操作其实是分为多个步骤的:
- 读取 tail 值
- 读取 node 值
- 设置 tail
- 返回原 tail 值
由于多线程的特性,这里的每一个步骤都可能被暂停。假如线程 A 执行步骤一后,CPU 时间片用完,让出 CPU。接着线程 B 获得执行权,它执行完四个步骤。此时 tail 值已经被更改为线程 B,但是线程 A 获取到的依然是原来的那个 tail。
因此,如果不能保证 tail 的原子性,那么就会出现错误。
currNode.set(new Node()); 这行代码的作用是什么?
这一行代码是为了避免 Node 被复用,导致死锁。
线程 A 释放锁,执行完 node.locked = false
后。后继线程 B 即可开始获取锁,但如果此时后继线程 B 没有 CPU 的时间片,而线程 A 再次请求锁,并且获取锁成功,此时,tail 会指向线程 A。
此时,就会变成线程 A 的 pre 指向线程 B,线程 B 的 pre 指向线程 A。双方都在等待对方释放锁,因此造成死锁。
释放锁时,替换掉原 node 对象,再次获取锁时,将会使用新的 node 对象当入队尾,而不会复用原来的 node 对象。但是原 node 对象依然被后继节点引用着。实际上在后继节点释放锁时,可以断开对前一节点的引用,帮助 JVM 更好的 GC。
小结
CLH 性能优异,获取锁和释放锁的开销都相对较小。因为使用了链表作为底层数据结构,保证了线程获取锁的公平,防止了锁饥饿的发生。实现难度较小,扩展性却很好。
但是 CLH 也有两个缺点:
- 使用了自旋操作,如果持有锁时间过长,造成 CPU 资源的浪费。
- 单一的 CLH 过于简单,无法实现复杂功能。
AQS 改造 CLH
AQS 对 CLH 的改造主要在这三方面:
- 在节点中增加了前驱链接和后继链接
- 使用阻塞操作代替自旋操作
- 扩展锁状态字段,使其可以完成更多功能
我们先来看 AQS 中的 Node 节点长什么样?
static final class Node {
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
}
在节点中增加了前驱链接和后继链接
我们可以看到 Node 节点中包含了 prev 和 next 变量,这两个变量说明 AQS 内部是使用双向链表。
prev 指针的作用主要是处理取消获取锁的节点,当线程被中断或者超时,那么就会被取消获取锁。而 prev 指针就是处理这些需要取消获取锁的线程。
next 指针的作用是用来唤醒后继线程。因为 AQS 中使用阻塞来挂起线程的,而不是自旋,因此被挂起的线程并不知道前一个节点是否已经释放了锁。所以当一个节点释放锁时,需要主动唤醒后继线程,使其可以获取到锁。
使用阻塞操作代替自旋操作
在 AQS 中,如何线程获取不到锁,将不会像原始 CLH 那样,自旋前一个节点的锁状态来得知是否可以获取锁。而是直接将线程阻塞,当前一个节点释放锁时,会主动唤醒后一个线程。线程被唤醒后,将会继续尝试获取锁。
扩展锁状态字段,使其可以完成更多功能
在 CLH 中,锁状态使用布尔变量来表示,它所表示的的状态只有两种,true 为等待获取锁/已经获取锁;false 为释放锁。在 AQS 中,状态变量是一个 int 值。
它有五种状态:
- CANCELLED(1):表示该节点已经取消获取锁
- SIGNAL(-1):表示该节点在释放锁之后,需要唤醒后继节点
- CONDITION(-2):表示该节点在条件队列中
- PROPAGATE(-3):在共享锁中才会使用,目前先不解释,主要解决 Semaphore 的 bug。想要了解的可以查看此链接
- NONE(0):默认值,节点被创建时就是这个值
AQS 源码解析
数据结构
public abstract class AbstractQueuedSynchronizer{
private Node head;
private Node tail;
private int state;
}
AQS 内部的字段极其简单,它的组成就是一个双向链表和一个同步状态。
源码解析
lock
public void lock() {
sync.acquire(1);
}
lock 方法是 ReentrantLock 中暴露给使用者获取锁的方法。我们可以看到它是委托给 sync 对象调用 acquire
方法来获取锁。acquire
方法是 AQS 中提供的一个获取锁的模版方法。
acquire
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)){
selfInterrupt();
}
}
-
执行
tryAcquire
尝试获取锁。- 如果成功,则表示获取锁成功。
- 如果失败,则执行 acquireQueued,将线程加入到队列中。
这段代码就是非公平锁的体现,当一个线程去请求锁时,它首先会尝试去获取锁,而不是直接加入到队列中。非公平就体现在队列中还存在等待获取锁的线程,但是一个新的线程去请求锁时,也可能会成功。这就会导致一个后请求锁的线程,可能会比先请求锁的线程优先获取到锁。
- T1 时刻,thread 1 持有锁。thread2,thread3,thread4 依次请求锁,但都失败并进入到队列中等待; 此时 thread2,thread3,thread4 都是被挂起的状态,需要其他线程去唤醒他们。
- T2 时刻,thread 1 释放锁。此时 thread1 会去唤醒队列中的第一个有效线程。在唤醒线程之前,thread5 开始请求锁。
- T3 时刻,thread5 获取锁成功。同时 thread2 被唤醒,thread2 继续请求锁失败,重新被挂起。
tryAcquire
tryAcquire 是 AQS 中定义的一个抽象方法,需要子类对其重写。这个方法的作用是在独占锁模式下,尝试获取锁。这个方法应该始终由发起请求的线程来执行。如果这个方法返回 false,并且线程未在队列中排队,则可以将线程加入队列中,等待其他线程将其唤醒。如果返回 true,则表示请求锁成功。
在 ReentrantLock 中,是通过一个内部抽象类 Sync 继承 AbstractQueuedSynchronizer,一个 NonfairSync 继承 Sync 实现非公平锁。一个 FairSync 继承 Sync 实现公平锁。
因此重写 tryAcquire 方法的是 NonfairSync 类和 FairSync 类。
经过我们简化后,tryAcquire 的代码
// 非公平锁
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
// 重写 tryAcquire
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
// 公平锁
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
执行步骤:
-
获取 state 值
-
判断 state 值是否为 0,如果等于 0,则表示还没有线程获得锁。不等于 0,则表示锁已被其他线程获得
- 如果 state 等于 0,则通过 CAS 值设置 state。如果设置成功,则表示该线程获取锁,执行临界区代码
- 如果 state 不等于 0(为1),则判断当前线程是否为已经获取锁的线程。如果是,则表示该线程获取锁,执行临界区代码。(这也表明 ReentrantLock 是可重入锁)。重入时,state 会被设置为重入的数量。
这个方法的主要作用就是应该如何获取锁。最主要的一段代码就是:compareAndSetState(0, acquires)
。修改 state 字段的值为 acquires
。因此,我们可以得出一个结论,获取锁的关键就在于能不能成功设置 state。如果可以,那么就表示获取锁成功。
ReentrantLock 是一个可重入的锁,那么则需要记录下重入的数量,以便于可以正确的释放锁。在 ReentrantLock 中,重入数据的记录也是通过 state 来实现,当线程每重入一次时,就会将当前 state 加 1,用于记录重入的数量。
对比公平锁和非公平锁,我们可以发现他们之间请求锁的代码中只有一处是不同的。公平锁会先判断队列中是否存在有其他线程等待获取锁,如果有,则直接将当前线程加入到队列中。
addWaiter
private Node addWaiter(Node mode) {
Node node = new Node(mode);
for (;;) {
Node oldTail = tail;
if (oldTail != null) {
node.setPrevRelaxed(oldTail);
if (compareAndSetTail(oldTail, node)) {
oldTail.next = node;
return node;
}
} else {
initializeSyncQueue();
}
}
}
当线程请求锁失败时,就会调用该方法。主要作用是向同步队列中加入一个等待节点。
-
创建一个新的节点
-
在 for 循环中执行以下步骤
-
判断 tail 是否存在
- 存在,说明已经初始化过了。直接通过 CAS 操作,将当前节点设置为 tail
- 不存在,则初始化链表。初始化链表的操作就是,创建一个空节点,让 head 和 tail 指针都指向这个空节点。然后重新进入下一轮 for 循环,将之前创建的节点加入到队列之中。
-
通过这个代码,我们可以知道两个细节:
- AQS 中的同步队列是延迟创建的,只有锁第一次存在竞争时才会创建。
- AQS 中的同步队列使用的是有头双向链表。
acquireQueued
final boolean acquireQueued(final Node node, int arg) {
// 1. 定义中断变量。这个变量主要是为了重新设置线程的中断标记。
boolean interrupted = false;
try {
// 进入 for 循环。为了线程被唤醒之后,可以通过该循环,重新执行获取锁的流程。
for (;;) {
// 获取刚刚加入节点的前驱节点
final Node p = node.predecessor();
// 如果前驱节点是头节点,那么说明当前节点是队列中第一个获取锁的线程。
// 所以,让该线程调用 tryAcquire() 再次尝试获取锁。
if (p == head && tryAcquire(arg)) {
// 获取锁成功,设置该节点为头节点。
setHead(node);
p.next = null; // help GC
// 返回中断状态
return interrupted;
}
// 获取锁失败之后,判断是否需要阻塞
if (shouldParkAfterFailedAcquire(p, node))
// 如果需要,那么则阻塞线程。当线程被唤醒后,会设置中断状态
interrupted |= parkAndCheckInterrupt();
}
} catch (Throwable t) {
// 取消请求
cancelAcquire(node);
if (interrupted)
// 发起中断
selfInterrupt();
throw t;
}
}
当线程通过 addWaiter()
加入到等待队列中后,就会执行该方法。主要作用是为已经在队列中的线程请求锁,这个方法是整个独占锁模式中的核心代码。我们一句一句来剖析。
一开始定义了 interrupted 变量,这个变量的作用是为了重新设置线程的中断标记。为什么需要重新设置中断呢?先留个悬念。
接下来会进入一个 for 循环中,跳出循环的条件就是成功获取锁。进入 for 循环后,首先获取到当前节点的前驱节点,如果前驱节点是头节点,那么说明当前节点是队列中第一个等待请求锁的线程。线程释放锁时,它作为第一候选人去尝试获取锁。获取锁成功后,则将该节点设置为头节点。
如果获取锁失败,那么则会执行 shouldParkAfterFailedAcquire 判断线程是否需要阻塞。如果返回 true,表示需要进行阻塞,因此会调用 parkAndCheckInterrupt 方法进行阻塞。线程被阻塞后,会停留在 LockSupport.park(this);
这一句中。直到以下三种情况才会被唤醒:
- 其他线程调用了
unpark
方法,将该线程唤醒 - 其他线程中断 (interrupts ) 了该线程
- 调用出现了错误
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
}
return false;
}
该方法首先判断前驱节点的状态是否为 SIGNAL,如果为 SIGNAL,表示已经告诉前驱节点在释放锁时,需要来通知我,因此可以直接进行阻塞线程操作。
如果大于 0,其实就是状态为 CANCELLED,那么则继续往前寻找到一个状态为非 CANCELLED 的节点,并且重新设置 pred 指针,进入下一轮循环。
如果等于 0,那么表示还未告诉前驱节点在释放锁时,需要来通知我,因此要设置前驱节点的状态为 SIGNAL,表示它释放锁时需要来通知我,进入下一轮循环。
这里会有一个疑惑点:为什么设置了前驱节点的状态为 SIGNAL 后,需要再次循环去尝试获取锁,而不是直接进行阻塞?这里需要结合释放锁的代码来一起看。
release
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
执行 tryRelease
,该方法与 tryAcquire
方法一样,都是需要子类重写的方法。该方法主要作用是设置 state 的值减 1。
如果执行成功,则表示释放锁成功,那么就需要唤醒后继线程去获取锁。但我们知道,AQS 中使用的是 CLH 队列,采用先进先服务的原则,所以会先唤醒第一个线程,也就是唤醒 head 的后继节点。因为 head 是一个空节点,不属于正在等待获取锁的节点。
获取到 head 节点,如果 head 为 null,表示队列中没有需要唤醒的节点,直接返回。如果 head 的状态为 0,表示没有后继线程需要唤醒。因为只要状态为 SIGNAL 时,才会去唤醒后继线程。
当符合上述两个条件时,才会调用 unparkSuccessor
方法,开始唤醒后继线程。
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
// 设置头节点的状态为 0
node.compareAndSetWaitStatus(ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
// 如果后继节点为取消状态,从后往前找一个有效的节点
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node p = tail; p != node && p != null; p = p.prev)
if (p.waitStatus <= 0)
s = p;
}
if (s != null)
// 唤醒线程
LockSupport.unpark(s.thread);
}
node 节点其实就是 head 节点,首先判断如果 head 节点小于 0,也就是处于 SIGNAL 状态,则将状态修改为 0。为什么要做这一步操作呢?
为了保证线程唤醒与阻塞的正确性。因为 LockSupport.unpark
方法,在线程运行时调用,那么在下一次调用 LockSupport.park
方法时,线程是不会被阻塞的。
还有就是我已经唤醒了后继线程,已经完成了我的任务,那么状态自然就是默认状态。如果后继线程还需要通知,那么需要重新设置我的状态。
- 在 T1 时刻,thread2 持有锁。thread1 进入第一次循环,获取锁失败,开始执行
shouldParkAfterFailedAcquire
。但是由于时间片使用完,CPU 将 thread1 挂起。此时 head 的 waitStatus 依然为 0。 - thread2 获取时间片,开始释放锁,判断条件
h != null && h.waitStatus != 0
,发现 head 的状态为 0,因此直接返回,不会唤醒后继线程。 - thread1 获取到时间片,继续执行
shouldParkAfterFailedAcquire
,修改 head 节点的状态设置为 SIGNAL,然后直接阻塞。
为什么设置了前驱节点的状态为 SIGNAL 后,需要再次循环去尝试获取锁,而不是直接进行阻塞?这里就可以给出解答:如果设置了 SIGNAL 后,直接阻塞的话,thread 1 会无法获取锁。
生活中的例子
我们可以想象一个看病的例子来理解 AQS 中的行为。
如果我到了诊所门口,可以先去看看是否有人在就诊,如果没有则可以直接进入诊所就诊。就诊就是等于获取锁成功。
如果我们到了诊所门口,发现已经有其他人在就诊,我们可以让那个人就诊完成出来后告知我一声可以进入就诊,并且在诊所门口排队。
如果前一个人已经就诊完成,但是还在通知我去就诊的路上。此时有另一个人已经到了诊所门口,发现诊所里面没有人,他直接进入诊所中就诊。我收到通知之后,赶来门口查看,发现里面已经有人在就诊了,因此只能回去原来的位置继续排队。并让那个人就诊完成出来后告知我一声可以进入就诊。
深入讨论
interrupted 的作用是什么
在 Java 中,采用的是协作式中断。当中断一个线程时,它只是在 JVM 中设置了线程的中断标志为 true,并不会影响当前正在运行的线程。但是如果当前线程是处于 BLOCK 和 WAITED 的状态下,则会被唤醒。
在 AQS 中,如果线程在阻塞时,被中断唤醒了,首先会执行 Thread.
interrupted
()
方法,这个方法的作用是:返回线程的中断状态,并且设置中断状态为 false,即会清除中断状态。
为什么 AQS 自己清除中断状态,然后又自己重新设置回去呢,一开始不清除不就可以了吗?这是为了不影响获取锁的过程。
当线程被中断唤醒后,会继续尝试获取锁,此时如果获取锁失败了,应该需要继续调用 LockSupport.
park
, 将线程阻塞。但是由于中断状态的存在,此时线程是无法被阻塞的。因为即使阻塞之后,只要检测到中断状态,又会被唤醒。
因此在线程被唤醒后,他需要先清除中断标记位,直到线程顺利获取锁之后,再重新设置中断状态,让业务自行去处理中断。
响应中断
上述的操作是通过忽略中断的方式来获取锁。AQS 中,还有另外一种响应中断 acquireInterruptibly
****的方式来获取锁。
它与忽略中断的不同之处在于,线程被唤醒后,如果发现了中断,则直接抛出 InterruptedException
异常,并且将该线程踢出队列中。
什么时候阻塞线程会被唤醒
- 其他线程调用了
unpark
方法,将该线程唤醒 - 其他线程中断 (interrupts) 了线程
- 调用出现了错误
通过 LockSupport.
park
**方法阻塞的线程,并不知道它是通过以上哪种方式被唤醒的。
为什么需要循环两次才阻塞线程
查看 release
方法的解析
释放锁时,为什么需要从后往前寻找后继节点
这里只需要结合 addWaiter 方法就可以理解了。
private Node addWaiter(Node mode) {
Node node = new Node(mode);
for (;;) {
Node oldTail = tail;
if (oldTail != null) {
node.setPrevRelaxed(oldTail);
if (compareAndSetTail(oldTail, node)) {
oldTail.next = node;
return node;
}
} else {
initializeSyncQueue();
}
}
}
在替换 tail 节点之前,会先设置 node 的前驱节点为当前的 tail 节点。也就是说此时,通过 node 的 pred 指针是可以找到 oldTail 的,但是 oldTail 的 next 指针找不到 node,然后通过 CAS 操作将 tail 指针指向 node。
如果我们在释放锁时,从前往后找的话,因为 oldTail 的 next 指针还未有设置值为 node,因此我们有可能找到 oldTail 节点的时候就以为已经遍历完全部节点了,但其实还忽略了一个新加入的节点。
如果我们从后往前找,在通过 CAS 操作将 tail 指针指向 node 后,我们就可以直接拿到最新的 tail 值,并且新 tail 的前驱指针已经在前面设置完成,所以,可以查询到全部节点。
小结:从后往前寻找后继节点是为了在新节点加入到队列中时,可以扫描到它。原因是 compareAndSetTail(oldTail, node)
和 oldTail.next = node;
这两条语句不是原子操作。
如何理解 AQS 中的公平锁和非公平锁
在 AQS 中,使用了双向链表作为同步队列,进入队列中的线程采用先进先服务的方式。因此可以得出结论,只要进入队列中的线程获取锁就是公平的。但是在非公平锁的模式下,在进入队列之前,会先尝试去获取锁,如果获取锁成功,那么就可以不进入队列去等待,而这就是非公平锁的本质。
而公平锁则是,只要获取锁,那么就必须先进入队列。而队列可以保证先进先服务,可以保证绝对的公平。
什么时候会取消锁请求
- 获取锁的过程中发生异常,如果响应中断的方法下,那么中断异常也会导致取消获取锁
- 使用有请求锁时间的方法,如果在指定时间内没有成功获取锁,会被取消获取锁
转载自:https://juejin.cn/post/7196549577163489317