likes
comments
collection
share

CompletableFuture使用与原理解析

作者站长头像
站长
· 阅读数 19

本文概览

CompletableFuture使用与原理解析

Future vs CompletableFuture

Future简介

在并发编程中,我们一般会使用Runnable编写任务的内容然后提交给线程池交由线程池调度线程执行。这种情况我们通常是针对不关心任务执行的结果,但如果关心任务执行的结果,并且根据执行结果执行后续的动作,这个时候就需要配合使用Callable+Future来实现了,其中Callable关注异步任务的执行,而Future则关注异步任务的执行结果,它提供了方法来检查任务是否完成,并在完成后获取结果。我们可以看下下面的代码:

package com.markus.concurrent.future;

import com.markus.concurrent.completable_future.utils.CommonUtils;

import java.util.concurrent.*;

/**
 * @author: markus
 * @date: 2023/9/11 11:10 PM
 * @Description:
 * @Blog: https://markuszhang.com
 * It's my honor to share what I've learned with you!
 */
public class FutureHistory {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        pool.submit(new Runnable() {
            @Override
            public void run() {
                System.out.println("Hello Runnable");
            }
        });
        Future<String> callableFuture = pool.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                return "Hello Callable";
            }
        });
        CommonUtils.sleep(1, TimeUnit.SECONDS);
        // 一直阻塞等待
        CommonUtils.printLog(callableFuture.get());
        pool.shutdown();
    }
}

Future的局限性

Future的出现解决了对异步任务结果的管理,但也存在一些局限性,具体如下:

  • 在没有阻塞的情况下,无法对Future的结果执行进一步的操作。Future不会告知我们它什么时候完成,我们如果想要等待完成,则必须通过get()方法阻塞等待任务完成,Future不提供回调函数。
  • 无法解决任务相互依赖的问题。filterWordFuture和newsFuture的结果不能自动发送给finalNewsFuture,需要在finalNewsFuture中手动获取,所以使用Future不能轻而易举地创建异步工作流。示例在下面
  • 不能将多个Future合并,比如你现在有两个异步任务A、B,如果要求A、B都执行完再去执行后续的逻辑,这个动作在Future中很难独立完成。
  • 不能进行异常处理,Future.get调用时需要调用者手动去进行异常处理(显式抛出或捕获异常)。
package com.markus.concurrent.future;

import com.markus.concurrent.completable_future.utils.CommonUtils;

import java.util.concurrent.*;

/**
 * @author: markus
 * @date: 2023/9/11 11:23 PM
 * @Description:
 * @Blog: https://markuszhang.com
 * It's my honor to share what I've learned with you!
 */
public class FutureLimit {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        Future<String> newsFuture = pool.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                return CommonUtils.readFile("news.txt");
            }
        });

        Future<String> filterWordFuture = pool.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                return CommonUtils.readFile("filter_word.txt");
            }
        });

        Future<String> finalNews = pool.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                String news = newsFuture.get();
                String filterWord = filterWordFuture.get();
                // 将新闻中的敏感词过滤掉

                String[] filterWords = filterWord.split(",");
                for (String filter : filterWords) {
                    if (news.contains(filter)) {
                        news = news.replaceAll(filter, "**");
                    }
                }
                return news;
            }
        });
        System.out.println(finalNews.get());
        pool.shutdown();
    }
}

CompletableFuture的优势

CompletableFuture使用与原理解析

我们从类UML图中可以看到,CompletableFuture实现了Future和CompletionStage接口,它提供Future所能提供的功能,并相对于Future具有以下优势:

  • 为快速创建、链接依赖和组合多个Future提供了大量的便利方法。
  • 提供了适用于各种开发场景的回调函数,它还提供了非常全面的异常处理支持。
  • 无缝衔接和亲和Lambda表达式和Stream API
  • 真正意义上的异步编程,把异步编程和函数式编程、响应式编程多种高阶编程思维集于一身,设计上更优雅。

创建异步任务

CompletableFuture在异步任务创建上提供了如下方法:

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor);
public static CompletableFuture<Void> runAsync(Runnable runnable);
public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor);

runAsync

