likes
comments
collection
share

Java CompletableFuture 多线程操作利器使用指南

作者站长头像
站长
· 阅读数 3

摘要:本文主要介绍JDK8后新增的CompletableFuture来操作多线程,让大家更加优雅简单的使用。

简介

CompletableFuture结合了Future的优点,提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了转换和组合CompletableFuture的方法。

案例开始

线程池说明

  • 用户不指定线程池,则所有异步回调都共用CommonPool,容易让系统出现瓶颈。
  • 推荐大家使用的时候,自定义线程池

自定义线程池

@Configuration
public class JobConfig {

    @Bean(name = "MyCompletableFutureThreadPoolTaskExecutor")
    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(20);
        executor.setQueueCapacity(500);
        executor.setKeepAliveSeconds(60);
        executor.setThreadNamePrefix("MyCompletableFutureThreadPool-");
        // 拒绝策略,如果超过最大容量就由提交线程处理
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }

}

两种异步实现方式

  • runAsync:不会返回结果,只等待线程执行完成
  • supplyAsync:异步返回执行结果

runAsync方式调用

JobService 接口

@Slf4j
@Service
public class JobService {

    public List<String> testOne() throws Exception{
        List<String> resultList = new CopyOnWriteArrayList<>();
        CompletableFuture<Void> task1 = CompletableFuture.runAsync(() -> {
            try{
                log.info("task1线程"+Thread.currentThread().getName());
                Thread.sleep(5000L);
                resultList.add("task1Result");
            }catch (Exception ex){
                log.error("task1异常",ex);
            }
        });
        CompletableFuture<Void> task2 = CompletableFuture.runAsync(() -> {
            try{
                log.info("task2线程"+Thread.currentThread().getName());
                Thread.sleep(1000L);
                resultList.add("task2Result");
            }catch (Exception ex){
                log.error("task2异常",ex);
            }
        });
        CompletableFuture.allOf(task1, task2).get();
        log.info("异步任务执行完成");
        return resultList;
    }

}

JobController调用

@RestController
@RequestMapping(value = "job")
public class JobController {

    @Autowired
    private JobService jobService;

    @GetMapping(value = "job1")
    public Object job1() throws Exception{
        return jobService.testOne();
    }

}

结果

2024-04-06 19:03:36.595  INFO 24852 --- [onPool-worker-9] com.example.home.cf.JobService           : task1线程ForkJoinPool.commonPool-worker-9
2024-04-06 19:03:36.595  INFO 24852 --- [onPool-worker-2] com.example.home.cf.JobService           : task2线程ForkJoinPool.commonPool-worker-2
2024-04-06 19:03:41.604  INFO 24852 --- [nio-8080-exec-1] com.example.home.cf.JobService           : 异步任务执行完成
换成自定义线程池
@Slf4j
@Service
public class JobService {

    @Qualifier("MyCompletableFutureThreadPoolTaskExecutor")
    @Autowired
    private ThreadPoolTaskExecutor threadPoolTaskExecutor;

    public List<String> testOne() throws Exception{
        List<String> resultList = new CopyOnWriteArrayList<>();
        CompletableFuture<Void> task1 = CompletableFuture.runAsync(() -> {
            try{
                log.info("task1线程"+Thread.currentThread().getName());
                Thread.sleep(5000L);
                resultList.add("task1Result");
            }catch (Exception ex){
                log.error("task1异常",ex);
            }
        }, threadPoolTaskExecutor);
        CompletableFuture<Void> task2 = CompletableFuture.runAsync(() -> {
            try{
                log.info("task2线程"+Thread.currentThread().getName());
                Thread.sleep(1000L);
                resultList.add("task2Result");
            }catch (Exception ex){
                log.error("task2异常",ex);
            }
        }, threadPoolTaskExecutor);
        CompletableFuture.allOf(task1, task2).get();
        log.info("异步任务执行完成");
        return resultList;
    }

}
结果
2024-04-06 19:18:36.056  INFO 23104 --- [ureThreadPool-1] com.example.home.cf.JobService           : task1线程MyCompletableFutureThreadPool-1
2024-04-06 19:18:36.057  INFO 23104 --- [ureThreadPool-2] com.example.home.cf.JobService           : task2线程MyCompletableFutureThreadPool-2
2024-04-06 19:18:41.065  INFO 23104 --- [nio-8080-exec-1] com.example.home.cf.JobService           : 异步任务执行完成

supplyAsync

该方式是返回结果的

Job2Service

@Slf4j
@Service
public class Job2Service {

    @Qualifier("MyCompletableFutureThreadPoolTaskExecutor")
    @Autowired
    private ThreadPoolTaskExecutor threadPoolTaskExecutor;

    public List<String> run() throws Exception{
        List<String> resultList = new CopyOnWriteArrayList<>();
        List<CompletableFuture<String>> completableFutures = new ArrayList<>();
        CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {
            try {
                log.info("task1线程" + Thread.currentThread().getName());
                Thread.sleep(5000L);
                return "task1Result";
            } catch (Exception ex) {
                log.error("task1异常", ex);
                return "task1Error";
            }
        }, threadPoolTaskExecutor);
        CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> {
            try {
                log.info("task2线程" + Thread.currentThread().getName());
                Thread.sleep(5000L);
                return "task2Result";
            } catch (Exception ex) {
                log.error("task2异常", ex);
                return "task2Error";
            }
        }, threadPoolTaskExecutor);
        completableFutures.add(task1);
        completableFutures.add(task2);
        completableFutures.forEach(completableFuture -> {
            try{
                resultList.add(completableFuture.get());
            }catch (Exception ex){
                log.error("获取结果失败",ex);
            }
        });
        log.info("异步任务执行完成");
        return resultList;
    }
}

Controller

@RestController
@RequestMapping(value = "job")
public class JobController {

    @Autowired
    private Job2Service job2Service;

    @GetMapping(value = "job2")
    public Object job2() throws Exception{
        return job2Service.run();
    }

}

结果

2024-04-06 19:36:50.124  INFO 21484 --- [ureThreadPool-1] com.example.home.cf.Job2Service          : task1线程MyCompletableFutureThreadPool-1
2024-04-06 19:36:50.125  INFO 21484 --- [ureThreadPool-2] com.example.home.cf.Job2Service          : task2线程MyCompletableFutureThreadPool-2
2024-04-06 19:36:55.129  INFO 21484 --- [nio-8080-exec-1] com.example.home.cf.Job2Service          : 异步任务执行完成

thenCombine方法

合并结果集并返回一个CompletableFuture

@GetMapping(value = "testThenCombine")
public Object testThenCombine(){
    CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
        //模拟异步任务
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "welcome to ";
    });

    CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
        //模拟异步任务
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "java";
    });

    CompletableFuture<String> resultFuture = future1.thenCombine(future2, (result1, result2) -> result1 + " " + result2);
    return resultFuture.join();
}

总结

回顾整个CompletableFuture的用法主要可概括为以下几点,阻塞获取结果都使用get()join()方法

  • 提交任务 runAsync() 与 supplyAsync()
  • 简单用法 get() 与 join() 与 complete()

get()方法需要自己处理异常;join()无需自己处理异常;无论是get还是join底层都调用了waitingGet()方法

  • 链式处理 theRun()、thenAccept() 和 thenApply()
  • 组合处理 thenCompose() 与 thenCombine() 、与anyOf()

allOf():所有任务完成; anyOf():任一一个任务完成;

常用组合

  • 非异步返回结果的runAsync()+CompletableFuture.allOf(xxx,xxx).get()
  • 异步返回结果的supplyAsync()+列表循环CompletableFuture.get()来处理结果集