likes
comments
collection
share

RocketMQ事务消息中,生产者没能commit咋办?

作者站长头像
站长
· 阅读数 54

简述

若是由于种种原因,当提交完事务消息之后,没有将Commit或者Rollback命令发送给Broker。那么Broker的Half消息要如何处理呢?

Broker有一个回查机制处理既未发送commit也没有发送rollback的消息。 在Broker接受事务消息的一段时间之后,如果生产者没有发送commit消息也没有发送rollback消息,那么Broker会主动询问生产者该事务消息的执行结果。

回查服务的实现类是org.apache.rocketmq.broker.transaction.TransactionalMessageCheckService,这是一个线程服务会在后台不停地执行它的run方法

@Override
public void run() {
    log.info("Start transaction check service thread!");
    while (!this.isStopped()) {
        long checkInterval = brokerController.getBrokerConfig().getTransactionCheckInterval();
        this.waitForRunning(checkInterval);
    }
    log.info("End transaction check service thread!");
}

waitForRunning()如下:

protected void waitForRunning(long interval) {
    if (hasNotified.compareAndSet(true, false)) {
        this.onWaitEnd();
        return;
    }

    //entry to wait
    waitPoint.reset();

    try {
        waitPoint.await(interval, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
        log.error("Interrupted", e);
    } finally {
        hasNotified.set(false);
        this.onWaitEnd();
    }
}

这个方法可以理解为,每隔一段时间执行onWaitEnd()方法。这个时间就是上边取到的checkIntervalonWaitEnd()方法则如下所示:

@Override
protected void onWaitEnd() {
    long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
    int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
    long begin = System.currentTimeMillis();
    log.info("Begin to check prepare message, begin time:{}", begin);
    this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
    log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);
}
  • timeout是超时时间,若是事务消息在这个时间内没有进行commit或者rollback,则会进行一次回查。但是在这个时间内是不会进行回查的。
  • checkMax是最大回查次数,也就是说,回查若是失败了,则会执行多次。若是达到这个次数还是没有回查成功,则这个消息会被忽略。 Broker每隔一段时间会执行check()方法来执行回查命令。

回查

在check()方法里,首先获取RMQ_SYS_TRANS_HALF_TOPIC中达到回查条件但没有回查过的消息,这个topic里边存的都是需要回查的消息。然后轮询所有消息,检查是否需要回查。

String topic = TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC;
Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);
if (msgQueues == null || msgQueues.size() == 0) {
    log.warn("The queue of topic is empty :" + topic);
    return;
}
log.debug("Check topic={}, queues={}", topic, msgQueues);
for (MessageQueue messageQueue : msgQueues) {
    //检查该消息是否需要回查的逻辑
}

对于消息是否都需要回查,则在for循环内的while循环中,这里分步骤说明。

回查前的校验

MAX_PROCESS_TIME_LIMIT,这是回查限制时间,每条消息回查限制60S内,且这个时间不能配置。

if (System.currentTimeMillis() - startTime > MAX_PROCESS_TIME_LIMIT) {
    log.info("Queue={} process time reach max={}", messageQueue, MAX_PROCESS_TIME_LIMIT);
    break;
}

removeMap保存的是已经执行过commit或者rollback的half消息位点

long i = halfOffset;
if (removeMap.containsKey(i)) {
    log.debug("Half offset {} has been committed/rolled back", i);
    Long removedOpOffset = removeMap.remove(i);
    doneOpOffset.add(removedOpOffset);
}

以上的校验是,若是在允许回查的时间内,而且生产者对这条消息没有执行过commit或者rollback,那么将执行回查。

检查是否有消息需要回查

这段逻辑是对空消息的判断,从位点i获取消息,若是消息为空,那么:

  • 若是消息为空的次数超过设定值,那么结束这次回查。
  • 若是回查的位点有问题,则进行下一个位点的检查。
GetResult getResult = getHalfMsg(messageQueue, i);
MessageExt msgExt = getResult.getMsg();
if (msgExt == null) {
    if (getMessageNullCount++ > MAX_RETRY_COUNT_WHEN_HALF_NULL) {
        break;
    }
    if (getResult.getPullResult().getPullStatus() == PullStatus.NO_NEW_MSG) {
        log.debug("No new msg, the miss offset={} in={}, continue check={}, pull result={}", i,
            messageQueue, getMessageNullCount, getResult.getPullResult());
        break;
    } else {
        log.info("Illegal offset, the miss offset={} in={}, continue check={}, pull result={}",
            i, messageQueue, getMessageNullCount, getResult.getPullResult());
        i = getResult.getPullResult().getNextBeginOffset();
        newOffset = i;
        continue;
    }
}
  • msgExt是half消息对象
  • getMessageNullCount是当前回查的次数,每执行一次+1
  • MAX_RETRY_COUNT_WHEN_HALF_NULL是允许half消息为空的次数,默认是1,不能配置。

