likes
comments
collection
share

深入探讨Java线程池的最佳实践:配置、监控与避坑

作者站长头像
站长
· 阅读数 9

在多线程编程中,使用线程池是一种常见的优化手段,可以有效管理和复用线程,提高系统的性能和稳定性。本文将深入探讨Java线程池的最佳实践,并介绍一些常见问题及解决方案。

1. 正确使用声明线程池

Java中线程池的使用有多种方式,常见的包括原生的ThreadPoolExecutor和Spring框架提供的ThreadPoolTaskExecutor。在使用ThreadPoolExecutor时,需要注意正确设置参数,例如核心线程数、最大线程数、队列容量等,以及选择合适的拒绝策略。

核心参数解读

  1. corePoolSize(核心线程数): 表示线程池中始终保持存活的线程数。当新任务到来时,如果当前线程池中的线程数小于corePoolSize,则会创建新的线程执行任务,即使存在空闲的线程也会被创建。

  2. maximumPoolSize(最大线程数): 表示线程池中允许的最大线程数。当队列任务数等于队列容量时,即达到队列的最大负载时,线程池会继续创建新线程,但不超过maximumPoolSize

  3. workQueue(任务队列): 用于保存等待执行的任务的阻塞队列。当线程池中的线程数大于等于corePoolSize时,新任务会被放入任务队列。如果任务队列已满,且线程池中的线程数小于maximumPoolSize,则会创建新的线程执行任务。

  4. keepAliveTime(线程空闲时间): 当线程池中的线程数大于核心线程数,并且没有新任务到达时,空闲线程的存活时间。如果超过这个时间,空闲线程将被回收,直到线程池线程数等于corePoolSize

  5. unit(时间单位): 用于指定keepAliveTime的时间单位,例如TimeUnit.SECONDS

  6. threadFactory(线程工厂): 用于创建新线程的工厂。默认情况下,使用Executors.defaultThreadFactory()

  7. handler(饱和策略): 当线程池和任务队列都满了,即达到饱和状态时,采用的策略。常见的饱和策略有四种:

    • ThreadPoolExecutor.AbortPolicy(默认):抛出RejectedExecutionException,拒绝新任务。
    • ThreadPoolExecutor.CallerRunsPolicy:由调用线程执行该任务,可能会影响性能。
    • ThreadPoolExecutor.DiscardPolicy:直接丢弃新任务,不做任何处理。
    • ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列中最早的未处理任务,然后将新任务添加到队列中。

对于Spring框架中的ThreadPoolTaskExecutor,其配置可以更加灵活,可以通过Spring的配置文件或注解方式来定义。确保合理配置,以满足业务需求。

2. 检测线程池运行状态

了解线程池的运行状态对于系统监控和故障排查至关重要。在Spring Boot项目中,可以使用Actuator组件提供的端点来获取线程池的运行状态。对于ThreadPoolExecutor,可以通过相关API来实现状态的监控和管理,例如:

public static Map<String, Object> getPoolSatus() {
        int corePoolSize = executor.getCorePoolSize();
        int poolSize = executor.getPoolSize();
        long completedTaskCount = executor.getCompletedTaskCount();
        int activeCount = executor.getActiveCount();
        long taskCount = executor.getTaskCount();
        long count = executor.getQueue().stream().count();
        Map<String, Object> poolDetail  = new HashMap<>();
        poolDetail.put("corePoolSize", corePoolSize);
        poolDetail.put("poolSize", poolSize);
        poolDetail.put("completedTaskCount", completedTaskCount);
        poolDetail.put("activeCount", activeCount);
        poolDetail.put("taskCount", taskCount);
        poolDetail.put("队列等待线程数", count);
        return poolDetail;
    }

Actuator组件集成

步骤1: 添加Actuator依赖

首先,在你的Spring Boot项目的pom.xml文件中,确保添加了spring-boot-starter-actuator依赖:

xmlCopy code<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

步骤2: 配置Actuator端点

application.propertiesapplication.yml文件中,开启Actuator的端点:

yamlCopy codemanagement:
  endpoints:
    web:
      exposure:
        include: '*'

