JUC(3) : CompletableFuture 异步编排
前言
在 Java8 中,新增了 CompletableFuture 异步编程工具类。它与我们之前使用的 Future 接口有何区别呢?本文将对这个重要的 CompletableFuture 类进行分析学习。
一、Future 接口及改进
Future 接口(FutureTask 实现类),定义了操作异步任务执行的一些方法。例如获取异步任务的执行结果、取消任务的执行、判断任务是否被取消、判断任务执行是否完毕等方法。
Modifier and Type | Method | Description |
---|---|---|
boolean | cancel(boolean mayInterruptIfRunning) | 尝试取消执行此任务。 |
V | get() | 等待计算完成,然后检索其结果。 |
V | get(long timeout, TimeUnit unit) | 如果需要等待最多在给定的时间计算完成,然后检索其结果(如果可用)。 |
boolean | isCancelled() | 如果此任务在正常完成之前被取消,则返回 true 。 |
boolean | isDone() | 返回 true 如果任务已完成。 |
我们通常使用 Future 接口进行异步编程时,获取结构都需要阻塞获取。主线程中会阻塞,其实更好的结果是让它执行完结果后通知我们获取。
总结:future 接口可以为主线程开一个分支任务,专门为主线程处理耗时和费力的复杂业务。
1.1 FutureTask 简介
由关系图可以看到,FutureTask 除了实现 Future 接口外,还实现了 Runnable 接口,因此,FutureTask 可以交给 Executor 执行。也可以由调用线程执行执行 FutureTask.run() 方法。
入门案例:
public class FutureTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<String> stringFutureTask = new FutureTask<>(new MyThread2());
Thread thread = new Thread(stringFutureTask);
thread.start();
String s = stringFutureTask.get();
System.out.println(s);
}
}
class MyThread2 implements Callable<String>{
@Override
public String call() throws Exception {
System.out.println("come in");
return "hello";
}
}
// come in
// hello
future + 线程池异步多线程任务配合,能显著提高程序的执行效率。
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService threadPool = Executors.newFixedThreadPool(3);
FutureTask<String> stringFutureTask = new FutureTask<String>(()->{
TimeUnit.MICROSECONDS.sleep(500);
return "aaa";
});
threadPool.submit(stringFutureTask);
FutureTask<String> stringFutureTask2 = new FutureTask<String>(()->{
TimeUnit.MICROSECONDS.sleep(300);
return "bbb";
});
threadPool.submit(stringFutureTask2);
TimeUnit.MICROSECONDS.sleep(300);
threadPool.shutdown();
}
注意:
- 1、futureTask.get() 会造成阻塞,一般建议放在程序后面,不管你是否计算完成,容易造成程序阻塞。
- 2、假如不愿意等待很长时间,可以加个到期时间,过期不候。stringFutureTask.get(10,TimeUnit.SECONDS);
1.2 异步获取
如果不想阻塞主线程,异步获取结果,通常会以轮询的方式获取结果,不会造成主线程的阻塞,但是轮询的方式会耗费 CPU 资源,调用 isDone() 进行轮询。
public class CompletableFutureDemo2 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<String> futureTask = new FutureTask<>(() -> {
System.out.println("-----come in FutureTask");
try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); }
return ""+ ThreadLocalRandom.current().nextInt(100);
});
new Thread(futureTask,"t1").start();
System.out.println(Thread.currentThread().getName()+"\t"+"线程完成任务");
/**
* 用于阻塞式获取结果,如果想要异步获取结果,通常都会以轮询的方式去获取结果
*/
while (true){
if(futureTask.isDone()){
System.out.println(futureTask.get());
break;
}
}
}
}
总结:
- future 对于结果的获取不是很友好,只能通过阻塞或轮询的方式得到任务的结果’
1.3 对 Future 接口的改进
1.3.1 CompletableFuture 类
虽然,Future 接口提供了异步任务的能力,但是对于结果的获取非常不方便,阻塞的方式违背了异步编程的初衷,轮询的方式又会耗费无谓的 CPU 资源,且不能及时的得到计算结果,为什么不能使用观察者设计模式当计算结果完成及时通知监听者呢?
很多语言,比如 Node.js 采用回调的方式实现异步编程。 Java 的一些框架,比如 Netty,也提供了通用的扩展 Future,而作为正统的 Java 类库,Java8 新增了一个包含 50 个方法左右的类 ==> CompletableFuture。提供了非常强大的 Future 扩展功能,提供了函数式编程的能力,可以通过回调的方式处理计算结果。
它实现了 future 和 completionStage 两个接口。 提供了观察者模式,可以让任务执行完成后通知监听的一方。
- 异步任务结束时,会自动回调某个方法
- 主线程设置回调后,不再关心异步任务的执行,异步任务之间可以顺序执行。
- 异步任务出错,会自动回调某个对象的方法
1.3.2 CompletionStage
代表异步计算过程的某一阶段,一个阶段完成以后可能会触发另外一个阶段,有些类似 Linux 系统的管道分隔符传参数。 一个阶段的执行可能是被单个阶段的完成触发,也可能是由多个阶段一起触发
CompletionStage 的接口方法可以从多种角度进行分类,从最宏观的横向划分,CompletionStage 的接口主要分为三类:
- 供给型 : 就是用上一个阶段的结果作为指定函数的参数执行函数产生新的结果。这一类接口方法名中基本都有 apply字样。
- 消耗型:就是用上一个阶段的结果作为指定操作,但不对阶段结果产生影响。这一类接口方法名中基本都有 accept 字样。
- 不消耗不产出:就是不依据上一个阶段的执行结果,主要上一个阶段完成,就执行指定的操作,且不对阶段的结果产生影响,基本都有 run 字样。
CompletionStage
接口定义了任务编排的方法,执行某一阶段,可以向下执行后续阶段。异步执行的,默认线程池是ForkJoinPool.commonPool()
,但为了业务之间互不影响,且便于定位问题,强烈推荐使用自定义线程池。
二、使用介绍
CompletableFuture
提供了四个静态方法来创建一个异步操作:
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
- 1、runXxx 都是没有返回结果的,supplyXxx 都是可以获取返回值的;
- 2、可以传入自定义的线程池,否则就用默认的线程池:ForkJoinPool.commonPool() ;
- 3、Async 代表异步的意思。
2.1 获得结果和触发计算
- 获得结果
- get() :不见不散
- get(long timeout, TimeUnit unit):过时不候
- join():约等于get,不会抛出异常
- getNow(T valueIfAbsent):立即获得结果,计算完返回计算的结果,否则返回设定的指。
public class CompletableFutureDemo2{
public static void main(String[] args) throws ExecutionException, InterruptedException{
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
try { TimeUnit.SECONDS.sleep(1); }
catch (InterruptedException e) { e.printStackTrace(); }
return 533;
});
//去掉注释上面计算没有完成,返回444
//开启注释上面计算完成,返回计算结果
try { TimeUnit.SECONDS.sleep(2); }
catch (InterruptedException e) { e.printStackTrace(); }
System.out.println(completableFuture.getNow(444));
}
}
public class CompletableFutureDemo2{
public static void main(String[] args) throws ExecutionException, InterruptedException{
System.out.println(CompletableFuture.supplyAsync(() -> "abc")
.thenApply(r -> r + "123").join());
}
}
- 主动触发计算
打断 get 方法立即获得返回括号值。
// 是否打断get方法立即返回括号值
public boolean complete(T value)
public class CompletableFutureDemo4{
public static void main(String[] args) throws ExecutionException, InterruptedException{
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
return 533;
});
//注释掉暂停线程,get还没有算完只能返回complete方法设置的444;暂停2秒钟线程,异步线程能够计算完成返回get
try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
//当调用CompletableFuture.get()被阻塞的时候,complete方法就是结束阻塞并get()获取设置的complete里面的值.
System.out.println(completableFuture.complete(444)+"\t"+completableFuture.get());
}
}
2.2 对计算结果进行处理
2.2.1 thenApply
计算结果存在依赖关系,这两个线程串行化,由于依赖关系,当前步骤有异常就叫停,主线程如果结束,默认线程池就会关闭。
public class CompletableFutureDemo4{
public static void main(String[] args) throws ExecutionException, InterruptedException{
//当一个线程依赖另一个线程时用 thenApply 方法来把这两个线程串行化,
CompletableFuture.supplyAsync(() -> {
//暂停几秒钟线程
try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println("111");
return 1024;
}).thenApply(f -> {
System.out.println("222");
return f + 1;
}).thenApply(f -> {
//int age = 10/0; // 异常情况:那步出错就停在那步。
System.out.println("333");
return f + 1;
}).whenCompleteAsync((v,e) -> {
System.out.println("*****v: "+v);
}).exceptionally(e -> {
e.printStackTrace();
return null;
});
System.out.println("-----主线程结束,END");
// 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
try { TimeUnit.SECONDS.sleep(2); }
catch (InterruptedException e) { e.printStackTrace(); }
}
}
2.2.2 handle
有异常也可以往下一步走,根据带的异常可以进一步处理。
// 有异常也可以往下一步走,根据带的异常参数可以进一步处理
public class CompletableFutureDemo4{
public static void main(String[] args) throws ExecutionException, InterruptedException{
//当一个线程依赖另一个线程时用 handle 方法来把这两个线程串行化,
// 异常情况:有异常也可以往下一步走,根据带的异常参数可以进一步处理
CompletableFuture.supplyAsync(() -> {
//暂停几秒钟线程
try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println("111");
return 1024;
}).handle((f,e) -> {
int age = 10/0;
System.out.println("222");
return f + 1;
}).handle((f,e) -> {
System.out.println("333");
return f + 1;
}).whenCompleteAsync((v,e) -> {
System.out.println("*****v: "+v);
}).exceptionally(e -> {
e.printStackTrace();
return null;
});
System.out.println("-----主线程结束,END");
// 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
}
}
2.3 对于结算结果进行消费
与结果处理和结果转换犀利函数返回一个新的 CompletableFuture 不同,结果消费系列函数只对结果执行 Action,而不返回新的计算值。 根据对结果的处理方式,结果消费函数又可以分为三大类:
- thenAccept():对单个结果进行消费
- thenAcceptBoth():对两个结果进行消费
- thenRun():不关心结果,只对结果执行 Action
2.3.1 thenAccept
观察该函数的参数类型可知,它们是函数式接口 Consumer,这个接口就只有输入,没有返回值。 常用方法:
public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
任务A 执行完执行 B,B 需要 A 的结果,但是任务 B 无返回值。
CompletableFuture<Void> future = CompletableFuture
.supplyAsync(() -> {
int number = new Random().nextInt(10);
System.out.println("第一次运算:" + number);
return number;
}).thenAccept(number ->
System.out.println("第二次运算:" + number * 5));
2.3.2 thenAcceptBoth
这个函数的作用是,当两个 CompletionStage 都正常完成计算的时候,就会执行提供的 action 消费两个异步的结果。
具体使用:
CompletableFuture<Integer> futrue1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
int number = new Random().nextInt(3) + 1;
try {
TimeUnit.SECONDS.sleep(number);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务1结果:" + number);
return number;
}
});
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
int number = new Random().nextInt(3) + 1;
try {
TimeUnit.SECONDS.sleep(number);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务2结果:" + number);
return number;
}
});
futrue1.thenAcceptBoth(future2, new BiConsumer<Integer, Integer>() {
@Override
public void accept(Integer x, Integer y) {
System.out.println("最终结果:" + (x + y));
}
});
2.3.3 thenRun
thenRun 也是对线程任务结果的一种消费函数,与 thenAccept 不同的是,thenRun 会在上一阶段 Runable。
任务A 执行完 B,并且 B 不需要 A 的结果。
常用方法:
public CompletionStage<Void> thenRun(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action);
具体使用:
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
int number = new Random().nextInt(10);
System.out.println("第一阶段:" + number);
return number;
}).thenRun(() ->
System.out.println("thenRun 执行"));
2.4 对计算速度进行选用
线程交互指将两个线程任务获取结果的速度相对比较,按一定的规则进行下一步处理。
2.4.1 applyToEither
一句话:谁快用谁。
两个线程任务相比较,先获得执行结果的,就对该结果进行下一步的转化操作。两个任务有一个执行完成,获取它的返回值,处理任务并有新的返回值。
常用方法:
public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other,Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn);
具体使用:
public class CompletableFutureDemo5{
public static void main(String[] args) throws ExecutionException, InterruptedException{
CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "\t" + "---come in ");
//暂停几秒钟线程
try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
return 10;
});
CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "\t" + "---come in ");
try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
return 20;
});
CompletableFuture<Integer> thenCombineResult = completableFuture1.applyToEither(completableFuture2,f -> {
System.out.println(Thread.currentThread().getName() + "\t" + "---come in ");
return f + 1;
});
System.out.println(Thread.currentThread().getName() + "\t" + thenCombineResult.get());
}
}
2.4.2 acceptEither
两个线程任务相比较,先获得执行结果的,就对该结果进行下一步的消费操作。两个任务有一个执行完成,获取它的返回值,处理任务,没有新的返回值。
常用方法:
public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other,Consumer<? super T> action);
public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action);
具体使用:
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
int number = new Random().nextInt(10) + 1;
try {
TimeUnit.SECONDS.sleep(number);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("第一阶段:" + number);
return number;
}
});
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
int number = new Random().nextInt(10) + 1;
try {
TimeUnit.SECONDS.sleep(number);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("第二阶段:" + number);
return number;
}
});
future1.acceptEither(future2, new Consumer<Integer>() {
@Override
public void accept(Integer number) {
System.out.println("最快结果:" + number);
}
});
2.4.3 runAfterEither
两个任务有一个执行完成,不需要获取 future 的结果,处理任务,也没有返
回值。
常用方法:
public CompletionStage<Void> runAfterEither(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action);
具体使用:
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
int number = new Random().nextInt(5);
try {
TimeUnit.SECONDS.sleep(number);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务1结果:" + number);
return number;
}
});
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
int number = new Random().nextInt(5);
try {
TimeUnit.SECONDS.sleep(number);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务2结果:" + number);
return number;
}
});
future1.runAfterEither(future2, new Runnable() {
@Override
public void run() {
System.out.println("已经有一个任务完成了");
}
}).join();
2.5 对计算结果进行合并
2.5.1 两个任务都要完成
-
thenCombine:组合两个 future,获取两个 future 的返回结果,并返回当前任务的返回值
-
thenAcceptBoth:组合两个 future,获取两个 future 任务的返回结果,然后处理任务,没有返回值。
-
runAfterBoth:组合两个 future,不需要获取 future 的结果,只需两个 future 处理完任务后,处理该任务
CompletableFuture<Integer> future1 = CompletableFuture
.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
int number = new Random().nextInt(10);
System.out.println("任务1结果:" + number);
return number;
}
});
CompletableFuture<Integer> future2 = CompletableFuture
.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
int number = new Random().nextInt(10);
System.out.println("任务2结果:" + number);
return number;
}
});
CompletableFuture<Integer> result = future1
.thenCombine(future2, new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer x, Integer y) {
return x + y;
}
});
System.out.println("组合后结果:" + result.get());
2.5.2 多任务组合
- allOf:等待所有任务完成
Random random = new Random();
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(random.nextInt(5));
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(random.nextInt(1));
} catch (InterruptedException e) {
e.printStackTrace();
}
return "world";
});
CompletableFuture<Object> result = CompletableFuture.anyOf(future1, future2);
- anyOf:只要有一个任务完成
public static void main(String[] args) {
List<CompletableFuture> futures = Arrays.asList(CompletableFuture.completedFuture("hello"),
CompletableFuture.completedFuture(" world!"),
CompletableFuture.completedFuture(" hello"),
CompletableFuture.completedFuture("java!"));
final CompletableFuture<Void> allCompleted = CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{}));
allCompleted.thenRun(() -> {
futures.stream().forEach(future -> {
try {
System.out.println("get future at:"+System.currentTimeMillis()+", result:"+future.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
});
});
}
测试结果:
get future at:1568892339473, result:hello
get future at:1568892339473, result: world!
get future at:1568892339473, result: hello
get future at:1568892339473, result:java!
三、案例
场景:例如查询商品详情页面的逻辑比较复杂
// 1. 获取sku的基本信息 0.5s
// 2. 获取sku的图片信息 0.5s
// 3. 获取sku的促销信息 TODO 1s
// 4. 获取spu的所有销售属性 1s
// 5. 获取规格参数组及组下的规格参数 TODO 1.5s
// 6. spu详情 TODO 1s
.........
那么一步步的执行下去,用户需要6.5秒后才能看到详情页的内容,很显然不能接受的,如果有多个线程同时完成这6步操作,也许只需要 1.5s 即可完成内容。
@Override
public SkuItemVo item(Long skuId) throws ExecutionException, InterruptedException {
SkuItemVo skuItemVo = new SkuItemVo();
CompletableFuture<SkuInfoEntity> infoFuture = CompletableFuture.supplyAsync(() -> {
//1、sku基本信息的获取 pms_sku_info
SkuInfoEntity info = this.getById(skuId);
skuItemVo.setInfo(info);
return info;
}, executor);
CompletableFuture<Void> saleAttrFuture = infoFuture.thenAcceptAsync((res) -> {
//3、获取spu的销售属性组合
List<SkuItemSaleAttrVo> saleAttrVos = skuSaleAttrValueService.getSaleAttrBySpuId(res.getSpuId());
skuItemVo.setSaleAttr(saleAttrVos);
}, executor);
CompletableFuture<Void> descFuture = infoFuture.thenAcceptAsync((res) -> {
//4、获取spu的介绍 pms_spu_info_desc
SpuInfoDescEntity spuInfoDescEntity = spuInfoDescService.getById(res.getSpuId());
skuItemVo.setDesc(spuInfoDescEntity);
}, executor);
CompletableFuture<Void> baseAttrFuture = infoFuture.thenAcceptAsync((res) -> {
//5、获取spu的规格参数信息
List<SpuItemAttrGroupVo> attrGroupVos = attrGroupService.getAttrGroupWithAttrsBySpuId(res.getSpuId(), res.getCatalogId());
skuItemVo.setGroupAttrs(attrGroupVos);
}, executor);
//2、sku的图片信息 pms_sku_images
CompletableFuture<Void> imageFuture = CompletableFuture.runAsync(() -> {
List<SkuImagesEntity> imagesEntities = skuImagesService.getImagesBySkuId(skuId);
skuItemVo.setImages(imagesEntities);
}, executor);
CompletableFuture.allOf(saleAttrFuture,descFuture,baseAttrFuture,imageFuture,seckillFuture).get();
return skuItemVo;
}
转载自:https://juejin.cn/post/7129283625362653192