likes
comments
collection
share

CompletableFuture中的CompletionException异常真是坑到我了!

作者站长头像
站长
· 阅读数 32

开发环境

  • JDK 17
  • Idea 2022

熟悉JDK 8版本的同学,大概率都使用过java.util.concurrent.CompletableFuture这个类,有时候在业务服务中你可能需要并行的去执行某些骚操作,那就少不了它的存在。

我在业务中就有大量的需求场景,比如:我需要拉渠道方的订单和订单明细,落地到我司系统中。此时用到了两个接口:订单列表查询订单明细查询,查询列表很简单,一页一页的翻页查询,因为它也只返回了订单号,我举个例子吧!

这里是订单列表查询接口返回的数据结构(Mock数据),一堆的订单号

{
    "data":[
        "125345345345",
        "235894563423",
        "345345345343"
    ]
}

接下来我肯定要根据订单号去查询订单明细的,千万不要下意识的进行for循环一个一个查,那要查到什么时候啊! 这个时候CompletableFuture就派上用场了。

CompletableFuture应用

前提条件,我为了方便使用,我是直接在业务的Base包中,封装了一个通用实现,当然只是适用于我们业务的,需要的同学自取。

注意:下面这段代码没有处理真实的异常,想直接用,再往下看!!!


/**
 * 偷个懒,线程池直接这样写了先,真实业务中不是这样搞的哈!
 */
private final static ExecutorService executorService = Executors.newFixedThreadPool(4);


/**
 * 创建并行任务并执行
 *
 * @param list            数据源
 * @param api             API调用逻辑
 * @param exceptionHandle 异常处理逻辑
 * @param <S>             数据源类型
 * @param <T>             程序返回类型
 * @return 处理结果列表
 */
public <S, T> List<T> parallelFutureJoin(Collection<S> list, Function<S, T> api, BiFunction<Throwable, S, T> exceptionHandle) {
    //规整所有任务
    List<CompletableFuture<T>> collectFuture = list.stream()
            .map(s -> this.createFuture(() -> api.apply(s), e -> exceptionHandle.apply(e, s)))
            .toList();
    //汇总所有任务,并执行join,全部执行完成后,统一返回
    return CompletableFuture.allOf(collectFuture.toArray(new CompletableFuture<?>[]{}))
            .thenApply(f -> collectFuture.stream()
                    .map(CompletableFuture::join)
                    .filter(Objects::nonNull)
                    .collect(Collectors.toList()))
            .join();
}

/**
 * 创建单个CompletableFuture任务
 *
 * @param logic           任务逻辑
 * @param exceptionHandle 异常处理
 * @param <T>             类型
 * @return 任务
 */
public <T> CompletableFuture<T> createFuture(Supplier<T> logic, Function<Throwable, T> exceptionHandle) {
    return CompletableFuture.supplyAsync(logic, executorService).exceptionally(exceptionHandle);
}

利用上面这块封装的代码,完全适用于我在公司的大部分并行业务场景,也确实提升了我Pod节点的CPU利用率。

不过之后有个问题就坑了,你调用外部API的时候,偶尔会失败,那么失败就要重试,这个时候我就需要正确的判断异常并进行重试操作。

先定义个业务异常类

public static class BizApiException extends RuntimeException {
    public BizApiException() {
    }

    public BizApiException(String message) {
        super(message);
    }
}

示例代码

下面的代码只是模拟我在业务端的场景,大家乐呵乐呵就行。

public static void main(String[] args) {
    CompletableFutureDemo f = new CompletableFutureDemo();
    List<Integer> numList = f.parallelFutureJoin(Arrays.asList(1, 2, 3), num -> {
        //模拟API调用
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            //...
        }
        if (num > 2) {
            throw new BizApiException("心别太大");
        }
        return num;
    }, (e, num) -> {
        //异常向你打开了大门
        if (e instanceof BizApiException) {
            System.out.println("业务异常,我在处理数字:" + num + ",异常原因:" + e);
            return -1;
        }
        System.out.println("我异常了,老六,我刚才在处理数字:" + num + ",异常原因:" + e);
        return -1;
    });
    System.out.println(numList);
}