上述配置将开启所有的Actuator端点,包括线程池监控。如果你想要更加精细地控制哪些端点暴露,可以将include中的通配符替换为你需要的端点,比如healthmetrics等。

步骤3: 访问线程池监控端点

一旦Actuator配置完成,你就可以通过访问相应的端点来获取线程池的监控信息。默认情况下,线程池相关的信息可以通过 /actuator/metrics/jvm.threads.* 端点获取。

例如,可以通过以下链接获取当前存活的线程数:

httpCopy codeGET http://localhost:8080/actuator/metrics/jvm.threads.live

或者,你可以通过 /actuator/metrics 获取所有可用的监控指标,然后在返回的JSON中找到线程池相关的信息。

3. 建议不同业务使用不同的线程池

在多业务场景下,为不同的任务使用独立的线程池是一种良好的实践。可以针对业务的特性创建合适的线程池,任务的执行流程要谨慎设计,以避免潜在的问题。

下面通过一个例子说明,父任务和子任务共用一个线程池在一些情况下会导致死锁问题

// 线程池参数定义如下, 线程池拒绝策略为抛出异常
private static final ThreadPoolExecutor executor =
        new ThreadPoolExecutor(4, 8, 1L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(20),
            new ThreadPoolExecutor.AbortPolicy());
public class ThreadPoolBlockTest {

    // 父任务执行,连续提交4个任务
    public void task() throws InterruptedException {
        for (int i = 0; i < 4; i++) {
            int finalI = i;
            try {
                ThreadPoolUtils.execute(()->{
                    System.out.println("父任务T"+ finalI+"开始执行");
                    try {
                        Thread.sleep(1000);
                        sonTask(finalI);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }

                });
            } catch (Exception e) {
                System.out.println("父任务T"+ finalI+"执行异常+++++++++++++++++++++++++++++");
                e.printStackTrace();
            }
            
        }
        // 为了看到最后的线程池的状态,主线程停顿10s打印线程池中的信息
        Thread.sleep(10000);
        printThreadPoolSatus();
    }
    
