likes
comments
collection
share

RocketMQ源码8-broker存储文件组织和内存映射

作者站长头像
站长
· 阅读数 29

本系列RocketMQ4.8注释github地址,希望对大家有所帮助,要是觉得可以的话麻烦给点一下Star哈

1. 存储设计

RocketMQ存储文件主要包括:CommitLog文件、ConsumerQueue文件、Index文件

  • CommitLog文件:所有Topic的消息按照抵达顺序依次追加到CommitLog中,一旦写入不支持修改
  • ConsumeQueue文件:消息消费队列,用于消费者消费,即消费者通过此文件来从CommitLog中获取消息。消息达到CommitLog后,将异步转发到ConsumeQueue文件
  • Index文件:消息索引,主要存储消息key与offset的对应关系

RocketMQ将所有Topic的消息都存储在同一个CommitLog文件中,一般按照Topic来检索消息,所以为了提高消息消费的效率,RocketMQ引入了ConsumeQueue文件(消费队列),每一个Topic包含多个消息消费队列,每一个消费队列都有一个文件。

为了根据消息的属性从CommitLog文件中快速检索消息,RocketMQ引入了Index索引文件。

存储目录为:

RocketMQ源码8-broker存储文件组织和内存映射

1.1 基本原理

  • Page Cache:Page Cache是文件系统层的Cache,主要用来减少对磁盘的I/O操作,通过对磁盘的访问变为物理内存的访问,缓存的是内存页面,操作时按照页为基本单位。在Linux系统中写入数据的时候并不会直接写到硬盘上,而是会先写到Page Cache中,并打上dirty标识,由内核线程flusher定期将被打上dirty的页发送给IO调度层,最后由IO调度决定何时落地到磁盘中,而Linux一般会把还没有使用的内存全拿来给Page Cache使用。而读的过程也是类似,会先到Page Cache中寻找是否有数据,有的话直接返回,如果没有才会到磁盘中去读取并写入Page Cache,然后再次读取Page Cache并返回。而且读的这个过程中操作系统也会有一个预读的操作,你的每一次读取操作系统都会帮你预读出后面一部分数据。当你一直在使用预读数据的时候,系统会帮你预读出更多的数据(最大到128K)。

  • Buffer Cache:Buffer Cache是针对设备的,实际操作按块为基本单位,对于裸盘的读写会占用Buffer Cache,当读写完成之后,会归还给操作系统。

  • 在linux2.4内核中Buffer CachePage Cache是共存的,因为文件的读写最终会转化为块设备的读写,即同一份文件的数据,可能既在Buffer Cache中也在Page Cache中,这样就造成了物理内存的浪费。

  • linux2.6内核对Buffer CachePage Cache进行了合并,统一为Page Cache。当进行文件读写时,如果文件在磁盘上的存储块是连续的,那么文件在Page Cache中对应的页是普通的page,如果文件在磁盘上的数据块是不连续的,或者是设备文件,那么文件在Page Cache中对应的页就是Buffer Cache

  • 查看内存情况

    $ # cat /proc/meminfo                     
    MemTotal:        3876772 kB
    MemFree:          126704 kB
    MemAvailable:     137132 kB
    Buffers:              48 kB
    Cached:           258648 kB
    SwapCached:        12344 kB
    ...省略...
    
    Buffers: 表示`Buffer Cache`的容量
    Cached: 表示位于物理内存中的页缓存`Page Cache`
    SwapCached:表示位于磁盘交换区的页缓存`Page Cache`
    实际的`Page Cache`容量=Cached+SwapCached
    
  • linux底层提供mmap将文件映射进虚拟内存,对文件的读写变成对内存的读写,能充分利用Page Cache,但是如果对文件进行随机读写,会使虚拟内存产生很多缺页(Page Fault)中断,此时操作系统需要将磁盘文件的数据再次加载到Page Cache,这个过程比较慢。如果对文件进行顺序读写,读和写的区域都是被操作系统缓存过的热点区域,不会产生大量的缺页中断,文件的读写操作相当于直接内存的操作,性能会提升很多。如果内存不够充足,内核把内存分配给Page Cache后,空闲内存会变少,如果程序有新的内存分配或者缺页中断,但是空闲内存不够,内核需要花费时间将热度低的Page Cache内存回收掉,此时性能会下降。当遇到操作系统进行脏页回写,内存回收,内存换入换出等情形时,会产生较大的读写延迟,造成存储引擎偶发的高延迟,针对这种现象,RocketMQ采用了多种优化技术,比如内存预分配,文件预热,mlock系统调用,读写分离等,来保证利用Page Cache优点的同时,消除其带来的延迟。

  • 工具查看:hcache是基于pcstat,pcstat可以查看文件是否被缓存和根据pid来查看缓存了哪些文件,hcachepcstat的增强版本,增加了查看整个系统Cache和根据Cache大小排序的功能:

    查看使用Cache最多的3个进程
    $ hcache --top 3  
    +----------------------------------+--------------+-------+--------+---------+
    | Name                             | Size (bytes) | Pages | Cached | Percent |
    |----------------------------------+--------------+-------+--------+---------|
    | /usr/share/atom/atom             | 81137776     | 19810 | 19785  | 099.874 |
    | /usr/bin/dockerd                 | 68608880     | 16751 | 14321  | 085.493 |
    | /usr/share/atom/snapshot_blob.bin| 54619240     | 13335 | 13335  | 100.000 |
    +----------------------------------+--------------+-------+--------+---------+
    

