我们从Python开源项目中，提取了以下50个代码示例，用于说明如何使用click.get_current_context()。
def ctl_each(obj, command, arguments):
    '''Invoke any kubectl command directly for each pod matched and collate the output.'''
    # The docstring above is kept verbatim: it may be surfaced as CLI help.
    kubectl = obj.kubey.kubectl
    collector = tabular.RowCollector()

    # Bucket the matched pods by namespace; one kubectl call is issued per
    # namespace with all of that namespace's pod names appended.
    ns_pods = defaultdict(list)
    for pod in obj.kubey.each_pod(obj.maximum):
        ns_pods[pod.namespace].append(pod)

    for ns, pods in ns_pods.items():
        args = ['-n', ns] + list(arguments) + [p.name for p in pods]
        kubectl.call_table_rows(collector.handler_for(ns), command, *args)
    kubectl.wait()

    # Only print a table when at least one row was collected.
    if collector.rows:
        click.echo(tabular.tabulate(obj, sorted(collector.rows), collector.headers))

    # Propagate a non-zero kubectl result as this command's exit status.
    if kubectl.final_rc != 0:
        click.get_current_context().exit(kubectl.final_rc)
def whoami_command():
    """
    Executor for `globus whoami`
    """
    login_name = lookup_option(WHOAMI_USERNAME_OPTNAME)

    # No stored username: report on stderr and exit with status 1.
    if not login_name:
        safeprint('No login information available. Please try logging in again.',
                  write_to_stderr=True)
        click.get_current_context().exit(1)

    # Each field is passed as a pair that repeats the same name twice.
    field_names = ('Username', 'Name', 'ID', 'Email')
    fields = tuple((name, name) for name in field_names)

    record = {
        'Username': login_name,
        'Name': lookup_option(WHOAMI_NAME_OPTNAME),
        'ID': lookup_option(WHOAMI_ID_OPTNAME),
        'Email': lookup_option(WHOAMI_EMAIL_OPTNAME),
    }

    # Verbose mode prints the whole record; otherwise only the raw username.
    if is_verbose():
        text_format = FORMAT_TEXT_RECORD
    else:
        text_format = FORMAT_TEXT_RAW
    response_key = None if is_verbose() else 'Username'

    formatted_print(record, fields=fields, text_format=text_format,
                    response_key=response_key)
def shell_complete_option(f):
    """Attach an eager ``--shell-complete`` option to ``f`` and return the result.

    The option does not expose its value to ``f``; its callback runs the
    completer for the chosen shell and then exits the click context with
    status 0.
    """
    def _run_completion(ctx, param, value):
        # Nothing to do when the option is absent or click is only doing
        # resilient parsing.
        if not value or ctx.resilient_parsing:
            return

        if value not in ('BASH', 'ZSH'):
            raise ValueError('Unsupported shell completion')

        completer = do_bash_complete if value == 'BASH' else do_zsh_complete
        completer()
        click.get_current_context().exit(0)

    decorate = click.option(
        '--shell-complete',
        cls=HiddenOption,
        is_eager=True,
        expose_value=False,
        type=click.Choice(SUPPORTED_SHELLS),
        callback=_run_completion,
    )
    return decorate(f)
def autoactivate(client, endpoint_id, if_expires_in=None):
    """
    Try to auto-activate ``endpoint_id`` through ``client``.

    ``if_expires_in`` is forwarded to ``client.endpoint_autoactivate`` only
    when it is not None.

    Returns the autoactivate response unless its ``code`` is
    "AutoActivationFailed". In that case a message built from
    ``activation_requirements_help_text`` is printed to stderr and the click
    context exits with status 1.
    """
    extra = {} if if_expires_in is None else {'if_expires_in': if_expires_in}
    res = client.endpoint_autoactivate(endpoint_id, **extra)

    if res["code"] != "AutoActivationFailed":
        return res

    help_text = activation_requirements_help_text(res, endpoint_id)
    message = ("The endpoint could not be auto-activated and must be "
               "activated before it can be used.\n\n" + help_text)
    safeprint(message, write_to_stderr=True)
    click.get_current_context().exit(1)
