RabbitMQ入门之安装配置与简单实例

jopen 10年前

简介


   由于某些原因,今天接触了一下一个新的东西RabbitMQ( http://www.rabbitmq.com/)总的来说给人的感觉就是安装简单方便,同时功能强大。而且官网也给出了几个相当实用的例子, 不管关于消息队列的持久化却并没有提及,关于持久化的问题我会在后面的文章中再详细说明。不过在天朝想要直接访问RabbitMQ官网有些困难,所以建议 还是安装一下fangqiang工具goagent 在Windows和Linux下都可以使用,具体信息还是自己看官网https://code.google.com/p/goagent/


RabbitMQ的主要特点

    支持一对多方式

        多个Queue绑定到一个Exchange后,通过向Exchange发送消息,就可将信息转发到多个绑定到Exchange的Queue中,

    消息持久化

        如果对消息进行了持久话处理,那么消息队列都将保存到服务器中,即使RabbitMQ服务器停止,下一次启动消息依然存在

    消息序列一致性

        每个消费者对自己的Queue操作,由于Queue是消息是队列形式保存,所以可以保证绑定到同一Exchange的消息队列的信息序列是一致的

    状态一致性

        对于消费者可以通过设置信息分发方式,让消费者每次只从队列种取出一条信息,操作完成并确认后才发送下一条信息,当操作出现异常如宕机,未完成的消息依然保存在服务器,可以保证在下一次消费者程序启动后可以从上一次操作未完成的位置继续执行。

 

 

Linux下安装RabbitMQ



   rabbitMQ是一个消息中间件,负责消息的接受和传递。openstack中貌似也是使用rabbitMQ作为消息中间件,这也是我们选择rabbitMQ的主要原因。保持一致性嘛。
   关于rabbitMQ的安装这里使用APT的安装方式,只要网络比较好,安装起来还是很快的
   首先添加一下内容到 /etc/apt/sources.list中

1
deb http: //www .rabbitmq.com /debian/ testing main
</div> </div>
为了避免安装时出现错误我们需要将rabbitMQ的公钥添加到我们的信任列表中

1
2
wget http: //www .rabbitmq.com /rabbitmq-signing-key-public .asc
sudo apt-key add rabbitmq-signing-key-public.asc
</div> </div> 直接运行 apt-get update </div>
最后直接安装即可

1
sudo apt-get install rabbitmq-server
</div> </div> 通过这种方式安装可以避免繁琐的配置方式,对于需要快速了解rabbitMQ的人,可以避免安装过程产生的各种麻烦事情。 </div>
安装完成后Rabbit服务器将自动启动,RabbitMQ提供了一些简单实用的命令用于管理服务器运行状态,以及查询服务器状态信息
下面说几个比较常用的,熟悉这些命令对之后分析服务器状态,以及持久化内容都有一定帮组。RabbitMQ的命令都需要在管理员权限下执行
查看服务器运行状态: rabbitmq-server status
启动服务器:rabbitmq-server start
停止服务器:rabbitmq-server stop

查看服务器中所有的消息队列信息 :rabbitmqctl list_queues
查看服务器种所有的路由信息: rabbitmqctl list_exchanges
查看服务器种所有的路由与消息队列绑定信息 :rabbitmq list_bindings



RabbitMQ 之Hello World

     
    rabbitMQ作为一个消息服务中间件负责接受消息和转发,下面给出一个简单的例子用来说明这种工作方式。
一个简单的实现主要包含3种角色生产者(Producing),消息队列(queue),消费者(consuming)。当然rabbitMQ 的核心并不在于此,rabbitMQ核心主要包含Exchange和queue,消息持久化操作也是通过这两个核心来完成的。后面文章会有详细说明,现在 先看一下简单的消息队列实现的例子。
RabbitMQ入门之安装配置与简单实例

    生产者 Send.java
import com.rabbitmq.client.Channel;  import com.rabbitmq.client.Connection;  import com.rabbitmq.client.ConnectionFactory;        public class Send {          //消息队列名称       private final static String QUEUE_NAME= "hello" ;              public static void main(String[] args)  throws java.io.IOException{                      //创建链接工程           ConnectionFactory factory = new ConnectionFactory();           factory.setHost( "localhost" );           //创建链接           Connection connection = factory.newConnection();                      //创建消息通道           Channel channel = connection.createChannel();                      //生命一个消息队列           channel.queueDeclare(QUEUE_NAME, false , false , false , null );                      String message = "Hello World" ;                      //发布消息,第一个参数表示路由(Exchange名称),未""则表示使用默认消息路由           channel.basicPublish( "" , QUEUE_NAME, null , message.getBytes());                      System.out.println( " [x] Sent '" +message+ "'" );                     //关闭消息通道和链接           channel.close();           connection.close();                  }         }

</div>     消费者Recv.java </div>
import com.rabbitmq.client.Channel;  import com.rabbitmq.client.Connection;  import com.rabbitmq.client.ConnectionFactory;  import com.rabbitmq.client.QueueingConsumer;     public class Recv {          //消息队列名称       private final static String QUEUE_NAME= "hello" ;              public static void main(String[] args)  throws java.io.IOException,java.lang.InterruptedException{                      //创建链接工厂           ConnectionFactory factory = new ConnectionFactory();           factory.setHost( "localhost" );           //创建链接           Connection connection = factory.newConnection();                      //创建消息信道           Channel channel = connection.createChannel();                      //生命消息队列           channel.queueDeclare(QUEUE_NAME, false , false , false , null );           System.out.println( "[*] Waiting for message. To exist press CTRL+C" );                      //消费者用于获取消息信道绑定的消息队列中的信息           QueueingConsumer consumer = new QueueingConsumer(channel);                      channel.basicConsume(QUEUE_NAME, true ,consumer);                      while ( true ){                              //循环获取消息队列中的信息               QueueingConsumer.Delivery delivery = consumer.nextDelivery();               String message = new String(delivery.getBody());               System.out.println( "[x] Received '" +message+ "'" );                          }                  }         }

</div> </div>     首先运行Send.java 发送消息到服务器种,再运行Recv.java即可获得消息队列中的信息。 </div>     这里并没有做消息的持久化操作,由于某些原因在程序运行过程中可能出现服务器宕机,以及其他外在因素使程序运行出现出错,消息的持久化就是使服务 器即使停止后下一次启动依然能保持上一次操作消息队列的状态。可以保证程序依然能从异常开始的地方重新执行。也就是为什么要对消息进行持久化的主要原因