1.2 CommitLog

  • RocketMQ Broker单个实例下所有的Topic都使用同一个日志数据文件(CommitLog)来存储(即单个实例消息整体有序),这点与kafka不同(kafka采用每个分区一个日志文件存储)

  • CommitLog单个文件大小默认1G,文件文件名是起始偏移量,总共20位,左边补零,起始偏移量是0。假设文件按照默认大小1G来算

    • 第一个文件的文件名为00000000000000000000 ,当第一个文件被写满之后,开始写入第二个文件
    • 第二个文件的文件名为00000000001073741824 ,1G=1073741824=1024*1024*1024
    • 第三个文件的文件名是00000000002147483648,(文件名相差1G=1073741824=1024*1024*1024)
  • CommitLog按照上述命名的好处是给出任意一个消息的物理偏移量,可以通过二分法进行查找,快速定位这个文件的位置,然后用消息物理偏移量减去所在文件的名称,得到的差值就是在该文件中的绝对地址

1.3 ConsumeQueue

ConsumeQueue是消息消费队列,它是一个逻辑队列,相当于CommitLog的索引文件。因为RocketMQ的队列不存储任何实际数据,它只存储CommitLog中的【起始物理位置偏移量,消息的内容大小,消息Tag的哈希值】,每一个ConsumeQueue存储的格式如下,总共20B。存tag是为了在消费者取到消息offset后先根据tag做一次过滤,剩下的才需要到CommitLog中取消息详情:

RocketMQ源码8-broker存储文件组织和内存映射

每个ConsumeQueue都有一个queueId,queueId 的值为0到TopicConfig配置的队列数量。比如某个Topic的消费队列数量为4,那么四个ConsumeQueuequeueId就分别为0、1、2、3。

消费者消费时会先从ConsumeQueue中查找消息在CommitLog中的offset,再去CommitLog中找原始消息数据。如果某个消息只在CommitLog中有数据,没在ConsumeQueue中, 则消费者无法消费

ConsumeQueue类对应的是每个topic和queuId下面的所有文件。默认存储路径是$HOME/store/consumequeue/{topic}/{queueId}/{fileName},每个文件由30w条数据组成,单个文件的大小是30w x 20Byte,即每个文件为600w字节,单个消费队列的文件大小约为5.722M=(600w/(1024*1024))

1.4 Index文件

IndexFile:索引文件,物理存储上,文件名为创建时间的时间戳命名,固定的单个IndexFile文件大小约为400M,一个IndexFile可以保存2000W个索引

IndexFile(索引文件)由IndexHeader(索引文件头), Slot(槽位)和Index(消息的索引内容)三部分构成。对于每个IndexFile来说IndexHeader是固定大小的,Slot是索引的目录,用于定位IndexIndexFile中存储的物理位置。存储图:

RocketMQ源码8-broker存储文件组织和内存映射

1.5 checkpoint文件

checkpoint检查点文件的作用是记录CommitLog、ConsumeQueue、Index文件的刷盘时间点,文件固定长度为4kb,只用该文件的前24个字节

  • physicMsgTimestamp:CommitLog文件刷盘时间点
  • logicsMsgTimestamp:ConsumeQueue文件刷盘时间点
  • indexMsgTimestamp:Index文件刷盘时间点

1.6 TransientStorePool机制

RocketMQ为了降低PageCache的使用压力,引入了transientStorePoolEnable机制,即内存级别的读写分离机制

默认情况,RocketMQ将消息写入PageCache,消费时从PageCache中读取消息。但是这样在高并发下PageCache压力会比较大,容易出现瞬时broker busy异常。RocketMQ通过开启transientStorePoolEnable=true,将消息写入堆外内存并立即返回,然后异步将堆外内存中的数据批量提交到PageCache,再异步刷盘到磁盘中。这样的好处就是形成内存级别的读写分离,发送写入消息是向堆外内存,消费读取消息是从PageCache

该机制的缺点就是如果意外导致broker进程异常退出,已经放入到PageCache中的数据不会丢失,而存储在堆外内存的数据会丢失

2. MappedFileQueue

RocketMQ使用MappedFile、MappedFileQueue来封装存储文件。MappedFileQueueMappedFile的管理容器,使用CopyOnWriteArrayList来管理所有的MappedFileMappedFileQueue提供查找目录下MappedFile的方法。MappedFileQueue核心属性:

/**
 * 1.MappedFile组成的队列
 *  * 2.包括CommitLog(消息主题以及元数据) ConsumerQueue逻辑队列
 */
public class MappedFileQueue {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
    private static final InternalLogger LOG_ERROR = InternalLoggerFactory.getLogger(LoggerName.STORE_ERROR_LOGGER_NAME);

    private static final int DELETE_FILES_BATCH_MAX = 10;

    // 存储目录
    private final String storePath;

    // 单个文件的存储大小
    private final int mappedFileSize;

    // MappedFile集合
    private final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();

    // 创建MappedFile服务类
    private final AllocateMappedFileService allocateMappedFileService;

    // 当前刷盘指针,表示该指针之前的所有数据全部持久化到磁盘
    private long flushedWhere = 0;
    // 当前数据提交指针,内存中ByteBuffer当前的写指针,该值大于、等于flushedWhere
    // (write >= commit >= flush位置)
    private long committedWhere = 0;
    //当前已刷盘的最后一条消息存储的时间戳
    private volatile long storeTimestamp = 0;

    public MappedFileQueue(final String storePath, int mappedFileSize,
        AllocateMappedFileService allocateMappedFileService) {
        this.storePath = storePath;
        this.mappedFileSize = mappedFileSize;
        this.allocateMappedFileService = allocateMappedFileService;
    }
    ...
}

2.1 获取第一和最后一个MappedFile

获取第一个MappedFile