def style_tweet(tweet, porcelain=False):
    """Render ``tweet`` as one display string.

    With ``porcelain`` the result is a tab-separated ``nick``/``url``/text
    line. Otherwise the text is passed through ``format_mentions``,
    optionally shortened to the configured character limit, and decorated
    with click styles. Returns None when stdout is a TTY and the tweet text
    contains non-printable characters.
    """
    conf = click.get_current_context().obj["conf"]
    limit = conf.character_limit

    if porcelain:
        return "{nick}\t{url}\t{tweet}".format(
            nick=tweet.source.nick,
            url=tweet.source.url,
            tweet=str(tweet))

    if sys.stdout.isatty() and not tweet.text.isprintable():
        return None

    styled_text = format_mentions(tweet.text)
    if limit:
        # Styling escape codes must not count against the limit, so widen
        # the shorten() width by however many characters the styling adds.
        styling_overhead = len(styled_text) - len(click.unstyle(styled_text))
        final_text = textwrap.shorten(styled_text, limit + styling_overhead)
    else:
        final_text = styled_text

    if conf.use_abs_time:
        timestamp = tweet.absolute_datetime
    else:
        timestamp = tweet.relative_datetime

    # NOTE(review): the leading "?" looks like a mis-encoded glyph from the
    # original source — confirm before changing the literal.
    return "? {nick} ({time}):\n{tweet}".format(
        nick=click.style(tweet.source.nick, bold=True),
        tweet=final_text,
        time=click.style(timestamp, dim=True))
def validate_text(ctx, param, value):
    """Normalise and validate tweet text, using a click callback signature.

    A tuple ``value`` is joined with spaces. When no text was given and
    stdin is not a TTY, the text is read from stdin. Returns the stripped
    text, asking for confirmation first if it exceeds the configured
    ``character_warning``. Raises ``click.BadArgumentUsage`` when there is
    no text.
    """
    conf = click.get_current_context().obj["conf"]

    text = " ".join(value) if isinstance(value, tuple) else value
    if not text and not sys.stdin.isatty():
        text = click.get_text_stream("stdin").read()

    if not text:
        raise click.BadArgumentUsage("Text can’t be empty.")

    text = text.strip()
    warn_at = conf.character_warning
    if warn_at and len(text) > warn_at:
        # abort=True makes click abort when the user declines.
        click.confirm(
            "? Warning: Tweet is longer than {0} characters. Are you sure?".format(
                conf.character_warning),
            abort=True)
    return text
def get_code(shell=None, prog_name=None, env_name=None, extra_env=None):
    """Return the rendered completion script for ``shell``.

    ``shell`` of None or 'auto' is resolved with ``get_auto_shell()``.
    ``prog_name`` defaults to the root click context's ``info_name`` and
    ``env_name`` to ``_<PROG_NAME>_COMPLETE`` (upper-cased, dashes turned
    into underscores). Raises ``click.ClickException`` for an unsupported
    shell.
    """
    from jinja2 import Template

    if shell is None or shell == 'auto':
        shell = get_auto_shell()
    if not prog_name:
        prog_name = click.get_current_context().find_root().info_name
    if not env_name:
        env_name = '_%s_COMPLETE' % prog_name.upper().replace('-', '_')
    if not extra_env:
        extra_env = {}

    # Pick the template source first; every shell renders with the same
    # variables.
    if shell == 'fish':
        source = FISH_TEMPLATE
    elif shell == 'bash':
        source = BASH_COMPLETION_SCRIPT
    elif shell == 'zsh':
        source = ZSH_TEMPLATE
    elif shell == 'powershell':
        source = POWERSHELL_COMPLETION_SCRIPT
    else:
        raise click.ClickException('%s is not supported.' % shell)

    return Template(source).render(prog_name=prog_name, complete_var=env_name,
                                   extra_env=extra_env)
