图解RocketMQ之消息的过滤
经过之前几篇文章,我们知道了 RocketMQ 的架构以及主题 topic 的属性和配置,知道了原来消息是存在一个个队列中的。
也知道了同一个 topic 有一个或者多个队列对应给一个或者多个消费者消费。消费者根据消费位点确定消费记录。
还理解了可以通过重置消费位点来处理消息堆积、重新消费和跳过消费某一消息。
按照发布订阅模型,RocketMQ 会将所有订阅了主题的消息都投递给消费者,但有时候消费者只关心消息里的某一内容而不是全量消息。
比如订单系统订单状态的改变需要被不同的子系统处理,而下游不同系统需要不同状态的订单:
- 库存系统只关心已支付的订单,用来扣减库存
- 物流系统只关心待发货订单,用于安排发货
- 会计系统关心所有已完成的订单,且金额大于1000元的订单,用于财务统计
这个时候怎么办呢?聪明的你肯定想到这还不简单,在各自系统做逻辑判断,全量接收后,各自系统做过滤呗。
这当然不是优雅的办法(追求优雅的道路不能停🐶),消息多了是会耗费资源和传输带宽的,要想解决这个问题,其实就用到了 RocketMQ 的消息过滤机制。
什么是消息过滤
消息过滤是 RocketMQ 提供的一种帮助消费者高效过滤自己需要消息的机制。
当然了,过滤是将符合条件的消息投递给消费者,而不是将消息过滤掉。
也就是在 RocketMQ 投递消息的时候,就已经确定了消费者需要什么消息,而不是一股脑的都喂给消费者,这叫什么?
这就叫投其所好。(看到没,计算机的世界同样如此)
现在库存系统只想要已支付订单的消息是吧?
好,那我就只吧已支付订单的消息给你,其他消息就算烂掉也不给你,你也没说要啊。
用一张图来解释下会更清晰:
消息过滤原理
问题很多的小明他又问了,那么消息过滤是怎么实现的,RocketMQ 怎么知道我想要什么消息,哪些消息投递给我呢?
但凡是要投其所好,得先知道对方喜欢什么吧?在 RocketMQ 中有一个强大的功能,可以对消息打标签,也可以对消息自定义属性。
这个很好理解,生产者在发送消息的时候就对消息做好标签和属性管理,在 RocketMQ 服务端仅需要根据过滤条件进行筛选,消费者订阅对应标签和属性的消息即可。
同样拿订单系统为例,将订单消息打好标签(已支付、待发货、已完成),库存系统现在仅需要已支付的订单,RocketMQ 就把标签是已支付的订单消息发给库存系统。
这其实主要是三方做下简单设置:
- 生产者:发送消息的时候,给消息定义属性和标签
- 消费者:调用订阅关系注册接口告诉 RocketMQ 我想要哪些标签或属性的消息
- RocketMQ 服务端:消费者获取消息时,会触发服务端的过滤算法,按需给消费者对应消息
以上就是消息过滤的原理,看起来是不是蛮简单的,那具体该怎么操作呢?
如何进行消息过滤
了解了消息过滤原理不难知道,首先是生产者设置标签或属性,然后消费者在订阅关系中指定自己需要的标签和属性即可完成。
这里说的订阅关系,其实就是消费者获取消息的配置。
消息过滤分 2 大类型,一种是 tag 标签过滤,一种是 SQL 属性过滤。
对于简单场景打个标签过滤就行了,但如果你的业务足够复杂,需要自定义过滤条件,那就得需要 SQL 属性过滤。
Tag 消息过滤
这是最常用的消息过滤类型了,生产者设置消息标签,消费者选择对应标签下的消息消费。
那么生产者如何对消息打标签呢?看看实例代码吧:
Message message = messageBuilder.setTopic("topic")
//设置消息索引键,可根据关键字精确查找某条消息。
.setKeys("messageKey")
//设置消息Tag,用于消费端根据指定Tag过滤消息。
//该示例表示消息的Tag设置为"TagA"。
.setTag("TagA")
//消息体。
.setBody("messageBody".getBytes())
.build();
看吧,是不是超简单,无非就是 setTag 一下呗。
消费者端如何拿到这个标签下的消息呢?看看示例代码:
[Type/Paste Your Code](<String topic = "Your Topic";
//只订阅消息标签为"已支付"的消息。
FilterExpression filterExpression = new FilterExpression("yizhifu", FilterExpressionType.TAG);
pushConsumer.subscribe(topic, filterExpression);>)
这样就可以匹配单个标签的消息了,那问题很多的小明他又问了,我如果需要匹配多个标签的消息怎么办呢?
比如客户服务系统,需要知道已完成的订单和退款的订单,用于分析客户行为。
那也是没问题的,只需要在消费端匹配多个 tag 标签即可,示例如下:
String topic = "Your Topic";
//只订阅消息标签为"已支付"、"待发货"或"已完成"的消息。
FilterExpression filterExpression = new FilterExpression("yizhifu||daifahuo||yiwancheng", FilterExpressionType.TAG);
pushConsumer.subscribe(topic, filterExpression);
当然了过滤条件 FilterExpression 中啥也不写,那就是过滤所有标签了:
[Type/Paste Your Code](<String topic = "Your Topic";
//使用Tag标签过滤消息,订阅所有消息。
FilterExpression filterExpression = new FilterExpression("*", FilterExpressionType.TAG);
pushConsumer.subscribe(topic, filterExpression);>)
SQL 属性过滤
这是一种高级的过滤方式,有多高级呢?其实也没多高级😂,无非是生产者对消息设置 key-value 属性,消费者设置 SQL 语法的过滤表达式即可。
其实在 SQL 语法中,如果定义了属性 TAGS,那其实也是 tag 标签。
同样是上面的订单系统,订单的状态也可以用属性来定义,比如属性就叫做 status,也就是 key 是 status,那么不同的状态就对应不同的 value 值:
- 已支付:{"status": "Have paid"}
- 待发货:{"status": "wait sent"}
- 已完成:{"status": "complete"}
对应的过滤效果如下图所示:
生产者设置消息的属性:
Message message = messageBuilder.setTopic("topic")
//设置消息索引键,可根据关键字精确查找某条消息。
.setKeys("messageKey")
//消息也可以设置自定义的分类属性,例如环境标签、地域、逻辑分支。
//该示例表示为消息自定义一个属性,该属性为订单状态,属性值为已完成。
.addProperty("status", "complete")
//消息体。
.setBody("messageBody".getBytes())
.build();
那么消费者只需要写对应的 SQL 表达式来选择过滤条件,RocketMQ 中 SQL 过滤使用 SQL92语法作为过滤规则表达式。
想要更系统了解的也可以自行搜索下该语法的使用哈,整体不难,对应规则判断就好。
那么在消费端就可以进行定义 SQL 过滤表达式来设置过滤条件:
[Type/Paste Your Code](<String topic = "topic";
//只订阅地域属性为杭州的消息。
FilterExpression filterExpression = new FilterExpression("statu IS NOT NULL AND statu='complete'", FilterExpressionType.SQL92);
simpleConsumer.subscribe(topic, filterExpression);>)
这样就可以完成 SQL 过滤啦,当然 FilterExpression 中可以传入多个值,代表可以同时根据多个自定义属性匹配消息,也可以传空,则为订阅所有属性的消息。
PmHub 使用实践
下面进入最激动人心的实战环节,在 PmHub 中使用的是 tag 标签来进行的消息过滤。使用场景主要是进行任务的预期提醒以及流程状态更新。
在生产者侧,往对应的 tag 下发送消息:
在消费者侧,设置对应的标签进行消费:
当然了涉及到的代码和业务会更复杂,大家可以看看源码就可更深入了解,源码都是在 GitHub 上开源的,大家可以自行搜索哈。
总结
以上,我们详细介绍了 RocketMQ 的消息过滤机制,让消息投其所好,最大化发挥他该有的价值,这其中包含 tag 标签过滤和 SQL 属性过滤。
当然了,不管哪种方式,其核心的过滤原理不变。
大家有没有想过,消息是过滤了,但如果消费者出现异常,消费某一条消息失败,这时候 RocketMQ 会怎么处理呢?
好啦,今天的分享结束。
我是苍何,这是图解 RocketMQ 教程的第 5 篇,我们下篇见~
转载自:https://juejin.cn/post/7395862212183326770