likes
comments
collection
share

【CountDownLatch】理解AQS后,第一个拿你练手

作者站长头像
站长
· 阅读数 4

前言

CountDownLatch介绍

CountDownLatch典型的用法创建值为n的CountDownLatch。子线程完成执行时,都会在这个锁存器上调用countDown,锁计数器减1。父线程调用await,触发阻塞,直到锁计数器为0,唤醒父线程继续执行。

CountDownLatch UML图

【CountDownLatch】理解AQS后,第一个拿你练手

源码分析

搞懂了AQS 其实也就那么回事,慢慢看。

类的内部类 Sync

private static final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 4982264981922014374L;

    // 继承AQS 构造方法是指定state的值
    Sync(int count) {
        setState(count);
    }

    // 当前state的值
    int getCount() {
        return getState();
    }

    // 尝试在共享状态获取锁
    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }
    
    // 尝试在共享状态释放锁
    protected boolean tryReleaseShared(int releases) {
        // Decrement count; signal when transition to zero
        for (;;) {
            //获取锁的状态标识
            int c = getState();
            if (c == 0)
                //state=0 代表没有线程占用锁
                return false;
            int nextc = c-1;
            // cas指定state的值
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }
}

类的构造方法

CountDownLatch内部就是持有Sync,从而完成多线程同步工作。

public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}

核心方法 countDown

将锁的的计数器减1。

public void countDown() {
    // 转发给内部类
    sync.releaseShared(1);
}

sync的releaseShared()方法是继承自父类AQS的方法。判断是否释放成功,内部实现是无限循环,通过cas修改 state的值,只有在state被修改为0 才真正返回true。

public final boolean releaseShared(int arg) {
    // sync重写的方法 
    if (tryReleaseShared(arg)) {
        // 共享模式释放锁
        doReleaseShared();
        return true;
    }
    return false;
}

doReleaseShared()真正执行释放锁的操作,唤醒被阻塞的线程,释放的是哪些线程呢?没错就是调用await方法被阻塞的线程。

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        // 节点队列不为空
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                // 尝试将头结点的waitStatus变成0 修改头结点标识
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    // cas 失败重试
                    continue;            // loop to recheck cases
                //唤醒线程
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

核心方法 await

将会使当前线程等待,直到锁计数器为零。

public void await() throws InterruptedException {
    // 转发给内部类sync
    sync.acquireSharedInterruptibly(1);
}

CountDownLatch对象的await的调用会转发为对Sync的acquireSharedInterruptibly(从AQS继承的方法)方法的调用。

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    // 尝试获取锁 
    // tryAcquireShared内部判断state==0 如果等于0 返回1 不等0返回-1
    // 等于0 代表所有子线程都调用了countDown方法 条件不成立 当前线程继续执行即可
    // 不等于0 代表当前线程的state>0 还有线程没有调用countDown方法,需要阻塞当前线程
    if (tryAcquireShared(arg) < 0)
        //阻塞线程
        doAcquireSharedInterruptibly(arg);
}

doAcquireSharedInterruptibly同样也是父类AQS的方法,这里你给我点个赞,我再说一遍。

r>0 具体场景:

1、父线程执行时 state=0, 直接执行。

2、当最后一个子线程调用countDown方法 state=0,父线程被唤醒,继续在循环里面执行 走到场景1。

r<0 具体场景:

1、state>0,还有子线程没有调用countDown方法,所以需要阻塞当前线程。

private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    // 将当前线程封装成一个节点,并放入等待队列
    // 注意 这里放的是一个共享模式的节点
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        //循环
        for (;;) {
            // 获取当前结点的前驱结点
            final Node p = node.predecessor();
            if (p == head) {
                // 如果前驱节点是头结点 尝试获取锁 同样是判断state==0
                int r = tryAcquireShared(arg);
                // 如果state=0 则r=1 获取锁成功 代表所有子线程执行完毕
                if (r >= 0) {
                    //设置队列头,并传播。
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    // 退出循环 继续执行
                    return;
                }
            }
            // 这个方法是设置前驱节点的 waitSatus 
            // 前驱节点的waitStatus标记后继节点的状态
            if (shouldParkAfterFailedAcquire(p, node) &&
                // 真正阻塞线程
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

setHeadAndPropagate方法的作用是设置头节点并进行传播(释放后继节点)。

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    // 重新设置头结点
    setHead(node);

    // CountDownLatch重写了tryAcquireShared 这里传进来的参数是1
    // 原始头结点为空、原始头结点状态不为cancel、新头结点为空、新头结点状态不为cancel
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            // 共享模式释放锁
            doReleaseShared();
    }
}

shouldParkAfterFailedAcquire():CAS设置前驱节点状态。

parkAndCheckInterrupt():真正阻塞线程。

如果走到这两步,线程就被挂起,进入阻塞队列。

等待其他线程调用countDown方法,直至state变成0,调用doReleaseShared(),将阻塞队列的所有节点唤醒,线程再继续执行;

总结

countDownLatch是基于AQS的一个实现类,理解了AQS后,再看CountDownLatch就会很清晰。

await方法:判断state的值,如果state>0,就阻塞当前线程,放入CLH等待队列。

countDown方法:通过cas将state的值减1。当state的值减少到0。唤醒等待队列的全部线程。

如果你都看到这了,点个赞吧,欢迎留言沟通交流。