回查次数校验,消息过期时间校验

if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) {
    listener.resolveDiscardMsg(msgExt);
    newOffset = i + 1;
    i++;
    continue;
}

needDiscard(msgExt, transactionCheckMax)方法用来检查该half消息的回查次数是否已经达到最大值

private boolean needDiscard(MessageExt msgExt, int transactionCheckMax) {
    //取到该消息的回查次数
    String checkTimes = msgExt.getProperty(MessageConst.PROPERTY_TRANSACTION_CHECK_TIMES);
    int checkTime = 1;
    //回查次数不为空
    if (null != checkTimes) {
        checkTime = getInt(checkTimes);
        //回查次数已经大于等于最大值了
        if (checkTime >= transactionCheckMax) {
            return true;
        } else {
            checkTime++;
        }
    }
    //更新回查次数
    msgExt.putUserProperty(MessageConst.PROPERTY_TRANSACTION_CHECK_TIMES, String.valueOf(checkTime));
    return false;
}

needSkip(MessageExt msgExt)是用来校验Half对应的commitlog是否已经过期

private boolean needSkip(MessageExt msgExt) {
    long valueOfCurrentMinusBorn = System.currentTimeMillis() - msgExt.getBornTimestamp();
    if (valueOfCurrentMinusBorn
        > transactionalMessageBridge.getBrokerController().getMessageStoreConfig().getFileReservedTime()
        * 3600L * 1000) {
        log.info("Half message exceed file reserved time ,so skip it.messageId {},bornTime {}",
            msgExt.getMsgId(), msgExt.getBornTimestamp());
        return true;
    }
    return false;
}

新建的消息不回查

在这个回查任务之后新建的消息,不回查

if (msgExt.getStoreTimestamp() >= startTime) {
    log.debug("Fresh stored. the miss offset={}, check it later, store={}", i,
        new Date(msgExt.getStoreTimestamp()));
    break;
}

免疫回查时间内的消息也不回查

在消息产生的一段时间内(默认6秒钟),不执行回查。

//消息产生到现在的时间
long valueOfCurrentMinusBorn = System.currentTimeMillis() - msgExt.getBornTimestamp();
//免疫时间,也就是本地事务最长可以执行的时间
long checkImmunityTime = transactionTimeout;
String checkImmunityTimeStr = msgExt.getUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS);
if (null != checkImmunityTimeStr) {
    //若是设置的transactionTimeout可以转为long,则用这个时间,若是不可以,就用checkImmunityTimeStr*1000这个时间。
    checkImmunityTime = getImmunityTime(checkImmunityTimeStr, transactionTimeout);
    //消息产生到现在的时间小于消息回查免疫时间,不执行回查
    if (valueOfCurrentMinusBorn < checkImmunityTime) {
        if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt)) {
            newOffset = i + 1;
            i++;
            continue;
        }
    }
} else {
    if ((0 <= valueOfCurrentMinusBorn) && (valueOfCurrentMinusBorn < checkImmunityTime)) {
        log.debug("New arrived, the miss offset={}, check it later checkImmunity={}, born={}", i,
            checkImmunityTime, new Date(msgExt.getBornTimestamp()));
        break;
    }
}

最终判断是否需要回查并进行回查

needCheck的逻辑如注解中说的那样,判断完毕之后,又会将消息放回到回查队列中。回查之执行完毕会更新消费位点。

List<MessageExt> opMsg = pullResult.getMsgFoundList();
//没有op消息,并且当前时间在消息免疫期外
boolean isNeedCheck = (opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime)
        //当前half消息存在op消息,并且最后一个op消息产生的时间在免疫期外
        || (opMsg != null && (opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout))
        //消息产生的时间有问题,也就是Broker和客户端有时差
        || (valueOfCurrentMinusBorn <= -1);

if (isNeedCheck) {
    //将消息放回到回查队列中,因为回查是异步的,下一次依旧会判断这个half消息是否需要回查,若是已经回查成功,会写入removeMap中,下一次将跳过
    if (!putBackHalfMsgQueue(msgExt, i)) {
        continue;
    }
    //执行回查
    listener.resolveHalfMsg(msgExt);
} else {
    pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset);
    log.debug("The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}", i,
        messageQueue, pullResult);
    continue;
}

至此,回查逻辑完毕。

转载自:https://juejin.cn/post/7132864792909316104
评论
请登录