likes
comments
collection
share

从 ArrayBlockingQueue 探究 AQS 细节 — 条件队列篇

作者站长头像
站长
· 阅读数 17

开篇语

条件队列是 AQS 中最容易被忽视的一个细节。大部分时候,我们都用不上条件队列,但是这并不说明条件队列就没有用处了,它反而是我们学习生产者-消费者模式的最佳教材。

什么是条件队列

条件队列是指一个阻塞队列,其中的元素是等待某个条件成立的线程。在 Java 的同步机制中,条件队列通常用于实现线程等待和唤醒的操作。当条件不满足时,线程会被加入到条件队列中等待;当条件满足时,线程会被从条件队列中移除并继续执行。

AQS 中的条件队列

AbstractQueuedSynchronizer 中的条件队列是一个单向链表,每个节点代表一个等待线程。条件队列的头节点是一个特殊的节点,表示等待队列的头部。当条件不满足时,线程会被加入到条件队列的尾部等待;当条件满足时,线程会从条件队列中移除并加入到同步队列中等待获取锁。

AbstractQueuedSynchronizer 中的条件队列是通过内部维护的等待队列和同步队列实现的。当线程调用 await() 方法时,它会被加入到等待队列中等待条件满足。

从 ArrayBlockingQueue 探究 AQS 细节 — 条件队列篇

当有其他线程调用了条件队列的 signal() 方法,线程则会从条件队列中移除,并加入到同步队列中等待获取锁。

从 ArrayBlockingQueue 探究 AQS 细节 — 条件队列篇

方法解析

await

该方法的作用是将当前线程加入条件队列并阻塞,直到被唤醒或中断。

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}
  1. 如果线程被中断过,那么直接抛出中断异常
  2. 创建一个 Condition 类型的节点,并插入到等待队列中。
private Node addConditionWaiter() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node t = lastWaiter;
    // If lastWaiter is cancelled, clean out.
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }

    Node node = new Node(Node.CONDITION);

    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

我们看到 isHeldExclusively(),这个方法主要就是判断当前线程是否是持有独占锁的线程,如果不是,则直接抛出异常。所以,我们可以知道条件队列只适用于独占锁的情况。并且调用 await() 方法前,必须先获取锁。

然后,如果最后一个节点被取消的话,则会调用 unlinkCancelledWaiters() 方法移除掉所有被取消的节点。这个方法就是从等待的第一个元素开始,依次向后查找被取消的节点,然后将这些被取消的节点移除出等待队列中。

最后创建一个 Condition 节点,并且加入到条件队列中。

注意:操作等待队列的过程中,因为依然持有独占锁,所以是线程安全的,并不需要额外的同步操作。

  1. 释放锁。fullyRelease 是一个释放当前持有锁的方法,这个方法是调用 tryRelease 释放所持有的锁。

这里释放锁的原因是接下来将阻塞线程,如果不释放锁,那么这个锁资源将无法再被使用,直到这个持有锁的线程被唤醒,会造成资源的浪费。

  1. 循环判断当前节点是否在同步队列中
while (!isOnSyncQueue(node)) {
    LockSupport.park(this);
    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
        break;
}

如果不在同步队列中,则直接阻塞线程,此时线程就停留在LockSupport.park(this); 处。直到其他线程将他唤醒或者中断,才会继续执行。

如何判断节点是否在同步队列中呢?

我们可以先思考一下,等待队列中的节点状态都是 Condition 的,而同步队列中的状态则是 0、Signal等等,因此我们通过判断节点的状态。其次,同步队列中使用 pred、next 指针来组织同步队列的结构,我们可以通过判断节点的 pred 和 next 指针是否有值来判断。

final boolean isOnSyncQueue(Node node) {
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    if (node.next != null) // If has successor, it must be on queue
        return true;
    /*
     * node.prev can be non-null, but not yet on queue because
     * the CAS to place it on queue can fail. So we have to
     * traverse from tail to make sure it actually made it.  It
     * will always be near the tail in calls to this method, and
     * unless the CAS failed (which is unlikely), it will be
     * there, so we hardly ever traverse much.
     */
    return findNodeFromTail(node);
}

第一种情况:通过判断节点的状态和 pred 指针来确定节点是否已经在同步队列中。

第二种情况:通过判断节点的 next 指针来确定节点是否已经在同步队列中。

而第三种情况其实是第一种情况的延伸,主要跟 enq() 插入同步队列有关。主要考虑的是当 pred 指针不为 null 时,说明节点可能已经为同步队列中,为什么说可能,是因为 CAS 设置 next 指针可能会失败。所以需要从同步队列的后面从前面开始在同步队列寻找此节点。

private Node enq(Node node) {
    for (;;) {
        Node oldTail = tail;
        if (oldTail != null) {
            node.setPrevRelaxed(oldTail);
            if (compareAndSetTail(oldTail, node)) {
                oldTail.next = node;
                return oldTail;
            }
        } else {
            initializeSyncQueue();
        }
    }
}

