【Spark Streming】DStream转换操作

这是我参与11月更文挑战的第19天,活动详情查看:2021最后一次更文挑战

一、概述

DStream 上的操作与 RDD 的类似, 分为 Transformations(转换) 和 Output Operations(输出)两种, 此外转换操作中还有一些比较特殊的方法, 如: updateStateByKeytransform 以及各种 Window 相关的操作。

备注:

  • DStreamRDD 上的转换操作非常类似(无状态的操作)
  • DStream 有自己特殊的操作(窗口操作、追踪状态变化操作)
  • DStream 上的转换操作比 RDD 上的转换操作少

DStream 的转化操作可以分为 无状态(stateless) 和 有状态(stateful) 两种:

  • 无状态转化操作。每个批次的处理不依赖于之前批次的数据。常见的 RDD 转化操作, 例如 mapfilterreduceByKey
  • 有状态转化操作。需要使用之前批次的数据 或者是 中间结果来计算当前批次的数据。有状态转化操作包括: 基于滑动窗口的转化操作 或 追踪状态变化的转化操作

二、无状态转换

无状态转化操作就是把简单的 RDD 转化操作应用到每个批次上, 也就是转化 DStream 中的每一个 RDD

常见的无状态转换包括: mapflatMapfilterrepartitionreduceByKeygroupByKey;

直接作用在 DStream 上。

案例:黑名单过滤

假设: arr1 为黑名单数据(自定义), true 表示数据生效, 需要被过滤掉; false 表示数据未生效
val arr1 = Array(("spark", true), ("scala", false))

假设: 流式数据格式为 “time word”, 需要根据黑名单中的数据对流式数据执行过滤操作。如 “2 spark” 要被过滤掉

1
2
3
4
5
6
arduino复制代码1 hadoop
2 spark
3 scala
4 java
5 hive
// 结果:"2 spark" 被过滤

方法有三:

  1. 方法一: 使用外连接
  2. 方法二: 使用 SQL
  3. 方法三: 直接过滤

1) 方法一:使用外链接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
scala复制代码import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.ConstantInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
* @author donald
* @date 2021/02/23
*/
object BlackListFilter1 {
def main(args: Array[String]) {
// 初始化
val conf = new
SparkConf().setAppName(this.getClass.getCanonicalName).setMaster(
"local[2]")
val ssc = new StreamingContext(conf, Seconds(10))
ssc.sparkContext.setLogLevel("WARN")
// 黑名单数据
val blackList = Array(("spark", true), ("scala", true))
val blackListRDD = ssc.sparkContext.makeRDD(blackList)
// 生成测试DStream。使用ConstantInputDStream
val strArray: Array[String] = ("spark java scala hadoop kafka " +
"hive hbase zookeeper")
.split("\\s+")
.zipWithIndex
.map { case (word, idx) => s"$idx $word" }
val rdd = ssc.sparkContext.makeRDD(strArray)
val clickStream = new ConstantInputDStream(ssc, rdd)
// 流式数据的处理
val clickStreamFormatted = clickStream.map(value =>
(value.split(" ")(1), value))
clickStreamFormatted.transform(clickRDD => {
// 通过leftOuterJoin操作既保留了左侧RDD的所有内容,又获得了内容是否在黑名单中
val joinedBlackListRDD: RDD[(String, (String, Option[Boolean]))] = clickRDD.leftOuterJoin(blackListRDD)
joinedBlackListRDD.filter { case (word, (streamingLine, flag)) =>
if (flag.getOrElse(false)) false
else true
}.map { case (word, (streamingLine, flag)) => streamingLine
}
}).print()

// 启动流式作业
ssc.start()
ssc.awaitTermination()
}
}

2)方法二:使用 SQL

3)方法三:直接过滤

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
scala复制代码import org.apache.spark.SparkConf
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.dstream.ConstantInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
* @author donald
* @date 2021/02/23
*/
object BlackListFilter3 {
def main(args: Array[String]) {
// 初始化
val conf = new
SparkConf().setAppName(this.getClass.getCanonicalName).setMaster(
"local[2]")
val ssc = new StreamingContext(conf, Seconds(10))
ssc.sparkContext.setLogLevel("WARN")
// 黑名单数据
val blackList = Array(("spark", true), ("scala", true))
val blackListBC: Broadcast[Array[String]] =
ssc.sparkContext.broadcast(blackList.filter(_._2).map(_._1))
// 生成测试DStream。使用ConstantInputDStream
val strArray: Array[String] = "spark java scala hadoop kafka hive hbase zookeeper"
.split("\\s+")
.zipWithIndex
.map { case (word, idx) => s"$idx $word" }
val rdd = ssc.sparkContext.makeRDD(strArray)
val clickStream = new ConstantInputDStream(ssc, rdd)
// 流式数据的处理
clickStream.map(value => (value.split(" ")(1), value))
.filter { case (word, _) =>
!blackListBC.value.contains(word)
}
.map(_._2)
.print()
// 启动流式作业
ssc.start()
ssc.awaitTermination()
}
}

