import os from glob import glob import re import numpy as np import torch import json def bboxes_wasserstein(bboxes1, bboxes2, constant=1333.10): """ bboxes1: (cx, cy, cz, w, h, l) bboxes2: (cx, cy, cz, w, h, l) """ # print(f"bboxes1 shape: {bboxes1.shape}") # print(f"bboxes2 shape: {bboxes2.shape}") eps=1e-6 x_center1 = bboxes1[0] x_center2 = bboxes2[0] y_center1 = bboxes1[1] y_center2 = bboxes2[1] z_center1 = bboxes1[2] z_center2 = bboxes2[2] w_s = x_center1 - x_center2 h_s = y_center1 - y_center2 l_s = z_center1 - z_center2 center_distance = w_s * w_s + h_s * h_s + l_s * l_s + eps # w1 = bboxes1[3] + eps h1 = bboxes1[4] + eps l1 = bboxes1[5] + eps w2 = bboxes2[3] + eps h2 = bboxes2[4] + eps l2 = bboxes2[5] + eps wh_distance = ((w1 - w2) ** 2 + (h1 - h2) ** 2 + (l1 - l2) ** 2) / 4 wassersteins = torch.sqrt(center_distance + wh_distance) normalized_wasserstein = torch.exp(- wassersteins / constant) return normalized_wasserstein, wassersteins def compute_distance(box1, box2): return np.sqrt((box1[0] - box2[0])**2 + (box1[1] - box2[1])**2 + (box1[2] - box2[2])**2) def compute_size_diff(box1, box2): w_diff = np.abs(box1[3] - box2[3]) / box2[3] l_diff = np.abs(box1[4] - box2[4]) / box2[4] h_diff = np.abs(box1[5] - box2[5]) / box2[5] return w_diff + l_diff + h_diff def compute_score(box1, box2, alpha=1.0, beta=1.0): distance = compute_distance(box1, box2) size_diff = compute_size_diff(box1, box2) return alpha * distance + beta * size_diff def compute_iou_3d(box1, box2): """ [x, y, z, l, w, h], (x, y, z)-中心点坐标,(l, w, h)-长宽高。 """ intersect_x = max(0, min(box1[0] + box1[3] / 2, box2[0] + box2[3] / 2) - max(box1[0] - box1[3] / 2, box2[0] - box2[3] / 2)) intersect_y = max(0, min(box1[1] + box1[4] / 2, box2[1] + box2[4] / 2) - max(box1[1] - box1[4] / 2, box2[1] - box2[4] / 2)) intersect_z = max(0, min(box1[2] + box1[5] / 2, box2[2] + box2[5] / 2) - max(box1[2] - box1[5] / 2, box2[2] - box2[5] / 2)) intersect_volume = intersect_x * intersect_y * intersect_z box1_volume = box1[3] * box1[4] * box1[5] box2_volume = box2[3] * box2[4] * box2[5] union_volume = box1_volume + box2_volume - intersect_volume iou = intersect_volume / union_volume if union_volume > 0 else 0 return iou def match_boxes(proposals, labels, iou_threshold=0.5): """ proposals 预选框,[[x, y, z, l, w, h], ...] labels 目标,[[x, y, z, l, w, h], ...] return: [(proposal_idx, label_idx), ...] """ matches_scores = {} matches = [] # thres_list = [] # for label in labels: # mix_point = [label[0] - label[3], label[1] - label[4], label[2] - label[5]] # mix_point.extend(label[3:]) # thre = compute_score(mix_point, label) # thres_list.append(thre) for i, proposal in enumerate(proposals): for j, label in enumerate(labels): # iou = compute_iou_3d(proposal, label) iou = compute_score(proposal, label) # iou = bboxes_wasserstein(proposal, label) if j not in matches_scores.keys(): matches_scores[j] = [] matches_scores[j].append(iou) else: matches_scores[j].append(iou) # 构建match_result [label, proposal, value] match_results = [] for key in list(matches_scores.keys()): temp = [] temp.append(key) idx = np.argmin(matches_scores[key]) temp.append(idx) temp.append(matches_scores[key][idx]) match_results.append(temp) temp_min = {} for res in match_results: key = res[1] value = res[2] if key not in temp_min.keys() or value < temp_min[key]: temp_min[key] = value new_match_results = [] for key in list(temp_min.keys()): value = temp_min[key] for res in match_results: if res[1] == key and res[2] == value: new_match_results.append(res) for res in new_match_results: matches.append([res[1], res[0]]) return matches def matched_pairs(fixed_results, moving_results, elastix_points): # get info paths elastix_points = os.path.join(elastix_points, 'outputpoints.txt') # deal with json fixed_infos = json.load(open(fixed_results, 'r')) fixed_points = fixed_infos['annotationSessions'][0]['annotationSet'] fixed_list = [] for point in fixed_points: temp = [] labels = point['labelProperties'] left_point, right_point = point['coordinates'][0], point['coordinates'][1] temp.extend([(left_point['x'] + right_point['x']) / 2.0, (left_point['y'] + right_point['y']) / 2.0, (left_point['z'] + right_point['z']) / 2.0]) temp.extend([abs(left_point['x'] - right_point['x']), abs(left_point['y'] - right_point['y']), abs(left_point['z'] - right_point['z'])]) temp.extend([value['propertyID'] for value in labels if value['labelID'] == 1]) temp.extend([value['propertyID'] for value in labels if value['labelID'] == 2]) fixed_list.append(temp) moving_infos = json.load(open(moving_results, 'r')) moving_points = moving_infos['annotationSessions'][0]['annotationSet'] moving_list = [] for point in moving_points: temp = [] labels = point['labelProperties'] left_point, right_point = point['coordinates'][0], point['coordinates'][1] temp.extend([(left_point['x'] + right_point['x']) / 2.0, (left_point['y'] + right_point['y']) / 2.0, (left_point['z'] + right_point['z']) / 2.0]) temp.extend([abs(left_point['x'] - right_point['x']), abs(left_point['y'] - right_point['y']), abs(left_point['z'] - right_point['z'])]) temp.extend([value['propertyID'] for value in labels if value['labelID'] == 1]) temp.extend([value['propertyID'] for value in labels if value['labelID'] == 2]) moving_list.append(temp) new_fixed_list = [] # deal with txt input_pattern = r'InputPoint\s*=\s*\[\s*([-+]?\d*\.\d+|\d+)\s+([-+]?\d*\.\d+|\d+)\s+([-+]?\d*\.\d+|\d+)\s*\]' output_pattern = r'OutputPoint\s*=\s*\[\s*([-+]?\d*\.\d+|\d+)\s+([-+]?\d*\.\d+|\d+)\s+([-+]?\d*\.\d+|\d+)\s*\]' fixed_match = [] with open(elastix_points, 'r') as file: for line in file: line = line.strip() intput_points, output_points = line.split(';')[2].strip(), line.split(';')[4].strip() input_match = re.search(input_pattern, intput_points) input_x = float(input_match.group(1)) input_y = float(input_match.group(2)) input_z = float(input_match.group(3)) output_match = re.search(output_pattern, output_points) output_x = float(output_match.group(1)) output_y = float(output_match.group(2)) output_z = float(output_match.group(3)) temp = [] inputs = [input_x, input_y, input_z] for i, groups in enumerate(fixed_list): coord = groups[:3] if all(a == round(b, 6) for a, b in zip(inputs, coord)): temp.extend([output_x, output_y, output_z]) temp.extend(groups[3:]) new_fixed_list.append(temp) fixed_match.append(i) # match mew_fixed moving matched_results = {'matched': [], 'missing': [], 'new': []} matched_list = match_boxes(new_fixed_list, moving_list) temp1 = [] temp2 = [] for info in matched_list: matched_results['matched'].append([fixed_list[fixed_match[info[0]]], moving_list[info[1]]]) temp1.append(info[0]) temp2.append(info[1]) matched_results['missing'] = [fixed_list[fixed_match[i]] for i in list(range(len(new_fixed_list))) if i not in temp1] matched_results['new'] = [moving_list[i] for i in list(range(len(moving_list))) if i not in temp2] return matched_results if __name__ == '__main__': fixed_results = '/fileser/maozj/Downloads/lung_alg_results/proxima/NODULE/1.2.840.113619.2.55.3.34211586.404.1416367707.480.4.mha' moving_results = '/fileser/maozj/Downloads/lung_alg_results/proxima/NODULE/1.2.840.113619.2.55.3.34211586.404.1416367672.325.4.mha' moving_points = '/root/Documents/GroupLung/demo_tmp/point_moving' matched_pairs(fixed_results, moving_results, moving_points)