The following 35 code examples, extracted from open-source Python projects, illustrate how to use oslo_config.cfg.OptGroup().
def load_repositories_options():
    """Register the [repositories] option group and its options."""
    opts = [
        cfg.StrOpt(
            'offsets',
            default='monasca_transform.offset_specs:JSONOffsetSpecs',
            help='Repository for offset persistence'
        ),
        cfg.StrOpt(
            'data_driven_specs',
            default='monasca_transform.data_driven_specs.'
                    'json_data_driven_specs_repo:JSONDataDrivenSpecsRepo',
            help='Repository for metric and event data_driven_specs'
        ),
        cfg.IntOpt('offsets_max_revisions', default=10,
                   help="Max revisions of offsets for each application"),
    ]
    group = cfg.OptGroup(name='repositories', title='repositories')
    cfg.CONF.register_group(group)
    cfg.CONF.register_opts(opts, group=group)
def load_messaging_options():
    """Register the [messaging] option group and its options."""
    opts = [
        cfg.StrOpt('adapter',
                   default='monasca_transform.messaging.adapter:'
                           'KafkaMessageAdapter',
                   help='Message adapter implementation'),
        cfg.StrOpt('topic', default='metrics',
                   help='Messaging topic'),
        cfg.StrOpt('brokers', default='192.168.10.4:9092',
                   help='Messaging brokers'),
        cfg.StrOpt('publish_kafka_project_id', default='111111',
                   help='publish aggregated metrics tenant'),
        cfg.StrOpt('adapter_pre_hourly',
                   default='monasca_transform.messaging.adapter:'
                           'KafkaMessageAdapterPreHourly',
                   help='Message adapter implementation'),
        cfg.StrOpt('topic_pre_hourly', default='metrics_pre_hourly',
                   help='Messaging topic pre hourly'),
    ]
    group = cfg.OptGroup(name='messaging', title='messaging')
    cfg.CONF.register_group(group)
    cfg.CONF.register_opts(opts, group=group)
def get_list_providers():
    """Build a Provider object for every enabled driver in the config."""
    # ensure all driver groups have been registered
    for section in CONF.list_all_sections():
        CONF.register_group(cfg.OptGroup(section))
    # map each supported driver type to the opts it must register;
    # unsupported types are skipped, exactly as before
    known_opts = {
        'openstack': calplus.conf.providers.openstack_opts,
        'amazon': calplus.conf.providers.amazon_opts,
    }
    providers = []
    enable_drivers = CONF.providers.enable_drivers
    for driver, type_driver in enable_drivers.items():
        opts = known_opts.get(type_driver)
        if opts is None:
            continue
        CONF.register_opts(opts, driver)
        providers.append(Provider(type_driver, CONF.get(driver)))
    return providers
def _get_allowed_hostclass(self, project_name):
    """Get the allowed list of hostclass from configuration."""
    try:
        CONF[project_name]
    except cfg.NoSuchOptError:
        # dynamically add the group into the configuration
        CONF.register_group(cfg.OptGroup(project_name, 'project options'))
        CONF.register_opt(cfg.ListOpt('allowed_classes'),
                          group=project_name)
    try:
        allowed = CONF[project_name].allowed_classes
    except cfg.NoSuchOptError:
        LOG.error('No allowed_classes config option in [%s]', project_name)
        return []
    # normalize an unset (None) option to an empty list
    return allowed or []
def _register_opts(self):
    """Register oslo.config options for the Valet API client."""
    opts = [
        cfg.StrOpt('url', default=None,
                   help=_('API endpoint url')),
        cfg.IntOpt('read_timeout', default=5,
                   help=_('API read timeout in seconds')),
        cfg.IntOpt('retries', default=3,
                   help=_('API request retries')),
    ]
    group = cfg.OptGroup('valet')
    CONF.register_group(group)
    CONF.register_opts(opts, group=group)
