• 1. spark ----高效的分布式计算架构 ---- 何奇14112852181Life is short,you need spark!
  • 2. Life is short,you need spark! 目标Scope(解决什么问题) 在大规模的特定数据集上的迭代运算或重复查询检索 官方定义: a MapReduce-like cluster computing framework designed for low-latency interativejobs and interactive use from an interpreter
  • 3. 目的理解 首先,MapReduce-like是说架构上和多数分布式计算框架类似,Spark有分配任务的主节点(Driver)和执行计算的工作节点(Worker) 其次,Low-latency基本上应该是源于Worker进程较长的生命周期,可以在一个Job过程中长驻内存执行Task,减少额外的开销 然后对interative重复迭代类查询运算的高效支持,是Spark的出发点了。最后它提供了一个基于Scala的Shell方便交互式的解释执行任务
  • 4. 小小示意图
  • 5. 那是如何实现的?核心思路或架构: RDD:Spark的核心概念是RDD (resilient distributed dataset),指的是一个只读的,可分区的分布式数据集,这个数据集的全部或部分可以缓存在内存中,在多次计算间重用。 RDD--分布式弹性数据集可以把数据集保持在内存中,而不是在磁盘中,这样每次计算只需要从内存中读取数据,而不是通过IO读取磁盘,跨过了系统IO瓶颈,大大节省了数据传输时间. Scala语言的简洁的特点,所以,Spark非常合适做机器学习的工作中频繁的迭代计算.RDD可以从本地数据集中通过输入转换产生,也可以使用已保存的RDD,也可以从别的RDD转换而来,需要使用时,可以把RDD缓存在内存中(如果内存不够大,会自动保存到本地). RDD通过血统来实现容错机制,每一次转换,系统会保存转换日志,如果RDD出现故障,系统会根据转换日志重建RDD.
  • 6. 还有呢----血统Lineage:利用内存加快数据加载在众多的其它的In-Memory类数据库或Cache类系统中也有实现,Spark的主要区别在于它处理分布式运算环境下的数据容错性(节点实效/数据丢失)问题时采用的方案。为了保证RDD中数据的鲁棒性,RDD数据集通过所谓的血统关系(Lineage)记住了它是如何从其它RDD中演变过来的。 相比其它系统的细颗粒度的内存数据更新级别的备份或者LOG机制,RDD的Lineage记录的是粗颗粒度的特定数据变换(Transformation)操作(filter, map, join etc.)行为。 当这个RDD的部分分区数据丢失时,它可以通过Lineage获取足够的信息来重新运算和恢复丢失的数据分区。这种粗颗粒的数据模型,限制了Spark的运用场合,但同时相比细颗粒度的数据模型,也带来了性能的提升。
  • 7. 适用领域和细节 总之,Spark的核心思路就是将数据集缓存在内存中加快读取速度,同时用lineage关联的RDD以较小的性能代价保证数据的鲁棒性。 正如其目标scope,Spark适用于需要多次操作特定数据集的应用场合。需要反复操作的次数越多,所需读取的数据量越大,受益越大,数据量小但是计算密集度较大的场合,受益就相对较小。 快在哪里? 使用内存缓存数据集快在以下几个方面:首先是磁盘IO,其次数据的序列化和反序列化的开销也节省了,最后相对其它内存数据库系统,粗颗粒度的内存管理机制减小了数据容错的代价(如典型的数据备份复制机制)--------------------前面也有提到!
  • 8. Hadoop Vs. Spark适用范围大比拼1. Hadoop Vs. Spark Hadoop/MapReduce和Spark最适合的都是做离线型的数据分析,但Hadoop特别适合是单次分析的数据量“很大”的情景,而Spark则适用于数据量不是很大的情景。这儿所说的“很大”,是相对于整个集群中的内存容量而言的,因为Spark是需要将数据HOLD在内存中的。一般的,1TB以下的数据量都不能算很大,而10TB以上的数据量都是算“很大”的。比如说,20个节点的一个集群(这样的集群规模在大数据领域算是很小的了),每个节点64GB内存(不算很小,但也不能算大),共计1.28TB。让这样规模的一个集群把500GB左右的数据HOLD在内存中还是很轻松的。这时候,用Spark的执行速度都会比Hadoop快,毕竟在MapReduce过程中,诸如spill等这些操作都是需要写磁盘的。
  • 9. 大数据的“电光石火”这儿有2点需要提一下: 1)一般情况下,对于中小互联网和企业级的大数据应用而言,单次分析的数量都不会“很大”,因此可以优先考虑使用Spark,特别是当Spark成熟了以后(Hadoop已经出到2.5了,而Spark才刚出1.0呢)。比如说,中国移动的一个省公司(在企业级,移动公司的数据量还是算相当大的),他们单次分析的数量一般也就几百GB,连1TB都很少超过,更不用说超过10TB了,所以完全可以考虑用Spark逐步替代Hadoop。 2)业务通常认为Spark更适用于机器学习之类的“迭代式”应用,但这仅仅是“更”。一般地,对于中等规模的数据量,即便是不属于“更适合”范畴的应用,Spark也能快2~5倍左右。一个对比测试显示,80GB的压缩数据(解压后超过200GB),10个节点的集群规模,跑类似“sum+group-by”的应用,MapReduce花了5分钟,而spark只需要2分钟。
  • 10. 细细聊聊sparkspark是一个开源的分布式计算系统,提供快速的数据分析功能。从灵机一闪的实验室“电火花”成长为大数据技术平台中异军突起的新锐。Spark如其名,展现了大数据不常见的“电光石火”。具体特点概括为“轻、快、灵和巧”。 官网地址 据说性能高出hadoop很多(个人理解主要是因为两点:内存和cache),而且相对更加简单,灵活。非常适合需要反复迭代的计算,比如机器学习。 spark基于scala编写,对我而言 是门陌生的语言,我是初学者 ~~V~V~~~~
  • 11. SparkSpark是UC Berkeley AMP lab所开源的类Hadoop MapReduce的通用的并行计算框架,Spark基于map reduce算法实现的分布式计算,拥有Hadoop MapReduce所具有的优点; 但不同于MapReduce的是Job中间输出和结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的map reduce的算法。
  • 12. spark
  • 13. Spark与Hadoop的对比 Spark的中间数据放到内存中,对于迭代运算效率更高。 Spark aims to extend MapReduce for iterative algorithms, and interactive low latency data mining. One major difference between MapReduce and Spark is that MapReduce is acyclic. That is, data flows in from a stable source, isprocessed, and flows out to a stable filesystem. Spark allows iterative computation on the same data, which would form a cycle if jobs were visualized. spark旨在延长MapReduce的迭代算法,和互动低延迟数据挖掘的。 MapReduce和Spark的一个主要区别,MapReduce是非周期性。也就是说,数据流从一个稳定的来源,加工,流出到一个稳定的文件系统。“Spark允许相同的数据,这将形成一个周期,如果工作是可视化的迭代计算。
  • 14. Spark与Hadoop的对比Resilient Distributed Dataset (RDD) serves as an abstraction to rawdata, and some data is kept in memory and cached for later use. This last pointis very important; Spark allows data to be committed in RAM for an approximate20x speedup over MapReduce based on disks. RDDs are immutable and created through parallel transformations such as map, filter, groupBy and reduce. 弹性分布式数据集(RDD)作为原始数据的抽象,和一些数据保存在内存中缓存供以后使用。最后这点很重要;spark允许在RAM致力于为近似20X基于加速了MapReduce的磁盘上的数据。RDDs是不可改变的,并通过并行转换,如地图,过滤器,GroupBy和减少创建的。
  • 15. Spark与Hadoop的对比Spark比Hadoop更通用。 Spark提供的数据集操作类型有很多种,不像Hadoop只提供了Map和Reduce两种操作。比如map, filter, flatMap, sample, groupByKey, reduceByKey, union, join, cogroup, mapValues, sort,partionBy等多种操作类型,Spark把这些操作称为Transformations。同时还提供Count, collect, reduce, lookup, save等多种actions操作。 这些多种多样的数据集操作类型,给给开发上层应用的用户提供了方便。各个处理节点之间的通信模型不再像Hadoop那样就是唯一的Data Shuffle一种模式。用户可以命名,物化,控制中间结果的存储、分区等。可以说编程模型比Hadoop更灵活。 Spark不适用那种异步细粒度更新状态的应用,例如web服务的存储或者是增量的web爬虫和索引。就是对于那种增量修改的应用模型,当然不适合把大量数据拿到内存中了。增量改动完了,也就不用了,不需要迭代了。
  • 16. Spark与Hadoop的对比 容错性。 在分布式数据集计算时通过checkpoint来实现容错,而checkpoint有两种方式,一个是checkpoint data,一个是logging the updates。用户可以控制采用哪种方式来实现容错。 做checkpoint的两种方式,一个是checkpoint data,一个是logging the updates。貌似Spark采用了后者。但是文中后来又提到,虽然后者看似节省存储空间。但是由于数据处理模型是类似DAG的操作过程,由于图中的某个节点出错,由于lineage chains的依赖复杂性,可能会引起全部计算节点的重新计算,这样成本也不低。他们后来说,是存数据,还是存更新日志,做checkpoint还是由用户说了算吧。相当于什么都没说,又把这个皮球踢给了用户。所以我看就是由用户根据业务类型,衡量是存储数据IO和磁盘空间的代价和重新计算的代价,选择代价较小的一种策略。 可用性。 Spark通过提供丰富的Scala, Java,Python API及交互式Shell来提高可用性
  • 17. Spark与Hadoop的结合Spark可以直接对HDFS进行数据的读写,同样支持Spark on YARN。Spark可以与MapReduce运行于同集群中,共享存储资源与计算,数据仓库Shark实现上借用Hive,几乎与Hive完全兼容。 从Hadoop 0.23把MapReduce做成了库,看出Hadoop的目标是要支持包括MapReduce在内的更多的并行计算模型,比如MPI,Spark等。毕竟现在Hadoop的单节点CPU利用率并不高,那么假如这种迭代密集型运算是和现有平台的互补。同时,这对资源调度系统就提出了更高的要求。有关资源调度方面,UC Berkeley貌似也在做一个Mesos的东西,还用了Linux container,统一调度Hadoop和其他应用模型。 小贴士:配置的时候也是先配置hadoop集群,然后在配置spark集群。
  • 18. 运行模式 本地模式 Standalone模式 Mesoes模式 yarn模式(mapreduce的改进,淘宝~~~)
  • 19. Spark生态系统Shark ( Hive on Spark): Shark基本上就是在Spark的框架基础上提供和Hive一样的H iveQL命令接口,为了最大程度的保持和Hive的兼容性,Shark使用了Hive的API来实现query Parsing和 Logic Plan generation,最后的PhysicalPlan execution阶段用Spark代替Hadoop MapReduce。通过配置Shark参数,Shark可以自动在内存中缓存特定的RDD,实现数据重用,进而加快特定数据集的检索。同时,Shark通过UDF用户自定义函数实现特定的数据分析学习算法,使得SQL数据查询和运算分析能结合在一起,最大化RDD的重复使用。 Spark streaming: 构建在Spark上处理Stream数据的框架,基本的原理是将Stream数据分成小的时间片断(几秒),以类似batch批量处理的方式来处理这小部分数据。Spark Streaming构建在Spark上,一方面是因为Spark的低延迟执行引擎(100ms+)可以用于实时计算,另一方面相比基于Record的其它处理框架(如Storm),RDD数据集更容易做高效的容错处理。此外小批量处理的方式使得它可以同时兼容批量和实时数据处理的逻辑和算法。方便了一些需要历史数据和实时数据联合分析的特定应用场合。 Bagel: Pregel on Spark,可以用Spark进行图计算,这是个非常有用的小项目。Bagel自带了一个例子,实现了Google的PageRank算法。
  • 20. Spark核心概念理解Resilient Distributed Dataset 缩写RDD 弹性分布数据集 RDD是Spark的最基本抽象,是对分布式内存的抽象使用,实现了以操作本地集合的方式来操作分布式数据集的抽象实现。RDD是Spark最核心的东西,它表示已被分区,不可变的并能够被并行操作的数据集合,不同的数据集格式对应不同的RDD实现。RDD必须是可序列化的。RDD可以cache到内存中,每次对RDD数据集的操作之后的结果,都可以存放到内存中,下一个操作可以直接从内存中输入,省去了MapReduce大量的磁盘IO操作。这对于迭代运算比较常见的机器学习算法, 交互式数据挖掘来说,效率提升比较大。
  • 21. RDD的特点: 它是在集群节点上的不可变的、已分区的集合对象。 通过并行转换的方式来创建如(map, filter, join, etc)。 失败自动重建。 可以控制存储级别(内存、磁盘等)来进行重用。 必须是可序列化的。 是静态类型的。
  • 22. RDD的好处 RDD只能从持久存储或通过Transformations操作产生,相比于分布式共享内存(DSM)可以更高效实现容错,对于丢失部分数据分区只需根据它的lineage就可重新计算出来,而不需要做特定的Checkpoint。 RDD的不变性,可以实现类Hadoop MapReduce的推测式执行。 RDD的数据分区特性,可以通过数据的本地性来提高性能,这与Hadoop MapReduce是一样的。 RDD都是可序列化的,在内存不足时可自动降级为磁盘存储,把RDD存储于磁盘上,这时性能会有大的下降但不会差于现在的MapReduce。
  • 23. RDD的存储与分区 用户可以选择不同的存储级别存储RDD以便重用。 当前RDD默认是存储于内存,但当内存不足时,RDD会spill到disk。 RDD在需要进行分区把数据分布于集群中时会根据每条记录Key进行分区(如Hash 分区),以此保证两个数据集在Join时能高效。 Life is short,you need spark!
  • 24. RDD的内部表示 在RDD的内部实现中每个RDD都可以使用5个方面的特性来表示: 分区列表(数据块列表) 计算每个分片的函数(根据父RDD计算出此RDD) 对父RDD的依赖列表 对key-value RDD的Partitioner【可选】 每个数据分片的预定义地址列表(如HDFS上的数据块的地址)【可选】 Life is short,you need spark!
  • 25. RDD的存储级别RDD根据useDisk、useMemory、deserialized、replication四个参数的组合提供了11种存储级别: val NONE = new StorageLevel(false, false, false) val DISK_ONLY = new StorageLevel(true, false, false) val DISK_ONLY_2 = new StorageLevel(true, false, false, 2) val MEMORY_ONLY = new StorageLevel(false, true, true) val MEMORY_ONLY_2 = new StorageLevel(false, true, true, 2) val MEMORY_ONLY_SER = new StorageLevel(false, true, false) val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, 2) val MEMORY_AND_DISK = new StorageLevel(true, true, true) val MEMORY_AND_DISK_2 = new StorageLevel(true, true, true, 2) val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false) val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, 2)
  • 26. RDD的生成RDD有两种创建方式: 1、从Hadoop文件系统(或与Hadoop兼容的其它存储系统)输入(例如HDFS)创建。 2、从父RDD转换得到新RDD。 下面来看一从Hadoop文件系统生成RDD的方式,如:val file = spark.textFile("hdfs://..."),file变量就是RDD(实际是HadoopRDD实例),生成的它的核心代码如下:
  • 27. RDD的生成1 // SparkContext根据文件/目录及可选的分片数创建RDD, 这里我们可以看到Spark与Hadoop MapReduce很像 // 需要InputFormat, Key、Value的类型,其实Spark使用的Hadoop的InputFormat, Writable类型。 def textFile(path: String, minSplits: Int = defaultMinSplits): RDD[String] = { hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], minSplits) .map(pair => pair._2.toString) } // 根据Hadoop配置,及InputFormat等创建HadoopRDD new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)
  • 28. RDD的生成2 // 根据hadoop配置和分片从InputFormat中获取RecordReader进行数据的读取。 reader = fmt.getRecordReader(split.inputSplit.value, conf, Reporter.NULL) val key: K = reader.createKey() val value: V = reader.createValue() //使用Hadoop MapReduce的RecordReader读取数据,每个Key、Value对以元组返回。 override def getNext() = { try { finished = !reader.next(key, value) } catch { case eof: EOFException => finished = true} (key, value) }
  • 29. RDD的转换与操作对于RDD可以有两种计算方式:转换(返回值还是一个RDD)与操作(返回值不是一个RDD)。 转换(Transformations) (如:map, filter, groupBy, join等),Transformations操作是Lazy的,也就是说从一个RDD转换生成另一个RDD的操作不是马上执行,Spark在遇到Transformations操作时只会记录需要这样的操作,并不会去执行,需要等到有Actions操作的时候才会真正启动计算过程进行计算。 操作(Actions) (如:count, collect, save等),Actions操作会返回结果或把RDD数据写到存储系统中。Actions是触发Spark启动计算的动因。 下面示例说明Transformations与Actions在Spark的使用。 Life is short,you need spark!
  • 30. val sc = new SparkContext(master, "Example", System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_TEST_JAR"))) val rdd_A = sc.textFile(hdfs://.....) val rdd_B = rdd_A.flatMap((line => line.split("\\s+"))).map(word => (word, 1)) val rdd_C = sc.textFile(hdfs://.....) val rdd_D = rdd_C.map(line => (line.substring(10), 1)) val rdd_E = rdd_D.reduceByKey((a, b) => a + b) val rdd_F = rdd_B.jion(rdd_E) rdd_F.saveAsSequenceFile(hdfs://....)
  • 31. (本页无文本内容)
  • 32. Lineage(血统)利用内存加快数据加载,在众多的其它的In-Memory类数据库或Cache类系统中也有实现,Spark的主要区别在于它处理分布式运算环境下的数据容错性(节点实效/数据丢失)问题时采用的方案。为了保证RDD中数据的鲁棒性,RDD数据集通过所谓的血统关系(Lineage)记住了它是如何从其它RDD中演变过来的。相比其它系统的细颗粒度的内存数据更新级别的备份或者LOG机制,RDD的Lineage记录的是粗颗粒度的特定数据转换(Transformation)操作(filter, map, join etc.)行为。当这个RDD的部分分区数据丢失时,它可以通过Lineage获取足够的信息来重新运算和恢复丢失的数据分区。这种粗颗粒的数据模型,限制了Spark的运用场合,但同时相比细颗粒度的数据模型,也带来了性能的提升。 RDD在Lineage依赖方面分为两种Narrow Dependencies与Wide Dependencies用来解决数据容错的高效性。Narrow Dependencies是指父RDD的每一个分区最多被一个子RDD的分区所用,表现为一个父RDD的分区对应于一个子RDD的分区或多个父RDD的分区对应于一个子RDD的分区,也就是说一个父RDD的一个分区不可能对应一个子RDD的多个分区。Wide Dependencies是指子RDD的分区依赖于父RDD的多个分区或所有分区,也就是说存在一个父RDD的一个分区对应一个子RDD的多个分区。对与Wide Dependencies,这种计算的输入和输出在不同的节点上,lineage方法对与输入节点完好,而输出节点宕机时,通过重新计算,这种情况下,这种方法容错是有效的,否则无效,因为无法重试,需要向上其祖先追溯看是否可以重试(这就是lineage,血统的意思),Narrow Dependencies对于数据的重算开销要远小于Wide Dependencies的数据重算开销。
  • 33. 资源管理与作业调度
  • 34. Spark对于资源管理与作业调度可以使用Standalone(独立模式),Apache Mesos及Hadoop YARN来实现。 Spark on Yarn在Spark0.6时引用,但真正可用是在现在的branch-0.8版本。Spark on Yarn遵循YARN的官方规范实现,得益于Spark天生支持多种Scheduler和Executor的良好设计,对YARN的支持也就非常容易。 Life is short,you need spark!
  • 35. 在业界的使用 Spark项目在2009年启动,2010年开源, 现在使用的有:Berkeley, Princeton, Klout, Foursquare, Conviva, Quantifind, Yahoo! Research & others, 淘宝等,豆瓣也在使用Spark的python克隆版Dpark。 Life is short,you need spark!
  • 36. 淘宝与sparkSpark是一个最近越来越热门的基于内存分布式计算框架,之前我们已经对它有过一段时间的研究和使用,总体来说,有如下优点: 灵活的函数式MapReduce(Java & Scala) 兼容Hive语法的Shark 图计算 流计算 性能极高的内存计算 直接Hdfs数据读写 基于以上优点,我们把它引入生产,为团队内从事数据分析和挖掘的同学提供多一种更有力的数据挖掘支持
  • 37. 淘宝与spark选择了一个数据量比较大、计算量复杂度比较高的作业,对该Spark分支进行了压测,以期望尽可能多的暴露问题并解决。在这个过程下,我们跌跌撞撞的掉进并爬出了几个坑,并把它们都踩踩平,同时回馈社区,起到了良好的良性反馈作用。虽然每个改动都不大,但是都很关键,影响了Spark On Yarn的稳定性和容错性。整个整个过程,在《Spark on Yarn:几个关键Pull Request》中,有详细的记录和剖析,对Spark和Yarn计算框架有兴趣的同学,可以一起研究和探讨一下。 目前我们的集群规模是:基于云梯的Yarn集群,100台机器,每台24G内存,8核CPU,单个作业最大可用内存400G。在集群上,我们已经实现的算法有:MLR,PageRank和KMeans,其中的MLR已经在平稳的进行生产调度。关于MLR的详细性能评测,在《Spark on Yarn: Softmax Regression算法的实现》这篇文章中,会有详细的阐述,研究算法同学可以多关注一下。-----2013年
  • 38. 最新进展----12月份Spark 1.2 发布,此版本包括 172 位贡献者和超过 1000 个 commits。 此版本包括 : Spark 核心操作和性能改进; 添加新的网络传输子系统,进行了较大的改进; Spark SQL 引入了一个外部数据源的支持,支持 Hive13; 动态分区; fixed-precision decimal type; MLlib 添加了一个新的面向管道包 (spark.ml),组合多个算法; Spark Streaming 添加了一个 Python API,提前写错误容错日志; GraphX 正式脱离 Alpha 版本,引入了一个稳定的 API。
  • 39. 百分之九十的spark学习者 总会被安装spark集群所难倒!Spark的现状与未来发展
  • 40. 感谢聆听~鉴于本人能力有限,spark集群花费时间过多 如有不足之处,请见谅