likes
comments
collection
share

线程池错用案例

作者站长头像
站长
· 阅读数 23

背景知识

使用Java作为开发语言的同学,为了提升系统的运行效率,把原来单线程串行执行的流程,改为多线程并发执行,应该是提升系统性能的一个常规手段。

多线程编码时的基本套路就是使用线程池。如Java线程池(ThreadPoolExecutor)在创建时需要设置七个核心参数,也是面试时必背的八股文:

  • 核心线程数(corePoolSize)  :这是线程池维护的最小线程数量。当提交一个任务到线程池时,如果当前没有空闲线程,则会创建一个新的线程来执行任务。即使其他空闲的基本线程能够执行新任务也会创建新线程。
  • 最大线程数(maximumPoolSize)  :这是允许线程池中最大的线程数。当线程池中的活跃线程数超过这个值时,超出的线程将被阻塞在工作队列中等待。
  • 空闲线程存活时间(keepAliveTime)  :当线程数大于核心线程数时,多出来的那部分线程在最大多长时间内如果没有接到新的任务,将会被终止。这个时间单位由unit参数指定。
  • 时间单位(unit)工作队列(workQueue)  、线程工厂(threadFactory)拒绝策略(handler)

在开发人员心中,有一股神奇的暗示,就会觉得线程池这个如此消耗系统资源的对象,一定是为了提升系统的性能才开始使用,不然我使用单线程不是更简单吗?

但是,并不是所有的场景都需要提升性能,一旦用错了,可能多线程反而成为了劣势,一个案例让我啪啪打脸。

线程池错用案例

打脸案例

场景: kafka消息数据每分钟进行一次同步,监控消息数据,进行处理。

class KafkaTest{
    private final static ThreadPoolExecutor KAFKA_TEST = 
      new ThreadPoolExecutor(32, 64, 60, TimeUnit.SECONDS,  
        new LinkedBlockingQueue<>(6144), 
        new NamedThreadFactory("kafkaTest"),  
        new ThreadPoolExecutor.CallerRunsPolicy());
    
    @KafkaListener(topics = "test_topic", groupId = "test_groupId")  
    public void consume(String content) {  
      try {  
        JSONObject obj = JSONObject.parseObject(content);
        if(!checkParamObj(obj)) {
          log.warn("consume content is error. {}", obj);
        }
        NewObj newObj = transfer(obj);
        KAFKA_TEST.execute(() -> {
          KafkaUtils.send(newObj);
        });
      } catch (Exception e) {  
        log.error("consumer kafka error", e);  
      }  
    }
}

实现方式:

  • 消费者拉取消息,进行消息的初步检测和封装。
  • 使用线程池,对消息进行转发,提升消息的处理效率。

上线后服务运行情况:

  • kafka每分钟生产的消息,在5秒内就快速完成消费,并且出现了大量的如下报错:

线程池错用案例

线程池错用案例

原因分析

  • kafka消息消费时拉取的速率过快,而转发消息这块的速率跟不上,从而导致线程池的队列堆积,达到最大积压数6144后线程池抛出任务拒绝异常。

根因分析

这里使用线程池的作用是什么?

  1. 消息解析和消息处理存在速度上的差异,需要使用多线程提高消息的处理效率。
  2. 防止kafka单线程消费过慢出现堆积,提升kafka的消费速度,从而来提升kafka的稳定性。

针对以上的案例是否有必要使用多线程呢?

  • 没有必要。使用线程池异步处理提高了kafka的消费速度,但是线程池处理不过来就会出现报错。由于有1分钟的处理时间,完全没有必要在几秒内就处理完。去除多线程,在时间足够的时候,用时间换空间。
  • 这个保护kafka也没有这个必要,kafka堆积消费慢一点,也影响不了kafka的稳定性,因为并不是消息的生产速率大于消费速率。

总结

Kafka默认的消费线程数是1个。这意味着在没有特别配置的情况下,每个Consumer实例只会使用一个线程来进行消息消费。在Spring整合Kafka时,可以使用concurrency参数来指定消费者的线程数,如下配置如下:

spring:
  kafka:
    consumer:
      properties:
        concurrency: 1

因此存在如下两种情况:

  • concurrency < 线程池配置的最大线程数时。提升kafka的消费速率,防止kafka出现消息积压,影响kafka的稳定性。
  • concurrency > 线程池配置的最大线程数时。减少了kafka的消费速率,保护在线程池中使用的资源对象。

所以在消费端处理消息队列的消息时,线程池存在如下两种作用:

  1. 提升消息队列处理消息的性能。
  2. 保护系统部分资源的稳定性。比如操作数据库时,防止并发量过多,一下子就把数据库打挂掉。

Java线程池的三个最重要作用是:

  1. 提高性能:通过重用已存在的线程来减少线程创建和销毁的开销,从而提高系统的整体性能。
  2. 控制并发级别:通过设定最大线程数来控制同时运行的线程数量,避免过多线程导致的资源争用和上下文切换问题。
  3. 资源管理:有效管理线程资源,确保线程的合理分配和使用,避免资源浪费。
转载自:https://juejin.cn/post/7400157847537860617
评论
请登录