likes
comments
collection
share

解锁Java并发编程基石:深入探索AQS的奥秘

作者站长头像
站长
· 阅读数 13

Lock简介

之前通过深入剖析Java中的synchronized关键字学习了synchronized,下面我们学习一下并发编程线程同步的另一个比较重要的实现Lock

Lock接口定义了一组用于控制线程访问共享资源的方法,如下: 解锁Java并发编程基石:深入探索AQS的奥秘

相对于需要 JVM 隐式获取和释放锁的 Synchronized 同步锁,Lock 同步锁(以下简称 Lock 锁)需要的是显示获取和释放锁,这就为获取和释放锁提供了更多的灵活性。Lock 锁的基本操作是通过乐观锁来实现的,但由于 Lock 锁也会在阻塞时被挂起,因此它依然属于悲观锁。我们可以通过一张图来简单对比下两个同步锁,了解下各自的特点:

解锁Java并发编程基石:深入探索AQS的奥秘

Lock是一个接口,它只提供了释放锁和抢占锁的抽象方法,在J.U.C中提供了以下的具体实现。

解锁Java并发编程基石:深入探索AQS的奥秘

  • ReentrantLock,重入锁,属于排他锁类型,功能和synchronized相似。
  • ReentrantReadWriteLock(RRW),可重入读写锁,该类中维护了两个锁,一是ReadLock,二是WriteLock,它们分别实现了Lock接口。
  • StampedLock,Java 8引入的新的锁机制,它是ReentrantReadWriteLock的改进版本。

它们都是依赖 AbstractQueuedSynchronizer(AQS)类实现的。

AQS 介绍

AQS就是一个抽象类,其主要维护一个资源同步状态变量的值(state)和一个存放排队线程的CLH双向队列,同时线程阻塞等待以及被唤醒时锁分配的机制。具体结构如下:

解锁Java并发编程基石:深入探索AQS的奥秘

AQS使用一个volatile的int类型的成员变量来表示同步状态,通过内置的先进先出的CLH队列来完成获取资源线程的排队工作,将每条要去抢占资源的线程封装成一个Node节点来实现锁的分配,通过CAS完成对State值的修改。

CLH锁其实就是一种是基于逻辑队列非线程饥饿的一种自旋公平锁,由于是 Craig、Landin 和 Hagersten三位大佬的发明,因此命名为CLH锁。

解锁Java并发编程基石:深入探索AQS的奥秘

AQS有一个内部类Node,Node节点对每一个等待获取资源的线程的封装,其包含了需要同步的线程本身及其等待状态,如是否被阻塞、是否等待唤醒、是否已经被取消等。

这里可以直观地看出该node节点会存储当前被阻塞的请求资源线程,变量waitStatus则表示当前Node节点的等待状态,共有5种取值如下:

  • CANCELLED(1):表示当前结点已取消调度。当timeout或被中断(响应中断的情况下),会触发变更为此状态,进入该状态后的结点将不会再变化。
  • SIGNAL(-1):表示后继结点在等待当前结点唤醒。后继结点入队时,会将前继结点的状态更新为SIGNAL。
  • CONDITION(-2):表示结点等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。
  • PROPAGATE(-3):共享模式下,前继结点不仅会唤醒其后继结点,同时也可能会唤醒后继的后继结点。
  • 0:新结点入队时的默认状态。

AQS的关键方法有以下几个:

  1. acquire():用于线程获取同步器的状态,如果无法获取,则线程会进入阻塞状态等待其他线程释放同步器。
  2. release():用于线程释放同步器的状态,并唤醒等待队列中的其他线程。
  3. tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
  4. tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。•
  5. tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余可用资源。
  6. tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点则返回true,否则返回false。

通过ReentrantLock分析AQS的实现

ReentrantLock是JUC并发包中一个重要常用的同步器,其底层就是依赖AQS实现的。下面我们将详细看下ReentrantLock的实现。

获取锁流程

ReentrantLock抢占锁交互图:

ReentrantLockSyncNofairSyncAQSlocklockacquiretryAcquirenofairTryAcquiretrue/false判断抢占锁成功抢占锁失败addWaiterReentrantLockSyncNofairSyncAQS

整体流程如下:

解锁Java并发编程基石:深入探索AQS的奥秘

获取锁源码分析

下面我们分别看下这些方法的源码:

打开lock.lock()这时候跳转到ReentrantLock的内部类NonfairSync(NonfairSync继承自AQS)lock()方法: 解锁Java并发编程基石:深入探索AQS的奥秘

acquire

acquire()方法是AQS中的方法,这里就是线程加锁占位资源不成功,把线程放到CLH队列等待通知唤醒的核心入口

解锁Java并发编程基石:深入探索AQS的奥秘 核心流程步骤如下:

  1. tryAcquire()尝试直接去获取资源,如果成功则直接返回(这里体现了非公平锁,每个线程获取锁时会尝试直接抢占加塞一次,而CLH队列中可能还有别的线程在等待);
  2. addWaiter()将该线程加入等待队列的尾部,并标记为独占模式;
  3. acquireQueued()使线程阻塞在等待队列中获取资源,一直获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。
  4. 如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。

tryAcquire

FairSync.tryAcquire的实现如下:

final boolean nonfairTryAcquire(int acquires) {  
    final Thread current = Thread.currentThread();  
    int c = getState();  
    if (c == 0) {  
        if (compareAndSetState(0, acquires)) {  
            setExclusiveOwnerThread(current);  
            return true;  
        }  
    }  
    else if (current == getExclusiveOwnerThread()) {  
        int nextc = c + acquires;  
        if (nextc < 0) // overflow  
            throw new Error("Maximum lock count exceeded");  
        setState(nextc);  
        return true;  
    }  
    return false;  
}

这段代码是 AbstractQueuedSynchronizer (AQS) 中的 nonfairTryAcquire 方法的实现。该方法用于非公平地尝试获取同步器的状态。

  1. 首先,获取当前线程并获取当前的状态值 c。
  2. 如果当前状态值 c 为 0,表示同步器当前没有被其他线程占用。在这种情况下,直接使用 compareAndSetState 方法将状态从 0 修改为 acquires(期望值为 0,更新值为 acquires)。如果修改成功,表示当前线程成功获取了同步器,并将当前线程设置为独占式锁的拥有者,然后返回 true。
  3. 如果当前状态值 c 不为 0,表示同步器已经被其他线程占用。此时需要判断当前线程是否为同步器的独占式锁拥有者,如果是,则将当前状态值加上 acquires(增加锁的持有次数),并返回 true。
  4. 如果以上条件都不满足,则表示当前线程无法获取同步器的状态,返回 false。

addWaiter

此方法用于将当前线程加入到等待队列CLH的队尾,并返回当前线程所在的结点

private Node addWaiter(Node mode) {  
    Node node = new Node(Thread.currentThread(), mode);  
    // Try the fast path of enq; backup to full enq on failure  
    Node pred = tail;  
    if (pred != null) {  
        node.prev = pred;  
        if (compareAndSetTail(pred, node)) {  
            pred.next = node;  
            return node;  
        }  
    }  
    enq(node);  
    return node;  
}
  1. 首先,创建一个新的节点 Node,该节点包含当前线程和指定的等待模式 mode。
  2. 尝试使用快速路径 (fast path) 将新节点添加到队列中。首先获取当前队列的尾节点 pred,如果尾节点不为 null,则将新节点的 prev 指向 pred,并尝试使用 compareAndSetTail 方法将尾节点更新为新节点。如果更新成功,则将原尾节点 pred 的 next 指向新节点,并返回新节点。
  3. 如果快速路径失败,即尾节点为 null 或者 compareAndSetTail 失败,表示有其他线程在并发修改队列。这时候需要使用完整的入队操作 (enq) 将新节点添加到队列中。
  4. 在 enq 方法中,首先通过自旋操作将新节点添加到队列的尾部。
  5. 返回新节点。

