likes
comments
collection
share

RocketMQ源码系列(7) — 消息存储之刷盘机制

作者站长头像
站长
· 阅读数 21

上一篇文章主要介绍了消息通过 CommitLog 写入 MappedFile 的过程,但这其中还有很多关键的点没有介绍,比如消息写入到写缓冲区 writeBuffer 后,怎么同步到文件 FileChannel,之后又怎么刷到磁盘的?CommitLog 文件数量不可能无限增长,过期机制是什么?宕机了数据怎么恢复等等?

刷盘机制

上一篇文章提到,CommitLog 写入消息是从 MappedFileQueue 中获取最后一个 MappedFile,然后将消息追加到 MappedFile(appendMessage)中。MappedFile 在创建时,如果开启了瞬时存储池(TransientStorePool),就会从中取一块 ByteBuffer 作为写缓冲区(writeBuffer)。

MappedFile 初始化的时候,MappedFile 通过 RandomAccessFile 关联到磁盘上的 commitlog 文件,然后再拿到 FileChannel 通道,然后通过 map() 拿到了内存映射区 MappedByteBuffer,这样读写就会更快。

那么这里就有两个问题,一是写缓冲区什么时候同步到 FileChannel?二是 FileChannel 什么时候刷到磁盘文件?

RocketMQ源码系列(7) — 消息存储之刷盘机制

提交 flush 请求

CommitLog 在创建时会创建两个组件:flushCommitLogServicecommitLogService,看名字就大概知道是在做同步和刷盘的事情。

public class CommitLog {
    private final FlushCommitLogService flushCommitLogService;
    private final FlushCommitLogService commitLogService;

    public CommitLog(final DefaultMessageStore defaultMessageStore) {
        if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
            this.flushCommitLogService = new GroupCommitService();
        } else {
            this.flushCommitLogService = new FlushRealTimeService();
        }

        this.commitLogService = new CommitRealTimeService();
    }
}

flushCommitLogService 创建时有同步和异步两种方式,默认配置是异步flush(ASYNC_FLUSH)。同步flush对象是 GroupCommitService,异步flush对象是 FlushRealTimeService

在向 CommitLog 写入消息时(asyncPutMessage),会在写入 MappedFile 之后提交 flushreplica 请求,然后同步等待 flush 和 replica 完成之后才会返回追加消息的结果。

public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
    // 写入 MappedFile
    AppendMessageResult result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);

    // 提交 flush 和 replica 请求
    CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);
    CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg);

    PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
    // 等待 flush 和 replica 请求完成
    return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {
        putMessageResult.setPutMessageStatus(flushStatus);
        putMessageResult.setPutMessageStatus(replicaStatus);
        return putMessageResult;
    });
}

首先看提交 flush 请求的方法 submitFlushRequest,可以看到它分同步和异步 flush。

同步 flush

  1. 如果是同步 flush,则使用 GroupCommitService 来提交请求。接着判断是否等等消息存储完成,这个在构建 Message 时默认是 true

  2. 然后创建 GroupCommitRequest,第一个参数是 nextOffset,第二个参数 syncFlushTimeout 同步超时时间默认是5秒

    result 中 wroteOffset = fileFromOffset + byteBuffer.position(),也就是这条消息开始写入的物理偏移量。

    然后 nextOffset = wroteOffset + wroteBytes,就是说下一个偏移量 = 等于写入起始偏移量 + 写入的字节数,看的出来 nextOffset 其实就是写入消息后的偏移量位置。

  3. 然后将 GroupCommitRequest 添加到 FlushDiskWatcher 监视器中。

  4. 最后才是将 GroupCommitRequest 提交到 GroupCommitService 中,然后返回 Future 对象。这个 future 对象就是同步的关键,GroupCommitService 提交请求后,处理完请求会将flush结果通过 GroupCommitRequest 的 wakeupCustomer 来传递,future 就会同步拿到处理结果。

异步 flush

  1. 如果未启用瞬时存储池技术,则调用 flushCommitLogService(FlushRealTimeService)唤醒方法。

  2. 如果启用了瞬时存储池技术,则调用 commitLogService(CommitRealTimeService)唤醒方法。

