我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用astropy.units.Quantity()。
def t_E(self): """ *astropy.Quantity* The Einstein timescale. "day" is the default unit. Regardless of input value, returns value with units of u.day. May be set as a *float* --> assumes units of degrees. """ if 't_E' in self.parameters.keys(): self._check_time_quantity('t_E') return self.parameters['t_E'].to(u.day).value elif ('t_star' in self.parameters.keys() and 'rho' in self.parameters.keys()): return self.t_star/self.rho elif ('t_eff' in self.parameters.keys() and 'u_0' in self.parameters.keys()): return self.t_eff/self.u_0 else: raise KeyError("You're trying to access t_E that was not set")
def alpha(self): """ *astropy.Quantity* The angle of the source trajectory relative to the binary lens axis (or primary-secondary axis). Measured counterclockwise, i.e., according to convention advocated by `Skowron et al. 2011 (ApJ, 738, 87) <http://adsabs.harvard.edu/abs/2011ApJ...738...87S>`_. May be set as a *float* --> assumes "deg" is the default unit. Regardless of input value, returns value in degrees. """ if not isinstance(self.parameters['alpha'], u.Quantity): self.parameters['alpha'] = self.parameters['alpha'] * u.deg return self.parameters['alpha'].to(u.deg)
def _set_single_mass(self, new_mass): """ Initializes total_mass and epsilon if only one mass componenet is defined (i.e. a point lens). """ if isinstance(new_mass, u.Quantity): if new_mass.unit.physical_type == 'dimensionless': new_mass *= u.solMass elif new_mass.unit.physical_type != 'mass': msg = 'wrong physical_type of new total_mass: {:}' raise ValueError(msg.format(new_mass.unit.physical_type)) self._total_mass = new_mass else: self._total_mass = new_mass * u.solMass self._epsilon = np.array([1.0])
def _add_mass(self, new_mass, index): """ Private function: Updates the total_mass and adds a component to the epsilon array if masses are added sequentially. e.g. the lens is defined by defining mass_1 and mass_2. """ if not isinstance(new_mass, u.Quantity): new_mass *= u.solMass elif new_mass.unit.physical_type == 'dimensionless': new_mass *= u.solMass elif new_mass.unit.physical_type != 'mass': msg = 'wrong physical_type of new total_mass: {:}' raise ValueError(msg.format(new_mass.unit.physical_type)) new_total_mass = self._total_mass + new_mass self._epsilon = self._total_mass * self._epsilon / new_total_mass self._epsilon = np.insert( self._epsilon, index, new_mass / new_total_mass) self._total_mass = new_total_mass
def radial_search(self, inp, radius, **kwargs): """ Search for sources in a radius around the input coord Parameters ---------- inp : str or tuple or SkyCoord See linetools.utils.radec_to_coord for details Single coordinate radius : Angle or Quantity Tolerance for a match Returns ------- idx : int array Catalog IDs corresponding to match in order of increasing separation Returns an empty array if there is no match """ raise DeprecationWarning("THIS METHOD HAS BEEN DEPRECATED. USE query_position()")
def ioneq(self): """ Ionization equilibrium data interpolated to the given temperature Note ---- Will return NaN where interpolation is out of range of the data. For computing ionization equilibrium outside of this temperature range, it is better to use `fiasco.Element.equilibrium_ionization` """ f = interp1d(self._ioneq[self._dset_names['ioneq_filename']]['temperature'], self._ioneq[self._dset_names['ioneq_filename']]['ionization_fraction'], kind='linear', bounds_error=False, fill_value=np.nan) ioneq = f(self.temperature) isfinite = np.isfinite(ioneq) ioneq[isfinite] = np.where(ioneq[isfinite] < 0., 0., ioneq[isfinite]) return u.Quantity(ioneq)
def excitation_autoionization_rate(self): """ Calculate ionization rate due to excitation autoionization """ # Collision constant c = (const.h.cgs**2)/((2. * np.pi * const.m_e.cgs)**(1.5) * np.sqrt(const.k_B.cgs)) kBTE = u.Quantity([(const.k_B.cgs * self.temperature) / (de.to(u.erg)) for de in self._easplups['delta_energy']]) # Descale upsilon shape = self._easplups['bt_upsilon'].shape xs = np.tile(np.linspace(0, 1, shape[1]), shape[0]).reshape(shape) args = [xs, self._easplups['bt_upsilon'].value, kBTE.value, self._easplups['bt_c'].value, self._easplups['bt_type']] upsilon = u.Quantity(list(map(self.burgess_tully_descale, *args))) # Rate coefficient rate = c * upsilon * np.exp(-1 / kBTE) / np.sqrt(self.temperature[np.newaxis,:]) return rate.sum(axis=0)
def __getitem__(self, key): if type(key) is int: raise NotImplementedError('Iteration not supported.') with h5py.File(self.hdf5_dbase_root, 'r') as hf: grp = hf[self.top_level_path] if key not in grp: raise IndexError('{} not found in {} filetype'.format(key, self.top_level_path)) ds = grp[key] if isinstance(ds, h5py.Group): data = DataIndexer(self.hdf5_dbase_root, '/'.join([self.top_level_path, key])) else: if ds.attrs['unit'] == 'SKIP' or ds.dtype == 'object': data = np.array(ds, dtype=ds.dtype) else: data = u.Quantity(np.array(ds), ds.attrs['unit'], dtype=ds.dtype) if '|S' in data.dtype.str: data = data.astype(str) return data
def strip_units(self): """ Strips units from an xypoint structure. Returns: A copy of the xypoint with no units The x-units the y-units """ xunits = self.x.unit if isinstance(self.x, u.Quantity) else 1.0 yunits = self.y.unit if isinstance(self.y, u.Quantity) else 1.0 x = self.x.value if isinstance(self.x, u.Quantity) else self.x y = self.y.value if isinstance(self.y, u.Quantity) else self.y err = self.err.value if isinstance(self.err, u.Quantity) else self.err cont = self.cont.value if isinstance(self.cont, u.Quantity) else self.cont return xypoint(x=x, y=y, cont=cont, err=err), xunits, yunits
def integrate(self, wavelength_grid): """ Integrate the spectrum flux over the specified grid of wavelengths. Parameters ---------- wavelength_grid : quantity_like Returns ------- integrated_flux : :class:`~astropy.units.Quantity` """ grid = u.Quantity(wavelength_grid) grid = grid.to(self.wavelength.unit) interpolator = interp1d(self.wavelength.value, self.flux.value, kind='cubic') new_flux = interpolator(grid.value) return simps(new_flux, x=grid.value) * self.flux.unit * grid.unit
def _make_quantity(data, unit): """ Get a LogQuantity if the a LogUnit is used rather than a regular Quantity. Parameters ---------- data: numpy.ndarray the data unit: ~astropy.unit.Unit or ~astropy.unit.LogUnit The data units Returns ------- ~astropy.unit.Quantity or ~astropy.unit.LogQuantity depending on the unit type """ if isinstance(unit, LogUnit): return LogQuantity(data, unit=unit) else: return Quantity(data, unit=unit)
def column_density(self, rho, eph=None): """Coma column density at a projected distance from nucleus. Parameters ---------- rho : `~astropy.units.Quantity` Projected distance of the region of interest on the plane of the sky in units of length or angle. eph : dictionary-like or `~sbpy.data.Ephem` Ephemerides at epoch; requires geocentric distance as `delta` keyword if aperture has angular units. Returns ------- sigma : float """ pass
def total_number(self, aper, eph=None): """Total number of molecules in aperture. Parameters ---------- aper : `~astropy.units.Quantity` or `~sbpy.activity.Aperture` Observation aperture as a radius for a circular aperture (projected length, or angle) or an `Aperture` instance. eph : dictionary-like or `~sbpy.data.Ephem`, optional Ephemerides at epoch; requires geocentric distance as `delta` keyword if aperture has angular units. Returns ------- N : int """ pass
def column_density(self, rho, eph=None): from .core import rho_as_length assert isinstance(rho, u.Quantity) r = rho_as_length(rho, eph=eph) x = 0 if self.parent is None else (r / self.parent).decompose() y = 0 if self.daughter is None else (r / self.daughter).decompose() sigma = self.Q / 2 / np.pi / r / self.v if self.daughter is None or self.daughter == 0: sigma *= np.pi / 2 - self._iK0(x) elif self.parent is None or self.parent == 0: sigma *= np.pi / 2 - self._iK0(y) else: sigma *= (self.daughter / (self.parent - self.daughter) * (self._iK0(y) - self._iK0(x))) return sigma.decompose()
def rho_as_angle(rho, eph): """Projected linear distance to angular distance. Parameters ---------- rho : `~astropy.units.Quantity` Projected distance in units of length. eph : dictionary-like or `~sbpy.data.Ephem` Ephemerides; requires geocentric distance as `delta`. Returns ------- rho_l : `~astropy.units.Quantity` """ if rho.unit.is_equivalent(u.m): rho_a = np.arctan(rho / eph['delta'].to(u.m)) else: assert rho.unit.is_equivalent(u.rad) rho_a = rho return rho_a
def rho_as_length(rho, eph): """Angular distance to projected linear distance. Parameters ---------- rho : `~astropy.units.Quantity` Projected distance in units of angle. eph : dictionary-like or `~sbpy.data.Ephem` Ephemerides; requires geocentric distance as `delta`. Returns ------- rho_l : `~astropy.units.Quantity` """ if rho.unit.is_equivalent(u.rad): rho_l = eph['delta'].to(u.m) * np.tan(rho) else: assert rho.unit.is_equivalent(u.m) rho_l = rho return rho_l
def __call__(self, rho, eph=None): """Evaluate the aperture. Parameters ---------- rho : `~astropy.units.Quantity` Position to evaluate, in units of length or angle. eph : dictionary-like or `~sbpy.data.Ephem`, optional Ephemerides at epoch; requires geocentric distance as `delta` keyword if the aperture's units and `rho`'s units do not match. """ x = self._convert_unit(rho, eph) # normalize to 1.0 at the center? return np.exp(-x**2 / self.sigma**2 / 2)
def _process_map(self, tmp_map, crop=None, resample=None): """ Rotate, crop and resample map if needed. Can do any other needed processing here too. Parameters ---------- map : `~sunpy.map.Map` Original HMI map crop : `tuple` `[bottom_left_corner,top_right_corner]`, optional The lower left and upper right corners of the cropped map. Both should be of type `~astropy.units.Quantity` and have the same units as `map.xrange` and `map.yrange` resample : `~astropy.units.Quantity`, `[new_xdim,new_ydim]`, optional The new x- and y-dimensions of the resampled map, should have the same units as `map.dimensions.x` and `map.dimensions.y` """ tmp_map = tmp_map.rotate() if crop is not None: bottom_left = SkyCoord(*crop[0], frame=tmp_map.coordinate_frame) top_right = SkyCoord(*crop[1], frame=tmp_map.coordinate_frame) tmp_map = tmp_map.submap(bottom_left, top_right) if resample is not None: tmp_map = tmp_map.resample(resample, method='linear') return tmp_map
def _filter_streamlines(self, streamline, close_threshold=0.05, loop_length_range: u.cm =[2.e+9, 5.e+10]*u.cm, **kwargs): """ Check extracted loop to make sure it fits given criteria. Return True if it passes. Parameters ---------- streamline : yt streamline object close_threshold : `float` percentage of domain width allowed between loop endpoints loop_length_range : `~astropy.Quantity` minimum and maximum allowed loop lengths (in centimeters) """ streamline = streamline[np.all(streamline != 0.0, axis=1)] loop_length = np.sum(np.linalg.norm(np.diff(streamline, axis=0), axis=1)) if np.fabs(streamline[0, 2] - streamline[-1, 2]) > close_threshold*self.extrapolated_3d_field.domain_width[2]: return False elif loop_length > loop_length_range[1].to(u.cm).value or loop_length < loop_length_range[0].to(u.cm).value: return False else: return True
def flatten_emissivities(channel, emission_model): """ Compute product between wavelength response and emissivity for all ions """ flattened_emissivities = [] for ion in emission_model: wavelength, emissivity = emission_model.get_emissivity(ion) if wavelength is None or emissivity is None: flattened_emissivities.append(None) continue interpolated_response = splev(wavelength.value, channel['wavelength_response_spline'], ext=1) em_summed = np.dot(emissivity.value, interpolated_response) unit = emissivity.unit*u.count/u.photon*u.steradian/u.pixel*u.cm**2 flattened_emissivities.append(u.Quantity(em_summed, unit)) return flattened_emissivities
def restore(cls, savefile): """ Restore the emission model from a JSON representation. """ with open(savefile, 'r') as f: restore_dict = json.load(f) temperature = u.Quantity(restore_dict['temperature'], restore_dict['temperature_unit']) density = u.Quantity(restore_dict['density'], restore_dict['density_unit']) ion_list = [Ion(ion, temperature, **ds) for ion, ds in zip(restore_dict['ion_list'], restore_dict['dset_names'])] emission_model = cls(density, *ion_list) if 'emissivity_savefile' in restore_dict: emission_model.emissivity_savefile = restore_dict['emissivity_savefile'] if 'ionization_fraction_savefile' in restore_dict: emission_model.ionization_fraction_savefile = restore_dict['ionization_fraction_savefile'] return emission_model
def non_equilibrium_ionization(self, time: u.s, temperature: u.K, density: u.cm**(-3), rate_matrix=None, initial_condition=None): """ Compute the ionization fraction in non-equilibrium for a given temperature and density timeseries. """ if rate_matrix is None: rate_matrix = self._rate_matrix() if initial_condition is None: initial_condition = self.equilibrium_ionization(rate_matrix=rate_matrix) interpolate_indices = [np.abs(self.temperature - t).argmin() for t in temperature] y = np.zeros(time.shape + (self.atomic_number+1,)) y[0, :] = initial_condition[interpolate_indices[0], :] identity = u.Quantity(np.eye(self.atomic_number + 1)) for i in range(1, time.shape[0]): dt = time[i] - time[i-1] term1 = identity - density[i]*dt/2.*rate_matrix[interpolate_indices[i], :, :] term2 = identity + density[i-1]*dt/2.*rate_matrix[interpolate_indices[i-1], :, :] y[i, :] = np.linalg.inv(term1) @ term2 @ y[i-1, :] y[i, :] = np.fabs(y[i, :]) y[i, :] /= y[i, :].sum() return u.Quantity(y)
def proton_collision_rate(self): """ Calculates the collision rate for de-exciting and exciting collisions for protons """ # Create scaled temperature--these are not stored in the file bt_t = np.vectorize(np.linspace, excluded=[0, 1], otypes='O')(0, 1, [ups.shape[0] for ups in self._psplups['bt_rate']]) # Get excitation rates directly from scaled data energy_ratio = np.outer(const.k_B.cgs*self.temperature, 1.0/self._psplups['delta_energy'].to(u.erg)) ex_rate = np.array(list(map(self.burgess_tully_descale, bt_t, self._psplups['bt_rate'], energy_ratio.T, self._psplups['bt_c'], self._psplups['bt_type']))) ex_rate = u.Quantity(np.where(ex_rate > 0., ex_rate, 0.), u.cm**3/u.s).T # Calculation de-excitation rates from excitation rate omega_upper = 2.*self._elvlc['J'][self._psplups['upper_level'] - 1] + 1. omega_lower = 2.*self._elvlc['J'][self._psplups['lower_level'] - 1] + 1. dex_rate = (omega_lower/omega_upper)*ex_rate*np.exp(1./energy_ratio) return dex_rate, ex_rate
def _set_time_quantity(self, key, new_time): """ Save a variable with units of time (e.g. t_E, t_star, t_eff). If units are not given, assume days. """ if isinstance(new_time, u.Quantity): self.parameters[key] = new_time else: self.parameters[key] = new_time * u.day
def _check_time_quantity(self, key): if not isinstance(self.parameters[key], u.Quantity): self._set_time_quantity(key, self.parameters[key])
def alpha(self, new_alpha): if isinstance(new_alpha, u.Quantity): self.parameters['alpha'] = new_alpha else: self.parameters['alpha'] = new_alpha * u.deg
def total_mass(self, new_mass): if not isinstance(new_mass, u.Quantity): new_mass *= u.solMass elif new_mass.unit.physical_type == 'dimensionless': new_mass *= u.solMass elif new_mass.unit.physical_type != 'mass': msg = 'wrong physical_type of new total_mass: {:}' raise ValueError(msg.format(new_mass.unit.physical_type)) self._total_mass = new_mass self._last_mass_set = 'total_mass'
def mass(self): """ *astropy.Quantity* The mass of a point lens --> total mass. An astropy.Quantity with mass units. May be set as a float (in which case solMass is assumed). """ if self._epsilon.size > 1: raise TypeError( 'mass can only be defined for a point lens. use total_mass' + 'for multiple bodies') else: return self.total_mass
def mass_1(self): """ *astropy.Quantity* The mass of the primary. Defined as total_mass * epsilon[0]. An *astropy.Quantity* with mass units. If set as a *float*, units are assumed to be solMass. """ return self.total_mass * self._epsilon[0]
def mass_2(self): """ *astropy.Quantity* The mass of the secondary. Defined as total_mass * epsilon[1]. An *astropy.Quantity* with mass units. If set as a *float*, units are assumed to be solMass. Note that if total_mass is defined before mass_2, and there is no epsilon corresponding to mass_2, mass_2 is added to the total_mass. """ return self.total_mass * self._epsilon[1]
def mass_3(self): """ *astropy.Quantity* The mass of the tertiary. Defined as total_mass * epsilon[2]. An *astropy.Quantity* with mass units. If set as a *float*, units are assumed to be solMass. Note that if total_mass is defined before mass_3, and there is no epsilon corresponding to mass_3, mass_3 is added to the total_mass. """ return self.total_mass * self._epsilon[2]
def distance(self): """ *astropy.Quantity* The distance to the lens. May be set as a *float*. If no unit is given, the value is assumed to be kpc. """ return self._distance
def distance(self, new_distance): if not isinstance(new_distance, u.Quantity): self._distance = new_distance * 1000. * u.pc else: if new_distance.unit.physical_type != 'distance': TypeError('Wrong type of new_distance!') if (new_distance.unit == "pc") or (new_distance.unit == "kpc"): self._distance = new_distance else: raise u.UnitsError( 'Allowed units for Lens distance are "pc" or "kpc"')
def pi_L(self): """ *astropy.Quantity* The parallax to the lens in milliarcseconds. May be set as a *float*, in which case units are assumed to be milliarcseconds. """ return self._distance.to(u.mas, equivalencies=u.parallax())
def a_proj(self): """ *astropy.Quantity* Projected separation between the components of the lens in AU. An *astropy.Quantity* with distance units. If set as *float* (without units), AU is assumed. """ raise NotImplementedError('a_proj is not used, e.g. to set s') return self._a_proj
def a_proj(self, new_a_proj): raise NotImplementedError('a_proj is not used, e.g. to set s') if not isinstance(new_distance, u.Quantity): new_a_proj = new_a_proj * u.au self._a_proj = new_a_proj
def mu_rel(self): """ *astropy.Quantity* Relative proper motion between the source and lens stars. If set as a *float*, units are assumed to be mas/yr. """ return self._mu_rel
def mu_rel(self, value): if isinstance(value, u.Quantity): self._mu_rel = value else: self._mu_rel = value * u.mas / u.yr
def t_E(self): """ *astropy.Quantity* The Einstein crossing time (in days). If set as a *float*, assumes units are in days. """ try: t_E = self.theta_E/self.mu_rel return t_E.to(u.day) except Exception: return None
def pi_rel(self): """ *astropy.Quantity*, read-only The source-lens relative parallax in milliarcseconds. """ return self.lens.pi_L.to(u.mas) - self.source.pi_S.to(u.mas)
def theta_E(self): """ *astropy.Quantity*, read-only The angular Einstein Radius in milliarcseconds. """ kappa = (4. * G / (c**2 * au)).to( u.mas/u.Msun, equivalencies=u.dimensionless_angles()) return np.sqrt( kappa * self.lens.total_mass.to(u.solMass) * self.pi_rel.to(u.mas))
def r_E(self): """ *astropy.Quantity*, read-only The physical size of the Einstein Radius in the Lens plane (in AU). """ return (self.lens.distance * self.theta_E.to( '', equivalencies=u.dimensionless_angles())).to(u.au)
def r_E_tilde(self): """ *astropy.Quantity*, read-only The physical size of the Einstein Radius projected onto the Observer plane (in AU). """ return self.r_E * self.source.distance / ( self.source.distance - self.lens.distance)
def distance(self, new_distance): if new_distance is None: self._distance = new_distance else: if not isinstance(new_distance, u.Quantity): self._distance = new_distance * 1000. * u.pc else: if (new_distance.unit == "pc") or (new_distance.unit == "kpc"): self._distance = new_distance else: raise u.UnitsError( 'Allowed units for Source distance are "pc" or "kpc"')
def pi_S(self): """ *astropy.Quantity* The parallax to the source in millarcseconds. May be set as a *float*. If no units are specified, assumes milliarcseconds (*u.mas*). """ return self._distance.to(u.mas, equivalencies=u.parallax())
def pi_S(self, new_value): if new_value is None: pass else: if not isinstance(new_value, u.Quantity): new_value = new_value * u.mas self._distance = new_value.to(u.pc, equivalencies=u.parallax())
def angular_radius(self): """ *astropy.Quantity* Angular radius of the source. May be set as a *float*. If units are not specified, assumed to be microarcseconds (*u.mas*). """ return self._angular_radius
def angular_radius(self, new_value): if not isinstance(new_value, u.Quantity) and new_value is not None: new_value = new_value * u.uas self._angular_radius = new_value
def coord_to_ID(self, coord, tol=0.5*u.arcsec, closest=True, **kwargs): """ Convert an input coord to an ID if matched within a given tolerance. If multiple sources are identified, return the closest unless closest=False Parameters ---------- coord : str or tuple or SkyCoord See linetools.utils.radec_to_coord Single coordinate tol : Quantity Angle closest : bool, optional If False, raise an error if multiple sources are within tol Returns ------- ID : int ID of the closest source to the input coord within the given tolerance """ # Catalog ids = self.radial_search(coord, tol, **kwargs) if len(ids) == 0: warnings.warn("No sources found at your coordinate within tol={:g}. Returning None".format(tol)) return None, None elif len(ids) > 1: if closest: warnings.warn("Found multiple sources in the catalog. Taking the closest one") else: raise IOError("Multiple sources within tol={:g}. Refine".format(tol)) # Finish ID = ids[0] return ID
def pairs(self, sep, dv): """ Generate a pair catalog Parameters ---------- sep : Angle or Quantity dv : Quantity Offset in velocity. Positive for projected pairs (i.e. dz > input value) Returns ------- """ # Checks if not isinstance(sep, (Angle, Quantity)): raise IOError("Input radius must be an Angle type, e.g. 10.*u.arcsec") if not isinstance(dv, (Quantity)): raise IOError("Input velocity must be a quantity, e.g. u.km/u.s") # Match idx, d2d, d3d = match_coordinates_sky(self.coords, self.coords, nthneighbor=2) close = d2d < sep # Cut on redshift if dv > 0.: # Desire projected pairs zem1 = self.cat['zem'][close] zem2 = self.cat['zem'][idx[close]] dv12 = ltu.dv_from_z(zem1,zem2) gdz = np.abs(dv12) > dv # f/g and b/g izfg = dv12[gdz] < 0*u.km/u.s ID_fg = self.cat[self.idkey][close][gdz][izfg] ID_bg = self.cat[self.idkey][idx[close]][gdz][izfg] else: pdb.set_trace() # Reload return ID_fg, ID_bg