Spring Boot integration with a local Kafka

Without further ado, let's get started.

I. Install and run Kafka locally

First, open the official Kafka download page at kafka.apache.org/downloads and download Kafka.
As for the version, a release one step ahead of your Spring Boot line generally works: for example, I use Spring Boot 2.2.2, so I downloaded Kafka 2.3.1 and use the matching spring-kafka 2.3.4. (Do not download the archive with "src" in its name; that is the source release, not the runnable binaries.)

Once the download finishes, extract the archive and open a cmd window in the Kafka directory; my Kafka path, for example, is D:\devtools\kafka_2.12-2.3.1.

Run the following command to start ZooKeeper: .\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties

Once ZooKeeper is running, open a second cmd window at the same path (the ZooKeeper window must stay open) and run: .\bin\windows\kafka-server-start.bat .\config\server.properties

At this point the Kafka broker is up and running.
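
To confirm the broker is actually reachable before writing any code, you can list its topics from a third cmd window; in Kafka 2.3 the topics tool accepts --bootstrap-server: .\bin\windows\kafka-topics.bat --bootstrap-server localhost:9092 --list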

II. Integrating Kafka with Spring Boot

1. POM dependencies

```xml
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <!-- fastjson is not managed by Spring Boot's BOM, so a version must be
             supplied unless one is inherited from elsewhere in your build -->
        <version>1.2.62</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
</dependencies>
```

Here spring-kafka resolves to 2.3.4.RELEASE by default, and the Spring Boot version is 2.2.2.RELEASE.
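
If your build resolves a different spring-kafka version, it can be pinned explicitly. A minimal sketch, assuming the project inherits from spring-boot-starter-parent, whose dependency management reads the spring-kafka.version property:

```xml
<properties>
    <!-- Pin spring-kafka to the release paired with the local 2.3.1 broker -->
    <spring-kafka.version>2.3.4.RELEASE</spring-kafka.version>
</properties>
```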


2. Configuration and code

There are two ways to configure it: in the Spring Boot application configuration file, or in Java code. Choose either one.

(1) Configuration-file approach

```yaml
# Application port
server:
  port: 28099
# Kafka configuration
spring:
  kafka:
    # Kafka broker address(es); several brokers may be listed
    bootstrap-servers: 127.0.0.1:9092

    producer:
      retries: 0
      # Maximum size in bytes of a batch of messages sent in one request
      batch-size: 16384
      # Total memory available for buffering
      buffer-memory: 33554432
      # Serializers for the message key and value
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      # Default consumer group id
      group-id: consumer-tutorial
      auto-commit-interval: 100
      auto-offset-reset: earliest
      enable-auto-commit: true
      # Deserializers for the message key and value
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    # Listener container settings; concurrency is the number of listener threads
    listener:
      missing-topics-fatal: false
      concurrency: 3
```
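
With just this file-based configuration, Spring Boot's Kafka auto-configuration supplies the KafkaTemplate and listener container factory for you, so no extra @Configuration class is needed; the producer and consumer shown further below then work as-is.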

(2) Java configuration

```java
package com.xuegao.kafkaproject.config;

import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.*;

import java.util.HashMap;
import java.util.Map;

/**
 * @author xg
 * @Date: 2021/10/21 19:22
 */
@Configuration
@EnableKafka
public class KafkaConfig {

    @Bean
    ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "default-topic-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<String, Object>(producerFactory());
    }

    // NewTopic beans are picked up by Spring Boot's auto-configured KafkaAdmin
    // and created on startup.
    @Bean
    public NewTopic initialSendMsgTopic() {
        // The replication factor must not exceed the broker count; with a single
        // local broker, use 1 (the original used 2, which fails on one broker).
        return new NewTopic("kafka.topic", 8, (short) 1);
    }

    @Bean
    public NewTopic topic1() {
        return TopicBuilder.name("my-topic")
                .partitions(10)
                // Single local broker, so 1 replica (the original used 3)
                .replicas(1)
                .compact()
                .build();
    }
}
```

Note: with the Java configuration, every topic you use must be registered as a NewTopic bean as shown above; otherwise the application throws an error at runtime. The configuration-file approach sets missing-topics-fatal: false, so topics that are not registered as beans do not cause an error there.
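
To make the Java route equally tolerant, the same switch can be flipped on the container factory. A minimal sketch, assuming spring-kafka 2.3.x, where the container properties expose setMissingTopicsFatal; the line would go inside the kafkaListenerContainerFactory() bean above:

```java
// Mirror the YAML's missing-topics-fatal: false: the container then tolerates
// listeners on topics that are not registered as NewTopic beans.
factory.getContainerProperties().setMissingTopicsFatal(false);
```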

Producer and consumer methods

The producer wraps KafkaTemplate. The original showed only the field and the send method; the class skeleton, package name, and imports below are filled in so the snippet compiles:

```java
package com.xuegao.kafkaproject.producer; // package name assumed for this excerpt

import com.alibaba.fastjson.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

import javax.annotation.Resource;

@Component
public class KafkaProducer<T> {

    private static final Logger logger = LoggerFactory.getLogger(KafkaProducer.class);

    @Resource
    private KafkaTemplate<String, Object> kafkaTemplate;

    /**
     * Send a message to Kafka.
     *
     * @param obj the message object
     */
    public void send(T obj) {
        String jsonObj = JSON.toJSONString(obj);
        logger.info("------------ message = {}", jsonObj);

        // Send asynchronously; the callback logs success or failure
        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send("kafka.topic", jsonObj);
        future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onFailure(Throwable throwable) {
                logger.info("Produce: The message failed to be sent:" + throwable.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, Object> stringObjectSendResult) {
                // TODO business handling
                logger.info("Produce: The message was sent successfully:");
                logger.info("Produce: _+_+_+_+_+_+_+ result: " + stringObjectSendResult.toString());
            }
        });
    }
}
```
And the consumer, likewise wrapped in an assumed component class so it runs as-is (the original javadoc said "kafka.tut" but the listener actually subscribes to kafka.topic):

```java
package com.xuegao.kafkaproject.consumer; // package name assumed for this excerpt

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

import java.util.Optional;

@Component
public class KafkaConsumer {

    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);

    /**
     * Listen on the kafka.topic topic; no other business logic.
     *
     * @param record the consumed record
     * @param topic  the topic the record was received from
     */
    @KafkaListener(id = "tutest", topics = "kafka.topic")
    public void listen(ConsumerRecord<?, ?> record, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());

        if (kafkaMessage.isPresent()) {
            Object message = kafkaMessage.get();

            logger.info("Receive: +++++++++++++++ Topic:" + topic);
            logger.info("Receive: +++++++++++++++ Record:" + record);
            logger.info("Receive: +++++++++++++++ Message:" + message);
        }
    }
}
```
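
You can also sanity-check the listener without any web layer: with the application running, type a line into the console producer and it should show up in the application log. From the Kafka directory: .\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic kafka.topic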

Finally, write a small test controller and call the producer method by hitting the endpoint from a browser; once the request succeeds, the consumer's log output appears in the console. A sketch of such a controller follows.
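
A minimal sketch of the test controller; the class name, package, and URL path are assumptions for illustration, and KafkaProducer refers to the producer component shown above:

```java
package com.xuegao.kafkaproject.controller;

import com.xuegao.kafkaproject.producer.KafkaProducer;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

@RestController
public class KafkaTestController {

    // The producer component defined earlier
    @Resource
    private KafkaProducer<String> kafkaProducer;

    // Visiting http://localhost:28099/kafka/send?msg=hello triggers a send;
    // the consumer's log lines should then appear in the console.
    @GetMapping("/kafka/send")
    public String send(@RequestParam(defaultValue = "hello kafka") String msg) {
        kafkaProducer.send(msg);
        return "message sent: " + msg;
    }
}
```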

Good luck, and if something throws an exception, a quick web search for the error message will usually sort it out.

Reposted from: Juejin (掘金)
