Python oslo_config.cfg 模块,ConfigOpts() 实例源码

我们从Python开源项目中,提取了以下24个代码示例,用于说明如何使用oslo_config.cfg.ConfigOpts()

项目:panko    作者:openstack    | 项目源码 | 文件源码
def prepare_service(argv=None, config_files=None, share=False):
    """Build, populate and parse the panko configuration object.

    :param argv: full command line (program name first); ``sys.argv`` is
                 used when this is ``None``
    :param config_files: passed as ``default_config_files`` when parsing
    :param share: when true, skip the CORS defaults, lazy i18n and the log
                  option registration/setup performed here
    :returns: the parsed ``cfg.ConfigOpts`` instance
    """
    conf = cfg.ConfigOpts()
    for section, section_opts in opts.list_opts():
        # Options listed under "DEFAULT" are registered without a group.
        target_group = section
        if section == "DEFAULT":
            target_group = None
        conf.register_opts(list(section_opts), group=target_group)
    db_options.set_defaults(conf)

    if not share:
        defaults.set_cors_middleware_defaults()
        oslo_i18n.enable_lazy()
        log.register_options(conf)

    cli = sys.argv if argv is None else argv
    conf(cli[1:],
         project='panko',
         validate_default_values=True,
         version=version.version_info.version_string(),
         default_config_files=config_files)

    if not share:
        log.setup(conf, 'panko')

    # NOTE(liusheng): guru cannot run with service under apache daemon, so
    # when panko-api is running with mod_wsgi the argv is [] and guru is
    # not started.
    if cli:
        gmr.TextGuruMeditation.setup_autorun(version)
    return conf
项目:bilean    作者:openstack    | 项目源码 | 文件源码
def paste_deploy_app(paste_config_file, app_name, conf):
    """Load a WSGI app from a PasteDeploy configuration.

    The paste factories are set up with the supplied ConfigOpts object
    before ``deploy.loadapp()`` runs and are always torn down afterwards,
    whether or not loading succeeds.

    :param paste_config_file: a PasteDeploy config file
    :param app_name: the name of the app/pipeline to load from the file
    :param conf: a ConfigOpts object to supply to the app and its filters
    :returns: the WSGI app
    """
    setup_paste_factories(conf)
    try:
        config_uri = "config:%s" % paste_config_file
        wsgi_app = deploy.loadapp(config_uri, name=app_name)
    finally:
        teardown_paste_factories()
    return wsgi_app
项目:gnocchi    作者:gnocchixyz    | 项目源码 | 文件源码
def change_sack_size():
    """Change the number of sacks used by the incoming storage driver.

    Nothing is changed (an error is logged instead) when the current sack
    count cannot be detected or when the measures report shows a non-empty
    backlog.
    """
    conf = cfg.ConfigOpts()
    conf.register_cli_opts([_SACK_NUMBER_OPT])
    conf = service.prepare_service(conf=conf, log_to_std=True)
    driver = incoming.get_driver(conf)
    try:
        report = driver.measures_report(details=False)
    except incoming.SackDetectionError:
        LOG.error('Unable to detect the number of storage sacks.\n'
                  'Ensure gnocchi-upgrade has been executed.')
        return

    backlog = report['summary']['measures']
    if backlog:
        LOG.error('Cannot change sack when non-empty backlog. Process '
                  'remaining %s measures and try again', backlog)
        return

    LOG.info("Changing sack size to: %s", conf.sacks_number)
    # Remember the previous count before the new settings are stored so the
    # old sack group can be removed afterwards.
    previous_sack_count = driver.NUM_SACKS
    driver.set_storage_settings(conf.sacks_number)
    driver.remove_sack_group(previous_sack_count)
项目:kolla-kubernetes-personal    作者:rthallisey    | 项目源码 | 文件源码
def setUp(self):
    """Parse the default config files and patch the images directory."""
    super(TestCase, self).setUp()
    self.conf = cfg.ConfigOpts()
    config_files = self.get_default_config_files()
    common_config.parse(self.conf, [], default_config_files=config_files)

    # NOTE(jeffrey4l): mock the _get_images_dir method to return a fake
    # docker images dir
    fake_images_dir = os.path.join(TESTS_ROOT, 'docker')
    images_dir_mock = mock.Mock(return_value=fake_images_dir)
    self.useFixture(fixtures.MockPatch(
        'kolla.cmd.build.KollaWorker._get_images_dir', images_dir_mock))
