我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用nibabel.load()。
def generate_patch_probs(path, patch_locations, patch_size, im_size): x, y, z = patch_locations seg = nib.load(glob.glob(os.path.join(path, '*_seg.nii.gz'))[0]).get_data().astype(np.float32) p = [] for i in range(len(x)): for j in range(len(y)): for k in range(len(z)): patch = seg[int(x[i] - patch_size / 2) : int(x[i] + patch_size / 2), int(y[j] - patch_size / 2) : int(y[j] + patch_size / 2), int(z[k] - patch_size / 2) : int(z[k] + patch_size / 2)] patch = (patch > 0).astype(np.float32) percent = np.sum(patch) / (patch_size * patch_size * patch_size) p.append((1 - np.abs(percent - 0.5)) * percent) p = np.asarray(p, dtype=np.float32) p[p == 0] = np.amin(p[np.nonzero(p)]) p = p / np.sum(p) return p
def train_xgboost(): df = pd.read_csv('survival_data.csv', index_col=0, encoding = 'UTF-7') p = np.array([np.mean(np.load('training/%s_flair.nii.gz.npy' % str(id)), axis=0) for id in folder_names_train]) q = np.array([np.mean(np.load('training/%s_t1.nii.gz.npy' % str(id)), axis=0) for id in folder_names_train]) r = np.array([np.mean(np.load('training/%s_t1ce.nii.gz.npy' % str(id)), axis=0) for id in folder_names_train]) s = np.array([np.mean(np.load('training/%s_t2.nii.gz.npy' % str(id)), axis=0) for id in folder_names_train]) y=np.array([]) t=0 z=np.array([]) for ind in range(len(folder_names_train)): try: temp = df.get_value(str(folder_names_train[ind]),'Survival') y=np.append(y,temp) temp = df.get_value(str(folder_names_train[ind]),'Age') z=np.append(z,np.array([temp])) except Exception as e: t+=1 print (t,str(e),"Label Not found, deleting entry") y=np.append(y,0) z=np.array([[v] for v in z]) t=np.concatenate((p,q),axis=1) u=np.concatenate((r,s),axis=1) x=np.concatenate((t,u),axis=1) #print(x.shape) #print (x) #print (x.shape,z.shape) x=np.concatenate((x,z),axis=1) #print (x) #clf=linear_model.LogisticRegression(C=1e5) #clf = RandomForestRegressor() clf = xgb.XGBRegressor() clf.fit(x,y) return clf
def load_nifti(filename, with_affine=False): """ load image from NIFTI file Parameters ---------- filename : str filename of NIFTI file with_affine : bool if True, returns affine parameters Returns ------- data : np.ndarray image data """ img = nib.load(filename) data = img.get_data() data = np.copy(data, order="C") if with_affine: return data, img.affine return data
def load_channels(self,normalize = False): modalities = [] modalities.append(nib.load(self.FLAIR_FILE)) modalities.append(nib.load(self.T1_FILE)) channels = np.zeros( modalities[0].shape + (2,), dtype=np.float32) for index_mod, mod in enumerate(modalities): if self.data_augmentation: channels[:, :, :, index_mod] = flip_plane(np.asarray(mod.dataobj)) else: channels[:,:,:,index_mod] = np.asarray(mod.dataobj) if normalize: channels[:, :, :, index_mod] = normalize_image(channels[:,:,:,index_mod], mask = self.load_ROI_mask() ) return channels
def load_ROI_mask(self): proxy = nib.load(self.FLAIR_FILE) image_array = np.asarray(proxy.dataobj) mask = np.ones_like(image_array) mask[np.where(image_array < 90)] = 0 # img = nib.Nifti1Image(mask, proxy.affine) # nib.save(img, join(modalities_path,'mask.nii.gz')) struct_element_size = (20, 20, 20) mask_augmented = np.pad(mask, [(21, 21), (21, 21), (21, 21)], 'constant', constant_values=(0, 0)) mask_augmented = binary_closing(mask_augmented, structure=np.ones(struct_element_size, dtype=bool)).astype( np.int) return mask_augmented[21:-21, 21:-21, 21:-21].astype('bool')
def get_subject_shape(self): if self.booleanLabels: proxy = nib.load(self.GT_FILE) elif self.booleanT1: proxy = nib.load(self.T1_FILE) elif self.booleanT1c: proxy = nib.load(self.T1c_FILE) elif self.booleanT2: proxy = nib.load(self.T2_FILE) elif self.booleanFLAIR: proxy = nib.load(self.FLAIR_FILE) else: raise ValueError("[src.helpers.Subject] No modalities are given for subject: " + self.id) return proxy.shape
def load_channels(self): flair = nib.load(self.FLAIR_FILE) t1 = nib.load(self.T1_FILE) t1c = nib.load(self.T1c_FILE) t2 = nib.load(self.T2_FILE) to_int = lambda b: 1 if b else 0 num_input_modalities = to_int(self.booleanFLAIR) + to_int(self.booleanT1) + to_int(self.booleanT1c) + to_int( self.booleanT2) channels = np.zeros((num_input_modalities,) + flair.shape, dtype=np.float32) channels[0] = np.asarray(flair.dataobj) if self.booleanFLAIR is True else None channels[to_int(self.booleanFLAIR)] = np.asarray(t1.dataobj) if self.booleanT1 is True else None channels[to_int(self.booleanFLAIR) + to_int(self.booleanT1)] = np.asarray( t1c.dataobj) if self.booleanT1c is True else None channels[to_int(self.booleanFLAIR) + to_int(self.booleanT1) + to_int(self.booleanT1c)] = np.asarray( t2.dataobj) if self.booleanT2 is True else None return channels
def load_channels(self, normalize=False): modalities = [] modalities.append(nib.load(self.FLAIR_FILE)) modalities.append(nib.load(self.T1_FILE)) modalities.append(nib.load(self.T1c_FILE)) modalities.append(nib.load(self.T2_FILE)) channels = np.zeros(modalities[0].shape + (4,), dtype=np.float32) if normalize: mask = self.load_ROI_mask() for index_mod, mod in enumerate(modalities): if self.data_augmentation_flag: channels[:, :, :, index_mod] = flip_plane(np.asarray(mod.dataobj), plane=self.data_augmentation) else: channels[:, :, :, index_mod] = np.asarray(mod.dataobj) if normalize: channels[:, :, :, index_mod] = normalize_image(channels[:, :, :, index_mod], mask=mask) return channels
def load_channels(self, normalize=False): modalities = [] modalities.append(nib.load(self.T2_FILE)) modalities.append(nib.load(self.T1_FILE)) channels = np.zeros(modalities[0].shape + (2,), dtype=np.float32) for index_mod, mod in enumerate(modalities): if self.data_augmentation: channels[:, :, :, index_mod] = flip_plane(np.asarray(mod.dataobj), plane=self.data_augmentation) else: channels[:,:,:,index_mod] = np.asarray(mod.dataobj) if normalize: channels[:, :, :, index_mod] = normalize_image(channels[:,:,:,index_mod], mask = self.load_ROI_mask() ) return channels
def test_individual_stability_matrix(): """ Tests individual_stability_matrix method on three gaussian blobs. """ import utils import numpy as np import scipy as sp desired = np.load(home + '/git_repo/PyBASC/tests/ism_test.npy') blobs = generate_blobs() ism = utils.individual_stability_matrix(blobs, 20, 3) #how to use test here? # np.corrcoef(ism.flatten(),desired.flatten()) # np.testing.assert_equal(ism,desired) # # corr=np.array(sp.spatial.distance.cdist(ism, desired, metric = 'correlation')) # assert False
def test_ndarray_to_vol(): import basc import nibabel as nb subject_file = home + '/git_repo/PyBASC/sample_data/sub1/Func_Quarter_Res.nii.gz' subject_file = home + '/git_repo/PyBASC/sample_data/test.nii.gz' data = nb.load(subject_file).get_data().astype('float32') roi_mask_file= home + '/git_repo/PyBASC/masks/LC_Quarter_Res.nii.gz' print( 'Data Loaded') roi_mask_file_nb = nb.load(roi_mask_file) roi_mask_nparray = nb.load(roi_mask_file).get_data().astype('float32').astype('bool') roi1data = data[roi_mask_nparray] data_array=roi1data sample_file=subject_file filename=home + '/git_repo/PyBASC/sample_data/ndarray_to_vol_test.nii.gz' basc.ndarray_to_vol(data_array, roi_mask_file, roi_mask_file, filename)
def analyse_data(input_dir): shapes = [] relative_volumes = [] for folder in get_sub_folders(input_dir): print(folder) for sub_folder in get_sub_folders(os.path.join(input_dir, folder)): image_type = get_image_type_from_folder_name(sub_folder) # do not save the raw data (too heavy) if image_type != '.OT': continue path = os.path.join(input_dir, folder, sub_folder) filename = next(filename for filename in os.listdir(path) if get_extension(filename) == '.nii') path = os.path.join(path, filename) im = nib.load(path) image = im.get_data() shape = image.shape shapes.append(shape) relative_volumes.append(100 * np.sum(image) / np.cumprod(shape)[-1]) return shapes, relative_volumes # train
def export_data(prediction_dir, nii_image_dir, tfrecords_dir, export_dir, transformation=None): for file_path in os.listdir(prediction_dir): name, prediction, probability = read_prediction_file(os.path.join(prediction_dir, file_path)) if transformation: image, ground_truth = get_original_image(os.path.join(tfrecords_dir, name + '.tfrecord'), False) prediction = transformation.transform_image(prediction, probability, image) # build cv_predictions .nii image img = nib.Nifti1Image(prediction, np.eye(4)) img.set_data_dtype(dtype=np.uint8) path = os.path.join(nii_image_dir, name) adc_name = next(l for l in os.listdir(path) if 'MR_ADC' in l) export_image = nib.load(os.path.join(nii_image_dir, name, adc_name, adc_name + '.nii')) i = export_image.get_data() i[:] = img.get_data() # set name to specification and export _id = next(l for l in os.listdir(path) if 'MR_MTT' in l).split('.')[-1] export_path = os.path.join(export_dir, 'SMIR.' + name + '.' + _id + '.nii') nib.save(export_image, os.path.join(export_path))
def load(self, path): """ A method that load the image data and associated metadata. Parameters ---------- path: str the path to the image to be loaded. Return ------ image: Image the loaded image. """ _image = nibabel.load(path) return Image(spacing=_image.header.get_zooms(), data_type="scalar", metadata={"path": path}, data=_image.get_data())
def _get_investigation_template(bids_directory, mri_par_names): this_path = os.path.realpath( __file__[:-1] if __file__.endswith('.pyc') else __file__) template_path = opj( *(psplit(this_path)[:-1] + ("i_investigation_template.txt", ))) investigation_template = open(template_path).read() title = psplit(bids_directory)[-1] if exists(opj(bids_directory, "dataset_description.json")): with open(opj(bids_directory, "dataset_description.json"), "r") \ as description_dict_fp: description_dict = json.load(description_dict_fp) if "Name" in description_dict: title = description_dict["Name"] investigation_template = investigation_template.replace( "[TODO: TITLE]", title) investigation_template = investigation_template.replace( "[TODO: MRI_PAR_NAMES]", ";".join(mri_par_names)) return investigation_template
def get_all_nifti_data(directory, map_names=None, deferred=True): """Get the data of all the nifti volumes in the given directory. If map_names is given we will only load the given map names. Else, we load all .nii and .nii.gz files in the given directory. Args: directory (str): the directory from which we want to read a number of maps map_names (list of str): the names of the maps we want to use. If given, we only use and return these maps. deferred (boolean): if True we return an deferred loading dictionary instead of a dictionary with the values loaded as arrays. Returns: dict: A dictionary with the volumes. The keys of the dictionary are the filenames without the extension of the .nii(.gz) files in the given directory. """ proxies = load_all_niftis(directory, map_names=map_names) if deferred: return DeferredActionDict(lambda _, item: item.get_data(), proxies) else: return {k: v.get_data() for k, v in proxies.items()}
def get_cut_size(data_path): list_D_s, list_D_e, list_H_s, list_H_e, list_W_s, list_W_e = [], [], [], [], [], [] for i in range(1, 10+1): subject_name = 'subject-%d-' % i f = os.path.join(data_path, subject_name+'T1.hdr') img = nib.load(f) inputs_tmp = img.get_data() D_s, D_e, H_s, H_e, W_s, W_e = cut_edge(inputs_tmp, margin) # set how many zero margins to keep list_D_s.append(D_s) list_D_e.append(D_e) list_H_s.append(H_s) list_H_e.append(H_e) list_W_s.append(W_s) list_W_e.append(W_e) min_D_s, max_D_e, min_H_s, max_H_e, min_W_s, max_W_e = \ min(list_D_s), max(list_D_e), min(list_H_s), max(list_H_e), min(list_W_s), max(list_W_e) print("new size: ", min_D_s, max_D_e, min_H_s, max_H_e, min_W_s, max_W_e) return min_D_s, max_D_e, min_H_s, max_H_e, min_W_s, max_W_e
def check_atlas(atlas): """Validation of atlas input.""" # when its a name for pre-defined atlas if isinstance(atlas, str): if not pexists(atlas): # just a name atlas = atlas.lower() if atlas not in parcellate.atlas_list: raise ValueError('Invalid choice of atlas. Accepted : {}'.format(parcellate.atlas_list)) elif os.path.isdir(atlas): # cortical atlas in Freesurfer org if not parcellate.check_atlas_annot_exist(atlas): raise ValueError('Given atlas folder does not contain Freesurfer label annot files. ' 'Needed : given_atlas_dir/label/?h.aparc.annot') elif pexists(atlas): # may be a volumetric atlas? try: atlas = nibabel.load(atlas) except: traceback.print_exc() raise ValueError('Unable to read the provided image volume. Must be a nifti 2d volume, readable by nibabel.') else: raise ValueError('Unable to decipher or use the given atlas.') else: raise NotImplementedError('Atlas must be a string, providing a name or path to Freesurfer folder or a 3D nifti volume.') return atlas
def _add_provenance(in_file, settings, air_msk, rot_msk): from mriqc import __version__ as version from niworkflows.nipype.utils.filemanip import hash_infile import nibabel as nb import numpy as np air_msk_size = nb.load(air_msk).get_data().astype( np.uint8).sum() rot_msk_size = nb.load(rot_msk).get_data().astype( np.uint8).sum() out_prov = { 'md5sum': hash_infile(in_file), 'version': version, 'software': 'mriqc', 'warnings': { 'small_air_mask': bool(air_msk_size < 5e5), 'large_rot_frame': bool(rot_msk_size > 500), } } if settings: out_prov['settings'] = settings return out_prov
def _binarize(in_file, threshold=0.5, out_file=None): import os.path as op import numpy as np import nibabel as nb if out_file is None: fname, ext = op.splitext(op.basename(in_file)) if ext == '.gz': fname, ext2 = op.splitext(fname) ext = ext2 + ext out_file = op.abspath('{}_bin{}'.format(fname, ext)) nii = nb.load(in_file) data = nii.get_data() data[data <= threshold] = 0 data[data > 0] = 1 hdr = nii.header.copy() hdr.set_data_dtype(np.uint8) nb.Nifti1Image(data.astype(np.uint8), nii.affine, hdr).to_filename( out_file) return out_file
def image_gradient(in_file, snr, out_file=None): """Computes the magnitude gradient of an image using numpy""" import os.path as op import numpy as np import nibabel as nb from scipy.ndimage import gaussian_gradient_magnitude as gradient if out_file is None: fname, ext = op.splitext(op.basename(in_file)) if ext == '.gz': fname, ext2 = op.splitext(fname) ext = ext2 + ext out_file = op.abspath('{}_grad{}'.format(fname, ext)) imnii = nb.load(in_file) data = imnii.get_data().astype(np.float32) # pylint: disable=no-member datamax = np.percentile(data.reshape(-1), 99.5) data *= 100 / datamax grad = gradient(data, 3.0) gradmax = np.percentile(grad.reshape(-1), 99.5) grad *= 100. grad /= gradmax nb.Nifti1Image(grad, imnii.get_affine(), imnii.get_header()).to_filename(out_file) return out_file
def thresh_image(in_file, thres=0.5, out_file=None): """Thresholds an image""" import os.path as op import nibabel as nb if out_file is None: fname, ext = op.splitext(op.basename(in_file)) if ext == '.gz': fname, ext2 = op.splitext(fname) ext = ext2 + ext out_file = op.abspath('{}_thresh{}'.format(fname, ext)) im = nb.load(in_file) data = im.get_data() data[data < thres] = 0 data[data > 0] = 1 nb.Nifti1Image( data, im.get_affine(), im.get_header()).to_filename(out_file) return out_file
def __init__(self, func, mask, seg=None, tr=None, title=None, figsize=DINA4_LANDSCAPE): func_nii = nb.load(func) self.func_data = func_nii.get_data() self.mask_data = nb.load(mask).get_data() self.ntsteps = self.func_data.shape[-1] self.tr = tr if tr is None: self.tr = func_nii.get_header().get_zooms()[-1] if seg is None: self.seg_data = 2 * self.mask_data else: self.seg_data = nb.load(seg).get_data() self.fig = plt.figure(figsize=figsize) if title is not None: self.fig.suptitle(title, fontsize=20) self.confounds = [] self.spikes = []
def _get_limits(nifti_file, only_plot_noise=False): from builtins import bytes, str # pylint: disable=W0622 if isinstance(nifti_file, (str, bytes)): nii = nb.as_closest_canonical(nb.load(nifti_file)) data = nii.get_data() else: data = nifti_file data_mask = np.logical_not(np.isnan(data)) if only_plot_noise: data_mask = np.logical_and(data_mask, data != 0) vmin = np.percentile(data[data_mask], 0) vmax = np.percentile(data[data_mask], 61) else: vmin = np.percentile(data[data_mask], 0.5) vmax = np.percentile(data[data_mask], 99.5) return vmin, vmax
def symmetrise_axial(self, filename_in, filename_out=None, axis='x', plane_intercept=10, side_to_copy='below', keep_in_data_dimensions=True): pfi_in, pfi_out = get_pfi_in_pfi_out(filename_in, filename_out, self.pfo_in, self.pfo_out) im_segm = nib.load(pfi_in) data_labels = im_segm.get_data() data_symmetrised = symmetrise_data(data_labels, axis=axis, plane_intercept=plane_intercept, side_to_copy=side_to_copy, keep_in_data_dimensions=keep_in_data_dimensions) im_symmetrised = set_new_data(im_segm, data_symmetrised) nib.save(im_symmetrised, pfi_out) print('Symmetrised axis {0}, plane_intercept {1}, image of {2} saved in {3}.'.format(axis, plane_intercept, pfi_in, pfi_out)) return pfi_out
def modify_affine(self, filename_in, affine_in, filename_out, q_form=True, s_form=True, multiplication_side='left'): pfi_in, pfi_out = get_pfi_in_pfi_out(filename_in, filename_out, self.pfo_in, self.pfo_out) if isinstance(affine_in, str): if affine_in.endswith('.txt'): aff = np.loadtxt(connect_path_tail_head(self.pfo_in, affine_in)) else: aff = np.load(connect_path_tail_head(self.pfo_in, affine_in)) elif isinstance(affine_in, np.ndarray): aff = affine_in else: raise IOError('parameter affine_in can be path to an affine matrix .txt or .npy or the numpy array' 'corresponding to the affine transformation.') im = nib.load(pfi_in) new_im = modify_affine_transformation(im, aff, q_form=q_form, s_form=s_form, multiplication_side=multiplication_side) nib.save(new_im, pfi_out)
def apply_small_rotation(self, filename_in, filename_out, angle=np.pi/6, principal_axis='pitch'): if isinstance(angle, list): assert isinstance(principal_axis, list) assert len(principal_axis) == len(angle) new_aff = np.identity(4) for pa, an in zip(principal_axis, angle): aff = get_small_orthogonal_rotation(theta=an, principal_axis=pa) new_aff = new_aff.dot(aff) else: new_aff = get_small_orthogonal_rotation(theta=angle, principal_axis=principal_axis) pfi_in, pfi_out = get_pfi_in_pfi_out(filename_in, filename_out, self.pfo_in, self.pfo_out) im = nib.load(pfi_in) new_im = modify_affine_transformation(im_input=im, new_aff=new_aff, q_form=True, s_form=True, multiplication_side='right') nib.save(new_im, pfi_out)
def modify_translational_part(self, filename_in, filename_out, new_translation): pfi_in, pfi_out = get_pfi_in_pfi_out(filename_in, filename_out, self.pfo_in, self.pfo_out) im = nib.load(pfi_in) if isinstance(new_translation, str): if new_translation.endswith('.txt'): tr = np.loadtxt(connect_path_tail_head(self.pfo_in, new_translation)) else: tr = np.load(connect_path_tail_head(self.pfo_in, new_translation)) elif isinstance(new_translation, np.ndarray): tr = new_translation elif isinstance(new_translation, list): tr = np.array(new_translation) else: raise IOError('parameter new_translation can be path to an affine matrix .txt or .npy or the numpy array' 'corresponding to the new intended translational part.') new_im = replace_translational_part(im, tr) nib.save(new_im, pfi_out)
def values_below_labels(self, segmentation_filename, anatomy_filename, labels=None): """ :param segmentation_filename: :param anatomy_filename: :param labels: :return: pandas series with label names and corresponding vectors of labels values """ pfi_anat = connect_path_tail_head(self.pfo_in, anatomy_filename) pfi_segm = connect_path_tail_head(self.pfo_in, segmentation_filename) assert os.path.exists(pfi_anat) im_anat = nib.load(pfi_anat) assert os.path.exists(pfi_segm) im_segm = nib.load(pfi_segm) labels_list, labels_names = labels_query(labels, segmentation_array=im_segm.get_data()) labels_values = get_values_below_labels(im_segm, im_anat, labels_list) return pa.Series(labels_values, index=labels_names)
def global_dist(self, segm_1_filename, segm_2_filename, where_to_save=None, global_metrics=(global_outline_error, global_dice_score)): pfi_segm1 = connect_path_tail_head(self.pfo_in, segm_1_filename) pfi_segm2 = connect_path_tail_head(self.pfo_in, segm_2_filename) assert os.path.exists(pfi_segm1), pfi_segm1 assert os.path.exists(pfi_segm2), pfi_segm2 if self.verbose > 0: print("\nGlobal distances between segmentations: \n -> {0} \n -> {1} " "\nComputations started!".format(pfi_segm1, pfi_segm2)) im_segm1 = nib.load(pfi_segm1) im_segm2 = nib.load(pfi_segm2) se_global_distances = pa.Series(np.array([d(im_segm1, im_segm2) for d in global_metrics]), index=[d.__name__ for d in global_metrics]) if where_to_save is not None: where_to_save = connect_path_tail_head(self.pfo_out, where_to_save) se_global_distances.to_pickle(where_to_save) return se_global_distances
def relabel(self, pfi_input, pfi_output=None, list_old_labels=(), list_new_labels=()): """ Masks of :func:`labels_manager.tools.manipulations.relabeller.relabeller` using filename """ pfi_in, pfi_out = get_pfi_in_pfi_out(pfi_input, pfi_output, self.pfo_in, self.pfo_out) im_labels = nib.load(pfi_in) data_labels = im_labels.get_data() data_relabelled = relabeller(data_labels, list_old_labels=list_old_labels, list_new_labels=list_new_labels) im_relabelled = set_new_data(im_labels, data_relabelled) nib.save(im_relabelled, pfi_out) print('Relabelled image {0} saved in {1}.'.format(pfi_in, pfi_out)) return pfi_out
def toJSON(stats, seg_file, structure_map): """Combine stats files to a single JSON file""" import json import os import nibabel as nb import numpy as np img = nb.load(seg_file) data = img.get_data() voxel2vol = np.prod(img.header.get_zooms()) idx = np.unique(data) reverse_map = {k:v for v, k in structure_map} out_dict = dict(zip([reverse_map[val] for val in idx], np.bincount(data.flatten())[idx])) for key in out_dict.keys(): out_dict[key] = [out_dict[key], voxel2vol * out_dict[key]] mapper = dict([(0, 'csf'), (1, 'gray'), (2, 'white')]) out_dict.update(**{mapper[idx]: val for idx, val in enumerate(stats)}) out_file = 'segstats.json' with open(out_file, 'wt') as fp: json.dump(out_dict, fp, sort_keys=True, indent=4, separators=(',', ': ')) return os.path.abspath(out_file)
def load_inference(base_path='./data/Test/Test_Subject', nii_index=0): """Load nii data, whose name is, for example, 'Test_Subject01.nii'. Arguments: nii_index: counts from 0. """ filename = base_path + str(nii_index + 1).zfill(2) + '.nii' xs = nib.load(filename).get_data() # Crop black region to reduce nii volumes. dummy_ys = np.zeros_like(xs) xs, *_ = _banish_darkness(xs, dummy_ys) # Normalize images. local_max = np.max(xs, axis=(1, 2), keepdims=True) local_min = np.min(xs, axis=(1, 2), keepdims=True) xs = (xs - local_min) / (local_max - local_min) return xs[None, ..., None]
def select_training_voxels(input_masks, threshold=2, datatype=np.float32): """ Select voxels for training based on a intensity threshold Inputs: - input_masks: list containing all subject image paths for a single modality - threshold: minimum threshold to apply (after normalizing images with 0 mean and 1 std) Output: - rois: list where each element contains the subject binary mask for selected voxels [len(x), len(y), len(z)] """ # load images and normalize their intensities images = [load_nii(image_name).get_data() for image_name in input_masks] images_norm = [(im.astype(dtype=datatype) - im[np.nonzero(im)].mean()) / im[np.nonzero(im)].std() for im in images] # select voxels with intensity higher than threshold rois = [image > threshold for image in images_norm] return rois
def select_voxels_from_previous_model(model, train_x_data, options): """ Select training voxels from image segmentation masks """ # get_scan names and number of modalities used scans = train_x_data.keys() modalities = train_x_data[scans[0]].keys() # select voxels for training. Discard CSF and darker WM in FLAIR. # flair_scans = [train_x_data[s]['FLAIR'] for s in scans] # selected_voxels = select_training_voxels(flair_scans, options['min_th']) # evaluate training scans using the learned model and extract voxels with probability higher than 0.5 seg_mask = [test_scan(model, dict(train_x_data.items()[s:s+1]), options, save_nifti = False) > 0.5 for s in range(len(scans))] # check candidate segmentations: # if no voxels have been selected, return candidate voxels on FLAIR modality > 2 flair_scans = [train_x_data[s]['FLAIR'] for s in scans] images = [load_nii(name).get_data() for name in flair_scans] images_norm = [(im.astype(dtype=np.float32) - im[np.nonzero(im)].mean()) / im[np.nonzero(im)].std() for im in images] seg_mask = [im > 2 if np.sum(seg) == 0 else seg for im, seg in zip(images_norm, seg_mask)] return seg_mask
def default_file_reader(x): def pil_loader(path): return Image.open(path).convert('RGB') def npy_loader(path): return np.load(path) def nifti_loader(path): return nibabel.load(path).get_data() if isinstance(x, str): if x.endswith('.npy'): x = npy_loader(x) elif x.endsiwth('.nii.gz'): x = nifti_loader(x) else: try: x = pil_loader(x) except: raise ValueError('File Format is not supported') #else: #raise ValueError('x should be string, but got %s' % type(x)) return x
def create_3D_distance_matrix(vox_ijk, epi_fname): """Compute distance between voxels in the volume. Parameters ---------- vox_ijk : n x 3 array Indices of voxels included in the ROI. epi_fname : file path Path to image defining the volume space. Returns ------- dmat : array Dense square distance matrix. """ aff = nib.load(epi_fname).affine vox_ras = nib.affines.apply_affine(aff, vox_ijk) dmat = squareform(pdist(vox_ras)) return dmat
def extract_data(exp, subj, roi_info): """Extract timeseries data from each ROI.""" ts_data = {roi: [] for roi in roi_info} n_runs = dict(dots=12, sticks=12, rest=8)[exp] anal_dir = PROJECT["analysis_dir"] ts_temp = op.join(anal_dir, exp, subj, "reg", "epi", "unsmoothed", "run_{}", "res4d_xfm.nii.gz") # Extract ROI data from each run, loading images only once for run in range(1, n_runs + 1): run_data = nib.load(ts_temp.format(run)).get_data() for roi, info in roi_info.iteritems(): roi_ts = extract_from_volume(run_data, info["vox_ijk"]) ts_data[roi].append(roi_ts) # Combine across runs ts_data = {roi: np.hstack(ts_data[roi]).T for roi in roi_info} for roi in roi_info: assert ts_data[roi].shape[1] == len(roi_info[roi]["vox_ijk"]) return ts_data
def create_histo(folder_path, min_val, max_val, num_bins=None): if num_bins is None: num_bins = int(max_val - min_val) print num_bins histogram = {} for f in os.listdir(folder_path): im = nib.load(os.path.join(folder_path, f)) data = im.get_data() hist, bins = np.histogram(data, num_bins, range=(min_val, max_val)) count = 0 for el in bins[0:-1]: try: histogram[el] += hist[count] except: histogram[el] = hist[count] count += 1 return histogram.items()
def reg_dti_pngs(dti, loc, atlas, outdir): """ outdir: directory where output png file is saved fname: name of output file WITHOUT FULL PATH. Path provided in outdir. """ atlas_data = nb.load(atlas).get_data() dti_data = nb.load(dti).get_data() b0_data = dti_data[:,:,:,loc] cmap1 = LinearSegmentedColormap.from_list('mycmap1', ['black', 'magenta']) cmap2 = LinearSegmentedColormap.from_list('mycmap2', ['black', 'green']) fig = plot_overlays(atlas_data, b0_data, (cmap1, cmap2)) # name and save the file fname = os.path.split(dti)[1].split(".")[0] + '.png' plt.savefig(outdir + '/' + fname, format='png')
def tensor2fa(tensors, tensor_name, dti, derivdir, qcdir): ''' outdir: location of output directory. fname: name of output fa map file. default is none (name created based on input file) ''' dti_data = nb.load(dti) affine = dti_data.get_affine() dti_data = dti_data.get_data() # create FA map FA = fractional_anisotropy(tensors.evals) FA[np.isnan(FA)] = 0 # generate the RGB FA map FA = np.clip(FA, 0, 1) RGB = color_fa(FA, tensors.evecs) fname = os.path.split(tensor_name)[1].split(".")[0] + '_fa_rgb.nii.gz' fa = nb.Nifti1Image(np.array(255 * RGB, 'uint8'), affine) nb.save(fa, derivdir + fname) fa_pngs(fa, fname, qcdir)
def resample(self, base, ingested, template): """ Resamples the image such that images which have already been aligned in real coordinates also overlap in the image/voxel space. **Positional Arguments** base: - Image to be aligned ingested: - Name of image after alignment template: - Image that is the target of the alignment """ # Loads images template_im = nb.load(template) base_im = nb.load(base) # Aligns images target_im = nl.resample_img(base_im, target_affine=template_im.get_affine(), target_shape=template_im.get_data().shape, interpolation="nearest") # Saves new image nb.save(target_im, ingested) pass
def _3d_in_file(in_file): ''' if self.inputs.in_file is 3d, return it. if 4d, pick an arbitrary volume and return that. if in_file is a list of files, return an arbitrary file from the list, and an arbitrary volume from that file ''' in_file = filemanip.filename_to_list(in_file)[0] try: in_file = nb.load(in_file) except AttributeError: in_file = in_file if in_file.get_data().ndim == 3: return in_file return nlimage.index_img(in_file, 0)
def test_ROIsPlot(oasis_dir): """ the BET report capable test """ import nibabel as nb import numpy as np labels = nb.load(os.path.join(oasis_dir, 'T_template0_glm_4labelsJointFusion.nii.gz')) data = labels.get_data() out_files = [] ldata = np.zeros_like(data) for i, l in enumerate([1, 3, 4, 2]): ldata[data == l] = 1 out_files.append(os.path.abspath('label%d.nii.gz' % i)) lfile = nb.Nifti1Image(ldata, labels.affine, labels.header) lfile.to_filename(out_files[-1]) roi_rpt = ROIsPlot( generate_report=True, in_file=os.path.join(oasis_dir, 'T_template0.nii.gz'), in_mask=out_files[-1], in_rois=out_files[:-1], colors=['g', 'y'] ) _smoke_test_report(roi_rpt, 'testROIsPlot.svg')
def _copyxform(ref_image, out_image, message=None): # Read in reference and output resampled = nb.load(out_image) orig = nb.load(ref_image) if not np.allclose(orig.affine, resampled.affine): LOG.debug( 'Affines of input and reference images do not match, ' 'FMRIPREP will set the reference image headers. ' 'Please, check that the x-form matrices of the input dataset' 'are correct and manually verify the alignment of results.') # Copy xform infos qform, qform_code = orig.header.get_qform(coded=True) sform, sform_code = orig.header.get_sform(coded=True) header = resampled.header.copy() header.set_qform(qform, int(qform_code)) header.set_sform(sform, int(sform_code)) header['descrip'] = 'xform matrices modified by %s.' % (message or '(unknown)') newimg = resampled.__class__(resampled.get_data(), orig.affine, header) newimg.to_filename(out_image)
def getSlices(self,paths): image,truth = paths image = nib.load(image).get_data(); truth = nib.load(truth).get_data() slicesWithValues = [unique(s) for s in where(truth>0)] sliceAxis = argmin([len(s) for s in slicesWithValues]) slicesWithValues = slicesWithValues[sliceAxis] slc = repeat(-1,3); slc[sliceAxis] = slicesWithValues[0] if not self.padding is None: image, truth = [padImage(im,self.padding) for im in (image[slc][0],truth[slc][0])] else: image, truth = (image[slc][0],truth[slc][0]) return (image,truth)
def preprocess(inputfile, outputfile, order=0, df=None, input_key=None, output_key=None): img = nib.load(inputfile) data = img.get_data() affine = img.affine zoom = img.header.get_zooms()[:3] data, affine = reslice(data, affine, zoom, (1., 1., 1.), order) data = np.squeeze(data) data = np.pad(data, [(0, 256 - len_) for len_ in data.shape], "constant") if order == 0: if df is not None: tmp = np.zeros_like(data) for target, source in zip(df[output_key], df[input_key]): tmp[np.where(data == source)] = target data = tmp data = np.int32(data) assert data.ndim == 3, data.ndim else: data_sub = data - gaussian_filter(data, sigma=1) img = sitk.GetImageFromArray(np.copy(data_sub)) img = sitk.AdaptiveHistogramEqualization(img) data_clahe = sitk.GetArrayFromImage(img)[:, :, :, None] data = np.concatenate((data_clahe, data[:, :, :, None]), 3) data = (data - np.mean(data, (0, 1, 2))) / np.std(data, (0, 1, 2)) assert data.ndim == 4, data.ndim assert np.allclose(np.mean(data, (0, 1, 2)), 0.), np.mean(data, (0, 1, 2)) assert np.allclose(np.std(data, (0, 1, 2)), 1.), np.std(data, (0, 1, 2)) data = np.float32(data) img = nib.Nifti1Image(data, affine) nib.save(img, outputfile)