spark快速大数据分析之读书笔记

SophiaSpoff 8年前

来自: http://my.oschina.net/sucre/blog/617340


RDD编程

1、Spark中的RDD就是一个不可变的分布式对象集合。每个RDD都被分为多个分区,这些分区运行在集群中的不同节点上。

2、用户可以使用两种方法创建RDD:读取一个外部数据集,以及在驱动器程序中对一个集合进行并行化(比如list和set)。

创建RDD最简单的方式就是把程序中一个已有的集合传给SparkContext的parallelize()方法。

val lines = sc.textFile("README.md")  val linesP = sc.parallelize(List("pandas","i like pandas"))

创建出来后,RDD支持两种类型的操作:转化操作(transformation)和行动操作(action)。转化操作会由一个RDD生成一个新的RDD。行动操作会对RDD计算出一个结果,并把结果返回到驱动器程序中,或把结果存储到外部存储系统(如HDFS)中

3、转化操作和行动操作的区别在于spark计算RDD的方式不同。如果对于一个特定的函数是属于转化操作还是行动操作感到困惑,可以看看它的返回值类型:转化操作返回的是RDD,而行动操作返回的是其他的数据类型。

虽然你可以在任何时候定义新的RDD,但spark只会惰性计算这些RDD。它们只有第一次在一个行动操作中用到时,才会真正计算。默认情况下,Spark的RDD会在你每次对它们进行行动操作时重新计算。如果想在多个行动操作中重用同一个RDD,可以使用RDD.persist(),这里也可以使用RDD.cache(),让spark把这个RDD缓存下来。

来看两个例子:

转化操作

val inputRDD = sc.textFile("log.txt")  val errorRDD = inputRDD.filter(line=>line.contains("error"))

行动操作

errorRDD.count()  for(String line:errorRDD.take(10)){      System.out.println(line)  }

这里take()获取了RDD中的少量元素。除此这外,RDD中还有一个获取元素的方法,collect()函数,但是只有当整个数据集能在单台机器的内存中放得下时,才能使用collect(),因此,collect()不能用在大规模数据集上。

4、之所以把RDD称为弹性的,是因为在任何时候都能进行重算。当保存RDD数据的一台机器失败时,Spark还可以使用这种特性来重算出丢掉的分区。

5、总结一下,每个spark程序或shell对话都按如下方式工作:

(1)从外部数据创建出输入RDD。

(2)使用诸如filter()这样的转化操作对RDD进行转化,以定义新的RDD。

(3)告诉spark对需要被重用的中间结果RDD执行persist()操作。

(4)使用行动操作(例如count()和first()等)来触发一次并行计算,spark会对计算进行优化后再执行。