activemq存储

DarrelMendo 8年前

来自: http://my.oschina.net/goudingcheng/blog/606994


activemq消息存储
1.1activemq存储有两种方式
1.1.1持久化
即使发送者和接受者不是同时在线或者消息中心在发送者发送消息后宕机了,在消息 中心重新启动后仍然可以将消息发送出去,如果把这种持久化和
ReliableMessaging结合起来应该是很好的保证了消息的可靠传送。消息持久性的原理很简单,就是在发送者将消息发送出去后,消息中心首先将消息
存储到本地数据文件、内存数据库或者远程数据库等,然后试图将消息发送 给接收者,发送成功则将消息从存储中删除,失败则继续尝试。
1.1.1非持久化。非持久化典型的运用在发送通知,或者实时的数据

1.2activemq是怎么存储消息的
queens和toptic是采用不同的存储方式
采取先进先出模式,同一时间,消息只会发送给某一个消费者,只有当该消息被消费并告知已收到时,它才能在代理的存储中被删除。
对于持久性订阅来说,每一个消费者都会获取消息的拷贝。为了节约空间,代理的存储介质中只存储了一份消息,存储介质的持久订阅对象为其以后的被存储的消息维护了一个指针,
消费者消费时,从存储介质中复制一个消息。消息被所有订阅者获取后才能删除。
1.3kahadb消息存储
   Kahadb是activemq从版本5.4之后的默认消息存储引擎。他是基于文件(意味着第三方数据库并不是先决条件),事务的消息存储引擎,这个消息存储能够使
activemq下载和使用非常的快,kahadb使用事务log作为他的索引(所有destination仅仅有一个索引文件),在生产环境中支持10000个active的连接,每个连接都有单独的queen
    消息存储机制是消息中间件最重要的核心部件和性能提升点。一直想对它做一个完整分析,这次趁有时间对kahadb做一个较完整分析。
