Sqoop Developer’s Guide v1.4.6 (Sqoop开发者指南,中文版)

jopen 8年前

1.介绍

如果你是一个开发者或者应用程序员,想要修改Sqoop或者使用Sqoop内部API构建一个扩展,你应该阅读本文档。以下章节描述了每个API的目的,哪里用到了内部API,实现其他数据库的支持需要哪些API。

2.支持的发行版

本文档适用于Sqoop v1.4.6。

3.Sqoop发行版

Apache Sqoop是Apache Software Foundation的一款开源软件产品。Sqoop的开发位于:http://sqoop.apache.org/。在这个网址上,你可以获取:

  • Sqoop的新发行版以及最新的源码
  • 问题跟踪器(issue tracker)
  • 包含Sqoop文档的wiki

4.前提

Sqoop需要以下前置知识:

  • Java开发
    • 熟悉JDBC
    • 熟悉Hadoop的API(包括新的0.20+的MapReduce API)
    </li>
  • 关系数据库管理系统和SQL
  • </ul>

    本文档假定你使用Linux或类Linux环境。如果你使用Windows,你可以使用cygwin完成大部分任务。如果你使用Mac OS X,你应该会看到很少(如果有的话)兼容性错误。Sqoop主要在Linux上操作和测试。

    5.编译Sqoop源码

    你可以使用以下命令获取Sqoop源码:git clone https://git-wip-us.apache.org/repos/asf/sqoop.git

    Sqoop源码存在一个git仓库里。从仓库检出源码的说明:TODO。编译说明在COMPILING.txt文件中。

    6. 开发者API reference

    本节介绍为集成Sqoop或者修改Sqoop的应用开发者提供的API

    下面三个小节按照以下用例编写:

    • 使用Sqoop及其公开库生成的类
    • 编写Sqoop扩展(即,新的ConnManager实现以支持与更多的数据库交互)
    • 修改Sqoop的内核

    每一小节都会比上一小节更深入的描述。

    6.1. 外部API

    Sqoop自动生成要导入HDFSHadoop Distributed File System)的表对应的类。被导入表的每一列对应类中的一个成员变量;该类的一个实例对应表的一行。生成的类实现Hadoop的序列化API,即WritableDBWritable接口。也包含下列其他便于使用的方法:

    • parse()方法,用于解释分隔文本字段
    • toString()方法,用于保留用户选择的分隔符

    自动生成类必须实现的方法集在抽象类com.cloudera.sqoop.lib.SqoopRecord中声明。

    SqoopRecord实例可能依赖于Sqooppublic API。也就是com.cloudera.sqoop.lib包里的所有类。下面会简要介绍。虽然Sqoop生成的类会依赖这些类,但是Sqoop客户端不应该直接与这些类交互。因此,这些API被认为是公开的,而且改进它们的时候需要小心。

    • 通过使用可控的分隔符和引号字符,RecordParser类会将一行文本解析成一个字段清单。
    • 静态类FieldFormatter提供了一个处理引号和转义字符的方法,这个方法会在SqoopRecord.toString()的实现里用到。
    • ResultSetPreparedStatement对象和SqoopRecords的编组数据由JdbcWritableBridge实现。
    • BigDecimalSerializer包含一对方法,用于实现BigDecimal对象的序列号。

    Public API的完整描述参见Sqoop开发wikiSIP-4

    6.2. 扩展API

    本节介绍用于Sqoop扩展的API和基础类,由此Sqoop可以兼容更多的数据库提供商。

    尽管Sqoop使用JDBCDataDrivenDBInputFormat读数据库,但是不同数据库厂商以及JDBC元数据支持的SQL使得我们必须为不同数据库编写不同的代码。Sqoop的解决方法是引入ConnManager APIcom.cloudera.sqoop.manager.ConnManager)。

    ConnManager是一个抽象类,定义了与数据库交互的所有方法。ConnManager的大多数实现会继承自com.cloudera.sqoop.manager.SqlManager抽象类(而非直接继承ConnManager),SqlManager类使用标准SQL实现多数操作。子类必须实现getConnection()方法,该方法返回与数据库的实际的JDBC连接。子类也可以覆盖任何其他方法。SqlManager类暴露了一个受保护的API,允许程序员选择性的覆盖。(译者注:应该类似于模板方法设计模式)例如,getColNamesQuery()方法的存在,使得可以在不重写getColNames()方法的大部分代码的情况下,修改getColNames()方法使用的SQL查询。

    ConnManager的实现从SqoopOptions类获取很多配置。SqoopOptions类是可变的。SqoopOptions不保存具体的配置。而是持有一个Configuration的引用,这个引用是由Tool.getConf()在GenericOptionsParser解析命令行参数后返回的。允许使用扩展参数“-D any.specific.param=any.vaule”,而且不需要SqoopOptions的选项解析或者修改。这个Configuration形成了传给任意工作流调用的MapReduce Job的Configuration的基础,因此用户可以在命令行中设置任何需要的定制Hadoop状态。

    所有已有的ConnManager实现都是无状态的。因此,实例化ConnManagers的系统在Sqoop的生命周期中可能实现同一个ConnManager的多个实例。目前认为实例化一个ConnManager是一个轻量级的操作,而且发生频率相对较低。因此,ConnManagers在各个操作之间并没有缓存。

    ConnManagers目前是由抽象类ManagerFactory(参见:http://issues.apache.org/jira/browse/MAPREDUCE-750)的实例来创建的。ManagerFactory的一个实现目前支持Sqoop的所有情况:com.cloudera.sqoop.manager.DefaultManagerFactory。扩展不应该修改DefaultManagerFactory。相反,一个ManagerFactory的扩展实现应该提供新的ConnManager。ManagerFactory只有一个方法,accept()。这个方法判断是否可以给用户的SqoopOptions实例化一个ConnManager。若可以,则返回ConnManager实例,否则,返回null。

    采用哪个ManagerFactory实现由sqoop-site.xml配置文件中的sqoop.connection.factories配置项指定的。扩展库用户可以安装包含一个新ManagerFactory和ConnManager(s)的第三方库,并配置sqoop-site.xml来使用这个新的ManagerFactory。DefaultManagerFactory主要是通过解析存储在SqoopOptions中的连接字符串来区分数据库。

    扩展提供者可能利用com.cloudera.sqoop.io,mapreduce和util包里面的类,来辅助实现。这些包和类在下面的章节中详细介绍。

    6.2.1. HBase序列化扩展

     Sqoop支持从数据库导入到Hbase。当向HBase复制数据时,必须将其转换成HBase支持的格式。特别是:

    • 数据必须放置于HBase的一个(或多个)表中。
    • 输入数据的列必须放置于一个列族(column family)中。
    • 值必须序列化为字节数组,以放入cells中。

    所有这些都是通过HBase客户端API的Put语句完成的。Sqoop与HBase的交互在com.cloudera.sqoop.hbase包里实现。记录从数据库中反序列化,并从mapper发出。OutputFormat负责把结果插入HBase。这是通过PutTransformer接口完成的。PutTransformer有一个getPutCommand()方法,输入一个Map<String,Object>代表数据集的字段。返回一个List<Put>描述如何把cells插入HBase。默认的PutTransformer实现是ToStringPutTransformer,它使用每个字段的基于字符串的表示,来将字段序列化进HBase。

    你可以通过实现PutTransformer接口来覆盖这个实现,并把它加入到classpath用于map任务(例如,使用-libjars选项)。要让Sqoop使用你的实现,需要设置sqoop.hbase.insert.put.transformer.class属性来用-D标识你的类。

    在你的PutTransformer实现中,row key column和column family可以通过getRowKeyColumn()和getColumnFamily()方法获取。你可以自行添加额外的Put操作;例如,注入额外的行表示一个二级索引。Sqoop会对--hbase-table指定的表执行所有Put操作。

    6.3. Sqoop内核

    本节描述Sqoop的内部架构。

    Sqoop是由com.cloudera.sqoop.Sqoop主类驱动。同一个包里还有另外几个类;SqoopOptions(前面介绍过了)和ConnFactory(操作ManagerFactory实例)。

    6.3.1. 一般程序流程

     一般程序流程如下:

    com.cloudera.sqoop.Sqoop是主类,并且实现了Tool。新实例由ToolRunner启动。Sqoop的第一个参数是标识要运行的SqoopTool的名字的字符串。这个SqoopTool来执行用于请求的操作(例如,import,export,codegen,等等)。

    SqoopTool API在SIP-1中完整描述。

    选中的SqoopTool会解析参数中剩余部分,把相应的字段设置到SqoopOptions类中。接下来执行其方法体。

    在SqoopTool的run()方法里,import、export或者其他操作会被执行。紧接着,基于SqoopOptions中的数据实例化一个ConnManager实例。ConnFactory用于从ManagerFactory获取ConnManager;前面的章节已经描述过这个机制。导入、导出和其他大量数据的移动任务通常是执行一个MapReduce job在表上进行并行、可靠的操作。导入(import)操作不必通过MapReduce job执行;ConnManager.importTable()方法用于判断怎样执行import最好。每一个主要的操作实际上都是由ConnManager控制的,除了代码生成,它是由CompilationManager和ClassWriter完成的。(二者都在com.cloudera.sqoop.orm包中。)导入Hive的操作,在importTable()完成后,由com.cloudera.sqoop.hive.HiveImport接管。与使用的ConnManager实现无关。

    ConnManager的importTable()方法接收一个ImportJobContext类型的参数,它包含了该方法的参数。这个类将来可能会扩展更多的参数,进一步管理import操作。类似的,exportTable()方法接收一个ExportJobContext类型的参数。这些类包含要导入或导出的表的名称,一个SqoopOptions对象的引用,和其他相关数据。

    6.3.2 子包

    com.cloudera.sqoop包下面包含以下子包:

    • hive - 用于导入数据到Hive。
    • io - java.io.*接口(即,OutputStream和Writer)的实现。
    • lib - 外部公开API(前面介绍过了)。
    • manager - ConnManager和ManagerFactory接口和它们的实现。
    • mapreduce - 与new MapReduce API(0.20+)的连接类。
    • orm - 代码自动生成。
    • tool - SqoopTool的实现类。
    • util - 工具类

    io包中包含OutputStream和BufferedWriter实现类,用于HDFS的直接写入器(direct writers)。SplittableBufferedWriter允许对一个client打开一个BufferedWriter,当达到了目标的阈值,这个client会连续的写入多个文件。这允许不可分割的压缩库(例如,gzip)与Sqoop import一起使用,同时仍然允许随后的MapReduce作业使用每个数据集的多个输入分片(splits)。大对象文件存储(参见SIP-3)系统的代码也位于io包里。

    mapreduce包中包含与Hadoop MapReduce直接交互的代码。这个包的内容在下一节中详细描述。

    orm包中包含类生成的代码。它依赖于JDK的tools.jar,该jar包里提供了com.sun.tools.javac包。

    util包中包含Sqoop使用的各种工具:

    • ClassLoaderStack管理一组当前线程使用的ClassLoader实例。主要用于,在local(standalone)模式下运行MapReduce时,把自动生成的代码加载到当前线程。
    • DirectImportUtils包含HDFS直接写入器(direct HDFS writer)的便捷方法。
    • Executor启动外部进程,并把它们连接到AsyncSink(请看下面的详细描述)生成的流处理器。
    • ExportException在导出失败时,ConnManagers抛出该异常。
    • ImportException在导入失败时,ConnManagers抛出该异常。
    • JdbcUrl处理连接字符串的解析,连接字符串是类URL(URL-like)的,但并不完全一致。(特别是,JDBC连接字符串可能有multi:part:scheme://组件。)
    • PerfCounters用于估算传输率,展示给用户。
    • ResultSetPrinter可以优化打印ResultSet。

    有几个地方,Sqoop从外部进程读取stdout。最直接的例子是LocalMySQLManager和DirectPostgresqlManager实现的direct-mode导入。在Runtime.exec()产生一个进程后,必须处理进程的stdout(Process.getInputStream())和可能的sdterr(Process.getErrorStream())。若从这两个流中读取足够的数据失败,外部进程写入更多数据前,会导致外部进程阻塞。因此,二者必须都被处理,而且最好是异步的。

    在Sqoop用语中,“async sink”是一个线程,持有一个InputStream并读取直至结束。由AsyncSink的实现类来完成。com.cloudera.sqoop.util.AsyncSink抽象类定义了这个工厂必须实现的操作。processStream()会创建另一个线程,立即开始处理从InputStream参数读取的数据;必须读取这个流直至结束。join()方法允许外部线程等待,知道这个处理完成。

    默认提供了一些AsyncSink的实现:LoggingAsyncSink处理诸如log4j INFO等的InputStream。NullAsyncSink消耗所有的输入,但不做其他处理。

    利用外部进程的各种ConnManagers以内部类的形式实现了各自的AsyncSink实现类,这些实现类从数据库工具读取数据,并把数据发送到HDFS,可能还同时进行了格式转换。

    6.3.3 与MapReduce的接口

    Sqoop调度MapReduce 作业(jobs)进行导入和导出。MapReduce作业的配置和执行按照一些通用步骤进行(配置InputFormat;配置OutputFormat;设置Mapper实现;等等...)。这些步骤在com.cloudera.sqoop.mapreduce.JobBase类中体现。JobBase允许用户指定要使用哪些InputFormat,OutputFormat和Mapper。

    JobBase本身是ImportJobBase和ExportJobBase的子类,为import或export相关作业通用的配置步骤分别提供了更好的支持。ImportJobBase.runImport()会调用配置步骤,并运行一个作业来把一个表导入HDFS。

    这些基类也存在一些子类。例如,DataDrivenImportJob使用DataDrivenDBImportJob来执行导入。这是各种可用ConnManager实现最常用到的导入类型。MySQL使用不同的类(MySQLDumpImportJob)来运行direct-mode导入。它的定制Mapper和InputFormat实现也位于这个包里。


    英文原文:http://sqoop.apache.org/docs/1.4.6/SqoopDevGuide.html

    相关文章:Sqoop中文用户手册,http://www.zihou.me/html/2014/01/28/9114.html

    来自: http://blog.csdn.net//kingzone_2008/article/details/50298143