消息中间件解决方案

sunshine42 贡献于2016-09-16

作者 dadi  创建于2014-07-22 13:22:00   修改者lin  修改于2014-08-22 03:44:00字数17107

文档摘要:目前在房仓云平台中存在多个业务场景需要使用消息中间的技术。如注册场景、数据同步场景、订单跟进场景、以及未来需要实现的用户行为和网站运营统计、日志采集等。现在云平台中消息的生产者发送消息到消息的消费者是通过数据库方式实现,即消息的生产者把消息通过API保存进数据库,消息的消费者通过定时轮询数据库获取消息;基于定时轮询方式实现的消息机制存在消息不及时、对数据库产生压力等问题
关键词:

 消息中间件解决方案 修订记录: 版本号 编写日期 修改内容 作者 审核人 V1.0 2014.07.28 初稿 王军华 V2.0 2014.08.8 添加名词、接口设计 王军华 1.概述 3 / 26 2.设计原则 4 3.专业术语 4 4.技术选型 5 4.1 ActiveMQ 5 4.1.1介绍 5 4.2 Kafka消息系统 6 4.2.1介绍 6 4.3 RocketMQ消息中间件 6 4.3.1介绍 6 4.建设方案 8 4.1.部署结构 8 4.2硬件要求 9 4.3软件要求 9 4.4 OS调优 9 5.安装部署 10 5.1安装流程 10 5.2操作系统优化 10 5.3软件安装 11 5.3.1 Linux环境 11 5.4 寻址原理 11 5.5 常用命令 12 5.6 Broker 集群搭建 13 5.6.1单个Master 13 5.6.2 多 Master 模式 14 5.6.3 多Master多Slave模式,异步复制 14 / 26 5.6.4 多Master多Slave模式,同步双写 15 5.7 Broker重启方式 16 6.业务及系统集成 16 6.1 业务对接消息处理流程 16 6.1.1 发送普通消息 17 6.1.2 发送顺序消息 17 6.1.3 订阅普通消息 18 6.1.4 订阅顺序消息 20 6.1.5 订阅广播消息 22 6.2 公共接口 23 6.2.1 统计数据接口 23 6.2.2 失败消息存储接口 23 6.2.3 失败消息重发定时任务 23 7.风险评估 23 7.1消息完整性 23 7.2消息统计功能正确性 24 1.概述 目前在房仓云平台中存在多个业务场景需要使用消息中间的技术。如注册场景、数据同步场景、订单跟进场景、以及未来需要实现的用户行为和网站运营统计、日志采集等。现在云平台中消息的生产者发送消息到消息的消费者是通过数据库方式实现,即消息的生产者把消息通过API保存进数据库,消息的消费者通过定时轮询数据库获取消息;基于定时轮询方式实现的消息机制存在消息不及时、对数据库产生压力等问题, 具体限制如下: / 26 1、定时轮询数据不能有太高的频率,影响消息处理的及时性; 2、云平台每分钟会产生几万、几十万消息需要处理,当有大量消息需要处理时对数据库构成压力,使影响近一步扩散甚至影响整个云平台的性能。 云平台在快速的发展过程中迫切需要消息中间件的投入使用,提高消息的及时性,降低对平台的影响,实现各个业务系统之间解耦。 2.设计原则 根据平台消息处理需求,在消息中间件设计时必须考虑以下原则: 1、 高性能:满足云平台未来用户达数万家,每秒处理数十万条消息。 2、 高可用:具备负载均衡功能,支持横向或纵向扩展,具备消息持久化、消息容错、消息失败恢复机制,有成熟案例。 3、 易于性和维护复杂度:部署方便,文档丰富,团队成员熟悉的语言实现,方便团队自定义功能。 4、 消息的可靠性:对消息的传递过程中有完善的容错机制和恢复机制。 3.专业术语 ▶Producer 消息生产者,负责产生消息,一般由业务系统负责产生消息。 ▶Consumer 消息消费者,负责消费消息,一般是后台系统负责异步消费。 ▶Push Consumer Consumer 的一种,应用通常向 Consumer 对象注册一个 Listener 接口,一旦收到消息,Consumer 对象立刻回调 Listener 接口方法。 ▶Pull Consumer Consumer 的一种,应用通常主动调用 Consumer 的拉消息方法从 Broker 拉消息,主动权由应用控制。 ▶Producer Group 一类 Producer 的集合名称,这类 Producer 通常发送一类消息,且发送逻辑一致。 ▶Consumer Group 一类 Consumer 的集合名称,这类 Consumer 通常消费一类消息,且消费逻辑一致。 / 26 ▶Broker 消息中转角色,负责存储消息,转发消息,一般也称为 Server。在 JMS 规范中称为 Provider。 ▶广播消费 一条消息被多个 Consumer 消费, 即使这些 Consumer 属于同一个 Consumer Group,消息也会被 Consumer Group 中的每个 Consumer 都消费一次,广播消费中的 Consumer Group 概念可以认为在消息划分方面无意义。 ▶集群消费 一个 Consumer Group 中的 Consumer 实例平均分摊消费消息。例如某个 Topic 有 9 条消息,其中一个 Consumer Group 有 3 个实例(可能是 3 个进程,或者 3 台机器),那么每个实例只消费其中的 3 条消息。 ▶顺序消息 消费消息的顺序要同发送消息的顺序一致,在 RocketMQ 中,主要指的是局部顺序,即一类消息为满足顺序性,必须 Producer 单线程顺序发送,且发送到同一个队列,这样 Consumer 就可以按照 Producer 发送的顺序去消费消息。 ▶普通顺序消息 顺序消息的一种,正常情况下可以保证完全的顺序消息,但是一旦发生通信异常,Broker 重启,由于队列总数发生变化,哈希取模后定位的队列会变化,产生短暂的消息顺序不一致。 如果业务能容忍在集群异常情况(如某个 Broker 宕机或者重启)下,消息短暂的乱序,使用普通顺序方式比较合适。 ▶严格顺序消息 顺序消息的一种,无论正常异常情况都能保证顺序,但是牺牲了分布式 Failover 特性,即 Broker 集群中只要有一台机器不可用,则整个集群都不可用,服务可用性大大降低。 如果服务器部署为同步双写模式,此缺陷可通过备机自动切换为主避免,不过仍然会存在几分钟的服务不可用。 (依赖同步双写,主备自动切换,自动切换功能目前还未实现) 目前已知的应用只有数据库 binlog 同步强依赖严格顺序消息,其他应用绝大部分都可以容忍短暂乱序,推荐使用普通的顺序消息。 ▶Message Queue 在 RocketMQ 中,所有消息队列都是持久化,长度无限的数据结构,所谓长度无限是 / 26 指队列中的每个存储单元都是定长,访问其中的存储单元使用 Offset 来访问,offset 为 java long 类型,64 位,理论上在 100年内不会溢出,所以认为是长度无限,另外队列中只保存最近几天的数据,之前的数据会按照过期时间来删除。 也可以认为 Message Queue 是一个长度无限的数组,offset 就是下标。 4.技术选型 目前消息中间件主流开源产品有两大类:一是以JMS、AMQP协议实现的企业级消息中间件,代表产品有ActiveMQ、RabbitMQ等,另一种以Kafka、Jafka、RocketMQ高吞吐量的分布式发布订阅消息系统。 针对云平台的特定使用场景,对各种消息中间件进行测试、综合对比的情况下选择使用RocketMQ消息中间件。 RocketMQ是Kafka Java语言的实现并对消息的持久化进行优化,以保障消息在高并发的情况下的可靠性。 4.1 ActiveMQ 4.1.1介绍 1. 多种语言和协议编写客户端。语言: Java, C, C++, C#, Ruby, Perl, Python, PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP   2. 完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)   3. 对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性   4. 通过了常见J2EE服务器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上   5. 支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA   6. 支持通过JDBC和journal提供高速的消息持久化   7. 从设计上保证了高性能的集群,客户端-服务器,点对点   8. 支持Ajax   9. 支持与Axis的整合   10. 可以很容易得调用内嵌JMS provider,进行测试 优点:Java 实现,历史悠久,业务集成方便,成熟案例众多,适合企业级应用。 缺点:性能一般、基于JMS协议实现,集群方式采用共享存储或通过数据库方式,不适合大数据高并发场景。 / 26 4.2 Kafka消息系统 4.2.1介绍 Kafka开发的主要初衷目标是构建一个用来处理海量日志,用户行为和网站运营统计等的数据处理框架。 在结合了数据挖掘,行为分析,运营监控等需求的情况下,需要能够满足各种实时在线和批量离线处理应用场合对低延迟和批量吞吐性能的要求。 从需求的根本上来说,高吞吐率是第一要求,其次是实时性和持久性。 业务场景: · "动态汇总(News feed)"功能。将你朋友的各种活动信息广播给你 · 相关性以及排序。通过使用计数评级(count rating)、投票(votes)或者点击率( click-through)判定一组给定的条目中那一项是最相关的. · 安全:网站需要屏蔽行为不端的网络爬虫(crawler),对API的使用进行速率限制,探测出扩散垃圾信息的企图,并支撑其它的行为探测和预防体系,以切断网站的某些不正常活动。 · 运营监控:大多数网站都需要某种形式的实时且随机应变的方式,对网站运行效率进行监控并在有问题出现的情况下能触发警告。 · 报表和批处理: 将数据装载到数据仓库或者Hadoop系统中进行离线分析,然后针对业务行为做出相应的报表,这种做法很普遍。 案例:Twitter、DataSift、LinkedIn 等 4.3 RocketMQ消息中间件 4.3.1介绍 Rocket的队列模型消息中间件,服务器使用Java语言编写,可在多种软硬件平台上部署。客户端支持Java、C++编程语言。单台服务器可支持1万以上个消息队列,通过扩容服务器,队列数几乎可任意横向扩展。每个队列都是持久化、长度无限(取决于磁盘空间大小)、并且可从队列任意位置开始消费。 / 26 业务场景: · 日志传输,高吞吐量的日志传输,这本来也是kafka的强项。 · 消息广播功能,如广播缓存配置失效。 · 数据的顺序同步功能,如MySQL binlog复制。 · 分布式环境下(broker、producer、consumer都为集群)的消息路由,对顺序和可靠性有极高要求的场景。 · 作为一般MQ来使用的其他功能。 Rocket相对于Kafka特有的一些功能: · 文本协议设计,非常透明,支持类似memcached stats的协议来监控broker · 纯Java实现,从通讯到存储,从client到server都是重新实现。 · 提供事务支持,包括本地事务和XA分布式事务 · 支持HA复制,包括异步复制和同步复制,保证消息的可靠性 · 支持异步发送消息 · 消费消息失败,支持本地恢复 · 多种offset存储支持,数据库、磁盘、zookeeper,可自定义实现 · 支持group commit,提升数据可靠性和吞吐量。(目前kafka已实现) · 支持消息广播模式 · 一系列配套项目:Python/Ruby/C/C++客户端、Twitter Storm的Spout、Tail4j等。 案例:淘宝、支付宝、阿里B2B部分、腾讯、京东等。 ■ Mysql binlog 同步,日消息量百亿+ ■ 订单类应用 ■ 流计算 ■ IM等实时消息领域 ■ Cache同步 ■ 双十一、双十二的削峰填谷 / 26 4.建设方案 消息中间件部署随着云平台的用户的使用情况进行动态扩展。 实施过程分2个阶段: 第一阶段为试运行阶段,选择消息及时性高准确性不高场景小量进行实施。 当系统平稳运行一个月左右进行其他业务场景改造。 4.1.部署结构 图表:1-1网络部署图 一、如图所示,Producer集群 的 producer 实例有三个,可能是部署在三个机器上的三个进程,也可能是一台机器上的三个进程。每个实例都会发送 TopicA 的消息。 二、ConsumerI集群 有三个实例,如果是集群消费方式,那么每个实例消费 TopicA 的 1/3 的消息,如果是广播消费方式,那么每个实例消费全量的消息。 / 26 逻辑部署结构: 图表:1-2逻辑部署结构 4.2硬件要求 · CPU 8核以上 · 内存 48G以上 · 磁盘 SAS15000/SSD raid10 · 网卡 千兆网卡 4.3软件要求 · 操作系统 Linux 2.6.32-220.23.2.ali927.el5.x86_64,必须是64位 · 文件系统 使用Ext4更佳,当然其他文件系统也可以 · JDK, 1.6.0.32,必须是64位。 4.4 OS调优 · 包括OS虚拟内存、文件系统IO调度算法、文件句柄等参数的调优. · 大容量磁盘挂载到/home目录 / 26 5.安装部署 5.1安装流程 图5-1安装部署流程图 流程描述: 1、 准备工作,选择满足配置要求的服务器,Broker服务器需要大容量磁盘挂载/home目录,根据需要对服务器进行os参数调优。 2、 安装JDK,配置环境变量,最低版本要求JDK, 1.6.0.32 必须是64位。 3、 安装 RocketMQ消息中间件。 4、 在服务器上启动 Name Server 服务。 5、 在 Broker 服务器上配置NAMESRV_ADDR 环境变量 6、 分别在不同的服务器上启动Broker 服务。 7、 业务系统访问Name Server 服务获取Broker服务器列表,发送消息。 5.2操作系统优化 在生产环境部署Broker前,必须要执行os.sh,对操作系统进行调优 注意: os.sh只能执行一次,需要sudo root权限 os.sh 文件位置: alibaba-rocketmq/bin / 26 5.3软件安装 注意:在5.3.1中描述的2个步骤完成软件安装,需要部署多台服务器重复上述操作。5.4以后进行集群的配置工作。 5.3.1 Linux环境 安装规范: 项目名称 默认值 系统用户: 用户名称: rocketmq 密码:rocketmq 安装目录: /opt/ 消息默认存储路径: /home/rocketmq/store 日志默认存储路径: /home/rocketmq/logs Name Server默认端口 9876 Broker默认端口 10911 开通端口:9876 、 10911 1、安装jdk环境 安装并设置环境变量 rpm -ivh jdk-7u51-linux-x64.rpm echo "export JAVA_HOME=/usr/java/default" >>/etc/profile source /etc/profile 2、安装RocketMQ 下载地址:https://github.com/alibaba/RocketMQ/releases 解压tar包, 目前使用V3.1.7版本 tar -zxvf alibaba-rocketmq-3.1.7.tar.gz cd alibaba-rocketmq/bin/ 3、配置环境变量 echo "export ROCKETMQ_HOME=/opt/alibaba-rocketmq" >>/etc/profile source /etc/profile / 26 5.4 寻址原理 Producer、Consumer、Broker都可以分布式部署。Producer、Consumer通过Name Server来寻找Broker,Broker启动后,会定时将要发布的Topic注册到Name Server。 · Broker如何指定Name Server地址(下面方式任选一种即可,优先级按照高低顺序排序)? o 启动Broker时,命令行方式指定 mqbroker -n "192.168.0.1:9876;192.168.0.2:9876" o 通过环境变量指定 export NAMESRV_ADDR="192.168.0.1:9876;192.168.0.2:9876" · Producer/Consumer如何指定Name Server地址(下面方式任选一种即可,优先级按照高低顺序排序)? o 通过代码中指定 producer.setNamesrvAddr("192.168.0.1:9876;192.168.0.2:9876"); 或 consumer.setNamesrvAddr("192.168.0.1:9876;192.168.0.2:9876"); o 通过Java启动参数中指定 -Drocketmq.namesrv.addr=192.168.0.1:9876;192.168.0.2:9876 o 通过环境变量指定 export NAMESRV_ADDR=”192.168.0.1:9876;192.168.0.2:9876 · Name Server部署方式 Name Server是一个几乎无状态、轻量的名称服务中心(源码小于1000行),部署非常简单,直接通过命令行启动即可,为保证可靠,可在多台机器启动多个实例。 · 查看端口使用情况 netstat -anp | grep java / 26 5.5 常用命令 一、启动Name Server Unix平台 nohup sh mqnamesrv & Windows平台(仅支持64位) mqnamesrv.exe 关闭Name Server sh mqshutdown namesrv 二、启动broker Unix平台 nohup sh mqbroker & Windows平台(仅支持64位) mqbroker.exe 关闭broker sh mqshutdown broker 三、更新或创建Topic sh mqadmin updateTopic -b 127.0.0.1:10911 -t TopicA 四、更新或创建订阅组 sh mqadmin updateSubGroup -b 127.0.0.1:10911 -g SubGroupA 5.6 Broker 集群搭建 推荐的几种 Broker 集群部署方式,这里的 Slave 不可写,但可读,类似于 Mysql 主备方式。 5.6.1单个Master / 26 在一台机器上部署NameServer、Broker应用 1. 启动Name Server nohup sh mqnamesrv & 2. 启动Broker、Producer、Consumer之前请设置环境变量 export NAMESRV_ADDR=127.0.0.1:9876 3. 启动Broker nohup sh mqbroker & 或 不配做环境变量,直接在启动的命令里添加 nohup sh mqbroker -n 127.0.0.1:9876 & 缺点:这种方式风险较大,一旦 Broker 重启或者宕机时,会导致整个服务不可用,不建议线上环境使用。 解决方式: 项目初期采用此种方式进行部署,使用定时任务1-5秒钟检测一次,发现宕机自动重启。提高系统可用性。发送失败的消息自动入库,当Broker恢复后自动进行重发。 5.6.2 多 Master 模式 一个集群无 Slave,全是Master,例如2个Master或者3个Master。 优点:配置简单,单个 Master 宕机或重启维护对应用无影响,在磁盘配置为 RAID10 时,即使机器宕机不可恢复情况下,由于 RAID10 磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢)。性能最高。 缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到受到影响。 ### 先启动 Name Server,例如机器 IP 为:192.168.1.1:9876 nohup sh mqnamesrv & ### 在机器 A,启动第一个 Master nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-noslave/broker-a.properties & / 26 ### 在机器 B,启动第二个 Master nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-noslave/broker-b.properties & 5.6.3 多Master多Slave模式,异步复制 每个 Master 配置一个 Slave,有多对 Master-Slave,HA 采用异步复制方式,主备有短暂消息延迟,毫秒级。 优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,因为 Master 宕机后,消费者仍然可以从Slave消费,此过程对应用透明。不需要人工干预。性能同多 Master 模式几乎一样。 缺点:Master 宕机,磁盘损坏情况,会丢失少量消息。 ### 先启动 Name Server,例如机器 IP 为:192.168.1.1:9876 nohup sh mqnamesrv & ### 在机器 A,启动第一个 Master nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-a.properties & ### 在机器 B,启动第二个 Master nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-b.properties & ### 在机器 C,启动第一个 Slave nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-a-s.properties & ### 在机器 D,启动第二个 Slave nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-b-s.properties & 5.6.4 多Master多Slave模式,同步双写 每个 Master 配置一个 Slave,有多对 Master-Slave,HA 采用同步双写方式,主备都写成功,向应用返回成功。 优点:数据与服务都无单点,Master 宕机情况下,消息无延迟,服务可用性与数据可用性都非常高 缺点:性能比异步复制模式略低,大约低 10%左右,发送单个消息的 RT 会略高。目前主宕机后,备机不能自动切换为主机,后续会支持自动切换功能。 ### 先启动 Name Server,例如机器 IP 为:192.168.1.1:9876 nohup sh mqnamesrv & / 26 ### 在机器 A,启动第一个 Master nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-a.properties & ### 在机器 B,启动第二个 Master nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-b.properties & ### 在机器 C,启动第一个 Slave nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-a-s.properties & ### 在机器 D,启动第二个 Slave nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-b-s.properties & 以上Broker与Slave配对是通过指定相同的 brokerName 参数来配对,Master的BrokerId必须是 0,Slave 的BrokerId 必须是大于0的数。另外一个Master下面可以挂载多个 Slave,同一 Master 下的多个 Slave 通过指定不同的 BrokerId 来区分。 $ROCKETMQ_HOST 指的 RocketMQ 安装目录,需要用户自己设置此环境变量。 5.6.5多Name Server启动 一个集群无 Slave,全是Master,例如:2个Name server, 2个Master或者3个Master。 优点:配置简单,多个Name server主备自动切换,单个 Master 宕机或重启维护对应用无影响,在磁盘配置为 RAID10 时,即使机器宕机不可恢复情况下,由于 RAID10 磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢)。性能最高。 缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到受到影响。 ### 先分别在2台服务器上启动 Name Server,例如机器 IP 为:192.168.1.1:9876 、192.168.1.2:9876 nohup sh mqnamesrv & ### 在机器 A,配置name server环境变量,启动第一个 Master nohup sh mqbroker -n "192.168.0.221:9876;192.168.0.222:9876" -c $ROCKETMQ_HOME/conf/2m-noslave/broker-a.properties & ### 在机器 B,配置name server环境变量,启动第二个 Master / 26 nohup sh mqbroker -n "192.168.0.221:9876;192.168.0.222:9876" -c $ROCKETMQ_HOME/conf/2m-noslave/broker-b.properties & 5.7 Broker重启方式 Broker 重启可能会导致正在发往这台机器的的消息发送失败, RocketMQ 提供了一种优雅关闭 Broker 的方法,通过执行以下命令会清除 Broker 的写权限,过 40s 后,所有客户端都会更新 Broker 路由信息,此时再关闭 Broker 就不会发生发送消息失败的情况,因为所有消息都发往了其他 Broker。 sh mqadmin wipeWritePerm -b brokerName -n namesrvAddr 6.业务及系统集成 6.1 业务对接消息处理流程 图6-1业务系统消息处理流程 / 26 流程描述: 1、 业务系统(消息生产者)发送消息给消息系统,消息系统接收到消息以后写入磁盘。 2、 消息系统成功写入磁盘以后返回给业务系统(消息生产者)发送状态。 3、 业务系统(消息生产者)判断消息发送是否成功,SEND_OK代表成功。 4、 发送成功调用消息系统统计处理接口添加统计数据。 5、 发送失败调用消息系统存储失败消息,消息系统定时任务定时进行消息重复操作。 6、 业务系统(消息消费者)主动到消息系统拉取消息,处理完成以后调用消息系统统计处理接口添加统计数据。 6.1.1 发送普通消息 /** * 下面这段代码表明一个Producer对象可以发送多个topic,多个tag的消息。 * 注意:send方法是同步调用,只要不抛异常就标识成功。但是发送成功也可会有多种状态,
