如何实现任意时刻的延迟队列 -- 以 QMQ 为例
摘要
QMQ 相较于 RocketMQ 而言支持任意时间的延迟队列,本文主要通过其架构设计和源码来探究其实现原理。
QMQ 整体架构图如下,虚线部分是与注册中心的交互,实线部分是消息的流转路径。
QMQ 架构图
与 RocketMQ 不同的是:为更好地支持延迟消息,QMQ 增加了单独的 delay server 模块。
当生产者发送延迟消息时,首先发往 delay server,等到延迟到期后,delay server 会将消息投递到 server 处,再由 consumer 消费该延迟消息,从而完成
本文会从存储和任务调度两方面来描述 QMQ 的延迟设计。
存储
存储介质一般有内存和磁盘两种。
- 内存读写效率高,操作简便。但价格相比于磁盘较贵,并且不能持久化,断电重启会丢失数据。
- 磁盘读写效率相比于内存低一些,但价格便宜,并且可以持久化。
考虑到 QMQ 默认支持两年以内的延迟消息,如果全部采用内存的话,会导致消息长驻内存,在很长时间之后才被消费,比较浪费内存资源。权衡效率与成本,QMQ 选择了内存+磁盘的存储模式。
磁盘存储设计
QMQ 和 RocketMQ 一样,都有一个消息的主存储日志,在 QMQ 中,该日志被称为 message log。message log 存储了所有主题的数据。
无额外存储设计
既然 message log 中包含所有的延迟消息数据体,那么我们只需通过定时轮询的方式,不断地判断延迟消息是否过期。如果到期,则将其变更为可被消费,消费者可直接消费消息。
缺点:
- 消息大部分是非延迟消息,遍历成本太高;
- 默认映射 72 小时内的文件,但延迟消息默认两年,会造成大量的随机 IO 操作。
点评:基本不可用。
引入额外存储
如果单纯依赖 message log,会出现大量随机 IO 操作,性能下降非常严重。因此需要设计针对于延迟消息的特定 log。
我们可以借鉴 RocketMQ 中 commit log 的设计,通过将大文件拆分(每个文件默认 1G)来减少随机 IO 的量。
需要权衡文件拆分的粒度。文件拆得太粗,在大文件中随机 IO 性能较低;文件拆得太细,文件空洞会比较严重。
在 QMQ 中,根据延迟到期时间,每小时生成一个文件。
如何设计存储内容?
一、我们可以借鉴 RocketMQ 的 consume queue 设计,存储消息在 message log 中的物理位置,然后经过一次 IO 操作取得实际的数据。
二、我们彻底将 delay server 和 message log 隔离, 将消息的全部内容存储到文件中,等到到期后,直接将消息发送到 server 即可。
我觉得第二种设计可行性较高,原因如下:
- 考虑到延迟消息的特性,会倒查过去很长时间的消息,例如现在发送了一条一年之后的延迟消息,如果采用第一种设计思路,那么 message log 的消息一年不能删除,这对磁盘空间也是极大的浪费。
- 即使限制了延迟时间,也会存在大量随机 IO 操作。
- 第二种方案使得延迟消息完全不受限于 message log,并且随机 IO 操作很少。额外的延迟消息的存储开销也能承受,毕竟正常消息还是占大多数的,如果想要进一步节省空间,可以在延迟消息单独落盘后删除 message log 中的对应延迟消息。
上面单独的额外存储在 QMQ 中被称为 schedule log,日志的流转就变成了下图所示:
QMQ 日志流转
文件传输到 message log 后,再按照到期时间被分发到不同的 schedule log 中。
内存存储设计
QMQ 默认会将离触发时间三十分钟的任务加载到内存中,等待调度任务的触发。同磁盘设计一样,我们也需要考虑消息在内存的存储模式。
一、只存储消息的偏移量数据,等到任务触发时,再到 schedule log 中找到具体消息,完成向 server 的投递。
二、存储消息的完整数据,等到任务触发时,直接向 server 投递。
我觉得第一种方案可行性更高,原因如下:
- 如果存储消息的完整数据,假设一条消息为 1KB,delay server 所在机器内存 8GB,最多能够存储 800w 消息,算下来 TPS 只有 4000 多,这还是算的内存满载情况。考虑到其它的内存占用和 GC 影响,可能最终只有不到 2000 TPS。这对于消息队列来说性能不足。
- 如果只存储偏移量数据,一条消息只需要几十个字节的存储,对于 delay server 来说,能够存储足量的数据。
- 关于随机 IO,触发的任务都是三十分钟内的,所以基本上集中于一两个文件,完全可以用内存映射去提高 IO 效率。
小结
QMQ 中存储分为内存和磁盘存储两种。在磁盘上按照时间每小时生成 schedule log 文件,存储所有延迟消息数据;在内存中存储将要触发的三十分钟内的数据,只存储数据的偏移量等关键信息,等到任务触发后,再根据偏移量去 schedule log 中获取消息数据。
调度
调度分为两部分:一是将临期磁盘数据加载到内存中;二是将内存数据调度到 server 中。
当 WheelTickManager#start
调用时,会启动两个定时调度器,分别负责上面的两个调度逻辑。
磁盘到内存调度
该调度器负责将快要触发的任务加载到内存中的时间轮中。
调度器由 ScheduledExecutorService
实现,默认每分钟执行一次。
调度器维护一个游标,游标里记录着已经加载过的 offset。当被调度时,会计算从当前时间后推三十分钟的时间戳,同时根据时间戳计算出对应的 schedule log 的文件名。
然后从 offset 开始,直到计算出的时间戳为止,不断地将数据添加到时间轮中,并更新 offset。
offset 可能被更新到半小时之后,如果此时添加了半小时内的任务怎么办?
此时会判断延迟队列执行时间与 offset 的关系,如果小于 offset,则会直接将数据加入到时间轮中。
内存到 server 调度
该调度器负责将要触发的任务投递到 server 中,供消费者消费。
调度器由 HashedTimeWheel
实现,默认每 500ms 执行一次。
该实现与 Netty 实现基本一致,只是修改了任务过期的逻辑。Netty 时间轮的调度和过期逻辑都是由一个线程执行,如果任一个步骤发生阻塞,那么会影响整体的流程。QMQ 中过期逻辑需要去 schedule log 中获取消息体,属于 IO 操作,比较耗时。因此考虑到性能因素,QMQ 将超时投递操作转交给线程池执行,避免阻塞。
时间轮实现在前文(mp.weixin.qq.com/s/1pV6nTPyv…
总结
- QMQ 通过引入单独的 delay server 来实现任意时间的延迟队列功能。
- delay server 核心技术包括存储和调度两方面。存储包括磁盘存储和内存存储,调度包括从磁盘到内存和从内存到 server。
- 成本、性能都是选型要考虑的因素。比如随机 IO、持久化、内存开销等。
转载自:https://juejin.cn/post/7374700579815440436