使用AQS实现LimitLatch:掌握高效灵活的同步框架
AQS(AbstractQueuedSynchronizer)是 Java 中用于实现同步器的基础框架,通过它可以很方便地实现自定义的同步组件。在并发编程中,同步控制是十分关键的,AQS 提供了一种高效灵活的方式来实现各种同步组件。
AQS 的好处包括:
- 灵活性:AQS 是一个通用的同步框架,它提供了 acquire 和 release 两个基本操作,可以被子类进行扩展和重写。这使得 AQS 可以用于实现各种同步组件,如 CountDownLatch、Semaphore、ReentrantLock 等。
- 高效性:与 synchronized 相比,AQS 实现了更加底层的同步机制,使用了一些高效的技术,例如 CAS 操作和 volatile 关键字等。这些技术可以让 AQS 在高并发环境下获得更好的性能。
- 可扩展性:AQS 具有较强的可扩展性。通过继承 AQS 并实现其抽象方法,我们可以方便地实现各种同步组件。同时,AQS 还提供了 Condition 对象,支持复杂的线程协作机制。
总之,AQS 是一个高效、灵活、可扩展的同步框架,可以为我们提供丰富的同步组件,帮助我们实现高效的并发编程。
下面是一个实践demo,来源Tomcat 的 NioEndpoint 组件里的LimitLatch。
LimitLatch工具类
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.atomic.AtomicLong;
public class LimitLatch {
// 定义 Sync 类作为 AQS 的实现
private class Sync extends AbstractQueuedSynchronizer {
@Override
protected int tryAcquireShared(int arg) {
// 获取计数并增加 1
long newCount = count.incrementAndGet();
// 如果增加后的计数大于限制,则减少计数并返回 -1 表示获取失败
if (newCount > limit) {
count.decrementAndGet();
return -1;
} else { // 计数小于或等于限制,则返回 1 表示获取成功
return 1;
}
}
@Override
protected boolean tryReleaseShared(int arg) {
// 减少计数并返回 true 表示释放成功
count.decrementAndGet();
return true;
}
}
private final Sync sync; // 使用 AQS 实现同步控制逻辑
private final AtomicLong count; // 维护当前的连接数
private volatile long limit; // 连接数的上限
public LimitLatch() {
count = new AtomicLong(0);
sync = new Sync();
}
public void setLimit(long limit) {
this.limit = limit;
}
public void countUpOrAwait() throws InterruptedException {
// 获取共享锁,如果当前的计数已经超过了限制,则调用线程将会被阻塞,直到有一个许可证可用。
sync.acquireSharedInterruptibly(1);
}
public long countDown() {
// 释放共享锁,并返回当前计数
sync.releaseShared(0);
return getCount();
}
public long getCount() {
// 获取当前的计数
return count.get();
}
}
这段代码实现了一个 LimitLatch
类,是一个简单的用于控制连接数的工具类。
在这个类中,我们定义了 Sync
内部类来实现同步控制逻辑,它继承自 AbstractQueuedSynchronizer
类。tryAcquireShared(int arg)
方法尝试获取共享锁,如果增加后的计数大于限制,则减少计数并返回 -1 表示获取失败;否则,增加计数并返回 1 表示获取成功。tryReleaseShared(int arg)
方法释放共享锁,并减少计数。 countUpOrAwait()
方法调用 acquireSharedInterruptibly(1)
获取共享锁。如果当前的计数已经超过了限制,则调用线程将会被阻塞,直到有一个许可证可用。countDown()
方法调用 releaseShared(0)
释放共享锁,并返回当前计数。getCount()
方法返回当前计数。
注意,我们使用了 AtomicLong
来维护当前的计数,以确保并发线程访问的正确性。 limit
是一个volatile变量,用于确保多个线程之间对其值的可见性。
该类可以用于限制同时运行的任务数量等场景,例如一个数据库连接池中同时可以打开的连接数量。
运行代码
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class LimitLatchDemo {
public static void main(String[] args) throws InterruptedException {
// 创建 LimitLatch 实例
LimitLatch limitLatch = new LimitLatch();
// 设置连接许可证的上限为 3
limitLatch.setLimit(3);
// 创建线程池,最多同时运行 5 个线程
ExecutorService executorService = Executors.newFixedThreadPool(5);
// 向线程池提交任务
for (int i = 0; i < 10; i++) {
executorService.submit(() -> {
try {
// 获取一个连接许可证,如果当前的计数已经超过了限制,则调用线程将会被阻塞,直到有一个许可证可用。
limitLatch.countUpOrAwait();
// 打印线程名称和当前活动连接数
System.out.println(Thread.currentThread().getName() + " entered. Active connections: " + limitLatch.getCount());
Thread.sleep(1000); // 睡眠1秒钟模拟线程正在处理工作
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 释放一个连接许可证并返回当前计数
long count = limitLatch.countDown();
// 打印线程名称和当前活动连接数
System.out.println(Thread.currentThread().getName() + " exited. Active connections: " + count);
}
});
}
// 关闭线程池并等待所有任务完成
executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.MINUTES);
}
}
这是一个使用 LimitLatch
实现的简单示例。 在 main()
方法中,我们创建了一个 LimitLatch
实例,并将连接许可证的上限设置为 3。 然后,我们创建了一个线程池,最多同时运行 5 个线程。 我们向线程池提交了10个任务。每个任务都会尝试获取一个连接许可证并打印当前活动连接数。 执行任务时,如果当前的计数已经超过了限制,则调用线程将会被阻塞,直到有一个许可证可用。在任务完成后,它将释放连接许可证并打印当前活动连接数。
这个例子模拟了一个简单的数据库连接池,其中只允许同时存在 3 个活动连接。
一般来说,当我们需要实现自定义的同步组件时,比如 CountDownLatch、Semaphore、ReentrantLock 等,或者需要实现复杂的线程协作机制时,AQS 就是一个非常好的选择。由于其高效灵活的特性,它可以帮助我们实现各种高效的并发工具,并应对各种复杂的并发场景。
转载自:https://juejin.cn/post/7242957405330702392