likes
comments
collection
share

RocketMQ 源码探究 -- broker 中存储实现

作者站长头像
站长
· 阅读数 33

前面我们探究了 name server 和 producer 的核心设计,今天我们开始对 broker 中的文件存储部分进行深入研究。

存储结构

RocketMQ 源码探究 -- broker 中存储实现

RocketMQ 消息存储架构

Producer 向 broker 发送消息,消息被封装后存储到 CommitLog 中,落盘成功后再分发到 ConsumeQueue 和 IndexFile。Consumer 在消费消息时,向 ConsumeQueue 请求消息数据,再根据 ConsumeQueue 的消息描述数据向 CommitLog 请求具体的消息数据,完成消费过程。

RocketMQ 的核心文件包括:CommitLog、ConsumeQueue 和 IndexFile。

  • CommitLog 是消息主体以及元数据主体存储单元。消息数据全部存储于CommitLog 中。默认大小为 1G,当文件写满后,再创建新的文件。
  • ConsumeQueue 是消息消费队列,存储 CommitLog 中消息的位置信息,可以看做 CommitLog 的索引文件。
  • IndexFile(索引文件)提供了一种可以通过key或时间区间来查询消息的方法。

RocketMQ 采用混合型存储结构。在 broker 实例下共用同一个 CommitLog,采用顺序写的方式,有着不错的写入性能。在消费消息时,如果消费者直接去 CommitLog 遍历查找(随机读)会明显降低消费性能(顺序写:600MB/s,随机读:100KB/s)RocketMQ 引入 ConsumeQueue 来简化文件的寻址操作。

文件基本操作

对文件的操作不外乎读和写。

从宏观层面看,RocketMQ 整体是采用顺序写和随机读的方式。对 CommitLog 的写入是顺序的,但读取是随机的。

从微观角度看,RocketMQ 通过 mmap 的方式读文件,通过 mmap write 或 FileChannel write 的方式写文件。

MappedFile

三个文件:MappedFile 初始化时会将 CommitLog、ConsumeQueue 和 IndexFile 通过 mmap 的方式加载,并初始化类中文件相关的属性。

两种写入方式:MappedFile 封装了 FileChannel 和 mmap 两种文件写入方式。

三个文件指针:MappedFile 用 wrotePosition、committedPosition 和 flushedPosition 三个位置指针来标识文件写入的进度。

加载文件

MappedFile 在启动时会将目录下的 CommitLog、ConsumeQueue 和 IndexFile 通过 mmap 的方式载入,并将类中文件相关属性(文件名、文件大小、偏移量等数据)进行初始化操作。

文件 commit

transientStorePoolEnable 为 true 时,RocketMQ 采用 FileChannel 写方式。

MappedFile 在初始化时,从预分配好的堆外内存集合中获取一个 DirectBuffer,消息写入 DirectBuffer。当 commit 时,会将 DirectBuffer 的数据 write 到 FileChannel 里,并更新 committedPosition,供 FileChannel 的 write 调用。

文件 flush

无论是 mmap 还是 FileChannel 写都是写入到 page cache 中,操作系统会按照一定的调度规则将 page cache 中的数据刷入磁盘。可通过 sysctl -a | grep dirty 查看操作系统刷脏页的配置,默认 5s 刷新一次。

Java 也提供了强制刷新的 API,RocketMQ 会按照配置(同步 or 异步,时间频次)来触发刷盘操作。

需要注意的是,page cache 默认是 4kb,RocketMQ 在 commit 和 flush 操作时会尽量凑齐 4*4kb 触发,减少与磁盘的交互次数。

MappedFileQueue

RocketMQ 中的 CommitLog 会有多个,在启动时以集合的形式加载,在使用时路由到指定的 MappedFile 进行处理。

broker 文件写入流程

broker 通过 Netty server 接收到 client 的请求消息后,会触发将消息写入磁盘的过程。

构建消息体

broker 在拿到数据后会进行基础校验,比如对 broker 状态、消息长度以及属性进行校验,确保消息能够被写入磁盘中。

校验通过后,broker 就开始构建存入磁盘的消息体。消息体是不定长的,由消息描述信息和消息体组成。消息格式如下图所示:

RocketMQ 源码探究 -- broker 中存储实现

CommitLog 消息体格式

将所有字段填充完毕后,写入到 ByteBuffer 里,更新 wrotePosition

刷盘操作

刷盘就是将 page cache 中的数据写入到磁盘的过程,根据刷盘的时机可以分为同步和异步两种刷盘方式。

RocketMQ 源码探究 -- broker 中存储实现

ServiceThread 简介

ServiceThread 是 RocketMQ 中基础的线程实现类,通过 waitForRunning 方法定义了等待执行的主体框架。其实现类在其 run 方法中调用该方法完成特定时间的等待机制。


protected final CountDownLatch2 waitPoint = new CountDownLatch2(1);
protected volatile AtomicBoolean hasNotified = new AtomicBoolean(false);

protected void waitForRunning(long interval) {
    if (hasNotified.compareAndSet(true, false)) {
        this.onWaitEnd();
        return;
    }
  
    //entry to wait
    waitPoint.reset();
  
    try {
          waitPoint.await(interval, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
        log.error("Interrupted", e);
    } finally {
        hasNotified.set(false);
        this.onWaitEnd();
    }
}

同步刷盘

GroupCommitService 是同步刷盘的实现。

同步刷盘有两种策略,当配置 *PROPERTY_WAIT_STORE_MSG_OK* = "WAIT" 时,代表需要等到消息实际落盘后才算提交成功。

我们这里只分析配置该参数的情况,不配置的话同步刷盘跟下面介绍的异步刷盘基本一致,只是刷新间隔(10ms)短了些而已。

GroupCommitService 里维护了两个链表。

class GroupCommitService extends FlushCommitLogService {
   private volatile LinkedList<GroupCommitRequest> requestsWrite = new LinkedList<>();
   private volatile LinkedList<GroupCommitRequest> requestsRead = new LinkedList<>();
}

当调用 putRequest 时,会往写请求链表中追加数据,并调用 waitPoint.countDown() 跳过 ServiceThread 的等待。同时会交换读写链表,将写链表中的数据全部转移到读链表中。

private void swapRequests() {
  lock.lock();
  try {
    LinkedList<GroupCommitRequest> tmp = this.requestsWrite;
    this.requestsWrite = this.requestsRead;
    this.requestsRead = tmp;
   } finally {
     lock.unlock();
   }
}

RocketMQ 通过读写链表的设计降低了消息生产者和持久化消费者的锁竞争。

之后就是遍历读链表,将对其中的每一条数据强制进行 flush 操作。

boolean flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
// 重试两次。
for(int i = 0; i < 2 && !flushOK; i++) {
  CommitLog.this.mappedFileQueue.flush(0);
  flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
}

这里的 flush(0) 代表强制刷新,如果传入其它值,则会判断当前 writePosition 和 flushPostion 之间的数据是否能凑够传入值的 page。如果够,则触发刷新,否则不做操作。

异步刷盘

异步刷盘也分为两种策略,如果启用了暂存池,需要多一个 commit 的过程,将堆外内存的数据 commit 到 FileChannel 里,然后再触发 flush 操作。

异步刷盘有三个核心配置:每隔多长时间刷盘一次(500ms)、每次至少刷多少页(4)、每隔多长时间触发全量刷盘(10s)。根据这些配置我们也可以推断出 RocketMQ 最多会丢 500ms 的请求数据,最多可能丢 4 * 4kb 的数据,最大的时间延迟为 10s。

具体到实现上,每次 await 阻塞 500ms,唤醒后触发刷盘操作。记录上次全量刷盘时间,如果当前时间大于上次全量刷盘时间 + 10s,则触发全量刷盘操作 flush(0)。

总结

  • 顺序写优于随机写。
  • RocketMQ 通过切分 Commitlog、引入 Mmap、使用 SSD 等方式解决随机读问题。
  • 使用对 Mmap、Page cache 能极大提升文件写入效率。
  • 刷盘策略实际上是一个 trade-off 的过程。
转载自:https://juejin.cn/post/7374685303884693556
评论
请登录