AQS(AbstractQueuedSynchronizer)介绍及源码分析AQS简介以及原理,本文从源码角度深入分析了
简介
AQS,全称AbstractQueuedSynchronizer(抽象队列同步器),位于java.util.concurrent.locks下,是一个可用来构建锁和同步器的框架。
核心思想
两个核心点:共享资源(state)、CLH队列
获取共享资源(通过CAS修改state的值,成功获取代表获取到锁,否则获取锁失败),成功当前工作线程运行,否则添加到队列中等待其他线程释放资源。
备注:CAS为原子操作,会有单独的章节讲解。
重要属性
private volatile int state; // 通过修改此值获取锁、释放锁
// CLH队列,存储了未获取到锁的线程队列
private transient volatile Node head; // 头节点
private transient volatile Node tail; // 尾节点
// AQS 分为共享模式、独占模式
static final class Node {
static final Node SHARED = new Node(); // 共享模式标记
static final Node EXCLUSIVE = null; // 独占模式标记
static final int CANCELLED = 1; // 线程取消
static final int SIGNAL = -1; // 释放资源需唤醒后继节点
static final int CONDITION = -2; // 等待condition唤醒
static final int PROPAGATE = -3; // 共享模式下,需要向后传播,资源是否有剩余,唤醒后继节点
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread; // 等待锁的线程
Node nextWaiter; // 等待条件的下个节点,congitionObject用到
}
AQS 中的方法
获取锁、释放锁其实都是对state变量的修改;加锁方法**:acquire()、acquireShared();释放锁方法release()**、releaseShared(),包含shared为共享模式,其他两个为独占模式。
独占模式:只有一个线程持有资源
共享模式:同时可以有多个多线持有资源
从源码角度开始分析AQS原理
源码分析
-
独占模式获取锁(acquire())
public final void acquire(int arg) { // 1、尝试获取资源 if (!tryAcquire(arg) && // 2、添加到队列 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 3、当前线程中断 selfInterrupt(); } // 模版方法,获取资源交给具体实现类实现 protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }
//addWaiter 当前线程添加到队尾,并返回队列中节点
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); Node pred = tail; // 头结点不为空,添加到尾节点 if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 队列为空 enq(node); return node; } private Node enq(final Node node) { // 1. 头结点放空节点(目的:前面有一个线程持有锁) // 2. 自旋把当前节点添加到队尾,指向头节点 for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
// acquireQueued:自旋获取资源并判断是否需要被挂起
final boolean acquireQueued(final Node node, int arg) boolean true try boolean false for final // 前一个节点是头节点,尝试获取资源(再次判断获取到锁的线程是否释放锁) if // 获取到锁,设置头节点为当前 null // help GC false return // 尝试获取锁失败后执行到此 // 判断当前节点是否应该park if (shouldParkAfterFailedAcquire(p, node) && // 中断当前节点线程 parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } // 挂起当前线程 private final boolean parkAndCheckInterrupt() this return
shouldParkAfterFailedAcquire
此方法处于死循环中,有效前驱找到设置为signal
// 作用:将node有效前驱找到,并设为signal,之后返回true就可以阻塞了 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * 前驱设置SIGNAL,可以阻塞 **/ return true; if (ws > 0) { /* * 前驱状态大于0,及cancelled,说明前驱超时或中断,取消了自己。 * 跳过cancelled节点,找到一个<=0的节点 */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus 只能是 0 or PROPAGATE * CAS 设置ws为 signal */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
独占模式获取锁流程
tryAcquire() 尝试获取资源(具体实现类实现)
获取失败,通过addWaiter 方法把线程添加到队尾,并标记为独占模式(Node.EXCLUSIVE)
再次尝试获取资源(acquireQueued方法);前置节点是头结点,获取资源,成功当前节点为头结点,否则调用shouldParkAfterFailedAcquire
检测是否应该park;true,中断线程(parkAndCheckInterrupt),否则继续获取资源
检测是否应该park时,找到前置为signal的节点
最终tryAcquire 返回false,acquireQueued 返回true,中断当前线程
-
独占模式释放锁
public final boolean release(int arg) { // 尝试释放资源,具体实现类实现 if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) // 唤醒后继节点 unparkSuccessor(h); return true; } return false; } protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException(); }
// unparkSuccessor:获取下一个获取资源的线程,唤醒指定节点的后继节点
private void unparkSuccessor(Node node) { // 获取首节点状态 int ws = node.waitStatus; // 如果小于0 , 设置0,首节点准备释放同步状态,相当于清除此状态 if (ws < 0) compareAndSetWaitStatus(node, ws, 0); // 找到后继节点,如果为空或大于0,找到队列中靠前有效节点变为首节点后继节点 Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } // 后继节点不为空,唤醒 if (s != null) LockSupport.unpark(s.thread); }
独占模式释放锁流程
tryRelease 尝试释放资源(具体子类实现)
释放后,头结点不为空同时状态不为0,找到需要唤醒后继节点
-
共享模式获取锁
public final void acquireShared(int arg) { // 尝试获取锁,具体实现类实现; if (tryAcquireShared(arg) < 0) // 小于0,放入队列 doAcquireShared(arg); } protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException(); }
private void doAcquireShared(int arg) { // 线程加入到队尾,设置共享模式标记 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { // 设置头节点,唤醒后继节点获取资源 setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
// setHeadAndPropagate:
private void setHeadAndPropagate(Node node, int propagate) // Record old head for check below // 设置头结点 // 有剩余资源 // waitStatus < 0 标识有后继节点需要被唤醒 if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared();
}
```
**共享模式获取锁流程**
- tryAcquireShared() 尝试获取资源(具体实现类实现)
- 获取失败,通过addWaiter 方法把线程添加到队尾,并标记为共享模式(Node.SHARED)
- 再次尝试获取资源(acquireQueued方法);前置节点是头结点,获取资源,资源大于等于0,调用setHeadAndPropagate,唤醒后继节点,否则调用shouldParkAfterFailedAcquire
- 检测是否应该park;true,中断线程(parkAndCheckInterrupt),否则继续获取资源
- 检测是否应该park时,找到前置为signal的节点
-
共享模式释放锁
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
// doReleaseShared
private void doReleaseShared() { for (;;) { Node h = head; // 队列至少2个node if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { // 需要被唤醒 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases // 唤醒后继节点 unparkSuccessor(h); } // 已经被唤醒或者即将被唤醒 else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } // head 变化的情况:线程被唤醒,重新设置head if (h == head) // loop if head changed break; } }
获取锁、释放锁基本代码已经梳理完,下面再看重要的内部类ConditionObject
ConditionObject
是AQS中的内部类,实现了Condition,Condition提供了两个功能:等待、唤醒,与Object中等待唤醒类似;
ConditionObject就是条件队列,阻塞线程或唤醒线程
- 使用
Condition condition = new ConditionObject();
condition.wait();// 等待
condition.wait();// 唤醒
示例
ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition();
Thread consumer = new Thread(() -> {
String name = Thread.currentThread().getName();
try {
lock.lock();
System.out.println(name + " 获取到锁 并进入条件等待");
try {
condition.await();// 阻塞,直到被唤醒
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(name + " 被唤醒");
}finally {
lock.unlock();
System.out.println(name + " 释放锁");
}
},"消费者");
consumer.start();
Thread produce = new Thread(() -> {
String name = Thread.currentThread().getName();
try {
lock.lock();
System.out.println(name + " 获取到锁 三秒后唤醒");
try {
Thread.sleep(3000);
condition.signal();// 调用后,等待的线程会被唤醒
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(name + " 通知等待线程");
}finally {
lock.unlock();
System.out.println(name + " 释放锁");
}
},"生产者");
produce.start();
结果:
消费者 获取到锁 并进入条件等待
生产者 获取到锁 三秒后唤醒
生产者 通知等待线程
生产者 释放锁
消费者 被唤醒
消费者 释放锁
-
原理
内部维护等待队列,调用wait方法等待时,节点加入到等待队列,并释放锁,当调用signal唤醒时,获取等待队列第一个节点放到同步队列,等待被其他线程释放锁唤醒。
-
源码
-
类结构
public class ConditionObject implements Condition, java.io.Serializable { /** First node of condition queue. */ private transient Node firstWaiter; // 等待头节点 /** Last node of condition queue. */ private transient Node lastWaiter; // 等待尾节点 }
-
源码分析
等待
public final void await() throws InterruptedException { // 响应中断 if (Thread.interrupted()) throw new InterruptedException(); // 添加条件队列 Node node = addConditionWaiter(); // 释放锁 int savedState = fullyRelease(node); int interruptMode = 0; // 如果节点不在同步队列,无竞争锁资格 while (!isOnSyncQueue(node)) { // 挂起 LockSupport.park(this); // 唤醒后,从下面开始执行 // 如果线程中断退出 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } // 循环结束说明:节点到阻塞队列或线程中断 // 再次获取锁 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; // 线程被中断唤醒,断开node和后继 if (node.nextWaiter != null) // clean up if cancelled // 中断,移动到同步队列 unlinkCancelledWaiters(); if (interruptMode != 0) // 被中断唤醒的处理 reportInterruptAfterWait(interruptMode); }
addConditionWaiter:添加到等待队列尾节点
private Node addConditionWaiter() { Node t = lastWaiter; // If lastWaiter is cancelled, clean out. if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }
// 从头节点剔除非condition的节点 private void unlinkCancelledWaiters() { Node t = firstWaiter; Node trail = null; while (t != null) { Node next = t.nextWaiter; if (t.waitStatus != Node.CONDITION) { t.nextWaiter = null; if (trail == null) firstWaiter = next; else trail.nextWaiter = next; if (next == null) lastWaiter = trail; } else trail = t; t = next; } }
// 加入等待队列,需要释放占有的独占锁fullRelease
final int fullyRelease(Node node) { boolean failed = true; try { // 获取state值 int savedState = getState(); // 释放 if (release(savedState)) { failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { // 获取失败,节点取消,并抛异常 if (failed) node.waitStatus = Node.CANCELLED; } }
注意:必须先获取锁,不然调用wait就会设置为取消
isOnSyncQueue:是否在阻塞队列中(AQS),
final boolean isOnSyncQueue(Node node) { // 节点在条件队列:同步队列使用pre和next,条件队列使用nextWaiter if (node.waitStatus == Node.CONDITION || node.prev == null) return false; // 同步队列 if (node.next != null) // If has successor, it must be on queue return true; // 状态是0.prev不为null return findNodeFromTail(node); }
private boolean findNodeFromTail(Node node) { Node t = tail; for (;;) { if (t == node) return true; if (t == null) return false; t = t.prev; } }
private static final int REINTERRUPT = 1;// wait方法退出从新调用中断
private static final int THROW_IE = -1;//wait方法退出抛出异常
// 中断先于唤醒:THROW_IE 唤醒先于中断:REINTERRUPT
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
final boolean transferAfterCancelledWait(Node node) {
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
// 节点还在条件队列
enq(node);// 放于同步队列
return true;
}
// 在同步队列或入队
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
**唤醒**
```
public final void signal() {
// 非独占模式,抛异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
// 唤醒等待队列第一个节点
doSignal(first);
}
```
```
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
// 头节点移除
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
```
```
final boolean transferForSignal(Node node) {
// 节点值改为0
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
// 添加到同步队列
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
```
-
流程总结
等待:必须先获取锁
- 添加到等待队列
- 释放占有的独占锁
- 判断是否在同步队列,不在同步队列,挂起
要退出这个wait,有两个条件:
- 逻辑到break,及线程被中断
- 节点被移到同步队列,及另外线程调用signal或signalAll
- 尝试再获取锁(再次说明调用wait必须获取锁) 唤醒
- 获取第一个头节点
- 移动节点到同步队列
- 等待被唤醒
总结
本文从源码分析了AQS获取锁、释放锁原理,以及ConditionObject原理及流程;AQS作为一个同步器的框架,提供了模版方法,可以实现不同的同步机制;后续会分析通过AQS构造的同步器,如:ReentrantLock、Semaphore、ReentrantReadAndWriteLock等等。
转载自:https://juejin.cn/post/7369519814640304143