掌握Java并发编程2:基本的任务执行框架-Executor、Executors和ExecutorService
引言
并发编程一直是Java编程中最具挑战性的领域之一,它要求开发者对线程的创建、管理和同步有深刻的理解。自Java 5起,java.util.concurrent
包引入了一系列并发工具类,大大简化了并发编程的复杂性。在这些工具中,Executor
接口及其相关的Executors
工具类和ExecutorService
接口是处理线程池和任务执行的基础。它们抽象了线程的创建和管理,允许开发者将精力集中在任务的逻辑实现上,而不是线程的生命周期管理。本文将深入了解这些基本的任务执行框架,并展示如何在Java应用程序中有效地使用它们。
正文
原理
在Java中,java.util.concurrent
包提供了一套强大的并发工具,其中包括了任务执行框架,它由Executor
接口、Executors
工具类和ExecutorService
接口组成。以下是对这些组件的基本介绍:
- Executor接口:
Executor
是一个简单的接口,用于抽象任务的提交过程。它仅有一个execute(Runnable command)
方法,用于接收一个Runnable
对象并执行它。 使用Executor
可以将任务提交和执行的细节分离开来,这样可以更灵活地管理线程的使用。
2. Executors工具类:
Executors
类提供了一系列静态工厂方法来创建不同类型的ExecutorService
。这些方法可以快速创建预定义配置的线程池,例如:
Executors.newFixedThreadPool(int nThreads)
:创建一个固定大小的线程池。Executors.newCachedThreadPool()
:创建一个可缓存线程的线程池。适用于有许多短生命周期的异步任务的场景。
Executors.newSingleThreadExecutor()
:创建一个单线程的Executor,确保所有任务按照提交顺序串行执行。
-
newScheduledThreadPool(int corePoolSize)
: 创建一个固定线程数的线程池,并支持延迟或定期执行任务。适用于需要定时或周期性执行任务的场景。
-
ExecutorService接口:
ExecutorService
是Executor
的子接口,它提供了更完整的异步任务执行机制。除了execute
方法外,它还提供了submit
方法,可以提交实现了Callable
接口的任务,并返回一个Future
对象,通过这个对象可以获取异步执行的结果。ExecutorService
还提供了管理和控制线程池的方法,例如:shutdown()
:平滑地关闭ExecutorService,不再接受新任务,已提交的任务继续执行。shutdownNow()
:尝试立即关闭ExecutorService,尝试停止所有正在执行的任务,不再处理队列中等待的任务,并返回等待执行的任务列表。awaitTermination(long timeout, TimeUnit unit)
:阻塞当前线程直到ExecutorService中所有任务执行完毕,或者超时,或者当前线程被中断。
优缺点:
优点: - 简化资源管理:自动线程管理减少了创建和销毁线程的开销,提高了资源利用率。 - 提高响应性:可以通过线程池快速响应任务请求,尤其是在使用缓存线程池时。 - 增强灵活性:可以根据应用需求选择不同类型的线程池和任务执行策略。 - 任务调度:提供了一种机制来排队和调度任务,使得任务执行更加有序。
缺点: - 复杂性:对于初学者来说,合理配置和使用线程池可能会比较复杂。 - 资源消耗:如果不当使用或配置线程池,可能会导致资源浪费或性能瓶颈。 - 隐蔽的问题:错误的线程池使用可能导致难以发现的问题,如线程泄漏、任务拒绝等。
示例
一个简单的例子使用ExecutorService
来提交任务:
package com.dereksmart.crawling.excutor;
import java.util.concurrent.*;
/**
* @Author derek_smart
* @Date 2024/7/30 7:55
* @Description Executor测试类
*/
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class ExecutorExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 创建一个固定大小的线程池
ExecutorService executor = Executors.newFixedThreadPool(2);
// 提交Runnable任务
executor.execute(new Runnable() {
public void run() {
System.out.println("Derek task....");
}
});
// 提交Callable任务并获取Future对象
Future<String> future = executor.submit(() -> {
Thread.sleep(1000);
return "Hello Derek....";
});
while (!future.isDone()) {
try {
// 可以选择性地暂停一下,避免密集循环
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
// 检查Future对象的状态和结果
try {
// 任务完成,获取结果
String result = future.get();
System.out.println(result);
} catch (Exception e) {
e.printStackTrace();
}
// 关闭ExecutorService
executor.shutdown();
}
}
在这个例子中,创建了一个固定大小的线程池,提交了一个Runnable
任务和一个Callable
任务。然后尝试获取Callable
任务的结果,并在完成后关闭线程池。
使用java.util.concurrent
包中的这些工具可以大大简化多线程编程的复杂性,使得开发者可以更专注于任务的实现,而不是线程的管理。
类图:
CompletionService顺序获取并发任务结果
package com.dereksmart.crawling.excutor;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @Author derek_smart
* @Date 2024/7/30 8:56
* @Description CompletionService执行并发任务,并且按照任务完成的顺序处理它们的结果
*/
public class AdvancedExecutorExamples {
public static void main(String[] args) {
// 自定义线程池参数
int corePoolSize = 2;
int maximumPoolSize = 4;
long keepAliveTime = 60;
TimeUnit unit = TimeUnit.SECONDS;
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(10);
// 使用有意义的线程名字的工厂
ThreadFactory threadFactory = new ThreadFactory() {
private final AtomicInteger threadNumber = new AtomicInteger(1);
public Thread newThread(Runnable r) {
Thread t = new Thread(r, "AdvancedExecutorExample-Pool-" + threadNumber.getAndIncrement());
if (t.isDaemon()) t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY);
return t;
}
};
ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize,
maximumPoolSize,
keepAliveTime,
unit,
workQueue,
threadFactory,
new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
);
// 使用CompletionService来按照任务完成的顺序处理结果
CompletionService<String> completionService = new ExecutorCompletionService<>(executor);
// 提交一系列Callable任务
List<Future<String>> futures = new ArrayList<>();
for (int i = 0; i < 5; i++) {
int taskId = i;
Future<String> future = completionService.submit(() -> {
TimeUnit.SECONDS.sleep(taskId); // 模拟任务执行时间
return "Task " + taskId + " completed.";
});
futures.add(future);
}
// 按照任务完成的顺序获取并处理结果
for (int i = 0; i < futures.size(); i++) {
try {
// take()方法将阻塞,直到有任务完成
Future<String> completedTask = completionService.take();
String result = completedTask.get();
System.out.println(result);
} catch (InterruptedException e) {
// 处理中断异常
Thread.currentThread().interrupt();
break;
} catch (ExecutionException e) {
// 处理任务执行异常
Throwable cause = e.getCause();
System.err.println("Task execution failed: " + cause.getMessage());
}
}
// 关闭线程池
shutdownAndAwaitTermination(executor);
}
// 优雅地关闭线程池的方法
private static void shutdownAndAwaitTermination(ExecutorService pool) {
pool.shutdown(); // 禁止提交新任务
try {
// 等待现有任务完成
if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
pool.shutdownNow();
// 取消正在执行的任务
// 等待任务响应中断
if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
System.err.println("Pool did not terminate");
}
}
} catch (InterruptedException ie) {
// 重新取消当前线程进行中断响应
pool.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
使用场景: - 并发执行多个任务:当需要并发执行多个独立的任务,并且希望能够快速地处理那些先完成的任务的结果时,这段代码非常有用。
- 资源有效管理:使用线程池可以避免为每个任务创建新线程的开销,提高资源利用效率,并且可以通过调整线程池参数来优化性能。
- 任务执行顺序控制:通过CompletionService
,可以控制任务的执行顺序,即使任务是并发执行的,也可以按照完成顺序来处理结果。
- 长时间运行的应用:在长时间运行的应用中,优雅关闭线程池是很重要的,以确保应用退出前,所有的任务都已经完成,并且所有的资源都已经被正确释放。
- 大量短暂任务的执行:如果有大量的短暂任务需要执行,并且希望能够快速启动和响应,使用ThreadPoolExecutor
的缓存线程池或者自定义参数可以提高效率。
- 计算密集型或IO密集型任务:根据任务的类型,可以通过调整线程池的大小来优化性能,例如,对于计算密集型任务,可能想要限制线程数以匹配CPU核心数;对于IO密集型任务,由于线程经常会被阻塞,可以配置更多的线程来提高吞吐量。
总结
通过本文了解了Executor
、Executors
和ExecutorService
在Java并发编程中的核心作用和使用方法。探讨了如何利用这些工具来创建线程池、提交任务以及管理线程池的生命周期。正确地使用这些并发工具,不仅可以提高程序的性能,还能提升代码的质量和维护性。随着对这些基础概念的掌握,Java开发者将能够构建出更加健壮、可靠且可伸缩的并发应用程序,为处理现代应用程序中的高并发需求打下坚实的基础。记住,优秀的并发设计不仅仅是关于线程的操作,更是关于对资源的合理管理和对任务执行流程的精细控制。
转载自:https://juejin.cn/post/7397028381961945139