项目:kolla-kubernetes-personal    作者:rthallisey    | 项目源码 | 文件源码
def main():
    """Parse the kolla-build options, then retrieve and compare versions."""
    cli_args = sys.argv[1:]
    config = cfg.ConfigOpts()
    common_config.parse(config, cli_args, prog='kolla-build')

    # Both version sets are retrieved before the comparison runs.
    retrieve_local_versions()
    retrieve_upstream_versions()
    compare_versions()
项目:deb-python-cotyledon    作者:openstack    | 项目源码 | 文件源码
def __init__(self, worker_id):
    """Parse an empty command line and pass the result to the config glue.

    :param worker_id: accepted but not used by this initializer
    """
    conf = cfg.ConfigOpts()
    parse_kwargs = {
        'project': 'gnocchi',
        'validate_default_values': True,
        'version': "0.1",
    }
    conf([], **parse_kwargs)
    oslo_config_glue.load_options(self, conf)
项目:hide-your-tweets    作者:diazjf    | 项目源码 | 文件源码
def get_config():
    """Return a ``ConfigOpts`` parsed with ``--config-file castellan.conf``."""
    cli_args = ['--config-file', 'castellan.conf']
    parsed = cfg.ConfigOpts()
    parsed(cli_args)
    return parsed
项目:cotyledon    作者:sileht    | 项目源码 | 文件源码
def oslo_app():
    """Run ``OsloService`` under a cotyledon ServiceManager.

    The manager is wired to a ``ConfigOpts`` parsed from an empty command
    line via ``oslo_config_glue.setup`` before the service is added.
    """
    conf = cfg.ConfigOpts()
    parse_kwargs = {
        'project': 'openstack-app',
        'validate_default_values': True,
        'version': "0.1",
    }
    conf([], **parse_kwargs)

    manager = cotyledon.ServiceManager()
    oslo_config_glue.setup(manager, conf)
    manager.add(OsloService)
    manager.run()
项目:bilean    作者:openstack    | 项目源码 | 文件源码
def start(self, application, default_port):
    """Run a WSGI server with the given application.

    The configuration is read from ``self.conf``; it is not a parameter.

    :param application: The application to run in the WSGI server
    :param default_port: Port to bind to if none is specified in conf
    """
    # Apply the configured header-line limit to eventlet's WSGI module
    # before the server is started.
    eventlet.wsgi.MAX_HEADER_LINE = self.conf.max_header_line
    self.application, self.default_port = application, default_port
    self.configure_socket()
    self.start_wsgi()
项目:bilean    作者:openstack    | 项目源码 | 文件源码
def setup_paste_factories(conf):
    """Create the module-level paste app and filter factories.

    The factories are built at runtime, rather than at import time, so that
    a ConfigOpts object can be handed to the WSGI classes.

    :param conf: a ConfigOpts object
    """
    global app_factory
    global filter_factory

    # Each factory is bound to the same configuration object.
    app_factory = AppFactory(conf)
    filter_factory = FilterFactory(conf)
项目:daisy    作者:opnfv    | 项目源码 | 文件源码
def test_parser(argv):
    """Check that parsed option values match the ones given in ``argv``.

    :param argv: command-line argument list; the value following
                 ``--isbare`` is converted to ``int`` in place
    """
    options_keys = [
        'dha', 'network', 'cluster', 'host', 'install', 'isbare', 'scenario',
    ]

    conf = cfg.ConfigOpts()
    parse(conf, argv)
    for option in options_keys:
        if not conf[option]:
            continue
        # The option's value is the argument right after its "--<name>" flag.
        value_pos = argv.index('--' + option) + 1
        if option == 'isbare':
            argv[value_pos] = int(argv[value_pos])
        assert conf[option] == argv[value_pos]
