探秘 ReentrantLock 源码,认识AQS
「这是我参与2022首次更文挑战的第28天,活动详情查看:2022首次更文挑战」。
相信大家对ReentrantLock都非常熟悉,那么它的内部实现你是否了解呢?大名鼎鼎的AQS你是否清楚其运作原理呢?下面我们将一一揭秘其内部原理,最后通过几张图描述线程并发获取资源锁的过程。
前言
ReentrantLock 与 synchronized功能基本相同,是为了保证并发条件下多线程有序的获取资源,它比synchronized的更加灵活,可以灵活的控制加锁,解锁。
ReentrantLock的核心功能都依赖于AbstractQueuedSynchronizer(AQS,抽象的队列式的同步器),所以我们在探秘ReentrantLock时,必须得弄懂AQS,本文中也包含相关讲解。
ReentrantLock 的类继承关系
下图是ReentrantLock的类继承关系,我们可以看出ReentrantLock 的类继承关系是相对复杂的,它极大的依赖于AbstractQueuedSynchronizer,所有我们在研究ReentrantLock 的源码时不能急躁。
我们看一下Lock接口的规定了哪些功能。
public interface Lock {
/**
* 获取锁
*/
void lock();
/**
* 响应中断地获取锁
* 在锁获取过程中若某个时刻当前线程是中断状态,则立即抛出中断异常
*/
void lockInterruptibly() throws InterruptedException;
/**
* 尝试获取锁
*/
boolean tryLock();
/**
* 在一定时间内尝试获取锁
* 1.在给定时间内获得了锁,返回true
* 2.当前线程被中断,抛出异常,方法返回
* 3.·············超过给定时间,方法返回false。
*/
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
/**
* 释放锁
*/
void unlock();
/**
* 返回一个与当前锁绑定的新Condition实例,可以绑定多个Condition实例,此Condition实例用作线程通信
* Condition类的await()/signal()/signalAll() 类比于 Object类的wait()/notify()/notifyAll()
*/
Condition newCondition();
}
ReentrantLock 的源码解读
ReentrantLock 的核心AQS
当我们点进ReentrantLock的源码,首先我们就能看到一个变量Sync,它是通过静态内部类定义的,继承自AbstractQueuedSynchronizer,我们所说的AQS就是它。
AQS是一个用于构建锁和同步器的框架。它能降低构建锁和同步器的工作量,还可以避免处理多个位置上发生的竞争问题。在基于AQS构建的同步器中,只可能在一个时刻发生阻塞,从而降低上下文切换的开销,并提高吞吐量。
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
/**
* 获取锁
*/
abstract void lock();
/**
* 非空尝试获取
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}// c!= 0 且 当前线程是独占所有者线程
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
/**
* 尝试释放
*/
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
protected final boolean isHeldExclusively() {
// While we must in general read state before owner,
// we don't need to do so to check if current thread is owner
return getExclusiveOwnerThread() == Thread.currentThread();
}
final ConditionObject newCondition() {
return new ConditionObject();
}
// Methods relayed from outer class
final Thread getOwner() {
return getState() == 0 ? null : getExclusiveOwnerThread();
}
final int getHoldCount() {
return isHeldExclusively() ? getState() : 0;
}
final boolean isLocked() {
return getState() != 0;
}
/**
* 从流中重新构造实例(对其进行反序列化)。
*/
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0); // reset to unlocked state
}
}
我们再来走进AQS(AbstractQueuedSynchronizer),看看其究竟是何方神圣,从源码注释我们可以看出其本质是一个有如下结构的队列
+------+ prev +-----+ +-----+
head | | <---- | | <---- | | tail
+------+ +-----+ +-----+
我们再来看一下AQS队列中每一个节点的构成,源码如下:
首先有两个节点标记到底是共享还是独占,下边是表示等待状态的几个常量,节点包含当前线程,前一个节点,后一个节点,等待节点。
AQS支持独占锁(exclusive)和共享锁(share)两种模式。
- 独占锁:只能被一个线程获取到(Reentrantlock)
- 共享锁:可以被多个线程同时获取(CountDownLatch,ReadWriteLock).
static final class Node {
/** 用于指示节点正在共享模式下等待的标记 */
static final Node SHARED = new Node();
/** 用于指示节点正在独占模式下等待的标记 */
static final Node EXCLUSIVE = null;
/** 线程已取消 */
static final int CANCELLED = 1;
/** 后续线程需要取消连接 */
static final int SIGNAL = -1;
/** 线程正在等待状态 */
static final int CONDITION = -2;
/**
* 下一个应该获得共享 无条件传播
*/
static final int PROPAGATE = -3;
/**
* 等待状态 CANCELLED SIGNAL CONDITION PROPAGATE
* 0: 以上都没有
*/
volatile int waitStatus;
/**
* 前一个
*/
volatile Node prev;
/**
* 下一个
*/
volatile Node next;
/**
* 线程
*/
volatile Thread thread;
/**
* 下一个等待的
*/
Node nextWaiter;
/**
* 是共享的
*/
final boolean isShared() {
return nextWaiter == SHARED;
}
/**
* 返回上一个节点,如果为null,则抛出NullPointerException
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // 用于建立初始头部或共享标记
}
Node(Thread thread, Node mode) { // 用于添加等待
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // 按条件使用
this.waitStatus = waitStatus;
this.thread = thread;
}
}
再来看一下AQS队列的成员变量,其中state表示同步状态,从这里也可以看出AQS关注于头尾节点,分别存储其头结点与尾结点的指针。
/**
* 等待队列的头,懒加载。除了初始化时,只能通过方法setHead修改。注:如果head存在,则其waitStatus保证不会被删除取消。
*/
private transient volatile Node head;
/**
* 等待队列的尾部,延迟初始化。仅通过方法enq添加新的等待节点
*/
private transient volatile Node tail;
/**
* 同步状态
*/
private volatile int state;
下面,我们一起探秘一下AQS的原理,如下图
AQS会把所有的请求线程构成一个等待队列,当一个线程释放锁后,会激活自己的后继节点,等待执行的线程全部处于阻塞状态。
线程的显式阻塞是通过调用LockSupport.park() 完成,而LockSupport.park()则调用sun.misc.Unsafe.park() 本地方法,再进一步,HotSpot在Linux中中通过调用pthread_mutex_lock函数把线程交给系统内核进行阻塞。
AQS内部实现了两个队列,一个同步队列,一个条件队列。
-
同步队列:当线程获取资源失败之后,就进入同步队列的尾部保持自旋等待,不断判断自己是否是链表的头节点,如果是头节点,就不断参试获取资源,获取成功后则退出同步队列。
-
条件队列:为Lock实现的一个基础同步器,并且一个线程可能会有多个条件队列,只有在使用了Condition才会存在条件队列。
ReentrantLock 的公平锁和非公平锁
我们可以看到ReentrantLock 类中不仅仅有核心的Sync内部类,另外的还有两个子类分别用来实现公平锁和非公平锁,我们再结合其构造方法,就可以知道我们默认创建的是非公平锁,当我们在构造函数中传入true才会创建公平锁。
public ReentrantLock() {
sync = new NonfairSync();
}
/**
* 更据入参是创建公平锁还是非公平锁
*/
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
/**
* 非公平锁的同步对象
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* 执行锁定。尝试立即停止,恢复正常,在失败中获得成功。
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
/**
* 公平锁的同步对象
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 此处判断与公平锁不同
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
从上述源码中我们可以看到tryAcquire(int acquires)公平锁与非公平锁实现的方案略有不同。
-
公平锁:!hasQueuedPredecessors() &&compareAndSetState(0, acquires)
// 查询是否有线程等待获取的时间长于当前线程。 public final boolean hasQueuedPredecessors() { Node t = tail; Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
-
非公平锁:compareAndSetState(0, acquires)
ReentrantLock 的核心方法
这些核心方法也是Lock接口规定的那些方法,我们可以看到这些方法全部交由Sync去实,我们还得再回头去看Sync。
public void lock() {
sync.lock();
}
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
public void unlock() {
sync.release(1);
}
public Condition newCondition() {
return sync.newCondition();
}
首先看加锁:
// 非公平加锁
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
// 公平锁
final void lock() {
acquire(1);
}
非公平锁的时候,会优先尝试通过CAS获取资源,如果当前空闲则会直接占用资源。如果当前状态是阻塞的,通过AQS提供的acquire方法获取锁,在acquire方法中,首先会调用tryAcquire(arg)尝试直接获取资源,如果获取成功,直接返回。tryAcquire(arg)是由子类实现的,在公平锁,和非公平锁中,会有不同的实现,但也都基于CAS。如果没有直接获取到资源,就将当前线程加入等待队列的尾部,并标记为独占模式,使线程在等待队列中自旋等待获取资源,直到获取资源成功才返回。如果线程在等待的过程中被中断过,就返回true,否则返回false。如果acquireQueued(addWaiter(Node.EXCLUSIVE), arg)执行过程中被中断过,就会执行selfInterrupt();方法。因为在等待队列中自旋状态的线程是不会响应中断的,它会把中断记录下来,如果在自旋时发生过中断,就返回true。然后就会执行selfInterrupt()方法,这个方法就是中断当前线程,其作用就是保证在自旋时不响应中断。
其中acquireQueued的主要作用是把已经追加到队列的线程节点进行阻塞,但阻塞前又通过tryAccquire重试是否能获得锁,如果重试成功能则无需阻塞 , 这里是非公平锁的由来之二。
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) // 防止无限循环下去
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
解锁方法:
解锁方式调用的是AQS中的release方法
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
release方法中的tryRelease(int releases)是交由子类实现的
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
尝试释放资源成功后,会通过unparkSuccessor找出第一个可以unpark的线程,一般说来head.next == head,Head就是第一个线程,但Head.next可能被取消或被置为null,因此比较稳妥的办法是从后往前找第一个可用线程。之后便是通知系统内核继续该线程,在Linux下是通过pthread_mutex_unlock完成。之后,被解锁的线程进入上面所说的重新竞争状态。
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
public static void unpark(Thread thread) {
if (thread != null)
UNSAFE.unpark(thread);
}
ReentrantLock 加解锁流程演示
-
多个线程并发访问资源
-
线程1抢占到资源
-
线程1释放资源,线程2被唤醒,线程5,线程6加入资源竞争(如果是公平锁,则会判断线程等待时间)
-
线程5获取到资源,线程6加入队列
-
就这样一直到AQS队列空闲,无线程竞争资源
转载自:https://juejin.cn/post/7068214365933010952