任务调度schedule和celery

xiaoxiao2025-05-27  42

1. schedule

如果要实现一个小的定时任务脚本,可以采用schedule这个轻量级定时任务调度库。

import schedule import time def job(name): print(name,'do something...') # 每十分钟执行任务 schedule.every(10).minutes.do(job, name) # 每小时执行任务 schedule.every().hour.do(job, name) # 每天定点执行任务 schedule.every().day.at('10:30').do(job, name) # 每周一执行任务 schedule.every().monday.do(job, name) # 每周一定点任务 schedule.every().monday.at('10:30').do(job, name) while True: # 保持任务运行 schedule.run_pending() time.sleep(1)

值得注意的是,如果是多个任务运行,实际上他们是串行执行的。如果上面的任务耗时,会影响下面任务的运行。

对于这种情况,可以使用多线程/多进程来解决。

import datetime import schedule import threading import time def job1(): print('this is job1') time.sleep(2) print('job1:',datetime.datetime.now()) def job2(): print('this is job2') time.sleep(2) print('job2:',datetime.datetime.now()) def task1(): threading.Thread(target=job1).start() def task2(): threading.Thread(target=job2).start() def run(): schedule.every(10).seconds.do(task1) schedule.every(10).seconds.do(task2) while True: schedule.run_pending() time.sleep(1)

schedule的使用比较简单,就是一个死循环执行任务,因此定时任务job不应该是死循环类型的,这个任务线程需要有一个执行完毕的出口,否则会导致无限循环问题。另外一点是,定时任务的执行时间如果比schedule的间隔时间长,同样会造成线程堆积问题,引发异常。

2. celery

celery是一个强大的分布式任务队列,相比schedule更加完备而强大,同时也更加“重”。它可以让任务的执行完全脱离主程序,甚至是分配到其他主机上运行。

通常使用celery来实现异步任务和定时任务。其结构组成如下:

可以看到,celery主要包含以下几个模块:

任务模块

包含异步任务与定时任务。异步任务通常在业务逻辑中被触发,并被发往任务队列;定时任务由celery beat进程周期性地将任务发往任务队列。

消息中间件broker

broker,即为任务调度队列,接收任务生产者发来的任务消息,将任务存入队列。celery本身不提供任务队列,推荐使用RabbitMQ和Redis。

任务执行单元worker

worker实时监控消息队列,获取调度的任务,并执行。

任务结果存储backend

backend用于存储任务的执行结果,通消息中间件一样,存储可使用RabbitMQ,Redis,MongoDB等。

异步任务

使用celery实现异步任务主要包括三个步骤:

创建一个celery实例启动celery worker程序调用异步任务

以下做具体介绍:

1.创建celery实例

这里使用Redis作为broker和backend。

创建文件task.py

import time from celery import Celery # 指定消息中间件用redis broker = 'redis://127.0.0.1:6379' # 指定存储用redis backend = 'redis://127.0.0.1:6379/0' # 创建一个celery实例app,名称为my_task app = Celery('my_task', broker=broker, backend=backend) # 创建一个celery任务add,被@app.task装饰后,成为可被调度的任务 @app.task def add(x, y): time.sleep(5) # 模拟耗时操作 return x+y 2.启动celery worker

在当前目录下,使用如下方式启动celery worker

celery worker -A task --loglevel=info

其中:

参数-A指定了celery实例的位置,这里是task.py中,celery会自动在该文件中寻找celery实例对象,当然也可以直接指定为-A task.app;参数--loglevel指定了日志级别,默认为warning,也可以使用-l info来表示;

3.调用任务

现在可以使用delay()或者apply_async()方法来调用任务。

在当前目录下,打开Python控制台,输入如下:

我们从task.py中导入了add任务对象,然后使用delay()方法发送任务到broker,worker进程监测到该任务后就执行,

这时发现报错,原因是在Windows系统下使用celery4版本,解决方法是安装一个eventlet包,然后启动worker时加一个参数

celery worker -A task -l info -P eventlet

然后就可以正常使用了。

另外如果想获取执行后的结果,可以这样做:

上面是在交互环境中调用任务,实际上通常在程序用调用,建立client.py如下:

from task import add import time print('开始时间', time.ctime()) add.delay(2,5) print('完成时间', time.ctime())

然后执行文件,结果如下:

可以看出,虽然任务函数需要等待5秒才返回结果,但是由于是一个异步任务,不会阻塞当前主程序,所以立刻执行了打印完成的语句。

相比直接把broker和backend配置写入程序代码中,更好的方式是增加一个配置文件,通常命名为`celeryconfig.py`。

__init__.py代码如下:

from celery import Celery # 创建celery实例 app = Celery('demo') # 通过celery实例加载配置模块 app.config_from_object('celery_app.celeryconfig')

celeryconfig.py代码如下:

# 指定broker BROKER_URL = 'redis://127.0.0.1:6379' # 指定backend CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0' # 指定时区 CELERY_TIMEZONE = 'Asia/Shanghai' # 指定导入的任务模块 CELERY_IMPORTS = ( 'celery_app.task1', 'celery_app.task2' )

task1.py代码如下:

import time from celery_app import app @app.task def add(x,y): time.sleep(2) return x+y

task2.py代码如下:

import time from celery_app import app @app.task def multiply(x,y): time.sleep(2) return x*y

client.py代码如下:

import time from celery_app import task1 from celery_app import task2 print('开始', time.ctime()) task1.add.delay(2,3) # delay是apply_async的快捷方式 task2.multiply.apply_async(args=[2,3]) print('完成', time.ctime())

现在可以启动worker进程

然后执行python命令运行client.py文件。

在worker窗口,我们可以看到任务的执行

定时任务

celery beat进程通过读取配置文件的内容,周期性地将定时任务发往任务队列。

除了celerconfig.py内容增加了定时调度内容,其他模块和异步任务相同。

celerconfig.py代码如下:

from datetime import timedelta from celery.schedules import crontab # 指定broker BROKER_URL = 'redis://127.0.0.1:6379' # 指定backend CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0' # 指定时区 CELERY_TIMEZONE = 'Asia/Shanghai' # 指定导入的任务模块 CELERY_IMPORTS = ( 'celery_app.task1', 'celery_app.task2' ) # 定时调度schedules CELERYBEAT_SCHEDULE={ 'add-every-30-seconds':{ 'task':'celery_app.task1.add', 'schedule':timedelta(seconds=30), # 每30秒执行一次 'args':(2,3) # 任务函数参数 }, 'multiply-every-30-seconds':{ 'task':'celery_app.task2.multiply', 'schedule':crontab(hour=14,minute=30), # 每天下午2点30分执行一次 'args':(2,3) # 任务函数参数 } }

现在,启动worker进程,然后启动beat进程,定时任务将被发送到broker

之后在worker窗口,可以看到task1每30秒执行一次,task2则定点执行一次。

为了简化,也可以将启动worker进程和beat进程放在一条命令中:

celery -B -A celery_app worker -l info -P eventlet

文章参考schedules和celery

转载请注明原文地址: https://www.6miu.com/read-5030795.html

最新回复(0)