likes
comments
collection
share

redis消息队列,你还不敢用?

作者站长头像
站长
· 阅读数 20

前言

消息队列要能支持组件通信消息的快速读写,而 Redis 作为一款常用的缓存组件,本身支持数据的高速访问,正好可以满足消息队列的读写性能需求。不过,除了性能,消息队列还有其他的要求,所以,很多人都很关心一个问题:“Redis 适合做消息队列吗?”

其实,这个问题的背后,隐含着两方面的核心问题:

  • 消息队列适用场景?有哪些设计要点?
  • Redis 如何实现消息队列的需求?

一、关于消息队列

1、应用场景

在系统设计中,常常会考虑系统的高可用高性能易扩展能力,而消息队列也作为一种工具手段,通常用于解决:

  • 应用解耦
  • 削峰填谷

消息队列常见的使用场景:

  • 比如电商里的下单后与会员积分、物流订单等异步解耦处理
  • 电商促销短信下发,使用MQ来削峰填谷

市面上已经存在专业的MQ有RocketMQ、Kafka等,为什么还需要Redis来自定义实现消息队列?

  • 重!需要额外的成本负担,包括运维成本、学习成本等等;所以如果你的场景足够简单,redis 完全能满足需求,可以考虑使用 redis 做消息队列
  • redis 是一款轻量级内存组件,相信你一定也经常使用,使用成本低。

2、如何设计消息队列

一般设计消息队列需要考虑三个需求,分别是

  • 消息保序:对应消息需要有序消费的场景
  • 处理重复消息:如网络抖动引起的同一条消息多次被投递到队列的场景
  • 保证消息可靠性:消息从队列取出,此时客户端宕机,消息未正常消费的场景

当然,以上三个条件可根据场景选择性实现

需求一:消息保序

虽然消费者是异步处理消息,但是,消费者仍然需要按照生产者发送消息的顺序来处理消息,避免后发送的消息被先处理了。对于要求消息保序的场景来说,一旦出现这种消息被乱序处理的情况,就可能会导致业务逻辑被错误执行,从而给业务方造成损失。

需求二:重复消息处理

消费者从消息队列读取消息时,有时会因为网络堵塞而出现消息重传的情况。此时,消费者可能会收到多条重复的消息。对于重复的消息,消费者如果多次处理的话,就可能造成一个业务逻辑被多次执行,如果业务逻辑正好是要修改数据,那就会出现数据被多次修改的问题了。

需求三:消息可靠性保证

消费者在处理消息的时候,还可能出现因为故障或宕机导致消息没有处理完成的情况。此时,消息队列需要能提供消息可靠性的保证,也就是说,当消费者重启后,可以重新读取消息再次进行处理,否则,就会出现消息漏处理的问题了。

二、Redis 消息队列解决方案

1、基于 List 的消息队列解决方案

List 本身就是按先进先出的顺序对数据进行存取的,所以,如果使用 List 作为消息队列保存消息的话,就已经能满足 消息保序 的需求了。

具体来说,生产者可以使用 LPUSH 命令把要发送的消息依次写入 List,而消费者则可以使用 RPOP 命令,从 List 的另一端按照消息的写入顺序,依次读取消息并进行处理。

127.0.0.1:6379> lpush list1 one two three four five
(integer) 5

127.0.0.1:6379> rpop list1
"one"

需要注意的是,当有多个消费端消费同一个队列时,这个时候就不是有序了。

在消费者读取数据时,有一个潜在的性能风险点。

在生产者往 List 中写入数据时,List 并不会主动地通知消费者有新消息写入,如果消费者想要及时处理消息,就需要在程序中不停地调用 RPOP 命令。如果有新消息写入,RPOP 命令就会返回结果,否则,RPOP 命令返回空值,再继续循环。

while (1) {
   val msg = redis.rpop()
   ...
}

所以,即使没有新消息写入 List,消费者也要不停地调用 RPOP 命令,这就会导致消费者程序的 CPU 一直消耗在执行 RPOP 命令上,引发 CPU 空转问题。

当然,也有解决办法;当没有拉取到消息时,就 sleep 一会即可:

while (1) {
   val msg = redis.rpop()
   
   if (msg == null) {
      sleep(t) // t=2000
      continue
   }
   ...
}

但是这种情况下,可能存在消息延迟 t 时间处理,接着往下看:

Redis 提供了 BRPOP 命令。BRPOP 命令也称为阻塞式读取,客户端在没有读到队列数据时,自动阻塞,直到有新的数据写入队列,再开始读取新数据。和消费者程序自己不停地调用 RPOP 命令相比,这种方式能节省 CPU 开销。