def configure():
    """
    Discover the configuration files and apply them.

    When a Click context is active it is handed on to the settings helpers;
    otherwise (for example when called from outside Click) ``None`` is
    passed in its place.
    """
    # Imported under an alias so the helper does not shadow this function's
    # own name inside the body.
    from .settings import discover_configs, configure as apply_configuration

    # silent=True yields None instead of raising when no context is active.
    ctx = click.get_current_context(silent=True)
    _, py, yaml = discover_configs(ctx)

    # TODO(mattrobenolt): Surface this also as a CLI option?
    skip_backend_validation = 'SENTRY_SKIP_BACKEND_VALIDATION' in os.environ
    apply_configuration(ctx, py, yaml, skip_backend_validation)
def _get_current_api_session():
    """
    Return an API session for the configured host and token.

    When a Click context is active, the session is cached on it as an
    attribute keyed by host and token and reused on later calls. Without a
    context a new session is built every time.

    :return: API session
    :rtype: APISession
    """
    host, token = get_host_and_token()
    ctx = click.get_current_context(silent=True) or None
    cache_key = force_text('_api_session_%s_%s' % (host, token))

    if not ctx:
        return APISession(host, token)

    cached = getattr(ctx, cache_key, None)
    if cached:
        return cached

    session = APISession(host, token)
    setattr(ctx, cache_key, session)
    return session
def lint(filenames):
    """
    Lint (syntax-check) a valohai.yaml file.

    The return code of this command will be the total number of errors found in all the files
    (capped at 255, since larger exit statuses wrap around on POSIX systems).
    """
    if not filenames:
        # No files given: fall back to valohai.yaml in the project directory.
        project = get_project()
        directory = (project.directory if project else get_project_directory())
        config_file = os.path.join(directory, 'valohai.yaml')
        if not os.path.exists(config_file):
            raise CLIException('There is no %s file. Pass in the names of configuration files to lint?' % config_file)
        filenames = [config_file]

    total_errors = sum(validate_file(filename) for filename in filenames)
    if total_errors:
        warn('There were %d total errors.' % total_errors)

    # Only the low 8 bits of an exit status survive on POSIX, so an uncapped
    # count of 256 would be reported as success.
    click.get_current_context().exit(min(total_errors, 255))
def main(**parsed_settings):
    # Applies the parsed settings, then runs upgrade() with hooks dispatched
    # before it and after its success or failure. Documented with comments
    # rather than a docstring so no help text is introduced.
    click_context = click.get_current_context()
    # GitLab doesn't report terminal type correctly so we need to force it
    click_context.color = True

    settings.update(parsed_settings)
    rancher.session.auth = (settings['access_key'], settings['secret_key'])
    deployment.load_from_settings(settings)

    hooks.dispatch('before_upgrade')
    try:
        upgrade()
    except Exception as error:
        hooks.dispatch('after_upgrade_failure')
        if not isinstance(error, rancher.UpgradeFailed):
            raise
        return  # we handled it gracefully already

    hooks.dispatch('after_upgrade_success')
def tail(obj, follow, prefix, number):
    '''Show recent logs from containers for each pod matched.

    NUMBER is a count of recent lines or a relative duration (e.g. 5s, 2m, 3h)
    '''
    kubectl = obj.kubey.kubectl

    # An all-digit NUMBER is a line count; anything else is passed to
    # kubectl as a duration via --since.
    is_line_count = re.match(r'^\d+$', number)
    if is_line_count:
        log_args = ['--tail', str(number)]
    else:
        log_args = ['--since', number]
    if follow:
        log_args.append('-f')

    for pod in obj.kubey.each_pod(obj.maximum):
        for container in pod.containers:
            args = ['-n', pod.namespace, '-c', container.name] + log_args + [pod.name]
            if not prefix:
                kubectl.call_async('logs', *args)
                continue
            # Label each container's output with its pod and container name.
            label = '[%s:%s] ' % (pod.name, container.name)
            kubectl.call_prefix(label, 'logs', *args)
    kubectl.wait()

    # Propagate a non-zero kubectl result as this command's exit status.
    if kubectl.final_rc != 0:
        click.get_current_context().exit(kubectl.final_rc)