def __init__(self, conf_path):
    """Load the keystone endpoint URL from the given config file.

    :param conf_path: path to an oslo.config file containing an
        [endpoint] section with an ``endpoint`` option.
    """
    self.conf_path = conf_path
    self.opt_group = cfg.OptGroup(name='endpoint',
                                  title='Get the endpoints for keystone')
    # BUG FIX: the default used to be the *string* 'None', which is
    # truthy and indistinguishable from a real value at runtime;
    # an unset option should default to the None object instead.
    self.endpoint_opts = [
        cfg.StrOpt('endpoint',
                   default=None,
                   help=('URL or IP address where OpenStack keystone runs.')),
    ]
    CONF = cfg.CONF
    CONF.register_group(self.opt_group)
    CONF.register_opts(self.endpoint_opts, self.opt_group)
    CONF(default_config_files=[self.conf_path])
    self.AUTH_ENDPOINT = CONF.endpoint.endpoint
def setUp(self):
    """Register keystone auth options under a dedicated test group.

    The auth_type must be configured *before* the password plugin is
    loaded, because keystoneauth registers plugin options dynamically
    on first load.
    """
    super(AuthConfTestCase, self).setUp()
    self.config(region_name='fake_region', group='keystone')
    self.test_group = 'test_group'
    # dynamically create the test-only option group
    self.cfg_fixture.conf.register_group(cfg.OptGroup(self.test_group))
    mogan_auth.register_auth_opts(self.cfg_fixture.conf, self.test_group)
    self.config(auth_type='password', group=self.test_group)
    # NOTE(pas-ha) this is due to auth_plugin options
    # being dynamically registered on first load,
    # but we need to set the config before
    plugin = kaloading.get_plugin_loader('password')
    opts = kaloading.get_auth_plugin_conf_options(plugin)
    self.cfg_fixture.register_opts(opts, group=self.test_group)
    # fake credentials are sufficient: nothing contacts keystone here
    self.config(auth_url='http://127.0.0.1:9898',
                username='fake_user',
                password='fake_pass',
                project_name='fake_tenant',
                group=self.test_group)
def _register_opts(self):
    '''Register the Valet credential/endpoint options for this plugin.'''
    opts = [
        cfg.StrOpt(self.opt_failure_mode_str,
                   choices=['reject', 'yield'],
                   default='reject',
                   help=_('Mode to operate in if Valet planning '
                          'fails for any reason.')),
        cfg.StrOpt(self.opt_project_name_str, default=None,
                   help=_('Valet Project Name')),
        cfg.StrOpt(self.opt_username_str, default=None,
                   help=_('Valet Username')),
        cfg.StrOpt(self.opt_password_str, default=None,
                   help=_('Valet Password')),
        cfg.StrOpt(self.opt_auth_uri_str, default=None,
                   help=_('Keystone Authorization API Endpoint')),
    ]
    group = cfg.OptGroup(self.opt_group_str)
    cfg.CONF.register_group(group)
    cfg.CONF.register_opts(opts, group=group)
    # TODO(JD): Factor out common code between this and the cinder filter
def load_database_options():
    """Register the [database] option group and its options."""
    opts = [
        cfg.StrOpt('server_type'),
        cfg.StrOpt('host'),
        cfg.StrOpt('database_name'),
        cfg.StrOpt('username'),
        cfg.StrOpt('password'),
        cfg.BoolOpt('use_ssl', default=False),
        cfg.StrOpt('ca_file'),
    ]
    group = cfg.OptGroup(name='database', title='database')
    cfg.CONF.register_group(group)
    cfg.CONF.register_opts(opts, group=group)
def load_service_options():
    """Register the [service] option group and its options."""
    service_opts = [
        cfg.StrOpt('coordinator_address'),
        cfg.StrOpt('coordinator_group'),
        cfg.FloatOpt('election_polling_frequency'),
        # BUG FIX: the default was the *string* 'false'; a non-empty
        # string is truthy, so debug logging would appear enabled by
        # default. A BoolOpt default must be an actual bool.
        cfg.BoolOpt('enable_debug_log_entries', default=False),
        cfg.StrOpt('setup_file'),
        cfg.StrOpt('setup_target'),
        cfg.StrOpt('spark_driver'),
        cfg.StrOpt('service_log_path'),
        cfg.StrOpt('service_log_filename',
                   default='monasca-transform.log'),
        cfg.StrOpt('spark_event_logging_dest'),
        cfg.StrOpt('spark_event_logging_enabled'),
        cfg.StrOpt('spark_jars_list'),
        cfg.StrOpt('spark_master_list'),
        cfg.StrOpt('spark_python_files'),
        cfg.IntOpt('stream_interval'),
        cfg.StrOpt('work_dir'),
        cfg.StrOpt('spark_home'),
        cfg.BoolOpt('enable_record_store_df_cache'),
        cfg.StrOpt('record_store_df_cache_storage_level'),
    ]
    service_group = cfg.OptGroup(name='service', title='service')
    cfg.CONF.register_group(service_group)
    cfg.CONF.register_opts(service_opts, group=service_group)
