场景说明
假设有一张 MySQL 表被水平切分（分表）后分散到多台主机（host）上，每台主机上有 n 张分表。如果需要并发访问这些分表以快速得到查询结果，应该怎么做？这里提供一种方案：利用 Python 3 的 asyncio 异步 I/O 库和 aiomysql 异步 MySQL 驱动来实现这一需求。
代码演示
import asyncio
import functools
import logging
import random
import re

from aiomysql import create_pool
# Shard map: MySQL host -> range of shard tables stored on that host, written
# as "<prefix>_<first>-<last>" (both ends inclusive; gen_tasks() expands it).
# Host octets carry no leading zeros: C resolvers (inet_aton/getaddrinfo) read
# "08" as an invalid octal number, and Python's ipaddress rejects such octets.
TBLES = {
    "192.168.1.1": "table_000-015",
    "192.168.1.2": "table_016-031",
    "192.168.1.3": "table_032-047",
    "192.168.1.4": "table_048-063",
    "192.168.1.5": "table_064-079",
    "192.168.1.6": "table_080-095",
    "192.168.1.7": "table_096-111",
    "192.168.1.8": "table_112-127",
}

# Placeholder credentials used for every host in TBLES; replace before running.
USER = "xxx"
PASSWD = "xxxx"
def query_wrapper(func):
    """Wrap a coroutine function so a failing call is logged instead of raised.

    The decorated coroutine returns whatever ``func`` returns. If ``func``
    raises an ``Exception``, the error is logged with its traceback and the
    call returns ``None``. Cancellation (a ``BaseException``) still propagates.

    Args:
        func: the coroutine function to wrap.

    Returns:
        The wrapping coroutine function, carrying ``func``'s name and docstring.
    """
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        try:
            return await func(*args, **kwargs)
        except Exception:
            # Best effort: one broken shard must not make the whole
            # asyncio.gather() batch fail, so log and yield None instead.
            logging.exception("query %s failed, args=%r",
                              getattr(func, "__name__", func), args)
            return None
    return wrapper
@query_wrapper
async def query_do_something(ip, db, table):
    """Run the placeholder query against one shard table and return its rows.

    Args:
        ip: MySQL host to connect to.
        db: database name on that host.
        table: shard table to query.

    Returns:
        The rows from ``cur.fetchall()`` (the undecorated coroutine's result).
        On error, ``query_wrapper`` decides what the decorated call yields.
    """
    # Identifiers cannot be bound as query parameters, so the table name is
    # formatted in; it is expected to come from the TBLES config, not users.
    sql = "select xxx from {} where xxxx".format(table)
    # NOTE(review): this opens a fresh pool for every table; reusing one pool
    # per host would avoid repeated connection setup.
    async with create_pool(host=ip, db=db, user=USER, password=PASSWD) as pool:
        async with pool.acquire() as conn:
            async with conn.cursor() as cur:
                await cur.execute(sql)
                return await cur.fetchall()
def gen_tasks(db='your_dbname', tables=None):
    """Expand a shard map into a shuffled list of query tasks.

    Each value in ``tables`` has the form ``"<prefix>_<lo>-<hi>"``, for
    example ``"table_000-015"``. It stands for the tables ``<prefix>_<lo>``
    through ``<prefix>_<hi>``, inclusive.

    Table numbers are zero-padded to the width of ``<lo>`` as written in the
    config, so ``"table_000-015"`` yields ``table_000`` ... ``table_015``. An
    unpadded spec such as ``"table_0-15"`` yields ``table_0`` ... ``table_15``.

    Args:
        db: database name passed to every query.
        tables: mapping of host -> table-range spec; defaults to ``TBLES``.

    Returns:
        A list of ``(query_do_something, ip, db, table_name)`` tuples in random
        order.

    Raises:
        ValueError: if a spec does not contain a ``<lo>-<hi>`` numeric range.
    """
    if tables is None:
        tables = TBLES
    tasks = []
    for ip, spec in tables.items():
        # Split on the LAST underscore so prefixes like "user_order" survive.
        prefix, _, span = spec.rpartition('_')
        lo, hi = span.split('-')
        # NOTE(review): assumes shard tables are named with the same padding
        # as the lower bound in the config -- confirm against the real schema.
        width = len(lo)
        tasks.extend(
            (query_do_something, ip, db, '{}_{:0{}d}'.format(prefix, num, width))
            for num in range(int(lo), int(hi) + 1)
        )
    # Presumably shuffled so each batch mixes hosts instead of hitting one.
    random.shuffle(tasks)
    return tasks
def run_tasks(tasks, batch_len, loop=None):
    """Run query tasks concurrently, in consecutive batches.

    Args:
        tasks: sequence of ``(coroutine_function, *args)`` tuples. Each one is
            called as ``coroutine_function(*args)``.
        batch_len: maximum number of coroutines awaited together per batch;
            must be >= 1.
        loop: event loop that drives the batches. Defaults to
            ``asyncio.get_event_loop()`` (the loop ``main()`` installs).

    Returns:
        A list with one result per task, in the same order as ``tasks``. Every
        task of a batch whose ``gather`` raised contributes ``None``.

    Raises:
        ValueError: if ``batch_len`` is less than 1.
    """
    if batch_len < 1:
        raise ValueError("batch_len must be >= 1, got %r" % (batch_len,))
    if loop is None:
        # main()'s ``loop`` is a local there and not visible here, so look up
        # the current event loop instead.
        loop = asyncio.get_event_loop()

    async def _run_batch(batch):
        # Create the coroutines inside the running loop so gather() binds to it.
        return await asyncio.gather(*(func(*args) for func, *args in batch))

    results = []
    for start in range(0, len(tasks), batch_len):
        batch = tasks[start:start + batch_len]
        logging.info("current batch, start_idx:%s len:%s", start, len(batch))
        try:
            results.extend(loop.run_until_complete(_run_batch(batch)))
        except Exception:
            # Keep going: batches are independent, so one failure should not
            # drop all remaining batches.
            logging.warning("batch start_idx:%s failed", start, exc_info=True)
            results.extend([None] * len(batch))
    return results
def main():
    """Script entry point: query every shard table listed in TBLES.

    It builds the task list with ``gen_tasks()`` and runs it with
    ``run_tasks()``. The batch size is 5 times the number of hosts.
    """
    # Show run_tasks' INFO-level progress messages.
    logging.basicConfig(level=logging.INFO)
    # Install the loop as the current loop so run_tasks can look it up; the
    # original local ``loop`` was invisible to run_tasks.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        tasks = gen_tasks()
        batch_len = len(TBLES) * 5
        run_tasks(tasks, batch_len)
    finally:
        # Always release the loop, even if a batch raised.
        asyncio.set_event_loop(None)
        loop.close()