Celery 入门

DONG Yuxuan https://www.dyx.name

07 Nov 2020 (+0800)

当Web程序中某请求耗时过长时,会显著降低程序的响应速度甚至是可用性。异步任务队列是解决此类问题的通用技术之一。

异步任务队列是指Web服务收到请求之后,将任务提交到一个队列并立即返回;后台被称为worker的一个或多个进程从队列中依次弹出任务处理;浏览器通过一个轮询URL去查询此任务的执行进度和结果。超过服务器计算能力的任务将在队列中排队等候处理。这样一来可以保证不会有用户因为服务器资源耗尽而被拒绝服务,同时也提高了所有请求的响应速度。

Celery是被广泛使用的Python任务队列库。

Celery使用broker的概念来抽象任务队列,broker是客户和worker之间通信的桥梁,Celery支持的broker有多种,比较常见的是Redis。

安装

我们首先需要安装一个broker,这里使用Redis。直接用Docker启动一个Redis实例并安装Redis的Python客户端模块以及Celery:

$ docker run -d -p 6379:6379 redis
$ pip install redis
$ pip install celery

编写Worker

我们先编写一个简单的加法任务,建立一个名为workers.py的文件并写入下述内容:

from celery import Celery

queue = Celery(__name__, broker='redis://localhost')

@queue.task
def add(x, y):
        return x + y

启动worker:

$ celery -A workers worker --loglevel=INFO

现在worker已经启动并准备好接收任务。celery命令行程序的更多参数可以通过执行celery --help命令查看,task装饰器的参数列表可以在官方文档中找到。

提交任务到Worker

使用下述代码提交一个计算5 +3的任务到之前启动的worker:

from workers import add
add.delay(3, 5)

观察worker的日志,可以发现任务被成功接收并计算。

delay方法是apply_async方法的简便写法,直接调用apply_async方法允许我们制定更多的选项:

add.apply_async((3, 5), expires=5)

上面的代码指定任务的过期时间为5秒。apply_async方法的完整可用参数列表可以在官方文档中找到。

获取任务执行状态

要查看任务的执行状态,除了broker外我们还需要设置一个backend作为任务状态的存储中介,这里我们使用和broker相同的Redis。

queue = Celery(__name__,
    backend='redis://localhost',
    broker='redis://localhost'
)

delay/apply_async方法会返回一个AsyncResult对象,我们可以通过这个对象来获取任务的执行情况。

res = add.delay(3, 5)
res.ready() # 返回结果是否就绪
res.get(timeout=1) # 阻塞等待结果,1秒超时;get方法会抛出worker抛出的异常

要在多进程或分布式的web程序中使用Celery实现异步任务,上面的方法显然是不够的,因为我们很难在不同的进程中共享AsyncResult。在通常的实现中,我们要求浏览器发送一个异步请求,服务器则回应一个请求ID供其轮询,再在轮询接口中查询任务的执行状态。

AsyncResult对象有一个id属性,我们可以使用它来作为这个任务的标志符并查询任务状态:

res = add.delay(3, 5)
id = res.id
poll_url = f'/apis/polling/{id}' # 构造轮询ID

在轮询请求的处理函数中,我们使用此ID来查询任务状态:

task = add.AsyncResult(id) # 得到与ID对应的任务的AsyncResult对象

AsyncResult对象的state属性(字符串类型)指明了任务当前的状态,以下是Celery的预设状态:

当我们启动了backend后,有两点需要特别注意:

自定义任务状态与进度

如果task装饰器的bind参数为真,那么任务对象本身会被作为第一个参数传入到我们的任务函数。

@queue.task(bind=True)
def add(self, x, y):
    print(f'Executing task: {self.request.id}')
    reutrn x + y

任务对象有一个update_state方法用来设置任务的当前状态和元数据(例如进度),状态和元数据都是任意的,这让我们可以突破预设状态的限制。

以下是来自官网的例子:

@app.task(bind=True)
def upload_files(self, filenames):
    for i, file in enumerate(filenames):
        if not self.request.called_directly:
            self.update_state(state='PROGRESS',
                meta={'current': i, 'total': len(filenames)})

此外,Miguel Grinberg的文章Using Celery With Flask是这个主题很好的参考资料。

从代码中启动Worker

我们之前一直是通过celery命令行来启动worker,但也许我们希望项目有更高的集成度,可以通过自己的脚本同时启动web服务和Celery worker。Celery实例有一个Worker类,可以用来构建启动器,在构造函数中配置选项并调用start方法启动。

worker = queue.Worker(loglevel='INFO')
worker.start()