The following 50 code examples, collected from open-source Python projects, illustrate how to use numpy.nanstd().
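Before the project examples, here is a minimal standalone sketch (not taken from any of the projects below; the array values are arbitrary) showing the basic behavior of numpy.nanstd() compared with numpy.std() when NaN values are present.

import numpy as np

# Small example array containing a missing value.
a = np.array([[1.0, 2.0, np.nan],
              [4.0, 5.0, 6.0]])

print(np.std(a))             # nan: np.std propagates the NaN
print(np.nanstd(a))          # standard deviation of the five non-NaN values
print(np.nanstd(a, axis=0))  # column-wise standard deviation, ignoring NaNs
print(np.nanstd(a, ddof=1))  # sample standard deviation (ddof=1), ignoring NaNs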
def test_against_numpy_nanstd(self):
    source = [np.random.random((16, 12, 5)) for _ in range(10)]
    for arr in source:
        arr[randint(0, 15), randint(0, 11), randint(0, 4)] = np.nan
    stack = np.stack(source, axis = -1)
    for axis in (0, 1, 2, None):
        for ddof in range(4):
            with self.subTest('axis = {}, ddof = {}'.format(axis, ddof)):
                from_numpy = np.nanstd(stack, axis = axis, ddof = ddof)
                from_ivar = last(istd(source, axis = axis, ddof = ddof, ignore_nan = True))
                self.assertSequenceEqual(from_numpy.shape, from_ivar.shape)
                self.assertTrue(np.allclose(from_ivar, from_numpy))
def plot_heatmaps(data, mis, column_label, cont, topk=30, prefix=''):
    cmap = sns.cubehelix_palette(as_cmap=True, light=.9)
    m, nv = mis.shape
    for j in range(m):
        inds = np.argsort(- mis[j, :])[:topk]
        if len(inds) >= 2:
            plt.clf()
            order = np.argsort(cont[:,j])
            subdata = data[:, inds][order].T
            subdata -= np.nanmean(subdata, axis=1, keepdims=True)
            subdata /= np.nanstd(subdata, axis=1, keepdims=True)
            columns = [column_label[i] for i in inds]
            sns.heatmap(subdata, vmin=-3, vmax=3, cmap=cmap, yticklabels=columns, xticklabels=False, mask=np.isnan(subdata))
            filename = '{}/heatmaps/group_num={}.png'.format(prefix, j)
            if not os.path.exists(os.path.dirname(filename)):
                os.makedirs(os.path.dirname(filename))
            plt.title("Latent factor {}".format(j))
            plt.yticks(rotation=0)
            plt.savefig(filename, bbox_inches='tight')
            plt.close('all')
            #plot_rels(data[:, inds], map(lambda q: column_label[q], inds), colors=cont[:, j],
            #          outfile=prefix + '/relationships/group_num=' + str(j), latent=labels[:, j], alpha=0.1)
def test_ddof_too_big(self):
    nanfuncs = [np.nanvar, np.nanstd]
    stdfuncs = [np.var, np.std]
    dsize = [len(d) for d in _rdat]
    for nf, rf in zip(nanfuncs, stdfuncs):
        for ddof in range(5):
            with warnings.catch_warnings(record=True) as w:
                warnings.simplefilter('always')
                tgt = [ddof >= d for d in dsize]
                res = nf(_ndat, axis=1, ddof=ddof)
                assert_equal(np.isnan(res), tgt)
                if any(tgt):
                    assert_(len(w) == 1)
                    assert_(issubclass(w[0].category, RuntimeWarning))
                else:
                    assert_(len(w) == 0)
def _calculate(self, X, y, categorical, metafeatures, helpers):
    skews = helpers.get_value("Skewnesses")
    std = np.nanstd(skews) if len(skews) > 0 else 0
    return std if np.isfinite(std) else 0


# @metafeatures.define("cancor1")
# def cancor1(X, y):
#     pass

# @metafeatures.define("cancor2")
# def cancor2(X, y):
#     pass

################################################################################
# Information-theoretic metafeatures
def add_MACD(data, Ns=[12,26,9]):
    '''
    :param data: DataFrame containing stock price info in the second column
    :param Ns: List of short term long term EMA to use and look-back window of MACD's EMA
    :return:
    '''
    symbol = data.columns.values[1]  # assuming stock price is in the second column in data
    MACD = cal_EMA(data.ix[:,[symbol]],N=Ns[0]) - cal_EMA(data.ix[:,[symbol]],N=Ns[1])
    data['MACD'] = MACD
    signal = cal_EMA(data.MACD[Ns[1]:],N=Ns[2])
    # # normalized them
    # MACD = (MACD - np.nanmean(MACD))/(2*np.nanstd(MACD))
    # signal = (signal - np.nanmean(signal))/(2*np.nanstd(signal))
    data['MACD'] = MACD
    data['Signal'] = 'NaN'
    data.loc[Ns[1]:,'Signal'] = signal
    return data
def add_MACD(data, Ns=None):
    '''
    :param data: DataFrame containing stock price info in the second column
    :param Ns: List of short term long term EMA to use and look-back window of MACD's EMA
    :return:
    '''
    if Ns is None:
        Ns = [12, 26, 9]
    symbol = data.columns.values[1]  # assuming stock price is in the second column in data
    MACD = cal_EMA(data.loc[:, symbol], N=Ns[0]) - cal_EMA(data.loc[:, symbol], N=Ns[1])
    data['MACD'] = MACD
    signal = cal_EMA(data.MACD[Ns[1]:], N=Ns[2])
    # # normalized them
    # MACD = (MACD - np.nanmean(MACD))/(2*np.nanstd(MACD))
    # signal = (signal - np.nanmean(signal))/(2*np.nanstd(signal))
    # data['MACD'] = MACD
    data['Signal'] = 'NaN'
    data.loc[Ns[1]:, 'Signal'] = signal
    return data
def addNoise(img, snr=25, rShot=0.5):
    '''
    adds Gaussian (thermal) and shot noise to [img]
    [img] is assumed to be noise free
    [rShot] - shot noise ratio relative to all noise
    '''
    s0, s1 = img.shape[:2]
    m = img.mean()
    if np.isnan(m):
        m = np.nanmean(img)
    assert m != 0, 'image mean cannot be zero'
    img = img / m
    noise = np.random.normal(size=s0 * s1).reshape(s0, s1)
    if rShot > 0:
        noise *= (rShot * img**0.5 + 1)
    noise /= np.nanstd(noise)
    noise[np.isnan(noise)] = 0
    return m * (img + noise / snr)
def histo_plot(figure,X,color,xlabel="",ylabel="",cumul=False,bar=True,n_points=400,smooth_factor=0.1,spline_order=3,linewidth=3,alpha=1.0,label=""):
    if '%' in xlabel:
        magnitude = 100
        X_values = np.array(np.minimum(np.around(X),n_points+1),int)
    else:
        # magnitude = np.power(10,np.around(4*np.log10(X.mean()))/4+0.5)
        magnitude = np.power(10,np.around(4*np.log10(np.nanmean(X)+np.nanstd(X)+1e-7))/4+1)
        magnitude = np.around(magnitude,int(-np.log10(magnitude))+1)
        # print magnitude
        #magnitude = X.mean()+5.0*X.std()
        X_values = np.array(np.minimum(np.around(n_points*X[True-np.isnan(X)]/magnitude),n_points+1),int)
    X_histo = np.zeros(n_points+1,float)
    for x in np.linspace(0,n_points,n_points+1):
        X_histo[x] = nd.sum(np.ones_like(X_values,float),X_values,index=x)
        if '%' in ylabel:
            X_histo[x] /= X_values.size/100.0
        if cumul:
            X_histo[x] += X_histo[x-1]
    if bar:
        bar_plot(figure,np.linspace(0,magnitude,n_points+1),X_histo,np.array([1,1,1]),color,xlabel,ylabel,label=label)
    else:
        smooth_plot(figure,np.linspace(0,magnitude,n_points+1),X_histo,color,color,xlabel,ylabel,n_points=n_points,smooth_factor=smooth_factor,spline_order=spline_order,linewidth=linewidth,alpha=alpha,label=label)
def process(self, obj_data):
    '''
    Removes table data with large snow errors

    @param obj_data: Input DataWrapper, will be modified in place
    '''
    bad_stations = []
    sigma_multiplier = self.ap_paramList[0]()
    for label, data in obj_data.getIterator():
        if len(data[data[self.snow_column]==4]) > 0 and len(data[data[self.snow_column]==2]) > 0:
            snow = data[data[self.snow_column]==4].loc[:,self.column_name]
            no_snow = data[data[self.snow_column]==2].loc[:,self.column_name]
            non_snow_std = np.nanstd(no_snow)
            snow_std = np.nanstd(snow)
            if snow_std > sigma_multiplier * non_snow_std:
                bad_stations.append(label)
    if len(bad_stations) > 0:
        obj_data.removeFrames(bad_stations)
def X(self):
    # Scale the data across features
    X = stats.zscore(np.asarray(self.betas))

    # Remove zero-variance features
    X = X[:, self.good_voxels]
    assert not np.isnan(X).any()

    # Regress out behavioral confounds
    rt = np.asarray(self.rt)
    m, s = np.nanmean(rt), np.nanstd(rt)
    rt = np.nan_to_num((rt - m) / s)
    X = OLS(X, rt).fit().resid

    return X
def split_and_zscore(self, data, test_run):
    # Enforce type and size of the data
    data = np.asarray(data)
    if data.ndim == 1:
        data = np.expand_dims(data, 1)

    # Identify training and test samples
    train = np.asarray(self.runs != test_run)
    test = np.asarray(self.runs == test_run)
    train_data = data[train]
    test_data = data[test]

    # Compute the mean and standard deviation of the training set
    m, s = np.nanmean(train_data), np.nanstd(train_data)

    # Scale the training and test set
    train_data = (train_data - m) / s
    test_data = (test_data - m) / s

    return train_data, test_data
def test_dtype_from_dtype(self):
    mat = np.eye(3)
    codes = 'efdgFDG'
    for nf, rf in zip(self.nanfuncs, self.stdfuncs):
        for c in codes:
            with suppress_warnings() as sup:
                if nf in {np.nanstd, np.nanvar} and c in 'FDG':
                    # Giving the warning is a small bug, see gh-8000
                    sup.filter(np.ComplexWarning)
                tgt = rf(mat, dtype=np.dtype(c), axis=1).dtype.type
                res = nf(mat, dtype=np.dtype(c), axis=1).dtype.type
                assert_(res is tgt)
                # scalar case
                tgt = rf(mat, dtype=np.dtype(c), axis=None).dtype.type
                res = nf(mat, dtype=np.dtype(c), axis=None).dtype.type
                assert_(res is tgt)
def test_dtype_from_char(self):
    mat = np.eye(3)
    codes = 'efdgFDG'
    for nf, rf in zip(self.nanfuncs, self.stdfuncs):
        for c in codes:
            with suppress_warnings() as sup:
                if nf in {np.nanstd, np.nanvar} and c in 'FDG':
                    # Giving the warning is a small bug, see gh-8000
                    sup.filter(np.ComplexWarning)
                tgt = rf(mat, dtype=c, axis=1).dtype.type
                res = nf(mat, dtype=c, axis=1).dtype.type
                assert_(res is tgt)
                # scalar case
                tgt = rf(mat, dtype=c, axis=None).dtype.type
                res = nf(mat, dtype=c, axis=None).dtype.type
                assert_(res is tgt)
def test_ddof_too_big(self):
    nanfuncs = [np.nanvar, np.nanstd]
    stdfuncs = [np.var, np.std]
    dsize = [len(d) for d in _rdat]
    for nf, rf in zip(nanfuncs, stdfuncs):
        for ddof in range(5):
            with suppress_warnings() as sup:
                sup.record(RuntimeWarning)
                sup.filter(np.ComplexWarning)
                tgt = [ddof >= d for d in dsize]
                res = nf(_ndat, axis=1, ddof=ddof)
                assert_equal(np.isnan(res), tgt)
                if any(tgt):
                    assert_(len(sup.log) == 1)
                else:
                    assert_(len(sup.log) == 0)
def zscore_cooccur(prob_observed, prob_shuffle):
    """Compare co-occurrence observed probabilities with shuffle

    Parameters
    ----------
    prob_observed : np.array
    prob_shuffle : np.array

    Returns
    -------
    prob_zscore : np.array

    """
    num_neurons = prob_observed.shape[0]

    prob_zscore = np.zeros((num_neurons, num_neurons))
    for i in range(num_neurons):
        for j in range(num_neurons):
            with warnings.catch_warnings():
                warnings.simplefilter("ignore", category=RuntimeWarning)
                prob_zscore[i][j] = (prob_observed[i][j] - np.nanmean(np.squeeze(prob_shuffle[:, i, j]))) / np.nanstd(np.squeeze(prob_shuffle[:, i, j]))

    return prob_zscore
def init_distrib_idx(self, distrib, idx=None):
    assert isinstance(distrib, DistribGauss)
    x = distrib.get_mu()
    if idx is None:
        # initialize prior and thus average over all cases
        mu = np.nanmean(x, axis=0, keepdims=True)
    else:
        # select cases idx
        mu = x[idx, :]
        idx_nan = np.isnan(mu)
        if np.any(idx_nan):
            # we need to randomly select new values for all NaNs
            idx_good = np.ones_like(idx, dtype=bool)
            idx_good[idx, :] = False
            idx_good[np.isnan(x)] = False
            x_good = x[idx_good, :]
            num_nan = np.count_nonzero(idx_nan)
            mu[idx_nan] = np.random.choice(x_good, num_nan, replace=False)
        mu = np.copy(mu)  # make sure to not overwrite data
    std = np.empty_like(mu)
    std.fill(np.asscalar(np.nanstd(x)))
    self.init_data(mu, std)
def normalize_mean_std(X, x_mean=None, x_std=None, axis=0, ignore_nan=False):
    if ignore_nan:
        mean = np.nanmean
        std = np.nanstd
    else:
        mean = np.mean
        std = np.std
    if x_mean is None:
        x_mean = mean(X, axis=axis, keepdims=True)
    if x_std is None:
        x_std = std(X, axis=axis, keepdims=True)
        x_std[~np.isfinite(1 / x_std)] = 1
    if np.any(~np.isfinite(x_mean)):
        warnings.warn("x_mean contains NaN or Inf values!")
    if np.any(~np.isfinite(x_std)):
        warnings.warn("x_std contains NaN or Inf values!")
    X = X - x_mean
    X = X / x_std
    return X, x_mean, x_std
def average_with_nan(detector_list, error_estimate=ErrorEstimate.stderr):
    """
    Calculate average detector object, excluding malformed data (NaN) from averaging.
    :param detector_list:
    :param error_estimate:
    :return:
    """
    # TODO add compatibility check
    result = Detector()
    result.counter = len(detector_list)
    result.data_raw = np.nanmean([det.data_raw for det in detector_list], axis=0)
    if result.counter > 1 and error_estimate != ErrorEstimate.none:
        # s = stddev = sqrt(1/(n-1)sum(x-<x>)**2)
        # s : corrected sample standard deviation
        result.error_raw = np.nanstd([det.data_raw for det in detector_list], axis=0, ddof=1)
        # if user requested standard error then we calculate it as:
        # S = stderr = stddev / sqrt(n), or in other words,
        # S = s/sqrt(N) where S is the corrected standard deviation of the mean.
        if error_estimate == ErrorEstimate.stderr:
            result.error_raw /= np.sqrt(result.counter)  # np.sqrt() always returns np.float64
    else:
        result.error_raw = np.zeros_like(result.data_raw)
    return result
def getStdError(self):
    """
    get standard deviation of error over all joints, averaged over sequence
    :return: standard deviation of error
    """
    return numpy.nanmean(numpy.nanstd(numpy.sqrt(numpy.square(self.gt - self.joints).sum(axis=2)), axis=1))
def getJointStdError(self, jointID):
    """
    get standard deviation of one joint, averaged over sequence
    :param jointID: joint ID
    :return: standard deviation of joint error
    """
    return numpy.nanstd(numpy.sqrt(numpy.square(self.gt[:, jointID, :] - self.joints[:, jointID, :]).sum(axis=1)))
def test_nanstd(self):
    tgt = np.std(self.mat)
    for mat in self.integer_arrays():
        assert_equal(np.nanstd(mat), tgt)

    tgt = np.std(self.mat, ddof=1)
    for mat in self.integer_arrays():
        assert_equal(np.nanstd(mat, ddof=1), tgt)
def test_ddof(self):
    nanfuncs = [np.nanvar, np.nanstd]
    stdfuncs = [np.var, np.std]
    for nf, rf in zip(nanfuncs, stdfuncs):
        for ddof in [0, 1]:
            tgt = [rf(d, ddof=ddof) for d in _rdat]
            res = nf(_ndat, axis=1, ddof=ddof)
            assert_almost_equal(res, tgt)
def mean_std(data):
    # TODO: assert is a np.array
    mean = np.nanmean(data, axis=0)
    std = np.nanstd(data, axis=0)
    return [mean, std]
def compute(self, today, asset_ids, out, values):
    # *inputs are M x N numpy arrays, where M is the window_length and N is the number of securities
    # out is an empty array of length N. out will be the output of our custom factor each day.
    # The job of compute is to write output values into out.
    # asset_ids will be an integer array of length N containing security ids corresponding to the columns in our *inputs arrays.
    # today will be a pandas Timestamp representing the day for which compute is being called.

    # Calculates the column-wise standard deviation, ignoring NaNs
    out[:] = numpy.nanstd(values, axis=0)

# instantiate custom factor in make_pipeline()
def reject_outliers(data, m=3):
    data[abs(data - np.nanmean(data, axis=0)) > m * np.nanstd(data, axis=0)] = np.nan
    return data
def _get_stderr_lb_varyinglens(x):
    mus, stds, ns = [], [], []
    for temp_list in zip_longest(*x, fillvalue=np.nan):
        mus.append(np.nanmean(temp_list))
        n = len(temp_list) - np.sum(np.isnan(temp_list))
        stds.append(np.nanstd(temp_list, ddof=1 if n > 1 else 0))
        ns.append(n)
    return np.array(mus) - np.array(stds) / np.sqrt(ns)
def _calculate(self, X, y, categorical, metafeatures, helpers):
    values = [val for val in helpers.get_value("NumSymbols") if val > 0]
    if len(values) == 0:
        return 0
    # FIXME Error handle
    std = np.nanstd(values)
    return std if np.isfinite(std) else 0
def _calculate(self, X, y, categorical, metafeatures, helpers):
    kurts = helpers.get_value("Kurtosisses")
    std = np.nanstd(kurts) if len(kurts) > 0 else 0
    return std if np.isfinite(std) else 0
def normalize(array, window=11, normalize_variance=True):
    """
    Arguments:
        array (np.array): 2D array (time, member)
        window (int): Window length to compute climatology
        normalize_variance (bool): Adjust variance

    Returns:
        np.array: Array of normalized values (same size as input array)
    """
    N = array.shape[1]

    """
    Remove climatology so we can look at annomalies. Use separate obs and fcst climatology
    otherwise the fcst variance is higher because obs gets the advantage of using its own
    climatology.
    """
    clim = climatology(array, window, use_future_years=True)
    values = copy.deepcopy(array)
    for i in range(0, N):
        values[:, i] = (values[:, i] - clim)

    if normalize_variance and array.shape[1] > 2:
        """
        This removes any seasonally varying variance, which can cause the 1-year variance to be
        larger than the 1/2 year variance, because the 1/2 year variance samples the summer months
        more often than the winter months, because of the windowing approach. Also, this
        normalization does not guarantee that the std of the whole timeseries is 1, therefore in
        the plot, don't expect the first point to be 1. The timeseries is scaled up again to match
        the average anomaly variance in the timeseries.
        """
        std = np.nanstd(array, axis=1)
        if np.min(std) == 0:
            warning("Standard deviation of 0 at one or more days. Not normalizing variance")
        else:
            meanstd = np.nanmean(std)
            for i in range(0, N):
                values[:, i] = values[:, i] / std * meanstd

    return values
def test_basic_stats(x):
    s = SummaryStats()
    s.update(x)

    assert s.count() == np.count_nonzero(~np.isnan(x))
    np.testing.assert_allclose(s.sum(), np.nansum(x), rtol=RTOL, atol=ATOL)
    np.testing.assert_equal(s.min(), np.nanmin(x) if len(x) else np.nan)
    np.testing.assert_equal(s.max(), np.nanmax(x) if len(x) else np.nan)
    np.testing.assert_allclose(s.mean(), np.nanmean(x) if len(x) else np.nan,
                               rtol=RTOL, atol=ATOL)
    np.testing.assert_allclose(s.var(), np.nanvar(x) if len(x) else np.nan,
                               rtol=RTOL, atol=ATOL)
    np.testing.assert_allclose(s.std(), np.nanstd(x) if len(x) else np.nan,
                               rtol=RTOL, atol=ATOL)
def plot_heatmaps(data, labels, alpha, mis, column_label, cont, topk=20, prefix='', focus=''):
    cmap = sns.cubehelix_palette(as_cmap=True, light=.9)
    m, nv = mis.shape
    for j in range(m):
        inds = np.where(np.logical_and(alpha[j] > 0, mis[j] > 0.))[0]
        inds = inds[np.argsort(- alpha[j, inds] * mis[j, inds])][:topk]
        if focus in column_label:
            ifocus = column_label.index(focus)
            if not ifocus in inds:
                inds = np.insert(inds, 0, ifocus)
        if len(inds) >= 2:
            plt.clf()
            order = np.argsort(cont[:,j])
            subdata = data[:, inds][order].T
            subdata -= np.nanmean(subdata, axis=1, keepdims=True)
            subdata /= np.nanstd(subdata, axis=1, keepdims=True)
            columns = [column_label[i] for i in inds]
            sns.heatmap(subdata, vmin=-3, vmax=3, cmap=cmap, yticklabels=columns, xticklabels=False, mask=np.isnan(subdata))
            filename = '{}/heatmaps/group_num={}.png'.format(prefix, j)
            if not os.path.exists(os.path.dirname(filename)):
                os.makedirs(os.path.dirname(filename))
            plt.title("Latent factor {}".format(j))
            plt.savefig(filename, bbox_inches='tight')
            plt.close('all')
            #plot_rels(data[:, inds], list(map(lambda q: column_label[q], inds)), colors=cont[:, j],
            #          outfile=prefix + '/relationships/group_num=' + str(j), latent=labels[:, j], alpha=0.1)
def first_order(feature, aggregates, verbose=False):
    if not type(aggregates) is list:
        aggregates = [aggregates]
    for aggregate in aggregates:
        if verbose:
            print('  first order computation: ' + aggregate)
        if aggregate == 'log':
            feature = np.log(feature)
        elif aggregate == 'sqrt':
            feature = np.sqrt(feature)
        elif aggregate == 'minlog':
            feature = np.log(1 - feature)
        elif aggregate == 'minsqrt':
            feature = np.sqrt(1 - feature)
        elif aggregate == 'mean':
            # feature = np.mean(feature, axis=0)
            feature = np.nanmean(feature, axis=0)
        elif aggregate == 'var':
            feature = np.var(feature, axis=0)
        elif aggregate == 'std':
            # feature = np.std(feature, axis=0)
            feature = np.nanstd(feature, axis=0)
        elif aggregate == 'stdmean':
            feature = np.hstack([np.mean(feature, axis=0), np.std(feature, axis=0)])
        elif aggregate == 'cov':
            feature = np.flatten(np.cov(feature, axis=0))
        elif aggregate == 'totvar':
            feature = np.array([np.mean(np.var(feature, axis=0))])
        elif aggregate == 'totstd':
            feature = np.array([np.mean(np.std(feature, axis=0))])
        elif aggregate == 'entropy':
            feature = feature.flatten()
            feature = np.array([stats.entropy(feature)])
        elif aggregate == 'normentropy':
            feature = feature.flatten()
            feature = np.array([stats.entropy(feature) / np.log(feature.size)])
        elif aggregate == 'information':
            feature = - np.log(feature)
    return feature
def __get_pd_mean(data, c=1.):
    """Get the mean and the standard deviation of data

    Args:
        data (numpy.ndarray): the data

    Returns:
        float, float: the mean and the standard deviation
    """
    p = np.nanmean(data)
    d = np.nanstd(data) / c
    return p, d
def calculate_residual_distributions(self):
    """
    This function ...
    :return:
    """

    # Inform the user
    log.info("Calculating distributions of residual pixel values ...")

    # Loop over the different colours
    for colour_name in self.observed_colours:

        # Debugging
        log.debug("Calculating the distribution for the pixels of the " + colour_name + " residual map ...")

        # Get an 1D array of the valid pixel values
        pixel_values = None

        # Create the distribution
        distribution = Distribution.from_values(pixel_values)

        # Debugging
        #log.debug("Median " + colour_name + " residual: " + str(np.nanmedian(np.abs(residual))))
        #log.debug("Standard deviation of " + colour_name + " residual: " + str(np.nanstd(residual)))

        # Add the distribution to the dictionary
        self.residual_distributions[colour_name] = distribution

# -----------------------------------------------------------------
def scaleParamsFromReference(img, reference):
    # saving startup time:
    from scipy.optimize import curve_fit

    def ff(arr):
        arr = imread(arr, 'gray')
        if arr.size > 300000:
            arr = arr[::10, ::10]
        m = np.nanmean(arr)
        s = np.nanstd(arr)
        r = m - 3 * s, m + 3 * s
        b = (r[1] - r[0]) / 5
        return arr, r, b

    img, imgr, imgb = ff(img)
    reference, refr, refb = ff(reference)

    nbins = np.clip(15, max(imgb, refb), 50)

    refh = np.histogram(reference, bins=nbins, range=refr)[0].astype(np.float32)
    imgh = np.histogram(img, bins=nbins, range=imgr)[0].astype(np.float32)

    import pylab as plt
    plt.figure(1)
    plt.plot(refh)
    plt.figure(2)
    plt.plot(imgh)
    plt.show()

    def fn(x, offs, div):
        return (x - offs) / div

    params, fitCovariances = curve_fit(fn, refh, imgh, p0=(0, 1))
    perr = np.sqrt(np.diag(fitCovariances))
    print('error scaling to reference image: %s' % perr[0])
    # if perr[0] < 0.1:
    return params[0], params[1]
def _update(self, limits=None):
    curves = self.display.widget.curves
    x = self.display.stack.values
    y = []
    s = self.pStd.value()
    yStd = []
    for curve in curves:
        if limits:
            b1 = np.argmax(curve.xData >= limits[0])
            b2 = np.argmax(curve.xData >= limits[1])
            if b2 == 0:
                b2 = -1
        else:
            b1, b2 = None, None
        y.append(np.nanmean(curve.yData[b1:b2]))
        if s:
            yStd.append(np.nanstd(curve.yData[b1:b2]))

    if self.out is None or self.out.isClosed():
        self.out = self.display.workspace.addDisplay(
            origin=self.display,
            axes=self.display.axes.copy(('stack', 1)),
            title='ROI',
            names=['ROI'],
            data=[(x, y)])
    else:
        self.out.widget.curves[0].setData(x, y)

    if s:
        if self.outStd is None or self.outStd.isClosed():
            self.outStd = self.display.workspace.addDisplay(
                origin=self.display,
                axes=self.display.axes.copy(('stack', 1)),
                title='ROI - std',
                names=['ROI - std'],
                data=[(x, yStd)])
        else:
            self.outStd.widget.curves[0].setData(x, yStd)
def get_mat_movingstd(tsmat, periods):
    mstd = np.empty(shape = tsmat.shape)
    mstd.fill(np.NAN)
    for i in xrange(tsmat.shape[0]):
        j = i - periods + 1
        if j < 0:
            j = 0
        mstd[i,:] = np.nanstd(tsmat[j:i+1,:], 0)
    return mstd
def calc_cumulative_dist(momlim=0.3,region_list=['L1688','NGC1333','B18','OrionA'],
                         file_extension='DR1_rebase3'):
    projDir = '/media/DATAPART/projects/GAS/testing/'
    min_bin = 20.5  # log 10 N(H2) - set by B18
    max_bin = 26.   # log 10 N(H2) - set by Orion A
    bin_size = 0.05
    nbins = np.int((max_bin - min_bin)/bin_size)
    data_bin_vals = [min_bin + x * bin_size for x in range(nbins+1)]
    # Loop over regions
    for region_i in range(len(region_list)):
        region = region_list[region_i]
        nh3ImFits = projDir + '{0}/{0}_NH3_11_{1}_mom0_QA_trim.fits'.format(region,file_extension)
        herColFits = 'nh2_regridded/{0}_NH2_regrid.fits'.format(region)
        nh3_fits = fits.open(nh3ImFits)
        nh2_regrid_fits = fits.open(herColFits)
        h2_data = np.log10(nh2_regrid_fits[0].data)
        nh3_data = nh3_fits[0].data
        h2_mean_array = np.zeros(nbins)
        h2_std_array = np.zeros(nbins)
        nh3_frac_array = np.zeros(nbins)
        for bin_i in range(nbins-1):
            bin_h2_indices = np.where(np.logical_and(h2_data >= data_bin_vals[bin_i],
                                                     h2_data < data_bin_vals[bin_i+1]))
            bin_h2_data = h2_data[bin_h2_indices]
            bin_nh3_data = nh3_data[bin_h2_indices]
            if np.count_nonzero(bin_nh3_data) != 0:
                frac_above_mom = np.count_nonzero(bin_nh3_data > momlim)/(1.0*np.count_nonzero(bin_nh3_data))
            else:
                frac_above_mom = 0
            bin_mean_h2 = np.nanmean(bin_h2_data)
            bin_std_h2 = np.nanstd(bin_h2_data)
            h2_mean_array[bin_i] = bin_mean_h2
            nh3_frac_array[bin_i] = frac_above_mom
            h2_std_array[bin_i] = bin_std_h2
        # Write out bins
        np.savetxt('cumulative/{0}_cumulative.txt'.format(region),
                   np.transpose([h2_mean_array,nh3_frac_array,h2_std_array]))
def corr(data):
    ns = data.shape[0];
    nt = data.shape[1];
    mean = np.nanmean(data, axis = 0);
    std = np.nanstd(data - mean, axis = 0);
    c = np.zeros((nt, nt));
    for t1 in range(nt):
        #for t2 in range(nt):
        #    c[t1,t2] = np.nanmean( (data[:,t1] - mean[t1]) * (data[:,t2] - mean[t2]), axis = 0);  # / (std[t1] * std[t2]);
        c[t1,:] = np.nanmean( (data[:,:].T * data[:, t1]).T, axis = 0);
    return c;
def downstddv(x):
    if x.size < 4:
        return np.nan
    median = np.nanpercentile(x, 50)
    return np.nanstd(x[x < median])
def get_stats(arr):
    return np.array([
        np.nanmean(arr),
        np.nanvar(arr),
        np.nanmedian(arr),
        np.nanstd(arr),
        arr.shape[0]
    ])