likes
comments
collection
share

使用SpringBoot的线程池处理异步任务

作者站长头像
站长
· 阅读数 13

介绍

其实 SpringBoot 已经为我们创建并配置好了这个东西,这里就来学习一下如何来使用 SpringBoot 为我们设置的线程池。

如有错误欢迎联系我指正!

使用

创建配置类

首先我们需要创建一个配置类来让 SpringBoot 加载,并且在里面设置一些自己需要的参数。

@Configuration
@EnableAsync
public class ExecutorConfig {

    private static final Logger logger = LoggerFactory.getLogger(ExecutorConfig.class);

    @Value("${async.executor.thread.core_pool_size}")
    private int corePoolSize;
    @Value("${async.executor.thread.max_pool_size}")
    private int maxPoolSize;
    @Value("${async.executor.thread.queue_capacity}")
    private int queueCapacity;
    @Value("${async.executor.thread.keep_alive_seconds}")
    private int keepAliveSeconds;
    @Value("${async.executor.thread.name.prefix}")
    private String namePrefix;

    @Bean(name = "asyncServiceExecutor")
    public Executor asyncServiceExecutor() {
        logger.info("开启SpringBoot的线程池!");

        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

        // 设置核心线程数
        executor.setCorePoolSize(corePoolSize);
        // 设置最大线程数
        executor.setMaxPoolSize(maxPoolSize);
        // 设置缓冲队列大小
        executor.setQueueCapacity(queueCapacity);
        // 设置线程的最大空闲时间
        executor.setKeepAliveSeconds(keepAliveSeconds);
        // 设置线程名字的前缀
        executor.setThreadNamePrefix(namePrefix);
        // 设置拒绝策略:当线程池达到最大线程数时,如何处理新任务
        // CALLER_RUNS:在添加到线程池失败时会由主线程自己来执行这个任务
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

        // 线程池初始化
        executor.initialize();

        return executor;
    }
}

首先,

  • @Configuration 的作用是表明这是一个配置类。
  • @EnableAsync 的作用是启用 SpringBoot 的异步执行

其次,关于线程池的设置有

  • corePoolSize: 核心线程数,当向线程池提交一个任务时池里的线程数小于核心线程数,那么它会创建一个线程来执行这个任务,一直直到池内的线程数等于核心线程数
  • maxPoolSize: 最大线程数,线程池中允许的最大线程数量。关于这两个数量的区别我会在下面解释
  • queueCapacity: 缓冲队列大小,用来保存阻塞任务队列(注意这里的队列放的是任务而不是线程)
  • keepAliveSeconds: 允许线程存活时间(空闲状态下),单位为秒,默认60s
  • namePrefix: 线程名前缀
  • RejectedExecutionHandler: 拒绝策略,当线程池达到最大线程数时,如何处理新任务。线程池为我们提供的策略有
    • AbortPolicy:默认策略。直接抛出 RejectedExecutionException
    • DiscardPolicy:直接丢弃掉被拒绝的任务,且不会抛出任何异常
    • DiscardOldestPolicy:丢弃掉队列中的队头元素(也就是最早在队列里的任务),然后重新执行 提交该任务 的操作
    • CallerRunsPolicy:由主线程自己来执行这个任务,该机制将减慢新任务的提交

关于 corePoolSizemaxPoolSize 的区别也是困惑了我很久,官方文档上的解释说的很清楚。我的理解如下:

这个线程池其实是有点“弹性的”。当向线程池提交任务时:

  • 当前运行的线程数 < corePoolSize

    即使其它的工作线程处于空闲状态,线程池也会创建一个新线程来执行任务

  • corePoolSize < 当前运行的线程数 < maxPoolSize

    • 队列已满

      则 创建新线程来执行任务

    • 队列未满

      则 加入队列中

  • 当前运行的线程数 > maxPoolSize

    • 队列已满

      则 拒绝任务

    • 队列未满

      则 加入队列中

所以当想要创建固定大小的线程池时,将 corePoolSizemaxPoolSize 设置成一样就行了。

