Zhong__Celery基本使用详解
创始人
2024-06-01 07:54:26
0

时间:2023.03.10

环境:python3/centos/redis

目的:演示celery基本使用的详细案例

说明:python依赖的版本以requirement.txt文件为测试基准 不同版本可能存在差异

作者:Zhong

简介

简介及概念介绍部分不会很详细 主要看demo项目代码

Celery包含如下组件:

1. Celery Beat:任务调度器,Beat进程会读取配置文件的内容,周期性地将配置中到期需要执行的任务发送给任务队列。

2. Celery Worker:执行任务的消费者,通常会在多台服务器运行多个消费者来提高执行效率。

3. Broker:消息代理,或者叫作消息中间件,接受任务生产者发送过来的任务消息,存进队列再按序分发给任务消费方(通常是消息队列或者数据库)。

4. Producer:调用了Celery提供的API、函数或者装饰器而产生任务并交给任务队列处理的都是任务生产者。

5. Result Backend:任务处理完后保存状态信息和结果,以供查询。Celery默认已支持Redis、RabbitMQ、MongoDB、Django ORM、SQLAlchemy等方式。

核心架构图

Broker(消息中间件)

用来接收调用任务的生产者发送的任务 分发到相应worker执行

worker(消费者)

执行任务的实体

启动一个worker

# 处理所有的默认队列任务
celery -A celery_test worker -l INFO# 处理指定的队列任务
celery -A proj worker --loglevel=info -Q queue1,queue2

启动多个worker分别处理不同的队列任务

celery -A proj worker --loglevel=info -Q queue1
celery -A proj worker --loglevel=info -Q queue2

beat(调度器)

beat是一个调度器,它可以指定在什么时候某个worker来执行某个任务。如果我们想周期执行/定时执行某个任务 需要增加beat_schedule配置信息 在celery指定的配置文件中配置

beat_schedule = {# 周期任务/定时任务'every-5-minute':{'task': 'proj.tasks.period_task','schedule': 300.0,'args': (10, 20),},'add-every-monday-morning': {'task': 'proj.tasks.period_task','schedule': crontab(hour=7, minute=30, day_of_week=1),'args': (100, 200),},
}

开启一个celery beat服务

celery -A proj beat

celery需要保存上次任务运行的时间在数据文件中,文件在当前目录下名字叫celerybeat-schedule beat需要访问此文件

celery -A proj beat -s /home/celery/var/run/celerybeat-schedule

Routing(任务路由分发)

配置文件中指定任务发送到哪个队列中执行

task_routes=({'proj.tasks.task1': {'queue': 'queue1'},'proj.tasks.task2': {'queue': 'queue1'},'proj.tasks.task3': {'queue': 'queue2'},},
)

也可以通过apply_async()方法设置任务发送到哪个队列中执行

task1.apply_async(queue='queue1')

Producer(生产者)

调用任务 任务生产者

from proj.tasks import add# 直接调用 不会传递到worker执行 会在当前进程直接执行
add(1, 2)# delay 发送任务到broker 然后调度到worker执行
add.delay(1, 2)# apply_async 发送任务到broker 然后调度到worker执行 可设置一些任务执行的参数
add.apply_async((1, 2), queue='vip', countdown=10)

无论是delay()还是apply_async()方式都会返回AsyncResult对象,方便跟踪任务执行状态,但需要我们配置result_backend. 每一个被调用的任务都会被分配一个ID,我们叫做Task ID.

Result Backend

存储任务信息 包括状态、结果等

# 配置文件中指定
result_backend = 'redis://192.168.1.1:6379/1'# Celery配置中指定
app = Celery('proj',broker='redis://192.168.1.1:6379/0',backend='redis://192.168.1.1:63799/1',include=['proj.tasks'])

获取任务结果 返回的是一个AsyncResult对象

res = add.delay(1, 2)
print(res.get(timeout=30))

如果没有指定Result Backend是不能获取返回结果的 提示: NotImplementedError: No result backend is configured.

详细可配置项目选项可参考官网文档

celery项目选项配置

项目demo

通过一个demo演示各种task的基本使用

代码仓库

详见gitee: celery_demo

项目目录结构

celery_demo/

proj/

moduleA/

__init__.py

tasks.py

moduleB/

__init__.py

mod_b.py

moduleC/

tasks.py

__init__.py

celery.py

celery_config.py

tasks.py

call_tasks.py

requirement.txt

部署

redis

本demo使用redis作为broker 也可以根据需求使用mq等为消息中间件

安装redis 也可以使用docker启动 配置可远程访问

python环境

在各个要部署代码的主机上安装配置响应的python3环境 建议版本3.7及以上 并安装依赖文件requirement.txt 建议使用python的虚拟环境安装

pip3 install -i https://pypi.douban.com/simple -r requirement.txt

主机网络

部署测试demo的主机建议使用linux系统如centos 不要直接使用windows 除非你可以包装为服务来避免各种问题

测试可全部在一台主机上部署 也可以部署在不同的服务器上 保证各主机可通讯正常 主要是可与redis服务正常通讯

ip为192.168.1.1的服务器上 部署redis redis地址不能为本地地址如127.0.0.1 别的主机要能访问到它 单机部署无所谓

