likes
comments
collection
share

Redis Streams 教程

作者站长头像
站长
· 阅读数 44

前言:作为一个深受网络上各种CV技术内容的受害者,这里只提供纯粹的官方文档翻译,不添加一丁点个人理解,篇幅较长。有能力建议直接阅读官方文档。

严禁转载!!!(纵然屎一坨,但熏吾一人)

介绍

Redis 5.0 引入了 Redis stream 数据类型。Stream 模型类似于日志数据结构,但还实现了几个操作,以克服典型追加日志的一些限制。其中包括在 O(1) 时间内进行随机访问以及实现复杂的消费策略,如消费者组

> XREAD BLOCK 0 STREAMS mystream $

基础知识

流是一种只能追加的数据结构。基本的写入命令称为XADD,它将新条目追加到指定的流中。 每个流条目包含一个或多个字段-值对,有点像记录或Redis哈希:

> XADD mystream * sensor-id 1234 temperature 19.8
1518951480106-0

上述对XADD命令的调用将一个条目(sensor-id:1234,temperature:19.8)添加到名为mystream的键的流中,并使用自动生成的条目ID,即命令返回的那个ID,具体为1518951480106-0。它的第一个参数是键名mystream,第二个参数是在流中标识每个条目的条目ID。然而,在这种情况下,我们传递了*,因为我们希望服务器为我们生成一个新的ID。每个新的ID都会单调递增,所以更简单地说,每个添加的新条目的ID都会比过去的所有条目的ID高。服务器自动生成的ID几乎总是您想要的,明确指定ID的原因非常罕见。我们稍后会再讨论这个问题。每个流条目都有一个ID,这与日志文件类似,在日志文件中,行号或文件内的字节偏移量可以用于识别给定的条目。回到我们的XADD示例,键名和ID之后,下一个参数是组成我们的流条目的字段-值对。

可以使用XLEN命令获取流中的项目数量:

> XLEN mystream
(integer) 1

Entry IDs

XADD命令返回的条目ID,用于在给定流中唯一标识每个条目,由两部分组成:

<millisecondsTime>-<sequenceNumber>

毫秒时间部分实际上是生成流ID的本地Redis节点的本地时间,但是如果当前的毫秒时间恰好小于前一个条目的时间,那么将使用前一个条目的时间,所以如果时钟向后跳动,单调递增ID属性仍然成立。序列号用于在同一毫秒内创建的条目。由于序列号宽度为64位,在实际应用中,在同一毫秒内可以生成的条目数量没有限制。

这种ID的格式初看起来可能很奇怪,你可能会想知道为什么时间是ID的一部分。原因是Redis流支持按ID进行范围查询。因为ID与条目生成的时间有关,这使得基本上可以免费查询时间范围。我们将在介绍XRANGE命令时很快了解到这一点。

如果出于某种原因,用户需要与时间无关的增量ID,而实际上与另一个外部系统ID关联,如前所述,XADD命令可以采用显式ID,而不是触发自动生成的*通配符ID,如下面的例子:

> XADD somestream 0-1 field value
0-1
> XADD somestream 0-2 foo bar
0-2

请注意,在这种情况下,最小的ID是0-1,命令不会接受等于或小于前一个ID的ID:

> XADD somestream 0-1 foo bar
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item

如果您正在运行Redis 7或更高版本,您还可以仅提供由毫秒部分组成的显式ID。在这种情况下,ID的序列部分将自动生成。要执行此操作,请使用以下语法:

> XADD somestream 0-* baz qux 
0-3

从stream中获取数据

现在我们终于可以通过XADD将条目追加到流中了。然而,虽然将数据追加到流中相当明显,但是流可以通过哪种方式查询以提取数据并不那么明显。如果我们继续使用日志文件的类比,一个明显的方法是模仿我们通常使用Unix命令tail -f执行的操作,也就是说,我们可以开始监听以获取追加到流中的新消息。请注意,与Redis的阻塞列表操作不同,其中给定元素将到达一个阻塞在类似BLPOP的弹出样式操作中的单个客户端,对于流,我们希望多个消费者能够看到追加到流中的新消息(就像许多tail -f进程可以看到添加到日志中的内容一样)。使用传统术语,我们希望流能够将消息扩散到多个客户端。

然而,这只是一种潜在的访问模式。我们还可以以完全不同的方式看待流:不是作为一种消息系统,而是作为一个时间序列存储。在这种情况下,也许获取新追加的消息也是有用的,但另一种自然的查询模式是通过时间范围获取消息,或者通过使用游标递增地检查所有历史记录来迭代消息。这绝对是另一种有用的访问模式。

最后,如果我们从消费者的角度看待流,我们可能希望以另一种方式访问流,也就是说,作为可以分割给多个处理这些消息的消费者的消息流,这样,消费者群组只能看到单个流中到达的消息的子集。通过这种方式,可以在不同的消费者之间扩展消息处理,而无需单个消费者处理所有消息:每个消费者只需获取不同的消息进行处理。这基本上就是Kafka(TM)通过消费者组所做的事情。通过消费者组读取消息是从Redis流中读取的另一种有趣模式。

Redis流通过不同的命令支持上述三种查询模式。接下来的章节将展示它们所有的内容,从最简单、最直接的使用开始:范围查询。

通过XRANGE和XREVRANGE查询

要按范围查询流,我们只需要指定两个ID,开始和结束。返回的范围将包括具有开始或结束ID的元素,因此范围是包含的。两个特殊ID - 和 + 分别表示可能的最小和最大ID。

> XRANGE mystream - +
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"
2) 1) 1518951482479-0
   2) 1) "sensor-id"
      2) "9999"
      3) "temperature"
      4) "18.2"

