我们从 Python 开源项目中提取了以下 24 个代码示例，用于说明如何使用 oslo_config.cfg.ConfigOpts()。
def prepare_service(argv=None, config_files=None, share=False):
    """Build and parse a fresh ``ConfigOpts`` object for the 'panko' project.

    :param argv: full argument vector (element 0 is skipped when parsing);
                 ``sys.argv`` is used when this is None
    :param config_files: forwarded as ``default_config_files`` to the parser
    :param share: when true, the CORS defaults, lazy i18n, and the logging
                  option registration / setup steps are skipped
    :returns: the parsed ConfigOpts object
    """
    conf = cfg.ConfigOpts()
    for group_name, group_options in opts.list_opts():
        # Options listed under "DEFAULT" are registered without a group.
        target_group = group_name if group_name != "DEFAULT" else None
        conf.register_opts(list(group_options), group=target_group)
    db_options.set_defaults(conf)

    standalone = not share
    if standalone:
        defaults.set_cors_middleware_defaults()
        oslo_i18n.enable_lazy()
        log.register_options(conf)

    if argv is None:
        argv = sys.argv
    conf(
        argv[1:],
        project='panko',
        validate_default_values=True,
        version=version.version_info.version_string(),
        default_config_files=config_files,
    )

    if standalone:
        log.setup(conf, 'panko')

    # NOTE(liusheng): guru cannot run with service under apache daemon, so when
    # panko-api running with mod_wsgi, the argv is [], we don't start
    # guru.
    if argv:
        gmr.TextGuruMeditation.setup_autorun(version)
    return conf
def paste_deploy_app(paste_config_file, app_name, conf):
    """Load a WSGI app from a PasteDeploy configuration file.

    The paste factories are set up with ``conf`` before loading, so the
    supplied ConfigOpts object reaches the app and filter constructors, and
    they are always torn down again afterwards.

    :param paste_config_file: a PasteDeploy config file
    :param app_name: the name of the app/pipeline to load from the file
    :param conf: a ConfigOpts object to supply to the app and its filters
    :returns: the WSGI app
    """
    setup_paste_factories(conf)
    try:
        config_uri = "config:%s" % paste_config_file
        wsgi_app = deploy.loadapp(config_uri, name=app_name)
        return wsgi_app
    finally:
        # Runs whether loading succeeded or raised.
        teardown_paste_factories()
def change_sack_size():
    """Change the number of incoming-storage sacks to ``conf.sacks_number``.

    The change is refused (an error is logged and the function returns) when
    the current sack count cannot be detected or when the measures report
    shows a non-empty backlog.
    """
    conf = cfg.ConfigOpts()
    conf.register_cli_opts([_SACK_NUMBER_OPT])
    conf = service.prepare_service(conf=conf, log_to_std=True)
    driver = incoming.get_driver(conf)

    try:
        report = driver.measures_report(details=False)
    except incoming.SackDetectionError:
        LOG.error('Unable to detect the number of storage sacks.\n'
                  'Ensure gnocchi-upgrade has been executed.')
        return

    backlog = report['summary']['measures']
    if backlog:
        LOG.error('Cannot change sack when non-empty backlog. Process '
                  'remaining %s measures and try again', backlog)
        return

    LOG.info("Changing sack size to: %s", conf.sacks_number)
    # Remember the previous count before the settings are overwritten, so the
    # old sack group can be removed afterwards.
    previous_sack_count = driver.NUM_SACKS
    driver.set_storage_settings(conf.sacks_number)
    driver.remove_sack_group(previous_sack_count)
def setUp(self):
    """Create a parsed ConfigOpts on ``self.conf`` and patch the images dir."""
    super(TestCase, self).setUp()
    self.conf = cfg.ConfigOpts()
    config_files = self.get_default_config_files()
    common_config.parse(self.conf, [], default_config_files=config_files)

    # NOTE(jeffrey4l): mock the _get_image_dir method to return a fake
    # docker images dir
    fake_images_dir = os.path.join(TESTS_ROOT, 'docker')
    images_dir_patch = fixtures.MockPatch(
        'kolla.cmd.build.KollaWorker._get_images_dir',
        mock.Mock(return_value=fake_images_dir))
    self.useFixture(images_dir_patch)
def main():
    """Parse the kolla-build command line, then run the version comparison.

    After parsing, the local versions are retrieved, then the upstream
    versions, and finally the two are compared.
    """
    cli_args = sys.argv[1:]
    conf = cfg.ConfigOpts()
    common_config.parse(conf, cli_args, prog='kolla-build')

    retrieve_local_versions()
    retrieve_upstream_versions()
    compare_versions()
