likes
comments
collection
share

源码剖析之CountDownLatch内部是实现原理

作者站长头像
站长
· 阅读数 50

简介

CountDownLatch是Java并发编程中的一个 同步辅助工具 允许一个或多个线程等待在其他线程中执行的一组操作完成。。 用来协调不同线程程之间的任务同步。一般用于将一个复杂任务按照不同的执行顺序拆分成多个相互独立的子任务执行,其中某个线程需要等待其他线程执行完成后才能执行。

源码剖析之CountDownLatch内部是实现原理

使用步骤

1 .定义两个CountDownLatch对象,分别作用于不同的主线程和子线程。

源码剖析之CountDownLatch内部是实现原理

2.创建多个子线程任务并执行。

源码剖析之CountDownLatch内部是实现原理

3.分别将两个CountDownLatch对象的实例传入子线程任务中。

源码剖析之CountDownLatch内部是实现原理

4.主线程中先后调用主线程任务执行。

源码剖析之CountDownLatch内部是实现原理

源码剖析之CountDownLatch内部是实现原理

5.根据任务执行,打印结果。

源码剖析之CountDownLatch内部是实现原理

6.执行过程详解

先开启子线程执行任务,但是在子线程Work的run方法中startSignal.await();挂起等待。主线程执行完第一个doSomethingElse()任务,然后调用startSignal.countDown(),释放同步锁,此时子线程得到了执行的机会。主线程调用doneSignal.await()方法阻塞当前线程,直到所有子线程执行完成。调用downSignal.countDown(),将同步锁计数减到0,主线程恢复执行。执行最后一个doSomethingElse()任务。


源码分析

实现原理

CountDownLatch内部定义了一个静态类Sync继承于队列同步器AbstractQueuedSynchronizer(简称AQS),AQS是用来构建同步锁的基本框架,它内部使用一个Int类型的state表示同步状态、,通过内置的FIFO队列来完成资源获取的线程的排队工作。

源码剖析之CountDownLatch内部是实现原理

设计模式

AQS的设计基于模板方法的设计模式。模板方法是一种行为设计模式,在父类中定义了一些算法框架,子类在实现时不能修改算法结构,只能重写特定的步骤。

核心方法1-countDown方法

    public void countDown() {
        sync.releaseShared(1);
    }

作用:减少锁存器的计数,如果计数为0,则释放所有等待的线程,如果当前计数大于0,则依次递减。

CountDown内部调用sync的releaseShared方法进行锁的释放。

  public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

sync.releaseShared方法其实是AQS的方法,在AQS的releaseShared方法中,tryReleaseShared作为一个抽象方法有子类具体实现,然后调用 doReleaseShared()。

由于Sync继承与AQS,所有内部实现了tryReleaseShared方法。

protected boolean tryReleaseShared(int releases) {  
    //通过循环遍历,释放锁资源
    for (;;) {
        int c = getState();
         if (c == 0)
            return false;
         int nextc = c-1;
        if (compareAndSetState(c, nextc))
         return nextc == 0;
            }
        }

Sync内部通过循环遍历,对锁的同步状态进行比较。

doReleaseShared()的具体实现在AQS中。

 private void doReleaseShared() {
       
         //循环
        for (;;) {
            Node h = head;
            if (h != null && h != tail) { //如果队链表不为null
                int ws = h.waitStatus;//获取节点的等待状态
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            //CAS操作失败,继续
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                //当前状态为0,且CAS失败,则继续
            }
            if (h == head)                   //如果只剩头节点,则跳出循环
                break;
        }
    }

总结await的执行过程如下:

源码剖析之CountDownLatch内部是实现原理

核心方法2-await方法

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

await方法调用sync的acquireSharedInterruptibly(1)。acquireSharedInterruptibly方法在AQS内部。

public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        //如果线程中断
        if (Thread.interrupted())
            //抛出异常
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)//状态为-1,
            //进入等待队列
            doAcquireSharedInterruptibly(arg);
    }

当前线程如果没有中断,则通过tryAcquireShared判断状态。

protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1; 
}

当前线程未获取到锁资源,进入等待队列,执行doAcquireSharedInterruptibly方法。

 private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
        final long deadline = System.nanoTime() + nanosTimeout;
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) { //如果是头节点
                    int r = tryAcquireShared(arg); //获取状态
                    if (r >= 0) {
                        setHeadAndPropagate(node, r); //设置头节点状态
                        p.next = null; // help GC
                        failed = false;
                        return true;
                    }
                }
                nanosTimeout = deadline - System.nanoTime();
                if (nanosTimeout <= 0L)
                    return false;
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                if (Thread.interrupted()) //获取失败,中断线程
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

在AQS中调用 setHeadAndPropagate(node, r),设置头节点状态。

 private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; 
        setHead(node);//设置头节点
    
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;//获取下一个节点
            if (s == null || s.isShared()) //如果后一个节点为null,或为共享状态,则释放
                doReleaseShared();
        }
    }

如果当前只有一个头节点,则执行AQS中的doReleaseShared

 private void doReleaseShared() {
       
         //循环
        for (;;) {
            Node h = head;
            if (h != null && h != tail) { //如果队链表不为null
                int ws = h.waitStatus;//获取节点的等待状态
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            //CAS操作失败,继续
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                //当前状态为0,且CAS失败,则继续
            }
            if (h == head)                   //如果只剩头节点,则跳出循环
                break;
        }
    }

总结await的执行过程如下:

源码剖析之CountDownLatch内部是实现原理

总结

转载自:https://juejin.cn/post/7212536164056678456
评论
请登录