Storm实验 -- 单词计数

fpcm 9年前
1. 使用mvn命令创建项目
mvn archetype:generate -DgroupId=storm.test -DartifactId=Storm01 -Dpackage=storm.test.v1

然后编辑配置文件pom.xml,添加storm依赖 

<dependency>    <groupId>org.apache.storm</groupId>    <artifactId>storm-core</artifactId>    <version>0.9.4</version>  </dependency>

最后通过下述命令来编译项目,编译正确完成后导入到IDE中 

mvn install

当然,也可以在IDE中安装maven插件,从而直接在IDE中创建maven项目 


2. 实现数据源,用重复的静态语句来模拟数据源
package storm.test.v1;    import backtype.storm.spout.SpoutOutputCollector;  import backtype.storm.task.TopologyContext;  import backtype.storm.topology.OutputFieldsDeclarer;  import backtype.storm.topology.base.BaseRichSpout;  import backtype.storm.tuple.Fields;  import backtype.storm.tuple.Values;    import java.util.Map;    public class SentenceSpout extends BaseRichSpout {      private String[] sentences = {              "storm integrates with the queueing",              "and database technologies you already use",              "a storm topology consumes streams of data",              "and processes those streams in arbitrarily complex ways",              "repartitioning the streams between each stage of the computation however needed"      };      private int index = 0;        private SpoutOutputCollector collector;        @Override      public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {          outputFieldsDeclarer.declare(new Fields("sentence"));      }        @Override      public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {          this.collector = spoutOutputCollector;      }        @Override      public void nextTuple() {          this.collector.emit(new Values(sentences[index]));          index++;          if (index >= sentences.length) {              index = 0;          }            try {              Thread.sleep(1);          } catch (InterruptedException e) {          }      }  }

3. 实现语句分割bolt 

package storm.test.v1;    import backtype.storm.task.OutputCollector;  import backtype.storm.task.TopologyContext;  import backtype.storm.topology.OutputFieldsDeclarer;  import backtype.storm.topology.base.BaseRichBolt;  import backtype.storm.tuple.Fields;  import backtype.storm.tuple.Tuple;  import backtype.storm.tuple.Values;    import java.util.Map;    public class SplitSentenceBolt extends BaseRichBolt {      private OutputCollector collector;        @Override      public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {          this.collector = outputCollector;      }        @Override      public void execute(Tuple tuple) {          String sentence = tuple.getStringByField("sentence");          String[] words = sentence.split(" ");          for (String word : words) {              this.collector.emit(new Values(word));          }      }        @Override      public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {          outputFieldsDeclarer.declare(new Fields("word"));      }  }

4. 实现单词计数bolt 

package storm.test.v1;    import backtype.storm.task.OutputCollector;  import backtype.storm.task.TopologyContext;  import backtype.storm.topology.OutputFieldsDeclarer;  import backtype.storm.topology.base.BaseRichBolt;  import backtype.storm.tuple.Fields;  import backtype.storm.tuple.Tuple;  import backtype.storm.tuple.Values;    import java.util.HashMap;  import java.util.Map;    public class WordCountBolt extends BaseRichBolt {      private OutputCollector collector;      private HashMap<String, Long> counts = null;        @Override      public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {          this.collector = outputCollector;          this.counts = new HashMap<String, Long>();      }        @Override      public void execute(Tuple tuple) {          String word = tuple.getStringByField("word");          Long count = this.counts.get(word);          if (count == null) {              count = 0L;          }          count++;          this.counts.put(word, count);          this.collector.emit(new Values(word, count));      }        @Override      public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {          outputFieldsDeclarer.declare(new Fields("word", "count"));      }  }

5. 实现上报bolt 

package storm.test.v1;    import backtype.storm.task.OutputCollector;  import backtype.storm.task.TopologyContext;  import backtype.storm.topology.OutputFieldsDeclarer;  import backtype.storm.topology.base.BaseRichBolt;  import backtype.storm.tuple.Tuple;    import java.util.ArrayList;  import java.util.Collections;  import java.util.HashMap;  import java.util.List;  import java.util.Map;    public class ReportBolt extends BaseRichBolt {      private HashMap<String, Long> counts = null;        @Override      public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {          counts = new HashMap<String, Long>();      }        @Override      public void execute(Tuple tuple) {          String word = tuple.getStringByField("word");          Long count = tuple.getLongByField("count");          this.counts.put(word, count);      }        @Override      public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {      }        @Override      public void cleanup() { //本地模式下,终止topology时可以保证cleanup()被执行          System.out.println("--- FINAL COUNTS ---");          List<String> keys = new ArrayList<String>();          keys.addAll(this.counts.keySet());          Collections.sort(keys);          for (String key : keys) {              System.out.println(key + " : " + this.counts.get(key));          }          System.out.println("----------");      }  }

6. 实现单词计数topology 

package storm.test.v1;    import backtype.storm.Config;  import backtype.storm.LocalCluster;  import backtype.storm.topology.TopologyBuilder;  import backtype.storm.tuple.Fields;    public class WordCountTopology {      private static final String SENTENCE_SPOUT_ID = "sentence-spout";      private static final String SPLIT_BOLT_ID = "split-bolt";      private static final String COUNT_BOLT_ID = "conut-bolt";      private static final String REPORT_BOLT_ID = "report-bolt";      private static final String TOPOLOGY_NAME = "word-count-topology";        public static void main(String[] args) {          SentenceSpout spout = new SentenceSpout();          SplitSentenceBolt spiltBolt = new SplitSentenceBolt();          WordCountBolt countBolt = new WordCountBolt();          ReportBolt reportBolt = new ReportBolt();            TopologyBuilder builder = new TopologyBuilder();            builder.setSpout(SENTENCE_SPOUT_ID, spout); //注册数据源          builder.setBolt(SPLIT_BOLT_ID, spiltBolt) //注册bolt                  .shuffleGrouping(SENTENCE_SPOUT_ID); //该bolt订阅spout随机均匀发射来的数据流          builder.setBolt(COUNT_BOLT_ID, countBolt)                  .fieldsGrouping(SPLIT_BOLT_ID, new Fields("word")); //该bolt订阅spiltBolt发射来的数据流,并且保证"word"字段值相同的tuple会被路由到同一个countBolt          builder.setBolt(REPORT_BOLT_ID, reportBolt)                  .globalGrouping(COUNT_BOLT_ID); //该bolt订阅countBolt发射来的数据流,并且所有的tuple都会被路由到唯一的一个reportBolt中            Config config = new Config();            //本地模式启动          LocalCluster cluster = new LocalCluster();          cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());          try {              Thread.sleep(5 * 1000);          } catch (InterruptedException e) {          }          cluster.killTopology(TOPOLOGY_NAME);          cluster.shutdown();      }  }

7. 运行结果:

--- FINAL COUNTS ---  a : 302  already : 302  and : 604  arbitrarily : 302  between : 302  complex : 302  computation : 302  consumes : 302  data : 302  database : 302  each : 302  however : 302  in : 302  integrates : 302  needed : 302  of : 604  processes : 302  queueing : 302  repartitioning : 302  stage : 302  storm : 604  streams : 906  technologies : 302  the : 906  those : 302  topology : 302  use : 302  ways : 302  with : 302  you : 302  ----------