likes
comments
collection
share

RabbitMQ 消息可靠性:别让消息在“高速公路”上翻车!消息队列(MQ)如同信息的高速公路,而 RabbitMQ 就

作者站长头像
站长
· 阅读数 61

消息队列(MQ)如同信息的高速公路,而 RabbitMQ 就是其中一款性能优秀的“汽车”。在这条高速公路上,如何保证消息在传递过程中不丢失、不出错、不迟到?这是我们今天要解决的问题。看完这篇文章,你会掌握 RabbitMQ 的各种可靠性保障机制,轻松应对工作中的复杂场景!

RabbitMQ 消息可靠性:别让消息在“高速公路”上翻车!消息队列(MQ)如同信息的高速公路,而 RabbitMQ 就

1. 如何保证消息的可靠性?

消息可靠性涉及到消息从发送方到接收方全过程中的任何一个环节。RabbitMQ 提供了多个机制来确保消息在传输和处理过程中的可靠性。

1.1 发送方的保障措施

发送方可以通过以下几种方式来确保消息成功发送并被 RabbitMQ 处理。

  1. **发送端确认机制:**确保消息成功发送到 RabbitMQ。
  2. **消息返回机制:**若消息找不到目标队列,RabbitMQ 会通知发送方。

让我们来详细看看这些机制的实现。

1.1.1 发送端确认机制(Publisher Confirms)

RabbitMQ 提供了三种发送端确认机制:单条同步确认、多条同步确认和异步确认。

单条同步确认机制

单条同步确认机制可以简单地理解为“一问一答”模式。每发送一条消息,客户端等待 RabbitMQ 返回一个确认(Ack)或否定确认(Nack)来判断消息是否被成功处理。

实现方法:

try (Connection connection = connectionFactory.newConnection();
    Channel channel = connection.createChannel()) {
    channel.confirmSelect();//将通道设置发布为确认模式,以便接收消息确认,此模式下,所有发布的消息都会有一个唯一的标识符(delivery tag)

    String messageToSend = objectMapper.writeValueAsString(orderMessageDTO);  // 序列化消息

    channel.basicPublish("exchange.order.restaurant", "key.restaurant", null, messageToSend.getBytes());
    log.info("message send");

    if (channel.waitForConfirms()) {
        log.info("RabbitMQ confirm success");
    } else {
        log.info("RabbitMQ confirm failed");
    }
}

在这个例子中:

  • channel.confirmSelect():开启通道的发布确认模式。
  • channel.waitForConfirms():阻塞等待 RabbitMQ 确认消息的状态。
多条同步确认机制

多条同步确认机制就像一个班车,攒一批乘客(消息)再一起出发(等待确认),从而减少频繁的确认请求,提升性能。

实现方法:

try (Connection connection = connectionFactory.newConnection();
     Channel channel = connection.createChannel()) {
     channel.confirmSelect();//将通道设置发布为确认模式,以便接收消息确认,此模式下,所有发布的消息都会有一个唯一的标识符(delivery tag)

    String messageToSend = objectMapper.writeValueAsString(orderMessageDTO);

    for (int i = 0; i < 10; i++) {
        channel.basicPublish("exchange.order.restaurant", "key.restaurant", null, messageToSend.getBytes());
        log.info("message send");
    }

    if (channel.waitForConfirms()) {
        log.info("RabbitMQ confirm success");
    } else {
        log.info("RabbitMQ confirm failed");
    }
}

在这个实现中:

  • 一次性发送多条消息,然后统一等待 RabbitMQ 的确认。
异步确认机制

异步确认机制通过添加一个监听器,可以在不阻塞主线程的情况下接收 RabbitMQ 的确认回调,是性能最佳的选择。

实现方法:

try (Connection connection = connectionFactory.newConnection();
     Channel channel = connection.createChannel()) {

    String messageToSend = objectMapper.writeValueAsString(orderMessageDTO);
    channel.confirmSelect();

    channel.addConfirmListener(new ConfirmListener() {
        @Override
        public void handleAck(long deliveryTag, boolean multiple) {
            // 当消息被成功投递到所有队列后调用
            log.info("Ack, deliveryTag: {}, multiple: {}", deliveryTag, multiple);	
        }

        @Override
        public void handleNack(long deliveryTag, boolean multiple) {
            // 当消息未能成功投递时调用
            log.info("Nack, deliveryTag: {}, multiple: {}", deliveryTag, multiple);
        }
    });

    for (int i = 0; i < 10; i++) {
        channel.basicPublish("exchange.order.restaurant", "key.restaurant", null, messageToSend.getBytes());
        log.info("message sent");
    }

    Thread.sleep(100000); // 保持连接,等待异步确认的回调
}

在这个实现中:

  • 使用 addConfirmListener 方法添加了确认监听器,可以在消息被确认或否定确认时进行回调处理。

1.2 消息返回机制

当消息发送到 RabbitMQ 时,如果消息无法找到匹配的队列,它会被退回给发送方。RabbitMQ 提供了两种返回机制:ReturnListenerReturnCallback

实现方法:

try (Connection connection = connectionFactory.newConnection();
     Channel channel = connection.createChannel()) {
    channel.addReturnListener(new ReturnListener() {
        @Override
        public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) {
            // 当消息无法被正确路由时,记录相关信息
            // replyCode:返回码,表示消息返回的原因。
            // replyText:返回文本,提供更详细的返回原因信息。
            // exchange:目标交换机名称。
            // routingKey:发布消息时使用的路由键。
            // properties:消息的元数据。
            // body:消息的内容,以字节数组形式存在。
            log.info("Message Return: replyCode:{}, replyText:{}, exchange:{}, routingKey:{}, properties:{}, body:{}",
                    replyCode, replyText, exchange, routingKey, properties, body);
        }
    });

    String messageToSend = objectMapper.writeValueAsString(orderMessageDTO);
    channel.basicPublish("exchange.order.restaurant", "key.order", true, null, messageToSend.getBytes());
}

或者使用 ReturnCallback:

channel.addReturnListener(new ReturnCallback() {
    @Override
    public void handle(Return returnMessage) {
        log.info("Message Return: returnMessage:{}", returnMessage);
    }
});

区别:

  • ReturnListener 是早期版本的接口,接收多个参数。
  • ReturnCallback 是简化版的接口,接收一个包含所有信息的 Return 对象。

1.3 消费端的保障措施

在消息到达 RabbitMQ 队列之后,下一步是由消费者来消费这些消息。为了确保消息被成功处理,RabbitMQ 提供了消费端确认机制(Consumer Acknowledgements)。

1.3.1 消费端确认机制

消费端确认机制主要通过两种方式实现:自动确认手动确认

自动确认(Auto Acknowledge)

在自动确认模式下,RabbitMQ 在消息被消费者接收后立即认为消息已被成功处理。虽然这种模式实现简单,但缺乏可靠性,因为如果消费者在处理消息时意外失败(如程序崩溃),该消息将会丢失。

实现方法:

channel.basicConsume("queue_name", true, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        log.info("Received message: {}", message);
    }
});

在这个实现中,basicConsume 方法的第二个参数设为 true,表示开启自动确认模式。

手动确认(Manual Acknowledge)

相比之下,手动确认模式更具灵活性和可靠性。消费者需要在成功处理消息后,显式地向 RabbitMQ 发送一个确认(Ack)信号。

实现方法:

channel.basicConsume("queue_name", false, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        log.info("Received message: {}", message);
        
        try {
            // 处理消息
            log.info("Processing message...");
            channel.basicAck(envelope.getDeliveryTag(), false); // 消息确认
        } catch (Exception e) {
            log.error("Error processing message", e);
            channel.basicNack(envelope.getDeliveryTag(), false, true); // 消息否定确认,并重新入队
        }
    }
});

在这个实现中:

  • basicConsume 方法的第二个参数设为 false,表示关闭自动确认模式。
  • basicAck 方法用于确认消息已被成功处理。
  • basicNack 方法用于否定确认消息,并选择是否将消息重新入队。

