我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用pkgutil.walk_packages()。
def sources():
    """Collect ``(module_name, source)`` pairs from the sub-directories in ``__all__``.

    Every entry of the module-level ``__all__`` is treated as a directory
    located next to this file. Each plain module found there (packages are
    skipped) is loaded and its ``source()`` callable is invoked.

    Returns:
        list: ``(module_name, module.source())`` tuples. An empty list is
        returned when the scan as a whole fails.
    """
    try:
        base_dir = os.path.dirname(__file__)
        sourceDict = []
        for i in __all__:
            for loader, module_name, is_pkg in pkgutil.walk_packages([os.path.join(base_dir, i)]):
                if is_pkg:
                    continue
                try:
                    module = loader.find_module(module_name).load_module(module_name)
                    sourceDict.append((module_name, module.source()))
                except Exception as e:
                    # One broken module must not prevent the others from loading.
                    log_utils.log('Could not load "%s": %s' % (module_name, e), log_utils.LOGDEBUG)
        return sourceDict
    except Exception:
        # Catch ``Exception`` rather than everything so that KeyboardInterrupt
        # and SystemExit still propagate to the caller.
        return []
def main():
    """Run nose with coverage over the test modules under ``girlfriend.testing``.

    Packages and any module whose dotted name is in ``excludes`` are left
    out of the argument list handed to ``nose.run``.
    """
    nose_args = [
        "",
        "--with-coverage",
        (
            "--cover-package="
            "girlfriend.workflow,"
            "girlfriend.data,"
            "girlfriend.util"
        ),
    ]
    discovered = pkgutil.walk_packages(
        path=girlfriend.testing.__path__, prefix="girlfriend.testing.")
    nose_args.extend(
        module_name
        for _, module_name, is_package in discovered
        if not is_package and module_name not in excludes
    )
    nose.run(argv=nose_args)
def get_package_libraries(pkg):
    """
    Yield the dotted name of each submodule of ``pkg`` (searched recursively)
    that defines a ``register`` attribute.

    Raises InvalidTemplateLibrary when a submodule cannot be imported.
    """
    prefix = pkg.__name__ + '.'
    for _, module_name, _ in walk_packages(pkg.__path__, prefix):
        try:
            candidate = import_module(module_name)
        except ImportError as e:
            raise InvalidTemplateLibrary(
                "Invalid template library specified. ImportError raised when "
                "trying to load '%s': %s" % (module_name, e)
            )
        if not hasattr(candidate, 'register'):
            continue
        yield module_name
def import_submodules(package, recursive=True):
    """
    Import all submodules of a package, recursively, including subpackages.

    From http://stackoverflow.com/questions/3365740/how-to-import-all-submodules

    :param package: package (name or actual module)
    :type package: str | module
    :param recursive: when true, subpackages are descended into as well
    :type recursive: bool
    :rtype: dict[str, types.ModuleType]
    """
    import importlib
    import pkgutil
    if isinstance(package, str):
        package = importlib.import_module(package)
    results = {}
    # iter_modules lists only the direct children of the package.
    # walk_packages without a prefix would try to import every subpackage by
    # its bare name and could end up walking an unrelated top-level package
    # that happens to share that name.
    for _finder, name, is_pkg in pkgutil.iter_modules(package.__path__):
        full_name = package.__name__ + '.' + name
        results[full_name] = importlib.import_module(full_name)
        if recursive and is_pkg:
            results.update(import_submodules(full_name))
    # The mapping must be returned: the recursive call above merges it.
    return results
