我们从Python开源项目中,提取了以下37个代码示例,用于说明如何使用pkg_resources.resource_listdir()。
def _fuzzdb_get_strings(max_len=0):
    """Helper to get all the strings from fuzzdb.

    Arguments:
      max_len -- if non-zero, skip strings longer than this many characters.

    Yields decoded, stripped lines from every '*.txt' file under BASE_PATH,
    skipping blank lines, '#' comments and ignored subdirectories.
    """
    ignored = ['integer-overflow']
    for subdir in pkg_resources.resource_listdir('protofuzz', BASE_PATH):
        if subdir in ignored:
            continue
        path = '{}/{}'.format(BASE_PATH, subdir)
        listing = pkg_resources.resource_listdir('protofuzz', path)
        for filename in listing:
            if not filename.endswith('.txt'):
                continue
            path = '{}/{}/{}'.format(BASE_PATH, subdir, filename)
            source = _open_fuzzdb_file(path)
            for line in source:
                string = line.decode('utf-8').strip()
                if not string or string.startswith('#'):
                    continue
                # BUG FIX: measure the decoded, stripped string that we
                # actually yield, not the raw bytes line (which still
                # carries the trailing newline and counts bytes).
                if max_len != 0 and len(string) > max_len:
                    continue
                yield string
def load_model(package_name, model_dir, model_name):
    """Load a YAML model from package data, merging any 'imports' it names."""
    base_path = model_dir + "/" + model_name
    model = {}
    # Merge every YAML fragment found directly under the model directory.
    for entry in pkg_resources.resource_listdir(package_name, base_path):
        resource = base_path + '/' + entry
        with pkg_resources.resource_stream(package_name, resource) as stream:
            append_model(model, yaml.safe_load(stream))
    # A model may reference a shared file with additional definitions.
    imports_path = model.get('imports')
    if imports_path:
        resource = model_dir + '/' + imports_path
        with pkg_resources.resource_stream(package_name, resource) as stream:
            append_model(model, yaml.safe_load(stream))
    extend_base_objects(model)
    extend_api_objects(model)
    return model
def getTestList():
    """
    Generate a list containing the names of the test modules, which
    should be executed.

    Returns:   List with names of test Python modules (list/string).
    """
    testModList = []
    modulesInNgamsTest = pkg_resources.resource_listdir(__name__, ".")
    fileList = [f for f in modulesInNgamsTest if f.endswith("Test.py")]
    fileList.sort()
    supprTests = []
    for file in fileList:
        testMod = os.path.basename(file).split(".")[0]
        # Plain membership test instead of the original list.index() call
        # wrapped in a bare except used as control flow.
        if testMod in supprTests:
            # Parenthesized single-argument print works on Python 2 and 3.
            print("===> NOTE: Test Suite: %-32s disabled - RE-ENABLE!" %
                  testMod)
            continue
        if file.find("ngamsTest.py") == -1:
            testModList.append(testMod)
    return testModList
def setup_output_path(self):
    """
    Called on task startup to copy all static resources into the output
    path (and to make sure the output path exists as a directory).
    """
    self.logger.info('setting up output path')
    # Make sure both the root output directory and its 'simple' child exist.
    for directory in (self.output_path, self.output_path / 'simple'):
        try:
            directory.mkdir()
        except FileExistsError:
            pass
    # Copy every packaged static resource into the output directory.
    for name in resource_listdir(__name__, 'static'):
        with (self.output_path / name).open('wb') as target:
            source = resource_stream(__name__, 'static/' + name)
            target.write(source.read())
            source.close()
def get_model_list(package_name="gluon", model_dir="models"):
    """Return the names of all bundled models, excluding the 'base' entry."""
    entries = pkg_resources.resource_listdir(package_name, model_dir)
    return [entry for entry in entries if entry != 'base']
def show_wordlists():
    """Print the names of the built-in word lists, sorted, on one line."""
    click.echo(_('builtin word lists:'))
    # Entries containing a dot are files/extensions, not word-list names.
    names = sorted(
        res for res in pkg_resources.resource_listdir(__name__, 'wordlists')
        if '.' not in res)
    click.echo(' '.join(names))
def find_components(package, base_class):
    """Find components which are subclass of a given base class."""
    for entry in resource_listdir(package, ''):
        stem, ext = os.path.splitext(entry)
        # Only plain importable modules; skip hidden files and non-.py files.
        if ext != '.py' or stem.startswith('.'):
            continue
        module_name = "{}.{}".format(package, stem)
        __import__(module_name, fromlist='*')
        module = sys.modules[module_name]
        # Modules without __all__ do not advertise components.
        if hasattr(module, '__all__'):
            yield from scan_module(module, base_class)
