最好懂的AQS核心源码
最好懂的AQS核心源码
相必大家应该都在各个八股文里听过AQS吧。只要你简历里写上你会多线程,这个知识点几乎是必问的。
在阅读之前,读者希望你们对ReentrantLock和CountDownLounch()两个类有过使用经历。
如果中途有什么名词不能理解,可以去文章底部答疑区域寻找答案哟~
什么是AQS?
AQS的全称就是 AbstractQueuedSynchronizer,翻译过来就是抽象队列同步器,这是JUC包中的一个类。
这个类很关键,很多并发工具类都是基于这个类实现的,他也是JUC同步框架的基石。
实现类需要去继承该类,并重写指定方法就可以实现一套线程同步机制。
我们经常用到的ReentrantLock和CountDownLatch等也都是基于AQS实现的。
从生活角度理解AQS
AQS的设计就和取钱的场景很类似,1号,2号,3号选手都在银行取钱,柜台只有一个,因此同一时间只有一个人可以取钱。1号在取钱的时候,2号,3号都会在等待区等待。1号取完后,会通知2号,2号再去取。
AQS框架
AQS核心
AQS的核心只有两个东西:
- 一个用volatile修饰的state变量(最后有讲解)
- 一个CLH(三个人名缩写)的双向队列。
每一个线程获取锁的时候,就会试图去修改state变量,如果修改成功就能获取锁,如果失败,就自动加入双向队列的末尾,等待持有锁的线程对其进行释放。
这里就可以理解成银行取钱,只有一张票,获得票的人可以取钱,取完后,把这个票给另一个在等待的人,另一个人就能取钱了
其中,AQS对state的访问设置了三种方法:
- getState()
- setState()
- compareAndSetState()
其中compareAndSetState 就是 CAS 法的思想体现。它结合了CAS自旋 volatile 变量(最后有讲解)。
两种资源共享方式
AQS定义了两种资源共享方式:Exclusive(独占的,仅一个线程可以执行,如ReentrantLock)和Share(共享的,如CountDownLatch)
不同自定义同步器争用共享资源的方式不同。在实现自定义同步器的时候,只需要实现state的释放和获取的方式即可。
像ReentrantLock,CountDownLatch都是jdk里实现了以下方法的自定义同步器
自定义同步器需要实现以下几种方法:
独占式(比如ReentrantLock)
- tryAcquire(int) : 独占的资源共享方式。尝试获取资源,成功返回true,失败返回false
- tryRelease(int) : 独占的资源共享方式。尝试释放资源,成功返回true,失败返回false
共享式(比如CountDownLatch)
- tryAcquireShared(int):共享的资源共享方式。尝试获取资源,负数表示失败,0表示成功,但没有剩余资源,正数表示成功,有剩余资源。
- tryReleaseShared(int):共享的资源共享方式。尝试释放资源。
ReentrantLock为例(独占)
以ReentrantLock为例,state初始化是0,表示未锁定状态。A线程调用ReentrantLock的lock()方法时,同时会调用tryAcquire()方法,抢占锁,并且将state+1。此后所有的线程去执行tryRelease方法都会失败,直到A线程调用ReentrantLock的unLock()方法,也就是调用ryRelease(int)方法,state会变成0,之后其他线程才能获得到锁。当然A线程在锁的时候,也是可以重复获取锁的,每重复获取一次,state+1,在释放的时候,获得多少次就要减多少次。
CountDownLatch为例(共享)
以CountDownLatch为例,当你在构造器里面传入一个参数n的时候,state也会被初始化成n。每个子线程countDown()一次,state就会通过CAS减1。当state=0的时候,就会把主调用线程unpark()掉,然后主调用线程也会从await()处返回,继续后续动作。
源码布局
下面我们要说的源码,以ReentrantLock为例,它的核心毫无疑问是lock和unLock方法,而这两个方法的底层则是AQS中的acquire和release方法。
本文只说明讲解独占式的核心源码。
源码细究(acquire)
AQS最核心的地方就是 acquire(int) 方法了
public final void acquire(int){
if(!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE,arg))){
selfInterrupt();
}
}
同时,它也是ReentrantLock 的lock()方法的底层
我们来对这个源码进行一个解读:
- tryAcquire()方法尝试直接获取资源,如果成功就返回。
- addWaiter()方法则是把该线程加入等待队列的队尾,并且标记为独占模式
- acquireQueued()让线程阻塞在等待队列里面获取资源,直到获取到资源才返回,如果在等待的过程被中断,就返回false,否则返回true。
- 如果获取资源失败了或者等待过程被中断过,则需要自我中断一下。
1.tryAcquire()
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
我们不难发现,这个tryAcquire它啥都没干!
我们上文提到过,AQS只是一个框架,需要实现它的类去实现对应的关键方法,如果没有实现就会抛出异常,这里只是定义了一个接口。
具体资源的获取逻辑要由自定义的同步器去实现。至于是否能重入,就要看自定义同步器的的设计了。
2.addWaiter(Node mode)
这一步上述提到,主要就是对当前的节点进行初始化,并加入队列中等待。运用到了CAS的思想。
private Node addWaiter(Node mode) {
//以给定模式构造结点。mode有两种:EXCLUSIVE(独占)和SHARED(共享)
Node node = new Node(Thread.currentThread(), mode);
//尝试快速方式直接放到队尾。
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//上一步失败则通过enq入队。
enq(node);
return node;
}
private Node enq(final Node node) {
//CAS"自旋",直到成功加入队尾
for (;;) {
Node t = tail;
if (t == null) { // 队列为空,创建一个空的标志结点作为head结点,并将tail也指向它。
if (compareAndSetHead(new Node()))
tail = head;
} else {//正常流程,放入队尾
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
不懂CAS自旋的读者,请看最下面的详解
3.acquireQueued(Node,int)
acquiredQueued中传入了刚刚被加入等待队列末尾的线程节点。线程进入这一步后,要做的事情也就是进入等待区休息,直到别的线程释放资源,将自己唤醒。
final boolean acquireQueued(final Node node,int arg){
//标记是否成功拿到资源
boolean failed = true;
try{
//标记是否在等待过程被中断过
boolean interrupted = false;
//自旋
for(;;){
//拿到当前节点的前驱
final Node p = node.processor();
//如果这个前驱是head节点,说明当前节点是第二位。说明head节点释放完资源唤醒当前节点
if(p == head && tryAcquire(arg)){
//如果尝试获取成功了,就将当前节点设置为头节点
setHead(node);
//将头节点的next指针设置为空,这样就能被垃圾回收了
p.next = null;
//成功获取到了资源
failed = false;
return interrupted;
}
//如果自己可以休息了,就通过park()进入waiting状态,直到被unpark()。如果不可中断的情况下被中断了,那么会从park()中醒过来,发现拿不到资源,从而继续进入park()等待。
if(shouldParkAfterFailedAcquire(p,node) && parkAndCheckInterrupt()){
//等待过程被中断了,只要有一次,就标记为true
interrupt = true;
}
}
}finally{
//如果等待过程中timout,没有成功获取资源,就取消等待。
if(falied){
cancelAcquire(node);
}
}
}
接下来我们看一下这个过程中调用的两个函数shouldParkAfterFailedAcquire和parkAndCheckInterrupt吧
3.1.shouldParkAfterFailedAcquire(Node pred, Node node)
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;//拿到前驱的状态
if (ws == Node.SIGNAL)
//如果已经告诉前驱拿完号后通知自己一下,那就可以安心休息了
return true;
if (ws > 0) {
/*
* 如果前驱放弃了,那就一直往前找,直到找到最近一个正常等待的状态,并排在它的后边。
* 注意:那些放弃的结点,由于被自己“加塞”到它们前边,它们相当于形成一个无引用链,稍后就会被保安大叔赶走了(GC回收)!
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//如果前驱正常,那就把前驱的状态设置成SIGNAL,告诉它拿完号后通知自己一下。有可能失败,人家说不定刚刚释放完呢!
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
负值表示结点处于有效等待状态,而正值表示结点已被取消。所以源码中很多地方用>0、<0来判断结点的状态是否正常
总的来说,它干的事情就是:如果前驱节点状态不是SIGNAL,就去等待队列前面慢慢找,直到找到一个节点的waitStatus > 0的地方安家
3.2.parkAndCheckInterrupt()
如果线程找到地方安家了,就安心去休息了。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);//调用park()使线程进入waiting状态
return Thread.interrupted();//如果被唤醒,查看自己是不是被中断的。
}
4.小结
看完这些后,我们再重新看这个核心源码
public final void acquire(int){
if(!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE,arg))){
selfInterrupt();
}
}
我们总结下流程:
- 自定义同步器自定义的tryAcquire()方法尝试获取资源,如果成功直接返回
- 如果没有成功就将线程加入等待队列尾部,标记为独占模式(addWaiter(Node.EXCLUSIVE,arg))
- acquireQueued()方法会让线程在等待队列里休息,有机会被unpark()唤醒尝试获取资源,如果等待过程被中断,返回true,否则返回false
- 如果线程被中断,就会获取资源后自我中断
这也是ReentrantLock.lock()的流程,其它的lock()源码也和这个大同小异。
源码细究(Release)
我们刚刚说了acquire操作,下面我们来讲讲它的反向操作release操作吧
这也是独占模式下,释放共享资源(state)的顶层入口。他会唤醒其它线程去获取资源,这也正是unlock()的语义
1.tryRelease(int)
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;//找到头结点
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);//唤醒等待队列里的下一个线程
return true;
}
return false;
}
当然,这个tryRelease()方法也是要实现这个框架的类去自己实现的。
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
2.unparkSuccessor(Node)
private void unparkSuccessor(Node node) {
//这里,node一般为当前线程所在的结点。
int ws = node.waitStatus;
if (ws < 0)//置零当前线程所在的结点状态,允许失败。
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;//找到下一个需要唤醒的结点s
if (s == null || s.waitStatus > 0) {//如果为空或已取消
s = null;
for (Node t = tail; t != null && t != node; t = t.prev) // 从后向前找。
if (t.waitStatus <= 0)//从这里可以看出,<=0的结点,都是还有效的结点。
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);//唤醒
}
总的来说,这个函数用unpark()方法去唤醒等待队列里面没有放弃的线程
答疑
什么是volatile?
我们刚刚提到,这个state字段是用volatile修饰的。
Java内存模型里规定了,所有的变量都存储在主内存中,每条线程都有自己的工作内存,线程的工作内存中保留了被该线程用到的变量(这些变量拷贝自主存),线程间变量的传递需要由主存传递。
这就好像,你和你的小伙伴被关在了不同的小黑屋里,这些小黑屋外面有个人看守,你和其它小伙伴需要通过这个看守人来通信。
笔者觉得这个思想也是借鉴了操作系统里线程通信的理念。
回到正题,什么是volatile?
举个例子:现在有两个线程。
第一个线程执行两个步骤:
int i = 0;
i = 10
第二个线程执行
j=i
由上述讲到的java内存模型,我们得知:第一个线程从主存中取得i的变量,写入工作内存,将 i
写成10.
然而,还没等第一个线程把 i
写回主存,主存中的 i
仍然是10,第二个线程就把这个i
拿走了,那么最后结果 y = i = 0
这个过程,不难发现,j
对于i
的修改是不可见的。为了确保变量的可见性,我们需要volatile
volatile所修饰的变量能保证,所修改的变量回立刻更新到主存,感兴趣的同学可以去深入了解。
什么是CAS自旋 volatile 变量?
拿JUC包里的AtomicInteger类里面的常用的getAndIncrement方法举例:
public final int getAndIncrement() {
for (;;) {
int current = get(); // 取得AtomicInteger里存储的数值
int next = current + 1; // 加1
if (compareAndSet(current, next)) // 调用compareAndSet执行原子更新操作
return current;
}
}
这里我们看到了compareAndSet方法(CAS)。
我们点进这个方法的源码,发现,compareAndSet()方法中调用的是sun.misc.Unsafe.compareAndSwapInt(Object obj, long valueOffset, int expect, int update)方法,compareAndSwapInt 是用native修饰的,说明它是基于的是CPU 的 原语,性能好
它使用的是基于冲突检测的乐观并发策略。我们都知道,乐观并发策略在线程数大的时候失败的概率也会上升。
什么是park()和unpark()?
park()和unPark()是LockSupport类里面的方法
//暂停当前线程
LockSupport.park();
//恢复某个线程
LockSupport.unpark(暂停线程对象);
举个例子就懂啦
Thread thread = new Thread(() -> {
System.out.println("start.....");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("park....");
LockSupport.park();
System.out.println("resume.....");
});
thread.start();
Thread.sleep(2000);
System.out.println("unpark....");
LockSupport.unpark(thread);
这段代码先开启一个线程,然后休眠,让主线程先暂停,之后休眠后,再让开启的线程暂停。
运行结果:
start.....
park....
unpark....
resume.....
参考文章:
转载自:https://juejin.cn/post/7229965619570262075