The following code examples, extracted from open-source Python projects, demonstrate how to use scipy.interpolate.interp1d().
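Before the project snippets, here is a minimal, self-contained sketch of the typical interp1d workflow; the sample data and variable names are illustrative only and are not taken from any of the projects below:

    import numpy as np
    from scipy.interpolate import interp1d

    # Known samples of some function y(x); x must be 1-D and, unless
    # assume_sorted=True is passed, need not be pre-sorted.
    x = np.linspace(0, 10, 11)
    y = np.sin(x)

    # interp1d returns a callable; with bounds_error=False, queries outside
    # [x[0], x[-1]] return fill_value instead of raising.
    f = interp1d(x, y, kind='cubic', bounds_error=False, fill_value=np.nan)

    # Evaluate the interpolant on a finer grid.
    x_new = np.linspace(0, 10, 101)
    y_new = f(x_new)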
# These numpy aliases are inferred from context (the original project presumably
# imports them this way); they are equivalent to np.array and np.floor.
from numpy import array as nparray, floor as npfloor


def _get_legendre_deg_ctd(npts):
    from scipy.interpolate import interp1d

    degs = nparray([4, 5, 6, 10, 15])
    pts = nparray([1e2, 3e2, 5e2, 1e3, 3e3])

    fn = interp1d(pts, degs, kind='linear',
                  bounds_error=False,
                  fill_value=(min(degs), max(degs)))

    legendredeg = int(npfloor(fn(npts)))

    return legendredeg


#######################################
# UTILITY FUNCTION FOR ANY DETRENDING #
#######################################
def plot_info_retrieval(precisions, save_file):
    # markers = ["|", "D", "8", "v", "^", ">", "h", "H", "s", "*", "p", "d", "<"]
    markers = ["D", "p", 's', "*", "d", "8", "^", "H", "v", ">", "<", "h", "|"]
    ticks = zip(*zip(*precisions)[1][0])[0]  # relies on Python 2 zip() returning a list
    plt.xticks(range(len(ticks)), ticks)
    new_x = interpolate.interp1d(ticks, range(len(ticks)))(ticks)
    i = 0
    for model_name, val in precisions:
        fr, pr = zip(*val)
        plt.plot(new_x, pr, linestyle='-', alpha=0.7, marker=markers[i],
                 markersize=8, label=model_name)
        i += 1
        # plt.legend(model_name)
    plt.xlabel('Fraction of Retrieved Documents')
    plt.ylabel('Precision')
    legend = plt.legend(loc='upper right', shadow=True)
    plt.savefig(save_file)
    plt.show()
def plot_info_retrieval_by_length(precisions, save_file):
    markers = ["o", "v", "8", "s", "p", "*", "h", "H", "^", "x", "D"]
    ticks = zip(*zip(*precisions)[1][0])[0]  # relies on Python 2 zip() returning a list
    plt.xticks(range(len(ticks)), ticks)
    new_x = interpolate.interp1d(ticks, range(len(ticks)))(ticks)
    i = 0
    for model_name, val in precisions:
        fr, pr = zip(*val)
        plt.plot(new_x, pr, linestyle='-', alpha=0.6, marker=markers[i],
                 markersize=6, label=model_name)
        i += 1
        # plt.legend(model_name)
    plt.xlabel('Document Sorted by Length')
    plt.ylabel('Precision (%)')
    legend = plt.legend(loc='upper right', shadow=True)
    plt.savefig(save_file)
    plt.show()
def riskfree():
    r = requests.get(TREASURY_URL)
    soup = BeautifulSoup(r.text, 'html.parser')

    table = soup.find("table", attrs={'class': 't-chart'})
    rows = table.find_all('tr')
    lastrow = len(rows) - 1
    cells = rows[lastrow].find_all("td")

    date = cells[0].get_text()
    m1 = float(cells[1].get_text())
    m3 = float(cells[2].get_text())
    m6 = float(cells[3].get_text())
    y1 = float(cells[4].get_text())
    y2 = float(cells[5].get_text())
    y3 = float(cells[6].get_text())
    y5 = float(cells[7].get_text())
    y7 = float(cells[8].get_text())
    y10 = float(cells[9].get_text())
    y20 = float(cells[10].get_text())
    y30 = float(cells[11].get_text())

    years = (0, 1/12, 3/12, 6/12, 12/12, 24/12, 36/12,
             60/12, 84/12, 120/12, 240/12, 360/12)
    rates = (OVERNIGHT_RATE, m1/100, m3/100, m6/100, y1/100, y2/100,
             y3/100, y5/100, y7/100, y10/100, y20/100, y30/100)

    return interp1d(years, rates)
def reSample(df, dt=None, xAxis=None, n=None, kind='linear'):
    """ re-sample the signal """

    if isinstance(df, pd.Series):
        df = pd.DataFrame(df)

    f = interp1d(df.index, np.transpose(df.values), kind=kind, axis=-1,
                 copy=True, bounds_error=True, assume_sorted=True)

    if dt:
        end = int((df.index[-1] - df.index[0]) / dt) * dt + df.index[0]
        xAxis = np.linspace(df.index[0], end, 1 + int((end - df.index[0]) / dt))
    elif n:
        xAxis = np.linspace(df.index[0], df.index[-1], n)
    elif xAxis is None:
        raise Exception("reSample : either dt or xAxis should be provided")

    # For rounding issue, ensure that xAxis is within ts.xAxis
    # xAxis[np.where(xAxis > np.max(df.index[:]))] = df.index[np.where(xAxis > np.max(df.index[:]))]

    return pd.DataFrame(data=np.transpose(f(xAxis)), index=xAxis,
                        columns=map(lambda x: "reSample(" + x + ")", df.columns))
def wavelength_to_XYZ(wavelength, observer='1931_2deg'):
    '''Uses tristimulus color matching functions to map a wavelength to XYZ coordinates.

    Args:
        wavelength (`float`): wavelength in nm.

        observer (`str`): CIE observer name, must be 1931_2deg.

    Returns:
        `numpy.ndarray`: array with last dimension corresponding to X, Y, Z.

    '''
    wavelength = np.asarray(wavelength, dtype=config.precision)

    cmf = get_cmf(observer)
    wvl, X, Y, Z = cmf['wvl'], cmf['X'], cmf['Y'], cmf['Z']

    ia = {'bounds_error': False, 'fill_value': 0, 'assume_sorted': True}
    f_X, f_Y, f_Z = interp1d(wvl, X, **ia), interp1d(wvl, Y, **ia), interp1d(wvl, Z, **ia)
    x, y, z = f_X(wavelength), f_Y(wavelength), f_Z(wavelength)

    shape = wavelength.shape
    return np.stack((x, y, z), axis=len(shape))
def make_quantile_df(data, draw_quantiles):
    """
    Return a dataframe with info needed to draw quantile segments
    """
    dens = data['density'].cumsum() / data['density'].sum()
    ecdf = interp1d(dens, data['y'], assume_sorted=True)
    ys = ecdf(draw_quantiles)

    # Get the violin bounds for the requested quantiles
    violin_xminvs = interp1d(data['y'], data['xminv'])(ys)
    violin_xmaxvs = interp1d(data['y'], data['xmaxv'])(ys)

    data = pd.DataFrame({
        'x': interleave(violin_xminvs, violin_xmaxvs),
        'y': np.repeat(ys, 2),
        'group': np.repeat(np.arange(1, len(ys) + 1), 2)})

    return data
def compute_precision_score_mapping(thresh, prec, score):
    ind = np.argsort(thresh)  # thresh, ind = torch.sort(thresh)
    thresh = thresh[ind]
    indexes = np.unique(thresh, return_index=True)[1]
    indexes = np.sort(indexes)
    thresh = thresh[indexes]

    thresh = np.vstack((min(-1000, min(thresh) - 1), thresh[:, np.newaxis],
                        max(1000, max(thresh) + 1)))

    prec = prec[ind]
    for i in range(1, len(prec)):  # the original uses Python 2's xrange
        prec[i] = max(prec[i], prec[i - 1])
    prec = prec[indexes]

    prec = np.vstack((prec[0], prec[:, np.newaxis], prec[-1]))

    f = interp1d(thresh[:, 0], prec[:, 0])
    val = f(score)
    return val
def __init__(self, streamlines, streamlineData):
    self.streamlineCoordinates = streamlines
    self.streamlineData = streamlineData

    # Get the streamline parameter in a single array.
    self.parameterisedStreamline = self.streamlineCoordinates[:, 0, 3]

    # Calculate the velocity along the streamline
    streamlineVelocity = np.linalg.norm(self.streamlineData[:, 2:4], axis=1)

    # Fit a cubic spline to the streamline velocity
    # Need this to calculate velocity derivatives
    self.parameterisedVelocity = interpolate.CubicSpline(
        self.parameterisedStreamline, streamlineVelocity, extrapolate=1)

    # Calculate the first derivative
    self.parameterisedvelocityPrime = self.parameterisedVelocity.derivative(nu=1)

    # Calculate the second derivative
    self.parameterisedvelocityDoublePrime = self.parameterisedVelocity.derivative(nu=2)

    # Parameterise temperature, pressure and density along the streamline
    self.parameterisedM = interpolate.interp1d(self.parameterisedStreamline,
                                               self.streamlineData[:, 0],
                                               kind='linear', fill_value='extrapolate')
    self.parameterisedT = interpolate.interp1d(self.parameterisedStreamline,
                                               self.streamlineData[:, 1],
                                               kind='linear', fill_value='extrapolate')
    self.parameterisedP = interpolate.interp1d(self.parameterisedStreamline,
                                               self.streamlineData[:, 5],
                                               kind='linear', fill_value='extrapolate')
    self.parameterisedRho = interpolate.interp1d(self.parameterisedStreamline,
                                                 self.streamlineData[:, 6],
                                                 kind='linear', fill_value='extrapolate')
def test_megkde_2d_basic():
    # Draw from normal, fit KDE, see if sampling from kde's pdf recovers norm
    np.random.seed(1)
    data = np.random.multivariate_normal([0, 1], [[1.0, 0.], [0., 0.75 ** 2]], size=10000)
    xs, ys = np.linspace(-4, 4, 50), np.linspace(-4, 4, 50)
    xx, yy = np.meshgrid(xs, ys, indexing='ij')
    samps = np.vstack((xx.flatten(), yy.flatten())).T
    zs = MegKDE(data).evaluate(samps).reshape(xx.shape)
    zs_x = zs.sum(axis=1)
    zs_y = zs.sum(axis=0)
    cs_x = zs_x.cumsum()
    cs_x /= cs_x[-1]
    cs_x[0] = 0
    cs_y = zs_y.cumsum()
    cs_y /= cs_y[-1]
    cs_y[0] = 0
    samps_x = interp1d(cs_x, xs)(np.random.uniform(size=10000))
    samps_y = interp1d(cs_y, ys)(np.random.uniform(size=10000))
    mu_x, std_x = norm.fit(samps_x)
    mu_y, std_y = norm.fit(samps_y)
    assert np.isclose(mu_x, 0, atol=0.1)
    assert np.isclose(std_x, 1.0, atol=0.1)
    assert np.isclose(mu_y, 1, atol=0.1)
    assert np.isclose(std_y, 0.75, atol=0.1)
def test_summary_max_shortest_2(self):
    c = ChainConsumer()
    c.add_chain(self.data_skew)
    summary_area = 0.6827
    c.configure(statistics="max_shortest", bins=1.0, summary_area=summary_area)
    summary = c.analysis.get_summary()['0']

    xs = np.linspace(-1, 5, 1000)
    pdf = skewnorm.pdf(xs, 5, 1, 1.5)
    cdf = skewnorm.cdf(xs, 5, 1, 1.5)
    x2 = interp1d(cdf, xs, bounds_error=False, fill_value=np.inf)(cdf + summary_area)
    dist = x2 - xs
    ind = np.argmin(dist)
    x0 = xs[ind]
    x2 = x2[ind]
    xmax = xs[pdf.argmax()]

    assert np.isclose(xmax, summary[1], atol=0.05)
    assert np.isclose(x0, summary[0], atol=0.05)
    assert np.isclose(x2, summary[2], atol=0.05)
def test_summary_max_central_2(self):
    c = ChainConsumer()
    c.add_chain(self.data_skew)
    summary_area = 0.6827
    c.configure(statistics="max_central", bins=1.0, summary_area=summary_area)
    summary = c.analysis.get_summary()['0']

    xs = np.linspace(-1, 5, 1000)
    pdf = skewnorm.pdf(xs, 5, 1, 1.5)
    cdf = skewnorm.cdf(xs, 5, 1, 1.5)
    xval = interp1d(cdf, xs)([0.5 - 0.5 * summary_area, 0.5 + 0.5 * summary_area])
    xmax = xs[pdf.argmax()]

    assert np.isclose(xmax, summary[1], atol=0.05)
    assert np.isclose(xval[0], summary[0], atol=0.05)
    assert np.isclose(xval[1], summary[2], atol=0.05)
def test_summary_max_central_3(self):
    c = ChainConsumer()
    c.add_chain(self.data_skew)
    summary_area = 0.95
    c.configure(statistics="max_central", bins=1.0, summary_area=summary_area)
    summary = c.analysis.get_summary()['0']

    xs = np.linspace(-1, 5, 1000)
    pdf = skewnorm.pdf(xs, 5, 1, 1.5)
    cdf = skewnorm.cdf(xs, 5, 1, 1.5)
    xval = interp1d(cdf, xs)([0.5 - 0.5 * summary_area, 0.5 + 0.5 * summary_area])
    xmax = xs[pdf.argmax()]

    assert np.isclose(xmax, summary[1], atol=0.05)
    assert np.isclose(xval[0], summary[0], atol=0.05)
    assert np.isclose(xval[1], summary[2], atol=0.05)
def get_parameter_summary_max_shortest(self, chain, parameter):
    xs, ys, cs = self._get_smoothed_histogram(chain, parameter)
    desired_area = chain.config["summary_area"]
    c_to_x = interp1d(cs, xs, bounds_error=False, fill_value=(-np.inf, np.inf))

    # Get max likelihood x
    max_index = ys.argmax()
    x = xs[max_index]

    # Pair each lower bound with an upper to get the right area
    x2 = c_to_x(cs + desired_area)
    dists = x2 - xs
    mask = (xs > x) | (x2 < x)  # Ensure max point is inside the area
    dists[mask] = np.inf
    ind = dists.argmin()
    return [xs[ind], x, x2[ind]]
def preprocess_Dr(data, r, normalize=True):
    x = data["x"]  # [xx[0] for xx in data["x"]]
    data, x = fields._sorted(data, x)
    Dt = [d[2][2] for d in data["D"]]
    Dn = [d[0][0] for d in data["D"]]

    eps = 1e-2
    x = [-1., -eps, r] + x + [1000.]
    if normalize:
        Dn = [d / Dn[-1] for d in Dn]
        Dt = [d / Dt[-1] for d in Dt]
    Dn = [0., 0., eps] + Dn + [1.]
    Dt = [0., 0., eps] + Dt + [1.]

    fn = interp1d(x, Dn)
    ft = interp1d(x, Dt)
    return fn, ft
def _reinterpolate(tr, wave, new_wave):
    '''
    Reinterpolate a transmission curve on a fixed, regular wavelength grid

    Parameters
    ----------
    tr : array_like
        Transmission, normalized to 1

    wave : array_like
        Wavelengths vector, in nanometers

    new_wave : array_like
        New wavelengths vector, in nanometers

    Returns
    -------
    tr_regular : array_like
        The reinterpolated transmission
    '''
    interp_fun = interp1d(wave, tr, bounds_error=False, fill_value=np.nan)
    return interp_fun(new_wave)
def interpolateSolution(tvecData, tvecScaled, yvec):
    """Interpolates scaled simulation vector onto data time vector.

    Args:
        tvecData (numpy.ndarray): Data time vector.
        tvecScaled (numpy.ndarray): Scaled simulation time vector.
        yvec (numpy.ndarray): Simulation values.

    Returns:
        numpy.ndarray: Scaled simulation vector.
    """
    fscal = interpolate.interp1d(tvecScaled, yvec)
    yvecScaled = fscal(tvecData)

    return yvecScaled
def _get_interp1d(self, *, kind='linear', copy=True, bounds_error=False,
                  fill_value=np.nan, assume_sorted=None):
    """returns a scipy interp1d object"""

    if assume_sorted is None:
        assume_sorted = utils.is_sorted(self.time)

    if self.n_signals > 1:
        axis = 1
    else:
        axis = -1

    f = interpolate.interp1d(x=self.time,
                             y=self._ydata_rowsig,
                             kind=kind,
                             axis=axis,
                             copy=copy,
                             bounds_error=bounds_error,
                             fill_value=fill_value,
                             assume_sorted=assume_sorted)
    return f
def cheaptrick_get_power_spectrum(waveform, fs, fft_size, f0):
    power_spectrum = np.abs(np.fft.fft(waveform, fft_size)) ** 2
    frequency_axis = np.arange(fft_size) / float(fft_size) * float(fs)
    ind = frequency_axis < (f0 + fs / fft_size)
    low_frequency_axis = frequency_axis[ind]
    low_frequency_replica = interp1d(f0 - low_frequency_axis,
                                     power_spectrum[ind],
                                     kind="linear",
                                     fill_value="extrapolate")(low_frequency_axis)
    p1 = low_frequency_replica[(frequency_axis < f0)[:len(low_frequency_replica)]]
    p2 = power_spectrum[(frequency_axis < f0)[:len(power_spectrum)]]
    power_spectrum[frequency_axis < f0] = p1 + p2

    lb1 = int(fft_size / 2) + 1
    lb2 = 1
    ub2 = int(fft_size / 2)
    power_spectrum[lb1:] = power_spectrum[lb2:ub2][::-1]
    return power_spectrum
def world_synthesis_time_base_generation(temporal_positions, f0, fs, vuv,
                                         time_axis, default_f0):
    f0_interpolated_raw = interp1d(temporal_positions, f0, kind="linear",
                                   fill_value="extrapolate")(time_axis)
    vuv_interpolated = interp1d(temporal_positions, vuv, kind="linear",
                                fill_value="extrapolate")(time_axis)
    vuv_interpolated = vuv_interpolated > 0.5

    f0_interpolated = f0_interpolated_raw * vuv_interpolated.astype("float32")
    f0_interpolated[f0_interpolated == 0] = f0_interpolated[f0_interpolated == 0] + default_f0

    total_phase = np.cumsum(2 * np.pi * f0_interpolated / float(fs))

    core = np.mod(total_phase, 2 * np.pi)
    core = np.abs(core[1:] - core[:-1])
    # account for diff, avoid deprecation warning with [:-1]
    pulse_locations = time_axis[:-1][core > (np.pi / 2.)]
    pulse_locations_index = np.round(pulse_locations * fs).astype("int32")
    return pulse_locations, pulse_locations_index, vuv_interpolated
@classmethod  # decorator inferred from the `cls` parameter; not in the extracted snippet
def from_array(cls, tenors, forwards, t0=None, interp_mode=InterpMode.PiecewiseConst):
    """Create a Curve object: tenors (floats), fwds (floats), interp_mode"""
    if t0 is None:
        t0 = tenors[0]
    assert len(tenors) == len(forwards)

    if interp_mode == cls.InterpMode.PiecewiseConst:
        interpfwd = interp1d(tenors, forwards, kind='zero',
                             bounds_error=False, fill_value='extrapolate')
        return cls(t0, lambda t: interpfwd(t))
    elif interp_mode == cls.InterpMode.Linear:
        interpfwd = interp1d(tenors, forwards, kind='linear',
                             bounds_error=False, fill_value='extrapolate')
        return cls(t0, lambda t: interpfwd(t))
    elif interp_mode == cls.InterpMode.LinearLog:
        linzero = interp1d(tenors, np.log(forwards), kind='linear',
                           bounds_error=False, fill_value='extrapolate')
        return cls(t0, lambda t: np.exp(linzero(t)))
    else:
        raise BaseException('invalid curve interpolation mode ...')
def ioneq(self):
    """
    Ionization equilibrium data interpolated to the given temperature

    Note
    ----
    Will return NaN where interpolation is out of range of the data. For
    computing ionization equilibrium outside of this temperature range, it
    is better to use `fiasco.Element.equilibrium_ionization`
    """
    f = interp1d(self._ioneq[self._dset_names['ioneq_filename']]['temperature'],
                 self._ioneq[self._dset_names['ioneq_filename']]['ionization_fraction'],
                 kind='linear',
                 bounds_error=False,
                 fill_value=np.nan)
    ioneq = f(self.temperature)
    isfinite = np.isfinite(ioneq)
    ioneq[isfinite] = np.where(ioneq[isfinite] < 0., 0., ioneq[isfinite])
    return u.Quantity(ioneq)
def _dere_cross_section(self, energy: u.erg):
    """
    Calculate direct ionization cross-sections according to [1]_.

    References
    ----------
    .. [1] Dere, K. P., 2007, A&A, `466, 771 <http://adsabs.harvard.edu/abs/2007A%26A...466..771D>`_
    """
    # Cross-sections from diparams file
    cross_section_total = np.zeros(energy.shape)
    for ip, bt_c, bt_e, bt_cross_section in zip(self._diparams['ip'],
                                                self._diparams['bt_c'],
                                                self._diparams['bt_e'],
                                                self._diparams['bt_cross_section']):
        U = energy / (ip.to(u.erg))
        scaled_energy = 1. - np.log(bt_c) / np.log(U - 1. + bt_c)
        f_interp = interp1d(bt_e.value, bt_cross_section.value,
                            kind='cubic', fill_value='extrapolate')
        scaled_cross_section = f_interp(scaled_energy.value) * bt_cross_section.unit
        # Only nonzero at energies above the ionization potential
        scaled_cross_section *= (U.value > 1.)
        cross_section = scaled_cross_section * (np.log(U) + 1.) / U / (ip**2)
        if not hasattr(cross_section_total, 'unit'):
            cross_section_total = cross_section_total * cross_section.unit
        cross_section_total += cross_section

    return cross_section_total
def __init__(self, hmat, m, n, tper, k, out0):
    hmat = np.asarray(hmat)
    if hmat.shape != (2 * m + 1, n + 1):
        raise ValueError('hmat shape = %s not compatible with M=%d, N=%d' %
                         (hmat.shape, m, n))

    # use symmetry to fill in negative input frequency data.
    fullh = np.empty((2 * m + 1, 2 * n + 1), dtype=complex)
    fullh[:, n:] = hmat / (k * tper)
    fullh[:, :n] = np.fliplr(np.flipud(fullh[:, n + 1:])).conj()

    self.hmat = fullh
    wc = 2.0 * np.pi / tper
    self.m_col = np.arange(-m, m + 1) * (1.0j * wc)
    self.n_col = np.arange(-n, n + 1) * (1.0j * wc / k)
    self.m_col = self.m_col.reshape((-1, 1))
    self.n_col = self.n_col.reshape((-1, 1))
    self.tper = tper
    self.k = k
    self.outfun = interp.interp1d(out0[:, 0], out0[:, 1], bounds_error=True,
                                  assume_sorted=True)
def convolve(self, wavelengths, densities):
    # define short names for the involved wavelength grids
    wa = wavelengths
    wb = self._Wavelengths

    # create a combined wavelength grid, restricted to the overlapping interval
    w1 = wa[(wa >= wb[0]) & (wa <= wb[-1])]
    w2 = wb[(wb >= wa[0]) & (wb <= wa[-1])]
    w = np.unique(np.hstack((w1, w2)))
    if len(w) < 2:
        return 0

    # log-log interpolate SED and transmission on the combined wavelength grid
    # (use scipy interpolation function for SED because np.interp does not support broadcasting)
    F = np.exp(interp1d(np.log(wa), _log(densities), copy=False, bounds_error=False,
                        fill_value=0.)(np.log(w)))
    T = np.exp(np.interp(np.log(w), np.log(wb), _log(self._Transmission), left=0., right=0.))

    # perform the integration
    if self._PhotonCounter:
        return np.trapz(x=w, y=w * F * T) / self._IntegratedTransmission
    else:
        return np.trapz(x=w, y=F * T) / self._IntegratedTransmission

## This function calculates and returns the integrated value for a given spectral
#  energy distribution over the filter's wavelength range,
def integrate(self, wavelengths, densities):
    # define short names for the involved wavelength grids
    wa = wavelengths
    wb = self._Wavelengths

    # create a combined wavelength grid, restricted to the overlapping interval
    w1 = wa[(wa >= wb[0]) & (wa <= wb[-1])]
    w2 = wb[(wb >= wa[0]) & (wb <= wa[-1])]
    w = np.unique(np.hstack((w1, w2)))
    if len(w) < 2:
        return 0

    # log-log interpolate SED and transmission on the combined wavelength grid
    # (use scipy interpolation function for SED because np.interp does not support broadcasting)
    F = np.exp(interp1d(np.log(wa), _log(densities), copy=False, bounds_error=False,
                        fill_value=0.)(np.log(w)))
    T = np.exp(np.interp(np.log(w), np.log(wb), _log(self._Transmission), left=0., right=0.))

    # perform the integration
    if self._PhotonCounter:
        return np.trapz(x=w, y=w * F * T)
    else:
        return np.trapz(x=w, y=F * T)

## This private helper function returns the natural logarithm for positive values,
#  and a large negative number (but not infinity) for zero or negative values.
def findPercentile(parameter, probability, percentile):
    npoints = 10000
    interpfunc = interpolate.interp1d(parameter, probability, kind='linear')
    parRange = np.linspace(min(parameter), max(parameter), npoints)
    interProb = interpfunc(parRange)

    cumInteg = np.zeros(npoints - 1)
    for i in range(1, npoints - 1):
        cumInteg[i] = cumInteg[i - 1] + (0.5 * (interProb[i + 1] + interProb[i]) *
                                         (parRange[i + 1] - parRange[i]))
    cumInteg = cumInteg / cumInteg[-1]

    idx = (np.abs(cumInteg - percentile / 100.)).argmin()
    return parRange[idx]
def convolveFilter(flux, wl, filter):
    input = np.loadtxt(filter)
    response_wl = input[:, 0] * 1.e-4
    response_trans = input[:, 1]

    minwl = response_wl[0]
    maxwl = response_wl[len(response_wl) - 1]

    intwl = np.copy(wl)
    intwl[intwl > maxwl] = -1
    intwl[intwl < minwl] = -1
    wlrange = intwl > 0
    intwl = intwl[wlrange]

    transmission = np.zeros(len(flux))
    interpfunc = interpolate.interp1d(response_wl, response_trans, kind='linear')
    transmission[wlrange] = interpfunc(intwl)

    tot_trans = integrate.simps(transmission, wl)
    tot_flux = integrate.simps(transmission * flux, wl)

    return tot_flux / tot_trans
def __init__(self):
    """
    This function ...
    """
    # From Battisti et al. 2016
    wl_B16 = np.arange(0.125, 0.832, 0.01)
    x = 1. / wl_B16
    Qfit_B16 = -2.488 + 1.803 * x - 0.261 * x ** 2 + 0.0145 * x ** 3

    interpfunc = interpolate.interp1d(wl_B16, Qfit_B16, kind='linear')
    Qfit_B16_V = interpfunc(0.55)  # Interpolate to find attenuation at V band central wavelengths
    n_atts_B16 = Qfit_B16 / Qfit_B16_V

    wavelengths = wl_B16
    attenuations = Qfit_B16 + 1.  # TODO: understand this more ...

    # Call the constructor of the base class
    super(BattistiAttenuationCurve, self).__init__(wavelengths, attenuations)

# -----------------------------------------------------------------
def make_ecc_interpolant():
    """
    Make interpolation function from eccentricity file to
    determine number of harmonics to use for a given
    eccentricity.

    :returns: interpolant
    """
    pth = resource_filename(Requirement.parse('libstempo'),
                            'libstempo/ecc_vs_nharm.txt')

    fil = np.loadtxt(pth)

    return interp1d(fil[:, 0], fil[:, 1])


# get interpolant for eccentric binaries
def interpolate_in_range(x, y, x_new, kind_interp):
    """
    Given some samples y of the function y(x), interpolate_in_range returns
    the interpolation of values y(x_new)

    :param x: The points at which y(x) is evaluated. Array
    :param y: The values of y(x). Array
    :param x_new: The values at which y(x) has to be interpolated. Array
    :param kind_interp: The interpolation method of the function
        scipy.interpolate.interp1d. String
    :return: y_new: the new interpolated samples
    """
    if x.size == 1:
        y_new = y * np.ones(x_new.size)
    elif x.size == 2:
        x = np.append(x, x_new[-1])
        y = np.append(y, y[-1])
        func = interp.interp1d(x, y, kind=kind_interp, bounds_error=False)
        y_new = func(x_new)
    else:
        func = interp.interp1d(x, y, kind=kind_interp, bounds_error=False)
        y_new = func(x_new)
    return y_new
def test_lund_rescale_mean_velocity_different_grid():
    fileName = path.join(eddylicious.__path__[0], "..", "tests", "datasets",
                         "channel_flow_180", "dns.dat")
    dns = np.genfromtxt(fileName)
    eta = dns[:, 0]
    yPlus = dns[:, 1]
    u = dns[:, 1]
    v = np.zeros(u.shape)

    etaInfl = np.linspace(0, eta[-1], 119)
    yPlusInfl = etaInfl * 6.37309e-2 / 3.5e-4
    uTest = interp1d(eta, u)(etaInfl)
    w = blending_function(etaInfl)

    uRescaled = lund_rescale_mean_velocity(eta, yPlus, u, v,
                                           etaInfl.size, etaInfl, yPlusInfl,
                                           1, u[-1], u[-1], 1, w)[0][:, 0]

    for i in range(len(uTest)):
        assert_almost_equal(uTest[i], uRescaled[i], decimal=3)


# Test rescaling dns data onto itself, using a different grid with eta > 1
def linear(cls, time, y0, yf):
    """ return linear function values; the array size is the same as the time length

    Args:
        time (array_like) : time
        y0 (float): initial value
        yf (float): final value

    Returns:
        (N, ) ndarray
    """
    x = np.array([time[0], time[-1]])
    y = np.array([y0, yf])
    f = interpolate.interp1d(x, y)
    return f(time)
def integrate(self, wavelength_grid):
    """
    Integrate the spectrum flux over the specified grid of wavelengths.

    Parameters
    ----------
    wavelength_grid : quantity_like

    Returns
    -------
    integrated_flux : :class:`~astropy.units.Quantity`
    """
    grid = u.Quantity(wavelength_grid)
    grid = grid.to(self.wavelength.unit)

    interpolator = interp1d(self.wavelength.value, self.flux.value, kind='cubic')
    new_flux = interpolator(grid.value)

    return simps(new_flux, x=grid.value) * self.flux.unit * grid.unit
def get_results(self, time_steps, result_key="output", interpolation="nearest",
                as_eval_data=False):
    """
    Return results from internal storage for given time steps.

    .. warning:: calling this method before a simulation was run will result in an error.

    :param time_steps: time points where values are demanded
    :param result_key: type of values to be returned
    :param interpolation: interpolation method to use if demanded time-steps are not
        covered by the storage, see :func:`scipy.interpolate.interp1d` for all possibilities
    :param as_eval_data: return results as EvalData object for straightforward display
    """
    func = interp1d(np.array(self._time_storage),
                    np.array(self._value_storage[result_key]),
                    kind=interpolation, assume_sorted=True, axis=0)
    values = func(time_steps)

    if as_eval_data:
        return EvalData([time_steps], values, name=".".join([self.name, result_key]))

    return values
def __init__(self, survey, band):
    self.survey = survey.strip()
    self.band = band.strip()

    try:
        self.mstar_file = resource_filename(__name__,
                                            'data/mstar/mstar_%s_%s.fit' % (self.survey, self.band))
    except:
        raise IOError("Could not find mstar resource mstar_%s_%s.fit" % (self.survey, self.band))
    try:
        self._mstar_arr = fitsio.read(self.mstar_file, ext=1)
    except:
        raise IOError("Could not find mstar file mstar_%s_%s.fit" % (self.survey, self.band))

    # Tom - why not use CubicSpline here? That's why it exists...
    self._f = CubicSpline(self._mstar_arr['Z'], self._mstar_arr['MSTAR'])
    # self._f = interpolate.interp1d(self._mstar_arr['Z'], self._mstar_arr['MSTAR'], kind='cubic')
def fit_transform(self, X, **kwargs):
    """Fit to data, then transform it.

    Parameters
    ----------
    X : array-like
        Time series dataset to be resampled.

    Returns
    -------
    numpy.ndarray
        Resampled time series dataset.
    """
    X_ = to_time_series_dataset(X)
    n_ts, sz, d = X_.shape
    equal_size = check_equal_size(X_)
    X_out = numpy.empty((n_ts, self.sz_, d))
    for i in range(X_.shape[0]):
        xnew = numpy.linspace(0, 1, self.sz_)
        if not equal_size:
            sz = ts_size(X_[i])
        for di in range(d):
            f = interp1d(numpy.linspace(0, 1, sz), X_[i, :sz, di], kind="slinear")
            X_out[i, :, di] = f(xnew)
    return X_out
def get_xsfb(infile, m_in, unit):
    if unit == 'pb':
        fac = 1000.
    if unit == 'fb':
        fac = 1.

    dirname = os.path.dirname(__file__)
    infile_path = os.path.join(dirname, infile)
    data = np.loadtxt(infile_path)
    try:
        mass_ar, xspb_ar = np.transpose(data)
    except:
        mass_ar, xspb_ar, dummy = np.transpose(data)

    xspb_data = interpolate.interp1d(mass_ar, xspb_ar)
    xspb = xspb_data(m_in)
    xsfb = xspb * fac
    return xsfb
def calculate_val(thresholds, embeddings1, embeddings2, actual_issame, far_target, nrof_folds=10):
    assert embeddings1.shape[0] == embeddings2.shape[0]
    assert embeddings1.shape[1] == embeddings2.shape[1]
    nrof_pairs = min(len(actual_issame), embeddings1.shape[0])
    nrof_thresholds = len(thresholds)
    k_fold = KFold(n_splits=nrof_folds, shuffle=False)

    val = np.zeros(nrof_folds)
    far = np.zeros(nrof_folds)

    diff = np.subtract(embeddings1, embeddings2)
    dist = np.sum(np.square(diff), 1)
    indices = np.arange(nrof_pairs)

    for fold_idx, (train_set, test_set) in enumerate(k_fold.split(indices)):
        # Find the threshold that gives FAR = far_target
        far_train = np.zeros(nrof_thresholds)
        for threshold_idx, threshold in enumerate(thresholds):
            _, far_train[threshold_idx] = calculate_val_far(threshold, dist[train_set],
                                                            actual_issame[train_set])
        if np.max(far_train) >= far_target:
            f = interpolate.interp1d(far_train, thresholds, kind='slinear')
            threshold = f(far_target)
        else:
            threshold = 0.0

        val[fold_idx], far[fold_idx] = calculate_val_far(threshold, dist[test_set],
                                                         actual_issame[test_set])

    val_mean = np.mean(val)
    far_mean = np.mean(far)
    val_std = np.std(val)
    return val_mean, val_std, far_mean
def py_sampleProfile(intensity, Rmin, dR, nxy, dxy, udat, vdat,
                     dRA=0., dDec=0., PA=0, inc=0.):
    """
    Python implementation of sampleProfile.
    """
    inc_cos = np.cos(inc)

    nrad = len(intensity)
    gridrad = np.linspace(Rmin, Rmin + dR * (nrad - 1), nrad)

    ncol, nrow = nxy, nxy
    # create the mesh grid
    x = (np.linspace(0.5, -0.5 + 1. / float(ncol), ncol)) * dxy * ncol
    y = (np.linspace(0.5, -0.5 + 1. / float(nrow), nrow)) * dxy * nrow

    # we shrink the x axis, since PA is the angle East of North of the
    # plane of the disk (orthogonal to the angular momentum axis)
    # PA=0 is a disk with vertical orbital node (aligned along North-South)
    x_axis, y_axis = np.meshgrid(x / inc_cos, y)
    x_meshgrid = np.sqrt(x_axis ** 2. + y_axis ** 2.)

    # convert to Jansky
    sr_to_px = dxy ** 2.
    intensity *= sr_to_px
    f = interp1d(gridrad, intensity, kind='linear', fill_value=0.,
                 bounds_error=False, assume_sorted=True)
    intensmap = f(x_meshgrid)

    f_center = interp1d(gridrad, intensity, kind='linear', fill_value='extrapolate',
                        bounds_error=False, assume_sorted=True)
    intensmap[int(nrow / 2), int(ncol / 2)] = f_center(0.)

    vis = py_sampleImage(intensmap, dxy, udat, vdat, PA=PA, dRA=dRA, dDec=dDec)

    return vis
def pwl(pairs, t):
    """
    PWL describes a piecewise linear waveform

    :param pairs: a 2D list. Each row is a pair [time, value]
    :param t: array with times where the function has to be evaluated
    :return: the function values at times defined in t
    """
    # convert pairs to np.array if necessary
    if isinstance(pairs, list):
        pairs = np.array(pairs)

    # check pairs format
    if pairs.ndim == 1:
        pairs.shape = (-1, 2)

    # get pairs
    x = []
    y = []
    for xk, yk in pairs:
        x.append(xk)
        y.append(yk)

    # create linear interpolator
    fun = interp1d(x, y, bounds_error=False, fill_value=(y[0], y[-1]), assume_sorted=True)

    # interpolation
    out = fun(t)

    return out
def plot(x, y, x_label, y_label, save_file):
    ticks = x
    plt.xticks(range(len(ticks)), ticks, fontsize=15)
    plt.yticks(fontsize=15)
    new_x = interpolate.interp1d(ticks, range(len(ticks)))(ticks)
    plt.plot(new_x, y, linestyle='-', alpha=1.0, markersize=12, marker='p', color='b')
    plt.xlabel(x_label, fontsize=24)
    plt.ylabel(y_label, fontsize=20)
    plt.savefig(save_file)
    plt.show()
def Dc_to_redshift(self, Dc):
    """
    Calculate the redshift corresponding to the given comoving
    distance (along LoS) by using interpolation.
    """
    if not hasattr(self, "_Dc_interp"):
        Dc_min, Dc_max = self.Dc_limit
        dDc = self.Dc_cell
        N = int((Dc_max - Dc_min) / dDc)
        z_ = np.linspace(self.zmin, self.zmax, num=N)
        Dc_ = self.cosmo.comoving_distance(z_).value  # [Mpc]
        self._Dc_interp = interpolate.interp1d(Dc_, z_, kind="linear")

    return self._Dc_interp(Dc)
def make2d(w1d, x=None):
    """
    Create 2D filter from the 1D one.

    Parameters
    ----------
    w1d : 1D `~numpy.ndarray`
        The input 1D filter/window
    x : 1D `~numpy.ndarray`, optional
        The X-axis values of the input ``w1d`` filter

    Returns
    -------
    w2d : 2D `~numpy.ndarray`
        Created 2D filter/window from the input 1D one.

    Credit
    ------
    [1] MATLAB - creating 2D convolution filters
        https://cn.mathworks.com/matlabcentral/newsreader/view_thread/23588
    """
    if x is None:
        L = len(w1d)
        M = (L - 1) / 2
        x = np.linspace(-M, M, num=L)

    xmax = np.max(x)
    xsize = int(2 * xmax + 1)
    xx = np.linspace(-xmax, xmax, num=xsize)
    xg, yg = np.meshgrid(xx, xx)
    r = np.sqrt(xg**2 + yg**2)
    ridx = (r <= xmax)
    w2d = np.zeros(shape=(xsize, xsize))

    finterp = interpolate.interp1d(x=x, y=w1d, kind="linear")
    w2d[ridx] = finterp(r[ridx])

    return w2d
def spectrum_to_XYZ_emissive(spectrum_dict, cmf='1931_2deg'):
    '''Converts an emissive spectrum to XYZ coordinates.

    Args:
        spectrum_dict (`dict`): dictionary with wvl, values keys. Wvl in units of nm.

        cmf (`str`): which color matching function to use, defaults to
            CIE 1931 2 degree observer.

    Returns:
        `tuple` containing:

            `float`: X

            `float`: Y

            `float`: Z

    '''
    wvl, values = spectrum_dict['wvl'], spectrum_dict['values']

    cmf = get_cmf(cmf)
    wvl_cmf = cmf['wvl']
    try:
        can_be_direct = np.allclose(wvl_cmf, wvl)
    except ValueError:
        can_be_direct = False

    if not can_be_direct:
        dat_interpf = interp1d(wvl, values, kind='linear', bounds_error=False,
                               fill_value=0, assume_sorted=True)
        values = dat_interpf(wvl_cmf)

    dw = wvl_cmf[1] - wvl_cmf[0]
    k = 100 / (values * cmf['Y']).sum(axis=0) / dw
    X = k * (values * cmf['X']).sum(axis=0)
    Y = k * (values * cmf['Y']).sum(axis=0)
    Z = k * (values * cmf['Z']).sum(axis=0)
    return X, Y, Z
def add_demonstration(self, demonstration):
    interpolate = interp1d(np.linspace(0, 1, len(demonstration)), demonstration, kind='cubic')
    stretched_demo = interpolate(self.x)
    self.Y = np.vstack((self.Y, stretched_demo))
    self.nrTraj = len(self.Y)

    # weights for each trajectory
    self.W = np.dot(np.linalg.inv(np.dot(self.Phi, self.Phi.T)),
                    np.dot(self.Phi, self.Y.T)).T
    self.meanW = np.mean(self.W, 0)  # mean of weights
    # wrap map() in list() so this also works on Python 3
    w1 = np.array(list(map(lambda x: x - self.meanW.T, self.W)))
    self.sigmaW = np.dot(w1.T, w1) / self.nrTraj  # covariance of weights
    self.sigmaSignal = np.sum(np.sum((np.dot(self.W, self.Phi) - self.Y) ** 2)) / \
        (self.nrTraj * self.nrSamples)
def resamp(cop_dat):
    # resample data to even sample points using same average sample rate
    t = np.linspace(cop_dat[0, 2], cop_dat[-1, 2], num=nsamp(cop_dat), endpoint=True)
    f_cor = interp1d(cop_dat[:, 2], cop_dat[:, 0], kind='linear')
    cor = f_cor(t)
    f_sag = interp1d(cop_dat[:, 2], cop_dat[:, 1], kind='linear')
    sag = f_sag(t)

    # convert all to nd arrays
    cor = to_sing(cor)
    sag = to_sing(sag)
    t = to_sing(t)

    return np.concatenate((cor, sag, t), axis=1)