返回的每个条目都是一个包含两个项目的数组:ID和字段值对列表。我们已经说过,条目ID与时间有关,因为"-"字符左边的部分是创建流条目时本地节点的Unix毫秒时间(但请注意,流使用完全指定的XADD命令进行复制,因此副本将具有与主服务器相同的ID)。这意味着我可以使用XRANGE查询一段时间范围。为了这样做,然而,我可能想省略ID的序列部分:如果省略,在范围的开始部分,它将被假定为0,而在结束部分,它将被假定为可用的最大序列号。这样,仅使用两个毫秒Unix时间进行查询,我们可以获取在该时间范围内生成的所有条目,以包含的方式。例如,如果我想查询一个两毫秒的时间段,我可以使用:

> XRANGE mystream 1518951480106 1518951480107
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"

在这个范围内,我只有一个条目,然而在实际数据集中,我可以查询几个小时的范围,或者仅在两毫秒内就有许多项目,返回的结果可能会很大。因此,XRANGE支持在末尾添加一个可选的COUNT选项。通过指定计数,我可以只获取前N个项目。如果我想要更多,我可以获取返回的最后一个ID,将序列部分递增1,并再次查询。让我们在以下示例中看到这一点。我们从XADD开始添加10个项目(我不会展示这一点,假设流mystream已经填充了10个项目)。要开始我的迭代,每次命令获取2个项目,我从完整范围开始,但计数为2。

> XRANGE mystream - + COUNT 2
1) 1) 1519073278252-0
   2) 1) "foo"
      2) "value_1"
2) 1) 1519073279157-0
   2) 1) "foo"
      2) "value_2"

为了继续迭代并获得接下来的两个项目,我必须选择返回的最后一个ID,即1519073279157-0,并为其添加前缀(。在这种情况下,结果排除范围间隔(本例中为(1519073279157-0)现在可以用作下一个XRANGE调用的新开始参数:

> XRANGE mystream (1519073279157-0 + COUNT 2
1) 1) 1519073280281-0
   2) 1) "foo"
      2) "value_3"
2) 1) 1519073281432-0
   2) 1) "foo"
      2) "value_4"

依此类推。由于XRANGE的复杂度是O(log(N))进行查找,然后O(M)返回M个元素,所以使用较小计数的命令具有对数时间复杂度,这意味着每次迭代的步骤都很快。因此,XRANGE也是实际上的流迭代器,不需要XSCAN命令。

XREVRANGE命令与XRANGE等效,但是按相反的顺序返回元素,因此XREVRANGE的实际用途是检查流中的最后一项:

> XREVRANGE mystream + - COUNT 1
1) 1) 1519073287312-0
   2) 1) "foo"
      2) "value_10"

请注意,XREVRANGE命令以相反的顺序接受开始和停止参数。

用XREAD监听新item

当我们不想按范围访问流中的项目时,我们通常需要的是订阅到达流的新项目。这个概念看起来与Redis Pub/Sub有关,你可以订阅一个频道,或者使用Redis阻塞列表,等待一个键获取要提取的新元素,但在消费流的方式上有根本性的区别:

  1. 流可以有多个客户端(消费者)等待数据。默认情况下,每个新项目都会发送给正在等待给定流中数据的每个消费者。这种行为与阻塞列表不同,每个消费者将获得不同的元素。然而,向多个消费者传播的能力类似于Pub/Sub。
  2. 虽然在Pub/Sub中消息是fire and forget类型且永远不会存储,而在使用阻塞列表时,当客户端接收到消息时,它会从列表中弹出(有效地删除),但流的工作方式有根本不同。所有的消息都会无限期地附加到流中(除非用户明确要求删除条目):不同的消费者会通过记住收到的最后一条消息的ID来了解从其角度来看什么是新消息。
  3. 流消费者组提供了Pub/Sub或阻塞列表无法实现的功能,同一stream的不同组,消息处理的明确确认,检查挂起的消息能力,声明未处理的消息,以及对每个单独客户端的一致历史可见性,也就是只能查看其自己的过去历史消息。

提供监听流中新消息到达能力的命令叫做XREAD。它比XRANGE稍微复杂一些,所以我们将从简单形式开始,稍后将提供整个命令布局。

> XREAD COUNT 2 STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1519073278252-0
         2) 1) "foo"
            2) "value_1"
      2) 1) 1519073279157-0
         2) 1) "foo"
            2) "value_2"

以上是XREAD的非阻塞形式。请注意,COUNT选项不是必需的,事实上,命令的唯一必需选项是STREAMS选项,它为调用消费者的每个流指定一组键和相应的最大ID,这样命令就只会为客户端提供大于我们指定的ID的消息。

在上面的命令中,我们写了STREAMS mystream 0,所以我们想要所有在Stream mystream中ID大于0-0的消息。正如上面的例子所示,命令返回键名,因为实际上可以使用多个键同时从不同的流中调用此命令。例如,我可以写:STREAMS mystream otherstream 0 0。请注意,在STREAMS选项之后,我们需要提供键名,然后是ID。因此,STREAMS选项必须始终是最后一个选项。其他任何选项都必须在STREAMS选项之前。

除了XREAD可以一次访问多个流以及我们能够为每个流指定最后一个ID以仅获取更新的消息之外,在这种简单形式下,该命令与XRANGE没有太大区别。然而,有趣的部分是我们可以通过指定BLOCK参数轻松地将XREAD转换为阻塞命令:

> XREAD BLOCK 0 STREAMS mystream $

请注意,在上面的示例中,除了删除COUNT,我还使用超时为0毫秒(即永不超时)的新BLOCK选项。此外,对于流mystream,我使用了特殊的ID $,而不是传递常规ID。这个特殊ID意味着XREAD应该使用作为最后一个ID存储在流mystream中的最大ID,这样我们只会收到新消息,从我们开始监听的时候起。这在某种程度上类似于Unix命令tail -f

