Canal Client 代码入门

首先分享之前的所有文章 , 欢迎点赞收藏转发三连下次一定 >>>> 😜😜😜

文章合集 : 🎁 juejin.cn/post/694164…

Github : 👉 github.com/black-ant

CASE 备份 : 👉 gitee.com/antblack/ca…

一 . 前言

这一篇来开始学习 canal 的源码 , 文章目的 :

  • 了解 canal 项目结构
  • 如何启动 canal 源码
  • canal Client 主流程

canal 主要用于基于MySQL 的增量日志解析 , 它将自己模拟为一个备份库 , 主库会推送 binlog 带 Canal , Canal 解析 binlog , 并且推送到其他的环境

二 . canal 使用流程

Canal 启动包含 个部分 :

  • Canal Git 拉取最新的依赖包 ( canal.deployer)
  • 修改 Canal 配置文件 , 并且启动 Canal Server
  • 自行编写 Canal Client , 完成 binlog 的截取

前期准备

1
2
java复制代码create user 'canal'@'%' identified by 'Canal@123456';
grant SELECT, REPLICATION SLAVE, REPLICATION CLIENT on *.* to 'canal'@'%' identified by 'Canal@123456';

Canal 配置文件 (\conf\example\instance.properties)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
java复制代码// 数据库访问地址
canal.instance.master.address=192.168.158.30:3306
// binlog日志名称 , 可以处理指定的日志
canal.instance.master.journal.name=
// 起始偏移量 , 从指定的位置开始同步
canal.instance.master.position=
//
canal.instance.master.timestamp=
canal.instance.master.gtid=

canal.instance.dbUsername=canal
canal.instance.dbPassword=Canal@19950824
canal.instance.connectionCharset = UTF-8

canal.instance.filter.regex=.*\\..*
canal.instance.filter.black.regex=mysql\\.slave_.*

canal.mq.topic=example
canal.mq.partition=0

Canal 启动环节

Canal 启动只需要调用 \bin\startup.bat 即可 , 其中可能会涉及到如下问题 :

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码//  log 日志
topic 分片日志 : \logs\example
canal 主日志 :


// >>> 问题

// 1. Received error packet: errno = 1236, sqlstate = HY000 errmsg = Misconfigured master - master server_id is 0
> 修改my.cnf , 添加 server_id=1

// 2. can't found begin/commit position before with fixed position mysql-bin.000018:1360
canal.instance.master.journal.name 和 position 配置问题

Canal Client 拦截

Canal 项目中提供了一个案例项目 : github.com/alibaba/can… , 个人加了一些注释 :

1
2
3
4
5
6
java复制代码<!-- Step 1 : 添加依赖关系 -->
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.0</version>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
java复制代码public static void main(String args[]) {

// 创建链接
// 此处包含三种创建方式 : newSingleConnector / newClusterConnector / newClusterConnector
CanalConnector connector = CanalConnectors.newSingleConnector(
new InetSocketAddress(AddressUtils.getHostIp(),11111), "example", "", "");

int batchSize = 1000;
int emptyCount = 0;
try {

// 打开连接
connector.connect();
// 配置扫描范围
connector.subscribe(".*\..*");
// 回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿
connector.rollback();

int totalEmptyCount = 120;


while (emptyCount < totalEmptyCount) {
Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
emptyCount++;
System.out.println("empty count : " + emptyCount);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
} else {
emptyCount = 0;
// 打印SQL语句
// System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
printEntry(message.getEntries());
}

connector.ack(batchId); // 提交确认
// connector.rollback(batchId); // 处理失败, 回滚数据
}

System.out.println("empty too many times, exit");
} finally {
connector.disconnect();
}
}

private static void printEntry(List<Entry> entrys) {
for (CanalEntry.Entry entry : entrys) {
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
//开启/关闭事务的实体类型,跳过
continue;
}
//RowChange对象,包含了一行数据变化的所有特征
//比如isDdl 是否是ddl变更操作 sql 具体的ddl sql beforeColumns afterColumns 变更前后的数据字段等等
RowChange rowChage;
try {
rowChage = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
}
//获取操作类型:insert/update/delete 等类型
CanalEntry.EventType eventType = rowChage.getEventType();

//打印Header信息
System.out.println(String.format("================》; binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
eventType));

//判断是否是DDL语句
if (rowChage.getIsDdl()) {

System.out.println("================》;isDdl: true,sql:" + rowChage.getSql());
}

//获取RowChange对象里的每一行数据,打印出来
for (RowData rowData : rowChage.getRowDatasList()) {
//如果是删除语句
if (eventType == EventType.DELETE) {
printColumn(rowData.getBeforeColumnsList());
//如果是新增语句
} else if (eventType == CanalEntry.EventType.INSERT) {
printColumn(rowData.getAfterColumnsList());
//如果是更新的语句
} else {
//变更前的数据
System.out.println("------->; before");
printColumn(rowData.getBeforeColumnsList());
//变更后的数据
System.out.println("------->; after");
printColumn(rowData.getAfterColumnsList());
}
}
}

}

private static void printColumn(List<Column> columns) {
for (Column column : columns) {
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
}
}

三 . canal 项目结构

Canal Client 源码很清晰 , 代码量也不大 , 这里仅大概过一下

3.1 CanalConnector 接口

CanalConnector提供了很多核心的工具方法 (可以看看源码 , 里面写的很详细) , 它有 SimpleCanalConnector 和 ClusterCanalConnector 2个主要的实现类 , 分别用于处理基础功能和集群功能

