#!/usr/bin/python3
# coding=utf-8
import argparse
import os
import time
import numpy as np
import shutil
import sys
from tqdm import tqdm
import json
from python_tree import color_print_dir
import random
import torch
import pandas
import lmdb
from trans_dicom_2_nii import MyEncoder
from quality_control import control_single
from anno_json_wrapper import annojson2cls, annolabels2labelidlabelmap, CLS_UNDEFINED_LABEL
from k8s_utils import CK8sPathWrapper, CParamFiller, CDatasetWrapper, get_fullfilepaths_according2pattern
import glob

parser = argparse.ArgumentParser(description='PyTorch DataBowl3 Detector')
# ==================================
parser.add_argument('--job_data_root', default="/data/job_465", type=str)
# ==================================
args = parser.parse_args()

g_pathWrapper = CK8sPathWrapper(args.job_data_root)
config_path = g_pathWrapper.get_input_inputconfig_filepath()
if not os.path.isfile(config_path):
    print(fr'given config file {config_path} does not exist!')
else:
    # print(fr'using config file {config_path} to fill args !')
    args = CParamFiller.fillin_args(config_path, args)
# ==================================

import logging
from utils import constants as constants


def main():
    global args, best_loss, g_rank, g_world_size, g_logger, g_writer

    save_path = g_pathWrapper.get_output_preprocess_preprocessfiledetails_dirpath()  # preprocess output directory
    print(fr'doing prepare, save path {save_path}')
    _ensure_path(save_path)

    fulldicom_path = fr'{g_pathWrapper.get_output_tmp_dirpath()}'
    _ensure_path(fulldicom_path)
    fullnii_path = fr'{save_path}/{constants.SUBDIR_NII}'
    _ensure_path(fullnii_path)

    ############################################
    # ==================================
    print('using mlops data')
    data_config_json = g_pathWrapper.get_input_dataset_filepath()
    # data_config_json = os.path.join(args.job_data_root, 'job_data_preprocess/input/data_set.json')
    print(fr'target data config {data_config_json}')

    # Extract the zip archives; the DICOMs are stored under tmp as [uid, dicom, json]
    output_path = g_pathWrapper.get_output_tmp_dirpath()
    if output_path is None:
        raise IOError('output path needed for extracting zip dataset files')

    # with open(data_config_json, 'r') as fp:
    #     data_dict = json.loads(fp.read())
    # for dl in tqdm(data_dict[CDatasetWrapper.DATASET_DATALIST]):
    #     if dl[CDatasetWrapper.DATASET_RAWDATATYPE] == 'DCM_ZIP':
    #         zipfilepath = dl[CDatasetWrapper.DATASET_RAWDATAURL][0]
    #         zipfile_basename = os.path.splitext(os.path.basename(zipfilepath))[0]
    #         target_zip_path = os.path.join(output_path, zipfile_basename)
    #         if not os.path.exists(target_zip_path):
    #             os.makedirs(target_zip_path)
    #         # print(fr'basename {zipfile_basename}, {zipfilepath} -> {target_zip_path}')
    #         # unzip data
    #         # if not len(os.listdir(target_zip_path)):
    #         print(fr'basename {zipfile_basename}, {zipfilepath} -> {target_zip_path}')
    #         CDatasetWrapper.extract_zipfile(zipfilepath, target_zip_path)

    raw_dir = glob.glob(os.path.dirname(args.job_data_root) + '/raw_*')[0]
    print(raw_dir)
    for zipfilepath in glob.glob(os.path.join(raw_dir, '*.zip')):
        zipfile_basename = os.path.splitext(os.path.basename(zipfilepath))[0]
        target_zip_path = os.path.join(output_path, zipfile_basename)
        if not os.path.exists(target_zip_path):
            os.makedirs(target_zip_path)
        # unzip data
        # if not len(os.listdir(target_zip_path)):
        print(fr'basename {zipfile_basename}, {zipfilepath} -> {target_zip_path}')
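        # After extraction every series ends up in its own <tmp dir>/<zipfile_basename>/
        # subdirectory, which is exactly what the quality-control pass below iterates over.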
        CDatasetWrapper.extract_zipfile(zipfilepath, target_zip_path)

    ## Quality control: series that fail the QC checks are removed
    dcm_root = g_pathWrapper.get_output_tmp_dirpath()
    dcm_paths = [os.path.join(dcm_root, dcm) for dcm in os.listdir(dcm_root)]
    for path in dcm_paths:
        control_res = control_single(path)
        if control_res != 'SUCCESS':
            print('Remove: {} for {}'.format(path, control_res))
            shutil.rmtree(path)

    ## Convert the extracted DICOMs to nii, resampled to a spacing of 0.5
    print('input: ', fulldicom_path, 'save_path: ', save_path)
    preprocess_LUNGMLOPs(input_path=fulldicom_path, save_path=fullnii_path)
    print(fr'done...')

    # constants.DATA_PATH_DCM = fulldicom_path
    # constants.DATA_PATH_NII = fullnii_path
    # constants.DATA_PATH = fr'{g_pathWrapper.get_output_tmp_dirpath()}/{constants.SUBDIR_TRAINVAL}'
    # ###############################
    # color_print_dir(constants.DATA_PATH_NII)


def preprocess_LUNGMLOPs(input_path, save_path, log_func=None):
    from preprocess_func import convert_dcm_2_nii
    if log_func is None:
        log_func = print
    # deal with dcms: convert each series to nii and collect the result dict
    raw_dicts = convert_dcm_2_nii(input_path, save_path, preprocess_workers=0, log_func=log_func)
    print(raw_dicts.keys())


def _ensure_path(given_path):
    if not os.path.exists(given_path):
        os.makedirs(given_path)


if __name__ == '__main__':
    main()
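# Example invocation (a sketch; the script name "prepare_data.py" is illustrative and
# "/data/job_465" is only the argparse default above):
#   python3 prepare_data.py --job_data_root /data/job_465
# The job root is expected to follow the k8s job layout understood by CK8sPathWrapper,
# with a sibling raw_* directory next to it holding the *.zip DICOM archives.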