We extracted the following 38 code examples from open-source Python projects to illustrate how to use astropy.units.m().
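Before the project examples, here is a minimal usage sketch (not taken from any of the projects below; the variable names are illustrative only) showing how `astropy.units.m` attaches the metre unit to values, converts between length units, and builds compound units:

import numpy as np
import astropy.units as u

# Multiplying a scalar or array by u.m produces a Quantity carrying the metre unit
height = 1800 * u.m
baselines = np.array([35.0, 110.0, 2400.0]) * u.m

# Convert to other length units; incompatible conversions raise UnitConversionError
print(height.to(u.km))      # 1.8 km
print(baselines.to(u.cm))   # [3500. 11000. 240000.] cm

# Compound units such as velocity or density are built by unit arithmetic,
# exactly as in the examples below (u.m/u.s, u.kg/u.m**3, ...)
speed = 5.0 * u.m / u.s
density = 1.2 * u.kg / u.m**3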
def test_rejection_sample(self):
    rnd = np.random.RandomState(42)

    data = self.data['binary']
    joker = TheJoker(self.joker_params['binary'], random_state=rnd)

    with pytest.raises(ValueError):
        joker.rejection_sample(data)

    joker.rejection_sample(data, n_prior_samples=128)

    # check that jitter is always set to the fixed value
    jitter = 5.*u.m/u.s
    params = JokerParams(P_min=8*u.day, P_max=1024*u.day, jitter=jitter)
    joker = TheJoker(params)

    prior_samples = joker.sample_prior(128)
    assert quantity_allclose(prior_samples['jitter'], jitter)

    full_samples = joker.rejection_sample(data, n_prior_samples=128)
    assert quantity_allclose(full_samples['jitter'], jitter)
def create_LOFAR_configuration(antfile: str, meta: dict = None) -> Configuration:
    """ Define from the LOFAR configuration file

    :param antfile:
    :param meta:
    :return: Configuration
    """
    antxyz = numpy.genfromtxt(antfile, skip_header=2, usecols=[1, 2, 3], delimiter=",")
    nants = antxyz.shape[0]
    assert antxyz.shape[1] == 3, "Antenna array has wrong shape %s" % antxyz.shape
    anames = numpy.genfromtxt(antfile, dtype='str', skip_header=2, usecols=[0], delimiter=",")
    mounts = numpy.repeat('XY', nants)
    location = EarthLocation(x=[3826923.9] * u.m, y=[460915.1] * u.m, z=[5064643.2] * u.m)
    fc = Configuration(location=location, names=anames, mount=mounts, xyz=antxyz, frame='global',
                       diameter=35.0)
    return fc
def density(self, density: u.kg/u.m**3):
    """Sets the simulation's density profile to the specified array.

    Other arrays which depend on the density values, such as the kinetic
    pressure, are then redefined automatically.

    Parameters
    ----------
    density : ndarray
        Array of density values. Shape and size must be equal to those of
        the simulation grid. Must have units of density.
    """
    assert density.shape == self.domain_shape, """
    Specified density array shape {} does not match simulation grid {}.
    """.format(density.shape, self.domain_shape)
    self._density = density

# Momentum
def momentum(self, momentum: u.kg/(u.m**2 * u.s)):
    """Sets the simulation's momentum profile to the specified array.

    Other arrays which depend on the velocity values, such as the kinetic
    pressure, are then redefined automatically.

    Parameters
    ----------
    momentum : ndarray
        Array of momentum vectors. Shape must be (3, x, [y, z]), where x,
        y and z are the dimensions of the simulation grid. Note that a
        full 3D vector is necessary even if the simulation has fewer than
        3 dimensions.
    """
    assert momentum.shape == (3, *self.domain_shape), """
    Specified momentum array shape {} does not match simulation grid {}.
    """.format(momentum.shape, (3, *self.domain_shape))
    self._momentum = momentum

# Internal energy
def energy(self, energy: u.J/u.m**3):
    """Sets the simulation's total energy density profile to the specified
    array.

    Other arrays which depend on the energy values, such as the kinetic
    pressure, are then redefined automatically.

    Parameters
    ----------
    energy : ndarray
        Array of energy values. Shape must be (x, y, z), where x, y, and z
        are the grid sizes of the simulation in the x, y, and z dimensions.
        Must have units of energy.
    """
    assert energy.shape == self.domain_shape, """
    Specified energy array shape {} does not match simulation grid {}.
    """.format(energy.shape, self.domain_shape)
    self._energy = energy

# Magnetic field
def velocity(self, velocity: u.m / u.s):
    """Defines the velocity throughout the simulation, and automatically
    updates the momentum based on the current density values.

    Parameters
    ----------
    velocity : ndarray
        Array of velocity vectors with shape (3, x, [y, z]) where x, y and
        z are the spatial grid sizes of the simulation. Note that a full
        3D vector is required even if the simulation is run for fewer than
        3 dimensions. Must have units of velocity.
    """
    assert velocity.shape == (3, *self.domain_shape), \
        """Specified velocity array shape does not match simulation grid."""
    self.momentum = velocity * self.density
def rho_as_angle(rho, eph):
    """Projected linear distance to angular distance.

    Parameters
    ----------
    rho : `~astropy.units.Quantity`
        Projected distance in units of length.

    eph : dictionary-like or `~sbpy.data.Ephem`
        Ephemerides; requires geocentric distance as `delta`.

    Returns
    -------
    rho_a : `~astropy.units.Quantity`

    """
    if rho.unit.is_equivalent(u.m):
        rho_a = np.arctan(rho / eph['delta'].to(u.m))
    else:
        assert rho.unit.is_equivalent(u.rad)
        rho_a = rho

    return rho_a
def rho_as_length(rho, eph):
    """Angular distance to projected linear distance.

    Parameters
    ----------
    rho : `~astropy.units.Quantity`
        Projected distance in units of angle.

    eph : dictionary-like or `~sbpy.data.Ephem`
        Ephemerides; requires geocentric distance as `delta`.

    Returns
    -------
    rho_l : `~astropy.units.Quantity`

    """
    if rho.unit.is_equivalent(u.rad):
        rho_l = eph['delta'].to(u.m) * np.tan(rho)
    else:
        assert rho.unit.is_equivalent(u.m)
        rho_l = rho

    return rho_l
def setup(self):
    mjd = np.linspace(55555., 56555., 256)
    self.data = RVData(mjd, np.sin(mjd)*u.km/u.s,
                       stddev=0.1*np.random.uniform(size=mjd.size)*u.km/u.s)

    n = 128
    samples = dict()
    samples['P'] = np.random.uniform(size=n) * u.day
    samples['phi0'] = np.random.uniform(size=n) * u.radian
    samples['omega'] = np.random.uniform(size=n) * u.radian
    samples['ecc'] = np.random.uniform(size=n) * u.one
    samples['jitter'] = np.random.uniform(size=n) * u.m/u.s
    self.samples = samples
    self.n = n
def test_shit():
    joker_params = JokerParams(P_min=8*u.day, P_max=32768*u.day, jitter=0*u.m/u.s)
    joker = TheJoker(joker_params)

    t = np.random.uniform(0, 250, 16) + 56831.324
    t.sort()

    rv = np.cos(t)
    rv_err = np.random.uniform(0.1, 0.2, t.size)

    data = RVData(t=t, rv=rv*u.km/u.s, stddev=rv_err*u.km/u.s)
    samples = joker.sample_prior(size=16384)

    chunk = []
    for k in samples:
        chunk.append(np.array(samples[k]))
    chunk = np.ascontiguousarray(np.vstack(chunk).T)

    t0 = time.time()
    cy_ll = batch_marginal_ln_likelihood(chunk, data, joker_params)
    print("Cython:", time.time() - t0)

    t0 = time.time()
    n_chunk = len(chunk)
    py_ll = np.zeros(n_chunk)
    for i in range(n_chunk):
        try:
            py_ll[i] = marginal_ln_likelihood(chunk[i], data, joker_params)
        except Exception as e:
            py_ll[i] = np.nan
    print("Python:", time.time() - t0)

    assert np.allclose(np.array(cy_ll), py_ll)
def test_plotting():
    # check that plotting at least succeeds with allowed arguments
    t = np.random.uniform(55555., 56012., size=128)
    rv = 100 * np.sin(0.5*t) * u.km/u.s
    ivar = 1 / (np.random.normal(0, 5, size=t.size)*u.km/u.s)**2

    data = RVData(t=t, rv=rv, ivar=ivar)

    data.plot()

    # style
    data.plot(color='r')

    # custom axis
    fig, ax = plt.subplots(1, 1)
    data.plot(ax=plt.gca())

    # formatting
    data.plot(rv_unit=u.m/u.s)
    data.plot(rv_unit=u.m/u.s, time_format='jd')
    data.plot(rv_unit=u.m/u.s, time_format=lambda x: x.utc.mjd)

    # try with no errors
    data = RVData(t=t, rv=rv)
    data.plot()
    data.plot(ecolor='r')

    plt.close('all')
def GetAltAzHESS(coords, date='2016-06-06 00:00:00'):
    '''
    Get AltAz of a source at a given date for the HESS site

    Parameters
    ----------
    coords : CoordinatesHandler object
    date : date for which the result is valid. Format is YYYY-MM-DD HH:MM:SS
    '''
    # Site Location
    print("At HESS Location at ", date)
    location = EarthLocation(lat='-23d16m18s', lon='16d30m00s', height=1800*u.m)
    time = Time(date, format='iso', scale='utc')
    return _GetAltAz(coords, location, time)
def GetAltAzLAPALMA(coords, date='2016-06-06 00:00:00'):
    '''
    Get AltAz of a source at a given date for the La Palma site

    Parameters
    ----------
    coords : CoordinatesHandler object
    date : date for which the result is valid. Format is YYYY-MM-DD HH:MM:SS
    '''
    # Site Location
    print("At CTA North Location at ", date)
    location = EarthLocation(lat='28d45m43s', lon='-17d53m24s', height=1800*u.m)
    time = Time(date, format='iso', scale='utc')
    return _GetAltAz(coords, location, time)
def create_configuration_from_file(antfile: str, name: str = None, location: EarthLocation = None,
                                   mount: str = 'altaz', names: str = "%d", frame: str = 'local',
                                   diameter=35.0, meta: dict = None, rmax=None,
                                   **kwargs) -> Configuration:
    """ Define from a file

    :param names:
    :param antfile: Antenna file name
    :param name: Name of array e.g. 'LOWBD2'
    :param location:
    :param mount: mount type: 'altaz', 'xy'
    :param frame: 'local' | 'global'
    :param diameter: Effective diameter of station or antenna
    :param meta: Any meta info
    :return: Configuration
    """
    antxyz = numpy.genfromtxt(antfile, delimiter=",")
    assert antxyz.shape[1] == 3, ("Antenna array has wrong shape %s" % antxyz.shape)
    if frame == 'local':
        latitude = location.geodetic[1].to(u.rad).value
        antxyz = xyz_at_latitude(antxyz, latitude)
    if rmax is not None:
        lantxyz = antxyz - numpy.average(antxyz, axis=0)
        r = numpy.sqrt(lantxyz[:, 0] ** 2 + lantxyz[:, 1] ** 2 + lantxyz[:, 2] ** 2)
        antxyz = antxyz[r < rmax]
        log.debug('create_configuration_from_file: Maximum radius %.1f m includes %d antennas/stations' %
                  (rmax, antxyz.shape[0]))

    nants = antxyz.shape[0]
    anames = [names % ant for ant in range(nants)]
    mounts = numpy.repeat(mount, nants)
    fc = Configuration(location=location, names=anames, mount=mounts, xyz=antxyz, frame=frame,
                       diameter=diameter)
    return fc
def __init__(self, domain_x, domain_y, domain_z, gamma=5/3):
    # Define domain sizes
    self.x = domain_x
    self.y = domain_y
    self.z = domain_z

    x, y, z = self.x.si.value, self.y.si.value, self.z.si.value
    self.grid = np.meshgrid(x, y, z, indexing='ij') * u.m
    self.grid = np.squeeze(self.grid)

    self.domain_shape = []
    for length in (len(x), len(y), len(z)):
        if length > 1:
            self.domain_shape.append(length)
    self.domain_shape = tuple(self.domain_shape)
    print(self.domain_shape)

    self.gamma = gamma

    # Initiate core plasma variables
    self._density = np.zeros(self.domain_shape) * u.kg / u.m**3
    self._momentum = np.zeros((3, *self.domain_shape)) * u.kg \
        / (u.m**2 * u.s)
    self._energy = np.zeros(self.domain_shape) * u.J / u.m**3
    self._magnetic_field = np.zeros((3, *self.domain_shape)) * u.T

    # Collect core variables into a list for usefulness
    self.core_variables

    # Connect a simulation object for simulating
    self.simulation_physics = MHDSimulation(self)
def _planck(self, lam, Teff):
    """
    Computes monochromatic blackbody intensity in W/m^3 using the
    Planck function.

    @lam: wavelength in m
    @Teff: effective temperature in K

    Returns: monochromatic blackbody intensity
    """
    return 2*self.h*self.c*self.c/lam**5 * 1./(np.exp(self.h*self.c/lam/self.k/Teff)-1)
def loadFile(input_file):
    df = pd.read_csv(input_file).dropna()

    x = df['stereo:estimated_direction:x']
    y = df['stereo:estimated_direction:y']
    z = df['stereo:estimated_direction:z']

    _, lat, lon = cartesian_to_spherical(
        x.values * u.m, y.values * u.m, z.values * u.m)

    alt = Angle(90 * u.deg - lat).degree
    az = Angle(lon).wrap_at(180 * u.deg).degree

    df['alt'] = alt
    df['az'] = az

    return df
def altAz(self, df):
    x = df['stereo:estimated_direction:x']
    y = df['stereo:estimated_direction:y']
    z = df['stereo:estimated_direction:z']

    _, lat, lon = cartesian_to_spherical(
        x * u.m, y * u.m, z * u.m)

    alt = Angle(90 * u.deg - lat).degree
    az = Angle(lon).wrap_at(180 * u.deg).degree

    return alt, az
def dbquantity():
    return DBQuantity(column("value"), u.m)
def test_dbquantity_add_same_units(dbquantity):
    expr = (dbquantity + 100*u.m).value  # value + :value_1
    value_1 = expr.right.value
    assert_almost_equal(value_1, 100)
def test_dbquantity_radd(dbquantity):
    expr = (100*u.m + dbquantity).value  # :value_1 + value
    value_1 = expr.left.value
    assert_almost_equal(value_1, 100)
def test_dbquantity_sub_same_untis(dbquantity):
    expr = (dbquantity - 100*u.m).value  # value - :value_1
    value_1 = expr.right.value
    assert_almost_equal(value_1, 100)
def test_dbquantity_rsub_same_units(dbquantity):
    expr = (100*u.m - dbquantity).value  # :value_1 - value
    value_1 = expr.left.value
    assert_almost_equal(value_1, 100)
def test_dbquantity_div_another_unit(dbquantity):
    q = dbquantity/(2*u.m)
    expr = q.value
    value_1 = expr.right.value
    assert_almost_equal(value_1, 2)
    assert q.unit == u.Unit("")
def test_dbquantity_greater_same_units(dbquantity):
    expr = dbquantity > 100*u.m  # value > :value_1
    value_1 = expr.right.value
    assert_almost_equal(value_1, 100)
def test_exponential():
    p = exponential(x="z", A="P_0", x_0="z_0", x_s="z_s")
    P_0 = 100.0*apu.kPa
    z_0 = 0.0*apu.m
    z_s = -100.0*apu.m
    z = apu.Quantity(np.linspace(0, 1000., 100), "m")
    p.set_param_values(P_0=P_0, z_0=z_0, z_s=z_s)
    assert_allclose(p(z).value, (P_0*np.exp((z-z_0)/z_s)).value)
    assert str(p(z).unit) == str(P_0.unit)
def convert_time(in_time):
    """Converts time to seconds since epoch

    Args:
        in_time (str): time obtained from header's keyword DATE-OBS

    Returns:
        time in seconds since epoch

    """
    return time.mktime(time.strptime(in_time, "%Y-%m-%dT%H:%M:%S.%f"))
def print_default_args(args):
    """Print default values of arguments.

    This is mostly helpful for debug but people not familiar with the
    software might find it useful as well

    Notes:
        This function is deprecated.

    Notes:
        This is not dynamically updated so use with caution

    Args:
        args (object): An argparse instance

    """
    arg_name = {'auto_clean': '--auto-clean',
                'clean_cosmic': '-c, --cosmic',
                'debug_mode': '--debug',
                'flat_normalize': '--flat-normalize',
                'ignore_bias': '--ignore-bias',
                'log_to_file': '--log-to-file',
                'norm_order': '--flat-norm-order',
                'raw_path': '--raw-path',
                'red_path': '--red-path',
                'saturation_limit': '--saturation',
                'destiny': '-d --proc-path',
                'interactive_ws': '-i --interactive',
                'lamp_all_night': '-r --reference-lamp',
                'lamp_file': '-l --lamp-file',
                'output_prefix': '-o --output-prefix',
                'pattern': '-s --search-pattern',
                'procmode': '-m --proc-mode',
                'reference_dir': '-R --reference-files',
                'source': '-p --data-path',
                'save_plots': '--save-plots',
                'dcr_par_dir': '--dcr-par-dir'}
    for key in args.__dict__:
        log_ccd.debug('Default value for {:s} is {:s}'.format(
            arg_name[key],
            str(args.__getattribute__(key))))
def test_from_flam(self):
    fluxd = 6.730018324465526e-14 * u.W / u.m**2 / u.um
    aper = 1 * u.arcsec
    eph = dict(rh=1.5 * u.au, delta=1.0 * u.au)
    S = 1869 * u.W / u.m**2 / u.um
    afrho = Afrho.from_fluxd(None, fluxd, aper, eph, S=S)
    assert np.isclose(afrho.cm, 1000)
def test_fluxd(self):
    afrho = Afrho(1000, 'cm')
    aper = 1 * u.arcsec
    eph = dict(rh=1.5 * u.au, delta=1.0 * u.au)
    S = 1869 * u.W / u.m**2 / u.um
    fluxd = afrho.fluxd(None, aper, eph, S=S)
    assert fluxd.unit.is_equivalent(S.unit)
    assert np.isclose(fluxd.value, 6.730018324465526e-14)
def test_from_flam(self):
    fluxd = 3.824064455850402e-15 * u.W / u.m**2 / u.um
    wave = 10 * u.um
    aper = 1 * u.arcsec
    eph = dict(rh=1.5 * u.au, delta=1.0 * u.au)
    efrho = Efrho.from_fluxd(wave, fluxd, aper, eph)
    assert np.isclose(efrho.cm, 1000)
def __init__(self, Q, v):
    assert isinstance(Q, u.Quantity)
    assert Q.unit.is_equivalent((u.s**-1, u.mol / u.s))
    self.Q = Q

    assert isinstance(v, u.Quantity)
    assert v.unit.is_equivalent(u.m / u.s)
    self.v = v
def __init__(self, Q, v, parent, daughter=None):
    super(Haser, self).__init__(Q, v)

    bib.register('activity.gas.Haser', {'model': '1957BSRSL..43..740H'})

    assert isinstance(parent, u.Quantity)
    assert parent.unit.is_equivalent(u.m)
    self.parent = parent

    if daughter is None:
        self.daughter = None
    else:
        assert isinstance(daughter, u.Quantity)
        assert daughter.unit.is_equivalent(u.m)
        self.daughter = daughter
def _convert_unit(self, rho, eph):
    """Make units match those of self."""
    if not self.dim.unit.is_equivalent(rho.unit):
        if rho.unit.is_equivalent(u.m):
            x = rho_as_angle(rho, eph)
        else:
            x = rho_as_length(rho, eph)
    else:
        x = rho

    return x
def create_named_configuration(name: str = 'LOWBD2', **kwargs) -> Configuration:
    """ Standard configurations e.g. LOWBD2, MIDBD2

    :param name: name of Configuration LOWBD2, LOWBD1, LOFAR, VLAA
    :param rmax: Maximum distance of station from the average (m)
    :return:

    For LOWBD2, setting rmax gives the following number of stations
        100.0       13
        300.0       94
        1000.0      251
        3000.0      314
        10000.0     398
        30000.0     476
        100000.0    512
    """

    if name == 'LOWBD2':
        location = EarthLocation(lon="116.4999", lat="-26.7000", height=300.0)
        fc = create_configuration_from_file(antfile=arl_path("data/configurations/LOWBD2.csv"),
                                            location=location, mount='xy', names='LOWBD2_%d',
                                            diameter=35.0, **kwargs)
    elif name == 'LOWBD1':
        location = EarthLocation(lon="116.4999", lat="-26.7000", height=300.0)
        fc = create_configuration_from_file(antfile=arl_path("data/configurations/LOWBD1.csv"),
                                            location=location, mount='xy', names='LOWBD1_%d',
                                            diameter=35.0, **kwargs)
    elif name == 'LOWBD2-CORE':
        location = EarthLocation(lon="116.4999", lat="-26.7000", height=300.0)
        fc = create_configuration_from_file(antfile=arl_path("data/configurations/LOWBD2-CORE.csv"),
                                            location=location, mount='xy', names='LOWBD2_%d',
                                            diameter=35.0, **kwargs)
    elif name == 'LOFAR':
        assert get_parameter(kwargs, "meta", False) is False
        fc = create_LOFAR_configuration(antfile=arl_path("data/configurations/LOFAR.csv"))
    elif name == 'VLAA':
        location = EarthLocation(lon="-107.6184", lat="34.0784", height=2124.0)
        fc = create_configuration_from_file(antfile=arl_path("data/configurations/VLA_A_hor_xyz.csv"),
                                            location=location, mount='altaz', names='VLA_%d',
                                            diameter=25.0, **kwargs)
    elif name == 'VLAA_north':
        location = EarthLocation(lon="-107.6184", lat="90.000", height=2124.0)
        fc = create_configuration_from_file(antfile=arl_path("data/configurations/VLA_A_hor_xyz.csv"),
                                            location=location, mount='altaz', names='VLA_%d',
                                            diameter=25.0, **kwargs)
    else:
        raise ValueError("No such Configuration %s" % name)
    return fc
def compute_ck2004_response(self, path, verbose=False):
    """
    Computes Castelli & Kurucz (2004) intensities across the entire
    range of model atmospheres.

    @path: path to the directory containing ck2004 SEDs
    @verbose: switch to determine whether computing progress should
    be printed on screen

    Returns: n/a
    """
    models = glob.glob(path+'/*M1.000*')
    Nmodels = len(models)

    # Store the length of the filename extensions for parsing:
    offset = len(models[0])-models[0].rfind('.')

    Teff, logg, abun = np.empty(Nmodels), np.empty(Nmodels), np.empty(Nmodels)
    InormE, InormP = np.empty(Nmodels), np.empty(Nmodels)

    if verbose:
        print('Computing Castelli & Kurucz (2004) passband intensities for %s:%s. This will take a while.' % (self.pbset, self.pbname))

    for i, model in enumerate(models):
        #~ spc = np.loadtxt(model).T -- waaay slower
        spc = np.fromfile(model, sep=' ').reshape(-1, 2).T

        Teff[i] = float(model[-17-offset:-12-offset])
        logg[i] = float(model[-11-offset:-9-offset])/10
        sign = 1. if model[-9-offset] == 'P' else -1.
        abun[i] = sign*float(model[-8-offset:-6-offset])/10

        spc[0] /= 1e10  # AA -> m
        spc[1] *= 1e7   # erg/s/cm^2/A -> W/m^3

        wl = spc[0][(spc[0] >= self.ptf_table['wl'][0]) & (spc[0] <= self.ptf_table['wl'][-1])]
        fl = spc[1][(spc[0] >= self.ptf_table['wl'][0]) & (spc[0] <= self.ptf_table['wl'][-1])]
        fl *= self.ptf(wl)
        flP = fl*wl
        InormE[i] = np.log10(fl.sum()/self.ptf_area*(wl[1]-wl[0]))          # energy-weighted intensity
        InormP[i] = np.log10(flP.sum()/self.ptf_photon_area*(wl[1]-wl[0]))  # photon-weighted intensity
        if verbose:
            if 100*i % (len(models)) == 0:
                print('%d%% done.' % (100*i/(len(models)-1)))

    # Store axes (Teff, logg, abun) and the full grid of Inorm, with
    # nans where the grid isn't complete.
    self._ck2004_axes = (np.unique(Teff), np.unique(logg), np.unique(abun))

    self._ck2004_energy_grid = np.nan*np.ones((len(self._ck2004_axes[0]), len(self._ck2004_axes[1]), len(self._ck2004_axes[2]), 1))
    self._ck2004_photon_grid = np.nan*np.ones((len(self._ck2004_axes[0]), len(self._ck2004_axes[1]), len(self._ck2004_axes[2]), 1))
    for i, I0 in enumerate(InormE):
        self._ck2004_energy_grid[Teff[i] == self._ck2004_axes[0], logg[i] == self._ck2004_axes[1], abun[i] == self._ck2004_axes[2], 0] = I0
    for i, I0 in enumerate(InormP):
        self._ck2004_photon_grid[Teff[i] == self._ck2004_axes[0], logg[i] == self._ck2004_axes[1], abun[i] == self._ck2004_axes[2], 0] = I0

    # Tried radial basis functions but they were just terrible.
    #~ self._log10_Inorm_ck2004 = interpolate.Rbf(self._ck2004_Teff, self._ck2004_logg, self._ck2004_met, self._ck2004_Inorm, function='linear')

    self.content.append('ck2004')
    self.atmlist.append('ck2004')
def compute_ck2004_ldcoeffs(self, plot_diagnostics=False):
    if 'ck2004_all' not in self.content:
        print('Castelli & Kurucz (2004) intensities are not computed yet. Please compute those first.')
        return None

    self._ck2004_ld_energy_grid = np.nan*np.ones((len(self._ck2004_intensity_axes[0]), len(self._ck2004_intensity_axes[1]), len(self._ck2004_intensity_axes[2]), 11))
    self._ck2004_ld_photon_grid = np.nan*np.ones((len(self._ck2004_intensity_axes[0]), len(self._ck2004_intensity_axes[1]), len(self._ck2004_intensity_axes[2]), 11))
    mus = self._ck2004_intensity_axes[3]

    for Tindex in range(len(self._ck2004_intensity_axes[0])):
        for lindex in range(len(self._ck2004_intensity_axes[1])):
            for mindex in range(len(self._ck2004_intensity_axes[2])):
                IsE = 10**self._ck2004_Imu_energy_grid[Tindex, lindex, mindex, :].flatten()
                fEmask = np.isfinite(IsE)
                if len(IsE[fEmask]) <= 1:
                    continue
                IsE /= IsE[fEmask][-1]

                IsP = 10**self._ck2004_Imu_photon_grid[Tindex, lindex, mindex, :].flatten()
                fPmask = np.isfinite(IsP)
                IsP /= IsP[fPmask][-1]

                cElin, pcov = cfit(self._ldlaw_lin, mus[fEmask], IsE[fEmask], p0=[0.5])
                cElog, pcov = cfit(self._ldlaw_log, mus[fEmask], IsE[fEmask], p0=[0.5, 0.5])
                cEsqrt, pcov = cfit(self._ldlaw_sqrt, mus[fEmask], IsE[fEmask], p0=[0.5, 0.5])
                cEquad, pcov = cfit(self._ldlaw_quad, mus[fEmask], IsE[fEmask], p0=[0.5, 0.5])
                cEnlin, pcov = cfit(self._ldlaw_nonlin, mus[fEmask], IsE[fEmask], p0=[0.5, 0.5, 0.5, 0.5])
                self._ck2004_ld_energy_grid[Tindex, lindex, mindex] = np.hstack((cElin, cElog, cEsqrt, cEquad, cEnlin))

                cPlin, pcov = cfit(self._ldlaw_lin, mus[fPmask], IsP[fPmask], p0=[0.5])
                cPlog, pcov = cfit(self._ldlaw_log, mus[fPmask], IsP[fPmask], p0=[0.5, 0.5])
                cPsqrt, pcov = cfit(self._ldlaw_sqrt, mus[fPmask], IsP[fPmask], p0=[0.5, 0.5])
                cPquad, pcov = cfit(self._ldlaw_quad, mus[fPmask], IsP[fPmask], p0=[0.5, 0.5])
                cPnlin, pcov = cfit(self._ldlaw_nonlin, mus[fPmask], IsP[fPmask], p0=[0.5, 0.5, 0.5, 0.5])
                self._ck2004_ld_photon_grid[Tindex, lindex, mindex] = np.hstack((cPlin, cPlog, cPsqrt, cPquad, cPnlin))

                if plot_diagnostics:
                    if Tindex == 10 and lindex == 9 and mindex == 5:
                        print(self._ck2004_intensity_axes[0][Tindex], self._ck2004_intensity_axes[1][lindex], self._ck2004_intensity_axes[2][mindex])
                        print(mus, IsE)
                        print(cElin, cElog, cEsqrt)
                        import matplotlib.pyplot as plt
                        plt.plot(mus[fEmask], IsE[fEmask], 'bo')
                        plt.plot(mus[fEmask], self._ldlaw_lin(mus[fEmask], *cElin), 'r-')
                        plt.plot(mus[fEmask], self._ldlaw_log(mus[fEmask], *cElog), 'g-')
                        plt.plot(mus[fEmask], self._ldlaw_sqrt(mus[fEmask], *cEsqrt), 'y-')
                        plt.plot(mus[fEmask], self._ldlaw_quad(mus[fEmask], *cEquad), 'm-')
                        plt.plot(mus[fEmask], self._ldlaw_nonlin(mus[fEmask], *cEnlin), 'k-')
                        plt.show()

    self.content.append('ck2004_ld')
def create_background_image(ccd, profile_model, nsigma, separation):
    """Creates a background-only image

    Using a profile model and assuming the spectrum is misaligned only a
    little bit (i.e. a couple of pixels from end to end) with respect to the
    lines of the detector. The number of sigmas determines the width of the
    zone to be masked and the separation is an offset that is added.

    Args:
        ccd (object): A ccdproc.CCDData instance.
        profile_model (object): An astropy.modeling.Model instance. Describes
            the spatial profile of the target.
        nsigma (float): Number of sigmas. Used to calculate the width of the
            target zone to be masked.
        separation (float): Additional offset that adds to the width of the
            target zone.

    Returns:
        A ccdproc.CCDData instance with the spectrum masked.

    """
    background_ccd = ccd.copy()
    spatial_length, dispersion_length = background_ccd.data.shape
    target_profiles = []
    if 'CompoundModel' in profile_model.__class__.name:
        log_spec.debug(profile_model.submodel_names)
        for m in range(len(profile_model.submodel_names)):
            submodel_name = profile_model.submodel_names[m]
            target_profiles.append(profile_model[submodel_name])
    else:
        target_profiles.append(profile_model)

    for target in target_profiles:
        target_mean = target.mean.value
        target_stddev = target.stddev.value

        data_low_lim = np.int(np.max(
            [0, target_mean - (nsigma / 2. + separation) * target_stddev]))

        data_high_lim = np.int(np.min([spatial_length, int(
            target_mean + (nsigma / 2. + separation) * target_stddev)]))

        background_ccd.data[data_low_lim:data_high_lim, :] = 0

    if False:
        plt.title('Background Image')
        plt.imshow(background_ccd.data, clim=(0, 50))
        plt.show()

    return background_ccd