博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
spark的持久化和共享变量
阅读量:6161 次
发布时间:2019-06-21

本文共 4017 字,大约阅读时间需要 13 分钟。

1. 持久化算子cache

  介绍:正常情况下,一个RDD是不包含真实数据的,只包含描述这个RDD元数据信息,如果对这个RDD调用cache方法,那么这个RDD的数据,依然没有真实数据,直到第一次调用一个action的算子触发了这个RDD的数据生成,那么cache操作就会把数据存储在内存中,所以第二次重复利用这个RDD的时候,计算速度将会快很多。

spark的持久化和共享变量
其中最主要的储存级别为:

//不存储在内存也不在磁盘val NONE = new StorageLevel(false, false, false, false)//存储在磁盘val DISK_ONLY = new StorageLevel(true, false, false, false)//存储在磁盘,保存2份val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)//存储在内存val MEMORY_ONLY = new StorageLevel(false, true, false, true)//存储在内存 保存2份val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)//存储在内存并序列化val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)//内存磁盘结合使用val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)//存储在堆外内存val OFF_HEAP = new StorageLevel(true, true, true, false, 1)

相应的操作:

//设置持久化        listRDD.cache()        //移除持久化        listRDD.unpersist()

2. 共享变量

  介绍:在 Spark 程序中,当一个传递给 Spark 操作(例如 map 和 reduce)的函数在远程节点上面运行 时,Spark 操作实际上操作的是这个函数所用变量的一个独立副本。这些变量会被复制到每台机器上,并且这些变量在远程机器上的所有更新都不会传递回驱动程序。通常跨任务的读 写变量是低效的,但是,Spark 还是为两种常见的使用模式提供了两种有限的共享变量:广播变量(Broadcast Variable)和累加器(Accumulator)。

 (1)广播变量

在不使用广播变量的时候:

object SparktTest {    def main(args: Array[String]): Unit = {        val conf: SparkConf = new SparkConf()        conf.setAppName("SparktTest")        conf.setMaster("local[2]")        val sc: SparkContext = new SparkContext(conf)        val list = List(("math", 18), ("hbase", 18), ("hive", 22), ("hive", 18))        val listRDD: RDD[(String, Int)] = sc.parallelize(list)        //这一句代码是在 driver中执行的,相当于这个变量是在driver进程中的。        val a=3        /**          * kv._2+a这句代码是在executor中执行的,          * 其中a这个变量会在和f序列化的过程中,会携带过去。          * 并且每一个task都会复制一份,可想而知如果这个a变量是一个大对象,那就是一个灾难          */        listRDD.map(kv=>(kv._1,kv._2+a))    }}

spark的持久化和共享变量

使用广播变量的时候:

object SparktTest {    def main(args: Array[String]): Unit = {        val conf: SparkConf = new SparkConf()        conf.setAppName("SparktTest")        conf.setMaster("local[2]")        val sc: SparkContext = new SparkContext(conf)        val list = List(("math", 18), ("hbase", 18), ("hive", 22), ("hive", 18))        val listRDD: RDD[(String, Int)] = sc.parallelize(list)        //这一句代码是在 driver中执行的,相当于这个变量是在driver进程中的。        val a=3        //设置广播变量,每一个executor中的task共享一个广播变量        val broadcast: Broadcast[Int] = sc.broadcast(a)        listRDD.map(kv=>{            //获取广播变量            val aa=broadcast.value            (kv._1,kv._2+aa)        })    }}

spark的持久化和共享变量

总结:如果 executor 端用到了 Driver 的变量,如果不使用广播变量在 Executor 有多少 task 就有 多少 Driver 端的变量副本。如果 Executor 端用到了 Driver 的变量,如果使用广播变量在每个 Executor 中都只有一份 Driver 端的变量副本。
使用的广播变量的条件
   - 广播变量只能在driver端定义,不能在executor中定义
   - 在 Driver 端可以修改广播变量的值,在 Executor 端无法修改广播变量的值。
   - 广播变量的值越大,使用广播变量的优势越明显
   - task个数越多,使用广播变量的优势越明显

 (2)累加器

   介绍:在 Spark 应用程序中,我们经常会有这样的需求,如异常监控,调试,记录符合某特性的数据的数目,这种需求都需要用到计数器,如果一个变量不被声明为一个累加器,那么它将在被改变时不会在 driver 端进行全局汇总,即在分布式运行时每个 task 运行的只是原始变量的一个副本,并不能改变原始变量的值,但是当这个变量被声明为累加器后,该变量就会有分布式计数的功能。

案例

object SparktTest {    def main(args: Array[String]): Unit = {        val conf: SparkConf = new SparkConf()        conf.setAppName("SparktTest")        conf.setMaster("local[2]")        val sc: SparkContext = new SparkContext(conf)        //统计文件有多少行        val hdfsRDD: RDD[String] = sc.textFile("/data/word.txt")        //设置累加器        val mysum: LongAccumulator = sc.longAccumulator("Mysum")        hdfsRDD.map(line=>{            mysum.add(1)            line        }).collect() //触发提交操作        //获取累加器的值        println(mysum.value)        //重置累加器        mysum.reset()    }}

使用累加器的注意事项

   - 累加器在 Driver 端定义赋初始值,累加器只能在 Driver 端读取最后的值,在 Excutor 端更新。
   - 累加器不是一个调优的操作,因为如果不这样做,结果是错的。
   - 累加器不是一个调优的操作,因为如果不这样做,结果是错的。
   - 累加器不是一个调优的操作,因为如果不这样做,结果是错的。
   - 如果系统自带的累加器不能满足要求,还可以自定义累加器

转载于:https://blog.51cto.com/14048416/2338188

你可能感兴趣的文章
js时间和时间戳之间如何转换(汇总)
查看>>
js插件---图片懒加载echo.js结合 Amaze UI ScrollSpy 使用
查看>>
java中string和int的相互转换
查看>>
P1666 前缀单词
查看>>
HTML.2文本
查看>>
Ubuntu unity安装Indicator-Multiload
查看>>
解决Eclipse中新建jsp文件ISO8859-1 编码问题
查看>>
7.对象创建型模式-总结
查看>>
【论文阅读】Classification of breast cancer histology images using transfer learning
查看>>
移动端处理图片懒加载
查看>>
jQuery.on() 函数详解
查看>>
谈缓存和Redis
查看>>
【转】百度地图api,根据多点注标坐标范围计算地图缩放级别zoom自适应地图
查看>>
用户调研(补)
查看>>
ExtJS之开篇:我来了
查看>>
☆1018
查看>>
oracle 去掉空格
查看>>
6.13心得
查看>>
Runtime类
查看>>
eclipse decompiler
查看>>