activemq特性

tian1216 8年前

来自: http://my.oschina.net/goudingcheng/blog/614081


这一节,我们将要看一下2个有用的activemq特点:用通配符订阅多个destination,用组合发布多重destionation
activemq支持destination的层次结构【topic和queen】便于归类和管理。
通配符有三个:
.  用来分隔路径
* 用来匹配路径中的一节
> 用来匹配任意节的路径
opics: <sport><League>.<team>  
。例如: football.division.leeds。 如果leeds 参加两种运动--Scccer 和 Rugby,为了方便,我们希望通过一个消息消费者而看到Leeds两种运动的最新战绩,这个时候,通配符就有用武之地了
.  : used to separate elements in the destination name
*  : used to match one element    
>  : match one or all trailing elements
所以,对于上面的例子, 你可以订阅这样的主题: *.*.Leeds
如果你想知道division1 这个赛区的所有分数, 你可以订阅这个: soccer.division1.*
如果你想知道Rugby的分数: 你可以订阅这个: rugby.>.
然而, 通配符中是为消费者服务的,如果你发送了这样的一个主题: rugby.>., 这个消息仅会发送到命名了rugby.>.的主题,并不是所有的主题都是以rugby开头的。
这里有一种  方法,使消息生产者能将一条
消息发送到多个目的地。通过使用   composite destination。
将同一条消息发送到不同的目的地是很有用的。 比如一个用来存储信息的应用,会发送一条消息给队列
同时也要将这条消息广播给监控的所有系统。通常,你会通过用两个producer 发送两次消息来达到这个目的。composite destination就是用来解决这种情况的
例如,如果你创建了名子为: store.order.backoffice,store.order.warehouse 的 Queue,这样 就会发送同时两个Queue。
订阅信息     解释
PRICE.>     Any price for any product on any exchange
PRICE.STOCK.>     Any price for a stock on any exchange
PRICE.STOCK.NASDAQ.*     Any stock price on NASDAQ
PRICE.STOCK.*.IBM     Any IBM stock price on any exchange
从5.5 版本以后,可以自定义路径分隔符:

    <plugins>
       .....
       <destinationPathSeparatorPlugin/>
    </plugins>