如果要异步任务运行某些耗时的后台任务,并且不想从任务中返回任何内容,则可以使用CompletableFuture#runAsync()方法,它接受一个Runnable接口的实现类对象,方法返回CompletableFuture对象

package com.markus.concurrent.completable_future._01_completable_create;

import com.markus.concurrent.completable_future.utils.CommonUtils;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import static com.markus.concurrent.completable_future.utils.CommonUtils.printLog;
import static com.markus.concurrent.completable_future.utils.CommonUtils.sleep;

/**
 * @Author: zhangchenglong06
 * @Date: 2023/9/4
 * @Description:
 */
public class RunApiDemo {
  public static void main(String[] args) {
    System.out.println("main start");
    /* lambda 写法 */
    CompletableFuture<Void> futureLambda = CompletableFuture.runAsync(() -> {
      String news = CommonUtils.readFile("news.txt");
      printLog(news);
    });

    /*匿名内部类写法*/
    CompletableFuture<Void> future = CompletableFuture.runAsync(new Runnable() {
      @Override
      public void run() {
        String news = CommonUtils.readFile("news.txt");
        printLog(news);
      }
    });

    printLog("main not blocked");
    sleep(1, TimeUnit.SECONDS);
    printLog("main end");
  }
}

runAsync(Runnable runnable)和runAsync(Runnable runnable,Executor executor)的区别就在于:任务由哪个线程池来执行,如果采用第一个方法,则默认使用ForkJoin.commonPool,如果采用第二个方法,则使用传入的自定义线程池的线程来执行,对于第二个方法的使用,这里就不做代码赘述了。

supplyAsync

上面runAsync是不关心任务执行的结果,但是如果想从后台的异步任务中返回一个结果怎么办?那么则可以通过CompletableFuture#supplyAsync实现。

package com.markus.concurrent.completable_future._01_completable_create;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import static com.markus.concurrent.completable_future.utils.CommonUtils.printLog;
import static com.markus.concurrent.completable_future.utils.CommonUtils.sleep;

/**
 * @Author: zhangchenglong06
 * @Date: 2023/9/4
 * @Description:
 */
public class SupplyApiDemo {
  public static void main(String[] args) {
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
      printLog("supply api create future");
      return "future create";
    });

    printLog("main not blocked");
    sleep(1, TimeUnit.SECONDS);
    printLog("main thread end");
  }
}

supplyAsync(Supplier<U> supplier)和supplyAsync(Supplier<U> supplier,Executor executor)的区别就在于:任务由哪个线程池来执行,如果采用第一个方法,则默认使用ForkJoin.commonPool,如果采用第二个方法,则使用传入的自定义线程池的线程来执行,对于第二个方法的使用,这里就不做代码赘述了。

异步任务中的线程池

最佳实践:创建属于自己的业务线程池

如果所有的CompletableFuture都共享一个线程池,那么一旦有异步任务执行一些很慢的I/O操作,就会导致线程池中所有线程都阻塞在I/O操作上,从而造成线程饥饿,进而影响整个系统的性能。所以我们可以根据不同的业务类型创建不同的线程池,以避免互相干扰。

异步编程思想

异步编程是可以让程序并行(或并发)运行的一种手段,其可以让程序中的一个工作单元作为异步任务与主线程分开独立运行,并且在异步任务运行结束后,会通知主线程它的运行结果或者失败原因,毫无疑问,一个异步任务其实就是开启一个线程来完成的,使用异步编程可以提高应用程序的性能和响应能力等。

但在上面我们写的代码中,我们没有显示地创建线程,更没有涉及线程通信的概念,整个过程根本就没涉及线程知识。更专业点讲:线程的创建和线程负责的任务进行解耦,他给我们带来的好处是线程的创建和启动全部交给线程池负责,具体任务的编写就交给程序员(我们大部分应用层程序员),专人专事。

上面的说的也是一种编程思想,即异步编程思想。

异步任务回调

CompletableFuture.get()方法是阻塞的。调用时它会阻塞等待直到这个Future完成,并在完成后返回结果。但是,很多时候这不是我们想要的。对于构建异步系统,我们应该能够将回调附加到CompletableFuture上,当这个Future完成时,该回调应自动被调用。这样,我们就不必等待结果了,然后在Future的回调函数内编写完成Future之后需要执行的逻辑。我们可以通过如下函数实现回调功能:

