Python click 模块,get_current_context() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用click.get_current_context()

项目:kubey    作者:bradrf    | 项目源码 | 文件源码
def ctl_each(obj, command, arguments):
    '''Invoke any kubectl command directly for each pod matched and collate the output.'''
    width, height = click.get_terminal_size()
    kubectl = obj.kubey.kubectl
    collector = tabular.RowCollector()
    ns_pods = defaultdict(list)
    for pod in obj.kubey.each_pod(obj.maximum):
        ns_pods[pod.namespace].append(pod)
    for ns, pods in ns_pods.items():
        args = ['-n', ns] + list(arguments) + [p.name for p in pods]
        kubectl.call_table_rows(collector.handler_for(ns), command, *args)
    kubectl.wait()
    if collector.rows:
        click.echo(tabular.tabulate(obj, sorted(collector.rows), collector.headers))
    if kubectl.final_rc != 0:
        click.get_current_context().exit(kubectl.final_rc)
项目:globus-cli    作者:globus    | 项目源码 | 文件源码
def whoami_command():
    """
    Executor for `globus whoami`
    """

    username = lookup_option(WHOAMI_USERNAME_OPTNAME)
    if not username:
        safeprint('No login information available. Please try '
                  'logging in again.', write_to_stderr=True)
        click.get_current_context().exit(1)

    fields = tuple((x, x) for x in ('Username', 'Name', 'ID', 'Email'))
    formatted_print({
            'Username': username,
            'Name': lookup_option(WHOAMI_NAME_OPTNAME),
            'ID': lookup_option(WHOAMI_ID_OPTNAME),
            'Email': lookup_option(WHOAMI_EMAIL_OPTNAME)
        }, fields=fields,
        text_format=(FORMAT_TEXT_RECORD if is_verbose() else FORMAT_TEXT_RAW),
        response_key=None if is_verbose() else 'Username')
项目:globus-cli    作者:globus    | 项目源码 | 文件源码
def shell_complete_option(f):
    def callback(ctx, param, value):
        if not value or ctx.resilient_parsing:
            return

        if value == 'BASH':
            do_bash_complete()
        elif value == 'ZSH':
            do_zsh_complete()
        else:
            raise ValueError('Unsupported shell completion')

        click.get_current_context().exit(0)

    f = click.option('--shell-complete', cls=HiddenOption,
                     is_eager=True, expose_value=False,
                     type=click.Choice(SUPPORTED_SHELLS),
                     callback=callback)(f)
    return f
项目:globus-cli    作者:globus    | 项目源码 | 文件源码
def autoactivate(client, endpoint_id, if_expires_in=None):
    """
    Attempts to auto-activate the given endpoint with the given client
    If auto-activation fails, parses the returned activation requirements
    to determine which methods of activation are supported, then tells
    the user to use 'globus endpoint activate' with the correct options(s)
    """
    kwargs = {}
    if if_expires_in is not None:
        kwargs['if_expires_in'] = if_expires_in

    res = client.endpoint_autoactivate(endpoint_id, **kwargs)
    if res["code"] == "AutoActivationFailed":

        message = ("The endpoint could not be auto-activated and must be "
                   "activated before it can be used.\n\n" +
                   activation_requirements_help_text(res, endpoint_id))

        safeprint(message, write_to_stderr=True)
        click.get_current_context().exit(1)

    else:
        return res
项目:twtxt    作者:buckket    | 项目源码 | 文件源码
def style_tweet(tweet, porcelain=False):
    conf = click.get_current_context().obj["conf"]
    limit = conf.character_limit

    if porcelain:
        return "{nick}\t{url}\t{tweet}".format(
            nick=tweet.source.nick,
            url=tweet.source.url,
            tweet=str(tweet))
    else:
        if sys.stdout.isatty() and not tweet.text.isprintable():
            return None
        styled_text = format_mentions(tweet.text)
        len_styling = len(styled_text) - len(click.unstyle(styled_text))
        final_text = textwrap.shorten(styled_text, limit + len_styling) if limit else styled_text
        timestamp = tweet.absolute_datetime if conf.use_abs_time else tweet.relative_datetime
        return "? {nick} ({time}):\n{tweet}".format(
            nick=click.style(tweet.source.nick, bold=True),
            tweet=final_text,
            time=click.style(timestamp, dim=True))
