Hadoop与MySQL数据库的那点事

htwoz 8年前

来自: http://www.cnblogs.com/JimLy-BUG/p/5177952.html

转眼间已经接触Hadoop两周了,从之前的极力排斥到如今的有点喜欢。刚开始,搭建Hadoop开发环境让我几乎要放弃,如今已经学会了编写小程序。每天都成长一点挺好的,好好努力,为自己的装备库再添一件武器。学习在于坚持不懈,加油!!!

马上就要过年了,在最后一天的上班时间内完成了用Hadoop连接MySQL数据库,自己感到很满足。下面就把自己编写的源码贡献给大家,希望能够帮到你们;如存在可以优化的地方,还请大牛们指出,也希望有Hadoop的大牛能够给点学习建议——这是一个Hadoop初学者的心声。第一次发布竟然被退回,也不知道为什么,瞬间心情都不好了,但我还是坚持写自己的博客……

StudentRecord类:

package com.simope.mr.db;    import java.io.DataInput;  import java.io.DataOutput;  import java.io.IOException;  import java.sql.PreparedStatement;  import java.sql.ResultSet;  import java.sql.SQLException;    import org.apache.hadoop.io.Text;  import org.apache.hadoop.io.Writable;  import org.apache.hadoop.mapred.lib.db.DBWritable;    public class StudentRecord implements Writable, DBWritable{            int id;            String name;            int age;            int departmentID;        @Override      public void readFields(DataInput in) throws IOException {          this.id = in.readInt();          this.name = Text.readString(in);          this.age = in.readInt();          this.departmentID = in.readInt();      }        @Override      public void write(DataOutput out) throws IOException {          out.write(this.id);          Text.writeString(out, this.name);          out.write(this.age);          out.write(this.departmentID);      }                  public void readFields(ResultSet rs) throws SQLException {          this.id = rs.getInt(1);          this.name = rs.getString(2);          this.age = rs.getInt(3);          this.departmentID = rs.getInt(4);      }            public void write(PreparedStatement ps) throws SQLException {          ps.setInt(1, this.id);          ps.setString(2, this.name);          ps.setInt(3, this.age);          ps.setInt(4, this.departmentID);                }            @Override      public String toString() {          return new String(this.name + "\t" + this.age + "\t" + this.departmentID);      }  }

TeacherRecord类:

package com.simope.mr.db;    import java.io.DataInput;  import java.io.DataOutput;  import java.io.IOException;  import java.sql.PreparedStatement;  import java.sql.ResultSet;  import java.sql.SQLException;    import org.apache.hadoop.io.Text;  import org.apache.hadoop.io.Writable;  import org.apache.hadoop.mapred.lib.db.DBWritable;    public class TeacherRecord implements Writable, DBWritable{            int id;            String name;            int age;            int departmentID;        @Override      public void readFields(DataInput in) throws IOException {          this.id = in.readInt();          this.name = Text.readString(in);          this.age = in.readInt();          this.departmentID = in.readInt();      }        @Override      public void write(DataOutput out) throws IOException {          out.write(this.id);          Text.writeString(out, this.name);          out.write(this.age);          out.write(this.departmentID);      }                  public void readFields(ResultSet rs) throws SQLException {          this.id = rs.getInt(1);          this.name = rs.getString(2);          this.age = rs.getInt(3);          this.departmentID = rs.getInt(4);      }            public void write(PreparedStatement ps) throws SQLException {          ps.setInt(1, this.id);          ps.setString(2, this.name);          ps.setInt(3, this.age);          ps.setInt(4, this.departmentID);                }            @Override      public String toString() {          return new String(this.name + "\t" + this.age + "\t" + this.departmentID);      }  }

DBMapper类:

package com.simope.mr.db;    import java.io.IOException;    import org.apache.hadoop.io.LongWritable;  import org.apache.hadoop.io.Text;  import org.apache.hadoop.mapred.MapReduceBase;  import org.apache.hadoop.mapred.Mapper;  import org.apache.hadoop.mapred.OutputCollector;  import org.apache.hadoop.mapred.Reporter;    public class DBMapper extends MapReduceBase implements          Mapper<LongWritable, TeacherRecord, LongWritable, Text> {        public void map(LongWritable key, TeacherRecord value,              OutputCollector<LongWritable, Text> collector, Reporter reporter)              throws IOException {            collector.collect(new LongWritable(value.id),                  new Text(value.toString()));        }  }

