Redis 08_Redis发布订阅

一、Redis发布订阅简介

1.1 发布和订阅是什么

Redis 发布订阅(pub/sub) 是一种消息通信模式,其基本原理是消息的发送者(发布者 pub)不会直接发送消息给特定的接收者(订阅者 sub),而是将消息分成不同的类别(频道 channel),然后将消息发送给订阅了这些类别的所有接收者。发布订阅模式在分布式系统中广泛应用,例如实时消息推送、日志收集等。

Redis 发布/订阅(pub/sub) 由三部分组成:

  • 发布者(publisher) 是指通过 PUBLISH 命令向订阅者发送信息的客户端; 订阅者(subscriber) 是指使用 SUBSCRIBE 或者 PSUBSCRIBE 命令接收信息的客户端; 频道(channel) 是为了解耦发布者(publisher)和订阅者(subscriber)之间的关系,作为两者的中介 —— 发布者将信息直接发布给 channel ,而 channel 负责将信息发送给适当的订阅者,使得发布者和订阅者之间没有相互关系,也不知道对方的存在。

发布者(publisher)订阅者(subscriber) 属于客户端,频道(channel) 是 Redis 服务端,发布者通过 PUBLISH 命令向指定的频道发送消息,而订阅者则通过 SUBSCRIBE/UNSUBSCRIBE 命令订阅/取消订阅指定的频道,并通过监听器(Callback)接收到发布者发送的消息。

Redis 客户端可以订阅任意数量的频道,Redis 在订阅者和发布者之间起到了消息路由的功能:

发布者(publisher) 给一个频道发布消息后,消息就会发送给事先订阅了它的所有客户端:

发布订阅机制与 db 空间无关,比如在 db 10 发布, db 0 的订阅者也会收到消

Redis 发布订阅(pub/sub) 有两种实现模式:

  • 使用频道(Channel)实现的 发布订阅(pub/sub) 模式,该模式有三步:

    • 订阅者订阅频道;
    • 发布者向 频道(channel) 发布消息;
    • 所有订阅 频道(channel) 的订阅者收到消息;
  • 使用模式(Pattern)的发布订阅;

    • 当发送者向频道发布消息时,该消息还会发布到与这个频道匹配的 模式 上,订阅这个 模式 的客户端也会收到消息;

二、通过频道(Channel)实现发布订阅(pub/sub)

Redis 中的 频道(Channel) 相当于消息的分类,一个频道可以有多个订阅者,一个订阅者也可以订阅多个频道。在 Redis 中,通过 PUBLISH 命令向指定的频道发送消息,而通过 SUBSCRIBE/UNSUBSCRIBE 命令来订阅/取消订阅指定的频道,并通过监听器接收到发布者发送的消息。

在 Redis 中,发布/订阅模式的实现基于 Redis 的事件机制,即订阅者通过执行 SUBSCRIBE 命令将自己的监听器添加到 Redis 服务器的事件循环器中,当发布者通过 PUBLISH 命令向指定频道发送消息时,Redis 服务器会将消息发送给监听该频道的所有订阅者。

具体来说,Redis 服务器会维护一个事件循环器,并在其中注册所有客户端的监听器。当客户端通过 SUBSCRIBE 命令订阅某个频道时,Redis 服务器会将该客户端的监听器添加到与该频道相关的事件处理器中,并在事件循环器中注册该事件处理器。当发布者通过 PUBLISH 命令向指定频道发送消息时,Redis 服务器会将消息发送给与该频道相关的事件处理器中的所有监听器,从而实现消息的发布和订阅。

使用频道(Channel)实现的 发布订阅(pub/sub) 模式分为三步:

  • 订阅者订阅频道;
  • 发布者向 频道(channel) 发布消息;
  • 所有订阅 频道(channel) 的订阅者收到消息;

2.1 订阅者订阅频道

订阅者事先使用 SUBSCRIBE channel [channel ...] 订阅一个或者多个频道,O(n) 时间复杂度,n = 订阅的 Channel 数量。