项目:twtxt    作者:buckket    | 项目源码 | 文件源码
def validate_text(ctx, param, value):
    conf = click.get_current_context().obj["conf"]
    if isinstance(value, tuple):
        value = " ".join(value)

    if not value and not sys.stdin.isatty():
        value = click.get_text_stream("stdin").read()

    if value:
        value = value.strip()
        if conf.character_warning and len(value) > conf.character_warning:
            click.confirm("? Warning: Tweet is longer than {0} characters. Are you sure?".format(
                conf.character_warning), abort=True)
        return value
    else:
        raise click.BadArgumentUsage("Text can’t be empty.")
项目:pipenv    作者:pypa    | 项目源码 | 文件源码
def get_code(shell=None, prog_name=None, env_name=None, extra_env=None):
    """Return the specified completion code"""
    from jinja2 import Template
    if shell in [None, 'auto']:
        shell = get_auto_shell()
    prog_name = prog_name or click.get_current_context().find_root().info_name
    env_name = env_name or '_%s_COMPLETE' % prog_name.upper().replace('-', '_')
    extra_env = extra_env if extra_env else {}
    if shell == 'fish':
        return Template(FISH_TEMPLATE).render(prog_name=prog_name, complete_var=env_name, extra_env=extra_env)
    elif shell == 'bash':
        return Template(BASH_COMPLETION_SCRIPT).render(prog_name=prog_name, complete_var=env_name, extra_env=extra_env)
    elif shell == 'zsh':
        return Template(ZSH_TEMPLATE).render(prog_name=prog_name, complete_var=env_name, extra_env=extra_env)
    elif shell == 'powershell':
        return Template(POWERSHELL_COMPLETION_SCRIPT).render(prog_name=prog_name, complete_var=env_name, extra_env=extra_env)
    else:
        raise click.ClickException('%s is not supported.' % shell)
项目:Sentry    作者:NetEaseGame    | 项目源码 | 文件源码
def configure():
    """
    Kick things off and configure all the things.

    A guess is made as to whether the entrypoint is coming from Click
    or from another invocation of `configure()`. If Click, we're able
    to pass along the Click context object.
    """
    from .settings import discover_configs, configure
    try:
        ctx = click.get_current_context()
    except RuntimeError:
        ctx = None
    _, py, yaml = discover_configs(ctx)

    # TODO(mattrobenolt): Surface this also as a CLI option?
    skip_backend_validation = 'SENTRY_SKIP_BACKEND_VALIDATION' in os.environ
    configure(ctx, py, yaml, skip_backend_validation)
项目:valohai-cli    作者:valohai    | 项目源码 | 文件源码
def _get_current_api_session():
    """
    Get an API session, either from the Click context cache, or a new one from the config.

    :return: API session
    :rtype: APISession
    """
    host, token = get_host_and_token()
    ctx = click.get_current_context(silent=True) or None
    cache_key = force_text('_api_session_%s_%s' % (host, token))
    session = (getattr(ctx, cache_key, None) if ctx else None)
    if not session:
        session = APISession(host, token)
        if ctx:
            setattr(ctx, cache_key, session)
    return session
项目:valohai-cli    作者:valohai    | 项目源码 | 文件源码
def lint(filenames):
    """
    Lint (syntax-check) a valohai.yaml file.

    The return code of this command will be the total number of errors found in all the files.
    """
    if not filenames:
        project = get_project()
        directory = (project.directory if project else get_project_directory())
        config_file = os.path.join(directory, 'valohai.yaml')
        if not os.path.exists(config_file):
            raise CLIException('There is no %s file. Pass in the names of configuration files to lint?' % config_file)
        filenames = [config_file]
    total_errors = 0
    for filename in filenames:
        total_errors += validate_file(filename)
    if total_errors:
        warn('There were %d total errors.' % total_errors)
    click.get_current_context().exit(total_errors)