def init_drivers(self):
    """
    Search for and initialize all controller drivers.
    See __init__.py in scc.drivers.

    Each enabled driver module that has an ``init`` callable is initialized
    with ``(self, cfg)``. When ``init`` returns a true value and the module
    also has ``start``, that callable is collected in ``self._to_start``.
    """
    log.debug("Initializing drivers...")
    cfg = Config()
    self._to_start = set()  # deleted later by start_drivers
    found = pkgutil.walk_packages(path=drivers.__path__, onerror=lambda x: None)
    for _, driver_name, is_package in found:
        if is_package or driver_name == "driver":
            continue
        # 'usb' driver has to be always active
        enabled = driver_name == "usb" or cfg["drivers"].get(driver_name)
        if not enabled:
            log.debug("Skipping disabled driver '%s'", driver_name)
            continue
        # __import__ returns the top-level ``scc`` package; drill down to the
        # driver module through its attributes.
        top_package = __import__('scc.drivers.%s' % (driver_name,))
        driver = getattr(top_package.drivers, driver_name)
        if not hasattr(driver, "init"):
            continue
        if driver.init(self, cfg) and hasattr(driver, "start"):
            self._to_start.add(driver.start)
def test_packages(self):
    """
    Tests if every package found under ``scc`` is listed in ``packages``
    of setup.py.
    """
    # NOTE(review): the gi versions are presumably pinned so that packages
    # imported during the walk load the expected bindings - confirm.
    try:
        import gi
        for namespace, version in (('Gtk', '3.0'), ('GdkX11', '3.0'), ('Rsvg', '2.0')):
            gi.require_version(namespace, version)
    except ImportError:
        pass

    from setup import packages
    walker = pkgutil.walk_packages(path=scc.__path__, prefix="scc.", onerror=lambda x: None)
    for _, modname, ispkg in walker:
        if not ispkg:
            continue
        assert modname in packages, "Package '%s' is not being installed by setup.py" % (modname,)
def get_plugins(cls):
    """
    Dynamically load every plugin found in the plugin folders and sort the
    result by the PRIORITY attribute, highest first. Plugins that define no
    PRIORITY are treated as priority 0.

    Returns a ``(plugins, exclude_plugins)`` tuple. ``exclude_plugins`` holds
    the loaded plugins whose name is in the third-party exclusion list.
    """
    logger = logging.getLogger(__name__)
    locations = [
        dingdangpath.PLUGIN_PATH,
        dingdangpath.CONTRIB_PATH,
        dingdangpath.CUSTOM_PATH
    ]
    quoted_locations = ["'%s'" % location for location in locations]
    logger.debug("Looking for plugins in: %s", ', '.join(quoted_locations))

    plugins = []
    exclude_plugins = []
    # plugins that are not allowed to be called via Wechat or Email
    thirdparty_exclude_plugins = ['NetEaseMusic']

    for finder, name, ispkg in pkgutil.walk_packages(locations):
        try:
            mod = finder.find_module(name).load_module(name)
        except Exception:
            logger.warning("Skipped plugin '%s' due to an error.", name,
                           exc_info=True)
            continue
        if not hasattr(mod, 'WORDS'):
            logger.warning("Skipped plugin '%s' because it misses " +
                           "the WORDS constant.", name)
            continue
        logger.debug("Found plugin '%s' with words: %r", name, mod.WORDS)
        plugins.append(mod)
        if name in thirdparty_exclude_plugins:
            exclude_plugins.append(mod)

    plugins.sort(key=lambda mod: getattr(mod, 'PRIORITY', 0), reverse=True)
    return (plugins, exclude_plugins)
def import_submodules(package, recursive=True):
    """
    Import all submodules of a package, recursively, including subpackages.

    Arguments:
    1. package   = (string) dotted name of the package, or the already
                   imported package module
    2. recursive = (bool) True  = load packages and modules from all
                   sub-packages as well.
                   (bool) False = load only the first level of packages and
                   modules, do not load modules from sub-packages

    Returns a dict mapping each fully qualified module name to its module.
    """
    if isinstance(package, str):
        package = importlib.import_module(package)
    results = {}
    # iter_modules lists only the direct children. walk_packages without a
    # prefix would import each subpackage by its bare name and could walk an
    # unrelated top-level package of the same name.
    for _finder, name, is_pkg in pkgutil.iter_modules(package.__path__):
        full_name = package.__name__ + '.' + name
        results[full_name] = importlib.import_module(full_name)
        if recursive and is_pkg:
            results.update(import_submodules(full_name))
    return results