三、有状态转换

有状态的转换主要有两种: 窗口操作、状态跟踪操作

(1)窗口操作

Window Operations 可以设置窗口大小和滑动窗口间隔来动态的获取当前 Streaming 的状态。

基于窗口的操作会在一个比 StreamingContextbatchDuration(批次间隔)更长的时间范围内, 通过整合多个批次的结果, 计算出整个窗口的结果。

如图:

基于窗口的操作需要两个参数:

  • 窗口长度(windowDuration)。控制每次计算最近的多少个批次的数据
  • 滑动间隔(slideDuration)。用来控制对新的 DStream 进行计算的间隔

两者都必须是 StreamingContext 中批次间隔(batchDuration)的整数倍。

每秒发送1个数字:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
scala复制代码import java.io.PrintWriter
import java.net.{ServerSocket, Socket}

/**
* @author donald
* @date 2021/02/23
*/
object SocketLikeNCWithWindow {
def main(args: Array[String]): Unit = {
val port = 1521
val ss = new ServerSocket(port)
val socket: Socket = ss.accept()
println("connect to host : " + socket.getInetAddress)
var i = 0
// 每秒发送1个数
while(true) {
i += 1
val out = new PrintWriter(socket.getOutputStream)
out.println(i)
out.flush()
Thread.sleep(1000)
}
}
}

1)举个栗子一

  • 观察窗口的数据;
  • 观察 batchDurationwindowDurationslideDuration 三者之间的关系;
  • 使用窗口相关的操作;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
scala复制代码import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
* @author donald
* @date 2021/02/23
*/
object WindowDemo {
def main(args: Array[String]): Unit = {
val conf = new
SparkConf().setMaster("local[*]").setAppName(this.getClass.getCanonicalName)
// 每 5s 生成一个RDD(mini-batch)
val ssc = new StreamingContext(conf, Seconds(5))
ssc.sparkContext.setLogLevel("error")
val lines: ReceiverInputDStream[String] =
ssc.socketTextStream("localhost", 1521)
lines.foreachRDD{ (rdd, time) =>
println(s"rdd = ${rdd.id}; time = $time")
rdd.foreach(value => println(value))
}
// 20s 窗口长度(ds包含窗口长度范围内的数据);10s 滑动间隔(多次时间处理一次数据)
val res1: DStream[String] = lines.reduceByWindow(_ + " " + _,
Seconds(20), Seconds(10))
res1.print()
val res2: DStream[String] = lines.window(Seconds(20),
Seconds(10))
res2.print()
// 求窗口元素的和
val res3: DStream[Int] =
lines.map(_.toInt).reduceByWindow(_+_, Seconds(20), Seconds(10))
res3.print()
// 求窗口元素的和
val res4 = res2.map(_.toInt).reduce(_+_)
res4.print()
ssc.start()
ssc.awaitTermination()
}
}

2)举个栗子二

热点搜索词实时统计:每隔 10 秒, 统计最近20秒的词出现的次数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
scala复制代码import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
* @author donald
* @date 2021/02/23
*/
object HotWordStats {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
.setMaster("local[2]")
.setAppName(this.getClass.getCanonicalName)
val ssc = new StreamingContext(conf, Seconds(2))
ssc.sparkContext.setLogLevel("ERROR")
//设置检查点,检查点具有容错机制。生产环境中应设置到HDFS
ssc.checkpoint("data/checkpoint/")
val lines: ReceiverInputDStream[String] =
ssc.socketTextStream("localhost", 9999)
val words: DStream[String] = lines.flatMap(_.split("\\s+"))
val pairs: DStream[(String, Int)] = words.map(x => (x, 1))
// 通过reduceByKeyAndWindow算子, 每隔10秒统计最近20秒的词出现的次数
// 后 3个参数:窗口时间长度、滑动窗口时间、分区
val wordCounts1: DStream[(String, Int)] =
pairs.reduceByKeyAndWindow(
(a: Int, b: Int) => a + b,
Seconds(20),
Seconds(10), 2)
wordCounts1.print
// 这里需要checkpoint的支持
val wordCounts2: DStream[(String, Int)] =
pairs.reduceByKeyAndWindow(
_ + _,
_ - _,
Seconds(20),
Seconds(10), 2)
wordCounts2.print
ssc.start()
ssc.awaitTermination()
}
}

