docker-compose搭建kafka集群及spring

docker搭建kafka集群(基于centos7)

1 docker安装

# Remove any old Docker versions and related components before a clean install
yum remove docker \
           docker-client \
           docker-client-latest \
           docker-common \
           docker-latest \
           docker-latest-logrotate \
           docker-logrotate \
           docker-selinux \
           docker-engine-selinux \
           docker-engine

1.1 安装docker 源

# Install repo-management tools, add the Aliyun docker-ce repo, then install docker-ce
yum install -y yum-utils device-mapper-persistent-data lvm2

yum-config-manager \
    --add-repo \
    http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo
# NOTE(review): this installs 18.03.0.ce while section 1.3 installs
# 18.09.9-3.el7 — only one of the two installs is needed
yum install docker-ce-18.03.0.ce

1.2 安装docker所需工具

# Install the tooling Docker's storage driver depends on
yum install -y yum-utils device-mapper-persistent-data lvm2

1.3 安装指定版本docker

# Install a pinned docker-ce version
yum install -y docker-ce-18.09.9-3.el7

1.4 启动docker

# Enable Docker at boot and start it now
systemctl enable docker && systemctl start docker

1.5 因国内下载docker镜像存在问题,需要配置下docker镜像源

# Configure registry mirrors so image pulls work from inside China
vi /etc/docker/daemon.json
## daemon.json contents (the file does not exist initially):
{
  "registry-mirrors": ["https://alzgoonw.mirror.aliyuncs.com","http://hub-mirror.c.163.com"]
}

1.6 配置完docker 镜像后记得重启docker

# Restart Docker so the new registry-mirror config takes effect
systemctl restart docker

2 docker-compose 安装

2.1 下载docker-compose

# Download docker-compose 1.23.2 for the current OS/architecture
curl -L "https://github.com/docker/compose/releases/download/1.23.2/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose

2.2 添加docker-compose 执行命令

# Make the docker-compose binary executable
chmod +x /usr/local/bin/docker-compose

3 使用docker-compose安装kafka集群

3.1环境配置

3.1.1 kafka目录
# Create one data directory per Kafka broker
mkdir kafka1
mkdir kafka2
mkdir kafka3
3.1.2 zookeeper集群目录
# Create one data directory per ZooKeeper node
mkdir zookeeper1
mkdir zookeeper2
mkdir zookeeper3
3.1.3 zookeeper其他目录及配置
# Create the shared ZooKeeper config directory and one subdir per node
mkdir zooConfig
cd zooConfig
mkdir zoo1
mkdir zoo2
mkdir zoo3

3.1.4在zoo1,zoo2,zoo3中分别创建myid文件,并写入分别写入id数字,如zoo1中的myid中写入1

# Write each server's id into its own myid file.
# (The original snippet was missing the '>' redirect and wrote every id
# to ./zoo1, contradicting its own heading.)
echo 1 > ./zoo1/myid
echo 2 > ./zoo2/myid
echo 3 > ./zoo3/myid

3.1.5 创建zoo配置文件zoo.cfg

# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/data
dataLogDir=/datalog
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
autopurge.purgeInterval=1
# Quorum members — no space after '=' (the original had a stray leading
# space in the value)
server.1=172.23.0.11:2888:3888
server.2=172.23.0.12:2888:3888
server.3=172.23.0.13:2888:3888

3.2 创建docker容器网络

docker network create --driver bridge --subnet 172.23.0.0/25 --gateway 172.23.0.1 zookeeper_network

3.3 docker-compose.yml

version: '2'

