likes
comments
collection
share

Java 异步编程的完美利器:CompletableFuture 指南

作者站长头像
站长
· 阅读数 13

Future获取异步执行结果

Java线程详解 万字探索线程池:优化并发任务的利器

之前我们详细探索了线程池,在上一篇文章中,我们仅仅介绍了 ThreadPoolExecutor 的 void execute(Runnable command) 方法,利用这个方法虽然可以提交任务,但是却没有办法获取任务的执行结果(execute() 方法没有返回值)。而很多场景下,我们又都是需要获取任务的执行结果的。

Future介绍

Java 通过 ThreadPoolExecutor 提供的 3 个 submit() 方法和 1 个 FutureTask 工具类来支持获得任务执行结果的需求。下面我们先来介绍这 3 个 submit() 方法,这 3 个方法的方法签名如下。


// 提交Runnable任务
Future<?> submit(Runnable task);
// 提交Callable任务
<T> Future<T> submit(Callable<T> task);
// 提交Runnable任务及结果引用  
<T> Future<T> submit(Runnable task, T result);

我们发现它们的返回值都是 Future 接口。

Java 异步编程的完美利器:CompletableFuture 指南 Future 接口有 5 个方法:

  • 取消任务的方法 cancel()
  • 判断任务是否已取消的方法 isCancelled()
  • 判断任务是否已结束的方法 isDone()
  • 获得任务执行结果的 get() 和 get(timeout, unit),其中最后一个 get(timeout, unit) 支持超时机制。

下面我们简单看下Future的例子:

public class FutureExample {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(2);

        Future<Integer> future = executor.submit(() -> {
            // 模拟耗时计算
            Thread.sleep(2000);
            return 2 + 3;
        });
        System.out.println("异步计算中...");
        // 阻塞等待计算结果
        int result = future.get();

        System.out.println("计算结果: " + result);
        executor.shutdown();
    }
}

FutureTask

FutureTask是一个实现了RunnableFuture接口的类,它既可以作为Runnable对象传递给线程执行,也可以作为Future对象获取任务的结果。因此,我们可以通过FutureTaskCallable任务转化为可执行的异步任务,并在需要时获取任务的结果。

Java 异步编程的完美利器:CompletableFuture 指南

下面是一个FutureTask的例子:

public class FutureTaskExample {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        Callable<Integer> callable = () -> {
            // 模拟耗时计算
            Thread.sleep(2000);
            return 2 + 3;
        };

        FutureTask<Integer> futureTask = new FutureTask<>(callable);

        // 创建线程执行任务
        Thread thread = new Thread(futureTask);
        thread.start();

        System.out.println("异步计算中...");

        // 阻塞等待计算结果
        int result = futureTask.get();

        System.out.println("计算结果: " + result);
    }
}

FutureTask也可以直接作为ExecutorService的参数进行提交,以便执行任务,而无需手动创建线程。这样可以更方便地管理线程池和异步任务。

Future与FutureTask的不足

尽管Future与FutureTask在Java中提供了一种基本的异步编程方式,但它也存在一些不足之处:

  1. 缺乏异步回调机制:FutureTaskFuture接口都没有直接提供异步回调的机制。在某些场景下,我们可能希望在任务完成后立即执行一些操作,而不是阻塞等待结果。需要手动编写额外的代码来实现异步回调逻辑,增加了代码的复杂性。
  2. 无法手动完成或取消任务:FutureTaskFuture都没有提供主动完成或取消任务的方法。一旦任务提交,就无法在外部控制其执行状态。这可能会导致无法优雅地管理任务的生命周期和资源。
  3. 阻塞式获取结果:在使用get()方法获取结果时,如果任务还未完成,调用线程会被阻塞,无法进行其他操作。这种阻塞式获取结果的方式可能导致整体性能下降,特别是在多个异步任务同时执行时。
  4. 缺乏异常处理灵活性:FutureFutureTask在处理任务执行过程中的异常时,比较简单且不够灵活。通过捕获ExecutionException来获取异常信息,可能需要额外的处理逻辑来处理不同类型的异常情况。

为了解决这些问题,Guava提供了ListenableFuture,Java 8引入了CompletableFuture,它们都提供了更丰富的功能和灵活性,如异步回调、异常处理、任务组合等。

CompletableFuture使用

CompletableFutur介绍

CompletableFuture提供下面几种方法创建任务,它们之间的区别是:Runnable 接口的 run() 方法没有返回值,而 Supplier 接口的 get() 方法是有返回值的。

//使用内置线程ForkJoinPool.commonPool(),根据supplier构建执行任务
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {  
    return asyncSupplyStage(asyncPool, supplier);  
}  

//指定自定义线程,根据supplier构建执行任务
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,  
Executor executor) {  
    return asyncSupplyStage(screenExecutor(executor), supplier);  
}  

//使用内置线程ForkJoinPool.commonPool(),根据runnable构建执行任务
public static CompletableFuture<Void> runAsync(Runnable runnable) {  
    return asyncRunStage(asyncPool, runnable);  
} 

