import os
import sys
import traceback
from multiprocessing import Pool, cpu_count

import SimpleITK as sitk
import numpy as np
from scipy.ndimage.interpolation import zoom

import warnings

warnings.filterwarnings("ignore")

BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
# Add the project root to the import path.
sys.path.append(BASE_DIR)

# from Common.global_var import RAW_NII_DIR, IMAGE_DIR
# from Common.image_resample import ItkResample

# Default input/output locations. They live at module level (not under the
# ``__main__`` guard) so that worker processes started with the "spawn"
# method, and other modules importing this file, can resolve them.
# NOTE(review): presumably these were meant to come from Common.global_var
# (see the commented-out import above) -- confirm.
RAW_NII_DIR = '/data/DATA/IMAGE/RAW_NII'
IMAGE_DIR = '/home/DATA/IMAGE/SPACING_1.0_NII_BSPLINE'


class ItkResample(object):
    """Resample SimpleITK images onto a grid of a given spacing or size."""

    def __init__(self):
        super(ItkResample, self).__init__()

    def resample_to_spacing(self, itk_image, target_spacing=(1.0, 1.0, 1.0),
                            interpolator=sitk.sitkLinear, default_value=0.0):
        """Resample ``itk_image`` so that its voxel spacing is ``target_spacing``.

        :param itk_image: SimpleITK image to resample
        :param target_spacing: per-axis spacing of the output grid
        :param interpolator: SimpleITK interpolator constant
        :param default_value: value for output voxels outside the input image
        :return: the resampled SimpleITK image
        """
        source_spacing = itk_image.GetSpacing()
        source_size = itk_image.GetSize()
        zoom_factor = np.divide(source_spacing, target_spacing)
        offset = self._calculate_origin_offset(target_spacing, source_spacing)

        # Round before ceil so floating-point noise (e.g. 100.0000001) does
        # not add a spurious extra voxel.  Plain Python ints are used instead
        # of np.int16, which would wrap around for axes longer than 32767.
        scaled_size = np.round(np.multiply(zoom_factor, source_size), decimals=5)
        target_size = [int(n) for n in np.ceil(scaled_size)]

        # The offset is added component-wise to the origin; the direction
        # matrix is not applied to it.
        origin = tuple(float(v) for v in np.add(itk_image.GetOrigin(), offset))

        reference_image = self._sitk_new_blank_image(
            size=target_size,
            spacing=target_spacing,
            direction=itk_image.GetDirection(),
            origin=origin,
            default_value=default_value)
        return self._sitk_resample_to_image(
            itk_image, reference_image,
            interpolator=interpolator, default_value=default_value)

    def resample_to_size(self, itk_image, target_size,
                         interpolator=sitk.sitkLinear, default_value=0.0):
        """Resample ``itk_image`` onto a grid with ``target_size`` voxels.

        The output spacing is chosen per axis as
        ``source_size * source_spacing / target_size``; the origin is kept.

        :param itk_image: SimpleITK image to resample
        :param target_size: number of voxels per axis in the output
        :param interpolator: SimpleITK interpolator constant
        :param default_value: value for output voxels outside the input image
        :return: the resampled SimpleITK image
        """
        source_spacing = itk_image.GetSpacing()
        source_size = itk_image.GetSize()
        # zip() follows the image's own dimensionality instead of assuming 3.
        target_spacing = [
            n_src * 1.0 * sp_src / n_dst
            for n_src, sp_src, n_dst in zip(source_size, source_spacing, target_size)
        ]
        blank_image = self._sitk_new_blank_image(
            size=target_size,
            spacing=target_spacing,
            direction=itk_image.GetDirection(),
            origin=itk_image.GetOrigin(),
            default_value=default_value)
        return self._sitk_resample_to_image(
            itk_image, blank_image,
            interpolator=interpolator, default_value=default_value)

    def _calculate_origin_offset(self, target_spacing, source_spacing):
        """Return half the per-axis difference ``target - source`` spacing."""
        return np.subtract(target_spacing, source_spacing) / 2.0

    def _sitk_new_blank_image(self, size, spacing, direction, origin, default_value=0.0):
        """Create a float32 image of ``size`` filled with ``default_value``.

        :param size: number of voxels per axis, in SimpleITK (x, y, z) order
        :param spacing: voxel spacing per axis
        :param direction: direction cosines as returned by ``GetDirection()``
        :param origin: physical origin of the image
        :param default_value: fill value
        :return: the new SimpleITK image
        """
        # GetImageFromArray expects the numpy axes reversed relative to the
        # SimpleITK size, hence the transpose.
        voxels = np.full([int(n) for n in size], default_value, dtype=np.float32).T
        itk_image = sitk.GetImageFromArray(voxels)
        # Hand SimpleITK plain Python floats rather than numpy scalars/arrays.
        itk_image.SetSpacing([float(s) for s in spacing])
        itk_image.SetDirection(direction)
        itk_image.SetOrigin([float(o) for o in origin])
        return itk_image

    def _sitk_resample_to_image(self, itk_image, reference_image,
                                interpolator=sitk.sitkLinear, default_value=0.,
                                transform=None, output_pixel_type=None):
        """Resample ``itk_image`` onto the grid of ``reference_image``.

        :param itk_image: image to resample
        :param reference_image: image whose grid defines the output
        :param interpolator: SimpleITK interpolator constant
        :param default_value: value for voxels outside the input image
        :param transform: transform to apply; identity when None
        :param output_pixel_type: output pixel id; the input's when None
        :return: the resampled SimpleITK image
        """
        if transform is None:
            transform = sitk.Transform()
            transform.SetIdentity()
        if output_pixel_type is None:
            output_pixel_type = itk_image.GetPixelID()

        resample_filter = sitk.ResampleImageFilter()
        resample_filter.SetInterpolator(interpolator)
        resample_filter.SetTransform(transform)
        resample_filter.SetOutputPixelType(output_pixel_type)
        resample_filter.SetDefaultPixelValue(default_value)
        resample_filter.SetReferenceImage(reference_image)
        return resample_filter.Execute(itk_image)


