我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用astropy.io.fits.PrimaryHDU()。
def simple_generic_writer(data, file_name, **kwargs): """ Basic `Spectrum1DRef` FITS writer. """ # Create fits columns flux_col = fits.Column(name='FLUX', format='E', array=data.data) disp_col = fits.Column(name='WAVE', format='E', array=data.dispersion) uncert_col = fits.Column(name='UNCERT', format='E', array=data.uncertainty.array) mask_col = fits.Column(name='MASK', format='L', array=data.mask) cols = fits.ColDefs([flux_col, disp_col, uncert_col, mask_col]) # Create the bin table tbhdu = fits.BinTableHDU.from_columns(cols) # Create header prihdu = fits.PrimaryHDU(header=data.meta.get('header', data.wcs.to_header())) # Compose thdulist = fits.HDUList([prihdu, tbhdu]) thdulist.writeto("{}.fits".format(file_name), overwrite=True)
def write_slice(self, outfile, data, z, clobber=False): freq = z2freq(z) Dc = cosmo.comoving_distance(z).value # [Mpc] header = fits.Header() header["BUNIT"] = (self.header["BUNIT"], self.header.comments["BUNIT"]) header["Lside"] = (self.header["Lside"], self.header.comments["Lside"]) header["Nside"] = (self.header["Nside"], self.header.comments["Nside"]) header["REDSHIFT"] = (z, "redshift of this slice") header["FREQ"] = (freq, "[MHz] observed HI signal frequency") header["Dc"] = (Dc, "[cMpc] comoving distance") header["DATE"] = (datetime.now(timezone.utc).astimezone().isoformat(), "File creation date") header.add_history(" ".join(sys.argv)) hdu = fits.PrimaryHDU(data=data, header=header) try: hdu.writeto(outfile, overwrite=clobber) except TypeError: hdu.writeto(outfile, clobber=clobber) logger.info("Wrote slice to file: %s" % outfile)
def write_fits(self, outfile, oldheader=None, clobber=False): if os.path.exists(outfile) and (not clobber): raise OSError("Sky FITS already exists: %s" % outfile) if oldheader is not None: header = oldheader header.extend(self.fits_header, update=True) else: header = self.fits_header header.add_history(datetime.now().isoformat()) header.add_history(" ".join(sys.argv)) image = self.image image[~self.mask] = np.nan image *= self.factor_K2JyPixel hdu = fits.PrimaryHDU(data=image, header=header) try: hdu.writeto(outfile, overwrite=True) except TypeError: hdu.writeto(outfile, clobber=True) # old astropy versions logger.info("Wrote FITS image of sky model to file: %s" % outfile)
def writeToFits(self, dataArrs, outPrefix = None, clobber = True, output_verify = "exception"): """ Save dict of ndarray to fits file dataArrs: {index: dataArr} returned by `loadSpeImg` """ if outPrefix is None: matched = re.match('(.*)\.spe.*$', self._filename, flags = re.IGNORECASE) if matched is not None and matched.groups()[0] != '': outPrefix = matched.groups()[0] else: outPrefix = self._filename for index, dataArr in dataArrs.items(): name = "{}_x{:03}.fits".format(outPrefix, index) hdu = fits.PrimaryHDU(data = dataArr, header = self._fitshdr, ) hdu.writeto(name, clobber = clobber, output_verify = output_verify)
def combineFits(filenames,outFname): """ combine the data in multiple ExoSOFT fits files together. Used primarily for after multi-process runs. """ nFiles = len(filenames) (head0,dataALL) = loadFits(filenames[0]) for filename in filenames[1:]: (head,data) = loadFits(filename) dataALL = np.concatenate((dataALL,data)) hdu = pyfits.PrimaryHDU(dataALL) hdulist = pyfits.HDUList([hdu]) header = hdulist[0].header for key in head0: if key=='NSAMPLES': ##load in total number of samples for this combined file header['NSAMPLES'] = (int(head0['NSAMPLES'])*nFiles,head0.comments['NSAMPLES']) else: header[key] = (head0[key],head0.comments[key]) hdulist.writeto(outFname) hdulist.close() log.info("output file written to:below\n"+outFname)
def renameFits(filenameIn,filenameOut,killInput=True,overwrite=True): """ Load input into a new fits HDU, write to output name, close, rm input file. """ goodToGo = True if os.path.exists(filenameOut): if overwrite: rmFiles([filenameOut]) else: goodToGo=False log.critical("File already exists, either set overwrite==True, or there be more careful.") if goodToGo: (head,data) = loadFits(filenameIn) hdu = pyfits.PrimaryHDU(data) hdulist = pyfits.HDUList([hdu]) header = hdulist[0].header for key in head: header[key] = (head[key],head.comments[key]) hdulist.writeto(filenameOut) hdulist.close() log.info("output file written to:below\n"+filenameOut) if killInput: rmFiles([filenameIn])
def predict(self): print("Predicting validation data...") input_validation = np.zeros((1,self.ny,self.nx,1), dtype='float32') input_validation[0,:,:,0] = self.image start = time.time() out = self.model.predict(input_validation) end = time.time() print("Prediction took {0:3.2} seconds...".format(end-start)) print("Saving data...") hdu = fits.PrimaryHDU(out[0,:,:,0]) import os.path if os.path.exists(self.output): os.system('rm {0}'.format(self.output)) print('Overwriting...') hdu.writeto('{0}'.format(self.output)) # import matplotlib.pyplot as plt # plt.imshow(out[0,:,:,0]) # plt.savefig('hmi.pdf')
def write_frame(data, header, path): """ This function ... :param data: :param header: :return: """ # Create the HDU hdu = fits.PrimaryHDU(data, header) # Write the HDU to a FITS file hdu.writeto(path, clobber=True) # -----------------------------------------------------------------
def write_datacube(datacube, header, path): """ This function ... :param datacube: :param header: :param path: :return: """ # Create the HDU from the data array and the header hdu = fits.PrimaryHDU(np.array(datacube), header) # Write the HDU to a FITS file hdu.writeto(path, clobber=True) # Inform the user that the file has been created log.debug("File " + path + " created") # -----------------------------------------------------------------
def regrid_h2(projDir='/media/DATAPART/projects/GAS/testing/', region_list = ['L1688','NGC1333','B18','OrionA'], file_extension='DR1_rebase3', herDir = '/media/DATAPART/projects/GAS/otherData/herschel_ayushi/', herFile_list=['OphL1688','perseus','Tau_B18','orionA-N']): for region_i in range(len(region_list)): region = region_list[region_i] herFilename = herFile_list[region_i] herColFits = herDir+'{0}/Colden_temp/{1}_colden_masked.fits'.format(region,herFilename) nh3ImFits = projDir + '{0}/{0}_NH3_11_{1}_mom0_QA_trim.fits'.format(region,file_extension) h2_hdu = fits.open(herColFits) nh3_hdr = fits.getheader(nh3ImFits) new_h2 = FITS_tools.hcongrid.hcongrid(h2_hdu[0].data,h2_hdu[0].header,nh3_hdr) new_h2_hdu = fits.PrimaryHDU(new_h2,nh3_hdr) new_h2_hduList = fits.HDUList([new_h2_hdu]) new_h2_hduList.writeto('nh2_regridded/{0}_NH2_regrid.fits',clobber=True)
def reg_mask_ota(self): print self.reg_file box_x,box_y,box_dx,box_dy = self.parse_box_reg() for i,reg in enumerate(box_x): ota_reg_mask = self.mask_reg(self.box_x[i], self.box_y[i], self.box_dx[i], self.box_dy[i]) hdu = fits.PrimaryHDU(ota_reg_mask.astype(float)) mask_name = self.mask_name hdu.writeto(mask_name,clobber=True) iraf.unlearn(iraf.imutil.imcopy) iraf.imutil.imcopy.setParam('input',mask_name) iraf.imutil.imcopy.setParam('output',mask_name.replace('fits','pl')) iraf.imutil.imcopy.setParam('verbose','no') iraf.imutil.imcopy(mode='h') return ota_reg_mask
def write_fiberextract(self, side, ext, prefix): if not self.check_side(side): return None outname = self.build_outname(side, prefix) self.log.info('Making fiberextract image for %s' %op.basename(outname)) self.build_FE(side, ext) hdu = fits.PrimaryHDU(np.array(self.fiberextract[side], dtype='float32'), header=self.header) hdu.header.remove('BIASSEC') hdu.header.remove('TRIMSEC') hdu.header.remove('AMPSEC') hdu.header.remove('DETSEC') hdu.header.remove('CCDSEC') hdu.header['CRVAL1'] = self.wavelim[0] hdu.header['CDELT1'] = self.disp hdu.header['CD1_1'] = self.disp hdu.header['CRPIX1'] = 1 hdu.header['CCDPOS']=side hdu.header['DATASEC']='[%i:%i,%i:%i]'%(1,self.fiberextract[side].shape[0], 1,self.fiberextract[side].shape[1]) hdu.header.remove('CCDHALF') hdu.header.remove('AMPLIFIE') self.write_to_fits(hdu, outname)
def write_to_fits(hdu, outname): """Write fits out Parameters ---------- hdu : PrimaryHDU Object fits obect to write out outname : str file name to write out """ try: hdu.writeto(outname, overwrite=True) except TypeError: hdu.writeto(outname, clobber=True)
def save_fibmodel(self): ''' Save the fibers to fits file with two extensions The first is 3-d for trace, wavelength and fiber_to_fiber The second is which fibers are good and not dead ''' try: self.fibers[0].fibmodel except: self.log.warning('Trying to save fibermodel but none exist.') return None ylims = np.linspace(-1.*self.fsize, self.fsize, self.fibmodel_nbins) fibmodel = np.zeros((len(self.fibers), self.fibmodel_nbins, self.D)) for i,fiber in enumerate(self.fibers): fibmodel[i,:,:] = fiber.fibmodel s = fits.PrimaryHDU(np.array(fibmodel,dtype='float32')) t = fits.ImageHDU(np.array(ylims,dtype='float32')) hdu = fits.HDUList([s,t]) fn = op.join(self.path, 'fibermodel_%s_%s_%s_%s.fits' %(self.specid, self.ifuslot, self.ifuid, self.amp)) self.write_to_fits(hdu, fn)
def make_error_frame(image1, image2, mask1, mask2, header, outname): print("Making error image for %s" %op.basename(outname)) a,b = image1.shape new = np.zeros((a*2,b)) mas = np.zeros((a*2,b)) new[:a,:] = image1 new[a:,:] = image2 mas[:a,:] = mask1 mas[a:,:] = mask2 err = np.zeros(new.shape) for i in xrange(2*a): err[i,:] = np.where(mas[i,:]<0, mas[i,:], biweight_filter(new[i,:], 31, func=biweight_midvariance)) hdu = fits.PrimaryHDU(np.array(err, dtype='float32'), header=header) hdu.header.remove('BIASSEC') hdu.header.remove('TRIMSEC') hdu.header['DATASEC'] = '[%i:%i,%i:%i]' %(1,b,1,2*a) outname = op.join(op.dirname(outname), 'e.' + op.basename(outname)) hdu.writeto(outname, overwrite=True)
def writeSVD(self, svdoutputfits='ZAP_SVD.fits'): """Write the SVD to an individual fits file.""" check_file_exists(svdoutputfits) header = fits.Header() header['ZAPvers'] = (__version__, 'ZAP version') header['ZAPzlvl'] = (self.run_zlevel, 'ZAP zero level correction') header['ZAPclean'] = (self.run_clean, 'ZAP NaN cleaning performed for calculation') header['ZAPcftyp'] = (self._cftype, 'ZAP continuum filter type') header['ZAPcfwid'] = (self._cfwidth, 'ZAP continuum filter size') header['ZAPmask'] = (self.maskfile, 'ZAP mask used to remove sources') nseg = len(self.pranges) header['ZAPnseg'] = (nseg, 'Number of segments used for ZAP SVD') hdu = fits.HDUList([fits.PrimaryHDU(self.zlsky)]) for i in range(len(self.pranges)): hdu.append(fits.ImageHDU(self.especeval[i][0])) # write for later use hdu.writeto(svdoutputfits) logger.info('SVD file saved to %s', svdoutputfits)
def save_eor_window(self, outfile, clobber=False): header = self.header_eor_windowr() hdu = fits.PrimaryHDU(data=self.eor_window.astype(np.int16), header=header) try: hdu.writeto(outfile, overwrite=clobber) except TypeError: hdu.writeto(outfile, clobber=clobber)
def write(self, outfile=None, clobber=None): if outfile is None: outfile = self.get_outfile() if clobber is None: clobber = self.configs.clobber hdu = fits.PrimaryHDU(data=self.cube, header=self.header) try: hdu.writeto(outfile, overwrite=clobber) except TypeError: hdu.writeto(outfile, clobber=clobber) logger.info("Wrote light-cone cube to: %s" % outfile)
def write(self, outfile, clobber=False): hdu = fits.PrimaryHDU(data=self.cube, header=self.header) logger.info("Created FITS object, writing to disk ...") try: hdu.writeto(outfile, overwrite=clobber) except TypeError: hdu.writeto(outfile, clobber=clobber) logger.info("Wrote light-cone cube to: %s" % outfile)
def write(self, outfile, clobber=False): header = self.header header.extend(self.wcs.to_header(), update=True) header["DATE"] = (datetime.now(timezone.utc).astimezone().isoformat(), "File creation date") header.add_history(" ".join(sys.argv)) hdu = fits.PrimaryHDU(data=self.data, header=header) try: hdu.writeto(outfile, overwrite=clobber) except TypeError: hdu.writeto(outfile, clobber=clobber)
def write_mask(self, outfile, clobber=False): if os.path.exists(outfile) and (not clobber): raise OSError("Sky mask already exists: %s" % outfile) header = self.fits_header header.add_history(datetime.now().isoformat()) header.add_history(" ".join(sys.argv)) hdu = fits.PrimaryHDU(data=self.mask.astype(np.int16), header=header) try: hdu.writeto(outfile, overwrite=True) except TypeError: hdu.writeto(outfile, clobber=True) # old astropy versions logger.info("Wrote mask of sky model to file: %s" % outfile)
def save(self, outfile, clobber=False): """ Save the calculated 2D power spectrum as a FITS image. """ hdu = fits.PrimaryHDU(data=self.ps2d, header=self.header) try: hdu.writeto(outfile, overwrite=clobber) except TypeError: hdu.writeto(outfile, clobber=clobber) logger.info("Wrote 2D power spectrum to file: %s" % outfile)
def write_rmfimg(self, outfile, clobber=False): rmfimg = self.get_rmfimg() # merge headers header = self.hdr_matrix.copy(strip=True) header.extend(self.hdr_ebounds.copy(strip=True)) outfits = fits.PrimaryHDU(data=rmfimg, header=header) try: outfits.writeto(outfile, checksum=True, overwrite=clobber) except TypeError: outfits.writeto(outfile, checksum=True, clobber=clobber) # class RMF }}}
def _open(self, cache=False, **kwargs): if not _fits: load_lib() hdulist = _fits.open(self.request.get_file(), cache=cache, **kwargs) self._index = [] for n, hdu in zip(range(len(hdulist)), hdulist): if (isinstance(hdu, _fits.ImageHDU) or isinstance(hdu, _fits.PrimaryHDU)): # Ignore (primary) header units with no data (use '.size' # rather than '.data' to avoid actually loading the image): if hdu.size > 0: self._index.append(n) self._hdulist = hdulist
def burnInStripper(mcmcFnames,burnInLengths): """ Strip the initial burn-in off each chain. """ newFnames=[] for i in range(0,len(mcmcFnames)): filename = mcmcFnames[i] burnIn = burnInLengths[i] if os.path.exists(filename): (head,data) = loadFits(filename) ##strip burn-in and write to new fits log.debug("Before stripping burn-in, file had "+str(len(data[:,0]))+" samples") hdu = pyfits.PrimaryHDU(data[burnIn:,:]) hdulist = pyfits.HDUList([hdu]) newHead = hdulist[0].header log.debug("Before stripping burn-in, file had "+str(len(hdulist[0].data[:,0]))+" samples") for key in head: newHead[key]=(head[key],head.comments[key]) n = os.path.splitext(os.path.basename(filename)) newFname = os.path.join(os.path.dirname(mcmcFnames[0]),n[0]+'_BIstripped.fits') hdulist.writeto(newFname) log.info("output file written to:below\n"+newFname) hdulist.close() newFnames.append(newFname) log.debug("burn-in stripped file written to:\n"+newFname) return newFnames
def convert_hpx(self, comp_id, freq_id, clobber=False): """ Convert the specified HEALPix map product to HPX projected FITS image. Also add the metadata of the HPX image to the manifest. Raises ------ IOError : Output HPX image already exists and ``clobber=False`` """ from astropy.io import fits from .utils.healpix import healpix2hpx # root_dir = self.get_root_dir() metadata = self.get_product(comp_id, freq_id) infile = os.path.join(root_dir, metadata["healpix"]["path"]) outfile = os.path.splitext(infile)[0] + "_hpx.fits" if os.path.exists(outfile): if clobber: os.remove(outfile) logger.warning("Removed existing HPX image: %s" % outfile) else: raise IOError("Output HPX image already exists: %s" % outfile) # Convert HEALPix map to HPX projected FITS image logger.info("Converting HEALPix map to HPX image: %s" % infile) hpx_data, hpx_header = healpix2hpx(infile) hdu = fits.PrimaryHDU(data=hpx_data, header=hpx_header) hdu.writeto(outfile) logger.info("Converted HEALPix map to HPX image: %s" % outfile) # size = os.path.getsize(outfile) md5 = calc_md5(outfile) metadata["hpx"] = { "path": os.path.relpath(outfile, root_dir), "size": size, "md5": md5, } return (outfile, size, md5)
def write_fits_image(outfile, image, header=None, float32=False, clobber=False, checksum=False): """ Write the supplied image (together with header information) into the output FITS file. Parameters ---------- outfile : str The path/filename to the output file storing the pickled object. image : 2D `~numpy.ndarray` The image data to be written out to the FITS file. NOTE: image.shape: (nrow, ncol) <-> FITS image: (ncol, nrow) header : `~astropy.io.fits.Header` The FITS header information for this image float32 : bool, optional Whether coerce the image data (generally double/float64 data type) into single/float32 (in order to save space)? Default: False (i.e., preserve the data type unchanged) clobber : bool, optional Whether to overwrite the existing output file. Default: False checksum : bool, optional Whether to calculate the data checksum, which may cost some time? Default: False """ _create_dir(outfile) _check_existence(outfile, clobber=clobber, remove=True) hdr = fits.Header() hdr["CREATOR"] = (__name__, "File creator") hdr["DATE"] = (datetime.now(timezone.utc).astimezone().isoformat(), "File creation date") if header is not None: hdr.extend(header, update=True) if float32: image = np.asarray(image, dtype=np.float32) hdu = fits.PrimaryHDU(data=image, header=header) hdu.writeto(outfile, checksum=checksum) logger.info("Wrote image to FITS file: %s" % outfile)
def makeColdHeatMap(inpath,outpath,inCube,startwl,coldwls): cube = pyfits.open(inpath+inCube) cube_all = cube[0].data[startwl:,0:,0:] hdr_all = cube[0].header cube = pyfits.open(inpath+inCube.replace('_i','_old_i')) cube_old = cube[0].data[startwl:,0:,0:] cube = pyfits.open(inpath+inCube.replace('_i','_young_i')) cube_young = cube[0].data[startwl:,0:,0:] Fold = 100. * (0.5*cube_old + 0.5*(cube_all-cube_young)) / cube_all Fyoung = 100. * (0.5*cube_young + 0.5*(cube_all-cube_old)) / cube_all tot_all = 100.* (coldwls[len(coldwls)-1] - coldwls[0]) # Get header with appropriate WCS info im36 = pyfits.open("SKIRTinput/new3.6MJySr.fits") hdr_wcs = im36[0].header tot_old = integrateHeating(Fold,coldwls) / tot_all hdu = pyfits.PrimaryHDU(tot_old,hdr_wcs) hdu.writeto(outpath+"heatingTotColdOld.fits",clobber=True) tot_young = integrateHeating(Fyoung,coldwls) / tot_all hdu = pyfits.PrimaryHDU(tot_young,hdr_wcs) hdu.writeto(outpath+"heatingTotColdYoung.fits",clobber=True)
def regrid_h2(nh3_image,h2_image): # Edit to write out regridded image - glue won't work if files not on same grid h2fits = fits.open(h2_image) nh3_hdr = fits.getheader(nh3_image) new_h2 = FITS_tools.hcongrid.hcongrid(h2fits[0].data,h2fits[0].header,nh3_hdr) new_h2_hdu = fits.PrimaryHDU(new_h2,nh3_hdr) return new_h2_hdu
def log10_h2(h2_image): log_h2_data = np.log10(h2_image.data) log_h2_hdu = fits.PrimaryHDU(log_h2_data,h2_image.header) return log_h2_hdu
def make_obs_mask(region_list,file_ext): for region in region_list: rms_hdu = fits.open('{0}/{0}_NH3_11_{1}_mom0_QA_trim.fits'.format(region,file_ext)) # Set nan pixels to zero to create mask obs_mask = mask_obs_area(rms_hdu[0].data) new_hdu = fits.PrimaryHDU(obs_mask,rms_hdu[0].header) new_hdu.writeto('{0}/{0}_NH3_11_{1}_obsMask.fits'.format(region,file_ext),clobber=True)
def to_fits(self): primary_hdu = fits.PrimaryHDU() image_hdu = fits.ImageHDU(self.data, self.header) hdulist = fits.HDUList([primary_hdu, image_hdu]) return hdulist
def create_mock_fits(): x = np.ones((5, 5)) prihdu = fits.PrimaryHDU(x) # Single extension FITS img = fits.ImageHDU(data=x) singlehdu = fits.HDUList([prihdu, img]) singlehdu.writeto('image.fits', clobber=True)
def wraptable2fits(cat_columns, extname): tbhdu = fits_from_columns(pyfits.ColDefs(cat_columns)) hdu = pyfits.PrimaryHDU() import datetime, time now = datetime.datetime.fromtimestamp(time.time()) nowstr = now.isoformat() nowstr = nowstr[:nowstr.rfind('.')] hdu.header['DATE'] = nowstr hdu.header['ANALYSIS'] = 'NWAY matching' tbhdu.header['EXTNAME'] = extname hdulist = pyfits.HDUList([hdu, tbhdu]) return hdulist
def load_data(self): hdulist = fits.open(self.filename) print("MuseCube: Loading the cube fluxes and variances...") # import pdb; pdb.set_trace() self.cube = hdulist[1].data self.stat = hdulist[2].data # for ivar weighting ; consider creating it in init ; takes long # self.flux_over_ivar = self.cube / self.stat self.header_1 = hdulist[1].header # Necesito el header para crear una buena copia del white. self.header_0 = hdulist[0].header if self.filename_white is None: print("MuseCube: No white image given, creating one.") w_data = self.create_white(save=False) w_header_0 = copy.deepcopy(self.header_0) w_header_1 = copy.deepcopy(self.header_1) # These loops remove the third dimension from the header's keywords. This is neccesary in order to # create the white image and preserve the cube astrometry for i in w_header_0.keys(): if '3' in i: del w_header_0[i] for i in w_header_1.keys(): if '3' in i: del w_header_1[i] # prepare the header hdu = fits.HDUList() hdu_0 = fits.PrimaryHDU(header=w_header_0) hdu_1 = fits.ImageHDU(data=w_data, header=w_header_1) hdu.append(hdu_0) hdu.append(hdu_1) hdu.writeto('new_white.fits', clobber=True) self.filename_white = 'new_white.fits' print("MuseCube: `new_white.fits` image saved to disk.")
def spec_to_redmonster_format(spec, fitsname, n_id=None, mag=None): """ Function used to create a spectrum in the REDMONSTER software format :param spec: XSpectrum1D object :param mag: List containing 2 elements, the first is the keyword in the header that will contain the magnitud saved in the second element :param fitsname: Name of the fitsfile that will be created :return: """ from scipy import interpolate wave = spec.wavelength.value wave_log = np.log10(wave) n = len(wave) spec.wavelength = wave_log * u.angstrom new_wave_log = np.arange(wave_log[1], wave_log[n - 2], 0.0001) spec_rebined = spec.rebin(new_wv=new_wave_log * u.angstrom) flux = spec_rebined.flux.value f = interpolate.interp1d(wave_log, spec.sig.value) sig = f(new_wave_log) inv_sig = 1. / np.array(sig) ** 2 inv_sig = np.where(np.isinf(inv_sig), 0, inv_sig) inv_sig = np.where(np.isnan(inv_sig), 0, inv_sig) hdu1 = fits.PrimaryHDU([flux]) hdu2 = fits.ImageHDU([inv_sig]) hdu1.header['COEFF0'] = new_wave_log[0] hdu1.header['COEFF1'] = new_wave_log[1] - new_wave_log[0] if n_id != None: hdu1.header['ID'] = n_id if mag != None: hdu1.header[mag[0]] = mag[1] hdulist_new = fits.HDUList([hdu1, hdu2]) hdulist_new.writeto(fitsname, clobber=True)
def save(self, filename, clobber=False): hdulist = pf.HDUList() ext = '.oif.fits' if filename.find(ext)==-1: filename += ext hdu = pf.PrimaryHDU() hdu.header.set('EXT', 'MODEL', comment='Type of information in the HDU') hdu.header.set('DATE', strftime('%Y%m%dT%H%M%S'), comment='Creation Date') hdu.header.set('NOBJ', self.nobj, comment='Number of objects in the model') hdu.header.set('NPARAMS', self.nparams, comment='Total number of free parameters') for i, item in enumerate(self._objs): hdu.header.set('OBJ'+str(i), item.name, comment='Name of object '+str(i)) allmodes = ['vis2', 't3phi', 't3amp', 'visphi', 'visamp'] for mode in allmodes: number = [] if getattr(self.oidata, mode): number = [len(i) for i in getattr(self.oidata, "_input_"+mode) if i is not None] hdu.header.set('N'+mode, int(np.sum(number)), comment='Total number of '+mode+' measurements') hdu.header.set('F_'+mode, len(number), comment='Number of '+mode+' files') hdu.header.add_comment('Written by Guillaume SCHWORER') hdulist.append(hdu) hdulist.writeto(filename, clobber=clobber) for item in self._objs: item.save(filename, append=True, clobber=clobber) if self._hasdata: self.oidata.save(filename, append=True, clobber=clobber) return filename
def test_succeeds_func_fits_hdu(): from astropy.io import fits return fits.PrimaryHDU(np.arange(3 * 5).reshape((3, 5)))
def write(filename, data, **kwargs): from astropy.io import fits if isinstance(data, np.ndarray): data = fits.PrimaryHDU(data) return data.writeto(filename, **kwargs)
def tofits(outfilename, pixelarray, hdr = None, verbose = True): """ Takes a 2D numpy array and write it into a FITS file. If you specify a header (pyfits format, as returned by fromfits()) it will be used for the image. You can give me boolean numpy arrays, I will convert them into 8 bit integers. """ pixelarrayshape = pixelarray.shape if verbose : print "FITS export shape : (%i, %i)" % (pixelarrayshape[0], pixelarrayshape[1]) if pixelarray.dtype.name == "bool": pixelarray = np.cast["uint8"](pixelarray) if os.path.isfile(outfilename): os.remove(outfilename) if hdr == None: # then a minimal header will be created hdu = fits.PrimaryHDU(pixelarray.transpose()) else: # this if else is probably not needed but anyway ... hdu = fits.PrimaryHDU(pixelarray.transpose(), hdr) hdu.writeto(outfilename) if verbose : print "Wrote %s" % outfilename # Array manipulation
def make_fiber_image(Fe, header, outname, args, amp): print("Making Fiberextract image for %s" %op.basename(outname)) a,b = Fe.shape hdu = fits.PrimaryHDU(np.array(Fe, dtype='float32'), header=header) hdu.header.remove('BIASSEC') hdu.header.remove('TRIMSEC') hdu.header['DATASEC'] = '[%i:%i,%i:%i]' %(1,b,1,a) hdu.header['CRVAL1'] = args.wvl_dict[amp][0] hdu.header['CDELT1'] = args.disp[amp] hdu.header['CD1_1'] = args.disp[amp] hdu.header['CRPIX1'] = 1 write_to_fits(hdu, outname)
def write_spectrograph_image(self, side, ext, prefix): if not self.check_side(side): return None outname = self.build_outname(side, prefix) self.log.info('Making spectrograph image for %s' %op.basename(outname)) image = [] for i, amp in enumerate(self.side_dict[side]): F = self.load_file(amp) if F is not None: image.append(F[ext].data) else: image.append(np.zeros((self.N, self.D))) new = np.zeros((self.N*2, self.D)) new[:self.N,:] = image[0] new[self.N:,:] = image[1] hdu = fits.PrimaryHDU(np.array(new, dtype='float32'), header=self.header) hdu.header.remove('BIASSEC') hdu.header.remove('TRIMSEC') hdu.header.remove('AMPSEC') hdu.header.remove('DETSEC') hdu.header.remove('CCDSEC') hdu.header['DATASEC']='[%i:%i,%i:%i]'%(1,self.D,1,2*self.N) hdu.header['CCDPOS']=side hdu.header.remove('CCDHALF') hdu.header.remove('AMPLIFIE') self.write_to_fits(hdu, outname)
def save(self, image_list=[], spec_list=[]): ''' Save the entire amplifier include the list of fibers. This property is not used often as "amp*.pkl" is large and typically the fibers can be loaded and the other amplifier properties quickly recalculated. ''' self.log.info('Saving images/properties to %s' %self.path) fn = op.join(self.path, 'multi_%s_%s_%s_%s.fits' %(self.specid, self.ifuslot, self.ifuid, self.amp)) fits_list = [] for i,image in enumerate(image_list): if i==0: fits_list.append(fits.PrimaryHDU(np.array(getattr(self, image), dtype='float32'))) else: fits_list.append(fits.ImageHDU(np.array(getattr(self, image), dtype='float32'))) fits_list[-1].header['EXTNAME'] = image for i, spec in enumerate(spec_list): try: s = np.array([getattr(fiber, spec) for fiber in self.fibers], dtype='float32') fits_list.append(fits.ImageHDU(s)) fits_list[-1].header['EXTNAME'] = spec except AttributeError: self.log.warning('Attribute %s does not exist to save' %spec) if fits_list: fits_list[0] = self.write_header(fits_list[0]) hdu = fits.HDUList(fits_list) self.write_to_fits(hdu, fn)
def make_fiber_image(Fe, header, outname, args, amp): print("Making Fiberextract image for %s" %op.basename(outname)) a,b = Fe.shape hdu = fits.PrimaryHDU(np.array(Fe, dtype='float32'), header=header) hdu.header.remove('BIASSEC') hdu.header.remove('TRIMSEC') hdu.header['DATASEC'] = '[%i:%i,%i:%i]' %(1,b,1,a) hdu.header['CRVAL1'] = args.wvl_dict[amp][0] hdu.header['CDELT1'] = args.disp[amp] hdu.header['CD1_1'] = args.disp[amp] hdu.header['CRPIX1'] = 1 hdu.writeto(outname, overwrite=True)
def make_spectrograph_image(image1, image2, header, outname): print("Making spectrograph image for %s" %op.basename(outname)) a,b = image1.shape new = np.zeros((a*2,b)) new[:a,:] = image1 new[a:,:] = image2 hdu = fits.PrimaryHDU(np.array(new, dtype='float32'), header=header) hdu.header.remove('BIASSEC') hdu.header.remove('TRIMSEC') hdu.header['DATASEC'] = '[%i:%i,%i:%i]' %(1,b,1,2*a) hdu.writeto(outname, overwrite=True)
def make_amplifier_image(image, header, outname): print("Making amplifier image for %s" %op.basename(outname)) a,b = image.shape hdu = fits.PrimaryHDU(np.array(image, dtype='float32'), header=header) hdu.header.remove('BIASSEC') hdu.header.remove('TRIMSEC') hdu.header['DATASEC'] = '[%i:%i,%i:%i]' %(1,b,1,a) hdu.writeto(outname, overwrite=True)
def make_library_image(amp_image, header, outname, fits_list, for_bias=True): a,b = amp_image.shape overscan = [] trimsec = [] bias = re.split('[\[ \] \: \,]', header['BIASSEC'])[1:-1] biassec = [int(t)-((i+1)%2) for i,t in enumerate(bias)] trim = re.split('[\[ \] \: \,]', header['TRIMSEC'])[1:-1] trimsec = [int(t)-((i+1)%2) for i,t in enumerate(trim)] for F in fits_list: overscan.append(biweight_location(F[0].data[biassec[2]:biassec[3], biassec[0]:biassec[1]])) del F[0].data A = [] for j,hdu in enumerate(fits_list): if for_bias: A.append(biweight_filter2d(hdu[0].data[:,i], (25,5),(5,1)) - overscan[j]) else: A.append(hdu[0].data - overscan[j]) amp_image[:,i] = biweight_location(A, axis=(0,)) good = np.isfinite(amp_image[:,i]) amp_image[:,i] = np.interp(np.arange(a), np.arange(a)[good], amp_image[good,i]) for hdu in fits_list: del hdu[0].data hdu.close() hdu = fits.PrimaryHDU(np.array(amp_image[trimsec[2]:trimsec[3], trimsec[0]:trimsec[1]], dtype='float32'), header=header) hdu.header.remove('BIASSEC') hdu.header.remove('TRIMSEC') hdu.header['DATASEC'] = '[%i:%i,%i:%i]' %(1,trimsec[1]-trimsec[0],1,a) hdu.writeto(outname, overwrite=True)
def check_bias(args, amp, folder, edge=3, width=10): # Create empty lists for the left edge jump, right edge jump, and structure left_edge, right_edge, structure, overscan = [], [], [], [] # Select only the bias frames that match the input amp, e.g., "RU" sel = [i for i, v in enumerate(args.bia_list) if v.amp == amp] log = args.bia_list[sel[0]].log overscan_list = [[v.overscan_value for i, v in enumerate(args.bia_list) if v.amp == amp]] overscan = biweight_location(overscan_list) log.info('Overscan value for %s: %0.3f' % (amp, overscan)) # Loop through the bias list and measure the jump/structure big_array = np.array([v.image for v in itemgetter(*sel)(args.bia_list)]) if args.quick: func = np.median else: func = biweight_location masterbias = func(big_array, axis=(0,)) a, b = masterbias.shape hdu = fits.PrimaryHDU(np.array(masterbias, dtype='float32')) log.info('Writing masterbias_%s.fits' % (amp)) write_fits(hdu, op.join(folder, 'masterbias_%s_%s.fits' % (args.specid, amp))) left_edge = func(masterbias[:, edge:edge+width]) right_edge = func(masterbias[:, (b-width-edge):(b-edge)]) structure = func(masterbias[:, edge:(b-edge)], axis=(0,)) log.info('Left edge - Overscan, Right edge - Overscan: %0.3f, %0.3f' % (left_edge, right_edge)) return left_edge, right_edge, structure, overscan, masterbias