//指定自定义线程,根据runnable构建执行任务
public static CompletableFuture<Void> runAsync(Runnable runnable,  
Executor executor) {  
    return asyncRunStage(screenExecutor(executor), runnable); 
}

默认情况下 CompletableFuture 会使用公共的 ForkJoinPool 线程池,这个线程池默认创建的线程数是 CPU 的核数(也可以通过 JVM option:-Djava.util.concurrent.ForkJoinPool.common.parallelism 来设置 ForkJoinPool 线程池的线程数)。

如果所有 CompletableFuture 共享一个线程池,那么一旦有任务执行一些很慢的 I/O 操作,就会导致线程池中所有线程都阻塞在 I/O 操作上,从而造成线程饥饿,进而影响整个系统的性能。所以我们要根据不同的业务类型创建不同的线程池,以避免互相干扰。

下面是一个CompletableFuture获取异步结果的例子:

public class CompletableFutureExample {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            // 模拟耗时计算
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 2 + 3;
        });

        System.out.println("异步计算中...");

        future.thenAccept(result -> System.out.println("计算结果: " + result));

        // 阻塞等待任务完成
        future.get();
    }
}

时序依赖关系

通常任务的依赖分为以下几种:

  1. 串行执行
Start
Future1
Future2
Future3
End
  1. 所有都执行完:
Start
Future1
Future2
Future3
Converge
End
  1. 任意完成:
Start
Future1
Future2
Future3
Converge
End

CompletableFuture的CompletionStage 接口可以清晰地描述任务之间的这种时序依赖关系。下面我们看下CompletionStage 接口如何描述串行关系、AND 聚合关系、OR 聚合关系以及异常处理。

CompletionStage编排

串行执行

描述串行关系CompletionStage 接口里面描述串行关系,主要是 thenApply、thenAccept、thenRun 和 thenCompose 这四种类型的接口。

  • thenApply 系列函数里参数 fn 的类型是接口 Function,这个接口里与 CompletionStage 相关的方法是 R apply(T t),这个方法既能接收参数也支持返回值,所以 thenApply 系列方法返回的是CompletionStage。
  • thenAccept 系列方法里参数 consumer 的类型是接口Consumer,这个接口里与 CompletionStage 相关的方法是 void accept(T t),这个方法虽然支持参数,但却不支持回值,所以 thenAccept 系列方法返回的是CompletionStage。
  • thenRun 系列方法里 action 的参数是 Runnable,所以 action 既不能接收参数也不支持返回值,所以 thenRun 系列方法返回的也是CompletionStage。这些方法里面 Async 代表的是异步执行 fn、consumer 或者 action。
  • thenCompose 方法,这个系列的方法会新创建出一个子流程,最终结果和 thenApply 系列是相同的。
CompletionStage thenApply(fn);
CompletionStage thenApplyAsync(fn);
CompletionStage thenAccept(consumer);
CompletionStage thenAcceptAsync(consumer);
CompletionStage thenRun(action);
CompletionStage thenRunAsync(action);
CompletionStage thenCompose(fn);
CompletionStage thenComposeAsync(fn);

通过下面的示例代码,我们可以看一下 thenApply() 方法是如何使用的。首先通过 supplyAsync() 启动一个异步流程,之后是两个串行操作,虽然这是一个异步流程,但任务①②③却是串行执行的,②依赖①的执行结果,③依赖②的执行结果。


CompletableFuture<String> f0 = 
  CompletableFuture.supplyAsync(
    () -> "Hello World")      //①
  .thenApply(s -> s + " 你好!")  //②
  .thenApply(String::toUpperCase);//③

System.out.println(f0.join());

AND汇聚关系

CompletionStage 接口里面描述 AND 汇聚关系,主要是 thenCombine、thenAcceptBoth 和 runAfterBoth 系列的接口,这些接口的区别也是源自 fn、consumer、action 这三个核心参数不同。

CompletionStage<R> thenCombine(other, fn);
CompletionStage<R> thenCombineAsync(other, fn);
CompletionStage<Void> thenAcceptBoth(other, consumer);
CompletionStage<Void> thenAcceptBothAsync(other, consumer);
CompletionStage<Void> runAfterBoth(other, action);
CompletionStage<Void> runAfterBothAsync(other, action);

OR 汇聚关系

CompletionStage 接口里面描述 OR 汇聚关系,主要是 applyToEither、acceptEither 和 runAfterEither 系列的接口,这些接口的区别也是源自 fn、consumer、action 这三个核心参数不同。

CompletionStage applyToEither(other, fn);
CompletionStage applyToEitherAsync(other, fn);
CompletionStage acceptEither(other, consumer);
CompletionStage acceptEitherAsync(other, consumer);
CompletionStage runAfterEither(other, action);
CompletionStage runAfterEitherAsync(other, action);

下面的示例代码展示了如何使用 applyToEither() 方法来描述一个 OR 汇聚关系。


