Flink 从0-1实现 电商实时数仓 - DIM & DW

这是我参与8月更文挑战的第7天,活动详情查看:8月更文挑战

往期:juejin.cn/column/6994…

12. 处理业务数据
ini复制代码public class BaseDBApp {

/**
* 业务数据 topic
*/
private static final String TOPIC_BASE = "ods_base_db_m";
private static final String BASE_GROUP_ID = "ods_dwd_base_log_db";

public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);

//检查点,省略

//拿到数据流
DataStreamSource<String> dataStreamSource = env.addSource(KafkaUtil.ofSource(TOPIC_BASE, BASE_GROUP_ID));
//转化格式,ETL
SingleOutputStreamOperator<JSONObject> filteredDs = dataStreamSource
.map(JSON::parseObject)
.filter(new RichFilterFunction<JSONObject>() {
@Override
public boolean filter(JSONObject jsonObject) throws Exception {
return StringUtils.isNotBlank(jsonObject.getString("table"))
&& jsonObject.getJSONObject("data") != null;
}
});
// Flink CDC 读取配置流
DataStreamSource<String> ruleSource = env.addSource(MySQLSource.<String>builder()
.hostname("hadoop3")
.port(3306)
.username("root")
.password("密码")
.databaseList("tmall_realtime")
.tableList("tmall_realtime.table_process")
.deserializer(new DebeziumDeserializationSchema<String>() {
@Override
public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
Struct value = (Struct) sourceRecord.value();
/*
* Struct{
* after=Struct{
* source_table=111,
* operate_type=tses,
* sink_type=1,
* sink_table=1111
* },
* source=Struct{
* db=tmall_realtime,
* table=table_process
* },
* op=c
* }
*/
Struct source = value.getStruct("source");

JSONObject jsonObject = new JSONObject();
jsonObject.put("database", source.getString("db"));
jsonObject.put("table", source.getString("table"));
jsonObject.put("type", CDCTypeEnum.of(value.getString("op")).toString().toLowerCase());
Struct after = value.getStruct("after");
JSONObject data = new JSONObject();
for (Field field : after.schema().fields()) {
data.put(field.name(), after.get(field.name()));
}
jsonObject.put("data", data);

collector.collect(jsonObject.toJSONString());
}

@Override
public TypeInformation<String> getProducedType() {
return BasicTypeInfo.STRING_TYPE_INFO;
}
})
.startupOptions(StartupOptions.initial())
.build()
);

//配置流状态
MapStateDescriptor<String, TableProcess> mapStateDescriptor = new MapStateDescriptor<>("table_process", String.class, TableProcess.class);

//广播配置流状态
BroadcastStream<String> broadcast = ruleSource.broadcast(mapStateDescriptor);

//定义侧输出流,存放DIM数据
OutputTag<JSONObject> dimTag = new OutputTag<JSONObject>("dim_tag") {
};