项目:daisy    作者:opnfv    | 项目源码 | 文件源码
def get_conf_from_deploy():
    """Print the deployment parameters read from the configured DHA file.

    Parses the command line into a fresh ``ConfigOpts``, passes
    ``conf['dha']`` to ``get_yml_para()`` and prints the host count, IP,
    password, server size and gateway on a single line.
    """
    conf = cfg.ConfigOpts()
    parse(conf, sys.argv[1:])
    # get_yml_para() is unpacked into seven values; the two node sizes are
    # not part of the printed line, hence the underscore-prefixed names.
    (daisyserver_size, _controller_node_size, _compute_node_size,
     daisy_passwd, daisy_ip, daisy_gateway,
     hosts_num) = get_yml_para(conf['dha'])
    # The parenthesised single-argument form is valid both as a Python 2
    # print statement and as a Python 3 print() call.
    print("{hosts_num} {ip} {passwd} -s {size} -g {gateway}".format(
        hosts_num=hosts_num,
        passwd=daisy_passwd,
        size=daisyserver_size,
        ip=daisy_ip,
        gateway=daisy_gateway))
项目:deb-oslotest    作者:openstack    | 项目源码 | 文件源码
def get_config_parser():
    """Return a ``ConfigOpts`` with the ``repo_root`` CLI option registered."""
    repo_root_opt = cfg.StrOpt(
        'repo_root',
        default='.',
        help='directory containing the git repositories',
    )
    parser = cfg.ConfigOpts()
    parser.register_cli_opt(repo_root_opt)
    return parser
项目:gnocchi    作者:gnocchixyz    | 项目源码 | 文件源码
def upgrade():
    """Upgrade indexer, storage and incoming storage, then seed policies.

    Each upgrade step can be disabled with its ``--skip-*`` CLI option.
    Unless ``--skip-archive-policies-creation`` is given, the default
    archive policies and the default rule are created when the indexer
    lists neither archive policies nor archive policy rules.
    """
    conf = cfg.ConfigOpts()
    # Work on a copy so the shared option object keeps its own default.
    sack_number_opt = copy.copy(_SACK_NUMBER_OPT)
    sack_number_opt.default = 128
    conf.register_cli_opts([
        cfg.BoolOpt("skip-index", default=False,
                    help="Skip index upgrade."),
        cfg.BoolOpt("skip-storage", default=False,
                    help="Skip storage upgrade."),
        cfg.BoolOpt("skip-incoming", default=False,
                    help="Skip incoming storage upgrade."),
        cfg.BoolOpt("skip-archive-policies-creation", default=False,
                    help="Skip default archive policies creation."),
        sack_number_opt,
    ])
    conf = service.prepare_service(conf=conf, log_to_std=True)

    index = None
    if not conf.skip_index:
        index = indexer.get_driver(conf)
        LOG.info("Upgrading indexer %s", index)
        index.upgrade()
    if not conf.skip_storage:
        # FIXME(jd) Pass None as coordinator because it's not needed in this
        # case. This will be removed when the storage will stop requiring a
        # coordinator object.
        s = storage.get_driver(conf, None)
        LOG.info("Upgrading storage %s", s)
        s.upgrade()
    if not conf.skip_incoming:
        i = incoming.get_driver(conf)
        LOG.info("Upgrading incoming storage %s", i)
        i.upgrade(conf.sacks_number)

    if conf.skip_archive_policies_creation:
        return
    # The indexer driver must exist before it is queried: when the index
    # upgrade was skipped no driver has been obtained yet, and querying it
    # first used to fail with an unbound local variable.
    if index is None:
        index = indexer.get_driver(conf)
    if index.list_archive_policies() or index.list_archive_policy_rules():
        return
    for ap in six.itervalues(archive_policy.DEFAULT_ARCHIVE_POLICIES):
        index.create_archive_policy(ap)
    index.create_archive_policy_rule("default", "*", "low")
项目:gnocchi    作者:gnocchixyz    | 项目源码 | 文件源码
def metricd():
    """Entry point for metricd.

    Runs ``metricd_tester`` when ``--stop-after-processing-metrics`` is
    non-zero; otherwise runs the ``MetricdServiceManager``.
    """
    stop_after_opt = cfg.IntOpt(
        "stop-after-processing-metrics",
        default=0,
        min=0,
        help="Number of metrics to process without workers, "
        "for testing purpose")
    conf = cfg.ConfigOpts()
    conf.register_cli_opts([stop_after_opt])
    conf = service.prepare_service(conf=conf)

    if conf.stop_after_processing_metrics:
        metricd_tester(conf)
        return
    MetricdServiceManager(conf).run()