services:

  zoo1:
    image: zookeeper:3.4.14
    restart: always
    container_name: zoo1
    hostname: zoo1
    ports:
      - "2181:2181"
    volumes:
      - "./zooConfig/zoo.cfg:/conf/zoo.cfg"
      - "/disk/docker/zookeeper1/data:/data"
      - "/disk/docker/zookeeper1/datalog:/datalog"
    environment:
      ZOO_MY_ID: 1
      ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888
    networks:
      default:
        ipv4_address: 172.23.0.11

  zoo2:
    image: zookeeper:3.4.14
    restart: always
    container_name: zoo2
    hostname: zoo2
    ports:
      - "2182:2181"
    volumes:
      - "./zooConfig/zoo.cfg:/conf/zoo.cfg"
      - "/disk/docker/zookeeper2/data:/data"
      - "/disk/docker/zookeeper2/datalog:/datalog"
    environment:
      ZOO_MY_ID: 2
      ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888
    networks:
      default:
        ipv4_address: 172.23.0.12

  zoo3:
    image: zookeeper:3.4.14
    restart: always
    container_name: zoo3
    hostname: zoo3
    ports:
      - "2183:2181"
    volumes:
      - "./zooConfig/zoo.cfg:/conf/zoo.cfg"
      - "/disk/docker/zookeeper3/data:/data"
      - "/disk/docker/zookeeper3/datalog:/datalog"
    environment:
      ZOO_MY_ID: 3
      ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888
    networks:
      default:
        ipv4_address: 172.23.0.13

  kafka1:
    image: wurstmeister/kafka:2.12-2.0.1
    restart: always
    container_name: kafka1
    hostname: kafka1
    ports:
      - "9092:9092"
    environment:
      # Replace 172.18.255.9 with your host's IP (see note below the file)
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://172.18.255.9:9092
      KAFKA_ADVERTISED_HOST_NAME: kafka1
      KAFKA_HOST_NAME: kafka1
      KAFKA_ZOOKEEPER_CONNECT: zoo1:2181,zoo2:2181,zoo3:2181
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_BROKER_ID: 0
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
    volumes:
      - /etc/localtime:/etc/localtime
      - "/disk/docker/kafka1/logs:/kafka"
    links:
      - zoo1
      - zoo2
      - zoo3
    networks:
      default:
        ipv4_address: 172.23.0.14

  kafka2:
    image: wurstmeister/kafka:2.12-2.0.1
    restart: always
    container_name: kafka2
    hostname: kafka2
    ports:
      - "9093:9092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://172.18.255.9:9093
      KAFKA_ADVERTISED_HOST_NAME: kafka2
      KAFKA_HOST_NAME: kafka2
      KAFKA_ZOOKEEPER_CONNECT: zoo1:2181,zoo2:2181,zoo3:2181
      KAFKA_ADVERTISED_PORT: 9093
      KAFKA_BROKER_ID: 1
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
    volumes:
      - /etc/localtime:/etc/localtime
      - "/disk/docker/kafka2/logs:/kafka"
    links:
      - zoo1
      - zoo2
      - zoo3
    networks:
      default:
        ipv4_address: 172.23.0.15

  kafka3:
    image: wurstmeister/kafka:2.12-2.0.1
    restart: always
    container_name: kafka3
    hostname: kafka3
    ports:
      - "9094:9092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://172.18.255.9:9094
      KAFKA_ADVERTISED_HOST_NAME: kafka3
      KAFKA_HOST_NAME: kafka3
      KAFKA_ZOOKEEPER_CONNECT: zoo1:2181,zoo2:2181,zoo3:2181
      KAFKA_ADVERTISED_PORT: 9094
      KAFKA_BROKER_ID: 2
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
    volumes:
      - /etc/localtime:/etc/localtime
      - "/disk/docker/kafka3/logs:/kafka"
    links:
      - zoo1
      - zoo2
      - zoo3
    networks:
      default:
        ipv4_address: 172.23.0.16

  kafka-manager:
    image: hlebalbau/kafka-manager:1.3.3.22
    restart: always
    container_name: kafka-manager
    hostname: kafka-manager
    ports:
      - "9000:9000"
    links:
      - kafka1
      - kafka2
      - kafka3
      - zoo1
      - zoo2
      - zoo3
    environment:
      ZK_HOSTS: zoo1:2181,zoo2:2181,zoo3:2181
      KAFKA_BROKERS: kafka1:9092,kafka2:9093,kafka3:9094
      APPLICATION_SECRET: letmein
      KAFKA_MANAGER_AUTH_ENABLED: "true"
      KAFKA_MANAGER_USERNAME: "admin"
      KAFKA_MANAGER_PASSWORD: "admin"
      KM_ARGS: -Djava.net.preferIPv4Stack=true
    networks:
      default:
        ipv4_address: 172.23.0.10

networks:
  default:
    external:
      name: zookeeper_network

注意:

1.网上不少配置JMX_PORT端口,因启动就会出错,我这里没有配置,如必须要配置,见附录1kafka配置JMX_PORT出错解决方案;

2.将配置"PLAINTEXT://172.18.255.9:9094" 中的ip换成自己的机器的ip