def load_stage_processors_options():
    """Register the [stage_processors] option group and its options."""
    group = cfg.OptGroup(name='stage_processors', title='stage_processors')
    cfg.CONF.register_group(group)
    cfg.CONF.register_opts(
        [cfg.BoolOpt('pre_hourly_processor_enabled')], group=group)
def get_subnets(self, pod, project_id):
    """Return {subnet_id: subnet} for the configured default pod subnet.

    Raises RequiredOptError when [neutron_defaults]pod_subnet is unset.
    """
    subnet_id = config.CONF.neutron_defaults.pod_subnet
    if subnet_id:
        return {subnet_id: _get_subnet(subnet_id)}
    # NOTE(ivc): this option is only required for
    # DefaultPodSubnetDriver and its subclasses, but it may be
    # optional for other drivers (e.g. when each namespace has own
    # subnet)
    raise cfg.RequiredOptError('pod_subnet',
                               cfg.OptGroup('neutron_defaults'))
def get_subnets(self, service, project_id):
    """Return {subnet_id: subnet} for the configured service subnet.

    Raises RequiredOptError when [neutron_defaults]service_subnet is unset.
    """
    subnet_id = config.CONF.neutron_defaults.service_subnet
    if subnet_id:
        return {subnet_id: _get_subnet(subnet_id)}
    # NOTE(ivc): this option is only required for
    # DefaultServiceSubnetDriver and its subclasses, but it may be
    # optional for other drivers (e.g. when each namespace has own
    # subnet)
    raise cfg.RequiredOptError('service_subnet',
                               cfg.OptGroup('neutron_defaults'))
def get_security_groups(self, pod, project_id):
    """Return a copy of the configured default pod security groups.

    Raises RequiredOptError when [neutron_defaults]pod_security_groups
    is unset.
    """
    sg_list = config.CONF.neutron_defaults.pod_security_groups
    if sg_list:
        # return a shallow copy so callers can't mutate the config value
        return sg_list[:]
    # NOTE(ivc): this option is only required for
    # Default{Pod,Service}SecurityGroupsDriver and its subclasses,
    # but it may be optional for other drivers (e.g. when each
    # namespace has own set of security groups)
    raise cfg.RequiredOptError('pod_security_groups',
                               cfg.OptGroup('neutron_defaults'))
def get_security_groups(self, service, project_id):
    """Return a copy of the configured default security groups.

    NOTE(ivc): deliberately reuses the same option as
    DefaultPodSecurityGroupsDriver.
    """
    sg_list = config.CONF.neutron_defaults.pod_security_groups
    if sg_list:
        # return a shallow copy so callers can't mutate the config value
        return sg_list[:]
    # NOTE(ivc): this option is only required for
    # Default{Pod,Service}SecurityGroupsDriver and its subclasses,
    # but it may be optional for other drivers (e.g. when each
    # namespace has own set of security groups)
    raise cfg.RequiredOptError('pod_security_groups',
                               cfg.OptGroup('neutron_defaults'))
def get_project(self, pod):
    """Return the configured default project id for pods.

    Raises RequiredOptError when [neutron_defaults]project is unset.
    """
    project_id = config.CONF.neutron_defaults.project
    if project_id:
        return project_id
    # NOTE(ivc): this option is only required for
    # DefaultPodProjectDriver and its subclasses, but it may be
    # optional for other drivers (e.g. when each namespace has own
    # project)
    raise cfg.RequiredOptError('project',
                               cfg.OptGroup('neutron_defaults'))
def get_project(self, service):
    """Return the configured default project id for services.

    Raises RequiredOptError when [neutron_defaults]project is unset.
    """
    project_id = config.CONF.neutron_defaults.project
    if project_id:
        return project_id
    # NOTE(ivc): this option is only required for
    # DefaultServiceProjectDriver and its subclasses, but it may be
    # optional for other drivers (e.g. when each namespace has own
    # project)
    raise cfg.RequiredOptError('project',
                               cfg.OptGroup('neutron_defaults'))
