likes
comments
collection
share

RocketMQ(五)消息类型--普通消息

作者站长头像
站长
· 阅读数 16

Rocketmq支持四种消息类型:普通消息、顺序消息、定时/延时消息、分布式事务消息

您在调用SDK收发消息时需注意,消息队列RocketMQ版提供的四种消息类型所对应的Topic不能混用,例如,您创建的普通消息的Topic只能用于收发普通消息,不能用于收发其他类型的消息;同理,事务消息的Topic也只能收发事务消息,不能用于收发其他类型的消息,以此类推。

后面代码都是基于springboot-rocketmq-stater。 ##一、普通消息 无特性的消息,区别于有特性的定时和延时消息、顺序消息和事务消息。 普通消息又分为同步消息,异步消息,单向消息。 ####1.1、同步消息 每向rocketmq服务发送一条消息会收到同步响应。

RocketMQ(五)消息类型--普通消息

下面通过代码实现下同步发送: 测试控制器:

    @Autowired
    private RocketMqProducer rocketMqProducer;

    /**
     * 普通消息同步发送
     */
    @RequestMapping("/send/sync")
    public void sendMsg(){
        rocketMqProducer.sendSync("普通消息-同步发送","test_sync");
    }

生产者:

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * 发送普通消息(同步发送)
     */
    public void sendSync(String msgBody,String topic) {
        for (int i = 0; i < 10; i++) {
            try{
                SendResult sendResult = rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(msgBody).build());
                if (ObjectUtils.isNotEmpty(sendResult)){
                    //sendResult不空则表示消息发送成功
                    log.info("send success , send msg = {}, messageId = {}",msgBody, sendResult.getMsgId());
                }
            }catch (Exception e){
                log.info("send failed, msg = {}", e);
            }

        }
    }

结果:

2020-11-27 10:33:24.157  INFO 12988 --- [nio-8085-exec-9] c.c.b.m.r.producer.RocketMqProducer      : send success , send msg = 普通消息-同步发送, messageId = AC10020832BC18B4AAC288293D9F001E
2020-11-27 10:33:24.186  INFO 12988 --- [nio-8085-exec-9] c.c.b.m.r.producer.RocketMqProducer      : send success , send msg = 普通消息-同步发送, messageId = AC10020832BC18B4AAC288293DBD0020
2020-11-27 10:33:24.215  INFO 12988 --- [nio-8085-exec-9] c.c.b.m.r.producer.RocketMqProducer      : send success , send msg = 普通消息-同步发送, messageId = AC10020832BC18B4AAC288293DDA0023
2020-11-27 10:33:24.243  INFO 12988 --- [nio-8085-exec-9] c.c.b.m.r.producer.RocketMqProducer      : send success , send msg = 普通消息-同步发送, messageId = AC10020832BC18B4AAC288293DF70026
2020-11-27 10:33:24.272  INFO 12988 --- [nio-8085-exec-9] c.c.b.m.r.producer.RocketMqProducer      : send success , send msg = 普通消息-同步发送, messageId = AC10020832BC18B4AAC288293E140029
2020-11-27 10:33:24.301  INFO 12988 --- [nio-8085-exec-9] c.c.b.m.r.producer.RocketMqProducer      : send success , send msg = 普通消息-同步发送, messageId = AC10020832BC18B4AAC288293E30002C
2020-11-27 10:33:24.330  INFO 12988 --- [nio-8085-exec-9] c.c.b.m.r.producer.RocketMqProducer      : send success , send msg = 普通消息-同步发送, messageId = AC10020832BC18B4AAC288293E4D002F
2020-11-27 10:33:24.359  INFO 12988 --- [nio-8085-exec-9] c.c.b.m.r.producer.RocketMqProducer      : send success , send msg = 普通消息-同步发送, messageId = AC10020832BC18B4AAC288293E6A0032
2020-11-27 10:33:24.390  INFO 12988 --- [nio-8085-exec-9] c.c.b.m.r.producer.RocketMqProducer      : send success , send msg = 普通消息-同步发送, messageId = AC10020832BC18B4AAC288293E880035
2020-11-27 10:33:24.417  INFO 12988 --- [nio-8085-exec-9] c.c.b.m.r.producer.RocketMqProducer      : send success , send msg = 普通消息-同步发送, messageId = AC10020832BC18B4AAC288293EA60038

