Redis 15_基于Redis实现消息队列详解

一、消息队列简介

1.1 消息队列的核心能力

谈到消息队列(MQ,message queue),最重要的两项能力就是 业务解耦流量削峰消息队列

MQ 的解耦能力:

  • producer 不需要过分关心 consumer 的身份信息,只需要把消息按照指定的协议投递到对应的 topic 即可。
  • producer 在处理请求时,只需要把消息投递到 mq 即可认为流程处理结束,相比于同步请求下游,整个流程会更加轻便灵活,拥有更高的吞吐量。
  • 因为有 mq 作为缓冲层,下游 consumer 可以设定好合适的消费限流参数,按照指定的速率进行消费,能够在很大程度上对 consumer 起到保护作用。

MQ 的削峰能力: 在实际的生产环境中,倘若上游请求量很大,而下游都需要第一时间进行同步响应的话,这对于下游系统可能产生很大的负荷. 此时如果能把同步流程转为异步,把消息放到 mq 组件中进行一轮缓冲,让下游可以根据自身的处理能力,按照自己的节奏消化这部分积攒的流量,这对于下游系统来说能起到很好的保护作用.

除此之外,作为消息队列,还需具备以下基础能力:

1、消息不丢失

MQ 最基本的一项能力是,是要确保整个交互流程不出现消息丢失的问题. 这里从三个环节去看待这个问题:

  • producer 将 msg 投递到 MQ 时不出现丢失;
  • msg 存放在 MQ 时不出现丢失;
  • consumer 从 MQ 消费 msg 时不出现丢失;

针对于上述第二点,各 MQ 组件在实现上大抵上是基于数据落盘+数据备份的方式保证的。

而针对于上述的一、三点,则是通过两个交互环节中的 ack 机制保证的。以 producer 投递 msg 到 MQ 的环节为例,只要 MQ 没有给到投递成功的 ack 反馈,那么 producer 就应该把本次投递流程视为失败,执行重新投递的操作。consumer 的消费流程同样如此。

因此,MQ 交互流程主要通过 ack 机制保证消息投递以及消费环节做到 at least once(至少一次)的语义,然而无法保证消息不重复的问题。因此,处于最下游的消费者 consumer 需要能够具备消息幂等去重的能力,避免流程被重复处理。

2、支持消息存储 MQ 另一项基础能力是支持消息的存储,这样当下游 consumer 没来得及第一时间消费消息时,消息能缓存在 MQ 组件中一段时间,让消费方自由选择合适的时间过来进行消费。

1.2 消息队列的消费类型

在消息队列的架构中,根据 consumer 消费消息的流程,可将 MQ 分为 push 型 和 ** pull 型**。

1、 push 型

push 型指的是当 producer 将消息投递到 MQ 后,由 MQ 负责将消息以推送(push)的形式主动发送给各个建立了订阅关系的消费方(consumer)。

对于 push 型,存在的优势是:

  • 流程实时性比较强,消息来了就执行推送;
  • 比较契合发布/订阅的模型;

劣势:

  • 对下游 consumer 的保护力度不够。MQ 的核心功能是解耦、削峰,本质上是提供了一个缓冲的空间,让 consumer 能根据自己的消费能力在合适的时机进行消息处理。所以 push 型在这方面体现的优势不够明显,消息到达后就需要向各个 consumer 发起推送。不过这个问题可以在一定程度上通过消费限流的方式加以弥补。

2、pull 型

pull 型指的是当 MQ 中存在消息时,由 consumer 主动执行拉取消息的操作。

对于 pull 型则与 push 型刚好相反:

  • 优势是:下游握有消费操作的主动权,能选择在合适的时机执行消费操作 & 劣势是:实时性会弱一些,和主动 pull 的轮询机制有关

1.3 基于Redis实现消息队列的一类通用问题

1、存储昂贵

redis 本身是基于内存实现的缓存组件,因此在存储消息时总容量相对有限。

2、数据丢失

redis 存储消息时会不可避免地存在数据丢失的风险,可以从两个方面出发考虑:

  • 内存是易失性存储,即便 redis 中有 rdb/aof 之类的持久化机制加以弥补,但这个持久化流程是异步执行的,无法提供百分百的保证力度;
  • redis 走的是 ap 高可用流派,数据的主从复制流程是异步执行的,主从切换时数据存在弱一致问题;

这些问题,不论是在 redis 缓存数据 还是实现 MQ 的流程中都是存在的,在选型使用 redis 时需一定要对这些存在的问题做到了然于心。

二、基于 Redis list 实现的消息队列

2.1 Redis list 实现的消息队列简介