3.4 一定要记得开放防火墙端口

## Open the kafka-manager web port for external access
firewall-cmd --zone=public --add-port=9000/tcp --permanent
## These three ports are used for inter-broker communication; they must be
## open on the host or the cluster nodes cannot reach each other
firewall-cmd --zone=public --add-port=9092/tcp --permanent
firewall-cmd --zone=public --add-port=9093/tcp --permanent
firewall-cmd --zone=public --add-port=9094/tcp --permanent
## Reload the firewall to apply the rules
firewall-cmd --reload
## Best to restart Docker afterwards as well
systemctl restart docker

3.5 启动kafka集群

3.5.1 启动集群

docker-compose -f docker-compose.yml up -d

image.png

3.5.2 停止集群

docker-compose -f docker-compose.yml stop

3.6 kafka集群检查

3.6.1 查看zookeeper集群是否正常
# Check that each ZooKeeper node reports leader/follower status
docker exec -it zoo1 bash
bin/zkServer.sh status
3.6.2 创建topic
# Create a test topic and verify it is visible from every ZooKeeper node
docker exec -it kafka1 bash
kafka-topics.sh --create --zookeeper zoo1:2181 --replication-factor 1 --partitions 3 --topic test001
kafka-topics.sh --list --zookeeper zoo1:2181
kafka-topics.sh --list --zookeeper zoo2:2181
kafka-topics.sh --list --zookeeper zoo3:2181
3.6.3 生产消息

kafka-console-producer.sh --broker-list kafka1:9092,kafka2:9093,kafka3:9094 --topic test001

image.png
下面两条警告日志不影响使用,实际是连通状态的

3.6.4 消费消息

kafka-console-consumer.sh --bootstrap-server kafka1:9092,kafka2:9093,kafka3:9094 --topic test001 --from-beginning

image.png

3.7 kafka-manager访问

3.7.1 浏览器打开访问

http://你的虚拟机ip:9000/

image.png

3.7.2 配置你创建的集群

image.png

3.7.3 下面就可以查看你的集群了

image.png

3 spring-boot连接kafka集群

3.1 application.yml配置

spring:
  application:
    name: kafka-service
  kafka:
    # Kafka broker addresses; multiple brokers are comma-separated
    bootstrap-servers: 192.168.207.82:9092,192.168.207.82:9093,192.168.207.82:9094
    producer:
      retries: 0
      # Number of messages sent per batch
      batch-size: 16384
      # Producer buffer capacity in bytes
      buffer-memory: 33554432
      # Serializers for message key and value
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      # Default consumer group id
      group-id: consumer-tutorial
      auto-commit-interval: 100
      auto-offset-reset: earliest
      enable-auto-commit: true
      # Deserializers for message key and value
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    # Number of threads in the listener container (raises consume concurrency)
    listener:
      concurrency: 3

3.2 启动类

1
2
3
4
5
6
7
less复制代码@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class})
@EnableEurekaClient
public class KafkaApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaApplication.class, args);
}
}

3.3 用了监听启动观察kafka生产及消费情况,小伙伴们可以单测手动调试

3.3.1 KafkaProducer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
less复制代码@Component
@EnableScheduling
public class KafkaProducer {

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

/**
* 定时任务
*/
@Scheduled(cron = "00/1 * * * * ?")
public void send(){
String message = UUID.randomUUID().toString();
ListenableFuture<?> future = kafkaTemplate.send("test003","kafka message test");
String message2 = "第二种类的订阅消息发送";
ListenableFuture<?> future2 = kafkaTemplate.send("test002",message2);
future.addCallback(o -> System.out.println("send-消息发送成功:" + message), throwable -> System.out.println("消息发送失败:" + message));
future2.addCallback(o -> System.out.println("send-消息发送成功:" + message2), throwable -> System.out.println("消息发送失败:" + message2));
}
}
3.3.2 KafkaConsumer
1
2
3
4
5
6
7
8
9
10
11
12
13
typescript复制代码@Component
public class KafkaConsumer {

@KafkaListener(topics = {"test001"})
public void receive(String message){
System.out.println("test001--消费消息:" + message);
}

@KafkaListener(topics = {"test002"})
public void receive2(String message){
System.out.println("test002--消费消息:" + message);
}
}

image.png
以上spring-boot项目就自己搭建吧,这里不赘述了

踩坑不易,欢迎大家评阅

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%