public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn);
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn);
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor);
public CompletableFuture<Void> thenAccept(Consumer<? super T> action);
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);
public CompletableFuture<Void> thenRun(Runnable action);
public CompletableFuture<Void> thenRunAsync(Runnable action);
public CompletableFuture<Void> thenRunAsync(Runnable action,Executor executor)

thenApply

使用thenApply()方法可以处理和转换CompletableFuture的结果,他以Function<T,R>作为参数,Function<T,R>是一个函数式接口,表示一个转换操作,它接受类型T的参数并产生类型R的结果。

package com.markus.concurrent.completable_future._02_completable_callback;

import com.markus.concurrent.completable_future.utils.CommonUtils;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * @Author: zhangchenglong06
 * @Date: 2023/9/4
 * @Description: thenApply回调
 */
public class ThenApplyApiDemo {
    public static void main(String[] args) {
        System.out.println("main start");
        ExecutorService pool = Executors.newFixedThreadPool(4);
        CompletableFuture<String> readFileFuture = CompletableFuture.supplyAsync(() -> {
            return CommonUtils.readFile("news.txt");
        }, pool);

        /**
         * Function 将输入的参数t,经过一定的逻辑转为R类型的对象
         * R apply(T t);
         */
        readFileFuture.thenApply((content) -> {
            CommonUtils.printLog(content);
            return "读取完成";
        });
        readFileFuture.thenApplyAsync((content) -> {
            CommonUtils.printLog("async " + content);
            return "读取完成";
        }, pool);
        System.out.println("main not blocked");
        CommonUtils.sleep(1, TimeUnit.SECONDS);
        pool.shutdown();
        System.out.println("main end");
    }
}

thenAccpet

如果不需要回调函数返回结果,只是想在Future完成后执行一些代码,则可以使用thenAccept(),这些方法入参是一个Consumer<T>,它可以对异步任务结果进行消费使用,方法返回CompletableFuture<Void>。通过作为回调链中的最后一个回调。

package com.markus.concurrent.completable_future._02_completable_callback;


import com.markus.concurrent.completable_future.utils.CommonUtils;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * @Author: zhangchenglong06
 * @Date: 2023/9/4
 * @Description: thenAccept回调
 */
public class ThenAcceptApiDemo {
    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        System.out.println("main start");
        CompletableFuture<String> readFileFuture = CompletableFuture.supplyAsync(() -> {
            CommonUtils.printLog("开始加载文件!");
            return CommonUtils.readFile("news.txt");
        }, pool);

        /**
         * Consumer 将输入的参数t,进行消费,不返回结果
         * void accept(T t);
         */
        readFileFuture.thenAccept((content) -> {
            CommonUtils.printLog(content);
        });

        System.out.println("main not blocked");
        CommonUtils.sleep(1, TimeUnit.SECONDS);
        pool.shutdown();
        System.out.println("main end");
    }
}

thenRun

前面我们已经知道,通过thenApply对链式操作中的上一个异步任务的结果进行转换,返回一个新的结果;通过thenAccept对链式操作的上一个异步任务的结果进行消费使用,不返回新结果。如果我们只是想从CompletableFuture的链式操作得到一个完成的通知,甚至都不使用上一步链式操作的结果,那么可以使用CompletableFuture.thenRun(),并返回CompletableFuture<Void>。

package com.markus.concurrent.completable_future._02_completable_callback;

import com.markus.concurrent.completable_future.utils.CommonUtils;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import static com.markus.concurrent.completable_future.utils.CommonUtils.printLog;

/**
 * @Author: zhangchenglong06
 * @Date: 2023/9/4
 * @Description: thenRun回调
 */