def __init__(self, worker_id):
    """Parse an empty command line and load the options onto this object.

    :param worker_id: accepted but not used by this constructor
    """
    parse_kwargs = {
        'project': 'gnocchi',
        'validate_default_values': True,
        'version': "0.1",
    }
    conf = cfg.ConfigOpts()
    conf([], **parse_kwargs)
    oslo_config_glue.load_options(self, conf)
def get_config():
    """Return a ConfigOpts parsed with ``--config-file castellan.conf``."""
    cli_args = ['--config-file', 'castellan.conf']
    config = cfg.ConfigOpts()
    config(cli_args)
    return config
def oslo_app():
    """Run a cotyledon ServiceManager hosting ``OsloService``.

    A ConfigOpts object is parsed from an empty command line and wired into
    the manager through ``oslo_config_glue.setup`` before the service is
    added and the manager is run.
    """
    conf = cfg.ConfigOpts()
    conf(
        [],
        project='openstack-app',
        validate_default_values=True,
        version="0.1",
    )

    manager = cotyledon.ServiceManager()
    oslo_config_glue.setup(manager, conf)
    manager.add(OsloService)
    manager.run()
def start(self, application, default_port):
    """Run a WSGI server with the given application.

    The configuration is not passed in: it is read from ``self.conf``, which
    must already be set on the instance.

    :param application: The application to run in the WSGI server
    :param default_port: Port to bind to if none is specified in conf
    """
    # Set eventlet's header line limit from the configured value before the
    # server is started.
    eventlet.wsgi.MAX_HEADER_LINE = self.conf.max_header_line
    self.application = application
    self.default_port = default_port
    self.configure_socket()
    self.start_wsgi()
def setup_paste_factories(conf):
    """Install the module-level paste app and filter factories.

    The factories are created at runtime, rather than at import time, so
    that a ConfigOpts object can be handed to the WSGI classes.

    :param conf: a ConfigOpts object
    """
    global app_factory, filter_factory

    # Both factories are built around the same ConfigOpts object.
    app_factory = AppFactory(conf)
    filter_factory = FilterFactory(conf)
def test_parser(argv):
    """Check that every parsed option equals the value given in ``argv``.

    For each known option that parsed to a truthy value, the element that
    follows ``--<option>`` in ``argv`` must equal the parsed value. The
    'isbare' value is converted to int in place in ``argv`` before comparing.

    :param argv: list of command-line arguments; may be mutated
    """
    conf = cfg.ConfigOpts()
    parse(conf, argv)

    option_names = ('dha', 'network', 'cluster', 'host', 'install',
                    'isbare', 'scenario')
    for option in option_names:
        if not conf[option]:
            continue
        # Position of the value that follows the "--<option>" flag.
        value_pos = argv.index('--' + option) + 1
        if option == 'isbare':
            argv[value_pos] = int(argv[value_pos])
        assert conf[option] == argv[value_pos]
def get_conf_from_deploy():
    """Print deployment parameters read from the DHA file as one line.

    The command line is parsed into a ConfigOpts object, the parameters are
    read with ``get_yml_para(conf['dha'])``, and a line of the form
    ``<hosts_num> <ip> <passwd> -s <size> -g <gateway>`` is written to
    standard output.
    """
    conf = cfg.ConfigOpts()
    parse(conf, sys.argv[1:])

    # The controller and compute node sizes are returned by get_yml_para but
    # are not part of the printed line.
    (daisyserver_size, _controller_node_size, _compute_node_size,
     daisy_passwd, daisy_ip, daisy_gateway,
     hosts_num) = get_yml_para(conf['dha'])

    # A single parenthesised argument prints identically under the Python 2
    # print statement and the Python 3 print function.
    print("{hosts_num} {ip} {passwd} -s {size} -g {gateway}".format(
        hosts_num=hosts_num,
        passwd=daisy_passwd,
        size=daisyserver_size,
        ip=daisy_ip,
        gateway=daisy_gateway))
def get_config_parser():
    """Return a ConfigOpts with the ``repo_root`` CLI option registered.

    The returned object has not been called, so nothing is parsed yet.
    """
    conf = cfg.ConfigOpts()
    repo_root_opt = cfg.StrOpt(
        'repo_root',
        default='.',
        help='directory containing the git repositories',
    )
    conf.register_cli_opt(repo_root_opt)
    return conf
