『十倍程序员』SpringBoot接入kafka最简教程

「这是我参与11月更文挑战的第24天,活动详情查看:2021最后一次更文挑战」。

点赞再看,养成习惯👏👏

前言

Hello 大家好,我是l拉不拉米,今天的『十倍程序员』系列给大家分享SpringBoot项目如何快速接入kafka

上集回顾

上集《『十倍程序员』Docker部署kafka+zookeeper》中,我们已经学会了通过docker-compose将kafka环境搭建起来,现在我们要通过Springboot的项目接入kafka,实现消息的生产和消费。

Springboot接入kafka

1、创建Springboot工程

image.png

2、Pom文件引入依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
xml复制代码<!-- 必须 -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>

<!-- 下面三个依赖包非必须,方便演示用 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.75</version>
</dependency>

3、application.yml配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
yml复制代码# 应用服务 WEB 访问端口
server:
port: 8080

# 应用名称
spring:
application:
name: kafka
kafka:
bootstrap-servers: 127.0.0.1:9092 # kafka集群信息,多个节点通过“,”分隔
producer: # 生产者配置
retries: 3 # 设置大于0的值,则客户端会将发送失败的记录重新发送
batch-size: 16384 # 16K
buffer-memory: 33554432 # 32M 缓冲区大小
acks: 1 # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
# 指定消息key和消息体的编解码方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
consumer: # 消费者配置
group-id: testGroup # 消费者组
enable-auto-commit: false # 关闭自动提交
# 当kafka中没有初始offset或offset超出范围时将自动重置offset
# earliest:重置为分区中最小的offset;
# latest:重置为分区中最新的offset(消费分区中新产生的数据);
# none:只要有一个分区不存在已提交的offset,就抛出异常;
auto-offset-reset: latest
max-poll-records: 50 # 批量消费每次最多消费多少条消息
# 指定消息key和消息体的编解码方式
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
listener:
missing-topics-fatal: false # 消费端监听的topic不存在时,项目启动会报错(关掉)
type: batch # 批量消费

4、生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
java复制代码@Slf4j
@RestController
public class KafkaProducer {

private final KafkaTemplate<String, Object> kafkaTemplate;

@Autowired
public KafkaProducer(KafkaTemplate<String, Object> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}

@PostMapping("/message")
public void sendMessage(@RequestBody User user) {
kafkaTemplate.send("user", user).addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
@Override
public void onFailure(Throwable throwable) {
log.info("sendMessage onFailure:{}", throwable.getMessage());
}

@Override
public void onSuccess(SendResult<String, Object> result) {
log.info("sendMessage onSuccess:{},{},{}", result.getRecordMetadata().topic(), result.getRecordMetadata().partition(), result.getRecordMetadata().offset());
}
});
}
}

5、消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
java复制代码@Slf4j
@Component
public class KafkaConsumer {

@KafkaListener(topics = {"user"})
public void userListener(ConsumerRecords<String, String> payload) {
if (payload.isEmpty()) {
return;
}

payload.partitions().forEach(tp -> {
List<ConsumerRecord<String, String>> tpRecords = payload.records(tp);
tpRecords.forEach(record -> {
final User user = JSON.parseObject(record.value(), User.class);
log.info("userListener -> user = {}", user);
});
});
}
}

6、Postman调用接口生产消息

image.png

查看日志

image.png

接入完毕。

总结

通过这两篇文章,我们发现通过docker容器化和Springboot的高效化,简单化接入kafka的能力,整个开发过程不到半个小时。对比传统方式需要大半天的时间,可以说实现了『十倍程序员』的水平。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%