我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用netCDF4.Dataset()。
def test_get_schema(): '''Should identify schema from id dimension in netCDF file.''' tempdir = tempfile.gettempdir() v1_0_dim = 'station' v1_1_dim = 'feature_id' v1_0_file = os.path.join(tempdir, 'v1_0.nc') with Dataset(v1_0_file, 'w') as nc: dim = nc.createDimension(v1_0_dim, None) expected = constants.SCHEMAv1_0 returned = nwm_data.get_schema(nc) assert expected == returned os.remove(v1_0_file) v1_1_file = os.path.join(tempdir, 'v1_1.nc') with Dataset(v1_1_file, 'w') as nc: dim = nc.createDimension(v1_1_dim, None) expected = constants.SCHEMAv1_1 returned = nwm_data.get_schema(nc) assert expected == returned os.remove(v1_1_file)
def polyNetcdf(poly, nc, v): ncf = Dataset(nc) # affine transform(px width, row rotation, UL x-coord, col rotation, px height, UL y-coord) affine = Affine(0.1, 0.0, -180.0, 0.0, -0.1, 90.0) ndims = ncf[v].ndim if ndims == 2: array = ncf[v][:,:] ncf.close() stats = (extractDate(nc), zonal_stats(poly, array, affine=affine, nodata=-9999.0, stats=['median'])[0]['median']) return [stats] elif ndims == 3: # assume it's time/lat/lon #print 'N-dims:', ndims cdftime = utime(ncf['time'].units) stats = [] for i in range(ncf[v].shape[0]): array = ncf[v][i,:,:] dt = cdftime.num2date(ncf['time'][i]).strftime('%Y-%m-%d') stats.append((dt, zonal_stats(poly, array, affine=affine, nodata=-9999.0, stats=['median'])[0]['median'])) ncf.close() #print stats return stats else: print 'Don\'t know what to do with %s dimensions!' % ndims sys.exit()
def pointNetcdf(pt, nc, v): ncf = Dataset(nc) # affine transform(px width, row rotation, UL x-coord, col rotation, px height, UL y-coord) affine = Affine(0.1, 0.0, -180.0, 0.0, -0.1, 90.0) ndims = ncf[v].ndim if ndims == 2: array = ncf[v][:,:] ncf.close() stats = (extractDate(nc), point_query(pt, array, affine=affine, nodata=-9999.0)[0]) return [stats] elif ndims == 3: # assume it's time/lat/lon #print 'N-dims:', ndims cdftime = utime(ncf['time'].units) stats = [] for i in range(ncf[v].shape[0]): array = ncf[v][i,:,:] dt = cdftime.num2date(ncf['time'][i]).strftime('%Y-%m-%d') stats.append((dt, point_query(pt, array, affine=affine, nodata=-9999.0)[0])) ncf.close() #print stats return stats else: print 'Don\'t know what to do with %s dimensions!' % ndims sys.exit()
def setup_netcdf_root(root, header, N): """ Add variables, dimensions (all of size *N), and attributes from *config* to :class:`Dataset` *root* and return *root*. """ # add global attributes root.source = header.Source_of_Data root.station = header.Station_Name root.code = header.IAGA_CODE root.latitude = header.Geodetic_Latitude root.longitude = header.Geodetic_Longitude root.elevation = header.Elevation root.reported = header.Reported root.sensor_orientation = header.Sensor_Orientation root.data_interval_type = header.Data_Interval_Type root.data_type = header.Data_Type root.creation_time_utc = str(datetime.utcnow()) # add dimensions time_dim = root.createDimension('time', N) codes = ['X', 'Y', 'Z', 'F', 'H', 'D'] for code in codes: dim = root.createDimension(code, N) # add variables time = root.createVariable('time', 'f8', ('time',)) time.standard_name = 'time' time.units = 'seconds since 2000-01-01 12:00:00.0' time.calendar = 'gregorian' for code in codes: var = root.createVariable(code, 'f8', (code,), zlib=True) var.standard_name = NAME_MAP[code] var.units = UNITS_MAP[code] return root
def test_from_dataset_11_list(self): """Test the creation of a list of InteractiveLists""" variables, coords = self._from_dataset_test_variables ds = xr.Dataset(variables, coords) # Create two lists, each containing two arrays of variables v1 and v2. # In the first list, the xdim dimensions are 0 and 1. # In the second, the xdim dimensions are both 2 l = self.list_class.from_dataset( ds, name=[['v1', 'v2']], xdim=[[0, 1], 2], prefer_list=True) self.assertEqual(len(l), 2) self.assertIsInstance(l[0], psyd.InteractiveList) self.assertIsInstance(l[1], psyd.InteractiveList) self.assertEqual(len(l[0]), 2) self.assertEqual(len(l[1]), 2) self.assertEqual(l[0][0].xdim, 0) self.assertEqual(l[0][1].xdim, 1) self.assertEqual(l[1][0].xdim, 2) self.assertEqual(l[1][1].xdim, 2)
def test_to_dataframe(self): variables, coords = self._from_dataset_test_variables variables['v1'][:] = np.arange(variables['v1'].size).reshape( variables['v1'].shape) ds = xr.Dataset(variables, coords) l = psyd.InteractiveList.from_dataset(ds, name='v1', t=[0, 1]) l.extend(psyd.InteractiveList.from_dataset(ds, name='v1', t=2, x=slice(1, 3)), new_name=True) self.assertEqual(len(l), 3) self.assertTrue(all(arr.ndim == 1 for arr in l), msg=l) df = l.to_dataframe() self.assertEqual(df.shape, (ds.xdim.size, 3)) self.assertEqual(df.index.values.tolist(), ds.xdim.values.tolist()) self.assertEqual(df[l[0].psy.arr_name].values.tolist(), ds.v1[0].values.tolist()) self.assertEqual(df[l[1].psy.arr_name].values.tolist(), ds.v1[1].values.tolist()) self.assertEqual(df[l[2].psy.arr_name].notnull().sum(), 2) self.assertEqual( df[l[2].psy.arr_name].values[ df[l[2].psy.arr_name].notnull().values].tolist(), ds.v1[2, 1:3].values.tolist())
def initialize_file(vs, ncfile, create_time_dimension=True): """ Define standard grid in netcdf file """ if not isinstance(ncfile, Dataset): raise TypeError("Argument needs to be a netCDF4 Dataset") for dim in variables.BASE_DIMENSIONS: var = vs.variables[dim] dimsize = variables.get_dimensions(vs, var.dims[::-1], include_ghosts=False)[0] nc_dim = add_dimension(vs, dim, dimsize, ncfile) initialize_variable(vs, dim, var, ncfile) write_variable(vs, dim, var, getattr(vs, dim), ncfile) if create_time_dimension: nc_dim_time = ncfile.createDimension("Time", None) nc_dim_var_time = ncfile.createVariable("Time", "f8", ("Time",)) nc_dim_var_time.long_name = "Time" nc_dim_var_time.units = "days" nc_dim_var_time.time_origin = "01-JAN-1900 00:00:00"
def threaded_io(vs, filepath, mode): """ If using IO threads, start a new thread to write the netCDF data to disk. """ if vs.use_io_threads: _wait_for_disk(vs, filepath) _io_locks[filepath].clear() nc_dataset = Dataset(filepath, mode) try: yield nc_dataset finally: if vs.use_io_threads: io_thread = threading.Thread(target=_write_to_disk, args=(vs, nc_dataset, filepath)) io_thread.start() else: _write_to_disk(vs, nc_dataset, filepath)
def write(self): """ Write all cached states to the NetCDF file, and clear the cache. This will append to any existing NetCDF file. Raises ------ InvalidStateError If cached states do not all have the same quantities as every other cached and written state. """ with nc4.Dataset(self._filename, self._write_mode) as dataset: self._ensure_cached_state_keys_compatible_with_dataset(dataset) time_list, state_list = self._get_ordered_times_and_states() self._ensure_time_exists(dataset, time_list[0]) it_start = dataset.dimensions['time'].size it_end = it_start + len(time_list) append_times_to_dataset(time_list, dataset, self._time_units) all_states = combine_states(state_list) for name, value in all_states.items(): ensure_variable_exists(dataset, name, value) dataset.variables[name][ it_start:it_end, :] = value.values[:, :] self._cached_state_dict = {}
def get_rad_data(self, inp_file): unzip_file = self.unZip(inp_file) if self.return_code != 0: return try: if sys.platform == 'win32' or sys.platform == 'cygwin': cdf_file = Dataset(unzip_file, 'r') else: cdf_file = NetCDFFile(unzip_file, 'r') except: self.decodeError(inp_file) return # Variable Description Units # -------- ----------------------------------- -------- # swgnt Surface net downward shortwave flux W m-2 self.lati = cdf_file.variables[self.vars['latitude']][:] self.longi = cdf_file.variables[self.vars['longitude']][:] self.tims = cdf_file.variables['time'][:] self.swgnt += self.getGHI(cdf_file.variables[self.vars['swgnt']]) cdf_file.close()
def test_README(self): sim_filename = self.run_with_output("wxgen sim -db examples/database.nc -n 10 -t 100") file = netCDF4.Dataset(sim_filename, 'r') self.assertTrue("time" in file.dimensions) self.assertTrue("ensemble_member" in file.dimensions) self.assertEqual(100, file.dimensions["time"].size) self.assertEqual(10, file.dimensions["ensemble_member"].size) file.close() truth_filename = self.run_with_output("wxgen truth -db examples/database.nc") file = netCDF4.Dataset(truth_filename, 'r') self.assertTrue("time" in file.dimensions) self.assertTrue("ensemble_member" in file.dimensions) self.assertEqual(729, file.dimensions["time"].size) self.assertEqual(1, file.dimensions["ensemble_member"].size) file.close() self.run_with_image("wxgen verif %s %s -m timeseries" % (sim_filename, truth_filename)) self.run_with_image("wxgen verif %s %s -m variance" % (sim_filename, truth_filename)) time.sleep(1) for filename in [sim_filename, truth_filename]: remove(filename)
def anim(filename,varname,cax): template = filename+'_%03i.nc' nc0 = Dataset(template%(0),'r') # open proc 0 nc file nt = len(nc0.dimensions['t']) nc0.close() def animate(i): global kt time,z2d=read(filename,varname,kt) im.set_data(z2d) ti.set_text('%.0f '%time) kt +=5 return im,ti fig=plt.figure() ax = fig.add_subplot(111) time,z2d=read(filename,varname,0) im=ax.imshow(z2d,vmin=cax[0],vmax=cax[1]) ti=ax.text(100,-50,'%.0f '%time) print('launch the animation') global kt kt = 0 ani = animation.FuncAnimation(fig, animate,arange(nt),interval=5, blit=True) plt.show()
def create_diag(self,diag): nc = Dataset(self.diagfile,'w',format='NETCDF4') nc.createDimension('t',None) # define dimensions ('None' is record dim) d = nc.createVariable('t','f',('t',)) d.long_name = 'model time' d = nc.createVariable('kt','i',('t',)) d.long_name = 'model iteration' for v in self.list_diag: d = nc.createVariable(v,'f',('t',)) d.long_name = v nc.close() self.kdiag = 0 # set up internal buffer to avoid too frequent disk access self.ndiags = len(self.list_diag)+2 self.buffersize = 10 self.buffer=zeros((self.buffersize,self.ndiags))
def write(self,tend,t,dt,kt,tnextdiag,tnexthis,var): nh = self.nh nc = Dataset(self.restart_file,'w') nc.setncattr('tend',tend) nc.setncattr('t',t) nc.setncattr('dt',dt) nc.setncattr('kt',kt) nc.setncattr('tnextdiag',tnextdiag) nc.setncattr('tnexthis',tnexthis) nc.createDimension('x',self.nxl) nc.createDimension('y',self.nyl) for v in self.varname_list: nc.createVariable(v,'d',('y','x')) # save in double precision z2d = var.get(v) nc.variables[v][:,:]=z2d[:,:] nc.close()
def plot_numvisc(diagfile): plt.figure() nc = Dataset(diagfile) t=nc.variables['t'][:] ke=nc.variables['ke'][:] dkdt=np.diff(ke)/np.diff(t) ens=nc.variables['enstrophy'][:] ensm=0.5*(ens[1:]+ens[:-1]) # deltake[visc,res]=-(ke[-1]-ke[0]) # deltaens[visc,res]=max(medfilt(ens,21))-ens[5] visc_tseries = -dkdt/ensm*4.4*np.pi visc_num = max(visc_tseries[t[1:]>0.02]) #print('N=%4i / visc = %4.1e / num = %4.2e'%(N[res],Kdiff[visc],visc_num[res])) plt.semilogy(t[1:],visc_tseries) plt.xlabel('time') plt.ylabel('viscosity (-(1/2V)dE/dt)') plt.grid('on') plt.show()
def read(t_MaskInfo): c_FilterDir = mospat_inc_filters.c_FilterDir + 'REGION_MASK/' c_AllMaskNames = t_MaskInfo['c_FilterName'] c_AllFileNames = t_MaskInfo['c_FilterFile'] c_AllVarNames = t_MaskInfo['c_FilterVar'] nmask = len(IncF.c_RegionMask) for imask in range(nmask): c_Mask = IncF.c_RegionMask[imask] # Identifying the idx of mask of interest idx_mask = [i for i, x in enumerate(c_AllMaskNames) if x == c_Mask][0] c_FileName = c_FilterDir + c_AllFileNames[idx_mask] c_VarName = c_AllVarNames[idx_mask] # READING FILE WITH MASK file = nc.Dataset(c_FileName, 'r') f_lat = np.array(file.variables['lat']) f_lon = np.array(file.variables['lon']) f_mask = np.array(np.squeeze(file.variables[c_VarName])) return f_mask, f_lat, f_lon
def load_grid(nc): """ Get a SGrid object from a netCDF4.Dataset or file/URL. :param str or netCDF4.Dataset nc: a netCDF4 Dataset or URL/filepath to the netCDF file :return: SGrid object :rtype: sgrid.SGrid """ if isinstance(nc, Dataset): pass else: nc = Dataset(nc, 'r') return SGrid.load_grid(nc)
def get_dataset(ncfile, dataset=None): """ Utility to create a netCDF4 Dataset from a filename, list of filenames, or just pass it through if it's already a netCDF4.Dataset if dataset is not None, it should be a valid netCDF4 Dataset object, and it will simiply be returned """ if dataset is not None: return dataset if isinstance(ncfile, nc4.Dataset): return ncfile elif isinstance(ncfile, collections.Iterable) and len(ncfile) == 1: return nc4.Dataset(ncfile[0]) elif isstring(ncfile): return nc4.Dataset(ncfile) else: return nc4.MFDataset(ncfile)
def is_valid_mesh(nc, varname): """ determine if the given variable name is a valid mesh definition :param nc: a netCDF4 Dataset to check :param varname: name of the candidate mesh variable """ try: mesh_var = nc.variables[varname] except KeyError: return False try: if (mesh_var.cf_role.strip() == 'mesh_topology' and int(mesh_var.topology_dimension) == 2): return True except AttributeError: # not a valid mesh variable return False # Defining properties of various connectivity arrays # so that the same code can load all of them.
def file_to_subset_setup(request): ids = [2, 4, 6] flows = [3.1, -9999.0, 5.0] date = '2017-04-29_00:00:00' flows = ma.masked_array(flows, mask=[0, 1, 0]) # explicit mask with Dataset(_file_to_subset, 'w') as nc: nc.model_output_valid_time = date dim = nc.createDimension('feature_id', 3) id_var = nc.createVariable('feature_id', 'i', ('feature_id',)) id_var[:] = ids flow_var = nc.createVariable('streamflow', 'f', ('feature_id',), fill_value=-9999.0) flow_var[:] = flows extra_var = nc.createVariable('extra_var', 'i', ('feature_id',)) extra_var[:] = [1, 2, 3] def file_to_subset_teardown(): os.remove(_file_to_subset) request.addfinalizer(file_to_subset_teardown)
def test_time_from_variable(): '''Should read date from time variable.''' tempdir = tempfile.gettempdir() nc_file = os.path.join(tempdir, 'test_time_from_variable.nc') date_obj = parser.parse('2017-04-29 04:00:00') units = 'minutes since 1970-01-01 00:00:00 UTC' nc_date = round(date2num(date_obj, units)) with Dataset(nc_file, 'w') as nc: dim = nc.createDimension('time', 1) time_var = nc.createVariable('time', 'i', ('time',)) time_var[:] = [nc_date] time_var.units = units with Dataset(nc_file, 'r') as nc: expected = date_obj.replace(tzinfo=pytz.utc) returned = nwm_data.time_from_dataset(nc) assert expected == returned os.remove(nc_file)
def files_to_cube_setup(request): date_template = '2017-04-29_0{0}:00:00' for i, nc_file in enumerate(_files_to_cube): date = date_template.format(i) flows = [flow * (i + 1) for flow in _flows_template] if i == 1: flows[1] = -9999.0 # one way of masking data elif i == 2: flows = ma.masked_array(flows, mask=[0, 1, 0]) # explicit mask with Dataset(nc_file, 'w') as nc: nc.model_output_valid_time = date dim = nc.createDimension('feature_id', 3) id_var = nc.createVariable('feature_id', 'i', ('feature_id',)) id_var[:] = _ids flow_var = nc.createVariable('streamflow', 'f', ('feature_id',), fill_value=-9999.0) flow_var[:] = flows def files_to_cube_teardown(): for nc_file in _files_to_cube: os.remove(nc_file) request.addfinalizer(files_to_cube_teardown)
def file_to_read_streamflow_setup(request): ids = [2, 4, 6] flows = [1.3, -9999.0, 5.1] date = '2017-04-29_04:00:00' flows = ma.masked_array(flows, mask=[0, 1, 0]) # explicit mask with Dataset(_file_to_read_streamflow, 'w') as nc: nc.model_output_valid_time = date dim = nc.createDimension('feature_id', 3) id_var = nc.createVariable('feature_id', 'i', ('feature_id',)) id_var[:] = ids flow_var = nc.createVariable('streamflow', 'f', ('feature_id',), fill_value=-9999.0) flow_var[:] = flows def file_to_read_streamflow_teardown(): os.remove(_file_to_read_streamflow) request.addfinalizer(file_to_read_streamflow_teardown)
def load_netcdf_meta(datafile): ''' Loads metadata for NetCDF Parameters: :datafile: str: Path on disk to NetCDF file Returns: :meta: Dictionary of metadata ''' ras = nc.Dataset(datafile) attrs = _get_nc_attrs(ras) sds = _get_subdatasets(ras) meta = {'meta': attrs, 'layer_meta': sds, 'name': datafile, 'variables': list(ras.variables.keys()), } return meta_strings_to_dict(meta)
def to_netcdf(self, path): """ Store the model in a netCDF file that attempts to be displayable using standard tools and loosely follows the WRF 'standard'. :param path: the path where to store the model """ import netCDF4 d = netCDF4.Dataset(path, 'w', format='NETCDF4') d0, d1, k = self.m_ext.shape d.createDimension('fuel_moisture_classes_stag', k) d.createDimension('south_north', d0) d.createDimension('west_east', d1) ncfmc = d.createVariable('FMC_GC', 'f4', ('south_north', 'west_east','fuel_moisture_classes_stag')) ncfmc[:,:,:] = self.m_ext ncfmc_cov = d.createVariable('FMC_COV', 'f4', ('south_north', 'west_east','fuel_moisture_classes_stag', 'fuel_moisture_classes_stag')) ncfmc_cov[:,:,:,:] = self.P d.close()
def from_netcdf(cls, path): """ Construct a fuel moisture model from data stored in a netCDF file. :param path: the path to the netCDF4 file """ import netCDF4 print "reading from netCDF file", path d = netCDF4.Dataset(path) ncfmc = d.variables['FMC_GC'][:,:,:] print "FuelMoistureModel.from_netcdf: reading FMC_GC shape",ncfmc.shape d0, d1, k = ncfmc.shape P = d.variables['FMC_COV'][:,:,:,:] Tk = np.array([1.0, 10.0, 100.0]) * 3600 fm = FuelMoistureModel(ncfmc[:,:,:k-2], Tk) fm.m_ext[:,:,k-2:] = ncfmc[:,:,k-2:] fm.P[:,:,:,:] = P return fm
def create_netcdf(savef, dts, dat): """ Write Florida current data to netcdf file """ dataset = Dataset(savef, 'w', format='NETCDF4_CLASSIC') # Create time coordinate tdim = dataset.createDimension('time', None) time = dataset.createVariable('time',np.float64,(tdim.name,)) time.units = 'hours since 0001-01-01 00:00:00.0' time.calendar = 'gregorian' time[:] = date2num(dts, time.units, calendar=time.calendar) # Create data variable fc = dataset.createVariable('florida_current_transport',np.float64,(tdim.name),fill_value=1.e20) fc.units = 'Sv' fc[:] = dat # close file print 'SAVING: %s' % savef dataset.close()
def write_to_netcdf(self, ncfile): """ Write observation data to netcdf file """ # Open ncfile and create coords dataset = Dataset(ncfile, 'w', format='NETCDF4_CLASSIC') tdim = dataset.createDimension('time', None) # Create time coordinate time = dataset.createVariable('time',np.float64,(tdim.name,)) time.units = 'hours since 0001-01-01 00:00:00.0' time.calendar = 'gregorian' time[:] = date2num(self.dates, time.units, calendar=time.calendar) # Create variables fc = dataset.createVariable('florida_current_transport',np.float64,(tdim.name,)) fc.units = 'Sv' fc[:] = self.fc # Close file print 'SAVING: %s' % ncfile dataset.close()
def testMetadata(ncattributes, files): answer = list([]) # metadata variables... (ncattributes) #print "files: ", len(files) content = dict() count = 0 for f in files: print f nc_fid = netCDF4.Dataset( f,'r') for v in ncattributes: checkVariable(nc_fid,v,answer,count) # print "variables failed = " , count nc_fid.close() for a in nc_fid.ncattrs(): # content[a] = str(nc_fid.getncattr(a)) content[str(a).replace(".","_")] = str(nc_fid.getncattr(a)) return answer,content
def __init__(self, grid_def, description=''): with nc.Dataset(grid_def) as f: # Select points from double density horizontal grid. Only # need t-points. try: x_t = f.variables['nav_lon'][:] y_t = f.variables['nav_lat'][:] z = f.variables['deptht'][:] except KeyError: x_t = f.variables['lon'][:] y_t = f.variables['lat'][:] z = f.variables['depth'][:] mask = np.zeros_like(f.variables['tmask'], dtype=bool) mask[f.variables['tmask'][:] == 0.0] = True super(OrasGrid, self).__init__(x_t, y_t, mask_t=mask, levels=zdescription)
def __init__(self, num_lons=128, num_lats=64, num_levels=1, mask_file=None, description=''): levels = range(num_levels) self.type = 'Spectral' self.full_name = 'T42' if mask_file: with nc.Dataset(mask_file) as f: try: mask = np.round(f.variables['WGOCN'][0, 0, :, :-1]) except KeyError as e: print("Error: var WGOCN not in {}.".format(mask_file), file=sys.stderr) raise e else: # Default is all unmasked. mask = np.zeros((num_lats, num_lons)) assert mask.shape[0] == num_lats assert mask.shape[1] == num_lons super(T42Grid, self).__init__(num_lons, num_lats, mask_t=mask, levels=levels, description=description)
def regrid(regrid_weights, src_data, dest_grid): """ Regrid a single time index of data. """ print('Horizontal regridding ...') # Destination arrays dest_data = np.ndarray((dest_grid.num_lat_points, dest_grid.num_lon_points)) with nc.Dataset(regrid_weights) as wf: n_s = wf.dimensions['n_s'].size n_b = wf.dimensions['n_b'].size row = wf.variables['row'][:] col = wf.variables['col'][:] s = wf.variables['S'][:] dest_data[:, :] = apply_weights(src_data[:, :], dest_data.shape, n_s, n_b, row, col, s) return dest_data
def regrid(regrid_weights, src_data, dest_grid): """ Regrid a single time index of data. """ print('Horizontal regridding ...') # Destination arrays dest_data = np.ndarray((dest_grid.num_levels, dest_grid.num_lat_points, dest_grid.num_lon_points)) with nc.Dataset(regrid_weights) as wf: n_s = wf.dimensions['n_s'].size n_b = wf.dimensions['n_b'].size row = wf.variables['row'][:] col = wf.variables['col'][:] s = wf.variables['S'][:] for l in range(src_data.shape[0]): dest_data[l, :, :] = apply_weights(src_data[l, :, :], dest_data.shape[1:], n_s, n_b, row, col, s) return dest_data
def __init__(self, grid_def, description=''): with nc.Dataset(grid_def) as f: # Get lon and lat x_t = f.variables['lon'][:] y_t = f.variables['lat'][:] try: z = f.variables['depth'][:] except KeyError: z = f.variables['level'][:] try: mask = f.variables['practical_salinity'][0, :, :, :].mask[:] except KeyError: mask = f.variables['s_an'][0, :, :, :].mask[:] super(WoaGrid, self).__init__(x_t, y_t, z, mask, description)
def __init__(self, grid_def, description=''): with nc.Dataset(grid_def) as f: # Select points from double density horizontal grid. Only # need t-points. x_t = f.variables['lon'][:] y_t = f.variables['lat'][:] try: z = f.variables['level'][:] except KeyError: z = f.variables['depth'][:] try: mask = f.variables['pottmp'][0, :].mask[:] except KeyError: mask = f.variables['POT'][0, :].mask[:] super(GodasGrid, self).__init__(x_t, y_t, z, mask, description)
def __init__(self, h_grid_def, v_grid_def, mask_file, description): with nc.Dataset(h_grid_def) as f: # Select points from double density horizontal grid. Only # need t-points. x_t = f.variables['x'][1::2,1::2] y_t = f.variables['y'][1::2,1::2] self.x_vt = f.variables['x'][:] self.y_vt = f.variables['y'][:] with nc.Dataset(v_grid_def) as f: # Only take cell centres. z = f.variables['zeta'][1::2] if mask_file is None: mask = np.zeros_like(x_t, dtype=bool) else: with nc.Dataset(mask_file) as f: mask = np.zeros_like(f.variables['mask'], dtype=bool) mask[f.variables['mask'][:] == 0.0] = True super(MomGrid, self).__init__(x_t, y_t, z, mask, description)
def get_time_origin(filename): """ Parse time.units to find the start/origin date of the file. Return a datetime.date object. """ date_search_strings = ['\d{4}-\d{2}-\d{2}','\d{4}-\d{1}-\d{2}', '\d{4}-\d{2}-\d{1}','\d{4}-\d{1}-\d{1}'] with nc.Dataset(filename) as f: time_var = f.variables['time'] assert 'months since' in time_var.units or \ 'days since' in time_var.units or \ 'hours since' in time_var.units, \ "Time units doesn't have expected format: {}".format(time_var.units) for ds in date_search_strings: m = re.search(ds, time_var.units) if m is not None: break assert m is not None date = dt.datetime.strptime(m.group(0), '%Y-%m-%d') return dt.date(date.year, date.month, date.day)
def write_nemo_output_at_time(filename, var_name, var_longname, var_units, var_data, time_idx, time_pt, write_ic=False): with nc.Dataset(filename, 'r+') as f: if not f.variables.has_key(var_name): var = f.createVariable(var_name, 'f8', ('time_counter', 'z', 'y', 'x')) var.long_name = var_longname var.units = var_units var = f.variables[var_name] if write_ic: var[0, :] = var_data[:] f.variables['time_counter'][0] = time_pt else: var[time_idx, :] = var_data[:] f.variables['time_counter'][time_idx] = time_pt
def _create_file(filename, mask, lat, lon): """create the netcdf file to store the srex mask""" import netCDF4 as nc with nc.Dataset(filename, 'w') as ncf: ncf.createDimension('lat', size=lat.size) ncf.createDimension('lon', size=lon.size) ncf.createVariable('lat', 'f', 'lat') ncf.createVariable('lon', 'f', 'lon') ncf.createVariable('mask', 'f', ('lat', 'lon')) ncf.variables['lat'][:] = lat ncf.variables['lon'][:] = lon ncf.variables['mask'][:] = mask # create unique filename for the mask
def readGraceData(filename, lat_name, lon_name, data_name, time=None): ''' This function reads in netcdf data provided by GRACE Tellus @param filename: Name of file to read in @param lat_name: Name of latitude data @param lon_name: Name of longitude data @param data_name: Name of data product @param time: Name of time data ''' nc = Dataset(filename, 'r') lat_index = nc[lat_name][:] lon_index = nc[lon_name][:] data = nc[data_name][:] if time != None: time = nc.variables[time] date_index = pd.to_datetime(num2date(time[:],units=time.units,calendar=time.calendar)) return pd.Panel(data=data, items=date_index,major_axis=lat_index, minor_axis=lon_index) else: return pd.DataFrame(data = data, columns=lon_index, index=lat_index)
def nc_to_dataframe(nc_fname, columns=slice(None)): """ Return a pandas data frame containing the information in the netCDF file *nc_fname*. Return it and a mapping to the header metadata. Use *columns* to select columns (via a list of column names). """ root = Dataset(nc_fname) data = {} data.update({dim: root[dim][:] for dim in root.dimensions if dim != 'time'}) index = data['time'] = map(fromJ2000, root['time'][:]) return (PD.DataFrame(data=data, index=index)[columns], {x: getattr(root, x) for x in root.ncattrs()})
def test_1D_cf_bounds(self): """Test whether the CF Conventions for 1D bounaries are correct""" final_bounds = np.arange(-180, 181, 30) lon = xr.Variable(('lon', ), np.arange(-165, 166, 30), {'bounds': 'lon_bounds'}) cf_bounds = xr.Variable(('lon', 'bnds'), np.zeros((len(lon), 2))) for i in range(len(lon)): cf_bounds[i, :] = final_bounds[i:i+2] ds = xr.Dataset(coords={'lon': lon, 'lon_bounds': cf_bounds}) decoder = psyd.CFDecoder(ds) self.assertEqual(list(final_bounds), list(decoder.get_plotbounds(lon)))
def test_1D_bounds_calculation(self): """Test whether the 1D cell boundaries are calculated correctly""" final_bounds = np.arange(-180, 181, 30) lon = xr.Variable(('lon', ), np.arange(-165, 166, 30)) ds = xr.Dataset(coords={'lon': lon}) decoder = psyd.CFDecoder(ds) self.assertEqual(list(final_bounds), list(decoder.get_plotbounds(lon)))
def _filter_test_ds(self): return xr.Dataset( {'v0': xr.Variable(('ydim', 'xdim'), np.zeros((4, 4)), attrs={'test': 1, 'test2': 1}), 'v1': xr.Variable(('xdim', ), np.zeros(4), attrs={'test': 2, 'test2': 2}), 'v2': xr.Variable(('xdim', ), np.zeros(4), attrs={'test': 3, 'test2': 3})}, {'ydim': xr.Variable(('ydim', ), np.arange(1, 5)), 'xdim': xr.Variable(('xdim', ), np.arange(4))})
def test_from_dataset_01_basic(self): """test creation without any additional information""" variables, coords = self._from_dataset_test_variables ds = xr.Dataset(variables, coords) l = self.list_class.from_dataset(ds) self.assertEqual(len(l), 4) self.assertEqual(set(l.names), set(variables)) for arr in l: self.assertEqual(arr.dims, variables[arr.name].dims, msg="Wrong dimensions for variable " + arr.name) self.assertEqual(arr.shape, variables[arr.name].shape, msg="Wrong shape for variable " + arr.name)
def test_from_dataset_02_name(self): """Test the from_dataset creation method with selected names""" variables, coords = self._from_dataset_test_variables ds = xr.Dataset(variables, coords) l = self.list_class.from_dataset(ds, name="v2") self.assertEqual(len(l), 1) self.assertEqual(set(l.names), {"v2"}) for arr in l: self.assertEqual(arr.dims, variables[arr.name].dims, msg="Wrong dimensions for variable " + arr.name) self.assertEqual(arr.shape, variables[arr.name].shape, msg="Wrong shape for variable " + arr.name)
def test_from_dataset_04_exact_selection(self): """Test the from_dataset creation method with selected names""" variables, coords = self._from_dataset_test_variables ds = xr.Dataset(variables, coords) l = self.list_class.from_dataset(ds, ydim=2, method=None, name=['v0', 'v2']) self.assertEqual(len(l), 2) self.assertEqual(set(l.names), {'v0', 'v2'}) for arr in l: self.assertEqual(arr.ydim, 2, msg="Wrong ydim slice for " + arr.name)
def test_from_dataset_05_exact_array_selection(self): """Test the from_dataset creation method with selected names""" variables, coords = self._from_dataset_test_variables ds = xr.Dataset(variables, coords) l = self.list_class.from_dataset(ds, ydim=[[2, 3]], method=None, name=['v0', 'v2']) self.assertEqual(len(l), 2) self.assertEqual(set(l.names), {'v0', 'v2'}) for arr in l: self.assertEqual(arr.ydim.values.tolist(), [2, 3], msg="Wrong ydim slice for " + arr.name)