hadoop 里执行 MapReduce 任务的几种方式

jopen 11年前

说明:

测试文件:
echo -e "aa\tbb \tcc  bb\tcc\tdd" > 3.txt

hadoop fs -mkdir /data  hadoop fs -put 3.txt /data

全文的例子均以该文件做测试用例,统计单词出现的次数(WordCount)。

1、最原始的方式:java 源码编译打包成jar包后,由 hadoop 脚本调度执行,类似:

bin/hadoop jar /tmp/wordcount.jar WordCount /tmp/input /tmp/output
java 代码 100 多行,我就不贴代码了,具体请见官方范例:

2、MR 脚本开发语言:pig
A1 = load '/data/3.txt';  A = stream A1 through `sed "s/\t/ /g"`;  B = foreach A generate flatten(TOKENIZE((chararray)$0)) as word;  C = filter B by word matches '\\w+';  D = group C by word;  E = foreach D generate COUNT(C), group;  dump E;

注意:不同分隔符对load及后面的$0的影响。
详情请见:
https://gist.github.com/186460
http://www.slideshare.net/erikeldridge/a-brief-handson-introduction-to-hadoop-pig

3、构建数据仓库的类 SQL 开发语言:hive
create table textlines(text string);  load data inpath '/data/3.txt' overwrite into table textlines;  SELECT wordColumn, count(1) FROM textlines LATERAL VIEW explode(split(text,'\t+')) wordTable AS wordColumn GROUP BY wordColumn;

详情请见:
http://my.oschina.net/leejun2005/blog/83045
http://blog.csdn.net/techdo/article/details/7433222

4、跨平台的脚本语言:python

map:
#!/usr/bin/python  import os,re,sys  for line in sys.stdin:   for i in line.strip().split("\t"):    print i

reduce:
#!/usr/bin/python  import os,re,sys  arr = {}  for words in sys.stdin:   word = words.strip()   if word not in arr:    arr[word] = 1   else:    arr[word] += 1  for k, v in arr.items():   print str(k) + ": " + str(v)

最后在shell下执行:
hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming-0.20.203.0.jar -file map.py -file reduce.py  -mapper map.py -reducer reduce.py -input /data/3.txt -output /data/py

注意:脚本开头需要显示指定何种解释器以及赋予脚本执行权限
详情请见:
http://blog.csdn.net/jiedushi/article/details/7390015

5、Linux 下的瑞士军刀:shell 脚本
map:
#!/bin/bash  tr '\t' '\n'

reduce:
#!/bin/bash  sort|uniq -c

最后在shell下执行:

june@deepin:~/hadoop/hadoop-0.20.203.0/tmp>  hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming-0.20.203.0.jar -file map.py -file reduce.py  -mapper map.py -reducer reduce.py -input /data/3.txt -output /data/py  packageJobJar: [map.py, reduce.py, /home/june/data_hadoop/tmp/hadoop-unjar2676221286002400849/] [] /tmp/streamjob8722854685251202950.jar tmpDir=null  12/10/14 21:57:00 INFO mapred.FileInputFormat: Total input paths to process : 1  12/10/14 21:57:00 INFO streaming.StreamJob: getLocalDirs(): [/home/june/data_hadoop/tmp/mapred/local]  12/10/14 21:57:00 INFO streaming.StreamJob: Running job: job_201210141552_0041  12/10/14 21:57:00 INFO streaming.StreamJob: To kill this job, run:  12/10/14 21:57:00 INFO streaming.StreamJob: /home/june/hadoop/hadoop-0.20.203.0/bin/../bin/hadoop job  -Dmapred.job.tracker=localhost:9001 -kill job_201210141552_0041  12/10/14 21:57:00 INFO streaming.StreamJob: Tracking URL: http://localhost:50030/jobdetails.jsp?jobid=job_201210141552_0041  12/10/14 21:57:01 INFO streaming.StreamJob:  map 0%  reduce 0%  12/10/14 21:57:13 INFO streaming.StreamJob:  map 67%  reduce 0%  12/10/14 21:57:19 INFO streaming.StreamJob:  map 100%  reduce 0%  12/10/14 21:57:22 INFO streaming.StreamJob:  map 100%  reduce 22%  12/10/14 21:57:31 INFO streaming.StreamJob:  map 100%  reduce 100%  12/10/14 21:57:37 INFO streaming.StreamJob: Job complete: job_201210141552_0041  12/10/14 21:57:37 INFO streaming.StreamJob: Output: /data/py  june@deepin:~/hadoop/hadoop-0.20.203.0/tmp>  hadoop fs -cat /data/py/part-00000        1 aa         1 bb          1 bb         2 cc         1 dd   june@deepin:~/hadoop/hadoop-0.20.203.0/tmp>

特别提示:上述有些方法对字段后的空格忽略或计算,请注意仔细甄别。

说明:列举了上述几种方法主要是给大家一个不同的思路,
在解决问题的过程中,开发效率、执行效率都是我们需要考虑的,不要太局限某一种方法了。