storm基础框架分析

KristalKort 8年前
   <h2>背景</h2>    <p>前期收到的问题:</p>    <p>1、在Topology中我们可以指定spout、bolt的并行度,在提交Topology时Storm如何将spout、bolt自动发布到每个服务器并且控制服务的CPU、磁盘等资源的?</p>    <p>2、Storm处理消息时会根据Topology生成一棵消息树,Storm如何跟踪每个消息、如何保证消息不丢失以及如何实现重发消息机制?</p>    <p>上篇:storm是如何保证at least once语义的</p>    <p>回答了第2个问题。</p>    <p>本篇来建立一个基本的背景,来大概看下构成storm流式计算能力的一些基础框架,并部分回答第一个问题。</p>    <p>worker、executor、task的关系</p>    <p><img src="https://simg.open-open.com/show/658dfd84461f14bcb7286db47ca55c8e.png"></p>    <p>worker是一个进程.</p>    <p>executor是一个线程,是运行tasks的物理容器.</p>    <p>task是对spout/bolt/acker等任务的逻辑抽象.</p>    <p>supervisor会定时从zookeeper获取拓补信息topologies、任务分配信息assignments及各类心跳信息,以此为依据进行任务分配。</p>    <p>在supervisor同步时,会根据新的任务分配情况来启动新的worker或者关闭旧的worker并进行负载均衡。</p>    <p>worker通过定期的更新connections信息,来获知其应该通讯的其它worker。</p>    <p>worker启动时,会根据其分配到的任务启动一个或多个executor线程。这些线程仅会处理唯一的topology。</p>    <p>如果有新的tolopogy被提交到集群,nimbus会重新分配任务,这个后面会说到。</p>    <p>executor线程负责处理多个spouts或者多个bolts的逻辑,这些spouts或者bolts,也称为tasks。</p>    <p>具体有多少个worker,多少个executor,每个executor负责多少个task,是由配置和指定的parallelism-hint共同决定的,但这个值并不一定等于实际运行中的数目。</p>    <p>如果计算出的总的executors超过了nimbus的限制,此topology将不会得到执行。</p>    <p>并行度的作用:</p>    <p><img src="https://simg.open-open.com/show/8baa0cbe9b739d2420a48a9156764799.jpg"></p>    <p><img src="https://simg.open-open.com/show/2069a3979389299d5930fef543e58b14.jpg"></p>    <p><img src="https://simg.open-open.com/show/de6e488cbed50ea951a3b94f2923f29a.jpg"></p>    <p>上述代码会在nimbus进行任务分配时调用:</p>    <p><img src="https://simg.open-open.com/show/6a2b5820cfc0bb57e3d90fe93a7b96f0.jpg"></p>    <h2>线程模型及消息系统</h2>    <p>基本关系如下所示:</p>    <p><img src="https://simg.open-open.com/show/90b05a12e161fc4c8ebd4d5f1f699933.png"></p>    <p>worker启动时,除了启动多个executor线程,还会启动多个工作线程来负责消息传递。</p>    <p>worker会订阅到transfer-queue来消费消息,同时也会发布消息到transfer-queue,比如需要进行远程发布时(某个bolt在另一个进程或者节点上)。</p>    <p>executor会发布消息到executor-send-queue比如emit tuple,同时会从executor-receive-queue消费消息,比如执行ack或者fail。</p>    <p>batch-transfer-worker-handler线程订阅到executor-send-queue消费消息,并将消息发布到transfer-queue供worker消费。</p>    <p>transfer-thread会订阅到transfer-queue消费消息,并负责将消息通过socket发送到远程节点的端口上。</p>    <p>worker通过receive-thread线程来收取远程消息,并将消息以本地方式发布到消息中指定的executor对应的executor-receive-queue。executor按第3点来消费消息。</p>    <p>以上所有的消息队列都是Disruptor Queue,非常高效的线程间通讯框架。</p>    <p>所谓本地发布,是指在worker进程内及executor线程间进行消息发布。</p>    <p>所谓远程发布,是指在worker进程间、不同的机器间进行消息发布。</p>    <h2>任务调度及负载均衡</h2>    <p>任务调度的主要角色</p>    <p><img src="https://simg.open-open.com/show/b83ddbfb6d428a890d01fb47a8c0188f.png"></p>    <p>nimbus将可以工作的worker称为worker-slot.</p>    <p>nimbus是整个集群的控管核心,总体负责了topology的提交、运行状态监控、负载均衡及任务重新分配,等等工作。</p>    <p>nimbus分配的任务包含了topology代码所在的路径(在nimbus本地)、tasks、executors及workers信息。</p>    <p>worker由node + port唯一确定。</p>    <p>supervisor负责实际的同步worker的操作。一个supervisor称为一个node。所谓同步worker,是指响应nimbus的任务调度和分配,进行worker的建立、调度与销毁。</p>    <p>其通过将topology的代码从nimbus下载到本地以进行任务调度。</p>    <p>任务分配信息中包含task到worker的映射信息task -> node + host,所以worker节点可据此信息判断跟哪些远程机器通讯。</p>    <h2>集群的状态机</h2>    <p><img src="https://simg.open-open.com/show/cd27b33556437b465a8900ab02283447.png"></p>    <h3>集群状态管理</h3>    <p>集群的状态是通过一个storm-cluster-state的对象来描述的。</p>    <p>其提供了许多功能接口,比如:</p>    <p>zookeeper相关的基本操作,如create-node、set-data、remove-node、get-children等.</p>    <p>心跳接口,如supervisor-heartbeat!、worker-heatbeat!等.</p>    <p>心跳信息,如executors-beats等.</p>    <p>启动、更新、停止storm,如update-storm!等.</p>    <p>如下图所示:</p>    <p><img src="https://simg.open-open.com/show/8b7d3085fb257cf6bce92070159d45c9.png"></p>    <h3>任务调度的依据</h3>    <p>zookeeper是整个集群状态同步、协调的核心组件。</p>    <p>supervisor、worker、executor等组件会定期向zookeeper写心跳信息。</p>    <p>当topology出现错误、或者有新的topology提交到集群时,topologies信息会同步到zookeeper。</p>    <p>nimbus会定期监视zookeeper上的任务分配信息assignments,并将重新分配的计划同步到zookeeper。</p>    <p>所以,nimbus会根据心跳、topologies信息及已分配的任务信息为依据,来重新分配任务,如下图所示:</p>    <p><img src="https://simg.open-open.com/show/b87f3d9d731c0149d8144288a150f44d.png"></p>    <h3>任务调度的时机</h3>    <p>如上文的状态机图所示,rebalance和do-reblalance(比如来自web调用)会触发mk-assignments即任务(重新)分配。</p>    <p>同时,nimbus进程启动后,会周期性地进行mk-assignments调用,以进行负载均衡和任务分配。</p>    <p>客户端通过storm jar ... topology 方式提交topology,会通过thrift接口调用nimbus的提交功能,此时会启动storm,并触发mk-assignments调用。</p>    <p>topology提交过程</p>    <p>一个topology的提交过程:</p>    <p>非本地模式下,客户端通过thrift调用nimbus接口,来上传代码到nimbus并触发提交操作.</p>    <p>nimbus进行任务分配,并将信息同步到zookeeper.</p>    <p>supervisor定期获取任务分配信息,如果topology代码缺失,会从nimbus下载代码,并根据任务分配信息,同步worker.</p>    <p>worker根据分配的tasks信息,启动多个executor线程,同时实例化spout、bolt、acker等组件,此时,等待所有connections(worker和其它机器通讯的网络连接)启动完毕,此storm-cluster即进入工作状态。</p>    <p>除非显示调???kill topology,否则spout、bolt等组件会一直运行。</p>    <p>主要过程如下图所示:</p>    <p><img src="https://simg.open-open.com/show/1f9c1f152129327571dfb311f76bcb06.png"></p>    <h2>结语</h2>    <p>以上,基本阐述了storm的基础框架,但未涉及trident机制,也基本回答了问题1。</p>    <p> </p>    <p>来自:http://www.uml.org.cn/bigdata/201608024.asp</p>    <p> </p>