likes
comments
collection
share

消息队列Pulsar入门(三) 探究不同消费模式下的ACK失败重试机制,附源码演示

作者站长头像
站长
· 阅读数 16

前言

前面写了两篇Pulsar入门文章,讲了Pulsar生产者/消费者/topic 以及UI工具Pulsar Manage使用姿势,有兴趣的可以再回顾下

消息队列Pulsar入门(一) 生产者/消费者/Topic详解,附源码演示

消息队列Pulsar入门(二) UI工具Pulsar Manage 部署,Topic Namespace详解,附源码演示

此篇主要看下Pulsar消费者的ACK消息确认,失败重试机制,ACK失败重试概念这里不赘叙了
为什么要单独拿出来讲?? 因为Pulsar有四种消费模式,是不是四种模式下充实消费都是一样的表现??

比如说默认情况Exclusive下,A,B两台集群机器,例如只是A消费失败了,那么A,B都会收到重试的消息吗? 还是只有A会收到?

单机部署

先看一下最简单的单机情况下的例子,对ACK失败重试这一套回顾下
consumer.acknowledge(msg);  //消费成功确认
consumer.negativeAcknowledge(msg);  //消费失败确认
模拟单机出现消费失败的情况 int i=1/0,会看到后续消息会再次被投递消费
@Test
public void testProduce() throws Exception {
    Producer<String> producer = client.newProducer(Schema.STRING)
            .topic("my-topic")
            .enableBatching(false)
            .create();
    producer.send("my-topic发送消息" + System.currentTimeMillis());
    System.in.read();
}

/**
 * 单机模式下的失败重试
 */
@Test
public void testConsumerDanJI() throws IOException {
    MessageListener myMessageListener = (consumer, msg) -> {
        try {
            System.out.println("Message received my-subscription1 : " + new String(msg.getData()));
            //模拟消费失败的情况
            int i =1/0;
            consumer.acknowledge(msg);
        } catch (Exception e) {
            consumer.negativeAcknowledge(msg);
        }
    };
    Consumer consumer = client.newConsumer()
            .topic("my-topic")
            .subscriptionName("my-subscription1")
            .messageListener(myMessageListener)
            .subscribe();
    System.in.read();
}

消息队列Pulsar入门(三) 探究不同消费模式下的ACK失败重试机制,附源码演示

Exclusive

Exclusive模式例如有集群A,B两台机器,正常A,B都会收到消息,现在模拟A消费失败
看下后续是A,B两台机器都会重新消费 还是只有A会??
答案是只有A
/**
 * Exclusive下的失败重试
 */
@Test
public void testConsumerExclusive() throws IOException {
    MessageListener myMessageListener1 = (consumer, msg) -> {
        try {
            System.out.println("Message received my-subscription1 : " + new String(msg.getData()));
            //模拟消费失败的情况
            int i=1/0;
            consumer.acknowledge(msg);
        } catch (Exception e) {
            consumer.negativeAcknowledge(msg);
        }
    };
    Consumer consumer = client.newConsumer()
            .topic("my-topic")
            .subscriptionName("my-subscription1")
            .subscriptionType(SubscriptionType.Exclusive)
            .messageListener(myMessageListener1)
            .subscribe();

    MessageListener myMessageListener2 = (consumer2, msg) -> {
        try {
            System.out.println("Message received my-subscription2 : " + new String(msg.getData()));
            consumer2.acknowledge(msg);
        } catch (Exception e) {
            consumer2.negativeAcknowledge(msg);
        }
    };
    Consumer consumer2 = client.newConsumer()
            .topic("my-topic")
            .subscriptionName("my-subscription2")
            .subscriptionType(SubscriptionType.Exclusive)
            .messageListener(myMessageListener2)
            .subscribe();

    System.in.read();
}

消息队列Pulsar入门(三) 探究不同消费模式下的ACK失败重试机制,附源码演示

Failvor