def _get_configspec():
    """Locate and concatenate all bundled '*.conf.spec' configuration specs."""
    specfiles = [fn for fn in sorted(pkg_resources.resource_listdir(__name__, ""))
                 if fn.endswith(".conf.spec")]
    if os.environ.get("DEBUG_FG21SIM"):
        print("DEBUG: Found config specifications: %s" % ", ".join(specfiles),
              file=sys.stderr)
    # resource_string() returns bytes, hence the explicit UTF-8 decode.
    contents = [pkg_resources.resource_string(__name__, fn).decode("utf-8")
                for fn in specfiles]
    return "\n".join(contents).split("\n")
def resource_walk(package_or_requirement, resource_name):
    """Generate the file names in a resource tree, like ``os.walk``.

    Parameters
    ----------
    package_or_requirement : str or Requirement
        Package or requirement that contains the resource.
    resource_name : str
        Name of the resource.

    Yields
    ------
    tuple
        A ``(dirpath, dirnames, filenames)`` 3-tuple for each directory in
        the tree rooted at the given resource; *dirpath* starts with
        *resource_name*.
    """
    pending = [resource_name]
    while pending:
        dirpath = pending.pop()
        dirnames, filenames = [], []
        for entry in resource_listdir(package_or_requirement, dirpath):
            full = os.path.join(dirpath, entry)
            if resource_isdir(package_or_requirement, full):
                dirnames.append(entry)
                pending.append(full)
            else:
                filenames.append(entry)
        yield dirpath, dirnames, filenames
def send_extra(self):
    """
    Sends any extra JS/CSS files placed in Gate One's 'static/extra'
    directory.  Can be useful if you want to use Gate One's file
    synchronization and caching capabilities in your app.

    .. note::

        You may have to create the 'static/extra' directory before
        putting files in there.
    """
    extra_path = os.path.join(getsettings('BASE_DIR'), 'static/extra')
    if not os.path.exists(extra_path):
        return # Nothing to do
    for name in os.listdir(extra_path):
        filepath = os.path.join(extra_path, name)
        if filepath.endswith('.js'):
            self.send_js(filepath, force=True)
        elif filepath.endswith('.css'):
            self.send_css(filepath, force=True)
def enumerate_themes(self):
    """
    Returns a JSON-encoded object containing the installed themes and
    text color schemes.
    """
    themes_dir = os.path.join(getsettings('BASE_DIR'), 'templates/themes')
    # Keep only stylesheets (just in case other junk wound up in that
    # directory), then strip the extension to get the theme names.
    themes = [name.replace('.css', '')
              for name in os.listdir(themes_dir) if name.endswith('.css')]
    self.write_message({'go:themes_list': {'themes': themes}})
def send_client_files(self):
    """
    Sends the client our standard CSS and JS.
    """
    # Render and send the client our terminal.css
    terminal_css = resource_filename(
        'applications', '/templates/terminal.css')
    self.render_and_send_css(terminal_css, element_id="terminal.css")
    # Send the JavaScript files in a fixed dependency order:
    # terminal.css -> terminal.js -> terminal_input.js -> everything else.
    js_files = resource_listdir('applications', '/static/')
    js_files.sort()
    for fname in js_files:
        if not fname.endswith('.js'):
            continue
        js_file_path = resource_filename(
            'applications', '/static/%s' % fname)
        if fname == 'terminal.js':
            self.ws.send_js(js_file_path, requires=["terminal.css"])
        elif fname == 'terminal_input.js':
            self.ws.send_js(js_file_path, requires="terminal.js")
        else:
            self.ws.send_js(js_file_path, requires='terminal_input.js')
    self.ws.send_plugin_static_files(
        'go_terminal_plugins', requires=["terminal_input.js"])
    # Send the client the 256-color style information and our printing CSS
    self.send_256_colors()
    self.send_print_stylesheet()
def enumerate_fonts(self):
    """
    Returns a JSON-encoded object containing the installed fonts.
    """
    from applications.woff_info import woff_info
    font_list = []
    for font in resource_listdir('applications', '/static/fonts'):
        if not font.endswith('.woff'):
            continue
        font_path = resource_filename(
            'applications', '/static/fonts/%s' % font)
        font_info = woff_info(font_path)
        if "Font Family" not in font_info:
            # A .woff without a family name is unusable; log and skip it.
            self.ws.logger.error(_(
                "Bad font in fonts dir (missing Font Family in name "
                "table): %s" % font))
            continue # Bad font
        family = font_info["Font Family"]
        if family not in font_list:
            font_list.append(family)
    self.write_message({'terminal:fonts_list': {'fonts': font_list}})
