CompletableFuture主要方法理解
CompletableFuture
继承了java.util.concurrent.Future
,所以也具备了Future
的所有特性。并且基于JDK1.8
的流式编程已经Lambda
表达式等实现一元操作符、异步性以及事件驱动编程模型,可以用来实现多线程的串行关系,并行关系,聚合关系。它的灵活性和更强大的功能是Future
无法比拟的。
一、创建方式
CompletableFuture<> future = new CompletableFuture<>();
默认使用ForkJoinPool.commonPool()
, commonPool
是一个会被很多任务共享的线程池,比如同一JVM
上的所有CompletableFuture
、并行Stream
都将共享commonPool
,commonPool
设计时的目标场景是运行非阻塞的CPU
密集型任务,为最大利用CPU
,其线程数默认为CPU数量-1
。
无返回值创建方法
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
示例:
public static void runAsync() throws InterruptedException, ExecutionException {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
System.out.println("run end...");
});
future.get();
}
有返回值创建方法
public static CompletableFuture<Void> supplyAsync(Runnable runnable)
public static CompletableFuture<Void> supplyAsync(Runnable runnable, Executor executor)
示例:
public static void supplyAsync() throws InterruptedException, ExecutionException {
CompletableFuture<Long> future = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
System.out.println("run end...");
return System.currentTimeMillis();
});
long time = future.get();
System.out.println("time = " + time);
}
二、计算结果完成时回调
whenComplete
说明:当CompletableFuture
的计算结果完成,或者抛出异常的时候,可以执行特点的Action
。
public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor)
public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)
- 可以看到
Action
的类型是BiConsumer<? super T, ? super Throwable>
它可以处理正常的计算结果,或者异常情况。 whenComplete
和whenComplete
的区别:whenComplete
:是执行当前任务的线程继续执行whenComplete
whenCompleteAsync
:是执行把whenCompleteAsync
这个任务继续提交给线程池来进行执行 示例:
public static void whenComplete() throws InterruptedException {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
if (new Random().nextInt() % 2 >= 0) {
int i = 12 / 0;
}
System.out.println("run end..." + Thread.currentThread().getName());
});
future.whenComplete((t, u) -> {
System.out.println("执行完成!" + Thread.currentThread().getName());
});
future.exceptionally(e -> {
System.out.println("执行失败!" + Thread.currentThread().getName());
return null;
});
TimeUnit.SECONDS.sleep(2);
}
三、Handle方法
Handle
说明:handle
是执行任务完成时对结果的处理。handle方法和thenApply方法处理方式基本一样。不同的是在任务完成后再执行,还可以处理异常任务。
public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor);
示例:
public static void handle() throws InterruptedException, ExecutionException {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
int i = 10 / 0;
return new Random().nextInt(10);
}).handle((a, b) -> {
int result = -1;
if (b == null) {
result = a * 2;
} else {
System.out.println(b.getMessage());
}
return result;
});
System.out.println(future.get());
}
从示例中可以看出,在handle
中可以根据任务是否有异常来进行做相应的后续处理操作。而thenApply
方法,如果上个任务出现错误,则不会执行thenApply
方法。
四、CompletableFuture的组合
thenApply
说明:当一个线程依赖另一个线程时,可以使用thenApply
方法来把这两个线程串行化
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
Function<? super T, ? extends U>
,T
:上一个任务返回结果的类型,U
:当前任务的返回值类型;
示例:
private static void thenApply() throws InterruptedException, ExecutionException {
CompletableFuture<Long> future = CompletableFuture.supplyAsync(() -> {
long result = new Random().nextInt(100);
System.out.println("result1=" + result);
return result;
}).thenApply((a) -> {
Long result = a * 5;
System.out.println("result2 = " + result);
return result;
});
long result = future.get();
System.out.println(result);
}
输出结果:
result1=65
result2 = 325
325
thenAccept
说明:接收任务的处理结果,并消费处理,无返回结果
public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor);
示例:
public static void thenAccept() throws InterruptedException, ExecutionException {
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
return new Random().nextInt(10);
}).thenAccept((result) -> {
System.out.println(result);
});
future.get();
}
theRun
说明:跟thenAccept方法不一样的是,不关心任务的处理结果,只要上面的任务执行完成,就开始执行thenAccept。
public CompletionStage<Void> thenRun(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action, Executor executor);
示例:
public static void thenRun() throws InterruptedException, ExecutionException {
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
return new Random().nextInt(10);
}).thenRun(() -> {
System.out.println("thenRun...");
});
future.get();
}
thenCombine合并任务
说明:thenCombine
会把两个CompletionStage
的任务都执行完成后,把两个任务的结果一块交给thenCombine
来处理。
public <U, V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T, ? super U, ? extends V> fn)
public <U, V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T, ? super U, ? extends V> fn)
public <U, V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T, ? super U, ? extends V> fn, Executor executor)
示例:
private static void thenCombine() throws InterruptedException, ExecutionException {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
return "hello";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
return "hello";
});
CompletableFuture<String> result = future1.thenCombine(future2, (e, t) -> {
return e + " " + t;
});
System.out.println(result.get());
}
并行方法
除了串行执行外,多个CompletableFuture
还可以并行执行。
anyOf
只有有任意一个CompletableFuture
结束,就可以做接下来的事情,而无须像allof
那样,等待所有的CompletableFuture
结束。但由于每个CompletableFuture
的返回值类型都可能不同,任意一个,意味着无法判断什么类型,所以anyOf的返回值是CompletableFuture<Object>
类型。
示例
public class Main {
public static void main(String[] args) throws InterruptedException {
CompletableFuture<String> cfQueryFromSina = CompletableFuture.supplyAsync(() -> {
return queryCode("中国石油", "https://finance.sina.com.cn/code/");
});
CompletableFuture<String> cfQueryFrom163 = CompletableFuture.supplyAsync(() -> {
return queryCode("中国石油", "https://money.163.com/code/");
});
// 用anyOf合并为一个新的CompletableFuture
CompletableFuture<Object> cfQuery = CompletableFuture.anyOf(cfQueryFromSina, cfQueryFrom163);
// 两个CompletableFuture执行异步查询
CompletableFuture<Double> cfFetchFromSina = cfQuery.thenApplyAsync((code) -> {
return fetchPrice((String) code, "http://finance.sina.com.cn/price/");
});
CompletableFuture<Double> cfFetchFrom163 = cfQuery.thenApplyAsync((code) -> {
return fetchPrice((String) code, "https://money.163.com/code/");
});
// 用anyOf合并为一个新的CompletableFuture
CompletableFuture<Object> cfFetch = CompletableFuture.anyOf(cfFetchFromSina, cfFetchFrom163);
// 最终结果
cfFetch.thenAccept((result) -> {
System.out.println("price: " + result);
});
Thread.sleep(200);
}
static String queryCode(String name, String url) {
System.out.println("query code from " + url + "...");
try {
Thread.sleep((long) (Math.random() * 100));
} catch (InterruptedException e) {
}
return "601857";
}
static Double fetchPrice(String code, String url) {
System.out.println("query price from " + url + "...");
try {
Thread.sleep((long) (Math.random() * 100));
} catch (InterruptedException e) {
}
return 5 + Math.random() * 20;
}
}
上面代码执行的顺序如下:
allOf
allOf
的返回值是CompletableFuture<Void>
类型,这是因为每个传入的CompletableFuture
的返回值都可能不同,所有组合的结果是无法用某种类型来表示的,索性返回Void
类型。
参考
www.jianshu.com/p/6bac52527… cloud.tencent.com/developer/a… www.liaoxuefeng.com/wiki/125259…
转载自:https://juejin.cn/post/6990619805367664648