以下是一个基于Java的RabbitMQ消费者,使用手动确认机制的示例代码:

import com.rabbitmq.client.*;

public class ManualAckConsumer {
    private final static String QUEUE_NAME = "test_queue";

    public static void main(String[] argv) throws Exception {
        // 创建连接和频道
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        // 创建消费者,并使用手动确认机制
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
            
            try {
                // 模拟处理消息
                doWork(message);
                // 处理成功,发送确认
                // delivery:这是 DeliverCallback 回调函数中的一个参数,它代表从 RabbitMQ 收到的消息对象,类型是 Delivery。Delivery 包含了消息的元数据信息和消息体。

				// getEnvelope():delivery.getEnvelope() 方法返回一个 Envelope 对象,这个对象包含了消息传递过程中的一些元数据信息,比如交换机名称(exchange)、路由键(routingKey)、以及传递标记(deliveryTag)。

				// getDeliveryTag():delivery.getEnvelope().getDeliveryTag() 返回一个长整型值(long),表示该消息的传递标记。deliveryTag 是 RabbitMQ 用来标识每条消息的唯一序列号,在当前通道(Channel)中每个消息都有一个唯一的 deliveryTag。deliveryTag 的主要作用是帮助 RabbitMQ 和消费者跟踪和确认消息的状态。当消费者处理完一条消息后,会使用这个 deliveryTag 来告诉 RabbitMQ,该消息已成功处理或被拒绝(使用 basicAck、basicNack 或 basicReject 方法)。
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                System.out.println(" [x] Done");
            } catch (Exception e) {
                // 处理失败,拒绝消息并重新放回队列
                channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
                System.out.println(" [x] Failed, message requeued");
            }
        };

        // 设置消费者,手动确认模式
        boolean autoAck = false; // 关闭自动确认
        channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
    }

    private static void doWork(String task) throws InterruptedException {
        // 模拟处理时间
        for (char ch : task.toCharArray()) {
            if (ch == '.') Thread.sleep(1000);
        }
    }
}
1.3.2 消息重回队列机制(Dead-Letter Exchanges and Queues)

如果消息处理失败,我们可以使用“死信队列(Dead Letter Queue, DLQ)”来捕获无法被正常处理的消息。死信队列可以作为消息的“安全网”,确保消息不被丢失。

死信交换机(Dead-Letter Exchange, DLX)和队列

当消息因以下原因之一被拒绝时,可以被转发到一个死信交换机:

  • 消费者显式拒绝消息,且 requeue 参数为 false
  • 消息在队列中达到最大长度或存活时间(TTL)。
  • 队列被删除。

实现方法:

  1. 配置一个死信交换机和死信队列:
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", "dlx.exchange");
arguments.put("x-dead-letter-routing-key", "dlx.routingKey");

channel.queueDeclare("order.queue", true, false, false, arguments);
channel.exchangeDeclare("dlx.exchange", "direct", true);
channel.queueDeclare("dlx.queue", true, false, false, null);
channel.queueBind("dlx.queue", "dlx.exchange", "dlx.routingKey");
  1. 将无法处理的消息发送到死信交换机:
channel.basicNack(deliveryTag, false, false);  // 第三个参数设为 false 表示不重新入队

在这个实现中:

  • 配置消息队列时使用 x-dead-letter-exchange 参数指定死信交换机。
  • 使用 basicNack 方法拒绝消息,并将其发送到死信交换机。

1.4 API说明

basicAckbasicNackbasicReject 是 RabbitMQ 客户端提供的三个 API,用于消费者向 RabbitMQ 确认或拒绝消息的处理结果。这些 API 是 RabbitMQ 消费端手动确认机制的核心部分,允许消费者在消息处理完成后显式地通知 RabbitMQ消息的状态。

  1. basicAck(基本确认)

basicAck 方法用于确认消费者已经成功处理了一条或多条消息。当 RabbitMQ 收到来自消费者的确认(ack)后,便会从队列中移除这条消息,从而确保消息不会再次被消费。

