Hadoop中DBInputFormat和DBOutputFormat使用

一、背景

     为了方便MapReduce直接访问关系型数据库(Mysql,Oracle),Hadoop提供了DBInputFormat和DBOutputFormat两个类。通过

DBInputFormat类把数据库表数据读入到HDFS,根据DBOutputFormat类把MapReduce产生的结果集导入到数据库表中。

二、技术细节

1、DBInputFormat(Mysql为例),先创建表:

CREATE TABLE studentinfo (
  id INTEGER NOT NULL PRIMARY KEY,
  name VARCHAR(32) NOT NULL);
2、由于0.20版本对DBInputFormat和DBOutputFormat支持不是很好,该例用了0.19版本来说明这两个类的用法。
3、DBInputFormat用法如下:
public class DBInput {
   // DROP TABLE IF EXISTS `hadoop`.`studentinfo`;
   // CREATE TABLE studentinfo (
   // id INTEGER NOT NULL PRIMARY KEY,
   // name VARCHAR(32) NOT NULL);

   public static class StudentinfoRecord implements Writable, DBWritable {
     int id;
     String name;
     public StudentinfoRecord() {

     }
     public void readFields(DataInput in) throws IOException {
        this.id = in.readInt();
        this.name = Text.readString(in);
     }
     public void write(DataOutput out) throws IOException {
        out.writeInt(this.id);
        Text.writeString(out, this.name);
     }
     public void readFields(ResultSet result) throws SQLException {
        this.id = result.getInt(1);
        this.name = result.getString(2);
     }
     public void write(PreparedStatement stmt) throws SQLException {
        stmt.setInt(1, this.id);
        stmt.setString(2, this.name);
     }
     public String toString() {
        return new String(this.id + " " + this.name);
     }
   }
   public class DBInputMapper extends MapReduceBase implements
        Mapper<LongWritable, StudentinfoRecord, LongWritable, Text> {
     public void map(LongWritable key, StudentinfoRecord value,
          OutputCollector<LongWritable, Text> collector, Reporter reporter)
          throws IOException {
        collector.collect(new LongWritable(value.id), new Text(value
             .toString()));
     }
   }
   public static void main(String[] args) throws IOException {
     JobConf conf = new JobConf(DBInput.class);
     DistributedCache.addFileToClassPath(new Path(
          "/lib/mysql-connector-java-5.1.0-bin.jar"), conf);
     
     conf.setMapperClass(DBInputMapper.class);
     conf.setReducerClass(IdentityReducer.class);

     conf.setMapOutputKeyClass(LongWritable.class);
     conf.setMapOutputValueClass(Text.class);
     conf.setOutputKeyClass(LongWritable.class);
     conf.setOutputValueClass(Text.class);
     
     conf.setInputFormat(DBInputFormat.class);
     FileOutputFormat.setOutputPath(conf, new Path("/hua01"));
     DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
          "jdbc:mysql://192.168.3.244:3306/hadoop", "hua", "hadoop");
     String[] fields = { "id", "name" };
     DBInputFormat.setInput(conf, StudentinfoRecord.class, "studentinfo",
 null, "id", fields);

     JobClient.runJob(conf);
   }
}


a)StudnetinfoRecord类的变量为表字段,实现Writable和DBWritable两个接口。

实现Writable的方法:

 public void readFields(DataInput in) throws IOException {
        this.id = in.readInt();
        this.name = Text.readString(in);
     }
     public void write(DataOutput out) throws IOException {
        out.writeInt(this.id);
        Text.writeString(out, this.name);
     }

实现DBWritable的方法:

public void readFields(ResultSet result) throws SQLException {
        this.id = result.getInt(1);
        this.name = result.getString(2);
     }
     public void write(PreparedStatement stmt) throws SQLException {
        stmt.setInt(1, this.id);
        stmt.setString(2, this.name);
     }

b)读入Mapper的value类型是StudnetinfoRecord。

c)配置如何连入数据库,读出表studentinfo数据。

DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
          "jdbc:mysql://192.168.3.244:3306/hadoop", "hua", "hadoop");
     String[] fields = { "id", "name" };
     DBInputFormat.setInput(conf, StudentinfoRecord.class, "studentinfo",  null, "id", fields);


4、DBOutputFormat用法如下:
public class DBOutput {

   public static class StudentinfoRecord implements Writable,  DBWritable {
     int id;
     String name;
     public StudentinfoRecord() {

     }
     public void readFields(DataInput in) throws IOException {
        this.id = in.readInt();
        this.name = Text.readString(in);
     }
     public void write(DataOutput out) throws IOException {
        out.writeInt(this.id);
        Text.writeString(out, this.name);
     }
     public void readFields(ResultSet result) throws SQLException {
        this.id = result.getInt(1);
        this.name = result.getString(2);
     }
     public void write(PreparedStatement stmt) throws SQLException {
        stmt.setInt(1, this.id);
        stmt.setString(2, this.name);
     }
     public String toString() {
        return new String(this.id + " " + this.name);
     }
   }
   
