Python oslo_utils.importutils 模块,import_module() 实例源码

我们从Python开源项目中,提取了以下34个代码示例,用于说明如何使用oslo_utils.importutils.import_module()

项目:os-faults    作者:openstack    | 项目源码 | 文件源码
def _import_modules_from_package():
    folder = os.path.dirname(os_faults.__file__)
    library_root = os.path.normpath(os.path.join(folder, os.pardir))

    for root, dirs, files in os.walk(folder):
        for filename in files:
            if (filename.startswith('__') or
                    filename.startswith('test') or
                    not filename.endswith('.py')):
                continue

            relative_path = os.path.relpath(os.path.join(root, filename),
                                            library_root)
            name = os.path.splitext(relative_path)[0]  # remove extension
            module_name = '.'.join(name.split(os.sep))  # convert / to .

            if module_name not in sys.modules:
                module = importutils.import_module(module_name)
                sys.modules[module_name] = module
            else:
                module = sys.modules[module_name]

            yield module
项目:ovn-scale-test    作者:openvswitch    | 项目源码 | 文件源码
def make_plugin_base_section(plugin_group):
    elements = []

    for item in plugin_group["items"]:
        name = item["name"].title() if "SLA" != item["name"] else item["name"]
        category_obj = category("%s %ss" % (plugin_group["group"].title(),
                                            name))
        elements.append(category_obj)

        module, cls = item["base"].split(":")
        plugin_base = getattr(importutils.import_module(module), cls)

        for p in plugin_base.get_all():
            category_obj.append(make_plugin_section(p, item["name"]))

    return elements
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def __init__(self, host, **kwargs):
        """Create an NWFilter firewall driver

        :param host: nova.virt.libvirt.host.Host instance
        :param kwargs: currently unused
        """

        global libvirt
        if libvirt is None:
            try:
                libvirt = importutils.import_module('libvirt')
            except ImportError:
                LOG.warning(_LW("Libvirt module could not be loaded. "
                             "NWFilterFirewall will not work correctly."))
        self._host = host
        self.static_filters_configured = False
        self.handle_security_groups = False
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def __init__(self, virtapi, read_only=False):
        super(IronicDriver, self).__init__(virtapi)
        global ironic
        if ironic is None:
            ironic = importutils.import_module('ironicclient')
            # NOTE(deva): work around a lack of symbols in the current version.
            if not hasattr(ironic, 'exc'):
                ironic.exc = importutils.import_module('ironicclient.exc')
            if not hasattr(ironic, 'client'):
                ironic.client = importutils.import_module(
                                                    'ironicclient.client')

        self.firewall_driver = firewall.load_driver(
            default='nova.virt.firewall.NoopFirewallDriver')
        self.node_cache = {}
        self.node_cache_time = 0

        ironicclient_log_level = CONF.ironic.client_log_level
        if ironicclient_log_level:
            level = py_logging.getLevelName(ironicclient_log_level)
            logger = py_logging.getLogger('ironicclient')
            logger.setLevel(level)

        self.ironicclient = client_wrapper.IronicClientWrapper()
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def __init__(self):
        """Initialise the IronicClientWrapper for use.

        Initialise IronicClientWrapper by loading ironicclient
        dynamically so that ironicclient is not a dependency for
        Nova.
        """
        global ironic
        if ironic is None:
            ironic = importutils.import_module('ironicclient')
            # NOTE(deva): work around a lack of symbols in the current version.
            if not hasattr(ironic, 'exc'):
                ironic.exc = importutils.import_module('ironicclient.exc')
            if not hasattr(ironic, 'client'):
                ironic.client = importutils.import_module(
                                                    'ironicclient.client')
        self._cached_client = None
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def __init__(self, image, partition=None):
        """Create a new local VFS instance

        :param image: instance of nova.virt.image.model.Image
        :param partition: the partition number of access
        """

        super(VFSGuestFS, self).__init__(image, partition)

        global guestfs
        if guestfs is None:
            try:
                guestfs = importutils.import_module('guestfs')
            except Exception as e:
                raise exception.NovaException(
                    _("libguestfs is not installed (%s)") % e)

        self.handle = None
        self.mount = False
项目:functest    作者:opnfv    | 项目源码 | 文件源码
def try_append_module(name, modules):
    """ Append the module into specified module system """

    if name not in modules:
        modules[name] = importutils.import_module(name)