class Resample_and_Save(object):
    """Resample a list of series to an isotropic spacing in a process pool."""

    def __init__(self, series_ids, data_format, target_spacing, out_nii_gz_path,
                 raw_nii_dir=None):
        '''
        :param series_ids: series to resample
        :param data_format: format of input data, '.nii.gz' or '.mha'
        :param target_spacing: spacing resampled to
        :param out_nii_gz_path: path to save resampled image
        :param raw_nii_dir: directory holding the input images; defaults to
            the module-level RAW_NII_DIR
        '''
        self.series_ids = series_ids
        self.data_format = data_format
        # Historical misspelling, kept so existing readers of the attribute
        # continue to work.
        self.data_formant = data_format
        self.target_spacing = target_spacing
        self.out_nii_gz_path = out_nii_gz_path
        # Resolved here and stored on the instance so the value travels with
        # the pickled bound method into worker processes.
        self.raw_nii_dir = RAW_NII_DIR if raw_nii_dir is None else raw_nii_dir
        self.itk_resample = ItkResample()

    def __call__(self):
        """Resample every series in ``self.series_ids`` and report failures."""
        # int(cpu_count() * 0.7) is 0 on a single-core host, and Pool(0)
        # raises ValueError, so clamp to at least one worker.
        pool = Pool(max(1, int(cpu_count() * 0.7)))
        pending = []
        try:
            for series_id in self.series_ids:
                try:
                    async_result = pool.apply_async(self._single_resample, (series_id,))
                    pending.append((series_id, async_result))
                except Exception as err:
                    traceback.print_exc()
                    print('Outer resample to spacing %.1f throws exception %s, with uid %s!' % (
                        self.target_spacing, err, series_id))
        finally:
            pool.close()
            pool.join()

        # apply_async only surfaces a worker's exception when the result is
        # fetched; without this, failures outside the worker's own try block
        # would vanish silently.
        for series_id, async_result in pending:
            try:
                async_result.get()
            except Exception as err:
                traceback.print_exc()
                print('Outer resample to spacing %.1f throws exception %s, with uid %s!' % (
                    self.target_spacing, err, series_id))

    def _single_resample(self, series_id):
        """Resample one series and write it to the output directory.

        Nothing is done when the output file already exists. Errors while
        reading, resampling or writing are printed, not raised.

        :param series_id: file name of the series without the format suffix
        """
        print('Resample uid %s to spacing %.1f.' % (series_id, self.target_spacing))
        file_name = series_id + self.data_format
        raw_nii_gz_file = os.path.join(self.raw_nii_dir, file_name)
        resampled_nii_gz_file = os.path.join(self.out_nii_gz_path, file_name)
        if os.path.exists(resampled_nii_gz_file):
            return

        try:
            # read image
            itk_image = sitk.ReadImage(raw_nii_gz_file)
            # resample image
            itk_image_resample = self.itk_resample.resample_to_spacing(
                itk_image=itk_image,
                target_spacing=(tuple([self.target_spacing] * 3)),
                interpolator=sitk.sitkBSpline)
            # NOTE(review): this overwrites the offset origin computed inside
            # resample_to_spacing with the source origin -- kept as in the
            # original; confirm it is intended.
            itk_image_resample.SetOrigin(itk_image.GetOrigin())
            itk_image_resample.SetSpacing([self.target_spacing] * 3)
            # save image
            sitk.WriteImage(itk_image_resample, resampled_nii_gz_file)
        except Exception as err:
            traceback.print_exc()
            print('Inner resample to spacing %.1f throws exception %s, with uid %s!' % (
                self.target_spacing, err, series_id))


if __name__ == '__main__':
    data_format = '.nii.gz'
    target_spacing = 1

    # Keep only files that really end with the expected suffix and strip just
    # that suffix; str.replace() would also alter a match inside the name and
    # would let unrelated files through as bogus series ids.
    series_ids = [
        file_name[:-len(data_format)]
        for file_name in os.listdir(RAW_NII_DIR)
        if file_name.endswith(data_format)
    ]

    # out_nii_gz_path = os.path.join(IMAGE_DIR, 'res%d' % target_spacing)
    os.makedirs(IMAGE_DIR, exist_ok=True)

    resample_and_save = Resample_and_Save(series_ids, data_format, target_spacing, IMAGE_DIR,
                                          raw_nii_dir=RAW_NII_DIR)
    resample_and_save()