项目:crane    作者:kiwicom    | 项目源码 | 文件源码
def main(**parsed_settings):
    click_context = click.get_current_context()
    click_context.color = True  # GitLab doesn't report terminal type correctly so we need to force it

    settings.update(parsed_settings)
    rancher.session.auth = settings['access_key'], settings['secret_key']
    deployment.load_from_settings(settings)

    hooks.dispatch('before_upgrade')

    try:
        upgrade()
    except Exception as ex:
        hooks.dispatch('after_upgrade_failure')
        if isinstance(ex, rancher.UpgradeFailed):
            return  # we handled it gracefully already
        raise
    else:
        hooks.dispatch('after_upgrade_success')
项目:kubey    作者:bradrf    | 项目源码 | 文件源码
def tail(obj, follow, prefix, number):
    '''Show recent logs from containers for each pod matched.

    NUMBER is a count of recent lines or a relative duration (e.g. 5s, 2m, 3h)
    '''

    kubectl = obj.kubey.kubectl

    if re.match(r'^\d+$', number):
        log_args = ['--tail', str(number)]
    else:
        log_args = ['--since', number]

    if follow:
        log_args.append('-f')

    for pod in obj.kubey.each_pod(obj.maximum):
        for container in pod.containers:
            args = ['-n', pod.namespace, '-c', container.name] + log_args + [pod.name]
            if prefix:
                prefix = '[%s:%s] ' % (pod.name, container.name)
                kubectl.call_prefix(prefix, 'logs', *args)
            else:
                kubectl.call_async('logs', *args)

    kubectl.wait()
    if kubectl.final_rc != 0:
        click.get_current_context().exit(kubectl.final_rc)
项目:shub-image    作者:scrapinghub    | 项目源码 | 文件源码
def debug_log(msg):
    ctx = click.get_current_context(True)
    if ctx and ctx.params.get('debug'):
        click.echo(msg)
项目:sphinxcontrib-versioning    作者:Robpol86    | 项目源码 | 文件源码
def from_context(cls):
        """Retrieve this class' instance from the current Click context.

        :return: Instance of this class.
        :rtype: Config
        """
        try:
            ctx = click.get_current_context()
        except RuntimeError:
            return cls()
        return ctx.find_object(cls)
项目:athena    作者:slint    | 项目源码 | 文件源码
def click_fail(msg):
    """Convenience failing function."""
    click.get_current_context().fail(msg)
项目:bay    作者:eventbrite    | 项目源码 | 文件源码
def invoke(self, command_name, **kwargs):
        """
        Runs a [sub]command by name, passing context automatically.
        """
        context = click.get_current_context()
        command = cli.get_command(context, command_name)
        context.invoke(command, **kwargs)
项目:dpm-py    作者:oki-archive    | 项目源码 | 文件源码
def configure():
    """
    Update configuration options. Configuration will be saved in ~/.dpm/conf or
    the file provided with --config option.
    """
    config.prompt_config(click.get_current_context().parent.params['config_path'])
项目:dpm-py    作者:oki-archive    | 项目源码 | 文件源码
def validate():
    """
    Validate datapackage in the current dir. Print validation errors if found.
    """
    client = click.get_current_context().meta['client']

    try:
        client.validate()
    except (ValidationError, dprclient.ResourceDoesNotExist) as e:
        echo('[ERROR] %s\n' % str(e))
        sys.exit(1)

    echo('datapackage.json is valid\n')
项目:dpm-py    作者:oki-archive    | 项目源码 | 文件源码
def purge():
    """
    Purge datapackage from the registry server.
    """
    client = click.get_current_context().meta['client']
    client.purge()
    echo('purge ok')
