RocketMQ -原理简析(适合初级使用者)

hjlsoft 贡献于2014-11-23

作者 zeus  创建于2014-06-20 05:29:00   修改者zeus  修改于2014-07-02 03:48:00字数6553

文档摘要:RocketMQ使用基本概念ProducerGroup通常具有同样属性(处理的消息种类-topic、以及消息处理逻辑流程—分布式多个客户端)的一些producer可以归为同一个group。在事务消息机制中,如果某条发送某条消息的producer-A宕机,使得事务消息一直处于PREPARED状态并超时,则broker会回查同一个group的其他producer,确认这条消息应该commit还是rollback。ConsumerGroup具有同样逻辑消费同样消息的consumer,可以归并为一个group。
关键词:

RocketMQ使用 版本 时间 操作 作者 v0.1 2014-06-30 创建初始版本 何鹏 (hepenglff@163.com) 基本概念 ProducerGroup 通常具有同样属性(处理的消息种类-topic、以及消息处理逻辑流程—分布式多个客户端)的一些producer可以归为同一个group。在事务消息机制中,如果某条发送某条消息的producer-A宕机,使得事务消息一直处于PREPARED状态并超时,则broker会回查 同一个group的其他producer,确认这条消息应该commit还是rollback。 ConsumerGroup 具有同样逻辑消费同样消息的consumer,可以归并为一个group。同一个group内的消费者,可以共同消费(CLUSTERING)对应topic的消息,达到分布式并行处理的功能。 Topoic 消息的逻辑管理单位。 Queue 消息的物理管理单位。一个Topic下可以有多个Queue,Queue的引入使得消息存储可以分布式集群化,具有了水平扩展的能力。 作者 何鹏 关注分布式存储与计算相关框架,包括Hadoop、YARN、HBase、Storm、Spark、MQ等 peng.he.ia@gmail.com 消费进度管理 RocketMQ的broker端,不负责推送消息,无论消费者是否消费消息,都将消息存储起来。谁要消费消息,就向broker发请求获取消息,消费记录由consumer来维护。RocketMQ提供了两种存储方式来保留消费记录:一种是保留在consumer所在的服务器上;另一种是保存在broker服务器上。用户还可以自己实现相应的消费进度存储接口。 默认情况下,采用集群消费(CLUSTERING),会将记录保存在broker端;而采用广播消费(BROADCASTING)则会将消费记录保存在本地。 顺序消息 用户实现MessageQueueSelector为某一批消息(通常是有同样的唯一的标示ID),选择同一个Queue,则这一批消息的消费将是顺序消费(并由同一个consumer完成消费)。 事务消息 这样的消息有多个状态,并且其发送是两阶段的。第一个阶段发送PREPARED状态的消息,此时consumer是看不见这种状态的消息的,发送完毕后回调用户的TransactionExecutor接口,执行相应的事务操作(如数据库),当事务操作成功时,则对此条消息返回commit,让broker对该消息执行commit操作,成为commit状态的消息对consumer是可见的。 基本原理 总览 RocketMQ以Topic来管理不同应用的消息。对于生产者而言,发送消息是,需要指定消息的Topic,对于消费者而言,在启动后,需要订阅相应的Topic,然后可以消费相应的消息。Topic是逻辑上的概念,在物理实现上,一个Topic由多个Queue组成,采用多个Queue的好处是可以将Broker存储分布式化,提高系统性能。 RocketMQ中,producer将消息发送给Broker时,需要制定发送到哪一个队列中,默认情况下,producer会轮询的将消息发送到每个队列中(所有broker下的Queue合并成一个List去轮询)。 对于consumer而言,会为每个consumer分配固定的队列(如果队列总数没有发生变化),consumer从固定的队列中去拉取没有消费的消息进行处理。 作者 何鹏 关注分布式存储与计算相关框架,包括Hadoop、YARN、HBase、Storm、Spark、MQ等 peng.he.ia@gmail.com Queue-01 Queue-02 Queue-03 Queue-01 Queue-02 Queue-03 producer-01 producer-02 producerGroup-A consumer-01 consumer-03 consumerGroup-A consumer-03 broker-01   broker-02 Producer Producer端(属于client)的逻辑概述: MQProducer DefaultMQProducer TransactionMQProducer producer端的逻辑都比较简单,将消息发送到某个Queue中即可,具体发送到那个Queue可以由用户控制(MessageQueueSelector接口),默认情况下,将轮询方式选择Queue。在producer端,会从NameServer将所有Broker的Topic及对应的Queue信息(即:TopicRoute信息)拉取到本地,然后根据组建成一个List。因此在MessageQueueSelector,可以看到所有的Queue信息。 RocketMQ将topic的消息以多个Queue来管理,使得其较为容易的就可以进行水平扩展,提供系统吞吐力。这样分布带来的问题,就是从全局上不能做到顺序性(很多时候也并不需要全局上的顺序性)。 RocketMQ提到支持顺序消息,实际上是指基于Queue级别的顺序。用户将某些需要满足顺序的一批消息(比如电商某个订单号的一系列后续操作、比如数据库的某个主键的insert、delete、update等操作)发送到固定的某个Queue中,则从这个Queue消费消息的consumer,针对这一批消息是顺序消费。 问题1:针对顺序消息的队列,是否可以做到不停服务下的集群动态扩展? 作者 何鹏 关注分布式存储与计算相关框架,包括Hadoop、YARN、HBase、Storm、Spark、MQ等 peng.he.ia@gmail.com Consumer consumer逻辑稍微复杂一点。初步思考,consumer端至少需要处理: 1、 消息的获取 2、 offset(消费进度)的管理与存储 3、 集群消费模式下,Queue的分配问题(rebalance) RocketMQ对外提供了两种不同形式的Consumer:PushConsumer和PullConsumer。顾名思义,对于PullConsumer而言,用户需要主动调用相应的接口去拉取未消费的消息。对于PushConsumer而言,用户提供消息处理的CallBack,有未曾消费的消息时,会主动回调这个CallBack来处理消息。虽从用户角度而言,Consumer存在主动(pull)和被动(push),但RocketMQ本身的broker端仅仅保存所有的消息,并不负责push消息,因此PushConsumer的底层实现也是有一个长连接主动去broker上拉取未消费的消息,然后回调用户的callback逻辑。 PushConsumer端逻辑概述: 自己使用PushConsumer的代码非常简单: 1 2 3 4 5 6 7 8 9 10 11 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(“groupName”); consumer.subscribe(“TopicName”, “*”); // a | b | c consumer.registerMessaeListener( new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { System.out.println(“Consume Message Num: ” + msgs.size()); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); // add shutdown hook to execute consumer.shutdown(); 第2行的subscribe即定于某个topic下,符合某些标签(tag)的消息。这个过滤会在服务端过滤(其实在consumer端也有过滤逻辑)。tag之间用”|”分割开。这些tag会被解析成SubscriptionData来保存信息,主要存储tag的字符串集合,以及这些tag对应的hashcode集合(在broker端的存储和过滤其实都是tag值对应的hashcode,可能是为了加速过滤以及节约存储空间)。 主要的逻辑在第10行调用start函数之后开始。Consumer的主要实现是DefaultMQPushConsumerImpl,其包含的对象关系简单如下图: 作者 何鹏 关注分布式存储与计算相关框架,包括Hadoop、YARN、HBase、Storm、Spark、MQ等 peng.he.ia@gmail.com DefaultMQPushConsumerImpl RebalancePushImpl MQClientFactory PullAPIWrapper OffsetStore ConsumeMessageService PullMessageService RebalanceService MQClientAPIImpl 虚线--为调用关系 实线--为包含关系 DefaultMQPushConsumerImpl中各个对象的主要功能如下: RebalancePushImpl: 主要负责决定,当前的consumer应该从哪些Queue中消费消息; PullAPIWrapper: 长连接,负责从broker处拉取消息,然后利用ConsumeMessageService回调用户的Listener执行消息消费逻辑; ConsumeMessageService: 实现所谓的“Push-被动”消费机制;从Broker拉取的消息后,封装成ConsumeRequest提交给ConsumeMessageSerivce,此service负责回调用户的Listener消费消息; OffsetStore: 维护当前consumer的消费记录(offset);有两种实现,Local和Rmote,Local存储在本地磁盘上,适用于BROADCASTING广播消费模式;而Remote则将消费进度存储在Broker上,适用于CLUSTERING集群消费模式; MQClientFactory: 大杂烩,负责管理client(consumer、producer),并提供多中功能接口供各个Service(Rebalance、PullMessage等)调用;大部分逻辑均在这个类中完成; 使用 Producer返回值 发送消息时,只有抛出异常,才是发送失败,其他情况下,根据如下返回值,应用层做相应取舍处理逻辑: SendStatus 返回值 解释 SEND_OK 发送成功 FLUSH_DISK_TIMEOUT 发送成功,但broker刷盘失败,此时如若服务器宕机,消息会丢失; FLUSH_SLAVE_TIMEOUT 写从失败;如果主宕机,消息丢失; SLAVE_NOT_AVAILABLE 从不可用; 注意:当配置多master无slave的集群时,若master的brokerRole为SYNC_MASTER,则发送消息会一直返回这个值;最新版本(3.1.14以上)事务消息将一直发送失败(事务消息中处理了返回值不为 作者 何鹏 关注分布式存储与计算相关框架,包括Hadoop、YARN、HBase、Storm、Spark、MQ等 peng.he.ia@gmail.com SEND_OK,则直接进行ROLL_BACK); 当应用方明确指出,producer发送成功为SEND_OK状态的消息对consumer才是可见的。可以采用事务消息来完成这个功能,RocketMQ 从3.0.14版本开始,对于事务消息,开始检查SendStatus,如果不为SEND_OK,则直接执行事务消息的回滚。 Consumer返回值 当使用PushConsumer(使用callBack回调执行应用消费逻辑) 非顺序消息(ConsumeConcurrentlyStatus) 返回值 解释 CONSUME_SUCCESS 消费成功 RECONSUME_LATER 消费失败,稍后重新消费这一批消息 RECONSUMER_LATER的解释: 这一批消息均会sendBack到broker上,稍后会重新消费这一批消息。可以通过设置参数,使得批量消费的“批量”为一条,这样可以一定程度避免重复消费。但这样设置后,可能效率较低。另外一种方法是在用户指定的CallBack(MessageListenerConcurrently)中,通过对应的ConsumeConcurrentlyContext参数来控制本批消息从哪一条之后重复消费。 具体方法是控制context的ackIndex变量。这个变量的意思是对于这一批消息(List),[0, ackIndex]内的消息是成功消费的,而(ackIndex, Lst.size)内的消息是消费失败的,如果返回值为RECONSUME_LATER,则对于失败范围的消息将调用sendBack回发到broker上(从代码看来,这个功能只对CLUSTERING消费模型的consumer生效,BROADCASTING的直接丢弃)。这里还有个小的tips,在调用SendBack失败后,会在consumer本地去尝试重复消费这些回发失败的消息(构造相应的ConsumeRequest)。这个处理模式(先消费,消费失败的消息尝试回发给broker,回发给broker失败的消息尝试在consumer端重新消费)一直尝试,直到消费成功或者回发到broker成功。 顺序消息(ConsumerOrderlyStatus) 返回值 解释 SUCCESS 消息处理成功 ROLLBACK 回滚消息—似乎用在了事务消息中 COMMIT 提交消息—似乎用在了事务消息中 SUSPEND_CURRENT_QUEUE_A_MOMENT 当前队列挂起一段时间 问题:消费端的ROLLBACK、COMMIT如何理解??? 普通消息 作者 何鹏 关注分布式存储与计算相关框架,包括Hadoop、YARN、HBase、Storm、Spark、MQ等 peng.he.ia@gmail.com 使用TIPS 集群搭建 1. 基本配置 使用./bin/mqbroker –p >conf/broker.conf,查看所有参数的默认取值;根据自己集群的需要修改对应的配置; 2. 集群选择 RocketMQ集群支持如下一些模式的配置: 集群模式 特点 适用场景 单Master 一个Broker实例; 或者多个Broker实例,但Topic只在某一个Broker上配置了; 测试 多Master 无Slave 多个Broker实例构成集群,且brokerID均为0(即角色都为Master) master挂掉后,这个master上未被消费的消息,暂时不能被消费; 可以容忍消息丢失(未被consumer消费)的应用场景;如日志收集 多Master多Slave 每个master有一个backup的slave HA高可用;当master-slave采用同步双写时(采用异步时,任可能存在部分未写入slave的消息丢失),master挂掉,也可以从slave处消费消息;但当master挂掉后,目前不支持自动的Failover(因此不支持producer的写); 疑问: 手动如何切换?是否需要修改Slave的配置为Master,然后重启broker实例?代价有点高,支持发送命令切换否? 当使用多master无slave的集群搭建方式时,master的brokerRole配置必须为ASYNC_MASTER。如果配置为SYNC_MASTER,则producer发送消息时,返回值的SendStatus会一直是SLAVE_NOT_AVAILABLE。 3. 系统参数优化 参考bin/os.sh中的参数,重点注意物理内存预留参数(vm.min_free_kbytes)。 作者 何鹏 关注分布式存储与计算相关框架,包括Hadoop、YARN、HBase、Storm、Spark、MQ等 peng.he.ia@gmail.com 4. broker的启动 采用集群模式时,启动broker时,需要制定nameserver 地址的list,这里一定要注意,要将所有的nameserver 地址都包含进去;因为rocketmq的nameserver之间并不会同步,均需要broker主动汇报;如果有3个nameserver: A B C,启动时只制定了A,忘记制定了B C,那么客户端如果刚好链接了B或C去获取broker的信息,则会获取失败。 消息使用 问题整理 问题: 1、服务器磁盘配置大概是怎样(重点想了解磁盘配置、磁盘总容量,例如: N * 4T SATA 7200 )? 2、服务器的磁盘做raid了吗?做的哪一种raid?刷磁盘模式是SYNC还是ASYNC? 3、rocketmq采用的哪种方式搭建(多master无slave、多master多slave)?如果是master-slave方式,主从同步是SYNC还是ASYNC方式? 解答: 1、一般是3T磁盘,实际是12个600,12* 600G的sas 15000转的磁盘做了raid10 2、刷盘模式通常是异步方式 3、大部分集群是不开启slave的, 有少部分集群会开启 sync方式的slave 作者 何鹏 关注分布式存储与计算相关框架,包括Hadoop、YARN、HBase、Storm、Spark、MQ等 peng.he.ia@gmail.com

下载文档到电脑,查找使用更方便

文档的实际排版效果,会与网站的显示效果略有不同!!

需要 8 金币 [ 分享文档获得金币 ] 3 人已下载

下载文档