线程池错用案例
背景知识
使用Java作为开发语言的同学,为了提升系统的运行效率,把原来单线程串行执行的流程,改为多线程并发执行,应该是提升系统性能的一个常规手段。
多线程编码时的基本套路就是使用线程池。如Java线程池(ThreadPoolExecutor
)在创建时需要设置七个核心参数,也是面试时必背的八股文:
- 核心线程数(corePoolSize) :这是线程池维护的最小线程数量。当提交一个任务到线程池时,如果当前没有空闲线程,则会创建一个新的线程来执行任务。即使其他空闲的基本线程能够执行新任务也会创建新线程。
- 最大线程数(maximumPoolSize) :这是允许线程池中最大的线程数。当线程池中的活跃线程数超过这个值时,超出的线程将被阻塞在工作队列中等待。
- 空闲线程存活时间(keepAliveTime) :当线程数大于核心线程数时,多出来的那部分线程在最大多长时间内如果没有接到新的任务,将会被终止。这个时间单位由
unit
参数指定。 - 时间单位(unit)、工作队列(workQueue) 、线程工厂(threadFactory)、拒绝策略(handler)。
在开发人员心中,有一股神奇的暗示,就会觉得线程池这个如此消耗系统资源的对象,一定是为了提升系统的性能才开始使用,不然我使用单线程不是更简单吗?
但是,并不是所有的场景都需要提升性能,一旦用错了,可能多线程反而成为了劣势,一个案例让我啪啪打脸。
打脸案例
场景: kafka消息数据每分钟进行一次同步,监控消息数据,进行处理。
class KafkaTest{
private final static ThreadPoolExecutor KAFKA_TEST =
new ThreadPoolExecutor(32, 64, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(6144),
new NamedThreadFactory("kafkaTest"),
new ThreadPoolExecutor.CallerRunsPolicy());
@KafkaListener(topics = "test_topic", groupId = "test_groupId")
public void consume(String content) {
try {
JSONObject obj = JSONObject.parseObject(content);
if(!checkParamObj(obj)) {
log.warn("consume content is error. {}", obj);
}
NewObj newObj = transfer(obj);
KAFKA_TEST.execute(() -> {
KafkaUtils.send(newObj);
});
} catch (Exception e) {
log.error("consumer kafka error", e);
}
}
}
实现方式:
- 消费者拉取消息,进行消息的初步检测和封装。
- 使用线程池,对消息进行转发,提升消息的处理效率。
上线后服务运行情况:
- kafka每分钟生产的消息,在5秒内就快速完成消费,并且出现了大量的如下报错:
原因分析
- kafka消息消费时拉取的速率过快,而转发消息这块的速率跟不上,从而导致线程池的队列堆积,达到最大积压数6144后线程池抛出任务拒绝异常。
根因分析
这里使用线程池的作用是什么?
- 消息解析和消息处理存在速度上的差异,需要使用多线程提高消息的处理效率。
- 防止kafka单线程消费过慢出现堆积,提升kafka的消费速度,从而来提升kafka的稳定性。
针对以上的案例是否有必要使用多线程呢?
- 没有必要。使用线程池异步处理提高了kafka的消费速度,但是线程池处理不过来就会出现报错。由于有1分钟的处理时间,完全没有必要在几秒内就处理完。去除多线程,在时间足够的时候,用时间换空间。
- 这个保护kafka也没有这个必要,kafka堆积消费慢一点,也影响不了kafka的稳定性,因为并不是消息的生产速率大于消费速率。
总结
Kafka默认的消费线程数是1个。这意味着在没有特别配置的情况下,每个Consumer实例只会使用一个线程来进行消息消费。在Spring整合Kafka时,可以使用concurrency
参数来指定消费者的线程数,如下配置如下:
spring:
kafka:
consumer:
properties:
concurrency: 1
因此存在如下两种情况:
concurrency
< 线程池配置的最大线程数时。提升kafka的消费速率,防止kafka出现消息积压,影响kafka的稳定性。concurrency
> 线程池配置的最大线程数时。减少了kafka的消费速率,保护在线程池中使用的资源对象。
所以在消费端处理消息队列的消息时,线程池存在如下两种作用:
- 提升消息队列处理消息的性能。
- 保护系统部分资源的稳定性。比如操作数据库时,防止并发量过多,一下子就把数据库打挂掉。
Java线程池的三个最重要作用是:
- 提高性能:通过重用已存在的线程来减少线程创建和销毁的开销,从而提高系统的整体性能。
- 控制并发级别:通过设定最大线程数来控制同时运行的线程数量,避免过多线程导致的资源争用和上下文切换问题。
- 资源管理:有效管理线程资源,确保线程的合理分配和使用,避免资源浪费。
转载自:https://juejin.cn/post/7400157847537860617