Storm分布式实时计算模式


大数据技术丛书 Storm 分布式实时计算模式 Storm Blueprints:Patterns for Distributed Real-time Computation (美) P. Taylor Goetz Brian O’Neill 著 董昭 译 图书在版编目(CIP)数据 Storm 分布式实时计算模式 / ( 美 ) 吉奥兹 (Goetz, P. T.), ( 美 ) 奥尼尔 (O’Neill, B.) 著; 董昭译 . —北京:机械工业出版社,2014.11 (大数据技术丛书) 书名原文:Storm Blueprints: Patterns for Distributed Real-time Computation ISBN 978-7-111-48438-7 I. S… II. ①吉… ②奥… ③董… III. 数据处理软件 IV. TP274 中国版本图书馆 CIP 数据核字(2014)第 253835 号 本书版权登记号:图字:01-2014-5893 P. Taylor Goetz,Brian O’Neill :Storm Blueprints: Patterns for Distributed Real-time Computation (ISBN: 978-1782168294). Copyright © 2014 Packt Publishing. First published in the English language under the title“ Storm Blueprints: Patterns for Distributed Real-time Computation”. All rights reserved. Chinese simplified language edition published by China Machine Press. Copyright © 2015 by China Machine Press. 本书中文简体字版由 Packt Publishing 授权机械工业出版社独家出版。未经出版者书面许可,不得以任何方式复制 或抄袭本书内容。 Storm 分布式实时计算模式 出版发行:机械工业出版社(北京市西城区百万庄大街 22 号 邮政编码:100037) 责任编辑:吴 怡 责任校对:董纪丽 印  刷: 版  次:2015 年 1 月第 1 版第 1 次印刷 开  本:186mm×240mm 1/16 印  张:16.75 书  号:ISBN 978-7-111-48438-7 定  价:59.00 元 凡购本书,如有缺页、倒页、脱页,由本社发行部调换 客服热线:(010)88378991 88361066 投稿热线:(010)88379604 购书热线:(010)68326294 88379649 68995259 读者信箱:hzjsj@hzbook.com 版权所有 • 侵权必究 封底无防伪标均为盗版 本书法律顾问:北京大成律师事务所 韩光 / 邹晓东 The Translator s Words 译 者 序 大数据概念在各行业已然形成了热潮,犹胜当年的云计算,近期甚至被列入了国家重 点发展规划。DataSift 利用 Twitter 上的情感监控预测 Facebook 股价波动,Google 预测世 界杯比赛结果,大数据应用的生动案例每每会引发无限遐想:大数据能否对我们所处的行 业或领域带来新气象、新思路?迈出尝试的第一步非常重要。 要从海量数据中提取加工对业务有用的信息,选取合适的技术将事半功倍,省去了重 新造轮子的烦恼。对海量数据进行批处理运算,Hadoop 依旧保持着无法撼动的地位。但 在对实时性要求较高的应用场景中,Hadoop 就显得力不从心。它需要将数据先落地存储 到 HDFS 上,然后再通过 MapReduce 进行计算。这样的批处理运算流程使它很难将延时 缩小到秒级。 Storm 是基于数据流的实时处理系统,提供了大吞吐量的实时计算能力。每条数据到 达系统时,立即在内存中进入处理流程,并在很短的时间内处理完成。实时性要求较高的 数据分析场景,都可以尝试使用 Storm 作为技术解决方案。 我们已经决定利用大数据改善所在领域的工作,并选定 Storm 实时流式计算框架 作为技术解决方案。这时候的问题是,如何将 Storm 和工作中的实际场景关联起来?这 个开源项目的文档并不是非常丰富,源码中示例也很简单。类似的问题可能困扰过不少 Storm 用户。 我在看到本书英文版的介绍时,就感觉到,这正是我想要的,早有这本书能省去多 少学习成本!本书并没有非常深入介绍 Storm 的内部实现,而是一本应用指南。其中最有 价值的部分,是通过大量翔实的示例,使用 Storm 解决不同的实际应用场景,提出多种基 IV 于 Storm 的设计模式。读者完全可以参考书中示例和源码,来设计并实现自己的 Storm 应 用。书中还简要介绍了 Storm 基本概念,以及大规模部署集群的方案,这些都是非常实用 的内容。 作为 Storm 的一个忠实用户,能够承担本书的翻译工作实属荣幸。翻译的过程,也是 深入学习加深了解的过程。学到作者丰富的 Storm 实践经验,是本次翻译的最大收获。希 望这些经验也能够帮助读者少走弯路,快速高效地使用这个工具。 翻译过程中得到了很多人的帮助。首先感谢家人的包容和支持,困难时总有你们的鼓 励。感谢腾讯安全平台部的同事们在学习工作中给予的帮助。感谢好友何双宁在翻译过程 中提出的建议和探讨。感谢机械工业出版社编辑们的信任和支持。 非常高兴能将这本书分享给大家,也期望有兴趣的朋友一起探讨,共同进步。如果你 有任何问题和建议,请联系我 (appledzshr@live.cn)。 Preface 前  言 目前对信息高时效性、可操作性的需求不断增长,这要求软件系统在更少的时间内能 处理更多的数据。随着可连接设备数量不断增加,以及在众多行业领域广泛应用,这种信 息需求已无处不在。传统企业的运营系统被迫处理原先只有互联网企业才会遇到的大规模 数据。这种重大转变正不断瓦解传统架构和解决方案,传统上会将在线事务处理和离线分 析分割开来。与此同时,人们正在重新勾勒从数据中提取信息的意义和价值。软件框架和 基础设施也在不断进化,以适应这种新场景。 具体地说,数据的生成可以看作一连串发生的离散事件,这些事件流会伴随着不同的 数据流、操作和分析,都会由一个通用的软件框架和基础设施来处理。 Storm 正是最流行的实时流计算框架之一,它提供了可容错分布式计算所要求的基本 原语和保障机制,可以满足大容量关键业务应用的需求。它不但是一套技术的整合,也是 一种数据流和控制的机制。很多大公司都将 Storm 作为大数据处理平台的核心部分。 尝试使用本书中介绍的设计模式,你将学到开发、部署、运营数据处理的流程,它具 有每天或者每小时处理上亿次事务的能力。 本书介绍了多种分布式计算相关的主题,包括设计和集成的模式,还介绍了这些技术 常见的适用领域和具体应用。本书通过实际示例,从最简单的 topology 出发,首先向用户 介绍了 Storm 基础,然后通过更复杂的示例,逐步引入 Storm 的高级概念、更细致的部署 方案以及运营中需要关注的事项。 主要内容 第 1 章介绍了使用 Storm 进行分布式流式计算的核心概念。分布式单词计数这个例 VI 子中展示的数据结构、技术和设计模式都是后续进行更复杂计算的基础。在该章中,我 们会对 Storm 计算架构有一个基本了解。还将学会搭建 Storm 开发环境,了解开发和调试 Storm 应用的技术。 第 2 章进一步介绍 Storm 技术架构和安装部署 Storm 集群的过程。在该章中,我们会 通过配置工具 Puppet 来自动化安装和部署一个多节点 Storm 集群。 第 3 章主要介绍 Trident topology。Trident 在 Storm 基础之上提供了高级抽象,抽象了 事务处理和状态管理的细节。该章使用 Trident 框架处理、聚合、过滤来自传感器的数据, 以检测传染病是否爆发。 第 4 章介绍使用 Storm 和 Trident 进行实时趋势分析。实时趋势分析引入了在数据流 中进行识别的模式。在该章中,你将会整合 Apache Kafka 队列并且通过实现一个滑动窗口 来计算移动平均数。 第 5 章介绍了使用 Storm 进行基于图的数据分析,首先将数据持久化到图形数据库, 再通过查询数据来发现关系。图形数据库将数据按照顶点、边、属性的图形结构进行存 储,聚焦于实体间的关系。在该章中,我们将 Strom 和一种流行的图形数据库 Titan 进行 整合,使用 Twitter 作为数据源。 第 6 章介绍在 Storm 上使用递归实现一个典型的人工智能算法。该章展现了 Strom 的局限性,并检视设计模式来适应这些局限。通过分布式远程调用(Distributed Remote Procedure Call, DRPC),你会实现一个提供同步查询服务的 Storm topology,用来决定井 字棋游戏下一步怎样走最好。 第 7 章演示整合 Storm 和非事务型系统的复杂性。为了支持这种整合,介绍一种通过 ZooKeeper 进行分布式状态管理的设计模式。该章通过整合 Strom 和开源探索性分析架构 Druid,实现一个可配置的实时系统来分析金融事件。 第 8 章介绍 Lambda 系统架构的概念,结合实时系统和批处理来构建一个可纠错的 分析系统。在第 7 章的基础上,你将会融入 Hadoop 的基础设施并且检视如何使用一个 MapReduce job 对 Druid 中可能出现的主机故障事件进行纠错。 第 9 章演示将一个 Hadoop 上运行的 Pig 语言编写的批处理 job 转化为一个实时的 Storm topology。你可以利用 Storm-YARN 来实现这个功能,这个工具可以使用户使用 YARN 来部署和运行 Storm 集群。在 Hadoop 上运行 Storm 系统,企业可以在同一套基础 VII 设施上同时运行与利用实时和批处理系统。 第 10 章提供了在云环境下运行和部署 Storm 系统的最佳实践。详细地说,你可利用 一套为云计算服务的库 Apache Whirr,在 Amazon Web Services(AWS)Elastic Compute Cloud(EC2)上部署和配置 Storm 及其相关的支撑组件。此外,你还可以利用 Vagrant 工 具在虚拟机环境下建立开发和测试的集群环境。 预备知识 本书中用到的软件如下表所示。 章  节 需要的软件 1 Storm (0.9.1) 2 ZooKeeper (3.3.5) Java (1.7) Puppet (3.4.3) Hiera (1.3.1) 3 Trident ( 配套 Storm 0.9.1) 4 Kafka (0.7.2) OpenFire (3.9.1) 5 Twitter4J (3.0.3) Titan (0.3.2) Cassandra (1.2.9) 6 无最新软件 7 MySQL (5.6.15) Druid (0.5.58) 8 Hadoop (0.20.2) 9 Storm-YARN (1.0-alpha) Hadoop (2.1.0-beta) 10 Whirr (0.8.2) Vagrant (1.4.3) 面向的读者 初学者和高级用户都可以从本书获益。本书在真实示例的基础上,描述了多种实用的 分布式计算模式。书中介绍了 Storm 和 Trident 的核心原语,以及成功部署和运营系统的 关键技术。 虽然本书主要讲述 Storm 相关的 Java 开发,但其中的设计模式同样适用于其他编程 VIII 语言。书中的小窍门、技术和实现方法对架构师、开发人员和运维人员都具有参考价值。 Hadoop 爱好者会发现,这是一本很好的 Storm 入门书籍,书中举例说明这两种系统 如何优势互补,提供了将批处理运算迁移到实时分析的一种高效途径。 本书提供了 Storm 应用于多个问题和行业的具体示例,这些例子应该能够在其他领域 中举一反三,解决在有限时间内处理大量数据的问题。同时,解决方案设计师、商业分析 师也能从本书介绍的高层系统架构和技术中获益。 读者反馈 我们随时欢迎您的反馈。如果您有任何喜欢或者不喜欢本书的地方,都请告知我们, 这将促使我们优化读者最关心的内容。若有反馈,可发送邮件到 feedbak@packtpub.com, 邮件标题中请包含本书名。 如果您是某一领域方面的专家,并且有兴趣写一本或者参与一本书,请参考作者指南 www.packtpub.com/authors。 用户支持 对于购买了 Packet 出版社书籍的读者,我们有多种途径帮助你最大限度的利用本书。 下载示例代码 可以在 http://www.packetpub.com 下载到通过 packet 账户购买的所有书籍的示例代码。 如果你通过其他途径购买这本书,可以访问 http://www.packetpub.com/support,通过邮箱 注册,然后示例代码会通过邮箱发送。 勘误表 尽管我们已经非常用心确保书籍内容的准确性,仍然有错误可能发生。如果您发现书 中的任何错误(可能是错字或者代码错误)请将错误报告给我们,不胜感激。这样会避免 其他用户对同样的错误造成困惑,有助于我们在后续版本不断完善这本书。勘误报告请在 http://www.packtpub.com/submit-errata 进行上报,先选取本书,点击 errata submission form 链接,然后填入详细的勘误信息即可。勘误内容经确认后就会上传到网站上,或者添加到 书籍当前的勘误表中。当前已知的刊物内容可以通过访问 http://www.packtpub.com/support 进行查看。 IX 盗版 互联网上的盗版行为是所有出版媒体共同面临的问题。Packt 出版社采取严厉措施来 保护在版权和许可。如果你偶然发现互联网上 Packt 出版社的任何作品,任何形式的非法 拷贝,请您立刻提供网址和站点名称以便我们追责。请通过 copyright@packtpub.com 联系 我们提供有盗版嫌疑连接。我们非常感谢这种保护作者和出版社权益的帮助,这将有利于 提供更有价值的作品。 问题 如果您对本书有任何疑问,可以通过 questions@packtpub.com 联系我们,我们会尽力 解答。 作 者 简 介 About the Author P. Taylor Goetz 是 Apache Storm 项目核心贡献者以及发布经理,自2011 年 10 月 Storm 项目首次开源至今都参与其中,具有长期的 Storm 使用和开发经验。作为 Storm 用 户社区中的活跃贡献者,Taylor 领导了一系列开源项目,旨在使企业能够将 Storm 集成到 不同的基础设施上。 目前,他在Hortonworks 公司工作,主导将Storm 集成到Hortonworks 的数据平台 中 (HDP)。在加入 Hortonworks 公司之前,他就职于 Health Market Science 公司,主导了 Storm 与 HMS 公司的下一代主数据管理平台 (Maste Data Management platform) 的集成工 作,涉及 Cassandra 数据库、Kafka 队列、Elastic 搜索引擎以及 Titan 图形数据库等技术。 非常感谢我的妻子、孩子、家人和朋友们,有了你们的爱、支持以及包容 和牺牲,才有了这本书。感激不尽。 Brian O’Neill 是一个丈夫、黑客、徒步旅行家、皮划艇爱好者。他还是渔夫、父亲, 也是大数据的信徒、开拓者以及分布式计算的梦想家。 他已经担任技术主管超过 15 年,被公认为大数据领域的权威。他作为系统架构师, 有着应对各种不同场景的经验,从初创公司到财富五百强公司。他信奉开源精神,对多个 项目做出了贡献。他领导的项目,扩展了 Cassandra 数据库,并且将索引引擎,分布式处 理框架,分析引擎集成到了该数据库中。他荣获了 2013 年 InfoWorld 的技术领导力奖项。 他撰写了关于 Cassandra 的 Dzone 参考文档,被选为 2012 和 2013 年 Datastax Cassandra MVP。 XI 在过去,他作为专家组成员对 Java 社区化进程 (JCP) 做出了贡献,发表过人工智能和 上下文发现领域的专利。他对获取了美国布朗大学的计算机科学学士学位引以为荣。 目前,Brian 就职于 Health Market Science(HMS) 公司,任首席技术官,开发的大数 据分析平台聚焦于数据管理和医疗领域数据分析。平台主要由 Storm 和 Cassandra 构成, 提供了实时数据管理和分析服务。 致我的家人。我的妻子丽莎,我们一起将信仰放飞,它带领我们飞抵云端。 孩子们使我们扎根大地,先人们为我们奠定基石,我们的家人成就了我所有的成 就。没有你们,这本书永远不可能面世。 审校者简介 Vincent Gijsen 是非常好相处的一个人,他对任何技术相关的事情都非常有耐心。他 的背景和感兴趣的领域主要是嵌入式系统工程以及信息科学。他职业生涯初期担任从事市 场研究公司的 IT 经理。此后开办了自己的公司,专攻 VOIP 通信技术。现在,他就职于初 创公司 ScienceRockstars,该公司聚焦于劝导式用户画像分析和大数据。业余时间他喜欢 捣鼓激光发射器、四角直升机模型、eBay 购物、黑客相关的事情,以及啤酒。 Sonal Raj 是一个极客、Python 爱好者、技术狂人。他是 Enfoss 的创办者和行政负责 人。他获取了 Jamshedpur 国家技术学院的计算机科学与技术学士学位。曾担任 SERC 的 研究员,从事分布式计算和实时运算相关的项目。还在 HCL Infosystems 做过实习生。 他曾在 PyCon India 会议发表过关于 Storm 和 Neo4J 的演讲,在一线杂志和国际期刊 上发表过多篇文章和研究论文。 James Xu 是 Apache Storm 项目的核心贡献者,工作在电商领域的 Java/Clojure 工程师。 他热衷于新技术比如 Storm 和 Clojure。目前就职于中国电商的领先平台阿里巴巴集团。 目  录 Contents 译者序 前言 作者简介 第 1 章 分布式单词计数 1 1.1 Storm topology 的组成部分——    stream、spout 和 bolt 1 1.1.1 stream 2 1.1.2 spout 2 1.1.3 bolt 2 1.2 单词计数 topology 的数据流 3 1.2.1 语句生成 spout 3 1.2.2 语句分割 bolt 3 1.2.3 单词计数 bolt 4 1.2.4 上报 bolt 4 1.3 实现单词计数 topology 4 1.3.1 配置开发环境 4 1.3.2 实现 SentenceSpout 5 1.3.3 实现语句分割 bolt 6 1.3.4 实现单词计数 bolt 7 1.3.5 实现上报 bolt 8 1.3.6 实现单词计数 topology 10 1.4 Storm 的并发机制 12 1.4.1 WordCountTopology 的    并发机制 13 1.4.2 给 topology 增加 worker 14 1.4.3 配置 executor 和 task 14 1.5 理解数据流分组 17 1.6 有保障机制的数据处理 20 1.6.1 spout 的可靠性 20 1.6.2 bolt 的可靠性 21 1.6.3 可靠的单词计数 22 总结 23 第 2 章 配置 Storm 集群 24 2.1 Storm 集群的框架 24 2.1.1 理解 nimbus 守护进程 25 2.1.2 supervisor 守护进程的    工作方式 26 2.1.3 Apache ZooKeeper 简介 26 XIII 2.1.4 Storm 的 DRPC 服务    工作机制 27 2.1.5 Storm UI 27 2.2 Storm 技术栈简介 28 2.2.1 Java 和 Clojure 28 2.2.2 Python 29 2.3 在 Linux 上安装 Storm 29 2.3.1 安装基础操作系统 30 2.3.2 安装 Java 30 2.3.3 安装 ZooKeeper 30 2.3.4 安装 Storm 30 2.3.5 运行 Storm 守护进程 31 2.3.6 配置 Storm 33 2.3.7 必需的配置项 34 2.3.8 可选配置项 35 2.3.9 Storm 可执行程序 36 2.3.10 在工作站上安装 Storm     可执行程序 36 2.3.11 守护进程命令 37 2.3.12 管理命令 37 2.3.13 本地调试 / 开发命令 39 2.4 把 toplogy 提交到集群中 40 2.5 自动化集群配置 42 2.6 Puppet 的快速入门 43 2.6.1 Puppet manifest 文件 43 2.6.2 Puppet 类和模块 44 2.6.3 Puppet 模板 45 2.6.4 使用 Puppet Hiera 来    管理环境 46 2.6.5 介绍 Hiera 46 总结 48 第 3 章 Trident 和传感器数据 49 3.1 使用场景 50 3.2 Trident topology 50 3.3 Trident spout 52 3.4 Trident 运算 57 3.4.1 Trident filter 58 3.4.2 Trident function 59 3.5 Trident 聚合器 63 3.5.1 CombinerAggregator 63 3.5.2 ReducerAggregator 63 3.5.3 Aggregator 64 3.6 Trident 状态 65 3.6.1 重复事务型状态 69 3.6.2 不透明型状态 70 3.7 执行 topology 72 总结 73 第 4 章 实时趋势分析 74 4.1 应用场景 75 4.2 体系结构 75 4.2.1 数据源应用程序 75 4.2.2 logback Kafka appender 76 4.2.3 Apache Kafka 76 4.2.4 Kafka spout 76 4.2.5 XMPP 服务器 76 4.3 安装需要的软件 77 XIV 4.3.1 安装 Kafka 77 4.3.2 安装 OpenFire 78 4.4 示例程序 78 4.5 日志分析 topology 84 4.5.1 Kafka spout 84 4.5.2 JSON project function 85 4.5.3 计算移动平均值 86 4.5.4 添加一个滑动窗口 87 4.5.5 实现滑动平均 function 91 4.5.6 按照阈值进行过滤 92 4.5.7 通过 XMPP 发送通知 94 4.6 最终的 topology 96 4.7 运行日志分析 topology 98 总结 99 第 5 章 实时图形分析 100 5.1 使用场景 101 5.2 体系结构 102 5.2.1 Twitter 客户端 102 5.2.2 Kafka spout 102 5.2.3 Titan 分布式图形数据库 103 5.3 图形数据库简介 103 5.3.1 访问图——TinkerPop 栈 104 5.3.2 使用 Blueprints API    操作图形 105 5.3.3 通过 Gremlin shell    操作图形 106 5.4 软件安装 107 5.5 使用 Cassandra 存储后端    设置 Titan 109 5.5.1 安装 Cassandra 109 5.5.2 使用 Cassandra 后端启动    Titan 109 5.6 图数据模型 110 5.7 连接 Twitter 数据流 111 5.7.1 安装 Twitter4J 客户端 112 5.7.2 OAuth 配置 112 5.7.3 TwitterStreamConsumer 类 112 5.7.4 TwitterStatusListener 类 113 5.8 Twitter graph topology 115 5.9 实现 GraphState 116 5.9.1 GraphFactory 117 5.9.2 GraphTupleProcessor 117 5.9.3 GraphStateFactory 117 5.9.4 GraphState 118 5.9.5 GraphUpdater 119 5.10 实现 GraphFactory 119 5.11 实现 GraphTupleProcessor 120 5.12 组合成 TwitterGraph    Topology 类 121 5.13 使用 Gremlin 查询图 122 总结 123 第 6 章 人工智能 124 6.1 为应用场景进行设计 125 6.2 确立体系结构 128 XV 6.2.1 审视设计中的挑战 128 6.2.2 实现递归 128 6.2.3 解决这些挑战 132 6.3 实现体系结构 133 6.3.1 数据模型 133 6.3.2 检视 Recursive Topology 136 6.3.3 队列交互 138 6.3.4 function 和 filter 140 6.3.5 研究 Scoring Topology 141 6.3.6 分布式远程命令    调用(DRPC) 146 总结 152 第 7 章 整合 Druid 进行金融分析 153 7.1 使用场景 154 7.2 集成一个非事务系统 155 7.3 topology 158 7.3.1 spout 159 7.3.2 filter 161 7.3.3 状态设计 162 7.4 实现体系结构 165 7.4.1 DruidState 166 7.4.2 实现 StormFirehose 对象 169 7.4.3 在 ZooKeeper 中实现    分片状态 174 7.5 执行实现的程序 175 7.6 检视分析过程 176 总结 179 第 8 章 自然语言处理 180 8.1 Motivating Lambda 结构 181 8.2 研究使用场景 183 8.3 实现 Lambda architecture 184 8.4 为应用场景设计 topology 185 8.5 设计的实现 186 8.5.1 TwitterSpout/TweetEmitter 187 8.5.2 function 188 8.6 检视分析逻辑 191 8.7 Hadoop 196 8.7.1 MapReduce 概览 196 8.7.2 Druid 安装 197 总结 204 第 9 章 在 Hadoop 上部署 Storm     进行广告分析 205 9.1 应用场景 205 9.2 确定体系结构 206 9.2.1 HDFS 简介 208 9.2.2 YARN 简介 208 9.3 配置基础设施 211 9.3.1 Hadoop 基础设施 211 9.3.2 配置 HDFS 212 9.4 部署分析程序 217 9.4.1 以 Pig 为基础执行批    处理分析 217 9.4.2 在 Storm-YARN 基础上    执行实时分析 218 XVI 9.5 执行分析 223 9.5.1 执行批处理分析 223 9.5.2 执行实时分析 224 9.6 部署 topology 229 9.7 执行 toplogy 229 总结 230 第 10 章 云环境下的 Storm 231 10.1 Amazon Elastic Compute    Cloud 简介 232 10.1.1 建立 AWS 帐号 232 10.1.2 AWS 管理终端 232 10.1.3 手工启动一个 EC2 实例 234 10.2 Apache Whirr 简介 236 10.3 使用 Whirr 配置 Storm 集群 237 10.4 Whirr Storm 简介 239 10.5 Vagrant 简介 243 10.5.1 安装 Vagrant 243 10.5.2 创建第一个虚拟机 244 10.6 生成 Storm 安装准备脚本 247 10.6.1 ZooKeeper 247 10.6.2 Storm 248 10.6.3 Supervisord 249 总结 252 第 1 章 分布式单词计数 本章将介绍使用 Storm 建立一个分布式流式计算应用时涉及的核心概念。我们通过建 立一个简单的计数器程序实现这个目的。计数器将持续输入的一句句话作为输入流,统计 其中单词出现的次数。单词计数这个例子浅显易懂,引入了多种数据结构、技术和设计模 式。这些都是实现更复杂计算所必须的基础。 本章首先概要介绍 Storm 的数据结构,然后实现一个完整 Storm 程序所需的各个组成 部分。读完本章,读者将会了解 Storm 计算的基本结构、搭建开发环境的方法、Storm 程 序的开发和调试技术。 本章包括以下主题: ● Strom topology 的基本组成部分——stream、spout 和 bolt。 ● 搭建 Storm 开发环境。 ● 实现单词计数程序。 ● 并发和容错机制。 ● 并发计算任务以实现扩容。 1.1 Storm topology 的组成部分——stream、spout 和 bolt Storm 分布式计算结构称为 topology(拓扑),由 stream(数据流),spout(数据流的生 Chapter 1 2   Storm 分布式实时计算模式 成 者 ), bolt(运算)组成,如图 1-1 所示。Storm topology 大致等同与 Hadoop 这类批处理 运算中的 job。然而,批处理运算中的 job 对运算的起始和终止有着明确定义,Storm topology 会一直运行下去,除非进程被杀死 或被取消部署。 1.1.1 stream Storm 的核心数据结构是tuple。tuple 是包含了一个或者多个键值对的列表, Stream 是由无限制的tuple 组成的序列。 如果你对复杂事务处理(Complex Event Processing,CEP)比较熟悉,tuple 就相当于 CEP 中的 event。 1.1.2 spout spout 代表了一个 Storm topology 的主要数据入口,充当采集器的角色,连接到数据 源,将数据转化为一个个 tuple,并将 tuple 作为数据流进行发射。 你会发现 Storm 为实现 spout 提供了非常简单的 API。开发一个 spout 的主要工作就 是编写代码从数据源或者 API 消费数据。数据源可能包括以下几种: ● Web 或者移动程序的点击流 ● Twitter 或其他社交网络的消息 ● 传感器的输出 ● 应用程序的日志事件 因为 spout 通常不会用来实现业务逻辑,所以在多个 topology 中常常可以复用。 1.1.3 bolt bolt 可以理解为计算程序中的运算或者函数,将一个或者多个数据流作为输入,对数 据实施运算后,选择性地输出一个或者多个数据流。bolt 可以订阅多个由 spout 或者其他 bolt 发射的数据流,这样就可以建立复杂的数据流转换网络。 像 Spout API 一样,bolt 可以执行各式各样的处理功能,bolt 的编程接口简单明了, 图 1-1 TQPVU CPMU CPMU CPMU CPMU TQPVU ஝૶ູ ஝૶ູ 第 1 章 分布式单词计数   3 bolt 可以执行的典型功能包括: ● 过滤 tuple ● 连接(join)和聚合操作(aggregation) ● 计算 ● 数据库读写 1.2 单词计数 topology 的数据流 如图 1-2 所示,单词计数 topology 由一个 spout 和下游的三个 bolt 组成。 ឦԲ 4QPVU ឦԲѬҟ #PMU ӭឈᝠ஝ #PMU ʽઑ#PMU 图 1-2 1.2.1 语句生成 spout SentenceSpout 类的功能很简单,向后端发射一个单值 tuple 组成的数据流,键名是 “sentence”,键值是字符串格式存储的一句话。如下所示: 为了简化起见,我们的数据源是一个静态语句的列表。spout 会一直循环将每句话作 为 tuple 发射。实际应用中,spout 通常会连接到动态数据源上,比如通过 Twitter 的 API 获取推特消息。 1.2.2 语句分割 bolt 语句分割 bolt(SplitSentenceBolt)类会订阅 sentence spout 发射的 tuple 流。每当收到 一个 tuple,bolt 会获取“ sentence”对应值域的语句,然后将语句分割为一个个的单词。 每个单词向后发射一个 tuple: 4   Storm 分布式实时计算模式 1.2.3 单词计数 bolt 单词计数 bolt(WordCountBolt)订阅 SplitSentenceBolt 类的输出,保存每个特定单词 出现的次数。每当 bolt 接收到一个 tuple,会将对应单词的计数加一,并且向后发送该单 词当前的计数。 1.2.4 上报 bolt 上报 bolt 订阅 WordCountBolt 类的输出,像 WordCountBolt 一样,维护一份所有单词 对应的计数的表。当接收到一个 tuple 时,上报 bolt 会更新表中的计数数据,并且将值在 终端打印。 1.3 实现单词计数 topology 前面介绍了 Storm 的基础概念,我们已经准备好实现一个简单的应用。现在开始着手 开发一个 Storm topology,并且在本地模式执行。Storm 本地模式会在一个 JVM 实例中模 拟出一个 Storm 集群。大大简化了用户在开发环境或者 IDE 中进行开发和调试。后续章节 将会演示如何将本地模式下开发好的 topology 部署到真实的 Storm 集群环境。 1.3.1 配置开发环境 新建一个 Storm 项目其实就是将 Storm 及其依赖的类库添加到 Java classpath 中。在第 2 章中,你将了解到,将 Storm topology 发布到集群环境中,需要将编译好的类和相关依 赖打包在一起。基于这个原因,我们强烈建议使用构建管理工具来管理项目,比如 Apache Maven、Gradle 或者 Leinengen。在单词计数这个例子中,我们使用 Maven。 首先,建立一个 Maven 项目: 然后,编辑配置文件 pom.xml,添加 Storm 依赖 第 1 章 分布式单词计数   5 之后,通过执行下述命令编译项目,来测试配置 Maven 是否正确。 下载示例代码 如果您是通过 packet 账户购买的书籍,可以在 http://www.packtpub.com 下载到 所有已购买书籍的示例代码。如果通过其他途径购买,可以在访问 http://www. packtpub.com/support 并注册。代码会通过邮件发送给您。 Maven 会下载 Storm 类库和所有依赖。项目建好后,可以着手开发我们的第一个 Storm 应用了。 1.3.2 实现 SentenceSpout 为简化起见,SentenceSpout 的实现通过重复静态语句列表来模拟数据源。每句话作 为一个单值的 tuple 向后循环发射。完整实现如例 1.1 所示。 例 1.1 SentenceSpout.java 注意 6   Storm 分布式实时计算模式 BaseRichSpout 类是 ISpout 接口和 IComponent 接口的一个简便的实现。接口对本例 中用不到的方法提供了默认实现。使用这个类,我们可以专注在所需要的方法上。方法 declareOutputFields() 是在IComponent 接口中定义的,所有Storm 的组件(spout 和 bolt) 都必须实现这个接口。Storm 的组件通过这个方法告诉 Storm 该组件会发射哪些数据流, 每个数据流的 tuple 中包含哪些字段。本例中,我们声明了 spout 会发射一个数据流,其中 的 tuple 包含一个字段(sentence) Open() 方法在 ISpout 接口中定义,所有 Spout 组件在初始化时调用这个方法。Open() 方法接收三个参数,一个包含了Storm 配置信息的map,TopologyContext 对象提供了 topology 中组件的信息,SpoutOutputCollector 对象提供了发射 tuple 的方法。本例中,初 始化时不需要做额外操作,因此 open() 方法实现仅仅是简单将 SpoutOutputCollector 对象 的引用保存在变量中。 nextTuple() 方法是所有 spout 实现的核心所在,Storm 通过调用这个方法向输出的 collector 发射 tuple。这个例子中,我们发射当前索引对应的语句,并且递增索引指向下一 个语句。 1.3.3 实现语句分割 bolt 例 1.2 列出了 SplitSentenceBolt 类的实现。 例 1.2 SplitSentenceBolt.java 第 1 章 分布式单词计数   7 BaseRichBolt 类是 IComponent 和 IBolt 接口的一个简便实现。继承这个类,就不用去 实现本例不关心的方法,将注意力放在实现我们需要的功能上。 prepare() 方法在 IBolt 中定义,类同与 ISpout 接口中定义的 open() 方法。这个方法 在 bolt 初始化时调用,可以用来准备 bolt 用到的资源,如数据库连接。和 SentenceSpout 类一样,SplitSentenceBolt 类在初始化时没有额外操作,因此prepare() 方法仅仅保存 OutputCollector 对象的引用。 在 declareOutputFields() 方法中,SplitSentenceBolt 声明了一个输出流,每个 tuple 包 含一个字段“word”。 SplitSentenceBolt 类的核心功能在 execute() 方法中实现,这个方法是 IBolt 接口定义 的。每当从订阅的数据流中接收一个 tuple,都会调用这个方法。本例中,execute() 方法按 照字符串读取“ sentence”字段的值,然后将其拆分为单词,每个单词向后面的输出流发 射一个 tuple。 1.3.4 实现单词计数 bolt WordCountBolt 类(见 例 1.3)是topology 中实际进行单词计数的组件。该bolt 的 prepare() 方法中,实例化了一个 HashMap 的实例,用来存储单词和对应的 计数。大部分实例变量通常是在 prepare() 方法中进行实例化,这个设计模式是由 topology 的部署方式决定的。当 topology 发布时,所有的 bolt 和 spout 组件首先会进行序列化,然 后通过网络发送到集群中。如果 spout 或者 bolt 在序列化之前(比如说在构造函数中生成) 实例化了任何无法序列化的实例变量,在进行序列化时会抛出 NotSerializableException 异 常,topology 就会部署失败。本例中,因为 HashMap 是可序列化的,所以在 8   Storm 分布式实时计算模式 构造函数中进行实例化也是安全的。但是,通常情况下最好是在构造函数中对基本数据类型 和可序列化的对象进行赋值和实例化,在 prepare() 方法中对不可序列化的对象进行实例化。 在 declareOutputFields() 方法中,类 WordCountBolt 声明了一个输出流,其中的 tuple 包括了单词和对应的计数。execute() 方法中,当接收到一个单词时,首先查找这个单词对 应的计数(如果单词没有出现过则计数初始化为 0),递增并存储计数,然后将单词和最新 计数作为 tuple 向后发射。将单词计数作为数据流发射,topology 中的其他 bolt 就可以订 阅这个数据流进行进一步的处理。 例 1.3 WordCountBolt.java 1.3.5 实现上报 bolt ReportBolt 类的作用是对所有单词的计数生成一份报告。和WordCountBolt 类似, ReportBolt 使用一个 HashMap 对象来保存单词和对应计数。本例中,它的功 能是简单的存储接收到计数 bolt 发射出的计数 tuple。 上报 bolt 和上述其他 bolt 的一个区别是,它是一个位于数据流末端的 bolt,只接收 第 1 章 分布式单词计数   9 tuple。因为它不发射任何数据流,所以 declareOutputFields() 方法是空的。 上报 bolt 中初次引入了 cleanup() 方法,这个方法在 IBolt 接口中定义。Storm 在终止 一个 bolt 之前会调用这个方法。本例中我们利用 cleanup() 方法在 topology 关闭时输出最 终的计数结果。通常情况下,cleanup() 方法用来释放 bolt 占用的资源,如打开的文件句柄 或者数据库连接。 开发 bolt 时需要谨记的是,当 topology 在 Storm 集群上运行时,IBolt.cleanup() 方法 是不可靠的,不能保证会执行。下一章讲到 Storm 的容错机制时,会讨论其中的原因。但 这个例子我们是运行在开发模式中的,可以保证 cleanup() 被调用。 类 ReportBolt 的完整代码见示例 1.4。 例 1.4 ReportBolt.java 10   Storm 分布式实时计算模式 1.3.6 实现单词计数 topology 我们已经定义了计算所需要的spout 和 bolt。下面将它们整合为一个可运行的 topology(见 例 1.5) 例 1.5 WordCountTopology.java Storm topology 通常由Java 的 main() 函数进行定义,运行或者提交(部署到集群 的操作)。在本例中,我们首先定义了一系列字符串常量,作为 Storm 组件的唯一标 第 1 章 分布式单词计数   11 识 符。main() 方法中,首先实例化了spout 和 bolt,并生成一个TopologyBuilder 实 例。 TopologyBuilder 类提供了流式接口风格的 API 来定义 topology 组件之间的数据流。首先 注册一个 sentence spout 并且赋值给其唯一的 ID: 然后注册一个 SplitSentenceBolt,这个 bolt 订阅 SentenceSpout 发射出来的数据流: 类 TopologyBuilder 的 setBolt() 方法会注册一个bolt,并且返回BoltDeclarer 的实 例,可以定义bolt 的数据源。这个例子中,我们将SentenceSpout 的唯一ID 赋值给 shuffleGrouping() 方法确立了这种订阅关系。shuffleGrouping() 方法告诉Storm,要将 类 SentenceSpout 发射的 tuple 随机均匀的分发给 SplitSentenceBolt 的实例。后续在讨论 Storm 的并发性时,会解释数据流分组的详情。代码下一行确立了类 SplitSentenceBolt 和 类 theWordCountBolt 之间的连接关系: 你将了解到,有时候需要将含有特定数据的 tuple 路由到特殊的 bolt 实例中。在此我 们使用类 BoltDeclarer 的 fieldsGrouping() 方法来保证所有“ word”字段值相同的 tuple 会 被路由到同一个 WordCountBolt 实例中。 定义数据流的最后一步是将 WordCountBolt 实例发射出的 tuple 流路由到类 ReportBolt 上。本例中,我们希望 WordCountBolt 发射的所有 tuple 路由到唯一的 ReportBolt 任务中。 globalGrouping() 方法提供了这种用法: 所有的数据流都已经定义好,运行单词计数计算的最后一步是编译并提交到集群上: 12   Storm 分布式实时计算模式 这里我们采用了 Storm 的本地模式,使用 Storm 的 LocalCluster 类在本地开发环境来 模拟一个完整的 Storm 集群。本地模式是开发和测试的简便方式,省去了在分布式集群中 反复部署的开销。本地模式还能够很方便地在 IDE 中执行 Storm topology,设置断点,暂 停运行,观察变量,分析程序性能。当 topology 发布到分布式集群后,这些事情会很耗时 甚至难以做到。 Storm 的 Config 类是一个 HashMap 的子类,并定义了一些 Storm 特有 的常量和简便的方法,用来配置 topology 运行时行为。当一个 topology 提交时,Storm 会 将默认配置和 Config 实例中的配置合并后作为参数传递给 submitTopology() 方法。合并后 的配置被分发给各个 spout 的 bolt 的 open()、prepare() 方法。从这个层面上讲,Config 对象 代表了对 topology 所有组件全局生效的配置参数集合。现在可以运行 WordCountTopology 类了,main() 方法会提交 topology,在执行 10 秒后,停止(卸载)该 topology,最后关闭 本地模式的集群。程序执行完毕后,在控制台可以看到类似以下的输出: 1.4 Storm 的并发机制 在 Storm 的间接中提到过,Storm 计算支持在多台机器上水平扩容,通过将计算切分 为多个独立的 tasks 在集群上并发执行来实现。在 Storm 中,一个 task 可以简单地理解为 在集群某节点上运行的一个 spout 或者 bolt 实例。 第 1 章 分布式单词计数   13 为了理解 storm 的并发机制是如何运行的,我们先来解释下在集群中运行的 topology 的四个主要组成部分: ● Nodes(服务器):指配置在一个 Storm 集群中的服务器,会执行 topology 的一部分 运算。一个 Storm 集群可以包括一个或者多个工作 node。 ● Workers(JVM 虚拟机):指一个 node 上相互独立运行的 JVM 进程。每个 node 可 以配置运行一个或者多个 worker。一个 topology 会分配到一个或者多个 worker 上 运行。 ● Executeor(线程):指一个 worker 的 jvm 进程中运行的 Java 线程。多个 task 可以 指派给同一个 executer 来执行。除非是明确指定,Storm 默认会给每个 executor 分 配一个 task。 ● Task(bolt/spout 实 例 ): task 是 spout 和 bolt 的实例,它们的nextTuple() 和 execute() 方法会被 executors 线程调用执行。 1.4.1 WordCountTopology 的并发机制 到目前为止,在单词计数的示例中没有明确使用任何 Storm 中并发机制的 API,而 是让 Storm 使用默认配置。在大多数情况下,除非明确指定,Strom 的默认并发设置默认 是 1。 在我们修改 topology 的并发度之前,先来看默认配置下 topology 是如何执行的。假设 我们有一台服务器(node), 为 topology 分配了一个 worker,并且每个 executer 执行一个 task。我们的 topology 执行过程如图 1-3 所示: /PEF 8PSLFS +7. &YFDVUPS 5ISFBE 5BTL 4FOUFODF 4QPVU 5BTL 4QMJU 4FOUFODF #PMU 5BTL 8PSE $PVOU#PMU 5BTL 3FQPSU #PMU &YFDVUPS 5ISFBE &YFDVUPS 5ISFBE &YFDVUPS 5ISFBE 图 1-3 14   Storm 分布式实时计算模式 正如在图中看到的,唯一的并发机制出现在线程级。每个任务在同一个 JVM 的不 同线程中执行。如何增加并发度以充分利用硬件能力?让我们来增加分配给 topology 的 worker 和 executer 的数量。 1.4.2 给 topology 增加 worker 增加额外的 worker 是增加 topology 计算能力的简单方法。为此 Storm 提供了 API 和 修改配置项两种修改方法。无论采取哪种方法,spout 和 bolt 组件都不需要做变更,可以 直接复用。 在单词计数topology 前面的版本中,我们引入了Config 对象在发布时传递参数 给 submitTopology() 方法,但是没有做更多配置操作。为了增加分配给一个 topology 的 worker 数量,只需要简单的调用一下 Config 对象的 setNumWorkers() 方法: 这样就给 topology 分配了两个 worker 而不是默认的一个。从而增加了 topology 的计 算资源,也更有效的利用了计算资源。我们还可以调整 topology 中的 executor 个数以及每 个 executor 分配的 task 数量。 1.4.3 配置 executor 和 task 我们已经知道,Storm 给 topology 中定义的每个组件建立一个 task,默认的情况下, 每个 task 分配一个 executor。Storm 的并发机制 API 对此提供了控制方法,允许设定每个 task 对应的 executor 个数和每个 executor 可执行的 task 的个数。 在定义数据流分组时,可以设置给一个组件指派的 executor 的数量。为了说明这个功 能,修改 topology 的定义代码,设置 SentenceSpout 并发为两个 task,每个 task 指派各自 的 executor 线程。 如果只使用一个 worker,topology 的执行如图 1-4 所示。 下一步,我们给语句分割 bolt SplitSentenceBolt 设置 4 个 task 和 2 个 executor。每个 executor 线程指派 2 个 task 来执行(4/2=2)。还将配置单词计数 bolt 运行四个 task,每个 task 由一个 executor 线程执行: 第 1 章 分布式单词计数   15 /PEF 8PSLFS +7. &YFDVUPS 5ISFBE 5BTL 4FOUFODF 4QPVU 5BTL 4FOUFODF 4QPVU 5BTL 4QMJU 4FOUFODF #PMU 5BTL 8PSE $PVOU#PMU 5BTL 3FQPSU #PMU &YFDVUPS 5ISFBE &YFDVUPS 5ISFBE &YFDVUPS 5ISFBE &YFDVUPS 5ISFBE 图 1-4 在 2 个 worker 的情况下,topology 执行如图 1-5 所示。 增加了 topology 并发后,运行更新过的 WordCountTopology 类,每个单词的计数比原 topology 要多: 16   Storm 分布式实时计算模式 /PEF 8PSLFS +7. &YFDVUPS 5ISFBE 5BTL 4FOUFODF 4QPVU 5BTL 4QMJU 4FOUFODF #PMU 5BTL 8PSE $PVOU#PMU 5BTL 3FQPSU #PMU 5BTL 8PSE $PVOU#PMU 5BTL 4QMJU 4FOUFODF #PMU &YFDVUPS 5ISFBE &YFDVUPS 5ISFBE &YFDVUPS 5ISFBE &YFDVUPS 5ISFBE 8PSLFS +7. &YFDVUPS 5ISFBE 5BTL 4FOUFODF 4QPVU 5BTL 4QMJU 4FOUFODF #PMU 5BTL 8PSE $PVOU#PMU 5BTL 8PSE $PVOU#PMU 5BTL 4QMJU 4FOUFODF #PMU &YFDVUPS 5ISFBE &YFDVUPS 5ISFBE &YFDVUPS 5ISFBE 图 1-5 spout 在 topology 关闭之前会一直发射数据,单词的计数值取决于计算机的速度,是 否有其他程序在运行。总量上看发射和处理的单词增多了。 第 1 章 分布式单词计数   17 要重点指出的是,当 topology 执行在本地模式时,增加 worker 的数量不会达到提 高速度的效果。因为 topology 在本地模式下是在同一个 JVM 进程中执行的,所以只有 增加 task 和 executor 的并发度配置才会生效。Storm 的本地模式提供了接近集群模式的 模拟,对开发是否有帮助。但程序在投入生产环境之前,必须在真实的集群环境下进行 测试。 1.5 理解数据流分组 看了前面的例子,你会纳闷为什么没有增加 ReportBolt 的并发度。答案是,这样做没 有任何意义。为了理解其中的原因,需要了解 Storm 中数据流分组的概念。 数据流分组定义了一个数据流中的 tuple 如何分发给 topology 中不同 bolt 的 task。举 例说明,在并发版本的单词计数 topology 中,SplitSentenceBolt 类指派了四个 task。数据 流分组决定了指定的一个 tuple 会分发到哪个 task 上。 Storm 定义了七种内置数据流分组的方式: ● Shuffle grouping(随机分组):这种方式会随机分发 tuple 给 bolt 的各个 task,每个 bolt 实例接收到的相同数量的 tuple。 ● Fields grouping(按字段分组):根据指定字段的值进行分组。比如说,一个数据流 根据“word”字段进行分组,所有具有相同“word”字段值的 tuple 会路由到同一 个 bolt 的 task 中。 ● All grouping(全复制分组):将所有的 tuple 复制后分发给所有 bolt task。每个订阅 数据流的 task 都会接收到 tuple 的拷贝。 ● Globle grouping(全局分组):这种分组方式将所有的 tuples 路由到唯一一个 task 上。 Storm 按照最小的 task ID 来选取接收数据的 task。注意,当使用全局分组方式时, 设置 bolt 的 task 并发度是没有意义的,因为所有 tuple 都转发到同一个 task 上了。 使用全局分组的时候需要注意,因为所有的 tuple 都转发到一个 JVM 实例上,可 能会引起 Storm 集群中某个 JVM 或者服务器出现性能瓶颈或崩溃。 ● None grouping(不分组):在功能上和随机分组相同,是为将来预留的。 ● Direct grouping(指向型分组):数据源会调用 emitDirect() 方法来判断一个 tuple 应 该由哪个 Storm 组件来接收。只能在声明了是指向型的数据流上使用。 18   Storm 分布式实时计算模式 ● Local or shuffle grouping(本地或随机分组):和随机分组类似,但是,会将 tuple 分 发给同一个 worker 内的 bolt task(如 果 worker 内有接收数据的 bolt task)。其他情 况下,采用随机分组的方式。取决于 topology 的并发度,本地或随机分组可以减 少网络传输,从而提高 topology 性能。 除了预定义好的分组方式之外,还可以通过实现 CustomStreamGrouping(自定义分组) 接口来自定义分组方式: prepare() 方法在运行时调用,用来初始化分组信息,分组的具体实现会使用这些信息 决定如何向接收 task 分发 tuple。WorkerTopologyContext 对象提供了 topology 的上下文信息, GlobalStreamId 提供了待分组数据流的属性。最有用的参数是 targetTasks,是分组所有待选 task 的标识符列表。通常,会将 targetTasks 的引用存在变量里作为 chooseTasks() 的参数。 chooseTasks() 方法返回一个 tuple 发送目标 task 的标识符列表。它的两个参数是发送 tuple 的组件的 id 和 tuple 的值。 为了说明数据流分组的重要性,我们在topology 中引入一个bug。首先,修改 SentenceSpout 的 nextTuple() 方法,使每个句子只发送一次: 程序的输出是这样的: 第 1 章 分布式单词计数   19 然后将 CountBolt 中按字段分组方式修改为随机分组方式: 运行程序的结果是这样的: 结果是错误的,因为 CountBolt 的参数是和状态相关的:它会对收到的每个单词进行 计数。这个例子中,在并发状况下,计算的准确度取决于是否按照 tuple 的内容进行适当 的分组。我们引入的 bug 只会在 CountBolt 并发实例超过一个时出现。这也是我们为什么 一再强调,要在不同的并发度配置下测试 topology。 通常,需要避免将信息存在 bolt 中,因为 bolt 执行异常或者重新指派时,数据会 丢失。一种解决方法是定期对存储的信息快照并放在持久性存储中,比如数据库。 这样,如果 task 被重新指派就可以恢复数据。 小技巧 20   Storm 分布式实时计算模式 1.6 有保障机制的数据处理 Storm 提供了一种 API 能够保证 spout 发送出来的每个 tuple 都能够执行完整的处理 过程。在我们上面的例子中,不担心执行失败的情况。可以看到在一个 topology 中一个 spout 的数据流会被分割生成任意多的数据流,取决于下游 bolt 的行为。如果发生了执行 失败会怎样?举个例子,考虑一个负责将数据持久化到数据库的 bolt。怎样处理数据库更 新失败的情况? 1.6.1 spout 的可靠性 在 Storm 中,可靠的消息处理机制是从 spout 开始的。一个提供了可靠的处理机制的 spout 需要记录它发射出去的 tuple,当下游 bolt 处理tuple 或者子tuple 失败时spout 能 够重新发射。子 tuple 可以理解为 bolt 处理 spout 发射的原始 tuple 后,作为结果发射出 去的tuple。另外一个视角来看,可以将 spout 发射的数据流看作一个 tuple 树的主干 (如 图 1-6 所 示 )。 在图中,实线部分表示从 spout 发射的原始主干 tuple,虚线部分表示的子 tuple 都 是源自于原始 tuple。这样产生的图形叫做 tuple 树。在有保障数据的处理过程中,bolt 每收到一个 tuple,都需要向上游确认应答(ack)者报错。对主干 tuple 中的一个 tuple, 如果 tuple 树上的每个 bolt 进行了确认应答,spout 会调用 ack 方法来标明这条消息已经 完全处理了。如果树中任何一个 bolt 处理 tuple 报错,或者处理超时,spout 会调用 fail 方法。 Storm 的 ISpout 接口定义了三个可靠性相关的 API:nextTuple,ack 和 fail。 TQPVU CPMU CPMU CPMU CPMU CPMU CPMU CPMU CPMU CPMU 图 1-6 第 1 章 分布式单词计数   21 前面讲过,Storm 通过调用 Spout 的 nextTuple() 发送一个 tuple。为实现可靠的消息处 理,首先要给每个发出的 tuple 带上唯一的 ID, 并且将 ID 作为参数传递给 SpoutOutputCollector 的 emit() 方法: 给 tuple 指定 ID 告诉 Storm 系统,无论执行成功还是失败,spout 都要接收 tuple 树上 所有节点返回的通知。如果处理成功,spout 的 ack() 方法将会对编号是 ID 的消息应答确 认,如果执行失败或者超时,会调用 fail() 方法。 1.6.2 bolt 的可靠性 bolt 要实现可靠的消息处理机制包含两个步骤: 1. 当发射衍生的 tuple 时,需要锚定读入的 tuple 2. 当处理消息成功或者失败时分别确认应答或者报错 锚定一个 tuple 的意思是,建立读入 tuple 和衍生出的 tuple 之间的对应关系,这样下 游的 bolt 就可以通过应答确认、报错或超时来加入到 tuple 树结构中。 可以通过调用 OutputCollector 中 emit() 的一个重载函数锚定一个或者一组 tuple: 这里,我们将读入的 tuple 和发射的新 tuple 锚定起来,下游的 bolt 就需要对输出的 tuple 进行确认应答或者报错。另外一个 emit() 方法会发射非锚定的 tuple: 非锚定的 tuple 不会对数据流的可靠性起作用。如果一个非锚定的 tuple 在下游处理失 败,原始的根 tuple 不会重新发送。 当处理完成或者发送了新 tuple 之后,可靠数据流中的 bolt 需要应答读入的 tuple: 如果处理失败,这样的话 spout 必须发射 tuple,bolt 就要明确地对处理失败的 tuple 报错: 如果因为超时的原因,或者显式调用 OutputCollector.fail() 方法,spout 都会重新发送 原始 tuple。后面很快有例子。 22   Storm 分布式实时计算模式 1.6.3 可靠的单词计数 为了进一步说明可控性,让我们增强 SentenceSpout 类,支持可靠的 tuple 发射方式。 需要记录所有发送的 tuple,并且分配一个唯一的 ID。我们使用 HashMap 来存储已发送待确认的 tuple。每当发送一个新的 tuple,分配一个唯一的标识符并且存储 在我们的 hashmap 中。当收到一个确认消息,从待确认列表中删除该 tuple。如果收到报 错,从新发送 tuple: 第 1 章 分布式单词计数   23 为支持有保障的处理,需要修改 bolt,将输出的 tuple 和输入的 tuple 锚定,并且应答 确认输入的 tuple: 总结 本章中,在没有安装和搭建 Storm 集群的情况下,我们使用 Storm 的核心 API 建立了 一个简单的分布式计算程序,覆盖了 Storm 特性集的大部分内容。Storm 的本地模式非常 强大,简化了开发,提高了开发效率。但要感受到 Storm 真正的威力和水平扩展性,还是 需要将程序部署在真实的集群上。 下一章,我们会讲如何安装和搭建 Storm 集群环境,以及如何将 topology 部署到到分 布式环境中。 第 2 章 配置 Storm 集群 在本章中你将深入理解 Storm 的技术栈,它的软件依赖,以及搭建和部署 Storm 集 群的过程。我们首先会在伪分布式模式下安装 Storm,所有的组件都安装在同一台机器 上,而不是在多台机器上。一旦你了解了安装和配置 Storm 的基本步骤,我们就可以通 过 Puppet 这个工具进行自动化的安装,这样的话部署多节点的集群可以节省大量的时间 和精力。 本章包括以下内容: ● 组成 Storm 集群的不同组件和服务 ● Storm 的技术栈 ● 在 Linux 上安装和配置 Storm ● Storm 的配置参数 ● Storm 的命令行接口 ● 使用服务提供工具 Puppet 进行自动安装 2.1 Storm 集群的框架 Storm 集群遵循主 / 从(master/slave)结构,和 Hadoop 等分布式计算技术类似,语义 上稍有不同。主 / 从结构中,通常有一个配置中静态指定或运行时动态选举出的主节点。 Chapter 2 第 2 章 配置 Storm 集群   25 Storm 使用前一种实现方式。主 / 从结构中因为引入了单点故障的风险而被诟病,我们会 解释 Storm 的主节点是半容错的。 Storm 集群由一个主节点(称为 nimbus)和一个或者多个工作节点(称为 supervisor) 组成。在nimbus 和 supervisor 节点之外,Storm 还需要一个Apache ZooKeeper 的实例, ZooKeeper 实例本身可以由一个或者多个节点组成。如图 2-1 所示。 /JNCVT ;PP,FFQFS 4VQFSWJTPS 4VQFSWJTPS 4VQFSWJTPS 4VQFSWJTPS 图 2-1 nimbus 和 supervisor 都是 Storm 提供的后台守护进程,可以共存在同一台机器上。实 际上,可以建立一个单节点伪集群,把 nimbus、supervisor 和 ZooKeeper 进程都运行在同 一台机器上。 2.1.1 理解 nimbus 守护进程 nimbus 守护进程的主要职责是管理,协调和监控在集群上运行的 topology。包括 topology 的发布,任务指派,事件处理失败时重新指派任务。 将 topology 发布到Strom 集群,将预先打包成jar 文件的topology 和配置信息提 交(submitting)到 nimbus 服务器上。一旦 nimbus 接收到了 topology 的压缩包,会将 jar 包分发到足够数量的 supervisor 节点上。当 supervisor 节点接收到了 topology 压缩文件, nimbus 就会指派 task(bolt 和 spout 实例)到每个 supervisor 并且发送信号指示 supervisoer 生成足够的 worker 来执行指派的 task。 nimbus 记录所有 supervisor 节点的状态和分配给它们的 task。如果 nimbus 发现某个 supervisor 没有上报心跳或者已经不可达了,它会将故障 supervisor 分配的 task 重新分配 到集群中的其他 supervisor 节点。 前面提到过,严格意义上讲 nimbus 不会引起单点故障。这个特性是因为 nimubs 并 26   Storm 分布式实时计算模式 不参与 topology 的数据处理过程,它仅仅是管理 topology 的初始化,任务分发和进行监 控。实际上,如果 nimbus 守护进程在 topology 运行时停止了,只要分配的 supervisor 和 worker 健康运行,topology 一直继续数据处理。要注意的是,在 nimbus 已经停止的情况 下 supervisor 异常终止,因为没有 nimbus 守护进程来重新指派失败这个终止的 supervisor 的任务,数据处理就会失败。 2.1.2 supervisor 守护进程的工作方式 supervisor 守护进程等待 nimbus 分配任务后生成并监控 workers(JVM 进程)执行任 务。supervisor 和 worker 都是运行在不同的 JVM 进程上,如果由 supervisor 拉起的一个 woker 进程因为错误(或者因为 Unix 终端的 kill-9 命令,Window 的 tskkill 命令强制结束) 异常退出,supervisor 守护进程会尝试重新生成新的 worker 进程。 看到这里你可能想知道 Storm 的有保障传输机制如何适应其容错模型。如果一个 worker 甚至整个 supervisor 节点都故障了,Storm 怎么保障出错时正在处理的 tuples 的 传输? 答案就在 Storm 的 tuple 锚定和应答确认机制中。当打开了可靠传输的选项,传输到 故障节点上的 tuples 将不会收到应答确认,spout 会因为超时而重新发射原始 tuple。这样 的过程会一直重复直到 topology 从故障中恢复开始正常处理数据。 2.1.3 Apache ZooKeeper 简介 ZooKeeper 使用一个简单的操作原语集合和分组服务,在分布式环境下提供了集中式 的信息维护管理服务。它是一种简单但功能强大的分布式同步机制,允许客户端的应用程 序监控或者订阅数据集中的部分数据,当数据产生,更新或者修改时,客户端都会收到通 知。使用常见的 ZooKeeper 模式或方法,开发者可以实现分布式计算所需要的很多种机 制,比如 Leader 选举,分布式锁和队列。 Storm 主要使用 ZooKeeper 来协调一个集群中的状态信息,比如任务的分配情况, worker 的状态,supervisor 之间的 nimbus 的拓扑度量。nimbus 和 supervisor 节点之间的通 信主要是结合 ZooKeeper 的状态变更通知和监控通知来处理的。 Storm 对 ZooKeeper 的使用相对比较轻量化,不会造成很重的资源负担。对于重量级 的数据传输操作,比如发布 topology 时传输 jar 包,Storm 依赖 Thirft 进行通信。我们将会 第 2 章 配置 Storm 集群   27 看到,topology 组件之间的数据传输(最影响效率的地方)是在底层进行的,并且经过了 性能优化。 2.1.4 Storm 的 DRPC 服务工作机制 Storm 应用中的一个常见模式期望将 Storm 的并发性和分布式计算能力应用到“请求 - 响应”范式中。一个客户端进程或者应用提交了一个请求并同步地等待响应。这样的范式 可能看起来和典型 topology 的高异步性、长时间运行的特点恰恰相反,Storm 具有事务处 理的特性来实现这种应用场景,如图 2-2 所示。 %31$ 4FSWFS %31$ 4FSWFS %31$ CPMU %31$ TQPVU UPQPMPHZ BSHT SFTVMU <SFRVFTU-JE SFTVMU> <SFRVFTU-JEᶲBSHTᶲSFUVSO-JOGP> 图 2-2 为了实现这个功能,Storm 将额外的服务(Storm DRPC)以及 spout 和 bolt 整合在一 起工作,提供了可扩展的分布式 RPC 能力。 DRPC 功能是完全是可选的,当 Storm 集群中的应用有使用这个功能时,DRPC 服务 节点才是必须的。 2.1.5 Storm UI Storm UI 也是可选功能,非常有用,会提供一个基于 Web 的 GUI 来监控 Storm 集群, 对正在运行的 topology 有一定的管理功能。Storm UI 提供了已经发布的 topology 的统计信 息,对监控 Storm 集群的运转和 topology 的功能有很大帮助,如图 2-3 所示。 28   Storm 分布式实时计算模式 图 2-3 Storm UI 只能报告由 nimubs 的 trhift API 获取的信息,不会影响到 topology 上其他 功能。Storm UI 可以随时开关而不影响任何 topology 的运行,在那里它完全是无状态 的。它还可以用配置来进行一些简单的管理功能,如开启、停止、暂停和重新均衡负载 topology。 2.2 Storm 技术栈简介 在安装 Storm 之前,我们来看看 Storm 和 topology 是基于哪些技术建立的。 2.2.1 Java 和 Clojure Storm 运行在 Java 虚拟机上,大部分是使用 Java 和 Clojure 进行开发的。Storm 的主 第 2 章 配置 Storm 集群   29 要接口都是通过 Java 语言定制的,Storm 使用 Python 实现了可执行程序。除了这些程序, 由于 Java 使用了 Apache Thrift 接口,Java 还友好的兼容多种其他语言。 Storm 的组件(spout 和 bot)实际上可以使用任何当前服务器安装支持的语言进行开 发。JVM 虚拟机支持的语言可以原生的执行,其他语言的实现需要通过 JNI 和 Storm 的多 语言协议来实现。 2.2.2 Python 所有 Storm 的后台程序和管理命令都是使用单独一个可执行 Python 文件来启动。这 包括了 nimbus 和 supervisor 后台程序,并包括后续会讲的所有的命令和发布管理命令。这 样做的原因主要是 Storm 集群中所有的服务器都安装了 Python 解释器。很多工作站也使 用 Python 来进行管理。 2.3 在 Linux 上安装 Storm Storm 是设计运行在 Unix 兼容的操作系统上。但在 0.9.1 版本,它也支持在 Windows 机器上部署。 为了简化部署,我们使用 Ubuntu 12.04LTS 的发行版作为安装服务器。将会使用服务 器版本,默认不包括图形界面接口,因为我们用不到。Ubuntu12.04LTS 版本可以在 http:// releases.ubuntu.com/precise/ubuntu-12.04.2-server-i386.iso. 下载到。在实体机和虚拟机上安 装 ubuntu 都是非常方便的。出于学习和开发的目的,你会发现在虚拟机里进行部署更加方 便,尤其是手头没有那么多实体机的情况。 OSX、Linux、Windows 都有着对应的虚拟机软件。我们建议从下面集中软件中选择 一个: ● VMWare(OSX、Linux、Windows) 这个软件是需要付费的,可以在 http://www.vmware.com 获得。 ● VirtualBox(OSX、Linux、Windows) 这个软件是免费的,可以在 https://www.virtualbox.org 下载 ● Parallels Desktop(OSX) 这个软件也是需要付费的,可以在 http://www.parallels.com 下载。 30   Storm 分布式实时计算模式 2.3.1 安装基础操作系统 可以从 Ubuntu 的安装盘或镜像启动,遵循屏幕上的指令选择基础安装。在 Packege Selection 在屏幕上出现的时候,选取安装 OpenSSH 服务的选项。这个安装包允许你从远 程 ssh 登录上服务器。在其他安装界面上,都选择默认选项就可以了,除非是为了兼容硬 件修改选项。 默认情况下,进入 Ubuntu 的初始化用户具有管理权限(sudo)。如果你要重新建一个 账户,记得给账户赋予管理权限。 2.3.2 安装 Java 首先安装 JVM。Storm 可以兼容 1.6 和 1.7 版本 OpenJDK 和 Oracle 发布的 JVM。本 例中,我们首先更新系统,然后安装 1.6 版本的 JDK。 2.3.3 安装 ZooKeeper 在我们的单节点伪集群中,需要安装一个 ZooKeeper 实例就够了。Storm 需要 3.3.X 版本的 ZooKeeper。调用一次下述命令即可: 这个命令会安装二进制版本的 ZooKeeper,并且会生成一个服务的脚本来启动和关闭 它。还会生成一个 cron 工作来定期清理过期的 ZooKeeper 传输记录和快照。如果不按默 认设置来清理,ZooKeeper 很快就会占用大量硬盘使用空间。 2.3.4 安装 Storm Storm 二进制发行版本可以在 Storm 官网(http://storm.incubator.apache.org)下载到。 二进制版本的安装文件夹布局更适合在生产系统中部署。我们会做一些修改,使之更贴近 UNIX 的风格(比如日志搭在 /var/log 下而不是搭在 Storm 的主目录)。 首先新建 Storm 的用户和用户组。这可以避免 Storm 进程以默认或者 root 权限启动: 第 2 章 配置 Storm 集群   31 然后下载和安装 Storm 的发行版本。我们将 Storm 安装在 /usr/share 目录下并且生成 一个版本无关的软连接到 /usr/share/storm 目录下。这种方式便于将来升级版本,或回滚 版本,只需要重新建立一个软连接即可。我们在将 Storm 的可执行程序软连接到 /usr/bin/ storm 目录下: 默认的,Storm 会将日志写在 $STORM_HOME/logs,而不像其他 Unix 程序一样将日 志写在 /var/log 下。执行下面的命令,在 /var/log 生成 Storm 转述的 log 目录,并将 Storm 配置指向那里: 最后,将 Storm 的默认配置文件移到 /etc/storm 下,建立软连接以便 Storm 程序可以 找到它: Storm 安装完毕后,可以对 Storm 守护进程的程序进行配置,使其异常之后可以自动 恢复。 2.3.5 运行 Storm 守护进程 所有的 Storm 守护进程都是设计为快速失败的,也就是一旦遇到了任何异常错误进程将 会终止。这样使得单独的组件可以安全地结束,并且在不影响系统其他部分的情况下恢复。 这意味着,Storm 的守护进程无论在什么时候异常终止,都需要立即重启。这个技 术称为在监督(supervision)下运行进程。幸运的是,有很多种技术供选择。实际上, ZooKeeper 也是一个快速失败的系统,在 Debian 发行版中特有的基于 upstart 的 init 脚本 给 ZooKeeper 提供了这项功能。如果 ZooKeeper 进程在任意时间退出,upstart 会保证重启 32   Storm 分布式实时计算模式 进程恢复集群。 Debian 的 upstart 系统非常适合这种场景。在其他 Linux 发行版本还有其他的选择。 为了简单起见,我们使用大部分 Linux 发行版都有的 supervisor 包来实现这个功能。不巧 的是,supervisor 的名字和 Storm 的 supervisor 后台程序冲突了。为了表明区别,我们将非 Storm 的 supervisior 后台程序叫做 supervisord(添加了一个 d),但后面代码示例中,还是 会用没有 d 的名字来执行命令。 在基于 Debian 的 Linux 发行版本中,supervisord 包命名为 supervisor,其在其他发行 版,如 RED HAT 中,使用 supervisord 的名字,执行下述命令: 这条命令会安装和启动 supervisord 服务,服务的主要配置在 /etc/supervisor/supervisord 中。Supervisord 服务会自动包含所有 /etc/supervisord/conf.d/ 目录下的 *.conf 文件,我们将 在这里放置运行 Storm 后台进程需要的 config 文件。 每个要在 Supervisord 监督下运行的命令,都需要创建一个文件,包含下列内容: ● 每个被监督命令要配置一个在 supervisord 配置中唯一的名字。 ● 启动的命令 ● 启动时的工作目录 ● 当一个命令或者服务终止时,是否要拉起。对于快速失败服务,这项配置永远是 true。 切换到 Storm 命令时使用的目录。这里,我们使用 Storm 用户来执行所有守护进程 建立下述三个文件使得 Storm 守护进程由 supervisord 自动拉起(在遇到异常终止的事 件时也会重启)。 ● /etc/supervisord/conf.d/storm-nimbus.conf 文件内容是: ● /etc/supervisord/conf.d/storm-supervisor.conf 文件内容是: 第 2 章 配置 Storm 集群   33 ● /etc/supervisord/conf.d/storm-ui.conf 文件内容是: 当文件生成后,通过下述命令终止启动 supervisord 服务: supervisord 会载入新的配置文件并且启动 Storm 的守护进程。片刻后 Storm 服务就会 启动,可以通过在浏览器里访问下述 URL 来判断 Storm 伪集群是否已经启动运行(使用实 际机器的 IP 或者 hostname 来替换 URL 中的 localhost 字 段 ): 这里提及了 Storm UI 图形接口。它表明集群正在运行,包含了一个 supervisor 和四个 worker 槽位,没有 topology 在集群中运行(稍后将会将一个 topology 发布到集群里)。 如果 Strom UI 因为某些原因未能启动起来,或者看不到集群中活跃的 supervisor,可 以从下述位置查看错误日志: ● Strom UI:在 /var/log/storm 中查看 ui.log 文件。 ● Nimbus:在 /var/log/storm 中查看 nimbus.log 文件。 ● Supervisor:在 /var/log/storm 查看 supervisor.log 文件。 迄今为止我们都在使用 Storm 的默认配置,使用 localhost 作为集群中 ZooKeeper、 Nimbus 等服务的主机名参数。在一个机器上构建单节点伪集群时是可行的,但是实际搭 建一个多节点的集群需要重新定义一些默认配置选项。 下来,我们将介绍 Storm 提供的不同的配置选项,以及这些配置项如何影响集群和 topology 的行为。 2.3.6 配置 Storm Storm 的配置由一些 YAML 格式的属性组成。当一个 Storm 的守护进程启动时,会 加载默认属性,然后加载 $STORM_HOME/conf 目录下的 storm.yaml(已经软连接到 /etc/ storm/storm.yaml)文件,然后使用该文件中的值替换默认值。 34   Storm 分布式实时计算模式 下面列出了 storm.yaml 配置文件中必须要重新定义的几项内容: 2.3.7 必需的配置项 下述配置项是生产环境的多节点 Storm 集群的必选配置: ● storm.zookeeper.serviers:这项配置列出了 ZooKeeper 集群的主机名称。因为我们在 一台机器上运行单节点的 ZooKeeper,并且 Storm 的其他守护程序都在同一台机器 上,可以使用默认值的 localhost。 ● nimbus.host :指定了集群中 nimbus 的节点。worker 需要从这项配置知道集群的主 节点在哪里,用来下载 topology 的 jar 包和配置选项。 ● supervisor.slots.pors :这个配置控制每个 supervisor 节点运行多少个 worker 进程。 这个配置定义为 worker 监听端口的列表,监听端口的个数控制了 supervisor 节点上 有多少个 worker 的插槽。例如,如果我们有个集群中有三个 supervisor 节点,每 个节点配置了三个监听端口,整个集群就有九个 worker 插槽(3×3=9)。默认的, Storm 使用 6700~6703 端口,每个 supervisor 节点上有 4 个 worker 插槽。 第 2 章 配置 Storm 集群   35 ● storm.local.dir :nimbus 和 supervisor 守护进程都会存储一些短暂的状态信息,比如 JAR 报和 woker 需要的配置文件。这个配置项决定了 nimbus 和 supervisor 将信息 存储在哪里。这个指定的目录必须已经存在,并且 storm 的启动用户要有合适的操 作权限,可以读或者写这个目录。这个目录下的内容必须在集群运行的过程中一直 保存,所以要避免使用 /tmp 目录作为这个这个配置项,因为重启后 /tmp 目录的内 容会丢失。 2.3.8 可选配置项 一个运营中的 Storm 集群除了上述必选配置项之外,还有一系列可选配置项,可以按 照需要进行重新定义。Storm 配置选项使用点号分隔的命名规范,用前缀来识别配置分类; 如表 2-1 所示。 表 2-1 前  缀 分  类 前  缀 分  类 storm.* 通用配置 supervisor.* Supervisor 配置 nimbus.* Nimbus 配置 worker.* Worker 配置 ui.* Storm UI 配置 zmq.* ZeroMQ 配置 drpc.* DRPC 服务配置 topology.* Topology 配置 如果需要查看完整的默认配置,可以在Storm 的源码中查看defaults.yaml 文件 (https://github.com/nathanmarz/storm/blob/master/conf/defaults.yaml)。其他几个经常需要重 新定义的配置如下: ● nimbus.childopts(default:“ -Xms1024m”) :这项 JVM 的配置会添加在启动 nimbus 守护进程的 Java 命令行中。 ● ui.port(default:8080):这项配置指定了 Storm UI 的 Web 服务器监听的端口。 ● ui.childopts(default:“ -Xms1024m”) :这项 JVM 配置会添加在 Storm UI 服务启动 的 Java 命令行中。 ● supervisor.childopts(default:“-Xms768m”):这个 JVM 选项会添加在 Supervisor 启 动的 Java 命令行中。 ● worker.childopts(default:“ -Xms768m”) :这个 JVM 选项会添加在启动 worker 进 程的 Java 命令行中。 36   Storm 分布式实时计算模式 ● topology.message.timeout.secs(default:30):这个配置项设定了一个 tuple 树需要应答最大 时间秒数限制,超过这个时间的认为已经执行失败(超时)。这个值设置得太小可能 会导致 tuple 反复重新发送。当这个选项生效时,spout 必须设定来发送锚定的 tuple。 ● topology.max.spout.pending(default:null) :在默认值null 的时候,每当spout 产生 了新的 tuple,Storm 会立即将 tuple 向后端数据流发送。由于下游 bolt 执行可能有 延迟,默认的数据发送行为可能导致 topology 过载,从而导致消息处理超时。将 本选项设置为非 null 大于 0 的数字时,Storm 会暂停发送 tuple 到数据流直到发 送出去的 tuple 小于这个数字,起到了对 spout 限速的作用。这项配置和 topology. message.timeout.secs 一起,是调节 topology 性能的最重要的两个参数。 ● topology.enable.message.timeouts(default:ture) :这个选项用来设定锚定的 tuple 的超 时时间。如果设置为 false,则锚定的 tuple 不会超时。谨慎使用这个选项,将本项 配置设置为 flase 之前,考虑改变 topology.message.timeout.secs。本项配置生效后, spout 必须配置为发送锚定 tuple。 2.3.9 Storm 可执行程序 Storm 执行程序是一个多用途的命令行程序,可以用来启动所有的守护进程,执行 topology 管理操作,比如说部署一个新的 topology 到集群,或者使用本地模式在开发测试 阶段执行一个 topology。 Storm 命令的基础命令如下: 2.3.10 在工作站上安装 Storm 可执行程序 为了在连接的远程集群上执行 Storm 命令,需要在本地安装 Storm 发行版。在工作站 上安装 Storm 发行版很简单,只需要解压缩 Storm 发行版的压缩包,并且添加 Stom 的 bin 目录($STORM_HOME/bin)到 PATH 环境变量中。下一步在 ~/.storm/ 目录下新建 storm. yaml 文件,文件内容只有一行,告诉 storm 在哪里可以找到需要连接集群的 nimbus 服务: 示例:~/.storm/storm.yaml file. 第 2 章 配置 Storm 集群   37 为了让一个 Storm 集群正确执行,必须正确配置主机名的 IP 地址解析,无论是通 过 DNS 系统还是通过 /etc/hosts 文件。 在 Storm 配置中,也可以直接使用 IP 来代替主机名,使用 DNS 系统更好。 2.3.11 守护进程命令 Storm 的守护进程用来启动 Storm 服务,应该在被监督模式下启动,这样,程序如果 有异常失败,服务就可以重启。在启动时,Storm 守护程序从 $STORM_HOME/conf/storm. yaml 中读取配置。这个配置文件中的任何配置项都会重新定义内置的默认配置。 Nimbus 用法:storm nimbus 这个命令启动 nimbus 守护进程 Supervisor 用法:storm supervisor 这个命令启动 supervisor 进程 UI 用法:storm ui 这个命令启动 Storm UI 的守护进程,提供了一个 web 的 UI 界面用来监控 Storm 集群。 DRPC 用法:storm drpc 这个命令启动一个 DRPC 服务守护进程 2.3.12 管理命令 Storm 的管理命令用来发布和管理集群中的 topology。管理命令通常在 Storm 集群外 部的工作站来执行,但不是必需的。和 nimbus Thrift API 通信需要知道 nimbus 节点的主 小提示 38   Storm 分布式实时计算模式 机名。管理命令在 ~/.storm/storm.yaml 文件中查找配置,Storm 的 jar 包会添加到 Java 的 classpath 中。配置中唯一需要的参数是 nimbus 节点的主机名: Jar 用法:storm jar topology_jar topology_class[arguments...] jar 命令用来向集群提交topology。它会使用指定的参数运行topology_class 中的 main() 方法,同时上传 topology_jar 文件到 nimbus 以分发到整个集群。提交后,Storm 集 群会激活并且开始运行 topology。 topology 类中的 main() 方法需要调用 StormSubmitter.submitTopology() 方法,并且为 topology 提供集群内唯一的名称。如果集群中已经有一个同名的 topology,jar 命令会执行 失败。通常会通过命令行参数的方式指定 topology 的名称,这样 topology 就可以在提交执 行的时候再命名。 Kill 用法:storm kill topology_name[-w wait_time] Kill 命令用来关闭已经部署的 topology。这个命令使用 topology_name 来关闭 topology。 首先,Storm 在 topology.message.timeout.secs 的时间后使 toplogy 的 spout 取消激活,这样 已经发出去的 tuple 就可以执行完毕。然后停止 worker 进程,并且尝试清理所有存储的状 态信息。特别是,通过 -w 参数,可以使用特定的时间间隔覆盖 topology.message.timeout. secs 参数。Kill 命令也可以用在 Storm UI 上进行操作。 Deactivate 用法:storm deactivate topology_name Deactivate 命令告诉 Storm 停止特定的 topology 的 spout 发送 tuple。topology 可以在 Storm UI 上进行取消激活的操作。 Activate 用法:storm activate topology_name Activate 命令告诉 Storm 重新恢复特定 topology 的 spout 发送 tuple,topology 可以在 第 2 章 配置 Storm 集群   39 Storm UI 中重新激活操作。 Rebalance 用法:storm rebalance topology_name[-w wait_time][-n worker_count][-e component_name= executer_count]... rebalance 命令指示 Storm 在集群的 worker 之间重新平均地分派任务,不需要关闭或 者重新提交现有的 topology。例如,当一个新的 supervisor 节点添加到一个集群中时,就 会需要执行这个命令,因为现有的 topology 是不会将任务分配到新节点的 worker 上的。 rebalance 命令还可以分别使用 -n 和 -e 参数来修改为 topology 分配的 worker 的个数 以及每个 task 分配的 executor 的个数。 当执行 rebalance 命令时,Storm 会先取消激活 topology,等待配置的时间使剩余的 tuple 完成处理,然后在 supervisor 节点中均匀地重新分配 worker。重新平衡后,Storm 会 将 topology 恢复到之前的激活状态(意思是,如果是已经激活的 topology,Storm 会重新 激活它,反之亦然)。 下述的例子会等待 15 秒后重新平衡 wordcount-topology topology,指定 5 个 worker, 比如去设置 sentence-spout 和 split-bolt 使用 4 个和 8 个 executor 线程: Remoteconfvalue 用法:storm remoteconfvalue conf-name Remoteconfvalue 命令用来查看远程集群中的配置参数值。注意,用这个命令看到的 是整个集群的公共配置,看不到单独 topology 中覆盖的特殊配置。 2.3.13 本地调试 / 开发命令 Storm 的本地命令是用来进行调试和测试用的。和管理命令类似,Storm 的调试命令 也会读取 ~/.storm/storm.yaml 文件并且使用其值来覆盖 Storm 内置的默认值。 REPL 使用:storm repl Repl 命令会使用 Storm 的本地 classpath 打开一个 Clojure REPL 会话。 40   Storm 分布式实时计算模式 Classpath 使用:storm classpath Classpath 命令打印 Storm client 使用的 classpath 值。 Localconfvalue 使用:storm localconfvalue conf-name Localconfvalue 命令在整合 ~/.storm/storm.yaml 和 Storm 内置默认值后的配置中查找 特定配置项的值。 2.4 把 toplogy 提交到集群中 现在我们已经有一个正在运行的 Storm 集群了,让我们回顾一下前面说的单词计数的 例子,然后使之能够像本地模式一样运行在集群环境。前面的例子使用 LocalCluster 类将 topology 运行在本地模式: 提交一个 topology 到远程集群非常简单,只需要使用 StormSubmitter 类中同样的方法 和名称: 当开发一个 Storm 的 topology 时,通常不想在本地 / 远程集群模式之间切换部署时修 改代码和重新编译。标准的方法是使用一个 if/else 的条件块,使用命令行参数来决定使用 哪种模式,当命令行不带参数时,使用本地模式,反之当使用 topology 名称做参数时,使 用远程集群模式,如下所示: 第 2 章 配置 Storm 集群   41 为了更新单词计数程序到运行中的集群,首先在第 2 章的代码目录执行 Maven 的编 译命令: 然后,执行 storm jar 命令来发布 topology: 42   Storm 分布式实时计算模式 当命令执行完毕时,你应该看到 topology 在 Storm UI 里已经激活,并且可以点击 topology 的名称来查看详情和 topology 的统计信息,如图 2-4 所示。 图 2-4 2.5 自动化集群配置 迄今为止,我们已经在命令行模式下手工配置了单节点伪集群。这种方法应对小规模 集群当然是行得通,但当集群规模不断增加时,这种方式就变得难以维护了。考虑到配置 十个,甚至成百上千节点的场景。配置任务可以通过脚本来自动执行,但即使是基于脚本 的自动化解决方案在扩展性上也是有问题的。 幸运的是,已有一些有效的技术来帮助解决管理大规模服务器集群的配置和定制问 题。Chef 和 Puppet 都提供了声明式的配置方式,来定义机器的状态(意味着机器安装了 第 2 章 配置 Storm 集群   43 哪些软件包,以及如何配置的)和分类(例如,一个 Apache Web Server 类机器需要安装 Apache 的 httpd 程 序 )。 服务器定制和配置过程的自动化是一个非常大的主题,超出了本书的范畴。我们的目 的是使用 Puppet 中的一部分功能,介绍其基本的概念,同时鼓励大家进一步深入了解。 2.6 Puppet 的快速入门 Puppet(https://puppetlabs.com) 是一个 IT 自动化框架,用来帮助管理员管理大规模的 网络设施资源,使用灵活的、声明式的实现方式来进行 IT 自动化管理。 Puppet 的核心是 manifest 的概念,它描述了一个设施资源对预期的声明(state)。 在 Puppet 中,声明包括了以下方面: ● 安装了哪些软件包。 ● 运行着哪些服务,不运行哪些服务。 ● 软件配置详情。 2.6.1 Puppet manifest 文件 Puppet 使用了基于 Ruby 的 DSL(Domain Specific Language)在一组 manifest 文件中 来描述系统配置。例如,描述 ZooKeeper 的 Puppet manifest 文件如下: 这个最简单的 manifest 例子表明 ZooKeeper 作为一个服务已经安装在机器上,并且 服务是运行的。第一个 package 的代码块告诉 Puppet 使用操作系统的包管理程序(例如, Ubuntu/debian 的 apt-get,Ret Hat 的 yum 等)来安装 3.3.5 版本的 ZooKeeper 软件包。第 44   Storm 分布式实时计算模式 二个 package 代码块保证 zookeeperd 软件包已经安装。最后 service 代码块告诉 Puppet 需 要保证 zookeeperd 的系统服务正在运行,这个服务依赖于 zookeeperd 软件包已经安装。 为了演示 Puppet manifest 文件如何转化为安装软件和系统声明,让我们安装 Puppet 并且使用前面的例子来安装和启动 zookeeperd 服务。 为了获取最新版的 Puppet,需要将 apt-get 配置使用 Puppet 实验室的软件源。执行下 述命令来安装最新版的 Puppet: 然后,保存前面列出的实例 manifest 文件,命名为 init.pp。使用 Puppet 来应用这个 manifest: 当命令执行完后,查看 zookeeper 服务是否正在运行: 如果我们手动停止了 zookeeper 服务并且重新运行 Puppet apply 命令,Puppet 不会重 复执行安装过程,但是会重启服务,因为在 manifest 文件中定义的声明中,服务是运行的。 2.6.2 Puppet 类和模块 使用独立的 Puppet manifest 文件很容易定义资源的声明,但这种实现方法在需要管理 的资源增多时变得越来越不方便。所幸是 Puppet 有类和模块的概念,可以用来更好地组织 和隔离特定的配置详情。 考虑到 Storm 的场景,我们有多个节点的类。举例来说,一个 Storm 集群会有 nimbus 节点,supervisor 节点或者同时兼具两种服务的节点。Puppet 类和模块提供了一种区分不 同配置角色的方法,可以混合搭配来方便地定义身具多种角色的资源。 举例说明这个功能,回顾上面安装 zookeeper 包的 manifest 文件,将它重新定义为一 个类,这样包含多个类的 manifest 文件就可以引用这个类: 第 2 章 配置 Storm 集群   45 在前面的例子里,我们将 zookeeper 的 manifest 文件重新定义为 puppet 的类。这个类 可以在其他类或 manifest 文件中使用。第二行中,zookeeper 类包含了另外一个类,jdk, 引用这个类的定义,表示它声明一个机器必须具备 Java JDK。 2.6.3 Puppet 模板 Puppet 还利用 Ruby ERB 模板系统来定义各种文件的模板,在 Puppet 生成 manifest 时 使用。当 Puppet 运行时,会替换 Puppet ERB 模板中 Ruby 表达式和构造式。ERB 模板中 的 Ruby 语句对 manifest 文件中的 Puppet 变量和定义有完全访问的权限。 下述 Puppet 文件声明是用来生成 storm.yaml 配置文件的: 这个声明告诉 Puppet,从 storm.yaml.erb 模板中,在 /etc/storm 下新建 storm.yaml 文件: 46   Storm 分布式实时计算模式 模板中的条件逻辑和变量扩展使我们在多种环境下可以使用同一个模板文件。举例来 说,如果环境中不需要 Storm DRPC 服务,则生成的 storm.yaml 文件中就会忽略掉 drpc. service 的部分。 2.6.4 使用 Puppet Hiera 来管理环境 我们已经简要介绍了 Puppet manifest、类和模板的概念。你这时候可能会好奇,如何 在一个 puppet 的类或 manifest 文件中定义变量。在 puppet 类或 manifest 文件中定义变量 非常简单,在 manifest 文件和类中像下面这样定义即可: 定义后,java_sersion 变量就可以在类或 manifest 以及 ERB 模板中使用了;但是,这 也带来了不易复用的问题。如果我们将版本号等信息写死在代码里,就会因固定的变量值 限制了这个类的复用效率。如果可以将变量扩展到可能潜在的各种变化值,配置管理会变 得更容易维护。这就是 Hiera 的用武之地。 2.6.5 介绍 Hiera Hiera 是一个最新版 Puppet 框架中集成的一个键值对的查找工具。Hiera 允许定义键 值对分层结构(hierarchies,Hiera 的名字是缩写),在结构中,父节点定义的值可以被子节 点定义的值代替。 例如,考虑我们在 Storm 集群中需要给一些机器定义配置参数的场景。所有的机器 会使用一套通用的键值对,比如说使用的 Java 版本。所以我们将这些值定义在 common. yaml 中。从这里开始,开始出现分化。我们可能有单节点伪集群,也可能有多节点集 群。我们会将环境相关的变量放在特殊的文件里,比如“single-node.yaml”和“cluster. 第 2 章 配置 Storm 集群   47 yaml”。 最后,我们会将机器相关的信息写在这种格式的文件中: “[hostname].yaml”,如图 2-5 所示。 Puppet 集成的 Hiera 允许使用 Puppet 内置变量来正确的解析文件 名称。 第 2 章的示例源代码目录说明了如何实现文件这种文件组织方 式。一个典型的 common.yaml 文件可能定义全局属性对所有的机器通 用,如下格式: 在 environment 级别,可以区分单节点和集群配置的不同,cluster.yaml 可以像下面这 样定义: 最后,是定义机器相关的参数配置,使用对应的[hostname].yaml 文件,定义的 Puppet 类会针对特定节点。 DPNNPOZBNM ᶪ\FOWJSPONFOU^ZBNM ᶪ\IPTUOBNF^ZBNM 图 2-5 48   Storm 分布式实时计算模式 比如 nimbus01.yaml,使用下列配置: 对 zookeeper01.yaml。使用下列代码: 我们对 Puppet 和 Hiera 仅仅了解了皮毛。第 2 章的代码中有额外的例子和文档来说明 如何使用 Puppet 进行自动部署和配置任务。 总结 在本章中,我们介绍了安装和配置单节点和多节点集群 Storm 的必须步骤。还介绍了 Storm 的守护进程和命令行工具用来管理和运行 topology。 最后,简要介绍了 Puppet 框架,演示如何使用该框架管理多节点环境下的配置。鼓 励大家更深入研究下载的代码和文档。 下一章,我们将介绍 Trident,它是一个在 Storm 事务处理和状态管理基础上高级别的 抽象技术。 第 3 章 Trident 和传感器数据 在本章中,我们将介绍 Trident topology。Trident 在 Storm 上提供了高层抽象。Trident 抽象掉了事务处理和状态管理的细节。特别是,它可以让一批 tuple 进行离散的事务处 理。此外,Trident 还提供了抽象操作,允许 topology 在数据上执行函数功能、过滤和聚 合操作。 我们将使用传感器数据作为例子来更好地理解 Trident。通常情况下,传感器数据流 会来自不同的位置。一些传统的例子包括天气和交通状况,这种模式扩展到了更大的数据 源。比如,手机应用产生的众多事件信息。处理手机生成的事件流就是另一个传感器数据 处理的实例。 传感器数据包括不同设备发射的事件,往往是无穷尽的数据流。这正是 Storm 最合适 的一种应用场景。 本章包括以下主题: ● Trident topology ● Trident spout ● Trident 操作——filter 和 function ● Trident 聚合——Combiner 和 Reducer ● Trident 状态(state) Chapter 3 50   Storm 分布式实时计算模式 3.1 使用场景 在用 Storm 处理传感器数据时,为了更好地理解 Trident topology,我们实现了一个 Trident topolygy 收集医学诊断报告来判断是否有疾病暴发的实例。 这个 topology 会处理的医学诊断事件包括以下的信息: Latitude Longitude Timestamp Diagnosis Code(ICD9-CM) 39.9522 -75.1642 03/13/2013 at 3:30 PM 320.0(Hemophilus meningitis) 40.3588 -75.6269 03/13/2013 at 3:50 PM 324.0(Intracranial abscess) 每个事件包括事件发生时的全球定位系统(GPS)的位置坐标,经度和纬度使用十进 制小数表示。事件还包括 ICD9-CM 编码,表示诊断结果,以及事件发生的时间戳。完整 的 ICD9-CM 编码参见 http://www.icd9data.com/。 为了判断是否有疾病暴发,系统会按照地理位置来统计各种疾病代码在一段时间内出 现的次数。为了简化例子,我们按城市划分诊断结果地理位置。实际系统会对地理位置做 出更精细的划分。 另外,示例中会逐小时对诊断事件进行分组。实际系统会更倾向于使用滑动窗口,使 用移动平均值来计算趋势。 最后,我们使用简单的阈值来判断是否有疾病暴发。如果某个小时事件发生的次数超 过了阈值,系统会产生告警信息并且派遣应急人员。 为了维护历史记录,我们还需要将每个城市、小时、疾病的统计量持久化存储。 3.2 Trident topology 为了满足这些需求,我们需要在 topology 中对疾病的发生进行统计。使用标准的 Storm topology 进行统计会遇到难题,因为 tuple 可能重复发送,这会导致重复计数的问 题。下一节将会看到,Trident 提供了操作原语来解决这个问题。 我们将使用的 topology,如图 3-1 所示。 上述 topology 的代码如下: 第 3 章 Trident 和传感器数据   51 Ѧ஝ Ѧ஝ Ѧ஝ Ѧ஝ Ѧ஝ 图 3-1 52   Storm 分布式实时计算模式 上述代码表现了不同 Trident 函数之间的布局关联方式。首先,DiagnosisEventSpout 函数发射疾病事件。然后事件由 DiseaseFilter 函数过滤,过滤掉我们不关心的疾病事件。 之后,事件由 CityAssignment 函数赋值一个对应的城市名。然后 HourAssignment 函数赋 值一个表示小时的时间戳,并且增加一个 key cityDiseaseHour 到 tuple 的字段中,这个 key 包括城市、小时和疾病代码。后续就使用这个 key 进行分组统计并使用 persistAggregate 函数对统计量持久性存储。统计量传递给 OutbreakDetector 函数,如果统计量超过阈值, OutbreakDetector 向后发送一个告警信息。最后 DispatchAlert 接收到告警信息,记录日志, 并且结束流程。在后面,我们会深入了解每个步骤。 3.3 Trident spout 让我们先来看 topology 中的 spout。和 Storm 相比,Trident 引入了“数据批次”(batch) 的概念。不像 Storm 的 spout,Trident spout 必须成批地发送 tuple。 每个 batch 会分配一个唯一的事务标识符。spout 基于约定决定 batch 的组成方式。spout 有三种约定:非事务型(non-transactional)、事务型(transactional)、非透明型(opaque)。 非事务型 spout 对 batch 的组成部分不提供保障,并且可能出现重复。两个不同的 batch 可能含有相同的 tuple。事务型 spout 保证 batch 是非重复的,并且 batch 总是包含相 同的 tuple。非透明型 spout 保证数据是非重复的,但不能保证 batch 的内容是不变的。 表 3-1 描述了这些特性。 表 3-1 spout 类型 batch 可能有重复数据 batch 内容不会变化 非事务型 × × 非透明型 × 事务型 spout 接口如下面代码片段所示: 第 3 章 Trident 和传感器数据   53 在 Trident 中,spout 没有真的发射 tuple,而是把这项工作分解给了 BatchCoordinator 和 Emitter 方 法。Emitter 负责发送tuple,BatchCoordinator 负责管理批次和元数据, Emitter 需要依靠元数据来恰当地进行批次的数据重放。TridentSpout 函数仅仅是简单地提 供了到 BatchCoordinator 和 Emitter 的访问方法,并且声明发射的 tuple 包括哪些字段。下 面列出了示例中的 DiagnosisEventSpout 方法: 54   Storm 分布式实时计算模式 如上述代码中的 getOutputFields() 方法所示,在我们的实例 topology 中,spout 发射一 个字段 event,值是一个 DiagnosisEvent 类。 BatchCoordinator 类实现下述接口: BatchCoordinator 是一个泛型类。这个泛型类是重放一个 batch 所需要的元数据。 在本例中,spout 发送随机事件,因此元数据可以忽略。实际系统中,元数据可能包含 组成了这个 batch 的消息或者对象的标识符。通过这个信息,非透明型和事务型 spout 可以实现约定,确保 batch 的内容不出现重复,在事务型 spout 中,batch 的内容不会出 现变化。 BatchCoordinator 类作为一个 Storm Bolt 运行在一个单线程中。Storm 会在 ZooKeeper 中持久化存储这个元数据。当事务处理完成时会通知到对应的 coordinator。 在我们的例子中,没有做特定的协调操作,下面就是 DiagnosisEventSpout 类中使用 的协调操作: 第 3 章 Trident 和传感器数据   55 Trident spout 的第二个组成部分是Emitter。在Storm 里,spout 使用collector 来 发送tuple,Emmiter 函数在Trident spout 中执行这种功能。唯一的区别是,使用 TridentCollector 类,发送出去的 tuple 是通过 BatchCoordinator 类初始化的一批数据。 Emitter 方法的接口格式如下所示: 如前面代码所示,Emitter 函数只有一个功能,将 tuple 打包发射出去。为了实现这 个功能,函数接收的参数包括 batch(由 coordinator 生成的)的元数据、事务的信息和 Emitter 用来发送 tuple 的 collector。DiagnosisEventEmitter 类的代码如下所示: 56   Storm 分布式实时计算模式 发送的工作在 emitBatch() 中进行。例子中,我们随机分配一个经度和纬度,大体保 持在美国范围内,使用 System.currentTimeMillis() 方法生成诊断的时间戳。 实际场景中,ICD-9-CM 的范围在 000 到 999 之间。针对本示例,我们仅使用 320 到 327 之间的诊断代码。这些代码如下所示: 代  码     描  述 320 细菌性脑膜炎 321 其他病原体所致脑膜炎 322 未知病因的脑膜炎 323 脑炎脊髓炎 324 颅内和脊椎内脓肿 325 静脉炎及颅内静脉窦血栓性静脉炎 326 颅内脓肿或化脓性感染后期影响 327 器官性睡眠障碍 这些诊断代码随机分配给事件。 在这个例子里,我们使用对象来封装诊断事件。为简化起见,我们将事件的每个组 成部分作为 tuple 的一个独立字段。这里,对象封装还是使用 tuple 字段进行封装,需要权 衡。通常会限制 tuple 的字段在易于管理的数量之内,但为了数据流控制或 tuple 的分组策 略,将数据放在 tuple 的字段里还是有意义的。 在我们的例子中,DiagnosisEvent 类表示 topology 处理的关键数据。对象的代码如下 所示: 第 3 章 Trident 和传感器数据   57 这个对象是一个简单的 JavaBean。时间戳使用 long 变量存储,存储的是纪元时间的 秒数。经度和纬度使用 dobule 存储。diagnosisCode 类使用 string,以防系统可能需要处理 非 ICD-9 数据,比如有字母的代码。 至此,topology 已经可以发射事件了。在实际场景中,我们可能将 topology 集成到一 个医疗请求处理系统或者一个电子健康记录系统来进行实践演练。 3.4 Trident 运算 时间戳已经生成好了,下一步是加入处理事件的逻辑组件。在 Trident 中,这些组件 称为运算(operation)。在我们的 topology 中,使用两种不同的运算:filter 和 function。 运算通过 Stream 对象的方法来调用。这个例子中,我们使用了 Stream 对象的下述方法: 58   Storm 分布式实时计算模式 注意前面代码中列出的方法返回形式为 Stream 对象或者 TridentState 对象,返回可以 用来创建新的数据流。因此,运算可以连在一起使用流式接口形式的 Java 代码。让我们再 看看示例 topology 中的关键代码: 通常,应用运算需要声明一个输入域集合和一个输出域集合,也就是 funcition 域。上 面代码中 topology 第二行声明我们需要 CityAssignment 对数据流中的每个 tuple 执行操作。 在每个 tuple 中,CityAssignment 会在 event 字段上运算并且增加一个叫做 city 的新字段, 这个字段会附在 tuple 中向后发射。 每个操作在流式风格的语法上略有不同,这取决于操作需要哪些信息。下面将介绍不 同操作的详细语法和语义。 3.4.1 Trident filter 我们 topology 逻辑中的第一部分就是个过滤器 filter,它会忽略掉我们不关心的疾病 事件。在这个例子中,系统只关心脑膜炎(meningitis)的病情,从前面表格中看到,脑膜 炎对应的疾病代码是 320、321 和 322。 为了通过疾病代码过滤事件,我们需要利用 Trident filter。Trident 通过提供 BaseFilter 类,我们通过实现子类就可以方便地对tuple 进行过滤,滤除系统不需要的tuple。 BaseFilter 类实现了 Filter 接口,这个接口如下面代码片段所示: 第 3 章 Trident 和传感器数据   59 为了在数据流中过滤 tuple,应用需要通过继承 BaseFilter 类来实现这个接口。这个例 子中,我们使用下述过滤器过滤事件: 上面的代码中,我们从 tuple 中提取了 DiagnosisEvent 并且检查疾病代码。因为所有 的脑膜炎代码小于等于 322,我们也没有发送其他代码,所以只需要简单地检查代码是否 小于 322,就可以决定事件是否和脑膜炎有关。 Filter 操作结果返回 True 的 tuple 将会被发送到下游进行操作。如果方法返回 False, 该 tuple 就不会发送到下游。 在我们的 topology 中,我们在数据流上使用 each(inputFields,filter)方法,将这个 过滤器应用到数据流的每个 tuple 中: 3.4.2 Trident function 在 filter 之外,Storm 还提供了一个更通用功能的接口 function。function 和 Storm 的 bolt 类似,读取 tuple 并且发送新的 tuple。其中一个区别是,Trident function 只能添加数 60   Storm 分布式实时计算模式 据。function 发送数据时,将新字段添加在 tuple 中,并不会删除或者变更已有的字段。 function 接口如下代码片段所示: 和 Storm 的 bolt 类似,function 实现了一个包括实际逻辑的方法 execute。function 的 实现也可以选用 TridentCollector 来发送 tuple 到新的 function 中。用这种方式,function 也 可以用来过滤 tuple,起到 filter 的作用。 我们 topology 中的第一个 function 是 CityAssignment,如下所示: 第 3 章 Trident 和传感器数据   61 在这个 function 中,我们使用静态初始化的方式建立了一个我们关心的城市的地图。 示例中,function 包括一个地图,存储了的坐标信息包括:Philadelphia(PHL)、 New York City(NYC)、 San Francisco(SF)和 LosAngeles(LA)。 在 execute() 方法中,函数遍历城市计算事件和城市之间的距离。现实系统中,地理 空间的索引效率会高很多。 function 声明的字段数量必须和它发射出值的字段数一致。如果不一致,Storm 就会 抛出 IndexOutOfBoundsException 异常。 我们 topology 中的下一个 function 是 HourAssignment,用来转化 Unix 时间戳为纪 元时间的小时,可以用来对事件发生进行时间上的分组操作。HourAssignment 的代码 如下: 62   Storm 分布式实时计算模式 我们重写了这个 function,同时发射了小时的数值,以及由城市、疾病代码、小时组 合而成的 key。实际上,这个组合值会作为聚合计数的唯一标识符,后面会详细解释。 我们 topology 中最后两个 funciton 是用来侦测疾病暴发并且告警的。OutbreakDetector 类的代码如下: 这个 function 提取出了特定城市、疾病、时间的发生次数,并且检查计数是否超过了 设定的阈值。如果超过,发送一个新的字段包括一条告警信息。在上面代码里,注意这个 function 实际上扮演了一个过滤器的角色,但是却作为一个 function 的形式来实现,是因 为需要在 tuple 中添加新的字段。因为 filter 不能改变 tuple,当我们既想过滤又想添加字段 时必须使用 function。 最后一个 function 的功能就是发布一个告警(并且结束程序)。代码如下: 第 3 章 Trident 和传感器数据   63 这个方法非常简单,提取了告警的内容,并写入日志,最后结束整个程序。 3.5 Trident 聚合器 和 function 类似,aggregator(聚合器)允许 topology 组合 tuple。不同的是,它会替换 tuple 的字段和值。有三种聚合器:CombinerAggregator、ReducerAggregator 和 Aggregator。 3.5.1 CombinerAggregator CombinerAggregator 用来将一个集合的 tuple 组合到一个单独的字段中,Combiner 的 签名(Signature)如下所示: Storm 对每个 tuple 调用 init() 方法,然后重复调用 combine() 方法直到一个分片的数 据处理完成。传递给 combine() 方法的两个参数是局部聚合的结果,以及调用了 init() 返回 的值。分片会在后面的部分详细介绍,分片实际上就是 tuples 组成的数据流在同一个机器 上的一个子集。将 tuple 生成的值进行组合之后,Storm 发送组合结果作为一个新的字段。 如果分片是空的,Storm 会发送 zero() 方法执行的返回。 3.5.2 ReducerAggregator ReducerAggregator 接口有一点区别,签名如下: 64   Storm 分布式实时计算模式 Storm 调用 init() 方法来获取原始值。然后为分片中的每一个 tuple 调用 reduce() 方法, 直到分片数据处理完成。第一个参数是局部的聚合结果。这个方法的实现需要将第二个参 数 tuple 合并到局部聚合结果中返回。 3.5.3 Aggregator 最通用的聚合操作是 Aggregator。签名如下所示: Aggregator 接口的 aggregate() 方法和 function 接口的 execute() 方法类似,但是多了 一个 value 参数。这样 Aggregator 就可以在处理 tuple 的时候累积值。注意,在 Aggregator 接口中,aggregate() 和 complete() 方法都有 collector 这个参数,通过它可以发射任意个数 的 tuple。在我们的 topology 例子中,我们利用了一个内置的 Count 的 Aggregator。Count 的实现如下面代码片段所示: 我们在示例 topology 中使用了分组和计数来统计在一个城市附近一个小时内发生疾病 的次数。实现代码如下所示: 第 3 章 Trident 和传感器数据   65 回顾 Storm 在不同机器上的数据的分片,如图 3-2 所示。 图 3-2 groupBy() 方法强制数据重新分片,将特定字段值相同的 tuple 分组到同一个分片 中。为了做到这个,Storm 必须将相似的 tuple 发送到相同的主机上。图 3-3 展示了数据被 groupBy() 重新分组后的分片情况。 图 3-3 分片后,agreagte 函数在每个分片数据的每个分组中运行。在我们的例子里,根据城 市、小时、疾病代码作为分组的关键词。然后 Count aggregator 在每个分组上执行,将计 数发射给下游的消费者组件。 3.6 Trident 状态 我们现在已经给每个 aggregator 的分组数据进行了计数,现在想将信息进行持久化存 66   Storm 分布式实时计算模式 储,以便进一步分析。在 Trident 中,持久化操作从状态管理开始。Trident 对状态有底层 的操作原语,但不同于 Storm API,它不关心要哪些数据会作为状态存储或者如何存储这 些状态。Trident 在高层提供了下述的状态接口: 上面提到了,Trident 将 tuple 分组成一批批数据。每批数据都有自己的事务标识符。在 前面的接口中,Trident 告诉 State 对象什么时候开始提交状态,什么时候提交状态应该结束。 和 function 类似,Stream 对象也有方法向 topology 引入基于状态的操作。更具体说, Trident 有两种数据流:Stream 和 GroupedStream。一个 GroupedStream 是 GroupBy 操作的结 果。在我们的 topology 中,我们根据 HourAssignment function 生成的 key 对 tuple 进行分组。 在 Steam 对象中,下列方法允许 topology 读和写状态信息: 第 3 章 Trident 和传感器数据   67 stateQuery() 方法从 state 生成了一个输入流,不同参数的几个 partitionPersist() 方法允 许 topology 从数据流中的 tuple 更新状态信息。partitionPersist() 方法的操作对象是每个数 据分片。 在 Stream 对象的方法外,GroupedStream 对象允许 topology 对一批 tuple 进行聚合统 计,并且将收集到的信息持久化在 state 中。下列代码是 GroupedSteam 类中和状态相关的 方法: 68   Storm 分布式实时计算模式 和 Steam 对象类似,stateQuery() 方法从State 生成一个输入数据流。不同参数 的几个persistAggregate() 方法允许topology 从数据流中的tuple 更新状态信息。注意 GroupedStream 方法有一个 Aggregator 参数,它在信息写入 State 对象之前执行。 现在考虑将这些 function 应用到我们的例子中来。在我们的系统中,需要将事件 发生的城市、疾病代码、每小时内产生疾病统计量进行持久存储。这样可以生成报表如 表 3-2 所示。 表 3-2 疾  病 城  市 日  期 时  间 疾病统计量 细菌性脑膜炎 San Francisco 3/12/2013 3:00 PM 12 细菌性脑膜炎 San Francisco 3/12/2013 4:00 PM 50 细菌性脑膜炎 San Francisco 3/12/2013 5:00 PM 100 天花 New York 3/13/2013 5:00 PM 6 为了实现这个功能,我们需要将聚集操作中生成的统计量进行持久化存储。我们可以 使用 groupBy 函数返回的 GroupedStream 接口(如前面所示)调用 persistAggregate 方法。 下面代码是示例 topology 中具体的调用方式: 要了解持久化存储,我们首先来看这个方法的第一个参数。Trident 使用一个工厂类 第 3 章 Trident 和传感器数据   69 来生成State 的实例。OutbreakTrendFactory 是我们的topology 提供给Storm 的工厂类。 OutbreakTrendFactory 代码如下: 工厂类返回一个 State 对象,Storm 用它来持久化存储信息。在 Storm 中,有三种类型 的状态。每个类型的描述如表 3-3 所示。 表 3-3 状态类型 说  明 非事务型 没有回滚能力,更新操作是永久性的,commit 操作会被忽略 重复事务型 由同一批 tuple 提供的结果是幂等的 不透明事务型 更新操作基于先前的值,这样一批数据组成不同,持久化的数据也会变 在分布式环境下,数据可能被重放,为了支持计数和状态更新,Trident 将状态更新操作 进行序列化,使用不同的状态更新模式对重放和错误数据进行容错。接下来会介绍这些模式。 3.6.1 重复事务型状态 在重复事务型状态中,最后一批提交的数据的标识符存在数据中。当且仅当一批数据 标识符的序号大于当前标识符时,才进行更新操作。如果小于或者等于当前标识符,将会 忽略更新操作。 为了演示这个实现方法,考虑如表 3-4 所示的数据批次的序列,这些记录对我们例子 中的数据按照 key 进行聚合计数。 表 3-4 批  次 状态更新 1 {SF:320:378911 = 4} 2 {SF:320:378911 = 10} 3 {SF:320:378911 = 8} 这些批次数据按照下列将顺序处理完成: 70   Storm 分布式实时计算模式 1 à 2 à 3 à 3 ( 重放 ) 处理结果将按照表 3-5 中的状态变更操作,中间的一列数据用来存储数据标识符,记 录最近一次合并进状态的数据批次编号。 表 3-5 处理过的批次 状  态 1 { Batch = 1 } { SF:320:378911 = 4 } 2 { Batch = 2 } { SF:320:378911 = 14 } 3 { Batch = 3 } { SF:320:378911 = 22 } 3(重 放) { Batch = 3 } { SF:320:378911 =22} 注意当批次号为 3 的数据完成重放后,不会对状态产生影响,因为 Trident 已经将其更新 合并到 State 中。为了让重复事务型状态正常工作,一批数据在重放时的数据组成不能变化。 3.6.2 不透明型状态 重复事务型状态的实现依赖于数据批次中包含数据保持不变,这种特性在系统遇到错 误时就可能保证不了了。如果发射数据的 spout 发生了局部故障,原始批次数据中的部分 tuple 可能无法重新发送。不透明型状态通过存储当前的状态和前一次状态来允许批次的数 据组成发生变化。 假设我们的数据批次和前面的例子相同,这次在批次号为 3 的数据重新发送时,聚合 统计量会不一样,因为包括了不同的 tuple 的集合,如表 3-6 所示。 表 3-6 批  次 状态更新 1 {SF:320:378911 = 4} 2 {SF:320:378911 = 10} 3 {SF:320:378911 = 8} 3(重 放) {SF:320:378911 = 6} 使用不透明型状态时,状态信息的更新如表 3-7 所示。 表 3-7 处理过的批次 提交过的批次 前一个状态 当前状态 1 1 { } { SF:320:378911 = 4 } 2 2 { SF:320:378911 = 4 } { SF:320:378911 = 14 } 3(应 用) 3 { SF:320:378911 = 14 } { SF:320:378911 = 22 } 3(重 放) 3 { SF:320:378911 = 14 } { SF:320:378911 = 20 } 第 3 章 Trident 和传感器数据   71 注意不透明型状态存储了上一个状态信息,因此,当第 3 批次数据重放时,可以使用 新的聚合计数重新赋值。 你可能会好奇,为什么可以在一批数据提交后还会再次应用这批数据。对应的一种场 景是状态已经更新成功了,但是下游处理失败。在我们的例子中,可能是告警信息发布失 败。这种情况下 Trident 会重新发送这批数据。在最坏的情况下,当要求 spout 重新发送这 批数据时,可能有一个或者多个数据源不可用。 在事务型 spout 中,需要一直等待直到数据源恢复可用。不透明事务型 spout 会发送 当前可用的数据分片,数据的处理照常进行。因为 Trident 是按照序列处理数据批次并记 录状态,因此每个单独的批次都不能延迟,因为延迟可能导致阻塞整个系统。 在实现中,状态类型的选择需要基于 spout 来保证处理行为的幂等性,才能保证不会 错误计数或者破坏了状态。表 3-8 展示了为保证幂等性的可行搭配。 表 3-8 spout 类型 非事务型状态 不透明型状态 重复事务型状态 非事务型 spout 不透明型 spout × 事务型 spout × × 幸运的是,Storm 提供了 map 的实现来屏蔽了持久层进行状态管理的复杂性。尤其是, Trident 提供了 State 实现通过维护额外的信息来实现上面提到的保证。这些对象命名相似: NonTransactionalMap、TransactionalMap、OpaqueMap。 回到我们的例子里,因为我们没有事务型保证,所以选用 NonTransactionalMap 作为 我们的 State 对象。 OutbreakTrendState 对象如下代码所示: 上面代码利用了 MapState 对象,只需要传递一个 backing map。我们的例子中,使用 OutbreakTrendBackingMap。代码如下: 72   Storm 分布式实时计算模式 在我们的 topology 中,实际上没有固化存储数据。我们简单地将数据放入 Concurrent HashMap 中。显然,对于多个机器的环境下,这样是不可行的。然而 BackingMap 是一个 非常巧妙的抽象。只需要将传入 MapState 对象的 backing map 的实例替换就可以更换持久 层的实现。我们在后面章节会看到例子。 3.7 执行 topology OutbreakDetectionTopology 类有下列方法: 第 3 章 Trident 和传感器数据   73 执行这个方法会将 topology 部署到本地集群中。spout 会立即开始发送疾病事件,由 Count aggregator 收集计数。OutbreakDetector 类中的阈值故意设置得很小,这样计数很快 就超过阈值,这时程序结束,输出如下日志: 注意当数据批次成功处理完成时会通知到 coordinator,并且在几批数据后,就超过阈 值了,系统会通过错误消息告知我们:派遣国民警卫队 ! 总结 在本章中,我们建立了一个 topology 处理疾病信息来监测异常情况,这些异常可能说 明有疾病暴发。这个数据流也可以应用到任何类型的数据上,包括天气信息、地震信息或 者交通信息。我们运用 Trident 中的基本原语来构建一个系统,对事件进行统计,即使数 据重放也适用。在本书后面的章节中,我们会利用同样的构造和模式来完成类似的功能。
还剩89页未读

继续阅读

下载pdf到电脑,查找使用更方便

pdf的实际排版效果,会与网站的显示效果略有不同!!

需要 5 金币 [ 分享pdf获得金币 ] 3 人已下载

下载pdf

pdf贡献者

nd7b

贡献于2015-12-09

下载需要 5 金币 [金币充值 ]
亲,您也可以通过 分享原创pdf 来获得金币奖励!
下载pdf