Java并发编程入门(十九)异步任务调度工具CompleteFeature
相关阅读:
1. CompleteFeature简介
CompleteFeature是对Feature的增强,Feature只能处理简单的异步任务,而CompleteFeature可以将多个异步任务进行复杂的组合,支持串行执行,并行执行,And汇聚,Or汇聚,从而能对复杂的关联任务进行调度。
2. CompleteFeature支持的业务场景
2.1. 串行任务
串行任务指任务B要等待任务A执行完成之后才会执行,串行任务有如下属性:
属性 | 描述 |
---|---|
可获取A的结果 | 任务B可获取任务A的执行结果作为参数使用 |
B有返回值 | 如果任务B有返回值,可以将执行结果通过返回值返回 |
可获取A异常 | 任务B可以获取任务A抛出的异常 |
A异常则终止 | 当任务A抛出异常后,程序是否会终止,若会终止,程序将退出,任务B不会执行,否则程序不会退出,继续执行。 |
CompleteFeature支持的串行任务方法如下:
方法 | 可获取A的结果 | B有返回值 | 可获取A异常 | A异常则终止 |
---|---|---|---|---|
thenRun | 否 | 否 | 否 | 是 |
thenApply | 是 | 是 | 否 | 是 |
thenAccept | 是 | 否 | 否 | 是 |
thenCompose | 是 | 是 | 否 | 是 |
whenComplete | 是 | 否 | 是 | 否 |
exceptionally | 否 | 是 | 是 | 否 |
handle | 是 | 是 | 是 | 否 |
总结:
- 任务不会抛出异常就使用前四个方法,否则使用后三个方法。
- exceptionally相当于try {} catch {}的catch部分,whenComplete和handle相当于try {} catch {} finally {} 的catch和finall部分,区别是一个有返回值,一个没有返回值。
- thenApply和thenCompose的区别是,thenCompose在任务B中返回的是CompletableFuture,可参考后面的例子对比差异。
1.2. And汇聚关系
And汇聚关系是指:任务C要等待任务A或任务B都执行完后才执行。CompleteFeature支持此关系的方法如下:
方法 | C接收A或B返回值作为参数 | C有返回值 |
---|---|---|
thenCombine | 是 | 是 |
thenAcceptBoth | 是 | 否 |
runAfterBoth | 否 | 否 |
1.3. Or汇聚关系
Or汇聚关系是指:任务C等待任务A或任务B其中一个执行完后就执行,即C只需等待最先执行完成的任务后就可执行。CompleteFeature支持此关系的方法如下:
方法 | C接收A或B返回值作为参数 | C有返回值 |
---|---|---|
applyToEither | 是 | 是 |
acceptEither | 是 | 否 |
runAfterEither | 否 | 否 |
1.4. 多任务
CompletableFuture提供了两个多任务的方法:
方法 | 描述 |
---|---|
anyOf | 多个任务中的任意一个任务执行完则结束,可以获取到最先执行完的任务的返回值。 |
allOf | 多个任务都执行完后才结束,不能获取到任何一个任务的返回值 |
以上所有方法的返回值都是CompletableFuture,这样就可以继续调用前面描述的方法来进行任务组合,组合出更加复杂的任务处理流程。
1.5. 方法族
以上方法中的最后一个任务都是和前面的任务在一个线程内执行,CompletableFuture中还有一套方法让最后一个任务在新线程中执行,只要在原方法上加上Async后缀则可,例如:
同步 | 异步 |
---|---|
thenApply | thenApplyAsync |
thenAccept | thenAcceptAsync |
thenRun | thenRunAsync |
thenCompose | thenComposeAsync |
具体还有哪些,可参考源码。 |
2. 代码例子
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class CompleteFeatureDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
simpleTask();
serialTask();
andTask();
orTask();
complexTask();
sleep(2000); // 等待子线程结束
System.out.println("end.");
}
private static void simpleTask() throws ExecutionException, InterruptedException {
// 1. runAsync 执行一个异步任务,没有返回值
CompletableFuture.runAsync(()-> System.out.println("1. runAsync"));
sleep(100);
// 2. supplyAsync 执行一个异步任务,有返回值
CompletableFuture<String> future = CompletableFuture.supplyAsync(()->{
System.out.println("2.1 supplyAsync task be called");
sleep(100);
return "2.2 supplyAsync return value";
});
System.out.println("2.3 after supplyAsync");
System.out.println(future.get());
sleep(200);
}
private static void serialTask() throws ExecutionException, InterruptedException {
// 3. thenRun
CompletableFuture.supplyAsync(()->{
System.out.println("3.1 supplyAsync begin");
sleep(100); // 用于证明B等待A结束才会执行
return "3.2 supplyAsync end";
}).thenRun(()->{
System.out.println("3.3 thenRun be called.");
});
sleep(200);
// 4. thenApply
CompletableFuture<String> future4 = CompletableFuture.supplyAsync(()->{
sleep(100);
return "4.1 apple";
}).thenApply(returnVal->{
return "4.2 " + returnVal + "-苹果";
});
System.out.println("4.3 get: " + future4.get());
sleep(100);
// 5. thenAccept
CompletableFuture.supplyAsync(()->{
sleep(100);
return "5.1 orange";
}).thenAccept(returnVal->{
System.out.println("5.2 " + returnVal + "-桔子");
});
sleep(100);
// 6. thenCompose
CompletableFuture<String> future6 = CompletableFuture.supplyAsync(()->{
sleep(100);
return "6.1 apple";
}).thenCompose((returnVal)->{
return CompletableFuture.supplyAsync(()->{
return "6.2 " + returnVal;
});
});
System.out.println("6.3 get: " + future6.get());
sleep(100);
// 7. whenComplete
CompletableFuture.supplyAsync(()->{
sleep(100);
if (true) { //修改boolean值观察不同结果
return "7.1 return value for whenComplete";
} else {
throw new RuntimeException("7.2 throw exception for whenComplete");
}
}).whenComplete((returnVal, throwable)->{
System.out.println("7.2 returnVal: " + returnVal); // 可以直接拿到返回值,不需要通过future.get()得到
System.out.println("7.3 throwable: " + throwable); // 异步任务抛出异常,并不会因为异常终止,而是会走到这里,后面的代码还会继续执行
});
sleep(100);
// 8. exceptionally
CompletableFuture<String> future8 = CompletableFuture.supplyAsync(()->{
sleep(100);
if (false) { //修改boolean值观察不同结果
return "8.1 return value for exceptionally";
} else {
throw new RuntimeException("8.2 throw exception for exceptionally");
}
}).exceptionally(throwable -> {
throwable.printStackTrace();
return "8.3 return value after dealing exception.";
});
System.out.println("8.4 get: " + future8.get());
sleep(100);
// 9. handle
CompletableFuture<String> future9 = CompletableFuture.supplyAsync(()->{
sleep(100);
if (false) { //修改boolean值观察不同结果
return "9.1 return value for handle";
} else {
throw new RuntimeException("9.2 throw exception for handle");
}
}).handle((retuanVal, throwable)->{
System.out.println("9.3 retuanVal: " + retuanVal);
System.out.println("9.4 throwable: " + throwable);
return "9.5 new return value.";
});
System.out.println("9.6 get: " + future9.get());
sleep(100);
}
private static void andTask() throws ExecutionException, InterruptedException {
// 10. thenCombine 合并结果
CompletableFuture<String> future10 = CompletableFuture.supplyAsync(()->{
sleep(100);
return "10.1 TaskA return value";
}).thenCombine(CompletableFuture.supplyAsync(()->{
sleep(100);
return "10.2 TaskB return value";
}), (taskAReturnVal, taskBReturnVal) -> taskAReturnVal + taskBReturnVal);
System.out.println("10.3 get: " + future10.get());
sleep(200);
// 11. thenAcceptBoth
CompletableFuture.supplyAsync(()->{
sleep(100);
return "11.1 TaskA return value";
}).thenAcceptBoth(CompletableFuture.supplyAsync(()->{
sleep(100);
return "11.2 TaskB return value";
}), (taskAReturnVal, taskBReturnVal) -> System.out.println(taskAReturnVal + taskBReturnVal));
sleep(200);
// 12. runAfterBoth A,B都执行完后才执行C,C不关心前面任务的返回值
CompletableFuture.supplyAsync(()->{
sleep(200); // 虽然这个任务先执行,但是执行时间比下面的任务长,所以最后会使用下面的返回结果
System.out.println("12.1 TaskA be called.");
return "12.2 TaskA return value";
}).runAfterBoth(CompletableFuture.supplyAsync(()->{
sleep(100);
System.out.println("12.3 TaskB be called.");
return "12.4 TaskB return value";
}), () -> System.out.println("12.5 TaskC be called."));
sleep(300);
}
private static void orTask() throws ExecutionException, InterruptedException {
// 13. applyToEither 使用A,B两个异步任务优先返回的结果
CompletableFuture<String> future13 = CompletableFuture.supplyAsync(()->{
sleep(200); // 虽然这个任务先执行,但是执行时间比下面的任务长,所以最后会使用下面的返回结果
System.out.println("13.1 TaskA be called"); // 用于证明拿到B的结果后,A还会继续执行,并不会终止
return "13.2 TaskA return value";
}).applyToEither(CompletableFuture.supplyAsync(()->{
sleep(100);
return "13.3 TaskB return value";
}), (returnVal) -> returnVal);
System.out.println("13.4 get: " + future13.get());
sleep(300);
// 14. acceptEither 使用A,B两个异步任务优先返回的结果
CompletableFuture.supplyAsync(()->{
sleep(200); // 虽然这个任务先执行,但是执行时间比下面的任务长,所以最后会使用下面的返回结果
return "14.1 TaskA return value";
}).acceptEither(CompletableFuture.supplyAsync(()->{
sleep(100);
return "14.2 TaskB return value";
}), (returnVal) -> System.out.println(returnVal));
sleep(300);
// 15. runAfterEither A,B任意一个执行完后就执行C,C不关心前面任务的返回值
CompletableFuture.supplyAsync(()->{
sleep(200); // 虽然这个任务先执行,但是执行时间比下面的任务长,所以最后会使用下面的返回结果
System.out.println("15.1 TaskA be called.");
return "15.2 TaskA return value";
}).runAfterEither(CompletableFuture.supplyAsync(()->{
sleep(100);
System.out.println("15.3 TaskB be called.");
return "15.4 TaskB return value";
}), () -> System.out.println("15.5 TaskC be called."));
sleep(300);
}
private static void complexTask() throws ExecutionException, InterruptedException {
// 16. anyOf
CompletableFuture future16 = CompletableFuture.anyOf(CompletableFuture.supplyAsync(()->
{
sleep(300);
System.out.println("16.1 TaskA be called.");
return "16.2 TaskA return value.";
}), CompletableFuture.supplyAsync(()->{
sleep(100);
System.out.println("16.3 TaskB be called.");
return "16.4 TaskB return value.";
}));
System.out.println("16.5 get: " + future16.get());
sleep(400);
// 17. allOf
CompletableFuture<Void> future17 = CompletableFuture.allOf(CompletableFuture.supplyAsync(()->
{
sleep(300);
System.out.println("17.1 TaskA be called.");
return "17.2 TaskA return value.";
}), CompletableFuture.supplyAsync(()->{
sleep(100);
System.out.println("17.3 TaskB be called.");
return "17.4 TaskB return value.";
}));
System.out.println("17.5 get: " + future17.get()); // allOf没有返回值
}
private static void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
}
}
输出日志:
1. runAsync
2.3 after supplyAsync
2.1 supplyAsync task be called
2.2 supplyAsync return value
3.1 supplyAsync begin
3.4
3.5 xxx
3.6 AAA
3.3 thenRun be called.
4.3 get: 4.2 4.1 apple-苹果
5.2 5.1 orange-桔子
6.3 get: 6.2 6.1 apple
7.2 returnVal: 7.1 return value for whenComplete
7.3 throwable: null
java.util.concurrent.CompletionException: java.lang.RuntimeException: 8.2 throw exception for exceptionally
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1584)
at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1574)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1689)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: java.lang.RuntimeException: 8.2 throw exception for exceptionally
at com.javageektour.hikaricp.demo.CompleteFeatureDemo.lambda$serialTask$14(CompleteFeatureDemo.java:101)
at com.javageektour.hikaricp.demo.CompleteFeatureDemo?Lambda$14/769287236.get(Unknown Source)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1582)
... 5 more
8.4 get: 8.3 return value after dealing exception.
9.3 retuanVal: null
9.4 throwable: java.util.concurrent.CompletionException: java.lang.RuntimeException: 9.2 throw exception for handle
9.6 get: 9.5 new return value.
10.3 get: 10.1 TaskA return value10.2 TaskB return value
11.1 TaskA return value11.2 TaskB return value
12.3 TaskB be called.
12.1 TaskA be called.
12.5 TaskC be called.
13.4 get: 13.3 TaskB return value
13.1 TaskA be called
14.2 TaskB return value
15.3 TaskB be called.
15.5 TaskC be called.
15.1 TaskA be called.
16.3 TaskB be called.
16.5 get: 16.4 TaskB return value.
16.1 TaskA be called.
17.3 TaskB be called.
17.1 TaskA be called.
17.5 get: null
end.
3. 总结
CompleteFeature支持复杂的异步任务调度,支持多个任务串行,并行,汇聚,当多个异步任务有依赖关系时,通过CompleteFeature来调度任务可以大大简化代码和提高执行性能。
end.
<--阅过留痕,左边点赞! 
转载自:https://juejin.cn/post/6844904130570371079