def list_modules_in_package_path(self, package_path, prefix):
    """List every non-package module below ``package_path``.

    Args:
        package_path: Directory of the package to traverse.
        prefix: Dotted package name that corresponds to ``package_path``.

    Returns:
        A list of ``(module_name, dotted_name)`` tuples, where
        ``dotted_name`` is ``prefix`` plus the module's path inside the
        package.
    """
    modules = []
    path_to_walk = [(package_path, prefix)]
    while path_to_walk:
        curr_path, curr_prefix = path_to_walk.pop()
        # iter_modules only lists the directory. walk_packages would also try
        # to import each subpackage by its bare name and, when an unrelated
        # top-level package of that name exists, report its modules here.
        for _, name, ispkg in pkgutil.iter_modules(path=[curr_path]):
            dotted_name = '%s.%s' % (curr_prefix, name)
            if ispkg:
                # Subpackages are queued and traversed by this loop itself.
                path_to_walk.append(('%s/%s/' % (curr_path, name), dotted_name))
            else:
                modules.append((name, dotted_name))
    return modules
def list_modules_in_package_path(package_path, prefix):
    """List every non-package module below ``package_path``.

    Args:
        package_path: Directory of the package to traverse.
        prefix: Dotted package name that corresponds to ``package_path``.

    Returns:
        A list of ``(module_name, dotted_name)`` tuples, where
        ``dotted_name`` is ``prefix`` plus the module's path inside the
        package.
    """
    modules = []
    pending = [(package_path, prefix)]
    while pending:
        curr_path, curr_prefix = pending.pop()
        # iter_modules only lists the directory. walk_packages would also try
        # to import each subpackage by its bare name and, when an unrelated
        # top-level package of that name exists, report its modules here.
        for _, name, ispkg in pkgutil.iter_modules(path=[curr_path]):
            dotted_name = '%s.%s' % (curr_prefix, name)
            if ispkg:
                # Subpackages are queued and traversed by this loop itself.
                pending.append(('%s/%s/' % (curr_path, name), dotted_name))
            else:
                modules.append((name, dotted_name))
    return modules
def _load_from_package(self, pkg ): '''???python??????? :param pkg: Python? :type pkg: ModuleType :returns list - ???????? ''' tests = [] for _, modulename, ispkg in pkgutil.walk_packages(pkg.__path__, pkg.__name__+'.', onerror=self._walk_package_error): if ispkg: continue try: __import__(modulename) tests += self._load_from_module(sys.modules[modulename]) except: self._module_errs[modulename] = traceback.format_exc() return tests
def import_submodules(package, recursive=True):
    """ Import all submodules of a package, recursively, including subpackages

    :param package: package (name or actual module)
    :type package: str | module
    :param recursive: when true, subpackages are descended into as well
    :type recursive: bool
    :rtype: dict[str, types.ModuleType]
    """
    if isinstance(package, str):
        package = importlib.import_module(package)
    results = {}
    # Only the direct children are listed here; recursion is done explicitly
    # below. walk_packages without a prefix would import each subpackage by
    # its bare name and could walk an unrelated top-level package instead.
    for _finder, name, is_pkg in pkgutil.iter_modules(package.__path__):
        full_name = package.__name__ + '.' + name
        results[full_name] = importlib.import_module(full_name)
        if recursive and is_pkg:
            results.update(import_submodules(full_name))
    return results
def import_submodules(package, recursive=True):
    """Import all submodules of a package and symbolize their traits.

    Modules whose dotted name contains ``TEST_DIR`` are skipped. For every
    other module, each entry of its optional ``TRAITS`` attribute is passed
    to ``symbolize`` together with the module name. Nothing is returned.

    :param package: package (name or actual module)
    :type package: str | module
    :param recursive: also call this function on every subpackage found
    :type recursive: bool
    """
    if isinstance(package, str):
        package = importlib.import_module(package)
    prefix = package.__name__ + '.'
    for _, module_name, is_package in pkgutil.walk_packages(package.__path__, prefix):
        if TEST_DIR in module_name:
            continue
        module = importlib.import_module(module_name)
        for trait in getattr(module, "TRAITS", []):
            symbolize(module_name, trait)
        if is_package and recursive:
            import_submodules(module_name)


