likes
comments
collection
share

从ReentrantLock学习AQS -- JDK17版

作者站长头像
站长
· 阅读数 45

从ReentrantLock学习AQS -- JDK17版

前言

技术无止境,无论任何时刻,通过加强我们对技术的了解,都能为我们带来信心!

当前大部分的课程,或生产环境,可能都还在使用JDK8,但到现在这个时间点,包括Spring、Jenkins等等核心框架或核心生态,都已经在逐渐的升级到JDK17。

而JDK17对于JDK8来说的升级,简单来说,就是性能上的提升,无论还是底层逻辑,还是新的GC,任何事物,想长久发展,就必须与时俱进,自我升级!

AQS可以说,是所有Java高级工程师必须要掌握的知识点,掌握AQS对于在代码中如何并发编程,可以说是Java并发编码的基础,与Synchronized共同支撑着Java并发的整个架构。

本文概述

AQS在JDK17的代码有了一些调整。

已经掌握JDK8的大佬可以对比一下,看有哪些区别;

从前未研究过的,可以尝试一下,希望能帮助你了解AQS;

如有疑问,欢迎评论共同探讨。

这篇文章,是希望找一个角度来分析AQS究竟是如何实现锁的,主要的内容包括:

  • 简单的说一下AQS是什么
  • ReentrantLock是如何基于AQS实现锁管理的

AQS是什么

AQS是Java的JUC包中的AbstractQueueSynchronizer类,他是一个抽象类,基本上Java所有与锁有关的,都是基于AQS实现的。

常见类中包含:

  • ReentrantRock
  • ReentrantReadWriteRock
  • CountDownLatch
  • Semaphore
  • 线程池

数据结构

AQS通过定义了一些链表内部类,结合自身,实现了线程的状态管理与线程执行顺序

AQS定义了head与tail来表示头部和尾部,还有state属于用于表示当前的锁状态,AQS里面定义了一个抽象静态内部类Node,AQS与Node共同实现了一个双端列表;

同时在AQS中,还定义了一个内部静态类ConditionNode继承Node,与AQS的另一个内部类ConditionObject可以实现一个单向链表;

注意:JDK8中,AQS使用的waiter字来保存当前持有锁的线程, 在JDK17中通过父类AbstractOwnableSynchronizer来管理

JDK17的AQS简单代码:

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    // 等待队列头部,懒汉式初始化
    private transient volatile Node head;
    // 等待队列尾部
    private transient volatile Node tail;
    // 锁状态对于可重入锁,落锁的时候可以一直递增,只有释放锁直到为0时才真正的释放
    private volatile int state;
    
    // JDK8使用的waiter来保存的线程
    // 在JDK17中通过父类AbstractOwnableSynchronizer来管理了
    
    abstract static class Node {
        // ... 省略部分代码
        volatile Node prev;       // 上一节点
        volatile Node next;       // 下一节点
        Thread waiter;            // 等待的线程
        /**
         * 节点抢锁状态 
         * WAITING 1表示待获取锁并在阻塞前重新检查状态准备重试
         * CANCELLER 0x80000000 表示取消抢锁
         * 16进制,对应二进制1000 0000 0000 0000 0000 0000 0000 0000,在java中表示-1
         * 2表示正在等待
         */
        volatile int status;      
    }
    
    public class ConditionObject implements Condition, java.io.Serializable {
        // ...省略很多代码
        /** First node of condition queue. */
        private transient ConditionNode firstWaiter;
        /** Last node of condition queue. */
        private transient ConditionNode lastWaiter;
    }
    
    static final class ConditionNode extends Node
        implements ForkJoinPool.ManagedBlocker {
        // ...省略部分代码
        ConditionNode nextWaiter;   // 用来保存下一节点,单向链表没有使用父类的 next作为下一节点
    }
    
    // ...省略很多代码
}

AQS与Node搭配实现的双端链表

从ReentrantLock学习AQS -- JDK17版

AQS、ConditionObject与CondtionNode搭配实现的单向链表

从ReentrantLock学习AQS -- JDK17版

AQS的核心方法acquire

所有基于AQS的锁,都是通过AQS的acquire方法获取锁,这个方法是final的,不可重写的

这里是摘抄源码,只是增加了一些翻译,最好结合实现来食用,才会得到比较好的效果,比较复杂,可以暂时跳过:

/**
 * Main acquire method, invoked by all exported acquire methods.
 * 主要的获取锁方法,所有实现类获取锁的方法
 * @param node  null unless a reacquiring Condition 除非是有设置条件,默认是空的
 * @param arg   the acquire argument 获取锁的参数
 * @param shared    true if shared mode else exclusive 共享锁还是排他锁,true为共享锁
 * @param interruptible if abort and return negative on interrupt 如果运行时中断锁,将返回负值
 * @param timed if true use timed waits 是否定时等待
 * @param time if timed, the System.nanoTime value to timeout 如果设置了定时,使用System.nanoTime定义时间
 * @return positive if acquired, 0 if timed out, negative if interrupted 如果获取到锁返回正数,如果超时返回0,如果中断返回负数
 */
final int acquire(Node node, int arg, boolean shared,
                      boolean interruptible, boolean timed, long time) {
        Thread current = Thread.currentThread();
        byte spins = 0, postSpins = 0;   // retries upon unpark of first thread
        boolean interrupted = false, first = false;
        Node pred = null;                // predecessor of node when enqueued

        /*
         * Repeatedly: 判断逻辑
         *  Check if node now first 判断当前节点是否为首节点
         *    if so, ensure head stable, else ensure valid predecessor
         *  if node is first or not yet enqueued, try acquiring
         *  else if node not yet created, create it
         *  else if not yet enqueued, try once to enqueue
         *  else if woken from park, retry (up to postSpins times)
         *  else if WAITING status not set, set and retry
         *  else park and clear WAITING status, and check cancellation
         */

        for (;;) {
            // 如果非首节点,找到获取锁node(参数中的node)的前置节点,如果node的前置节点不为空
            // 并且AQS的首节点不是当前node的前置节点,这里还会给first赋值,用于下面的首节点判断
            // 这里最终要判断的逻辑就是,node不为首节点后的第一个节点,AQS的head节点是虚拟的
            // 默认first就是false,所以此处肯定会执行
            if (!first && (pred = (node == null) ? null : node.prev) != null &&
                !(first = (head == pred))) {
                if (pred.status < 0) {
                    // 如果node的前置节点的状态值小于0表示要回收,把前置节点回收
                    cleanQueue();           // predecessor cancelled
                    continue;
                } else if (pred.prev == null) {
                    // 确保序列化
                    Thread.onSpinWait();    // ensure serialization
                    continue;
                }
            }
            // 如果是node是“第一节点”,或前置节点为空
            if (first || pred == null) {
                boolean acquired;
                // 尝试获取锁
                try {
                    if (shared)
                        acquired = (tryAcquireShared(arg) >= 0);
                    else
                        acquired = tryAcquire(arg);
                } catch (Throwable ex) {
                    cancelAcquire(node, interrupted, false);
                    throw ex;
                }
                //如果获取锁成功
                if (acquired) {
                    // 如果是“第一节点”
                    if (first) {
                        // 此处是把节点的前置设为空,这样就解决了node对前置节点的引用
                        node.prev = null;
                        // 抢到锁了就表示当前节点成为了首节点
                        head = node;
                        // 前面赋值的前置节点,通过把next设为空,解除与当前node的引用,
                        // 此时node的prev不指向pred,pred的next也不指向node,
                        // pred没有了使用场景,即可帮助GC
                        pred.next = null;
                        // node的线程已经获取到了锁,不需要再等待了,把waiter置成空,
                        // 待此线程执行完后,就可以帮助线程回收
                        node.waiter = null;
                        if (shared)
                            signalNextIfShared(node);
                        if (interrupted)
                            // 如果中断,即中断当前线程
                            current.interrupt();
                    }
                    // 拿到锁,返回1,表示成功
                    return 1;
                }
            }
            // 如果node为空,则创建一个初始node
            if (node == null) {                 // allocate; retry before enqueue
                if (shared)
                    node = new SharedNode();
                else
                    node = new ExclusiveNode();
            } else if (pred == null) {          // try to enqueue 
                // pred==null表示是新的,新的抢锁没成功,所以尝试排队
                node.waiter = current;
                Node t = tail;
                node.setPrevRelaxed(t);         // avoid unnecessary fence
                if (t == null)
                    // 尝试初始化head
                    tryInitializeHead();
                else if (!casTail(t, node)) // 如果不能把node设置为队尾
                    node.setPrevRelaxed(null);  // back out
                else
                    // 把node设为队尾后,t已经不是队尾了,node才是队尾,所以t.next = node
                    t.next = node;
            } else if (first && spins != 0) {
                --spins;                        // reduce unfairness on rewaits
                Thread.onSpinWait();
            } else if (node.status == 0) {
                // 如果不是新的,但是状态为0,0是一个初始值,表示还需要继续等待下一轮抢锁
                node.status = WAITING;          // enable signal and recheck
            } else {
                long nanos;
                spins = postSpins = (byte)((postSpins << 1) | 1);
                if (!timed)
                    LockSupport.park(this);
                else if ((nanos = time - System.nanoTime()) > 0L)
                    LockSupport.parkNanos(this, nanos);
                else
                    break;
                node.clearStatus();
                if ((interrupted |= Thread.interrupted()) && interruptible)
                    break;
            }
            // 此循环中,如果没有设置超时或中断等待就会有一个持续获取锁的循环
        }
        // 取消获取锁
        return cancelAcquire(node, interrupted, interruptible);
    }

