DataX 插件开发指南


DataX 插件开发指南 版本号 修改内容 修改日期 修改人 V0.1 创建 2011-09-08 迟南 V0.2 增添插件编译信息 2012-02-20 迟南 目录 一、 概述 ....................................................................................................................... 4 二、 Reader 插件开发(以 httpreader 为例) ............................................................. 5 1、 确定插件所需配置的参数..................................................................................... 5 2、 构建相应包和类结构 ............................................................................................ 5 3、 实现重载方法 ........................................................................................................ 7 4、 自定义 split 方法 ................................................................................................... 9 三、 Writer 插件开发(以 streamwriter 为例) ......................................................... 12 1、 确定插件参数、构建相应包和类结构 ............................................................... 12 2、 实现重载方法 ...................................................................................................... 13 四、 插件运行配置(以 httpreader 为例) ................................................................ 15 1、 注册插件 ............................................................................................................. 15 2、 修改 build.xml 文件,并执行 ant 命令将本插件打成 jar 包 ............................. 15 3、 新建插件目录,测试运行................................................................................... 15 4、 安装发布插件 ...................................................................................................... 15 一、 概述 DataX 是一个在不同类型的数据库(文件系统)之间交换数据的工具,采用“框 架+插件”的结构,框架相当于一个数据中转平台,而插件则为访问不同类型的 数据库(文件系统)提供实现。 DataX 插件分为 Reader 和 Writer 两类。Reader 负责从数据源端读取数据到 Storage(交换空间),Writer 负责将 Storage 中的数据写入到数据目的端。Storage 可以适配不同种类的 Reader 和 Writer,从而实现数据同步。目前 DataX 版本已 经提供的 Reader 插件如下: 1、 hdfsreader : 支持从 hdfs 文件系统获取数据。 2、 mysqlreader: 支持从 mysql 数据库获取数据。 3、 sqlserverreader: 支持从 sqlserver 数据库获取数据。 4、 oraclereader : 支持从 oracle 数据库获取数据。 5、 streamreader: 支持从 stream 流获取数据(常用于测试) 6、 httpreader : 支持从 http URL 获取数据。 提供的 Writer 插件如下: 1、 hdfswriter :支持向 hdbf 写入数据。 2、 mysqlwriter :支持向 mysql 写入数据。 3、 sqlserverwriter:支持向 sqlserver 写入数据。 4、 oraclewriter :支持向 oracle 写入数据。 5、 streamwriter :支持向 stream 流写入数据。(常用于测试) 用户可以根据需要开发自己的 Reader & Writer 插件。现在以 HttpReader 和 StreamWriter 插件为例,使用 eclipse 分别说明 Reader 和 Writer 插件开发过程。 二、 Reader 插件开发(以 httpreader 为例) 1、 确定插件所需配置的参数 DataX 插件运行过程需要用户指定运行参数信息,例如 HttpReader 需要告之从具体的 URL 地址获取输入数据流,因此需要 HttpReader 用户 指定 URL 信息。插件所需的参数在插件所在目录下的 ParamKey.java 文件 指定。该文件将被 DataX 用在作业配置过程中解析生成作业配置 xml。 HttpReader 插件参数示例如下,需要注意注释尽量参照源码规范,DataX 运行时,会根据此处声明的参数和注释生成对应的模板 Job_xml.此处参数 设置非常重要,如图: 图 1 2、 构建相应包和类结构 在源码文件的 plugins.reader 包下构建 httpreader 包,再在 httpreader 包下创建类 HttpReader,并让之继承 common.plugin.Reader. 图 2 图 3 3、 实现重载方法 获得图 3 所示效果:现在开始分别实现 init( ), connectToDb( ), startRead(LineSender sender), finish( ) 四个方法。 (1) Init( ):通过从 Reader 间接继承自 DefaultPlugin 的 PluginParam 类 型的参数 param 获取配置 httpreader 插件的参数(此处可以对参数进 行检查和格式处理等操作),如图: 图 4 (2) connectToDb( ): 本插件不需要此操作,函数为空实现。(在数据 库相关插件中,主要操作是通过 DbSource.getConnection(keyId)获取 connection),如在 mysqlreader 中,该函数为: 图 5 (3) startRead(LineSender sender):根据 init( )初始化的参数,连接相 应的 http URL, 获取其中数据并用 BufferedReader 封装,循环处理每 行数据,调用 sender.createLine() 产生 line,并通过 line.addField(fieldStr)把每行数据切分成字段后组装成 Line 中的 field, 调用 sender.sendToWriter(line)将此行数据写入 Storage。如图: 图 6 (4) finish():本插件不需要此操作,函数实现为空。(在数据库相关 插件中,主要完成关闭 connection 操作。)如在 mysqlreader 中,该 函数为: 图 7 4、 自定义 split 方法 截止到目前,已经实现了一个简单的 httpreader(从一个 http URL 读 取数据),目前还未实现读取多个 http URL 的功能,为此,可以扩展你的 参数配置,在本插件对应的 ParamsKey.java 中,添加如下两项: 图 8 然后让 httpreader 插件覆盖默认的 split(PluginParam param)方法。操作如下: (1) split(PluginParam param)方法对 param 按照规则切分,生成 List,DataX 框架会根据此 List 产生插件实例执行任务。 图 9 (2) 为了清晰结构或者切分规则复杂,建议再构建类 HttpURLSplitter 辅助 split 方法实现切分,该类需要继承 Splitter: 图 10 图 11 实现 init( )方法和 split( )方法即可: 图 12 三、 Writer 插件开发(以 streamwriter 为例) 1、 确定插件参数、构建相应包和类结构 与 Reader 插件类似,略过,截图如下: 图 1 streamwriter 插件的配置项信息 图 2 图 3 2、 实现重载方法 (1) init( ) 、connectToDb()、finish( )三个方法与 Reader 中意义相似, 此处略过。 (2) commint( ) :本插件不需要此操作,函数实现为空。(在数据库 相关插件中,主要完成提交任务操作。) (3) startWrite(LineReceiver linereceiver) :根据 init( )初始化的参数, 按照 streamwriter 插件配置信息,把 linereceiver 从 Storage 获取的数 据通过 xxx 方法写入 line 中,循环处理每行数据(此处处理方式为直 接输出),即可。如图: 图 4 四、 插件运行配置(以 httpreader 为例) 1、 注册插件 在 conf/plugins.xml 中,添加如下配置项,以注册插件。DataX 框架 在运行时将从该配置文件中获取插件 jar 的名称等配置信息: 图 1 2、 修改 build.xml 文件,并执行 ant 命令将本插件打成 jar 包 图 2 3、 新建插件目录,测试运行 (1). 在 DataX 安装目录../datax/plugins/reader/下新建插件目录 httpreader, (2). 把 jar 文件、ParamKey.java 和 plugins-common-x.x.x.jar 一起拷贝到该 目录(若有其他相关 so 文件、jar 包等也要拷贝到此目录),如下图: 图 3 (3). 通过 DataX 配置生成一个新的配置作业信息 xml 文件(配置生成作业 信息请参考《DataX 命令行使用手册文档》),如生成的配置文件作业在 /home/taobao/datax/jobs/httpreader_streamwriter.xml (4). 输入 datax.py /home/taobao/datax/jobs/httpreader_streamwriter.xml 运行该 DataX 作业。 注:对于 writer 插件,新建插件目录位置为:../datax/plugins/writer/下。 4、 安装发布插件 参考 DataX 源码 rpm 目录下的 spec 文件和 sh 文 件 , 完 成 t_dp_datax_httpreader.spec 文件。 使用命令:rpmbuild --ba t_dp_datax_httpreader.spec 进行打包,打包完成后, 使用命令:rpm -ivh t_dp_datax_httpreader-x.x.x-x.noarch.rpm 完成安装发布。
还剩15页未读

继续阅读

下载pdf到电脑,查找使用更方便

pdf的实际排版效果,会与网站的显示效果略有不同!!

需要 6 金币 [ 分享pdf获得金币 ] 1 人已下载

下载pdf

pdf贡献者

doc-open

贡献于2015-05-16

下载需要 6 金币 [金币充值 ]
亲,您也可以通过 分享原创pdf 来获得金币奖励!
下载pdf