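[Business Practice 2] A Detailed Tutorial on Integrating Spring Boot with RocketMQ
I. Introduction
In real projects we still develop with Spring, so this article shows how to integrate Spring Boot with RocketMQ.
1) RocketMQ version
This article uses RocketMQ 4.9.4. Note: use the same version while following along; otherwise you may hit odd problems that slow down your learning.
2) Maven dependency configuration
First, set up the Maven dependencies: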
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.18</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.2</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.4</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.18</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
<scope>runtime</scope>
</dependency>
</dependencies>
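3) Source code location (read the code alongside this article; it is easier to follow)
II. Minimal Implementation
Not recommended for production; it only demonstrates the main flow.
1) Producer configuration
The producer is simple: inject RocketMQTemplate from the container and call convertAndSend.
The method that convertAndSend ultimately delegates to is abstract; when it is called through RocketMQTemplate, the call behaves the same as syncSend.
convertAndSend also has an overload that takes a headers argument, which can be used to attach header values to the message.
// Class: org.example.controller.SimpleController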
@Slf4j
@RestController
@RequestMapping("/simple")
public class SimpleController {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@RequestMapping("/send")
public String simpleSend() {
rocketMQTemplate.convertAndSend("simple-send-topic", "simple-send-topic simple hello");
return "ok";
}
}
2) Consumer configuration
The consumer is just as easy: create a class that implements RocketMQListener and annotate it with @RocketMQMessageListener.
// org.example.listener.SimpleMessageListener
@Slf4j
@Component
@RocketMQMessageListener(
topic = "simple-send-topic",
consumerGroup = "simple-send-group"
)
public class SimpleMessageListener implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
log.info("Received message: " + message);
}
}
3) Result
Open localhost:8080/simple/send in a browser; the IDEA console then prints:
2024-01-30 20:57:58.823 ...: Received message: simple-send-topic simple hello
III. Recommended Implementation
1. Start with a configuration class to make the settings easy to read
// org.example.config.RocketMQConfigProperties
@Data
@Component
@ConfigurationProperties(prefix = "rocketmq")
public class RocketMQConfigProperties {
private String nameServer;
private HashMap<String, String> topic;
private Consumer consumer;
@Data
public static class Consumer {
private HashMap<String, String> group;
}
}
// application.properties
server.port = 8080
# NameServer address
rocketmq.name-server=127.0.0.1:9876
# topic
rocketmq.topic.common=common-topic
rocketmq.topic.pull=pull-topic
rocketmq.topic.rpc=rpc-topic
rocketmq.topic.order=order-topic
rocketmq.topic.transaction=transaction-topic
rocketmq.topic.delay=delay-topic
rocketmq.topic.batch=batch-topic
# Producer configuration
rocketmq.producer.group=group
# Consumer configuration
rocketmq.consumer.group.common=common-group
rocketmq.consumer.group.pull=pull-group
rocketmq.consumer.group.rpc=rpc-group
rocketmq.consumer.group.order=order-group
rocketmq.consumer.group.transaction=transaction-group
rocketmq.consumer.group.delay=delay-group
rocketmq.consumer.group.batch=batch-group
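To make the mapping between the properties file and the `topic` / `consumer.group` map fields easier to picture, here is a minimal plain-Java sketch. It is not how Spring Boot binds properties internally; `PrefixGrouping` and `collect` are hypothetical names used only to show that every key under a prefix becomes one map entry whose key is the remaining suffix.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Illustration only: PrefixGrouping and collect are hypothetical names, not Spring APIs.
final class PrefixGrouping {
    // Collects every property under "<prefix>." into a map keyed by the remaining suffix.
    static Map<String, String> collect(Properties props, String prefix) {
        Map<String, String> result = new HashMap<>();
        String dotted = prefix + ".";
        for (String key : props.stringPropertyNames()) {
            if (key.startsWith(dotted)) {
                result.put(key.substring(dotted.length()), props.getProperty(key));
            }
        }
        return result;
    }
}
```

With the file above, `collect(props, "rocketmq.topic").get("common")` would yield `common-topic`, which mirrors what `prop.getTopic().get("common")` returns in the later examples.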
2. Producer (sync / async / one-way)
Synchronous, asynchronous and one-way sends can all be done through rocketMQTemplate.
// org.example.controller.SendController
@RequestMapping("/common")
public String common() {
// Synchronous send
rocketMQTemplate.syncSend(prop.getTopic().get("common"), "syncSend");
// Asynchronous send
rocketMQTemplate.asyncSend(prop.getTopic().get("common"), "asyncSend", new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("Message sent successfully! MessageId: " + sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
log.error("Failed to send message: " + e.getMessage(), e);
}
});
// One-way send
rocketMQTemplate.sendOneWay(prop.getTopic().get("common"), "sendOneWay");
return "ok";
}
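The difference between the three send modes comes down to how the caller learns the outcome. The following is a minimal sketch using an in-memory stand-in for the broker; `SendModesSketch` and all of its methods are hypothetical and are not RocketMQ APIs. It only mirrors the shape of `syncSend`, `asyncSend` and `sendOneWay` used above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Hypothetical in-memory stand-in for a broker; none of this is RocketMQ API.
final class SendModesSketch {
    private final List<String> stored = new ArrayList<>();

    // Sync: the caller blocks until the message is stored and gets an id back.
    synchronized String syncSend(String body) {
        stored.add(body);
        return "msg-" + stored.size();
    }

    // Async: returns immediately; the outcome is reported to one of the callbacks.
    // The returned future exists only so users of this sketch can wait for the callback.
    CompletableFuture<Void> asyncSend(String body,
                                      Consumer<String> onSuccess,
                                      Consumer<Throwable> onException) {
        return CompletableFuture.supplyAsync(() -> syncSend(body))
                .thenAccept(onSuccess)
                .exceptionally(e -> {
                    onException.accept(e);
                    return null;
                });
    }

    // One-way: no return value and no callback, so the caller never learns the outcome.
    synchronized void sendOneWay(String body) {
        stored.add(body);
    }

    synchronized int storedCount() {
        return stored.size();
    }
}
```

In practice this is why one-way sends suit data where an occasional loss is acceptable (for example logs), while sync or async sends are the safer choice when the result matters.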
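2. Push-mode consumer
Unlike the minimal implementation, this listener also implements the RocketMQPushConsumerLifecycleListener lifecycle callback. That makes it possible to apply settings the annotation alone cannot express and gives finer-grained control over the consumer's parameters.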
// org.example.listener.PushMessageListener
@Slf4j
@Component
@RocketMQMessageListener(
topic = "",
consumerGroup = "${rocketmq.consumer.group.common}",
consumeMode = ConsumeMode.CONCURRENTLY,
messageModel = MessageModel.CLUSTERING)
public class PushMessageListener implements RocketMQListener<String>, RocketMQPushConsumerLifecycleListener {
@Autowired
private RocketMQConfigProperties prop;
@Override
public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
try {
// Subscribe to the topic
defaultMQPushConsumer.subscribe(prop.getTopic().get("common"), "*");
// Set where consumption starts
defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
}catch (Exception e) {
log.error(e.getMessage(), e);
}
}
@Override
public void onMessage(String message) {
log.info("Received message: " + message);
}
}
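3. Pull-mode consumer
In pull mode you manage the offset entirely yourself. An offset of 0 means pulling starts from the first message currently in the queue.
An offset of 1 means the next message fetched is the second one.
The last argument of consumer.pull(queue, "*", offset, 1); is the maximum number of messages to fetch per pull.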
// org.example.controller.PullController
@RequestMapping("/pull")
public String pull() throws Exception {
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(prop.getConsumer().getGroup().get("pull"));
consumer.setNamesrvAddr(prop.getNameServer());
consumer.start();
Set<MessageQueue> messageQueues = consumer.fetchSubscribeMessageQueues(prop.getTopic().get("pull"));
for(MessageQueue queue : messageQueues){
// Pull everything, starting from offset 0
long offset = 0;
while(true){
PullResult pullResult = consumer.pull(queue, "*", offset, 1);
offset = pullResult.getNextBeginOffset();
// Stop once no more messages can be pulled from this queue
if(PullStatus.FOUND != pullResult.getPullStatus()) {
break;
}
List<MessageExt> messageExtList = pullResult.getMsgFoundList();
for (MessageExt m : messageExtList) {
log.info("queue: "+queue.getQueueId()+", offset: "+m.getQueueOffset()+
", pulled message: "+ new String(m.getBody())+", born at: " + timeToString(m.getBornTimestamp()));
}
}
}
consumer.shutdown();
return "ok";
}
private String timeToString(long timestamp) {
Instant instant = Instant.ofEpochMilli(timestamp);
LocalDateTime dateTime = LocalDateTime.ofInstant(instant, ZoneId.systemDefault());
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
return dateTime.format(formatter);
}
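The offset bookkeeping in the pull loop above can be seen in isolation with a small in-memory simulation. This is only a sketch: `PullOffsetSketch`, `Result`, `pull` and `drain` are hypothetical names, and a plain `List` stands in for a MessageQueue. It shows how each pull returns the offset to use for the next pull, and how the loop ends when nothing is found.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory queue used only to illustrate offset handling; not a RocketMQ class.
final class PullOffsetSketch {
    // Result of one pull: the messages found plus the offset to use for the next pull.
    static final class Result {
        final List<String> messages;
        final long nextBeginOffset;

        Result(List<String> messages, long nextBeginOffset) {
            this.messages = messages;
            this.nextBeginOffset = nextBeginOffset;
        }
    }

    // Returns up to maxNums messages starting at the given offset.
    static Result pull(List<String> queue, long offset, int maxNums) {
        int from = (int) Math.min(Math.max(offset, 0), queue.size());
        int to = Math.min(from + Math.max(maxNums, 0), queue.size());
        return new Result(new ArrayList<>(queue.subList(from, to)), to);
    }

    // Drains the queue from offset 0, mirroring the while(true) loop in the controller.
    static List<String> drain(List<String> queue, int maxNums) {
        List<String> all = new ArrayList<>();
        long offset = 0;
        while (true) {
            Result result = pull(queue, offset, maxNums);
            if (result.messages.isEmpty()) {
                break; // nothing found at this offset: the queue is exhausted
            }
            all.addAll(result.messages);
            offset = result.nextBeginOffset;
        }
        return all;
    }
}
```

For a queue holding `a`, `b`, `c`, calling `pull(queue, 1, 1)` fetches only `b` and reports 2 as the next offset, which matches the description that offset 1 points at the second message.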
4. Remote call (request-reply)
Request-reply style remote calls are easy to implement.
Producer:
// org.example.controller.RpcController
// Send an RPC request
@RequestMapping("/send")
public String send() {
// Send the request and block until the reply arrives
String responseMessage = rocketMQTemplate.sendAndReceive(prop.getTopic().get("rpc"), "I am an rpc message", String.class);
log.info("rpc reply received: "+responseMessage);
return "ok";
}
Consumer:
// org.example.listener.RpcMessageListener
@Slf4j
@Component
@RocketMQMessageListener(
topic = "${rocketmq.topic.rpc}",
consumerGroup = "${rocketmq.consumer.group.rpc}")
public class RpcMessageListener implements RocketMQReplyListener<String, String> {
@Override
public String onMessage(String message) {
log.info("rpc listener receive" + message);
return "reply string";
}
}
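Conceptually, a request-reply call blocks the sender until the listener's return value is routed back to it. The sketch below models that idea in plain Java under loud assumptions: `RequestReplySketch` and its methods are hypothetical, the "consumer" is just a function run on another thread, and the correlation-id map is an illustration of the pattern rather than a description of RocketMQ internals.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

// Hypothetical model of request-reply over a message queue; not RocketMQ code.
final class RequestReplySketch {
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();
    private final AtomicLong ids = new AtomicLong();

    // Sends a request and blocks until the listener's reply arrives or the timeout elapses.
    String sendAndReceive(String payload, Function<String, String> listener, long timeoutMillis) {
        String correlationId = "req-" + ids.incrementAndGet();
        CompletableFuture<String> reply = new CompletableFuture<>();
        pending.put(correlationId, reply);
        // The "consumer" handles the request on another thread and replies by correlation id.
        CompletableFuture.runAsync(() -> onReply(correlationId, listener.apply(payload)));
        try {
            return reply.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new IllegalStateException("no reply for " + correlationId, e);
        } finally {
            pending.remove(correlationId);
        }
    }

    // Routes a reply back to whichever caller is waiting on this correlation id.
    private void onReply(String correlationId, String value) {
        CompletableFuture<String> waiting = pending.remove(correlationId);
        if (waiting != null) {
            waiting.complete(value);
        }
    }

    int pendingCount() {
        return pending.size();
    }
}
```

The takeaway for the real API is the blocking behaviour: the calling thread waits for the reply, so a slow or failing listener shows up on the producer side as a timeout.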
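5. Ordered messages
Three points need attention when sending ordered messages:
1) Partition ordering vs. global ordering: with multiple MessageQueues, ordering is per partition, and the hash key routes each message to a specific MessageQueue; with a single MessageQueue, the partition is the whole topic, so ordering is global.
2) With multiple partitions, the consumer switches back and forth among the MessageQueues; check whether that fits your requirements.
3) Sending must be done from a single thread; otherwise there is no guarantee that messages are produced in the intended order.
On the producer side, syncSendOrderly is all you need.
// org.example.controller.OrderController
// Send ordered messages; the hashKey keeps messages with the same key on the same queue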
@RequestMapping("/send")
public String send() {
for(int i = 0; i < 100; i++) {
rocketMQTemplate.syncSendOrderly(prop.getTopic().get("order"), "order-"+TmpTools.timeToString(new Date().getTime())+"-"+i, "productId");
rocketMQTemplate.syncSendOrderly(prop.getTopic().get("order"), "order-"+TmpTools.timeToString(new Date().getTime())+"-"+i, "productId2");
}
return "ok";
}
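The role of the hash key in partition ordering can be sketched as a simple modulo over the number of queues. This is a simplified illustration, not the client's actual selector: `HashKeyRouting` and `selectQueue` are hypothetical names, and the real implementation may differ in detail. What matters is the property it demonstrates: the same key always maps to the same queue index.

```java
// Hypothetical helper illustrating hash-based queue selection; not a RocketMQ class.
final class HashKeyRouting {
    // Maps a hash key to a queue index in the range [0, queueCount).
    static int selectQueue(String hashKey, int queueCount) {
        if (queueCount <= 0) {
            throw new IllegalArgumentException("queueCount must be positive");
        }
        // floorMod keeps the result non-negative even when hashCode() is negative.
        return Math.floorMod(hashKey.hashCode(), queueCount);
    }
}
```

Because every message sent with the same key lands on one queue, ordering holds per key; with a single queue every key maps to index 0, which is the global-ordering case described in point 1.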
On the consumer side, just set consumeMode to ORDERLY.
// org.example.listener.OrderMessageListener
@Slf4j
@Component
@RocketMQMessageListener(
topic = "${rocketmq.topic.order}",
consumerGroup = "${rocketmq.consumer.group.order}",
consumeMode = ConsumeMode.ORDERLY)
public class OrderMessageListener implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
log.info("Received message: " + message);
}
}
6. Transactional messages
Sending a transactional message is fairly simple.
// org.example.controller.TransactionController
@RequestMapping("/send")
public String send() {
// Send a transactional message
Message<String> message = MessageBuilder.withPayload("I am a transactional message")
.setHeader(RocketMQHeaders.KEYS, 1)
.setHeader("orderID", 10)
.setHeader(RocketMQHeaders.TRANSACTION_ID, 100).build();
rocketMQTemplate.sendMessageInTransaction(prop.getTopic().get("transaction"), message, null);
return "ok";
}
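The business logic goes in the listener; rocketMQTemplateBeanName specifies the bean name of the template the listener is attached to.
If you need more than one transaction listener, you have to create more than one template.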
// org.example.listener.TransactionSendListener
@Slf4j
@Component
@RocketMQTransactionListener(rocketMQTemplateBeanName = "rocketMQTemplate")
public class TransactionSendListener implements RocketMQLocalTransactionListener {
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
log.info("Local transaction succeeded");
return RocketMQLocalTransactionState.COMMIT;
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
log.info("Transaction check succeeded");
return RocketMQLocalTransactionState.COMMIT;
}
}
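The two callbacks in the listener above decide what happens to the message after it has been sent. A minimal sketch of that decision follows, assuming a single check-back for simplicity; `TransactionFlowSketch` and its enums are hypothetical stand-ins, not the RocketMQLocalTransactionState type itself.

```java
// Hypothetical model of how a transaction outcome decides message delivery; not RocketMQ code.
final class TransactionFlowSketch {
    enum LocalTxState { COMMIT, ROLLBACK, UNKNOWN }

    enum BrokerAction { DELIVER, DISCARD, CHECK_BACK }

    // What the broker does with the pending message for a given local transaction state.
    static BrokerAction resolve(LocalTxState state) {
        switch (state) {
            case COMMIT:
                return BrokerAction.DELIVER;    // message becomes visible to consumers
            case ROLLBACK:
                return BrokerAction.DISCARD;    // message is dropped, consumers never see it
            case UNKNOWN:
                return BrokerAction.CHECK_BACK; // broker asks again via the check callback
            default:
                throw new IllegalArgumentException("unexpected state " + state);
        }
    }

    // Models one execute call followed by at most one check-back (a real broker may retry).
    static boolean isDelivered(LocalTxState executeResult, LocalTxState checkResult) {
        BrokerAction first = resolve(executeResult);
        if (first == BrokerAction.CHECK_BACK) {
            return resolve(checkResult) == BrokerAction.DELIVER;
        }
        return first == BrokerAction.DELIVER;
    }
}
```

In a real listener, executeLocalTransaction would run the local database work and return rollback on failure, and checkLocalTransaction would look up whether that work actually committed instead of always returning commit as the demo does.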
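The consumer just consumes as usual; by the time a message reaches it, the upstream transaction has already been committed.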
// org.example.listener.TransactionPushListener
@Slf4j
@Component
@RocketMQMessageListener(
topic = "",
consumerGroup = "${rocketmq.consumer.group.transaction}",
consumeMode = ConsumeMode.CONCURRENTLY,
messageModel = MessageModel.CLUSTERING)
public class TransactionPushListener implements RocketMQListener<String>, RocketMQPushConsumerLifecycleListener {
@Autowired
private RocketMQConfigProperties prop;
@Override
public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
try {
// Subscribe to the topic
defaultMQPushConsumer.subscribe(prop.getTopic().get("transaction"), "*");
// Set where consumption starts
defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
}catch (Exception e) {
log.error(e.getMessage(), e);
}
}
@Override
public void onMessage(String message) {
log.info("Received message: " + message);
}
}
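7. Delayed messages
The producer is shown below.
The last argument is the delay level (the 3000 before it is the send timeout in milliseconds). The available levels are:
1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
3 means the third level, i.e. 10 seconds.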
// org.example.controller.DelayController
@RequestMapping("/send")
public String send() {
rocketMQTemplate.syncSend(prop.getTopic().get("delay"),
new GenericMessage<>("I am a delayed message " + TmpTools.timeToString(new Date().getTime())),
3000,
3);
return "ok";
}
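Since the delay level is an index into the list of levels rather than a duration, a small lookup helper can make call sites easier to read. This is a sketch only: `DelayLevels` and `toDuration` are hypothetical names, and the table is copied from the level list given above, so it would need updating if the broker's level configuration were changed.

```java
import java.time.Duration;

// Hypothetical helper that translates a delay level into its duration; not a RocketMQ class.
final class DelayLevels {
    // Delay levels as listed in the article; array index 0 corresponds to level 1.
    private static final String[] LEVELS =
            "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h".split(" ");

    static Duration toDuration(int level) {
        if (level < 1 || level > LEVELS.length) {
            throw new IllegalArgumentException("delay level must be between 1 and " + LEVELS.length);
        }
        String text = LEVELS[level - 1];
        long value = Long.parseLong(text.substring(0, text.length() - 1));
        switch (text.charAt(text.length() - 1)) {
            case 's':
                return Duration.ofSeconds(value);
            case 'm':
                return Duration.ofMinutes(value);
            case 'h':
                return Duration.ofHours(value);
            default:
                throw new IllegalStateException("unknown unit in " + text);
        }
    }
}
```

For example, `DelayLevels.toDuration(3)` gives 10 seconds, the delay used by the controller above, and level 18 is the longest available delay of 2 hours.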
Consumer
// org.example.listener.DelayPushMessageListener
@Slf4j
@Component
@RocketMQMessageListener(
topic = "",
consumerGroup = "${rocketmq.consumer.group.delay}",
consumeMode = ConsumeMode.CONCURRENTLY,
messageModel = MessageModel.CLUSTERING)
public class DelayPushMessageListener implements RocketMQListener<Message>, RocketMQPushConsumerLifecycleListener {
@Autowired
private RocketMQConfigProperties prop;
@Override
public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
try {
// Subscribe to the topic
defaultMQPushConsumer.subscribe(prop.getTopic().get("delay"), "*");
// Set where consumption starts
defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
}catch (Exception e) {
log.error(e.getMessage(), e);
}
}
@Override
public void onMessage(Message message) {
log.info("Current time: "+ TmpTools.timeToString(new Date().getTime())+ ", received delayed message: " + new String(message.getBody()));
}
}
8. Batch messages
Producer
// org.example.controller.BatchSendController
@RequestMapping("/send")
public String send() {
List<Message<String>> messages = new ArrayList<>();
// Build message 1
Message<String> message1 = MessageBuilder.withPayload("Message 1").build();
messages.add(message1);
// Build message 2
Message<String> message2 = MessageBuilder.withPayload("Message 2").build();
messages.add(message2);
// Build message 3
Message<String> message3 = MessageBuilder.withPayload("Message 3").build();
messages.add(message3);
// Send the messages as one batch
rocketMQTemplate.syncSend(prop.getTopic().get("batch"), messages);
return "ok";
}
Consumer
// org.example.listener.BatchPushMessageListener
@Slf4j
@Component
@RocketMQMessageListener(
topic = "",
consumerGroup = "${rocketmq.consumer.group.batch}",
consumeMode = ConsumeMode.CONCURRENTLY,
messageModel = MessageModel.CLUSTERING)
public class BatchPushMessageListener implements RocketMQListener<Message>, RocketMQPushConsumerLifecycleListener {
@Autowired
private RocketMQConfigProperties prop;
@Override
public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
try {
// Subscribe to the topic
defaultMQPushConsumer.subscribe(prop.getTopic().get("batch"), "*");
// Set where consumption starts
defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
}catch (Exception e) {
log.error(e.getMessage(), e);
}
}
@Override
public void onMessage(Message message) {
log.info("Received message: " + new String(message.getBody()));
}
}
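9. Important note
Within one consumer group, if there are multiple consumer instances, all of them must subscribe to the same topics; otherwise only one of the instances will receive data.
Reposted from: https://juejin.cn/post/7330515218605375523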