public CompletableFuture<PutMessageStatus> submitFlushRequest(AppendMessageResult result, MessageExt messageExt) {
    if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
        final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
        if (messageExt.isWaitStoreMsgOK()) { // 等待消息OK
            GroupCommitRequest request = new GroupCommitRequest(
                    result.getWroteOffset() + result.getWroteBytes(), 
                    this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout() // 同步刷新超时
            );
            // 放到监视器里
            flushDiskWatcher.add(request);
            // 提交请求
            service.putRequest(request);
            return request.future();
        } else {
            service.wakeup();
            return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
        }
    } else {
        // 异步刷盘,不提交请求,由异步刷盘线程后台自动刷盘
        if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
            flushCommitLogService.wakeup(); // FlushRealTimeService
        } else {
            commitLogService.wakeup(); // CommitRealTimeService
        }
        return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
    }
}

同步 flush

1、FlushDiskWatcher

同步 flush 会将 GroupCommitRequest 添加到刷盘监视器 FlushDiskWatcher,先看一下它主要是干什么的。

FlushDiskWatcher 继承自 ServiceThread,就是说它也是一个独立线程一直在后台运行一个任务。

FlushDiskWatcher 内部有一个 commitRequests 的阻塞队列来放添加进来的 GroupCommitRequest,运行任务就是在消费这个阻塞队列。

在 run 方法中,就是 FlushDiskWatcher 的功能:

  1. 从 commitRequests 取出 GroupCommitRequest,如果这个flush请求还没有完成且已经超时(5秒),就标记为刷盘超时(FLUSH_DISK_TIMEOUT)。

  2. 然后可以看到,如果没有超时,就会通过 sleep 10 毫秒来循环处理。

看下来 FlushDiskWatcher 的功能很简单,就是判断 GroupCommitRequest 是否超时了,超时就直接通知刷盘超时了。

public class FlushDiskWatcher extends ServiceThread {
    // 组提交请求阻塞队列
    private final LinkedBlockingQueue<GroupCommitRequest> commitRequests = new LinkedBlockingQueue<>();

    @Override
    public void run() {
        while (!isStopped()) {
            GroupCommitRequest request = commitRequests.take();
            
            while (!request.future().isDone()) {
                long now = System.nanoTime();
                if (now - request.getDeadLine() >= 0) {
                    request.wakeupCustomer(PutMessageStatus.FLUSH_DISK_TIMEOUT);
                    break;
                }
                long sleepTime = (request.getDeadLine() - now) / 1_000_000;
                sleepTime = Math.min(10, sleepTime);
                if (sleepTime == 0) {
                    request.wakeupCustomer(PutMessageStatus.FLUSH_DISK_TIMEOUT);
                    break;
                }
                Thread.sleep(sleepTime);
            }
        }
    }

    public void add(GroupCommitRequest request) {
        commitRequests.add(request);
    }
}

2、GroupCommitService

GroupCommitService 继承自 FlushCommitLogService,它又继承自 ServiceThread,也就是说 GroupCommitService 也是一个线程在跑任务。

GroupCommitService 有两个集合 requestsWriterequestsRead,添加请求的时候放入 requestsWrite 中。run 方法中 waitForRunning() 结束后会调用 onWaitEnd(),这时就会调用 swapRequests() 来交换 requestsWrite 和 requestsRead 两个集合,实现写和读的分离。这种读写分离的设计可以在高并发写入消息的时候避免读写并发的问题,同时也可以同时进行 flush 刷盘和提交 flush请求,提升并发性能。

在 run 方法中,会调用 doCommit 来消费 requestsRead 中的 GroupCommitRequest 请求。它首先判断 mappedFileQueue 中的 flush 偏移量(flushedWhere)是否大于这条消息的写入偏移量(nextOffset),如果是,那么说明这条消息已经被 flush 过了,否则就需要执行 flush 操作。

