盘点 MQ Kafka 一览

总文档 :文章目录

Github : github.com/black-ant

一 . 前言

之前对RabbitMQ 进行了简单的分析 , 这一篇来过一下 Kafka 的相关操作.

二 . 基础使用

kafka 是一个很常用的高性能消息队列 , 先看一下基础的使用

Maven 核心依赖

1
2
3
4
xml复制代码<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>

配置信息

1
2
3
4
5
6
7
8
9
10
11
12
properties复制代码spring.kafka.bootstrap-servers=127.0.0.1:9092
# 指定默认消费者group id
spring.kafka.consumer.group-id=ant
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# 配置消费者消息的key和value的编解码方式-consumer
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=1000
spring.kafka.listener.missing-topics-fatal=false

生产消息

1
2
3
4
5
6
java复制代码    @Resource
private KafkaTemplate<String, String> kafkaTemplate;

public void produce() {
kafkaTemplate.send("start", "one", "are you ok?" + "----" + i);
}

消费消息

1
2
3
4
java复制代码@KafkaListener(id = "one", topics = "start", clientIdPrefix = "myClientId")
public void listener0(ConsumerRecord<?, ?> record) {
logger.info("------> this is in listerner 0:{}<-------", record.value());
}

总结

  • KafkaTemplate 发布消息
  • KafkaListener 监听消息

整体来说 , 还是和之前的 Rabbit 使用一致 , 具体的内部使用 , 后续源码中再详细看

三 . 基础知识点

基础成员参考消息队列文档.

四 . 工具源码一览

我们围绕 KafkaTemplateKafkaListener 2个类进行分析.

4.1 KafkaTemplate

发送的起点

可以看到 , 其中是通过 Future 获取返回结果

1
2
3
4
5
6
7
8
9
java复制代码protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord) {
final Producer<K, V> producer = getTheProducer();
final SettableListenableFuture<SendResult<K, V>> future = new SettableListenableFuture<>();
producer.send(producerRecord, buildCallback(producerRecord, producer, future));
if (this.autoFlush) {
flush();
}
return future;
}

消息的创建

主要逻辑还是通过 org.apache.kafka.clients.producer.Producer 对象来完成发送 , 这是一个 Kafka 原生包 , 属于kafka-client .

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
java复制代码// 第一步 : 消息的构建
C0- KafkaProducer
M_01- send(ProducerRecord<K, V> record, Callback callback)
- 首先调用 ProducerInterceptors 构建一个 ProducerRecord -> PS:M_01_01
?- 注意 , 这里主要是通过拦截器添加额外的功能 , 会内部进行一个 For 循环
|- For 许纳湖List<ProducerInterceptor<K, V>>
- 调用对应的 interceptor.onSend(interceptRecord) , 替换之前的 ProducerInterceptors
- 如果集合为空 , 则直接返回正确的
- 调用 doSend 主流程 -> M_02
M_02- doSend
1- 先判断 Sender 对象是否存在且运行 , 否则抛出异常 -> PS:M_02_02
2- 调用 waitOnMetadata , 判断集群元数据可用
3- 构建 Cluster 对象
4- 构建序列化 serializedKey , serializedValue
5- 生成 partition -> PS:M_02_03
6- 生成最终发送对象 TopicPartition
7- 构建 Header 和 设置 readOnly 属性
8- 确定消息size
9- 构建 callback 拦截器 -> PS:M_02_05
10- 发送消息 , 获得一个 RecordAccumulator.RecordAppendResult 用于异步获取结果
?- 其中包含一个 future 对象
?- RecordAccumulator append 发起发送