1
2
3
4
5
SUBSCRIBE channel1
Reading messages... (press Ctrl-C to quit)
1) "subscribe"  # 消息类型
2) "channel1"   # 频道
3) (integer) 1  # 当前客户端订阅的频道数量

执行 SUBSCRIBE 令后,客户端进入订阅状态,订阅者只能使用 subscribeunsubscribepsubscribepunsubscribe 这四个属于 “发布/订阅” 的指令。

进入订阅后的客户端可以收到 3 种类型的消息回复:

  • subscribe:订阅成功的反馈消息,第二个值是订阅成功的频道名称,第三个是当前客户端订阅的频道数量。
  • message:客户端接收到消息,第二个值表示产生消息的频道名称,第三个值是消息的内容。
  • unsubscribe:表示成功取消订阅某个频道。第二个值是对应的频道名称,第三个值是当前客户端订阅的频道数量,当此值为 0 时客户端会退出订阅状态,之后就可以执行其它非"发布/订阅"模式的命令了。

2.2 发布者发布消息

发布者使用 `PUBLISH channel message 向指定 channel频道发布消息。

1
2
PUBLISH channel1 'do job'
(integer) 1

发布者发布的消息并不会持久化,消息发布之后有新的订阅的话,新的订阅者只能接收后续发布到该频道的消息。

2.3 订阅者接受消息

订阅了 channel1频道的订阅者 之后将会收到发布者发布到该频道的消息。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
// 订阅 channel1 频道
SUBSCRIBE channel1
Reading messages... (press Ctrl-C to quit)
1) "subscribe"  # 订阅频道成功 (等等接收发往该频道的消息)
2) "channel1"   # 频道
3) (integer) 1  # 当前客户端订阅的频道数量
#  当发布者发布消息,订阅者就能读取到的消息,如下
1) "message"    # 接受到消息
2) "channel1"   # 频道名称
3) "do job"     # 消息内容

2.4 退订频道

退订频道是订阅频道的反向操作,使用 UNSUBSCRIBE 命令取消订阅频道。

三、通过模式(Pattern)实现发布订阅(pub/sub)

Redis 还支持基于 模式(Pattern) 的发布/订阅,模式是一种特殊的频道,它可以匹配一个或多个频道。在 Redis 中,通过 PSUBSCRIBE/PUNSUBSCRIBE 命令订阅/取消订阅匹配指定模式的频道,并通过监听器接收到发布者发送的消息。

基于 模式 的发布/订阅 与 基于 频道 的发布/订阅 实现原理类似,只是在订阅时可以使用 通配符匹配多个频道,从而实现更加灵活的消息过滤和订阅。

具体来说,当客户端通过 PSUBSCRIBE 命令订阅某个模式时,Redis 服务器会将该客户端的监听器添加到所有与该模式匹配的频道相关的事件处理器中,并在事件循环器中注册该事件处理器。当发布者通过 PUBLISH 命令向与匹配该模式的频道发送消息时,Redis 服务器会将消息发送给与该模式相关的事件处理器中的所有监听器,从而实现基于模式的消息发布和订阅。

在基于频道的订阅中,是输入频道的完整名称实现订阅,而在模式的订阅则不需要指定全名,通过模式匹配多个字符串来实现消息的订阅,模式匹配过程中通配符 ? 表示1个占位符,* 表示任意任意个占位符,例如 com.ahead.* 相当于订阅了以 com.ahead. 开头的所有频道。当发送者向频道发布消息时,该消息还会发布到与这个频道匹配的 所有模式 上,订阅这个 模式 的所有客户端也会收到该消息; Redis 客户端 使用模式 com.ahead.* 订阅了所有以该 pattern 开头的的频道,目前与这个模式匹配的两个频道是 com.ahead.juccom.ahead.thread 这两个频道,现在只要有发布者向 com.ahead.juccom.ahead.thread 或者其它以 com.ahead. 开头的频道发布消息,除了订阅了这些频道的粉丝(订阅者)可以收到对应频道的消息以外,发布到以com.ahead. 开头的这类频道的消息同时还会发送一份给订阅 com.ahead.* 模式的订阅者(因为频道与模式匹配)。

Tips: 通配符中?表示1个占位符,*表示任意个占位符(包括0),?*表示1个以上占位符。

3.1 订阅者基于模式的订阅

订阅模式的指令是 PSUBSCRIBE,如下表示 客户端订阅 com.ahead.* 模式:

1
2
3
4
5
SUBSCRIBE com.ahead.*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"     # 消息类型
2) "com.ahead.*"  # 订阅模式
3) (integer) 1      # 订阅数

等等接收 以 com.ahead. 为前缀的频道发布的消息。

2.2 发布者发布消息

发布者使用 `PUBLISH channel message 向指定 channel频道发布消息。

