田毅 - Spark 应用案例分析


Spark应用案例分析 About Me •  田毅 tianyi.asiainfo@gmail.com •  @亚信科技-田毅 •  Spark社区Contributor •  北京SparkMeetup的发起人 •  主要关注SparkSQL与Spark Streaming 目录 •  Spark的优势和收益 •  Spark与现有Hadoop生态的互操作性 •  Spark实践分享 •  使用Spark的建议 Spark的优势和收益 Spark的优势和收益 •  统一的数据处理架构 – 混合使用多种计算类型操作 – 兼容大量数据源类型 •  降低开发复杂度 – 丰富的API – 可读性强 •  高性能的执行引擎 统一的数据处理架构 SparkSQL: 批处理、关系查询、交互式查询 SparkStreaming: 流式计算 MLlib: 以机器学习为代表的迭代型计算 GraphX: 图计算 多种数据处理场景的统一实现 SPARK-CORE SQL 分析计算 STREAMING 流计算 MLlib 机器学习 GraphX 图计算 User Application API层 核心层 应用层 统一的数据处理架构 通过RDD良好的扩展性, 可以方便的扩展Spark支持的数据源 目前数据源方面已经支持HDFS, JDBC, 数据格式支持JSON, Parquet等. Spark将在1.2以及以后的版本中提供统一的数据源API以支持用户自定义 数据源的扩展. 因此, 后续Spark将可以构建多数据源的统一数据处理平台 Spark HDFS JDBC HBase Cassan dra … 统一的数据处理架构 •  传统方式: –  复杂的批量数据处理: HDFS + MR + Hive –  基于历史数据的交互式查询: HDFS + Impala –  基于实时数据流的数据处理: Storm •  存在如下问题: –  数据交互难 –  资源争抢 –  人员技能分散 •  使用Spark带来的好处 –  人员组织简单 –  数据交互方便 –  资源统一调配 降低开发复杂度 •  Spark 统一计算模型 RDD+DAG – 对Spark用户提供丰富的API – 对Spark本身的运行机制更容易理解 •  代码量的大大减少 – Hadoop 2470k – Hive 925k – Impala 2320k – Spark 291k 降低开发复杂度 •  提升代码可读性 Spark: val lines = ssc.textFileStream(args(0)) val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) MapReduce: public static class TokenizerMapper extends Mapper{ private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } } …… 高性能的执行引擎 •  天下武功,唯快不破 •  和MR相比, Spark的性能优势主要体现在两 个地方: – 高效的事件触发机制+多线程的执行机制,使得 Task的执行非常高效, 启动时间达到亚秒级 – DAG 的编程范式大大的减少了数据分析计算过 程中的持久化落地动作 高性能的执行引擎 Spark颠覆MapReduce 保持的排序记录 Yahoo 2013年的记录 海量数据的SORT性能 10%的机器数量 12%的CPU核数 3倍集群速度 Spark与现有Hadoop生态的互操作性 Spark与Hadoop版本 •  Spark支持多个Hadoop版本的实现 •  众多Hadoop商业版本提供商也纷纷收录 Spark到最新的发行版本 – Cloudera, Hontonworks, MapR, Pivotal Spark与Yarn •  目前Spark提供了yarn-cluster和yarn-client两种 模式支持Spark程序在yarn上运行 •  实际应用中,比较常用: –  yarn-client模式:有交互需求的常驻应用 •  使用spark-shell来执行scala脚本 •  使用thrift-server来执行hql脚本 –  yarn-cluster模式:无交互需求的临时应用 •  使用spark-submit来执行spark应用程序 •  将Spark运行在Yarn上的好处 –  可以与MR良好的协调使用资源 –  可以通过Hadoop的安全机制运行在Kerberized cluster Spark与Hive •  Spark1.0之前,通过Shark实现Hive的大部分功 能 •  从Spark 1.0开始, Spark增加了一个重要的组 件, Spark SQL •  Spark SQL目前主要通过HiveContext实现了: –  访问Hive仓库中所有数据的能力 –  大部分HQL语法的解析和执行 •  Hive on Spark(HIVE-7292) 目前正在开发之 中, 很快就可以成为hive的第三个执行引擎 (前两个是mr和tez) Spark实践分享 Spark实践分享 •  批量处理离线数据 •  多维数据分析查询 •  信令数据的实时处理 •  使用mllib预测用户行为 在企业数据仓库中使用Shark ODS层(明细数据) 原始数据(DB2, Oracle, CSV) DWD层(汇总数据) DW层(不同维度关联汇总) ST层(指标, 报表展示数据) Hadoop (Hive) RDBMS 模型分层 技术分层 在企业数据仓库中使用Shark •  问题: –  部分程序迁移到hive后, 执行 速度变慢 •  使用Shark的目的: 提速 –  小数据量的模型计算 –  步骤较多的模型计算 •  使用Shark的方式: –  以Standalone方式运行, 使用 主机的20%计算资源 –  MR使用主机80%计算资源 –  切换使用Hiveserver与 Sharkserver来实现不同的计算 方式 Applicaiton SharkServer HiveServer HDFS Spark MapReduce 在企业数据仓库中使用Shark 说明: hive使用80%资源 Shark使用20%资源 提速效果根据不同应用特点 不尽相同 相对流程较复杂的应用优化 效果更好 如果部分临时表使用内存表 的方式效果更佳 TCL名 Hive Shark 时间减少 dw_new_comp_town_ds.tcl 77 21 72.73% co_dtdsM_gd.tcl 35 9 74.29% co_dtdyM_new.tcl 26 18 30.77% nb_atvdt_td.tcl 32 12 62.50% pr_stcdr.tcl 31 8 74.19% ac_dwPrtAcctdM.tcl 92 20 78.26% dw_new_chuanka_user_town_ds.tcl 95 22 76.84% ac_dwGrpPtdMnew.tcl 67 26 61.19% dw_new_chuanka_lt_town_ds.tcl 55 18 67.27% nb_dwmmsdM.tcl 34 13 61.76% nb_wland.tcl 33 13 60.61% cr_24houroutdM.tcl 24 16 33.33% 在企业数据仓库中使用Shark 存在的问题 [现象]: 某些SQL的运行长时间不结束,并且不断的有FullGC出现,最终出现OutOfMemory的现象任 务失败 [分析]: •  spark 在 agg 和 cogroup 时,通过spill机制, shuffle 数据可临时写入磁盘; •  shark 重写了CoGroupRDD类,使用hashMap进行数据存储,但不具备spill功能 [结论]: •  在处理大数据关联的场景时, 在同一个Group下数据量过大, shark无法支撑 •  解决办法之一, 选择shark与hive并存,通过yarn实现资源共享 •  解决办法之二, 增大Reduce Task的数量 SparkSQL目前也存在类似的问题 关键是针对不同场景下数据的分布规律, 无法用一种方法同时解决性能与稳定性这两 个问题 多维数据分析查询 市场部策划了一个营销活动,为了在有限的成本下提升营 销效果,怎样精确定位客户群,准确选择目标客户? 创建客户群,“潜 在终端营销用户”,进行营销 根据业务经验,筛选标签 多维数据分析查询 表名 周期 列数 SPARK_DATE 日周期 284 SPARK_MONTH 月周期 1748 最终这条查询时间平均使⽤了 2秒 Client Thri Server Spark Cluster 多维数据分析查询 问题->解决思路: 1 数据和task分布不均,跨节点访问造成部分task慢 尽量增加hdfs上的备份数,减少数据跨节点访问 2 Spark没有创建足够多的Task并发执行 增加HDFS文件block数, 增加文件数 3 磁盘IO耗时较长 使用RCFile格式+GZ压缩存储 4 并发查询较多时,查询耗时变长 需要增加ThriftServer的内存, 避免大量GC 5 序列化的CPU消耗较大 选用kyro序列化方式 结论: 从性能角度看,使用SparkSQL作为多维数据分析查询与搜索引擎相比不是最佳的实 践, 性能结果只能勉强可用 从系统整体角度看, 如果在整体业务中,这部分不是核心功能,使用SparkSQL也是一 个很好的选择 多维数据分析查询 0 100 200 300 400 500 600 700 sql1 sql2 sql3 sql4 sql5 sql6 sql7 sql8 sql9 sql10 sql11 sql12 sql13 sql14 sql15 sql16 sql17 sql18 sql19 sql20 sql21 sql22 sql23 30并发4节点 30并发8节点 扩展性能测试 多维数据分析查询 0 5 10 15 20 25 30 35 40 45 sql1 sql2 sql3 sql4 sql5 sql6 sql7 sql8 sql9 sql10 sql11 sql12 8节点1并发 8节点5并发 8节点10并发 8节点20并发 8节点30并发 并发性能测试 实现信令数据的实时处理 场景描述: •  输入数据 –  用户信令数据: 每秒钟5w条 •  业务需求 –  沉淀: 形成用户实时位置信息和行为轨迹 –  匹配: 合适的业务, 对用户进行主动营销 –  需要支持多业务的扩展 •  输出数据 –  用户实时位置信息 –  用户历史行为轨迹 –  需要进行业务营销的用户信息 实现信令数据的实时处理 外 围 系 统 消息队列 增量 数据 增量 数据 … 增量 数据 Spark Streaming HBase 关联数据 预处理 消息队列 业务流程 业务流程 增量 数据 Hbase每秒支持15W Req 每条记录需要交互查询4-8次 无法满足要求 实现信令数据的实时处理 外 围 系 统 消息队列 增量 数据 增量 数据 … 增量 数据 Spark Streaming 预处理 消息队列 HDFS 动态数据 只读数据 业务流程 业务流程 增量 数据 广 播 变 量 关 联 更 新 实现信令数据的实时处理 •  预处理流程业务: –  沉淀用户实时位置信息和行为轨迹 –  在流数据上增加用户历史位置信息 •  实现方式: val cogroup = leftRDD.cogroup(rightRDD).map { … … (lout, rout) } cogroup.cache() cogroup.flatMap(_._1)… cogroup.flatMap(_._2)… 流入数据 用户历史 输出数据 用户历史 更新 COGroup 实现信令数据的实时处理 •  子流程业务(举例): –  判断是否校园用户(根据校园基站列表) –  判断是否营销对象(根据用户资料表与营销规则表) –  对比更新营销结果表(避免重复营销) –  数据通过Kafka发给营销系统 •  实现方法: inputDStream.foreachRDD(rdd=>{ rdd. … . registerAsTable(”inputTable") val tempRS = sql(“xxxxx”) tempRS . … . registerAsTable(”tempTable") val result = sql(“xxxxx”) }) •  为什么使用SparkSQL –  这部分逻辑变化较快, 使用SQL便于维护人员修改 (会写scala的人太少了) 流入数据 关联表1 关联表2 临时数据 SQL SQL 输出数据 使用MLlib进行用户行为预测 广告竞价平台 流量过滤 活动匹配 数据处理平台 效果跟踪 收集日志 更新训练 集 训练 更新模型 效果评估 出价 AD Exchange 使用Mllib进行用户行为预测 业务背景: 通过mllib中的逻辑回归算法根据用户历史行为记录预测用户未来的行 为. 举例: 根据用户历史点击广告的记录,判断用户是否会点击指定的广告 实现: LogisticRegressionWithSGD算法 样本: 用户标签, 广告标签, 广告位标签 用户是否点击广告 性能: 阿里云 Yarn集群 25个Executor 2Core 2G内存 5W个维度 21907292条数据, 每次计算耗时243秒 使用Spark的建议 使用Spark的建议 SQL on Spark •  目前虽然有多个方案实现, Shark, SparkSQL, Hive on Spark 但是都不太具 备单独运行在生产系统中的能力 •  解决办法: – 如果业务不是很庞大(包含的SQL不是很多), 推 荐使用Spark的API组织一个Spark应用来实现业 务逻辑 – 使用Hive作为备选方案 使用Spark的建议 使用Spark进行流计算 •  适合场景: –  允许延迟在2-5秒及以上的业务 –  对吞吐量要求较高,且增长迅速的业务 •  Spark进行流计算的优势 –  可以直接写SQL处理数据 –  相对方便的使用Hive或hdfs中的数据 •  使用到外部数据常用方法 –  外部数据量远大于流数据时: •  使用Hbase, redis, memcache等外部系统进行逐条存储, 查询 –  外部数据不是很大,且需要读写操作 •  直接使用Spark读取并关联外部数据以批处理的方式进行查询和更新 –  外部数据较小, 且只读 •  使用广播变量的方式将外部数据提前广播到各个节点 使用Spark的建议 •  使用Spark要考虑到Scala, Akka等新元素 带来的学习成本 •  使用Spark建议先从最适合Spark特点的业 务开始尝试 •  当前Spark上的问题还很多, 有点类似 hadoop 0.20.x版本的阶段 •  使用Spark需要很好的和社区进行关注与互 动 使用Spark的建议 •  如果想了解更多的Spark实践分享, 请关注 Spark在各地的meetup小组活动 •  www.meetup.com •  目前已经有 北京, 上海, 杭州, 深圳四个地方 Q & A @InfoQ infoqchina
还剩40页未读

继续阅读

下载pdf到电脑,查找使用更方便

pdf的实际排版效果,会与网站的显示效果略有不同!!

需要 5 金币 [ 分享pdf获得金币 ] 2 人已下载

下载pdf

pdf贡献者

dfwm

贡献于2014-10-19

下载需要 5 金币 [金币充值 ]
亲,您也可以通过 分享原创pdf 来获得金币奖励!
下载pdf