你可能不怎么知道的 Kafka 拦截器

这是我参与11月更文挑战的第17天,活动详情查看:2021最后一次更文挑战

在后端开发中经常会涉及到类似「拦截器」或者「过滤器」的组件,以 Spring Interceptor 为例,它的工作原理就是在每一次请求之前及之后处理一部分逻辑,这样可以让应用程序在不影响业务逻辑的前提下,实现一个可插拔的事件处理链。

Kafka 拦截器介绍

在 Kafka 中,也有相似的特性,也就是 Kafka 的拦截器。

Kafka 中的拦截器有两种,分别是生产者拦截器和消费者拦截器。生产者拦截器可以在消息发送之前以及消息成功提交之后插入逻辑;而消费者拦截器可以在消息消费之前以及提交位移后插入逻辑。

使用方法

在 Kafka 的生产者端和消费者端,都有一个配置项 interceptor.classes 用来配置拦截器,从配置名称可以看出,拦截器是通过一个类来实现的,并且可以配置多个拦截器。注意,这里要配置拦截器类的全限定名。

之前写到生产者分区策略的时候写过,可以通过实现一个 Kafka 提供的接口及其方法来自定义分区策略,拦截器的实现也一样,通过实现 org.apache.kafka.clients.producer.ProducerInterceptor 或者 org.apache.kafka.clients.consumer.ConsumerInterceptor 接口及其方法,创建拦截器类,并配置到 interceptor.classes 即可。

ProducerInterceptor 接口中有两个方法:

  • onSend 会在消息发送之前调用,再次可以加入修改消息内容的逻辑。
  • onAcknowledgement 会在消息提交成功或者发送失败的时候调用,如果发送消息使用了带有 callback 参数的方法,那么此调用在 callback 之前。

需要注意的是,以上的两个方法并不一定是在同一线程中调用,且要注意如果这里的逻辑过复杂,会影响到生产者端的消息处理效率。

ConsumerInterceptor 接口中同样有两个方法:

  • onConsume 在消费者端正式处理消息之前被调用。
  • onCommit 在提交位移之后调用。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%