Kahadb是基于B-tree算法的,具体原理fusesource给了个原理说明(http://fusesource.com/docs/broker/5.4/persistence/KahaDB-Overview.html),下面我们从代码实现角度进行一个较深入的分析。下面所有的分析都是基于activeMQ 5.4.3版本源码,该版本里的kahadb版本是V3,且是基于queue进行的源码分析,topic的实现虽然有不少差异,但整体可参考queue的。

1.3.1配置Kahadb
package com.activemq.store;

import java.io.File;
import java.io.IOException;

import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.kahadb.KahaDBStore;

public class EmbeddedBrokerUsingAMQStoreExample {
    public BrokerService createBrokerService() throws Exception{
        
        BrokerService broker=new BrokerService();
        File dataFileDir=new File("target/amq-in-action/kahadb");
        KahaDBStore kaha=new KahaDBStore();
        kaha.setDirectory(dataFileDir);
        kaha.setJournalMaxFileLength(1024*100);
        kaha.setIndexWriteBatchSize(100);
        kaha.setEnableIndexWriteAsync(true);
        broker.setPersistenceAdapter(kaha);
        broker.addConnector("tcp://localhost:61616");
        broker.start();
        return broker;
        
    }
}

在activemq里配置<persistenceAdapter>
<broker brokerName="broker" persistent="true" useShutdownHook="false">
....
<persistenceAdapter>
<kahaDB directory="active-data" journalMaxFileLength="16mb">
</persistenceAdapter>
.....
</broker>

2kahaDBm 消息存储目录结构
2.1.1db-*.log:
存放完整的每条消息(包括事务、目的地、id、优先级、具体内容等)和producerSequenceIdTracker(用来验证每个消息生成者发送的消息是否重复的数据结构)。它随着消息数量的增多,如每32M一个文件,
文件名按照数字进行编号,如db-1.log、db-2.log、db-3.log …
2.1.2db.data:
通过存放多个Btree数据结构来保存各类重要信息,下面一一进行介绍:
 Metadata类的destinations:用来保存该broker上有哪些Queue或队列
 StoredDestination类的orderIndex属性中的
defaultPriorityIndex、lowPriorityIndex、highPriorityIndex,这3个btree是为消息优先级排序而设计的(应该是版本5.4引入的,唉,有时候一个功能的引入带来的代价可能比较大)。它们的主要作用
是为AbstractStoreCursor类的doFillBatch方法服务的,也就是常说的消息指针(message cursors)。当消息指针需要从磁盘文件中装载一批消息的时候会使用这3个btree实例(kahadb版本小于2的不支
持lowPriorityIndex、highPriorityIndex)StoredDestination类的locationIndex:该btree的主要作用包括:
. 系统重启进行恢复操作的时候,要移除掉不在db-*.log文件里的消息;
. 在系统进行定时checkpointUpdate时使用
2.1.3archive directory
StoredDestination类的messageIdIndex:该btree的主要作用是消息确认acknowledge操作时,通过消息ID在messageIdIndex中删除对应的记录,并依据返回的值删除orderIndex和locationIndex中的记
录上面这些就是kahadb中最主用的btree实例。
2.1.4db.redo:用来恢复Btree索引
Storage for Queue: 消息存储是按照First in, First out(FIFO)的顺序存储的,One message is dispatched to a  singleconsumer at a time. 只有当消息被消费或被确认时,它才会被从消息存
储中删除。For durable sunscribers to a topic: 每个Consumer获取一个消息的Copy。为了节省存储空间,每个消息只有一个Copy被Broker存储。存储维护Durable Subscriber的指针,该指针指向下一
个存储的消息,并且分发该消息的Copy给它的Consumer. 因为在这种情况下,每个消息都有多个潜在消费者,所以消息只有当被成功的传递给每个注册的Durabel Subscriber后,才会被删除KahaDB消息存储
联合使用快速的事务处理:Journal以及数据日志文件,该日志文件是消息ID的索引,并且 在内存中缓存消息AMQ消息存储,类似于KahaDB。它使用了journal事务来确保稳定的持久、恢复、高性能的索引。
这种存储方式主要用于大数据(消息)量的存储。对每个索引文件,有两个单独的文件;一个用于每个Destination。如果每个Broker有上千个Queue,此时使用AMQ消息存储是不合适的,并且恢复数据也很慢,
因为所有的索引文件需要rebuid,需要Broker扫描所有的Data logs去再次构建索引。
2.2下表是KahaDB的配置选项:

 KahaDB Properties

property name                      default value                                     Comments

directory                          activemq-data                     the path to the directory to use to store the message store data and log files

IndexDirectory                                                        If set, configures where the KahaDB index files will be stored. If not set, the index
                                                                      files are stored in the directory specified by the 'directory' attribute.
                                                                       Available as of ActiveMQ 5.10

indexWriteBatchSize                1000                               number of indexes written in a batch

indexCacheSize                     10000                              number of index pages cached in memory

enableIndexWriteAsync              false                              if set, will asynchronously write indexes

journalMaxFileLength               32mb                               a hint to set the maximum size of the message data logs

enableJournalDiskSyncs             true                               ensure every journal write is followed by a disk sync (JMS durability requirement)

cleanupInterval                    30000                              time (ms) before checking for a discarding/moving message data logs that are no longer used

checkpointInterval                 5000                               time (ms) before checkpointing the journal

ignoreMissingJournalfiles          false                              If enabled, will ignore a missing message log file

checkForCorruptJournalFiles        false                              If enabled, will check for corrupted Journal files on startup and try and recover them

checksumJournalFiles               false truev5.9                     create a checksum for a journal file - to enable checking for corrupted journals

archiveDataLogs                    false                              If enabled, will move a message data log to the archive directory instead of deleting it.

directoryArchive                   null                               Define the directory to move data logs to when they all the messages they contain have been consumed.

maxAsyncJobs                       10000                              the maximum number of asynchronous messages that will be queued awaiting storage (should be the same
                                                                      as the number of concurrent MessageProducers)

concurrentStoreAndDispatchTopics   false                              enable the dispatching of Topic messages to interested clients to happen concurrently with message
                                                                      storage (Warning: Enabling this property is not recommended)

concurrentStoreAndDispatchQueues   true                                  enable the dispatching of Queue messages to interested clients to happen concurrently with message storage

archiveCorruptedIndex              false                              If enabled, corrupted indexes found at startup will be archived (not deleted)

preallocationStrategy              sparse_file                        (as of 5.12.0) This setting configures how the broker will try to preallocate the journal files when a new journal file is needed. The default allocation strategy sets the file length, but does not populate it with any data. The 'os_kernel_copy' strategy delegates the preallocation to the Operating System. The 'zeros' strategy configures ActiveMQ to do the preallocation by writing 0x00 to all of the positions in the journal file.

2.3amq消息存储
amq是一种结合事务用来可持久化高性能的索引他能够使存储消息采用最好的方式,但是由于每个索引都采用2个独立的文件,这里每个destination都有一个索引,在拥有成千上万个queen的情况下不应该采用amq
并且如果amq关闭的不彻底的话,恢复将会变得特别慢,,这是因为amq要重建索引, AMQ消息存储,类似于KahaDB。它使用了journal事务来确保稳定的持久、恢复、高性能的索引。这种存
储方式主要用于大数据(消息)量的存储。对每个索引文件,有两个单独的文件;一个用于每个Destination。如果每个Broker有上千个Queue,此时使用AMQ消息存储是不合适的,并且恢复数据也很慢,因为所有的索引文
件需要rebuid,需要Broker扫描所有的Data logs去再次构建索引。
2.3.1amq消息内部
1.data log作为消息日志
2.缓存(cache)
3.the reference store
the amq消息存储目录结构
1.a lock file:确保仅仅一个broker能访问数据
2.a temp-storeage directory用来存储非持久化消息
3.the kr-store 这个目录结构被amq消息存储引用(the data &&state directory)
4.日志目录
5备份目录

配置amq消息存储
<?xml version="1.0" encoding="UTF-8"?>
<beans>
    <broker xmlns="http://activemq.apache.org/schema/core">
        <persistenceAdapter>
            <amqPersistenceAdapter directory="target/Broker2-data/activemq-data"
                syncOnWrite="true" indexPageSize="16kb" indexMaxBinSize="100"
                maxFileLength="10mb" />
        </persistenceAdapter>
    </broker>
</beans>

5.4jdbc消息存储
JDBC消息存储由三个表组成,,两个表用来保存消息,第三个用来锁表,确保只有一个activemq能够访问数据库
Activemq_msgs sqltable

colunmn                default type                      desc
id                     integer                           the sequeence id used to retrieve the message
container              varchar(250)                      the destination of the message
msgid_prod             varchar(250)                      the id of the message producer
msgid_seq              integer                           the producer sequeence numer for the message , this together with the msgid_prod is equivalent to the jms messageid
expiration             bigint                            the time in milliseconds when the message will expire
msg                    blob                              the serialized message itself
topics 和queen被分解和存储在Activemq_msgs
这里有两张独立的表用来保存持久的消息订阅和id for 持久化消息订阅者收到的上一条消息activemq
activemq_acks sql table

column name                default type                  desc
container                  varchar(250)                  the destination of the message
sub_dest                   varchar(250)                  the destination of the durable subscriber
client_id                  varchar(250)                  the client of the durable subscriber
sub_name                   varchar(250)                  the subscriber name of the durable subscriber
selector                   varchar(250)                  the selector of the durable subscriber
last_acked_id              integer                       the sequeence id of last message receive by this subcriber

对于持久化消息订阅者来说last_acked_id sequeence 被用来指向Activemq_msgs并且使他们 能够很容易从发ctivemq_msgs select出来

activemq——lock表
column name                default type                  desc'
id                         integer                       a unique id for the lock
broker name                varchar(250)                   the name of the active broker that has the lock
这章表用来确保只有一个activemqbroker 实例在一段时间内能够接入数据库,如果一个activemq broker 不能抓住database锁,, 代理就不能初始化, 并且等到lock free(或者shutdown)

配置jdbc message store
<?xml version="1.0" encoding="UTF-8"?>
<beans>
    <broker brokerName="test-broker"
    persistent="true"
    xmlns="http://activemq.apache.org/schema/core">
        <persistenceAdapter>
            <jdbcPersistenceAdapter dataSource="#mysql-ds"/>
        </persistenceAdapter>
    </broker>
    
    <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
        <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
        <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
        <property name="username" value="root"/>
        <property name="password" value="admin"/>
        <property name="poolPreparedStatements" value="true"/>
    </bean>
</beans>

5.4.4利用jdbc消息存储active mq日志(改善journal日志)
<?xml version="1.0" encoding="UTF-8"?>
<beans>
    <broker brokerName="test-broker"
    persistent="true"
    xmlns="http://activemq.apache.org/schema/core">
        <persistenceFactory>
            <journalPersistenceAdapterFactory
            journalLogFiles="4"
            journalLogFileSize="32768"
            useJournal="true"
            useQuikJournal="true"
            dataSource="derby-ds"
            dataDirectory="activemq-data" />
            
        </persistenceFactory>
    </broker>
    
    <bean id="derby-ds" class="org.apache.derby.jdbc.EmbeddedDataSource">
        <property name="databaseName" value="derbydb"/>
        <property name="createDatabase" value="create"/>
    </bean>
</beans>
5.5内存消息存储
<?xml version="1.0" encoding="UTF-8"?>
<beans>
    <broker brokerName="test-broker"
    persistent="false"
    xmlns="http://activemq.apache.org/schema/core">
    <transportConnectors>
        <transportConnector uri="tcp://localhost:61635">
    </transportConnectors>
    </broker>
    
</beans>
public BrokerService createEmbeddedBroker() throws Exception{
        
        BrokerService broker=new BrokerService();
        broker.setPersistent(false);
        broker.addConnector("tcp://localhost:61616");
        broker.start();
        return broker;
        
    }
    
    public void createRetroactiveConsumer() throws Exception {
        ConnectionFactory fac = new ActiveMQConnectionFactory();
        Connection connection = fac.createConnection();
        connection.start();
        Session session = connection.createSession(false,
                Session.AUTO_ACKNOWLEDGE);
        Topic topic = session
                .createTopic("TEST.TOPIC?consumer.retroactive=true");
        MessageConsumer consumer = session.createConsumer(topic);

    }