def debug_log(msg):
    """Echo ``msg`` only when a click context is active and its ``debug`` param is truthy."""
    ctx = click.get_current_context(silent=True)
    if not ctx:
        return
    if ctx.params.get('debug'):
        click.echo(msg)
def from_context(cls):
    """Look up this class' instance on the current Click context.

    When no Click context is active, a new default-constructed instance is
    returned instead.

    :return: Instance of this class.
    :rtype: Config
    """
    # silent=True yields None instead of raising RuntimeError.
    ctx = click.get_current_context(silent=True)
    if ctx is None:
        return cls()
    return ctx.find_object(cls)
def click_fail(msg):
    """Fail the current click context with ``msg``."""
    ctx = click.get_current_context()
    ctx.fail(msg)
def invoke(self, command_name, **kwargs):
    """
    Runs a [sub]command by name, passing context automatically.

    The command is looked up on ``cli`` using the current click context and
    called through ``Context.invoke`` with ``kwargs``.

    :return: whatever the invoked command returns.
    :raises click.UsageError: if ``cli`` has no command called
        ``command_name``.
    """
    context = click.get_current_context()
    command = cli.get_command(context, command_name)
    if command is None:
        # Presumably ``cli`` is a click group, whose get_command returns
        # None for unknown names — fail with a readable message instead of
        # letting Context.invoke trip over None.
        raise click.UsageError('No such command "%s".' % command_name, ctx=context)
    return context.invoke(command, **kwargs)
def configure():
    """
    Update configuration options. Configuration will be saved in ~/.dpm/conf or
    the file provided with --config option.
    """
    # The config path is read from the parent context's parameters.
    parent_params = click.get_current_context().parent.params
    config.prompt_config(parent_params['config_path'])
def validate():
    """
    Validate datapackage in the current dir. Print validation errors if found.
    """
    ctx = click.get_current_context()
    client = ctx.meta['client']
    try:
        client.validate()
    except (ValidationError, dprclient.ResourceDoesNotExist) as e:
        # Report the failure and exit the process with status 1.
        echo('[ERROR] %s\n' % str(e))
        sys.exit(1)
    else:
        echo('datapackage.json is valid\n')
def purge():
    """
    Purge datapackage from the registry server.
    """
    # The client is taken from the current click context's ``meta`` mapping.
    ctx = click.get_current_context()
    ctx.meta['client'].purge()
    echo('purge ok')
def delete():
    """
    Delete datapackage from the registry server.
    """
    # The client is taken from the current click context's ``meta`` mapping.
    ctx = click.get_current_context()
    ctx.meta['client'].delete()
    echo('delete ok')
def undelete():
    """
    Undelete datapackage from the registry server.
    """
    # The client is taken from the current click context's ``meta`` mapping.
    ctx = click.get_current_context()
    ctx.meta['client'].undelete()
    echo('undelete ok')
def tag(tag_string):
    """
    Tag datapackage on the server.

    Create new copy of the latest version of the datapackage on the server,
    and assign a tag.
    """
    # The client is taken from the current click context's ``meta`` mapping.
    client = click.get_current_context().meta['client']
    # The value returned by client.tag() is not used here.
    client.tag(tag_string)
    echo('Datapackage successfully tagged.')
def _parse_boolean(self, string):
    """Convert a textual boolean into a Python ``bool``.

    Accepted spellings (case-insensitive) are 1/yes/true/on and
    0/no/false/off, matching `configparser.ConfigParser.getboolean`. Any
    other value prints an error naming ``--<self.cli_name>`` to stderr and
    exits the current click context with status 1.
    """
    truthy = ['1', 'yes', 'true', 'on']
    falsy = ['0', 'no', 'false', 'off']
    lowered = string.lower()

    if lowered in truthy:
        return True
    if lowered in falsy:
        return False

    # click is imported only on the error path.
    import click

    supported = ', '.join(truthy + falsy)
    msg = (
        "Error: unrecognized value for --%s flag: %s\n"
        "Supported values (case-insensitive): %s" %
        (self.cli_name, string, supported)
    )
    click.secho(msg, err=True, fg='red', bold=True)
    click.get_current_context().exit(1)