def enumerate_colors(self):
    """
    Returns a JSON-encoded object containing the installed text color
    schemes.
    """
    listing = resource_listdir('applications', '/templates/term_colors')
    # Stylesheet names minus the extension are the scheme names.
    colors = [name.replace('.css', '')
              for name in listing if name.endswith('.css')]
    self.write_message({'terminal:colors_list': {'colors': colors}})
def get_genome_size_file(genome):
    # type: (str) -> str
    """Return the path to the bundled chromosome-sizes file for *genome*.

    The lookup is case-insensitive against the packaged '*.chromsizes'
    file names.
    """
    listing = pkg_resources.resource_listdir("epic", "scripts/chromsizes")
    by_lower = {entry.lower().replace(".chromsizes", ""): entry
                for entry in listing}
    # No try/except here, because get_egs would already have failed if
    # genome did not exist.
    exact = by_lower[genome.lower()]
    return pkg_resources.resource_filename(
        "epic", "scripts/chromsizes/{}".format(exact))
def get_effective_genome_length(genome, read_length):
    # type: (str, int) -> float
    """Compute the effective genome size (bp) for *genome* at *read_length*.

    Looks up the bundled effective-genome-fraction table and multiplies
    that fraction by the total genome length.  Raises ValueError (after
    logging the available genomes) when *genome* has no bundled table.
    """
    genome_names = pkg_resources.resource_listdir("epic",
                                                  "scripts/effective_sizes")
    name_dict = {n.split("_")[0]: "".join(n.split("_")[:-1])
                 for n in genome_names}
    try:
        genome_exact = name_dict[genome.lower()]
        egf = pkg_resources.resource_string(  # type: ignore
            "epic", "scripts/effective_sizes/{}_{}.txt".format(
                genome_exact, read_length)).split()[-1].decode()
    except KeyError:
        genome_list = "\n".join(list(name_dict.keys()))
        logging.error(
            "Genome " + genome +
            " not found.\n These are the available genomes: " + genome_list +
            "\nIf yours is not there, please request it at github.com/endrebak/epic .")
        # BUG FIX: the original fell through after logging and crashed
        # with a NameError on 'egf'; fail with a meaningful error instead.
        raise ValueError("Genome {} not found.".format(genome))
    genome_length = sum(create_genome_size_dict(genome).values())
    logging.info("Using an effective genome fraction of {}.".format(egf))
    assert float(
        egf) < 1, "Something wrong happened, effective genome fraction over 1!"
    egs = float(egf) * genome_length
    return egs
def _makeReference(output_path, type_suffix):
    """Collect every per-locus '<locus>_<type_suffix>.fasta' into one FASTA.

    Raises MissingReferenceException when a locus directory lacks its
    expected reference file.
    """
    records = []
    for locus in pkg.resource_listdir(_REF_DIR, ''):
        if not pkg.resource_isdir(_REF_DIR, locus):
            continue
        expected_file = "{0}_{1}.fasta".format(locus, type_suffix)
        expected_path = op.join(_REF_PATH, locus, expected_file)
        if not op.exists(expected_path):
            raise MissingReferenceException('Missing expected reference file "{0}" for Locus "{1}"'.format(expected_file, locus))
        records += _readFasta(expected_path)
    _writeFasta(output_path, records)
    return True
def makeExonReference():
    """Build the exon-map index for every locus reference directory.

    Uses an existing '<locus>_exons.map' when present, otherwise attempts
    to generate it; raises MissingReferenceException when neither works.
    """
    data = {}
    for locus in pkg.resource_listdir(_REF_DIR, ''):
        if not pkg.resource_isdir(_REF_DIR, locus):
            continue
        expected_file = "{0}_exons.map".format(locus)
        expected_path = op.join(_REF_PATH, locus, expected_file)
        # Use the existing map, or try to (re)generate it on the fly.
        if op.exists(expected_path) or _make_exon_map(expected_path, locus):
            data[locus] = expected_path
        else:
            raise MissingReferenceException('Missing expected reference file "{0}" for Locus "{1}"'.format(expected_file, locus))
    _writeMap(_EXON_REF, data)
    return True
def _get_global_builders():
    """Find builders defined globally, mapping name -> resource path."""
    return {
        name: pkg_resources.resource_filename('pkgpanda', 'docker/' + name)
        for name in pkg_resources.resource_listdir('pkgpanda', 'docker/')
    }
def __load_all_items():
    """Load every item data file bundled under data/items.

    Returns a dict mapping each resource path to a tuple of the items it
    contains; progress and running totals are logged along the way.
    """
    items = {}
    for fname in pkg_resources.resource_listdir(__name__, 'data/items'):
        fullname = os.path.join('data/items/', fname)
        stream = pkg_resources.resource_stream(__name__, fullname)
        Logger.info("Loading items from {}", fullname)
        items[fullname] = tuple(_load_items(stream))
        Logger.info("Loaded {} items", len(items[fullname]))
        Logger.info("Total items: {}", sum(len(v) for v in items.values()))
    return items
