# -*- coding: utf-8 -*- import os, sys import pathlib current_dir = pathlib.Path(__file__).parent.resolve() while "cls_train" != current_dir.name: current_dir = current_dir.parent sys.path.append(current_dir.as_posix()) import numpy as np import logging import time import traceback import itertools import mahotas import cv2 from scipy import ndimage from skimage import measure from skimage import morphology from multiprocessing.pool import Pool from data.data_process_utils.test_data_utils import get_crop_start_end from data.data_process_utils.test_data_utils import filter_data from data.data_process_utils.test_image_vis import plot_shrink_extend_mask def get_region_data(data, point, kernel_r): region_data = data[point[0] - kernel_r:point[0] + kernel_r + 1, point[1] - kernel_r:point[1] + kernel_r + 1] return region_data def get_contour_points(contour): contour = contour.copy() contour = contour.reshape((-1, 2)) contour = np.flip(contour, axis=1) return contour def get_around_points(mask, point, increase_points, select_value=-1): around_points = point + increase_points around_points[:, 0] = np.clip(around_points[:, 0], 0, mask.shape[0] - 1) around_points[:, 1] = np.clip(around_points[:, 1], 0, mask.shape[1] - 1) return select_around_points(mask, around_points, select_value) def batch_get_around_points(mask, contour, increase_points, select_value=-1): contour = get_contour_points(contour) contour_point = contour.reshape((-1, 1, 2)) contour_point = contour_point + increase_points around_points = contour_point.reshape((-1, 2)) around_points[:, 0] = np.clip(around_points[:, 0], 0, mask.shape[0] - 1) around_points[:, 1] = np.clip(around_points[:, 1], 0, mask.shape[1] - 1) return select_around_points(mask, around_points, select_value) def select_around_points(mask, around_points, select_value=-1): if select_value >= 0: around_mask = mask[around_points[:, 0], around_points[:, 1]] # 只选择mask为select_value值的点 return around_points[around_mask == select_value] return around_points def calculate_increase_points(length, include_center_point=True): length = int(length) increase_length = max(length * 2 + 1, 3) increase_points = list(itertools.product(range(increase_length), repeat=2)) if not include_center_point: increase_points = [point for point in increase_points if point != (length, length)] increase_points = np.array(increase_points) - length return increase_points # 密度允许的误差范围 density_allow_error_range = 30 density_reject_error_range = 80 increase_points2 = calculate_increase_points(2) increase_points1 = calculate_increase_points(1) kernel3 = np.array([[1, 1, 1], [1, 1, 1], [1, 1, 1]], np.uint8) kernel5 = np.array([[0, 0, 1, 0, 0], [0, 1, 1, 1, 0], [1, 1, 1, 1, 1], [0, 1, 1, 1, 0], [0, 0, 1, 0, 0]], np.uint8) kernel7 = np.array([[0, 0, 0, 1, 0, 0, 0], [0, 1, 1, 1, 1, 1, 0], [0, 1, 1, 1, 1, 1, 0], [1, 1, 1, 1, 1, 1, 1], [0, 1, 1, 1, 1, 1, 0], [0, 1, 1, 1, 1, 1, 0], [0, 0, 0, 1, 0, 0, 0]], np.uint8) def get_diameter_pixel(diameter, spacing, default_pixel=3): """ 计算某直径对应的像数点数 """ return max(np.round(diameter / spacing[1], 2), default_pixel) def batch_shrink_extend_edge(data, spacing, uid, nodule_mask_list, amend_head_tail=False, gen_image=False, image_path=''): """ 批量修正边缘 """ for nodule_index, one_nodule_mask in enumerate(nodule_mask_list): start_time = time.time() ret, result_mask, extend_mask, shrink_mask = \ shrink_extend_edge(data, one_nodule_mask, spacing, amend_head_tail=amend_head_tail, file_prefix=uid + '_' + str(nodule_index), log=True) print(uid + '_' + str(nodule_index), 'amend edge run time {:.2f}(s)'.format(time.time() - start_time)) if gen_image: file_path = os.path.join(image_path, uid + '_' + str(nodule_index) + '_result.png') plot_shrink_extend_mask(data, one_nodule_mask, result_mask, extend_mask, shrink_mask, spacing, file_path) nodule_mask_list[nodule_index] = result_mask def shrink_extend_edge(data, mask, spacing, amend_head_tail=True, shrink_extend_mask=True, min_pixel=5, file_prefix='', log=False): """ 修正边缘 """ try: return _shrink_extend_edge(data, mask, spacing, amend_head_tail, shrink_extend_mask, min_pixel, file_prefix, log) except Exception as e: logging.error('shrink_extend_edge error!!!!!!', e) traceback.print_exc() return False, mask, mask, mask def _shrink_extend_edge(data, mask, spacing, amend_head_tail, shrink_extend_mask, min_pixel, file_prefix, log): """ 修正边缘 """ trachea_pixel = get_diameter_pixel(diameter=2.0, spacing=spacing) amend_pixel = get_diameter_pixel(diameter=2.0, spacing=spacing) amend_pixel = min(int(amend_pixel), 5) print(file_prefix, 'amend_pixel =', amend_pixel, 'trachea_pixel =', trachea_pixel) min_pixel = max(0, min_pixel, 3 * trachea_pixel) # 获取mask区块 coords = np.asarray(np.where(mask == 1)) if len(coords[0]) <= min_pixel: return False, mask, mask, mask coord_start = coords.min(axis=1) coord_end = coords.max(axis=1) + 1 z_num = coord_end[0] - coord_start[0] + 4 y_num = coord_end[1] - coord_start[1] + 5 * amend_pixel x_num = coord_end[2] - coord_start[2] + 5 * amend_pixel crop_start, crop_end = get_crop_start_end(coord_start, coord_end, (z_num, y_num, x_num)) # 使计算量减少,只取包括mask的区块 new_data = data[crop_start[0]:crop_end[0], crop_start[1]:crop_end[1], crop_start[2]:crop_end[2]] new_mask = mask[crop_start[0]:crop_end[0], crop_start[1]:crop_end[1], crop_start[2]:crop_end[2]].copy() # 每层mask点数 z_point_count = np.sum(new_mask == 1, axis=(1, 2)) # 最大面层 z_max = np.argmax(z_point_count) # 层数 z_length = len(new_mask) # 最前面层 z_start = coords.min(axis=1)[0] # 最后面层 z_end = coords.max(axis=1)[0] # 最大面层到最前面层 front_z_list = list(range(z_max, z_start - crop_start[0] - 1, -1)) # 最大面层下一层到最后面层 behind_z_list = list(range(z_max + 1, z_end - crop_start[0] + 1, 1)) # 解决少层问题,目前只新增并修正前后各一层 # 新增的前面层 add_front_z_list = [] # 新增的后面层 add_behind_z_list = [] # 计算平均密度、最小密度、最大密度 average_density, min_density, max_density = \ calculate_edge_hu_threshold(new_data, new_mask, True, file_prefix=file_prefix, log=log) if min_density < -900: return False, mask, mask, mask # 小于最小密度点的mask置为0 new_mask[new_data < min_density - density_reject_error_range] = 0 # 从最大面层到最前面层,检查层是否存在异常 for z in front_z_list: if np.all(new_mask[z] == 0): continue # 检查层是否存在异常 if z != z_max and 0 <= z + 1 < z_length: current_point_num = z_point_count[z] next_point_num = z_point_count[z + 1] # 检查最前面层 if z == z_start - crop_start[0]: if current_point_num <= min_pixel or \ (current_point_num <= 2 * min_pixel and 5 * current_point_num < next_point_num): new_mask[z] = 0 add_front_z_list = [z] elif 0 <= z - 1 < z_length: add_front_z_list = [z - 1] break elif current_point_num <= min_pixel: for temp_z in range(z, -1, -1): new_mask[temp_z] = 0 break elif 8 * current_point_num < next_point_num: # 不足属于异常 new_mask[z] = get_maybe_mask_2d(new_mask[z], new_mask[z + 1], trachea_pixel) # 从最大面层下一层到最后面层,检查层是否存在异常 for z in behind_z_list: if np.all(new_mask[z] == 0): continue # 检查层是否存在异常 if z != z_max and 0 <= z - 1 < z_length: current_point_num = z_point_count[z] prev_point_num = z_point_count[z - 1] # 检查最后面层 if z == z_end - crop_start[0]: if current_point_num <= min_pixel or \ (current_point_num <= 2 * min_pixel and 5 * current_point_num < prev_point_num): new_mask[z] = 0 add_behind_z_list = [z] elif 0 <= z + 1 < z_length: add_behind_z_list = [z + 1] break elif current_point_num <= min_pixel: for temp_z in range(z, z_length, 1): new_mask[temp_z] = 0 break elif 8 * current_point_num < prev_point_num: # 不足属于异常 new_mask[z] = get_maybe_mask_2d(new_mask[z], new_mask[z - 1], trachea_pixel) ref_mask = np.max(new_mask, axis=0) ret, ref_mask = get_erode_mask_2d(ref_mask, trachea_pixel) if 'linux' in sys.platform: new_mask, shrink_mask = shrink_extend_edge_2d_pool(new_data, new_mask, ref_mask, average_density, min_density, max_density, crop_start, z_length, front_z_list, behind_z_list, amend_pixel, trachea_pixel, min_pixel) else: shrink_mask = np.zeros(new_mask.shape, np.uint8) new_mask, shrink_mask = shrink_extend_edge_2d(new_data, new_mask, shrink_mask, ref_mask, average_density, min_density, max_density, crop_start, z_length, front_z_list, behind_z_list, amend_pixel, trachea_pixel, min_pixel) # 解决少层问题 if amend_head_tail and (add_front_z_list or add_behind_z_list): for z in add_front_z_list: # 用后面层的mask替换当前层的mask new_mask[z] = get_maybe_mask_2d(new_mask[z], new_mask[z + 1], trachea_pixel) # 小于最小密度点的mask置为0 new_mask[z][new_data[z] < min_density - density_allow_error_range] = 0 logging.info('{}, {}, add_front_z_list {}' .format(time.strftime('%Y-%m-%d %H:%M:%S'), file_prefix, add_front_z_list + crop_start[0])) for z in add_behind_z_list: # 用前面层的mask替换当前层的mask new_mask[z] = get_maybe_mask_2d(new_mask[z], new_mask[z - 1], trachea_pixel) # 小于最小密度点的mask置为0 new_mask[z][new_data[z] < min_density - density_allow_error_range] = 0 logging.info('{}, {}, add_behind_z_list {}' .format(time.strftime('%Y-%m-%d %H:%M:%S'), file_prefix, add_behind_z_list + crop_start[0])) new_mask, shrink_mask = shrink_extend_edge_2d(new_data, new_mask, shrink_mask, ref_mask, average_density, min_density, max_density, crop_start, z_length, add_front_z_list, add_behind_z_list, amend_pixel, trachea_pixel, min_pixel, True) # 空洞问题 # 原mask original_mask = mask[crop_start[0]:crop_end[0], crop_start[1]:crop_end[1], crop_start[2]:crop_end[2]] # 在shrink_mask或原mask中没有选中 condition = np.logical_or(shrink_mask == 0, original_mask == 0) # 在new_mask中选中 condition = np.logical_and(new_mask == 1, condition) # 密度小于最小密度 condition = np.logical_and(new_data < min_density, condition) label = measure.label(condition, connectivity=1) props = measure.regionprops(label) no_select_label = [prop.label for prop in props if prop.area > trachea_pixel * trachea_pixel] if len(no_select_label) > 0: no_select_condition = np.logical_and(np.isin(label, no_select_label), condition) new_mask[no_select_condition] = 0 # 删除独立的小区块 label = measure.label(new_mask, connectivity=1) props = measure.regionprops(label) if len(props) > 1: props = sorted(props, key=lambda x: x.area, reverse=True) max_area = props[0].area no_select_label = [prop.label for prop in props if prop.area < 0.1 * max_area] if len(no_select_label) > 0: no_select_condition = np.isin(label, no_select_label) new_mask[no_select_condition] = 0 # 删除异常层 remove_small_point_layer_3d(new_data, new_mask, amend_head_tail, add_front_z_list, add_behind_z_list, min_pixel) new_mask_result = np.zeros(mask.shape, np.uint8) new_mask_result[crop_start[0]:crop_end[0], crop_start[1]:crop_end[1], crop_start[2]:crop_end[2]] = new_mask if not shrink_extend_mask: return True, new_mask_result, new_mask_result, new_mask_result shrink_mask_result = np.zeros(mask.shape, np.uint8) shrink_mask_result[crop_start[0]:crop_end[0], crop_start[1]:crop_end[1], crop_start[2]:crop_end[2]] = shrink_mask return True, new_mask_result, new_mask_result, shrink_mask_result def shrink_extend_edge_2d_pool(new_data, new_mask, ref_mask, average_density, min_density, max_density, crop_start, z_length, front_z_list, behind_z_list, amend_pixel, trachea_pixel, min_pixel): def callback(result): (z, mask) = result new_mask[z] = mask pool_size = min(len(front_z_list) + len(behind_z_list), 5) pool = Pool(pool_size) # 从最大面层到最前面层,收缩不为结节的点 for z in front_z_list: original_z = z + crop_start[0] pool.apply_async(batch_shrink_edge_2d, args=(new_data[z], new_mask[z], original_z, z, average_density, min_density, max_density, amend_pixel, trachea_pixel), callback=callback) # 从最大面层下一层到最后面层,收缩不为结节的点 for z in behind_z_list: original_z = z + crop_start[0] pool.apply_async(batch_shrink_edge_2d, args=(new_data[z], new_mask[z], original_z, z, average_density, min_density, max_density, amend_pixel, trachea_pixel), callback=callback) pool.close() pool.join() shrink_mask = new_mask.copy() pool = Pool(pool_size) # 从最大面层到最前面层,扩展为结节的点 for z in front_z_list: original_z = z + crop_start[0] small_data = new_data[z - 1] if 0 <= z - 1 < z_length else new_data[z] small_mask = new_mask[z - 1] if 0 <= z - 1 < z_length else new_mask[z] big_data = new_data[z + 1] if 0 <= z + 1 < z_length else new_data[z] big_mask = new_mask[z + 1] if 0 <= z + 1 < z_length else new_mask[z] new_min_pixel = max(0.075 * np.sum(new_mask[z + 1] == 1) if 0 <= z + 1 < z_length else 0, min_pixel) new_mask[z] = batch_extend_edge_3d(new_data[z], new_mask[z], original_z, average_density, min_density, max_density, small_data, small_mask, big_data, big_mask) pool.apply_async(batch_extend_edge_2d, args=(new_data[z], new_mask[z], original_z, z, average_density, min_density, max_density, big_mask, ref_mask, shrink_mask[z], amend_pixel, trachea_pixel, new_min_pixel), callback=callback) # 从最大面层下一层到最后面层,扩展为结节的点 for z in behind_z_list: original_z = z + crop_start[0] small_data = new_data[z + 1] if 0 <= z + 1 < z_length else new_data[z] small_mask = new_mask[z + 1] if 0 <= z + 1 < z_length else new_mask[z] big_data = new_data[z - 1] if 0 <= z - 1 < z_length else new_data[z] big_mask = new_mask[z - 1] if 0 <= z - 1 < z_length else new_mask[z] new_min_pixel = max(0.075 * np.sum(new_mask[z - 1] == 1) if 0 <= z - 1 < z_length else 0, min_pixel) new_mask[z] = batch_extend_edge_3d(new_data[z], new_mask[z], original_z, average_density, min_density, max_density, small_data, small_mask, big_data, big_mask) pool.apply_async(batch_extend_edge_2d, args=(new_data[z], new_mask[z], original_z, z, average_density, min_density, max_density, big_mask, ref_mask, shrink_mask[z], amend_pixel, trachea_pixel, new_min_pixel), callback=callback) pool.close() pool.join() return new_mask, shrink_mask def shrink_extend_edge_2d(new_data, new_mask, shrink_mask, ref_mask, average_density, min_density, max_density, crop_start, z_length, front_z_list, behind_z_list, amend_pixel, trachea_pixel, min_pixel, is_head_tail=False): # 从最大面层到最前面层,收缩不为结节的点 for z in front_z_list: original_z = z + crop_start[0] _, new_mask[z] = batch_shrink_edge_2d(new_data[z], new_mask[z], original_z, z, average_density, min_density, max_density, amend_pixel, trachea_pixel) shrink_mask[z] = new_mask[z].copy() # 从最大面层下一层到最后面层,收缩不为结节的点 for z in behind_z_list: original_z = z + crop_start[0] _, new_mask[z] = batch_shrink_edge_2d(new_data[z], new_mask[z], original_z, z, average_density, min_density, max_density, amend_pixel, trachea_pixel) shrink_mask[z] = new_mask[z].copy() # 从最大面层到最前面层,扩展为结节的点 for z in front_z_list: original_z = z + crop_start[0] small_data = new_data[z - 1] if 0 <= z - 1 < z_length else new_data[z] small_mask = new_mask[z - 1] if 0 <= z - 1 < z_length else new_mask[z] big_data = new_data[z + 1] if 0 <= z + 1 < z_length else new_data[z] big_mask = new_mask[z + 1] if 0 <= z + 1 < z_length else new_mask[z] new_min_pixel = max(0.075 * np.sum(new_mask[z + 1] == 1) if 0 <= z + 1 < z_length else 0, min_pixel) new_mask[z] = batch_extend_edge_3d(new_data[z], new_mask[z], original_z, average_density, min_density, max_density, small_data, small_mask, big_data, big_mask) _, new_mask[z] = batch_extend_edge_2d(new_data[z], new_mask[z], original_z, z, average_density, min_density, max_density, big_mask, ref_mask, shrink_mask[z], amend_pixel, trachea_pixel, new_min_pixel, is_head_tail) # 从最大面层下一层到最后面层,扩展为结节的点 for z in behind_z_list: original_z = z + crop_start[0] small_data = new_data[z + 1] if 0 <= z + 1 < z_length else new_data[z] small_mask = new_mask[z + 1] if 0 <= z + 1 < z_length else new_mask[z] big_data = new_data[z - 1] if 0 <= z - 1 < z_length else new_data[z] big_mask = new_mask[z - 1] if 0 <= z - 1 < z_length else new_mask[z] new_min_pixel = max(0.075 * np.sum(new_mask[z - 1] == 1) if 0 <= z - 1 < z_length else 0, min_pixel) new_mask[z] = batch_extend_edge_3d(new_data[z], new_mask[z], original_z, average_density, min_density, max_density, small_data, small_mask, big_data, big_mask) _, new_mask[z] = batch_extend_edge_2d(new_data[z], new_mask[z], original_z, z, average_density, min_density, max_density, big_mask, ref_mask, shrink_mask[z], amend_pixel, trachea_pixel, new_min_pixel, is_head_tail) return new_mask, shrink_mask def get_kernel_and_iterations(mask_point_count, trachea_pixel, min_iterations=2): """ 根据mask点数与血管像素点数获取kernel与迭代次数 """ if mask_point_count <= 10 ** 2: kernel = kernel3 next_kernel = kernel5 iterations = 2 elif mask_point_count <= 17 ** 2: kernel = kernel5 next_kernel = kernel7 iterations = 2 elif mask_point_count <= 25 ** 2: kernel = kernel5 next_kernel = kernel7 iterations = trachea_pixel / 4 else: kernel = kernel7 next_kernel = kernel7 iterations = trachea_pixel / 6 iterations = max(int(np.ceil(iterations)), min_iterations) return kernel, next_kernel, iterations def get_maybe_mask_2d(small_mask, big_mask, trachea_pixel): """ 根据small_mask,big_mask获取包含small_mask的估计mask """ ret, erode_mask = get_erode_mask_2d(big_mask, trachea_pixel) if ret: small_mask = np.bitwise_or(erode_mask, small_mask) return small_mask def get_erode_mask_2d(mask, trachea_pixel, min_iterations=2, min_ratio=0.2, ratio=0.8): """ 获取侵蚀后与mask的交集 """ mask_point_count = np.sum(mask == 1) if mask_point_count > 8 ** 2: kernel, next_kernel, original_iterations = get_kernel_and_iterations(mask_point_count, trachea_pixel) for iterations in range(original_iterations, 0, -1): next_mask = cv2.erode(mask, next_kernel, iterations=iterations) new_mask = cv2.erode(mask, kernel, iterations=iterations) new_mask = np.bitwise_or(next_mask, new_mask) new_mask = np.bitwise_and(mask, new_mask) new_mask_point_count = np.sum(new_mask == 1) if new_mask_point_count > ratio * mask_point_count or \ (new_mask_point_count > min_ratio * mask_point_count and iterations == min_iterations): return True, new_mask return False, mask def get_erode_dilate_mask_2d(mask, trachea_pixel, min_iterations=2, min_ratio=0.2, ratio=0.8): """ 获取侵蚀膨胀后与mask的交集 """ mask_point_count = np.sum(mask == 1) if mask_point_count > 8 ** 2: kernel, next_kernel, original_iterations = get_kernel_and_iterations(mask_point_count, trachea_pixel) for iterations in range(original_iterations, 0, -1): next_mask = cv2.erode(mask, next_kernel, iterations=iterations) next_mask = cv2.dilate(next_mask, next_kernel, iterations=iterations) new_mask = cv2.erode(mask, kernel, iterations=iterations) new_mask = cv2.dilate(new_mask, kernel, iterations=iterations) new_mask = np.bitwise_or(next_mask, new_mask) new_mask = np.bitwise_and(mask, new_mask) new_mask_point_count = np.sum(new_mask == 1) if new_mask_point_count > ratio * mask_point_count or \ (new_mask_point_count > min_ratio * mask_point_count and iterations == min_iterations): return True, new_mask return False, mask def fill_min_mask_2d(max_mask, min_mask, default_distance=-1.0): """ 填充最小mask边缘 """ change_mask = max_mask - min_mask if np.sum(change_mask == 1) > 0: change_coords = np.asarray(np.where(change_mask == 1)).T _, contours, _ = cv2.findContours(min_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE) for contour in contours: for point in change_coords: distance = cv2.pointPolygonTest(contour, (point[1], point[0]), True) if distance > default_distance: min_mask[point[0], point[1]] = 1 return min_mask def fill_error_range_point(data, mask, z, density, error_density=0, edge_mask=None): """ 填充在误差范围内凸集中的点 """ _, contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE) for contour in contours: if len(contours) > 1: # 设置contour获取包含的所有点 temp_mask = np.zeros(mask.shape, np.uint8) cv2.drawContours(temp_mask, [contour], -1, 1, -1) else: temp_mask = mask temp_mask = morphology.convex_hull_object(temp_mask, neighbors=8) temp_mask = temp_mask.astype(np.uint8) if edge_mask is not None: temp_mask = np.bitwise_and(edge_mask, temp_mask) condition = np.logical_and(data >= density - error_density, temp_mask == 1) mask[condition] = 1 def fill_3d(mask): """ 填充mask """ for z in range(len(mask)): if np.all(mask[z] == 0): continue fill_2d(mask[z], z) def fill_2d(mask, z): """ 填充mask """ _, contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE) cv2.drawContours(mask, contours, -1, 1, -1) def remove_small_objects_3d(mask, ratio=0.15, min_pixel=5): """ 删除小区块 """ for z in range(len(mask)): if np.all(mask[z] == 0): continue mask[z] = remove_small_objects_2d(mask[z], z, ratio=ratio, min_pixel=min_pixel) return mask def remove_small_objects_2d(mask, z, ratio=0.15, min_pixel=5): """ 删除小区块 """ mask_point_count = int(np.sum(mask == 1)) min_size = int(max(ratio * mask_point_count, min_pixel, 5)) input_mask = mask.astype(np.bool) result_mask = morphology.remove_small_objects(input_mask, min_size=min_size) result_mask = result_mask.astype(np.uint8) if np.sum(result_mask == 1) > 0: mask = result_mask return mask def remove_small_area_2d(mask, z, ratio=0.15): """ 检查区块,删除比率小的区块 """ _, contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE) if len(contours) > 1: contours = sorted(contours, key=cv2.contourArea, reverse=True) for contour_index, contour in enumerate(contours): if contour_index == 0: max_area = cv2.contourArea(contour) elif cv2.contourArea(contour) < ratio * max_area: cv2.drawContours(mask, [contour], -1, 0, -1) return mask def remove_new_extend_area_2d(new_mask, original_mask, z, ratio=0.15): """ 删除新扩展的独立区块 """ _, original_contours, _ = cv2.findContours(original_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE) if len(original_contours) == 0: return new_mask _, new_contours, _ = cv2.findContours(new_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE) if len(new_contours) > 1: new_contours = sorted(new_contours, key=cv2.contourArea, reverse=True) for new_contour_index, new_contour in enumerate(new_contours): new_check_mask = np.zeros(new_mask.shape, np.uint8) cv2.drawContours(new_check_mask, [new_contour], -1, 1, -1) found = False for original_contour_index, original_contour in enumerate(original_contours): original_check_mask = np.zeros(new_mask.shape, np.uint8) cv2.drawContours(original_check_mask, [original_contour], -1, 1, -1) check_mask = np.bitwise_and(new_check_mask, original_check_mask) intersection_count = np.sum(check_mask == 1) check_mask = np.bitwise_or(new_check_mask, original_check_mask) union_count = np.sum(check_mask == 1) if intersection_count > 0 and union_count > 0 and intersection_count / union_count > ratio: found = True break # 删除新扩展的独立区块 if not found: cv2.drawContours(new_mask, [new_contour], -1, 0, -1) return new_mask def remove_trachea_2d(mask, z, trachea_pixel, retain_first=False): """ 检查形态,删除长宽比异常的区块 """ _, contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE) if len(contours) > 1: contours = sorted(contours, key=cv2.contourArea, reverse=True) for contour_index, contour in enumerate(contours): if retain_first and contour_index == 0: continue if len(contours) > 1: # 设置contour获取包含的所有点 check_mask = np.zeros(mask.shape, np.uint8) cv2.drawContours(check_mask, [contour], -1, 1, -1) else: check_mask = mask if is_trachea(check_mask, z, trachea_pixel, contour_index, contour): cv2.drawContours(mask, [contour], -1, 0, -1) return mask def is_trachea(mask, z, trachea_pixel, contour_index, contour): """ 通过形态判断是否为异常的区块 """ rect_min = cv2.minAreaRect(contour) ((_, _), (width, height), angle) = rect_min if width < 2 or height < 2: return True min_length = min(width, height) max_length = max(width, height) if (contour_index == 0 and min_length <= trachea_pixel and max_length / min_length > 2.0) or \ (contour_index > 0 and min_length <= 1.5 * trachea_pixel and max_length / min_length > 2.0) or \ max_length / min_length > 3.0: return True mask_point_count = np.sum(mask == 1) ret, erode_mask = get_erode_mask_2d(mask, trachea_pixel) if not ret: if min_length <= 1.5 * trachea_pixel and max_length / min_length > 2.0: print('z =', z, 'min_length =', np.round(min_length, 2), 'max_length =', np.round(max_length, 2), 'mask_point_count =', mask_point_count, 'area =', np.round(width * height, 2)) return True return False def remove_small_point_layer_3d(data, mask, amend_head_tail, add_front_z_list, add_behind_z_list, min_pixel): """ 删除异常层 """ # 每层mask点数 z_point_count = np.sum(mask == 1, axis=(1, 2)) # 最大面层 z_max = np.argmax(z_point_count) coords = np.asarray(np.where(mask == 1)) if len(coords[0]) > 0: # 最前面层 z_start = coords.min(axis=1)[0] # 最后面层 z_end = coords.max(axis=1)[0] if z_end - z_start > 2 and z_start != z_max: z = z_start if z != z_max and 0 <= z + 1 < len(mask): current_point_num = z_point_count[z] next_point_num = z_point_count[z + 1] if amend_head_tail and add_front_z_list and len(add_front_z_list) > 0 and add_front_z_list[0] == z: if current_point_num < 2 * min_pixel or 5 * current_point_num < next_point_num: mask[z] = 0 elif current_point_num < 3 * min_pixel and 8 * current_point_num < next_point_num: mask[z] = 0 if z_end - z_start > 2 and z_end != z_max: z = z_end if z != z_max and 0 <= z - 1 < len(mask): current_point_num = z_point_count[z] prev_point_num = z_point_count[z - 1] if amend_head_tail and add_behind_z_list and len(add_behind_z_list) > 0 and add_behind_z_list[0] == z: if current_point_num < 2 * min_pixel or 5 * current_point_num < prev_point_num: mask[z] = 0 elif current_point_num < 3 * min_pixel and 8 * current_point_num < prev_point_num: mask[z] = 0 def sum_nodule_point(data, min_density, max_density): """ 统计在某密度范围内的点数量 """ return np.sum(np.logical_and(data >= min_density, data <= max_density)) def is_near_distance(source, target1, target2, ratio=1.0): """ 判断是否离目标1的距离更近 """ if (ratio * (source - target1)) ** 2 < (source - target2) ** 2: return True return False def calculate_density(data, mask, z, point, kernel_radius, min_density=None): """ 计算选中区块与未选中区块的平均密度 """ density = data[point[0], point[1]] around_data = get_region_data(data, point, kernel_radius) if len(around_data.ravel()) < (2 * kernel_radius + 1) ** 2: return density, density, 0, 0 around_mask = get_region_data(mask, point, kernel_radius).copy() # 当前点不参与计算 around_mask[kernel_radius, kernel_radius] = -1 if min_density is not None: # 小于最小密度的点不参与计算 around_mask[around_data < min_density] = -1 num1 = np.sum(around_mask == 1) num0 = np.sum(around_mask == 0) density1 = np.mean(around_data[around_mask == 1]) if num1 > 0 else density density0 = np.mean(around_data[around_mask == 0]) if num0 > 0 else density return density1, density0, num1, num0 def is_shrink_point_density(data, mask, z, point, kernel_radius, min_density=None, error_thred=0): """ 判断该点是否可以收缩 """ density = data[point[0], point[1]] density1, density0, num1, num0 = calculate_density(data, mask, z, point, kernel_radius, min_density) if num1 <= 0 or num0 <= 0: # 数据不正确 return 0 if is_near_distance(density, density0, density1): # 离未选中区块更近 return 1 if np.abs(density - density0) <= error_thred: # 在未选中区块的误差范围内 return 1 return 0 def is_extend_point_density(data, mask, z, point, kernel_radius, min_density=None, error_thred=0, ref_mask=None): """ 判断该点是否可以扩展 """ density = data[point[0], point[1]] density1, density0, num1, num0 = calculate_density(data, mask, z, point, kernel_radius, min_density) if num1 <= 0 or num0 <= 0: # 数据不正确 return 0 if ref_mask is not None and ref_mask[point[0], point[1]] == 0 and \ density0 > density1 + 20 and density > density1 + 20: # 不在ref_mask范围内,不向密度大的方向上扩展 return 0 if (density > density1 + 20 and density > density0 + 20) or \ (density < density1 - 20 and density < density0 - 20): # 该点的密度与两边反向,不处理该点 return 0 if is_near_distance(density, density1, density0): # 离选中区块更近 return 1 if np.abs(density - density1) <= error_thred: # 在选中区块的误差范围内 return 1 return 0 def shrink_edge_point_11(data, mask, z, average_density, min_density, max_density, amend_pixel): """ 收缩边缘点,该点在它的周围选中的点中为最大密度的点 """ for _ in range(amend_pixel): change = False _, contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE) for contour in contours: around_points = get_contour_points(contour) around_data = data[around_points[:, 0], around_points[:, 1]] idx_sorted = np.argsort(around_data)[::-1] for point in around_points[idx_sorted]: if mask[point[0], point[1]] == 0: continue density = data[point[0], point[1]] if density < max_density - 20: # 该点密度小于最大密度,跳过 continue check_data = get_region_data(data, point, 2) if np.sum(check_data < min_density) > 0: # 周围有点密度小于最小密度,跳过 continue check_mask = get_region_data(mask, point, 2) check_data_1 = check_data[check_mask == 1] check_data_0 = check_data[check_mask == 0] if np.sum(check_data_1 > density) == 0 and np.sum(check_data_0 > density) > 0: # 周围选中的点中密度都小于该点密度并且周围未选中的点中密度存在大于该点密度,mask置于0 mask[point[0], point[1]] = 0 change = True continue if not change: break def shrink_edge_point_21(data, mask, z, average_density, min_density, max_density, amend_pixel, error_thred=0): """ 收缩边缘点 """ for _ in range(amend_pixel): change = False _, contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE) for contour in contours: around_points = get_contour_points(contour) for point in around_points: if mask[point[0], point[1]] == 0: continue density = data[point[0], point[1]] if min_density + density_reject_error_range <= density <= max_density - density_reject_error_range: continue check_data = get_region_data(data, point, 1) check_num = len(check_data.ravel()) nodule_point_num = sum_nodule_point(check_data, min_density, max_density) if min_density + density_allow_error_range <= density <= max_density - density_allow_error_range and \ nodule_point_num > 0.5 * check_num: continue elif min_density <= density <= max_density and \ nodule_point_num > 0.8 * check_num: continue if density < min_density and np.sum(check_data < min_density) > 0.5 * check_num: # 该点密度小于最小密度,并且周围的点中密度有一半以上小于最小密度,mask置于0 mask[point[0], point[1]] = 0 change = True continue # 判断该点是否可以收缩 new_error_thred = error_thred if density < min_density - density_allow_error_range: new_error_thred += density_allow_error_range vote_num = is_shrink_point_density(data, mask, z, point, 1, error_thred=new_error_thred) + \ is_shrink_point_density(data, mask, z, point, 2, error_thred=new_error_thred) if vote_num >= 2: # 该点可以收缩,mask置于0 mask[point[0], point[1]] = 0 change = True continue if not change: break def batch_shrink_edge_2d(data, mask, z, index, average_density, min_density, max_density, amend_pixel, trachea_pixel): """ 批量收缩不为结节的点 """ # 检查形态,删除长宽比异常的区块 mask = remove_trachea_2d(mask, z, trachea_pixel, retain_first=True) # 填充mask fill_2d(mask, z) # 获取核mask ret, kernel_mask = get_erode_mask_2d(mask, trachea_pixel) if ret: # 填充在误差范围内凸集中的点 fill_error_range_point(data, kernel_mask, z, min_density, error_density=density_allow_error_range) mask[kernel_mask == 1] = 1 # 收缩前mask original_shrink_mask = mask.copy() # 收缩边缘点,该点在它的周围选中的点中为最大密度的点 shrink_edge_point_11(data, mask, z, average_density, min_density, max_density, 2) # 收缩边缘点 shrink_edge_point_21(data, mask, z, average_density, min_density, max_density, amend_pixel) # 获取最小mask ret, min_mask = get_erode_dilate_mask_2d(mask, trachea_pixel, ratio=0.9) if ret: # 收缩与扩展边缘点, 扩展因侵蚀膨胀后丢失的为结节的点 extend_edge_point_32(data, min_mask, z, average_density, min_density, max_density, edge_mask=mask, shrink_mask=None, error_thred=50) mask = min_mask # 填充mask边缘 mask = fill_min_mask_2d(original_shrink_mask, mask) # 检查区块,删除比率小的区块 mask = remove_small_area_2d(mask, z) return index, mask def batch_extend_edge_3d(data, mask, z, average_density, min_density, max_density, small_data, small_mask, big_data, big_mask): """ 根据上下层,批量扩展为结节的点 """ if np.sum(small_mask == 1) > 0 and np.sum(big_mask == 1) > 0: base_condition_data = np.logical_and(data >= min_density, data <= max_density) base_condition_log1 = np.logical_and(base_condition_data, mask == 0) big_condition_data = np.logical_and(big_data >= min_density, big_data <= max_density) big_condition_log1 = np.logical_and(big_condition_data, big_mask == 1) small_condition_data = np.logical_and(small_data >= min_density, small_data <= max_density) small_condition_log1 = np.logical_and(small_condition_data, small_mask == 1) condition = np.logical_and(big_condition_log1, base_condition_log1) condition = np.logical_and(small_condition_log1, condition) mask[condition] = 1 return mask def extend_edge_point_11(data, mask, z, average_density, min_density, max_density, amend_pixel): """ 扩展为结节的点 """ increase_points = calculate_increase_points(amend_pixel) _, contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE) for contour in contours: # 获取相关的点,只处理mask为0的点 around_points = batch_get_around_points(mask, contour, increase_points, select_value=0) around_data = data[around_points[:, 0], around_points[:, 1]] # 获取大于等于最小密度与小于等于最大密度的点 around_min_points = around_points[np.logical_and(around_data >= min_density + density_allow_error_range, around_data <= max_density - density_allow_error_range)] # 大于等于最小密度与小于等于最大密度的点mask置为1 mask[around_min_points[:, 0], around_min_points[:, 1]] = 1 # 获取在误差范围内的小于最小密度的点,根据条件设置mask的值 around_error_points = around_points[np.logical_and(around_data > min_density - density_allow_error_range, around_data < min_density + density_allow_error_range)] if len(around_error_points) > 0: around_error_points = np.unique(around_error_points, axis=0) for point in around_error_points: check_data = get_region_data(data, point, 1) if sum_nodule_point(check_data, min_density - density_allow_error_range, max_density) >= 5: mask[point[0], point[1]] = 1 # 获取在误差范围内的大于最大密度的点,根据条件设置mask的值 around_error_points = around_points[np.logical_and(around_data > max_density - density_allow_error_range, around_data < max_density + density_allow_error_range)] if len(around_error_points) > 0: around_error_points = np.unique(around_error_points, axis=0) for point in around_error_points: check_data = get_region_data(data, point, 1) if sum_nodule_point(check_data, min_density, max_density + density_allow_error_range) >= 5: mask[point[0], point[1]] = 1 def extend_edge_point_12(data, mask, z, average_density, min_density, max_density, amend_pixel): """ 扩展为结节的点 """ increase_points = calculate_increase_points(amend_pixel) _, contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE) for contour in contours: # 获取相关的点,只处理mask为0的点 around_points = batch_get_around_points(mask, contour, increase_points, select_value=0) around_data = data[around_points[:, 0], around_points[:, 1]] # 获取大于等于最小密度与小于等于最大密度的点 around_min_points = around_points[np.logical_and(around_data >= min_density - density_allow_error_range, around_data <= max_density + density_allow_error_range)] # 大于等于最小密度与小于等于最大密度的点mask置为1 mask[around_min_points[:, 0], around_min_points[:, 1]] = 1 def extend_edge_point_21(data, mask, z, average_density, min_density, max_density, extend_mask): """ 扩展为结节的点,边缘点 """ extend_coords = np.asarray(np.where(extend_mask == 1)).T for point in extend_coords: if mask[point[0], point[1]] == 1: continue check_data = get_region_data(data, point, 1) check_mask = get_region_data(mask, point, 1) lt_min_density_num1 = np.sum(check_data < min_density) lt_min_density_num2 = np.sum(check_data < min_density + density_allow_error_range) if (lt_min_density_num1 > 0 or lt_min_density_num2 > 2) and \ sum_nodule_point(check_data, min_density, max_density) > 2: # 周围有点的密度小于最小密度,该点设为边缘点,mask置于1 mask[point[0], point[1]] = 1 # 周围所有点mask也置于1 check_mask[...] = 1 elif lt_min_density_num2 + \ np.sum(check_data > max_density - density_allow_error_range) == len(check_data.ravel()): # 周围点的密度要么小于最小密度要么大于最大密度,mask置于1 mask[point[0], point[1]] = 1 def extend_edge_point_31(data, mask, z, average_density, min_density, max_density, amend_pixel, edge_mask=None, ref_mask=None, error_thred=0): """ 扩展为结节的点 """ for _ in range(amend_pixel): change = False _, contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE) for contour in contours: # 获取相关的点,只处理mask为0的点 around_points = batch_get_around_points(mask, contour, increase_points1, select_value=0) if len(around_points) == 0: continue around_points = np.unique(around_points, axis=0) for point in around_points: if mask[point[0], point[1]] == 1: continue if edge_mask is not None and edge_mask[point[0], point[1]] == 0: continue check_mask = get_region_data(mask, point, 1) if np.sum(check_mask == 1) <= 2: continue density = data[point[0], point[1]] if not (min_density - density_allow_error_range <= density <= max_density + density_allow_error_range): # 该点密度不在密度范围内,跳过 continue check_data = get_region_data(data, point, 1) check_num = len(check_data.ravel()) nodule_point_num = sum_nodule_point(check_data, min_density, max_density) check_data_1 = check_data[check_mask == 1] nodule_point_num_1 = sum_nodule_point(check_data_1, min_density + density_allow_error_range, max_density - density_allow_error_range) if min_density + density_allow_error_range <= density <= max_density - density_allow_error_range and \ nodule_point_num_1 > 3: # 周围的点密度在密度范围内,mask置于1 mask[point[0], point[1]] = 1 change = True continue elif min_density <= density <= max_density and \ nodule_point_num == check_num and \ nodule_point_num_1 > 3: # 周围的点密度在密度范围内,mask置于1 mask[point[0], point[1]] = 1 change = True continue # 判断该点是否可以扩展 new_error_thred = error_thred vote_num = is_extend_point_density(data, mask, z, point, 1, error_thred=new_error_thred, ref_mask=ref_mask) + \ is_extend_point_density(data, mask, z, point, 2, error_thred=new_error_thred, ref_mask=ref_mask) if vote_num >= 2: # 该点可以扩展,mask置于1 mask[point[0], point[1]] = 1 change = True continue if not change: break def blank_outside_edge_3d(mask, times=1): for z in range(len(mask)): if np.sum(mask[z] == 1) == 0: continue blank_outside_edge_2d(mask[z], times) def blank_outside_edge_2d(mask, times=1): for _ in range(times): _, contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE) for contour in contours: around_points = get_contour_points(contour) if len(around_points) > 0: mask[around_points[:, 0], around_points[:, 1]] = 0 def extend_edge_point_32(data, mask, z, average_density, min_density, max_density, edge_mask=None, shrink_mask=None, error_thred=0): """ 收缩与扩展边缘点 """ mask_point_count = np.sum(mask == 1) if mask_point_count < 8 ** 2: return if edge_mask is None: edge_mask = mask.copy() times = 1 blank_outside_edge_2d(mask, times) for i in range(times + 1): change = False _, contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE) for contour in contours: # 获取相关的点,只处理mask为0的点 around_points = batch_get_around_points(mask, contour, increase_points1, select_value=0) if len(around_points) == 0: continue around_points = np.unique(around_points, axis=0) for point in around_points: if mask[point[0], point[1]] == 1: continue if edge_mask is not None and edge_mask[point[0], point[1]] == 0: continue check_mask = get_region_data(mask, point, 1) if np.sum(check_mask == 1) <= 2: continue if shrink_mask is not None and shrink_mask[point[0], point[1]] == 1: # 该点在shrink_mask内,mask置于1 mask[point[0], point[1]] = 1 continue density = data[point[0], point[1]] if min_density + density_allow_error_range <= density <= max_density - density_allow_error_range: # 该点密度在密度范围内,mask置于1 mask[point[0], point[1]] = 1 change = True continue # 判断该点是否可以扩展 new_error_thred = error_thred vote_num = is_extend_point_density(data, mask, z, point, 1, error_thred=new_error_thred) + \ is_extend_point_density(data, mask, z, point, 2, error_thred=new_error_thred) if vote_num >= 2: # 该点可以扩展,mask置于1 mask[point[0], point[1]] = 1 change = True continue if not change: break def batch_extend_edge_2d(data, mask, z, index, average_density, min_density, max_density, big_mask, ref_mask, shrink_mask, amend_pixel, trachea_pixel, min_pixel, is_head_tail=False): """ 批量扩展为结节的点 """ # 获取最大扩展区块 max_mask = mask.copy() extend_edge_point_11(data, max_mask, z, average_density, min_density, max_density, amend_pixel) # 删除小区块 max_mask = remove_small_objects_2d(max_mask, z, min_pixel=min_pixel) # 删除新扩展的独立区块 max_mask = remove_new_extend_area_2d(max_mask, shrink_mask, z) # 检查形态,删除长宽比异常的区块 max_mask = remove_trachea_2d(max_mask, z, trachea_pixel, retain_first=not is_head_tail) if np.sum(max_mask == 1) == 0: return index, max_mask # 获取核mask ret, kernel_mask = get_erode_mask_2d(max_mask, trachea_pixel) if ret: # 填充在误差范围内凸集中的点 fill_error_range_point(data, kernel_mask, z, min_density, error_density=density_allow_error_range, edge_mask=big_mask) max_mask[kernel_mask == 1] = 1 # 多扩展几圈计算边缘 new_mask = max_mask.copy() extend_edge_point_12(data, new_mask, z, average_density, min_density, max_density, 2) # 删除新扩展的独立区块 new_mask = remove_new_extend_area_2d(new_mask, shrink_mask, z) # 获取所有扩展的点,并判断扩展的点是否为边缘点 total_mask = mask.copy() extend_mask = new_mask - total_mask extend_mask[np.logical_and(data > min_density + density_reject_error_range, data < max_density - density_reject_error_range)] = 0 extend_edge_point_21(data, total_mask, z, average_density, min_density, max_density, extend_mask) # 填充边缘内的所有点 fill_2d(total_mask, z) # 获取最大扩展区块与边缘内区块的交集 edge_mask = np.bitwise_and(max_mask, total_mask) if is_head_tail and not np.all(max_mask == edge_mask): # 没有找到边界 mask[...] = 0 shrink_mask[...] = 0 return index, mask # 真正扩展为结节的点 extend_edge_point_31(data, mask, z, average_density, min_density, max_density, amend_pixel, edge_mask=edge_mask, ref_mask=ref_mask) # 获取核mask ret, kernel_mask = get_erode_mask_2d(mask, trachea_pixel) if ret: # 填充在误差范围内凸集中的点 fill_error_range_point(data, kernel_mask, z, min_density, error_density=density_allow_error_range, edge_mask=big_mask) mask[kernel_mask == 1] = 1 # 再次扩展为结节的点 extend_edge_point_31(data, mask, z, average_density, min_density, max_density, 2, edge_mask=edge_mask) # 收缩边缘点,该点在它的周围选中的点中为最大密度的点 shrink_edge_point_11(data, mask, z, average_density, min_density, max_density, 2) # 收缩与扩展边缘点 extend_edge_point_32(data, mask, z, average_density, min_density, max_density, shrink_mask=shrink_mask, error_thred=0) # 获取最小mask ret, min_mask = get_erode_dilate_mask_2d(mask, trachea_pixel) # 填充mask边缘 mask = fill_min_mask_2d(mask, min_mask) mask[shrink_mask == 1] = 1 # 删除小区块 mask = remove_small_objects_2d(mask, z, min_pixel=min_pixel) # 删除新扩展的独立区块 mask = remove_new_extend_area_2d(mask, shrink_mask, z) # 检查区块,删除比率小的区块 mask = remove_small_area_2d(mask, z) # 填充mask fill_2d(mask, z) return index, mask def analysis_edge_hu_threshold(nodule_data, file_prefix='', log=False): average_density = int(np.round(np.mean(nodule_data))) min_density = int(np.min(nodule_data)) max_density = int(np.max(nodule_data)) left_data = nodule_data[nodule_data <= average_density] left_point_count = len(left_data) sorted_left_data = left_data[np.argsort(left_data)] max_threshold_index = max(int(0.3 * left_point_count), 20 if left_point_count > 20 else left_point_count - 1) min_threshold = min_density max_threshold = sorted_left_data[max_threshold_index] left_average_space = np.round((np.max(left_data) - np.min(left_data)) / (left_point_count + 1), 5) left_max_space = max(4 * left_average_space, 10 if left_average_space > 0.5 else 5) if left_average_space < 0.05: left_max_space = 50 * left_average_space elif left_average_space < 0.2: left_max_space = 20 * left_average_space left_max_space = np.round(left_max_space, 5) left_values, left_counts = np.unique(left_data, return_counts=True) left_spaces = (left_values[1:] - left_values[:-1]) / left_counts[1:] left_coords = np.where(left_spaces >= left_max_space)[0] if log: print(file_prefix, 'left_min_density =', np.min(left_data), 'left_max_density =', np.max(left_data), 'left_average_space =', left_average_space, 'left_max_space =', left_max_space, 'left_point_count =', left_point_count) for left_min_index in left_coords: left_min_value = left_values[left_min_index + 1] left_remove_num = np.sum(left_data < left_min_value) if left_remove_num < max(0.1 * left_point_count, 10): min_density = left_min_value if log: print(file_prefix, 'left_min_index =', left_min_index, 'left_space =', np.round(left_spaces[left_min_index], 5), 'left_min_value =', left_min_value, 'left_remove_num =', left_remove_num) if left_remove_num < max(0.05 * left_point_count, 5): min_threshold = min_density if max_threshold < min_threshold: max_threshold = min_threshold if log: print(file_prefix, 'min_threshold =', min_threshold, 'max_threshold =', max_threshold) right_data = nodule_data[nodule_data >= average_density] right_point_count = len(right_data) right_average_space = np.round((np.max(right_data) - np.min(right_data)) / (right_point_count + 1), 5) right_max_space = max(4 * right_average_space, 10 if right_average_space > 0.5 else 5) if right_average_space < 0.05: right_max_space = 50 * right_average_space elif right_average_space < 0.2: right_max_space = 20 * right_average_space right_max_space = np.round(right_max_space, 5) right_values, right_counts = np.unique(right_data, return_counts=True) right_spaces = (right_values[1:] - right_values[:-1]) / right_counts[1:] right_coords = np.where(right_spaces >= right_max_space)[0] if log: print(file_prefix, 'right_min_density =', np.min(right_data), 'right_max_density =', np.max(right_data), 'right_average_space =', right_average_space, 'right_max_space =', right_max_space, 'right_point_count =', right_point_count) for right_max_index in right_coords: right_max_value = right_values[right_max_index] right_remove_num = np.sum(right_data > right_max_value) if right_remove_num < max(0.1 * right_point_count, 10): max_density = right_max_value if log: print(file_prefix, 'right_max_index =', right_max_index, 'right_space =', np.round(right_spaces[right_max_index], 5), 'right_max_value =', right_max_value, 'right_remove_num =', right_remove_num) break return min_density, max_density, min_threshold, max_threshold def calculate_edge_hu_threshold(data, mask, otsu=True, hu_max=2000, file_prefix='', log=False): # 获取结节数据 nodule_data = filter_data(data[mask == 1]) if len(nodule_data) == 0: return hu_max, hu_max, hu_max min_density, max_density, min_threshold, max_threshold = \ analysis_edge_hu_threshold(nodule_data, file_prefix=file_prefix, log=log) nodule_data = nodule_data[nodule_data >= min_density] nodule_data = nodule_data[nodule_data <= max_density] if len(nodule_data) == 0: return hu_max, hu_max, hu_max average_density = int(np.round(np.mean(nodule_data))) if otsu: otsu_min_density = otsu_edge_hu_threshold(data, mask, average_density, min_threshold, max_threshold, file_prefix=file_prefix, log=log) logging.info('{}, {}, otsu min_density = {}' .format(time.strftime('%Y-%m-%d %H:%M:%S'), file_prefix, otsu_min_density)) if otsu_min_density != min_density: if otsu_min_density < min_density: # 重新获取结节数据 nodule_data = filter_data(data[mask == 1]) min_density = otsu_min_density nodule_data = nodule_data[nodule_data >= min_density] nodule_data = nodule_data[nodule_data <= max_density] if len(nodule_data) == 0: return hu_max, hu_max, hu_max average_density = int(np.round(np.mean(nodule_data))) if log: logging.info('{}, {}, average_density = {} ' 'min_density = {} max_density = {} ' 'min_threshold = {} max_threshold = {}' .format(time.strftime('%Y-%m-%d %H:%M:%S'), file_prefix, average_density, min_density, max_density, min_threshold, max_threshold)) return average_density, min_density, max_density def otsu_edge_hu_threshold(data, mask, average_density, min_threshold, max_threshold, hu_min=-1000, hu_max=2000, file_prefix='', log=False): new_masks = np.zeros((4, mask.shape[0], mask.shape[1], mask.shape[2]), np.uint8) for z in range(len(mask)): if np.all(mask[z] == 0): continue _, contours, _ = cv2.findContours(mask[z], cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE) if len(contours) > 1: contours = sorted(contours, key=cv2.contourArea, reverse=True) for contour_index, contour in enumerate(contours): if contour_index > 0: break (x, y), radius = cv2.minEnclosingCircle(contour) (x, y, radius) = np.int0((x, y, radius)) if radius >= 15: increase_length = 3 elif radius >= 12: increase_length = 2 else: increase_length = 1 for new_mask_index, new_mask in enumerate(new_masks): increase_points = calculate_increase_points(new_mask_index + increase_length) around_points = batch_get_around_points(mask[z], contour, increase_points) new_mask[z][around_points[:, 0], around_points[:, 1]] = 1 # 原始结节的点 original_mask = mask.copy() original_mask[data <= hu_min] = 0 original_mask[data >= hu_max] = 0 original_mask[data >= average_density] = 0 nodule_datas = [] for new_mask in new_masks: # 结节边缘与周围的点 new_mask[data <= hu_min] = 0 new_mask[data >= hu_max] = 0 new_mask[data >= average_density] = 0 # 结节边缘的点 is_nodule_mask = np.bitwise_and(new_mask, original_mask) # 结节周围的点 no_nodule_mask = new_mask - is_nodule_mask no_nodule_mask[data >= min_threshold] = 0 no_nodule_mask[data >= max_threshold] = 0 # 填补结节边缘与结节周围使它们的点数量相同,形成两个波峰的效果 is_nodule_data = data[is_nodule_mask == 1] no_nodule_data = data[no_nodule_mask == 1] is_nodule_point_count = len(is_nodule_data) no_nodule_point_count = len(no_nodule_data) if is_nodule_point_count < no_nodule_point_count: no_nodule_data = no_nodule_data[0:is_nodule_point_count] no_nodule_point_count = len(no_nodule_data) if is_nodule_point_count <= 0 or no_nodule_point_count <= 0: continue is_nodule_density = int(np.round(np.mean(is_nodule_data))) no_nodule_density = int(np.round(np.mean(no_nodule_data))) if is_nodule_point_count > no_nodule_point_count: add_data = [no_nodule_density] * (is_nodule_point_count - no_nodule_point_count) no_nodule_data = np.concatenate((no_nodule_data, add_data)) elif is_nodule_point_count < no_nodule_point_count: add_data = [is_nodule_density] * (no_nodule_point_count - is_nodule_point_count) is_nodule_data = np.concatenate((is_nodule_data, add_data)) add_count = max(int(0.1 * (is_nodule_point_count + no_nodule_point_count)), 20) add_data = [no_nodule_density] * add_count no_nodule_data = np.concatenate((no_nodule_data, add_data)) add_data = [is_nodule_density] * add_count is_nodule_data = np.concatenate((is_nodule_data, add_data)) nodule_data = np.concatenate((is_nodule_data, no_nodule_data)) nodule_datas.append(nodule_data) if len(nodule_datas) == 0: return min_threshold hu_thresholds = [] for n in range(1, len(nodule_datas) + 1): for tup in itertools.combinations(list(range(len(nodule_datas))), n): nodule_data = np.empty(0, np.float32) for index in tup: nodule_data = np.concatenate((nodule_data, nodule_datas[index])) nodule_data = nodule_data + np.abs(hu_min) nodule_data = nodule_data.astype(np.uint) hu_threshold = mahotas.thresholding.otsu(nodule_data) - np.abs(hu_min) hu_thresholds.append(hu_threshold) hu_thresholds = np.sort(np.array(hu_thresholds)) min_density = get_hu_threshold(hu_thresholds, min_threshold, max_threshold) if log: logging.info('{}, {}, min_density = {} {}' .format(time.strftime('%Y-%m-%d %H:%M:%S'), file_prefix, min_density, hu_thresholds)) return min_density def get_hu_threshold(hu_thresholds, min_threshold, max_threshold): hu_thresholds = hu_thresholds.copy() hu_thresholds = hu_thresholds[hu_thresholds >= (min_threshold - 50)] if len(hu_thresholds) == 0: return min_threshold hu_thresholds = hu_thresholds[hu_thresholds <= max_threshold] if len(hu_thresholds) == 0: return max_threshold for ratio in [5, 4, 3]: hu_thresholds_exclude = exclude_exception_hu(hu_thresholds, ratio=ratio) if len(hu_thresholds_exclude) > 0: hu_thresholds = hu_thresholds_exclude else: break values, counts = np.unique(hu_thresholds, return_counts=True) max_count = max(counts) if max_count > 1: # hu_threshold = values[np.argmax(counts)] hu_min_threshold = np.min(values[counts == max_count]) hu_max_threshold = np.max(values[counts == max_count]) hu_thresholds = hu_thresholds[hu_thresholds >= hu_min_threshold] hu_thresholds = hu_thresholds[hu_thresholds <= hu_max_threshold] hu_threshold = int(np.round(np.mean(hu_thresholds))) else: hu_threshold = int(np.round(np.mean(hu_thresholds))) return hu_threshold def exclude_exception_hu(hu_thresholds, ratio=2): hu_thresholds = hu_thresholds.copy() hu_average_value = np.mean(hu_thresholds) hu_average_space = np.round((np.max(hu_thresholds) - np.min(hu_thresholds)) / (len(hu_thresholds) + 1), 5) hu_max_space = max(ratio * hu_average_space, 5) hu_min_threshold = hu_thresholds[0] hu_max_threshold = hu_thresholds[-1] values, counts = np.unique(hu_thresholds, return_counts=True) hu_spaces = (values[1:] - values[:-1]) / counts[1:] for i in range(len(hu_spaces)): if hu_spaces[i] >= hu_max_space: if values[i + 1] < hu_average_value: hu_min_threshold = values[i + 1] elif hu_average_value < values[i] < hu_max_threshold: hu_max_threshold = values[i] hu_thresholds = hu_thresholds[hu_thresholds >= hu_min_threshold] hu_thresholds = hu_thresholds[hu_thresholds <= hu_max_threshold] return hu_thresholds def set_nodule_outside_mask(data, mask, check_density=True, average_density=None, ratio=0.5, min_length=5): """ 设置结节外部肺部组织,结节mask为1, 外部肺部组织mask为99 """ if average_density is None: # 获取结节数据 nodule_data = data[mask == 1] if len(nodule_data) == 0: return # 计算平均密度 average_density = int(np.round(np.mean(nodule_data))) for z in range(len(mask)): if np.sum(mask[z] == 1) == 0: continue _, contours, _ = cv2.findContours(mask[z], cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE) if len(contours) > 1: contours = sorted(contours, key=cv2.contourArea, reverse=True) for contour_index, contour in enumerate(contours): if len(contours) > 1: # 设置contour获取包含的所有点 check_mask = np.zeros(mask[z].shape, np.uint8) cv2.drawContours(check_mask, [contour], -1, 1, -1) else: check_mask = mask[z] nodule_point_num = np.sum(check_mask == 1) (_, _), radius = cv2.minEnclosingCircle(contour) for increase_length in range(max(int(np.ceil(ratio * radius)), min_length)): increase_points = calculate_increase_points(increase_length) around_points = batch_get_around_points(check_mask, contour, increase_points, select_value=0) around_points1 = [point for point in around_points if check_mask[point[0], point[1]] == 0 and (not check_density or data[z][point[0], point[1]] < average_density)] if len(around_points1) > 0: around_points1 = np.array(around_points1) mask[z][around_points1[:, 0], around_points1[:, 1]] = 99 check_mask[around_points1[:, 0], around_points1[:, 1]] = 99 outside_point_num = np.sum(check_mask == 99) if (outside_point_num > nodule_point_num) or \ (increase_length > min_length and outside_point_num > 0.7 * nodule_point_num): break def get_nodule_peak(nodule_data, kernel, hu_range=25): """ 获取结节波峰HU值, HU最小值,HU最大值 """ values, counts = np.unique(nodule_data, return_counts=True) max_values = values[counts == max(counts)] if len(max_values) > 1: max_total_hu_value = [] for max_value in max_values: temp_data = np.logical_and(nodule_data >= max_value - hu_range, nodule_data <= max_value + hu_range) max_total_hu_value.append(np.sum(temp_data)) max_index = np.argmax(np.array(max_total_hu_value)) peak_hu_value = max_values[max_index] else: peak_index = np.argmax(counts) peak_hu_value = values[peak_index] min_hu_value = peak_hu_value max_hu_value = peak_hu_value counts = ndimage.convolve(counts, kernel, mode='nearest') counts = counts.astype(np.float32) peak_index = np.argmax(values == peak_hu_value) for i in range(peak_index - 1, -1, -1): if counts[i] >= counts[peak_index] / 2: min_hu_value = values[i] else: break for i in range(peak_index + 1, len(values), 1): if counts[i] >= counts[peak_index] / 2: max_hu_value = values[i] else: break return peak_hu_value, min_hu_value, max_hu_value def get_nodule_peak_info(data, mask): """ 获取结节波峰相关信息(波峰HU值, 波峰HU数量, HU最小值,HU最大值,占比) """ nodule_data = data[mask == 1] total_point_num = len(nodule_data) if total_point_num == 0: return None # 过滤无效的hu值 nodule_data = filter_data(nodule_data) if len(nodule_data) == 0: nodule_data = data[mask == 1] kernel = np.array([1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]) kernel = kernel / np.sum(kernel) # 计算平均密度 average_density = int(np.round(np.mean(nodule_data))) peak_hu_value, min_hu_value, max_hu_value = get_nodule_peak(nodule_data, kernel) if average_density - peak_hu_value > 100 and \ np.abs(2 * peak_hu_value - min_hu_value - max_hu_value) > 50: nodule_data_copy = nodule_data.copy() nodule_data_copy = nodule_data_copy[nodule_data_copy >= average_density] peak_hu_value2, min_hu_value2, max_hu_value2 = get_nodule_peak(nodule_data_copy, kernel) nodule_data_copy = nodule_data.copy() nodule_data_copy = nodule_data_copy[nodule_data_copy >= peak_hu_value] nodule_data_copy = nodule_data_copy[nodule_data_copy <= peak_hu_value2] values, counts = np.unique(nodule_data_copy, return_counts=True) counts = counts.astype(np.float32) counts = ndimage.convolve(counts, kernel, mode='nearest') min_values = values[counts == min(counts)] trough_hu_value = min_values[-1] nodule_data_copy = nodule_data.copy() nodule_data_copy = nodule_data_copy[nodule_data_copy >= trough_hu_value] peak_hu_value2, min_hu_value2, max_hu_value2 = get_nodule_peak(nodule_data_copy, kernel) if peak_hu_value2 - min_hu_value2 > 50: peak_hu_value, min_hu_value, max_hu_value = peak_hu_value2, min_hu_value2, max_hu_value2 nodule_data_copy = nodule_data.copy() nodule_data_copy = nodule_data_copy[nodule_data_copy >= min_hu_value] nodule_data_copy = nodule_data_copy[nodule_data_copy <= max_hu_value] current_point_num = len(nodule_data_copy) ratio = np.round(current_point_num / total_point_num, 5) peak_hu_count = np.sum(nodule_data == peak_hu_value) peak_info = (peak_hu_value, peak_hu_count, min_hu_value, max_hu_value, ratio) return peak_info