likes
comments
collection
share

Java 并发编程 AQS 介绍和源码解析(配视频)

作者站长头像
站长
· 阅读数 26

前言

今天想和大家聊一聊 AQS 。想起来在进同程之前面试了沪上阿姨(我没想到做饮品的竟然也有自己的研发团队,那时正好年底十二月份不好找工作就去试了试)。那个面试官是个铁八股文,全程八股文面试。聊完 MySQLredo logundo log。然后又要聊 AQS,以下是我们的对话:

面试官:看过 AQS 源码吗

我:没有

面试官:五年经验没看过 AQS 源码?

我:(内心:这和几年经验有半毛钱关系吗?工作中用不到为什么要看?看了你就会设计吗?为了应付面试硬扒源码有意思?而且我特么是三年经验......)

我: 额,我目前三年工作经验,不是五年,因为工作中用不到,所以没看过,需要锁的时候都是直接拿实现好的类。例如:ReentrantLock,Semaphore

我: 我看过 SpringCloud 相关组件源码,例如 LoadBalancer、OpenFeign、Eureka,因为 工作中有生产问题需要排查,而且我觉得源码不难,通常是有需要阅读源码解决现有问题的时候才会看,工作中用不到为什么要看呢?

面试官:哼,用不到(语气词,反正一脸不屑)

也怪我被他带偏了,导致自己语气也不好。我是真的很难理解,为什么这么多面试官一边做着秒级别并发不过百请求的项目,一边招聘又对候选人要求懂各种高并发原理、底层细节、亿级别流量处理。难道当自己是顶端大厂招人吗?咱们现实点,根据自己薪资范围招聘合适水平的人好吗?

我之前看 SpringCloud 组件源码的时候都是工作中有需要用到才会去看,而且候选人没看过某一个技术组件源码又能说明什么呢?除了说明候选人没看过之外,真的说明不出任何问题。你总不能因为他没看过某个技术源码,就说他能力不行吧?万一这个源码候选人看两遍就懂了呢?当场扒代码给面试官讲解,阁下如何应对?

好了废话不多说了,我们直接进入主题吧。本篇文章 AQS 源码基于 JDK 1.8。由于本篇内容文章讲解可能不易理解,我专门出了视频讲解在 B 站,视频会更清楚的讲解一些细节。下面会附上链接

视频解析

文字以及静态图的方式不容易读懂,因此我自己录制了一个详细的视频,精确到每一行代码来讲解 AQS 专题 从 ReentrantLock.lock() 逐行讲解 AQS 源码+内存图

前置知识

在学习 AQS 之前,需要确保对以下内容有基本的了解

  • CAS (Compare And Swap)

一种乐观锁思想,有三个元素分别是 内存位置、原先旧值、要更新的新值。伪代码:compareAndSwap( stateOffset, expect, update) 如果 stateOffset 目前的值是 expect,那么就将其更新为 update,否则不修改。注意这是个操作系统层面支持的线程安全的方法。

  • volatile 关键字
  • LockSupport

AQS 简介 & 源码初探

我们说的 AQS 其实是一个类的首字母简称,这个类的全名是 AbstractQueuedSynchronizer。翻译过来是 抽象被排队的同步器AQSJava 并发包下面多种并发工具的核心实现,本篇文章我们选取常用的 ReentrantLock 为例讲解 AQS 源码。

AQS 锁住的是什么

回想我们使用 synchronized 关键字实现线程安全的时候,synchronized 关键字锁定的是某个对象。

synchronized(obj){
//...
}

//锁住方法调用者对象
public void synchronized method(){...}

//锁住方法调用者类的 Class 对象
public static void synchronized method(){...}

我们知道对象其实是在 JVM 堆中的一块内存地址,内存地址中有一块区域是对象头,对象头中有一个锁状态标志。举个例子,下面这段代码

public class Test {
    User user = new User();
    private int number;

    public static void main(String[] args) {
        Test test = new Test();
    }
}

这段代码在内存中的布局为:

Java 并发编程 AQS 介绍和源码解析(配视频)

这块内容不是今天的重点,但是后面阅读 AQS 源码需要有 JVM 运行时内存分配的知识,这里简单提一下。

简单的理解就是这块内存地址在被一个线程加锁访问时,其他线程无法访问。所以,锁可以认为是一块内存地址或者说是一块共享资源。

