RocketMQ事务消息中,生产者没能commit咋办?
简述
若是由于种种原因,当提交完事务消息之后,没有将Commit或者Rollback命令发送给Broker。那么Broker的Half消息要如何处理呢?
Broker有一个回查机制处理既未发送commit也没有发送rollback的消息。 在Broker接受事务消息的一段时间之后,如果生产者没有发送commit消息也没有发送rollback消息,那么Broker会主动询问生产者该事务消息的执行结果。
回查服务的实现类是org.apache.rocketmq.broker.transaction.TransactionalMessageCheckService
,这是一个线程服务会在后台不停地执行它的run方法。
@Override
public void run() {
log.info("Start transaction check service thread!");
while (!this.isStopped()) {
long checkInterval = brokerController.getBrokerConfig().getTransactionCheckInterval();
this.waitForRunning(checkInterval);
}
log.info("End transaction check service thread!");
}
waitForRunning()
如下:
protected void waitForRunning(long interval) {
if (hasNotified.compareAndSet(true, false)) {
this.onWaitEnd();
return;
}
//entry to wait
waitPoint.reset();
try {
waitPoint.await(interval, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
log.error("Interrupted", e);
} finally {
hasNotified.set(false);
this.onWaitEnd();
}
}
这个方法可以理解为,每隔一段时间执行onWaitEnd()
方法。这个时间就是上边取到的checkInterval
。
onWaitEnd()
方法则如下所示:
@Override
protected void onWaitEnd() {
long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
long begin = System.currentTimeMillis();
log.info("Begin to check prepare message, begin time:{}", begin);
this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);
}
timeout
是超时时间,若是事务消息在这个时间内没有进行commit或者rollback,则会进行一次回查。但是在这个时间内是不会进行回查的。checkMax
是最大回查次数,也就是说,回查若是失败了,则会执行多次。若是达到这个次数还是没有回查成功,则这个消息会被忽略。 Broker每隔一段时间会执行check()方法来执行回查命令。
回查
在check()方法里,首先获取RMQ_SYS_TRANS_HALF_TOPIC
中达到回查条件但没有回查过的消息,这个topic里边存的都是需要回查的消息。然后轮询所有消息,检查是否需要回查。
String topic = TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC;
Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);
if (msgQueues == null || msgQueues.size() == 0) {
log.warn("The queue of topic is empty :" + topic);
return;
}
log.debug("Check topic={}, queues={}", topic, msgQueues);
for (MessageQueue messageQueue : msgQueues) {
//检查该消息是否需要回查的逻辑
}
对于消息是否都需要回查,则在for循环内的while循环中,这里分步骤说明。
回查前的校验
MAX_PROCESS_TIME_LIMIT
,这是回查限制时间,每条消息回查限制60S内,且这个时间不能配置。
if (System.currentTimeMillis() - startTime > MAX_PROCESS_TIME_LIMIT) {
log.info("Queue={} process time reach max={}", messageQueue, MAX_PROCESS_TIME_LIMIT);
break;
}
removeMap
保存的是已经执行过commit或者rollback的half消息位点
long i = halfOffset;
if (removeMap.containsKey(i)) {
log.debug("Half offset {} has been committed/rolled back", i);
Long removedOpOffset = removeMap.remove(i);
doneOpOffset.add(removedOpOffset);
}
以上的校验是,若是在允许回查的时间内,而且生产者对这条消息没有执行过commit或者rollback,那么将执行回查。
检查是否有消息需要回查
这段逻辑是对空消息的判断,从位点i获取消息,若是消息为空,那么:
- 若是消息为空的次数超过设定值,那么结束这次回查。
- 若是回查的位点有问题,则进行下一个位点的检查。
GetResult getResult = getHalfMsg(messageQueue, i);
MessageExt msgExt = getResult.getMsg();
if (msgExt == null) {
if (getMessageNullCount++ > MAX_RETRY_COUNT_WHEN_HALF_NULL) {
break;
}
if (getResult.getPullResult().getPullStatus() == PullStatus.NO_NEW_MSG) {
log.debug("No new msg, the miss offset={} in={}, continue check={}, pull result={}", i,
messageQueue, getMessageNullCount, getResult.getPullResult());
break;
} else {
log.info("Illegal offset, the miss offset={} in={}, continue check={}, pull result={}",
i, messageQueue, getMessageNullCount, getResult.getPullResult());
i = getResult.getPullResult().getNextBeginOffset();
newOffset = i;
continue;
}
}
msgExt
是half消息对象getMessageNullCount
是当前回查的次数,每执行一次+1MAX_RETRY_COUNT_WHEN_HALF_NULL
是允许half消息为空的次数,默认是1,不能配置。
回查次数校验,消息过期时间校验
if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) {
listener.resolveDiscardMsg(msgExt);
newOffset = i + 1;
i++;
continue;
}
needDiscard(msgExt, transactionCheckMax)
方法用来检查该half消息的回查次数是否已经达到最大值。
private boolean needDiscard(MessageExt msgExt, int transactionCheckMax) {
//取到该消息的回查次数
String checkTimes = msgExt.getProperty(MessageConst.PROPERTY_TRANSACTION_CHECK_TIMES);
int checkTime = 1;
//回查次数不为空
if (null != checkTimes) {
checkTime = getInt(checkTimes);
//回查次数已经大于等于最大值了
if (checkTime >= transactionCheckMax) {
return true;
} else {
checkTime++;
}
}
//更新回查次数
msgExt.putUserProperty(MessageConst.PROPERTY_TRANSACTION_CHECK_TIMES, String.valueOf(checkTime));
return false;
}
needSkip(MessageExt msgExt)
是用来校验Half对应的commitlog是否已经过期。
private boolean needSkip(MessageExt msgExt) {
long valueOfCurrentMinusBorn = System.currentTimeMillis() - msgExt.getBornTimestamp();
if (valueOfCurrentMinusBorn
> transactionalMessageBridge.getBrokerController().getMessageStoreConfig().getFileReservedTime()
* 3600L * 1000) {
log.info("Half message exceed file reserved time ,so skip it.messageId {},bornTime {}",
msgExt.getMsgId(), msgExt.getBornTimestamp());
return true;
}
return false;
}
新建的消息不回查
在这个回查任务之后新建的消息,不回查
if (msgExt.getStoreTimestamp() >= startTime) {
log.debug("Fresh stored. the miss offset={}, check it later, store={}", i,
new Date(msgExt.getStoreTimestamp()));
break;
}
免疫回查时间内的消息也不回查
在消息产生的一段时间内(默认6秒钟),不执行回查。
//消息产生到现在的时间
long valueOfCurrentMinusBorn = System.currentTimeMillis() - msgExt.getBornTimestamp();
//免疫时间,也就是本地事务最长可以执行的时间
long checkImmunityTime = transactionTimeout;
String checkImmunityTimeStr = msgExt.getUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS);
if (null != checkImmunityTimeStr) {
//若是设置的transactionTimeout可以转为long,则用这个时间,若是不可以,就用checkImmunityTimeStr*1000这个时间。
checkImmunityTime = getImmunityTime(checkImmunityTimeStr, transactionTimeout);
//消息产生到现在的时间小于消息回查免疫时间,不执行回查
if (valueOfCurrentMinusBorn < checkImmunityTime) {
if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt)) {
newOffset = i + 1;
i++;
continue;
}
}
} else {
if ((0 <= valueOfCurrentMinusBorn) && (valueOfCurrentMinusBorn < checkImmunityTime)) {
log.debug("New arrived, the miss offset={}, check it later checkImmunity={}, born={}", i,
checkImmunityTime, new Date(msgExt.getBornTimestamp()));
break;
}
}
最终判断是否需要回查并进行回查
needCheck
的逻辑如注解中说的那样,判断完毕之后,又会将消息放回到回查队列中。回查之执行完毕会更新消费位点。
List<MessageExt> opMsg = pullResult.getMsgFoundList();
//没有op消息,并且当前时间在消息免疫期外
boolean isNeedCheck = (opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime)
//当前half消息存在op消息,并且最后一个op消息产生的时间在免疫期外
|| (opMsg != null && (opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout))
//消息产生的时间有问题,也就是Broker和客户端有时差
|| (valueOfCurrentMinusBorn <= -1);
if (isNeedCheck) {
//将消息放回到回查队列中,因为回查是异步的,下一次依旧会判断这个half消息是否需要回查,若是已经回查成功,会写入removeMap中,下一次将跳过
if (!putBackHalfMsgQueue(msgExt, i)) {
continue;
}
//执行回查
listener.resolveHalfMsg(msgExt);
} else {
pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset);
log.debug("The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}", i,
messageQueue, pullResult);
continue;
}
至此,回查逻辑完毕。
转载自:https://juejin.cn/post/7132864792909316104