JAVA中利用ActiveMQ收发消息

dh_sue 7年前

ActiveMQ本身不多做介绍。下面是开启线程进行消息监听的示例:收到符合条件的消息后,向对方发送相应的消息,收发双方均采用队列模式。
SMSMQListener.java

/**   *    */  package com.wxcm.sms;    import java.util.HashMap;  import java.util.List;  import java.util.Map;    import javax.jms.Connection;  import javax.jms.ConnectionFactory;  import javax.jms.Destination;  import javax.jms.JMSException;  import javax.jms.MapMessage;  import javax.jms.Message;  import javax.jms.MessageConsumer;  import javax.jms.MessageListener;  import javax.jms.Session;    import org.apache.activemq.ActiveMQConnectionFactory;  import org.apache.log4j.Logger;  import org.springframework.beans.factory.annotation.Autowired;    import com.wxcm.sms.service.ChannelService;  import com.wxcm.sms.util.MQUtil;  import com.wxcm.sms.vo.Channel;  import com.wxcm.waf.Configur;    /**   * @author D.H. Sue   *   */  public class SMSMQListener implements Runnable {     Logger logger = Logger.getLogger(SMSMQListener.class);   @Autowired   ChannelService channelService;   @Autowired   Configur configur;   Connection connection;   Session session;      public SMSMQListener() {       }      public void run() {    logger.info("--ActiveMQ Connect--");    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(" + configur.getProperties().get("sms.mq.url") + ")");    logger.info("--ActiveMQ Connect Success--");    try {     connection = connectionFactory.createConnection();     connection.start();     session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);     Destination destination = session.createQueue("stream");     MessageConsumer consumer = session.createConsumer(destination);     while (true) {      consumer.setMessageListener(new MessageListener() {       public void onMessage(Message message) {        MapMessage mm = (MapMessage) message;        try {         logger.info("--Msg From GET_ALL_CHANNELS--");         if (mm.getString("type").equals("get_all_channels")) {          sendChannelInfo2StreamServerI(mm.getString("queue_name"), Constants.SEND_CHANNEL_2_MQ_ADD);          logger.info("--Send Channel Info Done--");    
     }        } catch (JMSException e) {         logger.info(e);        }       }      });     }    } catch (JMSException e) {     logger.info(e);    }       }      private void sendChannelInfo2StreamServerI(String streamServerI, String opType){    try {     List<Channel> channelList = (List<Channel>) channelService.listChannels();     Map<String, String> msgMap = new HashMap<String, String>();     if (channelList!=null) {      msgMap.put("count", channelList.size()+"");      for (int i = 0; i < channelList.size(); i++) {       msgMap.put("type_" + i, Constants.SEND_CHANNEL_2_MQ_ADD);       msgMap.put("channel_num_" + i, channelList.get(i).getChannelnumber() + "");       msgMap.put("srcurl_" + i, channelList.get(i).getSrcurl());      }      MQUtil mqUtil = new MQUtil(configur.getProperties().get("sms.mq.url").toString());      Connection connection = mqUtil.connectMQ();      mqUtil.sendMessage(connection, msgMap, streamServerI);      mqUtil.disconnectMQ(connection);     }    } catch (Exception e) {     e.printStackTrace();    }   }     public void init() throws Exception {    Thread thread = new Thread(this);    thread.setName("listening");    thread.start();   }      public void destroy(){    try {     if (session!=null) {      session.close();     }    } catch (Exception e) {     logger.error(e);    }        try {     if (connection!=null) {      connection.stop();      connection.close();     }    } catch (Exception e) {     logger.error(e);    }   }  }
下面是MQUtil.java工具类:
package com.wxcm.sms.util;    import java.util.Iterator;  import java.util.List;  import java.util.Map;    import javax.jms.Connection;  import javax.jms.ConnectionFactory;  import javax.jms.DeliveryMode;  import javax.jms.JMSException;  import javax.jms.MapMessage;  import javax.jms.MessageProducer;  import javax.jms.Queue;  import javax.jms.Session;  import javax.jms.Topic;    import org.apache.activemq.ActiveMQConnection;  import org.apache.activemq.ActiveMQConnectionFactory;  import org.apache.log4j.Logger;    /**   * ActiveMQ工具类   * @author D.H. Sue   *   */  public class MQUtil {   Logger logger = Logger.getLogger(MQUtil.class);   private String url;      public String getUrl() {    return url;   }     public void setUrl(String url) {    this.url = url;   }     public MQUtil(String url){    this.url = url;   }      /**    * 连接到ActiveMQ服务器    * @param isProducer    *    是否为生产者1表示生产者,0表示消费者    * @return    *    不空表示连接成功,空表示连接失败    */   public Connection connectMQ() {    ConnectionFactory connectionFactory;    Connection connection = null;    connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, url);    try {     connection = connectionFactory.createConnection();     connection.start();    } catch (Exception e) {     logger.error(e);    }        return connection;   }      /**    * 断开连接    * @param connection    *    ActiveMQ连接    */   public void disconnectMQ(Connection connection){    try {     if (connection != null) {      connection.stop();      connection.close();     }    } catch (Exception e) {     logger.error(e.getMessage());    }   }      /**    * 将消息message以topicName为主题发送出去    * @param key    *    消息名    * @param message    *    消息体    * @param topicName    *    主题名称    * @throws Exception    */   public void sendMessage(Connection connection, List<String> key, List<String> message, String topicName) {    Session session = null;    MessageProducer producer;    try {     session = 
connection.createSession(true, Session.AUTO_ACKNOWLEDGE);     Topic topic = session.createTopic(topicName);     producer = session.createProducer(topic);     producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);     MapMessage mapMessage = session.createMapMessage();     Iterator<String> keyIt = key.iterator();     Iterator<String> messageIt = message.iterator();     while (keyIt.hasNext() && messageIt.hasNext()) {      mapMessage.setString(keyIt.next(), messageIt.next());     }     producer.send(mapMessage);     session.commit();    } catch (Exception e) {     logger.error(e.getMessage());    } finally {     try {      if (session != null) {       session.close();      }     } catch (JMSException e) {      logger.error(e);     }    }   }     public void sendMessage(Connection connection, Map<String, String> msgMap, String streamServerI) {    Session session;    MessageProducer producer;    try {     session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);     Queue queue = session.createQueue(streamServerI);     producer = session.createProducer(queue);     producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);     MapMessage mapMessage = session.createMapMessage();     for (Map.Entry<String, String> entity : msgMap.entrySet()) {      mapMessage.setString(entity.getKey(), entity.getValue());     }     producer.send(mapMessage);     session.commit();    } catch (Exception e) {     logger.error(e);    }   }     }
需要注意的一点是:使用线程来监听MQ消息时,一定要记得断开会话与连接,否则会遗留多个消费者,从而干扰程序的正确运行。由于监听是通过启动线程实现的,因此需要在配置文件中为该bean配置启动方法与销毁方法:
<!-- Eagerly created singleton: init-method="init" starts the listener thread,
     destroy-method="destroy" closes the JMS session and connection. -->
<bean id="SMSMQListener" class="com.wxcm.sms.SMSMQListener" scope="singleton" lazy-init="false" init-method="init" destroy-method="destroy"/>