likes
comments
collection
share

深入分析RocketMQ的Broker端消息分发流程

作者站长头像
站长
· 阅读数 4

最近因为兴趣使然,又一次尝试着手写一个rmq,但是发现底层依然有很多比较棘手的问题难以实现,于是就开始翻开rmq的broker端源代码进行学习。本文主要是记录在学习broker源码的时候的一些内容。

Rocketmq的消息处理流程

深入分析RocketMQ的Broker端消息分发流程

  • 消息接收:消息接收是指接收producer的消息,处理类是SendMessageProcessor,将消息写入到commigLog文件后,接收流程处理完毕;

  • 消息分发broker处理消息分发的类是ReputMessageService,它会启动一个线程,不断地将commitLong分到到对应的consumerQueue,这一步操作会写两个文件:consumerQueueindexFile,写入后,消息分发流程处理 完毕;

  • 消息投递:消息投递是指将消息发往consumer的流程,consumer会发起获取消息的请求,broker收到请求后,调用PullMessageProcessor类处理,从consumerQueue文件获取消息,返回给consumer后,投递流程处理完毕。

消息写入部分涉及到的点非常多,有写入commitLog,写入indexFile,还有写入consumerQueue。

commitLog部分的写入非常简单,就是一个追加写入mmap的过程,这块我不做过多分析,关于写入indexFile和consumerQueue是比较复杂的部分,由于精力有限,本文重点介绍consumerQueue部分。

Broker端Dispatcher的原理

Dispatcher是Broker端的一个消息写入机制,大家可以理解为是当RocketMQ的消息写入到commitlog后,接着broker会对这些消息进行分发操作,将commitLog的消息转为索引信息,然后写入到consumerQueue文件中。

Dispatcher内部的源代码位于 BrokerController#start 方法中,是一个单独的线程负责这部分的工作。 BrokerController#start 方法的源代码如下:

public void start() throws Exception {
    // 启动各组件
    if (this.messageStore != null) {
        this.messageStore.start();
    }
    ...
}

这个messageStore的具体实现是 DefaultMessageStore类 ,所以我们debug切入到DefaultMessageStore的start函数中观察流程:

public void start() throws Exception {
    ...
    // 处理 maxPhysicalPosInLogicQueue 的值
    long maxPhysicalPosInLogicQueue = commitLog.getMinOffset();
    for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
        for (ConsumeQueue logic : maps.values()) {
            if (logic.getMaxPhysicOffset() > maxPhysicalPosInLogicQueue) {
                maxPhysicalPosInLogicQueue = logic.getMaxPhysicOffset();
            }
        }
    }
    if (maxPhysicalPosInLogicQueue < 0) {
        maxPhysicalPosInLogicQueue = 0;
    }
    if (maxPhysicalPosInLogicQueue < this.commitLog.getMinOffset()) {
        maxPhysicalPosInLogicQueue = this.commitLog.getMinOffset();
    }
    this.reputMessageService.setReputFromOffset(maxPhysicalPosInLogicQueue);
    // 消息分发操作,启动新线程来处理
    this.reputMessageService.start();
    ...
}

这段源代码中的reputMessageService线程,是一个定时任务,每隔1ms进行一次消息分发操作,也就是下边的reput函数:

class ReputMessageService extends ServiceThread {
    @Override
    public void run() {
        DefaultMessageStore.log.info(...);

        while (!this.isStopped()) {
            try {
                Thread.sleep(1);
                // 调用的是 doReput() 方法
                this.doReput();
            } catch (Exception e) {
                DefaultMessageStore.log.warn(...);
            }
        }

        DefaultMessageStore.log.info(...);
    }
}

reput部分的源代码:


private void doReput() {
    // 处理 reputFromOffset
    if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
        log.warn(...);
        this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
    }
    for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {

        if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()
                && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
            break;
        }

        // 从CommitLog中获取需要进行转发的消息
        SelectMappedBufferResult result 
            = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
        if (result != null) {
            try {
                this.reputFromOffset = result.getStartOffset();

                for (int readSize = 0; readSize < result.getSize() && doNext; ) {
                    // 检验数据
                    DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog
                        .checkMessageAndReturnSize(result.getByteBuffer(), false, false);
                    int size = dispatchRequest.getBufferSize() == -1 
                        ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();

                    if (dispatchRequest.isSuccess()) {
                        if (size > 0) {
                            // 分发消息
                            DefaultMessageStore.this.doDispatch(dispatchRequest);
                            // 长轮询:如果有消息到了主节点,并且开启了长轮询
                            if (BrokerRole.SLAVE != DefaultMessageStore.this
                                    .getMessageStoreConfig().getBrokerRole()
                                    &&DefaultMessageStore.this.brokerConfig.isLongPollingEnable()){
                                // 调用NotifyMessageArrivingListener的arriving方法
                                DefaultMessageStore.this.messageArrivingListener.arriving(
                                    dispatchRequest.getTopic(),
                                    dispatchRequest.getQueueId(), 
                                    dispatchRequest.getConsumeQueueOffset() + 1,
                                    dispatchRequest.getTagsCode(), 
                                    dispatchRequest.getStoreTimestamp(),
                                    dispatchRequest.getBitMap(), 
                                    dispatchRequest.getPropertiesMap());
                            }

                            ...
                        } else if (size == 0) {
                            ...
                        }
                    } else if (!dispatchRequest.isSuccess()) {
                        ...
                    }
                }
            } finally {
                result.release();
            }
        } else {
            doNext = false;
        }
    }
}

该方法依旧很长,我们重点关注与分发相关的流程:

  • commitLog.getData(...)

CommitLog中获取DispatchRequest需要分发的消息,参数reputFromOffset就是消息在文件中的偏移量

  • this.doDispatch(...)

分发操作,就是把消息的相关写入ConsumeQueueIndexFile两个文件中

  • 是否为主判断

如果当前节点为主节点,且启用了长轮询,则调用NotifyMessageArrivingListenerarriving方法,在这里会把消息主动投递到consumer

总的来说,当消息写入到commitLog后,ReputMessage会根据上一次分发消息的偏移量依次从commitLog文件中读取消息信息,写入到ConsumeQueueIndexFile两个文件中,当然了,这里写入的只是消息的发送时间、在commitLog中的位置信息,完整的消息只有commitLog文件才存在。

写完这两个文件后,接下来就等待consumer来拉取消息了。当然,consumer主动来拉取可能会导致消息无法实时送达,为解决这个问题,rocketMq给出的解决方案是长轮询,具体为:如果当前没有消息,就holdconsumer的请求30s,这30s内一旦有消息过来,就及时唤醒consumer的请求,实际将消息发送出去,就也是**NotifyMessageArrivingListener#arriving**方法所做的工作。

这里的分析,可以看出写入部分还有非常多的细节需要探讨,这里我们重点来看dispatcher的部分逻辑,其余的大家可以自行下载源码进行分析。

Dispatcher函数的源码流程分析

我们再来看看消息分发消息,进入DefaultMessageStore#doDispatch


public class DefaultMessageStore implements MessageStore {

    private final LinkedList<CommitLogDispatcher> dispatcherList;

    /**
     * DefaultMessageStore 构造方法
     */
    public DefaultMessageStore(...) throws IOException {
        ...
        // 消息分发处理
        this.dispatcherList = new LinkedList<>();
        // 写入 ConsumeQueue 文件
        this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue());
        // 写入 Index 文件
        this.dispatcherList.addLast(new CommitLogDispatcherBuildIndex());
        ...
    }

    /**
     * 分发操作
     */
    public void doDispatch(DispatchRequest req) {
        // 进行分发操作,dispatcherList 包含两个对象:
        // 1. CommitLogDispatcherBuildConsumeQueue:写入 ConsumeQueue 文件
        // 2. CommitLogDispatcherBuildIndex:写入 Index 文件
        for (CommitLogDispatcher dispatcher : this.dispatcherList) {
            dispatcher.dispatch(req);
        }
    }
}

从整个方法的运行来看,DefaultMessageStore在创建时,会准备两个CommitLogDispatcher,它们分别如下:

  • CommitLogDispatcherBuildConsumeQueue

