The following 36 code examples, extracted from open-source Python projects, illustrate how to use astropy.units.km().
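As a quick orientation before the extracted examples, here is a minimal sketch (ours, not taken from any of the projects below) of the core pattern they all share: multiplying a number by u.km yields a Quantity, and units compose and convert arithmetically from there.

import astropy.units as u

d = 100 * u.km            # attaching a unit to a number yields a Quantity
print(d.to(u.m))          # 100000.0 m
v = d / (10 * u.s)        # units compose arithmetically: km / s
print(v.to(u.m / u.s))    # 10000.0 m / s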
def deblend_cube(region='OrionA', vmin=0., vmax=20.):
    tex = fits.getdata('{0}/parameterMaps/{0}_Tex_DR1_rebase3_flag.fits'.format(region))
    mom0 = fits.getdata('{0}/{0}_NH3_11_DR1_rebase3_mom0_QA_trim.fits'.format(region))
    vlsr = fits.getdata('{0}/parameterMaps/{0}_Vlsr_DR1_rebase3_flag.fits'.format(region))
    sigv = fits.getdata('{0}/parameterMaps/{0}_Sigma_DR1_rebase3_flag.fits'.format(region))
    nnh3 = fits.getdata('{0}/parameterMaps/{0}_N_NH3_DR1_rebase3_flag.fits'.format(region))
    cube = SpectralCube.read('{0}/{0}_NH3_11_DR1_rebase3_trim.fits'.format(region))
    cube = cube.with_spectral_unit(u.km / u.s, velocity_convention='radio')

    tpeak = mom0 / (np.sqrt(2 * np.pi) * sigv)
    vlsr[vlsr == 0] = np.nan
    sigv[sigv == 0] = np.nan

    deblend = np.zeros(cube.shape)
    hdr = cube.wcs.to_header()
    spaxis = cube.spectral_axis.value

    for plane in np.arange(deblend.shape[0]):
        deblend[plane, :, :] = tpeak * np.exp(-(spaxis[plane] - vlsr)**2 / (2 * sigv**2))

    newcube = SpectralCube(deblend, cube.wcs, header=hdr)
    slab = newcube.spectral_slab(vmin * u.km / u.s, vmax * u.km / u.s)
    slab.write('{0}/{0}_singlecomp.fits'.format(region), overwrite=True)
def test_plot_rv_curves():
    pars = dict(P=[30.] * u.day,
                K=[10.] * u.km/u.s,
                ecc=[0.11239],
                omega=[0.] * u.radian,
                phi0=[np.pi/2] * u.radian)

    samples = JokerSamples(VelocityTrend2)
    for k in pars:
        samples[k] = pars[k]
    samples['v0'] = [150] * u.km/u.s
    samples['v1'] = [0.01] * u.km/u.s/u.day

    t_grid = np.random.uniform(56000, 56500, 1024)
    t_grid.sort()

    fig, ax = plt.subplots(1, 1, figsize=(12, 5))
    plot_rv_curves(samples, t_grid, ax=ax, trend_t0=56000.)
def mean_spectra(region, line, file_extension, restFreq, spec_param):
    '''
    Sum spectra over entire mapped region
    Cubes are missing BUNIT header parameter. Fix.
    '''
    # The format string mixed '{}' with numbered fields, which raises a
    # ValueError; use the numbered form throughout, as the other examples do.
    filein = '{0}/{0}_{1}_{2}_trim.fits'.format(region, line, file_extension)
    #add_fits_units(filein,'K')
    cube = SpectralCube.read(filein)
    #trim_edge_cube(cube)
    slice_unmasked = cube.unmasked_data[:, :, :]
    if line == 'NH3_33':
        slice_unmasked[spec_param['mask33_chans'][0]:spec_param['mask33_chans'][1], :, :] = 0.
    summed_spectrum = np.nanmean(slice_unmasked, axis=(1, 2))
    cube2 = cube.with_spectral_unit(u.km / u.s, velocity_convention='radio',
                                    rest_value=restFreq * u.GHz)
    return summed_spectrum, cube2.spectral_axis
def get_max_separation(p_spt, s_temp, v, d=1.0 * u.parsec):
    """
    Get the maximum separation for a binary candidate

    :param p_spt: The spectral type of the primary star
    :param s_temp: The temperature of the companion
    :param v: The velocity, in km/s, of the companion
    :param d: The distance, in parsec, to the system
    :return: The maximum primary-->secondary separation, in arcseconds
    """
    # Convert the companion temperature and primary spectral type to mass
    from kglib.spectral_type import Mamajek_Table
    MS = SpectralTypeRelations.MainSequence()
    MT = Mamajek_Table.MamajekTable()
    teff2mass = MT.get_interpolator('Teff', 'Msun')
    M1 = MS.Interpolate('mass', p_spt)
    M2 = teff2mass(s_temp)
    Mt = (M1 + M2) * u.M_sun

    # Compute the maximum separation
    G = constants.G
    a_max = (G * Mt / v ** 2)
    alpha_max = (a_max / d).to(u.arcsecond, equivalencies=u.dimensionless_angles())

    return alpha_max
def plot_profile(wave, flux, line, name):
    # define the velocity scale [AA / s]
    vel_aas = (wave - line*(1+zem[i])) / (line*(1+zem[i])) * c
    # convert to [km / s]
    vel_kms = vel_aas.to('km/s')
    # define parameters for the x and y axes
    ax.set_xlim(xmin, xmax)
    ax.xaxis.set_minor_locator(minorLocator)
    ax.tick_params(axis='x', labelsize=xls)
    ax.set_ylim(0., 1.5)
    # make the plot
    ax.plot(vel_kms, flux)
    # include the name of the line
    plt.text(xmin+0.03*(xmax-xmin), 0.15, name)
    # mark the approximate centroid velocity
    plt.axvline(x=vcen[i], ymin=0., ymax=1.5, linewidth=1, color='k', linestyle='dotted')
    plt.axvline(x=vcen[i]+30., ymin=0., ymax=1.5, linewidth=0.5, color='k')
    plt.axvline(x=vcen[i]-30., ymin=0., ymax=1.5, linewidth=0.5, color='k')
    # label other lines for clarity
    for k in range(0, len(lines)):
        vel_off_aas = (lines[k] - line) / line * c
        vel_off_kms = vel_off_aas.to('km/s') / (u.km / u.s)
        plt.axvline(x=vcen[i]+vel_off_kms, ymin=0., ymax=1.5, linewidth=1, color='k', linestyle='dotted')
def plot_profile(wave, flux, line, name):
    # define the velocity scale [AA / s]
    vel_aas = (wave - line*(1+zem[i])) / (line*(1+zem[i])) * c
    # convert to [km / s]
    vel_kms = vel_aas.to('km/s')
    # define parameters for the x and y axes
    ax.set_xlim(xmin, xmax)
    ax.xaxis.set_minor_locator(minorLocator)
    ax.tick_params(axis='x', labelsize=xls)
    ax.set_ylim(0., 3.)
    # make the plot
    ax.plot(vel_kms, np.log(1/flux))
    # include the name of the line
    plt.text(xmin+0.03*(xmax-xmin), 2.3, name)
    # mark the approximate centroid velocity
    plt.axvline(x=vcen[i], ymin=0., ymax=1.5, linewidth=1, color='k', linestyle='dotted')
    plt.axvline(x=vcen[i]+30., ymin=0., ymax=1.5, linewidth=0.5, color='k')
    plt.axvline(x=vcen[i]-30., ymin=0., ymax=1.5, linewidth=0.5, color='k')
    # label other lines for clarity
    for k in range(0, len(lines)):
        vel_off_aas = (lines[k] - line) / line * c
        vel_off_kms = vel_off_aas.to('km/s') / (u.km / u.s)
        plt.axvline(x=vcen[i]+vel_off_kms, ymin=0., ymax=1.5, linewidth=1, color='k', linestyle='dotted')
def test_circular_integration_1step(self):
    """Compare total number and integrated column density for circle."""
    from ..core import CircularAperture

    Nobs = 6.41756750e26
    parent = 1.4e4 * u.km
    daughter = 1.7e5 * u.km
    Q = 5.8e23 / u.s
    v = 1 * u.km / u.s
    aper = CircularAperture(3300 * u.km)

    coma = Haser(Q, v, parent, daughter)
    N1 = coma.total_number(aper)
    N2 = coma._integrate_column_density(aper)
    assert np.isclose(N1, N2)
def get_wind_speed(self, n=3):
    '''
    Populates the self.wind_speed property

    Based on the information in Rs232_Comms_v120.pdf document

    Medians n measurements.  This isn't mentioned specifically by the manual
    but I'm guessing it won't hurt.
    '''
    self.logger.info('Getting wind speed')
    if self.wind_speed_enabled():
        values = []
        for i in range(0, n):
            result = self.query('V!')
            if result:
                value = float(result[0])
                self.logger.debug('  Wind Speed Query = {:.1f}'.format(value))
                values.append(value)
        if len(values) >= 3:
            self.wind_speed = np.median(values) * u.km / u.hr
            self.logger.info('  Wind speed = {:.1f}'.format(self.wind_speed))
        else:
            self.wind_speed = None
    else:
        self.wind_speed = None
    return self.wind_speed
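The driver above stores the median reading as a u.km/u.hr quantity. A minimal standalone sketch (plain numpy and astropy, not part of the driver; the readings are hypothetical) of how that composed unit converts:

import numpy as np
import astropy.units as u

values = [10.0, 12.0, 11.0]              # hypothetical wind-speed readings
speed = np.median(values) * u.km / u.hr  # same pattern as get_wind_speed
print(speed.to(u.m / u.s))               # ~3.06 m / s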
def pairs(self, sep, dv):
    """ Generate a pair catalog

    Parameters
    ----------
    sep : Angle or Quantity
    dv : Quantity
      Offset in velocity.  Positive for projected pairs (i.e. dz > input value)

    Returns
    -------

    """
    # Checks
    if not isinstance(sep, (Angle, Quantity)):
        raise IOError("Input radius must be an Angle type, e.g. 10.*u.arcsec")
    if not isinstance(dv, (Quantity)):
        raise IOError("Input velocity must be a quantity, e.g. u.km/u.s")
    # Match
    idx, d2d, d3d = match_coordinates_sky(self.coords, self.coords, nthneighbor=2)
    close = d2d < sep
    # Cut on redshift
    if dv > 0.:  # Desire projected pairs
        zem1 = self.cat['zem'][close]
        zem2 = self.cat['zem'][idx[close]]
        dv12 = ltu.dv_from_z(zem1, zem2)
        gdz = np.abs(dv12) > dv
        # f/g and b/g
        izfg = dv12[gdz] < 0*u.km/u.s
        ID_fg = self.cat[self.idkey][close][gdz][izfg]
        ID_bg = self.cat[self.idkey][idx[close]][gdz][izfg]
    else:
        pdb.set_trace()
    # Reload
    return ID_fg, ID_bg
def setup(self):
    mjd = np.linspace(55555., 56555., 256)
    self.data = RVData(mjd, np.sin(mjd)*u.km/u.s,
                       stddev=0.1*np.random.uniform(size=mjd.size)*u.km/u.s)

    n = 128
    samples = dict()
    samples['P'] = np.random.uniform(size=n) * u.day
    samples['phi0'] = np.random.uniform(size=n) * u.radian
    samples['omega'] = np.random.uniform(size=n) * u.radian
    samples['ecc'] = np.random.uniform(size=n) * u.one
    samples['jitter'] = np.random.uniform(size=n) * u.m/u.s
    self.samples = samples
    self.n = n
def test_pack_prior_samples(self):
    samples = self.samples.copy()

    M, units = pack_prior_samples(samples, self.data.rv.unit)
    assert units[-1] == u.km/u.s
    assert M.shape == (self.n, 5)

    samples.pop('jitter')
    assert 'jitter' not in samples
    M, units = pack_prior_samples(samples, self.data.rv.unit)
    assert units[-1] == u.km/u.s
    assert M.shape == (self.n, 5)
def test_shit():
    joker_params = JokerParams(P_min=8*u.day, P_max=32768*u.day,
                               jitter=0*u.m/u.s)
    joker = TheJoker(joker_params)

    t = np.random.uniform(0, 250, 16) + 56831.324
    t.sort()

    rv = np.cos(t)
    rv_err = np.random.uniform(0.1, 0.2, t.size)

    data = RVData(t=t, rv=rv*u.km/u.s, stddev=rv_err*u.km/u.s)
    samples = joker.sample_prior(size=16384)

    chunk = []
    for k in samples:
        chunk.append(np.array(samples[k]))
    chunk = np.ascontiguousarray(np.vstack(chunk).T)

    t0 = time.time()
    cy_ll = batch_marginal_ln_likelihood(chunk, data, joker_params)
    print("Cython:", time.time() - t0)

    t0 = time.time()
    n_chunk = len(chunk)
    py_ll = np.zeros(n_chunk)
    for i in range(n_chunk):
        try:
            py_ll[i] = marginal_ln_likelihood(chunk[i], data, joker_params)
        except Exception as e:
            py_ll[i] = np.nan
    print("Python:", time.time() - t0)

    assert np.allclose(np.array(cy_ll), py_ll)
def test_plotting():
    # check that plotting at least succeeds with allowed arguments
    t = np.random.uniform(55555., 56012., size=128)
    rv = 100 * np.sin(0.5*t) * u.km/u.s
    ivar = 1 / (np.random.normal(0, 5, size=t.size)*u.km/u.s)**2

    data = RVData(t=t, rv=rv, ivar=ivar)
    data.plot()

    # style
    data.plot(color='r')

    # custom axis
    fig, ax = plt.subplots(1, 1)
    data.plot(ax=plt.gca())

    # formatting
    data.plot(rv_unit=u.m/u.s)
    data.plot(rv_unit=u.m/u.s, time_format='jd')
    data.plot(rv_unit=u.m/u.s, time_format=lambda x: x.utc.mjd)

    # try with no errors
    data = RVData(t=t, rv=rv)
    data.plot()
    data.plot(ecolor='r')

    plt.close('all')
def get_max_velocity(p_spt, s_temp):
    MS = SpectralTypeRelations.MainSequence()
    s_spt = MS.GetSpectralType('temperature', s_temp, prec=1e-3)
    R1 = MS.Interpolate('radius', p_spt)
    T1 = MS.Interpolate('temperature', p_spt)
    M1 = MS.Interpolate('mass', p_spt)
    M2 = MS.Interpolate('mass', s_spt)
    G = constants.G.cgs.value
    Msun = constants.M_sun.cgs.value
    Rsun = constants.R_sun.cgs.value
    v2 = 2.0 * G * Msun * (M1 + M2) / (Rsun * R1 * (T1 / s_temp) ** 2)
    return np.sqrt(v2) * u.cm.to(u.km)
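The return line above relies on the fact that calling .to() on a bare unit (rather than a Quantity) yields a plain numeric scale factor, so a cm/s value becomes a km/s value without ever constructing a Quantity. A minimal sketch of that idiom (the speed value is hypothetical):

import astropy.units as u

print(u.cm.to(u.km))           # 1e-05: a plain float scale factor
v_cms = 3.0e7                  # hypothetical speed in cm/s
v_kms = v_cms * u.cm.to(u.km)  # 300.0, now in km/s but still a bare float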
def plot_profile(wave, flux, line, name, fosc):
    # define the velocity scale [AA / s]
    vel_aas = (wave - line*(1+zem[i])) / (line*(1+zem[i])) * c
    # convert to [km / s]
    vel_kms = vel_aas.to('km/s')
    # define parameters for the x and y axes
    ax.set_xlim(xmin, xmax)
    ax.xaxis.set_minor_locator(minorLocator)
    ax.tick_params(axis='x', labelsize=xls)
    ymax = 3.e12
    ax.set_ylim(0., ymax)
    # make the plot (using equations 5 and 8 from Savage & Sembach 1991)
    ax.plot(vel_kms, np.log(1/flux) / 2.654e-15 / (fosc * line))
    # include the name of the line
    plt.text(xmin+0.03*(xmax-xmin), ymax*0.6, name)
    # mark the approximate centroid velocity
    plt.axvline(x=vcen[i], ymin=0., ymax=1.5, linewidth=1, color='k', linestyle='dotted')
    plt.axvline(x=vcen[i]+30., ymin=0., ymax=1.5, linewidth=0.5, color='k')
    plt.axvline(x=vcen[i]-30., ymin=0., ymax=1.5, linewidth=0.5, color='k')
    # label other lines for clarity
    for k in range(0, len(lines)):
        vel_off_aas = (lines[k] - line) / line * c
        vel_off_kms = vel_off_aas.to('km/s') / (u.km / u.s)
        plt.axvline(x=vcen[i]+vel_off_kms, ymin=0., ymax=1.5, linewidth=1, color='k', linestyle='dotted')
def calculate(wave, flux, line, fosc):
    # define the velocity scale [AA / s]
    vel_aas = (wave - line*(1+zem[i])) / (line*(1+zem[i])) * c
    # convert to [km / s]
    vel_kms = vel_aas.to('km/s')
    # label other lines for clarity
    tau = np.log(1/flux)
    for k in range(0, len(lines)):
        vel_off_aas = (lines[k] - line) / line * c
        vel_off_kms = vel_off_aas.to('km/s') / (u.km / u.s)
    return tau
def test_dbquantity_to(dbquantity):
    expr = dbquantity.to(u.km).value  # value * :value_1
    value_1 = expr.right.value
    assert_almost_equal(value_1, 0.001)  # check value_1
def test_dbquantity_add_diff_units(dbquantity):
    expr = (dbquantity + 0.1*u.km).value  # value + :value_1
    value_1 = expr.right.value
    assert_almost_equal(value_1, 100)
def test_dbquantity_rsub_diff_untis(dbquantity):
    expr = (0.1*u.km - dbquantity).value  # :param_1 - :value_1 * value
    value_1 = expr.right.left.value
    param_1 = expr.left.value
    assert_almost_equal(value_1, 0.001)
    assert_almost_equal(param_1, 0.1)
def test_dbquantity_greater_diff_units(dbquantity):
    expr = dbquantity > 0.1*u.km  # value > :value_1
    value_1 = expr.right.value
    assert_almost_equal(value_1, 100)
def test_rho_as_angle():
    # arctan(100 km, 1 au) * 206264.806 = 0.13787950659645942
    rho = rho_as_angle(100 * u.km, {'delta': 1 * u.au})
    assert np.isclose(rho.to(u.arcsec).value, 0.13787950659645942)
def test_rho_as_length():
    # 1 au * tan(1") = 725.2709438078363
    rho = rho_as_length(1 * u.arcsec, {'delta': 1 * u.au})
    assert np.isclose(rho.to(u.km).value, 725.2709438078363)
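For small angles these two helpers reduce to theta = rho/delta and rho = delta*theta, which plain astropy units can reproduce with the dimensionless_angles() equivalency. A minimal sketch (ours, independent of sbpy) checking both asserted values:

import astropy.units as u

theta = (100 * u.km / (1 * u.au)).to(u.arcsec, u.dimensionless_angles())
print(theta)   # ~0.13788 arcsec, the value asserted in test_rho_as_angle

rho = ((1 * u.arcsec).to(u.rad).value * u.au).to(u.km)
print(rho)     # ~725.27 km, the value asserted in test_rho_as_length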
def test_as_angle(self):
    r = 100 * u.km
    aper = CircularAperture(r)
    eph = {'delta': 1 * u.au}
    assert aper.as_angle(eph).dim == rho_as_angle(r, eph)
def test_as_angle(self):
    shape = [100, 200] * u.km
    aper = AnnularAperture(shape)
    eph = {'delta': 1 * u.au}
    assert all(aper.as_angle(eph).dim == rho_as_angle(shape, eph))
def test_as_angle(self):
    shape = [100, 200] * u.km
    aper = RectangularAperture(shape)
    eph = {'delta': 1 * u.au}
    assert all(aper.as_angle(eph).dim == rho_as_angle(shape, eph))
def test_column_density_small_aperture(self):
    """Test column density for aperture << lengthscale.

    Should be within 1% of ideal value.

    """
    Q = 1e28 / u.s
    v = 1 * u.km / u.s
    rho = 10 * u.km
    parent = 1e4 * u.km
    N_avg = 2 * Haser(Q, v, parent).column_density(rho)
    ideal = Q / v / 2 / rho
    assert np.isclose(N_avg.decompose().value, ideal.decompose().value,
                      rtol=0.01)
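The ideal value in this test is just unit algebra on km-based quantities, so it can be checked standalone without sbpy; a minimal sketch:

import astropy.units as u

Q = 1e28 / u.s
v = 1 * u.km / u.s
rho = 10 * u.km
print((Q / v / 2 / rho).decompose())  # ~5e+20 1 / m2, the ideal mean column density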
def test_total_number_large_aperture(self):
    """Test column density for aperture >> lengthscale."""
    Q = 1 / u.s
    v = 1 * u.km / u.s
    rho = 1000 * u.km
    parent = 10 * u.km
    N = Haser(Q, v, parent).total_number(rho)
    ideal = Q * parent / v
    assert np.isclose(N, ideal.decompose().value)

# TEST FAILS
#
# def test_total_number_rho_NJ78(self):
#     """Reproduce Newburn and Johnson 1978.
#
#     Species, N observed, Q/v (km**-1)
#     CN, 6.4e26, 5.8e23
#     C3, 8.3e28, 9.0e23
#     C2, 7.8e27, 5.9e24
#
#     Cannot reproduce C3 quoted in paper.
#
#     """
#     #Nobs = [6.41756750e26, 8.63191842e+28, 7.81278300e27]
#     #Nobs = [6.4e26, 4.2e27, 7.8e27]
#     Nobs = [6.4e26, 8.3e28, 7.8e27]
#     parent = [1.4e4, 0, 1.0e4] * u.km
#     daughter = [1.7e5, 4.6e4, 7.6e4] * u.km
#     Q = [5.8e23, 9.0e23, 5.9e24] / u.s
#     rho = 3300 * u.km
#
#     N = np.zeros(3)
#     for i in range(3):
#         coma = Haser(Q[i], 1 * u.km / u.s, parent[i], daughter[i])
#         N[i] = coma.total_number(rho)
#     assert np.allclose(N, Nobs)
def test_circular_integration_0step(self):
    """Compare total number and integrated column density for circle."""
    from ..core import CircularAperture

    Nobs = 6.41756750e26
    parent = 1.4e4 * u.km
    Q = 5.8e23 / u.s
    v = 1 * u.km / u.s
    aper = CircularAperture(3300 * u.km)

    coma = Haser(Q, v, parent)
    N1 = coma.total_number(aper)
    N2 = coma._integrate_column_density(aper)
    assert np.isclose(N1, N2)
def test_total_number_rectangular_ap(self):
    """
    compare with:

    import astropy.units as u
    from sbpy.imageanalysis.utils import rarray
    from sbpy.activity import Haser

    r = rarray((5000, 3300), subsample=10) * u.km
    parent = 1.4e4 * u.km
    daughter = 1.7e5 * u.km
    Q = 5.8e23 / u.s
    v = 1 * u.km / u.s
    coma = Haser(Q, v, parent, daughter)
    sigma = coma.column_density(r)
    print((sigma * 1 * u.km**2).decompose())
    --> <Quantity 3.449607967230623e+26>

    This differs from the test value below by XXX

    """
    from ..core import RectangularAperture

    parent = 1.4e4 * u.km
    daughter = 1.7e5 * u.km
    Q = 5.8e23 / u.s
    v = 1 * u.km / u.s
    aper = RectangularAperture([5000, 3300] * u.km)

    coma = Haser(Q, v, parent, daughter)
    N = coma.total_number(aper)
    assert np.isclose(N, 3.449607967230623e+26)
def compute():
    '''
    '''
    # Kepler-444 and simulation parameters
    Nocc = 15      # number of occultations observed
    nout = 4.0     # observed out-of-transit durations [tint]
    lammin = 1.0   # min wavelength [um]
    lammax = 30.0  # max wavelength [um]
    Tstar = 5040.  # stellar temperature [K]
    Rs = 0.752     # stellar radius [Solar Radii]
    Rp = 0.4       # planet radius [Earth Radii]
    d = 36.        # distance to system [pc]

    # Additional params for K-444b
    r = 0.04178    # semi-major axis [AU]
    A = 0.         # Planet albedo
    e = 1.0        # Planet emissivity
    i = 88.0       # Orbital inclination [degrees]
    P = 3.6        # Orbital period [days]

    # Convert some units to km
    Rs_km = Rs * u.solRad.in_units(u.km)
    Rp_km = Rp * u.earthRad.in_units(u.km)
    r_km = r * u.AU.in_units(u.km)
    P_mins = P * 24. * 60.  # orbital period [minutes]

    # Planet temperature [K]
    Tplan = Tstar * ((1.-A)/e)**0.25 * (0.5*Rs_km/r_km)**0.5

    # Transit duration [mins], using the standard circular-orbit formula
    # T = (P/pi) * arcsin(sqrt((Rs+Rp)^2 - b^2) / r) with impact parameter
    # b = r*cos(i); the original expression was dimensionally inconsistent
    # and passed the inclination to np.cos in degrees instead of radians.
    b_km = r_km * np.cos(np.radians(i))
    tdur_mins = (P_mins / np.pi) * np.arcsin(np.sqrt((Rp_km + Rs_km) ** 2 - b_km ** 2) / r_km)

    # integration time [seconds]
    tdur = Nocc * tdur_mins * 60.0

    print("Planet Temperature : %.1f K" % Tplan)
    print("Transit Duration : %.2f mins" % tdur_mins)

    # Estimate for JWST
    fig1, ax1 = jwst.estimate_eclipse_snr(tint=tdur, nout=nout, lammin=lammin,
                                          lammax=lammax, Tstar=Tstar,
                                          Tplan=Tplan, Rs=Rs, Rp=Rp, d=d)

    # Estimate for an OST-like telescope, but using the JWST filters
    fig2, ax2 = jwst.estimate_eclipse_snr(tint=tdur, nout=nout, lammin=lammin,
                                          lammax=lammax, Tstar=Tstar,
                                          Tplan=Tplan, Rs=Rs, Rp=Rp, d=d,
                                          atel=144., thermal=False)

    return fig1, ax1, fig2, ax2
def __init__(self, P_min, P_max, trend_cls=None,
             jitter=None, jitter_unit=None, anomaly_tol=1E-10):

    # the names of the default parameters
    self.default_params = ['P', 'phi0', 'ecc', 'omega', 'jitter', 'K']

    self.P_min = P_min
    self.P_max = P_max
    self.anomaly_tol = float(anomaly_tol)

    # TODO: validate the specified long-term velocity trends
    # if trend is not None and not isinstance(trend, PolynomialVelocityTrend):
    #     raise TypeError("Velocity trends must be PolynomialVelocityTrend "
    #                     "instances, not '{0}'".format(type(trend)))
    if trend_cls is None:  # by default, assume constant
        trend_cls = VelocityTrend1
    self.trend_cls = trend_cls
    self._n_trend = len(self.trend_cls.parameters)

    # validate the input jitter specification
    if jitter is None:
        jitter = 0 * u.km/u.s

    if isiterable(jitter):
        if len(jitter) != 2:
            raise ValueError("If specifying parameters for the jitter prior, you "
                             "must pass in a length-2 container containing the "
                             "mean and standard deviation of the Gaussian over "
                             "log(jitter^2)")

        if jitter_unit is None or not isinstance(jitter_unit, u.UnitBase):
            raise TypeError("If specifying parameters for the jitter prior, you "
                            "must also specify the units of the jitter for "
                            "evaluating the prior as an astropy.units.UnitBase "
                            "instance.")

        self._fixed_jitter = False
        self._jitter_unit = jitter_unit
        self.jitter = jitter

    else:
        self._fixed_jitter = True
        self.jitter = jitter
def __init__(self, seed=42):
    np.random.seed(seed)

    EPOCH = np.random.uniform(0., 40)

    self.data = OrderedDict()
    self.joker_params = OrderedDict()
    self.truths = OrderedDict()

    P = np.random.uniform(40, 80) * u.day
    mjd = np.random.uniform(0, 300., 8)
    _genmjd = mjd + (EPOCH % P.value)

    # First just a binary
    truth = dict()
    truth['P'] = P
    truth['K'] = np.random.uniform(5, 15) * u.km/u.s
    truth['phi0'] = np.random.uniform(0., 2*np.pi) * u.radian
    truth['omega'] = np.random.uniform(0., 2*np.pi) * u.radian
    truth['ecc'] = np.random.uniform()
    self.v0 = np.random.uniform(-100, 100) * u.km/u.s

    orbit = RVOrbit(**truth)
    rv = orbit.generate_rv_curve(mjd) + self.v0
    err = np.full_like(rv.value, 0.01) * u.km/u.s
    data = RVData(mjd, rv, stddev=err)
    self.data['binary'] = data
    self.joker_params['binary'] = JokerParams(P_min=8*u.day, P_max=1024*u.day)
    self.truths['binary'] = truth.copy()
    self.truths['binary']['phi0'] = (self.truths['binary']['phi0']
                                     - (2*np.pi*data.t_offset/P.value)*u.radian)

    # hierarchical triple - long-term velocity trend
    self.v1 = np.random.uniform(-1, 1) * u.km/u.s/u.day
    orbit = RVOrbit(**truth)
    rv = orbit.generate_rv_curve(mjd) + self.v0 + self.v1*(mjd - mjd.min())*u.day
    err = np.full_like(rv.value, 0.01) * u.km/u.s
    data = RVData(mjd, rv, stddev=err, t_offset=mjd.min())
    self.data['triple'] = data
    self.joker_params['triple'] = JokerParams(P_min=8*u.day, P_max=1024*u.day,
                                              trend_cls=VelocityTrend2)
    self.truths['triple'] = truth.copy()
    self.truths['triple']['phi0'] = (self.truths['triple']['phi0']
                                     - (2*np.pi*data.t_offset/P.value)*u.radian)

    # Binary on circular orbit
    truth = dict()
    truth['P'] = P
    truth['K'] = np.random.uniform(5, 15) * u.km/u.s
    truth['phi0'] = np.random.uniform(0., 2*np.pi) * u.radian
    truth['omega'] = 0*u.radian
    truth['ecc'] = 0.

    orbit = RVOrbit(**truth)
    rv = orbit.generate_rv_curve(_genmjd) + self.v0
    err = np.full_like(rv.value, 0.1) * u.km/u.s
    data = RVData(mjd + EPOCH, rv, stddev=err)
    self.data['circ_binary'] = data
    self.joker_params['circ_binary'] = JokerParams(P_min=8*u.day, P_max=1024*u.day)
    self.truths['circ_binary'] = truth.copy()
    self.truths['circ_binary']['phi0'] = (self.truths['circ_binary']['phi0']
                                          - (2*np.pi*data.t_offset/P.value)*u.radian)
def MacroBroad(data, vmacro, extend=True):
    """
    This broadens the data by a given macroturbulent velocity.
    It works for small wavelength ranges. I need to make a better
    version that is accurate for large wavelength ranges! Sorry
    for the terrible variable names, it was copied from
    convol.pro in AnalyseBstar (Karolien Lefever)

    Parameters:
    ===========
    -data:     kglib.utils.DataStructures.xypoint instance
               Stores the data to be broadened. The data MUST be equally-spaced
               before calling this!

    -vmacro:   float
               The macroturbulent velocity, in km/s

    -extend:   boolean
               If true, the y-axis will be extended to avoid edge-effects

    Returns:
    ========
    A broadened version of data.
    """
    # Make the kernel
    c = constants.c.cgs.value * units.cm.to(units.km)
    sq_pi = np.sqrt(np.pi)
    lambda0 = np.median(data.x)
    xspacing = data.x[1] - data.x[0]
    mr = vmacro * lambda0 / c
    ccr = 2 / (sq_pi * mr)

    px = np.arange(-data.size() / 2, data.size() / 2 + 1) * xspacing
    pxmr = abs(px) / mr
    profile = ccr * (np.exp(-pxmr ** 2) + sq_pi * pxmr * (erf(pxmr) - 1.0))

    # Extend the xy axes to avoid edge-effects, if desired
    # (integer division so the slice indices stay valid under Python 3)
    if extend:
        before = data.y[-profile.size // 2 + 1:]
        after = data.y[:profile.size // 2]
        extended = np.r_[before, data.y, after]

        first = data.x[0] - float(int(profile.size / 2.0 + 0.5)) * xspacing
        last = data.x[-1] + float(int(profile.size / 2.0 + 0.5)) * xspacing
        x2 = np.linspace(first, last, extended.size)

        conv_mode = "valid"
    else:
        extended = data.y.copy()
        x2 = data.x.copy()
        conv_mode = "same"

    # Do the convolution
    newdata = data.copy()
    newdata.y = fftconvolve(extended, profile / profile.sum(), mode=conv_mode)

    return newdata
def get_rv(vel, corr, Npix=None, **fit_kws):
    """
    Get the best radial velocity, with errors.
    This will only work if the ccf was made with the maximum likelihood method!
    Uses the formula given in Zucker (2003) MNRAS, 342, 4 for the rv error.

    Parameters:
    ===========
    - vel:   numpy.ndarray
             The velocities
    - corr:  numpy.ndarray
             The ccf values. Should be the same size as vel
    - Npix:  integer
             The number of pixels used in the CCF.

    Returns:
    ========
    -rv:     float
             The best radial velocity, in km/s
    -rv_err: float
             Uncertainty in the velocity, in km/s
    -ccf:    float
             The CCF power at the maximum velocity.
    """
    vel = np.atleast_1d(vel)
    corr = np.atleast_1d(corr)
    sorter = np.argsort(vel)
    fcn = spline(vel[sorter], corr[sorter])
    fcn_prime = fcn.derivative(1)
    fcn_2prime = fcn.derivative(2)

    guess = vel[np.argmax(corr)]

    def errfcn(v):
        ll = 1e6 * fcn_prime(v) ** 2
        return ll

    out = minimize_scalar(errfcn, bounds=(guess - 2, guess + 2), method='bounded')
    rv = out.x
    if Npix is None:
        Npix = vel.size
    rv_var = -(Npix * fcn_2prime(rv) * (fcn(rv) / (1 - fcn(rv) ** 2))) ** (-1)
    return rv, np.sqrt(rv_var), fcn(rv)
def astropy_smooth(data, vel, linearize=False, kernel=convolution.Gaussian1DKernel, **kern_args):
    """
    Smooth using a gaussian filter, using astropy.

    Parameters:
    ===========
    - data:      kglib.utils.DataStructures.xypoint instance
                 The data to smooth.
    - vel:       float
                 The velocity scale to smooth out.
                 Can either be an astropy quantity or a float in km/s
    - linearize: boolean
                 If True, we will put the data in a constant log-wavelength
                 spacing grid before smoothing. The output has the same
                 spacing as the input regardless of this variable.
    - kernel:    astropy.convolution kernel
                 The astropy kernel to use. The default is the Gaussian1DKernel.
    - kern_args: Additional kernel arguments beyond width

    Returns:
    ========
    A smoothed version of the data, on the same wavelength grid as the data
    """
    if linearize:
        original_data = data.copy()
        datafcn = spline(data.x, data.y, k=3)
        linear = DataStructures.xypoint(data.x.size)
        linear.x = np.logspace(np.log10(data.x[0]), np.log10(data.x[-1]), linear.size())
        linear.y = datafcn(linear.x)
        data = linear

    # Figure out feature size in pixels
    if not isinstance(vel, u.quantity.Quantity):
        vel *= u.km / u.second
    featuresize = (vel / constants.c).decompose().value
    dlam = np.log(data.x[1] / data.x[0])
    Npix = featuresize / dlam

    # Make kernel and smooth
    kern = kernel(Npix, **kern_args)
    smoothed = convolution.convolve(data.y, kern, boundary='extend')

    if linearize:
        fcn = spline(data.x, smoothed)
        return fcn(original_data.x)

    return smoothed
def photo_lengthscale(species, source=None):
    """Photodissociation lengthscale for a gas species.

    Parameters
    ----------
    species : string
      The species to look up.

    source : string, optional
      Retrieve values from this source (case insensitive).  See
      references for keys.

    Returns
    -------
    gamma : astropy Quantity
      The lengthscale at 1 au.

    Example
    -------
    >>> from sbpy.activity import photo_lengthscale
    >>> gamma = photo_lengthscale('OH')

    References
    ----------
    [CS93] H2O and OH from Table IV of Cochran & Schleicher 1993,
    Icarus 105, 235-253.  Quoted for intermediate solar activity.

    """

    data = {  # (value, {key feature: ADS bibcode})
        'H2O': {
            'CS93': (2.4e4 * u.km,
                     {'H2O photodissociation lengthscale': '1993Icar..105..235C'})
        },
        'OH': {
            'CS93': (1.6e5 * u.km,
                     {'OH photodissociation lengthscale': '1993Icar..105..235C'})
        },
    }

    default_sources = {
        'H2O': 'CS93',
        'OH': 'CS93',
    }

    assert species.upper() in data, "No timescale available for {}. Choose from: {}".format(
        species, ', '.join(data.keys()))
    gas = data[species.upper()]
    source = default_sources[species.upper()] if source is None else source

    assert source.upper() in gas, 'Source key {} not available for {}. Choose from: {}'.format(
        source, species, ', '.join(gas.keys()))
    gamma, bibcode = gas[source.upper()]
    bib.register('activity.gas.photo_lengthscale', bibcode)

    return gamma