/**
 * 返回队列中第一个MappedFile,这里忽略索引越界异常,可能一个都没有,返回null
 * 先判断mappedFiles是否为空,然后get(0),因为存在并发,所以需要即使判断为空,还是可能索引越界
 * @return
 */
public MappedFile getFirstMappedFile() {
    MappedFile mappedFileFirst = null;

    if (!this.mappedFiles.isEmpty()) {
        try {
            mappedFileFirst = this.mappedFiles.get(0);
        } catch (IndexOutOfBoundsException e) {
            //ignore
        } catch (Exception e) {
            log.error("getFirstMappedFile has exception.", e);
        }
    }

    return mappedFileFirst;
}

获取最后一个MappedFile

public MappedFile getLastMappedFile() {
    MappedFile mappedFileLast = null;

    while (!this.mappedFiles.isEmpty()) {
        try {
            //由于get和size没有加锁
            // size获取的值可能是旧的,所以可能出现错误的大小,导致索引越界
            // get获取的值可能是旧的数组,所以可能出现索引越界
            mappedFileLast = this.mappedFiles.get(this.mappedFiles.size() - 1);
            break;
        } catch (IndexOutOfBoundsException e) {
            //continue;
        } catch (Exception e) {
            log.error("getLastMappedFile has exception.", e);
            break;
        }
    }

    return mappedFileLast;
}

通过起始偏移量,获取最后一个MappedFile,如果不存在,可自动创建

public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {
    long createOffset = -1;
    //最后一个映射文件
    MappedFile mappedFileLast = getLastMappedFile();

    if (mappedFileLast == null) {
        //如果没有映射文件就 创建开始的offset
        createOffset = startOffset - (startOffset % this.mappedFileSize);
    }

    if (mappedFileLast != null && mappedFileLast.isFull()) {
        createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;
    }

    //创建新的MappedFile
    if (createOffset != -1 && needCreate) {
        //文件名
        String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
        String nextNextFilePath = this.storePath + File.separator
            + UtilAll.offset2FileName(createOffset + this.mappedFileSize);
        MappedFile mappedFile = null;

        if (this.allocateMappedFileService != null) {
            mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
                nextNextFilePath, this.mappedFileSize);
        } else {
            try {
                mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
            } catch (IOException e) {
                log.error("create mappedFile exception", e);
            }
        }

        //添加到队列
        if (mappedFile != null) {
            if (this.mappedFiles.isEmpty()) {
                //标识第一个文件
                mappedFile.setFirstCreateInQueue(true);
            }
            this.mappedFiles.add(mappedFile);
        }

        return mappedFile;
    }

    return mappedFileLast;
}

2.2 获取最小最大偏移量

  • 最小:获取第一个MappedFile,然后获取其起始偏移量
  • 最大:获取最后一个MappedFile,然后【起始偏移量】+【可读位置】
/**
 * 获取存储文件最小偏移量。从这里也可以看出,并不是直接返回
 * 0,而是返回MappedFile的getFileFormOffset()方法
 */
public long getMinOffset() {

    if (!this.mappedFiles.isEmpty()) {
        try {
            return this.mappedFiles.get(0).getFileFromOffset();
        } catch (IndexOutOfBoundsException e) {
            //continue;
        } catch (Exception e) {
            log.error("getMinOffset has exception.", e);
        }
    }
    return -1;
}

/**
 * 获取存储文件的最大偏移量。返回最后一个MappedFile的
 * fileFromOffset,加上MappedFile当前的读指针
 */
public long getMaxOffset() {
    MappedFile mappedFile = getLastMappedFile();
    if (mappedFile != null) {
        return mappedFile.getFileFromOffset() + mappedFile.getReadPosition();
    }
    return 0;
}

/**
 * 返回存储文件当前的写指针。返回最后一个文件的
 * fileFromOffset,加上当前写指针位置
 */
public long getMaxWrotePosition() {
    MappedFile mappedFile = getLastMappedFile();
    if (mappedFile != null) {
        return mappedFile.getFileFromOffset() + mappedFile.getWrotePosition();
    }
    return 0;
}

/**
 *还有多少字节等待commit的(wrote与commit位置之差)
 */
public long remainHowManyDataToCommit() {
    return getMaxWrotePosition() - committedWhere;
}

2.3 根据时间戳查询MappedFile

根据【消息存储的时间戳】查询。如果this.mappedFiles为空,则直接返回null。如果不为空,则从第一个文件开始查找,找到第一个最后一次更新时间大于查找时间戳的文件,如果不存在,则返回最后一个MappedFile文件。

/**
 * 根据消息存储时间戳查找MappdFile
 *
 * 从MappedFile列表中第一个
 * 文件开始查找,找到第一个最后一次更新时间大于待查找时间戳的文
 * 件,如果不存在,则返回最后一个MappedFile
 */
public MappedFile getMappedFileByTime(final long timestamp) {
    Object[] mfs = this.copyMappedFiles(0);

    if (null == mfs)
        return null;

    for (int i = 0; i < mfs.length; i++) {
        MappedFile mappedFile = (MappedFile) mfs[i];
        if (mappedFile.getLastModifiedTimestamp() >= timestamp) {
            return mappedFile;
        }
    }

    return (MappedFile) mfs[mfs.length - 1];
}

private Object[] copyMappedFiles(final int reservedMappedFiles) {
    Object[] mfs;

    if (this.mappedFiles.size() <= reservedMappedFiles) {
        return null;
    }

    mfs = this.mappedFiles.toArray();
    return mfs;
}

2.4 根据消息存储的偏移量查询MappedFile

根据【消息存储的偏移量】查询

