其他

Redis 发布订阅机制使用场景和实现

2017-06-23 拿客-三产 开源中国


简介

Redis提供了基于“发布/订阅”模式的消息机制,此种模式下,消息发布者和订阅者不进行直接通信,发布者客户端向指定的频道(channel)发布消息,订阅该频道的每个客户端都可以收到该消息(频道没有”创建“的概念,可以直接订阅、亦可直接发布消息)。


命令


发布消息

➤ PUBLISH

自2.0.0可用。


时间复杂度:O(N+M),其中 N 是频道 channel 的订阅者数量,而 M 则是使用模式订阅(subscribed patterns)的客户端的数量。

语法:PUBLISH channel message


说明:

将信息 message 发送到指定的频道 channel 。


返回值:

接收到信息 message 的订阅者数量。


示例:

# 向没有订阅者的频道发送信息

coderknock> PUBLISH new_channel "test publish"

(integer) 0

# 向有订阅者的频道发送信息

coderknock>  PUBLISH channel1 "new channel1"

(integer) 1


订阅

➤ SUBSCRIBE

自2.0.0可用。

时间复杂度:O(N),其中 N 是订阅的频道的数量。


语法:SUBSCRIBE channel [channel ...]


说明:

订阅给定的一个或多个频道的信息。


返回值:

接收到的信息(请参见下面的代码说明)。


示例:

coderknock> SUBSCRIBE channel1

Reading messages... (press Ctrl-C to quit)

# 订阅成功

1) "subscribe"      # 返回值的类型:显示订阅成功

2) "channel1"       # 订阅的频道名字

3) (integer) 1      # 目前已订阅的频道数量

# 接收到信息

1) "message"        # 返回值的类型:信息

2) "channel1"       # 来源(从哪个频道发送过来)

3) "new channel1"   # 信息内容


➤  PSUBSCRIBE

自2.0.0可用。


时间复杂度:O(N), N 是订阅的模式的数量。


语法:PSUBSCRIBE pattern [pattern ...]


说明:

订阅一个或多个符合给定模式的频道。

每个模式以 * 作为匹配符,比如 it* 匹配所有以 it 开头的频道( it.news 、 it.blog 、 it.tweets 等等), news.* 匹配所有以 news. 开头的频道( news.it 、 news.global.today 等等),诸如此类。


返回值:

接收到的信息(请参见下面的代码说明)。


示例:

coderknock> PSUBSCRIBE news.* coderknock.*

                           Reading messages... (press Ctrl-C to quit)

# 订阅成功

1) "psubscribe"     # 返回值的类型:显示订阅成功

2) "news.*"         # 订阅的频道名字

3) (integer) 1      # 目前已订阅的频道数量

1) "psubscribe"

2) "coderknock.*"

3) (integer) 2

# 接收到信息

1) "pmessage"       # 返回值的类型:信息

2) "news.*"         # 来源频道模式

3) "news.123"       # 具体频道(从哪个频道发送过来)

4) "123             # 消息


1) "pmessage"

2) "news.*"

3) "news.222"

4) "222"


查看发布/订阅系统状态

➤  PUBSUB

自2.8.0可用。

时间复杂度:O(N), N 是订阅的模式的数量。


语法:PUBSUB subcommand [argument [argument ...]]


说明:

PUBSUB 是一个查看订阅与发布系统状态的内省命令, 它由数个不同格式的子命令组成, 以下将分别对这些子命令进行介绍。


➤  PUBSUB CHANNELS [pattern]


说明:

列出当前的活跃频道。


活跃频道指的是那些至少有一个订阅者的频道, 订阅模式的客户端不计算在内。


pattern 参数是可选的:

● 如果不给出 pattern 参数,那么列出订阅与发布系统中的所有活跃频道。


● 如果给出 pattern 参数,那么只列出和给定模式 pattern 相匹配的那些活跃频道。


复杂度: O(N) , N 为活跃频道的数量(对于长度较短的频道和模式来说,将进行模式匹配的复杂度视为常数)。


返回值: 一个由活跃频道组成的列表。


示例:

# 客户端1

coderknock> SUBSCRIBE coderknock sanchan news test

Reading messages... (press Ctrl-C to quit)

1) "subscribe"

2) "coderknock"

3) (integer) 1

1) "subscribe"

2) "sanchan"

3) (integer) 2

1) "subscribe"

2) "news"

3) (integer) 3

1) "subscribe"

2) "test"

3) (integer) 4

# 客户端2

coderknock> SUBSCRIBE coderknock sanchan blog oschina

Reading messages... (press Ctrl-C to quit)

1) "subscribe"

2) "coderknock"

3) (integer) 1

1) "subscribe"

2) "sanchan"

3) (integer) 2

1) "subscribe"

2) "blog"

3) (integer) 3

1) "subscribe"

