Python oslo_utils.importutils 模块,import_object() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用oslo_utils.importutils.import_object()

项目:kuryr-libnetwork    作者:openstack    | 项目源码 | 文件源码
def get_driver_instance():
    """Build the port driver named by the parsed driver configuration.

    The module and class name come from ``_parse_port_driver_config()``.
    The instantiated driver is passed through both ``_verify_*`` helpers
    before being returned.

    :returns: a Driver instance
    :raises: exceptions.KuryrException
    """
    module, name, classname = _parse_port_driver_config()
    driver_path = "{0}.{1}".format(module, classname)

    # TODO(apuimedo): switch to the openstack/stevedore plugin system
    try:
        port_driver = importutils.import_object(driver_path)
    except ImportError as import_error:
        message = "Cannot load port driver '{0}': {1}".format(
            module, import_error)
        raise exceptions.KuryrException(message)

    _verify_port_driver_compliancy(port_driver, name)
    _verify_binding_driver_compatibility(port_driver, name)

    return port_driver
项目:freezer-dr    作者:openstack    | 项目源码 | 文件源码
def fence(self, nodes=None):
        """Load the configured fencing driver and run it against the nodes.

        :param nodes: optional replacement for ``self.nodes``; ignored when
            empty or ``None``
        :return: the result of the driver's ``fence()`` call (per the
            original docstring: the list of nodes and whether each one was
            shut down or failed)
        """
        # update the list of nodes if required!
        if nodes:
            self.nodes = nodes
        driver_name = self.fencer_conf['driver']
        driver = importutils.import_object(
            driver_name,
            self.nodes,
            self.fencer_conf
        )
        # Hand the values to the logger as arguments so the message is only
        # rendered when debug logging is actually enabled.
        LOG.debug('Loaded fencing driver %s with config: %s',
                  driver.get_info(), self.fencer_conf)

        return driver.fence()
项目:mogan    作者:openstack    | 项目源码 | 文件源码
def load_engine_driver(engine_driver):
    """Import and instantiate a baremetal engine driver.

    The given name is resolved below the ``mogan.baremetal`` package. The
    process exits with status 1 when no name is given or when the import
    fails.

    :param engine_driver: engine driver name, relative to
        ``mogan.baremetal``
    :returns: the result of ``utils.check_isinstance`` applied to the
        loaded driver and ``BaseEngineDriver``
    """
    if not engine_driver:
        LOG.error("Engine driver option required, but not specified")
        sys.exit(1)

    LOG.info("Loading engine driver '%s'", engine_driver)
    try:
        driver_path = 'mogan.baremetal.%s' % engine_driver
        engine = importutils.import_object(driver_path)
        return utils.check_isinstance(engine, BaseEngineDriver)
    except ImportError:
        LOG.exception("Unable to load the baremetal driver")
        sys.exit(1)
