Hadoop技术内幕:深入解析MapReduce架构设计与实现原理


Hadoop 技术内幕: 深入解析 MapReduce 架构 设计与实现原理 董西成 著 图书在版编目(CIP)数据 Hadoop 技术内幕 :深入解析 MapReduce 架构设计与实现原理 / 董西成著 . —北京 :机械工业出版社, 2013.5 (大数据技术丛书) ISBN 978-7-111-42226-6 Ⅰ. H… Ⅱ. 董… Ⅲ. 数据处理软件 Ⅳ. TP274 中国版本图书馆 CIP 数据核字(2013)第 077161 号 版权所有 • 侵权必究 封底无防伪标均为盗版 本书法律顾问 北京市展达律师事务所 “Hadoop 技术内幕”共两册,分别从源代码的角度对“Common+HDFS”和“MapReduce 的架构 设计和实现原理”进行了极为详细的分析。本书由 Hadoop 领域资深的实践者亲自执笔,首先介绍了 MapReduce 的设计理念和编程模型,然后从源代码的角度深入分析了 RPC 框架、客户端、JobTracker、 TaskTracker 和 Task 等 MapReduce 运行时环境的架构设计与实现原理,最后从实际应用的角度深入讲 解了 Hadoop 的性能优化、安全机制、多用户作业调度器和下一代 MapReduce 框架等高级主题和内容。 本书适合 Hadoop 的二次开发人员、应用开发工程师、运维工程师阅读。 本书共 12 章,分 4 个部分(不包括附录):第一部分(第 1 ~ 2 章),介绍了 Hadoop 源代码的组 织结构、获取、编译、调试、阅读环境搭建,以及 MapReduce 的设计理念和基本架构 ;第二部分(第 3 章),着重讲解了 MapReduce 的编程接口,主要包括旧 API 和新 API 两套编程接口,以及 Hadoop 工作流 ;第三部分(第 4 ~ 8 章)主要分析了 MapReduce 的运行时环境,包括 RPC 框架、客户端、 JobTracker、TaskTracker 和 Task 等的内部实现细节和机制剖析;第四部分(第 9 ~ 12 章)深入讲解了 Hadoop 的性能优化、多用户作业调度器、安全机制和下一代 MapReduce 框架等高级主题。 机械工业出版社(北京市西城区百万庄大街 22 号  邮政编码 100037) 责任编辑:孙海亮 印刷 2013 年 5 月第 1 版第 1 次印刷 186mm×240mm • 20.75 印张 标准书号:ISBN 978-7-111-42226-6 定  价:69.00 元 凡购本书,如有缺页、倒页、脱页,由本社发行部调换 客服热线:(010)88378991 88361066 投稿热线:(010)88379604 购书热线:(010)68326294 88379649 68995259 读者信箱:hzjsj@hzbook.com 前  言 为什么要写这本书 突然之间,大数据一下子就“火”了,开源软件 Hadoop 也因此水涨船高。得益于 一些国际领先厂商,尤其是 FaceBook、Yahoo !以及阿里巴巴等互联网巨头的现身说法, Hadoop 被看成大数据分析的“神器”。IDC 在对中国未来几年的预测中就专门提到了大 数据,其认为未来几年,会有越来越多的企业级用户试水大数据平台和应用,而这之中, Hadoop 将成为最耀眼的“明星”。 尽管 Hadoop 整个生态系统是开源的,但是,由于它包含的软件种类过多,且版本升 级过快,大部分公司,尤其是一些中小型公司,难以在有限的时间内快速掌握 Hadoop 蕴 含的价值。此外,Hadoop 自身版本的多样化也给很多研发人员带来了很大的学习负担。 尽管当前市面上已有很多参考书籍,比如《Hadoop: The Definitive Guide》、《Hadoop in Action》、《Pro Hadoop》、《Hadoop Operations》等,但是,至今还没有一本书能够深入地剖 析 Hadoop 内部的实现细节,比如 JobTracker 实现、作业调度器实现等。也正因如此,很多 Hadoop 初学者和研发人员只能参考网络上一些零星的源代码分析的文章,自己一点一点地 阅读源代码,缓慢地学习 Hadoop。而本书正是为了解决以上各种问题而编写的,它是国内 第一本深入剖析 Hadoop 内部实现细节的书籍。 本书以 Hadoop 1.0 为基础,深入剖析了 Hadoop MapReduce 中各个组件的实现细节, 包括 RPC 框架、JobTracker 实现、TaskTracker 实现、Task 实现和作业调度器实现等。书 中不仅详细介绍了 MapReduce 各个组件的内部实现原理,而且结合源代码进行了深入的剖  IV 析,使读者可以快速全面地掌握 Hadoop MapReduce 设计原理和实现细节。 读者对象 (1)Hadoop 二次开发人员 Hadoop 由于在扩展性、容错性和稳定性等方面的诸多优点,已被越来越多的公司采 用。而为了减少开发成本,大部分公司在 Hadoop 基础上进行了二次开发,以打造属于公司 内部的 Hadoop 平台。对于 Hadoop 二次开发人员来说,深入而又全面地了解 Hadoop 的设 计原理与实现细节是修改 Hadoop 内核的前提,而本书可帮助这部分读者快速而又全面地了 解 Hadoop 实现细节。 (2)Hadoop 应用开发人员 如果要利用 Hadoop 进行高级应用开发,仅掌握 Hadoop 基本使用方法是远远不够的, 必须对 Hadoop 框架的设计原理、架构和运作机制有一定的了解。对这部分读者而言,本书 将带领他们全面了解 Hadoop 的设计和实现原理,加深对 Hadoop 框架的理解,提高开发水 平,从而编写出更加高效的 MapReduce 应用程序。 (3)Hadoop 运维工程师 对于一名合格的 Hadoop 运维工程师而言,适当地了解 Hadoop 框架的设计原理、架 构和运作机制是十分有帮助的。这不仅可以使 Hadoop 运维人员更快地排除各种可能的 Hadoop 故障,还可以让 Hadoop 运维人员与研发人员进行更有效的沟通。通过阅读这本书, Hadoop 运维人员可以了解到很多其他书中无法获取的 Hadoop 实现细节。 (4)开源软件爱好者 Hadoop 是开源软件中的佼佼者。它在实现的过程中吸收了很多开源领域的优秀思想, 同时有很多值得学习的创新。尤为值得一提的是,本书分析 Hadoop 架构设计和实现原理的 方式也许值得所有开源软件爱好者学习和借鉴。通过阅读本书,这部分读者不仅能领略到 开源软件的优秀思想,还可以掌握分析开源软件源代码的方法和技巧,从而进一步提高使 用开源软件的效率和质量。 如何阅读本书 本书分为四大部分(不包括附录): 第一部分为基础篇,简单地介绍 Hadoop 的阅读环境搭建和基本设计架构,帮助读者了 解一些基础背景知识。 第二部分为 MapReduce 编程模型篇,着重讲解 MapReduce 编程接口,主要包括两套 编程接口,分别是旧 API 和新 API。 第三部分为 MapReduce 核心设计篇,主要讲解 Hadoop MapReduce 的运行时环境,包 括 RPC 框架、客户端、JobTracker、TaskTracker 和 Task 等内部实现细节。 V  第四部分为 MapReduce 高级篇,主要讲解 Hadoop MapReduce 中的一些高级特性和未 来发展趋势,包括多用户作业调度器、安全机制和下一代 MapReduce 框架等。 另外,本书最后还添加了几个附录 :附录 A 为安装 Hadoop 过程中可能存在的问题及 解决方案 ;附录 B 为 Hadoop 默认 HTTP 端口号以及 HTTP 地址。参考资料中包括了本书 写作过程中参考的书籍、论文、Hadoop Jira 和网络资源。 如果你是一名经验丰富的资深用户,能够理解 Hadoop 的相关基础知识和使用技巧,那 么你可以直接阅读第三部分和第四部分。但是,如果你是一名初学者,请一定从第 1 章的 基础理论知识开始学习。 勘误和支持 由于笔者的水平有限,加之编写时间仓促,书中难免会出现一些错误或者不准确的 地方,恳请读者批评指正。为此,笔者特意创建了一个在线支持与应急方案的站点 http:// hadoop123.com。你可以将书中的错误发布在 Bug 勘误表页面中。如果你遇到问题,可以访 问 Q&A 页面,我将尽量在线上为读者提供最满意的解答。如果你有什么宝贵意见,欢迎发 送邮件至 dongxicheng@yahoo.com,期待能够得到你的真挚反馈。 致谢 感谢我的导师廖华明副研究员。在我读研没空顾及项目的时候,她给了我一次又一次 的鼓励,她甚至专门为我写书留出空闲时间。在廖老师的身边,我还学会了很多专业知识 以外的东西。 感谢腾讯的蔡斌老师。正是由于他的推荐,才使本书的出版成为可能。 感谢机械工业出版社华章公司的杨福川老师和孙海亮老师。是他们在这一年多的时间 中始终支持着我的写作,是他们的鼓励和帮助使我顺利完成了本书的编写工作。 感谢对本书部分章节提出改进建议的何鹏、姜冰、郑伟伟等人。另外,感谢给我提供 各种帮助的战科宇、周礼、刘晏辰、孟椿智、王群、王颖、曹聪、朱雪峰等人。 最后,感谢我父母的养育之恩,感谢兄长的鼓励和支持,感谢他们时时刻刻给我信心 和力量!感谢我的女朋友颛悦对我生活的悉心照料与琐事上的宽容。 谨以此书献给我最亲爱的家人,以及众多热爱 Hadoop 的朋友们! 董西成 于北京 目  录 前 言 第一部分 基础篇 第 1 章 阅读源代码前的准备 / 2 1.1 准备源代码学习环境 / 2 1.1.1 基础软件下载 / 2 1.1.2 如何准备 Windows 环境 / 3 1.1.3 如何准备 Linux 环境 / 6 1.2 获取 Hadoop 源代码 / 7 1.3 搭建 Hadoop 源代码阅读环境 / 8 1.3.1 创建 Hadoop 工程 / 8 1.3.2 Hadoop 源代码阅读技巧 / 9 1.4 Hadoop 源代码组织结构 / 10 1.5 Hadoop 初体验 / 13 1.5.1 启动 Hadoop/ 13 1.5.2 Hadoop Shell 介绍 / 15 VII  1.5.3 Hadoop Eclipse 插件介绍 / 15 1.6 编译及调试 Hadoop 源代码 / 19 1.6.1 编译 Hadoop 源代码 / 19 1.6.2 调试 Hadoop 源代码 / 20 1.7 小结 / 23 第 2 章 MapReduce 设计理念与基本架构 / 24 2.1 Hadoop 发展史 / 24 2.1.1 Hadoop 产生背景 / 24 2.1.2 Apache Hadoop 新版本的特性 / 25 2.1.3 Hadoop 版本变迁 / 26 2.2 Hadoop MapReduce 设计目标 / 28 2.3 MapReduce 编程模型概述 / 29 2.3.1 MapReduce 编程模型简介 / 29 2.3.2 MapReduce 编程实例 / 31 2.4 Hadoop 基本架构 / 32 2.4.1 HDFS 架构 / 33 2.4.2 Hadoop MapReduce 架构 / 34 2.5 Hadoop MapReduce 作业的生命周期 / 36 2.6 小结 / 38 第二部分 MapReduce 编程模型篇 第 3 章 MapReduce 编程模型 / 40 3.1 MapReduce 编程模型概述 / 40 3.1.1 MapReduce 编程接口体系结构 / 40 3.1.2 新旧 MapReduce API 比较 / 41 3.2 MapReduce API 基本概念 / 42 3.2.1 序列化 / 42 3.2.2 Reporter 参数 / 43 3.2.3 回调机制 / 43 3.3 Java API 解析 / 44  VIII 3.3.1 作业配置与提交 / 44 3.3.2 InputFormat 接口的设计与实现 / 48 3.3.3 OutputFormat 接口的设计与实现 / 53 3.3.4 Mapper 与 Reducer 解析 / 55 3.3.5 Partitioner 接口的设计与实现 / 59 3.4 非 Java API 解析 / 61 3.4.1 Hadoop Streaming 的实现原理 / 61 3.4.2 Hadoop Pipes 的实现原理 / 64 3.5 Hadoop 工作流 / 67 3.5.1 JobControl 的实现原理 / 67 3.5.2 ChainMapper/ChainReducer 的实现原理 / 69 3.5.3 Hadoop 工作流引擎 / 71 3.6 小结 / 73 第三部分 MapReduce 核心设计篇 第 4 章 Hadoop RPC 框架解析 / 76 4.1 Hadoop RPC 框架概述 / 76 4.2 Java 基础知识 / 77 4.2.1 Java 反射机制与动态代理 / 78 4.2.2 Java 网络编程 / 80 4.2.3 Java NIO/ 82 4.3 Hadoop RPC 基本框架分析 / 89 4.3.1 RPC 基本概念 / 89 4.3.2 Hadoop RPC 基本框架 / 91 4.3.3 集成其他开源 RPC 框架 / 98 4.4 MapReduce 通信协议分析 / 100 4.4.1 MapReduce 通信协议概述 / 100 4.4.2 JobSubmissionProtocol 通信协议 / 102 4.4.3 InterTrackerProtocol 通信协议 / 102 4.4.4 TaskUmbilicalProtocol 通信协议 / 103 4.4.5 其他通信协议 / 104 IX  4.5 小结 / 106 第 5 章 作业提交与初始化过程分析 / 107 5.1 作业提交与初始化概述 / 107 5.2 作业提交过程详解 / 108 5.2.1 执行 Shell 命令 / 108 5.2.2 作业文件上传 / 109 5.2.3 产生 InputSplit 文件 / 111 5.2.4 作业提交到 JobTracker/ 113 5.3 作业初始化过程详解 / 115 5.4 Hadoop DistributedCache 原理分析 / 117 5.4.1 使用方法介绍 / 118 5.4.2 工作原理分析 / 120 5.5 小结 / 122 第 6 章 JobTracker 内部实现剖析 / 123 6.1 JobTracker 概述 / 123 6.2 JobTracker 启动过程分析 / 125 6.2.1 JobTracker 启动过程概述 / 125 6.2.2 重要对象初始化 / 125 6.2.3 各种线程功能 / 128 6.2.4 作业恢复 / 129 6.3 心跳接收与应答 / 129 6.3.1 更新状态 / 131 6.3.2 下达命令 / 131 6.4 Job 和 Task 运行时信息维护 / 134 6.4.1 作业描述模型 / 134 6.4.2 JobInProgress/ 136 6.4.3 TaskInProgress/ 137 6.4.4 作业和任务状态转换图 / 139 6.5 容错机制 / 141 6.5.1 JobTracker 容错 / 141 6.5.2 TaskTracker 容错 / 142  X 6.5.3 Job/Task 容错 / 145 6.5.4 Record 容错 / 147 6.5.5 磁盘容错 / 151 6.6 任务推测执行原理 / 152 6.6.1 计算模型假设 / 153 6.6.2 1.0.0 版本的算法 / 153 6.6.3 0.21.0 版本的算法 / 154 6.6.4 2.0 版本的算法 / 156 6.7 Hadoop 资源管理 / 157 6.7.1 任务调度框架分析 / 159 6.7.2 任务选择策略分析 / 162 6.7.3 FIFO 调度器分析 / 164 6.7.4 Hadoop 资源管理优化 / 165 6.8 小结 / 168 第 7 章 TaskTracker 内部实现剖析 / 169 7.1 TaskTracker 概述 / 169 7.2 TaskTracker 启动过程分析 / 170 7.2.1 重要变量初始化 / 171 7.2.2 重要对象初始化 / 171 7.2.3 连接 JobTracker/ 172 7.3 心跳机制 / 172 7.3.1 单次心跳发送 / 172 7.3.2 状态发送 / 175 7.3.3 命令执行 / 178 7.4 TaskTracker 行为分析 / 179 7.4.1 启动新任务 / 179 7.4.2 提交任务 / 179 7.4.3 杀死任务 / 181 7.4.4 杀死作业 / 182 7.4.5 重新初始化 / 184 7.5 作业目录管理 / 184 7.6 启动新任务 / 186 XI  7.6.1 任务启动过程分析 / 186 7.6.2 资源隔离机制 / 193 7.7 小结 / 195 第 8 章 Task 运行过程分析 / 196 8.1 Task 运行过程概述 / 196 8.2 基本数据结构和算法 / 197 8.2.1 IFile 存储格式 / 197 8.2.2 排序 / 198 8.2.3 Reporter/ 201 8.3 Map Task 内部实现 / 204 8.3.1 Map Task 整体流程 / 204 8.3.2 Collect 过程分析 / 205 8.3.3 Spill 过程分析 / 213 8.3.4 Combine 过程分析 / 214 8.4 Reduce Task 内部实现 / 214 8.4.1 Reduce Task 整体流程 / 215 8.4.2 Shuffle 和 Merge 阶段分析 / 215 8.4.3 Sort 和 Reduce 阶段分析 / 218 8.5 Map/Reduce Task 优化 / 219 8.5.1 参数调优 / 219 8.5.2 系统优化 / 220 8.6 小结 / 224 第四部分 MapReduce 高级篇 第 9 章 Hadoop 性能调优 / 228 9.1 概述 / 228 9.2 从管理员角度进行调优 / 229 9.2.1 硬件选择 / 229 9.2.2 操作系统参数调优 / 229 9.2.3 JVM 参数调优 / 230  XII 9.2.4 Hadoop 参数调优 / 230 9.3 从用户角度进行调优 / 235 9.3.1 应用程序编写规范 / 235 9.3.2 作业级别参数调优 / 235 9.3.3 任务级别参数调优 / 239 9.4 小结 / 240 第 10 章 Hadoop 多用户作业调度器 / 241 10.1 多用户调度器产生背景 / 241 10.2 HOD/ 242 10.2.1 Torque 资源管理器 / 242 10.2.2 HOD 作业调度 / 243 10.3 Hadoop 队列管理机制 / 245 10.4 Capacity Scheduler 实现 / 246 10.4.1 Capacity Scheduler 功能介绍 / 247 10.4.2 Capacity Scheduler 实现 / 249 10.4.3 多层队列调度 / 254 10.5 Fair Scheduler 实现 / 255 10.5.1 Fair Scheduler 功能介绍 / 255 10.5.2 Fair Scheduler 实现 / 258 10.5.3 Fair Scheduler 与 Capacity Scheduler 对比 / 263 10.6 其他 Hadoop 调度器介绍 / 264 10.7 小结 / 265 第 11 章 Hadoop 安全机制 / 266 11.1 Hadoop 安全机制概述 / 266 11.1.1 Hadoop 面临的安全问题 / 266 11.1.2 Hadoop 对安全方面的需求 / 267 11.1.3 Hadoop 安全设计基本原则 / 267 11.2 基础知识 / 268 11.2.1 安全认证机制 / 268 11.2.2 Kerberos 介绍 / 270 11.3 Hadoop 安全机制实现 / 273 XIII  11.3.1 RPC/ 273 11.3.2 HDFS/ 276 11.3.3 MapReduce/ 278 11.3.4 上层服务 / 280 11.4 应用场景总结 / 281 11.4.1 文件存取 / 281 11.4.2 作业提交与运行 / 282 11.4.3 上层中间件访问 Hadoop/ 282 11.5 小结 / 283 第 12 章 下一代 MapReduce 框架 / 284 12.1 第一代 MapReduce 框架的局限性 / 284 12.2 下一代 MapReduce 框架概述 / 284 12.2.1 基本设计思想 / 284 12.2.2 资源统一管理平台 / 286 12.3 Apache YARN/ 287 12.3.1 Apache YARN 基本框架 / 287 12.3.2 Apache YARN 工作流程 / 290 12.3.3 Apache YARN 设计细节 / 291 12.3.4 MapReduce 与 YARN 结合 / 294 12.4 Facebook Corona / 298 12.4.1 Facebook Corona 基本框架 / 298 12.4.2 Facebook Corona 工作流程 / 300 12.4.3 YARN 与 Corona 对比 / 303 12.5 Apache Mesos/ 304 12.5.1 Apache Mesos 基本框架 / 304 12.5.2 Apache Mesos 资源分配 / 305 12.5.3 MapReduce 与 Mesos 结合 / 307 12.6 小结 / 309 附录 A 安装 Hadoop 过程中可能存在的问题及解决方案 / 310 附录 B Hadoop 默认 HTTP 端口号以及 HTTP 地址 / 312 参考资料 / 313 第一部分 基 础 篇 本部分内容 阅读源代码前的准备 MapReduce 设计理念与基本架构 第 1 章 阅读源代码前的准备 一般而言,在深入研究一个系统的技术细节之前,先要进行一些基本的准备工作,比 如,准备源代码阅读环境,搭建运行环境并尝试使用该系统等。对于 Hadoop 而言,由于它 是一个分布式系统,且由多种守护进程组成,具有一定的复杂性,如果想深入学习其设计 原理,仅仅进行以上几项准备工作是不够的,还要学习一些调试工具的使用方法,以便对 Hadoop 源代码进行调试、跟踪。边用边学,这样才能事半功倍。 本章的编写目的是帮助读者构建一个“高效”的 Hadoop 源代码学习环境,包括 Hadoop 源代码阅读环境、Hadoop 使用环境和 Hadoop 源代码编译调试环境等,这主要涉及如下内容: 在 Linux 和 Windows 环境下搭建 Hadoop 源代码阅读环境的方法; ❑ Hadoop 的基本使用方法,主要包括 Hadoop Shell 和 Eclipse 插件两种工具的使用; ❑ Hadoop 源代码编译和调试方法,其中,调试方法包括使用 Eclipse 远程调试和打印 ❑ 调试日志两种。 考虑到大部分用户在单机上学习 Hadoop 源代码,所以本章内容均是基于单机环境的。 本章大部分内容较为基础,已经掌握这部分内容的读者可以直接跳过本章。 1.1 准备源代码学习环境 对于大部分公司而言,实验和生产环境中的服务器集群部署的都是 Linux 操作系统。 考虑到 Linux 在服务器市场中具有统治地位,Hadoop 从一开始便是基于 Linux 操作系统开 发的,因而对 Linux 有着非常完美的支持。尽管 Hadoop 采用了具有跨平台特性的 Java 作 为主要编程语言,但由于它的一些功能实现用到了 Linux 操作系统相关的技术,因而对其 他平台的支持不够友好,且没有进行过严格测试。换句话说,其他操作系统(如 Windows) 仅可作为开发环境 ,不可作为生产环境。对于学习源代码而言,操作系统的选择显得不是 非常重要,读者可根据个人爱好自行决定。本节以 64 bit Linux 和 32 bit Windows 两种操作 系统为例,介绍如何在单机上准备 Hadoop 源代码学习环境。1 1.1.1 基础软件下载 前面提到 Hadoop 采用的开发语言主要是 Java,因而搭建 Hadoop 环境所需的最基础的 软件首先应该包括 Java 基础开发包 JDK 和 Java 编译工具 Ant。考虑到源代码阅读和调试的   截至本书结稿时,Apache Hadoop SVN 中已经出现了针对 Windows 操作系统的分支,具体见 http://svn. apache.org/repos/asf/hadoop/common/branches/ 下的 branch-1-win 和 branch-trunk-win,且 Hortonworks 公 司发布了 Windows 安装版本,具体见 http://hortonworks.com/partners/microsoft/ 第 1 章 阅读源代码前的准备   3 便利性,本书采用功能强大的集成开发环境 Eclipse。此外,如果读者选择 Windows 平台搭 建学习环境,还需要安装 Cygwin 以模拟 Linux 环境,这是因为 Hadoop 采用 Bash Shell 脚 本管理集群。搭建 Hadoop 阅读环境需要的各种软件以及下载方式如表 1-1 所示。 表 1-1 搭建 Hadoop 阅读环境所需的各种软件及下载方式 软  件 下载网址 推荐版本 说  明 JDK http://www.oracle.com/technetwork/ java/javase/downloads/index.html 1.6 以上 Windows 和 Linux 的安装 包不同 Ant http://ant.apache.org/bindownload.cgi 1.6.0 以上 Windows 和 Linux 使用相 同的安装包 Cygwin http://www.cygwin.com/ 最新版本 只有 Windows 平台需要 Eclipse http://www.eclipse.org/downloads/ Galileo 或者 Helios 版本 Windows 和 Linux 的安装 包不同 1.1.2 如何准备 Windows 环境 2 本小节将介绍如何准备 Windows 下的 Hadoop 学习环境,包括 JDK、Ant、Cygwin 和 Eclipse 等基础软件的使用方法。本小节假设用户的软件安装目录为 D:\hadoop,且最终安 装完成的目录结构为: D:\hadoop ├─ apache-ant-1.7.1 ├─ cygwin └─ Java └─ jdk1.6.0_25 1. JDK 的安装 用 户 下 载 的 安 装 包 为 jdk-6u25- windows-i586.exe,直接双击该安装包将 JDK 安装到 D:\hadoop\Java\ 下,然后设配 置环境变量 JAVA_HOME、CLASSPATH、 PATH(不区分大小写),方法如下。 (1)配置 JAVA_HOME 如图 1-1 所示,在 Windows 桌面上, 右击“计算机”图标,然后在弹出的快捷 菜单中依次选择“属性”→“高级系统设 置”→“环境变量”命令,然后在系统变 量栏,单击“新建”按钮,在弹出的对话框中的“变量名”文本框中填写 JAVA_HOME,在 “变量值”文本框中填写 D:\hadoop\Java\jdk1.6.0_25,然后单击“确定”按钮。  注意,Indigo 及以上版本与 Hadoop Eclipse 插件可能存在兼容问题。 图 1-1 Windows 环境下设置 JAVA_HOME 环境变量 4   第一部分 基 础 篇 (2)配置 CLASSPATH 参考 JAVA_HOME 另建一个系统变量,变量名为 CLASSPATH,变量值为: .;%JAVA_HOME%/lib/dt.jar;%JAVA_HOME%/lib/tools.jar; (3)配置 PATH PATH 变量已经存在,选中后再单击“编辑”按钮即可。在变量值中添加如下内容: %JAVA_HOME%/bin;%JAVA_HOME%/jre/bin 经过以上配置,JDK 已经安装完毕,可在 DOS 窗口中输入命令“java -version”以验 证是否安装成功。如果输出以下内容,则说明安装成功: java version "1.6.0_25" Java(TM) SE Runtime Environment (build 1.6.0_25-b05) Java HotSpot(TM) Client VM (build 20.6-b01, mixed mode, sharing) 2. Ant 的安装 假设下载的安装包为 apache-ant-1.7.1-bin.zip,直接将其解压到工作目录 D:\hadoop\ 下, 并添加新的环境变量 ANT_HOME,设置其值为 D:\hadoop\apache-ant-1.7.1,同时在环境变量 PATH 后面添加如下内容: ;%ANT_HOME%\bin 经过以上配置,Ant 已经安装完毕,可在 DOS 窗口中输入命令“ant -version”以验证 是否安装成功。如果输出以下内容,则说明安装成功: Apache Ant version 1.7.1 compiled on June 27 2008 3. Cygwin 的安装 (1)安装 Cygwin 双击下载的 Cygwin 的安装包 setup.exe,一直单击“下一步”按钮,直到出现如图 1-2 所示的界面,在“Net”一栏中选中 OpenSSH 相关软件包,会出现如图 1-3 所示的界面, 然后单击“下一步”按钮,此时系统开始在线下载并安装 Cygwin 环境(时间比较长)。 图 1-2 Windows 环境下通过 Cygwin 安装 OpenSSH—单击“Net”一栏 第 1 章 阅读源代码前的准备   5 图 1-3 Windows 环境下通过 Cygwin 安装 OpenSSH—选中 OpenSSH 软件包 (2)安装并启动 sshd 服务 Hadoop 启动 / 停止脚本需要通过 SSH 发送命令启动相关守护进程,为此需要安装 sshd 服务。安装 sshd 服务的方法是,以管理员身份打开 Cygwin 命令行终端(右击运行图标, 单击“以管理员身份运行”命令),然后输入以下命令: ssh-host-config 接着,按照命令行中的提示进行安装,具体如图 1-4 所示。 图 1-4 Windows 环境下通过 Cygwin 安装 sshd 服务 安装完毕后,输入以下命令启动 sshd 服务: net start sshd 4. Eclipse 的安装 Eclipse 官网提供的 Eclipse 版本均是免安装版,直接将下载的压缩包解压到“D:\ hadoop\”下即可使用。 6   第一部分 基 础 篇 1.1.3 如何准备 Linux 环境 本小节将介绍如何准备 Linux 下的 Hadoop 学习环境。搭建 Linux 学习环境需要安装 JDK,Ant 和 Eclipse 等软件。本书以 64 bit Ubuntu 为例,介绍安装这些软件的方法,最终 安装完成的目录结构为: ROOT ├─ home │ └─ dong │ └─ eclipse └─ usr └─ lib ├─ apache-ant-1.7.1 └─ jvm └─ jdk1.6.0_25 1. JDK 的安装与配置 一般而言,Ubuntu 系统会自带 JDK,如果没有或者版本不符合要求,可按以下步骤进 行安装: 步骤 1 安装 JDK。将下载的 .bin 文件复制到 Linux 的某个目录下,比如 /usr/lib/jvm/ 下,然后在 Shell 中执行以下命令为该文件添加可执行权限: chmod +x /usr/lib/jvm/jdk1.6.0_25.bin 然后执行以下命令安装 JDK: sudo /usr/lib/jvm/jdk1.6.0_25.bin 之后将会出现安装信息,直至屏幕显示要求按下回车键。此时按下回车键后,会把 JDK 解压到文件夹 jdk1.6.0_25 中。至此,JDK 已安装完毕,下面进行配置。 步骤 2 配置 JDK。修改 /etc/profile 文件,在里面添加以下内容: export JAVA_HOME=/usr/lib/jvm/jdk1.6.0_25 export PATH=$PATH:$JAVA_HOME/bin export CLASSPATH=$CLASSPATH:$JAVA_HOME/lib:$JAVA_HOME/jre/lib 输入以下命令让配置生效: source /etc/profile 步骤 3 修改默认 JDK 版本。Ubuntu 中可能会有默认的 JDK,如 openjdk,因而我们 需要将自己安装的 JDK 设置为默认 JDK 版本,执行下面代码: sudo update-alternatives --install /usr/bin/java java /usr/lib/jvm/jdk1.6.0_25/ bin/java 300 sudo update-alternatives --install /usr/bin/javac javac /usr/lib/jvm/jdk1.6.0_25/ bin/javac 300 sudo update-alternatives --install /usr/bin/jar jar /usr/lib/jvm/jdk1.6.0_25/ bin/jar 300 sudo update-alternatives --install /usr/bin/javah javah /usr/lib/jvm jdk1.6.0_25/ 第 1 章 阅读源代码前的准备   7 bin/javah 300 sudo update-alternatives --install /usr/bin/javap javap /usr/lib/jvm/jdk1.6.0_25/ bin/javap 300 然后执行以下代码选择我们安装的 JDK 版本: sudo update-alternatives --config java 步骤 4 验证 JDK 是否安装成功。重启 Shell 终端,执行命令“java-version”。如果输 出以下内容,则说明安装成功: java version "1.6.0_25" Java(TM) SE Runtime Environment (build 1.6.0_25-b06) Java HotSpot(TM) Client VM (build 20.0-b11, mixed mode, sharing) 2. Ant 以及 Eclipse 的安装 (1)安装与配置 Ant 首先解压下载包,比如解压到文件 /usr/lib/apache-ant-1.7.1 目录下,然后修改 /etc/ profile 文件,在里面添加以下内容: export ANT_HOME=/usr/lib/apache-ant-1.7.1 export PATH=$PATH$:$ANT_HOME/bin 输入以下命令让配置生效: source /etc/profile 同 Windows 下的验证方式一样,重启终端,执行命令“ant –version”。如果输出以下 内容,则说明安装成功: Apache Ant version 1.7.1 compiled on June 27 2008 (2)安装 Eclipse 同 Windows 环境下的安装方式一样,直接解压即可使用。 1.2 获取 Hadoop 源代码 当前比较流行的 Hadoop 源代码版本有两个 :Apache Hadoop 和 Cloudera Distributed Hadoop(CDH)。Apache Hadoop 是由 Yahoo!、Cloudera、Facebook 等公司组成的 Hadoop 社区共同研发的,它属于最原始的开源版本。在该版本的基础上,很多公司进行了封装和 优化,推出了自己的开源版本,其中,最有名的一个是 Cloudera 公司发布的 CDH 版本。 考虑到 Apache Hadoop 是最原始的版本,且使用最为广泛,因而本书选用了 Apache Hadoop 版本为分析对象。自从 Apache Hadoop 发布以来,已经陆续推出很多版本(具体 介绍见 2.1.3 节)。其中,最具有标志性的版本是 1.0.0,而该书正是基于该版本对 Hadoop MapReduce 进行深入分析的。Hadoop 1.0.0 可从 http://hadoop.apache.org/common/releases. html 或 http://svn.apache.org/repos/asf/hadoop/common/branches/branch-1.01 处下载。 8   第一部分 基 础 篇 1.3 搭建 Hadoop 源代码阅读环境 1.3.1 创建 Hadoop 工程 本小节介绍如何创建一个 Hadoop 源代码工程,以方便阅读源代码。创建一个 Hadoop 工程,可分两个步骤完成: 步骤 1 解压缩 Hadoop 源代码。将下载到的 Hadoop 源代码压缩包 hadoop-1.0.0.tar.gz 解压到工作目录下(对于 Windows 系统而言,为了操作方便,解压到 Cygwin 安装目录的 home/${USER} 文件夹下)。 步骤 2 新建 Java 工程。打开 Eclipse,进入 Eclipse 可视化界面后,如图 1-5 所示, 依次单击“File”→“New”→“Java Project”,并在弹出的对话框中取消选中“Use default location”前的勾号,然后选择 Hadoop 安装目录的位置。默认情况下,工程名称与 Hadoop 安装目录名称相同,用户可自行修改。单击完成按钮,Hadoop 源代码工程创建完毕。 图 1-5 新建 Hadoop 工程 回到 Eclipse 主界面后,打开新建的 Hadoop 工程,可看到整个工程的组织代码,如 图 1-6 所示,源代码按目录组织,且每个目录下以 jar 包为单位显示各个 Java 文件。 除了使用源代码压缩包导入 Eclipse 工程的方法外,读者可以尝试直接从 Hadoop SVN 上导入 Hadoop 源代码。这些源代码本身已经是从 Eclipse 工程导出的,Hadoop SVN 地址 为:http://svn.apache.org/repos/asf/hadoop/common/branches/。 第 1 章 阅读源代码前的准备   9 图 1-6 Hadoop 工程展示(部分)源代码方式 1.3.2 Hadoop 源代码阅读技巧 本小节介绍在 Eclipse 下阅读 Hadoop 源代码的一些技巧,比如如何查看一个基类有哪 些派生类、一个方法被其他哪些方法调用等。 1. 查看一个基类或接口的派生类或实现类 在 Eclipse 中,选中某个基类或接口名称,右击,在弹出的快捷菜单中选择“Quick Type Hierarchy”,可在新窗口中看到对应的所有派生类或实现类。 例如,如图 1-7 所示,打开 src\mapred\ 目录下 org.apache.hadoop.mapred 包中的 InputFormat. java 文件,查看接口 InputFormat 的所有实现类,结果如图 1-8 所示。 图 1-7 在 Eclipse 中查看 Hadoop 源代码中接口 InputFormat 的所有实现类 2. 查看函数的调用关系 在 Eclipse 中, 选 中 某 个 方 法 名 称, 右 击, 在 弹 出 的 快 捷 菜 单 中 选 择“Open Call Hierarchy”,可在窗口“Call Hierarchy”中看到所有调用该方法的函数。 例 如, 如 图 1-9 所 示, 打 开 src\mapred\ 目 录 下 org.apache.hadoop.mapred 包 中 的 JobTracker.java 文件,查看调用方法 initJob 的所有函数,结果如图 1-10 所示。 10   第一部分 基 础 篇 图 1-8 Eclipse 列出接口 InputFormat 的所有实现类 图 1-9 在 Eclipse 中查看 Hadoop 源代码中所有调用 JobTracker.java 中 initJob 方法的函数 图 1-10 Eclipse 列出所有调用 initJob 方法的函数 3. 快速查找类对象的相关信息 同前两个小节类似,选中类对象,右击,在弹出的快捷菜单中选择“Open Declaration”, 可跳转到类定义 ;选择“Quick Outline”,可查看类所有的成员变量和成员方法。具体细节 本书不做详细介绍,读者可自行尝试。 1.4 Hadoop 源代码组织结构 直接解压 Hadoop 压缩包后,可看到图 1-11 所示的目录结构,其中,比较重要的目录 有 src、conf、lib、bin 等。下面分别介绍这几个目录的作用: src :Hadoop 源代码所在的目录。最核心的代码所在子目录分别是 core、hdfs 和 ❑ 第 1 章 阅读源代码前的准备   11 mapred,它们分别实现了 Hadoop 最重要的三个模块,即基础公共库、HDFS 实现和 MapReduce 实现。 conf:配置文件所在目录。Hadoop 的配置文件比较多,其设计原则可概括为如下两点。 ❑ ○ 尽可能模块化,即每个重要模块拥有自己的配置文件,这样使得维护以及管理变得简单。 ○ 动静分离,即将可动态加载的配置选项剥离出来,组成独立配置文件。比如, Hadoop 1.0.0 版本之前,作业队列权限管理相关的配置选项被放在配置文件 mapred- site.xml 中,而该文件是不可以动态加载的,每次修改后必须重启 MapReduce。但 从 1.0.0 版本开始,这些配置选项被剥离放到独立配置文件 mapred-queue-acls.xml 中,该文件可以通过 Hadoop 命令行动态加载。conf 目录下最重要的配置文件有 core-site.xml、hdfs-site.xml 和 mapred-site.xml,分别设置了基础公共库 core、分 布式文件系统 HDFS 和分布式计算框架 MapReduce 的配置选项。 lib:Hadoop 运行时依赖的三方库,包括编译好的 jar 包以及其他语言生成的动态库。 ❑ Hadoop 启动或者用户提交作业时,会自动加载这些库。 bin:运行以及管理 Hadoop 集群相关的脚本。这里介绍几个常用的脚本。 ❑ ○ hadoop:最基本且功能最完备的管理脚本,其他大部分脚本都会调用该脚本。 ○ start-all.sh/stop-all.sh:启动 / 停止所有节点上的 HDFS 和 MapReduce 相关服务。 ○ start-mapred.sh/stop-mapred.sh:单独启动 / 停止 MapReduce 相关服务。 ○ start-dfs.sh/stop-dfs.sh:单独启动 / 停止 HDFS 相关服务。 图 1-11 Hadoop 安装目录结构 12   第一部分 基 础 篇 本书重点介绍 MapReduce 的实现原理,下面就 Hadoop MapReduce 源代码组织结构进 行介绍。Hadoop MapReduce 源代码组织结构 如图 1-12 所示。 图 1-12 Hadoop MapReduce 源代码组织结构 总体上看,Hadoop MapReduce 分为两部分:一部分是 org.apache.hadoop.mapred.*,这 里面主要包含旧的对外编程接口以及 MapReduce 各个服务(JobTracker 以及 TaskTracker) 的实现 ;另一部分是 org.apache.hadoop.mapreduce.*,主要内容涉及新版本的对外编程接口 以及一些新特性(比如 MapReduce 安全)。 1. MapReduce 编程模型相关 org.apache.hadoop.mapred.lib.* :这一系列 Java 包提供了各种可直接在应用程序中使 ❑ 用的 InputFormat、Mapper、Partitioner、Reducer 和 OuputFormat,以减少用户编写 MapReduce 程序的工作量。 org.apache.hadoop.mapred.jobcontrol :该 Java 包允许用户管理具有相互依赖关系的 ❑ 作业(DAG 作业)。 org.apache.hadoop.mapred.join :该 Java 包实现了 map-side join 算法 ❑ 。该算法要求 数据已经按照 key 排好序,且分好片,这样可以只使用 Map Task 实现 join 算法,避 免 re-partition、sort、shuffling 等开销。34 org.apache.hadoop.mapred.pipes:该 Java 包允许用户用 C/C++ 编写 MapReduce 作业。 ❑ org.apache.hadoop.mapreduce:该 Java 包定义了一套新版本的编程接口,这套接口比 ❑ 旧版接口封装性更好。 org.apache.hadoop.mapreduce.* : 这 一 系 列 Java 包 根 据 新 版 接 口 实 现 了 各 种 ❑ InputFormat、Mapper、Partitioner、Reducer 和 OuputFormat。   不同版本 Hadoop 的源代码结构稍有差距,本书的分析是基于 Hadoop 1.0.0 版本的。   Join 算法是将两个表或者文件按照某个 key 值合并起来,在 Hadoop 中,可以在 Map 或者 Reduce 端进行 合并。若在 Reduce 端进行合并,则需要进行 re-partition、sort、shuffling 等操作,开销很大。 第 1 章 阅读源代码前的准备   13 2. MapReduce 计算框架相关 org.apache.hadoop.mapred:Hadoop MapReduce 最核心的实现代码,包括各个服务的 ❑ 具体实现。 org.apache.hadoop.mapred.filecache :Hadoop DistributedCache 实现。DistributedCache ❑ 是 Hadoop 提供的数据分发工具,可将用户应用程序中需要的文件分发到各个节 点上。 org.apache.hadoop.mapred.tools :管理控制 Hadoop MapReduce,当前功能仅包括允 ❑ 许用户动态更新服务级别的授权策略和 ACL(访问权限控制)属性。 org.apache.hadoop.mapreduce.split :该 Java 包的主要功能是根据作业的 InputFormat ❑ 生成相应的输入 split。 org.apache.hadoop.mapreduce.server.jobtracker :该 Java 包维护了 JobTracker 可看到 ❑ 的 TaskTracker 状态信息和资源使用情况。 org.apache.hadoop.mapreduce.server.tasktracker.*:TaskTracker 的一些辅助类。 ❑ 3. MapReduce 安全机制相关 这里只涉及 org.apache.hadoop.mapreduce.security.*。这一系列 Java 包实现了 MapReduce 安全机制。 1.5 Hadoop 初体验 一般而言,我们想要深入学习一个新的系统时,首先要尝试使用该系统,了解系统对 外提供的功能,然后通过某个功能逐步深入其实现细节。本节将介绍如何在伪分布式工作 模式 下使用 Hadoop,包括启动 Hadoop、访问 HDFS 以及向 MapReduce 提交作业等最基 本的操作。本节只是有代表性地介绍 Hadoop 的一些基本使用方法,使读者对 Hadoop 有一 个初步认识,并引导读者逐步进行更全面的学习。5 1.5.1 启动 Hadoop 步骤 1 修改 Hadoop 配置文件。在 conf 目录下,修改 mapred-site.xml、core-site.xml 和 hdfs-site.xml 三个文件,在 之间添加以下内容。 mapred-site.xml: ❑ mapred.job.tracker localhost:9001   单机环境中,Hadoop 有两种工作模式 :本地模式和伪分布式模式。其中,本地模式完全运行在本地,不 会加载任何 MapReduce 服务,因而不会涉及 MapReduce 最核心的代码实现 ;伪分布式模式即为“单点 集群”,在该模式下,所有的守护进程均会运行在单个节点上,因而本节选用该工作模式。 14   第一部分 基 础 篇 core-site.xml: ❑ fs.default.name hdfs://localhost:9000 hdfs-site.xml: ❑ dfs.replication 1 dfs.permissions false 如果是 Windows 环境,还需要在 hadoop-env.xml 中添加以下配置: export JAVA_HOME=D:/hadoop/Java/jdk1.6.0_27 步骤 2 设置免密码登录。前面提到 Hadoop 启动启动 / 停止脚本时需要通过 SSH 发送 命令启动相关守护进程,为了避免每次启动 / 停止 Hadoop 输入密码进行验证,需设置免密 码登录,设置步骤如下。 1)打开命令行终端(Windows 下为 Cygwin 终端,Linux 下为 Shell 终端,下同),输 入以下命令: ssh-keygen -t rsa 执行上述命令后,将会在“~/.ssh/”目录下生成公钥文件 id_rsa.pub 和私钥文件 id_rsa。 2)将公钥文件 id_rsa.pub 中的内容复制到相同目录下的 authorized_keys 文件中: cd ~/.ssh/ cat id_rsa.pub >> authorized_keys 步骤 3 启动 Hadoop。在 Hadoop 安装目录中,按以下两步操作启动 Hadoop。 1)格式化 HDFS: bin/hadoop namenode -format 2)启动 Hadoop: bin/start-all.sh 通过以下 URL 可查看 MapReduce 是否启动成功: http://localhost:50030/ 通过以下 URL 可查看 HDFS 是否启动成功: http://localhost:50070/ 经过以上两步操作,Hadoop 成功启动,接下来可以通过 Hadoop Shell 或者 Eclipse 插 件访问 HDFS 和提交 MapReduce 作业。下面两小节分别介绍 Hadoop Shell 和 Eclipse 插件 第 1 章 阅读源代码前的准备   15 的使用方法。 1.5.2 Hadoop Shell 介绍 在 1.4 节中曾提到,bin 目录下的 Hadoop 脚本是最基础的集群管理脚本,用户可以通过 该脚本完成各种功能,如 HDFS 文件管理、MapReduce 作业管理等。该脚本的使用方法为: hadoop [--config confdir] COMMAND 其 中,--config 用 于 设 置 Hadoop 配 置 文 件 目 录, 默 认 目 录 为 ${HADOOP_HOME}/ conf。而 COMMAND 是具体的某个命令,常用的有 HDFS 管理命令 fs、作业管理命令 job 和作业提交命令 jar 等。它们的使用方法如下: (1)HDFS 管理命令 fs 和作业管理命令 job 它们的用法一样,均为: bin/hadoop command [genericOptions] [commandOptions] 其中,command 可以是 fs 或者 job,genericOptions 是一些通用选项,commandOptions 是 fs 或者 job 附加的命令选项。看下面两个例子。 在 HDFS 上创建一个目录 /test: ❑ bin/hadoop fs -mkdir /test 显示 Hadoop 上正在运行的所有作业: ❑ bin/hadoop job -list (2)作业提交命令 jar 这个命令的用法是: hadoop jar [mainClass] args.. 其中, 表示 jar 包名 ;mainClass 表示 main class 名称,可以不必输入而由 jar 命 令自动搜索;args 是 main class 输入参数。举例如下: bin/hadoop jar hadoop-examples-1.0.0.jar wordcount /test/input /test/ouput 其中,wordcount 是 hadoop-examples-1.0.0.jar 中一个作业名称。顾名思义,该作业用 于统计输入文件中的每个单词出现的次数,它有两个输入参数 :输入数据目录(/test/input) 和输出数据目录(/test/output)。 至于其他更多命令,读者可自行查阅 Hadoop 官方设计文档。 1.5.3 Hadoop Eclipse 插件介绍 Hadoop 提供了一个 Eclipse 插件以方便用户在 Eclipse 集成开发环境中使用 Hadoop, 如管理 HDFS 上的文件、提交作业、调试 MapReduce 程序等。本小节将介绍如何使用该插 件访问 HDFS、运行 MapReduce 作业和跟踪 MapReduce 作业运行过程。 16   第一部分 基 础 篇 1. 编译生成 Eclipse 插件 用户需要自己生成 Eclipse 插件。Hadoop-1.0.0 生成的 jar 包不能直接使用,需要进 行部分修改,具体参考附录 A 中的问题 2,其代码位于 Hadoop 安装目录的 src/contrib/ eclipse-plugin 下,在该目录下,输入以下命令生成 Eclipse 插件: ant -Declipse.home=/home/dong/eclipse -Dversion=1.0.0 其中,eclipse.home 用来指定 Eclipse 安装目录,version 是 Hadoop 版本号。${HADOOP_ HOME}/build/contrib 目录下生成的 hadoop-eclipse-plugin-1.0.0.jar 文件即为 Eclipse 插件。 2. 配置 Eclipse 插件 将 生 成 的 Eclipse 插 件 hadoop-eclipse-plugin-1.0.0.jar 复 制 到 Eclipse 安 装 目 录 的 plugins 文件夹下,然后重启 Eclipse。 进入 Eclipse 后,按照以下步骤进行设置 :在菜单栏中依次单击“Window”→“Show View”→“Other...”,在对话框中依次单击“MapReduce Tools”→“Map/Reduce Locations”, 会弹出图 1-13a 所示的对话框,按图中提示填写内容。 经上述步骤后,回到主界面,如图 1-13b 所示,可在“Project Explore”视图中查看分 布式文件系统的内容,说明 Eclipse 插件安装成功。 图 1-13 配置 Hadoop Eclipse 插件 a)配置 MapReduce 和 HDFS 的主机名和端口号 b)显示 HDFS 中的文件列表 3. 运行 MapReduce 作业 前面提到,在伪分布式环境下,单个节点上会同时运行多种 Hadoop 服务。为了跟踪这 些服务的运行轨迹,我们采用了以下方法 :向 Hadoop 提交一个 MapReduce 作业,通过跟 踪该作业的运行轨迹来分析 Hadoop 的内部实现原理。 该方法可通过以下三个步骤完成: 步 骤 1  新 建 一 个 MapReduce 工 程。 在 菜 单 栏 中, 依 次 单 击“New” →“Other...” a) b) 第 1 章 阅读源代码前的准备   17 →“MapReduce Project”,会弹出图 1-14 所示的对话框。在该对话框中填写项目名称,并配 置 Hadoop 安装目录,此处可直接选择前面已经建好的 Java 工程“hadoop-1.0.0”。 图 1-14 在 Eclipse 中创建 MapReduce 工程 步骤 2 准备 MapReduce 作业。可直接将 Hadoop 源代码中 src\examples\org\apache\ hadoop\examples 目录下的 WordCount.java 复制到新建的 MapReduceJobs 工程中。 步骤 3 运行作业。 1)准备数据。如图 1-15 所示,在 HDFS 上创建目录 /test/input,并上传几个文本文件 到该目录中。 图 1-15 使用 Eclipse 插件上传文件到 HDFS 2)配置输入 / 输出路径。如图 1-16 所示,在 WordCount.java 中右击,在弹出的快捷 菜单中依次单击“Run As”→“Run Configurations...”,会出现图 1-17 所示的对话框。双 击“Java Applications”选项,在新建的对话框中输入作业的输入 / 输出路径(中间用空格 分隔),并单击“Apply”按钮保存。 3) 运 行 作 业。 在 WordCount.java 中 右 击, 在 弹 出 的 快 捷 菜 单 中 依 次 单 击“Run As”→“Run on Hadoop”,会出现如图 1-18 所示的对话框。按图中的提示选择后,单击 “Finish”按钮,作业开始运行。 此外,有兴趣的读者可以在 MapReduce 作业中设置断点,对作业进行断点调试。 18   第一部分 基 础 篇 图 1-16 在 Eclipse 中配置作业的输入 / 输出路径(1) 图 1-17 在 Eclipse 中配置作业的输入 / 输出路径(2) 第 1 章 阅读源代码前的准备   19 图 1-18 在 Eclipse 中运行作业 1.6 编译及调试 Hadoop 源代码 读者在阅读源代码的过程中,可能需要修改部分源代码或者使用调试工具以便跟踪某 些变量值的变化过程,此时要用到 Hadoop 源代码编译和调试方法。本节将介绍 Hadoop 在 伪分布式模式下的编译和调试方法,其中,调试方法主要介绍使用 Eclipse 远程调试和打印 调试日志两种。 1.6.1 编译 Hadoop 源代码 在 Windows 或 Linux 环境下,打开命令行终端,转到 Hadoop 安装目录下并输入以下 命令: ant -Dversion=1.0.0 {target} 其中,{target} 值如表 1-2 所示,不同的 target 可对应生成不同的 jar 包,如: ant -Dversion=1.0.0 examples 可生成 hadoop-examples-1.0.0.jar,产生的 jar 包位于 Hadoop 安装目录的 build 文件 夹下。 表 1-2 编译 target 与对应生成的 jar 包 target jar 包 jar hadoop-core-1.0.0.jar examples hadoop-examples-1.0.0.jar tools-jar hadoop-tools-1.0.0.jar jar-test hadoop-test-1.0.0.jar ant-tasks hadoop-ant-1.0.0.jar 20   第一部分 基 础 篇 1.6.2 调试 Hadoop 源代码 本小节介绍两种调试方式 :利用 Eclipse 远程调试和打印调试日志。这两种方式均可以 调试伪分布式工作模式和完全分布式工作模式下的 Hadoop。本小节主要介绍伪分布式工作 模式下的 Hadoop 调试方法。 1. 利用 Eclipse 进行远程调试 下面以调试 JobTracker 为例,介绍利用 Eclipse 进行远程调试的基本方法。调试过程可 分三步进行: 步骤 1 调试模式下启动 Hadoop。 在 Hadoop 安装目录下运行内容如下的 Shell 脚本: export HADOOP_JOBTRACKER_OPTS="-Xdebug -Xrunjdwp:transport=dt_socket,address=878 8,server=y,suspend=y" bin/start-all.sh 如果脚本运行成功,则可以看到 Shell 命令行终端显示如下信息: Listening for transport dt_socket at address: 8788 此时表明 JobTracker 处于监听状态。JobTracker 将一直处于监听状态,直到收到 debug 确认信息。 步骤 2 设置断点。 在前面新建的 Java 工程“hadoop-1.0.0”中,找到 JobTracker 相关代码,并在感兴趣 的地方设置一些断点。 步骤 3 在 Eclipse 中调试 Hadoop 程序。 在 Eclipse 的菜单栏中,依次单击“Run”→“Debug Configurations”→“Remote Java Applications”,打开图 1-19 所示的对话框,按图中的提示填写名称、JobTracker 所在的 host 以及监听端口,并选择 Hadoop 源代码工程,进入图 1-20 所示的调试模式。 图 1-19 在 Eclipse 中配置远程调试器 第 1 章 阅读源代码前的准备   21 图 1-20 Eclipse 中显示的 Hadoop 调试窗口 调试过程中,JobTracker 输出的信息被存储到日志文件夹下的 hadoop-XXX-jobtracker- localhost.log 文件(XXX 为当前用户名)中,可通过以下命令查看调试过程中打印的日志: tail -f logs/hadoop-XXX-jobtracker-localhost.log 2. 打印 Hadoop 调试日志 Hadoop 使用了 Apache log4j 作为基础日志库。该日志库将日志分为 5 个级别,分别为 DEBUG、INFO、WARN、ERROR 和 FATAL。这 5 个级别对应的日志信息重要程度不同,它 们的重要程度由低到高依次为 DEBUG < INFO < WARN < ERROR < FATAL。日志输出规则为: 只输出级别不低于设定级别的日志信息。比如,级别设定为 INFO,则 INFO、WARN、ERROR 和 FATAL 级别的日志信息都会被输出,但级别比 INFO 低的 DEBUG 则不会被输出。6 在 Hadoop 源代码中,大部分 Java 文件中存在调试日志(DEBUG 级别日志),但默 认情况下,日志级别是 INFO。为了查看更详细的运行状态,可采用以下几种方法打开 DEBUG 日志。 (1)使用 Hadoop Shell 命令 可使用 Hadoop 脚本中的 daemonlog 命令查看和修改某个类的日志级别,比如,可通过 以下命令查看 TaskTracker 类的日志级别: bin/hadoop daemonlog -getlevel ${tasktracker -host}:50075 \ org.apache.hadoop.mapred.TaskTracker  Apache log4j 网址:http://logging.apache.org/log4j/index.html 22   第一部分 基 础 篇 可通过以下命令将 JobTracker 类的日志级别修改为 DEBUG: bin/hadoop daemonlog -setlevel ${tasktracker-host}:50075 \ org.apache.hadoop.mapred.TaskTracker DEBUG 其 中,tasktracker-host 为 TaskTracker 的 host,50075 是 TaskTracker 的 HTTP 端 口 号 (其他服务的 HTTP 端口号可参考附录 B)。 (2)通过 Web 界面 用户可以通过 Web 界面查看和修改某个类的日志级别,比如,可通过以下 URL 修改 TaskTracker 类的日志级别: http://${tasktracker-host}:50075/logLevel (3)修改 log4j.properties 文件 以上两种方法只能暂时修改日志级别。当 Hadoop 重启后会被重置,如果要永久性改变 日志级别,可在目标节点配置目录下的 log4j.properties 文件中添加以下配置选项: log4j.logger.org.apache.hadoop.mapred.TaskTracker=DEBUG 此外,有时为了专门调试某个 Java 文件,需要把该文件的相关日志输出到一个单独文 件中,可在 log4j.properties 中添加以下内容: # 定义输出方式为自定义的 TTOUT log4j.logger.org.apache.hadoop.mapred.TaskTracker=DEBUG,TTOUT # 设置 TTOUT 的输出方式为输出到文件 log4j.appender.TTOUT =org.apache.log4j.FileAppender # 设置文件路径 log4j.appender.TTOUT.File=${hadoop.log.dir}/TaskTracker.log # 设置文件的布局 log4j.appender.TTOUT.layout=org.apache.log4j.PatternLayout # 设置文件的格式 log4j.appender.TTOUT.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n 这些配置选项会把 TaskTracker.java 中的 DEBUG 日志写到日志目录下的 TaskTracker. log 文件中。 在阅读源代码的过程中,为了跟踪某个变量值的变化,读者可能需要自己添加一些 DEBUG 日志。在 Hadoop 源代码中,大部分类会定义一个日志打印对象。通过该对象,可 打印各个级别的日志。比如,在 JobTracker 中由以下代码定义对象 LOG: public static final Log LOG = LogFactory.getLog(JobTracker.class); 用户可使用 LOG 对象打印调试日志,比如,可在 JobTracker 的 main 函数首行添加以 下代码: LOG.debug("Start to lauch JobTracker..."); 然后重新编译 Hadoop 源代码,并将 org.apache.hadoop.mapred. JobTracker 的调试级别 修改为 DEBUG,重新启动 Hadoop 后便可以看到该调试信息。 第 1 章 阅读源代码前的准备   23 1.7 小结 搭建一个高效的源代码学习环境是深入学习 Hadoop 的良好开端,本章主要内容正是帮 助读者搭建一个这样的学习环境。在作者看来,一个高效的 Hadoop 学习环境至少应该包括 源代码阅读环境、Hadoop 使用环境和源代码编译调试环境,而本章正是围绕这三个环境的 搭建方法编写的。 本章首先分别介绍了在 Linux 和 Windows 环境下搭建 Hadoop 源代码阅读环境的方法 ; 在此基础上,进一步介绍了 Hadoop 的基本使用方法,主要涉及 Hadoop Shell 和 Eclipse 插 件两种工具的使用 ;最后介绍了 Hadoop 源代码编译和调试方法,其中,调试方法主要介绍 了使用 Eclipse 远程调试和打印调试日志两种。 第 2 章 MapReduce 设计理念与基本架构 第 1 章介绍了 Hadoop 学习环境的搭建方法,这是学习 Hadoop 需要进行的最基本 的准备工作。而在这一章中,我们将从设计理念和基本架构方面对 Hadoop MapReduce 进行介绍,同样,这属于准备工作的一部分。通过本章的介绍将会为后面几章深入剖析 MapReduce 内部实现奠定基础。 MapReduce 是一个分布式计算框架,主要由两部分组成 :编程模型和运行时环境。其 中,编程模型为用户提供了非常易用的编程接口,用户只需要像编写串行程序一样实现几 个简单的函数即可实现一个分布式程序,而其他比较复杂的工作,如节点间的通信、节点 失效、数据切分等,全部由 MapReduce 运行时环境完成,用户无须关心这些细节。在本章 中,我们将从设计目标、编程模型和基本架构等方面对 MapReduce 框架进行介绍。 2.1 Hadoop 发展史 2.1.1 Hadoop 产生背景 78910 Hadoop 最早起源于 Nutch 。Nutch 是一个开源的网络搜索引擎,由 Doug Cutting 于 2002 年创建。Nutch 的设计目标是构建一个大型的全网搜索引擎,包括网页抓取、索引、 查询等功能,但随着抓取网页数量的增加,遇到了严重的可扩展性问题,即不能解决数十 亿网页的存储和索引问题。之后,谷歌发表的两篇论文为该问题提供了可行的解决方案。 一篇是 2003 年发表的关于谷歌分布式文件系统(GFS)的论文 。该论文描述了谷歌搜索 引擎网页相关数据的存储架构,该架构可解决 Nutch 遇到的网页抓取和索引过程中产生的 超大文件存储需求的问题。但由于谷歌仅开源了思想而未开源代码,Nutch 项目组便根据论 文完成了一个开源实现,即 Nutch 的分布式文件系统(NDFS)。另一篇是 2004 年发表的关 于谷歌分布式计算框架 MapReduce 的论文 。该论文描述了谷歌内部最重要的分布式计算 框架 MapReduce 的设计艺术,该框架可用于处理海量网页的索引问题。同样,由于谷歌未 开源代码,Nutch 的开发人员完成了一个开源实现。由于 NDFS 和 MapReduce 不仅适用于 搜索领域,2006 年年初,开发人员便将其移出 Nutch,成为 Lucene 的一个子项目,称为  http://nutch.apache.org/   论文 :Sanjay Ghemawat, Howard Gobioff, Shun-Tak Leung, “The Google file system”, Proceedings of the nineteenth ACM Symposium on Operating Systems Principles.   论文 :J. Dean and S. Ghemawat, “Mapreduce: simplified data processing on large clusters”, Proceedings of the 6th Conference on Symposium on Opearting Systems Design & Implementation.   http://lucene.apache.org/ 第 2 章 MapReduce 设计理念与基本架构   25 Hadoop。大约同一时间,Doug Cutting 加入雅虎公司,且公司同意组织一个专门的团队继 续发展 Hadoop。同年 2 月,Apache Hadoop 项目正式启动以支持 MapReduce 和 HDFS 的独 立发展。2008 年 1 月,Hadoop 成为 Apache 顶级项目,迎来了它的快速发展期。 2.1.2 Apache Hadoop 新版本的特性 当前 Apache Hadoop 版本非常多,本小节将帮助读者梳理各个版本的特性以及它们 之间的联系。在讲解 Hadoop 各版本之前,先要了解 Apache 软件发布方式。对于任何一 个 Apache 开源项目,所有的基础特性均被添加到一个称为“trunk”的主代码线(main codeline)。当需要开发某个重要的特性时,会专门从主代码线中延伸出一个分支(branch), 这被称为一个候选发布版(candidate release)。该分支将专注于开发该特性而不再添加其 他新的特性,待基本 bug 修复之后,经过相关人士投票便会对外公开成为发布版(release version),并将该特性合并到主代码线中。需要注意的是,多个分支可能会同时进行研发, 这样,版本高的分支可能先于版本低的分支发布。 由于 Apache 以特性为准延伸新的分支,故在介绍 Apache Hadoop 版本之前,先介绍几 个独立产生的 Apache Hadoop 新版本的重大特性: Append ❑ :HDFS Append 主要完成追加文件内容的功能,也就是允许用户以 append 方式修改 HDFS 上的文件。HDFS 最初的一个设计目标是支持 MapReduce 编程模型, 而该模型只需要写一次文件,之后仅进行读操作而不会对其修改,即“write-once- read-many”,这就不需要支持文件追加功能。但随着 HDFS 变得流行,一些具有写需 求的应用想以 HDFS 作为存储系统,比如,有些应用程序需要往 HDFS 上某个文件 中追加日志信息,HBase 需使用 HDFS 具有的 append 功能以防止数据丢失 等。 HDFS RAID ❑ :Hadoop RAID 模块在 HDFS 之上构建了一个新的分布式文件系统 : Distributed Raid FileSystem(DRFS)。该系统采用了 Erasure Codes 增强对数据的保 护。有了这样的保护,可以采用更少的副本数来保持同样的可用性保障,进而为用 户节省大量存储空间。1112131415 Symlink ❑ :让 HDFS 支持符号链接。符号链接是一种特殊的文件,它以绝对或者相 对路径的形式指向另外一个文件或者目录(目标文件)。当程序向符号链接中写数据 时,相当于直接向目标文件中写数据。 Security ❑ :Hadoop 的 HDFS 和 MapReduce 均缺乏相应的安全机制,比如在 HDFS 中,用户只要知道某个 block 的 blockID,便可以绕过 NameNode 直接从 DataNode 上读取该 block,用户可以向任意 DataNode 上写 block ;在 MapReduce 中,用户可   0.20-append :https://issues.apache.org/jira/browse/HDFS-200,0.21.0-append :https://issues.apache.org/ jira/browse/HDFS-265   http://hbase.apache.org/book/hadoop.html   http://wiki.apache.org/hadoop/HDFS-RAID 与 https://issues.apache.org/jira/browse/HDFS-503   https://issues.apache.org/jira/browse/HDFS-245   https://issues.apache.org/jira/browse/HADOOP-4487 26   第一部分 基 础 篇 以修改或者删掉任意其他用户的作业等。为了增强 Hadoop 的安全机制,从 2009 年 起,Apache 专门组成一个团队,为 Hadoop 增加基于 Kerberos 和 Deletion Token 的 安全认证和授权机制。 MRv1 :第一代 MapReduce 计算框架。它由两部分组成 :编程模型(programming ❑ model)和运行时环境(runtime environment)。它的基本编程模型是将问题抽象成 Map 和 Reduce 两个阶段。其中,Map 阶段将输入数据解析成 key/value,迭代调用 map() 函数处理后,再以 key/value 的形式输出到本地目录 ;Reduce 阶段则将 key 相 同的 value 进行规约处理,并将最终结果写到 HDFS 上。它的运行时环境由两类服务 组成:JobTracker 和 TaskTracker,其中,JobTracker 负责资源管理和所有作业的控制, 而 TaskTracker 负责接收来自 JobTracker 的命令并执行它。 YARN/MRv2 :针对 MRv1 中的 MapReduce 在扩展性和多框架支持方面的不足,提 ❑ 出了全新的资源管理框架 YARN(Yet Another Resource Negotiator)。它将 JobTracker 中 的 资 源 管 理 和 作 业 控 制 功 能 分 开, 分 别 由 两 个 不 同 进 程 ResourceManager 和 ApplicationMaster 实现。其中,ResourceManager 负责所有应用程序的资源分配,而 ApplicationMaster 仅负责管理一个应用程序。 NameNode Federation ❑ :针对 Hadoop 1.0 中 NameNode 内存约束限制其扩展性问题 提出的改进方案。它将 NameNode 横向扩展成多个,其中,每个 NameNode 分管一 部分目录。这不仅增强了 HDFS 扩展性,也使 HDFS NameNode 具备了隔离性。1617 NameNode HA ❑ :HDFS NameNode 存在两个问题,即 NameNode 内存约束限制扩 展性和单点故障。其中,第一个问题通过 NameNode Federation 方案解决,而第二个 问题则通过 NameNode 热备方案(即 NameNode HA)实现。 2.1.3 Hadoop 版本变迁 到 2012 年 5 月为止,Apache Hadoop 已经出现四个大的分支,如图 2-1 所示。 Apache Hadoop 的四大分支构成了四个系列的 Hadoop 版本。 1. 0.20.X 系列 0.20.2 版本发布后,几个重要的特性没有基于 trunk 而是在 0.20.2 基础上继续研发。值 得一提的主要有两个特性 :Append 与 Security。其中,含 Security 特性的分支以 0.20.203 版本发布,而后续的 0.20.205 版本综合了这两个特性。需要注意的是,之后的 1.0.0 版本仅 是 0.20.205 版本的重命名。0.20.X 系列版本是最令用户感到疑惑的,因为它们具有的一些 特性,trunk 上没有;反之,trunk 上有的一些特性,0.20.X 系列版本却没有。 2. 0.21.0/0.22.X 系列 这一系列版本将整个 Hadoop 项目分割成三个独立的模块,分别是 Common、HDFS 和  https://issues.apache.org/jira/browse/HDFS-1052  https://issues.apache.org/jira/browse/HDFS-1623 第 2 章 MapReduce 设计理念与基本架构   27 MapReduce。HDFS 和 MapReduce 都对 Common 模块有依赖性,但是 MapReduce 对 HDFS 并没有依赖性。这样,MapReduce 可以更容易地运行其他分布式文件系统,同时,模块间 可以独立开发。具体各个模块的改进如下。 Common 模 块 : 最 大 的 新 特 性 是 在 测 试 方 面 添 加 了 Large-Scale Automated Test ❑ Framework 和 Fault Injection Framework 。 1819 HDFS 模块 :主要增加的新特性包括支持追加操作与建立符号连接、Secondary ❑ NameNode 改进(Secondary NameNode 被剔除,取而代之的是 Checkpoint Node,同 时添加一个 Backup Node 的角色,作为 NameNode 的冷备)、允许用户自定义 block 放置算法等。 MapReduce 模块 :在作业 API 方面,开始启动新 MapReduce API,但老的 API 仍然 ❑ 兼容。 0.22.0 在 0.21.0 的基础上修复了一些 bug 并进行了部分优化。 图 2-1 Hadoop 版本变迁图 2021 3. 0.23.X 系列 0.23.X 是为了克服 Hadoop 在扩展性和框架通用性方面的不足而提出来的。它实际上 是一个全新的平台,包括分布式文件系统 HDFS Federation 和资源管理框架 YARN 两部分, 可对接入的各种计算框架(如 MapReduce、Spark 等)进行统一管理。它的发行版自带 MapReduce 库,而该库集成了迄今为止所有的 MapReduce 新特性。  参考 https://issues.apache.org/jira/browse/HADOOP-6332  参考 http://hadoop.apache.org/hdfs/docs/r0.21.0/faultinject_framework.html  图片修改自:http://www.cloudera.com/blog/2012/01/an-update-on-apache-hadoop-1-0/  Spark 是一种内存计算框架,支持迭代式计算,主页是 http://www.spark-project.org/. 28   第一部分 基 础 篇 4. 2.X 系列 同 0.23.X 系列一样,2.X 系列也属于下一代 Hadoop。与 0.23.X 系列相比,2.X 系列增 加了 NameNode HA 和 Wire-compatibility 等新特性。 表 2-1 总结了 Hadoop 各个发布版的特性以及稳定性。 表 2-1 Hadoop 各个发布版的特性以及稳定性 时间 发布 版本 特性 是否稳定 Append RAID Symlink Security MRv1 YARN NameNode Federation NameNode HA 2010 年 0.20.2 × × × × √ × × × 是 0.21.0 √ √ √ × √ × × √ 否 2011 年 0.20.203 × × × √ √ × × × 是 (Yahoo ! 在 4 500 个节点上 部署) 0.20.205 (1.0.0) √ × × √ √ × × × 是 0.22.0 √ √ √ √ √ × × √ 否 0.23.0 √ √ √ √ × √ √ × 否 2012 年 1.X √ × × √ √ × × × 是 2.X √ √ √ √ × √ √ √ 否 本书之所以以分析 Apache Hadoop 1.0.0 为主,主要是因为这是一个稳定的版本,再 有其为 1.0.0,具有里程碑意义。Apache 发布这个版本,也是希望该版本成为业界的规范。 需要注意的是,尽管本书以分析 Apache Hadoop 1.0.0 版本为主,但本书内容适用于所有 Apache Hadoop 1.X 版本 。2223 2.2 Hadoop MapReduce 设计目标 通过上一节关于 Hadoop MapReduce 历史的介绍我们知道,Hadoop MapReduce 诞生于 搜索领域,主要解决搜索引擎面临的海量数据处理扩展性差的问题。它的实现很大程度上 借鉴了谷歌 MapReduce 的设计思想,包括简化编程接口、提高系统容错性等。总结 Hadoop MapReduce 设计目标,主要有以下几个。 易于编程 :传统的分布式程序设计(如 MPI)非常复杂,用户需要关注的细节非常 ❑ 多,比如数据分片、数据传输、节点间通信等,因而设计分布式程序的门槛非常高。 Hadoop 的一个重要设计目标便是简化分布式程序设计,将所有并行程序均需要关注 的设计细节抽象成公共模块并交由系统实现,而用户只需专注于自己的应用程序逻  0.22.0 版本中只有 HDFS Security,没有 MapReduce Security。  不同版本之间细节可能稍有不同,此时以 Hadoop 1.0.0 版本为主。 第 2 章 MapReduce 设计理念与基本架构   29 辑实现,这样简化了分布式程序设计且提高了开发效率。 良好的扩展性 :随着公司业务的发展,积累的数据量(如搜索公司的网页量)会越 ❑ 来越大,当数据量增加到一定程度后,现有的集群可能已经无法满足其计算能力和 存储能力,这时候管理员可能期望通过添加机器以达到线性扩展集群能力的目的。 高容错性 :在分布式环境下,随着集群规模的增加,集群中的故障率(这里的“故 ❑ 障”包括磁盘损坏、机器宕机、节点间通信失败等硬件故障和坏数据或者用户程序 bug 产生的软件故障)会显著增加,进而导致任务失败和数据丢失的可能性增加。为 此,Hadoop 通过计算迁移或者数据迁移等策略提高集群的可用性与容错性。 2.3 MapReduce 编程模型概述 2.3.1 MapReduce 编程模型简介 从 MapReduce 自 身 的 命 名 特 点 可 以 看 出,MapReduce 由 两 个 阶 段 组 成 :Map 和 Reduce。用户只需编写 map() 和 reduce() 两个函数,即可完成简单的分布式程序的设计。 map() 函数以 key/value 对作为输入,产生另外一系列 key/value 对作为中间输出写入本地 磁盘。MapReduce 框架会自动将这些中间数据按照 key 值进行聚集,且 key 值相同(用户可 设定聚集策略,默认情况下是对 key 值进行哈希取模)的数据被统一交给 reduce() 函数处理。 reduce() 函数以 key 及对应的 value 列表作为输入,经合并 key 相同的 value 值后,产 生另外一系列 key/value 对作为最终输出写入 HDFS。 下面以 MapReduce 中的“hello world”程序—WordCount 为例介绍程序设计方法。 “hello world”程序是我们学习任何一门编程语言编写的第一个程序。它简单且易于 理解,能够帮助读者快速入门。同样,分布式处理框架也有自己的“hello world”程序 : WordCount。它完成的功能是统计输入文件中的每个单词出现的次数。在 MapReduce 中, 可以这样编写(伪代码)。 其中 Map 部分如下: // key: 字符串偏移量 // value: 一行字符串内容 map(String key, String value) : // 将字符串分割成单词 words = SplitIntoTokens(value); for each word w in words: EmitIntermediate(w, "1"); Reduce 部分如下: // key: 一个单词 // values: 该单词出现的次数列表 reduce(String key, Iterator values): int result = 0; for each v in values: result += StringToInt(v); Emit(key, IntToString(result)); 30   第一部分 基 础 篇 用户编写完 MapReduce 程序后,按照一定的规则指定程序的输入和输出目录,并提交 到 Hadoop 集群中。作业在 Hadoop 中的执行过程如图 2-2 所示。Hadoop 将输入数据切分 成若干个输入分片(input split,后面简称 split),并将每个 split 交给一个 Map Task 处理 ; Map Task 不断地从对应的 split 中解析出一个个 key/value,并调用 map() 函数处理,处理完 之后根据 Reduce Task 个数将结果分成若干个分片(partition)写到本地磁盘 ;同时,每个 Reduce Task 从每个 Map Task 上读取属于自己的那个 partition,然后使用基于排序的方法将 key 相同的数据聚集在一起,调用 reduce() 函数处理,并将结果输出到文件中。 图 2-2 WordCount 程序运行过程 细心的读者可能注意到,上面的程序还缺少三个基本的组件,功能分别是 :①指定输入 文件格式。将输入数据切分成若干个 split,且将每个 split 中的数据解析成一个个 map() 函数 要求的 key/value 对。②确定 map() 函数产生的每个 key/value 对发给哪个 Reduce Task 函数处 理。③指定输出文件格式,即每个 key/value 对以何种形式保存到输出文件中。 在 Hadoop MapReduce 中,这三个组件分别是 InputFormat、Partitioner 和 OutputFormat, 它们均需要用户根据自己的应用需求配置。而对于上面的 WordCount 例子,默认情况下 第 2 章 MapReduce 设计理念与基本架构   31 Hadoop 采用的默认实现正好可以满足要求,因而不必再提供。 综 上 所 述,Hadoop MapReduce 对 外 提 供 了 5 个 可 编 程 组 件, 分 别 是 InputFormat、 Mapper、Partitioner、Reducer 和 OutputFormat 。本书将在第 3 章中详细介绍它们的设计 思路以及扩展实现。24 2.3.2 MapReduce 编程实例 MapReduce 能够解决的问题有一个共同特点 :任务可以被分解为多个子问题,且这些 子问题相对独立,彼此之间不会有牵制,待并行处理完这些子问题后,任务便被解决。在 实际应用中,这类问题非常庞大,谷歌在论文中提到了 MapReduce 的一些典型应用,包括 分布式 grep、URL 访问频率统计、Web 连接图反转、倒排索引构建、分布式排序等,这些 均是比较简单的应用。下面介绍一些比较复杂的应用。 (1)Top K 问题 在搜索引擎领域中,常常需要统计最近最热门的 K 个查询词,这就是典型的“Top K” 问题,也就是从海量查询中统计出现频率最高的前 K 个。该问题可分解成两个 MapReduce 作业,分别完成统计词频和找出词频最高的前 K 个查询词的功能。这两个作业存在依赖关 系,第二个作业需要依赖前一个作业的输出结果。第一个作业是典型的 WordCount 问题。 对于第二个作业,首先 map() 函数中输出前 K 个频率最高的词,然后由 reduce() 函数汇总 每个 Map 任务得到的前 K 个查询词,并输出频率最高的前 K 个查询词。 (2)K-means 聚类 K-means 是一种基于距离的聚类算法。它采用距离作为相似性的评价指标,认为两个 对象的距离越近,其相似度就越大。该算法解决的问题可抽象成 :给定正整数 K 和 N 个对 象,如何将这些数据点划分为 K 个聚类? 该问题采用 MapReduce 计算的思路如下 :首先随机选择 K 个对象作为初始中心点,然 后不断迭代计算,直到满足终止条件(达到迭代次数上限或者数据点到中心点距离的平方 和最小)。在第 I 轮迭代中,map() 函数计算每个对象到中心点的距离,选择距每个对象 (object)最近的中心点(center_point),并输出 对。reduce() 函数计 算每个聚类中对象的距离均值,并将这 K 个均值作为下一轮初始中心点。 (3)贝叶斯分类 贝叶斯分类是一种利用概率统计知识进行分类的统计学分类方法。该方法包括两个步 骤 :训练样本和分类。其实现由多个 MapReduce 作业完成,具体如图 2-3 所示。其中,训 练样本可由三个 MapReduce 作业实现:第一个作业(ExtractJob)抽取文档特征,该作业只 需要 Map 即可完成 ;第二个作业(ClassPriorJob)计算类别的先验概率,即统计每个类别 中文档的数目,并计算类别概率 ;第三个作业(ConditionalProbilityJob)计算单词的条件 概率,即统计 在所有文档中出现的次数并计算单词的条件概率。后两个作业 的具体实现类似于 WordCount。分类过程由一个作业(PredictJob)完成。该作业的 map()  还有一个组件是 Canbiner,它通常用于优化 MapReduce 程序性能,但不属于必备组件。 32   第一部分 基 础 篇 函数计算每个待分类文档属于每个类别的概率,reduce() 函数找出每个文档概率最高的类别, 并输出 (编号为 docid 的文档属于类别 label)。 图 2-3 朴素贝叶斯分类算法在 MapReduce 上实现 前 面 介 绍 的 是 MapReduce 可 以 解 决 的 一 些 问 题。 为 了 便 于 读 者 更 深 刻 地 理 解 MapReduce,下面介绍 MapReduce 不能解决或者难以解决的一些问题。 1)Fibonacci 数值计算。Fibonacci 数值计算时,下一个结果需要依赖于前面的计算 结果,也就是说,无法将该问题划分成若干个互不相干的子问题,因而不能用 MapReduce 解决。 2)层次聚类法。层次聚类法是应用最广泛的聚类算法之一。层次聚类法采用迭代控制 策略,使聚类逐步优化。它按照一定的相似性(一般是距离)判断标准,合并最相似的部 分或者分割最不相似的部分。按采用“自顶向下”和“自底向上”两种方式,可将其分为 分解型层次聚类法和聚结型层次聚类法两种。以分解型层次聚类算法为例,其主要思想是, 开始时,将每个对象归为一类,然后不断迭代,直到所有对象合并成一个大类(或者达到 某个终止条件);在每轮迭代时,需计算两两对象间的距离,并合并距离最近的两个对象为 一类。该算法需要计算两两对象间的距离,也就是说每个对象和其他对象均有关联,因而 该问题不能被分解成若干个子问题,进而不能用 MapReduce 解决。 2.4 Hadoop 基本架构 Hadoop 由两部分组成,分别是分布式文件系统和分布式计算框架 MapReduce。其中, 分布式文件系统主要用于大规模数据的分布式存储,而 MapReduce 则构建在分布式文件系 统之上,对存储在分布式文件系统中的数据进行分布式计算。本书主要涉及 MapReduce, 但考虑到它的一些功能跟底层存储机制相关,因而会首先介绍分布式文件系统。 第 2 章 MapReduce 设计理念与基本架构   33 在 Hadoop 中,MapReduce 底层的分布式文件系统是独立模块,用户可按照约定的一套 接口实现自己的分布式文件系统,然后经过简单的配置后,存储在该文件系统上的数据便 可以被 MapReduce 处理。Hadoop 默认使用的分布式文件系统是 HDFS(Hadoop Distributed File System,Hadoop 分布式文件系统),它与 MapReduce 框架紧密结合。本节首先介绍分 布式存储系统 HDFS 的基础架构,然后介绍 MapReduce 计算框架。 2.4.1 HDFS 架构 HDFS 是一个具有高度容错性的分布式文件系统,适合部署在廉价的机器上。HDFS 能 提供高吞吐量的数据访问,非常适合大规模数据集上的应用。 HDFS 的架构如图 2-4 所示,总体上采用了 master/slave 架构,主要由以下几个组件组成 : Client、NameNode、Secondary、NameNode 和 DataNode。下面分别对这几个组件进行介绍。25 图 2-4 HDFS 架构图 (1)Client Client(代表用户)通过与 NameNode 和 DataNode 交互访问 HDFS 中的文件。Client 提供了一个类似 POSIX 的文件系统接口供用户调用。26 (2)NameNode 整个 Hadoop 集群中只有一个 NameNode。它是整个系统的“总管”,负责管理 HDFS 的目录树和相关的文件元数据信息。这些信息是以“fsimage”(HDFS 元数据镜像文件) 和 “editlog”(HDFS 文件改动日志)两个文件 形式存放在本地磁盘,当 HDFS 重启时重  在 Hadoop 0.21.0 版本中,SecondaryNameNode 被 Checkpoint Node 代替。  在 Hadoop 0.21.0 版本中,这两个文件被合并成一个。 34   第一部分 基 础 篇 新构造出来的。此外,NameNode 还负责监控各个 DataNode 的健康状态,一旦发现某个 DataNode 宕掉,则将该 DataNode 移出 HDFS 并重新备份其上面的数据。 (3)Secondary NameNode Secondary NameNode 最重要的任务并不是为 NameNode 元数据进行热备份,而是定期 合并 fsimage 和 edits 日志,并传输给 NameNode。这里需要注意的是,为了减小 NameNode 压力,NameNode 自己并不会合并 fsimage 和 edits,并将文件存储到磁盘上,而是交由 Secondary NameNode 完成。 (4)DataNode 一般而言,每个 Slave 节点上安装一个 DataNode,它负责实际的数据存储,并将数据 信息定期汇报给 NameNode。DataNode 以固定大小的 block 为基本单位组织文件内容,默 认情况下 block 大小为 64MB。当用户上传一个大的文件到 HDFS 上时,该文件会被切分成 若干个 block,分别存储到不同的 DataNode ;同时,为了保证数据可靠,会将同一个 block 以流水线方式写到若干个(默认是 3,该参数可配置)不同的 DataNode 上。这种文件切割 后存储的过程是对用户透明的。 2.4.2 Hadoop MapReduce 架构 同 HDFS 一样,Hadoop MapReduce 也采用了 Master/Slave(M/S)架构,具体如图 2-5 所示。它主要由以下几个组件组成 :Client、JobTracker、 TaskTracker 和 Task。下面分别对 这几个组件进行介绍。 图 2-5 Hadoop MapReduce 架构图 第 2 章 MapReduce 设计理念与基本架构   35 (1)Client 用户编写的 MapReduce 程序通过 Client 提交到 JobTracker 端 ;同时,用户可通过 Client 提 供的一些接口查看作业运行状态。在 Hadoop 内部用“作业”(Job)表示 MapReduce 程序。一个 MapReduce 程序可对应若干个作业,而每个作业会被分解成若干个 Map/Reduce 任务(Task)。 (2)JobTracker JobTracker 主要负责资源监控和作业调度。JobTracker 监控所有 TaskTracker 与作业的 健康状况,一旦发现失败情况后,其会将相应的任务转移到其他节点;同时,JobTracker 会 跟踪任务的执行进度、资源使用量等信息,并将这些信息告诉任务调度器,而调度器会在 资源出现空闲时,选择合适的任务使用这些资源。在 Hadoop 中,任务调度器是一个可插拔 的模块,用户可以根据自己的需要设计相应的调度器。 (3)TaskTracker TaskTracker 会周期性地通过 Heartbeat 将本节点上资源的使用情况和任务的运行进度汇报 给 JobTracker,同时接收 JobTracker 发送过来的命令并执行相应的操作(如启动新任务、杀死 任务等)。TaskTracker 使用“slot”等量划分本节点上的资源量。“slot”代表计算资源(CPU、 内存等)。一个 Task 获取到一个 slot 后才有机会运行,而 Hadoop 调度器的作用就是将各个 TaskTracker 上的空闲 slot 分配给 Task 使用。slot 分为 Map slot 和 Reduce slot 两种,分别供 Map Task 和 Reduce Task 使用。TaskTracker 通过 slot 数目(可配置参数)限定 Task 的并发度。 (4)Task Task 分为 Map Task 和 Reduce Task 两种,均由 TaskTracker 启动。从上一小节中我们知道, HDFS 以固定大小的 block 为基本单位存储数据,而对于 MapReduce 而言,其处理单位是 split。 split 与 block 的对应关系如图 2-6 所示。split 是一个逻辑概念,它只包含一些元数据信息,比如 数据起始位置、数据长度、数据所在节点等。它的划分方法完全由用户自己决定。但需要注意 的是,split 的多少决定了 Map Task 的数目,因为每个 split 会交由一个 Map Task 处理。 图 2-6 split 与 block 的对应关系 36   第一部分 基 础 篇 Map Task 执行过程如图 2-7 所示。由该图可知,Map Task 先将对应的 split 迭代解析成一 个个 key/value 对,依次调用用户自定义的 map() 函数进行处理,最终将临时结果存放到本地 磁盘上,其中临时数据被分成若干个 partition,每个 partition 将被一个 Reduce Task 处理。 图 2-7 Map Task 执行流程 Reduce Task 执行过程如图 2-8 所示。该过程分为三个阶段①从远程节点上读取 Map Task 中间结果(称为“Shuffle 阶段”);②按照 key 对 key/value 对进行排序(称为“Sort 阶 段”);③依次读取 ,调用用户自定义的 reduce() 函数处理,并将最终结果 存到 HDFS 上(称为“Reduce 阶段”)。 图 2-8 Reduce Task 执行过程 2.5 Hadoop MapReduce 作业的生命周期 由于本书以“作业生命周期”为线索对 Hadoop MapReduce 架构设计和实现原理进行 解析,因而在深入剖析各个 MapReduce 实现细节之前整体了解一个作业的生命周期显得非 常重要。为此,本节主要讲解 Hadoop MapReduce 作业的生命周期,即作业从提交到运行 结束经历的整个过程。本节只是概要性地介绍 MapReduce 作业的生命周期,可看作后续几 章的内容导读。作业生命周期中具体各个阶段的深入剖析将在后续的章节中进行。 第 2 章 MapReduce 设计理念与基本架构   37 假设用户编写了一个 MapReduce 程序,并将其打包成 xxx.jar 文件,然后使用以下命 令提交作业: $HADOOP_HOME/bin/hadoop jar xxx.jar \ -D mapred.job.name="xxx" \ -D mapred.map.tasks=3 \ -D mapred.reduce.tasks=2 \ -D input=/test/input \ -D output=/test/output 则该作业的运行过程如图 2-9 所示。 图 2-9 Hadoop MapReduce 作业的生命周期 这个过程分为以下 5 个步骤: 步骤 1 作业提交与初始化。用户提交作业后,首先由 JobClient 实例将作业相关信 息,比如将程序 jar 包、作业配置文件、分片元信息文件等上传到分布式文件系统(一般为 HDFS)上,其中,分片元信息文件记录了每个输入分片的逻辑位置信息。然后 JobClient 通过 RPC 通知 JobTracker。JobTracker 收到新作业提交请求后,由作业调度模块对作业进 行初始化 :为作业创建一个 JobInProgress 对象以跟踪作业运行状况,而 JobInProgress 则会 38   第一部分 基 础 篇 为每个 Task 创建一个 TaskInProgress 对象以跟踪每个任务的运行状态,TaskInProgress 可能 需要管理多个“Task 运行尝试”(称为“Task Attempt”)。具体分析见第 5 章。 步骤 2 任务调度与监控。前面提到,任务调度和监控的功能均由 JobTracker 完成。 TaskTracker 周期性地通过 Heartbeat 向 JobTracker 汇报本节点的资源使用情况,一旦出现 空闲资源,JobTracker 会按照一定的策略选择一个合适的任务使用该空闲资源,这由任务调 度器完成。任务调度器是一个可插拔的独立模块,且为双层架构,即首先选择作业,然后 从该作业中选择任务,其中,选择任务时需要重点考虑数据本地性。此外,JobTracker 跟踪 作业的整个运行过程,并为作业的成功运行提供全方位的保障。首先,当 TaskTracker 或者 Task 失败时,转移计算任务 ;其次,当某个 Task 执行进度远落后于同一作业的其他 Task 时, 为之启动一个相同 Task,并选取计算快的 Task 结果作为最终结果。具体分析见第 6 章。 步 骤 3  任 务 运 行 环 境 准 备。 运 行 环 境 准 备 包 括 JVM 启 动 和 资 源 隔 离, 均 由 TaskTracker 实现。TaskTracker 为每个 Task 启动一个独立的 JVM 以避免不同 Task 在运行 过程中相互影响;同时,TaskTracker 使用了操作系统进程实现资源隔离以防止 Task 滥用资 源。具体分析见第 7 章。 步骤 4 任务执行。TaskTracker 为 Task 准备好运行环境后,便会启动 Task。在运行过 程中,每个 Task 的最新进度首先由 Task 通过 RPC 汇报给 TaskTracker,再由 TaskTracker 汇报给 JobTracker。具体分析见第 8 章。 步骤 5 作业完成。待所有 Task 执行完毕后,整个作业执行成功。 2.6 小结 Hadoop MapReduce 直接诞生于搜索领域,以易于编程、良好的扩展性和高容错性为设 计目标。它主要由两部分组成 :编程模型和运行时环境。其中,编程模型为用户提供了 5 个可编程组件,分别是 InputFormat、Mapper、Partitioner、Reducer 和 OutputFormat ;运行 时环境则将用户的 MapReduce 程序部署到集群的各个节点上,并通过各种机制保证其成功 运行。 Hadoop MapReduce 处理的数据一般位于底层分布式文件系统中。该系统往往将用户 的文件切分成若干个固定大小的 block 存储到不同节点上。默认情况下,MapReduce 的每 个 Task 处理一个 block。 MapReduce 主要由四个组件构成,分别是 Client、JobTracker、 TaskTracker 和 Task,它们共同保障一个作业的成功运行。一个 MapReduce 作业的运行 周期是,先在 Client 端被提交到 JobTracker 上,然后由 JobTracker 将作业分解成若干个 Task,并对这些 Task 进行调度和监控,以保障这些程序运行成功,而 TaskTracker 则启动 JobTracker 发来的 Task,并向 JobTracker 汇报这些 Task 的运行状态和本节点上资源的使用 情况。 第二部分 MapReduce 编程模型篇 本部分内容 MapReduce 编程模型 第 3 章 MapReduce 编程模型 MapReduce 应用广泛的原因之一在于它的易用性。它提供了一个因高度抽象化而变得 异常简单的编程模型 。在第 2 章中,我们已经对该编程模型的定义以及应用场景做了简单 介绍。在这一章中,我们将从 Hadoop MapReduce 编程模型实现的角度对其进行深入分析, 包括其编程模型的体系结构、设计原理等。2728 本章不介绍 Hadoop MapReduce API 的使用方法,而是从实现角度介绍其设计方法。 3.1 MapReduce 编程模型概述 在第 2 章中,我们提到 MapReduce 是在总结大量应用的共同特点的基础上抽象出来的 分布式计算框架,它适用的应用场景往往具有一个共同的特点 :任务可被分解成相互独立 的子问题。基于该特点,MapReduce 编程模型给出了其分布式编程方法,共分 5 个步骤: 1)迭代(iteration)。遍历输入数据,并将之解析成 key/value 对。 2)将输入 key/value 对映射(map)成另外一些 key/value 对。 3)依据 key 对中间数据进行分组(grouping)。 4)以组为单位对数据进行归约(reduce)。 5)迭代。将最终产生的 key/value 对保存到输出文件中。 MapReduce 将计算过程分解成以上 5 个步骤带来的最大好处是组件化与并行化。 为了实现 MapReduce 编程模型,Hadoop 设计了一系列对外编程接口。用户可通过实 现这些接口完成应用程序的开发。 3.1.1 MapReduce 编程接口体系结构 MapReduce 编程模型对外提供的编程接口体系结构如图 3-1 所示,整个编程模型位于 应用程序层和 MapReduce 执行器之间,可以分为两层。第一层是最基本的 Java API,主要 有 5 个可编程组件,分别是 InputFormat、Mapper、Partitioner、Reducer 和 OutputFormat 。 Hadoop 自带了很多直接可用的 InputFormat、Partitioner 和 OutputFormat,大部分情况下, 用户只需编写 Mapper 和 Reducer 即可。第二层是工具层,位于基本 Java API 之上,主要是 为了方便用户编写复杂的 MapReduce 程序和利用其他编程语言增加 MapReduce 计算平台的 兼容性而提出来的。在该层中,主要提供了 4 个编程工具包。   关于 MapReduce 编程模型的数学基础,可参考 Ralf Lammel 的论文“Google’s MapReduce Programming Model—Revisited”。   还有一个组件是 Combiner,它实际是一个 Reducer。 第 3 章 MapReduce 编程模型   41 JobControl :方便用户编写有依赖关系的作业,这些作业往往构成一个有向图,所以 ❑ 通常称为 DAG(Directed Acyclic Graph)作业,如第 2 章中的朴素贝叶斯分类算法 实现便是 4 个有依赖关系的作业构成的 DAG。 ChainMapper/ChainReducer:方便用户编写链式作业,即在 Map 或者 Reduce 阶段存 ❑ 在多个 Mapper,形式如下: [MAPPER+ REDUCER MAPPER*] Hadoop Streaming :方便用户采用非 Java 语言编写作业,允许用户指定可执行文件 ❑ 或者脚本作为 Mapper/Reducer。 Hadoop Pipes:专门为 C/C++ 程序员编写 MapReduce 程序提供的工具包。 ❑ 图 3-1 MapReduce 编程接口体系结构 3.1.2 新旧 MapReduce API 比较 从 0.20.0 版本开始,Hadoop 同时提供了新旧两套 MapReduce API。新 API 在旧 API 基础上进行了封装,使得其在扩展性和易用性方面更好。新旧版 MapReduce API 的主要区 别如下。 (1)存放位置 旧版 API 放在 org.apache.hadoop.mapred 包中,而新版 API 则放在 org.apache.hadoop. mapreduce 包及其子包中。 (2)接口变为抽象类 接口通常作为一种严格的“协议约束”。它只有方法声明而没有方法实现,且要求所有实 现类(不包括抽象类)必须实现接口中的每一个方法。接口的最大优点是允许一个类实现多 个接口,进而实现类似 C++ 中的“多重继承”。抽象类则是一种较宽松的“约束协议”,它可 为某些方法提供默认实现。而继承类则可选择是否重新实现这些方法。正是因为这一点,抽 象类在类衍化方面更有优势,也就是说,抽象类具有良好的向后兼容性,当需要为抽象类添 加新的方法时,只要新添加的方法提供了默认实现,用户之前的代码就不必修改了。 42   第二部分 MapReduce 编程模型篇 考 虑 到 抽 象 类 在 API 衍 化 方 面 的 优 势, 新 API 将 InputFormat、OutputFormat、 Mapper、Reducer 和 Partitioner 由接口变为抽象类。 (3)上下文封装 新版 API 将变量和函数封装成各种上下文(Context)类,使得 API 具有更好的易用性 和扩展性。首先,函数参数列表经封装后变短,使得函数更容易使用 ;其次,当需要修改 或添加某些变量或函数时,只需修改封装后的上下文类即可,用户代码无须修改,这样保 证了向后兼容性,具有良好的扩展性。 图 3-2 展示了新版 API 中树形的 Context 类继承关系。这些 Context 各自封装了一种实 体的基本信息及对应的操作(setter 和 getter 函数),如 JobContext、TaskAttemptContext 分 别封装了 Job 和 Task 的基本信息,TaskInputOutputContext 封装了 Task 的各种输入输出操 作,MapContext 和 ReduceContext 分别封装了 Mapper 和 Reducer 对外的公共接口。 图 3-2 新版 API 中树形 Context 类继承关系 除了以上三点不同之外,新旧 API 在很多其他细节方面也存在小的差别,具体将在接 下来的内容中讲解。 由于新版和旧版 API 在类层次结构、编程接口名称及对应的参数列表等方面存在较大 差别,所以两种 API 不能兼容。但考虑到应用程序的向后兼容性,短时间内不会将旧 API 从 MapReduce 中去掉。即使在完全采用新 API 的 0.21.0/0.22.X 版本系列中,也仅仅将旧 API 标注为过期(deprecated),用户仍然可以使用。 本章将对比介绍两套 MapReduce API 的设计细节。但考虑到新版 API 只是在旧版基础 上封装而来的,因此,我们将详细分析旧版 API 的设计思路,而对于新版 API,仅是概要 介绍它与旧版本的不同之处。 3.2 MapReduce API 基本概念 在正式分析新旧 API 之前,先要介绍几个基本概念。这些概念贯穿于所有 API 之中, 因此,有必要单独讲解。 3.2.1 序列化 序列化是指将结构化对象转为字节流以便于通过网络进行传输或写入持久存储的过程。 反序列化指的是将字节流转为结构化对象的过程。在 Hadoop MapReduce 中,序列化的主 要作用有两个:永久存储和进程间通信。 为 了 能 够 读 取 或 者 存 储 Java 对 象,MapReduce 编 程 模 型 要 求 用 户 输 入 和 输 出 数 第 3 章 MapReduce 编程模型   43 据 中 的 key 和 value 必 须 是 可 序 列 化 的。 在 Hadoop MapReduce 中, 使 一 个 Java 对 象 可序列化的方法是让其对应的类实现 Writable 接口。但对于 key 而言,由于它是数据 排序的关键字,因此还需要提供比较两个 key 对象的方法。为此,key 对应类需实现 WritableComparable 接口,它的类如图 3-3 所示 。29 图 3-3 序列化接口 WritableComparable 的类图 3.2.2 Reporter 参数 Reporter 是 MapReduce 提 供 给 应 用 程 序 的 工 具。 如 图 3-4 所 示, 应 用 程 序 可 使 用 Reporter 中的方法报告完成进度(progress)、设定状态消息(setStatus)以及更新计数器 (incrCounter)。 图 3-4 Reporter 的类图 Reporter 是一个基础参数。MapReduce 对外提供的大部分组件,包括 InputFormat、 Mapper 和 Reducer 等,均在其主要方法中添加了该参数,具体可参考 3.3 节。 3.2.3 回调机制 回调机制是一种常见的设计模式。它将工作流内的某个功能按照约定的接口暴露给外 部使用者,为外部使用者提供数据,或要求外部使用者提供数据。  关于 Hadoop 序列化的更详细介绍,可参考《Hadoop 权威指南》的“第 4 章 Hadoop I/O”。 44   第二部分 MapReduce 编程模型篇 Hadoop MapReduce 对外提供的 5 个组件(InputFormat、Mapper、Partitioner、Reducer 和 OutputFormat)实际上全部属于回调接口。当用户按照约定实现这几个接口后,MapReduce 运行时环境会自动调用它们。 如图 3-5 所示,MapReduce 给用户暴露了接口 Mapper,当用户按照自己的应用程序 逻辑实现自己的 MyMapper 后,Hadoop MapReduce 运行时环境会将输入数据解析成 key/ value 对,并调用 map() 函数迭代处理。 图 3-5 MapReduce 回调机制实例 3.3 Java API 解析 Hadoop 的主要编程语言是 Java,因而 Java API 是最基本的对外编程接口。当前各个版 本的 Hadoop 均同时存在新旧两种 API。本节将对比讲解这两种 API 的设计思路,主要内容 包括使用实例、接口设计、在 MapReduce 运行时环境中的调用时机等。 3.3.1 作业配置与提交 1. Hadoop 配置文件介绍 在 Hadoop 中,Common、HDFS 和 MapReduce 各有对应的配置文件,用于保存对应模 块中可配置的参数。这些配置文件均为 XML 格式且由两部分构成:系统默认配置文件和管 理员自定义配置文件。其中,系统默认配置文件分别是 core-default.xml、hdfs-default.xml 和 mapred-default.xml,它们包含了所有可配置属性的默认值。而管理员自定义配置文件分 别是 core-site.xml、hdfs-site.xml 和 mapred-site.xml。它们由管理员设置,主要用于定义一 第 3 章 MapReduce 编程模型   45 些新的配置属性或者覆盖系统默认配置文件中的默认值。通常这些配置一旦确定,便不能 被修改(如果想修改,需重新启动 Hadoop)。需要注意的是,core-default.xml 和 core-site. xml 属于公共基础库的配置文件,默认情况下,Hadoop 总会优先加载它们。 在 Hadoop 中,每个配置属性主要包括三个配置参数 :name、value 和 description,分 别表示属性名、属性值和属性描述。其中,属性描述仅仅用来帮助用户理解属性的含义, Hadoop 内部并不会使用它的值。此外,Hadoop 为配置文件添加了两个新的特性 :final 参 数和变量扩展。 final 参数 :如果管理员不想让用户程序修改某些属性的属性值,可将该属性的 final ❑ 参数置为 true,比如: mapred.map.tasks.speculative.execution true <final>true 管理员一般在 XXX-site.xml 配置文件中为某些属性添加 final 参数,以防止用户在应用 程序中修改这些属性的属性值。 变量扩展:当读取配置文件时,如果某个属性存在对其他属性的引用,则 Hadoop 首 ❑ 先会查找引用的属性是否为下列两种属性之一。如果是,则进行扩展。 ○ 其他已经定义的属性。 ○ Java 中 System.getProperties() 函数可获取属性。 比如,如果一个配置文件中包含以下配置参数: hadoop.tmp.dir /tmp/hadoop-${user.name} mapred.temp.dir ${hadoop.tmp.dir}/mapred/temp 则当用户想要获取属性 mapred.temp.dir 的值时,Hadoop 会将 hadoop.tmp.dir 解析成该配置 文件中另外一个属性的值,而 user.name 则被替换成系统属性 user.name 的值。 2. MapReduce 作业配置与提交 在 MapReduce 中,每个作业由两部分组成 :应用程序和作业配置。其中,作业配置内 容包括环境配置和用户自定义配置两部分。环境配置由 Hadoop 自动添加,主要由 mapred- default.xml 和 mapred-site.xml 两个文件中的配置选项组合而成 ;用户自定义配置则由用 户自己根据作业特点个性化定制而成,比如用户可设置作业名称,以及 Mapper/Reducer、 Reduce Task 个数等。在新旧两套 API 中,作业配置接口发生了变化,首先通过一个例子感 受一下使用上的不同。 46   第二部分 MapReduce 编程模型篇 旧 API 作业配置实例: JobConf job = new JobConf(new Configuration(), MyJob.class); job.setJobName("myjob"); job.setMapperClass(MyJob.MyMapper.class); job.setReducerClass(MyJob.MyReducer.class); JobClient.runJob(job); 新 API 作业配置实例: Configuration conf = new Configuration(); Job job = new Job(conf, "myjob "); job.setJarByClass(MyJob.class); job.setMapperClass(MyJob.MyMapper.class); job.setReducerClass(MyJob.MyReducer.class); System.exit(job.waitForCompletion(true) ? 0 : 1); 从以上两个实例可以看出,新版 API 用 Job 类代替了 JobConf 和 JobClient 两个类,这 样,仅使用一个类的同时可完成作业配置和作业提交相关功能,进一步简化了作业编写方 式。我们将在第 5 章介绍作业提交的相关细节,本小节重点从设计角度分析新旧两套 API 中作业配置的相关实现细节。 3. 旧 API 中的作业配置 MapReduce 配 置 模 块 代 码 结 构 如 图 3-6 所 示。 其 中,org.apache.hadoop.conf 中 的 Configuration 类是配置模块最底层的类。从图 3-6 中可以看出,该类支持以下两种基本操作。 序列化 :序列化是将结构化数据转换成字节流,以便于传输或存储。Java 实现了自 ❑ 己的一套序列化框架。凡是需要支持序列化的类,均需要实现 Writable 接口。 迭代:为了方便遍历所有属性,它实现了 Java 开发包中的 Iterator 接口。 ❑ 图 3-6 旧 MapReduce API 中作业配置类图 Configuration 类总会依次加载 core-default.xml 和 core-site.xml 两个基础配置文件,相 第 3 章 MapReduce 编程模型   47 关代码如下: addDefaultResource("core-default.xml"); addDefaultResource("core-site.xml"); addDefaultResource 函数的参数为 XML 文件名,它能够将 XML 文件中的 name/value 加载到内存中。当连续调用多次该函数时,对于同一个配置选项,其后面的值会覆盖前面 的值。 Configuration 类中有大量针对常见数据类型的 getter/setter 函数,用于获取或者设置某 种数据类型属性的属性值。比如,对于 float 类型,提供了这样一对函数: float getFloat(String name, float defaultValue) void setFloat(String name, float value) 除了大量 getter/setter 函数外,Configuration 类中还有一个非常重要的函数: void writeXml(OutputStream out) 该函数能够将当前 Configuration 对象中所有属性及属性值保存到一个 XML 文件中, 以便于在节点之间传输。这点在以后的几节中会提到。 JobConf 类描述了一个 MapReduce 作业运行时需要的所有信息,而 MapReduce 运行时 环境正是根据 JobConf 提供的信息运行作业的。 JobConf 继承了 Configuration 类,并添加了一些设置 / 获取作业属性的 setter/getter 函 数,以方便用户编写 MapReduce 程序,如设置 / 获取 Reduce Task 个数的函数为: public int getNumReduceTasks() { return getInt("mapred.reduce.tasks", 1); } public void setNumReduceTasks(int n) { setInt("mapred.reduce.tasks", n); } JobConf 中添加的函数均是对 Configuration 类中函数的再次封装。由于它在这些函数 名中融入了作业属性的名字,因而更易于使用。 默认情况下,JobConf 会自动加载配置文件 mapred-default.xml 和 mapred-site.xml,相 关代码如下: static{ Configuration.addDefaultResource("mapred-default.xml"); Configuration.addDefaultResource("mapred-site.xml"); } 4. 新 API 中的作业配置 前 面 提 到, 与 新 API 中 的 作 业 配 置 相 关 的 类 是 Job。该类同时具有作业配置和作业提交的功能,其 中,作业提交将在第 5 章中介绍,这里只关注作业配 置部分。作业配置部分的类图如图 3-7 所示。Job 类继 承了一个新类 JobContext,而 Context 自身则包含一个 JobConf 类型的成员。注意,JobContext 类仅提供了一 些 getter 方法,而 Job 类中则提供了一些 setter 方法。 图 3-7 新 MapReduce API 中作业配 置类图 48   第二部分 MapReduce 编程模型篇 3.3.2 InputFormat 接口的设计与实现 InputFormat 主要用于描述输入数据的格式,它提供以下两个功能。 数据切分 :按照某个策略将输入数据切分成若干个 split,以便确定 Map Task 个数以 ❑ 及对应的 split。 为 Mapper 提供输入数据:给定某个 split,能将其解析成一个个 key/value 对。 ❑ 本小节将介绍 Hadoop 如何设计 InputFormat 接口,以及提供了哪些常用的 InputFormat 实现。 1. 旧版 API 的 InputFormat 解析 如图 3-8 所示,在旧版 API 中,InputFormat 是一个接口,它包含两种方法: InputSplit[] getSplits(JobConf job, int numSplits) throws IOException; RecordReader getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException; getSplits 方法主要完成数据切分的功能,它会尝试着将输入数据切分成 numSplits 个 InputSplit。InputSplit 有以下两个特点。 逻辑分片 :它只是在逻辑上对输入数据进行分片,并不会在磁盘上将其切分成分片 ❑ 进行存储。InputSplit 只记录了分片的元数据信息,比如起始位置、长度以及所在的 节点列表等。 可序列化:在 Hadoop 中,对象序列化主要有两个作用:进程间通信和永久存储。此 ❑ 处,InputSplit 支持序列化操作主要是为了进程间通信。作业被提交到 JobTracker 之 前,Client 会调用作业 InputFormat 中的 getSplits 函数,并将得到的 InputSplit 序列 化到文件中。这样,当作业提交到 JobTracker 端对作业初始化时,可直接读取该文 件,解析出所有 InputSplit,并创建对应的 Map Task。 图 3-8 旧 API 中 InputFormat 类图 第 3 章 MapReduce 编程模型   49 getRecordReader 方 法 返 回 一 个 RecordReader 对 象, 该 对 象 可 将 输 入 的 InputSplit 解 析 成 若 干 个 key/value 对。MapReduce 框 架 在 Map Task 执 行 过 程 中, 会 不 断 调 用 RecordReader 对象中的方法,迭代获取 key/value 对并交给 map() 函数处理,主要代码(经 过简化)如下: // 调用 InputSplit 的 getRecordReader 方法获取 RecordReader input …… K1 key = input.createKey(); V1 value = input.createValue(); while (input.next(key, value)) { // 调用用户编写的 map() 函数 } input.close(); 前面分析了 InputFormat 接口的定义,接下来介绍系统自带的各种 InputFormat 实现。 为了方便用户编写 MapReduce 程序,Hadoop 自带了一些针对数据库和文件的 InputFormat 实现,具体如图 3-9 所示。通常而言,用户需要处理的数据均以文件形式存储到 HDFS 上, 所以我们重点针对文件的 InputFormat 实现进行讨论。 图 3-9 Hadoop MapReduce 自带 InputFormat 实现的类层次图 如图 3-9 所示,所有基于文件的 InputFormat 实现的基类是 FileInputFormat,并由此派 生出针对文本文件格式的 TextInputFormat、KeyValueTextInputFormat 和 NLineInputFormat, 针 对 二 进 制 文 件 格 式 的 SequenceFileInputFormat 等。 整 个 基 于 文 件 的 InputFormat 体 系的设计思路是,由公共基类 FileInputFormat 采用统一的方法对各种输入文件进行切 分,比如按照某个固定大小等分,而由各个派生 InputFormat 自己提供机制将进一步解析 InputSplit。对应到具体的实现是,基类 FileInputFormat 提供 getSplits 实现,而派生类提供 getRecordReader 实现。 为了帮助读者深入理解这些 InputFormat 的实现原理,我们选取 TextInputFormat 与 SequenceFileInputFormat 进行重点介绍。 我们首先介绍基类 FileInputFormat 的实现。它最重要的功能是为各种 InputFormat 提 供统一的 getSplits 函数。该函数实现中最核心的两个算法是文件切分算法和 host 选择算法。 50   第二部分 MapReduce 编程模型篇 (1)文件切分算法 文 件 切 分 算 法 主 要 用 于 确 定 InputSplit 的 个 数 以 及 每 个 InputSplit 对 应 的 数 据 段。 FileInputFormat 以文件为单位切分生成 InputSplit。对于每个文件,由以下三个属性值确定 其对应的 InputSplit 的个数。 goalSize :它是根据用户期望的 InputSplit 数目计算出来的,即 totalSize/numSplits。 ❑ 其中,totalSize 为文件总大小;numSplits 为用户设定的 Map Task 个数,默认情况下 是 1。 minSize:InputSplit 的最小值,由配置参数 mapred.min.split.size 确定,默认是 1。 ❑ blockSize:文件在 HDFS 中存储的 block 大小,不同文件可能不同,默认是 64 MB。 ❑ 这三个参数共同决定 InputSplit 的最终大小,计算方法如下: splitSize = max{minSize, min{goalSize, blockSize}} 一旦确定 splitSize 值后,FileInputFormat 将文件依次切成大小为 splitSize 的 InputSplit, 最后剩下不足 splitSize 的数据块单独成为一个 InputSplit。 【实例】输入目录下有三个文件 file1、file2 和 file3,大小依次为 1 MB,32 MB 和 250 MB。若 blockSize 采用默认值 64 MB,则不同 minSize 和 goalSize 下,file3 切分结果 如表 3-1 所示(三种情况下,file1 与 file2 切分结果相同,均为 1 个 InputSplit)。 表 3-1 minSize、goalSize、splitSize 与 InputSplit 对应关系 minSize goalSize splitSize file3 对应的 InputSplit 数目 输入目录对应的 InputSplit 总数 1 MB totalSize (numSplits=1) 64 MB 4 6 32 MB totalSize/5 50 MB 5 7 128 MB totalSize/2 128 MB 2 4 结合表和公式可以知道,如果想让 InputSplit 尺寸大于 block 尺寸,则直接增大配置参 数 mapred.min.split.size 即可。 (2)host 选择算法 待 InputSplit 切分方案确定后,下一步要确定每个 InputSplit 的元数据信息。这通常由 四部分组成 :,分别表示 InputSplit 所在的文件、起始位置、长度 以及所在的 host(节点)列表。其中,前三项很容易确定,难点在于 host 列表的选择方法。 InputSplit 的 host 列表选择策略直接影响到运行过程中的任务本地性。第 2 章介绍 Hadoop 架构时,我们提到 HDFS 上的文件是以 block 为单位组织的,一个大文件对应的 block 可能遍布整个 Hadoop 集群,而 InputSplit 的划分算法可能导致一个 InputSplit 对应多 个 block ,这些 block 可能位于不同节点上,这使得 Hadoop 不可能实现完全的数据本地性。 为此,Hadoop 将数据本地性按照代价划分成三个等级 :node locality、rack locality 和 data center locality(Hadoop 还未实现该 locality 级别)。在进行任务调度时,会依次考虑这 3 个 节点的 locality,即优先让空闲资源处理本节点上的数据,如果节点上没有可处理的数据, 第 3 章 MapReduce 编程模型   51 则处理同一个机架上的数据,最差情况是处理其他机架上的数据(但是必须位于同一个数 据中心)。 虽然 InputSplit 对应的 block 可能位于多个节点上,但考虑到任务调度的效率,通常不 会把所有节点加到 InputSplit 的 host 列表中,而是选择包含(该 InputSplit)数据总量最大 的前几个节点(Hadoop 限制最多选择 10 个,多余的会过滤掉),以作为任务调度时判断任 务是否具有本地性的主要凭证。为此,FileInputFormat 设计了一个简单有效的启发式算法 : 首先按照 rack 包含的数据量对 rack 进行排序,然后在 rack 内部按照每个 node 包含的数据 量对 node 排序,最后取前 N 个 node 的 host 作为 InputSplit 的 host 列表,这里的 N 为 block 副本数。这样,当任务调度器调度 Task 时,只要将 Task 调度给位于 host 列表的节点,就 认为该 Task 满足本地性。 【实例】某个 Hadoop 集群的网络拓扑结构如图 3-10 所示,HDFS 中 block 副本数为 3,某个 InputSplit 包含 3 个 block,大小依次是 100、150 和 75,很容易计算,4 个 rack 包 含的(该 InputSplit 的)数据量分别是 175、250、150 和 75。rack2 中的 node3 和 node4, rack1 中的 node1 将被添加到该 InputSplit 的 host 列表中。 图 3-10 一个 Hadoop 集群的网络拓扑结构图 从以上 host 选择算法可知,当 InputSplit 尺寸大于 block 尺寸时,Map Task 并不能实 现完全数据本地性,也就是说,总有一部分数据需要从远程节点上读取,因而可以得出以 下结论: 当使用基于 FileInputFormat 实现 InputFormat 时,为了提高 Map Task 的数据本地 性,应尽量使 InputSplit 大小与 block 大小相同。 分析完 FileInputFormat 实现方法,接下来分析派生类 TextInputFormat 与 Sequence- FileInputFormat 的实现。 前面提到,由派生类实现 getRecordReader 函数,该函数返回一个 RecordReader 对象。 它实现了类似于迭代器的功能,将某个 InputSplit 解析成一个个 key/value 对。在具体实现 时,RecordReader 应考虑以下两点。 52   第二部分 MapReduce 编程模型篇 定位记录边界 :为了能够识别一条完整的记录,记录之间应该添加一些同步标识。 ❑ 对于 TextInputFormat,每两条记录之间存在换行符 ;对于 SequenceFileInputFormat, 每隔若干条记录会添加固定长度的同步字符串。通过换行符或者同步字符串,它们 很容易定位到一个完整记录的起始位置。另外,由于 FileInputFormat 仅仅按照数据 量多少对文件进行切分,因而 InputSplit 的第一条记录和最后一条记录可能会被从 中间切开。为了解决这种记录跨越 InputSplit 的读取问题,RecordReader 规定每个 InputSplit 的第一条不完整记录划给前一个 InputSplit 处理。 解析 key/value :定位到一条新的记录后,需将该记录分解成 key 和 value 两部分。 ❑ 对于 TextInputFormat,每一行的内容即为 value,而该行在整个文件中的偏移量为 key。对于 SequenceFileInputFormat,每条记录的格式为: [record length] [key length] [key] [value] 其中,前两个字段分别是整条记录的长度和 key 的长度,均为 4 字节,后两个字段分 别是 key 和 value 的内容。知道每条记录的格式后,很容易解析出 key 和 value。 2. 新版 API 的 InputFormat 解析 新版 API 的 InputFormat 类图如图 3-11 所示。新 API 与旧 API 比较,在形式上发生了 较大变化,但仔细分析,发现仅仅是对之前的一些类进行了封装。正如 3.1.2 节介绍的那 样,通过封装,使接口的易用性和扩展性得以增强。 图 3-11 新 API 中 InputFormat 类图 此外,对于基类 FileInputFormat,新版 API 中有一个值得注意的改动 :InputSplit 划分 算法不再考虑用户设定的 Map Task 个数,而用 mapred.max.split.size(记为 maxSize)代替, 第 3 章 MapReduce 编程模型   53 即 InputSplit 大小的计算公式变为: splitSize = max{minSize, min{maxSize, blockSize}} 3.3.3 OutputFormat 接口的设计与实现 OutputFormat 主要用于描述输出数据的格式,它能够将用户提供的 key/value 对写入 特定格式的文件中。本小节将介绍 Hadoop 如何设计 OutputFormat 接口,以及一些常用的 OutputFormat 实现。 1. 旧版 API 的 OutputFormat 解析 如图 3-12 所示,在旧版 API 中,OutputFormat 是一个接口,它包含两个方法: RecordWriter getRecordWriter(FileSystem ignored, JobConf job, String name, Progressable progress) throws IOException; void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException; checkOutputSpecs 方法一般在用户作业被提交到 JobTracker 之前,由 JobClient 自动调 用,以检查输出目录是否合法。 图 3-12 旧版 API 的 OutputFormat 类图 getRecordWriter 方法返回一个 RecordWriter 类对象。该类中的方法 write 接收一个 key/value 对, 并 将 之 写 入 文 件。 在 Task 执 行 过 程 中,MapReduce 框 架 会 将 map() 或 者 reduce() 函数产生的结果传入 write 方法,主要代码(经过简化)如下。 假设用户编写的 map() 函数如下: public void map(Text key, Text value, OutputCollector output, Reporter reporter) throws IOException { // 根据当前 key/value 产生新的输出 ,并输出 …… output.collect(newKey, newValue); } 则函数 output.collect(newKey, newValue) 内部执行代码如下: RecordWriter out = job.getOutputFormat().getRecordWriter(...); 54   第二部分 MapReduce 编程模型篇 out.write(newKey, newValue); Hadoop 自带了很多 OutputFormat 实现,它们与 InputFormat 实现相对应,具体如图 3-13 所示。所有基于文件的 OutputFormat 实现的基类为 FileOutputFormat,并由此派生出 一些基于文本文件格式、二进制文件格式的或者多输出的实现。 图 3-13 Hadoop MapReduce 自带 OutputFormat 实现的类层次图 为了深入分析 OutputFormat 的实现方法,我们选取比较有代表性的 FileOutputFormat 类进行分析。同介绍 InputFormat 实现的思路一样,我们先介绍基类 FileOutputFormat,再 介绍其派生类 TextOutputFormat。 基类 FileOutputFormat 需要提供所有基于文件的 OutputFormat 实现的公共功能,总结 起来,主要有以下两个: (1)实现 checkOutputSpecs 接口 该接口在作业运行之前被调用,默认功能是检查用户配置的输出目录是否存在,如果 存在则抛出异常,以防止之前的数据被覆盖。 (2)处理 side-effect file 任务的 side-effect file 并不是任务的最终输出文件,而是具有特殊用途的任务专属文 件。它的典型应用是执行推测式任务。在 Hadoop 中,因为硬件老化、网络故障等原因,同 一个作业的某些任务执行速度可能明显慢于其他任务,这种任务会拖慢整个作业的执行速 度。为了对这种“慢任务”进行优化,Hadoop 会为之在另外一个节点上启动一个相同的 任务,该任务便被称为推测式任务,最先完成任务的计算结果便是这块数据对应的处理结 果。为防止这两个任务同时往一个输出文件中写入数据时发生写冲突,FileOutputFormat 会为每个 Task 的数据创建一个 side-effect file,并将产生的数据临时写入该文件,待 Task 完成后,再移动到最终输出目录中。这些文件的相关操作,比如创建、删除、移动等,均 由 OutputCommitter 完成。它是一个接口, Hadoop 提供了默认实现 FileOutputCommitter, 用 户 也 可 以 根 据 自 己 的 需 求 编 写 OutputCommitter 实 现, 并 通 过 参 数 {mapred.output. committer.class} 指定。OutputCommitter 接口定义以及 FileOutputCommitter 对应的实现如 表 3-2 所示。 第 3 章 MapReduce 编程模型   55 表 3-2 OutputCommitter 接口定义以及 FileOutputCommitter 对应的实现 方法 何时被调用 FileOutputCommitter 实现 setupJob 作业初始化 创建临时目录 ${mapred.out.dir}/_temporary commitJob 作业成功运行完成 删 除 临 时 目 录, 并 在 ${mapred.out.dir} 目 录 下 创 建 空 文 件 _ SUCCESS abortJob 作业运行失败 删除临时目录 setupTask 任务初始化 不进行任何操作。原本是需要在临时目录下创建 side-effect file 的,但它是用时创建的(create on demand) needsTaskCommit 判断是否需要提交结果 只要存在 side-effect file,就返回 true commitTask 任务成功运行完成 提交结果,即将 side-effect file 移动到 ${mapred.out.dir} 目录下 abortTask 任务运行失败 删除任务的 side-effect file 注意 默认情况下,当作业成功运行完成后,会在最终结果目录 ${mapred.out.dir} 下生成 空文件 _SUCCESS。该文件主要为高层应用提供作业运行完成的标识,比如,Oozie 需要 通过检测结果目录下是否存在该文件判断作业是否运行完成。 2. 新版 API 的 OutputFormat 解析 如图 3-14 所示,除了接口变为抽象类外,新 API 中的 OutputFormat 增加了一个新的 方法:getOutputCommitter,以允许用户自己定制合适的 OutputCommitter 实现。30 图 3-14 新版 API 的 OutputFormat 类图 3.3.4 Mapper 与 Reducer 解析 1. 旧版 API 的 Mapper/Reducer 解析 Mapper/Reducer 中封装了应用程序的数据处理逻辑。为了简化接口,MapReduce 要  具体可到 http://incubator.apache.org/oozie/ 下查看相关文档。 56   第二部分 MapReduce 编程模型篇 求所有存储在底层分布式文件系统上的数据均要解释成 key/value 的形式,并交给 Mapper/ Reducer 中的 map/reduce 函数处理,产生另外一些 key/value。 Mapper 与 Reducer 的类体系非常类似,我们以 Mapper 为例进行讲解。Mapper 的类图 如图 3-15 所示,包括初始化、Map 操作和清理三部分。 图 3-15 旧版 API 的 Mapper 类图 (1)初始化 Mapper 继承了 JobConfigurable 接口。该接口中的 configure 方法允许通过 JobConf 参 数对 Mapper 进行初始化。 (2)Map 操作 MapReduce 框 架 会 通 过 InputFormat 中 RecordReader 从 InputSplit 获 取 一 个 个 key/ value 对,并交给下面的 map() 函数处理: void map(K1 key, V1 value, OutputCollector output, Reporter reporter) throws IOException; 该函数的参数除了 key 和 value 之外,还包括 OutputCollector 和 Reporter 两个类型的 参数,分别用于输出结果和修改 Counter 值。 (3)清理 Mapper 通过继承 Closeable 接口(它又继承了 Java IO 中的 Closeable 接口)获得 close 方法,用户可通过实现该方法对 Mapper 进行清理。 MapReduce 提供了很多 Mapper/Reducer 实现,但大部分功能比较简单,具体如图 3-16 所示。它们对应的功能分别是: ChainMapper/ChainReducer:用于支持链式作业,具体见 3.5.2 节。 ❑ IdentityMapper/IdentityReducer:对于输入 key/value 不进行任何处理,直接输出。 ❑ InvertMapper:交换 key/value 位置。 ❑ 第 3 章 MapReduce 编程模型   57 RegexMapper:正则表达式字符串匹配。 ❑ TokenMapper:将字符串分割成若干个 token(单词),可用作 WordCount 的 Mapper。 ❑ LongSumReducer:以 key 为组,对 long 类型的 value 求累加和。 ❑ 图 3-16 Hadoop MapReduce 自带 Mapper/Reducer 实现的类层次图 对于一个 MapReduce 应用程序,不一定非要存在 Mapper。MapReduce 框架提供了 比 Mapper 更通用的接口 :MapRunnable,如图 3-17 所示。用户可以实现该接口以定制 Mapper 的调用方式或者自己实现 key/value 的处理逻辑,比如,Hadoop Pipes 自行实现了 MapRunnable,直接将数据通过 Socket 发送给其他进程处理。提供该接口的另外一个好处 是允许用户实现多线程 Mapper。 图 3-17 MapRunnable 类图 如 图 3-18 所 示,MapReduce 提 供 了 两 个 MapRunnable 实 现, 分 别 是 MapRunner 和 MultithreadedMapRunner,其中 MapRunner 为默认实现。MultithreadedMapRunner 实现了 一种多线程的 MapRunnable。默认情况下,每个 Mapper 启动 10 个线程,通常用于非 CPU 类型的作业以提供吞吐率。 图 3-18 Hadoop MapReduce 自带 MapRunnable 实现的类层次图 58   第二部分 MapReduce 编程模型篇 2. 新版 API 的 Mapper/Reducer 解析 从图 3-19 可知,新 API 在旧 API 基础上发生了以下几个变化: 图 3-19 新版 API 的 Mapper/Reducer 类图 Mapper 由接口变为抽象类,且不再继承 JobConfigurable 和 Closeable 两个接口,而 ❑ 是直接在类中添加了 setup 和 cleanup 两个方法进行初始化和清理工作。 将参数封装到 Context 对象中,这使得接口具有良好的扩展性。 ❑ 去掉 MapRunnable 接口,在 Mapper 中添加 run 方法,以方便用户定制 map() 函数的 ❑ 调用方法,run 默认实现与旧版本中 MapRunner 的 run 实现一样。 新 API 中 Reducer 遍历 value 的迭代器类型变为 java.lang.Iterable,使得用户可以采 ❑ 用“foreach”形式遍历所有 value,如下所示: 第 3 章 MapReduce 编程模型   59 void reduce(KEYIN key, Iterable values, Context context ) throws IOException, InterruptedException { for(VALUEIN value: values) { // 注意遍历方式 context.write((KEYOUT) key, (VALUEOUT) value); } } 3.3.5 Partitioner 接口的设计与实现 Partitioner 的作用是对 Mapper 产生的中间结果进行分片,以便将同一分组的数据交给 同一个 Reducer 处理,它直接影响 Reduce 阶段的负载均衡。旧版 API 中 Partitioner 的类图如 图 3-20 所示。它继承了 JobConfigurable,可通过 configure 方法初始化。它本身只包含一 个待实现的方法 getPartition。该方法包含三个参数,均由框架自动传入,前面两个参数是 key/value,第三个参数 numPartitions 表示每个 Mapper 的分片数,也就是 Reducer 的个数。 图 3-20 旧版 API 的 Partitioner 类图 MapReduce 提供了两个 Partitioner 实现 :HashPartitioner 和 TotalOrderPartitioner。其 中 HashPartitioner 是默认实现,它实现了一种基于哈希值的分片方法,代码如下: public int getPartition(K2 key, V2 value, int numReduceTasks) { return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; } TotalOrderPartitioner 提供了一种基于区间的分片方法,通常用在数据全排序中。在 MapReduce 环境中,容易想到的全排序方案是归并排序,即在 Map 阶段,每个 Map Task 进行局部排序 ;在 Reduce 阶段,启动一个 Reduce Task 进行全局排序。由于作业只能有一 个 Reduce Task,因而 Reduce 阶段会成为作业的瓶颈。为了提高全局排序的性能和扩展性, MapReduce 提供了 TotalOrderPartitioner。它能够按照大小将数据分成若干个区间(分片), 并保证后一个区间的所有数据均大于前一个区间数据,这使得全排序的步骤如下: 步骤 1 数据采样。在 Client 端通过采样获取分片的分割点。Hadoop 自带了几个采样 算 法, 如 IntercalSampler、RandomSampler、SplitSampler 等( 具 体 见 org.apache.hadoop. mapred.lib 包中的 InputSampler 类)。下面举例说明。 采样数据为:b,abc,abd,bcd,abcd,efg,hii,afd,rrr,mnk 经排序后得到:abc,abcd,abd,afd,b,bcd,efg,hii,mnk,rrr 60   第二部分 MapReduce 编程模型篇 如果 Reduce Task 个数为 4,则采样数据的四等分点为 abd、bcd、mnk,将这 3 个字符 串作为分割点。 步骤 2 Map 阶段。本阶段涉及两个组件,分别是 Mapper 和 Partitioner。其中,Mapper 可 采用 IdentityMapper,直接将输入数据输出,但 Partitioner 必须选用 TotalOrderPartitioner, 它将步骤 1 中获取的分割点保存到 trie 树中以便快速定位任意一个记录所在的区间,这样, 每个 Map Task 产生 R(Reduce Task 个数)个区间,且区间之间有序。 TotalOrderPartitioner 通过 trie 树查找每条记录所对应的 Reduce Task 编号。如图 3-21 所示,我们将分割点保存在深度为 2 的 trie 树中,假设输入数据中有两个字符串“abg” 和“mnz”,则字符串“abg”对应 partition1,即第 2 个 Reduce Task,字符串“mnz”对应 partition3,即第 4 个 Reduce Task。 图 3-21 利用 trie 树对数据进行分片 步骤 3 Reduce 阶段。每个 Reducer 对分配到的区间数据进行局部排序,最终得到全 排序数据。 从以上步骤可以看出,基于 TotalOrderPartitioner 全排序的效率跟 key 分布规律和采样 算法有直接关系 ;key 值分布越均匀且采样越具有代表性,则 Reduce Task 负载越均衡,全 排序效率越高。 TotalOrderPartitioner 有两个典型的应用实例 :TeraSort 和 HBase 批量数据导入。其中, TeraSort 是 Hadoop 自带的一个应用程序实例。它曾在 TB 级数据排序基准评估中赢得第一 名 ,而 TotalOrderPartitioner 正是从该实例中提炼出来的。HBase 是一个构建在 Hadoop 之上的 NoSQL 数据仓库。它以 Region 为单位划分数据,Region 内部数据有序(按 key 排 序), Region 之间也有序。很明显,一个 MapReduce 全排序作业的 R 个输出文件正好可对 应 HBase 的 R 个 Region。3132  http://sortbenchmark.org/  http://hbase.apache.org/ 第 3 章 MapReduce 编程模型   61 新版 API 中的 Partitioner 类图如图 3-22 所示。它不再实现 JobConfigurable 接口。当用 户需要让 Partitioner 通过某个 JobConf 对象初始化时,可自行实现 Configurable 接口,如: public class TotalOrderPartitioner extends Partitioner implements Configurable 图 3-22 新版 API 中的 Partitioner 类图 3.4 非 Java API 解析 3.4.1 Hadoop Streaming 的实现原理 Hadoop Streaming 是 Hadoop 为方便非 Java 用户编写 MapReduce 程序而设计的工具 包。它允许用户将任何可执行文件或者脚本作为 Mapper/Reducer,这大大提高了程序员的 开发效率。 Hadoop Streaming 要求用户编写的 Mapper/Reducer 从标准输入中读取数据,并将结果 写到标准数据中,这类似于 Linux 中的管道机制。 1. Hadoop Streaming 编程实例 以 WordCount 为例,可用 C++ 分别实现 Mapper 和 Reducer,具体方法如下(这里仅 是最简单的实现,并未全面考虑各种异常情况)。 Mapper 实现的具体代码如下: int main() { //Mapper 将会被封装成一个独立进程,因而需要有 main() 函数 string key; while(cin >> key) { // 从标准输入流中读取数据 // 输出中间结果,默认情况下 TAB 为 key/value 分隔符 cout << key << "\t" << "1" << endl; } return 0; } Reducer 实现的具体代码如下: int main() { //Reducer 将会被封装成一个独立进程,因而需要有 main() 函数 string cur_key, last_key, value; cin >> cur_key >> value; last_key = cur_key; int n = 1; while(cin >> cur_key) { // 读取 Map Task 输出结果 cin >> value; 62   第二部分 MapReduce 编程模型篇 if(last_key != cur_key) { // 识别下一个 key cout << last_key << "\t" << n << endl; last_key = cur_key; n = 1; } else { // 获取 key 相同的所有 value 数目 n++; //key 值相同的,累计 value 值 } } return 0; } 分别编译这两个程序,生成的可执行文件分别是 wc_mapper 和 wc_reducer,并将它们 和 contrib/streaming/ hadoop-streaming-1.0.0.jar 一起复制到 Hadoop 安装目录下,使用以下 命令提交作业: $HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming-1.0.0.jar \ -files wc_mapper,wc_reducer \ -input /test/intput \ -output /test/output \ -mapper wc_mapper \ -reducer wc_reducer 由于 Hadoop Streaming 类似于 Linux 管道,这使得测试变得非常容易。用户可直接在 本地使用下面命令测试结果是否正确: cat test.txt | ./wc_mapper | sort | ./wc_reducer 2. Hadoop Streaming 实现原理分析 Hadoop Streaming 工具包实际上是一个使用 Java 编写的 MapReduce 作业。当用户使用 可执行文件或者脚本文件充当 Mapper 或者 Reducer 时,Java 端的 Mapper 或者 Reducer 充 当了 wrapper 角色,它们将输入文件中的 key 和 value 直接传递给可执行文件或者脚本文件 进行处理,并将处理结果写入 HDFS。 实现 Hadoop Streaming 的关键技术点是如何使用标准输入输出实现 Java 与其他可执 行文件或者脚本文件之间的通信。为此,Hadoop Streaming 使用了 JDK 中的 java.lang. ProcessBuilder 类。该类提供了一整套管理操作系统进程的方法,包括创建、启动和停止进 程(也就是应用程序)等。相比于 JDK 中的 Process 类,ProcessBuilder 允许用户对进程进 行更多控制,包括设置当前工作目录、改变环境参数等。 下面分析 Mapper 的执行过程(Reducer 的类似)。整个过程如图 3-23 所示,Hadoop Streaming 使用 ProcessBuilder 以独立进程方式启动可执行文件 wc_mapper,并创建该进程 的输入输出流,具体实现代码如下: … // 将 wc_mapper 封装成一个进程 ProcessBuilder builder = new ProcessBuilder("wc_mapper"); builder.environment().putAll(childEnv.toMap()); // 设置环境变量 sim = builder.start(); 第 3 章 MapReduce 编程模型   63 // 创建标准输出流 clientOut_ = new DataOutputStream(new BufferedOutputStream( sim.getOutputStream(), BUFFER_SIZE)); // 创建标准输入流 clientIn_ = new DataInputStream(new BufferedInputStream( sim.getInputStream(), BUFFER_SIZE)); // 创建标准错误流 clientErr_ = new DataInputStream(new BufferedInputStream(sim.getErrorStream())); 图 3-23 Hadoop Streaming 工作原理图 Hadoop Streaming 提 供 了 一 个 默 认 的 PipeMapper。 它 实 际 上 是 C++ 端 Mapper 的 wrapper,主要作用是向已经创建好的输出流 clientOut_ 中写入数据,具体实现代码如下: public void map(Object key, Object value, OutputCollector output, Reporter reporter) throws IOException { … clientOut_.write(key, 0, keySize); clientOut_.write(mapInputFieldSeparator); clientOut_.write(value, 0, valueSize); clientOut_.write('\n'); } 写入 clientOut_ 的数据直接成为 wc_mapper 的输入,待数据被处理完后,可直接从标 准输入流 clientIn_ 中获取结果: // MROutputThread public void run() { lineReader = new LineReader((InputStream)clientIn_, job_); while (lineReader.readLine(line) > 0) { splitKeyVal(line, line.getLength(), key, val); output.collect(key, val); } } 64   第二部分 MapReduce 编程模型篇 通 过 分 析 以 上 代 码 可 知, 由 于 Hadoop Streaming 使 用 分 隔 符 定 位 一 个 完 整 的 key 或 value,因而只能支持文本格式数据,不支持二进制格式。在 0.21.0/0.22.X 系列版本 中,Hadoop Streaming 增加了对二进制文件的支持 ,并添加了两种新的二进制文件格 式 :RawBytes 和 TypedBytes。顾名思义,RawBytes 指 key 和 value 是原始字节序列,而 TypedBytes 指 key 和 value 可以拥有的数据类型,比如 boolean、list、map 等。由于它们采 用的是长度而不是某一种分隔符定位 key 和 value,因而支持二进制文件格式。33 RawBytes 传递给可执行文件或者脚本文件的内容编码格式为: <4 byte length><4 byte length> TypedBytes 允许用户为 key 和 value 指定数据类型。对于长度固定的基本类型,如 byte、bool、int、long 等,其编码格式为: <1 byte type code> <1 byte type code> 对于长度不固定的类型,如 byte array、string 等,其编码格式为: <1 byte type code> <4 byte length><1 byte type code><4 byte length> 当 key 和 value 大部分情况下为固定长度的基本类型时,TypedBytes 比 RawBytes 格式 更节省空间。 3.4.2 Hadoop Pipes 的实现原理 Hadoop Pipes 是 Hadoop 为方便 C/C++ 用户编写 MapReduce 程序而设计的工具。其设 计思想是将应用逻辑相关的 C++ 代码放在单独的进程中,然后通过 Socket 让 Java 代码与 C++ 代码通信以完成数据计算。 1. 编程实例 同样,以 WordCount 为例,采用 C++ 分别编写 Mapper 和 Reducer,具体方法如下。 Mapper 实现的具体代码如下: class WordCountMapper: public HadoopPipes::Mapper {// 注意基类 public: WordCountMapper(HadoopPipes::TaskContext& context) { // 在此初始化,比如定义计数器等 } //MapContext 封装了 Mapper 的各种操作 void map(HadoopPipes::MapContext& context){ std::vector words = HadoopUtils::splitString(context.getInputValue(), " "); for(unsigned int i=0; i < words.size(); ++i) { context.emit(words[i], "1"); // 用 emit 输出 key/value 对 } } };  https://issues.apache.org/jira/browse/HADOOP-1722 第 3 章 MapReduce 编程模型   65 Reducer 实现的具体代码如下: class WordCountReducer: public HadoopPipes::Reducer { public: WordCountReducer(HadoopPipes::TaskContext& context) { } //ReduceContext 封装了 Reducer 的各种操作 void reduce(HadoopPipes::ReduceContext& context) { int sum = 0; while (context.nextValue()) { // 迭代获取该 key 对应的 value 列表 sum += HadoopUtils::toInt(context.getInputValue()); } context.emit(context.getInputKey(), HadoopUtils::toString(sum)); } }; main() 函数的具体实现代码如下: // 每个 Hadoop Pipes 作业将被单独封装成一个进程,因此需要有 main() 函数 int main(int argc, char *argv[]) { return HadoopPipes::runTask( HadoopPipes::TemplateFactory()); } 编译之后生成可执行文件 wordcount,输入以下命令运行作业: bin/hadoop pipes \ -D hadoop.pipes.java.recordreader=true \ -D hadoop.pipes.java.recordwriter=true \ -D mapred.job.name=wordcount \ -input /test/intput \ -output /test/output \ -program wordcount 与 Hadoop Streaming 比较,可以发现,Hadoop Pipes 的一个缺点是调试不方便。因为 输入的数据是 Java 端代码通过 Socket 传到 C++ 应用程序的,所以用户不能单独对 C++ 部 分代码进行测试,而需要连同 Java 端代码一起启动。 2. 实现原理分析 Hadoop Pipes 的 实 现 原 理 与 Hadoop Streaming 非 常 类 似, 它 也 使 用 Java 中 的 ProcessBuilder 以单独进程方式启动可执行文件。不同之处是 Java 代码与可执行文件(或者 脚本)的通信方式:Hadoop Streaming 采用标准输入输出,而 Hadoop Pipes 采用 Socket。 Hadoop Pipes 由两部分组成 :Java 端代码和 C++ 端代码。与 Hadoop Streaming 一样, Java 端代码实际上实现了一个 MapReduce 作业,Java 端的 Mapper 或者 Reducer 实际上是 C++ 端 Mapper 或者 Reducer 的封装器(wrapper),它们通过 Socket 将输入的 key 和 value 直接传递给可执行文件执行。 Hadoop Pipes 具体执行流程如图 3-24 所示。该序列图阐释了执行 Mapper 时,Java 端 与 C++ 端通过 Socket 进行交互的过程,主要有以下几个步骤: 66   第二部分 MapReduce 编程模型篇 步骤 1 用户提交 Pipes 作业后,Java 端启动一个 Socket server(等待 C++ 端接入), 同时以独立进程方式运行 C++ 端代码。 步 骤 2 C++ 端 以 Client 身 份 连 接 Java 端 的 Socket server, 连 接 成功后,Java 端依次发送一系列指 令通知 C++ 端进行各项准备工作。 步 骤 3 Java 端 通 过 mapItem() 函 数 不 断 向 C++ 端 传 送 key/value 对,C++ 端将计算结果返回给 Java 端,Java 端对结果进行保存。 步骤 4 所有数据处理完毕后, Java 端通知 C++ 端终止计算,并关 闭 C++ 端进程。 上面分析了 Java 端与 C++ 端的 交互过程,接下来深入分析 Hadoop Pipes 内 部 实 现 原 理。 如 图 3-25 所 示,Java 端 用 PipesMapRunner 实 现 了 MapRunner, 在 MapRunner 内部,借助两个协议类 DownwardProtocol 和 UpwardProtocol 向 C++ 端发送数据 和从 C++ 端接收数据,而 C++ 端也有两个类与之对应,分别是 Protocol 和 UpwardProtocol。 Protocol 将收到的数据传给用户编写的 Mapper,经 Mapper、Combiner 和 Partitioner 处理后, 由 UpwardProtocol 返回给 Java 端的 UpwardProtocol,由它写到本地磁盘上。 图 3-25 Hadoop Pipes 内部实现原理图 图 3-24 Hadoop Pipes 中 Java 端与 C++ 端交互序列图 第 3 章 MapReduce 编程模型   67 3.5 Hadoop 工作流 前面仅是介绍了单一作业的编写方法,很多情况下,用户编写的作业比较复杂,相互 之间存在依赖关系,这种依赖关系可以用有向图表示,我们称之为“工作流”。本节将介绍 Hadoop 工作流的编写方法、设计原理以及实现。 3.5.1 JobControl 的实现原理 1. JobControl 编程实例 我们以第 2 章中的贝叶斯分类为例介绍。一个完整的贝叶斯分类算法可能需要 4 个有 依赖关系的 MapReduce 作业完成,传统的做法是 :为每个作业创建相应的 JobConf 对象, 并按照依赖关系依次(串行)提交各个作业,如下所示: // 为 4 个作业分别创建 JobConf 对象 JobConf extractJobConf = new JobConf(ExtractJob.class); JobConf classPriorJobConf = new JobConf(ClassPriorJob.class); JobConf conditionalProbilityJobConf = new JobConf(ConditionalProbilityJob.class); JobConf predictJobConf = new JobConf(PredictJob.class); ...// 配置各个 JobConf // 按照依赖关系依次提交作业 JobClient.runJob(extractJobConf); JobClient.runJob(classPriorJobConf); JobClient.runJob(conditionalProbilityJobConf); JobClient.runJob(predictJobConf); 如果使用 JobControl,则用户只需使用 addDepending() 函数添加作业依赖关系接口, JobControl 会按照依赖关系调度各个作业,具体代码如下: Configuration extractJobConf = new Configuration(); Configuration classPriorJobConf = new Configuration(); Configuration conditionalProbilityJobConf = new Configuration(); Configuration predictJobConf = new Configuration(); ...// 设置各个 Configuration // 创建 Job 对象。注意,JobControl 要求作业必须封装成 Job 对象 Job extractJob = new Job(extractJobConf); Job classPriorJob = new Job(classPriorJobConf); Job conditionalProbilityJob = new Job(conditionalProbilityJobConf); Job predictJob = new Job(predictJobConf); // 设置依赖关系,构造一个 DAG 作业 classPriorJob.addDepending(extractJob); conditionalProbilityJob.addDepending(extractJob); predictJob.addDepending(classPriorJob); predictJob.addDepending(conditionalProbilityJob); // 创建 JobControl 对象,由它对作业进行监控和调度 JobControl JC = new JobControl("Native Bayes"); JC.addJob(extractJob);// 把 4 个作业加入 JobControl 中 JC.addJob(classPriorJob); 68   第二部分 MapReduce 编程模型篇 JC.addJob(conditionalProbilityJob); JC.addJob(predictJob); JC.run(); // 提交 DAG 作业 在实际运行过程中,不依赖于其他任何作业的 extractJob 会优先得到调度,一旦运行完 成,classPriorJob 和 conditionalProbilityJob 两个作业同时被调度,待它们全部运行完成后, predictJob 被调度。 对比以上两种方案,可以得到一个简单的结论:使用 JobControl 编写 DAG 作业更加简 便,且能使多个无依赖关系的作业并行运行。 2. JobControl 设计原理分析 JobControl 由两个类组成 :Job 和 JobControl。其中,Job 类封装了一个 MapReduce 作 业及其对应的依赖关系,主要负责监控各个依赖作业的运行状态,以此更新自己的状态, 其状态转移图如图 3-26 所示。作业刚开始处于 WAITING 状态。如果没有依赖作业或者所 有依赖作业均已运行完成,则进入 READY 状态。一旦进入 READY 状态,则作业可被提 交到 Hadoop 集群上运行,并进入 RUNNING 状态。在 RUNNING 状态下,根据作业运行 情况,可能进入 SUCCESS 或者 FAILED 状态。需要注意的是,如果一个作业的依赖作业 失败,则该作业也会失败,于是形成“多米诺骨牌效应”,后续所有作业均会失败。 图 3-26 JobControl 中 Job 状态转移图 JobControl 封装了一系列 MapReduce 作业及其对应的依赖关系。它将处于不同状态的 作业放入不同的哈希表中,并按照图 3-26 所示的状态转移作业,直到所有作业运行完成。 在实现的时候,JobControl 包含一个线程用于周期性地监控和更新各个作业的运行状态, 调度依赖作业运行完成的作业,提交处于 READY 状态的作业等。同时,它还提供了一些 第 3 章 MapReduce 编程模型   69 API 用于挂起、恢复和暂停该线程。 3.5.2 ChainMapper/ChainReducer 的实现原理 ChainMapper/ChainReducer 主要为了解决线性链式 Mapper 而提出的。也就是说,在 Map 或者 Reduce 阶段存在多个 Mapper,这些 Mapper 像 Linux 管道一样,前一个 Mapper 的输出结果直接重定向到下一个 Mapper 的输入,形成一个流水线,形式类似于 [MAP+ REDUCE MAP*]。图 3-27 展示了一个典型的 ChainMapper/ChainReducer 的应用场景 :在 Map 阶段,数据依次经过 Mapper1 和 Mapper2 处理 ;在 Reduce 阶段,数据经过 shuffle 和 sort 后 ;交由对应的 Reducer 处理,但 Reducer 处理之后并没有直接写到 HDFS 上,而是交 给另外一个 Mapper 处理,它产生的结果写到最终的 HDFS 输出目录中。 图 3-27 ChainMapper/ChainReducer 应用实例 需 要 注 意 的 是, 对 于 任 意 一 个 MapReduce 作 业,Map 和 Reduce 阶 段 可 以 有 无 限 个 Mapper,但 Reducer 只能有一个。也就是说,图 3-28 所示的计算过程不能使用 ChainMapper/ ChainReducer 完成,而需要分解成两个 MapReduce 作业。 图 3-28 一个 ChainMapper/ChainReducer 不适用的场景 1. 编程实例 这里以图 3-27 中的作业为例,给出 ChainMapper/ChainReducer 的基本使用方法,具体 代码如下: … conf.setJobName("chain"); 70   第二部分 MapReduce 编程模型篇 conf.setInputFormat(TextInputFormat.class); conf.setOutputFormat(TextOutputFormat.class); JobConf mapper1Conf = new JobConf(false); JobConf mapper2Conf = new JobConf(false); JobConf reduce1Conf = new JobConf(false); JobConf mapper3Conf = new JobConf(false); … ChainMapper.addMapper(conf, Mapper1.class, LongWritable.class, Text.class,Text. class, Text.class, true, mapper1Conf); ChainMapper.addMapper(conf, Mapper2.class, Text.class, Text.class, LongWritable.class, Text.class, false, mapper2Conf); ChainReducer.setReducer(conf, Reducer.class, LongWritable.class, Text.class,Text. class, Text.class, true, reduce1Conf); ChainReducer.addMapper(conf, Mapper3.class, Text.class, Text.class, LongWritable.class, Text.class, false, null); JobClient.runJob(conf); 用户通过 addMapper 在 Map/Reduce 阶段添加多个 Mapper。该函数带有 8 个输入参 数,分别是作业的配置、Mapper 类、Mapper 的输入 key 类型、输入 value 类型、输出 key 类型、输出 value 类型、key/value 是否按值传递和 Mapper 的配置。其中,第 7 个参数需要 解释一下 :Hadoop MapReduce 有一个约定,函数 OutputCollector.collect(key, value) 执行 期间不应改变 key 和 value 的值。这主要是因为函数 Mapper.map() 调用完 OutputCollector. collect(key, value) 之后,可能会再次使用 key 和 value 值,如果被改变,可能会造成潜在 的错误。为了防止 OutputCollector 直接对 key/value 修改,ChainMapper 允许用户指定 key/ value 传递方式。如果用户确定 key/value 不会被修改,则可选用按引用传递,否则按值传 递。需要注意的是,引用传递可避免对象拷贝,提高处理效率,但需要确保 key/value 不会 被修改。 2. 实现原理分析 ChainMapper/ChainReducer 实 现 的 关 键 技 术 点 是 修 改 Mapper 和 Reducer 的 输 出 流,将本来要写入文件的输出结果重定向到另外一个 Mapper 中。在 3.3.4 节中提到,结 果的输出由 OutputCollector 管理,因而,ChainMapper/ChainReducer 需要重新实现一个 OutputCollector 完成数据重定向功能。 尽管链式作业在 Map 和 Reduce 阶段添加了多个 Mapper,但仍然只是一个 MapReduce 作 业, 因 而 只 能 有 一 个 与 之 对 应 的 JobConf 对 象。 然 而, 当 用 户 调 用 addMapper 添 加 Mapper 时,可能会为新添加的每个 Mapper 指定一个特有的 JobConf,为此,ChainMapper/ ChainReducer 将这些 JobConf 对象序列化后,统一保存到作业的 JobConf 中。图 3-27 中的 实例可能产生如表 3-3 所示的几个配置选项。 第 3 章 MapReduce 编程模型   71 表 3-3 图 3-27 中实例对应的几个配置选项 配置参数 参数值 chain.mapper.mapper.config.0 Mapper1Conf 序列化后的字符串 chain.mapper.mapper.config.1 Mapper2Conf 序列化后的字符串 chain.reducer.reducer.config.0 ReducerConf 序列化后的字符串 chain.reducer.mapper.config.0 Mapper3Conf 序列化后的字符串 当链式作业开始执行的时候,首先将各个 Mapper 的 JobConf 对象反序列化,并构造对 应的 Mapper 和 Reducer 对象,添加到数据结构 mappers(List 类型)和 reducer (Reducer 类型)中。ChainMapper 中实现的 map() 函数如下,它调用了第一个 Mapper,是 后续 Mapper 的“导火索”。 public void map(Object key, Object value, OutputCollector output, Reporter reporter) throws IOException { Mapper mapper = chain.getFirstMap(); if (mapper != null) { mapper.map(key, value, chain.getMapperCollector(0, output, reporter),reporter); } } chain.getMapperCollector 返回一个 OutputCollector 实现—ChainOutputCollector,它 的 collect 方法如下: public void collect(K key, V value) throws IOException { if (nextMapperIndex < mappers.size()) { // 调用下一个 Mapper nextMapper.map(key, value, new ChainOutputCollector(nextMapperIndex, nextKeySerialization, nextValueSerialization, output, reporter), reporter); } else { // 如果是最后一个 Mapper,则直接调用真正的 OutputCollector output.collect(key, value); } 3.5.3 Hadoop 工作流引擎 前 面 介 绍 的 JobControl 和 ChainMapper/ChainReducer 仅 可 看 作 运 行 工 作 流 的 工 具。 它们只具备最简单的工作流引擎功能,比如工作流描述、简单的作业调度等。为了增强 Hadoop 支持工作流的能力,在 Hadoop 之上出现了很多开源的工作流引擎,主要可概括为 两类:隐式工作流引擎和显式工作流引擎。 隐式工作流引擎在 MapReduce 之上添加了一个语言抽象层,允许用户使用更简单的 方式编写应用程序,比如 SQL、脚本语言等。这样,用户无须关注 MapReduce 的任何 72   第二部分 MapReduce 编程模型篇 细节,降低了用户的学习成本,并可大大提高开发效率。典型的代表有 Hive 、Pig 和 Cascading 。它们的架构如图 3-29 所示,从上往下分为以下三层。34353637 功能描述层 :直接面向用户提供了一种简单的应用 ❑ 程序编写方法,比如,Hive 使用 SQL,Pig 使用 Pig Latin 脚本语言,Cascading 提供了丰富的 Java API。 作业生成器:作业生成器主要将上层的应用程序转化 ❑ 成一批 MapReduce 作业。这一批 MapReduce 存在相 互依赖关系,实际上是一个 DAG。 调度引擎 :调度引擎直接构建于 MapReduce 环境之 ❑ 上,将作业生成器生成的 DAG 按照依赖关系提交到 MapReduce 上运行。 显式工作流引擎直接面向 MapReduce 应用程序开发 者,提供了一种作业依赖关系描述方式,并能够按照这种描述方式进行作业调度。典型的 代表有 Oozie 和 Azkaban 。它们的架构如图 3-30 所示,从上往下分为以下两层。 工作流描述语言 :工作流描述语言用于描述作业的 ❑ 依赖关系。Oozie 采用了 XML,而 Azkaban 采用了 key/value 格式的文本文件。需要注意的是,这里的 作业不仅仅是指 MapReduce 作业,还包括 Shell 命 令、Pig 脚本等。也就是说,一个 MapReduce 可能 依赖一个 Pig 脚本或者 Shell 命令。 调度引擎 :同隐式工作流引擎的调度引擎功能相同, ❑ 即根据作业的依赖关系完成作业调度。 表 3-4 对比了显式工作流引擎与 Hadoop 自带的 JobControl 的不同之处。尽管它们均用 于解决 Hadoop 工作流调度问题,但是在设计思路、使用方法、应用场景等方面都存在明显 的不同。 表 3-4 显式工作流引擎与 JobControl 对比 特性 Oozie/Azkaban JobControl 工作流描述方式 有专门的描述语言 Java API 执行模型 server-side client-side 是否可跟踪运行进度 可以,通过界面 不可以 是否需要安装 是 否 是否有重试功能 有,可设置作业失败重试机制 没有  可参考 http://hive.apache.org/ 相关文档。  可参考 http://pig.apache.org/ 相关文档。  可参考 http://www.cascading.org/ 相关文档。  可参考 http://sna-projects.com/azkaban/ 相关文档。 图 3-29 隐式工作流引擎架构图 图 3-30 显式工作流引擎架构图 第 3 章 MapReduce 编程模型   73 特性 Oozie/Azkaban JobControl 依 赖 关 系 中 是 否 可 有 Pig 脚 本、 Shell 命令等 可以 不可以,依赖关系只存在于使用 Java 编写的 MapReduce 作业之间 3.6 小结 MapReduce 编程模型直接决定了 MapReduce 的易用性。本章从简单地使用实例、设计 原理以及调用时机等方面介绍了 MapReduce 编程模型中的各个组件。 从整个体系结构上看,整个编程模型位于应用程序层和 MapReduce 执行器之间,可以 分为两层 :第一层是最基本的 Java API,第二层构建于 Java API 之上,添加了几个方便用 户编写复杂的 MapReduce 程序和利用其他语言编写 MapReduce 程序的工具。 Java API 分为新旧两套 API。新 API 在旧 API 基础上封装而来,在易用性和扩展性方 面更好。 为了方便用户采用非 Java 语言编写 MapReduce 程序,Hadoop 提供了 Hadoop Streaming 和 Hadoop Pipes 两个工具。它们本质上都是一个 MapReduce 作业,区别在于 Java 语言与非 Java 语言之间的通信机制。 考虑到实际应用中,用户有时不只是编写单一的 MapReduce 作业,而是存在复杂依 赖 关 系 的 DAG 作 业( 工 作 流 ),Hadoop MapReduce 提 供 了 JobControl 和 ChainMapper/ ChainReducer 两个工具。 (续)
还剩87页未读

继续阅读

下载pdf到电脑,查找使用更方便

pdf的实际排版效果,会与网站的显示效果略有不同!!

需要 10 金币 [ 分享pdf获得金币 ] 3 人已下载

下载pdf