rabbitmq_getstarted[翻译]

tttracy 贡献于2016-05-11

作者 ES-BJ-PC-334  创建于2012-06-06 01:30:27   修改者  修改于2013-06-26 07:29:54字数27478

文档摘要:RabbitMQ是一个消息服务器。从本质上讲,它接收生产者生产的消息,并发送给消费者。期间,它可以根据用户给定的规则来路由、缓存、持久化这些消息。
关键词:

1 发送"Hello World"消息 简介 RabbitMQ是一个消息服务器。从本质上讲,它接收生产者生产的消息,并发送给消费者。期间,它可以根据用户给定的规则来路由、缓存、持久化这些消息。 以下是RabbitMQ用到的一些专业术语: 1.生产的意思就是发送。发送消息的程序就是生产者,如下图所示: 2. 队列就是接受消息的邮箱,由RabbitMQ提供。尽管消息在RabbitMQ和应用之间传递,但只能被封装在一个队列中。队列没有限制,可以储存任意多的消息--它本质是是一个无限的缓存。一个队列可以接收多个生产者发送来的消息,并可以把消息分发给多个消费者。队列可以用下图表示: 3.消费类似于接收。等待接收消息的程序就是消费者,如下图所示: 注意生产者、消费者和RabbitMQ服务器不必位于同一台机器上,事实上大部分都是如此。 "Hello World"示例 (使用Java版客户端) 此处有2个java程序:发送一个消息的生产者和一个接收消息并打印的消费者。为了上手方便,我们将忽略程序的细节,只专注于核心部分。发送的消息的内容是“Hello World”。 在下图中,“P”代表生产者,“C”代表消费者。中间的方块代表队列--RabbitMQ的消息缓存器,储存向消费者发送的消息。 Java依赖库 RabbitMQ使用AMQP协议,它是一个开放的、通用的消息协议。许多编程语言都实现了AMQP协议。这里我们使用Java版。 下载客户端依赖包,并核对一下它的签名。将其解压到你的工作目录中,并从此目录中获取JAR文件: $ unzip rabbitmq-java-client-bin-*.zip $ cp rabbitmq-java-client-bin-*/*.jar ./ (maven中央仓库也提供此依赖包, groupId是com.rabbitmq,artifactId是amqp-client.) 有了java依赖包,就可以编码了。 发送消息 我们将调用消息生产者Send和消息消费者Recv。生产者和RabbitMQ连接并发送一个消息,然后退出。 在Send.java中导入依赖类: import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; 创建Send类并给队列命名: public class Send { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws java.io.IOException { ... } } 然后和服务器之间建立一个连接: ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); Connection对象将socket连接抽象化,封装了协议的版本和身份验证等。使用“localhost”连接本地的RabbitMQ服务器。如果想连接到其它机器,只需要修改机器名或IP地址即可。 下一步创建一个消息通道(channel),通过它来完成大部分的工作。发送消息之前,必须先创建一个队列;然后将消息发送给它: channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); 创建队列代码如上,当且仅当该队列不存在时,才会被创建。消息是字节数组,这样你就可以任意地对消息的内容进行编码。 最后,我们关闭消息通道和连接; channel.close(); connection.close(); 这就是完整的Send.java类。 发送消息失败! 如果这是你第一次使用RabbitMQ而且没有看到发送的消息,那么你可能正在挠头中并很想知道哪儿错了。也许服务器启动时没有足够的磁盘空间(默认情况下它至少需要1GB的磁盘空间),因此它拒绝接收消息。检查一下服务器的logfile配置文件并减小将该限制的属性值。可以查看配置文件文档怎么设置disk_free_limit属性。 接收消息 上面就是消息生产者。消费者从RabbitMQ获取消息,和只发送一次消息的生产者不同,消费者会一直等待接收消息并将接收的消息打印出来。 Recv.java和Send.java基本需要引用同样的类: import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.QueueingConsumer; 新引用的类QueueingConsumer是用来接收从服务器发送来的消息。 和Send一样,要打开连接、创建消息通道、声明队列,从声明的队列中接收消息。请注意,生产者的队列必须和消费者的队列一致。 public class Recv { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws java.io.IOException, java.lang.InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); ... } } 注意在这里也声明了队列。因为消费者可能在生产者之前启动,但必须确保消费者在尝试接收消息时队列就存在。 通知服务器把队列中的消息发送给我们。由于服务器以异步的方式发送消息,所以我们需要提供一个回调对象,当消息发送过来它就会通知我们。QueueingConsumer就是这个回调。 QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(QUEUE_NAME, true, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); } QueueingConsumer.nextDelivery() 方法会阻塞直到下一条消息从服务器发送过来。 以上就是完整的Recv.java代码。 组合代码 同时编译这2个类,需要将rabbitmq-client.jar放到类路径下: $ javac -cp rabbitmq-client.jar Send.java Recv.java 运行上面两个程序需要将rabbitmq-client.jar等依赖拷贝到类路径下。在终端上运行生产者: $ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Send 再运行消费者: $ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Recv 在windows环境下,使用冒号而不是分号来分隔类路径下的jar包。 消费者打印从RabbitMQ接收的消息。消费者将一直运行,等待接收消息(使用Ctrl-C停止),所以请从另一个终端上运行生产者。 使用rabbitmqctl list_queues命令检查队列。 “Hello World”例子结束。 接下来请看教程2--创建一个简单的工作队列。 提示 为了方便可以用环境变量代理类路径: $ export CP=.:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar $ java -cp $CP Send 在windows中: > set CP=.;commons-io-1.2.jar;commons-cli-1.1.jar;rabbitmq-client.jar > java -cp %CP% Send 2 工作队列 工作队列 (使用Java版客户端) 教程1中的程序是从同一个队列中发送或接收消息。这里我们将创建一个向多个工作者发送耗时任务的队列。 工作队列(也叫:任务队列)的主要目的是在避免处理资源密集型任务时需要立即处理并且需要等待任务结束时才能完成。相反,我们安排这种任务稍后才被完成。我们把任务封装成消息并发送到队列。工作者进程在后台运行,接收任务并最终执行作业。当启动多个工作者时,任务将在他们之间共享。 这种思想在web程序中尤其重要,因为它不可能在一个很短的http请求窗口中处理一个复杂的任务。 准备 教程1的程序发送消息“Hello World”。这里发送的消息是代表任务复杂度的字符串。由于没有现实中的任务,例如修改图片或者创建pdf文件,所以我们用Thread.sleep()方法来模拟耗时任务。字符串中点的个数代表任务的复杂度;每一个点代表一秒钟的任务。例如,“Hello...”代表3秒钟的任务。 稍微修改一下上个教程例子的Send.java的代码,允许从命令行发送任意消息。这个程序计划给工作队列发送多个任务,文件名修改为NewTask.java: String message = getMessage(argv); channel.basicPublish("", "hello", null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); 从命令行参数中获取消息: private static String getMessage(String[] strings){ if (strings.length < 1) return "Hello World!"; return joinStrings(strings, " "); } private static String joinStrings(String[] strings, String delimiter) { int length = strings.length; if (length == 0) return ""; StringBuilder words = new StringBuilder(strings[0]); for (int i = 1; i < length; i++) { words.append(delimiter).append(strings[i]); } return words.toString(); } 以前的Recv.java也需要一些修改:消息内容中的每一个点都代表一秒钟的任务。它将从队列中接收消息并处理任务,文件名改为Worker.java: while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); doWork(message); System.out.println(" [x] Done"); } 通过耗时来模拟任务: private static void doWork(String task) throws InterruptedException { for (char ch: task.toCharArray()) { if (ch == '.') Thread.sleep(1000); } } 同上次一样进行编译(jar文件在工作目录中): $ javac -cp rabbitmq-client.jar NewTask.java Worker.java 循环调度 任务队列的优点之一是可以并行执行任务。如果有很多任务要做,只需要简单的添加更多的工作者即可。 首先同时运行两个Work.java程序。它们从相同队列中获取消息,但具体是怎样的?我看看一下。 需要打开三个控制台,其中2个运行Work.java程序,即工作者C1和C2。 shell1$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Worker [*] Waiting for messages. To exit press CTRL+C shell2$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Worker [*] Waiting for messages. To exit press CTRL+C 第三个控制台用来发送任务。一旦工作者开始运行,生产者你可以发送多个消息给它们: shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar NewTask First message. shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar NewTask Second message.. shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar NewTask Third message... shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar NewTask Fourth message.... shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar NewTask Fifth message..... 工作者接收的消息: shell1$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Worker [*] Waiting for messages. To exit press CTRL+C [x] Received 'First message.' [x] Received 'Third message...' [x] Received 'Fifth message.....' java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Worker [*] Waiting for messages. To exit press CTRL+C [x] Received 'Second message..' [x] Received 'Fourth message....' 默认情况下,RabbitMQ将依次发送消息给下个工作者。每个工作者将平均接收相同数目的任务。这种分发任务的方式被称为循环调度。请尝试运行三个或更多的工作者。 消息确认 完成一个任务只需要几秒钟。当一个工作者在处理一个耗时任务,如果只完成一部分就死掉了会发生什么?在当前的代码中,一旦RabbitMQ发送了一个任务给工作者,它会立即将其从内存中移除。这种情况下,如果你杀掉了一个工作者,它正在处理的任务将丢失。同时也会丢掉这个工作者所有接收但尚未完成的任务。 但我们不想丢失任何的任务。如果工作者死掉,我们希望任务被重新发送给别的工作者。 为了确保消息从不会丢失,RabbitMQ支持消息确认。当一个工作者处理完一个任务时,就会发送一个确认信息给RabbitMQ,RabbitMQ就知道该任务处理完并删除其所占的空间。 如果一个工作者尚未发送确认信息就死掉,RabbitMQ就会值得袄为这个任务尚未处理完并会重新发送给其它工作者。这种方式你可以保定不丢失任务,即使工作者是偶然死掉的。 消息确认不存在超时问题。只有连接死掉,RabbitMQ才会把任务发送给其它工作者。即使处理一个任务会花很长很长的时间。 消息确认默认是开启的。在以前的例子中我们使用autoAck=true显式关闭了它。现在取消此标识,工作者完成任务后会发送一个正确的确认信息。 QueueingConsumer consumer = new QueueingConsumer(channel); boolean autoAck = false; channel.basicConsume("hello", autoAck, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); //... channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } 上述代码保证当工作者正在处理任务时,即便用CTRL+C关闭也不会丢失任务。工作者死掉后未确认的任务会被重新发送。 忘记确认 忘记使用basicAck进行消息确认是很普通的错误。它是一个很简单的错误,但是后果很严重。当客户端退出时消息将被重新发送(看起来像是随机重新发送),但当RabbitMQ无法释放掉没有确认的消息时,就会吃掉越来越多的内存。 为了调试这种类型的错误,可以使用rabbitmqctl打印messages_unacknowledged字段: $ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged Listing queues ... hello 0 0 ...done. 消息持久化 我们已经学习了怎么确保任务不会丢失,即使工作者死掉。但如果RabbitMQ服务器停掉,任务仍然会丢失。 当RabbitMQ退出或死机,它将会丢失队列和消息,除非你告诉它别这么做。确保消息不会丢失需要做两件事:标记队列持久化和消息持久化。 首先要确保RabbitMQ从不会丢失队列。为了实现这个目的,需要声明队列持久化: boolean durable = true; channel.queueDeclare("hello", durable, false, false, null); 即使这个命令本身是正确的,但在当前的配置中仍然不会起作用。因为已经定义了一个名字叫 “hello”的队列,它是非持久化的。RabbitMQ不允许使用不同的参数重新定义一个已经存在的队列,任何这么做的程序都会返回错误。但是有一个快速解决方案--使用不同的名字来声明一个队列,例如“task_queue”: boolean durable = true; channel.queueDeclare("task_queue", durable, false, false, null); 生产者和消费者的代码中都需将队列声明为持久化。 现在“task_queue”队列将不会丢失,即使RabbitMQ重启。现在需要将消息标记为持久化--设置MessageProperties(实现了BasicProperties接口) 的值为PERSISTENT_TEXT_PLAIN. import com.rabbitmq.client.MessageProperties; channel.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); 请注意消息持久化 将消息标记为持久化并不能完全确保消息不会丢失。尽管RabbitMQ会把消息保存到磁盘上,但当RabbitMQ接收了消息但尚未保存到磁盘上时仍然有一个很短的时间窗口。而且,RabbitMQ不会为每个消息都做磁盘同步处理--它也许仅被储存在内存中,并没有写到磁盘。持久化功能不算强大,但对于我们简单的任务队列来说已经足够了。如果你需要一个强大的持久化功能,你可以把发送消息的代码封装到事务中。 失败发送 你也许注意到发送机制仍然没有完全按照我们希望的去工作。例如在只有2个工作者的情况下,当所有处于奇数位置的任务都是耗时的,而处于偶数位置的任务是简单的,一个工作者将完全处于忙碌中,而另外一个将基本不做什么工作。然而,RabbitMQ完全不知道这些,仍然轮流给它们发送任务。 因为当消息进入队列时,RabbitMQ就会把它发送给工作者。它不会查看工作者是否发回确认信息。 只是盲目的发送第N个任务给第N个工作者。 通过basicQos方法将prefetchCount设置为1就可以改变这种状况。RabbitMQ将一次只发送一个任务给一个工作者。或者换句话说,工作者未返回上一个任务的确认信息之前就不给他发送新的任务。相反,它会把任务发送给空闲的工作者。 int prefetchCount = 1; channel.basicQos(prefetchCount); 请注意队列大小 当所有的工作者都处于忙状态,你的队列就会被填满。你需要留意到这种情况,此时可以增加更多的工人或者使用一些其它的策略。 组合代码 NewTask.java的最终代码: import java.io.IOException; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.MessageProperties; public class NewTask { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws java.io.IOException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); String message = getMessage(argv); channel.basicPublish( "", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); } //... } (NewTask.java 源码) Worker.java: import java.io.IOException; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.QueueingConsumer; public class Worker { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws java.io.IOException, java.lang.InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); channel.basicQos(1); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(TASK_QUEUE_NAME, false, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); doWork(message); System.out.println(" [x] Done" ); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } //... } (Worker.java 源码) 可以使用消息确认和prefetchCount设置一个工作队列。持久化可以使任务存活即使是RabbitMQ重启。 查看更多的Channel的方法和MessageProperties类,你可以在线浏览javadocs。 下一步学习教程3--怎么把相同的消息发送给多个消费者。 3 发布/订阅 发布和订阅 (使用Java版客户端) 上个教程中创建了一个工作队列,它假定每个任务只发送给一个工作者。这里和它完全不同--发送一个消息给多个消费者。这种模式被称为“(发布/订阅)”(publish/subscribe)。 这里通过建立一个简单的日志系统来说明这种模式。它由两个程序组成--第一个发布日志,第二个接收并打印日志。 日志系统中每个消费者都会接收消息。这种情况下,可以运行一个消费者把接收的日志保存到磁盘上,同时运行另外一个消费者将接收的日志打印到屏幕上。 实际上,日志消息将被广播给所有的消费者。 交换机 在上面的教程中,只在一个队列中接收和发送消息。现在该介绍Rabbit中完整的消息模型。 快速复习一下上面教程中的内容: · 生产者就是一个发送消息的应用程序。 · 队列就是一个储存消息的缓存器。 · 消费者就是一个接收消息的应用程序。 RabbitMQ中消息模型的核心思想就是生产者从不直接把消息发送到一个队列。事实上,在很多时候生产者甚至完全不知道一个消息是否被发送给队列。 相反,生产者只是发送消息给交换机。交换机很简单,它一边从生产者那里接收消息,另一边它将消息发送给队列。交换机必须知道怎么处理它接收的消息,消息应该被发送给指定的队列?还是应该发送给多个队列?或是被抛弃掉。交换机类型决定这些处理规则。 交换机的类型:direct,topic,headers和fanout。在这里创建一个fanout类型的交换机,取名为logs: channel.exchangeDeclare("logs", "fanout"); fanout类型交换机非常简单。从名字就可以看出来,它把消息广播给所有它知道的队列。这完全符合日志系统的需求。 列出交换机 使用rabbitmqctl命令列出服务器上的交换机: $ sudo rabbitmqctl list_exchanges Listing exchanges ... direct amq.direct direct amq.fanout fanout amq.headers headers amq.match headers amq.rabbitmq.log topic amq.rabbitmq.trace topic amq.topic topic logs fanout ...done. 列表中有一些amq.*交换机,和默认(无名)交换机。这些是被默认创建的,但此时不需要你使用它们。 无名交换机 在先前的教程中我们不知道交换机,但仍然可以发送消息到队列。这是因为使用的是默认交换机,通过空字符串来识别("")。 回想一下我们之前是怎么发送一个消息: channel.basicPublish("", "hello", null, message.getBytes()); 第一个参数就是交换机的名字。空字符串表示默认或无名交换机:消息被路由到由routingKey指定名称的队列,如果该队列存在的话。 发送消息给指定名称的交换机: channel.basicPublish( "logs", "", null, message.getBytes()); 临时队列 在上面的教程中使用的队列都有名字(回想下hello和task_queue),给队列命名很重要--根据名字可以把多个工作者指定到共同的队列。当消费者和生产者共享相同的队列时,就需要给队列命名。 但这不并适合日志系统。我们希望接收所有的日志消息,而不是它们的子集。而且希望接收最新的消息,而不是过期的。为了达到这个目的有两件事要做: 首先,无论什么时候连接到Rabbit,都需要一个新的、空的队列。为了做到这点可以创建一个使用随机名子的队列,或者更好的方法是让服务器提供一个随机的队列。 第二,一旦连接断开,队列应该自动被删除。 在Java版客户端,我们使用没有参数的queueDeclare()方法创建一个非持久化、唯一的、自动删除的队列,队列名由RabbitMQ随机生成: String queueName = channel.queueDeclare().getQueue(); 队列名是随机生成的。例如amq.gen-JzTY20BRgKO-HjmUJj0wLg。 绑定 已经创建了一个fanout类型的交换机和一个队列。现在需要告诉交换机把消息发送到指定队列。交换机和队列之间通过绑定来建立关系。 channel.queueBind(queueName, "logs", ""); 从此logs交换机将发送消息给指定的队列。 绑定列表 使用rabbitmqctl list_bindings命令列出绑定列表 组合代码 发送日志消息的生产者程序和上面教程中的看起来并没有太多差别。最重要的变化是需要发送消息给的logs交换机而不是无名的交换机。发送消息时需要提供routingKey,但是它的值被fanout类型交换机忽略了。EmitLog.java代码: import java.io.IOException; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; public class EmitLog { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws java.io.IOException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String message = getMessage(argv); channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); } //... } (EmitLog.java 源码) 建立连接后声明交换机。这步是必要的,因为不允许发送消息给一个不存在的交换机。 如果还没有队列绑定到交换机那么消息将会丢失,但这不是问题;如果没有消费者,这些消息会被安全地丢弃。 ReceiveLogs.java代码: import java.io.IOException; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.QueueingConsumer; public class ReceiveLogs { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws java.io.IOException, java.lang.InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); } } } (ReceiveLogs.java 源码) 编译代码: $ javac -cp rabbitmq-client.jar EmitLog.java ReceiveLogs.java 输出日志到文件: $ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar ReceiveLogs > logs_from_rabbit.log 在屏幕上打印日志: $ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar ReceiveLogs 发布日志: $ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar EmitLog 当想检查程序创建的绑定和队列,可使用rabbitmqctl list_bindings命令查看。当2个ReceiveLogs.java程序运行时,你应该会看到类似下面的情况: $ sudo rabbitmqctl list_bindings Listing bindings ... logs exchange amq.gen-JzTY20BRgKO-HjmUJj0wLg queue [] logs exchange amq.gen-vso0PVvyiRIL2WoV3i48Yg queue [] ...done. logs交换机把日志消息发送给这两个服务器分配的队列。 想接收消息的子集,请查看教程4。 4 路由 路由 (使用Java版客户端) 上一个教程中建立了一个简单的日志系统,日志消息会被发送给多个消费者。 这里将添加新的功能--只接收消息的子集。例如,‘error’级别的日志消息会被保存到文件中,而所有级别的日志消息会被输出到控制台。 绑定 先前的绑定代码: channel.queueBind(queueName, EXCHANGE_NAME, ""); 绑定建立了交换机和队列之间的联系。可以简单的理解为:该队列对这个交换机的消息感兴趣。 绑定可以使用额外的参数routingKey。为了避免和basic_publish参数混淆,这里称它binding key(绑定关键字)。使用绑定关键字来创建一个绑定: channel.queueBind(queueName, EXCHANGE_NAME, "black"); 绑定关键字的作用由交换机的类型来决定。之前的fanout类型的交换机只会忽略它的值。 Direct类型交换机 上个教程中的日志系统把所有的消息广播给所有的消费者。我们想扩展它的功能,根据消息的级别进行过滤。例如,只把’error’级别的消息写到磁盘,’info’,‘warning’级别的消息不写到磁盘。 fanout类型的交换机,不够灵活,只能盲目的广播。 这里将使用direct类型的交换机。direct交换的路由算法很简单--把消息发送到匹配routing key的队列中。 举例说明: 在该设置中,有两个队列绑定到direct类型的交换机“X”。第一个队列绑定关键字orange,第二个有2个绑定关键字,一个是black,另一个是green。 在这种设置下,有orange绑定关键字的消息会被交换机发送到队列Q1。拥有black或green绑定关键字的消息将被发送到Q2。其它的所有消息将被丢弃。 多个绑定 多个队列绑定相同的关键字是可以的。在上面的例子中给一给交换机X和队列Q1添加绑定关键字black。此时,direct交换机将和fanout表现的一样,广播消息到所有队列。关键字为balck的消息将被发送给Q1和Q2. 发送日志 日志系统将使用这种模式。发送消息给一个direct交换机而不是fanout。使用绑定关键字来支持日志级别。接收程序可以根据日志级别选择性地接收日志。首先关注一下发送日志。 创建一个交换机: channel.exchangeDeclare(EXCHANGE_NAME, "direct"); 发送消息: channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes()); 为了方便,假定日志级别'severity'只是'info', 'warning', 'error'中的一种。 订阅 和上个教程中的消费者代码只有一处不同--为每一个日志级别创建一个绑定。 String queueName = channel.queueDeclare().getQueue(); for(String severity : argv){ channel.queueBind(queueName, EXCHANGE_NAME, severity); } 组合代码 EmitLogDirect.java代码: public class EmitLogDirect { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws java.io.IOException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct"); String severity = getSeverity(argv); String message = getMessage(argv); channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes()); System.out.println(" [x] Sent '" + severity + "':'" + message + "'"); channel.close(); connection.close(); } //.. } ReceiveLogsDirect.java代码: public class ReceiveLogsDirect { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws java.io.IOException, java.lang.InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct"); String queueName = channel.queueDeclare().getQueue(); if (argv.length < 1){ System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]"); System.exit(1); } for(String severity : argv){ channel.queueBind(queueName, EXCHANGE_NAME, severity); } System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); String routingKey = delivery.getEnvelope().getRoutingKey(); System.out.println(" [x] Received '" + routingKey + "':'" + message + "'"); } } } 把’warning’和’error’级别的日志消息保存到文件: $ java -cp $CP ReceiveLogsDirect warning error > logs_from_rabbit.log 把所有的日志消息输出到屏幕: $ java -cp $CP ReceiveLogsDirect info warning error [*] Waiting for logs. To exit press CTRL+C 发送一个'error' 级别的日志消息: $ java -cp $CP EmitLogDirect error "Run. Run. Or it will explode." [x] Sent 'error':'Run. Run. Or it will explode.' 到教程5学习如何监听基于模式的消息。 5 Topics类型交换机 Topics (使用Java版客户端) 在上一个教程中我们升级了一下日志系统,使用能够选择性接收日志的direct类型交换机代替只能广播的fanout类型交换机。 尽管使用direct类型交换机能提高系统的灵活性,但它仍有限制--它不能根据多个条件来路由消息。 在日志系统中,也许我们希望接收的日志消息不仅是根据它的级别,还根据它的来源。根据unix的syslog工具就可知道此概念,它根据日志的级别(info/warn/crit...) 和来源(auth/cron/kern...)来分发日志。 这样就可以接收来自'cron'或'kern'的严重错误级别的日志。 为了在日志系统中实现这种功能,就要用到更加复杂的topic类型交换机。 Topic交换机 发送到topic类型交换机的消息的routing_key不能随便设置--它必须是多个单词组成,用点分割。单词可以是任意的,但它们通常指定连接到该消息的某些功能。例如:“stock.usd.nyse”,“nyse.vmw”,“quick.orange.rabbit”。路由关键字可包含任意多的单词,但最高限制是255字节。 绑定关键字必须有相同的格式。topic交换机和direct交换的逻辑是相似的--拥有特定的路由关键字的消息将被发送到所有匹配关键字的队列。然而,绑定关键字有两个特殊的情况: · * (星号) 可以代替一个完整的单词. · # (井号) 可以代替零个或多个单词. 举例说明: 在这个例子中,我们将发送描述动物的消息。这些消息的路由关键字由三个单词(两个点)组成。路由关键字的 第一个单词描述速度,第二个描述颜色,第三个描述物种:“..”。 建立三个绑定: Q1绑定关键字"*.orange.*" ,Q2绑定关键字"*.*.rabbit"和"lazy.#". 这些绑定可以概括为: · Q1对所有橙色的动物感兴趣。 · Q2希望接收所有关于兔子和一切有关懒惰的动物的消息。 路由关键字为“quick.orange.rabbit”的消息将被传递到两个队列。“lazy.orange.elephant”消息也被发送到两个队列。“quick.orange.fox”消息只会到第一队列,“lazy.brown.fox”消息只会到第二个。“lazy.pink.rabbit”消息只会被发送给第二队列一次,即使它匹配两个绑定。 “quick.brown.fox”消息不匹配任何绑定,所以它会被丢弃。 如果和约定不一致,发送带有一个或四个字关键字的消息,如“orange”或“quick.orange.male.rabbit”,会发生什么事?那么,这些消息将不会匹配任何绑定,将会被丢弃。 “lazy.orange.male.rabbit”消息即使有四个词,但匹配最后一个绑定,将被发送到第二个队列。 Topic类型交换机 Topic类型交换机很强大,可以表现为其它类型的交换机: 以‘#’作为绑定关键字的队列将会接收所有的消息,和fanout一样,忽略消息的绑定关键字。 当绑定关键字中没有‘*’和‘#’时,topic类型交换机就和direct类型交换机一样。 组合代码 日志系统中将使用topic类型交换机。假定日志消息的绑定关键字只有两个单词:<来源>.<级别> 代码和上个教程中的代码基本一致。 EmitLogTopic.java代码: public class EmitLogTopic { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String routingKey = getRouting(argv); String message = getMessage(argv); channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes()); System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'"); connection.close(); } //... } ReceiveLogsTopic.java代码: public class ReceiveLogsTopic { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String queueName = channel.queueDeclare().getQueue(); if (argv.length < 1){ System.err.println("Usage: ReceiveLogsTopic [binding_key]..."); System.exit(1); } for(String bindingKey : argv){ channel.queueBind(queueName, EXCHANGE_NAME, bindingKey); } System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); String routingKey = delivery.getEnvelope().getRoutingKey(); System.out.println(" [x] Received '" + routingKey + "':'" + message + "'"); } } } 运行下面的例子,类路径设置参照教程1--在windows环境下使用%CP%。 接收所有的日志: $ java -cp $CP ReceiveLogsTopic "#" 从设备’kern’接收所有的日志: $ java -cp $CP ReceiveLogsTopic "kern.*" 只接收‘critical’级别的日志: $ java -cp $CP ReceiveLogsTopic "*.critical" 绑定更多的关键字: $ java -cp $CP ReceiveLogsTopic "kern.*" "*.critical" 发送关键字为’kern.critical’的消息: $ java -cp $CP EmitLogTopic "kern.critical" "A critical kernel error" 请注意,代码不会对绑定关键字的单词的个数有限制,你可能使用包含两个以上的单词的路由关键字。 一些玩笑: · "*"将匹配一个空路由关键字的消息吗? · "#.*"将匹配使用 ".." 作为关键字的消息吗?它会匹配只是用一个单词的消息吗? · "a.*.#" 和"a.#"的有区别吗? 接下来,在教程6中将介绍进行远程过程调用,它会发送和接收一个往返的消息。 6 远程过程调用(RPC) 远程过程调用(RPC) (使用Java版客户端) 在第二个教程中,我们学习了如何使用工作队列分发多个耗时任务给多个工人。 但是,如何运行远程计算机上的方法,并获取结果呢?当然,这是一个不同的模式。这种模式通常被称为远程过程调用或者RPC。 在本节中,使用RabbitMQ构建一个RPC系统:一个客户端和一个可扩展的RPC服务器。我们将创建一个虚拟的返回斐波那契数列的RPC服务来模拟真实的耗时工作。 客户端接口 为了说明如何使用RPC服务,我们要创建一个简单的客户端类。它将提供call方法,该方法发送一个RPC请求然后会阻塞,直到接收到答案: FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient(); String result = fibonacciRpc.call("4"); System.out.println( "fib(4) is " + result); RPC注意事项 虽然RPC是一种很常见的计算模式,但经常受到批评。这是由于程序员没有意识到调用的方法是本地的还是一个耗时的RPC。这种困惑导致了一个不可预知的系统,并增加了不必要的复杂性调试。滥用RPC,不仅不会简化程序,还会产生不可维护的意大利面条似的代码。 为了记住这一点,请考虑一下的建议: · 确保明确地知道调用的方法是本地的还是远程的。 · 给你的系统建立文档。清晰地描述组件之间的依赖关系。 · 处理错误案例。比如当RPC服务器死掉很长一段时间后,客户端应该怎么做? 当有疑问时请避免使用RPC。如果可以的话,你应该使用异步管道-而不是RPC,例如阻塞时可以异步地放在下一个阶段再处理。 回调队列 在一般使用RabbitMQ做RPC很容易。客户端发送一个请求消息然后服务器回复一个响应消息。为了收到一个响应,我们需要发送一个'回调'的请求的队列地址。我们可以使用默认队列(在Java客户端除外)。让我们试试吧: callbackQueueName = channel.queueDeclare().getQueue(); BasicProperties props = new BasicProperties .Builder() .replyTo(callbackQueueName) .build(); channel.basicPublish("", "rpc_queue", props, message.getBytes()); // ... then code to read a response message from the callback_queue ... 消息属性 AMQP协议给消息定义了14个属性。大部分的属性很少使用,除了下面几个: · deliveryMode: 将消息标记为持久(值为2)或瞬态(任何其他值)。你可能记得在第二个教程中使用了这个属性。 · contentType:用来设置mime类型。例如经常使用的JSON格式数据,就需要将此属性设置为:application/json。 · replyTo: 通常用来命名一个回调队列. · correlationId: 用来关联RPC请求的响应. 我们需要导入新的依赖: import com.rabbitmq.client.AMQP.BasicProperties; Correlation Id 在上述方法中,给每一个RPC请求都创建一个回调队列。这是非常低效的,更好的方案是--每个客户端只创建一个回调队列。 这就提出了一个新的问题,在收到响应的队列中,不能明确的知道请求的响应属于谁。这时就该使用correlationId属性。在每一个请求中将它设置为一个唯一的值。然后,当我们在回调队列中收到一条消息时,就查看消息的这个属性是否和设置的一致,这样就能找到相匹配的响应。如果看到一个未知的correlationId的值,我们最好安全地丢弃这个消息--它不属于我们的请求。 你可能会问,为什么要忽略回调队列中的未知消息,而不是抛出错误而异常中断程序?这是因为服务器端可能重发送消息 。尽管可能性不大,但存在这种可能,在RPC服务器刚发送完答案但尚未接到确认消息时死掉。这种情况下,重新启动的RPC服务器将再处理这个请求,但客户端已经获取了答案并结束了RPC调用,correlationId就失效了。这就是为什么在客户端必须温和地处理重复的响应,RPC最好是幂等的。 总结 RPC工作流程: · 客户端启动时,创建了一个匿名的回调队列。 · 在一个RPC请求中,客户端发送一个消息,它有两个属性:1.REPLYTO,用来设置回调队列名;2.correlationId,对于每个请求都被设置成唯一的值。 · 请求被发送到rpc_queue队列. · RPC工作者(又名:服务器)等待接收该队列的请求。当收到一个请求,它就会处理并把结果发送给客户端,使用的队列是replyTo字段指定的。 · 客户端等待接收回调队列中的数据。当接到一个消息,它会检查它的correlationId属性。如果它和设置的相匹配,就会把响应返回给应用程序。 组合代码 斐波那契方法: private static int fib(int n) throws Exception { if (n == 0) return 0; if (n == 1) return 1; return fib(n-1) + fib(n-2); } 斐波纳契方法,假定只有有效的正整数输入。 (不要指望它处理很大的数字,否则它很可能成为是最慢的递归)。 RPC服务器的RPCServer.java的代码: private static final String RPC_QUEUE_NAME = "rpc_queue"; ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null); channel.basicQos(1); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(RPC_QUEUE_NAME, false, consumer); System.out.println(" [x] Awaiting RPC requests"); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); BasicProperties props = delivery.getProperties(); BasicProperties replyProps = new BasicProperties .Builder() .correlationId(props.getCorrelationId()) .build(); String message = new String(delivery.getBody()); int n = Integer.parseInt(message); System.out.println(" [.] fib(" + message + ")"); String response = "" + fib(n); channel.basicPublish( "", props.getReplyTo(), replyProps, response.getBytes()); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } 服务器代码相当简单: · 先建立连接、通道,并声明队列。 · 可以运行多个服务器进程。通过channel.basicQos设置prefetchCount属性可将负载平均分配到多台服务器上。 · 使用basicConsume访问队列。然后在while循环中接收消息,处理消息,并发送处理结果。 客户端RPCClient.java的代码: private Connection connection; private Channel channel; private String requestQueueName = "rpc_queue"; private String replyQueueName; private QueueingConsumer consumer; public RPCClient() throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); connection = factory.newConnection(); channel = connection.createChannel(); replyQueueName = channel.queueDeclare().getQueue(); consumer = new QueueingConsumer(channel); channel.basicConsume(replyQueueName, true, consumer); } public String call(String message) throws Exception { String response = null; String corrId = java.util.UUID.randomUUID().toString(); BasicProperties props = new BasicProperties .Builder() .correlationId(corrId) .replyTo(replyQueueName) .build(); channel.basicPublish("", requestQueueName, props, message.getBytes()); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); if (delivery.getProperties().getCorrelationId().equals(corrId)) { response = new String(delivery.getBody()); break; } } return response; } public void close() throws Exception { connection.close(); } 客户端代码稍微复杂些: · 先建立一个连接和一个通道,并为回调声明一个唯一的'回调'队列。 · 注册'回调'队列,这样就可以收到RPC响应。 · 通过call 方法发送实际的RPC请求。 · 在这里,先生成一个唯一的correlationId,并将其保存,在while循环使用这个值来捕捉相匹配的响应。 · 接下来,发送请求消息,消息使用了两个属性:replyto和correlationId。 · 等待接收结果。 · while循环只是做很简单的工作,对于每一个响应的消息,它会检查它的correlationId是否是我们所要找的那个。如果是,它就保存相应的结果。 · 最终把响应返回给用户。 客户端发送请求: RPCClient fibonacciRpc = new RPCClient(); System.out.println(" [x] Requesting fib(30)"); String response = fibonacciRpc.call("30"); System.out.println(" [.] Got '" + response + "'"); fibonacciRpc.close(); 编译: $ javac -cp rabbitmq-client.jar RPCClient.java RPCServer.java 启动服务器: $ java -cp $CP RPCServer [x] Awaiting RPC requests 运行客户端: $ java -cp $CP RPCClient [x] Requesting fib(30) 这里介绍的是该设计不是实现RPC服务的唯一可能,但它有一些重要的优点: · 如果RPC服务器速度太慢,你可以通过运行多个RPC服务器。尝试在一个新的控制台上运行第二RPCServer。 · RPC客户端只发送和接收一个消息。不需要queueDeclare那样要求同步调用。因此,RPC客户端只需要在一个网络上发送和接收为一个单一的RPC请求。 代码很简单,没有解决下面这些很重要的复杂问题: · 如何没有任何服务器运行,客户端该怎么做? · 客户端是否应该有一些RPC超时处理? · 如果服务器出现故障,并抛出了一个异常,它应该被转发到客户端吗? · RPC处理之前防止无效的消息传入(如检查边界,类型),然后再处理。

下载文档到电脑,查找使用更方便

文档的实际排版效果,会与网站的显示效果略有不同!!

需要 10 金币 [ 分享文档获得金币 ] 0 人已下载

下载文档