• 1. 报告人:刘凯毅 / 报告时间:2012年08月4日 所属部门:北京蓝汛通信技术有限责任公司CCINDEX 内容概要:HDFS运行,MapReduce概要 技术:kaiyi.liu@chinacache.com 招聘: lili.liu@chinacache.com CCINDEX TEAMHADOOP 分享北京蓝汛通信技术有限责任公司
  • 2. 目录 HDFS 文件系统 与 HDFS 窥探内部结构 运行 正常流程 / 非正常流程 / 发现异常 MapReduce 参与 计算角色 窥探 JOB 运行 MR 提供重写 接口 配置参数调优 Hadoop 组成
  • 3. HDFS 文件系统
  • 4. HDFS 什么是文件系统 文件系统:操作系统用于明确磁盘或分区上的文件的方法和数据结构; 即在磁盘上组织文件的方法。 BlockSize 、 inode HDFS DN 单盘配置 > mkfs.ext4 -T largefile /dev/sdxx # inode_ratio 1M ext4 (rw,noatime,nodiratime,data=writeback,barrier=0) (单机)文件系统 Ext2/3/4 ; HFS ; Btrf ; JFS ; ReiserFs; XFS ; 文件系统,分布式文件系统 与 HDFS
  • 5. HDFS 窥探内部结构
  • 6. HDFS INode:  存放文件及目录的基本信息:名称,父节点、修改时间,访问时间以及UGI信息等。 INodeFile: 继承自INode,除INode信息外, 还有组成这个文件的Blocks列表,重复因子,Block大小 INodeDirectory:继承自INode,此外还有一个INode列表来组成文件或目录树结构 FSDirectory:保存文件树结构,HDFS整个文件系统是通 FSDirectory来管理 Block :组成文件的物理存储,有BlockId,size ,以及时间戳 BlockInfo : 元数据 ( eg:BlockInfo = block2 [dn1,pblock1,nblock3..] ) BlocksMap: 保存数据块到INode和DataNode的映射关系 block ---- blockInfo ( inode ,dn…. ) FSImage:保存的是文件系统的目录树 FSEditlog:  文件树上的操作日志 FSNamesystem: HDFS文件系统管理 窥探内部结构
  • 7. HDFS FSNamesystem: Field FSDirectory dir # file -> block ;INodeDirectory -> List children BlocksMap blocksMap # >get> { INode, datanodes, self ref } NavigableMap datanodeMap #storageid ->block Map> recentInvalidateSets #最近废止block Map> excessReplicateMap #复制过剩 ArrayList heartbeats #心跳 UnderReplicatedBlocks neededReplications #需要复制 PendingReplicationBlocks pendingReplications #正在复制 LeaseManager leaseManager #租赁管理 Daemon hbthread #心跳守护 Daemon lmthread #租赁守护 Daemon smmthread #安全模式守护 Daemon replthread #拷贝守护 窥探内部结构
  • 8. HDFS 文件系统,分布式文件系统 与 HDFS
  • 9. HDFS 1. FSImage / FSEditlog 文件系统,分布式文件系统 与 HDFS
  • 10. HDFS 1. HDFS 正常写入(1) 运行 正常流程 / 非正常流程 / 发现异常
  • 11. HDFS 1. HDFS 正常写入(2) 运行 正常流程 / 非正常流程 / 发现异常
  • 12. HDFS 1. HDFS 正常读 、 非正常读 运行 正常流程 / 非正常流程 / 发现异常
  • 13. HDFS 1. HDFS 非正常写 运行 正常流程 / 非正常流程 / 发现异常
  • 14. HDFS 1. 描述异常 ( dn掉线、宕机;数据块坏了; nn宕机 ) 运行 正常流程 / 非正常流程 / 发现异常
  • 15. HDFS 发现异常 ( dn掉线、宕机;数据块坏了; nn宕机 ) ??我这也有疑问,监测数据块报表不相当耗时问题! 运行 正常流程 / 非正常流程 / 发现异常
  • 16. MapReduceJobClient        每一个job都会在用户端通过JobClient类将应用程序以及配置参数打包成jar文件存储在HDFS,并把路径提交到JobTracker,然后由JobTracker创建每一个Task(即MapTask和ReduceTask)并将它们分发到各个TaskTracker服务中去执行。 JobTracker JobTracker是一个master服务,软件启动之后JobTracker接收job,负责调度job的每一个子任务task运行于TaskTracker上,并监控它们,如果发现有失败的task就重新运行它。一般情况应该把JobTracker部署在单独的机器上。 TaskTracker TaskTracker是运行于多个节点上的slaver服务。TaskTracker主动与JobTracker通信,接收作业,并负责直接执行每一个任务。TaskTracker都需要运行在HDFS的DataNode上, 参与 计算角色
  • 17. MapReduce参与 计算角色
  • 18. MapReduce窥探 JOB 运行 (1)InputFormat mapred.min.split.size dfs.block.size # 影响 Map 数量 这两个配置设置,减少 Map数量来处理小文件问题 Sock read file
  • 19. MapReduce窥探 JOB 运行 (2)Inputformat——》map——》(combine)——》partition——》copy&merge——》sort——》reduce——》outputformat
  • 20. MapReduce配置参数调优(Map)io.sort.mb = 默认100m io.sort.spill.percent = 80% ( buffer sync disk 的阀值) # maptask 最后 merge spill file # 会多次 merge 直到 合并成一个文件 io.sort.factor # CPU 换 IO mapred.compress.map.output(true/false) mapred.map.output.compression.codec =org.apache.hadoop.io.compress.DefaultCodec # combiner函数运行的最小spill数 min.num.spill.for.combine
  • 21. MapReduce配置参数调优(Reduce)reduce的运行是分成三个阶段的。 分别为copy->sort->reduce # reduce 去 map 拉数据 最大线程数 ,可适当调大 mapred.reduce.parallel.copies(default 5) # 拉数据 最大响应等待时间 mapred.reduce.copy.backoff(default 300秒) # reduce 使用合并数据最大内存 百分比 70% mapred.job.shuffle.input.buffer.percent(default 0.7) # buffer sync disk 的阀值 0.66 mapred.job.shuffle.merge.percent
  • 22. MapReduce配置参数调优(全局)为一些特殊需求我们可以再上传 文件至HDFS时可单独配置 dfs.block.size + mapred.min.split.size 去设置 Maptask数量 mapred.local.dir 使用多块盘符,并且较快文件系统 。Ssd盘也可以考虑 tasktracker.http.threads map 输出线程,大集群可调大 mapred.child.java.opts 根据特殊 job 调大、调小内存 或使用特殊jvm参数。
  • 23. MapReduceJob Set setMapperClass(Class cls) setCombinerClass(Class cls) setReducerClass(Class cls) setNumReduceTasks(int tasks) setInputFormatClass(Class cls) setOutputFormatClass(Class cls) setMapOutputKeyClass(Class theClass) setMapOutputValueClass(Class theClass) setOutputKeyClass(Class theClass) setOutputValueClass(Class theClass) setSortComparatorClass(Class cls) int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) setGroupingComparatorClass(Class cls) int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) setPartitionerClass(Class cls) int getPartition(KEY key, VALUE value, int numPartitions) MR 提供重写 接口
  • 24. MapReduceReduic 最大优化,就是合理使用 Partitioner DEMO : 刚开始 - 频道,栏目 pv,uv !!不推荐 M1 -> : : R1 - > : : M2 -> : : R2 - > : :< str: uv,pv> MR 提供重写 接口
  • 25. MapReducekey 选取优化,reduce (奇数 )均衡 M1 -> : : R1 - > : : Mem hashMap : : (uv,pv) , : (uv,pv) … Job end Reduce Hashmap value add MR 提供重写 接口
  • 26. MapReduce深度优化MR: 0. map 中使用 LRU – Map ,待顶出的 才进行 out.writer 1. 使用java nio ByteBuffer 定制 序列化,反序列化 byte2String , byte2Double,byte2Int,byte2Float,byte2Long 。。。 String2byte ,double2Byte , float2Byte , int2Byte ,long2Byte 。。。。 2. 节省 看到的每一个Byte 一些属性就 0~255 就使用一个 byte 特定的类型的流水号(str枚举+int) ,定制规则替换str+int2byte 3. SortComparator a. 排序比较顺序打破 顺序比较 b. 不进行 反序列化对象,直接用 byte[] 进行排序 4. Partitioner 直接使用 byte[] 来进行分区,不用使用 反序列化好的 对象。 5. 时间戳,ip 高消耗CPU操作,合理分配本地缓存 (LRU,或local redis) 6. 正则普遍会比 split 快 ,好正则更是如此 MR 提供重写 接口
  • 27. Q&A北京蓝汛通信技术有限责任公司