def get_value(self, arguments, fallback=None):
    """Create the output directory and return a path-building callable.

    The directory path comes from ``self._locate_value``. On success the
    directory is created and the returned function maps ``(name, cli_name)``
    to ``os.path.join(path, name)``. If the path already exists, an error
    naming ``--<self.cli_name>`` is printed to stderr and the current click
    context exits with status 1. If no value can be located, the returned
    function raises ``ValueNotFoundException`` whenever it is called.
    """
    import os
    import click

    try:
        path = self._locate_value(arguments, fallback=fallback)
    except ValueNotFoundException:
        # Always fail to find a value as this handler doesn't exist.
        def fail(*_):
            raise ValueNotFoundException()
        return fail

    # TODO: do we want a --force like flag?
    # Create first and react to the failure, rather than checking for
    # existence beforehand: this leaves no window between check and create.
    try:
        os.makedirs(path)
    except FileExistsError:
        click.secho("Error: --%s directory already exists, won't "
                    "overwrite." % self.cli_name, err=True, fg='red',
                    bold=True)
        click.get_current_context().exit(1)

    def fallback_(name, cli_name):
        return os.path.join(path, name)
    return fallback_
def _error_with_duplicate_in_set(self, elements):
    """Report values passed more than once for this option, then exit.

    Prints the current command's usage and an error listing the duplicated
    values to stderr, then exits the click context with status 1.
    """
    import click
    import collections

    # Collect every value that occurs more than once.
    dups = set()
    for name, count in collections.Counter(elements).items():
        if count > 1:
            dups.add(name)

    ctx = click.get_current_context()
    click.echo(ctx.get_usage() + '\n', err=True)

    message = ("Error: Option --%s was given these values: %r more than "
               "one time, values passed should be unique."
               % (self.cli_name, dups))
    click.secho(message, err=True, fg='red', bold=True)
    ctx.exit(1)
def handle_in_params(self, kwargs):
    """Resolve the action's inputs and parameters from parsed CLI ``kwargs``.

    Returns ``(arguments, missing, verbose, quiet)``: ``arguments`` maps each
    resolved signature item name to its value, and ``missing`` accumulates
    the ``missing`` entries of handlers whose value was not found. Exits the
    click context with status 1 when both verbose and quiet are set.
    """
    import q2cli.handlers

    cmd_fallback = self.cmd_config_handler.get_value(kwargs)
    verbose = self.verbose_handler.get_value(kwargs, fallback=cmd_fallback)
    quiet = self.quiet_handler.get_value(kwargs, fallback=cmd_fallback)

    if verbose and quiet:
        click.secho('Unsure of how to be quiet and verbose at the '
                    'same time.', fg='red', bold=True, err=True)
        click.get_current_context().exit(1)

    arguments = {}
    missing = []
    for item in self.action['signature']:
        # Only inputs and parameters are resolved here.
        if item['type'] not in ('input', 'parameter'):
            continue
        name = item['name']
        handler = self.generated_handlers[name]
        try:
            value = handler.get_value(kwargs, fallback=cmd_fallback)
        except q2cli.handlers.ValueNotFoundException:
            missing.extend(handler.missing)
        else:
            arguments[name] = value

    return arguments, missing, verbose, quiet
def outformat_is_json():
    """
    Report whether the command state's output format is JSON.

    Only safe to call within a click context.
    """
    # ensure_object creates the CommandState on the context if it is missing.
    state = click.get_current_context().ensure_object(CommandState)
    return state.outformat_is_json()
def outformat_is_text():
    """
    Report whether the command state's output format is text.

    Only safe to call within a click context.
    """
    # ensure_object creates the CommandState on the context if it is missing.
    state = click.get_current_context().ensure_object(CommandState)
    return state.outformat_is_text()
def get_jmespath_expression():
    """
    Return the ``jmespath_expr`` stored on the command state.

    Only safe to call within a click context.
    """
    # ensure_object creates the CommandState on the context if it is missing.
    state = click.get_current_context().ensure_object(CommandState)
    return state.jmespath_expr
