【AQS】我们不妨把话讲的再明白一点
前言
如果被请求的共享资源被多个线程争抢,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,AQS就实现了这个机制。
AQS(AbstractQueuedSynchronizer)
即是抽象的队列式的同步器,内部定义了很多锁相关的方法,我们熟知的ReentrantLock
、ReentrantReadWriteLock
、CountDownLatch
、Semaphore
等都是基于AQS
来实现的。
AQS的实现原理
AQS使用一个volatile int成员变量来表示同步状态,通过内置的FIFO(CLH)队列来完成获取资源的排队工作。AQS使用CAS对该同步状态进行原子操作实现对其值的修改。
CLH(Craig,Landin,and Hagersten)队列是一个虚拟的双向队列(虚拟的双向队列即不存在队列实例,仅存在结点之间的关联关系)。AQS是将每条请求共享资源的线程封装成一个CLH锁队列的一个结点(Node)来实现锁的分配。
AQS的数据结构
AQS对资源的共享方式
- Exclusive(独占):只有一个线程能执行,如ReentrantLock。
- Share(共享):多个线程可同时执行,如CountDownLatch。
独占锁(Exclusive)又可分为公平锁和非公平锁。
- 公平锁:按照排队顺序获取锁。
- 非公平锁:要获取锁时,直接去抢锁,抢不到再乖乖排队。
AQS的模板方法
使用者继承AbstractQueuedSynchronizer并重写指定方法,即可自定义同步器。 AQS使用了模板方法模式,自定义同步器时需要重写下面几个AQS提供的模板方法:
isHeldExclusively()//该线程是否正在独占资源。只有用到condition才需要去实现它。
tryAcquire(int)//独占方式。尝试获取资源,成功则返回true,失败则返回false。
tryRelease(int)//独占方式。尝试释放资源,成功则返回true,失败则返回false。
tryAcquireShared(int)//共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
tryReleaseShared(int)//共享方式。尝试释放资源,成功则返回true,失败则返回false。
默认情况下,每个方法都抛出 UnsupportedOperationException。 这些方法的实现必须是内部线程安全的,并且通常应该中断而不是阻塞。AQS类中的其他方法都是final ,所以无法被其他类使用,只有这几个方法可以被其他类使用。
源码分析
类的继承关系
AbstractQueuedSynchronizer继承自AbstractOwnableSynchronizer抽象类,并且实现了Serializable接口,可以进行序列化。
AbstractOwnableSynchronizer抽象类仅有一个成员变量,表示独占线程,以及独占线程的get和set方法。
public abstract class AbstractOwnableSynchronizer
implements java.io.Serializable {
private static final long serialVersionUID = 3737899427754241961L;
protected AbstractOwnableSynchronizer() { }
/**
* 独占模式下的线程
*/
private transient Thread exclusiveOwnerThread;
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
protected final Thread getExclusiveOwnerThread() {
return exclusiveOwnerThread;
}
}
AbstractQueuedSynchronizer类有两个内部类,分别为Node类与ConditionObject类。
类的内部类-Node类
static final class Node {
/** 用于指示节点正在共享模式下等待的标记 */
static final Node SHARED = new Node();
/** 用于指示节点正在独占模式下等待的标记 */
static final Node EXCLUSIVE = null;
//节点状态枚举值
/** 表示线程已取消 */
static final int CANCELLED = 1;
/** 表示当前节点的后继节点包含的线程需要运行,也就是unpark */
static final int SIGNAL = -1;
/** waiting on condition */
static final int CONDITION = -2;
/** 当前场景下后续的acquireShared能够得以执行 */
static final int PROPAGATE = -3;
// 结点状态,如果值为0,表示当前节点在sync队列中,等待获取锁
volatile int waitStatus;
// 前驱节点
volatile Node prev;
// 后继节点
volatile Node next;
// 节点内的线程
volatile Thread thread;
// 下一个等待者
Node nextWaiter;
// 结点是否在共享模式下等待
final boolean isShared() {
return nextWaiter == SHARED;
}
// 获取前驱节点,如果为空则抛出异常
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() {}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
类的内部类 - ConditionObject类(可以先跳过不看)
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** condition队列头结点*/
private transient Node firstWaiter;
/** condition队列尾结点*/
private transient Node lastWaiter;
public ConditionObject() { }
// Internal methods
// 添加新的waiter到wait队列
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
// 尾结点不为空,并且尾结点状态不为CONDITION
if (t != null && t.waitStatus != Node.CONDITION) {
// 清楚状态为CONDITION的结点
unlinkCancelledWaiters();
// 最后一个结点赋值给t
t = lastWaiter;
}
// 新建一个结点
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
// 尾结点为空设置CONDITION队列的头结点
firstWaiter = node;
else
// 尾结点不为空,将结点放到尾结点后面
t.nextWaiter = node;
// 更新尾结点
lastWaiter = node;
return node;
}
private void doSignal(Node first) {
do {
// 该结点的nextWaiter为空
if ( (firstWaiter = first.nextWaiter) == null)
// 设置尾结点为空
lastWaiter = null;
// 设置first结点的nextWaiter
first.nextWaiter = null;
// 将结点从CONDITION队列转移到sync队列失败,并且condition队列中的头结点不为空,一直循环
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
private void doSignalAll(Node first) {
// condition队列的头结点尾结点都设置为空
lastWaiter = firstWaiter = null;
do {
//一直循环 将condition队列中的每个节点依次转移到sync队列
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
// 从condition队列中清除状态为cancel的结点
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}
// public methods
// 唤醒一个等待线程,如果所有的线程都在等待次条件,则选择其中的一个唤醒。在从await返回之前,改线程必须重新获取锁
public final void signal() {
// 不被当前线程独占,抛出异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// condition队列头结点
Node first = firstWaiter;
// 头节点不为空,唤醒一个等待线程
if (first != null)
doSignal(first);
}
// 唤醒所有等待线程。
public final void signalAll() {
// 不被当前线程独占,抛出异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
// 头结点不为空,唤醒所有线程
if (first != null)
doSignalAll(first);
}
// 等待,当前线程在接到信号之前,一直处于等待状态,不响应中断
public final void awaitUninterruptibly() {
Node node = addConditionWaiter();
long savedState = fullyRelease(node);
boolean interrupted = false;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if (Thread.interrupted())
interrupted = true;
}
if (acquireQueued(node, savedState) || interrupted)
selfInterrupt();
}
/** Mode meaning to reinterrupt on exit from wait */
private static final int REINTERRUPT = 1;
/** Mode meaning to throw InterruptedException on exit from wait */
private static final int THROW_IE = -1;
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}
// 等待 当前线程在接到信号或被中断之前一直处于等待状态
public final void await() throws InterruptedException {
// 当前线程被中断,抛出异常
if (Thread.interrupted())
throw new InterruptedException();
// 在wait队列上添加一个结点
Node node = addConditionWaiter();
long savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
// 阻塞当前线程
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
// 等待,当前线程在接到信号、被中断或达到指定等待时间之前一直处于等待状态
public final long awaitNanos(long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
long savedState = fullyRelease(node);
final long deadline = System.nanoTime() + nanosTimeout;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (nanosTimeout <= 0L) {
transferAfterCancelledWait(node);
break;
}
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
nanosTimeout = deadline - System.nanoTime();
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return deadline - System.nanoTime();
}
// 等待,当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态
public final boolean awaitUntil(Date deadline)
throws InterruptedException {
long abstime = deadline.getTime();
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
long savedState = fullyRelease(node);
boolean timedout = false;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (System.currentTimeMillis() > abstime) {
timedout = transferAfterCancelledWait(node);
break;
}
LockSupport.parkUntil(this, abstime);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return !timedout;
}
// 等待,当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态
public final boolean await(long time, TimeUnit unit)
throws InterruptedException {
long nanosTimeout = unit.toNanos(time);
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
long savedState = fullyRelease(node);
final long deadline = System.nanoTime() + nanosTimeout;
boolean timedout = false;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (nanosTimeout <= 0L) {
timedout = transferAfterCancelledWait(node);
break;
}
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
nanosTimeout = deadline - System.nanoTime();
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return !timedout;
}
// support for instrumentation
final boolean isOwnedBy(AbstractQueuedLongSynchronizer sync) {
return sync == AbstractQueuedLongSynchronizer.this;
}
// 查询是否有正在等待此条件的任何线程
protected final boolean hasWaiters() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
if (w.waitStatus == Node.CONDITION)
return true;
}
return false;
}
// 返回正在等待此条件的线程数估计值
protected final int getWaitQueueLength() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int n = 0;
for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
if (w.waitStatus == Node.CONDITION)
++n;
}
return n;
}
// 返回包含那些可能正在等待此条件的线程集合
protected final Collection<Thread> getWaitingThreads() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
ArrayList<Thread> list = new ArrayList<Thread>();
for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
if (w.waitStatus == Node.CONDITION) {
Thread t = w.thread;
if (t != null)
list.add(t);
}
}
return list;
}
}
子类实现了Condition接口,Condition接口定义了条件操作规范,具体如下
public interface Condition {
// 等待,当前线程在接到信号或被中断之前一直处于等待状态
void await() throws InterruptedException;
// 等待,当前线程在接到信号之前一直处于等待状态,不响应中断
void awaitUninterruptibly();
//等待,当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态
long awaitNanos(long nanosTimeout) throws InterruptedException;
// 等待,当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。
boolean await(long time, TimeUnit unit) throws InterruptedException;
// 等待,当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态
boolean awaitUntil(Date deadline) throws InterruptedException;
// 唤醒一个等待线程。如果所有的线程都在等待此条件,则选择其中的一个唤醒。在从 await 返回之前,该线程必须重新获取锁。
void signal();
// 唤醒所有等待线程。如果所有的线程都在等待此条件,则唤醒所有线程。在从 await 返回之前,每个线程都必须重新获取锁。
void signalAll();
}
Condition接口中定义了await、signal方法,用来等待条件、释放条件。之后会详细分析CondtionObject的源码。
类的属性
属性中包含了头节点head,尾结点tail,状态state、自旋时间spinForTimeoutThreshold,还有AbstractQueuedSynchronizer抽象的属性在内存中的偏移地址,通过该偏移地址,可以获取和设置该属性的值,同时还包括一个静态初始化块,用于加载内存偏移地址。
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer
implements java.io.Serializable {
// 版本号
private static final long serialVersionUID = 7373984972572414691L;
// 头节点
private transient volatile Node head;
// 尾结点
private transient volatile Node tail;
// 状态
private volatile int state;
// 自旋时间
static final long spinForTimeoutThreshold = 1000L;
// Unsafe类实例
private static final Unsafe unsafe = Unsafe.getUnsafe();
// state内存偏移地址
private static final long stateOffset;
// head内存偏移地址
private static final long headOffset;
// state内存偏移地址
private static final long tailOffset;
// tail内存偏移地址
private static final long waitStatusOffset;
// next内存偏移地址
private static final long nextOffset;
// 静态初始化块
static {
try {
stateOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
headOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
tailOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
waitStatusOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("waitStatus"));
nextOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("next"));
} catch (Exception ex) { throw new Error(ex); }
}
}
类的核心方法 - acquire方法
该方法以独占模式获取(资源),忽略中断,即线程在aquire过程中,中断此线程是无效的。源码如下:
public final void acquire(long arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
由上述源码可以知道,当一个线程调用acquire时,调用方法流程如下
首先调用tryAcquire方法,调用此方法的线程会试图在独占模式下获取对象状态。需要子类实现。
protected boolean tryAcquire(long arg) {
throw new UnsupportedOperationException();
}
如果tryAcquire失败,则调用addWaiter方法,addWaiter方法将调用此方法的线程封装成一个结点,并放入sync queue。
// 添加等待者
private Node addWaiter(Node mode) {
// 生成一个结点 默认为独占模式
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
// node结点的prev连接到尾结点
node.prev = pred;
// cas设置尾结点为node
if (compareAndSetTail(pred, node)) {
//原尾结点的next=node
pred.next = node;
return node;
}
}
// 尾结点为空(没有初始化),或者cas失败,则入队
enq(node);
return node;
}
acquireQueued方法,该方法是sync queue中的结点不断尝试获取资源,成功返回true,失败返回false。
final boolean acquireQueued(final Node node, long arg) {
boolean failed = true;
try {
// 中断标识
boolean interrupted = false;
for (;;) {
//获取当前node的前驱节点
final Node p = node.predecessor();
//前驱为头节点并且成功获取锁
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 前驱节点不为头结点或者加锁失败,将head节点的waitStatus变成SIGNAL=-1
if (shouldParkAfterFailedAcquire(p, node) &&
// 执行方法挂起线程
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
shouldParkAfterFailedAcquire方法将node的前驱节点的waitStatus变成-1。只有当前当前节点的前驱结点状态为SIGNAL才能对该节点进行park。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
// 可以park
return true;
if (ws > 0) {
// 前一个线程canceled 循环找到前一个不为cacneled的结点
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// CAS设置前驱结点的状态为SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
真正挂起当前线程
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
类的核心方法 - release方法
public final boolean release(long arg) {
if (tryRelease(arg)) {
Node h = head;
// 头结点不为空 并且头结点状态不为0
if (h != null && h.waitStatus != 0)
// 释放头结点的后继节点
unparkSuccessor(h);
return true;
}
return false;
}
tryRelease方法默认实现是抛出异常。
protected boolean tryRelease(long arg) {
throw new UnsupportedOperationException();
}
unparkSuccessor()
方法的作用将head节点的waitStatus设置为0,并且唤醒head
的后置节点。
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
// 比较并且设置结点的等待状态 设置为0
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
// 从尾结点开始从后往前遍历
for (Node t = tail; t != null && t != node; t = t.prev)
// 找到等待状态小于等于0的结点,找到最前的状态小于等于0的结点
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
当后续线程被唤醒以后,会继续执行acquireQueued里面的for循环,流程继续!!!
公平锁
前面我们说了tryAcquire()方法是子类自定义实现的方法,那么公平锁是如何实现这个这个方法达到公平的呢?
java.util.concurrent.locks.ReentrantLock.FairSync#tryAcquire。
先判断state的值,如果不为0且获取锁的线程不是当前线程,至直接返回false。如果state=0,代表此时没有线程持有锁,那也不能直接去获取锁,需要先判断队列是否为空或者当前线程是队列头节点的next(头结点是虚拟头结点)才能去获取锁,不是乖乖去排队了。
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
// 锁状态标识
int c = getState();
if (c == 0) {
// 队列中没有线程或者当前线程为队头线程 则尝试获取锁
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 获取锁的线程 和当前线程比较
else if (current == getExclusiveOwnerThread()) {
// 可重入锁执行原理
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
稍稍解释一下这个,公平锁关键方法--判断是否已有排队的前置任务。
返回false
代表队列中没有节点或者仅有一个节点是当前线程创建的节点。返回true
则代表队列中存在等待节点,当前线程需要入队等待。
public final boolean hasQueuedPredecessors() {
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
// 头结点尾结点相等,队列中没有节点 返回false
return h != t &&
// 头结点下一个结点为空 -- 队列刚刚创建头结点
// 头结点的下一个节点是当前线程
((s = h.next) == null || s.thread != Thread.currentThread());
}
可重入锁
如上面的代码:获取锁的线程和当前线程比较,如果是当前线程获取锁,将state+acquires。
Condition 实现原理
Condition 简介
上面已经介绍了AQS
所提供的核心功能,当然它还有很多其他的特性,这里我们来继续说下Condition
这个组件。
Condition是在java 1.5中才出现的,它用来替代传统的Object的wait()、notify()实现线程间的协作,相比使用Object的wait()、notify(),使用Condition中的await()、signal()这种方式实现线程间协作更加安全和高效。
其中AbstractQueueSynchronizer
中实现了Condition
中的方法,主要对外提供awaite(Object.wait())
挂起当前线程并释放锁和signal(Object.notify())
唤醒挂起线程。
java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#await()
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 当前线程添加到condition队列中
Node node = addConditionWaiter();
// 释放锁 让其他线程争抢锁
int savedState = fullyRelease(node);
int interruptMode = 0;
// 判断当前线程是不是在同步队列中-- 判断waitStatus状态
// 上一步已经释放了锁,此时有可能已经有线程已经获取锁,并且已经调用了singal()方法
// 如果执行了singal()方法,不park线程,退出whlie方法
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 退出park后,抢夺锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#signal
// 假设线程1park 进入condition队列,线程二执行singal方法
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
// 释放头结点
if (first != null)
doSignal(first);
}
doSignal(first) 真正唤醒线程
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
// 头结点取出,重置头结点位置
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
* 通过cas修改当前节点的waitStatus=0
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
// 入队 返回当前节点的前置节点
Node p = enq(node);
int ws = p.waitStatus;
// cas修改同步队列节点装填为SIGNAL
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
// 唤醒线程
LockSupport.unpark(node.thread);
return true;
}
至此 被唤醒的线程就可以接续await方法里面的while循环。
转载自:https://juejin.cn/post/7355075401612279818