• 1. 1Metamorphosis介绍 通用产品-消息中间件 无花(wuhua@taobao.com) 2011-11-11
  • 2. 主要内容 Meta是什么,特征和适用场景 在公司的应用状况 原理和内部实现 Meta的使用 性能
  • 3. 3Metamorphosis是什么? A distributed publish-subscribe messaging system 开源MQ-kafka的Java版本 Linkedin开源的MQ 《The metamorphosis》——卡夫卡的代表作 设计原则 消息都是持久的,保存在磁盘 吞吐量第一 消费状态保存在客户端 分布式,生产者、服务器和消费者都可分布。
  • 4. 4跟kafka有什么不同? 用java替换scala 实现完全重写 存储结构上采用自定义结构,更简洁紧凑 Consumer API没有采用kafka的stream方式, 而是同时实现同步获取和异步订阅两种方式,更接近JMS和Notify
  • 5. 5跟kafka有什么不同? 添加了实时统计功能和协议 客户端的连接复用 实现HA复制 实现发送的软负载 实现事务机制 支持http协议 实现消息数据的无痛迁移和水平扩展
  • 6. 6Meta有什么特性? 生产者、服务器和消费者都可分布 消息存储顺序写 性能极高,吞吐量大 支持消息顺序 支持本地和XA事务 客户端pull,随机读,利用sendfile系统调用,zero-copy ,批量拉数据
  • 7. 7Meta有什么特性? 支持消费端事务 支持消息广播模式 支持异步发送消息 支持http协议 支持消息重试和recover 数据迁移、扩容对用户透明 消费状态保存在客户端
  • 8. 8Meta能做什么收发消息作为普通的消息发布订阅模型使用 收集数据收集和传输日志 Tail4j — 日志传输Agent 目录扫描 断点续传 文件正则匹配 自动编码探测同步缓存使用广播模式,同步各台机器的本地缓存顺序消息需要消息有序的场景,例如数据库同步其他……更多其他场景,例如acookie等大吞吐量场景
  • 9. Meta的应用ctu acookie商城的会员营销一淘GeneSNS卖家中心新业务充值平台湖畔聚划算交易安全系统TDDL数据同步支付宝淘宝每天消息3T左右,6000多万条更多……支付宝每天消息6-9T,大约250亿条TOP
  • 10. Metamorphosis优缺点服务端无状态 消息存储顺序写,性能极高。 客户端pull,随机读,利用zero-copy sendfile提高发送效率。 客户端有状态 保存pull的偏移量offset 异常情况下的消息暂存和recover 利用zookeeper在同一个group的consumer之间做负载均衡 实时性取决于pull间隔,消费者处理能力差异不影响服务端 太多的无效pull请求可能浪费服务器资源 合理设定间隔 合理设计协议
  • 11. Meta的部署结构BrokerBrokerZK集群ProducerProducerProducerConsumerConsumerConsumerslaveSlaverSlaverSlaverConsumer异步复制异步复制同步复制异步复制
  • 12. Meta ServerMeta的系统结构gecko/notify-remotingMessage StoreNetwork ProcessorstatsHttpMeta SlaveMeta MasterHttp ClientMeta ClientMeta Manager ToolsCpp ClientMeta OpsZookeeperProducer AppConsemer AppHDFSDBHbaseAgent
  • 13. 主要概念的对应关系Broker-1Topic-1partition0partition1partition2partition3Topic-2partition0partition1partition2partition3Broker-2Topic-1partition0partition1partition2Topic-3partition0partition1partition2partition3messages
  • 14. Producer和Broker之间的负载均衡partition1partition2partition3producer
  • 15. 同一Group的Consumer和Broker之间的负载均衡partition1partition2partition3consumer1consumer2consumer3consumer4每个分区针对每个消费者group只挂一个消费者; 同一个group的多余的消费者不参与消费
  • 16. 同一Group的Consumer和Broker之间的负载均衡partition1partition2partition2consumer1consumer2consumer3consumer4当分区数目(n)大于单个group的消费者数目(m)的时候, 则有n%m个消费者需要额外承担1/n的消费任务。 n足够大的时候,仍然可以认为负载平均分配partition3partition4
  • 17. 异步发送消息 发送消息调用所消耗的时间少(0.01-0.02ms) 使用者不用关心发送成功或失败 降低可靠性,降低对业务逻辑的影响 吞吐量更大 不关心发送结果 希望发消息对业务流程不产生影响(耗时上、逻辑上) 收集日志特点使用场景
  • 18. 异步发送消息 流量控制Why?
  • 19. 异步发送消息 流量控制11秒钟发送条数4k消息,2w2发送1条移动的距离3自定义窗口大小Legth=N(消息大小)/4k
  • 20. 服务端FAQ如何保证高可用? Master/Slave方案(同步和异步复制两种); 集群 持久化数据保留多久的? 看业务要求,可以为每个Topic配置不同的保留时间 消息发送成功后,已经写入服务器磁盘? 可以说是,也可以说不是,因为存在os和磁盘缓存 每条消息在返回应答前都先write 每1000条消息force一次,每10秒force一次,可为全局或某个Topic配置参数 可以配置Group commit方式 消息是怎么保存的? 每条消息保存在一个分区,分区内是一系列文件,顺序写,固定大小切换文件
  • 21. 发送端FAQ为什么发送消息前需要先publish topic? 为了根据topic从zk获取有效的broker列表 发送消息怎么保证有序? 只保证单线程发送的消息有序 只保证发送同一个分区的消息有序 实现自定义分区选择器 消息可以带属性吗? 仅允许带一个字符串属性,消费者可依此过滤 消息ID怎么产生? Long类型,在发送成功后由服务器端返回 默认-1 42位时间 + 10 位 brokerId + 12位递增数字 消息体怎么产生? 消息体仅要求是一个byte[]数组 序列化方式完全由用户决定
  • 22. 消费者的FAQ实时性问题如何解决? 服务端提高刷盘频率,客户端减少pull时间间隔 ConsumerConfig.setMaxDelayFetchTimeInMills(long maxDelayFetchTimeInMills) 消费者是单线程还是多线程拉消息? 多线程(默认为CPU的个数),也可以配置只有一个线程拉 ConsumerConfig.setFetchRunnerCount(int fetchRunnerCount) 处理消息的回调方法是运行在单线程还是多线程中? 多线程拉,不同分区消息的回调是运行在多线程环境中的 多线程拉,相同分区消息的回调可以认为是运行在单线程环境中的 单线程拉,运行在单线程中
  • 23. 消费者的FAQ为什么在调用subscribe还要调用一次completeSubscribe? 因为subscribe可以调用多次,为了减少跟zk交互次数,subscribe会将订阅信息保存在内存,completeSubscribe的时候一次性处理 Pull的偏移量保存在哪里? 默认保存在zk 我们还提供文件、数据库的存储实现。 OffsetStorage接口,可自主实现。 偏移量多长时间保存一次? 默认5秒,可设置 ConsumerConfig.setCommitOffsetPeriodInMills(long commitOffsetPeriodInMills) 新加入的消费者不想接收到以前发的消息怎么办? 新增的group和广播新增的机器有这个问题 1.3-SNAPSHOT及其之后的版本支持可设置 ConsumerConfig.setConsumeFromMaxOffset()
  • 24. 消费者的FAQ如果我暂时无法处理某个消息,又想继续往下走,怎么办? 消息处理失败如何重试? 可选择跳过,设置最大重试次数,超过即跳过,默认5次 ConsumerConfig.setMaxFetchRetries(int maxFetchRetries) 跳过的消息将保存在消费者本地磁盘或者notify,并自动Recover重试。 如果不想往下走就把这个参数设为int的最大值 消息能保证不重复接收吗? 因为每个分区物理隔离消息,理论上每个消费者接收的消息不会重复 在consumer重新负载均衡的时候,可能由于offset保存延迟,导致重复接收极小部分消息。 可以设置pull请求的时间间隔吗? 可以,你可以设置允许的最大延迟时间,当响应为空的时候,每次递增最大延迟时间的1/10做延迟,不会超过设定的最大延迟时间。默认5秒。
  • 25. 消费者的FAQ广播消息和非广播消息的区别? Topic1Message1Message2Group1consumer1consumer2Group2consumer3consumer4group1非广播接收group2广播接收
  • 26. Meta的使用 发送消息// New session factory,强烈建议使用单例 MessageSessionFactory sessionFactory = new MetaMessageSessionFactory(new MetaClientConfig()); // create producer, 强烈建议使用单例 MessageProducer producer = sessionFactory.createProducer(); // publish topic final String topic = “meta-test”; //调一次就够了 producer.publish(topic); byte []data=... // send message SendResult sendResult = producer.sendMessage(new Message(topic,data));
  • 27. Meta的使用 异步发送// New session factory,强烈建议使用单例 AsyncMessageSessionFactory sessionFactory = new AsyncMetaMessageSessionFactory(new MetaClientConfig()); // create producer, 强烈建议使用单例 MessageProducer producer = sessionFactory. createAsyncProducer(); // publish topic final String topic = “meta-test”; //调一次就够了 producer.publish(topic); byte []data=... // send message SendResult sendResult = producer.sendMessage(new Message(topic,data));
  • 28. Meta的使用 同步消费// subscribed topic final String topic = “meta-test”; // consumer group final String group = “meta-example”; // create consumer MessageConsumer consumer = sessionFactory.createConsumer(new ConsumerConfig(group)); // start offset long offset = 0; MessageIterator it = null; // fetch messages while ((it = consumer.get(topic, new Partition("0-0"), offset, 1024 * 1024)) != null) { while (it.hasNext()) { final Message msg = it.next(); System.out.println("Receive message " + new String(msg.getData())); } // move offset forward offset += it.getOffset(); }
  • 29. Meta的使用 异步消费(推荐)// subscribed topic final String topic = "meta-test"; // consumer group final String group = “meta-example”; // create consumer , 强烈建议使用单例 MessageConsumer consumer = sessionFactory.createConsumer(new ConsumerConfig(group)); // subscribe topic consumer.subscribe(topic, 1024 * 1024, new MessageListener() { public void recieveMessages(Message message) { System.out.println(“Receive message ” + new String(message.getData())); } public Executor getExecutor() { // Thread pool to process messages,maybe null. return null; } }); // complete subscribe,调用一次就够了 consumer.completeSubscribe();
  • 30. Meta的使用 广播方式接收消息// New session factory,强烈建议使用单例 BroadcastMessageSessionFactory sessionFactory = new MetaBroadcastMessageSessionFactory (new MetaClientConfig()); // create broadcast consumer , 强烈建议使用单例 MessageConsumer consumer = sessionFactory. createBroadcastConsumer(new ConsumerConfig(group));
  • 31. Meta的使用 高级使用方式 发送事务消息 接收端事务 消费端使用notify recover 发送顺序消息 发送消息自定义分区选择器 MessageSessionFactory.createProducer(PartitionSelector partitionSelector)
  • 32. Meta的性能测试场景刷盘阀值消息大小平均每次发送消耗时间每秒钟发送的消息数CPU Utilization (服务端)IO wait(服务端)Average Load(服务端)单台Meta,10个同组默认2562.3ms4500013%0.152.5单台Meta,10个同组默认2k4ms2600014%13.5单台Meta,10个同组一条一刷2k20.1ms46005%0.1710.8单台Meta,10个同组默认4k6.4ms1600012%12单台Meta,10个不同组默认4k6.3ms1580017%0.46单台Meta,本地事务发送事务日志由操作系统决定刷盘4k3.5ms675015%0.597.5单台Meta,异步发送默认4k0.015ms1248310%0.552.1单台Meta,异步发送默认4k3ms33000-3900011%0.72.1
  • 33. 参考资料更详细的资料请参考:
  • 34. Q&A
  • 35. (本页无文本内容)