请注意,当使用BLOCK选项时,我们不必使用特殊ID 。我们可以使用任何有效的ID。如果命令可以立即同步地为我们的请求提供服务而不会阻塞,它会这样做,否则它会阻塞。通常,如果我们要从新条目开始消费流,我们从ID。我们可以使用任何有效的ID。如果命令可以立即同步地为我们的请求提供服务而不会阻塞,它会这样做,否则它会阻塞。通常,如果我们要从新条目开始消费流,我们从ID 。我们可以使用任何有效的ID。如果命令可以立即同步地为我们的请求提供服务而不会阻塞,它会这样做,否则它会阻塞。通常,如果我们要从新条目开始消费流,我们从ID开始,然后继续使用收到的最后一条消息的ID进行下一次调用,依此类推。

XREAD的阻塞形式还可以同时监听多个流,只需指定多个键名。如果请求可以同步提供服务,因为至少有一个流的元素大于我们指定的相应ID,它将返回结果。否则,该命令将阻塞,并返回第一个获取新数据的流的项目(根据指定的ID)。

与阻塞列表操作类似,阻塞流读取对于等待数据的客户端来说是公平的,因为语义是FIFO风格。第一个阻塞给定流的客户端将是第一个在有新项目可用时取消阻塞的客户端。

XREAD除了COUNTBLOCK之外没有其他选项,所以它是一个相当基本的命令,具有特定的目的,将消费者附加到一个或多个流。使用消费者组API可以获得更强大的功能来消费流,但是通过消费者组阅读是由另一个名为XREADGROUP的命令实现的,将在本指南的下一部分介绍。

Consumer groups

当手头上的任务是从不同的客户端消耗相同的流时,XREAD已经提供了一种向N个客户端分发的方法,并且还可以使用副本来提供更多的读取可伸缩性。然而,在某些问题中,我们想要做的不是向许多客户端提供相同的消息流,而是向许多客户端提供相同流的不同子集。这很有用的显然的情况是处理消息的速度很慢:有N个不同的工人接收不同的部分的流的能力允许我们扩展消息处理,通过将不同的消息路由到准备做更多工作的不同工人。

实际上,如果我们想象有三个消费者C1、C2、C3,并且流包含消息1、2、3、4、5、6、7,那么我们想要按照以下图表提供消息:

1 -> C1
2 -> C2
3 -> C3
4 -> C1
5 -> C2
6 -> C3
7 -> C1

为了实现这一点,Redis使用了一个称为消费者组的概念。从实现的角度来看,重要的是要理解Redis消费者组与Kafka(TM)消费者组没有任何关系。然而,它们在功能上是相似的,所以我决定保留Kafka(TM)的术语,因为它最初普及了这个想法。

消费者组就像一个从流获取数据的伪消费者,实际上服务于多个消费者,提供特定的保证:

  • 每条消息都会被不同的消费者接收,因此不可能将相同的消息传递给多个消费者。
  • 消费者在消费者组内被名称识别,该名称是客户端实现消费者必须选择的区分大小写字符串。这意味着即使断开连接后,流消费者组仍然保留所有状态,因为客户端将再次声称自己是相同的消费者。然而,这也意味着客户端必须提供唯一的标识符。
  • 每个消费者组都有从未消费的第一个ID的概念,因此当消费者请求新消息时,它可以提供以前从未传递过的消息。
  • 但是,消费消息需要使用特定命令明确确认。Redis将确认解释为:此消息已正确处理,因此可以从消费者组中删除。
  • 消费者组跟踪当前挂起的所有消息,也就是说,已经传递给消费者组的某个消费者的消息,但尚未被确认为已处理。由于这个功能,当访问流的消息历史时,每个消费者将只看到传递给它的消息。

从这个角度来看,消费者组可以被想象为关于流的一些状态:

+----------------------------------------+
| consumer_group_name: mygroup           |
| consumer_group_stream: somekey         |
| last_delivered_id: 1292309234234-92    |
|                                        |
| consumers:                             |
|    "consumer-1" with pending messages  |
|       1292309234234-4                  |
|       1292309234232-8                  |
|    "consumer-42" with pending messages |
|       ... (and so forth)               |
+----------------------------------------+

如果你从这个角度看待它,那么理解消费者组可以做什么,它如何只提供消费者的挂起消息历史,以及请求新消息的消费者如何仅被大于last_delivered_id的消息服务,就非常简单。与此同时,如果把消费者组看作Redis流的辅助数据结构,那么单个流可以有多个消费者组,拥有不同的消费者集合,就是显而易见的。实际上,同一流可以有不通过消费者组的XREAD读取的客户端,以及通过XREADGROUP在不同消费者组中读取的客户端。

现在是时候放大来看看基本的消费者组命令。它们如下:

  • XGROUP用于创建、销毁和管理消费者组。
  • XREADGROUP用于通过消费者组读取流。
  • XACK是允许消费者标记待处理消息为正确处理的命令。 通过使用这些命令,你可以创建消费者组,为不同的消费者分配不同的消息,并在消息处理完成后确认消息已被正确处理。这样,你就可以以分布式方式处理大量消息,以提高效率和扩展性。

创建一个consumer group

假设我已经有一个类型为 stream 的键 mystream,那么为了创建消费者组,我只需要执行以下操作:

> XGROUP CREATE mystream mygroup $
OK

如上面的命令所示,在创建消费者组时,我们必须指定一个 ID,在本例中是 。这是必需的,因为消费者组必须知道下一个要提供的消息是什么,即在第一个消费者连接时,消息组的最后一条消息ID是什么。如果我们像示例中那样提供。这是必需的,因为消费者组必须知道下一个要提供的消息是什么,即在第一个消费者连接时,消息组的最后一条消息 ID 是什么。如果我们像示例中那样提供 。这是必需的,因为消费者组必须知道下一个要提供的消息是什么,即在第一个消费者连接时,消息组的最后一条消息ID是什么。如果我们像示例中那样提供,那么只有从现在开始到达流中的新消息才会提供给组中的消费者。如果我们指定 0,则消费者组将一开始消耗流的所有消息。当然,您可以指定其他任何有效的 ID。您知道的是,消费者组将开始传递大于您指定的 ID 的消息。因为 表示流中的当前最大ID,指定表示流中的当前最大 ID,指定表示流中的当前最大ID,指定 将导致仅消耗新消息。