/**
 * Finds a mapped file by offset.
 *
 * @param offset Offset.
 * @param returnFirstOnNotFound If the mapped file is not found, then return the first one.
 * @return Mapped file or null (when not found and returnFirstOnNotFound is <code>false</code>).
 *
 * 根据消息偏移量offset查找MappedFile,但是不能直接使用
 * offset%mappedFileSize。这是因为使用了内存映射,只要是存在于存
 * 储目录下的文件,都需要对应创建内存映射文件,如果不定时将已消
 * 费的消息从存储文件中删除,会造成极大的内存压力与资源浪费,所
 * 以RocketMQ采取定时删除存储文件的策略。也就是说,在存储文件
 * 中,第一个文件不一定是00000000000000000000,因为该文件在某一
 * 时刻会被删除,所以根据offset定位MappedFile的算法为
 * (int)((offset/this.mappedFileSize)-(mappedFile.getFileFromOffset()/this.MappedFileSize))
 */
public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) {
    try {
        MappedFile firstMappedFile = this.getFirstMappedFile();
        MappedFile lastMappedFile = this.getLastMappedFile();
        if (firstMappedFile != null && lastMappedFile != null) {
            if (offset < firstMappedFile.getFileFromOffset() || offset >= lastMappedFile.getFileFromOffset() + this.mappedFileSize) {
                LOG_ERROR.warn("Offset not matched. Request offset: {}, firstOffset: {}, lastOffset: {}, mappedFileSize: {}, mappedFiles count: {}",
                    offset,
                    firstMappedFile.getFileFromOffset(),
                    lastMappedFile.getFileFromOffset() + this.mappedFileSize,
                    this.mappedFileSize,
                    this.mappedFiles.size());
            } else {
                // todo
                int index = (int) ((offset / this.mappedFileSize) - (firstMappedFile.getFileFromOffset() / this.mappedFileSize));
                MappedFile targetFile = null;
                try {
                    targetFile = this.mappedFiles.get(index);
                } catch (Exception ignored) {
                }

                if (targetFile != null && offset >= targetFile.getFileFromOffset()
                    && offset < targetFile.getFileFromOffset() + this.mappedFileSize) {
                    return targetFile;
                }

                for (MappedFile tmpMappedFile : this.mappedFiles) {
                    if (offset >= tmpMappedFile.getFileFromOffset()
                        && offset < tmpMappedFile.getFileFromOffset() + this.mappedFileSize) {
                        return tmpMappedFile;
                    }
                }
            }

            if (returnFirstOnNotFound) {
                return firstMappedFile;
            }
        }
    } catch (Exception e) {
        log.error("findMappedFileByOffset Exception", e);
    }

    return null;
}

2.5 重置offset

offset以后的MappedFile都清除掉,在当前4.3.1版本中存在bug,貌似也没有使用

/**
 * 将offset以后的MappedFile都清除掉
 */
public boolean resetOffset(long offset) {
    MappedFile mappedFileLast = getLastMappedFile();

    if (mappedFileLast != null) {
        // 最后一个MappedFile的【起始偏移量】+ 【写入PageCache的位置】
        long lastOffset = mappedFileLast.getFileFromOffset() +
            mappedFileLast.getWrotePosition();
        // 最后的写入位置与offset的差值,如果大于2个MappedFile大小,就不做重置
        long diff = lastOffset - offset;

        final int maxDiff = this.mappedFileSize * 2;
        if (diff > maxDiff)
            return false;
    }

    ListIterator<MappedFile> iterator = this.mappedFiles.listIterator();

    while (iterator.hasPrevious()) {
        mappedFileLast = iterator.previous();
        if (offset >= mappedFileLast.getFileFromOffset()) {
            // 定位到offset在第几个MappedFile中
            int where = (int) (offset % mappedFileLast.getFileSize());
            // 重置最后一个MappedFile的位置
            mappedFileLast.setFlushedPosition(where);
            mappedFileLast.setWrotePosition(where);
            mappedFileLast.setCommittedPosition(where);
            break;
        } else {
            // 如果offset小于当前的MappedFile的起始偏移量,则直接删除MappedFile
            iterator.remove();
        }
    }
    return true;
}

3. MappedFile

MappedFileRocketMQ内存映射文件的具体实现。将消息字节写入Page Cache缓冲区中(commit方法),或者将消息刷入磁盘(flush)。CommitLog consumerQueue、index三类文件磁盘的读写都是通过MappedFile

MappedFile的核心属性:

  • wrotePosition:保存当前文件所映射到的消息写入page cache的位置
  • flushedPosition:保存刷盘的最新位置
  • wrotePositionflushedPosition的初始化值为0,一条1k大小的消息送达,当消息commit也就是写入page cache以后,wrotePosition的值为1024 * 1024;如果消息刷盘以后,则flushedPosition也是1024 * 1024;另外一条1k大小的消息送达,当消息commit时,wrotePosition的值为1024 * 1024 + 1024 * 1024,同样,消息刷盘后,flushedPosition的值为1024 * 1024 + 1024 * 1024
