likes
comments
collection
share

详解AQS的具体流程

作者站长头像
站长
· 阅读数 15

AQS中的四个字段

本文讲解的是JDK17中的源码。

AQS中的字段很少,除去一些静态常量后就只剩下四个字段,并且这四个字段还恰恰是非常关键的,它们分别是 headtailstateexclusiveOwnerThread,其中 exclusiveOwnerThread 字段是继承自父类的。

由于AQS就是一个多线程同步器,用来同步多个线程的,因此需要一个数据结构来存放正在同步过程中的线程(也就是正在排队中的线程),在这里AQS使用了双向链表数据结构来作为等待队列,而AQS中的 headtail 变量就是用来指向等待队列中头节点和尾节点的。

AQS就像一个窗口柜台业务员,它的 state 字段用来表示该业务员当前的状态,比如是正在办理业务中还是正在空闲中,如果是正在办理中的话,那么它的 exclusiveOwnerThread 字段就是用来表示当前正在给谁办理业务中,如果正在空闲中的话,则该字段的值为 null,表示现在没人正在办理业务中。

在AQS的等待队列中存放的并不是线程对象本身,而是封装了线程相关信息的节点对象,结构如下图:

详解AQS的具体流程

等待队列中的虚拟节点

在上图的AQS等待队列中会注意到头节点是一个虚拟节点,这并不是什么特殊情况,而是一种简化链表操作的巧妙设计,这种设计在学习链表数据结构的时候就经常会用到。如果没有这个虚拟节点的话,则我们需要做很多繁琐的临界判断,比如在每次插入节点的时候都需要判断头指针是否指向 null,如果是的话则需要额外将头指针指向自己,然后再修改为指针,而有了虚拟节点后,在插入节点的时候直接改变尾指针即可,不用考虑头指针的指向 null 情况了,因为有了虚拟节点的存在后头指针一定不为 null

虚拟节点也可以叫做辅助节点、伪节点,我这里是翻译自AQS注释中的 “dummy node”。

请求修改状态

在AQS中,线程想办理某种业务时必须要先去请求执行该业务的许可,只有得到许可后才可以去窗口柜台中办理业务,如果本次请求失败了,那么可以过一会再来请求。

在请求许可时,实际上是去操作AQS中的 state 字段,可以通过 acquire 方法来发出请求,参数为要修改成的状态(具体的状态对应的整数值由子类决定),该方法会在请求成功后返回,中途必要时会发生阻塞行为。

acquire 方法在执行前会先去尝试修改状态,修改成功则直接就返回了,由于是尝试修改,所以即使修改失败了也会直接返回,而不会产生任何的阻塞行为。具体的是怎么修改的以及状态整数值的意义是什么,都是由子类来决定的,而AQS只是负责同步线程,也就是维护秩序的保安。从这里就可以体会到AQS(抽象队列同步器)的名称含义所在了。

acquire 方法的源码如下:

public final void acquire(int arg) {
    if (!tryAcquire(arg))   // 尝试请求修改状态
        // 请求失败,开始进入等待队列。
        acquire(null, arg, false, false, false, 0L);
}

AQS为我们提供了3个用来操作 state 字段的方法,分别为:

  • getState:普通的Getter方法。
  • setState:普通的Setter方法。
  • compareAndSetState:通过CAS的方式将预期的状态改为指定的状态。

具体的请求流程

我们先假设有一个线程正在执行足够耗时的任务,此时第二个线程去请求执行任务,然后我们分析第二个线程的执行流程。入队的完整源码如下:

final int acquire(AbstractQueuedSynchronizer.Node node, int arg, boolean shared,
                  boolean interruptible, boolean timed, long time) {
    Thread current = Thread.currentThread();
    byte spins = 0, postSpins = 0;
    boolean interrupted = false, first = false;
    AbstractQueuedSynchronizer.Node pred = null;

    for (;;) {
        if (!first && (pred = (node == null) ? null : node.prev) != null &&
                !(first = (head == pred))) {
            if (pred.status < 0) {
                cleanQueue();
                continue;
            } else if (pred.prev == null) {
                Thread.onSpinWait();
                continue;
            }
        }
        if (first || pred == null) {
            boolean acquired;
            try {
                if (shared)
                    acquired = (tryAcquireShared(arg) >= 0);
                else
                    acquired = tryAcquire(arg);
            } catch (Throwable ex) {
                cancelAcquire(node, interrupted, false);
                throw ex;
            }
            if (acquired) {
                if (first) {
                    node.prev = null;
                    head = node;
                    pred.next = null;
                    node.waiter = null;
                    if (shared)
                        signalNextIfShared(node);
                    if (interrupted)
                        current.interrupt();
                }
                return 1;
            }
        }
        if (node == null) {
            if (shared)
                node = new AbstractQueuedSynchronizer.SharedNode();
            else
                node = new AbstractQueuedSynchronizer.ExclusiveNode();
        } else if (pred == null) {
            node.waiter = current;
            AbstractQueuedSynchronizer.Node t = tail;
            node.setPrevRelaxed(t);
            if (t == null)
                tryInitializeHead();
            else if (!casTail(t, node))
                node.setPrevRelaxed(null);
            else
                t.next = node;
        } else if (first && spins != 0) {
            --spins;
            Thread.onSpinWait();
        } else if (node.status == 0) {
            node.status = WAITING;
        } else {
            long nanos;
            spins = postSpins = (byte)((postSpins << 1) | 1);
            if (!timed)
                LockSupport.park(this);
            else if ((nanos = time - System.nanoTime()) > 0L)
                LockSupport.parkNanos(this, nanos);
            else
                break;
            node.clearStatus();
            if ((interrupted |= Thread.interrupted()) && interruptible)
                break;
        }
    }
    return cancelAcquire(node, interrupted, interruptible);
}