项目:gnocchi    作者:gnocchixyz    | 项目源码 | 文件源码
def prepare_service(conf=None):
    """Prepare the service configuration and resolve the policy file path.

    :param conf: an existing ``cfg.ConfigOpts``; a new one is created when
                 this is ``None``
    :returns: the configuration returned by ``service.prepare_service``
              with the ``oslo_policy`` ``policy_file`` default set to the
              resolved path
    """
    if conf is None:
        conf = cfg.ConfigOpts()

    opts.set_defaults()
    policy_opts.set_defaults(conf)
    conf = service.prepare_service(conf=conf)

    policy_path = conf.oslo_policy.policy_file
    if not os.path.isabs(policy_path):
        # The lookup result may be None; that case is handled just below.
        policy_path = conf.find_file(policy_path)

    usable = policy_path is not None and os.path.exists(policy_path)
    if not usable:
        # Fall back to the policy.json located relative to this module.
        module_dir = os.path.dirname(__file__)
        policy_path = os.path.abspath(
            os.path.join(module_dir, '..', 'rest', 'policy.json'))

    conf.set_default('policy_file', policy_path, group='oslo_policy')
    return conf
项目:gnocchi    作者:gnocchixyz    | 项目源码 | 文件源码
def injector():
    """Create metrics and push random measures into incoming storage.

    ``--metrics`` tasks are submitted to a thread pool of the same size.
    Each task creates one metric and adds ``--batch-of-measures`` batches
    of ``--measures-per-batch`` measures with random values to it.
    """
    conf = cfg.ConfigOpts()
    conf.register_cli_opts([
        cfg.IntOpt("metrics", default=1, min=1),
        cfg.StrOpt("archive-policy-name", default="low"),
        cfg.StrOpt("creator", default="admin"),
        cfg.IntOpt("batch-of-measures", default=1000),
        cfg.IntOpt("measures-per-batch", default=10),
    ])
    conf = service.prepare_service(conf=conf)
    index = indexer.get_driver(conf)
    instore = incoming.get_driver(conf)

    def inject_one_metric():
        # One task: a new metric with a random id, filled batch by batch.
        metric = index.create_metric(
            uuid.uuid4(),
            creator=conf.creator,
            archive_policy_name=conf.archive_policy_name)

        for _batch_no in six.moves.range(conf.batch_of_measures):
            batch = []
            for _measure_no in six.moves.range(conf.measures_per_batch):
                timestamp = utils.dt_in_unix_ns(utils.utcnow())
                batch.append(incoming.Measure(timestamp, random.random()))
            instore.add_measures(metric, batch)

    with futures.ThreadPoolExecutor(max_workers=conf.metrics) as executor:
        for _task_no in six.moves.range(conf.metrics):
            executor.submit(inject_one_metric)
项目:bareon-allocator    作者:openstack    | 项目源码 | 文件源码
def make_config():
    """Return a new ``ConfigOpts`` with CLI and logging options registered."""
    config = cfg.ConfigOpts()
    # The CLI options are registered before the logging options.
    config.register_cli_opts(cli_opts)
    log.register_options(config)
    return config
项目:masakari    作者:openstack    | 项目源码 | 文件源码
def __call__(self, args=None, default_config_files=None):
    """Parse ``args`` with fixed prog, version and usage values.

    :param args: forwarded unchanged to ``cfg.ConfigOpts.__call__``
    :param default_config_files: config files to load; an empty list is
                                 used when this is ``None``
    :returns: whatever ``cfg.ConfigOpts.__call__`` returns
    """
    call_kwargs = {
        'args': args,
        'prog': 'test',
        'version': '1.0',
        'usage': '%(prog)s FOO BAR',
        'default_config_files': (
            [] if default_config_files is None else default_config_files),
        'validate_default_values': True,
    }
    return cfg.ConfigOpts.__call__(self, **call_kwargs)
