我们从 Python 开源项目中，提取了以下 10 个代码示例，用于说明如何使用 pkg_resources.resource_isdir()。
def _get_translation_directories(self, enabled_plugins):
    """Return the translation directories that should be loaded.

    The result always starts with the main ``'translations'`` directory.
    For every entry point in the ``wazo_admin_ui.plugins`` group whose name
    is in *enabled_plugins*, the filesystem path of its
    ``TRANSLATION_DIRECTORY`` resource is appended, provided that resource
    is a directory inside the plugin's module.
    """
    directories = ['translations']
    for entry_point in iter_entry_points(group='wazo_admin_ui.plugins'):
        if entry_point.name not in enabled_plugins:
            continue
        module = entry_point.module_name
        # Plugins without a translation directory are silently skipped.
        if not resource_isdir(module, TRANSLATION_DIRECTORY):
            continue
        directories.append(resource_filename(module, TRANSLATION_DIRECTORY))
    return directories
def resource_walk(package_or_requirement, resource_name):
    """Generate the file names in a resource tree.

    Parameters
    ----------
    package_or_requirement : str or Requirement
        Package or requirement that contains the resource.
    resource_name : str
        Name of the resource.

    Yields
    ------
    tuple
        For each directory in the tree rooted at the given resource a
        3-tuple ``(dirpath, dirnames, filenames)`` is yielded. *dirpath* is
        a string, the path to the directory starting with *resource_name*.
        *dirnames* is a list of the names of subdirectories in *dirpath*.
        *filenames* is a list of names of non-directory files in *dirpath*.
    """
    # Resource names are always '/'-separated regardless of the platform,
    # so they must not be built with ``os.path.join`` (which would insert
    # backslashes on Windows).
    import posixpath

    pending = [resource_name]
    while pending:
        # Popping from the end makes this a depth-first traversal.
        dirpath = pending.pop()
        dirnames = []
        filenames = []
        for name in resource_listdir(package_or_requirement, dirpath):
            fullpath = posixpath.join(dirpath, name)
            if resource_isdir(package_or_requirement, fullpath):
                dirnames.append(name)
                pending.append(fullpath)
            else:
                filenames.append(name)
        yield dirpath, dirnames, filenames
def _makeReference( output_path, type_suffix ):
    """Concatenate the per-locus FASTA references into one output file.

    Every directory resource listed under ``_REF_DIR`` is treated as a
    locus. For each one the file ``<locus>_<type_suffix>.fasta`` is looked
    up below ``_REF_PATH/<locus>`` and read with ``_readFasta``; all records
    are then passed to ``_writeFasta`` together with *output_path*.

    Returns ``True`` on success and raises ``MissingReferenceException`` as
    soon as an expected FASTA file does not exist.
    """
    records = []
    for locus in pkg.resource_listdir(_REF_DIR, ''):
        # Plain files at the top level are not loci.
        if not pkg.resource_isdir(_REF_DIR, locus):
            continue
        fasta_name = "{0}_{1}.fasta".format(locus, type_suffix)
        fasta_path = op.join(_REF_PATH, locus, fasta_name)
        if not op.exists(fasta_path):
            raise MissingReferenceException('Missing expected reference file "{0}" for Locus "{1}"'.format(fasta_name, locus))
        records += _readFasta(fasta_path)
    _writeFasta(output_path, records)
    return True
def makeExonReference():
    """Build the locus -> exon-map-path table and write it to ``_EXON_REF``.

    Every directory resource listed under ``_REF_DIR`` is treated as a
    locus whose exon map is expected at
    ``_REF_PATH/<locus>/<locus>_exons.map``. If that file does not exist,
    ``_make_exon_map`` is asked to produce it. The collected mapping is
    handed to ``_writeMap``.

    Returns ``True`` on success and raises ``MissingReferenceException``
    when a map neither exists nor could be generated.
    """
    exon_maps = {}
    for locus in pkg.resource_listdir(_REF_DIR, ''):
        if not pkg.resource_isdir(_REF_DIR, locus):
            continue
        map_name = "{0}_exons.map".format(locus)
        map_path = op.join(_REF_PATH, locus, map_name)
        # Short-circuit: generation is only attempted when the file is missing.
        if not (op.exists(map_path) or _make_exon_map(map_path, locus)):
            raise MissingReferenceException('Missing expected reference file "{0}" for Locus "{1}"'.format(map_name, locus))
        exon_maps[locus] = map_path
    _writeMap(_EXON_REF, exon_maps)
    return True
def recursive_copy(origin, destiny):
    """Copy the resource *origin* of this module to the path *destiny*.

    A non-directory resource is written to *destiny* in binary mode using
    the bytes returned by ``resource(origin)``. A directory resource causes
    *destiny* to be created when it does not exist, after which every entry
    of the directory is copied recursively.
    """
    if not resource_isdir(__name__, origin):
        # Leaf: dump the resource content into the destination file.
        with open(destiny, "wb") as target:
            target.write(resource(origin))
        return

    if not exists(destiny):
        os.makedirs(destiny)
    for child in resource_listdir(__name__, origin):
        recursive_copy(join(origin, child), join(destiny, child))