Faivor模式下,如A,B两台集群机器,正常消费是只有A消费,或者B,其中一台,直到A那边宕机什么的,才会转移到B消费,也就是先固定某台机器,然后失败转移
那么在这种模式下,假设一开始是A消费,然后A消费失败,
那么重试的话,还会一直是A吗?还是会转移到B?
答案是会一直是A
/**
 * Failover 下的失败重试
 */
@Test
public void testConsumerFailover() throws IOException {
    MessageListener myMessageListener1 = (consumer, msg) -> {
        try {
            System.out.println("Message received my-subscription1 : " + new String(msg.getData()));
            //模拟消费失败的情况
            int i=1/0;
            consumer.acknowledge(msg);
        } catch (Exception e) {
            consumer.negativeAcknowledge(msg);
        }
    };
    Consumer consumer = client.newConsumer()
            .topic("my-topic")
            .subscriptionName("my-subscription1")
            .subscriptionType(SubscriptionType.Failover)
            .messageListener(myMessageListener1)
            .subscribe();

    MessageListener myMessageListener2 = (consumer2, msg) -> {
        try {
            System.out.println("Message received my-subscription2 : " + new String(msg.getData()));
            consumer2.acknowledge(msg);
        } catch (Exception e) {
            consumer2.negativeAcknowledge(msg);
        }
    };
    Consumer consumer2 = client.newConsumer()
            .topic("my-topic")
            .subscriptionName("my-subscription1")
            .subscriptionType(SubscriptionType.Failover)
            .messageListener(myMessageListener2)
            .subscribe();

    System.in.read();
}

消息队列Pulsar入门(三) 探究不同消费模式下的ACK失败重试机制,附源码演示

Shard

Shard模式下,如A,B两台集群机器,正常情况下是轮询消费,也就是A,B轮着来,那么现在假设A消费失败了
那么重试消费的是 下一次的B?如果B消费成功,还会继续重试?
还是继续A?
答案是 A消费失败的话,重试消费就是另一台B了,如果B成功消费,就停止重试
/**
 * Shard 下的失败重试
 */
@Test
public void testConsumerShard() throws IOException {
    MessageListener myMessageListener1 = (consumer, msg) -> {
        try {
            System.out.println("Message received my-subscription1 : " + new String(msg.getData()));
            //模拟消费失败的情况
            int i=1/0;
            consumer.acknowledge(msg);
        } catch (Exception e) {
            consumer.negativeAcknowledge(msg);
        }
    };
    Consumer consumer = client.newConsumer()
            .topic("my-topic")
            .subscriptionName("my-subscription1")
            .subscriptionType(SubscriptionType.Shared)
            .messageListener(myMessageListener1)
            .subscribe();

    MessageListener myMessageListener2 = (consumer2, msg) -> {
        try {
            System.out.println("Message received my-subscription2 : " + new String(msg.getData()));
            consumer2.acknowledge(msg);
        } catch (Exception e) {
            consumer2.negativeAcknowledge(msg);
        }
    };
    Consumer consumer2 = client.newConsumer()
            .topic("my-topic")
            .subscriptionName("my-subscription1")
            .subscriptionType(SubscriptionType.Shared)
            .messageListener(myMessageListener2)
            .subscribe();

    System.in.read();
}

消息队列Pulsar入门(三) 探究不同消费模式下的ACK失败重试机制,附源码演示

Key_Shard

Key_Shard 我们先回顾下功能,会看到同key的消息会落到同一台机器
发送消息的时候,给这批消息指定一个key,那么消息被消费的时候,相同key的这批消息,只能被同一个节点消费 如下示例我发送消息时,指定下key,然后写消费者看下消费情况,会看到key相同的消息被同一节点消费
@Test
public void testProduce2() throws PulsarClientException {
    Producer<String> producer = client.newProducer(Schema.STRING)
            .topic("my-topic")
            .enableBatching(false)
            .create();
    producer.newMessage().key("key-1").value("message-1-1").send();
    producer.newMessage().key("key-1").value("message-1-2").send();
    producer.newMessage().key("key-1").value("message-1-3").send();
    producer.newMessage().key("key-2").value("message-2-1").send();
    producer.newMessage().key("key-2").value("message-2-2").send();
    producer.newMessage().key("key-2").value("message-2-3").send();
    producer.newMessage().key("key-3").value("message-3-1").send();
    producer.newMessage().key("key-3").value("message-3-2").send();
    producer.newMessage().key("key-4").value("message-4-1").send();
    producer.newMessage().key("key-4").value("message-4-2").send();
}

