一个本地文件就能保证RocketMQ5中的定时消息不被丢失?
前两期介绍了定时消息原理以及深入到代码中看运行逻辑。本期,介绍一下RocketMQ中是如何避免定时消息由于系统宕机等系统异常导致丢失的。
启动时恢复状态
在TimerMessageStore
启动时,不光会初始化五个处理Service
,还会从TimerCheckPoint
文件中恢复之前定时消息的处理现场。避免因RocketMQ
意外宕机时造成定时消息的丢失。
TimerCheckPoint数据结构
在了解如何从TimerCheckPoint
中恢复处理线程之前,先了解一下该文件中存放了什么数据。
位置:{user.home}/store/config/timercheck 大小:4KB
存放的数据:
字段 | 作用 |
---|---|
lastReadTimeMs | 最后一次时间轮读取时间 |
lastTimerLogFlushPos | 最后一次保存在TimerLog的位置 |
lastTimerQueueOffset | 最后一次读取TIMER_TOPIC主题的consumerQueue文件的位置 |
masterTimerQueueOffset | 主节点读取TIMER_TOPIC主题的consumerQueue文件的位置 |
数据版本: | |
stateVersion | broker状态版本号 |
timestamp | 版本更新时间戳 |
counter | 计数器 |
如何恢复
在TimerMessageStore
中,使用了一个recover
方法。在该方法中,可以分为两大块,一块为恢复TimerWheel
中的数据;一块为设置TimerMessageStore
服务启动数据。
恢复TimerWheel(时间轮)数据
从TimerChechPoint
中获取上次程序运行中保存的TimerLog
刷盘偏移量。然后将该偏移量减少,使恢复TimerLog
中的数据到TimerWheel
中要比设想的多。因为TimerWheel
中是根据定时时间来获取对应的槽的,所以即使恢复多TimerLog
的数据也不会影响到定时消息的逻辑处理。
public void recover() {
// 从检查点获取最后刷新位置 (最后一个TimerLog的文件中的偏移量)
long lastFlushPos = timerCheckpoint.getLastTimerLogFlushPos();
// 获取 timerLog 队列中的最后一个映射文件
MappedFile lastFile = timerLog.getMappedFileQueue().getLastMappedFile();
if (null != lastFile) {
// 删除一个文件大小的偏移量对于恢复并没有影响,但是可以预防恢复数据错漏。
// 恢复多一点数据对于时间轮来说并没有影响的
lastFlushPos = lastFlushPos - lastFile.getFileSize();
}
if (lastFlushPos < 0) {
lastFlushPos = 0;
}
// 恢复TimerLog中的数据到timerWheel中
long processOffset = recoverAndRevise(lastFlushPos, true);
/**
* --------------- 恢复重要运行变量 -----------------
*/
}
private long recoverAndRevise(long beginOffset, boolean checkTimerLog) {
LOGGER.info("Begin to recover timerLog offset:{} check:{}", beginOffset, checkTimerLog);
MappedFile lastFile = timerLog.getMappedFileQueue().getLastMappedFile();
if (null == lastFile) {
return 0;
}
// 从最后一个timerLog文件开始,从后往前找到beginOffset偏移量在哪个文件当中
List<MappedFile> mappedFiles = timerLog.getMappedFileQueue().getMappedFiles();
int index = mappedFiles.size() - 1;
for (; index >= 0; index--) {
MappedFile mappedFile = mappedFiles.get(index);
if (beginOffset >= mappedFile.getFileFromOffset()) {
break;
}
}
if (index < 0) {
index = 0;
}
// 找到beginOffset偏移量所在的文件后,从前往后遍历每个timerLog,将其中的数据恢复到timerWheel中
long checkOffset = mappedFiles.get(index).getFileFromOffset();
for (; index < mappedFiles.size(); index++) {
MappedFile mappedFile = mappedFiles.get(index);
// 从当前的timerLog文件中获取能够恢复的数据
// checkTimerLog为True时:取整个TimerLog中的内容
// checkTimerLog为False时:取当前读位置之前的所有内容
SelectMappedBufferResult sbr = mappedFile.selectMappedBuffer(0, checkTimerLog ? mappedFiles.get(index).getFileSize() : mappedFile.getReadPosition());
ByteBuffer bf = sbr.getByteBuffer();
int position = 0;
boolean stopCheck = false;
// 从timerLog文件中获取数据,然后放入到timerWheel中
for (; position < sbr.getSize(); position += TimerLog.UNIT_SIZE) {
try {
bf.position(position);
// timelog 大小
int size = bf.getInt();//size
// 上一条timeLog记录的位置
bf.getLong();//prev pos
// magic
int magic = bf.getInt();
// 魔法值为空白(BLANK_MAGIC_CODE)时,说明已经到该文件的末尾
if (magic == TimerLog.BLANK_MAGIC_CODE) {
break;
}
// 如果magic不对,或者size不对,说明该文件已经损坏
if (checkTimerLog && (!isMagicOK(magic) || TimerLog.UNIT_SIZE != size)) {
stopCheck = true;
break;
}
// bf.getLong->currWriteTime bf.getInt->calcDelayedTime-currWriteTime
// 写入时的时间 + (计算过后的定时时间 - 写入时的时间)
// todo lastPos不是-1咋办
long delayTime = bf.getLong() + bf.getInt();
if (TimerLog.UNIT_SIZE == size && isMagicOK(magic)) {
timerWheel.reviseSlot(delayTime, TimerWheel.IGNORE, sbr.getStartOffset() + position, true);
}
} catch (Exception e) {
LOGGER.error("Recover timerLog error", e);
stopCheck = true;
break;
}
}
sbr.release();
checkOffset = mappedFiles.get(index).getFileFromOffset() + position;
if (stopCheck) {
break;
}
}
// 删除checkOffset偏移量之后的文件
if (checkTimerLog) {
timerLog.getMappedFileQueue().truncateDirtyFiles(checkOffset);
}
// 返回timerLog最后的处理偏移量
return checkOffset;
}
在上面的代码中,有一个变量BLANK_MAGIC_CODE
,这个魔法值是在TimerLog
文件中快存储满数据时做的一个标识。避免TimerLog
中存放着不完全的元素。
// TimerLog的append方法
public long append(byte[] data, int pos, int len) {
/**
* ------------省略获取对应TimerLog文件逻辑-------------------
*/
// 数据长度(52) + 最小空白长度(16)= 68 > (文件大小 - 当前写入位置)= 文件剩余空间
// 换个角度 文件剩余空间 < 68,先写入一个空白的TimerLog单元
// timerLog单个文件默认大小100MB对数据长度(52)求余后剩16
if (len + MIN_BLANK_LEN > mappedFile.getFileSize() - mappedFile.getWrotePosition()) {
ByteBuffer byteBuffer = ByteBuffer.allocate(MIN_BLANK_LEN);
// 当前元素大小为TimerLog文件剩余空间
byteBuffer.putInt(mappedFile.getFileSize() - mappedFile.getWrotePosition());
byteBuffer.putLong(0);
// 标识当前TimerLog元素为一个空白的元素
byteBuffer.putInt(BLANK_MAGIC_CODE);
if (mappedFile.appendMessage(byteBuffer.array())) {
// 写入位置设置为文件大小,标明当前文件已达写入上限
mappedFile.setWrotePosition(mappedFile.getFileSize());
} else {
log.error("Append blank error for timer log");
return -1;
}
// 获取最新的文件,避免插入空白单元后,文件已经满了
mappedFile = this.mappedFileQueue.getLastMappedFile(0);
if (null == mappedFile) {
log.error("create mapped file2 error for timer log");
return -1;
}
}
// 当前timerLog文件开始位置加上当前写入位置===>本次timerLog写入偏移量
long currPosition = mappedFile.getFileFromOffset() + mappedFile.getWrotePosition();
if (!mappedFile.appendMessage(data, pos, len)) {
log.error("Append error for timer log");
return -1;
}
return currPosition;
}
经过上面的逻辑后,此时TimerWheel
中已经重新维护好对应的数据结构了,就等待后续的Service
进行处理。
设置运行变量
在TimerMessageStore
运行过程中,有许多的变量在处理定时消息中发挥着作用。其中比较重要的有两个:
- currQueueOffset:当前读取到
TIMER_TOPIC
的consumerQueue文件的偏移量 - currReadTimeMs:当前时间轮的定时消息时间,用于获取对应的时
这两个属性分别控制着定时消息的获取以及定时消息的实际处理,在运行过程中有着至关重要的作用。所以在恢复运行现场时,也是会将这两个值给恢复的。
public void recover() {
//recover timerLog
// 从检查点获取最后刷新位置 (最后一个TimerLog的文件中的偏移量)
long lastFlushPos = timerCheckpoint.getLastTimerLogFlushPos();
// 获取 timerLog 队列中的最后一个映射文件
MappedFile lastFile = timerLog.getMappedFileQueue().getLastMappedFile();
if (null != lastFile) {
// 删除一个文件大小的偏移量对于恢复并没有影响,但是可以预防恢复数据错漏。
// 恢复多一点数据对于时间轮来说并没有影响的
lastFlushPos = lastFlushPos - lastFile.getFileSize();
}
if (lastFlushPos < 0) {
lastFlushPos = 0;
}
// 恢复TimerLog中的数据到timerWheel中
// processOffset是TimerLog最后的处理位置,也可以理解为是TimerLog新开始的偏移量
long processOffset = recoverAndRevise(lastFlushPos, true);
timerLog.getMappedFileQueue().setFlushedWhere(processOffset);
// 修订队列偏移量(用于处理消息的偏移量)
long queueOffset = reviseQueueOffset(processOffset);
if (-1 == queueOffset) {
currQueueOffset = timerCheckpoint.getLastTimerQueueOffset();
} else {
currQueueOffset = queueOffset + 1;
}
currQueueOffset = Math.min(currQueueOffset, timerCheckpoint.getMasterTimerQueueOffset());
// 设置当前时间轮读取时间
currReadTimeMs = timerCheckpoint.getLastReadTimeMs();
long nextReadTimeMs = formatTimeMs(System.currentTimeMillis()) -
(long) slotsTotal * precisionMs + (long) TIMER_BLANK_SLOTS * precisionMs;
if (currReadTimeMs < nextReadTimeMs) {
currReadTimeMs = nextReadTimeMs;
}
// 一般来说,minFirst都会返回Long.MAX_VALUE
long minFirst = timerWheel.checkPhyPos(currReadTimeMs, processOffset);
if (debug) {
minFirst = 0;
}
// 这个一般情况下不会发生
if (minFirst < processOffset) {
LOGGER.warn("Timer recheck because of minFirst:{} processOffset:{}", minFirst, processOffset);
recoverAndRevise(minFirst, false);
}
LOGGER.info("Timer recover ok currReadTimerMs:{} currQueueOffset:{} checkQueueOffset:{} processOffset:{}",
currReadTimeMs, currQueueOffset, timerCheckpoint.getLastTimerQueueOffset(), processOffset);
// 更新读取时间的提交点
commitReadTimeMs = currReadTimeMs;
// 更新队列偏移量的提交点
commitQueueOffset = currQueueOffset;
prepareTimerCheckPoint();
}
TimerFlushService运行逻辑
在TimerMessageStore
中,除了五个Service
对定时消息进行处理以外,还有着一个TimerFlushService
对定时消息涉及到的存储文件(TimerLog、TimerWheel、TimerCheckpoint)进行刷盘处理。
public void run() {
TimerMessageStore.LOGGER.info(this.getServiceName() + " service start");
long start = System.currentTimeMillis();
// 每隔一秒保存一次
while (!this.isStopped()) {
try {
prepareTimerCheckPoint();
timerLog.getMappedFileQueue().flush(0);
timerWheel.flush();
timerCheckpoint.flush();
// ------------- 省略监控代码 -------------
waitForRunning(storeConfig.getTimerFlushIntervalMs());
} catch (Throwable e) {
TimerMessageStore.LOGGER.error("Error occurred in " + getServiceName(), e);
}
}
TimerMessageStore.LOGGER.info(this.getServiceName() + " service end");
}
}
prepareTimerCheckPoint()
设置TimerCheckPoint文件内容
public void prepareTimerCheckPoint() {
// 最后一次在TimerLog的刷盘位置
timerCheckpoint.setLastTimerLogFlushPos(timerLog.getMappedFileQueue().getFlushedWhere());
// 最后一次时间轮读取时间
timerCheckpoint.setLastReadTimeMs(commitReadTimeMs);
// 主节点额外保存多一些信息
if (shouldRunningDequeue) {
timerCheckpoint.setMasterTimerQueueOffset(commitQueueOffset);
if (commitReadTimeMs != lastCommitReadTimeMs || commitQueueOffset != lastCommitQueueOffset) {
timerCheckpoint.updateDateVersion(messageStore.getStateMachineVersion());
lastCommitReadTimeMs = commitReadTimeMs;
lastCommitQueueOffset = commitQueueOffset;
}
}
timerCheckpoint.setLastTimerQueueOffset(Math.min(commitQueueOffset, timerCheckpoint.getMasterTimerQueueOffset()));
}
timerLog.getMappedFileQueue().flush(0);
将内存中的TimerLog刷盘到文件系统中。
timerWheel.flush();
// localBuffer初始化,使用了ThreadLocal,使得不同线程中各有一个副本
private final ThreadLocal<ByteBuffer> localBuffer = new ThreadLocal<ByteBuffer>() {
@Override
protected ByteBuffer initialValue() {
return byteBuffer.duplicate();
}
};
public void flush() {
ByteBuffer bf = localBuffer.get();
bf.position(0);
bf.limit(wheelLength);
mappedByteBuffer.position(0);
mappedByteBuffer.limit(wheelLength);
// 替换
for (int i = 0; i < wheelLength; i++) {
if (bf.get(i) != mappedByteBuffer.get(i)) {
mappedByteBuffer.put(i, bf.get(i));
}
}
this.mappedByteBuffer.force();
}
localBuffer
初始化时,是复制了一份mappedByteBuffer
中的数据的。上面的代码比较简单,将使用中的localBuffer
中的数据保存并刷盘到TimerWheel
文件中。
题外话:在TimerMessageStore
中,一共有三个Service
会同时使用TimerWheel
的localBuffer
。
- TimerEnqueuePutService:保存消息偏移量到localBuffer,维持槽中指向最后一个
TimerLog
的位置。 - TimerDequeueGetService:从localBuffer中获取当前读时间对应的槽,从而获取当前读时间需要处理的所有定时消息。
- TimerFlushService:保存当前
localBuffer
的数据到TimerWheel
中
timerCheckpoint.flush();
将TimerCheckPoint
中的属性,保存到本地文件中。
两个另外的定时任务
在启动TimerMessageStore
时,也会执行两个定时任务,用于清理TimerLog
的过期消息索引以及修正集群下对于定时消息的监控。
定时删除TimerLog
每隔30S,清除TimerLog中最旧的文件,避免TimerLog
文件过多。
scheduler.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
// 获取最旧的消息
long minPy = messageStore.getMinPhyOffset();
// 获取最旧的timerLog的offset
int checkOffset = timerLog.getOffsetForLastUnit();
timerLog.getMappedFileQueue()
.deleteExpiredFileByOffsetForTimerLog(minPy, checkOffset, TimerLog.UNIT_SIZE);
} catch (Exception e) {
LOGGER.error("Error in cleaning timerLog", e);
}
}
}, 30, 30, TimeUnit.SECONDS);
定时修正监控数据
scheduler.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
if (storeConfig.isTimerEnableCheckMetrics()) {
String when = storeConfig.getTimerCheckMetricsWhen();
if (!UtilAll.isItTimeToDo(when)) {
return;
}
long curr = System.currentTimeMillis();
if (curr - lastTimeOfCheckMetrics > 70 * 60 * 1000) {
lastTimeOfCheckMetrics = curr;
checkAndReviseMetrics();
LOGGER.info("[CheckAndReviseMetrics]Timer do check timer metrics cost {} ms",
System.currentTimeMillis() - curr);
}
}
} catch (Exception e) {
LOGGER.error("Error in cleaning timerLog", e);
}
}
}, 45, 45, TimeUnit.MINUTES);
总结
在RocketMQ5中实现了可控的定时消息,在保证定时消息的准时发送之余,RocketMQ还对定时消息的保存以及系统异常宕机下的特殊情况进行处理,保证消息的可靠性。
转载自:https://juejin.cn/post/7352091152583278603