def main():
    """Entry point for the Galaxia exporter RPC service.

    Registers the [gexporter] option group, then starts an RPC server
    on the configured topic/broker and serves forever.
    """
    service.prepare_service("gexporter", sys.argv)
    CONF = cfg.CONF
    # NOTE: title normalized from a broken backslash line-continuation
    # inside the original string literal.
    opt_group = cfg.OptGroup(name='gexporter',
                             title='Options for the exporter service')
    CONF.register_group(opt_group)
    CONF.register_opts(API_SERVICE_OPTS, opt_group)
    CONF.set_override('topic', CONF.gexporter.topic, opt_group)
    CONF.set_override('rabbitmq_host', CONF.gexporter.rabbitmq_host,
                      opt_group)
    CONF.set_override('rabbitmq_port', CONF.gexporter.rabbitmq_port,
                      opt_group)
    CONF.set_override('rabbitmq_username', CONF.gexporter.rabbitmq_username,
                      opt_group)
    endpoints = [
        controller.Controller(),
    ]
    log.info('Starting exporter service in PID %s' % os.getpid())
    rpc_server = broker.Broker(CONF.gexporter.topic,
                               CONF.gexporter.rabbitmq_host,
                               endpoints)
    # BUG FIX: 'print' statement is Python-2-only syntax; use the
    # print() function so the module also parses under Python 3.
    print('Galaxia Exporter service started in PID %s' % os.getpid())
    rpc_server.serve()
def main():
    """Entry point for the Galaxia renderer RPC service.

    Registers the [grenderer] option group, resolves the configured
    handler and serves RPC requests forever.
    """
    service.prepare_service("grenderer", sys.argv)
    CONF = cfg.CONF
    # NOTE: title normalized from a broken backslash line-continuation
    # inside the original string literal.
    opt_group = cfg.OptGroup(name='grenderer',
                             title='Options for the renderer service')
    CONF.register_group(opt_group)
    CONF.register_opts(API_SERVICE_OPTS, opt_group)
    CONF.set_override('topic', CONF.grenderer.topic, opt_group)
    CONF.set_override('rabbitmq_host', CONF.grenderer.rabbitmq_host,
                      opt_group)
    CONF.set_override('rabbitmq_port', CONF.grenderer.rabbitmq_port,
                      opt_group)
    CONF.set_override('rabbitmq_username', CONF.grenderer.rabbitmq_username,
                      opt_group)
    CONF.set_override('handler', CONF.grenderer.handler, opt_group)
    handlers = {
        'prometheus': controller.Controller,
    }
    endpoints = [
        handlers[CONF.grenderer.handler](),
    ]
    log.info('Starting renderer service in PID %s' % os.getpid())
    rpc_server = broker.Broker(CONF.grenderer.topic,
                               CONF.grenderer.rabbitmq_host,
                               endpoints)
    # BUG FIX: 'print' statement is Python-2-only syntax; use the
    # print() function so the module also parses under Python 3.
    print('Galaxia Renderer service started in PID %s' % os.getpid())
    rpc_server.serve()
def create_metrics_exporter(self, **kwargs):
    """
    Handler to create metrics exporter

    Persists a new exporter row keyed by a fresh UUID, then casts an
    'export_metrics' RPC message to the exporter service over rabbitmq.
    :param kwargs: exporter parameters; must contain 'exporter_name'
    :return: a human-readable status string
    """
    # NOTE(review): commented-out config bootstrap left by the original
    # author; CONF/API_SERVICE_OPTS appear to be configured elsewhere
    # at module scope -- confirm before deleting.
    """
    CONF = cfg.CONF
    opt_group = cfg.OptGroup(name='gapi', title='Options for the api service')
    CONF.register_group(opt_group)
    CONF.register_opts(API_SERVICE_OPTS, opt_group)
    CONF.set_override('topic_exporter', CONF.gapi.topic_exporter, opt_group)
    CONF.set_override('rabbitmq_host', CONF.gapi.rabbitmq_host, opt_group)
    """
    # the new exporter is identified by a generated UUID, echoed back
    # to the RPC consumer inside kwargs
    exporter_id = str(uuid.uuid4())
    kwargs['exporter_id'] = exporter_id
    sql_query = query_list.INSERT_INTO_EXPORTER
    params = kwargs['exporter_name'], exporter_id
    try:
        conn = sql_helper.engine.connect()
        conn.execute(sql_query, params)
    except SQLAlchemyError as ex:
        # best-effort: failures are reported as strings, not raised
        return "A database exception has been hit and metrics exporter has\
 failed"
    log.info("Initializing a client connection")
    try:
        mq_client = client.Client(CONF.gapi.topic_exporter,
                                  CONF.gapi.rabbitmq_host)
        ctxt = {}
        log.info("Publishing message to rabbitmq")
        mq_client.rpc_client.cast(ctxt, 'export_metrics', message=kwargs)
    except Exception as ex:
        return "A messaging exception has been hit and metrics export\
 request has failed"
    return "Metrics export request has been successfully accepted"
