Hadoop 实现多文件输出

jopen 11年前

比如word.txt内容如下:

aaa bbb aba abc

bba bbd bbbc

cc ccd cce

要求按单词的首字母区分单词并分文件输出

代码如下:

LineRecordWriter

package com.hadoop.multi;    import java.io.DataOutputStream;  import java.io.IOException;  import java.io.UnsupportedEncodingException;  import org.apache.hadoop.io.NullWritable;  import org.apache.hadoop.io.Text;  import org.apache.hadoop.mapreduce.RecordWriter;  import org.apache.hadoop.mapreduce.TaskAttemptContext;    public class LineRecordWriter<K, V> extends RecordWriter<K, V> {     private static final String utf8 = "UTF-8";     private static final byte[] newline;     static {    try {     newline = "n".getBytes(utf8);    } catch (UnsupportedEncodingException uee) {     throw new IllegalArgumentException("can't find " + utf8       + " encoding");    }   }     protected DataOutputStream out;   private final byte[] keyValueSeparator;     public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {    this.out = out;    try {     this.keyValueSeparator = keyValueSeparator.getBytes(utf8);    } catch (UnsupportedEncodingException uee) {     throw new IllegalArgumentException("can't find " + utf8       + " encoding");    }   }     public LineRecordWriter(DataOutputStream out) {    this(out, "t");   }     private void writeObject(Object o) throws IOException {    if (o instanceof Text) {     Text to = (Text) o;     out.write(to.getBytes(), 0, to.getLength());    } else {     out.write(o.toString().getBytes(utf8));    }   }     public synchronized void write(K key, V value) throws IOException {    boolean nullKey = key == null || key instanceof NullWritable;    boolean nullValue = value == null || value instanceof NullWritable;    if (nullKey && nullValue) {     return;    }    if (!nullKey) {     writeObject(key);    }    if (!(nullKey || nullValue)) {     out.write(keyValueSeparator);    }    if (!nullValue) {     writeObject(value);    }    out.write(newline);   }     public synchronized void close(TaskAttemptContext context)     throws IOException {    out.close();   }    }


MultipleOutputFormat