那么我们可以猜想 AQS 要实现锁,肯定也要锁定一块内存地址,但是我们发现使用 ReentrantLock.lock()、ReentrantLock.unlock() 方法的时候没有传一个引用类型的锁对象,这表明锁是被维护在 AQS 内部的。不过通过阅读源码我们会知道, AQS 内部不是锁一个引用类型的对象,而是维护了一个 int 类型的成员变量 state 作为锁

/**  
 * The synchronization state.  
 */  
private volatile int state;

/**  
 * 当前获取锁的线程
 */  
private transient Thread exclusiveOwnerThread;

/**  
 * AQS 中维护的同步队列的头节点
 */  
private transient volatile Node head;  
  
 /**  
 * AQS 中维护的同步队列的尾节点
 */  
private transient volatile Node tail;

statevolatile 关键字修饰,代表被线程更新后,其他线程的读取能够读取到最新值。这个变量的不同值代表锁的不同状态。

  • state = 0 当前没有线程持有锁
  • state = 1 当前有一个线程持有锁
  • state > 1 当前某个线程多次上锁(重入锁)

AQS 工作流程草图

一个线程在持有锁期间,其他线程都会排队等待,直到被唤醒之后脱离队列,可以简单理解为下面两张图的变化

Java 并发编程 AQS 介绍和源码解析(配视频)

线程1 释放锁,等待线程2 获取锁之后,依次 Java 并发编程 AQS 介绍和源码解析(配视频)

AQS 怎样确定唤醒哪个线程

synchronizedJVM 内置支持的,进入代码块获取锁,退出代码块释放锁,其他线程会自动根据 CPU 调度抢占。但是 AQSJava 代码实现的,当一个线程的业务执行完毕(我们手动调用 unlock() 后),需要唤醒其他等待锁的线程应该如何实现?当然我们都知道用 LockSupport.unpark(thread) 唤醒,但是 thread 对象从哪来?如何知道众多等待线程中应该唤醒哪一个呢?所以,在竞争锁的时候就需要一个数据结构来提前存储所有参与等待锁的线程。这样在唤醒的时候根据指定规则拿到目标线程 thread,调用 API 唤醒即可。

正如上一段我们看到的图,AQS 提供了一个静态内部类 Node 来存储所有参与竞争锁的线程。每个 Node 对应一个线程,每个 Node 节点内部存储了自己的前继节点 prev 和后继节点 next,多个 Node 节点之间会组成一个双向链表。下面代码是 Node 类中几个重要属性

static final class Node{
    /**
     * 节点状态:0:初始化 -1:需要准备唤醒后继节点的线程
     */
    volatile int waitStatus;
    
    /**
     * 前继节点
     */
    volatile Node prev;
    
    /**
     * 后继节点
     */
    volatile Ndde next;
    
    /**
     * 当前线程
     */
    volatile Thread thread;
}

注意我们使用 ReentrantLock.lock() 场景时, waitStatus 只会用到 0 和 -1 两个值,所以其他值这里暂不讨论。

当有多个线程竞争锁的时候 Node 节点关系图示如下,他们组成一个双向链表:

Java 并发编程 AQS 介绍和源码解析(配视频)

这样就可以从链表中拿到 thread 对象去指定唤醒。这里的链表就是由 AQShead、tail 两个节点不断添加新节点组成的。

通过 ReentrantLock.lock 探究 AQS 的奥秘

以下面的代码为例,根据 ReenTrantLock.lock() 方法进入源码

  public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock();
        new Thread(new Runnable() {
            @Override @SneakyThrows
            public void run() {
                lock.lock();
                System.out.println("A 线程执行中...");
                TimeUnit.MINUTES.sleep(10);//测试
                System.out.println("A 线程执行完毕...");
                lock.unlock();
            }
        },"线程A").start();

        new Thread(new Runnable() {
            @Override @SneakyThrows
            public void run() {
                TimeUnit.SECONDS.sleep(1);
                lock.lock();
                System.out.println("B 线程执行中...");
                System.out.println("B 线程执行完毕...");
                lock.unlock();
            }
        },"线程B").start();

        new Thread(new Runnable() {
            @Override @SneakyThrows
            public void run() {
                TimeUnit.SECONDS.sleep(2);
                lock.lock();
                System.out.println("C 线程执行中...");
                System.out.println("C 线程执行完毕...");
                lock.unlock();
            }
        },"线程C").start();

        new Thread(new Runnable() {
            @Override @SneakyThrows
            public void run() {
                TimeUnit.SECONDS.sleep(3);
                lock.lock();
                System.out.println("D 线程执行中...");
                System.out.println("D 线程执行完毕...");
                lock.unlock();
            }
        },"线程D").start();
        
        new Thread(new Runnable() {
                    @Override @SneakyThrows
                    public void run() {
                        TimeUnit.MINUTES.sleep(10);
                        System.out.println("E线程准备和 B 线程竞争锁...");
                        lock.lock();
                        System.out.println("E线程成功竞争锁...");
                        System.out.println("E线程执行完毕...");
                        lock.unlock();
                    }
                },"线程E").start();
    }