def prepare_service(service_name, argv=None):
    """
    Initializes the various galaxia services

    Parses the CLI/config, sets up per-service logging, registers the
    [db] options and initializes the database and metric mappings.
    :param service_name: Name of the service (also the logging group name)
    :param argv: command-line arguments; argv[0] is skipped
    :return: None
    """
    # BUG FIX: a mutable default argument ([]) is shared between calls;
    # use None as the sentinel instead.
    if argv is None:
        argv = []
    cfg.CONF(argv[1:], project='galaxia')
    # NOTE: title normalized from a broken backslash line-continuation
    # inside the original string literal.
    opt_group = cfg.OptGroup(name=service_name,
                             title='Logging options for the service')
    cfg.CONF.register_group(opt_group)
    cfg.CONF.register_opts(LOGGING_SERVICE_OPTS, opt_group)
    # NOTE(review): _get is a private oslo.config API; prefer
    # cfg.CONF[service_name].log_file once the group name is stable.
    log_file = cfg.CONF._get("log_file", group=opt_group)
    log_level = cfg.CONF._get("log_level", group=opt_group)
    log_helper.setup_logging(log_file, log_level)
    db_group = cfg.OptGroup(name='db', title='Options for the database')
    cfg.CONF.register_group(db_group)
    cfg.CONF.register_opts(DB_SERVICE_OPTS, db_group)
    cfg.CONF.set_override('db_host', cfg.CONF.db.db_host, db_group)
    cfg.CONF.set_override('type', cfg.CONF.db.type, db_group)
    cfg.CONF.set_override('db_location', cfg.CONF.db.db_location, db_group)
    sql_helper.init_db(cfg.CONF.db.type, cfg.CONF.db.username,
                       cfg.CONF.db.password, cfg.CONF.db.db_host,
                       "galaxia", cfg.CONF.db.db_location)
    load_mapping.initialize()
def test_list_opts(self):
    """Every entry of opts.list_opts() is a (group, [Opt]) pair."""
    for group, opt_list in opts.list_opts():
        # a bare string group is only allowed for DEFAULT;
        # anything else must be a real OptGroup
        if not isinstance(group, six.string_types):
            self.assertIsInstance(group, cfg.OptGroup)
        else:
            self.assertEqual(group, 'DEFAULT')
        for opt in opt_list:
            self.assertIsInstance(opt, cfg.Opt)
def test_load_config_file_to_realize_all_driver(self):
    """Loading the fake config file registers every enabled driver."""
    CONF(['--config-file', 'calplus/tests/fake_config_file.conf'])
    # TODO: Maybe we need remove example group,
    # such as: openstack and amazon
    # ensure all driver groups have been registered
    for section in CONF.list_all_sections():
        CONF.register_group(cfg.OptGroup(section))
    # ensure all of enable drivers configured exact opts;
    # unknown driver types are skipped, exactly as before
    known_opts = {
        'openstack': calplus.conf.providers.openstack_opts,
        'amazon': calplus.conf.providers.amazon_opts,
    }
    for driver, type_driver in CONF.providers.enable_drivers.items():
        if type_driver in known_opts:
            CONF.register_opts(known_opts[type_driver], driver)
    self.assertEqual(CONF.openstack1['driver_name'], 'HUST')
    self.assertEqual(CONF.openstack2['driver_name'], 'SOICT')
    self.assertEqual(CONF.amazon['driver_name'], 'Amazon')
