揭秘 | RocketMQ文件清理机制~
前言
最近在公司的MQ
论坛里,看到好几次Broker
扩容消息,我心里就在想RocketMq
是有文件清理机制的,咱们的MQ
消息就那么多吗,hh~
虽然我知道RocketMQ
存在文件清理机制,但是具体的清理策略却不是很清楚,本文就来整理一下RocketMQ
的清理机制及源码
RocketMQ文件清理机制
- 每天凌晨4点会清理过期的文件
- 当磁盘使用率达到70%时,会立刻清理过期的文件。
- 当磁盘使用率达到85%时,会从最早创建的文件开始清理,不管文件是否已过期,直到磁盘空间充足。
源码
RocketMQ
的消息是存储在Broker
上的,而Broker
的消息存储是依赖于DefaultMessageStore
Broker
启动时,也会start DefaultMessageStore
// org.apache.rocketmq.store.DefaultMessageStore#start
public void start() throws Exception {
// ......
// 添加一些定时任务
this.addScheduleTask();
}
private void addScheduleTask() {
// todo 清理过期文件 每隔10s
// this.messageStoreConfig.getCleanResourceInterval() == 10000
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
// 文件清理定时任务
DefaultMessageStore.this.cleanFilesPeriodically();
}
}, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS);
}
DefaultMessageStore start
时,会开启一些定时任务,其中就包含文件清理的定时任务,每隔10s
执行一次
private void cleanFilesPeriodically() {
// 清理commitLog
this.cleanCommitLogService.run();
// 清理ConsumerQueue
this.cleanConsumeQueueService.run();
}
清理CommitLog
class CleanCommitLogService {
public void run() {
try {
// 删除过期文件
this.deleteExpiredFiles();
// 兜底删除
this.redeleteHangedFile();
} catch (Throwable e) {
DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
}
}
}
private void deleteExpiredFiles() {
int deleteCount = 0;
// 文件保留时间(默认72个小时)
long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime();
// 两次删除文件的间隔时间(默认100ms)
int deletePhysicFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval();
// 强制删除间隔时间
int destroyMapedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();
// 是否到达删除时间点,例如: 默认每天凌晨4点删除一次
boolean timeup = this.isTimeToDelete();
// 磁盘空间是否快满了
boolean spacefull = this.isSpaceToDelete();
// 手动删除(删除命令),可调用executeDeleteFilesManually进行删除
boolean manualDelete = this.manualDeleteFileSeveralTimes > 0;
if (timeup || spacefull || manualDelete) {
if (manualDelete)
this.manualDeleteFileSeveralTimes--;
boolean cleanAtOnce = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately;
log.info("begin to delete before {} hours file. timeup: {} spacefull: {} manualDeleteFileSeveralTimes: {} cleanAtOnce: {}",
fileReservedTime,
timeup,
spacefull,
manualDeleteFileSeveralTimes,
cleanAtOnce);
fileReservedTime *= 60 * 60 * 1000;
// todo 清理过期文件
deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval,
destroyMapedFileIntervalForcibly, cleanAtOnce);
if (deleteCount > 0) {
} else if (spacefull) {
log.warn("disk space will be full soon, but delete file failed.");
}
}
}
文件清理存在三种情况
- 每天的定时删除
- 磁盘容量达到阈值
- 手动执行的删除命令
只要有一种满足,就需要去执行清理逻辑~
定时删除
isTimeToDelete
用于判断是否到达删除的时间点,例如: 默认每天凌晨4点进行删除
支持多个删除时间点,用
;分隔即可
**在文章开头也提到了,会开启每隔10s
的定时任务,所以如果当前时间符合定义的删除时间点,及满足清理条件~ **
private boolean isTimeToDelete() {
// 默认是 "04"
String when = DefaultMessageStore.this.getMessageStoreConfig().getDeleteWhen();
// 是否到达可删除的事件点
if (UtilAll.isItTimeToDo(when)) {
DefaultMessageStore.log.info("it's time to reclaim disk space, " + when);
return true;
}
return false;
}
public static boolean isItTimeToDo(final String when) {
// 支持多个删除时间点,用;分隔
String[] whiles = when.split(";");
if (whiles.length > 0) {
Calendar now = Calendar.getInstance();
for (String w : whiles) {
int nowHour = Integer.parseInt(w);
// 是否到达删除时间点
if (nowHour == now.get(Calendar.HOUR_OF_DAY)) {
return true;
}
}
}
return false;
}
磁盘使用量达到阈值删除
有关阈值的定义,一共有三个
0.75
: 默认的清理阈值,大于该阈值才会清理,但需要等到文件达到过期时间0.85
: 强制清理阈值,达到该阈值,不需要等到文件是否达到过期时间(72小时)0.95
: 告警阈值,逻辑与0.85
阈值一样,只不过会多条error`日志
private boolean isSpaceToDelete() {
// 清理阈值比例,默认 0.75,文件磁盘使用比例超过75%,则需要进行清理
double ratio = DefaultMessageStore.this.getMessageStoreConfig().getDiskMaxUsedSpaceRatio() / 100.0;
// 是否立即清除
cleanImmediately = false;
{
// 多存储路径
String commitLogStorePath = DefaultMessageStore.this.getStorePathPhysic();
String[] storePaths = commitLogStorePath.trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER);
Set<String> fullStorePath = new HashSet<>();
// 最小的磁盘使用比例及对应的文件path
double minPhysicRatio = 100;
String minStorePath = null;
// 遍历所有文件,拿到最小的磁盘使用比例及对应的文件path
for (String storePathPhysic : storePaths) {
// 计算磁盘的使用率
double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);
if (minPhysicRatio > physicRatio) {
minPhysicRatio = physicRatio;
minStorePath = storePathPhysic;
}
// 是否超过磁盘空间清理强制比例
if (physicRatio > diskSpaceCleanForciblyRatio) {
fullStorePath.add(storePathPhysic);
}
}
// 如果最小的磁盘使用比例超过了磁盘告警比例(0.95) or 磁盘强制清除比例(0.85)
// 那么cleanImmediately = true; 需立即清除
DefaultMessageStore.this.commitLog.setFullStorePaths(fullStorePath);
if (minPhysicRatio > diskSpaceWarningLevelRatio) {
// 超过了磁盘告警比例(0.95)
boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();
if (diskok) {
DefaultMessageStore.log.error("physic disk maybe full soon " + minPhysicRatio +
", so mark disk full, storePathPhysic=" + minStorePath);
}
cleanImmediately = true;
} else if (minPhysicRatio > diskSpaceCleanForciblyRatio) {
// 超过了磁盘强制清除比例(0.85)
cleanImmediately = true;
} else {
boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK();
if (!diskok) {
DefaultMessageStore.log.info("physic disk space OK " + minPhysicRatio +
", so mark disk ok, storePathPhysic=" + minStorePath);
}
}
// 最小的磁盘使用比例 < 0 或者 > 默认清除阈值(0.75),则需要清理文件
if (minPhysicRatio < 0 || minPhysicRatio > ratio) {
DefaultMessageStore.log.info("physic disk maybe full soon, so reclaim space, "
+ minPhysicRatio + ", storePathPhysic=" + minStorePath);
return true;
}
}
// ......同上
return false;
}
清理文件
当满足定时删除、达到阈值删除、手动删除
其中任何一种时,即可触发文件的清理逻辑
public int deleteExpiredFile(
final long expiredTime,
final int deleteFilesInterval,
final long intervalForcibly,
final boolean cleanImmediately
) {
// 清理过期文件
return this.mappedFileQueue.deleteExpiredFileByTime(expiredTime, deleteFilesInterval, intervalForcibly, cleanImmediately);
}
// org.apache.rocketmq.store.MappedFileQueue#deleteExpiredFileByTime
public int deleteExpiredFileByTime(final long expiredTime,
final int deleteFilesInterval,
final long intervalForcibly,
final boolean cleanImmediately) {
// 拿到mappedFile引用
Object[] mfs = this.copyMappedFiles(0);
if (null == mfs)
return 0;
// -1, 因为最后一个文件肯定处于使用中,不需要清理
int mfsLength = mfs.length - 1;
int deleteCount = 0;
List<MappedFile> files = new ArrayList<MappedFile>();
if (null != mfs) {
for (int i = 0; i < mfsLength; i++) {
MappedFile mappedFile = (MappedFile) mfs[i];
// 计算文件的最大存活时间,文件最后一次修改时间 + 过期时间(默认72小时)
long liveMaxTimestamp = mappedFile.getLastModifiedTimestamp() + expiredTime;
// 如果当前时间 > 最大存活时间,或者需要立即删除
if (System.currentTimeMillis() >= liveMaxTimestamp || cleanImmediately) {
// 销毁mappedFile
if (mappedFile.destroy(intervalForcibly)) {
// 添加到待删除文件集合中
files.add(mappedFile);
deleteCount++;
// 每一批次,最多删除十个文件
if (files.size() >= DELETE_FILES_BATCH_MAX) {
break;
}
if (deleteFilesInterval > 0 && (i + 1) < mfsLength) {
try {
// 多次删除文件之间停顿一下(默认100ms)
Thread.sleep(deleteFilesInterval);
} catch (InterruptedException e) {
}
}
} else {
break;
}
} else {
//avoid deleting files in the middle
break;
}
}
}
// 清除mappedFile
deleteExpiredFile(files);
return deleteCount;
}
流程如下
- 遍历
MappedFile
,除了最后一个(最后一个处于使用中,不用清理) - 计算文件最大存活时间,即文件的最后一次修改时间 + 过期时间(72小时)
- 如果**
当前时间 > 最大存活时间
,或者需要强制删除**,才进行文件清理操作 - 销毁
MappedFile
,关闭文件channel
,删除File
,并且每次删除之间存在一定的时间间隔(默认100ms
) - 从
mappedFiles
中,删除当前mappedFile
下面再看下destroy
逻辑
public boolean destroy(final long intervalForcibly) {
// 扣减引用计数,超过intervalForcibly(120s)
this.shutdown(intervalForcibly);
// 是否清楚结束
if (this.isCleanupOver()) {
try {
// 关闭文件通道
this.fileChannel.close();
log.info("close file channel " + this.fileName + " OK");
long beginTime = System.currentTimeMillis();
// 删除文件
boolean result = this.file.delete();
log.info("delete file[REF:" + this.getRefCount() + "] " + this.fileName
+ (result ? " OK, " : " Failed, ") + "W:" + this.getWrotePosition() + " M:"
+ this.getFlushedPosition() + ", "
+ UtilAll.computeElapsedTimeMilliseconds(beginTime));
} catch (Exception e) {
log.warn("close file channel " + this.fileName + " Failed. ", e);
}
return true;
} else {
log.warn("destroy mapped file[REF:" + this.getRefCount() + "] " + this.fileName
+ " Failed. cleanupOver: " + this.cleanupOver);
}
return false;
}
因为文件是会被使用的,所以当我们要删除文件的时候,文件可能正在被使用,即文件被引用次数 > 0,此时是不能删除的
所以当引用次数 > 0
时删除,此时的第一次删除并不会真正删除,而是会记录第一次准备删除的时间
只有当引用计数 <= 0
或者引用计数 > 0
但是超过了强制删除时间,才会去删除,即关闭文件channel
和删除file
主要删除流程deleteExpiredFiles
看完了,直接看看redeleteHangedFile
上面刚说,因为引用计数的原因,导致某个文件可能还无法被删除,当能删除的文件都删除后,那么未能删除的文件就成为了第一个mappedFile
所以此时需要redeleteHangedFile
再来兜个底,看看第一个mappedFile
能否删除,如果可以就删除
private void redeleteHangedFile() {
int interval = DefaultMessageStore.this.getMessageStoreConfig().getRedeleteHangedFileInterval();
long currentTimestamp = System.currentTimeMillis();
if ((currentTimestamp - this.lastRedeleteTimestamp) > interval) {
this.lastRedeleteTimestamp = currentTimestamp;
// 上面看到了的,强制删除间隔时间
int destroyMapedFileIntervalForcibly =
DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();
// 重试删除第一个mappedFile
if (DefaultMessageStore.this.commitLog.retryDeleteFirstFile(destroyMapedFileIntervalForcibly)) {
}
}
}
public boolean retryDeleteFirstFile(final long intervalForcibly) {
// 拿到第一个mappedFile
MappedFile mappedFile = this.getFirstMappedFile();
if (mappedFile != null) {
// 如果不可用
if (!mappedFile.isAvailable()) {
log.warn("the mappedFile was destroyed once, but still alive, " + mappedFile.getFileName());
// 则执行destroy
boolean result = mappedFile.destroy(intervalForcibly);
if (result) {
log.info("the mappedFile re delete OK, " + mappedFile.getFileName());
List<MappedFile> tmpFiles = new ArrayList<MappedFile>();
tmpFiles.add(mappedFile);
// destroy成功,从mappedFils中删除该mappedFile
this.deleteExpiredFile(tmpFiles);
} else {
log.warn("the mappedFile re delete failed, " + mappedFile.getFileName());
}
return result;
}
}
return false;
}
清理ConsumerQueue
private void cleanFilesPeriodically() {
// todo 清除CommitLog文件
this.cleanCommitLogService.run();
// todo 清除ConsumeQueue文件
this.cleanConsumeQueueService.run();
}
class CleanConsumeQueueService {
private long lastPhysicalMinOffset = 0;
public void run() {
try {
// 删除过期文件
this.deleteExpiredFiles();
} catch (Throwable e) {
DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
}
}
}
private void deleteExpiredFiles() {
// 每次清理的间隔,默认100ms
int deleteLogicsFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteConsumeQueueFilesInterval();
// 拿到commitLog的最小偏移量
long minOffset = DefaultMessageStore.this.commitLog.getMinOffset();
// 是否大于最后一次清理的最小偏移量,默认为0
if (minOffset > this.lastPhysicalMinOffset) {
// 更新一下
this.lastPhysicalMinOffset = minOffset;
ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable;
// 遍历删除
for (ConcurrentMap<Integer, ConsumeQueue> maps : tables.values()) {
for (ConsumeQueue logic : maps.values()) {
// 删除
int deleteCount = logic.deleteExpiredFile(minOffset);
// 如果删除过文件,且需要进行间隔
if (deleteCount > 0 && deleteLogicsFilesInterval > 0) {
try {
// 休眠一下,间隔
Thread.sleep(deleteLogicsFilesInterval);
} catch (InterruptedException ignored) {
}
}
}
}
// todo 清理IndexFile
DefaultMessageStore.this.indexService.deleteExpiredFile(minOffset);
}
}
当CommitLog
清除完毕后,那么在ConsumerQueue、IndexFile
中小于第一个MappedFile
偏移量的记录都要被删除掉
所以上述代码在拿到minOffset
后,遍历consumeQueueTable
,挨个判断并删除
public int deleteExpiredFileByOffset(long offset, int unitSize) {
// 拿到引用
Object[] mfs = this.copyMappedFiles(0);
List<MappedFile> files = new ArrayList<MappedFile>();
int deleteCount = 0;
if (null != mfs) {
// 最后一个不清楚
int mfsLength = mfs.length - 1;
for (int i = 0; i < mfsLength; i++) {
boolean destroy;
MappedFile mappedFile = (MappedFile) mfs[i];
// 其实就是拿到最后一个消息的buffer
SelectMappedBufferResult result = mappedFile.selectMappedBuffer(this.mappedFileSize - unitSize);
if (result != null) {
// 然后取这个消息的偏移量,即就是这个文件的最大偏移量了
long maxOffsetInLogicQueue = result.getByteBuffer().getLong();
result.release();
// 是否小于需要删除的偏移量,小于就要删除
destroy = maxOffsetInLogicQueue < offset;
if (destroy) {
log.info("physic min offset " + offset + ", logics in current mappedFile max offset "
+ maxOffsetInLogicQueue + ", delete it");
}
} else if (!mappedFile.isAvailable()) { // Handle hanged file.
log.warn("Found a hanged consume queue file, attempting to delete it.");
destroy = true;
} else {
log.warn("this being not executed forever.");
break;
}
// 进行销毁,间隔强制删除时间间隔为60s
if (destroy && mappedFile.destroy(1000 * 60)) {
files.add(mappedFile);
deleteCount++;
} else {
break;
}
}
}
// 从mappedFiles中清除该mappedFile
deleteExpiredFile(files);
return deleteCount;
}
删除逻辑大体与CommitLog
删除逻辑一致,只不过CommitLog
是根据文件过期时间来删除,而ConsumerQueue
是根据offset
来删除
清理IndexFile
依旧是deleteExpiredFiles
方法,在清理完consumerQueue
后,会同步清理IndexFile
private void deleteExpiredFiles() {
int deleteLogicsFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteConsumeQueueFilesInterval();
long minOffset = DefaultMessageStore.this.commitLog.getMinOffset();
if (minOffset > this.lastPhysicalMinOffset) {
// ...... 清理consumerQueue
// todo 清理IndexFile
DefaultMessageStore.this.indexService.deleteExpiredFile(minOffset);
}
}
public void deleteExpiredFile(long offset) {
Object[] files = null;
try {
this.readWriteLock.readLock().lock();
if (this.indexFileList.isEmpty()) {
return;
}
// 拿到第一个IndexFile,进行判断最后的偏移量是否小于offset
long endPhyOffset = this.indexFileList.get(0).getEndPhyOffset();
if (endPhyOffset < offset) {
files = this.indexFileList.toArray();
}
} catch (Exception e) {
log.error("destroy exception", e);
} finally {
this.readWriteLock.readLock().unlock();
}
if (files != null) {
// 说明第一个IndexFile最后的偏移量小于offset
List<IndexFile> fileList = new ArrayList<IndexFile>();
// 遍历,将小于offset的file加入到待删除集合中
for (int i = 0; i < (files.length - 1); i++) {
IndexFile f = (IndexFile) files[i];
if (f.getEndPhyOffset() < offset) {
fileList.add(f);
} else {
break;
}
}
// 删除过期文件,逻辑与之前基本一致,即destroy,然后从IndexFileList中移除即可
this.deleteExpiredFile(fileList);
}
}
逻辑还是与commitLog、consumerQueue
基本一致,无非就是遍历文件比较,如果文件需要删除就加入到待删除集中中
然后统一对indexFile destroy
,再移除掉该indexFile
引用即可
总结
源码学完总结一下
首先Broker
启动的时候会开启定时任务,每隔10s
去清理过期文件,其中包含commitLog、consumerQueue、indexFile
三种文件
先清除commitLog
,当满足以下条件时,可执行commitLog
删除逻辑
- 定时删除,达到符合删除的时间节点,即可触发
- 达到阈值,阈值分为三种
0.75、0.85、0.95
- 当达到
0.75
时就可以触发清理逻辑了,但此时受限于文件是否过期 - 当阈值超过
0.85
时,此时需强制删除,不受限于文件是否过期,0.95
与0.85
的区别就是多了一条error
日志
- 当达到
- 手动删除,执行删除命令
当commitLog
清理完毕后,那么可以获取minOffset
在consumerQueue、indexFile
中,遍历判断小于该minOffset
的文件都需要删除
转载自:https://juejin.cn/post/7270871863161520184