likes
comments
collection
share

AQS(AbstractQueuedSynchronizer)介绍及源码分析AQS简介以及原理,本文从源码角度深入分析了

作者站长头像
站长
· 阅读数 26
简介

AQS,全称AbstractQueuedSynchronizer(抽象队列同步器),位于java.util.concurrent.locks下,是一个可用来构建锁和同步器的框架。

核心思想

两个核心点:共享资源(state)CLH队列

获取共享资源(通过CAS修改state的值,成功获取代表获取到锁,否则获取锁失败),成功当前工作线程运行,否则添加到队列中等待其他线程释放资源。

备注:CAS为原子操作,会有单独的章节讲解。

重要属性
private volatile int state; // 通过修改此值获取锁、释放锁

// CLH队列,存储了未获取到锁的线程队列
private transient volatile Node head; // 头节点
private transient volatile Node tail; // 尾节点

// AQS 分为共享模式、独占模式
static final class Node {
        static final Node SHARED = new Node(); // 共享模式标记
        static final Node EXCLUSIVE = null// 独占模式标记
        static final int CANCELLED =  1// 线程取消
        static final int SIGNAL    = -1// 释放资源需唤醒后继节点
        static final int CONDITION = -2// 等待condition唤醒
        static final int PROPAGATE = -3// 共享模式下,需要向后传播,资源是否有剩余,唤醒后继节点
        volatile int waitStatus;
        volatile Node prev;
        volatile Node next;
        volatile Thread thread; // 等待锁的线程
        Node nextWaiter; // 等待条件的下个节点,congitionObject用到
    }

AQS 中的方法

获取锁、释放锁其实都是对state变量的修改;加锁方法**:acquire()acquireShared();释放锁方法release()**、releaseShared(),包含shared为共享模式,其他两个为独占模式。

独占模式:只有一个线程持有资源

共享模式:同时可以有多个多线持有资源


源码角度开始分析AQS原理

