RocketMQ中事务消息的Commit和Rollback是如何执行的?
前边几篇文章我们介绍了RocketMQ的事务消息,可是他是如何执行Commit和Rollback的呢。
先剧透
为了阅读方便,会先将总结性的结论放在前边
- 角色检查,查看是不是Slave Broker
- 检查消息类型,除了commit和rollback消息,其他的不做处理
- commit
- 根据位点找到具体消息
- 校验消息参数
- 还原消息
- 将还原的消息放入CommitLog中等待消费
- 删除消息
- rollback
- 根据位点找到具体消息
- 校验消息参数
- 删除消息
开始正文
事务消息的commit和rollback逻辑都在org.apache.rocketmq.broker.processor.EndTransactionProcessor#processRequest
中。具体步骤如下
角色检查
因为Slave Broker
是不处理事务消息的,所以当slave broker
收到消息之后,将直接返回。
//slave broker不处理事务消息
if (BrokerRole.SLAVE == brokerController.getMessageStoreConfig().getBrokerRole()) {
response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
LOGGER.warn("Message store is slave mode, so end transaction is forbidden. ");
return response;
}
消息类型检查
这里只通过了commit或者rollback消息,其余的消息类型直接return了。
//消息类型检查,这里只处理commit或者rollback的消息
if (requestHeader.getFromTransactionCheck()) {
switch (requestHeader.getCommitOrRollback()) {
case MessageSysFlag.TRANSACTION_NOT_TYPE: {
return null;
}
case MessageSysFlag.TRANSACTION_COMMIT_TYPE: {
break;
}
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: {
break;
}
default:
return null;
}
} else {
switch (requestHeader.getCommitOrRollback()) {
case MessageSysFlag.TRANSACTION_NOT_TYPE: {
return null;
}
case MessageSysFlag.TRANSACTION_COMMIT_TYPE: {
break;
}
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: {
break;
}
default:
return null;
}
}
Commit
然后就是Commit的逻辑了。
//提交half消息,根据位点找到消息并提交
result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
if (result.getResponseCode() == ResponseCode.SUCCESS) {
//Half消息数据校验
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
if (res.getCode() == ResponseCode.SUCCESS) {
//将MessageExt对象转化为MessageExtBrokerInner对象,并设置Topic和Consume Queue等信息
MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED);
//将还原后的消息发送到CommitLog中,若是发送成功,则消费者就可以进行消费
RemotingCommand sendResult = sendFinalMessage(msgInner);
if (sendResult.getCode() == ResponseCode.SUCCESS) {
//当发成功,则通过设置标记,删除此half消息
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
}
return sendResult;
}
return res;
}
commitMessage(requestHeader)
看方法名是提交了消息,但是进到方法里边看只是按照位点取出了消息。
private OperationResult getHalfMessageByOffset(long commitLogOffset) {
OperationResult response = new OperationResult();
MessageExt messageExt = this.transactionalMessageBridge.lookMessageByOffset(commitLogOffset);
if (messageExt != null) {
response.setPrepareMessage(messageExt);
response.setResponseCode(ResponseCode.SUCCESS);
} else {
response.setResponseCode(ResponseCode.SYSTEM_ERROR);
response.setResponseRemark("Find prepared transaction message failed");
}
return response;
}
checkPrepareMessage()
这是half数据校验,
- 校验了发送消息的生产者的group和当前执行commit或rollback的生产者的group是否是同一个
- 校验了当前消息和请求commit或rollback的消息是否同一个。
endMessageTransaction()
还原消息类型,并设置Topic和Consume Queue等信息
deletePrepareMessage()
删除消息,删除消息的时候会调用addRemoveTagInTransactionOp
方法,构建一个tag为已删除的message消息对象,并进行保存。
Rollback
rollback的逻辑是查询,校验,删除,方法均与commit中的相同。
//查询half消息并返回
result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
if (result.getResponseCode() == ResponseCode.SUCCESS) {
//Half消息数据校验
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
if (res.getCode() == ResponseCode.SUCCESS) {
//删除half消息
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
}
return res;
}
转载自:https://juejin.cn/post/7133217207844077576