XGROUP CREATE 同样支持在不存在的情况下自动创建流,只需将可选的 MKSTREAM 子命令作为最后一个参数:

> XGROUP CREATE newstream mygroup $ MKSTREAM
OK

现在已经创建了消费者组,我们可以立即使用 XREADGROUP 命令从消费者组读取消息。我们将从两个消费者读取,分别称为 Alice 和 Bob,以查看系统如何向 Alice 或 Bob 返回不同的消息

XREADGROUP 与 XREAD 非常相似,提供相同的 BLOCK 选项,否则它是一个同步命令。然而,有一个必须始终指定的选项,即 GROUP,有两个参数:消费者组的名称和试图读取的消费者的名称。选项 COUNT 也被支持,与 XREAD 中的选项相同。

在从流中读取之前,让我们先在里面放一些消息:

> XADD mystream * message apple
1526569495631-0
> XADD mystream * message orange
1526569498055-0
> XADD mystream * message strawberry
1526569506935-0
> XADD mystream * message apricot
1526569535168-0
> XADD mystream * message banana
1526569544280-0

注意:这里的 message 是字段名称,fruit 是相关值,请记住stream item是一个小型字典。

是时候试着使用消费者组读取了:

> XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

XREADGROUP 回复就像 XREAD 回复。然而,请注意上面提供的GROUP  。它表示我想使用消费者组 mygroup 从流中读取,并且我是消费者 Alice。每当消费者使用消费者组执行操作时,都必须指定其名称,以在组内唯一标识此消费者。

上面的命令行中有另一个非常重要的细节,在强制性的 STREAMS 选项之后,请求的 mystream 键的 ID 是特殊 ID >。此特殊 ID 仅在消费者组的上下文中有效,意味着:迄今为止从未向其他消费者传递过的消息。

这通常是您想要的,但也可以指定一个真实的 ID,例如 0 或其他有效的 ID。在这种情况下,我们从 XREADGROUP 请求仅提供未处理消息的历史,并且在这种情况下,永远不会在组中看到新消息。因此,基于我们指定的 ID,XREADGROUP 具有以下行为:

  • 如果ID是特殊ID >,则命令将仅返回从未传递给其他消费者的新消息,并作为副作用更新消费者组的最后一个ID。
  • 如果 ID 是其他有效的数字 ID,则该命令将允许我们访问未决消息的历史记录,也就是说,已经发送到此指定消费者(由提供的名称标识)但尚未使用 XACK 确认的消息集。 可以立即指定 ID 为 0 来测试此行为,不指定 COUNT 选项:我们只会看到一条待处理消息,即关于苹果的消息:
> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

然而,如果我们确认消息已处理,它将不再是未决消息历史的一部分,因此系统将不再报告任何内容:

> XACK mystream mygroup 1526569495631-0
(integer) 1
> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) (empty list or set)

不用担心,如果您还不知道 XACK 如何工作,其理念仅仅是已处理的消息不再是我们可以访问的历史的一部分。

现在轮到鲍勃读些东西了:

> XREADGROUP GROUP mygroup Bob COUNT 2 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569498055-0
         2) 1) "message"
            2) "orange"
      2) 1) 1526569506935-0
         2) 1) "message"
            2) "strawberry"

Bob 请求最多两条消息,并通过同一个组 mygroup 读取。因此,Redis 仅报告新消息。如您所见,"apple" 消息未被交付,因为它已经被交付给了 Alice,因此 Bob 获得了 orange 和 strawberry 等等。

这样,Alice、Bob和组中的其他消费者就可以从同一个流读取不同的消息,读取尚未处理的消息的历史记录,或将消息标记为已处理。这允许为消费流中的消息创建不同的拓扑和语义。

需要注意几点:

  • 消费者在第一次提到时会自动创建,无需明确创建。
  • 即使使用 XREADGROUP,您也可以同时从多个键读取,但是要实现此功能,您需要在每个流中使用相同名称创建消费者组。这不是一个常见的需求,但值得一提的是,该功能在技术上是可用的。
  • XREADGROUP 是一个写命令,因为即使它从流中读取,但是消费者组由于读取而被修改,因此它只能在主实例上调用。

以下是 Ruby 语言中使用消费者组的消费者实现的示例,这段 Ruby 代码的目的是让任何有经验的程序员都能读懂,即使他们不熟悉 Ruby 语言:

require 'redis'

if ARGV.length == 0
    puts "Please specify a consumer name"
    exit 1
end

ConsumerName = ARGV[0]
GroupName = "mygroup"
r = Redis.new

def process_message(id,msg)
    puts "[#{ConsumerName}] #{id} = #{msg.inspect}"
end

$lastid = '0-0'

puts "Consumer #{ConsumerName} starting..."
check_backlog = true
while true
    # Pick the ID based on the iteration: the first time we want to
    # read our pending messages, in case we crashed and are recovering.
    # Once we consumed our history, we can start getting new messages.
    if check_backlog
        myid = $lastid
    else
        myid = '>'
    end

    items = r.xreadgroup('GROUP',GroupName,ConsumerName,'BLOCK','2000','COUNT','10','STREAMS',:my_stream_key,myid)

    if items == nil
        puts "Timeout!"
        next
    end

    # If we receive an empty reply, it means we were consuming our history
    # and that the history is now empty. Let's start to consume new messages.
    check_backlog = false if items[0][1].length == 0

    items[0][1].each{|i|
        id,fields = i

        # Process the message
        process_message(id,fields)

        # Acknowledge the message as processed
        r.xack(:my_stream_key,GroupName,id)

        $lastid = id
    }
