深度剖析 CompletableFuture 工作原理!
嗨,你好,我是猿java
在日常开发中,为了提高程序的性能,我们经常会使用异步方式来完成,在本文中,我们将学习一种常用的工具类: CompletableFuture
,并且学习如何使用它来提高 Java 应用程序的性能,让我们开始学习旅程吧!
在分析 CompletableFuture
之前,我们先来看看 Future
。
什么是 Future?
Future
是 Java 5 中引入的 Java 接口,用于表示将来可用的值,使用 Future
给 Java 带来了巨大的好处,它可以帮助我们异步执行一些非常密集的计算,而不会阻塞当前线程,因此,我们可以在当前线程中继续做一些工作。
我们可以把 Future
想象成去餐厅吃晚饭,在晚餐准备期间,我们可以干些其他的事情,一旦晚餐准备好,就可以吃饭了。
Future
的源码如下:
package java.util.concurrent;
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}
什么是 CompletableFuture?
CompletableFuture
是 Java 8 引入的一个类,用于处理异步编程。它是 Future 接口的一个增强版,提供了一些有用的方法来管理异步任务。其源码如下:
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
volatile Object result; // Either the result or boxed AltResult
volatile Completion stack; // Top of Treiber stack of dependent actions
// 方法太多,此处省略
}
CompletableFuture 和 Future
在本节中,我们将了解 Future
接口以及它的一些局限性,并且分析如何使用 CompletableFuture
类来解决这些问题。
定义超时
Future
接口只提供了 get() 方法来获取计算结果,但如果计算时间过长,我们的线程就会一直堵塞。
为了更好地理解,让我们看一些示例代码:
import java.util.concurrent.*;
public class FutureDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<String> future = executor.submit(() -> threadSleep());
System.out.println("The result is: " + future.get());
}
private static String threadSleep() throws InterruptedException {
TimeUnit.DAYS.sleep(365 * 10);
return "finishSleep";
}
}
上述示例中,我们创建了一个 ExecutorService 实例,并且使用它来提交一个线程睡眠的任务(threadSleep()),然后通过调用 future.get() 方法在控制台上打印结果值,因为 threadSleep() 方法中,我们让线程睡眠了 10年,所以控制台要等待 10年才会打印值,而且 Future
也没有任何方法可以手动完成任务。
那么,CompletableFuture
是如何克服这个问题的呢?
我们还是使用相同的场景,并且在流程中调用了 CompletableFuture.complete() 方法,示例代码如下:
import java.util.concurrent.*;
public class CompletableFutureDemo {
public static void main(String[] args) {
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> threadSleep());
completableFuture.complete("Completed");
System.out.println("result: " + completableFuture.get());
System.out.println("completableFuture done ? " + completableFuture.isDone());
}
private static String threadSleep(){
try {
TimeUnit.DAYS.sleep(365 * 10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "finishSleep";
}
}
在上述示例中:
- 首先,通过调用 CompletableFuture.supplyAsync() 方法创建一个 String 类型的 CompletableFuture;
- 然后,调用 completableFuture.complete()方法;
- 接着,调用 isDone() 方法,并打印其结果值;
- 最后,main() 方法的输出如下:
result: Completed
completableFuture done ? true
通过运行结果我们可以看到:需要睡眠 10年的 threadSleep() 居然完成了,为什么呢?在整个代码中嫌疑最大的是 completableFuture.complete(),因此我们来看看这个方法到底做了什么?其源码如下:
/**
* If not already completed, sets the value returned by {@link
* #get()} and related methods to the given value.
*
* @param value the result value
* @return {@code true} if this invocation caused this CompletableFuture
* to transition to a completed state, else {@code false}
*/
public boolean complete(T value) {
boolean triggered = completeValue(value);
postComplete();
return triggered;
}
通过源码我们可以知道:complete(T value) 方法是用于手动完成一个 CompletableFuture 任务(即使任务尚未执行或未完成)并且返回 value。
因此,CompletableFuture
是通过 complete(T value)方法手动结束 任务,从而客服了 Futrue
无法手动结束任务的限制。
组合异步操作
假设我们需要调用两个 Method:firstMethod() 和 secondMethod(),并且将 firstMethod() 的结果作为 secondMethod() 的输入。
通过使用 Future
接口,我们无法异步组合这两个操作,只能同步完成,示例代码如下:
import java.util.concurrent.*;
public class FutureDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<Integer> firstFuture = executor.submit(() -> firstMethod(1));
int firstMethodResult = firstFuture.get(); // 获取 firstMethod的结果值
System.out.println("firstMethodResult:" + firstMethodResult);
Future<Integer> secondFuture = executor.submit(() -> secondMethod(firstMethodResult));
System.out.println("secondMethodResult:" + secondFuture.get());
executor.shutdown();
}
private static int firstMethod(int num) {
return num;
}
private static int secondMethod(int firstMethodResult) {
return 2 + firstMethodResult;
}
}
在上述示例代码中:
- 首先,通过 ExecutorService 提交一个返回
Future
的任务来调用 firstMethod; - 然后,需要将 firstMethod 的结果值传递给第 secondMethod,但检索 firstMethod 结果值的唯一方法是使用 Future.get(),该方法会阻塞主线程;
- 接着,我们必须等到 firstMethod 返回结果,然后再执行 secondMethod 操作,整个流程就变成同步过程;
- 最后,main() 的输出如下:
firstMethodResult:1
secondMethodResult:3
通过运行结果可以看出,结果符合预期而且整个过程是串行执行的。
那么,CompletableFuture
是如何在不阻塞主线程的前提下,异步组合两个过程的呢?具体操作如下示例代码:
import java.util.concurrent.*;
public class CompletableFutureDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
var finalResult = CompletableFuture.supplyAsync(() -> firstMethod(1))
.thenApply(firstMethodResult -> secondMethod(firstMethodResult));
System.out.println("finalResult:" + finalResult.get());
}
private static int firstMethod(int num) {
return num;
}
private static int secondMethod(int firstMethodResult) {
return 2 + firstMethodResult;
}
}
在上述示例代码中:
- 首先,通过 CompletableFuture.supplyAsync 方法,返回一个新的
CompletableFuture
,该CompletableFuture
是在 ForkJoinPool.commonPool() 中异步完成的,并且将结果值赋值给 Supplier; - 接着,获取 firstMethod() 的结果并使用 thenApply() 方法,将其传递给另一个调用 secondMethod();
- 最后,main() 的输出如下:
finalResult:3
通过运行结果可以看出,结果符合预期而且 CompletableFuture
成功的把 firstMethod() 和 secondMethod() 两个异步过程整合。
CompletableFuture如何提升性能?
在本节中,我们将通过设定的场景来验证 CompletableFuture
是如何提升性能的。
场景设定
- 定义一个 Transaction 类,并且包含 id 属性;
- 定义一个 TransactionExecutor类,包含 transactionId 属性和静态方法 doTransaction(Transaction transaction),方法接收 Transaction对象;
- 在 doTransaction() 方法中,通过线程睡眠 1秒来模拟业务耗时;
场景涉及的代码如下:
public class TransactionExecutor {
private final String transactionId;
public TransactionExecutor(String transactionId) {
this.transactionId = transactionId;
}
public static TransactionExecutor doTransaction(Transaction transaction) {
Thread.sleep(1000L); // 通过线程睡眠来模拟真实的业务耗时
return new TransactionExecutor("transactionId: " + transaction.getId());
}
@Override
public String toString() {
return "TransactionExecutor{" +
"transactionId='" + transactionId + ''' +
'}';
}
}
public class Transaction {
private String id;
public Transaction(String id) {
this.id = id;
}
// getter setter方法
}
接着,我们将通过以下 3种方式来执行给定的场景:
- 同步实现
- 并行流实现
- CompletableFuture 实现
同步实现
public class Demo {
public static void main(String[] args) {
long start = System.currentTimeMillis();
var executor = Stream.of(
new Transaction("1"),
new Transaction("2"),
new Transaction("3"))
.map(TransactionExecutor::doTransaction)
.collect(Collectors.toList());
long end = System.currentTimeMillis();
System.out.printf("The operation take %s ms%n", end - start);
System.out.println("TransactionExecutor are: " + executor);
}
}
运行代码结果如下:
The operation take 3087 ms
TransactionExecutor are: [TransactionExecutor{transactionId='transactionId: 1'},
TransactionExecutor{transactionId='transactionId: 2'},
TransactionExecutor{transactionId='transactionId: 3'}]
从运行结果可以看出:该程序花费了 3 秒多,因为每个事务都是串行执行,并且每个事务消耗 1秒,和预期时间比较吻合。
并行流实现
我们使用 Lambda 的 parallel并行流来实现,示例代码如下:
public class Demo {
public static void main(String[] args) {
long start = System.currentTimeMillis();
var executor = Stream.of(
new Transaction("1"),
new Transaction("2"),
new Transaction("3"))
.parallel()
.map(TransactionExecutor::doTransaction)
.collect(Collectors.toList());
long end = System.currentTimeMillis();
System.out.printf("The operation took %s ms%n", end - start);
System.out.println("TransactionExecutor are: " + executor);
}
}
通过运行结果,我们发现它合同步方法的差异是巨大的!应用程序运行速度几乎快了三倍,如下所示:
The operation took 1007 ms
TransactionExecutor are: [TransactionExecutor{transactionId='transactionId: 1'},
TransactionExecutor{transactionId='transactionId: 2'},
TransactionExecutor{transactionId='transactionId: 3'}]
CompletableFuture实现
现在将重构我们的客户端应用程序以利用 CompletableFuture:
public class Demo {
public static void main(String[] args) {
Executor executor = Executors.newFixedThreadPool(10);
long start = System.currentTimeMillis();
var future = Stream.of(
new Transaction("1"),
new Transaction("2"),
new Transaction("3"),
new Transaction("4"),
new Transaction("5"),
new Transaction("6"),
new Transaction("7"),
new Transaction("8"),
new Transaction("9"),
new Transaction("10")
).map(transaction -> CompletableFuture.supplyAsync(
() -> TransactionExecutor::doTransaction(transaction), executor)
).collect(toList());
var categories = future.stream()
.map(CompletableFuture::join)
.collect(toList());
long end = System.currentTimeMillis();
System.out.printf("The operation took %s ms%n", end - start);
System.out.println("TransactionExecutor are: " + categories);
}
}
使用 CompletableFuture
,对于 10 个线程,执行的时间也在 1 秒左右,性能简直杆杆的,运行结果如下:
The operation took 1040 ms
TransactionExecutor are:[TransactionExecutor{transactionId='transactionId: 1'},
TransactionExecutor{transactionId='transactionId: 2'},
TransactionExecutor{transactionId='transactionId: 3'},
TransactionExecutor{transactionId='transactionId: 4'},
TransactionExecutor{transactionId='transactionId: 5'},
TransactionExecutor{transactionId='transactionId: 6'},
TransactionExecutor{transactionId='transactionId: 7'},
TransactionExecutor{transactionId='transactionId: 8'},
TransactionExecutor{transactionId='transactionId: 9'},
TransactionExecutor{transactionId='transactionId: 10'}]
CompletableFuture工作流程
CompletableFuture
的执行流程可以分为以下 3个阶段:
- 创建阶段: 创建 CompletableFuture 实例,并提交异步任务。
- 执行阶段: 异步任务在线程池中执行,并在完成后触发相关的回调函数。
- 完成阶段: 异步任务完成,CompletableFuture 的状态更新,并执行后续操作。
创建阶段
CompletableFuture
的创建可以通过以下几种方法:
- CompletableFuture.supplyAsync(Supplier supplier): 提交一个返回结果的异步任务;
- CompletableFuture.runAsync(Runnable runnable): 提交一个不返回结果的异步任务;
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 执行任务
return "Result";
});
执行阶段
异步任务提交后,会在线程池中执行。默认情况下,CompletableFuture 使用 ForkJoinPool.commonPool() 作为它的线程池。如果需要自定义线程池,可以使用以下方法:
- CompletableFuture.supplyAsync(Supplier supplier, Executor executor);
- CompletableFuture.runAsync(Runnable runnable, Executor executor);
如下代码:
Executor executor = Executors.newFixedThreadPool(3);
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 执行任务
return "Result";
}, executor);
完成阶段
任务完成后,CompletableFuture 的状态将从未完成(未完成或未异常)更新为已完成(正常完成或异常完成)。你可以定义一系列操作来处理任务的结果或异常。
正常完成
任务正常完成后,可以使用以下方法进行链式调用:
- thenApply(Function<T, U> fn): 对任务结果进行转换;
- thenAccept(Consumer action): 对任务结果进行消费;
- thenRun(Runnable action): 执行一个额外的任务,不依赖于任务结果;
如下代码:
future.thenApply(result -> {
return "Processed " + result;
}).thenAccept(processedResult -> {
System.out.println(processedResult);
});
异常完成
如果任务执行过程中抛出了异常,可以使用以下方法处理异常:
- exceptionally(Function<Throwable, T> fn): 在任务抛出异常时提供一个默认值;
- handle(BiFunction<T, Throwable, U> fn): 任务完成后处理结果或异常;
如下代码:
future.exceptionally(ex -> {
System.out.println("Exception: " + ex.getMessage());
return "Default Result";
});
合并多个CompletableFuture
可以合并多个CompletableFuture
,等待它们全部完成或其中任意一个完成:
- allOf(CompletableFuture<?>... cfs): 等待所有 CompletableFuture 完成;
- anyOf(CompletableFuture<?>... cfs): 等待任意一个 CompletableFuture 完成。
如下代码:
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Result 1");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "Result 2");
CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(future1, future2);
combinedFuture.thenRun(() -> System.out.println("All tasks completed."));
CompletableFuture
的执行流程分为创建、执行和完成三个阶段。通过创建异步任务、在线程池中执行任务,并在任务完成后进行结果处理,可以实现复杂的异步编程逻辑。
适用场景
CompletableFuture
适合在处理异步编程和并发任务时使用。以下是一些适合使用 CompletableFuture 的典型场景:
异步 I/O 操作
在进行 I/O 操作时,例如文件读取、数据库访问或网络请求,使用 CompletableFuture 可以使这些操作异步执行,从而避免阻塞主线程,提高应用程序的响应性。
并行处理任务
当需要并行处理多个独立任务时,使用 CompletableFuture 可以有效利用多核 CPU,提高计算效率。
流水线处理
当有一系列依赖的操作需要按顺序执行时,CompletableFuture 可以使这些操作异步执行,形成处理流水线,从而提高处理效率。
事件驱动的异步处理
在事件驱动的系统中,例如 GUI 应用或服务器请求处理,CompletableFuture 可以在事件发生时异步处理任务。
异常处理和回退机制
CompletableFuture 提供了灵活的异常处理机制,可以在异步任务发生异常时执行回退操作或提供默认值。
并发任务的组合
CompletableFuture 可以组合多个并发任务的结果,例如使用 allOf 和 anyOf 方法。
总结
在本文中,我们学习了如何在 Java 中使用 Future
接口以及它的局限性,同时,我们还学习了如何使用 CompletableFuture
类来克服 Future
的这些限制。
接着,我们通过一个业务场景演示,通过同步执行,并发执行,CompletableFuture
执行来比较它们的执行效率。
最后,CompletableFuture
适合在异步编程、并发任务、非阻塞 I/O、事件驱动处理、异常处理、任务组合等场景中使用。它提供了丰富的 API 来处理异步操作,使代码更加简洁、可读和高效。通过使用 CompletableFuture,开发者可以更好地利用系统资源,提高应用程序的性能和响应性。
学习交流
如果你觉得文章有帮助,请帮忙点个赞呗,或者关注公众号:猿java,持续输出硬核文章。
转载自:https://juejin.cn/post/7390593301509849100