# This is where the names defined in submodules are imported
def import_submodules(package, recursive=True):
    """Import and reload all submodules of a package, including subpackages

    Arguments:
    * `package` -- package (name or actual module); ``None`` yields ``{}``

    Keyword arguments:
    * `recursive` -- import modules recursively

    Returns a dict mapping each fully qualified module name to its freshly
    reloaded module.
    """
    if package is None:
        return {}
    if isinstance(package, str):
        package = importlib.import_module(package)
    importlib.reload(package)
    results = {}
    # Only the direct children are listed here; recursion is done explicitly
    # below. walk_packages without a prefix would import each subpackage by
    # its bare name and could walk an unrelated top-level package instead.
    for _finder, name, is_pkg in pkgutil.iter_modules(package.__path__):
        full_name = package.__name__ + '.' + name
        results[full_name] = importlib.import_module(full_name)
        importlib.reload(results[full_name])
        if recursive and is_pkg:
            results.update(import_submodules(full_name))
    return results
def import_submodules(context: dict, root_module: str, path: str):
    """
    Import every submodule found on ``path`` and copy its public names into
    the ``context`` namespace.

    A module's public names are those in its ``__all__``; without one, every
    attribute that does not start with an underscore. Each imported module is
    finally stored in ``context`` under its dotted name.

    >>> import_submodules(locals(), __name__, __path__)
    """
    imported = {}
    for _, dotted_name, _ in pkgutil.walk_packages(path, root_module + '.'):
        # Loading through loader.find_module(...).load_module(...) causes a
        # Runtime error with model conflicts, hence the regular import.
        submodule = __import__(dotted_name, globals(), locals(), ['__name__'])
        exported = getattr(submodule, '__all__', None)
        if exported is None:
            exported = [attr for attr in vars(submodule) if not attr.startswith('_')]
        for attr in exported:
            context[attr] = getattr(submodule, attr, None)
        imported[dotted_name] = submodule

    # maintain existing module namespace import with priority
    context.update(imported)
def _compile_modules(self):
    """Map the absolute file path of each discoverable module to its name.

    ``pkgutil.walk_packages()`` is called without arguments. Modules for
    which no filename can be determined are left out.

    Returns:
        dict: ``{absolute_file_path: module_name}``.
    """
    modules = {}
    for module_finder, name, ispkg in pkgutil.walk_packages():
        # Two strategies per module: first ask the loader found by the finder
        # for the filename, then fall back to get_module_file_attribute().
        for attempt in range(2):
            # AttributeError / ImportError abort only the current attempt;
            # ``break`` is reached only once ``filename`` has been assigned.
            with suppress((AttributeError, ImportError)):
                if attempt == 0:
                    loader = module_finder.find_spec(name).loader
                    filename = loader.get_filename(name)
                elif attempt == 1:
                    filename = get_module_file_attribute(name)
                break
        else:
            # Neither attempt produced a filename: skip this module.
            continue
        modules[os.path.abspath(filename)] = name
    return modules
def _load_all_functions(self):
    """Populate ``self.all_functions`` once from the function modules.

    Does nothing when ``self.all_functions`` is already set. Otherwise every
    non-package module whose name starts with ``hq.hquery.functions.`` is
    loaded, and the names listed in its ``exports`` attribute are registered
    with trailing underscores stripped.

    Raises RuntimeError when an export is neither a class nor a function.
    """
    if self.all_functions is not None:
        return

    self.all_functions = {}
    my_package = '{0}/__init__.py'.format(os.path.dirname(__file__))
    verbose_print('FunctionSupport loading all function modules near {0}.'.format(my_package), indent_after=True)

    # NOTE(review): walk_packages documents its first argument as a list of
    # paths, while a plain string is passed here; recent Python 3 releases
    # appear to reject that - confirm against the supported interpreter.
    for importer, modname, ispkg in walk_packages(my_package):
        if ispkg or not modname.startswith('hq.hquery.functions.'):
            continue
        verbose_print('Found candidate module {0} -- loading.'.format(modname))
        module = importer.find_module(modname).load_module(modname)
        if not hasattr(module, 'exports'):
            verbose_print('Module {0} defined no exports.'.format(modname))
            continue
        exports = {}
        for name in getattr(module, 'exports'):
            exports[name.rstrip('_')] = getattr(module, name)
        verbose_print('Module {0} exports are: {1}'.format(modname, exports.keys()))
        if any(not (isclass(obj) or isfunction(obj)) for obj in exports.values()):
            raise RuntimeError('Non-class/function export(s) loaded from module {0}'.format(modname))
        self.all_functions.update(exports)

    verbose_print('Finished loading function modules.', outdent_before=True)
