Redis 07_Redis流Streams

一、Redis 流(Streams)

1.1 Redis 流(Streams)简介

Redis 5.0 增加了很多新的特色功能,其中最大的新特性就是多出了一个数据结构 Stream,它是一个新的强大的支持多播的可持久化消息队列,Stream 极大地借鉴了 Kafka 的设计。

Redis Stream 有一个消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的 ID 和 对应的内容。消息是持久化的,Redis 重启后,内容还在。

Redis Stream 的作用类似于追加日志。使用 Stream 可以实时记录 和 整合事件。Redis流用例示例包括:

  • 事件来源(例如,跟踪用户操作、点击等)。
  • 传感器监测(例如,现场设备的读数)。
  • 通知(例如,将每个用户的通知记录存储在单独的流中)

Redis为每个 Stream 条目生成一个唯一的ID。可以使用这些ID在以后检索它们的关联条目,或者读取和处理流中的所有后续条目。

Redis流支持多种微调策略(以防止流无限增长)和多种消费策略(请参阅XREAD、XREADGRUP和XRANGE)。

Redis Stream 主要用于实现消息队列(MQ,Message Queue),可以说是目前最新Redis版本(6.2)中最完美的消息队列实现。

Redis Stream 的功能如下:

  • 提供了对于消费者和消费者组的阻塞、非阻塞的获取消息的功能;
  • 提供了消息多播的功能,同一个消息可被分发给多个单消费者和消费者组;
  • 提供了消息持久化的功能,可以让任何消费者访问任何时刻的历史消息;
  • 提供了强大的消费者组的功能:
    • 消费者组实现同组多个消费者并行但不重复消费消息的能力,提升消费能力;
    • 消费者组能够记住最新消费的信息,保证消息连续消费;
    • 消费者组能够记住消息转移次数,实现消费失败重试以及永久性故障的消息转移;
    • 消费者组能够记住消息转移次数,借此可以实现死信消息的功能(需自己实现);
    • 消费者组提供了PEL未确认列表和ACK确认机制,保证消息被成功消费,不丢失;

Redis Stream基本上可以满足你对消息队列的所有需求。

1.2 Redis 流(Streams)的基本结构

Redis Stream 像是一个仅追加内容的消息链表,把所有加入的消息都一个一个串起来,每个消息都有一个唯一的 ID 和 内容,它还从 Kafka 借鉴了另一种概念:消费者组(Consumer Group),这让 Redis Stream 变得更加复杂。

Redis Stream的结构如下:

每个 Stream 都有唯一的名称,它就是 Redis 的 key,在首次使用 XADD 指令追加消息时自动创建该 key。

  • Consumer Group:消费者组,消费者组记录了 Starem 的状态,使用 XGROUP CREATE 命令手动创建,在同一个 Stream 内消费者组名称唯一。一个消费者组可以有多个消费者(Consumer)同时进行组内消费,所有消费者共享 Stream 内的所有信息,但同一条消息只会有一个消费者消费到,不同的消费者会消费Stream中不同的消息,这样就可以应用在分布式的场景中来保证消息消费的唯一性。
  • last_delivered_id:游标,用来记录某个消费者组在 Stream 上的消费位置信息,每个消费者组会有一个游标,任意一个消费者读取了消息都会使游标 last_delivered_id 往前移动。创建消费者组时需要指定从 Stream 的哪一个消息ID(哪个位置)开始消费,该位置之前的数据会被忽略,同时还用来初始化 last_delivered_id 这个变量。这个 last_delivered_id 一般来说就是最新消费的消息ID。
  • pending_ids:消费者内部的状态变量,作用是维护消费者的未确认的消息ID。pending_ids 记录了当前已经被客户端读取,但是还没有 ack (Acknowledge character:确认字符) 的消息,目的是为了保证客户端至少消费了消息一次,而不会因为在网络传输的中途丢失而没有对消息进行处理。如果客户端没有 ack,那么这个变量里面的消息 ID 就会越来越多,一旦某个消息被 ack,它就会对应开始减少。这个变量也被 Redis 官方称为 PEL (Pending Entries List)
  • 消息ID:消息ID的形式是 timestampInMillis-sequence,例如 1527846880572-5,它表示当前的消息在毫米时间戳 1527846880572 时产生,并且是该毫秒内产生的第5条消息。消息ID可以由服务器自动生成,也可以由客户端自己指定,但是形式必须是整数-整数,而且必须是后面加入的消息的ID要大于前面的消息ID。 消息内容:消息内容就是键值对(形如 hash结构 的键值对)。

