这是我参与11月更文挑战的第19天,活动详情查看:2021最后一次更文挑战
一、概述
DStream 上的操作与 RDD 的类似, 分为 Transformations(转换) 和 Output Operations(输出)两种, 此外转换操作中还有一些比较特殊的方法, 如: updateStateByKey、transform 以及各种 Window 相关的操作。
备注:
- 在
DStream与RDD上的转换操作非常类似(无状态的操作) DStream有自己特殊的操作(窗口操作、追踪状态变化操作)- 在
DStream上的转换操作比RDD上的转换操作少
DStream 的转化操作可以分为 无状态(stateless) 和 有状态(stateful) 两种:
- 无状态转化操作。每个批次的处理不依赖于之前批次的数据。常见的
RDD转化操作, 例如map、filter、reduceByKey等 - 有状态转化操作。需要使用之前批次的数据 或者是 中间结果来计算当前批次的数据。有状态转化操作包括: 基于滑动窗口的转化操作 或 追踪状态变化的转化操作
二、无状态转换
无状态转化操作就是把简单的 RDD 转化操作应用到每个批次上, 也就是转化 DStream 中的每一个 RDD。
常见的无状态转换包括: map、flatMap、filter、repartition、reduceByKey、groupByKey;
直接作用在
DStream上。
案例:黑名单过滤
假设: arr1 为黑名单数据(自定义), true 表示数据生效, 需要被过滤掉; false 表示数据未生效val arr1 = Array(("spark", true), ("scala", false))
假设: 流式数据格式为 “time word”, 需要根据黑名单中的数据对流式数据执行过滤操作。如 “2 spark” 要被过滤掉
1 | arduino复制代码1 hadoop |
方法有三:
- 方法一: 使用外连接
- 方法二: 使用
SQL - 方法三: 直接过滤
1) 方法一:使用外链接
1 | scala复制代码import org.apache.spark.SparkConf |
2)方法二:使用 SQL
3)方法三:直接过滤
1 | scala复制代码import org.apache.spark.SparkConf |
三、有状态转换
有状态的转换主要有两种: 窗口操作、状态跟踪操作
(1)窗口操作
Window Operations 可以设置窗口大小和滑动窗口间隔来动态的获取当前 Streaming 的状态。
基于窗口的操作会在一个比 StreamingContext 的 batchDuration(批次间隔)更长的时间范围内, 通过整合多个批次的结果, 计算出整个窗口的结果。
如图:
基于窗口的操作需要两个参数:
- 窗口长度(
windowDuration)。控制每次计算最近的多少个批次的数据 - 滑动间隔(
slideDuration)。用来控制对新的DStream进行计算的间隔
两者都必须是 StreamingContext 中批次间隔(batchDuration)的整数倍。
每秒发送1个数字:
1 | scala复制代码import java.io.PrintWriter |
1)举个栗子一
- 观察窗口的数据;
- 观察
batchDuration、windowDuration、slideDuration三者之间的关系; - 使用窗口相关的操作;
1 | scala复制代码import org.apache.spark.SparkConf |
2)举个栗子二
热点搜索词实时统计:每隔 10 秒, 统计最近20秒的词出现的次数。
1 | scala复制代码import org.apache.spark.SparkConf |
(2)updateStateByKey 状态跟踪操作
UpdateStateByKey 的主要功能:
- 为
Streaming中每一个Key维护一份state状态,state类型可以是任意类型的, 可以是自定义对象; 更新函数也可以是自定义的。 - 通过更新函数对该
key的状态不断更新, 对于每个新的batch而言,Spark Streaming会在使用updateStateByKey的时候为已经存在的key进行state的状态更新 - 使用
updateStateByKey时要开启checkpoint功能
流式程序启动后计算 wordcount 的累计值, 将每个批次的结果保存到文件:
1 | scala复制代码import org.apache.spark.SparkConf |
统计全局的 key 的状态, 但是就算没有数据输入, 也会在每一个批次的时候返回之前的 key 的状态。
这样的缺点: 如果数据量很大的话, checkpoint 数据会占用较大的存储, 而且效率也不高。
mapWithState : 也是用于全局统计 key 的状态。如果没有数据输入, 便不会返回之前的 key 的状态, 有一点增量的感觉。
这样做的好处是, 只关心那些已经发生的变化的 key, 对于没有数据输入, 则不会返回那些没有变化的 key 的数据。即使数据量很大, checkpoint 也不会像 updateStateByKey 那样, 占用太多的存储。
1 | scala复制代码import org.apache.spark.SparkConf |
本文转载自: 掘金