likes
comments
collection
share

消息队列Pulsar入门(一) 生产者/消费者/Topic详解,附源码演示

作者站长头像
站长
· 阅读数 24

前言

对于pulsar的特性以及优异,这里不多讲解,直接上干货,主要讲一下Pulsar的docker部署,生产者/消费者几种 
不同模式,以及Topic的使用规则

Docker部署pulsar

docker run -it -p 80:80 -p 8080:8080 -p 6650:6650 -d apachepulsar/pulsar-standalone

部署问题

因为我用的是腾讯云最基础的服务器,在执行docker命令后,发现Pulsar会启动失败或启动不久便停止,查看日志发现是内存顶不住

消息队列Pulsar入门(一) 生产者/消费者/Topic详解,附源码演示

查看官网Pulsar默认启动是2g,因此把启动配置修改成机器支持的即可;
docker exec -it pulsar-test sh
cd /pulsar/conf/
vim conf/pulsar_env.sh;       之后重启pulsar即可

消息队列Pulsar入门(一) 生产者/消费者/Topic详解,附源码演示

消息队列Pulsar入门(一) 生产者/消费者/Topic详解,附源码演示

连接Pulsar

/**
 * pulsar 连接bean
 */
@Bean
public PulsarClient getPulsarClient() throws PulsarClientException {
    return PulsarClient.builder()
            .serviceUrl("pulsar://Ip地址:6650")
            .build();
}

基础概念了解

Produce 消息的源头,也是消息的发布者,负责将消息发送到 topic。

Consumer 消息的消费者,负责从 topic 订阅并消费消息。

Topic 消息数据的载体,在 Pulsar 中 Topic 可以指定分为多个 partition,如果不设置默认只有一个 partition
(这个指定多个partition,我会在文中后面示例演示,可以留意下)

Brkber 一个无状态组件,主要负责接收 Producer 发送过来的消息,并交付给 Consumer,可以理解成送快递的小哥

消息队列Pulsar入门(一) 生产者/消费者/Topic详解,附源码演示

Produce详解

创建方式

简单方法创建
    Producer<String> stringProducer = client.newProducer(Schema.STRING)
        .topic("my-topic")
        .create();
    stringProducer.send("My message")
loadConf自定义配置创建
config里面可以填一些自定义配置,如sendTimeoutMs 消息发送超时(毫秒)。如果在sendTimeout过期之前服务器未确认消息,则会发生错误,其他有趣的可以看下官网

Pulsar官网

/**
* 使用loadConf创建Produce
*/
@Test
public void testProducer() throws Exception {
   Map<String, Object> config1 = new HashMap<>();
   config1.put("producerName", "produce-demo1");
   config1.put("topicName", "topic1");
   Producer producer1 = client
           .newProducer()
           .loadConf(config1)
           .create();
   producer1.send(("test1 --- " + new Date()).getBytes());
}

发送模式

同步发送
 同步发送消息是Producer发送消息以后要等到broker的确认以后才会认为消息发送成功,如果没有收到确认就认为消息发送失败 
/**
 * 测试同步发送
 */
@Test
public void testProducer22() throws Exception {
    Producer<String> stringProducer = client
            .newProducer(Schema.STRING)
            .topic("my-topic")
            .producerName("produce-demo1")
            .create();
    MessageId messageId = stringProducer.send("My message" + "发送消息时间" + new Date());
    System.out.println("消息同步发送---");
    System.in.read();
}
异步发送
 异步发送消息是 Producer 发送消息,将消息放到阻塞队列中并立即返回。不需要等待 broker 的确认
/**
 * 测试异步发送
 */
@Test
public void testProducer222() throws Exception {
    Producer<String> stringProducer = client
            .newProducer(Schema.STRING)
            .topic("my-topic")
            .producerName("produce-demo1")
            .create();
    CompletableFuture<MessageId> messageIdCompletableFuture = stringProducer.sendAsync(
            "异步发送的消息");
    System.in.read();
}

访问方式/发送方式

Share模式(默认情况)
 默认情况下多个生产者可以发布消息到同一个Topic,指定发送模式.accessMode(ProducerAccessMode.Shared)方法