def upgrade():
    """Upgrade the indexer, storage and incoming storage, then seed policies.

    Each upgrade step can be skipped with its ``--skip-*`` CLI flag. Unless
    ``--skip-archive-policies-creation`` is given, the default archive
    policies and the "default" rule are created when the indexer has neither
    archive policies nor archive policy rules.
    """
    conf = cfg.ConfigOpts()
    # Work on a copy so the shared option object keeps its own default.
    sack_number_opt = copy.copy(_SACK_NUMBER_OPT)
    sack_number_opt.default = 128
    conf.register_cli_opts([
        cfg.BoolOpt("skip-index", default=False,
                    help="Skip index upgrade."),
        cfg.BoolOpt("skip-storage", default=False,
                    help="Skip storage upgrade."),
        cfg.BoolOpt("skip-incoming", default=False,
                    help="Skip incoming storage upgrade."),
        cfg.BoolOpt("skip-archive-policies-creation", default=False,
                    help="Skip default archive policies creation."),
        sack_number_opt,
    ])
    conf = service.prepare_service(conf=conf, log_to_std=True)

    if not conf.skip_index:
        index = indexer.get_driver(conf)
        LOG.info("Upgrading indexer %s", index)
        index.upgrade()

    if not conf.skip_storage:
        # FIXME(jd) Pass None as coordinator because it's not needed in this
        # case. This will be removed when the storage will stop requiring a
        # coordinator object.
        s = storage.get_driver(conf, None)
        LOG.info("Upgrading storage %s", s)
        s.upgrade()

    if not conf.skip_incoming:
        i = incoming.get_driver(conf)
        LOG.info("Upgrading incoming storage %s", i)
        i.upgrade(conf.sacks_number)

    if not conf.skip_archive_policies_creation:
        # The indexer driver only exists so far if the index upgrade ran, so
        # obtain it here BEFORE querying it; otherwise `index` is unbound
        # when --skip-index is given.
        if conf.skip_index:
            index = indexer.get_driver(conf)
        if (not index.list_archive_policies()
                and not index.list_archive_policy_rules()):
            for ap in six.itervalues(archive_policy.DEFAULT_ARCHIVE_POLICIES):
                index.create_archive_policy(ap)
            index.create_archive_policy_rule("default", "*", "low")
def metricd():
    """Entry point for metricd.

    When ``--stop-after-processing-metrics`` is non-zero, ``metricd_tester``
    is called with the configuration; otherwise a ``MetricdServiceManager``
    is created and run.
    """
    conf = cfg.ConfigOpts()
    stop_after_opt = cfg.IntOpt(
        "stop-after-processing-metrics",
        default=0,
        min=0,
        help="Number of metrics to process without workers, "
             "for testing purpose")
    conf.register_cli_opts([stop_after_opt])
    conf = service.prepare_service(conf=conf)

    if not conf.stop_after_processing_metrics:
        MetricdServiceManager(conf).run()
    else:
        metricd_tester(conf)
def prepare_service(conf=None):
    """Prepare the service configuration and resolve the policy file path.

    :param conf: an existing ConfigOpts object, or None to create a new one
    :returns: the prepared ConfigOpts, whose ``oslo_policy.policy_file``
              default is set to the resolved path
    """
    if conf is None:
        conf = cfg.ConfigOpts()
    opts.set_defaults()
    policy_opts.set_defaults(conf)
    conf = service.prepare_service(conf=conf)

    policy_path = conf.oslo_policy.policy_file
    if not os.path.isabs(policy_path):
        policy_path = conf.find_file(policy_path)

    unusable = policy_path is None or not os.path.exists(policy_path)
    if unusable:
        # Fall back to ../rest/policy.json relative to this module.
        module_dir = os.path.dirname(__file__)
        policy_path = os.path.abspath(
            os.path.join(module_dir, '..', 'rest', 'policy.json'))

    conf.set_default('policy_file', policy_path, group='oslo_policy')
    return conf