public class MappedFile extends ReferenceResource {
    // 操作系统每页大小,默认4KB
    public static final int OS_PAGE_SIZE = 1024 * 4;
    protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);

    // 当前JVM实例中MappedFile的虚拟内存
    private static final AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new AtomicLong(0);

    // 当前JVM实例中
    // MappedFile对象个数
    private static final AtomicInteger TOTAL_MAPPED_FILES = new AtomicInteger(0);
    // 当前文件的写指针,从0开始(内存映射文件中的写指针)
    protected final AtomicInteger wrotePosition = new AtomicInteger(0);
    // 当前文件的提交指针,如果开启transientStore-PoolEnable,则数据会存储在
    //TransientStorePool中,然后提交到内存映射ByteBuffer中,再写入磁盘
    protected final AtomicInteger committedPosition = new AtomicInteger(0);
    // 将该指针之前的数据持久化存储到磁盘中
    private final AtomicInteger flushedPosition = new AtomicInteger(0);
    // 文件大小
    protected int fileSize;
    // 文件通道
    protected FileChannel fileChannel;
    /**
     * Message will put to here first, and then reput to FileChannel if writeBuffer is not null.
     */
    // 堆外内存ByteBuffer,如果不为空,数据首先将存储在该Buffer中,然后提交到MappedFile创建的
    //FileChannel中。transientStorePoolEnable为true时不为空
    protected ByteBuffer writeBuffer = null;
    // 堆外内存池,该内存池中的内存会提供内存锁机制。transientStorePoolEnable为true时启用
    protected TransientStorePool transientStorePool = null;
    // 文件名称
    private String fileName;
    // 该文件的初始偏移量
    private long fileFromOffset;
    // 物理文件
    private File file;
    // 物理文件对应的内存映射Buffer
    private MappedByteBuffer mappedByteBuffer;
    // 文件最后一次写入内容的时间
    private volatile long storeTimestamp = 0;
    // 是否是MappedFileQueue队列中第一个文件。
    private boolean firstCreateInQueue = false;
}

3.1 构造方法

根据transientStorePoolEnable是否为true调用不同的构造方法。

  • transientStorePoolEnable=true(只在异步刷盘情况下生效)表示将内容先保存在堆外内存中。TransientStorePool会通过ByteBuffer.allocateDirect调用直接申请堆外内存,消息数据在写入内存的时候是写入预申请的内存中
  • 通过Commit线程将数据提交到FileChannel
  • 在异步刷盘的时候,再由刷盘线程(Flush线程)将数据持久化到磁盘文件。

构造方法源码:

/**
 * 如果设置transientStorePoolEnable为false则调用此方法,参见
 * org.apache.rocketmq.store.AllocateMappedFileService#mmapOperation()
 */
public MappedFile(final String fileName, final int fileSize) throws IOException {
    init(fileName, fileSize);
}
/**
 * 如果设置transientStorePoolEnable为true则调用此方法,参见
 *org.apache.rocketmq.store.config.MessageStoreConfig#isTransientStorePoolEnable()
 * org.apache.rocketmq.store.AllocateMappedFileService#mmapOperation()
 */
public MappedFile(final String fileName, final int fileSize,
    final TransientStorePool transientStorePool) throws IOException {
    init(fileName, fileSize, transientStorePool);
}

public void init(final String fileName, final int fileSize,
    final TransientStorePool transientStorePool) throws IOException {
    init(fileName, fileSize);
    //如果transientStorePoolEnable为true,则初始化MappedFile的
    //writeBuffer,该buffer从transientStorePool中获取
    this.writeBuffer = transientStorePool.borrowBuffer();
    this.transientStorePool = transientStorePool;
}

FileChannel提供了map()方法把文件映射到虚拟内存,通常情况可以映射整个文件,如果文件比较大,可以进行分段映射,RocketMQ这里映射大小为(0,fileSize)。当通过map()方法建立映射关系之后,就不依赖于用于创建映射的FileChannel。特别是,关闭通道(Channel)对映射的有效性没有影响。MappedFile的初始化(init)方法,初始化MappedByteBuffer,模式为MapMode.READ_WRITE(读/写),此模式对缓冲区的更改最终将写入文件;但该更改对映射到同一文件的其他程序不一定是可见的。

private void init(final String fileName, final int fileSize) throws IOException {
    this.fileName = fileName;
    this.fileSize = fileSize;
    this.file = new File(fileName);
    //通过文件名获取起始偏移量
    this.fileFromOffset = Long.parseLong(this.file.getName());
    boolean ok = false;
    //确保父目录存在
    ensureDirOK(this.file.getParent());

    try {
        this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
        this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
        TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
        TOTAL_MAPPED_FILES.incrementAndGet();
        ok = true;
    } catch (FileNotFoundException e) {
        log.error("create file channel " + this.fileName + " Failed. ", e);
        throw e;
    } catch (IOException e) {
        log.error("map file " + this.fileName + " Failed. ", e);
        throw e;
    } finally {
        if (!ok && this.fileChannel != null) {
            this.fileChannel.close();
        }
    }
}

修改MappedByteBuffer实际会将数据写入文件对应的Page Cache中,而TransientStorePool方案下写入的则为纯粹的内存。因此在消息写入操作上会更快,因此能更少的占用CommitLog.putMessageLock锁,从而能够提升消息处理量。使用TransientStorePool方案的缺陷主要在于在异常崩溃的情况下会丢失更多的消息。

3.2 追加内容

追加就是将消息内容追加到映射文件中,并且记录更新时间和写的位置

public AppendMessageResult appendMessage(final MessageExtBrokerInner msg, final AppendMessageCallback cb) {
    // todo
    return appendMessagesInner(msg, cb);
}

public AppendMessageResult appendMessages(final MessageExtBatch messageExtBatch, final AppendMessageCallback cb) {
    return appendMessagesInner(messageExtBatch, cb);
}

/**
 * 将消息追加到MappedFile文件中
 */