class GroupCommitService extends FlushCommitLogService {
    private volatile LinkedList<GroupCommitRequest> requestsWrite = new LinkedList<GroupCommitRequest>();
    private volatile LinkedList<GroupCommitRequest> requestsRead = new LinkedList<GroupCommitRequest>();
    private final PutMessageSpinLock lock = new PutMessageSpinLock();

    public synchronized void putRequest(final GroupCommitRequest request) {
        lock.lock();
        try {
            this.requestsWrite.add(request);
        } finally {
            lock.unlock();
        }
        this.wakeup();
    }

    private void swapRequests() {
        lock.lock();
        try {
            LinkedList<GroupCommitRequest> tmp = this.requestsWrite;
            this.requestsWrite = this.requestsRead;
            this.requestsRead = tmp;
        } finally {
            lock.unlock();
        }
    }

    private void doCommit() {
        for (GroupCommitRequest req : this.requestsRead) {
            boolean flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
            for (int i = 0; i < 2 && !flushOK; i++) {
                // 刷盘
                CommitLog.this.mappedFileQueue.flush(0);
                flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
            }
            req.wakeupCustomer(flushOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_DISK_TIMEOUT);
        }

        long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
        if (storeTimestamp > 0) {
            CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
        }
        this.requestsRead = new LinkedList<>();
    }

    public void run() {
        while (!this.isStopped()) {
            this.waitForRunning(10);
            // 提交
            this.doCommit();
        }
    }

    @Override
    protected void onWaitEnd() {
        this.swapRequests();
    }
}

可以看到它其实就是在调用 mappedFileQueue 的 flush() 方法来执行刷盘。flush 之后会再次判断 flushedWhere 是否大于 nextOffset,如果不是则说明还没 flush 到 nextOffset 的位置,因为一次 flush 只会刷一个 MappedFile,所以如果 flushedWhere 在上一个 MappedFile,nextOffset 已经到了下一个 MappedFile 时,会再 flush 一次,这次一般都会 flush 成功,然后唤醒消费者,提醒 flush 成功(PUT_OK)。

flush 完了之后,还会更新存储检查点 StoreCheckpoint 的 physicMsgTimestamp 为 MappedFileQueue 的 storeTimestamp 时间,StoreCheckpoint 我们后面再来看。

RocketMQ源码系列(7) — 消息存储之刷盘机制

异步 flush

如果是异步 flush(ASYNC_FLUSH),且没有启用瞬时存储池(TransientStorePool),就唤醒 FlushRealTimeService;如果启用了瞬时存储池,就唤醒 CommitRealTimeService,这两个组件都是一个 ServiceThread,作为异步线程一直在运行一个任务。

1、FlushRealTimeService

从 FlushRealTimeService 的源码可以了解到,它是默认每 500ms 就会执行一次 MappedFileQueue 的 flush 方法,flush 的参数 flushLeastPages 表示至少刷多少缓存页,默认是 4 页,这个其实是表示至少有4页的脏数据才会 flush,否则不会执行 flush。

它还有一个机制是每隔10秒,会将刷盘页数设置为 0,刷盘页为0时,就不管有多少页脏数据,直接刷盘,是一个强制刷盘的操作,可以避免一些小的数据长时间没有同步到磁盘。

最后,如果程序正常下线,还会强制刷盘,确保缓存页的数据flush到磁盘中。

整体看下来,FlushRealTimeService 其实就是在异步的每隔500毫秒 flush 一次,每隔 10 秒强制 flush 一次。

class FlushRealTimeService extends FlushCommitLogService {
    // 最近一次刷盘时间
    private long lastFlushTimestamp = 0;

