Redis 实现消息队列
前言
如果要实现完整的消息队列能力,比如实现可靠性保证
、持久化
、广播模式
、延时队列
等功能,使用 Redis 来实现 MQ 显然是不明智的。但如果不想引入一个重量级的MQ组件,仅仅想借用Redis来实现简单的MQ也是可以的。
Redis 实现 MQ 主要有三种方案:(1)List 结构;(2)Pub/Sub 模式;(3)Stream 结构。
1. 使用List结构来实现消息队列
方案1:使用LPUSH
向队列发送消息,使用RPOP
从队列消费消息。
问题:虽然实现了简单的消息队列的功能,但是存在巨大的性能问题,消费者需要不停地循环执行RPOP
来轮询消息,即时没有消息时也要不但轮询。
方案2:使用LPUSH
向队列发送消息,使用BRPOP
来消费消息。
优点:BRPOP
支持传入超时时间,单位为s,在超时前会一直等待返回结果,设为0则表示一直等待直到有结果返回。这样在没有消息时无需循环重试,一定程度克服了方案1的缺点。
方案3:使用LPUSH
向队列发送消息,使用BRPOPLPUSH
来消费并备份消息。
优点:备份消息的作用相当于做了一定程度可靠性保证,消费者处理失败时还可以再次消费消息,消费成功后同时删除备份消息。
BRPOPLPUSH
需要传入两个参数,第一个是要取出数据的 list key,第二个是要放入消息的 list key。(其实和用lua脚本分两步实现 BRPOP 和 LPUSH 一个效果)
2. 使用 Pub/Sub 来实现消息队列
方案1:使用publish
向channel发送消息,使用subcribe
来订阅channel的消息。
优点:相当于广播模式发送消息,而且subcribe
可以持续订阅,不需要业务上层不停轮询。
缺点:不能持久化消息,只能收到即时消息,不能收到历史消息,要求subscribe
必须在publish
之前执行。
方案2 使用publish
向channel发送消息,使用psubscribe
来订阅特定模式channel的消息。
优点:通过psubscribe
可以订阅满足特定模式的 channel 的消息,有点类似于 rabbitmq 里面的 topic 交换机。使得应用场景可以更加广泛。
3. 使用 Stream 来实现消息队列
Redis Stream相比较 pub/sub ,提供了持久化的能力,并且增加了一些完整 MQ 组件的概念,比如消费者组。 接下来只介绍使用 Stream 实现简单消息队列的方式。
(1)使用 XADD 来发布消息
xadd mystream * name jiangwang age 26
mystream
是 redis key;- 而
*
是消息 ID,使用*
表示由 redis 自己生成消息 ID(也可以指定,但是要保证 ID 唯一)。默认生成的消息 ID 格式为:时间戳_同时间戳内消息index
。 - 后面的
消息主体
是多组field-value
对,比如 name 是 filed,jiangwang 是 value。
(2)使用 xread 读取指定区间的消息
xread COUNT 2 BLOCK 0 STREAMS mystream 0-0
- 其中
COUNT
表示要读取的消息数目(不指定 count 会一直读取到最新的消息),这里 count 是2; BLOCK
表示阻塞时间,0表示一直等待到有消息;STREAMS
表示要读取的 stream key;0-0
表示开始消息 ID
,格式同样是时间戳_index
;如果消息ID
设置为$
,那么只会从当前开始读取一条消息。
小结
学习并记录了使用 Redis 实现消息队列的3种方式,分析不同方案的优缺点,并给出了实际使用示例。
参考网址
https://mp.weixin.qq.com/s/_q0bI62iFrG8h-gZ-bCvNQ
转载自:https://juejin.cn/post/6917576292808261645