我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用oslo_utils.importutils.import_class()。
def get_config_hash(file_dir, resource_mapping, exts=['conf']): res = {} if not os.path.isdir(file_dir): logger.debug( "Directory {} not found. Returning emty dict".format(file_dir)) return {} conf_files = [conf for conf in os.listdir(file_dir) if conf.split('.')[-1] in exts] for conf_file in conf_files: if conf_file in resource_mapping.keys(): drv = resource_mapping[conf_file].get( 'driver', 'fuel_external_git.drivers.openstack_config.OpenStackConfig' ) drv_class = importutils.import_class(drv) config = drv_class( os.path.join(file_dir, conf_file), resource_mapping[conf_file]['resource'] ) deep_merge(res, config.to_config_dict()) return res
def __init__(self, host, binary, topic, manager, report_interval=None, periodic_interval=None, periodic_fuzzy_delay=None, service_name=None, *args, **kwargs): super(Service, self).__init__() if not rpc.initialized(): rpc.init(CONF) self.host = host self.binary = binary self.topic = topic self.manager_class_name = manager manager_class = importutils.import_class(self.manager_class_name) self.manager = manager_class(host=self.host, service_name=service_name, *args, **kwargs) self.report_interval = report_interval self.periodic_interval = periodic_interval self.periodic_fuzzy_delay = periodic_fuzzy_delay self.saved_args, self.saved_kwargs = args, kwargs self.timers = []
def _get_manager(self): """Initialize a Manager object appropriate for this service. Use the service name to look up a Manager subclass from the configuration and initialize an instance. If no class name is configured, just return None. :returns: a Manager instance, or None. """ fl = '%s_manager' % self.name if fl not in CONF: return None manager_class_name = CONF.get(fl, None) if not manager_class_name: return None manager_class = importutils.import_class(manager_class_name) return manager_class()
def get_class(api_name, version, version_map): """Returns the client class for the requested API version :param api_name: the name of the API, e.g. 'compute', 'image', etc :param version: the requested API version :param version_map: a dict of client classes keyed by version :rtype: a client class for the requested API version """ try: client_path = version_map[str(version)] except (KeyError, ValueError): msg = _("Invalid %(api_name)s client version '%(version)s'. " "Must be one of: %(version_map)s") % { 'api_name': api_name, 'version': version, 'version_map': ', '.join(version_map.keys())} raise exceptions.UnsupportedVersion(msg) return importutils.import_class(client_path)
def get_matching_classes(self, loadable_class_names): """Get loadable classes from a list of names. Each name can be a full module path or the full path to a method that returns classes to use. The latter behavior is useful to specify a method that returns a list of classes to use in a default case. """ classes = [] for cls_name in loadable_class_names: obj = importutils.import_class(cls_name) if self._is_correct_class(obj): classes.append(obj) elif inspect.isfunction(obj): # Get list of classes from a function for cls in obj(): classes.append(cls) else: error_str = 'Not a class of the correct type' raise exception.ClassNotFound(class_name=cls_name, exception=error_str) return classes
def get_client_class(api_name, version, version_map): """Returns the client class for the requested API version :param api_name: the name of the API, e.g. 'compute', 'image', etc :param version: the requested API version :param version_map: a dict of client classes keyed by version :rtype: a client class for the requested API version """ try: client_path = version_map[str(version)] except (KeyError, ValueError): sorted_versions = sorted(version_map.keys(), key=lambda s: list(map(int, s.split('.')))) msg = _( "Invalid %(api_name)s client version '%(version)s'. " "must be one of: %(version_map)s" ) raise exceptions.UnsupportedVersion(msg % { 'api_name': api_name, 'version': version, 'version_map': ', '.join(sorted_versions), }) return importutils.import_class(client_path)
def get_client_class(api_name, version, version_map): """Returns the client class for the requested API version. :param api_name: the name of the API, e.g. 'compute', 'image', etc :param version: the requested API version :param version_map: a dict of client classes keyed by version :rtype: a client class for the requested API version """ try: client_path = version_map[str(version)] except (KeyError, ValueError): msg = _("Invalid %(api_name)s client version '%(version)s'. must be " "one of: %(map_keys)s") msg = msg % {'api_name': api_name, 'version': version, 'map_keys': ', '.join(version_map.keys())} raise exceptions.UnsupportedVersion(msg) return importutils.import_class(client_path)
def __init__(self, host, binary, topic, manager, periodic_enable=None, periodic_fuzzy_delay=None, periodic_interval_max=None): super(Service, self).__init__() if not rpc.initialized(): rpc.init(CONF) self.host = host self.binary = binary self.topic = topic self.manager_class_name = manager manager_class = importutils.import_class(self.manager_class_name) self.rpcserver = None self.manager = manager_class(host=self.host) self.periodic_enable = periodic_enable self.periodic_fuzzy_delay = periodic_fuzzy_delay self.periodic_interval_max = periodic_interval_max
def load_extension(self, ext_factory): """Execute an extension factory. Loads an extension. The 'ext_factory' is the name of a callable that will be imported and called with one argument--the extension manager. The factory callable is expected to call the register() method at least once. """ LOG.debug("Loading extension %s", ext_factory) if isinstance(ext_factory, six.string_types): # Load the factory factory = importutils.import_class(ext_factory) else: factory = ext_factory # Call it LOG.debug("Calling extension factory %s", ext_factory) factory(self)
def __init__(self, session, virtapi): self.compute_api = compute.API() self._session = session self._virtapi = virtapi self._volumeops = volumeops.VolumeOps(self._session) self.firewall_driver = firewall.load_driver( DEFAULT_FIREWALL_DRIVER, xenapi_session=self._session) vif_impl = importutils.import_class(CONF.xenserver.vif_driver) self.vif_driver = vif_impl(xenapi_session=self._session) self.default_root_dev = '/dev/sda' LOG.debug("Importing image upload handler: %s", CONF.xenserver.image_upload_handler) self.image_upload_handler = importutils.import_object( CONF.xenserver.image_upload_handler)
def __init__(self, host, binary, topic, manager, report_interval=None, periodic_enable=None, periodic_fuzzy_delay=None, periodic_interval_max=None, db_allowed=True, *args, **kwargs): super(Service, self).__init__() self.host = host self.binary = binary self.topic = topic self.manager_class_name = manager self.servicegroup_api = servicegroup.API() manager_class = importutils.import_class(self.manager_class_name) self.manager = manager_class(host=self.host, *args, **kwargs) self.rpcserver = None self.report_interval = report_interval self.periodic_enable = periodic_enable self.periodic_fuzzy_delay = periodic_fuzzy_delay self.periodic_interval_max = periodic_interval_max self.saved_args, self.saved_kwargs = args, kwargs self.backdoor_port = None self.conductor_api = conductor.API(use_local=db_allowed) self.conductor_api.wait_until_ready(context.get_admin_context())
def __init__(self, application, limits=None, limiter=None, **kwargs): """Initialize new `RateLimitingMiddleware`. It wraps the given WSGI application and sets up the given limits. @param application: WSGI application to wrap @param limits: String describing limits @param limiter: String identifying class for representing limits Other parameters are passed to the constructor for the limiter. """ base_wsgi.Middleware.__init__(self, application) # Select the limiter class if limiter is None: limiter = Limiter else: limiter = importutils.import_class(limiter) # Parse the limits, if any are provided if limits is not None: limits = limiter.parse_limits(limits) self._limiter = limiter(limits or DEFAULT_LIMITS, **kwargs)
def __init__(self, *args, **kwargs): LOG.warning(_LW('The cells feature of Nova is considered experimental ' 'by the OpenStack project because it receives much ' 'less testing than the rest of Nova. This may change ' 'in the future, but current deployers should be aware ' 'that the use of it in production right now may be ' 'risky. Also note that cells does not currently ' 'support rolling upgrades, it is assumed that cells ' 'deployments are upgraded lockstep so n-1 cells ' 'compatibility does not work.')) # Mostly for tests. cell_state_manager = kwargs.pop('cell_state_manager', None) super(CellsManager, self).__init__(service_name='cells', *args, **kwargs) if cell_state_manager is None: cell_state_manager = cells_state.CellStateManager self.state_manager = cell_state_manager() self.msg_runner = messaging.MessageRunner(self.state_manager) cells_driver_cls = importutils.import_class( CONF.cells.driver) self.driver = cells_driver_cls() self.instances_to_heal = iter([])
def _parse_class_args(self): """Parse the contrailplugin.ini file. Opencontrail supports extension such as ipam, policy, these extensions can be configured in the plugin configuration file as shown below. Plugin then loads the specified extensions. contrail_extensions=ipam:<classpath>,policy:<classpath> """ contrail_extensions = cfg.CONF.APISERVER.contrail_extensions # If multiple class specified for same extension, last one will win # according to DictOpt behavior for ext_name, ext_class in contrail_extensions.items(): try: if not ext_class or ext_class == 'None': self.supported_extension_aliases.append(ext_name) continue ext_class = importutils.import_class(ext_class) ext_instance = ext_class() ext_instance.set_core(self) for method in dir(ext_instance): for prefix in ['get', 'update', 'delete', 'create']: if method.startswith('%s_' % prefix): setattr(self, method, ext_instance.__getattribute__(method)) self.supported_extension_aliases.append(ext_name) except Exception: LOG.exception("Contrail Backend Error") # Converting contrail backend error to Neutron Exception raise exceptions.InvalidContrailExtensionError( ext_name=ext_name, ext_class=ext_class) self._build_auth_details()
def API(): importutils = oslo_utils.importutils cluster_api_class = oslo_config.cfg.CONF.cluster_api_class cls = importutils.import_class(cluster_api_class) return cls()
def __init__(self, host, binary, manager): super(Service, self).__init__() self.host = host self.binary = binary self.manager_class_name = manager manager_class = importutils.import_class(self.manager_class_name) self.manager = manager_class(host=self.host)
def __init__(self, virtapi): super(DockerDriver, self).__init__(virtapi) self._docker = None vif_class = importutils.import_class(CONF.docker.vif_driver) self.vif_driver = vif_class() self.firewall_driver = firewall.load_driver( default='nova.virt.firewall.NoopFirewallDriver') # NOTE(zhangguoqing): For passing the nova unit tests self.active_migrations = {}
def test_all_public_methods_are_traced(self): profiler_opts.set_defaults(conf.CONF) self.config(enabled=True, group='profiler') classes = [ 'zun.compute.api.API', 'zun.compute.rpcapi.API', ] for clsname in classes: # give the metaclass and trace_cls() decorator a chance to patch # methods of the classes above six.reload_module( importutils.import_module(clsname.rsplit('.', 1)[0])) cls = importutils.import_class(clsname) for attr, obj in cls.__dict__.items(): # only public methods are traced if attr.startswith('_'): continue # only checks callables if not (inspect.ismethod(obj) or inspect.isfunction(obj)): continue # osprofiler skips static methods if isinstance(obj, staticmethod): continue self.assertTrue(getattr(obj, '__traced__', False), obj)
def setUp(self): super(TestShell, self).setUp() self.shell_class = importutils.import_class(self.shell_class_name) self.cmd_patch = mock.patch(self.shell_class_name + ".run_subcommand") self.cmd_save = self.cmd_patch.start() self.addCleanup(self.cmd_patch.stop) self.app = mock.Mock("Test Shell")
def test_import_class(self): dt = importutils.import_class('datetime.datetime') self.assertEqual(sys.modules['datetime'].datetime, dt)
def test_import_bad_class(self): self.assertRaises(ImportError, importutils.import_class, 'lol.u_mad.brah')
def _get_client_class_and_version(version): if not isinstance(version, api_versions.APIVersion): version = api_versions.get_api_version(version) else: api_versions.check_major_version(version) if version.is_latest(): raise exceptions.UnsupportedVersion(("The version should be explicit, " "not latest.")) return version, importutils.import_class( "harborclient.v%s.client.Client" % version.ver_major)
def _get_connector(self): connector = cinder_conf.volume_connector if not connector or connector not in volume_connector_conf: msg = _("Must provide an valid volume connector") LOG.error(msg) raise exceptions.FuxiException(msg) return importutils.import_class(volume_connector_conf[connector])()
def driver_dict_from_config(named_driver_config, *args, **kwargs): driver_registry = dict() for driver_str in named_driver_config: driver_type, _sep, driver = driver_str.partition('=') driver_class = importutils.import_class(driver) driver_registry[driver_type] = driver_class(*args, **kwargs) return driver_registry
def _import_factory(self, local_conf): """Import an app/filter class. Lookup the KEY from the PasteDeploy local conf and import the class named there. This class can then be used as an app or filter factory. """ class_name = local_conf[self.KEY].replace(':', '.').strip() return importutils.import_class(class_name)
def _get_client_class_and_version(version): if not isinstance(version, api_versions.APIVersion): version = api_versions.get_api_version(version) else: api_versions.check_major_version(version) if version.is_latest(): raise exceptions.UnsupportedVersion( _('The version should be explicit, not latest.')) return version, importutils.import_class( 'zunclient.v%s.client.Client' % version.ver_major)
def load_config(self): LOG.info("Loading Ironic settings.") self._client_cls = importutils.import_class(CONF.IRONIC.ironic_client) self._client = self._get_client() self._ipam_strategies = self._parse_ironic_ipam_strategies() LOG.info("Ironic Driver config loaded. Client: %s" % (CONF.IRONIC.endpoint_url))
def get_client_class(version): version_map = { '1.1': 'dhclient.v2.client.Client', '2': 'dhclient.v2.client.Client', '3': 'dhclient.v2.client.Client', } try: client_path = version_map[str(version)] except (KeyError, ValueError): msg = _("Invalid client version '%(version)s'. must be one of: " "%(keys)s") % {'version': version, 'keys': ', '.join(version_map.keys())} raise exceptions.UnsupportedVersion(msg) return importutils.import_class(client_path)
def get_client_class(api_name, version, version_map): try: client_path = version_map[str(version)] except (KeyError, ValueError): msg = _("Invalid %(api_name)s client version '%(version)s'. must be " "one of: %(map_keys)s") msg = msg % {'api_name': api_name, 'version': version, 'map_keys': ', '.join(version_map.keys())} raise exceptions.UnsupportedVersion(msg) return importutils.import_class(client_path)
def get_client_class(version): version_map = { '1.1': 'icgwclient.v2.client.Client', '2': 'icgwclient.v2.client.Client', '3': 'icgwclient.v2.client.Client', } try: client_path = version_map[str(version)] except (KeyError, ValueError): msg = _("Invalid client version '%(version)s'. must be one of: " "%(keys)s") % {'version': version, 'keys': ', '.join(version_map.keys())} raise exceptions.UnsupportedVersion(msg) return importutils.import_class(client_path)
def get_client_class(api_name, version, version_map): try: client_path = version_map[str(version)] except (KeyError, ValueError): msg = "Invalid %s client version '%s'. must be one of: %s" % ( (api_name, version, ', '.join(version_map.keys()))) raise exceptions.UnsupportedVersion(msg) return importutils.import_class(client_path)
def main(manager='rock.extension_manager.ExtensionManager'): utils.register_all_options() utils.prepare_log(service_name='rock-mon') log = logging.getLogger(__name__) log.info('Start rock monitor.') mgr_class = importutils.import_class(manager) file_path = os.path.abspath(__file__) file_dir = os.path.dirname(file_path) ext_mgr = mgr_class(file_dir + '/extensions') ext_mgr.start_collect_data()
def data_get_by_obj_time(obj_name, delta): model_name = 'model_' + obj_name model = importutils.import_class( 'rock.db.sqlalchemy.%s.%s' % (model_name, rule_utils.underline_to_camel(model_name))) timedelta = datetime.timedelta(seconds=delta) return db_api.get_period_records(model, timeutils.utcnow()-timedelta, end_time=timeutils.utcnow(), sort_key='created_at')
def get_tasks_objects(task_cls_name): result = [] for i in task_cls_name: result.append(importutils.import_class(i)()) return result
def main(manager='rock.rules.rule_manager.RuleManager'): utils.register_all_options() utils.prepare_log(service_name='rock-engine') log = logging.getLogger(__name__) log.info('Start rock engine') mgr_class = importutils.import_class(manager) mgr = mgr_class('/etc/rock/cases') from rock.tasks.check_and_run import check_and_run check_and_run() mgr.after_start()
def _get_volume_drivers(self): driver_registry = dict() for driver_str in dpm_volume_drivers: driver_type, _sep, driver = driver_str.partition('=') driver_class = importutils.import_class(driver) driver_registry[driver_type] = driver_class(self._host) return driver_registry
def __init__(self): self.queryclient = LazyLoader(importutils.import_class( 'mogan.scheduler.client.query.SchedulerQueryClient')) self.reportclient = LazyLoader(importutils.import_class( 'mogan.scheduler.client.report.SchedulerReportClient'))
def monkey_patch(): """If the CONF.monkey_patch set as True, this function patches a decorator for all functions in specified modules. You can set decorators for each modules using CONF.monkey_patch_modules. The format is "Module path:Decorator function". name - name of the function function - object of the function """ # If CONF.monkey_patch is not True, this function do nothing. if not CONF.monkey_patch: return if six.PY2: is_method = inspect.ismethod else: def is_method(obj): # Unbound methods became regular functions on Python 3 return inspect.ismethod(obj) or inspect.isfunction(obj) # Get list of modules and decorators for module_and_decorator in CONF.monkey_patch_modules: module, decorator_name = module_and_decorator.split(':') # import decorator function decorator = importutils.import_class(decorator_name) __import__(module) # Retrieve module information using pyclbr module_data = pyclbr.readmodule_ex(module) for key, value in module_data.items(): # set the decorator for the class methods if isinstance(value, pyclbr.Class): clz = importutils.import_class("%s.%s" % (module, key)) for method, func in inspect.getmembers(clz, is_method): setattr(clz, method, decorator("%s.%s.%s" % (module, key, method), func)) # set the decorator for the function if isinstance(value, pyclbr.Function): func = importutils.import_class("%s.%s" % (module, key)) setattr(sys.modules[module], key, decorator("%s.%s" % (module, key), func))
def API(): cls = importutils.import_class("masakari.compute.nova.API") return cls()
def include_var(name, rawtext, text, lineno, inliner, options=None, content=None): """include variable :param name: The local name of the interpreted role, the role name actually used in the document. :param rawtext: A string containing the enitre interpreted text input, including the role and markup. Return it as a problematic node linked to a system message if a problem is encountered. :param text: The interpreted text content. :param lineno: The line number where the interpreted text begins. :param inliner: The docutils.parsers.rst.states.Inliner object that called include_var. It contains the several attributes useful for error reporting and document tree access. :param options: A dictionary of directive options for customization (from the 'role' directive), to be interpreted by the role function. Used for additional attributes for the generated elements and other functionality. :param content: A list of strings, the directive content for customization (from the 'role' directive). To be interpreted by the role function. :return: """ obj = importutils.import_class(text) if isinstance(obj, (tuple, list)): obj = ", ".join(obj) elif isinstance(obj, dict): obj = json.dumps(dict, indent=4) else: obj = str(obj) return [nodes.Text(obj)], []
def __init__(self): self.queryclient = LazyLoader(importutils.import_class( 'nova.scheduler.client.query.SchedulerQueryClient')) self.reportclient = LazyLoader(importutils.import_class( 'nova.scheduler.client.report.SchedulerReportClient'))
def API(): cls = importutils.import_class(CONF.keymgr.api_class) return cls()
def API(skip_policy_check=False): if is_neutron() is None: network_api_class = oslo_config.cfg.CONF.network_api_class elif is_neutron(): network_api_class = NEUTRON_NET_API else: network_api_class = NOVA_NET_API cls = importutils.import_class(network_api_class) return cls(skip_policy_check=skip_policy_check)