处理ConsumeQueue文件的写入。

  • CommitLogDispatcherBuildIndex

处理IndexFile文件的写入,在DefaultMessageStore#doDispatch方法中,就是对这两个文件的写入操作了。


/**
 * consumerQueue 文件分发的构建器
 */
class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {

    @Override
    public void dispatch(DispatchRequest request) {
        final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
        switch (tranType) {
            case MessageSysFlag.TRANSACTION_NOT_TYPE:
            case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                // 将消息在commitLog文件的位置、tags等信息写入ConsumerQueue文件
                DefaultMessageStore.this.putMessagePositionInfo(request);
                break;
            case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
            case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                break;
        }
    }
}

/**
 * indexFile 文件分发的构建器
 */
class CommitLogDispatcherBuildIndex implements CommitLogDispatcher {

    @Override
    public void dispatch(DispatchRequest request) {
        if (DefaultMessageStore.this.messageStoreConfig.isMessageIndexEnable()) {
            DefaultMessageStore.this.indexService.buildIndex(request);
        }
    }
}

需要注意的是,在这两个文件中,写入的仅是消息的位置信息,完整的消息内容仅在commitLog中保存。

上边代码中的dispatch还是比较简单,就是判断一下这个消息有关事务的状态,如果是事务半提交阶段,或者是回滚了事务的消息就不做buildConsumeQueue处理。

这里各位读者们可以试想一下,ConsumeQueue是干什么的,它主要是面向消息消费者的,在ConsumeQueue里面的消息都是能被消息消费者看到的。

所以如果是还没提交事务的消息或者已经回滚了的消息是不可以直接就写入到ConsumeQueue的,而是会被写入到另一个队列中,这一点在另一处源码中有所体现。

好了,下边我们接下来继续深入分析刚才dispatcher函数内部的putMessagePositionInfo方法:


// 将msg的位置信息 放到consumerQueue里面
public void putMessagePositionInfo(DispatchRequest dispatchRequest) {
    // todo 根据消息主题与队列ID,先获取对应的ConsumeQueue文件
    ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());
    // todo 将内容追加到consumeQueue的内存映射文件中
    cq.putMessagePositionInfoWrapper(dispatchRequest);
}

这里会找到这个消息对应的 consumeQueue ,接着根据topic 与queueId来找具体的 consumeQueue 内存映射对象:


/**
 * 根据消息主题与队列ID,先获取对应的ConsumeQueue文件
 *
 * 因为每一个消息主题对应一个ConsumeQueue目
 * 录,主题下每一个消息队列对应一个文件夹,所以取出该文件夹最后
 * 的ConsumeQueue文件即可
 */
public ConsumeQueue findConsumeQueue(String topic, int queueId) {
    ConcurrentMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);
    if (null == map) {
        ConcurrentMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128);
        ConcurrentMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);
        if (oldMap != null) {
            map = oldMap;
        } else {
            map = newMap;
        }
    }

    ConsumeQueue logic = map.get(queueId);
    if (null == logic) {
        ConsumeQueue newLogic = new ConsumeQueue(
            topic,
            queueId,
            StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
            this.getMessageStoreConfig().getMappedFileSizeConsumeQueue(),
            this);
        ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);
        if (oldLogic != null) {
            logic = oldLogic;
        } else {
            logic = newLogic;
        }
    }

    return logic;
}

可以看下,逻辑就是从这个consumeQueueTable 缓存中获取,没有的话就创建,consumeQueueTable 就是个map。

private final ConcurrentMap<String/* topic */, ConcurrentMap<Integer/* queueId */, ConsumeQueue>> consumeQueueTable;

接着就是调用consumeQueue对象的putMessagePositionInfoWrapper 方法来处理:


public void putMessagePositionInfoWrapper(DispatchRequest request) {

    final int maxRetries = 30;
    // 是否可以写
    boolean canWrite = this.defaultMessageStore.getRunningFlags().isCQWriteable();
    for (int i = 0; i < maxRetries && canWrite; i++) {
        ...
        // todo 写入consumerQueue 队列中
        boolean result = this.putMessagePositionInfo(request.getCommitLogOffset(),
            request.getMsgSize(), tagsCode, request.getConsumeQueueOffset());
        if (result) {
            if (this.defaultMessageStore.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE ||
                this.defaultMessageStore.getMessageStoreConfig().isEnableDLegerCommitLog()) {
                this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(request.getStoreTimestamp());
            }
            this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp());
            return;
        }
        ...
}

