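Integrating Spring with ActiveMQ

jopen, 10 years ago

The goal is bus GPS positioning: dynamically showing the driving tracks of subscribed buses on a map, replaying tracks, and similar features. This calls for a message-push middleware, here ActiveMQ, with messages pushed over UDP.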

I. Message listeners
Spring provides three subclasses of AbstractMessageListenerContainer, each with its own characteristics.


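Type 1: SimpleMessageListenerContainer
      This is the simplest of the three message listener containers. It creates a fixed number of JMS sessions at startup and uses them for the container's whole lifetime. It cannot adapt dynamically to runtime demands, nor can it take part in transactions for message reception. In return, it asks the least of the JMS provider: only the plain JMS API.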

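Type 2: DefaultMessageListenerContainer

      This is the most widely used message listener container. Unlike SimpleMessageListenerContainer, it can adapt dynamically to runtime demands and can take part in transaction management. When it is configured with a JtaTransactionManager, each received message is enlisted in an XA transaction, so processing can benefit from XA transaction semantics. It strikes a good balance between low requirements on the JMS provider and powerful features such as transaction participation.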

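Type 3: ServerSessionMessageListenerContainer

     This listener container uses the JMS ServerSessionPool SPI to manage JMS sessions dynamically. Message listeners gain dynamic runtime tuning this way, but the JMS provider must support the ServerSessionPool SPI. If you do not need runtime performance tuning, use DefaultMessageListenerContainer or SimpleMessageListenerContainer instead. Note that this class was deprecated in Spring 2.5 and is no longer available from Spring 3.0 onward, the version targeted by the configuration below.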

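II. Automatically converting messages to Java objects
        Converters are indispensable in many components. Spring supports message conversion through the MessageConverter interface.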
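As a rough illustration of what turning a message into a Java object can mean for this project, the sketch below parses a coordinate payload of the form sent later in this article into a small value object. GpsPoint and GpsPayloadParser are hypothetical names that do not appear in the original code, and the longitude-then-latitude order is an assumption based on the sample values. It uses only the JDK, so it runs without Spring or JMS; a MessageConverter's fromMessage method could delegate to something like it.

```java
/** Hypothetical value object for one GPS fix; not part of the original project. */
final class GpsPoint {
    final double longitude;
    final double latitude;

    GpsPoint(double longitude, double latitude) {
        this.longitude = longitude;
        this.latitude = latitude;
    }
}

/** Hypothetical helper that a converter's fromMessage could delegate to. */
final class GpsPayloadParser {
    private GpsPayloadParser() {
    }

    /**
     * Parses "label:longitude,latitude". Everything up to the first colon is
     * treated as a label and ignored. The field order is an assumption.
     */
    static GpsPoint parse(String payload) {
        if (payload == null) {
            throw new IllegalArgumentException("payload is null");
        }
        int colon = payload.indexOf(':');
        String coords = colon >= 0 ? payload.substring(colon + 1) : payload;
        String[] parts = coords.split(",");
        if (parts.length != 2) {
            throw new IllegalArgumentException("expected 'lon,lat' but got: " + coords);
        }
        // Double.parseDouble throws NumberFormatException, a subclass of
        // IllegalArgumentException, when a field is not a number.
        return new GpsPoint(Double.parseDouble(parts[0].trim()),
                            Double.parseDouble(parts[1].trim()));
    }
}
```

Failing fast on malformed payloads keeps bad coordinates from reaching the map layer; whether to drop or log such messages is left to the listener.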

III. Code

      1. Edit activemq.xml in ActiveMQ's conf directory and add the UDP transport

<!--
    Licensed to the Apache Software Foundation (ASF) under one or more
    contributor license agreements.  See the NOTICE file distributed with
    this work for additional information regarding copyright ownership.
    The ASF licenses this file to You under the Apache License, Version 2.0
    (the "License"); you may not use this file except in compliance with
    the License.  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
-->
<!-- START SNIPPET: example -->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
                           http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

    <!-- Allows us to use system properties as variables in this configuration file -->
    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
            <value>file:${activemq.conf}/credentials.properties</value>
        </property>
    </bean>

    <!-- Allows log searching in hawtio console -->
    <bean id="logQuery" class="org.fusesource.insight.log.log4j.Log4jLogQuery"
          lazy-init="false" scope="singleton" init-method="start" destroy-method="stop" />

    <!-- The <broker> element is used to configure the ActiveMQ broker. -->
    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}">

        <destinationPolicy>
            <policyMap>
                <policyEntries>
                    <policyEntry topic=">">
                        <!-- The constantPendingMessageLimitStrategy is used to prevent
                             slow topic consumers to block producers and affect other consumers
                             by limiting the number of messages that are retained
                             For more information, see:
                             http://activemq.apache.org/slow-consumer-handling.html
                        -->
                        <pendingMessageLimitStrategy>
                            <constantPendingMessageLimitStrategy limit="1000" />
                        </pendingMessageLimitStrategy>
                    </policyEntry>
                </policyEntries>
            </policyMap>
        </destinationPolicy>

        <!-- The managementContext is used to configure how ActiveMQ is exposed in
             JMX. By default, ActiveMQ uses the MBean server that is started by
             the JVM. For more information, see:
             http://activemq.apache.org/jmx.html
        -->
        <managementContext>
            <managementContext createConnector="false" />
        </managementContext>

        <!-- Configure message persistence for the broker. The default persistence
             mechanism is the KahaDB store (identified by the kahaDB tag).
             For more information, see:
             http://activemq.apache.org/persistence.html
        -->
        <persistenceAdapter>
            <kahaDB directory="${activemq.data}/kahadb" />
        </persistenceAdapter>

        <!-- The systemUsage controls the maximum amount of space the broker will
             use before disabling caching and/or slowing down producers.
             For more information, see:
             http://activemq.apache.org/producer-flow-control.html
        -->
        <systemUsage>
            <systemUsage>
                <memoryUsage>
                    <memoryUsage percentOfJvmHeap="70" />
                </memoryUsage>
                <storeUsage>
                    <storeUsage limit="100 gb" />
                </storeUsage>
                <tempUsage>
                    <tempUsage limit="50 gb" />
                </tempUsage>
            </systemUsage>
        </systemUsage>

        <!-- The transport connectors expose ActiveMQ over a given protocol to
             clients and other brokers. For more information, see:
             http://activemq.apache.org/configuring-transports.html
        -->
        <transportConnectors>
            <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
            <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600" />
            <!-- Added for this article: the UDP transport -->
            <transportConnector name="udp" uri="udp://0.0.0.0:8123" />
            <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600" />
            <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600" />
            <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600" />
            <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600" />
        </transportConnectors>

        <!-- destroy the spring context on shutdown to stop jetty -->
        <shutdownHooks>
            <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
        </shutdownHooks>

    </broker>

    <!-- Enable web consoles, REST and Ajax APIs and demos
         The web consoles requires by default login, you can disable this in the jetty.xml file

         Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details
    -->
    <import resource="jetty.xml" />

</beans>
<!-- END SNIPPET: example -->
2. applicationContext.xml 

     

<?xml version="1.0" encoding="UTF-8" ?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:flex="http://www.springframework.org/schema/flex"
       xmlns:amq="http://activemq.apache.org/schema/core"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
                           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
                           http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
                           http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
                           http://www.springframework.org/schema/flex http://www.springframework.org/schema/flex/spring-flex-1.0.xsd
                           http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.2.0.xsd">

    <!-- ===================== Message handling: ActiveMQ ===================== -->
    <!-- JMS TOPIC MODEL -->

    <!-- Connection factory used for sending to the topic -->
    <bean id="topicSendConnectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
        <!-- UDP transport -->
        <property name="brokerURL" value="udp://localhost:8123" />
        <!-- TCP transport (alternative) -->
        <!-- <property name="brokerURL" value="tcp://localhost:61616" /> -->
        <property name="useAsyncSend" value="true" />
    </bean>

    <!-- Connection factory used for listening on the topic -->
    <bean id="topicListenConnectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
        <!-- UDP transport -->
        <property name="brokerURL" value="udp://localhost:8123" />
        <!-- TCP transport (alternative) -->
        <!-- <property name="brokerURL" value="tcp://localhost:61616" /> -->
    </bean>

    <!-- The topic -->
    <bean id="myTopic" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg value="normandy.topic" />
    </bean>

    <bean id="messageConvertForSys" class="com.tech.gps.util.MessageConvertForSys" />

    <!-- JmsTemplate for sending to the topic -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="topicSendConnectionFactory" />
        <property name="defaultDestination" ref="myTopic" />
        <property name="messageConverter" ref="messageConvertForSys" />
        <!-- JmsTemplate applies deliveryMode only when explicitQosEnabled is true -->
        <property name="explicitQosEnabled" value="true" />
        <!-- Delivery mode: DeliveryMode.NON_PERSISTENT=1 (non-persistent); DeliveryMode.PERSISTENT=2 (persistent) -->
        <property name="deliveryMode" value="1" />
        <!-- Enable publish/subscribe mode -->
        <property name="pubSubDomain" value="true" />
    </bean>

    <!-- Message sender -->
    <bean id="topicSender" class="com.tech.gps.util.MessageSender">
        <property name="jmsTemplate" ref="jmsTemplate" />
    </bean>

    <!-- Message receiver -->
    <bean id="topicReceiver" class="com.tech.gps.util.MessageReceiver" />

    <!-- Listener container for the topic -->
    <bean id="listenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="topicListenConnectionFactory" />
        <!-- true = publish/subscribe mode -->
        <property name="pubSubDomain" value="true" />
        <!-- Destination: myTopic -->
        <property name="destination" ref="myTopic" />
        <property name="subscriptionDurable" value="true" />
        <!-- Client ID of the receiving client. With a durable subscription, messages
             published while this client is offline are kept by the broker until a
             client with this ID consumes them. -->
        <property name="clientId" value="clientId_1" />
        <property name="messageListener" ref="topicReceiver" />
    </bean>

    <!-- Servlet -->
    <bean id="ControlServlet1" class="com.tech.gps.servlet.ControlServlet1">
        <property name="topicSender" ref="topicSender" />
    </bean>

</beans>
3. web.xml
<?xml version="1.0" encoding="UTF-8" ?>
<web-app version="2.4"
         xmlns="http://java.sun.com/xml/ns/j2ee"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://java.sun.com/xml/ns/j2ee http://java.sun.com/xml/ns/j2ee/web-app_2_4.xsd">

    <!-- Load the Spring configuration file applicationContext.xml -->
    <listener>
        <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
    </listener>

    <!-- Tell Spring where the configuration file is -->
    <context-param>
        <param-name>contextConfigLocation</param-name>
        <param-value>classpath*:applicationContext.xml</param-value>
    </context-param>

    <servlet>
        <servlet-name>ControlServlet1</servlet-name>
        <servlet-class>com.tech.gps.servlet.DelegatingServletProxy</servlet-class>
    </servlet>

    <servlet-mapping>
        <servlet-name>ControlServlet1</servlet-name>
        <url-pattern>/ControlServlet1</url-pattern>
    </servlet-mapping>

    <welcome-file-list>
        <welcome-file>index11.jsp</welcome-file>
    </welcome-file-list>

</web-app>
4. Sending messages
package com.tech.gps.util;

import org.springframework.jms.core.JmsTemplate;

public class MessageSender {

    private JmsTemplate jmsTemplate;

    // Converts msg with the template's MessageConverter and sends it
    // to the template's default destination.
    public void sendMessage(String msg) {
        jmsTemplate.convertAndSend(msg);
    }

    public JmsTemplate getJmsTemplate() {
        return jmsTemplate;
    }

    public void setJmsTemplate(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }
}
5. Message conversion
package com.tech.gps.util;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.Session;

import org.springframework.jms.support.converter.MessageConversionException;
import org.springframework.jms.support.converter.MessageConverter;

public class MessageConvertForSys implements MessageConverter {

    // Outgoing: store the object's string form in the "key" property
    // of an otherwise empty ObjectMessage.
    public Message toMessage(Object object, Session session)
            throws JMSException, MessageConversionException {

        System.out.println("sendMessage:" + object.toString());
        ObjectMessage objectMessage = session.createObjectMessage();
        objectMessage.setStringProperty("key", object.toString());

        return objectMessage;
    }

    // Incoming: read the payload back from the "key" property.
    public Object fromMessage(Message message) throws JMSException,
            MessageConversionException {

        ObjectMessage objectMessage = (ObjectMessage) message;
        return objectMessage.getObjectProperty("key");
    }
}
6. Receiving messages
package com.tech.gps.util;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;

public class MessageReceiver implements MessageListener {

    // Called by the listener container for every message on the topic.
    public void onMessage(Message m) {

        ObjectMessage om = (ObjectMessage) m;
        try {
            // The payload travels in the "key" property set by MessageConvertForSys.
            String key = om.getStringProperty("key");
            System.out.println(" ");
            System.out.println("receiveMessage:" + key);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
7. Servlet controller
package com.tech.gps.servlet;

import java.io.IOException;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import com.tech.gps.util.MessageSender;

public class ControlServlet1 extends HttpServlet {

    // Injected by Spring (see the ControlServlet1 bean in applicationContext.xml).
    private MessageSender topicSender;

    public MessageSender getTopicSender() {
        return topicSender;
    }

    public void setTopicSender(MessageSender topicSender) {
        this.topicSender = topicSender;
    }

    public void doGet(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException {
        doPost(request, response);
    }

    public void doPost(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException {

        request.setCharacterEncoding("utf-8");

        // Publish the same sample coordinate ten times, one second apart.
        for (int i = 0; i < 10; i++) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            topicSender.sendMessage("坐标:118.36582,37.2569812");
        }
    }
}
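The loop above keeps the request thread busy for about ten seconds. One possible alternative, sketched here with JDK classes only, is to hand the sends to a scheduler so the caller returns immediately. PeriodicPublisher is a hypothetical class that is not part of the original project, and the Consumer<String> parameter stands in for a call such as topicSender::sendMessage.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical helper: sends a payload several times on a background thread. */
final class PeriodicPublisher {

    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    /**
     * Schedules count sends, the first immediately and each later one
     * periodMillis after the previous, without blocking the caller.
     * The returned latch reaches zero once every send has been attempted.
     */
    CountDownLatch publish(Consumer<String> sender, String payload,
                           int count, long periodMillis) {
        CountDownLatch done = new CountDownLatch(count);
        for (int i = 0; i < count; i++) {
            scheduler.schedule(() -> {
                try {
                    sender.accept(payload);
                } finally {
                    // Count down even if the send throws, so waiters are released.
                    done.countDown();
                }
            }, i * periodMillis, TimeUnit.MILLISECONDS);
        }
        return done;
    }

    /** Stops the scheduler after already scheduled sends have run. */
    void shutdown() {
        scheduler.shutdown();
    }
}
```

In a web application the publisher would normally be a single shared bean that is shut down when the context closes; that wiring is left out of the sketch.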
8. Integrating the servlet with Spring
package com.tech.gps.servlet;

import java.io.IOException;

import javax.servlet.GenericServlet;
import javax.servlet.Servlet;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;

import org.springframework.web.context.WebApplicationContext;
import org.springframework.web.context.support.WebApplicationContextUtils;

public class DelegatingServletProxy extends GenericServlet {

    private String targetBean;
    private Servlet proxy;

    // Forward every request to the Spring-managed servlet.
    public void service(ServletRequest req, ServletResponse res)
            throws ServletException, IOException {
        proxy.service(req, res);
    }

    // The servlet name from web.xml doubles as the Spring bean name.
    public void init() throws ServletException {
        this.targetBean = getServletName();
        getServletBean();
        proxy.init(getServletConfig());
    }

    // Look up the target servlet bean in the web application context.
    private void getServletBean() {
        WebApplicationContext wac =
                WebApplicationContextUtils.getRequiredWebApplicationContext(getServletContext());
        this.proxy = (Servlet) wac.getBean(targetBean);
    }
}
9. Output

       sendMessage:坐标:118.36582,37.2569812
       receiveMessage:坐标:118.36582,37.2569812

Source: http://blog.csdn.net/allen_oscar/article/details/17066349