其他方法

AQS的hasQueuedThreads方法

此方法的作用是判断当前排队链表中,是否有线程在等待获取锁:

public final boolean hasQueuedThreads() {
    // 只要head不等于tail,且tail不为空则一直循环
    // 从tail往head循环
    for (Node p = tail, h = head; p != h && p != null; p = p.prev)
        if (p.status >= 0) 
            return true; // 只要有线程不为负,即表示有线程在排队
    return false;
}

AQS的cleanQueue方法

这个方法是用来整理AQS的链表的,这里也是回收线程,通知下一链表准备的位置之一:

private void cleanQueue() {
    for (;;) {
        for (Node q = tail, s = null, p, n;;) { // (p, q, s) triples
            // 本段逻辑中,通过最后的语义,会从tail开始,不断的让q = q.prev
            // 所以这段逻辑是从尾到头的反向遍历
            // p<- ->q<- ->s的关系(要记住,后面处理都有关系)
            if (q == null || (p = q.prev) == null)
                // 只有q为空了,或者q的前面为空了,才会中止清理队列
                // 此方法只有这一个退出点
                return;                      // end of list
            if (s == null ? tail != q : (s.prev != q || s.status < 0))
                // 经过循环,会对s赋值为上一循环的q,所以s不一定为空
                // 如果s为空,且tail != q表示上一个循环的q被清除了
                // 或者上一循环的q的前置不等于新的q时,表示s与q的关系发生了变化
                // 或者s的状态表示要回收了
                // 此时s需要被清除,或者重新处理逻辑
                // 这些都表示此时链表发生了变化,所以需要跳出此次循环
                // 但上一级是无限循环,所以还是会重新开始从尾到头开始处理 
                break;                       // inconsistent
            if (q.status < 0) {              // cancelled
                // 这里表示q结点是需要清除的
                if ((s == null ? casTail(q, p) : s.casPrev(q, p)) 
                    && q.prev == p) {
                    // 如果s为空,表示上一循环的s已经被回收了,
                    // 如果设置成功把尾节点换成p
                    // 或者s不为空,把s的前置替换为p(s.casPrev(q, p))成功,
                    // 并且q.prev == p,理论上q.prev == p是成立的
                    // 那么把p的下一节点替换成s
                    // 为什么替换成s,因为这里的逻辑是要把q干掉
                    p.casNext(q, s);         // OK if fails
                    if (p.prev == null)
                        // 此处的逻辑是通知p的下一个节点
                        // 如果p的前置为空了,表示p是head,而p此时的next是s
                        signalNext(p);
                }
                // 这些都表示此时链表发生了变化,所以需要跳出此次循环
                // 但上一级是无限循环,所以还是会重新开始从尾到头开始处理
                break;
            }
            // 因为前面,pq.prev,所以理论上p.next应该为q
            // 所以如果不相等,就需要作处理
            if ((n = p.next) != q) {         // help finish
                if (n != null && q.prev == p) {
                    // n不为空,但是q.prev又等于p
                    // 把p的next设置为q
                    p.casNext(n, q);
                    if (p.prev == null)
                        // 通知q准备
                        signalNext(p);
                }
                break;
            }
            // 对s,q重新赋值,让遍历的p,q,s各向前移一位
            s = q;
            q = q.prev;
        }
    }
}