def get_languages_supported_by_all(cls, root_egg):
    """Return the languages for which every in-use domain has a translation.

    The domains in use are the names of the interfaces returned by
    ``cls.get_all_relevant_interfaces(root_egg)``. Each entry point in the
    ``reahl.translations`` group is scanned: every sub-directory of its
    internal translations path (except names starting with ``'__'``) is
    taken as a language, and every ``*.mo`` file in that language's
    ``LC_MESSAGES`` directory names a domain translated into it.

    The result is the intersection of the language sets of all in-use
    domains that have at least one translation, plus ``'en_gb'``. When there
    are no relevant interfaces, or no translations were found at all, only
    ``['en_gb']`` is returned.
    """
    fallback_languages = ['en_gb']
    interfaces = cls.get_all_relevant_interfaces(root_egg)
    if not interfaces:
        return fallback_languages

    active_domains = [interface.name for interface in interfaces]

    languages_by_domain = {}
    for entry_point in iter_entry_points('reahl.translations'):
        requirement = entry_point.dist.as_requirement()
        base_path = cls.get_egg_internal_path_for(entry_point)

        available = []
        if resource_isdir(requirement, base_path):
            for candidate in resource_listdir(requirement, base_path):
                if resource_isdir(requirement, '%s/%s' % (base_path, candidate)) and not candidate.startswith('__'):
                    available.append(candidate)
        else:
            logging.error('Translations of %s not found in %s' % (requirement, base_path))

        for language in available:
            messages_path = '%s/%s/LC_MESSAGES' % (base_path, language)
            for file_name in resource_listdir(requirement, messages_path):
                if not file_name.endswith('.mo'):
                    continue
                domain = file_name[:-3]  # strip the '.mo' extension
                if domain in active_domains:
                    languages_by_domain.setdefault(domain, set()).add(language)

    per_domain = list(languages_by_domain.values())
    if not per_domain:
        return fallback_languages

    # Only languages present for every translated domain survive.
    common = per_domain[0].intersection(*per_domain)
    common.update(fallback_languages)
    return list(common)
def _install(package, src_dir, dst_dir, params, prefix_len=None, rec=None):
    """Interpolate source directory into target directory with params.

    Walks the resource directory *src_dir* of *package* recursively. Each
    sub-directory is created below *dst_dir* with ``fs.mkdir_safe``; each
    file is read, decoded as UTF-8, passed through ``_render`` together with
    *params* and handed to ``_update`` along with its destination path.
    Resources ending in ``.swp`` are skipped.

    :param package: module object; its ``__name__`` identifies the package
        that owns the resources.
    :param src_dir: '/'-separated resource directory to install from.
    :param dst_dir: filesystem directory to install into.
    :param params: values forwarded to ``_render``.
    :param prefix_len: number of leading characters stripped from a resource
        path to obtain its path relative to *dst_dir*. Defaults to the
        length of the top-level *src_dir* plus the separator.
    :param rec: optional object with a ``write`` method; when truthy, every
        created path is written to it, one per line (directories get a
        trailing ``/``).
    """
    package_name = package.__name__
    contents = pkg_resources.resource_listdir(package_name, src_dir)

    if prefix_len is None:
        prefix_len = len(src_dir) + 1

    for item in contents:
        # Resource names are always '/'-separated, independent of platform.
        resource_path = '/'.join([src_dir, item])
        dst_path = os.path.join(dst_dir, resource_path[prefix_len:])

        if pkg_resources.resource_isdir(package_name, resource_path):
            fs.mkdir_safe(dst_path)
            if rec:
                rec.write('%s/\n' % dst_path)
            # Recurse with the '/'-joined resource name; os.path.join would
            # yield a backslash-separated (invalid) resource name on Windows.
            _install(package, resource_path, dst_dir, params,
                     prefix_len=prefix_len, rec=rec)
            continue

        if resource_path.endswith('.swp'):
            continue

        _LOGGER.info('Render: %s => %s', resource_path, dst_path)
        resource_str = pkg_resources.resource_string(package_name,
                                                     resource_path)
        if rec:
            rec.write('%s\n' % dst_path)
        _update(dst_path, _render(resource_str.decode('utf-8'), params))
