这是我参与11月更文挑战的第17天,活动详情查看:2021最后一次更文挑战
一、概述
有时候需要在多个任务之间共享变量, 或者在任务(Task) 和 Driver Program 之间共享变量。
在
Spark作业中,用户编写的高阶函数会在集群中的Executor里执行,这些Executor可能会用到相同的变量,这些变量被复制到每个Executor中,而Executor对变量的更新不会传回Driver。
为了满足这种需求, Spark 还是提供了两类共享变量:
- 广播变量(
broadcast variable) - 累加器(
accumulator)。
当然,对于分布式变量,如果不加限制会出现一致性的问题,所以共享变量是两种非常特殊的变量。
这两种变量可以认为是在用算子定义的数据管道外的两个全局变量,供所有计算任务使用。
广播变量、累加器主要作用是为了优化 Spark 程序。
二、广播变量
广播变量类似于 MapReduce 中的 DistributeFile,通常来说是一份不大的数据集,一旦广播变量在 Driver 中被创建,整个数据集就会在集群中进行广播,能让所有正在运行的计算任务以只读方式访问。
广播变量支持一些简单的数据类型,如整型、集合类型等,也支持很多复杂数据类型,如一些自定义的数据类型。
广播变量为了保证数据被广播到所有节点,使用了很多办法。
这其实是一个很重要的问题,我们不能期望 100 个或者 1000 个
Executor去连接Driver,并拉取数据,这会让Driver不堪重负。Executor采用的是通过HTTP连接去拉取数据,类似于BitTorrent点对点传输。
这样的方式更具扩展性,避免了所有Executor都去向Driver请求数据而造成Driver故障。
Spark 广播机制运作方式是这样的:
Driver将已序列化的数据切分成小块,然后将其存储在自己的块管理器BlockManager中,当Executor开始运行时,每个Executor首先从自己的内部块管理器中试图获取广播变量,如果以前广播过,那么直接使用;- 如果没有,
Executor就会从Driver或者其他可用的Executor去拉取数据块。一旦拿到数据块,就会放到自己的块管理器中。供自己和其他需要拉取的Executor使用。这就很好地防止了Driver单点的性能瓶颈。
广播变量将变量在节点的 Executor 之间进行共享(由 Driver 广播出去);
广播变量用来高效分发较大的对象。向所有工作节点(Executor)发送一个较大的只读值, 以供一个或多个操作使用。
使用广播变量的过程如下:
- 对一个类型
T的对象调用SparkContext.broadcast创建出一个Broadcast[T]对象。任何可序列化的类型都可以这么实现(在Driver端) - 通过
value属性访问该对象的值(在Executor中) - 变量只会被发到各个
Executor一次, 作为只读值处理
如图所示:
广播变量的相关参数:
spark.broadcast.blockSize(缺省值:4m)spark.broadcast.checksum(缺省值:true)spark.broadcast.compress(缺省值:true)
三、累加器
累加器的作用: 可以实现一个变量在不同的 Executor 端能保持状态的累加;
累计器在 Driver 端定义, 读取; 在 Executor 中完成累加;
累加器也是 lazy 的, 需要 Action 触发; Action 触发一次, 执行一次, 触发多次, 执行多次;
累加器一个比较经典的应用场景是用来在 Spark Streaming 应用中记录某些事件的数量;
实操:
1 | scala复制代码val data = sc.makeRDD(Seq("hadoop map reduce", "spark mllib")) |
Spark 内置了三种类型的累加器, 分别是:
LongAccumulator:用来累加整数型DoubleAccumulator:用来累加浮点型CollectionAccumulator:用来累加集合元素
1 | scala复制代码val data = sc.makeRDD("hadoop spark hive hbase java scala hello world spark scala java hive".split("\\s+")) |
本文转载自: 掘金