11.CompletableFuture异步编排
CompletableFuture 异步编排
Future 是 Java 5 添加的类,用来描述一个异步计算的结果。使用isDone
方法检查计算是否完成,或者使用get
阻塞住调用线程,直到计算完成返回结果,你也可以使用cancel
方法停止任务的执行。
在 Java 8 中, 新增加了一个类: CompletableFuture,它实现了 Fucture 接口,提供了非常强大的 Future 的扩展功能,简化异步编程的复杂性,提供了函数式编程的能力,可以 通过回调的方式处理计算结果,并且提供了转换和组合 CompletableFuture 的方法。 因为实现了 Future 接口,所以也可以和 Future 一样通过阻塞等方法去获取结果,但违背了初衷,并不推荐使用。
一、创建异步对象
CompletableFuture 提供了四个静态方法来创建一个异步操作
方法 | 备注 |
---|---|
runAsync(Runnable runnable) | 没有返回值,使用默认线程池 |
runAsync(Runnable runnable, Executor executor) | 没有返回值,使用传入的线程池 |
supplyAsync(Supplier supplier) | 有返回值,使用默认线程池 |
supplyAsync(Supplier supplier, Executor executor) | 有返回值,使用传入的线程池 |
简单使用:
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
/** * 处理逻辑 * **/
}, executor);
二、计算结果完成后的回调方法
下面方法都是计算结果结束后才会触发
方法 | 备注 |
---|---|
whenComplete(BiConsumer<? super T,? super Throwable> action) | 执行成功后回调,能感知异常,执行当前任务的线程执行继续执行 whenComplete 的任务 |
whenCompleteAsync(BiConsumer<? super T,? super Throwable> action) | 执行成功后回调,能感知异常,并把任务继续提交给线程池来进行执行(可能是其他线程) |
whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor) | 执行成功后回调,能感知异常,并把任务继续提交给传进来的线程池执行 |
exceptionally(Function<Throwable,? extends T> fn) | 能感知异常,同时返回默认值 |
handle 方法执行完后的处理(无论成功失败)
方法 | 备注 |
---|---|
handle( BiFunction<? super T, Throwable, ? extends U> fn) | 方法完成后处理,可以修改返回结果,本线程处理 |
handleAsync( BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) | 方法完成后处理,可以修改返回结果,交给传进来的线程池处理 |
handleAsync( BiFunction<? super T, Throwable, ? extends U> fn) | 方法完成后处理,可以修改返回结果,交给当前线程池处理 |
三、线程串行化方法
thenRun : 不能获取上一步的执行结果,无返回值
public CompletableFuture<Void> thenRunAsync(Runnable action);
public CompletableFuture<Void> thenRunAsync(Runnable action,Executor executor) ```
thenAccept : 可以接收上一步的执行结果,无返回值
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)
thenApply : 可以接收上一步的执行结果,有返回值
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
T:上一个任务返回结果的类型 U:当前任务的返回值类型
四、任务合并
合并两个任务 - 两个任务都要完成
和之前的方法一样,都有Async、传参 executor 版本,含义一样,不在赘述,这三个方法都是等待两个线程都执行完成后,执行下一步操作,区别如下
thenAcceptBoth 接收AB任务的返回值,后续处理没有返回值
public <U> CompletableFuture<Void> thenAcceptBoth(
CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action) {
return biAcceptStage(null, other, action);
}
thenCombine 接收AB任务的返回值,后续处理有返回值
public <U,V> CompletableFuture<V> thenCombine(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn) {
return biApplyStage(null, other, fn);
}
runAfterBoth 不接收AB任务的返回值,后续处理也没有返回值
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,
Runnable action) {
return biRunStage(null, other, action);
}
使用示例:后续方法使用也是如此
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Object> f1 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务1结束");
return 1;
},executor);
CompletableFuture<Object> f2 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务2开始");
try {
TimeUnit.SECONDS.sleep(3);
System.out.println("任务2结束");
} catch (InterruptedException e) {
e.printStackTrace();
}
return 2;
},executor);
f1.thenAcceptBothAsync(f2,(s1,s2)->{
System.out.println("任务三开始:@f1:"+ s1 + " @f2:" + s2);
},executor);
}
/**--------------打印结果-----------------**/
/**任务1结束**/
/**任务2开始**/
/**任务2结束**/
/**任务三开始:@f1:1 @f2:2**/
合并两个任务 - 两个任务完成一个
和之前的方法一样,都有Async、传参 executor 版本,含义一样,不在赘述,这三个方法都是两个线程中任意一个线程执行完成,区别如下
applyToEither 接收前置任务最快执行完成的结果,后续处理无返回值
public <U> CompletableFuture<U> applyToEither(
CompletionStage<? extends T> other, Function<? super T, U> fn) {
return orApplyStage(null, other, fn);
}
acceptEither 接收前置任务最快执行完成的结果,后续处理有返回值
public CompletableFuture<Void> acceptEither(
CompletionStage<? extends T> other, Consumer<? super T> action) {
return orAcceptStage(null, other, action);
}
runAfterEither 不接收前置任务的结果,后续处理无返回值
public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,
Runnable action) {
return orRunStage(null, other, action);
}
#### 合并多个任务
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
- allOf:等待所有线程执行完成,没有返回值,没有内置回调函数
- anyOf:等待任意一个线程执行完成,返回最快执行完成线程的结果,没有内置回调函数
使用实例:
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Object> f1 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务1结束");
return 1;
},executor);
CompletableFuture<Object> f2 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务2开始");
try {
TimeUnit.SECONDS.sleep(3);
System.out.println("任务2结束");
} catch (InterruptedException e) {
e.printStackTrace();
}
return 2;
},executor);
CompletableFuture<Object> f3 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务3结束");
return 3;
},executor);
CompletableFuture<Void> cfa = CompletableFuture.allOf(f1, f2, f3);
cfa.get();// 或者join
System.out.println("结束 如果不调get这块会先走完");
}
转载自:https://juejin.cn/post/7126836392034271268