likes
comments
collection
share

关于AQS等待/唤醒机制:ConditionObject的原理分析

作者站长头像
站长
· 阅读数 72

说明: 本篇不再啰嗦AQS相关知识,因为我们上一篇都讲过了,稍微介绍下ConditionObject,写个小案例然后直入主题再来个图解就完活。

注意: 条件等待队列(一定要和 等待队列 区分开来),这里我们画图解释一下: 关于AQS等待/唤醒机制:ConditionObject的原理分析

几个注意的点:

  1. 在下文中: 只要提到等待队列,就是CLH队列,也就是存放 (获取锁失败后/或者被signal唤醒后从条件等待队列移到等待队列)的node队列,而一提到条件等待队列,就是在说(调用await后存放)Node的队列!,这俩队列一定要搞清楚,否则就很迷了。
  2. 条件等待队列可能存在多个,而CLH等待队列只能是一个。这一点我们要清楚。多个条件等待队列也是ReentrantLock实现细粒度唤醒的一个基本原因。
  3. AQS中的await和signale 只能是排他锁使用,共享锁绝对不会存在 等待/唤醒机制这么一说。
  4. 条件等待队列 中的线程,想要获取锁必然 需要通过signal方法 移动到等待队列中去才有机会
  5. 条件等待队列 和CLH一样也是FIFO 但是是单向链表结构这个要知道,另外signal唤醒的总是条件等待队列的头节点,await后插入的Node总是从条件等待队列的尾部进行插入。

1、ConditionObject有啥用以及小案例

对于ConditionObject,可能很多人没直接用过,但是如果你用过ReentrantLock,那么还是有一定概率使用到他的,尤其是在一些 生产/消费(或者说等待/唤醒) 场景下。ConditionObject他是AQS的一个内部类他实现了Condition接口,并且实现了其中的await(),signal(),signalAll()等方法,ConditionObject主要是为并发编程中的同步提供了等待/唤醒的实现方式,可以在不满足某个条件的时候挂起线程等待(使用await方法) 或者在满足某些条件时唤醒其他等待的线程(使用signal/signalAll方法)。就像使用synchroized时,使用的wait()和notify()/notifyAll()一样,只不过(基于ConditionObject实现的ReentrantLock)可以根据条件唤醒指定线程,而synchroized却不行他只能唤醒某一个或者全部唤醒,粒度没有(基于ConditionObject实现的ReentrantLock)

由于jdk中基于ConditionObject实现的条件等待机制也就是ReentrantLock和读写锁,而ReentrantLock用的多一些所以我们以ReentrantLock为例,做一个生产/消费的小案例,来切身体会一下也方便源码分析时的切入和debug。 关于AQS等待/唤醒机制:ConditionObject的原理分析

生产/消费 案例完整源码如下:

/**
 * @Auther: Huangzhuangzhuang
 * @Date: 2023/10/20 07:02
 * @Description:
 */
@Slf4j
public class AwaitSignalDemo {

   private static volatile int shoeCount = 0;
   private static ThreadPoolExecutor producerThread = new ThreadPoolExecutor(1, 1, 1000 * 60, TimeUnit.MILLISECONDS, SemaphoreTest.asyncSenderThreadPoolQueue = new LinkedBlockingQueue<Runnable>(500), new ThreadFactory() {
      private final AtomicInteger threadIndex = new AtomicInteger(0);
      @Override
      public Thread newThread(Runnable r) {
         return new Thread(r, "生产线程_" + this.threadIndex.incrementAndGet());
      }
   });
   private static ThreadPoolExecutor consumerThread = new ThreadPoolExecutor(1, 1, 1000 * 60, TimeUnit.MILLISECONDS, SemaphoreTest.asyncSenderThreadPoolQueue = new LinkedBlockingQueue<Runnable>(500), new ThreadFactory() {
      private final AtomicInteger threadIndex = new AtomicInteger(0);
      @Override
      public Thread newThread(Runnable r) {
         return new Thread(r, "消费线程_" + this.threadIndex.incrementAndGet());
      }
   });

