spark学习笔记

larruping 贡献于2017-01-03

作者 iMindMap  创建于2014-12-09 05:37:00   修改者Edward Li  修改于2015-10-14 05:10:00字数112606

文档摘要:
关键词:

SPARK 学习笔记 Spark亚太研究院群集体成员 2014/12/9 shanghai-pd Spark 学习笔记 1 1 序 14 2 Spark学习阶段篇 14 2.1 Scala语言 14 2.2 Spark平台的API 15 2.3 Spark内核 15 2.4 Spark上的核心框架 15 2.5 商业级别的Spark项目 16 2.6 Spark解决方案 16 3 Spark的学习路线篇 16 4 生态圈 16 4.1 Amplab 16 4.2 BDAS 17 4.3 Spark与Hadoop的比较 18 4.3.1 Spark与Hadoop的对比 19 4.3.2 Spark特性 20 4.3.3 Hadoop的MapReduce计算模型 20 4.3.4 Spark的计算模型 21 4.3.5 对比实例 24 4.4 Spark与Hadoop的结合 25 4.5 Shark 25 5 BlinkDB 25 5.1 BlinkDBs架构 25 5.2 BlinkDBs的设计核心思想 26 6 Spark架构设计 27 7 Spark编程模型 28 7.1 应用程序编程模型 28 7.2 RDD 30 7.2.1 RDD模型 30 7.2.2 RDD示意图 30 7.2.3 RDD简介 31 7.2.4 RDD的实现 32 7.2.5 RDD的存储 32 7.2.6 RDD的分区 33 7.2.7 RDD的操作 33 7.2.8 RDD的依赖 34 7.2.8.1 RDD依赖结构图 34 7.2.8.2 依赖与Stage示例 35 7.2.8.3 窄依赖(narrow dependencies) 37 7.2.8.4 宽依赖(wide dependencies) 37 7.2.8.5 两种依赖的的区别 38 7.2.9 RDD的持久化 38 7.2.10 RDD的容错性 38 7.2.10.1 分布式数据集容错方式 38 7.2.10.2 RDD的容错机制 38 7.2.10.3 Spark的高容错机制lineage 39 7.3 广播变量(Broadcast Variables) 41 7.4 累加器(Accumulator) 41 8 Spark执行框架 41 8.1 Spark运行架构 42 8.2 更多的细节 43 8.3 Spark的Task 46 8.4 Shuffle过程 47 9 集群模式的运行架构 47 9.1 Spark Standalone集群模式 47 9.2 Yarn集群 — Spark on Yarn 48 9.2.1 Cluster模式 48 10 spark 的调度机制 49 11 Spark内核 50 11.1 Spark内核初探 50 11.2 Spark内核核心术语解析 50 11.2.1 相关核心概念的结构 50 11.2.2 Application 51 11.2.3 Job 51 11.2.4 Driver Program 51 11.2.5 Cluster Manager 51 11.2.6 Worker Node 51 11.2.7 Executor 52 11.2.8 Task 52 11.2.9 Stage 52 11.2.10 RDD 52 11.3 Spark集群概览 52 11.4 Spark核心组件 53 11.5 资源管理与作业调度 55 11.6 Spark作业的执行 55 11.7 Spark任务调度系统初见 56 11.8 Spark内核核心源码解析 60 11.8.1 RDD依赖源码解析 60 11.8.2 依赖与Stage划分 61 11.8.3 RDD之分区源码解析 63 11.8.4 RDD源码解析 64 11.8.5 SparkContext核心源码解析 68 11.8.6 TaskSceduler启动源码解析 72 11.8.7 Driver中AppClient源码解析 74 11.8.8 AppClient注册Master 77 11.8.9 Worker中Executor启动过程源代码解析 82 11.8.10 DAGScheduler源码解析 86 11.8.11 Spark的Web监控源码解析 87 11.8.12 提交代码源码初步解读 88 11.8.12.1 入口点 88 11.8.12.2 入口点开始源码解读 88 11.8.12.3 由SparkSubmit的主要功能指导源码解读 89 11.8.12.4 (集群管理,部署模式)源码解读 90 11.8.13 提交代码源码解读之(STANDALONE , CLUSTER)方式 90 11.8.13.1 Client源码解读 91 11.8.13.2 ClientActor源码解读 91 11.9 存储源码解读 92 11.10 Job全生命周期源码解读 92 11.10.1 RDD的count操作为例触发Job全生命周期源码研究 92 11.11 Spark内核之存储管理 110 11.12 Spark内核之计算 110 11.13 Spark内核之容错 110 11.14 工具类源码解析 110 11.14.1 Utils 110 11.15 Spark内核之网络传输 111 11.16 Spark内核之Shuffle 111 11.16.1 两种Shuffle Manager的比较 112 12 Spark History Server 112 12.1 源码解析 113 13 Spark性能优化 113 14 Spark核心框架篇 113 15 Spark Streaming 113 15.1 外部交互的数据流图 113 15.2 Spark内部数据处理图 114 15.3 SparkStreaming框架 115 15.4 Spark Streaming的编程模型 117 15.5 Spark Streaming案例分析 119 15.6 性能调优 119 15.6.1 优化运行时间 119 15.6.2 优化内存使用 120 15.7 核心术语 120 15.8 源码解析 121 15.8.1 数据源源码 121 16 Spark SQL 121 16.1 简介 121 16.2 Spark SQL 框架 122 16.3 Spark SQL组件 123 16.4 运行框架 123 16.5 Spark SQL支持版本 124 16.6 与Shark的关系 124 16.7 Spark Sql应用 125 16.7.1 预置条件 125 16.7.1.1 配置 125 16.7.1.2 运行环境 125 16.7.2 Shell下运行Spark SQL 126 16.7.2.1 在本机运行 126 16.7.2.2 在集群运行 126 16.7.3 Spark SQL CLI 方式运行 128 16.7.3.1 示例 129 17 MLlib(Machine Learning library) 和MLBase 129 17.1 MLLib和MLBase的框架 129 17.2 典型的数据分析流 130 18 GraphX(Graph Processing) 130 18.1 GraphX架构 130 19 Bagel (Pregel on Spark) 131 20 SparkR 131 20.1 数据流 132 20.2 R on Spark 133 20.2.1 安装 133 20.2.1.1 安装要求 133 20.2.1.2 package安装 133 20.3 Running sparkR 134 20.4 Examples, Unit tests 134 20.5 Running on EC2 135 20.6 Running on YARN 135 20.7 Report Issues/Feedback 135 21 Spark开发与应用 136 21.1 Spark开发 136 21.2 Spark应用 136 21.2.1 命令行提交应用程序 136 22 Spark 源码、编译 137 22.1 IntelliJ IDEA中编译调试spark的程序 137 22.2 SBT编译 137 22.2.1 编译命令格式 137 22.2.2 示例 138 22.2.3 进一步说明 138 22.3 Maven 编译 138 22.4 Spark 部署包生成命令make-distribution.sh 139 22.5 Spark 1.3 编译脚本的变更 141 22.5.1 官网下载的部署包 141 22.5.2 编译案例1 142 22.5.3 编译案例2 143 22.6 Mvn测试 144 22.7 win下源码编译 144 22.7.1 SBT编译 144 23 Spark源码环境搭建及编译 147 23.1 源码环境搭建 147 23.1.1 scala 147 23.1.2 git 148 23.1.3 Intellij IDEA 149 23.2 源码下载 150 23.3 IntelliJ IDEA - SBT方式 150 23.4 搭建Spark源码阅读环境(需要联网) 150 23.5 搭建Spark开发环境 151 23.5.1 应用程序的创建 151 23.5.2 intellij IDEA调试、运行应用程序 151 23.6 Maven方式 152 24 部署与应用 153 24.1 集群启动 153 24.1.1 手动方式 153 24.2 Spark应用 154 24.2.1 Spark-Submit方式 154 24.2.2 提交时Driver信息解析 156 24.2.2.1 Standalone 156 24.2.2.2 Yarn 156 24.2.2.3 Mesos 156 24.2.3 提交时Executor信息解析 156 24.2.3.1 Local 156 24.2.3.2 Standalone 157 24.2.4 spark-shell方式 157 24.3 Amazon EC2 157 24.4 Spark Standalone 157 24.5 单机模式-local 157 24.5.1 Spark Standalone 伪分布式模式部署 157 24.5.2 Spark Standalone集群部署 158 24.5.2.1 安装环境 158 24.5.2.2 配置-环境变量 158 24.5.3 HA节点启动 159 24.5.4 监控 160 24.5.5 测试 160 24.5.6 Spark Client 部署 160 24.5.7 Spark Standalone HA 部署 160 24.5.7.1 基于文件系统的HA 160 24.5.7.2 基于zookeeper 的HA 160 24.6 Apache Mesos 161 24.7 Hadoop YARN 161 24.7.1 运行模式 161 24.7.2 配置-环境变量 162 24.7.3 部署 162 24.7.4 监控 162 24.7.4.1 Log方式 162 24.7.5 测试 163 24.8 Spark HistoryServer 163 24.8.1 配置-属性 163 24.8.2 启动 164 24.8.3 监控 164 25 监控 165 25.1 Spark的Web监控页面 165 25.2 Spark的日志 165 25.3 Spark进程查看 168 25.4 Metrics 168 26 Spark可配置参数 168 26.1 应用属性 169 26.2 运行环境变量 169 26.3 Storage相关属性 170 26.4 Shuffle相关属性 172 26.5 SparkUI相关属性 175 26.6 压缩和序列化相关属性 176 26.7 执行时相关属性 180 26.8 网络相关属性 181 26.9 调度相关属性 182 26.10 安全相关属性 185 26.11 SparkStreaming相关属性 186 26.12 Standalone模式特有属性 187 26.13 Spark on Yarn特有属性 189 26.14 配置示例 190 26.14.1 spark-env.sh文件 190 26.14.2 spark-defaults.conf文件 190 27 目录及其相关信息分析 191 27.1 配置(环境变量)相关目录 191 27.1.1 SPARK_WORK_DIR 192 27.2 依赖文件、jar包目录 192 27.2.1 HttpFileServer目录 192 27.2.2 运行时下载的依赖文件和jar包存放目录 193 27.3 运行时相关目录 193 27.4 目录性能分析 193 27.5 目录FQA分析 194 28 调优 194 29 作业调度 194 30 安全 194 31 硬件配置 194 32 调试 194 32.1 Spark-shell 程序调试 194 32.2 IDEA 程序调试 195 32.2.1 调试环境搭建 195 32.2.2 远程调试 195 32.2.3 Driver Program调试 196 32.2.3.1 Client部署模式的调试 197 32.2.3.2 Cluster部署模式的远程调试 197 32.2.4 Master组件远程调试 198 32.2.5 Worker组件远程调试 198 32.2.6 History Server进程的远程调试 198 32.2.7 Executor进程的远程调试 198 32.3 调试示例 199 32.3.1 win下idea中提交任务 199 32.3.2 远程SparkSubmit方式提交调试 200 32.3.3 详细调试案例 201 33 Spark性能优化 201 33.1 10大问题及其解决方案 201 33.2 Shuffle优化 202 33.3 Checkpoint优化 203 34 技巧、FAQs 203 34.1 提交应用失败 203 34.2 Scala版本不一致导致的资源获取失败 204 34.2.1 集群关闭失败 205 34.3 页面相关 206 34.4 应用程序管理 206 34.4.1 如何终止应用程序 206 34.5 Exited Code 207 34.5.1 53 207 34.6 群问题记录 208 34.6.1 Spark亚太研究院3群 - 安留军 208 34.7 Spark Streaming 208 34.7.1 allowMultipleContexts 属性配置 208 34.8 空间不足的问题 209 34.9 文件句柄、进程数限制问题 209 34.10 运行时报错 210 34.11 文件编码错误 210 34.12 单节点多应用启动失败 210 34.13 编译问题 211 34.13.1 编译应用代码报对象已定义错误 211 34.13.2 sbt编译工程时报OOM错误 212 34.13.3 scala.reflect.internal.MissingRequirementError 212 34.14 IDEA 214 34.15 Git工具相关 215 34.15.1 IDEA找不到 git.exe 215 34.16 自动测试代码相关 215 34.17 Tachyon相关 217 34.18 Hadoop相关 217 34.19 心跳超时错误,错误码143 218 34.20 On yarn 模式,错误码15 219 35 实战记录 221 35.1 RDD操作性能类 221 35.1.1 RDD窄依赖的pipeline分析 221 35.1.2 groupByKey的性能分析 222 35.1.3 join的性能优化分析 223 36 参考资料 225 36.1 Shark 226 36.2 Spark的适用场景 227 37 参考文献 228 1 序 基于(转载、拷贝)亚太研究院提供的资源、网络资源(博客、论坛、官网等)的学习,以及亚太群成员提供的性能分析、FQA记录、实战记录等资料,以及小部分自己整理的资料,最终整理成的学习笔记。 参考资料大部分列在参考文献章节。 2 SPARK学习阶段篇 http://book.51cto.com/art/201409/451443.htm Spark 的学习阶段篇包含六大部分,从Scala语言到spark内核、框架,以及spark在商业上的企业级实践等。 2.1 SCALA语言 1,Spark框架是采用Scala语言编写的,精致而优雅。要想成为Spark高手,你就必须阅读Spark的源代码,就必须掌握Scala,; 2,虽然说现在的Spark可以采用多语言Java、Python等进行应用程序开发,但是最快速的和支持最好的开发API依然并将永远是Scala方式的API,所以你必须掌握Scala来编写复杂的和高性能的Spark分布式程序; 3,尤其要熟练掌握Scala的trait、apply、函数式编程、泛型、逆变与协变等; 推荐课程:”精通Spark的开发语言:Scala最佳实践” 2.2 SPARK平台的API 1,掌握Spark中面向RDD的开发模式,掌握各种transformation和action函数的使用; 2,掌握Spark中的宽依赖和窄依赖以及lineage机制; 3,掌握RDD的计算流程,例如Stage的划分、Spark应用程序提交给集群的基本过程和Worker节点基础的工作原理等 推荐课程:“18小时内掌握Spark:把云计算大数据速度提高100倍以上!” 2.3 SPARK内核 Spark的内核部分的学习笔记,在学习路线篇中对应章节记录。 此阶段主要是通过Spark框架的源码研读来深入Spark内核部分: 1,通过源码掌握Spark的任务提交过程; 2,通过源码掌握Spark集群的任务调度; 3,尤其要精通DAGScheduler、TaskScheduler和Worker节点内部的工作的每一步的细节; 推荐课程:“Spark 1.0.0企业级开发动手:实战世界上第一个Spark 1.0.0课程,涵盖Spark 1.0.0所有的企业级开发技术” 2.4 SPARK上的核心框架 Spark上的核心框架部分的学习笔记,在学习路线篇中对应章节记录。 Spark作为云计算大数据时代的集大成者,在实时流处理、图技术、机器学习、NoSQL查询等方面具有显著的优势,我们使用Spark的时候大部分时间都是在使用其上的框架例如Shark、Spark Streaming等: 1,Spark Streaming是非常出色的实时流处理框架,要掌握其DStream、transformation和checkpoint等; 2,Spark的离线统计分析功能,Spark 1.0.0版本在Shark的基础上推出了Spark SQL,离线统计分析的功能的效率有显著的提升,需要重点掌握; 3,对于Spark的机器学习和GraphX等要掌握其原理和用法; 推荐课程:“Spark企业级开发最佳实践” 2.5 商业级别的SPARK项目 通过一个完整的具有代表性的Spark项目来贯穿Spark的方方面面,包括项目的架构设计、用到的技术的剖析、开发实现、运维等,完整掌握其中的每一个阶段和细节,这样就可以让您以后可以从容面对绝大多数Spark项目。 推荐课程:“Spark架构案例鉴赏:Conviva、Yahoo!、优酷土豆、网易、腾讯、淘宝等公司的实际Spark案例” 2.6 SPARK解决方案 1,彻底掌握Spark框架源码的每一个细节; 2,根据不同的业务场景的需要提供Spark在不同场景的下的解决方案; 3,根据实际需要,在Spark框架基础上进行二次开发,打造自己的Spark框架; 推荐课程:“精通Spark:Spark内核剖析、源码解读、性能优化和商业案例实战” 前面所述的成为Spark高手的六个阶段中的第一和第二个阶段可以通过自学逐步完成,随后的三个阶段最好是由高手或者专家的指引下一步步完成,最后一个阶段,基本上就是到”无招胜有招”的时期,很多东西要用心领悟才能完成。 3 SPARK的学习路线篇 Spark学习路线篇,基于spark的官网文档组织结构,分别对各个部分进行学习,并记录各个部分学习笔记。 4 生态圈 4.1 AMPLAB 4.2 BDAS BDAS(Berkeley Data Analytics Stack) ,伯克利大学提出的关于数据分析的软件栈: 目前的大数据的处理类型: 1. 复杂的批量数据处理(batch data processing),通常的时间跨度在数十分钟到数小时之间。 2. 基于历史数据的交互式查询(interactive query),通常的时间跨度在数十秒到数分钟之间。 3. 基于实时数据流的数据处理(streaming data processing),通常的时间跨度在数百毫秒到数秒之间。  目前已有很多相对成熟的开源软件来处理以上三种情景,我们可以利用MapReduce来进行批量数据处理,可以用Impala来进行交互式查询,对于流式数据处理,我们可以采用Storm。对于大多数互联网公司来说,一般都会同时遇到以上三种情景,那么在使用的过程中这些公司可能会遇到如下的不便。 1. 三种情景的输入输出数据无法无缝共享,需要进行格式相互转换。 2. 每一个开源软件都需要一个开发和维护团队,提高了成本。 3. 在同一个集群中对各个系统协调资源分配比较困难。  BDAS就是以Spark为基础的一套软件栈,利用基于内存的通用计算模型将以上三种情景一网打尽,同时支持Batch、Interactive、Streaming的处理,且兼容支持HDFS和S3等分布式文件系统,可以部署在YARN和Mesos等流行的集群资源管理器之上。 图:Batch + Interactive + Streaming 4.3 SPARK与HADOOP的比较 Spark是UC Berkeley AMP lab所开源的类Hadoop MapReduce的通用的并行计算框架,Spark基于map reduce算法实现的分布式计算,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是Job中间输出和结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的map reduce的算法。 4.3.1 SPARK与HADOOP的对比 1. Spark的中间数据放到内存中,对于迭代运算效率更高 Ø Spark更适合于迭代运算比较多的ML和DM运算。因为在Spark里面,有RDD的抽象概念。 2. Spark比Hadoop更通用 Ø Spark提供的数据集操作类型有很多种,不像Hadoop只提供了Map和Reduce两种操作。比如map, filter, flatMap, sample, groupByKey, reduceByKey, union, join, cogroup, mapValues,sort,partionBy等多种操作类型,Spark把这些操作称为Transformations。同时还提供Count, collect, reduce, lookup, save等多种actions操作。 Ø 这些多种多样的数据集操作类型,给给开发上层应用的用户提供了方便。各个处理节点之间的通信模型不再像Hadoop那样就是唯一的Data Shuffle一种模式。用户可以命名,物化,控制中间结果的存储、分区等。可以说编程模型比Hadoop更灵活。 Ø 不过由于RDD的特性,Spark不适用那种异步细粒度更新状态的应用,例如web服务的存储或者是增量的web爬虫和索引。就是对于那种增量修改的应用模型不适合。 3. 容错性 Ø 在RDD计算,通过checkpint进行容错,而checkpoint有两种方式,一个是checkpoint data,一个是logging the updates。用户可以控制采用哪种方式来实现容错,默认是logging the updates方式,通过记录跟踪所有生成RDD的转换(transformations)也就是记录每个RDD的lineage(血统)来重新计算生成丢失的分区数据。 4. 可用性 Ø Spark通过提供丰富的Scala, Java,Python API及交互式Shell来提高可用性。 各公司对spark与Hadoop的选择: 1. 原先支持Hadoop的四大商业机构纷纷宣布支持Spark; 2. Mahout前一阶段表示从现在起他们将不再接受任何形式的以MapReduce形式实现的算法,另外一方面,Mahout宣布新的算法基于Spark; 3. Cloudera的机器学习框架Oryx的执行引擎也将由Hadoop的MapReduce替换成Spark; 4. Google已经开始将负载从MapReduce转移到Pregel和Dremel上; 5. FaceBook则将负载转移到Presto上; 4.3.2 SPARK特性 1. 内存计算; 2. 提供了支持DAG图的分布式并行计算框架,减少多次计算之间中间结果的IO开销; 3. 提供Cache机制来支持多次迭代计算或者数据共享,减少IO开销; 4. RDD之间维护了血统(lineage)关系,一旦RDD fail掉,能通过父RDD自动重建,保证了容错性; 5. 移动计算而非移动数据,RDD Partition可以就近读取分布式文件系统中的数据块到各个节点内存中进行计算; 6. 使用多线程池模型来减少task启动开销:扩展,参考董西城老师的博客,比较MR与Spark的进程模型与线程池模型; 7. Shuffle过程中不必要的sort操作; 8. 采用容错的、高可伸缩性的akka作为通讯框架; 9. … 4.3.3 HADOOP的MAPREDUCE计算模型 Hadoop本身的计算模型决定了Hadoop上的所有工作都要转化成Map、Shuffle和Reduce等核心阶段,由于每次计算都要从磁盘读或者写数据,同时真个计算模型需要网络传输,这就导致了越来越不能忍受的延迟性,同时在前一个任务运行完之前,任何一个任务都不可以运行,这直接导致了其无力支持交互式应用; 首先我们看一下Hadoop经典的处理过程: MapReduce在每次执行的时候都要从磁盘读数据,计算完毕后都要把数据存放到磁盘上: 4.3.4 SPARK的计算模型 而在Spark中,使用内存替代了使用HDFS存储中间结果: 另外一方面,DAG也是Spark快的极为重要的原因,下面是一张DAG图的示例: 大家也可以看一下网络上一张描述DAG更多细节的图片: 基于DAG,Spark具备了非常精致的作业调度系统: DAG中的依赖有宽依赖和窄依赖之分: 在DAG图中可以根据依赖对pipeline等优化操作: 基于RDD和DAG,并行计算整个Job: Spark之所以快,还有一个原因就是其容错机制,这个我们会在本讲的后面和大家分享。 4.3.5 对比实例 http://spark.apache.org/ Apache Spark™ is a fast and general engine for large-scale data processing. Speed Run programs up to 100x faster than Hadoop MapReduce in memory, or 10x faster on disk. Spark has an advanced DAG execution engine that supports cyclic data flow and in-memory computing. 4.4 SPARK与HADOOP的结合 Spark与Hadoop的结合主要包含两个方面: 1. HDFS:Spark可以直接对HDFS进行数据的读写; 2. YARN:Spark支持Spark on YARN部署模式,由YARN负责集群的资源管理与调度;Spark的应用可以与Hadoop MapReduce运行在同一个集群中,共享存储资源与计算。 4.5 SHARK Shark ( Hive on Spark): Shark基本上就是在Spark的框架基础上提供和Hive一样的H iveQL命令接口,为了最大程度的保持和Hive的兼容性,Shark使用了Hive的API来实现 query Parsing和 Logic Plan generation,最后的PhysicalPlan execution阶段用Spark代替Hadoop MapReduce。通过配置Shark参数,Shark可以自动在内存中缓存特定的RDD,实现数据重用,进而加快特定数据集的检索。同时,Shark通过UDF用户自定义函数实现特定的数据分析学习算法,使得SQL数据查询和运算分析能结合在一起,最大化RDD的重复使用。 5 BLINKDB 5.1 BLINKDBS架构 5.2 BLINKDBS的设计核心思想 1. 通过采样,建立并维护一组多维度样本; 2. 查询进来时,选择合适的样本来运行查询。 6 SPARK架构设计 正如所有的分布式结构一样,Spark分布式集群也是主从结构的: Spark运行时组件如下所示: Spark运行时候的事件流如下所示:   7 SPARK编程模型 7.1 应用程序编程模型 应用程序编程模型包括以下几个部分: 1. Driver Program ( SparkContext ) 2. Executor ( RDD 操作) Ø 输入:Base-> RDD Ø Transformation :RDD->RDD Ø Action: RDD->driver or Base Ø 缓存 Persist or cache():控制型的RDD 3. 共享变量 Ø broadcast variables Ø accumulators Spark是大数据时代通用而高效的计算平台,基于RDD成功实现了“One stack to rule them all”理念。 在Spark中一切都是以RDD为基础和核心。以RDD为基石的Spark编程模型。 7.2 RDD 7.2.1 RDD模型 来源:《大型集群上的快速和通用数据处理架构(修正版).pdf》 1.3 基于RDD机制实现的模型 RDD设计时的最大挑战在于定义一个能提供高效容错能力的编程接口。 7.2.2 RDD示意图 7.2.3 RDD简介 Resilient Distributed Datasets(RDD) 弹性分布数据集,是一个容错的、并行的数据结构,可以让用户显式地将数据存储到磁盘和内存中,并能控制数据的分区。同时,RDD还提供了一组丰富的操作来处理数据。在这些操作中,诸如map、flatMap、filter等转换操作实现了monad模式,很好地契合了Scala的集合操作。除此之外,RDD还提供了诸如join、groupBy、reduceByKey等更为方便的操作(注意,reduceByKey是action,而非transformation),以支持常见的数据运算。 RDD具有以下特点: 1. 它是在集群节点上的不可变的(即,RDD是只读的)、已分区的集合对象。 2. 创建:通过并行转换的方式来创建,如(map, filter, join, etc)。 3. 容错性:通过Lineage,实现失败自动重建。 4. 缓存:可以控制存储级别(内存、磁盘等)来进行重用。 5. 必须是可序列化的。 6. 是静态类型的。 RDD的好处: 1. RDD只能从持久存储或通过Transformations操作产生,相比于分布式共享内存(DSM)可以更高效实现容错,对于丢失部分数据分区只需根据它的lineage就可重新计算出来,而不需要做特定的Checkpoint。 2. RDD的不变性,可以实现类Hadoop MapReduce的推测式执行。 3. RDD的数据分区特性,可以通过数据的本地性来提高性能,这与Hadoop MapReduce是一样的。 4. RDD都是可序列化的,在内存不足时可自动降级为磁盘存储,把RDD存储于磁盘上,这时性能会有大的下降但不会差于现在的MapReduce。 7.2.4 RDD的实现 RDD本身是一个抽象类,其内部实现包括以下5个部分,其中前三个必须实现: 1. 分区列表(数据块列表) 2. 计算每个分片的函数(根据父RDD计算出此RDD) 3. 对父RDD的依赖列表 4. 对key-value RDD的Partitioner【可选】 5. 每个数据分片的预定义地址列表(如HDFS上的数据块的地址)【可选】 其中,前三个用于Lineage,后两个用于优化执行。 另外有两个特殊的RDD: 他们都是controlling operations: 7.2.5 RDD的存储 RDD的存储级别:RDD根据useDisk、useMemory、deserialized、replication四个参数的组合提供了11种存储级别: val NONE = new StorageLevel(false, false, false) val DISK_ONLY = new StorageLevel(true, false, false) val DISK_ONLY_2 = new StorageLevel(true, false, false, 2) val MEMORY_ONLY = new StorageLevel(false, true, true) val MEMORY_ONLY_2 = new StorageLevel(false, true, true, 2) val MEMORY_ONLY_SER = new StorageLevel(false, true, false) val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, 2) val MEMORY_AND_DISK = new StorageLevel(true, true, true) val MEMORY_AND_DISK_2 = new StorageLevel(true, true, true, 2) val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false) val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, 2) 用户可以选择不同的存储级别存储RDD以便重用。 可以使用persist 和cache 方法将任意RDD 缓存到内存或磁盘、tachyon 文件系统中;其中,cache是persist使用MEMORY_ONLY存储级别的快捷方式。 缓存是容错的,如果一个RDD 分片丢失,可以通过构建它的transformation自动重构。 RDD的容错则是利用Lineage机制,当RDD fail时,利用父RDD重新构建。 被缓存的RDD 被使用的时,存取速度会加速10X+。 推荐?:executor 中60% 做cache , 40% 做task。 7.2.6 RDD的分区 分区相关的整理: 1. Spark 内核部分: SparkContext文件加载部分 —— 加载构建的RDD的分区数默认由文件的split(一般用blocks)确定 —— 细节等调整,参考RDD之分区源码解析部分。 2. K-V型的RDD ——默认分区器的分区数配置参数: 7.2.7 RDD的操作 RDD将操作分为两类:transformation与action。其中transformations是lazy execution的,需要具体的action去触发,每个action操作都是一个单独的job; 在RDD的内部实现机制中,底层接口则是基于迭代器的,从而使得数据访问变得更高效,也避免了大量中间结果对内存的消耗。 7.2.8 RDD的依赖 在Spark中RDD是具备依赖关系的,而依赖分为两类: 1. 窄依赖:每个父RDD的分区都至多被一个子RDD的分区使用; 2. 宽依赖:多个子RDD的分区依赖一个父RDD的分区。 如HadoopRDD — 分区数不能小于分块数的根源 —窄依赖。 7.2.8.1 RDD依赖结构图 宽依赖和窄依赖的样例。每一个方框表示一个RDD,其内的阴影矩形表示RDD的分区。 7.2.8.2 依赖与STAGE示例 Spark如何计算job的stage的例子。实线圆角方框标识的是RDD。阴影背景的矩形是分区,若已存于内存中则用黑色背景标识。RDD G 上一个Action的执行将会以宽依赖为分区来构建各个stage,对各stage内部的窄依赖则前后连接构成流水线。在本例中,stage 1 的输出已经存在RAM中,所以直接执行 stage 2 ,然后stage 3。 7.2.8.3 窄依赖(NARROW DEPENDENCIES) “Narrow”依赖的一个好处就是可以进行内部的pipeline操作 具体情况有 : 1. 子RDD 的每个分区依赖于常数个父分区(即与数据规模无关); 2. 输入输出一对一的算子,且结果RDD 的分区结构不变,主要是map 、flatMap; 3. 输入输出一对一,但结果RDD 的分区结构发生了变化,如union 、coalesce; 4. 从输入中选择部分元素的算子,如filter 、distinct 、subtract 、sample; 7.2.8.4 宽依赖(WIDE DEPENDENCIES) 1. 子RDD 的每个分区依赖于所有父RDD 的分区; 2. 对单个RDD 基于key 进行重组和reduce ,如groupByKey 、reduceByKey ; 3. 对两个RDD 基于key 进行join 和重组,如join; 对应宽依赖类的操作 {比如w shuffle依赖),我们会将中间记录物理化到保存父分区的节点上。这和MapReduce物化Map的输出类似,能简化数据的故障恢复过程。 如果某些stage变为不可用(例如,因为shuffle在map阶段的某个输出丢失了),则重新提交相应的任务以并行计算丢失的分区。 7.2.8.5 两种依赖的的区别 从两个方面来说比较有用。 1. 首先,窄依赖可以进行pipeline操作,即,允许在单个集群节点上流水线式执行,这个节点可以计算所有父级分区。 对应的,宽依赖需要所有的父RDD数据可用并且数据已经通过类MapReduce的操作shuffle完成。 2. 其次,在窄依赖中,节点失败后的恢复更加高效。 窄依赖中,只有丢失的父级分区需要重新计算,并且这些丢失的父级分区可以并行地在不同节点上重新计算。与此相反,在宽依赖的继承关系中,单个失败的节点可能导致一个RDD的所有先祖RDD中的一些分区丢失,导致计算的重新执行。 7.2.9 RDD的持久化 RDD在持久化的需要考虑内存策略 7.2.10 RDD的容错性 7.2.10.1 分布式数据集容错方式 分布式数据集支持容错通常采用两种方式:数据复制或日志记录。对于以数据为中心的系统而言,这两种方式都非常昂贵,因为它需要跨集群网络拷贝大量数据,毕竟带宽的数据远远低于内存。 7.2.10.2 RDD的容错机制 RDD天生是支持容错的: 1. 首先,它自身是一个不变的(immutable)数据集; 2. 其次,基于DAG图的lineage(血统关系),记录了操作之间的关系图(Graph of Operation)。 由于无需采用replication方式支持容错,可以很好地降低跨网络的数据传输成本。 不过,在某些场景下,Spark也需要利用记录日志的方式来支持容错。例如,在Spark Streaming中,针对数据进行update操作,或者调用Streaming提供的window操作时,就需要恢复执行过程的中间状态。此时,需要通过Spark提供的checkpoint机制,以支持操作能够从checkpoint得到恢复。 针对RDD的wide dependency,最有效的容错方式同样还是采用checkpoint机制。不过,似乎Spark的最新版本仍然没有引入auto checkpointing机制。 7.2.10.3 SPARK的高容错机制LINEAGE 基于DAG图,lineage是轻量级而高效的,操作之间相互具备lineage的关系,每个操作只关心其父操作,各个分片的数据之间互不影响,出现错误的时候只要恢复单个Split的特定部分即可: 本图来自Matei Zaharia撰写的论文An Architecture for Fast and General Data Processing on Large Clusters。图中,一个box代表一个RDD,一个带阴影的矩形框代表一个partition。 针对调度器自身失败的容错,拷贝相应RDD的lineage是比较直接的解决之道。但现阶段我们并不提供该类容错特性。 1. 利用内存加快数据加载,在众多的其它的In-Memory类数据库或Cache类系统中也有实现,Spark的主要区别在于它处理分布式运算环境下的数据容错性(节点实效/数据丢失)问题时采用的方案。为了保证RDD中数据的鲁棒性,RDD数据集通过所谓的血统关系(Lineage)记住了它是如何从其它RDD中演变过来的。相比其它系统的细颗粒度的内存数据更新级别的备份或者LOG机制,RDD的Lineage记录的是粗颗粒度的特定数据转换(Transformation)操作(filter, map, join etc.)行为。当这个RDD的部分分区数据丢失时,它可以通过Lineage获取足够的信息来重新运算和恢复丢失的数据分区。这种粗颗粒的数据模型,限制了Spark的运用场合,但同时相比细颗粒度的数据模型,也带来了性能的提升。 2. 现有的基于集群的内存存储抽象,比如分布式共享内存[79],键-值存储[81],数据库,以及Piccolo[86],提供了一个对内部状态基于细粒度更新的接口(例如,表格里面的单元).在这样的设计之下,提供容错性的方法就要么是在主机之间复制数据,要么对各主机的更新情况做日志记录。这两种方法对于数据密集型的任务来说代价很高,因为它们需要在带宽远低于内存的集群网络间拷贝大量的数据,同时还将产生大量的存储开销。 3. 与上述系统不同的是,RDD提供一种基于粗粒度变换(如, map, filter, join)的接口,该接口会将相同的操作应用到多个数据集上。这使得他们可以通过记录用来创建数据集的变换(lineage),而不需存储真正的数据,进而达到高效的容错性。 4. RDD在Lineage依赖方面分为两种Narrow Dependencies与Wide Dependencies用来解决数据容错的高效性。Narrow Dependencies是指父RDD的每一个分区最多被一个子RDD的分区所用,表现为一个父RDD的分区对应于一个子RDD的分区或多个父RDD的分区对应于一个子RDD的分区,也就是说一个父RDD的一个分区不可能对应一个子RDD的多个分区。Wide Dependencies是指子RDD的分区依赖于父RDD的多个分区或所有分区,也就是说存在一个父 RDD的一个分区对应一个子RDD的多个分区。对与Wide Dependencies,这种计算的输入和输出在不同的节点上,lineage方法对与输入节点完好,而输出节点宕机时,通过重新计算,这种情况下,这种方法容错是有效的,否则无效,因为无法重试,需要向上其祖先追溯看是否可以重试(这就是lineage,血统的意思),Narrow Dependencies对于数据的重算开销要远小于Wide Dependencies的数据重算开销。 7.3 广播变量(BROADCAST VARIABLES) 1. 广播变量缓存到各个节点的内存中,而不是每个Task; 2. 广播变量被创建后,能在集群中运行的任何函数调用; 3. 广播变量是只读的,不能在被广播后修改; 4. 对于大数据集的广播, Spark 尝试使用高效的广播算法来降低通信成本; 5. 使用方法: val broadcastVar = sc.broadcast(Array(1, 2, 3)) // 创建广播变量 broadcastVar.value // 广播变量的使用 7.4 累加器(ACCUMULATOR) 1. 累加器只支持加法操作; 2. 累加器可以高效地并行,用于实现计数器和变量求和; 3. Spark 原生支持数值类型和标准可变集合的计数器,但用户可以添加新的类型; 4. 只有驱动程序才能获取累加器的值 5. 使用方法: val accum = sc.accumulator(0) // 累加器变量的创建 sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x) // 累加器变量的使用 accum.value // 在Driver Program 中获取累加器变量的值 8 SPARK执行框架 8.1 SPARK运行架构 8.2 更多的细节 1. BlockManager 2. AKKA 3. NETTY 运行的时候是以RDD为统一抽象并行化运行,对应有如下两种视图: 更进一步的详细RDD并行化计算过程如下所示: RDD在产生作业调用的时候,经典的过程如下所示:   1. DAGScheduler 负责构建Stage; 2. 记录哪个RDD 或者Stage 输出被物化; 3. shuffle 输出丢失时,报告fetch failed 错误,并重新提交stage; 4. 为每一个TaskSet 构建一个TaskSetManager 实例,管理其生命周期,并根据不同的Cluster Manager,将Taskset 传给对应的底层调度器,提交到集群运行并监控: Ø spark-cluster TaskScheduler Ø yarn-cluster YarnClusterScheduler Ø yarn-client YarnClientClusterScheduler 5. 数据本地性决定每个Task 最佳位置(process-local, node-local, rack-local and then any) ; 6. 推测执行,碰到straggle 任务需要放到别的节点上重试 8.3 SPARK的TASK Task 分为: 1. ShuffleMapTask :向DAGScheduler 返回一个MapStatus 对象,该对象管理ShuffleMapTask 的运算输出结果在ShuffleBlockManager 里的相关存储信息,而非结果本身,这些存储位置信息将作为下一个Stage 的任务获取输入数据的依据。 2. ResultTask:对应FinalStage的任务,向Driver返回结果,返回结果的类型也包含两类: Ø DirectTaskResult:当结果足够小时,直接放DirectTaskResult 对象中返回给Driver; Ø IndirectTaskResult:当结果超过特定尺寸(默认约10MB )时,先在Executor 端将DirectTaskResult序列化,再把序列化的结果作为一个Block 存放在BlockManager 里,然后将BlockManager 返回的BlockID放在IndirectTaskResult 对象中返回给driver。 8.4 SHUFFLE过程 Spark 内核提供了一个可拔插的shuffle 接口。 shuffle结果的patition 数目由ShuffleDependency 中的Partitioner 对象来决定。 9 集群模式的运行架构 9.1 SPARK STANDALONE集群模式 其中,当 部署模式不同时,运行driver application的节点也不同: 1. Client部署模式:在提交application的节点上运行。 2. Cluster部署模式:在集群中的某个worker节点中运行,该节点由Master调度。 9.2 YARN集群 — SPARK ON YARN 不同部署模式下的运行架构会有所不同。 9.2.1 CLUSTER模式 10 SPARK 的调度机制 Spark的调度机制包括以下几种: 1. Master对Driver、Application的调度机制 2. DAGScheduler对Stage的调度机制 3. TaskScheduler对TaskSet的调度机制 4. TaskSetManager对Task的调度机制 调度器向各机器的任务分配采用延时调度机制[117]并根据数据存储位置(本地性)来确定。若一个任务需要处理的某个分区刚好存储在某个节点的内存中,则该任务会分配给那个节点。否则,如果一个任务处理的某个分区,该分区含有的RDD提供较佳的位置(例如,一个HDFS文件),我们把该任务分配到这些位置。 对应容错方面: 1. 若某个任务执行缓慢 (即"落后者"straggler),系统则会在其他节点上执行该任务的拷贝--这与MapReduce做法类似,并取最先得到的结果作为最终的结果。 2. 最后,虽然目前在Spark中所有的计算都是为了对驱动程序中调用动作的响应而执行,我们也试验让集群上的任务(如映射)调用查找操作,它根据键值能够随机访问散列分区的RDDs的元素。在这种设计下,如果任务所需要的分区丢失了,则该任务需要告知调用器去重新计算该分区。 11 SPARK内核 数据处理系统的三大关键要素——“存储”、“计算”与“容错”。 Spark 是一个大数据处理引擎(或者说是开发包),其核心是Spark Core ,基础是RDD ; Spark内核揭秘共分四个部分: 第一部分:Spark内核初探 第二部分:Spark内核核心源码解析 第三部分:Job全生命周期源码解读 第四部分:Spark性能优化 —— 参加性能篇或FAQ 11.1 SPARK内核初探 本讲是Spark内核揭秘的第一部分:Spark内核初探,具体内容如下所示: 1,Spark内核核心术语解析; 2,Spark集群概览; 3,Spark核心组件; 4,Spark任务调度系统初见; 11.2 SPARK内核核心术语解析 11.2.1 相关核心概念的结构 11.2.2 APPLICATION Application是创建了SparkContext实例对象的Spark用户程序,包含了一个Driver program和集群中多个Worker上的Executor,其中,每个Worker为每个应用仅仅提供一个Executor。 Spark-shell是一个应用程序,因为spark-shell在启动的时候创建了SparkContext对象,其名称为sc。 11.2.3 JOB 和Spark的action相对应,每一个action例如count、savaAsTextFile等都会对应一个Job实例,该Job实例包含多任务的并行计算。 11.2.4 DRIVER PROGRAM 运行Application的main函数并且新建SparkContext实例的程序。通常用SparkContext代表Driver Program。 11.2.5 CLUSTER MANAGER 集群资源管理的外部服务,在Spark上现在主要有Standalone、Yarn、Mesos等三种集群资源管理器,Spark自带的Standalone模式能够满足绝大部分纯粹的Spark计算环境中对集群资源管理的需求,基本上只有在集群中运行多套计算框架的时候才建议考虑Yarn和Mesos。 11.2.6 WORKER NODE 集群中可以运行Application代码的工作节点,相当于Hadoop的slave节点。 11.2.7 EXECUTOR 在Worker Node上为Application启动的一个工作进程,在进程中负责任务(Task)的运行,并且负责将数据存放在内存或磁盘上,必须注意的是,每个应用在一个Worker Node上只会有一个Executor,在Executor内部通过多线程的方式并发处理应用的任务。 每个Application都有各自独立的executors。 11.2.8 TASK 被Driver送到executor上的工作单元,通常情况下一个task会处理一个split的数据,每个split一般就是一个Block块的大小。 11.2.9 STAGE 一个Job会被拆分成多组任务(TaskSet),每一组任务被称为Stage,任务和MapReduce的map和reduce任务很像。 划分Stage的依据在于:Stage开始一般是由于读取外部数据或者Shuffle数据、一个Stage的结束一般是由于发生Shuffle(例如reduceByKey操作)或者整个Job结束时例如要把数据放到hdfs等存储系统上。 11.2.10 RDD Spark的基本计算单元,是Spark的一个最核心的抽象概念,可以通过一系列算子进行操作,包括Transformation和Action两种算子操作。 11.3 SPARK集群概览 需要注意的是Spark Driver所在的机器需要和Spark集群最好位于同一个网络环境中,因为Driver中的SparkContext实例要要发送任务给不同Worker Node的Executor并接受Executor的一些执行结果信息,一般而言,在企业实际的生产环境中Driver所在机器的配置往往都是比较不错的,尤其是其CPU的处理能力往往都很强悍。 11.4 SPARK核心组件 Spark核心组件如下所示: 在初始话SparkContext的时候会初始化一系列的内容,例如会查看内存的设置情况: SparkContext在初始化的时候也会创建和启动scheduler: 集群核心组件中的Block tracker是用于Block和partition对应关系的管理器。 集群核心组件中的Shuffle tracker是用于记录Shuffle操作过程细节的。 从集群中也可以清晰的看出,Executor在执行任务的时候是采用多线程的方式执行的并能够在HDFS或者HBase等系统上存取数据。 在实际的Driver Program运行的时候每个Partition都会由一个Task负责运行: 也就是说有多少partition就会有多少task在运行,而这些task是并发的运行在executor中的。 11.5 资源管理与作业调度 Spark对于资源管理与作业调度可以使用Standalone(独立模式),Apache Mesos及Hadoop YARN来实现。 Spark on Yarn在Spark0.6时引用,但真正可用是在现在的branch-0.8版本。Spark on Yarn遵循YARN的官方规范实现,得益于Spark天生支持多种Scheduler和Executor的良好设计,对YARN的支持也就非常容易,Spark on Yarn的大致框架图。  让Spark运行于YARN上与Hadoop共用集群资源可以提高资源利用率。 11.6 SPARK作业的执行 用户编写的Spark程序被称为Driver程序,Dirver程序会连接master并定义了对各RDD的转换与操作,而对RDD的转换与操作通过Scala闭包(字面量函数)来表示,Scala使用Java对象来表示闭包且都是可序列化的,以此把对RDD的闭包操作发送到各Workers节点。 Workers存储着数据分块和享有集群内存,是运行在工作节点上的守护进程,当它收到对RDD的操作时,根据数据分片信息进行本地化数据操作,生成新的数据分片、返回结果或把RDD写入存储系统。  11.7 SPARK任务调度系统初见 http://book.51cto.com/art/201409/453029.htm Spark的任务调度系统如下所示: 从上图中可以看出由RDD Objects产生DAG,然后进入了DAGScheduler阶段,DAGScheduler是面向Stage的高层次的调度器,DAGScheduler把DAG拆分成很多的Tasks,每组的tasks都是一个Stage,每当遇到Shuffle就会产生新的Stage,可以看出上图一共有三个stage;DAGScheduler需要记录那些RDD被存入磁盘等物化动作,同时要寻求Task的最优化调度,例如数据本地性等;DAGScheduler还要监视因为Shuffle输出导致的失败,如果发现这种失败,可能就要重新提交 该Stage: DAGScheduler划分Stage后以TaskSet为单位把任务交给低层次的可插拔的调度器TaskScheduler来处理: 可以看出TaskScheduler是一个trait,在目前的Spark系统中TaskScheduler的实现类只有一个TaskSchedulerImpl: 一个TaskScheduler只为一个SparkContext实例服务,TaskScheduler接受来自DAGScheduler发送过来的分组的任务,DAGScheduler给TaskScheduler发送任务的时候是以Stage为单位来提交的,TaskScheduler收到任务后负责把任务分发到集群中Worker的Executor中去运行,如果某个task运行失败,TaskScheduler要负责重试;另外如果TaskScheduler发现某个Task一直未运行完,就可能启动同样的任务运行同一个Task,那个任务先运行完就用哪个任务的结果。 TaskScheduler发送的任务交给了Worker上的Executor以多线程的方式运行,每一个线程负责一个任务:   其中的存储系统的管理是BlockManager来负责的: 下面看一下TaskSet的源码: 从TaskSet源码的第一个参数tasks就可以看出其是一个Task的数组,包含一组Task。 11.8 SPARK内核核心源码解析 11.8.1 RDD依赖源码解析 package org.apache.spark Dependency 对RDD依赖的理解可以从以下几个方面出发: 1. 出发点:依赖本身,是描述两个RDD之间的关系。只是一个RDD可以与多个RDD有依赖关系。 2. 宽窄依赖的关注点:在RDD的各个分区对父RDD的分区的依赖关系。 11.8.2 依赖与STAGE划分 根据父子RDD间的依赖关系是否为Shuffle确定是否划分Stage。 每个Stage对应一个TaskSet,除了最后一个stage外,其他Stage对应的task都是ShuffleMapTask,在这些Stage中,会保存该Stage的后续Stage对它的分区依赖信息,即会包含一个ShuffleDependency; 对应的最后一个stage,即finalStage,在DAG调度中,由一个action触发,对应的Stage会在触发的最开始部分生成,此时,构造Stage的时候,对应的ShuffleDependency为Nil,因为该Stage是最后一个Stage,在该job中不需要再向其他Stage输出,所以不需要对应的ShuffleDependency,获取对应的分区等。 1. 图解Stage划分 Gà B : 窄依赖,不生成Stage; BàA :宽依赖,生成Stage1; Gà F : 宽依赖,生成Stage2; Dà F, EàF,CàD: 窄依赖,不生成Stage; 最后得到 Stage3 – ParentStage:[Stage1, Stage2] 2. 源码解析 入口点:DAGScheduler - handleJobSubmitted 步骤:结合图理解 —— 看图更简单 1. 提交job时,从finalRDD构建出finalStage; 2. 以finalStage为当前Stage – curStage,开始下列步骤: 3. 根据curStage对应的RDD(设为curRDD)与每个父RDD的依赖关系,进行分析 a) 当curRDD与父RDD的依赖为ShuffleDependency时,以父RDD 构建新的Stage。对新的Stage重复相同步骤(步骤3),继续分析 b) 当curRDD与父RDD的依赖为 NarrowDependency时,分析的依赖上移,对父RDD与其父RDD的依赖进行相同步骤(步骤3)的分析。 4. 将在curStage基础上逐步上移所分析得到的Stage放入curStage的ParentStages中。 参考github上的JerryLead-SparkInternals: 11.8.3 RDD之分区源码解析 package org.apache.spark Partition 1. RDD的Partitioner:a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned) 2. 分区器的定义:Partition – defaultPartitioner a) 使用自定义的分区器partitioner; b) 未设置分区器partitioner时,使用默认的分区器HashPartitioner;此时分区器的分区数定义如下: i. 从配置属性"spark.default.parallelism"获取; ii. 使用父RDD的分区数的最大值。 3. 非k-v形式的RDDs,木有分区器,相关分区信息参见各自RDD对应的分区类型,可以参考RDD的getPartitions方法,比如方法会指定RDD分区与父RDD分区间的映射关系等。 11.8.4 RDD源码解析 RDD的官方解释是: 每个RDD的核心方法如下所示: RDD主要分为两种:   下面我们初步分析一下RDD的四个核心方法,首先看一下getPartitions方法的源码: getPartitions返回的是一系列partitions的集合,即一个Partition类型的数组; 而getDependencies表达式RDD之间的依赖关系,如下所示: getDependencies返回的是依赖关系的一个Seq集合,里面的Dependency数组中的下划线是类型的PlaceHolder; 每个RDD都会具有计算的函数,如下所示: Compute方法是针对RDD的每个Partition进行计算的,其TaskContext参数的源码如下: getPreferredLocations是寻找Partition的首选位置: 其实RDD还有一个可选的分区策略: Partitioner的源码如下:     可以看出默认使用的是HashPartitioner,要注意key为Array的情况; 11.8.5 SPARKCONTEXT核心源码解析 SparkContext在获得了一系列的初始化信息后开始创建并启动TaskScheduler实例: 进入createTaskScheduler方法: 我们看一下其Standalone的方式: 在上述代码中首先实例化一个TaskSchedulerImpl: 然后构建出了masterUrls; 接着创建出了非常关键的backend: 我们进入其实现看一下: SparkDeploySchedulerBackend核心是为了启动CoarseGrainedExecutorBackend: 此处使用了Akka技术进行不同机器之间的通信,CoarseGrainedExecutorBackend是具体在Worker上执行具体的任务的进程的代表,所以我们的backend实例就是用来提交任务给Executor的: 其实CoarseGrainedExecutorBackend是Executor的代理人,能够完成很多任务,例如启动一个任务: 回到Standalone的方式的代码处: 接着代码是把backend传给了initialize方法中: 在上述代码中显示处理调度模式例如FIFO和Fair的模式。 在代码块的最后返回实例化后的scheduer:   11.8.6 TASKSCEDULER启动源码解析 TaskScheduler实例对象启动源代码如下所示: 进入start方法: 找到TaskSchedulerImpl实现类中的start方法实现: 其中TaskSchedulerImpl实例对象的主要目的在于启动backend,backend的类型如下所示: SchedulerBackend的具体实现如下所示:   11.8.7 DRIVER中APPCLIENT源码解析 首先从SparkContext中TaskScheduler实例的创建开始: 进入start代码内部: 进入其实现者TaskSchedulerImpl内部: 可以发现在start具体实现的内部首先是有个backend.start方法: 其最终具体的实现类为:   从代码中可以看出,我们把CoarseGrainedExecutorBackend封装成command,然后交给appDesc,接着交给了Appclient,此时的AppClient就是客户端程序! Appclient会调用start方法: 此时启动了ClientActor:   11.8.8 APPCLIENT注册MASTER 注册Master有两种,一种是registerWithMaster方法,一种是tryRegisterAllMasters方法,前者是单Master的情况,后者是多Master,一般情况下是满足HA机制,我们看一下registerWithMaster方法: 此时会发生tryRegisterAllMasters方法:   此时通过Akka通过消息机制发送消息给Master来注册程序,RegisterApplication是一个case class,来封装消息: 我们进入Master的源代码: 看一下接受客户端发送过来消息RegisterApplication的代码如下所示: 此时首先使用ApplicationInfo构建一些准备信息,然后会导致registerApplication代码的调用: 代码中就是一个注册应用的过程。 接着在Master的消息响应中会调用schedule方法: 可以看到schedule方法中首先要启动Driver程序,也就是有main函数的程序,然后在schedule中会调度Worker的过程:   改代码会导致launchExecutor代码的执行: 在launchExecutor内部Master发送消息给Worker节点,消息为LaunchExecutor:   11.8.9 WORKER中EXECUTOR启动过程源代码解析 进入Worker源代码: 可以看出Worker本身是Akka中的一个Actor。 我们看一下Worker对LaunchExecutor消息的处理: 从源代码可以看出Worker节点上要分配CPU和Memory给新的Executor,首先需要创建一个ExecutorRunner: ExecutorRunner是用于维护executor进程的: 其中最重要的方法是fetchAndRunExecutor:   至此,Worker节点上的Executor启动运行。 11.8.10 DAGSCHEDULER源码解析 当构建完TaskScheduler之后,我们需要构建DAGScheduler这个核心对象: 进入其构造器代码: 可以看出构建DAGScheduler实例的时候需要把TaskScheduler实例对象作为参数传入。 在DAGScheduler实例化最终调用其primary constructor的时候会导致以下函数的执行: 看一下该函数内部所做的事情: 可以看出核心在于实例化eventProcessActor对象,eventProcessActor会负责接收和发送DAGScheduler的消息,是DAGScheduler的通信载体。 11.8.11 SPARK的WEB监控源码解析 在SparkContext中可以看到如下代码: 首先是创建一个Spark Application的Web监控实例对象: 然后bind方法会绑定一个web服务器: 可以看出我们使用Jetty服务器来监控程序的运行和显示Spark集群的信息的。 11.8.12 提交代码源码初步解读 从官网等渠道了解spark提交应用程序的方法。 在通读一遍源码后,对源码解读进行整理、总结,以提高对源码的理解。 (个人经验:王老师的视频或书籍中,会讲解涉及到的重要部分,通读一遍源码后,再跟着视频或书籍看一遍,可以抓住重点,加快源码理解) 内容包含以下几个部分。 11.8.12.1 入口点 入口脚本:.bin/spark-submit Object:org.apache.spark.deploy.SparkSubmit 入口方法:def main(args: Array[String]) ——主构造函数做了哪些事?(忽略) 参数类:SparkSubmitArguments 11.8.12.2 入口点开始源码解读 找到Object SparkSubmit入口方法:def main(args: Array[String]) def main(args: Array[String]) { val appArgs = new SparkSubmitArguments(args) if (appArgs.verbose) { printStream.println(appArgs) } val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs) launch(childArgs, classpath, sysProps, mainClass, appArgs.verbose) } 通过main源码解读可得最终调用: 1. mainClass = Class.forName(childMainClass, true, loader) 2. val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass) 3. mainMethod.invoke(null, childArgs.toArray) 即:SparkSubmit最终调用的是childMainClass类的main静态方法。 问题:为什么最终调用的不是提交的mainClass,而是childMainClass类? 由该问题引入下一步: 11.8.12.3 由SPARKSUBMIT的主要功能指导源码解读 回到SparkSubmit的描述,对不同集群管理及部署模式的封装体现在childMainClass的设置(注意:我们提交的是mainClass类,调用childMainClass的main方法时,最终一定会调用我们提交的mainClass类,即以childMainClass的调用,封装了mainClass类的调用) /** * Main gateway of launching a Spark application. * * This program handles setting up the classpath with relevant Spark dependencies and provides * a layer over the different cluster managers and deploy modes that Spark supports. */ 11.8.12.4 (集群管理,部署模式)源码解读 重点关注:集群管理+部署模式 ——> childMainClass的设置 集群管理+部署模式组合情况的childMainClass设置(提交mainClass类的封装): (clusterManager, deployMode) childMainClass 说明 (YARN,CLIENT) (_,CLIENT) args.mainClass Client模式:mainClass就在提交点执行main方法 (STANDALONE ,CLIENT) (MESOS ,CLIENT) (LOCAL ,CLIENT) (YARN, CLUSTER) isPython isShell isSqlShell "org.apache.spark.deploy.yarn.Client" (STANDALONE , CLUSTER) "org.apache.spark.deploy.Client" (LOCAL , CLUSTER) “”/args.mainClass 代码可参考SparkContext对isLocal的处理 (MESOS , CLUSTER) 测试(LOCAL , CLUSTER): 测试(_ , CLUSTER) + isShell : 红色部分:当前不支持的组合。 提交mainClass类的封装本质上就是如何将mainClass类设置到childMainClass中: 1. childArgs += (args.master, args.primaryResource, args.mainClass) 2. childArgs += ("--class", args.mainClass) 11.8.13 提交代码源码解读之(STANDALONE , CLUSTER)方式 (STANDALONE , CLIENT)方式,由上一节可知,是在提交点直接调用提交类mainClass的main方法(可以用提交WordCount来理解下)。 分析(STANDALONE , CLUSTER)方式的提交源码解读,两者可对比理解。 (clusterManager, deployMode) childMainClass (STANDALONE , CLIENT) args.mainClass (STANDALONE , CLUSTER) "org.apache.spark.deploy.Client" (STANDALONE , CLIENT) 与(STANDALONE , CLUSTER)两种提交的差异在于执行的主类不同,接下来对以CLUSTER部署模式提交时调用的Client类进行源码解读。 11.8.13.1 CLIENT源码解读 1. 关键点 入口方法:def main(args: Array[String]) ——主构造函数做了哪些事?(忽略) 参数类:ClientArguments: SparkSubmit类在(STANDALONE , CLUSTER)提交模式下,SparkSubmitArguments 重新封装为ClientArguments。 2. 入口点的源码解读 Client的入口方法: /** * Executable utility for starting and terminating drivers inside of a standalone cluster. * 在一个 Standalone 集群中,启动、停止 divers 的一个可执行工具类 */ object Client { def main(args: Array[String]) { …… val (actorSystem, _) = AkkaUtils.createActorSystem( "driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf)) actorSystem.actorOf(Props(classOf[ClientActor], driverArgs, conf)) actorSystem.awaitTermination() } } 即调用Client的main方法后,构建ActorSystem,然后通过ClientActor与Master进行通讯。 11.8.13.2 CLIENTACTOR源码解读 ClientActor是一个Actor,Actor的关键点:preStart()、receiveWithLogging,… 1. preStart() :发送启动、停止Driver的消息 RequestSubmitDriver + RequestKillDriver val command = new Command(mainClass, Seq("{{WORKER_URL}}", driverArgs.mainClass) ++ driverArgs.driverOptions, sys.env, classPathEntries, libraryPathEntries, javaOpts) val driverDescription = new DriverDescription( driverArgs.jarUrl, driverArgs.memory, driverArgs.cores, driverArgs.supervise, command) masterActor ! RequestSubmitDriver(driverDescription) 可以看到,SparkSubmit提交的mainClass被封装到了Command中,最终封装成DriverDescription,用RequestSubmitDriver消息发送到Master,由Master负责分配调度。 11.9 存储源码解读 11.10 JOB全生命周期源码解读 11.10.1 RDD的COUNT操作为例触发JOB全生命周期源码研究 要想从源码级别研究Job全生命周期,往往需要action触发Job的运行,我们以RDD的count操作为例,如下所示: 可以看到count方法触发SparkContext的runJob方法的调用: 继续跟踪进去: 进一步跟踪下去: 继续跟踪下去: 此时发现进入DagScheduler的runJob方法: 此时执行流程进入submitJob中:   此时我们发现会向eventProcessActor发送JobSubmitted这个消息,JobSubmitted是一个case class: 进入eventProcessActor定义的地方: 此时需要跟踪进DAGSchedulerEventProcessActor看具体的eventProcessActor的实现: 可以看到我们的JobSubmitted消息是交给DAGScheduler的handleJobSubmitted方法来处理的:     源代码中首先构建finalStage,然后又一个getMissingParentsStages方法,可以发现运行有本地运行和集群运行两种模式,本地运行主要用于本地实验和调试: 集群运行模式下回调用submitStage: submitStage第一次传入的参数是Job的最后一个Stage,然后判断一下是否缺失父Stage,如果没有依赖的parent Stage的话就可以submitMissingTasks运行,如果有parent Stage的话就要再一次submitStage做递归操作,最终会导致submitMissingTasks的调用:         从源代码中可以看出DAGScheduler中向TaskScheduler以Stage为单位提交任务,Stage是以TaskSet为单位的: 进入TaskScheduler的submitTasks:   进入TaskSchedulerImpl中的具体实现: 获取资源需要使用reviveOffers方法:   此时的SchedulerBackend是CoarseGrainedSchedulerBackend: 此时看reviveOffers方法的实现: 此时会向driverActor发送ReviveOffers消息,driverActor的实现代码如下:   此时跟踪进DriverActor的实现中:         可以看到ReviveOffers消息的具体实现是makeOffers方法: WorkerOffer对象代表是某个Executor上可用的资源,freeCores(id)是该executor上空余的CPU数目: 进入launchTasks: 此时想executorActor发送启动Task的请求,其实是向CoarseGrainedExecutorBackend发送LaunchTask消息:       在LaunchTask消息中会导致executor.lauchTask的调用: 其中的TaskRunner封装了任务本身: 任务执行的是交给了线程池去执行的。 11.11 SPARK内核之存储管理 11.12 SPARK内核之计算 11.13 SPARK内核之容错 11.14 工具类源码解析 路径:package org.apache.spark.util 11.14.1 UTILS 1. startServiceOnPort方法:在给定端口号上启动一个服务 /** * Attempt to start a service on the given port, or fail after a number of attempts. * Each subsequent attempt uses 1 + the port used in the previous attempt (unless the port is 0). * * @param startPort The initial port to start the service on. * @param maxRetries Maximum number of retries to attempt. * A value of 3 means attempting ports n, n+1, n+2, and n+3, for example. * @param startService Function to start service on a given port. * This is expected to throw java.net.BindException on port collision. * * 尝试在给定的端口上启动一个 service, 或在尝试一定次数后失败。 * 每一次尝试都会使用前一次尝试的 端口号 + 1 (除非端口是 0)。 * * 补充:惨考 ServerSocket 类 , 端口号为 0 时,由系统随机生成端口号。 */ def startServiceOnPort[T]( startPort: Int, startService: Int => (T, Int), serviceName: String = "", maxRetries: Int = portMaxRetries): (T, Int) = { 通过该方法启动的服务有: 1. HTTP服务:org.eclipse.jetty.server.Server 2. Jetty服务:org.eclipse.jetty.server.Server 3. NIO服务:java.nio.channels. ServerSocketChannel 4. ActorSystem服务: akka.actor.ActorSystem 应用程序-driver:启动时,需要启动HTTP Server 和 Web UI – Jetty Server 11.15 SPARK内核之网络传输 针对spark 1.2版本 —— release-notes communication manager换成了netty-based实现 —— 默认配置。 之前每次都要从磁盘读到内核,再到用户态,再回到内核态进入网卡,换了之后用zerocopy来实现。 可以参考:Kafka。  传大量数据的情况下 —— 极大提高。 11.16 SPARK内核之SHUFFLE http://databricks.com/blog/2015/01/15/improved-driver-fault-tolerance-and-zero-data-loss-in-spark-streaming.html 针对spark 1.2版本 —— release-notes shuffle manager默认设置改成sort based。 在shuffle数据比较大的时候,性能会有提升。 与Hadoop的Shuffle-sort相比:目前Spark的sort只是按照Partition key排序,Partition内部目前是不排序的,不过就算内部要排序,也是比较容易实现的。而Hadoop是按照每个Partition内的每个KV排序的。 11.16.1 两种SHUFFLE MANAGER的比较 org.apache.spark.shuffle.sort.HashShuffleManager(配置参数值为hash)和org.apache.spark.shuffle.sort.SortShuffleManager(配置参数值为sort)在实现方式上的区别: HashShuffleManager,故名思义也就是在Shuffle的过程中写数据时不做排序操作,只是将数据根据Hash的结果,将各个Reduce分区的数据写到各自的磁盘文件中。带来的问题就是如果Reduce分区的数量比较大的话,将会产生大量的磁盘文件。如果文件数量特别巨大,对文件读写的性能会带来比较大的影响,此外由于同时打开的文件句柄数量众多,序列化,以及压缩等操作需要分配的临时内存空间也可能会迅速膨胀到无法接受的地步,对内存的使用和GC带来很大的压力,在Executor内存比较小的情况下尤为突出,例如Spark on Yarn模式。 SortShuffleManager,是1.1版本之后实现的一个试验性(也就是一些功能和接口还在开发演变中)的ShuffleManager,它在写入分区数据的时候,首先会根据实际情况对数据采用不同的方式进行排序操作,底线是至少按照Reduce分区Partition进行排序,这样来至于同一个Map任务Shuffle到不同的Reduce分区中去的所有数据都可以写入到同一个外部磁盘文件中去,用简单的Offset标志不同Reduce分区的数据在这个文件中的偏移量。这样一个Map任务就只需要生成一个shuffle文件,从而避免了上述HashShuffleManager可能遇到的文件数量巨大的问题 两者的性能比较,取决于内存,排序,文件操作等因素的综合影响。 对于不需要进行排序的Shuffle操作来说,如repartition等,如果文件数量不是特别巨大,HashShuffleManager面临的内存问题不大,而SortShuffleManager需要额外的根据Partition进行排序,显然HashShuffleManager的效率会更高。 而对于本来就需要在Map端进行排序的Shuffle操作来说,如ReduceByKey等,使用HashShuffleManager虽然在写数据时不排序,但在其它的步骤中仍然需要排序,而SortShuffleManager则可以将写数据和排序两个工作合并在一起执行,因此即使不考虑HashShuffleManager的内存使用问题,SortShuffleManager依旧可能更快。 12 SPARK HISTORY SERVER 12.1 源码解析 由服务启动脚本./sbin/start-history-server.sh得到对应的类,为HistoryServer 查看HistoryServer 源码: class HistoryServer( conf: SparkConf, provider: ApplicationHistoryProvider, securityManager: SecurityManager, port: Int) extends WebUI(securityManager, port, conf) with Logging { …… * export SPARK_HISTORY_OPTS="-Dspark.history.fs.logDirectory=/tmp/spark-events" * ./sbin/start-history-server.sh 13 SPARK性能优化 14 SPARK核心框架篇 15 SPARK STREAMING Spark streaming: 构建在Spark上处理Stream数据的框架,基本的原理是将Stream数据分成小的时间片断(几秒),以类似batch批量处理的方式来处理这小部分数据。Spark Streaming构建在Spark上,一方面是因为Spark的低延迟执行引擎(100ms+)可以用于实时计算,另一方面相比基于Record的其它处理框架(如Storm),RDD数据集更容易做高效的容错处理。此外小批量处理的方式使得它可以同时兼容批量和实时数据处理的逻辑和算法。方便了一些需要历史数据和实时数据联合分析的特定应用场合。 15.1 外部交互的数据流图 除了以上这些输入源,Spark Streaming还可以从普通TCP Socket 获取实时输入流数据。 输入数据流 (Input DStreams) 分类: 1. Basic sources: Sources directly available in the StreamingContext API. Example: file systems, socket connections, and Akka actors. 2. Advanced sources: Sources like Kafka, Flume, Kinesis, Twitter, etc. are available through extra utility classes. These require linking against extra dependencies as discussed in the linking section. 一个  Receiver 关联一个Input DStreams( file stream除外)。 一个  Receiver  对应一个数据源, 所以 Input DStreams对应一个流数据。 一个streaming 应用程序可以同时创建多个Input DStreams并行接收多个流数据。 15.2 SPARK内部数据处理图 1. 将流式计算分解 成一系列短小的批处理作业; 2. 将失败或者执行较慢的任务在其他节点上并行执行; 3. 较强的容错能力(基于RDD继承关系Lineage); 4. 使用和RDD一样的语义。 15.3 SPARKSTREAMING框架 计算流程:Spark Streaming是将流式计算分解成一系列短小的批处理作业。这里的批处理引擎是Spark,也就是把Spark Streaming的输入数据按照batch size(如1秒)分成一段一段的数据(Discretized Stream),每一段数据都转换成Spark中的RDD(Resilient Distributed Dataset),然后将Spark Streaming中对DStream的Transformation操作变为针对Spark中对RDD的Transformation操作,将RDD经过操作变成中间结果保存在内存中。整个流式计算根据业务的需求可以对中间的结果进行叠加,或者存储到外部设备。图2显示了Spark Streaming的整个流程。   图2 Spark Streaming构架图:Spark流系统的高级概述 Spark Streaming把输入数据流分成批次,并将它们存储在Spark内存中,然后通过产生用来处理每个批次数据的Spark作业的方式来执行一个流应用程序。 容错性:对于流式计算来说,容错性至关重要。首先我们要明确一下Spark中RDD的容错机制。每一个RDD都是一个不可变的分布式可重算的数据集,其记录着确定性的操作继承关系(lineage),所以只要输入数据是可容错的,那么任意一个RDD的分区(Partition)出错或不可用,都是可以利用原始输入数据通过转换操作而重新算出的。 图3 Spark Streaming中RDD的lineage关系图 对于Spark Streaming来说,其RDD的传承关系如图3所示,图中的每一个椭圆形表示一个RDD,椭圆形中的每个圆形代表一个RDD中的一个Partition,图中的每一列的多个RDD表示一个DStream(图中有三个DStream),而每一行最后一个RDD则表示每一个Batch Size所产生的中间结果RDD。我们可以看到图中的每一个RDD都是通过lineage相连接的,由于Spark Streaming输入数据可以来自于磁盘,例如HDFS(多份拷贝)或是来自于网络的数据流(Spark Streaming会将网络输入数据的每一个数据流拷贝两份到其他的机器)都能保证容错性。所以RDD中任意的Partition出错,都可以并行地在其他机器上将缺失的Partition计算出来。这个容错恢复方式比连续计算模型(如Storm)的效率更高。  实时性:对于实时性的讨论,会牵涉到流式处理框架的应用场景。Spark Streaming将流式计算分解成多个Spark Job,对于每一段数据的处理都会经过Spark DAG图分解,以及Spark的任务集的调度过程。对于目前版本的Spark Streaming而言,其最小的Batch Size的选取在0.5~2秒钟之间(Storm目前最小的延迟是100ms左右),所以Spark Streaming能够满足除对实时性要求非常高(如高频实时交易)之外的所有流式准实时计算场景。 扩展性与吞吐量:Spark目前在EC2上已能够线性扩展到100个节点(每个节点4Core),可以以数秒的延迟处理6GB/s的数据量(60M records/s),其吞吐量也比流行的Storm高2~5倍,图4是Berkeley利用WordCount和Grep两个用例所做的测试,在Grep这个测试中,Spark Streaming中的每个节点的吞吐量是670k records/s,而Storm是115k records/s。 图4 Spark Streaming与Storm吞吐量比较图 15.4 SPARK STREAMING的编程模型  Spark Streaming的编程和Spark的编程如出一辙,对于编程的理解也非常类似。对于Spark来说,编程就是对于RDD的操作;而对于Spark Streaming来说,就是对DStream的操作。下面将通过一个大家熟悉的WordCount的例子来说明Spark Streaming中的输入操作、转换操作和输出操作。  Spark Streaming初始化:在开始进行DStream操作之前,需要对Spark Streaming进行初始化生成StreamingContext。参数中比较重要的是第一个和第三个,第一个参数是指定Spark Streaming运行的集群地址,而第三个参数是指定Spark Streaming运行时的batch窗口大小。在这个例子中就是将1秒钟的输入数据进行一次Spark Job处理。 val ssc = new StreamingContext(“Spark://…”, “WordCount”, Seconds(1), [Homes], [Jars])   Spark Streaming的输入操作:目前Spark Streaming已支持了丰富的输入接口,大致分为两类:一类是磁盘输入,如以batch size作为时间间隔监控HDFS文件系统的某个目录,将目录中内容的变化作为Spark Streaming的输入;另一类就是网络流的方式,目前支持 Kafka、Flume、Twitter和TCP socket。在WordCount例子中,假定通过网络socket作为输入流,监听某个特定的端口,最后得出输入DStream(lines)。 val lines = ssc.socketTextStream(“localhost”,8888) Spark Streaming的转换操作:与Spark RDD的操作极为类似,Spark Streaming也就是通过转换操作将一个或多个DStream转换成新的DStream。常用的操作包括map、filter、flatmap和join,以及需要进行shuffle操作的groupByKey/reduceByKey等。在WordCount例子中,我们首先需要将DStream(lines)切分成单词,然后将相同单词的数量进行叠加, 最终得到的wordCounts就是每一个batch size的(单词,数量)中间结果。  val words = lines.flatMap(_.split(“ ”)) val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) 另外,Spark Streaming有特定的窗口操作,窗口操作涉及两个参数:一个是滑动窗口的宽度(Window Duration);另一个是窗口滑动的频率(Slide Duration),这两个参数必须是batch size的倍数。例如以过去5秒钟为一个输入窗口,每1秒统计一下WordCount,那么我们会将过去5秒钟的每一秒钟的WordCount都进行统计,然后进行叠加,得出这个窗口中的单词统计。  val wordCounts = words.map(x => (x, 1)).reduceByKeyAndWindow(_ + _, Seconds(5s),seconds(1)) 但上面这种方式还不够高效。如果我们以增量的方式来计算就更加高效,例如,计算t+4秒这个时刻过去5秒窗口的WordCount,那么我们可以将t+3时刻过去5秒的统计量加上[t+3,t+4]的统计量,在减去[t-2,t-1]的统计量(如图5所示),这种方法可以复用中间三秒的统计量,提高统计的效率。  val wordCounts = words.map(x => (x, 1)).reduceByKeyAndWindow(_ + _, _ - _, Seconds(5s),seconds(1)) 图5 Spark Streaming中滑动窗口的叠加处理和增量处理 Spark Streaming的输入操作:对于输出操作,Spark提供了将数据打印到屏幕及输入到文件中。在WordCount中我们将DStream wordCounts输入到HDFS文件中。 wordCounts = saveAsHadoopFiles(“WordCount”) Spark Streaming启动:经过上述的操作,Spark Streaming还没有进行工作,我们还需要调用Start操作,Spark Streaming才开始监听相应的端口,然后收取数据,并进行统计。 ssc.start() 15.5 SPARK STREAMING案例分析  在互联网应用中,网站流量统计作为一种常用的应用模式,需要在不同粒度上对不同数据进行统计,既有实时性的需求,又需要涉及到聚合、去重、连接等较为复杂的统计需求。传统上,若是使用Hadoop MapReduce框架,虽然可以容易地实现较为复杂的统计需求,但实时性却无法得到保证;反之若是采用Storm这样的流式框架,实时性虽可以得到保证,但需求的实现复杂度也大大提高了。Spark Streaming在两者之间找到了一个平衡点,能够以准实时的方式容易地实现较为复杂的统计需求。 下面介绍一下使用Kafka和Spark Streaming搭建实时流量统计框架。  数据暂存:Kafka作为分布式消息队列,既有非常优秀的吞吐量,又有较高的可靠性和扩展性,在这里采用Kafka作为日志传递中间件来接收日志,抓取客户端发送的流量日志,同时接受Spark Streaming的请求,将流量日志按序发送给Spark Streaming集群。 数据处理:将Spark Streaming集群与Kafka集群对接,Spark Streaming从Kafka集群中获取流量日志并进行处理。Spark Streaming会实时地从Kafka集群中获取数据并将其存储在内部的可用内存空间中。当每一个batch窗口到来时,便对这些数据进行处理。  结果存储:为了便于前端展示和页面请求,处理得到的结果将写入到数据库中。  相比于传统的处理框架,Kafka+Spark Streaming的架构有以下几个优点。  Spark框架的高效和低延迟保证了Spark Streaming操作的准实时性。 利用Spark框架提供的丰富API和高灵活性,可以精简地写出较为复杂的算法。  编程模型的高度一致使得上手Spark Streaming相当容易,同时也可以保证业务逻辑在实时处理和批处理上的复用。  在基于Kafka+Spark Streaming的流量统计应用运行过程中,有时会遇到内存不足、GC阻塞等各种问题。下面介绍一下如何对Spark Streaming应用程序进行调优来减少甚至避免这些问题的影响。  15.6 性能调优  15.6.1 优化运行时间 1. 增加并行度。确保使用整个集群的资源,而不是把任务集中在几个特定的节点上。对于包含shuffle的操作,增加其并行度以确保更为充分地使用集群资源。 2. 减少数据序列化、反序列化的负担。Spark Streaming默认将接收到的数据序列化后存储以减少内存的使用。但序列化和反序列化需要更多的CPU时间,因此更加高效的序列化方式(Kryo)和自定义的序列化接口可以更高效地使用CPU。  3. 设置合理的batch窗口。在Spark Streaming中,Job之间有可能存在着依赖关系,后面的Job必须确保前面的Job执行结束后才能提交。若前面的Job执行时间超出了设置的batch窗口,那么后面的Job就无法按时提交,这样就会进一步拖延接下来的Job,造成后续Job的阻塞。因此,设置一个合理的batch窗口确保Job能够在这个batch窗口中结束是必须的。  4. 减少任务提交和分发所带来的负担。通常情况下Akka框架能够高效地确保任务及时分发,但当batch窗口非常小(500ms)时,提交和分发任务的延迟就变得不可接受了。使用Standalone模式和Coarse-grained Mesos模式通常会比使用Fine-Grained Mesos模式有更小的延迟。  15.6.2 优化内存使用 1. 控制batch size。Spark Streaming会把batch窗口内接收到的所有数据存放在Spark内部的可用内存区域中,因此必须确保当前节点Spark的可用内存至少能够容纳这个batch窗口内所有的数据,否则必须增加新的资源以提高集群的处理能力。 2. 及时清理不再使用的数据。上面说到Spark Streaming会将接收到的数据全部存储于内部的可用内存区域中,因此对于处理过的不再需要的数据应及时清理以确保Spark Streaming有富余的可用内存空间。通过设置合理的spark.cleaner.ttl时长来及时清理超时的无用数据。  3. 观察及适当调整GC策略。GC会影响Job的正常运行,延长Job的执行时间,引起一系列不可预料的问题。观察GC的运行情况,采取不同的GC策略以进一步减小内存回收对Job运行的影响。  15.7 核心术语 1. discretized stream or DStream 高级抽象概念,表示持续流数据。 在内部DStream 表现为一个 RDDs序列。 15.8 源码解析 15.8.1 数据源源码 1. 普通TCP套接字对应的数据源 方法:ssc.socketTextStream(…) 记录数据:每一条记录对应一个文本行( a line of text)。 16 SPARK SQL 16.1 简介 16.2 SPARK SQL 框架 16.3 SPARK SQL组件 16.4 运行框架 1. 引入了新的RDD类型 SchemaRDD,可以像传统数据库定义表一样来定义SchemaRDD, SchemaRDD由定义了列数据类型的行对象构成。 2. SchemaRDD可以从RDD转换过来,也可以从Parquet文件读入,也可以使用HiveQL 从Hive中获取。 3. 在应用程序中可以混合使用不同来源的数据,如可以将来自HiveQL的数据和来自SQL的数据进行join操作。 4. 内嵌catalyst优化器对用户查询语句进行自动优化。 16.5 SPARK SQL支持版本 16.6 与SHARK的关系 16.7 SPARK SQL应用 Spark支持Scala、Python等语言写的脚本直接在Spark环境执行,更重要的是支持对Hive语句进行包装后在Spark上运行。这就是Spark SQL。 16.7.1 预置条件 16.7.1.1 配置  配置的步骤比较简单,把Hive的配置文件hive-site.xml直接放置到$SPARK_HOME的conf路径下即可。如果是想在Spark集群本地执行SQL的话,每个对应的节点都要做同样的配置。 16.7.1.2 运行环境 Hive相关依赖: 1. 如果依赖hive,在编译spark时,需添加hive的profile,如-Phive; 2. 如果依赖thriftserver,在编译spark时,需添加thriftserver的profile,如-Phive-thriftserver。 如果编译时没有加入这两个Profile,在运行时会报相关类的java.lang.ClassNotFoundException异常。 如: java.lang.ClassNotFoundException: org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver           at java.net.URLClassLoader$1.run(URLClassLoader.java:366)           at java.net.URLClassLoader$1.run(URLClassLoader.java:355)  …… JDBC驱动jar包的依赖: 在运行环境中,要在classpath下添加数据库访问所需的驱动jar包。 如为驱动程序Driver添加mysql(Mysql为hive元数据默认库)的驱动包时,可以使用--driver-class-path选项,添加jar包所在路径,如: --driver-class-path /…/lib/mysql-connector-java-X.X.X-bin.jar 16.7.2 SHELL下运行SPARK SQL 启动脚本:bin目录下的spark-shell 根据命令行参数可以在本机(local)模式下运行,也可以指定在集群上运行。 16.7.2.1 在本机运行 执行启动命令: ./bin/spark-shell    启动shell后,依次执行如下语句: val sc: SparkContext    val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)    import hiveContext._    hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")    hql("LOAD DATA LOCAL INPATH '/examples /data.txt' INTO TABLE src")    hql("FROM src SELECT key, value").collect().foreach(println) 上面的命令,分别是声明SparkContext对象,利用hql方法执行Hive的SQL语句,在执行SQL语句的过程中,可以通过Hive的Cli客户端进行查看相应操作的结果。 16.7.2.2 在集群运行 将应用程序打包成jar文件,使用提交脚本spark-submit.sh 提交。 启动命令:./bin/spark-shell --master …. 以on yarn模式为例:     由于spark-shell脚本是在本地执行的,如果想放到Yarn上去执行的话,可以使用上面第4节中的spark-submit工具,这时候需要对需要输入的sql语句进行包装,将包装类打包成jar文件,再提交。     包装类的代码如下: package spark;    import java.util.List;    import org.apache.spark.SparkConf;  import org.apache.spark.api.java.JavaSparkContext;  import org.apache.spark.sql.api.java.Row;  import org.apache.spark.sql.hive.api.java.JavaHiveContext;    /**   * Description:   * Author: ITScott@163.com   * Date: 2014/7/15   */  public class SparkSQL {        public static void main(String[] args) {          if(args.length != 2){              System.out.println("usage:  ");              System.exit(1);          }            String applicationName = args[0];          String sql = args[1];            SparkConf conf = new SparkConf().setAppName(applicationName);          JavaSparkContext sc = new JavaSparkContext(conf);          JavaHiveContext hiveContext = new JavaHiveContext(sc);          List results = hiveContext.hql(sql).collect();            System.out.println("Sql is:" + sql + ", has been executed over.");          System.out.println("The result size is " + results.size() + ", they are:");          for(int i=0; i Variable subsitution to apply to hive commands. e.g. -d A=B or --define A=B --database Specify the database to use -e SQL from command line -f SQL from files -h connecting to Hive Server on remote host --hiveconf Use value for given property --hivevar Variable subsitution to apply to hive commands. e.g. --hivevar A=B -i Initialization SQL file -p connecting to Hive Server on port number -S,--silent Silent mode in interactive shell -v,--verbose Verbose mode (echo executed SQL to the console) 其中[options] 是CLI启动一个SparkSQL应用程序的参数, 不同模式下执行脚本: Local模式: ./bin/spark-sql : 只能通过http://master:4040进行监控  Master 模式: ./bin/spark-sql --master spark://master :7077 --executor-memory 1g 16.7.3.1 示例 与hive命令行进行比对 1. spark-sql> show tables;   17 MLLIB(MACHINE LEARNING LIBRARY) 和MLBASE 17.1 MLLIB和MLBASE的框架 1. ML Optimizer 优化器会选择最合适的、已经实现好了的机器学习算法和相关参数; 2. MLI 是一个进行特征抽取和高级ML编程抽象的算法实现的API或平台; 3. MLlib 是基于Spark的底层分布式机器学习库,可以不断的扩充算法; 4. MLRuntime 基于Spark计算框架,将Spark的分布式计算应用到机器学习领域。 17.2 典型的数据分析流 18 GRAPHX(GRAPH PROCESSING) 18.1 GRAPHX架构 1. GraphX定义了一个新的概念:弹性分布式属性图, 一个每个顶点和边都带有属性的定向多重图; 2. 引入了三种核心RDD:Vertices、Edges、Triplets; 3. 开放了一组基本操作(如subgraph,joinVertices,and mapReduceTriplets); 4. 不断地扩展图形算法和图形构建工具来简化分析工作。 19 BAGEL (PREGEL ON SPARK) Bagel: Pregel on Spark,可以用Spark进行图计算,这是个非常有用的小项目。Bagel自带了一个例子,实现了Google的PageRank算法。 20 SPARKR 在Sparkrelease计划中,在Spark 1.3中有将SparkR纳入到发行版的可能。 SparkR的出现解决了R语言中无法级联扩展的难题,同时也极大的丰富了Spark在机器学习方面能够使用的Lib库。使用SparkR能让用户同时使用Spark RDD提供的丰富Api,也可以调用R语言中丰富的Lib库。 SparkR是AMPLab发布的一个R开发包,为Apache Spark提供了轻量的前端。SparkR提供了Spark中弹性分布式数据集(RDD)的API,用户可以在集群上通过R shell交互性的运行job。 开源地址:https://github.com/amplab-extras/SparkR-pkg 简介:https://github.com/amplab-extras/SparkR-pkg#readme 20.1 数据流 1. 提供了Spark中弹性分布式数据集(RDD)的API,用户可以在集群上通过R shell交互性的运行Spark job。 2. 支持序列化闭包功能,可以将用户定义函数中所用到的变量自动序列化发送到集群中其他机器上。 3. SparkR还可以很容易地调用R开发包,只需要在集群上执行操作前用includePackage读取R开发包就可以了,当然集群上要安装R开发包。 20.2 R ON SPARK SparkR是一个 R package,向从R转到Spark者提供了轻量级的前端。 20.2.1 安装 20.2.1.1 安装要求 1. 版本要求 Scala 2.10 + Spark version >= 0.9.0. 当前默认使用apache仓库上的1.1.0版本进行构建。 可以通过修改 pkg/src/build.sbt 文件来指定对应的spark版本号。 2. rJava SparkR也要求安装名为rJava 的 R package, 在R中,可以通过以下命令来安装: install.packages("rJava") 20.2.1.2 PACKAGE安装 可以通过以下命令构建scala package 和 R package,用于SparkR开发: ./install-dev.sh To develop SparkR, you can build the scala package and the R package using ./install-dev.sh If you wish to try out the package directly from github, you can use install_github from devtools. Note that you can specify which branch, tag etc to install from. library(devtools) install_github("amplab-extras/SparkR-pkg", subdir="pkg") SparkR by default links to Hadoop 1.0.4. To use SparkR with other Hadoop versions, you will need to rebuild SparkR with the same version that Spark is linked to. For example to use SparkR with a CDH 4.2.0 MR1 cluster, you can run SPARK_HADOOP_VERSION=2.0.0-mr1-cdh4.2.0 ./install-dev.sh By default, SparkR uses sbt to build an assembly jar. If you wish to use maven instead, you can set the environment variable USE_MAVEN=1. For example USE_MAVEN=1 ./install-dev.sh If you are building SparkR from behind a proxy, you can setup maven to use the right proxy server. 20.3 RUNNING SPARKR If you have cloned and built SparkR, you can start using it by launching the SparkR shell with ./sparkR The sparkR script automatically creates a SparkContext with Spark by default in local mode. To specify the Spark master of a cluster for the automatically created SparkContext, you can run MASTER= ./sparkR If you have installed it directly from github, you can include the SparkR package and then initialize a SparkContext. For example to run with a local Spark master you can launch R and then run library(SparkR) sc <- sparkR.init(master="local") To increase the memory used by the driver you can export the SPARK_MEM environment variable. For example to use 1g, you can run SPARK_MEM=1g ./sparkR In a cluster setting to set the amount of memory used by the executors you can pass the variablespark.executor.memory to the SparkContext constructor. library(SparkR) sc <- sparkR.init(master="spark://:7077", sparkEnvir=list(spark.executor.memory="1g")) 20.4 EXAMPLES, UNIT TESTS SparkR comes with several sample programs in the examples directory. To run one of them, use./sparkR . For example: ./sparkR examples/pi.R local[2] You can also run the unit-tests for SparkR by running ./run-tests.sh 20.5 RUNNING ON EC2 Instructions for running SparkR on EC2 can be found in the SparkR wiki. 20.6 RUNNING ON YARN Currently, SparkR supports running on YARN with the yarn-client mode. These steps show how to build SparkR with YARN support and run SparkR programs on a YARN cluster: # assumes Java, R, rJava, yarn, spark etc. are installed on the whole cluster. cd SparkR-pkg/ USE_YARN=1 SPARK_YARN_VERSION=2.4.0 SPARK_HADOOP_VERSION=2.4.0 ./install-dev.sh Before launching an application, make sure each worker node has a local copy of lib/SparkR/sparkr-assembly-0.1.jar. With a cluster launched with the spark-ec2 script, do: ~/spark-ec2/copy-dir ~/SparkR-pkg Finally, when launching an application, the environment variable YARN_CONF_DIR needs to be set to the directory which contains the client-side configuration files for the Hadoop cluster (with a cluster launched with spark-ec2, this defaults to /root/ephemeral-hdfs/conf/): YARN_CONF_DIR=/root/ephemeral-hdfs/conf/ MASTER=yarn-client ./sparkR YARN_CONF_DIR=/root/ephemeral-hdfs/conf/ ./sparkR examples/pi.R yarn-client 20.7 REPORT ISSUES/FEEDBACK For better tracking and collaboration, issues and TODO items are reported to a dedicated SparkR JIRA. In your pull request, please cross reference the ticket item created. Likewise, if you already have a pull request ready, please reference it in your ticket item. 安装Java、R后,在R shell中安装rJava、devtools、SparkR 安装SparkR library(devtools) install_github("amplab-extras/SparkR-pkg", subdir="pkg") install.packages("rJava")、install.packages("devtools")、library(SparkR) 启动SparkR Shell :./sparkR 运行WordCount样例: sc <- sparkR.init(master="local", "RwordCount") lines <- textFile(sc, "README.md") words <- flatMap(lines, function(line) { strsplit(line, " ")[[1]] }) wordCount <- lapply(words, function(word) { list(word, 1L) }) counts <- reduceByKey(wordCount, "+", 2L) output <- collect(counts) for (wordcount in output) { cat(wordcount[[1]], ": ", wordcount[[2]], "\n") } 21 SPARK开发与应用 21.1 SPARK开发 21.2 SPARK应用 21.2.1 命令行提交应用程序 1. 导出jar包 选择“File”–> “Project Structure” –> “Artifact”,选择“+”–> “Jar” –> “From Modules with dependencies”,选择main函数,并在弹出框中选择输出jar位置,并选择“OK”。 最后依次选择“Build”–> “Build Artifact”编译生成jar包。 2. 命令行提交 22 SPARK 源码、编译 http://spark.apache.org/docs/latest/building-spark.html http://mmicky.blog.163.com/blog/static/1502901542014312101657612/ Spark的编译方法,本质上只有两种:Maven和SBT,但通过不同场景的应用,可以演化出多种编译方式。 Spark 的源码编译方法有以下几种: 1. Sbt 2. Maven 3. make-distribution.sh 4. IDE中的源码环境搭建与编译 编译结果示例及比较: 1. maven编译的jar包为:./assembly/target/scala-2.10/spark-assembly_2.10-1.0.0-SNAPSHOT-hadoop2.2.0.jar 2. SBT编译的jar包为: ./assembly/target/scala-2.10/spark-assembly-1.0.0-SNAPSHOT-hadoop2.2.0.jar 22.1 INTELLIJ IDEA中编译调试SPARK的程序 调试applications时,修改debug的前执行编译的命令为sbt的compile。 22.2 SBT编译 22.2.1 编译命令格式 SPARK_HADOOP_VERSION=2.2.0 SPARK_YARN=true sbt/sbt assembly 或 sbt/sbt -Pyarn -Phadoop-2.3 assembly 22.2.2 示例 sbt/sbt -Pyarn -Phadoop-2.4 -Dhadoop.version=2.6.0 -Phive -Phive-thriftserver assembly sbt/sbt -Pyarn -Phadoop-2.6 -Phive -Phive-thriftserver test 注:执行过程中出现三次错误,其中两处为粗心导致。 1. 1.2.0版本中,pom.xml 中没有定义hadoop-2.6的Profile,使用该Profile时会报错; 2. -Dhadoop.version处出现两次错误:version拼写错误 + 拷贝时在-D 和Hadoop中间多了个空行…………… 细节决定成败的说……….. 22.2.3 进一步说明 sbt编译过程比较慢。如果运行sbt时报sbt-launch.jar的错误,那么需要手动的安装,参考这里。 有时候由于网络问题可能导致编译hang在某个地方,或者你重启了,那么下次开始编译前需要删除$HOME/.ivy2/ .sbt.ivy.lock 然后重启开始。sbt会进行增量编译。 22.3 MAVEN 编译 仅部分,全部信息参考官网 此时,需要先设置MAVEN_OPTS,避免编译过程中出现OOM的错误,对应Java 8时,则不需要设置。 export MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m" For Apache Hadoop versions 1.x, Cloudera CDH “mr1” distributions, and other Hadoop versions without YARN, use: # Apache Hadoop 1.2.1 mvn -Dhadoop.version=1.2.1 -DskipTests clean package # Cloudera CDH 4.2.0 with MapReduce v1 mvn -Dhadoop.version=2.0.0-mr1-cdh4.2.0 -DskipTests clean package # Apache Hadoop 0.23.x mvn -Phadoop-0.23 -Dhadoop.version=0.23.7 -DskipTests clean package For Apache Hadoop 2.x, 0.23.x, Cloudera CDH, and other Hadoop versions with YARN, you can enable the “yarn-alpha” or “yarn” profile and optionally set the “yarn.version” property if it is different from “hadoop.version”.  # Apache Hadoop 2.0.5-alpha mvn -Pyarn-alpha -Dhadoop.version=2.0.5-alpha -DskipTests clean package # Cloudera CDH 4.2.0 mvn -Pyarn-alpha -Dhadoop.version=2.0.0-cdh4.2.0 -DskipTests clean package # Apache Hadoop 0.23.x mvn -Pyarn-alpha -Phadoop-0.23 -Dhadoop.version=0.23.7 -DskipTests clean package # Apache Hadoop 2.2.X mvn -Pyarn -Phadoop-2.2 -Dhadoop.version=2.2.0 -DskipTests clean package # Apache Hadoop 2.3.X mvn -Pyarn -Phadoop-2.3 -Dhadoop.version=2.3.0 -DskipTests clean package # Apache Hadoop 2.4.X or 2.5.X mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=VERSION -DskipTests clean package Versions of Hadoop after 2.5.X may or may not work with the -Phadoop-2.4 profile (they were released after this version of Spark). # Different versions of HDFS and YARN. mvn -Pyarn-alpha -Phadoop-2.3 -Dhadoop.version=2.3.0 -Dyarn.version=0.23.7 -DskipTests clean package Building With Hive and JDBC Support # Apache Hadoop 2.4.X with Hive 13 support mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive -Phive-thriftserver -DskipTests clean package # Apache Hadoop 2.4.X with Hive 12 support mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive -Phive-0.12.0 -Phive-thriftserver -DskipTests clean package 不设置MAVEN_OPTS的话报错类似于: [INFO] Compiling 203 Scala sources and 9 Java sources to /Users/me/Development/spark/core/target/scala-2.10/classes... [ERROR] PermGen space -> [Help 1] [INFO] Compiling 203 Scala sources and 9 Java sources to /Users/me/Development/spark/core/target/scala-2.10/classes... [ERROR] Java heap space -> [Help 1] 22.4 SPARK 部署包生成命令MAKE-DISTRIBUTION.SH 在通过maven或sbt编译完源代码后,结果目录即包含了编译结果,也包含了源代码。 此时,可以用这个结果构建用户的开发与调试环境。但对应于部署环境,虽然可以直接用编译后的目录再加以配置就可以运行spark,但由于同时包含了源代码,不方便用于集群的部署。 spark源码根目录下已经提供了一个脚本文件make-distribution.sh用于生成部署包。 使用spark提供的部署脚本进行编译,具体参数如下: --hadoop VERSION : Hadoop 版本号,不加此参数时hadoop 版本为1.0.4 。 --with-yarn :是否支持Hadoop YARN ,不加参数时为不支持yarn 。 --with-hive :是否在Spark SQL 中支持hive ,不加此参数时为不支持hive 。 --skip-java-test :是否在编译的过程中略过java 测试,不加此参数时为略过。 --with-tachyon :是否支持内存文件系统Tachyon ,不加此参数时不支持tachyon 。spark1.0.0-SNAPSHOT之后 --tgz :在根目录下生成 spark-$VERSION-bin.tgz ,不加此参数时不生成tgz 文件,只生 成/dist 目录。 --name NAME :和— tgz 结合可以生成spark-$VERSION-bin-$NAME.tgz 的部署包,不加此 参数时NAME 为hadoop 的版本号。 部署包生成示例: – 生成支持yarn 、hadoop2.2.0 的部署包: ./make-distribution.sh --hadoop 2.2.0 --with-yarn --tgz – 生成支持yarn 、hive 的部署包: ./make-distribution.sh --hadoop 2.2.0 --with-yarn --with-hive –tgz – 生成支持yarn 、hive、Tachyon 的部署包: ./make-distribution.sh --hadoop 2.2.0 --with-yarn --with-tachyon --tgz 示例: ./make-distribution.sh --tgz -Pyarn -Phadoop -Dhadoop.version=2.5.0-cdh5.2.0 -Phive -Dhive.version=0.13.11 ./make-distribution.sh --skip-java-test --tgz --mvn mvn --with-tachyon -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.1 -Phive -Dhive.version=0.13.11 -Phive-thriftserver -DskipTests 报错: 注意: 1. --with-tachyon放前面时,编译报错 —— 具体记录已丢失,需重新验证…… —— --tachyon的参数,不能放在前面,否则执行时不识别; 2. 在配置好可用镜像之后,如果在download的过程中失败,可以多试几次 —— 网络因素导致; 值得注意的是:make-distribution.sh已经带有SBT编译过程,所以不需要先编译再打包。 在编译时的提示: 22.5 SPARK 1.3 编译脚本的变更 ./make-distribution.sh --skip-java-test --tgz --mvn mvn --with-tachyon -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.1 -Phive -Dhive.version=0.13.1 -Phive-thriftserver -DskipTests 后面mvn 这句,是build/mvn 文件中最后实际执行的代码。 22.5.1 官网下载的部署包 spark-1.3.0-bin-hadoop2.4 $ ll lib 总用量 273392 -rw-rw-r-- 1 harli harli 339666 3月 26 09:51 datanucleus-api-jdo-3.2.6.jar -rw-rw-r-- 1 harli harli 1890075 3月 26 09:51 datanucleus-core-3.2.10.jar -rw-rw-r-- 1 harli harli 1809447 3月 26 09:51 datanucleus-rdbms-3.2.9.jar -rw-rw-r-- 1 harli harli 4136744 3月 26 09:51 spark-1.3.0-yarn-shuffle.jar -rw-rw-r-- 1 harli harli 159319006 3月 26 09:51 spark-assembly-1.3.0-hadoop2.4.0.jar -rw-rw-r-- 1 harli harli 112446389 3月 26 09:51 spark-examples-1.3.0-hadoop2.4.0.jar 22.5.2 编译案例1 1. 编译代码: ./make-distribution.sh --skip-java-test --tgz --mvn mvn --with-tachyon -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.1 -Phive -Dhive.version=0.13.1 -Phive-thriftserver 2. 编译结果: [INFO] ------------------------------------------------------------------------ [INFO] Reactor Summary: [INFO] [INFO] Spark Project Parent POM .......................... SUCCESS [7.955s] [INFO] Spark Project Networking .......................... SUCCESS [16.980s] [INFO] Spark Project Shuffle Streaming Service ........... SUCCESS [10.652s] [INFO] Spark Project Core ................................ SUCCESS [4:40.897s] [INFO] Spark Project Bagel ............................... SUCCESS [10.388s] [INFO] Spark Project GraphX .............................. SUCCESS [33.478s] [INFO] Spark Project Streaming ........................... SUCCESS [59.923s] [INFO] Spark Project Catalyst ............................ SUCCESS [1:08.138s] [INFO] Spark Project SQL ................................. SUCCESS [1:29.073s] [INFO] Spark Project ML Library .......................... SUCCESS [1:51.935s] [INFO] Spark Project Tools ............................... SUCCESS [5.193s] [INFO] Spark Project Hive ................................ SUCCESS [1:18.146s] [INFO] Spark Project REPL ................................ SUCCESS [19.889s] [INFO] Spark Project YARN ................................ SUCCESS [18.258s] [INFO] Spark Project Hive Thrift Server .................. SUCCESS [11.169s] [INFO] Spark Project Assembly ............................ SUCCESS [3:33.830s] [INFO] Spark Project External Twitter .................... SUCCESS [11.642s] [INFO] Spark Project External Flume Sink ................. SUCCESS [9.788s] [INFO] Spark Project External Flume ...................... SUCCESS [16.074s] [INFO] Spark Project External MQTT ....................... SUCCESS [13.532s] [INFO] Spark Project External ZeroMQ ..................... SUCCESS [13.115s] [INFO] Spark Project External Kafka ...................... SUCCESS [21.436s] [INFO] Spark Project Examples ............................ SUCCESS [4:46.596s] [INFO] Spark Project YARN Shuffle Service ................ SUCCESS [17.095s] [INFO] Spark Project External Kafka Assembly ............. SUCCESS [57.397s] [INFO] ------------------------------------------------------------------------ [INFO] BUILD SUCCESS [INFO] ------------------------------------------------------------------------ [INFO] Total time: 24:43.966s [INFO] Finished at: Wed Apr 01 10:26:11 CST 2015 [INFO] Final Memory: 78M/822M [INFO] ------------------------------------------------------------------------ Fetching tachyon tgz tar (child): tachyon-0.5.0-bin.tar.gz: Cannot open: No such file or directory tar (child): Error is not recoverable: exiting now tar: Child returned status 2 tar: Error is not recoverable: exiting now 木有放入Tachyon的编译包。 3. 编译结果lib包: # ll lib total 273396 -rw-r--r--. 1 root root 339666 Apr 1 10:26 datanucleus-api-jdo-3.2.6.jar -rw-r--r--. 1 root root 1890075 Apr 1 10:26 datanucleus-core-3.2.10.jar -rw-r--r--. 1 root root 1809447 Apr 1 10:26 datanucleus-rdbms-3.2.9.jar -rw-r--r--. 1 root root 4136762 Apr 1 10:26 spark-1.3.0-yarn-shuffle.jar -rw-r--r--. 1 root root 159325312 Apr 1 10:26 spark-assembly-1.3.0-hadoop2.4.1.jar -rw-r--r--. 1 root root 112448656 Apr 1 10:26 spark-examples-1.3.0-hadoop2.4.1.jar 22.5.3 编译案例2 1. 编译代码: ./make-distribution.sh --skip-java-test --tgz --mvn mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.6.0 -Phive -Dhive.version=0.13.1 -Phive-thriftserver 22.5.4 编译案例3 1. 编译代码: ./make-distribution.sh --skip-java-test --tgz --mvn mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.6.0 -Phive -Dhive.version=1.1.0 -Phive-thriftserver 说明:当前pom.xml 的profile-Hadoop只支持hadoop-2.4, 更新的版本需要自己增加profile,或直接使用hadoop-2.4这个,同时使用正确的2.6.0 版本 修改了Hadoop版本和--with-tachyon。 2. 编译结果: 22.6 MVN测试 mvn -Dsuites=org.apache.spark…XXXSuite test 22.7 WIN下源码编译 环境要求:首先,你得有一个cygwin…. 其他参考linux下编译环境要求。 22.7.1 SBT编译 根据官网运行编译脚本spark-master/sbt/sbt,报错如下: \r: win下回车换行与Linux不同,出现此类错误,可以判断是脚本不识别win的dos格式,因此,对文件进行格式转换。 格式转换:修改源码下sbt目录下两个脚本的格式,转换后保存。 spark-master\sbt : sbt、sbt-launch-lib.bash —— 修改前please保留下原件! 通过UltraEdit工具进行修改: 1. 重复执行第一步的操作,此时报错如下: 可以推断: (1) Build的属性文件查找失败 —— (2) Line54代码受影响 —— 查看报错信息,在使用属性文件时,采用了相对路径,而当前在/目录下执行sbt/sbt命令,因此,文件查找失败。需进入Spark目录下,执行命令。 Line54对应的代码,设置了sbt-launch-.jar的路径。 先用简单方法,直接修改Line54被修改的代码,由提示可以被影响的是sbt-launch-.jar文件找不到,因此,对应代码修改如下: (Line54:acquire_sbt_jar),影响的代码在Line166(run处),修改如下: 注释Line54:acquire_sbt_jar的使用,直接用本机的sbt-launch-.jar路径设置sbt_jar。 2. 继续执行第一步的操作,可能会报错如下: 对应内存空间不够,有两种方式:1)修改sbt的配置文件,增加jvm的内存空间;2)关闭当前费内存的应用(本机为IDEA),关闭后重启即可。 3. 继续执行第一步的操作,出现提示: … 4. 到这一步时,推测已经正常,继续进入spark源码目录,然后在源码目录下运行: …… 这一步比较耗时,坐等………….. 23 SPARK源码环境搭建及编译 以IDEA为例,其他IDE原理上应该是类似的。 23.1 源码环境搭建 23.1.1 SCALA 版本:scala 2.10.x 下载:http://www.scala-lang.org/files/archive/scala-2.10.4.msi 验证:安装完成后,在命令行窗口(cmd)输入scala: 23.1.2 GIT 安装: 1. 安装支持Windows的Git:此处使用Github页面上的windows的安装版本(GitHubSetup.exe)。 2. 设置环境变量:将git.exe所在的bin目录添加到Path环境变量,如:安装目录\GitHub\PortableGit_....\bin; 说明:我在安装时,path配置没有自动设置,需手动设置,根据GitHub所在路径,搜索git.exe文件,可以找到对应的bin路径,将该bin路径设置到path中即可。 验证: 1. 运行GitHub、Git Shell木有问题! 2. 命令行窗口(cmd)运行:git –version,显示版本信息,安装、配置路径成功! 23.1.3 INTELLIJ IDEA 要求:需要安装jdk,git。 Spark相关:scala 2.10.x (注意版本)。 安装: 1. 下载最新免费的商业版本 IDEA -14.0.1 : http://www.jetbrains.com/idea/download/; 2. 安装后,启动IDEA,安装scala插件:依次选择“Configure”–> “Plugins”–> “Browse repositories”,输入scala,然后安装即可; 初始界面,在右下角Configure里面。 补充:也可以在相应界面上直接下载scala插件,放到IDEA对应插件目录下, 然后重启IDEA。 23.2 源码下载 从github获得spark源码: git clone https://github.com/apache/spark.git 23.3 INTELLIJ IDEA - SBT方式 23.4 搭建SPARK源码阅读环境(需要联网) 官方网站建议用sbt-idea插件建立IntelliJ IDEA的工程。 1. 第一种方法:依次选择“import project”–> 选择spark所在目录 –> “SBT”,之后intellij会自动识别SBT文件,并下载依赖的外部jar包,整个流程用时非常长,取决于机器的网络环境(不建议在windows下操作,可能遇到各种问题),一般需花费几十分钟到几个小时。 注意,下载过程会用到git,因此应该事先安装了git。 另外,导入后,项目文件不会自动生成(可以确认下),因此对应的依赖信息等缺失,会导致编译时失败。 2. 第二种方法:首先在linux操作系统上生成intellij项目文件,然后在intellij IDEA中直接通过“Open Project”打开项目。 生成intellij项目文件的方法: 1. 在spark源代码根目录下,输入sbt/sbt gen-idea(需要安装git,不需要安装scala,sbt会自动下载); 2. IDEA中生成,在插件窗口中添加名字为sbt开头的两个插件,之后会在菜单栏中新增SBT Windows,点击其gen-ieda菜单项即可。 注:如果你在windows下阅读源代码,建议先在linux下生成项目文件,然后导入到windows中的intellij IDEA中。 23.5 搭建SPARK开发环境 23.5.1 应用程序的创建 1. 创建工程:在intellij IDEA中创建scala project; 2. 导入编译的spark依赖包:IDEA中,选择 “File”–> “project structure” –> “Libraries”,选择“+”,添加该spark-hadoop的jar包,如 注:spark应用程序只需要导入spark-hadoop对应的jar依赖包(如spark-1.1.0-hadoop-2.2.0.bin.tgz),不需要其他库。 另外,Scala插件安装之后,IDE会自动识别scala库,如果不识别,再手动导入。 23.5.2 INTELLIJ IDEA调试、运行应用程序 编写完应用程序之后,可以在IDEA上直接调试、运行。 应用程序运行的入口脚本spark-submit.sh,对应SparkSubmit类,可以知道各种模式下应用程序执行方式。 23.6 MAVEN方式 1. 获取源码; 2. 编辑pom.xml 文件,将关于yarn的profile中增加一项yarn.version; yarn 2 2.6.0 2.6.0 2.5.0 3. 启动IDEA, 菜单File -> import project -> 编译目录中的pom.xml -> 在选择profile时选择yarn -> 直到导入项目; 4. 在maven projects视图选择Spark Project Parent POM(root),然后选中工具栏倒数第四个按钮(ship Tests mode)按下,这时Liftcycle中test是灰色的。 5. 接着按倒数第一个按钮进入Maven设置,在runner项设置VM option: -Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m 按OK 保存。 回到maven projects视图,点中Liftcycle中package,然后按第5个按钮(Run Maven Build按钮),开始编译。其编译结果和Maven编译是一样的。 24 部署与应用 Spark部署包括: 1. Spark 集群部署; 2. Spark 应用程序部署。 24.1 集群启动 24.1.1 手动方式 Master启动: SPARK_HOME/sbin/start-master.sh Worker启动: SPARK_HOME /bin/spark-class org.apache.spark.deploy.worker.Worker spark://master:7077 24.2 SPARK应用 Spark两个应用工具: 1. spark-shell:交互式工具; 2. spark-submit:应用程序部署工具。 应用程序提交架构: 1. Spark 应用程序由两部分组成: 一个driver 和多个executor; 2. Spark 应用程序可以在多种集群里运行: Mesos 、YARN 、Spark Standalone 、AWS…; 3. Spark 应用程序的部署工具是spark-submit; 24.2.1 SPARK-SUBMIT方式 工具spark-submit: C:\Users\lenovo>E:\bd\spark-1.1.0\bin\spark-shell.cmd -h Usage: spark-submit [options] [app options] Options: --master MASTER_URL spark://host:port, mesos://host:port, yarn, or local. --deploy-mode DEPLOY_MODE Whether to launch the driver program locally ("client") or on one of the worker machines inside the cluster ("cluster" (Default: client). --class CLASS_NAME Your application's main class (for Java / Scala apps). --name NAME A name of your application. --jars JARS Comma-separated list of local jars to include on the driver and executor classpaths. --py-files PY_FILES Comma-separated list of .zip, .egg, or .py files to place on the PYTHONPATH for Python apps. --files FILES Comma-separated list of files to be placed in the working directory of each executor. --conf PROP=VALUE Arbitrary Spark configuration property. --properties-file FILE Path to a file from which to load extra properties. If not specified, this will look for conf/spark-defaults.conf. --driver-memory MEM Memory for driver (e.g. 1000M, 2G) (Default: 512M). --driver-java-options Extra Java options to pass to the driver. --driver-library-path Extra library path entries to pass to the driver. --driver-class-path Extra class path entries to pass to the driver. Note that jars added with --jars are automatically included in the classpath. --executor-memory MEM Memory per executor (e.g. 1000M, 2G) (Default: 1G). --help, -h Show this help message and exit --verbose, -v Print additional debug output Spark standalone with cluster deploy mode only: --driver-cores NUM Cores for driver (Default: 1). --supervise If given, restarts the driver on failure. Spark standalone and Mesos only: --total-executor-cores NUM Total cores for all executors. YARN-only: --executor-cores NUM Number of cores per executor (Default: 1). --queue QUEUE_NAME The YARN queue to submit to (Default: "default"). --num-executors NUM Number of executors to launch (Default: 2). --archives ARCHIVES Comma separated list of archives to be extracted into the working directory of each executor. 24.2.2 提交时DRIVER信息解析 24.2.2.1 STANDALONE Client模式 - 命令行相关选项: Cluster模式 - 命令行相关选项: --driver-cores NUM Cores for driver (Default: 1). --supervise If given, restarts the driver on failure. 24.2.2.2 YARN 24.2.2.3 MESOS 24.2.3 提交时EXECUTOR信息解析 24.2.3.1 LOCAL 命令行相关选项: 1. --executor-memory MEM :每个 Executor的内存设置; 2. --master MASTER_URL : 对应为Local[*],其中*表示Executor的个数 —— 待确认;并且对应的deploy-mode为client。 说明:由指定的master选项,得到Executor的个数。 24.2.3.2 STANDALONE 命令行相关选项: 1. --executor-memory MEM :每个 Executor的内存设置; 2. --total-executor-cores NUM:Executor总的内核数; 说明:根据Master调度算法,得到Executor的个数。 24.2.4 SPARK-SHELL方式 提交参数同SparkSubmit脚本方式。 24.3 AMAZON EC2 24.4 SPARK STANDALONE Spark + Hadoop-HDFS 24.5 单机模式-LOCAL 24.5.1 SPARK STANDALONE 伪分布式模式部署 1. Java 的安装 2. ssh 无密码登录 3. Spark 安装包部署 4. 配置 5. 测试 24.5.2 SPARK STANDALONE集群部署 集群中有两种节点,一种是Master,另一种是Worker节点。 24.5.2.1 安装环境 1. Java 的安装 2. ssh 无密码登录 3. Spark 安装包解压 4. Spark 配置文件配置 1) 文件conf/slave Ø hadoop1 Ø hadoop2 Ø hadoop3 2) 文件conf/spark-env.sh export SPARK_MASTER_IP=hadoop1 export SPARK_MASTER_PORT=7077 export SPARK_WORKER_CORES=1 export SPARK_WORKER_INSTANCES=1 export SPARK_WORKER_MEMORY=3g 24.5.2.2 配置-环境变量 文件:spark-env.sh # Options read when launching programs locally with ./bin/run-example or ./bin/spark-submit # - HADOOP_CONF_DIR, to point Spark towards Hadoop configuration files # - SPARK_LOCAL_IP, to set the IP address Spark binds to on this node # - SPARK_PUBLIC_DNS, to set the public dns name of the driver program # - SPARK_CLASSPATH, default classpath entries to append # Options read by executors and drivers running inside the cluster # - SPARK_LOCAL_IP, to set the IP address Spark binds to on this node # - SPARK_PUBLIC_DNS, to set the public DNS name of the driver program # - SPARK_CLASSPATH, default classpath entries to append # - SPARK_LOCAL_DIRS, storage directories to use on this node for shuffle and RDD data # - MESOS_NATIVE_LIBRARY, to point to your libmesos.so if you use Mesos # Options for the daemons used in the standalone deploy mode: # - SPARK_MASTER_IP, to bind the master to a different IP address or hostname # - SPARK_MASTER_PORT / SPARK_MASTER_WEBUI_PORT, to use non-default ports for the master:Master服务端口,默认为7077 # - SPARK_MASTER_OPTS, to set config properties only for the master (e.g. "-Dx=y") # - SPARK_WORKER_CORES, to set the number of cores to use on this machine:每个Worker进程所需要的CPU核的数目 # - SPARK_WORKER_MEMORY, to set how much total memory workers have to give executors (e.g. 1000m, 2g):每个Worker进程所需要的内存大小 # - SPARK_WORKER_PORT / SPARK_WORKER_WEBUI_PORT, to use non-default ports for the worker # - SPARK_WORKER_INSTANCES, to set the number of worker processes per node:每个Worker节点上运行Worker进程的数目 # - SPARK_WORKER_DIR, to set the working directory of worker processes # - SPARK_WORKER_OPTS, to set config properties only for the worker (e.g. "-Dx=y") # - SPARK_HISTORY_OPTS, to set config properties only for the history server (e.g. "-Dx=y") # - SPARK_DAEMON_JAVA_OPTS, to set config properties for all daemons (e.g. "-Dx=y") # - SPARK_PUBLIC_DNS, to set the public dns name of the master or workers SPARK_MASTER_WEBUI_PORT:Master节点对应Web服务的端口; export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=192.168.1.117:2181,192.168.1.118:2181,192.168.1.119:2181 -Dspark.deploy.zookeeper.dir=/spark":用于指定Master的HA,依赖于zookeeper集群 export SPARK_JAVA_OPTS="-Dspark.cores.max=4":用于限定每个提交的Spark Application的使用的CPU核的数目,因为缺省情况下提交的Application会使用所有集群中剩余的CPU Core。 配置-属性 文件:spark-default.conf #用于指定Master的HA,依赖于zookeeper集群 spark.deploy.recoveryMode ZOOKEEPER spark.deploy.zookeeper.url 192.168.1.117:2181,192.168.1.118:2181,192.168.1.119:2181 spark.deploy.zookeeper.dir /spark #用于限定每个提交的Spark Application的使用的CPU核的数目,因为缺省情况下提交的Application会使用所有集群中剩余的CPU Core spark.cores.max 4 注意:在Worker进程的CPU个数和内存大小的时候,要结合机器的实际硬件条件,如果一个Worker节点上的所有Worker进程需要的CPU总数目或者内存大小超过当前Worker节点的硬件条件,则Worker进程会启动失败。 将配置好的Spark文件拷贝至每个Spark集群的节点上的相同路径中。为方便使用spark-shell,可以在环境变量中配置上SPARK_HOME。 24.5.3 HA节点启动 Active节点:. /sbin /start-all.sh Standby节点:只需要启动master即可, ./sbin/start-master.sh 24.5.4 监控 http://activemaster:8080/ http://standbymaster:8080/ 24.5.5 测试 Spark的bin子目录中的spark-submit脚本是用于提交程序到集群中运行的工具,我们使用此工具做一个关于pi的计算。命令如下: ./bin/spark-submit --master spark://spark113:7077 \  --class org.apache.spark.examples.SparkPi \  --name Spark-Pi --executor-memory 400M \  --driver-memory 512M \   /home/hadoop/spark-1.0.0/examples/target/scala-2.10/spark-examples-1.0.0-hadoop2.0.0-cdh4.5.0.jar   24.5.6 SPARK CLIENT 部署 1. Java 的安装 2. ssh 无密码登录 3. Spark 安装包部署 4. 测试 24.5.7 SPARK STANDALONE HA 部署 24.5.7.1 基于文件系统的HA spark.deploy.recoveryMode 设成FILESYSTEM spark.deploy.recoveryDirectory Spark 保存恢复状态的目录 spark-env.sh 里对SPARK_DAEMON_JAVA_OPTS 设置: export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=FILESYSTEM -Dspark.deploy.recoveryDirectory=/app/hadoop/spark100/recovery" 24.5.7.2 基于ZOOKEEPER 的HA spark.deploy.recoveryMode 设置成ZOOKEEPER spark.deploy.zookeeper.url ZooKeeper URL spark.deploy.zookeeper.dir ZooKeeper 保存恢复状态的目录,缺省为/spark spark-env 里对SPARK_DAEMON_JAVA_OPTS 设置: export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Ds park.deploy.zookeeper.url=hadoop1:2181,hadoop2:2181,hadoop3:2181 -Dspar k.deploy.zookeeper.dir=/spark" 24.6 APACHE MESOS 24.7 HADOOP YARN Spark + Hadoop-HDFS + Hadoop-YARN 1. 由YARN负责集群的资源分配; 2. 通过Spark客户端来向Yarn提交任务运行; 24.7.1 运行模式 Spark On Yarn有两种运行模式: 1. Yarn Cluster方式 2. Yarn Client方式。 相关命令行选项: 1. --master:指定程序以yarn、yarn-cluster或者yarn-client中的一种方式运行。 2. --deploy-mode:在指定--master为yarn时,可以通过该选项继续设置client或cluster部署模式,默认为client。 两种运行模式的说明: 1.  Yarn Client: Spark Driver程序在客户端上运行,然后向Yarn申请运行exeutor以运行Task,本地程序负责最后的结果汇总等。客户端的Driver将应用提交给Yarn后,Yarn会先后启动ApplicationMaster和executor,另外ApplicationMaster和executor都是装载在container里运行,container默认的内存是1G,ApplicationMaster分配的内存是driver-memory,executor分配的内存是executor-memory。同时,因为Driver在客户端,所以程序的运行结果可以在客户端显示,Driver以进程名为SparkSubmit的形式存在。 2. Yarn Cluster: Spark Driver程序将作为一个ApplicationMaster在YARN集群中先启动,然后再由ApplicationMaster向RM申请资源启动executor以运行Task。因为Driver程序在Yarn中运行,所以程序的运行结果不能在客户端显示,所以最好将结果保存在HDFS上,客户端的终端显示的是作为Yarn的job的运行情况。 24.7.2 配置-环境变量 文件:spark-env.sh # Options read when launching programs locally with ./bin/run-example or ./bin/spark-submit # - HADOOP_CONF_DIR, to point Spark towards Hadoop configuration files:HDFS节点中的conf配置文件路径,正常情况下此目录为$HADOOP_HOME/etc/hadoop # - SPARK_LOCAL_IP, to set the IP address Spark binds to on this node # - SPARK_PUBLIC_DNS, to set the public dns name of the driver program # - SPARK_CLASSPATH, default classpath entries to append # Options read by executors and drivers running inside the cluster # - SPARK_LOCAL_IP, to set the IP address Spark binds to on this node # - SPARK_PUBLIC_DNS, to set the public DNS name of the driver program # - SPARK_CLASSPATH, default classpath entries to append # - SPARK_LOCAL_DIRS, storage directories to use on this node for shuffle and RDD data # - MESOS_NATIVE_LIBRARY, to point to your libmesos.so if you use Mesos # Options read in YARN client mode # - HADOOP_CONF_DIR, to point Spark towards Hadoop configuration files:当前节点中HDFS的部署路径 # - SPARK_EXECUTOR_INSTANCES, Number of workers to start (Default: 2):在Yarn集群中启动的Worker的数目,默认为2个 # - SPARK_EXECUTOR_CORES, Number of cores for the workers (Default: 1).:每个Worker所占用的CPU核的数目 # - SPARK_EXECUTOR_MEMORY, Memory per Worker (e.g. 1000M, 2G) (Default: 1G):每个Worker所占用的内存大小 # - SPARK_DRIVER_MEMORY, Memory for Master (e.g. 1000M, 2G) (Default: 512 Mb):Spark应用程序Application所占的内存大小,这里的Driver对应Yarn中的ApplicationMaster # - SPARK_YARN_APP_NAME, The name of your application (Default: Spark):Spark Application在Yarn中的名字 # - SPARK_YARN_QUEUE, The hadoop queue to use for allocation requests (Default: ‘default’) # - SPARK_YARN_DIST_FILES, Comma separated list of files to be distributed with the job. # - SPARK_YARN_DIST_ARCHIVES, Comma separated list of archives to be distributed with the job. 24.7.3 部署 1. 只需将Spark的部署包放置到Yarn集群的某个节点上即可(或者是Yarn的客户端,能读取到Yarn集群的配置文件即可); 2. Spark本身的Worker节点、Master节点不需要启动。 24.7.4 监控 24.7.4.1 LOG方式 on Yarn的两种运行方式,其运行结束后的日志不能在Yarn的Application管理界面看到,目前只能在客户端通过yarn logs命令查看每个Application的日志: yarn logs -applicationId 24.7.5 测试 在Spark的部署路径的bin路径下,执行spark-submit脚本来运行spark-examples包中的例子。执行如下: ./bin/spark-submit --master yarn \   --class org.apache.spark.examples.JavaWordCount \   --executor-memory 400M \  --driver-memory 400M \   /home/hadoop/spark-1.0.0/examples/target/scala-2.10/spark-examples-1.0.0-hadoop2.0.0-cdh4.5.0.jar ./hdfs-site.xml  Spark应用程序需要的CPU Core数目和内存,需要根据当前Yarn的NodeManager的硬件条件相应设置,不能超过NodeManager的硬件条件。 在Yarn的ResourceManager对应的Web界面中查看启动的Application。 同时可以在启动脚本的客户端看到WordCount的运行结果。 24.8  SPARK HISTORYSERVER  类似于Mapreduce的JobHistoryServer,Spark也有一个服务可以保存历史Application的运行记录。 24.8.1 配置-属性 文件:修改$SPARK_HOME/conf下的spark-defaults.conf文件(注意,修改后的配置文件在每个节点都要有),其中可修改的配置属性为: 属性名称 默认值 含义 spark.history.updateInterval 10 以秒为单位,更新日志相关信息的时间间隔 spark.history.retainedApplications 250 保存Application历史记录的个数,如果超过这个值,旧的应用程序信息将被删除 spark.history.ui.port 18080 HistoryServer的web端口  spark.history.kerberos.enabled False 是否使用kerberos方式登录访问HistoryServer,对于持久层位于安全集群的 HDFS上是有用的,如果设置为true,就要配置下面的两个属性  spark.history.kerberos.principal   用于HistoryServer的kerberos主体名称 spark.history.kerberos.keytab   用于HistoryServer的kerberos keytab文件位置 spark.history.ui.acls.enable False 授权用户查看应用程序信息的时候是否检查acl。如果启用,只有应用程序所有者和spark.ui.view.acls指定的用户可以查看应用程序信息;否则,不做任何检查 spark.eventLog.enabled False 是否记录Spark事件 spark.eventLog.dir file:///tmp/spark-events  保存日志相关信息的路径,可以是hdfs://开头的HDFS路径,也可以是file://开头的本地路径,都需要提前创建 spark.yarn.historyServer.address   Server端的URL:Ip:port 或者host:port 如: spark.eventLog.enabled  true   spark.eventLog.dir      hdfs://yh/user/hadoop/sparklogs  spark.yarn.historyServer.address    master:18080  24.8.2 启动  设置完文件之后,进入SPARK_HOME目录启动服务:./sbin/start-history-server.sh 24.8.3 监控 运行完成的Application历史记录可以通过访问上面指定的HistoryServer地址查看。 http://master:18080/ 无论运行时是本地模式,还是yarn-client、yarn-cluster,运行记录均可在此页面查看。 并且程序运行时的环境变量、系统参数、各个阶段的耗时均可在此查看。 25 监控 25.1 SPARK的WEB监控页面 集群监控界面: http://master:8080 Worker 监控界面: http://worker:8081 Dirver监控界面:http://driver:4040 Driver:根据部署模式,分别 在提交节点或Worker节点上运行,默认的port为4040,可指定具体端口或随机分配一个端口号(0时随机生成)。 细节参见小节:FAQ部分的单节点多应用启动失败: --conf "spark.ui.port"=4041 25.2 SPARK的日志 Master日志目录: $SPARK_HOME/log Worker日志目录:$SPARK_HOME/worker 其中,Worker目录可通过配置属性进行设置,Excutor输出有stdout和stderr两个文件,可在对应的UI界面上查看具体内容和路径。 25.3 SPARK的日志之内存 25.3.1 案例一 启动命令: [harli@wxx215 spark-1.3.0-bin-hadoop2.4]$ ./bin/spark-shell --executor-memory 4g 界面提示: Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 1.3.0 /_/ Using Scala version 2.10.4 (Java HotSpot(TM) Server VM, Java 1.7.0) Type in expressions to have them evaluated. Type :help for more information. 15/04/08 14:20:30 INFO spark.SparkContext: Running Spark version 1.3.0 15/04/08 14:20:30 INFO spark.SecurityManager: Changing view acls to: harli 15/04/08 14:20:30 INFO spark.SecurityManager: Changing modify acls to: harli 15/04/08 14:20:30 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(harli); users with modify permissions: Set(harli) 15/04/08 14:20:31 INFO slf4j.Slf4jLogger: Slf4jLogger started 15/04/08 14:20:31 INFO Remoting: Starting remoting 15/04/08 14:20:31 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@wxx215:45077] 15/04/08 14:20:31 INFO util.Utils: Successfully started service 'sparkDriver' on port 45077. 15/04/08 14:20:31 INFO spark.SparkEnv: Registering MapOutputTracker 15/04/08 14:20:31 INFO spark.SparkEnv: Registering BlockManagerMaster 15/04/08 14:20:31 INFO storage.DiskBlockManager: Created local directory at /tmp/spark-d3d2ece8-f946-4cbe-9db5-b5fc8b671cc0/blockmgr-742a4ee6-153e-49cf-b0a3-95aa6ecdf7a2 15/04/08 14:20:32 INFO storage.MemoryStore: MemoryStore started with capacity 265.0 MB 15/04/08 14:20:32 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-7a14d5e4-588b-43c6-8cf1-ad746fcdd7b3/httpd-ea813766-3ef2-4dd5-bd04-ee7f4cb3394f 15/04/08 14:20:32 INFO spark.HttpServer: Starting HTTP Server 15/04/08 14:20:32 INFO server.Server: jetty-8.y.z-SNAPSHOT 15/04/08 14:20:32 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:29553 15/04/08 14:20:32 INFO util.Utils: Successfully started service 'HTTP file server' on port 29553. 15/04/08 14:20:32 INFO spark.SparkEnv: Registering OutputCommitCoordinator 15/04/08 14:20:32 INFO server.Server: jetty-8.y.z-SNAPSHOT 15/04/08 14:20:32 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040 15/04/08 14:20:32 INFO util.Utils: Successfully started service 'SparkUI' on port 4040. 15/04/08 14:20:32 INFO ui.SparkUI: Started SparkUI at http://wxx215:4040 15/04/08 14:20:32 INFO executor.Executor: Starting executor ID on host localhost 15/04/08 14:20:32 INFO executor.Executor: Using REPL class URI: http://192.168.70.215:60802 15/04/08 14:20:32 INFO util.AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkDriver@wxx215:45077/user/HeartbeatReceiver 15/04/08 14:20:32 INFO netty.NettyBlockTransferService: Server created on 26595 15/04/08 14:20:32 INFO storage.BlockManagerMaster: Trying to register BlockManager 15/04/08 14:20:32 INFO storage.BlockManagerMasterActor: Registering block manager localhost:26595 with 265.0 MB RAM, BlockManagerId(, localhost, 26595) 15/04/08 14:20:32 INFO storage.BlockManagerMaster: Registered BlockManager 界面的内存显示:INFO storage.BlockManagerMasterActor: Registering block manager localhost:26595 with 265.0 MB RAM —— 对应了driver program的内存。 当前未设置时,默认512M * 内存比例。 25.3.2 案例二 启动命令 [harli@wison215 spark-1.3.0-bin-hadoop2.4]$ ./bin/spark-shell --master spark://192.168.70.214:7077 --driver-memory 2g --executor-memory 4g 界面: Shell界面的内存显示也对应于 Executor ID 为的地方。 Executor的参考其他几个内存部分。 25.4 SPARK进程查看 可通过命令:ps -ef|grep -i java 查看提交节点、worker等节点上运行的相关进程。 如各个组件进程以及SparkSubmit、CoarseGrainedExecutorBackend、DriverWrapper等进程。 25.5 METRICS 官网:http://spark.apache.org/docs/latest/monitoring.html 26 SPARK可配置参数 Spark参数的配置可通过三种方式:SparkConf方式 > 命令行参数方式 >文件配置方式。 配置文件有两个,spark-env.sh和spark-default.conf env一般用于设置环境变量和配置属性,在对应的OPT中用-Dkey=value方式添加, 我一般用于设置和master或slave等单独相关的配置属性 default 设置方式最简单, 一般用于通用的,master和slave都可以使用的属性 另外,jvm等相关的,需要在Java的opts里设置的,也在env配置文件中 env中,针对opt的设置,有好几个,对应后台进程的,master、slave进程的,都各有一个 26.1 应用属性 属性名 默认值 含义  spark.app.name   应用程序名称 spark.master   要连接的Spark集群Master的URL,对应于不同的集群管理器。 spark.executor.memory 参见Storage部分 spark.serializer/spark.kryo.registrator 参见序列化部分 spark.local.dir 参见Storage部分 spark.logConf False SparkContext 启动时是否记录有效 SparkConf信息 —— 对应可整理到调试、监控部分 26.2 运行环境变量 属性名 默认值 含义 spark.executor.extraJavaOptions   传递给executor的额外JVM 选项,但是不能使用它来设置Spark属性或堆空间大小 spark.executor.extraClassPath   追加到executor类路径中的附加类路径 spark.executor.extraLibraryPath   启动executor JVM 时要用到的特殊库路径 spark.files.userClassPathFirst False executor在加载类的时候是否优先使用用户自定义的JAR包,而不是Spark带有的JAR包,目前,该属性只是一项试验功能   26.3 STORAGE相关属性 属性名 默认值 含义  spark.local.dir /tmp 用于保存map输出文件或者转储RDD(如RDD Cache,Shuffle,Spill等数据的位置)。 性能提升的两种方法: 1. 首先,最基本的当然是我们可以配置多个路径(用逗号分隔)到多个磁盘上增加整体IO带宽,这个大家都知道。 2. 其次,目前的实现中,Spark是通过对文件名采用hash算法分布到多个路径下的目录中去,如果你的存储设备有快有慢,比如SSD+HDD混合使用,那么你可以通过在SSD上配置更多的目录路径来增大它被Spark使用的比例,从而更好地利用SSD的IO带宽能力。当然这只是一种变通的方法,终极解决方案还是应该像目前HDFS的实现方向一样,让Spark能够感知具体的存储设备类型,针对性的使用。 在Spark 1.0 及更高版本此属性会被环境变量 SPARK_LOCAL_DIRS (Standalone、Mesos) 或 LOCAL_DIRS (YARN) 代替。 spark.executor.memory 512 m 每个executor使用的内存大小。 这个参数最终会被设置到Executor的JVM的heap尺寸上,对应的就是Xmx和Xms的值。 理论上Executor 内存当然是多多益善,但是实际受机器配置,以及运行环境,资源共享,JVM GC效率等因 素的影响,还是有可能需要为它设置一个合理的大小。 多大算合理,要看实际情况 Executor的内存基本上是Executor内部所有任务共享的,而每个Executor上可以支持的任务的数量取决于Executor所管理的CPU Core资源的多少,因此你需要了解每个任务的数据规模的大小,从而推算出每个Executor大致需要多少内存即可满足基本的需求。 如何知道每个任务所需内存的大小呢,这个很难统一的衡量,因为除了数据集本身的开销,还包括算法所需各种临时内存空间的使用,而根据具体的代码算法等不同,临时内存空间的开销也不同。但是数据集本身的大小,对最终所需内存的大小还是有一定的参考意义的。 通常来说每个分区的数据集在内存中的大小,可能是其在磁盘上源数据大小的若干倍(不考虑源数据压缩,Java对象相对于原始裸数据也还要算上用于管理数据的数据结构的额外开销),需要准确的知道大小的话,可以将RDD cache在内存中,从BlockManager的Log输出可以看到每个Cache分区的大小(其实也是估算出来的,并不完全准确) 如: BlockManagerInfo: Added rdd_0_1 on disk on sr438:41134 (size: 495.3 MB) 反过来说,如果你的Executor的数量和内存大小受机器物理配置影响相对固定,那么你就需要合理规划每个分区任务的数据规模,例如采用更多的分区,用增加任务数量(进而需要更多的批次来运算所有的任务)的方式来减小每个任务所需处理的数据大小。 spark.storage.memoryFraction 0.6 Java堆用于cache的比例。 如前面所说spark.executor.memory决定了每个Executor可用内存的大小,而spark.storage.memoryFraction则决定了在这部分内存中有多少可以用于Memory Store管理RDD Cache数据,剩下的内存用来保证任务运行时各种其它内存空间的需要。 spark.executor.memory默认值为0.6,官方文档建议这个比值不要超过JVM Old Gen区域的比值。这也很容易理解,因为RDD Cache数据通常都是长期驻留 内存的,理论上也就是说最终会被转移到Old Gen区域(如果该RDD还没有被删除的话),如果这部分数据允许的尺寸太大,势必把Old Gen区域占满,造成频繁的FULL GC。 如何调整这个比值,取决于你的应用对数据的使用模式和数据的规模,粗略的来说,如果频繁发生Full GC,可以考虑降低这个比值,这样RDD Cache可用的内存空间减少(剩下的部分Cache数据就需要通过Disk Store写到磁盘上了),会带来一定的性能损失,但是腾出更多的内存空间用于执行任务,减少Full GC发生的次数,反而可能改善程序运行的整体性能 26.4 SHUFFLE相关属性 属性名 默认值 含义 spark.shuffle.manager sort 配置所使用的Shuffle Manager。 针对spark 1.2版本 —— release-notes shuffle manager默认设置改成sort based。 在shuffle数据比较大的时候,性能会有提升。 与Hadoop的Shuffle-sort相比:目前Spark的sort只是按照Partition key排序,Partition内部目前是不排序的,不过就算内部要排序,也是比较容易实现的。而Hadoop是按照每个Partition内的每个KV排序的。 spark.shuffle.consolidateFiles False 仅适用于HashShuffleMananger。 如果为true,在shuffle时就合并中间文件,对于有大量Reduce任务的shuffle来说,合并文件可以提高文件系统性能,如果使用的是ext4 或 xfs 文件系统,建议设置为true;对于ext3,由于文件系统的限制,设置为true反而会使内核>8的机器降低性能。 同样是为了解决生成过多文件的问题,采用的方式是在不同批次运行的Map任务之间重用Shuffle输出文 件,也就是说合并的是不同批次的Map任务的输出数据,但是每个Map任务所需要的文件还是取决于Reduce分区的数量,因此,它并不减少同时打开的输出文件的数量,因此对内存使用量的减少并没有帮助。只是HashShuffleManager里的一个折中的解决方案。 需要注意的是,这部分的代码实现尽管原理上说很简单,但是涉及到底层具体的文件系统的实现和限制等因素,例如在并发访问等方面,需要处理的细节很多,因此一直存在着这样那样的bug或者问题,导致在例如EXT3上使用时,特定情况下性能反而可能下降,因此从Spark 0.8的代码开始,一直到Spark 1.1的代码为止也还没有被标志为Stable,不是默认采用的方式。此外因为并不减少同时打开的输出文件的数量,因此对性能具体能带来多大的改善也取决于具体的文件数量的情况。所以即使你面临着Shuffle文件数量巨大的问题,这个配置参数是否使用,在什么版本中可以使用,也最好还是实际测试以后再决定。  spark.shuffle.spill True 如果为true,在shuffle期间通过溢出数据到磁盘来降低了内存使用总量,溢出阈值是由spark.shuffle.memoryFraction指定的。 shuffle的过程中,如果涉及到排序,聚合等操作,势必会需要在内存中维护一些数据结构,进而占用额外的内存。如果内存不够用怎么办,那只有两条路可以走,一就是out of memory 出错了,二就是将部分数据临时写到外部存储设备中去,最后再合并到最终的Shuffle输出文件中去。 spark.shuffle.memoryFraction / spark.shuffle.safetyFraction 在启用Spill的情况下,spark.shuffle.memoryFraction(1.1后默认为0.2)决定了当Shuffle过程中使用的内存达到总内存多少比例的时候开始Spill。 通过spark.shuffle.memoryFraction可以调整Spill的触发条件,即Shuffle占用内存的大小,进而调整Spill的频率和GC的行为。 总的来说,如果Spill太过频繁,可以适当增加spark.shuffle.memoryFraction的大小,增加用于Shuffle的内存,减少Spill的次数。当然这样一来为了 避免内存溢出,对应的可能需要减少RDD cache占用的内存,即减小spark.storage.memoryFraction的值,这样RDD cache的容量减少,有可能带来性能影响,因此需要综合考虑。 由于Shuffle数据的大小是估算出来的,一来为了降低开销,并不是每增加一个数据项都完整的估算一次,二来估算也会有误差,所以实际暂用的内存可能比估算值要大。 这里spark.shuffle.safetyFraction(默认为0.8)用来作为一个保险系数,降低实际Shuffle使用的内存阀值,增加一定的缓冲,降低实际内存占用超过用户配置值的概率。 spark.shuffle.spill.compress True 这两个参数涉及shuffl过程中是否压缩数据:取决于CPU/DISK/NETWORK的实际能力和负载,应该综合考虑。 针对Spill的中间数据。 是否压缩在shuffle期间溢出的数据,如果压缩将使用spark.io.compression.codec。  spark.shuffle.compress True 针对最终的shuffle输出文件。 是否压缩map输出文件,压缩将使用spark.io.compression.codec。 理论上说,spark.shuffle.compress设置为True通常都是合理的,因为如果使用千兆以下的网卡,网络带宽往往最容易成为瓶颈。此外,目前的Spark任务调度实现中,以Shuffle划分Stage,下一个Stage的任务是要等待上一个Stage的任务全部完成以后才能开始执行,所以shuffle数据的传输和CPU计算任务之间通常不会重叠,这样Shuffle数据传输量的大小和所需的时间就直接影响到了整个任务的完成速度。但是压缩也是要消耗大量的CPU资源的,所以打开压缩选项会增加Map任务的执行时间,因此如果在CPU负载的影响远大于磁盘和网络带宽的影响的场合下,也可能将spark.shuffle.compress 设置为False才是最佳的方案。 spark.shuffle.file.buffer.kb 100 每个shuffle的文件输出流内存缓冲区的大小,以KB为单位。这些缓冲区可以减少磁盘寻道的次数,也减少创建shuffle中间文件时的系统调用。 对于spark.shuffle.spill.compress而言,情况类似,但是spill数据不会被发送到网络中,仅仅是临时写入本地磁盘,而且在一个任务中同时需要执行压缩和解压缩两个步骤,所以对CPU负载的影响会更大一些,而磁盘带宽(如果标配12HDD的话)可能往往不会成为Spark应用的主要问题,所以这个参数相对而言,或许更有机会需要设置为False。 spark.reducer.maxMbInFlight 48 每个reduce任务同时获取map输出的最大大小 (以兆字节为单位)。由于每个map输出都需要一个缓冲区来接收它,这代表着每个 reduce 任务有固定的内存开销,所以要设置小点,除非有很大内存 spark.shuffle.sort.bypassMergeThreshold 200 这个参数仅适用于SortShuffleManager,SortShuffleManager在处理不需要排序的Shuffle操作时,由于排序会带来性能的下降。这个参数决定了在这种情况下,当Reduce分区的数量小于多少的时候,在SortShuffleManager内部不使用Merge Sort的方式处理数据,而是与Hash Shuffle类似,直接将分区文件写入单独的文件,不同的是,在最后一步还是会将这些文件合并成一个单独的文件。这样通过去除Sort步骤来加快处理速度,代价是需要并发打开多个文件,所以内存消耗量增加,本质上是相对HashShuffleMananger一个折衷方案。如果内存GC问题严重,可以降低这个值。   26.5 SPARKUI相关属性 属性名 默认值 含义 spark.ui.port 4040 应用程序webUI的端口 spark.ui.retainedStages 1000 在GC之前保留的stage数量  spark.ui.killEnabled True 允许在webUI将stage和相应的job杀死  spark.eventLog.enabled False 是否记录Spark事件,用于应用程序在完成后重构webUI spark.eventLog.compress False 是否压缩记录Spark事件,前提spark.eventLog.enabled为true spark.eventLog.dir file:///tmp/spark-events 如果spark.eventLog.enabled为 true,该属性为记录spark事件的根目录。在此根目录中,Spark为每个应用程序创建分目录,并将应用程序的事件记录到在此目录中。可以将此属性设置为HDFS目录,以便history server读取历史记录文件   26.6 压缩和序列化相关属性 属性名 默认值 含义 spark.broadcast.compress True 是否在发送之前压缩广播变量。 是否对Broadcast的数据进行压缩,默认值为True。 Broadcast机制是用来减少运行每个Task时,所需要发送给TASK的RDD所使用到的相关数据的尺寸,一个Executor只需要在第一个Task启动时,获得一份Broadcast数据,之后的Task都从本地的BlockManager中获取相关数据。在1.1最新版本的代码中,RDD本身也改为以Broadcast的形式发送给Executor(之前的实现RDD本身是随每个任务发送的),因此基本上不太需要显式的决定哪些数据需要broadcast了。 因为Broadcast的数据需要通过网络发送,而在 Executor端又需要存储在本地BlockMananger中,加上最新的实现,默认RDD通过Boradcast机制发送,因此大大增加了Broadcast变量的比重,所以通过压缩减小尺寸,来减少网络传输开销和内存占用,通常都是有利于提高整体性能的。 什么情况可能不压缩更好呢,大致上个人觉得同样还是在网络带宽和内存不是问题的时候,如果Driver端CPU资源很成问题(毕竟压缩的动作基本都在Driver端执行),那或许有调整的必要。 spark.serializer org.apache.spark .serializer.JavaSerializer 序列化方式,官方建议使用org.apache.spark.serializer.KryoSerializer,当然也可以任意是定义为org.apache.spark.Serializer子类的序化器。 序列化对于spark应用的性能来说,还是有很大影响的,在特定的数据格式的情况下,KryoSerializer的性能可以达到JavaSerializer的10倍以上,当然放到整个Spark程序中来考量,比重就没有那么大了,但是以Wordcount为例,通常也很容易达到30%以上的性能提升。而对于一些Int之类的基本类型数据,性能的提升就几乎可以忽略了。KryoSerializer依赖Twitter的Chill库来实现,相对于JavaSerializer,主要的问题在于不是所有的Java Serializable对象都能支持。 需要注意的是,这里可配的Serializer针对的对象是Shuffle数据,以及RDD Cache等场合,而Spark Task的序列化是通过spark.closure.serializer来配置,但是目前只支持JavaSerializer,所以等于没法配置啦。 更多Kryo序列化相关优化配置,可以参考 http://spark.apache.org/docs/latest/tuning.html#data-serialization 一节。 spark.kryo.registrator   如果要使用 Kryo序化器,需要创建一个继承KryoRegistrator的类并设置系统属性 spark.kryo.registrator指向该类 spark.rdd.compress False 是否压缩RDD分区。 这个参数决定了RDD Cache的过程中,RDD数据在序列化之后是否进一步进行压缩再储存到内存或磁盘上。当然是为了进一步减小Cache数据的尺寸,对于Cache在磁盘上而言,绝对大小大概没有太大关系,主要是考虑Disk的IO带宽。而对于Cache在内存中,那主要就是考虑尺寸的影响,是否能够Cache更多的数据,是否能减小Cache数据对GC造成的压力等。 这两者,前者通常不会是主要问题,尤其是在RDD Cache本身的目的就是追求速度,减少重算步骤,用IO换CPU的情况下。而后者,GC问题当然是需要考量的,数据量小,占用空间少,GC的问题大概会减轻,但是是否真的需要走到RDD Cache压缩这一步,或许用其它方式来解决可能更加有效。 所以这个值默认是关闭的,但是如果在磁盘IO的确成为问题或者GC问题真的没有其它更好的解决办法的时候,可以考虑启用RDD压缩。 spark.io.compression.codec org.apache.spark.io. LZFCompressionCodec 用于压缩内部数据如 RDD分区和shuffle输出的编码解码器, org.apache.spark.io.LZFCompressionCodec和org.apache.spark.io.SnappyCompressionCodec。其中,Snappy提供更快速的压缩和解压缩,而LZF提供了更好的压缩比。 RDD Cache和Shuffle数据压缩所采用的算法Codec,默认值曾经是使用LZF作为默认Codec,最近因为LZF的内存开销的问题,默认的Codec已经改为Snappy。 LZF和Snappy相比较,前者压缩率比较高(当然要看具体数据内容了,通常要高20%左右),但是除了内存问题以外,CPU代价也大一些(大概也差20%~50%?) 在用于Shuffle数据的场合下,内存方面,应该主要是在使用HashShuffleManager的时候有可能成为问题,因为如果Reduce分区数量巨大,需要同时打开 大量的压缩数据流用于写文件,进而在Codec方面需要大量的buffer。但是如果使用SortShuffleManager,由于shuffle文件数量大大减少,不会产生大量的压缩数据流,所以内存开销大概不会成为主要问题。 剩下的就是CPU和压缩率的权衡取舍,和前面一样,取决于CPU/网络/磁盘的能力和负载,个人认为CPU通常更容易成为瓶颈。所以要调整性能,要不不压缩,要不使用Snappy可能性大一些? 对于RDD Cache的场合来说,绝大多数场合都是内存操作或者本地IO,所以CPU负载的问题可能比IO的问题更加突出,这也是为什么 spark.rdd.compress 本身默认为不压缩,如果要压缩,大概也是Snappy合适一些?  spark.io.compression.snappy .block.size 32768 使用Snappy编码解码器时,编码解码器使用的块大小 (以字节为单位)  spark.closure.serializer org.apache.spark.serializer. JavaSerializer 用于闭包的序化器,目前只有支持Java序化器 spark.serializer. objectStreamReset 10000 org.apache.spark.serializer.JavaSerializer序列化时,会缓存对象以防止写入冗余数据,此时会停止这些对象的垃圾收集。通过调用重置序化器,刷新该信息就可以收集旧对象。若要关闭这重定期重置功能将其设置为< = 0 。默认情况下每10000个对象将重置序化器 spark.kryo.referenceTracking True 当使用Kryo序化数据时,是否跟踪对同一对象的引用。如果你的对象图有回路或者同一对象有多个副本,有必要设置为true;其他情况下可以禁用以提高性能  spark.kryoserializer.buffer.mb 2 在Kryo 里允许的最大对象大小(Kryo会创建一个缓冲区,至少和序化的最大单个对象一样大)。每个worker的每个core只有一个缓冲区   26.7 执行时相关属性 属性名 默认值 含义 spark.default.parallelism 本地模式:机器核数 Mesos:8 其他:max(executor的core,2) 如果用户不设置,系统使用集群中运行shuffle操作的默认任务数(groupByKey、 reduceByKey等) spark.broadcast.factory org.apache.spark.broadcast. HttpBroadcastFactory 广播的实现类 spark.broadcast.blockSize 4096 TorrentBroadcastFactory块大小(以kb为单位)。过大会降低广播速度;过小会使印象BlockManager性能 spark.files.overwrite Fale 通过 SparkContext.addFile() 添加的文件在目标中已经存在并且内容不匹配时,是否覆盖目标文件 spark.files.fetchTimeout False 在获取由driver通过SparkContext.addFile() 添加的文件时,是否使用通信时间超时  spark.tachyonStore.baseDir System.getProperty("java.io.tmpdir") 用于存储RDD的techyon目录,tachyon文件系统的URL由spark.tachyonStore.url设置,也可以是逗号分隔的多个techyon目录 spark.storage. memoryMapThreshold 8192 以字节为单位的块大小,用于磁盘读取一个块大小时进行内存映射。这可以防止Spark在内存映射时使用很小块,一般情况下,对块进行内存映射的开销接近或低于操作系统的页大小 spark.tachyonStore.url tachyon://localhost:19998 基于techyon文件的URL spark.cleaner.ttl   spark记录任何元数据(stages生成、task生成等)的持续时间。定期清理可以确保将超期的元数据丢弃,这在运行长时间任务是很有用的,如运行7*24的sparkstreaming任务。RDD持久化在内存中的超期数据也会被清理   26.8 网络相关属性 属性名 默认值 含义 spark.driver.host   运行driver的主机名或 IP 地址 spark.driver.port 随机 driver侦听的端口 spark.akka.frameSize 10 以MB为单位的driver和executor之间通信信息的大小,设置值越大,driver可以接受更大的计算结果 spark.akka.threads 4 用于通信的actor线程数,在大型集群中拥有更多CPU内核的driver可以增加actor线程数 spark.akka.timeout 100 以秒为单位的Spark节点之间超时时间 spark.akka.heartbeat.pauses 600 下面3个参数是用于设置Akka自带的故障探测器。启用的话,以秒为单位设置如下这三个参数,有助于对恶意的executor的定位,而对于由于GC暂停或网络滞后引起的情况下,不需要开启故障探测器;另外故障探测器的开启会导致由于心跳信息的频繁交换而引起的网络泛滥。 本参数是设置可接受的心跳停顿时间 spark.akka.failure-detector.threshold 300.0 对应Akka的akka.remote.transport-failure-detector.threshold spark.akka.heartbeat.interval  1000 心跳间隔时间   26.9 调度相关属性 属性名 默认值 含义 spark.task.cpus 1 为每个任务分配的内核数。 实际上,这个参数并不能真的控制每个任务实际运行时所使用的CPU数量,比如你可以通过在任务内部创建额外的工作线程来使用更多的CPU(至少目前为止,将来任务的执行环境是否能通过LXC等技术来控制还不好说)。 它所发挥的作用,只是在作业调度时,每分配出一个任务时,对已使用的CPU资源进行计数。也就是说只是理论上用来统计资源的使用情况,便于安排调度。 spark.task.maxFailures 4 Task的最大重试次数 spark.scheduler.mode FIFO Spark的任务调度模式,还有一种Fair模式。 这个参数决定了单个Spark应用内部调度的时候使用FIFO模式还是Fair模式。只管理一个Spark应用内部的多个没有依赖关系的Job作业的调度策略。 多个Spark应用之间的调度策略: 3. Standalone模式:FIFO模式。 4. Yarn模式:由Yarn自己的策略配置文件决定。 5. Spark on Server:以服务的形式,为多个用户提交作业,可以通过配置Fair模式相关参数来调整不同用户间作业的调度和资源分配优先级。 spark.cores.max   当应用程序运行在Standalone集群或者粗粒度共享模式Mesos集群时,应用程序向集群请求的最大CPU内核总数(不是指每台机器,而是整个集群)。如果不设置,对于Standalone集群将使用spark.deploy.defaultCores 中数值,而Mesos将使用集群中可用的内核。 这个参数对Yarn模式不起作用,在YARN模式下,资源由Yarn统一调度管理,一个应用启动时所申请的CPU资源的数量由另外两个直接配置Executor的数量和每个Executor中core数量的参数决定。 参考:spark-submit.sh的帮助中命令选项与spark-env.sh文件。 spark.mesos.coarse  False 如果设置为true,在Mesos集群中运行时使用粗粒度共享模式  spark.speculation False 以下几个参数是关于Spark推测执行机制的相关参数。此参数设定是否使用推测执行机制,如果设置为true则spark使用推测执行机制,对于Stage中拖后腿的Task在其他节点中重新启动,并将最先完成的Task的计算结果最为最终结果。 推测执行机制是在任务调度的时候,如果没有适合当前本地性要求的任务可供运行,将跑得慢的任务在空闲计算资源上再度调度的行为,这些参数调整这些行为的频率和判断指标。 通常来说很难正确的判断是否需要Speculation,能真正发挥Speculation用处的场合,往往是某些节点由于运行环境原因,比如CPU资源由于某种原因被占用,磁盘损坏导致IO缓慢造成任务执行速度异常的情况,当然前提是你的分区任务不存在仅能被执行一次,或者不能同时执行多个拷贝等情况。Speculation任务参照的指标通常是其它任务的执行时间,而实际的任务可能由于分区数据尺寸不均匀,本来就会有时间差异,加上一定的调度和IO的随机性,所以如果一致性指标定得过严,Speculation可能并不能真的发现问题,反而增加了不必要的任务开销,定得过宽,大概又基本相当于没用。 注意:推测机制 与 一些共享变量 —— 参考共享变量的转换操作的说明(1.2.0版本新增的操作) spark.speculation.interval  100 Spark多长时间进行检查task运行状态用以推测,以毫秒为单位  spark.speculation.quantile 0.75 推测启动前,Stage必须要完成总Task的百分比 spark.speculation.multiplier 1.5 比已完成Task的运行速度中位数慢多少倍才启用推测  spark.locality.wait 3000 这几个参数决定任务分配时Spark的数据本地性策略的。 本参数是以毫秒为单位启动本地数据task的等待时间,如果超出就启动下一本地优先级别的task。该设置同样可以应用到各优先级别的本地性之间(本地进程 -> 本地节点 -> 本地机架 -> 任意节点 ),当然,也可以通过下面几个具体的参数设置不同优先级别的本地性等待时间。 Spark中任务的处理需要考虑所涉及的数据的本地性的场合,基本就两种,一是数据的来源是HadoopRDD; 二是RDD的数据来源来自于RDD Cache(即由CacheManager从BlockManager中读取,或者Streaming数据源RDD)。其它情况下,如果不涉及shuffle操作的RDD,不构成划分Stage和Task的基准,不存在判断Locality本地性的问题,而如果是ShuffleRDD,其本地性始终为No Prefer,因此其实也无所谓Locality。   在理想的情况下,任务当然是分配在可以从本地读取数据的节点上时(同一个JVM内部或同一台物理机器内部)的运行时性能最佳。但是每个任务的执行速度无法准确估计,所以很难在事先获得全局最优的执行策略,当Spark应用得到一个计算资源的时候,如果没有可以满足最佳本地性需求的任务可以运行时,是退而求其次,运行一个本地性条件稍差一点的任务呢,还是继续等待下一个可用的计算资源以期望它能更好地匹配任务的本地性呢? 这几个参数一起决定了Spark任务调度在得到分配任务时,选择暂时不分配任务,而是等待获得满足进程内部/节点内部/机架内部这样的不同层次的本地性资源的最长等待时间。默认都是3000毫秒。   基本上,如果你的任务数量较大和单个任务运行时间比较长的情况下,单个任务是否在数据本地运行,代价区别可能比较显著,如果数据本地性不理想,那么调大这些参数对于性能优化可能会有一定的好处。反之如果等待的代价超过带来的收益,那就不要考虑了。   特别值得注意的是:在处理应用刚启动后提交的第一批任务时,由于当作业调度模块开始工作时,处理任务的Executors可能还没有完全注册完毕,因此一部分的任务会被放置到No Prefer的队列中,这部分任务的优先级仅次于数据本地性满足Process级别的任务,从而被优先分配到非本地节点执行,如果的确没有Executors在对应的节点上运行,或者的确是No Prefer的任务(如shuffleRDD),这样做确实是比较优化的选择,但是这里的实际情况只是这部分Executors还没来得及注册上而已。这种情况下,即使加大本节中这几个参数的数值也没有帮助。针对这个情况,有一些已经完成的和正在进行中的PR通过例如动态调整No Prefer队列,监控节点注册比例等等方式试图来给出更加智能的解决方案。不过,你也可以根据自身集群的启动情况,通过在创建SparkContext之后,主动Sleep几秒的方式来简单的解决这个问题。  spark.locality.wait.process spark.locality.wait 本地进程级别的本地等待时间 spark.locality.wait.node spark.locality.wait 本地节点级别的本地等待时间 spark.locality.wait.rack spark.locality.wait 本地机架级别的本地等待时间 spark.scheduler.revive.interval 1000 复活重新获取资源的Task的最长时间间隔(毫秒),发生在Task因为本地资源不足而将资源分配给其他Task运行后进入等待时间,如果这个等待时间内重新获取足够的资源就继续计算   26.10 安全相关属性 属性名 默认值 含义 spark.authenticate False 是否启用内部身份验证 spark.authenticate.secret   设置组件之间进行身份验证的密钥。如果不是YARN上运行并且spark.authenticate为true时,需要设置密钥 spark.core.connection. auth.wait.timeout 30 进行身份认证的超时时间 spark.ui.filters   Spark web UI 要使用的以逗号分隔的筛选器名称列表。筛选器要符合javax servlet Filter标准,每个筛选器的参数可以通过设置java系统属性来指定: spark..params='param1=value1,param2=value2' 例如: -Dspark.ui.filters=com.test.filter1 -Dspark.com.test.filter1.params='param1=foo,param2=testing'  spark.ui.acls.enable False Spark webUI存取权限是否启用。如果启用,在用户浏览web界面的时候会检查用户是否有访问权限 spark.ui.view.acls   以逗号分隔Spark webUI访问用户的列表。默认情况下只有启动Spark job的用户才有访问权限   26.11 SPARKSTREAMING相关属性 属性名 默认值 含义 spark.streaming.blockInterval 200 Spark Streaming接收器将接收数据合并成数据块并存储在Spark里的时间间隔,毫秒。 这个参数用来设置Spark Streaming里 Stream Receiver生成Block的时间间隔,默认为200ms。具体的行为表现是具体的Receiver所接收的数据,每隔这里设定的时间间隔,就从Buffer中生成一个StreamBlock放进队列,等待进一步被存储到BlockManager中供后续计算过程使用。理论上来说,为了每个Streaming Batch 间隔里的数据是均匀的,这个时间间隔当然应该能被Batch的间隔时间长度所整除。总体来说,如果内存大小够用,Streaming的数据来得及处理,这个blockInterval时间间隔的影响不大,当然,如果数据Cache Level是Memory+Ser,即做了序列化处理,那么BlockInterval的大小会影响序列化后数据块的大小,对于Java 的GC的行为会有一些影响。 此外spark.streaming.blockQueueSize决定了在StreamBlock被存储到BlockMananger之前,队列中最多可以容纳多少个StreamBlock。默认为10,因为这个队列Poll的时间间隔是100ms,所以如果CPU不是特别繁忙的话,基本上应该没有问题。 spark.streaming.unpersist True 如果设置为true,强迫将SparkStreaming持久化的RDD数据从Spark内存中清理,同样的,SparkStreaming接收的原始输入数据也会自动被清理;如果设置为false,则允许原始输入数据和持久化的RDD数据可被外部的Streaming应用程序访问,因为这些数据不会自动清理 26.12 STANDALONE模式特有属性 可以在文件conf/spark-env.sh中来设置此模式的特有相关属性: 1. SPARK_MASTER_OPTS:配置master使用的属性 2. SPARK_WORKER_OPTS:配置worker使用的属性 3. SPARK_DAEMON_JAVA_OPTS:配置master和work都使用的属性 配置的时候,使用类似的语句: export SPARK_MASTER_OPTS="-Dx1=y1 -Dx2=y2" 其中x代表属性,y代表属性值。 SPARK_MASTER_OPTS所支持的属性有: 属性名 默认值 含义 spark.deploy.spreadOut True Standalone集群管理器是否自由选择节点还是固定到尽可能少的节点,前者会有更好的数据本地性,后者对于计算密集型工作负载更有效 spark.worker.timeout 60 master因为没有收到心跳信息而认为worker丢失的时间(秒) spark.deploy.defaultCores   如果没有设置spark.cores.max,该参数设置Standalone集群分配给应用程序的最大内核数,如果不设置,应用程序获取所有的有效内核(默认为Int.Max,也就是不限制的意思)。注意在一个共享的集群中,设置一个低值防止攫取了所有的内核,影响他人的使用。   SPARK_WORKER_OPTS所支持的属性有 属性名 默认值 含义 spark.worker.cleanup.enabled False 是否定期清理worker的应用程序工作目录,只适用于Standalone模式,清理的时候将无视应用程序是否在运行  spark.worker.cleanup.interval 1800 清理worker本地过期的应用程序工作目录的时间间隔(秒) spark.worker.cleanup.appDataTtl  7*24*3600 worker保留应用程序工作目录的有效时间。该时间由磁盘空间、应用程序日志、应用程序的jar包以及应用程序的提交频率来设定   SPARK_DAEMON_JAVA_OPTS所支持的属性有: 属性名 含义 spark.deploy.recoveryMode 下面3个参数是用于配置zookeeper模式的master HA。设置为ZOOKEEPER表示启用master备用恢复模式,默认为NONE spark.deploy.zookeeper.url zookeeper集群URL  spark.deploy.zookeeper.dir zooKeeper保存恢复状态的目录,缺省为/spark spark.deploy.recoveryMode 设成FILESYSTEM启用master单节点恢复模式,缺省值为NONE spark.deploy.recoveryDirectory Spark保存恢复状态的目录   26.13 SPARK ON YARN特有属性 属性名 默认值 含义 spark.yarn.applicationMaster.waitTries 10 RM等待Spark AppMaster启动重试次数,也就是SparkContext初始化次数。超过这个数值,启动失败 spark.yarn.submit.file.replication 3 应用程序上传到HDFS的文件的副本数 spark.yarn.preserve.staging.files False 若为true,在job结束后,将stage相关的文件保留而不是删除 spark.yarn.scheduler.heartbeat.interval-ms 5000 Spark AppMaster发送心跳信息给YARN RM的时间间隔 spark.yarn.max.executor.failures 2倍于executor数 导致应用程序宣告失败的最大executor失败次数 spark.yarn.historyServer.address   Spark history server的地址(不要加http://)。这个地址会在Spark应用程序完成后提交给YARN RM,然后RM将信息从RM UI写到 history server UI上。   26.14 配置示例 26.14.1 SPARK-ENV.SH文件 export JAVA_HOME="/export/servers/jdk1.6.0_25"     #yarn    export HADOOP_HOME=/home/hadoop/hadoop-2.0.0-cdh4.5.0    export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop    SPARK_EXECUTOR_INSTANCES=2    SPARK_EXECUTOR_CORES=1    SPARK_EXECUTOR_MEMORY=400M    SPARK_DRIVER_MEMORY=400M    SPARK_YARN_APP_NAME="Spark 1.0.0"      #alone    SPARK_MASTER_WEBUI_PORT=8090    SPARK_WORKER_MEMORY=400M    SPARK_WORKER_CORES=1    SPARK_WORKER_INSTANCES=2    #Master HA    export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=192.168.1.117:2181,192.168.1.118:2181,192.168.1.119:2181 -Dspark.deploy.zookeeper.dir=/spark"  26.14.2 SPARK-DEFAULTS.CONF文件 #history server    spark.eventLog.enabled  true    spark.eventLog.dir      hdfs://namespace/user/hadoop/sparklogs    spark.yarn.historyServer.address    spark113:18080    #shuffle    spark.shuffle.consolidateFiles true    #task    spark.task.cpus 1    spark.task.maxFailures 3    #scheduler type    spark.scheduler.mode FAIR    #security    park.authenticate true    spark.authenticate.secret hadoop    spark.core.connection.auth.wait.timeout 1500    spark.ui.acls.enable true    spark.ui.view.acls root,hadoop    #each executor used max memory    spark.executor.memory 400m    #spark on yarn    spark.yarn.applicationMaster.waitTries 5    spark.yarn.submit.file.replication 3    spark.yarn.preserve.staging.files false    spark.yarn.scheduler.heartbeat.interval-ms 5000    #park standalone and on mesos    spark.cores.max 4 27 目录及其相关信息分析 27.1 配置(环境变量)相关目录 spark.eventLog.dir file:///tmp/spark-events 保存日志相关信息的路径,可以是hdfs://开头的HDFS路径,也可以是file://开头的本地路径,都需要提前创建 spark.local.dir /tmp 用于保存map输出文件或者转储RDD(如RDD Cache,Shuffle,Spill等数据的位置)。 在Spark 1.0 及更高版本此属性会被环境变量 SPARK_LOCAL_DIRS (Standalone、Mesos) 或 LOCAL_DIRS (YARN) 代替。  spark.deploy.zookeeper.dir /spark zooKeeper保存恢复状态的目录 SPARK_WORK_DIR $SPARK_HOME/work 指定work目录,用以存放从HttpFileServer下载下来的第三方库依赖及Executor运行时生成的日志信息。 说明:临时目录由系统属性:"java.io.tmpdir" 指定,默认为/tmp。如果win下,对应C:\cygwin\tmp目录 。 27.1.1 SPARK_WORK_DIR 源码: package org.apache.spark class SparkEnv // Set the sparkFiles directory, used when downloading dependencies. In local mode, // this is a temporary directory; in distributed mode, this is the executor's current working // directory. val sparkFilesDir: String = if (isDriver) { Utils.createTempDir().getAbsolutePath } else { "." } 27.2 依赖文件、JAR包目录 27.2.1 HTTPFILESERVER目录 依赖的文件和jar包放置位置,默认为/tmp。 对应源码:HttpFileServer 依赖文件放到子目录/tmp/files 依赖jar包放到子目录/tmp/jars 27.2.2 运行时下载的依赖文件和JAR包存放目录 参考:SparkEnv类sparkFilesDir方法。 根据不同模式有: 1. Local mode:/tmp; 2. 其他:在当前工作目录下,对应SPARK_WORK_DIR目录。 27.3 运行时相关目录 守护进程(Spark组件:Master,Worker,HistoryServer?等)在启动后会在PID目录下生成保存该守护进程PID信息的文件。 查看脚本spark-daemon.sh: PID默认存放路径为/tmp。 27.4 目录性能分析 可以 针对Executor运行时使用的目录,即spark.local.dir(SPARK_LOCAL_DIRS或LOCAL_DIRS),用内存或SSD等存储介质进行优化。 各种Block管理的文件,文件名字可以查看源码: package org.apache.spark.storage …… sealed abstract class BlockId 27.5 目录FQA分析 对默认存放在/tmp目录的配置,需要特别注意,/tmp目录文件在系统重启或一定时间后会被删除,从而导致一些问题。如PID目录,删除后找不到PID文件,导致停止集群时,无法找到守护进程的pid而操作失败 —— 解决方法参考FQA章节。 28 调优   29 作业调度 30 安全 31 硬件配置 32 调试 32.1 SPARK-SHELL 程序调试 交互式本身可以认为是一种调试方式; 或根据Sparkshell脚本,调试对应的入口Object。 32.2 IDEA 程序调试 程序调试分本地调试和远程调试两种: 1. 本地调试时,直接在IDEA里调试对应的类即可(根据启动该类的脚本进行模拟,模拟内容包括启动脚本中的环境变量设置等); 2. 远程调试时,需要设置对应的Java启动选项OPTS(即JVM调试参数的设置),打开调试对象(要启动的类)对应的远程调试端口,然后在IDEA中,配置远程调试的设置,在设置中,指定远程对应的host和port,进行监听,设置完成后,启动该调试对象(根据调试对象启动脚本中对应的类,并尽可能模拟启动脚本,包括脚本中的环境变量设置等)的调试即可。 如何打开远程调试,即OPTS如何设置,参考远程调试章节。 说明:本文档里只写了一种方式,就是脚本启动时使用到的OPTS环境变量,实际上,可以直接在脚本的java执行代码前面,加上调试参数,这样的话,你启动时,不需要设置OPTS,而只需要在某个节点上修改启动脚本,在start-all.sh的时候,就不会影响全部节点,而只会影响修改了脚本的节点。如修改了Worker的启动脚本时,只有该节点上的Worker是打开了远程调试的。 32.2.1 调试环境搭建 IDEA调试环境需构建在Spark 安装包部署 + Spark源码的基础上。 即,在调试机器上进行带Spark源码的Spark 安装包部署。可以同时作为Spark客户端和Spark 源码环境。 32.2.2 远程调试 1. JVM调试参数配置 远程调试时,结合JDWP(Java Debug Wire Protocol)进行,在JVM进程启动前,添加相应的JAVA_OPTS,设置远程调试参数,比如: -Xdebug -server -Xrunjdwp:transport=dt_socket,address=7777,server=y,suspend=y 启动JVM进程(通常就是执行对应的启动脚本)后会停留在 Listening for transport dt_socket at address: 7777 ,IDE连接后才会继续。 2. 对于Spark 的守护进程的调试参数配置 Ø 可以在SPARK_DAEMON_JAVA_OPTS环境变量中统一添加远程调试参数,此时要确保要调试的守护进程不在同一台机器上。 Ø 可以在具体的守护进程特定的XXX_OPTS环境变量中设置。此时,如果调试的守护进程在同一台机器上,可以通过设置不同的调试端口进行区分。 其中环境变量参考spark-env.sh文件,有以下几种: Ø SPARK_MASTER_OPTS Ø SPARK_WORKER_OPTS Ø SPARK_HISTORY_OPTS 3. IDE调试参数配置 在IDE中,针对进程对应的类,设置调试参数。 Ø Eclipse:Debug Configuration...>Remote Java Application>New下创建一个远程调试的项目并设置相关参数;对应界面如下: Host为远程服务器的ip, Port为在启动脚本中设置的address的值。 如果选中“Allow termination of remote VM”,那么在本地结束调试,就会结束远程运行的java程序。 Ø IDEA: 32.2.3 DRIVER PROGRAM调试 注意:提交应用时,需要使用提交脚本SparkSubmit.sh,来封装不同集群管理+部署模式的应用程序提交,因此,在调试时,应该以该脚本对应的SparkSubmit Object作为入口点,进行应用程序的提交。 提交应用程序运行、调试入口点为SparkSubmit Object,模拟提交脚本SparkSubmit,提交应用程序的示例: 1. 设置SparkSubmit类的运行、调试配置,如Windows下: --class org.apache.spark.examples.SparkPi --master local  E: \\bd\\spark-1.1.0-hadoop-2.2.0.bin\\lib\\spark-examples-1.1.0-hadoop2.2.0.jar 10 2. 运行、调试 SparkSubmit Object 待补充:如何模拟SparkSubmit提交脚本中的执行代码,如环境变量设置等。 根据部署模式(deploy mode)进行调试 32.2.3.1 CLIENT部署模式的调试 当使用Client部署模式进行提交时,Driver在提交点运行,因此不需要使用远程调试的方式。即从入口点出发,使用普通的本地调试方式即可。 32.2.3.2 CLUSTER部署模式的远程调试 Cluster与Client部署模式的主要差异在于: 1. Cluster 部署模式下,Driver 的运行点是在集群的Worker 上,对应DriverWrapper。 2. Cluster部署模式下,Master会对应增加对 Driver的调度。 如果调试的目的与这些差异无关,那么,直接在Client部署模式上提交就可以了 —— 简单,直至不能再简单! Driver驱动程序对应的JAVA_OPTS设置: 1. 命令选项:--driver-java-options 方法1步骤: 1. 使用提交脚本,部署应用程序时,设置DriverWrapper对应的JAVA_OPTS,打开远程调试端口; 2. 启动入口点SparkSubmit Object应用程序的调试。 难点:需要动态获取driver所在的host,可能需要从日志中获取。 方法2步骤 — 待确认: 1. 指定driver所在worker的host ip; 2. 然后启动入口点SparkSubmit Object应用程序的远程调试; 由于事先已经指定host,因此不需要动态获取。 32.2.4 MASTER组件远程调试 步骤: 1. 配置调试参数: Ø 静态方法:直接在master启动脚本中添加JAVA_OPTS,打开远程调试端口; Ø 动态方法:在SPARK_MASTER_OPTS中添加; 2. 在IDE中启动Master类的远程调试; 32.2.5 WORKER组件远程调试 步骤: 1. 配置调试参数:为简化调试,可以只针对某个节点上的Worker 守护进程。 Ø 静态方法:直接在Worker启动脚本上添加JAVA_OPTS,打开远程调试端口; Ø 动态方法:在SPARK_WORKER_OPTS中添加; 2. 在IDE中启动Worker类的远程调试; 32.2.6 HISTORY SERVER进程的远程调试 步骤: 1. 配置调试参数: Ø 静态方法:直接在启动脚本中添加JAVA_OPTS,打开远程调试端口; Ø 动态方法:在SPARK_HISTORY_OPTS中添加; 2. 在IDE中启动HistoryServer类的远程调试; 32.2.7 EXECUTOR进程的远程调试 待确认: Executor进程是在Worker中,由ExecutorRunner对象启动,需要确认下是否支持设置JAVA_OPTS,设置调试参数,打开远程调试端口。 进一步:如果木有直接提供可配置参数,可以试试在代码中添加,然后重新编译执行。 相关配置变量 —— 未试: --conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" 在spark.executor.extraJavaOptions中,设置调试参数。 简化: Executor进程中的任务执行,采用线程池模型执行task,因此使用单task可以简化调试 —— 即,通过分区数控制对应的task数。 32.3 调试示例 32.3.1 WIN下IDEA中提交任务 用户环境Windows 7 准备工作: 1. Ssh设置:,将集群的ssh公钥信息拷贝到当前IDEA执行的用户目录下.ssh中的相应文件中,本机为github_rsa.pub。 2. 将本机github_rsa.pub 原内容拷贝到集群中各个节点的相应用户的.ssh目录下的authorized_keys文件中。 如果没有以上ssh设置的准备工作,在idea中运行时,连接akka的actor会报错。 调试测试环境: 1. IDEA 14.0.1 2. 用户应用程序Project – TestSpark 3. 依赖:将spark-1.1.0(含编译结果dist目录)的\\dist_first\\lib\\内所有jar包导入,设置为工程的依赖包,同时将spark-1.1.0设置为每个jar包的源代码。 4. Ctrl+N,搜寻jar包对应源码的SparkSubmit对象,编译调试设置,如下图所示: 5. 在SparkSubmit或应用程序类的源码中设置断点; 6. 基于SparkSubmit对象,提交任务 为SparkSubmit提供的参数为:--class org.apache.spark.examples.SparkPi --master spark://192.168.70.214:7077 E:\\bd\\spark-1.1.0\\dist_first\\lib\\spark-examples-1.1.0-hadoop2.2.0.jar 10 32.3.2 远程SPARKSUBMIT方式提交调试 实际环境部署:spark-1.1.0-hadoop-2.2.0.bin 调试测试环境: 1. IDEA 14.0.1 2. SparkSubmit运行环境:spark-1.2.0 3. 基于SparkSubmit对象,提交任务 为SparkSubmit提供的参数为:--class org.apache.spark.examples.SparkPi --master spark://192.168.70.214:7077 E:\\bd\\spark-1.1.0\\dist_first\\lib\\spark-examples-1.1.0-hadoop2.2.0.jar 10 问题分析:由于版本不同导致BlockManagerId反序列化 serialVersionUID的错误,因此在运行时会报错。 解决方法,运行时 错误信息:ERROR Remoting: org.apache.spark.storage.BlockManagerId; local class incompatible: stream classdesc serialVersionUID = -7366074099953117729, local class serialVersionUID = 2439208141545036836 java.io.InvalidClassException: org.apache.spark.storage.BlockManagerId; local class incompatible: stream classdesc serialVersionUID = -7366074099953117729, local class serialVersionUID = 2439208141545036836 32.3.3 详细调试案例 http://www.cnblogs.com/yuananyun/p/4265706.html 33 SPARK性能优化 33.1 10大问题及其解决方案 问题1:reduce task数目不合适 解决方式: 需根据实际情况调节默认配置,调整方式是修改参数spark.default.parallelism。通常,reduce数目设置为core数目的2到3倍。数量太大,造成很多小任务,增加启动任务的开销;数目太少,任务运行缓慢。 问题2:shuffle磁盘IO时间长 解决方式: 设置spark.local.dir为多个磁盘,并设置磁盘为IO速度快的磁盘,通过增加IO来优化shuffle性能; 问题3:map|reduce数量大,造成shuffle小文件数目多 解决方式: 默认情况下shuffle文件数目为map tasks * reduce tasks 通过设置spark.shuffle.consolidateFiles为true,来合并shuffle中间文件,此时文件数为reduce tasks数目; 问题4:序列化时间长、结果大 解决方式: Spark默认使.用JDK.自带的ObjectOutputStream,这种方式产生的结果大、CPU处理时间长,可以通过设置spark.serializer为org.apache.spark.serializer.KryoSerializer。 另外如果结果已经很大,可以使用广播变量; 问题5:单条记录消耗大 解决方式: 使用mapPartition替换map,mapPartition是对每个Partition进行计算,而map是对partition中的每条记录进行计算; 问题6 : collect输出大量结果时速度慢 解决方式: collect源码中是把所有的结果以一个Array的方式放在内存中,可以直接输出到分布式?文件系统,然后查看文件系统中的内容; 问题7: 任务执行速度倾斜 解决方式: 如果是数据倾斜,一般是partition key取的不好,可以考虑其它的并行处理方式 ,并在中间加上aggregation操作; 如果是Worker倾斜,例如在某些worker上的executor执行缓慢,可以通过设置spark.speculation=true 把那些持续慢的节点去掉; 问题9: 通过多步骤的RDD操作后有很多空任务或者小任务产生 解决方式: 使用coalesce或repartition去减少RDD中partition数量; 问题10:Spark Streaming吞吐量不高 解决方式: 可以设置spark.streaming.concurrentJobs 33.2 SHUFFLE优化 1. 对两个RDD进行Shuffle操作时,可以先对这两个RDD采用相同的分片方式,保证这两个RDD的Shuffle操作不需要通信(因为相同方式分区后的数据会 在相同的机器上)。 相同的分片方式可以采用定制的Partitioner类来实现。 对两个RDD的这种优化可以通过在定义RDD时调用partitionBy来实现: RDD.partitionBy(myPartFunc) 这种迭代间的一致性划分策略是一些特定框架的主要优化方法。 具体参考《大型集群上的快速和通用数据处理架构(修正版).pdf》:P38。 33.3 CHECKPOINT优化 虽然lineage可用于错误后RDD的恢复,但对于很长的lineage的RDD来说,这样的恢复耗时较长。由此,将某些RDD进行检查点操作(Checkpoint)保存到稳定存储上,是有帮助的。 通常情况下,对于包含宽依赖的长血统的RDD设置检查点操作是非常有用的,比如PageRank例子 (§2.3.2)中的排名数据集; 相反,对于那些窄依赖于稳定存储上数据的RDD来说,对其进行检查点操作就不是有必要的。这样的RDD如logistic回归的例子(§2.3.2)和PageRank中的链接列表。 34 技巧、FAQS 34.1 提交应用失败 1. TaskSchedulerImpl 调度失败 报错信息: TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory 原因分析:1. worker注册失败,导致资源不够用; 2. 现有资源(core和memory)不满足提交时的资源申请要求; 根本解决方案: Ø 查看worker节点日志:找出注册失败原因。 Ø 熟悉Master对 Application的调度机制,一方面core,另一方面memory(必须满足可用的条件)。 2. Master URL的设置 在conf里设置master url时,必须和界面上显示的一样,都使用主机名或都使用ip。对应的值在spark-default.conf中的。。。。 34.2 SCALA版本不一致导致的资源获取失败 提供者:Spark-暴走蜗牛、北京-Spark-Ziv TaskSchedulerImpl报错找不到可执行资源 解决办法:更换scala版本,在执行环境把scala从2.11.6降到2.11.5,问题就不存在了。 环境信息: 版本:spark1.2.0 with hadoop 2.5.0 界面:worker显示正常 提交:默认资源提交,提交资源信息可以参看前面截图中的分配日志。 34.2.1 集群关闭失败 错误提示信息: 原因分析: 可能是因为找不到Worker与Master节点对应的pid文件,导致这种错误。 解决方法: 查看pid文件是不是放在tmp目录下,由于linux系统会定时(或重启时)删除tmp目录,所以导致pid文件缺失。 查看脚本spark-daemon.sh: PID默认存放路径为/tmp 34.3 页面相关 1. 在界面跳转时,找不到页面的话,查看下本地的hosts中是否有该主机名的映射 34.4 应用程序管理 34.4.1 如何终止应用程序 根据提交方式: 1. Client 部署模式提交时 Ø 通用方法 在提交点直接使用kill命令杀掉名为SparkSubmit进程 ps -aux |grep SparkSubmit kill -9 $pid // pid 为ps后的进程ID值 2. Cluster部署模式提交时: Ø Standalone 集群 在UI的Stage界面? 点击kill(需设置相应配置属性),杀掉应用程序; 在UI界面获取Driver所在Worker节点,通过kill命令,杀掉名为DriverWrapper的进程,命令参考前面; 使用…deploy.Client类支持的--kill参数,利用spark的脚本 ./spark-class,通过akka消息发送,杀掉driver。 Ø Yarn集群 使用Yarn自带的job管理命令,杀掉进程:   Hadoop job –list : 查询job列表 hadoop job -kill  jobid:kill指定id的job 34.5 EXITED CODE 34.5.1 53 提问:北京-spark-小唐spark-shell进入后一直出现这个日志,这是咋回事,哪位大牛解释下。 解答:南京-东 2014/12/18 15:25:50  这是源码上的提示,创建一个本地的临时目录失败,你在本地节点上查看下,一个是权限问题,一个是目录结构问题。跟其他节点比较下 原因:北京-spark-小唐 —— 磁盘空间不足 34.6 群问题记录 34.6.1 SPARK亚太研究院3群 - 安留军 1. 问题 mkdir: Permission denied: user=spark, access=WRITE, inode="/":hdfs:supergroup:drwxr-xr-x 14/12/28 17:46:13 WARN fs.FileSystem: "192.168.192.114" is a deprecated filesystem name. Use "hdfs://192.168.192.114/" instead. 14/12/28 17:46:14 WARN fs.FileSystem: "192.168.192.114" is a deprecated filesystem name. Use "hdfs://192.168.192.114/" instead. put: `/user/spark/share/lib/spark-assembly.jar': No such file or directory 增加权限(不能修复) 看到回复下我。 这一步解决不了。还是报这个错误 sudo -u hdfs hadoop fs -mkdir -p /user sudo -u hdfs hadoop fs -chown spark:spark /user 34.7 SPARK STREAMING 34.7.1 ALLOWMULTIPLECONTEXTS 属性配置 1. 错误提示 2. 运行描述: 这是运行spark streaming之后的情况,怎么设置set spark.driver.allowMultipleContexts = true 3. 版本: spark-1.2.0-bin-hadoop1 34.8 空间不足的问题 错误提示:No Space Left on the device Spark计算时会将中间结果存储到/tmp目录,目前linux又都支持tmpfs,就是将/tmp目录挂载到内存中。 当中间结果过多导致/tmp目录写满时,会出现上述问题; 解决方法: 1. 修改默认的/tmp目录 —— 对应local的dir配置,参考spark.local.dir。 2. 修改挂载内存方式:针对tmp目录不启用tmpfs—— 对应修改/etc/fstab 34.9 文件句柄、进程数限制问题 错误提示:java.lang.OutOfMemory, unable to create new native thread 有一种情况并非真的是内存不足引起的,而是由于超出了允许的最大文件句柄数或最大进程数。排查的步骤就是查看一下允许打开的文件句柄数和最大进程数,如果数值过低,使用ulimit将其调高之后,再试试问题是否已经解决。 查询最大文件句柄、进程数:ulimit -a 修改允许打开的最大进程数 :ulimit -u 65535 修改允许打开的文件句柄 :ulimit -n 65535 34.10 运行时报错 1. 报错信息与Scala有关,如类型不能转换,方法找不到等等,先检查下,编译对象依赖的Scala版本,有以下两种可能: a) 运行环境的scala版本与编译时使用的版本不一致 b) 引进的依赖包,如scalatest等,编译时的scala版本与运行时的版本不一致。 错误信息如: 1. java.lang.ClassCastException 2. Exception in thread "main" java.lang.NoSuchMethodError: scala.collection.immutable.HashSet$.empty()Lscala/collection/immutable/HashSet 根本原因:scala是向前兼容,却不支持向后兼容 参考:http://www.scala-lang.org/node/11466 34.11 文件编码错误 Txt文件的编码:在不同情况下,会添加UTF-8等格式对应的文件头标识信息。 故,仅在读取txt文件的第一行时,出现异常数据,影响分析结果。 34.12 单节点多应用启动失败 单节点多应用:指在单个节点上,运行了多个应用程序(driver)—— 包含client或cluster模式。 1. 问题:在同一个节点启动多个应用 时,报java.net.BindException:地址已在使用,报错信息如下: 2. 原因:driver会启动一个jetty server,此时会绑定一个driver上的默认端口(4040),如果driver所在节点已经启动了一个应用的话,则该端口绑定会失败。 3. 解决方法:查看代码 – SparkUI.scala def getUIPort(conf: SparkConf): Int = { conf.getInt("spark.ui.port", SparkUI.DEFAULT_PORT) } 根据修改对应配置即可,修改方法:通过在提交时,添加:--conf "spark.ui.port"=4041 即可。通过指定配置文件等方式也可以修改。 如果port的值为0,系统就会随机选取一个端口号 —— 参考:java.net. ServerSocket :ServerSocket对象的绑定端口为0,getLocalPort方法返回一个随机的端口(这类端口被称为匿名端口)。 34.13 编译问题 34.13.1 编译应用代码报对象已定义错误 提供者:北京-hadoop-happy 1478017615 错误界面: 解决分析: 从重复加载进行考虑,查看依赖的jar包是否重复有10和11两个Scala版本。 34.13.2 SBT编译工程时报OOM错误 提供者:北京-hadoop-happy 1478017615 环境:jdk 1.8 问题1:sbt的工程,添加上依赖spark-assembly-1.1.2-SNAPSHOT-hadoop1.0.4后,每次打包编译都报错out of memory 解决方法 —— FileàProject Settings ,修改Source处的Language Level 为8 34.13.3 SCALA.REFLECT.INTERNAL.MISSINGREQUIREMENTERROR https://github.com/deanwampler/spark-workshop/blob/master/project/Build.scala 34.13.4 SCALASTYLE检查失败导致编译失败 以maven方式编译为例: 提供者:AndrewHsu 错误描述:编译时,如果加入profile: -Pspark-ganglia-lgpl,或修改Hive版本为1.0.0或1.1.0等,都会导致编译报错,错误提示如下: 解决方法: 对应Bug的地址:https://issues.apache.org/jira/browse/SPARK-6532 对应pom.xml中的相关插件: 在outputEncoding后面添加以下元素: UTF-8  34.14 IDEA 34.15 GIT工具相关 34.15.1 IDEA找不到 GIT.EXE 使用:官网 github 工具。 打开IDEA,有如下错误提示: 1. 环境变量设置 在PATH中添加github安装目录的特定子目录下的bin目录到PATH中,如:安装目录\GitHub\PortableGit_....\bin。设置后打开cmd命令行: 2. 可能遇到的问题 官网安装的 Github中 bin 所在目录会修改,如下: C:\Users\lenovo\AppData\Local\GitHub\PortableGit_ed44d00daa128db527396557813e7b68709ed0e2 C:\Users\lenovo\AppData\Local\GitHub\PortableGit_c2ba306e536fdf878271f7fe636a147ff37326ad 导致IDEA找不到 git.exe 可能原因:在github界面工具上进行了用户切换操作导致 —— 未测试确认 34.16 自动测试代码相关 34.17 TACHYON相关 1. 连接问题 提供者:上海-ml空林玄一 2. 解决方法: 先使用官网runTest,测试Tachyon环境搭建是否有问题。 如果有问题,则重新根据官网信息查询Tachyon环境配置等信息。 34.18 HADOOP相关 On yarn方式提交后运行过程报错: 分析:Token过期,系统时间不同步导致。 34.19 心跳超时错误,错误码143 On yarn 方式启动应用,执行过程中报错 应用:FP算法 分析:BlockManager间心跳通信超时,移除,报executor丢失,远程akka客户端断连。任务重试结果一样。 环境: 1. 局域网网速: 解决方法:修改心跳时间 —— 临时解决方法 —— 未测试。 配置属性:参考源码 类:BlockManagerMasterActor spark.storage.blockManagerSlaveTimeoutMs 扩展: 进一步跟着Executor处的心跳消息 错误码143: spark自身框架终止应用? 对应信息WARN? 界面kill掉stage后的返回错误码143. 1. UI界面: 2. Driver: 34.20 ON YARN 模式,错误码15 Exit code: 15 Stack trace: ExitCodeException exitCode=15: 一般是内部出现错误,比如访问权限等导致? 以上错误是在使用了另一个集群的hdfs时,host错误导致的15错误码。 这是使用文件不存在时的错误信息(集群local文件需要各个节点相同路径下都有该文件),返回错误码也是15。 34.21 环境问题 34.21.1 JDK版本问题 1. 提供者:Spark-暴走蜗牛 2. 问题截图: 3. 对应的jdk版本: 4. 测试: 直接使用:java -Xms2G -Xmx4G HelloWorld 也一样报错 5. 解决方法:修改jdk的版本 改为 jdk-7u71-linux-x64.tar.gz ,测试通过。 35 实战记录 35.1 RDD操作性能类 35.1.1 RDD窄依赖的PIPELINE分析 Pipeline针对的是父子RDD分区数据处理,从源码角度上分析,其流程为: 父RDD分区的compute(……) à 子RDD分区的compute (……) 分析过程: 在compute过程中,会iterator各分区的记录, 即,每个compute过程都会对分区数据进行iterator,因此pipeline应该是针对分区数据的pipeline,而不是更彻底的对分区记录数据的pipeline。 如Filter + Mapper 测试记录待补充。 35.1.2 GROUPBYKEY的性能分析 提供者:Spark亚太研究院 —— 厦门-spark-小智 实践结论:groupByKey操作的效率很低,慎用。 原因分析:groupByKey的mapSideCombine为false,就是在map端不做合并。 Harli分析扩展 —— 与Hadoop比较: 1. 从Shuffle过程的源码开始分析,mapSideCombine参数在map端的使用部分。 2. 比对Hadoop类似代码:MapReduce编程模型中的Combiner组件。          每一个map都可能会产生大量的本地输出,Combiner的作用就是对map端的输出先做一次合并,以减少在map和reduce节点之间的数据传输量,以提高网络IO性能,是MapReduce的一种优化手段之一。  (蓝色部分参考—— http://flyingdutchman.iteye.com/blog/1880277) Combiner需要手动去设置,实现优化目的,通常使用的方法和reducer一样。 1. 对应在spark中,如reduceByKey转换中,已经通过设置mapSideCombine及其对应的aggregate函数实现与Hadoop相同的优化方法。 2. 对应在spark的groupByKey中,只进行group,没有reducer部分,所以不存在map端的这部分优化,因此会增加数据IO消耗。 35.1.3 JOIN的性能优化分析 转载董西成老师的博客: http://dongxicheng.org/framework-on-yarn/apache-spark-join-two-tables/ 在大数据处理场景中,多表Join是非常常见的一类运算。为了便于求解,通常会将多表join问题转为多个两表连接问题。两表Join的实现算法非常多,一般我们会根据两表的数据特点选取不同的join算法,其中,最常用的两个算法是map-side join和reduce-side join。本文将介绍如何在apache spark中实现这两种算法。 (1)Map-side Join Map-side Join使用场景是一个大表和一个小表的连接操作,其中,“小表”是指文件足够小,可以加载到内存中。该算法可以将join算子执行在Map端,无需经历shuffle和reduce等阶段,因此效率非常高。 在Hadoop MapReduce中, map-side join是借助DistributedCache实现的。DistributedCache可以帮我们将小文件分发到各个节点的Task工作目录下,这样,我们只需在程序中将文件加载到内存中(比如保存到Map数据结构中),然后借助Mapper的迭代机制,遍历另一个大表中的每一条记录,并查找是否在小表中,如果在则输出,否则跳过。 在Apache Spark中,同样存在类似于DistributedCache的功能,称为“广播变量”(Broadcast variable)。其实现原理与DistributedCache非常类似,但提供了更多的数据/文件广播算法,包括高效的P2P算法,该算法在节点数目非常多的场景下,效率远远好于DistributedCache这种基于HDFS共享存储的方式,具体比较可参考“Performance and Scalability of Broadcast in Spark”。使用MapReduce DistributedCache 时,用户需要显示地使用File API编写程序从本地读取小表数据,而Spark则不用,它借助Scala语言强大的函数闭包特性,可以隐藏数据/文件广播过程,让用户编写程序更加简单。 假设两个文件,一小一大,且格式类似为: Key,value,value Key,value,value 则利用Spark实现map-side的算法如下: var table1 = sc.textFile(args(1)) var table2 = sc.textFile(args(2)) // table1 is smaller, so broadcast it as a map var pairs = table1.map { x => var pos = x.indexOf(',') (x.substring(0, pos), x.substring(pos + 1)) }.collectAsMap var broadCastMap = sc.broadcast(pairs) //save table1 as map, and broadcast it // table2 join table1 in map side var result = table2.map { x => var pos = x.indexOf(',') (x.substring(0, pos), x.substring(pos + 1)) }.mapPartitions({ iter => var m = broadCastMap.value for { (key, value) <- iter if(m.contains(key)) } yield (key, (value, m.get(key).getOrElse(""))) }) result.saveAsTextFile(args(3)) //save result to local file or HDFS (2)Reduce-side Join 当两个文件/目录中的数据非常大,难以将某一个存放到内存中时,Reduce-side Join是一种解决思路。该算法需要通过Map和Reduce两个阶段完成,在Map阶段,将key相同的记录划分给同一个Reduce Task(需标记每条记录的来源,便于在Reduce阶段合并),在Reduce阶段,对key相同的进行合并。 Spark提供了Join算子,可以直接通过该算子实现reduce-side join,但要求RDD中的记录必须是pair,即RDD[KEY, VALUE],同样前一个例利用Reduce-side join实现如下: var table1 = sc.textFile(args(1)) var table2 = sc.textFile(args(2)) var pairs = table1.map{x => var pos = x.indexOf(',') (x.substring(0, pos), x.substring(pos + 1)) } var result = table2.map{x => var pos = x.indexOf(',') (x.substring(0, pos), x.substring(pos + 1)) }.join(pairs) result.saveAsTextFile(args(3)) (3)总结 本文介绍了Spark中map-side join和reduce-side join的编程思路,希望对大家有借鉴意义。但需要注意的是,在使用这两种算法处理较大规模的数据时,通常需要对多个参数进行调优,否则可能会产生OOM问题。通常需要调优的相关参数包括,map端数据输出buffer大小,reduce端数据分组方法(基于map还是基于sort),等等。 (4)两个问题 问题1:如果在map-side join中,不使用以下语句对文件1进行广播, var broadCastMap = sc.broadcast(pairs) 也可以在后面程序中直接使用变量pairs存储的数据进行join,这两种方式有什么异同,性能会有何不同? 问题2:将map-side join中的以下语句: mapPartitions({ iter =>   var m = broadCastMap.value   for{     (key, value) <- iter     if(m.contains(key))   } yield (key, (value, m.get(key).getOrElse(""))) 改为: var m = broadCastMap.value //这一句放在var table2 = sc.textFile(args(2))后面 map {case (key, value) =>   if(m.contains(key)) (key, (value, m.get(key).getOrElse(""))) } 最终结果是有问题的,为什么? 本文两个示例程序可以从百度网盘上下载,地址为Spark-Join-Exmaple。 36 参考资料 36.1 SHARK Shark是基于Spark上的“Hive”,看看基于hadoop的Hive: 而Shark的结构图: Hive是记录每行记录一个对象,而shark是每列记录: 执行SQL时间对比: 36.2 SPARK的适用场景 1. Spark是基于内存的迭代计算框架,适用于需要多次操作特定数据集的应用场合。需要反复操作的次数越多,所需读取的数据量越大,受益越大,数据量小但是计算密集度较大的场合,受益就相对较小 2. 由于RDD的特性,Spark不适用那种异步细粒度更新状态的应用,例如web服务的存储或者是增量的web爬虫和索引。就是对于那种增量修改的应用模型不适合。 37 参考文献 [1] http://www.infoq.com/cn/articles/spark-core-rdd [2] http://www.csdn.net/article/2014-01-28/2818282-Spark-Streaming-big-data [3] http://blog.sina.com.cn/s/blog_6f505d7101010j03.html [4] http://database.51cto.com/art/201407/445881.htm [5] http://mmicky.blog.163.com/blog/static/1502901542014312101657612/ [6] http://mp.weixin.qq.com/s?__biz=MjM5NTc2MTg3Mw==&mid=201641685&idx=1&sn=1b75be3d774bb3f26b6714674dbefc64&scene=2&from=timeline&isappinstalled=0#rd Spark1.2新特性概述 [7] http://spark-config.readthedocs.org/en/latest/ [8] http://databricks.com/blog/2015/01/15/improved-driver-fault-tolerance-and-zero-data-loss-in-spark-streaming.html

下载文档到电脑,查找使用更方便

文档的实际排版效果,会与网站的显示效果略有不同!!

需要 10 金币 [ 分享文档获得金币 ] 2 人已下载

下载文档