项目:dpm-py    作者:oki-archive    | 项目源码 | 文件源码
def delete():
    """
    Delete datapackage from the registry server.
    """
    client = click.get_current_context().meta['client']
    client.delete()
    echo('delete ok')
项目:dpm-py    作者:oki-archive    | 项目源码 | 文件源码
def undelete():
    """
    Undelete datapackage from the registry server.
    """
    client = click.get_current_context().meta['client']
    client.undelete()
    echo('undelete ok')
项目:dpm-py    作者:oki-archive    | 项目源码 | 文件源码
def tag(tag_string):
    """
    Tag datapackage on the server. Create new copy of the latest version of the
    datapackage on the server, and assign a tag.
    """
    client = click.get_current_context().meta['client']
    puburl = client.tag(tag_string)
    echo('Datapackage successfully tagged.')
项目:q2cli    作者:qiime2    | 项目源码 | 文件源码
def _parse_boolean(self, string):
        """Parse string representing a boolean into Python bool type.

        Supported values match `configparser.ConfigParser.getboolean`.

        """
        trues = ['1', 'yes', 'true', 'on']
        falses = ['0', 'no', 'false', 'off']

        string_lower = string.lower()
        if string_lower in trues:
            return True
        elif string_lower in falses:
            return False
        else:
            import itertools
            import click

            msg = (
                "Error: unrecognized value for --%s flag: %s\n"
                "Supported values (case-insensitive): %s" %
                (self.cli_name, string,
                 ', '.join(itertools.chain(trues, falses)))
            )
            click.secho(msg, err=True, fg='red', bold=True)
            ctx = click.get_current_context()
            ctx.exit(1)
项目:q2cli    作者:qiime2    | 项目源码 | 文件源码
def get_value(self, arguments, fallback=None):
        import os
        import os.path
        import click

        try:
            path = self._locate_value(arguments, fallback=fallback)

            # TODO: do we want a --force like flag?
            if os.path.exists(path):
                click.secho("Error: --%s directory already exists, won't "
                            "overwrite." % self.cli_name, err=True, fg='red',
                            bold=True)
                ctx = click.get_current_context()
                ctx.exit(1)

            os.makedirs(path)

            def fallback_(name, cli_name):
                return os.path.join(path, name)
            return fallback_

        except ValueNotFoundException:
            # Always fail to find a value as this handler doesn't exist.
            def fail(*_):
                raise ValueNotFoundException()

            return fail
项目:q2cli    作者:qiime2    | 项目源码 | 文件源码
def _error_with_duplicate_in_set(self, elements):
        import click
        import collections

        counter = collections.Counter(elements)
        dups = {name for name, count in counter.items() if count > 1}

        ctx = click.get_current_context()
        click.echo(ctx.get_usage() + '\n', err=True)
        click.secho("Error: Option --%s was given these values: %r more than "
                    "one time, values passed should be unique."
                    % (self.cli_name, dups), err=True, fg='red', bold=True)
        ctx.exit(1)
项目:q2cli    作者:qiime2    | 项目源码 | 文件源码
def handle_in_params(self, kwargs):
        import q2cli.handlers

        arguments = {}
        missing = []
        cmd_fallback = self.cmd_config_handler.get_value(kwargs)

        verbose = self.verbose_handler.get_value(kwargs, fallback=cmd_fallback)
        quiet = self.quiet_handler.get_value(kwargs, fallback=cmd_fallback)

        if verbose and quiet:
            click.secho('Unsure of how to be quiet and verbose at the '
                        'same time.', fg='red', bold=True, err=True)
            click.get_current_context().exit(1)

        for item in self.action['signature']:
            if item['type'] == 'input' or item['type'] == 'parameter':
                name = item['name']
                handler = self.generated_handlers[name]
                try:
                    arguments[name] = handler.get_value(
                        kwargs, fallback=cmd_fallback
                    )
                except q2cli.handlers.ValueNotFoundException:
                    missing += handler.missing

        return arguments, missing, verbose, quiet