def recursive_copy(origin, destiny):
    """Copy directory from resource to destiny folder"""
    if not resource_isdir(__name__, origin):
        # Leaf resource: dump its bytes straight into the target file.
        with open(destiny, "wb") as handle:
            handle.write(resource(origin))
        return
    if not exists(destiny):
        os.makedirs(destiny)
    for entry in resource_listdir(__name__, origin):
        recursive_copy(join(origin, entry), join(destiny, entry))
def get_all_test_case_names():
    """Return bundled '*.txt' test cases, excluding '*.expected.txt' files."""
    return [name for name in pkg_resources.resource_listdir("tbget", "tests")
            if name.endswith(".txt") and not name.endswith(".expected.txt")]
def get_languages_supported_by_all(cls, root_egg):
    """Return the locales supported by every relevant egg of *root_egg*.

    Scans the 'reahl.translations' entry points for compiled .mo catalogues,
    keeps only the translation domains actually in use, then intersects the
    per-domain language sets so only languages available for all domains
    survive.  The default ['en_gb'] is always included in the result.
    """
    egg_interfaces = cls.get_all_relevant_interfaces(root_egg)
    default_languages = ['en_gb']
    if not egg_interfaces:
        return default_languages
    # Translation domains we care about (one per egg interface).
    domains_in_use = [e.name for e in egg_interfaces]
    languages_for_eggs = {}  # domain name -> set of language codes found
    for translation_entry_point in iter_entry_points('reahl.translations'):
        requirement = translation_entry_point.dist.as_requirement()
        egg_internal_path = cls.get_egg_internal_path_for(translation_entry_point)
        if resource_isdir(requirement, egg_internal_path):
            # Each non-dunder subdirectory is treated as a language code.
            languages = [d for d in resource_listdir(requirement, egg_internal_path)
                         if (resource_isdir(requirement, '%s/%s' % (egg_internal_path, d))
                             and not d.startswith('__'))]
        else:
            logging.error('Translations of %s not found in %s' % (requirement, egg_internal_path))
            languages = []
        for language in languages:
            language_path = '%s/%s/LC_MESSAGES' % (egg_internal_path, language)
            # Domain names are the .mo file names without their extension.
            domains = [d[:-3] for d in resource_listdir(requirement, language_path)
                       if d.endswith('.mo')]
            for domain in domains:
                if domain in domains_in_use:
                    # NOTE(review): this rebinds the outer 'languages' list to
                    # the per-domain set.  The active 'for language' iterator
                    # is unaffected, but the shadowing is fragile — confirm
                    # it is intentional.
                    languages = languages_for_eggs.setdefault(domain, set())
                    languages.add(language)
    if not languages_for_eggs.values():
        return default_languages
    # Intersect the per-domain language sets, then re-add the defaults.
    languages = (list(languages_for_eggs.values()))[0].intersection(*languages_for_eggs.values())
    languages.update(default_languages)
    return list(languages)
def _install(package, src_dir, dst_dir, params, prefix_len=None, rec=None):
    """Interpolate source directory into target directory with params.

    package    -- module object whose package data holds the sources.
    src_dir    -- resource directory (inside the package) to render from.
    dst_dir    -- filesystem directory to render into.
    params     -- substitution parameters passed to _render().
    prefix_len -- number of leading characters of the resource path to drop
                  when computing the destination path; defaults to the
                  top-level src_dir plus its trailing separator, and is kept
                  constant across the recursion.
    rec        -- optional file-like object that records every created path
                  (directories get a trailing '/').
    """
    package_name = package.__name__
    contents = pkg_resources.resource_listdir(package_name, src_dir)
    if prefix_len is None:
        # Only the top-level call computes this; recursive calls inherit it
        # so paths stay relative to the original src_dir.
        prefix_len = len(src_dir) + 1
    for item in contents:
        resource_path = '/'.join([src_dir, item])
        dst_path = os.path.join(dst_dir, resource_path[prefix_len:])
        if pkg_resources.resource_isdir(package_name,
                                        '/'.join([src_dir, item])):
            fs.mkdir_safe(dst_path)
            if rec:
                rec.write('%s/\n' % dst_path)
            _install(package, os.path.join(src_dir, item), dst_dir, params,
                     prefix_len=prefix_len, rec=rec)
        else:
            # Skip editor swap files.
            if resource_path.endswith('.swp'):
                continue
            _LOGGER.info('Render: %s => %s', resource_path, dst_path)
            resource_str = pkg_resources.resource_string(package_name,
                                                         resource_path)
            if rec:
                rec.write('%s\n' % dst_path)
            _update(dst_path, _render(resource_str.decode('utf-8'), params))