DBReducer类:

package com.simope.mr.db;    import java.io.IOException;  import java.util.Iterator;    import org.apache.hadoop.io.LongWritable;  import org.apache.hadoop.io.Text;  import org.apache.hadoop.mapred.MapReduceBase;  import org.apache.hadoop.mapred.OutputCollector;  import org.apache.hadoop.mapred.Reducer;  import org.apache.hadoop.mapred.Reporter;    public class DBReducer extends MapReduceBase implements Reducer<LongWritable, Text, StudentRecord, Text>{        @Override      public void reduce(LongWritable key, Iterator<Text> values,              OutputCollector<StudentRecord, Text> output, Reporter reporter)              throws IOException {           String[] InfoArr = values.next().toString().split("\t");             StudentRecord s = new StudentRecord();    //         t.id = Integer.parseInt(InfoArr[0]);  //id是自增长           s.name = InfoArr[0];             s.age = Integer.parseInt(InfoArr[1]);             s.departmentID = Integer.parseInt(InfoArr[2]);             output.collect(s, new Text(s.name));        }    }

DBJob类:(读取数据库表内容,并将数据写入HDFS文件中)=> 数据库表 → HDFS文件

package com.simope.mr.db;    import java.io.IOException;    import org.apache.hadoop.fs.Path;  import org.apache.hadoop.io.LongWritable;  import org.apache.hadoop.io.Text;  import org.apache.hadoop.mapred.FileOutputFormat;  import org.apache.hadoop.mapred.JobClient;  import org.apache.hadoop.mapred.JobConf;  import org.apache.hadoop.mapred.lib.IdentityReducer;  import org.apache.hadoop.mapred.lib.db.DBConfiguration;  import org.apache.hadoop.mapred.lib.db.DBInputFormat;      /**   * @deprecated 读取数据库录入文件   * @author JimLy   * @see 20160202   * */  public class DBJob {        public static void main(String[] args) throws IOException{                    JobConf jobConf = new JobConf(DBJob.class);            jobConf.setOutputKeyClass(LongWritable.class);          jobConf.setOutputValueClass(Text.class);          jobConf.setInputFormat(DBInputFormat.class);                    FileOutputFormat.setOutputPath(jobConf, new Path("/usr/output/db"));                    DBConfiguration.configureDB(jobConf, "com.mysql.jdbc.Driver", "jdbc:mysql://10.10.1.1:3306/my_hd", "root", "root");                    String[] fields = {"id", "name", "age", "departmentID"};                    //从my_hd数据库的teacher表查询数据          DBInputFormat.setInput(jobConf, TeacherRecord.class, "teacher", null, "id", fields);                    jobConf.setMapperClass(DBMapper.class);          jobConf.setReducerClass(IdentityReducer.class);                    JobClient.runJob(jobConf);      }        }

DB2Job类:(通过自定义SQL查询读取数据库表内容,并将数据写入HDFS文件中)=> 数据库表 → HDFS文件

package com.simope.mr.db;    import java.io.IOException;    import org.apache.hadoop.fs.Path;  import org.apache.hadoop.io.LongWritable;  import org.apache.hadoop.io.Text;  import org.apache.hadoop.mapred.FileOutputFormat;  import org.apache.hadoop.mapred.JobClient;  import org.apache.hadoop.mapred.JobConf;  import org.apache.hadoop.mapred.lib.IdentityReducer;  import org.apache.hadoop.mapred.lib.db.DBConfiguration;  import org.apache.hadoop.mapred.lib.db.DBInputFormat;      /**   * @deprecated 读取数据库录入文件   * @author JimLy   * @see 20160202   * */  public class DB2Job {        public static void main(String[] args) throws IOException{                    JobConf jobConf = new JobConf(DB2Job.class);            jobConf.setOutputKeyClass(LongWritable.class);          jobConf.setOutputValueClass(Text.class);          jobConf.setInputFormat(DBInputFormat.class);                    FileOutputFormat.setOutputPath(jobConf, new Path("/usr/output/db"));                    DBConfiguration.configureDB(jobConf, "com.mysql.jdbc.Driver", "jdbc:mysql://10.10.1.1:3306/my_hd", "root", "root");            //        String[] fields = {"id", "name", "age", "departmentID"};                    String inputQuery = "SELECT * FROM teacher where id != 4";          String inputCountQuery = "SELECT COUNT(1) FROM teacher where id != 4";                    //从my_hd数据库的teacher表查询数据          DBInputFormat.setInput(jobConf, TeacherRecord.class, inputQuery, inputCountQuery);                    jobConf.setMapperClass(DBMapper.class);          jobConf.setReducerClass(IdentityReducer.class);                    JobClient.runJob(jobConf);      }        }

DB3Job类:(读取HDFS文件中的内容,并将数据写入指定的数据库表中)=> HDFS文件 → 数据库表

package com.simope.mr.db;    import java.io.IOException;    import org.apache.hadoop.fs.Path;  import org.apache.hadoop.mapred.FileInputFormat;  import org.apache.hadoop.mapred.JobClient;  import org.apache.hadoop.mapred.JobConf;  import org.apache.hadoop.mapred.TextInputFormat;  import org.apache.hadoop.mapred.lib.IdentityMapper;  import org.apache.hadoop.mapred.lib.db.DBConfiguration;  import org.apache.hadoop.mapred.lib.db.DBOutputFormat;      /**   * @deprecated 读取文件录入数据库   * @author JimLy   * @see 20160202   * */  public class DB3Job {        public static void main(String[] args) throws IOException{                    JobConf jobConf = new JobConf(DB3Job.class);            jobConf.setInputFormat(TextInputFormat.class);          jobConf.setOutputFormat(DBOutputFormat.class);                    FileInputFormat.addInputPath(jobConf, new Path("/usr/input/db"));                    DBConfiguration.configureDB(jobConf, "com.mysql.jdbc.Driver", "jdbc:mysql://10.10.1.1:3306/my_hd", "root", "root");                    String[] fields = {"id", "name", "age", "departmentID"};                    DBOutputFormat.setOutput(jobConf, "teacher", fields);                    jobConf.setMapperClass(IdentityMapper.class);          jobConf.setReducerClass(DBReducer.class);                    JobClient.runJob(jobConf);      }        }

DB4Job类:(读取指定的数据库表信息,并将数据写入其他指定表中)=> 数据库表 → 数据库表

package com.simope.mr.db;    import java.io.IOException;    import org.apache.hadoop.io.LongWritable;  import org.apache.hadoop.io.Text;  import org.apache.hadoop.mapred.JobClient;  import org.apache.hadoop.mapred.JobConf;  import org.apache.hadoop.mapred.lib.db.DBConfiguration;  import org.apache.hadoop.mapred.lib.db.DBInputFormat;  import org.apache.hadoop.mapred.lib.db.DBOutputFormat;      /**   * @deprecated 读取数据库表录入其他表   * @author JimLy   * @see 20160202   * */  public class DB4Job {        public static void main(String[] args) throws IOException{                    JobConf jobConf = new JobConf(DB4Job.class);            jobConf.setOutputKeyClass(LongWritable.class);          jobConf.setOutputValueClass(Text.class);          jobConf.setInputFormat(DBInputFormat.class);          jobConf.setOutputFormat(DBOutputFormat.class);                    DBConfiguration.configureDB(jobConf, "com.mysql.jdbc.Driver", "jdbc:mysql://10.10.1.1:3306/my_hd", "root", "root");                    String inputQuery = "SELECT * FROM teacher";          String inputCountQuery = "SELECT COUNT(1) FROM teacher";                    //从my_hd数据库的teacher表查询数据          DBInputFormat.setInput(jobConf, TeacherRecord.class, inputQuery, inputCountQuery);                    String[] fields = {"id", "name", "age", "departmentID"};                    DBOutputFormat.setOutput(jobConf, "student", fields);                    jobConf.setMapperClass(DBMapper.class);          jobConf.setReducerClass(DBReducer.class);                    JobClient.runJob(jobConf);      }        }

如果你觉得写得不错,请点个推荐,你的推荐是我继续坚持写博客的动力……

如需转载,请注明出处:http://www.cnblogs.com/JimLy-BUG/