def test_list_opts(self):
    """Every entry of opts.list_opts() is a (group, [Opt]) pair."""
    for group, opt_list in opts.list_opts():
        # a bare string group is only allowed for DEFAULT;
        # anything else must be a real OptGroup
        if not isinstance(group, six.string_types):
            self.assertIsInstance(group, cfg.OptGroup)
        else:
            self.assertEqual('DEFAULT', group)
        for opt in opt_list:
            self.assertIsInstance(opt, cfg.Opt)
def list_opts():
    """Expose the ssl options under their OptGroup (oslo-config-generator hook)."""
    group_name, ssl_opts = sslutils.list_opts()[0]
    return {
        cfg.OptGroup(name=group_name, title='Options for the ssl'): ssl_opts,
    }
def register_opts(config):
    """Register every test option group on *config* and return it.

    Refactored from eight copy-pasted register_group/register_opts
    pairs into one data-driven loop; the registered groups, titles and
    options are unchanged.
    """
    groups = [
        ('hardware', "Hardware settings", hardware_opts),
        ('underlay', "Underlay configuration", underlay_opts),
        ('k8s_deploy', "K8s deploy configuration", k8s_deploy_opts),
        ('k8s', "K8s config and credentials", k8s_opts),
        ('ccp_deploy', "CCP deploy configuration", ccp_deploy_opts),
        ('ccp', "CCP config and credentials", ccp_opts),
        ('os', "Openstack config and credentials", os_opts),
        ('os_deploy', "Openstack deploy config and credentials",
         os_deploy_opts),
    ]
    for name, title, group_opts in groups:
        config.register_group(cfg.OptGroup(name=name, title=title, help=""))
        config.register_opts(group=name, opts=group_opts)
    return config
def post_config(conf):
    """Register one 'sp_<name>' option group per configured service provider."""
    for sp in conf.service_providers:
        group = cfg.OptGroup(name='sp_%s' % sp, title=sp)
        conf.register_opts(SP_OPTS, group)
def _register_opts(self):
    """Register additional options specific to this filter plugin"""
    # In the filter plugin space, there's no access to Nova's
    # keystone credentials, so we have to specify our own.
    # This also means we can't act as the user making the request
    # at scheduling-time.
    opts = [
        cfg.StrOpt('failure_mode',
                   choices=['reject', 'yield'],
                   default='reject',
                   help=_('Mode to operate in if Valet '
                          'planning fails for any reason.')),
        cfg.StrOpt('admin_tenant_name', default=None,
                   help=_('Valet Project Name')),
        cfg.StrOpt('admin_username', default=None,
                   help=_('Valet Username')),
        cfg.StrOpt('admin_password', default=None,
                   help=_('Valet Password')),
        cfg.StrOpt('admin_auth_url', default=None,
                   help=_('Keystone Authorization API Endpoint')),
    ]
    group = cfg.OptGroup('valet')
    cfg.CONF.register_group(group)
    cfg.CONF.register_opts(opts, group=group)
def start():
    """Run the statsd server: bind a UDP endpoint and flush periodically.

    Blocks until KeyboardInterrupt, then closes the transport and loop.
    Raises RequiredOptError when [statsd]resource_id is unset.
    """
    conf = service.prepare_service()
    if conf.statsd.resource_id is None:
        raise cfg.RequiredOptError("resource_id", cfg.OptGroup("statsd"))
    stats = Stats(conf)
    loop = asyncio.get_event_loop()
    # TODO(jd) Add TCP support
    listen = loop.create_datagram_endpoint(
        lambda: StatsdServer(stats),
        local_addr=(conf.statsd.host, conf.statsd.port))

    def _flush():
        # re-arm the timer before flushing so the schedule keeps ticking
        # even if stats.flush() raises or is slow
        loop.call_later(conf.statsd.flush_delay, _flush)
        stats.flush()

    loop.call_later(conf.statsd.flush_delay, _flush)
    transport, protocol = loop.run_until_complete(listen)
    LOG.info("Started on %s:%d", conf.statsd.host, conf.statsd.port)
    LOG.info("Flush delay: %d seconds", conf.statsd.flush_delay)
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    transport.close()
    loop.close()
