🐳 一文学会 消息队列 《RabbitMQ》 的消息可靠性
一、Springboot集成RabbitMQ
① 导入pom文件
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.1.5</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
编写application.yml
server:
port: 8080
spring:
rabbitmq:
host: 192.168.118.130
port: 5672
username: guest
password: 123456
virtual-host: /
logging:
level:
com.bottom.mq.listener.ConsumerListener: info
②新建一个ConsumerListener(消费者)的监听器
@Component
@Slf4j
public class ConsumerListener {
public static final String EXCHANGE_Name = "bottom.direct";
public static final String ROUTING_KEY = "hello";
public static final String QUEUE_NAME = "hello.word";
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = QUEUE_NAME, durable = "true"),
exchange = @Exchange(value = EXCHANGE_Name),
key = {ROUTING_KEY}))
public void processMessage(String dataString, Message message, Channel channel) {
log.info("消费端接收到了消息:" + dataString);
}
}
③新建ProduceController
@RestController
@RequestMapping(value="/produce")
public class ProduceController {
public static final String EXCHANGE_DIRECT = "bottom.direct";
public static final String ROUTING_KEY = "hello";
@Resource
private RabbitTemplate rabbitTemplate;
@GetMapping("/send")
public String send(){
rabbitTemplate.convertAndSend(EXCHANGE_DIRECT, ROUTING_KEY, "生产者发送消息");
return "";
}
}
④ 用IDEA自带的API工具发送这个请求
二、可靠性传递
2.1 什么是可靠性传递
RabbitMQ的可靠性传递就是确保消息能够从生产者成功发送到消息队列,并被消费者正确接收和处理,而不丢失。它通过确认机制来验证消息传递的各个环节,从而保障消息传递的可靠性。
2.2 为什么需要消息的可靠性
消息传递过程中可能会丢失,有以下三种情况
-
消息发出后,中途网络故障,服务器未收到:
- 这种情况发生在消息从生产者发出后,由于网络问题导致消息未能成功到达RabbitMQ服务器。
-
消息发出后,服务器收到了,但在持久化之前服务器宕机:
- 即使消息成功到达RabbitMQ服务器,如果在服务器将消息持久化到磁盘之前发生宕机,那么消息可能会丢失。
-
消息发出后,服务器收到了,消费方未处理业务逻辑而服务挂掉,且消息已自动签收:
- 当消息被成功消费但消费方的业务逻辑尚未处理完成时,如果服务挂掉并且消息已经被自动签收,那么该消息实际上并未被正确处理,相当于丢失。
2.3 解决方案
-
对于网络故障导致消息未到达服务器的问题:
- 生产者在发送消息时应该实施重试机制,遇到网络故障时能够重新发送消息。
- 可以利用RabbitMQ的事务功能或confirm机制来确保消息被成功发送到服务器。
-
对于服务器收到消息但在持久化前宕机的问题:
- 确保RabbitMQ服务器开启消息持久化功能,以便即使服务器重启,消息也能从磁盘恢复。
- 通过RabbitMQ的镜像队列功能,将队列内容同步到其它节点,提高系统的可用性。
-
对于消费方未处理完业务逻辑而服务挂掉,且消息已自动签收的问题:
- 消费方应该设置手动消息确认模式,确保在业务逻辑处理完成后再确认消息,避免消息在未处理完成时就被认为已经处理。
- 如果服务在处理过程中挂掉,可以利用RabbitMQ的消息幂等性或者通过记录消费进度,在服务恢复后继续处理未完成的消息。
2.4 代码实现
①针对对于网络故障导致消息未到达服务器的问题
1) 修改yaml配置
spring:
rabbitmq:
host: 192.168.47.100
port: 5672
username: guest
password: 123456
virtual-host: /
publisher-confirm-type: CORRELATED
publisher-returns: true
logging:
level:
com.com.bottom.mq.config.MQProducerAckConfig: info
2)新建RabbitConfig(生产者ACK配置类)
@Configuration
@Slf4j
public class RabbitConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void initRabbitTemplate() {
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnsCallback(this);
}
/**
* 消息发送到交换机成功或失败时调用这个方法
* @param correlationData correlation data for the callback.
* @param ack true for ack, false for nack
* @param cause An optional cause, for nack, when available, otherwise null.
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("confirm() 回调函数打印 CorrelationData:" + correlationData);
log.info("confirm() 回调函数打印 ack:" + ack);
log.info("confirm() 回调函数打印 cause:" + cause);
}
/**
* 消费发送到队列失败调用的方法
* @param returned the returned message and metadata.
*/
@Override
public void returnedMessage(ReturnedMessage returned) {
log.info("returnedMessage() 回调函数 消息主体: " + new String(returned.getMessage().getBody()));
log.info("returnedMessage() 回调函数 应答码: " + returned.getReplyCode());
log.info("returnedMessage() 回调函数 描述:" + returned.getReplyText());
log.info("returnedMessage() 回调函数 消息使用的交换器 exchange : " + returned.getExchange());
log.info("returnedMessage() 回调函数 消息使用的路由键 routing : " + returned.getRoutingKey());
}
}
3)新建ConfirmController
@RestController
@RequestMapping(value="/confirm")
public class ConfirmController {
public static final String EXCHANGE_DIRECT = "bottom.direct";
public static final String ROUTING_KEY = "helloA";
@Resource
private RabbitTemplate rabbitTemplate;
@GetMapping("/produceAck")
public String send(){
rabbitTemplate.convertAndSend(EXCHANGE_DIRECT, ROUTING_KEY , "Message Test Confirm~~~ ~~~");
return "";
}
}
②针对于服务器收到消息但在持久化前宕机的问题
我们重启rabbitmq看看队列交换机会不会消失
说明交换机和队列等都自动设置了持久化,我们查看源码
③针对消费方未处理完业务逻辑而服务挂掉,且消息已自动签收的问题
1) 修改yaml配置
spring:
rabbitmq:
host: 192.168.47.100
port: 5672
username: guest
password: 123456
virtual-host: /
publisher-confirm-type: CORRELATED
publisher-returns: true
listener:
simple:
acknowledge-mode: manual
2)修改消费者监听器ConsumerListener
@Component
@Slf4j
public class ConsumerListener {
public static final String EXCHANGE_Name = "bottom.direct";
public static final String ROUTING_KEY = "hello";
public static final String QUEUE_NAME = "hello.word";
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = QUEUE_NAME, durable = "true"),
exchange = @Exchange(value = EXCHANGE_Name),
key = {ROUTING_KEY}))
public void processMessage(String dataString, Message message, Channel channel) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
Boolean redelivered = message.getMessageProperties().getRedelivered();
if (redelivered) {
channel.basicNack(deliveryTag, false, false);
} else {
channel.basicNack(deliveryTag, false, true);
}
}
}
}
三、总结
本文主要介绍了Springboot集成RabbitMQ的过程,包括导入必要的pom依赖、编写配置文件、创建消费者监听器和生产者控制器等步骤。同时,文章还深入探讨了RabbitMQ消息的可靠性传递问题,针对网络故障、服务器持久化前宕机以及消费方未处理完业务逻辑而服务挂掉等三种情况,提出了具体的解决方案,并通过代码实现加以说明。这些措施共同确保了RabbitMQ消息传递的可靠性,为实际生产环境中的消息通信提供了有力保障。
转载自:https://juejin.cn/post/7373937820179120166