CompletableFuture<String> f1 = 
  CompletableFuture.supplyAsync(()->{
    int t = getRandom(5, 10);
    System.sleep(t, TimeUnit.SECONDS);
    return String.valueOf(t);
});

CompletableFuture<String> f2 = 
  CompletableFuture.supplyAsync(()->{
    int t = getRandom(5, 10);
    System.sleep(t, TimeUnit.SECONDS);
    return String.valueOf(t);
});

CompletableFuture<String> f3 = 
  f1.applyToEither(f2,s -> s);

System.out.println(f3.join());

异常处理

异常处理虽然上面我们提到的 fn、consumer、action 它们的核心方法都不允许抛出可检查异常,但是却无法限制它们抛出运行时异常。正常业务代码中,我们可以使用 try{}catch{}来捕获并处理异常,那在异步编程里面,异常该如何处理呢?

CompletableFuture f0 = CompletableFuture.supplyAsync(()->(3/0)) 
    .thenApply(r->r*10);
System.out.println(f0.join());

CompletionStage 接口给我们提供的方案非常简单,比 try{}catch{}还要简单,下面是相关的方法,使用这些方法进行异常处理和串行操作是一样的,都支持链式编程方式。

CompletionStage exceptionally(fn);
CompletionStage whenComplete(consumer);
CompletionStage whenCompleteAsync(consumer);
CompletionStage handle(fn);
CompletionStage handleAsync(fn);

下面的示例代码展示了如何使用 exceptionally() 方法来处理异常,exceptionally() 的使用非常类似于 try{}catch{}中的 catch{},但是由于支持链式编程方式,所以相对更简单。既然有 try{}catch{},那就一定还有 try{}finally{},whenComplete() 和 handle() 系列方法就类似于 try{}finally{}中的 finally{},无论是否发生异常都会执行 whenComplete() 中的回调函数 consumer 和 handle() 中的回调函数 fn。whenComplete() 和 handle() 的区别在于 whenComplete() 不支持返回结果,而 handle() 是支持返回结果的。

CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
    // 模拟一个可能抛出异常的异步操作
    if (Math.random() < 0.5) {
        throw new RuntimeException("Something went wrong");
    }
    return 10;
});

CompletableFuture<Integer> handleException = future.exceptionally(ex -> {
    // 异常处理逻辑
    System.out.println("Exception occurred: " + ex.getMessage());
    return 0; // 返回默认值或处理后的结果
});

handleException.thenAccept(result -> {
    // 在最终结果完成后进行处理
    System.out.println("Final result: " + result);
});

获取结果

CompletableFuture提供了下面几种常用的方法获取结果:

public  T   get()
public  T   get(long timeout, TimeUnit unit)
public  T   getNow(T valueIfAbsent)
public  T   join()
public CompletableFuture<T> allOf()
public CompletableFuture<T> anyOf()
public CompletableFuture<T> whenComplete()
public <U> CompletableFuture<U> handle()
  1. get()get()方法是最常用的获取CompletableFuture结果的方法之一。它会阻塞当前线程,直到异步操作完成并返回结果,或者抛出异常。如果异步操作抛出异常,get()方法会将异常包装在ExecutionException中抛出。这个方法可以用于同步地获取结果。
  2. join()join()方法与get()方法类似,也会阻塞当前线程,直到异步操作完成并返回结果,或者抛出异常。不同之处在于,join()方法不会抛出ExecutionException,而是直接将异常抛出。这个方法可以用于同步地获取结果。
  3. whenComplete(BiConsumer<? super T,? super Throwable> action)whenComplete()方法允许注册一个回调函数,在异步操作完成后执行该函数。回调函数接收异步操作的结果(如果成功完成)或异常(如果发生异常),并可以对结果进行进一步处理或执行其他操作。这个方法不会阻塞线程,异步操作完成后立即执行回调函数。
  4. handle(BiFunction<? super T, Throwable, ? extends U> fn)handle()方法类似于whenComplete(),也允许注册一个回调函数,在异步操作完成后执行该函数。不同之处在于,回调函数的返回值会被包装在新的CompletableFuture中返回,而不是忽略返回值。这个方法可以用于对结果进行处理或转换,并返回包装后的新CompletableFuture。
  5. allOf():就是所有任务都完成时触发。allOf()可以配合get()一起使用。
  6. anyOf():等待任意一个完成。anyOf()方法返回一个新的CompletableFuture<Object>对象,该对象在任意一个输入的CompletableFuture完成后完成,并持有该完成的结果。

下面是allOf()的一个例子

CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 10);
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<Double> future3 = CompletableFuture.supplyAsync(() -> 3.14);

CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2, future3);

allFutures.thenRun(() -> {
    System.out.println("All futures completed");
    Integer result1 = future1.join();
    String result2 = future2.join();
    Double result3 = future3.join();
    System.out.println("Result 1: " + result1);
    System.out.println("Result 2: " + result2);
    System.out.println("Result 3: " + result3);
});