The following 50 code examples, extracted from open source Python projects, illustrate how to use oslo_utils.importutils.import_object().
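import_object() takes the fully qualified dotted path of a class, imports it, and immediately instantiates it, forwarding any positional and keyword arguments to the constructor; if the module cannot be imported, ImportError propagates, which is why many of the examples below wrap the call in try/except. A minimal sketch of the pattern (mypackage.drivers.MyDriver and its constructor arguments are hypothetical names used for illustration):

from oslo_utils import importutils

# Equivalent to:
#     from mypackage.drivers import MyDriver
#     driver = MyDriver(host='localhost', port=8080)
# except that the class path can come from a config option at runtime.
driver = importutils.import_object('mypackage.drivers.MyDriver',
                                   host='localhost', port=8080)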
def get_driver_instance():
    """Instantiate a driver instance according to the file configuration.

    :returns: a Driver instance
    :raises: exceptions.KuryrException
    """
    module, name, classname = _parse_port_driver_config()
    # TODO(apuimedo): switch to the openstack/stevedore plugin system
    try:
        driver = importutils.import_object(
            "{0}.{1}".format(module, classname))
    except ImportError as ie:
        raise exceptions.KuryrException(
            "Cannot load port driver '{0}': {1}".format(module, ie))
    _verify_port_driver_compliancy(driver, name)
    _verify_binding_driver_compatibility(driver, name)
    return driver
def fence(self, nodes=None):
    """Try to shut down nodes and wait for a configurable amount of time.

    :return: list of nodes and whether they were shut down or failed
    """
    # update the list of nodes if required!
    if nodes:
        self.nodes = nodes
    driver_name = self.fencer_conf['driver']
    driver = importutils.import_object(
        driver_name,
        self.nodes,
        self.fencer_conf
    )
    LOG.debug('Loaded fencing driver {0} with config: '
              '{1}'.format(driver.get_info(), self.fencer_conf))
    return driver.fence()
def load_engine_driver(engine_driver):
    """Load an engine driver module.

    Load the engine driver module specified by the engine_driver
    configuration option or, if supplied, the driver name supplied as
    an argument.

    :param engine_driver: an engine driver name to override the config opt
    :returns: an EngineDriver instance
    """
    if not engine_driver:
        LOG.error("Engine driver option required, but not specified")
        sys.exit(1)

    LOG.info("Loading engine driver '%s'", engine_driver)
    try:
        driver = importutils.import_object(
            'mogan.baremetal.%s' % engine_driver)
        return utils.check_isinstance(driver, BaseEngineDriver)
    except ImportError:
        LOG.exception("Unable to load the baremetal driver")
        sys.exit(1)
def __init__(self):
    self.router_scheduler = importutils.import_object(
        cfg.CONF.router_scheduler_driver)
    super(H3CL3RouterPlugin, self).__init__()
    self.vds_id = None
    self.client = rest_client.RestClient()
    self.enable_metadata = cfg.CONF.VCFCONTROLLER.enable_metadata
    self.router_binding_public_vrf = \
        cfg.CONF.VCFCONTROLLER.router_binding_public_vrf
    self.disable_internal_l3flow_offload = \
        cfg.CONF.VCFCONTROLLER.disable_internal_l3flow_offload
    self.enable_l3_router_rpc_notify = \
        cfg.CONF.VCFCONTROLLER.enable_l3_router_rpc_notify
    self.vendor_rpc_topic = cfg.CONF.VCFCONTROLLER.vendor_rpc_topic
    self.setup_rpc()
    self.enable_l3_vxlan = cfg.CONF.VCFCONTROLLER.enable_l3_vxlan
    self.h3c_l3_vxlan = h3c_l3_vxlan_db.H3CL3VxlanDriver()
    if self.enable_l3_vxlan is True:
        self.h3c_l3_vxlan.initialize()
def delete(self, fixed_range=None, uuid=None):
    """Deletes a network."""
    if fixed_range is None and uuid is None:
        raise Exception(_("Please specify either fixed_range or uuid"))

    net_manager = importutils.import_object(CONF.network_manager)
    if "NeutronManager" in CONF.network_manager:
        if uuid is None:
            raise Exception(_("UUID is required to delete "
                              "Neutron Networks"))
        if fixed_range:
            raise Exception(_("Deleting by fixed_range is not supported "
                              "with the NeutronManager"))
    # delete the network
    net_manager.delete_network(context.get_admin_context(),
                               fixed_range, uuid)
def __init__(self):
    try:
        self.host_manager = driver.DriverManager(
            "nova.scheduler.host_manager",
            CONF.scheduler_host_manager,
            invoke_on_load=True).driver
    # TODO(Yingxin): Change to catch stevedore.exceptions.NoMatches
    # after stevedore v1.9.0
    except RuntimeError:
        # NOTE(Yingxin): Loading full class path is deprecated and
        # should be removed in the N release.
        try:
            self.host_manager = importutils.import_object(
                CONF.scheduler_host_manager)
            LOG.warning(_LW("DEPRECATED: scheduler_host_manager uses "
                            "classloader to load %(path)s. This legacy "
                            "loading style will be removed in the "
                            "N release."),
                        {'path': CONF.scheduler_host_manager})
        except (ImportError, ValueError):
            raise RuntimeError(
                _("Cannot load host manager from configuration "
                  "scheduler_host_manager = %(conf)s.") %
                {'conf': CONF.scheduler_host_manager})
    self.servicegroup_api = servicegroup.API()
def __init__(self, network_driver=None, *args, **kwargs):
    self.driver = driver.load_network_driver(network_driver)
    self.instance_dns_manager = importutils.import_object(
        CONF.instance_dns_manager)
    self.instance_dns_domain = CONF.instance_dns_domain
    self.floating_dns_manager = importutils.import_object(
        CONF.floating_ip_dns_manager)
    self.network_api = network_api.API()
    self.network_rpcapi = network_rpcapi.NetworkAPI()
    self.security_group_api = (
        openstack_driver.get_openstack_security_group_driver())
    self.servicegroup_api = servicegroup.API()

    l3_lib = kwargs.get("l3_lib", CONF.l3_lib)
    self.l3driver = importutils.import_object(l3_lib)

    self.quotas_cls = objects.Quotas

    super(NetworkManager, self).__init__(service_name='network',
                                         *args, **kwargs)
def __init__(self, session, virtapi):
    self.compute_api = compute.API()
    self._session = session
    self._virtapi = virtapi
    self._volumeops = volumeops.VolumeOps(self._session)
    self.firewall_driver = firewall.load_driver(
        DEFAULT_FIREWALL_DRIVER,
        xenapi_session=self._session)
    vif_impl = importutils.import_class(CONF.xenserver.vif_driver)
    self.vif_driver = vif_impl(xenapi_session=self._session)
    self.default_root_dev = '/dev/sda'

    LOG.debug("Importing image upload handler: %s",
              CONF.xenserver.image_upload_handler)
    self.image_upload_handler = importutils.import_object(
        CONF.xenserver.image_upload_handler)
def setUp(self):
    super(LdapDNSTestCase, self).setUp()
    self.useFixture(fixtures.MonkeyPatch(
        'nova.network.ldapdns.ldap',
        fake_ldap))

    dns_class = 'nova.network.ldapdns.LdapDNS'
    self.driver = importutils.import_object(dns_class)

    attrs = {'objectClass': ['domainrelatedobject', 'dnsdomain',
                             'domain', 'dcobject', 'top'],
             'associateddomain': ['root'],
             'dc': ['root']}
    self.driver.lobj.add_s("ou=hosts,dc=example,dc=org", attrs.items())
    self.driver.create_domain(domain1)
    self.driver.create_domain(domain2)
def setUp(self):
    super(XenAPIDom0IptablesFirewallTestCase, self).setUp()
    self.flags(connection_url='test_url',
               connection_password='test_pass',
               group='xenserver')
    self.flags(instance_name_template='%d',
               firewall_driver='nova.virt.xenapi.firewall.'
                               'Dom0IptablesFirewallDriver')
    self.user_id = 'mappin'
    self.project_id = 'fake'
    stubs.stubout_session(self.stubs, stubs.FakeSessionForFirewallTests,
                          test_case=self)
    self.context = context.RequestContext(self.user_id, self.project_id)
    self.network = importutils.import_object(CONF.network_manager)
    self.conn = xenapi_conn.XenAPIDriver(fake.FakeVirtAPI(), False)
    self.fw = self.conn._vmops.firewall_driver
def __init__(self, *args, **kwargs):
    '''Create an instance of the servicegroup API.

    args and kwargs are passed down to the servicegroup driver when it
    gets created.
    '''
    # Make sure report interval is less than service down time
    report_interval = CONF.report_interval
    if CONF.service_down_time <= report_interval:
        new_service_down_time = int(report_interval * 2.5)
        LOG.warning(_LW("Report interval must be less than service down "
                        "time. Current config: <service_down_time: "
                        "%(service_down_time)s, report_interval: "
                        "%(report_interval)s>. Setting service_down_time "
                        "to: %(new_service_down_time)s"),
                    {'service_down_time': CONF.service_down_time,
                     'report_interval': report_interval,
                     'new_service_down_time': new_service_down_time})
        CONF.set_override('service_down_time', new_service_down_time)

    driver_class = _driver_name_class_mapping[CONF.servicegroup_driver]
    self._driver = importutils.import_object(driver_class,
                                             *args, **kwargs)
def __init__(self, host, driver, nodename):
    self.host = host
    self.driver = driver
    self.pci_tracker = None
    self.nodename = nodename
    self.compute_node = None
    self.stats = importutils.import_object(CONF.compute_stats_class)
    self.tracked_instances = {}
    self.tracked_migrations = {}
    monitor_handler = monitors.MonitorHandler(self)
    self.monitors = monitor_handler.monitors
    self.ext_resources_handler = \
        ext_resources.ResourceHandler(CONF.compute_resources)
    self.old_resources = objects.ComputeNode()
    self.scheduler_client = scheduler_client.SchedulerClient()
    self.ram_allocation_ratio = CONF.ram_allocation_ratio
    self.cpu_allocation_ratio = CONF.cpu_allocation_ratio
    self.disk_allocation_ratio = CONF.disk_allocation_ratio
def __init__(self, *args, **kwargs):
    super(IOArbLVMISERDriver, self).__init__(*args, **kwargs)
    LOG.warning(_LW('IOArbLVMISERDriver is deprecated, you should '
                    'now just use IOArbLVMVolumeDriver and specify '
                    'target_helper for the target driver you '
                    'wish to use. In order to enable iser, please '
                    'set iscsi_protocol with the value iser.'))

    LOG.debug('Attempting to initialize LVM driver with the '
              'following target_driver: '
              'cinder.volume.targets.iser.ISERTgtAdm')
    self.target_driver = importutils.import_object(
        'cinder.volume.targets.iser.ISERTgtAdm',
        configuration=self.configuration,
        db=self.db,
        executor=self._execute)
def __init__(self, *args, **kwargs):
    super(LVMISERDriver, self).__init__(*args, **kwargs)
    LOG.warning(_LW('LVMISERDriver is deprecated, you should '
                    'now just use LVMVolumeDriver and specify '
                    'target_helper for the target driver you '
                    'wish to use. In order to enable iser, please '
                    'set iscsi_protocol with the value iser.'))

    LOG.debug('Attempting to initialize LVM driver with the '
              'following target_driver: '
              'cinder.volume.targets.iser.ISERTgtAdm')
    self.target_driver = importutils.import_object(
        'cinder.volume.targets.iser.ISERTgtAdm',
        configuration=self.configuration,
        db=self.db,
        executor=self._execute)
def __init__(self): LOG.info(_LI("Init huawei l3 driver.")) self.setup_rpc() self.router_scheduler = importutils.import_object( cfg.CONF.router_scheduler_driver) self.start_periodic_l3_agent_status_check() super(HuaweiACL3RouterPlugin, self).__init__() if 'dvr' in self.supported_extension_aliases: l3_dvrscheduler_db.subscribe() l3_db.subscribe() LOG.info(_LI("Initialization finished successfully" " for huawei l3 driver."))
def __init__(self, quota_driver_class=None):
    """Initialize a Quota object."""
    if not quota_driver_class:
        quota_driver_class = CONF.quota.quota_driver
    if isinstance(quota_driver_class, six.string_types):
        quota_driver_class = importutils.import_object(quota_driver_class)
    self._resources = {}
    self._driver = quota_driver_class
def __init__(self, learning_driver=None, service_name=None,
             *args, **kwargs):
    """Load the driver from args, or from flags."""
    self.configuration = meteos.engine.configuration.Configuration(
        engine_manager_opts,
        config_group=service_name)
    super(LearningManager, self).__init__(*args, **kwargs)

    if not learning_driver:
        learning_driver = self.configuration.learning_driver

    self.driver = importutils.import_object(
        learning_driver,
        configuration=self.configuration,
    )
def load_container_driver(container_driver=None):
    """Load a container driver module.

    Load the container driver module specified by the container_driver
    configuration option or, if supplied, the driver name supplied as
    an argument.

    :param container_driver: a container driver name to override the
                             config opt
    :returns: a ContainerDriver instance
    """
    if not container_driver:
        container_driver = CONF.container_driver
        if not container_driver:
            LOG.error("Container driver option required, "
                      "but not specified")
            sys.exit(1)

    LOG.info("Loading container driver '%s'", container_driver)
    try:
        if not container_driver.startswith('zun.'):
            container_driver = 'zun.container.%s' % container_driver
        driver = importutils.import_object(container_driver)
        if not isinstance(driver, ContainerDriver):
            raise Exception(_('Expected driver of type: %s') %
                            str(ContainerDriver))
        return driver
    except ImportError:
        LOG.exception("Unable to load the container driver")
        sys.exit(1)
def test_import_object_optional_arg_not_present(self):
    obj = importutils.import_object('oslo_utils.tests.fake.FakeDriver')
    self.assertEqual(obj.__class__.__name__, 'FakeDriver')
def test_import_object_optional_arg_present(self):
    obj = importutils.import_object('oslo_utils.tests.fake.FakeDriver',
                                    first_arg=False)
    self.assertEqual(obj.__class__.__name__, 'FakeDriver')
def test_import_object_required_arg_not_present(self):
    # arg 1 isn't optional here
    self.assertRaises(TypeError, importutils.import_object,
                      'oslo_utils.tests.fake.FakeDriver2')
def test_import_object_required_arg_present(self):
    obj = importutils.import_object('oslo_utils.tests.fake.FakeDriver2',
                                    first_arg=False)
    self.assertEqual(obj.__class__.__name__, 'FakeDriver2')
def test_import_object(self):
    dt = importutils.import_object('datetime.time')
    self.assertTrue(isinstance(dt, sys.modules['datetime'].time))
def __init__(self):
    super(Manila, self).__init__()
    self.manilaclient = utils.get_manilaclient()
    conn_conf = manila_conf.volume_connector
    if not conn_conf or conn_conf not in volume_connector_conf:
        msg = _("Must provide a valid volume connector")
        LOG.error(msg)
        raise exceptions.InvalidInput(msg)
    self.connector = importutils.import_object(
        volume_connector_conf[conn_conf],
        manilaclient=self.manilaclient)
def __init__(self, notifier):
    monitor = CONF.get('monitoring')
    backend_name = monitor['backend_name']
    self.driver = importutils.import_object(
        monitor.driver,
        backend_name=backend_name,
        notifier=notifier
    )
    driver_info = self.driver.get_info()
    LOG.info('Initializing driver %s with version %s found in %s'
             % (driver_info['name'], driver_info['version'],
                monitor.get('driver')))
def __init__(self):
    notifier_conf = CONF.get('notifiers')
    self.driver = importutils.import_object(
        notifier_conf.get('driver'),
        notifier_conf.get('endpoint'),
        notifier_conf.get('username'),
        notifier_conf.get('password'),
        notifier_conf.get('templates-dir'),
        notifier_conf.get('notify-from'),
        notifier_conf.get('notify-list'),
        **notifier_conf.get('options')
    )
def evacuate(self, nodes):
    fencer = FencerManager(nodes)
    evacuation_conf = CONF.get('evacuation')
    driver = importutils.import_object(
        evacuation_conf['driver'],
        nodes,
        evacuation_conf,
        fencer
    )
    return driver.evacuate(self.enable_fencing)
def __init__(self):
    self.db = wan_qos_db.WanTcDb()
    rpc_callback = importutils.import_object(
        'wan_qos.services.plugin.PluginRpcCallback', self)
    endpoints = (
        [rpc_callback, agents_db.AgentExtRpcCallback()])
    self.agent_rpc = api.TcAgentApi(cfg.CONF.host)
    self.conn = n_rpc.create_connection()
    self.conn.create_consumer(topics.TC_PLUGIN, endpoints, fanout=False)
    self.conn.consume_in_threads()
def setUp(self):
    super(TestBgpAgentFilter, self).setUp()
    self.bgp_drscheduler = importutils.import_object(
        'neutron_dynamic_routing.services.bgp.scheduler.'
        'bgp_dragent_scheduler.ChanceScheduler'
    )
    self.plugin = self
def __init__(self):
    super(BgpPlugin, self).__init__()
    self.bgp_drscheduler = importutils.import_object(
        cfg.CONF.bgp_drscheduler_driver)
    self._setup_rpc()
    self._register_callbacks()
    self.add_periodic_dragent_status_check()
def initialize_driver(self, conf):
    self.conf = conf or cfg.CONF.BGP
    try:
        self.dr_driver_cls = (
            importutils.import_object(self.conf.bgp_speaker_driver,
                                      self.conf))
    except ImportError:
        LOG.exception(_LE("Error while importing BGP speaker driver %s"),
                      self.conf.bgp_speaker_driver)
        raise SystemExit(1)
def __init__(self, topic, host=None):
    super(SchedulerManager, self).__init__()
    self.host = host or CONF.host
    self.topic = topic
    scheduler_driver = CONF.scheduler.scheduler_driver
    self.driver = importutils.import_object(scheduler_driver)
    self._startup_delay = True
def setUp(self):
    super(EngineManagerUnitTestCase, self).setUp()
    self.engine = importutils.import_object(CONF.engine_manager)
    self.context = context.RequestContext()
def create(self, label=None, cidr=None, num_networks=None,
           network_size=None, multi_host=None, vlan=None,
           vlan_start=None, vpn_start=None, cidr_v6=None,
           gateway=None, gateway_v6=None, bridge=None,
           bridge_interface=None, dns1=None, dns2=None,
           project_id=None, priority=None, uuid=None,
           fixed_cidr=None):
    """Creates fixed IPs for host by range."""
    kwargs = {k: v for k, v in six.iteritems(locals())
              if v and k != "self"}
    if multi_host is not None:
        kwargs['multi_host'] = multi_host == 'T'
    net_manager = importutils.import_object(CONF.network_manager)
    net_manager.create_networks(context.get_admin_context(), **kwargs)
def init_leases(network_id):
    """Get the list of hosts for a network."""
    ctxt = context.get_admin_context()
    network = objects.Network.get_by_id(ctxt, network_id)
    network_manager = importutils.import_object(CONF.network_manager)
    return network_manager.get_dhcp_leases(ctxt, network)
def __init__(self, scheduler_driver=None, *args, **kwargs):
    if not scheduler_driver:
        scheduler_driver = CONF.scheduler_driver
    try:
        self.driver = driver.DriverManager(
            "nova.scheduler.driver",
            scheduler_driver,
            invoke_on_load=True).driver
    # TODO(Yingxin): Change to catch stevedore.exceptions.NoMatches after
    # stevedore v1.9.0
    except RuntimeError:
        # NOTE(Yingxin): Loading full class path is deprecated and should
        # be removed in the N release.
        try:
            self.driver = importutils.import_object(scheduler_driver)
            LOG.warning(_LW("DEPRECATED: scheduler_driver uses "
                            "classloader to load %(path)s. This legacy "
                            "loading style will be removed in the "
                            "N release."),
                        {'path': scheduler_driver})
        except (ImportError, ValueError):
            raise RuntimeError(
                _("Cannot load scheduler driver from configuration "
                  "%(conf)s.") %
                {'conf': scheduler_driver})
    super(SchedulerManager, self).__init__(service_name='scheduler',
                                           *args, **kwargs)
def _get_openstack_security_group_driver(skip_policy_check=False):
    if is_neutron_security_groups():
        return importutils.import_object(
            NEUTRON_DRIVER, skip_policy_check=skip_policy_check)
    elif CONF.security_group_api.lower() == 'nova':
        return importutils.import_object(
            NOVA_DRIVER, skip_policy_check=skip_policy_check)
    else:
        return importutils.import_object(
            CONF.security_group_api,
            skip_policy_check=skip_policy_check)
def _get_interface_driver():
    global interface_driver
    if not interface_driver:
        interface_driver = importutils.import_object(
            CONF.linuxnet_interface_driver)
    return interface_driver
def get_volume_encryptor(connection_info, **kwargs):
    """Creates a VolumeEncryptor used to encrypt the specified volume.

    :param connection_info: the connection information used to attach
                            the volume
    :returns VolumeEncryptor: the VolumeEncryptor for the volume
    """
    encryptor = nop.NoOpEncryptor(connection_info, **kwargs)

    location = kwargs.get('control_location', None)
    if location and location.lower() == 'front-end':  # case insensitive
        provider = kwargs.get('provider')
        if provider == 'LuksEncryptor':
            provider = 'nova.volume.encryptors.luks.' + provider
        elif provider == 'CryptsetupEncryptor':
            provider = 'nova.volume.encryptors.cryptsetup.' + provider
        elif provider == 'NoOpEncryptor':
            provider = 'nova.volume.encryptors.nop.' + provider
        try:
            encryptor = importutils.import_object(provider,
                                                  connection_info,
                                                  **kwargs)
        except Exception as e:
            LOG.error(_LE("Error instantiating %(provider)s: "
                          "%(exception)s"),
                      {'provider': provider, 'exception': e})
            raise

    msg = ("Using volume encryptor '%(encryptor)s' for connection: "
           "%(connection_info)s" %
           {'encryptor': encryptor, 'connection_info': connection_info})
    LOG.debug(strutils.mask_password(msg))

    return encryptor
def __init__(self):
    transport = CONF.libvirt.remote_filesystem_transport
    cls_name = '.'.join([__name__, transport.capitalize()])
    cls_name += 'Driver'
    self.driver = importutils.import_object(cls_name)
def instance_for_format(image, mountdir, partition):
    """Get a Mount instance for the image type

    :param image: instance of nova.virt.image.model.Image
    :param mountdir: path to mount the image at
    :param partition: partition number to mount
    """

    LOG.debug("Instance for format image=%(image)s "
              "mountdir=%(mountdir)s partition=%(partition)s",
              {'image': image, 'mountdir': mountdir,
               'partition': partition})

    if isinstance(image, imgmodel.LocalFileImage):
        if image.format == imgmodel.FORMAT_RAW:
            LOG.debug("Using LoopMount")
            return importutils.import_object(
                "nova.virt.disk.mount.loop.LoopMount",
                image, mountdir, partition)
        else:
            LOG.debug("Using NbdMount")
            return importutils.import_object(
                "nova.virt.disk.mount.nbd.NbdMount",
                image, mountdir, partition)
    elif isinstance(image, imgmodel.LocalBlockImage):
        LOG.debug("Using BlockMount")
        return importutils.import_object(
            "nova.virt.disk.mount.block.BlockMount",
            image, mountdir, partition)
    else:
        # TODO(berrange): We could mount RBDImage directly
        # using kernel RBD block dev support.
        #
        # This is left as an enhancement for future
        # motivated developers to do, since raising
        # an exception is on par with what this
        # code did historically.
        raise exception.UnsupportedImageModel(
            image.__class__.__name__)
def instance_for_device(image, mountdir, partition, device):
    """Get a Mount instance for the device type

    :param image: instance of nova.virt.image.model.Image
    :param mountdir: path to mount the image at
    :param partition: partition number to mount
    :param device: mounted device path
    """

    LOG.debug("Instance for device image=%(image)s "
              "mountdir=%(mountdir)s partition=%(partition)s "
              "device=%(device)s",
              {'image': image, 'mountdir': mountdir,
               'partition': partition, 'device': device})

    if "loop" in device:
        LOG.debug("Using LoopMount")
        return importutils.import_object(
            "nova.virt.disk.mount.loop.LoopMount",
            image, mountdir, partition, device)
    elif "nbd" in device:
        LOG.debug("Using NbdMount")
        return importutils.import_object(
            "nova.virt.disk.mount.nbd.NbdMount",
            image, mountdir, partition, device)
    else:
        LOG.debug("Using BlockMount")
        return importutils.import_object(
            "nova.virt.disk.mount.block.BlockMount",
            image, mountdir, partition, device)
def instance_for_image(image, partition):
    """Get a VFS instance for the image

    :param image: instance of nova.virt.image.model.Image
    :param partition: the partition number to access
    """

    LOG.debug("Instance for image image=%(image)s "
              "partition=%(partition)s",
              {'image': image, 'partition': partition})

    vfs = None
    try:
        LOG.debug("Using primary VFSGuestFS")
        vfs = importutils.import_object(
            "nova.virt.disk.vfs.guestfs.VFSGuestFS",
            image, partition)
        if not VFS.guestfs_ready:
            # Inspect for capabilities and keep
            # track of the result only if it succeeded.
            vfs.inspect_capabilities()
            VFS.guestfs_ready = True
        return vfs
    except exception.NovaException:
        if vfs is not None:
            # We were able to load libguestfs, but
            # something went wrong when checking
            # for capabilities.
            raise
        else:
            LOG.info(_LI("Unable to import guestfs, "
                         "falling back to VFSLocalFS"))

    return importutils.import_object(
        "nova.virt.disk.vfs.localfs.VFSLocalFS",
        image, partition)
def _default_download_handler():
    # TODO(sirp): This should be configurable like upload_handler
    return importutils.import_object(
        'nova.virt.xenapi.image.glance.GlanceStore')
def test_live_migration_uses_migrateToURI_without_migratable_flag(self):
    self.compute = importutils.import_object(CONF.compute_manager)
    instance_dict = dict(self.test_instance)
    instance_dict.update({'host': 'fake',
                          'power_state': power_state.RUNNING,
                          'vm_state': vm_states.ACTIVE})
    instance_ref = objects.Instance(**instance_dict)
    drvr = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), False)

    # Preparing mocks
    vdmock = self.mox.CreateMock(fakelibvirt.virDomain)
    self.mox.StubOutWithMock(vdmock, "migrateToURI")
    _bandwidth = CONF.libvirt.live_migration_bandwidth
    vdmock.migrateToURI(drvr._live_migration_uri('dest'),
                        mox.IgnoreArg(),
                        None,
                        _bandwidth).AndRaise(
        fakelibvirt.libvirtError("ERR"))

    # start test
    migrate_data = objects.LibvirtLiveMigrateData(
        graphics_listen_addr_vnc='0.0.0.0',
        graphics_listen_addr_spice='0.0.0.0',
        serial_listen_addr='127.0.0.1',
        target_connect_addr=None,
        bdms=[],
        block_migration=False)
    self.mox.ReplayAll()
    self.assertRaises(fakelibvirt.libvirtError,
                      drvr._live_migration_operation,
                      self.context, instance_ref, 'dest',
                      False, migrate_data, vdmock, [])
def test_live_migration_uses_migrateToURI_without_dest_listen_addrs(self):
    self.compute = importutils.import_object(CONF.compute_manager)
    instance_dict = dict(self.test_instance)
    instance_dict.update({'host': 'fake',
                          'power_state': power_state.RUNNING,
                          'vm_state': vm_states.ACTIVE})
    instance_ref = objects.Instance(**instance_dict)
    drvr = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), False)

    # Preparing mocks
    vdmock = self.mox.CreateMock(fakelibvirt.virDomain)
    self.mox.StubOutWithMock(vdmock, "migrateToURI")
    _bandwidth = CONF.libvirt.live_migration_bandwidth
    vdmock.migrateToURI(drvr._live_migration_uri('dest'),
                        mox.IgnoreArg(),
                        None,
                        _bandwidth).AndRaise(
        fakelibvirt.libvirtError("ERR"))

    # start test
    migrate_data = objects.LibvirtLiveMigrateData(
        serial_listen_addr='',
        target_connect_addr=None,
        bdms=[],
        block_migration=False)
    self.mox.ReplayAll()
    self.assertRaises(fakelibvirt.libvirtError,
                      drvr._live_migration_operation,
                      self.context, instance_ref, 'dest',
                      False, migrate_data, vdmock, [])
def test_live_migration_fails_without_migratable_flag_or_0_addr(self):
    self.flags(enabled=True, vncserver_listen='1.2.3.4', group='vnc')
    self.compute = importutils.import_object(CONF.compute_manager)
    instance_dict = dict(self.test_instance)
    instance_dict.update({'host': 'fake',
                          'power_state': power_state.RUNNING,
                          'vm_state': vm_states.ACTIVE})
    instance_ref = objects.Instance(**instance_dict)

    # Preparing mocks
    vdmock = self.mox.CreateMock(fakelibvirt.virDomain)
    self.mox.StubOutWithMock(vdmock, "migrateToURI")

    # start test
    migrate_data = objects.LibvirtLiveMigrateData(
        graphics_listen_addr_vnc='1.2.3.4',
        graphics_listen_addr_spice='1.2.3.4',
        serial_listen_addr='127.0.0.1',
        target_connect_addr=None,
        block_migration=False)
    self.mox.ReplayAll()
    drvr = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), False)
    self.assertRaises(exception.MigrationError,
                      drvr._live_migration_operation,
                      self.context, instance_ref, 'dest',
                      False, migrate_data, vdmock, [])
def test_run_image_cache_manager_pass(self):
    was = {'called': False}

    def fake_get_all_by_filters(context, *args, **kwargs):
        was['called'] = True
        instances = []
        for x in range(2):
            instances.append(fake_instance.fake_db_instance(
                image_ref='1',
                uuid=x,
                name=x,
                vm_state='',
                task_state=''))
        return instances

    with utils.tempdir() as tmpdir:
        self.flags(instances_path=tmpdir)

        self.stub_out('nova.db.instance_get_all_by_filters',
                      fake_get_all_by_filters)
        compute = importutils.import_object(CONF.compute_manager)
        self.flags(use_local=True, group='conductor')
        compute.conductor_api = conductor.API()
        ctxt = context.get_admin_context()
        compute._run_image_cache_manager_pass(ctxt)
        self.assertTrue(was['called'])
def setUp(self):
    super(XenAPIAggregateTestCase, self).setUp()
    self.flags(connection_url='http://test_url',
               connection_username='test_user',
               connection_password='test_pass',
               group='xenserver')
    self.flags(instance_name_template='%d',
               firewall_driver='nova.virt.xenapi.firewall.'
                               'Dom0IptablesFirewallDriver',
               host='host',
               compute_driver='xenapi.XenAPIDriver',
               default_availability_zone='avail_zone1')
    self.flags(use_local=True, group='conductor')
    host_ref = xenapi_fake.get_all('host')[0]
    stubs.stubout_session(self.stubs, stubs.FakeSessionForVMTests)
    self.context = context.get_admin_context()
    self.conn = xenapi_conn.XenAPIDriver(fake.FakeVirtAPI(), False)
    self.compute = importutils.import_object(CONF.compute_manager)
    self.api = compute_api.AggregateAPI()
    values = {'name': 'test_aggr',
              'metadata': {'availability_zone': 'test_zone',
                           pool_states.POOL_FLAG: 'XenAPI'}}
    self.aggr = objects.Aggregate(context=self.context, id=1, **values)
    self.fake_metadata = {pool_states.POOL_FLAG: 'XenAPI',
                          'master_compute': 'host',
                          'availability_zone': 'fake_zone',
                          pool_states.KEY: pool_states.ACTIVE,
                          'host': xenapi_fake.get_record(
                              'host', host_ref)['uuid']}