/**
 * shard模式 默认情况下多个生产者可以发布消息到同一个 Topic
 */
@Test
public void testProducer222() throws IOException {
    Producer<String> stringProducer = client
            .newProducer(Schema.STRING)
            .accessMode(ProducerAccessMode.Shared)
            .topic("访问模式-shared")
            .producerName("produce-demo1")
            .create();
    stringProducer.send("My message 1 " + "发送消息时间" + new Date());

    Producer<String> stringProducer2 = client
            .newProducer(Schema.STRING)
            .accessMode(ProducerAccessMode.Shared)
            .topic("访问模式-shared")
            // Producer with name 'produce-demo1' is already connected to topic
            //注意生产者名称不能重复
            .producerName("produce-demo2")
            .create();
    stringProducer2.send("My message 2 " + "发送消息时间" + new Date());

    System.in.read();
}
请注意:
  这里我特意标注了生产者名称不能重复,否则对于Pulsar来说,发送消息会报错,如下图,已经有一个produce- 
  demo1的生产者了,再来一个就会报错Producer with name 'produce-demo1' is already connected to topic
  因此如果我们是集群部署的话,尤其注意每一个节点生产者的命名
  当然对于消费者也是同样的规则,不允许名称重复(在下文我也会演示到)
/**
 * 演示生产者名称重复,发送报错
 */
@Test
public void testProducer1() throws IOException {
    Producer<String> stringProducer = client
            .newProducer(Schema.STRING)
            .topic("访问模式-shared")
            .producerName("produce-demo1")
            .create();
    stringProducer.send("My message 1 " + "发送消息时间" + new Date());
    System.in.read();
}

/**
 * 演示生产者名称重复,发送报错
 */
@Test
public void testProducer11() throws IOException {
    Producer<String> stringProducer = client
            .newProducer(Schema.STRING)
            .topic("访问模式-shared")
            .producerName("produce-demo1")
            .create();
    stringProducer.send("My message 1 " + "发送消息时间" + new Date());
    System.in.read();
}

消息队列Pulsar入门(一) 生产者/消费者/Topic详解,附源码演示

Exclusive
要求生产者以独占模式访问 Topic,在此模式下如果 Topic已经有了生产者,那么其他生产者在连接就会失败报错。 
/**
 * Exclusive 要求生产者以独占模式访问 Topic,在此模式下 如果 Topic 已经有了生产者,那么其他生产者在连接就会失败报错。
 * <p>
 * "Topic has an existing exclusive producer: standalone-0-12
 */
@Test
public void testProducer6() throws IOException {
    Producer<String> stringProducer = client
            .newProducer(Schema.STRING)
            .topic("访问模式-Exclusive")
            //设置访问模式 默认shared
            .accessMode(ProducerAccessMode.Exclusive)
            .producerName("produce-demo1")
            .create();
    stringProducer.send("My message 1 " + "发送消息时间" + new Date());

    Producer<String> stringProducer2 = client
            .newProducer(Schema.STRING)
            .topic("访问模式-Exclusive")
            //设置访问模式 默认shared
            .accessMode(ProducerAccessMode.Exclusive)
            // Producer with name 'produce-demo1' is already connected to topic
            //注意生产者名称不能重复
            .producerName("produce-demo2")
            .create();
    stringProducer2.send("My message 2 " + "发送消息时间" + new Date());

    System.in.read();
}

消息队列Pulsar入门(一) 生产者/消费者/Topic详解,附源码演示

WaitForExclusive
如果主题已经连接了生产者,则将当前生产者挂起,直到生产者获得了Exclusive 访问权限。
该怎么来理解这句话,打个不恰当比喻,类似于Java中的独占锁Sycronized一样,你没有获取到锁,没有获取到权限,就不能发消息,
对比Exclusive报错来说,WaitForExclusive是不会报错的,只会是挂起,
来看下面的demo感受下

1 我们先开启一个线程A向 访问模式-WaitForExclusive topic发送一条消息,My message 1 ***

