Flume 经典自学文档


Flume-ng Flume-ng Flume-ng Flume-ng 的原理和使用 1. 介绍.......................................................................................................1 2. 架构.......................................................................................................2 2.1 数据流...........................................................................................2 2.2 核心组件.......................................................................................4 2.2.1 source.................................................................................4 2.2.2 Channel...............................................................................5 2.2.3 sink.....................................................................................6 2.3 可靠性...........................................................................................6 2.4 可恢复性.......................................................................................7 3. 安装和使用...........................................................................................7 4. 开发相关.............................................................................................10 4.1 自定义 source......................................................................11 4.2 自定义 sink..........................................................................13 4.3 自定义拦截器.......................................................................17 4.4 其他说明...............................................................................17 5. 参考文档.............................................................................................17 1.1.1.1. 介绍 Flume 是 Cloudera 提供的日志收集系统,具有分布式、高可靠、 高可用性等特点,对海量日志采集、聚合和传输,Flume 支持在日志 系统中定制各类数据发送方,同时,Flume 提供对数据进行简单处理, 并写到各种数据接受方的能力。 Flume 使用 java 编写,其需要运行在 Java1.6 或更高版本之上。 官方网站:http://flume.apache.org/ 用户文档:http://flume.apache.org/FlumeUserGuide.html 开发文档:http://flume.apache.org/FlumeDeveloperGuide.html 2.2.2.2. 架构 2.12.12.12.1 数据流数据流数据流数据流 Flume 的核心是把数据从数据源收集过来,再送到目的地。为了 保证输送一定成功,在送到目的地之前,会先缓存数据,待数据真正 到达目的地后,删除自己缓存的数据。 Flume 传输的数据的基本单位是 Event,如果是文本文件,通常 是一行记录,这也是事务的基本单位。Event 从 Source,流向 Channel,再到 Sink,本身为一个 byte 数组,并可携带 headers 信 息。Event 代表着一个数据流的最小完整单元,从外部数据源来,向 外部的目的地去。 Flume 运行的核心是 Agent。它是一个完整的数据收集工具,含 有三个核心组件,分别是 source、channel、sink。通过这些组件, Event 可以从一个地方流向另一个地方,如下图所示。 source 可以接收外部源发送过来的数据。不同的 source,可以 接受不同的数据格式。比如有目录池(spooling directory)数据源, 可以监控指定文件夹中的新文件变化,如果目录中有文件产生,就会 立刻读取其内容。 channel 是一个存储地,接收 source 的输出,直到有 sink 消 费掉 channel 中的数据。channel 中的数据直到进入到下一个 channel 中或者进入终端才会被删除。当 sink 写入失败后,可以自 动重启,不会造成数据丢失,因此很可靠。 sink 会消费 channel 中的数据,然后送给外部源或者其他 source。如数据可以写入到 HDFS 或者 HBase 中。 flume 允许多个 agent 连在一起,形成前后相连的多级跳。 2.22.22.22.2 核心组件核心组件核心组件核心组件 2.2.12.2.12.2.12.2.1 sourcesourcesourcesource Client 端操作消费数据的来源,Flume 支持 Avro,log4j,syslog 和 http post(body 为json 格式)。可以让应用程序同已有的 Source 直接打交道,如 AvroSource,SyslogTcpSource。也可以写一个 Source,以 IPC(进程间通信协议) 或 RPC(远程进程间通信协议) 的 方式接入自己的应用,Avro 和 Thrift 都可以(分 别 有 NettyAvroRpcClient 和 ThriftRpcClient 实现了 RpcClient 接 口),其中 Avro 是默认的 RPC 协议。具体代码级别的 Client 端数 据接入,可以参考官方手册。 对现有程序改动最小的使用方式是使用是直接读取程序原来记录 的日志文件,基本可以实现无缝接入,不需要对现有程序进行任何改 动。 对于直接读取文件 Source,有两种方式: ExecSource: 以运行 Linux 命令的方式,持续的输出最新的数 据,如 tail -F 文件名 指令,在这种方式下,取的文件名必须是指 定的。 ExecSource 可以实现对日志的实时收集,但是存在 Flume 不 运行或者指令执行出错时,将无法收集到日志数据,无法保证日志数 据的完整性。 SpoolSource: 监测配置的目录下新增的文件,并将文件中的数据 读取出来。需要注意两点:拷贝到 spool 目录下的文件不可以再打 开编辑;spool 目录下不可包含相应的子目录。SpoolSource 虽然无 法实现实时的收集数据,但是可以使用以分钟的方式分割文件,趋近 于实时。如果应用无法实现以分钟切割日志文件的话,可以两种收集 方式结合使用。 在实际使用的过程中,可以结合 log4j 使用,使用 log4j 的时候,将 log4j 的文件分割机制设为1分钟一次,将文件拷 贝到spool 的监控目录。log4j 有一个 TimeRolling 的插件,可以 把 log4j 分割文件到 spool 目录。基本实现了实时的监控。 Flume 在传完文件之后,将会修改文件的后缀,变为 .COMPLETED (后缀也可以在配置文件中灵活指定)。 2.2.22.2.22.2.22.2.2 ChannelChannelChannelChannel 当前有几个 channel 可供选择,分别是 Memory Channel, JDBC Channel , File Channel,Psuedo Transaction Channel。比较常见 的是前三种 channel。 MemoryChannel 可以实现高速的吞吐,但是无法保证数据的完整 性。 MemoryRecoverChannel 在官方文档的建议上已经建义使用 FileChannel 来替换。 FileChannel 保证数据的完整性与一致性。在具体配置 FileChannel 时,建议 FileChannel 设置的目录和程序日志文件保存 的目录设成不同的磁盘,以便提高效率。 File Channel 是一个持久化的隧道(channel),它持久化所有的 事件,并将其存储到磁盘中。因此,即使 Java 虚拟机当掉,或者操 作系统崩溃或重启,再或者事件没有在管道中成功地传递到下一个代 理(agent),这一切都不会造成数据丢失。Memory Channel 是一个 不稳定的隧道,其原因是由于它在内存中存储所有事件。如果 java 进程死掉,任何存储在内存的事件将会丢失。另外,内存的空间收到 RAM 大小的限制,而 File Channel 这方面是它的优势,只要磁盘空 间足够,它就可以将所有事件数据存储到磁盘上。 2.2.32.2.32.2.32.2.3 sinksinksinksink Sink 在设置存储数据时,可以向文件系统、数据库、hadoop 存数 据,在日志数据较少时,可以将数据存储在文件系中,并且设定一定 的时间间隔保存数据。在日志数据较多时,可以将相应的日志数据存 储到 Hadoop 中,便于日后进行相应的数据分析.更多 sink 的内容可 以参考官方手册。 2.32.32.32.3 可靠性可靠性可靠性可靠性 Flume 的核心是把数据从数据源收集过来,再送到目的地。为了 保证输送一定成功,在送到目的地之前,会先缓存数据,待数据真正 到达目的地后,删除自己缓存的数据。 Flume 使用事务性的方式保证传送 Event 整个过程的可靠性。 Sink 必须在 Event 被存入 Channel 后,或者,已经被传达到下一 站agent 里,又或者,已经被存入外部数据目的地之后,才能把 Event 从 Channel 中 remove 掉。这样数据流里的 event 无论是在一个 agent 里还是多个 agent 之间流转,都能保证可靠,因为以上的事 务保证了 event 会被成功存储起来。而 Channel 的多种实现在可恢 复性上有不同的保证。也保证了 event 不同程度的可靠性。比如 Flume 支持在本地保存一份文件 channel 作为备份,而 memory channel 将 event 存在内存 queue 里,速度快,但丢失的话无法恢 复。 2.42.42.42.4 可恢复性可恢复性可恢复性可恢复性 还是靠 Channel。推荐使用 FileChannel,事件持久化在本地文件 系统里(性能较差)。 3.3.3.3. 安装和使用 Flume 的 rpm 安装方式很简单,这里不做说明。 示例1: avro 数据源 安装成功之后,在 /etc/flume/conf 目录创建 f1.conf 文件,内容 如下: agent-1.channels.ch-1.type = memory agent-1.sources.avro-source1.channels = ch-1 agent-1.sources.avro-source1.type = avro agent-1.sources.avro-source1.bind = 0.0.0.0 agent-1.sources.avro-source1.port = 41414 agent-1.sources.avro-source1.threads = 5 agent-1.sinks.log-sink1.channel = ch-1 agent-1.sinks.log-sink1.type = logger agent-1.channels = ch-1 agent-1.sources = avro-source1 agent-1.sinks = log-sink1 关于 avro-source 配置说明,请参考 avro-source 接下来启动 agent: $ flume-ng agent -c /etc/flume-ng/conf -f /etc/flume-ng/conf/f1.conf -Dflume.root.logger=DEBUG,console -n agent-1 参数说明: -n 指定 agent 名称 -c 指定配置文件目录 -f 指定配置文件 -Dflume.root.logger=DEBUG,console 设置日志等级 下面可以启动一个 avro-client 客户端生产数据: $ flume-ng avro-client -c /etc/flume-ng/conf -H localhost -p 41414 -F/etc/passwd -Dflume.root.logger=DEBUG,console 示例2:spooldir 数据源 在 /etc/flume/conf 目录创建 f2.conf 文件,内容如下: agent-1.channels = ch-1 agent-1.sources = src-1 agent-1.channels.ch-1.type = memory agent-1.sources.src-1.type = spooldir agent-1.sources.src-1.channels = ch-1 agent-1.sources.src-1.spoolDir = /root/log agent-1.sources.src-1.fileHeader = true agent-1.sinks.log-sink1.channel = ch-1 agent-1.sinks.log-sink1.type = logger agent-1.sinks = log-sink1 关于 Spooling Directory Source 配置说明,请参考 Spooling Directory Source 接下来启动 agent: $ flume-ng agent -c /etc/flume-ng/conf -f /etc/flume-ng/conf/f2.conf -Dflume.root.logger=DEBUG,console -n agent-1 然后,手动拷贝一个文件到 /root/log 目录,观察日志输出以及 /root/log 目录下的变化。 示例3:spooldir 数据源,写入 hdfs 在 /etc/flume/conf 目录创建 f3.conf 文件,内容如下: agent-1.channels.ch-1.type = file agent-1.channels.ch-1.checkpointDir= /root/checkpoint agent-1.channels.ch-1.dataDirs= /root/data agent-1.sources.src-1.type = spooldir agent-1.sources.src-1.channels = ch-1 agent-1.sources.src-1.spoolDir = /root/log agent-1.sources.src-1.deletePolicy= never agent-1.sources.src-1.fileHeader = true agent-1.sources.src-1.interceptors =i1 agent-1.sources.src-1.interceptors.i1.type = timestamp agent-1.sinks.sink_hdfs.channel = ch-1 agent-1.sinks.sink_hdfs.type = hdfs agent-1.sinks.sink_hdfs.hdfs.path = hdfs://cdh1:8020/user/root/events/%Y-%m-%d agent-1.sinks.sink_hdfs.hdfs.filePrefix = logs agent-1.sinks.sink_hdfs.hdfs.inUsePrefix = . agent-1.sinks.sink_hdfs.hdfs.rollInterval = 30 agent-1.sinks.sink_hdfs.hdfs.rollSize = 0 agent-1.sinks.sink_hdfs.hdfs.rollCount = 0 agent-1.sinks.sink_hdfs.hdfs.batchSize = 1000 agent-1.sinks.sink_hdfs.hdfs.writeFormat = text agent-1.sinks.sink_hdfs.hdfs.fileType = DataStream #agent-1.sinks.sink_hdfs.hdfs.fileType = CompressedStream #agent-1.sinks.sink_hdfs.hdfs.codeC = lzop agent-1.channels = ch-1 agent-1.sources = src-1 agent-1.sinks = sink_hdfs 关于 HDFS Sink 配置说明,请参考 HDFS Sink 说明: 通过 interceptors 往 header 里添加 timestamp,这样做,可 以在 hdfs.path 引用系统内部的时间变量或者主机的 hostname。 通过设置 hdfs.inUsePrefix,例如设置为 .时,hdfs 会把该文 件当做隐藏文件,以避免在 mr 过程中读到这些临时文件,引起一些 错误 如果使用 lzo 压缩,则需要手动创建 lzo 索引,可以通过修改 HdfsSink 的代码,通过代码创建索引 FileChannel 的目录最好是和 spooldir 的数据目录处于不同磁 盘。 开发相关 加入 jar 包依赖: org.apache.flume flume-ng-core 1.5.2 org.apache.flume flume-ng-configuration 1.5.2 4.14.14.14.1 自定义 sourcesourcesourcesource 比如有一个需求,需要读取指定文件,要求读取4行才发送给 channel。 packagepackagepackagepackagecom.gds.flume.source; importimportimportimportjava.io.FileNotFoundException; importimportimportimport java.io.IOException; importimportimportimport java.io.RandomAccessFile; importimportimportimport java.util.concurrent.atomic.AtomicLong; importimportimportimport org.apache.flume.Context; importimportimportimport org.apache.flume.Event; importimportimportimport org.apache.flume.EventDeliveryException; importimportimportimport org.apache.flume.PollableSource; importimportimportimport org.apache.flume.conf.Configurable; importimportimportimport org.apache.flume.event.EventBuilder; importimportimportimport org.apache.flume.source.AbstractSource; importimportimportimport org.slf4j.Logger; importimportimportimport org.slf4j.LoggerFactory; /** *从某个日子文件中每次读取4行,发送给channel * *@author@author@author@author dczhao * */ publicpublicpublicpublic classclassclassclass MySourceextendsextendsextendsextends AbstractSourceimplementsimplementsimplementsimplements Configurable, PollableSource{ privateprivateprivateprivate staticstaticstaticstatic finalfinalfinalfinal Loggerlogger=LoggerFactory .getLogger(MySource.classclassclassclass); /** *要读的日志文件地址 */ privateprivateprivateprivate StringlogPath; privateprivateprivateprivate RandomAccessFilefile; privateprivateprivateprivate AtomicLongautoLine; privateprivateprivateprivate StringBuilderoldLine=newnewnewnew StringBuilder();; @Override publicpublicpublicpublic synchronizedsynchronizedsynchronizedsynchronized voidvoidvoidvoid start(){ logger.info("MySourcestarting~~~~~"); autoLine=newnewnewnew AtomicLong(0); trytrytrytry { file=newnewnewnew RandomAccessFile(logPath,"r"); }catchcatchcatchcatch (FileNotFoundExceptione){ logger.error("fileopererror~~~~~",e); } supersupersupersuper.start(); } @Override publicpublicpublicpublicsynchronizedsynchronizedsynchronizedsynchronizedvoidvoidvoidvoidstop(){ trytrytrytry{ file.close(); }catchcatchcatchcatch(IOExceptione){ logger.error("filecloseerror~~~~~",e); } autoLine=newnewnewnewAtomicLong(0); supersupersupersuper.stop(); } publicpublicpublicpublicvoidvoidvoidvoidconfigure(Contextcontext){ thisthisthisthis.logPath=context.getString("logPath"); } publicpublicpublicpublicStatusprocess()throwsthrowsthrowsthrowsEventDeliveryException{ trytrytrytry{ Stringline=file.readLine(); ifififif(line==nullnullnullnull){ returnreturnreturnreturnStatus.BACKOFF; } ifififif(autoLine.intValue()%4==0){ logger.info("oldLine:"+oldLine.toString()); bytebytebytebyte[]body=oldLine.toString().getBytes(); Eventevent=EventBuilder.withBody(body); getChannelProcessor().processEvent(event); oldLine=newnewnewnewStringBuilder(); autoLine=newnewnewnewAtomicLong(0); } oldLine.append(line); }catchcatchcatchcatch(IOExceptione){ logger.error("getFilePointererror~~~~~",e); returnreturnreturnreturnStatus.BACKOFF; } returnreturnreturnreturnStatus.READY; } } 其实要学会怎么自定义 source,可以参考 flume 提供的相关 source 类,比如 AvroSource、NetcatSource、SpoolDirectorySource 等。 4.24.24.24.2 自定义 sinksinksinksink 从channel 中读取数据,推送到 MySQL 数据库保存数据为例。 packagepackagepackagepackagecom.gds.flume.sink; importimportimportimportcom.google.common.base.Preconditions; importimportimportimportcom.google.common.base.Throwables; importimportimportimportcom.google.common.collect.Lists; importimportimportimportorg.apache.flume.*; importimportimportimportorg.apache.flume.conf.Configurable; importimportimportimportorg.apache.flume.sink.AbstractSink; importimportimportimportorg.slf4j.Logger; importimportimportimportorg.slf4j.LoggerFactory; importimportimportimportjava.sql.Connection; importimportimportimportjava.sql.DriverManager; importimportimportimportjava.sql.PreparedStatement; importimportimportimportjava.sql.SQLException; importimportimportimportjava.util.List; publicpublicpublicpublicclassclassclassclassMysqlSinkextendsextendsextendsextendsAbstractSinkimplementsimplementsimplementsimplementsConfigurable{ privateprivateprivateprivateLoggerLOG=LoggerFactory.getLogger(MysqlSink.classclassclassclass); privateprivateprivateprivateStringhostname; privateprivateprivateprivateStringport; privateprivateprivateprivateStringdatabaseName; privateprivateprivateprivateStringtableName; privateprivateprivateprivateStringuser; privateprivateprivateprivateStringpassword; privateprivateprivateprivatePreparedStatementpreparedStatement; privateprivateprivateprivateConnectionconn; privateprivateprivateprivateintintintintbatchSize; publicpublicpublicpublicMysqlSink(){ LOG.info("MysqlSinkstart..."); } publicpublicpublicpublicvoidvoidvoidvoidconfigure(Contextcontext){ hostname=context.getString("hostname"); Preconditions.checkNotNull(hostname,"hostnamemustbeset!!"); port=context.getString("port"); Preconditions.checkNotNull(port,"portmustbeset!!"); databaseName=context.getString("databaseName"); Preconditions.checkNotNull(databaseName,"databaseNamemustbeset!!"); tableName=context.getString("tableName"); Preconditions.checkNotNull(tableName,"tableNamemustbeset!!"); user=context.getString("user"); Preconditions.checkNotNull(user,"usermustbeset!!"); password=context.getString("password"); Preconditions.checkNotNull(password,"passwordmustbeset!!"); batchSize=context.getInteger("batchSize",100); Preconditions.checkNotNull(batchSize>0,"batchSizemustbeapositive number!!"); } @Override publicpublicpublicpublicvoidvoidvoidvoidstart(){ supersupersupersuper.start(); trytrytrytry{ //调用Class.forName()方法加载驱动程序 Class.forName("com.mysql.jdbc.Driver"); }catchcatchcatchcatch(ClassNotFoundExceptione){ e.printStackTrace(); } Stringurl="jdbc:mysql://"+hostname+":"+port+"/"+databaseName; //调用DriverManager对象的getConnection()方法,获得一个Connection对象 trytrytrytry{ conn=DriverManager.getConnection(url,user,password); conn.setAutoCommit(falsefalsefalsefalse); //创建一个Statement对象 preparedStatement=conn.prepareStatement("insertinto"+tableName+ "(content)values(?)"); }catchcatchcatchcatch(SQLExceptione){ e.printStackTrace(); System.exit(1); } } @Override publicpublicpublicpublicvoidvoidvoidvoidstop(){ supersupersupersuper.stop(); ifififif(preparedStatement!=nullnullnullnull){ trytrytrytry{ preparedStatement.close(); }catchcatchcatchcatch(SQLExceptione){ e.printStackTrace(); } } ifififif(conn!=nullnullnullnull){ trytrytrytry{ conn.close(); }catchcatchcatchcatch(SQLExceptione){ e.printStackTrace(); } } } publicpublicpublicpublicStatusprocess()throwsthrowsthrowsthrowsEventDeliveryException{ Statusresult=Status.READY; Channelchannel=getChannel(); Transactiontransaction=channel.getTransaction(); Eventevent; Stringcontent; Listactions=Lists.newArrayList(); transaction.begin(); trytrytrytry{ forforforfor(intintintinti=0;i0){ preparedStatement.clearBatch(); forforforfor(Stringtemp:actions){ preparedStatement.setString(1,temp); preparedStatement.addBatch(); } preparedStatement.executeBatch(); conn.commit(); } transaction.commit(); }catchcatchcatchcatch(Throwablee){ trytrytrytry{ transaction.rollback(); }catchcatchcatchcatch(Exceptione2){ LOG.error("Exceptioninrollback.Rollbackmightnothavebeen"+ "successful.",e2); } LOG.error("Failedtocommittransaction."+ "Transactionrolledback.",e); Throwables.propagate(e); }finallyfinallyfinallyfinally{ transaction.close(); } returnreturnreturnreturnresult; } } 其实要学会怎么自定义 sink,可以参考 flume 提供的相关 sink 类, 比如 LoggerSink、AvroSink 等。 4.34.34.34.3 自定义拦截器 4.44.44.44.4 其他说明 1、使用 Spooling Directory Source 的时候,一定要避免同时读 写一个文件的情况。可以通过source1.ignorePattern = ^(.)*\\.tmp$这个配置,让 spoolingsource 不读取该格式的文件。 5.5.5.5. 参考文档 http://blog.csdn.net/xiao_jun_0820/article/category/2399621 这篇博客比较全。
还剩17页未读

继续阅读

下载pdf到电脑,查找使用更方便

pdf的实际排版效果,会与网站的显示效果略有不同!!

需要 10 金币 [ 分享pdf获得金币 ] 3 人已下载

下载pdf

pdf贡献者

shiyf111

贡献于2015-10-28

下载需要 10 金币 [金币充值 ]
亲,您也可以通过 分享原创pdf 来获得金币奖励!
下载pdf