// M_02 代码
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
TopicPartition tp = null;
try {
throwIfProducerClosed();
// 首先,确保主题的元数据可用
ClusterAndWaitTime clusterAndWaitTime;
try {
// 内部判断逻辑 :
// 构建元数据 , 发起 metadata.fetch() ,判断超时时间
clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
} catch (KafkaException e) {
if (metadata.isClosed())
throw new KafkaException....;
throw e;
}
long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
Cluster cluster = clusterAndWaitTime.cluster;
byte[] serializedKey;
try {
serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
} catch (ClassCastException cce) {
throw ....;
}
byte[] serializedValue;
try {
serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
} catch (ClassCastException cce) {
throw ....;
}
int partition = partition(record, serializedKey, serializedValue, cluster);
tp = new TopicPartition(record.topic(), partition);

setReadOnly(record.headers());
Header[] headers = record.headers().toArray();

int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
compressionType, serializedKey, serializedValue, headers);
ensureValidRecordSize(serializedSize);
long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
// 生产者回调将确保调用'回调'和拦截回调
Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
// 事务管理
if (transactionManager != null && transactionManager.isTransactional())
transactionManager.maybeAddPartitionToTransaction(tp);

// 发送消息且返回一个 RecordAppendResult 对象
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
serializedValue, headers, interceptCallback, remainingWaitMs);
if (result.batchIsFull || result.newBatchCreated) {
this.sender.wakeup();
}
return result.future;
// handling exceptions and record the errors;
// for API exceptions return them in the future,
// for other exceptions throw directly
} catch (Exception e) {
// ... 省略异常处理 , 主要有以下操作
// ApiException / InterruptedException / BufferExhaustedException / KafkaException / Exception
// this.errors.record();
// this.interceptors.onSendError(record, tp, e);
}
}


// Step 2 : 消息的发送 -- 见后文

PS:M_01_01 ProducerRecord 对象的作用

作用: 每一个 ProducerRecord 都是一个 消息 , 该对象中包含 topic ,partition , headers 用于映射发送的逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
java复制代码public class ProducerRecord<K, V> {

// Topic 类型
private final String topic;

// 如果指定了一个有效的分区号,该分区将在发送记录时使用
// 如果没有分区 , 但指定了一个键,则使用该键的哈希值来选择分区
// 如果键和分区都没有 , 则分区将以循环方式分配
private final Integer partition;
private final Headers headers;

// 监听的 Id
private final K key;
// 发送的消息体
private final V value;

// 如果用户没有提供时间戳,生产者将记录当前时间。
// Kafka最终使用的时间戳取决于为 Topic 配置的时间戳类型。
private final Long timestamp;
//.................
}

image.png

PS:M_02_02 Sender 对象

作用: 核心发送对象 , 用于消息发送 , 用于集群处理

所属包: org.apache.kafka.clients.producer.internals

1
java复制代码TODO

PS:M_02_03 生成 partition 核心逻辑

1
2
3
4
5
6
7
java复制代码    private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
Integer partition = record.partition();
return partition != null ?
partition :
partitioner.partition(
record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}

PS:M_02_03 InterceptorCallback 作用

消息的发送

整体来说消息的发送存在3条线 :

  • 线路一 : 项目启动时 , KafkaMessageListenerContainer run 开始循环
    • 调用 pollSelectionKeys 完成 channel write 流程
  • 线路二 : 设置 send
    • Sender # runOnce sendProducerData(currentTimeMs) 中设置 一个 ClientRequest
  • 线路三 : 获取 Send 发送
    • Sender # runOnce client.poll(pollTimeout, currentTimeMs) 中 发起 poll 执行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
java复制代码
// PS : 消息的产生和发送是异步的 , 消息的发送主要依赖于 Sender

public class Sender implements Runnable{.........}

// Step 1 : Thread run 方法一览
C2- Sender
M2_01- runOnce()
- client.poll(pollTimeout, currentTimeMs) 发送消息
M2_02- sendProduceRequest : 核心发送方法
- 构建了一个 RequestCompletionHandler . 其中有个 onComplete 用于后续回调
- 构建一个 ClientRequest
- 调用 ClientRequest.send 发送消息
?-NetworkClient.doSend -> PS:M2_02_01


