likes
comments
collection
share

揭秘 | RocketMQ文件清理机制~

作者站长头像
站长
· 阅读数 20

前言

最近在公司的MQ论坛里,看到好几次Broker扩容消息,我心里就在想RocketMq是有文件清理机制的,咱们的MQ消息就那么多吗,hh~

虽然我知道RocketMQ存在文件清理机制,但是具体的清理策略却不是很清楚,本文就来整理一下RocketMQ清理机制及源码


RocketMQ文件清理机制

  • 每天凌晨4点会清理过期的文件
  • 当磁盘使用率达到70%时,会立刻清理过期的文件。
  • 当磁盘使用率达到85%时,会从最早创建的文件开始清理,不管文件是否已过期,直到磁盘空间充足。

源码

RocketMQ的消息是存储在Broker上的,而Broker的消息存储是依赖于DefaultMessageStore

Broker启动时,也会start DefaultMessageStore

// org.apache.rocketmq.store.DefaultMessageStore#start
public void start() throws Exception {

  // ......
  
  // 添加一些定时任务
  this.addScheduleTask();
  
}

private void addScheduleTask() {

  // todo 清理过期文件 每隔10s
  // this.messageStoreConfig.getCleanResourceInterval() == 10000
  this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
      // 文件清理定时任务
      DefaultMessageStore.this.cleanFilesPeriodically();
    }
  }, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS);
}

DefaultMessageStore start时,会开启一些定时任务,其中就包含文件清理的定时任务,每隔10s执行一次

private void cleanFilesPeriodically() {
	// 清理commitLog
  this.cleanCommitLogService.run();
  // 清理ConsumerQueue
  this.cleanConsumeQueueService.run();
}

清理CommitLog

class CleanCommitLogService {
  public void run() {
    try {
      // 删除过期文件
      this.deleteExpiredFiles();

      // 兜底删除
      this.redeleteHangedFile();
    } catch (Throwable e) {
      DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
    }
  }
}
private void deleteExpiredFiles() {
  int deleteCount = 0;
  
  // 文件保留时间(默认72个小时)
  long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime();
  
  // 两次删除文件的间隔时间(默认100ms)
  int deletePhysicFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval();
  // 强制删除间隔时间
  int destroyMapedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();

  // 是否到达删除时间点,例如: 默认每天凌晨4点删除一次
  boolean timeup = this.isTimeToDelete();
  // 磁盘空间是否快满了
  boolean spacefull = this.isSpaceToDelete();
  // 手动删除(删除命令),可调用executeDeleteFilesManually进行删除
  boolean manualDelete = this.manualDeleteFileSeveralTimes > 0;

  if (timeup || spacefull || manualDelete) {

    if (manualDelete)
      this.manualDeleteFileSeveralTimes--;

    boolean cleanAtOnce = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately;

    log.info("begin to delete before {} hours file. timeup: {} spacefull: {} manualDeleteFileSeveralTimes: {} cleanAtOnce: {}",
             fileReservedTime,
             timeup,
             spacefull,
             manualDeleteFileSeveralTimes,
             cleanAtOnce);

    fileReservedTime *= 60 * 60 * 1000;

    // todo 清理过期文件
    deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval,
                                                                       destroyMapedFileIntervalForcibly, cleanAtOnce);
    if (deleteCount > 0) {
    } else if (spacefull) {
      log.warn("disk space will be full soon, but delete file failed.");
    }
  }
}

文件清理存在三种情况

  1. 每天的定时删除
  2. 磁盘容量达到阈值
  3. 手动执行的删除命令

只要有一种满足,就需要去执行清理逻辑~


定时删除

isTimeToDelete用于判断是否到达删除的时间点,例如: 默认每天凌晨4点进行删除

支持多个删除时间点,用;分隔即可

**在文章开头也提到了,会开启每隔10s的定时任务,所以如果当前时间符合定义的删除时间点,及满足清理条件~ **

private boolean isTimeToDelete() {
  // 默认是 "04"
  String when = DefaultMessageStore.this.getMessageStoreConfig().getDeleteWhen();
  // 是否到达可删除的事件点
  if (UtilAll.isItTimeToDo(when)) {
    DefaultMessageStore.log.info("it's time to reclaim disk space, " + when);
    return true;
  }

  return false;
}