以上步骤没有消费者,下面演示增加消费者的情况: 消费者代码:

/**
 * RocketMqProducer
 * @date: 2020/11/26
 * @author weirx
 * @version 3.0
 */
@Slf4j
@Component
@RocketMQMessageListener(topic = "test_sync", selectorExpression = "*", consumerGroup = "test")
public class SimpleMessageListener implements RocketMQListener<MessageExt> {

    @Override
    public void onMessage(MessageExt messageExt) {
        byte[] body = messageExt.getBody();
        String msg = new String(body);
        log.info("receive sync message:{}", msg);
    }
}

启动后会立即消费刚才发送的消息:

2020-11-27 10:39:49.543  INFO 45668 --- [MessageThread_2] c.c.b.m.r.c.SimpleMessageListener        : receive sync message:普通消息-同步发送
2020-11-27 10:39:49.543  INFO 45668 --- [essageThread_20] c.c.b.m.r.c.SimpleMessageListener        : receive sync message:普通消息-同步发送
2020-11-27 10:39:49.544  INFO 45668 --- [MessageThread_1] c.c.b.m.r.c.SimpleMessageListener        : receive sync message:普通消息-同步发送
2020-11-27 10:39:49.544  INFO 45668 --- [essageThread_16] c.c.b.m.r.c.SimpleMessageListener        : receive sync message:普通消息-同步发送
2020-11-27 10:39:49.544  INFO 45668 --- [essageThread_13] c.c.b.m.r.c.SimpleMessageListener        : receive sync message:普通消息-同步发送
2020-11-27 10:39:49.543  INFO 45668 --- [MessageThread_4] c.c.b.m.r.c.SimpleMessageListener        : receive sync message:普通消息-同步发送
2020-11-27 10:39:49.544  INFO 45668 --- [MessageThread_6] c.c.b.m.r.c.SimpleMessageListener        : receive sync message:普通消息-同步发送
2020-11-27 10:39:49.543  INFO 45668 --- [MessageThread_5] c.c.b.m.r.c.SimpleMessageListener        : receive sync message:普通消息-同步发送
2020-11-27 10:39:49.544  INFO 45668 --- [MessageThread_8] c.c.b.m.r.c.SimpleMessageListener        : receive sync message:普通消息-同步发送
2020-11-27 10:39:49.544  INFO 45668 --- [essageThread_18] c.c.b.m.r.c.SimpleMessageListener        : receive sync message:普通消息-同步发送

再次发送的结果,发送一条消费一条,符合同步响应的模型:

2020-11-27 10:41:23.837  INFO 45668 --- [nio-8085-exec-1] c.c.b.m.r.producer.RocketMqProducer      : send success , send msg = 普通消息-同步发送, messageId = AC100208B26418B4AAC288308F520029
2020-11-27 10:41:23.839  INFO 45668 --- [essageThread_11] c.c.b.m.r.c.SimpleMessageListener        : receive sync message:普通消息-同步发送
2020-11-27 10:41:23.868  INFO 45668 --- [nio-8085-exec-1] c.c.b.m.r.producer.RocketMqProducer      : send success , send msg = 普通消息-同步发送, messageId = AC100208B26418B4AAC288308F7E002B
2020-11-27 10:41:23.869  INFO 45668 --- [essageThread_16] c.c.b.m.r.c.SimpleMessageListener        : receive sync message:普通消息-同步发送
2020-11-27 10:41:23.896  INFO 45668 --- [nio-8085-exec-1] c.c.b.m.r.producer.RocketMqProducer      : send success , send msg = 普通消息-同步发送, messageId = AC100208B26418B4AAC288308F9C0031
2020-11-27 10:41:23.897  INFO 45668 --- [MessageThread_9] c.c.b.m.r.c.SimpleMessageListener        : receive sync message:普通消息-同步发送
2020-11-27 10:41:23.925  INFO 45668 --- [nio-8085-exec-1] c.c.b.m.r.producer.RocketMqProducer      : send success , send msg = 普通消息-同步发送, messageId = AC100208B26418B4AAC288308FB80037
2020-11-27 10:41:23.926  INFO 45668 --- [MessageThread_5] c.c.b.m.r.c.SimpleMessageListener        : receive sync message:普通消息-同步发送
2020-11-27 10:41:23.954  INFO 45668 --- [nio-8085-exec-1] c.c.b.m.r.producer.RocketMqProducer      : send success , send msg = 普通消息-同步发送, messageId = AC100208B26418B4AAC288308FD6003E
2020-11-27 10:41:23.954  INFO 45668 --- [essageThread_10] c.c.b.m.r.c.SimpleMessageListener        : receive sync message:普通消息-同步发送
2020-11-27 10:41:23.985  INFO 45668 --- [nio-8085-exec-1] c.c.b.m.r.producer.RocketMqProducer      : send success , send msg = 普通消息-同步发送, messageId = AC100208B26418B4AAC288308FF20043
2020-11-27 10:41:23.986  INFO 45668 --- [essageThread_17] c.c.b.m.r.c.SimpleMessageListener        : receive sync message:普通消息-同步发送
2020-11-27 10:41:24.015  INFO 45668 --- [nio-8085-exec-1] c.c.b.m.r.producer.RocketMqProducer      : send success , send msg = 普通消息-同步发送, messageId = AC100208B26418B4AAC2883090120049
2020-11-27 10:41:24.016  INFO 45668 --- [MessageThread_4] c.c.b.m.r.c.SimpleMessageListener        : receive sync message:普通消息-同步发送
2020-11-27 10:41:24.043  INFO 45668 --- [nio-8085-exec-1] c.c.b.m.r.producer.RocketMqProducer      : send success , send msg = 普通消息-同步发送, messageId = AC100208B26418B4AAC28830902F004F
2020-11-27 10:41:24.044  INFO 45668 --- [essageThread_18] c.c.b.m.r.c.SimpleMessageListener        : receive sync message:普通消息-同步发送
2020-11-27 10:41:24.070  INFO 45668 --- [nio-8085-exec-1] c.c.b.m.r.producer.RocketMqProducer      : send success , send msg = 普通消息-同步发送, messageId = AC100208B26418B4AAC28830904B0055
2020-11-27 10:41:24.072  INFO 45668 --- [MessageThread_6] c.c.b.m.r.c.SimpleMessageListener        : receive sync message:普通消息-同步发送
2020-11-27 10:41:24.099  INFO 45668 --- [nio-8085-exec-1] c.c.b.m.r.producer.RocketMqProducer      : send success , send msg = 普通消息-同步发送, messageId = AC100208B26418B4AAC288309067005B
2020-11-27 10:41:24.100  INFO 45668 --- [MessageThread_2] c.c.b.m.r.c.SimpleMessageListener        : receive sync message:普通消息-同步发送

####1.2、异步消息 RocketMQ(五)消息类型--普通消息

测试控制器:

    /**
     * 普通消息异步步发送
     */
    @RequestMapping("/send/async")
    public void sendAsync(){
        rocketMqProducer.sendAsync("普通消息-异步发送","test_async");
    }

生产者:

    /**
     * 发送普通消息(异步发送)
     */
    public void sendAsync(String msgBody, String topic) {
        for (int i = 0; i < 10; i++) {
            rocketMQTemplate.asyncSend(topic, MessageBuilder.withPayload(msgBody).build(), new SendCallback() {

                @Override
                public void onSuccess(SendResult sendResult) {
                    log.info("send success , send msg = {}, messageId = {}", msgBody, sendResult.getMsgId());
                }

                @Override
                public void onException(Throwable throwable) {
                    log.info("send failed, msg = {}", throwable);
                }
            });
        }
    }

消费者:

/**
 * RocketMqProducer
 * @date: 2020/11/26
 * @author weirx
 * @version 3.0
 */
