# -*- coding: utf-8 -*- import os import cv2 import numpy as np from scipy import ndimage from skimage import morphology import torch import torch.nn.functional as F edge_enhance_kernel = np.array([[-1, -1, -1], [-1, 8, -1], [-1, -1, -1]]) def check_and_makedirs(output_file, is_file=False): if is_file: output_dir = os.path.dirname(output_file) else: output_dir = output_file if output_dir is not None and output_dir != '' and not os.path.exists(output_dir): os.makedirs(output_dir) def sigmoid(x): # return 1 / (1 + np.exp(-x)) return 0.5 * (1 + np.tanh(0.5 * x)) def get_crop_start_end(start, end, crop_size): center = (np.asarray(start) + np.asarray(end)) // 2 crop_start = center - np.asarray(crop_size) // 2 crop_start = np.maximum(crop_start, 0) return crop_start, crop_start + crop_size def maxip(input, level_num=3): level_num = level_num // 2 output = np.zeros(input.shape, input.dtype) for i in range(len(input)): length = level_num if i >= level_num else i output[i] = np.max(input[i-length:i+level_num+1], axis=0) return output def minip(input, level_num=3): level_num = level_num // 2 output = np.zeros(input.shape, input.dtype) for i in range(len(input)): length = level_num if i >= level_num else i output[i] = np.min(input[i-length:i+level_num+1], axis=0) return output def downsample_data(data, scale, mode='max'): """ 下采样 """ if data is None or scale == (1, 1, 1): return data new_data = torch.from_numpy(data).float().unsqueeze(0).unsqueeze(0) if mode == 'max': new_data = F.max_pool3d(new_data, kernel_size=scale, stride=scale, ceil_mode=True) else: new_data = F.avg_pool3d(new_data, kernel_size=scale, stride=scale, ceil_mode=True) new_data = new_data[0][0].numpy() return new_data def upsample_data(data, scale, mode='nearest'): """ 上采样 The modes available for resizing are: `nearest`, `linear` (3D-only), `bilinear`, `bicubic` (4D-only), `trilinear` (5D-only), `area` """ if data is None or scale == (1, 1, 1): return data new_data = torch.from_numpy(data).float().unsqueeze(0).unsqueeze(0) new_data = F.interpolate(new_data, scale_factor=scale, mode=mode) new_data = new_data[0][0].numpy() return new_data def downsample_mask(mask, scale, prob_thred=0.5): if mask is None or scale == (1, 1, 1): return mask new_mask = downsample_data(mask, scale, mode='avg') new_mask = new_mask > prob_thred new_mask = new_mask.astype(np.uint8) return new_mask def upsample_mask(mask, scale): if mask is None or scale == (1, 1, 1): return mask new_mask = upsample_data(mask, scale) new_mask = new_mask.astype(np.uint8) return new_mask def resample_data(data, spacing, new_spacing, mode='trilinear'): if data is None or np.all(spacing == new_spacing): return data, spacing new_shape = np.round(data.shape * spacing / new_spacing) new_spacing = spacing * data.shape / new_shape scale = new_shape / data.shape new_data = torch.from_numpy(data).float().unsqueeze(0).unsqueeze(0) new_data = F.interpolate(new_data, scale_factor=scale, mode=mode, align_corners=False) new_data = new_data[0][0].numpy() return new_data, new_spacing def resample_mask(mask, size, mode='trilinear', prob_thred=0.5): if mask is None or np.all(mask.shape == size): return mask new_mask = torch.from_numpy(mask).float().unsqueeze(0).unsqueeze(0) new_mask = F.interpolate(new_mask, size=size, mode=mode, align_corners=False) new_mask = new_mask[0][0].numpy() new_mask = new_mask > prob_thred new_mask = new_mask.astype(np.uint8) return new_mask def extend_mask(mask, kernel_size=(1, 7, 7), padding=(0, 3, 3), min_value=0.01, max_value=1.0): new_mask = torch.from_numpy(mask).float().unsqueeze(0).unsqueeze(0) new_mask = F.avg_pool3d(new_mask, kernel_size=kernel_size, padding=padding, stride=1) new_mask = new_mask.ge(min_value) * new_mask.le(max_value) new_mask = new_mask[0][0].numpy() return new_mask def clip_data(data, min_value=-1000, max_value=2000, nan_value=-2000): new_data = np.array(data) new_data[np.isnan(new_data)] = nan_value new_data = np.clip(new_data, min_value, max_value) return new_data def filter_data(data, min_value=-1000, max_value=2000, nan_value=-2000): new_data = np.array(data) new_data[np.isnan(new_data)] = nan_value new_data = new_data[new_data >= min_value] new_data = new_data[new_data <= max_value] return new_data def normalize_cls(data, min_value=-1000, max_value=600): new_data = data.clone() new_data[new_data < min_value] = min_value new_data[new_data > max_value] = max_value # normalize to [-1, 1] new_data = 2.0 * (new_data - min_value) / (max_value - min_value) - 1 return new_data def normalize_quantify_data(quantify_data): quantify_data[:, 0] = normalize_cls(quantify_data[:, 0], 0, 5000) quantify_data[:, 2] = normalize_cls(quantify_data[:, 2], 0, 1600) quantify_data[:, 3] = normalize_cls(quantify_data[:, 3], 0, 10000) for i in range(4, quantify_data.shape[1]): quantify_data[:, i] = normalize_cls(quantify_data[:, i], 0, 1600) return quantify_data def get_quantify_data(data, mask, spacing): nodule_data = filter_data(data[mask == 1]) mask_point_count = len(nodule_data) if mask_point_count > 0: # 一个点的体积 single_volume = spacing[0] * spacing[1] * spacing[2] # 总体积 volume = np.round(mask_point_count * single_volume, 3) # 平均密度, 加1000转换成密度 average_density = np.mean(nodule_data) + 1000 average_density = np.max(np.round(average_density, 3), 0) # 比重 specific_gravity = np.round(1000 * average_density / volume, 3) else: volume = 0 average_density = 0 specific_gravity = 0 return volume, average_density, specific_gravity def get_average_diameter(volume): return np.round(pow((3 * volume / (4 * np.pi)), 1 / 3) * 2, 3) def get_3d_curve_data(data, mask, spacing, curve_step): curve_data = np.zeros(curve_step, np.float32) if np.sum(mask == 1) > 0: # 一个点的体积 single_volume = spacing[0] * spacing[1] * spacing[2] # 加1000转换成密度 nodule_data = data[mask == 1] + 1000 nodule_data = filter_data(nodule_data, min_value=0, max_value=curve_step - 1) nodule_data = np.int0(nodule_data) values, counts = np.unique(nodule_data, return_counts=True) volumes = counts * single_volume for value, volume in zip(values, volumes): curve_data[value] = volume return curve_data def get_3d_volume_percentage(data, mask, max_value=600): volume_percentage_list = [] if np.sum(mask == 1) > 0: # 加1000转换成密度 nodule_data = data[mask == 1] + 1000 nodule_data = filter_data(nodule_data, min_value=0, max_value=max_value + 1000 - 1) nodule_data = np.sort(nodule_data) total_length = len(nodule_data) for i in range(10, 4, -1): length = max(int(0.1 * i * total_length) - 1, 0) current_hu = nodule_data[length] current_data = nodule_data[nodule_data <= current_hu] volume_percentage_list.append(current_hu) volume_percentage_list.append(np.round(np.mean(current_data), 3)) volume_percentage_list.append(np.round(current_data.std(), 3)) else: for i in range(10, 4, -1): volume_percentage_list.append(0) volume_percentage_list.append(0) volume_percentage_list.append(0) return volume_percentage_list def transform_input_data(data, n_channels): if n_channels == 1: data = data[np.newaxis] elif n_channels == 4: data_edge_enhance = np.zeros(data.shape, np.float32) for z in range(data.shape[0]): data_edge_enhance[z] = ndimage.convolve(data[z], edge_enhance_kernel, mode='nearest') data1 = data data2 = data # 边缘增强 data3 = data + 0.5 * data_edge_enhance # 边缘增强 data4 = data + 1.0 * data_edge_enhance # 最大密度投影 # data5 = maxip(data) # 最小密度投影 # data6 = minip(data) data = np.array([data1, data2, data3, data4]) return data def get_crop_data(data, crop_start, crop_size, mode='constant', constant_values=-1000): """ 获取切割块数据 """ if data is None: return data data_shape = np.array(data.shape) crop_data = data[ max(crop_start[0], 0):min(crop_start[0] + crop_size[0], data_shape[0]), max(crop_start[1], 0):min(crop_start[1] + crop_size[1], data_shape[1]), max(crop_start[2], 0):min(crop_start[2] + crop_size[2], data_shape[2])] pad = np.zeros((3, 2), np.int) pad[:, 0] = np.maximum(-crop_start, 0) pad[:, 1] = np.maximum(crop_start + crop_size - data_shape, 0) if not np.all(pad == 0): if mode == 'edge': crop_data = np.pad(crop_data, pad, mode=mode) else: crop_data = np.pad(crop_data, pad, mode=mode, constant_values=constant_values) return crop_data def get_anchor_coords(target_size, stride): """ 获取anchor在z, y, x轴上的中心点坐标 """ offset = 0.5 z_center_anchors = np.arange(offset, offset + target_size[0]) * stride[0] y_center_anchors = np.arange(offset, offset + target_size[1]) * stride[1] x_center_anchors = np.arange(offset, offset + target_size[2]) * stride[2] return z_center_anchors, y_center_anchors, x_center_anchors def convex_hull_3d(mask): for z in range(len(mask)): if np.all(mask[z] == 0): continue mask[z] = convex_hull_2d(mask[z]) return mask def convex_hull_2d(mask): mask = morphology.convex_hull_object(mask, neighbors=8) mask = mask.astype(np.uint8) return mask def continuous_2d(input_data): output_data = input_data.copy() for j in range(len(output_data)): if j % 2 == 1: output_data[j] = output_data[j][::-1] return output_data def var_2d(input_data, kernel_size=3, min_value=-100): output_data = np.zeros(input_data.shape, np.float32) output_data[...] = np.inf kernel_size = kernel_size // 2 for i in range(kernel_size, output_data.shape[0] - kernel_size): kernel_i = kernel_size if i >= kernel_size else i for j in range(kernel_size, output_data.shape[1] - kernel_size): if input_data[i, j] < min_value: continue kernel_j = kernel_size if j >= kernel_size else j region_data = input_data[i - kernel_i:i + kernel_size + 1, j - kernel_j:j + kernel_size + 1] output_data[i, j] = region_data.var() return output_data def std_2d(input_data, kernel_size=3, min_value=-100): output_data = np.zeros(input_data.shape, np.float32) output_data[...] = np.inf kernel_size = kernel_size // 2 for i in range(kernel_size, output_data.shape[0] - kernel_size): kernel_i = kernel_size if i >= kernel_size else i for j in range(kernel_size, output_data.shape[1] - kernel_size): if input_data[i, j] < min_value: continue kernel_j = kernel_size if j >= kernel_size else j region_data = input_data[i - kernel_i:i + kernel_size + 1, j - kernel_j:j + kernel_size + 1] output_data[i, j] = region_data.std() return output_data def get_diameter_pixel_length(diameter, spacing): return np.int0(np.ceil(diameter / spacing)) def get_center_and_diameter(nodule_mask, spacing): """ 根据mask获取结节中心点坐标与直径 """ # 中心点坐标 center = np.zeros(3, np.int) # max_average_width_height = 0 # for z, mask in enumerate(nodule_mask): # if np.sum(mask == 1) == 0: # continue # _, contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE) # contours = sorted(contours, key=cv2.contourArea, reverse=True) # # 最小外接矩形 # rect_min = cv2.minAreaRect(contours[0]) # ((_, _), (width, height), angle) = rect_min # # 最大面层对应的z坐标 # if max_average_width_height < (width + height) / 2: # max_average_width_height = (width + height) / 2 # center[0] = z # 每层mask点数 z_point_count = np.sum(nodule_mask == 1, axis=(1, 2)) # 最大面层 center[0] = np.argmax(z_point_count) coords = np.asarray(np.where(nodule_mask[center[0]] == 1)) coord_start = coords.min(axis=1) coord_end = coords.max(axis=1) + 1 # 最大面层的中心点坐标 center[1:3] = (coord_start + coord_end) // 2 # 直径,仅比较y,x轴 diameter = np.round(np.max((coord_end - coord_start) * spacing[1:3]), 5) return center, diameter def get_nodule_rect(nodule_box, spacing): diameter_pixel_length = get_diameter_pixel_length(nodule_box.diameter, spacing) center = np.array([nodule_box.z, nodule_box.y, nodule_box.x]) rect = np.zeros((3, 2), np.int) rect[:, 0] = center - diameter_pixel_length // 2 rect[:, 1] = rect[:, 0] + diameter_pixel_length return rect def save_pred_npy(npy_path, file_prefix, pred): check_and_makedirs(npy_path) np.save(os.path.join(npy_path, file_prefix + '_pred.npy'), pred.astype(np.uint8)) def load_pred_npy(npy_path, file_prefix): pred = np.load(os.path.join(npy_path, file_prefix + '_pred.npy')) return pred def save_mask_npy(npy_path, file_prefix, mask): check_and_makedirs(npy_path) np.save(os.path.join(npy_path, file_prefix + '_mask.npy'), mask.astype(np.uint8)) def load_mask_npy(npy_path, file_prefix): mask = np.load(os.path.join(npy_path, file_prefix + '_mask.npy')) return mask def print_data_mask(data, mask, filename=None, group=False): coords = np.asarray(np.where(mask > 0)) if len(coords[0]) > 0: coord_start = coords.min(axis=1) coord_end = coords.max(axis=1) + 1 print_mask = mask[coord_start[0]:coord_end[0], coord_start[1]:coord_end[1]] print_data = data[coord_start[0]:coord_end[0], coord_start[1]:coord_end[1]] if filename is not None: with open(filename, 'w') as f: for temp_data in print_data: f.write(np.array2string(temp_data).replace('\n', '') + '\n') for temp_mask in print_mask: f.write(np.array2string(temp_mask).replace('\n', '') + '\n') if group: for temp_data, temp_mask in zip(print_data, print_mask): temp_group = np.array([str(i) + '(' + str(j) + ')' for i, j in zip(temp_data, temp_mask)]) f.write(np.array2string(temp_group).replace('\n', '').replace('\'', '') + '\n') else: for temp_data in print_data: print(np.array2string(temp_data).replace('\n', '') + '\n') for temp_mask in print_mask: print(np.array2string(temp_mask).replace('\n', '') + '\n') if group: for temp_data, temp_mask in zip(print_data, print_mask): temp_group = np.array([str(i) + '(' + str(j) + ')' for i, j in zip(temp_data, temp_mask)]) print(np.array2string(temp_group).replace('\n', '').replace('\'', '') + '\n')