这是我参与11月更文挑战的第17天,活动详情查看:2021最后一次更文挑战
在后端开发中经常会涉及到类似「拦截器」或者「过滤器」的组件,以 Spring Interceptor 为例,它的工作原理就是在每一次请求之前及之后处理一部分逻辑,这样可以让应用程序在不影响业务逻辑的前提下,实现一个可插拔的事件处理链。
Kafka 拦截器介绍
在 Kafka 中,也有相似的特性,也就是 Kafka 的拦截器。
Kafka 中的拦截器有两种,分别是生产者拦截器和消费者拦截器。生产者拦截器可以在消息发送之前以及消息成功提交之后插入逻辑;而消费者拦截器可以在消息消费之前以及提交位移后插入逻辑。
使用方法
在 Kafka 的生产者端和消费者端,都有一个配置项 interceptor.classes
用来配置拦截器,从配置名称可以看出,拦截器是通过一个类来实现的,并且可以配置多个拦截器。注意,这里要配置拦截器类的全限定名。
之前写到生产者分区策略的时候写过,可以通过实现一个 Kafka 提供的接口及其方法来自定义分区策略,拦截器的实现也一样,通过实现 org.apache.kafka.clients.producer.ProducerInterceptor
或者 org.apache.kafka.clients.consumer.ConsumerInterceptor
接口及其方法,创建拦截器类,并配置到 interceptor.classes
即可。
在 ProducerInterceptor
接口中有两个方法:
onSend
会在消息发送之前调用,再次可以加入修改消息内容的逻辑。onAcknowledgement
会在消息提交成功或者发送失败的时候调用,如果发送消息使用了带有callback
参数的方法,那么此调用在callback
之前。
需要注意的是,以上的两个方法并不一定是在同一线程中调用,且要注意如果这里的逻辑过复杂,会影响到生产者端的消息处理效率。
在 ConsumerInterceptor
接口中同样有两个方法:
onConsume
在消费者端正式处理消息之前被调用。onCommit
在提交位移之后调用。
本文转载自: 掘金