Notes on Using Celery

  • [**Celery Architecture**](#intro)
  • [**Using Celery**](#use)
  • [**Error Handling**](#exception)
  • [**Retry**](#retry)
  • [**Task Callbacks**](#task-callback)
  • [**Use Cases**](#use-range)
  • [**Dynamic Queues**](#dynamic-queue)
  • [**Priority Queues**](#priority-queue)
  • [**Distributed Usage**](#distribute)
  • [**Workflows**](#workflow)

Celery Architecture

        |-------------|       |-------------|
        |    Task     |       |    Celery   |
        |  Producer   |       |     Beat    |
        |-------------|       |-------------|
   task serialization ↘︎          ↙︎ task serialization
                |--------------------|
                |   Message Broker   |
                |     |----------|   |
                |     | Exchange |   |
                |     |----------|   |
                |          ⬇︎        |
                |  |---------------| |
                |  | Message Queue | |
                |  |---------------| |
                |--------------------|
 deserialization ↙︎         ⬇︎ ...     ↘︎ ...
    |----------|    |----------|    |----------|
    |  Celery  |    |  Celery  |    |  Celery  |
    |  Worker  |    |  Worker  |    |  Worker  |
    |----------|    |----------|    |----------|
 result serialization ↘︎    ⬇︎ ...   ↙︎ ...
                |--------------------|
                |       Result       |
                |       Backend      |
                |--------------------|

Task Producer: publishes tasks, e.g. a script run by hand, request-driven tasks from a web application, and so on.

Celery Beat: Celery's built-in scheduler for periodic tasks. It requires running a separate `celery beat` process; schedules support both crontab expressions and fixed intervals.
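As a sketch, a beat schedule supporting both styles might look like this in the config file (the schedule names and task paths are illustrative, not taken from the original project):

```python
from datetime import timedelta
from celery.schedules import crontab

# celeryconfig.py -- a minimal sketch; entry names and task paths are assumptions
CELERYBEAT_SCHEDULE = {
    # interval style: run every 30 seconds
    'add-every-30-seconds': {
        'task': 'proj.tasks.add',
        'schedule': timedelta(seconds=30),
        'args': (1, 2),
    },
    # crontab style: run at 7:30 every Monday
    'add-monday-morning': {
        'task': 'proj.tasks.add',
        'schedule': crontab(hour=7, minute=30, day_of_week=1),
        'args': (1, 2),
    },
}
```

With this in place, `celery beat` reads the schedule and pushes the tasks to the broker at the configured times.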

Message Broker: message middleware whose main jobs are routing and buffering. It accepts task messages, puts them into queues, and hands them to Workers in order. RabbitMQ and Redis are the officially recommended brokers.

Celery Worker: executes the tasks. The three Workers in the diagram can be thought of as Worker instances running on different machines.

Result Backend: stores the results of task execution.

Celery serialization: json, msgpack, yaml, pickle. Task messages must be serialized when they enter the broker and when results are stored; any of these four formats can be chosen. Reportedly, pickle is no longer supported as of version 3.2; see the docs for details.

Below is the same task result as stored in Redis under two of the serializers:

# json
175game:6379[3]> get celery-task-meta-b293c6b8-abe8-4b22-a809-c532487d47e2
"{\"status\": \"SUCCESS\", \"traceback\": null, \"result\": 11, \"children\": []}"

# msgpack
175game:6379[3]> get celery-task-meta-71d1ae72-bb86-4c90-aab6-dd014b53b569
"\x84\xc4\x06status\xc4\aSUCCESS\xc4\ttraceback\xc0\xc4\x06result\x15\xc4\bchildren\x90"
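The JSON payload above is nothing more than the task-meta dict run through a JSON serializer; a minimal sketch with the standard library alone:

```python
import json

# The task-meta dict Celery stores for a finished task
# (shape copied from the Redis value shown above).
task_meta = {
    "status": "SUCCESS",
    "traceback": None,
    "result": 11,
    "children": [],
}

encoded = json.dumps(task_meta)   # what lands in Redis with the json serializer
decoded = json.loads(encoded)     # what a client reads back

print(decoded["status"], decoded["result"])  # → SUCCESS 11
```

The msgpack variant is the same dict in a compact binary framing, which is why the raw bytes are unreadable but shorter.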

Exchange: part of the Broker. It accepts task messages and routes them to the different Message Queues according to the routing algorithm. There are four exchange types: direct (the default), topic, fanout, and headers; see the docs for details.

Message Queue: the message queues themselves. You can define several different queues, each responsible for delivering tasks to Workers.
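A sketch of declaring queues bound to different exchange types, using the kombu API that Celery builds on (all queue, exchange, and routing-key names here are illustrative assumptions):

```python
from kombu import Exchange, Queue

# celeryconfig.py -- illustrative queue/exchange names
CELERY_QUEUES = (
    # direct (default): the routing_key must match exactly
    Queue('default', Exchange('default', type='direct'), routing_key='default'),
    # topic: the routing_key may use wildcards, e.g. 'video.#'
    Queue('videos', Exchange('media', type='topic'), routing_key='video.#'),
    # fanout: every bound queue receives a copy of each message
    Queue('broadcast', Exchange('events', type='fanout')),
)
```

Tasks sent with a matching routing key end up in the corresponding queue, and workers subscribe to whichever queues they should consume.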


Using Celery

A typical directory layout:

proj/tasks.py  # task definitions, plus the producer
     celeryconfig.py  # config file; see the official docs for the ways to load it
     consumer.py  # the consumer, i.e. where the worker is defined

The official docs walk through an `add` task example, so it is not repeated here.

Starting the worker is easy: `celery -A proj.consumer worker -l info`. You can also run it with `python -m proj.consumer ...`.


Error Handling

The official error-handling docs

Nothing here struck me as special, except one thing: when defining a task you can pass a `throws` argument (example below). Exceptions listed in `throws` that occur during execution are treated as expected and raised directly, without the full error traceback being dumped into the log file.

from celery.exceptions import TimeLimitExceeded

@app.task(throws=(TimeLimitExceeded,))
def add(x, y):
    return x + y

Log output without and with the `throws` argument:

[2016-10-17 10:59:04,112: INFO/MainProcess] Received task: proj.tasks.add[3edd4386-1b69-42aa-9ac7-47218667616e]
[2016-10-17 10:59:14,124: ERROR/MainProcess] Task proj.tasks.add[3edd4386-1b69-42aa-9ac7-47218667616e] raised unexpected: TimeLimitExceeded(10,)
Traceback (most recent call last):
  File "/Users/lijunjie/test/env_test/lib/python2.7/site-packages/billiard/pool.py", line 645, in on_hard_timeout
    raise TimeLimitExceeded(job._timeout)
TimeLimitExceeded: TimeLimitExceeded(10,)
[2016-10-17 10:59:14,125: ERROR/MainProcess] Hard time limit (10s) exceeded for proj.tasks.add[3edd4386-1b69-42aa-9ac7-47218667616e]
[2016-10-17 10:59:14,250: ERROR/MainProcess] Process 'Worker-3' pid:35925 exited with 'signal 9 (SIGKILL)'
# with the throws argument
[2016-10-17 11:06:22,019: INFO/MainProcess] Task proj.tasks.add[e66bda73-2bac-4e47-9b64-b4b3637767b0] raised expected: TimeLimitExceeded(10,)
[2016-10-17 11:06:22,019: ERROR/MainProcess] Hard time limit (10s) exceeded for proj.tasks.add[e66bda73-2bac-4e47-9b64-b4b3637767b0]
[2016-10-17 11:06:22,142: ERROR/MainProcess] Process 'Worker-3' pid:35965 exited with 'signal 9 (SIGKILL)'

Retry

@app.task(bind=True)
def div(self, x, y):
    try:
        result = x / y
    except ZeroDivisionError as e:
        raise self.retry(args=(5, 2), exc=e, countdown=5, max_retries=3)
    return result

One thing worth noting is `bind=True`: when it is True, the task function must take a `self` parameter, which is bound to the current task instance and gives access to Celery's task methods. See the docs.

Why `raise` the result of `self.retry`? Because retry's `throw` parameter defaults to True, meaning the call always raises a `celery.exceptions.Retry` exception; this tells the worker the task has been handed off for retry, and the task's state becomes RETRY.

If `throw` is set to False, the task's state is instead set directly to FAILURE (or SUCCESS) without passing through RETRY.

For the full list of retry's parameters and what they do, go straight to the docs.
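As a plain-Python analogy of what `max_retries` and the `args=(5, 2)` swap in the `div` example imply (this is not Celery's real retry machinery, and the `countdown` delay is left out):

```python
# Analogy only: re-run the task body, possibly with new arguments,
# up to max_retries extra times, then give up.
class MaxRetriesExceeded(Exception):
    pass

def run_with_retries(fn, args, max_retries=3):
    for attempt in range(1 + max_retries):  # the first run plus max_retries retries
        try:
            return fn(*args)
        except ZeroDivisionError:
            args = (5, 2)                   # like retry(args=(5, 2)): retry with new args
    raise MaxRetriesExceeded()

print(run_with_retries(lambda x, y: x / y, (1, 0)))  # → 2.5
```

The first call divides by zero; the retry runs with the replacement arguments and succeeds, which is exactly the happy path of the `div` task above.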


Task Callbacks

Suppose that after a task finishes (success or failure) I want to hit some URL to tell another service the task is done. One way is to define an abstract base task:

from celery import Task

class DebugTask(Task):
    abstract = True  # must be True: this class is only used as a base task

    def on_success(self, retval, task_id, args, kwargs):
        # e.g. send an HTTP request here to tell someone the task
        # succeeded and the result is ready to fetch...
        print(retval, task_id, args, kwargs)

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        pass

    def on_retry(self, exc, task_id, args, kwargs, einfo):
        pass

    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        pass

    # ...

See the docs for details.


Use Cases

> Common scenarios for Celery:
>
>   1. Web applications. When a user-triggered operation takes a long time to finish, hand it to Celery to run asynchronously and return the result to the user when it is done. The user does not have to wait, which improves the site's overall throughput and response time.
>   2. Scheduled tasks. Production environments often run periodic jobs. With thousands of servers and thousands of task types, managing schedules is hard; Celery makes it quick to set up different tasks on different machines.
>   3. Any side work done synchronously can instead be done asynchronously: sending SMS/email, pushing notifications, clearing or warming caches, and so on.
>
> Author: Dongwei Ming (董伟明)

Dynamic Queues

By calling `apply_async` with the `queue` and `routing_key` arguments set, a task can be dispatched to the matching queue. In my tests, during the hand-off from queue to worker, each worker process holds roughly 5 tasks at a time, 4 of them on standby (prefetched). For example, if I loop 20 tasks into queue A and then push one into queue B, the queue-B task only runs after roughly 10 of the queue-A tasks have finished (in the ideal case). Execution order also depends on priority: a task with priority level 9 is executed last even if it sits in a different queue.
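Besides passing `queue`/`routing_key` on each call, routing can be configured statically. A sketch (the task names match the logs below, but the queue names are my own assumption):

```python
# celeryconfig.py -- static routing (queue names are illustrative assumptions)
CELERY_ROUTES = {
    'projq.tasks.add':   {'queue': 'queue_a', 'routing_key': 'queue_a'},
    'projq.tasks.minus': {'queue': 'queue_b', 'routing_key': 'queue_b'},
}

# The per-call equivalent of the same routing decision would be:
#   add.apply_async(args=(1, 10), queue='queue_a', routing_key='queue_a')

print(CELERY_ROUTES['projq.tasks.minus']['queue'])
```

Static routes keep call sites clean; the per-call form wins when the target queue is only known at runtime.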

The logs (two worker processes here; the task whose result is 99 is the one from queue B):

[2016-10-19 15:39:18,435: INFO/MainProcess] Received task: projq.tasks.add[e2473fd1-1bef-4b00-b484-bf4b69bca994]
[2016-10-19 15:39:18,446: INFO/MainProcess] Received task: projq.tasks.add[ee507b13-0d01-462b-89cd-f778521fc0ce]
[2016-10-19 15:39:18,458: INFO/MainProcess] Received task: projq.tasks.add[c7fe5a6d-4b38-4bcb-bea8-922b4aaf6782]
[2016-10-19 15:39:18,464: INFO/MainProcess] Received task: projq.tasks.add[60bb7880-5e1b-40c6-bad4-e1bc0bc3a2f8]
[2016-10-19 15:39:18,471: INFO/MainProcess] Received task: projq.tasks.add[a53c2a2d-6722-4b75-8985-ba7ff153abae]
[2016-10-19 15:39:18,478: INFO/MainProcess] Received task: projq.tasks.add[7571d366-be1e-4be7-a925-8179fa98a3b5]
[2016-10-19 15:39:18,484: INFO/MainProcess] Received task: projq.tasks.add[af7c2ab3-8945-478e-bcb2-b85e658124d5]
[2016-10-19 15:39:18,493: INFO/MainProcess] Received task: projq.tasks.add[bfd0daca-b436-46a3-8c39-cf82a15f3a18]
[2016-10-19 15:39:18,504: INFO/MainProcess] Received task: projq.tasks.add[6abfaf5d-082b-46e3-bfda-cfb071aee473]
[2016-10-19 15:39:18,513: INFO/MainProcess] Received task: projq.tasks.add[bf4020e2-f047-495a-9180-d431d3c1bfcb]

[2016-10-19 15:39:21,478: WARNING/Worker-2] 10
[2016-10-19 15:39:21,479: WARNING/Worker-1] 11
[2016-10-19 15:39:21,481: INFO/MainProcess] Task projq.tasks.add[e2473fd1-1bef-4b00-b484-bf4b69bca994] succeeded in 3.04420606297s: 10
[2016-10-19 15:39:21,482: INFO/MainProcess] Task projq.tasks.add[ee507b13-0d01-462b-89cd-f778521fc0ce] succeeded in 3.033827885s: 11
[2016-10-19 15:39:21,504: INFO/MainProcess] Received task: projq.tasks.minus[8aa03cf1-6e96-4faa-b052-da5909e13a6c]
[2016-10-19 15:39:21,555: INFO/MainProcess] Received task: projq.tasks.add[9fb03ba3-a3d9-443e-b85e-3ca25a026d8a]
[2016-10-19 15:39:24,488: WARNING/Worker-2] 12
[2016-10-19 15:39:24,488: WARNING/Worker-1] 13
[2016-10-19 15:39:24,489: INFO/MainProcess] Task projq.tasks.add[c7fe5a6d-4b38-4bcb-bea8-922b4aaf6782] succeeded in 3.006375317s: 12
[2016-10-19 15:39:24,490: INFO/MainProcess] Task projq.tasks.add[60bb7880-5e1b-40c6-bad4-e1bc0bc3a2f8] succeeded in 3.00720034697s: 13
[2016-10-19 15:39:24,504: INFO/MainProcess] Received task: projq.tasks.add[ab5c6552-fb30-46b9-82cf-71b60c0337ea]
[2016-10-19 15:39:24,512: INFO/MainProcess] Received task: projq.tasks.add[d7ca7dba-d4b0-42c6-95c1-2e7aef37b817]
[2016-10-19 15:39:27,498: WARNING/Worker-2] 14
[2016-10-19 15:39:27,498: WARNING/Worker-1] 15
[2016-10-19 15:39:27,500: INFO/MainProcess] Task projq.tasks.add[a53c2a2d-6722-4b75-8985-ba7ff153abae] succeeded in 3.008687375s: 14
[2016-10-19 15:39:27,500: INFO/MainProcess] Task projq.tasks.add[7571d366-be1e-4be7-a925-8179fa98a3b5] succeeded in 3.00934139197s: 15
[2016-10-19 15:39:27,518: INFO/MainProcess] Received task: projq.tasks.add[66cf59a8-bd80-4772-afc2-92508a5e5de7]
[2016-10-19 15:39:27,532: INFO/MainProcess] Received task: projq.tasks.add[7b0bce58-b6da-454b-8b0a-c5b6f12d6229]
[2016-10-19 15:39:30,516: WARNING/Worker-1] 17
[2016-10-19 15:39:30,516: WARNING/Worker-2] 16
[2016-10-19 15:39:30,518: INFO/MainProcess] Task projq.tasks.add[af7c2ab3-8945-478e-bcb2-b85e658124d5] succeeded in 3.01642756799s: 16
[2016-10-19 15:39:30,518: INFO/MainProcess] Task projq.tasks.add[bfd0daca-b436-46a3-8c39-cf82a15f3a18] succeeded in 3.01718715997s: 17
[2016-10-19 15:39:30,536: INFO/MainProcess] Received task: projq.tasks.add[f6e248c6-7c3b-4b65-9076-68d3fecba3d1]
[2016-10-19 15:39:30,543: INFO/MainProcess] Received task: projq.tasks.add[50abc41e-e1bc-4fb8-a88d-fe7cf9cd64c8]
[2016-10-19 15:39:33,534: WARNING/Worker-2] 18
[2016-10-19 15:39:33,534: WARNING/Worker-1] 19
[2016-10-19 15:39:33,535: INFO/MainProcess] Task projq.tasks.add[6abfaf5d-082b-46e3-bfda-cfb071aee473] succeeded in 3.01599604799s: 18
[2016-10-19 15:39:33,536: INFO/MainProcess] Task projq.tasks.add[bf4020e2-f047-495a-9180-d431d3c1bfcb] succeeded in 3.01692276s: 19
[2016-10-19 15:39:33,567: INFO/MainProcess] Received task: projq.tasks.add[a85c7c56-a466-46fa-ad05-c12f5f672f16]
[2016-10-19 15:39:33,592: INFO/MainProcess] Received task: projq.tasks.add[677daa6f-6484-4633-925f-b5c3dab0bb7d]
[2016-10-19 15:39:36,540: WARNING/Worker-2] 99
[2016-10-19 15:39:36,541: INFO/MainProcess] Task projq.tasks.minus[8aa03cf1-6e96-4faa-b052-da5909e13a6c] succeeded in 3.00419641705s: 99
[2016-10-19 15:39:36,543: WARNING/Worker-1] 20
[2016-10-19 15:39:36,546: INFO/MainProcess] Task projq.tasks.add[9fb03ba3-a3d9-443e-b85e-3ca25a026d8a] succeeded in 3.00860996899s: 20
[2016-10-19 15:39:36,558: INFO/MainProcess] Received task: projq.tasks.add[6f428a31-65fa-42d8-91dc-5886c3c67a41]
[2016-10-19 15:39:39,558: WARNING/Worker-2] 21
[2016-10-19 15:39:39,559: INFO/MainProcess] Task projq.tasks.add[ab5c6552-fb30-46b9-82cf-71b60c0337ea] succeeded in 3.01721997s: 21
[2016-10-19 15:39:39,600: WARNING/Worker-1] 22
[2016-10-19 15:39:39,601: INFO/MainProcess] Task projq.tasks.add[d7ca7dba-d4b0-42c6-95c1-2e7aef37b817] succeeded in 3.05430756399s: 22
[2016-10-19 15:39:42,564: WARNING/Worker-2] 23
[2016-10-19 15:39:42,565: INFO/MainProcess] Task projq.tasks.add[66cf59a8-bd80-4772-afc2-92508a5e5de7] succeeded in 3.00488142198s: 23
[2016-10-19 15:39:42,630: WARNING/Worker-1] 24
[2016-10-19 15:39:42,634: INFO/MainProcess] Task projq.tasks.add[7b0bce58-b6da-454b-8b0a-c5b6f12d6229] succeeded in 3.03226235998s: 24
[2016-10-19 15:39:45,569: WARNING/Worker-2] 25
[2016-10-19 15:39:45,571: INFO/MainProcess] Task projq.tasks.add[f6e248c6-7c3b-4b65-9076-68d3fecba3d1] succeeded in 3.00426965201s: 25
[2016-10-19 15:39:45,638: WARNING/Worker-1] 26
[2016-10-19 15:39:45,639: INFO/MainProcess] Task projq.tasks.add[50abc41e-e1bc-4fb8-a88d-fe7cf9cd64c8] succeeded in 3.00417465699s: 26
[2016-10-19 15:39:48,581: WARNING/Worker-2] 27
[2016-10-19 15:39:48,583: INFO/MainProcess] Task projq.tasks.add[a85c7c56-a466-46fa-ad05-c12f5f672f16] succeeded in 3.01057312201s: 27
[2016-10-19 15:39:48,643: WARNING/Worker-1] 28
[2016-10-19 15:39:48,644: INFO/MainProcess] Task projq.tasks.add[677daa6f-6484-4633-925f-b5c3dab0bb7d] succeeded in 3.004215625s: 28
[2016-10-19 15:39:51,586: WARNING/Worker-2] 29
[2016-10-19 15:39:51,587: INFO/MainProcess] Task projq.tasks.add[6f428a31-65fa-42d8-91dc-5886c3c67a41] succeeded in 3.00338450499s: 29

Priority Queues

Looking at the docs, there does not seem to be a priority-queue concept as such, but you can assign a priority through the `priority` argument of `Task.apply_async` (values range from 0 to 9, with 0 the highest priority). The actual order also depends on the order in which tasks are submitted and on how far apart the priority values are. For example, with five tasks of priorities 2, 4, 8, 8, 2, it is possible that after the first finishes, the second runs, and then the fifth. So pay attention to the spacing between the priority values you assign.

Here is my test example:

# submit each task with its priority
def execute_add():
    priority = [9, 6, 0, 4, 2, 6, 4, 8, 4, 0]
    for i in range(10):
        print("%s + 10 = %s, and priority is %s" % (i, i + 10, priority[i]))
        add.apply_async(args=(i, 10), priority=priority[i])

The execution result:

[2016-10-18 16:10:29,628: INFO/MainProcess] Received task: proj.tasks.add[384d7442-51f8-42c0-98d7-247af384316a]
[2016-10-18 16:10:29,649: INFO/MainProcess] Received task: proj.tasks.add[009907dc-c110-43be-87a1-09c63a308c03]
[2016-10-18 16:10:29,676: INFO/MainProcess] Received task: proj.tasks.add[5d2d19c9-da39-4510-9971-6fcfeba4bef4]
[2016-10-18 16:10:29,683: INFO/MainProcess] Received task: proj.tasks.add[c71e0509-6e9b-48ad-8922-37bcb00933d5]
[2016-10-18 16:10:29,689: INFO/MainProcess] Received task: proj.tasks.add[d2d342e3-b080-4ec8-8cb9-2bd47649a12d]
[2016-10-18 16:10:39,659: WARNING/Worker-1] 12
[2016-10-18 16:10:39,661: INFO/MainProcess] Task proj.tasks.add[384d7442-51f8-42c0-98d7-247af384316a] succeeded in 10.031274941s: 12
[2016-10-18 16:10:39,681: INFO/MainProcess] Received task: proj.tasks.add[b1df0276-b9f9-4f59-a1fc-804fd54c8f08]
[2016-10-18 16:10:49,777: WARNING/Worker-1] 14
[2016-10-18 16:10:49,779: INFO/MainProcess] Task proj.tasks.add[009907dc-c110-43be-87a1-09c63a308c03] succeeded in 10.116225825s: 14
[2016-10-18 16:10:49,821: INFO/MainProcess] Received task: proj.tasks.add[a8d984ed-f84c-48be-9181-b92c5e110323]
[2016-10-18 16:10:59,784: WARNING/Worker-1] 19
[2016-10-18 16:10:59,785: INFO/MainProcess] Task proj.tasks.add[5d2d19c9-da39-4510-9971-6fcfeba4bef4] succeeded in 10.005275797s: 19
[2016-10-18 16:10:59,806: INFO/MainProcess] Received task: proj.tasks.add[c5adcf06-3038-4338-a4c7-1f5f5fab21a6]
[2016-10-18 16:11:09,797: WARNING/Worker-1] 13
[2016-10-18 16:11:09,798: INFO/MainProcess] Task proj.tasks.add[c71e0509-6e9b-48ad-8922-37bcb00933d5] succeeded in 10.011900687s: 13
[2016-10-18 16:11:09,812: INFO/MainProcess] Received task: proj.tasks.add[538c54d6-e110-4a24-85e8-4e061b4b1c08]
[2016-10-18 16:11:19,808: WARNING/Worker-1] 16
[2016-10-18 16:11:19,809: INFO/MainProcess] Task proj.tasks.add[d2d342e3-b080-4ec8-8cb9-2bd47649a12d] succeeded in 10.010052042s: 16
[2016-10-18 16:11:19,819: INFO/MainProcess] Received task: proj.tasks.add[b41299fc-c56b-4c25-a94c-d11dadcd0417]
[2016-10-18 16:11:32,999: WARNING/Worker-1] 18
[2016-10-18 16:11:33,000: INFO/MainProcess] Task proj.tasks.add[b1df0276-b9f9-4f59-a1fc-804fd54c8f08] succeeded in 13.189561757s: 18
[2016-10-18 16:11:43,006: WARNING/Worker-1] 11
[2016-10-18 16:11:43,007: INFO/MainProcess] Task proj.tasks.add[a8d984ed-f84c-48be-9181-b92c5e110323] succeeded in 10.005693113s: 11
[2016-10-18 16:11:53,013: WARNING/Worker-1] 15
[2016-10-18 16:11:53,015: INFO/MainProcess] Task proj.tasks.add[c5adcf06-3038-4338-a4c7-1f5f5fab21a6] succeeded in 10.006416848s: 15
[2016-10-18 16:12:03,019: WARNING/Worker-1] 17
[2016-10-18 16:12:03,020: INFO/MainProcess] Task proj.tasks.add[538c54d6-e110-4a24-85e8-4e061b4b1c08] succeeded in 10.004479535s: 17
[2016-10-18 16:12:13,027: WARNING/Worker-1] 10
[2016-10-18 16:12:13,029: INFO/MainProcess] Task proj.tasks.add[b41299fc-c56b-4c25-a94c-d11dadcd0417] succeeded in 10.007206134s: 10

Distributed Usage

Just run celery workers on the different machines. There is no need to worry about load balancing here, because the workers are simply processes that execute tasks; the actual dispatching is done by the broker...

For more, see other people's write-ups.


Summary:

To make a given task execute sooner, consider these 3 factors:

  1. Give it a low priority number (which means high priority: the range is 0-9, with 0 the highest)
  2. Put it on a separate queue
  3. Mind the number of standby tasks already prefetched and waiting to be handed to workers

Of course, you can also spin up extra workers that consume only specific queues.


Workflows

There is a concept called a signature, which here can be understood as a subtask. What is a subtask? Simply put: when you run a series of tasks, each later task is a subtask of the one before it, and running the whole series is the workflow. The key property of a subtask is that the parent task's result can be passed directly as the subtask's arguments.

Signature: Describes the arguments and execution options for a single task invocation.

There are several workflow primitives:

  1. group: submits tasks in parallel; the results are independent of each other and come back as a list
  2. chain: tasks run one after another; each is invoked as the callback of the previous one, with the previous result passed as an argument to the next
  3. chord: a group plus a callback; the group's list of results is passed as the argument to one final subtask, which produces the final result
  4. map: takes a list and runs the task once per element; the result is a list where each element is the result for the corresponding input
  5. starmap: like map, but each list element is unpacked as *args, e.g. [(1, ), (2, )]
  6. chunks: a beefed-up group, so to speak; it splits many tasks into larger units. For example, with 1000 tasks and a chunk size of 10, executing 100 chunks finishes the whole job.
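To make the result-passing described above concrete, here is a plain-Python analogy of `chain`, `group`, and `chord` (not Celery's real API, no broker involved; just the data flow between parent task and subtask):

```python
# Plain-Python analogy of Celery workflow semantics (NOT the real canvas API).
def add(x, y):
    return x + y

def mul(x, y):
    return x * y

def chain(steps, *args):
    """Run steps in order; each result becomes the first argument of the next step."""
    fn, extra = steps[0]
    result = fn(*args, *extra)
    for fn, extra in steps[1:]:
        result = fn(result, *extra)
    return result

def group(fn, arglists):
    """Run fn over several argument tuples 'in parallel'; collect results in a list."""
    return [fn(*args) for args in arglists]

def chord(fn, arglists, callback):
    """A group whose list of results is fed to one final callback."""
    return callback(group(fn, arglists))

print(chain([(add, ()), (mul, (10,))], 2, 3))  # → 50, i.e. (2 + 3) * 10
print(group(add, [(1, 1), (2, 2)]))            # → [2, 4]
print(chord(add, [(1, 1), (2, 2)], sum))       # → 6, i.e. sum([2, 4])
```

In real Celery, the same shapes are built from signatures, e.g. `chain(add.s(2, 3), mul.s(10))`, and the broker carries the intermediate results instead of a local variable.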

The docs explain the concrete examples clearly, so they are not reproduced here.

The docs are here.