项目:meteos    作者:openstack    | 项目源码 | 文件源码
def __init__(self, db_driver=None):
        super(Base, self).__init__()
        if not db_driver:
            db_driver = CONF.db_driver
        self.db = importutils.import_module(db_driver)  # pylint: disable=C0103
项目:python-pankoclient    作者:openstack    | 项目源码 | 文件源码
def Client(version, *args, **kwargs):
    module = 'pankoclient.v%s.client' % version
    module = importutils.import_module(module)
    client_class = getattr(module, 'Client')
    return client_class(*args, **kwargs)
项目:python-bileanclient    作者:openstack    | 项目源码 | 文件源码
def import_versioned_module(version, submodule=None):
    module = 'bileanclient.v%s' % version
    if submodule:
        module = '.'.join((module, submodule))
    return importutils.import_module(module)
项目:zun    作者:openstack    | 项目源码 | 文件源码
def _get_classes_from_module(self, module_name):
        """Get the classes from a module that match the type we want."""
        classes = []
        module = importutils.import_module(module_name)
        for obj_name in dir(module):
            # Skip objects that are meant to be private.
            if obj_name.startswith('_'):
                continue
            itm = getattr(module, obj_name)
            if self._is_correct_class(itm):
                classes.append(itm)
        return classes
项目:zun    作者:openstack    | 项目源码 | 文件源码
def test_all_public_methods_are_traced(self):
        profiler_opts.set_defaults(conf.CONF)
        self.config(enabled=True,
                    group='profiler')

        classes = [
            'zun.compute.api.API',
            'zun.compute.rpcapi.API',
        ]
        for clsname in classes:
            # give the metaclass and trace_cls() decorator a chance to patch
            # methods of the classes above
            six.reload_module(
                importutils.import_module(clsname.rsplit('.', 1)[0]))
            cls = importutils.import_class(clsname)

            for attr, obj in cls.__dict__.items():
                # only public methods are traced
                if attr.startswith('_'):
                    continue
                # only checks callables
                if not (inspect.ismethod(obj) or inspect.isfunction(obj)):
                    continue
                # osprofiler skips static methods
                if isinstance(obj, staticmethod):
                    continue

                self.assertTrue(getattr(obj, '__traced__', False), obj)
项目:deb-oslo.utils    作者:openstack    | 项目源码 | 文件源码
def test_import_module(self):
        dt = importutils.import_module('datetime')
        self.assertEqual(sys.modules['datetime'], dt)
项目:python-picassoclient    作者:openstack    | 项目源码 | 文件源码
def import_versioned_module(version, submodule=None):
    module = 'picassoclient.%s' % version
    if submodule:
        module = '.'.join((module, submodule))
    return importutils.import_module(module)
项目:python-harborclient    作者:int32bit    | 项目源码 | 文件源码
def get_subcommand_parser(self, version, do_help=False, argv=None):
        parser = self.get_base_parser(argv)

        self.subcommands = {}
        subparsers = parser.add_subparsers(metavar='<subcommand>')

        actions_module = importutils.import_module(
            "harborclient.v%s.shell" % version.ver_major)

        self._find_actions(subparsers, actions_module, version, do_help)
        self._find_actions(subparsers, self, version, do_help)
        self._add_bash_completion_subparser(subparsers)

        return parser
项目:monasca-events-api    作者:openstack    | 项目源码 | 文件源码
def load_policy_modules():
    """Load all modules that contain policies.

    Method iterates over modules of :py:mod:`monasca_events_api.policies`
    and imports only those that contain following methods:

    - list_rules

    """
    for modname in _list_module_names():
        mod = importutils.import_module(_BASE_MOD_PATH + modname)
        if hasattr(mod, 'list_rules'):
            yield mod
