[Business in Practice 2] A Detailed Tutorial on Integrating Spring Boot with RocketMQ

Author: 站长

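I. Introduction

In real projects we still develop with Spring, so this article shows how to integrate Spring Boot with RocketMQ.

1) RocketMQ version

The RocketMQ version used here is 4.9.4. Note: while learning, stick to the same version; otherwise you may run into odd problems that slow you down.

2) Maven dependencies

First, set up the Maven dependencies: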

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.7.18</version>
</parent>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.2.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>4.9.4</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.16.18</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-configuration-processor</artifactId>
        <!-- Annotation processor: it must be visible at compile time, so no runtime scope -->
        <optional>true</optional>
    </dependency>
</dependencies>

3) The code is available at the address below (read it alongside the article; it makes things easier to follow)

gitee.com/chen-hongru…

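II. Minimal implementation

Not recommended for production; it only demonstrates the main flow.

1) Producer

The producer is simple: get RocketMQTemplate from the container and call convertAndSend.

The method that convertAndSend ultimately delegates to for delivery is abstract; when it is called through RocketMQTemplate, the actual execution is no different from syncSend.

convertAndSend also accepts a headers argument, which can be used to set headers on the message.

// org.example.controller.SimpleController
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
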
@Slf4j
@RestController
@RequestMapping("/simple")
public class SimpleController {
    
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    @RequestMapping("/send")
    public String simpleSend() {
        rocketMQTemplate.convertAndSend("simple-send-topic", "simple-send-topic simple hello");
        return "ok";
    }
}

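2) Consumer

The consumer is also easy: create a class that implements RocketMQListener and annotate it with @RocketMQMessageListener.

// org.example.listener.SimpleMessageListener
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
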
@Slf4j
@Component
@RocketMQMessageListener(
        topic = "simple-send-topic",
        consumerGroup = "simple-send-group"
)
public class SimpleMessageListener implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        log.info("Received message: " + message);
    }
}

3) Result

Visit localhost:8080/simple/send in a browser (the path mapped by SimpleController); the IDEA console then prints:

2024-01-30 20:57:58.823 ...: Received message: simple-send-topic simple hello

III. Recommended implementation

1. Start with a configuration class to make the settings easy to read

// org.example.config.RocketMQConfigProperties
@Data
@Component
@ConfigurationProperties(prefix = "rocketmq")
public class RocketMQConfigProperties {
    private String nameServer;
    private HashMap<String, String> topic;
    private Consumer consumer;
    @Data
    public static class Consumer {
        private HashMap<String, String> group;
    }
}

# application.properties
server.port=8080
# NameServer address
rocketmq.name-server=127.0.0.1:9876
# topic
rocketmq.topic.common=common-topic
rocketmq.topic.pull=pull-topic
rocketmq.topic.rpc=rpc-topic
rocketmq.topic.order=order-topic
rocketmq.topic.transaction=transaction-topic
rocketmq.topic.delay=delay-topic
rocketmq.topic.batch=batch-topic
# Producer settings
rocketmq.producer.group=group
# Consumer settings
rocketmq.consumer.group.common=common-group
rocketmq.consumer.group.pull=pull-group
rocketmq.consumer.group.rpc=rpc-group
rocketmq.consumer.group.order=order-group
rocketmq.consumer.group.transaction=transaction-group
rocketmq.consumer.group.delay=delay-group
rocketmq.consumer.group.batch=batch-group
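
To make the link between the properties file and the two maps in RocketMQConfigProperties easier to see, here is a small sketch that needs neither Spring nor a broker. PrefixBindingDemo and its bind method are hypothetical names introduced only for this illustration; the sketch approximates the idea of binding prefixed keys into a Map field and is not how Spring Boot implements it.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

// Spring-free sketch: collects every "<prefix>.<key>=<value>" entry into a map,
// which is roughly the effect of binding a Map field with @ConfigurationProperties.
class PrefixBindingDemo {

    // Returns the entries under the given prefix, keyed by the remaining suffix.
    static Map<String, String> bind(Properties props, String prefix) {
        Map<String, String> result = new LinkedHashMap<>();
        String dotted = prefix + ".";
        for (String name : props.stringPropertyNames()) {
            if (name.startsWith(dotted)) {
                result.put(name.substring(dotted.length()), props.getProperty(name));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("rocketmq.topic.common", "common-topic");
        props.setProperty("rocketmq.topic.pull", "pull-topic");
        props.setProperty("rocketmq.consumer.group.common", "common-group");

        Map<String, String> topic = bind(props, "rocketmq.topic");
        Map<String, String> group = bind(props, "rocketmq.consumer.group");

        System.out.println(topic.get("common")); // common-topic
        System.out.println(group.get("common")); // common-group
    }
}
```

This is why the later examples can write prop.getTopic().get("common") and prop.getConsumer().getGroup().get("pull"): the last segment of each property key becomes the map key.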

2. Producer (sync / async / one-way)

Sync, async, and one-way sends can all be done through rocketMQTemplate.

// org.example.controller.SendController
@RequestMapping("/common")
public String common() {
    // Sync send: blocks until the broker acknowledges the message
    rocketMQTemplate.syncSend(prop.getTopic().get("common"), "syncSend");
    // Async send: returns immediately; the result is delivered to the callback
    rocketMQTemplate.asyncSend(prop.getTopic().get("common"), "asyncSend", new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
            log.info("Message sent successfully! MessageId: " + sendResult.getMsgId());
        }
        @Override
        public void onException(Throwable e) {
            log.error("Failed to send message: " + e.getMessage(), e);
        }
    });
    // One-way send: fire and forget, no result is returned
    rocketMQTemplate.sendOneWay(prop.getTopic().get("common"), "sendOneWay");
    return "ok";
}
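
The difference between the three send modes is mostly about what the caller waits for. The following broker-free sketch mimics that with plain Java; SendModesDemo, deliver, and the fake message id are all hypothetical stand-ins, not RocketMQ APIs.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Hypothetical stand-in for a message sender; it talks to no broker.
class SendModesDemo {

    // Pretends to deliver a message and returns a fake message id.
    static String deliver(String body) {
        return "id-" + body.length();
    }

    // Sync: the caller blocks until the result is available.
    static String syncSend(String body) {
        return deliver(body);
    }

    // Async: the caller gets control back at once; the callback receives the result later.
    static CompletableFuture<Void> asyncSend(String body, Consumer<String> onSuccess) {
        return CompletableFuture.supplyAsync(() -> deliver(body)).thenAccept(onSuccess);
    }

    // One-way: fire and forget; no result and no callback.
    static void sendOneWay(String body) {
        CompletableFuture.runAsync(() -> deliver(body));
    }

    public static void main(String[] args) {
        System.out.println("sync result: " + syncSend("syncSend"));
        // join() is only here so the demo waits for the callback before exiting
        asyncSend("asyncSend", id -> System.out.println("async result: " + id)).join();
        sendOneWay("sendOneWay");
    }
}
```

In practice the choice is a trade-off: sync gives a result you can act on, async keeps the request thread free, and one-way is the cheapest but gives no feedback at all if the send fails.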

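2. Push-mode consumer

Unlike the minimal implementation, this one implements the RocketMQPushConsumerLifecycleListener lifecycle method, which allows configuration that the annotation cannot express and gives finer-grained control over the consumer's parameters.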

// org.example.listener.PushMessageListener
@Slf4j
@Component
@RocketMQMessageListener(
        topic = "",
        consumerGroup = "${rocketmq.consumer.group.common}",
        consumeMode = ConsumeMode.CONCURRENTLY,
        messageModel = MessageModel.CLUSTERING)
public class PushMessageListener implements RocketMQListener<String>, RocketMQPushConsumerLifecycleListener {
    @Autowired
    private RocketMQConfigProperties prop;
    @Override
    public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
        try {
            // Subscribe to the topic
            defaultMQPushConsumer.subscribe(prop.getTopic().get("common"), "*");
            // Set where consumption starts
            defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        }catch (Exception e) {
            log.error(e.getMessage(), e);
        }
    }
    @Override
    public void onMessage(String message) {
        log.info("Received message: " + message);
    }
}

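3. Pull-mode consumer

In pull mode you control the offset entirely yourself; 0 means pulling starts from the first message currently in the queue.

An offset of 1 means the next pull fetches the second message.

The last argument of consumer.pull(queue, "*", offset, 1); is the maximum number of messages to pull per call.

Note that DefaultMQPullConsumer is marked as deprecated in RocketMQ 4.x in favor of DefaultLitePullConsumer; it still works in 4.9.4.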

// org.example.controller.PullController
// (also needs: import java.nio.charset.StandardCharsets;)
@RequestMapping("/pull")
public String pull() throws Exception {
    DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(prop.getConsumer().getGroup().get("pull"));
    consumer.setNamesrvAddr(prop.getNameServer());
    consumer.start();
    try {
        Set<MessageQueue> messageQueues = consumer.fetchSubscribeMessageQueues(prop.getTopic().get("pull"));
        for (MessageQueue queue : messageQueues) {
            // Pull everything in this queue, starting from offset 0
            long offset = 0;
            while (true) {
                PullResult pullResult = consumer.pull(queue, "*", offset, 1);
                // Stop as soon as the queue returns no message
                if (PullStatus.FOUND != pullResult.getPullStatus()) {
                    break;
                }
                for (MessageExt m : pullResult.getMsgFoundList()) {
                    // Log the message's own queue offset, not the next offset to pull
                    log.info("queue: " + queue.getQueueId() + ", offset: " + m.getQueueOffset()
                            + ", body: " + new String(m.getBody(), StandardCharsets.UTF_8)
                            + ", born at: " + timeToString(m.getBornTimestamp()));
                }
                // Continue from the offset the broker reports as next
                offset = pullResult.getNextBeginOffset();
            }
        }
    } finally {
        // Always release the consumer, even if a pull fails
        consumer.shutdown();
    }
    return "ok";
}
private String timeToString(long timestamp) {
    Instant instant = Instant.ofEpochMilli(timestamp);
    LocalDateTime dateTime = LocalDateTime.ofInstant(instant, ZoneId.systemDefault());
    DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    return dateTime.format(formatter);
}
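
To make the offset semantics concrete without a running broker, here is a small in-memory simulation. It treats a queue as a plain list whose indexes play the role of offsets; PullOffsetDemo, pull, and drain are hypothetical names for this sketch, and the real broker behaviour (minimum offsets, filtering, pull statuses) is richer than this.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// In-memory stand-in for one message queue; offsets are list indexes.
class PullOffsetDemo {

    // Returns up to maxNums messages starting at offset (empty when nothing is left).
    static List<String> pull(List<String> queue, long offset, int maxNums) {
        int from = (int) Math.min(offset, queue.size());
        int to = Math.min(from + maxNums, queue.size());
        return new ArrayList<>(queue.subList(from, to));
    }

    // Drains the queue from startOffset, advancing the offset by the number of messages pulled.
    static List<String> drain(List<String> queue, long startOffset, int maxNums) {
        List<String> all = new ArrayList<>();
        long offset = startOffset;
        while (true) {
            List<String> batch = pull(queue, offset, maxNums);
            if (batch.isEmpty()) {
                break; // plays the role of a non-FOUND pull status
            }
            all.addAll(batch);
            offset += batch.size(); // plays the role of getNextBeginOffset()
        }
        return all;
    }

    public static void main(String[] args) {
        List<String> queue = Arrays.asList("m0", "m1", "m2");
        System.out.println(drain(queue, 0, 1)); // [m0, m1, m2]
        System.out.println(drain(queue, 1, 1)); // [m1, m2]
    }
}
```

Starting at offset 1 skips the first message, which matches the statement that an offset of 1 means the second message is fetched next.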

4. Remote call

Request-reply style remote calls are easy to implement.

Producer:

// org.example.controller.RpcController
// RPC send
@RequestMapping("/send")
public String send() {
    // Send the request and block until the reply arrives
    String responseMessage = rocketMQTemplate.sendAndReceive(prop.getTopic().get("rpc"), "I am an rpc message", String.class);
    log.info("rpc reply received: " + responseMessage);
    return "ok";
}

Consumer:

// org.example.listener.RpcMessageListener
@Slf4j
@Component
@RocketMQMessageListener(
        // Placeholders must match the keys defined in application.properties
        topic = "${rocketmq.topic.rpc}",
        consumerGroup = "${rocketmq.consumer.group.rpc}")
public class RpcMessageListener implements RocketMQReplyListener<String, String> {
    @Override
    public String onMessage(String message) {
        log.info("rpc listener received: " + message);
        // The return value is sent back to the producer as the reply
        return "reply string";
    }
}

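5. Ordered messages

To send ordered messages, keep these points in mind:

1) Partition order versus global order. With multiple MessageQueues, order is guaranteed per partition: the hashKey routes messages to a specific MessageQueue. With a single MessageQueue, the partition is the whole topic, so order is global.
2) With multiple partitions, the consumer switches back and forth between MessageQueues; check whether that fits your requirements.
3) Sending must be single-threaded; otherwise there is no guarantee that messages are produced in the intended order.

On the producer side, syncSendOrderly is all you need.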

// org.example.controller.OrderController
// Send ordered messages; the hashKey (last argument) makes related messages land in the same queue
@RequestMapping("/send")
public String send() {
    for(int i = 0; i < 100; i++) {
        rocketMQTemplate.syncSendOrderly(prop.getTopic().get("order"), "order-"+TmpTools.timeToString(new Date().getTime())+"-"+i, "product ID");
        rocketMQTemplate.syncSendOrderly(prop.getTopic().get("order"), "order-"+TmpTools.timeToString(new Date().getTime())+"-"+i, "product ID 2");
    }
    return "ok";
}
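
The role of the hashKey can be illustrated with a short broker-free sketch. It assumes the queue is chosen by taking the key's hash modulo the number of queues, which is the general idea behind hash-based selection; HashKeySelectionDemo, selectQueue, and route are hypothetical names, and the exact selector RocketMQ applies may differ in detail.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Broker-free sketch of hash-based queue selection for ordered messages.
class HashKeySelectionDemo {

    // Maps a hashKey to a queue index; floorMod keeps the index non-negative.
    static int selectQueue(String hashKey, int queueCount) {
        return Math.floorMod(hashKey.hashCode(), queueCount);
    }

    // Groups message bodies by the queue their hashKey selects, preserving send order.
    // Each element of keyedMessages is {hashKey, body}.
    static Map<Integer, List<String>> route(List<String[]> keyedMessages, int queueCount) {
        Map<Integer, List<String>> queues = new LinkedHashMap<>();
        for (String[] km : keyedMessages) {
            int index = selectQueue(km[0], queueCount);
            queues.computeIfAbsent(index, k -> new ArrayList<>()).add(km[1]);
        }
        return queues;
    }

    public static void main(String[] args) {
        List<String[]> messages = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            messages.add(new String[] {"product-1", "product-1 step " + i});
            messages.add(new String[] {"product-2", "product-2 step " + i});
        }
        // Every message with the same key lands in the same queue, in send order.
        route(messages, 4).forEach((queue, bodies) -> System.out.println(queue + " -> " + bodies));
    }
}
```

Two different keys may still share a queue; what matters for ordering is that one key never spreads across queues.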

On the consumer side, just set consumeMode to ORDERLY.

// org.example.listener.OrderMessageListener
@Slf4j
@Component
@RocketMQMessageListener(
        topic = "${rocketmq.topic.order}",
        consumerGroup = "${rocketmq.consumer.group.order}",
        consumeMode = ConsumeMode.ORDERLY)
public class OrderMessageListener implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        log.info("Received message: " + message);
    }
}

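6. Transactional messages

Sending a transactional message is fairly simple.

// org.example.controller.TransactionController
@RequestMapping("/send")
public String send() {
    // Build the message and send it as a transactional (half) message
    Message<String> message = MessageBuilder.withPayload("I am a transactional message")
        .setHeader(RocketMQHeaders.KEYS, 1)
        .setHeader("orderID", 10)
        .setHeader(RocketMQHeaders.TRANSACTION_ID, 100).build();
    rocketMQTemplate.sendMessageInTransaction(prop.getTopic().get("transaction"), message, null);
    return "ok";
}

The business logic goes in the listener. rocketMQTemplateBeanName specifies the bean name of the template the listener is bound to.

If you need more than one transaction listener, create one template per listener.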

// org.example.listener.TransactionSendListener
@Slf4j
@Component
@RocketMQTransactionListener(rocketMQTemplateBeanName = "rocketMQTemplate")
public class TransactionSendListener implements RocketMQLocalTransactionListener {
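    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        // Run the local business transaction here; return ROLLBACK or UNKNOWN when it does not succeed
        log.info("local transaction succeeded");
        return RocketMQLocalTransactionState.COMMIT;
    }
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        // Called back by the broker when the transaction state is still undecided
        log.info("transaction check succeeded");
        return RocketMQLocalTransactionState.COMMIT;
    }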
}
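
The listener above always returns COMMIT, so it hides what the other states do. The sketch below is a simplified, broker-free model of the decision: the local transaction runs once, and while the state is UNKNOWN the check callback is asked again. TransactionFlowDemo, resolve, and the maxChecks parameter are hypothetical and exist only for this illustration; the real broker schedules check-backs on its own timers.

```java
import java.util.function.Supplier;

// Broker-free sketch of how a half message is resolved.
class TransactionFlowDemo {

    // Mirrors the three states of RocketMQLocalTransactionState.
    enum State { COMMIT, ROLLBACK, UNKNOWN }

    // Runs the local transaction once; while the state is UNKNOWN, asks the
    // check callback again, up to maxChecks times. Returns true when the
    // message would become visible to consumers.
    static boolean resolve(Supplier<State> executeLocal, Supplier<State> checkLocal, int maxChecks) {
        State state = executeLocal.get();
        for (int i = 0; state == State.UNKNOWN && i < maxChecks; i++) {
            state = checkLocal.get();
        }
        return state == State.COMMIT;
    }

    public static void main(String[] args) {
        System.out.println(resolve(() -> State.COMMIT, () -> State.COMMIT, 3));   // true
        System.out.println(resolve(() -> State.UNKNOWN, () -> State.COMMIT, 3));  // true
        System.out.println(resolve(() -> State.ROLLBACK, () -> State.COMMIT, 3)); // false
    }
}
```

The practical takeaway is that checkLocalTransaction should look up the real outcome of the business transaction (for example by order ID) rather than return a fixed value.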

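The consumer just consumes as usual; by the time a message arrives, the upstream transaction has already been committed.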

// org.example.listener.TransactionPushListener
@Slf4j
@Component
@RocketMQMessageListener(
        topic = "",
        consumerGroup = "${rocketmq.consumer.group.transaction}",
        consumeMode = ConsumeMode.CONCURRENTLY,
        messageModel = MessageModel.CLUSTERING)
public class TransactionPushListener implements RocketMQListener<String>, RocketMQPushConsumerLifecycleListener {
    @Autowired
    private RocketMQConfigProperties prop;
    @Override
    public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
        try {
            // Subscribe to the topic
            defaultMQPushConsumer.subscribe(prop.getTopic().get("transaction"), "*");
            // Set where consumption starts
            defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        }catch (Exception e) {
            log.error(e.getMessage(), e);
        }
    }
    @Override
    public void onMessage(String message) {
        log.info("Received message: " + message);
    }
}

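7. Delayed messages

The producer is shown below.

The last argument is the delay level. The levels are:

1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

3 means the third level, that is, 10 seconds. The argument before it (3000) is the send timeout in milliseconds, not the delay.

// org.example.controller.DelayController
@RequestMapping("/send")
public String send() {
    rocketMQTemplate.syncSend(prop.getTopic().get("delay"),
                              new GenericMessage<>("delayed message sent at " + TmpTools.timeToString(new Date().getTime())),
                              3000,  // send timeout in milliseconds
                              3);    // delay level 3 = 10s
    return "ok";
}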
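
Because the delay level is an index into the table quoted above rather than a duration, a tiny helper can make call sites easier to read. This is only a sketch: DelayLevelDemo and toDuration are hypothetical names, and the level string is copied from the list in this article, which assumes the broker's delay levels have not been customized.

```java
import java.time.Duration;

// Converts a 1-based delay level into a duration using the level table from the article.
class DelayLevelDemo {

    static final String LEVELS = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    static Duration toDuration(int level) {
        String[] parts = LEVELS.split(" ");
        if (level < 1 || level > parts.length) {
            throw new IllegalArgumentException("level must be between 1 and " + parts.length);
        }
        String token = parts[level - 1];
        // The last character is the unit; everything before it is the amount.
        long amount = Long.parseLong(token.substring(0, token.length() - 1));
        switch (token.charAt(token.length() - 1)) {
            case 's': return Duration.ofSeconds(amount);
            case 'm': return Duration.ofMinutes(amount);
            case 'h': return Duration.ofHours(amount);
            default: throw new IllegalStateException("unknown unit in " + token);
        }
    }

    public static void main(String[] args) {
        System.out.println(toDuration(3));  // PT10S
        System.out.println(toDuration(18)); // PT2H
    }
}
```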

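Consumer (Message here is RocketMQ's org.apache.rocketmq.common.message.Message, not Spring's, because the listener reads the raw body with getBody())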

// org.example.listener.DelayPushMessageListener
@Slf4j
@Component
@RocketMQMessageListener(
        topic = "",
        consumerGroup = "${rocketmq.consumer.group.delay}",
        consumeMode = ConsumeMode.CONCURRENTLY,
        messageModel = MessageModel.CLUSTERING)
public class DelayPushMessageListener implements RocketMQListener<Message>, RocketMQPushConsumerLifecycleListener {
    @Autowired
    private RocketMQConfigProperties prop;
    @Override
    public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
        try {
            // Subscribe to the topic
            defaultMQPushConsumer.subscribe(prop.getTopic().get("delay"), "*");
            // Set where consumption starts
            defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        }catch (Exception e) {
            log.error(e.getMessage(), e);
        }
    }
    @Override
    public void onMessage(Message message) {
        log.info("now: " + TmpTools.timeToString(new Date().getTime()) + ", received delayed message: " + new String(message.getBody()));
    }
}

8. Batch messages

Producer

// org.example.controller.BatchSendController
@RequestMapping("/send")
public String send() {
    List<Message<String>> messages = new ArrayList<>();
    // Build message 1
    Message<String> message1 = MessageBuilder.withPayload("Message 1").build();
    messages.add(message1);
    // Build message 2
    Message<String> message2 = MessageBuilder.withPayload("Message 2").build();
    messages.add(message2);
    // Build message 3
    Message<String> message3 = MessageBuilder.withPayload("Message 3").build();
    messages.add(message3);
    // Send all messages as one batch
    rocketMQTemplate.syncSend(prop.getTopic().get("batch"), messages);
    return "ok";
}

Consumer

// org.example.listener.BatchPushMessageListener
@Slf4j
@Component
@RocketMQMessageListener(
        topic = "",
        consumerGroup = "${rocketmq.consumer.group.batch}",
        consumeMode = ConsumeMode.CONCURRENTLY,
        messageModel = MessageModel.CLUSTERING)
public class BatchPushMessageListener implements RocketMQListener<Message>, RocketMQPushConsumerLifecycleListener {
    @Autowired
    private RocketMQConfigProperties prop;
    @Override
    public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
        try {
            // Subscribe to the topic
            defaultMQPushConsumer.subscribe(prop.getTopic().get("batch"), "*");
            // Set where consumption starts
            defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        }catch (Exception e) {
            log.error(e.getMessage(), e);
        }
    }
    @Override
    public void onMessage(Message message) {
        log.info("received message: " + new String(message.getBody()));
    }
}

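9. Important note

If a consumer group has multiple consumer instances, all of them must subscribe to the same topics; otherwise only one of the instances will receive messages.

For details, see: blog.csdn.net/weixin_3597…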
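
As a quick way to reason about the rule above, the following sketch compares the topic sets declared by the instances of one group. It is plain Java with no RocketMQ dependency; SubscriptionConsistencyDemo and isConsistent are hypothetical names, and the check only models the rule as stated here, not how the broker detects the problem.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Checks that every consumer instance in one group declares the same topic set.
class SubscriptionConsistencyDemo {

    static boolean isConsistent(Collection<Set<String>> subscriptionsPerInstance) {
        Set<String> first = null;
        for (Set<String> topics : subscriptionsPerInstance) {
            if (first == null) {
                first = topics; // remember the first instance's subscription
            } else if (!first.equals(topics)) {
                return false;   // some instance subscribes to a different topic set
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Set<String> a = new HashSet<>(Arrays.asList("common-topic"));
        Set<String> b = new HashSet<>(Arrays.asList("common-topic"));
        Set<String> c = new HashSet<>(Arrays.asList("order-topic"));
        List<Set<String>> same = Arrays.asList(a, b);
        List<Set<String>> mixed = Arrays.asList(a, c);
        System.out.println(isConsistent(same));  // true
        System.out.println(isConsistent(mixed)); // false
    }
}
```

This is also why the examples in this article give every listener its own consumer group: each group then has exactly one subscription.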