基于 RabbitMQ 构建一个类似 Resque 的作业处理系统

jopen 6年前

RabbitMQ的是一个复杂的野兽。 

它灵活,强大,但也很难完全把控和掌握。 

许多不同的使用情况和使用模式都可以建立在这个强大的软件之上,但在第一次尝试为一个特定的解决方案编写代码时,差错和设计错误也是司空见惯的事情。 

在本文中,我将讨论   Palermo 的设计和实现,它是一个实现了可以将RabbitMQ用作底层排队机制构建的那些可能的使用模式其中之一:批处理作业处理系统。 

通过批处理作业处理系统,我们引用了一种机制来由 作业程序从不同队列中提取作业的自动执行。客户可以排入新的作业到这些队列,它们最终将被传递给将执行作业的程序。如果在执行一个任务失败了,作业程序线 程将会把失败的作业放入一个特殊的队列,它可以重新执行,或者检查出来进行错误调试。

创建Palermo的灵感主要来自于Resque ,它是一个作业处理系统,由Github使用Ruby和Redis创建.

对于系统的使用者,Resque让这些变成可能:定义拥有不同名字的队列;作业排队时,在系统中通过某个输入参数匹配Ruby类名;在不同的机器中 启动工作者进程--它们将处理作业,实例化Ruby类,并使用提供的输入参数来执行作业.如果作业执行失败,作业将被路由到一个特殊的"失败"作业队列. 在这个队列中的作业会重新执行或被删除.Resque工作者和底层的操作系统很好的集成在一起,它可以像系统操作者控制作业执行的方式,来处理即将到来的 信号.它也提供了一个web接口,可以监控作业和工作者的状态,同时也可以执行某种动作,像作业的重新排队或队列中作业的清理.

从一个开发者的角度来讲,Resque的主要优势是整个系统的使用超级简单.定义和排队作业只需要几行Ruby代码即可,同时系统以一种持续的和鲁棒的方式运行.

Palermo的目标是针对JVM语言,创建一个简单易用的作业处理系统,像Resque一样的健壮,并使用RabbitMQ作为底层的队列技术而不是Redis.

从一个正式的角度来讲,一个作业处理系统的队列技术可以定义为一个元组空间(tuple space),一种连续关联的内存,一些进程排队作业使用如下的方式编写元组:

write(queue_name, job_type, input_argument)

工作者进程从内存删除元组,一次一个,使用谓词匹配队列名:

read(queue_name, ?, ?)

一个仅有的,必须以元组空间,加入到作业处理队列技术模型中的限制是,来自分布式内存中的读函数必须遵从先进先出(FIFO)的语 义.RabbitMQ队列可以看作是这种类型的,包含FIFO语义的内存.队列名作为第一个参数传递到"读"谓词,用来提取存储在内存中的下一个匹配的元 组.

基于 RabbitMQ 构建一个类似 Resque 的作业处理系统

为了获取想要读取的元组空间语义,我们需要处理RabbitMQ内部的一些设置,并配置如下的选项:

- 队列的持久性
– 消息的持久性
– 服务的质量
– 消息确认

由于要使用可持续的内存,我们需要为RabbitMQ所管理的队列和消息增加可持续性.为了达到这个效果,在RabbitMQ之中,队列需要声明 为"durable",消息需要声明为"persistent".通过这种方式,即便RabbitMQ broker崩溃了,对于已经创建的队列信息和未决的消息将在重启时恢复.

当超过一个工作者连接到同一个队列时,RabbitMQ会使用round robin的方式,在所有的可用工作者中分发消息.这里主要的问题在于,只要消息到达队列,RabbitMQ就将发送一个消息到下一个工作者,而不管这个 工作者是否正在处理一个不同的消息.如果一个作业需要很长的时间来处理,到来的消息将堆叠在这个繁忙工作者的本地缓存中,而其他的工作者将处于闲置状态. 我们可以使用RabbitMQ的两个特性来处理这种情况:消息确认和服务质量/预取数量.

首先,工作者采用显式确认的方式处理完消息后可以通知RabbitMQ.消息只有确认后才会从队列删除.如果工作者停止运行而没有确认消息,RabbitMQ会自动将消息重新排队.

同时,服务质量(qos)配置可以告知RabbitMQ,发送给特定工作者的最大未确认消息数.如果设置了这个值,即从预取数量到1,只有最后一条消息被工作者进程确认,其他消息才能从队列发送出去.
通过这种方式,可以实现在工作者之间合理的分配作业.

基于 RabbitMQ 构建一个类似 Resque 的作业处理系统