基于 redis 实现 MQ 的方式之一是使用 redis 中的 list 结构。

redis 中的 list 是一个双向链表,天然契合 MQ 中的 queue 队列模型。在使用 redis list 实现 MQ 时,可以将生产消息的流程当做一次将数据追加到 list 尾部的操作;同时,可以把消费消息的流程当做一次从 list 头部摘取数据的操作,这样,一个简易版的消息队列就轻松实现了。

2.2 Redis list 实现消息队列的操作指令

在使用 list 充当消息队列时,list 对应的 key 则对应为消息的 topic 名称。

producer 在投递消息时,可以使用 lpush 指令,对应的时间复杂度为 O(1)。

1
2
127.0.0.1:6379> lpush msg_topic msg
(integer) 1

consumer 消费消息时,使用 rpop 指令,对应的时间复杂度 O(1).

1
2
127.0.0.1:6379> rpop msg_topic
"msg"

2.3 Redis list 实现消息队列的消费流程分析

在上述流程中,存在的第一个问题是 consumer 的轮询消费流程应该如何组织?

这种基于 list 实现的 MQ 是属于 pull 类型。消费方自行组织流程,并在合适的时机通过 rpop 进行消息的主动拉取(pull)。

首先,consumer 在消费时,一定是一个类似于 loop thread 的自旋模型,每一轮循环中,通过 rpop 指令尝试从 list 中读取消息,如果成功读取到了消息,则进行相应的逻辑处理.

然而在此处,需要注意的是,redis 的 rpop 指令是非阻塞型的,即在 list 没有数据时,也会即时返回一个结果为 nil 的响应,这样在组织这段自旋程序的时候就显得有些尴尬:

  • 倘若在 rpop 捕捉到 nil 时,立即开启下一轮循环,则这个轮询行为可能是没有意义的,因为 list 中可能仍然不存在数据。这样的高频率自旋,对于 cpu 资源是一种无谓的损耗。
  • 倘若选择让 consumer 休眠一段时间进行循环,这个休眠的时长又具有一定的人为误判性。倘若把时长设得太短,仍然会存在 cpu 浪费的问题;倘若设得太长,则可能会导致消息处理不及时的问题。

在这个过程中,最理想的实现方案是,在 list 中有数据到达时,consumer 可以即时获取到对应的结果;倘若 list 数据为空,则令 consumer 陷入阻塞等待的状态,直到有数据抵达时程序才被唤醒。

在 Redis 中,可以使用 redis 中的 brpop 指令替代 rpop 指令来做到在有数据时才返回响应,否则当前程序陷入阻塞。

1
2
3
127.0.0.1:6379> brpop msg_topic 0
1) "msg_topic"
2) "msg"
  • msg_topic: topic 名称
  • 0:阻塞等待的超时时长,达到此阈值仍未获取数据时会返回 nil,如果设置为 0,则代表没有这个超时限制(将会一直阻塞,直到有消息可以消费)

2.4 Redis list 实现消息队列的局限性分析

虽然采用 brpop 解决了 consumer 合理阻塞消费数据的问题,但是基于 redis list 实现的 MQ 仍然不能称为一个成熟的实现方案,其中主要存在着以下几项缺陷:

1、无法支持发布/订阅模式

list 中的数据是独一份的,被 pop 出去后就不复存在了,因此 redis 中的 list 是无法支持 mq 中的发布/订阅模式的,即下游倘若存在多个独立的消费者组 consumer group,各自都需要独立获取一份完整的数据,那么此时通过 redis list 是无法满足这个诉求的。

2、无法支持消费端 ack 机制

consumer 通过 brpop 获取到数据后,若发生宕机或者其它意外错误,这时没有一种有效的手段能给予 MQ 一个消息处理失败的反馈。这条消息一旦从 list 中被取走,就不再有机会被重新获取了,因此在这个场景下,消息就真的丢失了。

三、基于 Redis pub/sub 实现的消息队列

3.1 Redis pub/sub 实现的消息队列简介

为解决 redis list 存在的无法支持 发布 / 订阅 模式的问题,redis 提供了 pub/sub 机制,能够有效地弥补这方面的缺陷。

pub/sub 全称为 publish/subscribe,指的正是消息队列中的发布/订阅模式。

Tips: 为了贴合 pub/sub 的语义,在本章节中,统一把生产者 producer 称为 publisher,消费者 consumer 称为 subscriber

在实现上,pub/sub 会在 publisher 和 subscriber 之间建立一个用于实时通讯的信道——channel,在传递消息时,会根据 channel 查找到所有建立过订阅关系的 subscriber 一一 将消息送达到它们手中。由此空间 基于 Redis 的 pub/sub 实现的 MQ 是属于 push 类型。