项目:networking-h3c    作者:openstack    | 项目源码 | 文件源码
def __init__(self):
        """Set up the router scheduler, REST client and controller options.

        The scheduler is imported from ``router_scheduler_driver`` before
        the parent initializer runs. The L3 VXLAN driver is always created
        but only initialized when ``enable_l3_vxlan`` is exactly ``True``.
        """
        scheduler_path = cfg.CONF.router_scheduler_driver
        self.router_scheduler = importutils.import_object(scheduler_path)

        super(H3CL3RouterPlugin, self).__init__()
        self.vds_id = None
        self.client = rest_client.RestClient()

        # All of the following options are read from the VCFCONTROLLER group.
        controller_conf = cfg.CONF.VCFCONTROLLER
        self.enable_metadata = controller_conf.enable_metadata
        self.router_binding_public_vrf = (
            controller_conf.router_binding_public_vrf)
        self.disable_internal_l3flow_offload = (
            controller_conf.disable_internal_l3flow_offload)
        self.enable_l3_router_rpc_notify = (
            controller_conf.enable_l3_router_rpc_notify)
        self.vendor_rpc_topic = controller_conf.vendor_rpc_topic
        self.setup_rpc()

        self.enable_l3_vxlan = cfg.CONF.VCFCONTROLLER.enable_l3_vxlan
        self.h3c_l3_vxlan = h3c_l3_vxlan_db.H3CL3VxlanDriver()
        if self.enable_l3_vxlan is True:
            self.h3c_l3_vxlan.initialize()
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def delete(self, fixed_range=None, uuid=None):
        """Deletes a network.

        At least one of ``fixed_range`` and ``uuid`` must be given. When the
        configured network manager path contains ``NeutronManager``, ``uuid``
        is mandatory and ``fixed_range`` must not be set.
        """
        if fixed_range is None and uuid is None:
            raise Exception(_("Please specify either fixed_range or uuid"))

        manager = importutils.import_object(CONF.network_manager)
        uses_neutron = "NeutronManager" in CONF.network_manager
        if uses_neutron and uuid is None:
            raise Exception(_("UUID is required to delete "
                              "Neutron Networks"))
        if uses_neutron and fixed_range:
            raise Exception(_("Deleting by fixed_range is not supported "
                            "with the NeutronManager"))

        # delete the network
        admin_context = context.get_admin_context()
        manager.delete_network(admin_context, fixed_range, uuid)
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def __init__(self):
        """Load the scheduler host manager and create the servicegroup API.

        ``CONF.scheduler_host_manager`` is first looked up as a stevedore
        entry point in the ``nova.scheduler.host_manager`` namespace. If that
        raises ``RuntimeError`` the value is treated as a full class path and
        loaded with ``importutils`` (deprecated).

        :raises RuntimeError: when neither loading style succeeds
        """
        try:
            self.host_manager = driver.DriverManager(
                    "nova.scheduler.host_manager",
                    CONF.scheduler_host_manager,
                    invoke_on_load=True).driver
        # TODO(Yingxin): Change to catch stevedore.exceptions.NoMatches
        # after stevedore v1.9.0
        except RuntimeError:
            # NOTE(Yingxin): Loading full class path is deprecated and
            # should be removed in the N release.
            try:
                self.host_manager = importutils.import_object(
                    CONF.scheduler_host_manager)
                LOG.warning(_LW("DEPRECATED: scheduler_host_manager uses "
                                "classloader to load %(path)s. This legacy "
                                "loading style will be removed in the "
                                "N release."),
                            {'path': CONF.scheduler_host_manager})
            except (ImportError, ValueError):
                # Interpolate explicitly: exceptions do not apply %-style
                # arguments the way logger calls do, so passing the dict as
                # a second argument would leave "%(conf)s" unrendered.
                raise RuntimeError(
                        _("Cannot load host manager from configuration "
                          "scheduler_host_manager = %(conf)s.") %
                        {'conf': CONF.scheduler_host_manager})
        self.servicegroup_api = servicegroup.API()
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def __init__(self, network_driver=None, *args, **kwargs):
        """Wire up the drivers, DNS managers and API clients for networking.

        :param network_driver: passed to ``driver.load_network_driver``
        :param kwargs: may carry ``l3_lib`` to override ``CONF.l3_lib``; all
            keyword arguments are also forwarded to the parent initializer
        """
        self.driver = driver.load_network_driver(network_driver)

        instance_dns_path = CONF.instance_dns_manager
        self.instance_dns_manager = importutils.import_object(
            instance_dns_path)
        self.instance_dns_domain = CONF.instance_dns_domain

        floating_dns_path = CONF.floating_ip_dns_manager
        self.floating_dns_manager = importutils.import_object(
            floating_dns_path)

        self.network_api = network_api.API()
        self.network_rpcapi = network_rpcapi.NetworkAPI()
        security_group_driver = (
            openstack_driver.get_openstack_security_group_driver())
        self.security_group_api = security_group_driver

        self.servicegroup_api = servicegroup.API()

        # An explicit ``l3_lib`` keyword argument wins over the config value.
        l3_driver_path = kwargs.get("l3_lib", CONF.l3_lib)
        self.l3driver = importutils.import_object(l3_driver_path)

        self.quotas_cls = objects.Quotas

        super(NetworkManager, self).__init__(service_name='network',
                                             *args, **kwargs)
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def __init__(self, session, virtapi):
        """Create the helpers and drivers used for XenAPI VM operations.

        :param session: XenAPI session, also handed to the volume ops,
            firewall driver and VIF driver
        :param virtapi: stored as ``self._virtapi``
        """
        self.compute_api = compute.API()
        self._session = session
        self._virtapi = virtapi
        self._volumeops = volumeops.VolumeOps(self._session)
        self.firewall_driver = firewall.load_driver(
            DEFAULT_FIREWALL_DRIVER,
            xenapi_session=self._session)

        # The VIF driver is imported as a class because it needs the
        # session as a constructor argument.
        vif_driver_cls = importutils.import_class(CONF.xenserver.vif_driver)
        self.vif_driver = vif_driver_cls(xenapi_session=self._session)
        self.default_root_dev = '/dev/sda'

        LOG.debug("Importing image upload handler: %s",
                  CONF.xenserver.image_upload_handler)
        upload_handler_path = CONF.xenserver.image_upload_handler
        self.image_upload_handler = importutils.import_object(
            upload_handler_path)
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def setUp(self):
        """Patch in the fake LDAP module and seed the driver with domains.

        After swapping ``nova.network.ldapdns.ldap`` for ``fake_ldap``, an
        ``LdapDNS`` driver is loaded, a root entry is added and the two test
        domains are created.
        """
        super(LdapDNSTestCase, self).setUp()

        ldap_patch = fixtures.MonkeyPatch('nova.network.ldapdns.ldap',
                                          fake_ldap)
        self.useFixture(ldap_patch)
        self.driver = importutils.import_object(
            'nova.network.ldapdns.LdapDNS')

        root_attrs = {
            'objectClass': ['domainrelatedobject', 'dnsdomain',
                            'domain', 'dcobject', 'top'],
            'associateddomain': ['root'],
            'dc': ['root'],
        }
        self.driver.lobj.add_s("ou=hosts,dc=example,dc=org",
                               root_attrs.items())
        for domain in (domain1, domain2):
            self.driver.create_domain(domain)
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def setUp(self):
        """Configure the xenserver flags and build the driver under test.

        Sets the connection and firewall-driver flags, stubs out the XenAPI
        session, then keeps the firewall driver of a fresh ``XenAPIDriver``
        in ``self.fw``.
        """
        super(XenAPIDom0IptablesFirewallTestCase, self).setUp()
        self.flags(group='xenserver',
                   connection_url='test_url',
                   connection_password='test_pass')
        firewall_path = ('nova.virt.xenapi.firewall.'
                         'Dom0IptablesFirewallDriver')
        self.flags(instance_name_template='%d',
                   firewall_driver=firewall_path)
        self.user_id = 'mappin'
        self.project_id = 'fake'
        stubs.stubout_session(self.stubs, stubs.FakeSessionForFirewallTests,
                              test_case=self)
        self.context = context.RequestContext(self.user_id, self.project_id)
        self.network = importutils.import_object(CONF.network_manager)
        virtapi = fake.FakeVirtAPI()
        self.conn = xenapi_conn.XenAPIDriver(virtapi, False)
        self.fw = self.conn._vmops.firewall_driver
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def __init__(self, *args, **kwargs):
        """Create an instance of the servicegroup API.

        If ``service_down_time`` is not greater than ``report_interval``, it
        is overridden with 2.5 times the report interval (truncated to an
        int) and a warning is logged.

        args and kwargs are passed down to the servicegroup driver when it
        gets created.
        """
        # Make sure report interval is less than service down time
        interval = CONF.report_interval
        if CONF.service_down_time <= interval:
            adjusted_down_time = int(interval * 2.5)
            details = {'service_down_time': CONF.service_down_time,
                       'report_interval': interval,
                       'new_service_down_time': adjusted_down_time}
            LOG.warning(_LW("Report interval must be less than service down "
                            "time. Current config: <service_down_time: "
                            "%(service_down_time)s, report_interval: "
                            "%(report_interval)s>. Setting service_down_time "
                            "to: %(new_service_down_time)s"),
                        details)
            CONF.set_override('service_down_time', adjusted_down_time)

        driver_path = _driver_name_class_mapping[CONF.servicegroup_driver]
        self._driver = importutils.import_object(driver_path,
                                                 *args, **kwargs)
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def __init__(self, host, driver, nodename):
        """Initialize tracking state for one compute node.

        :param host: stored as ``self.host``
        :param driver: stored as ``self.driver``
        :param nodename: stored as ``self.nodename``
        """
        self.host = host
        self.driver = driver
        self.pci_tracker = None
        self.nodename = nodename
        self.compute_node = None

        stats_path = CONF.compute_stats_class
        self.stats = importutils.import_object(stats_path)
        self.tracked_instances = {}
        self.tracked_migrations = {}

        # Only the handler's monitor list is kept, not the handler itself.
        self.monitors = monitors.MonitorHandler(self).monitors
        self.ext_resources_handler = ext_resources.ResourceHandler(
            CONF.compute_resources)
        self.old_resources = objects.ComputeNode()
        self.scheduler_client = scheduler_client.SchedulerClient()

        self.ram_allocation_ratio = CONF.ram_allocation_ratio
        self.cpu_allocation_ratio = CONF.cpu_allocation_ratio
        self.disk_allocation_ratio = CONF.disk_allocation_ratio