Canal 支持三种创建连接的方式 :

1
2
3
4
5
6
7
8
java复制代码// 创建单链接的客户端链接
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111), "example", "", "");

// 创建带cluster模式的客户端链接,自动完成failover切换,服务器列表自动扫描
CanalConnector connector1 = CanalConnectors.newClusterConnector("127.0.0.1:2181", "example", "", "");

// 建带cluster模式的客户端链接,自动完成failover切换
CanalConnector connector2 = CanalConnectors.newClusterConnector(Arrays.asList(new InetSocketAddress(AddressUtils.getHostIp(), 11111)), "example", "", "");

ClusterCanalConnector 重试的原理 (failover切换的方式)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
java复制代码// Step 1 : 构建集合列表
// failover 通过 SimpleNodeAccessStrategy 策略实现切换
private List<SocketAddress> nodes = new ArrayList<SocketAddress>();
private int index = 0;
public SocketAddress nextNode() {
try {
return nodes.get(index);
} finally {
// 此处会重新构建索引值
index = (index + 1) % nodes.size();
}
}

// Step 2 : ClusterCanalConnector 的创建环节
// 可以看到 , 创建的还是 SimpleCanalConnector , 只不过重写了策略方法
currentConnector = new SimpleCanalConnector(null, username, password, destination) {

@Override
public SocketAddress getNextAddress() {
// 在 ClusterCanalConnector 中如果抛出异常了 , 会将 address 置空
return accessStrategy.nextNode();
}

};

// Step 3 : ClusterCanalConnector 访问异常切换
} catch (Exception e) {
logger.warn("failed to connect to:{} after retry {} times", accessStrategy.currentNode(), times);
currentConnector.disconnect();
// 首先会将连接置空 , 下次获取连接时使用下一个节点
currentConnector = null;
times = times + 1;

// 外层有2个循环 ,当超过重试次数后会抛出异常 , 不再参与循环
if (times >= retryTimes) {
throw new CanalClientException(e);
} else {
// fixed issue #55,增加sleep控制,避免重试connect时cpu使用过高
try {
Thread.sleep(retryInterval);
} catch (InterruptedException e1) {
throw new CanalClientException(e1);
}
}
}


// 总结 :
整个结构非常的精妙 , 2个 while 循环 , 一个用于重新创建连接 , 一个用于反复重试

ClusterCanalConnector 的 zk 方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码核心的处理方式就是通过 ZkClientx 发起远程的调用

// 监听 zk 中节点的变化
String clusterPath = ZookeeperPathUtils.getDestinationClusterRoot(destination);
this.zkClient.subscribeChildChanges(clusterPath, childListener);

dataListener = new IZkDataListener() {

// watch 判断变化后 , 调用初始化操作
public void handleDataChange(String dataPath, Object data) throws Exception {
// runningAddress = new InetSocketAddress(strs[0], Integer.valueOf(strs[1]));
initRunning(data);
}

};

四 . canal 主处理流程

4.1 数据的拉取

从上文已经看过了 , 不管是哪种方式 , 其最终都是建立了 InetSocketAddress , 且端口为 11111

我们来看一下各环节的数据情况 :

Message 属性格式

Message message = connector.getWithoutAck(BATCH_SIZE);

image.png

其中主要包含 binlog 文件信息 , 偏移量 , 操作类型等数据

4.2 SQL 的读取

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码
// RowChange对象,包含了一行数据变化的所有特征 , 其中主要包含 如下配置 :
- eventType: CREATE
- isDdl: true
- sql: "CREATE TABLE `test`.`table6` (\r\n `id` int(0) NOT NULL,\r\n `username` varchar(255) NULL,\r\n `time` datetime(0) NULL,\r\n `version` int(255) NULL,\r\n PRIMARY KEY (`id`)\r\n)"
- ddlSchemaName: "test"
RowChange rowChage = RowChange.parseFrom(entry.getStoreValue());


// 读取 EventType
// EventType 是一个枚举 , 里面包含 INSERT , UPDATE , DELETE 等多种状态 , 用于标识 SQL 类型
EventType eventType = rowChage.getEventType()


// 获取字段集合 , 适用于数据操作
// CanalEntry 中包含一个集合用于存放字段值 :private java.util.List<RowData> rowDatas_;
rowData.getBeforeColumnsList()

4.3 不同操作的 SQL 样式

CREATE TABLE

创建语句会直接携带 SQL 过来

image.png

INSERT

插入数据时 , 值只要在 AfterColumn 中

image.png

UPDATE

Update 时会记录修改前数据和修改后数据 , 所以也可以在这里做数据审计 , 避免反复查询

image.png

DELETE

delete 和 insert 正好相反

image.png

五. 扩展

5.1 扩展思路

对于 DDL 语句 , 是不需要进行相关同步的 , 主要同步的就是数据变更的相关操作.

在业务中 , 可以同步到 MySQL , Redis ,ES 等多种介质中 ,

同时Canal 最新版本已经默认支持 MQ 的直接推送 , github.com/alibaba/can… , TODO : 后续看 Canal Server

5.2 同步案例

TODO : 后续有时间会完善一个同步案例

总结

这篇文章主要是入门篇 , 是Canal 学习过程中的一些笔记和梳理 , 第二阶段会进入 Canal Server 学习 , 对 Server 中如何拉取 Binlog , 如何模拟数据库进行深入的学习

同时会对其扩展点进行思考

参考文档

www.cnblogs.com/janes/p/931…

blog.csdn.net/yehongzhi19…

www.cnblogs.com/wangzhisdu/…

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%