spark 编程珠玑-RDD 篇

jopen 8年前

 

RDD

有关RDD的基础概念请阅读spark官方文档,或网上搜索其他内容。本文完全是实战经验的总结。

惰性求值

RDD的转换操作都是惰性求值的。
惰性求值意味着我们对RDD调用转化操做(例如map操作)并不会立即执行,相反spark会在内部记录下所要求执行的操作的相关信息。
把数据读取到RDD的操作同样也是惰性的,因此我们调用sc.textFile()时数据没有立即读取进来,而是有必要时才会读取。和转化操作一样读取数据操作也有可能被多次执行。这在写代码时要特别注意。

关于惰性求值,对新手来说可能有与直觉相违背之处。有接触过函数式语言类如haskell的应该不会陌生。
在最初接触spark时,我们也会有这样的疑问。
也参与过这样的讨论:

 val sc = new SparkContext("local[2]", "test")   val f:Int ⇒ Int = (x:Int) ⇒ x + 1   val g:Int ⇒ Int = (x:Int) ⇒ x + 1   val rdd = sc.parallelize(Seq(1,2,3,4),1)   //1   val res1 = rdd.map(x ⇒ g(f(x))).collect   //2   val res2 = rdd.map(g).map(f).collect

第1和第2两种操作均能得到我们想要的结果,但那种操作更好呢?
直观上我们会觉得第1种操作更好,因为第一种操作可以仅仅需要一次迭代就能得到我们想要的结果。第二种操作需要两次迭代操作才能完成。
是我们想象的这样吗?让我们对函数f和g的调用加上打印。按照上面的假设。1和2的输出分别是这样的:

1:  f   g   f   g   f   g   f   g         2:  g   g   g   g   f   f   f   f

代码:

val sc = new SparkContext("local[2]", "test")  val f:Int ⇒ Int = (x:Int) ⇒ {      print("f\t")      x + 1      }  val g:Int ⇒ Int = (x:Int) ⇒ {    print("g\t")    x + 1  }  val rdd = sc.parallelize(Seq(1,2,3,4), 1  //1  val res1 = rdd.map(x ⇒ g(f(x))).collect()  //2  val res2 = rdd.map(f).map(g).collect()

将上面的代码copy试着运行一下吧,我们在控制台得到的结果是这样的。

f   g   f   g   f   g   f   g  f   g   f   g   f   g   f   g

是不是大大出乎我们的意料?这说明什么?说明spark是懒性求值的! 我们在调用map(f)时并不会真正去计算, map(f)只是告诉spark数据是怎么计算出来的。map(f).map(g)其实就是在告诉spark数据先通过f在通过g计算出来的。然后在collect()时,spark在一次迭代中先后对数据调用f、g。

继续回到我们最初的问题,既然两种调用方式,在性能上毫无差异,那种调用方式更好呢?我们更推荐第二种调用方式,除了api更加清晰之外。在调用链很长的情况下,我们可以利用spark的检查点机制,在中间添加检查点,这样数据恢复的代价更小。而第一种方式调用链一旦出错,数据只能从头计算。

那么spark到底施加了何种魔法,如此神奇?让我们来拨开spark的层层面纱。最好的方式当然是看源码了。以map为例:

RDD的map方法

  /**     * Return a new RDD by applying a function to all elements of this RDD.     */    def map[U: ClassTag](f: T => U): RDD[U] = withScope {      val cleanF = sc.clean(f)      new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))    }

和MapPartitionsRDD的compute方法

override def compute(split: Partition, context:                  TaskContext): Iterator[U] =                  f(context, split.index,                  firstParent[T].iterator(split,                   context))

关键是这个 iter.map(cleanF)),我们调用一个map方法其实是在iter对象上调用一个map方法。iter对象是scala.collection.Iterator的一个实例。
在看一下Iterator的map方法

def map[B](f: A => B): Iterator[B]=        new AbstractIterator[B] {      def hasNext = self.hasNext      def next() = f(self.next())  }

联想到我们刚才说的我们在RDD上调用一个map方法只是告诉spark数据是怎么计算出来的,并不会真正计算。是不是恍然大悟了。

向spark传递函数

我们可以把定义好的内联函数、方法的引用或静态方法传递给spark。就像scala的其它函数式API一样。我们还要考虑一些细节,比如传递的函数及其引用的变量是可序列话的(实现了java的Serializable接口)。除此之外传递一个对象的方法或字段时,会包含对整个对象的引用。我们可以把该字段放到一个局部变量中,来避免传递包含该字段的整个对象。

scala中的函数传递

class SearchFunctions(val query:String){      def isMatch(s:String) = s.contains(query)        def getMatchFuncRef(rdd:RDD[String])          :RDD[String]= {          //isMatch 代表this.isMatch因此我们要传递整个this          rdd.map(isMatch)      }        def getMatchFieldRef(rdd:RDD[String])={      //query表示this.query因此我们要传递整个this      rdd.map(x=>x.split(query))      }        def getMatchsNoRef(rdd:RDD[String])={      //安全只要把我们需要的字段放到局部变量中      val q = this.query      rdd.map(x=>x.split(query))      }  }

如果在scala中出现了NotSerializableException,通常问题就在我们传递了一个不可序列化类中的函数或字段。传递局部可序列变量或顶级对象中的函数始终是安全的。

持久化

如前所述,spark的RDD是惰性求值的,有时我们希望能过多次使用同一个RDD。如果只是简单的对RDD调用行动操作,spark每次都会重算RDD和它的依赖。这在迭代算法中消耗巨大。 可以使用RDD.persist()让spark把RDD缓存下来。

避免GroupByKey

让我们来看看两种workCount的方式,一种使用reduceByKey,另一种使用groupByKey。

val words = Array("one", "two", "two", "three", "three", "three")  val wordPairsRDD = sc.parallelize(words).map(word => (word, 1))    val wordCountsWithReduce = wordPairsRDD    .reduceByKey(_ + _)    .collect()    val wordCountsWithGroup = wordPairsRDD    .groupByKey()    .map(t => (t._1, t._2.sum))    .collect()

虽然两种方式都能产生正确的结果,但reduceByKey在大数据集时工作的更好。这时因为spark会在shuffling数据之前,为每一个分区添加一个combine操作。这将大大减少shuffling前的数据。

而groupBykey会shuff所有的数据,这大大加重了网络传输的数据量。另外如果一个key对应很多value,这样也可能引起out of memory。

来自: http://my.oschina.net/u/2533099/blog/603996