• 1. 北京传智播客教育 www.itcast.cnHahoop 培训讲师:刘刚
  • 2. 海量数据处理平台架构一般网站把用户的访问行为记录以apach 日志的形式记录下来了,这些日志中包含了下面一些 关键字段: client_ip ,user_id,access_time,url,referer,status,page_size,agent 因为需要统一对数据进行离线计算,所以常常把它们全部移到同一个地方。 简单算了一下: (1) 网站请求数:1kw/天 (2) 每天日志大小:450Byte/行* 1kw = 4.2G, (3) 日志存储周期:2 年 一天产生4.5G 的日志,2 年需要4.2G * 2 * 365 = 3.0T 解决方案: 为了方便系统命令查看日志,不压缩,总共需要3.0T 的空间,刚好有一些2U 的服务器,每台共1T 的磁盘空间。 为了避免系统盘坏掉影响服务器使用,对系统盘做了raid1。 为了避免其他存放数据的盘坏掉导致数据无法恢复,对剩下的盘做了raid5。 所有的数据都汇聚到这几台LogBackup 服务器上来了。北京传智播客教育 www.itcast.cn
  • 3. 有了LogBackup 服务器,离线统计就可以全部在这些服务器上进行了。在这套架构上,用wc、grep、sort、uniq、awk、sed 等系统命令,完成了很多的统计需求,比如统计访问频率较高的client_ip,某个新上线的的页面的referer 主要是哪些网站。 当业务的迅猛发展,网站流量爆发增长,产品经理如果想从中获取更多的用户特征和用户信息, 就需要我们这些数据分析人员从不同的日志中找到令他们满意的答案。如果 (1) 日志总行数:10 亿/天 (2) 每天日志大小:450Byte/行* 10 亿= 420G, (3) 日志种类:5 种北京传智播客教育 www.itcast.cn
  • 4. Hadoop能解决哪些问题海量数据需要及时分析和处理。 海量数据需要深入分析和挖掘。 数据需要长期保存 问题: 磁盘IO成为一种瓶颈,而非CPU资源。 网络带宽是一种稀缺资源 硬件故障成为影响稳定的一大因素 北京传智播客教育 www.itcast.cn
  • 5. Hadoop在国内的情景奇虎360:Hadoop存储软件管家中软件,使用CDN技术将用户请求引到最近的Hadoop集群并进行下载 京东、百度:存储、分析日志、数据挖掘和机器学习(主要是推荐系统) 广告类公司:存储日志,通过协调过滤算法为客户推荐广告 Yahoo:垃圾邮件过滤 华为:云计算平台 Facebook:日志存储,实时分析 某公安部项目:网民QQ聊天记录与关联人调查系统,使用Hbase实现 某学校:学生上网与社会行为分析,使用hadoop 淘宝、阿里:国内使用Hadoop最深入的公司,整个Taobao和阿里都是数据驱动的 北京传智播客教育 www.itcast.cn
  • 6. Hadoop开发人员市场需求和待遇北京传智播客教育 www.itcast.cn
  • 7. Hadoop在国内的人才储备北京传智播客教育 www.itcast.cn
  • 8. Hadoop介绍1)作者:Doug Cutting 2)用Java编写的开源系统,能够安排在大规模的计算平台上,从而长进计算效率。 3)Nutch搜索引擎里面的一个模块。 4)受Google三篇论文的启发 ---MapReduce GFS Bigtable 5)google hadoop mapreduce mapreduce GFS HDFS Bigtable Hbase 北京传智播客教育 www.itcast.cn
  • 9. Hadoop生态系统介绍Hbase 1)Nosql数据库,Key-Value存储 2)最大化利用内存 HDFS 1) hadoop distribute file system分布式文件系统 2)最大化利用磁盘 MapReduce 1)编程模型,主要用来做数据的分析 2)最大化利用CPU北京传智播客教育 www.itcast.cn
  • 10. HDFS篇北京传智播客教育 www.itcast.cn
  • 11. HDFS设计原则文件以块(block)方式存储 每个块带下远比多数文件系统来的大(预设64M) 通过副本机制提高可靠度和读取吞吐量 每个区块至少分到三台DataNode上 单一 master (NameNode)来协调存储元数据(metadata) 客户端对文件没有缓存机制 (No data caching) 北京传智播客教育 www.itcast.cn
  • 12. HDFS系统结构北京传智播客教育 www.itcast.cn
  • 13. NameNode(NN)NameNode主要功能提供名称查询服务,它是一个jetty服务器 NameNode保存metadate信息包括 文件owership和permissions 文件包含哪些块 Block保存在哪个DataNode(由DataNode启动时上报) NameNode的metadate信息在启动后会加载到内存 metadata存储到磁盘文件名为”fsimage” Block的位置信息不会保存到fsimage 北京传智播客教育 www.itcast.cn
  • 14. NameNode块存储结构metadate物理存储结构
  • 15. DataNode(DN)保存Block 启动DN线程的时候会向NN汇报block信息 通过向NN发送心跳保持与其联系(3秒一次),如果NN 10分钟没有收到DN的心跳,则认为其已经lost,并copy其上的block到其它DN
  • 16. Block的副本放置策略第一个副本:放置在上传文件的DN;如果是集群外提交,则随机挑选一台磁盘不太满,CPU不太忙的节点 第二个副本:放置在于第一个副本不同的机架的节点上 第三个副本:与第二个副本相同集群的节点 更多副本:随机节点
  • 17. 再说Block设置一个Block 64MB,如果上传文件小于该值,仍然会占用一个Block的命名空间(NameNode metadata),但是物理存储上不会占用64MB的空间 Block大小和副本数由Client端上传文件到HDFS时设置,其中副本数可以变更,Block是不可以再上传后变更的 北京传智播客教育 www.itcast.cn
  • 18. 数据损坏(corruption)处理当DN读取block的时候,它会计算checksum 如果计算后的checksum,与block创建时值不一样,说明该block已经损坏。 client读取其它DN上的block;NN标记该块已经损坏,然后复制block达到预期设置的文件备份数 DN在其文件创建后三周验证其checksum 北京传智播客教育 www.itcast.cn
  • 19. SecondaryNameNode(SNN)它不是NN的热备份 它可以作为冷备份 将本地保存的fsimage导入 修改cluster所有DN的NameNode地址 修改所有client端NameNode地址 or 修改SNN IP为原NNIP 它的工作是帮助NN合并edits log,减少NN启动时间 北京传智播客教育 www.itcast.cn
  • 20. SecondaryNameNode根据时间 根据过edits log大小
  • 21. HDFS文件权限与Linux文件权限类似 r: read; w:write; x:execute,权限x对于文件忽略,对于文件夹表示是否允许访问其内容 如果Linux系统用户zhangsan使用hadoop命令创建一个文件,那么这个文件在HDFS中owner就是zhangsan HDFS的权限目的:阻止好人错错事,而不是阻止坏人做坏事。HDFS相信,你告诉我你是谁,我就认为你是谁
  • 22. 安全模式1) namenode启动的时候,首先将映像文件(fsimage)载入内存,并执行编辑日志(edits)中的各项操作。 2)一旦在内存中成功建立文件系统元数据的映射,则创建一个新的fsimage文件(这个操作不需要SecondaryNameNode)和一个空的编辑日志。 3)NameNode开始监听RPC和HTTP请求。 4)此刻namenode运行在安全模式。即namenode的文件系统对于客服端来说是只读的。(显示目录,显示文件内容等。写、删除、重命名都会失败)。 5)系统中数据块的位置并不是由namenode维护的,而是以块列表形式存储在datanode中。
  • 23. 安全模式(续)6)在系统的正常操作期间,namenode会在内存中保留所有快位置的映射信息。 7)在安全模式下,各个datanode会向namenode发送快列表的最新情况。 8)进入和离开安全模式 查看namenode处于哪个状态 hadoop dfsadmin –safemode get 进入安全模式(hadoop启动的时候是在安全模式) hadoop dfsadmin –safemode enter 离开安全模式 hadoop dfsadmin -safemode leave
  • 24. 委任和解除节点 hadoop集群管理员需要经常向集群中添加节点,或从集群中移除节点。 委任新节点
  • 25. HDFS文件读取
  • 26. HDFS文件写入
  • 27. HDFS文件存储 两个文件,一个文件156M,一个文件128在HDFS里面怎么存储? --Block为64MB --rapliction默认拷贝3份北京传智播客教育 www.itcast.cn
  • 28. HDFS文件存储结构北京传智播客教育 www.itcast.cn
  • 29. HDFS开发常用命令创建一个文件夹? 上传一个文件? 删除一个文件和文件夹? 查看一个文件夹里面有哪些文件? 查看某个文件的内容?北京传智播客教育 www.itcast.cn
  • 30. Hadoop管理员常用命令hadoop job –list #列出正在运行的Job hadoop job –kill #kill job hadoop fsck / #检查HDFS块状态,是否损坏 hadoop fsck / -delete #检查HDFS块状态,删除损坏块 hadoop dfsadmin –report #检查HDFS状态,包括DN信息 hadoop dfsadmin –safemode enter | leave hadoop distcp hdfs://a:8020/xxx hdfs://b:8020/// #并行copy ./bin/start-balancer.sh #平衡集群文件 北京传智播客教育 www.itcast.cn
  • 31. HDFS API详解static FileSystem get(Configuration conf) operator() { //step1 得到Configuration对象 //step2 得到FileSystem对象 //step3 进行文件操作 } 北京传智播客教育 www.itcast.cn
  • 32. 用Java对HDFS编程文件操作 1.1  上传本地文件到hadoop fs 1.2 在hadoop fs中新建文件,并写入 1.3 删除hadoop fs上的文件 1.4  读取文件 1.5 文件修改时间 2. 目录操作 2.1 在hadoop fs上创建目录 2.2 删除目录 2.3 读取某个目录下的所有文件  2.4遍历hdfs hdfs信息 查找某个文件在HDFS集群的位置 获取HDFS集群上所有节点名称信息 北京传智播客教育 www.itcast.cn
  • 33. mapreduce篇北京传智播客教育 www.itcast.cn
  • 34. 北京传智播客教育 www.itcast.cn 开发hadoop依赖的jar和自带的example到http://hadoop.apache.org/ 下载hadoop-0.20.2.tar.gz,目前最新版为1.0.3。下载完后解压文件,y有hadoop-0.20.2-core.jar, hadoop-0.20.2-examples.jar, hadoop-0.20.2-core.jar :hadoop的核心类库 Hadoop所依赖的jar: hadoop所依赖的jar在lib目录下面。 Hadoop 自带的一些案例分析: hadoop-0.20.2-examples.jar是hadoop-0.20.2自带的一些案例。介绍如下: 1) aggregatewordcount 计算输入文件中文字个数的基于聚合的MapReduce程序。 2) aggregatewordhist 生成输入文件中文字个数的统计图的基于聚合的MapReduce程序。 3) grep 计算输入文件中匹配正则表达式的文字个数的MapReduce程序。 4) join 合并排序的平均分割的数据集的作业。 5)pentomino 解决五格拼版问题的分块分层的MapReduce程序。
  • 35. Hadoop自带的examples.jar介绍北京传智播客教育 www.itcast.cn6)pi 使用蒙地卡罗法计算PI的MapReduce程序。 7)Randomtextwriter 在一个节点上写10G随机文本的MapReduce程序。 8)randomwriter 在每个节点上写10G随机数据的MapReduce程序。 9) sleep 在每个Map和Reduce作业中休憩的程序。 10)sort 排序随机写入器生成的数据的MapReduce程序。 11)sudoku 一个九宫格游戏的解决方案。 12)wordcount 在输入文件中统计文字个数的统计器。
  • 36. 实例 写MapReduce程序的步骤: 1.把问题转化为MapReduce模型 2.设置运行的参数 3.写map类 4.写reduce类 例子:统计单词个数 My name is liu gang What is your nameMy 1 name 2 is 2 What 1 your 1 liu 1 gang 1
  • 37. MapReduce模型北京传智播客教育 www.itcast.cn1.Map端 一行行读文件,程序转化为中间Key/Value. My name is liu gang ->My 1,name 1, is 1, liu 1, gang 1 What is your name ->What 1, is 1, your 1 name 1 2.Reduce端 相同的key肯定会在一起。经过Reduce方法处理后, 形成最终的key/Value. name 1,name 1->name 2;
  • 38. 运行步骤北京传智播客教育 www.itcast.cn1)打成jar包。 2)创建一个word.txt文件 3)把word.txt文件传到HDFS上面 hadoop fs –copyFromLocal 4)执行hadoop jar <完整的类名> 5)查看执行结果 hadoop fs –text /path
  • 39. MapReduce执行流程
  • 40. MapReduce基本流程
  • 41. JobTracker(JT)和TaskTracker(TT)简介
  • 42. 再论JobTracker(JT)和TaskTracker(TT)JobTracker:协作作业的运行 taskTracker:运行作业划分后的任务
  • 43. Mapreduce原理1)一个文件file.txt 2)存储file.txt文件 3) 统计file.txt文件里面”Refund”个数
  • 44. JobTracker失败1)JobTracker失败在所有的失败中是最严重的一种。 2)hadoop没有处理jobtracker失败的机制。--它是一个单点故障。 3)在未来的新版本中可能可以运行多个JobTracker。 4)可以使用ZooKeeper来协作JobTracker。
  • 45. TaskTracker失败1)一个TaskTracker由于崩溃或运行过于缓慢而失败,它会向JobTracker发送“心跳”。 2)如果有未完成的作业,JobTracker会重新把这些任务分配到其他的TaskTracker上面运行。 3)即使TaskTracker没有失败也可以被JobTracker列入黑名单。
  • 46. Hadoop Job Scheduler1)Hadoop默认的调度器是基于队列的FIFO调度器。 所有用户的作业都被提交到一个队列中,然后由JobTracker先按照作业的优先级高低,再按照作业提交时间的先后顺序选择将被执行的作业。 优点: 调度算法简单明了,JobTracker工作负担轻。 缺点: 忽略了不同作业的需求差异。 2)还用两个多用户调度器 --Fair Scheduler(公平调度器) --Capacity Scheduler(计算能力调度)
  • 47. Fair Scheduler(公平调度器)多个Pool,Job需要被提交到某个Pool中 每个pool可以设置最小 task slot,称为miniShare FS会保证Pool的公平 Pool内部支持Priority设置 支持资源抢占
  • 48. 北京传智播客教育 www.itcast.cn mapreduce的类型与格式Hadoop 0.20.x之前的API Hadoop的MapReduce中,map和reduce函数遵循如下常规格式: map: (K1, V1) → list(K2, V2) reduce: (K2, list(V2)) → list(K3, V3) Mapper的接口: public interface Mapper extends JobConfigurable, Closeable { void map(K1 key, V1 value, OutputCollector output, Reporter reporter) throws IOException; } Reduce的接口: public interface Reducer extends JobConfigurable, Closeable { void reduce(K2 key, Iterator values, OutputCollector output, Reporter reporter) throws IOException; } //outputCollector 是为了输出key/value对, //Reporter 是用来更新计数和状态信息。
  • 49. 北京传智播客教育 www.itcast.cnHadoop 0.20.x之后的API Hadoop的MapReduce中,map和reduce函数遵循如下常规格式: map: (K1, V1) → list(K2, V2) reduce: (K2, list(V2)) → list(K3, V3) Mapper的接口: protected void map(KEY key, VALUE value, Context context) throws IOException, InterruptedException { } Reduce的接口: protected void reduce(KEY key, Iterable values, Context context ) throws IOException, InterruptedException { } //Context是上下文对象,这里Context等同于OutputCollector和Reporter两个函数的功能。
  • 50. 北京传智播客教育 www.itcast.cn mapreduce的数据类型与java类型对应关系Java的基本类型Writable实现booleanBooleanWritablebyteByteWritableintIntWritablefloatFloatWritablelongLongWritabledoubleDoubleWritableStringText
  • 51. Writable接口北京传智播客教育 www.itcast.cn1.对Java中的int型进行封装那么就是hadoop中的IntWritable类 在写程序时可以把IntWritable可以看着是int 类型,它实现 了WritableComparable接口。 WritableComparable又是Writable、java.lang.comparable接口的子接口。 2.Writable类对所有的Java基本类型进行封装: 如:boolean -> BooleanWritable;Byte -> ByteWritable 3. 我们可以自定义Writable接口,来编写更复杂的结构的类。 核心:hadoop有自己一套的I/O机制。I/O类都必须实现Writable接口。
  • 52. 北京传智播客教育 www.itcast.cn 实现自定义的mapreduce类型public class LogKey implements WritableComparable{ private String customerId; private String dateTime; private String domain; public void readFields(DataInput in) throws IOException { customerId = in.readUTF(); dateTime = in.readUTF(); domain = in.readUTF(); } public void write(DataOutput out) throws IOException { out.writeUTF(customerId); out.writeUTF(dateTime); out.writeUTF(domain);} } //当我们在实际开发的时候会遇到多个key的时候,mapreduce自带的类型是不能满足我们的需求,这样我们就要自己来定制化Key和Value。
  • 53. 北京传智播客教育 www.itcast.cn 最小的MapReduce驱动public class MinimalMapReduceWithDefaults extends Configured implements Tool { public int run(String[] args) throws IOException { JobConf conf = JobBuilder.parseInputAndOutput(this, getConf(), args); if (conf == null) { return -1;} conf.setInputFormat(TextInputFormat.class); conf.setNumMapTasks(1); conf.setMapperClass(IdentityMapper.class); conf.setMapRunnerClass(MapRunner.class); conf.setMapOutputKeyClass(LongWritable.class); conf.setMapOutputValueClass(Text.class); conf.setPartitionerClass(HashPartitioner.class); conf.setNumReduceTasks(1); conf.setReducerClass(IdentityReducer.class); conf.setOutputKeyClass(LongWritable.class); conf.setOutputValueClass(Text.class); conf.setOutputFormat(TextOutputFormat.class); JobClient.runJob(conf); return 0;} public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new MinimalMapReduceWithDefaults(), args); System.exit(exitCode); }}
  • 54. 北京传智播客教育 www.itcast.cn mapreduce驱动默认的设置InputFormat(输入)TextInputFOrmatMapperClass(map类)IdentityMapperMapRunnerClass(map启动类)MapRunnerMapOutputKeyClassLongWritableMapOutputValueClassTextPartitionerClassHashPartitionerReduceClassIdentityReduceOutputKeyClassLongWritableOutputValueClassTextOutputFormatClassTextOutputFormat
  • 55. 北京传智播客教育 www.itcast.cn Combiners和Partitioner编程Combiners的作用: 每一个map可能会产生大量的输出,combiner的作用就是在map端对输出先做一次合并,以减少传输到reducer的数据量, 1)combiner最基本是实现本地key的聚合,对map输出的key排序,value进行迭代。如下所示: map: (K1, V1) → list(K2, V2)  combine: (K2, list(V2)) → list(K2, V2)  reduce: (K2, list(V2)) → list(K3, V3) 2)combiner还具有类似本地的reduce功能. 例如hadoop自带的wordcount的例子和找出value的最大值的程序,combiner和reduce完全一致。如下所示: map: (K1, V1) → list(K2, V2)  combine: (K2, list(V2)) → list(K3, V3)  reduce: (K3, list(V3)) → list(K4, V4)  3)如果不用combiner,那么,所有的结果都是reduce完成,效率会相对低下。使用combiner,先完成的map会在本地聚合,提升速度。 4)对于hadoop自带的wordcount的例子,value就是一个叠加的数字,所以map一结束就可以进行reduce的value叠加,而不必要等到所有的map结束再去进行reduce的value叠加。 注意:combiner使用的合适,可以在满足业务的情况下提升job的速度,如果不合适,则将导致输出的结果不正确。 Combiner的输出是Reducer的输入,Combiner绝不能改变最终的计算结果。所以从我的想法来看,Combiner只应该用于那种Reduce的输入key/value与输出key/value类型完全一致,且不影响最终结果的场景。比如累加,最大值等。
  • 56. 北京传智播客教育 www.itcast.cn Combiners分析假设有两个map。 第一个map的输出为: (1950,0) (1950,20) (1950,10) 第二个map输出为: (1950,25) (1950,15) (1950,30) Reduce函数被调用是,输入如下: (1950,[0,20,10,25,15,30]) 因为30是最大的值,所以输出如下: (1950,30) 如果我们使用 combiner:那么reduce调用的时候传入的数据如下: (1950,[20,30])--(1950,30) 用表达式表示为: Max(0,20,10,25,15,30)=max(max(0,20,10),max(25,15,30))=max(20,30)=30
  • 57. 北京传智播客教育 www.itcast.cn 使用 Combiners要小心刚才我们是计算最大值可以使用Combiners能提高效率。 如果我们要是求平均值呢? Avg(0,20,10,25,15,30) = 15 如果使用Combiner会得到什么样的结果呢? 第一个map输出为: avg(0,20,10) = 10 第二个map输出为: Avg(25,15,30) = 23 输入到reduce出来的结果为: Avg(10,23) = 17.5 17.5和15? 所以 :使用combiner一定要注意。
  • 58. 北京传智播客教育 www.itcast.cn Partitioner分析Partitioner 在mapreduce的位置:
  • 59. 北京传智播客教育 www.itcast.cn mapreduce提供的PatitionerPartition主要作用就是将map的结果发送到相应的reduce。这就对partition有两个要求: 1)均衡负载,尽量的将工作均匀的分配给不同的reduce。 2)效率,分配速度一定要快。 mapreduce提供的Patitioner
  • 60. 北京传智播客教育 www.itcast.cnPartitioner是partitioner的基类,如果需要定制partitioner也需要继承该类。 2. HashPartitioner是mapreduce的默认partitioner。计算方法是 which reducer=(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks,得到当前的目的reducer。 BinaryPatitioner继承于Partitioner< BinaryComparable ,V>,是Partitioner的偏特化子类。该类提供leftOffset和rightOffset,在计算which reducer时仅对键值K的[rightOffset,leftOffset]这个区间取hash。 Which reducer=(hash & Integer.MAX_VALUE) % numReduceTasks KeyFieldBasedPartitioner也是基于hash的个partitioner。和BinaryPatitioner不同,它提供了多个区间用于计算hash。当区间数为0时KeyFieldBasedPartitioner退化成HashPartitioner。 5. TotalOrderPartitioner这个类可以实现输出的全排序。不同于以上3个partitioner,这个类并不是基于hash的。在下一节里详细的介绍totalorderpartitioner。
  • 61. 北京传智播客教育 www.itcast.cn 自定义的Partitioner1)为何使用Partitioner,主要是想reduce的结果能够根据key再次分类输出到不同的文件夹中。 2)结果能够直观,同时做到对数据结果的简单的统计分析。 需求: 1、输入的数据文件内容如下(1条数据内容少,1条数据内容超长,3条数据内容正常): Kaka 1 28 hua 0 26 chao 1 tao 1 22 mao 0 29 22 2、目的是为了分别输出结果,正确的结果输出到一个文本,太短的数据输出到一个文本,太长的输出到一个文本,共三个文本输出。 Patitioner接口: public int getPartition(Text key, Text value, int numPartitions); numPartitions为Reduce的个数。 注:在本地作业运行器上运行时,只支持0个或一个Reduce。
  • 62. 北京传智播客教育 www.itcast.cn 多文件输出MultipleOutputFormat在hadoop0.20.2的版本中MultipleOutputFormat已经被Deprecated掉,所以我们要自己实现一个MultipleOutputFormat。 public abstract class MultipleOutputFormat, V extends Writable> extends FileOutputFormat {} 案例:有一个文件里面含有 aaaa bbbb cccc dddd eeee ………等字符。 我希望: a开头的输入到a.txt文件里并统计个数。 b开头的输入到b.txt文件里并统计个数。 c开头的输入到c.txt文件里并统计个数。一次类推。 //案例分析
  • 63. 北京传智播客教育 www.itcast.cn 辅助类GenericOptionsParser、Tool和ToolRunnerHadoop为了简化命令命令行方式运行作业,hadoop自带了一些辅助类: GenericOptionsParser是一个类,用来解释常用的Hadoop命令行选项,并根据需要,为Configuration对象设置相应的取值。通常不直接使用GenericOptionsParser,更方便的方式是:实现Tool接口,通过ToolRunner来运行应用程序,ToolRunner内部调用GenericOptionsParser: public class ConfigurationPrinter extends Configured implements Tool {     static {       Configuration.addDefaultResource("hdfs-default.xml");       Configuration.addDefaultResource("hdfs-site.xml");       Configuration.addDefaultResource("mapred-default.xml");       Configuration.addDefaultResource("mapred-site.xml");   }   @Override   public int run(String[] args) throws Exception {     Configuration conf = getConf();     for (Entry entry: conf) {       System.out.printf("%s=%s\n", entry.getKey(), entry.getValue());     }       return 0;     }     public static void main(String[] args) throws Exception {       int exitCode = ToolRunner.run(new ConfigurationPrinter(), args);       System.exit(exitCode);     }   }
  • 64. 北京传智播客教育 www.itcast.cn hadoop支持的压塑库压塑格式总结: Google 的Snappy 一个高速压塑库。作为一个压缩库,它可以利用单颗Intel Corei7处理器内核处理至少每秒250MB~500MB的数据流。
  • 65. 北京传智播客教育 www.itcast.cn hadoop的压塑codecCodec为压缩,解压缩的算法实现。 在Hadoop中,codec由CompressionCode的实现来表示。下面是一些实现:
  • 66. 北京传智播客教育 www.itcast.cn MapReduce的输出进行压塑Map任务输出的压塑属性: 1)在程序中运用: Configuration conf = new Configuration(); conf.setBoolean("mapred.output.compress",true); conf.setClass("mapred.output.compression.codec",GzipCodec.class,CompressionCodec.class); 2)mapred.xml name :mapred.output.compress . value:true name:mapred.output.compression.codec“ value: GzipCodec.class,CompressionCodec.class
  • 67. 北京传智播客教育 www.itcast.cn Distributed Cache的使用hadoop DistributedCache(分布式缓存)的使用 Configuration conf = new Configuration(); DistributedCache.createSymlink(conf); DistributedCache.addFileToClassPath(new Path(“/user/hadoop/app_depend/lib/commons-cli-1.2.jar”), conf);//第三方jar。 DistributedCache.addCacheFile(new URI("/user/root/input/testFile#testFile"), conf);//可以用testFile代替前面的文件 Job job = new Job(conf); ##注意:DistributedCache的操作一定要放在job的初始化之前,否则会报出文件找不到的异常 >>>>>>在map端打开的方法是 FileReader fr = new FileReader("testFile");
  • 68. 文件模式北京传智播客教育 www.itcast.cnHadoop支持的通配符与Unix bash相同。 1)通配符及其含义 通配符 名称 匹配 * 星号 匹配0个或多个字符 ? 问号 匹配单一字符 [ab] 字符类 匹配{a,b}集合中的一个字符 [^ab] 非字符类类 匹配非{a,b}集合中的一个字符 [a-b] 字符范围 匹配一个在{a,b}范围内的字符包含(ab) [^a-b] 非字符范围 匹配一个不在{a,b}范围内的字符包含(ab) {a,b} 或选择 匹配包含a 或 b中的一个 \c 转义字符 匹配元字符c
  • 69. 通过PathFilter进行文件过滤北京传智播客教育 www.itcast.cn当输入路径中有些文件我们不需要的时候,可以通过PathFilter进行文件的过滤操作。 例如我们有一下一些文件: /user/hadoop/log/00001/05/25/access-log-201205250905.gz /user/hadoop/log/00001/05/25/access-log-201205250906.gz /user/hadoop/log/00001/05/25/access-log-201205250910.gz /user/hadoop/log/00001/05/25/access-log-201205250915.gz /user/hadoop/log/00004/05/25/access-log-201205250905.gz 1)我们只需要处理201205250905.gz和201205250906.gz文件怎么实现? 2)我们需要处理所有的文件怎么实现? /user/hadoop/log/*/05/25/access-log-2012052509*.gz 第一种方法: /user/hadoop/log/00001/05/25/access-log-20120525090{5,6}.gz 第二种方法: 我们自定义FilterPath来实现。 1) 自定FilterPath必须 implements PathFilter。 accept(Path path)方法 Tests whether or not the specified abstract pathname should be included in a pathname list.
  • 70. Mapreduce实战北京传智播客教育 www.itcast.cn设输入为如下的序列对: str1 3 str2 2 str1 1 str3 9 str2 10 我们期望的输出结果为: str1 1,3 str2 2,10 str3 9
  • 71. MRUnit测试北京传智播客教育 www.itcast.cn MRUnit针对不同测试对象分别使用以下几种Driver:   | MapDriver,针对单独的Map测试。 l  ReduceDriver,针对单独的Reduce测试。 l  MapReduceDriver,将Map和Reduce连贯起来测试。 l  PipelineMapReduceDriver,将多个Map-Reduce pair贯串测试。 MapDriver的使用 ReduceDriver的使用 MapReduceDriver的使用
  • 72. Mapreduce的输入北京传智播客教育 www.itcast.cn 在执行mapreduce之前,原始数据被分割成若干split,每个split作为一个map任务的输入,在map执行过程中split会被分解成一个个记录(key-value对),map会依次处理每一个记录。 FileInputFormat:  FileInputFormat是所有以文件作为数据源的InputFormat实现的基类,FileInputFormat保存作为job输入的所有文件,并实现了对输入文件计算splits的方法。至于获得记录的方法是有不同的子类进行实现的。  获得了输入文件后,FileInputFormat是怎样将他们划分成splits的呢?
  • 73. 输入文件的split北京传智播客教育 www.itcast.cn 1) FileInputFormat只划分比HDFS block大的文件,所以FileInputFormat划分的结果是这个文件或者是这个文件中的一部分. 2) 如果一个文件的大小比block小,将不会被划分,这也是Hadoop处理大文件的效率要比处理很多小文件的效率高的原因。 3) 当Hadoop处理很多小文件(文件大小小于hdfs block大小)的时候,由于FileInputFormat不会对小文件进行划分,所以每一个小文件都会被当做一个split并分配一个map任务,导致效率底下。 例如: 一个1G的文件,会被划分成16个64MB的split,并分配16个map任务处理,而10000个100kb的文件会被10000个map任务处理。   
  • 74. 默认的输入TextInputFormat北京传智播客教育 www.itcast.cn1)TextInputformat是默认的inputformat,对于输入文件。 2)文件中每一行作为一个记录,他将每一行在文件中的起始偏移量作为key,每一行的内容作为value。 3)默认以\n或回车键作为一行记录。 4)TextInputFormat继承了FileInputFormat。
  • 75. 自带的输入类北京传智播客教育 www.itcast.cn1)CombinarFileInputFormat 相对于大量的小文件来说,hadoop更合适处理少量的大文件。 CombinarFileInputFormat可以缓解这个问题,它是针对小文件而设计的。 2)KeyValueTextInputFormat 当输入数据的每一行是两列,并用tab分离的形式的时候,KeyValueTextInputformat处理这种格式的文件非常适合。 3) NLineInputformat  NLineInputformat可以控制在每个split中数据的行数。 4)SequenceFileInputformat  当输入文件格式是sequencefile的时候,要使用SequenceFileInputformat作为输入。  
  • 76. 自定义输入格式北京传智播客教育 www.itcast.cn1)继承FileInputFormat基类。 2)重写里面的isSplitable(FileSyatem fs,Path fileName)方法。 3)重写getRecordReader()方法。 public interface InputFormat { InputSplit[] getSplits(JobConf job, int numSplits) throws IOException; RecordReader getRecordReader(InputSplit split, JobConf job,Reporter reporter) throws IOException; }
  • 77. Hadoop的输出北京传智播客教育 www.itcast.cn1)TextOutputformat 默认的输出格式,key和value中间值用tab隔开的。 2)SequenceFileOutputformat 将key和value以sequencefile格式输出。 3)sequencefileAsOutputFormat 将key和value以原始二进制的格式输出。 4)MapFileOutputFormat 将key和value写入MapFile中。由于MapFile中的key是有序的,所以写入的时候必须保证记录是按key值顺序写入的。 5)MultipleOutputFormat 默认情况下一个reducer会产生一个输出,但是有些时候我们想一个reducer产生多个输出,MultipleOutputFormat和MultipleOutputs可以实现这个功能。
  • 78. Hadoop小文件处理北京传智播客教育 www.itcast.cn HDFS上每个文件都要在namenode上建立一个索引,这个索引的大小约为150byte,这样当小文件比较多的时候,就会产生很多的索引文件,一方面会大量占用namenode的内存空间,另一方面就是索引文件过大是的索引速度变慢, 解决的方式 (1)Hadoop本身提供了一些文件压缩的方案 (2)从系统层面改变现有HDFS存在的问题,其实主要还是小文件的合并,然后建立比较快速的索引。
  • 79. Hadoop自带小文件解决方案北京传智播客教育 www.itcast.cn1)Hadoop Archive 是一个高效地将小文件放入HDFS块中的文件存档工具,它能够将多个小文件打包成一个HAR文件,这样在减少namenode内存使用的同时。 2)Sequence file sequence file由一系列的二进制key/value组成,如果为key小文件名,value为文件内容,则可以将大批小文件合并成一个大文件。 3)CombineFileInputFormat。 CombineFileInputFormat是一种新的inputformat,用于将多个文件合并成一个单独的split,另外,它会考虑数据的存储位置。
  • 80. hadoop实战小文件优化 开启Jvm重用对Job影响: hadoop集群机器配置:三台Ubnutu虚拟机,内存512M 文件数文件大小JVM重用耗时Jobid48157.54 GBY32mins, 5secjob_201206161018_000448157.54 GBN58mins, 49secjob_201206161018_0014结论:对于大量小文件Job,开启JVM重用减少45%运行时间
  • 81. 参数mapred.reduce.parallel.copies任务时间mapred.reduce.parallel.copies54mins, 21sec5(默认值)48mins, 30sec20通过配置参数mapred.reduce.parallel.copies可以提升12%性能优化项优化方法可以减少Job时间Jvm重用开启jvm重用45%mapred.reduce.parallel.copies默认值为5,优化值2012%
  • 82. Mapreduce作业调优北京传智播客教育 www.itcast.cn哪些因素影响作业的运行效率? 1)mapper的数量:尽量将输入数据切分成数据块的整数倍。如有太多小文件,则考虑CombineFileInputFormat; 2)reducer的数量:为了达到最高性能,集群中reducer数应该略小于reducer的任务槽数。 3)combiner: 充分使用合并函数减少map和reduce之间传递的数据量,combiner在map后运行; 4)中间值的压缩:对map输出值进行压缩减少到reduce前的传递量(conf.setCompressMapOutput(true)和setMapOutputCompressorClass(GzipCodec.class)); 5)自定义序列:如果使用自定义的Writable对象或自定义的comparator,则必须确保已实现RawComparator
  • 83. MapReduce Streaming编程1)非java语言的map-reduce编程接口。 例如:shell、c、python、ruby等。 2)使用hadoop自带的hadoop-streaming-0.20.2.jar 3)手动指定map和reduce函数。
  • 84. 运行一个MapReduce Streaming程序一、单机测试 cat test.log | wc -l 二、将文件上传到集群 /bin/hadoop fs -copyFromLocal test.log /hdfs/ 三、运行map red /bin/hadoop jar contrib/streaming/hadoop-streaming-0.20.203.0.jar -mapper cat -reducer 'wc -l' -input /data/test.log -output /data/result/