消息保序的问题解决了,接下来,还需要考虑解决 重复消息处理 的问题;来看看消费者是怎么针对消息进行重复性判断。

  • 一方面,每条消息要有全局唯一性 ID;
  • 另一方面,消费者程序要把已经处理过的消息的 ID 号记录下来。

当收到一条消息后,消费者程序就可以对比收到的消息 ID 和记录的已处理过的消息 ID,来判断当前收到的消息有没有经过处理。如果已经处理过,消费者程序就不再进行处理了。这种处理特性也称为幂等性,幂等性就是指,对于同一条消息,消费者收到一次的处理结果和收到多次的处理结果是一致的。

不过,List 本身是不会为每个消息生成 ID 号的,所以,消息的全局唯一 ID 号就需要生产者程序在发送消息前自行生成。生成之后,我们在用 LPUSH 命令把消息插入 List 时,需要在消息中包含这个全局唯一 ID。

最后,我们再来看下,List 类型是如何保证 消息可靠性 的。

当消费者程序从 List 中读取一条消息后,List 就不会再留存这条消息了。所以,如果消费者程序在处理消息的过程出现了故障或宕机,就会导致消息没有处理完成,那么,消费者程序再次启动后,就没法再次从 List 中读取消息了。

我们可以新增一个备份队列,当消息从队列中 RPOP 时,及时放入备份队列,当消费端提交 ACK 时再从备份队列中删除;

当然,List 类型提供了 BRPOPLPUSH 命令,这个命令的作用是让消费者程序从一个 List 中读取消息,同时,Redis 会把这个消息再插入到另一个 List(可以叫作备份 List)留存。这样一来,如果消费者程序读了消息但没能正常处理,等它重启后,就可以从备份 List 中重新读取消息并进行处理了。

当然,关于备份队列不一定要用 list 的 BRPOPLPUSH 命令,还可以直接使用 hash 这样的结构来存储,总之,选择多样性

消息可靠性相关伪代码如下:

// 入队
public void schedule(Object o) {
    ...

    redis.lpush(o);

    ...
}

// 出队
public Object poll() {
    ...
    // 取出一个元素, 并存储备份队列
    Object o = redis.brpoplpush("source_queue", "destination_backup_queue", 1000);

    ...

    return o;
}

// ACK 
public Object commit(String message) {
    ...

    // 从备份队列中删除
    // 当然这里删除操作复杂度为 O(n),其中 n 为备份队列元素个数
    jedis.lrem("destination_backup_queue", message)
  
    ...
}

// 故障恢复
public void recoverFromCrash() {
   ...
   
   // 1. 取出所有未 ACK 的 message
   List<String> messages = redis.lrange("destination_backup_queue", 0, -1);
   
   // 2. 遍历 messages
   foreach o in messages {
       // 3. 重新入队
       this.schedule(o);
   }
   ...
}

2、基于 zset 的消息队列解决方案

在日常开发中,你也可能经常遇到需要延时队列来处理的场景,比如支付订单的15分钟有效时间、物流订单30分钟同步一次最新状态等等。这种场景,你可以考虑使用 redis 的 zset 来实现

zset 结构有一个参数 score, 可以用来控制时延,比如设置15分钟后处理,可以这样写:

public Object schedule() {
    ...
   
    try (Jedis jedis = jedisUtils.acquire()) {
        jedis.zadd(queueName, 当前时间戳 + 15分钟, content);
    }
    
    ...
}

然后这样取:

public Object poll() {
    // 1. 取第一个元素
    Object o = jedis.zrangeWithScores(queueName, 0, 0)
    // 2. 然后和当前时间比,是否已到了处理时间
    if (o == null || o.getScheduledAt() > nowTime) {
        return null
    }
    
    // 做一些逻辑判断
    ...
    
    // 这里为了安全起,先拿到一个元素,放入备份队列之后再从原队列中删除
    // 3. 先放入备份队列
    jedis.hset("backup", key, o.getContent());
    // 4. 在从 zset 中删除
    jedis.zrem(queueName, o.getContent()
}

// ACK
public Object commit(String key) {
    ...
    
    // 从备份队列中删除
    // 时间复杂度 O(1)
    jedis.hdel("backup", key)
 
    ...
}

// 故障恢复
public void recoverFromCrash() {
   ...
   
   // 1. 取出所有未 ACK 的 key
   List<String> keys = redis.hkeys(keyName);
   
   // 2. 遍历 keys
   foreach key in keys {
       // 3.重新拿到这条消息
       Object o = redis.hget(keyName, key);
       
       // 4. 重新入队
       this.schedule(o);
   }
   ...
}

zset 是一个有序的结构,每次取第一个元素来判断能否处理,如果第一个元素都不能处理,说明之后的元素都没有到执行时间。

这里和 list 类似,都需要一个备份队列来保证消息的可靠性;也就是3、4步骤,将消息先存入备份队列,再从原队列中删除消息,才算是取出了消息;当客户端提交了 ACK 之后,再从备份队列中删除消息,到这,才算完整的处理了该消息。

# 入队
127.0.0.1:6379> zadd zset1 1644741353 zhangsan
(integer) 1

# 查看
127.0.0.1:6379> zrange zset1 0 -1 WITHSCORES
1) "zhangsan"
2) "1644741353"

