Flink Learning 01: Quick Start (Kafka + Flink + MySQL)

Background

A recent project required moving data between systems, so I spent some time learning the basics of Flink. This post records and organizes what I learned through a simple integration, and should give beginners a view of a complete Flink streaming pipeline. The design of the demo: an application periodically sends data to Kafka, and Flink reads the stream from Kafka and stores it in a MySQL database.

[Note] I was still a Flink beginner myself when writing these notes, so there are bound to be oversights or mistakes; I ask for your understanding. If this post happens to help you in your work or study, I will be honored.

I. Environment Requirements

1. The development OS is Windows. You need Kafka (2.13-2.7.0), ZooKeeper (3.5.5, which Kafka depends on at startup), JDK (1.8.61), and MySQL (8.0.15).

2. ZooKeeper environment: since Kafka depends on ZooKeeper at startup, configure ZooKeeper first. After unpacking the ZooKeeper archive downloaded from the official site (do not place it in a deeply nested directory, or startup on Windows will report an error), go into the conf directory and edit zoo.cfg (if it does not exist, rename zoo_sample.cfg), adding or changing the following:

# data directory (any path you like)
dataDir=D:\software\bigData\zookeeper\3.5.5\data
# add this line at the end of the file for the log directory
dataLogDir=D:\software\bigData\zookeeper\3.5.5\data\log

From a command prompt in the ZooKeeper install directory, start the Windows ZooKeeper service with the command below; the default service port is 2181.

.\bin\zkServer.cmd

If the startup log contains no error messages, ZooKeeper started successfully; continue with the next steps.
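Optionally, you can also confirm the server is reachable with the CLI client bundled in the same bin directory (assuming the default port):

.\bin\zkCli.cmd -server localhost:2181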

3. Kafka environment: unpack the downloaded Kafka archive to a suitable location, go into the config directory, and edit server.properties as follows:

# Kafka log file location
log.dirs=./logs
# address and port of the ZooKeeper instance configured in step 2
zookeeper.connect=localhost:2181

Since this example is only for development testing, Kafka and ZooKeeper are not clustered and all other settings keep their defaults. Start the Kafka service with the command below; if the console shows no error messages, it started successfully.

.\bin\windows\kafka-server-start.bat .\config\server.properties

II. Demo Development

1. Create the MySQL table

CREATE TABLE `student` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `name` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
  `age` bigint(20) NULL DEFAULT NULL,
  `createDate` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE
);
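Before wiring up Flink, you can sanity-check the table by hand; the row below is just a made-up sample:

INSERT INTO `student` (`name`, `age`, `createDate`) VALUES ('test01', 18, '2021-03-04T09:50:00');
SELECT * FROM `student`;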

2. Maven dependencies for the project

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <flink.version>1.10.0</flink.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-jdbc_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>org.influxdb</groupId>
        <artifactId>influxdb-java</artifactId>
        <version>2.17</version>
    </dependency>

    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.15</version>
    </dependency>

    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>druid</artifactId>
        <version>1.1.22</version>
    </dependency>

    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.12</version>
    </dependency>

    <dependency>
        <groupId>com.google.code.gson</groupId>
        <artifactId>gson</artifactId>
        <version>2.8.5</version>
    </dependency>

    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.71</version>
    </dependency>

    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-collections4</artifactId>
        <version>4.2</version>
    </dependency>

    <!-- needed for RandomUtils in the Kafka test producer below -->
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
        <version>3.9</version>
    </dependency>

    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.25</version>
    </dependency>

    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>1.7.25</version>
        <scope>compile</scope>
    </dependency>

    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-nop</artifactId>
        <version>1.7.25</version>
        <scope>test</scope>
    </dependency>
</dependencies>

3. Create the database connection pool with Druid

package com.nandy.mysql.utils;

import com.alibaba.druid.pool.DruidDataSourceFactory;
import lombok.extern.slf4j.Slf4j;

import javax.sql.DataSource;
import java.io.IOException;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Properties;

/**
* @author nandy
* @create 2021/3/2 16:32
*/
@Slf4j
public class ConnectionPoolUtils {

    private static DataSource dataSource;

    static {
        Properties properties = new Properties();
        ClassLoader classLoader = ConnectionPoolUtils.class.getClassLoader();
        InputStream resourceAsStream = classLoader.getResourceAsStream("jdbc.properties");
        try {
            // Load the configuration file and build the Druid pool once;
            // creating a new pool on every getConnection() call would defeat pooling
            properties.load(resourceAsStream);
            dataSource = DruidDataSourceFactory.createDataSource(properties);
        } catch (IOException e) {
            log.error("Load jdbc properties exception.", e);
        } catch (Exception e) {
            log.error("Error occurred while creating connection pool!", e);
        }
    }

    private ConnectionPoolUtils() {
    }

    public static Connection getConnection() throws SQLException {
        if (dataSource == null) {
            throw new SQLException("Druid data source was not initialized.");
        }
        // Borrow a connection from the shared pool
        return dataSource.getConnection();
    }
}
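A minimal standalone check of the pool, assuming the jdbc.properties shown next is on the classpath; PoolSmokeTest is a hypothetical class name used only for illustration:

package com.nandy.mysql.utils;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;

public class PoolSmokeTest {
    public static void main(String[] args) throws Exception {
        // Borrow a pooled connection and run a trivial query to verify the setup
        try (Connection conn = ConnectionPoolUtils.getConnection();
             Statement st = conn.createStatement();
             ResultSet rs = st.executeQuery("SELECT 1")) {
            rs.next();
            System.out.println("Pool OK, SELECT 1 returned " + rs.getInt(1));
        }
    }
}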

jdbc.properties

driverClassName=com.mysql.cj.jdbc.Driver
url=jdbc:mysql://localhost:3306/flink_study?serverTimezone=UTC
username=root
password=xxx
filters=stat
initialSize=5
maxActive=50
maxWait=60000
timeBetweenEvictionRunsMillis=60000
minEvictableIdleTimeMillis=300000
validationQuery=SELECT 1
testWhileIdle=true
testOnBorrow=false
testOnReturn=false
poolPreparedStatements=false
maxPoolPreparedStatementPerConnectionSize=200

log4j.properties

log4j.rootLogger=info,stdout
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m%n

4. The Student persistence object

package com.nandy.models;

import lombok.Getter;
import lombok.Setter;

import java.io.Serializable;

/**
* @author nandy
* @create 2021/3/2 18:54
*/
@Setter
@Getter
public class Student implements Serializable {

    private static final long serialVersionUID = -3247106837870523911L;

    private int id;

    private String name;

    private int age;

    private String createDate;
}
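For reference, fastjson serializes bean fields in alphabetical order by default, so a message on the student topic looks roughly like this (values are hypothetical; id stays 0 because the test producer never sets it):

{"age":4396,"createDate":"2021-03-04T09:50:12.345","id":0,"name":"nandy4396"}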

5. The Flink sink object

package com.nandy.mysql;

import com.nandy.models.Student;
import com.nandy.mysql.utils.ConnectionPoolUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.List;

/**
* @author nandy
* @create 2021/3/2 17:13
*/
@Slf4j
public class Flink2JdbcWriter extends RichSinkFunction<List<Student>> {

    private static final long serialVersionUID = -5072869539213229634L;

    private transient Connection connection = null;
    private transient PreparedStatement ps = null;
    private volatile boolean isRunning = true;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);

        connection = ConnectionPoolUtils.getConnection();
        if (null != connection) {
            ps = connection.prepareStatement("insert into student (name, age, createDate) values (?, ?, ?)");
        }
    }

    @Override
    public void invoke(List<Student> list, Context context) throws Exception {

        if (isRunning && null != ps) {
            for (Student one : list) {
                ps.setString(1, one.getName());
                ps.setInt(2, one.getAge());
                ps.setString(3, one.getCreateDate());
                ps.addBatch();
            }
            // executeBatch returns one update count per buffered statement
            int[] count = ps.executeBatch();
            log.info("Rows written to MySQL: " + count.length);
        }
    }

    @Override
    public void close() throws Exception {
        isRunning = false;
        try {
            super.close();
            // close the statement before the connection that created it
            if (ps != null) {
                ps.close();
            }
            if (connection != null) {
                connection.close();
            }
        } catch (Exception e) {
            log.error("Error while closing JDBC resources.", e);
        }
    }
}

6. The main function

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nandy;

import com.alibaba.fastjson.JSON;
import com.nandy.models.Student;
import com.nandy.mysql.Flink2JdbcWriter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.util.Collector;

import java.util.List;
import java.util.Properties;

/**
* Skeleton for a Flink Streaming Job.
*
* <p>For a tutorial how to write a Flink streaming application, check the
* tutorials and examples on the <a href="http://flink.apache.org/docs/stable/">Flink Website</a>.
*
* <p>To package your application into a JAR file for execution, run
* 'mvn clean package' on the command line.
*
* <p>If you change the name of the main class (with the public static void main(String[] args))
* method, change the respective entry in the POM.xml file (simply search for 'mainClass').
*/
@Slf4j
public class FlinkReadDbWriterDb {

    public static void main(String[] args) throws Exception {
        // Build the stream execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka consumer configuration
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.10.42:9092");
        props.put("zookeeper.connect", "192.168.10.42:2181");
        props.put("group.id", "metric-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "latest");

        DataStreamSource<String> dataStreamSource = env.addSource(
                new FlinkKafkaConsumer011<String>(
                        // must be the same topic the KafkaWriter test class below produces to
                        "student",
                        new SimpleStringSchema(),
                        props))
                // a single-threaded source keeps console output ordered; it does not affect the result
                .setParallelism(1);

        // Deserialize the JSON strings from Kafka into Student objects
        DataStream<Student> dataStream = dataStreamSource.map(string -> JSON.parseObject(string, Student.class));

        // Collect everything that arrives within each 5-second window
        dataStream.timeWindowAll(Time.seconds(5L)).apply(new AllWindowFunction<Student, List<Student>, TimeWindow>() {
            @Override
            public void apply(TimeWindow timeWindow, Iterable<Student> iterable, Collector<List<Student>> collector) throws Exception {
                List<Student> students = Lists.newArrayList(iterable);
                if (CollectionUtils.isNotEmpty(students)) {
                    log.info("Records received in this 5-second window: " + students.size());
                    collector.collect(students);
                }
            }
            // sink to the database
        }).addSink(new Flink2JdbcWriter());

        env.execute("Flink Streaming Java API Skeleton");
    }
}

III. Testing and Results

1. Create a test class that sends data to the Kafka topic

package com.nandy.kafka;

import com.alibaba.fastjson.JSON;
import com.nandy.models.Student;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.time.LocalDateTime;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

/**
* @author nandy
* @create 2021/3/4 9:50
*/
@Slf4j
public class KafkaWriter {

    // Local Kafka broker list
    public static final String BROKER_LIST = "192.168.10.42:9092";
    // Kafka topic; must match the topic consumed by the Flink job
    public static final String TOPIC_PERSON = "student";
    // Serialize keys as strings
    public static final String KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
    // Serialize values as strings
    public static final String VALUE_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";

    public static void writeToKafka() {
        Properties props = new Properties();
        props.put("bootstrap.servers", BROKER_LIST);
        props.put("key.serializer", KEY_SERIALIZER);
        props.put("value.serializer", VALUE_SERIALIZER);

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Build a Student object, appending a random number to the name
            int randomInt = RandomUtils.nextInt(1, 100000);
            Student student = new Student();
            student.setName("nandy" + randomInt);
            student.setAge(randomInt);
            student.setCreateDate(LocalDateTime.now().toString());
            // Serialize to JSON
            String personJson = JSON.toJSONString(student);

            // Wrap it in a Kafka producer record
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_PERSON, null,
                    null, personJson);
            // Hand the record to the producer's buffer
            producer.send(record);
            log.info("Sent to Kafka: " + personJson);
            // Flush so the record is sent immediately
            producer.flush();
        }
    }

    public static void main(String[] args) {
        int count = 0;
        while (count < 20) {
            try {
                // Write one record every three seconds
                TimeUnit.SECONDS.sleep(3);
                writeToKafka();
                count++;
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
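While the producer runs, you can watch the raw messages with Kafka's console consumer (using the broker address from the code above):

.\bin\windows\kafka-console-consumer.bat --bootstrap-server 192.168.10.42:9092 --topic student --from-beginning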

2. Data in the student table

[Screenshot: rows written to the student table]
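You can also verify the sink output directly in MySQL, for example:

SELECT COUNT(*) FROM `student`;
SELECT * FROM `student` ORDER BY `id` DESC LIMIT 10;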