    public void run() {
        while (!this.isStopped()) {
            // 定时刷盘 CommitLog,默认开启
            boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();
            // 刷盘间隔时间,默认 500ms
            int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
            // 每次刷盘页数,默认 4
            int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();
            // 强制物理刷盘间隔时间,默认 10s
            int flushPhysicQueueThoroughInterval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();

            long currentTimeMillis = System.currentTimeMillis();
            // 隔了10秒
            if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
                this.lastFlushTimestamp = currentTimeMillis;
                flushPhysicQueueLeastPages = 0;
            }
            try {
                if (flushCommitLogTimed) {
                    Thread.sleep(interval); // 休眠 500ms
                } else {
                    this.waitForRunning(interval); // 利用 CountDownLatch 来等待
                }
                // 刷盘
                CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
                // 存储时间
                long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
                // 设置检查点
                if (storeTimestamp > 0) {
                    CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
                }
            } catch (Throwable e) {}
        }
        // Normal shutdown, to ensure that all the flush before exit
        boolean result = false;
        for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
            result = CommitLog.this.mappedFileQueue.flush(0);
        }
    }
}

2、CommitRealTimeService

启用了 TransientStorePool 时,唤醒的是 CommitRealTimeService,从它的源码可以了解到,它就是每隔200ms执行一次,调用 MappedFileQueue 的 commit() 方法,每次默认也是 至少有4页才会commit。

但是可以看到它会每隔200ms将页数设置为0,强制提交一次,这其实就是近乎实时的执行 commit。这样能尽可能保证写缓冲区的数据及时提交到 FileChannel 中,避免数据丢失。如果在高并发情况下,想提高commit性能,可以调整 commitCommitLogThoroughInterval 参数,比如每 10s 强制 commit 一次。

commit 数据之后,就会唤醒 FlushRealTimeService 来执行一次 flush,保证数据刷到磁盘。

class CommitRealTimeService extends FlushCommitLogService {
    private long lastCommitTimestamp = 0;

    @Override
    public void run() {
        while (!this.isStopped()) {
            // 提交间隔时间(200ms)
            int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog();
            // 至少提交的页数(4)
            int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();
            // 提交间隔时间阈值(200ms)
            int commitDataThoroughInterval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval();

            long begin = System.currentTimeMillis();
            if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) {
                this.lastCommitTimestamp = begin;
                commitDataLeastPages = 0;
            }
            try {
                // MappedFile 提交
                boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);
                long end = System.currentTimeMillis();
                if (!result) {
                    this.lastCommitTimestamp = end; 
                    flushCommitLogService.wakeup();
                }
                // 等待200ms
                this.waitForRunning(interval);
            } catch (Throwable e) {}
        }

        boolean result = false;
        for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
            result = CommitLog.this.mappedFileQueue.commit(0);
        }
    }
}

这块代码看下来我们大概能了解到 flush 相关的核心组件是 GroupCommitService、FlushRealTimeService、CommitRealTimeService。默认配置是异步flush,也就是由 FlushRealTimeService 每隔 500ms 去flush一次。如果启用了 TransientStorePool,则由 CommitRealTimeService 先执行 commit,再由 FlushRealTimeService 执行 flush。

而最后是由 MappedFileQueue 来完成刷盘的操作,与之相关的就是 commit()flush() 方法。

RocketMQ源码系列(7) — 消息存储之刷盘机制

同步写缓冲区

1、MappedFileQueue commit

从上面的图可以看到,同步flush下是不需要commit的,同时也不会开启 TransientStorePool。如果为了提升性能,可以启用异步flush且开启TransientStorePool,然后就需要 commit 操作来将写缓冲区数据同步到 FileChannel,再flush数据到磁盘。

MappedFileQueue 中有三个属性,注意这三个属性是 CommitLog 维度的,一个 CommitLog 下可能有多个 commitlog 文件。

  • flushedWhere: 刷新位置
  • committedWhere: 提交位置
  • storeTimestamp: 最新存储时间

可以看出,commit 方法就是根据当前的提交位置(committedWhere)找到对应的 MappedFile,然后调用 MappedFile 的commit方法来提交,它会返回提交后的偏移量(offset),那么最新的提交位置 committedWhere = fileFromOffset + offset。然后返回的结果就是提交后的位置与之前的提交位置是否一样,一样则说明没有数据提交,不一样则说明提交了新的数据。

public class MappedFileQueue {
    protected long flushedWhere = 0;
    private long committedWhere = 0;
    private volatile long storeTimestamp = 0;
    
