【CountDownLatch】理解AQS后,第一个拿你练手
前言
CountDownLatch介绍
CountDownLatch典型的用法创建值为n的CountDownLatch。子线程完成执行时,都会在这个锁存器上调用countDown,锁计数器减1。父线程调用await,触发阻塞,直到锁计数器为0,唤醒父线程继续执行。
CountDownLatch UML图
源码分析
搞懂了AQS 其实也就那么回事,慢慢看。
类的内部类 Sync
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
// 继承AQS 构造方法是指定state的值
Sync(int count) {
setState(count);
}
// 当前state的值
int getCount() {
return getState();
}
// 尝试在共享状态获取锁
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
// 尝试在共享状态释放锁
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
//获取锁的状态标识
int c = getState();
if (c == 0)
//state=0 代表没有线程占用锁
return false;
int nextc = c-1;
// cas指定state的值
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
类的构造方法
CountDownLatch内部就是持有Sync,从而完成多线程同步工作。
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
核心方法 countDown
将锁的的计数器减1。
public void countDown() {
// 转发给内部类
sync.releaseShared(1);
}
sync的releaseShared()方法是继承自父类AQS的方法。判断是否释放成功,内部实现是无限循环,通过cas修改 state的值,只有在state被修改为0 才真正返回true。
public final boolean releaseShared(int arg) {
// sync重写的方法
if (tryReleaseShared(arg)) {
// 共享模式释放锁
doReleaseShared();
return true;
}
return false;
}
doReleaseShared()真正执行释放锁的操作,唤醒被阻塞的线程,释放的是哪些线程呢?没错就是调用await方法被阻塞的线程。
private void doReleaseShared() {
for (;;) {
Node h = head;
// 节点队列不为空
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
// 尝试将头结点的waitStatus变成0 修改头结点标识
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
// cas 失败重试
continue; // loop to recheck cases
//唤醒线程
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
核心方法 await
将会使当前线程等待,直到锁计数器为零。
public void await() throws InterruptedException {
// 转发给内部类sync
sync.acquireSharedInterruptibly(1);
}
CountDownLatch对象的await的调用会转发为对Sync的acquireSharedInterruptibly(从AQS继承的方法)方法的调用。
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 尝试获取锁
// tryAcquireShared内部判断state==0 如果等于0 返回1 不等0返回-1
// 等于0 代表所有子线程都调用了countDown方法 条件不成立 当前线程继续执行即可
// 不等于0 代表当前线程的state>0 还有线程没有调用countDown方法,需要阻塞当前线程
if (tryAcquireShared(arg) < 0)
//阻塞线程
doAcquireSharedInterruptibly(arg);
}
doAcquireSharedInterruptibly同样也是父类AQS的方法,这里你给我点个赞,我再说一遍。
r>0 具体场景:
1、父线程执行时 state=0, 直接执行。
2、当最后一个子线程调用countDown方法 state=0,父线程被唤醒,继续在循环里面执行 走到场景1。
r<0 具体场景:
1、state>0,还有子线程没有调用countDown方法,所以需要阻塞当前线程。
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// 将当前线程封装成一个节点,并放入等待队列
// 注意 这里放的是一个共享模式的节点
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
//循环
for (;;) {
// 获取当前结点的前驱结点
final Node p = node.predecessor();
if (p == head) {
// 如果前驱节点是头结点 尝试获取锁 同样是判断state==0
int r = tryAcquireShared(arg);
// 如果state=0 则r=1 获取锁成功 代表所有子线程执行完毕
if (r >= 0) {
//设置队列头,并传播。
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
// 退出循环 继续执行
return;
}
}
// 这个方法是设置前驱节点的 waitSatus
// 前驱节点的waitStatus标记后继节点的状态
if (shouldParkAfterFailedAcquire(p, node) &&
// 真正阻塞线程
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
setHeadAndPropagate方法的作用是设置头节点并进行传播(释放后继节点)。
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
// 重新设置头结点
setHead(node);
// CountDownLatch重写了tryAcquireShared 这里传进来的参数是1
// 原始头结点为空、原始头结点状态不为cancel、新头结点为空、新头结点状态不为cancel
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
// 共享模式释放锁
doReleaseShared();
}
}
shouldParkAfterFailedAcquire():CAS设置前驱节点状态。
parkAndCheckInterrupt():真正阻塞线程。
如果走到这两步,线程就被挂起,进入阻塞队列。
等待其他线程调用countDown方法,直至state变成0,调用doReleaseShared(),将阻塞队列的所有节点唤醒,线程再继续执行;
总结
countDownLatch是基于AQS的一个实现类,理解了AQS后,再看CountDownLatch就会很清晰。
await方法:判断state的值,如果state>0,就阻塞当前线程,放入CLH等待队列。
countDown方法:通过cas将state的值减1。当state的值减少到0。唤醒等待队列的全部线程。
如果你都看到这了,点个赞吧,欢迎留言沟通交流。
转载自:https://juejin.cn/post/7356143704699715611