likes
comments
collection
share

Redis Pub/Sub 快速入门与源码解析

作者站长头像
站长
· 阅读数 6

Redis 发布订阅:PubSub

Redis 实现了发布订阅模型,有三个角色:发布者,订阅者,频道。

频道用以解耦双方通信,订阅者通过订阅频道获取信息,发布者通过频道知道自己需要把信息发到哪。

频道也可以理解为消息类型,唯一确定某一类消息。

pubsub快速入门

1、基本pubsub模型

发布消息:publish

PUBLISH [channel] [message]

订阅消息:subscribe

SUBSCRIBE [channel1] [channel2] ...

取消订阅:unsubscribe

UNSUBSCRIBE [channel1] [channel2] ...
Redis Pub/Sub 快速入门与源码解析 Redis Pub/Sub 快速入门与源码解析

2、模式匹配相关

客户端可订阅 glob 样式的模式,以接收发送到与给定模式匹配的通道名的所有消息。

订阅模式:psubscribe

PSUBSCRIBE news.*

取消订阅模式:punsubscribe

PUNSUBSCRIBE news.*

3、分片相关:Sharded Pub/Sub

分片相关在 Redis 7.0 引入。

全局 pubsub 存在着广播风暴问题,即在集群中的一个节点发布了一条消息,需要广播给集群的所有其它节点。

Sharded Pub/Sub 对 shard channels 做了与 key 一样的分片计算。即在 shard channels 上发布的消息必须发给拥有该 channel 对应的 slot 的节点,而订阅者只需要和该节点及其从节点建立连接,即可收到消息。

Sharded Pub/Sub 不会在集群的节点间进行无效的广播,从而提高性能。

发布:spublish

PUBLISH [channel] [message]

订阅:ssubscribe

SSUBSCRIBE [channel]

取消订阅:sunsubscribe

PUNSUBSCRIBE [channel]

一条消息,多次匹配

一条消息,可能既与满足模式匹配,也满足基本模型的匹配,就会收到两条消息。

举个例子,如果订阅了:

SUBSCRIBE foo
PSUBSCRIBE f*

然后收到一个来自 foo 频道的消息,会同时收到两条消息。

一个 message ,一个 pmessage

消息传递模型:at-most-once

pubsub的消息至多传递一次,容易丢失,不可靠。

pubsub实现原理

维护频道:哈希表

Redis 用dict 来维护所有的频道,key:频道的名称,value:频道的订阅者

    // 三种 dict 分别对应三种 pubsub
    dict *pubsub_channels; 
    dict *pubsub_patterns;  
    dict *pubsubshard_channels;  
    int notify_keyspace_events; 
/* Events to propagate via Pub/Sub. This is an xor of NOTIFY_... flags. */
                                   

Redis启动时,在initServer中,会对它们做初始化,如下:

    server.pubsub_channels = dictCreate(&keylistDictType);
    server.pubsub_patterns = dictCreate(&keylistDictType);
    server.pubsubshard_channels = dictCreate(&keylistDictType);

频道的来源:subscribe

subscribe 对应的实现函数是 subscribeCommand

除了订阅频道的作用,如果要订阅的频道不存在,会创建频道(publish 不会创建频道)

int pubsubSubscribeChannel(client *c, robj *channel, pubsubtype type) {
    dictEntry *de;
    list *clients = NULL;
    int retval = 0;
    // 客户端维护订阅的频道
    if (dictAdd(type.clientPubSubChannels(c),channel,NULL) == DICT_OK) {
        retval = 1;
        incrRefCount(channel);
        de = dictFind(*type.serverPubSubChannels, channel);
        // Redis Server 端不存在该频道 创建
        if (de == NULL) {
            clients = listCreate();
            dictAdd(*type.serverPubSubChannels, channel, clients);
            incrRefCount(channel);
        } else {
            clients = dictGetVal(de);
        }
        // 将该 client 加入订阅列表
        listAddNodeTail(clients,c);
    }
    // 返回该客户端订阅的频道数量
    addReplyPubsubSubscribed(c,channel,type);
    return retval;
}

publish:发布消息

publish对应的实现函数是 publishCommand

// 查找频道是否存在
de = dictFind(server.pubsub_channels,channel);
    if (de) { //频道存在
        // ...
        //遍历频道对应的订阅者
        //向每个订阅者发送要发布的消息
        while ((ln = listNext(&li)) != NULL) {
            client *c = ln->value;
            addReplyPubsubMessage(c,channel,message,*type.messageBulk);
            updateClientMemUsageAndBucket(c);
            // ...
        }
    }
// ... shared pubsub
// ... 查找模式匹配的频道

pubsub应用场景

哨兵间通信

哨兵的通信机制是基于 pubsub 实现。例如:“sentinel:hello”频道,就是哨兵发现彼此存在的频道。

监控

基于 pubsub ,我们可以监控服务端的特定行为。

pubsub的潜在问题

不可靠性

pubsub的消息传递模型是至多一次,不会对消息做持久化,因此出现连接断开时,消息会丢失

如果需要保证可靠,请考虑 Redis Stream。

订阅客户端的限制

在 RESP2 中,订阅频道的客户端除了订阅等命令不能发出其它命令,只能通过 Ctrl-C 退出。

不过 Redis6.0 推出的 RESP3 已经解决了这个问题。你可以通过HELLO 3 切换到 RESP3 协议。