觉得不错请按下图操作,掘友们,哈哈哈!!!
一:Canal 官网介绍
canal :译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费
早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。
基于日志增量订阅和消费的业务包括
- 数据库镜像
- 数据库实时备份
- 索引构建和实时维护(拆分异构索引、倒排索引等)
- 业务 cache 刷新
- 带业务逻辑的增量数据处理
当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x
二:工作原理
使用Canal 首先我们要了解MySql主从复制的工作原理。
2.1 MySQL主从复制原理
- MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
- MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
- MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据
2.2 canal 工作原理
- canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
- MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
- canal 解析 binary log 对象(原始为 byte 流)
三:canal 安装使用
3.1 canal 简介
去官网下载页面进行下载:github.com/alibaba/can…
我这里下载的是1.1.6的版本:
canal 对应包的下载和装置的教程,都间接看 canal官网github,安装包目前有三兄弟:
- canal deployer:又称 canal server,是真正监听 mysql 日志的服务端。
- canal adapter:顾名思义“适配器”,搭配 canal server,目前能实现mysql 数据到 hbase、rdb、es的增量同步,妥妥的 ETL 工具。
- canal admin:也是为 canal server 服务的,为canal提供整体配置管理、节点运维等面向运维的性能,提供绝对敌对的WebUI操作界面。如果 canal server 要搭建集群环境,必少不了 canal admin 这样业余的运维工具。
3.2 mysql 相关配置
1. MySql 相关配置
- 安装canal前我们先开启MySql的 binlog,在MySQL配置文件my.cnf设置如下信息:
1 | ini复制代码[mysqld] |
- 改了配置文件之后,重启MySQL,使用命令查看是否打开binlog模式:
查看binlog日志文件列表:
查看当前正在写入的binlog文件:
MySQL服务器这边配置就完成。
- 在mysql中给canal单独建一个用户,给全库全表的读,拷贝,复制的权限
1 | sql复制代码-- 使用命令登录: |
3.3 安装canal
3.3.1 canal.deployer
解压canal.deployer-1.1.6.tar.gz,我们可以看到里面有四个文件夹:
接着打开配置文件conf/example/instance.properties,配置信息如下:
1 | ini复制代码## mysql serverId , v1.0.26+ will autoGen |
我这里用的是家里的win10系统,公司的mac没遇到这个问题,所以在bin目录下找到startup.bat启动:
但是启动就报错,要踩坑了???
后来修改一下启动的脚本startup.bat解决了:
然后再启动脚本:
这就启动成功了。
3.3.2 canal adapter
作用:
- 对接上游消息,包括kafka、rocketmq、canal-server
- 实现mysql数据的增量同步
- 实现mysql数据的全量同步
- 下游写入支持mysql、es、hbase等
它既然是适配器,那么就得介绍“源头”和“指标”这两个部位数据的对接:
- 源头:
- (1)canal adapter 能够直连 canal server ,生产 instance的数据;
- (2)也能够在让 canal server 将数据投递到 MQ,而后 cancal adapter 生产 MQ 中的数据。
- 指标:对接上游消息,包括kafka、rocketmq、canal-server 实现mysql数据的增量同步 实现mysql数据的全量同步 下游写入支持mysql、es、hbase
目前 adapter
是支持动态配置的,也就是说修改配置文件后无需重启,任务会自动刷新配置!
(1) 修改application.yml
执行 vim conf/application.yml
修改consumerProperties、srcDataSources、canalAdapters的配置
1 | yaml复制代码canal.conf: |
- 一份数据可以被多个group同时消费, 多个group之间会是一个并行执行, 一个group内部是一个串行执行多个outerAdapters, 比如例子中logger和hbase
- 目前client adapter数据订阅的方式支持两种,直连canal server 或者 订阅kafka/RocketMQ的消息
(2) conf/es7目录下新增映射配置文件
adapter将会自动加载
conf/es7
下的所有.yml
结尾的配置文件
新增表映射的配置文件,如 sys_user.yml
内容如下:
1 | yaml复制代码dataSourceKey: defaultDS |
- dataSourceKey 配置
application.yml
里srcDataSources
的值 - destination 配置
canal.deployer
的Instance
名 - groupId 配置
application.yml
里canalAdapters.groups
的值 - _index 配置索引名
- _id 配置主键对应的字段
- upsert 是否更新
- sql 映射sql
- etlCondition etl 的条件参数,全量同步时可以使用
- commitBatch 提交批大小
sql映射支持多表关联自由组合, 但是有一定的限制:
- 主表不能为子查询语句
- 只能使用left outer join即最左表一定要是主表
- 关联从表如果是子查询不能有多张表
- 主sql中不能有where查询条件(从表子查询中可以有where条件但是不推荐, 可能会造成数据同步的不一致, 比如修改了where条件中的字段内容)
- 关联条件只允许主外键的’=’操作不能出现其他常量判断比如: on a.role_id=b.id and b.statues=1
- 关联条件必须要有一个字段出现在主查询语句中比如: on a.role_id=b.id 其中的 a.role_id 或者 b.id 必须出现在主select语句中
Elastic Search的mapping 属性与sql的查询值将一一对应(不支持 select *), 比如: select a.id as _id, a.name, a.email as _email from user, 其中name将映射到es mapping的name field, _email将 映射到mapping的_email field, 这里以别名(如果有别名)作为最终的映射字段. 这里的_id可以填写到配置文件的 _id: _id映射
四: 项目实战
4.1 项目使用场景
在现有演出业务中,我们有这样一个场景,管理员在后台管理系统中可以对先有演出项目,场次,售卖时间,票种等信息进行修改,比如原先我有一个演出项目 “周杰伦演唱会”,这时候用户在C端就可以看到名称就是“周杰伦演唱会”,但是我觉的这个项目的名字不够大气,这时候我在后台改了叫 “周杰伦嘉年华演唱会”,在后台更改后首先项目不会立即生效,而是要通过一个发布的过程C端才能看到更改后的。
那么由于C端查询量巨,特别是在这种大型演唱会的时候,面对这么大的流量我们怎么去设计呢???
所以这就是我们使用的场景。
4.2 设计对比
- 流程上涉及的模型直接添加Redis 缓存,在业务侧组装查询逻辑
- 这是我们原先的使用方案,但是对Redis 依赖很大,而且在高峰期的时候出现过缓存击穿的问题
- 由于查询逻辑复杂,业务侧能实现查询业务的组装,但是在查询高峰期内存消耗极高,效率也不是很高。
- 数据库数据更改后在业务层再去更新ES,ES提供对外的查询能力
- 这就是上边方案的变种,ES有天然的查询优势,并且能应对复杂的查询条件,所以在上边很多业务组装的复杂查询此时都可以用ES来实现。但是数据同步ES的地方就比较分散,每个更改表的地方都要考虑,如果同步的表比较多就很难维护了。
- 使用Canal 监听数据库binary log,同步到Kafka,专门的服务消费kafka消息,同步ES
- 第三种方案,需要进入中间件,canal和kafka,但是这个方案只会在服务搭建的时候比较复杂一些,属于一劳永逸那种,canal 我们监听数据库 binary log ,然后将 binary log 同步到kafka中,新增消费服务读取kafka 中 binary log,处理ES逻辑。这样业务方可以提各种各样的查询需求,我们只需要在查询服务中利用ES进行组装即可。
4.3 应用:
在我们的项目中,数据流向包含:腾讯云订阅服务 -> canal adapter -> es
。
只使用canal的客户端功能也就是canal adapter,因为我们我们数据库是使用的腾讯云Mql,腾讯云提供了数据订阅功能。
在这里说明下,实际上腾讯云,阿里云都提供了这样的服务,他们对数据库指定的表监听binglog,然后写入到kakaka中,比如上边的截图。
4.4 问题以及解决方案
问题1:监听多张表怎么保证数据消息不乱?
就像这个截图,我们可以按照表名进行分区,也就是说,同一张表数据的变更只会在一个partion上,这样表变更的消息在一个partion 上就变得有序了。
问题2:如果数据库中数据和ES中的数据对不上怎么处理?
目前我们是提供一个job,每十分钟跑一次去对比数据库中有效数据的数量和ES中的数据数量,但是Job很慢,可以考虑,从数据库捞数据,分批跑(shard的方式,先取比如订单号是1的,后续一直累加到9),如果有多台机器就可以分布式跑,每台下放shardIndex ,数据库sql就捞 orderNum%10 = shardIndex的数据,这样也不会跑重,处理的时候再用多线程。
问题3: 怎么提高处理速度以及效率
在摸索中我们采用了一种方法:就是对消息批量处理,我们从消息中心拿消息的时候批量去拿,举个例子吧,goods 表对应的消息现在有1000条,orderId =100 的数据在占了100个,分布在不同的offset中,如果我每次都是拿一条消息去消费一次这样重复的逻辑我要跑100次,但是们可以批量拿200条消息,然后再对消息中的OrderId进行分组,得到最终要执行的OrderId这样是不是就可以少跑很多次。拿到要执行的OrderId后我们就可以找到对应的Sql得到最后的Select结果(这个地方有个点,我不关心你是什么操作【DML】,我只要操作的数据ID,然后执行我的Select sql 去查询最后的结果,然后再去同步到ES中),处理的过程中我们也可以使用多线程来加快处理速度,这都是一些优化的点。
设计模式:
JYM 设计模式系列- 单例模式,适配器模式,让你的代码更优雅!!!
JYM 设计模式系列- 责任链模式,装饰模式,让你的代码更优雅!!!
JYM 设计模式系列- 策略模式,模板方法模式,让你的代码更优雅!!!
Spring相关:
Spring源码解析-老生常谈Bean ⽣命周期 ,但这个你值得看!!!
Spring 源码解析-JYM你值得拥有,从源码角度看bean的循环依赖!!!
本文正在参加「金石计划」
本文转载自: 掘金