Flink 从0-1实现 电商实时数仓 - 业务数据采集

这是我参与8月更文挑战的第3天,活动详情查看:8月更文挑战

业务数据采集

流程

image.png

  1. 用户产生业务数据
  2. 后端服务存入Mysql
  3. 开启Mysql binlog
  4. 使用 Maxwell 将业务数据发送至 Kafka

Maxwell 介绍

Maxwell 是由美国 Zendesk 开源,用Java编写的MySQL实时抓取软件。实时读取MySQL二进制日志Binlog,并生成 JSON 格式的消息,作为生产者发送给 Kafka,Kinesis、RabbitMQ、Redis、Google Cloud Pub/Sub、文件或其它平台的应用程序。

官网地址:maxwells-daemon.io/

与 Maxwell 相同功能的框架还有:Debezium、canal

选择 Maxwell 的原因的,一行数据会生成一个json,结构简单,而且没有表字段类型什么的,比较清晰,很符合业务需要。

Maxwell 原理

很简单,就是把自己伪装成 slave 假装从 master 复制数据

1. MySQL主从复制过程
  • Master主库将改变记录,写到二进制日志(binary log)中
  • Slave从库向mysql master发送dump协议,将master主库的binary log events拷贝到它的中继日志(relay log);
  • Slave从库读取并重做中继日志中的事件,将改变的数据同步到自己的数据库。
2. MySQL的binlog
  1. 什么是binlog

 MySQL的二进制日志可以说MySQL最重要的日志了,它记录了所有的DDL和DML(除了数据查询语句)语句,以事件形式记录,还包含语句所执行的消耗的时间,MySQL的二进制日志是事务安全型的。

 一般来说开启二进制日志大概会有1%的性能损耗。二进制有两个最重要的使用场景:

   Ø 其一:MySQL Replication在Master端开启binlog,Master把它的二进制日志传递给slaves来达到master-slave数据一致的目的。

   Ø 其二:自然就是数据恢复了,通过使用mysqlbinlog工具来使恢复数据。

 二进制日志包括两类文件:二进制日志索引文件(文件名后缀为.index)用于记录所有的二进制文件,二进制日志文件(文件名后缀为.00000*)记录数据库所有的DDL和DML(除了数据查询语句)语句事件。
2. binlog 的开启

1. 找到MySQL配置文件的位置
2. Linux: /etc/my.cnf


如果/etc目录下没有,可以通过locate my.cnf查找位置
3. 在mysql的配置文件下,修改配置


在[mysqld] 区块,设置/添加  log-bin=mysql-bin这个表示binlog日志的前缀是mysql-bin,以后生成的日志文件就是 mysql-bin.123456 的文件后面的数字按顺序生成,每次mysql重启或者到达单个文件大小的阈值时,新生一个文件,按顺序编号。
  1. binlog 的分类设置

mysql binlog的格式有三种,分别是STATEMENT,MIXED,ROW。

在配置文件中可以选择配置 binlog_format= statement|mixed|row

三种格式的区别

1. statement


语句级,binlog会记录每次一执行写操作的语句。


相对row模式节省空间,但是可能产生不一致性,比如


update  tt set create\_date=now()


如果用binlog日志进行恢复,由于执行时间不同可能产生的数据就不同。


优点: 节省空间


缺点: 有可能造成数据不一致。
2. row


行级, binlog会记录每次操作后每行记录的变化。


优点:保持数据的绝对一致性。因为不管sql是什么,引用了什么函数,他只记录执行后的效果。


缺点:占用较大空间。
3. mixed


statement的升级版,一定程度上解决了,因为一些情况而造成的statement模式不一致问题


默认还是statement,在某些情况下譬如:



1
2
3
4
scss复制代码 当函数中包含 UUID() 时;
包含 AUTO_INCREMENT 字段的表被更新时;
执行 INSERT DELAYED 语句时;
用 UDF 时;
会按照 ROW的方式进行处理 优点:节省空间,同时兼顾了一定的一致性。 缺点:还有些极个别情况依旧会造成不一致,另外statement和mixed对于需要对binlog的监控的情况都不方便。**综合上面对比,Maxwell想做监控分析,选择row格式比较合适**

实现

1. 配置 MySQL
  1. 配置binlog
1
bash复制代码sudo vim /etc/my.cnf
1
2
3
4
ini复制代码server-id=1
log-bin=mysql-bin
binlog_format=row
binlog-do-db=tmall

注意:binlog-do-db根据自己的情况进行修改,指定具体要同步的数据库
2. 重启MySQL使配置生效

1
复制代码sudo systemctl restart mysqld
2. 配置 Maxwell
  1. 下载 Maxwell

官网地址:maxwells-daemon.io/

github:github.com/zendesk/max…

我使用的是 1.25.0 版本
2. 解压 Maxwell

1
bash复制代码tar -zxvf /opt/software/maxwell-1.25.0.tar.gz -C /opt/module/
  1. 初始化 Maxwell 元数据库
1. 创建 `maxwell` 数据库

1
ini复制代码CREATE DATABASE maxwell ;
2. 分配一个账号可以操作该数据库
1
sql复制代码GRANT ALL ON maxwell.* TO 'maxwell'@'%' IDENTIFIED BY '666666';
3. 分配这个账号可以监控其他数据库的权限
1
vbnet复制代码GRANT SELECT ,REPLICATION SLAVE , REPLICATION CLIENT  ON *.* TO maxwell@'%';
  1. 配置 Maxwell
1. 拷贝配置文件

1
2
arduino复制代码cd /opt/module/maxwell-1.25.0/
cp config.properties.example config.properties
2. 修改配置文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
	ini复制代码# 配置kafka
producer=kafka
kafka.bootstrap.servers=hd1:9092,hd2:9092,hd3:9092

# 需要添加topic
kafka_topic=ods_base_db_m

# mysql
host=hd3
user=maxwell
password=666666

# 处理历史数据需要用到
client_id=maxwell_1

```**`注意`**:默认还是输出到指定Kafka主题的`一个`kafka分区,因为多个分区并行可能会打乱binlog的顺序


如果要提高并行度,首先设置kafka的分区数 > 1,然后设置`producer_partition_by`属性


可选值 producer\_partition\_by=database|table|primary\_key|random|column
5. 启动 Maxwell

bash复制代码/opt/module/maxwell-1.25.0/bin/maxwell –config /opt/module/maxwell-1.25.0/config.properties >/dev/null 2>&1 &

1
6. 测试 Maxwell

bash复制代码cd /opt/module/kafka_2.11-2.4.1/
bin/kafka-console-consumer.sh –bootstrap-server hd1:9092 –topic ods_base_db_m




> 关注专栏持续更新 👇🏻👇🏻👇🏻👇🏻👇🏻👇🏻👇🏻👇🏻



**本文转载自:** [掘金](https://juejin.cn/post/6997667773472047118)

*[开发者博客 – 和开发相关的 这里全都有](https://dev.newban.cn/)*
0%