likes
comments
collection
share

深度剖析 CompletableFuture 工作原理!

作者站长头像
站长
· 阅读数 24

嗨,你好,我是猿java

在日常开发中,为了提高程序的性能,我们经常会使用异步方式来完成,在本文中,我们将学习一种常用的工具类: CompletableFuture,并且学习如何使用它来提高 Java 应用程序的性能,让我们开始学习旅程吧!

在分析 CompletableFuture 之前,我们先来看看 Future

什么是 Future?

Future 是 Java 5 中引入的 Java 接口,用于表示将来可用的值,使用 Future 给 Java 带来了巨大的好处,它可以帮助我们异步执行一些非常密集的计算,而不会阻塞当前线程,因此,我们可以在当前线程中继续做一些工作。

我们可以把 Future 想象成去餐厅吃晚饭,在晚餐准备期间,我们可以干些其他的事情,一旦晚餐准备好,就可以吃饭了。

Future 的源码如下:

package java.util.concurrent;

public interface Future<V> {

    boolean cancel(boolean mayInterruptIfRunning);
    
    boolean isCancelled();
   
    boolean isDone();
    
    V get() throws InterruptedException, ExecutionException;
    
    V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}

什么是 CompletableFuture?

CompletableFuture 是 Java 8 引入的一个类,用于处理异步编程。它是 Future 接口的一个增强版,提供了一些有用的方法来管理异步任务。其源码如下:

public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
    volatile Object result;       // Either the result or boxed AltResult
    volatile Completion stack;    // Top of Treiber stack of dependent actions
    // 方法太多,此处省略
}

CompletableFuture 和 Future

在本节中,我们将了解 Future 接口以及它的一些局限性,并且分析如何使用 CompletableFuture 类来解决这些问题。

定义超时

Future 接口只提供了 get() 方法来获取计算结果,但如果计算时间过长,我们的线程就会一直堵塞。

为了更好地理解,让我们看一些示例代码:

import java.util.concurrent.*;

public class FutureDemo {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<String> future = executor.submit(() -> threadSleep());
        System.out.println("The result is: " + future.get());
    }

    private static String threadSleep() throws InterruptedException {
        TimeUnit.DAYS.sleep(365 * 10);
        return "finishSleep";
    }
}

上述示例中,我们创建了一个 ExecutorService 实例,并且使用它来提交一个线程睡眠的任务(threadSleep()),然后通过调用 future.get() 方法在控制台上打印结果值,因为 threadSleep() 方法中,我们让线程睡眠了 10年,所以控制台要等待 10年才会打印值,而且 Future 也没有任何方法可以手动完成任务。

那么,CompletableFuture 是如何克服这个问题的呢?

我们还是使用相同的场景,并且在流程中调用了 CompletableFuture.complete() 方法,示例代码如下:

import java.util.concurrent.*;

public class CompletableFutureDemo {

    public static void main(String[] args) {
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> threadSleep());
        completableFuture.complete("Completed");
        System.out.println("result: " + completableFuture.get());
        System.out.println("completableFuture done ? " + completableFuture.isDone());
    }

    private static String threadSleep(){
        try {
            TimeUnit.DAYS.sleep(365 * 10);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return "finishSleep";
    }
}

在上述示例中:

  • 首先,通过调用 CompletableFuture.supplyAsync() 方法创建一个 String 类型的 CompletableFuture;
  • 然后,调用 completableFuture.complete()方法;
  • 接着,调用 isDone() 方法,并打印其结果值;
  • 最后,main() 方法的输出如下:
result: Completed
completableFuture done ? true

通过运行结果我们可以看到:需要睡眠 10年的 threadSleep() 居然完成了,为什么呢?在整个代码中嫌疑最大的是 completableFuture.complete(),因此我们来看看这个方法到底做了什么?其源码如下:

    /**
     * If not already completed, sets the value returned by {@link
     * #get()} and related methods to the given value.
     *
     * @param value the result value
     * @return {@code true} if this invocation caused this CompletableFuture
     * to transition to a completed state, else {@code false}
     */
    public boolean complete(T value) {
        boolean triggered = completeValue(value);
        postComplete();
        return triggered;
    }