public class ThenRunApiDemo {
  public static void main(String[] args) {
    System.out.println("main start");
    CompletableFuture<String> readFileFuture = CompletableFuture.supplyAsync(() -> {
      printLog("read news.txt file");
      return CommonUtils.readFile("news.txt");
    });

    /**
     * Runnable 不依赖上面的结果,只关心任务是否完成,如果完成则执行进一步处理
     */
    readFileFuture.thenRun(() -> {
      printLog("readFile action execution is completed!");
    });

    System.out.println("main not blocked");
    CommonUtils.sleep(1, TimeUnit.SECONDS);
    System.out.println("main end");
  }
}

更进一步提升并行化

一般而言,commonPool为了提高性能,thenApply中回调任务和supplyAsync中的异步任务使用的是同一个线程

特殊情况:如果supplyAsync中的任务是立即返回结果(不是耗时的任务),thenApply回调任务也会在主线程中执行。

package com.markus.concurrent.completable_future._02_completable_callback;

import com.markus.concurrent.completable_future.utils.CommonUtils;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/**
 * @author: markus
 * @date: 2023/9/12 10:47 PM
 * @Description:
 * @Blog: https://markuszhang.com
 * It's my honor to share what I've learned with you!
 */
public class MainThreadExecuteDemo {
    public static void main(String[] args) {
        CompletableFuture<String> strFuture = CompletableFuture.supplyAsync(() -> {
            return "不耗时任务";
        });
        strFuture.thenApply((content) -> {
            CommonUtils.printLog(content);
            return "执行完成";
        });

        CommonUtils.sleep(1, TimeUnit.SECONDS);
    }
}
// 控制台
/**
 * 1694530125167 |  1 | main | 不耗时任务
 * <p>
 * Process finished with exit code 0
 */

异步任务编排

thenCompose

thenCompose用来编排2个依赖关系的异步任务,假设现在没有这个函数,我们要实现这样一个需求:异步读取filter_words.txt文件的内容,读取完成后,转换成敏感词数组让主线程使用。我们怎么实现?

一般来说,实现两个CompletableFuture,并通过thenApply进行链式调用,最终返回一个CompletableFuture嵌套的结果,具体代码如下:

package com.markus.concurrent.completable_future._03_completable_arrange;

import com.markus.concurrent.completable_future.utils.CommonUtils;

import java.util.concurrent.CompletableFuture;

/**
 * @author: markus
 * @date: 2023/9/12 10:57 PM
 * @Description: 简述如果没有thenCompose,应该如何实现两个有依赖的异步任务合并操作
 * @Blog: https://markuszhang.com
 * It's my honor to share what I've learned with you!
 */
public class ThenComposeBackgroundDemo {
    public static void main(String[] args) {
        CompletableFuture<CompletableFuture<String[]>> completableFutureCompletableFuture = readFileFuture("filter_word.txt")
                .thenApply((content) -> {
                    return splitFuture(content);
                });
        String[] split = completableFutureCompletableFuture.join().join();
        for (String s : split) {
            System.out.println(s);
        }
    }

    private static CompletableFuture<String> readFileFuture(String fileName) {
        return CompletableFuture.supplyAsync(() -> {
            return CommonUtils.readFile(fileName);
        });
    }

    private static CompletableFuture<String[]> splitFuture(String content) {
        return CompletableFuture.supplyAsync(() -> {
            return content.split(",");
        });
    }
}

就是这一大坨:CompletableFuture<CompletableFuture<String[]>>,而thenCompose让我们仅关注异步任务执行的结果,仅通过一次CompletableFuture.get() 或者 join即可得到结果。我们来看下使用thenCompose实现上述逻辑:

package com.markus.concurrent.completable_future._03_completable_arrange;

import com.markus.concurrent.completable_future.utils.CommonUtils;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/**
 * @author: markus
 * @date: 2023/9/4 11:07 PM
 * @Description: thenCompose 示例
 * @Blog: https://markuszhang.com
 * It's my honor to share what I've learned with you!
 */
public class ThenComposeApiDemo {
    public static void main(String[] args) {
        // 需求:编排两个有依赖关系CompletableFuture
        CommonUtils.printLog("main start");
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            String content = CommonUtils.readFile("filter_word.txt");
            return content;
        });

        CompletableFuture<String[]> filterWordFuture = future.thenCompose((content) -> {
            return CompletableFuture.supplyAsync(() -> {
                return content.split(",");
            });
        });
        CommonUtils.printLog("main not blocked!");
        String[] split = filterWordFuture.join();
        for (String s : split) {
            System.out.println(s);
        }
        CommonUtils.printLog("main end");
    }
}