3.2 Redis pub/sub 实现消息队列的操作指令

使用 pub/sub 实现 MQ 流程:

1、消费方 subscriber 通过 subscribe 指令建立对某个 channel 的订阅关系。

1
2
3
4
5
127.0.0.1:6379> subscribe msg_channel_topic
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "msg_channel_topic"
3) (integer) 1
  • msg_channel_topic: 发布/订阅的频道(channel),即 MQ 的 topic 名称。

每个通过 subscribe 指令建立 channel 订阅关系的使用方都会被视为一个独立的 subscriber,后续 channel 中有消息到达时,会被复制成多份,一一推送到各个 subscriber 手中。

2、生产方 publisher 通过 publish 指令往对应的 channel 中执行消息投递操作

1
2
127.0.0.1:6379> publish msg_channel_topic msg
(integer) 1

此时,之前对这个 channel 执行过 subscribe 操作的 subscriber 都会接收到这则消息:

1
2
3
1) "message"
2) "msg_channel_topic"
3) "msg"

消费者通过 subscribe 指令会对 channel 采用阻塞模式进行监听,只有在有消息到来时,才会从阻塞状态中被唤醒。

3.3 Redis pub/sub 消息队列实现原理

首先,消费方 subscriber 通过 subscribe 指令建立和指定 channel 之间的订阅关系。这时在 redis 中会维护好 channel 和对应 subscriber 列表的映射关系,并在内存中为每个在线活跃的 subscriber 分配好一个缓冲区 buffer,用以承载后续到来的消息数据。

接下来随着 publisher 执行 publish 指令,往对应 channel 中投递消息后,此时 redis 会实时查看 channel 对应 subscriber 名单,往每个 subscriber 的缓冲区 buffer 中推送这条数据

每个执行了 subscribe 指令的 subscriber 会处于阻塞监听缓冲区 buffer 的状态,随着新数据到达,subscriber 会获取到这条数据(消息)。

基于这个流程,可以看出来,pub/sub 对于 channel 以及 subscribers 之间的实时映射关系存在强依赖。因此在操作的执行顺序上,需要保证先执行 subscribe 指令,再执行 publish 指令,否则执行 subscribe 前已经 publish 投递的消息就会因为没有 subscriber 而被直接丢弃。

3.4 Redis pub/sub 实现消息队列的优缺点分析

pub/sub 模式最大的优势就是能够支持发布/订阅能力,同一份消息会被推送给所有通过 subscribe 操作订阅了该 channel 的 subscriber。

然而,pub/sub 存在的问题是很显著的,就是丢消息问题。这个问题可以从多个维度展开:

1、缺乏 ack 机制

这个问题和 redis list 相同, subscriber 在获取到消息后,没有对应的 ack 机制,因此倘若处理失败,想要执行消息的重放操作是无法做到的。

2、缺乏消息存储能力

redis pub/sub 机制就有点类似于 golang 中的无缓冲型 channel,它相当于只维护了channel 和 subscribers 的映射关系,但是每条被投递的消息都是即来即走,并不会停留在 channel 中,于是在以下几个场景中,都会发生消息丢失问题:

  • subscriber 宕机:倘若某个 subscriber 中途宕机,则会被踢出名单,在恢复前的这段时间内,到达的消息都不会push到这个 subscriber。
  • redis 宕机:每条 publish 的消息都会第一时间分发到 subscriber 对应的内存缓冲区中,而这个缓冲区是完全基于内存实现的易失性存储。一旦 redis 服务端宕机,缓冲区中的数据就完全丢失且不可恢复了。此外,pub/sub 模式下的消息数据不属于 redis 中的基本数据类型,因此 redis 中的持久化机制 rdb 和 aof 对于 pub/sub 中的数据是完全不生效的,数据丢失的可能性大幅度提高。
  • subscriber 消息积压:由于消息数据会被放在 redis 侧各 subscriber 的缓冲区 buffer 中,这部分空间是相对有限的,一旦某个 subscriber 因为消费能力弱,导致 buffer 中的的数据发生积压,此时 redis 很可能会自动把 subscriber 踢除下线,于是这部分数据也丢失了。subscriber 对应的缓冲区容量阈值可以在 redis.conf 文件中进行配置,其默认值为:client-output-buffer-limit pubsub 32mb 8mb 60, 对应的含义是,倘若某个 subscriber 的缓冲区 buffer 大小达到 32MB,则 subscriber 会被踢下线;倘若缓冲区内数据量在连续 60s 内达到 8MB 大小,subscriber 也会踢下线。

