likes
comments
collection
share

【多线程05】Java中的异步编程

作者站长头像
站长
· 阅读数 12

本文主要有以下内容:

  • 使用 Thread 实现异步编程
  • 使用 Future 和 FutureTask 实现异步编程
  • 使用 CompletableFuture 实现异步编程
  • Spring Boot 对异步编程的支持
  • Guava 对异步编程的支持

异步编程

同步:代码按照书写顺序执行,每一个操作都会阻塞当前线程,直到该操作完成后才能进行下一个操作,因此代码执行的顺序是确定的。

异步:异步指的是代码执行不会等待上一步操作是否执行完毕,而是继续执行下面的代码。

用不恰当的例子说明就是:同步就相当于过独木桥一样,必须等待前一个人过了后面的人才能上桥,否则就掉下去了,而异步就相当于你在玩 QQ 飞车个人赛一样,完全不用管其他人在哪,跑就完事了!接下来就来看一看常见的几种异步编程方式!

Thread 实现异步编程

通过新建线程的方式来执行异步的代码,这样就可以实现,以异步打印日志为例,首先是同步打印的例子。

@Test
public void syncPrint(){
    log.info("同步打印:我在上面我先打印");
    log.info("同步打印:我在下面我后打印");
}

下面是通过新建线程异步打印的例子。

@Test
public void asyncPrintByNewThread(){
    Thread threadA = new Thread(() -> {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("async: 清风拂山岗");
    });
    threadA.start();
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println("Main:他强任他强");

}

运行结果如下:

【多线程05】Java中的异步编程

②和③这种情况就说明新建的线程在没有其他同步机制的保证下,无法保证谁先运行,谁后运行!因此异步编程使用的场景之一就是在执行前后业务逻辑关系不大的场景时,就可以将一些耗时的操作采用这种方式进行操作,典型的就是安卓开发时,主线程不能访问网络资源,需要再子线程中进行(这个举例好像不太恰当)!

Future 和 FutureTask

虽然通过新建线程的方式可以实现异步编程,但是他也有很多限制,比如频繁的新建、销毁线程带来了资源浪费,比如无法知道线程的运行情况,任务是开始了还是取消了?任务运行的结果是什么?

因此在 JDK1.5 时,引入了 Future 和 FutureTask 进一步丰富了Java异步编程的能力。

Future

Future 是一个接口,该接口中定义了如下的方法:

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

boolean cancel(boolean mayInterruptIfRunning):该方法用于通知一个任务是否需要取消 ,传入true 表示需要取消,反之则不需要;如果该任务已经完成,或者取消或者由于其他原因不能取消将会返回false,如果任务成功取消,则该任务将再也不会运行!

boolean isCancelled():查看任务是否取消,如果已取消,则返回 true

isDone():判断任务是否完成,如果任务正常完成,出异常,或者取消,都将返回 true

get():获取计算结果,如果任务没有执行完毕,则阻塞直到任务执行完成!

FutureTask

FutureTask 实现了 Future 接口的同时也实现了Runnaable接口,因此可以接收一个Runnable 或者Callable对象作为参数,也正因如此可以提交给线程池使用!这样也不用频繁的创建和销毁线程池!

下面是使用示例:

// 创建线程池
ExecutorService executorService = Executors.newFixedThreadPool(5);
// 创建一个Callable任务
Callable<Integer> callable = () -> {
    // 模拟耗时计算任务
    Thread.sleep(2000);
    return 20230613;
};
// 创建一个FutureTask,并将Callable任务传递给它
FutureTask<Integer> futureTask = new FutureTask<>(callable);
// 提交FutureTask到线程池执行
Future<?> future = executorService.submit(futureTask);
// 在这里可以进行其他操作
try {
    // 获取任务的结果 如果没有计算完毕,会一直阻塞在这里
    Integer result = futureTask.get();
    System.out.println("任务结果为:" + result);
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}
// 关闭线程池
executorService.shutdown();
// output: 任务结果为:20230613

Future 和 FutureTask 解决了第一种方式里面的一些问题,但是由于其本身方法在获取计算结果(get())时仍然会阻塞当前线程,因此当我们的任务不需要返回结果时,这是一种尚可接受的方式,但是在我们需要获取结果时,这并不是一个很好的异步方法!

CompletableFuture

CompletableFutureJDK1.8引入的,在使用 Future 和 FutureTask 进行异步编程时,如果异步编程之间存在依赖关系,如线程B的计算结果依赖线程A的异步计算结果作为输入、亦或者需要组合线程A和B的计算结果,那么使用Future 和 FutureTask 不能够很好的解决这种,需要我们进行额外的编码才能够达到这种效果,而且对于程序员而言,懒才是我们最大的生产力,如果每一次都通过添加额外的代码来达到效果,比如我只想用电脑打印一个hello world,你却告诉我要先安装jdk、然后配置环境变量,安装idea等其他操作,显然这不够"懒",而且额外的同步控制代码写起来真的很麻烦!! 而 CompletableFuture 就能够让我们足够的"懒"!