我们仍然需要找到一种方式实现写内存的操 作.RabbitMQ不同于其他队列排队系统的一个重要的方面,是强烈的订阅者和消费者分离的考量.在RabbitMQ体系结构中,这点是通过队列和订阅 者交换的介绍来实现的.订阅者不知道消息的最终目的地,他们只知道一个交换的名字和路由关键字,这个关键字可以看作是发送消息的地址.

针对不同的交换类型,RabbitMQ支持不同的语义,这将影响一个带有地址的消息匹配一个交换的方式.它将传递到实际的信箱(队列),消费者将从这里接收消息.

在我们的例子中,写方法的第一个参数是队列名称。 队列名称也是工作者预测下一个内存中待处理的任务时使用的参数。

Palermo中该方法下一个参数是一种直接交换类型。 在这种交换类型中,路由关键字必须和传输消息的队列名称匹配。

如果一条消息被发往 RabbitMQ 进行交换,并且没有队列连接到交换,那么这条消息会被丢弃或者被发往相关的备用交换。 为了避免这种情况, Palermo中 一个发布者每次把一个新的任务加入到队列中时,在发送任务的数据之前,发布者会声明匹配路由关键字的队列。 通过这种方式,我们可以确定当没有工作者在等待处理任务时消息也不会被丢弃。 在 RabbitMQ中,使用相同的参数重新声明一个已存在的队列是一个合法的操作,且没有任何影响。

使用之前为RabbitMQ所描述的设置,我们可以建立工作处理系统的核心。然而,某些特性,像失败事务的管理或者消息的序列化,它们不能被编址放到RabbitMQ的特性中。在Palermo中,这个特性已经被实现,并被作为一个附加的应用软件层,这个层可以被分配作为一个Java库。

首要的问题是如何处理失败的事务。我们来看它是怎么(工作的),过去我们使用显式的人工确认,在人工处理或超时时,RabbitMQ将会处理致命的 错误。这套机制也可以用于处理在事务处理逻辑中(产生的)任何类型的异常条件,除了我们期望的功能需求外,失败信息必定会发送给一个特别的失败事务队列, 他们可以被查阅,移除和重入队。这个功能已经在Palermo中被完成,被包装为一个通用的Java事务用来处理try/catch块,并通过管道将失败信息添加到队列中,并添加一些关于事务的错误信息,(例如)重试次数和在消息元数据的头中的原始队列一并发送消息给RabbitMQ。

序列化的问题,已经通过定义一个作业消息来陈述了,这个消息包含作业Java类的头信息,作业序列化参数和序列化类型.有关参数形象化的一小部分内容,通过一个人眼可识别的字符串,也随消息一起发送了. Palermo已经通过支持插件和去插件的方式,对系统进行不同的序列化,同时,包含一个基于JSON的序列化转换器.但是,默认的序列化技术是使用JBoss Serialization.只有作业的参数被序列化,并发送给工作者,作业类的字节码必须在工作者的class path中可用,以便正确执行作业.
Palermo工作者只是执行实际作业逻辑的一种通用方式,其中作业逻辑是封装在作业类的定义之中.

Palermo的整个逻辑已经在Clojure编程语言中实现,但是,多亏Clojure Java inter-op特性,它可以在java代码或其他任意基于JVM的语言中使用.由于Palermo工作者线程在JVM中运行,它们从底层的操作系统中被隔离出来.像Resque工作者那样,集成在OS之中,很难实现,但是某种程度的集成,在可能的情况下,已经尝试过,如,为关联到Palermo工作者的RabbitMQ消费者的身份标识,使用进程标识符.一个命令行的接口也已经实现了,所以,新的工作者,通过脚本和使用者,可以很简单的启动.

解决方案的最后一个组件是Palermo 系统中运行的一个web接口。 这只是一个使用了Palermo 类库自检特性的简单的web应用,它让人们更容易理解Palermo 系统的运行机制。 这个接口是Resque web接口的复制,在管理交换、队列、工作者方面,它可以作为RabbitMQ 通用接口的替代者。

通过Palermo 类库,web接口提供的所有功能可以在任意java代码中进行使用。

基于 RabbitMQ 构建一个类似 Resque 的作业处理系统

基于 RabbitMQ 构建一个类似 Resque 的作业处理系统

基于 RabbitMQ 构建一个类似 Resque 的作业处理系统

考虑到所有之前我们能想到的, Palermo 只是薄薄的一层,它建立在一个特定的 RabbitMQ设置的顶部,并被封装成一个可重用的作业处理使用模式的实现类库。 相同的方法可能适用于不同的使用模式,这些模式使用 RabbitMQ作为底层引擎,这样可以在写入非特定应用程序的代码、处理设置和由 RabbitMQ特定配置而引入其余的复杂性方面节省时间。