该方法的作用是将当前线程作为等待线程添加到同步队列中。它采用了一种乐观的策略,首先尝试使用快速路径将新节点添加到队列尾部,如果失败则使用完整的入队操作。通过这种方式,可以在大多数情况下避免竞争和线程阻塞,提高并发性能。

acquireQueued

该方法用于在同步队列中获取锁,当无法直接获取锁时,将当前线程加入到等待队列中并进行自旋等待。

final boolean acquireQueued(final Node node, int arg) {  
    boolean failed = true;  
    try {  
        boolean interrupted = false;  
        for (;;) {  
            final Node p = node.predecessor();  
            if (p == head && tryAcquire(arg)) {  
                setHead(node);  
                p.next = null; // help GC  
                failed = false;  
                return interrupted;  
            }  
            if (shouldParkAfterFailedAcquire(p, node) &&  
                parkAndCheckInterrupt())  
                interrupted = true;  
            }  
    } finally {  
        if (failed)  
            cancelAcquire(node);  
    }  
}
  1. 首先,设置一个标志位 failed 为 true,用于记录获取锁的过程中是否出现异常。
  2. 在 try-catch-finally 块中进行循环,直到成功获取到锁或被中断。
  3. 在循环中,首先获取当前节点的前驱节点 p,判断当前节点的前驱节点是否为头节点且尝试获取锁成功(通过调用 tryAcquire 方法)。如果是,则将当前节点设置为新的头节点,断开原头节点和当前节点的连接,将前驱节点的 next 指针置为 null,最后将标志位 failed 设为 false,并返回中断状态 interrupted。
  4. 如果前驱节点 p 不满足条件,调用 shouldParkAfterFailedAcquire 方法判断是否需要阻塞当前线程,并调用 parkAndCheckInterrupt 方法进行线程阻塞,并检查线程是否被中断。如果被中断,则将中断状态 interrupted 设置为 true。
  5. 循环回到第2步,继续尝试获取锁或阻塞。
  6. 如果循环结束时仍然无法获取锁,即获取锁的过程中出现异常,则调用 cancelAcquire 方法取消当前节点的获取操作。

shouldParkAfterFailedAcquire

该方法用于判断在获取锁失败后,当前线程是否应该进行阻塞:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {  
    int ws = pred.waitStatus;  
    if (ws == Node.SIGNAL)  
        return true;  
    if (ws > 0) {  
        do {  
            node.prev = pred = pred.prev;  
        } while (pred.waitStatus > 0);  
        pred.next = node;  
    } else {  
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);  
    }  
    return false;  
}
  1. 首先,获取前驱节点 pred 的等待状态 waitStatus。
  2. 如果等待状态 waitStatus 等于 Node.SIGNAL,表示前驱节点已经设置了状态要求释放锁来发出信号,因此当前节点可以安全地进行阻塞,返回 true。
  3. 如果等待状态 waitStatus 大于 0,表示前驱节点已被取消。循环遍历跳过前驱节点及其之前的已被取消的节点,直到找到一个等待状态小于等于 0 的前驱节点 pred。然后更新当前节点的 prev 指针为找到的前驱节点 pred,将前驱节点 pred 的 next 指针指向当前节点 node。
  4. 如果等待状态 waitStatus 为 0 或 Node.PROPAGATE,则表明需要一个信号来唤醒当前节点,但不立即进行阻塞。调用 compareAndSetWaitStatus 方法将前驱节点 pred 的等待状态修改为 Node.SIGNAL,以表示需要一个信号。
  5. 返回 false,表示当前线程不需要进行阻塞。

解锁源码分析

unlock

NonfairSync(NonfairSync继承自AQS)unlock()方法:

public void unlock() {  
    sync.release(1);  
}

release

public final boolean release(int arg) {  
    if (tryRelease(arg)) {  
        Node h = head;  
        if (h != null && h.waitStatus != 0)  
            unparkSuccessor(h);  
        return true;  
    }  
    return false;  
}