项目:ioarbiter    作者:att    | 项目源码 | 文件源码
def __init__(self, *args, **kwargs):
        """Initialize the deprecated iSER flavour of the IOArb LVM driver.

        Logs a deprecation warning and then replaces ``self.target_driver``
        with an ``ISERTgtAdm`` target built from this driver's
        configuration, database handle and executor.
        """
        super(IOArbLVMISERDriver, self).__init__(*args, **kwargs)

        LOG.warning(_LW('IOArbLVMISERDriver is deprecated, you should '
                        'now just use IOArbLVMVolumeDriver and specify '
                        'target_helper for the target driver you '
                        'wish to use. In order to enable iser, please '
                        'set iscsi_protocol with the value iser.'))

        LOG.debug('Attempting to initialize LVM driver with the '
                  'following target_driver: '
                  'cinder.volume.targets.iser.ISERTgtAdm')
        target_kwargs = {'configuration': self.configuration,
                         'db': self.db,
                         'executor': self._execute}
        self.target_driver = importutils.import_object(
            'cinder.volume.targets.iser.ISERTgtAdm', **target_kwargs)
项目:ioarbiter    作者:att    | 项目源码 | 文件源码
def __init__(self, *args, **kwargs):
        """Initialize the deprecated iSER flavour of the LVM driver.

        Logs a deprecation warning and then replaces ``self.target_driver``
        with an ``ISERTgtAdm`` target built from this driver's
        configuration, database handle and executor.
        """
        super(LVMISERDriver, self).__init__(*args, **kwargs)

        LOG.warning(_LW('LVMISERDriver is deprecated, you should '
                        'now just use LVMVolumeDriver and specify '
                        'target_helper for the target driver you '
                        'wish to use. In order to enable iser, please '
                        'set iscsi_protocol with the value iser.'))

        LOG.debug('Attempting to initialize LVM driver with the '
                  'following target_driver: '
                  'cinder.volume.targets.iser.ISERTgtAdm')
        target_kwargs = {'configuration': self.configuration,
                         'db': self.db,
                         'executor': self._execute}
        self.target_driver = importutils.import_object(
            'cinder.volume.targets.iser.ISERTgtAdm', **target_kwargs)
项目:networking-huawei    作者:openstack    | 项目源码 | 文件源码
def __init__(self):
        """Bring up RPC, the router scheduler and the DB subscriptions.

        RPC setup, scheduler loading and the periodic agent status check all
        happen before the parent initializer runs. The DVR scheduler
        subscription is only made when the ``dvr`` extension alias is
        supported.
        """
        LOG.info(_LI("Init huawei l3 driver."))
        self.setup_rpc()

        scheduler_path = cfg.CONF.router_scheduler_driver
        self.router_scheduler = importutils.import_object(scheduler_path)
        self.start_periodic_l3_agent_status_check()

        super(HuaweiACL3RouterPlugin, self).__init__()

        dvr_supported = 'dvr' in self.supported_extension_aliases
        if dvr_supported:
            l3_dvrscheduler_db.subscribe()
        l3_db.subscribe()
        LOG.info(_LI("Initialization finished successfully"
                 " for huawei l3 driver."))
项目:trio2o    作者:openstack    | 项目源码 | 文件源码
def __init__(self, quota_driver_class=None):
        """Initialize a Quota object.

        :param quota_driver_class: a driver object, or a dotted path string
            to import and instantiate; falls back to
            ``CONF.quota.quota_driver`` when empty
        """
        quota_driver = quota_driver_class or CONF.quota.quota_driver

        # A string is a dotted path; anything else is used as given.
        if isinstance(quota_driver, six.string_types):
            quota_driver = importutils.import_object(quota_driver)

        self._resources = {}
        self._driver = quota_driver
项目:meteos    作者:openstack    | 项目源码 | 文件源码
def __init__(self, learning_driver=None, service_name=None,
                 *args, **kwargs):
        """Load the driver from args, or from flags.

        :param learning_driver: dotted path of the driver to load; when
            empty, ``self.configuration.learning_driver`` is used
        :param service_name: config group used for the engine options
        """
        engine_configuration = meteos.engine.configuration.Configuration(
            engine_manager_opts,
            config_group=service_name)
        self.configuration = engine_configuration
        super(LearningManager, self).__init__(*args, **kwargs)

        driver_path = learning_driver or self.configuration.learning_driver
        self.driver = importutils.import_object(
            driver_path, configuration=self.configuration)