end

可以看到,这里的想法是从消费历史开始,也就是我们的待处理消息列表。这是有用的,因为消费者可能以前已经崩溃,因此在重新启动的情况下,我们希望重新读取已经传递给我们但没有得到确认的消息。请注意,我们可能会多次处理消息,或者一次处理消息(至少在消费者失败的情况下,但也有 Redis 持久性和复制的限制,请参见关于此主题的特定章节)。

一旦历史消息被消费,并且我们得到了一个空消息列表,我们可以切换到使用特殊ID来消费新消息。

故障恢复

以上示例允许我们编写参与相同消费者组的消费者,每个消费者处理一个消息子集,在从故障中恢复时重新读取仅对它们发送的待处理消息。但是在现实世界中,消费者可能永久失败,永远不会恢复。如果消费者由于任何原因停止,未恢复的待处理消息会发生什么情况?

Redis 消费者组提供了一个用于这种情况的功能,以便声明给定消费者的待处理消息,使这些消息更改所有权并重新分配给不同的消费者。该功能非常明确。消费者必须检查待处理消息的列表,并必须使用特殊命令声明特定消息,否则服务器将永远保留待处理的消息并分配给旧消费者。这样,不同的应用程序可以选择是否使用此功能,以及如何使用它。

XPENDING 指令命令是这个过程的第一步,它提供了消费者组中待处理条目的可观察性。这是一个只读指令,始终安全调用,不会改变任何消息的所有权。在最简单的形式中,该指令带有两个参数,分别是流的名称和消费者组的名称。

> XPENDING mystream mygroup
1) (integer) 2
2) 1526569498055-0
3) 1526569506935-0
4) 1) 1) "Bob"
      2) "2"

当以这种方式调用命令时,该命令将输出消费者组中待处理消息的总数(在这种情况下为两条),待处理消息中的最低和最高消息 ID,最后是消费者列表和他们有多少条待处理消息。我们只有 Bob 有两条待处理消息,因为 Alice 请求的单条消息已使用 XACK 被确认。

我们可以通过向 XPENDING 提供更多参数来请求更多信息,因为完整的命令签名如下:

XPENDING <key> <groupname> [[IDLE <min-idle-time>] <start-id> <end-id> <count> [<consumer-name>]]

通过提供起始 ID 和结束 ID(可以是 XRANGE 中的 - 和 +)以及用于控制命令返回的信息量的计数,我们可以了解更多未决消息的信息。可选的最后一个参数是消费者名称,如果我们想限制仅输出给定消费者的消息,但在以下示例中不使用该功能。

> XPENDING mystream mygroup - + 10
1) 1) 1526569498055-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1
2) 1) 1526569506935-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1

现在,我们拥有每条消息的详细信息:ID、消费者名称、闲置时间(以毫秒为单位,即自上次消息传递给某个消费者以来经过的毫秒数),最后是给定消息传递的次数。我们有两条来自 Bob 的消息,它们闲置了 74170458 毫秒,约为 20 小时。

请注意,没有任何限制阻止我们通过仅使用 XRANGE 检查第一条消息内容。

> XRANGE mystream 1526569498055-0 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

我们只需在参数中重复相同的ID。现在我们已经有了一些想法,爱丽丝可能决定,在不处理消息的20个小时后,鲍勃可能不会及时恢复,现在是时候声明这些消息并代替鲍勃恢复处理了。为此,我们使用 XCLAIM 命令。

这个命令很复杂,它的完整形式充满了选项,因为它用于复制消费者组的变化,但我们通常只使用所需的参数。在这种情况下,它非常简单:

XCLAIM <key> <group> <consumer> <min-idle-time> <ID-1> <ID-2> ... <ID-N>

基本上我们说,对于这个特定的键和组,我希望指定的消息 ID 会改变所有权,并分配给指定的消费者名称。然而,我们也提供了最小的空闲时间,因此该操作仅在提到的消息的空闲时间大于指定的空闲时间时才能工作。这很有用,因为也许两个客户端同时试图认领消息:

Client 1: XCLAIM mystream mygroup Alice 3600000 1526569498055-0
Client 2: XCLAIM mystream mygroup Lora 3600000 1526569498055-0

然而,作为一个副作用,声明一条消息将重置其空闲时间并增加其交付计数器,因此第二个客户端将无法声明。这样,我们可以避免对消息进行简单的重处理(即使在一般情况下您无法获得精确的处理)。

这是命令执行的结果:

> XCLAIM mystream mygroup Alice 3600000 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

消息已被 Alice 成功声明,她现在可以处理并确认该消息,即使原始消费者未恢复,也可以推进事情。

从上面的示例可以明显看出,作为成功声明给定消息的副作用,XCLAIM 命令也会返回它。然而,这不是强制的。可以使用 JUSTID 选项仅返回成功声明的消息的 ID。如果您希望减少客户端和服务器之间的带宽使用(以及命令的性能),并且不对消息感兴趣,因为您的消费者的实现方式是它将不时重新扫描待处理消息的历史记录,那么这是有用的。

声明也可以由单独的进程实现:只检查待处理消息的列表,并将空闲消息分配给似乎处于活动状态的消费者。可以使用 Redis 流的可观察性功能之一获取活动消费者。这是下一节的主题。

Automatic claiming

Redis 6.2 新增了 XAUTOCLAIM 命令,实现了上面描述的认领过程。XPENDING 和 XCLAIM 提供了不同类型恢复机制的基础构建块。该命令通过 Redis 管理该过程进行优化,并为大多数恢复需求提供了简单的解决方案。

XAUTOCLAIM 标识空闲待处理消息,并将其所有权转移给消费者。该命令的签名如下:

XAUTOCLAIM <key> <group> <consumer> <min-idle-time> <start> [COUNT count] [JUSTID]

在上面的示例中,我可以使用自动声明来声明单个消息,如下:

> XAUTOCLAIM mystream mygroup Alice 3600000 0-0 COUNT 1
1) 1526569498055-0
2) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

与XCLAIM命令类似,该命令将回复一个已声明消息的数组,但它还返回一个流ID,该流ID允许迭代待处理条目。流ID是一个游标,我可以在下一次调用中使用它,以继续声明空闲的待处理消息:

> XAUTOCLAIM mystream mygroup Lora 3600000 1526569498055-0 COUNT 1
1) 0-0
2) 1) 1526569506935-0
   2) 1) "message"
      2) "strawberry"

当XAUTOCLAIM返回“0-0”流ID作为游标时,表示它已经到达了消费者组待处理条目列表的末尾。这并不意味着没有新的闲置待处理消息,因此该过程会通过从流的开头调用XAUTOCLAIM来继续进行。

Claiming and the delivery counter

你在 XPENDING 输出中观察到的计数器是每个消息传递的次数。当消息通过 XCLAIM 成功被声明或者使用 XREADGROUP 调用以访问待处理消息的历史记录时,计数器会以两种方式递增。

当存在失败时,通常会出现消息被传递多次的情况,但最终它们通常会被处理和确认。但是,可能会出现某些特定消息无法被处理的问题,因为它们已经损坏或被制作成触发处理代码中错误的方式。在这种情况下,消费者将继续无法处理此特定消息。因为我们有传递尝试次数的计数器,我们可以使用该计数器来检测某些原因无法处理的消息。因此,一旦传递计数器达到您选择的给定大数字,最明智的做法可能是将这些消息放在另一个流中并向系统管理员发送通知。这基本上就是Redis Streams实现死信概念的方式。

Streams observability

消息系统如果缺乏可观察性,将会非常难以处理。不知道谁在消费消息,哪些消息在等待处理,给定流中活动的消费者组集合,所有这些都会变得不透明。因此,Redis Streams和消费者组有不同的观察方式。我们已经介绍了XPENDING,它允许我们检查在给定时刻正在处理的消息列表,以及它们的空闲时间和交付次数。

然而,我们可能希望做更多的事情,而XINFO命令是一个可观察性接口,可与子命令一起使用,以获取有关流或使用者组的信息。

这个命令使用子命令来显示关于流和其消费者组状态的不同信息。例如,XINFO STREAM 报告有关流本身的信息。

> XINFO STREAM mystream
 1) "length"
 2) (integer) 2
 3) "radix-tree-keys"
 4) (integer) 1
 5) "radix-tree-nodes"
 6) (integer) 2
 7) "last-generated-id"
 8) "1638125141232-0"
 9) "max-deleted-entryid"
10) "0-0"
11) "entries-added"
12) (integer) 2
13) "groups"
14) (integer) 1
15) "first-entry"
16) 1) "1638125133432-0"
    2) 1) "message"
       2) "apple"
17) "last-entry"
18) 1) "1638125141232-0"
    2) 1) "message"
       2) "banana"

输出显示有关流如何在内部进行编码的信息,并显示流中的第一个和最后一个消息。可用的另一条信息是与此流相关联的使用者组的数量。我们可以进一步挖掘,要求有关使用者组的更多信息。

> XINFO GROUPS mystream
1)  1) "name"
    2) "mygroup"
    3) "consumers"
    4) (integer) 2
    5) "pending"
    6) (integer) 2
    7) "last-delivered-id"
    8) "1638126030001-0"
    9) "entries-read"
   10) (integer) 2
   11) "lag"
   12) (integer) 0
2)  1) "name"
    2) "some-other-group"
    3) "consumers"
    4) (integer) 1
    5) "pending"
    6) (integer) 0
    7) "last-delivered-id"
    8) "1638126028070-0"
    9) "entries-read"
   10) (integer) 1
   11) "lag"
   12) (integer) 1

如你所见,在此输出和先前输出中,XINFO 命令输出一系列的字段-值项目。因为它是一个可观测性命令,这允许人类用户立即了解报告的信息,并且通过添加更多字段来报告更多信息,而不会破坏与旧客户端的兼容性。其他必须更具带宽效率的命令,比如 XPENDING,只报告信息而不报告字段名。

使用 GROUPS 子命令的上述示例输出,应该通过观察字段名称清晰可见。我们可以通过检查在组中注册的消费者来更详细地检查特定消费者组的状态。

> XINFO CONSUMERS mystream mygroup
1) 1) name
   2) "Alice"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 9104628
2) 1) name
   2) "Bob"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 83841983

如果您不记得命令的语法,请直接询问该命令以获取帮助:

> XINFO HELP
1) XINFO <subcommand> [<arg> [value] [opt] ...]. Subcommands are:
2) CONSUMERS <key> <groupname>
3)     Show consumers of <groupname>.
4) GROUPS <key>
5)     Show the stream consumer groups.
6) STREAM <key> [FULL [COUNT <count>]
7)     Show information about the stream.
8) HELP
9)     Prints this help.

和kafka分区之间的区别

Redis流中的消费者组可能在某些方面类似于Kafka(TM)基于分区的消费者组,但请注意,实际上Redis流非常不同。分区仅仅是逻辑上的,消息仅被放置在一个Redis键中,因此不同的客户端被服务的方式基于谁准备处理新消息,而不是来自客户端正在读取哪个分区。例如,如果消费者C3在某个时刻永久性地失败了,Redis将继续为C1和C2提供所有到达的新消息,就好像现在只有两个逻辑分区。

同样地,如果给定的消费者处理消息的速度比其他消费者快得多,那么在同一时间单位内,该消费者将会接收到比例更多的消息。这是可能的,因为Redis明确跟踪所有未确认的消息,并记住谁接收了哪个消息以及第一个从未传递给任何消费者的消息的ID。

然而,这也意味着在 Redis 中,如果你真的想要将同一流中的消息分区到多个 Redis 实例中,你必须使用多个键和一些分片系统,比如 Redis Cluster 或其他应用程序特定的分片系统。单个 Redis 流不会自动分区到多个实例。