(2)updateStateByKey 状态跟踪操作

UpdateStateByKey 的主要功能:

  • Streaming 中每一个 Key 维护一份 state 状态, state 类型可以是任意类型的, 可以是自定义对象; 更新函数也可以是自定义的。
  • 通过更新函数对该 key 的状态不断更新, 对于每个新的 batch 而言, Spark Streaming 会在使用 updateStateByKey 的时候为已经存在的 key 进行 state 的状态更新
  • 使用 updateStateByKey 时要开启 checkpoint 功能

流式程序启动后计算 wordcount 的累计值, 将每个批次的结果保存到文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
scala复制代码import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
* @author donald
* @date 2021/02/23
*/
object StateTracker1 {
def main(args: Array[String]) {
val conf: SparkConf = new SparkConf().setMaster("local[2]")
.setAppName(this.getClass.getCanonicalName)
val ssc = new StreamingContext(conf, Seconds(5))
ssc.sparkContext.setLogLevel("ERROR")
ssc.checkpoint("data/checkpoint/")
val lines: ReceiverInputDStream[String] =
ssc.socketTextStream("localhost", 9999)
val words: DStream[String] = lines.flatMap(_.split("\\s+"))
val wordDstream: DStream[(String, Int)] = words.map(x => (x,
1))
// 定义状态更新函数
// 函数常量定义,返回类型是Some(Int),表示的含义是最新状态
// 函数的功能是将当前时间间隔内产生的Key的value集合,加到上一个状态中, 得到最新状态
val updateFunc = (currValues: Seq[Int], prevValueState:
Option[Int]) => {
//通过Spark内部的reduceByKey按key规约,然后这里传入某key当前批次的 Seq,再计算当前批次的总和
val currentCount = currValues.sum
// 已累加的值
val previousCount = prevValueState.getOrElse(0)
Some(currentCount + previousCount)
}
val stateDstream: DStream[(String, Int)] =
wordDstream.updateStateByKey[Int](updateFunc)
stateDstream.print()
// 把DStream保存到文本文件中,会生成很多的小文件。一个批次生成一个目录
val outputDir = "data/output1"
stateDstream.repartition(1)
.saveAsTextFiles(outputDir)
ssc.start()
ssc.awaitTermination()
}
}

统计全局的 key 的状态, 但是就算没有数据输入, 也会在每一个批次的时候返回之前的 key 的状态。

这样的缺点: 如果数据量很大的话, checkpoint 数据会占用较大的存储, 而且效率也不高。

mapWithState : 也是用于全局统计 key 的状态。如果没有数据输入, 便不会返回之前的 key 的状态, 有一点增量的感觉。

这样做的好处是, 只关心那些已经发生的变化的 key, 对于没有数据输入, 则不会返回那些没有变化的 key 的数据。即使数据量很大, checkpoint 也不会像 updateStateByKey 那样, 占用太多的存储。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
scala复制代码import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}

/**
* @author donald
* @date 2021/02/23
*/
object StateTracker2 {
def main(args: Array[String]) {
val conf: SparkConf = new SparkConf()
.setMaster("local[*]")
.setAppName(this.getClass.getCanonicalName)
val ssc = new StreamingContext(conf, Seconds(2))
ssc.sparkContext.setLogLevel("ERROR")
ssc.checkpoint("data/checkpoint/")
val lines: ReceiverInputDStream[String] =
ssc.socketTextStream("localhost", 9999)
val words: DStream[String] = lines.flatMap(_.split("\\s+"))
val wordDStream: DStream[(String, Int)] = words.map(x => (x, 1))
// 函数返回的类型即为 mapWithState 的返回类型
// (KeyType, Option[ValueType], State[StateType]) => MappedType
def mappingFunction(key: String, one: Option[Int], state:
State[Int]): (String, Int) = {
val sum: Int = one.getOrElse(0) +
state.getOption.getOrElse(0)
state.update(sum)
(key, sum)
}
val spec = StateSpec.function(mappingFunction _)
val resultDStream: DStream[(String, Int)] =
wordDStream.mapWithState[Int, (String, Int)](spec)
resultDStream.cache()
// 把DStream保存到文本文件中,会生成很多的小文件。一个批次生成一个目录
val outputDir = "data/output2/"
resultDStream.repartition(1)
.saveAsTextFiles(outputDir)
ssc.start()
ssc.awaitTermination()
}
}

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%