// PS:M2_02_01 底层一览
public void send(Send send) {
String connectionId = send.destination();
KafkaChannel channel = openOrClosingChannelOrFail(connectionId);
if (closingChannels.containsKey(connectionId)) {
// 确保通知通过'断开',离开通道在状态中关闭被触发
this.failedSends.add(connectionId);
} else {
try {
// 此处使用 KafkaChannel 进行处理
channel.setSend(send);
} catch (Exception e) {
// 更新状态以保持一致性,通道关闭后将被丢弃
channel.state(ChannelState.FAILED_SEND);
// 确保在下一次轮询中处理' failedsent '时通过' disconnected '通知
this.failedSends.add(connectionId);
close(channel, CloseMode.DISCARD_NO_NOTIFY);
if (!(e instanceof CancelledKeyException)) {
throw e;
}
}
}
}

// 隐藏方法 Selector , 在上文中 M2_02 中设置了 send , 此处进行处理
C- Selector
M- pollSelectionKeys
- send = channel.write()

消息的回调操作

回调是基于 Sender # handleProduceResponse 发起 , 该方法在 sendProduceRequest 方法中设置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
java复制代码
// Start : 请求的返回处理
C- Sender # handleProduceResponse
// 核心代码一览
ProduceResponse produceResponse = (ProduceResponse) response.responseBody();
for (Map.Entry<TopicPartition, ProduceResponse.PartitionResponse> entry : produceResponse.responses().entrySet()) {
TopicPartition tp = entry.getKey();
ProduceResponse.PartitionResponse partResp = entry.getValue();
ProducerBatch batch = batches.get(tp);
completeBatch(batch, partResp, correlationId, now, receivedTimeMs + produceResponse.throttleTimeMs());
}
this.sensors.recordLatency(response.destination(), response.requestLatencyMs());

// End : template 回调
// 在 KafkaTemplate.doSend 中设置
C-KafkaTemplate
M- doSend 设置
- producer.send(producerRecord, buildCallback(producerRecord, producer, future));
M- buildCallback
-

// PS : 请求到 Callback 的流转
C- Sender # handleProduceResponse
C- Sender # completeBatch
C- ProduceBatch # done
C- ProduceBatch # completeFutureAndFireCallbacks
C- InterceptorCallback # onCompletion
// 此处就会调用 End : template 回调

Transaction 的管理

TransactionSynchronizationManager

4.2 KafkaListener

kafka Listener 的核心是 KafkaListenerAnnotationBeanPostProcessor , 初始化的方式是基于 BeanPostProcessor 类的postProcessAfterInitialization 来完成

Kafka 的消费过程有以下几个路线 :

  • container 的注册
  • container 的循环监听
  • 消息的消费

KafkaMQ.png

container 注册

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
java复制代码C3- KafkaListenerAnnotationBeanPostProcessor
M3_01- postProcessAfterInitialization
?- 这里来自于Bean 加载的 applyBeanPostProcessorsAfterInitialization 方法 , 处理Bean 的前置扩展
- 获取 Class 级别的 KafkaListener
- 通过 MethodIntrospector 反射获取各方法的 KafkaListener
- 注意 , 这里是一个 set 集合 , 意味着他允许一个方法上面编注多个 @KafkaListener -> PS:M3_01_01
FOR- 循环处理 KafkaListener
- processKafkaListener 循环处理
M3_02- processKafkaListener
- 判断是否为代理方法
- 创建一个 MethodKafkaListenerEndpoint 节点 , 将当前方法设置到 MethodKafkaListenerEndpoint 节点中
- 为节点设置相关的属性
?- Bean / MessageHandlerMethodFactory / Id / GroupId / TopicPartitions
?- Topics / TopicPattern / ClientIdPrefix
- 调用 KafkaListenerEndpointRegistrar 注册当前节点
M3_03- Method checkProxy(Method methodArg, Object bean)
- 获取当前 method
- 返回由AOP代理代理的接口
- 从代理接口中获取对应的 method
M3_04- processListener
- 继续构建 MethodKafkaListenerEndpoint
?- Bean / MessageHandlerMethodFactory / Id / GroupId / TopicPartitions / Topics /TopicPattern / ClientIdPrefix
?- 根据条件处理 concurrency / autoStartup / Group /
- 通过 BeanFactory 构建 KafkaListenerContainerFactory
- 为 MethodKafkaListenerEndpoint 设置属性
?- beanFactory / ErrorHandler
- 调用 registrar.registerEndpoint 注册当前 endpoint
?- 该对象会在 afterSingletonsInstantiated 中被消费
M3_05- afterSingletonsInstantiated
- 为 registrar 注册各种属性
?- BeanFactory / EndpointRegistry / ContainerFactoryBeanName
- 调用 registerAllEndpoints 处理 registrar -> M4_06
M3_07- resolveContainerFactory
- 返回一个 KafkaListenerContainerFactory


