• 1. RDDs的特性—by 球哥
  • 2. 大数据学习网介绍我们的网址是:bigdatastudy.cn 我们提供专业的大数据学习视频,包括Hadoop,Spark,Storm,Mahout,机器学习等。 我们定价合理,让每个人都学得起大数据。
  • 3. 友情提示本系列课程主要由Learning.Spark这本书整理而来。 本系列课程主要目的,帮助想要学习Spark的同学入门。 本系列课程中的ppt可自由传播,无需任何授权,但不要用于商业用途哦。 本系列课程中的视频不要自由传播哦,如果同学们觉得课程价格不合理,欢迎在下面的群中反馈,我们会考虑调整价格的。 Spark学习交流群:368770186,371896553
  • 4. RDDs的特性RDDs的血统关系图 RDDs的延迟计算 RDD.persist()
  • 5. RDDs的血统关系图Spark维护着RDDs之间的依赖关系和创建关系, 叫做 血统关系图(lineage graph)。 Spark使用血统关系图来计算每个RDD的需求和恢复丢失的数据(当一些存储的RDD丢失的时候)。 血统关系图举例:
  • 6. 延迟计算(Lazy Evaluation)Spark对RDDs的计算是,他们第一次使用action操作的时候(使用Lazy Evaluation:Pig),这种方式在处理大数据的时候特别有用,可以减少数据的传输。 transformations操作不会被立刻执行 ,Spark 内部记录metadata 来表名这个操作已经被响应了。 对RDD概念的另一种理解方式:把每个RDD看做由计算数据的指令组成。 加载数据也是延迟计算。 sc.textFile(),数据只有在必要的时候,才会被加载进去。
  • 7. 延迟计算(Lazy Evaluation)例子:
  • 8. RDD.persist()默认情况下,每次你在RDDs上面进行action操作的时候,Spark都重新计算RDDs。如果你想重复利用一个RDD,在这个RDD上面进行多个action操作,可以使用RDD.persist(),让Spark保存RDD。我们可以把RDDs保存在不同的地方。 在生产环境中,会经常用到persist(),把数据集加载到内存中,并重复的操作。 例子-persist() val result = input.map(x => x * x) result.persist(StorageLevel.DISK_ONLY) println(result.count()) println(result.collect().mkString(","))
  • 9. RDD.persist()当persisit RDD的时候,计算RDD的节点会存储partitions。如果一个已经存储了数据的节点失败的话,在需要使用这些数据的时候,Spark会重新计算丢失的partitions。如果我们想当节点失败的时候,整个集群不降速,可以把数据备份在多个节点上。 基于我们的目标,Spark有多种不同级别的存储。 在Scala和Java中,默认的persist()会把数据按照非序列化存储在JVM heap中。 存在内存中的时候,Least Recently Used (LRU)机制。 把数据存储到硬盘或者非heap中,数据总是序列化的。 unpersist()方法从缓存中移除。
  • 10. org.apache.spark.storage.StorageLevel存储级别adding _2 代表把数据存储在两个机器上 RDD.persist()级别空间占用Cpu消耗是否在内存中是否在硬盘上MEMORY_ONLYHighLowYNMEMORY_ONLY_SERLowHighYNMEMORY_AND_DISKHighMediumSomeSome内存中放不下的时候,往硬盘上放MEMORY_AND_DISK_SERLowHighSomeSome内存中放不下的时候,往硬盘上放,内存中数据时序列化的。DISK_ONLYLowHighNY
  • 11. RDD类型之间的转换在Scala中,有特定函数的RDDs是隐式转换的。 我们需要导入,org.apache.spark.SparkContext._,帮助完成隐式转换。 隐式转换把一个RDD转换成各种包装类,例如DoubleRDDFunctions(对于数字类型的RDDs),PairRDDFunctions(对于key/value pairs)。 Spark-Scala API文档 http://spark.apache.org/docs/latest/api/scala/#package 所以,找mean()方法,不是到RDD中,而是,DoubleRDDFunctions
  • 12. 小结RDDs的血统关系图 RDDs的延迟计算 RDD.persist()