# 进入备份队列
127.0.0.1:6379> hset backup key1 zhangsan
(integer) 1

# 从 zset 队列删除
127.0.0.1:6379> zrem zset1 zhangsan
(integer) 1

# ACK 后从备份队列删除
127.0.0.1:6379> hdel backup key1
(integer) 1

来看看 zset 实现队列的几个特点:

  • 消息的有序性?zset 结构本身是按照 score 有序的,因此从消息投递先后来看便是无序;你可以根据 score 参数值的大小来控制消息在队列的先后顺序
  • 重复消息处理消息可靠性保障与 list 结构实现的队列类似

3、基于 Streams 的消息队列解决方案

到目前为止,list 和 zset 实现的队列都没能很好的支持多消费组的场景;当然,你可能会想,按照多消费者组的场景,也可以接着写下去;但是,复杂就飙升了,还容易出错。

Redis 从 5.0 版本开始提供的 Streams 数据类型,是为 redis 设计的消息队列,能支持多消费组的场景。

Streams 提供了丰富的消息队列操作命令。

  • XADD:插入消息,保证有序,可以自动生成全局唯一 ID;
  • XREAD:用于读取消息,可以按 ID 读取数据;
  • XREADGROUP:按消费组形式读取消息;
  • XPENDING 和 XACK:XPENDING 命令可以用来查询每个消费组内所有消费者已读取但尚未确认的消息,而 XACK 命令用于向消息队列确认消息处理已完成。

常用命令:

1、XADD:

> XADD mystream * sensor-id 1234 temperature 19.8
1518951480106-0
  • 其中 * 表示 id 自增,其组成部分为 millisecondsTime-sequenceNumber,毫秒数 + 当前毫秒內递增数;当然,这里 id 也可以自己设置,比如 0-1、0-* 等。
  • sensor-id 和 temperature 都是该消息的字段,1234 和 19.8 是其对应的值

2、XLEN

> XLEN mystream
(integer) 1

表示查询 stream 队列的长度

3、XRANGE

> XRANGE mystream - +
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"
2) 1) 1518951482479-0
   2) 1) "sensor-id"
      2) "9999"
      3) "temperature"
      4) "18.2"

其中 - + 表示最小 id 和 最大 id,也就是返回整个队列数据

> XRANGE mystream - + COUNT 2
1) 1) 1519073278252-0
   2) 1) "foo"
      2) "value_1"
2) 1) 1519073279157-0
   2) 1) "foo"
      2) "value_2"

返回队列前两条数据

4、XREVRANGE

> XREVRANGE mystream + - COUNT 1
1) 1) 1519073287312-0
   2) 1) "foo"
      2) "value_10"

与 XRANGE 正好相反,返回队列最后一条数据

5、XREAD

> XREAD COUNT 2 STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1519073278252-0
         2) 1) "foo"
            2) "value_1"
      2) 1) 1519073279157-0
         2) 1) "foo"
            2) "value_2"
  • 其中 count 为可选参数,0 代表读取所有 id 大于 0-0的记录;
  • 这里 STREAMS 可以读取多个 stream, 比如 xread streams mystream mystream2 0 0
> XREAD BLOCK 0 STREAMS mystream $
  • 这里 BLOCK 选项是一种类似于 BRPOP 的阻塞读取操作
  • 有些场景想要获取最新的消息,可以使用 阻塞字段block + $;
  • $ 表示取当前 max id 之后的消息,也就是等待最新消息,该操作类似于 unix 的 tail -f 命令;
  • 当然,这里 $ 也可以换成 ID;
  • 同样,这里也可以监听多个 STREAM,比如 xread block 10000 STREAMS mystream mystream2 ,只要有一个 stream 有新消息就立即返回,如果都有消息,就都一起返回

