Storm 实时数据处理


大数据技术丛书 Storm 实时数据处理 Storm Real-Time Processing Cookbook (澳) Quinton Anderson 著 卢誉声 译 图书在版编目(CIP)数据 Storm 实时数据处理 /(澳)安德森(Anderson,Q.)著;卢誉声译 . —北京:机械工业出 版社,2014.6 (大数据技术丛书) 书名原文: Storm Real-Time Processing Cookbook ISBN 978-7-111-46663-5 I. S… II. ① 安… ② 卢… III. 数据处理软件 IV. TP274 中国版本图书馆 CIP 数据核字(2014)第 103057 号 本书版权登记号:图字:01-2013-7570 Quinton Anderson: Storm Real-Time Processing Cookbook (ISBN: 978-1-78216-442-5). Copyright © 2013 Packt Publishing. First published in the English language under the title “Storm Real-Time Processing Cookbook”. All rights reserved. Chinese simplified language edition published by China Machine Press. Copyright © 2014 by China Machine Press. 本书中文简体字版由 Packt Publishing 授权机械工业出版社独家出版。未经出版者书面许可,不得以任何方式 复制或抄袭本书内容。 Storm 实时数据处理 [ 澳 ] Quinton Anderson 著 出版发行:机械工业出版社(北京市西城区百万庄大街 22 号 邮政编码:100037) 责任编辑:高婧雅 责任校对:殷 虹 印  刷: 版  次:2014 年 6 月第 1 版第 1 次印刷 开  本:186mm×240mm 1/16 印  张:12.75 书  号: ISBN 978-7-111-46663-5 定  价:49.00 元 凡购本书,如有缺页、倒页、脱页,由本社发行部调换 客服热线:(010)88378991 88361066 投稿热线:(010)88379604 购书热线:(010)68326294 88379649 68995259 读者信箱:hzjsj@hzbook.com 版权所有· 侵权必究 封底无防伪标均为盗版 本书法律顾问:北京大成律师事务所 韩光 / 邹晓东 译 者 序 随着互联网业务数据规模的急剧增加,人们处理和使用数据的模式已然发生了天翻地覆 的变化,传统的技术架构越来越无法适应当今海量数据处理的需求。MapReduce、Hadoop 以 及一些相关技术的出现使得我们能处理的数据量比以前要多得多,这类技术解决了我们面对 海量数据时的措手不及,也在一定程度上缓解了传统技术架构过时的问题。 但是,随着业务数据规模的爆炸式增长和对数据实时处理能力的需求越来越高,原本承载 着海量数据处理任务的 Hadoop 在实时计算处理方面越发显得乏力。原因很简单,像 Hadoop 使用的 MapReduce 这样的数据处理技术,其设计初衷并不是为了满足实时计算的需求。那么 就目前来说,有没有什么行之有效的办法能简单地将 Hadoop 转换成实时计算系统呢? 这个问题的答案可能令人略感失望:没有。Hadoop 作为批处理系统,与实时处理系统 在需求上存在着本质的区别。要做到实时性,不仅需要及时地推送数据以便处理,还要将数 据划分成尽可能小的单位,而 HDFS 存储推送数据的能力已经远不能满足实时性的需求。另 外,Hadoop 还存在配置、一致性和可伸缩性方面的问题。 那么问题来了,怎么才能构建出一个可靠的实时处理系统呢? 答案是 Storm。 从整体架构上看,Storm 和 Hadoop 非常类似。Storm 从架构基础本身就实现了实时计算 和数据处理保序的功能,而且从概念上看,Storm 秉承了许多 Hadoop 的概念、术语和操作 方法,因此如果你对 Hadoop 非常熟悉,那么将 Storm 与 Hadoop 集成也不是什么难事。通 过 Storm Trident 提供的高级抽象元语,你可以像 Hadoop Cascading 简化并行批处理那样简 化并行实时数据处理。 我本人在实时计算服务器开发方面具有一定的经验,对大数据处理解决方案十分感兴 趣,也对相关技术有一些了解,并且有幸承担了本书的翻译工作。本书从多个角度解析了有 关 Storm 的最佳实践,无论是从最基本的应用、多语言特性、完整业务系统的实现,还是最 终交付至产品环境,本书都给出了翔实的最佳实践方法。不仅如此,本书还从产品持续交付 的角度,分析并实践了集成、测试和交付的所有步骤,让人受益匪浅。相信你会和我一样, 通过阅读本书,能对 Storm 本身及构建成熟实时计算系统的方法有更进一步的了解。 翻译本书对我来说其实也是一个渐进学习的过程,书中提及了大量方法和工具的应用, IV 让我从零散的概念分支逐渐有了清晰的知识主干,形成了系统的知识点。通过对本书的翻 译,我对这个领域的内容有了更加深刻的理解。这也是我翻译此书最大的收获之一。在翻译 过程中,我不仅查阅了大量国内外的相关资料,还与原书作者进行了深入的沟通,力求做到 专业词汇准确权威,书中内容正确。 在翻译过程中我得到了很多人的帮助,特在此一一感谢。首先是我的家人,你们是我学 习和前进的动力。感谢鲁昌华教授,在我的成长道路上给予了很大的支持和鼓励。感谢我在 思科系统(中国)研发的同事们,在我的学习工作中给予了很大帮助。感谢我的好友金柳颀, 感谢你在翻译过程中的通力合作以及在技术问题上的共同探讨。还要感谢机械工业出版社华 章公司对我的信任与支持。 现在我怀着期盼和忐忑的心情将这本译著呈献给大家,我渴望得到您的认可,更渴望和 您成为朋友。如果您有任何问题和建议,请与我联系( samblg@me.com)。让我们一起探讨, 共同进步。 卢誉声 前  言 开源已经在许多方面从根本上改变了软件的原有面貌。在很多应用环境中,人们都会争 论使用开源带来的好处和坏处,主要体现在支持、风险以及总体拥有成本等方面。开源在某 些领域比其他领域流行,比如在研究机构中就比在大型金融服务提供商中应用得多。在某些 新兴领域,比如 Web 服务供应商、内容供应商以及社交网络等,开源软件占据主导地位。其 原因是多方面的,其中成本是一个非常大的因素。怎么说呢?如果方案要上升到网络规模, 那么一般会应用“大数据”解决方案,以期获得更好的效果。凭借极佳的可用性,这些解决 方案每秒处理数百万条请求,同时通过各式各样的服务为客户提供定制体验。 这种规模的系统设计需要我们用不同的方式思考问题,采用不同的架构解决方案,而且 要了解什么时候应该接受系统的复杂性以及什么时候应该避免这种复杂性。作为行业中人, 我们应该掌握这种大规模批处理系统的设计方法。由 MapReduce、Bulk Synchronous Parallel 以及其他计算范式衍生而来的大规模计算集群已经广泛实施,并且得到了很好的理解。开源 掀起了创新浪潮,并推动其发展,而即使是顶级软件开发商也只能努力将 Hadoop 集成到自 家的技术架构中,更别提试图与开源竞争了。 然而,客户是永不满足的,他们想要更多的数据、更多的服务、更多的价值、更多的便 利,而且现在就要,并希望成本更低。随着数据量的增加,对实时响应时间的需求也在提 高。专注于实时性、规模化的计算平台新时代已经来临。这带来了许多新的挑战,不管是理 论上还是实践上都具有挑战性。 本书将帮助你掌握一个平台:Storm 处理系统。Storm 处理系统是一个开源的、实时的计 算平台,最初由社交分析公司 Backtype 的 Nathan Marz 编写,后来被 Twitter 收购,并作为 开源软件发布。从那时起,该平台茁壮成长,目前已经成长为一个日益扩大的开源社区:拥 有众多用户、贡献者以及产品页面上的成功案例。写这篇前言的时候,该项目在 GitHub 上 拥有超过 6000 个星、3000 多个 Twitter 粉丝,每个节点每秒可以完成超过 100 万次交易,而 且有近 80 名相关用户在使用 Storm 
产品实例。这些数字极其可观。此外,在本书的结尾你会发现,无论你用哪种语言(与你的思维方式以及解决方案交付方式相一致的语言)研发系统,以 Storm 平台为基础都会非常愉快。
本书通过一系列实例指导你学习 Storm。书中的例子源于实战用例,并随着内容的展开介绍各种概念。此外,本书旨在围绕 Storm 技术促进 DevOps 实践,使读者能够开发 Storm 解决方案,同时可靠地交付具有价值的产品。(DevOps 是一组过程、方法和系统的统称,用于促进开发、技术运营和质量保障部门之间的沟通、协作与整合。DevOps 的目的是实现业务敏捷,它将敏捷开发扩展到了运维上,通过快速、反应灵敏且稳定的业务运维,使其能够与开发过程的创新保持同步,实现真正的 IT 融合。—译者注)
Storm 处理系统简介
开源项目普遍存在缺乏文档的问题,但 Storm 并不存在这个问题,其项目有着高质量且编写良好的文档,由活跃的用户社区提供支持。本书并不想照本宣科,而是通过丰富的例子对现有资料进行补充,在必要的时候辅以概念和理论的探讨。强烈建议读者在阅读第 1 章之前,花些时间阅读 Storm 的入门文档,具体有以下几个网页:
• https://github.com/nathanmarz/storm/wiki/Rationale
• https://github.com/nathanmarz/storm/wiki/Concepts
• https://github.com/nathanmarz/storm/wiki/Understanding-the-parallelism-of-a-Storm-topology
章节介绍
第 1 章 介绍搭建 Storm 本地开发环境的方法,包括所需工具和推荐的开发流程。
第 2 章 教你创建一个日志流处理解决方案、基本的统计面板和日志搜索功能。
第 3 章 介绍 Trident。Trident 是基于 Storm 元语进行实时计算的一类更高级的流处理抽象元语,支持高效的企业数据管道。
第 4 章 介绍实现分布式远程过程调用的方法。
第 5 章 介绍在不同的编程语言中实现 Topology 的方法,并在现有技术基础上增加一些新的技术。
第 6 章 介绍 Storm 与 Hadoop 的集成方法,创建一个完整的 Lambda 架构。
第 7 章 简要介绍机器学习,讨论多种基于 Storm 实时处理项目的实现方法。
第 8 章 介绍搭建持续交付流水线和可靠部署 Storm 集群到产品环境的方法。
第 9 章 介绍多种在 Amazon 云计算平台中自动化配置 Storm 集群的方法。
阅读前提
本书采用 Ubuntu 或 Debian 操作系统作为基本的开发环境。第 1 章会介绍如何配置其余必备的工具。如果你不能使用 Ubuntu 作为开发用的操作系统,任何基于 *Nix 的操作系统都是可以的,因为书中所有示例都以 bash 命令行界面为基础构建。
读者对象
如果你想学习实时处理技术,或想了解通过 Storm 实现实时处理的方法,那么本书非常适合你。本书假定你是一名 Java 开发者,要是你还具备 Clojure、C++ 和 Ruby 的开发经验,那将是锦上添花的事情,不过那并不是必需的。如果你还了解一些 Hadoop 或类似的技术就更好了。
本书版式约定
在本书中,读者会发现针对不同信息类型的文本样式。下面是这些样式的示例和解释。
代码段版式如下所示:
<repository>
  <id>github-releases</id>
  <url>http://oss.sonatype.org/content/repositories/github-releases/</url>
</repository>
<repository>
  <id>clojars.org</id>
  <url>http://clojars.org/repo</url>
</repository>
<repository>
  <id>twitter4j</id>
  <url>http://twitter4j.org/maven2</url>
</repository>
所有命令行输入和输出如下所示:
mkdir FirstGitProject
cd FirstGitProject
git init
新术语和重要词汇以黑体显示。屏幕、菜单或对话框中出现的文字会以如下形式出现在正文中:"取消勾选 Use default location 复选框。"
警告或重要提示会出现在这样的方框中。
提示和技巧会以这样的形式出现。
下载本书的示例代码
访问 http://www.packtpub.com/,可以下载本书及你所购买的所有 Packt 图书的示例代码。如果你是从其他地方购买的本书英文版,可以访问 http://www.packtpub.com/support 并注册,以便通过电子邮件取得示例文件。
作者通过 Bitbucket 账户来维护开源版本的代码:https://bitbucket.org/qanderson。
致谢
感谢 Storm 社区,正是他们的努力构建出了开源社区中如此出色的平台,当然,还要特别感谢 Storm 的核心作者—Nathan Marz。
我要感谢我的妻子和孩子对我长时间写作本书和其他相关项目的宽容,感谢这段时间他们给予我的帮助,我爱他们。此外,我还要感谢所有参与本书审校工作的朋友。
目  录
译者序
前言
第 1 章 搭建开发环境 / 1
1.1 简介 / 1
1.2 搭建开发环境 / 1
1.3 分布式版本控制 / 3
1.4 创建"Hello World"Topology / 6
1.5 创建 Storm 集群——配置机器 / 12
1.6 创建 Storm 集群——配置 Storm / 18
1.7 获取基本的点击率统计信息 / 23
1.8 对 Bolt 进行单元测试 / 31
1.9 实现集成测试 / 34
1.10 将产品部署到集群 / 37
第 2 章 日志流处理 / 38
2.1 简介 / 38
2.2 创建日志代理 / 38
2.3 创建日志 Spout / 40
2.4 基于规则的日志流分析 / 45
2.5 索引与持久化日志数据 / 49
2.6 统计与持久化日志统计信息 / 53
2.7 为日志流集群创建集成测试 / 55
2.8 创建日志分析面板 / 59
第 3 章 使用 Trident 计算单词重要度 / 71
3.1 简介 / 71
3.2 使用 Twitter 过滤器创建 URL 流 / 71
3.3 从文件中获取整洁的词流 / 76
3.4 计算每个单词的相对重要度 / 81
第 4 章 分布式远程过程调用 / 85
4.1 简介 / 85
4.2 通过 DRPC 实现所需处理流程 / 85
4.3 对 Trident Topology 进行集成测试 / 90
4.4 实现滚动窗口 Topology / 95
4.5 在集成测试中模拟时间 / 98
第 5 章 在不同语言中实现 Topology / 100
5.1 简介 / 100
5.2 在 Qt 中实现多语言协议 / 100
5.3 在 Qt 中实现 SplitSentence Bolt / 105
5.4 在 Ruby 中实现计数 Bolt / 108
5.5 在 Clojure 中实现单词计数 Topology / 109
第 6 章 Storm 与 Hadoop 集成 / 113
6.1 简介 / 113
6.2 在 Hadoop 中实现 TF-IDF 算法 / 115
6.3 持久化来自 Storm 的文件 / 121
6.4 集成批处理与实时视图 / 122
第 7 章 实时机器学习 / 127
7.1 简介 / 127
7.2 实现事务性 Topology / 129
7.3 在 R 中创建随机森林分类模型 / 134
7.4 基于随机森林的事务流业务分类 / 143
7.5 在 R 中创建关联规则模型 / 149
7.6 创建推荐引擎 / 152
7.7 实时在线机器学习 / 157
第 8 章 持续交付 / 162
8.1 简介 / 162
8.2 搭建 CI 服务器 / 162
8.3 搭建系统环境 / 164
8.4 定义交付流水线 / 166
8.5 实现自动化验收测试 / 170
第 9 章 在 AWS 上部署 Storm / 177
9.1 简介 / 177
9.2 使用 Pallet 在 AWS 上部署 Storm / 177
9.3 搭建虚拟私有云 / 181
9.4 使用 Vagrant 在虚拟私有云上部署 Storm / 189
第 1 章 搭建开发环境
1.1 简介
本章将简要介绍 Storm 处理系统。这将涵盖所有你想知道的内容,从搭建你的开发环境到部署 Topology 时需要注意的操作关注事项,再到基本的质量实践,比如对 Storm Topology 进行单元测试和集成测试。在阅读完本章后,你将能够构建、测试和交付基本的 Storm Topology。
本书并不准备对 Storm 处理系统及其元语和架构进行理论介绍。我们假定你在阅读本书之前已经通过浏览如 Storm wiki 这样的在线资源了解了 Storm 的基本概念。
当系统在产品环境中能持续可靠地产生商业价值时,才能交付系统。为了实现这个目的,在开发 Storm Topology 时,必须始终考虑质量问题和操作注意事项。
1.2 搭建开发环境
开发环境涵盖了构建 Storm Topology 所需的各种工具和系统。虽说本书重点关注的是每个有技术侧重点的 Storm 交付,但需要指出的是,对于一个软件开发团队来说,无论使用集中式开发环境还是分布式开发环境,都需要更多的工具和流程来保证高效工作,而且不能仅仅局限于本书所讨论的内容。
无论是为了将来的开发工作,还是为了实现书中的例子,以下几类工具和流程都是快速搭建开发环境必不可少的:
• SDK
• 版本控制
• 构建环境
• 系统配置工具
• 集群配置工具
书中描述的配置和安装方法都基于 Ubuntu。不管怎么说,适用于 Ubuntu 的这些方法还是比较容易移植到其他 Linux 发行版中的。如果将这些方法应用于其他发行版时出现任何问题,你可以通过 Storm 邮件列表寻求支持:https://groups.google.com/forum/#!forum/storm-user。
环境变量是导致系统可操作性和可用性下降的"罪魁祸首",如果部署环境与开发环境不同,那么环境变量很可能在这时引起大问题。所以尽可能让开发环境和部署环境保持一致。
1.2.1 实战
Step01 从 Oracle 网站(http://www.oracle.com/technetwork/java/javase/downloads/index.html)下载最新版本的 J2SE 6 SDK,并通过以下命令进行安装:
chmod 775 jdk-6u35-linux-x64.bin
yes | ./jdk-6u35-linux-x64.bin
mv jdk1.6.0_35 /opt
ln -s /opt/jdk1.6.0_35/bin/java /usr/bin
ln -s /opt/jdk1.6.0_35/bin/javac /usr/bin
JAVA_HOME=/opt/jdk1.6.0_35
export JAVA_HOME
PATH=$PATH:$JAVA_HOME/bin
export PATH
Step02 然后安装版本控制系统 Git:
sudo apt-get install git
Step03 接下来,安装构建系统 Maven:
sudo apt-get install maven
Step04 安装 Puppet、Vagrant 和 VirtualBox,以便对应用程序和环境进行配置:
sudo apt-get install virtualbox puppet vagrant
Step05 最后,你还需要安装一个集成开发环境(IDE):
sudo apt-get install eclipse
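这些工具安装完成后,可以用一小段 Java 程序自检一下命令行实际使用的 JDK 是否就是刚才安装的版本(下面的代码并非书中原文,只是一个环境自检的示意,类名为笔者假设):
public class EnvCheck {
    public static void main(String[] args) {
        // 打印当前 JVM 的版本与安装路径,正常情况下路径应位于 /opt/jdk1.6.0_35 之下
        System.out.println("java.version = " + System.getProperty("java.version"));
        System.out.println("java.home    = " + System.getProperty("java.home"));
    }
}
使用 javac EnvCheck.java && java EnvCheck 运行即可;如果输出的路径仍指向系统自带的 OpenJDK,说明 /usr/bin 下的符号链接没有生效。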
自从 Sun 公司被 Oracle 收购以后,人们就一直在争论该使用哪种 Java SDK。虽说作者理解 OpenJDK 的必要性,但书中的例子都是在 Oracle JDK 下测试通过的。总的来说,OpenJDK 和 Oracle JDK 没有什么区别,只是 Oracle JDK 为了追求稳定而在功能方面相对滞后。
1.2.2 解析
对于任何 Java 开发工作来说,JDK 都是必不可少的。GIT 是近几年被广泛采用的开源分布式版本控制系统。稍后我们将简要介绍 GIT。
Maven 是一种广泛使用的"约定优于配置"的构建系统。Maven 包含了许多有用的功能,比如项目对象模型(POM),POM 能够让我们以有效的方式管理库、依赖和版本。Maven 在互联网上拥有许多二进制仓库可供使用,所以我们可以直接用恰当的方式来维护二进制依赖项,然后打包部署 Topology。
由于 DevOps 和持续交付的不断发展,Puppet 系统已经被广泛用于为 Linux、其他一些操作系统和应用程序提供声明式服务器配置。Puppet 能够对服务器的状态和部署环境进行编程,这一点非常重要,因为这样就可以通过 GIT 版本控制系统管理服务器状态,而且对于服务器的修改都可以安全删除。这么做好处很多,包括可确定的服务器平均恢复时间(MTTR)和审计跟踪,说白了就是让系统更加稳定。这对实现持续交付来说至关重要。
Vagrant 是一个在开发过程中非常有用的工具,它能自动配置 VirtualBox 虚拟机,这对 Storm 处理系统这样的以集群为基础的技术来说尤为重要。为了测试一个集群,你必须构建一个真正的机群或配置多台虚拟机。Vagrant 能准确无误地在本地配置虚拟机。
虚拟机在 IT 运维和系统开发过程中能够大显身手,但需要指出的是,虽然我们都知道在本地部署的虚拟机性能不太理想,但其可用性却完全依赖于可用的内存容量。处理能力不是需要考虑的主要问题,因为现代处理器的利用率普遍严重偏低,尽管处理 Topology 时利用率会高一些。建议你的计算机至少保留 8GB 的内存容量。
1.3 分布式版本控制
传统版本控制系统都是集中式的。每个客户端都包含一份从当前版本签出的文件,而当前版本则取决于客户端使用的分支。所有历史版本都会存储在服务器上。这样的做法效果不错,不仅能让团队紧密协作,还能知道其他成员在做什么工作。
但集中式服务器存在一些较为明显的缺点,这让分布式版本控制系统的应用范围越来越广。首先,集中式服务器存在单点故障问题,如果服务器由于某种原因死机或无法访问,开发人员会因此难以继续工作。其次,如果服务器上的数据由于某种原因损坏或丢失,代码库的历史记录也就随之丢失。
基于以上两个原因,开源项目极大地推动了分布式版本控制的发展,但究其主要原因,还是分布式系统能帮助我们实现协作模型。开发人员可以在本地环境下用自己的方式进行工作,然后在方便的时候,一次性或多次将这些改动分布到一个或多个远程代码仓库。
除此之外,还有一个很明显的优势就是,由于每一个客户端都拥有代码仓库的完整镜像,因此会自然存在多个仓库备份。所以,如果任何一个客户端或服务器出现问题,都可以轻松恢复,实现数据还原。
实战
本书使用 Git 作为分布式版本控制系统。如果要建立一个代码仓库,你可以克隆已存在的代码仓库,或者通过初始化创建一个新的代码仓库。对于新创建的项目来说,应该通过初始化创建一个新的代码仓库。
Step01 通过以下命令创建项目目录:
mkdir FirstGitProject
cd FirstGitProject
git init
Step02 为了确认我们之前的步骤是否有效,需要在代码仓库中添加一些文件:
touch README.txt
vim README.txt
使用 vim 或任何其他文本编辑器,简单增加一些描述性文字并按下插入键。完成后按下 Esc 键,输入":wq",并按下回车键保存。
Step03 在提交之前,我们来看看代码仓库的状态:
git status
该命令会显示出类似于以下内容的信息:
# On branch master
# Initial commit
# Untracked files:
# README.txt
Step04 你需要使用以下命令,手动添加所有的文件和目录到 Git 代码仓库中:
git add README.txt
Step05 然后提交:
git commit -a
Step06 这时会打开 vim 编辑器并让你添加注释。
你可以在输入命令的同时,使用 -m 参数指定要提交的描述信息。
如果不把代码仓库存储在远程主机上,你将遇到和集中式存储一样的风险问题。所以你应该把代码仓库存储在远程主机上。www.github.com 和 www.bitbucket.org 是两个不错的免费 Git 托管服务,这两个服务都可以通过加密保证你的知识产权不被公共用户访问。本书使用 bitbucket.org 作为示例。要使用该远程托管服务,只需在你的浏览器上访问该站点并注册一个账户。
注册成功后,点击菜单创建一个新的代码仓库,如图 1-1 所示。
代码仓库创建完成后,需要把远程代码仓库添加到本地代码仓库中,并将改动推送到远程代码仓库:
git remote add origin https://[user]@bitbucket.org/[user]/firstgitproject.git
git push origin master
你必须把上述命令中的 [user] 替换为你注册的用户名。代码仓库的克隆方法以及一些标准的版本控制工作流会在后面的章节中介绍。
1.4 创建"Hello World"Topology
"Hello World"Topology 和其他所有的"Hello World"应用程序一样,并没有什么实际用途,其目的在于说明一些基本概念。"Hello World"Topology 将演示如何创建一个包含简单的 Spout 和 Bolt 的项目,如何构建项目,并在本地集群模式下运行项目。
1.4.1 实战
Step01 新建一个项目目录,并初始化你的 Git 代码仓库。
mkdir HelloWorld
cd HelloWorld
git init
Step02 然后,我们需要通过以下命令创建 Maven 项目文件:
vim pom.xml
Step03 使用 vim 或任何其他文本编辑器,为"Hello World"项目添加基本的 XML 标签和项目元数据。
<project xmlns="http://maven.apache.org/POM/4.0.0">
  <modelVersion>4.0.0</modelVersion>
  <groupId>storm.cookbook</groupId>
  <artifactId>hello-world</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>
  <name>hello-world</name>
  <url>https://bitbucket.org/[user]/hello-world</url>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>
</project>
Step04 然后,我们需要指定从哪一个 Maven 仓库获取依赖项。在 pom.xml 文件的项目标签内添加以下内容:
<repositories>
  <repository>
    <id>github-releases</id>
    <url>http://oss.sonatype.org/content/repositories/github-releases/</url>
  </repository>
  <repository>
    <id>clojars.org</id>
    <url>http://clojars.org/repo</url>
  </repository>
  <repository>
    <id>twitter4j</id>
    <url>http://twitter4j.org/maven2</url>
  </repository>
</repositories>
你可以使用 .m2 和 settings.xml 文件替换上面指定的这些仓库,虽然具体方法超出了本书的讨论范围,但对于开发团队来说依赖项管理非常重要,因此这种方法会变得十分有用。
Step05 然后在项目标签中添加我们所需的依赖项:
<dependencies>
  <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>3.8.1</version>
    <scope>test</scope>
  </dependency>
  <dependency>
    <groupId>storm</groupId>
    <artifactId>storm</artifactId>
    <version>0.8.1</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>com.googlecode.json-simple</groupId>
    <artifactId>json-simple</artifactId>
    <version>1.1</version>
  </dependency>
</dependencies>
Step06 最后,我们需要为 Maven 添加 build 插件定义:
<build>
  <plugins>
    <plugin>
      <artifactId>maven-assembly-plugin</artifactId>
      <configuration>
        <descriptorRefs>
          <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
      </configuration>
      <executions>
        <execution>
          <id>make-assembly</id>
          <phase>package</phase>
          <goals>
            <goal>single</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
    <plugin>
      <groupId>com.theoryinpractise</groupId>
      <artifactId>clojure-maven-plugin</artifactId>
      <version>1.3.8</version>
      <extensions>true</extensions>
      <configuration>
        <sourceDirectories>
          <sourceDirectory>src/clj</sourceDirectory>
        </sourceDirectories>
      </configuration>
      <executions>
        <execution>
          <id>compile</id>
          <phase>compile</phase>
          <goals>
            <goal>compile</goal>
          </goals>
        </execution>
        <execution>
          <id>test</id>
          <phase>test</phase>
          <goals>
            <goal>test</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-compiler-plugin</artifactId>
      <configuration>
        <source>1.6</source>
        <target>1.6</target>
      </configuration>
    </plugin>
  </plugins>
</build>
将 maven-assembly-plugin 绑定到 package 阶段,这样就可以创建一个适用于集群部署且不含 Storm 依赖项的 jar 文件。
Step07 完成 POM 文件的编写后保存(依次按下 Esc、:、wq 和回车键),然后补全 Maven 项目所需的目录结构:
mkdir src
cd src
mkdir test
mkdir main
cd main
mkdir java
Step10 接下来创建你的第一个 Spout:在 storm.cookbook 包中新建一个名为 HelloWorldSpout 的类,并继承 BaseRichSpout。Eclipse 会为你生成默认的方法框架。Spout 会基于随机概率生成 Tuple。添加以下成员变量并构造对象:
private SpoutOutputCollector collector;
private int referenceRandom;
private static final int MAX_RANDOM = 10;

public HelloWorldSpout(){
    final Random rand = new Random();
    referenceRandom = rand.nextInt(MAX_RANDOM);
}
Step11 对象构造好后,Storm 集群将会打开 Spout,为 open 方法添加以下实现代码:
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    this.collector = collector;
}
Step12 Storm 集群会重复调用 nextTuple 方法,该方法包含了整个 Spout 的逻辑。为该方法添加以下实现代码:
Utils.sleep(100);
final Random rand = new Random();
int instanceRandom = rand.nextInt(MAX_RANDOM);
if(instanceRandom == referenceRandom){
    collector.emit(new Values("Hello World"));
} else {
    collector.emit(new Values("Other Random Word"));
}
Step13 最后,你需要通过 declareOutputFields 方法,告诉 Storm 集群这个 Spout 会发送哪些字段:
declarer.declare(new Fields("sentence"));
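把 Step10 ~ Step13 的几段代码拼装起来,HelloWorldSpout 的完整骨架大致如下。这只是一个示意:方法体都来自上文,import 语句和方法签名则是笔者按 Storm 0.8.1 的 backtype.storm 包名补写的,并非书中原文:
import java.util.Map;
import java.util.Random;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

public class HelloWorldSpout extends BaseRichSpout {

    private SpoutOutputCollector collector;
    private int referenceRandom;
    private static final int MAX_RANDOM = 10;

    public HelloWorldSpout() {
        // 构造时生成一个参考随机数,后续用它决定发送哪条语句
        final Random rand = new Random();
        referenceRandom = rand.nextInt(MAX_RANDOM);
    }

    // 集群打开 Spout 时保存 collector 引用
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    // 集群会反复调用 nextTuple,按随机概率发送两种语句之一
    public void nextTuple() {
        Utils.sleep(100);
        final Random rand = new Random();
        int instanceRandom = rand.nextInt(MAX_RANDOM);
        if (instanceRandom == referenceRandom) {
            collector.emit(new Values("Hello World"));
        } else {
            collector.emit(new Values("Other Random Word"));
        }
    }

    // 声明该 Spout 发送的字段名
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence"));
    }
}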
Step14 导入类所需的包和类后,就可以开始创建 HelloWorldBolt 了。该类用于读取已产生的 Tuple 并实现必要的统计逻辑。在 storm.cookbook 包中创建一个继承 BaseRichBolt 的新类,为 Bolt 声明一个私有成员变量,并为 execute 方法添加以下实现代码:
String test = input.getStringByField("sentence");
if("Hello World".equals(test)){
    myCount++;
    System.out.println("Found a Hello World! My Count is now: " + Integer.toString(myCount));
}
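同样,把 Step14 补全成一个完整的类,HelloWorldBolt 大致如下。书中只给出了 execute 的方法体并提到一个私有成员变量,这里的 myCount 初始化、prepare 与 declareOutputFields 的空实现以及 import 语句都是笔者补写的假定写法:
import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

public class HelloWorldBolt extends BaseRichBolt {

    // 书中提到的私有成员变量:累计收到的 "Hello World" 数量
    private int myCount = 0;

    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        // 这个 Bolt 只做计数和打印,不向下游发送 Tuple,因此不需要保存 collector
    }

    public void execute(Tuple input) {
        String test = input.getStringByField("sentence");
        if ("Hello World".equals(test)) {
            myCount++;
            System.out.println("Found a Hello World! My Count is now: "
                    + Integer.toString(myCount));
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // 不声明输出字段:该 Bolt 是这条流水线的终点
    }
}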
Step15 最后,将所有代码逻辑组织起来,声明 Storm Topology。在同一个包中创建名为 HelloWorldTopology 的主类,并添加以下主函数实现:
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("randomHelloWorld", new HelloWorldSpout(), 10);
builder.setBolt("HelloWorldBolt", new HelloWorldBolt(), 2)
        .shuffleGrouping("randomHelloWorld");
Config conf = new Config();
conf.setDebug(true);
if(args != null && args.length > 0) {
    conf.setNumWorkers(3);
    StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
} else {
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("test", conf, builder.createTopology());
    Utils.sleep(10000);
    cluster.killTopology("test");
    cluster.shutdown();
}
这样就搭建好了 Topology,并根据传递给 main 方法的参数,将其提交到本地或远程的 Storm 集群上。
Step16 解决所有的编译问题后,你就能在项目根目录下,通过以下命令启动集群了:
mvn compile exec:java -Dexec.classpathScope=compile -Dexec.mainClass=storm.cookbook.HelloWorldTopology
1.4.2 解析
图 1-4 描述了"Hello World"Topology。
图 1-4 "Hello World"Topology
Spout 最终会发送一个流(Stream),包含下面两条语句中的任意一条:
• Other Random Word
• Hello World
Spout 根据随机概率决定要发送哪一条语句,具体原理是:先在构造函数中生成一个随机数,作为原始变量的值,接着生成随后的随机数,并和原始成员变量的值进行比较。若比较结果一致,就发送"Hello World",否则会发送另一条语句。
Bolt 会简单对比和计算 Hello World 的数量。在我们当前的实现中,你会注意到 Bolt 打印出来的计数在依次递增。想要扩展 Bolt 的规模,只需通过更新以下语句就可以为 Topology 增加并行程度:
builder.setBolt("HelloWorldBolt", new HelloWorldBolt(), 3)
        .shuffleGrouping("randomHelloWorld");
这里的 parallelism_hint 是关键参数,你可以根据情况把这个参数值调高。如果重新运行集群,你将能看到三条独立的计数信息交织地打印出来。
部署运行后若要调整集群的大小,可以通过 Storm 的 GUI 或 CLI 来修改并行程度,但如果不重新编译和部署 JAR 包,就无法直接修改 Topology 的结构。有关命令行选项的信息,请通过以下链接查看 wiki 上的 CLI 文档:https://github.com/nathanmarz/storm/wiki/Command-line-client。
确保 POM 中指定的项目依赖项正确是很重要的。我们必须将 Storm JAR 文件的范围设定成 provided,否则这些依赖项会被打包进你的 JAR,导致集群节点的 classpath 下出现重复的类文件。Storm 会检查这种 classpath 重复问题,一旦部署包中包含了 Storm 自身的文件,启动就会失败。
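在进入 1.8 节的单元测试和 1.9 节的集成测试之前,也可以先用 LocalCluster 给 Topology 做一个最简单的冒烟测试。下面是一个示意性的测试类,并非书中原文:它复用了上面 main 方法中的组装逻辑,按 POM 中已声明的 JUnit 3.8.1 写成 TestCase 风格,类名和运行时长均为笔者假设:
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.utils.Utils;
import junit.framework.TestCase;

public class HelloWorldTopologySmokeTest extends TestCase {

    public void testTopologyRunsInLocalMode() {
        // 与 HelloWorldTopology 的 main 方法相同的组装逻辑
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("randomHelloWorld", new HelloWorldSpout(), 10);
        builder.setBolt("HelloWorldBolt", new HelloWorldBolt(), 2)
                .shuffleGrouping("randomHelloWorld");

        Config conf = new Config();
        conf.setDebug(true);

        // 提交到本地集群,短暂运行后关闭;期间没有抛出异常即视为冒烟测试通过
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("smoke-test", conf, builder.createTopology());
        Utils.sleep(5000);
        cluster.killTopology("smoke-test");
        cluster.shutdown();
    }
}
需要说明的是,Storm 依赖在 POM 中是 provided 作用域,而 provided 作用域的依赖仍会出现在编译和测试的 classpath 中,因此本地运行这类测试不需要额外配置。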
下载示例代码
访问 http://www.packtpub.com 可下载本书及你所购买的所有 Packt 图书的示例代码。如果你是从其他地方购买的本书英文版,可以访问 http://www.packtpub.com/support 并注册,以便通过电子邮件取得示例文件。
作者通过 Bitbucket 账户来维护开源版本的代码:https://bitbucket.org/qanderson。
1.5 创建 Storm 集群——配置机器
本地模式下测试集群对调试和验证集群的基本功能逻辑很有帮助。但是,这并不代表你就能够了解集群在实际环境中运行的状况。此外,只有当系统已经在产品环境中运行时,开发工作才算真正完成。任何开发者都应该重视这一点,并且这也是整个 DevOps 实践的基础。无论采用什么方法,你都必须能够将代码可靠地部署到产品环境中。本节将展示如何直接通过本地虚拟机来配置组成 Storm 集群的机器。
Vagrantfile 中针对每台虚拟机的配置如下:
config.vm.box_url = "http://dl.dropbox.com/u/1537815/precise64.box"
config.vm.network :hostonly, opts[:ip]
config.vm.host_name = "storm.%s" % opts[:name].to_s
config.vm.share_folder "v-data", "/vagrant_data", "./data", :transient => false
config.vm.customize ["modifyvm", :id, "--memory", opts[:memory]]
config.vm.customize ["modifyvm", :id, "--cpus", opts[:cpus] ] if opts[:cpus]
Step04 接着,通过 bash 与 Puppet 脚本的组合来配置应用程序:
config.vm.provision :shell, :inline => "cp -fv /vagrant_data/hosts /etc/hosts"
config.vm.provision :shell, :inline => "apt-get update"
# Check if the jdk has been provided
if File.exist?("./data/jdk-6u35-linux-x64.bin") then
  config.vm.provision :puppet do |puppet|
    puppet.manifests_path = "manifests"
    puppet.manifest_file = "jdk.pp"
  end
end
config.vm.provision :puppet do |puppet|
  puppet.manifests_path = "manifests"
  puppet.manifest_file = "provisioningInit.pp"
end
# Ask puppet to do the provisioning now.
"http://dl.dropbox.com/u/1537815/precise64.box"
config.vm.network :hostonly, opts[:ip]
config.vm.host_name = "storm.%s" % opts[:name].to_s
config.vm.share_folder "v-data", "/vagrant_data", "./data", :transient => false
config.vm.customize ["modifyvm", :id, "--memory", opts[:memory]]
config.vm.customize ["modifyvm", :id, "--cpus", opts[:cpus] ] if opts[:cpus]

Step04 The provisioning of the application is then configured using a combination of bash and Puppet scripts:

config.vm.provision :shell, :inline => "cp -fv /vagrant_data/hosts /etc/hosts"
config.vm.provision :shell, :inline => "apt-get update"
# Check if the jdk has been provided
if File.exist?("./data/jdk-6u35-linux-x64.bin") then
  config.vm.provision :puppet do |puppet|
    puppet.manifests_path = "manifests"
    puppet.manifest_file = "jdk.pp"
  end
end
config.vm.provision :puppet do |puppet|
  puppet.manifests_path = "manifests"
  puppet.manifest_file = "provisioningInit.pp"
end
# Ask puppet to do the provisioning now.
config.vm.provision :shell, :inline => "puppet apply /tmp/storm-puppet/manifests/site.pp --verbose --modulepath=/tmp/storm-puppet/modules/ --debug"
    end
  end
end
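The snippet above starts partway through the Vagrant file, inside a per-box definition. Purely as an orientation, the surrounding structure it assumes is a loop over box definitions along the following lines (Vagrant 1.0.x syntax; the boxes list, the box name precise64, and the memory/CPU values are assumptions here, while the IP addresses mirror the hosts file used later in this recipe; the book's actual file lives in the vagrant-storm-cluster project):

boxes = [
  { :name => :nimbus,      :ip => "192.168.33.100", :memory => 1024, :cpus => 2 },
  { :name => :supervisor1, :ip => "192.168.33.101", :memory => 1024 },
  { :name => :zookeeper1,  :ip => "192.168.33.201", :memory => 512 }
]

Vagrant::Config.run do |config|
  boxes.each do |opts|
    config.vm.define opts[:name] do |config|
      config.vm.box     = "precise64"
      config.vm.box_url = "http://dl.dropbox.com/u/1537815/precise64.box"
      # The per-box network, shared folder, memory/CPU settings and the
      # shell/Puppet provisioning shown above go inside this block.
    end
  end
end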
The Vagrant file simply defines the hypervisor-level configuration and provisioning; the remaining provisioning is done through Puppet and is defined at two levels. The first level makes the base Ubuntu installation ready for application provisioning; the second level contains the actual application provisioning. In order to create the first level of provisioning, you need to create the JDK provisioning bash script and the provisioning initialization Puppet script.

Step05 In the scripts folder of the project, create the installJdk.sh file and populate it with the following code:

#!/bin/sh
echo "Installing JDK!"
chmod 775 /vagrant_data/jdk-6u35-linux-x64.bin
cd /root
yes | /vagrant_data/jdk-6u35-linux-x64.bin
/bin/mv /root/jdk1.6.0_35 /opt
/bin/rm -rv /usr/bin/java
/bin/rm -rv /usr/bin/javac
/bin/ln -s /opt/jdk1.6.0_35/bin/java /usr/bin
/bin/ln -s /opt/jdk1.6.0_35/bin/javac /usr/bin
JAVA_HOME=/opt/jdk1.6.0_35
export JAVA_HOME
PATH=$PATH:$JAVA_HOME/bin
export PATH

This script will simply be invoked by the Puppet script in a declarative manner.

Step06 In the manifests folder, create a file called jdk.pp:

$JDK_VERSION = "1.6.0_35"

package {"openjdk":
  ensure => absent,
}

exec { "installJdk":
  command => "installJdk.sh",
  path => "/vagrant/scripts",
  logoutput => true,
  creates => "/opt/jdk${JDK_VERSION}",
}

Step07 In the manifests folder, create the provisioningInit.pp file and define the required packages and static variable values:

$CLONE_URL = "https://bitbucket.org/qanderson/storm-puppet.git"
$CHECKOUT_DIR = "/tmp/storm-puppet"

package {git: ensure => [latest,installed]}
package {puppet: ensure => [latest,installed]}
package {ruby: ensure => [latest,installed]}
package {rubygems: ensure => [latest,installed]}
package {unzip: ensure => [latest,installed]}

exec { "install_hiera":
  command => "gem install hiera hiera-puppet",
  path => "/usr/bin",
  require => Package['rubygems'],
}

For more information on Hiera, see the Puppet documentation page at http://docs.puppetlabs.com/hiera/1/index.html.
Step08 You must then clone the repository that contains the second level of provisioning:

exec { "clone_storm-puppet":
  command => "git clone ${CLONE_URL}",
  cwd => "/tmp",
  path => "/usr/bin",
  creates => "${CHECKOUT_DIR}",
  require => Package['git'],
}

Step09 You must now configure a Puppet plugin called Hiera, which is used to externalize properties from the provisioning scripts in a hierarchical manner:

exec {"/bin/ln -s /var/lib/gems/1.8/gems/hiera-puppet-1.0.0/ /tmp/storm-puppet/modules/hiera-puppet":
  creates => "/tmp/storm-puppet/modules/hiera-puppet",
  require => [Exec['clone_storm-puppet'],Exec['install_hiera']]
}

# install hiera and the storm configuration
file { "/etc/puppet/hiera.yaml":
  source => "/vagrant_data/hiera.yaml",
  replace => true,
  require => Package['puppet']
}

file { "/etc/puppet/hieradata":
  ensure => directory,
  require => Package['puppet']
}

file {"/etc/puppet/hieradata/storm.yaml":
  source => "${CHECKOUT_DIR}/modules/storm.yaml",
  replace => true,
  require => [Exec['clone_storm-puppet'],File['/etc/puppet/hieradata']]
}
Step10 Finally, you need to populate the data folder. Create the Hiera base configuration file, hiera.yaml:

---
:hierarchy:
  - %{operatingsystem}
  - storm
:backends:
  - yaml
:yaml:
  :datadir: '/etc/puppet/hieradata'

Step11 The final data file required is the hosts file, which acts as the DNS in our local cluster:

127.0.0.1 localhost
192.168.33.100 storm.nimbus
192.168.33.101 storm.supervisor1
192.168.33.102 storm.supervisor2
192.168.33.103 storm.supervisor3
192.168.33.104 storm.supervisor4
192.168.33.105 storm.supervisor5
192.168.33.201 storm.zookeeper1
192.168.33.202 storm.zookeeper2
192.168.33.203 storm.zookeeper3
192.168.33.204 storm.zookeeper4

The hosts file is not required in properly configured environments; however, it works nicely in our local "host only" development network.

The project is now complete, in that it will provision the correct virtual machines and install the required base packages. However, we still need to create the Application layer provisioning, which is contained in a separate repository.

Step12 Initialize your Git repository for this project and push it to bitbucket.org.

1.5.2 How it works

As shown in Figure 1-6, provisioning is performed on three distinct layers: Application, Guest, and Hypervisor.

This recipe only covers the bottom two layers; the Application layer is presented in the next recipe. A key reason for the separation is that you will typically create different provisioning at these layers depending on the hypervisor you use for deployment. Once the VMs are provisioned, however, the application-stack provisioning should be consistent across all your environments. This consistency is key, because it allows us to test our deployments hundreds of times before we get to production and to ensure that they are in a repeatable, version-controlled state.

In the development environment, VirtualBox is the hypervisor, with Vagrant and Puppet providing the provisioning. Vagrant works by specializing a VirtualBox base image; this base image represents a version-controlled artifact. For each box defined in our Vagrant file, the following parameters are specified:

- The base box
- The network settings
- Shared folders
- Memory and CPU settings for the VM

This base provisioning does not include any of the baseline controls you would expect in a production environment, such as security, access controls, housekeeping, and monitoring. You must provision these before proceeding beyond your development environment. You can find recipes of this kind on Puppet Forge at http://forge.puppetlabs.com.
Provisioning agents are then invoked to perform the remaining heavy lifting:

config.vm.provision :shell, :inline => "cp -fv /vagrant_data/hosts /etc/hosts"

The preceding command installs the hosts file, which provides name resolution for our cluster:

config.vm.provision :shell, :inline => "apt-get update"

This updates all the packages in the apt-get cache of the Ubuntu installation. Vagrant then proceeds to install the JDK and the base provisioning, and finally it invokes the application provisioning.

The base VM image could already contain the entire base provisioning, which would make this portion of the provisioning unnecessary. However, it is important to understand the process of creating an appropriate base image, and to balance the amount of specialization in the base images you control; otherwise they will proliferate.
1.6 Creating a Storm cluster – provisioning Storm

Once you have a base set of virtual machines that are ready for application provisioning, you need to install and configure the appropriate packages on each node.

1.6.1 How to do it

Step01 Create a new project named storm-puppet with the folder structure shown in Figure 1-7 (the storm-puppet folder structure).

Step02 The entry point for the Puppet run on a provisioned node is site.pp. Create it in the manifests folder with the following content:

node 'storm.nimbus' {
  $cluster = 'storm1'
  include storm::nimbus
  include storm::ui
}

node /storm.supervisor[1-9]/ {
  $cluster = 'storm1'
  include storm::supervisor
}

node /storm.zookeeper[1-9]/ {
  include storm::zoo
}

Step03 Next, you need to define the storm module. A module lives in the modules folder and has its own manifests and templates folder structure, much like the structure found at the root level of the Puppet project. Within the storm module, create the required manifests (modules/storm/manifests), starting with the init.pp file:

class storm {
  include storm::install
  include storm::config
}
Step04 The installation of the Storm application is the same on every Storm node; only the configuration is adjusted where required, via templating. Next, create the install.pp file, which downloads the required binaries and installs them:

class storm::install {
  $BASE_URL="https://bitbucket.org/qanderson/storm-deb-packaging/downloads/"
  $ZMQ_FILE="libzmq0_2.1.7_amd64.deb"
  $JZMQ_FILE="libjzmq_2.1.7_amd64.deb"
  $STORM_FILE="storm_0.8.1_all.deb"

  package { "wget":
    ensure => latest
  }

  # call fetch for each file
  exec { "wget_storm":
    command => "/usr/bin/wget ${BASE_URL}${STORM_FILE}"
  }
  exec { "wget_zmq":
    command => "/usr/bin/wget ${BASE_URL}${ZMQ_FILE}"
  }
  exec { "wget_jzmq":
    command => "/usr/bin/wget ${BASE_URL}${JZMQ_FILE}"
  }

  # call package for each file
  package { "libzmq0":
    provider => dpkg,
    ensure => installed,
    source => "${ZMQ_FILE}",
    require => Exec['wget_zmq']
  }
  package { "libjzmq":
    provider => dpkg,
    ensure => installed,
    source => "${JZMQ_FILE}",
    require => [Exec['wget_jzmq'],Package['libzmq0']]
  }
  package { "storm":
    provider => dpkg,
    ensure => installed,
    source => "${STORM_FILE}",
    require => [Exec['wget_storm'], Package['libjzmq']]
  }
}

The install manifest assumes the existence of Debian packages for Ubuntu. These were built using scripts and can be tweaked based on your requirements; the binaries and the packaging scripts can be found at https://bitbucket.org/qanderson/storm-deb-packaging.

The installation consists of the following packages:

- Storm
- ZeroMQ: http://www.zeromq.org/
- Java-ZeroMQ
Step05 The configuration of each node is done through template-based generation of the configuration files. In the storm manifests, create config.pp:

class storm::config {
  require storm::install
  include storm::params

  file { '/etc/storm/storm.yaml':
    require => Package['storm'],
    content => template('storm/storm.yaml.erb'),
    owner => 'root',
    group => 'root',
    mode => '0644'
  }

  file { '/etc/default/storm':
    require => Package['storm'],
    content => template('storm/default.erb'),
    owner => 'root',
    group => 'root',
    mode => '0644'
  }
}

Step06 All the Storm parameters are defined using Hiera, with the Hiera lookups invoked from params.pp in the storm manifests:

class storm::params {
  #_ STORM DEFAULTS _#
  $java_library_path = hiera_array('java_library_path', ['/usr/local/lib', '/opt/local/lib', '/usr/lib'])
}

Due to the sheer number of properties, the file has been shortened here. For the complete file, please refer to the Git repository at https://bitbucket.org/qanderson/storm-puppet/src.
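To illustrate how such a lookup resolves, a hieradata entry for this property could look like the following. This is a hypothetical snippet only; the actual storm.yaml data file is pulled from the storm-puppet repository during provisioning:

# /etc/puppet/hieradata/storm.yaml (illustrative)
---
java_library_path:
  - /usr/local/lib
  - /opt/local/lib
  - /usr/lib

When the key is present in the data file, hiera_array('java_library_path', [...]) returns that list; otherwise it falls back to the in-line default given in params.pp.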
Step07 Each class of node is then specified. First, we define the nimbus class:

class storm::nimbus {
  require storm::install
  include storm::config
  include storm::params

  # Install nimbus /etc/default
  storm::service { 'nimbus':
    start => 'yes',
    jvm_memory => $storm::params::nimbus_mem
  }
}

Then the supervisor class:

class storm::supervisor {
  require storm::install
  include storm::config
  include storm::params

  # Install supervisor /etc/default
  storm::service { 'supervisor':
    start => 'yes',
    jvm_memory => $storm::params::supervisor_mem
  }
}

Next, the ui class:

class storm::ui {
  require storm::install
  include storm::config
  include storm::params

  # Install ui /etc/default
  storm::service { 'ui':
    start => 'yes',
    jvm_memory => $storm::params::ui_mem
  }
}

And finally, the zoo class (for a ZooKeeper node):

class storm::zoo {
  package {['zookeeper','zookeeper-bin','zookeeperd']:
    ensure => latest,
  }
}
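The nimbus, supervisor, and ui classes all rely on a storm::service defined type that ships with the storm-puppet repository and is not reproduced in this chapter. Purely as an orientation, a defined type of that shape might look roughly like the following sketch; the resource names, the template, and the parameter handling are assumptions here, not the repository's actual code:

define storm::service($start = 'yes', $jvm_memory = '1024m') {
  # Render a per-daemon /etc/default file so the init script knows whether to
  # start this daemon and how much memory to give its JVM (hypothetical layout).
  file { "/etc/default/storm-${name}":
    content => template('storm/default.erb'),
    owner   => 'root',
    group   => 'root',
    mode    => '0644',
    require => Package['storm'],
  }

  # Keep the corresponding daemon running when start == 'yes' (service name assumed).
  service { "storm-${name}":
    ensure  => $start ? { 'yes' => running, default => stopped },
    enable  => true,
    require => File["/etc/default/storm-${name}"],
  }
}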
Step08 Once all the files have been created, initialize the Git repository and push it to bitbucket.org.

Step09 In order to actually run the provisioning, navigate to the vagrant-storm-cluster folder and run the following command:

vagrant up

Step10 If you would like to SSH into any of the nodes, simply use the following command, replacing nimbus with the name of the node you need:

vagrant ssh nimbus

1.6.2 How it works

There are various patterns that can be applied when using Puppet. The simplest is a distributed model, whereby nodes provision themselves, as opposed to a centralized model using Puppet Master. In the distributed model, updating the server configuration simply requires that you update your provisioning manifests and push them to your central Git repository; the various nodes then pull and apply this configuration. This can be achieved through cron jobs, triggers, or a Continuous Delivery tool such as Jenkins, Bamboo, or Go.
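For instance, a cron entry along the following lines would make each node re-pull and re-apply the manifests every half hour. The schedule and file location are illustrative only; the repository path and the apply command follow the ones used in this recipe:

# /etc/cron.d/storm-puppet-apply (illustrative)
*/30 * * * * root cd /tmp/storm-puppet && git pull -q && puppet apply manifests/site.pp --modulepath=modules/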
In the development environment, the provisioning is invoked explicitly by Vagrant through the following command:

config.vm.provision :shell, :inline => "puppet apply /tmp/storm-puppet/manifests/site.pp --verbose --modulepath=/tmp/storm-puppet/modules/ --debug"

The manifest is then applied declaratively by Puppet. Puppet is declarative in that each language element specifies the desired state, together with the means of getting there. This means that when the system is already in the required state, that particular provisioning step is skipped, along with the adverse effects of duplicate provisioning.

The storm-puppet project is therefore cloned onto each node and the manifest is applied locally. Each node only applies the provisioning intended for itself, based on the hostname specified in the site.pp manifest, for example:

node 'storm.nimbus' {
  $cluster = 'storm1'
  include storm::nimbus
  include storm::ui
}
In this case, the storm.nimbus node picks up the Hiera configuration for the storm1 cluster, and both the nimbus and ui roles are installed on it. Any combination of classes can be included in a node definition, which allows the complete environment to be defined succinctly.

1.7 Deriving basic click statistics

The click topology gathers basic website-usage statistics, specifically:

- The number of visitors
- The number of unique visitors
- The number of visitors for a given country
- The number of visitors for a given city
- The percentage of visitors for each city in a given country

The system assumes a limited possible visitor population and prefers server-side client keys over client-side cookies. The topology derives the geographic information from the IP address, using a public IP-resolution service.

The click topology also uses Redis to store the click events being sent into the topology, specifically as a persistent queue; it also uses Redis to persistently recall previous visitors to the site. For more information on Redis, please visit Redis.io.

1.7.1 Getting ready

Before you proceed, you must install Redis (version 2.6 or greater):

wget http://download.redis.io/redis-stable.tar.gz
tar xvzf redis-stable.tar.gz
cd redis-stable
make
sudo cp src/redis-server /usr/local/bin/
sudo cp src/redis-cli /usr/local/bin/

Then start the Redis server.
1.7.2 How to do it

Step01 Create a new Java project named click-topology, with the same pom.xml layout and folder structure as the "Hello World" topology project.

Step02 In the pom.xml file, update the project name and references, and then add the following dependencies to the <dependencies> tag:

<dependency>
  <groupId>junit</groupId>
  <artifactId>junit</artifactId>
  <version>4.11</version>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.jmock</groupId>
  <artifactId>jmock-junit4</artifactId>
  <version>2.5.1</version>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.jmock</groupId>
  <artifactId>jmock-legacy</artifactId>
  <version>2.5.1</version>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>redis.clients</groupId>
  <artifactId>jedis</artifactId>
  <version>2.1.0</version>
</dependency>

Step03 Take special note of the test scope on the JUnit and JMock dependencies, so that they are not included in your final deployable JAR.

Step04 In the src/main/java folder, create the ClickTopology main class in the storm.cookbook package. This class defines the topology and provides the mechanisms to launch it into a cluster or in local mode. Create the class as follows:
public ClickTopology(){
    builder.setSpout("clickSpout", new ClickSpout(), 10);

    // First layer of bolts
    builder.setBolt("repeatsBolt", new RepeatVisitBolt(), 10)
           .shuffleGrouping("clickSpout");
    builder.setBolt("geographyBolt", new GeographyBolt(new HttpIPResolver()), 10)
           .shuffleGrouping("clickSpout");

    // Second layer of bolts, commutative in nature
    builder.setBolt("totalStats", new VisitStatsBolt(), 1)
           .globalGrouping("repeatsBolt");
    builder.setBolt("geoStats", new GeoStatsBolt(), 10)
           .fieldsGrouping("geographyBolt", new Fields(storm.cookbook.Fields.COUNTRY));

    conf.put(Conf.REDIS_PORT_KEY, DEFAULT_JEDIS_PORT);
}

public void runLocal(int runTime){
    conf.setDebug(true);
    conf.put(Conf.REDIS_HOST_KEY, "localhost");
    cluster = new LocalCluster();
    cluster.submitTopology("test", conf, builder.createTopology());
    if(runTime > 0){
        Utils.sleep(runTime);
        shutDownLocal();
    }
}

public void shutDownLocal(){
    if(cluster != null){
        cluster.killTopology("test");
        cluster.shutdown();
    }
}

public void runCluster(String name, String redisHost) throws AlreadyAliveException, InvalidTopologyException {
    conf.setNumWorkers(20);
    conf.put(Conf.REDIS_HOST_KEY, redisHost);
    StormSubmitter.submitTopology(name, conf, builder.createTopology());
}

Step05 This is followed by the main method, whose behaviour is guided by the number of arguments passed at runtime:

public static void main(String[] args) throws Exception {
    ClickTopology topology = new ClickTopology();

    if(args != null && args.length > 1) {
        topology.runCluster(args[0], args[1]);
    } else {
        if(args != null && args.length == 1)
            System.out.println("Running in local mode, redis ip missing for cluster run");
        topology.runLocal(10000);
    }
}
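As with the "Hello World" project, you can run this class locally through the Maven exec plugin, or submit the packaged JAR with the storm client. The JAR path, the topology name, and the Redis host placeholder below are illustrative; the two cluster arguments are the topology name and the Redis host expected by main:

# Local mode: runs the topology for 10 seconds and then shuts it down
mvn compile exec:java -Dexec.classpathScope=compile -Dexec.mainClass=storm.cookbook.ClickTopology

# Cluster mode: submit the packaged JAR (path and name are illustrative)
storm jar target/click-topology-1.0-SNAPSHOT.jar storm.cookbook.ClickTopology clicks <redis-host>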
Step06 The topology assumes that the web server pushes messages onto a Redis queue. You must therefore create a spout to inject these messages into the Storm cluster as a stream. In the storm.cookbook package, create the ClickSpout class, which connects to Redis when it is opened by the cluster:

public class ClickSpout extends BaseRichSpout {

    public static Logger LOG = Logger.getLogger(ClickSpout.class);

    private Jedis jedis;
    private String host;
    private int port;
    private SpoutOutputCollector collector;

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields(storm.cookbook.Fields.IP,
                storm.cookbook.Fields.URL,
                storm.cookbook.Fields.CLIENT_KEY));
    }

    @Override
    public void open(Map conf, TopologyContext topologyContext,
                     SpoutOutputCollector spoutOutputCollector) {
        host = conf.get(Conf.REDIS_HOST_KEY).toString();
        port = Integer.valueOf(conf.get(Conf.REDIS_PORT_KEY).toString());
        this.collector = spoutOutputCollector;
        connectToRedis();
    }

    private void connectToRedis() {
        jedis = new Jedis(host, port);
    }
Step07 The cluster will then poll the spout for new tuples through the nextTuple method:

public void nextTuple() {
    String content = jedis.rpop("count");
    if(content == null || "nil".equals(content)) {
        try {
            Thread.sleep(300);
        } catch (InterruptedException e) {}
    } else {
        JSONObject obj = (JSONObject) JSONValue.parse(content);
        String ip = obj.get(storm.cookbook.Fields.IP).toString();
        String url = obj.get(storm.cookbook.Fields.URL).toString();
        String clientKey = obj.get(storm.cookbook.Fields.CLIENT_KEY).toString();
        collector.emit(new Values(ip, url, clientKey));
    }
}
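For local testing you can push a sample click event onto the count list yourself with redis-cli. The JSON keys below assume that the storm.cookbook.Fields constants resolve to the literal strings ip, url, and clientKey, and the values are purely illustrative:

redis-cli LPUSH count '{"ip":"203.0.113.10","url":"http://www.example.com/index.html","clientKey":"client-0001"}'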
Step08 Next, we need to create the bolts that enrich the basic data through database or remote API lookups. Let's start with the repeat-visit bolt, which checks the client's key against previous visit records and emits an enriched tuple with a flag indicating whether this is a unique (first) visit. Create the RepeatVisitBolt class, providing the prepare and Redis connection logic:

public class RepeatVisitBolt extends BaseRichBolt {

    private OutputCollector collector;
    private Jedis jedis;
    private String host;
    private int port;

    @Override
    public void prepare(Map conf, TopologyContext topologyContext,
                        OutputCollector outputCollector) {
        this.collector = outputCollector;
        host = conf.get(Conf.REDIS_HOST_KEY).toString();
        port = Integer.valueOf(conf.get(Conf.REDIS_PORT_KEY).toString());
        connectToRedis();
    }

    private void connectToRedis() {
        jedis = new Jedis(host, port);
        jedis.connect();
    }

Step09 In the execute method, the tuples from the ClickSpout class are delivered by the cluster. The bolt looks up the previous-visit flag in Redis, based on the fields in the tuple, and emits the enriched tuple:

public void execute(Tuple tuple) {
    String ip = tuple.getStringByField(storm.cookbook.Fields.IP);
    String clientKey = tuple.getStringByField(storm.cookbook.Fields.CLIENT_KEY);
    String url = tuple.getStringByField(storm.cookbook.Fields.URL);
    String key = url + ":" + clientKey;
    String value = jedis.get(key);
    if(value == null){
        jedis.set(key, "visited");
        collector.emit(new Values(clientKey, url, Boolean.TRUE.toString()));
    } else {
        collector.emit(new Values(clientKey, url, Boolean.FALSE.toString()));
    }
}

Step10 Next, the geographic enrichment bolt must be created. This bolt emits an enriched tuple by looking up the country and city of the client's IP address through a remote API call. The GeographyBolt class delegates the actual call to an injected IP resolver in order to increase the testability of the class. In the storm.cookbook package, create the GeographyBolt class, extending BaseRichBolt, and implement the execute method:
public void execute(Tuple tuple) {
    String ip = tuple.getStringByField(storm.cookbook.Fields.IP);
    JSONObject json = resolver.resolveIP(ip);
    String city = (String) json.get(storm.cookbook.Fields.CITY);
    String country = (String) json.get(storm.cookbook.Fields.COUNTRY_NAME);
    collector.emit(new Values(country, city));
}

Step11 Provide a resolver by implementing HttpIPResolver and injecting it into GeographyBolt at construction time:

public class HttpIPResolver implements IPResolver, Serializable {

    static String url = "http://api.hostip.info/get_json.php";

    @Override
    public JSONObject resolveIP(String ip) {
        URL geoUrl = null;
        BufferedReader in = null;
        try {
            geoUrl = new URL(url + "?ip=" + ip);
            URLConnection connection = geoUrl.openConnection();
            in = new BufferedReader(new InputStreamReader(connection.getInputStream()));
            JSONObject json = (JSONObject) JSONValue.parse(in);
            in.close();
            return json;
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if(in != null){
                try {
                    in.close();
                } catch (IOException e) {}
            }
        }
        return null;
    }
}
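The injected resolver is what makes GeographyBolt easy to unit test with the JUnit and JMock dependencies added to the POM earlier. The following is only a sketch of such a test, not the book's actual test code (which lives in the author's repository). It assumes that GeographyBolt has a constructor taking an IPResolver and a standard prepare(Map, TopologyContext, OutputCollector) method that stores the collector, that IPResolver declares the single resolveIP(String) method implemented above, and that the Fields constants resolve to the literal strings "ip", "city", and "country_name"; all sample values are made up:

package storm.cookbook;

import java.util.HashMap;

import org.jmock.Expectations;
import org.jmock.Mockery;
import org.jmock.lib.legacy.ClassImposteriser;
import org.json.simple.JSONObject;
import org.junit.Test;

import backtype.storm.task.OutputCollector;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class GeographyBoltTest {

    @SuppressWarnings("unchecked")
    @Test
    public void enrichesClickWithCountryAndCity() {
        // ClassImposteriser (from jmock-legacy) lets us mock the OutputCollector class
        final Mockery context = new Mockery() {{
            setImposteriser(ClassImposteriser.INSTANCE);
        }};

        // Canned response standing in for the remote IP lookup service
        final JSONObject location = new JSONObject();
        location.put("city", "Sydney");
        location.put("country_name", "AUSTRALIA");

        final IPResolver resolver = context.mock(IPResolver.class);
        final OutputCollector collector = context.mock(OutputCollector.class);
        final Tuple tuple = context.mock(Tuple.class);

        context.checking(new Expectations() {{
            oneOf(tuple).getStringByField("ip");
            will(returnValue("203.0.113.10"));
            oneOf(resolver).resolveIP("203.0.113.10");
            will(returnValue(location));
            oneOf(collector).emit(with(any(Values.class)));
        }});

        GeographyBolt bolt = new GeographyBolt(resolver);
        bolt.prepare(new HashMap(), null, collector); // TopologyContext is assumed to be unused here
        bolt.execute(tuple);

        context.assertIsSatisfied();
    }
}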
Step11 Provide a resolver by implementing HttpIPResolver and injecting it into GeographyBolt at design time:

public class HttpIPResolver implements IPResolver, Serializable {

    static String url = "http://api.hostip.info/get_json.php";

    @Override
    public JSONObject resolveIP(String ip) {
        URL geoUrl = null;
        BufferedReader in = null;
        try {
            geoUrl = new URL(url + "?ip=" + ip);
            URLConnection connection = geoUrl.openConnection();
            in = new BufferedReader(new InputStreamReader(
                    connection.getInputStream()));
            JSONObject json = (JSONObject) JSONValue.parse(in);
            in.close();
            return json;
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (in != null) {
                try {
                    in.close();
                } catch (IOException e) {}
            }
        }
        return null;
    }
}
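The IPResolver interface itself is not listed in the recipe; it only needs the single method that HttpIPResolver implements and that tests can mock. A minimal sketch (HttpIPResolver additionally implements Serializable so that the bolt holding it can be shipped to the cluster):

public interface IPResolver {
    JSONObject resolveIP(String ip);
}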
Step12 Next, we need to derive the geographic statistics. The GeoStatsBolt class simply receives the enriched tuple from GeographyBolt and maintains an in-memory model of the data; it also emits the updated counts to any interested party. GeoStatsBolt is designed so that the total population of the countries can be split between many bolt instances; however, all cities within a given country must arrive at the same instance. The topology therefore splits the stream into the bolt on that basis:

builder.setBolt("geoStats", new GeoStatsBolt(), 10).fieldsGrouping("geographyBolt",
        new Fields(storm.cookbook.Fields.COUNTRY));

Step13 Create the GeoStatsBolt class and provide the implementation of the execute method:

public void execute(Tuple tuple) {
    String country = tuple.getStringByField(storm.cookbook.Fields.COUNTRY);
    String city = tuple.getStringByField(Fields.CITY);
    if (!stats.containsKey(country)) {
        stats.put(country, new CountryStats(country));
    }
    stats.get(country).cityFound(city);
    collector.emit(new Values(country,
            stats.get(country).getCountryTotal(),
            city,
            stats.get(country).getCityTotal(city)));
}

Step14 The bulk of the logic is contained in the inner model class, which maintains the in-memory model of cities and countries:

private class CountryStats {

    private int countryTotal = 0;
    private static final int COUNT_INDEX = 0;
    private static final int PERCENTAGE_INDEX = 1;
    private String countryName;
    private Map<String, List<Integer>> cityStats = new HashMap<String, List<Integer>>();

    public CountryStats(String countryName) {
        this.countryName = countryName;
    }

    public void cityFound(String cityName) {
        countryTotal++;
        if (cityStats.containsKey(cityName)) {
            cityStats.get(cityName).set(COUNT_INDEX,
                    cityStats.get(cityName).get(COUNT_INDEX).intValue() + 1);
        } else {
            List<Integer> list = new LinkedList<Integer>();
            // add some dummy data
            list.add(1);
            list.add(0);
            cityStats.put(cityName, list);
        }
        double percent = (double) cityStats.get(cityName).get(COUNT_INDEX)
                / (double) countryTotal;
        cityStats.get(cityName).set(PERCENTAGE_INDEX, (int) percent);
    }

    public int getCountryTotal() { return countryTotal; }

    public int getCityTotal(String cityName) {
        return cityStats.get(cityName).get(COUNT_INDEX).intValue();
    }
}
Step15 Finally, the VisitStatsBolt class provides the final counting functionality for total visits and unique visits, based on the enriched stream from the RepeatVisitBolt class. This bolt needs to receive all of the count information in order to maintain a single in-memory count, which is reflected in the topology definition:

builder.setBolt("totalStats", new VisitStatsBolt(), 1).globalGrouping("repeatsBolt");

Step16 To implement the VisitStatsBolt class, create the class, define two member-level integers, total and uniqueCount, and then implement the execute method:

public void execute(Tuple tuple) {
    boolean unique = Boolean.parseBoolean(
            tuple.getStringByField(storm.cookbook.Fields.UNIQUE));
    total++;
    if (unique) uniqueCount++;
    collector.emit(new Values(total, uniqueCount));
}
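The rest of VisitStatsBolt is a few lines of boilerplate. A minimal sketch, assuming output field names of "total" and "uniqueCount" (the names are an assumption; any names work as long as downstream consumers use the same ones):

private OutputCollector collector;
private int total = 0;
private int uniqueCount = 0;

@Override
public void prepare(Map conf, TopologyContext context, OutputCollector outputCollector) {
    this.collector = outputCollector;
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("total", "uniqueCount"));
}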
1.7.3 How it works

Figure 1-8 depicts the click statistics topology.

Figure 1-8 The click statistics topology (click spouts → repeat visit bolts and geography bolts → geo stats bolts and a single visit stats bolt)

The spout emits the click events received from the web server into the topology, where shuffle groupings forward them to the geography bolt and the repeat visit bolt. This balances the load across the cluster, which is especially important for slow or high-latency enrichment steps such as these.

Before designing a topology, take the time to understand the commutative and associative properties of your data model, the structure inherent in your streams, and any other issues they raise.

When building the topology, you also need a good understanding of Storm's parallelism. An article on the Storm wiki summarizes the subject well: https://github.com/nathanmarz/storm/wiki/Understanding-the-parallelism-of-a-Storm-topology. The key points are:

• The number of worker processes for the topology (TOPOLOGY_WORKERS).
• The number of executors (threads) per component in the topology, which is governed by the parallelism hint. Note that this value is only the initial number of threads; the number of executors can be changed at runtime by rebalancing the topology (through the UI or the CLI), and it can be capped with Config#setMaxTaskParallelism().
• The default number of tasks per executor is one. This can be changed when a component is declared, through ComponentConfigurationDeclarer#setNumTasks().

These are the key elements to consider when sizing your cluster. The cluster will try to distribute work to the worker processes, each containing many executors, and each executor may be running one or more tasks. The number of executors per worker is therefore a function of the total number of executors over the number of workers; a good example is given on the wiki page mentioned above. Using these numbers, you can size your cluster in terms of nodes and cores per node, where ideally you should have one core per thread (executor) in the cluster.
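All of these settings are exposed through the configuration and builder APIs. As a short illustration (the numbers are arbitrary, and the builder and component names follow the snippets earlier in this recipe):

Config conf = new Config();
// number of worker processes for the topology (TOPOLOGY_WORKERS)
conf.setNumWorkers(2);
// optional global cap on the parallelism of any single component
conf.setMaxTaskParallelism(20);

// initial parallelism hint of 10 executors, spread over 20 tasks, which leaves
// room to rebalance to more executors later without redeploying the topology
builder.setBolt("geoStats", new GeoStatsBolt(), 10)
        .fieldsGrouping("geographyBolt", new Fields(storm.cookbook.Fields.COUNTRY))
        .setNumTasks(20);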
1.8 Unit testing a bolt

Unit testing is an essential part of any delivery, so the logic contained in the bolts must also be unit tested.

1.8.1 Getting ready

Unit testing often involves a technique called mocking, which lets you substitute dynamically generated fake instances for a class's dependencies so that the class can be tested in isolation. This book illustrates unit testing with JUnit 4 and JMock; please take the time to read the JMock recipes at http://jmock.org/cookbook.html.

1.8.2 How to do it

Step01 In the src/test/java folder, create the storm.cookbook package and then the StormTestCase class. This class is a simple abstraction of some shared initialization code:

public class StormTestCase {

    protected Mockery context = new Mockery() {{
        setImposteriser(ClassImposteriser.INSTANCE);
    }};

    protected Tuple getTuple() {
        final Tuple tuple = context.mock(Tuple.class);
        return tuple;
    }
}

Step02 Then create the TestRepeatVisitBolt class, extending StormTestCase, and mark it with the parameterized runner annotation:

@RunWith(value = Parameterized.class)
public class TestRepeatVisitBolt extends StormTestCase {
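With the Parameterized runner, JUnit passes each row of test data to the test class through its constructor. A sketch of the fields and constructor this implies (the field names simply mirror the variables referenced in the test method below):

    private String ip;
    private String clientKey;
    private String url;
    private String expected;
    private Jedis jedis;

    public TestRepeatVisitBolt(String ip, String clientKey, String url, String expected) {
        this.ip = ip;
        this.clientKey = clientKey;
        this.url = url;
        this.expected = expected;
    }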
Step03 The test case logic of the class is contained in a single test method:

@Test
public void testExecute() {
    jedis = new Jedis("localhost", 6379);
    RepeatVisitBolt bolt = new RepeatVisitBolt();
    Map config = new HashMap();
    config.put("redis-host", "localhost");
    config.put("redis-port", "6379");
    final OutputCollector collector = context.mock(OutputCollector.class);
    bolt.prepare(config, null, collector);
    assertEquals(true, bolt.isConnected());

    final Tuple tuple = getTuple();
    context.checking(new Expectations() {{
        oneOf(tuple).getStringByField(Fields.IP); will(returnValue(ip));
        oneOf(tuple).getStringByField(Fields.CLIENT_KEY); will(returnValue(clientKey));
        oneOf(tuple).getStringByField(Fields.URL); will(returnValue(url));
        oneOf(collector).emit(new Values(clientKey, url, expected));
    }});

    bolt.execute(tuple);
    context.assertIsSatisfied();
    if (jedis != null)
        jedis.disconnect();
}

Step04 Next, the parameters must be defined:

@Parameterized.Parameters
public static Collection<Object[]> data() {
    Object[][] data = new Object[][] {
            { "192.168.33.100", "Client1", "myintranet.com", "false" },
            { "192.168.33.100", "Client1", "myintranet.com", "false" },
            { "192.168.33.101", "Client2", "myintranet1.com", "true" },
            { "192.168.33.102", "Client3", "myintranet2.com", "false" } };
    return Arrays.asList(data);
}
Step05 The base values must be provisioned into Redis before the tests run:

@BeforeClass
public static void setupJedis() {
    Jedis jedis = new Jedis("localhost", 6379);
    jedis.flushDB();
    Iterator<Object[]> it = data().iterator();
    while (it.hasNext()) {
        Object[] values = it.next();
        if (values[3].equals("false")) {
            String key = values[2] + ":" + values[1];
            // a repeat visit is expected, so the key must already exist
            jedis.set(key, "visited");
        }
    }
}

It is always useful to leave the data in Redis after the test completes so that you can review and debug it, clearing it again only on the next run.

1.8.3 How it works

Firstly, the unit test works by defining a set of test data. This allows many different cases to be tested without unnecessary abstraction or duplication. Before the tests execute, the static data is populated into the Redis DB, which allows the tests to run deterministically. The test method is then executed once per row of parameterized data, so many different cases are verified.

JMock provides mock instances of the collector and of the tuples consumed by the bolt. The expected behavior is then defined in terms of these mocked objects and their interactions:

context.checking(new Expectations() {{
    oneOf(tuple).getStringByField(Fields.IP); will(returnValue(ip));
    oneOf(tuple).getStringByField(Fields.CLIENT_KEY); will(returnValue(clientKey));
    oneOf(tuple).getStringByField(Fields.URL); will(returnValue(url));
    oneOf(collector).emit(new Values(clientKey, url, expected));
}});
Although these are separate lines of code, within the bounds of the expectations block they should be read declaratively: I expect the getStringByField method of the tuple to be called exactly once with the IP field name, and the mock object must then return the given value to the class under test. This mechanism provides a clean way to exercise the bolt.

There are many different kinds of unit tests, and it often becomes necessary to test against a database in this manner. If you can avoid it, however, rather mock out all of the class's dependencies and implement a true unit test. This is possible for the geography bolt because of the resolver abstraction.
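To illustrate that point, a true unit test of GeographyBolt can mock the IPResolver instead of touching Redis or a remote API. The following is only a sketch under the assumptions made earlier in this chapter (an IPResolver interface, a GeographyBolt constructor that accepts it, and the storm.cookbook.Fields constants), written against the StormTestCase base class:

@Test
public void testGeographyBoltWithMockedResolver() {
    final IPResolver resolver = context.mock(IPResolver.class);
    final OutputCollector collector = context.mock(OutputCollector.class);
    final Tuple tuple = getTuple();

    // canned response standing in for the remote API call
    final JSONObject json = new JSONObject();
    json.put(storm.cookbook.Fields.COUNTRY_NAME, "AUSTRALIA");
    json.put(storm.cookbook.Fields.CITY, "SYDNEY");

    GeographyBolt bolt = new GeographyBolt(resolver);
    bolt.prepare(new HashMap(), null, collector);

    context.checking(new Expectations() {{
        oneOf(tuple).getStringByField(storm.cookbook.Fields.IP);
        will(returnValue("165.228.250.178"));
        oneOf(resolver).resolveIP("165.228.250.178");
        will(returnValue(json));
        oneOf(collector).emit(new Values("AUSTRALIA", "SYDNEY"));
    }});

    bolt.execute(tuple);
    context.assertIsSatisfied();
}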
1.9 Implementing an integration test

Integration testing can mean many different things depending on the situation and the audience. For the purposes of this book, integration testing means testing the topology end to end, with defined input and output points, inside a local cluster. This allows the functionality to be verified fully before the topology is promoted to an actual cluster.

1.9.1 How to do it

Step01 Create the IntegrationTestTopology class in the src/test/java folder, in the storm.cookbook package. Set up a local topology by adding a testing utility bolt to the end of it:

@BeforeClass
public static void setup() {
    // we want all output tuples routed to the test bolt for verification
    topology.getBuilder().setBolt("testBolt", testBolt, 1)
            .globalGrouping("geoStats")
            .globalGrouping("totalStats");
    // run in local mode; the cluster is shut down when we are finished
    topology.runLocal(0);
    // Jedis is required for input to and output from the cluster
    jedis = new Jedis("localhost",
            Integer.parseInt(ClickTopology.DEFAULT_JEDIS_PORT));
    jedis.connect();
    jedis.flushDB();
    // give the cluster some time to start up before running the tests
    Utils.sleep(5000);
}

Step02 Then, define the expected parameters as a set of arrays arranged in input/expectation pairs:
@Parameterized.Parameters
public static Collection<Object[]> data() {
    Object[][] data = new Object[][] {
            { new Object[]{ "165.228.250.178", "internal.com", "Client1" },   // input
              new Object[]{ "AUSTRALIA", new Long(1), "SYDNEY", new Long(1),
                            new Long(1), new Long(1) } },                     // expectations
            { new Object[]{ "165.228.250.178", "internal.com", "Client1" },   // input
              new Object[]{ "AUSTRALIA", new Long(2), "SYDNEY", new Long(2),
                            new Long(2), new Long(1) } },
            { new Object[]{ "4.17.136.0", "internal.com", "Client1" },        // input: same client, different location
              new Object[]{ "UNITED STATES", new Long(1), "DERRY, NH", new Long(1),
                            new Long(3), new Long(1) } },
            { new Object[]{ "4.17.136.0", "internal.com", "Client2" },        // input: different client, same location
              new Object[]{ "UNITED STATES", new Long(2), "DERRY, NH", new Long(2),
                            new Long(4), new Long(2) } } };                   // expectations
    return Arrays.asList(data);
}

Object[] input;
Object[] expected;

public IntegrationTestTopology(Object[] input, Object[] expected) {
    this.input = input;
    this.expected = expected;
}

Step03 The test logic can then be based on these parameters:
@Test
public void inputOutputClusterTest() {
    JSONObject content = new JSONObject();
    content.put("ip", input[0]);
    content.put("url", input[1]);
    content.put("clientKey", input[2]);
    jedis.rpush("count", content.toJSONString());
    Utils.sleep(3000);
    int count = 0;
    String data = jedis.rpop("TestTuple");
    while (data != null) {
        JSONArray values = (JSONArray) JSONValue.parse(data);
        if (values.get(0).toString().contains("geoStats")) {
            count++;
            assertEquals(expected[0], values.get(1).toString().toUpperCase());
            assertEquals(expected[1], values.get(2));
            assertEquals(expected[2], values.get(3).toString().toUpperCase());
            assertEquals(expected[3], values.get(4));
        } else if (values.get(0).toString().contains("totalStats")) {
            count++;
            assertEquals(expected[4], values.get(1));
            assertEquals(expected[5], values.get(2));
        }
        data = jedis.rpop("TestTuple");
    }
    assertEquals(2, count);
}
1.9.2 How it works

The integration test works by creating a local cluster and then injecting input values into it through Redis, in the same way a real web server would for this design. It then adds a dedicated testing bolt to the end of the topology that receives all the output tuples and checks them against the expected values.

Once the TestBolt instance has been submitted to the cluster, it is no longer accessible from the test, so the outputs can only be reached through persistence: TestBolt persists the received tuples to Redis, where the test case can read and validate them. The logic within TestBolt is as follows:

public void execute(Tuple input) {
    List<Object> objects = input.getValues();
    objects.add(0, input.getSourceComponent());
    jedis.rpush("TestTuple", JSONArray.toJSONString(objects));
}

This is then read back by the test and validated against the expected values:

String data = jedis.rpop("TestTuple");
while (data != null) {
    JSONArray values = (JSONArray) JSONValue.parse(data);
    if (values.get(0).toString().contains("geoStats")) {
        count++;
        assertEquals(expected[0], values.get(1).toString().toUpperCase());
        assertEquals(expected[1], values.get(2));
        assertEquals(expected[2], values.get(3).toString().toUpperCase());
        assertEquals(expected[3], values.get(4));
    } else if (values.get(0).toString().contains("totalStats")) {
        count++;
        assertEquals(expected[4], values.get(1));
        assertEquals(expected[5], values.get(2));
    }
    data = jedis.rpop("TestTuple");
}
assertEquals(2, count);
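Only TestBolt's execute method is listed; the rest of the bolt is plumbing. A minimal sketch of that plumbing, assuming it connects to the same local Redis instance the test reads from:

public class TestBolt extends BaseRichBolt {

    private Jedis jedis;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        jedis = new Jedis("localhost",
                Integer.parseInt(ClickTopology.DEFAULT_JEDIS_PORT));
        jedis.connect();
    }

    @Override
    public void execute(Tuple input) {
        // persist each received tuple, prefixed with its source component name
        List<Object> objects = input.getValues();
        objects.add(0, input.getSourceComponent());
        jedis.rpush("TestTuple", JSONArray.toJSONString(objects));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // terminal bolt: nothing is emitted downstream
    }
}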
1.10 Deploying to the cluster

The final step in the development process is to functionally test the topology in a cluster before promoting it to the next environment.

1.10.1 How to do it

Step01 First, configure the Storm client on your development machine by creating a .storm folder in your user home directory. Create storm.yaml in that folder with the following content:

storm.local.dir: "/mnt/storm"
nimbus.host: "192.168.33.100"

Step02 Package your topology by running the following command from the project root:

mvn package
Step03 This produces a completely packaged JAR in the target folder of the project. You can deploy it to the cluster using the storm client command:

storm jar jarName.jar [TopologyName] [Args]
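For the storm jar command to start anything, the topology's main method must submit the topology to the remote cluster rather than to a LocalCluster. A common pattern, sketched here with assumed names (the book's ClickTopology may arrange this differently):

public static void main(String[] args) throws Exception {
    ClickTopology topology = new ClickTopology();
    Config conf = new Config();
    if (args != null && args.length > 0) {
        // launched via "storm jar": submit to the cluster defined in ~/.storm/storm.yaml
        conf.setNumWorkers(4);
        StormSubmitter.submitTopology(args[0], conf,
                topology.getBuilder().createTopology());
    } else {
        // no arguments: run in local mode during development
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("click-topology", conf,
                topology.getBuilder().createTopology());
    }
}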
1.10.2 How it works

The Storm command-line client provides all the tools needed to control the cluster; one of those functions is deploying a packaged topology. For more details on the storm CLI, see the documentation on the Storm wiki at https://github.com/nathanmarz/storm/wiki/Command-line-client.

Chapter 2 Log Stream Processing

2.1 Introduction

This chapter shows how to implement an enterprise log storage system, together with a search and analytics solution, on top of the Storm processing system. Log data processing is no longer an unsolved problem, but it remains an excellent vehicle for learning these concepts.

Stream processing is a major architectural concern in the modern enterprise, yet even the best data streams are usually only semi-structured. This chapter presents an enterprise log processing solution so that the reader can learn the important concepts and gain the ability to deal with many kinds of data. Because log data exists in huge volumes, it is also very convenient to study. For any stream processing or analytics effort, the keys to success are a deep understanding of the actual data and the ability to obtain that data, both of which are often hard to come by. The reader should therefore focus on how to apply this architectural blueprint to the other forms of data in the enterprise.

Figure 2-1 shows the basic concepts covered in this chapter.

Figure 2-1 Architecture overview (log files → logstash agents → Redis list → spout → log rules → indexers → Elasticsearch and Cassandra, with Kibana for search and display)

You will learn how to set up a log agent that can be deployed to every node in your environment, and how to use Storm and Redis to collect, analyze, index, and count the logs centrally so that they can later be searched and their basic statistics displayed.

2.2 Creating a log agent

A modern enterprise architecture consists of a large number of solutions, each comprising many nodes; some MapReduce clusters contain hundreds of nodes. At the operating system and application layers, every node hosts a set of applications and services, and these produce large volumes of log data of many different kinds. The importance of this log data to the enterprise is increasingly well understood, for the following reasons:

• It is a key source of information for the IT operations team that keeps systems running.
• It is vital for finding and resolving problems in production and during system testing.
• It is increasingly a source of business value, because valuable business data is embedded in this semi-structured data, including:
  • risk and compliance data
  • business operations data
  • web analytics data
  • security information
  • financial forecasting data

To take advantage of this valuable log data, we must first gather it from the nodes and then deliver it simply and securely to a central log server for storage, indexing, and analysis. This recipe shows how to do that with an open source log agent called logstash.

Many good commercial and open source log solutions are available. This chapter uses only parts of logstash; further logstash recipes can be found at http://cookbook.logstash.net/ and http://logstash.net/docs/1.1.13/tutorials/getting-started-centralized. A comparable commercial product is Splunk (http://www.splunk.com/).

2.2.1 How to do it

Step01 To start, the logs on your local node will be streamed into the topology. Download and configure logstash as follows:

wget https://logstash.objects.dreamhost.com/release/logstash-1.1.7-monolithic.jar
Step02 Then, using your favorite text editor, create a file called shipper.conf containing the following:

input {
  file {
    type => "syslog"

    # Wildcards work here :)
    path => [ "/var/log/messages", "/var/log/system.*", "/var/log/*.log" ]
  }
}

output {
  # Output events to stdout for debugging. Feel free to remove
  # this output if you don't need it.
  stdout { }

  redis {
    host => "localhost"
    data_type => "list"
    key => "rawLogs"
  }
}

Step03 After starting a local instance of Redis, start the logging agent by issuing the following command:

java -jar logstash-1.1.7-monolithic.jar agent -f shipper.conf

2.2.2 How it works

logstash implements a very simple input-filter-output model, with an ever-expanding list of plugins for each of these three elements. The configuration file (shipper.conf) configures at least one input and one output.

The file input plugin tails files based on the filenames or wildcards given in the specified paths. Many file inputs can be configured, each with a different type, and the log type is important for later processing and categorization. Because no filters are configured in this file, the raw logs are passed straight to the outputs. The redis output plugin pushes each log event to the Redis instance on localhost, onto a list called rawLogs.

logstash can easily be included in the baseline provisioning of every node in your infrastructure, including key exchange, so that logs can be delivered securely over whatever transport mechanism you are comfortable with.
2.3 Creating the log spout

The log topology reads all logs through the Redis channel that logstash feeds; these logs are emitted into the topology by the spout described in this recipe. As this is a new topology, we must first create the new topology project.

2.3.1 How to do it

Start by creating the project directory and the standard Maven folder structure (http://maven.apache.org/guides/introduction/introduction-to-the-standard-directory-layout.html).

Step01 Create the POM as per the Creating a "Hello World" topology recipe in Chapter 1, updating the project's groupId and artifactId values and including the following dependencies:

<dependency>
  <groupId>junit</groupId>
  <artifactId>junit</artifactId>
  <version>4.11</version>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.slf4j</groupId>
  <artifactId>slf4j-log4j12</artifactId>
  <version>1.6.1</version>
</dependency>
<dependency>
  <groupId>org.jmock</groupId>
  <artifactId>jmock-legacy</artifactId>
  <version>2.5.1</version>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>storm</groupId>
  <artifactId>storm</artifactId>
  <version>0.8.1</version>
  <scope>provided</scope>
  <exclusions>
    <exclusion>
      <artifactId>slf4j-api</artifactId>
      <groupId>org.slf4j</groupId>
    </exclusion>
  </exclusions>
</dependency>
<dependency>
  <groupId>com.googlecode.json-simple</groupId>
  <artifactId>json-simple</artifactId>
  <version>1.1</version>
</dependency>
<dependency>
  <groupId>redis.clients</groupId>
  <artifactId>jedis</artifactId>
  <version>2.1.0</version>
</dependency>
<dependency>
  <groupId>commons-httpclient</groupId>
  <artifactId>commons-httpclient</artifactId>
  <version>3.1</version>
</dependency>
<dependency>
  <groupId>org.jmock</groupId>
  <artifactId>jmock-junit4</artifactId>
  <version>2.5.1</version>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>com.github.ptgoetz</groupId>
  <artifactId>storm-cassandra</artifactId>
  <version>0.3.1-SNAPSHOT</version>
</dependency>
<dependency>
  <groupId>org.elasticsearch</groupId>
  <artifactId>elasticsearch</artifactId>
  <version>0.20.2</version>
</dependency>
<dependency>
  <groupId>org.drools</groupId>
  <artifactId>drools-core</artifactId>
  <version>5.5.0.Final</version>
</dependency>
<dependency>
  <groupId>org.drools</groupId>
  <artifactId>drools-compiler</artifactId>
  <version>5.5.0.Final</version>
</dependency>
Step02 Generate the Eclipse project files and import the project into Eclipse:

mvn eclipse:eclipse

Step03 Tuples in the log topology will carry a log domain object that encapsulates the data and the parsing logic for a single log record (one entry in a logfile). In the created project, create this domain object:

public class LogEntry {

    public static Logger LOG = Logger.getLogger(LogEntry.class);

    private String source;
    private String type;
    private List<String> tags = new ArrayList<String>();
    private Map<String, String> fields = new HashMap<String, String>();
    private Date timestamp;
    private String sourceHost;
    private String sourcePath;
    private String message = "";
    private boolean filter = false;
    private NotificationDetails notifyAbout = null;

    private static String[] FORMATS = new String[] {
            "yyyy-MM-dd'T'HH:mm:ss.SSS",
            "yyyy.MM.dd G 'at' HH:mm:ss z",
            "yyyyy.MMMMM.dd GGG hh:mm aaa",
            "EEE, d MMM yyyy HH:mm:ss Z",
            "yyMMddHHmmssZ" };

    @SuppressWarnings("unchecked")
    public LogEntry(JSONObject json) {
        source = (String) json.get("@source");
        timestamp = parseDate((String) json.get("@timestamp"));
        sourceHost = (String) json.get("@source_host");
        sourcePath = (String) json.get("@source_path");
        message = (String) json.get("@message");
        type = (String) json.get("@type");
        JSONArray array = (JSONArray) json.get("@tags");
        tags.addAll(array);
        JSONObject fieldsJson = (JSONObject) json.get("@fields");
        this.fields.putAll(fieldsJson);
    }

    public Date parseDate(String value) {
        for (int i = 0; i < FORMATS.length; i++) {
            SimpleDateFormat format = new SimpleDateFormat(FORMATS[i]);
            try {
                Date temp = format.parse(value);
                if (temp != null)
                    return temp;
            } catch (ParseException e) {}
        }
        LOG.error("Could not parse timestamp for log");
        return null;
    }

    @SuppressWarnings("unchecked")
    public JSONObject toJSON() {
        JSONObject json = new JSONObject();
        json.put("@source", source);
        json.put("@timestamp", DateFormat.getDateInstance().format(timestamp));
        json.put("@source_host", sourceHost);
        json.put("@source_path", sourcePath);
        json.put("@message", message);
        json.put("@type", type);
        JSONArray temp = new JSONArray();
        temp.addAll(tags);
        json.put("@tags", temp);
        JSONObject fieldTemp = new JSONObject();
        fieldTemp.putAll(fields);
        json.put("@fields", fieldTemp);
        return json;
    }
    ...

The getter, setter, and equals methods have been excluded from this code snippet, but they must still be implemented; the equals method in particular is vital for unit testing purposes.
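As a quick illustration of the domain object, the following sketch parses a logstash-style JSON event into a LogEntry and serializes it back out (the sample values are invented for the example):

// a minimal logstash event, as it would arrive on the Redis list
String raw = "{\"@source\":\"file://var/log/messages\","
        + "\"@timestamp\":\"2013-01-01T10:15:30.123\","
        + "\"@source_host\":\"node1\",\"@source_path\":\"/var/log/messages\","
        + "\"@message\":\"example log line\",\"@type\":\"syslog\","
        + "\"@tags\":[],\"@fields\":{}}";

JSONObject json = (JSONObject) JSONValue.parse(raw);
LogEntry entry = new LogEntry(json);
// round-trip back to JSON, as the indexing bolts later in this chapter will do
System.out.println(entry.toJSON().toJSONString());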
    @SuppressWarnings("unchecked")
    public LogEntry(JSONObject json){
        source = (String)json.get("@source");
        timestamp = parseDate((String)json.get("@timestamp"));
        sourceHost = (String)json.get("@source_host");
        sourcePath = (String)json.get("@source_path");
        message = (String)json.get("@message");
        type = (String)json.get("@type");
        JSONArray array = (JSONArray)json.get("@tags");
        tags.addAll(array);
        JSONObject fields = (JSONObject)json.get("@fields");
        this.fields.putAll(fields);
    }

    public Date parseDate(String value){
        for(int i = 0; i < FORMATS.length; i++){
            SimpleDateFormat format = new SimpleDateFormat(FORMATS[i]);
            Date temp;
            try {
                temp = format.parse(value);
                if(temp != null)
                    return temp;
            } catch (ParseException e) {}
        }
        LOG.error("Could not parse timestamp for log");
        return null;
    }

    @SuppressWarnings("unchecked")
    public JSONObject toJSON(){
        JSONObject json = new JSONObject();
        json.put("@source", source);
        json.put("@timestamp", DateFormat.getDateInstance().format(timestamp));
        json.put("@source_host", sourceHost);
        json.put("@source_path", sourcePath);
        json.put("@message", message);
        json.put("@type", type);
        JSONArray temp = new JSONArray();
        temp.addAll(tags);
        json.put("@tags", temp);
        JSONObject fieldTemp = new JSONObject();
        fieldTemp.putAll(fields);
        json.put("@fields", fieldTemp);
        return json;
    }
    ...

The getter, setter, and equals methods have been excluded from this code snippet; however, they must still be implemented. The equals method is vital for unit testing purposes.
4. Then create the LogSpout class, which extends BaseRichSpout and follows the same pattern described in Chapter 1, Setting Up Your Development Environment, declaring a single output field as follows:

outputFieldsDeclarer.declare(new Fields(FieldNames.LOG_ENTRY));

Then emit the received log entries into the topology as follows:

public void nextTuple() {
    String content = jedis.rpop(LOG_CHANNEL);
    if(content == null || "nil".equals(content)) {
        // sleep to prevent starving other threads
        try {
            Thread.sleep(300);
        } catch (InterruptedException e) {}
    } else {
        JSONObject obj = (JSONObject) JSONValue.parse(content);
        LogEntry entry = new LogEntry(obj);
        collector.emit(new Values(entry));
    }
}

Literals should be avoided in the code as far as possible. Tuples allow for effective runtime coupling, but peppering the code with field-name literals for elements that are effectively coupled before runtime adds no value; hence the use of static field name definitions.
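The FieldNames class referenced above is exactly such a set of static definitions. Its contents are not listed in this recipe, so the following is only a minimal sketch of what is assumed here; the constant's string value is an assumption rather than the book's exact value:

public class FieldNames {
    // Central place for tuple field names so that spouts and bolts share one definition.
    // Any consistent string works; the value below is illustrative only.
    public static final String LOG_ENTRY = "logEntry";
}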
How it works…

The Redis spout implementation is already familiar; the key logic in this recipe is the parsing logic within the LogEntry domain object. logstash submits each log line as a separate JSON value into the Redis channel. These JSON values are in the following format:

{
    "@source":"file://PATH",
    "@tags":[],
    "@fields":{},
    "@timestamp":"yyyy-MM-ddThh:mm:ss.SSS",
    "@source_host":"hostname",
    "@source_path":"path",
    "@message":"Syslog log line contents",
    "@type":"syslog"
}

There's more…

The JSON format provides two key structures, namely the JSON object and the JSON array. The JSON website (www.json.org) provides a concise definition of each; they are repeated here for convenience.

An object is an unordered set of name/value pairs. An object begins with { (left brace) and ends with } (right brace). Each name is followed by : (colon) and the name/value pairs are separated by , (comma), as shown in Figure 2-2.

Figure 2-2 The JSON object

An array is an ordered collection of values. An array begins with [ (left bracket) and ends with ] (right bracket). The values are separated by , (comma), as shown in Figure 2-3.

Figure 2-3 The JSON array

A value can be a string in double quotes; a number; true, false, or null; an object; or an array. These structures can be nested.
The constructor of the LogEntry object takes a JSONObject as its only parameter and initializes its internal values from the values it contains. The LogEntry object is also able to convert itself back into a JSONObject through the toJSON() method, which will become useful later. LogEntry makes heavy use of the com.googlecode.json-simple library in order to achieve the first level of parsing, from a string to a workable structure.

Although the structure is well defined, the date-time format can vary. The parseDate() method therefore takes a best-effort approach to parsing the date: the supported date-time formats are defined in the FORMATS class member variable, and each format is tried in turn until one parses successfully.
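To make this concrete, here is a minimal sketch that feeds a hypothetical logstash event (in the format shown above) through the LogEntry class. The sample values are made up, and the example assumes it sits in the same package as the LogEntry class created in step 3:

import org.json.simple.JSONObject;
import org.json.simple.JSONValue;

public class LogEntryExample {
    public static void main(String[] args) {
        // A made-up logstash event in the JSON format shown above
        String raw = "{\"@source\":\"file:///var/log/syslog\","
                + "\"@tags\":[],"
                + "\"@fields\":{},"
                + "\"@timestamp\":\"2013-06-01T10:15:30.000\","
                + "\"@source_host\":\"localhost\","
                + "\"@source_path\":\"/var/log/syslog\","
                + "\"@message\":\"Jun  1 10:15:30 localhost sshd[42]: session opened\","
                + "\"@type\":\"syslog\"}";
        JSONObject json = (JSONObject) JSONValue.parse(raw);
        // parseDate() matches this timestamp against the first entry in FORMATS
        LogEntry entry = new LogEntry(json);
        // Round-trip back to JSON, as the indexer bolt does later in this chapter
        System.out.println(entry.toJSON().toJSONString());
    }
}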
Rule-based analysis of the log stream

Any reasonable log management system needs to be able to achieve the following:

- Filter logs that aren't important and therefore should not be counted or stored. These often include log entries at the INFO or DEBUG levels (yes, these exist in production systems).
- Analyze the log entry further and extract as much meaning and as many new fields as possible.
- Enhance/update the log entry prior to storage.
- Send notifications when certain logs are received.
- Correlate log events to derive new meaning.
- Deal with changes in the log's structure and formatting.

This recipe integrates the JBoss library Drools into a bolt to make these goals achievable in a clear and declarative manner. Drools is an open source implementation of a forward-chaining rules engine that is able to infer new values and execute logic based on the rules it matches. You can find more details on the Drools project at http://www.jboss.org/drools/.

How to do it…

1. Within Eclipse, create a class called LogRulesBolt, which extends BaseRichBolt, within the storm.cookbook.log package. As with the LogSpout class, the LogRulesBolt class will emit a single value containing a LogEntry instance:

declarer.declare(new Fields(FieldNames.LOG_ENTRY));

2. Add a private member-level variable, ksession, of type StatelessKnowledgeSession, and initialize it within the bolt's prepare method:

KnowledgeBuilder kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
kbuilder.add( ResourceFactory.newClassPathResource( "/Syslog.drl",
        getClass() ), ResourceType.DRL );
if ( kbuilder.hasErrors() ) {
    LOG.error( kbuilder.getErrors().toString() );
}
KnowledgeBase kbase = KnowledgeBaseFactory.newKnowledgeBase();
kbase.addKnowledgePackages( kbuilder.getKnowledgePackages() );
ksession = kbase.newStatelessKnowledgeSession();

The initialization of this knowledge session includes only a single set of rules, for the syslog logs. It is recommended that rules management be extracted out into Drools Guvnor, or similar, and that rules resources be retrieved via an agent. This is outside the scope of this book, but more details are available at the following link: http://www.jboss.org/drools/drools-guvnor.html

3. In the bolt's execute method, pass the LogEntry object from the tuple into the knowledge session:

LogEntry entry = (LogEntry)input.getValueByField(FieldNames.LOG_ENTRY);
if(entry == null){
    LOG.fatal( "Received null or incorrect value from tuple" );
    return;
}
ksession.execute( entry );
if(!entry.isFilter()){
    collector.emit(new Values(entry));
}

A consolidated sketch of how these pieces fit together into the bolt is shown after the rules file below.
4. You next need to create the rules resource file; this can be done with a text editor or with the Eclipse plugin available from the update site (http://download.jboss.org/drools/release/5.5.0.Final/org.drools.updatesite/). The rules resource file should be placed at the root of the classpath: create a file named Syslog.drl in src/main/resources and add this folder to the build path within Eclipse by right-clicking on the folder and going to Build Path | Use as source folder.

5. Add the following content to the rules resource:

package storm.cookbook.log.rules

import storm.cookbook.log.model.LogEntry;
import java.util.regex.Matcher
import java.util.regex.Pattern

rule "Host Correction"
    when
        l: LogEntry(sourceHost == "localhost")
    then
        l.setSourceHost("localhost.example.com");
end

rule "Filter By Type"
    when
        l: LogEntry(type != "syslog")
    then
        l.setFilter(true);
end

rule "Extract Fields"
    salience 100//run later
    when
        l: LogEntry(filter != true)
    then
        String logEntryPattern = "^([\\d.]+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3}) (\\d+) \"([^\"]+)\" \"([^\"]+)\"";
        Matcher matcher = Pattern.compile(logEntryPattern).matcher(l.getMessage());
        if(matcher.find()){
            l.addField("_pid",matcher.group(1));
            l.addField("_src",matcher.group(2));
        }
end
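Pulling steps 1 to 3 together, the complete LogRulesBolt might look roughly like the following sketch. The package layout, imports, logger, and serialVersionUID are assumptions made here for a self-contained listing and may differ from the book's actual project:

package storm.cookbook.log;

import java.util.Map;

import org.apache.log4j.Logger;
import org.drools.KnowledgeBase;
import org.drools.KnowledgeBaseFactory;
import org.drools.builder.KnowledgeBuilder;
import org.drools.builder.KnowledgeBuilderFactory;
import org.drools.builder.ResourceType;
import org.drools.io.ResourceFactory;
import org.drools.runtime.StatelessKnowledgeSession;

import storm.cookbook.log.model.LogEntry;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class LogRulesBolt extends BaseRichBolt {

    private static final long serialVersionUID = 1L;
    public static Logger LOG = Logger.getLogger(LogRulesBolt.class);

    private StatelessKnowledgeSession ksession;
    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        // Build the knowledge base from the Syslog.drl rules file on the classpath
        KnowledgeBuilder kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
        kbuilder.add(ResourceFactory.newClassPathResource("/Syslog.drl", getClass()),
                ResourceType.DRL);
        if (kbuilder.hasErrors()) {
            LOG.error(kbuilder.getErrors().toString());
        }
        KnowledgeBase kbase = KnowledgeBaseFactory.newKnowledgeBase();
        kbase.addKnowledgePackages(kbuilder.getKnowledgePackages());
        ksession = kbase.newStatelessKnowledgeSession();
    }

    @Override
    public void execute(Tuple input) {
        LogEntry entry = (LogEntry) input.getValueByField(FieldNames.LOG_ENTRY);
        if (entry == null) {
            LOG.fatal("Received null or incorrect value from tuple");
            return;
        }
        // Let the rules act on (and possibly modify or filter) the entry
        ksession.execute(entry);
        if (!entry.isFilter()) {
            collector.emit(new Values(entry));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields(FieldNames.LOG_ENTRY));
    }
}

Because the knowledge session is built once in prepare, the rules file is only read when the bolt is deployed to a worker, not on every tuple.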
How it works…

Drools supports two types of knowledge sessions, namely stateful and stateless. For this use case, a stateless session is all that is required.

Stateful sessions should always be used with caution as they can lead to performance problems. They essentially maintain facts in memory between session executions. There are use cases where this is vital; however, the nature of a forward-chaining Rete engine is that its performance degrades exponentially as facts are added to its knowledge base.

A knowledge session is used to evaluate facts against a known set of rules. This is set up within the prepare method of the bolt, with the rules provided at that point. Within the execution of the bolt, the LogEntry is extracted from the tuple and passed into the knowledge session through the following call:

ksession.execute( entry );

The knowledge session acts on the entry during execution, and we can expect the entry to be potentially different once the call has completed. The LogEntry object contains a control field called filter. If a rule sets it to true, the log entry is to be dropped; this is implemented by checking the field after the rules have executed, before emitting a tuple containing the entry:

if(!entry.isFilter()){
    collector.emit(new Values(entry));
}

Within the rules resource file, there are three rules currently defined:
- Host Correction
- Filter By Type
- Extract Fields

These rules are implemented for demonstration purposes only and aren't necessarily viable production rules.

The Host Correction rule tries to correct the host name value so that it is fully qualified. The anatomy of a rule is that, when its matching criteria are met, its consequence is executed. The when clause of this rule matches any LogEntry instance whose sourceHost field is localhost:

l: LogEntry(sourceHost == "localhost")

This clause also assigns any matching instance to a local variable, l, within the scope of this rule. The functionality specified in the then clause is simply plain old Java, which is compiled at runtime and added to the current classpath. In this rule it makes the localhost value fully qualified:

l.setSourceHost("localhost.example.com");

The Filter By Type rule sets the filter field to true for all entries whose type doesn't match syslog.

The Extract Fields rule is more interesting. Firstly, it includes a salience value, which ensures it is evaluated last; this ensures that it never extracts fields from filtered logs.
It then uses regular expression matching to extract more fields and structure from the logfile. While regular expressions are outside the scope of this book, they are widely understood and well documented.
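As a quick illustration of what that rule's expression captures, the same pattern can be exercised in plain Java against a hypothetical Apache-style access log line. The sample line below is made up, and this standalone class is not part of the recipe's project:

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class LogPatternExample {
    public static void main(String[] args) {
        // The same pattern used in the Extract Fields rule
        String logEntryPattern = "^([\\d.]+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\]"
                + " \"(.+?)\" (\\d{3}) (\\d+) \"([^\"]+)\" \"([^\"]+)\"";
        // A made-up access log line in the combined log format
        String line = "10.0.0.1 - bob [10/Oct/2013:13:55:36 +0200] "
                + "\"GET /index.html HTTP/1.1\" 200 2326 "
                + "\"http://example.com/start.html\" \"Mozilla/5.0\"";
        Matcher matcher = Pattern.compile(logEntryPattern).matcher(line);
        if (matcher.find()) {
            for (int group = 1; group <= matcher.groupCount(); group++) {
                // Each captured group could be added to the LogEntry fields map
                System.out.println(group + ": " + matcher.group(group));
            }
        }
    }
}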
For completeness's sake, here are some more useful examples of expressions for log entries:

- Match the date: ((?<=>))\w+\x20\d+
- Match the time: ((?<=\d\x20))\d+:\d+:\d+
- Match an IP address: ((?<=[,]))(\d{1,3}.\d{1,3}.\d{1,3}.\d{1,3})
- Match the protocol: ((?<=[\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}]\s))+\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}

Further reading on regular expressions can be found at Wikipedia: http://en.wikipedia.org/wiki/Regular_expression

These extra fields can then be added to the fields or tags and used later for analysis, or for search and grouping.

Drools also includes a module called Drools Fusion that supports Complex Event Processing (CEP). CEP is often referred to as an emerging enterprise approach, which may be true, but practically it simply means that the rules engine understands temporal concerns. Using temporal operators, it can correlate events over time and derive new knowledge or trigger actions.
These temporal operators are supported by the bolt implementation in this recipe. For more information, browse to the following link: http://www.jboss.org/drools/drools-fusion.html

Indexing and persisting the log data

Log data needs to be stored for some defined period of time in order to be useful; it also needs to be searchable. In order to achieve this, this recipe integrates with an open source product called Elastic Search, which is a general-use, clustered search engine with a RESTful API (http://www.elasticsearch.org/).

How to do it…

1. Create a new BaseRichBolt class called IndexerBolt and declare an org.elasticsearch.client.Client client as a private member variable. You must initialize it as follows within the prepare method:

if((Boolean)stormConf.get(backtype.storm.Config.TOPOLOGY_DEBUG) == true){
    node = NodeBuilder.nodeBuilder().local(true).node();
} else {
    String clusterName = (String) stormConf.get(Conf.ELASTIC_CLUSTER_NAME);
    if(clusterName == null)
        clusterName = Conf.DEFAULT_ELASTIC_CLUSTER;
    node = NodeBuilder.nodeBuilder().clusterName(clusterName).node();
}
client = node.client();

2. The LogEntry object can then be indexed during the execute method of the bolt:

LogEntry entry = (LogEntry)input.getValueByField(FieldNames.LOG_ENTRY);
if(entry == null){
    LOG.fatal( "Received null or incorrect value from tuple" );
    return;
}
String toBeIndexed = entry.toJSON().toJSONString();
IndexResponse response = client.prepareIndex(INDEX_NAME, INDEX_TYPE)
        .setSource(toBeIndexed)
        .execute().actionGet();
if(response == null)
    LOG.error("Failed to index Tuple: " + input.toString());
else{
    if(response.getId() == null)
        LOG.error("Failed to index Tuple: " + input.toString());
    else{
        LOG.debug("Indexing success on Tuple: " + input.toString());
        collector.emit(new Values(entry, response.getId()));
    }
}

3. The unit test of this bolt is not obvious; it is therefore worth giving some explanation here. Create a new JUnit 4 unit test in your test source folder under the storm.cookbook.log package.
Add a private inner class called StoringMatcher as follows:

private static class StoringMatcher extends BaseMatcher<Values> {
    private final List<Values> objects = new ArrayList<Values>();

    @Override
    public boolean matches(Object item) {
        if (item instanceof Values) {
            objects.add((Values) item);
            return true;
        }
        return false;
    }

    @Override
    public void describeTo(Description description) {
        description.appendText("any integer");
    }

    public Values getLastValue() {
        return objects.remove(0);
    }
}
4. Then implement the actual test method as follows:

@Test
public void testIndexing() throws IOException {
    // Config, ensure we are in debug mode
    Map config = new HashMap();
    config.put(backtype.storm.Config.TOPOLOGY_DEBUG, true);
    Node node = NodeBuilder.nodeBuilder().local(true).node();
    Client client = node.client();
    final OutputCollector collector = context.mock(OutputCollector.class);
    IndexerBolt bolt = new IndexerBolt();
    bolt.prepare(config, null, collector);
    final LogEntry entry = getEntry();
    final Tuple tuple = getTuple();
    final StoringMatcher matcher = new StoringMatcher();
    context.checking(new Expectations(){{
        oneOf(tuple).getValueByField(FieldNames.LOG_ENTRY);
            will(returnValue(entry));
        oneOf(collector).emit(with(matcher));
    }});
    bolt.execute(tuple);
    context.assertIsSatisfied();
    // get the ID for the index
    String id = (String) matcher.getLastValue().get(1);
    // Check that the indexing worked
    GetResponse response = client.prepareGet(IndexerBolt.INDEX_NAME, IndexerBolt.INDEX_TYPE, id)
            .execute()
            .actionGet();
    assertTrue(response.exists());
}

How it works…

Elastic Search provides a complete client API for Java (given that it is itself implemented in Java), making integration with it quite trivial. The prepare method of the bolt creates a cluster node in either the local or the clustered mode. The clustered mode joins an existing cluster, based on the name provided, with a local storage node being created on the current node; this prevents the double-hop latency of a write over a different transport.

Elastic Search is a large, complex system in its own right; it is recommended that you read the provided documentation in order to understand the operational and provisioning concerns.

When Storm is in the debug mode, the Elastic Search node will run an embedded cluster, with many nodes (if requested) being executed within the same JVM. This is obviously useful for unit testing purposes. This is all enabled in the prepare method of the bolt:

if((Boolean)stormConf.get(backtype.storm.Config.TOPOLOGY_DEBUG) == true){
    node = NodeBuilder.nodeBuilder().local(true).node();
} else {

When a tuple is received, the LogEntry object is extracted and the JSON contents of LogEntry are sent to Elastic Search:
String toBeIndexed = entry.toJSON().toJSONString();
IndexResponse response = client.prepareIndex(INDEX_NAME, INDEX_TYPE)
        .setSource(toBeIndexed)
        .execute().actionGet();

The ID of the log within the Elastic Search cluster is then extracted from the response and emitted with the LogEntry object to downstream bolts. In this particular recipe, we will only use this value for unit testing; however, downstream bolts could easily be added to persist this value against some log statistics, which would be extremely useful within a user interface for drill-down purposes:

collector.emit(new Values(entry, response.getId()));
The unit test for this particular bolt is quite tricky. This is because, in a typical unit test, we know what the expected outcome is before we run the test. In this case, we don't know the ID until we have received the response from the Elastic Search cluster. This makes expressing expectations difficult, especially if we want to validate the log in the search engine. To achieve this, we make use of a custom matcher for JMock. The key method in the custom matcher is the matches method:

public boolean matches(Object item) {
    if (item instanceof Values) {
        objects.add((Values) item);
        return true;
    }
    return false;
}

This method simply checks that an instance of Values was emitted, but it also holds onto the value for later evaluation. This allows us to set the following set of expectations:
context.checking(new Expectations(){{
    oneOf(tuple).getValueByField(FieldNames.LOG_ENTRY);
        will(returnValue(entry));
    oneOf(collector).emit(with(matcher));
}});

And then retrieve the record ID and validate it against the embedded Elastic Search cluster:

String id = (String) matcher.getLastValue().get(1);
GetResponse response = client.prepareGet(IndexerBolt.INDEX_NAME, IndexerBolt.INDEX_TYPE, id)
        .execute()
        .actionGet();
assertTrue(response.exists());

If you would like to be able to search the logfiles in the cluster, download and install the excellent log search frontend, Kibana, from kibana.org. This recipe has maintained the JSON log structure from logstash, and Kibana is designed as the frontend for logstash on Elastic Search, so it will work seamlessly with this recipe. It also uses the Twitter Bootstrap GUI framework, meaning that you can integrate it with the analytics dashboard quite easily.

Counting and persisting log statistics

There are many statistics that can be gathered for log streams; for the purposes of this recipe, and to illustrate the concept, only a single time series will be dealt with (log volume per minute); however, this should fully illustrate the design and approach for implementing other analyses.
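Before the steps, here is a purely illustrative, in-memory sketch of the count this recipe builds (log volume per minute, per source). The class and key format are made up; it reuses the getMinuteForTime() helper defined in step 3 below and assumes everything sits in the same package:

import java.util.HashMap;
import java.util.Map;

public class VolumeByMinute {

    // Counts keyed by "<minute bucket in millis>:<log source>"
    private final Map<String, Long> volumePerMinute = new HashMap<String, Long>();

    public void count(LogEntry entry) {
        Long minute = VolumeCountingBolt.getMinuteForTime(entry.getTimestamp());
        String key = minute + ":" + entry.getSource();
        Long current = volumePerMinute.get(key);
        volumePerMinute.put(key, current == null ? 1L : current + 1L);
    }
}

The recipe performs the same bucketing, but hands the increment to a Cassandra counter bolt instead of keeping it in memory.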
   If you would like to be able to search the logfiles in the cluster, download and install Kibana, an excellent log search frontend, from kibana.org. This recipe keeps the JSON log structure used by logstash, and Kibana is designed as the frontend for logstash on Elastic Search, so it will work seamlessly with this recipe. It also uses the Twitter Bootstrap GUI framework, which means you can integrate it with the analytics dashboard quite easily.

2.6 Counting and persisting log statistics

There are many statistics that can be gathered for log streams. For the purposes of this recipe, and to illustrate the concept, only a single time series will be dealt with (log volume per minute); however, this should fully illustrate the design and approach for implementing other analyses.

2.6.1 How to do it

Step01 Download the storm-cassandra contrib project and install it into your Maven repository:

   git clone https://github.com/quintona/storm-cassandra
   cd storm-cassandra
   mvn clean install

Step02 Create a new BaseRichBolt subclass called VolumeCountingBolt in the storm.cookbook.log package. The bolt must declare three output fields:

   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields(FIELD_ROW_KEY, FIELD_COLUMN,
            FIELD_INCREMENT));
   }
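The definitions of these three constants are not shown in the recipe; something along the following lines is assumed. The string values are illustrative only — they simply need to be distinct, and FIELD_ROW_KEY and FIELD_INCREMENT must match what is passed to CassandraCounterBatchingBolt later in the topology.

   public static final String FIELD_ROW_KEY = "rowKey";
   public static final String FIELD_COLUMN = "column";
   public static final String FIELD_INCREMENT = "IncrementAmount";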
Step03 Then implement a static utility method that derives the minute representation of the log entry's time:

   public static Long getMinuteForTime(Date time) {
      Calendar c = Calendar.getInstance();
      c.setTime(time);
      c.set(Calendar.SECOND, 0);
      c.set(Calendar.MILLISECOND, 0);
      return c.getTimeInMillis();
   }

Step04 Implement the execute method (yes, it really is that short):

   LogEntry entry = (LogEntry) input.getValueByField(FieldNames.LOG_ENTRY);
   collector.emit(new Values(getMinuteForTime(entry.getTimestamp()),
         entry.getSource(), 1L));
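A quick check of the minute bucketing, not taken from the book: two timestamps that differ only in seconds and milliseconds must map to the same row key, otherwise counts for a single minute would be split across rows. The test class name is hypothetical.

   import static org.junit.Assert.assertEquals;
   import java.util.Calendar;
   import java.util.Date;
   import org.junit.Test;

   public class VolumeCountingBoltTimeTest {
      @Test
      public void timestampsInSameMinuteShareARowKey() {
         Calendar c = Calendar.getInstance();
         c.set(2013, Calendar.MARCH, 1, 10, 15, 5);
         Date first = c.getTime();
         c.set(Calendar.SECOND, 42);
         Date second = c.getTime();
         assertEquals(VolumeCountingBolt.getMinuteForTime(first),
               VolumeCountingBolt.getMinuteForTime(second));
      }
   }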
Step05 Finally, create the LogTopology class as per the pattern presented in Chapter 1, and build the topology as follows:

   builder.setSpout("logSpout", new LogSpout(), 10);
   builder.setBolt("logRules", new LogRulesBolt(), 10)
         .shuffleGrouping("logSpout");
   builder.setBolt("indexer", new IndexerBolt(), 10)
         .shuffleGrouping("logRules");
   builder.setBolt("counter", new VolumeCountingBolt(), 10)
         .shuffleGrouping("logRules");
   CassandraCounterBatchingBolt logPersistenceBolt =
         new CassandraCounterBatchingBolt(
               Conf.COUNT_CF_NAME,
               VolumeCountingBolt.FIELD_ROW_KEY,
               VolumeCountingBolt.FIELD_INCREMENT);
   logPersistenceBolt.setAckStrategy(AckStrategy.ACK_ON_RECEIVE);
   builder.setBolt("countPersistor", logPersistenceBolt, 10)
         .shuffleGrouping("counter");
   conf.put(Conf.REDIS_PORT_KEY, Conf.DEFAULT_JEDIS_PORT);
   conf.put(CassandraBolt.CASSANDRA_KEYSPACE, Conf.LOGGING_KEYSPACE);
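For reference, a bare-bones local submission of this topology, roughly what the LogTopology.runLocal() helper from Chapter 1 wraps; this is only a sketch against Storm's standard local-mode API (backtype.storm.LocalCluster), and the topology name is illustrative:

   LocalCluster cluster = new LocalCluster();
   cluster.submitTopology("log-topology", conf, builder.createTopology());
   // ... let it process for a while, then ...
   cluster.shutdown();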
2.6.2 How it works

This implementation looks surprisingly simple, and it is. It makes use of the storm-cassandra project (part of storm-contrib) to abstract all of the persistence complexity away. The storm-cassandra project integrates Storm and Cassandra by providing a generic, configurable backtype.storm.Bolt implementation that writes Storm Tuple objects to a Cassandra column family.

Cassandra is a column family database (http://cassandra.apache.org/). Cassandra's column family data model offers the convenience of column indexes (translator's note: early versions of Cassandra only supported row key indexes; newer versions add column indexes, with the former referred to as primary and the latter as secondary indexes) together with the performance of log-structured updates, strong support for materialized views, and powerful built-in caching. A more recent addition to Cassandra is the counter column: a persistent column, within a given column family, that can be incremented safely from anywhere in the cluster.

The storm-cassandra project provides two styles of persistence: one for standard Cassandra column families and one for counter-based columns. We will focus on the second style because it suits our use case; the other style is described in the project's README file and works in essentially the same way.

An instance of the CassandraCounterBatchingBolt class does all the work for us. It needs to be told which column family to use, which tuple field holds the row key, and which tuple field holds the increment amount; it then increments the columns named by the remaining fields of the tuple by that amount. Consider the following constructor:

   CassandraCounterBatchingBolt logPersistenceBolt =
         new CassandraCounterBatchingBolt("columnFamily",
               "RowKeyField", "IncrementAmountField");

And the following tuple as input:

   {rowKey: 12345, IncrementAmount: 1L, IncrementColumn: 'SomeCounter'}

This will increment the SomeCounter counter column by 1L in the columnFamily column family.
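To make the mapping concrete, here is a worked illustration (the timestamp and file names are made up): three log entries that arrive within the same minute produce the following tuples from VolumeCountingBolt, which CassandraCounterBatchingBolt folds into a single row of the Conf.COUNT_CF_NAME column family.

   new Values(1362055800000L, "/var/log/syslog", 1L);
   new Values(1362055800000L, "/var/log/syslog", 1L);
   new Values(1362055800000L, "/var/log/auth.log", 1L);
   // Resulting row, keyed by the minute bucket:
   //   "1362055800000" -> { "/var/log/syslog": 2, "/var/log/auth.log": 1 }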
A key mind shift for any developer coming from a relational database background is data modeling in a column family database. Column family databases, as part of the big data family of databases, promote highly denormalized data models. This approach removes table relationships, and the locking concerns that come with them, and enables massive-scale parallel reads and writes against the database. It does promote data duplication, but given the cost of commodity disks this is a small sacrifice for meeting today's scaling requirements. The mind shift is to design the data model around the queries we will run against the dataset, rather than modeling real-world entities into concise normalized structures. The query we are trying to answer with this data model is essentially: select the total count for all logfiles at a given point in time.

This approach lets us derive that statistic simply by emitting a tuple that counts towards the total; we could just as easily emit tuples to answer other questions, for example:

- What do my log volume trends look like over a given time period, be it a day, a month, or a year?
- What are the most popular stems within my logfiles?

   Column families can hold more than counts: you can design any denormalized structure and emit a tuple representing a set of columns for a given row. If the row already exists, Cassandra simply adds or updates the columns. This can become extremely powerful.

2.7 Creating an integration test for the log stream cluster

Integration testing plays a vital role in the delivery process, and there are many types of it. Unit-level integration testing of a topology is typically part of the continuous integration build cycle and should be seen as complementary to the functional style of integration testing performed against a deployed cluster. The integration test presented here is essentially the same as the one in Chapter 1; however, it is sufficiently complex to warrant some explanation.

2.7.1 How to do it

Start by creating the unit test.

Step01 Using Eclipse, create a JUnit 4 test case called IntegrationTestTopology in the storm.cookbook.log package under the project's unit test source folder. Add a setup method that is invoked once before the test class runs:

   @BeforeClass
   public static void setup() throws Exception {
      setupCassandra();
      setupElasticSearch();
      setupTopology();
   }
Step02 Then create each of the setup methods called above; first set up an embedded instance of Cassandra:

   private static void setupCassandra() throws Exception {
      cassandra = new EmbeddedCassandra(9171);
      cassandra.start();
      // Allow some time for it to start
      Thread.sleep(3000);
      AstyanaxContext<Cluster> clusterContext = new AstyanaxContext.Builder()
            .forCluster("ClusterName")
            .withAstyanaxConfiguration(new AstyanaxConfigurationImpl()
                  .setDiscoveryType(NodeDiscoveryType.NONE))
            .withConnectionPoolConfiguration(
                  new ConnectionPoolConfigurationImpl("MyConnectionPool")
                        .setMaxConnsPerHost(1)
                        .setSeeds("localhost:9171"))
            .withConnectionPoolMonitor(new CountingConnectionPoolMonitor())
            .buildCluster(ThriftFamilyFactory.getInstance());
      clusterContext.start();
      Cluster cluster = clusterContext.getEntity();
      KeyspaceDefinition ksDef = cluster.makeKeyspaceDefinition();
      Map<String, String> stratOptions = new HashMap<String, String>();
      stratOptions.put("replication_factor", "1");
      ksDef.setName(Conf.LOGGING_KEYSPACE)
            .setStrategyClass("SimpleStrategy")
            .setStrategyOptions(stratOptions)
            .addColumnFamily(cluster.makeColumnFamilyDefinition()
                  .setName(Conf.COUNT_CF_NAME)
                  .setComparatorType("UTF8Type")
                  .setKeyValidationClass("UTF8Type")
                  .setDefaultValidationClass("CounterColumnType"));
      cluster.addKeyspace(ksDef);
   }
Step03 Then set up a local, embedded instance of Elastic Search:

   private static void setupElasticSearch() throws Exception {
      Node node = NodeBuilder.nodeBuilder().local(true).node();
      client = node.client();
      // Allow some time for the node to become available
      Thread.sleep(5000);
   }

Step04 Finally, set up the actual topology to be tested:

   private static void setupTopology() {
      // We want all output tuples coming to the mock for
      // testing purposes
      topology.getBuilder().setBolt("testBolt", testBolt, 1)
            .globalGrouping("indexer");
      // Run in local mode, but we will shut the cluster
      // down when we are finished
      topology.runLocal(0);
      // Jedis is required for input and output of the cluster
      jedis = new Jedis("localhost",
            Integer.parseInt(Conf.DEFAULT_JEDIS_PORT));
      jedis.connect();
      jedis.flushDB();
      // Give it some time to start up before running the tests
      Utils.sleep(5000);
   }

Step05 This sets up the fixtures we need in order to test our topology. We also need to shut them down gracefully at the end of the test, so add an AfterClass method to the test suite:

   @AfterClass
   public static void shutDown() {
      topology.shutDownLocal();
      jedis.disconnect();
      client.close();
      cassandra.stop();
   }
Step06 Finish off by implementing the actual test case:

   @Test
   public void inputOutputClusterTest() throws Exception {
      String testData = UnitTestUtils.readFile("/testData1.json");
      jedis.rpush("log", testData);
      LogEntry entry = UnitTestUtils.getEntry();
      long minute = VolumeCountingBolt.getMinuteForTime(entry.getTimestamp());
      Utils.sleep(6000);
      String id = jedis.rpop(REDIS_CHANNEL);
      assertNotNull(id);
      // Check that the indexing is working
      GetResponse response = client
            .prepareGet(IndexerBolt.INDEX_NAME, IndexerBolt.INDEX_TYPE, id)
            .execute().actionGet();
      assertTrue(response.exists());
      // Now check that the count has been updated in Cassandra
      AstyanaxContext<Keyspace> astyContext = new AstyanaxContext.Builder()
            .forCluster("ClusterName")
            .forKeyspace(Conf.LOGGING_KEYSPACE)
            .withAstyanaxConfiguration(new AstyanaxConfigurationImpl()
                  .setDiscoveryType(NodeDiscoveryType.NONE))
            .withConnectionPoolConfiguration(
                  new ConnectionPoolConfigurationImpl("MyConnectionPool")
                        .setMaxConnsPerHost(1)
                        .setSeeds("localhost:9171"))
            .withConnectionPoolMonitor(new CountingConnectionPoolMonitor())
            .buildKeyspace(ThriftFamilyFactory.getInstance());
      astyContext.start();
      Keyspace ks = astyContext.getEntity();
      Column<String> result = ks.prepareQuery(
            new ColumnFamily<String, String>(Conf.COUNT_CF_NAME,
                  StringSerializer.get(), StringSerializer.get()))
            .getKey(Long.toString(minute))
            .getColumn(entry.getSource())
            .execute().getResult();
      assertEquals(1L, result.getLongValue());
   }

2.7.2 How it works

This test case works by creating embedded instances of the clusters the topology depends on, namely Cassandra and Elastic Search. As with the previous integration test, it then injects test data into the input channel and allows the log entry to flow through the topology, after which it validates the entry in the search engine and verifies that the counter has been incremented appropriately. The test takes longer to run than a standard unit test and should therefore not be included in your standard Maven build; it should, however, be used as part of your local development workflow and validated further on a continuous integration server.
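One hedged way to keep this slow test out of the default build, not taken from the book, is to tag it with a JUnit category and exclude that category from the standard test run; the IntegrationTest marker interface below is something you would add to the project yourself.

   // In its own file, for example storm/cookbook/log/IntegrationTest.java
   public interface IntegrationTest {}

   // On the test class
   @org.junit.experimental.categories.Category(IntegrationTest.class)
   public class IntegrationTestTopology {
      // ... fixtures and test as above ...
   }

The standard build can then exclude the category (for example, through the Surefire plugin's excludedGroups setting), while a continuous integration profile runs it explicitly.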
2.8 Creating a log analytics dashboard

The log analytics dashboard is a web application that presents aggregated data to the user, typically in a graphical manner. To achieve this, we must take the following user interface design principles into account:

- Laser focus: show only what is required, create a focal point based on what the user is trying to achieve, and don't detract from it with unnecessary clutter.
- Minimalistic: include only the graphical features that usability actually demands.
- Responsive: a design approach that keeps the display clear and consistent regardless of the device it is viewed on, be it a PC or a tablet.
- Standards based: don't use vendor-specific technologies that would prevent the dashboard from being viewed on devices such as the iPad.

2.8.1 How to do it

Step01 Create a new project called log-web using the standard Maven archetype command:

   mvn archetype:generate -DgroupId=storm.cookbook -DartifactId=log-web -DarchetypeArtifactId=maven-archetype-webapp

This generates a standard project structure and Maven POM file for you.

Step02 Open the pom.xml file, remove the default dependencies, and replace them with the following:

   <dependencies>
      <dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.8.1</version><scope>test</scope></dependency>
      <dependency><groupId>org.hectorclient</groupId><artifactId>hector-core</artifactId><version>1.1-2</version></dependency>
      <dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.6.1</version></dependency>
      <dependency><groupId>com.sun.jersey</groupId><artifactId>jersey-server</artifactId><version>1.16</version></dependency>
      <dependency><groupId>com.sun.jersey</groupId><artifactId>jersey-grizzly2</artifactId><version>1.16</version></dependency>
      <dependency><groupId>com.sun.jersey</groupId><artifactId>jersey-servlet</artifactId><version>1.16</version></dependency>
      <dependency><groupId>com.sun.jersey</groupId><artifactId>jersey-json</artifactId><version>1.16</version></dependency>
      <dependency><groupId>com.sun.jersey.contribs</groupId><artifactId>jersey-multipart</artifactId><version>1.16</version></dependency>
      <dependency><groupId>org.jmock</groupId><artifactId>jmock-junit4</artifactId><version>2.5.1</version><scope>test</scope></dependency>
      <dependency><groupId>com.googlecode.json-simple</groupId><artifactId>json-simple</artifactId><version>1.1</version></dependency>
   </dependencies>
Step03 Then add the build plugins to the build section of the POM — the Jetty, Exec, Compiler, and Cassandra Maven plugins (their detailed configuration is omitted here):

   <build>
      <plugins>
         <plugin>
            <groupId>org.mortbay.jetty</groupId>
            <artifactId>jetty-maven-plugin</artifactId>
         </plugin>
         <plugin>
            <groupId>org.codehaus.mojo</groupId>
            <artifactId>exec-maven-plugin</artifactId>
            <!-- configured to run the java goal -->
         </plugin>
         <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.3</version>
            <configuration>
               <source>1.6</source>
               <target>1.6</target>
            </configuration>
         </plugin>
         <plugin>
            <groupId>org.codehaus.mojo</groupId>
            <artifactId>cassandra-maven-plugin</artifactId>
         </plugin>
      </plugins>
   </build>

Step04 Then import the project into Eclipse using the mvn eclipse:eclipse command and the Eclipse project import process.

Step05 The excellent Twitter Bootstrap GUI library will be used to build the user interface. Start by downloading it to a separate location on your drive and unpacking it:

   wget http://twitter.github.com/bootstrap/assets/bootstrap.zip
   unzip bootstrap.zip

Step06 Bootstrap gives us a rapid start by providing many practical examples; we will simply copy one and adapt it:

   cp bootstrap/docs/examples/hero.html log-web/src/main/webapp/index.html
   cp bootstrap/docs/about log-web/src/main/webapp/about.html
   cp -r bootstrap/docs/assets log-web/src/main/webapp/
   cp -r bootstrap/docs/templates log-web/src/main/webapp/

   Twitter Bootstrap is an excellent departure point for any web-based GUI; it is highly recommended that you read the documentation included in the downloaded package.

Step07 While there is much HTML to update, we will focus on the important elements: the central content and the graph. Update index.html, replacing the existing <div> tag and its contents with markup that provides a "Timeseries" heading, the caption "This graph shows a view of the log volumes of a given time period by day", and the chart placeholder targeted by the JavaScript below — a <div id="chart"> element containing an <svg> tag.
Step08 For the graph we will use the excellent data-visualization library D3 (http://d3js.org/) together with NVD3 (http://nvd3.org/), a set of pre-built models on top of D3, by adding their compiled JavaScript to our webapp's assets folder:

   wget https://github.com/novus/nvd3/zipball/master
   unzip novus-nvd3-4e12985.zip
   cp novus-nvd3-4e12985/nv.d3.js log-web/src/main/webapp/assets/js/
   cp novus-nvd3-4e12985/lib/d3.v2.js log-web/src/main/webapp/assets/js/
   cp novus-nvd3-4e12985/src/nv.d3.css log-web/src/main/webapp/assets/css/

Step09 Next, we include these files in the HTML page and write the client-side JavaScript that retrieves the data and updates the graph.

Step10 Add script includes for d3.v2.js and nv.d3.js at the bottom of the HTML file, after the other <script> tags.

Step11 Add the corresponding CSS import for nv.d3.css to the header of the page.

Step12 Then add our custom JavaScript in a <script> tag below the other script imports, towards the bottom of the file:

   var chart;
   var continueUpdates = true;

   nv.addGraph(function () {
      chart = nv.models.stackedAreaChart()
         .x(function (d) { return d[0] })
         .y(function (d) { return d[1] })
         .clipEdge(true);
      chart.xAxis
         .tickFormat(function (d) { return d3.time.format('%X')(new Date(d)) })
         .axisLabel('Time')
         .showMaxMin(false);
      chart.yAxis
         .axisLabel('Volume')
         .tickFormat(d3.format(',.2f'));
      d3.select('#chart svg')
         .datum(getdata())
         .transition().duration(500)
         .call(chart);
      nv.utils.windowResize(chart.update);
      chart.dispatch.on('stateChange', function (e) {
         nv.log('New State:', JSON.stringify(e));
      });
      return chart;
   });

   function update() {
      fetch();
      if (continueUpdates)
         setTimeout(update, 60000);
   }
   update();

   $(document).ready(function () {
      $('#updateToggleButton').bind('click', function () {
         if (continueUpdates) {
            continueUpdates = false;
         } else {
            continueUpdates = true;
            update();
         }
      });
   });

Step13 And then add the code that fetches the data from the server:

   var alreadyFetched = {};

   function getUrl() {
      var today = new Date();
      today.setSeconds(0);
      today.setMilliseconds(0);
      var timestamp = today.valueOf();
      var dataurl = "http://localhost:8080/services/LogCount/TotalsForMinute/"
            + timestamp + "/";
      return dataurl;
   }

   function fetch() {
      // Work out the URL for the current minute
      var dataurl = getUrl();
      // Then fetch the data with jQuery
      function onDataReceived(series) {
         // Append to the existing data
         for (var i = 0; i < series.length; i++) {
            if (alreadyFetched[series[i].FileName] == null) {
               alreadyFetched[series[i].FileName] = {
                  FileName: series[i].FileName,
                  values: [{ Minute: series[i].Minute,
                             Total: series[i].Total }]
               };
            } else {
               alreadyFetched[series[i].FileName].values.push({
                  Minute: series[i].Minute,
                  Total: series[i].Total
               });
               if (alreadyFetched[series[i].FileName].values.length > 30) {
                  alreadyFetched[series[i].FileName].values.pop();
               }
            }
         }
         // Update the graph
         d3.select('#chart svg')
            .datum(getdata())
            .transition().duration(500)
            .call(chart);
      }

      function onError(request, status, error) {
         console.log("Received Error from AJAX: " + request.responseText);
      }

      $.ajax({
         url: dataurl,
         type: 'GET',
         dataType: 'json',
         crossDomain: true,
         xhrFields: { withCredentials: true },
         success: onDataReceived,
         error: onError
      });
   }

   function getdata() {
      var series = [];
      var keys = [];
      for (key in alreadyFetched) {
         keys.push(key);
      }
      for (var i = 0; i < keys.length; i++) {
         var newValues = [];
         for (var j = 0; j < alreadyFetched[keys[i]].values.length; j++) {
            newValues.push([alreadyFetched[keys[i]].values[j].Minute,
                            alreadyFetched[keys[i]].values[j].Total]);
         }
         series.push({
            key: alreadyFetched[keys[i]].FileName,
            values: newValues
         });
      }
      return series;
   }
Step14 This completes the client-side part of the implementation. In order to expose the data to the client layer, we need to provide services that retrieve it.

Step15 Start by creating a utility class called CassandraUtils in the storm.cookbook.services.resources package, with the following content:

   public class CassandraUtils {

      public static Cluster cluster;
      public static Keyspace keyspace;
      protected static Properties properties;

      public static boolean initCassandra() {
         properties = new Properties();
         try {
            properties.load(Main.class
                  .getResourceAsStream("/cassandra.properties"));
         } catch (IOException ioe) {
            ioe.printStackTrace();
         }
         cluster = HFactory.getOrCreateCluster(
               properties.getProperty("cluster.name", "DefaultCluster"),
               properties.getProperty("cluster.hosts", "127.0.0.1:9160"));
         ConfigurableConsistencyLevel ccl =
               new ConfigurableConsistencyLevel();
         ccl.setDefaultReadConsistencyLevel(HConsistencyLevel.ONE);
         String keyspaceName = properties.getProperty(
               "logging.keyspace", "Logging");
         keyspace = HFactory.createKeyspace(keyspaceName, cluster, ccl);
         return (cluster.describeKeyspace(keyspaceName) != null);
      }
   }
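The class reads its connection settings from a cassandra.properties file on the classpath. A minimal file consistent with the defaults used above might look like this (the values shown are simply the fall-backs from the code; adjust them for your cluster):

   cluster.name=DefaultCluster
   cluster.hosts=127.0.0.1:9160
   logging.keyspace=Logging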
16. Then create the LogCount class in the same package, which essentially exposes a RESTful lookup service:

@Path("/LogCount")
public class LogCount {

    @GET
    @Path("/TotalsForMinute/{timestamp}")
    @Produces("application/json")
    public String getMinuteTotals(@PathParam("timestamp") String timestamp) {
        SliceCounterQuery<String, String> query =
                HFactory.createCounterSliceQuery(CassandraUtils.keyspace,
                        StringSerializer.get(), StringSerializer.get());
        query.setColumnFamily("LogVolumeByMinute");
        query.setKey(timestamp);
        query.setRange("", "", false, 100);
        QueryResult<CounterSlice<String>> result = query.execute();
        Iterator<HCounterColumn<String>> it =
                result.get().getColumns().iterator();
        JSONArray content = new JSONArray();
        while (it.hasNext()) {
            HCounterColumn<String> column = it.next();
            JSONObject fileObject = new JSONObject();
            fileObject.put("FileName", column.getName());
            fileObject.put("Total", column.getValue());
            fileObject.put("Minute", Long.parseLong(timestamp));
            content.add(fileObject);
        }
        return content.toJSONString();
    }
}
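The JSON this method returns is what the dashboard's onDataReceived(series) callback iterates over: an array with one object per file name, each carrying FileName, Total and Minute fields. As a point of reference, the tiny standalone sketch below (values are made up and the class name is illustrative) builds the same shape with the json-simple types used above:

import org.json.simple.JSONArray;
import org.json.simple.JSONObject;

// Illustrative only: shows the response shape produced by getMinuteTotals(),
// e.g. [{"FileName":"access.log","Total":42,"Minute":1369657800000}]
public class ResponseShapeExample {

    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        JSONArray content = new JSONArray();
        JSONObject fileObject = new JSONObject();
        fileObject.put("FileName", "access.log");   // counter column name
        fileObject.put("Total", 42L);                // counter value for the minute
        fileObject.put("Minute", 1369657800000L);    // the timestamp that was queried
        content.add(fileObject);
        System.out.println(content.toJSONString());
    }
}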
17. Finally, you expose the service by creating the LogServices class:

@ApplicationPath("/")
public class LogServices extends Application {

    public LogServices() {
        CassandraUtils.initCassandra();
    }

    @Override
    public Set<Class<?>> getClasses() {
        final Set<Class<?>> classes = new HashSet<Class<?>>();
        // register root resource
        classes.add(LogCount.class);
        return classes;
    }
}

18. Then configure the web.xml file:

<web-app>
    <display-name>Log-Web</display-name>
    <servlet>
        <servlet-name>storm.cookbook.services.LogServices</servlet-name>
        <servlet-class>
            com.sun.jersey.spi.container.servlet.ServletContainer
        </servlet-class>
        <init-param>
            <param-name>javax.ws.rs.Application</param-name>
            <param-value>storm.cookbook.services.LogServices</param-value>
        </init-param>
        <load-on-startup>1</load-on-startup>
    </servlet>
    <servlet-mapping>
        <servlet-name>storm.cookbook.services.LogServices</servlet-name>
        <url-pattern>/services/*</url-pattern>
    </servlet-mapping>
</web-app>

19. You can now run your project using the following command from the root of your web-log project:

mvn jetty:run

Your dashboard will then be available at localhost:8080.
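With the Jetty server running, you can also exercise the LogCount service independently of the dashboard's Ajax calls. The sketch below is illustrative and not part of the recipe: it assumes the service is reachable under the localhost:8080/services/LogCount/TotalsForMinute/[timestamp] path configured above and that counters exist for the current minute; the class name is hypothetical.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Date;

// Illustrative smoke test for the deployed service (not part of the recipe).
public class LogCountSmokeTest {

    public static void main(String[] args) throws Exception {
        // Counts are keyed by the minute, so truncate the current time to the
        // minute, as the dashboard does before building its request URL.
        long timestamp = (new Date().getTime() / 60000L) * 60000L;

        URL url = new URL("http://localhost:8080/services/LogCount/TotalsForMinute/"
                + timestamp + "/");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestProperty("Accept", "application/json");

        try (BufferedReader in = new BufferedReader(
                new InputStreamReader(conn.getInputStream()))) {
            String line;
            // Expect a JSON array of {FileName, Total, Minute} objects.
            while ((line = in.readLine()) != null) {
                System.out.println(line);
            }
        }
    }
}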
How it works…

At a high level, the dashboard works by periodically querying the server for counts for a given time. It maintains an in-memory structure on the client side to hold the results of these queries and then feeds the consolidated two-dimensional array into the graph class. Take a look at the HTML; the following code defines where the graph will be displayed:

<div id="chart">
    <svg></svg>
</div>

The chart is defined by the following:

nv.addGraph(function () {
    chart = nv.models.stackedAreaChart()
        .x(function(d) { return d[0] })
        .y(function(d) { return d[1] })
        .clipEdge(true);

    chart.xAxis
        .tickFormat(function(d) {
            return d3.time.format('%X')(new Date(d))
        })
        .axisLabel('Time')
        .showMaxMin(false);

    chart.yAxis
        .axisLabel('Volume')
        .tickFormat(d3.format(',.2f'));

    d3.select('#chart svg')
        .datum(getdata())
        .transition().duration(500)
        .call(chart);

    nv.utils.windowResize(chart.update);

    chart.dispatch.on('stateChange', function (e) {
        nv.log('New State:', JSON.stringify(e));
    });

    return chart;
});
The in-memory structure is essentially a two-dimensional array of values, so it is important to map these onto the x and y axes of the graph, which is done through the following:

chart = nv.models.stackedAreaChart()
    .x(function(d) { return d[0] })
    .y(function(d) { return d[1] })
    .clipEdge(true);

Data is fetched through the fetch() method, which issues an asynchronous Ajax request to the server. Once the response is received, it is added to the in-memory structure by the onDataReceived(series) method. Finally, the getdata() method maps the log structure into a two-dimensional array to be displayed by the graph.

On the server side, the service is exposed via Jersey, the open source, production-quality JSR 311 Reference Implementation for building RESTful web services. Services are defined using annotations.
For this recipe, only a single service is defined, by the following annotations on the LogCount class:

@Path("/LogCount")
public class LogCount {

    @GET
    @Path("/TotalsForMinute/{timestamp}")
    @Produces("application/json")
    public String getMinuteTotals(@PathParam("timestamp") String timestamp) {

This service will then essentially be available at localhost:8080/services/LogCount/TotalsForMinute/[timestamp]. The value passed into the timestamp variable is used to perform the lookup against Cassandra. The results of the query are then mapped onto a JSON object and returned to the caller:

Iterator<HCounterColumn<String>> it = result.get().getColumns().iterator();
JSONArray content = new JSONArray();
while (it.hasNext()) {
    HCounterColumn<String> column = it.next();
    JSONObject fileObject = new JSONObject();
    fileObject.put("FileName", column.getName());
    fileObject.put("Total", column.getValue());
    fileObject.put("Minute", Long.parseLong(timestamp));
    content.add(fileObject);
}
return content.toJSONString();

It is usually quite difficult to bring up the entire topology and set of clusters simply to test the web application; a convenient main class is provided in the supporting material that populates the column family with random data, allowing the web application to be tested easily in isolation.
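That helper class is not reproduced in this chapter, but a minimal sketch of the idea, under stated assumptions, looks roughly like the following. It reuses the CassandraUtils class from step 15, the LogVolumeByMinute counter column family read by getMinuteTotals(), and Hector's Mutator API; the class name, file names and counts are all illustrative, and the real supporting-material class may differ.

package storm.cookbook.services.resources;

import java.util.Date;
import java.util.Random;

import me.prettyprint.cassandra.serializers.StringSerializer;
import me.prettyprint.hector.api.factory.HFactory;
import me.prettyprint.hector.api.mutation.Mutator;

// Illustrative sketch only: fills LogVolumeByMinute with random counts so the
// dashboard and LogCount service can be exercised without running the topology.
public class RandomLogDataPopulator {

    public static void main(String[] args) {
        if (!CassandraUtils.initCassandra()) {
            throw new IllegalStateException("Could not initialise Cassandra");
        }
        Mutator<String> mutator =
                HFactory.createMutator(CassandraUtils.keyspace, StringSerializer.get());

        String[] files = { "access.log", "error.log", "audit.log" };  // made-up names
        Random random = new Random();

        // Write counters for the last 30 minutes, keyed by the minute timestamp,
        // one counter column per file name (mirroring the read path in LogCount).
        long minute = (new Date().getTime() / 60000L) * 60000L;
        for (int i = 0; i < 30; i++) {
            String rowKey = String.valueOf(minute - i * 60000L);
            for (String file : files) {
                mutator.incrementCounter(rowKey, "LogVolumeByMinute",
                        file, random.nextInt(100));
            }
        }
        System.out.println("Populated LogVolumeByMinute with random test data");
    }
}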