项目:kolla    作者:sieve-microservices    | 项目源码 | 文件源码
def setUp(self):
    """Parse the default config files and patch the images directory."""
    super(TestCase, self).setUp()
    self.conf = cfg.ConfigOpts()
    config_files = self.get_default_config_files()
    common_config.parse(self.conf, [], default_config_files=config_files)

    # NOTE(jeffrey4l): mock the _get_images_dir method to return a fake
    # docker images dir
    fake_images_dir = os.path.join(TESTS_ROOT, 'docker')
    images_dir_mock = mock.Mock(return_value=fake_images_dir)
    self.useFixture(fixtures.MockPatch(
        'kolla.image.build.KollaWorker._get_images_dir', images_dir_mock))
项目:kolla-kubernetes-personal    作者:rthallisey    | 项目源码 | 文件源码
def main():
    """Build the images described by the kolla-build configuration.

    :returns: ``None`` when only the Dockerfiles are generated or only the
              dependency information is saved; otherwise the value of
              ``KollaWorker.get_image_statuses()``
    """
    conf = cfg.ConfigOpts()
    common_config.parse(conf, sys.argv[1:], prog='kolla-build')

    if conf.debug:
        LOG.setLevel(logging.DEBUG)

    builder = KollaWorker(conf)
    builder.setup_working_dir()
    builder.find_dockerfiles()
    builder.create_dockerfiles()

    if conf.template_only:
        LOG.info('Dockerfiles are generated in %s', builder.working_dir)
        return None

    # We set the atime and mtime to 0 epoch to allow the Docker cache to
    # work like we want. A different size or hash will still force a
    # rebuild.
    builder.set_time()

    build_queue = builder.build_queue()

    if conf.save_dependency:
        builder.save_dependency(conf.save_dependency)
        LOG.info('Docker images dependency is saved in %s',
                 conf.save_dependency)
        return None

    push_queue = six.moves.queue.Queue()

    for _worker_no in six.moves.range(conf.threads):
        build_thread = WorkerThread(build_queue, push_queue, conf)
        build_thread.setDaemon(True)
        build_thread.start()

    for _pusher_no in six.moves.range(conf.push_threads):
        PushThread(conf, push_queue).start()

    # Block until both queues have been fully processed.
    build_queue.join()
    push_queue.join()

    builder.summary()
    builder.cleanup()

    return builder.get_image_statuses()
项目:kolla-kubernetes-personal    作者:rthallisey    | 项目源码 | 文件源码
def main():
    """Run a kolla-build: generate Dockerfiles, then build and push images.

    :returns: ``None`` when the run stops after generating the Dockerfiles
              or after saving the dependency information; otherwise the
              value of ``KollaWorker.get_image_statuses()``
    """
    conf = cfg.ConfigOpts()
    common_config.parse(conf, sys.argv[1:], prog='kolla-build')

    if conf.debug:
        LOG.setLevel(logging.DEBUG)

    kolla_worker = KollaWorker(conf)
    kolla_worker.setup_working_dir()
    kolla_worker.find_dockerfiles()
    kolla_worker.create_dockerfiles()

    if conf.template_only:
        LOG.info('Dockerfiles are generated in %s', kolla_worker.working_dir)
        return None

    # We set the atime and mtime to 0 epoch to allow the Docker cache to
    # work like we want. A different size or hash will still force a
    # rebuild.
    kolla_worker.set_time()

    pending_builds = kolla_worker.build_queue()

    if conf.save_dependency:
        kolla_worker.save_dependency(conf.save_dependency)
        LOG.info('Docker images dependency is saved in %s',
                 conf.save_dependency)
        return None

    pending_pushes = six.moves.queue.Queue()

    for _ in six.moves.range(conf.threads):
        build_thread = WorkerThread(pending_builds, pending_pushes, conf)
        build_thread.setDaemon(True)
        build_thread.start()

    for _ in six.moves.range(conf.push_threads):
        PushThread(conf, pending_pushes).start()

    # Block until both queues have been fully processed.
    pending_builds.join()
    pending_pushes.join()

    kolla_worker.summary()
    kolla_worker.cleanup()

    return kolla_worker.get_image_statuses()
