Celery 使用入门
Celery是一个用 Python 实现的任务队列框架(Task Queue),是一种在线程或机器间分发任务的机制。
一共有五部分组成,Beat、Broker、Worker、Task、Backend
- Task:任务
- Beat:定时触发
- Broker: 接收消息,将消息放入队列
- Worker:持续监听队列,接收消息后执行特定的任务
- Backend:存储结果
我们平时经常也会做类似的工作,比如发邮件、发短信等用异步队列来做,把消息打入队列,然后由一个 Worker 来执行。Celery 是这个工作的集大成者。它在之上提供了以下几个功能:
- 高可用性,与 Broker 的连接进行自动重连
- 模块化,可随时替代连接池、 序列化、压缩模式、日志、调度器、消费者、生产者、自动扩展、 Broker 等
- 可使用 Canvas 机制构建复杂工作流
- 有完整的流水线监控
- 可进行时间和速率限制
- 提供了计划任务
- 资源泄露保护
- 自动扩展,保障服务质量
- ……
下面是一个简单入门:
首先,使用 Docker 启动一个 rabbitmq 作为 Broker:
docker run -v rmq-data:/var/lib/rabbitmq/mnesia/node@rabbitmq -d -p 5672:5672 -p 15672:15672 --hostname rabbitmq --name rmq -e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=password -e RABBITMQ_NODENAME=node@rabbitmq -e RABBITMQ_DEFAULT_VHOST=my_vhost rabbitmq:management-alpine
安装 Celery
pip install -U Celery
创建定时任务
首先创建一个文件:celery_config.py
,内容如下:
from datetime import timedelta
CELERYBEAT_SCHEDULE = {
# 创建 schedule
'cleanup-something': {
'task': 'tasks.cleanup', # 指定 Task
'schedule': timedelta(seconds=3), # 每 3 秒一次
}
}
创建文件 tasks.py
文件内容如下:
from celery import Celery
import time
app = Celery('tasks', broker='amqp://user:password@localhost:5672/my_vhost')
app.config_from_object('celery_config')
@app.task
def cleanup():
# 定时清理一些东西
print("cleanup something")
# 等待 1 秒
time.sleep(1)
return {
"status": "success"
}
启动 worker
:
celery worker -A tasks
启动成功后,我们创建一个 beat 来触发这个定时任务。
celery beat -A tasks
启动 beat 后会自动创建两个文件:
- celerybeat-schedule.db
- celerybeat.pid
用来存储 Beat 的信息。
此时,再看 Worker 的窗口,Worker 中的 Task 就不断地执行了。
[2020-04-24 11:11:22,065: WARNING/ForkPoolWorker-1] cleanup something
[2020-04-24 11:11:25,066: WARNING/ForkPoolWorker-1] cleanup something
[2020-04-24 11:11:28,066: WARNING/ForkPoolWorker-1] cleanup something
[2020-04-24 11:11:31,066: WARNING/ForkPoolWorker-1] cleanup something
[2020-04-24 11:11:34,066: WARNING/ForkPoolWorker-1] cleanup something
[2020-04-24 11:11:37,065: WARNING/ForkPoolWorker-1] cleanup something
[2020-04-24 11:11:40,065: WARNING/ForkPoolWorker-1] cleanup something
……
创建异步任务
上面是定时任务,由独立进程 Beat 进行触发。下面我们创建一个处理订单的异步任务,需要传入订单 ID,由一个 Task 对订单进行异步处理。
首先在 tasks.py
中添加这个模拟任务,代码如下:
@app.task
def handle_order(order_id):
print("handle order...", order_id)
time.sleep(1)
return {
"status": "success"
}
重启 worker,然后创建 test
文件,内容如下:
from tasks import handle_order
import time
# 异步调用
for i in range(10):
# 每隔 1 秒提交一个订单
handle_order.delay(i+1)
print("send order", i+1)
time.sleep(1)
直接执行:python test.py
可看到 Worker 的执行结果:
[2020-04-24 11:19:51,137: WARNING/ForkPoolWorker-1] handle order...
[2020-04-24 11:19:51,138: WARNING/ForkPoolWorker-1] 1
[2020-04-24 11:19:52,144: WARNING/ForkPoolWorker-1] handle order...
[2020-04-24 11:19:52,144: WARNING/ForkPoolWorker-1] 2
[2020-04-24 11:19:53,150: WARNING/ForkPoolWorker-1] handle order...
[2020-04-24 11:19:53,150: WARNING/ForkPoolWorker-1] 3
[2020-04-24 11:19:54,156: WARNING/ForkPoolWorker-1] handle order...
[2020-04-24 11:19:54,157: WARNING/ForkPoolWorker-1] 4
[2020-04-24 11:19:55,163: WARNING/ForkPoolWorker-1] handle order...
[2020-04-24 11:19:55,164: WARNING/ForkPoolWorker-1] 5
[2020-04-24 11:19:56,168: WARNING/ForkPoolWorker-1] handle order...
[2020-04-24 11:19:56,168: WARNING/ForkPoolWorker-1] 6
[2020-04-24 11:19:57,170: WARNING/ForkPoolWorker-1] handle order...
[2020-04-24 11:19:57,170: WARNING/ForkPoolWorker-1] 7
[2020-04-24 11:19:58,176: WARNING/ForkPoolWorker-1] handle order...
[2020-04-24 11:19:58,177: WARNING/ForkPoolWorker-1] 8
[2020-04-24 11:19:59,182: WARNING/ForkPoolWorker-1] handle order...
[2020-04-24 11:19:59,182: WARNING/ForkPoolWorker-1] 9
[2020-04-24 11:20:00,186: WARNING/ForkPoolWorker-1] handle order...
[2020-04-24 11:20:00,186: WARNING/ForkPoolWorker-1] 10
以上就是一个基础的 Celery 用法了。
获取结果
上面还没有使用到的部件是 Backend,这个用来存储状态和结果。
首先,我们准备一个 redis 来做 backend。可用的 backend 可看官方文档。
docker run -d --name redis -p 6379:6379 -v redis-data:/data redis:5.0.5-alpine
在 celery_config.py
中添加下面的 Backend 的配置:
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0'
添加一个异步任务
@app.task
def add(x, y):
return x + y
重启 worker。
创建文件 test-add.py
添加代码:
from tasks import add
r = add.delay(1, 2)
# 获取结果
print(r.get(timeout=3))
还需要安装一下 redis 的插件
pip install -U celery[redis]
直接执行 python test-add.py
即可。
如果没有配置好 Backend 的话,会报 AttributeError: 'DisabledBackend' object has no attribute '_get_task_meta_for'
的错
可得结果
3