Celery 实现分布式任务队列

g2md 9年前

Celery 简介

Celery 是 Distributed Task Queue,分布式任务队列,分布式决定了可以有多个 worker 的存在,队列表示其是异步操作,即存在一个产生任务提出需求的工头,和一群等着被分配工作的码农。

在 Python 中定义 Celery 的时候,我们要引入 Broker,中文中有中间人的意思,在这里 Broker 起到一个中间人的角色。在工头提出任务的时候,把所有的任务放到 Broker 里面,在 Broker 的另外一头,一群码农等着取出一个个任务准备着手做。

这种模式注定了整个系统会是个开环系统,工头对于码农们把任务做的怎样是不知情的。所以我们要引入 Backend 来保存每次任务的结果。这个 Backend 有点像我们的 Broker,也是存储任务的信息用的,只不过这里存的是那些任务的返回结果。我们可以选择只让错误执行的任务返回结果到 Backend,这样我们取回结果,便可以知道有多少任务执行失败了。

Show me the code

# 以下为 dispatcher.py  from worker import divide    # 1  divide.delay(1,2)  # 2  divide.apply_async((1, 2))      # 以下为 worker.py  from celery import Celery  app = Celery('tasks', backend='amqp://guest@localhost//', broker='redis://')    @app.task  def divide(x, y):      print x / y

worker.py 中新建了一个 Celery 实例,以 amqp 作为 broker,以 redis 作为 backend 储存所有 task 执行的历史记录。我们在此例中使用 RabbitMQ 作为我们的消息队列服务器。

我们一方面通过命令行中执行以下语句来启动 celery 服务。

celery -A worker worker --loglevel=info 

另外一方面,我们运行 dispatch.py,代码中将 worker 中的 divide 函数导入,再接着以两种方式将 task 启动。第一种方法中的 delay 方法接收了两个参数,实际为第二种方法的便捷调用,第二种方法在使用时,要将我们要传给 divide 的参数作为 tuple 放在第一个参数位置。

apply_async 的其他参数

apply_async 还支持其他参数,比如设置回调。

设置 task 实例的回调可以采用 link:

divide.apply_async((16, 2), link=divide.s(8))

首先计算 16 / 2,然后把结果 8 / 8,最后执行的结果等于 1. 所以这里的 link 是指向一个后继的调用函数,即完成当前 divide 以后再进行下一个 divide 操作。除了 link 之外,还有 link_error,只会在该任务执行失败时调用。在本例中,我们可以在 divide 执行失败时,执行 link_error 所指的函数,这个函数就是错误消息的处理句柄,它会接收到一个 task 的 UUID,我们可以通过 UUID 来访问出错的任务的异常状态。

# dispatcher  divide.apply_async((1, 0), link_error=error_handler.s())    # 这里我们把 1 和 0 放到了 divide 函数中执行,引发了除零异常,继而执行 link_error 对应的 error_handler,error_handler 接收到 uuid 参数,通过 AsyncResult 生产一个结果实例,我们可以用 result.state 打印出该任务的执行情况,用 result.info 来获取异常的具体信息。    # worker  @app.task  def divide(x, y):      print x/y    @app.task  def error_handler(uuid):      result = AsyncResult(uuid)      print 'task error {0}'.format(uuid)  # [2015-09-01 13:43:26,569: WARNING/Worker-2] task error 8e516377-a6c0-4a40-934f-dd1b0692c5fa      print result.state  # [2015-09-01 13:43:26,572: WARNING/Worker-2] FAILURE      print result.info  # [2015-09-01 13:43:26,572: WARNING/Worker-2] integer division or modulo by zero

跟踪异常的成因

异常的成因我们可以如上述代码所示将 result.info 打印出来而得知。然而我们并不能满足于此,仅仅知晓出错的 task 的 UUID 和其状态是不够的,我们想要知道发生错误时,task 的传入参数是什么。我一开始没有尝试出通过 UUID 来获取到原来的 1 和 0 这两个参数,后来我追踪了 apply_async 这个函数,位于 task.py 中,再跟踪到 trace.py 中的 build_tracer 函数,果然在 link_error 的调用时只传递了 UUID 一个参数,代码如下:

   def on_error(request, exc, uuid, state=FAILURE, call_errbacks=True):          if propagate:              raise          I = Info(state, exc)          R = I.handle_error_state(task, eager=eager)          if call_errbacks:              group(                  [signature(errback, app=app)                   for errback in request.errbacks or []], app=app,              ).apply_async((uuid, ))              # ).apply_async((uuid, request.args))              # 可以改成上一行注释中的代码,这样就可以在 error_handler 中得到原来调用的任务的输入参数了。          return I, R, I.state, I.retval

