RocketMQ源码11-broker 消息分发流程(ConsumeQueue和BuildIndex)
本系列RocketMQ4.8注释github地址,希望对大家有所帮助,要是觉得可以的话麻烦给点一下Star哈
RocketMq
消息处理整个流程如下:
- 消息接收:消息接收是指接收
producer
的消息,处理类是SendMessageProcessor
,将消息写入到commigLog
文件后,接收流程处理完毕; - 消息分发:
broker
处理消息分发的类是ReputMessageService
,它会启动一个线程,不断地将commitLong
分到到对应的consumerQueue
,这一步操作会写两个文件:consumerQueue
与indexFile
,写入后,消息分发流程处理 完毕; - 消息投递:消息投递是指将消息发往
consumer
的流程,consumer
会发起获取消息的请求,broker
收到请求后,调用PullMessageProcessor
类处理,从consumerQueue
文件获取消息,返回给consumer
后,投递流程处理完毕。
以上就是rocketMq
处理消息的流程了,接下来我们就从源码来分析消息分发的实现。
1. 分发线程的启动
消息写入到commitlog
后,接着broker
会对这些消息进行分发操作,这里的分发,是指broker
将消息写入到consumerQueue
文件中。
broker
消息分发的操作是在一个单独的线程中进行的,这里我们来回忆下BrokerController
的启动流程,进入BrokerController#start
方法:
public void start() throws Exception {
// 启动各组件
if (this.messageStore != null) {
this.messageStore.start();
}
...
}
继续进入DefaultMessageStore#start
方法:
public void start() throws Exception {
...
// 处理 maxPhysicalPosInLogicQueue 的值
long maxPhysicalPosInLogicQueue = commitLog.getMinOffset();
for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
for (ConsumeQueue logic : maps.values()) {
if (logic.getMaxPhysicOffset() > maxPhysicalPosInLogicQueue) {
maxPhysicalPosInLogicQueue = logic.getMaxPhysicOffset();
}
}
}
if (maxPhysicalPosInLogicQueue < 0) {
maxPhysicalPosInLogicQueue = 0;
}
if (maxPhysicalPosInLogicQueue < this.commitLog.getMinOffset()) {
maxPhysicalPosInLogicQueue = this.commitLog.getMinOffset();
}
this.reputMessageService.setReputFromOffset(maxPhysicalPosInLogicQueue);
// 消息分发操作,启动新线程来处理
this.reputMessageService.start();
...
}
在BrokerController
启动时,会处理maxPhysicalPosInLogicQueue
的值,这个值就是分发commitlog
消息的偏移量,之后就启动ReputMessageService
服务来处理。ReputMessageService
是DefaultMessageStore
的内部类,它是ServiceThread
的子类,start()
方法如下:
public abstract class ServiceThread implements Runnable {
public void start() {
if (!started.compareAndSet(false, true)) {
return;
}
stopped = false;
this.thread = new Thread(this, getServiceName());
this.thread.setDaemon(isDaemon);
this.thread.start();
}
...
}
这个方法仅仅是处理线程的启动,我们继续看ServiceThread
。ServiceThread
是Runnable
的子类,它的run()
方法如下:
class ReputMessageService extends ServiceThread {
@Override
public void run() {
DefaultMessageStore.log.info(...);
while (!this.isStopped()) {
try {
Thread.sleep(1);
// 调用的是 doReput() 方法
this.doReput();
} catch (Exception e) {
DefaultMessageStore.log.warn(...);
}
}
DefaultMessageStore.log.info(...);
}
}
从ReputMessageService#run()
方法来看,该线程会休眠1ms
,然后调用doReput()
方法处理,看来doReput()
方法就是关键了!
2. 消息分发:DefaultMessageStore.ReputMessageService#doReput
我们进入DefaultMessageStore.ReputMessageService#doReput
方法:
private void doReput() {
// 处理 reputFromOffset
if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
log.warn(...);
this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
}
for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()
&& this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
break;
}
// 从CommitLog中获取需要进行转发的消息
SelectMappedBufferResult result
= DefaultMessageStore.this.commitLog.getData(reputFromOffset);
if (result != null) {
try {
this.reputFromOffset = result.getStartOffset();
for (int readSize = 0; readSize < result.getSize() && doNext; ) {
// 检验数据
DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog
.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
int size = dispatchRequest.getBufferSize() == -1
? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();
if (dispatchRequest.isSuccess()) {
if (size > 0) {
// 分发消息
DefaultMessageStore.this.doDispatch(dispatchRequest);
// 长轮询:如果有消息到了主节点,并且开启了长轮询
if (BrokerRole.SLAVE != DefaultMessageStore.this
.getMessageStoreConfig().getBrokerRole()
&&DefaultMessageStore.this.brokerConfig.isLongPollingEnable()){
// 调用NotifyMessageArrivingListener的arriving方法
DefaultMessageStore.this.messageArrivingListener.arriving(
dispatchRequest.getTopic(),
dispatchRequest.getQueueId(),
dispatchRequest.getConsumeQueueOffset() + 1,
dispatchRequest.getTagsCode(),
dispatchRequest.getStoreTimestamp(),
dispatchRequest.getBitMap(),
dispatchRequest.getPropertiesMap());
}
...
} else if (size == 0) {
...
}
} else if (!dispatchRequest.isSuccess()) {
...
}
}
} finally {
result.release();
}
} else {
doNext = false;
}
}
}
该方法依旧很长,我们重点关注与分发相关的流程:
commitLog.getData(...)
:从CommitLog
中获取DispatchRequest
需要分发的消息,参数reputFromOffset
就是消息在文件中的偏移量this.doDispatch(...)
:分发操作,就是把消息的相关写入ConsumeQueue
与IndexFile
两个文件中- 如果当前节点为主节点,且启用了长轮询,则调用
NotifyMessageArrivingListener
的arriving
方法,在这里会把消息主动投递到consumer
总的来说,当消息写入到commitLog
后,ReputMessage
会根据上一次分发消息的偏移量依次从commitLog
文件中读取消息信息,写入到ConsumeQueue
与IndexFile
两个文件中,当然了,这里写入的只是消息的发送时间、在commitLog
中的位置信息,完整的消息只有commitLog
文件才存在。
写完这两个文件后,接下来就等待consumer
来拉取消息了。当然,consumer
主动来拉取可能会导致消息无法实时送达,为解决这个问题,rocketMq
给出的解决方案是长轮询
,具体为:如果当前没有消息,就hold
住consumer
的请求30s,这30s内一旦有消息过来,就及时唤醒consumer
的请求,实际将消息发送出去,就也是NotifyMessageArrivingListener#arriving
方法所做的工作,关于这点我们在分析consumer
拉取消息时再详细分析。
2.1 commitLog.getData
public SelectMappedBufferResult getData(final long offset) {
return this.getData(offset, offset == 0);
}
// 根据offset 从MappedFile中获取数据
public SelectMappedBufferResult getData(final long offset, final boolean returnFirstOnNotFound) {
// 每个MappedFile的大小 默认1G
int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();
MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, returnFirstOnNotFound);
if (mappedFile != null) {
int pos = (int) (offset % mappedFileSize);
SelectMappedBufferResult result = mappedFile.selectMappedBuffer(pos);
return result;
}
return null;
}
先是获取每个MappedFile
文件的大小,这个默认是1G
,然后根据这个offset
获取一下这个offset
所属的MappedFile
,接着就是计算一下这个commitlog offset
在 这个mappedFile
一个位置,我们可以看到其实就是offset% 文件大小
,最后是从这个MappedFile
中获取这个位置的MappedBuffer
,这里解释一下,根据offset
获取这个offset
所在的那个MappedFile
这个代码就不用看了,你直接用offset/文件大小
能算出来在那个MappedFile
中,你要是实在算不出来,你可以commitlog
所有的MappedFile
,它里面有个起始的offset
,只要在起始offset
与起始offset+文件大小
之间就可以了,然后就是从MappedFile
中获取pos
位置的MappedBuffer
我们需要看下:
// 根据位置读取
public SelectMappedBufferResult selectMappedBuffer(int pos) {
// 获取这个MappedFile 里面的一个read position,其实就是这个MappedFile 的一个可读位置
int readPosition = getReadPosition();
if (pos < readPosition && pos >= 0) {
if (this.hold()) {
ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
byteBuffer.position(pos);
int size = readPosition - pos;
ByteBuffer byteBufferNew = byteBuffer.slice();
byteBufferNew.limit(size);
return new SelectMappedBufferResult(this.fileFromOffset + pos, byteBufferNew, size, this);
}
}
return null;
}
这个其实就是获取这个MappedFile 中pos之后到MappedFile 现在写到哪的那块byteBuffer,这块属于nio里面ByteBuffer的一些操作,不懂的可以看下相关的api文档。
2.2 DefaultMessageStore#doDispatch
我们再来看看消息分发消息,进入DefaultMessageStore#doDispatch
:
public class DefaultMessageStore implements MessageStore {
private final LinkedList<CommitLogDispatcher> dispatcherList;
/**
* DefaultMessageStore 构造方法
*/
public DefaultMessageStore(...) throws IOException {
...
// 消息分发处理
this.dispatcherList = new LinkedList<>();
// 写入 ConsumeQueue 文件
this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue());
// 写入 Index 文件
this.dispatcherList.addLast(new CommitLogDispatcherBuildIndex());
...
}
/**
* 分发操作
*/
public void doDispatch(DispatchRequest req) {
// 进行分发操作,dispatcherList 包含两个对象:
// 1. CommitLogDispatcherBuildConsumeQueue:写入 ConsumeQueue 文件
// 2. CommitLogDispatcherBuildIndex:写入 Index 文件
for (CommitLogDispatcher dispatcher : this.dispatcherList) {
dispatcher.dispatch(req);
}
}
}
从整个方法的运行来看,DefaultMessageStore
在创建时,会准备两个CommitLogDispatcher
:
CommitLogDispatcherBuildConsumeQueue
:处理ConsumeQueue
文件的写入CommitLogDispatcherBuildIndex
:处理IndexFile
文件的写入
在DefaultMessageStore#doDispatch
方法中,就是对这两个文件的写入操作了:
/**
* consumerQueue 文件分发的构建器
*/
class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {
@Override
public void dispatch(DispatchRequest request) {
final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
switch (tranType) {
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
// 将消息在commitLog文件的位置、tags等信息写入ConsumerQueue文件
DefaultMessageStore.this.putMessagePositionInfo(request);
break;
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
break;
}
}
}
/**
* indexFile 文件分发的构建器
*/
class CommitLogDispatcherBuildIndex implements CommitLogDispatcher {
@Override
public void dispatch(DispatchRequest request) {
if (DefaultMessageStore.this.messageStoreConfig.isMessageIndexEnable()) {
DefaultMessageStore.this.indexService.buildIndex(request);
}
}
}
需要注意的是,在这两个文件中,写入的仅是消息的位置信息,完整的消息内容仅在commitLog
中保存。
3. ConsumeQueue
3.1 consumeQueue流程介绍
reput
其实就是后台一个线程,不停的从commitlog
取出消息,然后看看这个消息是哪个topic
的哪个queue
的,然后找到对应的consumeQueue
,将消息在commitlog
中的一个offset
还有几个其他信息写入到这个consumeQueue
中,reput
不光写入consumeQueue
这一个事情,还有一个就是为这个消息构建索引
,第4小节说这个。
这里有个最关键的问题是,我这个reput
工作到底从commitlog
的哪个位置进行reput
,总不能从头开始搞吧,其实在MessageStore
也就是存储器最开始start
的时候,会找到一个合适的offset
进行reput
工作,到底怎么找呢?就是把所有的一个consumeQueue
遍历一遍,然后找到那个最大在commitlog 的offset
,是这个样子的,reput
是单线程的,一旦决定从哪个offset
开始,就会顺着往下找,然后往consumeQueue
追加写,找到这个offset
之后,就会启动这个reput
线程进行工作,反正就是不停的从commitlog
中把消息取出来,然后写到对应topic
,queueId
下的consumeQueue
里面去,具体怎样从commitlog中
取出来,存到这个consumeQueue
中这个过程我们会在后面进行源码剖析。
在broker
中,一个topic
下面的一个queue id
会对应一个consumeQueue
,然后一个consumeQueue
会有一个mappedFileQueue
,这mappedFileQueue
其实就是一个集合,然后里面有一堆的MappedFile
,然后每个MappedFile
映射的文件能存储30w条信息
,每条占20个字节,大于400多m空间
。
3.2 源码解析
我们知道存储器MessageStore
启动的时候,会给reput
这个service
设置从哪个offset
开始读取并启动reput service
:
可以看到 先是找到commitlog
中最小的一个offset
,然后与每个consumeQueue
里面存储的那个最大commitlog offset
做比较,选出一个最大的作为起始offset
,因为consumeQueue
是单线程的,所以找出来那个最大的commitlog offset
(其实这里代码称它为PhysicOffset
)是最为准确的,毕竟之前的commitlog offset
对应的消息一定被存储到 consumeQueue
中了,最后就是设置到reput service
中,启动reput service
。
接2.2小节,我们看一下CommitLogDispatcherBuildConsumeQueue.dispatch()
:
/**
* consumerQueue 文件分发的构建器
*/
class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {
@Override
public void dispatch(DispatchRequest request) {
// 事务
final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
switch (tranType) {
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
// todo 将消息在commitLog文件的位置、tags等信息写入ConsumerQueue文件
DefaultMessageStore.this.putMessagePositionInfo(request);
break;
// 如果事务还没有提交 或者 回滚了 就不需要写入consumerQueue
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
break;
}
}
}
这个dispatch
还是比较简单,就是判断一下这个消息有关事务的状态,如果是事务半提交阶段,或者是回滚了事务的消息就不做buildConsumeQueue
处理,可以想一下,ConsumeQueue
是干什么的,它主要是面向消息消费者的,在ConsumeQueue
里面的消息都是能被消息消费者看到的,你还没提交事务的消息或者已经回滚了的消息是不可能给消息消费者看到的,看到那岂不是拿去消费了。
好了我们看下这个存储器里面的putMessagePositionInfo
方法。
// 将msg的位置信息 放到consumerQueue里面
public void putMessagePositionInfo(DispatchRequest dispatchRequest) {
// todo 根据消息主题与队列ID,先获取对应的ConsumeQueue文件
ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());
// todo 将内容追加到consumeQueue的内存映射文件中
cq.putMessagePositionInfoWrapper(dispatchRequest);
}
首先是找到这个消息对应的consumeQueue
,根据topic
与queueId
来找:
/**
* 根据消息主题与队列ID,先获取对应的ConsumeQueue文件
*
* 因为每一个消息主题对应一个ConsumeQueue目
* 录,主题下每一个消息队列对应一个文件夹,所以取出该文件夹最后
* 的ConsumeQueue文件即可
*/
public ConsumeQueue findConsumeQueue(String topic, int queueId) {
ConcurrentMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);
if (null == map) {
ConcurrentMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128);
ConcurrentMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);
if (oldMap != null) {
map = oldMap;
} else {
map = newMap;
}
}
ConsumeQueue logic = map.get(queueId);
if (null == logic) {
ConsumeQueue newLogic = new ConsumeQueue(
topic,
queueId,
StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
this.getMessageStoreConfig().getMappedFileSizeConsumeQueue(),
this);
ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);
if (oldLogic != null) {
logic = oldLogic;
} else {
logic = newLogic;
}
}
return logic;
}
可以看下,很简单,就是从这个consumeQueueTable
缓存中获取,没有的话就创建。consumeQueueTable
就是个map
:
private final ConcurrentMap<String/* topic */, ConcurrentMap<Integer/* queueId */, ConsumeQueue>> consumeQueueTable;
接着就是调用consumeQueue
对象的putMessagePositionInfoWrapper
方法来处理:
public void putMessagePositionInfoWrapper(DispatchRequest request) {
final int maxRetries = 30;
// 是否可以写
boolean canWrite = this.defaultMessageStore.getRunningFlags().isCQWriteable();
for (int i = 0; i < maxRetries && canWrite; i++) {
...
// todo 写入consumerQueue 队列中
boolean result = this.putMessagePositionInfo(request.getCommitLogOffset(),
request.getMsgSize(), tagsCode, request.getConsumeQueueOffset());
if (result) {
if (this.defaultMessageStore.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE ||
this.defaultMessageStore.getMessageStoreConfig().isEnableDLegerCommitLog()) {
this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(request.getStoreTimestamp());
}
this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp());
return;
}
...
}
for循环是重试的,可以看到重试30次
,需要重点关注的是putMessagePositionInfo
方法,我们来看下:
private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
final long cqOffset) {
if (offset + size <= this.maxPhysicOffset) {
log.warn("Maybe try to build consume queue repeatedly maxPhysicOffset={} phyOffset={}", maxPhysicOffset, offset);
return true;
}
// 依次将消息偏移量、消息长度、tag哈希码写入
//ByteBuffer,并根据consumeQueueOffset计算ConsumeQueue中的物理
//地址,将内容追加到ConsumeQueue的内存映射文件中(本操作只追
//加,不刷盘),ConsumeQueue的刷盘方式固定为异步刷盘
this.byteBufferIndex.flip();
this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
this.byteBufferIndex.putLong(offset);
this.byteBufferIndex.putInt(size);
this.byteBufferIndex.putLong(tagsCode);
// 根据 consumeQueue offset 计算在 ConsumeQueue 的位置
final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
if (mappedFile != null) {
// 判断mappedFile 是否是第一个 且 cqOffset 不是0 且mappedFile 写位置是0
if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) {
// 设置 最小offset
this.minLogicOffset = expectLogicOffset;
// 设置 从哪开始 offset
this.mappedFileQueue.setFlushedWhere(expectLogicOffset);
// 设置从哪开始commit
this.mappedFileQueue.setCommittedWhere(expectLogicOffset);
this.fillPreBlank(mappedFile, expectLogicOffset);
log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " "
+ mappedFile.getWrotePosition());
}
if (cqOffset != 0) {
// 当前在这个consumeQueue 的offset
long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();
// 你现在要插入offset 比当前在这个consumeQueue 的offset要小,这个就是说明 你在找之前的位置插入,但是人家已经有东西了
// 要是让你插入的话 就会造成重复,所以这里不让你插入的
if (expectLogicOffset < currentLogicOffset) {
log.warn("Build consume queue repeatedly, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset);
return true;
}
// 按照正常情况下是一样大的,不一样大打印错误日志
if (expectLogicOffset != currentLogicOffset) {
LOG_ERROR.warn(
"[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
expectLogicOffset,
currentLogicOffset,
this.topic,
this.queueId,
expectLogicOffset - currentLogicOffset
);
}
}
// 设置最大的 物理offset
this.maxPhysicOffset = offset + size;
// todo 追加消息
return mappedFile.appendMessage(this.byteBufferIndex.array());
}
return false;
}
putMessagePositionInfo
方法中重要的就是封装要存入consumeQueue
的东西,可以看到有 在commitlog
中的offset
,消息的大小,这个tagcode
就是关于tag
一些东西,其实就是tag
的一个hashcode
,可以看到加起来一共是20字节,通过queue offset
计算出在consumeQueue
中的一个偏移量。接着就是获取MappedFile
,下面这一堆就是校验的了,可以看到最后执行appendMessage
操作了,就是把上面组织的内容写入到buffer
中:
public boolean appendMessage(final byte[] data) {
// 获取当前写的位置
int currentPos = this.wrotePosition.get();
// 判断在这个MappedFile中能不能 放下
if ((currentPos + data.length) <= this.fileSize) {
try {
// 写入消息
this.fileChannel.position(currentPos);
this.fileChannel.write(ByteBuffer.wrap(data));
} catch (Throwable e) {
log.error("Error occurred when append message to mappedFile.", e);
}
// 重置 写入消息
this.wrotePosition.addAndGet(data.length);
return true;
}
return false;
}
好了,到这我们reput
关于ConsumeQueue
的部分就结束了,我们在回到doReput
方法中去:
执行完doDispatch
后,我们看到是master角色的broker
还需要通知一下messageArrivingListener
,这个我们后面再来看,接着就是维护一些offset ,size
之类的东西了,如果是slave
就是记录一下状态。
好了我们doReput
关于consumeQueue
的内容我们就解析完成了,下面附上一张consumeQueue
存储格式内容的图,一共是20字节:
4. BuildIndex
4.1 BuildIndex原理介绍
首先解释下BuildIndex
这个词是什么意思,其实就是一个创建索引
,构建索引的意思,我们知道当消息生产者send一条消息给broker ,broker 先是会将消息存入commitlog
中,并将消息存入结果返回给消息生产者, 然后后台有一个reput的线程,不断的从commitlog
中取出消息来,交给不同的dispatcher
来进行处理,其中有BuildConsumeQueue
这么一个dispatcher
,拿到消息的信息后,按照消息不同topic 不同的queue 找到对应的ConsumeQueue
,然后将消息的在commitlog
中的一个offset,消息的大小,消息tagcode 根据queue offset 写到consumeQueue
对应的位置中,这样做的对于消息消费者端只需要知道 从哪个topic ,哪个queue,哪个位置(queue offset )开始消费就可以了,通过这个几个元素,就可以获取对应消息在commitlog 的offset ,消息的大小,然后拿着commitlog offset
与消息大小,就可以到commitlog
获取到完整的消息。
通过上面的介绍我们知道了consumeQueue
是给消息消费者进行消费使用的,那么我们构建索引是干什么用的呢?比如说,我们想看看某个消息的被哪个消息消费者给消费的,或者是消息丢了,我们看看这个消息有没有被存到broker 上面,我们可以通过RocketMQ提供的可视化界面根据消息的topic,msgId或者topic
,key进行查找,找到你想要的消息,BuildIndex
你可以理解为往HashMap put 元素,它会根据 消息的topic与msgId 或者是topic与key 生成一个 key,然后将那个消息的commitlog
的offset
,key的hash值等元素封装成一个value,写到indexFile
中。
在介绍写入过程之前我们要知道RocketMQ中它存放索引的文件是有多个的,由indexFileList 这个arrayList存着,然后一个索引文件就是一个indexFile,一个indexFile
就对应着一个MappedFile
,一个MappedFile
就对应着一个文件,一个索引文件中默认是有500w个hash槽,2000w个索引
,还有文件头,一个文件头是40个字节,一个hash槽是4个字节,一个索引是20字节,一个indexFile大约就有400多m。
当一个消息的信息dispatch
过来,先是找到一个indexFile
,就是从indexFileList
这个集合中找最后一个,如果是满了的话就创建一个,根据消息的topic与msgId生成一个key,计算出这个key的hash值,计算出这个hash值哪个hash槽上面,根据hash槽找到具体的位置,然后获取里面的值,如果hash槽里面有值了,这个时候就说明之前有消息挂在这个hash上了,就跟我们HashMap的hash冲突一样,封装数据的时候会把上一个hash槽里面的值封装进去,然后将封装的数据存起来,在索引文件头也就是indexFileIndex
中维护了一个indexCount
来记录当前索引文件的数量,然后你每添加一个索引,就会自+1
,你也可以理解indexCount
是当前索引在索引文件中的位置,事实上,索引在文件的位置就会这个值算出来的。hash槽里面存的也是这个值,当你封装好你索引数据并写入的时候,也会将hash里面的值改成你这个indexCount值。
Index结构分析:
下面我贴上一张流程图:
其实这个样子就可以看到,当hash冲突的时候,跟jdk1.8之前的HashMap
差不多一样的解决方案,就是挂链表,就是上图虚线这个样子。
需要注意的是,它不光根据你topic与msgId
创建索引,还会根据你topic与keys创建索引
,那个keys就是你消息生产者在发送消息之前封装Message的时候塞进去的,我们可以看下最佳实践关于keys的描述
4.2 BuildIndex源码解析
这一小节我们将看看BuildIndex
的源码,我们先是从这个dispatch
开始:
/**
* indexFile 文件分发的构建器
*/
class CommitLogDispatcherBuildIndex implements CommitLogDispatcher {
@Override
public void dispatch(DispatchRequest request) {
// 如果messsageIndexEnable设置为true,则调用
//IndexService#buildIndex构建哈希索引,否则忽略本次转发任务
if (DefaultMessageStore.this.messageStoreConfig.isMessageIndexEnable()) {
// todo
DefaultMessageStore.this.indexService.buildIndex(request);
}
}
}
我们可以看到是调用了IndexSerivce
的buildIndex
方法来构建索引,接着再来看下这个buildIndex
方法:
public void buildIndex(DispatchRequest req) {
// 获取或创建Index文件
IndexFile indexFile = retryGetAndCreateIndexFile();
if (indexFile != null) {
// 获取文件最大的物理偏移量
long endPhyOffset = indexFile.getEndPhyOffset();
DispatchRequest msg = req;
String topic = msg.getTopic();
String keys = msg.getKeys();
// 如果该消息的物理偏移量小于Index文件中的物理偏移量,则说明
//是重复数据,忽略本次索引构建
if (msg.getCommitLogOffset() < endPhyOffset) {
return;
}
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
switch (tranType) {
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
break;
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
return;
}
// 如果消息的唯一键不为空,则添加到哈希索引中,以便加速根据唯一键检索消息
if (req.getUniqKey() != null) {
// todo 构建索引键
indexFile = putKey(indexFile, msg, buildKey(topic, req.getUniqKey()));
if (indexFile == null) {
log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey());
return;
}
}
// 构建索引键,RocketMQ支持为同一个消息建立多个索
//引,多个索引键用空格分开
if (keys != null && keys.length() > 0) {
String[] keyset = keys.split(MessageConst.KEY_SEPARATOR);
for (int i = 0; i < keyset.length; i++) {
String key = keyset[i];
if (key.length() > 0) {
// todo
indexFile = putKey(indexFile, msg, buildKey(topic, key));
if (indexFile == null) {
log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey());
return;
}
}
}
}
} else {
log.error("build index error, stop building index");
}
}
先是调用retryGetAndCreateIndexFile
方法获取一个IndexFile
,接着就是获取消息里面的topic,keys
,事务状态,如果是回滚事务就不需要创建索引了,接着就是构建一个topic#msgId
的索引了,最后是构建topic#key
的索引(我们在塞入keys
的时候如果是一个key的话不要有空字符,不然的话它就会当成2个key,然后进行分割),先来看下这个获取indexFile的retryGetAndCreateIndexFile
方法:
public IndexFile retryGetAndCreateIndexFile() {
IndexFile indexFile = null;
// 重试 3次
for (int times = 0; null == indexFile && times < MAX_TRY_IDX_CREATE; times++) {
// 获取索引文件
indexFile = this.getAndCreateLastIndexFile();
if (null != indexFile)
break;
try {
log.info("Tried to create index file " + times + " times");
Thread.sleep(1000);
} catch (InterruptedException e) {
log.error("Interrupted", e);
}
}
// 没有获取到 index文件的时候
if (null == indexFile) {
this.defaultMessageStore.getAccessRights().makeIndexFileError();
log.error("Mark index file cannot build flag");
}
return indexFile;
}
该方法就是带有重试获取的方法,重试次数是3
,就是调用了getAndCreateLastIndexFile
方法来获取,我们看下这个方法,这个方法很长:
public IndexFile getAndCreateLastIndexFile() {
IndexFile indexFile = null;
IndexFile prevIndexFile = null;
long lastUpdateEndPhyOffset = 0;
long lastUpdateIndexTimestamp = 0;
{
// 获取 读锁
this.readWriteLock.readLock().lock();
// 不为空
if (!this.indexFileList.isEmpty()) {
// 获取最后一个
IndexFile tmp = this.indexFileList.get(this.indexFileList.size() - 1);
// 如果没有写满的话
if (!tmp.isWriteFull()) {
indexFile = tmp;
// 写满了
} else {
lastUpdateEndPhyOffset = tmp.getEndPhyOffset();
lastUpdateIndexTimestamp = tmp.getEndTimestamp();
prevIndexFile = tmp;
}
}
this.readWriteLock.readLock().unlock();
}
// 新建indexFile 然后将上一个indexFile刷盘
if (indexFile == null) {
try {
// 文件名 年月日时分秒毫秒
String fileName =
this.storePath + File.separator
+ UtilAll.timeMillisToHumanString(System.currentTimeMillis());
// 创建一个indexFile
indexFile =
new IndexFile(fileName, this.hashSlotNum, this.indexNum, lastUpdateEndPhyOffset,
lastUpdateIndexTimestamp);
// 获取读锁 将这个indexFile 放入集合中
this.readWriteLock.writeLock().lock();
this.indexFileList.add(indexFile);
} catch (Exception e) {
log.error("getLastIndexFile exception ", e);
} finally {
this.readWriteLock.writeLock().unlock();
}
// 开启一个 刷盘线程
if (indexFile != null) {
final IndexFile flushThisFile = prevIndexFile;
Thread flushThread = new Thread(new Runnable() {
@Override
public void run() {
// 把上一个indexFile 刷盘
IndexService.this.flush(flushThisFile);
}
}, "FlushIndexFileThread");
flushThread.setDaemon(true);
flushThread.start();
}
}
return indexFile;
}
先是从indexFileList
中获取最后一个indexFile
,然后判断满了没满,如果是满了的话,就要重新建了。新建一个indexFile,然后创建线程把上一个的indexFile进行刷盘
,好了这里我们获取indexFile的源码就解释清楚了,接着看下putKey
的方法,这个方法就是失败重试的方法。
// 将key放到索引文件中
private IndexFile putKey(IndexFile indexFile, DispatchRequest msg, String idxKey) {
// todo
for (boolean ok = indexFile.putKey(idxKey, msg.getCommitLogOffset(), msg.getStoreTimestamp()); !ok; ) {
log.warn("Index file [" + indexFile.getFileName() + "] is full, trying to create another one");
// 重新火球或者创建一个indexFile
indexFile = retryGetAndCreateIndexFile();
if (null == indexFile) {
return null;
}
// 重新构建索引
ok = indexFile.putKey(idxKey, msg.getCommitLogOffset(), msg.getStoreTimestamp());
}
return indexFile;
}
我们可以看到有个循环一直重试,失败就是重新获取一个indexFile
,然后再进行putKey
,下面就是具体过程了,与我们上图介绍的流程差不多
/**
* 将消息索引键与消息偏移量的映射关系写入Index
* @param key 消息索引
* @param phyOffset 消息偏移量
* @param storeTimestamp 消息存储时间
* @return
*/
public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
// 当前已使用条目小于 允许最大条目数时
if (this.indexHeader.getIndexCount() < this.indexNum) {
// 根据key算出哈希码
int keyHash = indexKeyHashMethod(key);
// 计算哈希槽
int slotPos = keyHash % this.hashSlotNum;
// 当前哈希槽 对应的物理地址
int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
这一段是先判断一下这个indexFile
中的index
有没有超,默认是2000w个索引
,如果是超了的话,就返回false
,到上一层方法中就会重新获取indexFile
,重新putKey
,我们这里就是假设它没有超,接着就是 根据key计算一个hash值出来
,计算过程:
public int indexKeyHashMethod(final String key) {
int keyHash = key.hashCode();
int keyHashPositive = Math.abs(keyHash);
if (keyHashPositive < 0)
keyHashPositive = 0;
return keyHashPositive;
}
就是String
的hashCode()方法
,然后取的绝对值。接着就是使用hash值% hash的数量
,就能得到这个index在哪个hash槽上面,然后在计算hash在文件中的位置。
FileLock fileLock = null;
try {
// fileLock = this.fileChannel.lock(absSlotPos, hashSlotSize,
// false);
// 读取哈希槽中存储的数据
int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
// 如果哈希槽存储的数据小于0或大于当前Index文件中的索引条目,则将slotValue设置为0
if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {
slotValue = invalidIndex;
}
// 计算待存储消息的时间戳与第一条消息时间戳的差值,并转换成秒
long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();
timeDiff = timeDiff / 1000;
if (this.indexHeader.getBeginTimestamp() <= 0) {
timeDiff = 0;
} else if (timeDiff > Integer.MAX_VALUE) {
timeDiff = Integer.MAX_VALUE;
} else if (timeDiff < 0) {
timeDiff = 0;
}
接着就是根据hash槽在文件中的位置
,获取hash
槽里面的那个值,其实hash槽里面存的是索引的index
,也就是这个文件的第几个索引,是维护在header
头里的一个成员变量,这里这个invalidIndex
是0 ,其实就是判断从hash槽里面取出来的值合不合法,不合法就设置成0,下面这段就是计算当前要构建index的这个消息 写入commitlog
时间距离 这个索引文件第一个消息写入commitlog
的一个时间差。
// todo 将条目信息存储在Index文件中
// 计算新添加条目的起始物理偏移量:头部字节长度+哈希槽数
//量×单个哈希槽大小(4个字节)+当前Index条目个数×单个Index条
//目大小(20个字节)
int absIndexPos =
IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
+ this.indexHeader.getIndexCount() * indexSize;
// 依次将哈希码、消息物理偏移量、消息存储时间戳与前一条记录的index索引存入MappedByteBuffer
this.mappedByteBuffer.putInt(absIndexPos, keyHash);
this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);
// 将当前Index文件中包含的条目数量存入哈希槽中,覆盖原先哈希槽的值
this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());
// 如果当前文件只包含一个条目,
//则更新beginPhyOffset、beginTimestamp、endPyhOffset、
//endTimestamp以及当前文件使用索引条目等信息
if (this.indexHeader.getIndexCount() <= 1) {
this.indexHeader.setBeginPhyOffset(phyOffset);
this.indexHeader.setBeginTimestamp(storeTimestamp);
}
if (invalidIndex == slotValue) {
// 更新哈希槽 有数据的个数
this.indexHeader.incHashSlotCount();
}
this.indexHeader.incIndexCount();
this.indexHeader.setEndPhyOffset(phyOffset);
this.indexHeader.setEndTimestamp(storeTimestamp);
return true;
接着这一部分先是计算出 这个索引应该存在这个索引文件的什么位置header大小(40)+ hash槽数量(500w)* 一个hash槽大小(4)+index(当前这个索引在这个索引文件中是第几个,第一个存入的就是1,第二个存入的就是2 )* 一个索引的大小(20)
,计算好位置后,就进行追加写,先是4个字节的keyhash值,接着就是8个字节消息在commitlog的offset,接着4个字节与第一个索引写入commitlog的一个时间差,最后就是4个字节的 hash槽上一个索引的index值,这个非常重要,这个样子,hash冲突就是链表的形式解决的。
接着又将 这个索引的index值写入 hash槽中
。
最后这一堆其实就是更新索引文件头header
的里面的数值,先是判断是不是第一个索引,第一个索引的话,设置开始的一个commitlog offse
t与开始的一个写入时间。接着hash槽+1,index数量+1,更新最后的commitlog offset 与最后一个索引的写入commitlog的时间。
到这里BuildIndex
源码解析就结束了,是不是跟HashMap的put方法
很像呢,可以结合上面的流程图来理解这些代码。
5. 总结
本文主要分析了broker
消息分发分发,这里说的分发流程,是指broker
将消息写入到consumerQueue
文件和Index文件
的流程。
在broker
启动时,会启动一个专门的线程:ReputMessageService
,该线程会不停地从commitLong
获取消息,然后将其写入到consumerQueue
文件与IndexFile
文件中。
参考文章
转载自:https://juejin.cn/post/7214512376266326075