微服务MySQL分库分表数据到MongoDB同步方案

在现有的架构中做大数据分析,第一步面临的问题就是数据如何从关系型数据库到非关系数据库,网上有很多的解决方案,我们也经过了很多的摸索,经历了三套方案的实践,最终使用了canal。这是我们大数据部门的一个同事张同睿写的文章,分享给大家,如果感兴趣后面可以进一步的介绍。

需求背景

近年来,微服务概念持续火热,网络上针对微服务和单体架构的讨论也是越来越多,面对日益增长的业务需求是,很多公司做技术架构升级时优先选用微服务方式。我所在公司也是选的这个方向来升级技术架构,以支撑更大访问量和更方便的业务扩展。

发现问题

微服务拆分主要分两种方式:拆分业务系统不拆分数据库,拆分业务系统拆分库。如果数据规模小的话大可不必拆分数据库,因为拆分数据看必将面对多维度数据查询,跨进程之间的事务等问题。而我所在公司随着业务发展单数据库实例已经不能满足业务需要,所以选择了拆分业务系统同时拆分数据库的模式,所以也面临着以上的问题。本文主要介绍多维度数据实时查询解决方案。当前系统架构和存储结构如下:

解决思路

  • 要对多数据库数据进行查询,首先就需要将数据库同步到一起以方便查询
  • 为了满足大数据量数据需求,所以优先选择NOSQL数据库做同步库
  • NOSQL数据库基本无法进行关联查询,所以需要将关系数据进行拼接操作,转换成非关系型数据
  • 业务多维度查询需要实时性,所以需要选择NOSQL中实时性相对比较好的数据库:MongoDB

根据以上思路,总结数据整合架构如下图所示:

解决方案

目前网上一些数据同步案例分两种:MQ消息同步和binlog数据读取同步

先说MQ消息同步,该同步方式我所在公司试用过一段时间,发现以下问题:

  • 数据围绕业务进行,对业务关键性数据操作发送MQ消息,对业务系统依赖性比较高
  • 对于数据库中存量数据需要单独处理
  • 对于工具表还需要单独维护同步
  • 每次新增数据表都需要重新添加MQ逻辑

考虑到以上问题,用MQ方式同步数据最优解决办法

使用binlog 数据读取方式目前有一些成熟方案,比如tungsten replicator,但这些同步工具只能实现数据1:1复制,数据复制过程自定义逻辑添加比较麻烦,不支持分库分表数据归集操作。综上所述,最优方案应该是读取后binlog后自行处理后续数据逻辑。目前binlog读取binlog工具中最成熟的方案应该就是alibaba开源的canal了。

canal

canal是阿里巴巴mysql数据库binlog的增量订阅&消费组件 。阿里云DRDS、阿里巴巴TDDL 二级索引、小表复制. 都是基于canal做的,应用广泛。canal原理相对比较简单:

  • canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议
  • mysql master收到dump请求,开始推送binary log给slave(也就是canal)
  • canal解析binary log对象(原始为byte流)

canal介绍: https://github.com/alibaba/canal/wiki

我使用的是canal的HA模式,由zookeeper选举可用实例,每个数据库一个instance,服务端配置如下:

目录:

1
2
3
4
5
6
复制代码`conf`
   database1
       -instance.properties
   database2
       -instance.properties
   canal.properties

instance.properties

1
2
3
4
5
6
7
8
9
10
11
复制代码`canal.instance.mysql.slaveId = 1001`
canal.instance.master.address = X.X.X.X:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8
canal.instance.filter.regex = .*\\..*
canal.instance.filter.black.regex =

canal.properties

1
2
3
4
5
6
7
8
9
10
11
复制代码`canal.id= 1`
canal.ip=X.X.X.X
canal.port= 11111
canal.zkServers=X.X.X.X:2181,X.X.X.X:2181,X.X.X.X:2181
canal.zookeeper.flush.period = 1000
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
canal.instance.memory.buffer.size = 16384
canal.instance.memory.buffer.memunit = 1024 ...
canal.instance.global.lazy = false
canal.instance.global.spring.xml = classpath:spring/default-instance.xml

部署数据流如下:

tip:*虽然canal同时支持mixed和row类型的binlog日志,但是获取行数据时如果是mixed类型的日志则获取不到表名,所以本方案暂只支持row格式的binlog*

数据同步

创建canal client应用订阅canal读取的binlog数据

1.开启多instance 订阅,订阅多个instance

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
> 复制代码`public void initCanalStart() {`
>    List<String> destinations = canalProperties.getDestination();
>    final List<CanalClient> canalClientList = new ArrayList<>();
>    if (destinations != null && destinations.size() > 0) {
>     `for (String destination : destinations) {`
>            // 基于zookeeper动态获取canal server的地址,建立链接,其中一台server发生crash,可以支持failover
>            CanalConnector connector = CanalConnectors.newClusterConnector(canalProperties.getZkServers(), destination, "", "");
>            CanalClient client = new CanalClient(destination, connector);
>            canalClientList.add(client);
>            client.start();
>        }
>    }
>    Runtime.getRuntime().addShutdownHook(new Thread() {
>        public void run() {
>            try {
>                logger.info("## stop the canal client");
>                for (CanalClient canalClient : canalClientList) {
>                    canalClient.stop();
>                }
>            } catch (Throwable e) {
>                logger.warn("##something goes wrong when stopping canal:", e);
>            } finally {
>                logger.info("## canal client is down.");
>            }
>        }
>    });
> }
>
>