我们可以简单地说以下内容是正确的:

  • 如果你使用1个流->1个消费者,你会按顺序处理消息。
  • 如果你使用N个流和N个消费者,这样只有给定的消费者会处理N个流的一个子集,你可以扩展上述1个流->1个消费者的模型。
  • 如果你使用1个流->N个消费者,你正在将负载平衡到N个消费者,然而在这种情况下,关于相同逻辑项的消息可能会被以乱序的方式消费,因为一个给定的消费者可能会比另一个消费者更快地处理第3条消息而不是第4条消息。

所以基本上 Kafka 分区更类似于使用 N 个不同的 Redis 键,而 Redis 消费者组是将给定流的消息在服务器端负载均衡到 N 个不同的消费者的系统。

流的上限

许多应用程序不想永远将数据收集到流中。有时在流中最多拥有给定数量的项目非常有用,而其他时候,一旦达到给定的大小,将数据从Redis移动到不在内存中且不如快速存储适合存储未来可能长达数十年的历史记录。Redis流对此提供了一些支持。其中之一是XADD命令的MAXLEN选项。这个选项非常容易使用:

> XADD mystream MAXLEN 2 * value 1
1526654998691-0
> XADD mystream MAXLEN 2 * value 2
1526654999635-0
> XADD mystream MAXLEN 2 * value 3
1526655000369-0
> XLEN mystream
(integer) 2
> XRANGE mystream - +
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

使用 MAXLEN 命令,当达到指定长度时,旧条目会自动被驱逐,以便使流保持在恒定大小。目前没有选项可以告诉流只保留不超过给定时间段的项目,因为这样的命令为了保持一致性,可能会阻塞很长时间以驱逐项目。例如,想象一下如果有一个插入峰值,然后长时间暂停,再插入另一个项目,所有这些项目都有相同的最大时间。流会阻塞以驱逐在暂停期间变得太旧的数据。因此,用户需要进行一些规划并了解所需的最大流长度。此外,尽管流的长度与使用的内存成比例,但按时间修剪不太容易控制和预测:它取决于插入速率,而插入速率通常随时间变化(当它不变时,只按大小修剪是微不足道的)。

然而,使用 MAXLEN 进行修剪可能很耗费资源:流被表示为基数树中的宏节点,以实现非常高效的内存使用。改变由少量元素组成的单个宏节点并不是最优的。因此,可以使用以下特殊形式的指令命令:

XADD mystream MAXLEN ~ 1000 * ... entry fields here ...

"MAXLEN" 选项和实际计数之间的 ~ 参数意味着,我不需要确切地 1000 个项目。它可以是 1000 或 1010 或 1030,只要确保至少保存 1000 个项目。使用此参数仅在可以删除整个节点时才执行修剪。这使得它更加高效,并且通常是您想要的。

还有一个XTRIM命令,执行的操作与上面的MAXLEN选项非常相似,不同之处在于它可以独立运行:

> XTRIM mystream MAXLEN 10

或者,针对XADD选项:

> XTRIM mystream MAXLEN ~ 10

然而,XTRIM被设计成接受不同的修剪策略。另一种修剪策略是MINID,它驱逐低于指定ID的条目。

由于XTRIM是一个明确的命令,用户应该知道不同修剪策略可能存在的缺点。

另一个未来可能添加到 XTRIM 的有用的驱逐策略是通过 ID 范围进行删除,以便在需要将数据从 Redis 移动到其他存储系统时更轻松地使用 XRANGE 和 XTRIM。

关于Stream ID的API

你可能已经注意到了在Redis API中可以使用几个特殊的ID。以下是一个简短的总结,以便在未来更好地理解它们。

第一个特殊的 ID 是“-”,第二个特殊的 ID 是“+”,并且在 XRANGE 命令的范围查询中使用。这两个 ID 分别表示可能的最小 ID(基本上是 0-1),和可能的最大 ID(即 18446744073709551615-18446744073709551615)。如您所见,使用“-”和“+”代替这些数字会更加清晰简洁。

然后有一些API需要指定流中ID最大的项的ID。这就是的含义。例如,如果我只想要使用XREADGROUP获取新条目,我使用这个ID来表示我已经拥有所有现有的条目,但没有未来将插入的新条目。同样,当我创建或设置消费者组的ID时,我可以将最后传递的项目设置为的含义。例如,如果我只想要使用XREADGROUP获取新条目,我使用这个ID来表示我已经拥有所有现有的条目,但没有未来将插入的新条目。同样,当我创建或设置消费者组的ID时,我可以将最后传递的项目设置为的含义。例如,如果我只想要使用XREADGROUP获取新条目,我使用这个ID来表示我已经拥有所有现有的条目,但没有未来将插入的新条目。同样,当我创建或设置消费者组的ID时,我可以将最后传递的项目设置为,以便只向组中的消费者传递新条目。

如您所见,不表示+,它们是两个不同的符号,因为+是在每个流中可能出现的最大ID,而不表示 +,它们是两个不同的符号,因为 + 是在每个流中可能出现的最大 ID,而不表示+,它们是两个不同的符号,因为+是在每个流中可能出现的最大ID,而 则是在包含给定条目的给定流中的最大 ID。此外,API 通常只能理解 + 或 $,但避免使给定符号具有多重含义是有用的。

另一个特殊的 ID 是 >,它仅与使用 XREADGROUP 命令相关的消费者组有关,并具有特殊含义。该特殊 ID 表示我们只想获取尚未传递给其他消费者的条目。因此,> ID 是消费者组的最后一个传递的 ID。

最后那个特殊的 ID *,只能与 XADD 命令一起使用,意味着自动为新条目选择一个 ID。

所以我们有-, +, $, > 和*,它们都有不同的意义,并且大多数情况下可以在不同的上下文中使用。