def load_notifiers():
    """Build a registry of notifiers from the modules located beside this file.

    A module that defines ``register`` is handed the registry and fills it
    itself. Otherwise the module is stored under its ``NAME`` attribute
    (falling back to the last component of its module name), using
    ``notify_factory`` when present or else ``notify`` wrapped by
    ``dummy_notify_factory``.

    Returns the registry dict.
    """
    package_dir = os.path.dirname(os.path.abspath(__file__))
    # Everything up to and including the last dot of this module's name.
    parent, dot, _ = __name__.rpartition('.')
    registry = {}
    for _, modname, _ in pkgutil.walk_packages([package_dir], parent + dot):
        submodule = importlib.import_module(modname, __name__)
        if hasattr(submodule, 'register'):
            submodule.register(registry)
            continue
        key = getattr(submodule, 'NAME', modname.split('.')[-1])
        if hasattr(submodule, 'notify_factory'):
            registry[key] = submodule.notify_factory
        elif hasattr(submodule, 'notify'):
            registry[key] = dummy_notify_factory(submodule.notify)
    return registry
def get_routes(package):
    """Collect the routed ``BaseAPI`` subclasses found under ``package``.

    Every non-package module below ``package`` is imported. Each public
    module attribute that is a class derived from ``BaseAPI`` and has a true
    ``route`` attribute is added to the result.
    """
    routes = []
    walker = pkgutil.walk_packages(
        path=package.__path__,
        prefix=package.__name__ + '.',
        onerror=lambda x: None)
    for _, modname, ispkg in walker:
        if ispkg:
            continue
        module = import_module(modname)
        routes.extend(
            candidate
            for attr_name, candidate in vars(module).items()
            if not attr_name.startswith("_")
            and isinstance(candidate, six.class_types)
            and issubclass(candidate, BaseAPI)
            and getattr(candidate, "route", False)
        )
    return routes


# monkeypatch to force application json on raised exceptions
def get_spiders_classes(entry_point_name=None):
    """Return a ``{spider_name: spider_class}`` dict for a package tree.

    ``entry_point_name`` is the dotted name of the package to scan; it
    defaults to the top-level package of this module. Only objects accepted
    by ``is_spider`` and defined in the scanned module itself are recorded.

    Raises Exception when two spiders share the same ``name``.
    """
    discovered = {}
    root_name = entry_point_name or __name__.split(".")[0]

    def scan_package(package_name):
        package = inspect.importlib.import_module(package_name)
        for _, child_name, is_package in pkgutil.walk_packages(path=package.__path__):
            child = inspect.importlib.import_module(package.__name__ + "." + child_name)
            if is_package:
                scan_package(child.__name__)
                continue
            for _, obj in inspect.getmembers(child):
                if not (is_spider(obj) and child.__name__ == obj.__module__):
                    continue
                try:
                    sn = getattr(obj, "name", None)
                    if sn in discovered:
                        raise Exception("Duplicate spider '{}': {} and {}".format(sn, obj, discovered[sn]))
                    discovered[sn] = obj
                except AttributeError:
                    pass

    scan_package(root_name)
    return discovered
def get_reports():
    """
    Compile a list of all reports available across all modules in the reports path.

    Returns a list of ``(module_name, report_list)`` tuples, where
    ``report_list`` holds one instance of each member of the module that
    satisfies ``is_report``:
    [
        (module_name, [report, report, report, ...]),
        (module_name, [report, report, report, ...]),
        ...
    ]
    """
    def instantiate_reports(module_name):
        # Modules are imported from the ``reports`` package by name.
        module = importlib.import_module('reports.{}'.format(module_name))
        return [report_cls() for _, report_cls in inspect.getmembers(module, is_report)]

    # Iterate through all modules within the reports path. These are the
    # user-created files in which reports are defined.
    return [
        (module_name, instantiate_reports(module_name))
        for _, module_name, _ in pkgutil.walk_packages([settings.REPORTS_ROOT])
    ]
