Zhong__Celery基本使用详解
创始人
2024-06-01 07:54:26
0

时间:2023.03.10

环境:python3/centos/redis

目的:演示celery基本使用的详细案例

说明:python依赖的版本以requirement.txt文件为测试基准 不同版本可能存在差异

作者:Zhong

简介

简介及概念介绍部分不会很详细 主要看demo项目代码

Celery包含如下组件:

1. Celery Beat:任务调度器,Beat进程会读取配置文件的内容,周期性地将配置中到期需要执行的任务发送给任务队列。

2. Celery Worker:执行任务的消费者,通常会在多台服务器运行多个消费者来提高执行效率。

3. Broker:消息代理,或者叫作消息中间件,接受任务生产者发送过来的任务消息,存进队列再按序分发给任务消费方(通常是消息队列或者数据库)。

4. Producer:调用了Celery提供的API、函数或者装饰器而产生任务并交给任务队列处理的都是任务生产者。

5. Result Backend:任务处理完后保存状态信息和结果,以供查询。Celery默认已支持Redis、RabbitMQ、MongoDB、Django ORM、SQLAlchemy等方式。

核心架构图

Broker(消息中间件)

用来接收调用任务的生产者发送的任务 分发到相应worker执行

worker(消费者)

执行任务的实体

启动一个worker

# 处理所有的默认队列任务
celery -A celery_test worker -l INFO# 处理指定的队列任务
celery -A proj worker --loglevel=info -Q queue1,queue2

启动多个worker分别处理不同的队列任务

celery -A proj worker --loglevel=info -Q queue1
celery -A proj worker --loglevel=info -Q queue2

beat(调度器)

beat是一个调度器,它可以指定在什么时候某个worker来执行某个任务。如果我们想周期执行/定时执行某个任务 需要增加beat_schedule配置信息 在celery指定的配置文件中配置

beat_schedule = {# 周期任务/定时任务'every-5-minute':{'task': 'proj.tasks.period_task','schedule': 300.0,'args': (10, 20),},'add-every-monday-morning': {'task': 'proj.tasks.period_task','schedule': crontab(hour=7, minute=30, day_of_week=1),'args': (100, 200),},
}

开启一个celery beat服务

celery -A proj beat

celery需要保存上次任务运行的时间在数据文件中,文件在当前目录下名字叫celerybeat-schedule beat需要访问此文件

celery -A proj beat -s /home/celery/var/run/celerybeat-schedule

Routing(任务路由分发)

配置文件中指定任务发送到哪个队列中执行

task_routes=({'proj.tasks.task1': {'queue': 'queue1'},'proj.tasks.task2': {'queue': 'queue1'},'proj.tasks.task3': {'queue': 'queue2'},},
)

也可以通过apply_async()方法设置任务发送到哪个队列中执行

task1.apply_async(queue='queue1')

Producer(生产者)

调用任务 任务生产者

from proj.tasks import add# 直接调用 不会传递到worker执行 会在当前进程直接执行
add(1, 2)# delay 发送任务到broker 然后调度到worker执行
add.delay(1, 2)# apply_async 发送任务到broker 然后调度到worker执行 可设置一些任务执行的参数
add.apply_async((1, 2), queue='vip', countdown=10)

无论是delay()还是apply_async()方式都会返回AsyncResult对象,方便跟踪任务执行状态,但需要我们配置result_backend. 每一个被调用的任务都会被分配一个ID,我们叫做Task ID.

Result Backend

存储任务信息 包括状态、结果等

# 配置文件中指定
result_backend = 'redis://192.168.1.1:6379/1'# Celery配置中指定
app = Celery('proj',broker='redis://192.168.1.1:6379/0',backend='redis://192.168.1.1:63799/1',include=['proj.tasks'])

获取任务结果 返回的是一个AsyncResult对象

res = add.delay(1, 2)
print(res.get(timeout=30))

如果没有指定Result Backend是不能获取返回结果的 提示: NotImplementedError: No result backend is configured.

详细可配置项目选项可参考官网文档

celery项目选项配置

项目demo

通过一个demo演示各种task的基本使用

代码仓库

详见gitee: celery_demo

项目目录结构

celery_demo/

proj/

moduleA/

__init__.py

tasks.py

moduleB/

__init__.py

mod_b.py

moduleC/

tasks.py

__init__.py

celery.py

celery_config.py

tasks.py

call_tasks.py

requirement.txt

部署

redis

本demo使用redis作为broker 也可以根据需求使用mq等为消息中间件

安装redis 也可以使用docker启动 配置可远程访问

python环境

在各个要部署代码的主机上安装配置响应的python3环境 建议版本3.7及以上 并安装依赖文件requirement.txt 建议使用python的虚拟环境安装

pip3 install -i https://pypi.douban.com/simple -r requirement.txt

主机网络

部署测试demo的主机建议使用linux系统如centos 不要直接使用windows 除非你可以包装为服务来避免各种问题

测试可全部在一台主机上部署 也可以部署在不同的服务器上 保证各主机可通讯正常 主要是可与redis服务正常通讯