//正式处理数据
SingleOutputStreamOperator<JSONObject> dwdDs = filteredDs
//合并 配置流
.connect(broadcast)
.process(new BroadcastProcessFunction<JSONObject, String, JSONObject>() {

private Connection connection;

@Override
public void open(Configuration parameters) throws Exception {
Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
connection = DriverManager.getConnection(TmallConfig.PHOENIX_SERVER);
}

@Override
public void close() throws Exception {
connection.close();
}

/**
* 处理 ODS 数据流
* @param jsonObject
* @param readOnlyContext
* @param collector
* @throws Exception
*/
@Override
public void processElement(JSONObject jsonObject, ReadOnlyContext readOnlyContext, Collector<JSONObject> collector) throws Exception {
//获取 配置流状态
ReadOnlyBroadcastState<String, TableProcess> broadcastState = readOnlyContext.getBroadcastState(mapStateDescriptor);

String table = jsonObject.getString("table");
String type = jsonObject.getString("type");
// MaxWell 处理历史数据 insert 的操作类型是 bootstrap-insert 需要修正一些
if ("bootstrap-insert".equals(type)) {
type = "insert";
jsonObject.put("type", type);
}

//拿到配置
String key = table + ":" + type;
TableProcess tableProcess = broadcastState.get(key);

if (tableProcess != null) {
//目标表放进去
jsonObject.put("sink_table", tableProcess.getSinkTable());
jsonObject.put("sink_pk", tableProcess.getSinkPk());
//过滤字段
HashSet<String> columnSet = Sets.newHashSet(tableProcess.getSinkColumns().split(","));
jsonObject.getJSONObject("data").entrySet().removeIf(e -> !columnSet.contains(e.getKey()));
//发送位置
String sinkType = tableProcess.getSinkType();
if (TableProcess.SINK_TYPE_KAFKA.equals(sinkType)) {
collector.collect(jsonObject);
} else if (TableProcess.SINK_TYPE_HBASE.equals(sinkType)) {
readOnlyContext.output(dimTag, jsonObject);
}
} else {
//没有配置
System.out.println("NO this Key in TableProcess" + key);
}
}

/**
* 处理 配置流 数据
* @param s
* @param context
* @param collector
* @throws Exception
*/
@Override
public void processBroadcastElement(String s, Context context, Collector<JSONObject> collector) throws Exception {
JSONObject jsonObject = JSON.parseObject(s);
TableProcess tableProcess = jsonObject.getObject("data", TableProcess.class);
String sourceTable = tableProcess.getSourceTable();
String operateType = tableProcess.getOperateType();
String sinkType = tableProcess.getSinkType();
String sinkPk = StringUtils.defaultString(tableProcess.getSinkPk(), "id");
String sinkExt = StringUtils.defaultString(tableProcess.getSinkExtend());
String sinkTable = tableProcess.getSinkTable();
String sinkColumns = tableProcess.getSinkColumns();

//如果是维度数据,需要通过Phoenix创建表
if (TableProcess.SINK_TYPE_HBASE.equals(sinkType) && CDCTypeEnum.INSERT.toString().toLowerCase().equals(operateType)) {
StringBuilder sql = new StringBuilder();
sql.append("create table if not exists ").append(TmallConfig.HBASE_SCHEMA).append(".").append(sinkTable).append(" ( ");
String[] columns = sinkColumns.split(",");
for (int i = 0; i < columns.length; i++) {
String column = columns[i];
if (sinkPk.equals(column)) {
sql.append(column).append(" varchar primary key ");
} else {
sql.append("info.").append(column).append(" varchar ");
}
if (i < columns.length - 1) {
sql.append(" , ");
}
}
sql.append(" ) ")
.append(sinkExt);
System.out.println(sql);
try (PreparedStatement preparedStatement = connection.prepareStatement(sql.toString())) {
preparedStatement.execute();
}
}

//写入状态进行广播
BroadcastState<String, TableProcess> broadcastState = context.getBroadcastState(mapStateDescriptor);
broadcastState.put(sourceTable + ":" + operateType, tableProcess);
}
});

//处理 DIM 侧输出流 存入 HBase
dwdDs.getSideOutput(dimTag).addSink(new RichSinkFunction<JSONObject>() {

private Connection connection;

@Override
public void open(Configuration parameters) throws Exception {
Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
connection = DriverManager.getConnection(TmallConfig.PHOENIX_SERVER);
}

@Override
public void close() throws Exception {
connection.close();
}

@Override
public void invoke(JSONObject value, Context context) throws Exception {
JSONObject data = value.getJSONObject("data");
StringBuilder sql = new StringBuilder();
//目标表名
String sinkTable = value.getString("sink_table");
sql.append("upsert into ").append(TmallConfig.HBASE_SCHEMA).append(".").append(sinkTable).append(" (")
.append(StringUtils.join(data.keySet(), ","))
.append(") ")
.append("values( '")
.append(StringUtils.join(data.values(), "','"))
.append("' ) ");
//入库
try (PreparedStatement preparedStatement = connection.prepareStatement(sql.toString())) {
preparedStatement.execute();
// 默认不自动提交,需要手动提交
connection.commit();
}
//删除缓存
String type = value.getString("type");
if ("update".equals(type) || "delete".equals(type)) {
String sinkPk = value.getString("sink_pk");
DimUtil.delDimCache(sinkTable, data.getString(sinkPk));
}
}
});

