Redis发布订阅和Stream

一、前言

发布订单系统是日常开发中经常会用到的功能。简单来说,就是发布者发布消息,订阅者就会接受到消息并进行相应的处理,如下图所示。

消息队列.jpg

二、发布/订阅

Redis为我们提供了发布/订阅的功能模块PubSub,可以用于消息传递。

旅游行程制定流程图.jpg
其中发布者publisher、订阅者subscriber都是redis客户端,channel则是redis服务器。

发布者publisher向channel发送消息,订阅该channel的subscriber就会接收到消息。

2.1 常用命令

2.1.1 订阅频道subscribe

1
2
3
4
5
6
7
8
vbnet复制代码127.0.0.1:6379> subscribe test1 test2
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "test1"
3) (integer) 1
1) "subscribe"
2) "test2"
3) (integer) 2

发布消息publish

1
2
3
4
ruby复制代码127.0.0.1:6379> publish test1 hello
(integer) 1
127.0.0.1:6379> publish test2 world
(integer) 1

订阅test1、test2的客户端会收到消息

1
2
3
4
5
6
arduino复制代码1) "message"
2) "test1"
3) "hello"
1) "message"
2) "test2"
3) "world"

2.1.2 订阅模式psubscribe

按照上述这种方式,如果订阅者subscriber想要订阅多个channel则需要同时指定多个channel的名称,redis为了解决这个问题提供psubscribe模式匹配这种订阅方式,可以通过通配符的方式匹配频道。

psubscribe.jpg

1
2
3
4
5
ruby复制代码127.0.0.1:6379> psubscribe ch*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "ch*"
3) (integer) 1

发布消息

1
2
3
4
ruby复制代码127.0.0.1:6379> publish cha hello
(integer) 1
127.0.0.1:6379> publish china world
(integer) 1

之前订阅ch*的客户端就会收到cha频道和china频道的消息,这样就一次性订阅多个频道

1
2
3
4
5
6
7
8
arduino复制代码1) "pmessage"
2) "ch*"
3) "cha"
4) "hello"
1) "pmessage"
2) "ch*"
3) "china"
4) "world"

2.2 实现原理

redis服务端存储了订阅频道/模式的客户端列表

1
2
3
4
5
6
arduino复制代码struct redisServer { 
...
dict *pubsub_channels; // redis服务端进程中维护的订阅频道的客户端信息,key就是channel,value就是客户端列表
list *pubsub_patterns; //redis server进程中维护的pattern;
...
};

相当于如果客户端订阅一个频道,那么服务端的pubsub_channels就会存储一条数据,pubsub_channels其实是一个链表,key对应channel,value对应客户端列表,根据key订阅的频道,就可以找到订阅该频道的所有客户端。

同时如果客户端订阅一个模式pubsub_patterns也会新增一条数据,记录当前客户端订阅的模式,pubsub_patterns也有自己的数据结构,其中就包含了客户端以及模式。

1
2
3
4
arduino复制代码typedef struct pubsubPattern {
   client *client; // 客户端
   robj *pattern; // 模式
} pubsubPattern;

当发布者向某个频道发布消息时,就会遍历pubsub_channels找到订阅该频道的客户端列表,依次向这些客户端发送消息。

然后遍历pubsub_patterns找到符合当前频道的模式,同时找到模式对应的客户端,然后向客户端发送消息。

三、Stream

虽然Redis提供了发布/订阅的功能,但是并不完善,导致基本没有合适的场景能够使用。

PubSub缺点:

  • 订阅者如果部署多个节点,会出现重复消息的情况。
  • 没有ack机制,消息容易发生丢失。如果在订阅消息的期间有消费者宕机了,那么后续他重连之后也无法接收到宕机这段时间内发布的消息了。
  • 消息不会持久化。一旦Redis服务端宕机了,所有的消息都会丢失。

直到Redis5.0出现之后,出现了Stream这种数据结构,才终于完善了Redis的消息机制

Stream实际上就是一个消息列表,只是他几乎实现了消息队列所需要的所有功能,包括:

  • 消息ID的序列化生成
  • 消息遍历
  • 消息的阻塞和非阻塞读取
  • 消息的分组消费
  • 未完成消息的处理
  • 消息队列监控

同时需要注意的是Stream只是一个数据结构,他不会主动把消息推送给消费者,需要消费者主动来消费数据

每个Stream都有唯一的名称,它就是Redis的key,首次使用 xadd 指令追加消息时自动创建。

常见操作命令如下表:

命令名称 命令格式 描述
xadd xadd key id<*> field1 value1 将指定消息追加到指定队列(key)中,*表示自动生成id(当前时间+序列号)
xread xread [COUNT count] [BLOCK milliseconds] STREAMS key [key …] ID [ID …] 从消息队列中读取,COUNT:读取条数,BLOCK:阻塞读(默认不阻塞),key:队列名称,id:消息ID(起始ID)
xrange xrange key start end [COUNT] 读取队列中给定ID范围的消息,COUNT:返回消息条数(消息id从小到大)
xrevrange xrevrange key start end [COUNT] 读取队列中给定ID范围的消息,COUNT:返回消息条数(消息id从大到小)
xdel xdel key id 删除队列的消息
xgroup create xgroup create key groupname id 创建一个新的消费组
xgroup destroy xgroup destroy key groupname 删除指定消费组
xgroup delconsumer xgroup delconsumer key groupname cname 删除指定消费组中的指定消费者
xgroup setid xgroup setid key id 修改指定消息的最大id
xreadgroup xreadgroup group groupname consumer COUNT streams key 消费消费组的数据(consumer不存在则创建)

3.1 使用示例

3.1.1 新增、读取消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
makefile复制代码// 新增消息 队列名:mq 数据:score=100
127.0.0.1:6379> xadd mq * score 100
"1627225715999-0"
127.0.0.1:6379> xadd mq * score 80
"1627225761166-0"

// 读取一条数据
127.0.0.1:6379> xread COUNT 1 STREAMS mq 0
1) 1) "mq"
  2) 1) 1) "1627225715999-0"
        2) 1) "score"
           2) "100"

// 获取mq从小到大的所有数据
127.0.0.1:6379> xrange mq - +
1) 1) "1627225715999-0"
  2) 1) "score"
     2) "100"
2) 1) "1627225761166-0"
  2) 1) "score"
     2) "80"
     
// 获取指定id范围的消息
127.0.0.1:6379> xrange mq 1627225761166-0 1627225761166-0
1) 1) "1627225761166-0"
  2) 1) "score"
     2) "80"

如果客户端希望知道自身消费到第几条数据了,那么就需要记录一下当前消费的消息ID,下次再次消费的时候就从上次消费的消息ID开始读取数据即可。

3.2 消费组

消费组中多了一个游标last_delivered_id,表示当前消费到了哪一条数据。同时所有的数据都是待处理消息(PEL),只有消费者处理完毕之后使用ack指令告知redis服务器,数据才会从PEL中移除,确认后的消息就无法再次消费。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
php复制代码// 查看当前消息队列中有多少数据
127.0.0.1:6379> xrange mq - +
1) 1) "1627225715999-0"
  2) 1) "score"
     2) "100"
2) 1) "1627226969615-0"
  2) 1) "score"
     2) "70"
     
// 消息队列mq创建消费组mqGroup
127.0.0.1:6379> xgroup create mq mqGroup 0
OK

// 消费消费组mqGroup中的一条数据,>的意思是指从当前消费组的游标last_delivered_id开始读取
// 读取过后消费id会递增,也就是下次读取的时候就会是下一跳数据
127.0.0.1:6379> xreadgroup GROUP mqGroup consumer1 COUNT 1 STREAMS mq >
1) 1) "mq"
  2) 1) 1) "1627225715999-0"
        2) 1) "score"
           2) "100"
127.0.0.1:6379> xreadgroup GROUP mqGroup consumer1 COUNT 1 STREAMS mq >
1) 1) "mq"
  2) 1) 1) "1627226969615-0"
        2) 1) "score"
           2) "70"
127.0.0.1:6379> xreadgroup GROUP mqGroup consumer1 COUNT 1 STREAMS mq >
(nil)

// 查看当前消息队列对应的消费组的情况
127.0.0.1:6379> xinfo groups mq
1) 1) "name"
  2) "mqGroup" // 消费组的名称
  3) "consumers"
  4) (integer) 1 // 消费者的数量
  5) "pending"
  6) (integer) 2 // 待处理的数据数量,如果仅仅只是读取了数据,但是没有告知redis,那么数据就依旧处于待处理状态
  7) "last-delivered-id"
  8) "1627226969615-0" // 当前已经读取到的消息ID

// ack确认指定消息,返回的数值就是确认的数量
127.0.0.1:6379> xack mq mqGroup 1627226969615-0
(integer) 1

// 查看状态
127.0.0.1:6379> xinfo groups mq
1) 1) "name"
  2) "mqGroup"
  3) "consumers"
  4) (integer) 1
  5) "pending"
  6) (integer) 1 // 待处理的消息数量从2变成了1
  7) "last-delivered-id"
  8) "1627226969615-0"

// 已经确认过的消息,就不能再次消费了
127.0.0.1:6379> xreadgroup GROUP mqGroup consumer1 COUNT 1 STREAMS mq 1627225715999-0
1) 1) "mq"
  2) (empty list or set)

// 消息列表中的消息数量变少了
127.0.0.1:6379> xreadgroup GROUP mqGroup consumer1 COUNT 1 STREAMS mq 0
1) 1) "mq"
  2) 1) 1) "1627225715999-0"
        2) 1) "score"
           2) "100"

3.2 消息队列过长

如果接收到的消息比较多,为了避免Stream过长,可以选择指定Stream的最大长度,一旦到达了最大长度,就会从最早的消息开始清除,保证Stream中最新的消息。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%