likes
comments
collection
share

RocketMQ(九)高级特性-消息重试机制

作者站长头像
站长
· 阅读数 23

rocketMq具有消息重试的机制,重试也分为两种重试:producer重试consumer重试

producer重试

如果由于网络抖动等原因,Producer程序向Broker发送消息时没有成功,即发送端没有收到Broker的ACK,导致最终Consumer无法消费消息,此时RocketMQ会自动进行重试。

可以通过以下方式进行配置: 1)使用starter的同学可以直接在yml文件中进行配置,主要属性有超时时间重试次数,另外提供一个在其他broker节点重试的机制,如下所示:

rocketmq:
  name-server: http://101.200.36.168:9876
  producer:
    #指定消息发送者的组,在控制台查询时会用到
    group: test
    #发送失败超时时间
    send-message-timeout: 3000
    #重试次数
    retry-times-when-send-failed: 3
    #在其他broker服务端进行重试默认false,开启设置为on
    retry-next-server: false

2)在java代码中可以对producer进行手动设置:

RocketMQ(九)高级特性-消息重试机制

consumer重试(两种:监听、自定义消费者)

两种方式其实监听是对自定义的一个封装,只不过自定义可能更灵活一些,使用监听的形式我还没找到在哪里设置重试次数。

下面先看看监听的形式: 在消费者监听器,自定义抛出异常,会发生重试:

/**
 * RocketMqProducer
 * @date: 2020/11/26
 * @author weirx
 * @version 3.0
 */
@Slf4j
@Component
@RocketMQMessageListener(topic = "test_reconsume", selectorExpression = "*", consumerGroup = "test_reconsume")
public class RetryConsumerListener implements RocketMQListener<MessageExt> {

    @SneakyThrows
    @Override
    public void onMessage(MessageExt messageExt) {
        byte[] body = messageExt.getBody();
        String msg = new String(body);
        log.info("receive sync message:{}", msg);
        throw new Exception("开始重试");
    }
}

重试时的消息属性如下图:

RocketMQ(九)高级特性-消息重试机制

上图中有一个delay属性,其实是延时等级,还记得我们前面学习的延时消息吗?在borker.conf中配置了了DelayDevel等级(延时等级),重试机制也是按照这个等级来的,默认情况下总共会重试16次,这个等级逐渐加1。下一次的重试属性如下图:

RocketMQ(九)高级特性-消息重试机制

下面举例一个自定义消费者代码:

package com.cloud.bssp.message.rocketmq.consumer;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;

import java.util.List;

/**
 * 重试consumer
 *
 * @date: 2020/11/30
 * @author weirx
 * @version 3.0
 */
@Component
@Slf4j
public class RetryConsumer {
    public static void main(String[] args) throws MQClientException {
        //创建消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group");
        //设置NameServer地址,替换成自己的ip地址
        consumer.setNamesrvAddr("ip:9876");
        //设置实例名称
        consumer.setInstanceName("consumer");
        //订阅topic
        consumer.subscribe("test_reconsume_1", "");
        //监听消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                //获取消息
                for (MessageExt messageExt : list) {
                    log.info("消息重试:{} ", messageExt.getMsgId() + "---" + new String(messageExt.getBody()));
                }
                try {
                    //模拟错误
                    int i = 5 / 0;
                } catch (Exception e) {
                    e.printStackTrace();
                    //需要重试
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
                //不需要重试
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //启动消费者
        consumer.start();
        System.out.println("Consumer Started!");
    }
}

两种方式默认情况下都是重试16次,使用延时等级配置的时间。 自定义消费者可以使用如下的方式进行配置最大重试次数:

 //设置重试次数为2
 consumer.setMaxReconsumeTimes(2);