ip为192.168.1.2的服务器上 部署beat

ip为192.168.1.3的主机部署一个worker处理默认的task

ip为192.168.1.4的主机部署一个worker处理queue为queue1的task

ip为192.168.1.5的主机部署一个worker处理queue为queue2的task

代码分发

将相同的代码分别复制到ip为192.168.1.2-192.168.1.5的主机上 安装python依赖 当然根据需求部分代码如调用的task代码可以不同

启动

192.168.1.1主机启动redis

192.168.1.3主机启动worker 在celery_demo目录下proj目录同级执行

celery -A proj worker --loglevel=info

192.168.1.3主机启动worker 在celery_demo目录下proj目录同级执行

celery -A proj worker --loglevel=info -Q queue1

192.168.1.3主机启动worker 在celery_demo目录下proj目录同级执行

celery -A proj worker --loglevel=info -Q queue2

ip为192.168.1.2主机启动beat

celery -A proj beat -s ./celerybeat-schedule

当所有服务启动后 可以观察到worker主机会输出celery及task相关的信息 还有各个worker节点同步的信息 它们主要是通过broker代理来保持协作

在指定的beat任务执行时 可以看到worker处理beat task的输出 主要根据机制算法分发到某台worker去执行任务

测试

在某台主机调用call_tasks.py

python3 call_tasks.py

可以观察到默认的task 指定queue的task 都在对应的worker主机上得到了执行

Note

celery可设置多种类型的任务 可以集成到其它框架如Django使用

更多详情见官网文档 ... ...

相关内容

热门资讯

六年级毕业作文(实用6篇) 六年级毕业作文 篇一:我的小学生活六年级毕业作文 篇二:成长的足迹六年级毕业作文 篇一:我的小学生活...
让生活更美好作文500字六年... 让生活更美好作文500字六年级 篇一生活,是我们每个人都要面对的,无论是喜是忧,都离不开生活。那么,...
感动小学六年级作文300字(... 感动小学六年级作文300字 篇一:爱心传递的故事这是一个关于爱心传递的故事。有一天,我放学后回家的路...
六年级下册语文第二单元作文5... 六年级下册语文第二单元作文500字 篇一我最喜欢的节日我最喜欢的节日是春节。春节是中国最重要的传统节...
精彩的生活六年级作文(通用3... 精彩的生活六年级作文 篇一快乐的学习生活我是一个六年级的学生,我的生活非常精彩。每天早上,当我听到闹...
什么让生活更美好作文600字... 篇一:什么让生活更美好生活是一幅绚丽多彩的画卷,我们每个人都希望能够过上美好的生活。那么,什么让生活...
暑假去海边玩作文600六年级... 暑假去海边玩作文600六年级 篇一我开心的暑假海边之旅暑假终于到了,我迫不及待地和家人一起去海边度假...
烟花大会六年级作文【优秀3篇... 烟花大会六年级作文 篇一:烟花大会的美丽夜晚烟花大会是我最期待的一个活动,每年一次的盛大烟花表演总能...
变形记六年级作文(实用6篇) 变形记六年级作文 篇一我的变形记之旅我是一个普通的小学六年级学生,最近我读了一本引人入胜的小说《变形...
我是小农夫小学六年级记叙文(... 我是小农夫小学六年级记叙文 篇一我是小农夫我是一个小农夫,今年上小学六年级。在我家乡的小村庄里,大家...
采茶姑娘六年级作文【实用5篇... 采茶姑娘六年级作文 篇一茶园里的孩子们我家住在一个美丽的小山村,村子周围都是茶园。每年的春天,当茶叶...
我的假期生活六年级作文【优秀... 我的假期生活六年级作文 篇一我喜欢的假期活动假期对于学生来说,是一段宝贵的时间。在假期里,我可以尽情...
地球的自述六年级作文500字... 篇一:地球的自述你好,我是地球,这是我第一次向你们自述。作为宇宙中唯一一个有生命存在的行星,我感到非...
泪作文600字六年级(优秀6... 泪作文600字六年级 篇一:泪水中的勇敢泪水是我们内心的一种表达,它可以是喜悦的泪水,也可以是悲伤的...
六年级上册第一单元作文500... 六年级上册第一单元作文500字 篇一我的暑假生活暑假到了,我迫不及待地开始了我的暑假生活。这个暑假,...
小动物三年级作文300字【经... 小动物三年级作文300字 篇一我喜欢小动物我是一个小动物爱好者,我喜欢各种各样的小动物。我觉得它们都...
秋韵六年级作文【经典6篇】 秋韵六年级作文 篇一我与秋天的邂逅秋天是一个美丽的季节,它给大地披上了五彩斑斓的外衣。我喜欢秋天,因...
功夫不负有心人作文(优秀3篇... 功夫不负有心人作文 篇一在人生的道路上,我们常常会遇到各种各样的困难和挑战。然而,只要我们拥有一颗有...
三年级作文做家务200字左右... 三年级作文做家务200字左右 篇一帮助家人,乐在其中家务是我们生活中不可避免的一部分,也是我们孝敬父...
三年级作文254字(精彩6篇... 三年级作文254字 篇一最喜欢的动物我最喜欢的动物是猫。猫是一种非常可爱的动物,它们有着柔软的皮毛和...