RockerMQ4.x 延时消息原理、流程梳理~
前言
相信用过RocketMq
的小伙伴都知道,RocketMq
支持延时消息,通过设置指定的延时级别
就可以让消息实现不同时效的延时
功能,今天带大家了解的就是延时消息
的原理~
如何实现延时?
在带大家正式了解RocketMq延时
原理之前,先问大家一个问题,如果我们自己来实现延时功能,我们会如何实现?
Sleep
经典永不过时,相信Sleep
是我们最早接触的具有延时功能的函数
,下面代码就可以简单实现延时5s
后执行业务逻辑
public static void main(String[] args) throws InterruptedException {
TimeUnit.SECONDS.sleep(5);
System.out.println("执行业务代码");
}
Timer
Timer
类是在 JDK 1.3 版本中引入的。它位于 java.util
包中,用于支持简单的定时任务调度。
不过Timer
也有着许多缺陷,谨慎使用~
public static void main(String[] args) throws InterruptedException {
// schedule实现延时
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
System.out.println("执行业务逻辑");
}
}, 5000);
}
ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor
类是在 JDK 1.5 版本中引入的。它是 ThreadPoolExecutor
类的子类,专门用于支持定时任务的调度和执行。
public static void main(String[] args) throws InterruptedException {
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
// schedule实现延时
executor.schedule(() -> {
System.out.println("执行业务代码");
}, 5, TimeUnit.SECONDS);
}
时间轮
有关时间轮
我就不细说了,在各大开源框架,诸如: Netty、Dubbo、Kafka
都少不了它的影子
RocketMq如何实现的延时消息?
在上面带大家了解了常见的延时方案之后,我们再来探索RcketMq
的延时原理~
RocketMq官方延时案例
下面是RocketMq
官方提供的案例,我们可以看到代码中通过setDelayTimeLevel
设定了延时级别,对应的延时时间就是10s
package org.apache.rocketmq.example.schedule;
public class ScheduledMessageProducer {
public static final String PRODUCER_GROUP = "ExampleProducerGroup";
public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9876";
public static final String TOPIC = "TestTopic";
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);
producer.start();
int totalMessagesToSend = 100;
for (int i = 0; i < totalMessagesToSend; i++) {
Message message = new Message(TOPIC, ("Hello scheduled message " + i).getBytes());
// 设定延时级别
message.setDelayTimeLevel(3);
// 发送消息
SendResult result = producer.send(message);
System.out.print(result);
}
// Shutdown producer after use.
producer.shutdown();
}
}
存储延时消息
Producer
回将消息发送到Broker
,接下来看看Broker
是怎么处理延时消息
的.
org.apache.rocketmq.store.CommitLog#asyncPutMessage
中会对延时消息进行特殊处理
- 如果不是事务消息且
延时级别 > 0
,说明该消息是延时消息,需要进行特殊处理 - 检查延时级别是否超过最大值
18
,如果超过则重置为18
- 保存消息的原始
topic和queueId
,并将该消息的topic
覆盖为延时消息专用的topic
,且将queueId
设置为延时级别 - 1
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
// ......
String topic = msg.getTopic();
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
// 事务消息不支持延时
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
// 如果延时级别 > 0说明是延时消息
if (msg.getDelayTimeLevel() > 0) {
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}
// 延时消息使用同一个topic: SCHEDULE_TOPIC_XXXX
// 且queueId: 延时级别 - 1
topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
int queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
// 存储原始消息的topic、queueId
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
// 更新延时消息的topic、queueId
msg.setTopic(topic);
msg.setQueueId(queueId);
}
}
// ......
}
经过上面的一番桶特殊处理及之后一些逻辑后,消息会被存储到commitLog
中去
取出延时消息
Broker
将延迟消息写到了commitLog
中后,由于Broker
替换了消息原始topic
,所以订阅该topic
的消费者此时还无法消费该消息。
org.apache.rocketmq.store.schedule.ScheduleMessageService
,Broker
启动时会启动ScheduleMessageService
而在ScheduleMessageService
中会为每个延时级别都开启一个延时任务
,延时能力正是利用我前面提到的ScheduledThreadPoolExecutor
同时也会开启一个定时任务
,固定时间持久化每个队列的消费偏移量
public class ScheduleMessageService extends ConfigManager {
private static final long FIRST_DELAY_TIME = 1000L;
// Broker启动时会初始化这个Map,key是延迟等级,共计18个
// key: 延时级别,value: 延时时间(毫秒)
private final ConcurrentMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable =
new ConcurrentHashMap<Integer, Long>(32);
// key: 延时级别,value: 消费偏移量
private final ConcurrentMap<Integer /* level */, Long/* offset */> offsetTable =
new ConcurrentHashMap<Integer, Long>(32);
public void start() {
if (started.compareAndSet(false, true)) {
this.load();
this.deliverExecutorService = new ScheduledThreadPoolExecutor(this.maxDelayLevel, new ThreadFactoryImpl("ScheduleMessageTimerThread_"));
if (this.enableAsyncDeliver) {
this.handleExecutorService = new ScheduledThreadPoolExecutor(this.maxDelayLevel, new ThreadFactoryImpl("ScheduleMessageExecutorHandleThread_"));
}
// 遍历延时级别map
for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
Integer level = entry.getKey();
Long timeDelay = entry.getValue();
Long offset = this.offsetTable.get(level);
if (null == offset) {
offset = 0L;
}
// 为每个延时级别开一个延时任务,延时1s执行,使用的是ScheduledThreadPoolExecutor
if (timeDelay != null) {
// enableAsyncDeliver 默认false
if (this.enableAsyncDeliver) {
this.handleExecutorService.schedule(new HandlePutResultTask(level), FIRST_DELAY_TIME, TimeUnit.MILLISECONDS);
}
this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME, TimeUnit.MILLISECONDS);
}
}
// 消费偏移量持久化定时任务,每10s执行一次
this.deliverExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
if (started.get()) {
ScheduleMessageService.this.persist();
}
} catch (Throwable e) {
log.error("scheduleAtFixedRate flush exception", e);
}
}
}, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval(), TimeUnit.MILLISECONDS);
}
}
}
DeliverDelayedMessageTimerTask
封装了delayLevel、offset
,实现了Runnable
重写了run
方法,在run
方法中如果处于运行中,则调用executeOnTimeup
方法执行具体逻辑
class DeliverDelayedMessageTimerTask implements Runnable {
private final int delayLevel;
private final long offset;
public DeliverDelayedMessageTimerTask(int delayLevel, long offset) {
this.delayLevel = delayLevel;
this.offset = offset;
}
@Override
public void run() {
try {
if (isStarted()) {
// 核心处理逻辑
this.executeOnTimeup();
}
} catch (Exception e) {
// XXX: warn and notify me
log.error("ScheduleMessageService, executeOnTimeup exception", e);
this.scheduleNextTimerTask(this.offset, DELAY_FOR_A_PERIOD);
}
}
}
public void executeOnTimeup() {
// 根据topic和延时级别找到对应的ConsumeQueue
ConsumeQueue cq =
ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
delayLevel2QueueId(delayLevel));
// ......
long nextOffset = this.offset;
try {
int i = 0;
ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
for (; i < bufferCQ.getSize() && isStarted(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
// ......
// 当前时间
long now = System.currentTimeMillis();
// 计算投递时间,时间存储在了tag hashcode 中
long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
// 剩余时间
long countdown = deliverTimestamp - now;
if (countdown > 0) {
// 还未到投递时间,重新加入到延时任务,此时延时100ms
this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
return;
}
// 从commitLog读取message
MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);
if (msgExt == null) {
continue;
}
// 恢复message原始的topic和queueId,之前保存过
MessageExtBrokerInner msgInner = ScheduleMessageService.this.messageTimeup(msgExt);
if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {
log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}",
msgInner.getTopic(), msgInner);
continue;
}
// 将消息再次写入commitlog中,topic是原始topic,这样消费者就可以去消费了
boolean deliverSuc;
// enableAsyncDeliver默认为false
if (ScheduleMessageService.this.enableAsyncDeliver) {
deliverSuc = this.asyncDeliver(msgInner, msgExt.getMsgId(), nextOffset, offsetPy, sizePy);
} else {
deliverSuc = this.syncDeliver(msgInner, msgExt.getMsgId(), nextOffset, offsetPy, sizePy);
}
// 写入commitLog失败,延时重试
if (!deliverSuc) {
this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
return;
}
}
// 更新消费进度
nextOffset = this.offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
} catch (Exception e) {
log.error("ScheduleMessageService, messageTimeup execute error, offset = {}", nextOffset, e);
} finally {
bufferCQ.release();
}
// 更新offset,重新添加延时任务处理延时消息
this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
}
-
根据
topic
和延时级别
找到对应的ConsumeQueue
-
计算校验消息是否到达投递时间
- 如果还没到达则重新加入延时任务
- 到达投递时间,则从
commitLog
中取出消息,将消息的topic、queueId
设置为原始值,并重新写入到commitLog
中
-
更新消费偏移量
nextOffset
,并重新添加延时任务处理延时消息
总结
RocketMq
延时消息大致流程并不复杂,下面简单总结下
-
当我们需要发送延时消息的时候,需要对消息设置延时级别,标明该消息是延时消息
-
Broker
收到延时消息后,会重新设置延时消息的topic、queueId
,并备份消息原始的topic、queueId
,之后写入到commitLog
-
Broker
会启动ScheduleMessageService
,ScheduleMessageService
会为每个延时级别
启动一个延时任务(利用ScheduledThreadPoolExecutor
)-
根据
topic、延时级别
找到对应的consumerQueue
,然后检查消息是否到达投递时间- 如果还没达到投递时间,则重新添加到延时任务中
- 如果已经达到投递时间,则将消息
topic、queueId
归还为原始值,再重新写入commitLog
,这样消费者就能感知到进行消费了。
-
-
最后
更新消费偏移量
,并重新加入到延时任务进行下一次处理
最后,我是 Code皮皮虾 ,会在以后的日子里跟大家一起学习,一起进步!
觉得文章不错的话,希望大家多多支持,一键三连~ 页可以在 掘金 关注我~
转载自:https://juejin.cn/post/7249740342751445053