kafka学习

jopen 10年前

关于kafka的架构图在之前的一篇文章中有:

http://wiki.corp.qunar.com/pages/viewpage.action?pageId=27866816

kafka的一些设计理念:

1、关注大吞吐量,而不是别的特性

2、针对实时性场景

3、关于消息被处理的状态是在consumer端维护,而不是由kafka server端维护。

4、分布式,producer、broker和consumer都分布于多台机器上。

以下内容基本是翻译 加总结kafka的官方文档:http://incubator.apache.org/kafka/design.html

基本术语和概念:

Message 是通信的基本单位。

Messages被一个producer发布给一个topic,也就是说它们被物理地传递给作为一个broker的服务器(一般是另外一台机器)。一些consumer订阅一个topic,每一个被发布的消息就被转交给这些consumer了。

每一个consumer进程属于一个consumer  group 并且一条message会传递给consumer group中但是(全局看)只有一个consumer进程。这种consumer group 概念非常有用,因为,consumer group 下面的多个进程或机器在逻辑上可以作为一个consumer。

为支持 queue语义,我们可以把所有的consumers 放到一个consumer group,然后一个消息只会进入一个consumer。

为支持topic语义,consumer可以作为单独的consumer group,这样所有的消息都会传给每一个consumer。

Kafka在大规模数据面前有更多的优势,因为不管一个topic有多少consumer,一个消息只存储一次。

Message持久化和缓存:

Kafka是依赖文件系统来存储和缓存消息的,(但是大家都觉得磁盘是比较慢的),磁盘不同用法会造成速度上的巨大差别。

一个67200rpm SATA磁盘 线性写可达到300M/s,但是如果是随机写,只有50k/s

并且,kafka是运行在JVM上的,JVM两个特性:

         1、object 的内存开销是非常大的,经常是要存储数据的两倍(或者更高)

         2、Java的内存回收机制随着堆内存的数据的增加变得频繁。

作为这些因素的结果,使用文件系统和依赖于页缓存比维持一个内存的存储或者其他的结构有优势------我们至少通过自动访问所有的空闲内存使得可用的缓存加倍,而且可能通过存储一个紧凑的字节结构而不是单独的对象使

得可用的缓存又增加一倍的大小。这么做将导致在在一个32GB的机器上有28到30GB的缓存,而且还不会有GC带来的损失。而且这种缓存将保持可用即使服务被重新启动,但是进程中的缓存将需要在内存中重建(这对于一

个10GB的缓存将需要大概10分钟的时间)或者需要用一个完全的冷备份启动(这将是一个非常可怕的初始化过程)。它也将极大的简化了编码因为所有在缓存和文件系统里的相关维护逻辑现在都归操作系统里了,这将比在

进程中的一次性尝试的效率和正确度都要高。如果你的磁盘支持一次的读取那么read-ahead 将有效地用每一个磁盘上读取的有用数据填充这个缓存。

这表明了一种很简单的设计:我们不是把数据尽量多的维持在内存中并只有当需要的时候在将数据刷到文件系统,我们是反其道而行之。所有的数据不用进行任何的刷数据的调用就立刻被写入到文件系统的一个持久化的日志

记录中。事实上这只是意味着转移到了内核的页缓存中,OS将在之后将它刷出。接着我们添加一个配置驱动器刷数据策略来允许系统的用户控制数据被刷入物理磁盘的频率(每多少消息或者每多少秒)来设置一个在临界磁

盘崩溃时数据量的一个限制。

这种页缓存为中心的设计在一片关于Varnish的设计的文章 中有描述。

满足长时间保存消息:

一般消息系统持久化数据结构是用BTree,使得在消息系统中支持一个广泛的各种各样的事务性的和非事务性的语义。但是BTree的开销还是比较高的:B树操作的复杂度是O(log N),这个开销貌似是固定的。但是对磁盘操作

却不是这样的,因为需要考虑磁盘寻道的开销。此外,为满足事务性语义,BTree还要考虑row-lock,无疑这样的开销是非常大的。

直观上一个持久化的队列可以进行简单读写和添加数据到文件。尽管不能支持B数的丰富语义,但是他的优势是:快!O(1)并且读写不相互阻塞。

这样还有个好处,可以长时间存储消息,只要磁盘没有限制并且不出现损失,kafka可以存储相当长时间的消息(一周)。

效率最大化:

通常有两种原因造成效率低下: 太多的网络请求,过多的字节拷贝。

为提供效率,kafka的API围绕 “message set”概念构建,这种方式是天然的将消息分组。这样可以允许一次请求一组消息,并且分摊了网络往返的开销。

Lazily desialized :MessageSet  实现本身是一个封装了字节数组或者文件的API。因此,在处理messageset时可以用lazy deserialize。(如果不需要反序列化,就不做反序列化)

被broker维护的message的记录本身只是个被写入磁盘的message sets的目录。

维护字节数组或者文件对网络传输是非常方便的,现代的unix操作系统提供了一个非常高效的方法将数据从页缓存发送到socket------sendfile,java通过FileChannel.transferTo.api提供对这个系统调用的访问。

通常的数据从file传输到socket的路径有:

1、操作系统从磁盘读取文件到内核空间的pagecache。

2、应用程序从内核空间读取数据到用户空间的缓存。

3、应用程序将数据写回内核空间的socket buffer。

4、操作系统将socket  buffer的数据拷贝到NIC buffer,数据从NIC被发送到网络。

kafka使用了zero copy技术:

使用zero copy 方法优化: 数据只被拷贝到pagecache一次,每一次consumer请求都会重用,这就要求限制连接到服务器的consumer的数量。

点对点的批量压缩

高效的压缩需要将多个消息一起压缩而不是对单个消息单独压缩。

点对点压缩:producer端:定期的对数据进行压缩,然后发送给服务端。服务端以压缩的形式存储数据,只有当consumer请求数据时进行解压。

Kafka支持 GZIP 和 Snappy 压缩协议,详情请看这里

Consumer 状态:

对于一个消息系统来说,保持消耗消息的纪录,是一个必不可少的功能。

一般的消息系统是将消息追踪纪录写在broker端,因此,broker可以及时的将发出消息从消息系统删除,从而保障消息系统存储的数据尽量的少。

但是,如果consumer处理消息失败,此时broker已经把相应消息删除,会造成消息丢失。因此,许多消息系统引入 acknowledge机制,只有消息确认处理完成,broker再删除消息。

这种机制虽然解决消息丢失问题,但它又引入了别的问题:

1、如果consumer处理完一个消息,在发生确认之前fail掉,会造成消息被处理两次。

2、在性能上,broker要记录每一个消息的多种状态,开销太大。

消息交付语义:

以下几个条件是消息交付必须保证的:

1、最多一次:消息处理之后应该立即被标记,以免重复处理,但是,很多失败场景会造成消息丢失。

2、至少一次:应该保证每一个消息至少被交付一次,但是,失败场景会造成交付多次。

3、恰好一次:这正是我们想要的。

确保恰好交付一次的算法有:two- or three- phase commits  和 Paxos 的变体。但是它们有一些缺陷:They typically require multiple round trips and may have poor guarantees of liveness (they can halt indefinitely). The FLP result provides some of the fundamental limitations on these algorithms.(求解释)

在metadata 方面,kafka有两样是与众不同的。

* *1、数据流将被划分成独立的部分(partitions)。这个语义是将划分的过程留给了producer,producer来指定一个消息 属于哪个部分。在一个partition中,消息是按照接受顺序排序的,并且会以相同的顺序发送给consumer。这意味着我们需要记录 每一部分(consumer、topic、and partition)的“high water mark”。

    2、在kafka中,high-water mark 以“offset”的形式记录,这对于各方面来说将变的非常清晰。

在kafka中,consumer将保持记录已经处理的消息状态(offset),consumer会将这些记录信息写到zookeeper中,但 是如果把这些信息存储到consumer正在写入数据的数据库会更好,例如:consumer可能会简单将一些统计信息写到OLTP数据库。这样 consumer可以把状态信息写到正在进行事务操作的数据库,这样解决了分布式一致性问题。

很多应用场景下,可以讲consumer存储的状态信息(offset)同步到别的地方(如搜索系统的索引字段里,HDFS中),这样,如果consumer挂了,还能从上次处理的记录的offset继续,避免重复处理。

另外,这种机制还可以这样用,直接从头开始处理,虽然会有出发处理,但是有些场景确实需要这样做。例如:发现处理过程有BUG,等BUG修复了可以从头开始处理。

推 VS 拉:

有一个问题是:应该让consumer从broker拉数据还是应该让broker向consumer推数据?

在这一点上,kafka遵从传统的做法,数据从producer被推到broker,然后consumer从broker拉取数据,类似的系统有 scribe 和flume(大家可以查查相关资料)。

在push-based的系统中,broker将控制着consumer的处理数据的速度(依赖broker向其推送数据的速度);pull-based系统就不会这样,道理很简单,consumer处理数据的速度主要受其自身拉数据的能力影响。

当然,可以结合一些backoff协议,以防consumer拉数据的速度过载,让其达到满载即可。

结合以上的说明,kafka采用的是        Producer ----- 推 --->broker<--- 拉  -----consumer;

Producer:

producer的自动负载均衡:

kafka支持producer端的自动负载均衡和用户指定的TCP连接负载均衡器;kafka的broker支持用户指定的第四层TCP连接负载均衡器(求解释)。

使用第四层TCP连接负载均衡器 好 处是每一个producer只需要一个TCP连接与broker连接,并且不需要与zookeeper连接。它的弊端是负载均衡是在TCP层进行的,这样 会造成负载分配不够均衡(如果一个producer产生的消息比其他producer多,producer到broker的tcp连接均衡并不会导致消息 的传递的均衡)。

基于zookeeper客户端的负载均衡解决了这个问题,它允许producer自动发现broker,并且对每一个请求都会做负载均衡。

并且zookeeper会通过一个key而不是随机的将producer与broker连接起来(例如按照用户ID来划分数据)。这种机制称为“语义划分”;

基于zookeeper的负载均衡描述如下:通过以下几种事件在zookeeper watcher上注册:

1、新的broker出现。

2、一个broker挂了。

3、一个新的topic被注册。

4、一个broker注册到一个已经存在的topic。

在内部,producer对每一个broker会建一个弹性的连接池。这个连接池通过zookeeper watchers的回调会持续更新与每一个broker的新建或者已经连好的连接(保持心跳)。

当一个producer针对某个topic产生数据,broker会挑选一个partition,然后,在连接池中选一个可用的连接进行传输数据。

异步发送:

异步非阻塞操作是一个可扩展消息系统的基本操作,kafka当然也提供这样一个操作(producer.type=async)。producer 可以在内存中缓存要发送的消息,然后等到触发时间或者缓存内容达到配置好的buffer的大小,就会批量发送消息。由于产生消息的机器一般都是异构的,产 生数据的速度是不同的,这种异步缓存机制会对broker产生统一的通信量,会更好的提高网络利用率和更高的吞吐量。

语义划分:

kafka的producer可以将消息与可用的broker和partition进行映射,这样允许对数据流以语义进行划分,就是基于关键字将这些消息 划分到不同的broker和partition。这个划分机制可以由用户自己通过实现kafka.producer.Partitioner接口进行设 置,默认的是随机划分。例如:关键词可以是消息的ID,划分方法可以是hash(消息id)%划分的总数