CompletableFuture 异步编排、案例及应用小案例
如果没有学习过JUC相关的知识,说起来可能真的不知道
CompletableFuture
异步编排这方面的知识。又或者就是像我这种,以前按部就班的学习过,但迟迟没有上手应用,属于是忘得差不多了。
在去年这个时候,其实我也写过一篇
CompletableFuture
相关的文章,但是那个时候可能就比较的青涩,完全就是写案例,没有什么应用场景,都不好意思说自己了,已经笑麻了~
前言
今天的话,就来以一个应用场景来进行一步一步的推导,在实现案例的过程中,将CompletableFuture
相关的知识点逐步讲述明白。
应用场景如下:
//1. 文章内容信息 1s
//2. 作者相关信息 0.5s 依赖于1的查询结果
//3. 文章评论信息 0.5s 依赖于1的查询结果
//4. 文章分类信息 0.5s 依赖于1的查询结果
//5. 文章专栏信息 0.5s 依赖于1的查询结果
//6. 相关文章信息 0.5s 依赖于1的查询结果
//...
补充
:这是我随意拆分的,里面具体的表结构和接口请求先后的关系,以及具体的请求时间都是比较随意的,具体想要陈述的就是同步和异步编程的关系。
那么我们就要根据这个组装一个视图数据来进行返回。
现在来说:按照以前我们以前串行化的执行方式,那么总花费的时间就是3.5s
,也就是从一开始执行到六,无疑这样是非常慢的,并且 2,3,4,5,6
都是依赖于 1的结果查询,但2,3,4,5,6
并不互相依赖,此时我们可以将他们从串行化变成异步执行,自己准备一个线程池,然后在执行的时候,将它们放进线程池中异步运行。
如此总耗费时间就从原来的 3.5s
变成了 1.5s
,编程思想的改变,对于性能还是有一定程度的提高的
接下来我们就开始接触CompletableFuture
吧
一、CompletableFuture 引入之前
再讲CompletableFuture
之前,我还是秉承着一贯的理念,先讲述一些之前的东西,然后再将它引入进来,不至于让大家对于它的出现处于一种非常迷茫的状态。
在之前我们如果只是普通异步的执行一个任务,无需返回结果的话,只要将一个任务实现 Runnable
接口,然后将放进线程池即可。
如果需要返回结果,就让任务实现Callable
接口,但实际上Callable
与 Thread 并没有任何关系,Callable
还需要使用Future
与线程建立关系,然后再让线程池执行,最后通过futureTask.get()
方法来获取执行的返回结果。
futureTask
是Future
接口的一个基本实现。
(Callable 类似于Runnable 接口,但 Runnable 接口中的 run()方法不会返回结果,并且也无法抛出经过检查的异常,但是 Callable中 call()方法能够返回计算结果,并且也能够抛出经过检查的异常。)
一个小案例:
/**
* @description:
* @author: Yihui Wang
* @date: 2022年08月21日 11:34
*/
public class Demo {
public static ExecutorService executorService = new ThreadPoolExecutor(
10,
100,
30L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(100),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardOldestPolicy());
public static void runnableTest(){
RunnableTest runnableTest = new RunnableTest();
executorService.submit(runnableTest);
}
public static void callableTest() throws ExecutionException, InterruptedException, TimeoutException {
CallableAndFutureTest callableAndFutureTest = new CallableAndFutureTest();
FutureTask<String> task = new FutureTask<>(callableAndFutureTest);
// 采用线程池执行完程序并不会结束, 如果是想测试一次性的那种 可以采用
// new Thread(task).start();
executorService.submit(task);
//System.out.println("尝试取消任务,传true表示取消任务,false则不取消任务::"+task.cancel(true));
System.out.println("判断任务是否已经完成::"+task.isDone());
//结果已经计算出来,则立马取出来,如若摸没有计算出来,则一直等待,直到结果出来,或任务取消或发生异常。
System.out.println("阻塞式获取结果::"+task.get());
System.out.println("在获取结果时,给定一个等待时间,如果超过等待时间还未获取到结果,则会主动抛出超时异常::"+task.get(2, TimeUnit.SECONDS));
}
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
runnableTest();
callableTest();
}
}
class RunnableTest implements Runnable{
@Override
public void run() {
System.out.println("我是Runnable执行的结果,我无法返回结果");
}
}
class CallableAndFutureTest implements Callable<String> {
@Override
public String call() throws Exception {
String str = "";
for (int i = 0; i < 10; i++) {
str += String.valueOf(i);
Thread.sleep(100);
}
return str;
}
}
看起来Callable
搭配Future
好像已经可以实现我们今天要实现的效果了,从结果的意义上来说,确实可以,但是并不优雅,也会存在一些问题。
如果多个线程之间存在依赖组合,该如何呢?
这个时候就轮到 CompletableFuture
出现了~
二、CompletableFuture 案例
我先直接将实现应用场景的效果代码写出来,然后再接着慢慢的去讲
package com.nzc;
import lombok.Data;
import java.util.concurrent.*;
/**
* @description:
* @author: Yihui Wang
* @date: 2022年08月21日 11:48
*/
public class CompletableFutureDemo {
public static ExecutorService executorService = new ThreadPoolExecutor(
10,
100,
30L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(100),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardOldestPolicy());
public static ArticleVO asyncReturn(){
ArticleVO article=new ArticleVO();
long startTime=System.currentTimeMillis();
CompletableFuture<ArticleVO> articleContent = CompletableFuture.supplyAsync(() -> {
try {
article.setId(1L);
article.setContent("我是宁在春写的文章内容");
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
return article;
},executorService);
// 这里的res 就是第一个个 CompletableFuture 执行完返回的结果
// 如果要测试它们的异步性,其实应该在下面的所有执行中,都让它们沉睡一会,这样效果会更加明显
// executorService 是放到我们自己创建的线程池中运行
CompletableFuture<Void> author = articleContent.thenAcceptAsync((res) -> {
res.setAuthor(res.getId()+"的作者是宁在春");
},executorService);
CompletableFuture<Void> articleComment = articleContent.thenAcceptAsync((res) -> {
res.setComment(res.getId()+"的相关评论");
},executorService);
CompletableFuture<Void> articleCategory = articleContent.thenAcceptAsync((res) -> {
res.setCategory(res.getId()+"的分类信息");
},executorService);
CompletableFuture<Void> articleColumn = articleContent.thenAcceptAsync((res) -> {
res.setColumn(res.getId()+"的文章专栏信息");
},executorService);
CompletableFuture<Void> recommend = articleContent.thenAcceptAsync((res) -> {
res.setRecommend(res.getId()+"的文章推荐信息");
},executorService);
CompletableFuture<Void> futureAll = CompletableFuture.allOf(articleContent, author, articleComment, articleCategory, articleColumn, recommend);
try {
// get() 是一个阻塞式方法 这里是阻塞式等待所有结果返回
// 因为要等待所有结果返回,才允许方法的结束,否则一些还在执行中,但是方法已经返回,就会造成一些错误。
futureAll.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
long endTime=System.currentTimeMillis();
System.out.println("耗费的总时间===>"+(endTime-startTime));
// 所有任务执行完成后,将构建出来的视图结果进行返回
return article;
}
public static void main(String[] args) {
ArticleVO articleVO = asyncReturn();
System.out.println(articleVO);
}
}
@Data
class ArticleVO {
private Long id;
private String content;
private String author;
private String comment;
private String category;
private String column;
private String recommend;
}
这里就是对应着应用场景里的那几步,引入以下lombok包就可以直接测试了。
为了更好的看出效果,也可以在执行某个任务的时候,让它睡上一会。
三、CompletableFuture 详解
看完上面的例子,算是看到他是如何的啦,接下来还是需要详细说一说的,思维导图如下:
3.1、通过 CompletableFuture 创建普通异步任务
CompletableFuture.runAsync()
创建无返回值的简单异步任务 Executor
表示线程池~
package com.nzc;
import java.util.concurrent.*;
/**
* @description:
* @author: Yihui Wang
* @date: 2022年08月21日 15:58
*/
public class AsyncDemo {
public static ExecutorService executorService = new ThreadPoolExecutor(
10,
100,
30L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(100),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardOldestPolicy());
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("主线程开始");
CompletableFuture<Void> runAsync = CompletableFuture.runAsync(() -> {
try {
Thread.sleep(500L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("通过CompletableFuture.runAsync创建一个简单的异步任务~");
// 另外此处还可以填写第二个参数,放进自定义线程池中执行
},executorService);
//runAsync.isDone() 可以判断任务是否已经 完成
System.out.println("任务是否完成==>" + runAsync.isDone());
//这里是阻塞式等待任务完成
runAsync.get();
System.out.println("主线程结束");
System.out.println("任务是否完成==>" + runAsync.isDone());
}
}
CompletableFuture.supplyAsync()
创建有返回值的简单异步任务
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(() -> {
return "我是由宁在春创建的有返回结果的异步任务";
}, executorService);
// 如果只有一条返回语句,还可以写的更加简便
//CompletableFuture<String> supplyAsync1 = CompletableFuture.supplyAsync(() -> "我是有返回结果的异步任务", executorService);
//这里同样也是阻塞式的
String result = supplyAsync.get();
System.out.println("异步任务执行的回调结果:==>"+result);
}
3.2、thenRun/thenRunAsync
简单说就是,这两个方法就是将执行任务的线程排起来,执行完一个,接着再执行第二个。并且它不需要接收上一个任务的结果,也不会返回结果,一定程度上来说,它的应用场景并不是特别高。
public static ExecutorService executorService = new ThreadPoolExecutor(
10,
100,
30L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(100),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardOldestPolicy());
/**
* @param args
*/
public static void main(String[] args) throws Exception {
thenRun();
thenRunAsync();
}
public static void thenRun() throws ExecutionException, InterruptedException {
long startTime = System.currentTimeMillis();
System.out.println("主线程开始1");
CompletableFuture<String> future =
CompletableFuture.supplyAsync(() -> {
System.out.println("我是一个无需传参也没有返回值的简单异步任务1");
return "我是宁在春";
});
CompletableFuture<Void> thenRun = future.thenRun(() -> {
try {
Thread.sleep(500L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("等待任务1执行完后,我再执行任务2");
});
CompletableFuture<Void> thenRun1 = future.thenRun(() -> {
try {
Thread.sleep(500L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("等待任务1执行完后,我再执行任务3");
});
CompletableFuture<Void> thenRun2 = future.thenRun(() -> {
try {
Thread.sleep(500L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("等待任务1执行完后,我再执行任务4");
});
future.get();
long endTime = System.currentTimeMillis();
System.out.println("主线程结束1,耗费时间为:"+(endTime-startTime));
}
public static void thenRunAsync() throws ExecutionException, InterruptedException {
long startTime = System.currentTimeMillis();
System.out.println("主线程开始2");
CompletableFuture<String> future =
CompletableFuture.supplyAsync(() -> {
System.out.println("我是一个无需传参也没有返回值的简单异步任务 一");
return "我是宁在春";
},executorService);
CompletableFuture<Void> thenRunAsync1 = future.thenRunAsync(() -> {
try {
Thread.sleep(500L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("等待任务一执行完后,我再执行任务二");
},executorService);
CompletableFuture<Void> thenRunAsync2 = future.thenRunAsync(() -> {
try {
Thread.sleep(500L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("等待任务一执行完后,我再执行任务三");
},executorService);
CompletableFuture<Void> thenRunAsync3 = future.thenRunAsync(() -> {
try {
Thread.sleep(500L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("等待任务一执行完后,我再执行任务四");
},executorService);
// 这里是让所有的阻塞,等待所有任务完成,才结束整个任务
CompletableFuture<Void> allOf = CompletableFuture.allOf(future,thenRunAsync1, thenRunAsync2, thenRunAsync3);
allOf.get();
long endTime = System.currentTimeMillis();
System.out.println("主线程结束2,耗费时间为:"+(endTime-startTime));
}
}
小结:
浅显的说它们两个的区别的话,其实就是thenRunAsync
可异步执行,thenRun
不可异步执行,不过都可以异步的阻塞式等待结果的返回。
在案例中我是自己手动创建了线程池,但其实就算我没有手动创建线程池,当调用thenRunAsync
方法,它也是放在异步线程中执行的。
源码比较:
public CompletableFuture<Void> thenRun(Runnable action) {
return uniRunStage(null, action);
}
public CompletableFuture<Void> thenRunAsync(Runnable action) {
return uniRunStage(asyncPool, action);
}
public CompletableFuture<Void> thenRunAsync(Runnable action,
Executor executor) {
return uniRunStage(screenExecutor(executor), action);
}
/**
* Default executor -- ForkJoinPool.commonPool() unless it cannot
* support parallelism.
*/
private static final Executor asyncPool = useCommonPool ?
ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
thenRun
它是同第一个任务是同一个线程,所以当第一个任务结束后,它才会开始执行任务。thenRunAsync
它则是不一样的,如果我传入我自定义的线程池,它就是放入我们自定义的线程池进行运行,如果传线程池这个参数的话,就是默认使用ForkJoin线程池
之后的类比区别也是同样的,总共是三组这样的方法。
3.3、thenApply和thenApplyAsync
thenApply 和 thenApplyAsync 让线程成为了一种串行化的关系,第一个任务执行完的返回值会作为第二个的任务的入参.
案例的话,比较简单.
package com.nzc;
import java.util.concurrent.*;
/**
* @description:
* @author: Yihui Wang
* @date: 2022年08月21日 16:32
*/
public class ThenApplyAndAsyncDemo {
public static ExecutorService executorService = new ThreadPoolExecutor(
10,
100,
30L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(100),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardOldestPolicy());
/**
* @param args
*/
public static void main(String[] args) throws ExecutionException, InterruptedException {
thenApply();
thenApplyAsync();
}
/**
* 线程串行化
* 1、我进入商场
* 2、找到了我要买的商品
* 3、准备付款结账
* 4、拿着东西回家!!!
* 你会发现这是一步扣一步的在走,其实业务场景中也有很多这样的场景,希望大家在遇到的时候能够想到
*
* @return
* @throws ExecutionException
* @throws InterruptedException
*/
public static String thenApply() throws ExecutionException, InterruptedException {
System.out.println("主线程开始1");
// CompletableFuture<String> future =
// CompletableFuture.supplyAsync(() -> {
// return "我进入商场, ";
// });
// CompletableFuture<String> future1 = future.thenApply(res -> {
// return res += "找到了我要买的商品,";
// });
// future.thenApply(res->{
// return res+="准备付款结账,";
// }).thenApply(res->{
// return res+="拿着东西回家!!!";
// });
// 上面那种分开写和下面这种链式写法是一样的
CompletableFuture<String> future =
CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getId());
return "我进入商场, ";
}).thenApply(res -> {
System.out.println(Thread.currentThread().getId());
return res += "找到了我要买的商品,";
}).thenApply(res -> {
System.out.println(Thread.currentThread().getId());
return res += "准备付款结账,";
}).thenApply(res -> {
return res += "拿着东西回家!!!";
});
String result = future.get();
System.out.println("主线程1结束, 子线程的结果为:" + result);
return result;
}
/**
* 这里因为是异步的原因,它们之间倒是没有一个顺序上的规范
*
* @return
* @throws ExecutionException
* @throws InterruptedException
*/
public static String thenApplyAsync() throws ExecutionException, InterruptedException {
System.out.println("主线程2开始");
CompletableFuture<String> future =
CompletableFuture.supplyAsync(() -> {
return "我进入商场, ";
},executorService).thenApplyAsync(res -> {
System.out.println(Thread.currentThread().getId());
return res += "找到了我要买的商品,";
},executorService).thenApplyAsync(res -> {
try {
System.out.println(Thread.currentThread().getId());
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
return res += "准备付款结账,";
},executorService).thenApplyAsync(res -> {
System.out.println(Thread.currentThread().getId());
return res += "拿着东西回家!!!";
});
String result = future.get();
System.out.println("主线程2结束, 子线程的结果为:" + result);
return result;
}
}
小结:
thenApply 和 thenApplyAsync 本质上就是将它们串起来了,必须要先完成第一个任务,才能接着做下面的任务
这里的本质区别和前面和之前说的还是一样
但是你如果仔细看了案例代码,你会发现我在里面打印了线程ID. 并且我在测试的时候,尝试将放入自定义线程池和不放入两种情况,实际上 thenApplyAsync 执行的任务线程确实不是一个.
但效果其实和 thenApply 是一样的,都需要等待上一个任务完成。
注意我说的是效果,并非是内部的执行机制。再说就又得进去看源码了...
3.4、thenAccept 和 thenAcceptAsync
如果你不想从你的回调函数中返回任何东西,仅仅想在Future完成后运行一些代码片段,你可以使用
thenAccept()
和 thenRun()
方法,这些方法经常在调用链的最末端的最后一个回调函数中使用。
thenAccept
消费处理结果, 接收任务的处理结果,并消费处理,无返回结果。
thenAcceptAsync
则是异步的消费处理结果, 接收任务的处理结果,并消费处理,无返回结果。
package com.nzc;
import java.util.concurrent.*;
import java.util.function.Consumer;
/**
* @description:
* @author: Yihui Wang
* @date: 2022年08月21日 17:21
*/
public class ThenAcceptDemo {
public static ExecutorService executorService = new ThreadPoolExecutor(
10,
100,
30L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(100),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardOldestPolicy());
public static void main(String[] args) throws Exception {
thenAccept();
thenAcceptAsync();
}
private static String action1 = "";
public static void thenAccept() {
System.out.println("主线程开始");
CompletableFuture.supplyAsync(() -> {
try {
action1 = "逛jd,想买台电脑~ ";
} catch (Exception e) {
e.printStackTrace();
}
return action1;
}).thenApply(string -> {
return action1 + "选中了,付款,下单成功!!";
}).thenApply(String -> {
return action1 + "等待快递到来";
}).thenAccept((res) -> {
System.out.println("子线程全部处理完成,最后调用了 accept,结果为:" + res);
});
}
private static String action2 = "";
public static void thenAcceptAsync() {
System.out.println("主线程开始");
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
action2 = "逛jd,想买台电脑~ ";
} catch (Exception e) {
e.printStackTrace();
}
return action2;
}).thenApply(string -> {
return action2 + "选中了,付款,下单成功!!";
}).thenApply(String -> {
return action2 + "等待快递到来";
});
// 这里不采用链式写法,是因为thenAcceptAsync 无返回值,
// 第二个thenAcceptAsync 连接在第一个thenAcceptAsync后,会没有入参值
// 就都拿出来了。
future.thenAcceptAsync((res) -> {
System.out.println("线程ID"+Thread.currentThread().getId()+"拿到依任务一二的返回结果,===>异步的执行任务三,晚饭时间了,打算一边看电影");
},executorService);
future.thenAcceptAsync((res) -> {
System.out.println("线程ID"+Thread.currentThread().getId()+"拿到依任务一二的返回结果,===>异步的执行任务四,一边干饭~");
},executorService);
}
}
thenAcceptAsync
也是我们今天文章开头中用到的,异步编排式的组合视图结果集。
这一部分平时用的倒是不少,也比较方便~
上面说了这么多,但是万一我们在执行某个任务的时候出现异常该如何处理呢?
别慌,它也封装好了的。
3.5、exceptionally 和 handle
exceptionally
异常处理,出现异常时触发,可以回调给你一个从原始Future
中生成的错误恢复的机会。你可以在这里记录这个异常并返回一个默认值。
一般而言,exceptionally
都是写到方法调用的末尾,以来出来中间过程中会出现的异常。
另外就是 handle 也可以用来处理异常。
public class ExceptionallyDemo {
public static void main(String[] args) throws Exception{
System.out.println("主线程开始");
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
int i= 1/0;
System.out.println("子线程执行中");
return i;
}).exceptionally(ex -> {
System.out.println(ex.getMessage());
return -1;
});
System.out.println(future.get());
}
}
//主线程开始
//java.lang.ArithmeticException: / by zero
//-1
public static void main(String[] args) throws Exception {
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务开始");
int i = 0 / 1;
return i;
}).handle((i, ex) -> {
System.out.println("进入 handle 方法");
if (ex != null) {
System.out.println("发生了异常,内容为:" + ex.getMessage());
return -1;
} else {
System.out.println("正常完成,内容为: " + i);
return i;
}
});
}
handle是有入参和带返回值的,入参是之前任务执行的结果。
当然一切具体的使用还是要看业务场景啦
3.6、结果合并
thenCompose
合并两个有依赖关系的 CompletableFutures
的执行结果,有入参有返回值。
它的入参是第一个future
和第一二两个的任何的返回结果。
thenAcceptBoth
则是会将两个任务的执行结果作为方法入参,传递到指定方法中,但无返回值
runAfterBoth
则是不会把执行结果当做方法入参,也没有返回值。
package com.nzc;
import java.util.WeakHashMap;
import java.util.concurrent.*;
/**
* @description:
* @author: Yihui Wang
* @date: 2022年08月21日 17:53
*/
public class ThenCombineDemo {
public static void main(String[] args) throws Exception {
test();
}
private static Integer num = 10;
public static void test() throws Exception {
System.out.println("主线程开始");
//第一步加 10
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("第一个任务:让num+10;任务开始");
num += 10;
return num;
});
CompletableFuture<String > future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("第二个任务:让num+1;任务开始");
return num + 1;
//它的入参是第一个future和第一二两个的任何的返回结果。
}).thenCombine(future,(w,s)->{
System.out.println("任务一的结果==>"+w);
System.out.println("任务二的结果==>"+s);
return "我是两个任务的合并"+(w+s);
});
System.out.println(future.get());
System.out.println(future1.get());
}
}
/**
* 主线程开始
* 第一个任务:让num+10;任务开始
* 第二个任务:让num+1;任务开始
* 任务一的结果==>21
* 任务二的结果==>20
* 20
* 我是两个任务的合并41
*/
thenAcceptBoth
package com.nzc;
import java.util.WeakHashMap;
import java.util.concurrent.*;
/**
* @description:
* @author: Yihui Wang
* @date: 2022年08月21日 17:53
*/
public class ThenCombineDemo {
public static void main(String[] args) throws Exception {
test();
}
private static Integer num = 10;
public static void test() throws Exception {
System.out.println("主线程开始");
//第一步加 10
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("第一个任务:让num+10;任务开始");
num += 10;
return num;
});
CompletableFuture<Void > future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("第二个任务:让num+1;任务开始");
return num + 1;
}).thenAcceptBoth(future,(w,s)->{
System.out.println("任务一的结果==>"+w);
System.out.println("任务二的结果==>"+s);
System.out.println( "我是两个任务的合并"+(w+s)+"但是我没有返回值");
});
System.out.println("任务一的结果==>"+future.get());
// 不采用链式写法,任务二实际上是有返回值,大家看业务场景写即可
System.out.println("任务二后接上thenAcceptBoth方法的结果==>"+future1.get());
}
}
/**
主线程开始
第一个任务:让num+10;任务开始
第二个任务:让num+1;任务开始
任务一的结果==>21
任务二的结果==>20
我是两个任务的合并41但是我没有返回值
任务一的结果==>20
任务二后接上thenAcceptBoth方法的结果==>null
*/
runAfterBoth
public static void test2(){
//第一步加 10
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("第一个任务:让num+10;任务开始");
num += 10;
return num;
});
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
System.out.println("第一个任务:让num+10;任务开始");
num += 10;
return num;
});
future2.runAfterBoth(future,()->{
System.out.println("不会把执行结果当做方法入参,也没有返回值");
});
}
除了这些外,CompletableFuture
还有我之前案例中就已经用到的allof
和anyOf
3.7、allof 合并多个任务结果
allOf
: 一系列独立的 future
任务,等其所有的任务执行完后做一些事情.
public class CompletableFutureDemo9 {
private static Integer num = 10;
public static void main(String[] args) throws Exception{
System.out.println("主线程开始");
List<CompletableFuture> list = new ArrayList<>();
CompletableFuture<Integer> job1 = CompletableFuture.supplyAsync(() -> {
System.out.println("加 10 任务开始");
num += 10;
return num;
});
list.add(job1);
CompletableFuture<Integer> job2 = CompletableFuture.supplyAsync(() -> {
System.out.println("乘以 10 任务开始");
num = num * 10;
return num;
});
list.add(job2);
CompletableFuture<Integer> job3 = CompletableFuture.supplyAsync(() -> {
System.out.println("减以 10 任务开始");
num = num - 10;
return num;
});
list.add(job3);
CompletableFuture<Integer> job4 = CompletableFuture.supplyAsync(() -> {
System.out.println("除以 10 任务开始");
num = num / 10;
return num;
});
list.add(job4);
//多任务合并
List<Integer> collect =
list.stream().map(CompletableFuture<Integer>::join).collect(Collectors.toList());
System.out.println(collect);
}
}
/**主线程开始
乘以 10 任务开始
加 10 任务开始
减以 10 任务开始
除以 10 任务开始
[110, 100, 100, 10]
*/
allof的除了在合并结果时经常用到以外,像我们今天案例它也用到了它的get()方法,在那里使用的作用时,阻塞式的等待所有的任务结束,才结束方法的调用。
3.8、anyof
anyOf
: 只要在多个 future
里面有一个返回,整个任务就可以结束,而不需要等到每一个 future 结束
public class CompletableFutureDemo10 {
private static Integer num = 10;
/**
* 先对一个数加 10,然后取平方
* @param args
*/
public static void main(String[] args) throws Exception{
System.out.println("主线程开始");
CompletableFuture<Integer>[] futures = new CompletableFuture[4];
CompletableFuture<Integer> job1 = CompletableFuture.supplyAsync(() -> {
try{
Thread.sleep(5000);
System.out.println("加 10 任务开始");
num += 10;
return num;
}catch (Exception e){
return 0;
}
});
futures[0] = job1;
CompletableFuture<Integer> job2 = CompletableFuture.supplyAsync(() -> {
try{
Thread.sleep(2000);
System.out.println("乘以 10 任务开始");
num = num * 10;
return num;
}catch (Exception e){
return 1;
}
});
futures[1] = job2;
CompletableFuture<Integer> job3 = CompletableFuture.supplyAsync(() -> {
try{
Thread.sleep(3000);
System.out.println("减以 10 任务开始");
num = num - 10;
return num;
}catch (Exception e){
return 2;
}
});
futures[2] = job3;
CompletableFuture<Integer> job4 = CompletableFuture.supplyAsync(() -> {
try{
Thread.sleep(4000);
System.out.println("除以 10 任务开始");
num = num / 10;
return num;
}catch (Exception e){
return 3;
}
});
futures[3] = job4;
CompletableFuture<Object> future = CompletableFuture.anyOf(futures);
System.out.println(future.get());
}
}
//主线程开始
//乘以 10 任务开始
//100
3.9、注意的小问题
1、一般来讲,如果要使用线程的话,都应该是自定义线程,这点阿里Java开发规范中也有说到。
2、自定义线程池的话,一定要把参数设置合理,这点没啥可说的,都得测,空谈都是大话,线程池的话有一篇美团技术团队的文章,讲的很好。Java线程池实现原理及其在美团业务中的实践
3、今天的案例,我在最后调用了 get()方法,一直阻塞到所有任务完成,所以你在编排的时候,一定要注意你需不需要任务的返回结果,不然很可能会产生一些数据方面问题。
4、关于异常我写到后面心有些浮躁,写的不是非常精细。获取异常信息,future需要获取返回值,才能获取异常信息。
后记
今天最想说的就是 “温故而知新”
这方面的知识在去年,我其实已经学过一遍,但应用场景一少,你就会慢慢忘记它的存在。
另外想要说明的是基础我觉得是十分重要的。
最近在翻阅 Java 8 实战这本书,Lamda表达式一直会写,但是对于那些思想,我一直处于一直很模糊的状态,这次在看书的时候,发现了很多以前不知道的知识,也让自己恍然大悟。
写于 2022 年 8 月 21日下午,作者:宁在春
转载自:https://juejin.cn/post/7134276734915969031