def main(target_file, include_dir):
    """Console script for phriky_units."""
    # Parenthesized print works on Python 2 and 3 (the original used the
    # Python-2-only print statement).
    print(pkg_resources.resource_listdir('phriky_units.resources', ''))
    with open('delete_me.txt', 'w') as fp:
        fp.write(resource_string('phriky_units.resources.cppcheck', 'std.cfg'))
def get_script(version):
    """
    Generate the script to get the database from *version* (the result of
    :func:`detect_version`) to the current version of the software. If
    *version* is ``None``, this is simply the contents of the
    :file:`sql/create_piwheels.sql` script. Otherwise, it is a
    concatenation of various update scripts.
    """
    if version is None:
        return resource_string(__name__, 'sql/create_piwheels.sql').decode('utf-8')
    # Map each known "from" version to its ("to" version, script) pair,
    # built from the update scripts in the sql/ directory.
    ver_regex = re.compile(r'update_piwheels_(?P<from>.*)_to_(?P<to>.*)\.sql$')
    upgrades = {}
    for filename in resource_listdir(__name__, 'sql'):
        match = ver_regex.match(filename)
        if match is not None:
            upgrades[match.group('from')] = (match.group('to'), filename)
    # Walk the chain from the detected version up to __version__.  NOTE:
    # this is a stupid algorithm which won't attempt different branches or
    # back-tracking, so custom versions or downgrade scripts in the sql
    # directory will probably break it.
    this_version = version
    output = []
    try:
        while this_version != __version__:
            this_version, filename = upgrades[this_version]
            output.append(resource_string(__name__, 'sql/' + filename))
    except KeyError:
        raise RuntimeError("Unable to find upgrade path from %s to %s" % (
            version, __version__))
    return ''.join(script.decode('utf-8') for script in output)
def _get_settings(self):
    """Return the registration settings files to use.

    User-supplied settings win; otherwise the bundled JSON files matching
    the moving image and flavor are returned in sorted order.
    """
    if isdefined(self.inputs.settings):
        NIWORKFLOWS_LOG.info('User-defined settings, overriding defaults')
        return self.inputs.settings
    prefix = '{}-mni_registration_{}_'.format(
        self.inputs.moving.lower(), self.inputs.flavor)
    candidates = sorted(
        name for name in pkgr.resource_listdir('niworkflows', 'data')
        if name.startswith(prefix) and name.endswith('.json'))
    return [pkgr.resource_filename('niworkflows.data', name)
            for name in candidates]
def load_package(self, pkg):
    """Collect and version-sort migration scripts bundled under pkg/migrations."""
    found = []
    for resource_name in pkg_resources.resource_listdir(pkg, "migrations"):
        stem, _ = os.path.splitext(resource_name)
        match = MIGRATION_FILE_PATTERN.match(stem)
        if not match:
            continue
        # Defer opening the script until the migration actually runs.
        opener = functools.partial(pkg_resources.resource_stream, pkg,
                                   "migrations/" + resource_name)
        version = Version(int(match.group("version")), match.group("name"))
        found.append(Migration(version, opener))
    self.migrations = sorted(found, key=lambda m: m.version.version)
def _find_templates() -> Dict[str, str]:
    """
    Find all templates and return a map from short name to full name
    """
    lookup = OrderedDict()  # type: Dict[str, str]
    for filename in sorted(pkg_resources.resource_listdir("etl", "templates")):
        short_name, _ = os.path.splitext(filename)
        lookup[short_name] = os.path.join("templates", filename)
    return lookup
def parameters_from_yaml(name, input_key=None, expected_key=None):
    """Load test parameters from the YAML resource(s) named by *name*.

    *name* is '<package>.<resource>'; the resource may be a directory of
    .yml/.yaml files or a single file (either extension).  When both
    input_key and expected_key are given, each file yields one
    (expected, input) pair; otherwise the top-level mapping is unrolled into
    (key, [expected,] properties) tuples.  Raises RuntimeError when no YAML
    file can be found.
    """
    package_name, resource_name = name.split('.', 1)
    resources = []
    if resource_isdir(package_name, resource_name):
        resources.extend([resource_name + '/' + r
                          for r in resource_listdir(package_name, resource_name)
                          if r.endswith(('.yml', '.yaml'))])
    elif resource_exists(package_name, resource_name + '.yml'):
        resources.append(resource_name + '.yml')
    elif resource_exists(package_name, resource_name + '.yaml'):
        resources.append(resource_name + '.yaml')
    if not resources:
        raise RuntimeError('Not able to load any yaml file for {0}'.format(name))
    parameters = []
    for resource_name in resources:
        with resource_stream(package_name, resource_name) as stream:
            data = yaml.load(stream, Loader=serializer.YAMLLoader)
            if input_key and expected_key:
                # Simple mode: one (expected, input) pair per file.
                parameters.append((data[expected_key], data[input_key]))
                continue
            for root_key, root_value in data.items():
                if isinstance(root_value, Mapping):
                    # Nested mapping: emit (root, expected, properties)
                    # triples, unrolling list values one entry at a time.
                    for expected, data_input in root_value.items():
                        for properties in data_input if isinstance(data_input, (tuple, list)) else [data_input]:
                            parameters.append((root_key, expected, properties))
                else:
                    # Flat value: emit (root, properties) pairs.
                    for properties in root_value if isinstance(root_value, (tuple, list)) else [root_value]:
                        parameters.append((root_key, properties))
    return parameters