public static boolean isItTimeToDo(final String when) {
  // 支持多个删除时间点,用;分隔
  String[] whiles = when.split(";");
  if (whiles.length > 0) {
    Calendar now = Calendar.getInstance();
    for (String w : whiles) {
      int nowHour = Integer.parseInt(w);
      // 是否到达删除时间点
      if (nowHour == now.get(Calendar.HOUR_OF_DAY)) {
        return true;
      }
    }
  }

  return false;
}

磁盘使用量达到阈值删除

有关阈值的定义,一共有三个

  1. 0.75: 默认的清理阈值,大于该阈值才会清理,但需要等到文件达到过期时间
  2. 0.85: 强制清理阈值,达到该阈值,不需要等到文件是否达到过期时间(72小时)
  3. 0.95: 告警阈值,逻辑与0.85阈值一样,只不过会多条error`日志
private boolean isSpaceToDelete() {
  // 清理阈值比例,默认 0.75,文件磁盘使用比例超过75%,则需要进行清理
  double ratio = DefaultMessageStore.this.getMessageStoreConfig().getDiskMaxUsedSpaceRatio() / 100.0;

  // 是否立即清除
  cleanImmediately = false;

  {
    // 多存储路径
    String commitLogStorePath = DefaultMessageStore.this.getStorePathPhysic();
    String[] storePaths = commitLogStorePath.trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER);
    Set<String> fullStorePath = new HashSet<>();
    
    // 最小的磁盘使用比例及对应的文件path
    double minPhysicRatio = 100;
    String minStorePath = null;
    
    // 遍历所有文件,拿到最小的磁盘使用比例及对应的文件path
    for (String storePathPhysic : storePaths) {
      
      // 计算磁盘的使用率
      double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);
      if (minPhysicRatio > physicRatio) {
        minPhysicRatio =  physicRatio;
        minStorePath = storePathPhysic;
      }
      
      // 是否超过磁盘空间清理强制比例
      if (physicRatio > diskSpaceCleanForciblyRatio) {
        fullStorePath.add(storePathPhysic);
      }
    }
    
    // 如果最小的磁盘使用比例超过了磁盘告警比例(0.95) or 磁盘强制清除比例(0.85)
    // 那么cleanImmediately = true; 需立即清除
    DefaultMessageStore.this.commitLog.setFullStorePaths(fullStorePath);
    if (minPhysicRatio > diskSpaceWarningLevelRatio) {
      // 超过了磁盘告警比例(0.95)
      boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();
      if (diskok) {
        DefaultMessageStore.log.error("physic disk maybe full soon " + minPhysicRatio +
                                      ", so mark disk full, storePathPhysic=" + minStorePath);
      }

      cleanImmediately = true;
    } else if (minPhysicRatio > diskSpaceCleanForciblyRatio) {
      // 超过了磁盘强制清除比例(0.85)
      cleanImmediately = true;
    } else {
      boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK();
      if (!diskok) {
        DefaultMessageStore.log.info("physic disk space OK " + minPhysicRatio +
                                     ", so mark disk ok, storePathPhysic=" + minStorePath);
      }
    }

    // 最小的磁盘使用比例 < 0 或者 > 默认清除阈值(0.75),则需要清理文件
    if (minPhysicRatio < 0 || minPhysicRatio > ratio) {
      DefaultMessageStore.log.info("physic disk maybe full soon, so reclaim space, "
                                   + minPhysicRatio + ", storePathPhysic=" + minStorePath);
      return true;
    }
  }

  // ......同上
  
  return false;
}

清理文件

当满足定时删除、达到阈值删除、手动删除其中任何一种时,即可触发文件的清理逻辑

public int deleteExpiredFile(
  final long expiredTime,
  final int deleteFilesInterval,
  final long intervalForcibly,
  final boolean cleanImmediately
) {
  // 清理过期文件
  return this.mappedFileQueue.deleteExpiredFileByTime(expiredTime, deleteFilesInterval, intervalForcibly, cleanImmediately);
}
// org.apache.rocketmq.store.MappedFileQueue#deleteExpiredFileByTime
public int deleteExpiredFileByTime(final long expiredTime,
                                   final int deleteFilesInterval,
                                   final long intervalForcibly,
                                   final boolean cleanImmediately) {
  // 拿到mappedFile引用
  Object[] mfs = this.copyMappedFiles(0);

  if (null == mfs)
    return 0;

  // -1, 因为最后一个文件肯定处于使用中,不需要清理
  int mfsLength = mfs.length - 1;
  int deleteCount = 0;
  List<MappedFile> files = new ArrayList<MappedFile>();
  if (null != mfs) {
    for (int i = 0; i < mfsLength; i++) {
      MappedFile mappedFile = (MappedFile) mfs[i];
      
      // 计算文件的最大存活时间,文件最后一次修改时间 + 过期时间(默认72小时)
      long liveMaxTimestamp = mappedFile.getLastModifiedTimestamp() + expiredTime;
      
      // 如果当前时间 > 最大存活时间,或者需要立即删除
      if (System.currentTimeMillis() >= liveMaxTimestamp || cleanImmediately) {
        
        // 销毁mappedFile
        if (mappedFile.destroy(intervalForcibly)) {
          // 添加到待删除文件集合中
          files.add(mappedFile);
          deleteCount++;

          // 每一批次,最多删除十个文件
          if (files.size() >= DELETE_FILES_BATCH_MAX) {
            break;
          }

          if (deleteFilesInterval > 0 && (i + 1) < mfsLength) {
            try {
              // 多次删除文件之间停顿一下(默认100ms)
              Thread.sleep(deleteFilesInterval);
            } catch (InterruptedException e) {
            }
          }
        } else {
          break;
        }
      } else {
        //avoid deleting files in the middle
        break;
      }
    }
  }

  // 清除mappedFile
  deleteExpiredFile(files);

  return deleteCount;
}

流程如下

  1. 遍历MappedFile,除了最后一个(最后一个处于使用中,不用清理)
  2. 计算文件最大存活时间,即文件的最后一次修改时间 + 过期时间(72小时)
  3. 如果**当前时间 > 最大存活时间,或者需要强制删除**,才进行文件清理操作
  4. 销毁MappedFile,关闭文件channel,删除File,并且每次删除之间存在一定的时间间隔(默认100ms
  5. mappedFiles中,删除当前mappedFile

下面再看下destroy逻辑

public boolean destroy(final long intervalForcibly) {
  // 扣减引用计数,超过intervalForcibly(120s)
  this.shutdown(intervalForcibly);

  // 是否清楚结束
  if (this.isCleanupOver()) {
    try {
      // 关闭文件通道
      this.fileChannel.close();
      log.info("close file channel " + this.fileName + " OK");

      long beginTime = System.currentTimeMillis();
      
      // 删除文件
      boolean result = this.file.delete();
      log.info("delete file[REF:" + this.getRefCount() + "] " + this.fileName
               + (result ? " OK, " : " Failed, ") + "W:" + this.getWrotePosition() + " M:"
               + this.getFlushedPosition() + ", "
               + UtilAll.computeElapsedTimeMilliseconds(beginTime));
    } catch (Exception e) {
      log.warn("close file channel " + this.fileName + " Failed. ", e);
    }

    return true;
  } else {
    log.warn("destroy mapped file[REF:" + this.getRefCount() + "] " + this.fileName
             + " Failed. cleanupOver: " + this.cleanupOver);
  }

  return false;
}

因为文件是会被使用的,所以当我们要删除文件的时候,文件可能正在被使用,即文件被引用次数 > 0,此时是不能删除的

所以当引用次数 > 0时删除,此时的第一次删除并不会真正删除,而是会记录第一次准备删除的时间

只有当引用计数 <= 0 或者引用计数 > 0 但是超过了强制删除时间,才会去删除,即关闭文件channel和删除file

主要删除流程deleteExpiredFiles看完了,直接看看redeleteHangedFile

上面刚说,因为引用计数的原因,导致某个文件可能还无法被删除,当能删除的文件都删除后,那么未能删除的文件就成为了第一个mappedFile

所以此时需要redeleteHangedFile再来兜个底,看看第一个mappedFile能否删除,如果可以就删除

private void redeleteHangedFile() {
  int interval = DefaultMessageStore.this.getMessageStoreConfig().getRedeleteHangedFileInterval();
  long currentTimestamp = System.currentTimeMillis();
  if ((currentTimestamp - this.lastRedeleteTimestamp) > interval) {
    this.lastRedeleteTimestamp = currentTimestamp;

    // 上面看到了的,强制删除间隔时间
    int destroyMapedFileIntervalForcibly =
      DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();
    // 重试删除第一个mappedFile
    if (DefaultMessageStore.this.commitLog.retryDeleteFirstFile(destroyMapedFileIntervalForcibly)) {
    }
  }
}
public boolean retryDeleteFirstFile(final long intervalForcibly) {
  // 拿到第一个mappedFile
  MappedFile mappedFile = this.getFirstMappedFile();
  if (mappedFile != null) {
    // 如果不可用
    if (!mappedFile.isAvailable()) {
      log.warn("the mappedFile was destroyed once, but still alive, " + mappedFile.getFileName());
      // 则执行destroy
      boolean result = mappedFile.destroy(intervalForcibly);
      if (result) {
        log.info("the mappedFile re delete OK, " + mappedFile.getFileName());
        List<MappedFile> tmpFiles = new ArrayList<MappedFile>();
        tmpFiles.add(mappedFile);
        // destroy成功,从mappedFils中删除该mappedFile
        this.deleteExpiredFile(tmpFiles);
      } else {
        log.warn("the mappedFile re delete failed, " + mappedFile.getFileName());
      }

      return result;
    }
  }

  return false;
}

清理ConsumerQueue

private void cleanFilesPeriodically() {
  // todo 清除CommitLog文件
  this.cleanCommitLogService.run();
  // todo 清除ConsumeQueue文件
  this.cleanConsumeQueueService.run();
}
class CleanConsumeQueueService {
  private long lastPhysicalMinOffset = 0;

  public void run() {
    try {
      // 删除过期文件
      this.deleteExpiredFiles();
    } catch (Throwable e) {
      DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
    }
  }
}
private void deleteExpiredFiles() {
  // 每次清理的间隔,默认100ms
  int deleteLogicsFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteConsumeQueueFilesInterval();

  // 拿到commitLog的最小偏移量
  long minOffset = DefaultMessageStore.this.commitLog.getMinOffset();

  // 是否大于最后一次清理的最小偏移量,默认为0
  if (minOffset > this.lastPhysicalMinOffset) {
    // 更新一下
    this.lastPhysicalMinOffset = minOffset;

    ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable;

    // 遍历删除
    for (ConcurrentMap<Integer, ConsumeQueue> maps : tables.values()) {
      for (ConsumeQueue logic : maps.values()) {

        // 删除
        int deleteCount = logic.deleteExpiredFile(minOffset);

        // 如果删除过文件,且需要进行间隔
        if (deleteCount > 0 && deleteLogicsFilesInterval > 0) {
          try {
            // 休眠一下,间隔
            Thread.sleep(deleteLogicsFilesInterval);
          } catch (InterruptedException ignored) {
          }
        }
      }
    }

    // todo 清理IndexFile
    DefaultMessageStore.this.indexService.deleteExpiredFile(minOffset);
  }
}

CommitLog清除完毕后,那么在ConsumerQueue、IndexFile中小于第一个MappedFile偏移量的记录都要被删除掉

所以上述代码在拿到minOffset后,遍历consumeQueueTable,挨个判断并删除

public int deleteExpiredFileByOffset(long offset, int unitSize) {
  // 拿到引用
  Object[] mfs = this.copyMappedFiles(0);

  List<MappedFile> files = new ArrayList<MappedFile>();
  int deleteCount = 0;
  if (null != mfs) {

    // 最后一个不清楚
    int mfsLength = mfs.length - 1;

    for (int i = 0; i < mfsLength; i++) {
      boolean destroy;
      MappedFile mappedFile = (MappedFile) mfs[i];
      
      // 其实就是拿到最后一个消息的buffer
      SelectMappedBufferResult result = mappedFile.selectMappedBuffer(this.mappedFileSize - unitSize);
      if (result != null) {
        // 然后取这个消息的偏移量,即就是这个文件的最大偏移量了
        long maxOffsetInLogicQueue = result.getByteBuffer().getLong();
        result.release();
        
        // 是否小于需要删除的偏移量,小于就要删除
        destroy = maxOffsetInLogicQueue < offset;
        if (destroy) {
          log.info("physic min offset " + offset + ", logics in current mappedFile max offset "
                   + maxOffsetInLogicQueue + ", delete it");
        }
      } else if (!mappedFile.isAvailable()) { // Handle hanged file.
        log.warn("Found a hanged consume queue file, attempting to delete it.");
        destroy = true;
      } else {
        log.warn("this being not executed forever.");
        break;
      }

      // 进行销毁,间隔强制删除时间间隔为60s
      if (destroy && mappedFile.destroy(1000 * 60)) {
        files.add(mappedFile);
        deleteCount++;
      } else {
        break;
      }
    }
  }

  // 从mappedFiles中清除该mappedFile
  deleteExpiredFile(files);

  return deleteCount;
}

删除逻辑大体与CommitLog删除逻辑一致,只不过CommitLog是根据文件过期时间来删除,而ConsumerQueue是根据offset来删除


清理IndexFile

依旧是deleteExpiredFiles方法,在清理完consumerQueue后,会同步清理IndexFile

private void deleteExpiredFiles() {
  int deleteLogicsFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteConsumeQueueFilesInterval();

  long minOffset = DefaultMessageStore.this.commitLog.getMinOffset();
  if (minOffset > this.lastPhysicalMinOffset) {
    
    // ...... 清理consumerQueue
    
    // todo 清理IndexFile
    DefaultMessageStore.this.indexService.deleteExpiredFile(minOffset);
  }
}
public void deleteExpiredFile(long offset) {
  Object[] files = null;
  try {
    this.readWriteLock.readLock().lock();
    if (this.indexFileList.isEmpty()) {
      return;
    }

    // 拿到第一个IndexFile,进行判断最后的偏移量是否小于offset
    long endPhyOffset = this.indexFileList.get(0).getEndPhyOffset();
    if (endPhyOffset < offset) {
      files = this.indexFileList.toArray();
    }
  } catch (Exception e) {
    log.error("destroy exception", e);
  } finally {
    this.readWriteLock.readLock().unlock();
  }

  if (files != null) {
    // 说明第一个IndexFile最后的偏移量小于offset
    List<IndexFile> fileList = new ArrayList<IndexFile>();
    
    // 遍历,将小于offset的file加入到待删除集合中
    for (int i = 0; i < (files.length - 1); i++) {
      IndexFile f = (IndexFile) files[i];
      if (f.getEndPhyOffset() < offset) {
        fileList.add(f);
      } else {
        break;
      }
    }

    // 删除过期文件,逻辑与之前基本一致,即destroy,然后从IndexFileList中移除即可
    this.deleteExpiredFile(fileList);
  }
}

逻辑还是与commitLog、consumerQueue基本一致,无非就是遍历文件比较,如果文件需要删除就加入到待删除集中中

然后统一对indexFile destroy,再移除掉该indexFile引用即可


总结

源码学完总结一下

首先Broker启动的时候会开启定时任务每隔10s去清理过期文件,其中包含commitLog、consumerQueue、indexFile三种文件

先清除commitLog,当满足以下条件时,可执行commitLog删除逻辑

  1. 定时删除,达到符合删除的时间节点,即可触发
  2. 达到阈值,阈值分为三种0.75、0.85、0.95
    1. 当达到0.75时就可以触发清理逻辑了,但此时受限于文件是否过期
    2. 当阈值超过0.85时,此时需强制删除,不受限于文件是否过期,0.950.85的区别就是多了一条error日志
  3. 手动删除,执行删除命令

commitLog清理完毕后,那么可以获取minOffset

consumerQueue、indexFile中,遍历判断小于该minOffset的文件都需要删除