RabbitMQ架构

RabbitMQ   2018-02-09 21:51:28 发布
您的评价:
     
0.0
收藏     0收藏
文件夹
标签
(多个标签用逗号分隔)

RabbitMQ是一个高可用的消息中间件,支持多种协议和集群扩展。并且支持消息持久化和镜像队列,适用于对消息可靠性较高的场合,基本模型如下。

其客户端使用方式

from kombu import Connection, Exchange, Queue

media_exchange = Exchange('media', 'direct', durable=True)
video_queue = Queue('video', exchange=media_exchange, routing_key='video')

def process_media(body, message):
    print body
    message.ack()

# connections
with Connection('amqp://guest:guest@localhost//') as conn:

    # produce
    producer = conn.Producer(serializer='json')
    producer.publish({'name': '/tmp/lolcat1.avi', 'size': 1301013},
                      exchange=media_exchange, routing_key='video',
                      declare=[video_queue])

    # the declare above, makes sure the video queue is declared
    # so that the messages can be delivered.
    # It's a best practice in Kombu to have both publishers and
    # consumers declare the queue. You can also declare the
    # queue manually using:
    # video_queue(conn).declare()

    # consume
    with conn.Consumer(video_queue, callbacks=[process_media]) as consumer:
        # Process messages and handle events on all channels
        while True:
            conn.drain_events()

示例中的发布端和消费端是同一方,而实际中的使用方式一般有多种场景,topic模式、fanout模式、direct模式和RPC模式。

  1. topic模式,按照设置的路由信息(routing key)将消息路由到一个或者多个消费端,而消息只能由一个消费者消费一次。一个消费者可以设置多个路由信息,可以同时获取多个消费者发送的消息;
  2. fanout模式,与topic模式唯一的区别是同一消息会发送到订阅(binding)的多个消费者;
  3. direct模式,一对一模式,实际中比较少用;
  4. RPC模式,结合topic和direct模式,发送消息的同时指定要接受的消息。

RabbitMQ监控树

为了高可靠,Erlang中实际的工作进程(Erlang进程,并不是系统进程)都有一个监控进程,监控进程负责(一个或多个)工作进程的创建、销毁和重启。监控进程和工作进程的关系如图。

  1. 方块图是监控进程;
  2. 圆圈是工作进程;
  3. 方块中的”1“(one_for_one)和”a“(one_for_all)代表不同的监控策略

one_for_one 监控策略,一个工作进程崩溃,则只重启崩溃的工作进程。

one_for_all监控策略,一个工作进程崩溃,则销毁并重启所有工作进程

在RabbitMQ中还有一种 simple_one_for_one监控策略 ,与 one_for_one监控策略 相同,只不过重启工作进程时的启动参数是固定的。RabbitMQ网络框架也遵循该原则。

RabbitMQ消息架构

当client端链接服务器时,RabbitMQ会启动一系列监控和工作进程来处理网络连接。

为了降低TCP链接数量,多个消费者共享同一个链接Connection,但是每个消费者独享一个管道channel,用consumer_tag标识。consumer_tag在Connection唯一,从1开始累加,当重连接时需要匹配该tag。每个消费者对应独立的一套rabbit_channel_sup_sup->rabbit_channel_sup->rabbit_channel|rabbit_writer|rabbit_limiter系列进程。

RabbitMQ网络框架时序图

client建立链接后,RabbitMQ通过tcp_acceptor进程处理accept成功后返回的clientfd。

rabbit_reader从TCP链接中读取数据,然后根据协议回调函数处理客户端的各种请求。

RabbitMQ消息处理流程

RabbitMQ先验证权限;然后检查Exchange是否存在,不存在则创建;检查消息是否合法以及是否需要confirm等;根据路由信息选择消费队列;检查消费队列是否存在,有则将消息发送给消息队列;检查消费者是否存在,存在则将消息发送给消费者client端。

RabbitMQ会根据不同的消息的不同类型做不同的处理:

  1. 不持久化消息,如果没有消费者则直接丢掉,不会入消费队列;如果有,则先入消息队列,按照入队顺序依次发送给消费者。
  2. 持久化消息,将消息持久化成功后才给发送端发ack,然后再发送给消费者。

(完)

 

来自:https://fanchao01.github.io/blog/2018/02/09/rabbitmq-arch/

 

扩展阅读

Rabbitmq集群高可用测试
分布式消息系统尝试(rabbitmq, celery, redis)
如何优雅的使用RabbitMQ
[RabbitMQ]队列
消息中间件的技术选型心得-RabbitMQ、ActiveMQ和ZeroMQ

为您推荐

iOS 第三方开源库-----AFNetworking
Python 并行任务技巧
Struts2 s:doubleselect级联下拉框 详解析
淘宝架构一
贝叶斯分类

更多

RabbitMQ
相关文档  — 更多
相关经验  — 更多
相关讨论  — 更多