当Web程序中某请求耗时过长时,会显著降低程序的响应速度甚至是可用性。异步任务队列是解决此类问题的通用技术之一。
异步任务队列是指Web服务收到请求之后,将任务提交到一个队列并立即返回;后台被称为worker的一个或多个进程从队列中依次弹出任务处理;浏览器通过一个轮询URL去查询此任务的执行进度和结果。超过服务器计算能力的任务将在队列中排队等候处理。这样一来可以保证不会有用户因为服务器资源耗尽而被拒绝服务,同时也提高了所有请求的响应速度。
Celery是被广泛使用的Python任务队列库。
Celery使用broker的概念来抽象任务队列,broker是客户和worker之间通信的桥梁,Celery支持的broker有多种,比较常见的是Redis。
我们首先需要安装一个broker,这里使用Redis。直接用Docker启动一个Redis实例并安装Redis的Python客户端模块以及Celery:
$ docker run -d -p 6379:6379 redis
$ pip install redis
$ pip install celery
我们先编写一个简单的加法任务,建立一个名为workers.py
的文件并写入下述内容:
from celery import Celery
queue = Celery(__name__, broker='redis://localhost')
@queue.task
def add(x, y):
return x + y
启动worker:
$ celery -A workers worker --loglevel=INFO
现在worker已经启动并准备好接收任务。celery
命令行程序的更多参数可以通过执行celery --help
命令查看,task
装饰器的参数列表可以在官方文档中找到。
使用下述代码提交一个计算5 +3的任务到之前启动的worker:
from workers import add
add.delay(3, 5)
观察worker的日志,可以发现任务被成功接收并计算。
delay
方法是apply_async
方法的简便写法,直接调用apply_async
方法允许我们制定更多的选项:
add.apply_async((3, 5), expires=5)
上面的代码指定任务的过期时间为5秒。apply_async
方法的完整可用参数列表可以在官方文档中找到。
要查看任务的执行状态,除了broker外我们还需要设置一个backend作为任务状态的存储中介,这里我们使用和broker相同的Redis。
queue = Celery(__name__,
backend='redis://localhost',
broker='redis://localhost'
)
delay/apply_async
方法会返回一个AsyncResult
对象,我们可以通过这个对象来获取任务的执行情况。
res = add.delay(3, 5)
res.ready() # 返回结果是否就绪
res.get(timeout=1) # 阻塞等待结果,1秒超时;get方法会抛出worker抛出的异常
要在多进程或分布式的web程序中使用Celery实现异步任务,上面的方法显然是不够的,因为我们很难在不同的进程中共享AsyncResult
。在通常的实现中,我们要求浏览器发送一个异步请求,服务器则回应一个请求ID供其轮询,再在轮询接口中查询任务的执行状态。
AsyncResult
对象有一个id
属性,我们可以使用它来作为这个任务的标志符并查询任务状态:
res = add.delay(3, 5)
id = res.id
poll_url = f'/apis/polling/{id}' # 构造轮询ID
在轮询请求的处理函数中,我们使用此ID来查询任务状态:
task = add.AsyncResult(id) # 得到与ID对应的任务的AsyncResult对象
AsyncResult
对象的state
属性(字符串类型)指明了任务当前的状态,以下是Celery的预设状态:
PENDING
STARTED
RETRY
FAILURE
result
属性将包含worker抛出的异常SUCCESS
result
属性中REVOKED
当我们启动了backend后,有两点需要特别注意:
task
装饰器中指明ignore_result=True
,否则其结果会被存入backend占用资源。AsyncResult
的get
或者forget
方法,否则其会一直被存储在backend中,除非我们指定了backend的过期时间。backend的过期时间可以通过在Celery对象的配置中写入result_expires
参数指定,可参考官方文档。如果task
装饰器的bind
参数为真,那么任务对象本身会被作为第一个参数传入到我们的任务函数。
@queue.task(bind=True)
def add(self, x, y):
print(f'Executing task: {self.request.id}')
reutrn x + y
任务对象有一个update_state
方法用来设置任务的当前状态和元数据(例如进度),状态和元数据都是任意的,这让我们可以突破预设状态的限制。
以下是来自官网的例子:
@app.task(bind=True)
def upload_files(self, filenames):
for i, file in enumerate(filenames):
if not self.request.called_directly:
self.update_state(state='PROGRESS',
meta={'current': i, 'total': len(filenames)})
此外,Miguel Grinberg的文章Using Celery With Flask是这个主题很好的参考资料。
我们之前一直是通过celery
命令行来启动worker,但也许我们希望项目有更高的集成度,可以通过自己的脚本同时启动web服务和Celery
worker。Celery
实例有一个Worker
类,可以用来构建启动器,在构造函数中配置选项并调用start方法启动。
worker = queue.Worker(loglevel='INFO')
worker.start()