AQS抽象的队列同步器-详解
一、AQS的概念
AQS,全称AbstractQueuedSynchronizer,抽象的队列同步器,使用了模板设计模式。AQS是用来构建锁或者其他同步器组件的重量级基础框架及整个JUC体系的基石,通过内置的FIFO队列来完成资源获取线程的排队工作,并通过一个int类型变量state表示持有锁的状态。
为什么要用AQS?
抢到资源的线程直接处理业务逻辑,抢不到资源的线程涉及一种排队等候机制。抢占资源失败的线程继续等待(类似银行业务办理窗口都满了,暂时没有受理窗口的顾客只能去候客区排队等候),但排队线程仍然保留获取锁的可能且获取锁流程仍在继续(候客区的顾客也在等着叫号,轮到了再去受理窗口办理业务)。
既然说到了排队等候机制,那么就一定会有某种队列形成,这样的队列是什么数据结构呢?
二、初识AQS类
2.1 AQS内部体系架构
从上图可以看到:
- ReentrantLock类继承自Lock接口,而且该类有三个内部类:Sync, NonFairSync, FairSync。
- Sync抽象类继承自AQS抽象类,而且该类有两个子类:NonFairSync, FairSync。
- AQS抽象类中有两个内部类:Node, ConditionObject。
- Lock接口的实现类内部都是聚合了AQS的子类完成线程访问控制的。
2.2 类定义
state:同步状态。=0:表示锁空闲,可以抢占资源。 >=1:表示已经有线程占有资源。
下面来看一下Node对象定义的内容。
static final class Node {
//共享节点,表示线程以共享的方式等待锁
static final Node SHARED = new Node();
/排他节点,表示线程以独占的方式等待锁
static final Node EXCLUSIVE = null;
//等待状态:取消状态。表示线程获取锁的请求已经取消了
static final int CANCELLED = 1;
//等待状态:阻塞状态。表示线程都准备好了,就等资源释放了
static final int SIGNAL = -1;
//等待状态:条件等待状态。表示节点在等待队列中,节点线程等待唤醒
static final int CONDITION = -2;
//等待状态:获取共享锁时使用。
static final int PROPAGATE = -3;
//当前节点在队列中的状态。0-初始状态。
volatile int waitStatus;
//前驱节点
volatile Node prev;
//后继节点
volatile Node next;
//当前节点持有线程
volatile Thread thread;
//指向下一个处于CONDITION状态的节点
Node nextWaiter;
}
由Node节点组成了双向队列。
AQS中的CLH队列的变体
如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。这个机制主要用的是CLH队列的变体实现的,将暂时获取不到锁的线程加入到队列中,这个队列就是AQS的抽象表现。它将请求共享资源的线程封装成队列的节点(Node),通过CAS、自旋以及LockSupport.park() 的方式,维护volatile的int类型的state变量的状态,使并发达到同步的控制效果。
三、公平锁与非公平锁
3.1 公平锁
讲究先到先得,线程在获取锁时,如果这个锁的等待队列中已经有线程在等待,那么当前线程就会进入等待队列中。
static final class FairSync extends Sync {
final void lock() {
acquire(1);
}
//如果等待队列中有线程,则当前请求线程加入到等待队列尾部。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//hasQueuedPredecessors():如果队列中有等待线程返回true。
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
3.2 非公平锁
不管是否有等待队列,如果可以获取锁,则立刻占有锁对象。也就是说队列的第一个排队线程在unpark()之后还是需要竞争(存在线程竞争的情况下)。
static final class NonfairSync extends Sync {
final void lock() {
//当前线程在入队列之前,尝试获取锁
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//当前线程在入队列之前,尝试获取锁
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
3.3 总结
可以明显看出,公平锁与非公平锁的lock()方法唯一的区别在于公平锁在获取同步状态时多了一个限制条件:hasQueuedPredecessors(),它是公平锁加锁时判断等待队列中是否存在有效节点的办法。
四、跟踪源码-非公平锁
本次源码以非公平锁的lock()/unlock()为入口。
4.1 资源空闲,线程来可以直接索取锁
final void lock() {
//资源空闲,当前线程可以直接索取锁。将state通过CAS操作改成1
if (compareAndSetState(0, 1))
//设置当前线程为资源的占有者
setExclusiveOwnerThread(Thread.currentThread());
else
//资源被占用,线程尝试获取锁
acquire(1);
}
4.2 资源被占用,线程需要等待获取锁
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
- 当前线程尝试获取锁,如果获取锁成功,则结束;
- 如果获取锁失败,则将当前线程加入到等待队列中;
- 线程进入到等待队列后,首先尝试获取锁,如果获取失败,则进入阻塞状态并等待被唤醒。
下面将重点讲解三个方法:tryAcquire、addWaiter、acquireQueued。
4.2.1 tryAcquire(arg)-尝试获取锁
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//state=0, 表示当前资源空闲,当前线程尝试获取锁
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//state != 0, 表示当前资源已经被占用
//如果当前线程正在持有锁,则state+1, 表示重入锁的次数
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
//资源已经被其他线程占有,也就是当前线程获取锁失败,返回false
return false;
}
4.2.2 addWaiter(Node.EXCLUSIVE)-进入等待队列
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
//tail不为null,表示队列中已经有节点,将node节点加入队尾
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//tail为null,表示队列未空,需要先初始化队列,再将node节点加入到队列中
enq(node);
return node;
}
/**
* 通过自旋的方式将node节点入队
*/
private Node enq(final Node node) {
for(;;) {
Node t = tail;
if (t == null) {
//tail节点为null, 表示队列为空,则新建一个空节点作为头节点,又称为傀儡节点、哨兵节点
if (compareAndSetHead(new Node()))
tail = head;
else {
//队列不为空,将node节点作为尾节点入队
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
等待队列中的第一个节点为虚节点(也叫哨兵节点),其实并不存储任务信息,只是用来占位。真正第一个有数据的节点,是从第二个节点开始的。
4.2.3 acquireQueued(node, arg)-队列中的节点获取锁或被阻塞
final boolean acquireQueued(final Node node, int arg) {
boolean interrupted = false;
try {
//自旋。
for (;;) {
//得到node节点的前驱节点p
final Node p = node.predecessor();
//如果p节点是头节点,则当前线程尝试获取锁
if (p == head && tryAcquire(arg)) {
//头节点的后继节点获取锁成功,后继节点成为新的头节点,原头节点等待被GC回收
setHead(node);
p.next = null; // help GC
return interrupted;
}
//获取锁失败,检验并更新节点的waitStatus
if (shouldParkAfterFailedAcquire(p, node))
interrupted |= parkAndCheckInterrupt();
}
} catch (Throwable t) {
cancelAcquire(node);
if (interrupted)
selfInterrupt();
throw t;
}
}
/**
* 获取锁失败后,校验并更新节点的waitStatus。
* 如果线程应该阻塞,return true.
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//获取前驱节点的状态
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
//等待被占用的资源释放,直接返回true
return true;
////ws > 0 说明是CANCELLED状态的
if (ws > 0) {
//循环判断前驱节点的前驱节点是否也为CANCELLED状态,忽略该状态的节点,重新连接队列
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//将当前节点的前驱节点设置为SIGNAL状态,用于后续唤醒操作
//程序第一次执行到这返回false,还会进行外层第二次循环
pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
}
return false;
}
/**
* 阻塞线程
*/
private final boolean parkAndCheckInterrupt() {
//线程挂起,程序不会继续向下执行
LockSupport.park(this);
/*
* 根据park方法API描述,程序在下述三种情况会继续向下执行
* 1. 被unpark
* 2. 被中断(interrupt)
* 3. 其他不合逻辑的返回
*
* 因上述三种情况,程序执行至此,返回当前线程的中断状态,并清空中断状态
* 如果由于被中断,该方法会返回true
*/
return Thread.interrupted();
}
4.3 释放资源
//释放锁
public void unlock() {
sync.release(1);
}
//尝试释放锁,
//如果释放成功,则唤醒第二个节点的线程
public final boolean release(int arg) {
//尝试释放锁
if (tryRelease(arg)) {
Node h = head;
//如果头节点的waitStatus != 0,
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
//将AQS的state更新为0
@ReservedStackAccess
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
//将占用资源的线程置为null, 表示当前资源空闲
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
//唤醒线程
private void unparkSuccessor(Node node) {
//node为头节点
int ws = node.waitStatus;
if (ws < 0)
//将waitStatus通过CAS更新成0
node.compareAndSetWaitStatus(ws, 0);
Node s = node.next;
//如果节点为null或者是CANCELLED状态,则跳过,直至找到非取消状态的节点
if (s == null || s.waitStatus > 0) {
s = null;
for (Node p = tail; p != node && p != null; p = p.prev)
if (p.waitStatus <= 0)
s = p;
}
if (s != null)
//唤醒线程
LockSupport.unpark(s.thread);
}
4.4 列队中的第二个节点抢占锁
通过上一步将第二个节点中的线程唤醒后,因为第《4.2.3 acquireQueued(node, arg)-队列中的节点获取锁或被阻塞》小节中,acquireQueued方法中for(;;)自旋一直在进行,线程唤醒后,立即抢占锁。如果抢占锁成功,则第二个节点成为新的头节点,原头节点从队列中剔除。
转载自:https://juejin.cn/post/7270861556030799872