/**
 * WaitForExclusive
 * <p>
 * 如果主题已经连接了生产者,则将当前生产者挂起,直到生产者获得了 Exclusive 访问权限。
 * <p>
 * 也就是存在相同的生产者,不会报错,当然也不会发送消息,     获取到独占后,会将未获取到独占时的消息进行发送!!!
 */
@Test
public void testProducer2() throws Exception {
    Producer<String> stringProducer = client
            .newProducer(Schema.STRING)
            .topic("访问模式-WaitForExclusive")
            //设置访问模式 默认shared
            .accessMode(ProducerAccessMode.WaitForExclusive)
            .producerName("produce-demo1")
            .create();
    stringProducer.send("My message 1 " + "发送消息时间" + new Date());
    System.in.read();
}
2 然后再开启另一个线程B向 访问模式-WaitForExclusive topic发送10条消息,My message 2 ***
/**
 * WaitForExclusive
 */
@Test
public void testProducer22() throws Exception {
    Producer<String> stringProducer = client
            .newProducer(Schema.STRING)
            .topic("访问模式-WaitForExclusive")
            //设置访问模式 默认shared
            .accessMode(ProducerAccessMode.WaitForExclusive)
            .producerName("produce-demo1")
            .create();
    //假设有10条消息在未获取 独占前,均未被发送,模拟来看一下,获取独占后, 这10条消息会进行发送吗 ? 会
    for (int i = 0; i < 10; i++) {
        stringProducer.send("My message 2 " + "发送消息时间" + new Date());
    }
    System.in.read();
}
3 然后写个简单的消费者看一下消费情况
@Test
public void testConsumer2() throws IOException {
    MessageListener myMessageListener = (consumer, msg) -> {
        try {
            System.out.println("Message received: " + new String(msg.getData()));
            consumer.acknowledge(msg);
        } catch (Exception e) {
            consumer.negativeAcknowledge(msg);
        }
    };
    Consumer consumer = client.newConsumer()
            .topic("访问模式-WaitForExclusive")
            .subscriptionName("my-subscription")
            .messageListener(myMessageListener)
            .subscribe()
    System.in.read();
}
4 会看到消费者只消费到了 线程A发送的消息,线程B的消息未被消费,因为此时topic的独占权还在线程池A

消息队列Pulsar入门(一) 生产者/消费者/Topic详解,附源码演示

5 手动杀死线程A,然后看消费者情况,会看到开始消费出My message 2 *** 也就是线程B的消息,
因为此时线程A被杀死,线程B得到了独占权,线程B将消息发送出去 

消息队列Pulsar入门(一) 生产者/消费者/Topic详解,附源码演示

Consumer详解

创建方式

简单方法创建
可以看到写了一个while true去获取消息,对于线城是阻塞不友好的,因此我一般用第二种,监听器方法

/**
 * 创建消费者
 */
@Test
public void testConsumer22() throws Exception{
    Consumer consumer = client.newConsumer()
            .topic("my-topic")
            .subscriptionName("my-subscription")
            .subscribe();
    while (true) {
        // Wait for a message
        Message msg = consumer.receive();
        try {
            // Do something with the message
            System.out.println("Message received: " + new String(msg.getData()));
            // Acknowledge the message so that it can be deleted by the message broker
            consumer.acknowledge(msg);
        } catch (Exception e) {
            // Message failed to process, redeliver later
            consumer.negativeAcknowledge(msg);
        }
    }
}
监听器方法创建
/**
 * 接收消息:异步 不阻塞主线程
 */
@Test
public void testConsumer2() throws IOException {
    MessageListener myMessageListener = (consumer, msg) -> {
        try {
            System.out.println("Message received: " + new String(msg.getData()));
            consumer.acknowledge(msg);
        } catch (Exception e) {
            consumer.negativeAcknowledge(msg);
        }
    };
    Consumer consumer = client.newConsumer()
            .topic("my-topic")
            .subscriptionName("my-subscription")
            .messageListener(myMessageListener)
            .subscribe();
    System.out.println("监听器方式,不阻塞线程");
    System.in.read();
}

消息队列Pulsar入门(一) 生产者/消费者/Topic详解,附源码演示

loadConf自定义配置创建
更多自定义的配置可以看下官网文件
/**
 * loadConf创建消费者
 */