thenCombine

thenCombine用来编排2个非依赖关系的异步任务。我们已经知道,当其中一个Future依赖于另一个Future,使用thenCompose()用于组合两个Future。如果两个Future之间没有依赖关系,你希望两个Future独立运行并在两者都完成之后执行回调操作时,则使用thenCombine()。

package com.markus.concurrent.completable_future._03_completable_arrange;

import com.markus.concurrent.completable_future.utils.CommonUtils;

import java.util.concurrent.CompletableFuture;

/**
 * @author: markus
 * @date: 2023/9/4 11:07 PM
 * @Description: thenCombine 示例
 * @Blog: https://markuszhang.com
 * It's my honor to share what I've learned with you!
 */
public class ThenCombineApiDemo {
    public static void main(String[] args) {
        // 需求:编排两个无依赖关系CompletableFuture
        CommonUtils.printLog("main start");
        CompletableFuture<String> newsFuture = CompletableFuture.supplyAsync(() -> {
            String content = CommonUtils.readFile("news.txt");
            return content;
        });

        CompletableFuture<String> filterWordFuture = CompletableFuture.supplyAsync(() -> {
            String content = CommonUtils.readFile("filter_word.txt");
            return content;
        });

        CompletableFuture<String> future = newsFuture.thenCombine(filterWordFuture, (news, filterWord) -> {
            String[] filterWords = filterWord.split(",");
            for (String word : filterWords) {
                // 说明字符串中有敏感词
                if (news.contains(word)) {
                    news = news.replaceAll(word, "**");
                }
            }
            return news;
        });
        CommonUtils.printLog("main not blocked!");
        System.out.println(future.join());
        CommonUtils.printLog("main end");
    }
}

allOf

allOf用于所有异步任务执行完后执行一个动作。

package com.markus.concurrent.completable_future._03_completable_arrange;

import com.markus.concurrent.completable_future.utils.CommonUtils;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/**
 * @author: markus
 * @date: 2023/9/4 11:27 PM
 * @Description: allOf 示例
 * @Blog: https://markuszhang.com
 * It's my honor to share what I've learned with you!
 */
public class AllOfApiDemo {
    public static void main(String[] args) {
        // 演示:每一个异步任务执行完成后都执行一个动作
        CommonUtils.printLog("main start");
        CompletableFuture<String> newsFuture = CompletableFuture.supplyAsync(() -> {
            String content = CommonUtils.readFile("news.txt");
            CommonUtils.printLog(content);
            return content;
        });
        CompletableFuture<String> filterWordFuture = CompletableFuture.supplyAsync(() -> {
            String content = CommonUtils.readFile("filter_word.txt");
            CommonUtils.printLog(content);
            return content;
        });
        CompletableFuture<Void> voidCompletableFuture = CompletableFuture.allOf(newsFuture, filterWordFuture);
        voidCompletableFuture.thenRun(() -> {
            CommonUtils.printLog("异步任务执行完成");
        });

        CommonUtils.printLog("main not blocked!");
        CommonUtils.sleep(2, TimeUnit.SECONDS);
        CommonUtils.printLog("main end");
    }
}

anyOf

anyOf用于多个异步任务中任意一个完成后执行一次动作。

package com.markus.concurrent.completable_future._03_completable_arrange;

import com.markus.concurrent.completable_future.utils.CommonUtils;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/**
 * @author: markus
 * @date: 2023/9/4 11:27 PM
 * @Description: allOf 示例
 * @Blog: https://markuszhang.com
 * It's my honor to share what I've learned with you!
 */
