掌握Java并发编程3:同步工具类-Future、Semaphore、CountDownLatch、CyclicBarrier、Exchanger和Phaser
引言
概述
在现代软件开发中,多线程和并发编程是提高应用性能和效率的关键技术之一。Java平台通过java.util.concurrent
包提供了一套强大的并发工具类,它们可以帮助开发者有效地管理和协调线程之间的交互。本文将深入探讨这些工具类的原理、特点和使用场景,包括Future
, Semaphore
, CountDownLatch
, CyclicBarrier
, Exchanger
, 和 Phaser
。
正文
Future
原理
Future
接口在 Java 中代表一个异步计算的结果。当提交一个任务给 ExecutorService
执行时,会得到一个实现了 Future
接口的对象。通过这个 Future
对象,可以检查任务是否已经完成、等待任务完成并获取计算结果,或者取消任务的执行。
使用场景
- 异步执行长时间运行的计算任务,而不阻塞主线程。
- 在多个任务并发执行时,获取每个任务的结果。
- 取消尚未完成的任务。
Semaphore
原理
Semaphore
是一个计数信号量,用于管理一组资源的访问。信号量维护了一组许可(permits),线程可以通过调用 acquire()
方法来获取许可,如果没有可用许可,acquire()
会阻塞直到有许可被释放。线程完成资源的使用后,应该调用 release()
方法来释放许可。
使用场景
- 控制资源的并发访问,例如限制同时访问文件的线程数量。
- 实现生产者-消费者模式,其中许可代表可用的资源。
CountDownLatch
原理
CountDownLatch
是一个同步辅助类,它允许一个或多个线程等待一组事件发生。CountDownLatch
初始化时带有一个计数器,调用 countDown()
方法会使计数器减一,调用 await()
方法的线程会阻塞直到计数器达到零。
使用场景
- 等待服务的一组操作初始化完成后,再继续执行。
- 同时开始执行一组操作,如在并行测试中启动多个线程执行任务。
CyclicBarrier
原理
CyclicBarrier
类似于 CountDownLatch
,但它可以重复使用。CyclicBarrier
维护一个计数器,当一组线程都到达某个屏障点(barrier point)时,计数器会归零,然后所有等待的线程会被释放,并且屏障被重置以便下一次使用。
使用场景
- 同步一组线程的进度,确保它们在继续执行之前都到达了某个公共点。
- 实现多阶段计算,每个阶段都需要等待所有线程完成。
Exchanger
原理
Exchanger
是一个同步点,在这个点上,两个线程可以交换数据。每个线程在到达同步点时提供一些数据给 Exchanger
,并且接收另一个线程提供的数据。
使用场景
- 在两个线程之间交换数据,例如在遗传算法或管道设计中。
Phaser
原理
Phaser
是一个可重用的同步辅助类,它将 CyclicBarrier
和 CountDownLatch
的功能结合起来。它支持动态地注册线程,线程可以分阶段地到达,并且在每个阶段结束时同步。
使用场景
- 同步可变数量的线程执行分阶段的任务。 - 当任务被分为多个步骤执行,并且每个步骤必须等待之前步骤中的所有线程完成后才能开始。
Future
Future
接口代表了一个异步计算的结果。它提供了一种检查计算是否完成、等待计算完成并检索其结果的机制。通过将耗时任务提交给ExecutorService
,可以获得一个Future
对象,并继续执行其他任务,直到需要结果。使用Future.get()
可以获取结果,但这是一个阻塞调用;如果计算尚未完成,调用者将等待。为了避免阻塞,可以使用Future.isDone()
来检查计算是否完成。
package com.dereksmart.crawling.future;
import java.util.concurrent.*;
/**
* @Author derek_smart
* @Date 2024/8/1 8:15
* @Description Future测试类
*/
public class FutureExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<Integer> future = executor.submit(() -> {
TimeUnit.SECONDS.sleep(1); // 模拟长时间运行的任务
return 42; // 返回异步计算的结果
});
// 在这里可以做一些其他的事情,而计算仍在异步进行
System.out.println("异步计算的结果是1: " + future.isDone());
// 等待异步计算的结果
Integer result = future.get(); //这里会阻塞直到任务完成
System.out.println("异步计算的结果是2: " + future.isDone());
System.out.println("异步计算的结果是: " + result);
executor.shutdown();
}
}
Semaphore
Semaphore
是一个计数信号量,常用于控制对资源的并发访问。它维护了一组许可证,线程在访问资源之前必须先获得许可证,使用完资源后释放许可证。如果请求许可证时没有可用的许可证,则线程阻塞直到有许可证被释放。这是一种实现生产者-消费者模式的有效方式。
package com.dereksmart.crawling.future;
import java.util.concurrent.*;
/**
* @Author derek_smart
* @Date 2024/8/1 8:26
* @Description Semaphore测试类
*/
public class SemaphoreExample {
private static final int MAX_PERMITS = 3;
private final Semaphore semaphore = new Semaphore(MAX_PERMITS);
public void accessResource(int threadId) {
try {
semaphore.acquire(); // 获取许可
System.out.println("线程 " + threadId + " 访问资源.");
TimeUnit.SECONDS.sleep(2); // 模拟资源访问
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
semaphore.release(); // 释放许可
System.out.println("线程 " + threadId + " 释放资源.");
}
}
public static void main(String[] args) {
SemaphoreExample example = new SemaphoreExample();
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 1; i <= 3; i++) {
final int threadId = i;
executor.execute(() -> example.accessResource(threadId));
}
executor.shutdown();
}
}
CountDownLatch
CountDownLatch
是一种同步辅助工具,它允许一个或多个线程等待一组事件发生。它通过一个计数器来实现,该计数器初始化时设置为需要等待的事件数。当一个事件发生时,计数器减一。调用CountDownLatch.await()
的线程会阻塞,直到计数器变为零。
package com.dereksmart.crawling.future;
import java.util.concurrent.*;
/**
* @Author derek_smart
* @Date 2024/8/1 8:31
* @Description CountDownLatch测试类
*/
public class CountDownLatchExample {
private static final int NUMBER_OF_THREADS = 5;
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(NUMBER_OF_THREADS);
ExecutorService executor = Executors.newFixedThreadPool(NUMBER_OF_THREADS);
for (int i = 0; i < NUMBER_OF_THREADS; i++) {
executor.submit(() -> {
System.out.println("线程 " + Thread.currentThread().getId() + " 执行任务.");
try {
Long s = (long) (Math.random()*1000);
Thread.sleep(s);
System.out.println("线程 " + Thread.currentThread().getId() + " 执行任务."+s+"s");
} catch (InterruptedException e) {
e.printStackTrace();
}
latch.countDown(); // 通知 CountDownLatch 一个线程已经完成了任务
});
}
latch.await(); // 等待直到所有线程完成任务
System.out.println("所有线程已完成任务.");
executor.shutdown();
}
}
CyclicBarrier
CyclicBarrier
与CountDownLatch
类似,但它可以重置并重复使用。CyclicBarrier
被一组固定数量的线程使用,这些线程必须等待彼此到达一个共同的屏障点。当所有线程都到达屏障时,可以选择运行一个屏障动作。CyclicBarrier
适用于分步骤执行的并行任务。
package com.dereksmart.crawling.future;
import java.util.concurrent.*;
/**
* @Author derek_smart
* @Date 2024/8/1 8:35
* @Description CyclicBarrier测试类
*/
public class CyclicBarrierExample {
private static final int NUMBER_OF_THREADS = 5;
public static void main(String[] args) {
CyclicBarrier barrier = new CyclicBarrier(NUMBER_OF_THREADS, () -> {
// 这个runnable会在所有线程都到达屏障后执行
System.out.println("所有线程都到达了屏障点.");
});
ExecutorService executor = Executors.newFixedThreadPool(NUMBER_OF_THREADS);
for (int i = 0; i < NUMBER_OF_THREADS; i++) {
executor.submit(() -> {
System.out.println("线程 " + Thread.currentThread().getId() + " 正在工作.");
try {
TimeUnit.SECONDS.sleep(2); // 模拟工作
barrier.await(); // 等待其他线程
System.out.println("线程 " + Thread.currentThread().getId() + " 通过了屏障.");
} catch (InterruptedException | BrokenBarrierException e) {
Thread.currentThread().interrupt();
}
});
}
executor.shutdown();
}
}
Exchanger
Exchanger
允许两个线程在一个同步点交换数据。每个线程在到达同步点时提供一些数据,并接收另一个线程提供的数据。这是一个双向的同步交换,常用于遗传算法、管道设计等需要线程间协作的场景。
package com.dereksmart.crawling.future;
import java.util.concurrent.*;
/**
* @Author derek_smart
* @Date 2024/8/1 8:45
* @Description Exchanger测试类
*/
public class ExchangerExample {
private static final Exchanger<String> exchanger = new Exchanger<>();
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(2);
executor.execute(() -> {
try {
String message = "Hello from Thread A";
String response = exchanger.exchange(message); // 交换数据
System.out.println("Thread A received: " + response);
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); }
});
executor.execute(() -> {
try {
String message = "Hello from Thread B";
String response = exchanger.exchange(message); // 交换数据
System.out.println("Thread B received: " + response);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
executor.execute(() -> {
try {
String message = "Hello from Thread C";
String response = exchanger.exchange(message); // 交换数据
System.out.println("Thread C received: " + response);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
executor.execute(() -> {
try {
String message = "Hello from Thread D";
String response = exchanger.exchange(message); // 交换数据
System.out.println("Thread D received: " + response);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
executor.shutdown();
}
}
Phaser
Phaser
是一个可重用的同步辅助工具,它结合了CyclicBarrier
和CountDownLatch
的功能。它可以处理动态数量的线程参与多阶段任务的同步。每个阶段结束时,Phaser
等待预定数量的线程到达,然后前进到下一个阶段。
package com.dereksmart.crawling.future;
import java.util.concurrent.*;
/**
* @Author derek_smart
* @Date 2024/8/1 8:51
* @Description Phaser测试类
*/
public class PhaserExample {
private static final int NUMBER_OF_PARTIES = 3;
private static final int PHASES_TO_COMPLETE = 3;
public static void main(String[] args) {
final Phaser phaser = new Phaser(NUMBER_OF_PARTIES) {
@Override
protected boolean onAdvance(int phase, int registeredParties) {
System.out.println("阶段 " + phase + " 完成.");
return phase >= PHASES_TO_COMPLETE - 1 || registeredParties == 0;
}
};
ExecutorService executor = Executors.newFixedThreadPool(NUMBER_OF_PARTIES);
for (int i = 0; i < NUMBER_OF_PARTIES; i++) {
final int threadId = i;
executor.execute(() -> {
for (int phase = 0; phase < PHASES_TO_COMPLETE; phase++) {
System.out.println("线程 " + threadId + " 正在执行阶段 " + phase);
phaser.arriveAndAwaitAdvance(); // 等待其他线程完成当前阶段
}
});
}
executor.shutdown();
}
}
这些并发工具类为开发者提供了强大的机制来管理复杂的多线程场景。正确使用这些工具可以大大提高应用程序的并发性能和响应能力,同时保证数据的一致性和线程的安全。然而,多线程编程仍然充满挑战,开发者需要仔细设计他们的程序以避免死锁、竞态条件和其他并发相关问题。
实战演练
考虑一个真实的使用场景,比如一个并行数据处理系统,它需要执行以下步骤:
- 从不同的数据源并行加载数据。
- 当所有数据加载完毕后,执行一些数据预处理。
- 分批处理数据,并在每批处理完成后进行一些清理工作。
- 每批数据处理完毕后,交换处理结果以便进行下一步的聚合处理。
- 所有数据处理完成后,执行最终的数据聚合和清理工作。
在这个场景中,我们可以使用CountDownLatch
来同步数据加载,CyclicBarrier
来同步批处理,Exchanger
来交换数据,Phaser
来管理多阶段任务,以及Semaphore
来限制同时进行的处理任务数量。以下是对应的代码实现:
代码实现
package com.dereksmart.crawling.future;
import java.util.concurrent.*;
/**
* @Author derek_smart
* @Date 2024/8/1 8:35
* @Description 并行数据处理系统
*/
public class DataProcessingSystem {
private static final int NUMBER_OF_SOURCES = 3;
private static final int NUMBER_OF_BATCHES = 5;
private final CountDownLatch dataLoadedLatch = new CountDownLatch(NUMBER_OF_SOURCES);
private final CyclicBarrier batchProcessingBarrier = new CyclicBarrier(NUMBER_OF_SOURCES);
private final Exchanger<String>[] exchangers = new Exchanger[NUMBER_OF_SOURCES / 2]; // For an even number of sources
private final Semaphore processingSemaphore = new Semaphore(2); // Limit concurrent processing
public DataProcessingSystem() {
for (int i = 0; i < exchangers.length; i++) {
exchangers[i] = new Exchanger<>();
}
}
public void loadData() throws InterruptedException ,Exception{
ExecutorService executorService = Executors.newFixedThreadPool(NUMBER_OF_SOURCES);
for (int i = 0; i < NUMBER_OF_SOURCES; i++) {
final int sourceId = i;
executorService.submit(() -> {
System.out.println("Loading data from source " + sourceId);
// Simulate data loading
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
dataLoadedLatch.countDown();
});
}
dataLoadedLatch.await(); // Wait for all data to be loaded
System.out.println("Data loaded from all sources. Starting processing...");
executorService.shutdown(); // Shutdown executor after all tasks are submitted
}
public void processData() throws InterruptedException, BrokenBarrierException ,Exception{
ExecutorService executorService = Executors.newFixedThreadPool(NUMBER_OF_SOURCES);
Phaser finalAggregationPhaser = new Phaser(1); // Register self
for (int i = 0; i < NUMBER_OF_SOURCES; i++) {
final int sourceId = i;
final String initialData = "Data from source " + sourceId; // Loaded data
finalAggregationPhaser.register(); // Register thread with the phaser
executorService.submit(() -> {
String data = initialData;
try {
for (int batch = 0; batch < NUMBER_OF_BATCHES; batch++) {
processingSemaphore.acquire(); // Acquire permit to limit concurrent processing
System.out.println("Processing batch " + batch + " from source " + sourceId);
// Simulate batch processing
TimeUnit.SECONDS.sleep(1);
processingSemaphore.release(); // Release permit
// Exchange data with another source if possible
if (sourceId < exchangers.length) {
int pairId = (sourceId + 1) % 2;
data = exchangers[pairId].exchange(data);
System.out.println("Source " + sourceId + " exchanged data with source " + pairId);
}
// Wait for all sources to complete the batch
batchProcessingBarrier.await();
}
} catch (InterruptedException | BrokenBarrierException e) {
Thread.currentThread().interrupt();
} finally {
finalAggregationPhaser.arriveAndDeregister(); // Deregister when done
}
});
}
executorService.shutdown(); // Shutdown executor after all tasks are submitted
executorService.awaitTermination(1, TimeUnit.HOURS); // Wait for all tasks to complete
// Wait for final aggregation to complete and perform any additional finalization
finalAggregationPhaser.arriveAndAwaitAdvance();
System.out.println("Final aggregation and cleanup complete. System shutdown.");
}
public static void main(String[] args) throws InterruptedException, BrokenBarrierException,Exception {
DataProcessingSystem system = new DataProcessingSystem();
system.loadData();
system.processData();
}
}
在这个例子中:
CountDownLatch
用于等待所有数据源加载完毕。CyclicBarrier
用于同步每批数据处理的结束,以便进行清理工作。Exchanger
用于在数据源之间交换处理结果。Phaser
用于在所有数据处理完毕后进行最终的聚合和清理工作。Semaphore
用于限制同时执行的数据处理任务数量,以防止系统过载。
时序图:
这个例子展示了如何在复杂的数据处理系统中使用java.util.concurrent
包下的同步工具类来协调多线程任务的执行。这些工具类的组合使用可以有效地管理线程间的同步和协作,从而提高系统的并发性能和稳定性。
总结
并发编程中,java.util.concurrent
包提供了一系列强大的同步工具类,使得开发者能够更有效地管理和协调多线程间的交互。以下是这些工具类的简要概述:
- Future: 表示异步计算的结果。它允许你启动一个可能耗时的计算,继续执行其他任务,然后在将来的某个时间点取得计算结果。
- Semaphore: 计数信号量,用于控制对共享资源的并发访问。它维护一组许可证,线程在访问资源之前必须获取许可证,使用完资源后释放许可证。
- CountDownLatch: 一个同步辅助类,允许一个或多个线程等待一组事件的完成。它通过一个计数器实现,该计数器在事件完成时递减,直到计数器为零时释放所有等待的线程。
- CyclicBarrier: 类似于CountDownLatch
,但它可以重置并重复使用。它用于同步一组线程,使它们在继续执行之前等待彼此到达一个共同的屏障点。
- Exchanger: 允许两个线程在一个同步点交换数据。这是一个双向同步操作,通常用于需要线程间协作的场景。
- Phaser: 一个可重用的同步辅助类,结合了CyclicBarrier
和CountDownLatch
的功能。它可以处理动态数量的线程参与多阶段任务的同步。
这些工具类具有不同的用途和应用场景,它们可以帮助开发者解决复杂的并发问题,如数据同步、线程间通信、流程控制等。正确使用这些工具可以提高程序的并发性能和响应能力,同时保证数据的一致性和线程的安全。
转载自:https://juejin.cn/post/7398050410806280218