@Test
public void testConsumer222() throws IOException {
    MessageListener myMessageListener = (consumer, msg) -> {
        try {
            System.out.println("Message received: " + new String(msg.getData()));
            consumer.acknowledge(msg);
        } catch (Exception e) {
            consumer.negativeAcknowledge(msg);
        }
    };
    Map<String, Object> config1 = new HashMap<>();
    config1.put("subscriptionName", "consumer-demo1");
    config1.put("topicNames", Arrays.asList(new String[]{"my-topic"}));

    Consumer consumer = client
            .newConsumer()
            .loadConf(config1)
            .messageListener(myMessageListener)
            .subscribe();
    System.out.println("loadConf方式");
    System.in.read();
}

多主题订阅

 多主题订阅主要是指一个消费者,可以订阅多个topic,这里我只演示其中两个
传入List数组的多主题订阅
/**
 * Multi-topic subscriptions
 * 多主题订阅
 * 多topic 订阅list设置的topic1 topic2
 */
@Test
public void testConsumer3() throws IOException {
    MessageListener myMessageListener = (consumer, msg) -> {
        try {
            System.out.println("Message received: " + new String(msg.getData()));
            consumer.acknowledge(msg);
        } catch (Exception e) {
            consumer.negativeAcknowledge(msg);
        }
    };

    ConsumerBuilder consumerBuilder = client.newConsumer()
            .subscriptionName("consumer-3");
    List<String> topics = Arrays.asList(
            "topic1",
            "topic2"
    );
    Consumer multiTopicConsumer = consumerBuilder
            .topics(topics)
            .messageListener(myMessageListener)
            .subscribe();
    System.in.read();
}

消息队列Pulsar入门(一) 生产者/消费者/Topic详解,附源码演示

正则表达式多主题订阅
简单点就是正则表达式匹配,根据业务需要自行设置表达式,这里不多演示
/**
 * Multi-topic subscriptions
 * 多主题订阅
 * 正则表达式,订阅所有以1结束的topic
 *
 */
@Test
public void testConsumer222() throws IOException {
    MessageListener myMessageListener = (consumer, msg) -> {
        try {
            System.out.println("Message received: " + new String(msg.getData()));
            consumer.acknowledge(msg);
        } catch (Exception e) {
            consumer.negativeAcknowledge(msg);
        }
    };
    ConsumerBuilder consumerBuilder = client.newConsumer()
            .subscriptionName("consumer-1");
    Pattern allTopicsInNamespace = Pattern.compile("public/default/.*1");
    Consumer allTopicsConsumer = consumerBuilder
            .topicsPattern(allTopicsInNamespace)
            .messageListener(myMessageListener)
            .subscribe();
    System.in.read();
}

消费模式

Exclusive(默认)
 这里需要注意的是同一topic主题上只能有一个具有相同订阅名称的使用者 默认,也就是说 如果后端是集群部署的话,请注意默认情况下subscriptionName的命名情况,否则会报错

/**
 * Exclusive 模式  也是默认的
 * 同一主题上只能有一个具有相同订阅名称的使用者 默认
 * 否则会启动报错
 */
@Test
public void testConsumerExclusive() throws IOException {
    MessageListener myMessageListener = (consumer, msg) -> {
        try {
            System.out.println("Message received: " + new String(msg.getData()));
            consumer.acknowledge(msg);
        } catch (Exception e) {
            consumer.negativeAcknowledge(msg);
        }
    };
    Consumer consumer = client.newConsumer()
            .topic("my-topic")
            .subscriptionName("my-subscription2")
            .subscriptionType(SubscriptionType.Exclusive)
            .messageListener(myMessageListener)
            .subscribe();

    Consumer consumer2 = client.newConsumer()
            .topic("my-topic")
            .subscriptionName("my-subscription2")
            .subscriptionType(SubscriptionType.Exclusive)
            .messageListener(myMessageListener)
            .subscribe();

    System.in.read();
}

消息队列Pulsar入门(一) 生产者/消费者/Topic详解,附源码演示