继承者们基于AQS需要实现的重要方法

AQS在获取锁或释放锁的过程中,AQS无法单独完成,因为部分的尝试机制并没有具体实现,而是只有抽象方法,

他们分别是:

方法名语义AQS的调用位置
tryAcquire尝试获取锁在acquire中调用
tryRelease尝试释放锁在release中调用
tryAcquireShared尝试获取共享锁在acquireShared中调用
tryReleaseShared尝试释放共享锁在releaseShared中调用
isHeldExclusively是否为独占锁调用的地方较多,主要用于判断

ReentrantLock

ReentrantLock是JUC中定义的一个可重入锁,他并没有通过直接继承AQS来实现锁的功能,而是通过定义一个Sync内部来继承AQS来实现锁的功能的,

Sync也是一个抽象类,所以他也不是最终的使用实例,在ReentrantLock中,还分别有FairSync与NonfairSync两个内部静态类继承Sync来实现公平锁与非公平锁,

ReentrantLock使用代理模式通过sync来实现锁的管理,通过构造函数传参决定是公平锁还是非公平锁:

public class ReentrantLock implements Lock, java.io.Serializable {
    // ReetrantLock实现锁功能的代理属性
    private final Sync sync;
    abstract static class Sync extends AbstractQueuedSynchronizer {
        // 实现AQS的Sync
    }
    
    static final class NonfairSync extends Sync {
        // 非公平锁实现
    }
    
    static final class FairSync extends Sync {
        // 公平锁实现
    }
    
    // 两个构造参数,默认是非公平锁实现
    public ReentrantLock() {
        sync = new NonfairSync();
    }

    // 有参构造,true为公平锁,false为非公平锁
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

    // 通过sync来实现具体的锁管理,此处仅列举一项
    public void lock() {
        sync.lock();
    }
}

ReentrantLock的Sync

Sync实现了tryRelease,但未对tryAcquire进行实现:

abstract static class Sync extends AbstractQueuedSynchronizer {
        // 默认的lock方法,ReentrantLock的lock方法就是这里实现的
        @ReservedStackAccess
        final void lock() {
            if (!initialTryLock())
                // 如果初始尝试获取不成功,则使用AQS的acquire的重写方法获取锁
                acquire(1);
        }
        
        /**
         * Checks for reentrancy and acquires if lock immediately
         * available under fair vs nonfair rules. Locking methods
         * perform initialTryLock check before relaying to
         * corresponding AQS acquire methods.
         * 初次尝试锁定
         * 实现类需要实现的方法,用来检查是否可以重入,同时判断锁是否可用
         * 实现类需要在调用AQS的acquire获取锁之前进行前置的判断
         */
        abstract boolean initialTryLock();
        
        // 实现的尝试释放锁方法
        @ReservedStackAccess
        protected final boolean tryRelease(int releases) {
            // 具体实现
        }
        
        // 非公平的尝试获取锁
        @ReservedStackAccess
        final boolean tryLock() {
            // 具体实现
        }
    }

Sync的两个实现NonfairSync与FairSync

NonfairSync与FairSync的区别在于对Sync的initialTryLock以及AQS的tryAcquire有着些微不同的实现,绝大部分都是相同的:

static final class xxxxSync extends Sync {
        final boolean initialTryLock() {
            Thread current = Thread.currentThread();
            int c = getState(); // 公平锁才会判断当前的锁状态
            if (c == 0) {
                // FairSync会通过hasQueuedThreads()判断是否有其他线程持有锁,而NonfairSync不会
                // 同时两种锁都尝试能不能把AQS的state从0变成1,
                if (!hasQueuedThreads() && compareAndSetState(0, 1)) {
                    // 如果设置成功,则表示获取锁,设置当前线程拥有锁,并返回获取锁成功
                    setExclusiveOwnerThread(current);
                    return true;
                }
            } else if (getExclusiveOwnerThread() == current) { 
                // 如果state不为0且持有线程为当前线程,进入此逻辑
                // 这里是判断有没有存在死循环,这里直接抛出异常
                if (++c < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                // 这里对state完成了递增,所以通过lock方法加锁多少次,state就会递增多少
                setState(c);
                return true;
            }
            return false;
        }
        
        /**
         * Acquire for non-reentrant cases after initialTryLock prescreen
         * 参数 acquires基本都为1
         */
        protected final boolean tryAcquire(int acquires) {
            // NonfairSync与FairSync的区别与initialTryLock一样,
            // FairSync会通过hasQueuedThreads()判断是否有其他线程持有锁
            if (getState() == 0 && !hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                // 如果设置成功,则表示获取锁,设置当前线程拥有锁,并返回获取锁成功
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
    }

ReentrantLock的抢锁(Lock)流程

流程图如下:

从ReentrantLock学习AQS -- JDK17版

主要的函数如下:

// 第一步:ReentrantLock的lock方法其实是使用代理代工,通过sync实现获取锁的
final void lock() {
    sync.lock()
}

@ReservedStackAccess
final void lock() {
    // 第二步:sync根据具体的实现类,初始化并尝试获取锁
    if (!initialTryLock())
        // 第三步:如果初始化并获取锁失败,则进行AQS的重写的,简化的获取锁逻辑
        acquire(1);
}

// AQS的获取锁的方法acquire, arg = 1
public final void acquire(int arg) {
    // 第四步:尝试获取锁,这里是sync的子类FairSync或NonfairSync
    if (!tryAcquire(arg))
        /** 第五步:尝试获取锁失败,进入到原始的获取锁逻辑,所传参数表示
         * node 节点是空的,
         * ags表示其实是是用于state重入锁用的
         * 三个false分别代表,不是共享锁,不是打断的,永不超时的,最后的超时时间参数没有意义
         */
        acquire(null, arg, false, false, false, 0L);
}

xxxSync的initialTryLock方法

NonfairSync与FairSync在方法上只有些微区别,绝大部分实现是一样的:

final boolean initialTryLock() {
    Thread current = Thread.currentThread();
    int c = getState(); 
    if (c == 0) { // 公平锁才会提前判断当前锁是否被占用
        // 公平锁才会判断队列中是否有线程在排队,没有线程排队才尝试获取能否通过CAS设置state的值
        if (!hasQueuedThreads() && compareAndSetState(0, 1)) {
            // 设置当前线程为锁持有者
            setExclusiveOwnerThread(current);
            return true;
        }
    } else if (getExclusiveOwnerThread() == current) { // 如果c != 0,而持有锁的线程就是当前线程
        if (++c < 0) // overflow 如果c递增后,变为负数了,表示超过上限2^32-1了,直接抛异常
            throw new Error("Maximum lock count exceeded");
        // 设置锁的重入,state递增
        setState(c);
        return true;
    }
    return false;
}

通过这个方法,也可以看到,无论是公平锁,还是非公平锁,尝试初始化锁,都只是对state的尝试赋值,并没有排队的概念,AQS的head和tail都是空的

xxxSync的tryAcquire方法

NonfairSync与FairSync在方法上只有些微区别,绝大部分实现是一样的:

protected final boolean tryAcquire(int acquires) {
    // 公平锁才会判断队列中是否有线程在排队,没有线程排队才尝试获取能否通过CAS设置state的值
    if (getState() == 0 && !hasQueuedPredecessors() &&
        compareAndSetState(0, acquires)) {
        setExclusiveOwnerThread(Thread.currentThread());
        return true;
    }
    return false;
}

在ReentrantLock中AQS的acquire方法

根据ReentrantLock中调用的acquire方法的入参,对AQS的原acquire的方法可以清除后可以简化为以下代码:


final int acquire(Node node, int arg, boolean shared,
                      boolean interruptible, boolean timed, long time) {
    Thread current = Thread.currentThread();
    // 取消标记第一个线程时重试
    byte spins = 0, postSpins = 0;   // retries upon unpark of first thread
    boolean  first = false;
    Node pred = null;                // predecessor of node when enqueued

    // 以下循环查看顺序请按循环次序来看,只有抢到锁才会退出,异常了才会跳出循环取消锁获取
    for (;;) {
        // 第一次循环时
        // 此时node为空,所有pred也一定为空,所以pred != null一定为false,所以一定不会进入这个逻辑
        // 注意,因为这里是pred != null后面接的是&&,在pred != null就停止判断了,
        // 所以first不会重新赋值,所以为flase
        
        // 第二次循环
        // 此时node虽然不为空了,但是node.prev还是为空的,所有pred依旧为null,
        // 所以first依然不会赋值,里面的逻辑也不会执行
        
        // 第三次循环
        // 此时node的prev指向的是原来的tail,所以pred指向了原来的tail,
        // 而上一次循环中,原tail可能等于head,也有可能不等于head,
        // 当tail等于head时,first会赋值为true,但不会进入以下逻辑
        // 当tail不等于head,first还是为false,所以会进入以下逻辑
        if (!first && (pred = (node == null) ? null : node.prev) != null &&
            !(first = (head == pred))) {
            if (pred.status < 0) {
                // 如果前置节点的status为-1,表示需要取消等待
                // 第三次循环如果进入到此逻辑,
                // 详细的逻辑就是从尾部一直往前找
                cleanQueue();           // predecessor cancelled
                continue;
            } else if (pred.prev == null) {
                Thread.onSpinWait();    // ensure serialization
                continue;
            }
        }
        // 第一次循环
        // 此时pred == null,所以一定会进来尝试获取锁
        
        // 第二次循环
        // pred依旧为null,所以会再一次尝试获取锁
        if (first || pred == null) {
            boolean acquired;
            try {
                // 第六步,所以AQS也是通过实现类的tryAcquire尝试获取锁的
                acquired = tryAcquire(arg);
            } catch (Throwable ex) {
                cancelAcquire(node, interrupted, false);
                throw ex;
            }
            if (acquired) {
                // 第一次循环时
                // first在上面的逻辑中,因为不会触发重新赋值,所以以下逻辑是不会进入的
                // 第二次循环时,first依然不会重新赋值,所以以下逻辑还是不会进入
                if (first) {
                    node.prev = null;
                    head = node;
                    pred.next = null;
                    node.waiter = null;
                    current.interrupt();
                }
                // 此时返回1,表示抢到锁了,就直接退出了方法
                return 1;
            }
        }
        // 第一次循环
        // 此时node是空的,这里的逻辑肯定是执行这里,所以走完逻辑后,进入下一次循环
        if (node == null) {                 // allocate; retry before enqueue
            // 初始化一个独享节点,这只是一个类型定义,没有具体实现
            node = new ExclusiveNode();
        } else if (pred == null) {          // try to enqueue
            // 第二次循环时,node不为空了,但是pred == null,所以会进入此逻辑
            // 这个逻辑是尝试加入队列的操作
            // 这里的waiter设置为当前线程
            node.waiter = current;
            Node t = tail;
            // 让node的前置指向AQS的tail
            node.setPrevRelaxed(t);         // avoid unnecessary fence
            if (t == null)
                // 如果是空的,尝试初始化head
                // 里面的逻辑会给head创建一个独享节点,同时把tail也指向head,即链表的头尾相等
                tryInitializeHead();
            else if (!casTail(t, node)) // 如果tail不为空,就尝试把当前节点设为tail
                // 如果设置为tail失败,让node的前置节点指向空,
                node.setPrevRelaxed(null);  // back out
            else
                // 如果tail不为空,让tail的next指向当前node
                t.next = node;
            // 走完逻辑,将进入第三次循环
        } else if (first && spins != 0) {
            --spins;                        // reduce unfairness on rewaits
            Thread.onSpinWait();
        } else if (node.status == 0) {
            node.status = WAITING;          // enable signal and recheck
        } else {
            break;
        }
    }
    return cancelAcquire(node, interrupted, interruptible);
}

通过代码的分析,可以发现,AQS的acquire方法逻辑,最终的获取锁的实现,还是通过子类实现类来实现的,自身只是定义了对链表的的维护逻辑,包括清理、初始化、节点的增加、通知

ReentrantLock的AQS的cleanQueue方法解析

// 第三次循环如果进入到此逻辑
private void cleanQueue() {
    for (;;) {                               // restart point
        // 这里又是一个无限循环,因为有多次进入cleanQuue的可能,为使逻辑的连续性
        // 这里的循环采用 父循环 - 当前循环 - 以下循环 的结构说明
        for (Node q = tail, s = null, p, n;;) { // (p, q, s) triples
            if (q == null || (p = q.prev) == null)
                // 3-1-1循环q,即tail 肯定不等于 null,所以不会进入此逻辑
                // q为空,或q的前置为空,才会中止清理队列
                return;                      // end of list
            if (s == null ? tail != q : (s.prev != q || s.status < 0))
                // 3-1-1循环的s是为空的,所以这里判断tail != q,但此时q==tail,所以不会进入此逻辑
                break;                       // inconsistent
            // 3-1-1循环中,q的status==0,所以不会进入以下逻辑
            if (q.status < 0) {              // cancelled
                // 这里表示q结点是需要清除的
                if ((s == null ? casTail(q, p) : s.casPrev(q, p)) && q.prev == p) {
                    // s一般不为空,所以把p变成tail的前置(s.casPrev(q, p)),
                    // 理论上q.prev == p是成立的,所以把p的next转为上一个循环的q
                    p.casNext(q, s);         // OK if fails
                    if (p.prev == null)
                        // 此处的逻辑是通知p的下一个节点
                        // 如果p的前置为空了,表示p是head,而p此时的next是s
                        signalNext(p);
                }
                break;
            }
            // 在3-1-1循环中,p = tail.prev, 所以通过赋值n=p.next,此时n == q,所以不会进入以下逻辑
            // 经过多此循环后,p.next会不等于q,此时进入逻辑
            if ((n = p.next) != q) {         // help finish
                if (n != null && q.prev == p) {
                    // 如果n不为tail,及已经循环过大于等于2次了,
                    // 因为没有进入q.status<0的逻辑,所以q.prev == p
                    // 把p的下一节占设置为q
                    p.casNext(n, q);
                    if (p.prev == null)
                        signalNext(p);
                }
                break;
            }
            // 3-1-1继续往前找,无限循环
            s = q;
            q = q.prev;
        }
    }
}

ReentrantLock的释放(release)流程

ReentrantLock释放锁的流程较为简单,也是利用Sync采用代理模式完成的,通过判断state是否为0来判断能否彻底释放锁;

因为ReentrantLock在使用lock方法是,对state是持续递增的,如果一直有重入锁的逻辑,state会越来越大,所以在业务逻辑中,调用了多少次lock方法,就要调用多少unlock方法,才能实现锁的释放。

流程图如下:

从ReentrantLock学习AQS -- JDK17版

主要流程函数如下:

// 第一步:ReentrantLock的unlock方法其实是使用代理处理,通过sync实现释放锁的
public void unlock() {
        sync.release(1);
    }

@ReservedStackAccess
public final boolean release(int arg) {
        if (tryRelease(arg)) { // 第二步,尝试释放锁,只有state为0了才能释放锁
            // 如果成功,通知下一等待线程
            signalNext(head);
            return true;
        }
        return false;
}

Sync的tryRelease方法:

@ReservedStackAccess
protected final boolean tryRelease(int releases) {
    // 给C赋值state减去此次释放的次数(1)
    int c = getState() - releases;
    if (getExclusiveOwnerThread() != Thread.currentThread())
        // 如果拥有锁的线程不是当前线程,抛异常
        // 避免被其他线程操作
        throw new IllegalMonitorStateException();
    // 判断state减去release是否已经为0了
    boolean free = (c == 0);
    if (free)
        // 如果c为0,表示可以释放锁了,把拥有锁的线程设为空
        setExclusiveOwnerThread(null);
    // 设置state的新值
    setState(c);
    return free;
}

到此,在JDK17中,ReentrantLock的锁管理基本就是这样了,最后做一个总结:

AQS定义了获取锁的模板方法,但具体获取锁还是需要实现类通过实现tryAcquire来实现真正的锁获取逻辑,AQS定义的模板方法只是规范了AQS链表的管理及处理逻辑。

ReentrantLock是通过定义内部类Sync去继承AQS来达到基于AQS锁功能的,锁的管理也是基本Sync采用代理模式完成的。

Sync实现了绝大部分AQS功能,但未实现tryAcquire方法,而是让两个子类NonfairSync与FairSync来实现,通过ReentrantLock的构造方法可以决定是使用哪个子类作为锁的管理代理对象。

转载自:https://juejin.cn/post/7256255856152559673
评论
请登录