import os
import sys
import json
import argparse
import logging
from datetime import datetime

import numpy as np
import torch
import torch.distributed as dist
import torch.multiprocessing as mp

# Make the parent directory importable before pulling in project-local modules.
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
# sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), ".."))
__package__ = "LungNoduleSegmentation"

from Segmentation import Segmentation3D
from k8s_utils import CParamFiller, CK8sPathWrapper

os.environ["CUDA_VISIBLE_DEVICES"] = "0,1,2,3,4,5,6,7"


def init_dist(backend='nccl', master_ip='127.0.0.1', port=29500):
    """Initialise torch.distributed from the RANK/WORLD_SIZE/LOCAL_RANK environment."""
    if mp.get_start_method(allow_none=True) is None:
        mp.set_start_method('spawn')
    os.environ['MASTER_ADDR'] = master_ip
    os.environ['MASTER_PORT'] = str(port)
    rank = int(os.environ['RANK'])
    world_size = int(os.environ['WORLD_SIZE'])
    num_gpus = torch.cuda.device_count()
    local_rank = int(os.environ['LOCAL_RANK'])
    deviceid = local_rank % num_gpus
    torch.cuda.set_device(deviceid)
    print(f'dist settings: local_rank {local_rank}, rank {rank}, world_size {world_size}, '
          f'gpus {num_gpus}, deviceid {deviceid}')
    dist.init_process_group(backend=backend)
    return rank, world_size


def get_logger(name, task_name=None):
    stream_handler = logging.StreamHandler(sys.stdout)
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    logger.addHandler(stream_handler)
    return logger


def _log_msg(strmsg="\n"):
    # Only rank 0 writes log messages to avoid duplicated output across processes.
    if dist.get_rank() == 0:
        if g_logger is not None:
            g_logger.info(strmsg)
        else:
            print(strmsg)


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description="Full execution script of the Segmentation module.")
    group = parser.add_mutually_exclusive_group()
    # ddp
    parser.add_argument("--local_rank", default=-1, type=int)
    parser.add_argument('--port', default=29500, type=int, help='port of the master server')
    parser.add_argument('--world-size', default=2, type=int)
    parser.add_argument('--rank', default=0, type=int)
    parser.add_argument('--master_ip', default='127.0.0.1', type=str)
    parser.add_argument('--job_data_root', default="/root/Documents/GroupLung/Datasets/seg_data/job_data_seg_train", type=str)
    args = parser.parse_args()

    g_pathWrapper = CK8sPathWrapper(args.job_data_root)
    config_path = g_pathWrapper.get_input_inputconfig_filepath()
    if not os.path.isfile(config_path):
        print(f'given config file {config_path} does not exist!')
    else:
        # print(f'using config file {config_path} to fill args')
        cfg = CParamFiller.fillin_args(config_path, args)

        g_logger = get_logger(__name__)
        rank, world_size = init_dist(backend='nccl', master_ip=args.master_ip, port=args.port)
        cfg.rank = rank
        cfg.world_size = world_size
        cfg.local_rank = int(os.environ['LOCAL_RANK'])

        if cfg.mode == 'training':
            preprocess_dir = g_pathWrapper.get_output_preprocess_preprocessfiledetails_dirpath()
            cfg['train_data_path'] = os.path.join(preprocess_dir, 'train_data.npy')
            cfg['train_info_path'] = os.path.join(preprocess_dir, 'train_info.npy')
            cfg['train_mask_path'] = os.path.join(preprocess_dir, 'train_mask.npy')
            cfg['val_data_path'] = os.path.join(preprocess_dir, 'val_data.npy')
            cfg['val_info_path'] = os.path.join(preprocess_dir, 'val_info.npy')
            cfg['val_mask_path'] = os.path.join(preprocess_dir, 'val_mask.npy')

            # Only rank 0 builds the train/val split, and only if it does not exist yet.
            if (not os.path.exists(cfg['train_data_path'])) and cfg.rank == 0:
                # Read every per-series npy file and split the data into training and validation sets.
                npy_path = g_pathWrapper.get_output_preprocess_npy_path()
                data2info = {}
                uids = []
                for file in os.listdir(npy_path):
                    # Strip the '_data.npy' / '_info.npy' / '_mask.npy' suffix to recover the series uid.
                    uid = file[:-9]
                    if uid not in uids:
                        uids.append(uid)

                whole_data = []
                whole_info = []
                whole_mask = []
                for uid in uids:
                    data_path = os.path.join(npy_path, uid + '_data.npy')
                    info_path = os.path.join(npy_path, uid + '_info.npy')
                    mask_path = os.path.join(npy_path, uid + '_mask.npy')
                    whole_data.append(np.load(data_path))
                    whole_info.append(np.load(info_path, allow_pickle=True))
                    whole_mask.append(np.load(mask_path))
                whole_data = np.concatenate(whole_data, axis=0)
                whole_info = np.concatenate(whole_info, axis=0)
                whole_mask = np.concatenate(whole_mask, axis=0)

                # Split data, info and mask arrays into train and validation subsets (80/20, random).
                train_data, val_data = [], []
                train_info, val_info = [], []
                train_mask, val_mask = [], []
                indices = range(whole_data.shape[0])
                train_idxs = list(np.random.choice(indices, int(whole_data.shape[0] * 0.8), replace=False))
                val_idxs = [idx for idx in indices if idx not in train_idxs]
                for idx in train_idxs:
                    train_data.append(whole_data[idx][np.newaxis, :])
                    train_info.append(whole_info[idx].reshape(-1, len(whole_info[idx])))
                    train_mask.append(whole_mask[idx][np.newaxis, :])
                for idx in val_idxs:
                    val_data.append(whole_data[idx][np.newaxis, :])
                    val_info.append(whole_info[idx].reshape(-1, len(whole_info[idx])))
                    val_mask.append(whole_mask[idx][np.newaxis, :])
                train_data = np.concatenate(train_data, axis=0)
                train_info = np.concatenate(train_info, axis=0)
                train_mask = np.concatenate(train_mask, axis=0)
                val_data = np.concatenate(val_data, axis=0)
                val_info = np.concatenate(val_info, axis=0)
                val_mask = np.concatenate(val_mask, axis=0)

                np.save(cfg['train_data_path'], train_data)
                np.save(cfg['train_info_path'], train_info)
                np.save(cfg['train_mask_path'], train_mask)
                np.save(cfg['val_data_path'], val_data)
                np.save(cfg['val_info_path'], val_info)
                np.save(cfg['val_mask_path'], val_mask)

            # torch.distributed.barrier()
            # Set up the output file paths.
            cfg['base_path'] = g_pathWrapper.get_output_train_dirpath()
            cfg['save_path'] = g_pathWrapper.get_output_train_bestmodel_dirpath()
            cfg['writer_path'] = g_pathWrapper.get_output_train_writer_path()
            cfg['eval_pmd'] = g_pathWrapper.get_output_eval_performance_md_filepath()
            cfg['eval_pjson'] = g_pathWrapper.get_output_eval_performance_filepath()
            cfg['train_metrics'] = g_pathWrapper.get_output_train_trainingmetrics_filepath()
            cfg['train_result'] = g_pathWrapper.get_output_train_trainresult_filepath()

            # Start training (mode is one of ['training', 'testing']).
            train_obj = Segmentation3D(cfg)
            train_obj.Train()

        elif cfg.mode == 'testing':
            preprocess_dir = g_pathWrapper.get_output_preprocess_preprocessfiledetails_dirpath()
            cfg['test_data_path'] = os.path.join(preprocess_dir, 'val_data.npy')
            cfg['test_info_path'] = os.path.join(preprocess_dir, 'val_info.npy')
            cfg['test_mask_path'] = os.path.join(preprocess_dir, 'val_mask.npy')

            # Set up the output file paths.
            cfg['eval_pmd'] = g_pathWrapper.get_output_eval_performance_md_filepath()
            cfg['eval_pjson'] = g_pathWrapper.get_output_eval_performance_filepath()
            cfg['eval_result'] = g_pathWrapper.get_output_eval_evalresult_filepath()

            # Start evaluation (mode is one of ['training', 'testing']).
            train_obj = Segmentation3D(cfg)
            train_obj.test()
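
# ---------------------------------------------------------------------------
# Launch sketch (not part of the original script): init_dist() reads RANK,
# WORLD_SIZE and LOCAL_RANK from the environment, which torchrun populates for
# each worker process. The file name `main_seg.py` and the process count below
# are assumptions for illustration only; adjust them to the actual entry point
# and the number of GPUs per node.
#
#   torchrun --nnodes=1 --nproc_per_node=8 main_seg.py \
#       --job_data_root /path/to/job_data_seg_train \
#       --master_ip 127.0.0.1 --port 29500
# ---------------------------------------------------------------------------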