public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {
    assert messageExt != null;
    assert cb != null;

    // 获取MappedFile当前文件写指针
    int currentPos = this.wrotePosition.get();

    // 如果currentPos小于文件大小
    if (currentPos < this.fileSize) {
        /**
         * RocketMQ提供两种数据落盘的方式:
         * 1. 直接将数据写到mappedByteBuffer, 然后flush;
         * 2. 先写到writeBuffer, 再从writeBuffer提交到fileChannel, 最后flush.
         */
        ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
        byteBuffer.position(currentPos);
        AppendMessageResult result;
        // 单个消息
        if (messageExt instanceof MessageExtBrokerInner) {
            // todo 追加消息
            result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);
        // 批量消息
        } else if (messageExt instanceof MessageExtBatch) {
            result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);
        } else {
            return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
        }
        this.wrotePosition.addAndGet(result.getWroteBytes());
        this.storeTimestamp = result.getStoreTimestamp();
        return result;
    }
    // 如果currentPos大于或等于文件大小,表明文件已写满,抛出异常
    log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
    return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}

3.3 提交(commit)

commit()方法:内存映射的提交,commit操作主要针对异步刷盘模式

commitLeastPages 为本次提交最小的页数,如果待提交数据不满commitLeastPages(默认4*4kb),则不执行本次提交操作,待下次提交。commit的作用就是将writeBuffer 中的数据提交到FileChannel中。

/**
 * commitLeastPages 为本次提交最小的页面,默认4页(4*4KB),可参见
 * org.apache.rocketmq.store.CommitLog.CommitRealTimeService#run()
 */
public int commit(final int commitLeastPages) {
    /**
     * 1.writeBuffer 为空就不提交,而writeBuffer只有开启
     * transientStorePoolEnable为true并且是异步刷盘模式才会不为空
     * 所以commit是针对异步刷盘使用的
     */
    if (writeBuffer == null) {
        //no need to commit data to file channel, so just regard wrotePosition as committedPosition.
        return this.wrotePosition.get();
    }
    if (this.isAbleToCommit(commitLeastPages)) {
        if (this.hold()) {
            commit0(commitLeastPages);
            this.release();
        } else {
            log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get());
        }
    }

    // All dirty data has been committed to FileChannel.
    if (writeBuffer != null && this.transientStorePool != null && this.fileSize == this.committedPosition.get()) {
        //清理工作,归还到堆外内存池中,并且释放当前writeBuffer
        this.transientStorePool.returnBuffer(writeBuffer);
        this.writeBuffer = null;
    }

    return this.committedPosition.get();
}

protected void commit0(final int commitLeastPages) {
    int writePos = this.wrotePosition.get();
    int lastCommittedPosition = this.committedPosition.get();

    if (writePos - this.committedPosition.get() > 0) {
        try {
            //创建writeBuffer的共享缓存区,slice共享内存,其实就是切片
            //但是position、mark、limit单独维护
            //新缓冲区的position=0,其capacity和limit将是缓冲区中剩余的字节数,其mark=undefined
            ByteBuffer byteBuffer = writeBuffer.slice();
            //上一次的提交指针作为position
            byteBuffer.position(lastCommittedPosition);
            //当前最大的写指针作为limit
            byteBuffer.limit(writePos);
            //把commitedPosition到wrotePosition的写入FileChannel中
            this.fileChannel.position(lastCommittedPosition);
            this.fileChannel.write(byteBuffer);
            //更新提交指针
            this.committedPosition.set(writePos);
        } catch (Throwable e) {
            log.error("Error occurred when commit data to FileChannel.", e);
        }
    }
}

/**
 * 是否能够flush
 *  1. 文件已经写满
 *  2. flushLeastPages > 0 && 未flush部分超过flushLeastPages
 *  3. flushLeastPages==0&&有新写入的部分
 * @param flushLeastPages flush最小分页
 *      mmap映射后的内存一般是内存页大小的倍数,而内存页大小一般为4K,所以写入到映射内存的数据大小可以以4K进行分页,
 *      而flushLeastPages这个参数只是指示写了多少页后才可以强制将映射内存区域的数据强行写入到磁盘文件
 * @return
 */
protected boolean isAbleToCommit(final int commitLeastPages) {
    int flush = this.committedPosition.get();
    int write = this.wrotePosition.get();
		// 如果文件满了(文件大小与写入位置一样),则返回true
    if (this.isFull()) {
        return true;
    }

    if (commitLeastPages > 0) {
        //总共写入的页大小-已经提交的页大小>=最少一次写入的页大小,OS_PAGE_SIZE默认4kb
        return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= commitLeastPages;
    }

    return write > flush;
}

public synchronized boolean hold() {
    if (this.isAvailable()) {
        if (this.refCount.getAndIncrement() > 0) {
            return true;
        } else {
            this.refCount.getAndDecrement();
        }
    }

    return false;
}

public void release() {
    long value = this.refCount.decrementAndGet();
    if (value > 0)
        return;

    synchronized (this) {
        //如果引用计数等于0,则执行清理堆外内存
        this.cleanupOver = this.cleanup(value);
    }
}

3.4 刷盘(flush)

flush操作是将内存中的数据永久的写入磁盘。刷写磁盘是直接调用MappedByteBufferFileChannelforce()将内存中的数据持久化到磁盘。

/**
 * @return The current flushed position
 *
 * 刷盘指的是将内存中的数据写入磁盘,永久存储在磁盘中
 *
 */