for循环是重试的,可以看到重试30次,需要重点关注的是putMessagePositionInfo方法,我们来看下:


private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
    final long cqOffset) {

    if (offset + size <= this.maxPhysicOffset) {
        log.warn("Maybe try to build consume queue repeatedly maxPhysicOffset={} phyOffset={}", maxPhysicOffset, offset);
        return true;
    }

    // 依次将消息偏移量、消息长度、tag哈希码写入
    //ByteBuffer,并根据consumeQueueOffset计算ConsumeQueue中的物理
    //地址,将内容追加到ConsumeQueue的内存映射文件中(本操作只追
    //加,不刷盘),ConsumeQueue的刷盘方式固定为异步刷盘
    this.byteBufferIndex.flip();
    this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
    this.byteBufferIndex.putLong(offset);
    this.byteBufferIndex.putInt(size);
    this.byteBufferIndex.putLong(tagsCode);

    // 根据 consumeQueue offset 计算在 ConsumeQueue 的位置
    final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;

    MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
    if (mappedFile != null) {
        // 判断mappedFile 是否是第一个 且 cqOffset 不是0 且mappedFile 写位置是0
        if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) {
            // 设置 最小offset
            this.minLogicOffset = expectLogicOffset;
            // 设置 从哪开始 offset
            this.mappedFileQueue.setFlushedWhere(expectLogicOffset);
            // 设置从哪开始commit
            this.mappedFileQueue.setCommittedWhere(expectLogicOffset);
            this.fillPreBlank(mappedFile, expectLogicOffset);
            log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " "
                + mappedFile.getWrotePosition());
        }

        if (cqOffset != 0) {
            // 当前在这个consumeQueue 的offset
            long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();
            // 你现在要插入offset 比当前在这个consumeQueue 的offset要小,这个就是说明 你在找之前的位置插入,但是人家已经有东西了
            // 要是让你插入的话 就会造成重复,所以这里不让你插入的
            if (expectLogicOffset < currentLogicOffset) {
                log.warn("Build  consume queue repeatedly, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
                    expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset);
                return true;
            }

            // 按照正常情况下是一样大的,不一样大打印错误日志
            if (expectLogicOffset != currentLogicOffset) {
                LOG_ERROR.warn(
                    "[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
                    expectLogicOffset,
                    currentLogicOffset,
                    this.topic,
                    this.queueId,
                    expectLogicOffset - currentLogicOffset
                );
            }
        }
        // 设置最大的 物理offset
        this.maxPhysicOffset = offset + size;
        // todo 追加消息
        return mappedFile.appendMessage(this.byteBufferIndex.array());
    }
    return false;
}

putMessagePositionInfo 方法中重要的就是封装要存入consumeQueue的东西,可以看到有 在commitlog中的offset,消息的大小,这个tagcode就是关于tag一些东西,其实就是tag的一个hashcode,可以看到加起来一共是20字节,通过queue offset 计算出在consumeQueue中的一个偏移量。

接着就是获取MappedFile,下面这一堆就是校验的了,可以看到最后执行appendMessage操作了,就是把上面组织的内容写入到buffer中:

public boolean appendMessage(final byte[] data) {
    // 获取当前写的位置
    int currentPos = this.wrotePosition.get();

    // 判断在这个MappedFile中能不能 放下
    if ((currentPos + data.length) <= this.fileSize) {
        try {
            // 写入消息
            this.fileChannel.position(currentPos);
            this.fileChannel.write(ByteBuffer.wrap(data));
        } catch (Throwable e) {
            log.error("Error occurred when append message to mappedFile.", e);
        }
        // 重置 写入消息
        this.wrotePosition.addAndGet(data.length);
        return true;
    }

    return false;
}

好了,到这对于dispatcher中reput函数的分析就结束了,希望能够对各位读者们有所帮助。