likes
comments
collection
share

【AQS】我们不妨把话讲的再明白一点

作者站长头像
站长
· 阅读数 6

前言

如果被请求的共享资源被多个线程争抢,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,AQS就实现了这个机制。

AQS(AbstractQueuedSynchronizer)即是抽象的队列式的同步器,内部定义了很多锁相关的方法,我们熟知的ReentrantLockReentrantReadWriteLockCountDownLatchSemaphore等都是基于AQS来实现的。

【AQS】我们不妨把话讲的再明白一点

AQS的实现原理

AQS使用一个volatile int成员变量来表示同步状态,通过内置的FIFO(CLH)队列来完成获取资源的排队工作。AQS使用CAS对该同步状态进行原子操作实现对其值的修改。

CLH(Craig,Landin,and Hagersten)队列是一个虚拟的双向队列(虚拟的双向队列即不存在队列实例,仅存在结点之间的关联关系)。AQS是将每条请求共享资源的线程封装成一个CLH锁队列的一个结点(Node)来实现锁的分配。

AQS的数据结构

【AQS】我们不妨把话讲的再明白一点

AQS对资源的共享方式

  • Exclusive(独占):只有一个线程能执行,如ReentrantLock。
  • Share(共享):多个线程可同时执行,如CountDownLatch。

独占锁(Exclusive)又可分为公平锁和非公平锁。

  • 公平锁:按照排队顺序获取锁。
  • 非公平锁:要获取锁时,直接去抢锁,抢不到再乖乖排队。

AQS的模板方法

使用者继承AbstractQueuedSynchronizer并重写指定方法,即可自定义同步器。 AQS使用了模板方法模式,自定义同步器时需要重写下面几个AQS提供的模板方法:

isHeldExclusively()//该线程是否正在独占资源。只有用到condition才需要去实现它。
tryAcquire(int)//独占方式。尝试获取资源,成功则返回true,失败则返回false。
tryRelease(int)//独占方式。尝试释放资源,成功则返回true,失败则返回false。
tryAcquireShared(int)//共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
tryReleaseShared(int)//共享方式。尝试释放资源,成功则返回true,失败则返回false。

默认情况下,每个方法都抛出 UnsupportedOperationException。 这些方法的实现必须是内部线程安全的,并且通常应该中断而不是阻塞。AQS类中的其他方法都是final ,所以无法被其他类使用,只有这几个方法可以被其他类使用。

源码分析

类的继承关系

AbstractQueuedSynchronizer继承自AbstractOwnableSynchronizer抽象类,并且实现了Serializable接口,可以进行序列化。

AbstractOwnableSynchronizer抽象类仅有一个成员变量,表示独占线程,以及独占线程的get和set方法。

public abstract class AbstractOwnableSynchronizer
    implements java.io.Serializable {

    private static final long serialVersionUID = 3737899427754241961L;

    protected AbstractOwnableSynchronizer() { }

    /**
     * 独占模式下的线程
     */
    private transient Thread exclusiveOwnerThread;

    protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }

    protected final Thread getExclusiveOwnerThread() {
        return exclusiveOwnerThread;
    }
}

AbstractQueuedSynchronizer类有两个内部类,分别为Node类与ConditionObject类。

类的内部类-Node类

static final class Node {
    /** 用于指示节点正在共享模式下等待的标记 */
    static final Node SHARED = new Node();
    /** 用于指示节点正在独占模式下等待的标记 */
    static final Node EXCLUSIVE = null;

    //节点状态枚举值
    /** 表示线程已取消 */
    static final int CANCELLED =  1;
    /** 表示当前节点的后继节点包含的线程需要运行,也就是unpark */
    static final int SIGNAL    = -1;
    /** waiting on condition */
    static final int CONDITION = -2;
    /** 当前场景下后续的acquireShared能够得以执行 */
    static final int PROPAGATE = -3;

    // 结点状态,如果值为0,表示当前节点在sync队列中,等待获取锁
    volatile int waitStatus;

    // 前驱节点
    volatile Node prev;

    // 后继节点
    volatile Node next;

    // 节点内的线程
    volatile Thread thread;

    // 下一个等待者
    Node nextWaiter;