def verbosity():
    """
    Return the ``verbosity`` stored on the command state.

    Only safe to call within a click context.
    """
    # ensure_object creates the CommandState on the context if it is missing.
    state = click.get_current_context().ensure_object(CommandState)
    return state.verbosity
def is_verbose():
    """
    Report whether the command state says output is verbose.

    Only safe to call within a click context.
    """
    # ensure_object creates the CommandState on the context if it is missing.
    state = click.get_current_context().ensure_object(CommandState)
    return state.is_verbose()
def task_pause_info(task_id):
    """
    Executor for `globus task pause-info`
    """
    client = get_client()
    res = client.task_pause_info(task_id)

    def _custom_text_format(res):
        # Keep only the explicit-pause fields whose key has a truthy value.
        explicit_pauses = []
        for field in EXPLICIT_PAUSE_MSG_FIELDS:
            # n.b. some keys are absent for completed tasks
            if res.get(field[1]):
                explicit_pauses.append(field)
        effective_pause_rules = res['pause_rules']

        # Neither kind of pause applies: say so and exit successfully.
        if not (explicit_pauses or effective_pause_rules):
            safeprint('Task {} is not paused.'.format(task_id))
            click.get_current_context().exit(0)

        if explicit_pauses:
            # A blank-line epilog separates this section from the rules
            # listing when both sections are printed.
            epilog = '\n' if effective_pause_rules else None
            formatted_print(
                res,
                fields=explicit_pauses,
                text_format=FORMAT_TEXT_RECORD,
                text_preamble='This task has been explicitly paused.\n',
                text_epilog=epilog)

        if effective_pause_rules:
            formatted_print(
                effective_pause_rules,
                fields=PAUSE_RULE_DISPLAY_FIELDS,
                text_preamble=(
                    'The following pause rules are effective on this task:\n'))

    formatted_print(res, text_format=_custom_text_format)
def resolve_id_or_name(client, bookmark_id_or_name):
    """Find a bookmark by UUID first, then by exact name.

    Returns the bookmark found. When nothing matches, an error is printed to
    stderr and the current click context exits with status 1. Transfer API
    errors other than 'BookmarkNotFound' are re-raised.
    """
    # leading/trailing whitespace doesn't make sense for UUIDs and the Transfer
    # service outright forbids it for bookmark names, so we can strip it off
    needle = bookmark_id_or_name.strip()

    try:
        UUID(needle)  # raises ValueError if argument not a UUID
        looks_like_uuid = True
    except ValueError:
        looks_like_uuid = False

    res = None
    if looks_like_uuid:
        try:
            res = client.get_bookmark(needle.lower())
        except TransferAPIError as exception:
            if exception.code != 'BookmarkNotFound':
                raise

    if res:
        return res

    # non-UUID input or UUID not found; fallback to match by name
    # n.b. case matters to the Transfer service for bookmark names, so
    # two bookmarks can exist whose names vary only by their case
    for bookmark_row in client.bookmark_list():
        if bookmark_row['name'] == needle:
            res = bookmark_row
            break
    else:
        safeprint(u'No bookmark found for "{}"'.format(needle),
                  write_to_stderr=True)
        click.get_current_context().exit(1)

    return res
def do_bash_complete():
    """Print tab-separated completion choices for bash, then exit with status 0.

    The command line and cursor position are read from the ``COMP_WORDS``
    and ``COMP_CWORD`` environment variables.
    """
    comp_words, quoted = safe_split_line(os.environ['COMP_WORDS'])
    cur_index = int(os.environ['COMP_CWORD'])

    try:
        cur = comp_words[cur_index]
    except IndexError:
        # The cursor is past the last word: nothing is being completed and
        # every word after the first counts as already complete.
        cur = None
        completed_args = comp_words[1:]
    else:
        # Drop the first word and the final word.
        completed_args = comp_words[1:-1]

    choices = get_all_choices(completed_args, cur, quoted)
    safeprint('\t'.join(choices), newline=False)
    click.get_current_context().exit(0)