四、基于 Redis streams 实现的消息队列

4.1 Redis streams 实现的消息队列简介

由于 redis 中的 list 和 pub/sub 实现的 MQ 功能,各自都存在着比较明显的功能缺陷,都是无法被当作一个成熟的 MQ 组件来使用的。因此 Redis 提供了 streams,基于 streams 可以实现趋近于成熟的 MQ 方案。

从 redis 5.0 中,推出了一个新的数据类型——streams。这一数据类型的目标正是为实现 MQ 组件的功能而生。

4.2 Redis streams 实现消息的操作指令

使用 redis streams 时涉及到的几个核心操作指令:

1、生产消息

用于生产消息的指令 XADD 指令往 topic 中投入一组 kv 对消息:

1
2
3
4
127.0.0.1:6379> XADD msg_streams_topic * key1 val1
"1638515664470-0"
127.0.0.1:6379> XADD msg_streams_topic * key2 val2
"1638515672769-0"
  • msg_streams_topic:topic 名称
  • *:消息自动生成唯一标识 id,基于时间戳+自增序号生成
  • key1/val1、key2/val2:消息数据 kv 对

2、消费消息

用于消费消息的指令 XREAD 指令从对应 topic 中获取(pull)消息:

1
2
3
4
5
6
7
8
127.0.0.1:6379> XREAD STREAMS msg_streams_topic 0-0
1) 1) "msg_streams_topic"
   2) 1) 1) "1638515664470-0"
         2) 1) "key1"
            2) "val1"
      2) 1) "1638515672769-0"
         2) 1) "key2"
            2) "val2"
  • msg_streams_topic: topic 名称
  • 0-0:从头开始消费,倘若这里填为某条消息 id,则代表从这条消息之后(不包含这条消息)开始消费

3、阻塞模式消费消息

streams 支持在消费时,采用阻塞模式进行消费,若存在数据则即时返回处理,否则会阻塞消费流程。

1
2
3
# BLOCK 0 表示阻塞等待时没有超时时间上限
127.0.0.1:6379> XREAD BLOCK 0 STREAMS msg_streams_topic 1638515672769-0
(nil)
  • BLOCK:阻塞消费模式
  • 0:阻塞等待超时时间,超过这个时长会返回 nil. 设置为 0 则表示不设置超时阈值

4、创建消费者组

streams 也支持发布订阅模式,能保证消息被多个消费者组 consumer group 同时消费到。

需要先进行消费者组的创建:

1
2
127.0.0.1:6379> XGROUP CREATE msg_streams_topic consumer_group 0-0
OK
  • msg_streams_topic:topic 名称
  • consumer_group:消费者组名称
  • 0-0:从头开始消费

5、基于消费者组消费消息

同一份数据在同一个消费者组下只会被消费到一次;不同消费者组各自能获取到独立完整的消息数据。

通过 XReadGroup 指令,以消费者组的身份进行消费:

1
2
3
4
5
6
7
8
127.0.0.1:6379> XREADGROUP GROUP consumer_group consumer BLOCK 0 STREAMS msg_streams_topic >
1) 1) "msg_streams_topic"
   2) 1) 1) "1638515664470-0"
         2) 1) "key1"
            2) "val1"
      2) 1) "1638515672769-0"
         2) 1) "key2"
            2) "val2"
  • consumer_group: 消费者组名称
  • consumer:消费者名称
  • msg_streams_topic:topic 名称
  • block 0: 采用阻塞等待的模式,0 代表没有超时上限
  • >: 读最新的消息 (尚未分配给某个 consumer 的消息)

还有另一种消费模式,读取的是已分配给当前消费者,但是还未经确认的老消息:

1
2
3
4
5
6
7
8
XREADGROUP GROUP consumer_group consumer STREAMS msg_streams_topic 0-0
1) 1) "msg_streams_topic"
   2) 1) 1) "1638515664470-0"
         2) 1) "key1"
            2) "val1"
      2) 1) "1638515672769-0"
         2) 1) "key2"
            2) "val2"
  • 0-0:标识读取已分配给当前 consumer ,但是还没经过 xack 指令确认的消息

6、确认消息

通过 xack 指令,携带上消费者组、topic 名称以及消息 id,能够完成对某条消息的确认操作:

1
2
127.0.0.1:6379> XACK msg_streams_topic consumer_group 1638515664470-0
(integer) 1
  • msg_streams_topic:topic 名称
  • consumer_group:消费者组名称
  • 1638515664470-0:消息 id