    // 结点是否在共享模式下等待
    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    // 获取前驱节点,如果为空则抛出异常
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() {}

    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

类的内部类 - ConditionObject类(可以先跳过不看)

public class ConditionObject implements Condition, java.io.Serializable {
    private static final long serialVersionUID = 1173984872572414699L;
    
    /** condition队列头结点*/
    private transient Node firstWaiter;
    /** condition队列尾结点*/
    private transient Node lastWaiter;

    public ConditionObject() { }

    // Internal methods

    // 添加新的waiter到wait队列
    private Node addConditionWaiter() {
        Node t = lastWaiter;
        // If lastWaiter is cancelled, clean out.
        // 尾结点不为空,并且尾结点状态不为CONDITION
        if (t != null && t.waitStatus != Node.CONDITION) {
            // 清楚状态为CONDITION的结点
            unlinkCancelledWaiters();
            // 最后一个结点赋值给t
            t = lastWaiter;
        }
        // 新建一个结点
        Node node = new Node(Thread.currentThread(), Node.CONDITION);
        if (t == null)
            // 尾结点为空设置CONDITION队列的头结点
            firstWaiter = node;
        else
            // 尾结点不为空,将结点放到尾结点后面
            t.nextWaiter = node;
        // 更新尾结点
        lastWaiter = node;
        return node;
    }


    private void doSignal(Node first) {
        do {
            // 该结点的nextWaiter为空
            if ( (firstWaiter = first.nextWaiter) == null)
                // 设置尾结点为空
                lastWaiter = null;
            // 设置first结点的nextWaiter
            first.nextWaiter = null;
        // 将结点从CONDITION队列转移到sync队列失败,并且condition队列中的头结点不为空,一直循环
        } while (!transferForSignal(first) &&
                 (first = firstWaiter) != null);
    }


    private void doSignalAll(Node first) {
        // condition队列的头结点尾结点都设置为空
        lastWaiter = firstWaiter = null;
        do {
            //一直循环 将condition队列中的每个节点依次转移到sync队列
            Node next = first.nextWaiter;
            first.nextWaiter = null;
            transferForSignal(first);
            first = next;
        } while (first != null);
    }

    // 从condition队列中清除状态为cancel的结点
    private void unlinkCancelledWaiters() {
        Node t = firstWaiter;
        Node trail = null;
        while (t != null) {
            Node next = t.nextWaiter;
            if (t.waitStatus != Node.CONDITION) {
                t.nextWaiter = null;
                if (trail == null)
                    firstWaiter = next;
                else
                    trail.nextWaiter = next;
                if (next == null)
                    lastWaiter = trail;
            }
            else
                trail = t;
            t = next;
        }
    }

    // public methods

    // 唤醒一个等待线程,如果所有的线程都在等待次条件,则选择其中的一个唤醒。在从await返回之前,改线程必须重新获取锁
    public final void signal() {
        // 不被当前线程独占,抛出异常
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        // condition队列头结点
        Node first = firstWaiter;
        // 头节点不为空,唤醒一个等待线程
        if (first != null)
            doSignal(first);
    }

    // 唤醒所有等待线程。
    public final void signalAll() {
        // 不被当前线程独占,抛出异常
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        Node first = firstWaiter;
        // 头结点不为空,唤醒所有线程
        if (first != null)
            doSignalAll(first);
    }