持久化、复制和消息安全

一个 Stream,和 Redis 的其他数据结构一样,会被异步地复制到副本并被持久化到 AOF 和 RDB 文件。然而可能不太明显的是,消费者组的全部状态也会被传播到 AOF、RDB 和副本中,所以如果一个消息在主节点上是待处理状态,那么副本也会有相同的信息。同样地,在重启后,AOF 会恢复消费者组的状态。

然而请注意,Redis流和消费者组是使用Redis默认复制进行持久化和复制的,因此:

  • AOF 必须与强 fsync 策略一起使用,如果消息的持久性在您的应用程序中很重要。
  • 默认情况下,异步复制将不保证 XADD 命令或消费者组状态更改被复制:在故障转移后,根据副本接收来自主节点的数据的能力,可能会缺少某些内容。
  • WAIT 命令可用于强制将更改传播到一组副本。但请注意,尽管这使数据丢失的可能性非常小,但 Sentinel 或 Redis Cluster 运行的 Redis 故障转移过程仅进行了最佳努力检查以故障转移到最新的副本,并且在某些特定的故障条件下可能会晋升缺少某些数据的副本。

因此,在使用 Redis streams 和消费者组设计应用程序时,请确保理解应用程序在失败时应具有的语义属性,并相应地配置事物,评估它是否足够安全以适合您的用例。

从流中移除单个项

Streams还有一个特殊命令,可以通过ID从流中删除项目。对于仅追加的数据结构来说,这似乎是一个奇怪的功能,但它实际上对于涉及隐私规定的应用程序非常有用。该命令称为XDEL,接收流的名称,后跟要删除的ID:

> XRANGE mystream - + COUNT 2
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"
> XDEL mystream 1526654999635-0
(integer) 1
> XRANGE mystream - + COUNT 2
1) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

然而在当前的实现中,只有当一个宏节点完全为空时,内存才会真正被回收,因此您不应滥用此功能。

空Stream

流和其他Redis数据结构之间的差异在于,当调用删除元素的命令后,其他数据结构不再具有任何元素时,键本身将被删除。因此,例如,当调用ZREM删除排序集合中的最后一个元素时,排序集合将被完全删除。另一方面,流允许保持零个元素,这既可以通过使用带有计数为零的MAXLEN选项(XADD和XTRIM命令),也可以通过调用XDEL命令实现。

因为Streams可能有关联的consumer groups,而我们不想因为流中不再有任何项而丢失consumer groups定义的状态,所以存在这种不对称性。目前即使没有关联的consumer groups,流也不会被删除。

消费一条消息的总延迟

非阻塞流命令(例如XRANGE和没有BLOCK选项的XREAD或XREADGROUP)像任何其他Redis命令一样同步执行,因此讨论此类命令的延迟是没有意义的:更有趣的是检查Redis文档中命令的时间复杂度。可以说流命令至少与提取范围的有序集合命令一样快,并且如果使用管道技术,XADD非常快,并且可以轻松地在平均计算机上每秒插入50万到100万个项目。

然而,如果我们想要理解在消费者组中阻塞消费者的情况下,从使用 XADD 生产消息到使用 XREADGROUP 返回消息时消费者处理消息的延迟,延迟就成为了一个有趣的参数。

如何为被屏蔽的消费者提供服务

在提供执行测试结果之前,了解Redis在路由流消息时使用的模型(以及通常管理等待数据的任何阻塞操作的方式)是很有趣的。

  • 被阻止的客户端被引用在哈希表中,该哈希表将至少有一个阻止消费者的键映射到等待该键的消费者列表。通过这种方式,给定接收到数据的键,我们可以解决等待该数据的所有客户端。
  • 当发生写操作时,在此情况下调用XADD命令时,它将调用signalKeyAsReady()函数。该函数将把键放入需要处理的键列表中,因为这些键可能有新的数据供被阻塞的消费者使用。请注意,这些准备就绪的键将在稍后处理,因此在同一事件循环周期内,该键可能会接收其他写操作。
  • 最后,在返回事件循环之前,将最终处理准备好的键。对于每个键,扫描等待数据的客户端列表,如果适用,这些客户端将接收到到达的新数据。对于流,数据是由消费者请求的适用范围内的消息。

如您所见,基本上,在返回事件循环之前,调用 XADD 的客户端和阻塞以消费消息的客户端都将在输出缓冲区中接收到它们的回复,因此调用 XADD 的调用者应该在消费者接收到新消息的同时从 Redis 接收到回复。

这个模型是基于推送的,因为通过调用XADD动作直接向消费者缓冲区添加数据,所以延迟往往是相当可预测的。

延迟测试

为了检查这些延迟特征,使用多个 Ruby 程序实例进行了测试,这些程序推送带有计算机毫秒时间作为额外字段的消息,并有 Ruby 程序从消费者组读取消息并处理它们。消息处理步骤包括将当前计算机时间与消息时间戳进行比较,以了解总延迟时间。

结果展示:

Processed between 0 and 1 ms -> 74.11%
Processed between 1 and 2 ms -> 25.80%
Processed between 2 and 3 ms -> 0.06%
Processed between 3 and 4 ms -> 0.01%
Processed between 4 and 5 ms -> 0.02%

99.9%的请求延迟时间小于等于2毫秒,而剩下的离群值仍非常接近平均值。

将几百万未经确认的消息添加到流中并不会改变基准测试的要点,大多数查询仍以非常短的延迟时间进行处理。

附注:

  • 这里每次迭代最多处理10k条消息,这意味着 XREADGROUP 的 COUNT 参数设置为10000。这会增加很多延迟,但为了让慢消费者能够跟上消息流,这是必要的。因此,你可以期望真实世界的延迟要小得多。
  • 这个基准测试所使用的系统与当今的标准相比非常慢。
转载自:https://juejin.cn/post/7224489546997039164
评论
请登录