flask + celery+redis

xiaoxiao2021-03-01  33

flask 是一个轻型框架,这里使用celery 任务调度算法

第一阶段 先创建一个 合格的flask 框架,在这个基础上再使用celery 任务调度

1、简易的目录结构 celery_task 是为了调用celery 处理任务的目录;flaskapp是存放开发应用的目录;test 是测试任务处理的目录

manage 是整体控制的文件

2、开始创建基本 manage.py 

#-*- coding:utf-8 -*-from flask import Flask, jsonify, requestapp = Flask(__name__)@app.route('/api/task_start',methods = ['GET','POST'])def task_start(): request_data = request.get_json(force=True) #获取提交请求的原始json 数据 task_data = {} task_data['url'] = request_data.get("target") return jsonify(task_data)@app.route('/api/task_status',methods = ["GET","POST"])def task_status(): pass@app.route('/api/task_reslut',methods = ["GET","POST"])def task_result(): passif __name__ == '__main__': app.run(debug=True,port=9999)

3、现在 可以运行基础的flask 了,使用curl 提交请求

curl -i -X POST -d '{"target":"www.baidu.com"}' http://127.0.0.1:9999/api/task_start

第二阶段:

在第一阶段的基础上 开始使用celery 任务调度,我这使用 redis 做为缓存服务器,安装配置redis 这里不再赘述

1、创建taskmanage.py 用于celery

2、启动celery

celery -A celery_task.taskmanage  worker --loglevel=info

curl -i -X POST -d '{"target":"www.baidu.com"}' http://127.0.0.1:9999/api/task_start

调用该task_start api 启动任务

3、编写结果查询 api

在manage .py 文件中@app.route('/api/task_result',methods = ["GET","POST"])def task_result(): data = json.loads(request.get_data()) task_id = data['task_id'] result = get_result(task_id)

return make_response(jsonify(result=result,task_id=task_id))

在taskmanage文件中

def get_result(task_id): result = app.AsyncResult(task_id) return result.result

运行结果:

在redis 数据中查看:

转载请注明原文地址: https://www.6miu.com/read-3449976.html

最新回复(0)