当前位置:Gxlcms > mysql > redis源代码分析20–发布/订阅

redis源代码分析20–发布/订阅

时间:2021-07-01 10:21:17 帮助过:5人阅读

redis的发布/订阅(publish/subscribe)功能类似于传统的消息路由功能,发布者发布消息,订阅者接收消息,沟通发布者和订阅者之间的桥梁是订阅的channel或者pattern。发布者向指定的publish或者pattern发布消息,订阅者阻塞在订阅的channel或者pattern。可以

redis的发布/订阅(publish/subscribe)功能类似于传统的消息路由功能,发布者发布消息,订阅者接收消息,沟通发布者和订阅者之间的桥梁是订阅的channel或者pattern。发布者向指定的publish或者pattern发布消息,订阅者阻塞在订阅的channel或者pattern。可以看到,发布者不会指定哪个订阅者才能接收消息,订阅者也无法只接收特定发布者的消息。这种订阅者和发布者之间的关系是松耦合的,订阅者不知道是谁发布的消息,发布者也不知道谁会接收消息。

redis的发布/订阅功能主要通过SUBSCRIBE、UNSUBSCRIBE、PSUBSCRIBE、PUNSUBSCRIBE 、PUBLISH五个命令来表现。其中SUBSCRIBE、UNSUBSCRIBE用于订阅或者取消订阅channel,而PSUBSCRIBE、PUNSUBSCRIBE用于订阅或者取消订阅pattern,发布消息则通过publish命令。

对于发布/订阅功能的实现,我们先来看看几个与此相关的结构。

struct redisServer {
    ---
   /* Pubsub */
   dict *pubsub_channels;/* Map channels to list of subscribed clients */
   list *pubsub_patterns;/* A list of pubsub_patterns */
   ---
}
typedef struct redisClient {
   ---
   dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */
   list *pubsub_patterns; /* patterns a client is interested in (SUBSCRIBE) */
} redisClient;

在redis的全局server变量(redisServer类型)中,channel和订阅者之间的关系用字典pubsub_channels来保存,特定channel和所有订阅者组成的链表构成pubsub_channels字典中的一项,即字典中的每一项可表示为(channel,订阅者链表);pattern和订阅者之间的关系用链表pubsub_patterns来保存,链表中的每一项可表示成(pattern,redisClient)组成的字典。

在特定订阅者redisClient的结构中,pubsub_channels保存着它所订阅的channel的字典,而订阅的模式则保存在链表pubsub_patterns中。

从上面的解释,我们再来看看订阅/发布命令的最坏时间复杂度(注意字典增删查改一项的复杂度为O(1),而链表的查删复杂度为O(N),从链表尾部增加一项的复杂度为O(1))。

SUBSCRIBE:

订阅者用SUBSCRIBE订阅特定channel,这需要在订阅者的redisClient结构中的pubsub_channels增加一项(复杂度为 O(1)),然后在redisServer 的pubsub_channels找到该channel(复杂度为O(1)),并在该channel的订阅者链表的尾部增加一项(复杂度为O(1),注意,如果pubsub_channels中没找到该channel,则插入的复杂度也同为O(1)),因此订阅者用SUBSCRIBE订阅特定 channel的最坏时间复杂度为O(1)。

UNSUBSCRIBE:

订阅者取消订阅时,需要先从订阅者的redisClient结构中的pubsub_channels删除一项(复杂度为O(1)),然后在 redisServer 的pubsub_channels找到该channel(复杂度为O(1)),然后在channel的订阅者链表中删除该订阅者(复杂度为O(1)),因此总的复杂度为O(N),N为特定channel的订阅者数。

PSUBSCRIBE:

订阅者用PSUBSCRIBE订阅pattern时,需要先在redisClient结构中的pubsub_patterns先查找是否已存在该 pattern(复杂度为O(N)),并在不存在的情况下往redisClient结构中的pubsub_patterns和redisServer结构中的pubsub_patterns链表尾部各增加一项(复杂度都为O(1)),因此,总的复杂度为O(N),其中N为订阅者已订阅的模式。

PUNSUBSCRIBE:

订阅者用PUNSUBSCRIBE取消对pattern的订阅时,需要先在redisClient结构中的pubsub_patterns链表中删除该 pattern(复杂度为O(N)),并在redisServer结构中的pubsub_patterns链表中删除订阅者和pattern组成的映射(复杂度为O(M),因此,总的复杂度为O(N+M),其中N为订阅者已订阅的模式,而M为系统中所有订阅者和所有pattern组成的映射数。

PUBLISH:

发布消息时,只会向特定channel发布,但该channel可能会匹配某个pattern。因此,需要先在redisServer结构中的 pubsub_channels找到该channel的订阅者链表(O(1)),然后发送给所有订阅者(复杂度为O(N)),然后查看 redisServer结构中的pubsub_patterns链表中的所有项,看channel是否和该项中的pattern匹配(复杂度为O(M))(注意,这并不包括模式匹配的复杂度),因此,总的复杂度为O(N+M),。其中N为该channel的订阅者数,而M为系统中所有订阅者和所有 pattern组成的映射数。另外,从这也可以看出,一个订阅者是可能多次收到同一个消息的。

解释了发布/订阅的算法后,其代码就好理解了,这里仅给出PUBLISH命令的处理函数publishCommand的代码,更多相关命令的代码请参看redis的源代码。

static void publishCommand(redisClient *c) {
    int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]);
    addReplyLongLong(c,receivers);
}
/* Publish a message */
static int pubsubPublishMessage(robj *channel, robj *message) {
    int receivers = 0;
    struct dictEntry *de;
    listNode *ln;
    listIter li;
    /* Send to clients listening for that channel */
    de = dictFind(server.pubsub_channels,channel);
    if (de) {
        list *list = dictGetEntryVal(de);
        listNode *ln;
        listIter li;
        listRewind(list,&li);
        while ((ln = listNext(&li)) != NULL) {
            redisClient *c = ln->value;
            addReply(c,shared.mbulk3);
            addReply(c,shared.messagebulk);
            addReplyBulk(c,channel);
            addReplyBulk(c,message);
            receivers++;
        }
    }
    /* Send to clients listening to matching channels */
    if (listLength(server.pubsub_patterns)) {
        listRewind(server.pubsub_patterns,&li);
        channel = getDecodedObject(channel);
        while ((ln = listNext(&li)) != NULL) {
            pubsubPattern *pat = ln->value;
            if (stringmatchlen((char*)pat->pattern->ptr,
                                sdslen(pat->pattern->ptr),
                                (char*)channel->ptr,
                                sdslen(channel->ptr),0)) {
                addReply(pat->client,shared.mbulk4);
                addReply(pat->client,shared.pmessagebulk);
                addReplyBulk(pat->client,pat->pattern);
                addReplyBulk(pat->client,channel);
                addReplyBulk(pat->client,message);
                receivers++;
            }
        }
        decrRefCount(channel);
    }
    return receivers;
}

最后提醒一下,处于发布/订阅模式的client,是无法发布上述五种命令之外的命令(quit除外),这是在processCommand函数中检查的,可以参看前面命令处理章节对该函数的解释。

人气教程排行