likes
comments
collection
share

使用AQS实现LimitLatch:掌握高效灵活的同步框架

作者站长头像
站长
· 阅读数 14

AQS(AbstractQueuedSynchronizer)是 Java 中用于实现同步器的基础框架,通过它可以很方便地实现自定义的同步组件。在并发编程中,同步控制是十分关键的,AQS 提供了一种高效灵活的方式来实现各种同步组件。

AQS 的好处包括:

  1. 灵活性:AQS 是一个通用的同步框架,它提供了 acquire 和 release 两个基本操作,可以被子类进行扩展和重写。这使得 AQS 可以用于实现各种同步组件,如 CountDownLatch、Semaphore、ReentrantLock 等。
  2. 高效性:与 synchronized 相比,AQS 实现了更加底层的同步机制,使用了一些高效的技术,例如 CAS 操作和 volatile 关键字等。这些技术可以让 AQS 在高并发环境下获得更好的性能。
  3. 可扩展性:AQS 具有较强的可扩展性。通过继承 AQS 并实现其抽象方法,我们可以方便地实现各种同步组件。同时,AQS 还提供了 Condition 对象,支持复杂的线程协作机制。

总之,AQS 是一个高效、灵活、可扩展的同步框架,可以为我们提供丰富的同步组件,帮助我们实现高效的并发编程。

下面是一个实践demo,来源Tomcat 的 NioEndpoint 组件里的LimitLatch。

LimitLatch工具类

        import java.util.concurrent.locks.AbstractQueuedSynchronizer;
        import java.util.concurrent.atomic.AtomicLong;

        public class LimitLatch {
            // 定义 Sync 类作为 AQS 的实现
            private class Sync extends AbstractQueuedSynchronizer {
                @Override
                protected int tryAcquireShared(int arg) {
                    // 获取计数并增加 1
                    long newCount = count.incrementAndGet();
                    // 如果增加后的计数大于限制,则减少计数并返回 -1 表示获取失败
                    if (newCount > limit) {
                        count.decrementAndGet();
                        return -1;
                    } else { // 计数小于或等于限制,则返回 1 表示获取成功
                        return 1;
                    }
                }

                @Override
                protected boolean tryReleaseShared(int arg) {
                    // 减少计数并返回 true 表示释放成功
                    count.decrementAndGet();
                    return true;
                }
            }

            private final Sync sync; // 使用 AQS 实现同步控制逻辑
            private final AtomicLong count; // 维护当前的连接数
            private volatile long limit; // 连接数的上限

            public LimitLatch() {
                count = new AtomicLong(0);
                sync = new Sync();
            }

            public void setLimit(long limit) {
                this.limit = limit;
            }

            public void countUpOrAwait() throws InterruptedException {
                // 获取共享锁,如果当前的计数已经超过了限制,则调用线程将会被阻塞,直到有一个许可证可用。
                sync.acquireSharedInterruptibly(1);
            }

            public long countDown() {
                // 释放共享锁,并返回当前计数
                sync.releaseShared(0);
                return getCount();
            }

            public long getCount() {
                // 获取当前的计数
                return count.get();
            }
        }


这段代码实现了一个 LimitLatch 类,是一个简单的用于控制连接数的工具类。

在这个类中,我们定义了 Sync 内部类来实现同步控制逻辑,它继承自 AbstractQueuedSynchronizer 类。tryAcquireShared(int arg) 方法尝试获取共享锁,如果增加后的计数大于限制,则减少计数并返回 -1 表示获取失败;否则,增加计数并返回 1 表示获取成功。tryReleaseShared(int arg) 方法释放共享锁,并减少计数。 countUpOrAwait() 方法调用 acquireSharedInterruptibly(1) 获取共享锁。如果当前的计数已经超过了限制,则调用线程将会被阻塞,直到有一个许可证可用。countDown() 方法调用 releaseShared(0) 释放共享锁,并返回当前计数。getCount() 方法返回当前计数。

注意,我们使用了 AtomicLong 来维护当前的计数,以确保并发线程访问的正确性。 limit 是一个volatile变量,用于确保多个线程之间对其值的可见性。

该类可以用于限制同时运行的任务数量等场景,例如一个数据库连接池中同时可以打开的连接数量。

运行代码

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class LimitLatchDemo {

    public static void main(String[] args) throws InterruptedException {
        // 创建 LimitLatch 实例
        LimitLatch limitLatch = new LimitLatch();
        // 设置连接许可证的上限为 3
        limitLatch.setLimit(3);
        // 创建线程池,最多同时运行 5 个线程
        ExecutorService executorService = Executors.newFixedThreadPool(5);

        // 向线程池提交任务
        for (int i = 0; i < 10; i++) {
            executorService.submit(() -> {
                try {
                    // 获取一个连接许可证,如果当前的计数已经超过了限制,则调用线程将会被阻塞,直到有一个许可证可用。
                    limitLatch.countUpOrAwait();
                    // 打印线程名称和当前活动连接数
                    System.out.println(Thread.currentThread().getName() + " entered. Active connections: " + limitLatch.getCount());
                    Thread.sleep(1000); // 睡眠1秒钟模拟线程正在处理工作
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    // 释放一个连接许可证并返回当前计数
                    long count = limitLatch.countDown();
                    // 打印线程名称和当前活动连接数
                    System.out.println(Thread.currentThread().getName() + " exited. Active connections: " + count);
                }
            });
        }

        // 关闭线程池并等待所有任务完成
        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.MINUTES);
    }
}

这是一个使用 LimitLatch 实现的简单示例。 在 main() 方法中,我们创建了一个 LimitLatch 实例,并将连接许可证的上限设置为 3。 然后,我们创建了一个线程池,最多同时运行 5 个线程。 我们向线程池提交了10个任务。每个任务都会尝试获取一个连接许可证并打印当前活动连接数。 执行任务时,如果当前的计数已经超过了限制,则调用线程将会被阻塞,直到有一个许可证可用。在任务完成后,它将释放连接许可证并打印当前活动连接数。

这个例子模拟了一个简单的数据库连接池,其中只允许同时存在 3 个活动连接。

使用AQS实现LimitLatch:掌握高效灵活的同步框架

一般来说,当我们需要实现自定义的同步组件时,比如 CountDownLatch、Semaphore、ReentrantLock 等,或者需要实现复杂的线程协作机制时,AQS 就是一个非常好的选择。由于其高效灵活的特性,它可以帮助我们实现各种高效的并发工具,并应对各种复杂的并发场景。