• 1. ActiveMQ讲义
  • 2. 一、ActiveMQ介绍
  • 3. ActiveMQ是什么? ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现
  • 4. ActiveMQ特性列表 1. 多种语言和协议编写客户端。语言: Java, C, C++, C#, Ruby, Perl, Python, PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP 2. 完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务) 3. 对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性 4. 通过了常见J2EE服务器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上 5. 支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA 6. 支持通过JDBC和journal提供高速的消息持久化 7. 从设计上保证了高性能的集群,客户端-服务器,点对点 8. 支持Ajax 9. 支持与Axis的整合 10. 可以很容易得调用内嵌JMS provider,进行测试
  • 5. 一、ActiveMQ介绍 二、ActiveMQ安装配置
  • 6. ActiveMQ安装一、安装 在http://activemq.apache.org/download.html 下载5.2.0发行包,解压到需要安装ActiveMQ的文件夹(假设该文件夹为/path/),记为/path/activemq。 unix环境activemq文件夹需要执行权限,执行如下命令  chmod -R 755 /path/activemq 
  • 7. ActiveMQ安装二、启动 window环境运行/path/activemq/bin/activemq.bat unix环境运行/path/activemq/bin/activemq
  • 8. ActiveMQ安装三、测试 ActiveMQ默认使用的TCP连接端口是61616, 通过查看该端口的信息可以测试ActiveMQ是否成功启动 window环境运行  netstat -an|find "61616" unix环境运行netstat -an|grep 61616
  • 9. ActiveMQ安装监控 ActiveMQ5.2版本默认启动时,启动了内置的jetty服务器,提供一个demo应用和用于监控ActiveMQ的admin应用。 admin:http://127.0.0.1:8161/admin/ demo:http://127.0.0.1:8161/demo/ 点击demo应用中的“ Market data publisher ”,就会发一些测试的消息。转到admin页面的topics menu下面,可以看到消息在增长。
  • 10. 一、ActiveMQ介绍 二、ActiveMQ安装配置 三、ActiveMQ架构
  • 11. ActiveMQ架构
  • 12. 一、ActiveMQ介绍 二、ActiveMQ安装配置 三、ActiveMQ架构 四、ActiveMQ消息模式
  • 13. ActiveMQ消息模式一、Point-to-Point(PTP,点对点) ActiveMQ的PTP模式通过消息队列(Queue)实现。消息由一个JMS客户机(Producer)发送到服务器上的一个队列( Queue )。而另一个JMS客户机(Consumer)则可以访问这个队列,并从该服务器获取这条消息。
  • 14. ActiveMQ消息模式二、Publich/Subscribe(发布/订阅者) ActiveMQ的Publich/Subscribe模式通过消息主题(Topic)实现。消息由一个JMS客户机(Producer)发布到服务器上的一个主题(Topic),可有多个订阅者(Consumer)去访问该消息。消息将一直维持在主题中,直到这个主题的所有订阅者都取走了该消息的一个副本。消息也包括了一个参数,用于定义了该消息的耐久性(它能够在服务器上等待订阅者多长时间)。
  • 15. 一、ActiveMQ介绍 二、ActiveMQ安装配置 三、ActiveMQ架构 四、ActiveMQ消息模式 五、Java for ActiveMQ
  • 16. 入门实例实例说明: ActiveMQ发送和异步接受文本类型消息的简单功能 ,采用PTP模式
  • 17. MQBroker类com.honno.activemq.MQBroker负责启动ActiveMQ的消息代理。 代码片段: BrokerService broker = new BrokerService(); try { broker.addConnector("tcp://localhost:61616"); broker.start(); } catch (InterruptedException e) { e.printStackTrace(); } catch (Exception e) { e.printStackTrace(); }
  • 18. Producer类com.honno.activemq.producer.Producer生产10个文本消息,并发送出去。 代码片段: try { // TODO Auto-generated method stub ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnectionFactory.DEFAULT_USER, ActiveMQConnectionFactory.DEFAULT_PASSWORD, ActiveMQConnectionFactory.DEFAULT_BROKER_URL); try { Connection connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  • 19. ProducerDestination destination = session.createQueue("TEST"); MessageProducer producer = session.createProducer(destination); TextMessage message = session.createTextMessage(); PrintlnUtil.println("Sending Messags"); for (int i = 0; i < 10; i++) { message.setText("This is message " + (i + 1)); producer.send(message); } } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } }
  • 20. Consumer 类com.honno.activemq.consumer.Consumer异步接收的方式消费消息。 代码片段: ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnectionFactory.DEFAULT_USER, ActiveMQConnectionFactory.DEFAULT_PASSWORD, ActiveMQConnectionFactory.DEFAULT_BROKER_URL); try { Connection connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue("TEST"); MessageConsumer consumer = session.createConsumer(destination); PrintlnUtil.println("Consumering Messages");
  • 21. Consumerconsumer.setMessageListener(this); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } public void onMessage(Message m) { // TODO Auto-generated method stub if (m instanceof TextMessage) { TextMessage message = (TextMessage) m; try { System.out.println("Message :" + message.getText()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
  • 22. 谢谢