likes
comments
collection
share

2分钟看懂RocketMQ延迟消息核心原理

作者站长头像
站长
· 阅读数 42

前言

延迟消息在业务场景中使用的非常多,订单失效,过期通知等功能都可以借助延迟消息机制来实现。本文将从源码层面来分析Rocketmq的延迟消息实现原理机制。

一、延迟消息的使用            

先看下延迟消息的使用,发送消息逻辑和普通消息一样,只要在生产者端将Message对象中设置延迟消息的等级,Rocketmq的开源版本支持18个等级,每个等级代表一个延迟时间。

2分钟看懂RocketMQ延迟消息核心原理

Rocketmq有18个延迟等级分别对应如下延迟时间最小1s,最大2h,客户端从等级从1开始,上面设置为2,代表延迟5s。

private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

三、broker端原理解析

  • 配置解析与初始化定时任务

broker端分析,在Broker服务启动的时候,会初始化延迟消息的配置解析和对延迟消息处理的定时任务的开启。

2分钟看懂RocketMQ延迟消息核心原理

brocker启动时,根据不同等级开启定时器,即分别对每个延迟等级都开启一个定时器,rocketmq是基于java.util.Timer的定时器。

2分钟看懂RocketMQ延迟消息核心原理

源码中单独开启了一个定时任务来持久化对延迟等级和offset的关系。

为什么需要持久化延迟等级和offset的关系呢?

offset是每次写入消息都会实时更新,下次直接在新的offset写消息即可,持久化可以保证brocker重启后offset不丢失,实现高可用机制,并且高性能(避免重新计算)。 

  • Broker接收到生产者延迟消息处理

下面broker接收到延迟消息的处理逻辑。会将生产者提供的真实的topic和队列进行备份暂存,然后将消息存储到内置的一个延迟消息topic队列中。

2分钟看懂RocketMQ延迟消息核心原理

  • 定时将时间到的延迟消息写入真实topic和队列

延迟消息处理定时任务类DeliverDelayedMessageTimerTask,每次都将当前延迟等级对应的消息,写入真实的topic和队列,并维护新的offset,然后重新开启定时任务,等下次再重新处理该延迟等级的消息。

如果时间到了,他的处理核心逻辑比较清晰

1、 首先是还原延迟消息真实topic和队列

2分钟看懂RocketMQ延迟消息核心原理

2、将消息写入commitlog

//将消息写入真实的topic和队列
PutMessageResult putMessageResult =
    ScheduleMessageService.this.writeMessageStore
        .putMessage(msgInner);

当消息写入真实的topic和队列中后,消费者就可以正常的消费到消息了。

整理下来,总的核心流程如下:

2分钟看懂RocketMQ延迟消息核心原理         

总结

哈哈哈,上面的内容就是Rocketmq延迟消息原理了,马上过节了,祝大家中秋国庆节快乐,天天happy。

2分钟看懂RocketMQ延迟消息核心原理

2分钟看懂RocketMQ延迟消息核心原理

转载自:https://juejin.cn/post/7283361391670870016
评论
请登录