public class AnyOfApiDemo {
    public static void main(String[] args) {
        // 演示:只要有一个完成即可执行下一步操作
        CommonUtils.printLog("main start");
        CompletableFuture<String> newsFuture = CompletableFuture.supplyAsync(() -> {
            String content = CommonUtils.readFile("news.txt");
            return content;
        });
        CompletableFuture<String> filterWordFuture = CompletableFuture.supplyAsync(() -> {
            String content = CommonUtils.readFile("filter_word.txt");
            return content;
        });
        CompletableFuture<Object> voidCompletableFuture = CompletableFuture.anyOf(newsFuture, filterWordFuture);
        voidCompletableFuture.thenAccept((content) -> {
            CommonUtils.printLog((String) content);
        });

        CommonUtils.printLog("main not blocked!");
        CommonUtils.sleep(2, TimeUnit.SECONDS);
        CommonUtils.printLog("main end");
    }
}

异常处理

CompletableFuture类提供了两个函数用于异常处理:

  • exceptionally:用于处理异步回调链上出现的异常,如果出现异常,则不会继续向下执行,并且在exceptionally中处理异常。
  • handle:与exceptionally不同,handle用于恢复回调链中的异常,后续的回调函数还是可以正常执行。

我们来看下对两个函数的使用示例:

exceptionnal

package com.markus.concurrent.completable_future._04_completable_exception;

import com.markus.concurrent.completable_future.utils.CommonUtils;

import java.util.concurrent.CompletableFuture;

/**
 * @author: markus
 * @date: 2023/9/4 11:36 PM
 * @Description: 异常处理exceptionally 示例
 * @Blog: https://markuszhang.com
 * It's my honor to share what I've learned with you!
 */
public class ExceptionallyApiDemo {
    public static void main(String[] args) {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            // 1. 当此处出现异常时:
            int i = 1 / 0;
            return "future1";
        }).thenApply((str) -> {
            return str + " future2";
        }).thenApply((str) -> {
            return str + " future3";
        }).exceptionally((ex) -> {
            CommonUtils.printLog(ex.getMessage());
            return "Unknown";
        });

        // 正常情况打印:future1 future2 future3
        // 异常情况打印:Unknown
        System.out.println(future.join());
    }
}

handle

package com.markus.concurrent.completable_future._04_completable_exception;

import com.markus.concurrent.completable_future.utils.CommonUtils;

import java.util.concurrent.CompletableFuture;

/**
 * @author: markus
 * @date: 2023/9/4 11:36 PM
 * @Description: 异常处理exceptionally 示例
 * @Blog: https://markuszhang.com
 * It's my honor to share what I've learned with you!
 */
public class HandleApiDemo {
    public static void main(String[] args) {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
                    // 1. 当此处出现异常时:
                    int i = 1 / 0;
                    return "future1";
                }).handle((res, ex) -> {
                    if (ex != null) {
                        CommonUtils.printLog(ex.getMessage());
                        return "Unknown";
                    }
                    return res;
                })
                .thenApply((str) -> {
                    return str + " future2";
                }).thenApply((str) -> {
                    return str + " future3";
                });

        // 正常情况打印:future1 future2 future3
        // 异常情况打印:Unknown future2 future3
        System.out.println(future.join());
    }
}

异步任务的交互

applyToEither

把两个异步任务作比较,异步任务先执行完成,就对其结果执行下一步的处理,并且返回执行后的结果。

package com.markus.concurrent.completable_future._05_completable_interaction;

import com.markus.concurrent.completable_future.utils.CommonUtils;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/**
 * @author: markus
 * @date: 2023/9/12 11:22 PM
 * @Description:
 * @Blog: https://markuszhang.com
 * It's my honor to share what I've learned with you!
 */
public class ApplyToEitherApiDemo {
    public static void main(String[] args) {
        CompletableFuture<String> strFuture1 = CompletableFuture.supplyAsync(() -> {
            CommonUtils.sleep(1, TimeUnit.SECONDS);
            return "strFuture1";
        });

        CompletableFuture<String> strFuture2 = CompletableFuture.supplyAsync(() -> {
            CommonUtils.sleep(2, TimeUnit.SECONDS);
            return "strFuture2";
        });

        CompletableFuture<String> future = strFuture1.applyToEither(strFuture2, (content) -> {
            return content;
        });

        // 理论上应该获取到strFuture1的结果
        // 实际控制台打印的也是strFuture1的结果
        System.out.println(future.join());
    }
}