def _get_module(module_name):
    """Import a top-level module together with all of its sub-modules.

    Args:
        module_name: A string object. It represents the name of the
            top-level module.

    Returns:
        A ModuleType object: the top-level module, after its sub-modules
        have been imported. An ImportError or ImportWarning raised while
        importing sub-modules is logged at debug level and stops the walk.
    """
    top_level_module = importlib.import_module(module_name)
    prefix = modulename(top_level_module) + "."
    try:
        walker = pkgutil.walk_packages(path=top_level_module.__path__,
                                       prefix=prefix)
        for _, submodule_name, _ in walker:
            importlib.import_module(submodule_name, top_level_module)
    except (ImportError, ImportWarning) as e:
        LOG.debug(str(e))
    return top_level_module
def get_all_classes_of_type(to_find=None, path="."):
    """
    Get all classes currently imported by the Python environment found in the given path.

    :param to_find: The class to find all instances of.
    :param path: The import path to walk.
    :return: A list of all classes that are subclasses of to_find as found in the given path.
    """
    # Compare by value: ``is`` tests object identity, which only matches a
    # string literal by accident of interning.
    package_prefix = None if path == "." else path.replace("/", ".")
    to_return = []
    for _, name, _ in pkgutil.walk_packages(path=[path]):
        if package_prefix is None:
            current_package_name = name
        else:
            current_package_name = ".".join([package_prefix, name])
        to_return.extend(IntrospectionHelper.get_classes_from_module_name(
            module_name=current_package_name,
            parent_class=to_find,
        ))
    return to_return
def list_cls_under_mod(mod, cls, uq_attr):
    """ Index the subclasses of ``cls`` found under an imported module.

    Parameters:
        mod: imported module
        cls: Base class
        uq_attr: name of the class attribute that holds a unique value
                 for each class

    Return:
        A dict mapping each non-None ``uq_attr`` value to the class that
        carries it.
    """
    found = {}
    walker = pkgutil.walk_packages(
        getattr(mod, '__path__', None), prefix=mod.__name__ + '.')
    for _, module_name, ispkg in walker:
        if ispkg is not False:
            continue
        module = importlib.import_module(module_name)
        for _, candidate in inspect.getmembers(module, inspect.isclass):
            if not issubclass(candidate, cls):
                continue
            unique_value = getattr(candidate, uq_attr)
            if unique_value is not None:
                found[unique_value] = candidate
    return found
def fetch_list_datasets():
    """Get all datasets into a dictionary.

    Only the packages found under ``datasets`` are considered, and only
    those for which ``get_dataset_attributes`` returns a true value.

    Returns
    -------
    dict
        A dictionary where keys are names of datasets (the last component of
        the package name) and values are whatever ``get_dataset_attributes``
        returned for that package.
    """
    db_list = {}
    walker = pkgutil.walk_packages(path=datasets.__path__,
                                   prefix=datasets.__name__ + '.',
                                   onerror=lambda x: None)
    for _, modname, ispkg in walker:
        if not ispkg:
            continue
        db = get_dataset_attributes(modname)
        if db:
            db_list[modname.split('.')[-1]] = db
    return db_list
def writedocs(dir, pkgpath='', done=None):
    """Write out HTML documentation for all modules in a directory tree.

    ``dir`` is the directory to walk and ``pkgpath`` the prefix put in front
    of every module name found. ``done`` is accepted but not consulted here.
    """
    done = {} if done is None else done
    discovered = pkgutil.walk_packages([dir], pkgpath)
    for _, modname, _ in discovered:
        writedoc(modname)
