我们从Python开源项目中,提取了以下11个代码示例,用于说明如何使用oslo_utils.importutils.try_import()。
def __init__(self): if not importutils.try_import('iboot'): raise ironic_exception.DriverLoadError( driver=self.__class__.__name__, reason=_("Unable to import iboot library")) self.boot = fake.FakeBoot() self.power = iboot_power.IBootPower() self.deploy = fake.FakeDeploy()
def __init__(self): LOG.warning("This driver is deprecated and will be removed " "in the Rocky release. " "Use 'staging-iboot' hardware type instead.") if not importutils.try_import('iboot'): raise ironic_exception.DriverLoadError( driver=self.__class__.__name__, reason=_("Unable to import iboot library")) self.power = iboot_power.IBootPower() self.boot = pxe.PXEBoot() self.deploy = iscsi_deploy.ISCSIDeploy()
def __init__(self): LOG.warning("This driver is deprecated and will be removed " "in the Rocky release. " "Use 'staging-iboot' hardware type instead.") if not importutils.try_import('iboot'): raise ironic_exception.DriverLoadError( driver=self.__class__.__name__, reason=_("Unable to import iboot library")) self.power = iboot_power.IBootPower() self.boot = pxe.PXEBoot() self.deploy = agent.AgentDeploy()
def __init__(self): LOG.warning("This driver is deprecated and will be removed " "in the Rocky release. " "Use 'staging-amt' hardware type instead.") if not importutils.try_import('pywsman'): raise ironic_exception.DriverLoadError( driver=self.__class__.__name__, reason=_("Unable to import pywsman library")) self.power = amt_power.AMTPower() self.boot = pxe.PXEBoot() self.deploy = amt_deploy.AMTISCSIDeploy() self.management = amt_management.AMTManagement()
def __init__(self): LOG.warning("This driver is deprecated and will be removed " "in the Rocky release. " "Use 'staging-amt' hardware type instead.") if not importutils.try_import('pywsman'): raise ironic_exception.DriverLoadError( driver=self.__class__.__name__, reason=_("Unable to import pywsman library")) self.power = amt_power.AMTPower() self.boot = pxe.PXEBoot() self.deploy = agent.AgentDeploy() self.management = amt_management.AMTManagement()
def __init__(self, manager_module, manager_class, topic, host=None): super(RPCService, self).__init__() self.topic = topic self.host = host or CONF.host manager_module = importutils.try_import(manager_module) manager_class = getattr(manager_module, manager_class) self.manager = manager_class(self.topic, self.host) self.rpcserver = None
def __init__(self, host, manager_module, manager_class): super(RPCService, self).__init__() self.host = host manager_module = importutils.try_import(manager_module) manager_class = getattr(manager_module, manager_class) self.manager = manager_class(host, manager_module.MANAGER_TOPIC) self.topic = self.manager.topic self.rpcserver = None self.deregister = True
def test_try_import(self): dt = importutils.try_import('datetime') self.assertEqual(sys.modules['datetime'], dt)
def test_try_import_returns_default(self): foo = importutils.try_import('foo.bar') self.assertIsNone(foo)
def _get_connection(self): if self._cached_conn is None: libvirt_module = importutils.try_import('libvirt') if not libvirt_module: raise error.OSFError('libvirt-python is required ' 'to use LibvirtDriver') self._cached_conn = libvirt_module.open(self.connection_uri) return self._cached_conn
def __init__(self, manager_module, manager_class, topic, host=None): super(RPCService, self).__init__() self.host = host or CONF.host manager_module = importutils.try_import(manager_module) manager_class = getattr(manager_module, manager_class) self.manager = manager_class(host, topic) self.topic = topic self.rpcserver = None