每个 Stream 都可以挂多个消费组(Consumer Group),每个消费组会有一个游标 last_delivered_id 在 Stream 数组之上往前移动,表示当前消费组已经消费到哪条消息了。每个消费组都有一个 Stream 内唯一的名称,消费组不会自动创建,它需要单独的指令 xgroup create 进行创建,需要指定从 Stream 的某个消息 ID 开始消费,这个 ID 用来初始化 last_delivered_id 变量。

每个消费组的状态都是独立的,相互不受影响。也就是说 同一份 Stream 内部的消息会被每个消费组都消费到。同一个消费组可以挂接多个消费者(Consumer),这些消费者之间是竞争关系,任意一个消费者读取了消息都会使游标 last_delivered_id 往前移动。每个消费者有一个组内唯一名称。

消费者内部会有一个状态变量 pending_ids,它记录了当前已经被客户端读取,但是还没有 ack 的消息。如果客户端没有 ack,这个变量里面的消息 ID 就会越来越多,一旦某个消息被 ack,它就开始减少。这个 pending_ids 变量在 Redis 官方被称为 PEL,也就是 Pending Entries List,这是一个核心的数据结构,它用来确保客户端至少消费了消息一次,而不会在网络传输的中途丢失了而没被处理。

二、Streams 类型相关命令

2.1 Redis Streams 相关命令简介

Streams(流)是 Redis 中的一种支持多播的可持久化消息队列的数据结构,它有一个消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的 ID 和 对应的内容。消息是持久化的,Redis 重启后,内容还在。以下是关于 Redis Set 相关命令的简介表,包括命令、作用以及时间复杂度:

命令 作用 时间复杂度
XADD 将指定的流条目追加到指定 key 的流中。 O(1)

2.1 XADD 往Stream末尾添加消息

XADD 命令将指定的流条目追加到指定 key 的流中。如果 key 不存在,作为运行这个命令的副作用,将使用流的条目自动创建 key。

一个条目(消息)是由一组键值对组成的,它基本上是一个小的字典。键值对将会以用户给定的顺序存储,并且读取流的命令(如 XRANGE 或者 XREAD)可以保证按照通过 XADD 添加的顺序返回。XADD唯一可以向流添加数据的 Redis 命令,但是还有其它命令,例如 XDELXTRIM,它们能够从流中删除数据。

命令语法:

1
127.0.0.1:6379> XADD key [NOMKSTREAM] [MAXLEN|MINID [=|~] threshold [LIMIT count]] *|ID field value [field value ...]