def run(self, callback, key=None, completer=None, onerror=None):
    """Scan built-in and importable modules, reporting them to ``callback``.

    ``callback`` is called as ``callback(path, modname, desc)``. Without a
    ``key`` every module is reported with an empty description. With a
    ``key``, only modules whose lowercased ``"<modname> - <desc>"`` text
    contains the lowercased key are reported.

    ``completer``, when given, is called once after the scan. ``onerror`` is
    forwarded to ``pkgutil.walk_packages``. Setting ``self.quit`` to a true
    value stops the walk over importable modules.

    NOTE(review): ``lower``, ``split`` and ``find`` are presumably the
    Python 2 ``string`` module functions, and ``StringIO`` is the Python 2
    module - confirm against the file's imports.
    """
    if key: key = lower(key)
    self.quit = False
    seen = {}

    # Built-in modules have no file, so they are reported with path None.
    for modname in sys.builtin_module_names:
        if modname != '__main__':
            seen[modname] = 1
            if key is None:
                callback(None, modname, '')
            else:
                # The description is the first line of the module docstring.
                desc = split(__import__(modname).__doc__ or '', '\n')[0]
                if find(lower(modname + ' - ' + desc), key) >= 0:
                    callback(None, modname, desc)

    for importer, modname, ispkg in pkgutil.walk_packages(onerror=onerror):
        if self.quit:
            break
        if key is None:
            callback(None, modname, '')
        else:
            loader = importer.find_module(modname)
            if hasattr(loader,'get_source'):
                # Read the synopsis from the source text, so the module does
                # not have to be loaded.
                import StringIO
                desc = source_synopsis(
                    StringIO.StringIO(loader.get_source(modname))
                ) or ''
                if hasattr(loader,'get_filename'):
                    path = loader.get_filename(modname)
                else:
                    path = None
            else:
                # No source available: load the module and use its docstring.
                module = loader.load_module(modname)
                desc = (module.__doc__ or '').splitlines()[0]
                path = getattr(module,'__file__',None)
            if find(lower(modname + ' - ' + desc), key) >= 0:
                callback(path, modname, desc)

    if completer:
        completer()
def import_submodules(name, submodules=None):
    """Import all submodules for a package/module name.

    ``name`` is put at the front of ``sys.path``. The given ``submodules``
    are imported as ``name.submodule``; when none are given, the names found
    by walking ``[name]`` are imported instead.
    """
    sys.path.insert(0, name)
    if submodules:
        targets = submodules
    else:
        targets = (item[1] for item in pkgutil.walk_packages([name]))
    for target in targets:
        import_string('{0}.{1}'.format(name, target))
def find_packages(path):
    """Return the names of the packages that walk_packages reports for ``path``."""
    # This method returns packages and subpackages as well.
    package_names = []
    for _, candidate, is_package in walk_packages([path]):
        if is_package:
            package_names.append(candidate)
    return package_names
def load_classes(self, package, suffix):
    """Load one class per module of ``package`` whose name ends with ``suffix``.

    The class looked up in each module is named ``to_camel(module_name)``.
    Every class found gets ``attr_name`` (from ``self.make_name``) and
    ``plugin_path`` attributes assigned.

    Returns a dict mapping ``attr_name`` to the class.
    """
    loaded = {}
    package_dir = list(import_module(package).__path__)[0]
    for finder, module_name, _ in pkgutil.walk_packages([package_dir]):
        if not module_name.endswith(suffix):
            continue
        try:
            module = finder.find_module(module_name).load_module(module_name)
        except BaseException:
            # NOTE(review): if ``log`` is a standard logging.Logger, the extra
            # positional arguments here and in the warning below do not match
            # the message format - confirm what ``log`` is.
            log.exception('Loading', module_name)
            continue
        class_name = to_camel(module_name)
        plugin_cls = getattr(module, class_name, '')
        if not isclass(plugin_cls):
            log.warning('Could not find', class_name, 'in', module_name)
            continue
        # Read ``plugin_path`` through object.__getattribute__ so that any
        # attribute hooks defined on the class are bypassed.
        try:
            parent_path = object.__getattribute__(self, 'plugin_path') + '.'
        except AttributeError:
            parent_path = ''
        plugin_cls.attr_name = self.make_name(plugin_cls)
        plugin_cls.plugin_path = parent_path + plugin_cls.attr_name
        loaded[plugin_cls.attr_name] = plugin_cls
    return loaded
