• 1. MapReduce与Hadoop
  • 2. 大多数运算所包含的操作在输入数据的“逻辑”记录上应用Map操作得出一个中间Key/value pair集合在所有具有相同key值的value值上应用Reduce操作,从而达到合并中间的数据,得到一个想要的结果的目的
  • 3. 编程模型编程模型的原理利用一个输入key/value pair集合来产生一个输出的key/value pair集合。用两个函数表达这个运算MapReduce
  • 4. …MapReduce是一种编程模型,用于大规模数据集(>1TB)的并行运算“映射”(map)和“化简”(reduce)等概念和它们的主要思想都是从函数式编程语言中借来的数据片段数据片段数据片段(key,value)(key,value)(key,value)(key,value)(key,value)(key,value)(key,value_list)(key,value_list)(key,value_list)(key,value)(key,value)(key,value)data data data ...输入数据数据分割mapshufflereduce输出数据…………MapReduce编程模型
  • 5. Map-reduce Programming ParadigmMap-reduce是一种适合分布式计算的编程范式input | map | shuffle | reduce | output最简单的实现方式cat * | grep ‘java’| sort | uniq -c | cat > file实现这种编程范式的有GoogleHadoopOracleTeradata……
  • 6. MapReduce实现中的全部流程User ProgramMasterworkerworkerworkerworkerworkersplit 0split 1split 2split 3split 4output file 0output file 1Input filesMap phaseIntermediate files (on local disks)Reduce phaseOutput files(1)fork(1)fork(2)assign map(2)assign map(3)read(4)local write(5)remote read(6) write
  • 7. (本页无文本内容)
  • 8. Hadoop的构造模块“运行Hadoop”意味着在网络分布的不同服务器上运行一组守护进程(daemons)。NameNode(名字节点)DataNode(数据节点)Secondary NameNode(次名字节点)JobTracker(作业跟踪节点)TaskTracker(任务跟踪节点)
  • 9. HDFS体系结构 NameNodeMaster,主服务器DataNode管理存储的数据HDFS允许用户以文件的形式存储数据机架2机架1NameNodeDataNodeDataNodeDataNodeDataNodeDataNodeDataNode客户端客户端数据请求read块信息备份write
  • 10. Jobtracker与tasktracker体系结构
  • 11. 输入数据ReduceMapReduceMap节点1节点2节点1节点2输入数据分布在节点上每个map任务处理一个数据分片Mapper输出中间数据节点间的数据交换在“洗牌”阶段完成相同key的中间数据进入相同的reducer存储Reducer的输出普通的MapReduce数据流
  • 12. INININININININININoutoutoutMapReduceKey = 形状 Value = 图案MapReduce数据流重点说明了“分区”和“洗牌”
  • 13. WordCount设计思路文件内容切分成单词(Map阶段)内容切分步骤和数据不相关,可以并行化处理每个拿到原始数据的机器只要将输入数据切分成单词就可以将所有相同的单词聚集在一块(Shuffle阶段)计算单词出现的次数并输出(Reduce阶段)相同词的频数计算可以并行化处理将相同词的频数计算交给一台机器来计算,然后输出结构
  • 14. Hadoop Map/Reduce(input) -> map -> -> combine* -> -> reduce -> (output) combine 过程可能没有,也可能有多次
  • 15. WordCount代码解读
  • 16. wordcount类public class WordCount { public static class TokenizerMapper; public static class IntSumReducer ; public static void main(String[] args); }
  • 17. Map类
  • 18. Reduce类 public static class IntSumReducer extends Reducer { private IntWritable result = new IntWritable();   public void reduce(Text key, Iterable values, Context context ) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } }  
  • 19. Main函数public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: wordcount "); System.exit(2); } Job job = new Job(conf, "word count"); job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); }