项目:zun    作者:openstack    | 项目源码 | 文件源码
def load_container_driver(container_driver=None):
    """Import and instantiate a container driver.

    The driver name is taken from the argument or, when that is empty, from
    ``CONF.container_driver``. Names that do not start with ``zun.`` are
    resolved below ``zun.container``. The process exits with status 1 when
    no name is available or the import fails.

    :param container_driver: a container driver name to override the config opt
    :returns: a ContainerDriver instance
    """
    driver_name = container_driver or CONF.container_driver
    if not driver_name:
        LOG.error("Container driver option required, "
                  "but not specified")
        sys.exit(1)

    LOG.info("Loading container driver '%s'", driver_name)
    try:
        if not driver_name.startswith('zun.'):
            driver_name = 'zun.container.%s' % driver_name
        loaded = importutils.import_object(driver_name)
        if isinstance(loaded, ContainerDriver):
            return loaded

        raise Exception(_('Expected driver of type: %s') %
                        str(ContainerDriver))
    except ImportError:
        LOG.exception("Unable to load the container driver")
        sys.exit(1)
项目:deb-oslo.utils    作者:openstack    | 项目源码 | 文件源码
def test_import_object_optional_arg_not_present(self):
        """FakeDriver can be built without passing its optional argument."""
        driver_path = 'oslo_utils.tests.fake.FakeDriver'
        instance = importutils.import_object(driver_path)
        self.assertEqual(instance.__class__.__name__, 'FakeDriver')
项目:deb-oslo.utils    作者:openstack    | 项目源码 | 文件源码
def test_import_object_optional_arg_present(self):
        """FakeDriver can be built with ``first_arg`` passed by keyword."""
        driver_path = 'oslo_utils.tests.fake.FakeDriver'
        instance = importutils.import_object(driver_path, first_arg=False)
        self.assertEqual(instance.__class__.__name__, 'FakeDriver')
项目:deb-oslo.utils    作者:openstack    | 项目源码 | 文件源码
def test_import_object_required_arg_not_present(self):
        """Building FakeDriver2 without its argument raises TypeError."""
        # arg 1 isn't optional here
        driver_path = 'oslo_utils.tests.fake.FakeDriver2'
        with self.assertRaises(TypeError):
            importutils.import_object(driver_path)
项目:deb-oslo.utils    作者:openstack    | 项目源码 | 文件源码
def test_import_object_required_arg_present(self):
        """FakeDriver2 can be built when ``first_arg`` is supplied."""
        driver_path = 'oslo_utils.tests.fake.FakeDriver2'
        instance = importutils.import_object(driver_path, first_arg=False)
        self.assertEqual(instance.__class__.__name__, 'FakeDriver2')

    # namespace tests
项目:deb-oslo.utils    作者:openstack    | 项目源码 | 文件源码
def test_import_object(self):
        """import_object on a dotted class path returns an instance of it."""
        dt = importutils.import_object('datetime.time')
        # assertIsInstance reports the actual object and expected class on
        # failure, unlike assertTrue(isinstance(...)).
        self.assertIsInstance(dt, sys.modules['datetime'].time)
项目:fuxi    作者:openstack    | 项目源码 | 文件源码
def __init__(self):
        """Create the Manila client and the configured volume connector.

        :raises exceptions.InvalidInput: when ``volume_connector`` is unset
            or is not a key of ``volume_connector_conf``
        """
        super(Manila, self).__init__()
        self.manilaclient = utils.get_manilaclient()

        connector_name = manila_conf.volume_connector
        if not (connector_name and connector_name in volume_connector_conf):
            msg = _("Must provide a valid volume connector")
            LOG.error(msg)
            raise exceptions.InvalidInput(msg)

        connector_path = volume_connector_conf[connector_name]
        self.connector = importutils.import_object(
            connector_path, manilaclient=self.manilaclient)
项目:freezer-dr    作者:openstack    | 项目源码 | 文件源码
def __init__(self, notifier):
        """Load the monitoring driver named in the ``monitoring`` config.

        :param notifier: passed to the driver constructor as ``notifier``
        """
        monitor = CONF.get('monitoring')
        backend_name = monitor['backend_name']
        self.driver = importutils.import_object(
            monitor.driver,
            backend_name=backend_name,
            notifier=notifier
        )
        driver_info = self.driver.get_info()
        # Hand the values to the logger as arguments so the message is only
        # rendered when the record is actually emitted.
        LOG.info('Initializing driver %s with version %s found in %s',
                 driver_info['name'], driver_info['version'],
                 monitor.get('driver'))
项目:freezer-dr    作者:openstack    | 项目源码 | 文件源码
def __init__(self):
        """Load the notifier driver described by the ``notifiers`` config.

        The driver receives, positionally, the endpoint, username, password,
        templates directory, sender and recipient list, followed by the
        ``options`` mapping expanded as keyword arguments.
        """
        notifier_conf = CONF.get('notifiers')
        driver_path = notifier_conf.get('driver')
        positional_keys = ('endpoint', 'username', 'password',
                           'templates-dir', 'notify-from', 'notify-list')
        driver_args = [notifier_conf.get(key) for key in positional_keys]
        self.driver = importutils.import_object(
            driver_path, *driver_args, **notifier_conf.get('options'))
项目:freezer-dr    作者:openstack    | 项目源码 | 文件源码
def evacuate(self, nodes):
        """Load the configured evacuation driver and evacuate the nodes.

        :param nodes: nodes handed to both the fencer manager and the
            evacuation driver
        :return: the result of the driver's ``evacuate()`` call, which is
            given ``self.enable_fencing``
        """
        fencer_manager = FencerManager(nodes)
        evacuation_settings = CONF.get('evacuation')
        driver_path = evacuation_settings['driver']
        evacuator = importutils.import_object(
            driver_path, nodes, evacuation_settings, fencer_manager)

        return evacuator.evacuate(self.enable_fencing)
