我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 numpy.s_[...]（索引表达式对象，通过方括号而非函数调用来使用）。
def resize_image(image, target_shape, pad_value=0):
    """Center-pad and/or center-crop ``image`` so it has ``target_shape``.

    Every axis is treated independently. An axis that is too short is padded
    with ``pad_value`` (the extra element of an odd difference goes in front);
    an axis that is too long is cropped (the extra element of an odd
    difference is removed from the end).

    Args:
        image: array to resize.
        target_shape: list or tuple with one target length per axis.
        pad_value: constant used to fill padded positions.

    Returns:
        The padded and cropped array.
    """
    assert isinstance(target_shape, list) or isinstance(target_shape, tuple)
    pad_widths, crop_slices = [], []
    shape_difference = (np.asarray(target_shape, dtype=int)
                        - np.asarray(image.shape, dtype=int))
    for diff in shape_difference:
        # Plain Python ints keep the halving below exact on every interpreter.
        diff = int(diff)
        if diff < 0:
            excess = -diff
            front = excess // 2
            crop_slices.append(slice(front, -(excess - front)))
            pad_widths.append((0, 0))
        else:
            crop_slices.append(slice(None))
            pad_widths.append(((diff + 1) // 2, diff // 2))
    output = np.pad(image, tuple(pad_widths), 'constant',
                    constant_values=(pad_value, pad_value))
    # Multi-axis indexing needs a tuple; a list of slices is not accepted by
    # current NumPy releases.
    return output[tuple(crop_slices)]
def test_prepend_not_one(self):
    """Check that ``self.assign`` rejects values with extra leading
    dimensions that are not all of length one."""
    assign = self.assign
    target = np.zeros(5)

    # Too large and not only ones.
    assert_raises(ValueError, assign, target, np.s_[...], np.ones((2, 1)))

    fancy_cases = (
        (np.s_[[1, 2, 3],], np.ones((2, 1))),
        (np.s_[[[1], [2]],], np.ones((2, 2, 1))),
    )
    with warnings.catch_warnings():
        # Will be a ValueError as well.
        warnings.simplefilter("error", DeprecationWarning)
        for index, value in fancy_cases:
            assert_raises(DeprecationWarning, assign, target, index, value)
def _get_data(path, preprocess):
    """Load the ``'gestures'`` segments stored in the MATLAB file ``path``.

    Each segment is cast to float32, rows ``7:192:8`` are removed and the
    result is transposed. When ``preprocess`` is truthy it is applied to every
    segment (with ``PREPROCESS_KARGS``) through ``Context.parallel``.

    Returns:
        list of per-segment arrays.
    """
    gestures = sio.loadmat(path)['gestures']
    dropped_rows = np.s_[7:192:8]
    segments = []
    for segment in gestures.flat:
        trimmed = np.delete(segment.astype(np.float32), dropped_rows, 0)
        segments.append(np.transpose(trimmed))
    if preprocess:
        jobs = (jb.delayed(preprocess)(segment, **PREPROCESS_KARGS)
                for segment in segments)
        segments = list(Context.parallel(jobs))
    return segments


# Earlier variant, kept for reference:
# @cached
# def _get_data(path, bandstop, cut, downsample):
#     data = sio.loadmat(path)['gestures']
#     data = [np.transpose(np.delete(segment.astype(np.float32), np.s_[7:192:8], 0))
#             for segment in data.flat]
#     if bandstop:
#         data = list(Context.parallel(jb.delayed(get_bandstop)(segment) for segment in data))
#     if cut is not None:
#         data = list(Context.parallel(jb.delayed(cut)(segment, framerate=FRAMERATE) for segment in data))
#     if downsample > 1:
#         data = [segment[::downsample].copy() for segment in data]
#     return data
def __getitem__(self, *args):
    """epochs, units

    Split an indexing key into ``(epochslice, unitslice)``. A bare int or
    ``EpochArray`` selects epochs only; a sequence of up to two entries is
    read as ``[epochs, units]``; anything without a length is used as the
    epoch selector as-is.
    """
    # by default, keep all units
    unitslice = slice(None, None, None)
    if isinstance(*args, int) or isinstance(*args, EpochArray):
        return args[0], unitslice

    key = np.s_[args][0]
    try:
        n_parts = len(key)
        if n_parts > 2:
            raise IndexError("only [epochs, units] slicing is supported at this time!")
        if n_parts == 2:
            epochslice, unitslice = key
        else:
            epochslice = key[0]
    except TypeError:
        # only epoch to slice:
        epochslice = key
    return epochslice, unitslice
def __getitem__(self, *args):
    """epochs, signals

    Split an indexing key into ``(epochslice, signalslice)``. A bare int or
    ``core.EpochArray`` selects epochs only; a sequence of up to two entries
    is read as ``[epochs, signals]``; anything without a length is used as
    the epoch selector as-is.
    """
    # by default, keep all signals
    signalslice = slice(None, None, None)
    if isinstance(*args, int) or isinstance(*args, core.EpochArray):
        return args[0], signalslice

    key = np.s_[args][0]
    try:
        n_parts = len(key)
        if n_parts > 2:
            raise IndexError("only [epochs, signal] slicing is supported at this time!")
        if n_parts == 2:
            epochslice, signalslice = key
        else:
            epochslice = key[0]
    except TypeError:
        # only epoch to slice:
        epochslice = key
    return epochslice, signalslice
def get_edge_mask(self, subdomain=None):
    '''Get edges which are fully in subdomain.

    Returns a select-everything slice when ``subdomain`` is None, otherwise
    a boolean array obtained by reducing the vertex flags over the first
    axis of ``self.idx_hierarchy``.
    '''
    if subdomain is None:
        # http://stackoverflow.com/a/42392791/353337
        return slice(None)

    if subdomain not in self.subdomains:
        self._mark_vertices(subdomain)

    # An edge is inside if all its nodes are in.
    vertex_flags = self.subdomains[subdomain]['vertices']
    mask = numpy.all(vertex_flags[self.idx_hierarchy], axis=(0,))

    if subdomain.is_boundary_only:
        # Filter for boundary
        mask = numpy.logical_and(mask, self.is_boundary_edge)
    return mask
def get_face_mask(self, subdomain=None):
    '''Get faces which are fully in subdomain.

    ``subdomain`` defaults to None, matching the other mask getters; in that
    case a select-everything slice is returned. Otherwise the result is a
    boolean array obtained by reducing the vertex flags over every axis of
    ``self.idx_hierarchy`` except the last two.
    '''
    if subdomain is None:
        # http://stackoverflow.com/a/42392791/353337
        return numpy.s_[:]

    if subdomain not in self.subdomains:
        self._mark_vertices(subdomain)

    # A face is inside if all its edges are in.
    # An edge is inside if all its nodes are in.
    is_in = self.subdomains[subdomain]['vertices'][self.idx_hierarchy]
    # Take `all()` over all axes except the last two (face_ids, cell_ids).
    is_inside = numpy.all(is_in, axis=tuple(range(is_in.ndim - 2)))

    if subdomain.is_boundary_only:
        # Filter for boundary
        is_inside = numpy.logical_and(is_inside, self.is_boundary_face)

    return is_inside
def get_cell_mask(self, subdomain=None):
    '''Get cells which are fully in subdomain.

    Returns a select-everything slice when ``subdomain`` is None, an empty
    integer index array for boundary-only subdomains, and otherwise a
    boolean array obtained by reducing the vertex flags over every axis of
    ``self.idx_hierarchy`` except the last.
    '''
    if subdomain is None:
        # http://stackoverflow.com/a/42392791/353337
        return numpy.s_[:]

    if subdomain.is_boundary_only:
        # There are no boundary cells. The empty result must have an integer
        # dtype: a float array cannot be used as an index.
        return numpy.array([], dtype=int)

    if subdomain not in self.subdomains:
        self._mark_vertices(subdomain)

    is_in = self.subdomains[subdomain]['vertices'][self.idx_hierarchy]
    # Take `all()` over all axes except the last one (cell_ids).
    return numpy.all(is_in, axis=tuple(range(is_in.ndim - 1)))
def get_features_old(self, index):
    """Read the features stored at ``index`` into a reusable buffer.

    The buffer ``self.features_array`` is created on first use (float32,
    shape ``(2048, 14, 14)`` in ``'att'`` mode and ``(2048,)`` in ``'noatt'``
    mode) and then filled in place through
    ``self.dataset_features.read_direct``. The same buffer object is returned
    on every call.
    """
    mode = self.opt['mode']

    if not hasattr(self, 'features_array'):
        if mode == 'att':
            self.features_array = np.zeros((2048, 14, 14), dtype='f')
        elif mode == 'noatt':
            self.features_array = np.zeros((2048), dtype='f')

    if mode == 'att':
        source_sel = np.s_[index, :2048, :14, :14]
        dest_sel = np.s_[:2048, :14, :14]
    elif mode == 'noatt':
        source_sel = np.s_[index, :2048]
        dest_sel = np.s_[:2048]
    else:
        # Unknown mode: nothing is read.
        return self.features_array

    self.dataset_features.read_direct(self.features_array, source_sel, dest_sel)
    return self.features_array
def split_array(gle, gre, shape, psize):
    """ Split array into px*py*pz subarrays.

    ``gle``/``gre`` are the left/right edges of the full domain, ``shape``
    its cell counts and ``psize`` the number of pieces per axis. Returns four
    parallel lists: left edges, right edges, integer shapes and index slices
    of every piece, ordered with the last axis varying fastest.
    """
    n_d = np.array(shape, dtype=np.int64)
    dds = (gre - gle) / shape
    one_step = np.ones(3, dtype=np.int64)

    left_edges, right_edges, shapes, slices = [], [], [], []
    for i, j, k in np.ndindex(psize[0], psize[1], psize[2]):
        piece = np.array((i, j, k), dtype=np.int64)
        lei = n_d * piece // psize
        rei = n_d * (piece + one_step) // psize
        left_edges.append(gle + lei * dds)
        right_edges.append(gle + rei * dds)
        shapes.append(rei - lei)
        slices.append(tuple(slice(lo, hi) for lo, hi in zip(lei, rei)))

    return left_edges, right_edges, shapes, slices
def voc2007_classification_generator2(which, batch_size, input_size,
                                      outer_input_size,
                                      shuffle=True,  # seed=0,
                                      color_transform=None,
                                      random_mirror=False):
    """Yield ``(images, labels)`` batches loaded from ``'<which>.h5'``.

    With ``shuffle`` the generator never ends: every batch is drawn with
    replacement from an unseeded ``RandomState`` and, if ``random_mirror`` is
    set, flipped along axis 2 with probability one half. Without ``shuffle``
    the data is walked once in order in full batches; a trailing partial
    batch is not yielded.

    ``input_size``, ``outer_input_size`` and ``color_transform`` are accepted
    but not used here.
    """
    path = os.path.expandvars('$VOC2007_DIR/ImageSets/Main')  # not used below
    assert which in ['test', 'val']
    imgs, C = dd.io.load('{}.h5'.format(which), ['/data', '/labels'])

    if not shuffle:
        n_full_batches = len(imgs) // batch_size
        for start in range(0, n_full_batches * batch_size, batch_size):
            window = np.s_[start:start + batch_size]
            yield imgs[window], C[window]
        return

    rs = np.random.RandomState()
    while True:
        picks = rs.randint(len(imgs), size=batch_size)
        batch_imgs, batch_labels = imgs[picks], C[picks]
        if random_mirror and rs.randint(2) == 1:
            batch_imgs = batch_imgs[:, :, ::-1]
        yield batch_imgs, batch_labels
def flush(self):
    """Decode the Bayer block at the end of the buffer into ``self.array``.

    After flushing the parent stream, the last 6404096 bytes of the buffer
    are taken, checked for the ``BRCM`` marker, stripped of a 32768-byte
    header and unpacked from 10-bit packed samples into uint16 values.
    The samples are then spread over a 3-plane array according to their
    position in the 2x2 mosaic; positions not covered by a plane stay zero.

    Raises:
        PiCameraValueError: if the marker is not found where expected.
    """
    super(PiBayerArray, self).flush()
    self._demo = None
    # 6404096 == 32768 header bytes + 1952 * 3264 data bytes
    data = self.getvalue()[-6404096:]
    if data[:4] != b'BRCM':
        raise PiCameraValueError('Unable to locate Bayer data at end of buffer')
    # Strip header
    data = data[32768:]
    # Reshape into 2D pixel values
    data = np.frombuffer(data, dtype=np.uint8).\
        reshape((1952, 3264))[:1944, :3240]
    # Unpack 10-bit values; every 5 bytes contains the high 8-bits of 4
    # values followed by the low 2-bits of 4 values packed into the fifth
    # byte
    data = data.astype(np.uint16) << 2
    for byte in range(4):
        data[:, byte::5] |= ((data[:, 4::5] >> ((4 - byte) * 2)) & 3)
    # Drop every fifth column (the packed low bits): 3240 -> 2592 columns
    data = np.delete(data, np.s_[4::5], 1)
    # XXX Should test camera's vflip and hflip settings here and adjust
    self.array = np.zeros(data.shape + (3,), dtype=data.dtype)
    self.array[1::2, 0::2, 0] = data[1::2, 0::2] # Red
    self.array[0::2, 0::2, 1] = data[0::2, 0::2] # Green
    self.array[1::2, 1::2, 1] = data[1::2, 1::2] # Green
    self.array[0::2, 1::2, 2] = data[0::2, 1::2] # Blue
def update_display(self):
    """Show the selected plane of ``self.dat['count_rate_map']``.

    The ``'plane'`` setting chooses which axis is held fixed (``'xy'`` -> 0,
    ``'yz'`` -> 2, ``'xz'`` -> 1) and the ``'index'`` setting chooses the
    position along it. The index setting's range is updated to that axis
    length, the 2-D cut is pushed to the image view, and the info label is
    set to the coordinate of the fixed axis.
    """
    ii = self.settings['index']
    plane = self.settings['plane']

    if plane == 'xy':
        fixed_axis = 0
    elif plane == 'yz':
        fixed_axis = 2
    elif plane == 'xz':
        fixed_axis = 1

    count_map = self.dat['count_rate_map']
    index_max = count_map.shape[fixed_axis]
    arr_slice = tuple(ii if axis == fixed_axis else slice(None)
                      for axis in range(3))

    self.settings.index.change_min_max(0, index_max)
    self.imview.setImage(count_map[arr_slice],
                         autoLevels=self.settings['auto_level'])

    other_ax = {'xy': 'z', 'yz': 'x', 'xz': 'y'}[plane]
    coordinate = self.dat[other_ax + '_array'][ii]
    self.info_label.setText(
        "{} plane {}={} um (index={})".format(plane, other_ax, coordinate, ii))
def compute1didx(extar, slc):
    """Build the index expression of a 1-D line through a 1-, 2- or 3-D grid.

    Args:
        extar: sequence with one coordinate array per axis.
        slc: flat sequence ``(x_lo, x_hi[, y_lo, y_hi[, z_lo, z_hi]])`` of
            coordinate bounds. Each bound is snapped to the nearest grid
            point of its axis.

    Returns:
        An index expression in which the axes whose two bounds snap to the
        same grid point are fixed and the remaining axis is sliced. If no
        such combination is found (or the grid is 1-D) the x range
        ``x1:x2`` is returned.
    """
    def nearest(axis, value):
        return np.argmin(np.abs(extar[axis] - value))

    x1, x2 = nearest(0, slc[0]), nearest(0, slc[1])
    ndim = len(extar)

    if ndim == 2:
        y1, y2 = nearest(1, slc[2]), nearest(1, slc[3])
        if x1 == x2:
            return np.s_[x1, y1:y2]
        if y1 == y2:
            return np.s_[x1:x2, y1]
    elif ndim == 3:
        # The y indices are needed here as well; they were previously only
        # computed for 2-D input.
        y1, y2 = nearest(1, slc[2]), nearest(1, slc[3])
        z1, z2 = nearest(2, slc[4]), nearest(2, slc[5])
        if x1 == x2 and y1 == y2:
            return np.s_[x1, y1, z1:z2]
        if y1 == y2 and z1 == z2:
            return np.s_[x1:x2, y1, z1]
        if x1 == x2 and z1 == z2:
            return np.s_[x1, y1:y2, z1]

    return np.s_[x1:x2]
def get_HDF_cell_WSE(hf, cell_number, flow_area):
    """Return the water-surface time series of one 2D cell as a list.

    Args:
        hf: not used; see the review note below.
        cell_number: column of the 'Water Surface' dataset to read.
        flow_area: name of the 2D flow area group.

    Returns:
        list of floats, one per time step.
    """
    # NOTE(review): the ``hf`` argument is shadowed by the handle opened here
    # from the module-level ``hdf_filename``; confirm which one is intended.
    with h5py.File(hdf_filename, 'r') as hf:
        flow_areas = hf['Results']['Unsteady']['Output']['Output Blocks']\
            ['Base Output']['Unsteady Time Series']['2D Flow Areas']
        dataset = flow_areas[flow_area]['Water Surface']
        timesteps = dataset.shape[0]
        # 'Float64' is not a valid dtype string in current NumPy; use the
        # type object.
        data_list = np.zeros((timesteps,), dtype=np.float64)
        dataset.read_direct(data_list, np.s_[0:timesteps, cell_number],
                            np.s_[0:timesteps])
    return data_list.tolist()

# This will go through all of the 1D and 2D observed points listed in the two_dim_coords and one_dim_comp_paths txt files
# Without those two files, the program will not run. This function returns data dictionaries for each gage
def push(self, values: np.ndarray):
    """
    Push values to buffer. If buffer can't store all values a ValueError is raised

    Values are written from ``self.right_index`` up to the end of the
    storage; whatever does not fit there wraps around to the start.
    """
    n = len(values)
    if len(self) + n > self.size:
        raise ValueError("Too much data to push to RingBuffer")

    start = self.right_index
    head_len = min(start + n, self.size) - start    # part before the end
    wrapped_len = max(start + n - self.size, 0)     # part that wraps around

    with self.__data.get_lock():
        data = np.frombuffer(self.__data.get_obj(), dtype=np.complex64)
        data[start:start + head_len] = values[:head_len]
        data[:wrapped_len] = values[head_len:]
        self.right_index += n

    self.__length.value += n
def load_sym(self):
    """Load isotopic and elemental symbols and masses.

    Reads two whitespace-delimited tables from ``self.path_yldgen``:

    * ``sym_atomicnum.txt`` -> ``self.atomic_num`` and ``self.element_all``
    * ``species.txt`` (first row skipped) -> ``self.snii_sym``,
      ``self.snii_sym_mass`` and ``self.n_snii_sym``

    ``self.element`` holds the species names with trailing digits stripped,
    de-duplicated in order of first appearance, with entries 13 and 14
    removed; ``self.n_elements`` is its length.
    """
    # sep=r'\s+' replaces the deprecated delim_whitespace=True.
    el_sym = pd.read_csv(join(self.path_yldgen, 'sym_atomicnum.txt'),
                         sep=r'\s+', usecols=[0, 1],
                         names=['num', 'el'])
    self.atomic_num = np.array(el_sym['num'])
    self.element_all = np.array(el_sym['el'])

    snii_sym = pd.read_csv(join(self.path_yldgen, 'species.txt'),
                           sep=r'\s+', skiprows=1,
                           usecols=[1, 2], names=['name', 'mass'])
    self.snii_sym = np.array(snii_sym['name'])
    self.snii_sym_mass = np.array(snii_sym['mass'])
    self.n_snii_sym = len(self.snii_sym)

    stripped = [item.rstrip('0123456789') for item in self.snii_sym]
    u, indices = np.unique(stripped, return_index=True)
    # np.unique sorts alphabetically; restore order of first appearance.
    indices_s = np.argsort(indices)
    # NOTE(review): positions 13 and 14 are dropped unconditionally; which
    # entries these are depends on the contents of species.txt.
    self.element = np.delete(u[indices_s], np.s_[13, 14])
    self.n_elements = len(self.element)
def downsample_idx(N, N_max=100, axis=0, method='equidist'):
    """Return an index selecting at most ``N_max`` of ``N`` items.

    Args:
        N: number of available items.
        N_max: maximum number of items to keep.
        axis: accepted for interface compatibility; not used.
        method: ``'equidist'`` for roughly evenly spaced indices or
            ``'random'`` for a sorted random sample without replacement.

    Returns:
        ``slice(None)`` when ``N <= N_max``, otherwise an integer array of
        ``N_max`` indices.

    Raises:
        ValueError: if downsampling is needed and ``method`` is unknown.
    """
    if N <= N_max:
        return np.s_[:]

    if method == 'equidist':
        step = (N - 1) / N_max
        idx_cont = (np.arange(N_max) + 0.5) * step
        # add small slope to idx-cont, to avoid rounding neighbouring values
        # to the same integer. max absolute value added/subtracted is 1/10 of
        # the step size
        adjust = ((idx_cont * 2 / (N - 1)) - 1) * step / 10
        idx_cont += adjust
        return np.array(np.round(idx_cont), dtype=int)

    if method == 'random':
        return np.sort(np.random.choice(N, size=N_max, replace=False))

    raise ValueError("unknown downsampling method: {!r}".format(method))
def format_barcode_summary_h5_key(genome, region, read_type):
    """Build the key ``<genome>_<region>_<read_type>_barcode_reads``."""
    key_fields = (genome, region, read_type)
    return '%s_%s_%s_barcode_reads' % key_fields
def get_full_alignment_base_quality_scores(read):
    """
    Returns base quality scores for the full read alignment, inserting zeroes for deletions and removing
    inserted and soft-clipped bases. Therefore, only returns quality for truly aligned sequenced bases.

    Args:
        read (pysam.AlignedSegment): read to get quality scores for

    Returns:
        tuple: ``(start_pos, quality_scores)`` where ``start_pos`` is the total length of all
        operations other than insertions and soft clips, and ``quality_scores`` is the numpy
        array of adjusted quality scores
    """
    # Binary-mode np.fromstring is deprecated; np.frombuffer needs bytes.
    qual = read.qual
    if isinstance(qual, str):
        qual = qual.encode('ascii')
    quality_scores = np.frombuffer(qual, dtype=np.byte) - tk_constants.ILLUMINA_QUAL_OFFSET

    start_pos = 0
    for operation, length in read.cigar:
        operation = cr_constants.cigar_numeric_to_category_map[operation]
        if operation in ('I', 'S'):
            # Inserted / soft-clipped bases are dropped and do not advance
            # the position.
            quality_scores = np.delete(quality_scores, np.s_[start_pos:start_pos + length])
            continue
        if operation == 'D':
            quality_scores = np.insert(quality_scores, start_pos, [0] * length)
        start_pos += length

    return start_pos, quality_scores
def test_exists():
    """Check ``cv.exists`` for a Bbox query and for an equivalent slice
    query, before and after removing one chunk file from disk."""
    kept_chunk = '1_1_1/0-64_0-64_0-64'
    removed_chunk = '1_1_1/64-128_0-64_0-64'

    def check(query):
        delete_layer()
        cv, data = create_layer(size=(128, 64, 64, 1), offset=(0, 0, 0))

        results = cv.exists(query)
        assert len(results) == 2
        assert results[kept_chunk] == True
        assert results[removed_chunk] == True

        fpath = os.path.join(cv.layer_cloudpath, cv.key, '64-128_0-64_0-64')
        fpath = fpath.replace('file://', '') + '.gz'
        os.remove(fpath)

        results = cv.exists(query)
        assert len(results) == 2
        assert results[kept_chunk] == True
        assert results[removed_chunk] == False

    # Bbox version
    check(Bbox((0, 0, 0), (128, 64, 64)))

    # Slice version
    check(np.s_[0:128, :, :])
def padding3D(input, width_mode, pad_factor):
    """Zero-pad (and in 'match' mode also crop) the last three axes.

    Args:
        input: array with at least the three trailing axes to process.
        width_mode: one of

            * ``'multiple'`` - ``pad_factor`` is an int; each of the last
              three axes is padded at the end by ``dim % pad_factor``.
            * ``'fixed'`` - ``pad_factor`` is a list/tuple of pad widths
              handed to ``np.pad`` unchanged.
            * ``'match'`` - ``pad_factor`` is the target shape of the last
              three axes; shorter axes are padded at the end, longer axes
              are cropped at the end.
        pad_factor: see ``width_mode``.

    Returns:
        The padded array.

    Raises:
        ValueError: for an unknown ``width_mode``.
    """
    n_leading = len(input.shape[:-3])

    if width_mode == 'multiple':
        assert isinstance(pad_factor, int)
        # NOTE(review): padding by the remainder does not in general round
        # the axis up to a multiple of pad_factor (that would be
        # -dim % pad_factor) - confirm the intent before changing.
        added_shape = [(0, 0)] * n_leading
        added_shape += [(0, dim % pad_factor) for dim in input.shape[-3:]]
        return np.pad(input, tuple(added_shape), 'constant',
                      constant_values=(0, 0))

    if width_mode == 'fixed':
        assert isinstance(pad_factor, list) or isinstance(pad_factor, tuple)
        return np.pad(input, tuple(pad_factor), 'constant',
                      constant_values=(0, 0))

    if width_mode == 'match':
        assert isinstance(pad_factor, list) or isinstance(pad_factor, tuple)
        shape_difference = np.asarray(pad_factor) - np.asarray(input.shape[-3:])
        added_shape = [(0, 0)] * n_leading
        subs_shape = [np.s_[:]] * n_leading
        for diff in shape_difference:
            if diff < 0:
                subs_shape.append(np.s_[:diff])
                added_shape.append((0, 0))
            else:
                subs_shape.append(np.s_[:])
                added_shape.append((0, diff))
        output = np.pad(input, tuple(added_shape), 'constant',
                        constant_values=(0, 0))
        # Multi-axis indexing needs a tuple; a list of slices is not
        # accepted by current NumPy releases.
        return output[tuple(subs_shape)]

    raise ValueError("Padding3D error (src.helpers.preprocessing_utils): No existen padding method " + str(width_mode))
def get_sparse_matrix(self, chunk_size=1000):
    """Fetches the time-series data matrix in compressed sparse row (csr)
    format. Does this in chunks to prevent memory usage issues.

    Parameters
    ----------
    chunk_size: int
        the number of items to fetch at one time. Default is 1000.

    Returns
    -------
    scipy.sparse.csr_matrix
        csr matrix object containing sequences/time-series as rows, samples
        /time-points as columns
    """
    def read_chunked(name):
        # Copy one dataset into a fresh buffer, chunk_size items at a time.
        # An empty dataset simply results in no reads.
        dataset = self.h5_table[name]
        buffer = np.empty(dataset.shape)
        n_items = buffer.shape[0]
        for start in range(0, n_items, chunk_size):
            window = np.s_[start:min(start + chunk_size, n_items)]
            dataset.read_direct(buffer, window, window)
        return buffer

    data = read_chunked("timeseries/data")
    indices = read_chunked("timeseries/indices")
    indptr = read_chunked("timeseries/indptr")
    return csr_matrix((data, indices, indptr))
def updatebuffer(data_buffer, new_data):
    """
    Append "new_data" to "data_buffer" along axis 0 and drop the same number
    of rows from the front, so the result has as many rows as "data_buffer"
    """
    n_incoming = new_data.shape[0]
    combined = np.concatenate((data_buffer, new_data), axis=0)
    oldest_rows = slice(0, n_incoming)
    return np.delete(combined, oldest_rows, 0)
def getdata(self, seconds, flush=True):
    """
    Collect ``seconds`` worth of data and return it with shape
    [seconds * sampling_frequency, channels].

    Arguments:
        seconds: used to calculate the number of samples requested,
            n_samples = seconds * sampling_frequency
        flush: Boolean, if True call ``flushdata`` before collecting.
            Default = True
    """
    if flush:
        self.flushdata()

    # Size of data requested
    n_samples = int(round(seconds * self.params['sampling frequency']))
    n_columns = len(self.params['data format'])
    last_column = n_columns - 1

    # Rows start out as -1; a negative value in the last column of the first
    # row means the window has not been completely rewritten yet.
    data_buffer = -1 * np.ones((n_samples, n_columns))
    while data_buffer[0, last_column] < 0:
        new_data = self.getalldata()
        stale_rows = slice(0, new_data.shape[0])
        extended = np.concatenate((data_buffer, new_data), axis=0)
        data_buffer = np.delete(extended, stale_rows, 0)

    return data_buffer
def mnist(labels=range(10)):
    """Load MNIST through keras, binarized, flattened and filtered by label.

    Pixel values are scaled to [0, 1] and rounded, every image is flattened
    to one row, and only samples whose label is in ``labels`` are kept.

    Returns:
        ``x_train, y_train, x_test, y_test``
    """
    from keras.datasets import mnist
    (x_train, y_train), (x_test, y_test) = mnist.load_data()

    def binarize_and_flatten(images):
        images = (images.astype('float32') / 255.).round()
        return images.reshape((len(images), np.prod(images.shape[1:])))

    def select(x, y):
        # Put the label in column 0 so each row can be filtered as a unit.
        labelled = np.concatenate((y.reshape([len(y), 1]), x), axis=1)
        selected = np.array([row for row in labelled if row[0] in labels])
        pixels = np.delete(selected, 0, 1)
        targets = np.delete(selected, np.s_[1::], 1).flatten()
        return pixels, targets

    x_train = binarize_and_flatten(x_train)
    x_test = binarize_and_flatten(x_test)
    x_train, y_train = select(x_train, y_train)
    x_test, y_test = select(x_test, y_test)
    return x_train, y_train, x_test, y_test
def test_simple_broadcasting_errors(self):
    """Check that ``self.assign`` raises ValueError for values whose shape
    cannot be broadcast to the indexed part of a ``(5, 1)`` array."""
    assign = self.assign
    target = np.zeros((5, 1))

    bad_assignments = (
        (np.s_[...], (5, 2)),
        (np.s_[...], (5, 0)),
        (np.s_[:, [0]], (5, 2)),
        (np.s_[:, [0]], (5, 0)),
        (np.s_[[0], :], (2, 1)),
    )
    for index, value_shape in bad_assignments:
        assert_raises(ValueError, assign, target, index, np.zeros(value_shape))
def typical_x(self, dim):
    """Return ``self._x_opt(dim)`` shifted by a fixed, dimension-dependent
    offset vector built from ``self.rotate(..., inverse=True)``."""
    staircase = np.floor(np.arange(0, 3, 3. / dim))
    scales = np.logspace(0, 1, dim)
    off = self.rotate(staircase / scales, inverse=True)

    off[3:] += 0.005
    off[-1] *= 1e2
    if off[0] > 0:
        off[0] /= 2.0e3
    else:
        off[0] /= 1e3
    if off[2] < 0:
        off[2] /= 3.01e4
    else:
        off[2] /= 2e4

    return self._x_opt(dim) + off
def recordClip(self):
    '''
    recordClip()

    Drop every row of the record from position ``_recordIndex`` onward
    '''
    unused_tail = slice(self._recordIndex, None, None)
    self._record = np.delete(self._record, unused_tail, 0)
def _singlenode_searchlight(l, msk, mysl_rad, bcast_var, extra_params):
    """Run searchlight function on block data in parallel.

    `extra_params` contains:

    - Searchlight function.
    - `Shape` mask.
    - Minimum active voxels proportion required to run the searchlight
      function.

    Returns an object array covering the block without its `mysl_rad`-wide
    border. Entries for voxels that are masked out, or whose neighbourhood
    does not reach the required active proportion, stay `None`.
    """
    voxel_fn = extra_params[0]
    shape_mask = extra_params[1]
    min_active_voxels_proportion = extra_params[2]

    # np.object was removed from NumPy; the builtin `object` is equivalent.
    outmat = np.empty(msk.shape, dtype=object)[mysl_rad:-mysl_rad,
                                               mysl_rad:-mysl_rad,
                                               mysl_rad:-mysl_rad]
    width = 2 * mysl_rad + 1

    for i, j, k in np.ndindex(outmat.shape):
        if not msk[i + mysl_rad, j + mysl_rad, k + mysl_rad]:
            continue
        searchlight_slice = np.s_[i:i + width, j:j + width, k:k + width]
        voxel_fn_mask = msk[searchlight_slice] * shape_mask
        if (min_active_voxels_proportion == 0
                or np.count_nonzero(voxel_fn_mask) / voxel_fn_mask.size
                > min_active_voxels_proportion):
            outmat[i, j, k] = voxel_fn(
                [ll[searchlight_slice] for ll in l],
                voxel_fn_mask,
                mysl_rad,
                bcast_var)

    return outmat
def expected_bbands(self, window_length, k, closes):
    """Compute the expected data (without adjustments) for the given
    window, k, and closes array.

    This uses talib.BBANDS to generate the expected data, one column of
    ``closes`` at a time, and returns ``(uppers, middles, lowers)`` with the
    first ``window_length - 1`` rows removed.
    """
    ndates, nassets = closes.shape
    upper_cols, middle_cols, lower_cols = [], [], []

    for close_col in (closes[:, n] for n in range(nassets)):
        if np.isnan(close_col).all():
            # ta-lib doesn't deal well with all nans.
            upper, middle, lower = [np.full(ndates, np.nan)] * 3
        else:
            upper, middle, lower = talib.BBANDS(
                close_col, window_length, k, k)
        upper_cols.append(upper)
        middle_cols.append(middle)
        lower_cols.append(lower)

    # Stack the per-asset columns into 2d arrays whose columns are the sids,
    # then keep only the rows we care about.
    where = np.s_[window_length - 1:]
    uppers, middles, lowers = (
        np.column_stack(cols)[where]
        for cols in (upper_cols, middle_cols, lower_cols)
    )
    return uppers, middles, lowers
def var_yx_idx(self):
    r"""Return the index expression of the component block of
    :math:`\mathbf{y}` that is constrained to be equal to
    :math:`\mathbf{x}`: the last entry along the final axis.
    """
    last_entry = -1
    return (Ellipsis, last_entry)
def var_yx_idx(self):
    r"""Return the index expression of the component block of
    :math:`\mathbf{y}` that is constrained to be equal to
    :math:`\mathbf{x}`: the first ``self.cri.M`` entries along the final
    axis.
    """
    leading_block = slice(0, self.cri.M)
    return (Ellipsis, leading_block)
def index_primary(self):
    """Return an index expression appropriate for extracting the primary
    (inner) component of the main variables X, Y, etc.: everything except
    the last ``self.cri.Cd`` entries along the final axis.
    """
    inner_part = slice(0, -self.cri.Cd)
    return (Ellipsis, inner_part)
def index_addmsk(self):
    """Return an index expression appropriate for extracting the additive
    mask (outer) component of the main variables X, Y, etc.: the last
    ``self.cri.Cd`` entries along the final axis."""
    outer_part = slice(-self.cri.Cd, None)
    return (Ellipsis, outer_part)
def get_vertex_mask(self, subdomain=None):
    '''Get the vertex entry stored for subdomain.

    Returns a select-everything slice when ``subdomain`` is None. Otherwise
    the subdomain is marked first if it is not known yet, and its stored
    ``'vertices'`` entry is returned.
    '''
    if subdomain is None:
        # http://stackoverflow.com/a/42392791/353337
        return slice(None)

    known = self.subdomains
    if subdomain not in known:
        self._mark_vertices(subdomain)

    return self.subdomains[subdomain]['vertices']
def index_block(y, x, D):
    """Return the 2-D index of the ``D`` x ``D`` block in block-row ``y``
    and block-column ``x``."""
    rows = slice(y * D, (y + 1) * D)
    cols = slice(x * D, (x + 1) * D)
    return rows, cols
def forward(self, x):
    """Turn every length-``self.dim`` row of ``x`` into a diagonal matrix.

    ``x`` is reshaped to ``(-1, self.dim)``; the result has shape
    ``(rows, self.dim, self.dim)`` with each row on the diagonal of its
    matrix and zeros elsewhere.
    """
    # create diagonal matrices
    n_total = x.size * self.dim
    m = np.zeros(n_total).reshape(-1, self.dim, self.dim)
    x = x.reshape(-1, self.dim)
    diag_rows, diag_cols = np.diag_indices(x.shape[1])
    m[:, diag_rows, diag_cols] = x
    return m
def processImage(self, index, data):
    """Align every tile of the preprocessed image and stitch the results.

    The image from ``data["image"]`` is preprocessed with
    ``self.scale_factor``. Each tile in ``self.tiles`` is cut out, warped
    with its matrix from ``data["transform_matrix"]`` (inverse map, cubic
    interpolation), and its inner area - the tile without its margins - is
    copied into a float32 output of the same shape as the preprocessed
    image. ``index`` is not used.
    """
    # get the image
    raw_image = CommonFunctions.preprocessImage(
        data["image"], self.scale_factor, interpolation=cv2.INTER_CUBIC)

    # create output image as numpy array with upscaled image size
    processed_image = np.zeros(raw_image.shape, np.float32)
    warp_flags = cv2.INTER_CUBIC + cv2.WARP_INVERSE_MAP

    # align all tiles
    for tile, transform_matrix in zip(self.tiles, data["transform_matrix"]):
        x_lo, x_hi = tile["x"][0], tile["x"][1]
        y_lo, y_hi = tile["y"][0], tile["y"][1]
        left, right = tile["margin_x"][0], tile["margin_x"][1]
        top, bottom = tile["margin_y"][0], tile["margin_y"][1]

        raw_image_tile = raw_image[y_lo:y_hi, x_lo:x_hi]
        tile_height, tile_width = raw_image_tile.shape[0], raw_image_tile.shape[1]
        tile_aligned = cv2.warpAffine(raw_image_tile, transform_matrix,
                                      (tile_width, tile_height),
                                      flags=warp_flags)

        # Insert the inner area of tile_aligned (so without margins) into
        # the appropriate area in the processed image
        inner = tile_aligned[top:tile_aligned.shape[0] - bottom,
                             left:tile_aligned.shape[1] - right]
        processed_image[y_lo + top:y_hi - bottom,
                        x_lo + left:x_hi - right] = inner

    return processed_image
def flatten(f, channel=0, freqaxis=0):
    """Flatten a fits file so that it becomes a 2D image. Return new header
    and data.

    Args:
        f: opened FITS file; only ``f[0]`` is used.
        channel: index taken along the frequency axis.
        freqaxis: 1-based FITS axis number of the frequency axis. Every
            other axis beyond the first two is taken at index 0.

    Returns:
        ``(header, data)``. Two-dimensional input is returned unchanged.

    Raises:
        RadioError: if the file has fewer than two axes.
    """
    naxis = f[0].header['NAXIS']
    if naxis < 2:
        raise RadioError('Can\'t make map from this')
    if naxis == 2:
        return f[0].header, f[0].data

    # Only needed when a header has to be rebuilt.
    from astropy import wcs

    w = wcs.WCS(f[0].header)
    wn = wcs.WCS(naxis=2)

    wn.wcs.crpix[0] = w.wcs.crpix[0]
    wn.wcs.crpix[1] = w.wcs.crpix[1]
    wn.wcs.cdelt = w.wcs.cdelt[0:2]
    wn.wcs.crval = w.wcs.crval[0:2]
    wn.wcs.ctype[0] = w.wcs.ctype[0]
    wn.wcs.ctype[1] = w.wcs.ctype[1]

    header = wn.to_header()
    header["NAXIS"] = 2
    for key in ('EQUINOX', 'EPOCH'):
        value = f[0].header.get(key)
        if value:
            header[key] = value

    # The data array lists the axes in reverse FITS order, so walk from the
    # highest axis number down; axes 1 and 2 are kept whole.
    index = []
    for i in range(naxis, 0, -1):
        if i <= 2:
            index.append(np.s_[:])
        elif i == freqaxis:
            index.append(channel)
        else:
            index.append(0)

    # Multi-axis indexing needs a tuple; a list is not accepted by current
    # NumPy releases.
    return header, f[0].data[tuple(index)]
def select_n_slow(dropped, n, keep, method):
    """Return the first ``n`` values of ``dropped`` after sorting.

    The input is reversed before sorting when ``keep == 'last'`` or
    ``method == 'nlargest'``. The sort is ascending for ``'nsmallest'`` and
    descending otherwise.
    """
    if keep == 'last' or method == 'nlargest':
        candidates = dropped[::-1]
    else:
        candidates = dropped[:]
    ordered = candidates.sort_values(ascending=(method == 'nsmallest'))
    return ordered.head(n)