public int flush(final int flushLeastPages) {
    if (this.isAbleToFlush(flushLeastPages)) {
        if (this.hold()) {
            // todo flushedPosition应该等于MappedByteBuffer中的写指针
            int value = getReadPosition();

            try {
                //We only append data to fileChannel or mappedByteBuffer, never both.
                if (writeBuffer != null || this.fileChannel.position() != 0) {
                    // todo 将内存中数据持久化到磁盘
                    this.fileChannel.force(false);
                } else {
                    //
                    this.mappedByteBuffer.force();
                }
            } catch (Throwable e) {
                log.error("Error occurred when force data to disk.", e);
            }
            // 设置
            this.flushedPosition.set(value);
            this.release();
        } else {
            log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());
            this.flushedPosition.set(getReadPosition());
        }
    }
    return this.getFlushedPosition();
}

3.5 预热(warm)

对当前映射文件进行预热:

  • 第一步:对当前映射文件的每个内存页写入一个字节0。当刷盘策略为同步刷盘时,执行强制刷盘,并且是每修改pages(默认是16MB)个分页刷一次盘
  • 第二步:将当前MappedFile全部的地址空间锁定在物理存储中,防止其被交换到swap空间。再调用madvise,传入 MADV_WILLNEED 策略,将刚刚锁住的内存预热,其实就是告诉内核, 我马上就要用(MADV_WILLNEED)这块内存,先做虚拟内存到物理内存的映射,防止正式使用时产生缺页中断。

使用mmap()内存分配时,只是建立了进程虚拟地址空间,并没有分配虚拟内存对应的物理内存。当进程访问这些没有建立映射关系的虚拟内存时,处理器自动触发一个缺页异常,进而进入内核空间分配物理内存、更新进程缓存表,最后返回用户空间,恢复进程运行。写入假值0的意义在于实际分配物理内存,在消息写入时防止缺页异常

源码

/**
 * 1. 对当前映射文件进行预热
 *   1.1. 先对当前映射文件的每个内存页写入一个字节0.当刷盘策略为同步刷盘时,执行强制刷盘,并且是每修改pages个分页刷一次盘
 *  再将当前MappedFile全部的地址空间锁定,防止被swap
 *   1.2. 然后将当前MappedFile全部的地址空间锁定在物理存储中,防止其被交换到swap空间。再调用madvise,传入 WILL_NEED 策略,将刚刚锁住的内存预热,其实就是告诉内核,我马上就要用(WILL_NEED)这块内存,先做虚拟内存到物理内存的映射,防止正式使用时产生缺页中断。
 *  2. 只要启用缓存预热,都会通过mappedByteBuffer来写入假值(字节0),并且都会对mappedByteBuffer执行mlock和madvise。
 * @param type 刷盘策略
 * @param pages 预热时一次刷盘的分页数
 */
public void warmMappedFile(FlushDiskType type, int pages) {
    long beginTime = System.currentTimeMillis();
    ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
    //上一次刷盘的位置
    int flush = 0;
    long time = System.currentTimeMillis();
    for (int i = 0, j = 0; i < this.fileSize; i += MappedFile.OS_PAGE_SIZE, j++) {
        byteBuffer.put(i, (byte) 0);
        // force flush when flush disk type is sync
        if (type == FlushDiskType.SYNC_FLUSH) {
            /**
             *  同步刷盘,每修改pages个分页强制刷一次盘,默认16MB
             * 参见org.apache.rocketmq.store.config.MessageStoreConfig#flushLeastPagesWhenWarmMapedFile
             */
            if ((i / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE) >= pages) {
                flush = i;
                //FIXME 刷入修改的内容,不会有性能问题??
                mappedByteBuffer.force();
            }
        }

        // prevent gc
        if (j % 1000 == 0) {
            log.info("j={}, costTime={}", j, System.currentTimeMillis() - time);
            time = System.currentTimeMillis();
            try {
                /***
                 * Thread.yield与Thread.sleep(0);相同,jvm底层使用的就是os::yield();
                 * https://www.jianshu.com/p/0964124ae822
                 * openJdk源码thread.c jvm.cpp
                 */
                Thread.sleep(0);
            } catch (InterruptedException e) {
                log.error("Interrupted", e);
            }
        }
    }

    // force flush when prepare load finished
    if (type == FlushDiskType.SYNC_FLUSH) {
        log.info("mapped file warm-up done, force to disk, mappedFile={}, costTime={}",
            this.getFileName(), System.currentTimeMillis() - beginTime);
        mappedByteBuffer.force();
    }
    log.info("mapped file warm-up done. mappedFile={}, costTime={}", this.getFileName(),
        System.currentTimeMillis() - beginTime);

    this.mlock();
}

3.6 预读(mlock与munlock)

在读取CommitLog时,虽然可以通过PageCache提高目标消息直接在物理内存中读取的命中率。但是由于CommitLog存放的是所有Topic的消息,在读取时是随机访问,所以仍会出现缺页中断问题,导致内存被频繁换入换出。为此,RocketMQ使用了mlock系统调用,将mmap调用后所占用的堆外内存锁定,变为常驻内存,进一步让目标消息更多的在内存中读取。

mlock这个方法是一个Native级别的调用,调用了标准C库的方法 mlock方法。在标准C中的实现是将锁住指定的内存区域避免被操作系统调到swap空间中,

通过mmap建立的内存文件,在开始时只是建立一个映射关系,当读取相应区域的时候,第一次还是会去读磁盘,后续读写基本上与Page Cache交互。当读相对应页没有拿到数据的时候,系统将会产生一个缺页异常。madvise的作用是一次性先将一段数据读入到映射内存区域,这样就减少了缺页异常的产生, 不过mlock和madvise在windows下的C库没有

madvise系统调用有两个参数:地址指针、区间长度。madvise会向内核提供一个针对于进程虚拟地址区间的I/O建议,内核可能会采纳这个建议,进行预读。

RocketMQ使用net.java.dev.jna:jna:4.2.2,自己创建一个 LibC类继承Library