def copy_resource_dir(src, dest):
    """
    To copy package data directory to destination.

    Recursively mirrors the 'mocha' package resource *src* into *dest*,
    skipping already-existing files and .pyc sources.
    """
    package_name = "mocha"
    dest = (dest + "/" + os.path.basename(src)).rstrip("/")
    if pkg_resources.resource_isdir(package_name, src):
        if not os.path.isdir(dest):
            os.makedirs(dest)
        # BUG FIX: list resources from the same package that was tested
        # with resource_isdir (the original mixed package_name and
        # __name__, which breaks whenever this module is not itself part
        # of the 'mocha' package).
        for res in pkg_resources.resource_listdir(package_name, src):
            copy_resource_dir(src + "/" + res, dest)
    else:
        if not os.path.isfile(dest) and os.path.splitext(src)[1] not in [".pyc"]:
            copy_resource_file(src, dest)
def _init_plugins(self, plugindir, plugins_to_load=None):
    """Discover and import plugins, returning a dict of hook callbacks.

    plugindir       -- directory of plugin modules; falls back to the
                       bundled 'plugins' directory (or egg resources).
    plugins_to_load -- optional whitelist of plugin module names.

    Returns {hook_name: [callables]}, plus 'help'/'extendedhelp' maps built
    from plugin docstrings.  Raises InvalidPluginDir when no usable plugin
    location exists.
    """
    if plugindir and not os.path.isdir(plugindir):
        raise InvalidPluginDir(plugindir)
    if not plugindir:
        plugindir = DIR("plugins")
    logger.debug("plugindir: {0}".format(plugindir))
    if os.path.isdir(plugindir):
        # Skip private modules (leading underscore) via the glob pattern.
        pluginfiles = glob(os.path.join(plugindir, "[!_]*.py"))
        plugins = strip_extension(os.path.basename(p) for p in pluginfiles)
    else:
        # we might be in an egg; try to get the files that way
        logger.debug("trying pkg_resources")
        import pkg_resources
        try:
            plugins = strip_extension(
                pkg_resources.resource_listdir(__name__, "plugins"))
        except OSError:
            raise InvalidPluginDir(plugindir)
    hooks = {}
    # Temporarily put the plugin directory first on sys.path so plain
    # import_module(plugin) finds the plugin files; restored below.
    oldpath = copy.deepcopy(sys.path)
    sys.path.insert(0, plugindir)
    for plugin in plugins:
        if plugins_to_load and plugin not in plugins_to_load:
            logger.debug("skipping plugin {0}, not in plugins_to_load {1}".format(plugin, plugins_to_load))
            continue
        logger.debug("plugin: {0}".format(plugin))
        try:
            mod = importlib.import_module(plugin)
            modname = mod.__name__
            # Every module-level 'on_<name>' function becomes a hook.
            # NOTE(review): the pattern should be a raw string (r"on_(\w+)")
            # to avoid the invalid-escape warning on newer Pythons.
            for hook in re.findall("on_(\w+)", " ".join(dir(mod))):
                hookfun = getattr(mod, "on_" + hook)
                logger.debug("plugin: attaching %s hook for %s", hook, modname)
                hooks.setdefault(hook, []).append(hookfun)
            if mod.__doc__:
                # The docstring doubles as JSON help metadata.
                # firstline = mod.__doc__.split('\n')[0]
                part_attachment = json.loads(mod.__doc__)
                hooks.setdefault('help', {})[modname] = part_attachment
                hooks.setdefault('extendedhelp', {})[modname] = mod.__doc__
        # bare except, because the modules could raise any number of errors
        # on import, and we want them not to kill our server
        except:
            logger.warning("import failed on module {0}, module not loaded".format(plugin))
            logger.warning("{0}".format(sys.exc_info()[0]))
            logger.warning("{0}".format(traceback.format_exc()))
    sys.path = oldpath
    return hooks
