Redis's publish/subscribe feature

JenHorseman, 8 years ago

Source: https://segmentfault.com/a/1190000004438233


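Redis provides a simple publish/subscribe feature. In scenarios that suit it (for example, when consumers do not need to receive messages that were published while they were offline), it is simpler to use than a dedicated MQ. This article records how to use Redis's publish/subscribe feature in Spring Boot.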
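To make the "no offline messages" point concrete, here is a minimal in-memory sketch of fire-and-forget delivery. `FireAndForgetBroker` and its methods are hypothetical names invented for illustration; this is not the Redis client API, only a model of the behaviour described above, in which a message is handed to whoever is subscribed at that moment and is never stored.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical in-memory stand-in for a pub/sub broker (not the Redis API).
class FireAndForgetBroker {
    private final Map<String, List<Consumer<String>>> subscribers = new HashMap<>();

    // Register a listener; it only sees messages published from now on.
    void subscribe(String channel, Consumer<String> listener) {
        subscribers.computeIfAbsent(channel, c -> new ArrayList<>()).add(listener);
    }

    // Deliver to the listeners registered right now and return how many got it.
    // Nothing is stored, so a message with zero receivers is simply dropped.
    int publish(String channel, String message) {
        List<Consumer<String>> listeners =
                subscribers.getOrDefault(channel, Collections.emptyList());
        for (Consumer<String> listener : listeners) {
            listener.accept(message);
        }
        return listeners.size();
    }

    public static void main(String[] args) {
        FireAndForgetBroker broker = new FireAndForgetBroker();
        List<String> received = new ArrayList<>();

        int first = broker.publish("pubsub:queue", "Message 1"); // nobody listening yet
        broker.subscribe("pubsub:queue", received::add);
        int second = broker.publish("pubsub:queue", "Message 2");

        System.out.println(first + " " + second + " " + received);
    }
}
```

Running `main` prints `0 1 [Message 2]`: the first message is lost because the subscriber arrived later. If that loss is unacceptable, a queue with persistence is probably the better fit.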

Defining the producer

Configuration

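    // redisTemplate(factory) is not shown in the article; it is assumed to be
    // a @Bean method defined elsewhere in the same configuration class.
    @Bean
    MyPublisher redisPublisher(RedisConnectionFactory factory) {
        return new MyPublisher(redisTemplate(factory), topic());
    }

    // The channel that both the publisher and the subscriber use.
    @Bean
    ChannelTopic topic() {
        return new ChannelTopic("pubsub:queue");
    }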

Producer implementation

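    import java.util.concurrent.atomic.AtomicLong;

    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.data.redis.listener.ChannelTopic;
    import org.springframework.scheduling.annotation.Scheduled;

    public class MyPublisher {

        private final RedisTemplate<String, Object> template;
        private final ChannelTopic topic;
        private final AtomicLong counter = new AtomicLong(0);

        public MyPublisher(final RedisTemplate<String, Object> template,
                           final ChannelTopic topic) {
            this.template = template;
            this.topic = topic;
        }

        // Publishes a numbered message 100 ms after the previous run finishes.
        // @Scheduled only fires if scheduling is enabled (e.g. @EnableScheduling).
        @Scheduled(fixedDelay = 100)
        public void publish() {
            template.convertAndSend(topic.getTopic(), "Message " + counter.incrementAndGet()
                    + ", " + Thread.currentThread().getName());
        }
    }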
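The `fixedDelay = 100` setting means the delay is measured from the end of one run to the start of the next. The sketch below reproduces that timing with plain `java.util.concurrent` instead of Spring, so it can be run on its own. `FixedDelaySketch` and `collect` are hypothetical names, and the messages go into a list rather than to Redis. It also hints at where a thread name like `pool-1-thread-1` can come from: that is the naming pattern of the JDK's default thread factory.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-alone model of a fixed-delay publisher (no Spring, no Redis).
class FixedDelaySketch {

    // Runs a publish-like task with a fixed delay until `count` messages exist.
    static List<String> collect(int count, long delayMillis) {
        List<String> messages = new CopyOnWriteArrayList<>();
        AtomicLong counter = new AtomicLong(0);
        CountDownLatch done = new CountDownLatch(count);
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

        scheduler.scheduleWithFixedDelay(() -> {
            if (done.getCount() == 0) {
                return; // target reached; ignore any run that sneaks in before shutdown
            }
            messages.add("Message " + counter.incrementAndGet()
                    + ", " + Thread.currentThread().getName());
            done.countDown();
        }, 0, delayMillis, TimeUnit.MILLISECONDS);

        try {
            // Generous upper bound so a slow machine does not hang forever.
            done.await(count * delayMillis + 5000, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
        return messages;
    }

    public static void main(String[] args) {
        for (String message : collect(5, 100)) {
            System.out.println(message);
        }
    }
}
```

Because a single scheduler thread runs every invocation, the counter values come out in order and the same thread name appears on each line. The exact pool number depends on how many executors the JVM has already created.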

Defining the consumer

Configuration

    // subscribe
    @Bean
    MessageListenerAdapter messageListener() {
        return new MessageListenerAdapter(new MyMessageListener());
    }

    @Bean
    RedisMessageListenerContainer redisContainer(RedisConnectionFactory factory) {
        final RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(factory);
        // Deliver everything published on the topic to the listener.
        container.addMessageListener(messageListener(), topic());
        return container;
    }

Consumer implementation

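    import org.springframework.data.redis.connection.Message;
    import org.springframework.data.redis.connection.MessageListener;

    public class MyMessageListener implements MessageListener {

        // Called by the listener container for each message on the subscribed topic.
        @Override
        public void onMessage(final Message message, final byte[] pattern) {
            System.out.println("Message received: " + message.toString());
        }
    }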

Running

      .   ____          _            __ _ _
     /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
    ( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
     \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
      '  |____| .__|_| |_|_| |_\__, | / / / /
     =========|_|==============|___/=/_/_/_/
     :: Spring Boot ::        (v1.3.2.RELEASE)

    2016-02-16 00:14:08.190  INFO 1481 --- [           main] com.patterncat.RedisdemoApplication      : Starting RedisdemoApplication on Jupiter.local with PID 1481 (/Users/patterncat/workspace/redisdemo/target/classes started by patterncat in /Users/patterncat/workspace/redisdemo)
    2016-02-16 00:14:08.193  INFO 1481 --- [           main] com.patterncat.RedisdemoApplication      : No active profile set, falling back to default profiles: default
    2016-02-16 00:14:08.242  INFO 1481 --- [           main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@140e5a13: startup date [Tue Feb 16 00:14:08 CST 2016]; root of context hierarchy
    2016-02-16 00:14:09.756  INFO 1481 --- [           main] o.s.j.e.a.AnnotationMBeanExporter        : Registering beans for JMX exposure on startup
    2016-02-16 00:14:09.763  INFO 1481 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 0
    2016-02-16 00:14:09.807  INFO 1481 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 2147483647
    2016-02-16 00:14:09.897  INFO 1481 --- [           main] com.patterncat.RedisdemoApplication      : Started RedisdemoApplication in 2.215 seconds (JVM running for 2.589)
    Message received: "Message 1, pool-1-thread-1"
    Message received: "Message 2, pool-1-thread-1"
    Message received: "Message 3, pool-1-thread-1"
    Message received: "Message 4, pool-1-thread-1"
    Message received: "Message 5, pool-1-thread-1"
    Message received: "Message 6, pool-1-thread-1"
    Message received: "Message 7, pool-1-thread-1"
    Message received: "Message 8, pool-1-thread-1"
    Message received: "Message 9, pool-1-thread-1"
    Message received: "Message 10, pool-1-thread-1"

References