@Slf4j
@Component
@RocketMQMessageListener(topic = "test_async", selectorExpression = "*", consumerGroup = "test_async")
public class SimpleAsyncMessageListener implements RocketMQListener<MessageExt> {

    @Override
    public void onMessage(MessageExt messageExt) {
        byte[] body = messageExt.getBody();
        String msg = new String(body);
        log.info("receive async message:{}", msg);
    }
}

结果,没有像同步一样发送一条消费一条:

2020-11-27 11:09:00.483  INFO 25416 --- [ublicExecutor_7] c.c.b.m.r.producer.RocketMqProducer      : send success , send msg = 普通消息-异步发送, messageId = AC100208634818B4AAC28849D3C00005
2020-11-27 11:09:00.483  INFO 25416 --- [ublicExecutor_8] c.c.b.m.r.producer.RocketMqProducer      : send success , send msg = 普通消息-异步发送, messageId = AC100208634818B4AAC28849D3C00001
2020-11-27 11:09:00.483  INFO 25416 --- [ublicExecutor_4] c.c.b.m.r.producer.RocketMqProducer      : send success , send msg = 普通消息-异步发送, messageId = AC100208634818B4AAC28849D3C00002
2020-11-27 11:09:00.483  INFO 25416 --- [ublicExecutor_6] c.c.b.m.r.producer.RocketMqProducer      : send success , send msg = 普通消息-异步发送, messageId = AC100208634818B4AAC28849D3C00006
2020-11-27 11:09:00.483  INFO 25416 --- [ublicExecutor_5] c.c.b.m.r.producer.RocketMqProducer      : send success , send msg = 普通消息-异步发送, messageId = AC100208634818B4AAC28849D3C00004
2020-11-27 11:09:00.483  INFO 25416 --- [ublicExecutor_1] c.c.b.m.r.producer.RocketMqProducer      : send success , send msg = 普通消息-异步发送, messageId = AC100208634818B4AAC28849D3C00000
2020-11-27 11:09:00.483  INFO 25416 --- [ublicExecutor_8] c.c.b.m.r.producer.RocketMqProducer      : send success , send msg = 普通消息-异步发送, messageId = AC100208634818B4AAC28849D3C00003
2020-11-27 11:09:00.490  INFO 25416 --- [MessageThread_6] c.c.b.m.r.c.SimpleAsyncMessageListener   : receive async message:普通消息-异步发送
2020-11-27 11:09:00.490  INFO 25416 --- [MessageThread_4] c.c.b.m.r.c.SimpleAsyncMessageListener   : receive async message:普通消息-异步发送
2020-11-27 11:09:00.490  INFO 25416 --- [MessageThread_5] c.c.b.m.r.c.SimpleAsyncMessageListener   : receive async message:普通消息-异步发送
2020-11-27 11:09:00.490  INFO 25416 --- [MessageThread_1] c.c.b.m.r.c.SimpleAsyncMessageListener   : receive async message:普通消息-异步发送
2020-11-27 11:09:00.490  INFO 25416 --- [MessageThread_2] c.c.b.m.r.c.SimpleAsyncMessageListener   : receive async message:普通消息-异步发送
2020-11-27 11:09:00.490  INFO 25416 --- [MessageThread_3] c.c.b.m.r.c.SimpleAsyncMessageListener   : receive async message:普通消息-异步发送
2020-11-27 11:09:00.509  INFO 25416 --- [ublicExecutor_6] c.c.b.m.r.producer.RocketMqProducer      : send success , send msg = 普通消息-异步发送, messageId = AC100208634818B4AAC28849D6A00010
2020-11-27 11:09:00.509  INFO 25416 --- [ublicExecutor_4] c.c.b.m.r.producer.RocketMqProducer      : send success , send msg = 普通消息-异步发送, messageId = AC100208634818B4AAC28849D3C00007
2020-11-27 11:09:00.509  INFO 25416 --- [ublicExecutor_5] c.c.b.m.r.producer.RocketMqProducer      : send success , send msg = 普通消息-异步发送, messageId = AC100208634818B4AAC28849D6A00011
2020-11-27 11:09:00.510  INFO 25416 --- [MessageThread_7] c.c.b.m.r.c.SimpleAsyncMessageListener   : receive async message:普通消息-异步发送
2020-11-27 11:09:00.510  INFO 25416 --- [MessageThread_8] c.c.b.m.r.c.SimpleAsyncMessageListener   : receive async message:普通消息-异步发送
2020-11-27 11:09:00.510  INFO 25416 --- [MessageThread_9] c.c.b.m.r.c.SimpleAsyncMessageListener   : receive async message:普通消息-异步发送
2020-11-27 11:09:00.519  INFO 25416 --- [essageThread_10] c.c.b.m.r.c.SimpleAsyncMessageListener   : receive async message:普通消息-异步发送