最后,别忘了给方法加上 @Bean 注解,否则 SpringBoot 不会加载。

这里因为我加了 @Value 注解,可以在 application.properties 中配置相关数据,如

# 配置核心线程数
async.executor.thread.core_pool_size = 5
# 配置最大线程数
async.executor.thread.max_pool_size = 5
# 配置队列大小
async.executor.thread.queue_capacity = 999
# 配置线程池中的线程的名称前缀
async.executor.thread.name.prefix = test-async-
# 配置线程最大空闲时间
async.executor.thread.keep_alive_seconds = 30

在具体的方法中使用

配置完上面那些使用起来就轻松了,只需在业务方法前加上 @Async 注解,它就会异步执行了。

在 Service 中添加如下方法

@Async("asyncServiceExecutor")
// 注:@Async所修饰的函数不能定义为static类型,这样异步调用不会生效
public void asyncTest() throws InterruptedException {
    logger.info("任务开始!");

    System.out.println("异步执行某耗时的事...");
    System.out.println("如休眠5秒");
    Thread.sleep(5000);

    logger.info("任务结束!");
}

然后在 Controller 里调用一下这个方法,在网页上连续发送请求做一个测试。 我这里连续发起了5次请求,可以看到这5个任务确实是成功地异步执行了。

使用SpringBoot的线程池处理异步任务

我设置的线程池大小为 5,所以当超过 5 个任务被提交时,会放入阻塞队列中

使用SpringBoot的线程池处理异步任务

到这里,基本的异步执行任务就实现了。

自定义

虽然它提供给我们的线程池已经很强大了,但是有时候我们还需要一些额外信息,比如说我们想知道这个线程池已经执行了多少任务了、当前有多少线程在运行、阻塞队列里还有多少任务等等。那么这个时候我们就可以自定义我们的线程池。

自定义很简单,自己写一个类继承 Spring 提供的 ThreadPoolTaskExecutor,在此之上做修改就好了。如

public class VisibleThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {

    private static final Logger logger = LoggerFactory.getLogger(VisibleThreadPoolTaskExecutor.class);

    public void info() {
        ThreadPoolExecutor executor = getThreadPoolExecutor();

        if (executor == null) return;

        String info = "线程池" + this.getThreadNamePrefix() +
                "中,总任务数为 " + executor.getTaskCount() +
                " ,已处理完的任务数为 " + executor.getCompletedTaskCount() +
                " ,目前正在处理的任务数为 " + executor.getActiveCount() +
                " ,缓冲队列中任务数为 " + executor.getQueue().size();

        logger.info(info);
    }

    @Override
    public void execute(Runnable task) {
        info();
        super.execute(task);
    }

    @Override
    public void execute(Runnable task, long startTimeout) {
        info();
        super.execute(task, startTimeout);
    }

    @Override
    public Future<?> submit(Runnable task) {
        info();
        return super.submit(task);
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        info();
        return super.submit(task);
    }

    @Override
    public ListenableFuture<?> submitListenable(Runnable task) {
        info();
        return super.submitListenable(task);
    }

    @Override
    public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
        info();
        return super.submitListenable(task);
    }
}

然后在我们的配置类 ExecutorConfig 中将

ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); 改为 ThreadPoolTaskExecutor executor = new VisibleThreadPoolTaskExecutor();, 也就是使用我们自己定义的线程池,然后会在相应的任务执行(execute())、任务提交(submit())时打印我们需要的信息了。

打印结果,在此之前已处理了5个任务:

使用SpringBoot的线程池处理异步任务

查询线程池信息

上面自定义线程池后想查询信息只能在线程池中的方法查询,那如果我想在任意地方查询线程池的信息呢?那也是可以的,而且非常简单。我这里写一个接口来查询线程池的任务信息以做示例。

首先修改一下线程池里的 Info() 方法,让它返回我们需要的信息。

public String info() {
    ThreadPoolExecutor executor = getThreadPoolExecutor();
    if (executor == null) return "线程池不存在";

    String info = "线程池" + this.getThreadNamePrefix() +
            "中,总任务数为 " + executor.getTaskCount() +
            " ,已处理完的任务数为 " + executor.getCompletedTaskCount() +
            " ,目前正在处理的任务数为 " + executor.getActiveCount() +
            " ,缓冲队列中任务数为 " + executor.getQueue().size();

    logger.info(info);

    return info;
}

然后修改一下配置类 ExecutorConfig 里注册线程池的方法,让它注册的是我们自定义的线程池类型

@Bean(name = "asyncServiceExecutor")
public VisibleThreadPoolTaskExecutor asyncServiceExecutor() {
    logger.info("开启SpringBoot的线程池!");

    // 修改这里,要返回我们自己定义的类 VisibleThreadPoolTaskExecutor
    VisibleThreadPoolTaskExecutor executor = new VisibleThreadPoolTaskExecutor();
//        ThreadPoolTaskExecutor executor = new VisibleThreadPoolTaskExecutor();

    // 设置核心线程数
    executor.setCorePoolSize(corePoolSize);
    // 设置最大线程数
    executor.setMaxPoolSize(maxPoolSize);
    // 设置缓冲队列大小
    executor.setQueueCapacity(queueCapacity);
    // 设置线程的最大空闲时间
    executor.setKeepAliveSeconds(keepAliveSeconds);
    // 设置线程名字的前缀
    executor.setThreadNamePrefix(namePrefix);
    // 设置拒绝策略:当线程池达到最大线程数时,如何处理新任务
    // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

    // 线程池初始化
    executor.initialize();

    return executor;
}

再在我们需要信息的地方自动注入这个线程池,然后调用一下 info() 方法就能得到信息了,我这里以在 Service 层中获取信息为例。


@Service
public class DemoService {

    private static final Logger logger = LoggerFactory.getLogger(DemoService.class);

    // 别忘了这里要用 SpringBoot 的自动注入
    @Autowired
    private VisibleThreadPoolTaskExecutor executor;

    // @SneakyThrows 这个注解是Lombok带的,我为了代码简洁使用的。你也可以使用 try catch 的方法。
    @SneakyThrows 
    @Async("asyncServiceExecutor")
    public void asyncTest() {
        logger.info("任务开始!");

        System.out.println("异步执行某耗时的事...");
        System.out.println("如休眠5秒");
        Thread.sleep(5000);

        logger.info("任务结束!");

        // 你甚至可以在任务结束时再打印一下线程池信息
        executor.info();
    }

    public String getExecutorInfo() {
        return executor.info();
    }
}

最后在 Controller 层中调用一下,就大功告成了!

@RestController
public class DemoController {

    @Autowired
    private DemoService demoService;

    @GetMapping("/async")
    public void async() {
        demoService.asyncTest();
    }

    @GetMapping("/info")
    public String info() {
        return demoService.getExecutorInfo();
    }
}

来看一下测试的结果吧,我这里调用 /async 一口气开启了 15 个任务,然后在不同时间使用 /info 来看看信息。

刚开始时的结果:

使用SpringBoot的线程池处理异步任务

一口气提交了15个任务后的中间结果:

使用SpringBoot的线程池处理异步任务

所有任务都执行完了的最终结果:

使用SpringBoot的线程池处理异步任务

总结

本篇到这里就结束了,篇幅略长。总结一下,要想在SpringBoot中使用它提供的线程池其实很简单,只要两步:

  1. 注册线程池(使用 @Bean 来注册),设置一些自己想要的参数
  2. 在想要异步调用的方法上加上 @Async 注解

当然你也可以不使用 @Async 注解,直接在想开线程的地方自动注入你注册的线程池,然后像普通线程池一样使用就行了。

其实关于这一方面的知识也讲得并不够详尽,比如线程池里还有哪些方法、SpringBoot是如何为我们弄得这么方便的等等,还需要多多补充知识。

参考

转载自:https://juejin.cn/post/6970927877348917261
评论
请登录