Storm笔记

jopen 9年前

用了一段时间Storm后的笔记。发现可以记的东西不多,证明Storm挺简单的,你只要遵循一些简单的接口与原则,就能写出大规模实时消息处理的程序。

为什么用Storm

没接触前把Storm想象得很强大,接触后觉得它就那样可有可无,再后来又觉得没有了全部自己做也麻烦。

1. 集群管理:支持应用的部署,工作节点的管理(任务分配、HA、Scalable等),Metrics的收集。

2. 数据流的传输与路由:支持多种数据在各处理节点间自由流动,基于Netty的高效传输机制,支持轮询,多播,按属性分组的路由。

3. 数据高可靠性的保证:还支持实现数据流动了多个节点后,在某个节点的失败,可以引发数据从源头开始重传的高级功能

按Storm的官方说法,你也可以自己搭建许多消息队列和worker组成的网络来实现实时处理,但是:

乏味:你大部份开发时间花费在配置消息发送到哪里,部署worker,还有部署中间队列。你所关心的实时处理逻辑对应到你的代码的只占了很少的比例 。

脆弱:你要自己负责保持每个worker和队列正常工作。

伸缩时痛苦:当单个worker或队列的消息吞吐量太高时,你需要分区,即数据如何分散。你需要重新配置其它worker,让它们发送消息到新位置。这导致删除或添加部件都可能失败。

缺点

核心代码是用Clojure写成,翻看代码非常不便。其实,它现在很多新的外部模块都用Java来写了,另外阿里同学翻写了一个JStorm

其他流处理方案

Spark-Streaming: 总是有人问为什么不用Spark Stream,首先它是Micro-Batch风格的准实时方案,间隔一般设到500ms。另外,它的消息流拓扑好像没Storm那样可以随便乱入,有时候必须弄个DB来做中间传输。

Samza, Linkedin的产品,在Linkedin里与Apache Kafka搭配。不过它的使用者没有Apache Kafka多。待研究。

Pulsar:来自eBay的开源实时分析平台,看文章很强,待研究。

自定义Spout

Storm对可靠消息传输的支持程度,很大程度上依赖于Spout的实现。

并不默认就是支持高可靠性的,collector emit的时候要传输msgId,要自己处理ack(msgId)和fail(msgId)函数。而很多spout其实没有这样做,只有Kafka Spout做的比较正规。

默认的,如果三十秒,消息流经的所有下游节点没有都ack完毕,或者有一个节点报fail,则触发fail(msgId)函数。

因为ack/fail的参数只有msgId,这就要Spout想在ack/fail时对消息源如Kafka/JMS进行ack/fail,或fail时想重发消息,如果需要完整的消息体而不只是msgId才能完成时,要自己把msgId对应的消息存起来(会撑爆内存么)。

另外,因为每个Spout 是单线程的,会循环的调用nextTuple()的同时,调用ack()或fail()处理结果。所以nextTuple()如果没消息时不要长期阻塞,但也不要完全不阻塞,参考storm-starter里的spout,等个50ms好了。在JStorm里,就改为了两条分开的线程。

另外,spout有时是每次被调用nextTuple()时主动去pull消息的,有时是被动接收消息后存放在 LinkedBlockingQueue里,netxtTuple()时从Queue里取消息的。如果消息源没有ack机制,Spout突然crash的话,存在queue里的消息也会丢失。

Spout还有个Max Pending的配置,如果有太多消息没有ack,它就不会再调nextTuple()。但如果上游消息源是主动Push的,消息还是会源源不断的来,累积在queue里。

RichBolt vs BasicBolt

直接用BasicBolt,会在execute()后自动ack/fail Tuple,而RichBolt则需要自行调用ack/fail。

那什么时候使用RichBolt? Bolt不是在每次execute()时立刻产生新消息,需要异步的发送新消息(比如聚合一段时间的数据再发送)时,又或者想异步的ack/fail原消息时就需要。

BasicBolt的prepare()里并没有collector参数,只在每次execute()时传入collector。而RichBolt刚好相反,你可以在初始化时就把collector保存起来,用它在任意时候发送消息。

另外,如果用RichBolt的collector,还要考虑在发送消息时是否带上传入的Tuple,如果不带,则下游的处理节点出错也不会回溯到Spout重发。用BasicBolt则已默认带上。

异常处理

如果希望上游的Spout重发消息,则在BasicBolt中抛出FailedException 或在RichBolt中直接fail掉Tuple。
其他情况下,execute()方法不应该抛出任何异常,或者你故意抛出异常使得Topology停转。

