02 Flink Real-Time Data Management: CDC Capture of ETL Job Progress

Introduction

Background: our offline ETL jobs need real-time monitoring of their running status. Since the scheduler is Azkaban, we read from its backend configuration database in MySQL. This article covers step two: use flink-cdc to watch the execution_jobs table, capture job status in real time, join it against the projects broadcast variable built in step one, and assemble the output fields: projectName, flowName, jobName, status, startTime, endTime.

Components

flink-cdc-connectors + MySQL + Flink Table API
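
For reference, a build.sbt sketch of these pieces; the versions are assumptions inferred from the flink-cdc-mysql 1.1.0 note in the demo code (the 1.1.x connector line targets Flink 1.11), so adjust them to your cluster:

```scala
// build.sbt (sketch; versions are assumptions, not from the article)
libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-streaming-scala"        % "1.11.2",
  "org.apache.flink" %% "flink-table-api-scala-bridge" % "1.11.2",
  "org.apache.flink" %% "flink-table-planner-blink"    % "1.11.2",
  // MySQL CDC connector (groupId used by the 1.x releases)
  "com.alibaba.ververica" % "flink-connector-mysql-cdc" % "1.1.0"
)
```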

Implementation Logic

  1. Use the flink-cdc connector to capture changes to the execution_jobs metadata table and obtain job runs in real time;
  2. Select the required fields, convert the Table result to a DataStream, and wrap it in a case class;
  3. Join with the broadcast variable to resolve projectName.

Prerequisites

  1. Make sure binlog is enabled on the MySQL instance (log_bin: ON = enabled, OFF = disabled) and set to ROW format:
```sql
-- binlog must be ON and the format must be ROW
SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_format';
```
  2. Create a dedicated MySQL user for CDC:
```sql
-- create user
CREATE USER 'flink_cdc'@'localhost' IDENTIFIED BY 'password';
-- grant replication privileges
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flink_cdc'@'localhost';
-- flush
FLUSH PRIVILEGES;
```
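
Optionally, both prerequisites can be verified from code in one go. A minimal JDBC sketch (hostname, database, and password here are placeholders):

```scala
import java.sql.DriverManager

// connect as the CDC user and confirm log_bin = ON and binlog_format = ROW
object CheckCdcPrereqs extends App {
  val conn = DriverManager.getConnection(
    "jdbc:mysql://localhost:3306/azkaban", "flink_cdc", "password")
  try {
    val rs = conn.createStatement().executeQuery(
      "SHOW VARIABLES WHERE Variable_name IN ('log_bin', 'binlog_format')")
    while (rs.next()) println(s"${rs.getString(1)} = ${rs.getString(2)}")
  } finally conn.close()
}
```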

Implementation Demo

  1. Define the result case classes
```scala
object Domain {

  // row captured from the Azkaban execution_jobs table via CDC
  case class ExecutionJobs(
    project_node: String,
    project_id: String,
    flow_name: String,
    job_name: String,
    active_status: String,
    create_time: String,
    update_time: String,
    event_time: String
  )

  // row from the Azkaban projects table, used as the broadcast dimension
  case class ProjectsClass(
    project_id: String,
    project_name: String,
    project_desc: String,
    create_time: String,
    update_time: String
  )

  // joined result to be upserted downstream (e.g. into Neo4j)
  case class UpsertNeo4j(
    project_node: String,
    project_name: String,
    flow_name: String,
    job_name: String,
    active_status: String,
    create_time: String,
    update_time: String
  )
}
```
  2. Implement the CDC source with the Flink Table API
```scala
import com.haierubic.bigdata.dataflow.models.Domain.ExecutionJobs
import com.haierubic.bigdata.dataflow.models.Sentence
import com.haierubic.bigdata.dataflow.utils.ConfigParse
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.streaming.api.scala._

/**
 * flink-cdc against the Azkaban MySQL metadata database: capture project run status.
 * created by moun
 *
 * @FAQ:
 * 1. Built against flink-cdc-mysql 1.1.0;
 * 2. flink-cdc-mysql: the scan.startup.mode option does not take effect -- use 'debezium.snapshot.mode' = 'schema_only' instead.
 */
object ChangeDataCapture extends Sentence {

  def genUpdateNeo4jInfo(tEnv: StreamTableEnvironment): DataStream[(Boolean, ExecutionJobs)] = {
    val azkHost: String = ConfigParse.getString("db.azkaban.host")
    val azkPort: Int = ConfigParse.getInt("db.azkaban.port")
    val azkUserName: String = ConfigParse.getString("db.azkaban.username")
    val azkPassword: String = ConfigParse.getString("db.azkaban.password")
    val azkDBName: String = ConfigParse.getString("db.azkaban.dbname")
    val cdcTabExecutionJobs: String = ConfigParse.getString("cdc.azkaban_run_jobs.tablename")

    // val url = ConfigParse.getString("db.neo4j.commitUrl")
    val project_node = ConfigParse.getString("kg.neo4j.projectNode")

    // CDC source over the execution_jobs table: per-job run records
    tEnv.executeSql(
      s"""
         |create table execution_jobs (
         |  exec_id int NOT NULL,
         |  project_id string NOT NULL,
         |  version int NOT NULL,
         |  flow_id string NOT NULL,
         |  job_id string NOT NULL,
         |  attempt int NOT NULL,
         |  start_time bigint NULL,
         |  end_time bigint NULL,
         |  status string NULL,
         |  input_params string,
         |  output_params string,
         |  attachments string
         |) with (
         |  'connector' = 'mysql-cdc',
         |  'hostname' = '${azkHost}',
         |  'port' = '${azkPort}',
         |  'username' = '${azkUserName}',
         |  'password' = '${azkPassword}',
         |  'database-name' = '${azkDBName}',
         |  'table-name' = '${cdcTabExecutionJobs}',
         |  'debezium.snapshot.mode' = 'schema_only'
         |)
         |""".stripMargin
    )

    // select the fields we need; event_time falls back to start_time while the
    // job is still running (end_time stays -1 until completion)
    val update_info = tEnv.sqlQuery(
      s"""
         |select
         |  '${project_node}' as project_node,
         |  project_id,
         |  flow_id,
         |  job_id,
         |  status as active_status,
         |  from_unixtime(start_time/1000,'yyyy-MM-dd HH:mm:ss') as create_time,
         |  from_unixtime(end_time/1000,'yyyy-MM-dd HH:mm:ss') as update_time,
         |  cast((case when end_time = -1 then start_time else end_time end) as varchar) as event_time
         |from execution_jobs
         |""".stripMargin)

    // expose the changelog as a retract stream of (isAdd, ExecutionJobs)
    tEnv.toRetractStream[ExecutionJobs](update_info)
  }

  def main(args: Array[String]): Unit = {
    println("cdcMysql~~")
  }
}
```
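
Because execution_jobs rows are updated in place as a job progresses, toRetractStream pairs every record with a Boolean flag: true marks an added row, false the retraction of a superseded one. A minimal usage sketch of the function above (table environment setup omitted):

```scala
// each status change arrives as retract(old) + add(new); keeping only the
// add messages leaves the latest state of every job run
val jobUpdates: DataStream[ExecutionJobs] =
  ChangeDataCapture.genUpdateNeo4jInfo(tEnv)
    .filter(_._1)
    .map(_._2)
```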
  3. Join the broadcast variable to resolve projectName
  • A custom BroadcastProcessFunction implements the join logic
```scala
import com.haierubic.bigdata.dataflow.models.Domain.{ExecutionJobs, ProjectsClass, UpsertNeo4j}
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.util.Collector

class CustomBroadcastProcessFunction extends BroadcastProcessFunction[ExecutionJobs, ProjectsClass, UpsertNeo4j] {

  // descriptor of the broadcast state: project_id -> project_name
  val projectsDimDesc = new MapStateDescriptor[String, String](
    "projects",
    BasicTypeInfo.STRING_TYPE_INFO,
    BasicTypeInfo.STRING_TYPE_INFO
  )

  override def processElement(
      in1: ExecutionJobs,
      readOnlyContext: BroadcastProcessFunction[ExecutionJobs, ProjectsClass, UpsertNeo4j]#ReadOnlyContext,
      collector: Collector[UpsertNeo4j]): Unit = {
    // 1. take project_id from the incoming record
    val project_id = in1.project_id

    val project_name = readOnlyContext.getBroadcastState(projectsDimDesc).get(project_id)

    // emit only when the project name can be resolved; otherwise skip the record
    if (project_name != null) {
      // 2. assemble the output record
      val result = UpsertNeo4j(in1.project_node, project_name, in1.flow_name, in1.job_name, in1.active_status, in1.create_time, in1.update_time)

      collector.collect(result)
    }
  }

  override def processBroadcastElement(
      in2: ProjectsClass,
      context: BroadcastProcessFunction[ExecutionJobs, ProjectsClass, UpsertNeo4j]#Context,
      collector: Collector[UpsertNeo4j]): Unit = {
    // fetch the broadcast state
    val broadcastState = context.getBroadcastState(projectsDimDesc)

    // clear the broadcast state if a full refresh is needed
    // broadcastState.clear()

    // update the broadcast state
    broadcastState.put(in2.project_id, in2.project_name)
  }
}
```
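
For connect(...).process(...) in the next step to type-check, the projects stream must already be a BroadcastStream registered under the same descriptor. QueryProjects.getProjects from step one is expected to do this; a minimal sketch of that broadcasting step, assuming a plain DataStream[ProjectsClass] as input:

```scala
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.streaming.api.datastream.BroadcastStream
import org.apache.flink.streaming.api.scala._

// broadcast the projects dimension stream with the same name and types
// as the descriptor used inside CustomBroadcastProcessFunction
def broadcastProjects(projects: DataStream[ProjectsClass]): BroadcastStream[ProjectsClass] =
  projects.broadcast(new MapStateDescriptor[String, String](
    "projects",
    BasicTypeInfo.STRING_TYPE_INFO,
    BasicTypeInfo.STRING_TYPE_INFO
  ))
```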
  • Connect the CDC stream with the broadcast stream to build the joined DataStream
```scala
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object DataFlowMonitor extends Sentence {
  def main(args: Array[String]): Unit = {

    // flink env
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // use event-time semantics
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    env.setParallelism(1)

    // flink table settings
    val tSettings: EnvironmentSettings = EnvironmentSettings
      .newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()

    // flink table env
    val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, tSettings)

    // Step1: cdc-azkaban-execution_jobs, get project running status
    val updateNeo4jStream = ChangeDataCapture.genUpdateNeo4jInfo(tEnv)
    // keep only the add messages of the retract stream
    val filterUpdateNeo4jStream = updateNeo4jStream.filter(_._1).map(_._2)
    // Step2: broadcast azkaban-projects, get project_name
    // (getProjects is expected to return a BroadcastStream[ProjectsClass],
    //  i.e. the projects stream already broadcast with the same MapStateDescriptor)
    val projectsDataStream = QueryProjects.getProjects(env)

    // Step3: join against the broadcast state to resolve project_name
    val result = filterUpdateNeo4jStream.connect(projectsDataStream).process(
      new CustomBroadcastProcessFunction() // custom broadcast join function
    )
  }
}
```
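
As written, main builds the pipeline but never submits it, so nothing runs. A minimal way to finish the demo, with a print sink standing in for the eventual Neo4j writer, is to append these lines at the end of main:

```scala
    // inspect the joined records (a real pipeline would write them to Neo4j)
    result.print()

    // submit the job; without execute() the dataflow above is never started
    env.execute("DataFlowMonitor")
```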
