likes
comments
collection
share

AQS抽象的队列同步器-详解

作者站长头像
站长
· 阅读数 11

一、AQS的概念

AQS,全称AbstractQueuedSynchronizer,抽象的队列同步器,使用了模板设计模式。AQS是用来构建锁或者其他同步器组件的重量级基础框架及整个JUC体系的基石,通过内置的FIFO队列来完成资源获取线程的排队工作,并通过一个int类型变量state表示持有锁的状态。

为什么要用AQS?

抢到资源的线程直接处理业务逻辑,抢不到资源的线程涉及一种排队等候机制。抢占资源失败的线程继续等待(类似银行业务办理窗口都满了,暂时没有受理窗口的顾客只能去候客区排队等候),但排队线程仍然保留获取锁的可能且获取锁流程仍在继续(候客区的顾客也在等着叫号,轮到了再去受理窗口办理业务)。

既然说到了排队等候机制,那么就一定会有某种队列形成,这样的队列是什么数据结构呢?

二、初识AQS类

2.1 AQS内部体系架构

AQS抽象的队列同步器-详解 从上图可以看到:

  • ReentrantLock类继承自Lock接口,而且该类有三个内部类:Sync, NonFairSync, FairSync。
  • Sync抽象类继承自AQS抽象类,而且该类有两个子类:NonFairSync, FairSync。
  • AQS抽象类中有两个内部类:Node, ConditionObject。
  • Lock接口的实现类内部都是聚合了AQS的子类完成线程访问控制的。

2.2 类定义

AQS抽象的队列同步器-详解 state:同步状态。=0:表示锁空闲,可以抢占资源。 >=1:表示已经有线程占有资源。

下面来看一下Node对象定义的内容。

static final class Node {
        
    //共享节点,表示线程以共享的方式等待锁
    static final Node SHARED = new Node();
    /排他节点,表示线程以独占的方式等待锁
    static final Node EXCLUSIVE = null;

    //等待状态:取消状态。表示线程获取锁的请求已经取消了
    static final int CANCELLED =  1;
    //等待状态:阻塞状态。表示线程都准备好了,就等资源释放了
    static final int SIGNAL    = -1;
    //等待状态:条件等待状态。表示节点在等待队列中,节点线程等待唤醒
    static final int CONDITION = -2;
    //等待状态:获取共享锁时使用。
    static final int PROPAGATE = -3;
    
    //当前节点在队列中的状态。0-初始状态。
    volatile int waitStatus;

    //前驱节点
    volatile Node prev;

    //后继节点
    volatile Node next;

    //当前节点持有线程
    volatile Thread thread;

    //指向下一个处于CONDITION状态的节点
    Node nextWaiter;
}

由Node节点组成了双向队列。

AQS中的CLH队列的变体

AQS抽象的队列同步器-详解

如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。这个机制主要用的是CLH队列的变体实现的,将暂时获取不到锁的线程加入到队列中,这个队列就是AQS的抽象表现。它将请求共享资源的线程封装成队列的节点(Node),通过CAS、自旋以及LockSupport.park() 的方式,维护volatile的int类型的state变量的状态,使并发达到同步的控制效果。

三、公平锁与非公平锁

3.1 公平锁

讲究先到先得,线程在获取锁时,如果这个锁的等待队列中已经有线程在等待,那么当前线程就会进入等待队列中。

static final class FairSync extends Sync {
        final void lock() {
            acquire(1);
        }
        
        //如果等待队列中有线程,则当前请求线程加入到等待队列尾部。
        public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                //hasQueuedPredecessors():如果队列中有等待线程返回true。
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }

3.2 非公平锁

不管是否有等待队列,如果可以获取锁,则立刻占有锁对象。也就是说队列的第一个排队线程在unpark()之后还是需要竞争(存在线程竞争的情况下)。

static final class NonfairSync extends Sync {
    
    final void lock() {
            //当前线程在入队列之前,尝试获取锁
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
    }
    protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
    }
    final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                //当前线程在入队列之前,尝试获取锁
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
    }
}

3.3 总结

可以明显看出,公平锁与非公平锁的lock()方法唯一的区别在于公平锁在获取同步状态时多了一个限制条件:hasQueuedPredecessors(),它是公平锁加锁时判断等待队列中是否存在有效节点的办法。

四、跟踪源码-非公平锁

本次源码以非公平锁的lock()/unlock()为入口。

4.1 资源空闲,线程来可以直接索取锁

final void lock() {
    //资源空闲,当前线程可以直接索取锁。将state通过CAS操作改成1
    if (compareAndSetState(0, 1))
        //设置当前线程为资源的占有者
        setExclusiveOwnerThread(Thread.currentThread());
    else
        //资源被占用,线程尝试获取锁
        acquire(1);
}

4.2 资源被占用,线程需要等待获取锁

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
  1. 当前线程尝试获取锁,如果获取锁成功,则结束;
  2. 如果获取锁失败,则将当前线程加入到等待队列中;
  3. 线程进入到等待队列后,首先尝试获取锁,如果获取失败,则进入阻塞状态并等待被唤醒。

下面将重点讲解三个方法:tryAcquire、addWaiter、acquireQueued。

4.2.1 tryAcquire(arg)-尝试获取锁
protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        //state=0, 表示当前资源空闲,当前线程尝试获取锁
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    //state != 0, 表示当前资源已经被占用
    //如果当前线程正在持有锁,则state+1, 表示重入锁的次数
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    //资源已经被其他线程占有,也就是当前线程获取锁失败,返回false
    return false;
}
4.2.2 addWaiter(Node.EXCLUSIVE)-进入等待队列
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    
    Node pred = tail;
    //tail不为null,表示队列中已经有节点,将node节点加入队尾
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    //tail为null,表示队列未空,需要先初始化队列,再将node节点加入到队列中
    enq(node);
    return node;
}

/**
 * 通过自旋的方式将node节点入队
 */
private Node enq(final Node node) {
    for(;;) {
        Node t = tail;
        if (t == null) {
            //tail节点为null, 表示队列为空,则新建一个空节点作为头节点,又称为傀儡节点、哨兵节点
            if (compareAndSetHead(new Node()))
                tail = head;
        else {
            //队列不为空,将node节点作为尾节点入队
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

AQS抽象的队列同步器-详解 等待队列中的第一个节点为虚节点(也叫哨兵节点),其实并不存储任务信息,只是用来占位。真正第一个有数据的节点,是从第二个节点开始的。

4.2.3 acquireQueued(node, arg)-队列中的节点获取锁或被阻塞
final boolean acquireQueued(final Node node, int arg) {
    boolean interrupted = false;
    try {
        //自旋。
        for (;;) {
            //得到node节点的前驱节点p
            final Node p = node.predecessor();
            //如果p节点是头节点,则当前线程尝试获取锁
            if (p == head && tryAcquire(arg)) {
                //头节点的后继节点获取锁成功,后继节点成为新的头节点,原头节点等待被GC回收
                setHead(node);
                p.next = null; // help GC
                return interrupted;
            }
            //获取锁失败,检验并更新节点的waitStatus
            if (shouldParkAfterFailedAcquire(p, node))
                interrupted |= parkAndCheckInterrupt();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        if (interrupted)
            selfInterrupt();
        throw t;
    }
}


/**
 * 获取锁失败后,校验并更新节点的waitStatus。
 * 如果线程应该阻塞,return true.
 */
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    //获取前驱节点的状态
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        //等待被占用的资源释放,直接返回true
        return true;
    ////ws > 0 说明是CANCELLED状态的
    if (ws > 0) {
        //循环判断前驱节点的前驱节点是否也为CANCELLED状态,忽略该状态的节点,重新连接队列
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        //将当前节点的前驱节点设置为SIGNAL状态,用于后续唤醒操作
        //程序第一次执行到这返回false,还会进行外层第二次循环
        pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
    }
    return false;
}

/**
 * 阻塞线程
 */
private final boolean parkAndCheckInterrupt() {
    //线程挂起,程序不会继续向下执行
    LockSupport.park(this);
    /*
     * 根据park方法API描述,程序在下述三种情况会继续向下执行
     * 1. 被unpark
     * 2. 被中断(interrupt)
     * 3. 其他不合逻辑的返回
     *
     * 因上述三种情况,程序执行至此,返回当前线程的中断状态,并清空中断状态
     * 如果由于被中断,该方法会返回true
     */
    return Thread.interrupted();
}

AQS抽象的队列同步器-详解

4.3 释放资源

//释放锁
public void unlock() {
    sync.release(1);
}
//尝试释放锁,
//如果释放成功,则唤醒第二个节点的线程
public final boolean release(int arg) {
    //尝试释放锁
    if (tryRelease(arg)) {
        Node h = head;
        //如果头节点的waitStatus != 0,
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

//将AQS的state更新为0
@ReservedStackAccess
protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        //将占用资源的线程置为null, 表示当前资源空闲
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

//唤醒线程
private void unparkSuccessor(Node node) {
        //node为头节点
        int ws = node.waitStatus;
        if (ws < 0)
            //将waitStatus通过CAS更新成0
            node.compareAndSetWaitStatus(ws, 0);

        Node s = node.next;
        //如果节点为null或者是CANCELLED状态,则跳过,直至找到非取消状态的节点
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node p = tail; p != node && p != null; p = p.prev)
                if (p.waitStatus <= 0)
                    s = p;
        }
        if (s != null)
            //唤醒线程
            LockSupport.unpark(s.thread);
}

AQS抽象的队列同步器-详解

4.4 列队中的第二个节点抢占锁

通过上一步将第二个节点中的线程唤醒后,因为第《4.2.3 acquireQueued(node, arg)-队列中的节点获取锁或被阻塞》小节中,acquireQueued方法中for(;;)自旋一直在进行,线程唤醒后,立即抢占锁。如果抢占锁成功,则第二个节点成为新的头节点,原头节点从队列中剔除。

AQS抽象的队列同步器-详解