likes
comments
collection
share

从ReentrantLock入手解析AQS核心原理;资源抢占、线程挂起与唤醒AQS提供了锁资源属性以及竞争锁资源的方法,

作者站长头像
站长
· 阅读数 20

背景

工作年份少的java程序员,几乎百分之80的面试流程中都会问到AQS,本身它就是一个抽象类没有一个依附的完整实体,很不好回答。本文就是对AQS从ReentrantLock的角度进行整理的文章,其中会记录哪些是AQS的能力,哪些是需要自己实现的这样可以更好的帮助大家理解AQS.

AQS介绍

AQS全称AbstractQueuedSynchronizer,是java中的一个抽象类;是juc包下的一个基础类,大多数的并发工具都是基于AQS实现的,它对外提供:多线程下能安全更改的互斥变量;未获取到锁资源的等待与唤醒。来帮助并发工具类实现自身的业务功能。

ASQ内部实现

在AQS整个类有两千多行代码,如果只是讲解它每个方法的含义,无法形成体系,看完可能过一两天也就忘记了。这里采用讲解AQS的实现ReentrantLock,通过我们常用ReentrantLock的加锁(lock)以及释放锁(unlock)方法来讲解其中的调用细节。

ReentrantLock的用法

ReentrantLock reentrantLock = new ReentrantLock();

public void test(){
    reentrantLock.lock();
    try {
        // todo 业务代码
    }finally {
        reentrantLock.unlock();
    }
}

多个线程同时执行test()过程:

  1. new 一个ReentrantLock;
  2. A线程执行lock方法,获取锁资源;
  3. A线程将state从0更改为1,获取锁资源成功;
  4. B线程获取锁资源,但锁资源目前被占用;
  5. B线程获取锁资源失败,需要去排队,添加到双向链表中;并挂起线程;
  6. A线程执行完业务逻辑,释放锁资源,唤醒head.next节点(B线程);
  7. B线程重新尝试获取锁资源。

接下来会详细介绍上述的过程。

步骤一:创建一个ReentrantLock

首先new出来一个ReentrantLock,它有两个构造方法,无参的构造方法会直接new一个非公平的同步器,有参的可以通过boolean来控制构造公平还是非公平的同步器。

public ReentrantLock() {
    sync = new NonfairSync();
}

public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

而FairSync和NonfairSync都会实现AQS,而AQS中有三个属性,此时三个属性和初始化如下所示:

// 双向链表头节点
private transient volatile Node head;
// 双向链表尾节点  用来组成一个双向链表,存储线程资源
private transient volatile Node tail;
// 互斥变量
private volatile int state;

从ReentrantLock入手解析AQS核心原理;资源抢占、线程挂起与唤醒AQS提供了锁资源属性以及竞争锁资源的方法,

步骤二:如何加锁

当调用lock方法时对于不同的锁有不同的处理方式;

  • 非公平锁:上来尝试将state从0更改为1,如果更改成功则说明获取到锁;如果失败则调用acquire方法。
  • 公平锁:直接调用acquire方法。

这里的state就是AQS内部的private volatile int state; 属性,当多个线程同时修改state的值时,会通过cas的形式进行修改,保证了多线程下它是原子性的。

acquire方法是AQS提供的方式,它的源码如下:

@ReservedStackAccess
public final void acquire(int arg) {
    //1.调用tryAcquire方法:尝试获取锁资源,如果拿到锁资源,返回ture,
    //直接返回。没有获取则执行后面方法。
    
    //2. 获取锁资源失败,调用addWaiter:将线程封装为Node,并且插入到AQS队列的尾部,
    //并且作为tail
    
    //3.继续调用acquireQueued,查看当前排队的Node是否在队列的前面,
    //如果在前面(head的next),尝试获取锁资源。 如果没在前面,尝试将线程挂起,阻塞起来!
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

AQS:tryAcquire方法:

tryAcquire分为公平和非公平两种;这两种tryAcquire主要流程:

  • 如果state为0,尝试获取锁资源
  • 如果state不为0,看一下是不是锁重入操作
protected boolean tryAcquire(int arg) {
    // 直接抛出异常,对于ReentrantLock它有公平和非公平两种实现
    throw new UnsupportedOperationException();
}
  • 非公平锁实现:
final boolean nonfairTryAcquire(int acquires) {
    // 拿到当前线程!
    final Thread current = Thread.currentThread();
    // 拿到AQS的state
    int c = getState();
    // 如果state == 0,说明没有线程占用着当前的锁资源
    if (c == 0) {
        // 没线程占用锁资源,直接更改state的值
        if (compareAndSetState(0, acquires)) {
            // 将当前占用这个互斥锁的线程属性设置为当前线程
            setExclusiveOwnerThread(current);
            // 返回true,拿锁成功
            return true;
        }
    }
    // 当前state != 0,说明有线程占用着锁资源
    // 判断拿着锁的线程是不是当前线程(锁重入)
    else if (current == getExclusiveOwnerThread()) {
        // 将state再次+1
        int nextc = c + acquires;
        // 锁重入是否超过最大限制
        // 当大于 Integer.MAX_VALUE 时 抛出error
        if (nextc < 0) 
            throw new Error("Maximum lock count exceeded");
        // 将值设置给state
        setState(nextc);
        // 返回true,拿锁成功
        return true;
    }
    return false;
}

  • 公平锁实现:
// 整体上公平锁和非公平锁的实现方式是差不多的
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        // hasQueuedPredecessors判断是否有线程在排队,如果有线程排队,返回true,配上前面的!,直接就返回false
        if (!hasQueuedPredecessors() &&
            // 如果没有线程排队,直接CAS尝试获取锁资源
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

AQS:addWaiter方法:

acquire方法中是addWaiter(Node.EXCLUSIVE)这样调用addWaiter方法的,它有两个属性,而ReentrantLock是互斥锁,所以传入EXCLUESIVEl

// 共享模式
static final Node SHARED = new Node();
// 互斥模式
static final Node EXCLUSIVE = null;
private Node addWaiter(Node mode) {
    // 将当前线程封装为Node对象,mode为互斥锁
    Node node = new Node(Thread.currentThread(), mode);
    // 拿到尾巴节点
    Node pred = tail;
    // pred不是null,说明有线程正在排队
    if (pred != null) {
        // 将当前的节点prev,指向尾巴节点
        node.prev = pred;
        // 以CAS的方式,将当前节点变更为尾巴节点
        if (compareAndSetTail(pred, node)) {
            // 旧的尾巴节点的next指向当前节点。
            pred.next = node;
            return node;
        }
    }
    // 如果上述流程失败,则通过以下方法加入队列,该方法就是一个for循环方法,一直加成功为止。
    enq(node);
    return node;
}
// 无论怎样都添加进入
private Node enq(final Node node) {
    for (;;) {
        // 拿到tail
        Node t = tail;
        // 如果tail为null,说明当前没有Node在队列中
        if (t == null) { 
            // 创建一个新的Node作为head,并且将tail和head指向一个Node
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            // 和addWaiter代码一致,把当前节点放在变为尾巴节点
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

流程:

  1. 当前节点指向尾巴节点;
  2. tail指向当前节点;
  3. 旧的尾巴节点指向当前节点。 结尾会详细分析。

AQS:acquireQueued方法

acquireQueued方法会查看当前排队的Node是否是head的next,如果是,尝试获取锁资源,如果不是或者获取锁资源失败那么就尝试将当前Node的线程挂起unsafe.park()

@ReservedStackAccess
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            // 获取当前节点的上一个节点。
            final Node p = node.predecessor();
            // 如果上一个节点是head,说明是head.next节点,直接调用tryAcquire竞争锁资源。
            if (p == head && tryAcquire(arg)) {
                // 把当前节点设置为头
/*** 具体代码如下:
head = node;
node.thread = null;
node.prev = null;
***/
                //通过上述的设置,原来的头节点已经没有引用了,就可以释放掉。       
                setHead(node);
                p.next = null; // help GC
                // 设置获取锁成功
                failed = false;
                return interrupted;
            }
            // 如果不是head.next节点 or 获取锁失败;
            // 尝试将线程挂起。
            // 1、找到上一个节点状态为-1节点。如果上个节点就是-1则返回true,如果不是则false,通过上层循环下一次就会变为true.
/***
// 取消状态
static final int CANCELLED =  1;
// 正常状态
static final int SIGNAL    = -1;
// 实现wait notify的状态
static final int CONDITION = -2;
// 共享锁
static final int PROPAGATE = -3;
***/
            if (shouldParkAfterFailedAcquire(p, node) &&
            // 2、 直接调用底层的`LockSupport.park(this);`
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        // 中断时执行取消
        if (failed)
            cancelAcquire(node);
    }
}

shouldParkAfterFailedAcquire方法: 它先当前节点的上一个节点的状态,如果是正常状态直接返回true,如果ws大于0,也就是1为取消状态,for循环往前找,直到小于0的节点

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    
    if (ws == Node.SIGNAL)
        return true;
    if (ws > 0) {
        // 循环往前找到小于0的节点
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // 把节点为-2、-3的状态通过cas变更为-1
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

步骤三:释放锁

ReentractLock:unlock方法

对于unlock不区分公平非公平的;直接调用AQS的release方法,对state减1;减到0时,唤醒排队中的Node节点。

public void unlock() {
    sync.release(1);
}

AQS:release方法

@ReservedStackAccess
public final boolean release(int arg) {
    // tryRelease是一个基类方法,需要自己实现,具体实现看下一段代码。
    if (tryRelease(arg)) {
        // 释放锁资源释放干净了。  (state == 0)
        Node h = head;
        // 如果头节点不为null,并且头节点的状态不为0,唤醒排队的线程
        if (h != null && h.waitStatus != 0)
            // 唤醒线程,具体内容详解放在最后的代码块。
            unparkSuccessor(h);
        return true;
    }
    return false;
}

ReentracLock的实现:

@ReservedStackAccess
protected final boolean tryRelease(int releases) {
    // 计算state的值
    int c = getState() - releases;
    // 判断当前线程是否持有锁资源
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    // 是否成功的将锁资源释放利索 (state == 0)
    boolean free = false;
    if (c == 0) {
        // 锁资源释放干净。
        free = true;
        // 将占用锁资源的属性设置为null
        setExclusiveOwnerThread(null);
    }
    // 将state赋值
    setState(c);
    // 返回true,代表释放干净了
    return free;
}

unparkSuccessor唤醒节点:

private void unparkSuccessor(Node node) {
    // 拿到头节点状态
    int ws = node.waitStatus;
    // 如果头节点状态小于0,cas换为0
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    // 拿到当前节点的next
    Node s = node.next;
    // 如果s == null ,或者s的状态为1 取消状态
    if (s == null || s.waitStatus > 0) {
        // next节点不需要唤醒
        s = null;
        // 从尾巴节点往前找,找到离头节点最近,并且状态是正常的节点
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        // 唤醒线程
        LockSupport.unpark(s.thread);
}

这里如果下一个节点不是正常节点就从尾巴开始找,这样查找的时间复杂度就变成了O(N),如果从头开始找的话时间复杂度就是O(1)了,而采用O(N)时间复杂度是因为,node加入双向链表的流程导致的。如图:

从ReentrantLock入手解析AQS核心原理;资源抢占、线程挂起与唤醒AQS提供了锁资源属性以及竞争锁资源的方法, 其实经过分析前面的addWaiter方法就可以知道加入节点的步骤;

  1. 当前节点指向尾巴节点;
  2. tail指向当前节点;
  3. 旧的尾巴节点指向当前节点。

如果此时刚好执行完第二步,还没来得及执行第三步,那么就永远也找到刚加入链表中的node,也就没办法对其唤醒,所以需要从tail节点开始查。

那么如果把整体流程换一下,先执行nodeA.next = nodeB呢?这样就可以从头开始查询。但这样会带来其他的问题,注意nodeA和tail是全局共享的,如果此时来多个线程加入队列,刚执行nodeA.next=nodeB的话,还没来得及执行下一步,就来一个nodeC,就变成了nodeA.next=nodeC,后面执行到tail.prev=nodeB或者tail.prev=nodeC时就会带来不一致的问题。只有像addWaiter方法这样执行,才可以使用一个cas就保证了线程安全。

总结

看完整篇文章后,可以了解到AQS通过内部volatile修饰的state状态变量来标记锁或信号量状态,当state为0时表示没有被占用,当state大于1时表示被占用,线程可以通过cas来抢占state进而获取锁资源,当获取资源失败时会进入到一个双向链表中进行等待,当持有锁的线程释放锁后会唤醒在双向链表中等待的线程。

转载自:https://juejin.cn/post/7398050410805723162
评论
请登录