def parameters_from_yaml(name, input_key=None, expected_key=None):
    """Collect test parameters from YAML resources.

    *name* has the form ``'<package>.<resource>'`` and is split at the first
    dot. When the resource is a directory, every ``.yml``/``.yaml`` file
    directly inside it is loaded; otherwise ``<resource>.yml`` is used if it
    exists, else ``<resource>.yaml``.

    When both *input_key* and *expected_key* are given, each document
    contributes one tuple ``(data[expected_key], data[input_key])``.
    Otherwise every top-level key contributes tuples: for a mapping value,
    ``(root_key, expected, properties)`` for each item, and for any other
    value ``(root_key, properties)``. A tuple or list in the *properties*
    position is expanded into one tuple per element.

    Raises ``RuntimeError`` when no YAML resource can be found.
    """
    def as_items(value):
        # Scalars are treated as a one-element sequence.
        return value if isinstance(value, (tuple, list)) else [value]

    package_name, resource_name = name.split('.', 1)

    if resource_isdir(package_name, resource_name):
        resources = [resource_name + '/' + entry
                     for entry in resource_listdir(package_name, resource_name)
                     if entry.endswith(('.yml', '.yaml'))]
    else:
        resources = []
        for extension in ('.yml', '.yaml'):
            if resource_exists(package_name, resource_name + extension):
                resources.append(resource_name + extension)
                break

    if not resources:
        raise RuntimeError('Not able to load any yaml file for {0}'.format(name))

    parameters = []
    for yaml_resource in resources:
        with resource_stream(package_name, yaml_resource) as stream:
            data = yaml.load(stream, Loader=serializer.YAMLLoader)

        if input_key and expected_key:
            parameters.append((data[expected_key], data[input_key]))
            continue

        for root_key, root_value in data.items():
            if isinstance(root_value, Mapping):
                parameters.extend(
                    (root_key, expected, properties)
                    for expected, data_input in root_value.items()
                    for properties in as_items(data_input))
            else:
                parameters.extend(
                    (root_key, properties)
                    for properties in as_items(root_value))
    return parameters
def copy_resource_dir(src, dest):
    """Copy a package data directory of ``mocha`` to a destination.

    The target path is *dest* joined with the base name of *src*. When *src*
    is a resource directory, the target directory is created if needed and
    every entry is copied into it recursively. Otherwise *src* is copied
    with ``copy_resource_file`` unless the target file already exists or
    *src* has a ``.pyc`` extension.

    :param src: '/'-separated resource name inside the ``mocha`` package.
    :param dest: filesystem directory that receives the copy.
    """
    package_name = "mocha"
    dest = (dest + "/" + os.path.basename(src)).rstrip("/")

    if pkg_resources.resource_isdir(package_name, src):
        if not os.path.isdir(dest):
            os.makedirs(dest)
        # List the directory from the same package that was just checked
        # with resource_isdir, rather than from whichever module this
        # function happens to live in.
        for res in pkg_resources.resource_listdir(package_name, src):
            copy_resource_dir(src + "/" + res, dest)
    elif not os.path.isfile(dest) and os.path.splitext(src)[1] != ".pyc":
        # Existing files are never overwritten; compiled bytecode is skipped.
        copy_resource_file(src, dest)
def copy_dir(source, dest, variables, out_=sys.stdout, i=0):
    """
    Copies the ``source`` directory to the ``dest`` directory, where
    ``source`` is some tuple representing an installed package and a
    subdirectory in the package, e.g.,

    ('pecan', os.path.join('scaffolds', 'base'))
    ('pecan_extension', os.path.join('scaffolds', 'scaffold_name'))

    ``variables``: A dictionary of variables to use in any substitutions.
    Substitution is performed via ``string.Template``.

    ``out_``: File object to write to (default is sys.stdout).

    ``i``: Current nesting depth; each level indents progress messages by
    two spaces.

    If ``dest`` already exists, a message is written and nothing is copied.
    File names are passed through ``substitute_filename``; a destination
    name ending in ``_tmpl`` has that suffix removed and its content passed
    through ``render_template`` before being written.
    """
    def out(msg):
        out_.write('%s%s' % (' ' * (i * 2), msg))
        out_.write('\n')
        out_.flush()

    names = sorted(pkg_resources.resource_listdir(source[0], source[1]))
    if os.path.exists(dest):
        out('%s already exists' % dest)
        return
    out('Creating %s' % dest)
    makedirs(dest)

    for name in names:
        # Resource names are '/'-separated; destination paths are native.
        full = '/'.join([source[1], name])
        dest_full = os.path.join(dest, substitute_filename(name, variables))

        sub_file = False
        if dest_full.endswith('_tmpl'):
            dest_full = dest_full[:-5]  # drop the '_tmpl' suffix
            sub_file = True

        if pkg_resources.resource_isdir(source[0], full):
            out('Recursing into %s' % os.path.basename(full))
            copy_dir((source[0], full), dest_full, variables, out_, i + 1)
            continue

        content = pkg_resources.resource_string(source[0], full)
        if sub_file:
            content = render_template(content, variables)
            if content is None:
                continue  # pragma: no cover

        out('Copying %s to %s' % (full, dest_full))
        # The context manager closes the file even if the write fails.
        with open(dest_full, 'wb') as f:
            f.write(content)