总文档 :文章目录
Github : github.com/black-ant
一 . 前言
之前对RabbitMQ 进行了简单的分析 , 这一篇来过一下 Kafka 的相关操作.
二 . 基础使用
kafka 是一个很常用的高性能消息队列 , 先看一下基础的使用
Maven 核心依赖
1 | xml复制代码<dependency> |
配置信息
1 | properties复制代码spring.kafka.bootstrap-servers=127.0.0.1:9092 |
生产消息
1 | java复制代码 @Resource |
消费消息
1 | java复制代码@KafkaListener(id = "one", topics = "start", clientIdPrefix = "myClientId") |
总结
- KafkaTemplate 发布消息
- KafkaListener 监听消息
整体来说 , 还是和之前的 Rabbit 使用一致 , 具体的内部使用 , 后续源码中再详细看
三 . 基础知识点
基础成员参考消息队列文档.
四 . 工具源码一览
我们围绕 KafkaTemplate 和KafkaListener 2个类进行分析.
4.1 KafkaTemplate
发送的起点
可以看到 , 其中是通过 Future 获取返回结果
1 | java复制代码protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord) { |
消息的创建
主要逻辑还是通过 org.apache.kafka.clients.producer.Producer 对象来完成发送 , 这是一个 Kafka 原生包 , 属于kafka-client .
1 | java复制代码// 第一步 : 消息的构建 |
PS:M_01_01 ProducerRecord 对象的作用
作用: 每一个 ProducerRecord 都是一个 消息 , 该对象中包含 topic ,partition , headers 用于映射发送的逻辑
1 | java复制代码public class ProducerRecord<K, V> { |
PS:M_02_02 Sender 对象
作用: 核心发送对象 , 用于消息发送 , 用于集群处理
所属包: org.apache.kafka.clients.producer.internals
1 | java复制代码TODO |
PS:M_02_03 生成 partition 核心逻辑
1 | java复制代码 private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) { |
PS:M_02_03 InterceptorCallback 作用
消息的发送
整体来说消息的发送存在3条线 :
- 线路一 : 项目启动时 , KafkaMessageListenerContainer run 开始循环
- 调用 pollSelectionKeys 完成 channel write 流程
- 线路二 : 设置 send
- Sender # runOnce sendProducerData(currentTimeMs) 中设置 一个 ClientRequest
- 线路三 : 获取 Send 发送
- Sender # runOnce client.poll(pollTimeout, currentTimeMs) 中 发起 poll 执行
1 | java复制代码 |
消息的回调操作
回调是基于 Sender # handleProduceResponse 发起 , 该方法在 sendProduceRequest 方法中设置
1 | java复制代码 |
Transaction 的管理
TransactionSynchronizationManager
4.2 KafkaListener
kafka Listener 的核心是 KafkaListenerAnnotationBeanPostProcessor , 初始化的方式是基于 BeanPostProcessor 类的postProcessAfterInitialization 来完成
Kafka 的消费过程有以下几个路线 :
- container 的注册
- container 的循环监听
- 消息的消费
container 注册
1 | java复制代码C3- KafkaListenerAnnotationBeanPostProcessor |
PS:M3_01_01 多KafkaListener 注解
1 | java复制代码kafkaTemplate.send("start", "one", "are you ok one?" + "----"); |
PS:M7_01_02 执行线程
container 的循环监听
主要逻辑在 KafkaMessageListenerContainer 内部类 ListenerConsumer 中
- C- ConsumerListener # run
- C- KafkaMessageListenerContainer # pollAndInvoke
- C- KafkaMessageListenerContainer # invokeOnMessage
- C- KafkaMessageListenerContainer # doInvokeOnMessage
- 该类中调用对应的 MessagingMessageListenerAdapter 进行最终执行
1 | java复制代码 |
最终执行处理
最终会从 MessagingMessageListenerAdapter 中获取 执行的 Handler ,该 Handler 在 KafkaListenerEndpointRegistrar 是注入
1 | java复制代码 // 最终执行类 |
五 . Kafka 要点深入
TODO : 后续文章一个个节点分析 Kafka 如果实现相关功能
FAQ
java.nio.file.FileSystemException -> 另一个程序正在使用此文件,进程无法访问
问题详情 : D:\tmp\kafka-logs\topic_1-0\00000000000000000000.timeindex.deleted: 另一个程序正在使用此文件,进程无法访问
解决方案 : 手动删除\kafka-logs里的日志文件重启kafka
总结
这篇文章主要说了2部分 , 发送和监听 , 发送主要基于 client.poll 发送 , 监听核心主要是扫描和 while 循环 consumer poll 拉取 .
了解这一段流程后 ,后续就可以开始详细看看其在集群等更多功能下 , 如何操作和定制
本文转载自: 掘金