void basicAck(long deliveryTag, boolean multiple) throws IOException;
  • deliveryTag 表示当前通道中唯一的消息标识符,用于标识被确认的消息。每条消息都有一个 deliveryTag,它在当前通道中是唯一的。

  • multiple 表示是否进行批量确认。

    • true:确认该 deliveryTag 之前的所有未确认的消息。
    • false:仅确认当前的这条消息。

使用场景

  • 消息成功处理后确认: 当消费者成功处理了消息,并希望从队列中移除时,调用 basicAck 方法。 使用 multiple = true 可以减少网络通信次数,提升性能。
  1. basicNack(基本否定确认)

basicNack 方法用于消费者通知 RabbitMQ,消息处理失败。可以选择将消息重新放回队列等待重新消费,或者将其丢弃。

方法签名

void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
  • deliveryTag 当前通道中的消息唯一标识符。

  • multiple

    • true:拒绝该 deliveryTag 之前的所有未确认消息。
    • false:仅拒绝当前的这条消息。
  • requeue 表示是否将消息重新放回队列。

    • true:将消息重新放回队列。
    • false:消息被丢弃或转发到死信队列(如果配置了死信队列)。

使用场景

  • 临时性错误: 当消费者处理消息时遇到临时性错误(例如外部服务不可用)时,可以使用 basicNack 并设置 requeue = true,将消息重新放回队列以便稍后重试。

  • 不可恢复的错误: 当消费者处理消息时遇到不可恢复的错误(例如消息格式错误),可以使用 basicNack 并设置 requeue = false,选择丢弃该消息或将其转发到死信队列。

  1. basicReject(基本拒绝)

basicReject 方法与 basicNack 类似,也用于拒绝消息,但只能拒绝单条消息,无法进行批量操作。它的功能相对简单,仅用于拒绝当前消息,并决定是否重新放回队列。

void basicReject(long deliveryTag, boolean requeue) throws IOException;
  • deliveryTag 当前通道中的消息唯一标识符。

  • requeue 表示是否将消息重新放回队列。

    • true:将消息重新放回队列。
    • false:消息被丢弃或转发到死信队列(如果配置了死信队列)。

使用场景

  • 当只需要拒绝单条消息而不需要批量操作时,使用 basicReject
  • basicNack 相比,它更简单,但功能上有限制,只能拒绝一条消息。
方法作用批量操作支持重回队列支持使用场景
basicAck确认消息已成功处理不适用成功处理消息后,确认消息
basicNack否定确认,消息处理失败处理失败,选择丢弃或重新放回队列
basicReject拒绝消息,消息处理失败处理失败,拒绝单条消息

1.4 消息持久化(Message Durability)

消息持久化是确保消息在 RabbitMQ 服务宕机或重启时不丢失的关键措施。RabbitMQ 支持对交换机队列消息进行持久化设置。

1.4.1 队列和交换机持久化

  1. 在声明队列和交换机时,通过将 durable 参数设置为 true 来实现持久化。

实现方法:

channel.exchangeDeclare("exchange.order.restaurant", "direct", true);  // 持久化交换机
channel.queueDeclare("order.queue", true, false, false, null);  // 持久化队列

在这个实现中:

  • exchangeDeclare 的第三个参数为 true,表示交换机持久化。
  • queueDeclare 的第二个参数为 true,表示队列持久化。

1.4.2 消息持久化

在发布消息时,可以通过 BasicPropertiesdeliveryMode 参数设置消息的持久化。

实现方法:

AMQP.BasicProperties props = new AMQP.BasicProperties().builder()
        .deliveryMode(2)  // 1 为非持久化, 2 为持久化
        .build();
channel.basicPublish("exchange.order.restaurant", "key.restaurant", props, messageToSend.getBytes());

在这个实现中,deliveryMode 设置为 2,表示消息持久化。

转载自:https://juejin.cn/post/7408751118873788425
评论
请登录