    public boolean commit(final int commitLeastPages) {
        boolean result = true;
        MappedFile mappedFile = this.findMappedFileByOffset(this.committedWhere, this.committedWhere == 0);
        if (mappedFile != null) {
            int offset = mappedFile.commit(commitLeastPages);
            long where = mappedFile.getFileFromOffset() + offset;
            result = where == this.committedWhere;
            this.committedWhere = where;
        }
        return result;
    }
}

再来看根据 offset 找 MappedFile 的方法:

  1. 首先获取第一个和最后一个 MappedFile,然后检查 offset 的合法性,offset 必须大于第一个 MappedFile 的起始偏移量,且小于最后一个 MappedFile 的末尾偏移量。

    这里需要注意的是,MappedFile 的起始偏移量 fileFromOffset 等于 commitlog 的文件名,第一个 MappedFile 的起始偏移量不会一直都是 0,因为 commitlog 会不断增加,而前面的 commitlog 文件会过期删除,所以第一个 MappedFile 是会变化的。

  2. 接着会去计算一个index: index = (offset - fileFromOffset) / mappedFileSize

    就是用 offset 减去第一个 MappedFile 的 fileFromOffset,再除以文件大小,就得到 offset 对应的索引位置,就找到了 offset 所在的 MappedFile。

  3. 最后就会检查下 offset 是否在选出来的 MappedFile 的区间,如果在就找到了;如果不在,还会遍历每个 MappedFile 去判断下偏移量区间,如果还是没有,则根据参数看是否返回第一个 MappedFile。

public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) {
    MappedFile firstMappedFile = this.getFirstMappedFile();
    MappedFile lastMappedFile = this.getLastMappedFile();

    if (firstMappedFile != null && lastMappedFile != null) {
        if (offset < firstMappedFile.getFileFromOffset() || offset >= lastMappedFile.getFileFromOffset() + this.mappedFileSize) {
            LOG_ERROR.warn("Offset not matched. Request offset: {}");
        } else {
            int index = (int) ((offset / this.mappedFileSize) - (firstMappedFile.getFileFromOffset() / this.mappedFileSize));
            MappedFile targetFile = this.mappedFiles.get(index);

            // 偏移量在这个文件中,偏移量大于文件的起始偏移量,小于总偏移量
            if (targetFile != null && offset >= targetFile.getFileFromOffset() && offset < targetFile.getFileFromOffset() + this.mappedFileSize) {
                return targetFile;
            }
            // 挨个计算偏移量
            for (MappedFile tmpMappedFile : this.mappedFiles) {
                if (offset >= tmpMappedFile.getFileFromOffset() && offset < tmpMappedFile.getFileFromOffset() + this.mappedFileSize) {
                    return tmpMappedFile;
                }
            }
        }
        if (returnFirstOnNotFound) {
            return firstMappedFile;
        }
    }
    return null;
}

2、MappedFile commit

再来看 MappedFile,首先要记住 MappedFile 中的几个属性,这几个都是相对于当前 MappedFile,即单个 commitlog 文件的维度。

  • wrotePosition:当前写入位置
  • committedPosition:当前 commit 位置
  • flushedPosition:当前 flush 位置
  • fileFromOffset:文件的起始偏移量

然后还有 MappedFile 关联的 FileChannel,这个就是用来读写磁盘文件的,以及通过MMAP技术映射出来的内存映射缓冲区 mappedByteBuffer;如果开启了 TransientStorePool,写入消息就会先写入 writeBuffer 写缓冲区。

public class MappedFile extends ReferenceResource {
    // 写入位置
    protected final AtomicInteger wrotePosition = new AtomicInteger(0);
    // 提交位置
    protected final AtomicInteger committedPosition = new AtomicInteger(0);
    // flush 位置
    private final AtomicInteger flushedPosition = new AtomicInteger(0);

    // FileChannel
    protected FileChannel fileChannel;
    // 磁盘内存映射字节数据
    private MappedByteBuffer mappedByteBuffer;
    // 写缓冲区
    protected ByteBuffer writeBuffer = null;
    // 文件从哪个偏移量开始
    private long fileFromOffset;
    // 文件大小
    protected int fileSize;
}