4.3 Redis streams 实现消息的优缺点分析

1、支持发布/订阅模式

redis streams 引入了消费者组 group 的概念,因此是能够保证各个消费者组 consumer group 均能够获取到一份独立而完整的消息数据。

2、数据可持久化

redis 中的 streams 和 string、list 等数据类型一样,都能够通过 rdb( redis database)、aof( append only file) 的持久化机制进行落盘存储,能够在很大程度上降低数据丢失的概率。

3、支持消费端 ack 机制

redis streams 中另一项非常重要的改进,是支持 consumer 的 ack 能力。consumer 在处理好某条消息后,能通过 xack 指令对该消息进行确认,这样对于没经过 ack 确认的消息,redis streams 还是为 consumer 保留了重新消费的能力。

4、支持消息缓存

和 pub/sub 模式不同的是,redis streams 中会实际开辟内存空间用于存储 streams 中的数据,因此哪怕某个 consumer group 是在消息生产之后才完成注册操作,也能够进行消息溯源,从 topic 起点开始执行消息的消费操作。

不过这里需要考虑的问题是,redis 基于内存实现消息数据的存储,倘若大量的消息数据被堆积在内存中,在资源使用上会存在很大的压力和很高的成本,严重时甚至可能发生 OOM 问题。

基于此,redis streams 支持在每次投递消息时,显式设定一个 topic 中能缓存的数据长度,来人为限制这个缓存空间的容量。可以通过在 XADD 指令中加上 maxlen 参数,用于指定 topic 中能缓存的数据长度:

1
127.0.0.1:6379> XADD topic1 MAXLEN 10000 * key1 val1
  • MAXLEN 10000:最多缓存 10000 条数据

这样若 topic 数据容量超限,则新消息的到达会把最老的消息挤出队列,意味着也可能存在数据丢失的风险,因此大家在使用时需要合理设置 maxlen 参数。

五、Redis 实现消息队列的整体对比分析

5.1 基于redis 实现的不同消息队列的对比分析

MQ实现方案 发布/订阅能力 消费端ACK机制 消息缓存能力 数据丢失风险
list 不支持 不支持 支持
pub/sub 支持 不支持 不支持
streams 支持 支持 支持

可以看到,在各项能力上 list 和 pub/sub 互有优缺点,而 streams 可以说是兼具了各方面的优势,称得上是已经趋近于成熟的 MQ 实现方案。

然而,应该能注意到,此处评价 streams 方案的数据丢失风险时,仅仅是评价为“低”,而不是“无”,这说明 Redis 实现的消息队列始终是存在消息丢失的问题。

5.2 redis stream 消息队列与 kafka 对比分析

mq 组件 消息存储介质 消息分区/并发能力 数据丢失风险 运维成本
redis streams 内存 不支持
kafka 磁盘 支持 理论上不存在 偏高

可以看到,redis streams 在存储介质上需要使用内存,因此消息存储容量相对有限;且同一个 topic 的数据由于对应为同一个 key,因此会被分发到相同节点,无法实现数据的纵向分治,因此不具备类似于 kafka 纵向分区以提高并发度的能力。

此外,很重要的一个点是,基于 redis 实现的 MQ 一定是存在消息丢失的风险的。尽管在生产端和消费端,producer/consumer 在和 MQ 交互时可以通过 ack 机制保证在交互环节不出现消息的丢失,然而在 redis 本身存储消息数据的环节就可能存在数据丢失问题,原因在于:

  • redis 数据基于内存存储:哪怕通过最严格 aof 等级设置,由于持久化流程本身是异步执行的,也无法保证数据绝对不丢失;
  • redis 走的是 ap 高可用流派:为保证可用性,redis 会在一定程度上牺牲数据一致性。在主从复制时,采用的是异步流程,倘若主节点宕机,从节点的数据可能存在滞后,这样在主从切换时消息就可能丢失;

与之相对的,kafka 只要合理设置好 ISR(In Sync Replica) 有关参数,理论上在集群存在多数节点仍能正常运作的情况下,对应的消息数据是不会出现丢失的。

Redis实现消息队列的优势在于 其相对轻量化,相比于传统 MQ 组件有着更低的使用和运维成本。

因此,在实际的选型过程中,可以根据业务诉求进行选型,倘若业务流程对于数据的精度没有特别严格的要求,那此时使用 redis streams 这样一种轻量化的 MQ 实现方案未尝不是一种好的选择和尝试。

六、基于 redis streams 实现消息队列(go语言)

实现可参考:https://github.com/xiaoxuxiansheng/redmq/tree/main