Failover
 这个主要是失败转移,对比Exclusive模式,同一主题上可以有具有相同订阅名称的使用者,也就是subscriptionName可以重复,一个节点挂掉了 剩余消息转移到另一个节点继续消费;
 这块的业务场景挺不错的,假设我们后台有两台集群部署机器A,B,并且subscriptionName相同,
 正常情况下,其他模块往队列仍了一条消息,但是只希望被其中一台机器消费, 一条消息被消费一次,而不是A,B两机器都消费对吧,正常的幂等性操作
 现在开始模拟,假设其他模块发送了10条消息,然后只被其中一台消费
    @Test
    public void testProduce2() throws PulsarClientException {
        Producer<String> producer = client.newProducer(Schema.STRING)
                .topic("my-topic")
                .enableBatching(false)
                .create();
// 3 messages with "key-1", 3 messages with "key-2", 2 messages with "key-3" and 2 messages with "key-4"
        // 这里的key可以类似于 投递到 不同broker的一个标识
        producer.newMessage().key("key-1").value("message-1-1").send();
        producer.newMessage().key("key-1").value("message-1-2").send();
        producer.newMessage().key("key-1").value("message-1-3").send();
        producer.newMessage().key("key-2").value("message-2-1").send();
        producer.newMessage().key("key-2").value("message-2-2").send();
        producer.newMessage().key("key-2").value("message-2-3").send();
        producer.newMessage().key("key-3").value("message-3-1").send();
        producer.newMessage().key("key-3").value("message-3-2").send();
        producer.newMessage().key("key-4").value("message-4-1").send();
        producer.newMessage().key("key-4").value("message-4-2").send();
    }
    
@Test
public void testConsumerFailover() throws IOException {
    MessageListener myMessageListener1 = (consumer, msg) -> {
        try {
           // a++;
          //  if (a > 4) {
               // System.out.println("模拟节点1故障");
                //关闭节点1
              //  consumer.close();
              //  throw new RuntimeException("模拟某时刻节点1故障,转移至节点2消费");
           // }
            System.out.println("Message1 received: " + new String(msg.getData()));
            consumer.acknowledge(msg);
        } catch (Exception e) {
            consumer.negativeAcknowledge(msg);
        }
    };
    Consumer consumer = client.newConsumer()
            .topic("my-topic")
            .subscriptionName("my-subscription")
            .subscriptionType(SubscriptionType.Failover)
            .messageListener(myMessageListener1)
            .subscribe();

    MessageListener myMessageListener2 = (consumer2, msg) -> {
        try {
            System.out.println("Message2 received: " + new String(msg.getData()));
            consumer.acknowledge(msg);
        } catch (Exception e) {
            consumer.negativeAcknowledge(msg);
        }
    };
    Consumer consumer2 = client.newConsumer()
            .topic("my-topic")
            .subscriptionName("my-subscription")
            .subscriptionType(SubscriptionType.Failover)
            .messageListener(myMessageListener2)
            .subscribe();

    System.in.read();
}

消息队列Pulsar入门(一) 生产者/消费者/Topic详解,附源码演示

再来看看失败转移,假设其中一台机器宕机,然后我希望剩下机器B,继续消费未消费完的消息,可以看到一台机器模拟宕机后,另一台机器继续消费,也就是失败转移
/**
 * Failover故障转移 .subscriptionName("my-subscription") 可重复
 * 一个节点挂掉了 剩余消息转移到另一个节点继续消费
 * 注意这些消费模式 都是和subscriptionName("my-subscription") 订阅者名称相关
 */
int a = 0;

@Test
public void testConsumerFailover() throws IOException {
    MessageListener myMessageListener1 = (consumer, msg) -> {
        try {
            a++;
            if (a > 4) {
                System.out.println("模拟节点1故障");
                //关闭节点1
                consumer.close();
                throw new RuntimeException("模拟某时刻节点1故障,转移至节点2消费");
            }
            System.out.println("Message1 received: " + new String(msg.getData()));
            consumer.acknowledge(msg);
        } catch (Exception e) {
            //consumer.negativeAcknowledge(msg);
        }
    };
    Consumer consumer = client.newConsumer()
            .topic("my-topic")
            .subscriptionName("my-subscription")
            .subscriptionType(SubscriptionType.Failover)
            .messageListener(myMessageListener1)
            .subscribe();

    MessageListener myMessageListener2 = (consumer2, msg) -> {
        try {
            System.out.println("Message2 received: " + new String(msg.getData()));
            consumer.acknowledge(msg);
        } catch (Exception e) {
            consumer.negativeAcknowledge(msg);
        }
    };
    Consumer consumer2 = client.newConsumer()
            .topic("my-topic")
            .subscriptionName("my-subscription")
            .subscriptionType(SubscriptionType.Failover)
            .messageListener(myMessageListener2)
            .subscribe();

    System.in.read();
}