此时FOO.BAR.* 可以表示为 FOO/BAR/*
也可以通过pathSeparator 属性定义其他符号位路径分隔符。
   public void subscribeToLeeds() throws JMSException {
        String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI);
        Connection connection = connectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic allLeeds = session.createTopic("*.*.Leeds");
        MessageConsumer consumer = session.createConsumer(allLeeds);
        Message result = consumer.receive();
    }
    11.1.2发送一个message到多重destinations
    发送相同的message到不同的destination上:案列发送一个[queen,opic]组合模式,默认的组合destination用,分隔
    列如store.order.backoffice,store.order.warehouse
     String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI);
        Connection connection = connectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue ordersDestination = session.createQueue("store.orders, topic://store.orders");
        MessageProducer producer = session.createProducer(ordersDestination);
        Message order = session.createObjectMessage();
        producer.send(order);
11.2通知消息
单的说就是实现了ActiveMQ的broker上各种操作的记录跟踪和通知。

使用这个功能,你可以实时的知道broker上

    创建或销毁了连接,
    添加或删除了生存者或消费者,
    添加或删除了主题或队列,
    有消息发送和接收,
    什么时候有慢消费者,
    什么时候有快生产者
    什么时候什么消息被丢弃
    什么时候broker被添加到集群(主从或是网络连接)

这个机制是ActiveMQ对JMS协议的重要补充,也是基于JMS实现的ActiveMQ的可管理性的一部分。多个ActiveMQ的相互协调和互操作的基础设置。
 String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI);
        Connection connection = connectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic connectionAdvisory = org.apache.activemq.advisory.AdvisorySupport.CONNECTION_ADVISORY_TOPIC;
        MessageConsumer consumer = session.createConsumer(connectionAdvisory);
        ActiveMQMessage message = (ActiveMQMessage) consumer.receive();
        DataStructure data = (DataStructure) message.getDataStructure();
        if (data.getDataStructureType() == ConnectionInfo.DATA_STRUCTURE_TYPE) {
            ConnectionInfo connectionInfo = (ConnectionInfo) data;
            System.out.println("Connection started: " + connectionInfo);
        } else if (data.getDataStructureType() == RemoveInfo.DATA_STRUCTURE_TYPE) {
            RemoveInfo removeInfo = (RemoveInfo) data;
            System.out.println("Connection stopped: " + removeInfo.getObjectId());
        } else {
            System.err.println("Unknown message " + data);
        }
        大多数advisor消息都是完整的对于destiation,但是呢advisorysupport类有一些方法来决定监听哪个advisorytopic,你也能使用通配符-
             String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI);
        Connection connection = connectionFactory.createConnection();
        connection.start();
        // Lets first create a Consumer to listen too
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // Lets first create a Consumer to listen too
        Queue queue = session.createQueue("test.Queue");
        MessageConsumer testConsumer = session.createConsumer(queue);
        // so lets listen for the Consumer starting/stoping
        Topic advisoryTopic = org.apache.activemq.advisory.AdvisorySupport.getConsumerAdvisoryTopic(queue);
        MessageConsumer consumer = session.createConsumer(advisoryTopic);
        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message m) {
                ActiveMQMessage message = (ActiveMQMessage) m;
                try {
                    System.out.println("Consumer Count = " + m.getStringProperty("consumerCount"));
                    DataStructure data = (DataStructure) message.getDataStructure();
                    if (data.getDataStructureType() == ConsumerInfo.DATA_STRUCTURE_TYPE) {
                        ConsumerInfo consumerInfo = (ConsumerInfo) data;
                        System.out.println("Consumer started: " + consumerInfo);
                    } else if (data.getDataStructureType() == RemoveInfo.DATA_STRUCTURE_TYPE) {
                        RemoveInfo removeInfo = (RemoveInfo) data;
                        System.out.println("Consumer stopped: " + removeInfo.getObjectId());
                    } else {
                        System.err.println("Unknown message " + data);
                    }
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        testConsumer.close();
         <destinationPolicy>
            <policyMap>
              <policyEntries>
                <policyEntry topic=">" advisoryForSlowConsumers="true">
                </policyEntry>
              </policyEntries>
            </policyMap>
        </destinationPolicy>

ActiveMQ中,topic只有在持久订阅(durablesubscription)下是持久化的。存在持久订阅时,每个持久订阅者,都相当于一个持久化的queue的客户端,它会收取所有消息。这种情况下存在两个问题:

1.        同一应用内consumer端负载均衡的问题:同一个应用上的一个持久订阅不能使用多个consumer来共同承担消息处理功能。因为每个都会获取所有消息。queue模式可以解决这个问题,broker
端又不能将消息发送到多个应用端。所以,既要发布订阅,又要让消费者分组,这个功能jms规范本身是没有的。
2.        同一应用内consumer端failover的问题:由于只能使用单个的持久订阅者,如果这个订阅者出错,则应用就无法处理消息了,系统的健壮性不高,为了解决这两个问题,ActiveMQ中实现了虚拟
Topic的功能。使用起来非常简单。对于消息发布者来说,就是一个正常的Topic,名称以VirtualTopic.开头。例如VirtualTopic.TEST。对于消息接收端来说,是个队列,不同应用里使用不同的前缀作为
队列的名称,即可表明自己的身份即可实现消费端应用分组。例如Consumer.A.VirtualTopic.TEST,说明它是名称为A的消费端,同理Consumer.B.VirtualTopic.TEST说明是一个名称为B的客户端。
可以在同一个应用里使用多个consumer消费此queue,则可以实现上面两个功能。又因为不同应用使用的queue名称不同(前缀不同),所以不同的应用中都可以接收到全部的消息。每个客户端相当于一个持久订
阅者,而且这个客户端可以使用多个消费者共同来承担消费任务。
String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
        
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI);
        Connection consumerConnection = connectionFactory.createConnection();
        consumerConnection.start();
        
        Session consumerSessionA = consumerConnection.createSession(false,Session.AUTO_ACKNOWLEDGE);
        Queue consumerAQueue = consumerSessionA.createQueue("Consumer.A.VirtualTopic.orders");
        MessageConsumer consumerA = consumerSessionA.createConsumer(consumerAQueue);
        
        Session consumerSessionB = consumerConnection.createSession(false,Session.AUTO_ACKNOWLEDGE);
        Queue consumerBQueue = consumerSessionB.createQueue("Consumer.B.VirtualTopic.orders");
        MessageConsumer consumerB = consumerSessionB.createConsumer(consumerAQueue);
        
        //setup the sender
        Connection senderConnection = connectionFactory.createConnection();
        senderConnection.start();
        Session senerSession = senderConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic ordersDestination = senerSession.createTopic("VirtualTopic.orders");
        MessageProducer producer = senerSession.createProducer(ordersDestination);
同样queue名称的消费者会平分所有消息。
从queue接收到的消息,message.getJMSDestination().toString()为topic://VirtualTopic.TEST,即原始的destination。消息的persistent属性为true,即每个相当于一个持久订阅。
Virtual Topic这个功能特性在broker上有个总开关,useVirtualTopics属性,默认为true,设置为false即可关闭此功能。
当此功能开启,并且使用了持久化的存储时,broker启动的时候会从持久化存储里拿到所有的destinations的名称,如果名称模式与Virtual Topics匹配,则把它们添加到系统的Virtual Topics列表中去。
当然,没有显式定义的Virtual Topics,也可以直接使用的,系统会自动创建对应的实际topic。
当有consumer访问此VirtualTopics时,系统会自动创建持久化的queue,并在每次Topic收到消息时,分发到具体的queue。



可追溯”消费者,只对Topic有效,如果consumer是可追溯的,那么它可以获取实例创建之前的消息。通常而言,订阅者不可能获取实例创建之前的消息,因为broker根本不知道它的存在。对于broker而言,如果
一个Topic通道创建,且有发布者发布消息(Publisher),那么broker将会在内存中(非持久化)或者磁盘中(持久化)保存已经发布的消息,直到所有的订阅者都消费者,才会清除原始消息内容。那么retroactive
类型的订阅者,就可以获取这些原本不属于自己但broker上还保存的旧消息,就像我们订阅一种Feed,可以立即获取旧的内容列表一样。如果此订阅者不是durable(耐久的),它可以获取最近发布的一些消息;如果是durable,它可以获取存储器中尚未删除的所有的旧消息。[下文会详细介绍Topic的数据转发模型]
//在destinationUrl中设置,默认为false
feedTopic?consumer.retroactive=true
在broker端,可以配置当前Topic默认为“可追溯的”,不过Topic并不会在此种情况下额外的保存消息,只不过表示订阅者默认都是可追溯的而已。
<!-- 只对topic有效,默认为false -->
<policyEntry topic="feedTopic" alwaysRetroactive="true" />
    String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI);
        Connection connection = connectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic("soccer.division1.leeds?consumer.retroactive=true");
        MessageConsumer consumer = session.createConsumer(topic);
        Message result = consumer.receive();

 redeliveryPolicy
    consumer使用的重发策略,当消息在client端处理失败(比如onMessage方法抛出异常,事务回滚等),将会触发消息重发。对于Broker端,需要重发的消息将会被立即发送(如果broker端使用异步发送,
且发送队列中还有其他消息,那么重发的消息可能不会被立即到达Consumer)。我们通过此Policy配置最大重发次数、重发频率等,如果你的Consumer客户端处于不良网络环境中,可以适当调整相关参数。参数列表,
请参见(RedeliveryPolicy)
//在brokerUrl中设置
tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=6
 . redeliveryPolicy
RedelieveryPolicy policy=connection.getRedelieveryPolicy();
policy.setInitialRedelieveryDelay(500);
policy.setBackOffMultiplier(2)
policy.setUseExponentialBackOff(true)
policy.setMaximumRedelieveries(2)

DLQ-死信队列(Dead Letter Queue)用来保存处理失败或者过期的消息。
出现以下情况时,消息会被redelivered
A transacted session is used and rollback() is called.
A transacted session is closed before commit is called.
A session is using CLIENT_ACKNOWLEDGE and Session.recover() is called.

当一个消息被redelivered超过maximumRedeliveries(缺省为6次,具体设置请参考后面的链接)次数时,会给broker发送一个"Poison ack",这个消息被认为是a poison pill,这时broker会将这
消息发送到DLQ,以便后续处理。缺省的死信队列是ActiveMQ.DLQ,如果没有特别指定,死信都会被发送到这个队列。缺省持久消息过期,会被送到DLQ,非持久消息不会送到DLQ可以通过配置文件(activemq.xml)
来调整死信发送策略
<destinationPolicy>

    <policyMap>

      <policyEntries>

        <!— 设置所有队列,使用 '>' ,否则用队列名称 -->

        <policyEntry queue=">">

          <deadLetterStrategy>

            <!--

                    queuePrefix:设置死信队列前缀

                    useQueueForQueueMessages: 设置使用队列保存死信,还可以设置useQueueForTopicMessages,使用Topic来保存死信

            -->

            <individualDeadLetterStrategy   queuePrefix="DLQ." useQueueForQueueMessages="true"  processExpired="false" processNonPersistent="false"/>

          </deadLetterStrategy>

        </policyEntry>

      </policyEntries>

    </policyMap>

  </destinationPolicy>

  ...

</broker>

        在一个电子系统中可能接受来自不同供应商的各种订单信息,不同类型的订单走的流程不尽相同,为了快速处理各种不同的订单完成不同的业务。特定义不同的路由 信息。根据路由信息的不同,将消息进行不同的处理。如果采用ActiveMQ那么最好采用apache-camel整合,使不同的消息根据不同的流程自动 处理到不同的队列中去。

 

<beans>

<broker brokerName="testBroker">

<transportConnectors>

<transportConnector uri="tcp://localhos:61616">

</transportConnectors>

<import resource="camel.xml">

</beans>