38 views

Redis发布/订阅机制

By | 2019年2月14日

相关命令:

PUBLISH 发布
SUBSCRIBE 订阅
PSUBSCRIBE 一种订阅符合给定模式的所有频道的方法
UNSUBSCRIBE 退订
PUNSUBSCRIBE 退订一个订阅的模式
这些命令被广泛用于构建即时通信应用,比如网络聊天室(chatroom)和实时广播、实时提醒等。

Redis相关源码文件:pubsub.c

使用
PUBLISH 命令用于向给定的频道发送信息,返回值为接收到信息的订阅者数量
redis> PUBLISH treehole “top secret here …”
(integer) 0
redis> PUBLISH chatroom “hi?”
(integer) 1
SUBSCRIBE 命令订阅给定的一个或多个频道:
redis> SUBSCRIBE chatroom
Reading messages… (press Ctrl-C to quit)
1) “subscribe” # 订阅反馈
2) “chatroom” # 订阅的频道
3) (integer) 1 # 目前客户端已订阅频道/模式的数量
1) “message” # 信息
2) “chatroom” # 发送信息的频道
3) “hi?” # 信息内容
SUBSCRIBE 的返回值当中, 1) “”subscribe””是订阅的反馈信息,1)”message “的则是订阅的频道所发送的信息。

SUBSCRIBE 还可以订阅多个频道,这样一来它接收到的信息就可能来自多个频道:

redis> SUBSCRIBE chatroom talk-to-jack
Reading messages… (press Ctrl-C to quit)
1) “subscribe” # 订阅 chatroom 的反馈
2) “chatroom”
3) (integer) 1
1) “subscribe” # 订阅 talk-to-jack 的反馈
2) “talk-to-jack”
3) (integer) 2
1) “message” # 来自 chatroom 的消息
2) “chatroom”
3) “yahoo”
1) “message” # 来自 talk-to-peter 的消息
2) “talk-to-jack”
3) “Goodmorning, peter.”
PSUBSCRIBE 提供了一种订阅符合给定模式的所有频道的方法,比如说,使用 it.* 为输入,就可以订阅所有以 it. 开头的频道,比如 it.news 、 it.blog 、 it.tweets ,诸如此类:
redis> PSUBSCRIBE it.*
Reading messages… (press Ctrl-C to quit)
1) “psubscribe”
2) “it.*”
3) (integer) 1
1) “pmessage”
2) “it.*” # 匹配的模式
3) “it.news” # 消息的来源频道
4) “Redis 2.6rc5 release” # 消息内容
1) “pmessage”
2) “it.*”
3) “it.blog”
4) “Why NoSQL matters”
1) “pmessage”
2) “it.*”
3) “it.tweet”
4) “@redis: when will the 2.6 stable release?”
当然, PSUBSCRIBE 也可以接受多个参数,从而匹配多种模式。

UNSUBSCRIBE 和 PUNSUBSCRIBE 负责退订给定的频道或模式。
内部实现
流程
当一个客户端通过 PUBLISH 命令向订阅者发送信息的时候,我们称这个客户端为发布者(publisher)。

而当一个客户端使用 SUBSCRIBE 或者 PSUBSCRIBE 命令接收信息的时候,我们称这个客户端为订阅者(subscriber)。

为了解耦发布者(publisher)和订阅者(subscriber)之间的关系,Redis 使用了 channel (频道)作为两者的中介 —— 发布者将信息直接发布给 channel ,而 channel 负责将信息发送给适当的订阅者,发布者和订阅者之间没有相互关系,也不知道对方的存在

具体实现
SUBSCRIBE 命令的实现
Redis 将所有接受和发送信息的任务交给 channel 来进行,而所有 channel 的信息就储存在 redisServer 这个结构中:

struct redisServer {
// ……
dict *pubsub_channels; // Map channels to list of subscribed clients
// ……
};
pubsub_channels 是一个字典,字典的键就是一个个 channel ,而字典的值则是一个链表,链表中保存了所有订阅这个 channel 的客户端。(haspmap之类)

实现 SUBSCRIBE 命令的关键,就是将客户端添加到给定 channel 的订阅链表中。

函数 pubsubSubscribeChannel 是 SUBSCRIBE 命令的底层实现,它完成了将客户端添加到订阅链表中的工作:

// 订阅指定频道
// 订阅成功返回 1 ,如果已经订阅过,返回 0
int pubsubSubscribeChannel(redisClient *c, robj *channel) {
struct dictEntry *de;
list *clients = NULL;
int retval = 0;
/* Add the channel to the client -> channels hash table */
// dictAdd 在添加新元素成功时返回 DICT_OK
// 因此这个判断句表示,如果新订阅 channel 成功,那么 。。。
if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {
retval = 1;
incrRefCount(channel);
/* Add the client to the channel -> list of clients hash table */
// 将 client 添加到订阅给定 channel 的链表中
// 这个链表是一个哈希表的值,哈希表的键是给定 channel
// 这个哈希表保存在 server.pubsub_channels 里
de = dictFind(server.pubsub_channels,channel);
if (de == NULL) {
// 如果 de 等于 NULL
// 表示这个客户端是首个订阅这个 channel 的客户端
// 那么创建一个新的列表, 并将它加入到哈希表中
clients = listCreate();
dictAdd(server.pubsub_channels,channel,clients);
incrRefCount(channel);
} else {
// 如果 de 不为空,就取出这个 clients 链表
clients = dictGetVal(de);
}
// 将客户端加入到链表中
listAddNodeTail(clients,c);
}
/* Notify the client */
addReply(c,shared.mbulkhdr[3]);
addReply(c,shared.subscribebulk);
// 返回订阅的频道
addReplyBulk(c,channel);
// 返回客户端当前已订阅的频道和模式数量的总和
addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns));
return retval;
}
PSUBSCRIBE 命令的实现
和 redisServer.pubsub_channels 属性类似, redisServer.pubsub_patterns 属性用于保存所有被订阅的模式,和 pubsub_channels 不同的是, pubsub_patterns 是一个链表(而不是字典):

struct redisServer {
// ……
list *pubsub_patterns; // A list of pubsub_patterns
// ……
};

发表评论

电子邮件地址不会被公开。 必填项已用*标注