消息队列Pulsar入门(一) 生产者/消费者/Topic详解,附源码演示

Shared
多个使用者将能够使用相同的订阅名称,并且消息将根据连接的使用者之间的循环旋转进行分派。 在这种模式下,消费顺序不能保证
/**
 * Shared模式
 * 多个使用者将能够使用相同的订阅名称,并且消息将根据连接的使用者之间的循环旋转进行分派。 在这种模式下,消费顺序不能保证。
 * 也就是消费者 1 消费者2 总共消费10条
 * 注意都是从 .subscriptionName("my-subscription") 视角
 */
@Test
public void testShared() throws IOException {
    MessageListener myMessageListener1 = (consumer, msg) -> {
        try {
            System.out.println("Message1 received: " + new String(msg.getData()));
            consumer.acknowledge(msg);
        } catch (Exception e) {
            consumer.negativeAcknowledge(msg);
        }
    };
    Consumer consumer = client.newConsumer()
            .topic("my-topic")
            .subscriptionName("my-subscription")
            .subscriptionType(SubscriptionType.Shared)
            .messageListener(myMessageListener1)
            .subscribe();

    MessageListener myMessageListener2 = (consumer2, msg) -> {
        try {
            System.out.println("Message2 received: " + new String(msg.getData()));
            consumer.acknowledge(msg);
        } catch (Exception e) {
            consumer.negativeAcknowledge(msg);
        }
    };
    Consumer consumer2 = client.newConsumer()
            .topic("my-topic")
            .subscriptionName("my-subscription")
            .subscriptionType(SubscriptionType.Shared)
            .messageListener(myMessageListener2)
            .subscribe();

    System.in.read();
}

消息队列Pulsar入门(一) 生产者/消费者/Topic详解,附源码演示

Key_Shared模式
这个简单来理解,发送消息的时候,给这批消息指定一个key,那么消息被消费的时候,相同key的这批消息,只能被同一个节点消费
如下示例我发送消息时,指定下key,然后写消费者看下消费情况,会看到key相同的消息被同一节点消费
    @Test
    public void testProduce2() throws PulsarClientException {
        Producer<String> producer = client.newProducer(Schema.STRING)
                .topic("my-topic")
                .enableBatching(false)
                .create();
        producer.newMessage().key("key-1").value("message-1-1").send();
        producer.newMessage().key("key-1").value("message-1-2").send();
        producer.newMessage().key("key-1").value("message-1-3").send();
        producer.newMessage().key("key-2").value("message-2-1").send();
        producer.newMessage().key("key-2").value("message-2-2").send();
        producer.newMessage().key("key-2").value("message-2-3").send();
        producer.newMessage().key("key-3").value("message-3-1").send();
        producer.newMessage().key("key-3").value("message-3-2").send();
        producer.newMessage().key("key-4").value("message-4-1").send();
        producer.newMessage().key("key-4").value("message-4-2").send();
    }
    
/**
 * Key_Shared模式
 * 多个使用者将能够使用相同的订阅名称,并且消息将根据连接的使用者之间的循环旋转进行分派。 在这种模式下,消费顺序不能保证。
 * 也就是消费者 1 消费者2 总共消费10条
 * 注意都是从 .subscriptionName("my-subscription") 视角
 * <p>
 * 具有相同密钥的消息仅按顺序传递给一个消费者。消息在不同消费者之间的可能分布(默认情况下,我们事先不知道哪些密钥将被分配给消费者,但一个密钥只会同时被分配给消费者
 * ("key-1", "message-1-1")
 * ("key-1", "message-1-2")
 * ("key-1", "message-1-3")
 * ("key-3", "message-3-1")
 * ("key-3", "message-3-2")
 * <p>
 * <p>
 * ("key-2", "message-2-1")
 * ("key-2", "message-2-2")
 * ("key-2", "message-2-3")
 * ("key-4", "message-4-1")
 * ("key-4", "message-4-2")
 */