2) "oschina"

3) (integer) 4

# 统计出的有订阅的频道

coderknock> PUBSUB CHANNELS

1) "coderknock"

2) "sanchan"

3) "blog"

4) "news"

5) "oschina"

6) "test"

# 统计频道名包含 o 的频道

coderknock>  PUBSUB CHANNELS *o*

                        1) "coderknock"

2) "blog"

3) "oschina"

#我们关闭客户端1,只有客户端1订阅的 "news" "test" 频道消失

coderknock> PUBSUB CHANNELS

1) "blog"

2) "oschina"

3) "coderknock"

4) "sanchan"


# 重新订阅

# 客户端1

coderknock>PSUBSCRIBE news.* coderknock.*

                          Reading messages... (press Ctrl-C to quit)

1) "psubscribe"

2) "news.*"

3) (integer) 1

1) "psubscribe"

2) "coderknock.*"

3) (integer) 2

# 客户端2

coderknock> PSUBSCRIBE sanchan.* coderknock.*

                           Reading messages... (press Ctrl-C to quit)

1) "psubscribe"

2) "sanchan.*"

3) (integer) 1

1) "psubscribe"

2) "coderknock.*"

3) (integer) 2

# 客户端3

coderknock>  PSUBSCRIBE sanchan.* coderknock.* news blog

Reading messages... (press Ctrl-C to quit)

1) "psubscribe"

2) "sanchan.*"

3) (integer) 1

1) "psubscribe"

2) "coderknock.*"

3) (integer) 2

1) "psubscribe"

2) "news"

3) (integer) 3

1) "psubscribe"

2) "blog"

3) (integer) 4

# 说明不会统计 PSUBSCRIBE 订阅

coderknock> PUBSUB CHANNELS

(empty list or set)


➤  PUBSUB NUMSUB [channel-1 ... channel-N]


说明:

返回给定频道的订阅者数量, 订阅模式的客户端不计算在内。


复杂度:O(N) , N 为给定频道的数量。


返回值: 一个多条批量回复(Multi-bulk reply),回复中包含给定的频道,以及频道的订阅者数量。 格式为:频道 channel-1 , channel-1的订阅者数量,频道 channel-2 , channel-2 的订阅者数量,诸如此类。 回复中频道的排列顺序和执行命令时给定频道的排列顺序一致。 不给定任何频道而直接调用这个命令也是可以的, 在这种情况下, 命令只返回一个空列表。


示例:

# 订阅的客户端


#客户端1

coderknock>  SUBSCRIBE coderknock sanchan blog oschina

Reading messages... (press Ctrl-C to quit)

1) "subscribe"

2) "coderknock"

3) (integer) 1

1) "subscribe"

2) "sanchan"

3) (integer) 2

1) "subscribe"

2) "blog"

3) (integer) 3

1) "subscribe"

2) "oschina"

3) (integer) 4

#客户端2

coderknock> SUBSCRIBE coderknock sanchan news test

Reading messages... (press Ctrl-C to quit)

1) "subscribe"

2) "coderknock"

3) (integer) 1

1) "subscribe"

2) "sanchan"

3) (integer) 2

1) "subscribe"

2) "news"

3) (integer) 3

1) "subscribe"

2) "test"

3) (integer) 4

#客户端3

coderknock> PSUBSCRIBE sanchan.* coderknock.* news blog

Reading messages... (press Ctrl-C to quit)

1) "psubscribe"

2) "sanchan.*"

3) (integer) 1

1) "psubscribe"

2) "coderknock.*"

3) (integer) 2

1) "psubscribe"

2) "news"

3) (integer) 3

1) "psubscribe"

2) "blog"

3) (integer) 4

# 统计,可以看到 PSUBSCRIBE 订阅同样不会被统计而且不支持模式匹配 

coderknock> PUBSUB NUMSUB coderknock news blog oschina test sanchan coderknock.*

                                                                        1) "coderknock"

2) (integer) 2

3) "news"

4) (integer) 1

5) "blog"

6) (integer) 1

7) "oschina"

8) (integer) 1

9) "test"

10) (integer) 1

11) "sanchan"

12) (integer) 2

13) "coderknock.*"

14) (integer) 0


➤  PUBSUB NUMPAT


说明:

返回 订阅模式 的数量。

注意, 这个命令返回的不是订阅模式的客户端的数量, 而是客户端订阅的所有模式的数量总和。


复杂度:O(N) , N 为给定频道的数量。


返回值: 一个整数回复(Integer reply)。


示例:

#采用上面示例中的订阅客户端,这里统计的 订阅模式 包含 PSUBSCRIBE 订阅

coderknock> PUBSUB NUMPAT

(integer) 8

# 添加一个客户端4

coderknock>  PSUBSCRIBE blog*

                            Reading messages... (press Ctrl-C to quit)