6、XGROUP 用于创建、销毁和管理消费组;这里的消费组类似于 kafka 消费组的概念,组内消费者间消费数据具有互斥性、组间可以处理相同数据,不同的是 kafka 有分区的概念,而 redis stream 则没有。

> XGROUP CREATE mystream mygroup $
OK
  • 创建消费组 mygroup,创建组一般需要指明 the last message ID,这里 $ 表示从此刻之后的新消息才会让 mygroup消费到
  • 当然,如果是 0,表示要从头开始消费,也包括 STREAM 的历史消息;同样,这里也可以指定任意 id 开始消费
> XGROUP CREATE newstream mygroup $ MKSTREAM
OK

当指定的 STREAM 不存在时,可以指定子选项 MKSTREAM 自动创建 STREAM

7、XREADGROUP

> XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"
  • 这里 mygroup 是消费组名,Alice 是 mygroup 下的消费者;'>' 是特定 id,也可以指定为其他值
  • 如果这里指定为特定 id 为 '>',则表示从第一条尚未被消费的消息开始读取,并更新消费组的 last ID
  • 如果指定的 id 为其他任意指定的有效 id, 该命令将访问历史 pending 队列消息

消息队列中的消息一旦被消费组里的一个消费者读取了,就不能再被该消费组内的其他消费者读取了。也就是说,同一个消费组内,消息是互斥的。

值得注意的是:使用消费组的目的是让组内的多个消费者共同分担读取消息,所以,通常会让每个消费者读取部分消息,从而实现消息读取负载在多个消费者间是均衡分布的。这一点与 kafka 是不同的,kafka 有分区的概念,消费组内的消费者负责消费不同的分区数据,不存在多个消费者消费同一个分区的情况。

8、XPENDING 查询消费者已拉取但未 ACK 的消息,可用于出现 crash 等未正常执行 ACK 情况下的数据恢复,相当于 list 队列的 “备份队列”,使用如下:

XPENDING <key> <groupname> [[IDLE <min-idle-time>] <start-id> <end-id> <count> [<consumer-name>]]

命令行,如:

> XPENDING mystream mygroup - + 10
1) 1) 1526569498055-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1
2) 1) 1526569506935-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1

返回 pending 队列中前十条数据。

为了保证消费者在发生故障或宕机再次重启后,仍然可以读取未处理完的消息,Streams 会自动使用 PENDING List 队列,备份消费组里每个消费者读取的消息,直到消费者使用 XACK 命令通知 Streams “消息已经处理完成”。如果消费者没有成功处理消息,它就不会给 Streams 发送 XACK 命令,消息仍然会留存。此时,消费者可以在重启后,用 XPENDING 命令查看已读取、但尚未确认处理完成的消息

Streams 是 Redis 5.0 专门针对消息队列场景设计的数据类型,如果你的 Redis 是 5.0 及 5.0 以后的版本,就可以考虑把 Streams 用作消息队列了。

三、总结

1、设计消息队列需要考虑的场景:

  • 消息保序
  • 处理重复消息
  • 消息的可靠性

2、常用的redis消息队列解决方案:

  • redis list 结构
  • zset 权重型延迟队列(优先)
  • stream多消费组队列

3、redis 消息队列的困境

  • 如果消息积压过大,会给内存造成过重负担?你可以设置一些监控机制,比如及时告警处理、限制队列长度等
  • stream 队列出现了消费组的概念,组之间是隔离的,也就意味着没有类似于 list.lpop 操作直接删除数据,那数据一直增长会有什么影响?可以通过选项值 MAXLEN 控制 stream 队列的长度
  • 队列本身是否会丢失数据?这取决于 redis 本身的持久化策略,比如采用 AOF 每1s刷盘一次,这个时候最多就会丢失1s的数据;主从同步时,从库出现延迟,此时从库被提升为主库,也可能出现部分数据丢失

4、redis 消息队列优点

  • 方便,相信目前很多项目开发已引入 redis, 因此,运维、学习成本等就降低了
  • 轻量级,开箱即用,也可以自己做一层简单封装
  • 多样性,可根据需求选择底层队列结构

可以看到,redis 队列对开发者来说是友好的,但你可能考虑到它的消息积压、redis 本身数据可能丢失风险?

  • 就很多实际场景来说,很多情况下都要求数据的实时性,这个时候你需要扩展消费端的能力来解决,防止消息积压;
  • 关于数据丢失:redis 服务宕机是小概率事件,也许你一年两年都遇不上一次

综上,redis 队列可以考虑在非核心业务中使用,比如电商会员积分、短信下发、支付倒计时甚至一些定时处理的订单状态等;总之,先衡量场景、在做好技术方案、最后放心大胆用!

相关参考: