ActiveMQ简单介绍+简单实例

jopen 10年前

1. JMS基本概念
</span>      JMS(Java Message Service) 即Java消息服务。它提供标准的产生、发送、接收消息的接口简化 企业 应用的开发。它支持两种消息通信模型:点到点(point-to-point)(P2P)模型和发布/订阅(Pub/Sub)模型。P2P 模型规定了一个消息只能有一个接收者;Pub/Sub 模型允许一个消息可以有多个接收者。
    对于点到点模型,消息生产者产生一个消息后,把这个消息发送到一个Queue(队列)中,然后消息接收者再从这个Queue中读取,一旦这个消息被一个接收者读取之后,它就在这个Queue中消失了,所以一个消息只能被一个接收者消费。

    与点到点模型不同,发布/订阅模型中,消息生产者产生一个消息后,把这个消息发送到一个Topic中,这个Topic可以同时有多个接收者在监听,当一个消息到达这个Topic之后,所有消息接收者都会收到这个消息。

2.编程的结构

2.1消息产生者向JMS发送消息的步骤 
(1)创建连接使用的工厂类JMS ConnectionFactory 
(2)使用管理对象JMS ConnectionFactory建立连接Connection 
(3)使用连接Connection 建立会话Session 
(4)使用会话Session和管理对象Destination创建消息生产者MessageSender 
(5)使用消息生产者MessageSender发送消息 
2.2消息消费者从JMS接受消息的步骤 
(1)创建连接使用的工厂类JMS ConnectionFactory 
(2)使用管理对象JMS ConnectionFactory建立连接Connection 
(3)使用连接Connection 建立会话Session 
(4)使用会话Session和管理对象Destination创建消息消费者MessageReceiver 
(5)使用消息消费者MessageReceiver接受消息,需要用setMessageListener将MessageListener接口绑定到MessageReceiver 
消息消费者必须实现了MessageListener接口,需要定义onMessage事件方法。

3.ActiveMQ的下载

下载地址:http://activemq.apache.org/download.html
解压缩到本地,然后启动/bin/activemq.bat

并且有的Eclipse没有自带jms.jar,需要去下载

4.发送端代码:

package com.xkey.JMS;    import javax.jms.Connection;  import javax.jms.ConnectionFactory;  import javax.jms.DeliveryMode;  import javax.jms.Destination;  import javax.jms.JMSException;  import javax.jms.MessageProducer;  import javax.jms.Session;  import javax.jms.TextMessage;  import javax.jms.Topic;    import org.apache.activemq.ActiveMQConnection;  import org.apache.activemq.ActiveMQConnectionFactory;    public class JmsSender {      private ConnectionFactory connectionFactory = null;   private Connection connection = null;   private Session session = null;   private Destination destination = null;   private MessageProducer producer = null;         JmsSender(){       }      public void init(){    connectionFactory = new ActiveMQConnectionFactory(      ActiveMQConnection.DEFAULT_USER,      ActiveMQConnection.DEFAULT_PASSWORD,"tcp://localhost:61616");        try{          connection = connectionFactory.createConnection();     connection.start();     session = connection.createSession(Boolean.TRUE.booleanValue(),                        Session.AUTO_ACKNOWLEDGE);     //Queue     destination = session.createQueue("xkey");     producer = session.createProducer(destination);     //Topic     /**      * Topic topic = session.createTopic("xkey.Topic");      * producer = session.createProducer(topic);     */     producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);     sendMessage(session,producer);     session.commit();         }catch(Exception e){     e.printStackTrace();    }finally{     try {      connection.close();     } catch (JMSException e) {      // TODO Auto-generated catch block      e.printStackTrace();     }    }   }      private void sendMessage(Session session,MessageProducer producer) throws JMSException{    for (int i = 1; i <= 5; i ++) {                TextMessage message = session.createTextMessage("First ActiveMQ Test:::: " + i);                // 发送消息              System.out.println("Sender:" + "First ActiveMQ Test::: " + i);                producer.send(message);            }     }     /**    * @param args    */   public static void main(String[] args) {    // TODO Auto-generated method stub    JmsSender jms = new JmsSender();    jms.init();   }    }



5.接收端代码

package com.xkey.JMS;    import javax.jms.Connection;    import javax.jms.ConnectionFactory;    import javax.jms.Destination;    import javax.jms.JMSException;  import javax.jms.Message;  import javax.jms.MessageConsumer;    import javax.jms.MessageListener;  import javax.jms.Session;    import javax.jms.TextMessage;      import org.apache.activemq.ActiveMQConnection;    import org.apache.activemq.ActiveMQConnectionFactory;      public class JmsReceiver {     private ConnectionFactory connectionFactory = null;   private Connection connection = null;   private Session session = null;   private MessageConsumer consumer = null;   private Destination destination = null;      public JmsReceiver(){       }      public void init(){    connectionFactory = new ActiveMQConnectionFactory(      ActiveMQConnection.DEFAULT_USER,      ActiveMQConnection.DEFAULT_PASSWORD,"tcp://localhost:61616");    try{     connection = connectionFactory.createConnection();     connection.start();     session = connection.createSession(Boolean.TRUE.booleanValue(),                     Session.AUTO_ACKNOWLEDGE);      destination = session.createQueue("xkey");     consumer = session.createConsumer(destination);     consumer.setMessageListener(new MessageListener(){        @Override      public void onMessage(Message msg) {       // TODO Auto-generated method stub       TextMessage message = (TextMessage)msg;       try{        System.out.println("Receiver " + message.getText());         }catch(Exception e){        e.printStackTrace();       }      }           });     /**while (true) {                    TextMessage message = (TextMessage) consumer.receive(1000);                    if (null != message) {                        System.out.println("Receiver " + message.getText());                    } else {                        break;                    }                }  */    }catch(Exception e){     e.printStackTrace();    }finally{     try {      connection.close();     } catch (JMSException e) {      // TODO Auto-generated catch block      e.printStackTrace();     }    }   }   /**    * @param args    */   public static void main(String[] args) {    // TODO Auto-generated method stub    JmsReceiver jms = new JmsReceiver();    jms.init();   }    }



6、JMS+ActiveMQ+Spring+TomCat可以用来实现企业级的消息队列,这样能使服务请求端和服务操作端实现低耦合。不知道怎么实现分布式,也就是多个请求,每个请求用不同的服务器去相响应。

Spring + JMS + ActiveMQ实现简单的消息队列(监听器异步实现)

连接:   http://blog.csdn.net/acceptedxukai/article/details/7775746

在Tomcat 6.0下用JNDI连接IBM MQ 6.0的配置方法(一)

连接:  http://blog.sina.com.cn/s/blog_60dadc490100ecy6.html

http://www.javawind.net/help/html/spring_ref_2.0/html/jms.html