重置消费位点
根据时间重置消费位点的操作在Rocket正常的执行逻辑中好像是不会出现的,貌似只有从控制台上使用重置功能这个逻辑才会触发。
我们来分析一下ta的实现过程。
入口
1 | scss复制代码Rocket的运维指令都包装成了SubCommand对象,而重置消费位点对应的是ResetOffsetByTimeCommand对象 |
ResetOffsetByTimeCommand.execute()
1 | css复制代码首先要根据时间戳获取到ConsumeQueue中的偏移量 |
Broker2Client
对数据帧进行拆包解析之后其中就包含要重置消费位点的关键信息:topic、group、timeStamp。TopicConfigManager中保存了各个Topic的相关配置,所以很容易就可以根据topic的名字获取到对应配置。
此处主要是为了获取到ta的可写队列数目
关键代码片段:
1 | java复制代码 Map<MessageQueue, Long> offsetTable = new HashMap<>(); |
getOffsetInQueueByTime()的实现也是相当精彩,最终会调用到
ConsumeQueue#getOffsetInQueueByTime()使用二分法定位出最终的偏移量。
到此时Broker已经计算出该Topic各个ConsumeQueu中指定时间点的偏移量,但是Client并没有任何感知。
于是跟之前的操作一样,Broker执行一次RPC调用告诉Client你的某Topic下某Group需要更新消费偏移量,以我本次传输给的数据为准
Broker Rpc Client
摘录一些关键代码
1 | java复制代码public RemotingCommand resetOffset(String topic, String group, long timeStamp, |
Client reset
原理同上我们可以RequestCode找到对应的处理逻辑
1 | java复制代码public RemotingCommand resetOffset(ChannelHandlerContext ctx, |
MqClientFactory
MqClientFactory封装了 Rocket 网络处理 Api,是消息生产者、消息消费者、NameServ、Broker交换信息的网络通道。
同时也持有该Jvm实例下所有的Consumer实例
摘取MqClientFactory关键代码:
1 | java复制代码resetOffset(String topic, String group, Map<MessageQueue, Long> offsetTable) { |
疑问:
resetOffset执行过程中发现消费者类型不是DefaultMQPushConsumerImpl则直接提前返回了,那拉模式下如何实现的消费位点重置呢?
本文转载自: 掘金