从上面代码可以看到 线程A 先进入 lock() 方法。

public void lock() {
    sync.lock();
}

可以看到这里是直接调用 sync 对象的 lock() 方法,查看 Sync 类发现它是 AQS 的一个子类

abstract static class Sync extends AbstractQueuedSynchronizer {
    abstract void lock();
}

AQS.lock() 方法是抽象的,有两个实现 FairSync、NonFairSync 默认的 ReentrantLock 构造函数是非公平锁

public ReentrantLock() {  
    sync = new NonfairSync();  
}

所以会调用 NonFairSync.lock() 我们直接查看它的源码

final void lock() {
    //直接尝试 CAS 修改 state 从 0 变为 1 获取锁(第一次)
    if (compareAndSetState(0, 1))
        //修改成功后,设置当前持有锁的线程
        setExclusiveOwnerThread(Thread.currentThread());
    else
       //CAS 失败,表明其他线程持有锁,执行 acquire 逻辑
        acquire(1);
}

线程A 进来之后直接尝试用 CAS 修改 state 的值,由于 state = 0,所以当前 线程A 是可以修改成功的,修改完之后 state = 1,表示 线程A 持有锁。然后将当前持有锁的线程设置为 线程A,结束。

根据示例代码,此时 线程B 进入 lock(),由于此时 state = 1,所以 CAS 操作不会成功,进入 acquire(1) 方法。

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

可以看到这个方法里面有四个方法 tryAcquire(arg)、addWaiter(Node.EXCLUSIVE)、acquireQueued()、selfInterrupt() 下面我们来逐个分析

tryAcquire 代码分析

跟踪这个方法发现最后调用的是 NonfairSync.nonfairTryAcquire(1) ,方法名翻译过来是 非公平模式的尝试获取 ,源码如下