// M3_01 核心代码
public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
if (!this.nonAnnotatedClasses.contains(bean.getClass())) {
// 获取 Class 级别的 KafkaListener
Class<?> targetClass = AopUtils.getTargetClass(bean);
Collection<KafkaListener> classLevelListeners = findListenerAnnotations(targetClass);
final boolean hasClassLevelListeners = classLevelListeners.size() > 0;
final List<Method> multiMethods = new ArrayList<>();
// 获取方法级别注解
Map<Method, Set<KafkaListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
(MethodIntrospector.MetadataLookup<Set<KafkaListener>>) method -> {
Set<KafkaListener> listenerMethods = findListenerAnnotations(method);
return (!listenerMethods.isEmpty() ? listenerMethods : null);
});
if (hasClassLevelListeners) {
Set<Method> methodsWithHandler = MethodIntrospector.selectMethods(targetClass,
(ReflectionUtils.MethodFilter) method ->
AnnotationUtils.findAnnotation(method, KafkaHandler.class) != null);
multiMethods.addAll(methodsWithHandler);
}
if (annotatedMethods.isEmpty()) {
this.nonAnnotatedClasses.add(bean.getClass());
}
else {
// Non-empty set of methods
for (Map.Entry<Method, Set<KafkaListener>> entry : annotatedMethods.entrySet()) {
Method method = entry.getKey();
for (KafkaListener listener : entry.getValue()) {
// 核心处理逻辑
processKafkaListener(listener, method, bean, beanName);
}
}
}
if (hasClassLevelListeners) {
processMultiMethodListeners(classLevelListeners, multiMethods, bean, beanName);
}
}
return bean;
}


// M3_03 核心伪代码 :
private Method checkProxy(Method methodArg, Object bean) {
Method method = methodArg;
if (AopUtils.isJdkDynamicProxy(bean)) {
method = bean.getClass().getMethod(method.getName(), method.getParameterTypes());
Class<?>[] proxiedInterfaces = ((Advised) bean).getProxiedInterfaces();
for (Class<? iface : proxiedInterfaces) {
method = iface.getMethod(method.getName(), method.getParameterTypes());
break;
}
}
return method;
}

// 手动注入 Listener
// 无意中在代码中发现了一个 KafkaListenerConfigurer 接口 , 该接口可以用于手动注入 Listener
I- KafkaListenerConfigurer
M- configureKafkaListeners(KafkaListenerEndpointRegistrar registrar)
- 构建了一个 MessageListenerContainer


// M4_01 的触发流程 : afterPropertiesSet , 再往上为 DefaultListableBeanFactory # afterSingletonsInstantiated