    // 等待,当前线程在接到信号之前,一直处于等待状态,不响应中断
    public final void awaitUninterruptibly() {
        Node node = addConditionWaiter();
        long savedState = fullyRelease(node);
        boolean interrupted = false;
        while (!isOnSyncQueue(node)) {
            LockSupport.park(this);
            if (Thread.interrupted())
                interrupted = true;
        }
        if (acquireQueued(node, savedState) || interrupted)
            selfInterrupt();
    }

    
    /** Mode meaning to reinterrupt on exit from wait */
    private static final int REINTERRUPT =  1;
    /** Mode meaning to throw InterruptedException on exit from wait */
    private static final int THROW_IE    = -1;

    
    private int checkInterruptWhileWaiting(Node node) {
        return Thread.interrupted() ?
            (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
            0;
    }

    
    private void reportInterruptAfterWait(int interruptMode)
        throws InterruptedException {
        if (interruptMode == THROW_IE)
            throw new InterruptedException();
        else if (interruptMode == REINTERRUPT)
            selfInterrupt();
    }

    // 等待 当前线程在接到信号或被中断之前一直处于等待状态
    public final void await() throws InterruptedException {
        // 当前线程被中断,抛出异常
        if (Thread.interrupted())
            throw new InterruptedException();
        // 在wait队列上添加一个结点
        Node node = addConditionWaiter();
        long savedState = fullyRelease(node);
        int interruptMode = 0;
        while (!isOnSyncQueue(node)) {
            // 阻塞当前线程
            LockSupport.park(this);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null) // clean up if cancelled
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }

    // 等待,当前线程在接到信号、被中断或达到指定等待时间之前一直处于等待状态
    public final long awaitNanos(long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        Node node = addConditionWaiter();
        long savedState = fullyRelease(node);
        final long deadline = System.nanoTime() + nanosTimeout;
        int interruptMode = 0;
        while (!isOnSyncQueue(node)) {
            if (nanosTimeout <= 0L) {
                transferAfterCancelledWait(node);
                break;
            }
            if (nanosTimeout >= spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
            nanosTimeout = deadline - System.nanoTime();
        }
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null)
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
        return deadline - System.nanoTime();
    }

    // 等待,当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态
    public final boolean awaitUntil(Date deadline)
            throws InterruptedException {
        long abstime = deadline.getTime();
        if (Thread.interrupted())
            throw new InterruptedException();
        Node node = addConditionWaiter();
        long savedState = fullyRelease(node);
        boolean timedout = false;
        int interruptMode = 0;
        while (!isOnSyncQueue(node)) {
            if (System.currentTimeMillis() > abstime) {
                timedout = transferAfterCancelledWait(node);
                break;
            }
            LockSupport.parkUntil(this, abstime);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null)
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
        return !timedout;
    }

    // 等待,当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态
    public final boolean await(long time, TimeUnit unit)
            throws InterruptedException {
        long nanosTimeout = unit.toNanos(time);
        if (Thread.interrupted())
            throw new InterruptedException();
        Node node = addConditionWaiter();
        long savedState = fullyRelease(node);
        final long deadline = System.nanoTime() + nanosTimeout;
        boolean timedout = false;
        int interruptMode = 0;
        while (!isOnSyncQueue(node)) {
            if (nanosTimeout <= 0L) {
                timedout = transferAfterCancelledWait(node);
                break;
            }
            if (nanosTimeout >= spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
            nanosTimeout = deadline - System.nanoTime();
        }
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null)
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
        return !timedout;
    }

    //  support for instrumentation

    final boolean isOwnedBy(AbstractQueuedLongSynchronizer sync) {
        return sync == AbstractQueuedLongSynchronizer.this;
    }

    //  查询是否有正在等待此条件的任何线程
    protected final boolean hasWaiters() {
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
            if (w.waitStatus == Node.CONDITION)
                return true;
        }
        return false;
    }

    // 返回正在等待此条件的线程数估计值
    protected final int getWaitQueueLength() {
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        int n = 0;
        for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
            if (w.waitStatus == Node.CONDITION)
                ++n;
        }
        return n;
    }

    // 返回包含那些可能正在等待此条件的线程集合
    protected final Collection<Thread> getWaitingThreads() {
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        ArrayList<Thread> list = new ArrayList<Thread>();
        for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
            if (w.waitStatus == Node.CONDITION) {
                Thread t = w.thread;
                if (t != null)
                    list.add(t);
            }
        }
        return list;
    }
}

子类实现了Condition接口,Condition接口定义了条件操作规范,具体如下

public interface Condition {

    // 等待,当前线程在接到信号或被中断之前一直处于等待状态
    void await() throws InterruptedException;
    
    // 等待,当前线程在接到信号之前一直处于等待状态,不响应中断
    void awaitUninterruptibly();
    
    //等待,当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态 
    long awaitNanos(long nanosTimeout) throws InterruptedException;
    
    // 等待,当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。
    boolean await(long time, TimeUnit unit) throws InterruptedException;
    
