🐳 一文学会 《RabbitMQ》 的死信、延时队列
一、什么是死信
RabbitMQ中的死信(Dead Letter)主要指的是被消费者接收但未被成功处理,或者因其他原因无法被正常消费的消息。这些消息在特定条件下会被RabbitMQ自动转发到所谓的“死信队列”(Dead Letter Queue)中。
二、产生死信的情况
-
消费者拒绝消费:当队列中的消息被消费者接收,但消费者通过执行reject或nack操作(并将requeue参数设置为false)拒绝消费该消息时,该消息会变成死信。
-
消息过期:每条消息都可以设置一个过期时间(TTL),如果在这个时间内消息没有被消费,那么该消息就会变成死信。此外,也可以为队列中的所有消息统一设置一个过期时间。
-
队列达到最大长度:当队列设置了最大长度限制,并且队列已满时,从交换机路由到该队列的新消息会自动变成死信。
三、解决死信的方法
-
丢弃(Discard) :
- 对于那些被判定为不重要或低优先级的消息,一旦它们变成死信,系统可以直接将它们丢弃,不进行任何进一步的处理。这种做法节省了系统资源,但可能丢失某些信息。
-
入库(Store in Database) :
- 将死信消息写入数据库进行持久化存储,以便日后进行分析、处理或审计。这种方法确保了数据的完整性,但可能增加数据库的负担。
-
监听(Listen on Dead-Letter Queue) :
- 当消息变成死信并进入死信队列时,配置专门的消费者来监听这个队列,并对这些消息执行后续的处理逻辑。这通常涉及采用不同的处理方式或策略来尝试再次处理这些消息。
四、代码实现
① 先在rabbitmq的图形化界面创建正常队列以及死信队列
② 导入pom文件
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.1.5</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
② 新建一个RabbitMQConsumer(springboot启动类)
@SpringBootApplication
public class RabbitMQConsumer {
public static void main(String[] args) {
SpringApplication.run(RabbitMQConsumer.class, args);
}
}
③ 编写yaml配置类
server:
port: 8080
spring:
rabbitmq:
host: 192.168.118.130
port: 5672
username: guest
password: 123456
virtual-host: /
logging:
level:
com.bottom.mq.listener.ConsumerListener: info
4.1 代码演示消费者拒绝消费
①新建ConsumerListener(消费者监听器)
@Component
@Slf4j
public class ConsumerListener {
public static final String EXCHANGE_Name = "exchange.normal";
public static final String ROUTING_KEY = "normal";
public static final String QUEUE_NAME = "queue.normal";
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = QUEUE_NAME, durable = "true"),
exchange = @Exchange(value = EXCHANGE_Name),
key = {ROUTING_KEY}))
public void processMessage(String dataString, Message message, Channel channel) throws IOException, InterruptedException {
log.info("消息接拒收");
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
}
}
② 创建 DeadController
@RestController
@RequestMapping(value="/dead")
public class DeadController {
public static final String EXCHANGE_DIRECT = "exchange.normal";
public static final String ROUTING_KEY = "dead";
@Resource
private RabbitTemplate rabbitTemplate;
@GetMapping( "/reject")
public String send(){
rabbitTemplate.convertAndSend(EXCHANGE_DIRECT, ROUTING_KEY , "测试死信消息是否生效");
return "";
}
}
③ 启动springboot项目
③ 用IDEA自带的API工具发送这个请求
此外我们也可以在监听器设置多一个绑定,绑定的队列为死信队列,也可以成功看到死信接收到此消息,代码如下
@RabbitListener(queues = {QUEUE_DEAD_LETTER})
public void processMessageDead(String dataString, Message message, Channel channel) throws IOException {
// 监听死信队列
log.info("死信监听方法");
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
4.2 代码演示队列达到最大长度、消息过期
新建另一个springboot启动类,端口为8081 -> 为了模拟发送20条消息
将8080的消费者监听器里面的方法设置为接收
public void processMessage(String dataString, Message message, Channel channel) throws IOException, InterruptedException {
// 监听正常队列
log.info("消息接收到");
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
同时启动8081、8081、并发送这20条消息
五、总结
本文介绍了RabbitMQ中的死信概念,包括其产生原因及解决方法。死信指的是被消费者接收但未成功处理,或因其他原因无法被正常消费的消息。产生死信的情况包括消费者拒绝消费、消息过期、队列达到最大长度等。解决死信的方法有丢弃、入库、以及监听死信队列等。文章还通过代码演示了消费者拒绝消费导致死信产生的场景,并展示了如何通过监听死信队列来处理这些消息。总之,了解并妥善处理RabbitMQ中的死信,对于确保消息的可靠传输和系统的稳定运行至关重要。
转载自:https://juejin.cn/post/7374299311934652442