likes
comments
collection
share

RocketMQ:写一个生产者程序要注意些什么

作者站长头像
站长
· 阅读数 14

背景

消息队列是我们日常工作中最长用到的中间件之一,那么写一个生产者程序要注意什么,这里已 RocketMQ 为示例,简单总结下。

关键字:RocketMQ,消息中间件,生产者,

选择合适的发送消息的方式

RocketMQ 支持同步发送、异步发送、延迟投递、单向投递(Oneway),我们看下各自的适用场景。

同步发送

如果要立即知道消息发送的结果,等待消息发送完成后才能继续下一步动作,就应该采用同步发送。

同步发送的代码如下:

    public static void test02() throws UnsupportedEncodingException, MQClientException, RemotingException, InterruptedException, MQBrokerException {

        // 1. 指定 group、nameserver
        DefaultMQProducer producer = new DefaultMQProducer("test_topic01_group001");
        producer.setNamesrvAddr("localhost:9876");

        // 2. 启动生产者
        producer.start();

        // 3. 构造消息
        Message message = new Message("test_topic01", ("hello world" + System.currentTimeMillis()).getBytes(RemotingHelper.DEFAULT_CHARSET));

        // 4. 同步发送,第一个参数是要发送的消息,第二个参数是超时时间
        producer.send(message, 10);

        System.out.println("消息发送成功");
        producer.shutdown();
    }

异步发送

如果不需要立即感知消息发送的结果,或者对性能有一定要求的话,就可以采用异步方式。采用异步发送,RocketMQ 会起一个线程,异步执行消息发送逻辑,不影响主流程。消息发送完成后,会有个回调函数,业务代码基于回调结果做后续的操作。

    public static void test01() throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException {
        // 1. 指定 group、nameserver
        DefaultMQProducer producer = new DefaultMQProducer("test_topic01_group001");
        producer.setNamesrvAddr("localhost:9876");

        // 2.启动 producer //
        producer.start();

        // 3. 构造消息
        Message m = new Message("test_topic01", null, ("hello world" + System.currentTimeMillis()).getBytes(RemotingHelper.DEFAULT_CHARSET));
        
        // 4. 异步发送,一个参数是要发送的消息,第二个参数是消息发送完成后(可能成功,可能失败)的回调接口
        producer.send(m, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println(String.format("sendResult = %s, sendStatus = %s", sendResult, sendResult.getSendStatus()));
            }

            @Override
            public void onException(Throwable e) {
                e.printStackTrace();
            }
        }, 1);

        System.out.println("消息发送成功");
        producer.shutdown();

    }

异步发送完成后,会回调我们指定的回调函数。发送结果有四种状态:

  • SEND_OK:大吉大利,发送成功
  • FLUSH_DISK_TIMEOUT:表示消息发给了 Broker,但是 Broker 机器存储消息的时候,没有在规定时间内 flush 到硬盘上
  • FLUSH_SLAVE_TIMEOUT:表示消息发给了 Broker,但是 Broker 做主从同步的时候,没有在规定时间内同步给 slave 机器
  • SLAVE_NOT_AVAILABLE:表示消息发给了 Broker,但是 Broker 做主从同步的时候,没有找到 slave 机器

延迟投递

我们在外卖平台上点外卖,下单后会跳到一个界面,告诉我们,说是要在15分钟内付款,不然就自动取消订单,这个该怎么实现呢?

用延迟投递就可以简单的实现,用户下单成功后,发一个延迟15分钟的MQ消息,消费者收到消息,再校验下有没有付款,没付款的话,就取消订单即可。

代码实现起来很简单,构造好消息后,调用 setDelayTimeLevel() 方法,设置好延迟时间,再发送即可

Message message = new Message("test_topic01", ("hello world" + System.currentTimeMillis()).getBytes(RemotingHelper.DEFAULT_CHARSET));

message.setDelayTimeLevel(2);

producer.send(message, 10);

单向投递(Oneway)

如果面对一些追求吞吐量,对稳定要求不高的场景,比如日志收集,用户行为收集等场景,单向投递就是一个不错的选择。

当采用单向投递方式,RocketMQ 只负责把消息写入 broker 的 socket 缓冲区就返回,不等待对方的处理结果。这种消息发送方式,可以达到微秒级。

代码实现如下,调用producer 的 sendOneway() 方法即可:

Message message = new Message("test_topic01", ("hello world" + System.currentTimeMillis()).getBytes(RemotingHelper.DEFAULT_CHARSET));
        
producer.sendOneway(message);

思考是否要使用消息 tag

分享一个日常工作中实际遇到的 case。

最近接入了这么一个业务场景,商家端因为商家端很多,每天会推送很多消息(百万级),这些消息分为几种类型:营销活动提醒、缴费提醒、日常推送等等。

伟大的PM说,营销活动的提醒如果能让销售也收到,这样销售就知道商家最近都感知到哪些活动,就能及时跟进讲解。所以想着把营销活动类的提醒也转发一份给销售端。

这个伟大的需求,最简单的方式就是申请一个消费者,消费商家端的消息,判断是否是【营销活动提醒】,如果是就进行转发,不是就忽略。

但是每天百万级的消息,仅仅三四台消费者机器去消费肯定不够,会造成消息堆积。再一打听,【营销活动提醒】类型的消息每天很少,就一两条。难道要为这一两条消息投入七八台机器吗,有什么办法吗?

这个时候,使用消息 tag 就能很好的解决。让生成者发消息的时候,把消息类型传到tag里,而我这边消费的时候也只消费【营销活动提醒】类型的消息。这样,从broker拉取的消息,从百万级降低到几条消息,少占用带宽从而提高吞吐。

所以,写生成者程序的时候要思考下业务场景,尤其是消息量级比较大的场景,消费者那边是否会需要关注所有的消息,不需要的话,就要考虑是否使用tag。

代码实例如下,Message 的构造函数,第二个就是 tag 参数。

Message message = 
    new Message("test_topic01", "tag", ("hello world" + System.currentTimeMillis()).getBytes(RemotingHelper.DEFAULT_CHARSET));

重试

现在都是分布式环境了,消息的发送是要经过网络的,而网络又是不稳定的,另外消息中间件本身也可能会出问题(机器宕机,机器迁移等)。

所以需要有合适的重试机制,提高消息发送的成功率。

设置重试次数的时候有一点要注意,就是同步发送和异步发送用的是不同的配置,不要设置错了。

RocketMQ:写一个生产者程序要注意些什么

总结

写一个生产者程序要注意基于业务场景,选择合适的发送消息的方式,其次是思考是否要使用消息 tag 和重试。

Ref

  • 《RocketMQ 实战与原理解析》杨开元
转载自:https://juejin.cn/post/7155348744379547655
评论
请登录