RabbitMQ 学习

123bt 9年前

一、概要

    官网: http://www.rabbitmq.com/ 

    一个不错的入门教程: http://blog.csdn.net/linvo/article/details/5750987  写的挺好的,只是刚开始看可能不太懂,模模糊糊,多看几遍,试着写点代码之后,再看。就比较清晰了。

    官方文档使用了 using the pika 0.9.8 Python client 。本文使用      http://github.com/celery/py-amqp   amqp 1.4.6

    至于安装,自己找下教程吧。不难,先安装 Erlang,再安装RabbitMQ。然后配置一下,有个web控制台。之后就是python编程使用了。

    再加一个不错的中文资料: http://blog.chinaunix.net/topic/surpershi/ 


    MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。RabbitMQ是流行的开源消息队列系统,用erlang语言开发。RabbitMQ是AMQP(高级消息队列协议)的标准实现。

       Broker:简单来说就是消息队列服务器实体。
Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
Queue:消息队列载体,每个消息都会被投入到一个或多个队列。
Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。
Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。
producer:消息生产者,就是投递消息的程序。
consumer:消息消费者,就是接受消息的程序。
channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。

    消息队列的使用过程大概如下:

       (1)客户端连接到消息队列服务器,打开一个channel。
(2)客户端声明一个exchange,并设置相关属性。
(3)客户端声明一个queue,并设置相关属性。
(4)客户端使用routing key,在exchange和queue之间建立好绑定关系。
(5)客户端投递消息到exchange。

    exchange接收到消息后,就根据消息的key和已经设置的binding,进行消息路由,将消息投递到一个或多个队列里。

    exchange也有几个类型,完全根据key进行投递的叫做Direct交换机,例如,绑定时设置了 routing key为”abc”,那么客户端提交的消息,只有设置了key为”abc”的才会投递到队列。对key进行模式匹配后进行投递的叫做Topic交换机,符 号”#”匹配一个或多个词,符号”*”匹配正好一个词。例如”abc.#”匹配”abc.def.ghi”,”abc.*”只匹配”abc.def”。还 有一种不需要key的,叫做Fanout交换机,它采取广播模式,一个消息进来时,投递到与该交换机绑定的所有队列。

    RabbitMQ支持消息的持久化,也就是数据写在磁盘上,为了数据安全考虑,我想大多数用户都会选择持久化。消息队列持久化包括3个部分:
(1)exchange持久化,在声明时指定durable => 1
(2)queue持久化,在声明时指定durable => 1
(3)消息持久化,在投递时指定delivery_mode => 2(1是非持久化)

    如果exchange和queue都是持久化的,那么它们之间的binding也是持久化的。如果exchange和queue两者之间有一个持久化,一个非持久化,就不允许建立绑定。


二、基本使用

    入门教程看会之后,就差不多了。

下面示例代码:

    consumer 消费者 

    # amqp_consumer.py

# -*- coding: utf-8 -*-    __author__ = 'lpe234'  __date__ = '2014-12-15'    import amqp    conn = amqp.Connection(host="localhost:5672", userid="guest", password="guest", virtual_host="/", insist=False)  chan = conn.channel()    chan.queue_declare(queue="po_box", durable=True, exclusive=False, auto_delete=False)  chan.exchange_declare(exchange="sorting_room", type="direct", durable=True, auto_delete=False, )    chan.queue_bind(queue="po_box", exchange="sorting_room", routing_key="1111")      def receive_callback(msg):      print 'Received: ' + msg.body + ' from channel #' + str(msg.channel.channel_id)    chan.basic_consume(queue='po_box', no_ack=True, callback=receive_callback, consumer_tag="consumer")  while True:      chan.wait()  chan.basic_cancel("consumer")    chan.close()  conn.close()


    producer 生产者

    # amqp_publisher.py

# -*- coding: utf-8 -*-    __author__ = 'lpe234'  __date__ = '2014-12-15'    import amqp  import json    conn = amqp.Connection(host="localhost:5672", userid="guest", password="guest", virtual_host="/", insist=False)  chan = conn.channel()    for x in xrange(10):      msg = json.dumps({'id': str(x)+'111', 'lists': [{'id': 12345}, {'id': 12345}, {'id': 15656}, {'id': '4545'}, ]})      print msg      msg = amqp.Message(msg)      msg.properties["delivery_mode"] = 2      chan.basic_publish(msg, exchange="sorting_room", routing_key="1111")      chan.close()  conn.close()



代码基本都是在 csdn 那个博客里面弄下来的。稍微的修改了以下。

启动时,先运行 consumer 消费者进程,它会先连接, 并创建 Queue和 Exchange ,然后一直等待队列中的消息。

然后,启动 publisher ,它会先连接,然后向指定 Exchange 交换机推送带有特定 routing_key 路由键的消息。

如果消费者对应的 Queue 队列与 Exchange 交换机 的 routing_key 路由键 相对应的话。那么消费者就会接收到相应消息。至此,整个传递过程结束。


三、补充

    注释代码

# -*- coding: utf-8 -*-    __author__ = 'lpe234'  __date__ = '2014-12-15'    import amqp    """  amqp rabbitmq DEMO测试  先启动 amqp_consumer.py 消费者,创建  """      conn = amqp.Connection(host='localhost:5672', userid='guest', password='guest', virtual_host='/', insist=False)  # 每个channel都被分配了一个整数标识,自动由Connection()类的.channel()方法维护。可以使用.channel(x)来指定channel标识。  chan = conn.channel(channel_id=1)  # 当多个 channel_id 相同时,实际为同一 channel  # 现在已经有了一个可用的连接和channel。  # 现在将代码分为两类,生产者(producer)和消费者(consumer)。    # 创建一个消费者程序,会创建一个"po_box"的队列和一个叫"sorting_room"的交换机。  chan.queue_declare(queue='po_box', durable=True, exclusive=False, auto_delete=False)  chan.exchange_declare(exchange='sorting_room', type='direct', durable=True, auto_delete=False)  # 创建了"po_box" 的队列,durable重启之后会重新建立,auto_delete=False最后一个消费者断开之后不会自动删除,exclusive私有队列  # 创建了"sorting_room"的交换机,type指定交换机类型,  # 现在已经有了一个可以接收消息的队列和一个可以发送消息的交换机。不过还需要创建一个绑定    chan.queue_bind(queue='po_box', exchange='sorting_room', routing_key='jason')  # 这个绑定非常直接,任何送到交换机"sorting_room"的具有路由键"jason"的消息都被路由到"po_box" 队列    # 现在有两个方法,从队列中取出消息。  # 第一个是调用 chan.basic_get(), 主动从队列中拉出下一条消息(若没有则返回 None)  # msg = chan.basic_get(queue='po_box')  # if msg:  #     print msg.body  #     chan.basic_ack(msg.delivery_tag)      # 第二种  def receive_callback(msg):      print msg.body    chan.basic_consume(queue='po_box', no_ack=True, callback=receive_callback, consumer_tag='testtag')  while True:      chan.wait()  chan.basic_cancel('testtag')  # chan.wait() 放在无限循环里面,这个函数会等待在队列上,知道下一个消息到达队列。  # chan.basic_cancel() 用来注销该回调函数  # no_ack 这个参数,可以传给 chan.basic_get(), chan.basic_consume。是否等待回馈,