这是我参与8月更文挑战的第3天,活动详情查看:8月更文挑战
业务数据采集
流程
- 用户产生业务数据
- 后端服务存入Mysql
- 开启Mysql binlog
- 使用 Maxwell 将业务数据发送至 Kafka
Maxwell 介绍
Maxwell 是由美国 Zendesk 开源,用Java编写的MySQL实时抓取软件。实时读取MySQL二进制日志Binlog,并生成 JSON 格式的消息,作为生产者发送给 Kafka,Kinesis、RabbitMQ、Redis、Google Cloud Pub/Sub、文件或其它平台的应用程序。
官网地址:maxwells-daemon.io/
与 Maxwell 相同功能的框架还有:Debezium、canal
选择 Maxwell 的原因的,一行数据会生成一个json,结构简单,而且没有表字段类型什么的,比较清晰,很符合业务需要。
Maxwell 原理
很简单,就是把自己伪装成 slave , 假装从 master 复制数据
1. MySQL主从复制过程
- Master主库将改变记录,写到二进制日志(binary log)中
- Slave从库向mysql master发送dump协议,将master主库的binary log events拷贝到它的中继日志(relay log);
- Slave从库读取并重做中继日志中的事件,将改变的数据同步到自己的数据库。
2. MySQL的binlog
- 什么是binlog
MySQL的二进制日志可以说MySQL最重要的日志了,它记录了所有的DDL和DML(除了数据查询语句)语句,以事件形式记录,还包含语句所执行的消耗的时间,MySQL的二进制日志是事务安全型的。
一般来说开启二进制日志大概会有1%的性能损耗。二进制有两个最重要的使用场景:
Ø 其一:MySQL Replication在Master端开启binlog,Master把它的二进制日志传递给slaves来达到master-slave数据一致的目的。
Ø 其二:自然就是数据恢复了,通过使用mysqlbinlog工具来使恢复数据。
二进制日志包括两类文件:二进制日志索引文件(文件名后缀为.index)用于记录所有的二进制文件,二进制日志文件(文件名后缀为.00000*)记录数据库所有的DDL和DML(除了数据查询语句)语句事件。
2. binlog 的开启
1. 找到MySQL配置文件的位置
2. Linux: /etc/my.cnf
如果/etc目录下没有,可以通过locate my.cnf查找位置
3. 在mysql的配置文件下,修改配置
在[mysqld] 区块,设置/添加 log-bin=mysql-bin这个表示binlog日志的前缀是mysql-bin,以后生成的日志文件就是 mysql-bin.123456 的文件后面的数字按顺序生成,每次mysql重启或者到达单个文件大小的阈值时,新生一个文件,按顺序编号。
- binlog 的分类设置
mysql binlog的格式有三种,分别是STATEMENT,MIXED,ROW。
在配置文件中可以选择配置 binlog_format= statement|mixed|row
三种格式的区别:
1. statement
语句级,binlog会记录每次一执行写操作的语句。
相对row模式节省空间,但是可能产生不一致性,比如
update tt set create\_date=now()
如果用binlog日志进行恢复,由于执行时间不同可能产生的数据就不同。
优点: 节省空间
缺点: 有可能造成数据不一致。
2. row
行级, binlog会记录每次操作后每行记录的变化。
优点:保持数据的绝对一致性。因为不管sql是什么,引用了什么函数,他只记录执行后的效果。
缺点:占用较大空间。
3. mixed
statement的升级版,一定程度上解决了,因为一些情况而造成的statement模式不一致问题
默认还是statement,在某些情况下譬如:
1
2
3
4
scss复制代码 当函数中包含 UUID() 时;
包含 AUTO_INCREMENT 字段的表被更新时;
执行 INSERT DELAYED 语句时;
用 UDF 时;
会按照 ROW的方式进行处理
优点:节省空间,同时兼顾了一定的一致性。
缺点:还有些极个别情况依旧会造成不一致,另外statement和mixed对于需要对binlog的监控的情况都不方便。**综合上面对比,Maxwell想做监控分析,选择row格式比较合适**
实现
1. 配置 MySQL
- 配置binlog
1 | bash复制代码sudo vim /etc/my.cnf |
1 | ini复制代码server-id=1 |
注意:binlog-do-db根据自己的情况进行修改,指定具体要同步的数据库
2. 重启MySQL使配置生效
1 | 复制代码sudo systemctl restart mysqld |
2. 配置 Maxwell
- 下载 Maxwell
官网地址:maxwells-daemon.io/
github:github.com/zendesk/max…
我使用的是 1.25.0 版本
2. 解压 Maxwell
1 | bash复制代码tar -zxvf /opt/software/maxwell-1.25.0.tar.gz -C /opt/module/ |
- 初始化 Maxwell 元数据库
1. 创建 `maxwell` 数据库
1
ini复制代码CREATE DATABASE maxwell ;
2. 分配一个账号可以操作该数据库
1
sql复制代码GRANT ALL ON maxwell.* TO 'maxwell'@'%' IDENTIFIED BY '666666';
3. 分配这个账号可以监控其他数据库的权限
1
vbnet复制代码GRANT SELECT ,REPLICATION SLAVE , REPLICATION CLIENT ON *.* TO maxwell@'%';
- 配置 Maxwell
1. 拷贝配置文件
1
2
arduino复制代码cd /opt/module/maxwell-1.25.0/
cp config.properties.example config.properties
2. 修改配置文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
ini复制代码# 配置kafka
producer=kafka
kafka.bootstrap.servers=hd1:9092,hd2:9092,hd3:9092
# 需要添加topic
kafka_topic=ods_base_db_m
# mysql
host=hd3
user=maxwell
password=666666
# 处理历史数据需要用到
client_id=maxwell_1
```**`注意`**:默认还是输出到指定Kafka主题的`一个`kafka分区,因为多个分区并行可能会打乱binlog的顺序
如果要提高并行度,首先设置kafka的分区数 > 1,然后设置`producer_partition_by`属性
可选值 producer\_partition\_by=database|table|primary\_key|random|column
5. 启动 Maxwell
bash复制代码/opt/module/maxwell-1.25.0/bin/maxwell –config /opt/module/maxwell-1.25.0/config.properties >/dev/null 2>&1 &
1 | 6. 测试 Maxwell |
bash复制代码cd /opt/module/kafka_2.11-2.4.1/
bin/kafka-console-consumer.sh –bootstrap-server hd1:9092 –topic ods_base_db_m
> 关注专栏持续更新 👇🏻👇🏻👇🏻👇🏻👇🏻👇🏻👇🏻👇🏻
**本文转载自:** [掘金](https://juejin.cn/post/6997667773472047118)
*[开发者博客 – 和开发相关的 这里全都有](https://dev.newban.cn/)*