然后看 MappedFile 的 commit 方法:

  1. 如果 writeBuffer 为空,就是未开启 TransientStorePool 时,就不需要 commit 操作,直接返回 wrotePosition 的位置。

  2. 然后判断是否能够提交,如果可以,就 hold 当前 MappedFile 资源引用,然后执行 commit0(),最后再 release 释放引用。

  3. 提交后,最后会判断提交位置是否等于当前文件大小,如果是的,说明当前文件已经写满了,就不会再向 writeBuffer 中写入数据了,那么就会释放写缓冲区,将其归还到 TransientStorePool 中,以便再次使用。

  4. 最后返回 committedPosition 的值,即提交位置。

public int commit(final int commitLeastPages) {
    if (writeBuffer == null) {
        return this.wrotePosition.get();
    }
    if (this.isAbleToCommit(commitLeastPages)) {
        if (this.hold()) {
            commit0();
            this.release();
        }
    }

    if (writeBuffer != null && this.transientStorePool != null && this.fileSize == this.committedPosition.get()) {
        this.transientStorePool.returnBuffer(writeBuffer);
        this.writeBuffer = null;
    }
    return this.committedPosition.get();
}

再来看下如何判断是否可以 commit:

  1. 首先如果文件已经写满了,则可以提交

  2. 如果 commitLeastPages > 0,就会计算当前是否有足够提交的页数 (wrotePosition - committedPosition) / OS_PAGE_SIZE,有足够的页数就可以提交。

    这里可以知道,committedPosition 到 wrotePosition 之间的数据都是脏数据,还没有提交;而每次提交都是以系统缓存页为单位(默认4KB)来提交,而一次默认至少提交4页,否则不会提交。

  3. 最后,commitLeastPages = 0,也就是前面看到的 commit(0),这时只要 wrotePosition > committedPosition,也就是有脏数据,就提交。也就是说 commit(0) 表示强制提交。

protected boolean isAbleToCommit(final int commitLeastPages) {
    int commit = this.committedPosition.get();
    int write = this.wrotePosition.get();

    if (this.isFull()) {
        return true;
    }

    if (commitLeastPages > 0) {
        return ((write / OS_PAGE_SIZE) - (commit / OS_PAGE_SIZE)) >= commitLeastPages;
    }
    return write > commit;
}

public boolean isFull() {  
    return this.fileSize == this.wrotePosition.get();  
}

最后来看真正的commit操作,首先通过 writeBuffer.slice() 拿到写缓冲区的一个视图,然后将位置锁定在 committedPosition 和 wrotePosition 之间,然后 FileChannel 的位置也定位到 committedPosition,即最后一次提交的位置,然后将写缓冲区的数据写入 FileChannel 中,这就完成了 writeBuffer 同步到 FileChannel 的操作。

最后更新 committedPosition 为 wrotePosition,也就是说 commit 操作会将 committedPosition 到 wrotePosition 之间的脏数据全部写到 FileChannel 中,将写缓冲区数据同步到 FileChannel。

protected void commit0() {
    int writePos = this.wrotePosition.get();
    int lastCommittedPosition = this.committedPosition.get();

    ByteBuffer byteBuffer = writeBuffer.slice();
    byteBuffer.position(lastCommittedPosition);
    byteBuffer.limit(writePos);
    // 写缓冲区 reput 到 FileChannel
    this.fileChannel.position(lastCommittedPosition);
    this.fileChannel.write(byteBuffer);

    // 更新提交位置
    this.committedPosition.set(writePos);
}

flush 刷盘

再来看 MappedFileQueue 的flush操作:

  1. 首先根据当前 flushedWhere 找到所在的 MappedFile,然后就调用 MappedFile 的 flush 方法执行刷盘操作。

  2. flush 完之后,更新 flushedWhere = fileFromOffset + offset,最后返回是否 flush 了数据。

