likes
comments
collection
share

【Java计时器】CountDownLatch

作者站长头像
站长
· 阅读数 5

【Java计时器】CountDownLatch


😃 AQS(一) AQS(二) AQS(三) 😮

前言

CountDownLatch是基于AQS进行实现的一个计时器,如果阅读了上面AQS系列的文章,相信对CountDownLatch里头的区区300多行代码,会有一个更加深刻的理解。


如何使用

CountDownLatch计时器的作用可以让多个线程先各自完成自己的任务再继续执行接下来的工作。实现方式如下:

这里先模拟构造了一个查询线程,其中睡个2s钟模拟操作,在这之后,计数器会进行减一的操作。

class InquireThread implements Runnable {
    private CountDownLatch countDownLatch;
    public InquireThread(CountDownLatch countDownLatch) {
        this.countDownLatch = countDownLatch;
    }
    @Override
    public void run() {
        System.out.println("查询开始...当前线程号: " + Thread.currentThread().getId());
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        countDownLatch.countDown(); // 计数器减一
    }
}

在CountDownLatchDemo 这里,我的计数器countDownLatch 初始化为5,意味着我可以让5条线程共享锁。然后,创建线程的时候,把它作为参数进行传递。然后调用await进行等待,待计数器置为0,执行compute方法。

/***
 *
 * @Author:fsn
 * @Date: 2020/4/2 14:24
 * @Description 倒计时器
 * 可以联系join() 一起理解, CountDownLatch可以使当前线程,
 * 不用等到子线程全部执行完才开始继续执行, 只要子线程执行的
 * 过程中检测到计时器为0, 就会唤醒当前线程
 */


public class CountDownLatchDemo {
    private CountDownLatch countDownLatch = new CountDownLatch(5);
    private int count = 6;
    
    void test() throws InterruptedException {
        Thread thread = null;
        long result;
        while ((count--) > 0) {
            thread= new Thread(new InquireThread(countDownLatch));
            thread.start();
        }
        countDownLatch.await();
        compute();
    }

    private void compute() {
        System.out.println("任务完成");
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatchDemo demo = new CountDownLatchDemo();
        demo.test();
    }
}

分析

构造方法

我们可以从构造方法入手,CountDownLatch提供的构造方法需要一个int参数,该参数用于初始化Syn内部类,该类也是继承于AQS。

 public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

Sync内部类的构造方法如下所示,它会调用父类AQS的setState,初始化同步状态。

Sync(int count) {
      setState(count);
}

/**
  * Sets the value of synchronization state.
  * This operation has memory semantics of a {@code volatile} write.
  * @param newState the new state value
  */
protected final void setState(int newState) {
     state = newState;
}

await

在初始化计时器后,我们会调用 countDownLatch.await()进行阻塞。该方法如下,如果阻塞的过程中,某些线程发生中断,它会抛出中断异常。另外,如果计时器迟迟没有归0,又没有线程中断,它会一直在这里等到天荒地老(没有设置超时机制的话)。

可以尝试将上述例子中,countDownLatch初始化的线程数调大或者创建线程数小于countDownLatch初始化的线程数。

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

acquireSharedInterruptibly

接着就是acquireSharedInterruptibly方法,该放方法先进行线程中断的判断,然后通过tryAcquireShared方法获取共享锁。

tryAcquireShared和我们之前分析的 tryAcquire方法一样,AQS都有没有提供具体实现,而是由子类去做具体的操作。

 public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
 }

所以tryAcquireShared的具体操作是在CountDownLatch类中,它会获取同步状态,并判断是否为0。为0代表获取成功,则返回1,反之-1。

貌似有个问题。。这个acquires似乎这里没有用到啊。。那还传进来做啥?小伙伴可以先想想,我把自己的想法放在结束语里边。看看能否达成共识!

  protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
  }

doAcquireSharedInterruptibly

当tryAcquireShared 返回-1的时候,getState()返回的同步状态不为0,意味着还有线程占着茅坑,获取锁失败,所以得进入同步队列中。即接着就是doAcquireSharedInterruptibly方法。是不是觉得代码和前面分析的acquireQueued颇有相似之处。

不同的是,这里tryAcquireShared尝试获取的是共享锁,当同步状态不为0,则会尝试挂起。另外,setHeadAndPropagate也是不一样的地方,这里不仅会设置头部还会向后进行传播。

拓展:doAcquireSharedInterruptibly方法和doAcquireShared类似,只不过,该方法才遇到中断后会抛出中断异常。

