面试必考:怎样解决线上消息队列积压问题
引言
提到消息队列,最常问到的问题就是消息积压,真实线上环境该怎么解决消息积压的问题?真实情况并不是网上说的把积压的消息投递到一个新Topic中,然后慢慢消费。这样做,成本太高、流程复杂、操作麻烦,而且还是一次性操作,不可持续,下次再出现这种问题,还要再次手动操作,太麻烦。
今天跟大家一块学习一下线上真实环境遇到消息积压的解决方案。以最常用的Kafka和RocketMQ消息队列为例。 先聊一下消息积压产生的原因,针对不同的原因,设计不同的解决方案。
消息积压的原因
消息积压是指消息在队列中等待处理的数量不断增加。这种情况会导致系统性能下降,影响整个应用的响应时间和可靠性。 常用原因如下:
- 生产者发送速度过快
在某些情况下,生产者可能会突然增加发送速率,或者持续发送大量消息,超出了系统的处理能力。
- 流量高峰: 特定事件或情况可能导致消息量暴增,如促销活动、日志收集系统在错误发生时的突增等。
- 生产者配置不当: 生产者配置错误可能导致发送过多的消息到队列。
- 消费者处理速度慢
最常见的原因是消费者处理消息的速度跟不上生产者生产消息的速度。这可能是由于:
- 消费者处理逻辑复杂或效率低: 如果每条消息的处理时间过长,会导致处理队列中的消息堆积。
- 消费者数量不足: 消费者的数量可能不足以处理入队消息的数量,尤其是在高峰时间。
- 消费者处理能力预估不足: 针对消费者的处理能力没有做好压测和限流。
- 消费端存在业务逻辑bug,导致消费速度低于平常速度。
- 资源限制
服务器或网络的资源限制也可能导致消息处理能力受限,从而引起消息积压:
- 服务器性能限制: CPU、内存或I/O性能不足,无法高效处理消息。
- 网络问题: 网络延迟或带宽不足也会影响消息的发送和接收速度。
- 错误和异常处理
错误处理机制不当也可能导致消息积压:
- 失败重试: 消费者在处理某些消息失败后进行重试,但如果重试策略不当或错误频发,会导致处理速度降低。
- 死信消息: 处理失败的消息过多,导致死信队列中的消息堆积。
- 设计和配置问题
系统设计不合理或配置不当也可能导致消息积压:
- 错误的分区策略: 在 Kafka 等系统中,分区策略不合理可能导致部分分区过载。
- 不合理的消息大小: 消息太大或太小都可能影响系统的处理性能。
解决方案
针对消息队列中消息积压的问题,常用的解决方案如下:
- 增加消费者数量或优化消费者性能
- 水平扩展消费者:增加消费者的数量,以提高并行处理能力。在一个消费者组里增加更多的消费者,可以提高该组的消息处理能力。
- 优化消费逻辑:减少每条消息处理所需的时间。例如,通过减少不必要的数据库访问、缓存常用数据、或优化算法等方式。
- 多线程消费:如果消费者支持多线程处理,并且是非顺序性消息,可通过增加线程数来提升消费速率。
- 控制消息生产速率
- 限流措施:对生产者实施限流措施,确保其生产速率不会超过消费者的处理能力。
- 批量发送:调整生产者的发送策略,使用批量发送减少网络请求次数,提高系统吞吐量。
- 资源优化和网络增强
- 服务器升级:提升处理能力,例如增加 CPU、扩大内存,或提高 I/O 性能。
- 网络优化:确保网络带宽和稳定性,避免网络延迟和故障成为瓶颈。
- 改进错误和异常处理机制
- 错误处理策略:合理设置消息的重试次数和重试间隔,避免过多无效重试造成的额外负担。
- 死信队列管理:对于无法处理的消息,移动到死信队列,并定期分析和处理这些消息。
- 系统和配置优化
- 消息分区策略优化:合理配置消息队列的分区数和分区策略,确保负载均衡。
- 消息大小控制:控制消息的大小,避免因单个消息过大而影响系统性能。
- 实施有效的监控与告警
- 实时监控:实施实时监控系统,监控关键性能指标如消息积压数、处理延迟等。
- 告警系统:设定阈值,一旦发现异常立即触发告警,快速响应可能的问题。
- 消费者和生产者配置调整
- 调整消费者拉取策略:例如,调整
max.poll.records
和fetch.min.bytes
等参数,根据实际情况优化拉取数据的量和频率。 - 生产者发送策略优化:调整
linger.ms
和batch.size
,使生产者在发送消息前进行更有效的批处理。
Kafka消息积压
具体到解决 Kafka 消息积压问题,有以下调优策略解决 Kafka 消息积压问题:
- 增加分区数量
在增加消费者之前,确保 Topic 有足够的分区来支持更多的消费者。如果一个 Topic 的分区数量较少,即使增加了消费者数量,也无法实现更高的并行度。你可以通过修改 Topic 的配置来增加分区数:
# 使用 Kafka 命令行工具增加分区
kafka-topics.sh --bootstrap-server <broker-list> --alter --topic <topic-name> --partitions <new-number-of-partitions>
- 增加消费者数量
在 Kafka 中,一个 Partition 只能由消费者组中的一个消费者消费,因此增加消费者数量是可以提高并发处理能力的。具体操作就是启动多个消费者,并加入到同一个消费者组里,同时确保这些消费者拥有相同的group.id
(消费者组ID)。
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.util.Collections;
import java.util.Properties;
public class Consumer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "consumer-group"); // group.id必须相同
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("your-topic"));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
} finally {
consumer.close();
}
}
}
- 修改消费者配置
通过修改下面的消费者配置,可以提高消费者处理速度:
- fetch.min.bytes 消费者每次从服务器拉取最小数据量,增加这个值,可以减少消费者请求次数,降低网络消耗,提升消费者处理性能。
- fetch.max.bytes 与上面配置相对应,这是消费者每次从服务器拉取最大数据量,增加这个值,也有同样的效果。
- fetch.max.wait.ms 这个配置指定了消费者在读取到 fetch.min.bytes 设置的数据量之前,最多可以等待的时间。增加这个值,也有同样的效果。
- max.poll.records 消费者每次可以拉取的最大记录数,增加这个值,也有同样的效果,不过会增加每次消息处理的时间。
- max.partition.fetch.bytes 消费者从每个分区里拉取的最大数据量
RocketMQ消息积压
具体到解决 RocketMQ 消息积压问题,有以下调优策略解决 RocketMQ 消息积压问题:
1. 增加消费者数量
增加消费者数量可以通过简单地启动更多的消费者实例来实现,并配置为相同的消费者组。
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class Consumer {
public static void main(String[] args) throws Exception {
// 创建消费者实例,指定消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup1");
// 指定 NameServer 地址
consumer.setNamesrvAddr("localhost:9876");
// 订阅一个或多个主题
consumer.subscribe("TopicTest", "*");
// 注册回调实现类来处理从 brokers 拉取回来的消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者实例
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
- 修改消费者配置
可以在消费者配置中调整消费线程的数量来提高并行处理能力。
// 设置消费者最小线程数
consumer.setConsumeThreadMin(20);
// 设置消费者最大线程数
consumer.setConsumeThreadMax(40);
增加消费者每次拉取消息的数量
// 设置每次拉取消息的数量,默认是32条
consumer.setPullBatchSize(32);
增加消费者监听器每次处理消息的最大数量
// 设置每批最多消费10条消息,默认是1
consumer.setConsumeMessageBatchMaxSize(10);
- 修改生产者失败重试配置
减少生产者发送消息失败重试次数,也可以减少消息积压问题。
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.setNamesrvAddr("localhost:9876");
producer.setRetryTimesWhenSendFailed(0); // 设置发送失败时的重试次数,默认是2
producer.start();
- 修改消费者失败重试配置
减少消费者处理消息失败重试次数,也可以减少消息积压问题。
// 设置最大重试次数,普通消息默认是16次,顺序消息默认是Integer最大值
consumer.setMaxReconsumeTimes(3);
- 增加消费者分区数量
可以使用 RocketMQ 的命令行工具 mqadmin 来更新 Topic 的配置。需要注意的是,增加分区是一个重要操作,它可能会影响现有的消息平衡和消费者的消费行为。
mqadmin updateTopic -n {nameServerAddress} -c {clusterName} -t {topicName} -w {writeQueueNums} -r {readQueueNums}
- {nameServerAddress}: NameServer 的地址。
- {clusterName}: RocketMQ 集群名称。
- {topicName}: 需要增加分区的 Topic 名称。
- {writeQueueNums} 和 {readQueueNums}: 新的分区(MessageQueue)数量。这通常是一个比当前更高的数字。
转载自:https://juejin.cn/post/7362783257257050112