1
2
3
4
5
PUBLISH com.ahead.juc 'juc do job'
(integer) 1

PUBLISH com.ahead.thread 'thread do job'
(integer) 1

发布者发布的消息并不会持久化,消息发布之后有新的订阅的话,新的订阅者只能接收后续发布到该频道的消息。

3.3 模式订阅者接受消息

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
PSUBSCRIBE com.ahead.*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "com.ahead.*"
3) (integer) 1
1) "pmessage"
2) "com.ahead.*"
3) "com.ahead.juc"
4) "juc do job"
1) "pmessage"
2) "com.ahead.*"
3) "com.ahead.thread"
4) "thread do job"

如果一个客户端同时订阅了与模式匹配的模式和频道,那么客户端会收到多次消息

3.4 退订模式

使用 PUNSUBSCRIBE 命令可以退订指定的模式,这个命令执行的是订阅模式的反操作:根据模式从 pubsub_patterns字典中找到客户端链表,遍历链表将当前客户端删除。

四、Redis 发布/订阅的实现原理

Redis 源码注释:https://github.com/huangz1990/redis-3.0-annotated

4.1 订阅频道(channel)的实现原理

Redis 源码使用 redis.h 中的 struct redisServer 结构体来维护每个服务器进程的服务器状态,其中 pubsub_channels 属性字段是一个字典,用于保存订阅频道的信息,所有 channel 的信息就储存在 redisServer 这个结构中:

1
2
3
4
5
6
struct redisServer {
  ...
  /* Pubsub */
   dict *pubsub_channels;
  ...
}

pubsub_channels 字典的键就是一个个的 channel ,而字典的值则是一个链表,链表中保存了所有订阅这个 channel 的客户端。

例如,在一个 redisServer 实例中,有一个叫做 channel_2 的频道,这个频道同时被client 3、client 4 和 client 5 三个客户端订阅,那么这个 redisServer 结构看起来应该是这样子:

可以看出,实现 SUBSCRIBE 订阅命令的关键,就是将客户端添加到给定 channel 的订阅链表中。

函数 pubsubSubscribeChannelSUBSCRIBE 订阅命令的底层实现,它完成了将客户端添加到订阅链表中的工作:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// 订阅指定频道,订阅成功返回 1,如果已经订阅过,返回 0 
int pubsubSubscribeChannel(redisClient *c, robj *channel) { 
    struct dictEntry *de; 
    list *clients = NULL; 
    int retval = 0; 
    /* Add the channel to the client -> channels hash table */ 
    // dictAdd 在添加新元素成功时返回 DICT_OK 
    // 因此这个判断句表示,如果新订阅 channel 成功,那么 。。。 
    if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) { 
        retval = 1; 
        incrRefCount(channel); 
        /* Add the client to the channel -> list of clients hash table */ 
        // 将 client 添加到订阅给定 channel 的链表中 
        // 这个链表是一个哈希表的值,哈希表的键是给定 channel 
        // 这个哈希表保存在 server.pubsub_channels 里 
        de = dictFind(server.pubsub_channels,channel); 
        if (de == NULL) { 
        // 如果 de 等于 NULL 
        // 表示这个客户端是首个订阅这个 channel 的客户端 
        // 那么创建一个新的列表, 并将它加入到哈希表中 
        clients = listCreate(); 
        dictAdd(server.pubsub_channels,channel,clients); 
        incrRefCount(channel); 
        } else { 
            // 如果 de 不为空,就取出这个 clients 链表 
            clients = dictGetVal(de); 
        } 
        // 将客户端加入到链表中 
        listAddNodeTail(clients,c); 
    } 
    /* Notify the client */ 
    addReply(c,shared.mbulkhdr[3]); 
    addReply(c,shared.subscribebulk); 
    // 返回订阅的频道 
    addReplyBulk(c,channel); 
    // 返回客户端当前已订阅的频道和模式数量的总和 
    addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns)); 
    return retval; 
}