acceptEither

把两个异步任务作比较,异步任务先执行完成,就对其结果执行下一步的处理,该函数无返回值。

package com.markus.concurrent.completable_future._05_completable_interaction;

import com.markus.concurrent.completable_future.utils.CommonUtils;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/**
 * @author: markus
 * @date: 2023/9/12 11:22 PM
 * @Description:
 * @Blog: https://markuszhang.com
 * It's my honor to share what I've learned with you!
 */
public class AcceptEitherApiDemo {
    public static void main(String[] args) {
        CompletableFuture<String> strFuture1 = CompletableFuture.supplyAsync(() -> {
            CommonUtils.sleep(1, TimeUnit.SECONDS);
            return "strFuture1";
        });

        CompletableFuture<String> strFuture2 = CompletableFuture.supplyAsync(() -> {
            CommonUtils.sleep(2, TimeUnit.SECONDS);
            return "strFuture2";
        });

        CompletableFuture<Void> future = strFuture1.acceptEither(strFuture2, (content) -> {
            CommonUtils.printLog(content);
        });

        CommonUtils.sleep(3, TimeUnit.SECONDS);
    }
}

runAfterEither

不关心两个异步任务的执行结果,只要有一个完成了就会触发这个函数的回调,执行一些操作。

package com.markus.concurrent.completable_future._05_completable_interaction;

import com.markus.concurrent.completable_future.utils.CommonUtils;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/**
 * @author: markus
 * @date: 2023/9/12 11:22 PM
 * @Description:
 * @Blog: https://markuszhang.com
 * It's my honor to share what I've learned with you!
 */
public class RunAfterEitherApiDemo {
    public static void main(String[] args) {
        CompletableFuture<String> strFuture1 = CompletableFuture.supplyAsync(() -> {
            CommonUtils.sleep(1, TimeUnit.SECONDS);
            CommonUtils.printLog("strFuture1 执行");
            return "strFuture1";
        });

        CompletableFuture<String> strFuture2 = CompletableFuture.supplyAsync(() -> {
            CommonUtils.sleep(2, TimeUnit.SECONDS);
            CommonUtils.printLog("strFuture2 执行");
            return "strFuture2";
        });

        CompletableFuture<Void> future = strFuture1.runAfterEither(strFuture2, () -> {
            CommonUtils.printLog("其中一个任务已经执行完成了");
        });

        CommonUtils.sleep(3, TimeUnit.SECONDS);
    }
}

get和join的区别

我们在获取异步任务执行结果的时候,通常会通过两种方式获取,一种是get,另一种是join,他们俩的区别在于:

  • 异常处理:

    • get(): 当 CompletableFuture 的计算抛出异常时,get() 会将异常封装为 ExecutionException,并抛出这个异常。如果等待被中断,它会抛出 InterruptedException

    • join(): 对于同样的异常,join() 会将其封装为 CompletionException 并抛出,而不是 ExecutionException。并且它不会抛出 InterruptedException;如果当前线程被中断,它仍然会尝试继续运行,直到正常返回结果或者抛出其他异常。

  • 方法签名:

    • get(): 由于 get() 可能会抛出 InterruptedExceptionExecutionException,因此它在方法签名中声明了这两个异常。
    • join(): 它不声明任何检查型异常,因此在使用时不需要显式地处理或声明任何异常。
  • 使用场景:

    • get(): 由于它声明了检查型异常,当你需要明确处理这些异常的时候,get() 可能是更好的选择。

    • join(): 当你希望简化代码,不想处理大量的异常时,join() 是一个不错的选择。另外,当你使用流式API或函数式编程样式时,join() 通常会更简洁。

本文总结

好了,对于CompletableFuture的讲述就写到这里,大部分内容也是我学习的过程中参考别人的,并将内容进行消化理解写成博客分享出来。本文主要从CompletableFuture出现的背景对比Future的优势说起,并紧接着熟悉了CompletableFuture的异步任务创建、回调函数使用、任务编排、异常处理、任务交互等方面做了详细解释并展示了相关代码示例,如果还有什么疑问,欢迎评论交流探讨。

参考阅读