此处通过修改 celery 源代码来获取出错时 task 的传入参数,但是方法并不好。于是我想能不能通过 UUID 直接获取到原来的 task,然后查看 task 的 args,但是这篇文档有些晦涩难懂,我就先放弃了,便发现了以下方法。
class DebugTask(Task):      abstract = True      def on_failure(self, *args, **kwargs):          print self.request.args    @app.task(base=DebugTask)  def divide(x, y):      print x / y

这段代码将原来应该继承的 Task 类中的 on_failure 函数重写,当 divide 函数发生异常时,该 task 的 state 自动变成 failure,Task 会自动调用 on_failure 函数,从而打印出传入的 args。

任务的远程调用

关于 task 的调用,celery 还提供了另外一种 send_task 方法。

Celery 作为分布式系统,自然就支持远程 worker,这个时候我们可以利用 send_task 这个函数,以函数名的方式调用 task。代码如下:

from celery import Celery    app = Celery()  app.config_from_object('celeryconfig')  app.send_task('worker.divide', args=[1, 0])    # send_task 也支持 link_error,这个官方文档上没写详细,这里需要调用 signature 函数来生产函数的 signature,这时 divide 的 UUID 和我们通过修改源代码得到的 args。    app.send_task('worker.divide', args=[1, 0], link_error=app.signature('worker.error_handler'))

这里我们没有通过 module 的方式把 divide 函数给 import 到程序中来,也就意味着我们可以不将 worker 放在与 dispatcher 同一目录下。我们的想法是,将 worker 放在另外一台服务器上,通过 celery 调用它,本地 django 项目调用这个 dispatcher 后,将 task 发送到远程服务器的队列中,然后由远程服务器中的 worker 处理。

配置文件

此时需要注意的是,这里的 dispatcher 是通过文件的方式配置的,其配置文件应与 worker 端配置文件吻合,如下:

# celeryconfig.py  # coding=utf-8    # Broker 设置 RabbitMQ  BROKER_URL = 'amqp://guest:guest@localhost:5672//'  CELERY_RESULT_BACKEND = 'redis://'    # Tasks 位于 worker.py 中  CELERY_IMPORTS = ('worker', )    # 默认为1次/秒的任务  CELERY_ANNOTATIONS = {'worker.divide': {'rate_limit': '1/s'}}    CELERY_ROUTES = {'worker.divide': {'queue': 'divide'},                   'worker.error_handler': {'queue': 'error'}}    # 默认所有格式为 json  CELERY_TASK_SERIALIZER = 'json'  CELERY_RESULT_SERIALIZER = 'json'  CELERY_ACCEPT_CONTENT=['json']

使用了配置文件以后,我们在 worker 中也可以采用相同的方式定义 app,如下:
# coding=utf-8  from celery import Celery  app = Celery()  app.config_from_object('celeryconfig')    @app.task  def divide(x, y):      print x / y

我们在配置文件中为 worker.divide 这个 task 指定了 divide 这个队列,为 error_handler 定义了 error 这个队列用于错误处理。在启动 celery 的时候可以通过 -Q 参数指定队列。在终端中执行了以下命令后,celery 服务器就启动了,当前 celery 会监视 divide 队列,取出参数执行任务。而如果我们不启动另外一个 celery 来监视 error 队列,error_handler 就不会前往队列去拿参数执行。

celery -A worker worker --loglevel=info -Q divide

关于 Celery,网上英文教程都不多,更别说中文的了。

网上有些关于 Celery 性能的讨论,我暂且没有做分析,如果有更好的解决方案能够替代它,请留言告知。

如果发现本文有错误,请指正。



来自:http://my.oschina.net/shinedev/blog/500120