The following 17 code examples, extracted from open source Python projects, show how to use oslo_config.cfg.IntOpt().
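Before the examples, here is a minimal, self-contained sketch of the pattern they all share: declare an IntOpt (optionally with a default and min/max bounds), register it on a ConfigOpts instance, then read the parsed value back as an attribute. The option and group names (worker_count, workers) are invented for illustration.

    from oslo_config import cfg

    CONF = cfg.ConfigOpts()

    # Hypothetical option, invented for illustration: an integer with a
    # default and a lower bound, registered under a [workers] group.
    CONF.register_opts(
        [cfg.IntOpt('worker_count', default=4, min=1,
                    help='Number of worker processes to spawn.')],
        group='workers')

    CONF([])  # parse an empty command line; defaults apply
    print(CONF.workers.worker_count)  # -> 4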
def load_repositories_options():
    repo_opts = [
        cfg.StrOpt(
            'offsets',
            default='monasca_transform.offset_specs:JSONOffsetSpecs',
            help='Repository for offset persistence'
        ),
        cfg.StrOpt(
            'data_driven_specs',
            default='monasca_transform.data_driven_specs.'
                    'json_data_driven_specs_repo:JSONDataDrivenSpecsRepo',
            help='Repository for metric and event data_driven_specs'
        ),
        cfg.IntOpt('offsets_max_revisions', default=10,
                   help="Max revisions of offsets for each application")
    ]
    repo_group = cfg.OptGroup(name='repositories', title='repositories')
    cfg.CONF.register_group(repo_group)
    cfg.CONF.register_opts(repo_opts, group=repo_group)
def load_pre_hourly_processor_options():
    app_opts = [
        cfg.IntOpt('late_metric_slack_time', default=600),
        cfg.StrOpt('data_provider',
                   default='monasca_transform.processor.'
                           'pre_hourly_processor:'
                           'PreHourlyProcessorDataProvider'),
        cfg.BoolOpt('enable_instance_usage_df_cache'),
        cfg.StrOpt('instance_usage_df_cache_storage_level'),
        cfg.BoolOpt('enable_batch_time_filtering'),
        cfg.IntOpt('effective_batch_revision', default=2)
    ]
    app_group = cfg.OptGroup(name='pre_hourly_processor',
                             title='pre_hourly_processor')
    cfg.CONF.register_group(app_group)
    cfg.CONF.register_opts(app_opts, group=app_group)
def list_opts():
    return [
        ('DEFAULT', [
            # FIXME(jd) Move to [api]
            cfg.StrOpt('api_paste_config',
                       default="api_paste.ini",
                       help="Configuration file for WSGI definition of API."),
        ]),
        ('api', [
            cfg.IntOpt('default_api_return_limit',
                       min=1,
                       default=100,
                       help='Default maximum number of '
                            'items returned by API request.'),
        ]),
        ('database', panko.storage.OPTS),
        ('storage', STORAGE_OPTS),
    ]
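The min=1 bound above is enforced by oslo.config itself, not by the API code. A small sketch of that behavior (assuming a recent oslo.config, where set_override validates the new value against the option's type):

    from oslo_config import cfg

    conf = cfg.ConfigOpts()
    conf.register_opts(
        [cfg.IntOpt('default_api_return_limit', min=1, default=100)],
        group='api')
    conf([])

    print(conf.api.default_api_return_limit)  # -> 100

    # An out-of-range override is rejected when it is set.
    try:
        conf.set_override('default_api_return_limit', 0, group='api')
    except ValueError:
        print('rejected: value must be >= 1')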
def setUp(self):
    super(SqliteConfFixture, self).setUp()
    self.register_opt(cfg.StrOpt('connection', default=''),
                      group='database')
    self.register_opt(cfg.IntOpt('max_pool_size', default=None),
                      group='database')
    self.register_opt(cfg.IntOpt('idle_timeout', default=None),
                      group='database')
    self.register_opts(cli.MIGRATION_OPTS)
    self.url = db_test_utils.get_connect_string("sqlite")
    self.set_default('connection', self.url, group='database')
    self.set_default('disable_microsecond_data_migration', False)
    lockutils.set_defaults(lock_path='/tmp')
    self._drop_db()
    self.addCleanup(self.cleanup)
def _register_opts(self):
    """Register oslo.config options."""
    opts = []
    option = cfg.StrOpt('url', default=None,
                        help=_('API endpoint url'))
    opts.append(option)
    option = cfg.IntOpt('read_timeout', default=5,
                        help=_('API read timeout in seconds'))
    opts.append(option)
    option = cfg.IntOpt('retries', default=3,
                        help=_('API request retries'))
    opts.append(option)
    opt_group = cfg.OptGroup('valet')
    CONF.register_group(opt_group)
    CONF.register_opts(opts, group=opt_group)
def load_service_options():
    service_opts = [
        cfg.StrOpt('coordinator_address'),
        cfg.StrOpt('coordinator_group'),
        cfg.FloatOpt('election_polling_frequency'),
        cfg.BoolOpt('enable_debug_log_entries', default=False),
        cfg.StrOpt('setup_file'),
        cfg.StrOpt('setup_target'),
        cfg.StrOpt('spark_driver'),
        cfg.StrOpt('service_log_path'),
        cfg.StrOpt('service_log_filename',
                   default='monasca-transform.log'),
        cfg.StrOpt('spark_event_logging_dest'),
        cfg.StrOpt('spark_event_logging_enabled'),
        cfg.StrOpt('spark_jars_list'),
        cfg.StrOpt('spark_master_list'),
        cfg.StrOpt('spark_python_files'),
        cfg.IntOpt('stream_interval'),
        cfg.StrOpt('work_dir'),
        cfg.StrOpt('spark_home'),
        cfg.BoolOpt('enable_record_store_df_cache'),
        cfg.StrOpt('record_store_df_cache_storage_level')
    ]
    service_group = cfg.OptGroup(name='service', title='service')
    cfg.CONF.register_group(service_group)
    cfg.CONF.register_opts(service_opts, group=service_group)
def setUp(self):
    super(MySQLConfFixture, self).setUp()
    self.register_opt(cfg.IntOpt('max_pool_size', default=20),
                      group='database')
    self.register_opt(cfg.IntOpt('idle_timeout', default=3600),
                      group='database')
    self.register_opt(cfg.StrOpt('connection', default=''),
                      group='database')
    self.url = db_test_utils.get_connect_string("mysql")
    self.set_default('connection', self.url, group='database')
    lockutils.set_defaults(lock_path='/tmp')
    self._drop_db()
def setUp(self):
    super(PostgresConfFixture, self).setUp()
    self.register_opt(cfg.StrOpt('connection', default=''),
                      group='database')
    self.register_opt(cfg.IntOpt('max_pool_size', default=20),
                      group='database')
    self.register_opt(cfg.IntOpt('idle_timeout', default=3600),
                      group='database')
    self.register_opts(cli.MIGRATION_OPTS)
    self.url = db_test_utils.get_connect_string("postgres")
    self.set_default('connection', self.url, group='database')
    self.set_default('disable_microsecond_data_migration', False)
    lockutils.set_defaults(lock_path='/tmp')
    self._drop_db()
def __init__(self):
    super(QueueManager, self).__init__(name="QueueManager")
    self.config_opts = [
        cfg.StrOpt("db_connection", help="the DB url", required=True),
        cfg.IntOpt('db_pool_size', default=10, required=False),
        cfg.IntOpt('db_pool_recycle', default=30, required=False),
        cfg.IntOpt('db_max_overflow', default=5, required=False)
    ]
    self.queues = {}
def __init__(self):
    super(ProjectManager, self).__init__(name="ProjectManager")
    self.config_opts = [
        cfg.IntOpt("default_TTL", default=1440, required=False),
        cfg.FloatOpt("default_share", default=10.0, required=False),
        cfg.StrOpt("db_connection", help="the DB url", required=True),
        cfg.IntOpt('db_pool_size', default=10, required=False),
        cfg.IntOpt('db_pool_recycle', default=30, required=False),
        cfg.IntOpt('db_max_overflow', default=5, required=False)
    ]
    self.projects = {}
def __init__(self):
    super(FairShareManager, self).__init__("FairShareManager")
    self.config_opts = [
        cfg.IntOpt('periods', default=3),
        cfg.IntOpt('period_length', default=7),
        cfg.FloatOpt('default_share', default=10.0),
        cfg.FloatOpt('decay_weight', default=0.5,
                     help="the decay weight (float value [0,1])"),
        cfg.IntOpt('age_weight', default=10, help="the age weight"),
        cfg.IntOpt('vcpus_weight', default=100, help="the vcpus weight"),
        cfg.IntOpt('memory_weight', default=70, help="the memory weight")
    ]
def __init__(self):
    super(SchedulerManager, self).__init__("SchedulerManager")
    self.config_opts = [
        cfg.IntOpt("backfill_depth", default=100),
    ]
    self.workers = []
def logger_conf(logger_name):
    return [
        cfg.StrOpt('output_format',
                   default="%(asctime)s - %(levelname)s - %(message)s"),
        cfg.BoolOpt('store', default=True),
        cfg.StrOpt('logging_level', default='debug'),
        cfg.StrOpt('logging_dir', default='/var/log/valet/'),
        cfg.StrOpt('logger_name', default=logger_name + ".log"),
        cfg.IntOpt('max_main_log_size', default=5000000),
        cfg.IntOpt('max_log_size', default=1000000),
        cfg.IntOpt('max_num_of_logs', default=3),
    ]
def metricd():
    conf = cfg.ConfigOpts()
    conf.register_cli_opts([
        cfg.IntOpt("stop-after-processing-metrics",
                   default=0, min=0,
                   help="Number of metrics to process without workers, "
                        "for testing purposes"),
    ])
    conf = service.prepare_service(conf=conf)

    if conf.stop_after_processing_metrics:
        metricd_tester(conf)
    else:
        MetricdServiceManager(conf).run()
def injector():
    conf = cfg.ConfigOpts()
    conf.register_cli_opts([
        cfg.IntOpt("metrics", default=1, min=1),
        cfg.StrOpt("archive-policy-name", default="low"),
        cfg.StrOpt("creator", default="admin"),
        cfg.IntOpt("batch-of-measures", default=1000),
        cfg.IntOpt("measures-per-batch", default=10),
    ])
    conf = service.prepare_service(conf=conf)
    index = indexer.get_driver(conf)
    instore = incoming.get_driver(conf)

    def todo():
        metric = index.create_metric(
            uuid.uuid4(),
            creator=conf.creator,
            archive_policy_name=conf.archive_policy_name)

        for _ in six.moves.range(conf.batch_of_measures):
            measures = [
                incoming.Measure(
                    utils.dt_in_unix_ns(utils.utcnow()), random.random())
                for __ in six.moves.range(conf.measures_per_batch)]
            instore.add_measures(metric, measures)

    with futures.ThreadPoolExecutor(max_workers=conf.metrics) as executor:
        for m in six.moves.range(conf.metrics):
            executor.submit(todo)
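Both metricd() and injector() register options whose names contain dashes ("stop-after-processing-metrics", "batch-of-measures") and then read them back as underscore-separated attributes. That works because oslo.config derives the attribute name (the dest) by replacing dashes with underscores. A standalone sketch reusing one option name from the example, with the driver code omitted:

    from oslo_config import cfg

    conf = cfg.ConfigOpts()
    conf.register_cli_opts([
        cfg.IntOpt("measures-per-batch", default=10, min=1),
    ])

    # Dashes in the CLI option name become underscores on the conf object.
    conf(["--measures-per-batch", "25"])
    print(conf.measures_per_batch)  # -> 25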
def _register_opts(self):
    """Register options."""
    opts = []
    option = cfg.StrOpt(self.opt_name_str, default=None,
                        help=_('Valet API endpoint'))
    opts.append(option)
    option = cfg.IntOpt(self.opt_conn_timeout, default=3,
                        help=_('Valet Plugin Connect Timeout'))
    opts.append(option)
    option = cfg.IntOpt(self.opt_read_timeout, default=5,
                        help=_('Valet Plugin Read Timeout'))
    opts.append(option)
    opt_group = cfg.OptGroup(self.opt_group_str)
    cfg.CONF.register_group(opt_group)
    cfg.CONF.register_opts(opts, group=opt_group)

# TODO(JD): Keep stack param for now. We may need it again.
def __init__(self):
    super(KeystoneManager, self).__init__("KeystoneManager")
    self.config_opts = [
        cfg.StrOpt("auth_url",
                   help="the Keystone url (v3 only)",
                   required=True),
        cfg.StrOpt("username",
                   help="the name of user with admin role",
                   required=True),
        cfg.StrOpt("user_domain_name",
                   help="the user domain",
                   default="default",
                   required=False),
        cfg.StrOpt("password",
                   help="the password of user with admin role",
                   required=True),
        cfg.StrOpt("project_name",
                   help="the project to request authorization on",
                   required=True),
        cfg.StrOpt("project_domain_name",
                   help="the project domain",
                   default="default",
                   required=False),
        cfg.StrOpt("project_id",
                   help="the project id to request authorization on",
                   required=False),
        cfg.IntOpt("timeout",
                   help="set the http connection timeout",
                   default=60,
                   required=False),
        cfg.IntOpt("clock_skew",
                   help="set the clock skew (seconds)",
                   default=60,
                   required=False),
        cfg.StrOpt("ssl_ca_file",
                   help="set the PEM encoded Certificate Authority to "
                        "use when verifying HTTPs connections",
                   default=None,
                   required=False),
        cfg.StrOpt("ssl_cert_file",
                   help="set the SSL client certificate (PEM encoded)",
                   default=None,
                   required=False),
        cfg.StrOpt("amqp_url",
                   help="the amqp transport url",
                   default=None,
                   required=True),
        cfg.StrOpt("amqp_exchange",
                   help="the amqp exchange",
                   default="keystone",
                   required=False),
        cfg.StrOpt("amqp_topic",
                   help="the notification topic",
                   default="keystone_notification",
                   required=False)
    ]