public boolean flush(final int flushLeastPages) {
    boolean result = true;
    MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);

    if (mappedFile != null) {
        // 刷盘
        long tmpTimeStamp = mappedFile.getStoreTimestamp();
        int offset = mappedFile.flush(flushLeastPages);
        // 刷盘后的位置
        long where = mappedFile.getFileFromOffset() + offset;
        result = where == this.flushedWhere;
        // 刷盘位置
        this.flushedWhere = where;
        // 10秒一次的强制刷盘,更新存储时间
        if (0 == flushLeastPages) {
            this.storeTimestamp = tmpTimeStamp;
        }
    }

    return result;
}

MappedFile 的 flush 操作:

  1. 同 commit,判断是否能够 flush,不可以就直接返回 flushedPosition。

  2. 可以 flush,就 hold 当前 MappedFile,然后获取 readPosition,这个值在没有 writeBuffer 时,就是 wrotePosition 的值,否则就是 committedPosition 的值。也就是说如果开启了写缓冲区,就应该从 committedPosition 开始 flush,否则应该从 wrotePosition 开始 flush。

  3. 如果有 writeBuffer,就会调用 FileChannel.force(false) 来刷盘,否则调用 MappedByteBuffer.force() 刷盘。

  4. 刷盘完成之后,就会更新 flushedPosition 为最新的位置,最后 release 释放引用。

public int flush(final int flushLeastPages) {
    if (this.isAbleToFlush(flushLeastPages)) {
        if (this.hold()) {
            int value = getReadPosition();
            if (writeBuffer != null || this.fileChannel.position() != 0) {
                this.fileChannel.force(false);
            } else {
                this.mappedByteBuffer.force();
            }

            // 设置刷盘位置
            this.flushedPosition.set(value);
            // 释放
            this.release();
        }
    }
    return this.getFlushedPosition();
}

public int getReadPosition() {
    return this.writeBuffer == null ? this.wrotePosition.get() : this.committedPosition.get();
}

总结

1、偏移量关系

经过前面的分析,我们知道 MappedFile 中有 wrotePosition,committedPosition,flushedPosition 三个位置,在 MappedFileQueue 中还有 flushedWhere、committedWhere 两个位置,那么这几个变量的关系是什么呢,看下面那张图来分析下。

  • 首先要清楚,对于单个 MappedFile 来说,wrotePosition,committedPosition,flushedPosition 三个都是相对于单个文件,偏移量从 0 开始增加,直到达到文件的大小。而每个 MappedFile 的起始偏移量 fileFromOffset 都是固定的,比如第一个 MappedFile 是 0,第二个是 1073741824。

  • 当一个 MappedFile 写满之后,且 writeBuffer 已经全部同步到 FileChannel 中,那么 wrotePosition,committedPosition,flushedPosition 三个位置都一样,刚好等于文件大小。然后关联的 writeBuffer 会置空,返还给 TransientStorePool 以复用。

  • 当有新的数据要写入时,先会写入 writeBuffer,写入多少字节,writePosition 就会增长多少, 它代表的就是缓冲区写入的末尾偏移量。

  • 数据从 writeBuffer 拷贝到 FileChannel 时,会直接拷贝到 writePosition 的位置,committedPosition 就代表拷贝数据的末尾偏移量。

  • FileChannel 刷盘时,会直接刷到 committedPosition 的位置,flushedPosition 代表的就是内存数据刷盘到物理磁盘后的末尾偏移量。

  • 而 flushedWhere、committedWhere 是一直在累加的,就算前面的 MappedFile 被删了,这两个值也在一直累加。

RocketMQ源码系列(7) — 消息存储之刷盘机制

注意 commit 其实只针对 writeBuffer,也就是开启了 TransientStorePool 的时候,committedPosition 和 committedWhere 才会生效。

综上可以得到如下的关系:

  • flushedWhere = fileFromOffset + flushedPosition

  • committedWhere = fileFromOffset + committedPosition

  • 开启了 TransientStorePool 时:flushedPosition <= committedPosition <= wrotePosition

  • 未开启 TransientStorePool 时:flushedPosition <= wrotePosition

2、刷盘机制

