大家好,我是往事随风_h。相信大家和我一样,都有一个大厂梦,作为一名大数据工程师,深知计算topN的重要性,废话不多说,接下来我将用案例给大家演示如何计算topN。
TopN的常见应用场景,最热商品购买量,最高人气作者的阅读量等等。
- Flink创建kafka数据源;
- 基于 EventTime 处理,如何指定 Watermark;
- Flink中的Window,滚动(tumbling)窗口与滑动(sliding)窗口;
- State状态的使用;
- ProcessFunction 实现 TopN 功能;
通过用户访问日志,计算最近一段时间平台最活跃的几位用户topN。
- 创建kafka生产者,发送测试数据到kafka;
- 消费kafka数据,使用滑动(sliding)窗口,每隔一段时间更新一次排名;
这里使用kafka api发送测试数据到kafka,代码如下:
1 | bash复制代码@Data |
这里通过随机数来扰乱username,便于使用户名大小不一,让结果更加明显。KafkaUtil是自己写的一个kafka工具类,代码很简单,主要是平时做测试方便。
创建一个main程序,开始编写代码。
==创建flink环境,关联kafka数据源。==
1 | css复制代码Map<String, String> config = Configuration.initConfig("commons.xml"); |
==EventTime 与 Watermark==
1 | java复制代码senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); |
设置属性==senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)==,表示按照数据时间字段来处理,默认是==TimeCharacteristic.ProcessingTime==.
1 | java复制代码/** The time characteristic that is used if none other is set. */ |
这个属性必须设置,否则后面,可能窗口结束无法触发,导致结果无法输出。取值有三种:
- ==ProcessingTime==:事件被处理的时间。也就是由flink集群机器的系统时间来决定。
- ==EventTime==:事件发生的时间。一般就是数据本身携带的时间。
- ==IngestionTime==:摄入时间,数据进入flink流的时间,跟ProcessingTime还是有区别的;
指定好使用数据的实际时间来处理,接下来需要指定flink程序如何get到数据的时间字段,这里使用调用==DataStream==的==assignTimestampsAndWatermarks==方法,抽取时间和设置==watermark==。
1 | java复制代码senv.addSource( |
前面给出的代码中可以看出,由于发送到kafka的时候,将User对象转换为json字符串了,这里使用的是==fastjson==,接收过来可以转化为==JsonObject==来处理,我这里还是将其转化为User对象==JSON.parseObject(x, User.class)==,便于处理。
这里考虑到数据可能乱序,使用了可以处理乱序的抽象类==BoundedOutOfOrdernessTimestampExtractor==,并且实现了唯一的一个没有实现的方法==extractTimestamp==,乱序数据,会导致数据延迟,在构造方法中传入了一个==Time.milliseconds(1000)==,表明数据可以延迟一秒钟。比如说,如果窗口长度是10s,010s的数据会在11s的时候计算,此时watermark是10,才会触发计算,也就是说引入watermark处理乱序数据,最多可以容忍0t这个窗口的数据,最晚在t+1时刻到来。
==窗口统计==
业务需求上,通常可能是一个小时,或者过去15分钟的数据,5分钟更新一次排名,这里为了演示效果,窗口长度取10s,每次滑动(slide)5s,即5秒钟更新一次过去10s的排名数据。
1 | java复制代码.keyBy("username") |
==我们使用.keyBy(“username”)对用户进行分组==,==使用.timeWindow(Time size, Time slide)对==每个用户做滑动窗口(10s窗口,5s滑动一次)。然后我们==使用 .aggregate(AggregateFunction af, WindowFunction wf) 做==增量的聚合操作,它能使用==AggregateFunction==提前聚合掉数据,减少 state 的存储压力。==较之.apply(WindowFunction wf)会将窗口中的数据都存储下来==,最后一起计算要高效地多。aggregate() 方法的第一个参数用于
这里的==CountAgg==实现了==AggregateFunction==接口,功能是统计窗口中的条数,即遇到一条数据就加一。
1 | java复制代码public class CountAgg implements AggregateFunction<User, Long, Long>{ |
.aggregate(AggregateFunction af, WindowFunction wf) 的第二个参数==WindowFunction==将每个 key每个窗口聚合后的结果带上其他信息进行输出。我们这里实现的==WindowResultFunction==将用户名,窗口,访问量封装成了==UserViewCount==进行输出。
1 | java复制代码private static class WindowResultFunction implements WindowFunction<Long, UserViewCount, Tuple, TimeWindow> { |
==TopN计算最活跃用户==
为了统计每个窗口下活跃的用户,我们需要再次按窗口进行分组,这里根据==UserViewCount==中的==windowEnd==进行keyBy()操作。然后使用 ==ProcessFunction== 实现一个自定义的 ==TopN 函数 TopNHotItems==来计算点击量排名前3名的用户,并将排名结果格式化成字符串,便于后续输出。
1 | java复制代码.keyBy("windowEnd") |
==ProcessFunction== 是 Flink 提供的一个 low-level API,用于实现更高级的功能。它主要提供了定时器 timer 的功能(支持EventTime或ProcessingTime)。本案例中我们将利用 timer 来判断何时收齐了某个 window 下所有用户的访问数据。由于 Watermark 的进度是全局的,在 ==processElement== 方法中,每当收到一条数据(==ItemViewCount==),我们就注册一个 ==windowEnd+1== 的定时器(Flink 框架会自动忽略同一时间的重复注册)。==windowEnd+1== 的定时器被触发时,意味着收到了==windowEnd+1==的 Watermark,即收齐了该==windowEnd==下的所有用户窗口统计值。我们在 ==onTimer()== 中处理将收集的所有商品及点击量进行排序,选出 TopN,并将排名信息格式化成字符串后进行输出。
这里我们还使用了 ==ListState< ItemViewCount >== 来存储收到的每条 ==UserViewCount== 消息,保证在发生故障时,状态数据的不丢失和一致性。==ListState== 是 Flink 提供的类似 Java List 接口的 State API,它集成了框架的 checkpoint 机制,自动做到了 exactly-once 的语义保证。
1 | java复制代码private static class TopNHotUsers extends KeyedProcessFunction<Tuple, UserViewCount, String> { |
==结果输出==
可以看到,每隔5秒钟更新输出一次数据。
如果文章有不足的地方欢迎大家在评论区指出。
本文转载自: 掘金