private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                // 获取节点的上一个节点
                final Node p = node.predecessor();
                if (p == head) {
                    // 尝试获取共享锁
                    int r = tryAcquireShared(arg);
                    // r = (getState() == 0) ? 1 : -1;
                    // 如果同步状态等于0,返回1,反之返回-1
                    if (r >= 0) {
                        // 设置头部和传播
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                // 判断获取锁失败之后是否可以进入等待唤醒状态
                // 该方法保证当前线程的前驱节点的waitStatus属性值为SIGNAL,
                // 从而保证了自己挂起后,前驱节点会负责在合适的时候唤醒自己。
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

setHeadAndPropagate

当进入该方法时,意味着获取共享锁成功。此时同步状态为0了,此方法在共享模式下,如果propagate > 0或 传播(propagate )状态已设置,会检查后继者是否正在等待。

关于传播状态,我们之前已经说过,这里回忆一下吧:

  1. CANCELLED :1 表明一个等待的线程被取消了
  2. SIGNAL : -1 表明一个等待线程的下一个线程需要被唤醒
  3. CONDITION: -2 当前线程正在等待中
  4. PROPAGATE :-3 下一次申请共享锁方法应该被无条件的传播
  5. 0:初始值
 /**
     * Sets head of queue, and checks if successor may be waiting
     * in shared mode, if so propagating if either propagate > 0 or
     * PROPAGATE status was set.
     *
     * @param node the node
     * @param propagate the return value from a tryAcquireShared
     */
    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
        /*
         * Try to signal next queued node if:
         *   Propagation was indicated by caller,
         *     or was recorded (as h.waitStatus either before
         *     or after setHead) by a previous operation
         *     (note: this uses sign-check of waitStatus because
         *      PROPAGATE status may transition to SIGNAL.)
         * and
         *   The next node is waiting in shared mode,
         *     or we don't know, because it appears null
         *
         * The conservatism in both of these checks may cause
         * unnecessary wake-ups, but only when there are multiple
         * racing acquires/releases, so most need signals now or soon
         * anyway.
         */
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

setHead(node)之前也做了分析了,它将获取锁的节点设置为头部节点,将线程和prev 节点置为null。

private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }

然后是 Node s = node.next,该行代码会寻找当前节点的下一个节点,如果下一个节点为null或者是共享节点(通过如下isShared方法进行判断)则进入doReleaseShared方法。

 /**
   * Returns true if node is waiting in shared mode.
  */
        final boolean isShared() {
            return nextWaiter == SHARED;
        }

doReleaseShared

该方法做了个啥事情呢?通过注释,我们可以知道它的工作是释放共享锁,并确保能够唤醒继任者。

首先,获取头结点的状态,并判断是否为SIGNALif (ws == Node.SIGNAL),如果是的话CAS操作修改为0,如果成功唤醒继任者,不成功继续自旋。如果状态已经是0了,则尝试设置为PROPAGATE状态(下一次申请共享锁方法应该被无条件的传播 )

最后,会判断头部节点是否被修改了,若被修改了说明有其他获取了锁,为了保证正确的传播,继续循环。反之,跳出doReleaseShared方法。最后在doAcquireSharedInterruptibly方法处return。

 /**
     * Release action for shared mode -- signals successor and ensures
     * propagation. (Note: For exclusive mode, release just amounts
     * to calling unparkSuccessor of head if it needs signal.)
     */
    private void doReleaseShared() {
        /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases.  This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
         */
        for (;;) {
            Node h = head;
            // 要唤醒继任者(下一个)节点,所以h不能为null,也不能等于尾部节点
            if (h != null && h != tail) {
                // 如果节点为SIGNAL,会通过CAS将状态置为0,然后唤醒继任者
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                // 如果节点的状态为0,则CAS尝试设置为PROPAGATE
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            // 多线程环境下, 头结点可能被修改, 此时说明其他节点获取到了锁
            if (h == head)                   // loop if head changed
                break;
        }
    }

countDown

CountDownLatch里头的await方法分析完了还有一件事,就是我们的计时器进行同步状态的减操作,不然,同步状态一直不会为0,await不设置超时则会一直卡壳。所以接下来分析的就是CountDownLatch操作。

如下所示,每次使用countDown,都会进行减一,而实际操作在于releaseShared

public void countDown() {
        sync.releaseShared(1);
    }

releaseShared

其中,tryReleaseShared方法可以和上文的tryAcquireShared方法对比理解。如果减一成功,进行释放共享锁的操作。

而关于这个doReleaseShared方法,我们已经在面分析过了,这也是为什么该方法设置state的时候需要CAS的原因之一,因为该方法有两个入口,会产生线程不安全。

 /**
     * Releases in shared mode.  Implemented by unblocking one or more
     * threads if {@link #tryReleaseShared} returns true.
     *
     * @param arg the release argument.  This value is conveyed to
     *        {@link #tryReleaseShared} but is otherwise uninterpreted
     *        and can represent anything you like.
     * @return the value returned from {@link #tryReleaseShared}
     */
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

tryReleaseShared

关于tryReleaseShared方法,也比较简单,获取同步状态,判断是否为0,进行减一操作,然后进行CAS交换。

protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }

插曲:独占&共享

在前面的文章分析中,我们对 addWaiter做过分析。我们从acquire方法进行引入,然后分析的是独占锁的申请方式。而这里addWaiter(Node.SHARED),表示的创建一个申请共享锁的节点,并进入同步队列。

public final void acquire(int arg) {
        // 如果申请锁不成功, 则放入阻塞队列,这里还是会调用tryAcquire()方法
        // 该方法也是我们自定义实现的
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

关于共享和独占的区别之一在于,Node.EXCLUSIVE是一个null节点,而Node.SHARED则是一个实例化过的节点。

【Java计时器】CountDownLatch

它们进入addWaiter之后,会重新构造一个节点,其中mode对应的就是share或者exclusive。从构造方法这里,我们可以看到对于独占锁来说,nextWaiter 总是null的。而共享锁则是一个实例化过的节点。

Node(Thread thread, Node mode) {     // Used by addWaiter
      this.nextWaiter = mode;
      this.thread = thread;
 }

总结

本文分析了CountDownLatch的原理,并通过await方法和countDown方法进行切入点,分析了AQS的共享锁的获取及传播操作。

结束语

文中提到了acquires参数在tryAcquireShared都没有用到为什么还要传进来呢?其实我觉的是 Doug Lea的设计习惯,因为这个参数对于CountDownLatch虽然没有用,但不代表对重入锁没有用,而AQS作为它们两个的父类(说的不准确,应该是它们内部类Syn的父类),也可以对tryAcquireShared方法进行重载,但专门为CountDownLatch类重载一个方法又没太多必要,所以干脆就这么用了吧。

protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
  }