命令参数:

  • key:必须参数,流的的名字,如果key对应的Stream不存在,则自动创建。
  • NOMKSTREAM:Redis 6.2.0 添加的可选参数,key后面添加 NOMKSTREAM命令 可以禁止 key 不存在是自动创建Stream。
  • MAXLEN:可选参数,用于限制 Stream消息 的长度(数量),只要流的长度超过指定的阈值 threshold(其中threshold值是正整数),就会驱逐条目。
  • MINID:驱逐ID低于 threshold值 的条目,其中 threshold值 是流ID。
  • =|~:表示 MAXLEN/MINID 的值 精确/模糊 等于 threshold
  • ID:必须参数,流条目 ID 标识流内的给定条目。如果指定的 ID 参数是字符 *(星号 ASCII 字符),XADD 命令会自动生成一个唯一的 ID。但是,也可以指定一个良好格式的 ID,以便新的条目以指定的 ID 准确存储,虽然仅在极少数情况下有用。
    • ID 是由 - 隔开的两个数字组成的:1526919030474-55,两个部分数字都是 64 位的,当自动生成 ID 时,第一部分是生成 ID 的 Redis 实例的毫秒格式的 Unix 时间。第二部分只是一个序列号,是用来区分同一毫秒内生成的 ID 的。
    • ID 保证始终是递增的:如果比较刚插入的条目的 ID,它将大于其它任何过去的 ID,因此条目在流中是完全排序的。为了保证这个特性,如果流中当前最大的 ID 的时间大于实例的当前本地时间,将会使用前者,并将 ID 的序列部分递增。例如,在故障转移之后新主机具有不同的绝对时间,则可能发生这种情况。
    • 当用户为 XADD 命令指定显式 ID 时,最小有效的 ID 是 0-1,并且用户必须指定一个比当前流中的任何 ID 都要大的 ID,否则命令将失败。通常使用特定 ID 仅在您有另一个系统生成唯一 ID(例如 SQL 表),并且您确实希望 Redis 流 ID 与该另一个系统的 ID 匹配时才有用。

命令返回值:

  • 当 key 存在且是流类型时,返回 流条目ID;
  • 当 key 不存在时,创建 流 key, 并添加流条目,返回 流条目ID;
  • 当 key 不是 Stream 时,返回类型不匹配的错误信息;

示例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
127.0.0.1:6379> xadd stream-key * username zhangsan # 向stream-key这个流中增加一个 username 是zhangsan的数据 *表示自动生成id
"1635999858912-0" # 返回的是ID
127.0.0.1:6379> keys *
1) "stream-key" # 可以看到stream自动创建了
127.0.0.1:6379> xadd not-exists-stream nomkstream * username lisi # 因为指定了nomkstream参数,而not-exists-stream之前不存在,所以加入失败
(nil)
127.0.0.1:6379> keys *
(empty array)
127.0.0.1:6379> xadd stream-key 1-1 username lisi # 此处id的值是自己传递的1-1,而不是使用*自动生成
"1-1" # 返回的是id的值
127.0.0.1:6379>

2.2 XRANGE 和 XREVRANGE 查看Stream中的消息

1、XRANGE 命令 XRANGE 命令获取消息列表,ID 从小到大,会自动过滤已经删除的消息。 命令语法:

1
127.0.0.1:6379> XRANGE key start end [COUNT count]