这里需要先了解一些要点:

  1. AQS对于阻塞操作的优化。阻塞线程是一个很昂贵的操作,涉及到操作系统从用户态到内核态的切换,因此AQS会尽可能地避免阻塞,会在进入等待队列前频繁地尝试修改状态,如果尝试多次后实在修改不了,才会迫不得已进入等待队列并开始阻塞。
  2. 源码中的 first 变量指向的节点并不是等待队列中的虚拟节点,而是虚拟节点的下一个节点。
  3. pred 变量指向当前节点的上一个节点,由于有虚拟节点的存在,只要该变量的值不为 null,则说明当前节点已经进入到等待队列中了,如果为 null 则说明节点还没有创建或已创建但还没有进入到等待队列中。
  4. 入队操作分为好几个小阶段,每一次for循环都会执行其中一个小阶段的任务。

这段源码不是高挑美女,没有什么惊艳的设计模式,而是传统的素颜美女,非常的耐看,每一次看都会有不一样的体会。由于这段代码非常的绕(不愧是Doug Lea),每一次for循环都是一个小阶段,并且不同阶段会走不同的分支,我们只需关注特定阶段的分支代码并了解该阶段的具体流程即可,因此我会省略掉一些无关代码,并用红色框框出特定阶段需要关注的代码。

阶段1:创建空节点

详解AQS的具体流程

阶段2:创建虚拟节点/入队

创建虚拟节点

在入队先需要先判断是否有虚拟节点,如果没有虚拟节点则当前阶段会变成创建虚拟节点阶段。

详解AQS的具体流程

在创建好虚拟节点后并不是直接入队的,由于AQS需要面对高并发的场景,因此AQS在修改任何内部的共享变量时都会使用到CAS技术。

private void tryInitializeHead() {
    Node h = new ExclusiveNode();
    if (U.compareAndSetReference(this, HEAD, null, h))
        tail = h;
}

假如此时有大量的线程在同时创建完虚拟节点,接下来同时修改 head 变量,此时只有一个线程可以修改成功,而其它线程将会修改失败。这里需要注意的是,修改失败的线程并不需要自旋重试,因为只要有了虚拟节点就行,后面又不会使用到该虚拟节点,因此没有必要再创建一个新的虚拟节点然后再覆盖之前的虚拟节点。

CAS成功的线程可以直接修改 tail 变量,而不用担心线程安全的问题,因为CAS失败的线程直接就返回了,不会操作 tail 变量。

入队

有了虚拟节点后,当前阶段就会变成入队阶段了。入队其实就是将 tail 变量由原来的指向改成指向当前节点。由于 tail 变量是共享变量,因此需要使用CAS来修改。这里其实还有个需要注意的地方, tail 变量原来指向的节点其实也算是共享变量,但由于只有CAS修改 tail 变量成功的线程才会操作该共享变量,不存在竞争关系,因此无需再使用CAS来修改。

详解AQS的具体流程

阶段3:修改节点状态

节点是用来封装对应线程相关信息的,封装的信息如下:

  • prev:指向等待队列中的前一个节点。
  • next:指向等待队列中的后一个节点。
  • waiter:当前节点对应的线程对象。
  • status:当前节点的状态。

详解AQS的具体流程

节点的状态有4种:

  1. 0:初始值,表示节点刚刚被创建,无任何状态。
  2. WAITING:线程正在阻塞状态中。
  3. CANCELLED:值为负数,表示线程已退出,有可能是等太久不想等了(超时),也有可能是被其它人叫走了(被打断)。只要节点的状态值为负数,则一律为 CANCELLED 状态。
  4. COND:有条件的等待,和 Condition 有关,可以先忽略。

详解AQS的具体流程

开始修改节点状态:

详解AQS的具体流程

这里需要特别注意一点是,当前阶段中如果节点不是 first 节点则不会尝试修改状态,因为AQS必须确保等待队列中的线程会按照FIFO的顺序请求执行任务,也就是说只要节点入队了就不能插队,此时由于非 first 节点已经入队了,所以当前阶段包括后续的阶段都不能再去尝试修改状态了,否则就属于插队了。