def _register_opts(self):
    '''Register this plugin's Valet endpoint and timeout options.'''
    opts = [
        cfg.StrOpt(self.opt_name_str, default=None,
                   help=_('Valet API endpoint')),
        cfg.IntOpt(self.opt_conn_timeout, default=3,
                   help=_('Valet Plugin Connect Timeout')),
        cfg.IntOpt(self.opt_read_timeout, default=5,
                   help=_('Valet Plugin Read Timeout')),
    ]
    group = cfg.OptGroup(self.opt_group_str)
    cfg.CONF.register_group(group)
    cfg.CONF.register_opts(opts, group=group)
    # TODO(JD): Keep stack param for now. We may need it again.
def register_opts(conf):
    """Register the default, upgrade-cap, trusted and metrics options."""
    conf.register_opts(default_opts)
    conf.register_opt(rpcapi_cap_opt, UPGRADE_GROUP_NAME)
    group = cfg.OptGroup(name=TRUSTED_GROUP_NAME, title="Trust parameters")
    conf.register_group(group)
    conf.register_opts(trusted_opts, group=group)
    conf.register_opts(metrics_weight_opts, group=METRICS_GROUP_NAME)
def acquire_service_pub_ip_info(self, spec_type, spec_lb_ip, project_id):
    """Resolve public-IP info for a LoadBalancer service.

    Returns an LBaaSPubIp built from the user-specified address when it
    is available, otherwise allocates one from the configured external
    subnet's pool. Returns None for non-LoadBalancer services or when a
    requested address is unavailable.
    Raises RequiredOptError when [neutron_defaults]external_svc_subnet
    is unset, and NoResourceException when that subnet does not exist.
    """
    if spec_type != 'LoadBalancer':
        return None
    if spec_lb_ip:
        # assumes spec_lb_ip is an ipaddress-like object whose
        # .format() yields the address string -- TODO confirm caller
        user_specified_ip = spec_lb_ip.format()
        res_id = self._drv_pub_ip.is_ip_available(user_specified_ip)
        if res_id:
            service_pub_ip_info = (obj_lbaas.LBaaSPubIp(
                ip_id=res_id,
                ip_addr=str(user_specified_ip),
                alloc_method='user'))
            return service_pub_ip_info
        else:
            # user specified IP is not valid
            LOG.error("IP=%s is not available", user_specified_ip)
            return None
    else:
        LOG.debug("Trying to allocate public ip from pool")
        # get public subnet id from kuryr.conf
        external_svc_subnet = config.CONF.neutron_defaults.external_svc_subnet
        if not external_svc_subnet:
            raise cfg.RequiredOptError('external_svc_subnet',
                                       cfg.OptGroup('neutron_defaults'))
        neutron = clients.get_neutron_client()
        n_subnet = neutron.show_subnet(external_svc_subnet).get('subnet')
        if not n_subnet:
            LOG.error(
                "No subnet found for external_svc_subnet=%s",
                external_svc_subnet)
            raise kl_exc.NoResourceException
        public_network_id = n_subnet['network_id']
        res_id, alloc_ip_addr = (self._drv_pub_ip.allocate_ip
                                 (public_network_id,
                                  external_svc_subnet,
                                  project_id,
                                  'kuryr_lb'))
        service_pub_ip_info = obj_lbaas.LBaaSPubIp(ip_id=res_id,
                                                   ip_addr=alloc_ip_addr,
                                                   alloc_method='pool')
        return service_pub_ip_info
