【Java】Java 8的CompletableFuture介绍
【参考】
- Introduction to CompletableFuture in Java 8:www.youtube.com/watch?v=Imt…
1. non-blocking操作
这里的asynchronous操作即non-blocking操作,非阻塞式的操作,表示主线程可以提交task给独立的线程,独立的线程可以自行运行该task,主线程则会继续运行别的业务逻辑代码。
提交的task可以是Runnable实现,也可以是Callable实现(有返回值):
2. 通过ExecutorService进行task的提交
具体可以参考:【Java】线程池(ExecutorService)相关的总结
ExecutorService
提交Callable的task后,可以通过future.get()
拿到task的执行结果,get()
方法是阻塞式的获取结果,即如果该task还没有执行完毕,此时main thread需要等待,直到task执行完毕并返回结果。
ExecutorService存在的问题是如果一次性提交了4个tasks,那么在拿结果的时候,需要依次拿(future.get()),但这个方法是阻塞式的,假如task3或4提交完成了,也需要等待main thread从task1先拿到结果:
更为复杂的例子,假如处理1个order分5个步骤,即拿到order、补齐order必要的属性、付款、发送order,最后是邮件发送:
代码如下,因为get是阻塞式的,所以for循环里order1的处理,会影响后续order的速度,因为main thread有可能会卡在order1,而这时候后续的order可能已经处理完毕,这也使得使用多线程处理变的没有必要,因为order的处理顺序依旧是线性执行的:
public void orderLifeCycle() {
ExecutorService service = Executors.newFixedThreadPool(10);
for (int i = 0; i < 5; i ++) {
try {
Future<Order> future = service.submit(getOrderTask());
Order order = future.get(); // 阻塞
Future<Order> future1 = service.submit(enrichTask(order));
order = future1.get(); // 阻塞
Future<Order> future2 = service.submit(performPaymentTask(order));
order = future2.get(); // 阻塞
Future<Order> future3 = service.submit(dispatchTask(order));
order = future3.get(); // 阻塞
Future<Order> future4 = service.submit(sendEmailTask(order));
order = future4.get(); // 阻塞
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}
而我们想要的效果是,order本身的flow是一个整体(虽然它可能分了很多个Callable步骤),但order与order之间,不应该相互等待,比如order1可以由某个线程执行并正在执行sendEmail方法,而order2可以由另外一个线程执行并正在执行payment方法,即每个order都是独立的流水线(independent flows):
3. CompletableFuture
可以使用CompletableFuture
来实现上述想要的期望:
public void orderSubmit() {
for (int i =0; i < 5; i ++) {
CompletableFuture.supplyAsync(() -> getOrder())
.thenApply(order -> enrich(order))
.thenApply(order -> performPayment(order))
.thenApply(order -> dispatch(order))
.thenAccept(order -> sendEmail(order));
}
}
3.1 supplyAsync方法
supplyAsync()
方法是CompletableFuture
中的static
方法,从定义可以看到它接受Supplier
接口的参数,Supplier
接口位于java的java.util.function
包中,也是Java 8引入的函数式接口,与之配套的还有另外两个接口:Function
和Consumer
,具体来说:
Supplier
接口:不接收参数,但返回一个对象(用消息组件来类比,可以看作是生产者)。Function
接口:接收参数后,再返回另一个对象(用消息组件来类比,即接收一个消息,进行处理后再发送至另一个地方)。Consumer
接口:接收参数,但不会再返回对象(用消息组件来类比,可以看作是消费者)。
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
return asyncSupplyStage(ASYNC_POOL, supplier);
}
在我们order的例子中,supplyAsync(() -> getOrder())
即通过线程来处理拿到一个order的业务,然后返回。
supplyAsync()
除了接收suppilier函数外,还可接收线程池,即传入自定义的线程池来处理。如果没有指定,则会创建一个:Executor ASYNC_POOL = USE_COMMON_POOL ? ForkJoinPool.commonPool() : new ThreadPerTaskExecutor()
。
3.2 thenApply方法
thenApply()
方法接收的是Function
接口,Function
接口在上面介绍过了,它能接收一个参数,并返回一个结果。在我们的order例子中,它接收了getOrder()
返回的order参数,并进行处理后(通过方法enrich(order)
)再返回order结果。
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) {
return uniApplyStage(null, fn);
}
与thenApply()
方法类似的还有一个方法叫thenApplyAsync()
,两者的区别是:
thenApply()
会沿用上一个方法相同的线程。thenApplyAsync()
会使用另一个线程来执行方法体内的task,也可以传入线程池。
具体来说,假设我们的getOrder()
方法是IO开销比较大的(比如需要从DB里查询),而我们的enrich(order)
方法只是一些计算相关,即CPU开销比较大,那么我们在执行的时候,可能会用两个不同的线程池来执行,IO开销大的可以设置线程数多一些。
public void orderSubmit() {
ExecutorService cpuService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
ExecutorService ioService = Executors.newCachedThreadPool();
for (int i =0; i < 5; i ++) {
CompletableFuture.supplyAsync(() -> getOrder(), ioService)
.thenApplyAsync(order -> enrich(order), cpuService)
.thenApply(order -> performPayment(order))
.thenApply(order -> dispatch(order))
.thenAccept(order -> sendEmail(order));
}
}
3.3 thenAccept方法
thenAccept()
方法接收Comsumer
接口(接收一个参数,但没有返回值)。
与thenAccept()
方法相对的,有thenAcceptAsync()
方法,同样的,该方法能传入一个线程池,使得方法里的task可以在传入的线程池中进行提交执行。
public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
return uniAcceptStage(null, action);
}
3.4 exceptionally方法
异常处理。如果getOrder()
或enrich(order)
或performPayment(order)
中出现了一些异常,我们可以使用exceptionally()
来捕获异常,然后返回别的类,这样dispatch(order)
方法就知道在前面的步骤中出现了异常(比如拿到参数后先进行类型判断,是normal的Order
还是FailedOrder
。
CompletableFuture.supplyAsync(() -> getOrder(), ioService)
.thenApplyAsync(order -> enrich(order), cpuService)
.thenApply(order -> performPayment(order))
.exceptionally(e -> new FailedOrder())
.thenApply(order -> dispatch(order))
.thenAccept(order -> sendEmail(order));
3.5 其它方法,如thenCombine
这个可以连接两个stage的结果。但这个写起来比较复杂,推荐java的另一个框架,叫Reactor
框架,实现起来可能更加便捷,代码也更易读。
4. 总结
CompletableFuture
中suplyAsnc()
或thenApply()
或thenAccept()
都不会阻塞main thread,如果我们有100个order flow需要处理,那么每个order的流程都不会相互阻塞。
转载自:https://juejin.cn/post/7201787862236266552