likes
comments
collection
share

并发编程-常见并发工具CountDownLatch、Semaphore、Atomic

作者站长头像
站长
· 阅读数 14

线程的基础和使用 Synchronized原理分析 并发编程-探索可见性背后的本质以及vloatile原理 并发编程-死锁/ThreadLocal 并发编程-ReentrantLook底层设计 并发编程-Condition底层设计 并发编程-常见并发工具BlockingQueue的使用及原理解析 发编程-深入分析ConcurrentHashMap原理

CountDownLatch

CountDownLatch 是一个同步工具类,可以用于在多个线程之间依次执行某些操作。它提供了一种控制多个线程执行顺序的方式,并且可以在所有线程都完成特定操作后继续执行下一个操作

常用方法

  1. await():用于等待所有线程执行完毕。在使用 countDown() 方法减少等待线程数后,剩余的线程将会在await()方法调用后开始执行。如果在等待过程中发生异常,则会抛出 CountDownLatch.await() 方法中的异常。
  2. countDown():用于减少等待线程数。当 countDown() 方法被调用时,剩余的线程将会开始执行,而不用等待所有线程执行完毕。
  3. await(long time, TimeUnit unit):用于等待指定时间。当指定的时间到达时,该方法将自动抛出异常。

使用示例

public class CountDownLatchDemo {
    public static void main(String[] args) {
        CountDownLatch latch = new CountDownLatch(2);
        Thread t1 = new Thread(() -> {
            System.out.println("Thread-1 starts");
            latch.countDown();
        });
        Thread t3 = new Thread(() -> {
            System.out.println("Thread-3 starts");
            latch.countDown();
        });
        Thread t2 = new Thread(() -> {
            System.out.println("Thread-2 starts");
            try {
                latch.await();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println("Thread-2 completes");
        });
        t1.start();
        t2.start();
        t3.start();

    }
}

执行结果: 并发编程-常见并发工具CountDownLatch、Semaphore、Atomic

源码分析

实例化源码

public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}

并发编程-常见并发工具CountDownLatch、Semaphore、Atomic 发现底层实现是和 ReentrantLock都是基于AQS实现的,而对应的 count就是 AQS中的 state

await源码分析

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}
protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

这个方法就是判断当前的 state的值如果是等于0的其实就不需要操作什么直接让线程继续往下走就行了,否则这里会返回-1, 就会走到 doAcquireSharedInterruptibly这个方法中

private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

addWaiter源码这里就不细说了(想知道的话可以看 ReentrantLock的那篇文章),主要作用就是新建一个node添加到AQS链表中但是此时 nodenextWaiter赋值了Node.SHARED 继续往下看

boolean failed = true;
try {
    for (;;) {
        //获取node的上一个节点
        final Node p = node.predecessor();
        if (p == head) {
        //如果此时的node就在队列的第一个,那就继续尝试获取锁如果获取了就将当前的结点冲aqs队列中移除
            int r = tryAcquireShared(arg);
            if (r >= 0) {
                setHeadAndPropagate(node, r);
                p.next = null; // help GC
                failed = false;
                return;
            }
        }
        //这里就开始给线程加锁了
        if (shouldParkAfterFailedAcquire(p, node) &&
            parkAndCheckInterrupt())
            throw new InterruptedException();
    }
} finally {
    if (failed)
        cancelAcquire(node);
}

countDown源码分析

