从零开始搭建Kafka+SpringBoot分布式消息系统

前言

由于kafka强依赖于zookeeper,所以需先搭建好zookeeper集群。由于zookeeper是由java编写的,需运行在jvm上,所以首先应具备java环境。
(ps:默认您的centos系统可联网,本教程就不教配置ip什么的了)
(ps2:没有wget的先装一下:yum install wget)
(ps3:人啊,就是要条理。东边放一点,西边放一点,过段时间就不知道自己装在哪里了。本教程所有下载均放在/usr/local目录下)
(ps4:kafka可能有内置zookeeper,感觉可以越过zookeeper教程,但是这里也配置出来了。我没试过)

文章首发公众号:Java架构师联盟,每日更新技术好文

一、配置jdk

因为oracle 公司不允许直接通过wget 下载官网上的jdk包。所以你直接wget以下地址下载下来的是一个只有5k的网页文件而已,并不是需要的jdk包。(垄断地位就是任性)。
(请通过java -version判断是否自带jdk,我的没带)

1、官网下载

下面是jdk8的官方下载地址:

1
ruby复制代码https://www.oracle.com/technetwork/java/javase/downloads/java-archive-javase8u211-later-5573849.html

从零开始搭建Kafka+SpringBoot分布式消息系统

2、上传解压

这里通过xftp上传到服务器指定位置:/usr/local

从零开始搭建Kafka+SpringBoot分布式消息系统

对压缩文件进行解压:

1
复制代码tar -zxvf jdk-8u221-linux-x64.tar.gz

对解压后的文件夹进行改名:

1
bash复制代码mv jdk1.8.0_221 jdk1.8

3、配置环境变量

1
2
3
4
5
bash复制代码vim /etc/profile
#java environment
export JAVA_HOME=/usr/local/jdk1.8
export CLASSPATH=.:${JAVA_HOME}/jre/lib/rt.jar:${JAVA_HOME}/lib/dt.jar:${JAVA_HOME}/lib/tools.jar
export PATH=$PATH:${JAVA_HOME}/bin

操作之后的界面如下:

从零开始搭建Kafka+SpringBoot分布式消息系统

运行命令使环境生效

1
bash复制代码source /etc/profile

从零开始搭建Kafka+SpringBoot分布式消息系统

二、搭建zookeeper集群

1、下载zookeeper

创建zookeeper目录,在该目录下进行下载:

1
bash复制代码mkdir /usr/local/zookeeper

这一步如果出现连接被拒绝时可多试几次,我就是第二次请求才成功的。

1
bash复制代码wget http://archive.apache.org/dist/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz

从零开始搭建Kafka+SpringBoot分布式消息系统

等待下载完成之后解压:

1
复制代码tar -zxvf zookeeper-3.4.6.tar.gz

从零开始搭建Kafka+SpringBoot分布式消息系统

重命名为zookeeper1

1
2
3
bash复制代码mv zookeeper-3.4.6 zookeeper1
cp -r zookeeper1 zookeeper2
cp -r zookeeper1 zookeeper3

2、创建data、logs文件夹

在zookeeper1目录下创建

从零开始搭建Kafka+SpringBoot分布式消息系统

在data目录下新建myid文件。内容为1

从零开始搭建Kafka+SpringBoot分布式消息系统

从零开始搭建Kafka+SpringBoot分布式消息系统

3、修改zoo.cfg文件

1
2
bash复制代码cd /usr/local/zookeeper/zookeeper1/conf/
cp zoo_sample.cfg zoo.cfg

进行过上面两步之后,有zoo.cfg文件了,现在修改内容为:

从零开始搭建Kafka+SpringBoot分布式消息系统

1
2
3
4
5
ini复制代码dataDir=/usr/local/zookeeper/zookeeper1/data
dataLogDir=/usr/local/zookeeper/zookeeper1/logs
server.1=192.168.233.11:2888:3888
server.2=192.168.233.11:2889:3889
server.3=192.168.233.11:2890:3890

4、搭建zookeeper2

首先,复制改名。

1
2
bash复制代码cd /usr/local/zookeeper/
cp -r zookeeper1 zookeeper2

然后修改具体的某些配置:

1
bash复制代码vim zookeeper2/conf/zoo.cfg

将下图三个地方1改成2

从零开始搭建Kafka+SpringBoot分布式消息系统

1
bash复制代码vim zookeeper2/data/myid

同时将myid中的值改成2

从零开始搭建Kafka+SpringBoot分布式消息系统

5、搭建zookeeper3

同上,复制改名

1
bash复制代码cp -r zookeeper1 zookeeper3

从零开始搭建Kafka+SpringBoot分布式消息系统

1
bash复制代码vim zookeeper3/conf/zoo.cfg

修改为3

从零开始搭建Kafka+SpringBoot分布式消息系统

1
bash复制代码vim zookeeper3/data/myid

修改为3

从零开始搭建Kafka+SpringBoot分布式消息系统

6、测试zookeeper集群

