MapReduce 图解流程超详细解答(2)-【map阶段】

jopen 8年前

接上一篇讲解:http://blog.csdn.net/mrcharles/article/details/50465626

map任务:溢写阶段

Spilling phase


正如我们在执行阶段看到的一样,map会使用Mapper.Context.write()将map函数的输出溢写到内存中的环形缓冲区 (MapTask.MapOutputBuffer)。缓冲区的大小是固定的,通过mapreduce.task.io.sort.mb (default: 100MB)指定。
任何时候当这个缓冲区将要充满的时候(mapreduce.map. sort.spill.percent: 默认80% ),溢写将会被执行(这是一个并行过程,使用的是单独的线程,缓冲池还可以继续被写入)。如果溢写线程太慢,而缓冲区又忙了的话,map()就会暂停执行而等待。
溢写线程执行下面的动作:
  1. 创建一个溢写记录SpillRecord 和一个FSOutputStream 文件输出流(本地文件系统)
  2. 内存内排序缓冲中的块:输出的数据会使用快排算法按照partitionIdx, key排序
  3. 排序之后的输出会分割成为分区:每一个分区对应一个reduce
  4. 分区序列化写到本地文件

有多少个reduce任务呢?

一个job的ReduceTasks 的数量是通过配置mapreduce.job.reduces参数设置的

一个输出元组的分割指数是多少?

输出元组的分割指数指的是分区的指数。在 Mapper.Context.write()内部被指定:

partitionIdx = (key.hashCode() & Integer.MAX_VALUE) % numReducers
随着输出元组以元数据的形式保存在环形缓冲区。用户可以通过配置mapreduce.job.partitioner.class参数自己定制partitioner 

我们什么时候运行combiner

如果用户制定了一个特定的combiner ,那么,在溢写线程写出到文件之前,会在每一个分区含有元组数据的地方执行combiner 通常,我们做了如下事情:

  1. 创建一个用户指定的Reducer.class实例(用户指定的combiner 
  2. 创建一个Reducer.Context:输出将会保存在本地文件系统
  3. 运行Reduce.run():请看reduce 任务的描述

Map任务:执行结束

在执行阶段结束的时候,溢写线程最后被触发,细节上,我们作如下事情:

  1. 排序,溢写最后没有被溢写的元组数据
  2. 开始SHUFFLE 相
注意,每一次环形缓冲区将要慢时,达到溢写的百分比的时候,我们就会得到一个溢写文件(溢写记录+输出文件)。每一个溢写文件包含若干分区(分段)

map执行后时期:洗牌shuffle

Hadoop (MapReduce): MapTask - Shuffle

在一个map任务完成的最后时刻,所有的溢写文件会被合并一个分区文件,与相应的reducer对应。mapreduce.io.sort.merge参数控制合并流一次的数量,默认是100。如果至少有3个溢写文件,combiner将会再次执行。如果只有一个或者两个溢写文件,再次执行combiner从而减少map输出的数据量的大小已经没有什么效果,没有必要再次执行combiner了。分区输出文件通过http的方式提供给reducer。



Hadoop (MapReduce): MapTask - Shuffle (2)

Merger合并

Hadoop (MapReduce): Merger

略粗糙,望帮助大家

Charles 于2016-01-06  Phnom Penh



版权说明:
本文由Charles Dong原创,本人支持开源以及免费有益的传播,反对商业化谋利。
CSDN博客:http://blog.csdn.net/mrcharles
</div>
个人站:http://blog.xingbod.cn
EMAIL:charles@xingbod.cn

</div> </div>