4.2 订阅模式(pattern)的实现原理

除了直接订阅给定 channel 外,还可以使用 PSUBSCRIBE 订阅一个模式(pattern),订阅一个模式等同于订阅所有匹配这个模式的 channel 。

redisServer.pubsub_channels 属性类似,redisServer.pubsub_patterns 属性用于保存所有被订阅的模式,和 pubsub_channels 不同的是,pubsub_patterns 是一个链表(而不是字典):

1
2
3
4
5
struct redisServer { 
    // 省略 ... 
    list *pubsub_patterns; // A list of pubsub_patterns 
    // 省略 ... 
    };

pubsub_patterns 的每一个节点都是一个 pubsubPattern 结构的实例,它保存了被订阅的模式,以及订阅这个模式的客户客户端:

1
2
3
4
typedef struct pubsubPattern { 
    redisClient *client; 
    robj *pattern; 
} pubsubPattern;

例如,在一个 redisServer 实例中,有一个叫做 com.* 的模式同时被客户端client 1、client 2 和 client 3 订阅,那么这个 redisServer 结构看起来应该是这样子:

可以看出,实现 PSUBSCRIBE 命令的关键,就是将客户端和订阅的模式添加到 redisServer.pubsub_patterns 当中。

pubsubSubscribePattern 函数 是 PSUBSCRIBE 的底层实现,它将客户端和所订阅的模式添加到 redisServer.pubsub_patterns 当中:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 订阅指定模式,订阅成功返回 1 ,如果已经订阅过,返回 0 
int pubsubSubscribePattern(redisClient *c, robj *pattern) { 
    int retval = 0; 
    // 向 c->pubsub_patterns 中查找指定 pattern 
    // 如果返回值为 NULL ,说明这个 pattern 还没被这个客户端订阅过 
    if (listSearchKey(c->pubsub_patterns,pattern) == NULL) { 
        retval = 1; 
        // 添加 pattern 到客户端 pubsub_patterns 
        listAddNodeTail(c->pubsub_patterns,pattern); 
        incrRefCount(pattern); 
        // 将 pattern 添加到服务器 
        pubsubPattern *pat; 
        pat = zmalloc(sizeof(*pat)); 
        pat->pattern = getDecodedObject(pattern); 
        pat->client = c; 
        listAddNodeTail(server.pubsub_patterns,pat); 
    } 
    /* Notify the client */ 
    addReply(c,shared.mbulkhdr[3]); 
    addReply(c,shared.psubscribebulk); 
    // 返回被订阅的模式 
    addReplyBulk(c,pattern); 
    // 返回客户端当前已订阅的频道和模式数量的总和 
    addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns)); 
    return retval; 
 }

4.3 Redis PUBLISH 发布命令的实现原理

使用 PUBLISH 命令向订阅者发送消息,需要执行以下两个步骤:

  1. 使用给定的频道作为键,在 redisServer.pubsub_channels 字典中查找记录了订阅这个频道的所有客户端的链表,遍历这个链表,将消息发布给所有订阅者;
  2. 遍历 redisServer.pubsub_patterns 链表,将链表中的模式和给定的频道进行匹配,如果匹配成功,那么将消息发布到相应模式的客户端当中;