通过源码我们可以知道:complete(T value) 方法是用于手动完成一个 CompletableFuture 任务(即使任务尚未执行或未完成)并且返回 value。

因此,CompletableFuture 是通过 complete(T value)方法手动结束 任务,从而客服了 Futrue 无法手动结束任务的限制。

组合异步操作

假设我们需要调用两个 Method:firstMethod() 和 secondMethod(),并且将 firstMethod() 的结果作为 secondMethod() 的输入。

通过使用 Future 接口,我们无法异步组合这两个操作,只能同步完成,示例代码如下:

import java.util.concurrent.*;

public class FutureDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<Integer> firstFuture = executor.submit(() -> firstMethod(1));

        int firstMethodResult = firstFuture.get(); // 获取 firstMethod的结果值
        System.out.println("firstMethodResult:" + firstMethodResult);
        Future<Integer> secondFuture = executor.submit(() -> secondMethod(firstMethodResult));
        System.out.println("secondMethodResult:" + secondFuture.get());
        executor.shutdown();
    }

    private static int firstMethod(int num) {
        return num;
    }

    private static int secondMethod(int firstMethodResult) {
        return 2 + firstMethodResult;
    }
}

在上述示例代码中:

  • 首先,通过 ExecutorService 提交一个返回 Future 的任务来调用 firstMethod;
  • 然后,需要将 firstMethod 的结果值传递给第 secondMethod,但检索 firstMethod 结果值的唯一方法是使用 Future.get(),该方法会阻塞主线程;
  • 接着,我们必须等到 firstMethod 返回结果,然后再执行 secondMethod 操作,整个流程就变成同步过程;
  • 最后,main() 的输出如下:
firstMethodResult:1
secondMethodResult:3

通过运行结果可以看出,结果符合预期而且整个过程是串行执行的。

那么,CompletableFuture 是如何在不阻塞主线程的前提下,异步组合两个过程的呢?具体操作如下示例代码:

import java.util.concurrent.*;

public class CompletableFutureDemo {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        var finalResult = CompletableFuture.supplyAsync(() -> firstMethod(1))
                .thenApply(firstMethodResult -> secondMethod(firstMethodResult));
        System.out.println("finalResult:" + finalResult.get());
    }
    private static int firstMethod(int num) {
        return num;
    }

    private static int secondMethod(int firstMethodResult) {
        return 2 + firstMethodResult;
    }
}

在上述示例代码中:

  • 首先,通过 CompletableFuture.supplyAsync 方法,返回一个新的 CompletableFuture,该 CompletableFuture 是在 ForkJoinPool.commonPool() 中异步完成的,并且将结果值赋值给 Supplier;
  • 接着,获取 firstMethod() 的结果并使用 thenApply() 方法,将其传递给另一个调用 secondMethod();
  • 最后,main() 的输出如下:
finalResult:3

通过运行结果可以看出,结果符合预期而且 CompletableFuture 成功的把 firstMethod() 和 secondMethod() 两个异步过程整合。

CompletableFuture如何提升性能?

在本节中,我们将通过设定的场景来验证 CompletableFuture 是如何提升性能的。

场景设定

  • 定义一个 Transaction 类,并且包含 id 属性;
  • 定义一个 TransactionExecutor类,包含 transactionId 属性和静态方法 doTransaction(Transaction transaction),方法接收 Transaction对象;
  • 在 doTransaction() 方法中,通过线程睡眠 1秒来模拟业务耗时;

场景涉及的代码如下:

public class TransactionExecutor {
    private final String transactionId;

    public TransactionExecutor(String transactionId) {
        this.transactionId = transactionId;
    }

    public static TransactionExecutor doTransaction(Transaction transaction) {
        Thread.sleep(1000L); // 通过线程睡眠来模拟真实的业务耗时
        return new TransactionExecutor("transactionId: " + transaction.getId());
    }
    
