Spark 介绍与应用案例分析


Spark技术研究与实践分享 About Me ⽥毅 tianyi.asiainfo@gmail.com @亚信科技 -⽥毅 Spark社区 Contributor 北京 SparkMeetup的发起⼈ 主要关注 SparkSQL与 Spark Streaming 提纲 ●  为什么选择Spark ●  Spark实践分享 ●  使用Spark的建议 为什么选择Spark ●  多种计算场景的结合 ●  多数据源的计算中心 ●  活跃的社区支持 多种计算场景的结合 SQL 批处理 Streaming 流处理 MLlib 机器学习 GraphX 图计算 Spark Core (SparkContext+RDD+DAG) User Application 用户应用 随着信息技术的发展 , 越来越多的企业⾯临着复杂计算场景的考验 1 机器学习的不断发展和应⽤ 2 信息时效性决定了流处理技术的重要性 3 传统业务⼈员操作熟练的 SQL编写能⼒ 多种计算场景的结合 假设场景:与新浪微博合作,通过一个消息队列实时接收微博信息,根据指定关 键字过滤消息 传统方案:使用Storm读取消息队列内容,设定Bolt进行关键字过滤 微 博 消 息 队 列 Storm 消 息 队 列 多种计算场景的结合 假设场景:与新浪微博合作,通过一个消息队列实时接收微博信息,根据指定关 键字过滤消息,再通过实时配置SQL对微博进行统计分析,生成实时报表 传统方案: 使用Storm读取消息队列内容,设定Bolt进行关键字过滤,将结果写入HDFS 使用Hive或者Impala实现SQL统计分析 微 博 消 息 队 列 Storm H D F S Hive/ Impala 多种计算场景的结合 假设场景:与新浪微博合作,通过一个消息队列实时接收微博信息,根据指定关键字过滤消息,通过机 器学习,对关键字不断进行调整,再通过实时配置SQL对微博进行统计分析,生成实时报表; 传统方案: 使用Storm读取消息队列内容,设定Bolt进行关键字过滤,将结果写入HDFS 使用Hive或者Impala实现SQL统计分析 使用Mahout实现机器学习算法,将训练后的算法模型回传给Storm 微 博 消 息 队 列 Storm H D F S Hive/ Impala Mahout 多种计算场景的结合 Spark方案: 优势: 1 同一套架构,学习成本较低 2 资源可统一规划 3 流计算与Machine Learning数据交互简单 微 博 消 息 队 列 Spark Streaming + MLlib 消 息 队 列 商业产品:Databricks Cloud 开源产品:zeppelin https://github.com/NFLabs/ zeppelin 多数据源的计算中心 对于大多数公司来说, 数据会根据应用场景被存储到多种数据源 以我们熟悉的电信行业举例: 但是, 这些数据单独应用只能满足企业内部若干独立的应用场景 想要真正的从数据中获得最大的价值, 必须让所有数据关联到一起进行计算分析 数据类型 举例 应用场景 存储方式 单据类数据 通信详单, 账单 随机查询 HBase, Cassandra 日志类数据 信令数据, 应用 日志 汇总分析 HDFS 关系类数据 用户资料, 订购 关系 实时更新, 关联 查询 RDBMS 多数据源的计算中心 HDFS HBASE RDBMS MR ETL 计算引擎 复杂的数据同步流程 极⼤消耗⺴络带宽和存储资源 多数据源的计算中心 Spark 1.1.0 通过扩展RDD实现外部数据访问 HDFS HBASE RDBMS Spark HadoopFileRDD 使⽤ RDD扩展存在的问题 : 只能全量获取 , ⺴络压⼒⼤ ,⽆⽤传输太多 引⽤ Cheng Lian@Databricks在 Meetup上的 slide 多数据源的计算中心 BaseRelation PrunedScan TableScan PrunedFilteredScan Spark 1.2.0 – External Datasource API 全量扫描 指定列扫描 根据 Filter指定列扫描 (Parquet and ORC) 尽可能将列过滤与⾏过滤在 Server端进⾏ , 降低传输 ⼤⼩ 可以更好的利⽤ 数据源的特性 引⽤ Cheng Lian@Databricks在 Meetup上的 slide 多数据源的计算中心 SPARK Hbase user_bill 查询所有开通 GPRS⽤户中 ,各项增值业务的⽤ 户数 ,平均年龄 ,总花费 select t2.businame, count(t2.user), avg(age), sum(t2.fee) from ( select businame, user, sum(fee) as fee from user_bill where busitype in ('sms','gprs') group by businame, user ) t1 left outer join ( select user, age from user_info where user_usegprs=1 ) t2 on t1.user = t2.user where t2.user is not null group by t2.businame DB2 user_info DB2Relation requiredColumns=(user,age) filter=(“user_usegprs=1”) SQL ResultSet RDD HBaseRelation RDD RowSet Filter requiredColumns filter 多数据源的计算中心 企业级数据计算中心 SPARK HBase RMDBS Json Parquet Cassandra HDFS 活跃的社区支持 2013年6月 2014年6月 2014年12月 Contributor数 量 68 255 368 参与贡献的公 司 17 50 未统计 代码行数 63000 175000 239000 活跃的社区支持 活跃的邮件列表: user@spark.apache.org dev@spark.apache.org 日均70+的邮件 JIRA问题收集: 日平均报告/解决 issue 15个 Github Pull Request: 日平均merge pull request 10个 活跃的社区支持 11月刚刚推出的模块维护人制度, 确保每个模块都有至少2个committer专门跟踪维护 - Spark core public API: Matei, Patrick, Reynold - Job scheduler: Matei, Kay, Patrick - Shuffle and network: Reynold, Aaron, Matei - Block manager: Reynold, Aaron - YARN: Tom, Andrew Or - Python: Josh, Matei - MLlib: Xiangrui, Matei - SQL: Michael, Reynold - Streaming: TD, Matei - GraphX: Ankur, Joey, Reynold Spark实践分享 ●  使用Spark实现信令数据的实时营销 ●  使用Spark实现广告竞价效果实时反馈平台 使用Spark实现信令数据的实时营销 场景描述: 输入数据 ●  用户信令数据: 每秒钟5w条 业务需求 ●  沉淀: 形成用户实时位置信息和行为轨迹 ●  匹配: 合适的业务, 对用户进行主动营销 ●  需要支持多业务的扩展 输出数据 ●  用户实时位置信息 ●  用户历史行为轨迹 ●  需要进行业务营销的用户信息 难点 ●  大量的数据查询更新 业务架构 消 息 队 列 主流程 消 息 队 列 业务流程1 业务流程2 业务流程3 数据清洗 数据增强 数据筛选 业务判断 数据筛选 业务判断 数据筛选 业务判断 数据沉淀 技术架构1.0 外围 系统 消息队列 增量 数据 增量 数据 … 增量 数据 Spark Streaming HBase 关联数据 预处理 消息队列 业务流程 业务流程 增量 数据 HDFS 关联数 据1 关联数 据2 关联数 据n 输出 数据 输出 数据 输出 数据 技术架构1.0实践 优化: ●  Spark优化 ○  Kafka接收数据优化:多Topic,多Dstream,Repatition ○  Task并行数量优化 ●  Hbase优化 ○  预建多分区 ○  balance 表现:集群处理吞吐能力无法满足要求 分析 : 集群规模较小, Spark最大并发任务数不到300, 同时Hbase操作平均不到150 Hbase单次request处理时间1-2ms, 每秒处理700(单线程) 总计处理100000每秒 实际需求: 5w * 4 = 200000每秒 结论:机器数量有限时,有限的任务并行度会限制Hbase的吞吐能力 技术架构2.0 外围 系统 消息队列 增量 数据 增量 数据 … 增量 数据 消息队列 HDFS 动态数据 只读数据 增量 数据 广 播 变 量 RDD File Spark Streaming 预处理 业务流程 业务流程 输出 数据 输出 数据 输出 数据 实现信令数据的实时处理 预处理流程业务: 沉淀用户实时位置信息和行为轨迹 在流数据上增加用户历史位置信息 实现方式: val cogroup = leftRDD.cogroup(rightRDD).map { … … (lout, rout) } cogroup.cache() cogroup.flatMap(_._1)… cogroup.flatMap(_._2)… 流⼊数据 ⽤户历史 输出数据 ⽤户历史 更新 COGroup 实现信令数据的实时处理 子流程业务(举例): 判断是否校园用户(根据校园基站列表) 判断是否营销对象(根据用户资料表与营销规则表) 对比更新营销结果表(避免重复营销) 数据通过Kafka发给营销系统 实现方法: inputDStream.foreachRDD(rdd=>{ rdd. … . registerAsTable(”inputTable") val tempRS = sql(“xxxxx”) tempRS . … . registerAsTable(”tempTable") val result = sql(“xxxxx”) … }) 流⼊数据 关联表 1 关联表 2 临时数据 SQL SQL 输出数据 使用Spark实现广告竞价效果实时反馈平台 广 告 位 目标网站 XX 网站 DSP 平台 广告 交易 平台 浏览网页 点击广告 跳转 竞价请求 出价 浏览记录 购买记录 竞价成功 报表 系统 使用Spark实现广告竞价效果实时反馈平台 业务需求1: 1 实时收集所有出价记录,竞价成功记录,浏览记录和购买记录 2 按广告位统计: 最近2000次的竞价成功次数 最近2000次的平均成功价格 最近2000次的点击比率 3 将实时统计结果反馈到竞价模块对竞价策略进行调整 业务需求2: 1 按广告主统计: 出价次数 花费金额 转化率等等指标 2 将实时统计结果更新到报表模块展示 技术难点: 1 数据量较大,每秒消息数量在3-5万 2 不按照常用的时间窗口统计,而按照竞价次数统计 Spark Streaming 技术架构 DSP 平 台 HDFS 日志1 Spark Streaming 日志1预处理 日志2 日志3 日志4 日志2预处理 日志3预处理 日志4预处理 Spark Streaming 竞价统计反馈 报表统计 消息队列 技术架构 DSP 平 台 HDFS 日志1 Spark Streaming 日志1预处理 日志2 日志3 日志4 日志2预处理 日志3预处理 日志4预处理 竞价统计反馈 报表统计 UNION &CACHE 预处理 数据清洗: DStream.filter: 清洗非法格式数据 DStream.map: 清洗不使用的数据字段 数据聚合: DStream.reduceByKey: 对数据进行统计 聚合维度: 广告位 广告主 格式转换: DStream.map: 将数据转换格式为统一格式 数据清洗 数据聚合 格式转换 数据获取 竞价统计反馈 数据获取 数据聚合 数据输出 数据获取: DStream.filter: 按需获取需要的数据 数据聚合: DStream.updateStateByKey: 对每个⼲告位的状态 (统计信息 )进⾏更新 其中 State可以是⾃定义的 class 数据输出: DStream.mapPartition: 将数据输出到指定的接口 (http或者JDBC) SparkStreaming实施中的问题 Hdfs 文件正在生成时文件后缀问题 java.io.FileNotFoundException: File does not exist: / user/streaming/tmp/test/bidinput/2bid.gz._COPYING_ 产生原因: SparkStreaming读取目录时没有过滤正在拷贝的文件 Patch: [SPARK-4314] SparkStreaming实施中的问题 FileInputDStream只能读取单级目录 对于这样的目录层级, 无法使用SparkStreaming读取 --data |--20141201 |--20141202 |--20141203 |--20141204 Patch: [SPARK-3586] SparkSQL相关Patch HashOuterJoin优化 [SPARK-4483] 通过单表遍历的⽅式 , 替换原有两边 HashMap的 Join⽅式 100万 join 1万 性能对⽐ , 性能提升 16%, 内存消耗减少 70% master: 耗时 : 12671 ms 耗时 : 9021 ms 耗时 : 9200 ms Current Mem Usage:787788984 after patch: 耗时 : 10382 ms 耗时 : 7543 ms 耗时 : 7469 ms Current Mem Usage:208145728 SparkSQL相关Patch BroadcastHashOuterJoin优化 [SPARK-4485] 通过 Broadcast实现⼩表在 Map端实现 OuterJoin 性能对⽐ , 性能提升 7倍 Original: left outer join : 15439 ms right outer join : 9707 ms Optimized: left outer join : 1992 ms right outer join : 1288 ms SparkSQL相关Patch 重要Feature: 动态分区功能[SPARK-3007] Window函数功能[SPARK-1442] 使用Spark的建议 ●  如何与社区互动 ●  参加meetup活动 如何与社区互动 最简单的方法:加入spark-user邮件组 发送邮件到:user-subscribe@spark.apache.org (引用@连城404 在beijing meetup上的分享《Spark社区协作指南》) ●  更多人关注,可以及时得到丰富翔实的答案 ●  更易于积累,供自己和他人日后检索 ●  篇幅不受限的富文本支持,可以清晰详尽地描述问题 ●  英语?不是问题(惧怕英语才是问题) 参加meetup活动 2014年8月开始,在北京组织了中国首个 Spark Meetup小组,活动的宗旨是:更好 的推广Spark技术,推进中国Spark使用者 的技术交流。 12月13日刚刚举行了第4次 meetup(SparkSQL专题)活动 活动得到Databricks公司的多名技术人员支 持,并且汇集了多名国内Spark技术专家 国内Spark Meetup人数最多且最活跃的小 组,全世界Spark Meetup人数排名第7 全国范围目前还有上海,杭州,深圳组织了 各种活动 http://www.meetup.com/ 搜索 beijing spark 按照惯例 加入我们的团队 Email to : tianyi@asiainfo.com 我们有: 大量的大数据项目实践机 会 50+的Spark Patch提交记 录 请和我们一起把Spark做的 更好 谢谢
还剩41页未读

继续阅读

下载pdf到电脑,查找使用更方便

pdf的实际排版效果,会与网站的显示效果略有不同!!

需要 8 金币 [ 分享pdf获得金币 ] 6 人已下载

下载pdf

pdf贡献者

wdc7

贡献于2015-02-16

下载需要 8 金币 [金币充值 ]
亲,您也可以通过 分享原创pdf 来获得金币奖励!
下载pdf