RocketMQ重置消费位点浅析

重置消费位点

根据时间重置消费位点的操作在Rocket正常的执行逻辑中好像是不会出现的,貌似只有从控制台上使用重置功能这个逻辑才会触发。

我们来分析一下ta的实现过程。

入口

1
2
3
4
5
6
7
scss复制代码Rocket的运维指令都包装成了SubCommand对象,而重置消费位点对应的是ResetOffsetByTimeCommand对象
Rocket的MQAdmin启动时,调用initCommand(),将命令对象实例化并注册到subCommandList中。

接收到对应的命令之后findSubCommand()可以定位到对应的SubCommand对象,
并调用该命令对象的buildCommandlineOptions()进行参数的解析与组装,最后返回一个参数集合对象Options

真正的逻辑处理需要执行命令对象的execute();

ResetOffsetByTimeCommand.execute()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
css复制代码首先要根据时间戳获取到ConsumeQueue中的偏移量

调用栈如下:

ResetOffsetByTimeCommand.execute()
-> DefaultMQAdminExt.resetOffsetByTimestamp()
-> DefaultMQAdminExtImpl.resetOffsetByTimestamp()
-> MQClientAPIImpl.invokeBrokerToResetOffset()
-> RemotingClient.invokeSync()

假设没有服务端开发经验,也没有系统的研究过网络编程,调用链到此处就断开了。
想必你应该知道Rocket的各个组件通过Tcp建立通信连接,组件间的信息交换则是通过一个个RPC调用实现的。
Rocket组件间的交互信息的通讯协议专门定义了此数据帧的业务类型,所有类型在RequestCode中都有明确定义。
同样重置消费位点也有约定其业务类型枚举常量:RequestCode.INVOKE_BROKER_TO_RESET_OFFSET。
顺藤摸瓜你就可以找到此消息的处理器:
(这里默认你已经理解了Rocket中处理数据包的那套逻辑,即使不理解也没问题,跟着文中思路也可以找到此处)
AdminBrokerProcessor.processRequest()
-> AdminBrokerProcessor.resetOffset()
-> Broker2Client.resetOffset()
-> DefaultMessageStore.getOffsetInQueueByTime()
-> ConsumeQueue.getOffsetInQueueByTime()

根据时间戳获取到ConsumeQueue中的偏移量的实现就在这一个方法中ConsumeQueue.getOffsetInQueueByTime()

Broker2Client

对数据帧进行拆包解析之后其中就包含要重置消费位点的关键信息:topic、group、timeStamp。TopicConfigManager中保存了各个Topic的相关配置,所以很容易就可以根据topic的名字获取到对应配置。

此处主要是为了获取到ta的可写队列数目

关键代码片段:

1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码    Map<MessageQueue, Long> offsetTable = new HashMap<>();
for (int i = 0; i < topicConfig.getWriteQueueNums(); i++) {
/* i即是QueueId,构造出MessageQueue对象 */
MessageQueue mq = new MessageQueue();
mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
mq.setTopic(topic);
mq.setQueueId(i);
/* 根据时间戳计算出ConsumeQueue中的偏移量 */
timeStampOffset = this.brokerController.getMessageStore()
.getOffsetInQueueByTime(topic, i, timeStamp);

offsetTable.put(mq, timeStampOffset);
}

getOffsetInQueueByTime()的实现也是相当精彩,最终会调用到
ConsumeQueue#getOffsetInQueueByTime()使用二分法定位出最终的偏移量。

到此时Broker已经计算出该Topic各个ConsumeQueu中指定时间点的偏移量,但是Client并没有任何感知。

于是跟之前的操作一样,Broker执行一次RPC调用告诉Client你的某Topic下某Group需要更新消费偏移量,以我本次传输给的数据为准

Broker Rpc Client

摘录一些关键代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码public RemotingCommand resetOffset(String topic, String group, long timeStamp,
boolean isForce, boolean isC) {
/* 构造请求体 */
ResetOffsetRequestHeader requestHeader = new ResetOffsetRequestHeader();
requestHeader.setTopic(topic);
requestHeader.setGroup(group);
requestHeader.setTimestamp(timeStamp);
RemotingCommand request = RemotingCommand.createRequestCommand(
RequestCode.RESET_CONSUMER_CLIENT_OFFSET, requestHeader
);
ResetOffsetBody body = new ResetOffsetBody();
body.setOffsetTable(offsetTable);
request.setBody(body.encode());

/* 执行Rpc调用 */
this.brokerController.getRemotingServer().invokeOneway(entry.getKey(), request, 5000);
}

Client reset

原理同上我们可以RequestCode找到对应的处理逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码public RemotingCommand resetOffset(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
ResetOffsetRequestHeader requestHeader =
(ResetOffsetRequestHeader) request.decodeCommandCustomHeader(ResetOffsetRequestHeader.class);
/* 序列化数据得到Message、消费偏移量的映射关系 */
Map<MessageQueue, Long> offsetTable = new HashMap<>();
if (request.getBody() != null) {
ResetOffsetBody body = ResetOffsetBody.decode(request.getBody(), ResetOffsetBody.class);
offsetTable = body.getOffsetTable();
}

/* 根据上述数据修改客户端本地消费进度 */
this.mqClientFactory.resetOffset(requestHeader.getTopic(), requestHeader.getGroup(), offsetTable);
return null;
}

MqClientFactory

MqClientFactory封装了 Rocket 网络处理 Api,是消息生产者、消息消费者、NameServ、Broker交换信息的网络通道。

同时也持有该Jvm实例下所有的Consumer实例

摘取MqClientFactory关键代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
java复制代码resetOffset(String topic, String group, Map<MessageQueue, Long> offsetTable) {
/* 根据Group从列表中搜索到对应的Consumer */
MQConsumerInner impl = this.consumerTable.get(group);

/* 如果是推模式则强转为DefaultMQPushConsumerImpl类型 */
if (impl != null && impl instanceof DefaultMQPushConsumerImpl) {
consumer = (DefaultMQPushConsumerImpl) impl;
} else {
/* 如果不是则直接结束 */
return;
}

/* 暂停消费 */
consumer.suspend();

/* 循环更新消费进度 */
while (iterator.hasNext()) {
MessageQueue mq = iterator.next();
Long offset = offsetTable.get(mq);
if (topic.equals(mq.getTopic()) && offset != null) {
try {
consumer.updateConsumeOffset(mq, offset);
} catch (Exception e) {
log.warn("reset offset failed. group={}, {}", group, mq, e);
}
}
}

/* 恢复工作 */
consumer.resume();
}

疑问:

resetOffset执行过程中发现消费者类型不是DefaultMQPushConsumerImpl则直接提前返回了,那拉模式下如何实现的消费位点重置呢?

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%