* 例如消息写入Master成功,但是Slave不成功,这种情况消息属于成功,但是对于个别应用如果对消息可靠性要求极高,
* 需要对这种情况做处理。另外,消息可能会存在发送失败的情况,失败重试由应用来处理。 */ DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); producer.start(); Message msg = new Message("TopicTest1",// topic                     "TagA",// tag                     "OrderID001",// key                     ("Hello MetaQ").getBytes());// body     SendResult sendResult = producer.send(msg);     System.out.println(sendResult); 6.1.2 发送顺序消息  MQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); producer.start(); String[] tags = new String[] { "TagA", "TagB", "TagC", "TagD", "TagE" }; for (int i = 0; i < 100; i++) {        // 订单ID相同的消息要有序        int orderId = i % 10;       Message msg = new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i,                             ("Hello RocketMQ " + i).getBytes());       SendResult sendResult = producer.send(msg, new MessageQueueSelector() {                     @Override                     public MessageQueue select(List mqs, Message msg, Object arg) {                         Integer id = (Integer) arg; / 26                         int index = id % mqs.size();                         return mqs.get(index);                     }                 }, orderId);                 System.out.println(sendResult);             }             producer.shutdown(); 6.1.3 订阅普通消息 import java.util.List; import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere; import com.alibaba.rocketmq.common.message.MessageExt; public class PushConsumer {     /** * 当前例子是PushConsumer用法,使用方式给用户感觉是消息从RocketMQ服务器推到了应用客户端。
* 但是实际PushConsumer内部是使用长轮询Pull方式从Broker拉消息,然后再回调用户Listener方法
*/     public static void main(String[] args) throws InterruptedException, MQClientException {         /** * 一个应用创建一个Consumer,由应用来维护此对象,可以设置为全局对象或者单例
* 注意:ConsumerGroupName需要由应用来保证唯一 */         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_222");         consumer.setNamesrvAddr("10.235.170.7:9877");         //consumer.setNamesrvAddr("127.0.0.1:9876"); / 26         /** * 订阅指定topic下tags分别等于TagA或TagC或TagD */         consumer.subscribe("TopicTest1", "TagA || TagC || TagD");         /** * 订阅指定topic下所有消息
* 注意:一个consumer对象可以订阅多个topic */         consumer.subscribe("TopicTest2", "*");         consumer.subscribe("TopicTest3", "*");         consumer.subscribe("BenchmarkTest", "*");         /** * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
* 如果非第一次启动,那么按照上次消费的位置继续消费 */         consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);         consumer.registerMessageListener(new MessageListenerConcurrently() {             /** * 默认msgs里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息 */             @Override             public ConsumeConcurrentlyStatus consumeMessage(List msgs,                     ConsumeConcurrentlyContext context) {                 //System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);                 MessageExt msg = msgs.get(0);                 if (msg.getTopic().equals("TopicTest1")) {                     // 执行TopicTest1的消费逻辑                     if (msg.getTags() != null && msg.getTags().equals("TagA")) {                         // 执行TagA的消费                     }                     else if (msg.getTags() != null && msg.getTags().equals("TagC")) {                         // 执行TagC的消费                     }                     else if (msg.getTags() != null && msg.getTags().equals("TagD")) {                         // 执行TagD的消费                     }                 }                 else if (msg.getTopic().equals("TopicTest2")) {                     // 执行TopicTest2的消费逻辑 / 26                 }                 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;             }         });         /** * Consumer对象在使用之前必须要调用start初始化,初始化一次即可
*/         consumer.start();         System.out.println("Consumer Started.");     } } 6.1.4 订阅顺序消息 import java.util.List; import java.util.concurrent.atomic.AtomicLong; import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext; import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere; import com.alibaba.rocketmq.common.message.MessageExt; /** * 顺序消息消费,带事务方式(应用可控制Offset什么时候提交) */ public class Consumer {     public static void main(String[] args) throws MQClientException {         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");         /** * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
* 如果非第一次启动,那么按照上次消费的位置继续消费 / 26 */         consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);         consumer.subscribe("TopicTest", "TagA || TagC || TagD");         consumer.registerMessageListener(new MessageListenerOrderly() {             AtomicLong consumeTimes = new AtomicLong(0);             @Override             public ConsumeOrderlyStatus consumeMessage(List msgs, ConsumeOrderlyContext context) {                 context.setAutoCommit(false);                 System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);                 this.consumeTimes.incrementAndGet();                 if ((this.consumeTimes.get() % 2) == 0) {                     return ConsumeOrderlyStatus.SUCCESS;                 }                 else if ((this.consumeTimes.get() % 3) == 0) {                     return ConsumeOrderlyStatus.ROLLBACK;                 }                 else if ((this.consumeTimes.get() % 4) == 0) {                     return ConsumeOrderlyStatus.COMMIT;                 }                 else if ((this.consumeTimes.get() % 5) == 0) {                     context.setSuspendCurrentQueueTimeMillis(3000);                     return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;                 }                 return ConsumeOrderlyStatus.SUCCESS;             }         });         consumer.start();         System.out.println("Consumer Started.");     } } / 26 6.1.5 订阅广播消息 import java.util.List; import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere; import com.alibaba.rocketmq.common.message.MessageExt; import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel; /** * PushConsumer,广播方式订阅消息 * */ public class PushConsumer {     public static void main(String[] args) throws InterruptedException, MQClientException {         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_1");         /** * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
* 如果非第一次启动,那么按照上次消费的位置继续消费 */         consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);         consumer.setMessageModel(MessageModel.BROADCASTING);         consumer.subscribe("TopicTest", "TagA || TagC || TagD");         consumer.registerMessageListener(new MessageListenerConcurrently() {             @Override             public ConsumeConcurrentlyStatus consumeMessage(List msgs,                     ConsumeConcurrentlyContext context) {                 System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);                 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; / 26             }         });         consumer.start();         System.out.println("Broadcast Consumer Started.");     } } 6.2 公共接口 6.2.1 统计数据接口 6.2.2 失败消息存储接口 6.2.3 失败消息重发定时任务 6.2.4 注意事项 详细信息参考《RocketMQ用户指南.pdf》 名称 默认值 实际测试值 描述 maxMessageSize 131072 (128k) 131070 客户端限制的消息大小,超过报错,同时 服务端也会限制 / 26 7.风险评估 7.1消息完整性 7.2消息统计功能正确性 / 26

下载文档到电脑,查找使用更方便

文档的实际排版效果,会与网站的显示效果略有不同!!

需要 10 金币 [ 分享文档获得金币 ] 2 人已下载

下载文档