一、Redis 流(Streams)
1.1 Redis 流(Streams)简介
Redis 5.0 增加了很多新的特色功能,其中最大的新特性就是多出了一个数据结构 Stream,它是一个新的强大的支持多播的可持久化消息队列,Stream 极大地借鉴了 Kafka 的设计。
Redis Stream 有一个消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的 ID 和 对应的内容。消息是持久化的,Redis 重启后,内容还在。
Redis Stream 的作用类似于追加日志。使用 Stream 可以实时记录 和 整合事件。Redis流用例示例包括:
- 事件来源(例如,跟踪用户操作、点击等)。
- 传感器监测(例如,现场设备的读数)。
- 通知(例如,将每个用户的通知记录存储在单独的流中)
Redis为每个 Stream 条目生成一个唯一的ID。可以使用这些ID在以后检索它们的关联条目,或者读取和处理流中的所有后续条目。
Redis流支持多种微调策略(以防止流无限增长)和多种消费策略(请参阅XREAD、XREADGRUP和XRANGE)。
Redis Stream 主要用于实现消息队列(MQ,Message Queue),可以说是目前最新Redis版本(6.2)中最完美的消息队列实现。
Redis Stream 的功能如下:
- 提供了对于消费者和消费者组的阻塞、非阻塞的获取消息的功能;
- 提供了消息多播的功能,同一个消息可被分发给多个单消费者和消费者组;
- 提供了消息持久化的功能,可以让任何消费者访问任何时刻的历史消息;
- 提供了强大的消费者组的功能:
- 消费者组实现同组多个消费者并行但不重复消费消息的能力,提升消费能力;
- 消费者组能够记住最新消费的信息,保证消息连续消费;
- 消费者组能够记住消息转移次数,实现消费失败重试以及永久性故障的消息转移;
- 消费者组能够记住消息转移次数,借此可以实现死信消息的功能(需自己实现);
- 消费者组提供了PEL未确认列表和ACK确认机制,保证消息被成功消费,不丢失;
Redis Stream基本上可以满足你对消息队列的所有需求。
1.2 Redis 流(Streams)的基本结构
Redis Stream 像是一个仅追加内容的消息链表,把所有加入的消息都一个一个串起来,每个消息都有一个唯一的 ID 和 内容,它还从 Kafka 借鉴了另一种概念:消费者组(Consumer Group),这让 Redis Stream 变得更加复杂。
Redis Stream的结构如下:
每个 Stream 都有唯一的名称,它就是 Redis 的 key,在首次使用 XADD 指令追加消息时自动创建该 key。
- Consumer Group:消费者组,消费者组记录了 Starem 的状态,使用
XGROUP CREATE
命令手动创建,在同一个 Stream 内消费者组名称唯一。一个消费者组可以有多个消费者(Consumer)同时进行组内消费,所有消费者共享 Stream 内的所有信息,但同一条消息只会有一个消费者消费到,不同的消费者会消费Stream中不同的消息,这样就可以应用在分布式的场景中来保证消息消费的唯一性。 - last_delivered_id:游标,用来记录某个消费者组在 Stream 上的消费位置信息,每个消费者组会有一个游标,任意一个消费者读取了消息都会使游标
last_delivered_id
往前移动。创建消费者组时需要指定从 Stream 的哪一个消息ID(哪个位置)开始消费,该位置之前的数据会被忽略,同时还用来初始化last_delivered_id
这个变量。这个last_delivered_id
一般来说就是最新消费的消息ID。 - pending_ids:消费者内部的状态变量,作用是维护消费者的未确认的消息ID。
pending_ids
记录了当前已经被客户端读取,但是还没有ack (Acknowledge character:确认字符)
的消息,目的是为了保证客户端至少消费了消息一次,而不会因为在网络传输的中途丢失而没有对消息进行处理。如果客户端没有ack
,那么这个变量里面的消息 ID 就会越来越多,一旦某个消息被ack
,它就会对应开始减少。这个变量也被 Redis 官方称为 PEL (Pending Entries List)。 - 消息ID:消息ID的形式是
timestampInMillis-sequence
,例如1527846880572-5
,它表示当前的消息在毫米时间戳1527846880572
时产生,并且是该毫秒内产生的第5条消息。消息ID可以由服务器自动生成,也可以由客户端自己指定,但是形式必须是整数-整数,而且必须是后面加入的消息的ID要大于前面的消息ID。 消息内容:消息内容就是键值对(形如 hash结构 的键值对)。
每个 Stream 都可以挂多个消费组(Consumer Group),每个消费组会有一个游标 last_delivered_id
在 Stream 数组之上往前移动,表示当前消费组已经消费到哪条消息了。每个消费组都有一个 Stream 内唯一的名称,消费组不会自动创建,它需要单独的指令 xgroup create
进行创建,需要指定从 Stream 的某个消息 ID 开始消费,这个 ID 用来初始化 last_delivered_id
变量。
每个消费组的状态都是独立的,相互不受影响。也就是说 同一份 Stream 内部的消息会被每个消费组都消费到。同一个消费组可以挂接多个消费者(Consumer),这些消费者之间是竞争关系,任意一个消费者读取了消息都会使游标 last_delivered_id
往前移动。每个消费者有一个组内唯一名称。
消费者内部会有一个状态变量 pending_ids
,它记录了当前已经被客户端读取,但是还没有 ack
的消息。如果客户端没有 ack
,这个变量里面的消息 ID 就会越来越多,一旦某个消息被 ack
,它就开始减少。这个 pending_ids
变量在 Redis 官方被称为 PEL,也就是 Pending Entries List,这是一个核心的数据结构,它用来确保客户端至少消费了消息一次,而不会在网络传输的中途丢失了而没被处理。
二、Streams 类型相关命令
2.1 Redis Streams 相关命令简介
Streams(流)是 Redis 中的一种支持多播的可持久化消息队列的数据结构,它有一个消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的 ID 和 对应的内容。消息是持久化的,Redis 重启后,内容还在。以下是关于 Redis Set 相关命令的简介表,包括命令、作用以及时间复杂度:
命令 | 作用 | 时间复杂度 |
---|---|---|
XADD | 将指定的流条目追加到指定 key 的流中。 | O(1) |
2.1 XADD 往Stream末尾添加消息
XADD 命令将指定的流条目追加到指定 key 的流中。如果 key 不存在,作为运行这个命令的副作用,将使用流的条目自动创建 key。
一个条目(消息)是由一组键值对组成的,它基本上是一个小的字典。键值对将会以用户给定的顺序存储,并且读取流的命令(如 XRANGE
或者 XREAD
)可以保证按照通过 XADD
添加的顺序返回。XADD
是 唯一可以向流添加数据的 Redis 命令,但是还有其它命令,例如 XDEL
和 XTRIM
,它们能够从流中删除数据。
命令语法:
|
|
命令参数:
- key:必须参数,流的的名字,如果key对应的Stream不存在,则自动创建。
- NOMKSTREAM:Redis 6.2.0 添加的可选参数,key后面添加 NOMKSTREAM命令 可以禁止 key 不存在是自动创建Stream。
- MAXLEN:可选参数,用于限制 Stream消息 的长度(数量),只要流的长度超过指定的阈值 threshold(其中threshold值是正整数),就会驱逐条目。
- MINID:驱逐ID低于 threshold值 的条目,其中 threshold值 是流ID。
- =|~:表示 MAXLEN/MINID 的值 精确/模糊 等于 threshold
- ID:必须参数,流条目 ID 标识流内的给定条目。如果指定的 ID 参数是字符
*
(星号 ASCII 字符),XADD
命令会自动生成一个唯一的 ID。但是,也可以指定一个良好格式的 ID,以便新的条目以指定的 ID 准确存储,虽然仅在极少数情况下有用。- ID 是由
-
隔开的两个数字组成的:1526919030474-55
,两个部分数字都是 64 位的,当自动生成 ID 时,第一部分是生成 ID 的 Redis 实例的毫秒格式的 Unix 时间。第二部分只是一个序列号,是用来区分同一毫秒内生成的 ID 的。 - ID 保证始终是递增的:如果比较刚插入的条目的 ID,它将大于其它任何过去的 ID,因此条目在流中是完全排序的。为了保证这个特性,如果流中当前最大的 ID 的时间大于实例的当前本地时间,将会使用前者,并将 ID 的序列部分递增。例如,在故障转移之后新主机具有不同的绝对时间,则可能发生这种情况。
- 当用户为 XADD 命令指定显式 ID 时,最小有效的 ID 是 0-1,并且用户必须指定一个比当前流中的任何 ID 都要大的 ID,否则命令将失败。通常使用特定 ID 仅在您有另一个系统生成唯一 ID(例如 SQL 表),并且您确实希望 Redis 流 ID 与该另一个系统的 ID 匹配时才有用。
- ID 是由
命令返回值:
- 当 key 存在且是流类型时,返回 流条目ID;
- 当 key 不存在时,创建 流 key, 并添加流条目,返回 流条目ID;
- 当 key 不是 Stream 时,返回类型不匹配的错误信息;
示例:
|
|
2.2 XRANGE 和 XREVRANGE 查看Stream中的消息
1、XRANGE 命令 XRANGE 命令获取消息列表,ID 从小到大,会自动过滤已经删除的消息。 命令语法:
|
|
命令参数:
- start/end:获取消息的范围,默认是闭区间;
-
表示最小值;+
表示自大值;(
表示开区间;start=timestampInMillis
表示获取大于等于 timestampInMillis 这个时间的 数据;start=end
表示获取一条数据
- COUNT 指定获取多少条数据;
命令返回值:
- 当 key 存在且是流类型时,返回有指定范围内的消息列表,ID 从小到大;
- 当 key 不存在时,返回 (empty array);
- 当 key 不是流类型时,返回类型不匹配的错误信息;
2、XREVRANGE 命令 XREVRANGE 命令使用方式和XRANGE类似,获取反向消息列表,ID 从大到小,会自动过滤已经删除的消息。 命令语法:
|
|
2.3 XDEL 删除 Stream中的消息
XDEL 删除 Stream中的 指定ID 的消息,可以一次指定多个消息ID 删除;
从Stream中删除一个消息,这个消息并不是被真正的删除了,而是被标记为删除状态,这个时候这个消息还是占据着内容空间的。如果所有Stream中所有的消息都被标记删除,这个时候才会回收内存空间。但是这个Stream并不会被删除。
命令语法:
|
|
命令返回值:
- 当 key 存在且是流类型时,返回删除的消息数;
- 当 key 不存在时,返回 0;
- 当 key 不是流类型时,返回类型不匹配的错误信息;
2.4 XLEN 查看Stream中元素的长度
命令语法:
|
|
命令返回值:
- 当 key 存在且是流类型时,返回流中消息的数量;
- 当 key 不存在时,返回 0;
- 当 key 不是ZSet集合时,返回类型不匹配的错误信息;
2.5 XTRIM 对Stream中的元素进行修剪
命令语法:
|
|
命令参数:
- MAXLEN 指定 threshold 为 保留的最大消息数(最新的), 超过该阈值最旧的消息将被删除;
- MINID 指定 threshold 为保留的最小消息ID,比之小的消息将被删除
- =|~
=
表示精确指定 MAXLEN|MINID 的值为threshold
(默认情况),~
表示模糊匹配(近似相等) MAXLEN|MINID 的值为threshold
;
命令返回值:
- 当 key 存在且是流类型时,返回流中剩余消息的数量;
- 当 key 不存在时,返回 0;
- 当 key 不是流类型时,返回类型不匹配的错误信息;
2.6 XREAD 独立消费Stream中的消息
XREAD 以阻塞或非阻塞方式读取消息,读取完之后并不会删除消息。使用XREAD读取消息,是完全独立与消费者组的,多个客户端可以同时读取消息。一次可以读取多个 流key。
XREAD 进行顺序消费 当使用 XREAD 进行顺序消息时,需要记住返回的消息id,同时下次调用xread时,需要将上次返回的消息id传递进去;
XREAD 读取消息,完全无视消费组,此时Stream就可以理解为一个普通的list。
命令语法:
|
|
命令参数:
- COUNT count 指定 获取的消息数量;
- BLOCK milliseconds 指定 如果 Stream 中没有消息了,则阻塞 milliseconds 毫秒,milliseconds=0 表示永久阻塞,直到有消息可以获取
- =|~
=
表示精确指定 MAXLEN|MINID 的值为threshold
(默认情况),~
表示模糊匹配(近似相等) MAXLEN|MINID 的值为threshold
;
|
|
$
表示读取队列最新进来的一个消息,不是 Stream的最后一个消息。是xread block执行后,再次使用xadd添加消息后,xread block才会返回。所以 读取队列尾的下一个消息,在非阻塞模式下始终是 nil
|
|
命令返回值:
- 当 key 存在且是流类型时,返回流中剩余消息的数量;
- 当 key 不存在时,返回 0;
- 当 key 不是流类型时,返回类型不匹配的错误信息;
2.7 XGROUP 创建消费者组
命令语法:
|
|
创建流 stream-key
|
|
1、创建一个从头开始消费的消费者组
|
|
2、创建一个从Stream最新的一个消息消费的消费者组
|
|
$
表示从最后一个元素消费,不包括Stream中的最后一个元素,即消费最新的消息。
3、创建一个从某个消息之后消费的消费者组
|
|
1636362619125-0某个消息的具体的ID,这个g3消费者组中的消息都是大于>这个id的消息。
2.8 XREADGROUP 从消费者中读取消息
命令语法:
|
|
示例:
|
|
2.9 XINFO 命令
命令语法:
|
|
命令参数:
- CONSUMERS key groupname 返回 存储在流 key 中属于 groupname 消费者组的消费者列表;
- GROUPS key 返回 存储在 流key 中的的所有消费者组的列表。
- STREAM key 返回 存储在 流key 中的所有的 流信息;
2.10 XPENDING 转移消费者的消息
XPENTING 命令通过消费者组从流中获取数据,而不是确认这些数据,具有创建待处理条目的效果。 命令语法:
|
|
|
|
上面 功能 也可以通过 xautoclaim
来实现。
2.11 XACK命令
XACK命令用于从流的消费者组的待处理条目列表(简称PEL)中删除一条或多条消息。 当一条消息交付到某个消费者时,它将被存储在PEL中等待处理, 这通常出现在作为调用XREADGROUP命令的副作用,或者一个消费者通过调用XCLAIM命令接管消息的时候。 待处理消息被交付到某些消费者,但是服务器尚不确定它是否至少被处理了一次。 因此对新调用XREADGROUP来获取消费者的消息历史记录(比如用0作为ID)将返回此类消息。 类似地,待处理的消息将由检查PEL的XPENDING命令列出。
一旦消费者成功地处理完一条消息,它应该调用XACK,这样这个消息就不会被再次处理, 且作为一个副作用,关于此消息的PEL条目也会被清除,从Redis服务器释放内存。
命令语法:
|
|
命令返回值:
- 当 key 存在且是流类型时,返回流中返回成功确认的消息数;
Tips: 某些消息ID可能不再是PEL的一部分(例如因为它们已经被确认), 而且XACK不会把它们算到成功确认的数量中。
- 当 key 不存在时,返回 0;
- 当 key 不是流类型时,返回类型不匹配的错误信息;
该命令返回成功确认的消息数。 某些消息ID可能不再是PEL的一部分(例如因为它们已经被确认), 而且XACK不会把它们算到成功确认的数量中。
2.12 一些监控命令
|
|