def init_templates(config_file):
    """Interactively install bundled letter templates.

    Reads TEMPLATE_DIR from *config_file*, copies .tex/.md templates there
    and .lco/.sty style files into the user's texmf tree (asking before
    creating directories or overwriting files), then refreshes the TeX
    file database.
    """
    config = configparser.ConfigParser(interpolation=configparser.ExtendedInterpolation())
    config.read(config_file)
    template_dir = config.get('DEFAULT', 'TEMPLATE_DIR')
    texmf_dir = os.path.expanduser('~/texmf/tex/latex/fensterbrief/')
    # check if template directory exists
    if not os.path.exists(template_dir):
        answer = input("+ Shall directory %s be created? " % template_dir).lower()
        if 'y' in answer:
            os.makedirs(template_dir)
        else:
            return
    # create user's 'texmf' directory
    if not os.path.exists(texmf_dir):
        answer = input("+ Shall directory %s be created? " % texmf_dir).lower()
        if 'y' in answer:
            os.makedirs(texmf_dir)
        else:
            return
    # copy templates to template directory
    for res_name in resource_listdir('templates', ''):
        if res_name.endswith(".tex") or res_name.endswith(".md") or res_name.endswith(".lco") or res_name.endswith(".sty"):
            src_fd = resource_stream('templates', res_name)
            # .tex/.md go to the template dir; .lco/.sty belong in texmf.
            if res_name.endswith(".tex") or res_name.endswith(".md"):
                dst_file = os.path.join(template_dir, res_name)
            else:
                dst_file = os.path.join(texmf_dir, res_name)
            print("+ Copy resource file to %s" % dst_file)
            write_file = False
            if os.path.exists(dst_file):
                answer = input("+ Shall %s be overwritten? " % dst_file).lower()
                if 'y' in answer:
                    write_file = True
            else:
                write_file = True
            if write_file:
                with open(dst_file, 'wb') as dst_fd:
                    shutil.copyfileobj(src_fd, dst_fd)
    # update the TeX file-name database so new .lco/.sty files are found
    fensterbrief.run_program('texhash')
def _validate_config_files(self): """ Validates the configuration files necessary for the application. An exception is thrown if any of the required files are inaccessible. """ # Determine the module of the derived class mod = self.__class__.__module__ # If the configuration directory exists in the library, create config files as necessary # This check also provides backwards compatibility for projects that don't have the # configuration files in the library. if pkg_resources.resource_exists(mod, self.LIB_CONFIG_DIR): # Create configuration directory if not found if not os.access(self._config_dir, os.R_OK): logger.info("Configuration directory '{0}' not found, creating...".format(self._config_dir)) os.makedirs(self._config_dir) # Count of current configuration files config_files_count = len([name for name in os.listdir(self._config_dir) if os.path.isfile(os.path.join(self._config_dir, name))]) # Create configuration files if not found files = pkg_resources.resource_listdir(mod, self.LIB_APP_CONFIG_DIR) for f in files: config_path = os.path.join(self._config_dir, f) if not os.access(config_path, os.R_OK): f_lower = f.lower() # Copy configuration file. Only copy logging file if the directory was empty if not(f_lower.endswith(".py") or f_lower.endswith(".pyc")) and \ (f_lower != Application.LOGGING_CONFIG_FILE or config_files_count == 0): logger.info("Configuration file '{0}' not found, creating...".format(f)) shutil.copyfile(pkg_resources.resource_filename( mod, self.LIB_APP_CONFIG_DIR + "/" + f), config_path) if not os.access(self._dxlclient_config_path, os.R_OK): raise Exception( "Unable to access client configuration file: {0}".format( self._dxlclient_config_path)) if not os.access(self._app_config_path, os.R_OK): raise Exception( "Unable to access application configuration file: {0}".format( self._app_config_path))
def dbsource(dbname, var, resolution=None, tscale=None):
    """
    Temporary solution, just to move on with CoTeDe.

    Loads the bundled datasource JSON configs, resolves defaults for
    resolution/tscale, downloads the required data files and returns a
    list of Dataset_flex objects for *dbname*/*var*.
    """
    db_cfg = {}
    cfg_dir = 'datasource'
    for src_cfg in pkg_resources.resource_listdir('oceansdb', cfg_dir):
        text = pkg_resources.resource_string(
            'oceansdb', os.path.join(cfg_dir, src_cfg))
        text = text.decode('UTF-8', 'replace')
        cfg = json.loads(text)
        for c in cfg:
            # BUG FIX: the original assert message lacked the '% c'
            # interpolation, so it always printed a literal '%s'.
            assert c not in db_cfg, "Trying to overwrite %s" % c
            db_cfg[c] = cfg[c]
    dbpath = oceansdb_dir()
    datafiles = []
    cfg = db_cfg[dbname]
    if (resolution is None):
        resolution = cfg['vars'][var]['default_resolution']
    if (tscale is None):
        tscale = cfg['vars'][var][resolution]["default_tscale"]
    for c in cfg['vars'][var][resolution][tscale]:
        download_file(outputdir=dbpath, **c)
        if 'filename' in c:
            filename = os.path.join(dbpath, c['filename'])
        else:
            filename = os.path.join(
                dbpath, os.path.basename(urlparse(c['url']).path))
        if 'varnames' in cfg['vars'][var][resolution]:
            datafiles.append(Dataset_flex(
                filename, aliases=cfg['vars'][var][resolution]['varnames']))
        else:
            datafiles.append(Dataset_flex(filename))
    return datafiles