项目:bilean    作者:openstack    | 项目源码 | 文件源码
def get_socket(conf, default_port):
    """Bind a listening socket to the ip:port given by conf.

    Binding is retried for up to 30 seconds while the address is in use.
    ``cert_file`` and ``key_file`` are only validated here (both or neither
    must be set); this function does not wrap the socket for SSL itself.

    :param conf: a cfg.ConfigOpts object
    :param default_port: port to bind to if none is specified in conf

    :returns: the socket object returned by ``eventlet.listen``
    :raises RuntimeError: if only one of ``cert_file``/``key_file`` is set,
                          or if the socket could not be bound in time
    """
    bind_addr = get_bind_addr(conf, default_port)

    # TODO(jaypipes): eventlet's greened socket module does not actually
    # support IPv6 in getaddrinfo(). We need to get around this in the
    # future or monitor upstream for a fix
    addr_infos = socket.getaddrinfo(bind_addr[0], bind_addr[1],
                                    socket.AF_UNSPEC, socket.SOCK_STREAM)
    usable_families = [info[0] for info in addr_infos
                       if info[0] in (socket.AF_INET, socket.AF_INET6)]
    address_family = usable_families[0]

    cert_file = conf.cert_file
    key_file = conf.key_file
    # SSL needs both files; having exactly one of them is a config error.
    if (cert_file or key_file) and not (cert_file and key_file):
        raise RuntimeError(_("When running server in SSL mode, you must "
                             "specify both a cert_file and key_file "
                             "option value in your configuration file"))

    sock = None
    deadline = time.time() + 30
    while not sock and time.time() < deadline:
        try:
            sock = eventlet.listen(bind_addr, backlog=conf.backlog,
                                   family=address_family)
        except socket.error as err:
            # Only "address already in use" is retried; anything else is
            # propagated to the caller.
            if err.args[0] == errno.EADDRINUSE:
                eventlet.sleep(0.1)
            else:
                raise

    if not sock:
        raise RuntimeError(_("Could not bind to %(bind_addr)s after trying "
                             " 30 seconds") % {'bind_addr': bind_addr})
    return sock
项目:gnocchi    作者:gnocchixyz    | 项目源码 | 文件源码
def api():
    """Exec ``uwsgi`` to serve the gnocchi REST API.

    The API options are registered as CLI options without defaults, so a
    value given on the command line overrides the one from the config file
    and the ``conf.api`` values are used otherwise.

    :returns: 1 when the ``uwsgi`` executable cannot be found; otherwise
              the result of ``os.execl``
    """
    # Compat with previous pbr script
    try:
        double_dash = sys.argv.index("--")
    except ValueError:
        double_dash = None
    else:
        sys.argv.pop(double_dash)

    conf = cfg.ConfigOpts()
    for opt in opts.API_OPTS:
        # NOTE(jd) Register the API options without a default, so they are only
        # used to override the one in the config file
        c = copy.copy(opt)
        c.default = None
        conf.register_cli_opt(c)
    conf = prepare_service(conf)

    if double_dash is not None:
        # NOTE(jd) Wait to this stage to log so we're sure the logging system
        # is in place
        LOG.warning(
            "No need to pass `--' in gnocchi-api command line anymore, "
            "please remove")

    uwsgi = spawn.find_executable("uwsgi")
    if not uwsgi:
        LOG.error("Unable to find `uwsgi'.\n"
                  "Be sure it is installed and in $PATH.")
        return 1

    workers = utils.get_default_workers()
    # math.floor() returns a float on Python 2, which would render as e.g.
    # "3.0"; convert to int so the process count is always an integer string.
    processes = int(math.floor(workers * 1.5))

    args = [
        "--if-not-plugin", "python", "--plugin", "python", "--endif",
        "--http-socket", "%s:%d" % (conf.host or conf.api.host,
                                    conf.port or conf.api.port),
        "--master",
        "--enable-threads",
        "--die-on-term",
        # NOTE(jd) See https://github.com/gnocchixyz/gnocchi/issues/156
        "--add-header", "Connection: close",
        "--processes", str(processes),
        "--threads", str(workers),
        "--lazy-apps",
        "--chdir", "/",
        "--wsgi", "gnocchi.rest.wsgi",
        "--pyargv", " ".join(sys.argv[1:]),
    ]

    virtual_env = os.getenv("VIRTUAL_ENV")
    if virtual_env is not None:
        # Reuse the value already read instead of querying the environment
        # a second time.
        args.extend(["-H", virtual_env])

    return os.execl(uwsgi, uwsgi, *args)