分布式消息队列(Message Queue)系统:kafka

jopen 6年前

分布式系统很重要的一个设计原则是松耦合,即尽量减少子系统间的依赖。这样各个子系统可以相互独立的进行演进,维护,重用等。Message Queue (MQ)是一种很好的解耦手段。要了解MQ在系统整合中的作用,可以看Enterprise Integration Patterns (EIP)这本书或对应的网站。简单说就是发布者只管把消息发布到MQ中而不管谁会来取,同样消息使用者只管只管从MQ取消息而不管是谁发布的。这样发布者和使用者都不用知道对方的存在。

MQ产品也有很多,开源的也不少。常见的有activeMQ,openMQ,RabbitMQ等。以前也用过MQ系统,而最近在思考SaaS系统中如何使用MQ。所以在网上看看目前有什么样的MQ系统具有比较好的扩展性,可以支持大规模的数据流的,就发现了kafka。

1. kafka是什么

kafka是LinkedIn开发并开源的一个分布式MQ系统,现在是Apache的一个孵化项目。在它的主页描述kafka为一个高吞吐量的分布式(能将消息分散到不同的节点上)MQ。在这片博文中,作者简单提到了开发kafka而不选择已有MQ系统的原因。两个原因:性能和扩展性。这里做适当解释。

基本上目前绝大多数(如果不是所有的)MQ系统都是针对企业集成应用设计的,而不是针对大规模Service应用设计的。两者有什么区别?

企业集成的基本特点是把企业中现存的本不相干的各种应用进行集成。例如:一个企业可能想把财务系统和仓管系统进行集成,减少部门间结算和流通的成本和时间,并能更好的支持上层决策。但这两个系统是由不同的厂家做的,不能修改。另外企业集成是一个持续渐进的过程,需求变化非常频繁。这对MQ系统的要求是要非常灵活,可定制性要求高。所以常见的MQ系统通常都可以通过复炸的xml配置或插件开发进行定制以适应不同企业的业务流程的需要。他们大多数都能通过配置不同程度的支持EIP中定义一些模式。但设计目标并没有很重视扩展性和性能,因为通常企业级应用的数据流和规模都不会非常大。即使有的比较大,使用高配置的服务器或做一个简单几个节点的集群就可以满足了。

大规模的service是指面向公众的向非死book,google,linkedin和taobao这样级别或有可能成长到这个级别的应用。相对企业集成来讲,这些应用的业务流程相对比较稳定。子系统间集成的业务复杂度也相对较低,因为子系统通常也是经过精心选择和设计的并能做一定的调整。所以对MQ系统的可定制性及定制的复杂性要求并不高。但由于数据量会非常巨大,不是几台Server能满足的,可能需要几十甚至几百台,且对性能要求较高以降低成本,所以MQ系统需要有很好的扩展性。

kafka正是一个满足SaaS要求的MQ系统,它通过降低MQ系统的复杂度来提高性能和扩展性。

2. kafka的设计

kafka的设计文档详细说明了它的设计思路。这里简单列举并讨论一下。

基本概念

kafka的工作方式和其他MQ基本相同,只是在一些名词命名上有些不同。为了更好的讨论,这里对这些名词做简单解释。通过这些解释应该可以大致了解kafka MQ的工作方式。

  • Producer (P):就是网kafka发消息的客户端
  • Consumer (C):从kafka取消息的客户端
  • Topic (T):可以理解为一个队列
  • Consumer Group (CG):这是kafka用来实现一个topic消息的广播(发给所有的consumer)和单播(发给任意一个consumer)的手段。一个 topic可以有多个CG。topic的消息会复制(不是真的复制,是概念上的)到所有的CG,但每个CG只会把消息发给该CG中的一个 consumer。如果需要实现广播,只要每个consumer有一个独立的CG就可以了。要实现单播只要所有的consumer在同一个CG。用CG还可以将consumer进行自由的分组而不需要多次发送消息到不同的topic。
  • Broker (B):一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic。
  • Partition(P):为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上。kafka只保证按一个partition中的顺序将消息发给consumer,不保证一个topic的整体(多个partition间)的顺序。

可靠性(一致性)

MQ要实现从producer到consumer之间的可靠的消息传送和分发。传统的MQ系统通常都是通过broker和consumer间的确认(ack)机制实现的,并在broker保存消息分发的状态。即使这样一致性也是很难保证的(参考原文)。kafka的做法是由consumer自己保存状态,也不要任何确认。这样虽然consumer负担更重,但其实更灵活了。因为不管consumer上任何原因导致需要重新处理消息,都可以再次从 broker获得。

kafka的producer有一种异步发送的操作。这是为提高性能提供的。producer先将消息放在内存中,就返回。这样调用者(应用程序)就不需要等网络传输结束就可以继续了。内存中的消息会在后台批量的发送到broker。由于消息会在内存呆一段时间,这段时间是有消息丢失的风险的。所以使用该操作时需要仔细评估这一点。

另外,在最新的版本中,还实现了broker间的消息复制机制,去除了broker的单点故障(SPOF)。

扩展性

kafka使用zookeeper来实现动态的集群扩展,不需要更改客户端(producer和consumer)的配置。broker会在 zookeeper注册并保持相关的元数据(topic,partition信息等)更新。而客户端会在zookeeper上注册相关的watcher。一旦zookeeper发生变化,客户端能及时感知并作出相应调整。这样就保证了添加或去除broker时,各broker间仍能自动实现负载均衡。

负载均衡

负载均衡可以分为两个部分:producer发消息的负载均衡和consumer读消息的负载均衡。

producer有一个到当前所有broker的连接池,当一个消息需要发送时,需要决定发到哪个broker(即partition)。这是由 partitioner实现的,partitioner是由应用程序实现的。应用程序可以实现任意的分区机制。要实现均衡的负载均衡同时考虑到消息顺序的问题(只有一个partition/broker上的消息能保证按顺序投递),partitioner的实现并不容易。个人认为这一点还有待改进。

consumer读取消息时,除了考虑当前的broker情况外,还要考虑其他consumer的情况,才能决定从哪个partition读取消息。具体的机制还不是很清楚,需要做更深入的研究。

性能

性能是kafka设计重点考虑的因素。使用多种方法来保证稳定的O(1)性能。

kafka使用磁盘文件保存收到的消息。它使用一种类似于WAL(write ahead log)的机制来实现对磁盘的顺序读写,然后再定时的将消息批量写入磁盘。消息的读取基本也是顺序的。这正符合MQ的顺序读取和追加写特性。

另外,kafka通过批量消息传输来减少网络传输,并使用java中的sendfile和0拷贝机制减少从读取文件到发送消息间内存数据拷贝和内核用户态切换的次数。

根据kafka的性能测试报告,它的性能基本达到了O(1)的复杂度。

3. 总结

从以上来看,个人觉得kafka比较适合用来做简单的消息传递和分发,能支持大数据量。但如果需要实现复杂的EIP模式,则不像传统MQ那么容易。而且,因为只有partition内的消息才能保证传递顺序,如果消息的顺序很重要,又需要很好的扩展性,使用kafka实现可能会比较困难。所以,kafka应该比较适合处理简单的事件和消息,例如数据(log)收集,大量事实数据的实时分析(kafka可与MapReduce集成)。

但需要注意的是,kafka现在还只是Apache的孵化项目,还不是很成熟,虽然开发活动还是比较活跃的。