tryRelease

该方法用于释放锁资源。

protected final boolean tryRelease(int releases) {  
    int c = getState() - releases;  
    if (Thread.currentThread() != getExclusiveOwnerThread())  
        throw new IllegalMonitorStateException();  
    boolean free = false;  
    if (c == 0) {  
        free = true;  
        setExclusiveOwnerThread(null);  
    }  
    setState(c);  
    return free;  
}
  1. 首先,获取当前锁的状态值 c = getState() - releases,表示要释放锁后的新状态。

  2. 判断当前线程是否是独占锁的拥有者,如果不是则抛出 IllegalMonitorStateException 异常。

  3. 根据新状态 c 进行处理:

    • 如果新状态 c 等于 0,表示锁已完全释放。将 exclusiveOwnerThread 设置为 null,表示当前没有拥有者,同时将 free 标志设置为 true。
    • 否则,更新锁的状态为新状态 c。
  4. 返回 free 标志,表示锁是否已完全释放。

unparkSuccessor

如果上面释放资源成功返回true,那么就会执行unparkSuccessor()唤醒等待队列里的下一个线程

private void unparkSuccessor(Node node) {  
    /*  
    * If status is negative (i.e., possibly needing signal) try  
    * to clear in anticipation of signalling. It is OK if this  
    * fails or if status is changed by waiting thread.  
    */  
    int ws = node.waitStatus;  
    if (ws < 0)  
        compareAndSetWaitStatus(node, ws, 0);  
  
    /*  
    * Thread to unpark is held in successor, which is normally  
    * just the next node. But if cancelled or apparently null,  
    * traverse backwards from tail to find the actual  
    * non-cancelled successor.  
    */  
    Node s = node.next;  
    if (s == null || s.waitStatus > 0) {  
        s = null;  
        for (Node t = tail; t != null && t != node; t = t.prev)  
            if (t.waitStatus <= 0)  
                s = t;  
    }  
    if (s != null)  
        LockSupport.unpark(s.thread);  
}
  1. 首先,判断节点的等待状态 ws,如果 ws 小于 0,则尝试将其置为 0。这是为了预先清除可能需要信号的状态。即使此操作失败或等待线程更改了状态,也不会影响后续的操作。
  2. 获取节点的后继节点 s,通常后继节点就是当前节点的下一个节点。但如果后继节点被取消或为空,就从尾部向前遍历找到实际未被取消的后继节点。这是为了找到真正需要唤醒的节点。
  3. 如果找到了需要唤醒的后继节点 s,则调用 LockSupport.unpark(s.thread) 方法唤醒该节点对应的线程。

公平锁&非公平锁

ReentrantLock分为公平锁和非公平锁,默认构造方法是非公平锁,如果我们需要构建公平锁,只需要传参true即可:

Lock lock = new ReentrantLock(true);

其加锁解锁流程几乎跟上面非公平锁一样,一些细节有些不同:

protected final boolean tryAcquire(int acquires) {  
    final Thread current = Thread.currentThread();  
    int c = getState();  
    if (c == 0) {  
        if (!hasQueuedPredecessors() &&  
            compareAndSetState(0, acquires)) {  
            setExclusiveOwnerThread(current);  
            return true;  
        }  
    }  
    else if (current == getExclusiveOwnerThread()) {  
        int nextc = c + acquires;  
        if (nextc < 0)  
            throw new Error("Maximum lock count exceeded");  
        setState(nextc);  
        return true;  
    }  
    return false;  
}

这里lock()没有和非公平锁的一样先使用CAS尝试加锁占有资源,是直接调用acquire(1)进行入队操作。加锁这里多了hasQueuedPredecessors的逻辑

public final boolean hasQueuedPredecessors() {   
    Node t = tail; // Read fields in reverse initialization order  
    Node h = head;  
    Node s;  
    return h != t &&  
        ((s = h.next) == null || s.thread != Thread.currentThread());  
}
转载自:https://juejin.cn/post/7246365103784099895
评论
请登录