1
bash复制代码cd /usr/local/zookeeper/zookeeper1/bin/

由于启动所需代码比较多,这里简单写了一个启动脚本:

1
sql复制代码vim start

start的内容如下

1
2
3
4
5
6
bash复制代码cd /usr/local/zookeeper/zookeeper1/bin/
./zkServer.sh start ../conf/zoo.cfg
cd /usr/local/zookeeper/zookeeper2/bin/
./zkServer.sh start ../conf/zoo.cfg
cd /usr/local/zookeeper/zookeeper3/bin/
./zkServer.sh start ../conf/zoo.cfg

下面是连接脚本:

1
复制代码vim login

login内容如下:

1
bash复制代码./zkCli.sh -server 192.168.233.11:2181,192.168.233.11:2182,192.168.233.11:2183

脚本编写完成,接下来启动:

1
2
sql复制代码sh start
sh login

启动集群成功,如下图:

从零开始搭建Kafka+SpringBoot分布式消息系统

这里zookeeper就告一段落了,由于zookeeper占用着输入窗口,这里可以在xshell右键标签,新建ssh渠道。然后就可以在新窗口继续操作kafka了!

从零开始搭建Kafka+SpringBoot分布式消息系统

三、搭建kafka集群

1、下载kafka

首先创建kafka目录:

1
bash复制代码mkdir /usr/local/kafka

然后在该目录下载

1
2
bash复制代码cd /usr/local/kafka/
wget https://archive.apache.org/dist/kafka/1.1.0/kafka_2.11-1.1.0.tgz

下载成功之后解压:

1
复制代码tar -zxvf kafka_2.11-1.1.0.tgz

2、修改集群配置

首先进入conf目录下:

1
bash复制代码cd /usr/local/kafka/kafka_2.11-1.1.0/config

修改server.properties
修改内容:

1
2
3
ini复制代码broker.id=0
log.dirs=/tmp/kafka-logs
listeners=PLAINTEXT://192.168.233.11:9092

复制两份server.properties

1
2
matlab复制代码cp server.properties server2.properties
cp server.properties server3.properties

修改server2.properties

1
matlab复制代码vim server2.properties

修改主要内容为:

1
2
3
ini复制代码broker.id=1
log.dirs=/tmp/kafka-logs1
listeners=PLAINTEXT://192.168.233.11:9093

如上,修改server3.properties
修改内容为:

1
2
3
ini复制代码broker.id=2
log.dirs=/tmp/kafka-logs2
listeners=PLAINTEXT://192.168.233.11:9094

3、启动kafka

这里还是在bin目录编写一个脚本:

1
2
bash复制代码cd ../bin/
vim start

脚本内容为:

1
2
3
bash复制代码./kafka-server-start.sh ../config/server.properties &
./kafka-server-start.sh ../config/server2.properties &
./kafka-server-start.sh ../config/server3.properties &

通过jps命令可以查看到,共启动了3个kafka。

从零开始搭建Kafka+SpringBoot分布式消息系统

4、创建Topic

1
2
bash复制代码cd /usr/local/kafka/kafka_2.11-1.1.0
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

从零开始搭建Kafka+SpringBoot分布式消息系统

kafka打印了几条日志

从零开始搭建Kafka+SpringBoot分布式消息系统

在启动的zookeeper中可以通过命令查询到这条topic!

1
bash复制代码ls /brokers/topics

从零开始搭建Kafka+SpringBoot分布式消息系统

查看kafka状态

1
css复制代码bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic

从零开始搭建Kafka+SpringBoot分布式消息系统

可以看到此时有三个节点 1 , 2 , 0

Leader 是1 ,
因为分区只有一个 所以在0上面,
Replicas:主从备份是 1,2,0,
ISR(in-sync):现在存活的信息也是 1,2,0

5、启动生产者

1
bash复制代码bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic

由于不能按删除,不能按左右键去调整,所以语句有些乱啊。em…

从零开始搭建Kafka+SpringBoot分布式消息系统

6、启动消费者

1
javascript复制代码bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic

可以看出,启动消费者之后就会自动消费。

从零开始搭建Kafka+SpringBoot分布式消息系统

在生产者又造了一条。

从零开始搭建Kafka+SpringBoot分布式消息系统

消费者自动捕获成功!

从零开始搭建Kafka+SpringBoot分布式消息系统

四、集成springboot

先贴一张kafka兼容性目录:

从零开始搭建Kafka+SpringBoot分布式消息系统

不满足的话启动springboot的时候会抛异常的!!!ps:该走的岔路我都走了o(╥﹏╥)o
(我的kafka-clients是1.1.0,spring-kafka是2.2.2,中间那列暂时不用管)

从零开始搭建Kafka+SpringBoot分布式消息系统

回归正题,搞了两个小时,终于搞好了,想哭…
遇到的问题基本就是jar版本不匹配。
上面的步骤我也都会相应的去修改,争取大家按照本教程一遍过!!!

