The following code examples, extracted from open source Python projects, show how to use oslo_config.cfg.StrOpt().
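Before the excerpts, here is a minimal, self-contained sketch of the usual StrOpt life cycle: define the option, register it, parse, then read it back from cfg.CONF. The option and group names (greeting, my_group) are invented for illustration.

from oslo_config import cfg

# Hypothetical option and group, for illustration only.
_opts = [
    cfg.StrOpt('greeting',
               default='hello',
               help='A demonstration string option'),
]
cfg.CONF.register_opts(_opts, group='my_group')

# Option values become readable only after the ConfigOpts object has
# been called to parse config sources; an empty argument list means
# the defaults apply.
cfg.CONF([])
print(cfg.CONF.my_group.greeting)  # -> 'hello'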
def load_repositories_options():
    repo_opts = [
        cfg.StrOpt(
            'offsets',
            default='monasca_transform.offset_specs:JSONOffsetSpecs',
            help='Repository for offset persistence'
        ),
        cfg.StrOpt(
            'data_driven_specs',
            default='monasca_transform.data_driven_specs.'
                    'json_data_driven_specs_repo:JSONDataDrivenSpecsRepo',
            help='Repository for metric and event data_driven_specs'
        ),
        cfg.IntOpt('offsets_max_revisions', default=10,
                   help="Max revisions of offsets for each application")
    ]
    repo_group = cfg.OptGroup(name='repositories', title='repositories')
    cfg.CONF.register_group(repo_group)
    cfg.CONF.register_opts(repo_opts, group=repo_group)
def load_messaging_options():
    messaging_options = [
        cfg.StrOpt('adapter',
                   default='monasca_transform.messaging.adapter:'
                           'KafkaMessageAdapter',
                   help='Message adapter implementation'),
        cfg.StrOpt('topic', default='metrics',
                   help='Messaging topic'),
        cfg.StrOpt('brokers', default='192.168.10.4:9092',
                   help='Messaging brokers'),
        cfg.StrOpt('publish_kafka_project_id', default='111111',
                   help='publish aggregated metrics tenant'),
        cfg.StrOpt('adapter_pre_hourly',
                   default='monasca_transform.messaging.adapter:'
                           'KafkaMessageAdapterPreHourly',
                   help='Message adapter implementation'),
        cfg.StrOpt('topic_pre_hourly', default='metrics_pre_hourly',
                   help='Messaging topic pre hourly')
    ]
    messaging_group = cfg.OptGroup(name='messaging', title='messaging')
    cfg.CONF.register_group(messaging_group)
    cfg.CONF.register_opts(messaging_options, group=messaging_group)
def load_pre_hourly_processor_options():
    app_opts = [
        cfg.IntOpt('late_metric_slack_time', default=600),
        cfg.StrOpt('data_provider',
                   default='monasca_transform.processor.'
                           'pre_hourly_processor:'
                           'PreHourlyProcessorDataProvider'),
        cfg.BoolOpt('enable_instance_usage_df_cache'),
        cfg.StrOpt('instance_usage_df_cache_storage_level'),
        cfg.BoolOpt('enable_batch_time_filtering'),
        cfg.IntOpt('effective_batch_revision', default=2)
    ]
    app_group = cfg.OptGroup(name='pre_hourly_processor',
                             title='pre_hourly_processor')
    cfg.CONF.register_group(app_group)
    cfg.CONF.register_opts(app_opts, group=app_group)
def list_opts():
    return [
        ('DEFAULT', [
            # FIXME(jd) Move to [api]
            cfg.StrOpt('api_paste_config',
                       default="api_paste.ini",
                       help="Configuration file for WSGI definition of API."),
        ]),
        ('api', [
            cfg.IntOpt('default_api_return_limit',
                       min=1,
                       default=100,
                       help='Default maximum number of '
                            'items returned by API request.'),
        ]),
        ('database', panko.storage.OPTS),
        ('storage', STORAGE_OPTS),
    ]
def setUp(self):
    super(SqliteConfFixture, self).setUp()
    self.register_opt(cfg.StrOpt('connection', default=''),
                      group='database')
    self.register_opt(cfg.IntOpt('max_pool_size', default=None),
                      group='database')
    self.register_opt(cfg.IntOpt('idle_timeout', default=None),
                      group='database')
    self.register_opts(cli.MIGRATION_OPTS)
    self.url = db_test_utils.get_connect_string("sqlite")
    self.set_default('connection', self.url, group='database')
    self.set_default('disable_microsecond_data_migration', False)
    lockutils.set_defaults(lock_path='/tmp')
    self._drop_db()
    self.addCleanup(self.cleanup)
def _register_opts(self):
    """Register oslo.config options"""
    opts = []
    option = cfg.StrOpt('url', default=None,
                        help=_('API endpoint url'))
    opts.append(option)
    option = cfg.IntOpt('read_timeout', default=5,
                        help=_('API read timeout in seconds'))
    opts.append(option)
    option = cfg.IntOpt('retries', default=3,
                        help=_('API request retries'))
    opts.append(option)
    opt_group = cfg.OptGroup('valet')
    CONF.register_group(opt_group)
    CONF.register_opts(opts, group=opt_group)
def __init__(self, conf_path):
    self.conf_path = conf_path
    self.opt_group = cfg.OptGroup(name='endpoint',
                                  title='Get the endpoints for keystone')
    self.endpoint_opts = [
        cfg.StrOpt('endpoint',
                   default=None,
                   help='URL or IP address where OpenStack keystone runs.')
    ]
    CONF = cfg.CONF
    CONF.register_group(self.opt_group)
    CONF.register_opts(self.endpoint_opts, self.opt_group)
    CONF(default_config_files=[self.conf_path])
    self.AUTH_ENDPOINT = CONF.endpoint.endpoint
def _register_opts(self):
    '''Register Options'''
    opts = []
    option = cfg.StrOpt(self.opt_failure_mode_str,
                        choices=['reject', 'yield'], default='reject',
                        help=_('Mode to operate in if Valet planning '
                               'fails for any reason.'))
    opts.append(option)
    option = cfg.StrOpt(self.opt_project_name_str, default=None,
                        help=_('Valet Project Name'))
    opts.append(option)
    option = cfg.StrOpt(self.opt_username_str, default=None,
                        help=_('Valet Username'))
    opts.append(option)
    option = cfg.StrOpt(self.opt_password_str, default=None,
                        help=_('Valet Password'))
    opts.append(option)
    option = cfg.StrOpt(self.opt_auth_uri_str, default=None,
                        help=_('Keystone Authorization API Endpoint'))
    opts.append(option)
    opt_group = cfg.OptGroup(self.opt_group_str)
    cfg.CONF.register_group(opt_group)
    cfg.CONF.register_opts(opts, group=opt_group)

# TODO(JD): Factor out common code between this and the cinder filter
def load_config(self):
    """Load config options from nova config file or command line
    (for example: /etc/nova/nova.conf)

    Sample settings in nova config:

        [kozinaki_EC2]
        user=AKIAJR7NAEIZPWSTFBEQ
        key=zv9zSem8OE+k/axFkPCgZ3z3tLrhvFBaIIa0Ik0j
    """
    provider_opts = [
        cfg.StrOpt('aws_secret_access_key', help='AWS secret key',
                   secret=True),
        cfg.StrOpt('aws_access_key_id', help='AWS access key id',
                   secret=True),
        cfg.StrOpt('region', help='AWS region name'),
    ]
    cfg.CONF.register_opts(provider_opts, self.config_name)
    return cfg.CONF[self.config_name]
def load_config(self):
    """Load config options from nova config file or command line
    (for example: /etc/nova/nova.conf)

    Sample settings in nova config:

        [kozinaki_EC2]
        user=AKIAJR7NAEIZPWSTFBEQ
        key=zv9zSem8OE+k/axFkPCgZ3z3tLrhvFBaIIa0Ik0j
    """
    provider_opts = [
        cfg.StrOpt('path_to_json_token', help='Google API json token file',
                   secret=True),
        cfg.StrOpt('project', help='Google project id'),
        cfg.StrOpt('zone', help='Google zone name'),
    ]
    cfg.CONF.register_opts(provider_opts, self.config_name)
    return cfg.CONF[self.config_name]
def load_config(self):
    """Load config options from nova config file or command line
    (for example: /etc/nova/nova.conf)

    Sample settings in nova config:

        [kozinaki_EC2]
        user=AKIAJR7NAEIZPWSTFBEQ
        key=zv9zSem8OE+k/axFkPCgZ3z3tLrhvFBaIIa0Ik0j
    """
    provider_cls = get_libcloud_driver(getattr(Provider, self.provider_name))
    provider_cls_info = inspect.getargspec(provider_cls.__init__)
    provider_opts = [cfg.StrOpt(arg) for arg in provider_cls_info.args]
    provider_opts.append(cfg.StrOpt('location'))
    provider_opts.append(cfg.StrOpt('root_password'))
    provider_opts.append(cfg.StrOpt('project_id'))
    cfg.CONF.register_opts(provider_opts, self.config_name)
    return cfg.CONF[self.config_name]
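A side note on the excerpt above: inspect.getargspec() was removed in Python 3.11, and its args list starts with self, so the code also registers an option literally named 'self'. On modern Python, the same introspection idea might look like the sketch below (opts_from_init is a hypothetical helper, not part of the original project):

import inspect

from oslo_config import cfg


def opts_from_init(provider_cls):
    # Build one StrOpt per constructor parameter, skipping 'self'
    # and any *args/**kwargs catch-alls.
    sig = inspect.signature(provider_cls.__init__)
    skip = (inspect.Parameter.VAR_POSITIONAL, inspect.Parameter.VAR_KEYWORD)
    return [cfg.StrOpt(name)
            for name, param in sig.parameters.items()
            if name != 'self' and param.kind not in skip]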
def load_database_options():
    db_opts = [
        cfg.StrOpt('server_type'),
        cfg.StrOpt('host'),
        cfg.StrOpt('database_name'),
        cfg.StrOpt('username'),
        cfg.StrOpt('password'),
        cfg.BoolOpt('use_ssl', default=False),
        cfg.StrOpt('ca_file')
    ]
    mysql_group = cfg.OptGroup(name='database', title='database')
    cfg.CONF.register_group(mysql_group)
    cfg.CONF.register_opts(db_opts, group=mysql_group)
def load_service_options():
    service_opts = [
        cfg.StrOpt('coordinator_address'),
        cfg.StrOpt('coordinator_group'),
        cfg.FloatOpt('election_polling_frequency'),
        cfg.BoolOpt('enable_debug_log_entries', default=False),
        cfg.StrOpt('setup_file'),
        cfg.StrOpt('setup_target'),
        cfg.StrOpt('spark_driver'),
        cfg.StrOpt('service_log_path'),
        cfg.StrOpt('service_log_filename',
                   default='monasca-transform.log'),
        cfg.StrOpt('spark_event_logging_dest'),
        cfg.StrOpt('spark_event_logging_enabled'),
        cfg.StrOpt('spark_jars_list'),
        cfg.StrOpt('spark_master_list'),
        cfg.StrOpt('spark_python_files'),
        cfg.IntOpt('stream_interval'),
        cfg.StrOpt('work_dir'),
        cfg.StrOpt('spark_home'),
        cfg.BoolOpt('enable_record_store_df_cache'),
        cfg.StrOpt('record_store_df_cache_storage_level')
    ]
    service_group = cfg.OptGroup(name='service', title='service')
    cfg.CONF.register_group(service_group)
    cfg.CONF.register_opts(service_opts, group=service_group)
def get_source_opts(type_=None, location=None, reference=None):
    return [cfg.StrOpt('type', choices=['local', 'git', 'url'],
                       default=type_,
                       help='Source location type'),
            cfg.StrOpt('location', default=location,
                       help='The location for source install'),
            cfg.StrOpt('reference', default=reference,
                       help=('Git reference to pull, commit sha, tag '
                             'or branch name'))]
def __metric_opts(self):
    """List of options to be used in metric-defined sections."""
    return [
        cfg.StrOpt("metric_name",
                   help="Metric name used to log monitoring information "
                        "in Monasca",
                   required=True),
        cfg.DictOpt("dimensions",
                    default={},
                    help="Dict that contains dimensions information, e.g. "
                         "component:nova-compute,service:compute"),
        cfg.StrOpt("aggregate",
                   choices=["any", "all"],
                   default='all',
                   help="How to decide that the compute node is down. If "
                        "your metric reports many states (for example, "
                        "checking different services on the compute host), "
                        "should one failed component mark the host as down, "
                        "or only the failure of all components? Default is "
                        "all, meaning freezer-dr considers the host failed "
                        "only if all components fail"),
        cfg.StrOpt("undetermined",
                   choices=['OK', 'ALARM'],
                   default='ALARM',
                   help="How to handle UNDETERMINED states: they can be "
                        "ignored (treated as OK) or treated as ALARM. "
                        "Default is ALARM")
    ]
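As the dimensions help text above hints, oslo.config parses a DictOpt value from a comma-separated list of key:value pairs. A minimal sketch (the option registration and the sample file contents are invented for illustration):

from oslo_config import cfg

conf = cfg.ConfigOpts()
conf.register_opts([cfg.DictOpt('dimensions', default={})])

# With a config file containing:
#     [DEFAULT]
#     dimensions = component:nova-compute,service:compute
# conf.dimensions would parse to:
#     {'component': 'nova-compute', 'service': 'compute'}
conf([])  # no config file loaded here, so the default {} applies
print(conf.dimensions)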
def setUp(self):
    super(MySQLConfFixture, self).setUp()
    self.register_opt(cfg.IntOpt('max_pool_size', default=20),
                      group='database')
    self.register_opt(cfg.IntOpt('idle_timeout', default=3600),
                      group='database')
    self.register_opt(cfg.StrOpt('connection', default=''),
                      group='database')
    self.url = db_test_utils.get_connect_string("mysql")
    self.set_default('connection', self.url, group='database')
    lockutils.set_defaults(lock_path='/tmp')
    self._drop_db()
def setUp(self):
    super(PostgresConfFixture, self).setUp()
    self.register_opt(cfg.StrOpt('connection', default=''),
                      group='database')
    self.register_opt(cfg.IntOpt('max_pool_size', default=20),
                      group='database')
    self.register_opt(cfg.IntOpt('idle_timeout', default=3600),
                      group='database')
    self.register_opts(cli.MIGRATION_OPTS)
    self.url = db_test_utils.get_connect_string("postgres")
    self.set_default('connection', self.url, group='database')
    self.set_default('disable_microsecond_data_migration', False)
    lockutils.set_defaults(lock_path='/tmp')
    self._drop_db()
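The three setUp() methods above appear to run inside a config fixture whose register_opt/set_default calls are scoped to a single test. If you need that pattern from scratch, oslo.config ships oslo_config.fixture.Config for this purpose; a minimal sketch (the test class, option, and override value are invented for illustration):

from oslo_config import cfg
from oslo_config import fixture as config_fixture
from oslotest import base


class DatabaseOptsTest(base.BaseTestCase):
    def setUp(self):
        super(DatabaseOptsTest, self).setUp()
        fix = self.useFixture(config_fixture.Config(cfg.CONF))
        # Both the registration and the default override are rolled
        # back automatically when the test's cleanups run.
        fix.register_opt(cfg.StrOpt('connection', default=''),
                         group='database')
        fix.set_default('connection', 'sqlite://', group='database')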
def __init__(self):
    super(QueueManager, self).__init__(name="QueueManager")
    self.config_opts = [
        cfg.StrOpt("db_connection", help="the DB url", required=True),
        cfg.IntOpt('db_pool_size', default=10, required=False),
        cfg.IntOpt('db_pool_recycle', default=30, required=False),
        cfg.IntOpt('db_max_overflow', default=5, required=False)
    ]
    self.queues = {}
def __init__(self):
    super(ProjectManager, self).__init__(name="ProjectManager")
    self.config_opts = [
        cfg.IntOpt("default_TTL", default=1440, required=False),
        cfg.FloatOpt("default_share", default=10.0, required=False),
        cfg.StrOpt("db_connection", help="the DB url", required=True),
        cfg.IntOpt('db_pool_size', default=10, required=False),
        cfg.IntOpt('db_pool_recycle', default=30, required=False),
        cfg.IntOpt('db_max_overflow', default=5, required=False)
    ]
    self.projects = {}
def get_options(self):
    options = super(TokenEndpoint, self).get_options()
    options.extend([
        # Maintain name 'url' for compatibility
        cfg.StrOpt('url',
                   help='Specific service endpoint to use'),
        cfg.StrOpt('token',
                   secret=True,
                   help='Authentication token to use'),
    ])
    return options
def logger_conf(logger_name):
    return [
        cfg.StrOpt('output_format',
                   default="%(asctime)s - %(levelname)s - %(message)s"),
        cfg.BoolOpt('store', default=True),
        cfg.StrOpt('logging_level', default='debug'),
        cfg.StrOpt('logging_dir', default='/var/log/valet/'),
        cfg.StrOpt('logger_name', default=logger_name + ".log"),
        cfg.IntOpt('max_main_log_size', default=5000000),
        cfg.IntOpt('max_log_size', default=1000000),
        cfg.IntOpt('max_num_of_logs', default=3),
    ]
def _register_opts(self): """Register additional options specific to this filter plugin""" opts = [] option = cfg.StrOpt('failure_mode', choices=['reject', 'yield'], default='reject', help=_('Mode to operate in if Valet ' 'planning fails for any reason.')) # In the filter plugin space, there's no access to Nova's # keystone credentials, so we have to specify our own. # This also means we can't act as the user making the request # at scheduling-time. opts.append(option) option = cfg.StrOpt('admin_tenant_name', default=None, help=_('Valet Project Name')) opts.append(option) option = cfg.StrOpt('admin_username', default=None, help=_('Valet Username')) opts.append(option) option = cfg.StrOpt('admin_password', default=None, help=_('Valet Password')) opts.append(option) option = cfg.StrOpt('admin_auth_url', default=None, help=_('Keystone Authorization API Endpoint')) opts.append(option) opt_group = cfg.OptGroup('valet') cfg.CONF.register_group(opt_group) cfg.CONF.register_opts(opts, group=opt_group)
def get_config_parser():
    conf = cfg.ConfigOpts()
    conf.register_cli_opt(
        cfg.StrOpt(
            'repo_root',
            default='.',
            help='directory containing the git repositories',
        )
    )
    return conf
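For context (this usage is not part of the original excerpt), a standalone ConfigOpts like the one returned above is typically invoked with the process arguments and then read as an attribute, roughly:

import sys

conf = get_config_parser()
conf(sys.argv[1:])      # parse CLI arguments and any config files
print(conf.repo_root)   # '.' unless overridden on the command line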
def injector():
    conf = cfg.ConfigOpts()
    conf.register_cli_opts([
        cfg.IntOpt("metrics", default=1, min=1),
        cfg.StrOpt("archive-policy-name", default="low"),
        cfg.StrOpt("creator", default="admin"),
        cfg.IntOpt("batch-of-measures", default=1000),
        cfg.IntOpt("measures-per-batch", default=10),
    ])
    conf = service.prepare_service(conf=conf)
    index = indexer.get_driver(conf)
    instore = incoming.get_driver(conf)

    def todo():
        metric = index.create_metric(
            uuid.uuid4(),
            creator=conf.creator,
            archive_policy_name=conf.archive_policy_name)
        for _ in six.moves.range(conf.batch_of_measures):
            measures = [
                incoming.Measure(
                    utils.dt_in_unix_ns(utils.utcnow()),
                    random.random())
                for __ in six.moves.range(conf.measures_per_batch)]
            instore.add_measures(metric, measures)

    with futures.ThreadPoolExecutor(max_workers=conf.metrics) as executor:
        for m in six.moves.range(conf.metrics):
            executor.submit(todo)
def init_config(conf):
    opts = [
        cfg.StrOpt('endpoint_type', default='publicURL')
    ]
    conf.register_opts(opts, group="networking_sfc")
    return conf.networking_sfc
def _register_opts(self):
    '''Register options'''
    opts = []
    option = cfg.StrOpt(self.opt_name_str, default=None,
                        help=_('Valet API endpoint'))
    opts.append(option)
    option = cfg.IntOpt(self.opt_conn_timeout, default=3,
                        help=_('Valet Plugin Connect Timeout'))
    opts.append(option)
    option = cfg.IntOpt(self.opt_read_timeout, default=5,
                        help=_('Valet Plugin Read Timeout'))
    opts.append(option)
    opt_group = cfg.OptGroup(self.opt_group_str)
    cfg.CONF.register_group(opt_group)
    cfg.CONF.register_opts(opts, group=opt_group)

# TODO(JD): Keep stack param for now. We may need it again.
def get_user_opts(uid, gid):
    return [
        cfg.StrOpt('uid', default=uid, help='The user id'),
        cfg.StrOpt('gid', default=gid, help='The group id'),
    ]
def load_config(self):
    """Load config options from nova config file or command line
    (for example: /etc/nova/nova.conf)

    Sample settings in nova config:

        [kozinaki_EC2]
        user=AKIAJR7NAEIZPWSTFBEQ
        key=zv9zSem8OE+k/axFkPCgZ3z3tLrhvFBaIIa0Ik0j
    """
    provider_opts = [
        cfg.StrOpt('subscription_id',
                   help='Subscription id from the Azure portal settings'),
        cfg.StrOpt('key_file',
                   help='API key to work with the cloud provider',
                   secret=True),
        cfg.StrOpt('username', help='Default vm username'),
        cfg.StrOpt('password',
                   help='Azure: default instance password. '
                        'Password must be 6-72 characters long'),
        cfg.StrOpt('app_client_id', help='Azure app client id'),
        cfg.StrOpt('app_secret', help='Azure app secret'),
        cfg.StrOpt('app_tenant', help='Azure app tenant'),
        cfg.StrOpt('resource_group_name', help='Azure resource group name'),
        cfg.StrOpt('location', help='VM location'),
        cfg.StrOpt('storage_account_name', help='Azure storage account name'),
        cfg.StrOpt('os_disk_name', help='VM default disk name'),
        cfg.StrOpt('vnet_name', help='Azure default virtual network'),
        cfg.StrOpt('subnet_name', help='Azure default subnet name'),
        cfg.StrOpt('ip_config_name', help='Azure default ip config name'),
        cfg.StrOpt('nic_name', help='Azure default nic name'),
        cfg.StrOpt('cloud_service_name',
                   help='Azure default cloud service name'),
        cfg.StrOpt('deployment_name', help='Azure default deployment name'),
    ]
    cfg.CONF.register_opts(provider_opts, self.config_name)
    return cfg.CONF[self.config_name]
def __init__(self):
    super(KeystoneManager, self).__init__("KeystoneManager")
    self.config_opts = [
        cfg.StrOpt("auth_url",
                   help="the Keystone url (v3 only)",
                   required=True),
        cfg.StrOpt("username",
                   help="the name of user with admin role",
                   required=True),
        cfg.StrOpt("user_domain_name",
                   help="the user domain",
                   default="default",
                   required=False),
        cfg.StrOpt("password",
                   help="the password of user with admin role",
                   required=True),
        cfg.StrOpt("project_name",
                   help="the project to request authorization on",
                   required=True),
        cfg.StrOpt("project_domain_name",
                   help="the project domain",
                   default="default",
                   required=False),
        cfg.StrOpt("project_id",
                   help="the project id to request authorization on",
                   required=False),
        cfg.IntOpt("timeout",
                   help="set the http connection timeout",
                   default=60,
                   required=False),
        cfg.IntOpt("clock_skew",
                   help="set the clock skew (seconds)",
                   default=60,
                   required=False),
        cfg.StrOpt("ssl_ca_file",
                   help="set the PEM encoded Certificate Authority to "
                        "use when verifying HTTPs connections",
                   default=None,
                   required=False),
        cfg.StrOpt("ssl_cert_file",
                   help="set the SSL client certificate (PEM encoded)",
                   default=None,
                   required=False),
        cfg.StrOpt("amqp_url",
                   help="the amqp transport url",
                   default=None,
                   required=True),
        cfg.StrOpt("amqp_exchange",
                   help="the amqp exchange",
                   default="keystone",
                   required=False),
        cfg.StrOpt("amqp_topic",
                   help="the notification topic",
                   default="keystone_notification",
                   required=False)
    ]