def copy_dir(source, dest, variables, out_=sys.stdout, i=0):
    """
    Copies the ``source`` directory to the ``dest`` directory, where
    ``source`` is some tuple representing an installed package and a
    subdirectory in the package, e.g.,

    ('pecan', os.path.join('scaffolds', 'base'))
    ('pecan_extension', os.path.join('scaffolds', 'scaffold_name'))

    ``variables``: A dictionary of variables to use in any substitutions.
    Substitution is performed via ``string.Template``.

    ``out_``: File object to write to (default is sys.stdout).

    ``i``: recursion depth, used only to indent the progress output.
    """
    def out(msg):
        # Indent two spaces per recursion level for readable progress.
        out_.write('%s%s' % (' ' * (i * 2), msg))
        out_.write('\n')
        out_.flush()
    names = sorted(pkg_resources.resource_listdir(source[0], source[1]))
    if not os.path.exists(dest):
        out('Creating %s' % dest)
        makedirs(dest)
    else:
        # An existing destination is left untouched entirely.
        out('%s already exists' % dest)
        return
    for name in names:
        full = '/'.join([source[1], name])
        dest_full = os.path.join(dest, substitute_filename(name, variables))
        sub_file = False
        # '*_tmpl' resources are templates: drop the suffix and substitute.
        if dest_full.endswith('_tmpl'):
            dest_full = dest_full[:-5]
            sub_file = True
        if pkg_resources.resource_isdir(source[0], full):
            out('Recursing into %s' % os.path.basename(full))
            copy_dir((source[0], full), dest_full, variables, out_, i + 1)
            continue
        else:
            content = pkg_resources.resource_string(source[0], full)
            if sub_file:
                content = render_template(content, variables)
                if content is None:
                    continue  # pragma: no cover
        out('Copying %s to %s' % (full, dest_full))
        f = open(dest_full, 'wb')
        f.write(content)
        f.close()
def run(cls, args):
    """Copy the bundled sample configuration files into the Hoaxy home.

    Creates the home directory (default or --home override) when missing,
    copies every non-dunder sample file there with sane permissions, and
    prints follow-up instructions for the user.
    """
    samples = """
conf.sample.yaml                    Hoaxy configuration file
domains_claim.sample.txt            Claim domains
domains_factchecking.sample.txt     Factchecking domains
site.sample.yaml                    Claim and/or factchecking sites
crontab.sample.txt                  Crontab sample
"""
    if args['--home'] is None:
        hoaxy_home = HOAXY_HOME
        msg = """Sample files are put into the default location: '{}'.
Please edit and rename sample files to make Hoaxy work with them.
{}"""
        msg = msg.format(hoaxy_home, samples)
    else:
        hoaxy_home = os.path.expanduser(args['--home'])
        if not hoaxy_home.endswith('/'):
            hoaxy_home += '/'
        msg = """Sample files are put into folder '{}'.
You need to set environment HOAXY_HOME={} to activate this path.
Also please edit and rename samples to make Hoaxy work with them.
{}"""
        msg = msg.format(hoaxy_home, hoaxy_home, samples)
    if not os.path.exists(hoaxy_home):
        try:
            org_umask = os.umask(0)
            # BUG FIX: use 0o755/0o644 — the original Python-2-only octal
            # literals (0755, 0644) are syntax errors on Python 3.
            os.makedirs(hoaxy_home, 0o755)
        finally:
            os.umask(org_umask)
    samples = resource_listdir('hoaxy.data', 'samples')
    for sample in samples:
        if not sample.startswith('__init__.'):
            sample = resource_filename('hoaxy.data.samples', sample)
            shutil.copy(sample, hoaxy_home)
            os.chmod(
                os.path.join(hoaxy_home, os.path.basename(sample)), 0o644)
    print(msg)