项目:monasca-events-api    作者:openstack    | 项目源码 | 文件源码
def load_conf_modules():
    """Load all modules that contain configuration.

    Method iterates over modules of :py:mod:`monasca_events_api.conf`
    and imports only those that contain following methods:

    - list_opts (required by oslo_config.genconfig)
    - register_opts (required by :py:currentmodule:)

    """
    imported_modules = []
    for modname in _list_module_names():
        mod = importutils.import_module('monasca_events_api.conf.' + modname)
        required_funcs = ['register_opts', 'list_opts']
        for func in required_funcs:
            if not hasattr(mod, func):
                msg = ("The module 'monasca_events_api.conf.%s' should have a"
                       " '%s' function which returns"
                       " the config options."
                       % (modname, func))
                LOG.warning(msg)
            else:
                imported_modules.append(mod)

    LOG.debug('Found %d modules that contain configuration',
              len(imported_modules))

    return imported_modules
项目:kuryr    作者:axbaretto    | 项目源码 | 文件源码
def test__verify_driver(self):
        cfg.CONF.set_override('enabled_drivers',
                              ['kuryr.lib.binding.drivers.veth'],
                              group='binding')
        driver = importutils.import_module('kuryr.lib.binding.drivers.veth')
        binding._verify_driver(driver)  # assert no exception raise
        driver = importutils.import_module('kuryr.lib.binding.drivers.vlan')
        self.assertRaises(exceptions.DriverNotEnabledException,
                          binding._verify_driver, driver)
项目:eclcli    作者:nttcom    | 项目源码 | 文件源码
def import_versioned_module(version, submodule=None):
    module = 'heatclient.v%s' % version
    if submodule:
        module = '.'.join((module, submodule))
    return importutils.import_module(module)
项目:eclcli    作者:nttcom    | 项目源码 | 文件源码
def import_versioned_module(version, submodule=None):
    module = 'monitoringclient.v%s' % version

    if submodule:
        module = '.'.join((module, submodule))
    return importutils.import_module(module)
项目:zaqar-demo    作者:openstacker    | 项目源码 | 文件源码
def get_subcommand_parser(self):
        parser = self.get_base_parser()
        self.subcommands = {}
        subparsers = parser.add_subparsers(metavar='<subcommand>')
        submodule = importutils.import_module('zaqar_demo')
        self._find_actions(subparsers, submodule)
        self._find_actions(subparsers, self)
        return parser
项目:nova-dpm    作者:openstack    | 项目源码 | 文件源码
def import_zhmcclient():
    """Lazy initialization for zhmcclient

    This function helps in lazy loading zhmclient. The zhmcclient can
    otherwise be set to fakezhmcclient for unittest framework
    """

    LOG.debug("get_zhmcclient")
    requests.packages.urllib3.disable_warnings()

    global zhmcclient
    if zhmcclient is None:
        zhmcclient = importutils.import_module('zhmcclient')

    return zhmcclient
项目:python-glareclient    作者:openstack    | 项目源码 | 文件源码
def import_versioned_module(version, submodule=None):
    module = 'glareclient.v%s' % version
    if submodule:
        module = '.'.join((module, submodule))
    return importutils.import_module(module)
项目:ranger-agent    作者:openstack    | 项目源码 | 文件源码
def __init__(self, db_driver=None):
        super(Base, self).__init__()
        if not db_driver:
            db_driver = CONF.db_driver
        self.db = importutils.import_module(db_driver)
项目:mistral-actions    作者:int32bit    | 项目源码 | 文件源码
def _extract_all_actions():
    path = os.path.dirname(mistral_actions.__file__)
    base_len = len(path[:-len(mistral_actions.__name__)])
    action_list = []
    for root, dirs, files in os.walk(path):
        for f in files:
            if f.endswith('.py') and not f.startswith('_'):
                module_name = (root + '/' + f)[base_len:-len('.py')].replace(
                    '/', '.')
                module = importutils.import_module(module_name)
                action_list.extend(_extract_actions_from_module(module))
    return action_list
项目:mistral-actions    作者:int32bit    | 项目源码 | 文件源码
def get_subcommand_parser(self, do_help=False, argv=None):
        parser = self.get_base_parser(argv)

        self.subcommands = {}
        subparsers = parser.add_subparsers(metavar='<subcommand>')

        actions_module = importutils.import_module(
            "mistral_actions.client.shell")

        self._find_actions(subparsers, actions_module, do_help)
        self._find_actions(subparsers, self, do_help)
        self._add_bash_completion_subparser(subparsers)

        return parser
项目:python-masakariclient    作者:openstack    | 项目源码 | 文件源码
def import_versioned_module(version, submodule=None):
    module = 'masakariclient.v%s' % version
    if submodule:
        module = '.'.join((module, submodule))
    return importutils.import_module(module)
