likes
comments
collection
share

SpringBoot使用异步线程池实现生产环境批量数据推送

作者站长头像
站长
· 阅读数 13

前言

  SpringBoot使用异步线程池:

  1、编写线程池配置类,自定义一个线程池;

  2、定义一个异步服务;

  3、使用@Async注解指向定义的线程池;

  这里以我工作中使用过的一个案例来做描述,我所在公司是医疗行业,敏感数据需要上报到某监管平台,所以有一个定时任务在流量较小时(一般是凌晨后)执行上报行为。

  但特殊时期会存在一定要在工作时间大批量上报数据的情况,且要求短时间内就要完成,此时就考虑写一个专门的异步上报接口手动执行,利用线程池上报,极大提高了速度。

编写线程池配置类

 1 import lombok.extern.slf4j.Slf4j;
 2 import org.springframework.context.annotation.Bean;
 3 import org.springframework.context.annotation.Configuration;
 4 import org.springframework.scheduling.annotation.EnableAsync;
 5 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 6
 7 import java.util.concurrent.Executor;
 8 import java.util.concurrent.ThreadPoolExecutor;
 9
10 /**
11  * 类名称:ExecutorConfig
12  * ********************************
13  * <p>
14  * 类描述:线程池配置
15  *
16  * @author guoj
17  * @date 2021-09-07 09:00
18 */
19 @Configuration
20 @EnableAsync
21 @Slf4j
22 public class ExecutorConfig {
23    /**
24     * 定义数据上报线程池
25     * @return
26     */
27    @Bean("dataCollectionExecutor")
28    public Executor dataCollectionExecutor() {
29
30        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
31
32        // 核心线程数量:当前机器的核心数
33        executor.setCorePoolSize(
34                Runtime.getRuntime().availableProcessors());
35
36        // 最大线程数
37        executor.setMaxPoolSize(
38                Runtime.getRuntime().availableProcessors() * 2);
39
40        // 队列大小
41        executor.setQueueCapacity(Integer.MAX_VALUE);
42
43        // 线程池中的线程名前缀
44        executor.setThreadNamePrefix("sjsb-");
45
46        // 拒绝策略:直接拒绝
47        executor.setRejectedExecutionHandler(
48                new ThreadPoolExecutor.AbortPolicy());
49
50        // 执行初始化
51        executor.initialize();
52
53        return executor;
54    }
55
56 }

PS:

  1)、需要注意,这里一定要自己定义ThreadPoolTaskExecutor线程池,否则springboot的异步注解会执行默认线程池,存在线程阻塞导致CPU飙高及内存溢出的风险。这一点可以参考阿里开发手册,线程池定义这块明确提到了这一点;

  2)、在@Bean注解中定义线程池名称,后面异步注解会用到。

编写异步服务

 1 /**
 2 * 异步方法的服务, 不影响主程序运行。
 3 */
 4 @Service
 5 public class AsyncService {
 6
 7    private final Logger log = LoggerFactory.getLogger(AsyncService.class);
 8
 9    /**
10     * 发送短信
11     */
12    @Async("sendMsgExecutor")
13    public void sendMsg(String access_token, Consult item, Map<String, String> configMap) {
14        // 此处编写发送短信业务
15        // 1、buildConsultData();
16        // 2、sendMsg();
17    }
18
19    /**
20     * 发送微信订阅消息
21     */
22    @Async
23    public void sendSubscribeMsg(String access_token, Consult item, Map<String, String> configMap) {
24        // 此处编写发送微信订阅消息业务
25        // 1、buildConsultData();
26        // 2、sendSubscribeMsg();
27    }
28
29    /**
30     * 数据并上报
31     */
32    @Async("dataCollectionExecutor")
33    public void buildAndPostData(String access_token, Consult item, Map<String, String> configMap) {
34        // 此处编写上报业务,如拼接数据,然后执行上报。
35        // 1、buildConsultData();
36        // 2、postData();
37    }
38 }

PS:

  1)、以上是代码片段,个人经验认为专门定义一个异步service存放各个异步方法最佳,这样可以避免编码时一些误操作比如异步方法不是void或者是private修饰,导致@Async注解失效的情况,同时可以安排每个注解指向不同的自定义线程池更加灵活;

  2)、@Async注解中的名称就是上面定义的自定义线程池名称,这样业务执行时就会从指定线程池中获取异步线程。

异步批量上报数据

 1 @Autowired
 2 private AsyncService asyncService;
 3
 4 /**
 5  * 手动上报问诊记录,线程池方式。
 6  */
 7 public void manualUploadConsultRecordsAsync(String channel, Date startTime, Date endTime) {
 8
 9     // 查询指定时间内的问诊记录
10    List<Consult> consultList = consultService
11        .findPaidListByChannelAndTime(channel, startTime, endTime, configMap.get("serviceId"));
12
13    if (!CollectionUtils.isEmpty(consultList)) {
14
15        log.debug("[SendWZDataService][manualUploadConsultRecordsAsync]>>>> 手动上报问诊记录, 一共[{}]条", consultList.size());
16
17        consultList.forEach((item) -> {
18            try {
19                // 异步调用,使用线程池。
20                asyncService.buildAndPostData(access_token, item, configMap);
21            } catch (Exception ex) {
22                log.error("[SendWZDataService][manualUploadConsultRecordsAsync]>>>> 手动上报问诊记录发生异常: ", ex);
23            }
24        });
25
26    }
27 }

总结

  以上方式已经在生产环境运行,在工作时间内执行过很多次,一次数万条记录基本是几分钟内就全部上报完毕,而正常循环遍历时一次大概需要半个小时左右。

  线程池的使用方式往往来源于业务场景,如果类似的业务不存在紧急处理的情况,大体还是以任务调度执行为主,因为更安全。如果存在紧急处理的情况,那么使用SpringBoot+线程池的方式不仅能节省非常多的时间,且不占用主线程的执行空间。

END