final boolean nonfairTryAcquire(int acquires) {
    //当前线程
    final Thread current = Thread.currentThread();
    //当前锁 state 的值
    int c = getState();
    //如果 state 暂时没有线程上锁
    if (c == 0) {
        //当前线程再次尝试竞争锁(第二次)
        if (compareAndSetState(0, acquires)) {
            //成功抢到锁,修改当前持有锁的线程
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    //如果当前线程就是持有锁的线程,表明一个线程多次加锁(重入锁), state 自增
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

线程B 进入方法由于此时 state线程A 持有 10分钟。所以这里 state = 1线程B 进入这个方法直接返回 false!tryAcquire(arg) == true,所以会执行 addWaiter()

addWaiter & enq 代码分析

addWaiter 这个方法名翻译过来是 添加等待 源码如下

private Node addWaiter(Node mode) {
    //构造一个当前线程的 Node 节点
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    //如果尾结点不为空
    if (pred != null) {
        //将当前线程的前驱节点指向尾结点
        node.prev = pred;
        //把当前节点替换成尾结点
        if (compareAndSetTail(pred, node)) {
            //把原尾结点的后继节点指向当前节点
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

线程B 进入的时候此时尾结点肯定是 NULL 的,所以会走到 enq(node) 方法,分析其源码

/**
 * @param node 新进入的线程对应的 Node 节点
 */
private Node enq(final Node node) {
    //死循环
    for (;;) {
        Node t = tail;
        //如果尾节点为空
        if (t == null) { // Must initialize
            //初始化一个虚拟节点当做头节点,头节点里面 thread = null
            if (compareAndSetHead(new Node()))
                //尾结点也指向头节点指向的内存地址(即当前头结点就是尾节点)
                tail = head;
        } else {
            //如果尾节点不为空,让当前节点的前继节点指向尾节点
            node.prev = t;
            //把当前节点设置成尾结点
            if (compareAndSetTail(t, node)) {
                //把原来尾节点的后继节点指向已经被设置为尾节点的当前节点
                t.next = node;
                return t;
            }
        }
    }
}

上面这段代码不复杂,但是如果没有一点 JVM 运行时内存分配的知识,绝对是很难能够完全看得懂。总结上述代码的功能:enq(node) 当队列中没有任何节点的时候,初始化一个虚拟的头节点,然后把当前节点当做尾结点。 内存图如下:

Java 并发编程 AQS 介绍和源码解析(配视频)

数字1 代表第一次循环的时候指针指向,数字2 代表第二次。enq(node) 方法执行完毕之后 AQS 里面就有了 head、tail 两个节点。

enq(node) 方法执行完毕之后 线程BaddWaiter() 方法就结束了,下面会进入 acquireQueued() 方法。这里我们先暂停,继续来看 addWaiter() 。当 线程C 进入 addWaiter() 方法时,由于尾结点不为 NULL。此时 线程C 的节点会被设置为尾结点,然后原来的尾结点(线程B 的节点)会被作为 线程C 的前继节点。

Java 并发编程 AQS 介绍和源码解析(配视频)

上图 609 ~ 613 行代码 线程C 执行后的队列内存结构如下图

Java 并发编程 AQS 介绍和源码解析(配视频)

也就是说,每一个没有获取到锁的节点都会放入到双向链表队列尾部

acquireQueued 代码分析

我们继续分析 线程B 进入 acquireQueued() 方法之后

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        //死循环
        for (;;) {
            final Node p = node.predecessor();
            //如果当前节点的前继节点是头节点,再次尝试获取锁(第三次)
            if (p == head && tryAcquire(arg)) {
                //获取锁成功之后,把自己设置成头节点,并且清空 prev、thread 等成员变量
                //(因为头节点是虚拟节点,不绑定线程)
                setHead(node);
                //原先的头节点已经没用了,等 GC 操作即可
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            //获取锁失败后将前继节点的 waitStatus 设置为 -1,然后挂起线程等待唤醒
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

线程B 进去这个方法的时候由于 线程A 持有锁 10分钟,所以这里会进入 shouldParkAfterFailedAcquire(p, node)。查看其源码

/**
 * @param pred 当前线程所在节点的前驱节点
 * @param node 当前线程所在的节点
 */
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        //如果前继节点 waitStatus 已经是 -1 直接返回 true
        if (ws == Node.SIGNAL)
            return true;
        if (ws > 0) {
            //本篇文章的场景不会走进来
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            //如果前继节点 waitStatus = 0,将其改为 -1
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

线程B 进入此方法之后,前继节点的 waitStatus = 0 ,所以会执行 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); 将前继节点的 waitStatus 改成 Node.SIGNAL = -1。然后返回 false

然后死循环会再次执行到这里,发现前继节点已经是 waitStatus = Node.SIGNAL 了,所以会直接 return true;。然后会执行 parkAndCheckInterrupt() 方法,查看其源码

private final boolean parkAndCheckInterrupt() {
    //挂起当前线程,等待许可唤醒
    LockSupport.park(this);
    return Thread.interrupted();
}

哇喔,看到这里柳暗花明又一村了呀,直接把当前线程挂起了,LockSupport.park(this);执行完毕之后,线程B 就停在这等待唤醒之后才能继续往下执行代码。然后 线程C、线程D 也会和 线程B 一样挂起在这个位置。

总结

这里我们总结 AQS 加锁过程中几个重要的操作

  • 线程使用 CAS 修改 state = 1 来获取锁
  • 未获取锁的线程将会依次入队列,初始化一个虚拟头节点,每次新来一个线程都会 new 一个新节点放入队尾
  • 放入队尾的节点会将前继节点的 waitStatus 修改成 -1,表明前继节点获取锁执行业务完毕,释放锁之后要通知自己
  • 入队完成之后会将当前线程挂起,等待 LockSupport.unpark() 许可唤醒
  • 每个线程在调用 lock() 方法尝试竞争锁到完成入队列挂起之间,最多有三次竞争锁的机会

unlock 是怎样解锁的

源码分析

前面我们已经逐步分析了加锁、入队列的源码,在上一段我们的示例代码四个线程都执行完 lock() 方法之后,队列图示如下。

Java 并发编程 AQS 介绍和源码解析(配视频)

现在线程B、线程C、线程D 都在等着唤醒呢,假设过了十分钟,线程A 调用 unlock() 释放锁,观察 unlock() 源码:

public void unlock() {
    sync.release(1);
}

跟踪源码最终调用的是 AQS 中的 release(arg) 方法

public final boolean release(int arg) {
    //尝试释放锁
    if (tryRelease(arg)) {
        Node h = head;
        //如果头节点存在并且头节点状态不是 0(此时通常 waitStatus = -1) 
        if (h != null && h.waitStatus != 0)
            //唤醒后继节点(head 后面的那个节点就是真正参与竞争锁的第一个节点)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

我们看 tryRelease(arg) 是如何释放锁的,其实非常简单,既然加锁是对 state 操作,那么解锁自然也是对 state 操作

protected final boolean tryRelease(int releases) {
    // state 减 1
    int c = getState() - releases;
    //如果释放锁的线程不是当前持有锁的线程,报错
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        //置空当前持有锁的线程
        setExclusiveOwnerThread(null);
    }
    //更新 state 的值
    setState(c);
    return free;
}

直接给 state = state - 1,这里不是直接把 state1 改成 0 是因为重入锁的存在,重入锁获取了几次锁,就需要解掉几次锁来释放。

注意此方法的返回值,只有当 state = 0 的时候返回的是 true ,代表完全解锁,当 state != 0 的时候说明是重入锁只解了一部分锁

释放锁之后我们来观察 unparkSuccessor() 源码

/**
 * 唤醒后继节点(如果存在)
 * @param node 当前同步队列里面的头节点
 */
private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    //头节点的后继节点,也就是等待竞争锁的第一个节点
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        //当前头节点的后继节点如果不存在,那么反向从尾向头遍历,找到实际 waitStatus < 0 的节点线程去唤醒
        //这种情况会出现在异常场景调用 cancelAcquire 的时候
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    //如果头节点的后继节点不为 NULL
    if (s != null)
        //发放许可,唤醒头节点的后继节点的线程
        LockSupport.unpark(s.thread);
}

不需要唤醒的场景

上面解锁的代码里面,释放锁之后判断是否要唤醒其他线程时有这样一个条件,如下图

Java 并发编程 AQS 介绍和源码解析(配视频)

条件限定了头节点不为空,并且头节点的 waitStatus != 0 的时候才需要唤醒。

  • h == null 代表头节点为空,说明同步队列中没有等待的节点,无需唤醒
  • 头节点 waitStatus = 0 说明后继节点暂时还未挂起,无需唤醒。因为 acquireQueued 代码分析 这一段我们分析过,线程挂起之前会把自己前继节点的 waitStatus = -1

unlock 解锁并非传递锁

从上面的代码我们可以看到 LockSupport.unpark(s.thread) 之后目标线程被唤醒,值得注意的是,被唤醒的线程会从 LockSupport.park(this) 继续执行代码,由于这段代码在循环里,如下图

Java 并发编程 AQS 介绍和源码解析(配视频)

所以它被唤醒之后不是直接获取了锁,而是获得了获取锁的机会,至于能不能获取到锁,要看它的前继节点是不是 head,以及它能不能通过 CAS 竞争到锁。所以我们可以得到这样的结论:某个线程调用 unlock() 之后,并不是将锁移交给了被唤醒的线程,而是给了被唤醒线程一个获取锁的机会。

cancelAcquire 取消获取锁

acquireQueuedcancelAcquire 方法的进入场景是线程可响应中断场景,ReentrantLocklock() 是不可响应中断的,所以这个场景不会走到,这里暂不介绍。如果有兴趣的话读一读 ReentrantLock.lockInterruptibly() 代码这个是可以响应中断的。

非公平锁到底“不公平”在哪里

错误理解

前面我们是以非公平锁 NonFairSync 为例学习 AQS 的,我们已经知道了多个线程竞争锁的时候如果竞争不到会被放入同步队列等待唤醒。不知道未深入接触过 AQS 的你,曾几何时是否有过下面这样的错误认知。

公平锁的含义是每一个调用 lock() 方法争抢锁的线程,都会按照先后顺序排队,依次从队列头部获取锁(正确)

非公平锁的含义是同步队列中等待唤醒的线程会被 CPU 调度随机唤醒(错误)

在我阅读 AQS 源码之前,我对非公平锁就有这样错误的认知。其实我也觉得是官方的命名导致翻译过来让人容易先入为主的弄错。其实从刚刚阅读的源码可见,非公平锁并不是随机调度的含义。

无情抢夺

我们分析 ReentrantLock.lock() 的示例代码里面有一个 线程E 代码如下

new Thread(new Runnable() {
    @Override @SneakyThrows
    public void run() {
        TimeUnit.MINUTES.sleep(10);
        System.out.println("E 线程准备和 B 线程竞争锁...");
        lock.lock();
        System.out.println("E线程成功竞争锁...");
        System.out.println("E 线程执行完毕...");
        lock.unlock();
    }
},"E").start();

这里 线程E睡了十分钟才执行 lock()方法, 和 线程A持有锁的时间一样,当 线程A 释放锁的时候,线程E就有可能直接获取到锁而不进入队列,这个时候对于在队列头等待的 线程B 来说,就是不公平的。 我们观察 NonFairSyncFairSynclock() 方法源码

// NonFairSync 实现
final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}
// FairSync 实现
final void lock() {
    acquire(1);
}

可以看到公平与非公平的实现区别在于:非公平锁加锁的时候,新来的线程将会直接用 CAS 抢夺一次锁,抢夺不到的情况下,然后再执行 acquire(1) 方法准备入队。 而公平锁 FairSync 加锁的时候对于新来的线程是直接入队列。

而无论是公平还是非公平锁,已经在队列中等待唤醒的线程都是按照队列从头到尾的顺序依次获取锁。

这里 线程E 执行 lock() 方法时,恰好是 线程A 释放锁的时间,线程E 有很大概率能够通过 CAS 抢夺到锁,对于等了十分钟的 线程B 来说,太不公平了!

额外的思考

前面我们提到过非公平锁新来的线程在入队等待唤醒的过程中,将会有三次竞争锁的机会:

  • 刚进入 lock() 方法时 compareAndSetState(0, 1)
  • lock() 那一步没抢到,进入 acquire() 方法时 tryAcquire(arg)
  • acquire() 没抢到, acquireQueued()内部 tryAcquire(arg)

刚刚我们又说,对于公平锁,新来的线程不会参与竞争锁,既然公平锁和非公平锁都调用了 tryAcquire(arg)。 那么公平锁 FairSync 内部的实现 tryAcquire(arg) 一定做了特殊处理,否则对于公平锁来说新来的线程也会参与竞争锁。查看 FairSync.tryAcquire() 方法的实现源码

protected final boolean tryAcquire(int acquires) {
    //当前线程
    final Thread current = Thread.currentThread();
    //锁资源 state 的值
    int c = getState();
    //如果锁没有被占用
    if (c == 0) {
        //判断是否需要竞争锁
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            //竞争锁成功,设置当前持有锁的线程
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    //如果当前线程就是持有锁的线程
    else if (current == getExclusiveOwnerThread()) {
        //重入锁自增 state 的值
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

发现与 NonFairSync 相比尝试获取锁之前多了一个hasQueuedPredecessors(),查看其源码

public final boolean hasQueuedPredecessors() {
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
    //如果当前头节点后面还有后继节点
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}

这段代码的意思是如果队列中还有等待唤醒的节点,那么就返回 true,返回 true 之后外面的 if 条件第一个 !hasQueuedPredecessors() == false ,所以代码不会执行到 compareAndSetState(0, acquires) 尝试获取锁,乖乖在队列排队了。

能否真正实现完全非公平锁

上面我们知道了 AQS 实现的所谓非公平锁,其实并不是所有参与等待锁的线程都能像 synchonized 关键字一样,让所有线程等待 CPU 调度,听天由命的随机获取锁。

现在我们假设如果我们要实现让同步队列中正在等待唤醒的所有线程,在 state = 0 的时候,也就是上一个持有锁的线程释放锁之后,能够随机唤醒,而不是按照队列头往后遍历的顺序,能够实现吗?如果能那么该如何实现?如果不能,那么为什么?

这是我基于自己错误的认识非公平锁之后自创的问题,我认为面试如果这样来问才真的能够筛选候选人。我认为这��个好问题。。。虽然不难,但是起码能够得到候选人自己的扩展思考,能够筛选一定的八股文背诵者!

其实想要实现随机唤醒,我们只需要把数据结构换一个就行了,把存放等待线程的 Node 节点之间的队列换成无顺序的例如 Set,或者使用有顺序的 ArrayList,唤醒的时候产生一个随机索引下标拿到 ArrayList 指定索引下标的元素。

大家可以思考会不会有什么问题,欢迎讨论。

结语

本篇文章深入介绍了 AQS 加锁、解锁的源码。单独从文字配备静态图,读者可能会难以理解,所以结合视频讲解,相信大家对 AQS 源码应该比较熟悉了。还是那句话我认为源码阅读难度一般,但是让我来写,我写不出来。

如果这篇文章对你有帮助,记得点赞加关注!你的支持就是我继续创作的动力!

转载自:https://juejin.cn/post/7384353917750689819
评论
请登录