    // 等待,当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态
    boolean awaitUntil(Date deadline) throws InterruptedException;
    
    // 唤醒一个等待线程。如果所有的线程都在等待此条件,则选择其中的一个唤醒。在从 await 返回之前,该线程必须重新获取锁。
    void signal();
    
    // 唤醒所有等待线程。如果所有的线程都在等待此条件,则唤醒所有线程。在从 await 返回之前,每个线程都必须重新获取锁。
    void signalAll();
}

Condition接口中定义了await、signal方法,用来等待条件、释放条件。之后会详细分析CondtionObject的源码。

类的属性

属性中包含了头节点head,尾结点tail,状态state、自旋时间spinForTimeoutThreshold,还有AbstractQueuedSynchronizer抽象的属性在内存中的偏移地址,通过该偏移地址,可以获取和设置该属性的值,同时还包括一个静态初始化块,用于加载内存偏移地址。

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer
    implements java.io.Serializable {    
    // 版本号
    private static final long serialVersionUID = 7373984972572414691L;    
    // 头节点
    private transient volatile Node head;    
    // 尾结点
    private transient volatile Node tail;    
    // 状态
    private volatile int state;    
    // 自旋时间
    static final long spinForTimeoutThreshold = 1000L;
    
    // Unsafe类实例
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    // state内存偏移地址
    private static final long stateOffset;
    // head内存偏移地址
    private static final long headOffset;
    // state内存偏移地址
    private static final long tailOffset;
    // tail内存偏移地址
    private static final long waitStatusOffset;
    // next内存偏移地址
    private static final long nextOffset;
    // 静态初始化块
    static {
        try {
            stateOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
            headOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
            tailOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
            waitStatusOffset = unsafe.objectFieldOffset
                (Node.class.getDeclaredField("waitStatus"));
            nextOffset = unsafe.objectFieldOffset
                (Node.class.getDeclaredField("next"));

        } catch (Exception ex) { throw new Error(ex); }
    }
}

类的核心方法 - acquire方法

该方法以独占模式获取(资源),忽略中断,即线程在aquire过程中,中断此线程是无效的。源码如下:

public final void acquire(long arg) {
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

由上述源码可以知道,当一个线程调用acquire时,调用方法流程如下

首先调用tryAcquire方法,调用此方法的线程会试图在独占模式下获取对象状态。需要子类实现。

protected boolean tryAcquire(long arg) {
    throw new UnsupportedOperationException();
}

如果tryAcquire失败,则调用addWaiter方法,addWaiter方法将调用此方法的线程封装成一个结点,并放入sync queue。

// 添加等待者
private Node addWaiter(Node mode) {
    // 生成一个结点 默认为独占模式
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        // node结点的prev连接到尾结点
        node.prev = pred;
        // cas设置尾结点为node
        if (compareAndSetTail(pred, node)) {
            //原尾结点的next=node
            pred.next = node;
            return node;
        }
    }
    // 尾结点为空(没有初始化),或者cas失败,则入队
    enq(node);
    return node;
}

acquireQueued方法,该方法是sync queue中的结点不断尝试获取资源,成功返回true,失败返回false。