例如,有两个客户端分别订阅 it.news 频道和 it.* 模式,当执行消息发布命令 PUBLISH it.news "hello moto" 的时候,it.news 频道的订阅者会在步骤 1 收到信息,而当 PUBLISH 进行到步骤 2 的时候,it.* 模式的订阅者也会收到信息。

PUBLISH 命令的实现由 pubsubPublishMessage 函数完成,它的完整定义如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
// 发送消息 
int pubsubPublishMessage(robj *channel, robj *message) {
    int receivers = 0; 
    struct dictEntry *de; 
    listNode *ln; 
    listIter li; 
    /* Send to clients listening for that channel */ 
    // 向所有频道的订阅者发送消息 
    de = dictFind(server.pubsub_channels,channel); 
    if (de) {
        list *list = dictGetVal(de); // 取出所有订阅者 
        listNode *ln; 
        listIter li; 
        // 遍历所有订阅者, 向它们发送消息 
        listRewind(list,&li); 
        while ((ln = listNext(&li)) != NULL) { 
            redisClient *c = ln->value; 
            addReply(c,shared.mbulkhdr[3]); 
            addReply(c,shared.messagebulk); 
            addReplyBulk(c,channel); // 打印频道名 
            addReplyBulk(c,message); // 打印消息 
            receivers++; // 更新接收者数量 
        } 
    } 
    /* Send to clients listening to matching channels */ 
    // 向所有被匹配模式的订阅者发送消息 
    if (listLength(server.pubsub_patterns)) { 
        listRewind(server.pubsub_patterns,&li); // 取出所有模式 
        channel = getDecodedObject(channel); 
        while ((ln = listNext(&li)) != NULL) { 
            pubsubPattern *pat = ln->value; // 取出模式 
            // 如果模式和 channel 匹配的话 
            // 向这个模式的订阅者发送消息 
            if (stringmatchlen((char*)pat->pattern->ptr, 
            sdslen(pat->pattern->ptr), 
            (char*)channel->ptr, 
            sdslen(channel->ptr),0)) { 
                addReply(pat->client,shared.mbulkhdr[4]); 
                addReply(pat->client,shared.pmessagebulk); 
                addReplyBulk(pat->client,pat->pattern); // 打印被匹配的模式 
                addReplyBulk(pat->client,channel); // 打印频道名 
                addReplyBulk(pat->client,message); // 打印消息 
                receivers++; // 更新接收者数量 
            } 
        } 
        decrRefCount(channel); // 释放用过的 channel 
    } 
    return receivers; // 返回接收者数量 
}

4.4 UNSUBSCRIBE 和 PUNSUBSCRIBE 的实现

UNSUBSCRIBEPUNSUBSCRIBE 分别是 SUBSCRIBEPSUBSCRIBE 的反操作,如果明白了 SUBSCRIBEPSUBSCRIBE 的工作机制的话,应该不难理解这两个反操作的原理,所以这里就省略详细的分析了,有兴趣的可以直接看代码。

五、Redis发布订阅的应用场景

5.1 哨兵间通信

redis 哨兵集群中,每个哨兵节点利用 Pub/Sub 发布订阅实现哨兵之间的相互发现彼此和找到 Slave。

哨兵与 Master 建立通信后,利用 master 提供发布/订阅机制在__sentinel__:hello发布自己的信息,比如 IP、端口……,同时订阅这个频道来获取其它哨兵的信息,就这样实现哨兵间通信。

5.2 消息队列

利用 Redis 发布订阅实现轻量级简单的 MQ 功能,实现上下游解耦,需要注意点是 Redis 发布订阅的消息不会被持久化,所以新订阅的客户端将收不到历史消息。

也不支持 ACK 机制,所以当前业务不能容忍这些缺点,那就使用专业的消息队列,如果能容忍那就能享受 Redis 快带来的优势。