package com.hadoop.multi;       import java.io.DataOutputStream;     import java.io.IOException;     import java.util.HashMap;     import java.util.Iterator;     import org.apache.hadoop.conf.Configuration;     import org.apache.hadoop.fs.FSDataOutputStream;     import org.apache.hadoop.fs.Path;     import org.apache.hadoop.io.Writable;     import org.apache.hadoop.io.WritableComparable;     import org.apache.hadoop.io.compress.CompressionCodec;     import org.apache.hadoop.io.compress.GzipCodec;     import org.apache.hadoop.mapreduce.OutputCommitter;     import org.apache.hadoop.mapreduce.RecordWriter;     import org.apache.hadoop.mapreduce.TaskAttemptContext;     import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;     import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;     import org.apache.hadoop.util.ReflectionUtils;     public abstract class MultipleOutputFormat<K extends WritableComparable<?>, V extends Writable>          extends FileOutputFormat<K, V> {      private MultiRecordWriter writer = null;       public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException,          InterruptedException {       if (writer == null) {           writer = new MultiRecordWriter(job, getTaskOutputPath(job));       }       return writer;      }      private Path getTaskOutputPath(TaskAttemptContext conf) throws IOException {             Path workPath = null;             OutputCommitter committer = super.getOutputCommitter(conf);             if (committer instanceof FileOutputCommitter) {                 workPath = ((FileOutputCommitter) committer).getWorkPath();             } else {                 Path outputPath = super.getOutputPath(conf);                 if (outputPath == null) {                     throw new IOException("Undefined job output-path");                 }                 workPath = outputPath;             }             return workPath;         }       protected abstract String generateFileNameForKeyValue(K key, V value, Configuration conf);        public class MultiRecordWriter extends RecordWriter<K, V> {                       private HashMap<String, RecordWriter<K, V>> recordWriters = null;             private TaskAttemptContext job = null;                          private Path workPath = null;             public MultiRecordWriter(TaskAttemptContext job, Path workPath) {                 super();                 this.job = job;                 this.workPath = workPath;                 recordWriters = new HashMap<String, RecordWriter<K, V>>();             }             @Override             public void close(TaskAttemptContext context) throws IOException, InterruptedException {                 Iterator<RecordWriter<K, V>> values = this.recordWriters.values().iterator();                 while (values.hasNext()) {                     values.next().close(context);                 }                 this.recordWriters.clear();             }             @Override             public void write(K key, V value) throws IOException, InterruptedException {                 //得到输出文件名                 String baseName = generateFileNameForKeyValue(key, value, job.getConfiguration());                 RecordWriter<K, V> rw = this.recordWriters.get(baseName);                 if (rw == null) {                     rw = getBaseRecordWriter(job, baseName);                     this.recordWriters.put(baseName, rw);                 }                 rw.write(key, value);             }                         private RecordWriter<K, V> getBaseRecordWriter(TaskAttemptContext job, String baseName)                     throws IOException, InterruptedException {                 Configuration conf = job.getConfiguration();                 boolean isCompressed = getCompressOutput(job);                 String keyValueSeparator = ",";                 RecordWriter<K, V> recordWriter = null;                 if (isCompressed) {                     Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job,                             GzipCodec.class);                     CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf);                     Path file = new Path(workPath, baseName + codec.getDefaultExtension());                     FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);                     recordWriter = new LineRecordWriter<K, V>(new DataOutputStream(codec                             .createOutputStream(fileOut)), keyValueSeparator);                 } else {                     Path file = new Path(workPath, baseName);                     FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);                     recordWriter = new LineRecordWriter<K, V>(fileOut, keyValueSeparator);                 }                 return recordWriter;             }         }        }


MultiFileOutPut

package com.hadoop.multi;    import java.io.IOException;  import java.util.StringTokenizer;    import org.apache.hadoop.conf.Configuration;  import org.apache.hadoop.fs.Path;  import org.apache.hadoop.io.IntWritable;  import org.apache.hadoop.io.Text;  import org.apache.hadoop.mapreduce.Job;  import org.apache.hadoop.mapreduce.Mapper;  import org.apache.hadoop.mapreduce.Reducer;  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  import org.apache.hadoop.util.GenericOptionsParser;  import com.hadoop.multi.MultipleOutputFormat;    public class MultiFileOutPut {      public static class TokenizerMapper          extends Mapper<Object, Text, Text, IntWritable>{            private final static IntWritable one = new IntWritable(1);      private Text word = new Text();              public void map(Object key, Text value, Context context                      ) throws IOException, InterruptedException {        StringTokenizer itr = new StringTokenizer(value.toString());        while (itr.hasMoreTokens()) {          word.set(itr.nextToken());          context.write(word, one);        }      }    }        public static class IntSumReducer          extends Reducer<Text,IntWritable,Text,IntWritable> {      private IntWritable result = new IntWritable();        public void reduce(Text key, Iterable<IntWritable> values,                          Context context                         ) throws IOException, InterruptedException {        int sum = 0;        for (IntWritable val : values) {          sum += val.get();        }        result.set(sum);        context.write(key, result);      }    }        public static class AlphabetOutputFormat extends MultipleOutputFormat<Text, IntWritable> {           @Override           protected String generateFileNameForKeyValue(Text key, IntWritable value, Configuration conf) {               char c = key.toString().toLowerCase().charAt(0);               if (c >= 'a' && c <= 'z') {                   return c + ".txt";               }               return "other.txt";           }       }        public static void main(String[] args) throws Exception {      Configuration conf = new Configuration();      String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();      if (otherArgs.length != 2) {        System.err.println("Usage: wordcount <in> <out>");        System.exit(2);      }      Job job = new Job(conf, "word count");      job.setJarByClass(MultiFileOutPut.class);      job.setMapperClass(TokenizerMapper.class);      job.setCombinerClass(IntSumReducer.class);      job.setReducerClass(IntSumReducer.class);      job.setOutputKeyClass(Text.class);      job.setOutputValueClass(IntWritable.class);      job.setOutputFormatClass(AlphabetOutputFormat.class);      FileInputFormat.addInputPath(job, new Path(otherArgs[0]));      FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));      System.exit(job.waitForCompletion(true) ? 0 : 1);    }  }
来自:http://blog.csdn.net/zyuc_wangxw/article/details/9304461