/**
 * Key_Shard 下的失败重试
 */
@Test
public void testConsumerKeyShard() throws IOException {
    MessageListener myMessageListener1 = (consumer, msg) -> {
        try {
            System.out.println("Message received my-subscription1 : " + new String(msg.getData()));
            //模拟消费失败的情况
            // int i=1/0;
            consumer.acknowledge(msg);
        } catch (Exception e) {
            consumer.negativeAcknowledge(msg);
        }
    };
    Consumer consumer = client.newConsumer()
            .topic("my-topic")
            .subscriptionName("my-subscription1")
            .subscriptionType(SubscriptionType.Key_Shared)
            .messageListener(myMessageListener1)
            .subscribe();

    MessageListener myMessageListener2 = (consumer2, msg) -> {
        try {
            System.out.println("Message received my-subscription2 : " + new String(msg.getData()));
            consumer2.acknowledge(msg);
        } catch (Exception e) {
            consumer2.negativeAcknowledge(msg);
        }
    };
    Consumer consumer2 = client.newConsumer()
            .topic("my-topic")
            .subscriptionName("my-subscription1")
            .subscriptionType(SubscriptionType.Key_Shared)
            .messageListener(myMessageListener2)
            .subscribe();

    System.in.read();
}

消息队列Pulsar入门(三) 探究不同消费模式下的ACK失败重试机制,附源码演示

也就是A,B两台机器,相同key的消息,只会跑到A,B其中一台机器,那么现在模拟,如果A消费失败了,
那么重试的话,重试还是在A吗? 还是在B?
答案是依旧在A节点,原节点
/**
 * Key_Shard 下的失败重试
 */
@Test
public void testConsumerKeyShard() throws IOException {
    MessageListener myMessageListener1 = (consumer, msg) -> {
        try {
            System.out.println("Message received my-subscription1 : " + new String(msg.getData()));
            //模拟消费失败的情况
             int i=1/0;
            consumer.acknowledge(msg);
        } catch (Exception e) {
            consumer.negativeAcknowledge(msg);
        }
    };
    Consumer consumer = client.newConsumer()
            .topic("my-topic")
            .subscriptionName("my-subscription1")
            .subscriptionType(SubscriptionType.Key_Shared)
            .messageListener(myMessageListener1)
            .subscribe();

    MessageListener myMessageListener2 = (consumer2, msg) -> {
        try {
            System.out.println("Message received my-subscription2 : " + new String(msg.getData()));
            consumer2.acknowledge(msg);
        } catch (Exception e) {
            consumer2.negativeAcknowledge(msg);
        }
    };
    Consumer consumer2 = client.newConsumer()
            .topic("my-topic")
            .subscriptionName("my-subscription1")
            .subscriptionType(SubscriptionType.Key_Shared)
            .messageListener(myMessageListener2)
            .subscribe();

    System.in.read();
}

消息队列Pulsar入门(三) 探究不同消费模式下的ACK失败重试机制,附源码演示

总结

  • Exclusive

    适合的场景如前端webSocket连接后端集群,后端每个节点都需要消费消息的场景

    A,B集群中,A消费失败,重试继续在A节点

  • Failover

    适用于一些幂等性的业务,业务发送一次,只能被消费一次

    A,B集群中,A消费失败,依旧继续在A节点重试, 除非A宕机,失败重试转移到B

    未能做到集群节点分摊压力,拉胯

  • Shard

    适用于一些幂等性的业务,业务发送一次,只能被消费一次

    A,B集群中,A消费失败,会因为轮询的设计,重试在B节点,消费成功,对业务健壮性很友好,正常情况也能集群轮询,分摊压力,👍,推荐

  • Key_Shard

    A,B集群中,A消费失败,重试依旧继续在A节点

    我再想想......