The following 50 code examples, extracted from open source Python projects, illustrate how to use numpy.nanmedian(). Each snippet assumes NumPy has been imported as np (or, in one case, num); other names come from the snippet's surrounding project.

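Before the project excerpts, a minimal stand-alone sketch of the basic call signature (the input array is illustrative, not taken from any of the projects below): unlike np.median, np.nanmedian simply ignores NaN entries.

import numpy as np

a = np.array([[10.0, 7.0, 4.0],
              [3.0, 2.0, np.nan]])
print(np.median(a))              # nan -- the plain median is poisoned by NaN
print(np.nanmedian(a))           # 4.0 -- the NaN is ignored
print(np.nanmedian(a, axis=0))   # [6.5 4.5 4. ] -- column-wise
print(np.nanmedian(a, axis=1))   # [7.  2.5] -- row-wise
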
def normalize(in_data, column='PDCSAP_FLUX', group_column='QUARTER'):
    '''
    This function normalizes PDCSAP_FLUX data by quarter by dividing the
    flux by the median for the quarter

    @param in_data: Data to be normalized
    @param column: Name of column to be normalized
    @param group_column: Name of column used to group data
    '''
    if group_column is not None:
        group_list = list(set(in_data[group_column]))
        group_list.sort()
        for group in group_list:
            index = in_data[group_column] == group
            in_data.loc[index, column] = (in_data.loc[index, column] /
                                          np.nanmedian(in_data.loc[index, column]))

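A hypothetical call to normalize() above, on a made-up two-quarter table (the column names match the function's defaults; the values are invented). After division by the quarterly median, each quarter's flux is centred around 1:

import numpy as np
import pandas as pd

lc = pd.DataFrame({'QUARTER': [1, 1, 2, 2],
                   'PDCSAP_FLUX': [100.0, 110.0, 202.0, np.nan]})
normalize(lc)                      # modifies lc in place
print(lc['PDCSAP_FLUX'].values)    # [0.952... 1.047... 1. nan]
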
def _nanmedian1d(arr1d, overwrite_input=False):
    """
    Private function for rank 1 arrays. Compute the median ignoring NaNs.
    See nanmedian for parameter usage
    """
    c = np.isnan(arr1d)
    s = np.where(c)[0]
    if s.size == arr1d.size:
        warnings.warn("All-NaN slice encountered", RuntimeWarning)
        return np.nan
    elif s.size == 0:
        return np.median(arr1d, overwrite_input=overwrite_input)
    else:
        if overwrite_input:
            x = arr1d
        else:
            x = arr1d.copy()
        # select non-nans at end of array
        enonan = arr1d[-s.size:][~c[-s.size:]]
        # fill nans in beginning of array with non-nans of end
        x[s[:enonan.size]] = enonan
        # slice nans away
        return np.median(x[:-s.size], overwrite_input=True)

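To see what the index gymnastics in _nanmedian1d accomplish, a small worked example on an illustrative array: the NaNs (wherever they sit) are overwritten with the non-NaN values occupying the last s.size slots, after which those last s.size entries can be sliced off and an ordinary median taken.

import numpy as np

arr = np.array([np.nan, 5.0, 1.0, np.nan, 3.0])
c = np.isnan(arr)
s = np.where(c)[0]                     # indices of the NaNs: [0, 3]
x = arr.copy()
enonan = arr[-s.size:][~c[-s.size:]]   # non-NaN values in the last 2 slots: [3.0]
x[s[:enonan.size]] = enonan            # x becomes [3., 5., 1., nan, 3.]
print(np.median(x[:-s.size]))          # median of [3., 5., 1.] -> 3.0
print(np.nanmedian(arr))               # 3.0, the same answer
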
def _nanmedian(a, axis=None, out=None, overwrite_input=False):
    """
    Private function that doesn't support extended axis or keepdims.
    These methods are extended to this function using _ureduce
    See nanmedian for parameter usage
    """
    if axis is None or a.ndim == 1:
        part = a.ravel()
        if out is None:
            return _nanmedian1d(part, overwrite_input)
        else:
            out[...] = _nanmedian1d(part, overwrite_input)
            return out
    else:
        # for small medians use sort + indexing which is still faster than
        # apply_along_axis
        if a.shape[axis] < 400:
            return _nanmedian_small(a, axis, out, overwrite_input)
        result = np.apply_along_axis(_nanmedian1d, axis, a, overwrite_input)
        if out is not None:
            out[...] = result
        return result

def test_allnans(self):
    mat = np.array([np.nan]*9).reshape(3, 3)
    for axis in [None, 0, 1]:
        with warnings.catch_warnings(record=True) as w:
            warnings.simplefilter('always')
            warnings.simplefilter('ignore', FutureWarning)
            assert_(np.isnan(np.nanmedian(mat, axis=axis)).all())
            if axis is None:
                assert_(len(w) == 1)
            else:
                assert_(len(w) == 3)
            assert_(issubclass(w[0].category, RuntimeWarning))
            # Check scalar
            assert_(np.isnan(np.nanmedian(np.nan)))
            if axis is None:
                assert_(len(w) == 2)
            else:
                assert_(len(w) == 4)
            assert_(issubclass(w[0].category, RuntimeWarning))

def GetSigma():
    '''
    Get the standard deviation of the Gaussian PSF. I find `sigma` = 0.45
    pixels. I also plotted the `x` and `y` obtained below and find that,
    on average, the maximum extent of the stellar motion is
    x,y ~ y,x ~ (2.5, 3.5).
    '''
    star = Everest(os.path.join(TRAPPIST_OUT, 'nPLDTrappist.fits'), TRAPPIST_EPIC)
    fpix = star.fpix.reshape(-1, 6, 6).swapaxes(1, 2)
    guess = [3., 3., 1e4, 1e2, 0.5]
    n = 0
    niter = len(fpix) // 10
    x = np.zeros(niter)
    y = np.zeros(niter)
    a = np.zeros(niter)
    b = np.zeros(niter)
    sigma = np.zeros(niter)
    for n in prange(niter):
        x[n], y[n], a[n], b[n], sigma[n] = fmin(ChiSq, guess,
                                                args=(fpix[n * 10],), disp=0)
    return np.nanmedian(sigma)

def median_percentile(data, des_percentiles='68+95+99'):
    """
    :param data: 2-D array; statistics are taken over axis 0
    :param des_percentiles: string with '+'-separated percentile values
    :return: [median, percentiles]
    """
    median = np.nanmedian(data, axis=0)
    out = np.array(list(map(int, des_percentiles.split("+"))))
    for i in range(out.size):
        assert 0 <= out[i] <= 100, \
            'Percentile must be between 0 and 100; instead is %f' % out[i]
    list_percentiles = np.empty((2 * out.size,), dtype=out.dtype)
    list_percentiles[0::2] = out        # the requested percentile
    list_percentiles[1::2] = 100 - out  # and its mirror percentile
    percentiles = np.nanpercentile(data, list_percentiles, axis=0)
    return [median, percentiles]

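A hypothetical call to median_percentile() above, on a small synthetic matrix (e.g. 5 runs by 4 time steps). Note the mirrored ordering of the percentile rows produced by the interleaving:

import numpy as np

data = np.random.randn(5, 4)
data[0, 2] = np.nan          # NaN entries are skipped by the nan-aware calls
median, percentiles = median_percentile(data, des_percentiles='68+95')
print(median.shape)          # (4,)   -- per-column median over the runs
print(percentiles.shape)     # (4, 4) -- rows: 68th, 32nd, 95th, 5th percentile
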
def test_run_roi_stats_via_API():
    "Tests whether roi stats can be computed (not their accuracy) and the return values match in size."

    summary_methods = ['median', 'mean', 'std', 'variation', 'entropy', 'skew', 'kurtosis']
    # 'mode' returns more than one value; 'gmean' requires only positive values,
    # 'hmean' can not always be computed
    from scipy.stats import trim_mean, kstat
    from functools import partial
    trimmed_mean = partial(trim_mean, proportiontocut=0.05)
    third_kstat = partial(kstat, n=3)

    summary_methods.extend([trimmed_mean, third_kstat])

    # checking support for nan-handling callables
    summary_methods.extend([np.nanmedian, np.nanmean])

    for summary_method in summary_methods:
        roi_medians = graynet.roiwise_stats_indiv(subject_id_list, fs_dir,
                                                  base_feature=base_feature,
                                                  chosen_roi_stats=summary_method,
                                                  atlas=atlas, smoothing_param=fwhm,
                                                  out_dir=out_dir, return_results=True)
        for sub in subject_id_list:
            if roi_medians[sub].size != num_roi_wholebrain:
                raise ValueError('invalid summary stats - #nodes do not match.')

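The pattern of passing arbitrary reduction callables is easy to try stand-alone (the data below is invented). As the test's comment notes, only the nan-aware callables tolerate missing values:

import numpy as np
from functools import partial
from scipy.stats import trim_mean

trimmed_mean = partial(trim_mean, proportiontocut=0.05)
roi_data = np.array([1.0, 2.0, 3.0, np.nan, 100.0])
print(np.nanmedian(roi_data))   # 2.5  -- ignores the NaN, robust to the outlier
print(np.nanmean(roi_data))     # 26.5 -- ignores the NaN, but not the outlier
print(trimmed_mean(roi_data))   # nan  -- trim_mean is not NaN-aware
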
def rolling_fltr(dem, f=np.nanmedian, size=3, circular=True):
    """General rolling filter (default operator is median filter)

    Can input any function f

    Efficient for smaller arrays, correctly handles NaN, fills gaps
    """
    dem = malib.checkma(dem)
    newshp = (dem.size, size * size)
    # Force a step size of 1
    t = malib.sliding_window_padded(dem.filled(np.nan), (size, size), (1, 1))
    if circular:
        mask = circular_mask(size)
        t[:, mask] = np.nan
    t = t.reshape(newshp)
    out = f(t, axis=1).reshape(dem.shape)
    out = np.ma.fix_invalid(out).astype(dem.dtype)
    out.set_fill_value(dem.fill_value)
    return out

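rolling_fltr depends on the project's malib helpers. A simplified, self-contained sketch of the same idea, using NumPy's sliding_window_view (available in NumPy >= 1.20) instead of malib's padded windowing, might look like this:

import numpy as np
from numpy.lib.stride_tricks import sliding_window_view

def rolling_nanmedian(dem, size=3):
    """NaN-aware rolling median; edges are NaN-padded so output keeps shape."""
    pad = size // 2
    padded = np.pad(dem.astype(float), pad, constant_values=np.nan)
    windows = sliding_window_view(padded, (size, size))
    return np.nanmedian(windows.reshape(dem.shape + (size * size,)), axis=-1)

dem = np.arange(25, dtype=float).reshape(5, 5)
dem[2, 2] = np.nan
print(rolling_nanmedian(dem)[2, 2])   # 12.0: median of the 8 valid neighbours
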
def dowd(X):
    """
    Return the Dowd Variogram of the given sample X.
    X has to be an even-length array of point pairs like:
    x1, x1+h, x2, x2+h, ..., xn, xn + h.
    If X.ndim > 1, dowd will be called recursively and a list of Dowd
    variances is returned.

    Dowd, P. A., (1984): The variogram and kriging: Robust and resistant
    estimators, in Geostatistics for Natural Resources Characterization.
    Edited by G. Verly et al., pp. 91 - 106, D. Reidel, Dordrecht.

    :param X: even-length array of point pairs
    :return:
    """
    _X = np.array(X)

    if any([isinstance(_, (list, np.ndarray)) for _ in _X]):
        return np.array([dowd(_) for _ in _X])

    # check even
    if len(_X) % 2 > 0:
        raise ValueError('The sample does not have an even length: {}'.format(_X))

    # calculate
    term1 = np.nanmedian([np.abs(_X[i] - _X[i + 1])
                          for i in np.arange(0, len(_X) - 1, 2)])

    return 0.5 * (2.198 * np.power(term1, 2))

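A hedged usage sketch for dowd() (the pair values are invented): the estimator is 0.5 * 2.198 * median(|x_i - x_i+h|)^2, a median-based, outlier-resistant alternative to classical moment-based semivariance estimators.

import numpy as np

pairs = np.array([1.0, 1.4, 2.0, 2.3, 3.0, 3.6, 4.0, 4.2])
print(dowd(pairs))   # 0.5 * 2.198 * median([0.4, 0.3, 0.6, 0.2])**2 ~= 0.135
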
def test_allnans(self):
    mat = np.array([np.nan]*9).reshape(3, 3)
    for axis in [None, 0, 1]:
        with warnings.catch_warnings(record=True) as w:
            warnings.simplefilter('always')
            assert_(np.isnan(np.nanmedian(mat, axis=axis)).all())
            if axis is None:
                assert_(len(w) == 1)
            else:
                assert_(len(w) == 3)
            assert_(issubclass(w[0].category, RuntimeWarning))
            # Check scalar
            assert_(np.isnan(np.nanmedian(np.nan)))
            if axis is None:
                assert_(len(w) == 2)
            else:
                assert_(len(w) == 4)
            assert_(issubclass(w[0].category, RuntimeWarning))

def obtainValues():
    url = 'http://earthserver.ecmwf.int/rasdaman/ows?service=WCS&version=2.0.11&request=ProcessCoverages&query=for c in (river_discharge_forecast_opt2) return encode (c[ansi("2008-04-08T00:00"),Lat(41.32),Long(-89.0)],"csv")'
    print(" hardcoded value for url " + url)
    r = requests.get(url, proxies={'http': None})
    r.raise_for_status()
    x = np.array(eval(r.text.replace('{', '[').replace('}', ']')))
    listOfMedian = []
    for i in range(30):
        listOfEnsambles = x[i]
        ## print(listOfEnsambles)
        listOfEnsambles_masked = np.ma.masked_where(listOfEnsambles == 0, listOfEnsambles)
        ## print(listOfEnsambles_masked)
        medianValue = np.nanmedian(listOfEnsambles_masked)
        ## print(medianValue)
        listOfMedian.append(medianValue)
    return listOfMedian

def calculate(arguments):
    filenames = [os.path.join(arguments.input_path, filename)
                 for filename in os.listdir(arguments.input_path)
                 if filename.endswith(arguments.ext)]

    medians = []
    for filename in filenames:
        depth_map = read_file(filename)
        median = np.nanmedian(depth_map)
        medians.append(median)
    medians = np.array(medians)

    # Check and create output directory.
    if not os.path.exists(arguments.output_path):
        os.makedirs(arguments.output_path)

    with open(os.path.join(arguments.output_path, "median.txt"), "w") as file:
        file.write("{:.6f}".format(np.median(medians)))
    print("{:.6f}".format(np.median(medians)))

def derampMatrix(displ):
    """ Deramp through fitting a bilinear plane
    Data is also de-meaned
    """
    if displ.ndim != 2:
        raise TypeError('Displacement has to be 2-dim array')
    mx = num.nanmedian(displ, axis=0)
    my = num.nanmedian(displ, axis=1)

    ix = num.arange(mx.size)
    iy = num.arange(my.size)
    dx, cx, _, _, _ = sp.stats.linregress(ix[~num.isnan(mx)], mx[~num.isnan(mx)])
    dy, cy, _, _, _ = sp.stats.linregress(iy[~num.isnan(my)], my[~num.isnan(my)])

    rx = (ix * dx)
    ry = (iy * dy)
    data = displ - (rx[num.newaxis, :] + ry[:, num.newaxis])
    data -= num.nanmean(data)
    return data

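A quick sanity check for derampMatrix on synthetic data (this assumes the module's aliases, num for numpy and sp for scipy, consistent with the code above): a bilinear plane plus small noise should come back as roughly zero-mean noise.

import numpy as num

iy, ix = num.mgrid[0:50, 0:60]
displ = 0.3 * ix + 0.1 * iy + 5.0 + 0.01 * num.random.randn(50, 60)
displ[10, 10] = num.nan                 # an isolated gap is tolerated
flat = derampMatrix(displ)
print(num.nanstd(flat))                 # ~0.01: the ramp and the offset are gone
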
def _nanmedian1d(arr1d, overwrite_input=False):
    """
    Private function for rank 1 arrays. Compute the median ignoring NaNs.
    See nanmedian for parameter usage
    """
    c = np.isnan(arr1d)
    s = np.where(c)[0]
    if s.size == arr1d.size:
        warnings.warn("All-NaN slice encountered", RuntimeWarning, stacklevel=3)
        return np.nan
    elif s.size == 0:
        return np.median(arr1d, overwrite_input=overwrite_input)
    else:
        if overwrite_input:
            x = arr1d
        else:
            x = arr1d.copy()
        # select non-nans at end of array
        enonan = arr1d[-s.size:][~c[-s.size:]]
        # fill nans in beginning of array with non-nans of end
        x[s[:enonan.size]] = enonan
        # slice nans away
        return np.median(x[:-s.size], overwrite_input=True)

def _nanmedian(a, axis=None, out=None, overwrite_input=False):
    """
    Private function that doesn't support extended axis or keepdims.
    These methods are extended to this function using _ureduce
    See nanmedian for parameter usage
    """
    if axis is None or a.ndim == 1:
        part = a.ravel()
        if out is None:
            return _nanmedian1d(part, overwrite_input)
        else:
            out[...] = _nanmedian1d(part, overwrite_input)
            return out
    else:
        # for small medians use sort + indexing which is still faster than
        # apply_along_axis
        # benchmarked with shuffled (50, 50, x) containing a few NaN
        if a.shape[axis] < 600:
            return _nanmedian_small(a, axis, out, overwrite_input)
        result = np.apply_along_axis(_nanmedian1d, axis, a, overwrite_input)
        if out is not None:
            out[...] = result
        return result

def test_allnans(self):
    mat = np.array([np.nan]*9).reshape(3, 3)
    for axis in [None, 0, 1]:
        with suppress_warnings() as sup:
            sup.record(RuntimeWarning)
            assert_(np.isnan(np.nanmedian(mat, axis=axis)).all())
            if axis is None:
                assert_(len(sup.log) == 1)
            else:
                assert_(len(sup.log) == 3)
            # Check scalar
            assert_(np.isnan(np.nanmedian(np.nan)))
            if axis is None:
                assert_(len(sup.log) == 2)
            else:
                assert_(len(sup.log) == 4)

def test_nangeomedian_axis_zero_one_good():
    data = np.array([[1.0, np.nan, 1.0],
                     [2.0, 1.0, 1.0]])
    m = hd.nangeomedian(data, axis=0)
    r = np.nanmedian(data, axis=0)
    assert_array_almost_equal(m, r, decimal=3)

def test_nangeomedian_axis_one_two_good():
    data = np.array([[1.0, np.nan, 1.0],
                     [2.0, 1.0, 1.0]])
    m = hd.nangeomedian(data, axis=1)
    r = np.nanmedian(data, axis=1)
    assert_array_almost_equal(m, r, decimal=3)

def _nanmedian_small(a, axis=None, out=None, overwrite_input=False):
    """
    sort + indexing median, faster for small medians along multiple
    dimensions due to the high overhead of apply_along_axis

    see nanmedian for parameter usage
    """
    a = np.ma.masked_array(a, np.isnan(a))
    m = np.ma.median(a, axis=axis, overwrite_input=overwrite_input)
    for i in range(np.count_nonzero(m.mask.ravel())):
        warnings.warn("All-NaN slice encountered", RuntimeWarning)
    if out is not None:
        out[...] = m.filled(np.nan)
        return out
    return m.filled(np.nan)

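The masked-array route taken above is easy to verify against np.nanmedian directly (the array is illustrative): masking the NaNs and calling np.ma.median gives the same per-row result.

import numpy as np

a = np.array([[1.0, np.nan, 3.0],
              [np.nan, np.nan, 6.0]])
m = np.ma.median(np.ma.masked_array(a, np.isnan(a)), axis=1)
print(m.filled(np.nan))           # [2. 6.]
print(np.nanmedian(a, axis=1))    # [2. 6.]
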
def test_mutation(self):
    # Check that passed array is not modified.
    ndat = _ndat.copy()
    np.nanmedian(ndat)
    assert_equal(ndat, _ndat)

def test_keepdims(self):
    mat = np.eye(3)
    for axis in [None, 0, 1]:
        tgt = np.median(mat, axis=axis, out=None, overwrite_input=False)
        res = np.nanmedian(mat, axis=axis, out=None, overwrite_input=False)
        assert_(res.ndim == tgt.ndim)

    d = np.ones((3, 5, 7, 11))
    # Randomly set some elements to NaN:
    w = np.random.random((4, 200)) * np.array(d.shape)[:, None]
    w = w.astype(np.intp)
    d[tuple(w)] = np.nan
    with warnings.catch_warnings(record=True) as w:
        warnings.simplefilter('always', RuntimeWarning)
        res = np.nanmedian(d, axis=None, keepdims=True)
        assert_equal(res.shape, (1, 1, 1, 1))
        res = np.nanmedian(d, axis=(0, 1), keepdims=True)
        assert_equal(res.shape, (1, 1, 7, 11))
        res = np.nanmedian(d, axis=(0, 3), keepdims=True)
        assert_equal(res.shape, (1, 5, 7, 1))
        res = np.nanmedian(d, axis=(1,), keepdims=True)
        assert_equal(res.shape, (3, 1, 7, 11))
        res = np.nanmedian(d, axis=(0, 1, 2, 3), keepdims=True)
        assert_equal(res.shape, (1, 1, 1, 1))
        res = np.nanmedian(d, axis=(0, 1, 3), keepdims=True)
        assert_equal(res.shape, (1, 1, 7, 1))

def test_small_large(self):
    # test the small and large code paths, current cutoff 400 elements
    for s in [5, 20, 51, 200, 1000]:
        d = np.random.randn(4, s)
        # Randomly set some elements to NaN:
        w = np.random.randint(0, d.size, size=d.size // 5)
        d.ravel()[w] = np.nan
        d[:, 0] = 1.  # ensure at least one good value
        # use normal median without nans to compare
        tgt = []
        for x in d:
            nonan = np.compress(~np.isnan(x), x)
            tgt.append(np.median(nonan, overwrite_input=True))

        assert_array_equal(np.nanmedian(d, axis=-1), tgt)

def test_result_values(self):
    tgt = [np.median(d) for d in _rdat]
    res = np.nanmedian(_ndat, axis=1)
    assert_almost_equal(res, tgt)

def test_empty(self):
    mat = np.zeros((0, 3))
    for axis in [0, None]:
        with warnings.catch_warnings(record=True) as w:
            warnings.simplefilter('always')
            assert_(np.isnan(np.nanmedian(mat, axis=axis)).all())
            assert_(len(w) == 1)
            assert_(issubclass(w[0].category, RuntimeWarning))
    for axis in [1]:
        with warnings.catch_warnings(record=True) as w:
            warnings.simplefilter('always')
            assert_equal(np.nanmedian(mat, axis=axis), np.zeros([]))
            assert_(len(w) == 0)

def test_scalar(self):
    assert_(np.nanmedian(0.) == 0.)

def test_float_special(self):
    with warnings.catch_warnings(record=True):
        warnings.simplefilter('ignore', RuntimeWarning)
        a = np.array([[np.inf, np.nan], [np.nan, np.nan]])
        assert_equal(np.nanmedian(a, axis=0), [np.inf, np.nan])
        assert_equal(np.nanmedian(a, axis=1), [np.inf, np.nan])
        assert_equal(np.nanmedian(a), np.inf)

        # minimum fill value check
        a = np.array([[np.nan, np.nan, np.inf], [np.nan, np.nan, np.inf]])
        assert_equal(np.nanmedian(a, axis=1), np.inf)

        # no mask path
        a = np.array([[np.inf, np.inf], [np.inf, np.inf]])
        assert_equal(np.nanmedian(a, axis=1), np.inf)

def colbkg(self):
    '''
    Median background flux in each column of the postage stamp,
    expressed as a percent deviation from the column mean.
    '''
    # Flux in background pixels
    bkg = np.zeros(self.nx)
    for col in range(self.nx):
        b = np.where(self.aperture[:, col] == 0)
        bkg[col] = np.nanmedian(self.images[self.cadence][b, col])
    return 100 * (bkg / np.mean(bkg) - 1.)

def lcbkg(self):
    '''
    Background light curve: the median flux over the non-aperture
    (background) pixels of each image.
    '''
    binds = np.where(self.aperture ^ 1)
    bkg = np.nanmedian(np.array([f[binds] for f in self.images],
                                dtype='float64'), axis=1)
    return bkg.reshape(-1, 1)

def zero_center_normalize(df, samples, logInput=False, method='median'):
    '''
    Transform the input peptide abundance table into log2 scale and
    center it at zero.

    Inputs:
        df:       dataframe of peptide abundances
        samples:  column names of selected samples
        logInput: input abundances are already in log scale
        method:   method for estimating zero point
    '''
    assert method in ('median', 'average', 'GMM'), \
        'Zero centering method has to be among median, average or GMM!'
    if not logInput:
        # convert abundances to log2 scale
        df[samples] = df[samples].apply(np.log2)
    if method == 'average':
        norm_scale = np.nanmean(df[samples], axis=0)
    elif method == 'median':
        norm_scale = np.nanmedian(df[samples], axis=0)
    elif method == 'GMM':
        ''' two-component Gaussian mixture model '''
        # note: sklearn.mixture.GMM is deprecated; recent scikit-learn
        # versions provide GaussianMixture instead
        from sklearn.mixture import GMM
        gmm = GMM(2)
        norm_scale = []
        for sp in samples:
            v = df[sp].values
            v = v[np.logical_not(np.isnan(v))]
            v = v[np.logical_not(np.isinf(v))]
            try:
                # v is already an ndarray here (the original's v.values
                # would raise and fall through to the except branch)
                gmm.fit(np.matrix(v).T)
                vmean = gmm.means_[np.argmin(gmm.covars_)]
                norm_scale.append(vmean)
            except Exception:
                norm_scale.append(np.nanmean(v))
        norm_scale = np.array(norm_scale)
    df[samples] = df[samples] - norm_scale
    return df

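A hypothetical call with the default method='median' (the table below is invented): after centering, each selected sample column has a median of exactly zero in log2 space.

import numpy as np
import pandas as pd

df = pd.DataFrame({'peptide': ['p1', 'p2', 'p3', 'p4'],
                   'S1': [100.0, 200.0, 400.0, np.nan],
                   'S2': [50.0, 100.0, 800.0, 1600.0]})
out = zero_center_normalize(df, ['S1', 'S2'])
print(np.nanmedian(out[['S1', 'S2']], axis=0))   # [0. 0.]
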
def __get_pd_median(data, c=1.):
    """Get the median and the MAD of data

    Args:
        data (numpy.ndarray): the data
        c (float): scale factor applied to the MAD

    Returns:
        float, float: the median and the MAD
    """
    p = np.nanmedian(data)
    d = np.nanmedian(np.abs(data - p)) / c  # d is the MAD
    return p, d

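The two return values form a robust location/scale pair. A small stand-alone illustration (the data is invented; c = 0.6745 is the usual consistency factor that makes the MAD estimate the standard deviation under normality, whereas c defaults to 1 above):

import numpy as np

data = np.array([1.0, 2.0, 2.5, 3.0, np.nan, 100.0])   # one NaN, one outlier
p = np.nanmedian(data)                        # 2.5
d = np.nanmedian(np.abs(data - p)) / 0.6745   # ~0.74
print(p, d)   # the outlier barely moves either value
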
def calculate_residual_distributions(self):
    """
    This function ...
    :return:
    """

    # Inform the user
    log.info("Calculating distributions of residual pixel values ...")

    # Loop over the different colours
    for colour_name in self.observed_colours:

        # Debugging
        log.debug("Calculating the distribution for the pixels of the " + colour_name + " residual map ...")

        # Get a 1D array of the valid pixel values
        pixel_values = None

        # Create the distribution
        distribution = Distribution.from_values(pixel_values)

        # Debugging
        #log.debug("Median " + colour_name + " residual: " + str(np.nanmedian(np.abs(residual))))
        #log.debug("Standard deviation of " + colour_name + " residual: " + str(np.nanstd(residual)))

        # Add the distribution to the dictionary
        self.residual_distributions[colour_name] = distribution

# -----------------------------------------------------------------