Redis Stream 和Kafka类似, 基本上是一个Append Only的日志数据结构. 他主要有两个特色: 1. BLOCK API 2. Consumer Group
Redis Stream 具体的使用在官网: Intro to Redis Stream说的很清楚了, 对于英文不好的可以参见Redis中文站的相关翻译, 这里主要探讨几个在看官网和实际使用中产生的疑问. (假定读者已经对于基本使用有一定的了解).
一 : Entry IDs
组成
使用XADD
命令创建一条记录
1 | 复制代码>XADD mystream * sensor-id 1234 temperature 19.8 |
这里返回的是Entry ID
, 用于唯一标识一条记录. 它由两部分组成:
1 | 复制代码<millisecondsTime>-<sequenceNumber> |
milliseconds time 部分为改Redis 节点根据当前时间戳产生的唯一ID, 实际可能出现:
- 时钟回拨, 当前时间比之前的要小
- 该毫秒存在多条记录
对于第二种情况, sequence number 代表这个时间点的第多少个记录. 默认从0开始, 没有sequenceNumber
的时候也默认为0. 第一种情况, 文档上说, 如果当前时间小于之前时间, 那么去之前的时间递增, 而不是使用现在时间. 从而保证有序.
数据有序
写入的数据必须要比当前的最大id要大.
比较id大小, 也是先比较时间戳, 然后是sequence number.
数据消费
Redis Stream支持三种消费方式:
- 多个消费者可以同时看到新消息的到来
- 通过时序的方式来看这段时间内产生的数据.
- 多个节点共同消费同一份数据, 一个消息只能被一个节点消费.
这三种分别对应XREAD
XRANGE, XREVRANGE
, XREADGROUP
三类命令. 这里简单概述下:
和Pub/Sub不同的是, Redis Stream保存消息历史. 即时客户端不在线, 客户端也可以拉取到所需要的数据. XREAD
可以读取可以读取或者监听某个entry id之后的数据. 其中读取支持COUNT
参数, 持续监听支持BLOCK
的方式.
1 | 复制代码127.0.0.1:6379> XREAD BLOCK 0 STREAMS mystream $ |
至于某个范围的数据就是 XRANGE
, XREVRANGE
, 其中支持COUNT
, - +
分别指代最小和最大的Entry ID, XREVRANGE
和XRANGE
一致.
在XRANGE的
start
和end
中, 如果不指定sequenceNumber
, start默认为0, end 默认为最大值
Consumer Group
It is very important to understand that Redis consumer groups have nothing to do from the point of view of the implementation with Kafka (TM) consumer groups, but they are only similar from the point of view of the concept they implement, so I decided to do not change terminology compared to the software product that initially popularized such idea.
基本概念和流程:
- 每条消息会被不同的Consumer消费, 不存在一条消息发往多个消费者的情况.
- 同一个Consumer Group的Consumer, 根据名字进行区分(大小写敏感). Consumer Group 保存所有的状态
- 每一个消费者组都有
first ID never consumed
. 当消费者请求数据, 只能获取到没有被发送的数据.(XREADGROUP >) - 使用消费者组消费, 需要使用
ACK
命令, 确认这条消息被正确消费, 可以从Consumer Group中排除了.(XACK) - 当消息被发往消费者, 还没有被
ACK
标记时, 这条消息会维护进PENDING
列表. 但是每个消费者只能看见发向自己的消息.(PEL和XREADGROUP)
XREADGROUP
1 | 复制代码127.0.0.1:6379> xreadgroup group mygroup alice count 1 STREAMS mystream > |
- 如果
ID
为>
, 只会接受没有发送给其他消费者的数据, 并且会更新last ID
- 如果
ID
为其他有效的数字ID时, 我们会获取 history of pending messages. 被发往消费者, 但是执行ACK
的数据
XPENDING和XCLAIM
某些消费者可能永久无法恢复, 但是有一些数据还是在PEL中. 这个时候我们可以通过XCLAIM
把它发到别的消费者的PEL中. 具体语法见文档.
Consumer Group整体处理流程
那么目前一个完整的Consumer Group处理流程是什么样子的呢?
- 初始化系统时, 检测Stream和Group是否存在, 如果不存在通过
XGROUP CREATE xxx $ MKSTREAM
的方式进行创建. - 项目启动时, 检测本Consumer中处在PEL中的消息, 并且进行处理. 保证消息会被ACK.(重试应该在代码中实现)
- 主流程: 设置ID为
>
的方式, 监听新数据的到来, 处理然后ACK. - BackUp: 定时扫描PEL(不指定Consumer), 将超出某个处理时间范围的数据进行重新处理(通过XREAD), 或者通过XCLAIM的方式指定给别的消费者(这样需要将第二步做成定时任务).
后续任务
- Redis分布式系统中id生成和保证有序
- Redis Stream为什么要设计成时间序列呢?
- 测试时钟回拨和相关ID生成
本文转载自: 掘金