def injector():
    """Create metrics and push random measures into incoming storage.

    ``--metrics`` tasks are submitted to a thread pool of the same size.
    Each task creates one metric, then adds ``--batch-of-measures`` batches
    of ``--measures-per-batch`` random-valued measures to it.
    """
    conf = cfg.ConfigOpts()
    conf.register_cli_opts([
        cfg.IntOpt("metrics", default=1, min=1),
        cfg.StrOpt("archive-policy-name", default="low"),
        cfg.StrOpt("creator", default="admin"),
        cfg.IntOpt("batch-of-measures", default=1000),
        cfg.IntOpt("measures-per-batch", default=10),
    ])
    conf = service.prepare_service(conf=conf)
    index = indexer.get_driver(conf)
    instore = incoming.get_driver(conf)

    def _inject_one_metric():
        # One metric per task; every batch is stored as soon as it is built.
        metric = index.create_metric(
            uuid.uuid4(),
            creator=conf.creator,
            archive_policy_name=conf.archive_policy_name)
        for _batch in six.moves.range(conf.batch_of_measures):
            batch = []
            for _sample in six.moves.range(conf.measures_per_batch):
                timestamp = utils.dt_in_unix_ns(utils.utcnow())
                batch.append(incoming.Measure(timestamp, random.random()))
            instore.add_measures(metric, batch)

    with futures.ThreadPoolExecutor(max_workers=conf.metrics) as executor:
        for _task in six.moves.range(conf.metrics):
            executor.submit(_inject_one_metric)
def make_config():
    """Return a new ConfigOpts with ``cli_opts`` and the log options registered.

    The returned object has not been called, so nothing is parsed yet.
    """
    config = cfg.ConfigOpts()
    config.register_cli_opts(cli_opts)
    log.register_options(config)
    return config
def __call__(self, args=None, default_config_files=None):
    """Parse ``args`` using fixed test values for prog, version and usage.

    :param args: command-line arguments forwarded to ``ConfigOpts.__call__``
    :param default_config_files: config files to load; None means no files
    :returns: whatever ``cfg.ConfigOpts.__call__`` returns
    """
    config_files = (
        [] if default_config_files is None else default_config_files)
    fixed_kwargs = {
        'prog': 'test',
        'version': '1.0',
        'usage': '%(prog)s FOO BAR',
        'validate_default_values': True,
    }
    return cfg.ConfigOpts.__call__(
        self, args=args, default_config_files=config_files, **fixed_kwargs)
def setUp(self):
    """Create a parsed ConfigOpts on ``self.conf`` and patch the images dir."""
    super(TestCase, self).setUp()
    self.conf = cfg.ConfigOpts()
    config_files = self.get_default_config_files()
    common_config.parse(self.conf, [], default_config_files=config_files)

    # NOTE(jeffrey4l): mock the _get_image_dir method to return a fake
    # docker images dir
    fake_images_dir = os.path.join(TESTS_ROOT, 'docker')
    images_dir_patch = fixtures.MockPatch(
        'kolla.image.build.KollaWorker._get_images_dir',
        mock.Mock(return_value=fake_images_dir))
    self.useFixture(images_dir_patch)
def main():
    """Entry point for kolla-build: generate Dockerfiles and build images.

    Returns early (with None) after generating the Dockerfiles when
    ``conf.template_only`` is set, or after saving the dependency
    information when ``conf.save_dependency`` is set. Otherwise the worker
    and push threads are started, both queues are joined, and the result of
    ``kolla.get_image_statuses()`` is returned.
    """
    conf = cfg.ConfigOpts()
    common_config.parse(conf, sys.argv[1:], prog='kolla-build')

    if conf.debug:
        LOG.setLevel(logging.DEBUG)

    kolla = KollaWorker(conf)
    kolla.setup_working_dir()
    kolla.find_dockerfiles()
    kolla.create_dockerfiles()

    if conf.template_only:
        LOG.info('Dockerfiles are generated in %s', kolla.working_dir)
        return

    # We set the atime and mtime to 0 epoch to allow the Docker cache
    # to work like we want. A different size or hash will still force a rebuild
    kolla.set_time()

    queue = kolla.build_queue()
    push_queue = six.moves.queue.Queue()

    if conf.save_dependency:
        kolla.save_dependency(conf.save_dependency)
        LOG.info('Docker images dependency is saved in %s',
                 conf.save_dependency)
        return

    # Start conf.threads build workers; each gets both queues.
    for x in six.moves.range(conf.threads):
        worker = WorkerThread(queue, push_queue, conf)
        worker.setDaemon(True)
        worker.start()

    # Start conf.push_threads push workers.
    # NOTE(review): unlike the build workers, these are not marked as daemon
    # threads here -- confirm that this is intended.
    for x in six.moves.range(conf.push_threads):
        push_thread = PushThread(conf, push_queue)
        push_thread.start()

    # block until queue is empty
    queue.join()
    push_queue.join()

    kolla.summary()
    kolla.cleanup()

    return kolla.get_image_statuses()
