详解AQS的具体流程
AQS中的四个字段
本文讲解的是JDK17中的源码。
AQS中的字段很少,除去一些静态常量后就只剩下四个字段,并且这四个字段还恰恰是非常关键的,它们分别是 head
、tail
、state
和 exclusiveOwnerThread
,其中 exclusiveOwnerThread
字段是继承自父类的。
由于AQS就是一个多线程同步器,用来同步多个线程的,因此需要一个数据结构来存放正在同步过程中的线程(也就是正在排队中的线程),在这里AQS使用了双向链表数据结构来作为等待队列,而AQS中的 head
和 tail
变量就是用来指向等待队列中头节点和尾节点的。
AQS就像一个窗口柜台业务员,它的 state
字段用来表示该业务员当前的状态,比如是正在办理业务中还是正在空闲中,如果是正在办理中的话,那么它的 exclusiveOwnerThread
字段就是用来表示当前正在给谁办理业务中,如果正在空闲中的话,则该字段的值为 null
,表示现在没人正在办理业务中。
在AQS的等待队列中存放的并不是线程对象本身,而是封装了线程相关信息的节点对象,结构如下图:
等待队列中的虚拟节点
在上图的AQS等待队列中会注意到头节点是一个虚拟节点,这并不是什么特殊情况,而是一种简化链表操作的巧妙设计,这种设计在学习链表数据结构的时候就经常会用到。如果没有这个虚拟节点的话,则我们需要做很多繁琐的临界判断,比如在每次插入节点的时候都需要判断头指针是否指向 null
,如果是的话则需要额外将头指针指向自己,然后再修改为指针,而有了虚拟节点后,在插入节点的时候直接改变尾指针即可,不用考虑头指针的指向 null
情况了,因为有了虚拟节点的存在后头指针一定不为 null
。
虚拟节点也可以叫做辅助节点、伪节点,我这里是翻译自AQS注释中的 “dummy node”。
请求修改状态
在AQS中,线程想办理某种业务时必须要先去请求执行该业务的许可,只有得到许可后才可以去窗口柜台中办理业务,如果本次请求失败了,那么可以过一会再来请求。
在请求许可时,实际上是去操作AQS中的 state
字段,可以通过 acquire
方法来发出请求,参数为要修改成的状态(具体的状态对应的整数值由子类决定),该方法会在请求成功后返回,中途必要时会发生阻塞行为。
acquire
方法在执行前会先去尝试修改状态,修改成功则直接就返回了,由于是尝试修改,所以即使修改失败了也会直接返回,而不会产生任何的阻塞行为。具体的是怎么修改的以及状态整数值的意义是什么,都是由子类来决定的,而AQS只是负责同步线程,也就是维护秩序的保安。从这里就可以体会到AQS(抽象队列同步器)的名称含义所在了。
acquire
方法的源码如下:
public final void acquire(int arg) {
if (!tryAcquire(arg)) // 尝试请求修改状态
// 请求失败,开始进入等待队列。
acquire(null, arg, false, false, false, 0L);
}
AQS为我们提供了3个用来操作 state
字段的方法,分别为:
getState
:普通的Getter方法。setState
:普通的Setter方法。compareAndSetState
:通过CAS的方式将预期的状态改为指定的状态。
具体的请求流程
我们先假设有一个线程正在执行足够耗时的任务,此时第二个线程去请求执行任务,然后我们分析第二个线程的执行流程。入队的完整源码如下:
final int acquire(AbstractQueuedSynchronizer.Node node, int arg, boolean shared,
boolean interruptible, boolean timed, long time) {
Thread current = Thread.currentThread();
byte spins = 0, postSpins = 0;
boolean interrupted = false, first = false;
AbstractQueuedSynchronizer.Node pred = null;
for (;;) {
if (!first && (pred = (node == null) ? null : node.prev) != null &&
!(first = (head == pred))) {
if (pred.status < 0) {
cleanQueue();
continue;
} else if (pred.prev == null) {
Thread.onSpinWait();
continue;
}
}
if (first || pred == null) {
boolean acquired;
try {
if (shared)
acquired = (tryAcquireShared(arg) >= 0);
else
acquired = tryAcquire(arg);
} catch (Throwable ex) {
cancelAcquire(node, interrupted, false);
throw ex;
}
if (acquired) {
if (first) {
node.prev = null;
head = node;
pred.next = null;
node.waiter = null;
if (shared)
signalNextIfShared(node);
if (interrupted)
current.interrupt();
}
return 1;
}
}
if (node == null) {
if (shared)
node = new AbstractQueuedSynchronizer.SharedNode();
else
node = new AbstractQueuedSynchronizer.ExclusiveNode();
} else if (pred == null) {
node.waiter = current;
AbstractQueuedSynchronizer.Node t = tail;
node.setPrevRelaxed(t);
if (t == null)
tryInitializeHead();
else if (!casTail(t, node))
node.setPrevRelaxed(null);
else
t.next = node;
} else if (first && spins != 0) {
--spins;
Thread.onSpinWait();
} else if (node.status == 0) {
node.status = WAITING;
} else {
long nanos;
spins = postSpins = (byte)((postSpins << 1) | 1);
if (!timed)
LockSupport.park(this);
else if ((nanos = time - System.nanoTime()) > 0L)
LockSupport.parkNanos(this, nanos);
else
break;
node.clearStatus();
if ((interrupted |= Thread.interrupted()) && interruptible)
break;
}
}
return cancelAcquire(node, interrupted, interruptible);
}
这里需要先了解一些要点:
- AQS对于阻塞操作的优化。阻塞线程是一个很昂贵的操作,涉及到操作系统从用户态到内核态的切换,因此AQS会尽可能地避免阻塞,会在进入等待队列前频繁地尝试修改状态,如果尝试多次后实在修改不了,才会迫不得已进入等待队列并开始阻塞。
- 源码中的
first
变量指向的节点并不是等待队列中的虚拟节点,而是虚拟节点的下一个节点。 pred
变量指向当前节点的上一个节点,由于有虚拟节点的存在,只要该变量的值不为null
,则说明当前节点已经进入到等待队列中了,如果为null
则说明节点还没有创建或已创建但还没有进入到等待队列中。- 入队操作分为好几个小阶段,每一次for循环都会执行其中一个小阶段的任务。
这段源码不是高挑美女,没有什么惊艳的设计模式,而是传统的素颜美女,非常的耐看,每一次看都会有不一样的体会。由于这段代码非常的绕(不愧是Doug Lea),每一次for循环都是一个小阶段,并且不同阶段会走不同的分支,我们只需关注特定阶段的分支代码并了解该阶段的具体流程即可,因此我会省略掉一些无关代码,并用红色框框出特定阶段需要关注的代码。
阶段1:创建空节点
阶段2:创建虚拟节点/入队
创建虚拟节点
在入队先需要先判断是否有虚拟节点,如果没有虚拟节点则当前阶段会变成创建虚拟节点阶段。
在创建好虚拟节点后并不是直接入队的,由于AQS需要面对高并发的场景,因此AQS在修改任何内部的共享变量时都会使用到CAS技术。
private void tryInitializeHead() {
Node h = new ExclusiveNode();
if (U.compareAndSetReference(this, HEAD, null, h))
tail = h;
}
假如此时有大量的线程在同时创建完虚拟节点,接下来同时修改 head
变量,此时只有一个线程可以修改成功,而其它线程将会修改失败。这里需要注意的是,修改失败的线程并不需要自旋重试,因为只要有了虚拟节点就行,后面又不会使用到该虚拟节点,因此没有必要再创建一个新的虚拟节点然后再覆盖之前的虚拟节点。
CAS成功的线程可以直接修改 tail
变量,而不用担心线程安全的问题,因为CAS失败的线程直接就返回了,不会操作 tail
变量。
入队
有了虚拟节点后,当前阶段就会变成入队阶段了。入队其实就是将 tail
变量由原来的指向改成指向当前节点。由于 tail
变量是共享变量,因此需要使用CAS来修改。这里其实还有个需要注意的地方, tail
变量原来指向的节点其实也算是共享变量,但由于只有CAS修改 tail
变量成功的线程才会操作该共享变量,不存在竞争关系,因此无需再使用CAS来修改。
阶段3:修改节点状态
节点是用来封装对应线程相关信息的,封装的信息如下:
prev
:指向等待队列中的前一个节点。next
:指向等待队列中的后一个节点。waiter
:当前节点对应的线程对象。status
:当前节点的状态。
节点的状态有4种:
0
:初始值,表示节点刚刚被创建,无任何状态。WAITING
:线程正在阻塞状态中。CANCELLED
:值为负数,表示线程已退出,有可能是等太久不想等了(超时),也有可能是被其它人叫走了(被打断)。只要节点的状态值为负数,则一律为CANCELLED
状态。COND
:有条件的等待,和Condition
有关,可以先忽略。
开始修改节点状态:
这里需要特别注意一点是,当前阶段中如果节点不是 first
节点则不会尝试修改状态,因为AQS必须确保等待队列中的线程会按照FIFO的顺序请求执行任务,也就是说只要节点入队了就不能插队,此时由于非 first
节点已经入队了,所以当前阶段包括后续的阶段都不能再去尝试修改状态了,否则就属于插队了。
阶段4:阻塞
阻塞也就是调用了 LockSupport
类的 park
方法。
在这些阶段中,可以很明显地观察到每一个小阶段前都会尝试修改状态(非 first
节点入队后的阶段除外),也就是前面说的除非迫不得已,否则AQS会尽最大的努力来避免进入阻塞状态。如果刚好尝试修改状态成功了,则会直接返回,如果已经入队列了则会将 first
节点中的信息清空以作为新的虚拟节点并移除之前的虚拟节点,然后再返回。
唤醒first节点
当正在执行任务的线程执行完了后,会释放先前请求修改的状态(具体的释放操作由子类完成),并且在释放成功后会唤醒 first
节点中的线程。
public final boolean release(int arg) {
if (tryRelease(arg)) {
// 唤醒first节点
signalNext(head);
return true;
}
return false;
}
private static void signalNext(Node h) {
Node s;
// 有first节点并且first有状态时才唤醒
if (h != null && (s = h.next) != null && s.status != 0) {
// 将WAITING状态改为CANCELLED状态,表示该线程已经被唤醒了。
s.getAndUnsetStatus(WAITING);
// 唤醒first节点对应的线程
LockSupport.unpark(s.waiter);
}
}
first
节点被唤醒后会进行下一轮的for循环,并且在下一轮的for循环中去尝试修改状态,修改成功后会将 first
节点出队。
first节点自旋
先不考虑线程阻塞超时和被打断的情况,只考虑 first
节点被执行完任务的线程正常唤醒后,如果此时的并发量很大,在 first
节点被唤醒的同时又有新线程请求修改状态,那么此时就算是排到了 first
节点,first
节点也不一定会优先被处理,还得和新来的线程竞争,只有竞争成功了才可以去修改状态。
由于这种情况的存在,如果 first
节点和新来的线程竞争失败了,为了避免立即进入阻塞状态,first
节点还会自旋几次,如果实在修改不了状态才会进入阻塞状态,从这里也可以看出AQS会尽最大努力避免线程进入阻塞状态,以达到高性能的目的。
first
节点自旋的次数是有规律的,公式为 2n+1
,并且最开始 n
的值为0。acquire
方法中会有2个与自旋次数有关的变量,分别为 spins
和 postSpins
,spins
表示再自旋多少次就会阻塞,postSpins
表示 first
节点在被唤醒后如果和新来的线程竞争时又失败了,则需要再自旋多少次才会进入阻塞状态。
自旋的流程如下:
由此可以看到,AQS并不是完全公平的,只有进入到等待队列中的线程才是公平的。
线程自然唤醒或被中断
当节点的阻塞时间达到超时时间后,或者被其它线程打断时,节点会开始退出等待队列。
当节点由于达到超时时间而被唤醒后,并不会立刻退出队列,而是会尝试一下修改状态(只有 first
节点才会尝试修改状态,因为非 first
节点不能插队),如果刚好就修改成功了则直接返回,如果没有修改成功则只能乖乖退出队列了。
当阻塞的节点被其它线程打断时,不应该像上面一样临走前还去尝试一下修改状态,因为线程必须要能够及时响应中断。
当线程退出for循环后,就会开始退出队列。
中断线程时,需要非常注意一点的是区分要中断的是请求修改状态的过程,还是要中断当前线程的执行。acquire
方法中的 interruptible
参数就表示在请求修改状态的过程中是否可以被中断,如果可以的话,在收到中断信号后会清空信号,以防止意外地将这个信号发给后续执行的任务中。
退出等待队列
退出等待队列前线程会将当前节点的状态变成 CANCELLED
,然后从等待队列中的尾部节点开始向前遍历,当遍历到 CANCELLED
状态的节点时就弹出该节点。之所以从后往前遍历,主要是为了避免与线程唤醒操作之间的竞争,因为线程是从头部开始唤醒的,而弹出操作是从尾部开始的,这样就可以最大限度地避免竞争的发生。
在遍历节点时,会用到3个指针,分别是 p
、q
、s
,其中 q
是最主要的,指向了当前遍历到哪个节点,p
和 s
用来指向当前节点的前一个和后一个节点,以便在操作链表时不会把链表搞断。
总结
AQS其实就是一个高性能的用来管理等待队列的一个框架,每个线程在执行任务前都必须要修改AQS的相关状态,只有修改成功的线程才可以执行任务,并且在执行完任务后还需要恢复状态。有了AQS框架后,我们只需关注修改和恢复状态的逻辑即可,而管理其它未修改成功的线程的任务交给AQS去执行即可,这也是队列同步器的含义所在。
转载自:https://juejin.cn/post/7211103821082869818