def exit_with_mapped_status(http_status):
    """
    Exit the process with the status mapped to ``http_status``.

    The mapping is the ``http_status_map`` of the command state on the
    current click context. An HTTP status with no entry exits with 1.
    """
    state = click.get_current_context().ensure_object(CommandState)
    mapping = state.http_status_map

    exit_code = mapping[http_status] if http_status in mapping else 1
    sys.exit(exit_code)
def generate_user_agent():
    """Build the ``User-Agent`` header dict for outgoing requests.

    The installed twtxt version is included ("unknown" when the distribution
    cannot be found). The configured URL and nick are added only when
    ``disclose_identity``, ``nick`` and ``twturl`` are all truthy.
    """
    try:
        version = pkg_resources.require("twtxt")[0].version
    except pkg_resources.DistributionNotFound:
        version = "unknown"

    conf = click.get_current_context().obj["conf"]
    identify = conf.disclose_identity and conf.nick and conf.twturl

    if not identify:
        return {"User-Agent": "twtxt/{version}".format(version=version)}

    return {
        "User-Agent": "twtxt/{version} (+{url}; @{nick})".format(
            version=version, url=conf.twturl, nick=conf.nick)
    }
def get_source_by_url(url):
    """Return the source whose URL equals ``url``, or None.

    The configured ``twturl`` is checked first and maps to ``conf.source``;
    after that the first match in ``conf.following`` wins.
    """
    conf = click.get_current_context().obj["conf"]
    if url == conf.twturl:
        return conf.source

    for source in conf.following:
        if url == source.url:
            return source
    return None
def get_source_by_name(nick):
    """Return the source whose nick equals lower-cased ``nick``, or None.

    The configured nick maps to ``conf.source``, but only when ``twturl`` is
    also set; after that the first match in ``conf.following`` wins.
    """
    wanted = nick.lower()
    conf = click.get_current_context().obj["conf"]
    if wanted == conf.nick and conf.twturl:
        return conf.source

    for source in conf.following:
        if wanted == source.nick:
            return source
    return None
def main(rst=None):
    """Histonets computer vision application for image processing"""
    # The docstring is kept verbatim: the rst branch below splits the
    # generated help text on fixed headings.
    ctx = click.get_current_context()

    if not rst:
        # Plain invocation: show the help only when no subcommand was given.
        if ctx.invoked_subcommand is None:
            click.echo(main.get_help(ctx))
        return

    commands_heading = 'Commands'
    options_heading = 'Options:'

    # Everything before the "Commands" heading is the top-level help.
    click.echo()
    main_help, _ = main.get_help(ctx).split(commands_heading, 1)
    click.echo(main_help)
    click.echo(commands_heading)
    click.echo('-' * len(commands_heading))
    click.echo()

    for command_name, command in sorted(main.commands.items()):
        # Command name with an underline of the same length.
        click.echo(command_name)
        click.echo('~' * len(command_name))

        usage = command.get_usage(ctx)
        usage = usage.replace('histonets ',
                              "histonets {} ".format(command_name))
        usage = usage.replace('\b\n', '')
        click.echo(usage)
        click.echo()
        click.echo(command.help.replace('\b\n', ''))

        # Keep the options listing between "Options:" and the last "--help".
        command_help = command.get_help(ctx)
        _, command_options_help = command_help.split(options_heading, 1)
        command_options, _ = command_options_help.rsplit('--help', 1)
        click.echo()
        click.echo(options_heading)
        click.echo(command_options)
        click.echo()
def pipeline(image, actions):
    """Allow chaining a series of actions to be applied to IMAGE.
    Output will depend on the last action applied.

    \b
    - ACTIONS is a JSON list of dictionaries containing each an 'action' key
      specifying the action to apply, a 'arguments' key which is a
      list of arguments, and a 'options' key with a dictionary to set the
      options for the corresponding action.

      Example::

        histonets pipeline '[{"action": "contrast", "options": {"value": 50}}]'
    """
    output = image.image
    # The context does not change between actions, so look it up once.
    ctx = click.get_current_context()

    for action in actions:
        # Each action receives the previous action's output as its first
        # positional argument.
        arguments = [output] + action.get('arguments', [])
        command = main.get_command(ctx, action['action'])
        if command is None:
            raise click.BadParameter(
                "Action '{}' not found".format(action['action']))

        # Fill every option from the action, falling back to the
        # parameter's default. The last two params are skipped —
        # NOTE(review): presumably the shared output options; confirm.
        action_options = action.get('options', {})
        options = {param.name: action_options.get(param.name, param.default)
                   for param in command.params[:-2]}
        options['output'] = RAW

        try:
            output = command.callback(*arguments, **options)
        except TypeError as e:
            # Give click a real string message and keep the original
            # traceback attached.
            raise click.BadParameter(str(e)) from e

    return output