项目:globus-cli    作者:globus    | 项目源码 | 文件源码
def outformat_is_json():
    """
    Only safe to call within a click context.
    """
    ctx = click.get_current_context()
    state = ctx.ensure_object(CommandState)
    return state.outformat_is_json()
项目:globus-cli    作者:globus    | 项目源码 | 文件源码
def outformat_is_text():
    """
    Only safe to call within a click context.
    """
    ctx = click.get_current_context()
    state = ctx.ensure_object(CommandState)
    return state.outformat_is_text()
项目:globus-cli    作者:globus    | 项目源码 | 文件源码
def get_jmespath_expression():
    """
    Only safe to call within a click context.
    """
    ctx = click.get_current_context()
    state = ctx.ensure_object(CommandState)
    return state.jmespath_expr
项目:globus-cli    作者:globus    | 项目源码 | 文件源码
def verbosity():
    """
    Only safe to call within a click context.
    """
    ctx = click.get_current_context()
    state = ctx.ensure_object(CommandState)
    return state.verbosity
项目:globus-cli    作者:globus    | 项目源码 | 文件源码
def is_verbose():
    """
    Only safe to call within a click context.
    """
    ctx = click.get_current_context()
    state = ctx.ensure_object(CommandState)
    return state.is_verbose()
项目:globus-cli    作者:globus    | 项目源码 | 文件源码
def task_pause_info(task_id):
    """
    Executor for `globus task pause-info`
    """
    client = get_client()
    res = client.task_pause_info(task_id)

    def _custom_text_format(res):
        explicit_pauses = [
            field for field in EXPLICIT_PAUSE_MSG_FIELDS
            # n.b. some keys are absent for completed tasks
            if res.get(field[1])
        ]
        effective_pause_rules = res['pause_rules']

        if not explicit_pauses and not effective_pause_rules:
            safeprint('Task {} is not paused.'.format(task_id))
            click.get_current_context().exit(0)

        if explicit_pauses:
            formatted_print(
                res, fields=explicit_pauses, text_format=FORMAT_TEXT_RECORD,
                text_preamble='This task has been explicitly paused.\n',
                text_epilog='\n' if effective_pause_rules else None)

        if effective_pause_rules:
            formatted_print(
                effective_pause_rules, fields=PAUSE_RULE_DISPLAY_FIELDS,
                text_preamble=(
                    'The following pause rules are effective on this task:\n'))

    formatted_print(res, text_format=_custom_text_format)
项目:globus-cli    作者:globus    | 项目源码 | 文件源码
def resolve_id_or_name(client, bookmark_id_or_name):
    # leading/trailing whitespace doesn't make sense for UUIDs and the Transfer
    # service outright forbids it for bookmark names, so we can strip it off
    bookmark_id_or_name = bookmark_id_or_name.strip()

    res = None
    try:
        UUID(bookmark_id_or_name)  # raises ValueError if argument not a UUID
    except ValueError:
        pass
    else:
        try:
            res = client.get_bookmark(bookmark_id_or_name.lower())
        except TransferAPIError as exception:
            if exception.code != 'BookmarkNotFound':
                raise

    if not res:  # non-UUID input or UUID not found; fallback to match by name
        try:
            # n.b. case matters to the Transfer service for bookmark names, so
            # two bookmarks can exist whose names vary only by their case
            res = next(bookmark_row for bookmark_row in client.bookmark_list()
                       if bookmark_row['name'] == bookmark_id_or_name)

        except StopIteration:
            safeprint(u'No bookmark found for "{}"'.format(
                      bookmark_id_or_name), write_to_stderr=True)
            click.get_current_context().exit(1)

    return res