C4- KafkaListenerEndpointRegistrar
F4_01- List<KafkaListenerEndpointDescriptor> endpointDescriptors
F4_02- Map<String, MessageListenerContainer> listenerContainers
M4_01- registerEndpoint
- 构建一个 KafkaListenerEndpointDescriptor
- 如果startImmediately 初始化就启动 , synchronized 上锁注册到容器中
- 否则先添加到 List<KafkaListenerEndpointDescriptor> 集合中 -> M4_02
M4_02- registerAllEndpoints
- synchronized 上锁注册到容器中
- 注册容器 -> M4_04
M4_03- resolveContainerFactory : 返回一个 KafkaListenerContainerFactory
M4_04- registerListenerContainer
- 构建一个 MessageListenerContainer , 并且加入 Map<String, MessageListenerContainer>
?- createListenerContainer 时会注册 Method
- 如果包含 Group , 则加入List<MessageListenerContainer> 集合
?- 该集合会在 start(继承于Lifecycle)方法中调用 -> M4_05
M4_05- startIfNecessary
- 调用 MessageListenerContainer start 方法
M4_06- createListenerContainer
-
M4_07- start
FOR- getListenerContainers 获取所有的 MessageListenerContainer , 依次调用 startIfNecessary
?- M4_05


C05- MessageListenerContainer
M5_01- start()
-

C06- AbstractMessageListenerContainer
M6_01- doStart()
- 最终调用 KafkaMessageListenerContainer
M6_02- run()
- 这里通过一个 While 方法进行循环处理 ->
M6_03- pollAndInvoke()
- invokeListener(records) 反射处理

C07- KafkaMessageListenerContainer
M7_01- doStart()
- 准备 ContainerProperties
- 校验 AckMode 模式
- 构建 GenericMessageListener
- 构建 ListenerConsumer
- 修改容器状态
?- 这里和后面的循环监听形成前后关联
- 这里还用了一个 CountDownLatch 来等待执行
- 开始线程的执行 , 这里会执行 listenConsumer 的 run 方法 -> PS:M7_01_02
M7_02- doInvokeOnMessage()
- 调用对应 MessageListener 的 onMessage , 最终调用 M8_1
M7_03- onMessage()
-

// M7_01 代码
protected void doStart() {
if (isRunning()) {
return;
}
if (this.clientIdSuffix == null) { // stand-alone container
checkTopics();
}
// 准备 ContainerProperties
ContainerProperties containerProperties = getContainerProperties();
checkAckMode(containerProperties);

Object messageListener = containerProperties.getMessageListener();
if (containerProperties.getConsumerTaskExecutor() == null) {
SimpleAsyncTaskExecutor consumerExecutor = new SimpleAsyncTaskExecutor(
(getBeanName() == null ? "" : getBeanName()) + "-C-");
containerProperties.setConsumerTaskExecutor(consumerExecutor);
}
GenericMessageListener<?> listener = (GenericMessageListener<?>) messageListener;
ListenerType listenerType = determineListenerType(listener);
this.listenerConsumer = new ListenerConsumer(listener, listenerType);
// 修改容器状态
setRunning(true);
this.startLatch = new CountDownLatch(1);
// 此处开始现场的执行
this.listenerConsumerFuture = containerProperties.getConsumerTaskExecutor()
.submitListenable(this.listenerConsumer);
try {
if (!this.startLatch.await(containerProperties.getConsumerStartTimout().toMillis(), TimeUnit.MILLISECONDS)) {
publishConsumerFailedToStart();
}
}catch (@SuppressWarnings(UNUSED) InterruptedException e) {
Thread.currentThread().interrupt();
}
}


C8- ConcurrentMessageListenerContainer
M8_01- doStart()
- 校验 topic
- 启动所有的 KafkaMessageListenerContainer



C10- MethodKafkaListenerEndpoint
M10_01- createMessageListener
- messageListener.setHandlerMethod 注册 Method

PS:M3_01_01 多KafkaListener 注解

1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码kafkaTemplate.send("start", "one", "are you ok one?" + "----");
kafkaTemplate.send("topic1", "two", "are you ok two?" + "----");


@KafkaListener(id = "one", topics = "start", clientIdPrefix = "myClientId")
@KafkaListener(id = "two", topics = "start", clientIdPrefix = "myClientId")
public void listener0(ConsumerRecord<?, ?> record) {
logger.info("------> this is in listerner 0:{}<-------", record.value());
}