MappedFile 是对磁盘文件(比如 commitlog)的抽象,它通过 FileChannel 来实现对磁盘文件的读取和写入。基于 JDK NIO 的内存映射技术,通过 FileChannel.map() 方法拿到内存映射缓冲区 mappedByteBuffer。如果是异步刷新且启用了 TransientStorePool,则会关联一块一样大小的写缓冲区 writeBuffer。写入消息时,如果存在 writeBuffer,就会将消息写入 writeBuffer,否则是直接写入 mappedByteBuffer。

如果是写入 writeBuffer,这时就会有一个异步线程 CommitRealTimeService 每隔 200ms 调用 MappedFile 的 commit 方法,将 writeBuffer 新写入的数据重新写入到 FileChannel 中。

默认异步刷新(ASYNC_FLUSH)下,会有一个异步线程 FlushRealTimeService 每隔 500ms 调用 MappedFile 的 flush 方法;而同步刷新(SYNC_FLUSH)下,则由 GroupCommitService 调用 MappedFile 的 flush 方法。

MappedFile 的 flush 方法中,如果之前是写入 writeBuffer 再写入 FileChannel 的,这时会调用 FileChannel 的 force() 方法刷盘;如果没有 writeBuffer,则调用 mappedByteBuffer 的 force() 方法刷盘。

在这里调用 FileChannel.force() 和 mappedByteBuffer.force() 是没什么区别的,FileChannel.map() 内存映射时就是映射的整个文件,而不是文件的一部分。所以调用 FileChannel.write() 写入数据也会反应到 MappedByteBuffer 上。

3、PageCache

可以看到,MappedFile 的 flush 刷盘就是调用 FileChannel 或 MappedByteBuffer 的 force 方法,那为什么要调用这个 force 执行刷盘呢?

这与 PageCache 有关,PageCache 叫页高速缓冲存储器,是操作系统内核的一部分,位于应用和物理存储之间,用于缓存文件系统中的数据。当应用程序读取文件时,操作系统会将文件的数据块(通常是以页为单位,一般为4KB)缓存到 PageCache 中,用于提高文件 I/O 的性能。当应用程序写入文件时,操作系统将数据缓存在 PageCache 中,并延迟将数据写入磁盘,这样可以提高写入性能。

数据在被写入 PageCache 后,并不意味着立即写入了磁盘,为了保证数据的一致性和持久性,PageCache 有以下策略:

  • 延迟写入(Delayed Write):PageCache 将写入的数据暂时缓存在内存中,并将其标记为“已修改”状态。然后,操作系统会根据一定的策略(如内存压力、磁盘空闲等)在合适的时机将已修改的数据批量写入磁盘。这样可以减少频繁的磁盘访问,提高写入性能。

  • 强制刷新(Force Flush):除了延迟写入,PageCache 还提供了强制刷新的机制。应用程序可以显式地调用类似于 fsync() 或 fdatasync() 的函数来要求将数据强制刷新到磁盘,以确保数据的持久性。这会阻塞应用程序,直到数据完全写入磁盘。

MappedFile 是基于内存映射技术(MMAP)来读写文件的,调用 FileChannel.map() 方法时,会创建一个映射缓冲区对象(MappedByteBuffer),并将文件的一部分或整个文件映射到进程的虚拟内存空间中,得到一个与文件相关联的缓冲区。在映射缓冲区创建过程中,操作系统会将映射的文件数据页加载到 PageCache 中,PageCache 中的数据与映射缓冲区之间通过内存映射机制来进行交互。

无论是 MappedByteBuffer 还是 FileChannel 在写入数据时,实际上只是将数据写入 PageCache。当我们将数据写入 PageCache 后,即便我们的应用崩溃了,但是只要系统不崩溃,最终数据也会将数据刷入磁盘。

force 方法就是用于强制将 PageCache 中的修改刷新到磁盘文件,确保所有对文件的更改都被持久化,以便在系统崩溃或断电时不会丢失数据。

RocketMQ源码系列(7) — 消息存储之刷盘机制

转载自:https://juejin.cn/post/7280435431334297655
评论
请登录