RocketMQ 源码探究 -- broker 中存储实现
前面我们探究了 name server 和 producer 的核心设计,今天我们开始对 broker 中的文件存储部分进行深入研究。
存储结构
RocketMQ 消息存储架构
Producer 向 broker 发送消息,消息被封装后存储到 CommitLog 中,落盘成功后再分发到 ConsumeQueue 和 IndexFile。Consumer 在消费消息时,向 ConsumeQueue 请求消息数据,再根据 ConsumeQueue 的消息描述数据向 CommitLog 请求具体的消息数据,完成消费过程。
RocketMQ 的核心文件包括:CommitLog、ConsumeQueue 和 IndexFile。
- CommitLog 是消息主体以及元数据主体存储单元。消息数据全部存储于CommitLog 中。默认大小为 1G,当文件写满后,再创建新的文件。
- ConsumeQueue 是消息消费队列,存储 CommitLog 中消息的位置信息,可以看做 CommitLog 的索引文件。
- IndexFile(索引文件)提供了一种可以通过key或时间区间来查询消息的方法。
RocketMQ 采用混合型存储结构。在 broker 实例下共用同一个 CommitLog,采用顺序写的方式,有着不错的写入性能。在消费消息时,如果消费者直接去 CommitLog 遍历查找(随机读)会明显降低消费性能(顺序写:600MB/s,随机读:100KB/s)RocketMQ 引入 ConsumeQueue 来简化文件的寻址操作。
文件基本操作
对文件的操作不外乎读和写。
从宏观层面看,RocketMQ 整体是采用顺序写和随机读的方式。对 CommitLog 的写入是顺序的,但读取是随机的。
从微观角度看,RocketMQ 通过 mmap 的方式读文件,通过 mmap write 或 FileChannel write 的方式写文件。
MappedFile
三个文件:MappedFile 初始化时会将 CommitLog、ConsumeQueue 和 IndexFile 通过 mmap 的方式加载,并初始化类中文件相关的属性。
两种写入方式:MappedFile 封装了 FileChannel 和 mmap 两种文件写入方式。
三个文件指针:MappedFile 用 wrotePosition、committedPosition 和 flushedPosition 三个位置指针来标识文件写入的进度。
加载文件
MappedFile 在启动时会将目录下的 CommitLog、ConsumeQueue 和 IndexFile 通过 mmap 的方式载入,并将类中文件相关属性(文件名、文件大小、偏移量等数据)进行初始化操作。
文件 commit
当transientStorePoolEnable
为 true 时,RocketMQ 采用 FileChannel 写方式。
MappedFile 在初始化时,从预分配好的堆外内存集合中获取一个 DirectBuffer,消息写入 DirectBuffer。当 commit 时,会将 DirectBuffer 的数据 write 到 FileChannel 里,并更新 committedPosition,供 FileChannel 的 write 调用。
文件 flush
无论是 mmap 还是 FileChannel 写都是写入到 page cache 中,操作系统会按照一定的调度规则将 page cache 中的数据刷入磁盘。可通过 sysctl -a | grep dirty
查看操作系统刷脏页的配置,默认 5s 刷新一次。
Java 也提供了强制刷新的 API,RocketMQ 会按照配置(同步 or 异步,时间频次)来触发刷盘操作。
需要注意的是,page cache 默认是 4kb,RocketMQ 在 commit 和 flush 操作时会尽量凑齐 4*4kb 触发,减少与磁盘的交互次数。
MappedFileQueue
RocketMQ 中的 CommitLog 会有多个,在启动时以集合的形式加载,在使用时路由到指定的 MappedFile 进行处理。
broker 文件写入流程
broker 通过 Netty server 接收到 client 的请求消息后,会触发将消息写入磁盘的过程。
构建消息体
broker 在拿到数据后会进行基础校验,比如对 broker 状态、消息长度以及属性进行校验,确保消息能够被写入磁盘中。
校验通过后,broker 就开始构建存入磁盘的消息体。消息体是不定长的,由消息描述信息和消息体组成。消息格式如下图所示:
CommitLog 消息体格式
将所有字段填充完毕后,写入到 ByteBuffer 里,更新 wrotePosition
。
刷盘操作
刷盘就是将 page cache 中的数据写入到磁盘的过程,根据刷盘的时机可以分为同步和异步两种刷盘方式。
ServiceThread 简介
ServiceThread 是 RocketMQ 中基础的线程实现类,通过 waitForRunning
方法定义了等待执行的主体框架。其实现类在其 run 方法中调用该方法完成特定时间的等待机制。
protected final CountDownLatch2 waitPoint = new CountDownLatch2(1);
protected volatile AtomicBoolean hasNotified = new AtomicBoolean(false);
protected void waitForRunning(long interval) {
if (hasNotified.compareAndSet(true, false)) {
this.onWaitEnd();
return;
}
//entry to wait
waitPoint.reset();
try {
waitPoint.await(interval, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
log.error("Interrupted", e);
} finally {
hasNotified.set(false);
this.onWaitEnd();
}
}
同步刷盘
GroupCommitService
是同步刷盘的实现。
同步刷盘有两种策略,当配置 *PROPERTY_WAIT_STORE_MSG_OK* = "WAIT"
时,代表需要等到消息实际落盘后才算提交成功。
我们这里只分析配置该参数的情况,不配置的话同步刷盘跟下面介绍的异步刷盘基本一致,只是刷新间隔(10ms)短了些而已。
GroupCommitService
里维护了两个链表。
class GroupCommitService extends FlushCommitLogService {
private volatile LinkedList<GroupCommitRequest> requestsWrite = new LinkedList<>();
private volatile LinkedList<GroupCommitRequest> requestsRead = new LinkedList<>();
}
当调用 putRequest
时,会往写请求链表中追加数据,并调用 waitPoint.countDown()
跳过 ServiceThread
的等待。同时会交换读写链表,将写链表中的数据全部转移到读链表中。
private void swapRequests() {
lock.lock();
try {
LinkedList<GroupCommitRequest> tmp = this.requestsWrite;
this.requestsWrite = this.requestsRead;
this.requestsRead = tmp;
} finally {
lock.unlock();
}
}
RocketMQ 通过读写链表的设计降低了消息生产者和持久化消费者的锁竞争。
之后就是遍历读链表,将对其中的每一条数据强制进行 flush 操作。
boolean flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
// 重试两次。
for(int i = 0; i < 2 && !flushOK; i++) {
CommitLog.this.mappedFileQueue.flush(0);
flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
}
这里的 flush(0) 代表强制刷新,如果传入其它值,则会判断当前 writePosition 和 flushPostion 之间的数据是否能凑够传入值的 page。如果够,则触发刷新,否则不做操作。
异步刷盘
异步刷盘也分为两种策略,如果启用了暂存池,需要多一个 commit 的过程,将堆外内存的数据 commit 到 FileChannel 里,然后再触发 flush 操作。
异步刷盘有三个核心配置:每隔多长时间刷盘一次(500ms)、每次至少刷多少页(4)、每隔多长时间触发全量刷盘(10s)。根据这些配置我们也可以推断出 RocketMQ 最多会丢 500ms 的请求数据,最多可能丢 4 * 4kb 的数据,最大的时间延迟为 10s。
具体到实现上,每次 await 阻塞 500ms,唤醒后触发刷盘操作。记录上次全量刷盘时间,如果当前时间大于上次全量刷盘时间 + 10s,则触发全量刷盘操作 flush(0)。
总结
- 顺序写优于随机写。
- RocketMQ 通过切分 Commitlog、引入 Mmap、使用 SSD 等方式解决随机读问题。
- 使用对 Mmap、Page cache 能极大提升文件写入效率。
- 刷盘策略实际上是一个 trade-off 的过程。
转载自:https://juejin.cn/post/7374685303884693556