项目:tc-as-a-service    作者:openstack    | 项目源码 | 文件源码
def __init__(self):
        """Create the DB handle, agent API and the plugin RPC consumer.

        The consumer listens on ``topics.TC_PLUGIN`` without fanout, with the
        plugin callback and the agent extension callback as endpoints.
        """
        self.db = wan_qos_db.WanTcDb()
        plugin_callback = importutils.import_object(
            'wan_qos.services.plugin.PluginRpcCallback', self)
        rpc_endpoints = [plugin_callback, agents_db.AgentExtRpcCallback()]
        self.agent_rpc = api.TcAgentApi(cfg.CONF.host)
        self.conn = n_rpc.create_connection()
        self.conn.create_consumer(topics.TC_PLUGIN, rpc_endpoints,
                                  fanout=False)
        self.conn.consume_in_threads()
项目:neutron-dynamic-routing    作者:openstack    | 项目源码 | 文件源码
def setUp(self):
        """Load a ChanceScheduler and use the test case as its plugin."""
        super(TestBgpAgentFilter, self).setUp()
        scheduler_path = ('neutron_dynamic_routing.services.bgp.scheduler.'
                          'bgp_dragent_scheduler.ChanceScheduler')
        self.bgp_drscheduler = importutils.import_object(scheduler_path)
        self.plugin = self
项目:neutron-dynamic-routing    作者:openstack    | 项目源码 | 文件源码
def __init__(self):
        """Load the configured dragent scheduler and start the plugin.

        After the scheduler is imported from ``bgp_drscheduler_driver``, RPC
        is set up, callbacks are registered and the periodic dragent status
        check is added.
        """
        super(BgpPlugin, self).__init__()
        scheduler_path = cfg.CONF.bgp_drscheduler_driver
        self.bgp_drscheduler = importutils.import_object(scheduler_path)
        self._setup_rpc()
        self._register_callbacks()
        self.add_periodic_dragent_status_check()
项目:neutron-dynamic-routing    作者:openstack    | 项目源码 | 文件源码
def initialize_driver(self, conf):
        """Instantiate the configured BGP speaker driver.

        :param conf: configuration object; ``cfg.CONF.BGP`` is used when it
            is falsy. It is also passed to the driver constructor.
        :raises SystemExit: with status 1 when the driver cannot be imported
        """
        self.conf = conf or cfg.CONF.BGP
        try:
            speaker_driver = importutils.import_object(
                self.conf.bgp_speaker_driver, self.conf)
            self.dr_driver_cls = speaker_driver
        except ImportError:
            LOG.exception(_LE("Error while importing BGP speaker driver %s"),
                          self.conf.bgp_speaker_driver)
            raise SystemExit(1)
项目:mogan    作者:openstack    | 项目源码 | 文件源码
def __init__(self, topic, host=None):
        """Load the scheduler driver named in the scheduler config group.

        :param topic: stored as ``self.topic``
        :param host: host name; ``CONF.host`` is used when it is falsy
        """
        super(SchedulerManager, self).__init__()
        self.host = host if host else CONF.host
        self.topic = topic
        self.driver = importutils.import_object(
            CONF.scheduler.scheduler_driver)
        self._startup_delay = True
项目:networking-huawei    作者:libuparayil    | 项目源码 | 文件源码
def __init__(self):
        """Bring up RPC, the router scheduler and the DB subscriptions.

        RPC setup, scheduler loading and the periodic agent status check all
        happen before the parent initializer runs. The DVR scheduler
        subscription is only made when the ``dvr`` extension alias is
        supported.
        """
        LOG.info(_LI("Init huawei l3 driver."))
        self.setup_rpc()

        scheduler_path = cfg.CONF.router_scheduler_driver
        self.router_scheduler = importutils.import_object(scheduler_path)
        self.start_periodic_l3_agent_status_check()

        super(HuaweiACL3RouterPlugin, self).__init__()

        dvr_supported = 'dvr' in self.supported_extension_aliases
        if dvr_supported:
            l3_dvrscheduler_db.subscribe()
        l3_db.subscribe()
        LOG.info(_LI("Initialization finished successfully"
                 " for huawei l3 driver."))