def get_socket(conf, default_port):
    '''Bind socket to bind ip:port in conf

    Binding is retried every 0.1 seconds for up to 30 seconds while the
    address is in use.

    :param conf: a cfg.ConfigOpts object
    :param default_port: port to bind to if none is specified in conf

    :returns : the listening socket returned by eventlet.listen. No SSL
               wrapping is done in this function; cert_file and key_file
               are only checked for being set together.
    :raises RuntimeError: if only one of cert_file / key_file is set, or if
                          the socket could not be bound within 30 seconds
    '''
    bind_addr = get_bind_addr(conf, default_port)

    # TODO(jaypipes): eventlet's greened socket module does not actually
    # support IPv6 in getaddrinfo(). We need to get around this in the
    # future or monitor upstream for a fix
    # Take the family of the first IPv4/IPv6 result; the trailing [0] raises
    # IndexError if getaddrinfo returns no such entry.
    address_family = [addr[0] for addr in socket.getaddrinfo(bind_addr[0],
                      bind_addr[1], socket.AF_UNSPEC, socket.SOCK_STREAM)
                      if addr[0] in (socket.AF_INET, socket.AF_INET6)][0]

    cert_file = conf.cert_file
    key_file = conf.key_file
    use_ssl = cert_file or key_file
    if use_ssl and (not cert_file or not key_file):
        raise RuntimeError(_("When running server in SSL mode, you must "
                             "specify both a cert_file and key_file "
                             "option value in your configuration file"))

    sock = None
    retry_until = time.time() + 30
    while not sock and time.time() < retry_until:
        try:
            sock = eventlet.listen(bind_addr, backlog=conf.backlog,
                                   family=address_family)
        except socket.error as err:
            # Only "address already in use" is retried; any other socket
            # error propagates immediately.
            if err.args[0] != errno.EADDRINUSE:
                raise
            eventlet.sleep(0.1)
    if not sock:
        raise RuntimeError(_("Could not bind to %(bind_addr)s after trying "
                             " 30 seconds") % {'bind_addr': bind_addr})
    return sock
def api():
    """Entry point for gnocchi-api: replace this process with a uwsgi server.

    API options are registered as CLI options without defaults, so they only
    take effect when given on the command line; otherwise the values from
    ``conf.api`` are used.

    :returns: 1 if the ``uwsgi`` executable cannot be found. On success
              ``os.execl`` replaces the current process.
    """
    # Compat with previous pbr script
    try:
        double_dash = sys.argv.index("--")
    except ValueError:
        double_dash = None
    else:
        sys.argv.pop(double_dash)

    conf = cfg.ConfigOpts()
    for opt in opts.API_OPTS:
        # NOTE(jd) Register the API options without a default, so they are only
        # used to override the one in the config file
        c = copy.copy(opt)
        c.default = None
        conf.register_cli_opt(c)
    conf = prepare_service(conf)

    if double_dash is not None:
        # NOTE(jd) Wait to this stage to log so we're sure the logging system
        # is in place
        LOG.warning(
            "No need to pass `--' in gnocchi-api command line anymore, "
            "please remove")

    uwsgi = spawn.find_executable("uwsgi")
    if not uwsgi:
        LOG.error("Unable to find `uwsgi'.\n"
                  "Be sure it is installed and in $PATH.")
        return 1

    workers = utils.get_default_workers()
    # math.floor() returns a float on Python 2, which would render as "N.0";
    # force an int so the process count is a plain integer on both versions.
    processes = int(math.floor(workers * 1.5))

    args = [
        "--if-not-plugin", "python", "--plugin", "python", "--endif",
        "--http-socket", "%s:%d" % (
            conf.host or conf.api.host,
            conf.port or conf.api.port),
        "--master",
        "--enable-threads",
        "--die-on-term",
        # NOTE(jd) See https://github.com/gnocchixyz/gnocchi/issues/156
        "--add-header", "Connection: close",
        "--processes", str(processes),
        "--threads", str(workers),
        "--lazy-apps",
        "--chdir", "/",
        "--wsgi", "gnocchi.rest.wsgi",
        "--pyargv", " ".join(sys.argv[1:]),
    ]

    virtual_env = os.getenv("VIRTUAL_ENV")
    if virtual_env is not None:
        # Reuse the value already read instead of querying the environment
        # a second time.
        args.extend(["-H", virtual_env])

    return os.execl(uwsgi, uwsgi, *args)