1、pom文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
xml复制代码<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.1.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.gzky</groupId>
<artifactId>study</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>study</name>
<description>Demo project for Spring Boot</description>

<properties>
<java.version>1.8</java.version>
</properties>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-redis</artifactId>
<version>1.3.8.RELEASE</version>
</dependency>

<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
</dependency>

<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.2.0.RELEASE</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>

</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>

</project>

pom文件中,重点是下面这两个版本。

1
2
3
4
5
6
7
8
9
10
11
xml复制代码<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.1.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.2.0.RELEASE</version>
</dependency>

2、application.yml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
yaml复制代码spring:
redis:
cluster:
#设置key的生存时间,当key过期时,它会被自动删除;
expire-seconds: 120
#设置命令的执行时间,如果超过这个时间,则报错;
command-timeout: 5000
#设置redis集群的节点信息,其中namenode为域名解析,通过解析域名来获取相应的地址;
nodes: 192.168.233.11:9001,192.168.233.11:9002,192.168.233.11:9003,192.168.233.11:9004,192.168.233.11:9005,192.168.233.11:9006
kafka:
# 指定kafka 代理地址,可以多个
bootstrap-servers: 192.168.233.11:9092,192.168.233.11:9093,192.168.233.11:9094
producer:
retries: 0
# 每次批量发送消息的数量
batch-size: 16384
buffer-memory: 33554432
# 指定消息key和消息体的编解码方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
# 指定默认消费者group id
group-id: test-group
auto-offset-reset: earliest
enable-auto-commit: true
auto-commit-interval: 100
# 指定消息key和消息体的编解码方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer

server:
port: 8085
servlet:
#context-path: /redis
context-path: /kafka

没有配置Redis的可以把Redis部分删掉,也就是下图:
想学习配置Redis集群的可以参考:《Redis集群redis-cluster的搭建及集成springboot》

从零开始搭建Kafka+SpringBoot分布式消息系统

3、生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
typescript复制代码package com.gzky.study.utils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

/**
* kafka生产者工具类
*
* @author biws
* @date 2019/12/17
**/
@Component
public class KfkaProducer {

private static Logger logger = LoggerFactory.getLogger(KfkaProducer.class);

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

/**
* 生产数据
* @param str 具体数据
*/
public void send(String str) {
logger.info("生产数据:" + str);
kafkaTemplate.send("testTopic", str);
}
}

4、消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
kotlin复制代码package com.gzky.study.utils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
* kafka消费者监听消息
*
* @author biws
* @date 2019/12/17
**/
@Component
public class KafkaConsumerListener {

private static Logger logger = LoggerFactory.getLogger(KafkaConsumerListener.class);

@KafkaListener(topics = "testTopic")
public void onMessage(String str){
//insert(str);//这里为插入数据库代码
logger.info("监听到:" + str);
System.out.println("监听到:" + str);
}

}

5、对外接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
less复制代码package com.gzky.study.controller;

import com.gzky.study.utils.KfkaProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

/**
* kafka对外接口
*
* @author biws
* @date 2019/12/17
**/
@RestController
public class KafkaController {

@Autowired
KfkaProducer kfkaProducer;

/**
* 生产消息
* @param str
* @return
*/
@RequestMapping(value = "/sendKafkaWithTestTopic",method = RequestMethod.GET)
@ResponseBody
public boolean sendTopic(@RequestParam String str){
kfkaProducer.send(str);
return true;
}
}

6、postman测试

这里首先应该在服务器启动监听器(kafka根目录),下面命令必须是具体的服务器ip,不能是localhost,是我踩过的坑:

推荐此处重启一下集群
关闭kafka命令:

1
2
3
4
bash复制代码cd /usr/local/kafka/kafka_2.11-1.1.0/bin
./kafka-server-stop.sh ../config/server.properties &
./kafka-server-stop.sh ../config/server2.properties &
./kafka-server-stop.sh ../config/server3.properties &

此处应该jps看一下,等待所有的kafka都关闭(关不掉的kill掉),再重新启动kafka:

1
2
3
bash复制代码./kafka-server-start.sh ../config/server.properties &
./kafka-server-start.sh ../config/server2.properties &
./kafka-server-start.sh ../config/server3.properties &

等待kafka启动成功后,启动消费者监听端口:

1
2
bash复制代码cd /usr/local/kafka/kafka_2.11-1.1.0
bin/kafka-console-consumer.sh --bootstrap-server 192.168.233.11:9092 --from-beginning --topic testTopic

从零开始搭建Kafka+SpringBoot分布式消息系统

曾经我乱输的测试信息全部被监听过来了!

启动springboot服务

从零开始搭建Kafka+SpringBoot分布式消息系统

然后用postman生产消息:

从零开始搭建Kafka+SpringBoot分布式消息系统

然后享受成果,服务器端监听成功。

从零开始搭建Kafka+SpringBoot分布式消息系统

项目中也监听成功!

从零开始搭建Kafka+SpringBoot分布式消息系统

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%