这是我参与8月更文挑战的第5天,活动详情查看:8月更文挑战
ODS 层
采集到 kafka 直接作为 ODS
层,不需要额外处理,保持数据原貌。
日志数据主题:ods_base_log
业务数据主题:ods_base_db_m
DWD 层
日志 DWD 层
我们前面采集的日志数据已经保存到Kafka中,作为日志数据的ODS层,从kafka的ODS层读取的日志数据分为3类, 页面日志、启动日志和曝光日志。这三类数据虽然都是用户行为数据,但是有着完全不一样的数据结构,所以要拆分处理。将拆分后的不同的日志写回Kafka不同主题中,作为日志DWD层。
页面日志输出到主流,启动日志输出到启动侧输出流,曝光日志输出到曝光侧输出流。
1. 分析主要任务
- 接收Kafka数据,过滤空值数据
对Maxwell抓取的数据进行ETL,保留有用的部分,过滤掉没用的
- 实现 动态 分流功能
由于MaxWell是把全部数据统一写入一个Topic中, 这样显然不利于日后的数据处理。所以需要把各个表拆开处理。但是由于每个表有不同的特点,有些表是维度表
,有些表是事实表
,有的表既是事实表在某种情况下也是维度表。
在实时计算中一般把维度数据写入存储容器,一般是方便通过主键查询的数据库比如HBase,Redis,MySQL等。一般把事实数据写入流中,进行进一步处理,最终形成宽表。但是作为Flink实时计算任务,如何得知哪些表是维度表,哪些是事实表呢?而这些表又应该采集哪些字段呢?
我们可以将上面的内容放到某一个地方,集中配置。这样的配置不适合写在配置文件中,因为业务端随着需求变化每增加一张表,就要修改配置重启计算程序。所以这里需要一种动态配置方案,把这种配置长期保存起来,一旦配置有变化,实时计算可以自动感知。
这种可以有两个方案实现
+ 一种是用Zookeeper存储,通过Watch感知数据变化。
+ 另一种是用mysql数据库存储,周期性的同步,使用 `FlinkCDC` 读取。这里选择第二种方案,主要是mysql对于配置数据初始化和维护管理,用sql都比较方便。
如图:
业务数据保存到 Kafka
的主题中
维度数据保存到 Hbase
的表中
2. 代码实现
1. 建 配置表
1 | sql复制代码CREATE TABLE `table_process` ( |
2. maven
依赖
1 | xml复制代码<dependency> |
3. 配置表实体类
1 | java复制代码@Data |
4. 在MySQL Binlog 添加对配置数据库的监听,并重启MySQL
修改配置文件
1 | bash复制代码sudo vim /etc/my.cnf |
把存放配置数据库(tmall_realtime
)添加至 binlog-do-db
1 | ini复制代码server-id=1 |
下期预告:DWD层 用到的工具类
关注专栏持续更新 👇🏻👇🏻👇🏻👇🏻👇🏻👇🏻👇🏻👇🏻
本文转载自: 掘金