def info(*args):
    """Write a green message to stderr unless ``meta['output-logmsg']`` is 'silent'."""
    level = click.get_current_context().meta['output-logmsg']
    if level == 'silent':
        return
    click.secho(*args, err=True, fg='green')
def dmsg(*args):
    """Write a yellow message to stderr only when ``meta['output-logmsg']`` is 'debug'."""
    level = click.get_current_context().meta['output-logmsg']
    if level != 'debug':
        return
    click.secho(*args, err=True, fg='yellow')
def _fail(message, retcode=1):
    """Echo ``message`` to ``sys.stderr`` and exit the root click context with ``retcode``."""
    click.echo(message, file=sys.stderr)
    root = click.get_current_context().find_root()
    root.exit(retcode)
def _echo(output, min_lines=10):
    """Print ``output``, through a pager when that is enabled and worthwhile.

    The pager is used only when the context object's ``use_pager`` entry is
    truthy and ``output`` contains more than ``min_lines`` newlines.
    Otherwise the text is echoed to ``sys.stdout``.
    """
    ctx = click.get_current_context()
    if ctx.obj.get('use_pager') and output.count('\n') > min_lines:
        # echo_via_pager's second parameter is ``color``, not a file, so the
        # stream must not be passed to it.
        click.echo_via_pager(output)
    else:
        click.echo(output, file=sys.stdout)
def cmd_callback(self, name):
    # For each node name in ``name``, run the current click command's name
    # as a command on that node and print every response item. A name with
    # no matching node gets a red "not found" message instead.
    ctx = click.get_current_context()
    for node_name in name:
        node = node_manager.node_get(node_name=node_name)
        if not node:
            click.secho('Node "{}" not found'.format(node_name), fg='red')
            continue
        for r in node.command(cmd=ctx.command.name):
            print(r)

# CREATE PROVIDER NODES
def create_node_callback(self, name, compute_driver, **kwargs):
    # Creates a node while a spinner is shown; the node type is the name of
    # the click command currently being run.
    with click_spinner.spinner():
        node_type = click.get_current_context().command.name
        node_manager.node_create(
            node_name=name,
            node_type=node_type,
            compute_driver=compute_driver,
            **kwargs)
def init_nav_contexts():
    """Build and return a ``navigator.Navigator`` from the current click context.

    If building it raises, an error is printed, the ``anchore_allimages``
    context is cleared and the process exits with status 1.
    """
    try:
        # use the obj from the current click context. This is a bit hacky, but works as long as this method is
        # invoked in an execution context of click
        anchore_config = click.get_current_context().obj
        return navigator.Navigator(anchore_config=anchore_config,
                                   imagelist=imagelist,
                                   allimages=contexts['anchore_allimages'])
    except Exception:
        anchore_print_err("explore operation failed")

    # Only reached when the construction above failed.
    contexts['anchore_allimages'].clear()
    sys.exit(1)
def get_sentry_conf():
    """
    Return the SENTRY_CONF value.

    Sources are tried in order: the ``config`` entry of the current click
    context's object, the ``SENTRY_CONF`` environment variable, and finally
    the default ``~/.sentry``.
    """
    try:
        return click.get_current_context().obj['config']
    except (RuntimeError, KeyError):
        # No active context, or no 'config' entry: fall through.
        pass
    return os.environ.get('SENTRY_CONF', '~/.sentry')
def data(self):
    """Return the ``config`` parameter of the current click context."""
    params = click.get_current_context().params
    return params['config']