   public static class MyReducer extends MapReduceBase implements
        Reducer<LongWritable, Text, StudentinfoRecord, Text> {
     public void reduce(LongWritable key, Iterator<Text> values,
          OutputCollector<StudentinfoRecord, Text> output, Reporter  reporter)
          throws IOException {
        String[] splits = values.next().toString().split("/t");
        StudentinfoRecord r = new StudentinfoRecord();
        r.id = Integer.parseInt(splits[0]);
        r.name = splits[1];
        output.collect(r, new Text(r.name));
     }
   }

   public static void main(String[] args) throws IOException {
     JobConf conf = new JobConf(DBOutput.class);
     conf.setInputFormat(TextInputFormat.class);
     conf.setOutputFormat(DBOutputFormat.class);

     FileInputFormat.setInputPaths(conf, new Path("/hua/hua.bcp"));
     DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
          "jdbc:mysql://192.168.3.244:3306/hadoop", "hua", "hadoop");
     DBOutputFormat.setOutput(conf, "studentinfo", "id", "name");

  conf.setMapperClass(org.apache.hadoop.mapred.lib.IdentityMapper.class);
     conf.setReducerClass(MyReducer.class);

     JobClient.runJob(conf);
   }

}

a)StudnetinfoRecord类的变量为表字段,实现Writable和DBWritable两个接口,同.DBInputFormat的StudnetinfoRecord类。

b)输出Reducer的key/value类型是StudnetinfoRecord。

c)配置如何连入数据库,输出结果到表studentinfo。

DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
          "jdbc:mysql://192.168.3.244:3306/hadoop", "hua", "hadoop");
     DBOutputFormat.setOutput(conf, "studentinfo", "id", "name");

三、总结

      运行MapReduce时候报错:java.io.IOException: com.mysql.jdbc.Driver,一般是由于程序找不到mysql驱动包。解决方法是让每个

tasktracker运行MapReduce程序时都可以找到该驱动包。

添加包有两种方式:

1.在每个节点下的${HADOOP_HOME}/lib下添加该包。重启集群,一般是比较原始的方法。

2.a)把包传到集群上: hadoop fs -put mysql-connector-java-5.1.0- bin.jar /lib

b)在mr程序提交job前,添加语句:istributedCache.addFileToClassPath(new Path("/lib/mysql- connector-java- 5.1.0-bin.jar"), conf);

3、虽然API用的是0.19的,但是使用0.20的API一样可用,只是会提示方法已过时而已。

  • 1
    点赞
  • 12
    收藏
    觉得还不错? 一键收藏
  • 2
    评论
Hadoop是一个分布式计算框架,它可以将大规模数据分散存储在集群的多个节点上,并通过MapReduce算法进行分布式计算和处理。 以下是搭建和使用Hadoop集群的步骤: 1. 下载和安装Hadoop。你可以从官方网站(http://hadoop.apache.org/)下载最新的Hadoop版本,并按照官方文档进行安装。 2. 配置Hadoop集群。Hadoop需要在集群的每个节点上运行。在每个节点上,你需要编辑Hadoop配置文件(core-site.xml、hdfs-site.xml、mapred-site.xml、yarn-site.xml),并将它们放在Hadoop的conf目录下。这些配置文件包括Hadoop集群的节点列表、存储路径、MapReduce任务的配置参数等等。 3. 启动Hadoop集群。在集群的每个节点上,你需要启动Hadoop的各个服务,如NameNode、DataNode、ResourceManager、NodeManager等。你可以使用start-all.sh脚本来启动所有服务,也可以使用单独的命令来启动每个服务。 4. 测试Hadoop集群。你可以使用hadoop fs命令来测试Hadoop集群。例如,你可以使用hadoop fs -ls /命令列出Hadoop集群根目录下的文件和目录。 5. 编写和运行MapReduce任务。你可以使用Hadoop提供的API或者编写MapReduce程序来执行分布式计算任务。你需要将MapReduce程序打包成jar文件,并使用hadoop jar命令来提交任务到Hadoop集群运行。 总之,Hadoop集群的搭建和使用需要一定的技术和经验,但是它可以为大规模数据处理提供高效、可靠、可扩展的解决方案。

“相关推荐”对你有帮助么?

  • 非常没帮助
  • 没帮助
  • 一般
  • 有帮助
  • 非常有帮助
提交
评论 2
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值