这是我参与11月更文挑战的第19天,活动详情查看:2021最后一次更文挑战
一、概述
DStream
上的操作与 RDD
的类似, 分为 Transformations
(转换) 和 Output Operations
(输出)两种, 此外转换操作中还有一些比较特殊的方法, 如: updateStateByKey
、transform
以及各种 Window
相关的操作。
备注:
- 在
DStream
与RDD
上的转换操作非常类似(无状态的操作) DStream
有自己特殊的操作(窗口操作、追踪状态变化操作)- 在
DStream
上的转换操作比RDD
上的转换操作少
DStream
的转化操作可以分为 无状态(stateless
) 和 有状态(stateful
) 两种:
- 无状态转化操作。每个批次的处理不依赖于之前批次的数据。常见的
RDD
转化操作, 例如map
、filter
、reduceByKey
等 - 有状态转化操作。需要使用之前批次的数据 或者是 中间结果来计算当前批次的数据。有状态转化操作包括: 基于滑动窗口的转化操作 或 追踪状态变化的转化操作
二、无状态转换
无状态转化操作就是把简单的 RDD
转化操作应用到每个批次上, 也就是转化 DStream
中的每一个 RDD
。
常见的无状态转换包括: map
、flatMap
、filter
、repartition
、reduceByKey
、groupByKey
;
直接作用在
DStream
上。
案例:黑名单过滤
假设: arr1
为黑名单数据(自定义), true
表示数据生效, 需要被过滤掉; false
表示数据未生效val arr1 = Array(("spark", true), ("scala", false))
假设: 流式数据格式为 “time word”, 需要根据黑名单中的数据对流式数据执行过滤操作。如 “2 spark” 要被过滤掉
1 | arduino复制代码1 hadoop |
方法有三:
- 方法一: 使用外连接
- 方法二: 使用
SQL
- 方法三: 直接过滤
1) 方法一:使用外链接
1 | scala复制代码import org.apache.spark.SparkConf |
2)方法二:使用 SQL
3)方法三:直接过滤
1 | scala复制代码import org.apache.spark.SparkConf |
三、有状态转换
有状态的转换主要有两种: 窗口操作、状态跟踪操作
(1)窗口操作
Window Operations
可以设置窗口大小和滑动窗口间隔来动态的获取当前 Streaming
的状态。
基于窗口的操作会在一个比 StreamingContext
的 batchDuration
(批次间隔)更长的时间范围内, 通过整合多个批次的结果, 计算出整个窗口的结果。
如图:
基于窗口的操作需要两个参数:
- 窗口长度(
windowDuration
)。控制每次计算最近的多少个批次的数据 - 滑动间隔(
slideDuration
)。用来控制对新的DStream
进行计算的间隔
两者都必须是 StreamingContext
中批次间隔(batchDuration
)的整数倍。
每秒发送1个数字:
1 | scala复制代码import java.io.PrintWriter |
1)举个栗子一
- 观察窗口的数据;
- 观察
batchDuration
、windowDuration
、slideDuration
三者之间的关系; - 使用窗口相关的操作;
1 | scala复制代码import org.apache.spark.SparkConf |
2)举个栗子二
热点搜索词实时统计:每隔 10 秒, 统计最近20秒的词出现的次数。
1 | scala复制代码import org.apache.spark.SparkConf |
(2)updateStateByKey
状态跟踪操作
UpdateStateByKey
的主要功能:
- 为
Streaming
中每一个Key
维护一份state
状态,state
类型可以是任意类型的, 可以是自定义对象; 更新函数也可以是自定义的。 - 通过更新函数对该
key
的状态不断更新, 对于每个新的batch
而言,Spark Streaming
会在使用updateStateByKey
的时候为已经存在的key
进行state
的状态更新 - 使用
updateStateByKey
时要开启checkpoint
功能
流式程序启动后计算 wordcount
的累计值, 将每个批次的结果保存到文件:
1 | scala复制代码import org.apache.spark.SparkConf |
统计全局的 key
的状态, 但是就算没有数据输入, 也会在每一个批次的时候返回之前的 key
的状态。
这样的缺点: 如果数据量很大的话, checkpoint
数据会占用较大的存储, 而且效率也不高。
mapWithState
: 也是用于全局统计 key
的状态。如果没有数据输入, 便不会返回之前的 key
的状态, 有一点增量的感觉。
这样做的好处是, 只关心那些已经发生的变化的 key
, 对于没有数据输入, 则不会返回那些没有变化的 key
的数据。即使数据量很大, checkpoint
也不会像 updateStateByKey
那样, 占用太多的存储。
1 | scala复制代码import org.apache.spark.SparkConf |
本文转载自: 掘金