从零开始读RocketMq源码(六)延迟消息、事务消息原理解析
前言
在之前对Message在Producer发送、Broker存储、Consumer消费的源码逻辑进行了分析,了解了Message整个生命周期的动态去向。但在面试中常常会被问到一些特殊消息的实现原理,却不知道从何说起。本文不深入源码细节,仅对延迟消息、事务消息、消息重试等特性进行整理,以便于面试时更好地回答相关问题 。
准备
本篇RocketMq源码参考版本:5.2.0
延迟消息
延迟等级
rocketMq官方默认设置了18个延迟等级
1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
发送延迟消息
Message msg = new Message(TOPIC, TAG, "OrderID188", "Hello world".getBytes(StandardCharsets.UTF_8));
//设置延迟等级
msg.setDelayTimeLevel(3);
producer.send(msg);
按照默认顺序1-18数字就对应上面的延迟时间
也可以自定义延迟时间,进入配置文件broker.conf更改
更改完成后重启broker服务生效
延迟消息原理
- 延迟消息不会直接存储到指定的Topic中
- 延迟消息都会被存储到 RocketMQ 的一个内部 Topic
SCHEDULE_TOPIC_XXXX
当中 SCHEDULE_TOPIC_XXXX
总共有 18 个 MessageQueue,对应着延迟消息的 18 个等级,RocektMQ 会根据指定的 DelayTimeLevel 来决定选择哪个 MessageQueue- 然后会有一个定时任务,每100ms执行一次来判断SCHEDULE_TOPIC_XXXX topic中的MessageQueue的消息是否到达延迟时间。
- 到达延迟时间,会将 SCHEDULE_TOPIC_XXXX中的消息投递到消息最初需要投递的Topic中,最后立马被消费者消费
注: 综上所述,因为使用了定时任务以及逻辑处理等因素,延迟消息并不会非常准时的投递,都会有一些误差时间,但几乎可以忽略不计的。
事务消息
事务消息是基于两阶段提交来完成的。
两阶段提交
-
第一阶段:也就是发送 Message,但这里的 Message 和之前的 Producer Demo 发送的不太一样,这里的 Message 叫 Half Message,即半事务消息。此类型的 Message 是不会被 Consumer 消费到的。因为第二阶段还���完成。
-
第二阶段:如果半事务消息投递成功,则会开始执行本地事务。这里分为如下三种 case。
- 如果本地事务执行成功,则会向 Broker 发送
commit
消息,被commit
过后的 Message 才能被 Consumer 消费到。 - 如果本地事务执行失败,则会向 Broker 发送
rollback
消息,Broker 则会将刚刚投递的半事务消息删除,从而保证上下游数据的一致性。 - 如果 Producer 实例或者网络出现了问题,Producer 没能及时地将本地事务执行的结果通知 Broker,Broker 会通过扫描发现某条 Message 长时间处于“半事务消息”状态,Broker 会变被动为主动,主动地向 Producer 询问此 Message 对应的事务状态。
- 如果本地事务执行成功,则会向 Broker 发送
事务消息原理
-
事务消息投递和上面延迟消息一样不会直接投递到真是的Topic中。
-
将 Message 原本真实的 Topic 和 MessageQueue 进行备份, 放入到**
PROPERTY_REAL_TOPIC
、PROPERTY_REAL_QUEUE_ID
**中保存。 -
然后将消息投递到一个内部Topic中**
RMQ_SYS_TRANS_HALF_TOPIC
**,该队列专门存储事务消息。 -
所有的 Half Message 全部都写入到
queueId
为 0 的 MessageQueue -
因为一个 Topic 下如果只有 1 个 MessageQueue,那么这个 Topic 下的所有 Message 就是全局有序的,它们会按照先来后到的顺序被消费。
-
如果本地事务执行成功进行Commit,则将RMQ_SYS_TRANS_HALF_TOPIC 队列中的消息投递到真实的Topic中,供后续流程执行。并删除这条 Half Message ,但删除也是假删除,只是给 Message 打上一个删除的 Tag。
-
如果本地事务执行失败进行rollback,则直接删除这条 Half Message ,但删除也是假删除。
-
如果本地事务迟迟没有返回结果 (默认时间是****6s),则会触发事务回查机制,
- 执行回查之前需要校验检查次数是否到达了最大值(需要手动设置,没有默认值)
- 或者是当前 Half Message 存在是否超过了 Message 保存的上限,即 3 天。
- 如果满足上面条件中的一种Half Message 会被放进
TRANS_CHECK_MAX_TIME_TOPIC
Topic 当中,这个Topic和下面会讲到的死信队列性质一样。 - 一旦判定为需要执行事务回查逻辑,那么当前这条 Half Message 就算已经被消费了
- 在没达到最大的校验次数之前,都还需要将其投递到事务队列当中,以便下次重试时再次执行 Check 逻辑。
- 如果回查成功则删除投递的 Half Message
消费重试
重试时间
消息消费失败后,并不会立即重试,而是有一个递增的时间间隔来进行重试 重试次数默认16次
10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
只比延迟消息的时间间隔等级少了前两个,延迟消息总共有 18 个等级,而消息重试使用了原延迟消息的第 3 - 18
等级
消费重试原理
- 重试的 Message,RocketMQ 的做法并不是将其投递回原 Topic,而是重试队列 。
- 每个 ConsumerGroup 都有自己的重试队列,其名称是由特定的前缀拼接上 ConsumerGroup 所组成,默认 %RETRY%+消费者组名称。
- 所以在 Consumer 启动时,就会同时消费其 ConsumerGroup 对应的重试队列和普通队列。
- 消费失败的 Message,Consumer 会将其投回 Broker,相当于这条 Message 已经被消费掉了,之后重试的只是内容相同、但实际不是同一条的 Message, 和上面的事务消息相似。
- 然后会校验重试的次数,如果达到16次则会进入死信队列 , 组成为 %DLQ%+消费者组名称
- 未达到最大重试次数,则会根据重试间隔时间等级将其投递到延迟队列SCHEDULE_TOPIC_XXXX中,这部分逻辑与延迟消息一样。
- 然后等到了延迟等级对应的时间之后,再投递到 ConsumerGroup 所对应的重试队列当中,供后续消费。
总结
通过对RocketMQ延迟消息、事务消息、消息重试等特性的了解,可以更全面地掌握消息队列的功能和实现原理。这不仅有助于应对面试中的各种问题,还能在实际工作中更有效地利用这些特性,提高系统的可靠性和效率。希望本文的整理对你有所帮助。
转载自:https://juejin.cn/post/7394006604281708596