import com.sun.jna.Library;
import com.sun.jna.Native;
import com.sun.jna.NativeLong;
import com.sun.jna.Platform;
import com.sun.jna.Pointer;

public interface LibC extends Library {
    LibC INSTANCE = (LibC) Native.loadLibrary(Platform.isWindows() ? "msvcrt" : "c", LibC.class);

    int MADV_WILLNEED = 3;
    int MADV_DONTNEED = 4;

    int MCL_CURRENT = 1;
    int MCL_FUTURE = 2;
    int MCL_ONFAULT = 4;

    /* sync memory asynchronously */
    int MS_ASYNC = 0x0001;
    /* invalidate mappings & caches */
    int MS_INVALIDATE = 0x0002;
    /* synchronous memory sync */
    int MS_SYNC = 0x0004;

    int mlock(Pointer var1, NativeLong var2);

    int munlock(Pointer var1, NativeLong var2);

    int madvise(Pointer var1, NativeLong var2, int var3);

    Pointer memset(Pointer p, int v, long len);

    int mlockall(int flags);

    int msync(Pointer p, NativeLong length, int flags);
}

调用mmap()时内核只是建立了逻辑地址到物理地址的映射表,并没有映射任何数据到内存。 在你要访问数据时内核会检查数据所在分页是否在内存,如果不在,则发出一次缺页中断, linux默认分页为4K,1G的消息存储文件要发生很多次中断。

解决办法:将madvise()mmap()搭配起来使用,在使用数据前告诉内核这一段数据需要使用,将其一次读入内存。 madvise()这个函数可以对映射的内存提出使用建议,从而减少在程序运行时的硬盘缺页中断。

mlock和munlock源码:

public void mlock() {
    final long beginTime = System.currentTimeMillis();
    final long address = ((DirectBuffer) (this.mappedByteBuffer)).address();
    Pointer pointer = new Pointer(address);
    {
        // 内存锁定
        // 通过mlock可以将进程使用的部分或者全部的地址空间锁定在物理内存中,防止其被交换到swap空间。
        // 对时间敏感的应用会希望全部使用物理内存,提高数据访问和操作的效率。
        int ret = LibC.INSTANCE.mlock(pointer, new NativeLong(this.fileSize));
        log.info("mlock {} {} {} ret = {} time consuming = {}", address, this.fileName, this.fileSize, ret, System.currentTimeMillis() - beginTime);
    }

    {
        //文件预读
        //madvise 一次性先将一段数据读入到映射内存区域,这样就减少了缺页异常的产生。
        int ret = LibC.INSTANCE.madvise(pointer, new NativeLong(this.fileSize), LibC.MADV_WILLNEED);
        log.info("madvise {} {} {} ret = {} time consuming = {}", address, this.fileName, this.fileSize, ret, System.currentTimeMillis() - beginTime);
    }
}

public void munlock() {
    final long beginTime = System.currentTimeMillis();
    final long address = ((DirectBuffer) (this.mappedByteBuffer)).address();
    Pointer pointer = new Pointer(address);
    int ret = LibC.INSTANCE.munlock(pointer, new NativeLong(this.fileSize));
    log.info("munlock {} {} {} ret = {} time consuming = {}", address, this.fileName, this.fileSize, ret, System.currentTimeMillis() - beginTime);
}

3.7 清理(cleanup)

JDK默认公开释放MappedByteBuffer的方法,只能通过反射的方式

@Override
public boolean cleanup(final long currentRef) {
    if (this.isAvailable()) {
        log.error("this file[REF:" + currentRef + "] " + this.fileName
            + " have not shutdown, stop unmapping.");
        return false;
    }

    if (this.isCleanupOver()) {
        log.error("this file[REF:" + currentRef + "] " + this.fileName
            + " have cleanup, do not do it again.");
        return true;
    }

    clean(this.mappedByteBuffer);
    //加一个fileSize大小的负数值
    TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(this.fileSize * (-1));
    TOTAL_MAPPED_FILES.decrementAndGet();
    log.info("unmap file[REF:" + currentRef + "] " + this.fileName + " OK");
    return true;
}
/**
 * 通过反射清理MappedByteBuffer
 * @param buffer
 */
public static void clean(final ByteBuffer buffer) {
    if (buffer == null || !buffer.isDirect() || buffer.capacity() == 0)
        return;
    /**
     * 嵌套递归获取directByteBuffer的最内部的attachment或者viewedBuffer方法
     * 获取directByteBuffer的Cleaner对象,然后调用cleaner.clean方法,进行释放资源
     *
     */
    invoke(invoke(viewed(buffer), "cleaner"), "clean");
}
private static Object invoke(final Object target, final String methodName, final Class<?>... args) {
    return AccessController.doPrivileged(new PrivilegedAction<Object>() {
        public Object run() {
            try {
                Method method = method(target, methodName, args);
                method.setAccessible(true);
                return method.invoke(target);
            } catch (Exception e) {
                throw new IllegalStateException(e);
            }
        }
    });
}
private static ByteBuffer viewed(ByteBuffer buffer) {
    String methodName = "viewedBuffer";

    Method[] methods = buffer.getClass().getMethods();
    for (int i = 0; i < methods.length; i++) {
        if (methods[i].getName().equals("attachment")) {
            methodName = "attachment";
            break;
        }
    }

    ByteBuffer viewedBuffer = (ByteBuffer) invoke(buffer, methodName);
    if (viewedBuffer == null)
        return buffer;
    else
        return viewed(viewedBuffer);
}

参考文章

RocketMQ4.8注释github地址 RockeMQ源码分析 RocketMQ源码专栏