一、前言
发布订单系统是日常开发中经常会用到的功能。简单来说,就是发布者发布消息,订阅者就会接受到消息并进行相应的处理,如下图所示。
二、发布/订阅
Redis为我们提供了发布/订阅的功能模块PubSub,可以用于消息传递。
其中发布者publisher、订阅者subscriber都是redis客户端,channel则是redis服务器。
发布者publisher向channel发送消息,订阅该channel的subscriber就会接收到消息。
2.1 常用命令
2.1.1 订阅频道subscribe
1 | vbnet复制代码127.0.0.1:6379> subscribe test1 test2 |
发布消息publish
1 | ruby复制代码127.0.0.1:6379> publish test1 hello |
订阅test1、test2的客户端会收到消息
1 | arduino复制代码1) "message" |
2.1.2 订阅模式psubscribe
按照上述这种方式,如果订阅者subscriber想要订阅多个channel则需要同时指定多个channel的名称,redis为了解决这个问题提供psubscribe模式匹配这种订阅方式,可以通过通配符的方式匹配频道。
1 | ruby复制代码127.0.0.1:6379> psubscribe ch* |
发布消息
1 | ruby复制代码127.0.0.1:6379> publish cha hello |
之前订阅ch*的客户端就会收到cha频道和china频道的消息,这样就一次性订阅多个频道
1 | arduino复制代码1) "pmessage" |
2.2 实现原理
redis服务端存储了订阅频道/模式的客户端列表
1 | arduino复制代码struct redisServer { |
相当于如果客户端订阅一个频道,那么服务端的pubsub_channels就会存储一条数据,pubsub_channels其实是一个链表,key对应channel,value对应客户端列表,根据key订阅的频道,就可以找到订阅该频道的所有客户端。
同时如果客户端订阅一个模式,pubsub_patterns也会新增一条数据,记录当前客户端订阅的模式,pubsub_patterns也有自己的数据结构,其中就包含了客户端以及模式。
1 | arduino复制代码typedef struct pubsubPattern { |
当发布者向某个频道发布消息时,就会遍历pubsub_channels找到订阅该频道的客户端列表,依次向这些客户端发送消息。
然后遍历pubsub_patterns找到符合当前频道的模式,同时找到模式对应的客户端,然后向客户端发送消息。
三、Stream
虽然Redis提供了发布/订阅的功能,但是并不完善,导致基本没有合适的场景能够使用。
PubSub缺点:
- 订阅者如果部署多个节点,会出现重复消息的情况。
- 没有ack机制,消息容易发生丢失。如果在订阅消息的期间有消费者宕机了,那么后续他重连之后也无法接收到宕机这段时间内发布的消息了。
- 消息不会持久化。一旦Redis服务端宕机了,所有的消息都会丢失。
直到Redis5.0出现之后,出现了Stream这种数据结构,才终于完善了Redis的消息机制。
Stream实际上就是一个消息列表,只是他几乎实现了消息队列所需要的所有功能,包括:
- 消息ID的序列化生成
- 消息遍历
- 消息的阻塞和非阻塞读取
- 消息的分组消费
- 未完成消息的处理
- 消息队列监控
同时需要注意的是Stream只是一个数据结构,他不会主动把消息推送给消费者,需要消费者主动来消费数据。
每个Stream都有唯一的名称,它就是Redis的key,首次使用 xadd 指令追加消息时自动创建。
常见操作命令如下表:
命令名称 | 命令格式 | 描述 |
---|---|---|
xadd | xadd key id<*> field1 value1 | 将指定消息追加到指定队列(key)中,*表示自动生成id(当前时间+序列号) |
xread | xread [COUNT count] [BLOCK milliseconds] STREAMS key [key …] ID [ID …] | 从消息队列中读取,COUNT:读取条数,BLOCK:阻塞读(默认不阻塞),key:队列名称,id:消息ID(起始ID) |
xrange | xrange key start end [COUNT] | 读取队列中给定ID范围的消息,COUNT:返回消息条数(消息id从小到大) |
xrevrange | xrevrange key start end [COUNT] | 读取队列中给定ID范围的消息,COUNT:返回消息条数(消息id从大到小) |
xdel | xdel key id | 删除队列的消息 |
xgroup create | xgroup create key groupname id | 创建一个新的消费组 |
xgroup destroy | xgroup destroy key groupname | 删除指定消费组 |
xgroup delconsumer | xgroup delconsumer key groupname cname | 删除指定消费组中的指定消费者 |
xgroup setid | xgroup setid key id | 修改指定消息的最大id |
xreadgroup | xreadgroup group groupname consumer COUNT streams key | 消费消费组的数据(consumer不存在则创建) |
3.1 使用示例
3.1.1 新增、读取消息
1 | makefile复制代码// 新增消息 队列名:mq 数据:score=100 |
如果客户端希望知道自身消费到第几条数据了,那么就需要记录一下当前消费的消息ID,下次再次消费的时候就从上次消费的消息ID开始读取数据即可。
3.2 消费组
消费组中多了一个游标last_delivered_id,表示当前消费到了哪一条数据。同时所有的数据都是待处理消息(PEL),只有消费者处理完毕之后使用ack指令告知redis服务器,数据才会从PEL中移除,确认后的消息就无法再次消费。
1 | php复制代码// 查看当前消息队列中有多少数据 |
3.2 消息队列过长
如果接收到的消息比较多,为了避免Stream过长,可以选择指定Stream的最大长度,一旦到达了最大长度,就会从最早的消息开始清除,保证Stream中最新的消息。
本文转载自: 掘金