下面是CompletableFuture的一个使用示例:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureExample {
    public static void main(String[] args) {
        // 创建一个CompletableFuture对象
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            // 模拟异步任务,返回结果
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Hello, CompletableFuture!";
        });
        // 处理异步任务的结果
        future.thenAccept(result -> {
            System.out.println("异步任务完成,结果为:" + result);
        });
        // 阻塞等待异步任务的完成,并获取结果
        try {
            String result = future.get();
            System.out.println("异步任务的最终结果为:" + result);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
}

上面的代码就简单的演示了CompletableFuture对象,接下来就详细的描述这个类的简单用法,后面专门写一篇关于这个类的源码学习。

创建异步任务

在创建异步任务时,有如下两个方法供我们使用:

  • supplyAsync():用于执行有返回值的异步任务
  • runAsync():用于执行没有返回值的异步任务
CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(() -> "创建有返回值的对象");
CompletableFuture<Void> runAsync = CompletableFuture.runAsync(() -> System.out.println("没有返回值!"));
try {
    String s = supplyAsync.get();
    System.out.println("supplyAsync = " + s);
    Void unused = runAsync.get();
    System.out.println("runAsync = " + unused);
} catch (InterruptedException e) {
    e.printStackTrace();
} catch (ExecutionException e) {
    e.printStackTrace();
}

【多线程05】Java中的异步编程

异步任务结果的处理

如果我们需要对异步计算的结果进行处理,可以使用thenAccept(),该方法接收一个Comsumer Function作为参数,具体使用方式如下:

CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(() -> "我的返回值会作为thenAccept()方法的参数");
supplyAsync.thenAccept((result) -> System.out.println("接收的result = " + result));
// 输出:接收的result = 我的返回值会作为thenAccept()方法的参数

除此之外,还有其他组合使用CompletableFuture的方法:

  • thenApply():对任务的结果进行转换。
  • thenCompose():将两个 CompletableFuture 串联起来,一个 CompletableFuture 的结果作为另一个CompletableFuture的输入。
  • thenCombine():将两个 CompletableFuture 的结果进行合并。
  • allOf():等待多个 CompletableFuture 全部完成。
  • anyOf():等待多个 CompletableFuture 中的任何一个完成。

下面的示例代码演示了thenApply()thenCompose()thenCombine()这三个方法的用法:

CompletableFuture<Object> supplyAsync = CompletableFuture.supplyAsync(() -> "Hello");

CompletableFuture<String> thenApplyFuture = supplyAsync.thenApply((result) -> {
    String s = (String) result;
    return s.toUpperCase(Locale.ROOT);
});
CompletableFuture<String> supplyAsync2 = CompletableFuture.supplyAsync(() -> "CompletableFuture");
CompletableFuture<String> completableFuture = supplyAsync.thenCompose((result1) -> supplyAsync2.thenApply((result2) -> {
    System.out.println("result1 = " + result1);
    System.out.println("result2 = " + result2);
    return result1 + "," + result2;
}));

CompletableFuture<Integer> intFuture1 = CompletableFuture.supplyAsync(() -> 20);
CompletableFuture<Integer> intFuture2 = CompletableFuture.supplyAsync(() -> 30);

intFuture1.thenCombine(intFuture2,(result1,result2) ->{
    System.out.println("intResult1 = " + result1);
    System.out.println("intResult2 = " + result2);
    return  result1 + result2;
}).thenAccept((sum)-> System.out.println("thenCombine() sum = " +sum));

try {
    String convertResult = thenApplyFuture.get();
    System.out.println("thenApply() 转换结果:convertResult = " + convertResult);
    String composeResult = completableFuture.get();
    System.out.println("thenCompose() 串联结果:composeResult = "+composeResult);
} catch (InterruptedException e) {
    e.printStackTrace();
} catch (ExecutionException e) {
    e.printStackTrace();
}

运行结果如下图: 【多线程05】Java中的异步编程

接下演示allOf()以及anyOf()的方法使用:

CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(() -> {
    try {
        System.out.println("supplyAsync " + Instant.now());
        TimeUnit.SECONDS.sleep(3);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "hello";
});
CompletableFuture<String> supplyAsync2 = CompletableFuture.supplyAsync(() -> {

    try {
        System.out.println("supplyAsync2 "+Instant.now());
        TimeUnit.SECONDS.sleep(10);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "world";
});
CompletableFuture<Void> allOf = CompletableFuture.allOf(supplyAsync, supplyAsync2);
allOf.thenRun(()-> {
    System.out.println("All Finished "+ Instant.now());
    try {
        System.out.println(supplyAsync.get());
        System.out.println(supplyAsync2.get());
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }
});
CompletableFuture<Object> anyOf = CompletableFuture.anyOf(supplyAsync, supplyAsync2);
anyOf.thenRun(()->{
    System.out.println("any Finished "+ Instant.now());
    try {
        System.out.println(supplyAsync.get());
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }
    System.out.println("any 执行完毕 "+ Instant.now());
});

allOf.join();
anyOf.join();

【多线程05】Java中的异步编程

从结果可以看到 anyOf() 只需要其中一个完成就可以,allOf() 则需要等待全部完成!

异常处理

CompletableFuture 还提供了对异常情况的处理,如运行时出现的异常:

  • exceptionally():在 CompletableFuture 发生异常时,触发的回调函数
  • handle():处理正常或者异常运行的情况
  • whenComplete():在 CompletableFuture 完成时触发的回调

下面是方法的使用示例:

CompletableFuture<Object> future = CompletableFuture.supplyAsync(() -> {
    throw new RuntimeException("我宕机了!!!");
    // return "正常返回";
});
future.exceptionally((ex) -> {
    System.out.println("exceptionally():"+ex.getMessage());
    return "处理异常之后, 返回默认结果";
});

future.handle((result,ex)->{
    if (ex != null){
        System.out.println("异常handle(): " + ex.getMessage());
    }else {
        System.out.println("正常handle():"+ result);
    }
    return "我可以处理正常和异常这两种情况!";
});

future.whenComplete((result,ex)->{
    if (ex != null){
        System.out.println("whenComplete(): exception occur");

    }else {
        System.out.println("whenComplete(): " + result);
    }
});
future.join();

先是正常返回,然后是抛出异常,下面是运行结果截图:

【多线程05】Java中的异步编程

任务的终结

在新建异步任务之后,有时候需要我们手动改变异步任务的状态,我们可以通过complete()completeExceptionallycancel()手动指定任务的完成,异常完成,任务的取消。

CompletableFuture<String> future = new CompletableFuture<>();
// 在某个地方将CompletableFuture标记为完成状态
future.complete("Completed value");
future.thenAccept(result -> System.out.println(result));
CompletableFuture<String> exFuture = new CompletableFuture<>();
exFuture.completeExceptionally(new RuntimeException("异常"));
exFuture.exceptionally((ex)-> {
    System.out.println(ex.getMessage());
    return "异常!";
});
CompletableFuture<String> cancelFuture = new CompletableFuture<>();
cancelFuture.cancel(true);
System.out.println(" isCancelled = " + cancelFuture.isCancelled());

future.join();
exFuture.join();
cancelFuture.join();

运行结果如下图: 【多线程05】Java中的异步编程

Spring boot 中的异步编程

除了上面提到的异步,Spring框架也提供了对异步编程的支持,以Spring Boot为例,具体方法步骤如下:

首先在启动类上使用@EnableAsync注解。

接着自定义线程池,然后再具体的service层方法上通过@Async指定使用即可。

@Slf4j
@Component
public class ThreadPoolExecutorConfig {
    @Bean(name = "defaultThreadPoolExecutor", destroyMethod = "shutdown")
    public ThreadPoolExecutor systemCheckPoolExecutorService() {
        return new ThreadPoolExecutor(3, 10, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(10000),
                Executors.defaultThreadFactory(),
                (r, executor) -> log.error("system pool is full! "));
    }
}
// service方法
@Async("defaultThreadPoolExecutor")
@Override
public Boolean execute(Integer num) {
    log.info("Thread:" + Thread.currentThread().getName() + " , Task:" + num);
    return true;
}

Guava 中的异步编程

在 Google 提供的 Java 工具包中,也提供了异步编程的能力,首先在项目中引入相关依赖。

<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>30.0-jre</version>
</dependency>

接下来,我们可以使用 ListenableFuture 来执行异步任务。假设我们有一个模拟的耗时计算任务,返回一个字符串结果。我们可以使用 Guava 的 MoreExecutors类中的方法来创建一个 ExecutorService 来执行异步任务。

import com.google.common.util.concurrent.*;

import java.util.concurrent.Callable;
import java.util.concurrent.Executors;

public class ListenableFutureExample {

    public static void main(String[] args) {
        // 创建一个ExecutorService
        ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
        // 提交一个异步任务
        ListenableFuture<String> futureResult = executorService.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                // 模拟耗时的计算任务
                Thread.sleep(2000);
                return "Hello, ListenableFuture!";
            }
        });
        // 添加一个回调函数来处理任务完成后的结果
        Futures.addCallback(futureResult, new FutureCallback<String>() {
            @Override
            public void onSuccess(String result) {
                System.out.println("任务成功完成,结果为:" + result);
                executorService.shutdown(); // 关闭ExecutorService
            }
            @Override
            public void onFailure(Throwable t) {
                System.out.println("任务执行失败:" + t.getMessage());
                executorService.shutdown(); // 关闭ExecutorService
            }
        },executorService);
    }
}

除去上面提到的这几种方式,还有如 hutool工具包,以及一些其他的异步编程方式,不过这里就不做介绍了,感兴趣的请自行了解学习!