命令参数:

  • start/end:获取消息的范围,默认是闭区间;
    • - 表示最小值;
    • + 表示自大值;
    • ( 表示开区间;
    • start=timestampInMillis 表示获取大于等于 timestampInMillis 这个时间的 数据;
    • start=end 表示获取一条数据
  • COUNT 指定获取多少条数据;

命令返回值:

  • 当 key 存在且是流类型时,返回有指定范围内的消息列表,ID 从小到大;
  • 当 key 不存在时,返回 (empty array);
  • 当 key 不是流类型时,返回类型不匹配的错误信息;

2、XREVRANGE 命令 XREVRANGE 命令使用方式和XRANGE类似,获取反向消息列表,ID 从大到小,会自动过滤已经删除的消息。 命令语法:

1
127.0.0.1:6379> XREVRANGE key end start [COUNT count]

2.3 XDEL 删除 Stream中的消息

XDEL 删除 Stream中的 指定ID 的消息,可以一次指定多个消息ID 删除;

从Stream中删除一个消息,这个消息并不是被真正的删除了,而是被标记为删除状态,这个时候这个消息还是占据着内容空间的。如果所有Stream中所有的消息都被标记删除,这个时候才会回收内存空间。但是这个Stream并不会被删除。

命令语法:

1
127.0.0.1:6379> XDEL key ID [ID ...]

命令返回值:

  • 当 key 存在且是流类型时,返回删除的消息数;
  • 当 key 不存在时,返回 0;
  • 当 key 不是流类型时,返回类型不匹配的错误信息;

2.4 XLEN 查看Stream中元素的长度

命令语法:

1
127.0.0.1:6379> XLEN key

命令返回值:

  • 当 key 存在且是流类型时,返回流中消息的数量;
  • 当 key 不存在时,返回 0;
  • 当 key 不是ZSet集合时,返回类型不匹配的错误信息;

2.5 XTRIM 对Stream中的元素进行修剪

命令语法:

1
127.0.0.1:6379> XTRIM key MAXLEN|MINID [=|~] threshold [LIMIT count]

命令参数:

  • MAXLEN 指定 threshold 为 保留的最大消息数(最新的), 超过该阈值最旧的消息将被删除;
  • MINID 指定 threshold 为保留的最小消息ID,比之小的消息将被删除
  • =|~ = 表示精确指定 MAXLEN|MINID 的值为 threshold (默认情况),~ 表示模糊匹配(近似相等) MAXLEN|MINID 的值为 threshold

命令返回值:

  • 当 key 存在且是流类型时,返回流中剩余消息的数量;
  • 当 key 不存在时,返回 0;
  • 当 key 不是流类型时,返回类型不匹配的错误信息;

2.6 XREAD 独立消费Stream中的消息

XREAD 以阻塞或非阻塞方式读取消息,读取完之后并不会删除消息。使用XREAD读取消息,是完全独立与消费者组的,多个客户端可以同时读取消息。一次可以读取多个 流key。

XREAD 进行顺序消费 当使用 XREAD 进行顺序消息时,需要记住返回的消息id,同时下次调用xread时,需要将上次返回的消息id传递进去;

XREAD 读取消息,完全无视消费组,此时Stream就可以理解为一个普通的list。

命令语法:

1
127.0.0.1:6379> XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]

命令参数:

  • COUNT count 指定 获取的消息数量;
  • BLOCK milliseconds 指定 如果 Stream 中没有消息了,则阻塞 milliseconds 毫秒,milliseconds=0 表示永久阻塞,直到有消息可以获取
  • =|~ = 表示精确指定 MAXLEN|MINID 的值为 threshold (默认情况),~ 表示模糊匹配(近似相等) MAXLEN|MINID 的值为 threshold
1
2
127.0.0.1:6379> xread streams stream-key $
(nil)
  • $ 表示读取队列最新进来的一个消息,不是 Stream的最后一个消息。是xread block执行后,再次使用xadd添加消息后,xread block才会返回。所以 读取队列尾的下一个消息,在非阻塞模式下始终是 nil
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
127.0.0.1:6379> xread streams stream-key 1636011806261-0 # 此处写的是lisi的id,即读取到的数据需要是 > 1636011806261-0
1) 1) "stream-key"
   2) 1) 1) "1636011810905-0"
         2) 1) "username"
            2) "wangwu"

127.0.0.1:6379> xread count 2 streams stream-key 0-0
1) 1) "stream-key"
   2) 1) 1) "1636011801365-0"
         2) 1) "username"
            2) "zhangsan"
      2) 1) "1636011806261-0"
         2) 1) "username"
            2) "lisi"

命令返回值:

  • 当 key 存在且是流类型时,返回流中剩余消息的数量;
  • 当 key 不存在时,返回 0;
  • 当 key 不是流类型时,返回类型不匹配的错误信息;

2.7 XGROUP 创建消费者组

命令语法:

1
127.0.0.1:6379> XGROUP [CREATE key groupname ID|$ [MKSTREAM]] [SETID key groupname ID|$] [DESTROY key groupname] [CREATECONSUMER key groupname consumername] [DELCONSUMER key groupname consumername]

创建流 stream-key

1
2
3
4
127.0.0.1:6379> xadd stream-key * aa aa
"1636362619125-0"
127.0.0.1:6379> xadd stream-key * bb bb
"1636362623191-0"

1、创建一个从头开始消费的消费者组

1
127.0.0.1:6379> xgroup create stream-key(Stream 名) g1(消费者组名) 0-0(表示从头开始消费)

2、创建一个从Stream最新的一个消息消费的消费者组