状态管理

不像Linkedin的Samza,Storm完全不管数据的持久化,Bolt如果需要历史数据,一般会使用路由规则,比如相同用户的数据路由到同一个Bolt,然后Bolt自己在内存里管理数据。

当然,也可以用共享的NoSQL存储如Redis,但此时压力就都在Redis上了。

定时任务

如下设置,所有Bolt都会在定时收到一条消息,一般用于触发sliding windows的统计等。

conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 300);

如下函数用于判断是否Tick消息

protected static boolean isTickTuple(Tuple tuple) {
return tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)
&& tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID);
}

拓扑的定义

除了使用Java代码,还可以使用Yaml来动态定义拓扑,见 https://github.com/ptgoetz/flux

并发度的定义及命令行动态扩容见官方文档,另对于worker进程数的建议是Use one worker per topology per machine。

序列化

Tuple除了传基本类型与数组,AraayList,HashMap外,也可以传一下Java对象的。Storm使用Kyro作为序列化框架,据测比Hessian什么的都要快和小。但一定注册这些Java对象的类型,否则就会使用Java默认的序列化。

参看官方文档,有两种方式注册类型,一个是storm.yaml文件,一个是Config类的registerSerialization方法。如无特殊需求,直接注册需要序列化的类就可以了,不需要自己实现一个Serializer。

Spout和Bolt的构造函数只会在submit Topology时调一次,然后序列化起来,直接发给工作节点,工作节点里实例化时不会被调用里,所以复杂的成员变量记得都定义成transient,在open(),prepare()里初始化及连接数据库等资源。

另外,需要实现close()函数清理资源,但该函数不承诺在worker进程被杀时保证被调用。

fields grouping的算法

按名称提取fileds的值,取hash,再按当前的可选Tasks取模。所以,动态扩展Task数量,或某Task失效被重建的话,都可能让原来的分配完全乱掉。

与其他开源技术的集成

比如External目录里的一堆,storm-contrib 里也有一堆,目前支持Jdbc,Redis,HBase,HDFS,Hive,甚至还有Esper,目标都是通过配置(比如SQL及Input/Output fields),而非代码,或尽量少的代码,实现交互。有时也可以不一定要直接用它们,当成Example Code来看就好了。

另外,与传统的Java应用思路相比,Bolt/Spout与资源连接时,比较难实现共享连接池的概念,连接池一般都是每个Bolt/Spout实例自用的,要正确处理其连接数量。

HA的实现

如果Worker进程失效,Supervisor进程会检查 Worker的心跳信息,重新进行创建。

如果整个机器节点失效,Nimbus会在其他节点上重新创建。

Supervisor进程和Nimbus进程,需要用Daemon程序如monit来启动,失效时自动重新启动。
因为它们在进程内都不保存状态,状态都保存在本地文件和ZooKeeper,因此进程可以随便杀。

如果Nimbus进程所在的机器都直接倒了,需要在其他机器上重新启动,Storm目前没有自建支持,需要自己写脚本实现。
即使Nimbus进程不在了,也只是不能部署新任务,有节点失效时不能重新分配而已,不影响已有的线程。
同样,如果Supervisor进程失效,不影响已存在的Worker进程。

Zookeeper本身已经是按至少三台部署的HA架构了。

运维管理

Storm UI也是用Clojure写的,比较难改,好在它提供了Restful API,可以与其他系统集成,或基于它重写一个UI。

Metrics的采样率是1/20(topology.stats.sample.rate=0.05),即Storm随机从20个事件里取出一个事件来进行统计,命中的话,counter 直接+20。

在旧版本的Storm使用旧版的ZooKeeper要启动数据清理的脚本,在新版上只要修改ZooKeeper的配置文件zoo.cfg, 默认是24小时清理一次 autopurge.purgeInterval=24

日志的配置在logback/cluster.xml文件里,Storm的日志,天然的需要Logstash + ElasticSearch的集中式日志方案

storm.local.dir 要自己建,而且不支持~/ 代表用户根目录。

storm.yaml的默认值在 https://github.com/apache/storm/blob/master/conf/defaults.yaml

Tunning

1. 内部传输机制的各种配置,见文档

2. 屏蔽ack机制,当可靠传输并不是最重要时。可以把Acker数量设为0,可以让Spout不要发出msgId,或者bolt发送消息时不传之前的Tuple。

资料

来自:http://calvin1978.blogcn.com/articles/stormnotes.html