8月份 FlinkCDC 发布2.0.0版本,相较于1.0版本,在全量读取阶段支持分布式读取、支持checkpoint,且在全量 + 增量读取的过程在不锁表的情况下保障数据一致性。 详细介绍参考 Flink CDC 2.0 正式发布,详解核心改进。
Flink CDC2.0 数据读取逻辑并不复杂,复杂的是 FLIP-27: Refactor Source Interface 的设计及对Debezium Api的不了解。本文重点对 Flink CDC 的处理逻辑进行介绍, FLIP-27 的设计及 Debezium 的API调用不做过多讲解。
本文先以Flink SQL 案例来介绍Flink CDC2.0的使用,接着介绍CDC中的核心设计包含切片划分、切分读取、增量读取,最后对数据处理过程中涉及flink-mysql-cdc 接口的调用及实现进行代码讲解。
案例
全量读取+增量读取 Mysql表数据,以changelog-json
格式写入kafka,观察 RowKind 类型及影响的数据条数。
1 | swift复制代码public static void main(String[] args) { |
全量数据输出:
1 | bash复制代码{"data":{"order_id":1010,"order_date":"2021-09-17","order_time":"2021-09-22 10:52:12.189","quantity":53,"product_id":502,"purchaser":"flink"},"op":"+I"} |
核心设计
切片划分
全量阶段数据读取方式为分布式读取,会先对当前表数据按主键划分成多个Chunk,后续子任务读取Chunk 区间内的数据。根据主键列是否为自增整数类型,对表数据划分为均匀分布的Chunk及非均匀分布的Chunk。
均匀分布
主键列自增且类型为整数类型(int,bigint,decimal)。查询出主键列的最小值,最大值,按 chunkSize 大小将数据均匀划分,因为主键为整数类型,根据当前chunk 起始位置、chunkSize大小,直接计算chunk 的结束位置。
1 | sql复制代码// 计算主键列数据区间 |
非均匀分布
主键列非自增或者类型为非整数类型。主键为非数值类型,每次划分需要对未划分的数据按主键进行升序排列,取出前 chunkSize 的最大值为当前 chunk 的结束位置。
1 | sql复制代码// 未拆分的数据排序后,取 chunkSize 条数据取最大值,作为切片的终止位置。 |
全量切片数据读取
Flink 将表数据划分为多个Chunk,子任务在不加锁的情况下,并行读取 Chunk数据。因为全程无锁在数据分片读取过程中,可能有其他事务对切片范围内的数据进行修改,此时无法保证数据一致性。因此,在全量阶段Flink 使用快照记录读取+Binlog数据修正的方式来保证数据的一致性。
快照读取
通过JDBC执行SQL查询切片范围的数据记录。
1 | ini复制代码## 快照记录数据读取SQL |
数据修正
在快照读取操作前、后执行 SHOW MASTER STATUS
查询binlog文件的当前偏移量,在快照读取完毕后,查询区间内的binlog数据并对读取的快照记录进行修正。
快照读取+Binlog数据读取时的数据组织结构。
BinlogEvents 修正 SnapshotEvents 规则。
- 未读取到binlog数据,即在执行select阶段没有其他事务进行操作,直接下发所有快照记录。
- 读取到binlog数据,且变更的数据记录不属于当前切片,下发快照记录。
- 读取到binlog数据,且数据记录的变更属于当前切片。delete 操作从快照内存中移除该数据,insert 操作向快照内存添加新的数据,update操作向快照内存中添加变更记录,最终会输出更新前后的两条记录到下游。
修正后的数据组织结构:
以读取切片 [1,11)范围的数据为例,描述切片数据的处理过程。c,d,u代表Debezium捕获到的新增、删除、更新操作。
修正前数据及结构:
修正后数据及结构:
单个切片数据处理完毕后会向 SplitEnumerator 发送已完成切片数据的起始位置(ChunkStart, ChunkStartEnd)、Binlog的最大偏移量(High watermark),用来为增量读取指定起始偏移量。
增量切片数据读取
全量阶段切片数据读取完成后,SplitEnumerator 会下发一个 BinlogSplit 进行增量数据读取。BinlogSplit读取最重要的属性就是起始偏移量,偏移量如果设置过小下游可能会有重复数据,偏移量如果设置过大下游可能是已超期的脏数据。而 Flink CDC增量读取的起始偏移量为所有已完成的全量切片最小的Binlog偏移量,只有满足条件的数据才被下发到下游。数据下发条件:
- 捕获的Binlog数据的偏移量 > 数据所属分片的Binlog的最大偏移量。
例如,SplitEnumerator 保留的已完成切片信息为。
切片索引
Chunk 数据范围
切片读取的最大Binlog
0
[1,100]
1000
1
[101,200]
800
2
[201,300]
1500
增量读取时,从偏移量 800 开始读取Binlog数据 ,当捕获到数据 <data:123, offset:1500> 时,先找到 123 所属快照分片,并找到对应的最大Binlog 偏移量 800。 当前偏移量大于快照读的最大偏移量,则下发数据,否则直接丢弃。
代码详解
关于 FLIP-27: Refactor Source Interface 设计不做详细介绍,本文侧重对 flink-mysql-cdc 接口调用及实现进行讲解。
MySqlSourceEnumerator 初始化
SourceCoordinator作为OperatorCoordinator对Source的实现,运行在Master节点,在启动时通过调用MySqlParallelSource#createEnumerator 创建 MySqlSourceEnumerator 并调用start方法,做一些初始化工作。
- 创建 MySqlSourceEnumerator,使用 MySqlHybridSplitAssigner 对全量+增量数据进行切片,使用 MySqlValidator 对 mysql 版本、配置进行校验。
- MySqlValidator 校验:
1. mysql版本必须大于等于5.7。
2. binlog\_format 配置必须为 ROW。
3. binlog\_row\_image 配置必须为 FULL。
- MySqlSplitAssigner 初始化:
1. 创建 ChunkSplitter用来划分切片。
2. 筛选出要读的表名称。
- 启动周期调度线程,要求 SourceReader 向 SourceEnumerator 发送已完成但未发送ACK事件的切片信息。
1 | csharp复制代码private void syncWithReaders(int[] subtaskIds, Throwable t) { |
MySqlSourceReader 初始化
SourceOperator 集成了SourceReader,通过OperatorEventGateway 和 SourceCoordinator 进行交互。
- SourceOperator 在初始化时,通过 MySqlParallelSource 创建 MySqlSourceReader。MySqlSourceReader 通过 SingleThreadFetcherManager 创建Fetcher拉取分片数据,数据以 MySqlRecords 格式写入到 elementsQueue。
1 | perl复制代码MySqlParallelSource#createReader |
- 将创建的 MySqlSourceReader 以事件的形式传递给 SourceCoordinator 进行注册。SourceCoordinator 接收到注册事件后,将reader 地址及索引进行保存。
SourceCoordinator#handleReaderRegistrationEvent
// note: SourceCoordinator 处理Reader 注册事件
private void handleReaderRegistrationEvent(ReaderRegistrationEvent event) {
context.registerSourceReader(new ReaderInfo(event.subtaskId(), event.location()));
enumerator.addReader(event.subtaskId());
}
3. MySqlSourceReader 启动后会向 MySqlSourceEnumerator 发送请求分片事件,从而收集分配的切片数据。
4. SourceOperator 初始化完毕后,调用 emitNext 由 SourceReaderBase 从 elementsQueue 获取数据集合并下发给 MySqlRecordEmitter。接口调用示意图:!
MySqlSourceEnumerator 处理分片请求
MySqlSourceReader 启动时会向 MySqlSourceEnumerator 发送请求 RequestSplitEvent 事件,根据返回的切片范围读取区间数据。MySqlSourceEnumerator 全量读取阶段分片请求处理逻辑,最终返回一个MySqlSnapshotSplit。
- 处理切片请求事件,为请求的Reader分配切片,通过发送AddSplitEvent时间传递MySqlSplit(全量阶段MySqlSnapshotSplit、增量阶段MySqlBinlogSplit)。
1 | scss复制代码MySqlSourceEnumerator#handleSplitRequest |
- MySqlHybridSplitAssigner 处理全量切片、增量切片的逻辑。
1. 任务刚启动时,remainingTables不为空,noMoreSplits返回值为false,创建 SnapshotSplit。
2. 全量阶段分片读取完成后,noMoreSplits返回值为true, 创建 BinlogSplit。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
kotlin复制代码MySqlHybridSplitAssigner#getNext
@Override
public Optional<MySqlSplit> getNext() {
if (snapshotSplitAssigner.noMoreSplits()) {
// binlog split assigning
if (isBinlogSplitAssigned) {
// no more splits for the assigner
return Optional.empty();
} else if (snapshotSplitAssigner.isFinished()) {
// we need to wait snapshot-assigner to be finished before
// assigning the binlog split. Otherwise, records emitted from binlog split
// might be out-of-order in terms of same primary key with snapshot splits.
isBinlogSplitAssigned = true;
//note: snapshot split 切片完成后,创建BinlogSplit。
return Optional.of(createBinlogSplit());
} else {
// binlog split is not ready by now
return Optional.empty();
}
} else {
// note: 由MySqlSnapshotSplitAssigner 创建 SnapshotSplit
// snapshot assigner still have remaining splits, assign split from it
return snapshotSplitAssigner.getNext();
}
}
- MySqlSnapshotSplitAssigner 处理全量切片逻辑,通过 ChunkSplitter 生成切片,并存储到Iterator中。
1 | scss复制代码@Override |
4.ChunkSplitter 将表划分为均匀分布 or 不均匀分布切片的逻辑。读取的表必须包含物理主键。
1 | scss复制代码public Collection<MySqlSnapshotSplit> generateSplits(TableId tableId) { |
- splitTableIntoChunks 根据物理主键划分切片。
1 | scss复制代码private List<ChunkRange> splitTableIntoChunks(TableId tableId, Column splitColumn) |
MySqlSourceReader 处理切片分配请求
MySqlSourceReader接收到切片分配请求后,会为先创建一个 SplitFetcher线程,向 taskQueue 添加、执行AddSplitsTask 任务用来处理添加分片任务,接着执行 FetchTask 使用Debezium API进行读取数据,读取的数据存储到 elementsQueue** **中,SourceReaderBase 会从该队列中获取数据,并下发给 MySqlRecordEmitter。
- 处理切片分配事件时,创建SplitFetcher向taskQueue添加AddSplitsTask。
1 | scss复制代码SingleThreadFetcherManager#addSplits |
- 执行 SplitFetcher线程,首次执行 AddSplitsTask 线程添加分片,以后执行 FetchTask 线程拉取数据。
1 | scss复制代码SplitFetcher#runOnce |
- AddSplitsTask 调用 MySqlSplitReader 的 handleSplitsChanges方法,向切片队列中添加已分配的切片信息。在下一次fetch()调用时,从队列中获取切片并读取切片数据。
1 | typescript复制代码AddSplitsTask#run |
- MySqlSplitReader 执行fetch(),由DebeziumReader读取数据到事件队列,在对数据修正后以MySqlRecords格式返回。
1 | scss复制代码MySqlSplitReader#fetch |
DebeziumReader 数据处理
DebeziumReader 包含全量切片读取、增量切片读取两个阶段,数据读取后存储到 ChangeEventQueue,执行pollSplitRecords 时对数据进行修正。
- SnapshotSplitReader 全量切片读取。全量阶段的数据读取通过执行Select语句查询出切片范围内的表数据,在写入队列前后执行 SHOW MASTER STATUS 时,写入当前偏移量。
1 | scss复制代码public void submitSplit(MySqlSplit mySqlSplit) { |
- SnapshotSplitReader 增量切片读取。增量阶段切片读取重点是判断BinlogSplitReadTask什么时候停止,在读取到分片阶段的结束时的偏移量即终止。
1 | scss复制代码MySqlBinlogSplitReadTask#handleEvent |
- SnapshotSplitReader 执行pollSplitRecords 时对队列中的原始数据进行修正。 具体处理逻辑查看 RecordUtils#normalizedSplitRecords。
1 | csharp复制代码public Iterator<SourceRecord> pollSplitRecords() throws InterruptedException { |
- BinlogSplitReader 数据读取。读取逻辑比较简单,重点是起始偏移量的设置,起始偏移量为所有切片的HW。
- BinlogSplitReader 执行pollSplitRecords 时对队列中的原始数据进行修正,保障数据一致性。 增量阶段的Binlog读取是无界的,数据会全部下发到事件队列,BinlogSplitReader 通过shouldEmit()判断数据是否下发。
1 | scss复制代码BinlogSplitReader#pollSplitRecords |
事件下发条件:1. 新收到的event post 大于 maxwm 2. 当前 data值所属某个snapshot spilt & 偏移量大于 HWM,下发数据。
1 | scss复制代码/** |
MySqlRecordEmitter 数据下发
SourceReaderBase 从队列中获取切片读取的DataChangeEvent数据集合,将数据类型由Debezium的DataChangeEvent 转换为Flink 的RowData类型。
- SourceReaderBase 处理切片数据流程。
1 | kotlin复制代码org.apache.flink.connector.base.source.reader.SourceReaderBase#pollNext |
- MySqlRecords 返回单条数据集合。
1 | csharp复制代码com.ververica.cdc.connectors.mysql.source.split.MySqlRecords#nextRecordFromSplit |
- MySqlRecordEmitter 通过 RowDataDebeziumDeserializeSchema 将数据转换为Rowdata。
1 | scss复制代码com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter#emitRecord |
RowDataDebeziumDeserializeSchema 序列化过程。
1 | ini复制代码com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema#deserialize |
MySqlSourceReader 汇报切片读取完成事件
MySqlSourceReader处理完一个全量切片后,会向MySqlSourceEnumerator发送已完成的切片信息,包含切片ID、HighWatermar ,然后继续发送切片请求。
1 | scss复制代码com.ververica.cdc.connectors.mysql.source.reader.MySqlSourceReader#onSplitFinished |
MySqlSourceEnumerator 分配增量切片
全量阶段所有分片读取完毕后,MySqlHybridSplitAssigner 会创建BinlogSplit 进行后续增量读取,在创建BinlogSplit 会从全部已完成的全量切片中筛选最小BinlogOffset。注意:2.0.0分支 createBinlogSplit 最小偏移量总是从0开始,最新master分支已经修复这个BUG.
1 | ini复制代码private MySqlBinlogSplit createBinlogSplit() { |
本文转载自: 掘金