前言
由于kafka强依赖于zookeeper,所以需先搭建好zookeeper集群。由于zookeeper是由java编写的,需运行在jvm上,所以首先应具备java环境。
(ps:默认您的centos系统可联网,本教程就不教配置ip什么的了)
(ps2:没有wget的先装一下:yum install wget)
(ps3:人啊,就是要条理。东边放一点,西边放一点,过段时间就不知道自己装在哪里了。本教程所有下载均放在/usr/local目录下)
(ps4:kafka可能有内置zookeeper,感觉可以越过zookeeper教程,但是这里也配置出来了。我没试过)
文章首发公众号:Java架构师联盟,每日更新技术好文
一、配置jdk
因为oracle 公司不允许直接通过wget 下载官网上的jdk包。所以你直接wget以下地址下载下来的是一个只有5k的网页文件而已,并不是需要的jdk包。(垄断地位就是任性)。
(请通过java -version判断是否自带jdk,我的没带)
1、官网下载
下面是jdk8的官方下载地址:
1 | ruby复制代码https://www.oracle.com/technetwork/java/javase/downloads/java-archive-javase8u211-later-5573849.html |
2、上传解压
这里通过xftp上传到服务器指定位置:/usr/local
对压缩文件进行解压:
1 | 复制代码tar -zxvf jdk-8u221-linux-x64.tar.gz |
对解压后的文件夹进行改名:
1 | bash复制代码mv jdk1.8.0_221 jdk1.8 |
3、配置环境变量
1 | bash复制代码vim /etc/profile |
操作之后的界面如下:
运行命令使环境生效
1 | bash复制代码source /etc/profile |
二、搭建zookeeper集群
1、下载zookeeper
创建zookeeper目录,在该目录下进行下载:
1 | bash复制代码mkdir /usr/local/zookeeper |
这一步如果出现连接被拒绝时可多试几次,我就是第二次请求才成功的。
1 | bash复制代码wget http://archive.apache.org/dist/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz |
等待下载完成之后解压:
1 | 复制代码tar -zxvf zookeeper-3.4.6.tar.gz |
重命名为zookeeper1
1 | bash复制代码mv zookeeper-3.4.6 zookeeper1 |
2、创建data、logs文件夹
在zookeeper1目录下创建
在data目录下新建myid文件。内容为1
3、修改zoo.cfg文件
1 | bash复制代码cd /usr/local/zookeeper/zookeeper1/conf/ |
进行过上面两步之后,有zoo.cfg文件了,现在修改内容为:
1 | ini复制代码dataDir=/usr/local/zookeeper/zookeeper1/data |
4、搭建zookeeper2
首先,复制改名。
1 | bash复制代码cd /usr/local/zookeeper/ |
然后修改具体的某些配置:
1 | bash复制代码vim zookeeper2/conf/zoo.cfg |
将下图三个地方1改成2
1 | bash复制代码vim zookeeper2/data/myid |
同时将myid中的值改成2
5、搭建zookeeper3
同上,复制改名
1 | bash复制代码cp -r zookeeper1 zookeeper3 |
1 | bash复制代码vim zookeeper3/conf/zoo.cfg |
修改为3
1 | bash复制代码vim zookeeper3/data/myid |
修改为3
6、测试zookeeper集群
1 | bash复制代码cd /usr/local/zookeeper/zookeeper1/bin/ |
由于启动所需代码比较多,这里简单写了一个启动脚本:
1 | sql复制代码vim start |
start的内容如下
1 | bash复制代码cd /usr/local/zookeeper/zookeeper1/bin/ |
下面是连接脚本:
1 | 复制代码vim login |
login内容如下:
1 | bash复制代码./zkCli.sh -server 192.168.233.11:2181,192.168.233.11:2182,192.168.233.11:2183 |
脚本编写完成,接下来启动:
1 | sql复制代码sh start |
启动集群成功,如下图:
这里zookeeper就告一段落了,由于zookeeper占用着输入窗口,这里可以在xshell右键标签,新建ssh渠道。然后就可以在新窗口继续操作kafka了!
三、搭建kafka集群
1、下载kafka
首先创建kafka目录:
1 | bash复制代码mkdir /usr/local/kafka |
然后在该目录下载
1 | bash复制代码cd /usr/local/kafka/ |
下载成功之后解压:
1 | 复制代码tar -zxvf kafka_2.11-1.1.0.tgz |
2、修改集群配置
首先进入conf目录下:
1 | bash复制代码cd /usr/local/kafka/kafka_2.11-1.1.0/config |
修改server.properties
修改内容:
1 | ini复制代码broker.id=0 |
复制两份server.properties
1 | matlab复制代码cp server.properties server2.properties |
修改server2.properties
1 | matlab复制代码vim server2.properties |
修改主要内容为:
1 | ini复制代码broker.id=1 |
如上,修改server3.properties
修改内容为:
1 | ini复制代码broker.id=2 |
3、启动kafka
这里还是在bin目录编写一个脚本:
1 | bash复制代码cd ../bin/ |
脚本内容为:
1 | bash复制代码./kafka-server-start.sh ../config/server.properties & |
通过jps命令可以查看到,共启动了3个kafka。
4、创建Topic
1 | bash复制代码cd /usr/local/kafka/kafka_2.11-1.1.0 |
kafka打印了几条日志
在启动的zookeeper中可以通过命令查询到这条topic!
1 | bash复制代码ls /brokers/topics |
查看kafka状态
1 | css复制代码bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic |
可以看到此时有三个节点 1 , 2 , 0
Leader 是1 ,
因为分区只有一个 所以在0上面,
Replicas:主从备份是 1,2,0,
ISR(in-sync):现在存活的信息也是 1,2,0
5、启动生产者
1 | bash复制代码bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic |
由于不能按删除,不能按左右键去调整,所以语句有些乱啊。em…
6、启动消费者
1 | javascript复制代码bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic |
可以看出,启动消费者之后就会自动消费。
在生产者又造了一条。
消费者自动捕获成功!
四、集成springboot
先贴一张kafka兼容性目录:
不满足的话启动springboot的时候会抛异常的!!!ps:该走的岔路我都走了o(╥﹏╥)o
(我的kafka-clients是1.1.0,spring-kafka是2.2.2,中间那列暂时不用管)
回归正题,搞了两个小时,终于搞好了,想哭…
遇到的问题基本就是jar版本不匹配。
上面的步骤我也都会相应的去修改,争取大家按照本教程一遍过!!!
1、pom文件
1 | xml复制代码<?xml version="1.0" encoding="UTF-8"?> |
pom文件中,重点是下面这两个版本。
1 | xml复制代码<parent> |
2、application.yml
1 | yaml复制代码spring: |
没有配置Redis的可以把Redis部分删掉,也就是下图:
想学习配置Redis集群的可以参考:《Redis集群redis-cluster的搭建及集成springboot》
3、生产者
1 | typescript复制代码package com.gzky.study.utils; |
4、消费者
1 | kotlin复制代码package com.gzky.study.utils; |
5、对外接口
1 | less复制代码package com.gzky.study.controller; |
6、postman测试
这里首先应该在服务器启动监听器(kafka根目录),下面命令必须是具体的服务器ip,不能是localhost,是我踩过的坑:
推荐此处重启一下集群
关闭kafka命令:
1 | bash复制代码cd /usr/local/kafka/kafka_2.11-1.1.0/bin |
此处应该jps看一下,等待所有的kafka都关闭(关不掉的kill掉),再重新启动kafka:
1 | bash复制代码./kafka-server-start.sh ../config/server.properties & |
等待kafka启动成功后,启动消费者监听端口:
1 | bash复制代码cd /usr/local/kafka/kafka_2.11-1.1.0 |
曾经我乱输的测试信息全部被监听过来了!
启动springboot服务
然后用postman生产消息:
然后享受成果,服务器端监听成功。
项目中也监听成功!
本文转载自: 掘金