// 结果
2021-05-04 22:04:35.094 INFO 20012 --- [ two-0-C-1] c.g.k.demo.service.KafkaConsumerService : ------> this is in listerner 0:are you ok two?----<-------
2021-05-04 22:04:35.098 INFO 20012 --- [ one-0-C-1] c.g.k.demo.service.KafkaConsumerService : ------> this is in listerner 0:are you ok one?----<-------

image.png

PS:M7_01_02 执行线程

image.png

container 的循环监听

主要逻辑在 KafkaMessageListenerContainer 内部类 ListenerConsumer 中

  • C- ConsumerListener # run
  • C- KafkaMessageListenerContainer # pollAndInvoke
  • C- KafkaMessageListenerContainer # invokeOnMessage
  • C- KafkaMessageListenerContainer # doInvokeOnMessage
    • 该类中调用对应的 MessagingMessageListenerAdapter 进行最终执行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
java复制代码
// Step 1 : 起点
C- KafkaMessageListenerContainer.ListenerConsumer
public void run() {
publishConsumerStartingEvent();
this.consumerThread = Thread.currentThread();
if (this.consumerSeekAwareListener != null) {
this.consumerSeekAwareListener.registerSeekCallback(this);
}
KafkaUtils.setConsumerGroupId(this.consumerGroupId);
this.count = 0;
this.last = System.currentTimeMillis();
initAssignedPartitions();
publishConsumerStartedEvent();
while (isRunning()) {
try {
// 执行 invoke 逻辑
pollAndInvoke();
}catch (@SuppressWarnings(UNUSED) WakeupException e) {
// ... 省略异常处理
}
}
wrapUp();
}

// Step 2 : 循环中 poll 对象
C- ListenerConsumer
M- pollAndInvoke() : 核心方法 , poll 获取并且映射到相关方法
- ConsumerRecords<K, V> records = doPoll()
|- this.consumer.poll(this.pollTimeout)
- invokeListener(records)
M- doPoll


// Step 3 :在 doInvokeWithRecords 中 , 会对消息进行迭代处理
Iterator<ConsumerRecord<K, V>> iterator = records.iterator();
while (iterator.hasNext()) {
doInvokeRecordListener(record, null, iterator)
}


// Step 4 : 在 doInvokeOnMessage 中 , 调用对应的 ListenerAdapter 完成消息处理
// PS : 该 在 KafkaMessageListenerContainer 中通过构造器生成
C- KafkaMessageListenerContainer.ListenerConsumer

// 通过四种监听类型 , 选择处理的 listener
// ACKNOWLEDGING_CONSUMER_AWARE,CONSUMER_AWARE,ACKNOWLEDGING,SIMPLE

最终执行处理

最终会从 MessagingMessageListenerAdapter 中获取 执行的 Handler ,该 Handler 在 KafkaListenerEndpointRegistrar 是注入

1
2
3
4
5
6
7
8
9
java复制代码 // 最终执行类

C08- RecordMessagingMessageListenerAdapter
M8_1- onMessage
- 获取对应的 Method . 反射调用
?- 该 HandlerAdapter 在 MethodKafkaListenerEndpoint # createMessageListener 是处理

C10- RecordMessagingMessageListenerAdapter
M10_1- onMessage

五 . Kafka 要点深入

TODO : 后续文章一个个节点分析 Kafka 如果实现相关功能

FAQ

java.nio.file.FileSystemException -> 另一个程序正在使用此文件,进程无法访问

问题详情 : D:\tmp\kafka-logs\topic_1-0\00000000000000000000.timeindex.deleted: 另一个程序正在使用此文件,进程无法访问

解决方案 : 手动删除\kafka-logs里的日志文件重启kafka

总结

这篇文章主要说了2部分 , 发送和监听 , 发送主要基于 client.poll 发送 , 监听核心主要是扫描和 while 循环 consumer poll 拉取 .

了解这一段流程后 ,后续就可以开始详细看看其在集群等更多功能下 , 如何操作和定制

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%