项目:globus-cli    作者:globus    | 项目源码 | 文件源码
def do_bash_complete():
    comp_words, quoted = safe_split_line(os.environ['COMP_WORDS'])
    cur_index = int(os.environ['COMP_CWORD'])
    try:
        cur = comp_words[cur_index]
        completed_args = comp_words[1:-1]
    except IndexError:
        cur = None
        completed_args = comp_words[1:]

    choices = get_all_choices(completed_args, cur, quoted)

    safeprint('\t'.join(choices), newline=False)
    click.get_current_context().exit(0)
项目:globus-cli    作者:globus    | 项目源码 | 文件源码
def exit_with_mapped_status(http_status):
    """
    Given an HTTP Status, exit with either an error status of 1 or the
    status mapped by what we were given.
    """
    # get the mapping by looking up the state and getting the mapping attr
    mapping = (click.get_current_context().ensure_object(CommandState)
               .http_status_map)

    # if there is a mapped exit code, exit with that. Otherwise, exit 1
    if http_status in mapping:
        sys.exit(mapping[http_status])
    else:
        sys.exit(1)
项目:twtxt    作者:buckket    | 项目源码 | 文件源码
def generate_user_agent():
    try:
        version = pkg_resources.require("twtxt")[0].version
    except pkg_resources.DistributionNotFound:
        version = "unknown"

    conf = click.get_current_context().obj["conf"]
    if conf.disclose_identity and conf.nick and conf.twturl:
        user_agent = "twtxt/{version} (+{url}; @{nick})".format(
            version=version, url=conf.twturl, nick=conf.nick)
    else:
        user_agent = "twtxt/{version}".format(version=version)

    return {"User-Agent": user_agent}
项目:twtxt    作者:buckket    | 项目源码 | 文件源码
def get_source_by_url(url):
    conf = click.get_current_context().obj["conf"]
    if url == conf.twturl:
        return conf.source
    return next((source for source in conf.following if url == source.url), None)
项目:twtxt    作者:buckket    | 项目源码 | 文件源码
def get_source_by_name(nick):
    nick = nick.lower()
    conf = click.get_current_context().obj["conf"]
    if nick == conf.nick and conf.twturl:
        return conf.source
    return next((source for source in conf.following if nick == source.nick), None)
项目:histonets-cv    作者:sul-cidr    | 项目源码 | 文件源码
def main(rst=None):
    """Histonets computer vision application for image processing"""
    ctx = click.get_current_context()
    if rst:
        click.echo()
        comamnds_text = 'Commands'
        options_text = 'Options:'
        main_help, _ = main.get_help(ctx).split(comamnds_text, 1)
        click.echo(main_help)
        click.echo(comamnds_text)
        click.echo('-' * len(comamnds_text))
        click.echo()
        for command_name, command in sorted(main.commands.items()):
            click.echo(command_name)
            click.echo('~' * len(command_name))
            click.echo(command.get_usage(ctx)
                              .replace('histonets ', "histonets {} ".format(
                                       command_name))
                              .replace('\b\n', ''))
            click.echo()
            click.echo(command.help.replace('\b\n', ''))
            command_help = command.get_help(ctx)
            _, command_options_help = command_help.split(options_text, 1)
            command_options, _ = command_options_help.rsplit('--help', 1)
            click.echo()
            click.echo(options_text)
            click.echo(command_options)
            click.echo()
    elif ctx.invoked_subcommand is None:
        click.echo(main.get_help(ctx))
项目:histonets-cv    作者:sul-cidr    | 项目源码 | 文件源码
def pipeline(image, actions):
    """Allow chaining a series of actions to be applied to IMAGE.
    Output will depend on the last action applied.

    \b
    - ACTIONS is a JSON list of dictionaries containing each an 'action' key
      specifying the action to apply, a 'arguments' key which is a
      list of arguments, and a 'options' key with a dictionary to set the
      options for the corresponding action.

      Example::

        histonets pipeline '[{"action": "contrast", "options": {"value": 50}}]'
    """
    output = image.image
    for action in actions:
        ctx = click.get_current_context()
        arguments = [output] + action.get('arguments', [])
        command = main.get_command(ctx, action['action'])
        if command is None:
            raise click.BadParameter(
                "Action '{}' not found".format(action['action']))
        action_options = action.get('options', {})
        options = {param.name: action_options.get(param.name, param.default)
                   for param in command.params[:-2]}
        options['output'] = RAW
        try:
            output = command.callback(*arguments, **options)
        except TypeError as e:
            raise click.BadParameter(e)
    return output
