我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用astropy.units.s()。
def density(self, density: u.kg/u.m**3): """Sets the simulation's density profile to the specified array. Other arrays which depend on the density values, such as the kinetic pressure, are then redefined automatically. Parameters ---------- density : ndarray Array of density values. Shape and size must be equal to those of the simulation grid. Must have units of density. """ assert density.shape == self.domain_shape, """ Specified density array shape {} does not match simulation grid {}. """.format(density.shape, self.domain_shape) self._density = density # Momentum
def momentum(self, momentum: u.kg/(u.m**2 * u.s)): """Sets the simulation's momentum profile to the specified array. Other arrays which depend on the velocity values, such as the kinetic pressure, are then redefined automatically. Parameters ---------- momentum : ndarray Array of momentum vectors. Shape must be (3, x, [y, z]), where x, y and z are the dimensions of the simulation grid. Note that a full 3D vector is necessary even if the simulation has fewer than 3 dimensions. """ assert momentum.shape == (3, *self.domain_shape), """ Specified density array shape {} does not match simulation grid {}. """.format(momentum.shape, (3, *self.domain_shape)) self._momentum = momentum # Internal energy
def energy(self, energy: u.J/u.m**3): """Sets the simulation's total energy density profile to the specified array. Other arrays which depend on the energy values, such as the kinetic pressure, are then redefined automatically. Parameters ---------- energy : ndarray Array of energy values. Shape must be (x, y, z), where x, y, and z are the grid sizes of the simulation in the x, y, and z dimensions. Must have units of energy. """ assert energy.shape == self.domain_shape, """ Specified density array shape {} does not match simulation grid {}. """.format(energy.shape, self.domain_shape) self._energy = energy # Magnetic field
def magnetic_field(self, magnetic_field: u.Tesla): """ Sets the simulation's magnetic field profile to the specified array. Other arrays which depend on the magnetic field, such as the magnetic pressure, are then redefined automatically. Parameters ---------- magnetic_field : ndarray Array of magnetic field values. Shape must be (3, x, [y, z]), where x, y, and z are the grid sizes of the simulation in the x, y, and z dimensions. Note that a full 3D vector is necessary even if the simulation has fewer than 3 dimensions. """ assert magnetic_field.shape == (3, *self.domain_shape), """ Specified density array shape {} does not match simulation grid {}. """.format(magnetic_field.shape, (3, *self.domain_shape)) self._magnetic_field = magnetic_field
def __init__(self, plasma): """ """ self.dt = 1e-4 * u.s self.current_iteration = 0 self.current_time = 0 * u.s self.plasma = plasma # Domain size # self.grid_size = grid_size # Physical parameters # self.gamma = gamma grids = (self.plasma.x.si, self.plasma.y.si, self.plasma.z.si) ranges = [grid for grid in grids if len(grid) > 1] stepsize = [range[1] - range[0] for range in ranges] * grids[0].unit self.solver = Solver(stepsize) # Collect equations into a nice easy-to-use list self.equations = [self._ddt_density, self._ddt_momentum, self._ddt_energy, self._ddt_magfield]
def direct_ionization_rate(self): """ Calculate direct ionization rate in cm3/s Needs an equation reference or explanation """ xgl, wgl = np.polynomial.laguerre.laggauss(12) kBT = const.k_B.cgs*self.temperature energy = np.outer(xgl, kBT)*kBT.unit + self.ip cross_section = self.direct_ionization_cross_section(energy) if cross_section is None: return None term1 = np.sqrt(8./np.pi/const.m_e.cgs)*np.sqrt(kBT)*np.exp(-self.ip/kBT) term2 = ((wgl*xgl)[:,np.newaxis]*cross_section).sum(axis=0) term3 = (wgl[:,np.newaxis]*cross_section).sum(axis=0)*self.ip/kBT return term1*(term2 + term3)
def ionization_rate(self): """ Total ionization rate. Includes contributions from both direct ionization and excitation-autoionization See Also -------- direct_ionization_rate excitation_autoionization_rate """ di_rate = self.direct_ionization_rate() di_rate = np.zeros(self.temperature.shape)*u.cm**3/u.s if di_rate is None else di_rate ea_rate = self.excitation_autoionization_rate() ea_rate = np.zeros(self.temperature.shape)*u.cm**3/u.s if ea_rate is None else ea_rate return di_rate + ea_rate
def recombination_rate(self): """ Total recombination rate. Includes contributions from dielectronic recombination and radiative recombination. See Also -------- radiative_recombination_rate dielectronic_recombination_rate """ rr_rate = self.radiative_recombination_rate() rr_rate = np.zeros(self.temperature.shape)*u.cm**3/u.s if rr_rate is None else rr_rate dr_rate = self.dielectronic_recombination_rate() dr_rate = np.zeros(self.temperature.shape)*u.cm**3/u.s if dr_rate is None else dr_rate return rr_rate + dr_rate
def GetFluxRatio(sptlist, Tsec, xgrid): """ Returns the flux ratio between the secondary star of temperature Tsec and the (possibly multiple) primary star(s) given in the 'sptlist' list (given as spectral types) xgrid is a np.ndarray containing the x-coordinates to find the flux ratio at (in nm) """ prim_flux = np.zeros(xgrid.size) # Determine the flux from the primary star(s) for spt in sptlist: end = search("[0-9]", spt).end() T = MS.Interpolate(MS.Temperature, spt[:end]) R = MS.Interpolate(MS.Radius, spt[:end]) prim_flux += Planck(xgrid * units.nm.to(units.cm), T) * R ** 2 # Determine the secondary star flux s_spt = MS.GetSpectralType(MS.Temperature, Tsec) R = MS.Interpolate(MS.Radius, s_spt) sec_flux = Planck(xgrid * units.nm.to(units.cm), Tsec) * R ** 2 return sec_flux / prim_flux
def integral(x, y, I, k=10): """ Integrate y = f(x) for x = 0 to a such that the integral = I I can be an array. Returns the values a that are found. """ I = np.atleast_1d(I) f = UnivariateSpline(x, y, s=k) # Integrate as a function of x F = f.antiderivative() Y = F(x) a = [] for intval in I: F2 = UnivariateSpline(x, Y/Y[-1] - intval, s=0) a.append(F2.roots()) return np.hstack(a)
def get_max_separation(p_spt, s_temp, v, d=1.0 * u.parsec): """ Get the maximum separation for a binary candidate :param p_spt: The spectral type of the primary star :param s_temp: The temperature of the companion :param v: The velocity, in km/s, of the companion :param d: The distance, in parsec, to the system :return: The maximum primary-->secondary separation, in arcseconds """ # Convert the companion temperature and primary spectral type to mass from kglib.spectral_type import Mamajek_Table MS = SpectralTypeRelations.MainSequence() MT = Mamajek_Table.MamajekTable() teff2mass = MT.get_interpolator('Teff', 'Msun') M1 = MS.Interpolate('mass', p_spt) M2 = teff2mass(s_temp) Mt = (M1 + M2) * u.M_sun # Compute the maximum separation G = constants.G a_max = (G * Mt / v ** 2) alpha_max = (a_max / d).to(u.arcsecond, equivalencies=u.dimensionless_angles()) return alpha_max
def plot_profile(wave, flux, line, name): # define the velocity scale [AA / s] vel_aas = (wave - line*(1+zem[i])) / (line*(1+zem[i])) * c # convert to [km / s] vel_kms = vel_aas.to('km/s') # define parameters for the x and y axes ax.set_xlim(xmin, xmax) ax.xaxis.set_minor_locator(minorLocator) ax.tick_params(axis='x', labelsize=xls) ax.set_ylim(0., 1.5) # make the plot ax.plot(vel_kms, flux) # include the name of the line plt.text(xmin+0.03*(xmax-xmin), 0.15, name) # mark the approximate centroid velocity plt.axvline(x=vcen[i], ymin=0., ymax = 1.5, linewidth=1, color='k', linestyle='dotted') plt.axvline(x=vcen[i]+30., ymin=0., ymax = 1.5, linewidth=0.5, color='k') plt.axvline(x=vcen[i]-30., ymin=0., ymax = 1.5, linewidth=0.5, color='k') # label other lines for clarity for k in range(0, len(lines)): vel_off_aas = (lines[k] - line) / line * c vel_off_kms = vel_off_aas.to('km/s') / (u.km / u.s) plt.axvline(x=vcen[i]+vel_off_kms, ymin=0., ymax = 1.5, linewidth=1, color='k', linestyle='dotted') # define the data directory
def plot_profile(wave, flux, line, name): # define the velocity scale [AA / s] vel_aas = (wave - line*(1+zem[i])) / (line*(1+zem[i])) * c # convert to [km / s] vel_kms = vel_aas.to('km/s') # define parameters for the x and y axes ax.set_xlim(xmin, xmax) ax.xaxis.set_minor_locator(minorLocator) ax.tick_params(axis='x', labelsize=xls) ax.set_ylim(0., 3.) # make the plot ax.plot(vel_kms, np.log(1/flux)) # include the name of the line plt.text(xmin+0.03*(xmax-xmin), 2.3, name) # mark the approximate centroid velocity plt.axvline(x=vcen[i], ymin=0., ymax = 1.5, linewidth=1, color='k', linestyle='dotted') plt.axvline(x=vcen[i]+30., ymin=0., ymax = 1.5, linewidth=0.5, color='k') plt.axvline(x=vcen[i]-30., ymin=0., ymax = 1.5, linewidth=0.5, color='k') # label other lines for clarity for k in range(0, len(lines)): vel_off_aas = (lines[k] - line) / line * c vel_off_kms = vel_off_aas.to('km/s') / (u.km / u.s) plt.axvline(x=vcen[i]+vel_off_kms, ymin=0., ymax = 1.5, linewidth=1, color='k', linestyle='dotted') # define the data directory
def test_circular_integration_1step(self): """Compare total number and integrated column density for circle. """ from ..core import CircularAperture Nobs = 6.41756750e26 parent = 1.4e4 * u.km daughter = 1.7e5 * u.km Q = 5.8e23 / u.s v = 1 * u.km / u.s aper = CircularAperture(3300 * u.km) coma = Haser(Q, v, parent, daughter) N1 = coma.total_number(aper) N2 = coma._integrate_column_density(aper) assert np.isclose(N1, N2)
def __init__(self, Q, species): """Parameters ---------- Q : `Astropy.units` quantity or iterable, mandatory production rate usually in units of `u.molecule / u.s` species : dictionary or list of dictionaries, mandatory defines gas velocity, lifetimes, disassociative lifetimes Returns ------- Vectorial instance Examples -------- TBD not yet implemented """ pass
def pds_ferret(targetid, bib=None): """Use the Small Bodies Data Ferret (http://sbntools.psi.edu/ferret/) at the Planetary Data System's Small Bodies Node to query for information on a specific small body in the PDS. Parameters ---------- targetid : str, mandatory target identifier bib : SBPy Bibliography instance, optional, default None Bibliography instance that will be populated Returns ------- data : dict A hierarchical data object Examples -------- >>> from sbpy.data import pds_ferret >>> data = pds_ferret('ceres') # doctest: +SKIP not yet implemented """
def load_results(self, loop): """ Load EBTEL output for a given loop object. Parameters ---------- loop : `synthesizAR.Loop` object """ # load text N_s = len(loop.field_aligned_coordinate) _tmp = np.loadtxt(loop.hydro_configuration['output_filename']) # reshape into a 1D loop structure with units time = _tmp[:, 0]*u.s electron_temperature = np.outer(_tmp[:, 1], np.ones(N_s))*u.K ion_temperature = np.outer(_tmp[:, 2], np.ones(N_s))*u.K density = np.outer(_tmp[:, 3], np.ones(N_s))*(u.cm**(-3)) velocity = np.outer(_tmp[:, -2], np.ones(N_s))*u.cm/u.s # flip sign of velocity at apex i_mirror = np.where(np.diff(loop.coordinates.value[:, 2]) > 0)[0][-1] + 2 velocity[:, i_mirror:] = -velocity[:, i_mirror:] return time, electron_temperature, ion_temperature, density, velocity
def non_equilibrium_ionization(self, time: u.s, temperature: u.K, density: u.cm**(-3), rate_matrix=None, initial_condition=None): """ Compute the ionization fraction in non-equilibrium for a given temperature and density timeseries. """ if rate_matrix is None: rate_matrix = self._rate_matrix() if initial_condition is None: initial_condition = self.equilibrium_ionization(rate_matrix=rate_matrix) interpolate_indices = [np.abs(self.temperature - t).argmin() for t in temperature] y = np.zeros(time.shape + (self.atomic_number+1,)) y[0, :] = initial_condition[interpolate_indices[0], :] identity = u.Quantity(np.eye(self.atomic_number + 1)) for i in range(1, time.shape[0]): dt = time[i] - time[i-1] term1 = identity - density[i]*dt/2.*rate_matrix[interpolate_indices[i], :, :] term2 = identity + density[i-1]*dt/2.*rate_matrix[interpolate_indices[i-1], :, :] y[i, :] = np.linalg.inv(term1) @ term2 @ y[i-1, :] y[i, :] = np.fabs(y[i, :]) y[i, :] /= y[i, :].sum() return u.Quantity(y)
def proton_collision_rate(self): """ Calculates the collision rate for de-exciting and exciting collisions for protons """ # Create scaled temperature--these are not stored in the file bt_t = np.vectorize(np.linspace, excluded=[0, 1], otypes='O')(0, 1, [ups.shape[0] for ups in self._psplups['bt_rate']]) # Get excitation rates directly from scaled data energy_ratio = np.outer(const.k_B.cgs*self.temperature, 1.0/self._psplups['delta_energy'].to(u.erg)) ex_rate = np.array(list(map(self.burgess_tully_descale, bt_t, self._psplups['bt_rate'], energy_ratio.T, self._psplups['bt_c'], self._psplups['bt_type']))) ex_rate = u.Quantity(np.where(ex_rate > 0., ex_rate, 0.), u.cm**3/u.s).T # Calculation de-excitation rates from excitation rate omega_upper = 2.*self._elvlc['J'][self._psplups['upper_level'] - 1] + 1. omega_lower = 2.*self._elvlc['J'][self._psplups['lower_level'] - 1] + 1. dex_rate = (omega_lower/omega_upper)*ex_rate*np.exp(1./energy_ratio) return dex_rate, ex_rate
def __init__(self, domain_x, domain_y, domain_z, gamma=5/3): # Define domain sizes self.x = domain_x self.y = domain_y self.z = domain_z x, y, z = self.x.si.value, self.y.si.value, self.z.si.value self.grid = np.meshgrid(x, y, z, indexing='ij') * u.m self.grid = np.squeeze(self.grid) self.domain_shape = [] for length in (len(x), len(y), len(z)): if length > 1: self.domain_shape.append(length) self.domain_shape = tuple(self.domain_shape) print(self.domain_shape) self.gamma = gamma # Initiate core plasma variables self._density = np.zeros(self.domain_shape) * u.kg / u.m**3 self._momentum = np.zeros((3, *self.domain_shape)) * u.kg \ / (u.m**2 * u.s) self._energy = np.zeros(self.domain_shape) * u.J / u.m**3 self._magnetic_field = np.zeros((3, *self.domain_shape)) * u.T # Collect core variables into a list for usefulness self.core_variables # Connect a simulation object for simulating self.simulation_physics = MHDSimulation(self)
def pressure(self): """Sets the simulation's kinetic pressure profile to the specified array. The kinetic pressure is defined as: .. math:: p = (\\gamma - 1) (e_0 - \\frac{\\rho\\textbf{v}^2}{2}) """ v = self.velocity return (self.gamma - 1) \ * (self.energy - ((self.density * dot(v, v)) / 2))
def simulate(self, max_its=np.inf, max_time=np.inf*u.s): """Simulates the plasma as set up, either for the given number of iterations or until the simulation reaches the given time. Parameters ---------- max_its : int Tells the simulation to run for a set number of iterations. max_time : astropy.units.Quantity Maximum total (in-simulation) time to allow the simulation to run. Must have units of time. Examples -------- >>> # Run a simulation for exactly one thousand iterations. >>> myplasma.simulate(max_time=1000) >>> # Run a simulation for up to half an hour of simulation time. >>> myplasma.simulate(max_time=30*u.minute) """ if np.isinf(max_its) and np.isinf(max_time.value): raise ValueError("Either max_time or max_its must be set.") physics = self.simulation_physics dt = physics.dt if np.isinf(max_time): pb = ProgressBar(max_its) else: pb = ProgressBar(int(max_time / dt)) with pb as bar: while (physics.current_iteration < max_its and physics.current_time < max_time): physics.time_stepper() bar.update()
def burgess_tully_descale(x, y, energy_ratio, c, scaling_type): """ Convert scaled Burgess-Tully parameters to physical quantities. For more details see [1]_. References ---------- .. [1] Burgess, A. and Tully, J. A., 1992, A&A, `254, 436 <http://adsabs.harvard.edu/abs/1992A%26A...254..436B>`_ """ nots = splrep(x, y, s=0) if scaling_type == 1: x_new = 1.0 - np.log(c)/np.log(energy_ratio + c) upsilon = splev(x_new, nots, der=0)*np.log(energy_ratio + np.e) elif scaling_type == 2: x_new = energy_ratio/(energy_ratio + c) upsilon = splev(x_new, nots, der=0) elif scaling_type == 3: x_new = energy_ratio/(energy_ratio + c) upsilon = splev(x_new, nots, der=0)/(energy_ratio + 1.0) elif scaling_type == 4: x_new = 1.0 - np.log(c)/np.log(energy_ratio + c) upsilon = splev(x_new, nots, der=0)*np.log(energy_ratio + c) elif scaling_type == 5: # dielectronic x_new = energy_ratio/(energy_ratio + c) upsilon = splev(x_new, nots, der=0)/energy_ratio elif scaling_type == 6: # protons x_new = energy_ratio/(energy_ratio + c) upsilon = 10**splev(x_new, nots, der=0) else: raise ValueError('Unrecognized BT92 scaling option.') return upsilon
def test_create_ion_with_wrong_units_raises_unit_conversion_error(): with pytest.raises(u.UnitsError): fiasco.Ion('fe_5', temperature.value*u.s)
def preprocessor(self, table, line, index): line = line.strip().split() if index == 0: filetype = int(line[0]) table.append([filetype]) if filetype == 1: self.dtypes = [int,float,float,float,float] self.units = [None,(u.cm**3)/u.s,None,u.K,u.K] self.headings = ['fit_type', 'A_fit', 'B_fit', 'T0_fit', 'T1_fit'] self.descriptions = ['fit type','A fit parameter','B fit parameter', 'T0 fit parameter','T1 fit parameter'] elif filetype == 2: self.dtypes = [int,float,float,float,float,float,float] self.units = [None,(u.cm**3)/u.s,None,u.K,u.K,None,u.K] self.headings = ['fit_type', 'A_fit', 'B_fit', 'T0_fit', 'T1_fit', 'C_fit', 'T2_fit'] self.descriptions = ['fit type','A fit parameter','B fit parameter', 'T0 fit parameter','T1 fit parameter','C fit parameter', 'T2 fit parameter'] elif filetype == 3: self.dtypes = [int,float,float] self.units = [None,(u.cm**3)/u.s,None] self.headings = ['fit_type', 'A_fit', 'eta_fit'] self.descriptions = ['fit type','A rad fit parameter','eta fit parameter'] else: raise ValueError('Unrecognized .rrparams filetype {}'.format(filetype)) else: if table[0][0] == 1 or table[0][0] == 2: table[0] += line[3:] else: table[0] += line[2:]
def preprocessor(self, table, line, index): line = line.strip().split() if index == 0: self._drparams_filetype = int(line[0]) if self._drparams_filetype == 1: # Badnell type self.dtypes = [int,float,float] self.units = [None,u.K,(u.cm**3)/u.s*(u.K**(3/2))] self.headings = ['fit_type', 'E_fit', 'c_fit'] self.descriptions = ['fit type', 'E fit parameter','c fit parameter'] elif self._drparams_filetype == 2: # Shull type self.dtypes = [int,float,float,float,float] self.units = [None,(u.cm**3)/u.s*(u.K**(3/2)),u.dimensionless_unscaled,u.K,u.K] self.headings = ['fit_type', 'A_fit', 'B_fit', 'T0_fit', 'T1_fit'] self.descriptions = ['fit type','A fit coefficient','B fit coefficient', 'T0 fit coefficient','T1 fit coefficient'] else: raise ValueError('Unrecognized drparams filetype {}'.format(self._drparams_filetype)) else: if self._drparams_filetype == 1: tmp = np.array(line[2:],dtype=float) if index%2 == 0: tmp_col = table[-1] for i in range(tmp.shape[0]): table.append([self._drparams_filetype,tmp_col[i],tmp[i]]) del table[0] else: table.append(tmp) else: table.append([self._drparams_filetype]+line[2:])
def split_by_component(df): df['prim_comp'] = df.Comp.map(lambda s: s[0]) df['sec_comp'] = df.Comp.map(lambda s: s[-1]) comps = pd.concat((df[['prim_comp', 'Sp1']], df[['sec_comp', 'Sp2']])) prim = comps.loc[comps.prim_comp.notnull()].rename(columns={'Sp1': 'SpT', 'prim_comp': 'comp'}) sec = comps.loc[comps.sec_comp.notnull()].rename(columns={'Sp2': 'SpT', 'sec_comp': 'comp'}) return pd.concat((prim, sec))[['comp', 'SpT']].drop_duplicates(subset='comp')
def safe_convert(s, default=0): """ Converts something to a float. If an error occurs, returns the default value. """ try: v = float(s) except ValueError: v = default return v
def convert_to_hex(val, delimiter=':', force_sign=False): """ Converts a numerical value into a hexidecimal string Parameters: =========== - val: float The decimal number to convert to hex. - delimiter: string The delimiter between hours, minutes, and seconds in the output hex string. - force_sign: boolean Include the sign of the string on the output, even if positive? Usually, you will set this to False for RA values and True for DEC Returns: ======== A hexadecimal representation of the input value. """ s = np.sign(val) s_factor = 1 if s > 0 else -1 val = np.abs(val) degree = int(val) minute = int((val - degree)*60) second = (val - degree - minute/60.0)*3600. if degree == 0 and s_factor < 0: return '-00{2:s}{0:02d}{2:s}{1:.2f}'.format(minute, second, delimiter) elif force_sign or s_factor < 0: deg_str = '{:+03d}'.format(degree * s_factor) else: deg_str = '{:02d}'.format(degree * s_factor) return '{0:s}{3:s}{1:02d}{3:s}{2:.2f}'.format(deg_str, minute, second, delimiter)
def fwhm(x, y, k=10, ret_roots=False): """ Determine full-with-half-maximum of a peaked set of points, x and y. Assumes that there is only one peak present in the dataset. The function uses a spline interpolation with smoothing parameter k ('s' in scipy.interpolate.UnivariateSpline). If ret_roots=True, return the x-locations at half maximum instead of just the distance between them. """ class NoPeaksFound(Exception): pass half_max = np.max(y) / 2.0 s = UnivariateSpline(x, y - half_max, s=k) roots = s.roots() if len(roots) > 2: # Multiple peaks. Use the two that straddle the maximum value maxvel = x[np.argmax(y)] left_idx = np.argmin(maxvel - roots) right_idx = np.argmin(roots - maxvel) roots = np.array((roots[left_idx], roots[right_idx])) elif len(roots) < 2: raise NoPeaksFound("No proper peaks were found in the data set; likely " "the dataset is flat (e.g. all zeros).") if ret_roots: return roots[0], roots[1] return abs(roots[1] - roots[0])
def __call__(self, x, *args, **kwargs): """ Get the spline value and uncertainty at point(s) x. args and kwargs are passed to spline.__call__ """ x = np.atleast_1d(x) s = np.vstack([curve(x, *args, **kwargs) for curve in self._splines]) return (np.mean(s, axis=0), np.std(s, axis=0))
def plot_profile(wave, flux, line, name, fosc): # define the velocity scale [AA / s] vel_aas = (wave - line*(1+zem[i])) / (line*(1+zem[i])) * c # convert to [km / s] vel_kms = vel_aas.to('km/s') # define parameters for the x and y axes ax.set_xlim(xmin, xmax) ax.xaxis.set_minor_locator(minorLocator) ax.tick_params(axis='x', labelsize=xls) ymax = 3.e12 ax.set_ylim(0., ymax) # make the plot (using equations 5 and 8 from Savage & Sembach 1991) ax.plot(vel_kms, np.log(1/flux) / 2.654e-15 / (fosc * line)) # include the name of the line plt.text(xmin+0.03*(xmax-xmin), ymax*0.6, name) # mark the approximate centroid velocity plt.axvline(x=vcen[i], ymin=0., ymax = 1.5, linewidth=1, color='k', linestyle='dotted') plt.axvline(x=vcen[i]+30., ymin=0., ymax = 1.5, linewidth=0.5, color='k') plt.axvline(x=vcen[i]-30., ymin=0., ymax = 1.5, linewidth=0.5, color='k') # label other lines for clarity for k in range(0, len(lines)): vel_off_aas = (lines[k] - line) / line * c vel_off_kms = vel_off_aas.to('km/s') / (u.km / u.s) plt.axvline(x=vcen[i]+vel_off_kms, ymin=0., ymax = 1.5, linewidth=1, color='k', linestyle='dotted') # define the data directory
def calculate(wave, flux, line, fosc): # define the velocity scale [AA / s] vel_aas = (wave - line*(1+zem[i])) / (line*(1+zem[i])) * c # convert to [km / s] vel_kms = vel_aas.to('km/s') # label other lines for clarity tau = np.log(1/flux) for k in range(0, len(lines)): vel_off_aas = (lines[k] - line) / line * c vel_off_kms = vel_off_aas.to('km/s') / (u.km / u.s) return(tau)
def scat_plot(): f = plt.figure() # filename = 'MLP5_dap_multi_' + str(plate) + '_quicklook.pdf' mpl5_dir = os.environ['MANGADIR_MPL5'] drp = fits.open(mpl5_dir + 'drpall-v2_0_1.fits') drpdata = drp[1].data absmag = drpdata.field('nsa_elpetro_absmag') plt.xlim(-16,-24) plt.ylim(1,7) plt.scatter(absmag[:,5], absmag[:,1]-absmag[:,5], marker='.',color=['blue'], s=0.5) plt.xlabel('i-band absolute magnitude', fontsize=16) plt.ylabel('NUV - i', fontsize=16) plt.tick_params(axis='both', labelsize=14) ifu_list = drpdata.field('plateifu') for i in good_galaxies: ithname = str(i[0]) + str(i[1]) for e in range(0, len(ifu_list)): ethname = ifu_list[e] ethname = ethname.replace("-","") if ithname == ethname: plt.scatter(absmag[e, 5], absmag[e, 1] - absmag[e, 5], marker='*',color=['red']) f.savefig("scatter.pdf", bbox_inches='tight') # pp = PdfPages('scatter.pdf') # pp.savefig(plot_1) plt.close() os.system("open %s &" % 'scatter.pdf')
def luminosity(wavelength,flux,lDistcm): return const.c*u.s/u.m/(wavelength*10**-9)*flux*10**-23*(4*math.pi*lDistcm**2)/solarLum # Given two coefficients and the color will return the mass-to-light ratio # as defined by Bell and DeJong 2000
def luminosity(wavelength,flux,lDcm): return const.c*u.s/u.m/(wavelength*10**-9)*flux*10**-23*(4*math.pi*lDcm**2)/solarLum
def __init__(self, name, z, x,y): self.z = z self.y = y self.x = x self.name = name #lDcm gives the luminosity distance in centimeters #self.lDcm = cosmo.luminosity_distance(0.603)*u.Mpc.to(u.cm)/u.Mpc #self.lDcm = cosmo.luminosity_distance(self.z)*u.Mpc.to(u.cm) / u.Mpc #pixtoKpc is the conversion betweem pixels to rad to kpc based on z value #self.pixtoKpc = conv.arcsec_per_kpc_proper(self.z)*0.05/u.arcsec*u.kpc self.lDcm = cosmo.luminosity_distance(self.z)*u.Mpc.to(u.cm) / u.Mpc self.radToKpc = conv.arcsec_per_kpc_proper(self.z)*0.05/u.arcsec*u.kpc # Let's set up some stuff so that I can grab stuff from inside an excel sheet.
def color_gui(self, cmap): """ Function to change the cmap of the canvas :param cmap: string. matplotlib's color map. cmap = 'none' to gray scale again :return: """ if cmap == 'none': self.color = False self.cmap = "" else: self.color = True self.cmap = cmap self.reload_canvas()
def load_data(self): hdulist = fits.open(self.filename) print("MuseCube: Loading the cube fluxes and variances...") # import pdb; pdb.set_trace() self.cube = hdulist[1].data self.stat = hdulist[2].data # for ivar weighting ; consider creating it in init ; takes long # self.flux_over_ivar = self.cube / self.stat self.header_1 = hdulist[1].header # Necesito el header para crear una buena copia del white. self.header_0 = hdulist[0].header if self.filename_white is None: print("MuseCube: No white image given, creating one.") w_data = self.create_white(save=False) w_header_0 = copy.deepcopy(self.header_0) w_header_1 = copy.deepcopy(self.header_1) # These loops remove the third dimension from the header's keywords. This is neccesary in order to # create the white image and preserve the cube astrometry for i in w_header_0.keys(): if '3' in i: del w_header_0[i] for i in w_header_1.keys(): if '3' in i: del w_header_1[i] # prepare the header hdu = fits.HDUList() hdu_0 = fits.PrimaryHDU(header=w_header_0) hdu_1 = fits.ImageHDU(data=w_data, header=w_header_1) hdu.append(hdu_0) hdu.append(hdu_1) hdu.writeto('new_white.fits', clobber=True) self.filename_white = 'new_white.fits' print("MuseCube: `new_white.fits` image saved to disk.")
def test_column_density_small_aperture(self): """Test column density for aperture << lengthscale. Should be within 1% of ideal value. """ Q = 1e28 / u.s v = 1 * u.km / u.s rho = 10 * u.km parent = 1e4 * u.km N_avg = 2 * Haser(Q, v, parent).column_density(rho) ideal = Q / v / 2 / rho assert np.isclose(N_avg.decompose().value, ideal.decompose().value, rtol=0.01)
def test_total_number_large_aperture(self): """Test column density for aperture >> lengthscale.""" Q = 1 / u.s v = 1 * u.km / u.s rho = 1000 * u.km parent = 10 * u.km N = Haser(Q, v, parent).total_number(rho) ideal = Q * parent / v assert np.isclose(N, ideal.decompose().value) # TEST FAILS # # def test_total_number_rho_NJ78(self): # """Reproduce Newburn and Johnson 1978. # Species, N observed, Q/v (km**-1) # CN, 6.4e26, 5.8e23 # C3, 8.3e28, 9.0e23 # C2, 7.8e27, 5.9e24 # Cannot reproduce C3 quoted in paper. # """ # #Nobs = [6.41756750e26, 8.63191842e+28, 7.81278300e27] # #Nobs = [6.4e26, 4.2e27, 7.8e27] # Nobs = [6.4e26, 8.3e28, 7.8e27] # parent = [1.4e4, 0, 1.0e4] * u.km # daughter = [1.7e5, 4.6e4, 7.6e4] * u.km # Q = [5.8e23, 9.0e23, 5.9e24] / u.s # rho = 3300 * u.km # N = np.zeros(3) # for i in range(3): # coma = Haser(Q[i], 1 * u.km / u.s, parent[i], daughter[i]) # N[i] = coma.total_number(rho) # assert np.allclose(N, Nobs)
def test_circular_integration_0step(self): """Compare total number and integrated column density for circle.""" from ..core import CircularAperture Nobs = 6.41756750e26 parent = 1.4e4 * u.km Q = 5.8e23 / u.s v = 1 * u.km / u.s aper = CircularAperture(3300 * u.km) coma = Haser(Q, v, parent) N1 = coma.total_number(aper) N2 = coma._integrate_column_density(aper) assert np.isclose(N1, N2)
def test_total_number_rectangular_ap(self): """ compare with: import astropy.units as u from sbpy.imageanalysis.utils import rarray from sbpy.activity import Haser r = rarray((5000, 3300), subsample=10) * u.km parent = 1.4e4 * u.km daughter = 1.7e5 * u.km Q = 5.8e23 / u.s v = 1 * u.km / u.s coma = Haser(Q, v, parent, daughter) sigma = coma.column_density(r) print((sigma * 1 * u.km**2).decompose()) --> <Quantity 3.449607967230623e+26> This differs from the test value below by XXX """ from ..core import RectangularAperture parent = 1.4e4 * u.km daughter = 1.7e5 * u.km Q = 5.8e23 / u.s v = 1 * u.km / u.s aper = RectangularAperture([5000, 3300] * u.km) coma = Haser(Q, v, parent, daughter) N = coma.total_number(aper) assert np.isclose(N, 3.449607967230623e+26)
def __init__(self, Q, v): assert isinstance(Q, u.Quantity) assert Q.unit.is_equivalent((u.s**-1, u.mol / u.s)) self.Q = Q assert isinstance(v, u.Quantity) assert v.unit.is_equivalent(u.m / u.s) self.v = v
def __getattr__(self, field): if field in self.table.columns: return self.table[field] else: raise AttributeError ("field '{:s}' does not exist".format(field))
def calculate_counts_simple(channel, loop, *args): """ Calculate the AIA intensity using only the temperature response functions. """ response_function = (splev(np.ravel(loop.electron_temperature), channel['temperature_response_spline']) * u.count*u.cm**5/u.s/u.pixel) counts = np.reshape(np.ravel(loop.density**2)*response_function, loop.density.shape) return counts
def __init__(self, observing_time: u.s, observing_area=None): self.logger = logging.getLogger(name=type(self).__name__) self.observing_time = np.arange(observing_time[0].to(u.s).value, observing_time[1].to(u.s).value, self.cadence.value)*u.s self.observing_area = observing_area
def make_aia_animation(aia, start_time: u.s, stop_time: u.s, root_dir, figsize=None, norm=None, fontsize=14, **kwargs): """ Build animation from a series of synthesized AIA observations """ with h5py.File(aia.counts_file, 'r') as hf: reference_time = u.Quantity(hf['time'], hf['time'].attrs['units']) start_index = np.where(reference_time == start_time)[0][0] stop_index = np.where(reference_time == stop_time)[0][0] fig_format = os.path.join(root_dir, f'{aia.name}', '{}', 'map_t{:06d}.fits') fig, ims = plot_aia_channels(aia, start_time, root_dir, figsize=figsize, norm=norm, fontsize=fontsize, use_with_animation=True) def update_fig(i): for channel in aia.channels: tmp = Map(fig_format.format(channel['name'], i)) ims[channel['name']].set_array(tmp.data) fig.suptitle(r'$t={:.0f}$ {}'.format(reference_time[i].value, reference_time.unit.to_string()), fontsize=fontsize) return [ims[k] for k in ims] animator_settings = {'interval': 50, 'blit': True} animator_settings.update(kwargs.get('animator_settings', {})) animation = matplotlib.animation.FuncAnimation(fig, update_fig, frames=range(start_index, stop_index), **animator_settings) return animation