    @Override
    public String toString() {
        return "TransactionExecutor{" +
                "transactionId='" + transactionId + ''' +
                '}';
    }
}

public class Transaction {
  private String id;

  public Transaction(String id) {
    this.id = id;
  }
  // getter setter方法
}

接着,我们将通过以下 3种方式来执行给定的场景:

  • 同步实现
  • 并行流实现
  • CompletableFuture 实现

同步实现

public class Demo {

  public static void main(String[] args) {
    long start = System.currentTimeMillis();
    var executor = Stream.of(
            new Transaction("1"),
            new Transaction("2"),
            new Transaction("3"))
        .map(TransactionExecutor::doTransaction)
        .collect(Collectors.toList());
    long end = System.currentTimeMillis();

    System.out.printf("The operation take %s ms%n", end - start);
    System.out.println("TransactionExecutor are: " + executor);
  }
}

运行代码结果如下:

The operation take 3087 ms
TransactionExecutor are: [TransactionExecutor{transactionId='transactionId: 1'},
TransactionExecutor{transactionId='transactionId: 2'}, 
TransactionExecutor{transactionId='transactionId: 3'}]

从运行结果可以看出:该程序花费了 3 秒多,因为每个事务都是串行执行,并且每个事务消耗 1秒,和预期时间比较吻合。

并行流实现

我们使用 Lambda 的 parallel并行流来实现,示例代码如下:

public class Demo {

  public static void main(String[] args) {
    long start = System.currentTimeMillis();
    var executor = Stream.of(
            new Transaction("1"),
            new Transaction("2"),
            new Transaction("3"))
        .parallel()
        .map(TransactionExecutor::doTransaction)
        .collect(Collectors.toList());
    long end = System.currentTimeMillis();

    System.out.printf("The operation took %s ms%n", end - start);
    System.out.println("TransactionExecutor are: " + executor);
  }
}

通过运行结果,我们发现它合同步方法的差异是巨大的!应用程序运行速度几乎快了三倍,如下所示:

The operation took 1007 ms
TransactionExecutor are: [TransactionExecutor{transactionId='transactionId: 1'},
TransactionExecutor{transactionId='transactionId: 2'},
TransactionExecutor{transactionId='transactionId: 3'}]

CompletableFuture实现

现在将重构我们的客户端应用程序以利用 CompletableFuture:

public class Demo {

  public static void main(String[] args) {
    Executor executor = Executors.newFixedThreadPool(10);
    long start = System.currentTimeMillis();
    var future = Stream.of(
            new Transaction("1"),
            new Transaction("2"),
            new Transaction("3"),
            new Transaction("4"),
            new Transaction("5"),
            new Transaction("6"),
            new Transaction("7"),
            new Transaction("8"),
            new Transaction("9"),
            new Transaction("10")
        ).map(transaction -> CompletableFuture.supplyAsync(
                () -> TransactionExecutor::doTransaction(transaction), executor)
        ).collect(toList());

    var categories = future.stream()
        .map(CompletableFuture::join)
        .collect(toList());
    long end = System.currentTimeMillis();

    System.out.printf("The operation took %s ms%n", end - start);
    System.out.println("TransactionExecutor are: " + categories);
  }
}

使用 CompletableFuture,对于 10 个线程,执行的时间也在 1 秒左右,性能简直杆杆的,运行结果如下:

The operation took 1040 ms
TransactionExecutor are:[TransactionExecutor{transactionId='transactionId: 1'},
TransactionExecutor{transactionId='transactionId: 2'},
TransactionExecutor{transactionId='transactionId: 3'},
TransactionExecutor{transactionId='transactionId: 4'},
TransactionExecutor{transactionId='transactionId: 5'},
TransactionExecutor{transactionId='transactionId: 6'},
TransactionExecutor{transactionId='transactionId: 7'},
TransactionExecutor{transactionId='transactionId: 8'},
TransactionExecutor{transactionId='transactionId: 9'},
TransactionExecutor{transactionId='transactionId: 10'}]

CompletableFuture工作流程

CompletableFuture 的执行流程可以分为以下 3个阶段:

  1. 创建阶段: 创建 CompletableFuture 实例,并提交异步任务。
  2. 执行阶段: 异步任务在线程池中执行,并在完成后触发相关的回调函数。
  3. 完成阶段: 异步任务完成,CompletableFuture 的状态更新,并执行后续操作。

创建阶段

CompletableFuture 的创建可以通过以下几种方法:

  • CompletableFuture.supplyAsync(Supplier supplier): 提交一个返回结果的异步任务;
  • CompletableFuture.runAsync(Runnable runnable): 提交一个不返回结果的异步任务;
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    // 执行任务
    return "Result";
});

执行阶段

异步任务提交后,会在线程池中执行。默认情况下,CompletableFuture 使用 ForkJoinPool.commonPool() 作为它的线程池。如果需要自定义线程池,可以使用以下方法:

  • CompletableFuture.supplyAsync(Supplier supplier, Executor executor);
  • CompletableFuture.runAsync(Runnable runnable, Executor executor);

如下代码:

Executor executor = Executors.newFixedThreadPool(3);
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    // 执行任务
    return "Result";
}, executor);

完成阶段

任务完成后,CompletableFuture 的状态将从未完成(未完成或未异常)更新为已完成(正常完成或异常完成)。你可以定义一系列操作来处理任务的结果或异常。

正常完成

任务正常完成后,可以使用以下方法进行链式调用:

  • thenApply(Function<T, U> fn): 对任务结果进行转换;
  • thenAccept(Consumer action): 对任务结果进行消费;
  • thenRun(Runnable action): 执行一个额外的任务,不依赖于任务结果;

如下代码:

future.thenApply(result -> {
    return "Processed " + result;
}).thenAccept(processedResult -> {
    System.out.println(processedResult);
});

异常完成

如果任务执行过程中抛出了异常,可以使用以下方法处理异常:

  • exceptionally(Function<Throwable, T> fn): 在任务抛出异常时提供一个默认值;
  • handle(BiFunction<T, Throwable, U> fn): 任务完成后处理结果或异常;

如下代码:

future.exceptionally(ex -> {
    System.out.println("Exception: " + ex.getMessage());
    return "Default Result";
});

合并多个CompletableFuture

可以合并多个CompletableFuture,等待它们全部完成或其中任意一个完成:

  • allOf(CompletableFuture<?>... cfs): 等待所有 CompletableFuture 完成;
  • anyOf(CompletableFuture<?>... cfs): 等待任意一个 CompletableFuture 完成。

如下代码:

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Result 1");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "Result 2");

CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(future1, future2);
combinedFuture.thenRun(() -> System.out.println("All tasks completed."));

CompletableFuture 的执行流程分为创建、执行和完成三个阶段。通过创建异步任务、在线程池中执行任务,并在任务完成后进行结果处理,可以实现复杂的异步编程逻辑。

适用场景

CompletableFuture 适合在处理异步编程和并发任务时使用。以下是一些适合使用 CompletableFuture 的典型场景:

异步 I/O 操作

在进行 I/O 操作时,例如文件读取、数据库访问或网络请求,使用 CompletableFuture 可以使这些操作异步执行,从而避免阻塞主线程,提高应用程序的响应性。

并行处理任务

当需要并行处理多个独立任务时,使用 CompletableFuture 可以有效利用多核 CPU,提高计算效率。

流水线处理

当有一系列依赖的操作需要按顺序执行时,CompletableFuture 可以使这些操作异步执行,形成处理流水线,从而提高处理效率。

事件驱动的异步处理

在事件驱动的系统中,例如 GUI 应用或服务器请求处理,CompletableFuture 可以在事件发生时异步处理任务。

异常处理和回退机制

CompletableFuture 提供了灵活的异常处理机制,可以在异步任务发生异常时执行回退操作或提供默认值。

并发任务的组合

CompletableFuture 可以组合多个并发任务的结果,例如使用 allOf 和 anyOf 方法。

总结

在本文中,我们学习了如何在 Java 中使用 Future 接口以及它的局限性,同时,我们还学习了如何使用 CompletableFuture 类来克服 Future 的这些限制。

接着,我们通过一个业务场景演示,通过同步执行,并发执行,CompletableFuture执行来比较它们的执行效率。

最后,CompletableFuture 适合在异步编程、并发任务、非阻塞 I/O、事件驱动处理、异常处理、任务组合等场景中使用。它提供了丰富的 API 来处理异步操作,使代码更加简洁、可读和高效。通过使用 CompletableFuture,开发者可以更好地利用系统资源,提高应用程序的性能和响应性。

学习交流

如果你觉得文章有帮助,请帮忙点个赞呗,或者关注公众号:猿java,持续输出硬核文章。

转载自:https://juejin.cn/post/7390593301509849100
评论
请登录