引言
需求背景: ETL离线作业,需要实时监控运行状况,由于调度工具是Azkaban,故同步获取其后台配置库Mysql;本文为第二步:flink-cdc监控execution_jobs表,实时获取作业运行状况,关联第一步处理的projects广播变量,组装信息—— projectName、flowName、jobName、status、startTime、endTime
功能部件
flink-cdc-connector + Mysql + flink Table Api
实现逻辑
- 利用flink-connector,实现cdc元数据库execution_jobs表,获取作业实时运行情况;
- 筛选指定的字段,将TableStream转化为DataStream,并封装样例类;
- join广播变量,获取projectName;
前提准备
- 确保Mysql库,开启binlong并设置为ROW格式; (off:关闭; on:开启)
1 | sql复制代码SHOW VARIABLES LIKE 'log_bin'; |
- 创建cdc对应的用户
1 | sql复制代码-- create user |
实现Demo
- 创建结果样例类
1 | scala复制代码object Domain { |
- flink-cdc实现 TableApi
1 | scala复制代码 |
- join广播变量获取projectName
- 自定义BroadcastProcessFunction 实现join关联逻辑
1 | scala复制代码 |
- 将cdc结果与广播变量join组合新的DataStream
1 | scala复制代码object DataFlowMonitor extends Sentence { |
本文转载自: 掘金