大数据(七) - Flume

jopen 8年前

flume[flu:m]:日志采集、聚合和传输的系统, java语言实现

flume是干什么的?
收集日志的
flume如何搜集日志?
我们把flume比作情报人员
(1)搜集信息
(2)获取记忆信息
(3)传递报告间谍信息
flume是怎么完成上面三件事情的,三个组件:
source: 搜集信息
channel:传递信息
sink:存储信息
</div>


</div> </div>
flume OG( original generation初始版本 )和 NG( next generation,cdh4以及之后的版本
Flume OG 代码工程臃肿、核心组件设计不合理、核心配置不标准等
Flume OG 有三种角色节点:agent、collector、master节点。
Flume NG 只有一种角色的节点:代理节点(agent),去掉了 collector、master 节点,这是核心组件最核心的变化。 
agent 节点的组成也发生了变化,由 source、sink、channel 组成。

</div>
NG要求jdk1.6以上,而且只有linux上的启动脚本
     OG版本已经不更新了
    NG的核心组件:
        source:完成对 日志数据的收集,分成transition和event打入到channel中。
            source有多种实现包括AvroSource(监控端口)、NetCat Source、Syslog Source、Syslog TCP Source、Syslog UDP Source、Http Source、HDFS Source、Spooling Directory Source(对目录下新增文件的监控,并读取文件数据)、Exec Source(以运行linux命令的方式,持续输出最新数据,如tail -F)等
            flume可以和log4j配合使用
        sink:取出channel中的数据,输出到存储文件系统,数据库,或远程服务器
            多种实现方式如Avro sink、HDFS Sink、HBase Sink、Logger Sink(测试用,后台打印)
        小数据可以存储在文件或数据库中,海量数据(每天GB、TB级别的数据)存储到hadoop中
        Channel:管道,提供一个队列的功能,对source提供的数据进行简单缓存
            实现由Memory/File/jdbc channel,Memory无法保证数据完成性,官方建议使用File Channel,保证数据完整性和一致性
</div>
</div>
Flow Pipeline
1、多个Agent顺序连接
2、多个Agent的数据汇聚到同一个Agent

3、多路(一个agent上有多个channel)(Multiplexing)Agent

这种模式,有两种方式,一种是用来复制(Replication),另一种是用来分流(Multiplexing)。Replication方式,可以将最前端的数据源复制多份,分别传递到多个channel中,每个channel接收到的数据都是相同的;Multiplexing方式,selector可以根据header的值来确定数据传递到哪一个channel
4、实现load balance功能

5、实现failover功能

flume source
    Avro Source:接收外部avro客户端的事件
    Thrift Source:接收外部thrift客户端的事件
    Exec Source:接收来自一个给定的Unix命令的标准输出上的数据
    Jms Source:接收来自消息队列的事件
    NetCat Source:netcat在一端侦听,每一行文字变成一个事件源
    Spooling Directory Source:以目录中文件内容为事件源
    SequenceGenerator Source:一个简单地序列生成器,主要用于测试
    Syslog Source:读取syslog数据
        Syslog  UDP Source
        Syslog  TCP Source
        Multiport Syslog TCP Source
    Http Source:接收http post,get事件,get只用于试验
    Custom Source:自定义source

flume sink
    HDFS Sink将事件写入到hadoop分布式文件系统HDFS
    Logger sink 通常用于调试、测试
    Avro sink 可以批量传送,可以配置批量大小
    Thrift sink
    IRC sink 从通道中取得信息到irc server
    File Roll sink存储文件到本地文件系统中
    Null sink丢弃从通道接收的所有事件
    HBase sink将数据写入到hbase中
    AsyncHbase sink异步方式将数据写入到hbase中
    Custom sink 自定义sink
flume channel
    Memory channel 时间存储在一个可配置的最大尺寸的内存中的队列;速度快,吞吐量大,但是代理出现故障时数据丢失
    JDBC channel 时间存储在数据库中
    File channel 不同的file channel应该写到不同的磁盘上,避免单磁盘io过大
    Pseudo Thansaction channel 用于测试
    Custom channel 自定义channel
flume channel selector
    Replicating channel selector (default) 复制,相同的数据发送到多个channel
    Multiplexing channel selector 复用,以header区分一个event发送到哪个channel
    Custom channel selector 自定义channel selector

</div>

数据通信系统或计算机网络系统中,传输媒体的带宽或容量往往会大于传输单一信号的需求,为了有效地利用通信线路,希望一个信道同时传输多路信号,这就是所谓的 多路复用技术(Multiplexing)。采用多路复用技术能把多个信号组合起来在一条物理信道上进行传输,在远距离传输时可大大节省电缆的安装和维护费用。


Flume sink processor
    Default sink processor
    Failover sink processor 故障转移(主备)
    Load balancing sink processor 负载均衡:轮询round_robin或随机random

flume interceptor 
    拦截器主要是对事件的header信息信息操作,要么直接忽略他,要么修改他的数据
    一、Event Serializers
  file_roll sink 和hdfs sink 都支持EventSerializer接口
    Body TextSerializer,别名:text。这个拦截器将把事件的body部分写入到输出流中而不需要任何转换或者修改。事件的header将直接被忽略。
    Avro Event Serializer别名:avro_event。这个拦截器将把事件序列化到一个Avro容器文件中。使用的模式和RPC Avro机制使用到的处理flume事件的机制一样。这个序列化器继承自AbstractAvroEventSerializer类。
    二、Timestamp Interceptor
    Flume 可以在事件传输过程中对它进行修改与删除,而这个都是通过Interceptor进行实现的,实际都是往事件的header里插数据。而Timestamp Interceptor拦截器就是可以往event的header中插入关键词为timestamp的时间戳。
    三、Host Interceptor
  该拦截器可以往event的header中插入关键词默认为host主机名或者ip地址(注意是agent运行的机器的主机名或者ip地址)
    四、Static Interceptor
  Static Interceptor拦截器允许用户增加一个static的header并为所有的事件赋值。范围是所有事件。
    五、Regex FilteringInterceptor
  Regex Filtering Interceptor拦截器用于过滤事件,筛选出与配置的正则表达式相匹配的事件。可以用于包含事件和排除事件(include 或者是exclude)。常用于数据清洗,通过正则表达式把数据过滤出来。
</div> </div>

flume开发
    1、RPC
        flume虽然包含一些内部机制来采集数据,但是有时候用户希望能将应用程序和flume直接相通。flume client是一个库,允许应用程序链接flume和通过rpc往flume发送数据。
        avro是flume默认的rpc协议
    2、 Transaction    
        Flume 的核心是把数据从数据源收集过来,再送到目的地。为了保证输送一定成功,在送到目的地之前,会先缓存数据,待数据真正到达目的地后,删除自己缓存的数据。
        Flume 使用事务性的方式保证传送Event整个过程的可靠性。Sink 必须在 Event 被存入 Channel 后,或者,已经被传达到下一站agent里,又或者,已经被存入外部数据目的地之后,才能把 Event 从 Channel 中 remove 掉。这样数据流里的 event 无论是在一个 agent 里还是多个 agent 之间流转,都能保证可靠,因为以上的事务保证了 event 会被成功存储起来。而 Channel 的多种实现在可恢复性上有不同的保证。也保证了 event 不同程度的可靠性。比如 Flume 支持在本地保存一份文件 channel 作为备份,而memory channel 将 event 存在内存 queue 里,速度快,但丢失的话无法恢复。
    3、Source
    4、Sink
</div>

最佳实践

参考基于Flume的美团日志收集系统(一)架构和设计,列出一些最佳实践:

  • 模块命名规则:所有的 Source 以 src 开头,所有的 Channel 以 ch 开头,所有的 Sink 以 sink 开头
  • 模块之间内部通信统一使用 Avro 接口
  • 日志采集系统系统分为三层:Agent 层,Collector 层和 Store 层,其中 Agent 层每个机器部署一个进程,负责对单机的日志收集工作;Collector 层部署在中心服务器上,负责接收Agent层发送的日志,并且将日志根据路由规则写到相应的 Store 层中;Store 层负责提供永久或者临时的日志存储服务,或者将日志流导向其它服务器。
  • 扩展 MemoryChannel 和 FileChannel ,提供 DualChannel 的实现,以提供高吞吐和大缓存
  • 监控 collector HdfsSink写数据到 hdfs 的速度、FileChannel 中拥堵的 events 数量,以及写 hdfs 状态(查看是否有 .tmp 文件生成)

美团对 flume 的改进代码见 github:https://github.com/javachen/mt-flume

来自: http://blog.csdn.net//matthewei6/article/details/50534563