我们从Python开源项目中,提取了以下34个代码示例,用于说明如何使用oslo_utils.importutils.import_module()。
def _import_modules_from_package(): folder = os.path.dirname(os_faults.__file__) library_root = os.path.normpath(os.path.join(folder, os.pardir)) for root, dirs, files in os.walk(folder): for filename in files: if (filename.startswith('__') or filename.startswith('test') or not filename.endswith('.py')): continue relative_path = os.path.relpath(os.path.join(root, filename), library_root) name = os.path.splitext(relative_path)[0] # remove extension module_name = '.'.join(name.split(os.sep)) # convert / to . if module_name not in sys.modules: module = importutils.import_module(module_name) sys.modules[module_name] = module else: module = sys.modules[module_name] yield module
def make_plugin_base_section(plugin_group): elements = [] for item in plugin_group["items"]: name = item["name"].title() if "SLA" != item["name"] else item["name"] category_obj = category("%s %ss" % (plugin_group["group"].title(), name)) elements.append(category_obj) module, cls = item["base"].split(":") plugin_base = getattr(importutils.import_module(module), cls) for p in plugin_base.get_all(): category_obj.append(make_plugin_section(p, item["name"])) return elements
def __init__(self, host, **kwargs): """Create an NWFilter firewall driver :param host: nova.virt.libvirt.host.Host instance :param kwargs: currently unused """ global libvirt if libvirt is None: try: libvirt = importutils.import_module('libvirt') except ImportError: LOG.warning(_LW("Libvirt module could not be loaded. " "NWFilterFirewall will not work correctly.")) self._host = host self.static_filters_configured = False self.handle_security_groups = False
def __init__(self, virtapi, read_only=False): super(IronicDriver, self).__init__(virtapi) global ironic if ironic is None: ironic = importutils.import_module('ironicclient') # NOTE(deva): work around a lack of symbols in the current version. if not hasattr(ironic, 'exc'): ironic.exc = importutils.import_module('ironicclient.exc') if not hasattr(ironic, 'client'): ironic.client = importutils.import_module( 'ironicclient.client') self.firewall_driver = firewall.load_driver( default='nova.virt.firewall.NoopFirewallDriver') self.node_cache = {} self.node_cache_time = 0 ironicclient_log_level = CONF.ironic.client_log_level if ironicclient_log_level: level = py_logging.getLevelName(ironicclient_log_level) logger = py_logging.getLogger('ironicclient') logger.setLevel(level) self.ironicclient = client_wrapper.IronicClientWrapper()
def __init__(self): """Initialise the IronicClientWrapper for use. Initialise IronicClientWrapper by loading ironicclient dynamically so that ironicclient is not a dependency for Nova. """ global ironic if ironic is None: ironic = importutils.import_module('ironicclient') # NOTE(deva): work around a lack of symbols in the current version. if not hasattr(ironic, 'exc'): ironic.exc = importutils.import_module('ironicclient.exc') if not hasattr(ironic, 'client'): ironic.client = importutils.import_module( 'ironicclient.client') self._cached_client = None
def __init__(self, image, partition=None): """Create a new local VFS instance :param image: instance of nova.virt.image.model.Image :param partition: the partition number of access """ super(VFSGuestFS, self).__init__(image, partition) global guestfs if guestfs is None: try: guestfs = importutils.import_module('guestfs') except Exception as e: raise exception.NovaException( _("libguestfs is not installed (%s)") % e) self.handle = None self.mount = False
def try_append_module(name, modules): """ Append the module into specified module system """ if name not in modules: modules[name] = importutils.import_module(name)
def __init__(self, db_driver=None): super(Base, self).__init__() if not db_driver: db_driver = CONF.db_driver self.db = importutils.import_module(db_driver) # pylint: disable=C0103
def Client(version, *args, **kwargs): module = 'pankoclient.v%s.client' % version module = importutils.import_module(module) client_class = getattr(module, 'Client') return client_class(*args, **kwargs)
def import_versioned_module(version, submodule=None): module = 'bileanclient.v%s' % version if submodule: module = '.'.join((module, submodule)) return importutils.import_module(module)
def _get_classes_from_module(self, module_name): """Get the classes from a module that match the type we want.""" classes = [] module = importutils.import_module(module_name) for obj_name in dir(module): # Skip objects that are meant to be private. if obj_name.startswith('_'): continue itm = getattr(module, obj_name) if self._is_correct_class(itm): classes.append(itm) return classes
def test_all_public_methods_are_traced(self): profiler_opts.set_defaults(conf.CONF) self.config(enabled=True, group='profiler') classes = [ 'zun.compute.api.API', 'zun.compute.rpcapi.API', ] for clsname in classes: # give the metaclass and trace_cls() decorator a chance to patch # methods of the classes above six.reload_module( importutils.import_module(clsname.rsplit('.', 1)[0])) cls = importutils.import_class(clsname) for attr, obj in cls.__dict__.items(): # only public methods are traced if attr.startswith('_'): continue # only checks callables if not (inspect.ismethod(obj) or inspect.isfunction(obj)): continue # osprofiler skips static methods if isinstance(obj, staticmethod): continue self.assertTrue(getattr(obj, '__traced__', False), obj)
def test_import_module(self): dt = importutils.import_module('datetime') self.assertEqual(sys.modules['datetime'], dt)
def import_versioned_module(version, submodule=None): module = 'picassoclient.%s' % version if submodule: module = '.'.join((module, submodule)) return importutils.import_module(module)
def get_subcommand_parser(self, version, do_help=False, argv=None): parser = self.get_base_parser(argv) self.subcommands = {} subparsers = parser.add_subparsers(metavar='<subcommand>') actions_module = importutils.import_module( "harborclient.v%s.shell" % version.ver_major) self._find_actions(subparsers, actions_module, version, do_help) self._find_actions(subparsers, self, version, do_help) self._add_bash_completion_subparser(subparsers) return parser
def load_policy_modules(): """Load all modules that contain policies. Method iterates over modules of :py:mod:`monasca_events_api.policies` and imports only those that contain following methods: - list_rules """ for modname in _list_module_names(): mod = importutils.import_module(_BASE_MOD_PATH + modname) if hasattr(mod, 'list_rules'): yield mod
def load_conf_modules(): """Load all modules that contain configuration. Method iterates over modules of :py:mod:`monasca_events_api.conf` and imports only those that contain following methods: - list_opts (required by oslo_config.genconfig) - register_opts (required by :py:currentmodule:) """ imported_modules = [] for modname in _list_module_names(): mod = importutils.import_module('monasca_events_api.conf.' + modname) required_funcs = ['register_opts', 'list_opts'] for func in required_funcs: if not hasattr(mod, func): msg = ("The module 'monasca_events_api.conf.%s' should have a" " '%s' function which returns" " the config options." % (modname, func)) LOG.warning(msg) else: imported_modules.append(mod) LOG.debug('Found %d modules that contain configuration', len(imported_modules)) return imported_modules
def test__verify_driver(self): cfg.CONF.set_override('enabled_drivers', ['kuryr.lib.binding.drivers.veth'], group='binding') driver = importutils.import_module('kuryr.lib.binding.drivers.veth') binding._verify_driver(driver) # assert no exception raise driver = importutils.import_module('kuryr.lib.binding.drivers.vlan') self.assertRaises(exceptions.DriverNotEnabledException, binding._verify_driver, driver)
def import_versioned_module(version, submodule=None): module = 'heatclient.v%s' % version if submodule: module = '.'.join((module, submodule)) return importutils.import_module(module)
def import_versioned_module(version, submodule=None): module = 'monitoringclient.v%s' % version if submodule: module = '.'.join((module, submodule)) return importutils.import_module(module)
def get_subcommand_parser(self): parser = self.get_base_parser() self.subcommands = {} subparsers = parser.add_subparsers(metavar='<subcommand>') submodule = importutils.import_module('zaqar_demo') self._find_actions(subparsers, submodule) self._find_actions(subparsers, self) return parser
def import_zhmcclient(): """Lazy initialization for zhmcclient This function helps in lazy loading zhmclient. The zhmcclient can otherwise be set to fakezhmcclient for unittest framework """ LOG.debug("get_zhmcclient") requests.packages.urllib3.disable_warnings() global zhmcclient if zhmcclient is None: zhmcclient = importutils.import_module('zhmcclient') return zhmcclient
def import_versioned_module(version, submodule=None): module = 'glareclient.v%s' % version if submodule: module = '.'.join((module, submodule)) return importutils.import_module(module)
def __init__(self, db_driver=None): super(Base, self).__init__() if not db_driver: db_driver = CONF.db_driver self.db = importutils.import_module(db_driver)
def _extract_all_actions(): path = os.path.dirname(mistral_actions.__file__) base_len = len(path[:-len(mistral_actions.__name__)]) action_list = [] for root, dirs, files in os.walk(path): for f in files: if f.endswith('.py') and not f.startswith('_'): module_name = (root + '/' + f)[base_len:-len('.py')].replace( '/', '.') module = importutils.import_module(module_name) action_list.extend(_extract_actions_from_module(module)) return action_list
def get_subcommand_parser(self, do_help=False, argv=None): parser = self.get_base_parser(argv) self.subcommands = {} subparsers = parser.add_subparsers(metavar='<subcommand>') actions_module = importutils.import_module( "mistral_actions.client.shell") self._find_actions(subparsers, actions_module, do_help) self._find_actions(subparsers, self, do_help) self._add_bash_completion_subparser(subparsers) return parser
def import_versioned_module(version, submodule=None): module = 'masakariclient.v%s' % version if submodule: module = '.'.join((module, submodule)) return importutils.import_module(module)
def Client(version, *args, **kwargs): module = 'tricircleclient.v%s.client' % version module = importutils.import_module(module) client_class = getattr(module, 'Client') return client_class(*args, **kwargs)
def load_network_driver(network_driver=None): if not network_driver: network_driver = CONF.network_driver if not network_driver: LOG.error(_LE("Network driver option required, but not specified")) sys.exit(1) LOG.info(_LI("Loading network driver '%s'"), network_driver) return importutils.import_module(network_driver)
def __init__(self, uri, read_only=False, conn_event_handler=None, lifecycle_event_handler=None): global libvirt if libvirt is None: libvirt = importutils.import_module('libvirt') self._uri = uri self._read_only = read_only self._conn_event_handler = conn_event_handler self._lifecycle_event_handler = lifecycle_event_handler self._skip_list_all_domains = False self._caps = None self._hostname = None self._wrapped_conn = None self._wrapped_conn_lock = threading.Lock() self._event_queue = None self._events_delayed = {} # Note(toabctl): During a reboot of a domain, STOPPED and # STARTED events are sent. To prevent shutting # down the domain during a reboot, delay the # STOPPED lifecycle event some seconds. self._lifecycle_delay = 15
def __init__(self): global libosinfo try: if libosinfo is None: libosinfo = importutils.import_module( 'gi.repository.Libosinfo') except ImportError as exp: LOG.info(_LI("Cannot load Libosinfo: (%s)"), exp) else: self.loader = libosinfo.Loader() self.loader.process_default_path() self.db = self.loader.get_db() self.oslist = self.db.get_os_list()
def deserialize_remote_exception(data, allowed_remote_exmods): failure = jsonutils.loads(str(data)) trace = failure.get('tb', []) message = failure.get('message', "") + "\n" + "\n".join(trace) name = failure.get('class') module = failure.get('module') # NOTE(ameade): We DO NOT want to allow just any module to be imported, in # order to prevent arbitrary code execution. if module != 'exceptions' and module not in allowed_remote_exmods: return messaging.RemoteError(name, failure.get('message'), trace) try: mod = importutils.import_module(module) klass = getattr(mod, name) if not issubclass(klass, Exception): raise TypeError("Can only deserialize Exceptions") failure = klass(*failure.get('args', []), **failure.get('kwargs', {})) except (AttributeError, TypeError, ImportError): return messaging.RemoteError(name, failure.get('message'), trace) ex_type = type(failure) str_override = lambda self: message new_ex_type = type(ex_type.__name__ + _REMOTE_POSTFIX, (ex_type,), {'__str__': str_override, '__unicode__': str_override}) new_ex_type.__module__ = '%s%s' % (module, _REMOTE_POSTFIX) try: # NOTE(ameade): Dynamically create a new exception type and swap it in # as the new type for the exception. This only works on user defined # Exceptions and not core python exceptions. This is important because # we cannot necessarily change an exception message so we must override # the __str__ method. failure.__class__ = new_ex_type except TypeError: # NOTE(ameade): If a core exception then just add the traceback to the # first exception argument. failure.args = (message,) + failure.args[1:] return failure