在插入同步队列方法中,我们可以看到第 5 行,它是先设置节点的 pred 指针。然后再通过 CAS 设置节点的 next 指针,如果此时 CAS 失败了,就会出现 pred 指针有值,但是 next 是找不到节点的。

  1. 当线程被唤醒之后(signal 或者中断),检查中断。
private int checkInterruptWhileWaiting(Node node) {
    return Thread.interrupted() ?
        (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
        0;
}

如果线程在 signal 之前就已经被唤醒(中断方式),则返回 THROW_IE;如果在 signal 唤醒之后才被中断,则返回REINTERRUPT。

怎么判断线程的唤醒方式呢?

我们可以来思考一下:signal 的时候,会将节点的 Condition 状态修改为 0,也就是说如果是使用 signal 的方式唤醒线程,那么节点在被唤醒之前,它的节点状态已经被修改为 0 了。而使用中断的方式唤醒线程,则不会修改节点的状态。因此我们只需要判断节点的状态就可以知道线程是被哪种方式唤醒的。

我们通过源码来揭晓我们的猜想是否正确:

final boolean transferAfterCancelledWait(Node node) {
    if (node.compareAndSetWaitStatus(Node.CONDITION, 0)) {
        enq(node);
        return true;
    }
    /*
     * If we lost out to a signal(), then we can't proceed
     * until it finishes its enq().  Cancelling during an
     * incomplete transfer is both rare and transient, so just
     * spin.
     */
    while (!isOnSyncQueue(node))
        Thread.yield();
    return false;
}

通过 CAS 修改节点的状态,如果节点的状态是 Condition 的话,则可以成功修改为 0。说明节点就是被中断唤醒。如果节点状态无法修改成功,则说明节点的状态已经不是 Condition 了,那么则说明在唤醒之前已经被修改了,那么只可能是 signal 方法唤醒的。

如果节点是通过 signal 方法唤醒的,这里会循环判断节点是否已经在同步队列中了,只有节点已经在同步队列,才会结束执行。

为什么这里需要循环判断节点是否已经在同步队列呢?

如果这里不执行这一段的话,那么直接返回 false。我们此时回到 await 方法,会进入到下一次循环,继续判断节点是否在同步队列中,如果节点此时还没有加入到同步队列的话,就会继续被阻塞。那么这个节点就会出现在同步队列,却被阻塞的情况。如果此时没有其他线程唤醒的话,该线程就成为了一个死线程。

  1. 被唤醒之后,尝试获取锁

这里获取锁的方法跟独占锁获取锁的方法是同一个。获取锁成功之后,将返回,并重新记录中断。

signal

该方法的作用是将条件队列中的第一个线程唤醒。

public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}
  1. 如果线程没有获取锁,那么则直接抛出异常。
  2. 唤醒条件队列中的第一个节点。
private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

将节点从条件队列中移除,并加入到同步队列中,同时唤醒线程。

这里为什么需要使用一个 while 循环? 因为有可能当前需要唤醒节点已经被取消了,那么就需要继续唤醒下一个节点,直到有一个节点被成功唤醒或者条件队列为空才结束,这样做是保证可以正常唤醒一个线程。

我们线程来看它是如何唤醒一个线程的:

final boolean transferForSignal(Node node) {
    if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
        return false;

    /*
     * Splice onto queue and try to set waitStatus of predecessor to
     * indicate that thread is (probably) waiting. If cancelled or
     * attempt to set waitStatus fails, wake up to resync (in which
     * case the waitStatus can be transiently and harmlessly wrong).
     */
    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

从第一个条件判断,我们就可以知道当节点已经不是 Condition 状态时,说明它可能已经被取消了。因此直接返回 false,让外层的 while 循环去唤醒下一个节点。

当已经加入同步队列后,发现前驱节点已经被取消,或者设置前驱节点的状态为 Signal 失败了,那么则直接唤醒当前节点的线程。

为什么要唤醒线程呢?而不是根据同步队列的规则去唤醒?

这里是为了让节点在同步队列中,可以自己完成修正,主要细节在 acquireQueued 这个方法中。我们这里只做简单的介绍。acquireQueued 会去尝试获取锁,如果获取锁失败,那么则会根据前驱节点的状态,来做出调整。如果前驱节点已经是 Signal,那么则直接进入阻塞,等待前驱节点唤醒。如果前驱节点是取消状态,即 status > 0,那么则往前寻找一个 status <= 0 的节点。

如果当前节点的前驱节点的状态可以正常设置为 Signal,那么则不会进行唤醒,而是按照同步队列的规则去进行后续的唤醒操作。

当线程被唤醒之后,就会继续执行 await 方法的后续代码。

小结

  1. 条件队列只能在独占锁情况下使用。
  2. 在调用 await 和 signal 方法前,必须先获取锁,否则会抛出异常。
  3. 线程可以被中断和 signal 两种方式唤醒。
  4. 线程被唤醒之后,并不会马上获得锁,而是加入同步队列,跟同步队列中的其他节点一起竞争锁。
  5. 使用 await 方法时,如果中断发生调用 signal 方法之前,那么会直接抛出中断异常。如果不想处理中断,可以使用 awaitUninterruptibly(),该方法不会抛出中断异常,只会记录中断标记。