用 celery 实现 多个队列

xiaoxiao2021-02-28  23

celery 是python中 比较流行的定时任务,但是在使用过程中有两个需求比较重要 ,那就是定时任务和优先级。

首先是定时任务,定时任务 也就是在固定的时间执行固定的任务。但是通常需要硬编码。

然后是优先级,比如在实际业务中,有的任务比较重要,我们希望他能优先执行。

#app.py

# *-* coding: utf-8 *-* from flask import Flask import tasks app = Flask(__name__) @app.route('/images', methods=['GET', 'POST']) def image(): tasks.image.delay() return "image success" @app.route('/video/', methods=['GET']) def video(): tasks.video.delay() return "video success" @app.route('/common/', methods=['GET', 'POST']) def common(): tasks.common.delay() return "common success" if __name__ == "__main__": app.run(debug=True)

app中定义web应用  ,假设每个接口都是耗时的,每个接口任务找不同的任务队列

#tasks.py

# *-* coding: utf-8 *-* import time from celery import Celery celeryapp = Celery(broker='redis://localhost:6379/2') celeryapp.config_from_object('celeryconfig') @celeryapp.task def video(): print "processing video" time.sleep(10) @celeryapp.task def image(): print "processing image" time.sleep(5) @celeryapp.task def common(): print "processing common" time.sleep(3)

#celeryconfig.py

# *-* coding:utf-8 *-* from kombu import Exchange, Queue CELERY_TIMEZONE = 'Asia/Shanghai' CELERY_BROKER = 'redis://locahost:6379/1' default_exchange = Exchange('default', type='direct') media_exchange = Exchange('media', type='direct') #定义3个不同的队列 CELERY_QUEUES = ( Queue('default', default_exchange, routing_key='default'), Queue('videos', media_exchange, routing_key='media.video'), Queue('images', media_exchange, routing_key='media.image'), ) CELERY_DEFAULT_QUEUE = 'default' CELERY_DEFAULT_EXCHANGE = 'default' CELERY_DEFAULT_ROUTTING_KEY = 'default' #配置路由 不同第任务 分配到不同的队列当中, 通过routing_key CELERY_ROUTES ={ 'tasks.image': { 'queue': 'images', 'routing_key': 'media.image' }, 'tasks.video': { 'queue': 'videos', 'routing_key': 'media.video' } } CELERY_IMPORTS = ('tasks')

启动web应用   python app.py

启动不同的worker   celery worker -A tasks --loglevel=info --queues=videos  #启动处理video的队列

                                celery worker -A tasks --loglevel=info --queues=images # 启动处理image的队列

                                celery worker -A tasks --loglevel=info --queues=default  #没有加入路由的  就会分配到默认的队列

这样 不同的任务就会被分配到不同的队列中去

转载请注明原文地址: https://www.6miu.com/read-2630317.html

最新回复(0)