源码分析
  • 独占模式获取锁(acquire())

    public final void acquire(int arg) {
      // 1、尝试获取资源
      if (!tryAcquire(arg) &&
          // 2、添加到队列
          acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
          // 3、当前线程中断
          selfInterrupt();
    }
    // 模版方法,获取资源交给具体实现类实现
    protected boolean tryAcquire(int arg) {
          throw new UnsupportedOperationException();
    }
    
    

    //addWaiter 当前线程添加到队尾,并返回队列中节点

    private Node addWaiter(Node mode) {
      Node node = new Node(Thread.currentThread(), mode);
      Node pred = tail;
      // 头结点不为空,添加到尾节点
      if (pred != null) {
          node.prev = pred;
          if (compareAndSetTail(pred, node)) {
              pred.next = node;
              return node;
          }
      }
      // 队列为空
      enq(node);
      return node;
    }
    private Node enq(final Node node) {
    // 1. 头结点放空节点(目的:前面有一个线程持有锁)
    // 2. 自旋把当前节点添加到队尾,指向头节点
          for (;;) {
              Node t = tail;
              if (t == null) { // Must initialize
                  if (compareAndSetHead(new Node()))
                      tail = head;
              } else {
                  node.prev = t;
                  if (compareAndSetTail(t, node)) {
                      t.next = node;
                      return t;
                  }
              }
          }
      }
    
    

    // acquireQueued:自旋获取资源并判断是否需要被挂起

    final boolean acquireQueued(final Node node, int arg) 
    
    boolean
    true
    
    try
    
    boolean
    false
    
    for
    
    final
    
    // 前一个节点是头节点,尝试获取资源(再次判断获取到锁的线程是否释放锁)
    
    if
    
    // 获取到锁,设置头节点为当前
    
    
    null
    // help GC
    
    false
    
    return
    
    
    // 尝试获取锁失败后执行到此
              // 判断当前节点是否应该park          if (shouldParkAfterFailedAcquire(p, node) &&              // 中断当前节点线程              parkAndCheckInterrupt())              interrupted = true;      }  } finally {      if (failed)          cancelAcquire(node);  }
    
    // 挂起当前线程
    
    private final boolean parkAndCheckInterrupt() 
    
    this
    
    return
    
    
    

    shouldParkAfterFailedAcquire

    此方法处于死循环中,有效前驱找到设置为signal

    // 作用:将node有效前驱找到,并设为signal,之后返回true就可以阻塞了
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
      int ws = pred.waitStatus;
      if (ws == Node.SIGNAL)
          /*
           * 前驱设置SIGNAL,可以阻塞
           **/
          return true;
      if (ws > 0) {
          /*
           * 前驱状态大于0,及cancelled,说明前驱超时或中断,取消了自己。
           * 跳过cancelled节点,找到一个<=0的节点
           */
          do {
              node.prev = pred = pred.prev;
          } while (pred.waitStatus > 0);
          pred.next = node;
      } else {
          /*
           * waitStatus 只能是 0 or PROPAGATE
           * CAS 设置ws为 signal
           */
          compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
      }
      return false;
    }
    
    

    独占模式获取锁流程

  1. tryAcquire() 尝试获取资源(具体实现类实现)

  2. 获取失败,通过addWaiter 方法把线程添加到队尾,并标记为独占模式(Node.EXCLUSIVE)

  3. 再次尝试获取资源(acquireQueued方法);前置节点是头结点,获取资源,成功当前节点为头结点,否则调用shouldParkAfterFailedAcquire

  4. 检测是否应该park;true,中断线程(parkAndCheckInterrupt),否则继续获取资源

  5. 检测是否应该park时,找到前置为signal的节点

  6. 最终tryAcquire 返回false,acquireQueued 返回true,中断当前线程

  • 独占模式释放锁

    public final boolean release(int arg) {
      // 尝试释放资源,具体实现类实现
      if (tryRelease(arg)) {
          Node h = head;
          if (h != null && h.waitStatus != 0)
              // 唤醒后继节点
              unparkSuccessor(h);
          return true;
      }
      return false;
    }
    protected int tryAcquireShared(int arg) {
          throw new UnsupportedOperationException();
      }
    
    

    // unparkSuccessor:获取下一个获取资源的线程,唤醒指定节点的后继节点

    private void unparkSuccessor(Node node) {
      // 获取首节点状态
      int ws = node.waitStatus;
      // 如果小于0 , 设置0,首节点准备释放同步状态,相当于清除此状态
      if (ws < 0)
          compareAndSetWaitStatus(node, ws, 0);
    
      // 找到后继节点,如果为空或大于0,找到队列中靠前有效节点变为首节点后继节点
      Node s = node.next;
      if (s == null || s.waitStatus > 0) {
          s = null;
          for (Node t = tail; t != null && t != node; t = t.prev)
              if (t.waitStatus <= 0)
                  s = t;
      }
      // 后继节点不为空,唤醒
      if (s != null)
          LockSupport.unpark(s.thread);
    }
    
    
    独占模式释放锁流程
  1. tryRelease 尝试释放资源(具体子类实现)

  2. 释放后,头结点不为空同时状态不为0,找到需要唤醒后继节点

  • 共享模式获取锁

    public final void acquireShared(int arg) {
      // 尝试获取锁,具体实现类实现;
      if (tryAcquireShared(arg) < 0)
          // 小于0,放入队列
          doAcquireShared(arg);
    }
    protected int tryAcquireShared(int arg) {
          throw new UnsupportedOperationException();
      }
    
    
    private void doAcquireShared(int arg) {
      // 线程加入到队尾,设置共享模式标记
      final Node node = addWaiter(Node.SHARED);
      boolean failed = true;
      try {
          boolean interrupted = false;
          for (;;) {
              final Node p = node.predecessor();
              if (p == head) {
                  int r = tryAcquireShared(arg);
                  if (r >= 0) {
                      // 设置头节点,唤醒后继节点获取资源
                      setHeadAndPropagate(node, r);
                      p.next = null; // help GC
                      if (interrupted)
                          selfInterrupt();
                      failed = false;
                      return;
                  }
              }
              if (shouldParkAfterFailedAcquire(p, node) &&
                  parkAndCheckInterrupt())
                  interrupted = true;
          }
      } finally {
          if (failed)
              cancelAcquire(node);
      }
    }
    
    

    // setHeadAndPropagate:

    private void setHeadAndPropagate(Node node, int propagate) 
    
    // Record old head for check below
    
    // 设置头结点
      // 有剩余资源  // waitStatus &lt; 0 标识有后继节点需要被唤醒  
      if (propagate > 0 || h == null || h.waitStatus < 0 ||      (h = head) == null || h.waitStatus < 0) {      
      Node s = node.next;      
      if (s == null || s.isShared())
        doReleaseShared();  
    

}