项目:masakari    作者:openstack    | 项目源码 | 文件源码
def setUp(self):
        """Load the configured engine manager and a fresh request context."""
        super(EngineManagerUnitTestCase, self).setUp()
        manager_path = CONF.engine_manager
        self.engine = importutils.import_object(manager_path)
        self.context = context.RequestContext()
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def create(self, label=None, cidr=None, num_networks=None,
               network_size=None, multi_host=None, vlan=None,
               vlan_start=None, vpn_start=None, cidr_v6=None, gateway=None,
               gateway_v6=None, bridge=None, bridge_interface=None,
               dns1=None, dns2=None, project_id=None, priority=None,
               uuid=None, fixed_cidr=None):
        """Creates fixed IPs for host by range.

        Every truthy argument is forwarded by name to the configured network
        manager's ``create_networks`` call, which runs under an admin
        context. ``multi_host`` is forwarded as a boolean (``True`` only when
        it equals ``'T'``) whenever it is not ``None``.
        """
        # NOTE: locals() must be taken before any other local variable is
        # bound, so that it holds only ``self`` and the parameters. Falsy
        # values (None, '', 0, ...) are dropped here.
        kwargs = {k: v for k, v in six.iteritems(locals())
                  if v and k != "self"}
        # Convert the 'T'/other flag to a bool; this also re-adds the key
        # when the given value was falsy but not None.
        if multi_host is not None:
            kwargs['multi_host'] = multi_host == 'T'
        net_manager = importutils.import_object(CONF.network_manager)
        net_manager.create_networks(context.get_admin_context(), **kwargs)
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def init_leases(network_id):
    """Get the list of hosts for a network.

    :param network_id: id of the network, looked up with an admin context
    :returns: the result of the network manager's ``get_dhcp_leases``
    """
    admin_context = context.get_admin_context()
    network_ref = objects.Network.get_by_id(admin_context, network_id)
    manager = importutils.import_object(CONF.network_manager)
    return manager.get_dhcp_leases(admin_context, network_ref)
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def __init__(self, scheduler_driver=None, *args, **kwargs):
        """Load the scheduler driver and initialize the manager.

        The driver name is first looked up as a stevedore entry point in the
        ``nova.scheduler.driver`` namespace. If that raises ``RuntimeError``
        the name is treated as a full class path and loaded with
        ``importutils`` (deprecated).

        :param scheduler_driver: driver name or class path; defaults to
            ``CONF.scheduler_driver`` when empty
        :raises RuntimeError: when neither loading style succeeds
        """
        if not scheduler_driver:
            scheduler_driver = CONF.scheduler_driver
        try:
            self.driver = driver.DriverManager(
                    "nova.scheduler.driver",
                    scheduler_driver,
                    invoke_on_load=True).driver
        # TODO(Yingxin): Change to catch stevedore.exceptions.NoMatches after
        # stevedore v1.9.0
        except RuntimeError:
            # NOTE(Yingxin): Loading full class path is deprecated and should
            # be removed in the N release.
            try:
                self.driver = importutils.import_object(scheduler_driver)
                LOG.warning(_LW("DEPRECATED: scheduler_driver uses "
                                "classloader to load %(path)s. This legacy "
                                "loading style will be removed in the "
                                "N release."),
                            {'path': scheduler_driver})
            except (ImportError, ValueError):
                # Interpolate explicitly: exceptions do not apply %-style
                # arguments the way logger calls do, so passing the dict as
                # a second argument would leave "%(conf)s" unrendered.
                raise RuntimeError(
                        _("Cannot load scheduler driver from configuration "
                          "%(conf)s.") %
                        {'conf': scheduler_driver})
        super(SchedulerManager, self).__init__(service_name='scheduler',
                                               *args, **kwargs)
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def _get_openstack_security_group_driver(skip_policy_check=False):
    """Instantiate the security group driver selected by configuration.

    Neutron security groups take precedence; otherwise the value ``nova``
    (any case) selects the Nova driver, and any other value of
    ``CONF.security_group_api`` is used as the class path itself.

    :param skip_policy_check: forwarded to the driver constructor
    """
    if is_neutron_security_groups():
        driver_path = NEUTRON_DRIVER
    elif CONF.security_group_api.lower() == 'nova':
        driver_path = NOVA_DRIVER
    else:
        driver_path = CONF.security_group_api

    return importutils.import_object(driver_path,
                                     skip_policy_check=skip_policy_check)
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def _get_interface_driver():
    """Return the module-level interface driver, creating it on first use."""
    global interface_driver
    if interface_driver:
        return interface_driver

    interface_driver = importutils.import_object(
        CONF.linuxnet_interface_driver)
    return interface_driver
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def get_volume_encryptor(connection_info, **kwargs):
    """Creates a VolumeEncryptor used to encrypt the specified volume.

    A ``NoOpEncryptor`` is always built first. It is replaced only when
    ``control_location`` (compared case-insensitively) is ``front-end``, in
    which case the ``provider`` keyword names the encryptor class. The three
    built-in short names are expanded to their full module paths.

    :param connection_info: the connection information used to attach the
        volume
    :param kwargs: ``control_location`` and ``provider`` are read here; all
        keyword arguments are forwarded to the encryptor constructor
    :returns VolumeEncryptor: the VolumeEncryptor for the volume
    """
    encryptor = nop.NoOpEncryptor(connection_info, **kwargs)

    control_location = kwargs.get('control_location', None)
    # case insensitive
    if control_location and control_location.lower() == 'front-end':
        provider = kwargs.get('provider')

        # Short class names understood here and the package each lives in.
        builtin_providers = (
            ('LuksEncryptor', 'nova.volume.encryptors.luks.'),
            ('CryptsetupEncryptor', 'nova.volume.encryptors.cryptsetup.'),
            ('NoOpEncryptor', 'nova.volume.encryptors.nop.'),
        )
        for short_name, package_prefix in builtin_providers:
            if provider == short_name:
                provider = package_prefix + provider
                break

        try:
            encryptor = importutils.import_object(provider, connection_info,
                                                  **kwargs)
        except Exception as e:
            LOG.error(_LE("Error instantiating %(provider)s: %(exception)s"),
                      {'provider': provider, 'exception': e})
            raise

    details = {'encryptor': encryptor, 'connection_info': connection_info}
    msg = ("Using volume encryptor '%(encryptor)s' for connection: "
           "%(connection_info)s" % details)
    # The message is rendered up front so passwords can be masked in it.
    LOG.debug(strutils.mask_password(msg))

    return encryptor
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def __init__(self):
        """Load the remote filesystem driver for the configured transport.

        The class is looked up in this module under the name
        ``<Transport>Driver``, where the transport name is capitalized.
        """
        transport = CONF.libvirt.remote_filesystem_transport
        driver_class_name = transport.capitalize() + 'Driver'
        driver_path = '.'.join([__name__, driver_class_name])
        self.driver = importutils.import_object(driver_path)
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def instance_for_format(image, mountdir, partition):
        """Get a Mount instance for the image type

        Raw local files use ``LoopMount``, other local files use
        ``NbdMount`` and local block images use ``BlockMount``.

        :param image: instance of nova.virt.image.model.Image
        :param mountdir: path to mount the image at
        :param partition: partition number to mount
        :raises exception.UnsupportedImageModel: for any other image model
        """
        LOG.debug("Instance for format image=%(image)s "
                  "mountdir=%(mountdir)s partition=%(partition)s",
                  {'image': image, 'mountdir': mountdir,
                   'partition': partition})

        if isinstance(image, imgmodel.LocalFileImage):
            if image.format == imgmodel.FORMAT_RAW:
                notice = "Using LoopMount"
                mount_path = "nova.virt.disk.mount.loop.LoopMount"
            else:
                notice = "Using NbdMount"
                mount_path = "nova.virt.disk.mount.nbd.NbdMount"
        elif isinstance(image, imgmodel.LocalBlockImage):
            notice = "Using BlockMount"
            mount_path = "nova.virt.disk.mount.block.BlockMount"
        else:
            # TODO(berrange) We could mount RBDImage directly
            # using kernel RBD block dev support.
            #
            # This is left as an enhancement for future
            # motivated developers todo, since raising
            # an exception is on par with what this
            # code did historically
            raise exception.UnsupportedImageModel(
                image.__class__.__name__)

        LOG.debug(notice)
        return importutils.import_object(
            mount_path, image, mountdir, partition)
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def instance_for_device(image, mountdir, partition, device):
        """Get a Mount instance for the device type

        The mount class is chosen from the device path: one containing
        ``loop`` uses ``LoopMount``, one containing ``nbd`` uses
        ``NbdMount`` and anything else uses ``BlockMount``.

        :param image: instance of nova.virt.image.model.Image
        :param mountdir: path to mount the image at
        :param partition: partition number to mount
        :param device: mounted device path
        """

        LOG.debug("Instance for device image=%(image)s "
                  "mountdir=%(mountdir)s partition=%(partition)s "
                  "device=%(device)s",
                  {'image': image, 'mountdir': mountdir,
                   'partition': partition, 'device': device})

        if "loop" in device:
            notice = "Using LoopMount"
            mount_path = "nova.virt.disk.mount.loop.LoopMount"
        elif "nbd" in device:
            notice = "Using NbdMount"
            mount_path = "nova.virt.disk.mount.nbd.NbdMount"
        else:
            notice = "Using BlockMount"
            mount_path = "nova.virt.disk.mount.block.BlockMount"

        LOG.debug(notice)
        return importutils.import_object(
            mount_path, image, mountdir, partition, device)
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def instance_for_image(image, partition):
        """Get a VFS instance for the image

        ``VFSGuestFS`` is tried first. If constructing it raises a
        ``NovaException``, ``VFSLocalFS`` is used instead; a
        ``NovaException`` raised after construction is re-raised.

        :param image: instance of nova.virt.image.model.Image
        :param partition: the partition number to access
        """

        LOG.debug("Instance for image image=%(image)s "
                  "partition=%(partition)s",
                  {'image': image, 'partition': partition})

        guest_vfs = None
        try:
            LOG.debug("Using primary VFSGuestFS")
            guest_vfs = importutils.import_object(
                "nova.virt.disk.vfs.guestfs.VFSGuestFS",
                image, partition)
            if not VFS.guestfs_ready:
                # Inspect for capabilities and keep
                # track of the result only if succeeded.
                guest_vfs.inspect_capabilities()
                VFS.guestfs_ready = True
            return guest_vfs
        except exception.NovaException:
            if guest_vfs is None:
                LOG.info(_LI("Unable to import guestfs, "
                             "falling back to VFSLocalFS"))
            else:
                # We are able to load libguestfs but
                # something wrong happens when trying to
                # check for capabilities.
                raise

        local_vfs_path = "nova.virt.disk.vfs.localfs.VFSLocalFS"
        return importutils.import_object(local_vfs_path, image, partition)
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def _default_download_handler():
    """Create and return a GlanceStore object via importutils."""
    # TODO(sirp):  This should be configurable like upload_handler
    handler_path = 'nova.virt.xenapi.image.glance.GlanceStore'
    return importutils.import_object(handler_path)
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def test_live_migration_uses_migrateToURI_without_migratable_flag(self):
        """Check migrateToURI is called and its libvirtError is raised.

        The mocked domain expects one migrateToURI call with the
        destination URI and the configured bandwidth; that call raises
        libvirtError, which _live_migration_operation must raise too.
        """
        self.compute = importutils.import_object(CONF.compute_manager)
        instance_values = dict(self.test_instance,
                               host='fake',
                               power_state=power_state.RUNNING,
                               vm_state=vm_states.ACTIVE)
        instance_ref = objects.Instance(**instance_values)

        driver = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), False)

        # Record the expected migrateToURI call on the mocked domain
        domain_mock = self.mox.CreateMock(fakelibvirt.virDomain)
        self.mox.StubOutWithMock(domain_mock, "migrateToURI")
        bandwidth = CONF.libvirt.live_migration_bandwidth
        expected_call = domain_mock.migrateToURI(
            driver._live_migration_uri('dest'),
            mox.IgnoreArg(),
            None,
            bandwidth)
        expected_call.AndRaise(fakelibvirt.libvirtError("ERR"))

        # Switch to replay mode and run the operation under test
        migrate_kwargs = {
            'graphics_listen_addr_vnc': '0.0.0.0',
            'graphics_listen_addr_spice': '0.0.0.0',
            'serial_listen_addr': '127.0.0.1',
            'target_connect_addr': None,
            'bdms': [],
            'block_migration': False,
        }
        migrate_data = objects.LibvirtLiveMigrateData(**migrate_kwargs)
        self.mox.ReplayAll()
        self.assertRaises(fakelibvirt.libvirtError,
                          driver._live_migration_operation,
                          self.context, instance_ref, 'dest',
                          False, migrate_data, domain_mock, [])
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def test_live_migration_uses_migrateToURI_without_dest_listen_addrs(self):
        """Check migrateToURI is used when no graphics addrs are given.

        The migrate data here carries no graphics listen addresses and
        an empty serial listen address. The mocked migrateToURI call
        raises libvirtError, which _live_migration_operation must raise.
        """
        self.compute = importutils.import_object(CONF.compute_manager)
        instance_values = dict(self.test_instance,
                               host='fake',
                               power_state=power_state.RUNNING,
                               vm_state=vm_states.ACTIVE)
        instance_ref = objects.Instance(**instance_values)

        driver = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), False)

        # Record the expected migrateToURI call on the mocked domain
        domain_mock = self.mox.CreateMock(fakelibvirt.virDomain)
        self.mox.StubOutWithMock(domain_mock, "migrateToURI")
        bandwidth = CONF.libvirt.live_migration_bandwidth
        expected_call = domain_mock.migrateToURI(
            driver._live_migration_uri('dest'),
            mox.IgnoreArg(),
            None,
            bandwidth)
        expected_call.AndRaise(fakelibvirt.libvirtError("ERR"))

        # Switch to replay mode and run the operation under test
        migrate_kwargs = {
            'serial_listen_addr': '',
            'target_connect_addr': None,
            'bdms': [],
            'block_migration': False,
        }
        migrate_data = objects.LibvirtLiveMigrateData(**migrate_kwargs)
        self.mox.ReplayAll()
        self.assertRaises(fakelibvirt.libvirtError,
                          driver._live_migration_operation,
                          self.context, instance_ref, 'dest',
                          False, migrate_data, domain_mock, [])
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def test_live_migration_fails_without_migratable_flag_or_0_addr(self):
        """Check MigrationError is raised for specific listen addresses.

        With vncserver_listen set to '1.2.3.4' and the same address in
        the migrate data, _live_migration_operation must raise
        exception.MigrationError. No migrateToURI call is recorded on
        the mocked domain.
        """
        self.flags(enabled=True, vncserver_listen='1.2.3.4', group='vnc')
        self.compute = importutils.import_object(CONF.compute_manager)
        instance_values = dict(self.test_instance,
                               host='fake',
                               power_state=power_state.RUNNING,
                               vm_state=vm_states.ACTIVE)
        instance_ref = objects.Instance(**instance_values)

        # Stub migrateToURI without recording any expected call
        domain_mock = self.mox.CreateMock(fakelibvirt.virDomain)
        self.mox.StubOutWithMock(domain_mock, "migrateToURI")

        # Switch to replay mode and run the operation under test
        migrate_kwargs = {
            'graphics_listen_addr_vnc': '1.2.3.4',
            'graphics_listen_addr_spice': '1.2.3.4',
            'serial_listen_addr': '127.0.0.1',
            'target_connect_addr': None,
            'block_migration': False,
        }
        migrate_data = objects.LibvirtLiveMigrateData(**migrate_kwargs)
        self.mox.ReplayAll()
        driver = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), False)
        self.assertRaises(exception.MigrationError,
                          driver._live_migration_operation,
                          self.context, instance_ref, 'dest',
                          False, migrate_data, domain_mock, [])
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def test_run_image_cache_manager_pass(self):
        """Check the image cache pass queries instances by filters.

        nova.db.instance_get_all_by_filters is replaced with a fake that
        records that it was called and returns two fake DB instances;
        the test asserts the fake was invoked.
        """
        call_tracker = {'called': False}

        def fake_get_all_by_filters(context, *args, **kwargs):
            # Flag the call, then hand back two fake DB instances.
            call_tracker['called'] = True
            return [
                fake_instance.fake_db_instance(image_ref='1',
                                               uuid=idx,
                                               name=idx,
                                               vm_state='',
                                               task_state='')
                for idx in range(2)
            ]

        with utils.tempdir() as scratch_dir:
            self.flags(instances_path=scratch_dir)

            self.stub_out('nova.db.instance_get_all_by_filters',
                          fake_get_all_by_filters)
            manager = importutils.import_object(CONF.compute_manager)
            self.flags(use_local=True, group='conductor')
            manager.conductor_api = conductor.API()
            admin_ctxt = context.get_admin_context()
            manager._run_image_cache_manager_pass(admin_ctxt)
            self.assertTrue(call_tracker['called'])
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def setUp(self):
        """Prepare flags, a stubbed session and fixtures for the tests.

        Sets the xenserver connection flags and general flags, stubs
        the session, and stores on ``self`` an admin context, a
        XenAPIDriver, a compute manager, an AggregateAPI, an Aggregate
        object and a fake metadata dict.
        """
        super(XenAPIAggregateTestCase, self).setUp()
        # Connection settings in the 'xenserver' group
        self.flags(connection_url='http://test_url',
                   connection_username='test_user',
                   connection_password='test_pass',
                   group='xenserver')
        self.flags(instance_name_template='%d',
                   firewall_driver='nova.virt.xenapi.firewall.'
                                   'Dom0IptablesFirewallDriver',
                   host='host',
                   compute_driver='xenapi.XenAPIDriver',
                   default_availability_zone='avail_zone1')
        self.flags(use_local=True, group='conductor')
        # First 'host' entry from the fake XenAPI; its record's uuid is
        # used in fake_metadata below.
        host_ref = xenapi_fake.get_all('host')[0]
        stubs.stubout_session(self.stubs, stubs.FakeSessionForVMTests)
        self.context = context.get_admin_context()
        self.conn = xenapi_conn.XenAPIDriver(fake.FakeVirtAPI(), False)
        # The compute manager class is taken from CONF.compute_manager
        self.compute = importutils.import_object(CONF.compute_manager)
        self.api = compute_api.AggregateAPI()
        # NOTE: the POOL_FLAG entry belongs to the nested 'metadata'
        # dict, despite being aligned with the outer keys.
        values = {'name': 'test_aggr',
                  'metadata': {'availability_zone': 'test_zone',
                  pool_states.POOL_FLAG: 'XenAPI'}}
        self.aggr = objects.Aggregate(context=self.context, id=1,
                                      **values)
        self.fake_metadata = {pool_states.POOL_FLAG: 'XenAPI',
                              'master_compute': 'host',
                              'availability_zone': 'fake_zone',
                              pool_states.KEY: pool_states.ACTIVE,
                              'host': xenapi_fake.get_record('host',
                                                             host_ref)['uuid']}