背景介绍
近期项目中需要对数据进行转存,简单学习了一下Flink相关的知识点,通过一个简单的集成,对近期的学习成果做一个记录和梳理,帮助初学的同学窥探Flink流式数据处理的完整过程。Demo的设计思路是:应用系统定期向Kafka中发送数据,Flink获取Kafka中的流式数据后存储到MySQL数据库中。
1 | 复制代码【说明】由于本人在写笔记的时候也是一个Flink的初学者,难免会有思虑不全或者不对的地方,希望大家能够见谅, |
一、环境需求
1、开发环境操作系统是windows,需要安装kafka(2.13-2.7.0),kafka启动依赖zookeeper(3.5.5),jdk(1.8.61),mysql(8.0.15)
2、zookeeper环境:由于kafka启动时对zookeeper有依赖,首先配置zookeeper环境,解压从官网下载的zookeeper安装文件后(切记目录不要放得太深,否则windows下启动会保错),进入conf文件加下,修改zoo.cfg文件(如果没有可以将zoo_sample.cfg重命名),增加或修改内容如下
1 | ini复制代码# 可随意指定数据存放目录 |
从命令行进入zookeeper安装目录下,使用下面命令启动windows下的zookeeper服务,默认服务端口为2181
1 | python复制代码.\bin\zkServer.cmd |
命令行启动日志没有error信息说明启动成功,继续下面步骤。
3、kafka环境:解压下载的kafka安装包至合适的位置后,进入config目录,修改server.properties文件,修改内容如下
1 | ini复制代码# kafka日志文件位置 |
由于当前示例仅仅是测试开发,kafka、zookeeper没有做集群配置,其他配置均使用默认,使用下面命令启动kafka服务,如果控制台没有错误信息,说明启动成功。
1 | vbscript复制代码.\bin\windows\kafka-server-start.bat .\config\server.properties |
二、Demo开发
1、创建mysql数据库表
1 | sql复制代码CREATE TABLE `student` ( |
2、开发项目中的maven依赖
1 | pom复制代码<properties> |
3、使用druid创建数据库连接池
1 | java复制代码package com.nandy.influxdb.common; |
jdbc.properties
1 | properties复制代码driverClassName=com.mysql.cj.jdbc.Driver |
log4j.properties
1 | js复制代码log4j.rootLogger=info,stdout |
4、数据库持久化对象Student
1 | java复制代码package com.nandy.models; |
5、Flink中的sink对象
1 | java复制代码package com.nandy.mysql; |
6、主函数
1 | java复制代码/* |
三、测试及结果
1、创建测试类,向kafka创建的主题发送数据
1 | java复制代码package com.nandy.kafka; |
2、数据库student表中的数据
本文转载自: 掘金