1
127.0.0.1:6379> xgroup create stream-key g2 $(表示从头开始消费)
  • $ 表示从最后一个元素消费,不包括Stream中的最后一个元素,即消费最新的消息。

3、创建一个从某个消息之后消费的消费者组

1
127.0.0.1:6379> xgroup create stream-key g3 1636362619125-0  # 1636362619125-0 这个是上方aa消息的id的值

1636362619125-0某个消息的具体的ID,这个g3消费者组中的消息都是大于>这个id的消息。

2.8 XREADGROUP 从消费者中读取消息

命令语法:

1
127.0.0.1:6379> XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]

示例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
127.0.0.1:6379> xreadgroup group g1(消费组名) c1(消费者名,自动创建) count 3(读取3条) streams stream-key(Stream 名) >(从该消费者组中还未分配给另外的消费者的消息开始读取)
1) 1) "stream-key"
   2) 1) 1) "1636362619125-0"
         2) 1) "aa"
            2) "aa"
      2) 1) "1636362623191-0"
         2) 1) "bb"
            2) "bb"
127.0.0.1:6379> 
127.0.0.1:6379> xreadgroup group g2 c1 count 3 streams stream-key >
(nil) # 返回 nil 是因为 g2消费组是从最新的一条信息开始读取(创建消费者组时使用了$),需要在另外的窗口执行`xadd`命令,才可以再次读取到消息
127.0.0.1:6379> xreadgroup group g3 c1 count 3 streams stream-key >  #只读取到一条消息是因为,在创建消费者组时,指定了aa消息的id,bb消息的id大于aa,所以读取出来了。
1) 1) "stream-key"
   2) 1) 1) "1636362623191-0"
         2) 1) "bb"
            2) "bb"
127.0.0.1:6379> 
# 读取消费者的pending消息
127.0.0.1:6379> xgroup create stream-key g4 0-0 # 创建一个新的消费者组,消息是从头开始消费
OK
127.0.0.1:6379> xinfo consumers stream-key g1
1) 1) "name"
   2) "c1"
   3) "pending"     # pending状态的小节 即 未 ack 的消息
   4) (integer) 2   # g1 消费组中的消费者 c1 有2个 pending 的消息
   5) "idle"
   6) (integer) 88792
127.0.0.1:6379> xinfo consumers stream-key g4 # g4 消费组目前没有消息
(empty array)
127.0.0.1:6379> xreadgroup group g1 c1 count 1 streams stream-key 1636362619125-0
1) 1) "stream-key" # 读取 g1 消费组 中消费者 c1 的 id 大于 1636362619125-0 的消息,这个是从pending列表中读取到的
   2) 1) 1) "1636362623191-0"
         2) 1) "bb"
            2) "bb"
127.0.0.1:6379> xreadgroup group g4 c1 count 1 block 0 streams stream-key 1636362619125-0
1) 1) "stream-key" # g4 消费组的c1消费者的pending列表为空,没有读取到消息, 同时 block 阻塞也是失效的
   2) (empty array)

2.9 XINFO 命令

命令语法:

1
127.0.0.1:6379> XINFO [CONSUMERS key groupname] [GROUPS key] [STREAM key] [HELP]

命令参数:

  • CONSUMERS key groupname 返回 存储在流 key 中属于 groupname 消费者组的消费者列表;
  • GROUPS key 返回 存储在 流key 中的的所有消费者组的列表。
  • STREAM key 返回 存储在 流key 中的所有的 流信息;

2.10 XPENDING 转移消费者的消息

XPENTING 命令通过消费者组从流中获取数据,而不是确认这些数据,具有创建待处理条目的效果。 命令语法:

1
127.0.0.1:6379> XPENDING key group [[IDLE min-idle-time] start end count [consumer]]
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
127.0.0.1:6379> xpending stream-key g1 - + 10 c1 # 检查发现存在2 哥 pending 的消息
1) 1) "1636362619125-0"
   2) "c1"
   3) (integer) 2686183 # 未ack的时间,单位 ms
   4) (integer) 1
