• 1. Hadoop下MapReduce编程清华大学计算机系高性能所 2009年冬
  • 2. 内容MapReduce概述 MapReduce的数据流 检查点 相关编程要点
  • 3. Map过程Map过程通过在输入列表中的每一项执行函数,生成一系列的输出列表。
  • 4. Reduce过程Reduce过程在一个输入的列表进行扫描工作,随后生成一个聚集值,作为最后的输出
  • 5. MapReduce的Reduce过程所有不同的颜色代表不同的键值(keys)。所有相同键值的列表被输入到同一个Reduce任务中
  • 6. ComponentsMapper public static class TokenizerMapper extends Mapper Reducer public static class IntSumReducer extends Reducer Driver import org.apache.hadoop.mapreduce.Job; 具体分析一个WordCount程序 WordCount.java
  • 7. MapReduce的数据流程
  • 8. MapReduce的执行过程
  • 9. Input Files输入文件一般 保存在HDFS中 文件的类型不固定,可能是文本的,也有可能是其它形式的文件 文件经常很大,甚至有几十个GB
  • 10. InputFormat定义了这些文件如何分割,读取 InputFile提供了以下一些功能 选择文件或者其它对象,用来作为输入 定义InputSplits,将一个文件分开成为任务 为RecordReader提供一个工厂,用来读取这个文件 有一个抽象的类FileInputFormat,所有的输入格式类都从这个类继承这个类的功能以及特性。当启动一个Hadoop任务的时候,一个输入文件所在的目录被输入到FileInputFormat对象中。FileInputFormat从这个目录中读取所有文件。然后FileInputFormat将这些文件分割为一个或者多个InputSplits。 通过在JobConf对象上设置JobConf.setInputFormat设置文件输入的格式
  • 11. 预定义的文件输入格式InputFormat:Description:Key:Value:TextInputFormatDefault format; reads lines of text filesThe byte offset of the lineThe line contentsKeyValueInputFormatParses lines into key, val pairsEverything up to the first tab characterThe remainder of the lineSequenceFileInputFormatA Hadoop-specific high-performance binary formatuser-defineduser-defined
  • 12. 各种InputFormatTextInputFormat,默认的格式,每一行是一个单独的记录,并且作为value,文件的偏移值作为key KeyValueInputFormat,这个格式每一行也是一个单独的记录,但是Key和Value用Tab隔开,是默认的OutputFormat,可以作为中间结果,作为下一步MapReduce的输入。 SequenceFileInputFormat 基于块进行压缩的格式 对于几种类型数据的序列化和反序列化操作 用来将数据快速读取到Mapper类中
  • 13. InputSplitsInputSplit定义了输入到单个Map任务的输入数据 一个MapReduce程序被统称为一个Job,可能有上百个任务构成 InputSplit将文件分为64MB的大小 hadoop-site.xml中的mapred.min.split.size参数控制这个大小 mapred.tasktracker.map.taks.maximum用来控制某一个节点上所有map任务的最大数目
  • 14. RecordReaderInputSplit定义了一项工作的大小,但是没有定义如何读取数据 RecordReader实际上定义了如何从数据上转化为一个(key,value)对,从而输出到Mapper类中 TextInputFormat提供了LineRecordReader
  • 15. Mapper每一个Mapper类的实例生成了一个Java进程(在某一个InputSplit上执行) 有两个额外的参数OutputCollector以及Reporter,前者用来收集中间结果,后者用来获得环境参数以及设置当前执行的状态。 现在用Mapper.Context提供给每一个Mapper函数,用来提供上面两个对象的功能
  • 16. Partition&Shuffle在Map工作完成之后,每一个 Map函数会将结果传到对应的Reducer所在的节点,此时,用户可以提供一个Partitioner类,用来决定一个给定的(key,value)对传输的具体位置
  • 17. Sort传输到每一个节点上的所有的Reduce函数接收到得Key,value对会被Hadoop自动排序(即Map生成的结果传送到某一个节点的时候,会被自动排序)
  • 18. Reduce做用户定义的Reduce操作 接收到一个OutputCollector的类作为输出 最新的编程接口是Reducer.Context
  • 19. OutputFormat写入到HDFS的所有OutputFormat都继承自FileOutputFormat 每一个Reducer都写一个文件到一个共同的输出目录,文件名是part-nnnnn,其中nnnnn是与每一个reducer相关的一个号(partition id) FileOutputFormat.setOutputPath() JobConf.setOutputFormat()
  • 20. Output FormatOutputFormat:DescriptionTextOutputFormatDefault; writes lines in "key \t value" formSequenceFileOutputFormatWrites binary files suitable for reading into subsequent MapReduce jobsNullOutputFormatDisregards its inputs
  • 21. RecordWriter用来指导如何输出一个记录到文件中
  • 22. Combiner
  • 23. Combinerconf.setCombinerClass(Reduce.class); 是在本地执行的一个Reducer,满足一定的条件才能够执行。
  • 24. 容错由Hadoop系统自己解决 主要方法是将失败的任务进行再次执行 TaskTracker会把状态信息汇报给JobTracker,最终由JobTracker决定重新执行哪一个任务 为了加快执行的速度,Hadoop也会自动重复执行同一个任务,以最先执行成功的为准(投机执行) mapred.map.tasks.speculative.execution mapred.reduce.tasks.speculative.execution
  • 25. Chaining JobsMap1 -> Reduce1 -> Map2 -> Reduce2 -> Map3... 生成多个JobClient.runJob(),等待上一个任务结束,并且开始下一个任务 org.apache.hadoop.mapred.jobcontrol.Job 可以做一些任务控制 x.addDependingJob(y),x只有在y执行完了之后才能够开始执行
  • 26. 程序的调试日志文件 分门别类存放在hadoop-version/logs目录下面,hadoop-username-service-hostname.log 对于用户程序来说,TaskTracker的log可能是最重要的 每一个计算节点上有日志logs/userlogs日志,也有重要的日志信息 在单机上首先执行,看看是否能够正确执行,而后再在多机的集群系统上执行
  • 27. 任务的控制bin/hadoop job –list bin/hadoop job –kill jobid
  • 28. Thank You