Storm集成Redis

zvfr8962 8年前

来自: http://my.oschina.net/drl/blog/606415


一、Storm集成redis前的准备

    首先是需要安装storm和redis环境,具体安装方法可分别去http://storm.apache.org/http://redis.io查询,安装完成之后 需要准备运行所需要的jar包,除了storm本身lib下的jar包外,操作redis还需要guava-12.0.1、jedis-2.7.2、commons-pool2-2.3.jar、hamcrest-core-1.3.jar和storm-redis-0.10.0.jar,将其引入即可

二、代码

    具体的实例代码在storm-1.10.0的版本中自带,具体可以去下载storm的1.10.0的源码中自行查找,下面将wordcount的集成实例代码列下,spout/bolt的topology主要需要三个文件,分别为PersistentWordCount.java、WordSpout.java和WordCount.java,代码如下:(对trident感兴趣的,源码中也有实例程序,可自行查找学习)

/**   * Licensed to the Apache Software Foundation (ASF) under one   * or more contributor license agreements.  See the NOTICE file   * distributed with this work for additional information   * regarding copyright ownership.  The ASF licenses this file   * to you under the Apache License, Version 2.0 (the   * "License"); you may not use this file except in compliance   * with the License.  You may obtain a copy of the License at   *   * http://www.apache.org/licenses/LICENSE-2.0   *   * Unless required by applicable law or agreed to in writing, software   * distributed under the License is distributed on an "AS IS" BASIS,   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.   * See the License for the specific language governing permissions and   * limitations under the License.   */  package org.apache.storm.redis.topology;  import backtype.storm.Config;  import backtype.storm.LocalCluster;  import backtype.storm.StormSubmitter;  import backtype.storm.topology.OutputFieldsDeclarer;  import backtype.storm.topology.TopologyBuilder;  import backtype.storm.tuple.Fields;  import backtype.storm.tuple.ITuple;  import backtype.storm.tuple.Tuple;  import org.apache.storm.redis.bolt.AbstractRedisBolt;  import org.apache.storm.redis.bolt.RedisStoreBolt;  import org.apache.storm.redis.common.config.JedisClusterConfig;  import org.apache.storm.redis.common.config.JedisPoolConfig;  import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;  import org.apache.storm.redis.common.mapper.RedisStoreMapper;  import org.slf4j.Logger;  import org.slf4j.LoggerFactory;  import redis.clients.jedis.JedisCommands;  import redis.clients.jedis.exceptions.JedisConnectionException;  import redis.clients.jedis.exceptions.JedisException;  public class PersistentWordCount {      private static final String WORD_SPOUT = "WORD_SPOUT";      private static final String COUNT_BOLT = "COUNT_BOLT";      private static final String STORE_BOLT = "STORE_BOLT";      private static final String TEST_REDIS_HOST = "127.0.0.1";      private static final int TEST_REDIS_PORT = 6379;      public static void main(String[] args) throws Exception {          Config config = new Config();          String host = TEST_REDIS_HOST;          int port = TEST_REDIS_PORT;          if (args.length >= 2) {              host = args[0];              port = Integer.parseInt(args[1]);          }          JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()                  .setHost(host).setPort(port).build();          WordSpout spout = new WordSpout();          WordCounter bolt = new WordCounter();          RedisStoreMapper storeMapper = setupStoreMapper();          RedisStoreBolt storeBolt = new RedisStoreBolt(poolConfig, storeMapper);          // wordSpout ==> countBolt ==> RedisBolt          TopologyBuilder builder = new TopologyBuilder();          builder.setSpout(WORD_SPOUT, spout, 1);          builder.setBolt(COUNT_BOLT, bolt, 1).fieldsGrouping(WORD_SPOUT, new Fields("word"));          builder.setBolt(STORE_BOLT, storeBolt, 1).shuffleGrouping(COUNT_BOLT);          if (args.length == 2) {              LocalCluster cluster = new LocalCluster();              cluster.submitTopology("test", config, builder.createTopology());              Thread.sleep(30000);              cluster.killTopology("test");              cluster.shutdown();              System.exit(0);          } else if (args.length == 3) {              StormSubmitter.submitTopology(args[2], config, builder.createTopology());          } else {              System.out.println("Usage: PersistentWordCount <redis host> <redis port> (topology name)");          }      }      private static RedisStoreMapper setupStoreMapper() {          return new WordCountStoreMapper();      }      private static class WordCountStoreMapper implements RedisStoreMapper {          private RedisDataTypeDescription description;          private final String hashKey = "wordCount";          public WordCountStoreMapper() {              description = new RedisDataTypeDescription(                      RedisDataTypeDescription.RedisDataType.HASH, hashKey);          }          @Override          public RedisDataTypeDescription getDataTypeDescription() {              return description;          }          @Override          public String getKeyFromTuple(ITuple tuple) {              return tuple.getStringByField("word");          }          @Override          public String getValueFromTuple(ITuple tuple) {              return tuple.getStringByField("count");          }      }  }
/**   * Licensed to the Apache Software Foundation (ASF) under one   * or more contributor license agreements.  See the NOTICE file   * distributed with this work for additional information   * regarding copyright ownership.  The ASF licenses this file   * to you under the Apache License, Version 2.0 (the   * "License"); you may not use this file except in compliance   * with the License.  You may obtain a copy of the License at   *   * http://www.apache.org/licenses/LICENSE-2.0   *   * Unless required by applicable law or agreed to in writing, software   * distributed under the License is distributed on an "AS IS" BASIS,   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.   * See the License for the specific language governing permissions and   * limitations under the License.   */  package org.apache.storm.redis.topology;  import backtype.storm.spout.SpoutOutputCollector;  import backtype.storm.task.TopologyContext;  import backtype.storm.topology.IRichSpout;  import backtype.storm.topology.OutputFieldsDeclarer;  import backtype.storm.tuple.Fields;  import backtype.storm.tuple.Values;  import java.util.Map;  import java.util.Random;  import java.util.UUID;  public class WordSpout implements IRichSpout {      boolean isDistributed;      SpoutOutputCollector collector;      public static final String[] words = new String[] { "apple", "orange", "pineapple", "banana", "watermelon" };      public WordSpout() {          this(true);      }      public WordSpout(boolean isDistributed) {          this.isDistributed = isDistributed;      }      public boolean isDistributed() {          return this.isDistributed;      }      @SuppressWarnings("rawtypes")      public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {          this.collector = collector;      }      public void close() {      }      public void nextTuple() {          final Random rand = new Random();          final String word = words[rand.nextInt(words.length)];          this.collector.emit(new Values(word), UUID.randomUUID());          Thread.yield();      }      public void ack(Object msgId) {      }      public void fail(Object msgId) {      }      public void declareOutputFields(OutputFieldsDeclarer declarer) {          declarer.declare(new Fields("word"));      }      @Override      public void activate() {      }      @Override      public void deactivate() {      }      @Override      public Map<String, Object> getComponentConfiguration() {          return null;      }  }
/**   * Licensed to the Apache Software Foundation (ASF) under one   * or more contributor license agreements.  See the NOTICE file   * distributed with this work for additional information   * regarding copyright ownership.  The ASF licenses this file   * to you under the Apache License, Version 2.0 (the   * "License"); you may not use this file except in compliance   * with the License.  You may obtain a copy of the License at   *   * http://www.apache.org/licenses/LICENSE-2.0   *   * Unless required by applicable law or agreed to in writing, software   * distributed under the License is distributed on an "AS IS" BASIS,   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.   * See the License for the specific language governing permissions and   * limitations under the License.   */  package org.apache.storm.redis.topology;  import backtype.storm.task.TopologyContext;  import backtype.storm.topology.BasicOutputCollector;  import backtype.storm.topology.IBasicBolt;  import backtype.storm.topology.OutputFieldsDeclarer;  import backtype.storm.tuple.Fields;  import backtype.storm.tuple.Tuple;  import backtype.storm.tuple.Values;  import com.google.common.collect.Maps;  import java.util.Map;  import static backtype.storm.utils.Utils.tuple;  public class WordCounter implements IBasicBolt {      private Map<String, Integer> wordCounter = Maps.newHashMap();      @SuppressWarnings("rawtypes")      public void prepare(Map stormConf, TopologyContext context) {      }      public void execute(Tuple input, BasicOutputCollector collector) {          String word = input.getStringByField("word");          int count;          if (wordCounter.containsKey(word)) {              count = wordCounter.get(word) + 1;              wordCounter.put(word, wordCounter.get(word) + 1);          } else {              count = 1;          }          wordCounter.put(word, count);          collector.emit(new Values(word, String.valueOf(count)));      }      public void cleanup() {      }      public void declareOutputFields(OutputFieldsDeclarer declarer) {          declarer.declare(new Fields("word", "count"));      }      @Override      public Map<String, Object> getComponentConfiguration() {          return null;      }  }

三、查看结果

    运行PersistentWordCount,在命令行中键入

src/redis-cli

进入到redis的client,输入命令

HGETALL wordCount

即可查看到结果