• 1. Writing a Map/Reduce Job
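  • 2. Step 1: Extend the HadoopJob class.
       import org.apache.commons.logging.Log;
       import org.apache.commons.logging.LogFactory;

       public class MySecondJob extends HadoopJob {
           public static final Log LOG = LogFactory.getLog(MySecondJob.class);

           @Override
           protected void internalExecute() throws Exception {
           }
       }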
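The slides do not show HadoopJob itself. It appears to come from the project's own framework, and execute() is called on it in the main method. One plausible design is the template-method pattern: a public execute() runs shared steps and then calls the protected internalExecute() hook. The sketch below shows that pattern in plain JDK code. HadoopJobSketch, MySecondJobSketch, the `trace` list and the "before"/"after" steps are all hypothetical and are not the framework's real API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical concrete job: it only fills in the hook, like MySecondJob does.
class MySecondJobSketch extends HadoopJobSketch {
    @Override
    protected void internalExecute() {
        trace.add("internalExecute"); // job-specific configuration and submission would go here
    }

    public static void main(String[] args) throws Exception {
        MySecondJobSketch job = new MySecondJobSketch();
        job.execute();
        System.out.println(job.trace); // [before, internalExecute, after]
    }
}

// Hypothetical stand-in for the framework's HadoopJob base class (assumed design, not Hadoop's API).
abstract class HadoopJobSketch {
    // Records the order of steps so the template-method flow can be inspected.
    protected final List<String> trace = new ArrayList<>();

    // Template method: fixed outer workflow, with the job-specific part delegated to the hook.
    public final void execute() throws Exception {
        trace.add("before");   // assumed shared setup (logging, validation, ...)
        internalExecute();     // subclass hook, mirroring internalExecute() in the slides
        trace.add("after");    // assumed shared cleanup or reporting
    }

    protected abstract void internalExecute() throws Exception;
}
```

Because execute() is final, subclasses cannot change the surrounding workflow. They can only supply the part that differs between jobs.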
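  • 3. Step 2: Create the constructors. What is a constructor? A constructor is a special method-like member whose purpose is to initialize a new instance of a class. It runs whenever an object is created with new, for example: Platypus p1 = new Platypus(); A constructor has no return type (not even void) and has the same name as its class.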
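The rule that a constructor has no return type is easy to get wrong. Adding `void` in front of it silently turns it into an ordinary method that merely shares the class name. The JDK-only sketch below shows both cases. The field `name` and the value "Perry" are made up for illustration.

```java
class Platypus {
    String name = "unset";

    // Constructor: same name as the class and no return type. `new Platypus()` runs it.
    Platypus() {
        name = "Perry"; // hypothetical default value
    }

    // NOT a constructor: declaring a return type (even void) makes this a regular method.
    // `new` never calls it; it only runs when invoked explicitly.
    void Platypus() {
        name = "called as a method";
    }

    public static void main(String[] args) {
        Platypus p1 = new Platypus();   // the statement from the slide
        System.out.println(p1.name);    // Perry
        p1.Platypus();                  // explicit method call, unrelated to construction
        System.out.println(p1.name);    // called as a method
    }
}
```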
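  • 4. public MySecondJob() {
           // No-arg constructor: delegate to the Configuration constructor with the default configuration
           this(ApplicationConfiguration.create(NutchConfiguration.create()));
       }

       public MySecondJob(Configuration conf) {
           this.setConf(conf); // store the configuration; internalExecute() reads it back via getConf()
       }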
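The two constructors above use constructor chaining. The no-arg version builds a default configuration and hands it to the one-argument version through `this(...)`, so every construction path ends in setConf(). The JDK-only sketch below reproduces that shape. A `Map<String, String>` stands in for Hadoop's Configuration, and `ChainedJob`, `defaultConf()` and the "source" key are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

class ChainedJob {
    private Map<String, String> conf;

    // No-arg constructor: delegates via this(...), which must be the first statement.
    ChainedJob() {
        this(defaultConf());
    }

    // Main constructor: every construction path ends up here.
    ChainedJob(Map<String, String> conf) {
        setConf(conf);
    }

    // Must be static: instance members cannot be used inside the this(...) argument list.
    // Hypothetical stand-in for ApplicationConfiguration.create(NutchConfiguration.create()).
    private static Map<String, String> defaultConf() {
        Map<String, String> c = new HashMap<>();
        c.put("source", "default");
        return c;
    }

    void setConf(Map<String, String> conf) { this.conf = conf; }

    Map<String, String> getConf() { return conf; }

    public static void main(String[] args) {
        System.out.println(new ChainedJob().getConf().get("source"));       // default
        Map<String, String> custom = new HashMap<>();
        custom.put("source", "custom");
        System.out.println(new ChainedJob(custom).getConf().get("source")); // custom
    }
}
```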
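  • 5. Step 3: Override the internalExecute() method. Purpose: configure how the Job is set up and scheduled.
       @Override
       protected void internalExecute() throws Exception {
           Configuration conf = getConf();
           // Delete the previous output directory recursively; Hadoop refuses to write into an existing output directory
           FileSystem.get(conf).delete(new Path("/Training/testbed/zj/out"), true);
           // Create the Job and give it a name (in Hadoop 2.x this constructor is deprecated in favor of Job.getInstance(conf, name))
           Job job = new Job(conf, "Jey's Job");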
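The output directory is deleted first because Hadoop's FileOutputFormat rejects a job whose output directory already exists. The second argument `true` makes the delete recursive. The sketch below is a local-disk analog written with java.nio, not Hadoop code. It shows what a recursive delete does and roughly mirrors how FileSystem.delete returns false for a path that does not exist. `OutputDirCleaner` and the stale `part-r-00000` file are hypothetical.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Local-disk analog of FileSystem.get(conf).delete(path, true); plain JDK, not Hadoop.
class OutputDirCleaner {
    // Deletes dir and everything beneath it; returns false if dir did not exist.
    static boolean deleteRecursively(Path dir) throws IOException {
        if (!Files.exists(dir)) {
            return false;
        }
        List<Path> paths;
        try (Stream<Path> walk = Files.walk(dir)) {
            // Reverse lexicographic order puts children before their parent directory.
            paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path p : paths) {
            Files.delete(p);
        }
        return true;
    }

    public static void main(String[] args) throws IOException {
        Path out = Files.createTempDirectory("out");
        Files.writeString(out.resolve("part-r-00000"), "old results"); // stale output from an earlier run
        System.out.println(deleteRecursively(out)); // true
        System.out.println(Files.exists(out));      // false
        System.out.println(deleteRecursively(out)); // false
    }
}
```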
  • 6.     // Configure the Job's map task and reduce task by calling JobUtil.setupMapTask() and JobUtil.setupReduceTask().
           // General form:
           // JobUtil.setupMapTask(job, mapperCls, outputKeyCls, outputValueCls, inputFormatCls, commaSeparatedPathsIn);
           // JobUtil.setupReduceTask(job, reducerCls, outputKeyCls, outputValueCls, outputFormatCls, pathOut);
           JobUtil.setupMapTask(job, MyMapper.class, LongWritable.class, NStudent.class,
                   SequenceFileInputFormat.class, "/Training/testbed/zj/in/Student.txt");
           JobUtil.setupReduceTask(job, MyReducer.class, LongWritable.class, NStudent.class,
                   SequenceFileOutputFormat.class, "/Training/testbed/zj/out");
           job.waitForCompletion(true); // submit the Job to the cluster and wait for it to finish (true = print progress)
       }
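  • 7. Step 4: Write the Mapper and Reducer classes.
       (1) Extend the Mapper class and override map(). The type parameters are required: with the raw type Mapper, this map() would not override the inherited one and would never be called.
       public static class MyMapper extends Mapper<LongWritable, Writable, LongWritable, NStudent> {
           @Override
           public void map(LongWritable key, Writable value, Context context)
                   throws IOException, InterruptedException {
               // Pass through only the records whose value is an NStudent
               if (value instanceof NStudent) {
                   context.write(key, (NStudent) value);
               }
           }
       }
       Note: write the body of map() (highlighted in red on the original slide) to suit your own data processing; this step turns each input pair <K, V> into intermediate pairs <K', V'>.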
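Hadoop's Mapper is generic, and its run() calls map() through the type parameters. A subclass that extends the raw type therefore only overloads map(), and the inherited pass-through map() runs instead. The JDK-only sketch below reproduces the effect. `BaseMapper` is a simplified, hypothetical stand-in with two type parameters and no Context, and `RawMapper`/`TypedMapper` are illustrative names. Putting `@Override` on `RawMapper.map` would be a compile error, and that is exactly the check that catches this bug.

```java
// Demo driver (first class, so the single-file launcher finds main).
class RawVsTypedDemo {
    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        RawMapper raw = new RawMapper();
        raw.run(1L, "x");                 // framework-style call through the generic signature
        TypedMapper typed = new TypedMapper();
        typed.run(1L, "x");
        System.out.println(raw.lastCall + " / " + typed.lastCall); // default map / custom map
    }
}

// Simplified, hypothetical stand-in for Hadoop's Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.
class BaseMapper<KIN, VIN> {
    String lastCall = "none";

    // Default map(); Hadoop's own default simply passes the pair through.
    void map(KIN key, VIN value) {
        lastCall = "default map";
    }

    // The framework drives map() polymorphically, much as Mapper.run() does.
    void run(KIN key, VIN value) {
        map(key, value);
    }
}

// Raw supertype: the inherited method is map(Object, Object), so this is only an overload.
@SuppressWarnings("rawtypes")
class RawMapper extends BaseMapper {
    void map(Long key, String value) {
        lastCall = "custom map";
    }
}

// Parameterized supertype: a true override, verified by @Override.
class TypedMapper extends BaseMapper<Long, String> {
    @Override
    void map(Long key, String value) {
        lastCall = "custom map";
    }
}
```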
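  • 8. Between the two phases, the framework automatically groups the <K', V'> pairs by key into <K', list(V')>.
       (2) Likewise, extend the Reducer class and override reduce().
       public static class MyReducer extends Reducer<LongWritable, NStudent, LongWritable, NStudent> {
           @Override
           public void reduce(LongWritable key, Iterable<NStudent> values, Context context)
                   throws IOException, InterruptedException {
               // Write every grouped value back out unchanged
               for (NStudent value : values) {
                   context.write(key, value);
               }
           }
       }
       Note: write the body of reduce() (highlighted in red on the original slide) to suit your own data processing; this step turns each <K', list(V')> into output pairs <K'', V''>.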
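The data flow described in Steps 4 (1) and (2) is <K, V> → map → <K', V'> → framework grouping → <K', list(V')> → reduce → <K'', V''>. The in-memory sketch below walks through that flow in plain JDK code. It uses a word count chosen purely for illustration rather than the slides' NStudent records, and a TreeMap mimics Hadoop's sorting of keys before the reduce phase. `LocalMapReduce` and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// In-memory simulation of the MapReduce data flow (word count); not Hadoop code.
class LocalMapReduce {
    // Map: one input <K, V> (a position key and a line of text) yields zero or more <K', V'> pairs.
    static List<Map.Entry<String, Integer>> map(long position, String line) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String word : line.trim().split("\\s+")) {
            if (!word.isEmpty()) {
                out.add(Map.entry(word, 1));
            }
        }
        return out;
    }

    // Shuffle/sort: group <K', V'> into <K', list(V')>, ordered by key.
    static TreeMap<String, List<Integer>> shuffle(List<Map.Entry<String, Integer>> pairs) {
        TreeMap<String, List<Integer>> groups = new TreeMap<>();
        for (Map.Entry<String, Integer> p : pairs) {
            groups.computeIfAbsent(p.getKey(), k -> new ArrayList<>()).add(p.getValue());
        }
        return groups;
    }

    // Reduce: each <K', list(V')> yields one output <K'', V''> (here: the word and its total count).
    static int reduce(String key, Iterable<Integer> values) {
        int sum = 0;
        for (int v : values) {
            sum += v;
        }
        return sum;
    }

    static Map<String, Integer> run(List<String> lines) {
        List<Map.Entry<String, Integer>> intermediate = new ArrayList<>();
        for (int i = 0; i < lines.size(); i++) {
            intermediate.addAll(map(i, lines.get(i))); // line index used as the input key
        }
        Map<String, Integer> result = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> g : shuffle(intermediate).entrySet()) {
            result.put(g.getKey(), reduce(g.getKey(), g.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("a b a", "b c"))); // {a=2, b=2, c=1}
    }
}
```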
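  • 9. Step 5: Deploy and run the program from the main method.
       public static void main(String[] args) throws Exception {
           deployProject();               // deploy (publish) the project
           new MySecondJob().execute();   // run the job; the no-arg constructor supplies the default configuration
       }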