订阅消息处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
> 复制代码`private void process() {`
>    int batchSize = 5 * 1024;
>    while (running) {
>        try {
>            MDC.put("destination", destination);
>            connector.connect();
>            connector.subscribe();
>            while (running) {
>                Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
>                long batchId = message.getId();
>                int size = message.getEntries().size();
>                if (batchId != -1 && size > 0) {
>                    saveEntry(message.getEntries());
>                }
>             `connector.ack(batchId); // 提交确认`
>                // connector.rollback(batchId); // 处理失败, 回滚数据
>            }
>        } catch (Exception e) {
>            logger.error("process error!", e);
>        } finally {
>            connector.disconnect();
>            MDC.remove("destination");
>        }
>    }}
>
>

根据数据库事件处理消息,过滤消息列表,对数据变动进行处理,用到信息为:

  • insert :schemaName,tableName,beforeColumnsList
  • update :schemaName,tableName,afterColumnsList
  • delete :schemaName,tableName,afterColumnsList
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
> 复制代码`RowChange rowChage = null;`
>    try {
>        rowChage = RowChange.parseFrom(entry.getStoreValue());
>    } catch (Exception e) {
>        throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
>    }
>    EventType eventType = rowChage.getEventType();
>    logger.info(row_format,
>            entry.getHeader().getLogfileName(),
>            String.valueOf(entry.getHeader().getLogfileOffset()), entry.getHeader().getSchemaName(),
>            entry.getHeader().getTableName(), eventType,
>            String.valueOf(entry.getHeader().getExecuteTime()), String.valueOf(delayTime));
>    if (eventType == EventType.QUERY || rowChage.getIsDdl()) {
>        logger.info(" sql ----> " + rowChage.getSql());
>        continue;
>    }
>   `DataService dataService = SpringUtil.getBean(DataService.class);`
>    for (RowData rowData : rowChage.getRowDatasList()) {
>        if (eventType == EventType.DELETE) {
>            dataService.delete(rowData.getBeforeColumnsList(), entry.getHeader().getSchemaName(), entry.getHeader().getTableName());
>        } else if (eventType == EventType.INSERT) {
>            dataService.insert(rowData.getAfterColumnsList(), entry.getHeader().getSchemaName(), entry.getHeader().getTableName());
>        } else if (eventType == EventType.UPDATE) {
>            dataService.update(rowData.getAfterColumnsList(), entry.getHeader().getSchemaName(), entry.getHeader().getTableName());
>        } else {
>            logger.info("未知数据变动类型:{}", eventType);
>        }
>    }
> }
>
>

ColumnsList转换成MongoTemplate 可用的数据类:DBObject,顺便做下数据类型转换

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
> 复制代码`public static DBObject columnToJson(List<CanalEntry.Column> columns) {`
>    DBObject obj = new BasicDBObject();
>    try {
>        for (CanalEntry.Column column : columns) {
>            String mysqlType = column.getMysqlType();
>            //int类型,长度11以下为Integer,以上为long
>            if (mysqlType.startsWith("int")) {
>                int lenBegin = mysqlType.indexOf('(');
>                int lenEnd = mysqlType.indexOf(')');
>                if (lenBegin > 0 && lenEnd > 0) {
>                    int length = Integer.parseInt(mysqlType.substring(lenBegin + 1, lenEnd));
>                    if (length > 10) {
>                        obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : Long.parseLong(column.getValue()));
>                        continue;
>                    }
>                }
>             `obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : Integer.parseInt(column.getValue()));`
>            } else if (mysqlType.startsWith("bigint")) {
>                obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : Long.parseLong(column.getValue()));
>            } else if (mysqlType.startsWith("decimal")) {
>                int lenBegin = mysqlType.indexOf('(');
>                int lenCenter = mysqlType.indexOf(',');
>                int lenEnd = mysqlType.indexOf(')');
>                if (lenBegin > 0 && lenEnd > 0 && lenCenter > 0) {
>                    int length = Integer.parseInt(mysqlType.substring(lenCenter + 1, lenEnd));
>                    if (length == 0) {
>                        obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : Long.parseLong(column.getValue()));
>                        continue;
>                    }
>                }
>             `obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : Double.parseDouble(column.getValue()));`
>            } else if (mysqlType.equals("datetime") || mysqlType.equals("timestamp")) {
>                obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : DATE_TIME_FORMAT.parse(column.getValue()));
>            } else if (mysqlType.equals("date")) {
>                obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : DATE_FORMAT.parse(column.getValue()));
>            } else if (mysqlType.equals("time")) {
>                obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : TIME_FORMAT.parse(column.getValue()));
>            } else {
>                obj.put(column.getName(), column.getValue());
>            }
>        }
>    } catch (ParseException e) {
>        e.printStackTrace();        }
>   `return obj;`
> }
>
>

