• 1. 公开课主题:浅析Storm流式计算主讲人:肖康
  • 2. 主要内容Storm简介 Storm原理和架构 Storm实战2
  • 3. Storm简介 - 案例分析统计某个服务被访问的客户端地域分布情况 日志中记录了客户端IP 把IP转换成地域 按照地域进行统计
  • 4. Storm简介 - 案例分析Hadoop貌似就可以轻松搞定 日志存HDFS 运行MapReduce程序 map做ip提取,转换成地域 reduce以地域为key聚合,计数统计 从HDFS取出结果
  • 5. Storm简介 - 案例分析如果有时效性要求呢? 小时级:还行,每小时跑一个MapReduce Job 10分钟:还凑合能跑 5分钟 :够呛了,等槽位可能要几分钟呢 1分钟 :算了吧,启动Job就要几十秒呢 秒级 :… 分析MapReduce不满足时效性要求的原因 一批数据启动一次,处理完进程停止 启动本身是需要时间的:输入切分、调度、起进程 共享集群Job比较杂,可能需要等待资源 所有数据都需要读写磁盘
  • 6. Storm简介 - 案例分析解决方案 进程常驻运行 数据在内存中 Storm正好适合这种需求logMQ从MQ取日志解析ip 转成地域内存累加计数定期输出redisredis
  • 7. Storm简介 - 是什么Storm是一个分布式实时流式计算平台 分布式 水平扩展:通过加机器、提高并发数就提高处理能力 自动容错:自动处理进程、机器、网络异常 实时:数据不写磁盘,延迟低(毫秒级) 流式:不断有数据流入、处理、流出 开源:twitter开源,社区很活跃
  • 8. Storm简介 – 和其他大数据计算平台对比Storm vs. MapReduce 常驻运行 流式处理:数据来一点处理一点 实时处理:数据在内存中不写磁盘 DAG模型:可以组合多个阶段 Storm vs. queue+worker系统 维护简单:无需维护queue,queue和worker对应关系 扩展简单:加机器,提高并发,重新提交 自动容错:进程、机器、网络异常,消息可重发
  • 9. Storm简介 - 典型应用场景请求应答(同步) DRPC 实时图片处理 实时网页分析 流式处理(异步) 逐条处理 数据之间无关系:如实时日志格式标准化入库 分析统计 数据之间有关系(聚合等):如日志pv/uv统计、访问热点统计9ClientDRPC ServerSpoutBoltReturn图片X图片X图片X图片Y图片Y图片YClientMQSpoutBolt1StorageN行日志N行日志N行日志Bolt2ippv/uvreceived
  • 10. 主要内容Storm简介 Storm原理和架构 Storm实战10
  • 11. Storm原理和架构 - 计算模型DAG计算模型 Tuple:数据处理单元,一个Tuple由多个字段组成 Stream:持续的Tuple流 Spout:从外部获取数据,输出原始Tuple Bolt:接收Spout/Bolt输出的Tuple,处理,输出新Tuple11
  • 12. Storm原理和架构 - 计算模型DAG计算模型(续) Grouping Tuple从上游到某个下游多个并发task的分组方式 shuffleGrouping:随机发给某个下游task fieldsGrouping:按照某几个字段做hash取模,发给对应task allGrouping:发给下游全部task Topology 一个应用的spout, bolt, grouping组合
  • 13. Storm原理和架构 - 架构 nimbus:集群的master,负责管理supervisor、调度topology supervisor:负责运行topology的worker worker:负责实际的计算和网络通信 zookeeper:负责存储以上模块的状态,做到高可用 13nimbuszookeeperzookeeperzookeepersupervisorsupervisorsupervisorsupervisorworkerworker
  • 14. Storm原理和架构 - 数据流程executor执行spout.nextTuple()或bolt.execute(),调用emit生成新的tuple,放到executor的transfer queue executor transfer thread把自己transfer queue里面的tuple放到worker transfer queue worker transfer thread把transfer queue里面的tuple序列化发送到远程的worker worker receive thread分别从网络收数据,反序列化成tuple放到对应executor的receive queue executor receive thread从自己的receive queue取出tuple,调用bolt.execute() 14
  • 15. 主要内容Storm简介 Storm原理和架构 Storm实战15
  • 16. Storm实战 - 集群部署依赖包 java 6+ python 2.6.6 zookeeper 3.4.5 apache-storm-0.9.2-incubating 集群规划 1个nimbus + 3/5个zookeeper + n个supervisor16
  • 17. Storm实战 - 集群部署部署步骤 修改$STORM_HOME/conf/storm.yaml配置,至少要修改两项 zk地址:storm.zookeeper.servers nimbus地址:nimbus.host 启动zookeeper 启动drpc:$STORM_HOME/bin/storm drpc 启动nimbus:$STORM_HOME/bin/storm nimbus 启动ui:$STORM_HOME/bin/storm ui 启动supervisor:$STORM_HOME/bin/storm supervisor 启动logviewer:$STORM_HOME/bin/storm log viewer 在浏览器中打开webui查看集群状态http://nimbus.host:8080 更多参考storm官网的详细文档17
  • 18. Storm实战 - 应用开发APIJava API Spout nextTuple():回调函数,循环触发 ack(id):回调函数,消息成功处理时触发 fail(id):回调函数,消息超时时触发 Bolt execute(Tuple input):回调函数,数据触发 collector.emit(tuple):通过collector向下游发送tuple collector.ack(tuple):通过collector确认已经成处理输入tuple 一定要用Java吗? NO, ShellBolt18
  • 19. Storm实战 - 应用开发public class MD5Topology { public static class MD5Bolt extends BaseBasicBolt { @Override public void execute(Tuple tuple, BasicOutputCollector collector) { String input = tuple.getString(0); // 获取来自DRPCSpout的实际输入数据 String output = MD5Util.getMD5Str(input); // 往下游ReturnBolt emit数据 // 第一个字段是计算的结果,这里是md5串 // 第二个字段是来自DRPCSpout的return-info,是一个json串,包括drpc request id,server host、port collector.emit(new Values(output, tuple.getString(1))); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("result", "return-info")); // 声明输出两个字段,和emit是对应的 } } public static void main(String[] args) throws Exception { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("DRPCSpout", new DRPCSpout(args[0]), 2); builder.setBolt("MD5Bolt", new MD5Bolt(), 4) // 参数依次是spout/bolt id,spout/bolt对象,并发度 .shuffleGrouping("DRPCSpout"); // 指定上游以及grouping方式 builder.setBolt("ReturnBolt", new ReturnResults(), 2) .shuffleGrouping("MD5Bolt"); Config conf = new Config(); conf.setNumWorkers(4); // 设置worker个数 StormSubmitter.submitTopology(args[0], conf, builder.createTopology()); } }19
  • 20. Storm实战 - 应用开发编译打包成md5.jar 提交topology $STORM_HOME/bin/storm jar md5.jar MD5Topology md5 在webui查看topology状态 发送drpc请求 incubator-storm/storm-core/src/py/storm/DistributedRPC-remote -h host:port -f execute md5 abcd20
  • 21. 联系我们: 新浪微博:ChinaHadoop 微信公号:ChinaHadoop让你的数据产生价值!