def create(self, **kwargs):
    """
    Handler to create Dashboard

    Stores the dashboard request in the DB, then casts a 'render_graph'
    message to the renderer service over rabbitmq.
    :param kwargs: dashboard parameters: 'name', 'metrics_list', and
        either 'names_list' or 'search_type'/'search_string'
        (optionally 'exclude')
    :return: response for the request (a human-readable status string)
    """
    # NOTE(review): commented-out config bootstrap left by the original
    # author; CONF/API_SERVICE_OPTS appear to be configured elsewhere
    # at module scope -- confirm before deleting.
    """
    CONF = cfg.CONF
    opt_group = cfg.OptGroup(name='gapi', title='Options for the\
 api service')
    CONF.register_group(opt_group)
    CONF.register_opts(API_SERVICE_OPTS, opt_group)
    CONF.set_override('topic', CONF.gapi.topic, opt_group)
    CONF.set_override('rabbitmq_host', CONF.gapi.rabbitmq_host, opt_group)
    """
    names_list = None
    search_string = None
    search_type = None
    exclude = 0
    name = kwargs['name']
    # no explicit names_list -> fall back to a search expression
    if not "names_list" in kwargs.keys() or kwargs['names_list'] is None:
        search_type = kwargs['search_type']
        search_string = kwargs['search_string']
        if "exclude" in kwargs.keys() and kwargs["exclude"]:
            #names_list = kwargs['search_type']+"!~"+kwargs['search_string']
            exclude = 1
        #else:
            #names_list = kwargs['search_type']+"=~"+kwargs['search_string']
    else:
        names_list = ','.join(kwargs['names_list'])
    # NOTE(review): this joins the *characters* of str(list) -- e.g.
    # ['cpu'] -> "[,',c,p,u,',]" -- which looks unintended; probably
    # meant ','.join(map(str, kwargs['metrics_list'])). Confirm with
    # the DB consumer before changing.
    metrics_list = ','.join(str(kwargs['metrics_list']))
    d_url = os.getenv('renderer_endpoint') + name
    status = "In Progress"
    create_datetime = str(datetime.datetime.now())
    sql_query = query_list.INSERT_INTO_DASHBOARD
    params = [name, names_list, metrics_list, search_string, search_type,
              d_url, status, create_datetime, create_datetime, exclude]
    try:
        conn = sql_helper.engine.connect()
        conn.execute(sql_query, params)
    except SQLAlchemyError as ex:
        # NOTE(review): ex.message is Python-2-only; str(ex) on Python 3
        return "A database exception has been hit and dashboard creation\
 has failed %s" % ex.message
    log.info("Initializing a client connection")
    try:
        mq_client = client.Client(CONF.gapi.topic, CONF.gapi.rabbitmq_host)
        ctxt = {}
        log.info("Publishing message to rabbitmq")
        mq_client.rpc_client.cast(ctxt, 'render_graph', message=kwargs)
    except Exception as ex:
        return "A messaging exception has been hit and dashboard creation\
 has failed"
    return "Dashboard creation request has been accepted and the new\
 dashboard will be available @ %s" % d_url
def configure():
    """Register configuration.

    Registers CLI options, the common options, and one option group per
    freezer-dr subsystem (monitoring, fencer, evacuation, notifiers,
    keystone_authtoken), then parses the CLI/config files.
    """
    CONF.register_cli_opts(build_os_options())
    CONF.register_opts(_COMMON)
    monitors_grp = cfg.OptGroup('monitoring',
                                title='Monitoring',
                                help='Monitoring Driver/plugin to be used to '
                                     'monitor compute nodes')
    CONF.register_group(monitors_grp)
    CONF.register_opts(_MONITORS, group='monitoring')
    fencers_grp = cfg.OptGroup('fencer',
                               title='fencer Options',
                               help='fencer Driver/plugin to be used to '
                                    'fence compute nodes')
    CONF.register_group(fencers_grp)
    CONF.register_opts(_FENCER, group='fencer')
    # Evacuation Section :)
    evacuators_grp = cfg.OptGroup('evacuation',
                                  title='Evacuation Options',
                                  help='Evacuation Driver/plugin opts to be '
                                       'used to Evacuate compute nodes')
    CONF.register_group(evacuators_grp)
    CONF.register_opts(_EVACUATION, group='evacuation')
    # Notification Section :)
    notifiers_grp = cfg.OptGroup('notifiers',
                                 title='Notification Options',
                                 help='Notification Driver/plugin opts to be '
                                      'used to Notify admins/users if failure '
                                      'happens')
    CONF.register_group(notifiers_grp)
    CONF.register_opts(_NOTIFIERS, group='notifiers')
    # Keystone Auth
    keystone_grp = cfg.OptGroup('keystone_authtoken',
                                title='Keystone Auth Options',
                                help='OpenStack Credentials to call the nova '
                                     'APIs to evacuate ')
    CONF.register_group(keystone_grp)
    CONF.register_opts(_KEYSTONE_AUTH_TOKEN, group='keystone_authtoken')
    # discover default config files, then parse CLI + config
    default_conf = cfg.find_config_files('freezer', 'freezer-dr', '.conf')
    log.register_options(CONF)
    CONF(args=sys.argv[1:],
         project='freezer',
         default_config_files=default_conf,
         version=FREEZER_DR_VERSION)