import pymysql
import base64
from ctypes
import *
import ctypes
import os
current_path
= os.path.dirname(__file__)
from iholter_def_struc
import ECGRTHead,StruEcgProperty
def init_database()
:
try:
conn
= pymysql.connect(
host='10.10.9.157',
port=3306,
user='chuanyhu',
passwd='Chuanyhu@123',
db='yocaly_hcy_ecgtest',
charset='gbk')
cursor
= conn.cursor()
return conn,cursor
except:
print(
'Error:数据库连接失败')
return None
'''查找需要的文件 database.execute('select *from temp_ecg_testdata where c_level = %s and c_dllLevel = %s ',['紧急','一般'])'''
def select_data(
sql,
*args)
:
try:
conn, cursor
= init_database()
cursor.execute(
sql,
args)
datas
= cursor.fetchall()
#print(datas)
#print(len(datas))
cursor.close()
conn.close()
return datas
except:
print(
"Error:数据查找错误")
return None
'''将查询的数据进行解码'''
def decode_data(
datas)
:
try:
deData
= base64.b64decode(
datas)
return deData
except:
print(
"Error:解码失败")
return None
'''创建数据库'''
def create_database(
sql)
:
try:
conn, cursor
= init_database()
cursor.execute(
'drop table if EXISTS ECG_RealAnalysisFile_compare')
cursor.execute(
sql)
conn.close()
return True
except:
print(
'Error:创建数据库失败')
return None
'''向数据库中插入数据'''
def insert_database(
sql,
*args)
:
conn, cursor
= init_database()
try:
cursor.execute(
sql,
args)
conn.commit()
cursor.close()
conn.close()
return True
except:
print(
"Error:数据插入失败")
return None
'''读取数据'''
def read_data(
deData)
:
try:
struct_prop
= read_rt_head(
deData[
0:180])
ecg_lead_s_datas
= read_rt_body_data(
deData[
180:],struct_prop)
return struct_prop,ecg_lead_s_datas
except:
print(
"Error:数据读取出错1")
def read_rt_head(
head_buf)
:
rt_head
= ECGRTHead(
head_buf)
ecg_prop
= rt_head.get_ecg_property()
struct_prop
= StruEcgProperty(ecg_prop.get_sample_rate(),
4,ecg_prop.get_data_length(),
len(rt_head.get_lead_names()),rt_head.get_lead_names())
return struct_prop
def read_rt_body_data(
body_buf,
struct_prop)
:
ecg_leads_datas
= []
for ind
in range(
struct_prop.get_data_channel_num())
:
ecg_leads_datas.append([])
piece_count
= len(
body_buf)
//struct_prop.get_piece_len()
for piece_ind
in range(piece_count)
:
start_ind
= piece_ind
* struct_prop.get_piece_len()
+ struct_prop.get_piece_head_len()
end_ind = (piece_ind
+1)
*struct_prop.get_piece_len()
lead_num
= struct_prop.get_data_channel_num()
data_len
= struct_prop.get_data_len()
ch_precision
= int(
struct_prop.get_data_precision())
lead_data_len
= data_len
//lead_num
//ch_precision
for ind
in range(lead_data_len)
:
for lead_ind
in range(lead_num)
:
ch_data
= read_transformed_chdata(
body_buf[start_ind
+(ind
*lead_num
+lead_ind)
*ch_precision
:start_ind
+(ind
*lead_num
+lead_ind
+1)
*ch_precision],ch_precision,
struct_prop,
True)
ecg_leads_datas[lead_ind].append(ch_data)
return ecg_leads_datas
def read_transformed_chdata(
byte_data,
ch_size,
ecg_struct_prop,
is_read_origin= False)
:
if ch_size == 1:
if is_read_origin:
return byte_data[
0]
value
= byte_data[
0]
- 128
value
= value
/ecg_struct_prop.get_max_value()
* ecg_struct_prop.get_real_input_max_value()
*0.01
return value
elif ch_size == 2:
value
= ctypes.c_int16(
byte_data[
0]
| byte_data[
1]
<<8).value
if is_read_origin:
return value
value
= value
/ecg_struct_prop.get_max_value()
* ecg_struct_prop.get_real_input_max_value()
*0.01
return value
if __name__
== '__main__':
'''创建数据表'''
sql_creat
= 'create table ECG_RealAnalysisFile_compare(c_servicecode char(100),noiseLeve char(100), typeBeatClass char(100))'
create_database(sql_creat)
'''查询数据库'''
sql_select
= 'select * from temp_ecg_testdata '#where c_level = %s and c_dllLevel = %s'
#datas = select_data(sql_select,'次紧急','一般') #'一般','一般' '次紧急','一般' '普通报警','一般' '紧急','一般'
datas
= select_data(sql_select)
#print(len(datas))
'''调用dll'''
dll
= ctypes.cdll.LoadLibrary(
'G:/python/mydll/EcgMonitorRealAnalysis.dll')
'''将获得的多个元组解码放入数据库中,进行数据插入操作'''
#i = 0
for row_data
in datas
:
#每行的序号
code_servicecode
= row_data[
0]
data
= row_data[
1]
#解析64数据
missing_data
= 4- len(data)
%4
if missing_data
:
data
+=b'='*missing_data
de_data
=decode_data(data)
print(
len(de_data))
#读取数据
#i = i + 1
# if i == 175:
# print('error')
struct_prop, ecg_lead_s_datas
= read_data(de_data)
if ecg_lead_s_datas
== None or ecg_lead_s_datas[
0]
== None:
continue
# print("============")
# print(i)
c_ecg_lead0_datas
= (ctypes.c_int
* len(ecg_lead_s_datas[
0]))()
c_ecg_lead1_datas
= (ctypes.c_int
* len(ecg_lead_s_datas[
1]))()
c_ecg_lead2_datas
= (ctypes.c_int
* len(ecg_lead_s_datas[
2]))()
c_ecg_lead3_datas
= (ctypes.c_int
* len(ecg_lead_s_datas[
3]))()
c_ecg_lead0_datas[
:]
= ecg_lead_s_datas[
0][
:]
c_ecg_lead1_datas[
:]
= ecg_lead_s_datas[
1][
:]
c_ecg_lead2_datas[
:]
= ecg_lead_s_datas[
2][
:]
c_ecg_lead3_datas[
:]
= ecg_lead_s_datas[
3][
:]
c_lead_data_addr
= (ctypes.c_void_p
*4)()
c_lead_data_addr[
0]
= ctypes.
addressof(c_ecg_lead0_datas)
c_lead_data_addr[
1]
= ctypes.
addressof(c_ecg_lead1_datas)
c_lead_data_addr[
2]
= ctypes.
addressof(c_ecg_lead2_datas)
c_lead_data_addr[
3]
= ctypes.
addressof(c_ecg_lead3_datas)
#调用dll进行分析
#数据的采样频率
Fs
= struct_prop.get_sample_rate()
noiseLeve
= []
typeBeatClass
= []
#4个导联
for ind
in range(
4)
:
noise_level
= ctypes.c_int()
typeBeat
= ctypes.c_int()
noise_adr
= ctypes.
addressof(noise_level)
typeB_adr
= ctypes.
addressof(typeBeat)
numR
= dll.ECG_RealAnalysisFile(c_lead_data_addr,
len(c_ecg_lead0_datas),ind,Fs,noise_adr,typeB_adr)
noiseLeve.append(noise_level)
typeBeatClass.append(typeBeat)
# print(noiseLeve)
# print(typeBeatClass)
#将这条数据的id,noise_level typeBeatClass写进数据库中
noiseLeves
= str(noiseLeve)
typeBeatClasss
= str(typeBeatClass)
sql_insert
= 'insert into ECG_RealAnalysisFile_compare(c_servicecode,noiseLeve,typeBeatClass) values(%s,%s,%s)'
insert_database(sql_insert,code_servicecode,noiseLeves,typeBeatClasss)