技术部\基础架构部 吴光英 2016-06-02 消息队列探究 目录 1,消息队列简介 2,消息队列使用场景及示例 3,常用消息队列软件比较 4,消息队列Kafka 5,消息队列RabbitMQ 6,总结 7,参考资料 1,消息队列简介 MQ(Message Queue),即消息队列是在消息的传输过程 中保存消息的容器。 通俗的说,就是一个容器,你把消息丢进去,不需要立 即处理。然后有个程序去从你的容器里面把消息一条条 读出来处理。 一般用于应用系统解耦、消息异步分发,能够提高系统 吞吐量。 MQ的产品有很多,有开源的,也有闭源,比如ZeroMQ、 RabbitMQ、 ActiveMQ、Kafka/Jafka、Kestrel、Beanstalkd、 HornetQ、Apache Qpid、Sparrow、Starling、Amazon SQS、 MSMQ等 2,消息队列使用场景 2. 1示例--异步处理 用户注册后,需要发注册邮件和注册短信。传统的做法有两种1.串行的方 式;2.并行方式。 引入消息队列,将不是必须的业务逻辑,异步处理。改造后的架 构如下 2. 2示例--应用解耦 2. 3示例--流量削锋 2. 4示例--日志处理 2. 5示例--消息通讯 2.6消息队列在架构的应用 分布式事务支持(补偿机制)、异步处理,应用解耦和消息通讯 (1)Kafka:接收用户日志的消息队列。 (2)Logstash:做日志解析,统一成JSON输出给Elasticsearch。 (3)Elasticsearch:实时日志分析服务,兼具强大的搜索和统计功能。 (4)Kibana:基于Elasticsearch的数据可视化组件,超强的数据可视化能力是 众多公司选择ELK stack的重要原因。 3,常用消息队列软件比较 ActiveMQ. Java世界的中坚力量。它有很长的历史,而且被广泛的使 用。它还是跨平台的,给那些非微软平台的产品提供了一 个天然的集成接入点。然而,它只有跑过了MSMQ才有可 能被考虑。 RabbitMQ. 用Erlang写成的消息中间件的优秀的特性。支持开放的高 级消息队列协议 (AMQP,Advanced Message Queuing Protocol), 从根本上避免了生产厂商的封闭,使用任何语言的各种客 户都可以从中受益。这种协议提供了相当复杂的消息传输 模式,它还具有“企业级”的适应性和稳定性。 ZeroMQ. 开发这个产品的公司是AMQP集团的一部分,还有 一个叫做OpenAMQ的产品。ZeroMQ具有一个独特 的非中间件的模式,也就是说,跟其它几个接受测 试的产品不同,你不需要安装和运行一个消息服务 器,或 中间件。你只需要简单的引用ZeroMQ程序 库,可以使用NuGet安装,然后你就可以愉快的在 应用程序之间发送消息了。非常有趣的是,他们也 同样使用这 方式在任何利用ZeroMQ进行强大的进 程内通信的语言里创建Erlang风格的这种执行角色。 MSMQ. 这是微软的产品里唯一被认为有价值的东西。对我 的客户来说,如果MSMQ能证明可以应对这种任务, 他们将选择使用它。关键是这个东西并不复杂,除 了接收和 发送,没有别的;它有一些硬性限制, 比如最大消息体积是4MB。然而,通过和一些像 MassTransit 或 NServiceBus这样的软件的连接,它完 全可以解决这些问题。 Kafka. Apache Kafka是分布式发布-订阅消息系统。它最初 由LinkedIn公司开发,之后成为Apache项目的一部分。 Kafka是一种快速、可扩展的、设计内在就是分布式 的,分区的和可复制的提交日志服务。 3.1常用mq测试比较 发送和接受的每秒钟的消息数。整个过程共产生1百万 条1K的消息。测试的执行是在Windows 上进行的。 4,消息队列Kafka 简介:Apache Kafka是分布式发布-订阅消息系统。 它最初由LinkedIn公司开发,之后成为Apache项目的 一部分。Kafka是一种快速、可扩展的、设计内在就 是分布式的,分区的和可复制的提交日志服务。 与传统消息系统相比,有以下不同: 它被设计为一个分布式系统,易于向外扩展; 它同时为发布和订阅提供高吞吐量; 它支持多订阅者,当失败时能自动平衡消费者; 它将消息持久化到磁盘,因此可用于批量消费, 例如ETL,以及实时应用程序。 4.1架构 话题(Topic)是特定类型的消息流。消息是字节的有效 负载(Payload),话题是消息的分类名或种子(Feed) 名。 生产者(Producer)是能够发布消息到话题的任何对象。 已发布的消息保存在一组服务器中,它们被称为代理 (Broker)或Kafka集群。 消费者可以订阅一个或多个话题,并从Broker拉数据, 从而消费这些已发布的消息。 4.2 Topics/logs 一个Topic可以认为是一类消息,每个topic将被分成多个partition(区), 每个partition在存储层面是append log文件。任何发布到此partition的 消息都会被直接追加到log文件的尾部,每条消息在文件中的位置 称为offset(偏移量),offset为一个long型数字,它是唯一标记一条 消息。它唯一的标记一条消息。 没有提供其他额外的索引机制来存储offset,因为在kafka中几乎不 允许对消息进行“随机读写”。 4.3适用场景 1、Messaging:对于一些常规的消息系统,kafka是个不错的选 择;partitons/replication和容错,可以使kafka具有良好的扩展性和性能 优势;但kafka并没有提供JMS中的"事务性""消息传输担保(消息确 认机制)""消息分组"等企业级特性;在一定程度上,尚未确保消息 的发送与接收绝对可靠(比如,消息重发,消息发送丢失等) 2、Websit activity tracking:kafka可以作为"网站活性跟踪"的最佳工具; 可以将网页/用户操作等信息发送到kafka中。并实时监控,或者离线 统计分析等 3、Log Aggregation:kafka的特性决定它非常适合作为"日志收集中心 ";application可以将操作日志"批量""异步"的发送到kafka集群中,而不 是保存在本地或者DB中;kafka可以批量提交消息/压缩消息等,这对 producer端而言,几乎感觉不到性能的开支. 4.4,设计原理 持久性:kafka使用文件存储消息,这就直接决定kafka 在性能上严重依赖文件系统的本身特性. 性能:要考虑的影响性能点很多,除磁盘IO之外,我 们还需要考虑网络IO,这直接关系到kafka的吞吐量问 题. 生产者:负载均衡--producer将会和Topic下所有 partition leader保持socket连接;消息由producer直接通 过socket发送到broker,中间不会经过任何"路由层" 消费者:consumer端向broker发送"fetch"请求,并告知 其获取消息的offset;此后consumer将会获得一定条数 的消息;consumer端也可以重置offset来重新消费消息. 在JMS实现中,Topic模型基于push方式,即broker将消息 推送给consumer端 在kafka中,采用了pull方式,即consumer在和broker建立 连接之后,主动去pull(或者说fetch)消息 4.5消息传送机制 JMS实现,消息传输担保非常直接:有且只有一次(exactly once) 在kafka中稍有不同: 1) at most once: 最多一次,这个和JMS中"非持久化"消息类似.发送 一次,无论成败,将不会重发. 2) at least once: 消息至少发送一次,如果消息未能接受成功,可能会 重发,直到接收成功. 3) exactly once: 消息只会发送一次. 通常情况下"at-least-once"是我们首选.(相比at most once而言,重复接 收数据总比丢失数据要好) 5,消息队列Rabbitmq RabbitMQ简介 RabbitMQ是实现AMQP(高级消息队列协议)的消 息中间件的一种,最初起源于金融系统,用于在分 布式系统中存储转发消息,在易用性、扩展性、高 可用性等方面表现不俗。 AMQP,即Advanced Message Queuing Protocol,高级消 息队列协议,是应用层协议的一个开放标准,为面 向消息的中间件设计 服务器端用Erlang语言编写,支持多种客户端,如: Python、Ruby、.NET、Java、JMS、C、PHP、 ActionScript、XMPP、STOMP等 5.1RabbitMQ基本的对象 Connection是RabbitMQ的socket链接,它封装了socket 协议相关部分逻辑。 ConnectionFactory为Connection的制造工厂。 Channel是我们与RabbitMQ打交道的最重要的一个接 口,我们大部分的业务操作是在Channel这个接口中 完成的,包括定义Queue、定义Exchange、绑定 Queue与Exchange、发布消息等。 5.2 概念和特性--交换机(exchange) 1. 接收消息,转发消息到绑定的队列。四种类型: direct, topic,fanout, headers 2. 如果没有队列绑定在交换机上,则发送到该交换 机上的消息会丢失。 3. 一个交换机可以绑定多个队列,一个队列可以被 多个交换机绑定。 4. topic类型交换器通过模式匹配分析消息的routing- key属性。它将routing-key和binding-key的字符串切分 成单词。这些单词之间用点隔开。它同样也会识别 两个通配符:#匹配0个或者多个单词,*匹配一个 单词。 5. 因为交换器是命名实体,声明一个已经存在的交换器,但是试 图赋予不同类型是会导致错误。客户端需要删除这个已经存在的 交换器,然后重新声明并且赋予新的类型。 6. 交换器的属性: - 持久性:如果启用,交换器将会在server重启前都有效。 - 自动删除:如果启用,那么交换器将会在其绑定的队列都被删除 掉之后自动删除掉自身。 - 惰性:如果没有声明交换器,那么在执行到使用的时候会导致异 常,并不会主动声明。 7.生产者将消息发送到Exchange(交换器,下图中的X),由 Exchange将消息路由到一个或多个Queue中(或者丢弃)。 Routing key:生产者在将消息发送给Exchange的时候,一般会指定 一个routing key,来指定这个消息的路由规则,而这个routing key需 要与Exchange Type及binding key联合使用才能最终生效。 Binding key:在绑定(Binding)Exchange与Queue的同时,一般会指 定一个binding key;消费者将消息发送给Exchange时,一般会指定一 个routing key;当binding key与routing key相匹配时,消息将会被路由 到对应的Queue中。 Exchange Type:fanout 把所有发送到该Exchange的消息路由到所有与它绑 定的Queue中。 生产者(P)发送到Exchange(X)的所有消息都会 路由到图中的两个Queue,并最终被两个消费者 (C1与C2)消费。 Exchange Type:direct Exchange Type:topic topic类型的Exchange在匹配规则上进行了扩展 routing key为一个句点号“. ”分隔的字符串(我们将被句点号“. ”分隔 开的每一段独立的字符串称为一个单词),如“stock.usd.nyse”、 “nyse.vmw”、“quick.orange.rabbit” binding key与routing key一样也是句点号“. ”分隔的字符串 binding key中可以存在两种特殊字符“*”与“#”,用于做模糊匹配,其 中“*”用于匹配一个单词,“#”用于匹配多个单词(可以是零个) 5.3队列(queue) 1. 队列是RabbitMQ内部对象,存储消息。相同属性的queue 可以重复定义。 2. 临时队列。channel.queueDeclare(),有时不需要指定队列 的名字,并希望断开连接时删除队列。 3. 队列的属性: - 持久性:如果启用,队列将会在server重启前都有效。 - 自动删除:如果启用,那么队列将会在所有的消费者停 止使用之后自动删除掉自身。 - 惰性:如果没有声明队列,那么在执行到使用的时候会 导致异常,并不会主动声明。 - 排他性:如果启用,队列只能被声明它的消费者使用。 5.4消息传递 1. 消息在队列中保存,以轮询的方式将消息发送给监听消息队 列的消费者,可以动态的增加消费者以提高消息的处理能力。 2. 为了实现负载均衡,可以在消费者端通知RabbitMQ,一个消 息处理完之后才会接受下一个消息。 channel.basic_qos(prefetch_count=1) 注意:要防止如果所有的消费者都在处理中,则队列中的消息 会累积的情况。 3. 消息有14个属性,最常用的几种:deliveryMode:持久化属 性;contentType:编码;replyTo:指定一个回调队列;correlationId: 消息id 5.5高可用性(HA) 1. 消息ACK,通知RabbitMQ消息已被处理,可以从内存删除。如果 消费者因宕机或链接失败等原因没有发送ACK,则RabbitMQ会将消 息重新发送给其他监听在队列的下一个消费者。 2. 消息和队列的持久化。定义队列时可以指定队列的持久化属性 channel.queueDeclare(queuename, durable=true, false, false, null); 3.发送消息时可以指定消息持久化属性,即使RabbitMQ服务器重启, 也不会丢失队列和消息。 5.6 集群(cluster) 1. 不支持跨网段(如需支持,需要shovel或federation插 件) 2. 可以随意的动态增加或减少、启动或停止节点,允许 节点故障 3. 集群分为RAM节点和DISK节点,一个集群最好至少有 一个DISK节点保存集群的状态。 4. 集群的配置可以通过命令行,也可以通过配置文件, 命令行优先。 5.7使用简介 简易使用流程 5.8Demo及client简介 Demo:0至4演示 rabbitmq_client演示 《Rabbitmq实例简介》 6,总结 RabbitMQ,遵循AMQP协议,由内在高并发的erlanng语言开发,用在 实时的对可靠性要求比较高的消息传递上。 kafka主要用于处理活跃的流式数据,大数据量的数据处理上。 rabbitMQ以broker为中心;有消息的确认机制。 kafka遵从一般的MQ结构以consumer为中心,根据消费的点,从 broker上批量pull数据;无消息确认机制。 kafka具有高的吞吐量,内部采用消息的批量处理,zero-copy机制, 消息处理的效率很高。 rabbitMQ在吞吐量方面稍逊于kafka,rabbitMQ支持对消息的可靠的 传递,支持事务,不支持批量的操作;基于存储的可靠性的要求 存储可以采用内存或者硬盘。 7,参考资料 http://www.cnblogs.com/likehua/p/3999538.html http://www.infoq.com/cn/articles/apache-kafka/ http://blog.csdn.net/whycold/article/details/41119807 http://lynnkong.iteye.com/blog/1699684 http://www.cnblogs.com/davidwang456/p/4076097.html 扩展话题 汽车狂人--从电池研发到汽车制造,王传福:“一 种新产品的开发,实际上60%来自公开文献,30% 来自现成样品,另外5%来自原材料等因素,自身 的研发实际上只有5%左右。” 美国-埃隆.马斯克,已使用PayPal、Tesla及SpaceX三 家公司分别创造、颠覆了三个行业--支付、汽车及 航空。导航产品我国已有10年历史,但在汽车上, 特斯拉是第一家将17寸大屏幕放到汽车的。比起马 斯克,我们干的好像别人都能干,但是他干的别人 都想不到。 消息队列mq,除了基本的使用场景,大家大可研 究、发挥想象力,使用它做出的别人都想不到高性 能应用。 谢谢!