Nutch的HDFS文件输出

jopen 9年前

以1.7为例,之前Nutch的输出可以自定义其它存储系统中,具体原理不赘述。

项目有个需求,就是文件仍然保存在HDFS中,而不是索引到其它存储系统中。

也就是说,不用写

public class XXX implements IndexWriter

这样的插件了,

那么,问题来了,怎么修改Nutch的源码,使得结果顺利存在HDFS中呢?

----------那就让我们从源头一步一步来修改,碰到问题就解决问题。

首先Crawl.java中,原先在索引阶段有这么一些代码。

if (i > 0) {        linkDbTool.invert(linkDb, segments, true, true, false); // invert links          if (solrUrl != null) {          // index, dedup & merge          FileStatus[] fstats = fs.listStatus(segments, HadoopFSUtil.getPassDirectoriesFilter(fs));                    IndexingJob indexer = new IndexingJob(getConf());          indexer.index(crawlDb, linkDb,                   Arrays.asList(HadoopFSUtil.getPaths(fstats)));            SolrDeleteDuplicates dedup = new SolrDeleteDuplicates();          dedup.setConf(getConf());          dedup.dedup(solrUrl);        }

最关键的是

IndexingJob indexer = new IndexingJob(getConf());          indexer.index(crawlDb, linkDb,                   Arrays.asList(HadoopFSUtil.getPaths(fstats)));

也就是说,这里是索引的入口处。

这里把这些代码屏蔽掉,我个人的方法是 if (solrUrl != null) {------》if (false) {

这样还能保持原先的代码存在,这样如果后面的代码有问题还可以恢复此代码。

---------------接下来呢?添加我们自己的索引任务代码如下:

if (true) {    // add my index job    // index, dedup & merge    FileStatus[] fstats = fs.listStatus(segments,    HadoopFSUtil.getPassDirectoriesFilter(fs));    IndexingJob indexer = new IndexingJob(getConf());    indexer.index(crawlDb, linkDb,Arrays.asList(HadoopFSUtil.getPaths(fstats)), true,false, null);  }

这样,就完成了索引任务外围的改造,这里只是改了个外观,还没伤筋动骨。

下面我们开始对内部进行改造!

-------------------------------------------------------------------------------

首先,我们得找到MR的方法吧,入口在哪呢?

IndexerMapReduce.initMRJob(crawlDb, linkDb, segments, job);

这句话,跟进去,就能看到具体的MR类,如下:

 job.setMapperClass(IndexerMapReduce.class);   job.setReducerClass(IndexerMapReduce.class);

也就是说,MR类都是IndexerMapReduce.class.

那么我们就开始分析这个类的map和reduce函数。

备注: 我的URL文件的格式是 url   \t   sender=xxx   \t   receiver=xxx   \t   oldname=xxx     \t   newname=xxx   \n

---------------

改动的几个地方如下:

1 对于reduce的函数声明

  public void reduce(Text key, Iterator<NutchWritable> values,                       OutputCollector<Text, NutchIndexAction> output, Reporter reporter)

修改为

  public void reduce(Text key, Iterator<NutchWritable> values,                       OutputCollector<Text, Text> output, Reporter reporter)

这会导致出现3个错误,把这3个地方屏蔽掉即可。

2 看reduce的最后两行

NutchIndexAction action = new NutchIndexAction(doc, NutchIndexAction.ADD);  output.collect(key, action);

这里需要做一个改动如下:

// NutchIndexAction action = new NutchIndexAction(doc,    // NutchIndexAction.ADD);    // output.collect(key, action);    Object senderObject = doc.getFieldValue("sender");    Object receiverObject = doc.getFieldValue("receiver");    Object singerObject = doc.getFieldValue("singer");    if (null != senderObject && null != receiverObject      && null != singerObject) {     String sender = senderObject.toString();     String receiver = receiverObject.toString();     String singer = singerObject.toString();     // output it     output.collect(new Text(sender), new Text(singer));     output.collect(new Text(receiver), new Text(singer));    }

 如果此时进行ant编译,自然会报错,如下:

    [javac] /usr/local/music_Name_to_Singer/nutch-1.7/src/java/org/apache/nutch/indexer/IndexerMapReduce.java:53: error: IndexerMapReduce is not abstract and does not override abstract method reduce(Text,Iterator<NutchWritable>,OutputCollector<Text,NutchIndexAction>,Reporter) in Reducer      [javac] public class IndexerMapReduce extends Configured implements      [javac]        ^      [javac] 1 error      [javac] 1 warning

那是因为我们需要修改一个地方:

IndexerMapReduce.java中的

原先的代码:

job.setOutputFormat(IndexerOutputFormat.class);  job.setOutputKeyClass(Text.class);  job.setMapOutputValueClass(NutchWritable.class);  job.setOutputValueClass(NutchWritable.class);

现在要修改为:

 

job.setOutputFormat(TextOutputFormat.class);  job.setOutputKeyClass(Text.class);  job.setMapOutputValueClass(NutchWritable.class);  job.setOutputValueClass(Text.class);


以及

public class IndexerMapReduce extends Configured implements
  Mapper<Text, Writable, Text, NutchWritable>,
  Reducer<Text, NutchWritable, Text, NutchIndexAction> {

修改为

public class IndexerMapReduce extends Configured implements
  Mapper<Text, Writable, Text, NutchWritable>,
  Reducer<Text, NutchWritable, Text, Text> {

然后ant

就可以看到

BUILD SUCCESSFUL
Total time: 15 seconds

表明编译成功!

别急着运行,还有一个地方需要修改!

---------------------- 在InexingJob中有如下一些代码:

 final Path tmp = new Path("tmp_" + System.currentTimeMillis() + "-"                  + new Random().nextInt());            FileOutputFormat.setOutputPath(job, tmp);          try {              JobClient.runJob(job);              // do the commits once and for all the reducers in one go              if (!noCommit) {                  writers.open(job,"commit");                  writers.commit();              }              long end = System.currentTimeMillis();              LOG.info("Indexer: finished at " + sdf.format(end) + ", elapsed: "                      + TimingUtil.elapsedTime(start, end));          } finally {              FileSystem.get(job).delete(tmp, true);          }

表明,Nutch1.7默认是把输出导向到其它输出的,而不是本地HDFS.

所以FileSystem.get(job).delete(tmp, true);是用来删除此文件的,此时我们需要修改这个地方来保留文件。

不然咱辛辛苦苦写的文件,全被一句话任性的删掉了。

------------------------代码如下:

注意:我这里的需求是输出为当天的目录。所以代码为:

//final Path tmp = new Path("tmp_" + System.currentTimeMillis() + "-"                 // + new Random().nextInt());          Calendar cal = Calendar.getInstance();    int year = cal.get(Calendar.YEAR);    int month = cal.get(Calendar.MONTH) + 1;    int day = cal.get(Calendar.DAY_OF_MONTH);          final Path tmp = new Path(getConf().get("pathPrefix"),"year="+year+"/month="+month+"/day="+day);            FileOutputFormat.setOutputPath(job, tmp);          try {              JobClient.runJob(job);              // do the commits once and for all the reducers in one go              if (!noCommit) {                  writers.open(job,"commit");                  writers.commit();              }              long end = System.currentTimeMillis();              LOG.info("Indexer: finished at " + sdf.format(end) + ", elapsed: "                      + TimingUtil.elapsedTime(start, end));          } finally {              //FileSystem.get(job).delete(tmp, true);          }

此时编译是可以通过的。

好,暂时就是这样,效果图:

184940_gqwq_1382024.jpg

 

来自:http://my.oschina.net/qiangzigege/blog/355014