RabbitMQ 消息可靠性:别让消息在“高速公路”上翻车!消息队列(MQ)如同信息的高速公路,而 RabbitMQ 就
消息队列(MQ)如同信息的高速公路,而 RabbitMQ 就是其中一款性能优秀的“汽车”。在这条高速公路上,如何保证消息在传递过程中不丢失、不出错、不迟到?这是我们今天要解决的问题。看完这篇文章,你会掌握 RabbitMQ 的各种可靠性保障机制,轻松应对工作中的复杂场景!
1. 如何保证消息的可靠性?
消息可靠性涉及到消息从发送方到接收方全过程中的任何一个环节。RabbitMQ 提供了多个机制来确保消息在传输和处理过程中的可靠性。
1.1 发送方的保障措施
发送方可以通过以下几种方式来确保消息成功发送并被 RabbitMQ 处理。
- **发送端确认机制:**确保消息成功发送到 RabbitMQ。
- **消息返回机制:**若消息找不到目标队列,RabbitMQ 会通知发送方。
让我们来详细看看这些机制的实现。
1.1.1 发送端确认机制(Publisher Confirms)
RabbitMQ 提供了三种发送端确认机制:单条同步确认、多条同步确认和异步确认。
单条同步确认机制
单条同步确认机制可以简单地理解为“一问一答”模式。每发送一条消息,客户端等待 RabbitMQ 返回一个确认(Ack)或否定确认(Nack)来判断消息是否被成功处理。
实现方法:
try (Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel()) {
channel.confirmSelect();//将通道设置发布为确认模式,以便接收消息确认,此模式下,所有发布的消息都会有一个唯一的标识符(delivery tag)
String messageToSend = objectMapper.writeValueAsString(orderMessageDTO); // 序列化消息
channel.basicPublish("exchange.order.restaurant", "key.restaurant", null, messageToSend.getBytes());
log.info("message send");
if (channel.waitForConfirms()) {
log.info("RabbitMQ confirm success");
} else {
log.info("RabbitMQ confirm failed");
}
}
在这个例子中:
channel.confirmSelect()
:开启通道的发布确认模式。channel.waitForConfirms()
:阻塞等待 RabbitMQ 确认消息的状态。
多条同步确认机制
多条同步确认机制就像一个班车,攒一批乘客(消息)再一起出发(等待确认),从而减少频繁的确认请求,提升性能。
实现方法:
try (Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel()) {
channel.confirmSelect();//将通道设置发布为确认模式,以便接收消息确认,此模式下,所有发布的消息都会有一个唯一的标识符(delivery tag)
String messageToSend = objectMapper.writeValueAsString(orderMessageDTO);
for (int i = 0; i < 10; i++) {
channel.basicPublish("exchange.order.restaurant", "key.restaurant", null, messageToSend.getBytes());
log.info("message send");
}
if (channel.waitForConfirms()) {
log.info("RabbitMQ confirm success");
} else {
log.info("RabbitMQ confirm failed");
}
}
在这个实现中:
- 一次性发送多条消息,然后统一等待 RabbitMQ 的确认。
异步确认机制
异步确认机制通过添加一个监听器,可以在不阻塞主线程的情况下接收 RabbitMQ 的确认回调,是性能最佳的选择。
实现方法:
try (Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel()) {
String messageToSend = objectMapper.writeValueAsString(orderMessageDTO);
channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) {
// 当消息被成功投递到所有队列后调用
log.info("Ack, deliveryTag: {}, multiple: {}", deliveryTag, multiple);
}
@Override
public void handleNack(long deliveryTag, boolean multiple) {
// 当消息未能成功投递时调用
log.info("Nack, deliveryTag: {}, multiple: {}", deliveryTag, multiple);
}
});
for (int i = 0; i < 10; i++) {
channel.basicPublish("exchange.order.restaurant", "key.restaurant", null, messageToSend.getBytes());
log.info("message sent");
}
Thread.sleep(100000); // 保持连接,等待异步确认的回调
}
在这个实现中:
- 使用
addConfirmListener
方法添加了确认监听器,可以在消息被确认或否定确认时进行回调处理。
1.2 消息返回机制
当消息发送到 RabbitMQ 时,如果消息无法找到匹配的队列,它会被退回给发送方。RabbitMQ 提供了两种返回机制:ReturnListener
和 ReturnCallback
。
实现方法:
try (Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel()) {
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) {
// 当消息无法被正确路由时,记录相关信息
// replyCode:返回码,表示消息返回的原因。
// replyText:返回文本,提供更详细的返回原因信息。
// exchange:目标交换机名称。
// routingKey:发布消息时使用的路由键。
// properties:消息的元数据。
// body:消息的内容,以字节数组形式存在。
log.info("Message Return: replyCode:{}, replyText:{}, exchange:{}, routingKey:{}, properties:{}, body:{}",
replyCode, replyText, exchange, routingKey, properties, body);
}
});
String messageToSend = objectMapper.writeValueAsString(orderMessageDTO);
channel.basicPublish("exchange.order.restaurant", "key.order", true, null, messageToSend.getBytes());
}
或者使用 ReturnCallback
:
channel.addReturnListener(new ReturnCallback() {
@Override
public void handle(Return returnMessage) {
log.info("Message Return: returnMessage:{}", returnMessage);
}
});
区别:
ReturnListener
是早期版本的接口,接收多个参数。ReturnCallback
是简化版的接口,接收一个包含所有信息的Return
对象。
1.3 消费端的保障措施
在消息到达 RabbitMQ 队列之后,下一步是由消费者来消费这些消息。为了确保消息被成功处理,RabbitMQ 提供了消费端确认机制(Consumer Acknowledgements)。
1.3.1 消费端确认机制
消费端确认机制主要通过两种方式实现:自动确认和手动确认。
自动确认(Auto Acknowledge)
在自动确认模式下,RabbitMQ 在消息被消费者接收后立即认为消息已被成功处理。虽然这种模式实现简单,但缺乏可靠性,因为如果消费者在处理消息时意外失败(如程序崩溃),该消息将会丢失。
实现方法:
channel.basicConsume("queue_name", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
log.info("Received message: {}", message);
}
});
在这个实现中,basicConsume
方法的第二个参数设为 true
,表示开启自动确认模式。
手动确认(Manual Acknowledge)
相比之下,手动确认模式更具灵活性和可靠性。消费者需要在成功处理消息后,显式地向 RabbitMQ 发送一个确认(Ack)信号。
实现方法:
channel.basicConsume("queue_name", false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
log.info("Received message: {}", message);
try {
// 处理消息
log.info("Processing message...");
channel.basicAck(envelope.getDeliveryTag(), false); // 消息确认
} catch (Exception e) {
log.error("Error processing message", e);
channel.basicNack(envelope.getDeliveryTag(), false, true); // 消息否定确认,并重新入队
}
}
});
在这个实现中:
basicConsume
方法的第二个参数设为false
,表示关闭自动确认模式。basicAck
方法用于确认消息已被成功处理。basicNack
方法用于否定确认消息,并选择是否将消息重新入队。
以下是一个基于Java的RabbitMQ消费者,使用手动确认机制的示例代码:
import com.rabbitmq.client.*;
public class ManualAckConsumer {
private final static String QUEUE_NAME = "test_queue";
public static void main(String[] argv) throws Exception {
// 创建连接和频道
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
// 创建消费者,并使用手动确认机制
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
// 模拟处理消息
doWork(message);
// 处理成功,发送确认
// delivery:这是 DeliverCallback 回调函数中的一个参数,它代表从 RabbitMQ 收到的消息对象,类型是 Delivery。Delivery 包含了消息的元数据信息和消息体。
// getEnvelope():delivery.getEnvelope() 方法返回一个 Envelope 对象,这个对象包含了消息传递过程中的一些元数据信息,比如交换机名称(exchange)、路由键(routingKey)、以及传递标记(deliveryTag)。
// getDeliveryTag():delivery.getEnvelope().getDeliveryTag() 返回一个长整型值(long),表示该消息的传递标记。deliveryTag 是 RabbitMQ 用来标识每条消息的唯一序列号,在当前通道(Channel)中每个消息都有一个唯一的 deliveryTag。deliveryTag 的主要作用是帮助 RabbitMQ 和消费者跟踪和确认消息的状态。当消费者处理完一条消息后,会使用这个 deliveryTag 来告诉 RabbitMQ,该消息已成功处理或被拒绝(使用 basicAck、basicNack 或 basicReject 方法)。
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
System.out.println(" [x] Done");
} catch (Exception e) {
// 处理失败,拒绝消息并重新放回队列
channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
System.out.println(" [x] Failed, message requeued");
}
};
// 设置消费者,手动确认模式
boolean autoAck = false; // 关闭自动确认
channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
}
private static void doWork(String task) throws InterruptedException {
// 模拟处理时间
for (char ch : task.toCharArray()) {
if (ch == '.') Thread.sleep(1000);
}
}
}
1.3.2 消息重回队列机制(Dead-Letter Exchanges and Queues)
如果消息处理失败,我们可以使用“死信队列(Dead Letter Queue, DLQ)”来捕获无法被正常处理的消息。死信队列可以作为消息的“安全网”,确保消息不被丢失。
死信交换机(Dead-Letter Exchange, DLX)和队列
当消息因以下原因之一被拒绝时,可以被转发到一个死信交换机:
- 消费者显式拒绝消息,且
requeue
参数为false
。 - 消息在队列中达到最大长度或存活时间(TTL)。
- 队列被删除。
实现方法:
- 配置一个死信交换机和死信队列:
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", "dlx.exchange");
arguments.put("x-dead-letter-routing-key", "dlx.routingKey");
channel.queueDeclare("order.queue", true, false, false, arguments);
channel.exchangeDeclare("dlx.exchange", "direct", true);
channel.queueDeclare("dlx.queue", true, false, false, null);
channel.queueBind("dlx.queue", "dlx.exchange", "dlx.routingKey");
- 将无法处理的消息发送到死信交换机:
channel.basicNack(deliveryTag, false, false); // 第三个参数设为 false 表示不重新入队
在这个实现中:
- 配置消息队列时使用
x-dead-letter-exchange
参数指定死信交换机。 - 使用
basicNack
方法拒绝消息,并将其发送到死信交换机。
1.4 API说明
basicAck
、basicNack
和 basicReject
是 RabbitMQ 客户端提供的三个 API,用于消费者向 RabbitMQ 确认或拒绝消息的处理结果。这些 API 是 RabbitMQ 消费端手动确认机制的核心部分,允许消费者在消息处理完成后显式地通知 RabbitMQ消息的状态。
basicAck
(基本确认)
basicAck
方法用于确认消费者已经成功处理了一条或多条消息。当 RabbitMQ 收到来自消费者的确认(ack
)后,便会从队列中移除这条消息,从而确保消息不会再次被消费。
void basicAck(long deliveryTag, boolean multiple) throws IOException;
-
deliveryTag
: 表示当前通道中唯一的消息标识符,用于标识被确认的消息。每条消息都有一个deliveryTag
,它在当前通道中是唯一的。 -
multiple
: 表示是否进行批量确认。true
:确认该deliveryTag
之前的所有未确认的消息。false
:仅确认当前的这条消息。
使用场景
- 消息成功处理后确认:
当消费者成功处理了消息,并希望从队列中移除时,调用
basicAck
方法。 使用multiple = true
可以减少网络通信次数,提升性能。
basicNack
(基本否定确认)
basicNack
方法用于消费者通知 RabbitMQ,消息处理失败。可以选择将消息重新放回队列等待重新消费,或者将其丢弃。
方法签名
void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
-
deliveryTag
: 当前通道中的消息唯一标识符。 -
multiple
:true
:拒绝该deliveryTag
之前的所有未确认消息。false
:仅拒绝当前的这条消息。
-
requeue
: 表示是否将消息重新放回队列。true
:将消息重新放回队列。false
:消息被丢弃或转发到死信队列(如果配置了死信队列)。
使用场景
-
临时性错误: 当消费者处理消息时遇到临时性错误(例如外部服务不可用)时,可以使用
basicNack
并设置requeue = true
,将消息重新放回队列以便稍后重试。 -
不可恢复的错误: 当消费者处理消息时遇到不可恢复的错误(例如消息格式错误),可以使用
basicNack
并设置requeue = false
,选择丢弃该消息或将其转发到死信队列。
basicReject
(基本拒绝)
basicReject
方法与 basicNack
类似,也用于拒绝消息,但只能拒绝单条消息,无法进行批量操作。它的功能相对简单,仅用于拒绝当前消息,并决定是否重新放回队列。
void basicReject(long deliveryTag, boolean requeue) throws IOException;
-
deliveryTag
: 当前通道中的消息唯一标识符。 -
requeue
: 表示是否将消息重新放回队列。true
:将消息重新放回队列。false
:消息被丢弃或转发到死信队列(如果配置了死信队列)。
使用场景
- 当只需要拒绝单条消息而不需要批量操作时,使用
basicReject
。 - 与
basicNack
相比,它更简单,但功能上有限制,只能拒绝一条消息。
方法 | 作用 | 批量操作支持 | 重回队列支持 | 使用场景 |
---|---|---|---|---|
basicAck | 确认消息已成功处理 | 是 | 不适用 | 成功处理消息后,确认消息 |
basicNack | 否定确认,消息处理失败 | 是 | 是 | 处理失败,选择丢弃或重新放回队列 |
basicReject | 拒绝消息,消息处理失败 | 否 | 是 | 处理失败,拒绝单条消息 |
1.4 消息持久化(Message Durability)
消息持久化是确保消息在 RabbitMQ 服务宕机或重启时不丢失的关键措施。RabbitMQ 支持对交换机、队列和消息进行持久化设置。
1.4.1 队列和交换机持久化
- 在声明队列和交换机时,通过将
durable
参数设置为true
来实现持久化。
实现方法:
channel.exchangeDeclare("exchange.order.restaurant", "direct", true); // 持久化交换机
channel.queueDeclare("order.queue", true, false, false, null); // 持久化队列
在这个实现中:
exchangeDeclare
的第三个参数为true
,表示交换机持久化。queueDeclare
的第二个参数为true
,表示队列持久化。
1.4.2 消息持久化
在发布消息时,可以通过 BasicProperties
的 deliveryMode
参数设置消息的持久化。
实现方法:
AMQP.BasicProperties props = new AMQP.BasicProperties().builder()
.deliveryMode(2) // 1 为非持久化, 2 为持久化
.build();
channel.basicPublish("exchange.order.restaurant", "key.restaurant", props, messageToSend.getBytes());
在这个实现中,deliveryMode
设置为 2
,表示消息持久化。
转载自:https://juejin.cn/post/7408751118873788425