// 处理 DWD 主流数据,存入 Kafka
dwdDs.addSink(KafkaUtil.ofSink(new KafkaSerializationSchema<JSONObject>() {
@Override
public ProducerRecord<byte[], byte[]> serialize(JSONObject jsonObject, @Nullable Long aLong) {
return new ProducerRecord<>(jsonObject.getString("sink_table"), jsonObject.getJSONObject("data").toJSONString().getBytes(StandardCharsets.UTF_8));
}
}));

//执行
env.execute("db_ods_to_dwd");
}
}
13. 处理日志数据
java复制代码public class BaseLogApp {

/**
* 所以日志数据
*/
private static final String TOPIC_BASE = "ods_base_log";
private static final String BASE_GROUP_ID = "ods_dwd_base_log_app";

/**
* 启动日志数据
*/
private static final String TOPIC_START = "dwd_start_log";
/**
* 页面日志数据
*/
private static final String TOPIC_PAGE = "dwd_page_log";
/**
* 曝光日志数据
*/
private static final String TOPIC_DISPLAY = "dwd_display_log";

public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);

//省略检查点

//拿到数据流
DataStreamSource<String> dataStreamSource = env.addSource(KafkaUtil.ofSource(TOPIC_BASE, BASE_GROUP_ID));

// 处理新用户字段,防止前端数据错误
SingleOutputStreamOperator<JSONObject> midWithNewFlagDS = dataStreamSource
.map(JSON::parseObject)
.keyBy(j -> j.getJSONObject("common").getString("mid"))
.map(new RichMapFunction<JSONObject, JSONObject>() {

private ValueState<String> newMidDateState;
private SimpleDateFormat yyyyMMdd;

@Override
public void open(Configuration parameters) throws Exception {
newMidDateState = getRuntimeContext().getState(new ValueStateDescriptor<String>("newMidDateState", String.class));
yyyyMMdd = new SimpleDateFormat("yyyyMMdd");
}

@Override
public JSONObject map(JSONObject jsonObject) throws Exception {
String isNew = jsonObject.getJSONObject("common").getString("is_new");
//1判断是不是新用户,是新用户,修复
if ("1".equals(isNew)) {
String newMidDate = newMidDateState.value();
String ts = yyyyMMdd.format(new Date(jsonObject.getLong("ts")));
if (StringUtils.isEmpty(newMidDate)) {
newMidDateState.update(ts);
} else {
if (!newMidDate.equals(ts)) {
jsonObject.getJSONObject("common").put("is_new", "0");
}
}
}
return jsonObject;
}
});

//启动日志侧输出流
OutputTag<String> startTag = new OutputTag<String>("startTag") {
};
//曝光日志侧输出流
OutputTag<String> displayTag = new OutputTag<String>("displayTag") {
};

// 判断 不同的日志类型 输出到各流
SingleOutputStreamOperator<String> pageDStream = midWithNewFlagDS
.process(new ProcessFunction<JSONObject, String>() {
@Override
public void processElement(JSONObject jsonObject, Context context, Collector<String> collector) throws Exception {
JSONObject start = jsonObject.getJSONObject("start");
if (start != null) {
context.output(startTag, jsonObject.toJSONString());
} else {
collector.collect(jsonObject.toJSONString());
JSONArray displays = jsonObject.getJSONArray("displays");
if (!CollectionUtil.isNullOrEmpty(displays)) {
for (int i = 0; i < displays.size(); i++) {
JSONObject displaysJsonObject = displays.getJSONObject(i);
String pageId = jsonObject.getJSONObject("page").getString("page_id");
displaysJsonObject.put("page_id", pageId);
context.output(displayTag, displaysJsonObject.toJSONString());
}
}
}
}
});

//写入 kafka
pageDStream.addSink(KafkaUtil.ofSink(TOPIC_PAGE));
pageDStream.getSideOutput(startTag).addSink(KafkaUtil.ofSink(TOPIC_START));
pageDStream.getSideOutput(displayTag).addSink(KafkaUtil.ofSink(TOPIC_DISPLAY));

env.execute("log_ods_to_dwd");
}

}

下期预告:DWM层

关注专栏持续更新 👇🏻👇🏻👇🏻👇🏻👇🏻👇🏻👇🏻👇🏻

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%