项目:ndeftool    作者:nfcpy    | 项目源码 | 文件源码
def info(*args):
    if click.get_current_context().meta['output-logmsg'] != 'silent':
        click.secho(*args, err=True, fg='green')
项目:ndeftool    作者:nfcpy    | 项目源码 | 文件源码
def dmsg(*args):
    if click.get_current_context().meta['output-logmsg'] == 'debug':
        click.secho(*args, err=True, fg='yellow')
项目:databrewer    作者:rmax    | 项目源码 | 文件源码
def _fail(message, retcode=1):
    click.echo(message, sys.stderr)
    click.get_current_context().find_root().exit(retcode)
项目:databrewer    作者:rmax    | 项目源码 | 文件源码
def _echo(output, min_lines=10):
    ctx = click.get_current_context()
    if ctx.obj.get('use_pager') and output.count('\n') > min_lines:
        _func = click.echo_via_pager
    else:
        _func = click.echo
    _func(output, sys.stdout)
项目:kozinaki    作者:compunova    | 项目源码 | 文件源码
def cmd_callback(self, name):
        ctx = click.get_current_context()
        for node_name in name:
            node = node_manager.node_get(node_name=node_name)
            if node:
                response = node.command(cmd=ctx.command.name)
                for r in response:
                    print(r)
            else:
                click.secho('Node "{}" not found'.format(node_name), fg='red')


# CREATE PROVIDER NODES
项目:kozinaki    作者:compunova    | 项目源码 | 文件源码
def create_node_callback(self, name, compute_driver, **kwargs):
        with click_spinner.spinner():
            ctx = click.get_current_context()
            node_manager.node_create(node_name=name, node_type=ctx.command.name,
                                     compute_driver=compute_driver, **kwargs)
项目:anchore    作者:anchore    | 项目源码 | 文件源码
def init_nav_contexts():
    try:
        # use the obj from the current click context. This is a bit hacky, but works as long as this method is
        # invoked in an execution context of click
        anchore_config = click.get_current_context().obj
        nav = navigator.Navigator(anchore_config=anchore_config, imagelist=imagelist, allimages=contexts['anchore_allimages'])
        return nav
    except Exception as err:
        anchore_print_err("explore operation failed")
        success = False
        ecode = 1

    if not success:
        contexts['anchore_allimages'].clear()
        sys.exit(ecode)
项目:anchore    作者:anchore    | 项目源码 | 文件源码
def init_nav_contexts():
    try:
        # use the obj from the current click context. This is a bit hacky, but works as long as this method is
        # invoked in an execution context of click
        anchore_config = click.get_current_context().obj
        nav = navigator.Navigator(anchore_config=anchore_config, imagelist=imagelist, allimages=contexts['anchore_allimages'])
        return nav
    except Exception as err:
        anchore_print_err("explore operation failed")
        success = False
        ecode = 1

    if not success:
        contexts['anchore_allimages'].clear()
        sys.exit(ecode)
项目:Sentry    作者:NetEaseGame    | 项目源码 | 文件源码
def get_sentry_conf():
    """
    Fetch the SENTRY_CONF value, either from the click context
    if available, or SENTRY_CONF environment variable.
    """
    try:
        ctx = click.get_current_context()
        return ctx.obj['config']
    except (RuntimeError, KeyError):
        try:
            return os.environ['SENTRY_CONF']
        except KeyError:
            return '~/.sentry'
项目:logstapo    作者:ThiefMaster    | 项目源码 | 文件源码
def data(self):
        return click.get_current_context().params['config']