简单的性能测试:
通用的代码:
config.py(gunicorn+gevent):
# -*-coding:utf-8 -*- __author__ = "ZJL" import gevent.monkey import multiprocessing gevent.monkey.patch_all() # 监听本机的5000端口 bind = '0.0.0.0:5000' preload_app = True # 开启进程 # workers=4 workers = multiprocessing.cpu_count() * 2 + 1 # 每个进程的开启线程 threads = multiprocessing.cpu_count() * 2 backlog = 2048 # 切换这个模式就能转换gevent或者meinheld,其他代码不用变 # 工作模式为gevent worker_class = "gevent" #工作模式为meinheld #worker_class = "egg:meinheld#gunicorn_worker" # debug=True # 如果不使用supervisord之类的进程管理工具可以是进程成为守护进程,否则会出问题 daemon = True # 进程名称 proc_name = 'gunicorn.pid' # 进程pid记录文件 pidfile = 'app_pid.log' loglevel = 'debug' logfile = 'debug.log' accesslog = 'access.log' access_log_format = '%(h)s %(t)s %(U)s %(q)s'
configm.py(gunicorn+meinheld):
# -*-coding:utf-8 -*- __author__ = "ZJL" import multiprocessing # import gevent.monkey # gevent.monkey.patch_all() # from meinheld import patch # patch.patch_all() # 监听本机的5000端口 bind = '0.0.0.0:5000' preload_app = True # 开启进程 # workers=4 workers = multiprocessing.cpu_count() * 2 + 1 # 每个进程的开启线程 threads = multiprocessing.cpu_count() * 2 backlog = 2048 # 工作模式为gevent # worker_class = "gevent" #工作模式为meinheld worker_class = "egg:meinheld#gunicorn_worker" # debug=True # 如果不使用supervisord之类的进程管理工具可以是进程成为守护进程,否则会出问题 daemon = True # 进程名称 proc_name = 'gunicorn.pid' # 进程pid记录文件 pidfile = 'app_pid.log' loglevel = 'debug' logfile = 'debug.log' accesslog = 'access.log' access_log_format = '%(h)s %(t)s %(U)s %(q)s'mongoM.py(用于数据库的读写):
# -*- coding: utf-8 -*- # __author__="ZJL" from pymongo import MongoClient import traceback def decorator(func): def wrapper(self, *args, **kwargs): try: return func(self, *args, **kwargs) except Exception as e: print(traceback.format_exc()) print(str(e)) self.conn.close() return traceback.format_exc() return wrapper def decorator_conn(func): def wrapper(self, *args, **kwargs): try: return func(self, *args, **kwargs) except Exception as e: print(traceback.format_exc()) print(str(e)) return traceback.format_exc() return wrapper # mongdb数据库链接 class MongoDBManager(object): @decorator_conn def __init__(self, mongodb_uri): self.host = mongodb_uri['host'] self.port = mongodb_uri['port'] self.database = mongodb_uri['database'] self.user_name = mongodb_uri['user_name'] self.user_password = mongodb_uri['password'] self.conn = self.createConn(mongodb_uri) # 插入数据 @decorator def insert(self, col, data): if isinstance(data, dict): self.conn[self.database][col].insert_one(data) elif isinstance(data, list): self.conn[self.database][col].insert_many([var for var in data]) else: print('data is not a dict or a list of dicts') return 'data is not a dict or a list of dicts' # 更新 @decorator def update(self, col, data, filter, upsert=True): if isinstance(data, dict): modify = '$set' modify_data = data update_result = self.conn[self.database][col].update_one( filter=filter, update={modify: modify_data}, upsert=upsert ) return update_result else: print('data is not a dict or a list of dicts') return 'data is not a dict or a list of dicts' # 碰到一个字段加减法问题新增这个函数 # collection = self.conn[self.database][col].update({'_id': 1111}, {'$inc': {'aa': 1}}) # '$inc'用来加减,-1就是减法 @decorator def update_new(self, col, filter, data): if isinstance(data, dict) and isinstance(filter, dict): update_result = self.conn[self.database][col].update(filter, data) return update_result else: print('data is not a dict or a list of dicts') return 'data is not a dict or a list of dicts' # 查询数据,返回游标,skip,limit用于分页 # skip为跳过多少条数据,limit为显示多少条数据,skip,limit都是0,显示所有 # @decorator # def find_one(self, col, filter={}, kwargs={}, skip=0, limit=0): # kwargs用于返回指定字段 # if kwargs: # return self.find(col, filter, kwargs) # else: # return self.find(col, filter) # 查询一条数据 @decorator def find(self, col, filter={}, kwargs={}): if isinstance(filter, dict): # kwargs用于返回指定字段 if kwargs: return self.conn[self.database][col].find_one(filter, kwargs) else: return self.conn[self.database][col].find_one(filter) else: print("type not dict!") return "type not dict!" # 查询多条数据,返回一个可迭代类型 # skip, limit用于分页,skip为跳过多少条数据,limit为显示多少条数据,skip,limit都是0,显示所有 # sortkey是排序关键字,sort是排序,-1是降序,1是升序 @decorator def find_cursor(self, col, filter={}, kwargs={}, skip=0, limit=0, sortkey="", sort=-1): if isinstance(filter, dict): # kwargs用于返回指定字段 if kwargs and sortkey: return self.conn[self.database][col].find(filter, kwargs).skip(skip).limit(limit).sort(sortkey, sort) elif kwargs and not sortkey: return self.conn[self.database][col].find(filter, kwargs).skip(skip).limit(limit) elif not kwargs and sortkey: return self.conn[self.database][col].find(filter).skip(skip).limit(limit).sort(sortkey, sort) else: return self.conn[self.database][col].find(filter).skip(skip).limit(limit) else: print("type not dict!") return "type not dict!" # find_cursor_data = list(find_cursor()) 是一个list # find_cursor()的结果赋值给一个空list,结果就是一个List,所以这个方法舍弃 # 查询多条数据,返回一个list # skip, limit用于分页,skip为跳过多少条数据,limit为显示多少条数据,skip,limit都是0,显示所有 # @decorator # def find_cursor_list(self, col, filter={}, kwargs={}, skip=0, limit=0): # if isinstance(filter, dict): # data_list = [] # # kwargs用于返回指定字段 # if kwargs: # data_dict = self.conn[self.database][col].find(filter, kwargs).skip(skip).limit(limit) # if data_dict: # for d in data_dict: # data_list.append(d) # return data_list # else: # data_dict = self.conn[self.database][col].find(filter).skip(skip).limit(limit) # if data_dict: # for d in data_dict: # data_list.append(d) # return data_list # else: # print("type not dict!") # return "type not dict!" # 删除数据 @decorator def remove(self, col, filter={}): if isinstance(filter, dict): return self.conn[self.database][col].remove(filter) else: print("type not dict!") return "type not dict!" # 统计 @decorator def count(self, col, filter={}): if isinstance(filter, dict): return self.conn[self.database][col].find(filter).count() else: print("type not dict!") return "type not dict!" # 连接数据库 @classmethod @decorator_conn def createConn(cls, mongodb_uri): conn = MongoClient( host=mongodb_uri['host'], port=mongodb_uri['port'], ) db = conn[mongodb_uri['database']] # 身份认证,有用户名密码放开这段注释 # db.authenticate(name=mongodb_uri['user_name'], password=mongodb_uri['password']) return conn # 关闭数据库 def close(self): self.conn.close() MONGODB_URI={ "host":"127.0.0.1", "port": 27017, "database":"py_demo", "user_name":"", "password":"", } mb = MongoDBManager(MONGODB_URI) # a = mb.find_cursor("zjl_demo",{"a":"1"}) # print(list(a))flasktest.py:
from flask import Flask from werkzeug.contrib.fixers import ProxyFix from mogodbM import mb app = Flask(__name__) @app.route("/") def index(): # 插入mongodb数据 mb.insert('zjl_demo',[{'a':'1','b':[1,2,3,4,"哈哈哈"]}]) # 查询数据 mb.find_cursor("zjl_demo",{"a":"1"}) return "hello world" #项目的代理设置 app.wsgi_app = ProxyFix(app.wsgi_app) if __name__ == "__main__": app.run()所有测试都用:siege -c 200 -r 100 http://127.0.0.1:5000
flask启动命令(gunicorn+gevent):gunicorn -c config.py flasktest:app
flask + gunicorn + gevent结果:
flask启动命令(gunicorn+meinheld):gunicorn -c configm.py flasktest:app
flask + gunicorn + meinheld结果:
bottletest.py: # -*-coding:utf-8 -*- __author__ = "ZJL" from bottle import route, run from mogodbM import mb @route('/') def hello(): # 插入mongodb数据 mb.insert('zjl_demo', [{'a': '1', 'b': [1, 2, 3, 4, "哈哈哈"]}]) # 查询数据 mb.find_cursor("zjl_demo", {"a": "1"}) return "Hello World!" run(server='gunicorn',host='127.0.0.1', port=5000)# debug=Truebottle启动命令(gunicorn+gevent):gunicorn -c config.py bottletest:appbottle + gunicorn + gevent结果:
bottle启动命令(gunicorn+meinheld):gunicorn -c configm.py bottletest:app bottle + gunicorn + meinheld结果:
Transactions(完成次数) Availability(可用性) Elapsed time(总共使用时长) Data transferred(数据传输) Response time(响应时间,显示网络连接的速度) Transaction rate(平均每秒完成的处理次数) Throughput(平均每秒传送数据) Concurrency(实际最高并发连接数) Successful transactions(成功处理次数) Failed transactions(失败处理次数) Longest transaction(最长传输时长) Shortest transaction(最短传输时长)
之所以做这个简单测试是因为meinheld号称性能怪兽
网上说flask在高并发下表现不如bottle