```

**共享模式获取锁流程**

  1. tryAcquireShared() 尝试获取资源(具体实现类实现)
  2. 获取失败,通过addWaiter 方法把线程添加到队尾,并标记为共享模式(Node.SHARED)
  3. 再次尝试获取资源(acquireQueued方法);前置节点是头结点,获取资源,资源大于等于0,调用setHeadAndPropagate,唤醒后继节点,否则调用shouldParkAfterFailedAcquire
  4. 检测是否应该park;true,中断线程(parkAndCheckInterrupt),否则继续获取资源
  5. 检测是否应该park时,找到前置为signal的节点
  • 共享模式释放锁

    public final boolean releaseShared(int arg) {
      if (tryReleaseShared(arg)) {
          doReleaseShared();
          return true;
      }
      return false;
    }
    
    

    // doReleaseShared

    private void doReleaseShared() {
          for (;;) {
              Node h = head;
              // 队列至少2个node
              if (h != null && h != tail) {
                  int ws = h.waitStatus;
                  if (ws == Node.SIGNAL) {
                      // 需要被唤醒
                      if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                          continue;            // loop to recheck cases
                    // 唤醒后继节点  
                    unparkSuccessor(h);
                  }
                // 已经被唤醒或者即将被唤醒
                  else if (ws == 0 &&
                           !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                      continue;                // loop on failed CAS
              }
              // head 变化的情况:线程被唤醒,重新设置head
              if (h == head)                   // loop if head changed
                break;
          }
      }
    
    

获取锁、释放锁基本代码已经梳理完,下面再看重要的内部类ConditionObject

ConditionObject

是AQS中的内部类,实现了Condition,Condition提供了两个功能:等待唤醒,与Object中等待唤醒类似;

ConditionObject就是条件队列,阻塞线程或唤醒线程

  • 使用

Condition condition = new ConditionObject();

condition.wait();// 等待

condition.wait();// 唤醒

示例

ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();

        Thread consumer = new Thread(() -> {
            String name = Thread.currentThread().getName();
            try {
                lock.lock();
                System.out.println(name + " 获取到锁 并进入条件等待");
                try {
                    condition.await();// 阻塞,直到被唤醒
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                System.out.println(name + " 被唤醒");
            }finally {
                lock.unlock();
                System.out.println(name + " 释放锁");
            }

        },"消费者");


        consumer.start();

        Thread produce = new Thread(() -> {
            String name = Thread.currentThread().getName();
            try {
                lock.lock();
                System.out.println(name + " 获取到锁 三秒后唤醒");
                try {
                    Thread.sleep(3000);
                    condition.signal();// 调用后,等待的线程会被唤醒
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                System.out.println(name + " 通知等待线程");
            }finally {
                lock.unlock();
                System.out.println(name + " 释放锁");
            }

        },"生产者");
        produce.start();

结果:
  消费者 获取到锁 并进入条件等待
  生产者 获取到锁 三秒后唤醒
  生产者 通知等待线程
  生产者 释放锁
  消费者 被唤醒
  消费者 释放锁
  • 原理

    内部维护等待队列,调用wait方法等待时,节点加入到等待队列,并释放锁,当调用signal唤醒时,获取等待队列第一个节点放到同步队列,等待被其他线程释放锁唤醒。

  • 源码

  • 类结构

      public class ConditionObject implements Condition, java.io.Serializable {
            /** First node of condition queue. */
            private transient Node firstWaiter; // 等待头节点
            /** Last node of condition queue. */
            private transient Node lastWaiter; // 等待尾节点
      }
    
    
  • 源码分析

    等待

    public final void await() throws InterruptedException {
                // 响应中断
                if (Thread.interrupted())
                    throw new InterruptedException();
                // 添加条件队列
                Node node = addConditionWaiter();
                // 释放锁
                int savedState = fullyRelease(node);
                int interruptMode = 0;
                // 如果节点不在同步队列,无竞争锁资格
                while (!isOnSyncQueue(node)) {
                  // 挂起
                    LockSupport.park(this); // 唤醒后,从下面开始执行
                  // 如果线程中断退出
                    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                        break;
                }
                // 循环结束说明:节点到阻塞队列或线程中断
                // 再次获取锁
                if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                    interruptMode = REINTERRUPT;
                // 线程被中断唤醒,断开node和后继
                if (node.nextWaiter != null) // clean up if cancelled
                    // 中断,移动到同步队列
                    unlinkCancelledWaiters();
                if (interruptMode != 0)
                    // 被中断唤醒的处理
                    reportInterruptAfterWait(interruptMode);
            }
    
    

    addConditionWaiter:添加到等待队列尾节点

    private Node addConditionWaiter() {
                Node t = lastWaiter;
                // If lastWaiter is cancelled, clean out.
                if (t != null && t.waitStatus != Node.CONDITION) {
                    unlinkCancelledWaiters();
                    t = lastWaiter;
                }
                Node node = new Node(Thread.currentThread(), Node.CONDITION);
                if (t == null)
                    firstWaiter = node;
                else
                    t.nextWaiter = node;
                lastWaiter = node;
                return node;
            }
    
    
    // 从头节点剔除非condition的节点
    private void unlinkCancelledWaiters() {
        Node t = firstWaiter;
        Node trail = null;
        while (t != null) {
            Node next = t.nextWaiter;
            if (t.waitStatus != Node.CONDITION) {
                t.nextWaiter = null;
                if (trail == null)
                    firstWaiter = next;
                else
                    trail.nextWaiter = next;
                if (next == null)
                    lastWaiter = trail;
            }
            else
                trail = t;
            t = next;
        }
    }
    
    

    // 加入等待队列,需要释放占有的独占锁fullRelease

    final int fullyRelease(Node node) {
            boolean failed = true;
            try {
                // 获取state值
                int savedState = getState();
                // 释放
                if (release(savedState)) {
                    failed = false;
                    return savedState;
                } else {
                    throw new IllegalMonitorStateException();
                }
            } finally {
                // 获取失败,节点取消,并抛异常
                if (failed)
                    node.waitStatus = Node.CANCELLED;
            }
        }
    
    

    注意:必须先获取锁,不然调用wait就会设置为取消

    isOnSyncQueue:是否在阻塞队列中(AQS),

    final boolean isOnSyncQueue(Node node) {
            // 节点在条件队列:同步队列使用pre和next,条件队列使用nextWaiter
            if (node.waitStatus == Node.CONDITION || node.prev == null)
                return false;
            // 同步队列
            if (node.next != null// If has successor, it must be on queue
                return true;
            // 状态是0.prev不为null
            return findNodeFromTail(node);
        }
    
    
    private boolean findNodeFromTail(Node node) {
        Node t = tail;
        for (;;) {
            if (t == node)
                return true;
            if (t == null)
                return false;
            t = t.prev;
        }
    }
    
    
private static final int REINTERRUPT =  1;// wait方法退出从新调用中断
     private static final int THROW_IE    = -1;//wait方法退出抛出异常
// 中断先于唤醒:THROW_IE   唤醒先于中断:REINTERRUPT
private int checkInterruptWhileWaiting(Node node) {
         return Thread.interrupted() ?
             (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
             0;
     }

final boolean transferAfterCancelledWait(Node node) {

     if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
       // 节点还在条件队列
         enq(node);// 放于同步队列
         return true;
     }
     // 在同步队列或入队
     while (!isOnSyncQueue(node))
         Thread.yield();
     return false;
 }
**唤醒**

```
public final void signal() {
            // 非独占模式,抛异常
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
              // 唤醒等待队列第一个节点
                doSignal(first);
        }

```
```
private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
              // 头节点移除
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }

```
```
final boolean transferForSignal(Node node) {
   // 节点值改为0
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

   // 添加到同步队列
    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}
```
  • 流程总结

    等待:必须先获取锁

    • 添加到等待队列
    • 释放占有的独占锁
    • 判断是否在同步队列,不在同步队列,挂起 要退出这个wait,有两个条件:
      • 逻辑到break,及线程被中断
      • 节点被移到同步队列,及另外线程调用signal或signalAll
    • 尝试再获取锁(再次说明调用wait必须获取锁) 唤醒
    • 获取第一个头节点
    • 移动节点到同步队列
    • 等待被唤醒
总结

本文从源码分析了AQS获取锁、释放锁原理,以及ConditionObject原理及流程;AQS作为一个同步器的框架,提供了模版方法,可以实现不同的同步机制;后续会分析通过AQS构造的同步器,如:ReentrantLock、Semaphore、ReentrantReadAndWriteLock等等。

转载自:https://juejin.cn/post/7369519814640304143
评论
请登录