def import_submodules(cls, entity):
    """Import packages and/or modules.

    ``entity`` is the dotted name of a module or package. It is imported,
    and when it is a package, each direct submodule whose name does not
    start with an underscore is imported as well. Plain modules (which have
    no ``__path__``) have nothing more to import.
    """
    entity = importlib.import_module(entity)
    search_path = getattr(entity, '__path__', None)
    if search_path is None:
        # Not a package. Checking this up front, instead of wrapping the
        # whole loop in ``except AttributeError``, keeps an AttributeError
        # raised inside a submodule from being silently swallowed.
        return
    # iter_modules lists only direct children; walk_packages without a
    # prefix would additionally try to import subpackages by their bare name.
    for _, name, _ in pkgutil.iter_modules(search_path):
        if not name.startswith('_'):
            importlib.import_module(entity.__name__ + '.' + name)
def makeParserLoadSubmodules(self, parser):
    """Let every sub-package of ``mongodb_consistent_backup`` extend ``parser``.

    Each sub-package is resolved by attribute access starting from its
    top-level package, and its ``config(parser)`` result becomes the new
    parser. Sub-packages for which this raises AttributeError are skipped.

    Returns the resulting parser.
    """
    walker = walk_packages(
        path=mongodb_consistent_backup.__path__,
        prefix=mongodb_consistent_backup.__name__ + '.')
    for _, package_name, is_package in walker:
        if not is_package:
            continue
        try:
            parts = package_name.split('.')
            target = __import__(parts[0])
            for attribute in parts[1:]:
                target = getattr(target, attribute)
            parser = target.config(parser)
        except AttributeError:
            continue
    return parser
def get_all_modules(path, level=""):
    """Return the dotted names of all modules and packages found below ``path``.

    ``level`` is the dotted name put in front of every entry found directly
    in ``path``; packages are descended into recursively. When ``level``
    equals the module-level ``package_name``, the base name of ``path`` is
    appended as the last entry.
    """
    collected = []
    for _, entry, is_package in walk_packages([path]):
        dotted = level + "." + entry if level else entry
        collected.append(dotted)
        if is_package:
            collected.extend(get_all_modules(os.path.join(path, entry), dotted))
    if level == package_name:
        collected.append(os.path.basename(path))
    return collected
def list_modules_in_package_path(package_path, prefix):
    """Get the list of all modules in required package_path recursively

    Args:
        package_path: The fully qualified linux path of the python package
            to traverse recursively
        prefix: The root python package name for the plugins

    Returns:
        List of ``(module_name, dotted_name)`` tuples for the non-package
        modules in the given package path, where ``dotted_name`` is
        ``prefix`` plus the module's path inside the package.
    """
    modules = []
    path_to_walk = [(package_path, prefix)]
    while path_to_walk:
        curr_path, curr_prefix = path_to_walk.pop()
        # iter_modules only lists the directory. walk_packages would also try
        # to import each subpackage by its bare name and, when an unrelated
        # top-level package of that name exists, report its modules here.
        for _, name, ispkg in pkgutil.iter_modules(path=[curr_path]):
            dotted_name = '%s.%s' % (curr_prefix, name)
            if ispkg:
                # Subpackages are queued and traversed by this loop itself.
                path_to_walk.append(('%s/%s/' % (curr_path, name), dotted_name))
            else:
                modules.append((name, dotted_name))
    return modules
def build_awg_translator_map():
    """Group the driver modules by the sequence file extension they report.

    Returns a dict mapping each value returned by a module's
    ``get_seq_file_extension()`` to the list of modules that returned it.
    """
    translators_map = {}
    # Collect all names first, then import them.
    translator_names = [entry[1] for entry in pkgutil.walk_packages(drivers.__path__)]
    for translator_name in translator_names:
        module = import_module('QGL.drivers.' + translator_name)
        extension = module.get_seq_file_extension()
        translators_map.setdefault(extension, []).append(module)
    return translators_map


# static translator map