@Test
public void testKeyShared() throws IOException {
    MessageListener myMessageListener1 = (consumer, msg) -> {
        try {
            System.out.println("Message1 received: " + new String(msg.getData()));
            consumer.acknowledge(msg);
        } catch (Exception e) {
            consumer.negativeAcknowledge(msg);
        }
    };
    Consumer consumer = client.newConsumer()
            .topic("my-topic")
            .subscriptionName("my-subscription")
            .subscriptionType(SubscriptionType.Key_Shared)
            .messageListener(myMessageListener1)
            .subscribe();

    MessageListener myMessageListener2 = (consumer2, msg) -> {
        try {
            System.out.println("Message2 received: " + new String(msg.getData()));
            consumer.acknowledge(msg);
        } catch (Exception e) {
            consumer.negativeAcknowledge(msg);
        }
    };
    Consumer consumer2 = client.newConsumer()
            .topic("my-topic")
            .subscriptionName("my-subscription")
            .subscriptionType(SubscriptionType.Key_Shared)
            .messageListener(myMessageListener2)
            .subscribe();

    System.in.read();
}
    

消息队列Pulsar入门(一) 生产者/消费者/Topic详解,附源码演示

模式对比

 Exclusive只支持同一topic只能有一个同名订阅者,对于目前大多集群架构,需要每个节点命名subscriptionName不同操作下,
 集群中的每个节点都能收到topic消息,对于特殊场景如 前端websocket连接后台集群这类场景,还是蛮实用
 Failover:可以保证在集群中消息只被消费一次,幂等性嘛简单点,正常情况下只被其中一台机器消费,也就是固定一台机器,这种就很纱布了
 Shared: 可以保证在集群中消息只被消费一次,也是保证了幂等性,而且消息被集群平均消费了,压力down down
 Key_Shared 我再想想
 

Topic

Pulsar对topic的命名有如下规则,
{persistent|non-persistent}://tenant/namespace/topic
  • persistent / non-persistent 表示主题的类型,主题分为持久化和非持久化主题,默认是持久化的类型。持久化的主题会将消息保存到磁盘上,而非持久化的主题就不会将消息保存到磁盘。

  • tenant Pulsar 中主题的租户,租户对于 Pulsar中的多租户至关重要,并且分布在集群中。

  • namespace 将相关联的 Topic 作为一个组来管理,是管理 Topic 的基本单元。每个租户可以有一个或多个命名空间。

在上面的示例,我们都没有去关注persistent,tenant,namespace的玩法,因为你不去特殊设置的话,pulsar都有默认的
我们可以尝试往persistent://sample/namespace_test4/topic-haha1直接发一条消息,你会发现发送报错Policies not found for sample/namespace_test4 namespace
/**
 * 报错
 * 向租户sample 命名空间 namespace_test4  topic topic-haha1 发送消息
 * 注意namespace需手动先创建好,否则会报错 olicies not found for sample/namespace_test4 namespace
 */
@Test
public void testProduce322() throws Exception {
    Producer<String> producer = client.newProducer(Schema.STRING)
            .topic("persistent://sample/namespace_test4/topic-haha1")
            .enableBatching(false)
            .create();
    producer.send("向租户sample 命名空间 namespace_test2  topic topic-haha1 发送消息");
    System.in.read();
}

消息队列Pulsar入门(一) 生产者/消费者/Topic详解,附源码演示

这里则表示我们需要先创建namespace之后,搞好对应的namespace tenant这些之后才行
那么如何动态去创建namespace,管理tenanat,以及包括我们刚才搞了那么多的生产者消费者测试出来,
能不能有一个UI界面让我一目了然,一手掌握Pulsar呢?

这里我即将介绍Pulsar的一款UI工具Pulsar admin
详情请看下文