注意:我本来想通过返回的Exception判断是不是BizApiException业务异常的,可惜的是这里的异常类型永远都不会是BizApiException,我输出下内容到控制台,就懂了!

执行代码后,我得到的控制台内容

这个时候其实可以看到拿到的真实异常类型了,一个名为java.util.concurrent.CompletionException的老六出现了。我看看它是个啥!

我异常了,老六,我刚才在处理数字:3,异常原因:java.util.concurrent.CompletionException: com.java.basic.CompletableFutureDemo$BizApiException: 心别太大
[1, 2, -1]

CompletionException

这个类的注释说明了它的用途,简单理解是:它在完成结果或任务的过程中遇到错误或其他异常时会触发。那我懂了!任务异常之后它就会出现,然后还把我们自己的异常给包起来了!

JDK源代码

/**
 * Exception thrown when an error or other exception is encountered
 * in the course of completing a result or task.
 *
 * @since 1.8
 * @author Doug Lea
 */
public class CompletionException extends RuntimeException {}

其实还有个异常类,我们也需要关注,它告诉了我们,真实的异常在哪里了!

ExecutionException

这个类的注释就说的很清楚了,自己的异常都在getCause()中了,那就好办了.

/**
 * Exception thrown when attempting to retrieve the result of a task
 * that aborted by throwing an exception. This exception can be
 * inspected using the {@link #getCause()} method.
 *
 * @see Future
 * @since 1.5
 * @author Doug Lea
 */
public class ExecutionException extends Exception {}

改造我的并发工具类(完整版)

方法extractRealException就是我要获取的真实异常,同时parallelFutureJoin方法中引用一下,这个工具类就解决了需求。

/**
 * 创建并行任务并执行
 *
 * @param list            数据源
 * @param api             API调用逻辑
 * @param exceptionHandle 异常处理逻辑
 * @param <S>             数据源类型
 * @param <T>             程序返回类型
 * @return 处理结果列表
 */
public <S, T> List<T> parallelFutureJoin(Collection<S> list, Function<S, T> api, BiFunction<Throwable, S, T> exceptionHandle) {
    //规整所有任务
    List<CompletableFuture<T>> collectFuture = list.stream()
            .map(s -> this.createFuture(() -> api.apply(s), e -> exceptionHandle.apply(
this.extractRealException(e), s)))
            .toList();
    //汇总所有任务,并执行join,全部执行完成后,统一返回
    return CompletableFuture.allOf(collectFuture.toArray(new CompletableFuture<?>[]{}))
            .thenApply(f -> collectFuture.stream()
                    .map(CompletableFuture::join)
                    .filter(Objects::nonNull)
                    .collect(Collectors.toList()))
            .join();
}

/**
 * 创建CompletableFuture任务
 *
 * @param logic           任务逻辑
 * @param exceptionHandle 异常处理
 * @param <T>             类型
 * @return 任务
 */
public <T> CompletableFuture<T> createFuture(Supplier<T> logic, Function<Throwable, T> exceptionHandle) {
    return CompletableFuture.supplyAsync(logic, executorService).exceptionally(exceptionHandle);
}

/**
 * 提取真正的异常
 * <p>
 * CompletableFuture抛出的往往不是真正的异常
 *
 * @param throwable 异常
 * @return 真正的异常
 */
public Throwable extractRealException(Throwable throwable) {
    if (throwable instanceof CompletionException || throwable instanceof ExecutionException) {
        if (throwable.getCause() != null) {
            return throwable.getCause();
        }
    }
    return throwable;
}

当然,这还只是个简易版的并行任务工具类,还有更多的可能,大家需要自己去探索了!

参考文献