   public static void main(String[] args) {
      Lock lock = new ReentrantLock();
      Condition producerCondition = lock.newCondition();
      Condition consumerCondition = lock.newCondition();
      //不停生产鞋,攒够5双了就唤醒消费线程
      producerThread.execute(() -> {
         while (true) {
            lock.lock(); // 获取锁资源
            try {
               if (shoeCount > 5) { //如果生产够5双, 则阻塞等待生产线程,待消费线程消费完后再生产
                  System.out.println(Thread.currentThread().getName() + "_生产鞋完成" + (shoeCount - 1) + "双");
                  consumerCondition.signal();//唤醒消费鞋子的线程
                  producerCondition.await();//挂起生产鞋的线程
               } else {
                  shoeCount++;//生产鞋子
               }
            } catch (Exception e) {
               e.printStackTrace();
            } finally {
               lock.unlock();//释放锁资源
            }
         }
      });
      //不停消费鞋,把鞋消费完了就唤醒生产线程然他继续造
      consumerThread.execute(() -> {
         while (true) {
            lock.lock();//获取锁资源
            try {
               if (shoeCount == 0) {//如果消费完了
                  System.out.println(Thread.currentThread().getName() + "_鞋子全部消费完了");
                  System.out.println();
                  producerCondition.signal(); //消费完鞋子之后,唤醒生产鞋子的线程
                  consumerCondition.await(); //挂起消费鞋子的线程,等待生产完后唤醒当前挂起线程
               } else {
                  shoeCount--;//消费鞋子
               }
            } catch (Exception e) {
               e.printStackTrace();
            } finally {
               lock.unlock();//释放锁资源
            }
         }
      });
   }
}

代码逻辑不过多解释了,有注释也很简单没什么可说的。

2、等待(await)机制源码分析

ReentrantLock的等待机制最终是依赖AQS的ConditionObject类的await方法实现的,所以我们直接来到AQS#ConditionObject的await方法一探究竟,源码如下:

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    //将当前线程加入到 条件等待的链表最后,并返回该节点(内部会创建 Node.CONDITION=-2 类型的 Node)
    Node node = addConditionWaiter();
    //释放当前线程获取的锁(通过操作 state 的值,一直减到state==0)释放了锁就会被阻塞挂起,
    //fullyRelease内部就是调用的我们在AQS独占锁释放时候的tryRelease方法
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    //判断 node节点是否在 AQS 等待队列中(注意该方法中如果node是head的话是返回false的,也就是会执行park逻辑)
    while (!isOnSyncQueue(node)) {
        //如果是head或者当前节点在队列则挂起当前线程
        LockSupport.park(this);
        //如果上边挂起线程后,紧接着又有其他线程中断/唤醒了当前线程(这种情况理论可能比较少但是并发情况下也不一定😄),那么则跳出循环,
        //下边(循环外的)acquireQueued将 node移至AQS等待队列,让其继续抢锁
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    //acquireQueued将 node移至AQS等待队列,让其再次抢锁
    //注意此处是 : 采用排他模式的资源竞争方法 acquireQueued
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        //清除取消的线程
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}


//将当前线程包装成 CONDITION 节点,排入该 Condition 对象内的(条件等待队列)的队尾
private Node addConditionWaiter() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node t = lastWaiter;
    //遍历 Condition 队列,踢出 Cancelled 节点
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    //将当前线程包装成 CONDITION 节点,排入该 Condition 对象内的条件等待队列的队尾
    Node node = new Node(Node.CONDITION);
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}
//检测是否有中断
private int checkInterruptWhileWaiting(Node node) {
    return Thread.interrupted() ?
        (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
        0;
}

final boolean transferAfterCancelledWait(Node node) {
    //将node 状态由 CONDITION 设置为 0,如果设置成功,则说明当前线程抢占到了安排 node 进入 AQS 等待队列的权利,证明了 interrupt 操作先于 signal 操作
    if (node.compareAndSetWaitStatus(Node.CONDITION, 0)) {
        //加到等待队列
        enq(node);
        return true;
    }
    //如果 CAS 操作失败,说明其他线程调用 signal 先行处理了 node 节点。
    //当前线程没竞争到 node 节点的唤醒权,要在 node 节点进入 AQS 队列前一直自旋,同时要执行 yield 让出 CPU
    while (!isOnSyncQueue(node))
        Thread.yield();
    return false;
}
    

等待(即await)的源码考虑的很细,有些细节我们不做过多深挖,直接画个图演示下await主要做了什么: 关于AQS等待/唤醒机制:ConditionObject的原理分析

3、唤醒(signal)机制源码分析与图解

public final void signal() {
    //如果当前线程未持有资源state,则抛出异常
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

//唤醒
private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

//将node 节点从 条件等待队列转移到 等待队列中去
final boolean transferForSignal(Node node) {
    //尝试将节点状态由 CONDITION 改为 0
    if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
        return false;

    //end方法将 node 节点插入 AQS 等待队列 队尾,返回 node 节点的前驱节点
    Node p = enq(node);
    int ws = p.waitStatus;
    //如果当前node的前置节点状态为 CANCELLED(大于0只有取消一种),或者设置前置节点状态为 SIGNAL失败,则将 node 节点持有的线程唤醒
    if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

唤醒机制(signal)的图解: 关于AQS等待/唤醒机制:ConditionObject的原理分析

4、总结

转载自:https://juejin.cn/post/7291935944117239834
评论
请登录