2) 1) "1636362623191-0"
   2) "c1"
   3) (integer) 102274 # 未ack的时间,单位 ms
   4) (integer) 7
127.0.0.1:6379> xpending stream-key g1 - + 10 c2
(empty array)
127.0.0.1:6379> xclaim stream-key g1 c2 102274 1636362623191-0 # 消息需要未ack的时间要大于 102274
1) 1) "1636362623191-0"
   2) 1) "bb"
      2) "bb"
127.0.0.1:6379> xpending stream-key g1 - + 10 c2 # 转移
1) 1) "1636362623191-0"
   2) "c2"
   3) (integer) 17616
   4) (integer) 8
127.0.0.1:6379>

上面 功能 也可以通过 xautoclaim 来实现。

2.11 XACK命令

XACK命令用于从流的消费者组的待处理条目列表(简称PEL)中删除一条或多条消息。 当一条消息交付到某个消费者时,它将被存储在PEL中等待处理, 这通常出现在作为调用XREADGROUP命令的副作用,或者一个消费者通过调用XCLAIM命令接管消息的时候。 待处理消息被交付到某些消费者,但是服务器尚不确定它是否至少被处理了一次。 因此对新调用XREADGROUP来获取消费者的消息历史记录(比如用0作为ID)将返回此类消息。 类似地,待处理的消息将由检查PEL的XPENDING命令列出。

一旦消费者成功地处理完一条消息,它应该调用XACK,这样这个消息就不会被再次处理, 且作为一个副作用,关于此消息的PEL条目也会被清除,从Redis服务器释放内存。

命令语法:

1
127.0.0.1:6379> XACK key group ID [ID ...]

命令返回值:

  • 当 key 存在且是流类型时,返回流中返回成功确认的消息数;

Tips: 某些消息ID可能不再是PEL的一部分(例如因为它们已经被确认), 而且XACK不会把它们算到成功确认的数量中。

  • 当 key 不存在时,返回 0;
  • 当 key 不是流类型时,返回类型不匹配的错误信息;

该命令返回成功确认的消息数。 某些消息ID可能不再是PEL的一部分(例如因为它们已经被确认), 而且XACK不会把它们算到成功确认的数量中。

2.12 一些监控命令

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# 1、查看消费组中消费者的pending消息
127.0.0.1:6379> xpending stream-key g1 - + 10 c2
1) 1) "1636362623191-0"
   2) "c2"
   3) (integer) 1247680
   4) (integer) 8

# 2、查看消费组中的消费者信息
127.0.0.1:6379> xinfo consumers stream-key g1
1) 1) "name"
   2) "c1"
   3) "pending"
   4) (integer) 1
   5) "idle"
   6) (integer) 1474864
2) 1) "name"
   2) "c2"
   3) "pending"
   4) (integer) 1
   5) "idle"
   6) (integer) 1290069
127.0.0.1:6379>

# 3、查看消费组信息
127.0.0.1:6379> xinfo groups stream-key
1) 1) "name"
   2) "g1"
   3) "consumers"
   4) (integer) 2
   5) "pending"
   6) (integer) 2
   7) "last-delivered-id"
   8) "1636362623191-0"
2) 1) "name"
   2) "g2"
   3) "consumers"
......

# 4、查看Stream信息
127.0.0.1:6379> xinfo stream stream-key
 1) "length"
 2) (integer) 2
 3) "radix-tree-keys"
 4) (integer) 1
 5) "radix-tree-nodes"
 6) (integer) 2
 7) "last-generated-id"
 8) "1636362623191-0"
 9) "groups"
10) (integer) 4
11) "first-entry"
12) 1) "1636362619125-0"
    2) 1) "aa"
       2) "aa"
13) "last-entry"
14) 1) "1636362623191-0"
    2) 1) "bb"
       2) "bb"
127.0.0.1:6379>
Licensed under CC BY-NC-SA 4.0
最后更新于 2023-11-08 15:04 CST