RocketMQ 源码探究 -- 延迟队列实现
简介
延迟队列是能存储未来的任务,并能在指定时间节点触发的一种数据结构。
RocketMQ 提供了对延迟消息的支持,我们只需在普通消息上设置 delayTimeLevel,RocketMQ 会在指定的 timeLevel 时触发,完成消息的投递工作。
需要注意的是,RocketMQ 默认支持 18 个时间等级的配置,可以通过在 broker 的配置文件中修改。
下面是简单的代码示例。
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("delay-producer");
producer.setNamesrvAddr("localhost:9876");
producer.setSendMsgTimeout(100 * 1000);
producer.start();
for (int i = 0; i < 18; i++) {
Message message = new Message("delay", ("hello-delay" + i).getBytes(StandardCharsets.UTF_8));
message.setDelayTimeLevel(i + 1);
producer.send(message);
}
producer.shutdown();
}
架构设计
延迟队列整体架构
在我看来,延迟队列主要由存储和调度两个核心技术点组成。
存储
如何确保不被提前消费
消费者是跟 consume queue 交互,所以为达成不被提前消费目标,必然需要对 consume queue 的设计进行改造。
消息被发送到 broker 时,首先正常写入 commit log,在分发至 consume queue 时,会将 topic 改成 SCHEDULE_TOPIC_XXXX
,将 queue id 改成 delayTimeLevel - 1
。同时暂存原来的 topic 和 queue。
SCHEDULE_TOPIC_XXXX
被归为系统 topic,当 client 订阅系统 topic 时,会抛出异常。
public static boolean isSystemTopic(String topic, RemotingCommand response) {
if (isSystemTopic(topic)) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("The topic[" + topic + "] is conflict with system topic.");
return true;
}
return false;
}
如何保证有序性
在设计延迟队列时,一般需要保证存储的数据是有序的,这样才能减少遍历访问条数。
Java 的 DelayQueue
依赖小顶堆做排序,Redisson 的 DelayQueue 依赖 zset
做数据排序。
在 RocketMQ 中,只支持固定时间的延迟。RocketMQ 按照不同延迟分 18 个队列来保证数据有序,即在一个队列中,先入队列的永远比后入队列的先到期。
调度
定时任务
定时任务流程图
在 RocketMQ 中,每个延迟队列都会分配一个 Timer
,Timer
轮询 consume queue
数据,进行延时逻辑判断。
当系统启动时,会读取 $ROCKETMQ_HOME/config/delayOffset.json
内容,加载每个队列中的 offset。
{
"offsetTable":{1:3,2:2,3:33,4:2,5:2,6:2,7:2,8:2,9:2,10:2,11:2,12:2,13:2,14:2,15:2,16:2,17:12,18:1
}
}
从 consume queue 中加载从 offset 开始的所有数据,按顺序进行遍历。
注意:在生成 consume queue 时,会将该条消息的过期时间提前算好,放到 consume queue 的 tag code 中。 这也是 consume queue 的 tag code 本来只需 4 字节(hashcode),但分配了 8 字节的原因。
如果消息到了过期时间,会到 commit log 中将数据补齐,换成替换前的 topic 和 queue,再次作为普通消息投递到 commit log 中,等待消费者的拉取。
如果没有到过期时间,系统会启动一个下次请求到来的时的 Timer。这是因为分队列 + 固定时间延迟的好处,不用担心两次请求之间插入新的请求。
offset 持久化
如果 RocketMQ 宕机,如何保证下次启动仍然能够从上次的 offset 消费呢?
RocketMQ 维护了一个每隔 10s 运行一次的 Timer,目的是将每个队列的消费情况写入文件,等到下次重启时,可以从上次消费位置进行消费。
因为不是实时同步,有一定的时间间隔,所以,有可能会出现重复消息的情况。我们需要在消费端做幂等处理,避免重复消费。
总结
- RocketMQ 支持 18 个时间等级的延迟消息
- 在存储层面,RocketMQ 通过修改 topic 保证不被提前消费,生成 18 个队列保证消息的有序性
- 基于 Timer 机制实现轮询 consume queue 逻辑
- 固定时间等级的可用性比较差,需要改造才能用于业务开发
转载自:https://juejin.cn/post/7374824876111200308