1. spark ----高效的分布式计算架构
---- 何奇14112852181Life is short,you need spark!
2. Life is short,you need spark!
目标Scope(解决什么问题)
在大规模的特定数据集上的迭代运算或重复查询检索
官方定义:
a MapReduce-like cluster computing framework designed for low-latency interativejobs and interactive use from an interpreter
8. Hadoop Vs. Spark适用范围大比拼1. Hadoop Vs. Spark
Hadoop/MapReduce和Spark最适合的都是做离线型的数据分析,但Hadoop特别适合是单次分析的数据量“很大”的情景,而Spark则适用于数据量不是很大的情景。这儿所说的“很大”,是相对于整个集群中的内存容量而言的,因为Spark是需要将数据HOLD在内存中的。一般的,1TB以下的数据量都不能算很大,而10TB以上的数据量都是算“很大”的。比如说,20个节点的一个集群(这样的集群规模在大数据领域算是很小的了),每个节点64GB内存(不算很小,但也不能算大),共计1.28TB。让这样规模的一个集群把500GB左右的数据HOLD在内存中还是很轻松的。这时候,用Spark的执行速度都会比Hadoop快,毕竟在MapReduce过程中,诸如spill等这些操作都是需要写磁盘的。
13. Spark与Hadoop的对比
Spark的中间数据放到内存中,对于迭代运算效率更高。
Spark aims to extend MapReduce for iterative algorithms, and interactive low latency data mining. One major difference between MapReduce and Spark is that MapReduce is acyclic. That is, data flows in from a stable source, isprocessed, and flows out to a stable filesystem. Spark allows iterative computation on the same data, which would form a cycle if jobs were visualized.
spark旨在延长MapReduce的迭代算法,和互动低延迟数据挖掘的。 MapReduce和Spark的一个主要区别,MapReduce是非周期性。也就是说,数据流从一个稳定的来源,加工,流出到一个稳定的文件系统。“Spark允许相同的数据,这将形成一个周期,如果工作是可视化的迭代计算。
14. Spark与Hadoop的对比Resilient Distributed Dataset (RDD) serves as an abstraction to rawdata, and some data is kept in memory and cached for later use. This last pointis very important; Spark allows data to be committed in RAM for an approximate20x speedup over MapReduce based on disks. RDDs are immutable and created through parallel transformations such as map, filter, groupBy and reduce.
弹性分布式数据集(RDD)作为原始数据的抽象,和一些数据保存在内存中缓存供以后使用。最后这点很重要;spark允许在RAM致力于为近似20X基于加速了MapReduce的磁盘上的数据。RDDs是不可改变的,并通过并行转换,如地图,过滤器,GroupBy和减少创建的。
23. RDD的存储与分区
用户可以选择不同的存储级别存储RDD以便重用。
当前RDD默认是存储于内存,但当内存不足时,RDD会spill到disk。
RDD在需要进行分区把数据分布于集群中时会根据每条记录Key进行分区(如Hash 分区),以此保证两个数据集在Join时能高效。
Life is short,you need spark!
24. RDD的内部表示
在RDD的内部实现中每个RDD都可以使用5个方面的特性来表示:
分区列表(数据块列表)
计算每个分片的函数(根据父RDD计算出此RDD)
对父RDD的依赖列表
对key-value RDD的Partitioner【可选】
每个数据分片的预定义地址列表(如HDFS上的数据块的地址)【可选】
Life is short,you need spark!
25. RDD的存储级别RDD根据useDisk、useMemory、deserialized、replication四个参数的组合提供了11种存储级别:
val NONE = new StorageLevel(false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, 2)
28. RDD的生成2 // 根据hadoop配置和分片从InputFormat中获取RecordReader进行数据的读取。
reader = fmt.getRecordReader(split.inputSplit.value, conf, Reporter.NULL)
val key: K = reader.createKey()
val value: V = reader.createValue()
//使用Hadoop MapReduce的RecordReader读取数据,每个Key、Value对以元组返回。
override def getNext() = {
try {
finished = !reader.next(key, value)
} catch {
case eof: EOFException =>
finished = true}
(key, value)
}
29. RDD的转换与操作对于RDD可以有两种计算方式:转换(返回值还是一个RDD)与操作(返回值不是一个RDD)。
转换(Transformations) (如:map, filter, groupBy, join等),Transformations操作是Lazy的,也就是说从一个RDD转换生成另一个RDD的操作不是马上执行,Spark在遇到Transformations操作时只会记录需要这样的操作,并不会去执行,需要等到有Actions操作的时候才会真正启动计算过程进行计算。
操作(Actions) (如:count, collect, save等),Actions操作会返回结果或把RDD数据写到存储系统中。Actions是触发Spark启动计算的动因。
下面示例说明Transformations与Actions在Spark的使用。
Life is short,you need spark!
30. val sc = new SparkContext(master, "Example", System.getenv("SPARK_HOME"),
Seq(System.getenv("SPARK_TEST_JAR")))
val rdd_A = sc.textFile(hdfs://.....)
val rdd_B = rdd_A.flatMap((line => line.split("\\s+"))).map(word => (word, 1))
val rdd_C = sc.textFile(hdfs://.....)
val rdd_D = rdd_C.map(line => (line.substring(10), 1))
val rdd_E = rdd_D.reduceByKey((a, b) => a + b)
val rdd_F = rdd_B.jion(rdd_E)
rdd_F.saveAsSequenceFile(hdfs://....)
34.
Spark对于资源管理与作业调度可以使用Standalone(独立模式),Apache Mesos及Hadoop YARN来实现。 Spark on Yarn在Spark0.6时引用,但真正可用是在现在的branch-0.8版本。Spark on Yarn遵循YARN的官方规范实现,得益于Spark天生支持多种Scheduler和Executor的良好设计,对YARN的支持也就非常容易。
Life is short,you need spark!
35. 在业界的使用
Spark项目在2009年启动,2010年开源, 现在使用的有:Berkeley, Princeton, Klout, Foursquare, Conviva, Quantifind, Yahoo! Research & others, 淘宝等,豆瓣也在使用Spark的python克隆版Dpark。
Life is short,you need spark!
37. 淘宝与spark选择了一个数据量比较大、计算量复杂度比较高的作业,对该Spark分支进行了压测,以期望尽可能多的暴露问题并解决。在这个过程下,我们跌跌撞撞的掉进并爬出了几个坑,并把它们都踩踩平,同时回馈社区,起到了良好的良性反馈作用。虽然每个改动都不大,但是都很关键,影响了Spark On Yarn的稳定性和容错性。整个整个过程,在《Spark on Yarn:几个关键Pull Request》中,有详细的记录和剖析,对Spark和Yarn计算框架有兴趣的同学,可以一起研究和探讨一下。
目前我们的集群规模是:基于云梯的Yarn集群,100台机器,每台24G内存,8核CPU,单个作业最大可用内存400G。在集群上,我们已经实现的算法有:MLR,PageRank和KMeans,其中的MLR已经在平稳的进行生产调度。关于MLR的详细性能评测,在《Spark on Yarn: Softmax Regression算法的实现》这篇文章中,会有详细的阐述,研究算法同学可以多关注一下。-----2013年