Google MapReduce中文版


GoogleGoogleGoogleGoogle MapReduce MapReduce MapReduce MapReduce 中文 版 译者 :::: alexalexalexalex 摘要 MapReduce 是一个编程模型,也是一个处理和生成超大数据集的算法模型的相 关实现。用户首先创建一个 Map 函 数处理一个 基 于 key/value pair 的数据集合 ,输出 中间 的基 于 key/value pair 的数据集合 ;然后 再创 建一 个 Reduce 函数 用来合并所有的具有相同中 间 key 值的中间 value 值。 现 实世界中有很多满足上述处理模型的例子, 本 论文将详细描 述 这个模型。 MapReduce 架构的程序能够在 大量的普 通配置 的计算机 上实现 并行化处 理。这 个系统在 运行时 只关心: 如何分割 输入数据,在大量 计算机组 成的集 群上的调 度,集 群中计算 机的错 误处理, 管理集 群中计算 机之间 必要的通 信。采用 MapReduce 架构可以使那些没有并行计算和分布式 处理系统开发经 验的程序员有效 利用分布式系统 的丰富资源。 我们的 MapReduce 实现运行在规模可以灵活调整的由普通 机器组成的集群 上: 一 个典型的 MapReduce 计算往往 由 几千台机 器组 成 、处 理 以 TB计算的数 据。 程 序员 发现 这个 系 统非 常好 用: 已 经实 现了 数以 百 计 的 MapReduce 程序, 在Google 的集群上,每天都有 1000 多个 MapReduce 程序在执行。 1111、介 绍 在过去的 5年里, 包 括本文作者在内的 Google 的很多程序员, 为 了处理海量的原始数据, 已 经实现了数以百计 的、 专用的计算方法。这些计算方法用来处理大量的原始数据,比如,文档抓取 (类似网络爬虫的程序) 、 Web 请求日志 等 等;也为了计算处 理各种类 型的衍 生数据, 比如倒 排索引、 Web 文档的图结构的各 种表示形 势、每 台主机上 网络爬虫 抓取的页面数量的 汇总、每 天被请 求的最多 的查询 的集合等 等。大 多数这样 的数据 处理运算 在概念 上很容易 理解。然 而由于输入的数据 量巨大, 因此要 想在可接 受的时 间内完成 运算, 只有将这 些计算 分布在成 百上千 的主机 上。如何处 理并行计算、如何 分发数据 、如何 处理错误 ?所有 这些问题 综合在 一起,需 要大量 的代码处 理,因 此也使得 原本简单 的运算变得难以处理。 为了解决上述复杂 的问题, 我们设 计一个新 的抽象 模型,使 用这个 抽象模型 ,我们 只要表述 我们想 要执行的 简单 运算即可,而不必 关心并行 计算、 容错、数 据分布 、负载均 衡等复 杂的细节 ,这些 问题都被 封装在 了一个库 里面。设 计这个抽象模型的灵感来 自 Lisp 和许多其他函数式语言的 Map 和Reduce 的原语。我们意识到我们大多数的运算 都包 含这样的操作: 在 输入数据的 “逻辑 ”记录上应用 Map 操作得出一个中间 key/value pair 集合, 然后在所有具有相同 key 值的 value 值上应用 Reduce 操作, 从而达到合并中间的数据,得到一个想 要的结果的目的。使用 MapReduce 模型, 再 结合用户实现的 Map 和Reduce 函数, 我们 就可以非常容易的实现大规模并行 化计算; 通过 MapReduce 模型自带的 “再 次执行 ”(re-execution)功能,也提供了初级的容灾实现方案 。 这个工作 (实现一个 MapReduce 框架模型 )的主要贡献是通过 简单的接 口来实 现自动的 并行化 和大规模 的分布 式计 算,通过使用 MapReduce 模型接口实现在大量普通 的 PC 机上高性能计算。 第二部分描述基本 的编程模 型和一 些使用案 例。第 三部分描 述了一 个经过裁 剪的、 适合我们 的基于 集群的计 算环 境的 MapReduce 实现。 第 四部分描述我们认为在 MapReduce 编程模型中一些实用的技巧。 第 五部分对于各种不同的 任 务,测量我们 MapReduce 实现的性能。第六 部分揭示 了 在 Google 内部如何使用 MapReduce 作为基础重写我们 的索引 系统产品,包括其它一些使 用 MapReduce 的经验。第七部分讨论相关的和未来的 工作。 2222、编 程模型 MapReduce 编程模型的原理是: 利 用一个输入 key/value pair 集合来产生一个输出的 key/value pair 集合。MapReduce 库的用户用两个函数表达这个计算: Map 和Reduce。 用户自定义的 Map 函数接受一个输入的 key/value pair 值, 然 后产生一个中间 key/value pair 值的集合。 MapReduce 库把所有具有相同中间 key 值I的中间 value 值集合在一起后传递给 reduce 函数。 用户自定义的 Reduce 函数接受一个中间 key 的值 I和相关的一个 value 值的集合。 Reduce 函数合并这些 value 值, 形成一个较 小 的 value 值的集合。 一般的 ,每 次 Reduce 函数调用只 产 生 0或1个输出 value 值。通常我 们通过 一个迭 代器把中间 value 值提供给 Reduce 函数,这样我们就可以处理无法全部放 入内存中的大量 的 value 值的集合。 2.12.12.12.1、例 子 例如,计算一个大的文档集合中每个单 词出现的次数, 下面是伪代码段 : map(String key, String value): // key: document name // value: document contents for each word w in value: EmitIntermediate(w, "1"); reduce(String key, Iterator values): // key: a word // values: a list of counts int result = 0; for each v in values: result += ParseInt(v); Emit(AsString(result)); Map 函数输出文档中的 每个词、 以及这 个词的出 现次数 (在这个简单的例子 里就 是 1)。Reduce 函数把 Map 函数产 生的每一个特定的词的计数累加起来。 另外, 用 户编写代码, 使 用输入和输出文件的名字、 可 选的调节参数来完成一个符合 MapReduce 模型规范的对 象, 然后调用 MapReduce 函数,并把这个规范对象传递给它。用户的代码和 MapReduce 库链接在一起 (用C++实现 )。附 录 A包含了这个实例的全部程序代码。 2.22.22.22.2、类 型 尽管在前面 例子的 伪代 码中 使用了 以字 符串 表示的 输入 输出 值,但 是在 概念 上,用 户定 义 的 Map 和Reduce 函数 都有相关联的类型: map(k1,v1) ->list(k2,v2) reduce(k2,list(v2)) ->list(v2) 比如, 输入的 key 和value 值与输出的 key 和value 值在类型上推导的域不同。 此 外, 中间 key 和value 值与输出 key 和value 值在类型上推导的域相同。 (alex 注: 原文中这个 domain 的含义不是很清楚,我参考 Hadoop、KFS 等实现, map 和reduce 都使用了泛型,因此 , 我把 domain 翻译成类型推导的域) 。 我们的 C++中使用字符串类型作为用户自定义函数 的输入输出,用 户在自己的代码 中对字符串进行 适当的类型转换 。 2.32.32.32.3、更 多的例子 这里还有一些有趣的简单例子,可以很 容易的使 用 MapReduce 模型来表示: 分布式的 Grep:Map 函数输出匹配某个模式的一行, Reduce 函数是一个恒等函数,即把中间数据复 制到输出。 计算 URL访问频率: Map 函数处理日志中 web 页面请求的记录,然后输出 (URL,1)。Reduce 函数把相同 URL的value 值都累加起来,产生 (URL,记录总数 )结果。 倒转网络 链接 图 : Map 函数在源 页面 ( source)中搜索 所有 的 链接 目标 ( target)并输出 为 (target,source)。Reduce 函数把给定链接目标( target)的链接组合成一个列表,输出 (target,list(source))。 每个主机的 检索词 向量 :检 索词向 量用 一个 (词,频率 )列表来概述 出现在 文档 或文 档集中 的最 重要 的一些 词。 Map 函数为每一个输入文档输出 (主机名 ,检索词向量 ), 其中主机名来自文档的 URL。Reduce 函数接收给定主机的所有文 档 的检索词向量,并把这些检索词向量加 在一起,丢弃掉 低频的检索词, 输出一个最终的 (主机名 ,检索词向量 )。 倒排索引: Map 函数分析每个文档输出一个 (词,文档号 )的列表, Reduce 函数的输入是一个给定词的所有(词, 文 档号) , 排序所有的文档号,输 出 (词,list(文档号) )。 所有的输出集合形成一个简单的倒排索引,它以一种简单的算 法 跟踪词在文档中的位置。 分布式排序 : Map 函数从每个 记录提 取 key,输出 (key,record)。Reduce 函数不改变 任何的 值。 这个 运算依 赖分区 机制 (在4.1 描述 )和排序属性 (在4.2 描述 )。 3333、实 现 MapReduce 模型可以有多种不 同的实现 方式。 如何正确 选择取 决于具体 的环境 。例如, 一种实 现方式适 用于小型 的共享内存方式的机器, 另 外一种实现方式则适用于大 型 NUMA 架构的多处理器的主机, 而 有的实现方式更适合大 型 的网络连接集群。 本章节描述一个适用于 Google 内部广泛使用的运算环境的实现:用以 太网交换机连接 、由普 通 PC 机组成的大型 集群。在我们的环境里包括: 1.x86 架构、运行 Linux 操作系统、双处理器、 2-4GB 内存的机器。 2.普通的网络硬件设备,每个机器的带宽 为百兆或者千兆 ,但是远小于网 络的平均带宽的 一半。 (alex 注:这里 需要网络专家解释一下了) 3.集群中包含成百上千的机器,因此,机 器故障是常态。 4.存储为廉价的内 置 IDE硬盘。一个内部分 布式文件 系统用 来管理存 储在这 些磁盘上 的数据 。文件系 统通过 数据 复制来在不可靠的硬件上保证数据的可 靠性和有效性。 5.用户提交工作( job)给调度系统。每个工作( job)都包含一系列的任务( task) , 调度系统将这些任务调度到 集 群中多台可用的机器上。 3.1、执行概括 通过将 Map 调用的输入 数据自 动分 割 为 M个数据片段 的集合 , Map 调用被分布 到多台 机器 上执 行。输 入的 数据 片段能够在不同的机器上并行处理。 使 用分区函数将 Map调用产生的中间 key值分成 R个不同分区(例如,hash(key) mod R),Reduce 调用也被分布到多台机器上执行。分区 数量( R)和分区函数由用户来指定。 图1展示了我们的 MapReduce 实现中操作的全部流程。 当 用户调用 MapReduce 函数时, 将发生下面的一系列动 作 (下面的序号和图 1中的序号一一对应) : 1.用户 程 序 首 先 调 用 的 MapReduce 库将 输 入 文 件 分 成 M个数 据片度,每个数据片段的大小一般从16MB 到 64MB(可以通过可选的参数来控制每个数据片 段的大小 )。 然后用户程序在机群中创建大量的程序副本。 (alex:copies of the program 还真难翻译) 2.这些程序副 本中的 有一 个特 殊的程 序 --master。副本中其 它的程 序都 是 worker 程序,由 master 分配任务。 有 M 个Map 任务和 R个Reduce 任务将被分配, master 将一个 Map 任务或 Reduce 任务分配给一个空闲的 worker。 3.被分配了 map 任务的 worker 程序读取相关的输入数据片段,从输入 的数据片段中解 析 出 key/value pair,然后把 key/value pair 传递给用户自定义的 Map 函数,由 Map 函数生成并输出的中间 key/value pair,并缓存在内存中。 4.缓存中的 key/value pair 通过分区函数分成 R个区域,之后周期性的写入到本地磁盘上 。缓存 的 key/value pair 在 本地磁盘上的存储位置将被回传 给 master,由 master 负责把这些存储位置再传送 给 Reduce worker。 5.当Reduce worker 程序接收到 master 程序发来的数据存储位置信息后,使 用 RPC 从Map worker 所在主机的磁 盘 上读取这些缓存数据。当 Reduce worker 读取了所有的中间数据后,通过对 key 进行排序后使得具有相同 key 值的数 据 聚合在一起。由于许多不同的 key 值会映射到相同的 Reduce 任务上, 因此必须进行排序。如果中间数据太大无法在 内 存中完成排序,那么就要在外部进行排 序。 6.Reduce worker 程序遍历排序后的中间数据,对于每一个 唯一的中 间 key 值, Reduce worker 程序将这个 key 值和 它相关的中间 value 值的集合传递给用户自定义 的 Reduce 函数。 Reduce 函数的输出被追加到所属分区的输出文 件。 7.当所有 的 Map 和Reduce 任务都完 成之 后 , master 唤醒用户 程序 。 在这 个时 候, 在 用户 程序 里的 对 MapReduce 调用才返回。 在成功完成任务之后, MapReduce 的输出存放在 R个输出文件中 (对应每个 Reduce 任务产生一个输出文件, 文 件 名由 用 户 指 定) 。一般 情 况 下 , 用 户 不 需 要 将 这 R个输 出文件合并成一个文件--他们 经常把这些文件作为另外一个 MapReduce 的输入,或者在另外一个可以处理多个 分割文件的分布 式应用中使用。 3.23.23.23.2、Master Master Master Master 数据结 构 Master 持有一些数据结构, 它 存储每一个 Map 和Reduce 任务的状态 ( 空闲、 工 作中或完成 ), 以及 Worker 机器 (非 空闲任务的机器 )的标识。 Master 就像一个数据管道, 中间文件存储区域的位置信息通过这个管道 从 Map 传递到 Reduce。 因 此, 对 于每个 已 经完成的 Map 任务, master 存储了 Map 任务产生的 R个中间文件存储区域的大小和位置。 当 Map 任务完成时, Master 接收到位置和大小的更新信息,这些信 息被逐步递增的 推送给那些正在 工作 的 Reduce 任务。 3.33.33.33.3、容 错 因为 MapReduce 库的设计初衷是使用由成百上千的机器 组成的集群来处 理超大规模的数 据,所以,这个库必须 要 能很好的处理机器故障。 ·worker worker worker worker 故障 master 周期性的 ping 每个 worker。如果在一 个约定 的时 间范 围内没 有收 到 worker 返回的信息 , master 将把这个 worker 标记为失效。 所有由这个失效的 worker 完成的 Map 任务被重设为初始的空闲状态, 之后这些任务就可以被安 排 给其他的 worker。同样的, worker 失效时正在运行的 Map 或Reduce 任务也将被重新置为空闲状态,等待重 新调度。 当worker 故障时, 由 于已经完成的 Map 任务的输出存储在这台机器上, Map 任务的输出已不可访问了, 因此必 须 重新执行。而已经完成的 Reduce 任务的输出存储在全局文件系统上,因 此不需要再次执 行。 当一个 Map 任务首先 被 worker A执行,之 后由 于 worker A失效了又 被调 度 到 worker B执行,这 个 “重新执行 ” 的动作会被通知给所有执 行 Reduce 任务的 worker。 任 何还没有从 worker A读取数据的 Reduce 任务将从 worker B读取 数据。 MapReduce 可以处理大规模 worker 失效的情况。 比如, 在一个 MapReduce 操作执行期间, 在 正在运行的集群上 进 行网络维 护引 起 80 台机器在 几分 钟 内不 可访 问了 , MapReduce master 只需要简 单的 再 次执 行那 些不 可 访问 的 worker 完成的工作,之后继续执行未完成的任 务,直到最终完 成这 个 MapReduce 操作。 ·master master master master 失败 一个简单的解决办法是让 master 周期性的将上面描述的数据结构 ( alex 注:指3.2 节)的写入磁盘,即检查点(checkpoint)。 如果这个 master 任务失效了, 可以从最后一个检查点 ( checkpoint) 开 始启动另一个 master 进程。 然 而, 由 于只有一 个 master 进程, master 失效后再恢 复是比 较麻 烦的 ,因此 我们 现在 的实现 是如 果 master 失效,就中 止 MapReduce 运算。 客户可以检查到这个状态,并且可以根 据需要重新执 行 MapReduce 操作。 ·在失 效方面的 处理机制 (alex 注:原文为 "semantics in the presence of failures")当用户提供 的 Map 和Reduce 操作是输入确定性 函数(即 相同的输入产生相 同的输出 )时, 我们的分 布式实 现在任何 情况下 的输出都 和所有 程序没有 出现任 何错误、 顺序的执 行产生的输出是一样的。 我们依赖 对 Map 和Reduce 任务的输出 是原子 提交 的来 完成这 个特 性。 每个工 作中 的任 务把它 的输 出写 到私有 的 临时文件中。每个 Reduce 任务生成一个这样的文件,而每个 Map 任务则生成 R个这样的文件( 一个 Reduce 任务对 应 一个文件) 。 当一个 Map 任务完成的时, worker 发送一个包含 R个临时文件名的完成消息 给 master。如果 master 从一 个已经完成的 Map 任务再次接收到到一个完成消息, master 将忽略这个消息;否则, master 将这 R个文件的名字记 录 在数据结构里。 当Reduce 任务完成时,Reduce worker 进程以原子的方式把临时文件重命名为 最终的输出文件。 如果同一个 Reduce 任务在多台机器上 执行,针 对同一 个最终的 输出文 件将有多 个重命 名操作执 行。我 们依赖底 层文件 系统提供 的重命名 操作的原子性来保证最终的文件系统状 态仅仅包含一 个 Reduce 任务产生的数据。 使用 MapReduce 模型的程序员可以很容易的理解他们程 序的行为,因为 我们绝大多数 的 Map 和Reduce 操作是确 定性的,而且存在这样的一个事实:我 们的失效处理机 制等价于一个顺 序的执行的操作 。 当 Map 或/和Reduce 操作是 不确定性的时候,我们提供虽然较弱但 是依然合理的处 理机制。当使用 非确定操作的时 候,一 个 Reduce 任务 R1 的输 出等价于一个非确定性程序顺序执行产 生时的输出。但 是,另一 个 Reduce 任务 R2 的输出也许符合一个不同的非确定 顺序程序执行产生的 R2 的输出。 考虑 Map 任务 M和Reduce 任务 R1、R2 的情况。我 们设 定 e(Ri)是Ri 已经提交的 执行过 程( 有且 仅有一 个这样 的执行过程) 。当 e(R1)读取了由 M一次执行产生的输出,而 e(R2)读取了由 M的另一次执行产生的输出,导致了较 弱 的失效处理。 3.43.43.43.4、存 储位置 在我们的计算运行 环境中, 网络带 宽是一个 相当匮 乏的资源 。我们 通过尽量 把输入 数据 (由GFS 管理 )存储在集群中机 器的本地磁盘上来 节省网络 带宽。 GFS 把每个文件按 64MB 一个 Block 分隔,每个 Block 保存在多台机器上 ,环境中 就存放了多份拷贝 (一般是 3个拷贝 )。MapReduce 的master 在调度 Map 任务时会考虑输入文件的位置信息,尽 量将一 个Map 任务调度在包含相关输入数据拷贝的机 器上执行;如果 上述努力失败了 , master 将尝试在保存有输入数据拷贝 的机器附近的机器 上执 行 Map 任务 (例如,分配到一个 和包含输 入 数据的机器在一 个 switch 里的 worker 机器上执行 )。 当在一个足够大的 cluster 集群上运行大型 MapReduce 操作的时候, 大 部分的输入数据都能从本地机器读取,因此消 耗 非常少的网络带宽。 3.53.53.53.5、任 务粒度 如前所述, 我们把 Map 拆分成了 M个片段、把 Reduce 拆分成 R个片段执行。 理 想情况下, M和R应当比集群 中 worker 的机器数量要多得多。在每台 worker 机器都执行大量的不同任务能够提高集 群的动态的负载 均衡能力,并且 能 够加快故障恢复的速度:失效机器上执 行的大 量 Map 任务都可以分布到所有其他 的 worker 机器上去执行。 但是实际上,在我 们的具体 实现中 对 M和R的取值都有一定的 客观限制 ,因 为 master 必须执行 O(M+R)次调度, 并且在内存中保存 O(M*R)个状态 ( 对影响内存使用的因素还是比较小的: O(M*R)块状态, 大概每对 Map 任务 /Reduce 任务 1个字节就可以了) 。 更进一步, R值通常是由用户指定的,因为每个 Reduce 任务最终都会生成一个独立的输出文件。实际使用时我 们 也倾向于选择合适的 M值,以使得每一个独立任务都是处理大 约 16M 到64M 的输入数据(这样,上面描写的输入数 据本地存储优化策 略才最有 效) ,另外,我们把 R值设置为我们想使 用 的 worker 机器数量的小的倍 数。我们 通常会用 这样的比例来执行 MapReduce:M=200000,R=5000,使用 2000 台worker 机器。 3.63.63.63.6、备 用任务 影响一个 MapReduce 的总执行时 间最通 常的 因素 是 “落伍者 ”:在运算过 程中, 如果 有一 台机器 花了 很长 的时间 才完成最 后几 个 Map 或Reduce 任务,导 致 MapReduce 操作总的 执行 时 间超 过预 期。 出 现 “落伍者 ”的原因非 常多。 比如:如果一个机 器的硬盘 出了问 题,在读 取的时 候要经常 的进行 读取纠错 操作, 导致读取 数据的 速度 从 30M/s 降低 到1M/s。 如果 cluster 的调度系统在这台机器上又调度了其他 的任务,由于 CPU、 内存、 本地硬盘和网络带宽等竞争 因 素的存在, 导 致执行 MapReduce 代码的执行效率更加缓慢。 我们 最 近遇到的一个问题是由于机器的初始化 代码 有 bug, 导致关闭了的处理器的缓存:在这些机 器上执行任务的 性能和正常情况 相差上百倍。 我们有一个通用的机制来减少 “落伍者 ”出现的情况。当一个 MapReduce 操作接近完成的时候, master 调度备用 (backup) 任 务进程来执行剩下的、 处 于处理中状态 ( in-progress) 的 任务。 无 论是最初的执行进程、 还 是备用 ( backup) 任务进程完成了任 务,我们 都把这 个任务标 记成为 已经完成 。我们 调优了这 个机制 ,通常只 会占用 比正常操 作多几个 百分点的计算资源。我们发现采用这样 的机制对于减少 超 大 MapReduce 操作的总处理时间效果显著。例如, 在 5.3 节 描述的排序任务,在关闭掉备用任务的 情况下要多 花 44%的时间完成排序任务。 4444、技 巧 虽然简单 的 Map 和Reduce 函数提供的 基本功 能已 经能 够满足 大部 分的 计算需 要, 我们 还是发 掘出 了一 些有价 值 的扩展功能。本节将描述这些扩展功能 。 4.14.14.14.1、分 区函数 MapReduce 的使用者通常会指定 Reduce 任务和 Reduce 任务输出文件的数量( R) 。 我们在中间 key 上使用分区函 数来对数据进行分区, 之 后再输入到后续任务执行进程。 一 个缺省的分区函数是使用 hash 方法 (比如, hash(key) mod R) 进行分区。 hash 方法能产生非常平衡的分区。然而, 有的时候, 其它的一些分区函数对 key 值进行的分区将非常有用 。 比如, 输 出的 key 值是 URLs, 我们 希望每个主机的所有条目保持在同一个输 出文件中。 为 了支持类似的情况, MapReduce 库的用户需要提供专门的分区函数。例 如, 使 用 “hash(Hostname(urlkey)) mod R”作为分区函数就可以把所有来自同 一 个主机的 URLs 保存在同一个输出文件中。 4.24.24.24.2、顺 序保证 我们确保在给定的分区中,中 间 key/value pair 数据的处理顺序是按照 key 值增量顺序处理的。这样的顺序保证对 每个分成生成一个 有序的输 出文件 ,这对于 需要对 输出文件 按 key 值随机存取的应用 非常有意 义,对 在排序输 出的数 据集也很有帮助。 4.34.34.34.3、Combiner Combiner Combiner Combiner 函数 在某些情况下, Map 函数产生的中间 key 值的重复数据会占很大的比重,并且, 用 户自定义的 Reduce 函数满足 结 合律和交换 律。 在 2.1 节的词数统 计程序 是个 很好 的例子 。由 于词 频率倾 向于 一 个 zipf 分布 (齐夫分布 ),每个 Map 任 务将产生成 千上万 个这 样的 记录 。所有的这 些记录 将通 过网 络被发 送到 一个 单独 的 Reduce 任务,然后 由这 个 Reduce 任务把所有这些记录累加起来产生一个 数字。 我们允许用户指定一个可选的 combiner 函数, combiner 函数首 先 在本地将这些记录进行一次合并,然后 将合并的结果再 通过 网络发送出去。 Combiner 函数在每台 执 行 Map 任务的机器 上都会 被执 行一 次。一 般情 况下 , Combiner 和Reduce 函数是一样的。 Combiner 函数和 Reduce 函数之间唯一的区别是 MapReduce 库怎样控制函数的输出。 Reduce 函数的输出被保存在最 终 的输出文件里,而 Combiner 函数的输出被写到中间文件里,然后被 发送 给 Reduce 任务。 部分的合并中间结果可以显著的提高一 些 MapReduce 操作的速度。附录 A包含一个使用 combiner 函数的例子。 4.44.44.44.4、输 入和输出 的类型 MapReduce 库支持几种不同的格式的输入数据。比 如, 文 本模式的输入数据的每一行被视为 是一 个 key/value pair。 key 是文件的偏 移量, value 是那一行的 内容。 另外 一种 常见的 格式 是 以 key 进行排序来 存储 的 key/value pair 的序列。 每种输入类 型的实 现都 必须 能够把 输入 数据 分割成 数据 片段 ,该数 据片 段能 够由单 独 的 Map 任务来进行 后续处 理 (例 如, 文 本模式的范围分割必须确保仅仅在每行 的边界进行范围 分割 )。 虽 然大多数 MapReduce 的使用者仅仅使用很少 的 预定义输入类型就满足要求了,但是使用者依然 可以通过提供一个简单的 Reader 接口实现就能够支持一个新的输入 类 型。 Reader 并非一定要从文件中读取数据,比如, 我们可以很容易 的实现一个从数 据库里读记录 的 Reader,或者从内 存中的数据结构读取数据 的 Reader。 类似的,我们提供 了一些预 定义的 输出数据 的类型 ,通过这 些预定 义类型能 够产生 不同格式 的数据 。用户采 用类 似添加新的输入数据类型的方式增加新 的输出类型。 4.54.54.54.5、副 作用 在某些情况下, MapReduce 的使用者发现, 如果在 Map 和/或Reduce 操作过程中增加辅助的输出文件会比较 省事 。 我们依靠程 序 writer 把这种 “副作用 ”变成原子的 和幂等 的( alex 注:幂等的 指一个 总是 产生 相同结 果的 数学 运算) 。 通常应用程序首先把输出结果写到一个 临时文件中,在输出全部数据之后,在使用系统级的原子操作 rename 重新命 名 这个临时文件。 如果一个任务产生 了多个输 出文件 ,我们没 有提供 类似两阶 段提交 的原子操 作支持 这种情况 。因此 ,对于会 产生 多个输出文件、并 且对于跨 文件有 一致性要 求的任 务,都必 须是确 定性的任 务。但 是在实际 应用过 程中,这 个限制还 没有给我们带来过麻烦。 4.64.64.64.6、跳 过损坏的 记录 有时候, 用户程序中的 bug 导致 Map 或者 Reduce 函数在处理某些记录的时 候 crash 掉, MapReduce 操作无法顺 利 完成。惯常的做法是修复 bug 后再次执行 MapReduce 操作,但是,有时候找出这些 bug 并修复它们不是一件容易的 事 情;这些 bug 也许是在第三方库 里边,而 我们手 头没有这 些库的 源代码。 而且在 很多时候 ,忽略 一些有问 题的记 录也 是可以接受的,比 如在一个 巨大的 数据集上 进行统 计分析的 时候。 我们提供 了一种 执行模式 ,在这 种模式下 ,为了保 证保证整个处理能继续进行, MapReduce 会检测哪些记录导致确定性 的 crash,并且跳过这些记录不处理。 每个 worker 进程都设置了信号处理函数捕获内存段 异常( segmentation violation) 和总线错误 ( bus error)。在执行 Map 或者 Reduce 操作之前, MapReduce 库通过全局变量保存记录序号。 如果用户程序触发了一个系统信号, 消息处 理 函数将用 “最后一口气 ”通过 UDP 包向 master 发送处理的 最后一 条记 录的 序号。 当 master 看到在处理 某条特 定记录 不止失败一次时, master 就标志着条记录需要被跳过, 并且在下次重新执行相关的 Map 或者 Reduce 任务的时候跳过 这 条记录。 4.74.74.74.7、本 地执行 调试 Map 和Reduce 函数的 bug 是非常困难 的,因 为实 际执 行操作 时不 但是 分布在 系统 中执 行的, 而且 通常 是在 好几千台计算机上执行,具体的执行位 置是 由 master 进行动态调度的,这又大大增加了调试 的难度。为了简 化调试、 profile 和小规模测试, 我们开发了一套 MapReduce 库的本地实现版本, 通过使用本地版本的 MapReduce 库,MapReduce 操作在本地计算机上顺序的执行。用 户可以控制 MapReduce 操作的执行, 可以把操作限制到特定的 Map 任务上。 用 户 通过设定特别的标志来在本地执行他们 的程序,之后就 可以很容易的使用本地调试和测试工具 (比 如 gdb)。 4.84.84.84.8、状 态信息 master 使用嵌入式的 HTTP 服务器(如 Jetty)显示一组状态信 息页面, 用户可 以监控各 种执行 状态。状 态信息页 面显示了包括计算执行的进度,比如已经完成了多少任务、有多少任务正在处理、输入的字节数、中间数据的字节数 、 输出的字节数、 处 理百分比等等。 页 面还包含了指向每个任务 的 stderr 和stdout 文件的链接。 用 户根据这些数据预测 计 算需要执行大约多 长时间、 是否需 要增加额 外的计 算资源。 这些页 面也可以 用来分 析什么时 候计算 执行的比 预期的要 慢。 另外,处 于最 顶 层的 状态 页面 显 示了 哪 些 worker 失效了, 以及 他 们失 效的 时候 正 在运 行 的 Map 和Reduce 任务。 这些信息对于调试用户代码中 的 bug 很有帮助。 4.94.94.94.9、计 数器 MapReduce 库使用计数器统计 不同事件 发生次 数。比如 ,用户 可能想统 计已经 处理了多 少个单 词、已经 索引的多 少篇 German 文档等等。 为了使用这个特性,用户在程序中创建一个命名 的计数器对象, 在 Map 和Reduce 函数中相应的增加计数器的值 。 例如: Counter* uppercase; uppercase = GetCounter("uppercase"); map(String name, String contents): for each word w in contents: if (IsCapitalized(w)): uppercase->Increment(); EmitIntermediate(w, "1"); 这些计数器的值周 期性的从 各个单 独 的 worker 机器上传递给 master(附加在 ping 的应答包中传递) 。 master 把执 行成功的 Map 和Reduce 任务的计数器值进行累计, 当 MapReduce 操作完成之后,返回给用户代码。 计数器当前的值也会显示 在 master 的状态页面上,这样用户就可以看到当 前计算的进度。 当累加计数器的 值的时 候,master 要检查重复运行的 Map 或者 Reduce 任务, 避 免重复累加 ( 之前提到的备用任务和失效后重新执行任务 这 两 种情况会导致相同的任务被多次执行) 。 有些计数器的值是由 MapReduce 库自动维持的, 比如已经处理的输入的 key/value pair 的数量、 输 出的 key/value pair 的数量等等。 计数器机制对于 MapReduce 操作的完整性检查非常有用。 比如, 在 某些 MapReduce 操作中, 用 户需要确保输出 的 key value pair 精确的等于输入的 key value pair,或者处理的 German 文档数量在处理的整个文档数量中属于 合理范围。 5555、性 能 本节我们用在一个大型集群上运行的两 个计算来衡 量 MapReduce 的性能。 一个计算在大约 1TB 的数据中进行特 定 的模式匹配,另一个计算对大 约 1TB 的数据进行排序。 这两个程序在大量 的使 用 MapReduce 的实际应用中是非 常典型的 -- 一类是对数据格式 进行转换 ,从一 种表现形 式转换为另外一种表现形式;另一类是 从海量数据中抽 取少部分的用户 感兴趣的数据。 5.15.15.15.1、集 群配置 所有这些程序都运行在一个大约 由 1800 台机器构成的集群上。 每 台机器配置 2个2G 主频、 支 持超线程的 Intel Xeon 处理器,4GB 的物理内存, 两个 160GB 的IDE硬盘和一个千兆以太网卡。 这 些机器部署在一个两层的树形交换网 络 中, 在root 节点大概有 100-200GBPS 的传输带宽。所有 这些机器 都采用 相同的部 署(对 等部署) , 因此任意两点之间 的网 络来回时间小于 1毫秒。 在4GB 内存里,大概有 1-1.5G 用于运行在集群上的其他任务。测试程序在周末下午开 始执行,这时主机 的 CPU、 磁盘和网络基本上处于空闲状态。 5.25.25.25.2、GREPGREPGREPGREP 这个分布式的 grep 程序需要扫描大概 10 的10 次方个由 100 个字节组成的记录,查找出现概率较小 的 3个字符的 模式(这个模式 在 92337 个记录中出现) 。输入数据被拆分成大 约 64M 的Block(M=15000) , 整个输出数据存放 在一 个文件中( R=1)。 图2显示了这个运算随时间的处理过程。其中 Y轴表示输入数据的处理速度。 处 理速度随着参与 MapReduce 计算 的机器数量的增加而增加,当 1764 台worker 参与计算的时,处理速度达到了 30GB/s。当 Map 任务结束的时候,即 在 计算开始后 80 秒,输入的处理速度降到 0。整个计算过程从开始到结束一共花了 大 概 150 秒。这包括了大约一分钟的 初始启动阶段。 初 始启动阶段消耗的时间包括了是把这个程序 传送到各 个 worker 机器上的时间、 等待 GFS 文件系统 打 开1000 个输入文件集合的时间、获取相关的文 件本地位置优化 信息的时间。 5.35.35.35.3、排 序 排序程序处理 10 的10 次方个 100 个字节组成的记录(大概 1TB 的数据) 。 这个程序模仿 TeraSort benchmark[10]。 排序程序由不 到 50 行代码组成。只有 三行 的 Map 函数从文本行中解 析 出 10 个字节的 key 值作为排序的 key,并 且把这个 key 和原始文本行作为中间的 key/value pair 值输出。我们使用了一个内置的恒等函 数作 为 Reduce 操作函数。 这个函数把中间的 key/value pair 值不作任何改变输出。最 终排序结果输出到两路复制 的 GFS 文件系统 (也就是说, 程 序输出 2TB 的数据)。 如前 所述,输入数据被分成64MB 的Block(M=15000) 。我们 把 排 序 后 的 输 出 结 果 分 区 后 存 储 到 4000 个文 件 (R=4000) 。 分区函数使用 key 的原始字节来把数据分区 到 R个片段中。 在这个 benchmark 测试中,我们使用 的分区函 数知 道 key 的分区情况。通常 对于排序 程序来 说,我们 会增加 一个 预处理的 MapReduce 操作用于采样 key 值的分布情况,通过采样的数据来计算 对最终排序处理 的分区点。 图三 ( a) 显示了这个排序程序的正常执行过程。左上的图显示了输 入数据读取的速 度。数据读取速度峰值会达 到 13GB/s,并且所有 Map 任务完成之后,即大约 200 秒之后迅速滑落到 0。值得注意的是,排序程序输入数据读 取速度 小于分布式 grep 程序。 这 是因为排序程序的 Map 任务花了大约一半的处理时间 和 I/O 带宽把中间输出结果写到本地 硬 盘。相应的分布式 grep 程序的中间结果输出几乎可以忽略不计 。 左边中间的图显示了中间数据 从 Map 任务发送到 Reduce 任务的网络速度。这个过程从第一 个 Map 任务完成之后 就开始缓 慢启 动 了。 图示 的第 一 个高 峰是 启动 了 第一 批大 概 1700 个Reduce 任务(整 个 MapReduce 分布到大 概 1700 台机器上,每 台机器 1次最多执行 1个Reduce 任务) 。 排 序程序运行大约 300 秒后, 第一批启动的 Reduce 任务有些 完 成了,我们开始执行剩下 的 Reduce 任务。所有的处理在大约 600 秒后结束。 左下图表示 Reduce 任务把排序后的数据写到最终的输出文 件的速度。 在 第一个排序阶段结束和数据开始写 入磁 盘 之间有一个小的延时,这是因 为 worker 机器正在忙于排序中间数据。磁盘写入 速度 在 2-4GB/s 持续一段时间。输出数 据写入磁盘大约持 续 850 秒。计入初始启动 部分的时 间,整 个运算消 耗 了 891 秒。这个速度 和 TeraSort benchmark[18] 的最高纪录 1057 秒相差不多。 还有一些值得注意 的现象: 输入数 据的读取 速度比 排序速度 和输出 数据写入 磁盘速 度要高不 少,这 是因为我 们的 输入数据本地化优化策略起了作用 -- 绝大部分数据都是从本地硬盘读取的, 从 而节省了网络带宽。 排 序速度比输出 数 据写入到磁盘的速度快,这是因为输出数据写 了两份(我们使用 了 2路的 GFS 文件系统,写入复制节点的原因是为 了 保证数据可靠性和可用性) 。 我们把输出数据写入到两个复制节点的原因是因 为这是底层文件 系统的保证数据 可靠性 和 可用性的实现机制。如果底层文件系统 使用类似容错编 码 [14](erasure coding)的方式 而不是复制的方式保证数据的可靠 性和可用性,那么在输出数据写入磁盘 的时候,就可以 降低网络带宽的 使用。 5.45.45.45.4、高 效 的 backup backup backup backup 任务 图三 ( b) 显示了关闭了备用任务后排序程序执行情况。执行的过程和图 3(a) 很相似, 除 了输出数据写磁盘的 动 作在时间上拖了一个很长的尾巴,而且在这段时间里,几乎没有什么写入动作。在 960 秒后, 只有 5个Reduce 任务 没 有完成。这些拖后腿的任务又执行 了 300 秒才完成。整个计算消耗 了 1283 秒,多了 44%的执行时间。 5.55.55.55.5、失 效的机器 在图三 ( c) 中 演示的排序程序执行的过程中,我们在程序开始后几分钟有意的 kill 了1746 个worker 中的 200 个。 集群底层的调度立刻在这些机器上重新 开始新 的 worker 处理进程 ( 因为只是 worker 机器上的处理进程被 kill 了, 机 器 本身还在工作)。 图三 ( c) 显 示出了一个 “负”的输入数据读取速度,这 是因为一些已经完成 的 Map 任务丢失了( 由于相应的执 行 Map 任务的 worker 进程被 kill 了) ,需要重新执行这些任务。 相 关 Map 任务很快就被重新 执行了。 整个运 算 在 933 秒 内完成,包括了初始启动时间(只比正 常执行多消耗 了 5%的时间)。 6666、经 验 我们在 2003 年1月完成了第一个版本的 MapReduce 库,在 2003 年8月的版本有了显著的增强,这包括了输入 数 据本地优化、 worker 机器之间的动态负载均衡等等。从那以 后,我们惊喜的 发现, MapReduce 库能广泛应用于我们日 常工作中遇到的各类问题。它现在 在 Google 内部各个领域得到广泛应用,包括: 大规模机器学习问题 Google News 和Froogle 产品的集群问题 从公众查询产品(比如 Google 的Zeitgeist)的报告中抽取数据。 从大量的新应用和新产品的网页中提取 有用信息(比如 ,从大量的位置 搜索网页中抽取 地理位置信息) 。 大规模的图形计算。 图四显示了 在我们 的源 代码 管理系 统中 ,随 着时间 推移 ,独 立 的 MapReduce 程序数量的 显著增 加。 从 2003 年早 些时候的 0个增长到 2004 年9月份的差不 多 900 个不同的程 序。 MapReduce 的成功取决 于采 用 MapReduce 库能够在 不到半个小时时间 内写出一 个简单 的程序, 这个简 单的程序 能够在 上千台机 器的组 成的集群 上做大 规模并发 处理,这 极大的加快了开发和原形设计的周期。另外, 采用 MapReduce 库, 可以让完全没有分布式和 /或并行系统开发经验的 程 序员很容易的利用大量的资源,开发出 分布式和 /或并行处理的应用。 在每个任务结束的时候, MapReduce 库统计计算资源的使用状况。在表 1, 我们列出了 2004 年8月份 MapReduce 运行的任务所占用的相关资源。 6.16.16.16.1、大 规模索引 到目前为止, MapReduce 最成功的应用就是重写了 Google 网络搜索服务所使用到的 index 系统。索引系统的输入 数据是网络 爬虫抓 取回 来的 海量的 文档 ,这 些文档 数据 都保 存 在 GFS 文件系统里 。这些 文档 原始 内容( alex 注: raw contents,我认为就 是网页 中的 剔 除 html 标记后的内 容、 pdf 和word 等有格式文 档中提 取的 文本 内容等 )的 大小 超过 了20TB。 索引程序是通过一系列 的 MapReduce 操作 (大约 5到10 次) 来建立索引。使用 MapReduce(替换上一个 特 别设计的、分布式处理的索引程序)带 来这些 好处: 实现索引部分的代码简单、小巧、容易理解,因为对 于容错、分布式以及 并行计算的处理 都 是 MapReduce 库提 供 的。比如,使用 MapReduce 库,计算的代码行数从原来 的 3800 行C++代码减少到大概 700 行代码。 MapReduce 库的性能已经足够 好了,因 此我们 可以把在 概念上 不相关的 计算步 骤分开处 理,而 不是混在 一起以期 减少数据传递的额 外消耗。 概念上 不相关的 计算步 骤的隔离 也使得 我们可以 很容易 改变索引 处理方 式。比如 ,对之前 的索引系统的一个小更改可能要耗费好 几个月的时间,但是在使 用 MapReduce 的新系统上,这样的更改只需要花几 天 时间就可以了。 索引系统的操作管 理更容易 了。因 为由机器 失效、 机器处理 速度缓 慢、以及 网络的 瞬间阻塞 等引起 的绝大部 分问 题都已经由 MapReduce 库解决了,不再需要操作人员的介入了。另外,我们可以 通过在索引系统 集群中增加机器 的 简 单方法提高整体处理性能。 7777、相 关工作 很多系统都提供了 严格的编 程模式 ,并且通 过对编 程的严格 限制来 实现并行 计算。 例如,一 个结合 函数可以 通过 把N个元素的数组的前缀在 N个处理器上使用并行前缀算法,在 log N的时间内计算完 [6,9,13](alex 注: 完 全没 有 明白作者在说啥,具 体参考相关 6、9、13 文档)。MapReduce 可以看作是我们结合在真实环境下处理 海量数据的经验 , 对这些经典模型进 行简化和 萃取的 成果。更 加值得 骄傲的是 ,我们 还实现了 基于上 千台处理 器的集 群的容错 处理。相 比而言,大部分并发处理系统都只在小 规模的集群上实 现,并且把容错 处理交给了程序 员。 Bulk Synchronous Programming[17]和一些 MPI 原语 [11]提供了更高级别的并行处理抽象, 可以更容易写出并行处 理 的程序。 MapReduce 和这些系统的关键不同之处在于, MapReduce 利用限制性编程模式实现了用户程序的 自动并发处 理,并且提供了透明的容错处理。 我们数据本地优化 策略的灵 感来源 于 active disks[12,15]等技术,在 active disks 中,计算任务是尽 量推送到 数据存 储的节点处理( alex 注:即靠近数据源 处理) , 这样就减少了网络 和 IO子系统的吞吐量。 我们在挂 载几个 硬盘的普 通 机器上执行我们的运算,而不是在磁盘 处理器上执行我 们的工作,但是 达到的目的一样 的。 我们的备用任务机制和 Charlotte System[3]提出的 eager 调度机制比较类似。 Eager 调度机制的一个缺点是如果一 个 任务反复失效,那么整个计算就不能完 成。我们通过忽 略引起故障的记 录的方式在某种 程度上解决了这 个问题。 MapReduce 的实现依赖于一个 内部的集 群管理 系统,这 个集群 管理系统 负责在 一个超大 的、共 享机器的 集群上分 布和运 行 用户 任 务。 虽 然 这个 不 是本 论 文 的重 点 ,但 是 有 必要 提 一下 , 这 个集 群 管理 系 统 在理 念 上和 其 它 系统 , 如 Condor[16]是一样。 MapReduce 库的排序机制 和 NOW-Sort[1]的操作上很类似。 读取输入 源的机 器( map workers)把待排序的数据进 行分区后,发送到 R个Reduce worker 中的一个进行处理。每个 Reduce worker 在本地对数据进行排序(尽可能在内 存 中排序)。当然,NOW-Sort 没有给用户自定义的 Map 和Reduce 函数的机会, 因 此不具备 MapReduce 库广泛的实用性 。 River[2]提供了一个编程模型: 处 理进程通过分布式队列传送数据的方 式进行互相通讯。 和 MapReduce 类似,River 系统尝试在不对等的硬件环境下,或者在系统颠簸的情况下也能提供近似平均的 性能。 River 是通过精心调度硬盘和 网 络的通讯来平衡任务的完成时间。 MapReduce 库采用了其它的方法。通过对编程模型 进行限制, MapReduce 框架把问 题分解成为大量的 “小”任务。 这 些任务在可用的 worker 集群上动态的调度,这 样快速的 worker 就可以执行更多的 任 务。通过对编程模 型进行限 制,我 们可用在 工作接 近完 成的时候调度备用 任务,缩 短在硬 件配置不 均衡的 情况下缩 小 整个操作完成的时间(比如有的机器性 能差、或者机器 被某些操作阻塞 了) 。 BAD-FS[5]采用了和 MapReduce 完全不同的 编程模 式, 它是 面向广 域网 ( alex 注: wide-area network)的。不过, 这两个系统有两个 基础功能 很类似。 ( 1)两个系统采用重 新执行的 方式来 防止由于 失效导 致的数据 丢失。 ( 2)两个都 使用数据本地化调度策略,减少网络通 讯的数据量。 TACC[7]是一个用于简化构 造高可用 性网络 服务的系 统。 和 MapReduce 一样,它也依靠重 新执行机 制来实 现的容 错处理。 8888、结 束语 MapReduce 编程模型在 Google 内部成功应用于多个领域。 我们 把这种成功归结为几个方面: 首先, 由于 MapReduce 封装了并行处理、容错处理、数据本地化优化、负载 均衡等等技术难 点的细节,这使 得 MapReduce 库易于使用。即 便 对于完全没有并行或者分布式系统开发 经验的程序员而 言;其次,大量不同类型的问题都 可以通 过 MapReduce 简单 的 解决。比如, MapReduce 用于生成 Google 的网络搜索服务所 需要的数 据、用 来排序、 用来数 据挖掘、 用于机 器学习, 以及很多其它的系统;第三,我们实现 了一个在数千台 计算 机组成的大型集群上灵活部署运行 的 MapReduce。这个实 现使得有效利用这些丰富的计算资源变 得非常简单, 因 此也适合用来解决 Google 遇到的其他很多需要大量计算的问 题。 我们也从 MapReduce 开发过程中学到了不少东西。首先,约束编程模式使 得并行和分布式 计算非常容易,也易 于 构造容错的计算环 境;其次 ,网络 带宽是稀 有资源 。大量的 系统优 化是针对 减少网 络传输量 为目的 的:本地 优化策略 使大量的数据从本 地磁盘读 取,中 间文件写 入本地 磁盘、并 且只写 一份中间 文件也 节约了网 络带宽 ;第三, 多次执行 相同的任务可以减少性能缓慢的机器带 来的负面影响( alex 注: 即硬件配置的不平衡) , 同 时解决了由于机器失效导 致 的数据丢失问题。 9999、感 谢 (alex 注: 还 是原汁原味的感谢词比较好, 这个就不翻译了) Josh Levenberg has been instrumental in revising and extending the user-level MapReduce API with a number of new features based on his experience with using MapReduce and other people's suggestions for enhancements. MapReduce reads its input from and writes its output to the Google File System [8]. We would like to thank Mohit Aron, Howard Gobioff, Markus Gutschke, David Kramer, Shun-Tak Leung, and Josh Redstone for their work in developing GFS. We would also like to thank Percy Liang and Olcan Sercinoglu for their work in developing the cluster management system used by MapReduce. Mike Burrows, Wilson Hsieh, Josh Levenberg, Sharon Perl, Rob Pike, and Debby Wallach provided helpful comments on earlier drafts of this paper.The anonymous OSDI reviewers, and our shepherd, Eric Brewer, provided many useful suggestions of areas where the paper could be improved. Finally, we thank all the users of MapReduce within Google's engineering organization for providing helpful feedback, suggestions, and bug reports. 10101010、参 考资料 [1] Andrea C. Arpaci-Dusseau, Remzi H. Arpaci-Dusseau,David E. Culler, Joseph M. Hellerstein, and David A. Patterson.High- performance sorting on networks of workstations.In Proceedings of the 1997 ACMSIGMOD InternationalConference on Management of Data, Tucson,Arizona, May 1997. [2] Remzi H. Arpaci-Dusseau, Eric Anderson, NoahTreuhaft, David E. Culler, Joseph M. Hellerstein, David Patterson, and Kathy Yelick. Cluster I/O with River:Making the fast case common. In Proceedings of the Sixth Workshop on Input/Output in Parallel and Distributed Systems (IOPADS'99), pages 10.22, Atlanta, Georgia, May 1999. [3] Arash Baratloo, Mehmet Karaul, Zvi Kedem, and Peter Wyckoff. Charlotte: Metacomputing on the web. In Proceedings of the 9th International Conference on Parallel and Distributed Computing Systems, 1996. [4] Luiz A. Barroso, Jeffrey Dean, and Urs H¨olzle. Web search for a planet: The Google cluster architecture. IEEE Micro, 23(2):22.28, April 2003. [5] John Bent, Douglas Thain, Andrea C.Arpaci-Dusseau, Remzi H. Arpaci-Dusseau, and Miron Livny. Explicit control in a batch-aware distributed file system. In Proceedings of the 1st USENIX Symposium on Networked Systems Design and Implementation NSDI, March 2004. [6] Guy E. Blelloch. Scans as primitive parallel operations.IEEE Transactions on Computers, C-38(11), November 1989. [7] Armando Fox, Steven D. Gribble, Yatin Chawathe, Eric A. Brewer, and Paul Gauthier. Cluster-based scalable network services. In Proceedings of the 16th ACM Symposium on Operating System Principles, pages 78. 91, Saint-Malo, France, 1997. [8] Sanjay Ghemawat, Howard Gobioff, and Shun-Tak Leung. The Google file system. In 19th Symposium on Operating Systems Principles, pages 29.43, Lake George, New York, 2003. To appear in OSDI 2004 12 [9] S. Gorlatch. Systematic efficient parallelization of scan and other list homomorphisms. In L. Bouge, P. Fraigniaud, A. Mignotte, and Y. Robert, editors, Euro-Par'96. Parallel Processing, Lecture Notes in Computer Science 1124, pages 401.408. Springer-Verlag, 1996. [10] Jim Gray. Sort benchmark home page. http://research.microsoft.com/barc/SortBenchmark/. [11] William Gropp, Ewing Lusk, and Anthony Skjellum. Using MPI: Portable Parallel Programming with the Message-Passing Interface. MIT Press, Cambridge, MA, 1999. [12] L. Huston, R. Sukthankar, R.Wickremesinghe, M. Satyanarayanan, G.R. Ganger, E. Riedel, and A. Ailamaki. Diamond: A storage architecture for early discard in interactive search. In Proceedings of the 2004 USENIX File and Storage Technologies FAST Conference, April 2004. [13] Richard E. Ladner and Michael J. Fischer. Parallel prefix computation. Journal of the ACM, 27(4):831.838, 1980. [14] Michael O. Rabin. Efficient dispersal of information for security, load balancing and fault tolerance. Journal of the ACM, 36(2):335.348, 1989. [15] Erik Riedel, Christos Faloutsos, Garth A. Gibson, and David Nagle. Active disks for large-scale data processing. IEEE Computer, pages 68.74, June 2001. [16] Douglas Thain, Todd Tannenbaum, and Miron Livny. Distributed computing in practice: The Condor experience. Concurrency and Computation: Practice and Experience, 2004. [17] L.G. Valiant. A bridging model for parallel computation. Communications of theACM, 33(8):103.111, 1997. [18] Jim Wyllie. Spsort: How to sort a terabyte quickly. http://alme1.almaden.ibm.com/cs/spsort.pdf. 附录AAAA、单 词频率统 计 本节包含了一个完整的程序,用于统计 在一组命令行指 定的输入文件中 ,每一个不同的 单词出现频率。 #include "mapreduce/mapreduce.h" // User's map function class WordCounter : public Mapper { public: virtual void Map(const MapInput& input) { const string& text = input.value(); const int n = text.size(); for (int i = 0; i < n; ){ // Skip past leading whitespace while ((i < n) && isspace(text[i])) i++; // Find word end int start = i; while ((i < n) &&!isspace(text[i])) i++; if (start < i) Emit(text.substr(start,i-start),"1"); } } }; REGISTER_MAPPER(WordCounter); // User's reduce function class Adder : public Reducer { virtual void Reduce(ReduceInput* input) { // Iterate over all entries with the // same key and add the values int64 value = 0; while (!input->done()) { value += StringToInt(input->value()); input->NextValue(); } // Emit sum for input->key() Emit(IntToString(value)); } }; REGISTER_REDUCER(Adder); int main(int argc, char** argv) { ParseCommandLineFlags(argc, argv); MapReduceSpecification spec; // Store list of input files into "spec" for (int i = 1; i < argc; i++) { MapReduceInput* input = spec.add_input(); input->set_format("text"); input->set_filepattern(argv[i]); input->set_mapper_class("WordCounter"); } // Specify the output files: ///gfs/test/freq-00000-of-00100 ///gfs/test/freq-00001-of-00100 //... MapReduceOutput* out = spec.output(); out->set_filebase("/gfs/test/freq"); out->set_num_tasks(100); out->set_format("text"); out->set_reducer_class("Adder"); // Optional: do partial sums within map // tasks to save network bandwidth out->set_combiner_class("Adder"); // Tuning parameters: use at most 2000 // machines and 100 MB of memory per task spec.set_machines(2000); spec.set_map_megabytes(100); spec.set_reduce_megabytes(100); // Now run it MapReduceResult result; if (!MapReduce(spec, &result)) abort(); // Done: 'result' structure contains info // about counters, time taken, number of // machines used, etc. return 0; }
还剩13页未读

继续阅读

下载pdf到电脑,查找使用更方便

pdf的实际排版效果,会与网站的显示效果略有不同!!

需要 15 金币 [ 分享pdf获得金币 ] 65 人已下载

下载pdf

pdf贡献者

fmms

贡献于2010-11-02

下载需要 15 金币 [金币充值 ]
亲,您也可以通过 分享原创pdf 来获得金币奖励!
下载pdf