项目:python-tricircleclient    作者:openstack    | 项目源码 | 文件源码
def Client(version, *args, **kwargs):
    module = 'tricircleclient.v%s.client' % version
    module = importutils.import_module(module)
    client_class = getattr(module, 'Client')
    return client_class(*args, **kwargs)
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def load_network_driver(network_driver=None):
    if not network_driver:
        network_driver = CONF.network_driver

    if not network_driver:
        LOG.error(_LE("Network driver option required, but not specified"))
        sys.exit(1)

    LOG.info(_LI("Loading network driver '%s'"), network_driver)

    return importutils.import_module(network_driver)
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def _get_classes_from_module(self, module_name):
        """Get the classes from a module that match the type we want."""
        classes = []
        module = importutils.import_module(module_name)
        for obj_name in dir(module):
            # Skip objects that are meant to be private.
            if obj_name.startswith('_'):
                continue
            itm = getattr(module, obj_name)
            if self._is_correct_class(itm):
                classes.append(itm)
        return classes
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def __init__(self, db_driver=None):
        super(Base, self).__init__()
        if not db_driver:
            db_driver = CONF.db_driver
        self.db = importutils.import_module(db_driver)
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def __init__(self, uri, read_only=False,
                 conn_event_handler=None,
                 lifecycle_event_handler=None):

        global libvirt
        if libvirt is None:
            libvirt = importutils.import_module('libvirt')

        self._uri = uri
        self._read_only = read_only
        self._conn_event_handler = conn_event_handler
        self._lifecycle_event_handler = lifecycle_event_handler
        self._skip_list_all_domains = False
        self._caps = None
        self._hostname = None

        self._wrapped_conn = None
        self._wrapped_conn_lock = threading.Lock()
        self._event_queue = None

        self._events_delayed = {}
        # Note(toabctl): During a reboot of a domain, STOPPED and
        #                STARTED events are sent. To prevent shutting
        #                down the domain during a reboot, delay the
        #                STOPPED lifecycle event some seconds.
        self._lifecycle_delay = 15
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def __init__(self):

        global libosinfo
        try:
            if libosinfo is None:
                libosinfo = importutils.import_module(
                                             'gi.repository.Libosinfo')
        except ImportError as exp:
            LOG.info(_LI("Cannot load Libosinfo: (%s)"), exp)
        else:
            self.loader = libosinfo.Loader()
            self.loader.process_default_path()

            self.db = self.loader.get_db()
            self.oslist = self.db.get_os_list()
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def deserialize_remote_exception(data, allowed_remote_exmods):
    failure = jsonutils.loads(str(data))

    trace = failure.get('tb', [])
    message = failure.get('message', "") + "\n" + "\n".join(trace)
    name = failure.get('class')
    module = failure.get('module')

    # NOTE(ameade): We DO NOT want to allow just any module to be imported, in
    # order to prevent arbitrary code execution.
    if module != 'exceptions' and module not in allowed_remote_exmods:
        return messaging.RemoteError(name, failure.get('message'), trace)

    try:
        mod = importutils.import_module(module)
        klass = getattr(mod, name)
        if not issubclass(klass, Exception):
            raise TypeError("Can only deserialize Exceptions")

        failure = klass(*failure.get('args', []), **failure.get('kwargs', {}))
    except (AttributeError, TypeError, ImportError):
        return messaging.RemoteError(name, failure.get('message'), trace)

    ex_type = type(failure)
    str_override = lambda self: message
    new_ex_type = type(ex_type.__name__ + _REMOTE_POSTFIX, (ex_type,),
                       {'__str__': str_override, '__unicode__': str_override})
    new_ex_type.__module__ = '%s%s' % (module, _REMOTE_POSTFIX)
    try:
        # NOTE(ameade): Dynamically create a new exception type and swap it in
        # as the new type for the exception. This only works on user defined
        # Exceptions and not core python exceptions. This is important because
        # we cannot necessarily change an exception message so we must override
        # the __str__ method.
        failure.__class__ = new_ex_type
    except TypeError:
        # NOTE(ameade): If a core exception then just add the traceback to the
        # first exception argument.
        failure.args = (message,) + failure.args[1:]
    return failure