final boolean acquireQueued(final Node node, long arg) {
    boolean failed = true;
    try {
        // 中断标识
        boolean interrupted = false;
        for (;;) {
            //获取当前node的前驱节点
            final Node p = node.predecessor();
            //前驱为头节点并且成功获取锁
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 前驱节点不为头结点或者加锁失败,将head节点的waitStatus变成SIGNAL=-1
            if (shouldParkAfterFailedAcquire(p, node) &&
                // 执行方法挂起线程
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

shouldParkAfterFailedAcquire方法将node的前驱节点的waitStatus变成-1。只有当前当前节点的前驱结点状态为SIGNAL才能对该节点进行park。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        // 可以park
        return true;
    if (ws > 0) {
        // 前一个线程canceled 循环找到前一个不为cacneled的结点
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // CAS设置前驱结点的状态为SIGNAL
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

真正挂起当前线程

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

类的核心方法 - release方法

public final boolean release(long arg) {
    if (tryRelease(arg)) {
        Node h = head;
        // 头结点不为空 并且头结点状态不为0
        if (h != null && h.waitStatus != 0)
            // 释放头结点的后继节点
            unparkSuccessor(h);
        return true;
    }
    return false;
}

tryRelease方法默认实现是抛出异常。

protected boolean tryRelease(long arg) {
    throw new UnsupportedOperationException();
}

unparkSuccessor()方法的作用将head节点的waitStatus设置为0,并且唤醒head的后置节点。

private void unparkSuccessor(Node node) {
    
    int ws = node.waitStatus;
    if (ws < 0)
        // 比较并且设置结点的等待状态 设置为0
        compareAndSetWaitStatus(node, ws, 0);

    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        // 从尾结点开始从后往前遍历
        for (Node t = tail; t != null && t != node; t = t.prev)
             // 找到等待状态小于等于0的结点,找到最前的状态小于等于0的结点
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

当后续线程被唤醒以后,会继续执行acquireQueued里面的for循环,流程继续!!!

公平锁

前面我们说了tryAcquire()方法是子类自定义实现的方法,那么公平锁是如何实现这个这个方法达到公平的呢?

java.util.concurrent.locks.ReentrantLock.FairSync#tryAcquire。

先判断state的值,如果不为0且获取锁的线程不是当前线程,至直接返回false。如果state=0,代表此时没有线程持有锁,那也不能直接去获取锁,需要先判断队列是否为空或者当前线程是队列头节点的next(头结点是虚拟头结点)才能去获取锁,不是乖乖去排队了。

    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        // 锁状态标识
        int c = getState();
        if (c == 0) {
            // 队列中没有线程或者当前线程为队头线程 则尝试获取锁
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        // 获取锁的线程 和当前线程比较
        else if (current == getExclusiveOwnerThread()) {
            // 可重入锁执行原理
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}

稍稍解释一下这个,公平锁关键方法--判断是否已有排队的前置任务。

返回false代表队列中没有节点或者仅有一个节点是当前线程创建的节点。返回true则代表队列中存在等待节点,当前线程需要入队等待。

public final boolean hasQueuedPredecessors() {
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
    // 头结点尾结点相等,队列中没有节点 返回false
    return h != t &&
        // 头结点下一个结点为空 -- 队列刚刚创建头结点
        // 头结点的下一个节点是当前线程
        ((s = h.next) == null || s.thread != Thread.currentThread());
}

可重入锁

如上面的代码:获取锁的线程和当前线程比较,如果是当前线程获取锁,将state+acquires。

Condition 实现原理

Condition 简介

上面已经介绍了AQS所提供的核心功能,当然它还有很多其他的特性,这里我们来继续说下Condition这个组件。

Condition是在java 1.5中才出现的,它用来替代传统的Object的wait()、notify()实现线程间的协作,相比使用Object的wait()、notify(),使用Condition中的await()、signal()这种方式实现线程间协作更加安全和高效。

其中AbstractQueueSynchronizer中实现了Condition中的方法,主要对外提供awaite(Object.wait())挂起当前线程并释放锁signal(Object.notify())唤醒挂起线程。

【AQS】我们不妨把话讲的再明白一点

java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#await()

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    // 当前线程添加到condition队列中
    Node node = addConditionWaiter();
    // 释放锁 让其他线程争抢锁
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // 判断当前线程是不是在同步队列中-- 判断waitStatus状态
    // 上一步已经释放了锁,此时有可能已经有线程已经获取锁,并且已经调用了singal()方法
    // 如果执行了singal()方法,不park线程,退出whlie方法
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // 退出park后,抢夺锁
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#signal

// 假设线程1park 进入condition队列,线程二执行singal方法
public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    // 释放头结点
    if (first != null)
        doSignal(first);
}

doSignal(first) 真正唤醒线程

private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
        // 头结点取出,重置头结点位置
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
    /*
     * If cannot change waitStatus, the node has been cancelled.
     * 通过cas修改当前节点的waitStatus=0
     */
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    // 入队 返回当前节点的前置节点
    Node p = enq(node);
    int ws = p.waitStatus;
    // cas修改同步队列节点装填为SIGNAL
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        // 唤醒线程
        LockSupport.unpark(node.thread);
    return true;
}

至此 被唤醒的线程就可以接续await方法里面的while循环。