1) "psubscribe"

2) "blog*"

3) (integer) 1

# 统计会发现模式增加 1

coderknock> PUBSUB NUMPAT

(integer) 9


退订

➤  UNSUBSCRIBE

自2.0.0可用。

时间复杂度:O(N) , N 是客户端已订阅的频道的数量。


语法:UNSUBSCRIBE [channel [channel ...]]


说明:

指示客户端退订给定的频道。

如果没有频道被指定,也即是,一个无参数的 UNSUBSCRIBE 调用被执行,那么客户端使用 SUBSCRIBE 命令订阅的所有频道都会被退订。在这种情况下,命令会返回一个信息,告知客户端所有被退订的频道。


返回值:

这个命令在不同的客户端中有不同的表现。


示例:

在命令行中该命令无法测试(订阅后命令行会阻塞),我们使用 python 进行测试:

import redis

import time


r = redis.StrictRedis(host='127.0.0.1', password='admin123', port=6379, db=0)

p = r.pubsub()

p.subscribe("coderknock", "sanchan", "python")

"""

# 基于上一个命令的示例,此时在客户端中执行

coderknock> PUBSUB CHANNELS

1) "python"

2) "blog"

3) "news"

4) "test"

5) "oschina"

6) "coderknock"

7) "sanchan"

# 说明订阅成功

# 统计订阅数量

coderknock> PUBSUB NUMSUB coderknock news blog oschina test sanchan coderknock.* python

 1) "coderknock"

 2) (integer) 3

 3) "news"

 4) (integer) 1

 5) "blog"

 6) (integer) 1

 7) "oschina"

 8) (integer) 1

 9) "test"

10) (integer) 1

11) "sanchan"

12) (integer) 3

13) "coderknock.*"

14) (integer) 0

15) "python"

16) (integer) 1

"""

time.sleep(10)  # 休眠 10 秒

p.unsubscribe("sanchan")

"""

# 此时取消了一个 sanchan 的订阅

coderknock> PUBSUB NUMSUB coderknock news blog oschina test sanchan coderknock.* python

 1) "coderknock"

 2) (integer) 3

 3) "news"

 4) (integer) 1

 5) "blog"

 6) (integer) 1

 7) "oschina"

 8) (integer) 1

 9) "test"

10) (integer) 1

11) "sanchan"

12) (integer) 2

13) "coderknock.*"

14) (integer) 0

15) "python"

16) (integer) 1

"""

time.sleep(10)  # 休眠 10 秒

p.unsubscribe()

"""

# 此时该 python 中订阅全部退订

coderknock> PUBSUB NUMSUB coderknock news blog oschina test sanchan coderknock.* python

 1) "coderknock"

 2) (integer) 2

 3) "news"

 4) (integer) 1

 5) "blog"

 6) (integer) 1

 7) "oschina"

 8) (integer) 1

 9) "test"

10) (integer) 1

11) "sanchan"

12) (integer) 2

13) "coderknock.*"

14) (integer) 0

15) "python"

16) (integer) 0

"""


➤  UNSUBSCRIBE

自2.0.0可用。

时间复杂度:O(N+M) ,其中 N 是客户端已订阅的模式的数量, M 则是系统中所有客户端订阅的模式的数量。


语法:PUNSUBSCRIBE [pattern [pattern ...]]


说明:

指示客户端退订所有给定模式。

如果没有模式被指定,也即是,一个无参数的 PUNSUBSCRIBE 调用被执行,那么客户端使用 PSUBSCRIBE 命令订阅的所有模式都会被退订。在这种情况下,命令会返回一个信息,告知客户端所有被退订的模式


返回值:


这个命令在不同的客户端中有不同的表现。


有关订阅命令有两点需要注意:

● 客户端在执行订阅命令之后进入了订阅状态,只能接收 SUBSCRIBE 、PSUBSCRIBE 、UNSUBSCRIBE 、PUNSUBSCRIBE 四个命令。


● 开启的订阅客户端,无法收到该频道之前的消息,因为 Redis 不会对发布的消息进行持久化。

和很多专业的消息队列系统(例如Kafka、RocketMQ)相比,Redis的发布订阅略显粗糙,例如无法实现消息堆积和回溯。但胜在足够简单,如果当前场景可以容忍的这些缺点,也不失为一个不错的选择。


使用场景

聊天室、公告牌、服务之间利用消息解耦都可以使用发布订阅模式



推荐阅读

33 款主宰 2017 iOS 开发的开源库

Nginx+Keepalived(双机热备)搭建高可用负载均衡环境(HA)

十大 Node.js 端到端测试框架,快速提升工作效率

从“某公司专利”事件,谈谈开源协议与知识产权

“放码过来”邀您亮“项”,一不小心就火了!

点击“阅读原文”查看更多精彩内容

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存