tip:*DBObject对象如果同时用于保存原始数据和组合数据或其他数据,使用时应该深度拷贝对象生成副本,然后使用副本*

数据拼接

我们获取了数据库数据后做拼接操作,比如两张用户表:

1
2
复制代码`user_info:{id,user_no,user_name,user_password}`
user_other_info:{id,user_no,idcard,realname}

拼接后mongo数据为:

1
复制代码user:{_id,user_no,userInfo:{id,user_no,user_name,user_password},userOtherInfo:{id,user_no,idcard,realname})

接收到的数据信息很多,如何才能简单的触发数据拼接操作呢?

先看我们能获取的信息:schemaName,tableName,DBObject,Event(insert,update,delete)

将这些信息标识拼接起来看看:/schemaName/tableName/Event(DBObject),没错,就是一个标准的restful链接。只要我们实现一个简单的springMVC 就能自动获取需要的数据信息进行拼接操作。

先实现@Controller,定义名称为Schema,value对应schemaName

1
2
3
4
5
6
7
8
9
> 复制代码`@Target({ElementType.TYPE})`
> @Retention(RetentionPolicy.RUNTIME)
> @Documented
> @Component
> public  @interface Schema {
> String value() default "";
> }
>
>

然后实现@RequestMapping,定义名称为Table,直接使用Canal中的EventType 对应RequestMethod

1
2
3
4
5
6
7
8
9
> 复制代码`@Target({ElementType.METHOD, ElementType.TYPE})`
> @Retention(RetentionPolicy.RUNTIME)
> @Documented
> public  @interface Table {
>    String value() default "";
>    CanalEntry.EventType[] event() default {};
> }
>
>

然后创建springUtil,实现接口ApplicationContextAware,应用启动 加载的时候初始化两个Map:intanceMap,handlerMap

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
> 复制代码`@Override`
> public void setApplicationContext(ApplicationContext applicationContext) {
>    if (SpringUtil.applicationContext == null) {
>        SpringUtil.applicationContext = applicationContext;
>        //初始化instanceMap数据
>        instanceMap();
>        //初始化handlerMap数据
>        handlerMap();
>    }
> }private void instanceMap() {
>    Map<String, Object> beans = applicationContext.getBeansWithAnnotation(Schema.class);
>    for (Object bean : beans.values()) {
>        Class<?> clazz = bean.getClass();
>        Object instance = applicationContext.getBean(clazz);
>        Schema schema = clazz.getAnnotation(Schema.class);
>        String key = schema.value();
>        instanceMap.put(key, instance);
>        logger.info("instanceMap [{}:{}]", key, bean == null ? "null" : clazz.getName());
>    }
> }private void handlerMap(){  ...}
>
>

调用方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
> 复制代码`public static void doEvent(String path, DBObject obj) throws Exception {`
>    String[] pathArray = path.split("/");
>    if (pathArray.length != 4) {
>        logger.info("path 格式不正确:{}", path);
>        return;
>    }
>    Method method = handlerMap.get(path);
>    Object schema = instanceMap.get(pathArray[1]);
>    //查找不到映射Bean和Method不做处理
>    if (method == null || schema == null) {
>        return;    }
>    try {
>        long begin = System.currentTimeMillis();
>        logger.info("integrate data:{},{}", path, obj);
>        method.invoke(schema, new Object[]{obj});
>        logger.info("integrate data consume: {}ms:", System.currentTimeMillis() - begin);
>    } catch (Exception e) {
>        logger.error("调用组合逻辑异常", e);
>        throw new Exception(e.getCause());
>    }
> }
>
>

数据拼接消息处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
> 复制代码`@Schema("demo_user")``public class UserService {`
>    @Table(value = "user_info", event = {CanalEntry.EventType.INSERT, CanalEntry.EventType.UPDATE})
>    public void saveUser_UserInfo(DBObject userInfo) {
>        String userNo = userInfo.get("user_no") == null ? null : userInfo.get("user_no").toString();
>        DBCollection collection = completeMongoTemplate.getCollection("user");
>        DBObject queryObject = new BasicDBObject("user_no", userNo);
>        DBObject user = collection.findOne(queryObject);
>        if (user == null) {
>            user = new BasicDBObject();
>            user.put("user_no", userNo);
>            user.put("userInfo", userInfo);
>            collection.insert(user);
>        } else {                        DBObject updateObj = new BasicDBObject("userInfo", userInfo);
>            DBObject update = new BasicDBObject("$set", updateObj);
>            collection.update(queryObject, update);
>        }
>    }
> }
>
>

示例源码

https://github.com/zhangtr/canal-mongo

原文出处:http://www.torry.top/2017/10/22/canal-mongodb/

推荐阅读

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%