from sqlalchemy import create_engine, and_
from sqlalchemy.orm import sessionmaker, scoped_session
import sys
import os
import numpy as np
import argparse
import threading
import pandas as pd
import re

sys.path.append(os.path.dirname(os.path.abspath(__file__)) + '/../')
from data.domain import *
from cls_utils.sitk_utils import CTSeries
from cls_utils.data_utils import crop_ct_data, get_crop_data_padding, get_crop_data_2d
from cls_utils.utils import hu_value_to_uint8, normalize, base64_to_list
from cls_utils.data import save_supplement_data_csv, save_data_to_npy, load_npy_to_data, create_cls_train_csv, \
                           load_all_dicom_file, load_json, create_cls_train_all_csv, create_cls_train_csv_3d, \
                           replace_label_ids, add_label_ids, create_cls_train_last_3d

MYSQL_SERVER = 'mysql+pymysql://lung:lung1qaz2wsx@127.0.0.1:3306/ct_file?charset=utf8'

# cfg = load_json("/home/lung/ai-project/cls_train/config/train.json")
# cfg = load_json("/df_lung/ai-project/cls_train/config/train.json")
cfg = load_json("/df_lung/ai-project/cls_train/config/train_20241112.json")


def conect_mysql():
    """Connect to the database and return a scoped session factory."""
    engine = create_engine(MYSQL_SERVER, pool_recycle=3600)
    db_session = sessionmaker(bind=engine)
    session = scoped_session(db_session)
    return session


def select_series(session, select_node_time=0, start_label_id=None):
    """
    Describe:
        For each labelled record, look up the DICOM folder that corresponds to its
        series_instance_uid.
    Returns:
        folder_names: folder of the DICOM series each annotation belongs to
        node_times: class label (node_time) of each annotation
        box_infos: in-plane coordinates of each annotation
        select_boxs: shape=(n, 3, 2), stored as [[z_min, z_max], [y_min, y_max], [x_min, x_max]]
    """
    # Join the two tables directly in a single query.
    # select_node_time == 0 fetches every record that carries a class label;
    # otherwise only records with node_time == select_node_time are fetched.
    if select_node_time == 0:
        userlabel_and_dicomseries = session.query(UserLabel, DicomSeries).filter(
            and_(UserLabel.node_time != None, UserLabel.deleted_time == None,
                 UserLabel.node_time != select_node_time, UserLabel.series_id == DicomSeries.id))
    else:
        userlabel_and_dicomseries = session.query(UserLabel, DicomSeries).filter(
            and_(UserLabel.node_time != None, UserLabel.deleted_time == None,
                 UserLabel.node_time == select_node_time, UserLabel.series_id == DicomSeries.id))
    userlabel_and_dicomseries_list = list(userlabel_and_dicomseries)

    label_ids = []
    box_infos = []
    z_index_ranges = []
    node_times = []
    folder_names = []
    select_boxs = []
    patient_ids = []
    series_instance_uids = []
    for userlabel_dicomseries in userlabel_and_dicomseries_list:
        label_ids.append(userlabel_dicomseries[0].id)
        box_infos.append([int(x) for x in userlabel_dicomseries[0].box_info.strip('[]').split(',')])
        z_index_ranges.append([int(x.split(':')[0]) for x in userlabel_dicomseries[0].area.strip('{}').split(',')])
        node_times.append(userlabel_dicomseries[0].node_time)
        folder_names.append(userlabel_dicomseries[1].folder_name)
        patient_ids.append(userlabel_dicomseries[1].patient_id)
        series_instance_uids.append(userlabel_dicomseries[1].series_instance_uid)
        select_boxs.append([[min(z_index_ranges[-1]), max(z_index_ranges[-1])],
                            [box_infos[-1][2], box_infos[-1][3]],
                            [box_infos[-1][0], box_infos[-1][1]]])
    select_boxs = np.array(select_boxs, np.float32)
    return folder_names, node_times, select_boxs, box_infos, label_ids, patient_ids, series_instance_uids


# Query all delineation rows for the given label_id from the user_label_delineation table.
def select_all_contours_by_labelId(session, label_id):
    delineations = session.query(UserLabelDelineation).filter(
        and_(UserLabelDelineation.label_id == label_id)).order_by(UserLabelDelineation.z_index)
    contours = list(map(lambda x: x.contour, delineations))
    return contours
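
# Illustrative only: a minimal sketch of how the query helpers above fit together, assuming
# the MySQL instance is reachable and the UserLabel / DicomSeries / UserLabelDelineation
# mappings from data.domain behave as the queries above expect. node_time 2041 is just an
# example class code; this helper is not part of the production pipeline.
def _example_query_usage():
    session = conect_mysql()
    folder_names, node_times, select_boxs, box_infos, label_ids, patient_ids, series_instance_uids = \
        select_series(session, select_node_time=2041)
    # Each select_boxs[i] is laid out as [[z_min, z_max], [y_min, y_max], [x_min, x_max]].
    if len(label_ids) > 0:
        contours = select_all_contours_by_labelId(session, label_ids[0])
        print(folder_names[0], select_boxs[0].tolist(), len(contours))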

# Fetch from the database every record with the given node_time, together with its label_id.
def select_series_by_node_time(node_time=0):
    session = conect_mysql()
    folder_names, _, select_boxs, _, label_ids, patient_ids, series_instance_uids = select_series(session, node_time)
    return folder_names, select_boxs, label_ids, patient_ids, series_instance_uids


# Read a single record from the database, mainly for prediction.
def select_signal_series(label_id):
    session = conect_mysql()
    userlabel_and_dicomseries = session.query(UserLabel, DicomSeries).filter(
        and_(UserLabel.id == label_id, UserLabel.series_id == DicomSeries.id))
    userlabel_and_dicomseries = list(userlabel_and_dicomseries)
    box_infos = [int(x) for x in userlabel_and_dicomseries[0][0].box_info.strip('[]').split(',')]
    z_index_range = [int(x.split(':')[0]) for x in userlabel_and_dicomseries[0][0].area.strip('{}').split(',')]
    z_min = min(z_index_range)
    z_max = max(z_index_range)
    folder_name = userlabel_and_dicomseries[0][1].folder_name
    series_instance_uid = userlabel_and_dicomseries[0][1].series_instance_uid
    select_box = [[z_min, z_max], [box_infos[2], box_infos[3]], [box_infos[0], box_infos[1]]]
    patient_id = userlabel_and_dicomseries[0][1].patient_id
    return folder_name, select_box, patient_id, series_instance_uid


def read_series_dicom(dicom_folder=''):
    ct = CTSeries()
    ct.load_dicoms(dicom_folder)
    return ct


# Process a single sample. If mode is None, every nodule slice is taken as the centre slice
# and the volume is padded above and below with background; otherwise the data is cropped
# directly from the original volume.
# is_2d=True crops only the current slice.
def process_single_ct(ct_data=None, select_box=None, node_time=None, label_id=None, mode=None, is_2d=False):
    # Use node_time and label_id to decide whether the npy file already exists; if so, skip it.
    npy_output_path = os.path.join(cfg['train_data_path'], cfg['npy_folder'], 'cls_' + str(node_time), str(label_id) + '.npy')
    csv_output_path = os.path.join(cfg['train_data_path'], cfg['csv_path'], cfg['subject_all_csv'])
    if os.path.exists(npy_output_path):
        print(f"npy_output_path exists: {npy_output_path}")
        return
    # Crop the (possibly already masked) ct_data.
    if mode is None:
        if is_2d:
            original_data = get_crop_data_2d(data=ct_data, select_box=select_box, crop_size=cfg['train_crop_size_2d'])
        else:
            original_data = get_crop_data_padding(ct_data=ct_data, select_box=select_box, crop_size=cfg['train_crop_size'])
    else:
        # Extend select_box with real slices from the original volume where available;
        # only pad with -1000 slices when there is nothing left above or below.
        original_data = crop_ct_data(ct_data=ct_data, select_box=select_box, crop_size=cfg['train_crop_size'])
    # Save the cropped volume.
    save_data_to_npy(original_data=original_data, output_file=npy_output_path)
    content = ['cls_' + str(node_time) + '/' + str(label_id) + '.npy', str(node_time)]
    # Append this record to the dataset-wide csv file.
    save_supplement_data_csv(content=content, output_file=csv_output_path)
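
# Illustrative only: a minimal single-sample sketch. The folder name, select_box, node_time
# and label_id below are example values taken from elsewhere in this script and assume the
# corresponding DICOM series exists under cfg['dicom_folder'] and the cfg paths are writable.
# For node_time=2041 and label_id=5695 this would write
# <train_data_path>/<npy_folder>/cls_2041/5695.npy and append ['cls_2041/5695.npy', '2041']
# to the subject_all csv.
def _example_single_crop(folder_name='CT00100632-1.2.840.113704.1.111.2748.1295244861.11'):
    ct_data = read_series_dicom(dicom_folder=os.path.join(cfg['dicom_folder'], folder_name))
    select_box = np.array([[79, 98], [669, 784], [452, 561]], np.float32)  # [[z], [y], [x]]
    process_single_ct(ct_data=ct_data, select_box=select_box, node_time=2041, label_id=5695, mode='3d')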

# Take every slice of the nodule as the centre slice and pad the volume to (48, 256, 256).
# seg=True means the segmentation result is applied first: only nodule voxels are kept and
# every other voxel is set to the background value.
# mode=None means each segmented slice of the nodule is treated as a centre slice.
# is_2d=True saves the nodule region of each slice as a [256, 256] crop.
def process_single_ct_all_series(contours=None, folder_name=None, select_box=None, node_time=None, label_id=None,
                                 mode=None, seg=False, is_2d=False, threshold=0):
    z_min = int(select_box[0][0])
    z_max = int(select_box[0][1])
    profile_id = str(label_id) + '_'
    print("label_id: ", label_id)
    dicom_folder = os.path.join(cfg['dicom_folder'], folder_name)
    ct_data = read_series_dicom(dicom_folder=dicom_folder)
    data = ct_data.get_raw_image()
    if contours is not None:
        # Rasterise the stored contours into a mask and blank out everything outside it.
        img_np = np.zeros((data.shape[0], data.shape[1], data.shape[2]))
        for i in range(z_max - z_min + 1):
            _, _, img = base64_to_list(contours[i])
            img_np[z_min + i] = img
        data[img_np == 0] = -1000
        # Write the masked volume back into the series.
        ct_data.set_raw_image(data)
    # Extract the nodule directly from the (possibly segmented) volume.
    if mode is not None:
        process_single_ct(ct_data=ct_data, select_box=select_box, node_time=node_time, label_id=label_id, mode=mode, is_2d=False)
    else:
        # Exclude a proportion of slices at both ends of the nodule, controlled by threshold.
        eliminate_num = int((threshold * (z_max - z_min + 1)) / 2)
        # Treat every remaining nodule slice as a centre slice.
        for z_index in range(z_min + eliminate_num, z_max - eliminate_num + 1):
            label_id = profile_id + str(z_index)
            select_box[0] = [z_index, z_index]
            print('file name:', label_id)
            # Process this single select_box.
            if is_2d:
                data_2d = data[z_index]
                process_single_ct(ct_data=data_2d, select_box=select_box, node_time=node_time, label_id=label_id, mode=mode, is_2d=True)
            else:
                process_single_ct(ct_data=ct_data, select_box=select_box, node_time=node_time, label_id=label_id, mode=mode)


def get_all_contours_by_labelId(label_id=None):
    session = conect_mysql()
    contours = select_all_contours_by_labelId(session, label_id=label_id)
    return contours


def process_cts(seg=False, node_time=0, start_label_id=None, mode=None, is_2d=False, threshold=0):
    """
    mode=None: treat every slice as the centre slice and pad the remaining layers with background.
    seg=True: keep only the nodule via the segmentation mask and fill everything else with -1000.
    is_2d=True: save the data as [256, 256] crops.
    threshold: proportion of existing nodule slices to discard (0 keeps them all).
    """
    session = conect_mysql()
    folder_names, node_times, select_boxs, _, label_ids, patient_ids, series_instance_uids = select_series(session, select_node_time=node_time)
    for i in range(len(folder_names)):
        # Only generate data for labels after the given label_id
        # (the start_label_id filter is currently commented out in favour of a fixed id).
        # if label_ids[i] > start_label_id:
        if label_ids[i] in [5695, "5695"]:
            # Fetch the segmentation mask for this label_id from the database.
            contours = select_all_contours_by_labelId(session, label_ids[i]) if seg else None
            folder_name = str(patient_ids[i]) + '-' + str(series_instance_uids[i])
            process_single_ct_all_series(contours=contours, folder_name=folder_name, select_box=select_boxs[i],
                                         node_time=node_times[i], label_id=label_ids[i], mode='3d', seg=True,
                                         is_2d=is_2d, threshold=threshold)
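
# Illustrative only: a typical dataset-generation call, assuming DICOM folders named
# "<patient_id>-<series_instance_uid>" exist under cfg['dicom_folder']; node_time=2041 is
# just an example class code. Note that process_cts above hard-codes mode='3d', so each
# selected label is cropped as one 3-D volume; the threshold / slice-by-slice path only
# applies when mode reaches process_single_ct_all_series as None.
def _example_process_cts():
    process_cts(seg=True, node_time=2041, is_2d=False)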

# Fetch the dataset records from the database and save them to the corresponding files.
def run():
    """
    Connect to the database and fetch everything needed to build the dataset:
    folder_name (DICOM folder), node_time (class label), select_boxs (spatial coordinates
    of each nodule) and label_id (database id of the annotation).
    """
    session = conect_mysql()
    folder_names, node_times, select_boxs, box_infos, label_ids, patient_ids, series_instance_uids = select_series(session)
    # Sanity-check a single record:
    # process_single_ct_all_series(folder_name=folder_names[0], select_box=select_boxs[0], node_time=node_times[0], label_id=label_ids[0])
    # Take each slice separately, pad it and write the corresponding npy file.
    for i in range(len(folder_names)):
        # Only generate training data for labels after the given label_id.
        if label_ids[i] > 517:
            # process_single_ct_all_series(folder_name=folder_names[i], select_box=select_boxs[i], node_time=node_times[i], label_id=label_ids[i])
            ct_data = read_series_dicom(dicom_folder=os.path.join(cfg['dicom_folder'], folder_names[i]))
            process_single_ct(ct_data=ct_data, select_box=select_boxs[i], node_time=node_times[i],
                              label_id=label_ids[i], mode=1)
    # Every record has been processed and saved to the corresponding files.
    print('done')


# Generate the train_csv for the training data.
def creat_train_csv():
    """
    Label code mapping:
        AAH1: 2011
        AIS1: 2021
        MIA1: 2031
        IAC1: 2041
        inflammation: 1010
        high-density inflammation 1: 1016
        hyperplasia: 1020
        high-density IAC1: 2046
        [AAH1, MIA1, IAC1]: 2_134
        [inflammation, high-density inflammation 1, hyperplasia]: 1_112
    """
    # [2041, 3001, 4001, 5001, 6001, 7006, 2060, 2061, 2062]
    # node_times_1 = [[2011, 2021, 2041]]
    node_times_2 = [[2041]]
    node_times_1 = [[2041]]
    csv_path = os.path.join(cfg['train_data_path'], cfg['csv_path'])
    # pretrain_csv_path = '/home/lung/project/ai-project/cls_train/data/train_data/plus_3d_0818/subject_all_csv/test/cls_1_5001-6001_1/train.csv'
    for time_1 in node_times_1:
        for time_2 in node_times_2:
            # create_cls_train_csv(node_times=[time_1, time_2], node2='', csv_path=csv_path, csv_name=cfg['subject_all_csv'], tabel_id='01_2', node1_end=False, node2_end=False)
            # create_cls_train_all_csv(node_times=[time_1, time_2], csv_path=csv_path, csv_name=cfg['subject_all_csv'], tabel_id='10_3')
            create_cls_train_csv_3d(node_times=[time_1, time_2], csv_path=csv_path, csv_name=cfg['subject_all_csv'], tabel_id='08')
            # create_cls_train_last_3d(node_times=[time_1, time_2], csv_path=csv_path, csv_name=cfg['subject_all_csv'], tabel_id='test', pretrain_csv_path=pretrain_csv_path)
    # node_times = [[1080], [2048]]
    # csv_path = os.path.join(cfg['train_data_path'], cfg['csv_path'])
    # create_cls_train_csv(node_times=node_times, node2='', csv_path=csv_path, csv_name=cfg['subject_all_csv'], tabel_id='06', node1_end=False, node2_end=False)


# Pick out the label_ids that were misclassified according to a log file.
def extract_error_label(log_path, positive=True):
    with open(log_path, 'r') as f:
        log_contents = f.readlines()
    label_ids = []
    # Scan every line and pick up the final averaged result reported for each nodule,
    # i.e. lines of the form "label_id: <id>, result: [<score>]".
    for line in log_contents:
        match = re.search(r"label_id: (.*?), result: \[(.*?)\]\n", line)
        if match:
            label_id, result = match.group(1), match.group(2)
            result = float(result)
            if positive and result < 0.5:
                label_ids.append(label_id)
            elif positive is False and result > 0.5:
                label_ids.append(label_id)
    return label_ids


def test_read_npy():
    # Read npy files and inspect their value ranges.
    # npy_path = 'D:\\vscode\\plus代码\\ct_plus_seg_python\\cls_train\\data\\npy_data\\cls_1010\\379.npy'
    npy_path = './cls_train/data/npy_data/cls_1010'
    # Walk through every npy file in the directory.
    all_dicom_file = load_all_dicom_file(npy_path, prefix='*', postfix='npy')
    # Load each file and report min/max before and after conversion and normalisation.
    for file_path in all_dicom_file:
        original_data = load_npy_to_data(file_path)
        print(original_data.shape)
        image = hu_value_to_uint8(original_data=original_data)
        data = normalize(image=image)
        print(np.max(original_data), np.max(image), np.max(data))
        print(np.min(original_data), np.min(image), np.min(data))


def test_single_dicom():
    """Reproduce the single case that failed."""
    folder_name = 'CT00100632-1.2.840.113704.1.111.2748.1295244861.11'
    select_box = [[79, 98], [669, 784], [452, 561]]
    node_time = 2040
    label_id = 359
    select_box = np.array(select_box, np.float32)
    ct_data = read_series_dicom(dicom_folder=os.path.join(cfg['dicom_folder'], folder_name))
    process_single_ct(ct_data=ct_data, select_box=select_box, node_time=node_time, label_id=label_id)


def test_count_csv():
    csv_path = '/home/lung/project/ai-project/cls_train/data/train_data/plus_0815/subject_all_csv/02/cls_1010_2021/cls_1010_2021_1/train.csv'
    df = pd.read_csv(csv_path, names=['id', 'label'])
    select_rows = df[df['label'] == 1].drop_duplicates()
    print(len(select_rows))


if __name__ == "__main__":
    # Generate the data with multiple threads.
    """node_times = [2031]
    threads = []
    for node_time in node_times:
        thread = threading.Thread(target=process_cts, args=(True, node_time, 427))
        #process_cts(seg=True, node_time=node_time, start_label_id=427, is_2d=False)
        threads.append(thread)
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()"""
    process_cts(seg=True, node_time=2041, start_label_id=427, is_2d=False)
    # creat_train_csv()
    # test_count_csv()
    """log_path = '/home/lung/ai-project/cls_train/log/validation/cls_1_2031/20240720/log_validation_1.log'
    positive = True
    label_ids = extract_error_label(log_path, positive=positive)
    print(len(label_ids))
    csv_path = os.path.join(cfg['train_data_path'], cfg['csv_path'])
    # replace_label_ids(label_ids=label_ids, csv_path=csv_path, tabel_id='test')
    add_label_ids(label_ids=label_ids, csv_path=csv_path, positive=positive, cls_name='cls_1_2031_02')"""