• 1. ActiveMQ开源消息总线介绍
  • 2. 提纲ActiveMQ介绍一二与Spring集成五可靠机制三高级特征四编程步骤
  • 3. 发一ActiveMQ介绍-1-应用背景 CORBA、DCOM、RMI等RPC中间件技术已广泛应用于各个领域。但是面对规模和复杂度都越来越高的分布式系统,这些技术也显示出其局限性: (1)同步通信:客户发出调用后,必须等待服务对象完成处理并返回结果后才能继续执行; (2)客户和服务对象的生命周期紧密耦合:客户进程和服务对象进程 都必须正常运行;如果由于服务对象崩溃或者网络故障导致客户的请求不可达,客户会接收到异常; (3)点对点通信:客户的一次调用只发送给某个单独的目标对象。
  • 4. 一ActiveMQ介绍-2-面向消息的中间件(Message Oriented Middleware,MOM)较好的解决了以上问题: (1)发送者将消息发送给消息服务器,消息服务器将消息存放在若干队列中, 在合适的时候再将消息转发给接收者; (2)发送和接收是异步的,发送者无需等待; (3)二者的生命周期未必相同:发送消息的时候接收者不一定运行,接收消息 的时候发送者也不一定运行; (4)一对多通信:对于一个消息可以有多个接收者。消息中间件ActiveMQ :由Apache出品,最流行的,能力强劲的开源消息总线。 ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现。
  • 5. 一ActiveMQ介绍-2-JMS规范 JAVA 消息服务(JMS)定义了Java 中访问消息中间件的接口。JMS 只是接口,并没有给予实现,实现JMS 接口的消息中间件称为JMS Provider,例如ActiveMQ。 1、消息传送模型 PTP(使用Queue即队列目标):消息从一个生产者传送至一个消费者;如果没有已经向队列目标注册的消费者,队列将保留它收到的消息,并在某个消费者向该队列进行注册时将消息传送给该消费者 Pub/Sub(使用Topic即主题目标):消息从一个生产者传送至任意数量的消费者.可以向主题目标发送消息的生产者的数量没有限制,并且每个消息可以发送至任意数量的订阅消费者,主题目标也支持持久订阅和非持久订阅。
  • 6. 一ActiveMQ介绍-3-Queue通道Consumer1 (独占消费)Consumer1Consumer2Producer1Producer3Producer2 PTP(使用Queue即队列目标):数据数据数据Producer4 Consumer?存储数据
  • 7. v一ActiveMQ介绍-3-TOPIC通道Consumer1Consumer2Consumer3Producer1Producer3Producer2 Pub/Sub(使用Topic即主题目标):数据数据数据数据数据数据Consumer4(离线)存储Consumer?
  • 8. 一ActiveMQ介绍-1-2、JMS消息接口:统一域PTP域Pub/Sub域ConnectionFactoryQueueConnectionFactoryTopicConnectionFactory ConnectionQueueConnectionTopicConnection SessionQueueSessionTopicSessionDestination(Queue或 Topic)QueueTopic MessageProducerQueueSender TopicPublisherMessageConsumerQueueReceiver QueueBrowserTopicSubscriber并发支持:JMS对象是否支持并发Destination是ConnectionFactory是Connection是Session否MessageProducer 否MessageConsumer 否
  • 9. 一ActiveMQ介绍-1-消息体消息属性消息头1、自动分配的消息头:JMSDeliveryMode、JMSMessageID、JMSTimestamp、JMSExpiration、JMSRedelivered、JMSPriority 2、开发者分配的消息头:JMSReplyTo、JMSCorrelationID、JMSType 1. 应用需要用到的属性; 2. 消息头中原有的一些可选属性; 3. JMS Provider 需要用到的属性 1、TextMessage 2、MapMessage 3、BytesMessage 4、StreamMessage 5、ObjectMessage 6、Message 3、JMS消息结构:
  • 10. 一ActiveMQ介绍-1-JMS标准消息头: 消息头 描述JMSDestination消息发送的目的地JMSDeliveryMode传送模式, 有两种模式: PERSISTENT 和NON_PERSISTENT,PERSISTENT 表示该消息一定要被送到目的地,否则会导致应用错误。NON_PERSISTENT 表示偶然丢失该消息是被允许的,这两种模式使开发者可以在消息传送的可靠性和吞吐量之间找到平衡点。JMSExpiration消息过期时间,等于Destination 的send 方法中的timeToLive 值加上发送时刻的GMT 时间值。如果timeToLive值等于零,则JMSExpiration 被设为零,表示该消息永不过期。如果发送后,在消息过期时间之后消息还没有被发送到目的地,则该消息被清除。JMSPriority消息优先级,从0-9 十个级别,0-4 是普通消息,5-9 是加急消息。JMS 不要求JMS Provider 严格按照这十个优先级发送消息,但必须保证加急消息要先于普通消息到达。JMSMessageID唯一识别每个消息的标识,由JMS Provider 产生。JMSTimestamp一个消息被提交给JMS Provider 到消息被发出的时间。JMSCorrelationID用来连接到另外一个消息,典型的应用是在回复消息中连接到原消息。JMSReplyTo提供本消息回复消息的目的地址JMSType消息类型的识别符。JMSRedelivered如果一个客户端收到一个设置了JMSRedelivered 属性的消息,则表示可能客户端曾经在早些时候收到过该消息,但并没有签收(acknowledged)。
  • 11. 一ActiveMQ介绍-1-ActiveMQ特性:支持容器支持协议跨语言调用支持Axis and CXF SupportAMQPActiveMQ C++ ClientsBlazeDSMQTTAjaxGeronimoOpenWireC IntegrationIntegrating Apache ActiveMQ with GlassfishRESTCMSIntegrating Apache ActiveMQ with JBossRSS and AtomDelphi and FreePascalJ2EEStompdot NetJBoss IntegrationWSIFPerlJCA ContainerWS NotificationPHPJNDI SupportXMPPPikeOld OSGi Integration PythonOSGi Integration RubyResource Adapter WebSocketsSJSAS with GenericJMSRA  Spring Support  Sun JNDI  Tomcat  WebLogic Integration  zOS     
  • 12. 一ActiveMQ介绍-1-ActiveMQ安装: 第一步下载:下载地址: http://activemq.apache.org/activemq-5111-release.html 版本支持:Windows /Unix/Linux/Cygwin 第二部解压: 解压后目录结构如下: 第三步配置JDK环境: 省略,注意根据当前下载的版本配置jdk。 第四步运行ActiveMQ: 点击目录apache-activemq-5.11.1\bin\win64\wrapper.exe,点击后运行成功如下图:
  • 13. 一ActiveMQ介绍-1-第五步进入监控平台:在浏览器输入http://localhost:8161/admin,输入后默认登录账户和密码是admin
  • 14. 二ACTIVE_MQ编程步骤-3-创建ConnectionFactoryConnectionFactory创建ConnectionConnection 创建创建一个或多个JMS SessionSession创建Queue或TopicSession创建MessageProducer或MessageConsumer开发JMS的步骤发送消息: MessageProducer:send(Message messag)接收消息: 1、异步接收MessageConsumer:setMessageListener(MessageListener m) 2、同步接收 MessageConsumer:receive() 以上两种接收方式返回Message 对象
  • 15. 二ACTIVE_MQ编程步骤-1- Pub/Sub 编程例子: 发布消息代码: TopicConnection connection = null; TopicSession session = null; try { // 创建链接工厂 TopicConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKER_URL); // 通过工厂创建一个连接 connection = factory.createTopicConnection(); // 启动连接 connection.start(); // 创建一个session会话 session = connection.createTopicSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 创建一个消息队列 Topic topic = session.createTopic(DESTINATION); // 创建消息发送者 TopicPublisher publisher = session.createPublisher(topic); // 设置持久化模式 publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT); //subscriber.receive(); MapMessage map = session.createMapMessage(); map.setLong("time", System.currentTimeMillis()); publisher.send(map); sendMessage(session, publisher); session.commit(); // 提交会话 } catch (Exception e) { throw e; } finally { // 关闭释放资源 if (session != null) { session.close(); } if (connection != null) { connection.close(); } }
  • 16. 二ACTIVE_MQ编程步骤-1- TopicConnection connection = null; TopicSession session = null; try { // 创建链接工厂 TopicConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKER_URL); // 通过工厂创建一个连接 connection = factory.createTopicConnection(); // 启动连接 connection.start(); // 创建一个session会话 session = connection.createTopicSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 创建一个消息队列 Topic topic = session.createTopic(TARGET); // 创建消息制作者 TopicSubscriber subscriber = session.createSubscriber(topic); subscriber.setMessageListener(new MessageListener() { public void onMessage(Message msg) { if (msg != null) { MapMessage map = (MapMessage) msg; try { System.out.println(map.getLong("time") + "接收#" + map.getString("text")); } catch (JMSException e) { e.printStackTrace(); } } } }); // 提交会话 session.commit(); } catch (Exception e) { throw e; } finally { // 关闭释放资源 if (session != null) { session.close(); } if (connection != null) { connection.close(); } }订阅消息代码:
  • 17. 二ACTIVE_MQ编程步骤-1- PTP编程 发送消息列子: QueueConnection connection = null; QueueSession session = null; try { // 创建链接工厂 QueueConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKER_URL); // 通过工厂创建一个连接 connection = factory.createQueueConnection(); // 启动连接 connection.start(); // 创建一个session会话 session = connection.createQueueSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 创建一个消息队列 Queue queue = session.createQueue(DESTINATION); // 创建消息发送者 javax.jms.QueueSender sender = session.createSender(queue); // 设置持久化模式 sender.setDeliveryMode(DeliveryMode.PERSISTENT); tempQueue= session.createTemporaryQueue(); MessageConsumer tempConsumer =session.createConsumer(tempQueue); tempConsumer.setMessageListener(new MessageListener(){ @Override public void onMessage(Message paramMessage) { TextMessage backmessage =(TextMessage)paramMessage; try { System.out.println("客户端已收到消息,客户端回复消息:"+backmessage.getText()); } catch (JMSException e) {e.printStackTrace();} } }); sendMessage(session, sender); // 提交会话 session.commit(); } catch (Exception e) { session.rollback(); throw e; } finally { // 关闭释放资源 if (session != null) { session.close(); } if (connection != null) { connection.close(); } }
  • 18. 二ACTIVE_MQ编程步骤-1-QueueConnection connection = null; try { // 创建链接工厂 QueueConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKER_URL); // 通过工厂创建一个连接 connection = factory.createQueueConnection(); connection.setClientID(clientid); // 启动连接 connection.start(); // 创建一个session会话 session = connection.createQueueSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); ActiveMQQueue queue=new ActiveMQQueue(TARGET); // 创建一个消息队列 MessageConsumer messageConsumer = session.createConsumer(queue) ; //temp-queue://ID:yys-58806-1433241770450-1:1:1 producer = session.createProducer(null); messageConsumer.setMessageListener(new MessageListener() { public void onMessage(Message msg) { if (msg != null) { MapMessage map = (MapMessage) msg; try { System.out.println(clientid + "接收#" + map.getString("text")); } catch (JMSException e) {e.printStackTrace();} try {msg.acknowledge(); //回复消息 TextMessage backmessage = session.createTextMessage(); backmessage.setText("消息已签收!"); backmessage.setJMSCorrelationID(map.getJMSCorrelationID()); //回复服务器消息 producer.send(map.getJMSReplyTo(),backmessage); } catch (JMSException e1) { e1.printStackTrace(); } }}}); // 提交会话 session.commit(); } catch (Exception e) { session.rollback(); throw e; } finally { // 关闭释放资源 if (session != null) { session.close(); } if (connection != null) { connection.close(); } }接收消息列子:
  • 19. 三可靠机制-1-一、基本可靠机制 控制消息的签收 客户端成功接收一条消息的标志是这条消息被签收。成功接收一条消息一般包括如下三个阶段: 1.客户端接收消息; 2.客户端处理消息; 3.消息被签收。签收可以由ActiveMQ发起,也可以由客户端发起,取决于Session签收模式的设置。 在带事务的Session中,签收自动发生在事务提交时。如果事务回滚,所有已经接收的消息将会被再次传送。 在不带事务的Session中,一条消息何时和如何被签收取决于Session的设置。 1.Session.AUTO_ACKNOWLEDGE(自动签收) 2.Session.CLIENT_ACKNOWLEDGE(客户端签收) 3.Session.DUPS_OK_ACKNOWLEDGE(不必确保对传送消息的签收)
  • 20. 三可靠机制-1-指定消息传送模式: ActiveMQ支持两种消息传送模式:PERSISTENT和NON_PERSISTENT两种。 2.PERSISTENT(持久性消息) 这是ActiveMQ的默认传送模式,此模式保证这些消息只被传送一次和成功使用一次。对于这些消息,可靠性是优先考虑的因素。可靠性的另一个重要方面是确保持久性消息传送至目标后,消息服务在向消费者传送它们之前不会丢失这些消息。1.NON_PERSISTENT(非持久性消息) 保证这些消息最多被传送一次。对于这些消息,可靠性并非主要的考虑因素。此模式并不要求持久性的数据存储,也不保证消息服务由于某种原因导致失败后消息不会丢失。有两种方法指定传送模式: 1.使用setDeliveryMode方法,这样所有的消息都采用此传送模式; 2.使用send方法为每一条消息设置传送模式;
  • 21. 三可靠机制-1-ActiveMQ持久化消息的三种方式: 1、持久化文件 这个你装ActiveMQ时默认就是这种,只要你设置消息为持久化就可以了。涉及到的配置和代码有 producer.Send(request, MsgDeliveryMode.Persistent, level, TimeSpan.MinValue); 2、持久化为MySql   你首先需要把MySql的驱动放到ActiveMQ的Lib目录下,我用的文件名字是:mysql-connector-java-5.0.4-bin.jar      接下来你修改配置文件 在配置文件中的broker节点外增加 从配置中可以看出数据库的名称是activemq,你需要手动在MySql中增加这个库。 然后重新启动消息队列,你会发现多了3张表 a:activemq_acks b:activemq_lock c:activemq_msgs 3、持久化为Oracle:同持久化mysql一样,只是链接及驱动不一样
  • 22. 三可靠机制-1- 持久化到数据库表结构说明 当在启动ActiveMQ时,先判断表是否存在,如果不存在,将去创建表,如下:  (1)ACTIVEMQ_ACKS:持久订阅者列表  1.CONTAINER:类型://主题  如:topic://basicInfo.topic  2.SUB_DEST:应该是描述,与1内容相同  3.CLIENT_ID:持久订阅者的标志ID,必须唯一  4.SUB_NAME:持久订阅者的名称.(durableSubscriptionName)  5.SELECTOR:消息选择器,consumer可以选择自己想要的  6.LAST_ACKED_ID:最后一次确认ID,这个字段存的该该订阅者最后一次收到的消息的ID  (2)ACTIVEMQ_LOCK:进行数据访问的排斥锁  1.ID:值为1  2.TIME:时间  3.BROKER_NAME:broker的名称  这个表似为集群使用,但现在ActiveMQ并不能共享数据库.  (3)ACTIVEMQ_MSGS:存储Queue和Topic消息的表  1.ID:消息的ID  2.CONTAINER: 类型://主题 如:queue://my.queue或Topic://basicInfo.topic  3.MSGID_PROD:发送消息者的标志  MSGID_PROD =ID:[computerName][…..]  注意computerName,不要使用中文,消息对象中会存储这个部分,解析connectID时会出现Bad String错误.  4.MSGID_SEQ:还不知用处  5.EXPIRATION:到期时间.  6.MSG:消息本身,Blob类型.  可以在JmsTemplate发送配置中,加上,5天的生命期,如果消息一直没有被处理,消息会被删除,但是表中会存在CONTAINER为 queue://ActiveMQ.DLQ的记录.也就是说,相当于将过期的消息发给了一个ActiveMQ自定义的删除队列..  注意:关于ActiveMQ的持久订阅消息删除操作  1.主题消息只有一条,所有订阅了这个消息的持久订阅者都要收到消息,只有所有订阅者收到消息并确认(Acknowledge)之后.才会删除.  说明:ActiveMQ支持批量(optimizeAcknowledge为true)确认,以提高性能  2.ActiveMQ执行删除Topic消息的cleanup()操作的时间间隔为5 minutes.. 
  • 23. 三可靠机制-1-设置消息优先级普通情况下可以确保将单个会话向目标发送的所有消息按其发送顺序传送至消费者。然而,如果为这些消息分配了不同的优先级,消息传送系统将首先尝试传送优先级较高的消息。 有两种方法设置消息的优先级: 1.使用setDeliveryMode方法,这样所有的消息都采用此传送模式; 2.使用send方法为每一条消息设置传送模式; 注意:消息级别分为0至9,其中0至4是普通级别,5至9为加急消息。允许消息过期默认情况下,消息永不会过期。如果消息在特定周期内失去意义,那么可以设置过期时间。 有两种方法设置消息的过期时间,时间单位为毫秒: 1.使用setTimeToLive方法为所有的消息设置过期时间; 2.使用send方法为每一条消息设置过期时间; 注意:是在消息发送端调用setTimeToLive方法。创建临时目标ActiveMQ通过createTemporaryQueue和createTemporaryTopic创建临时目标,这些目标持续到创建它的Connection关闭。只有创建临时目标的Connection所创建的客户端才可以从临时目标中接收消息,但是任何的生产者都可以向临时目标中发送消息。如果关闭了创建此目标的Connection,那么临时目标被关闭,内容也将消失。
  • 24. 三可靠机制-1-二、高级可靠机制创建持久订阅删除持久订阅 客户端调用session调用 unsubscribe(String name);之中参数name指之前向主题注册的clientID 消息订阅分为非持久订阅(non-durable subscription)和持久订阅(durable subscription),非持久订阅只有当客户端处于激活状态,也就是和ActiveMQ保持连接状态才能收到发送到某个主题的消息,而当客户端处于离线状态,这个时间段发到主题的消息将会丢失,永远不会收到。建立持久订阅的步骤: 1.  为连接设置一个客户ID; 2.  为订阅的主题指定一个订阅名称; 注意:上述组合必须唯一。 持久订阅例子:ActiveMQSession方法:TopicSubscriber createDurableSubscriber(Topic topic, String name) 非持久化订阅例子:session.createSubscriber(topic);//无需向connection指定clientId
  • 25. 三可靠机制-1-消息事务 在事务中生成或使用消息时,ActiveMQ跟踪各个发送和接收过程,并在客户端发出提交事务的调用时完成这些操作。如果事务中特定的发送或接收操作失败,则出现异常。客户端代码通过忽略异常、重试操作或回滚整个事务来处理异常。在事务提交时,将完成所有成功的操作。在事务进行回滚时,将取消所有成功的操作。 本地事务的范围始终为一个会话。也就是说,可以将单个会话的上下文中执行的一个或多个生产者或消费者操作组成一个本地事务。 不但单个会话可以访问 Queue 或 Topic (任一类型的 Destination ),而且单个会话实例可以用来操纵一个或多个队列以及一个或多个主题,一切都在单个事务中进行。 这样就引出了第三种消息模式:应答模式(通过代码例子演示)。 注意:不但单个会话可以访问 Queue 或 Topic (任一类型的 Destination ),而且单个会话实例可以用来操纵一个或多个队列以及一个或多个主题,一切都在单个事务中进行。
  • 26. 四高级特征-1-1、异步发送消息 ActiveMQ支持生产者以同步或异步模式发送消息。使用不同的模式对send方法的反应时间有巨大的影响,反映时间是衡量ActiveMQ吞吐量的重要因素,使用异步发送可以提高系统的性能。 例:使用Connection URI配置异步发送: cf = new ActiveMQConnectionFactory("tcp://locahost:61616?jms.useAsyncSend=true"); 在ConnectionFactory层面配置异步发送: ((ActiveMQConnectionFactory)connectionFactory).setUseAsyncSend(true); 2、消费者异步分派 在ActiveMQ4中,支持ActiveMQ以同步或异步模式向消费者分派消息。这样的意义:可以以异步模式向处理消息慢的消费者分配消息;以同步模式向处理消息快的消费者分配消息。 ActiveMQ默认以同步模式分派消息,这样的设置可以提高性能。但是对于处理消息慢的消费者,需要以异步模式分派。例:((ActiveMQConnectionFactory)connectionFactory).setDispatchAsync(false); 3、消费者优先级queue = new ActiveMQQueue("TEST.QUEUE?consumer.prority=9"); consumer = session.createConsumer(queue);4、独占的消费者queue = new ActiveMQQueue("TEST.QUEUE?consumer.exclusive=true"); consumer = session.createConsumer(queue); 注意:仅仅限于队列5、再次传送策略6、复合目标 Queue queue = new ActiveMQQueue("FOO.A,FOO.B,FOO.C"); producer.send(queue, someMessage); 7、消息预取
  • 27. 五与spring集成-1-Spring 集成JMS的优势 Spring框架提供一个模板机制来隐藏Java API细节。 Spring提供了JMSTemplate类,所以开发者不必为JMS实现编写样本代码。当开发JMS应用时,Spring提供了一下一些优势: 1、提供了一个JMS的抽象API,简化了JMS的使用。如:访问目的地(队列或主体)和出版消息到特定目的地。 2、JEE开发者不必关心JMS不同版本之间的差异(如JMS 1.0.2 同 JMS 1.1); 3、开发者不必特定地处理JMS异常,因为Spring为JMS代码抛出的任何JMS异常提供了一个unchecked异常。 一旦你在JMS应用中开始使用Spring,你将会欣赏到异步消息处理的简易性。Spring JMS框架提供了各种java类使JMS开发变得简单。
  • 28. 五与spring集成-1-配置ConnectionFactory ConnectionFactory是用于产生到JMS服务器的链接的ActiveMQ为我们提供了一个PooledConnectionFactory,通过往里面注入一个ActiveMQConnectionFactory可以用来将Connection、Session和MessageProducer池化,这样可以大大的减少我们的资源消耗。当使用PooledConnectionFactory时,我们在定义一个ConnectionFactory时应该是如下定义:
  • 29. 五与spring集成-1-配置目的地 在ActiveMQ中实现了两种类型的Destination,一个是点对点的ActiveMQQueue,另一个就是支持订阅/发布模式的ActiveMQTopic。在定义这两种类型的Destination时我们都可以通过一个name属性来进行构造,如:
  • 30. 五与spring集成-1-配置生产者 生产者负责产生消息并发送到JMS服务器,这通常对应的是我们的一个业务逻辑服务实现类。但是我们的服务实现类是怎么进行消息的发送的呢?这通常是利用Spring为我们提供的JmsTemplate类来实现的,所以配置生产者其实最核心的就是配置进行消息发送的JmsTemplate。对于消息发送者而言,它在发送消息的时候要知道自己该往哪里发,为此,我们在定义JmsTemplate的时候需要往里面注入一个Spring提供的ConnectionFactory对象。 以下配置的是 发送队列消息的jmsTemplate。
  • 31. 五与spring集成-1-以下配置的是发送主题消息的jmsTemplate,需要指明是否为发布订阅者模式。
  • 32. 五与spring集成-1-定义producerService,运用jmsTemplate发送消息
  • 33. 五与spring集成-1-配置消费者 Spring为我们封装的消息监听容器MessageListenerContainer实现的,它负责接收信息,并把接收到的信息分发给真正的MessageListener进行处理。每个消费者对应每个目的地都需要有对应的MessageListenerContainer。对于消息监听容器而言,除了要知道监听哪个目的地之外,还需要知道到哪里去监听,也就是说它还需要知道去监听哪个JMS服务器,这是通过在配置MessageConnectionFactory的时候往里面注入一个ConnectionFactory来实现的。所以我们在配置一个MessageListenerContainer的时候有三个属性必须指定,一个是表示从哪里监听的ConnectionFactory;一个是表示监听什么的Destination;一个是接收到消息以后进行消息处理的MessageListener。Spring一共为我们提供了两种类型的MessageListenerContainer,SimpleMessageListenerContainer和DefaultMessageListenerContainer。 SimpleMessageListenerContainer会在一开始的时候就创建一个会话session和消费者Consumer,并且会使用标准的JMS MessageConsumer.setMessageListener()方法注册监听器让JMS提供者调用监听器的回调函数。它不会动态的适应运行时需要和参与外部的事务管理。兼容性方面,它非常接近于独立的JMS规范,但一般不兼容Java EE的JMS限制。 大多数情况下我们还是使用的DefaultMessageListenerContainer,跟SimpleMessageListenerContainer相比,DefaultMessageListenerContainer会动态的适应运行时需要,并且能够参与外部的事务管理。它很好的平衡了对JMS提供者要求低、先进功能如事务参与和兼容Java EE环境。
  • 34. 五与spring集成-1-以下配置的是接收主题消息的接收者
  • 35. 五与spring集成-1-在consumer端配置重发机制
  • 36. 五与spring集成-1-以下配置的是接收队列消息的接收者
  • 37. 五与spring集成-1-消息监听器MessageListener 在Spring整合JMS的应用中我们在定义消息监听器的时候一共可以定义三种类型的消息听器,分别是MessageListener、SessionAwareMessageListener和MessageListenerAdapter。 MessageListener是最原始的消息监听器,它是JMS规范中定义的一个接口。其中定义了一个用于处理接收到的消息的onMessage方法,该方法只接收一个Message参数。我们前面在讲配置消费者的时候用的消息监听器就是MessageListener,
  • 38. 五与spring集成-1- SessionAwareMessageListener是Spring为我们提供的,它不是标准的JMS MessageListener。MessageListener的设计只是纯粹用来接收消息的,假如我们在使用MessageListener处理接收到的消息时我们需要发送一个消息通知对方我们已经收到这个消息了,那么这个时候我们就需要在代码里面去重新获取一个Connection或Session。SessionAwareMessageListener的设计就是为了方便我们在接收到消息后发送一个回复的消息,它同样为我们提供了一个处理接收到的消息的onMessage方法,但是这个方法可以同时接收两个参数,一个是表示当前接收到的消息Message,另一个就是可以用来发送消息的Session对象
  • 39. 五与spring集成-1- MessageListenerAdapter类实现了MessageListener接口SessionAwareMessageListener接口,它的主要作用是将接收到的消息进行类型转换,然后通过反射的形式把它交给一个普通的Java类进行处理。        MessageListenerAdapter会把接收到的消息做如下转换:        TextMessage转换为String对象;        BytesMessage转换为byte数组;        MapMessage转换为Map对象;        ObjectMessage转换为对应的Serializable对象。  当我们使用 MessageListenerAdapter 来作为消息监听器的时候,我们可以为它指定一个对应的 MessageConverter ,这样 Spring 在处理接收到的消息的时候就会自动地利用我们指定的 MessageConverter 对它进行转换,然后把转换后的 Java 对象作为参数调用指定的消息处理方法。这里我们再把前面讲解 MessageListenerAdapter 时定义的 MessageListenerAdapter 拿来做一个测试,我们指定它的 MessageConverter 为我们定义好的 EmailMessageConverter 。
  • 40. 五与spring集成-1-消息转换器MessageConverter MessageConverter的作用主要有两方面,一方面它可以把我们的非标准化Message对象转换成我们的目标Message对象,这主要是用在发送消息的时候;另一方面它又可以把我们的Message对象转换成对应的目标对象,这主要是用在接收消息的时候。 下面我们就拿发送一个对象消息来举例,假设我们有这样一个需求:我们平台有一个发送邮件的功能,进行发送的时候我们只是把我们的相关信息封装成一个JMS消息,然后利用JMS进行发送,在对应的消息监听器进行接收到的消息处理时才真正的进行消息发送。
  • 41. 五与spring集成-1-
  • 42. 五与spring集成-1-  因为我们给 MessageListenerAdapter 指定了一个 MessageConverter ,而且是一个EmailMessageConverter ,所以当 MessageListenerAdapter 接收到一个消息后,它会调用我们指定的 MessageConverter 的 fromMessage 方法把它转换成一个 Java 对象,根据定义这里会转换成一个 Email 对象,
  • 43. 五与spring集成-1- Spring 已经为我们实现了一个简单的 MessageConverter ,即 org.springframework.jms.support.converter.SimpleMessageConverter ,其实 Spring 在初始化 JmsTemplate 的时候也指定了其对应的 MessageConverter 为一个 SimpleMessageConverter,所以如果我们平常没有什么特殊要求的时候可以直接使用 JmsTemplate 的 convertAndSend 系列方法进行消息发送,而不必繁琐的在调用 send 方法时自己 new 一个 MessageCreator 进行相应 Message 的创建。
  • 44. 五与spring集成-1-事务管理 Spring提供了一个JmsTransactionManager用于对JMS ConnectionFactory做事务管理。这将允许JMS应用利用Spring的事务管理特性。JmsTransactionManager在执行本地资源事务管理时将从指定的ConnectionFactory绑定一个ConnectionFactory/Session这样的配对到线程中。JmsTemplate会自动检测这样的事务资源,并对它们进行相应操作。 在Java EE环境中,ConnectionFactory会池化Connection和Session,这样这些资源将会在整个事务中被有效地重复利用。在一个独立的环境中,使用Spring的SingleConnectionFactory时所有的事务将公用一个Connection,但是每个事务将保留自己独立的Session。 JmsTemplate可以利用JtaTransactionManager和能够进行分布式的 JMS ConnectionFactory处理分布式事务。        在Spring整合JMS的应用中,如果我们要进行本地的事务管理的话非常简单,只需要在定义对应的消息监听容器时指定其sessionTransacted属性为true,
  • 45. 五与spring集成-1-  如果想接收消息和数据库访问处于同一事务中,那么我们就可以配置一个外部的事务管理同时配置一个支持外部事务管理的消息监听容器(如DefaultMessageListenerContainer)。要配置这样一个参与分布式事务管理的消息监听容器,我们可以配置一个JtaTransactionManager,当然底层的JMS ConnectionFactory需要能够支持分布式事务管理,并正确地注册我们的JtaTransactionManager。这样消息监听器进行消息接收和对应的数据库访问就会处于同一数据库控制下,当消息接收失败或数据库访问失败都会进行事务回滚操作。
  • 46. 谢 谢 !