    // 子任务中提交4个任务执行,等待每个任务执行完毕获取执行结果
    public void sonTask(int id) {
        List<Future<?>> futures = new ArrayList<>();
        for (int i = 0; i < 4; i++) {
            int finalI = i;
            try {
                 Callable<String> callable = () -> {
                     System.out.println("任务T"+id+"的子任务"+ finalI +"开始执行");
                    try {
                        Thread.sleep(1000);
                        return "任务T"+id+"的子任务"+ finalI;
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                     return null;
                 };
                Future<?> future = ThreadPoolUtils.submit(callable);
                futures.add(future);
            } catch (Exception e) {
                System.out.println("任务T"+id+"的子任务"+ finalI +"执行异常————————————————————————————————");
                e.printStackTrace();
            }
        }
        futures.forEach(future -> {
            try {
                Object o = future.get();
                System.out.println(o +"执行完成");
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        });
    }
    
    // 打印线程池信息
    public void printThreadPoolSatus() {
            Map<String, Object> poolSatus = ThreadPoolUtils.getPoolSatus();
            System.out.println(poolSatus);
    }

    public static void main(String[] args) {

        ThreadPoolBlockTest test = new ThreadPoolBlockTest();
        try {
            test.task();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
        
}

运行结果如下,未打印子任务的运行情况,相隔10s打印出线程池的状态信息;从结果可以看出,4个父任务提交到线程池执行,等待各自的子任务执行,由于核心线程用完,提交16个子任务线程进入等待队列(队列容量为20),不能触发线程池使用新的线程,只能等待核心线程运行完成进行释放,即形成了相互等待,导致死锁。

父任务T0开始执行
父任务T1开始执行
父任务T2开始执行
父任务T3开始执行
{activeCount=4, taskCount=20, 队列等待线程数=16, poolSize=4, corePoolSize=4, completedTaskCount=0}

4. 线程池命名与配置

为线程池设置有意义的命名和前缀是方便问题定位和监控的关键。通过合理配置线程池参数,如超时时间、队列类型等,可以更好地适应不同的业务场景。

5. 关闭线程池

关闭线程池时需要注意,调用shutdownshutdownNow是异步通知关闭处理,前者不会中断正在执行的线程,只中断空闲的线程,而后者会尝试中断正在执行的线程(如果能响应中断,就会被终止,比如sleep,await,Condition等)

如果需要等待线程池彻底关闭,可以使用awaitTermination方法。确保在系统关闭或者不再需要线程池时及时关闭,以释放资源。

6. 避免耗时任务阻塞线程池

在线程池中避免放入耗时任务,以免阻塞其他任务的执行。可以通过异步处理或使用专门的线程池来处理耗时任务。

7. 小心线程池的使用陷阱

  • 重复创建线程池: 不要重复创建线程池,应该采用单例模式或者通过Spring容器管理,避免资源浪费。
  • Spring内部线程池配置: 在Spring项目中,手动自定义线程池时应配置合理的参数,以免每个请求都创建一个线程,导致系统资源耗尽。
  • 线程池与ThreadLocal共用: 当线程池与ThreadLocal共用时,可能会取到旧值,特别是在线程复用时。可以考虑使用TransmittableThreadLocal来解决这个问题,这是阿里巴巴开发的解决方案。
  • 线程池中核心线程数和最大线程数一致,而队列的容量小于一次性提交的任务数时,可能会触发拒绝策略。还是复用上面的例子进行说明, 循环100次,去一次性提交4个任务,看看结果,线程池核心和最大设置成一样,队列长度改为3,小于一次性提交数量4。
private static final ThreadPoolExecutor executor =
        new ThreadPoolExecutor(8, 8, 1L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(3),
            new ThreadPoolExecutor.AbortPolicy());
    public void sonTask(int id) {
        List<Future<?>> futures = new ArrayList<>();
        for (int i = 0; i < 4; i++) {
            int finalI = i;
            try {
                 Callable<String> callable = () -> {
                     System.out.println("任务T"+id+"的子任务"+ finalI +"开始执行");
                    try {
                        Thread.sleep(1000);
                        return "任务T"+id+"的子任务"+ finalI;
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                     return null;
                 };
                Future<?> future = ThreadPoolUtils.submit(callable);
                futures.add(future);
            } catch (Exception e) {
                System.out.println("任务T"+id+"的子任务"+ finalI +"执行异常————————————————————————————————");
                e.printStackTrace();
            }
        }
        futures.forEach(future -> {
            try {
                Object o = future.get();
                System.out.println(o +"执行完成");
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        });
    }
    
    public void printThreadPoolSatus() {
            Map<String, Object> poolSatus = ThreadPoolUtils.getPoolSatus();
            System.out.println(poolSatus);
    }

    public static void main(String[] args) {

        ThreadPoolBlockTest test = new ThreadPoolBlockTest();
        for (int i = 0; i < 100; i++) {
            test.sonTask(i);
        }
        
    }

下面是截取的部分运行日志,可以看出触发了拒绝策略。按照一般的认知来说,一次性提交4个任务,是小于核心线程8的,执行完成之后释放线程,可以再次执行。

任务T12的子任务3执行完成
任务T13的子任务3执行异常————————————————————————————————
任务T13的子任务1开始执行
任务T13的子任务0开始执行
任务T13的子任务2开始执行
java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@35f983a6 rejected from java.util.concurrent.ThreadPoolExecutor@17c68925[Running, pool size = 8, active threads = 0, queued tasks = 3, completed tasks = 51]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
    at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:134)
    at com.thunisoft.practice.thread.ThreadPoolUtils.submit(ThreadPoolUtils.java:35)
    at com.thunisoft.practice.thread.ThreadPoolBlockTest.sonTask(ThreadPoolBlockTest.java:64)
    at com.thunisoft.practice.thread.ThreadPoolBlockTest.main(ThreadPoolBlockTest.java:90)
任务T13的子任务0执行完成
任务T13的子任务1执行完成
任务T13的子任务2执行完成
任务T14的子任务0开始执行
任务T14的子任务2开始执行

所以在使用线程池时,充分理解线程池的特性和原理,结合实际业务需求进行合理的配置和使用,可以有效提高系统的并发性能和稳定性。

转载自:https://juejin.cn/post/7337942542024556579
评论
请登录