作者介绍
蒋鹏程,苏州万店掌软件技术有限公司
前言
CloudCanal 近期提供了自定义代码构建宽表能力,我们第一时间参与了该特性内测,并已落地生产稳定运行。开发流程详见官方文档 《CloudCanal自定义代码实时加工》。
能力特点包括:
- 灵活,支持反查打宽表,特定逻辑数据清洗,对账,告警等场景
- 调试方便,通过任务参数配置自动打开 debug 端口,对接 IDE 调试
- SDK 接口清晰,提供丰富的上下文信息,方便数据逻辑开发
本文基于我们业务中的实际需求(MySQL -> ElasticSearch 宽表构建),梳理一下具体的开发调试流程,希望对大家有所帮助。
使用案例
案例一:商品表和SKU宽表行构建
业务背景
在对接用户的小程序进行商品搜索时,需要如下几个能力
- 基于分词的全文索引
- 同时搜索不同表中的字段
需要全文索引的初衷是希望用户搜索商品的关键词就可以搜索到想要的商品。这在传统数据库中一般支持的都比较弱甚至不支持,因此需要借助 ES 分词器搜索。
而第二个能力主要是由于业务数据通常分布在多个表中,但是 ES 并不能像需要关系型数据库那样联表查询,CloudCanal 自定义代码的能力则整号解决了我们多表关联的痛点。
业务流程
在使用 CloudCanal 总体的流程变得十分清晰,在 CloudCanal 层面通过订阅表结合自定义代码中的反查数据库以及数据处理,可以直接生成可以写到对端 ES 的宽表行。
表结构
准备的 mysql 表结构如下,一个商品会对应多个 SKU,我们在对端创建好索引,其中的 sku_detail
保存一个商品关联的 SKU 信息,是一个典型的一对多场景。
ES mapping
中的字段对应主表 tb_enterprise_goods
中字段,额外新增的 sku_detail
字段就是我们需要从子表 tb_enterprise_sku
中同步的数据。
1 | sql复制代码## 商品表 |
1 | sql复制代码## SKU表 |
ES 索引如下:
1 | json复制代码 "enterprise_id": { |
注:为了方便大家理解,此处表字段进行了缩减
自定义代码工作流程
自定义代码源码
1 | ini复制代码public List<CustomRecord> addData(CustomRecord customRecord, DataSource dataSource) { |
思路
customRecord
对象即自定义代码传入的参数,传入的 id
为子表 tb_enterprise_sku
的外键 enterprise_goods_id
,查询出子表关于这个外键的所有数据,放入 addFieldValueMap
中,再利用源码提供的方法RecordBuilder.modifyRecordBuilder(customRecord).addField(addFieldValueMap)
,对 customRecord
进行加工。
创建任务步骤
新建源端对端数据源
选择订阅表及同步到对端的索引
选择同步字段,选择自定义包
完成创建任务
实现效果
1 | json复制代码{ |
案例二:订单表、商品表宽表构建
业务背景
小程序商城中需要展示猜你喜欢的商品,对猜你喜欢商品是根据用户购买商品的频率来决定,主要涉及订单表,订单商品表,用户表,商品表等,使用ES 查询同样面临多表无法 join
的问题,本案例中依然采用 CloudCanal 自定义代码同步为扁平化数据。
业务原使用技术及问题
同步 ES 的方案原先使用 logstash
的方式全量同步数据,由于数据量的问题,同步数据放在每日的凌晨,带来的问题为,数据同步不及时,并且只能是全量风险比较高。多次出现删除索引数据后并没有同步的情况。
表结构
1 | less复制代码CREATE TABLE `tb_order` ( |
ES 索引字段
1 | json复制代码"store_id":{ |
注:ES表结构中涉及多张表,为了方便举例,这边只贴出2张表。es_doc展示纬度为订单商品纬度。
实现流程
订阅订单表
订阅字段
画出横线的即为需要同步的字段,有一个点需要特别注意:ES 中需要展示的字段一定要勾上同步,不勾上的话在自定义代码中 add
后 也不会被同步 官方给出的解释为字段黑白名单。
这里有几个细节点,订阅的表的维度并非 ES 存储数据的维度,所以这边的 id
并不是 ES 的 _id
,对于这种需要在源端同步必须传的字段,设置对端字段可以随意设置一个对端已有的字段,在自定义代码中可以灵活的去重新配置需要同步的字段。(如果设置默认,ES 的 index
会创建出这个字段,这显然不是我们想要看到的效果)
业务流程
代码实现
查询扁平化数据
1 | vbnet复制代码SELECT |
思路:自定义代码获取 order
表的主键后,查询上面的 SQL,先将原 customRecord
中数据删除,再以查询出的结果维度新增数据。修改的逻辑亦如此。
1 | ini复制代码public List<CustomRecord> addData(CustomRecord customRecord, DataSource dataSource) { |
实现效果
1 | json复制代码 { |
写在最后
CloudCanal 的自定义代码很好地解决了我们多表关联同步 ES 的问题,简洁易用的界面和有深度的功能都令人印象深刻,期待 CloudCanal 更多新能力。关于 CloudCanal 自定义代码的能力,也欢迎大家与我交流。
参与内测
CloudCanal 会不断提供一些预览的能力,包括新数据链路, 优化能力,功能插件。本文所描述的自定义代码能力目前也处于内测阶段。如需体验,可添加我们小助手(微信号:suhuayue001)进行了解和试用。
加入CloudCanal粉丝群掌握一手消息和获取更多福利,请添加我们小助手微信:suhuayue001
CloudCanal-免费好用的企业级数据同步工具,欢迎品鉴。
了解更多产品可以查看官方网站: www.clougence.com
CloudCanal社区:www.askcug.com/
本文转载自: 掘金