####1.3、单向发送 RocketMQ(五)消息类型--普通消息

控制器:

/**
     * 普通消息单向发送
     */
    @RequestMapping("/send/oneway")
    public void sendOneway(){
        rocketMqProducer.sendOneWay("普通消息-单向发送","test_oneway");
    }

生产者:

    /**
     * 发送普通消息(单向发送)
     */
    public void sendOneWay(String msgBody,String topic) {
        for (int i = 0; i < 10; i++) {
            //没有返回值和回调方法
            rocketMQTemplate.sendOneWay(topic, MessageBuilder.withPayload(msgBody).build());
        }
    }

消费者

/**
 * RocketMqProducer
 * @date: 2020/11/26
 * @author weirx
 * @version 3.0
 */
@Slf4j
@Component
@RocketMQMessageListener(topic = "test_oneway", selectorExpression = "*", consumerGroup = "test_oneway")
public class SimpleOnewayMessageListener implements RocketMQListener<MessageExt> {

    @Override
    public void onMessage(MessageExt messageExt) {
        byte[] body = messageExt.getBody();
        String msg = new String(body);
        log.info("receive async message:{}", msg);
    }
}

消费者接收结果:

2020-11-27 11:16:35.700  INFO 14364 --- [MessageThread_5] c.c.b.m.r.c.SimpleOnewayMessageListener  : receive async message:普通消息-单向发送
2020-11-27 11:16:35.700  INFO 14364 --- [MessageThread_3] c.c.b.m.r.c.SimpleOnewayMessageListener  : receive async message:普通消息-单向发送
2020-11-27 11:16:35.700  INFO 14364 --- [MessageThread_4] c.c.b.m.r.c.SimpleOnewayMessageListener  : receive async message:普通消息-单向发送
2020-11-27 11:16:35.700  INFO 14364 --- [MessageThread_1] c.c.b.m.r.c.SimpleOnewayMessageListener  : receive async message:普通消息-单向发送
2020-11-27 11:16:35.700  INFO 14364 --- [MessageThread_6] c.c.b.m.r.c.SimpleOnewayMessageListener  : receive async message:普通消息-单向发送
2020-11-27 11:16:35.700  INFO 14364 --- [MessageThread_2] c.c.b.m.r.c.SimpleOnewayMessageListener  : receive async message:普通消息-单向发送
2020-11-27 11:16:35.729  INFO 14364 --- [MessageThread_7] c.c.b.m.r.c.SimpleOnewayMessageListener  : receive async message:普通消息-单向发送
2020-11-27 11:16:35.729  INFO 14364 --- [MessageThread_9] c.c.b.m.r.c.SimpleOnewayMessageListener  : receive async message:普通消息-单向发送
2020-11-27 11:16:35.730  INFO 14364 --- [MessageThread_8] c.c.b.m.r.c.SimpleOnewayMessageListener  : receive async message:普通消息-单向发送
2020-11-27 11:16:35.731  INFO 14364 --- [essageThread_10] c.c.b.m.r.c.SimpleOnewayMessageListener  : receive async message:普通消息-单向发送

##二、三种方式对比

名称速度TPS反馈可靠性
同步发送可靠
异步发送可靠
单向发送最快不可靠