public void countDown() {
    sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

再看 tryReleaseShared源码

protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        int c = getState();
        if (c == 0)
            return false;
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

其实就是通过 CASstate值减一,减完之后的state如果是0则会返回true执行doReleaseShared方法

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

这里需要注意的是上文中node的初始是在shouldParkAfterFailedAcquire方法中设置成了 Node.SIGNAL,所以这个方法中都会走到unparkSuccessor也就是唤醒所有线程

Semaphore

信号量(Semaphore)是用于控制多个线程对共享资源的访问的同步对象。它们提供了对共享资源的互斥访问,即只有一个线程可以在同一时间访问共享资源,但是允许多个线程在资源释放后继续执行。 Java 中信号量的实现是在 java.util.concurrent 包中,提供了一个 Semaphore 类的实现。Semaphore 类有两个主要的方法:acquire() 和 release(),用于获取和释放信号量。

可用作限流使用

主要方法

  • Acquire() 方法用于获取信号量,如果信号量的值为 true(即当前没有线程持有信号量),则线程可以获取信号量并执行,同时信号量的值会被设置为 false(即当前有线程持有信号量)。如果信号量的值为 false,则线程会被阻塞,直到有其他线程释放信号量或者当前线程主动释放信号量。
  • Release() 方法用于释放信号量,如果信号量的值为 true,则当前持有信号量的线程会被唤醒,并且信号量的值会被设置为 false。如果信号量的值为 false,则该方法不起作用。
  • acquire(int permits) 用于获取指定数量的信号量
  • release(int permits) 用于释放指定数量的信号量
  • acquireAndDecrement(int permits)用于先获取信号量,再减少一个信号量的值等。这些方法可以根据需要进行调用。

使用示例

package org.example;

import java.util.concurrent.Semaphore;

public class SemaphoreExample {
    private Semaphore semaphore = new Semaphore(6); // 创建信号量对象,初始值为 1

    public void httpGet() throws InterruptedException {
        semaphore.acquire();
        System.out.println("执行用户请求");
        Thread.sleep(5000);
        semaphore.release();
    }

    public static void main(String[] args) {
        SemaphoreExample semaphoreExample = new SemaphoreExample();
        for (int i = 0; i < 20; i++) {
            new Thread(() -> {
                try {
                    semaphoreExample.httpGet();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }).start();
        }
    }
}

源码分析

构造方法源码

public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = -2694183684443567898L;

    NonfairSync(int permits) {
        super(permits);
    }

    protected int tryAcquireShared(int acquires) {
        return nonfairTryAcquireShared(acquires);
    }
}

并发编程-常见并发工具CountDownLatch、Semaphore、Atomic 发现一样是基于AQS实现的,其中构造传递信号许可的数量也就是AQS中的state

acquire方法源码

public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
    // 有中断就抛出中断异常
        throw new InterruptedException();
    // 尝试获取共享资源,失败返回小于0的数,之后调用doAcquireSharedInterruptibly(arg)方法
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}
protected int tryAcquireShared(int acquires) {
    for (;;) {
        // 判断阻塞队列是否有等待者线程,有的话直接返回-1结束方法
        if (hasQueuedPredecessors())
            return -1;
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}
public final boolean hasQueuedPredecessors() {
    // The correctness of this depends on head being initialized
    // before tail and on head.next being accurate if the current
    // thread is first in queue.
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}

这里可以理一下这个判断 h != t 说明链表不是空链表 ((s = h.next) == null 说明AQS链表的第一个结点不是自己 s.thread != Thread.currentThread()指的是AQS链表第一个结点不是自己,所以hasQueuedPredecessors方法返回true的话说明此时有线程正在等待被唤醒 如果如果有的话直接返回-1 否则则尝试修改state值

int available = getState();
int remaining = available - acquires;
//如果remaining小于0直接返回,否则修改state值为remaining最后返回remaining
if (remaining < 0 ||
    compareAndSetState(available, remaining))
    return remaining;
    

回到前面看acquireSharedInterruptibly源码,返回小于0的树说明许可证发完了,所以此时的线程需要阻塞等到有许可证的时候载尝试申请,所以-1会走到doAcquireSharedInterruptibly这个方法

private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

这个方法就比较简单了前面的文章也讲了很多遍了,就是将当前线程插入到AQS链表中然后阻塞当前线程(当然中间会先尝试着获取共享锁)

acquire方法源码总结

就是进来一个线程将AQS的state值减去对应的数字,如果减去后的数字是-1,则说明此时信号量的许可证发完了需要等到有许可证的时候,所以此时就会将当前的线程阻塞并且加到AQS等待队列中,如果减去后的数字>0说明此时还有许可证则修改state值然后继续往下执行。流程图如下: 并发编程-常见并发工具CountDownLatch、Semaphore、Atomic

release方法源码

public void release() {
    sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
    //state加一,正常情况下都会返回true
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}
protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        int current = getState();
        int next = current + releases;
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        if (compareAndSetState(current, next))
            return true;
    }
}

tryReleaseShared就是设置state = state + arg,正常情况下是会返回true的否则直接抛出异常,所以继续看doReleaseShared源码:

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
        //这里就表示等待链表不为null
            int ws = h.waitStatus;
            // 当前节点的状态是唤醒后继节点
            if (ws == Node.SIGNAL) {
                // 更新头结点状态为0(默认状态),更新失败自旋,更新成功唤醒后继结点
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                // 唤醒head的后继节点
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

其实就是将链表的第一个节点移除等待队列,并唤醒线程,然后重新抢占锁

Atomic原子操作

Java中的Atomic包提供了原子类型的变量,这些变量可以在多线程环境下被安全地使用。Atomic包中提供了AtomicBoolean、AtomicInteger、AtomicLong和AtomicIntegerArray类。这些类可以用于保护某些共享资源,以确保多个线程对它们进行读写操作时正确的顺序和数值。

先看个例子:

package org.example;
import java.util.concurrent.atomic.AtomicInteger;

public class AtomicDemo {
    private AtomicInteger count = new AtomicInteger(0);

    public void increment() {
//        count.addAndGet(2);
        count.incrementAndGet();
    }

    public int getCount() {
        return count.get();
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicDemo demo = new AtomicDemo();

        Thread thread1 = new Thread(() -> {
            for (int i = 0; i < 10000; i++) {
                demo.increment();
            }
        });

        Thread thread = new Thread(() -> {
            for (int i = 0; i < 10000; i++) {
                demo.increment();
            }
        });
        thread.start();
        thread1.start();
        thread.join();
        thread1.join();
        System.out.println("Main thread value: " + demo.getCount());
    }
}

输出结果是20000 如果不使用原子操作的话结果就不一定是20000了这个原因就不用多说了。我们就拿AtomicInteger源码来看

构造方法源码:

并发编程-常见并发工具CountDownLatch、Semaphore、Atomic 可以发现底层是有一个value值的,这个值就是我们传入的数字,并且使用volatile修饰的保证了这个值的可见性

incrementAndGet源码

public final int incrementAndGet() {
    return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}

并发编程-常见并发工具CountDownLatch、Semaphore、Atomic 发现修改这个值是通过CAS来修改的也就保证了这个值的线程安全性,然后有用了循环来做,也就是如果修改失败会重新获取值然后继续CAS加一

注:CAS用的乐观锁的机制保证了线程安全性,底层会对操作的数据存一个版本号修改的时候会对比版本号和传入的版本号是否一致不一致说明被改过了所以直接返回false修改不成功,如果一致说明还没有 被改过所以直接修改值并返回成功