likes
comments
collection
share

11.CompletableFuture异步编排

作者站长头像
站长
· 阅读数 16

思维导图:点击查看思维导图 文章图片:点击查看图片

CompletableFuture 异步编排

Future 是 Java 5 添加的类,用来描述一个异步计算的结果。使用isDone方法检查计算是否完成,或者使用get阻塞住调用线程,直到计算完成返回结果,你也可以使用cancel 方法停止任务的执行。

在 Java 8 中, 新增加了一个类: CompletableFuture,它实现了 Fucture 接口,提供了非常强大的 Future 的扩展功能,简化异步编程的复杂性,提供了函数式编程的能力,可以 通过回调的方式处理计算结果,并且提供了转换和组合 CompletableFuture 的方法。 因为实现了 Future 接口,所以也可以和 Future 一样通过阻塞等方法去获取结果,但违背了初衷,并不推荐使用。

一、创建异步对象

CompletableFuture 提供了四个静态方法来创建一个异步操作

方法备注
runAsync(Runnable runnable)没有返回值,使用默认线程池
runAsync(Runnable runnable, Executor executor)没有返回值,使用传入的线程池
supplyAsync(Supplier supplier)有返回值,使用默认线程池
supplyAsync(Supplier supplier, Executor executor)有返回值,使用传入的线程池

简单使用:

 CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
    /** * 处理逻辑 * **/
 }, executor);

二、计算结果完成后的回调方法

下面方法都是计算结果结束后才会触发

方法备注
whenComplete(BiConsumer<? super T,? super Throwable> action)执行成功后回调,能感知异常,执行当前任务的线程执行继续执行 whenComplete 的任务
whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)执行成功后回调,能感知异常,并把任务继续提交给线程池来进行执行(可能是其他线程)
whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)执行成功后回调,能感知异常,并把任务继续提交给传进来的线程池执行
exceptionally(Function<Throwable,? extends T> fn)能感知异常,同时返回默认值

handle 方法执行完后的处理(无论成功失败)

方法备注
handle( BiFunction<? super T, Throwable, ? extends U> fn)方法完成后处理,可以修改返回结果,本线程处理
handleAsync( BiFunction<? super T, Throwable, ? extends U> fn, Executor executor)方法完成后处理,可以修改返回结果,交给传进来的线程池处理
handleAsync( BiFunction<? super T, Throwable, ? extends U> fn)方法完成后处理,可以修改返回结果,交给当前线程池处理

三、线程串行化方法

thenRun : 不能获取上一步的执行结果,无返回值

public CompletableFuture<Void> thenRunAsync(Runnable action);
public CompletableFuture<Void> thenRunAsync(Runnable action,Executor executor) ```

thenAccept : 可以接收上一步的执行结果,无返回值

public CompletableFuture<Void>  thenAcceptAsync(Consumer<? super T> action)
public CompletableFuture<Void>  thenAcceptAsync(Consumer<? super T> action, Executor executor)

thenApply : 可以接收上一步的执行结果,有返回值

public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)

T:上一个任务返回结果的类型 U:当前任务的返回值类型

四、任务合并

合并两个任务 - 两个任务都要完成

和之前的方法一样,都有Async、传参 executor 版本,含义一样,不在赘述,这三个方法都是等待两个线程都执行完成后,执行下一步操作,区别如下

thenAcceptBoth 接收AB任务的返回值,后续处理没有返回值

 public <U> CompletableFuture<Void> thenAcceptBoth(
     CompletionStage<? extends U> other,
     BiConsumer<? super T, ? super U> action) {
     return biAcceptStage(null, other, action);
}

thenCombine 接收AB任务的返回值,后续处理有返回值

public <U,V> CompletableFuture<V> thenCombine(
    CompletionStage<? extends U> other,
    BiFunction<? super T,? super U,? extends V> fn) {
    return biApplyStage(null, other, fn);
}

runAfterBoth 不接收AB任务的返回值,后续处理也没有返回值

public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,
                                                Runnable action) {
    return biRunStage(null, other, action);
}

使用示例:后续方法使用也是如此

public static void main(String[] args) throws ExecutionException, InterruptedException {
    CompletableFuture<Object> f1 = CompletableFuture.supplyAsync(() -> {
        System.out.println("任务1结束");
        return 1;
    },executor);
    CompletableFuture<Object> f2 = CompletableFuture.supplyAsync(() -> {
        System.out.println("任务2开始");
        try {
            TimeUnit.SECONDS.sleep(3);
            System.out.println("任务2结束");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return 2;
    },executor);
​
    f1.thenAcceptBothAsync(f2,(s1,s2)->{
        System.out.println("任务三开始:@f1:"+ s1 + " @f2:" + s2);
    },executor);
}
/**--------------打印结果-----------------**/
/**任务1结束**/
/**任务2开始**/
/**任务2结束**/
/**任务三开始:@f1:1 @f2:2**/

合并两个任务 - 两个任务完成一个

和之前的方法一样,都有Async、传参 executor 版本,含义一样,不在赘述,这三个方法都是两个线程中任意一个线程执行完成,区别如下

applyToEither 接收前置任务最快执行完成的结果,后续处理无返回值

public <U> CompletableFuture<U> applyToEither(
    CompletionStage<? extends T> other, Function<? super T, U> fn) {
    return orApplyStage(null, other, fn);
}

acceptEither 接收前置任务最快执行完成的结果,后续处理有返回值

public CompletableFuture<Void> acceptEither(
    CompletionStage<? extends T> other, Consumer<? super T> action) {
    return orAcceptStage(null, other, action);
}

runAfterEither 不接收前置任务的结果,后续处理无返回值

public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,
                                                  Runnable action) {
    return orRunStage(null, other, action);
}

#### 合并多个任务

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
  • allOf等待所有线程执行完成没有返回值,没有内置回调函数
  • anyOf等待任意一个线程执行完成返回最快执行完成线程的结果,没有内置回调函数

使用实例:

public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Object> f1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("任务1结束");
            return 1;
        },executor);
        CompletableFuture<Object> f2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("任务2开始");
            try {
                TimeUnit.SECONDS.sleep(3);
                System.out.println("任务2结束");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 2;
        },executor);
        CompletableFuture<Object> f3 = CompletableFuture.supplyAsync(() -> {
            System.out.println("任务3结束");
            return 3;
        },executor);
​
        CompletableFuture<Void> cfa = CompletableFuture.allOf(f1, f2, f3);
        cfa.get();// 或者join
​
        System.out.println("结束 如果不调get这块会先走完");
}