ip为192.168.1.1的服务器上 部署redis redis地址不能为本地地址如127.0.0.1 别的主机要能访问到它 单机部署无所谓

ip为192.168.1.2的服务器上 部署beat

ip为192.168.1.3的主机部署一个worker处理默认的task

ip为192.168.1.4的主机部署一个worker处理queue为queue1的task

ip为192.168.1.5的主机部署一个worker处理queue为queue2的task

代码分发

将相同的代码分别复制到ip为192.168.1.2-192.168.1.5的主机上 安装python依赖 当然根据需求部分代码如调用的task代码可以不同

启动

192.168.1.1主机启动redis

192.168.1.3主机启动worker 在celery_demo目录下proj目录同级执行

celery -A proj worker --loglevel=info

192.168.1.3主机启动worker 在celery_demo目录下proj目录同级执行

celery -A proj worker --loglevel=info -Q queue1

192.168.1.3主机启动worker 在celery_demo目录下proj目录同级执行

celery -A proj worker --loglevel=info -Q queue2

ip为192.168.1.2主机启动beat

celery -A proj beat -s ./celerybeat-schedule

当所有服务启动后 可以观察到worker主机会输出celery及task相关的信息 还有各个worker节点同步的信息 它们主要是通过broker代理来保持协作

在指定的beat任务执行时 可以看到worker处理beat task的输出 主要根据机制算法分发到某台worker去执行任务

测试

在某台主机调用call_tasks.py

python3 call_tasks.py

可以观察到默认的task 指定queue的task 都在对应的worker主机上得到了执行

Note

celery可设置多种类型的任务 可以集成到其它框架如Django使用

更多详情见官网文档 ... ...

相关内容

热门资讯

国家助学贷款申请书 国家助学贷款申请书(精选10篇)  国家助学贷款是由政府主导、财政贴息、财政和高校共同给予银行一定风...
工厂离职申请书 工厂离职申请书(通用16篇)  随着时代在进步,我们会使用上申请书,申请书是我们平时提出请求的一种书...
贫困户申请书 贫困户申请书模板合集十篇  当下市场经济活跃,交易频繁,申请书在生活中的使用越来越广泛,请注意不同的...
澳洲留学申请书 澳洲留学申请书6篇  当下市场经济活跃,交易频繁,有各项事务需要申请书,通过申请书,我们可以提出自己...
最新入党申请书 最新入党申请书范文(通用6篇)  在市场经济发展迅速的今天,申请书在生活中的使用越来越广泛,正确运用...
家长的同意退学申请书 家长的同意退学申请书(精选14篇)  在经济飞速发展、人们往来越来越密切的今天,申请书在现实生活中使...
申请优惠书范文怎么写优选10... 申请优惠书范文怎么写 第一篇尊敬的各位领导、各位同事,大家好!我是客服部的***,非常荣幸参加这次的...
法院延期开庭申请书_如何向法... 如果要向法院申请延期开庭,要如何写申请书呢?下面是法院延期开庭申请书,欢迎阅读参考。法院延期开庭申请...
教师补助申请格式 教师补助申请格式尊敬的领导:您好!我是xx市大xx镇xx中心小学校的一名教师。很荣幸能够成为教育大军...
封井申请书范文16篇 封井申请书范文 第一篇协 议 书甲方:潍坊锦源水利建筑安装工程有限公司乙方:经甲乙双方相互协商同意,...
民政补助申请书 民政补助申请书(精选5篇)  在现在的社会生活中申请书在现实生活中使用广泛,正确运用申请书可以达到事...
质量保证金退还申请书样本 一、工程质量保证金退还申请书样本工程名称:2009 年省级投资秭归县杨林桥等六个乡镇低丘岗地改造项目...
小学贫困生申请书 小学贫困生申请书  贫困生申请书范文(一)  尊敬的领导:  我叫李xxx,家住东镇镇中羊泉西村,在...
孩子改名申请书范文   孩子的名字会影响孩子的一生,如果一开始起名不够优雅可以再去修改,下面就是小编为您收集整理的孩子改...
贫困生补助申请理由 贫困生补助申请理由200字(精选10篇)  一些贫困生因家庭经济困难,申请贫困补助,这个时候需要去撰...
如何写申请书 如何写申请书  一、申请书的含义  申请书是个人或集体向组织、机关、企事业单位或社会团体表述愿望、提...
解除财产保全申请书 解除财产保全申请书解除财产保全申请书 第一篇:解除财产保全书范文申请人:王XX,女,白族,1983年...
事故索赔申请书 事故索赔申请书范文(精选5篇)  随着社会在进步,申请书与我们不再陌生,申请书不同于其他书信,是一种...
入团申请书 入团申请书模板400字(通用15篇)  在当今社会生活中,很多场合都离不了申请书,利用申请书我们可以...
困难救助申请书 困难救助申请书(精选7篇)  在眼下市场经济活跃的社会,我们会使用上申请书,不同的使用场景有不同的申...