阶段4:阻塞

阻塞也就是调用了 LockSupport 类的 park 方法。

详解AQS的具体流程

在这些阶段中,可以很明显地观察到每一个小阶段前都会尝试修改状态(非 first 节点入队后的阶段除外),也就是前面说的除非迫不得已,否则AQS会尽最大的努力来避免进入阻塞状态。如果刚好尝试修改状态成功了,则会直接返回,如果已经入队列了则会将 first 节点中的信息清空以作为新的虚拟节点并移除之前的虚拟节点,然后再返回。

详解AQS的具体流程

唤醒first节点

当正在执行任务的线程执行完了后,会释放先前请求修改的状态(具体的释放操作由子类完成),并且在释放成功后会唤醒 first 节点中的线程。

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        // 唤醒first节点
        signalNext(head);
        return true;
    }
    return false;
}

private static void signalNext(Node h) {
    Node s;
    // 有first节点并且first有状态时才唤醒
    if (h != null && (s = h.next) != null && s.status != 0) {
        // 将WAITING状态改为CANCELLED状态,表示该线程已经被唤醒了。
        s.getAndUnsetStatus(WAITING);
        // 唤醒first节点对应的线程
        LockSupport.unpark(s.waiter);
    }
}

first 节点被唤醒后会进行下一轮的for循环,并且在下一轮的for循环中去尝试修改状态,修改成功后会将 first 节点出队。

详解AQS的具体流程

first节点自旋

先不考虑线程阻塞超时和被打断的情况,只考虑 first 节点被执行完任务的线程正常唤醒后,如果此时的并发量很大,在 first 节点被唤醒的同时又有新线程请求修改状态,那么此时就算是排到了 first 节点,first 节点也不一定会优先被处理,还得和新来的线程竞争,只有竞争成功了才可以去修改状态。

由于这种情况的存在,如果 first 节点和新来的线程竞争失败了,为了避免立即进入阻塞状态,first 节点还会自旋几次,如果实在修改不了状态才会进入阻塞状态,从这里也可以看出AQS会尽最大努力避免线程进入阻塞状态,以达到高性能的目的。

first 节点自旋的次数是有规律的,公式为 2n+1,并且最开始 n 的值为0。acquire 方法中会有2个与自旋次数有关的变量,分别为 spinspostSpinsspins 表示再自旋多少次就会阻塞,postSpins 表示 first 节点在被唤醒后如果和新来的线程竞争时又失败了,则需要再自旋多少次才会进入阻塞状态。

详解AQS的具体流程

自旋的流程如下:

详解AQS的具体流程

由此可以看到,AQS并不是完全公平的,只有进入到等待队列中的线程才是公平的。

线程自然唤醒或被中断

当节点的阻塞时间达到超时时间后,或者被其它线程打断时,节点会开始退出等待队列。

当节点由于达到超时时间而被唤醒后,并不会立刻退出队列,而是会尝试一下修改状态(只有 first 节点才会尝试修改状态,因为非 first 节点不能插队),如果刚好就修改成功了则直接返回,如果没有修改成功则只能乖乖退出队列了。

详解AQS的具体流程

当阻塞的节点被其它线程打断时,不应该像上面一样临走前还去尝试一下修改状态,因为线程必须要能够及时响应中断。

详解AQS的具体流程

当线程退出for循环后,就会开始退出队列。

详解AQS的具体流程

中断线程时,需要非常注意一点的是区分要中断的是请求修改状态的过程,还是要中断当前线程的执行。acquire 方法中的 interruptible 参数就表示在请求修改状态的过程中是否可以被中断,如果可以的话,在收到中断信号后会清空信号,以防止意外地将这个信号发给后续执行的任务中。

详解AQS的具体流程

退出等待队列

退出等待队列前线程会将当前节点的状态变成 CANCELLED,然后从等待队列中的尾部节点开始向前遍历,当遍历到 CANCELLED 状态的节点时就弹出该节点。之所以从后往前遍历,主要是为了避免与线程唤醒操作之间的竞争,因为线程是从头部开始唤醒的,而弹出操作是从尾部开始的,这样就可以最大限度地避免竞争的发生。

在遍历节点时,会用到3个指针,分别是 pqs,其中 q 是最主要的,指向了当前遍历到哪个节点,ps 用来指向当前节点的前一个和后一个节点,以便在操作链表时不会把链表搞断。

详解AQS的具体流程

详解AQS的具体流程

总结

AQS其实就是一个高性能的用来管理等待队列的一个框架,每个线程在执行任务前都必须要修改AQS的相关状态,只有修改成功的线程才可以执行任务,并且在执行完任务后还需要恢复状态。有了AQS框架后,我们只需关注修改和恢复状态的逻辑即可,而管理其它未修改成功的线程的任务交给AQS去执行即可,这也是队列同步器的含义所在。