我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用click.Context()。
def _write_requirements(ctx: click.Context, packages_list, outfile, prefix_list=None): with open('temp', 'w') as source_file: source_file.write('\n'.join(packages_list)) packages, _, ret = do_ex( ctx, [ 'pip-compile', '--index', '--upgrade', '--annotate', '--no-header', '-n', 'temp' ] ) os.remove('temp') with open(outfile, 'w') as req_file: if prefix_list: for prefix in prefix_list: req_file.write(f'{prefix}\n') for package in packages.splitlines(): req_file.write(f'{package}\n')
def generate_man_page(ctx, version=None): """ Generate documentation for the given command. :param click.Context ctx: the click context for the cli application. :rtype: str :returns: the generate man page from the given click Context. """ # Create man page with the details from the given context man_page = ManPage(ctx.command_path) man_page.version = version man_page.short_help = ctx.command.short_help man_page.description = ctx.command.help man_page.synopsis = ' '.join(ctx.command.collect_usage_pieces(ctx)) man_page.options = [x.get_help_record(None) for x in ctx.command.params if isinstance(x, click.Option)] commands = getattr(ctx.command, 'commands', None) if commands: man_page.commands = [(k, v.short_help) for k, v in commands.items()] return str(man_page)
def write_man_pages(name, cli, parent_ctx=None, version=None, target_dir=None): """ Generate man page files recursively for the given click cli function. :param str name: the cli name :param cli: the cli instance :param click.Context parent_ctx: the parent click context :param str target_dir: the directory where the generated man pages are stored. """ ctx = click.Context(cli, info_name=name, parent=parent_ctx) man_page = generate_man_page(ctx, version) path = '{0}.1'.format(ctx.command_path.replace(' ', '-')) if target_dir: path = os.path.join(target_dir, path) with open(path, 'w+') as f: f.write(man_page) commands = getattr(cli, 'commands', {}) for name, command in commands.items(): write_man_pages(name, command, parent_ctx=ctx, version=version, target_dir=target_dir)
def main(ctx, # type: click.Context directory, # type: click.Path quiet, # type: bool extension, # type: str source_directory, # type: str build_directory, # type: str ): # type: (...) -> None """reqwire: micromanages your requirements.""" requirements_dir = pathlib.Path(str(directory)) console.verbose = not quiet ctx.obj = { 'build_dir': build_directory, 'directory': requirements_dir, 'extension': extension, 'source_dir': source_directory, }
def deploy(ctx, autogen_policy, profile, api_gateway_stage, stage, connection_timeout): # type: (click.Context, Optional[bool], str, str, str, int) -> None factory = ctx.obj['factory'] # type: CLIFactory factory.profile = profile config = factory.create_config_obj( chalice_stage_name=stage, autogen_policy=autogen_policy, api_gateway_stage=api_gateway_stage, ) session = factory.create_botocore_session( connection_timeout=connection_timeout) d = factory.create_default_deployer(session=session, ui=UI()) deployed_values = d.deploy(config, chalice_stage_name=stage) record_deployed_values(deployed_values, os.path.join( config.project_dir, '.chalice', 'deployed.json'))
def url(ctx, stage): # type: (click.Context, str) -> None factory = ctx.obj['factory'] # type: CLIFactory config = factory.create_config_obj(stage) deployed = config.deployed_resources(stage) if deployed is not None: click.echo( "https://{api_id}.execute-api.{region}.amazonaws.com/{stage}/" .format(api_id=deployed.rest_api_id, region=deployed.region, stage=deployed.api_gateway_stage) ) else: e = click.ClickException( "Could not find a record of deployed values to chalice stage: '%s'" % stage) e.exit_code = 2 raise e
def generate_sdk(ctx, sdk_type, stage, outdir): # type: (click.Context, str, str, str) -> None factory = ctx.obj['factory'] # type: CLIFactory config = factory.create_config_obj(stage) session = factory.create_botocore_session() client = TypedAWSClient(session) deployed = config.deployed_resources(stage) if deployed is None: click.echo("Could not find API ID, has this application " "been deployed?", err=True) raise click.Abort() else: rest_api_id = deployed.rest_api_id api_gateway_stage = deployed.api_gateway_stage client.download_sdk(rest_api_id, outdir, api_gateway_stage=api_gateway_stage, sdk_type=sdk_type)
def validate(type_scheme: Type) -> t.Callable[[click.Context, str, t.Any], t.Any]: """ Creates a valid click option validator function that can be passed to click via the callback parameter. The validator function expects the type of the value to be the raw type of the type scheme. :param type_scheme: type scheme the validator validates against :return: the validator function """ def func(ctx, param, value): param = param.human_readable_name param = param.replace("-", "") res = verbose_isinstance(value, type_scheme, value_name=param) if not res: raise click.BadParameter(str(res)) return value return func
def messages(ctx, room_uuid, newer_than, older_than, limit, verbose): # type: (click.Context, str, int, int, int, bool) -> None """???????????????""" api = ctx.obj['api'] messages = api.get_messages(uuid.UUID(room_uuid), newer_than=newer_than, older_than=older_than) template = u'{m[date]} {m[user][nickname]} {m[text]}' if verbose: template = u''' {m[date]} {m[user][nickname]} {m[text]} \tid: {m[id]} \tunique_id: {m[unique_id]} \tmedia: {m[media]} \tmessage_type: {m[message_type]} \tdictated: {m[dictated]} '''.strip() for m in messages[-limit:]: click.echo(template.format(m=m)) # type: ignore
def test_no_parameters(self): """Validate a `click.Group` with no parameters. This exercises the code paths for a group with *no* arguments, *no* options and *no* environment variables. """ @click.group() def cli(): """A sample command group.""" pass ctx = click.Context(cli, info_name='cli') output = list(ext._format_command(ctx, show_nested=False)) self.assertEqual(textwrap.dedent(""" .. program:: cli .. code-block:: shell cli [OPTIONS] COMMAND [ARGS]... """).lstrip(), '\n'.join(output))
def require_executable(executable): """Check if an executable is installed. If not, exit with an error. """ def decorator(f): @click.pass_context def new_func(ctx, *args, **kwargs): """ :param click.Context ctx: """ if utils.which(executable) is None: click.echo('{} is not installed. Please install it.'.format(executable)) ctx.exit(code=exit_codes.OTHER_FAILURE) return ctx.invoke(f, *args, **kwargs) return update_wrapper(new_func, f) return decorator
def ensure_executable_exists(name, get_executable): """Ensures that the private executable exists. If it doesn't, then call get_executable, which must be a callable that installs the executable. """ def decorator(f): @click.pass_context def new_func(ctx, *args, **kwargs): """ @type ctx: click.Context """ path = path_utils.executable_path(name) if not os.path.exists(path): echo_heading('Installing {}.'.format(name), marker='-', marker_color='magenta') get_executable() assert os.path.exists(path) return ctx.invoke(f, *args, **kwargs) return update_wrapper(new_func, f) return decorator
def get_stack_or_env(ctx, path): """ Parses the path to generate relevant Envrionment and Stack object. :param ctx: Cli context. :type ctx: click.Context :param path: Path to either stack config or environment folder. :type path: str """ stack = None env = None config_reader = ConfigReader(ctx.obj["sceptre_dir"], ctx.obj["options"]) if os.path.splitext(path)[1]: stack = config_reader.construct_stack(path) else: env = config_reader.construct_environment(path) return (stack, env)
def main(ctx): # type: (click.Context) -> None """proxenos-cli: Consistently select nodes from service discovery."""
def validate_taxdump(value, method): if (os.path.isfile("%s/%s" % (value, "names.dmp")) and os.path.isfile("%s/%s" % (value, "nodes.dmp")) and os.path.isfile("%s/%s" % (value, "merged.dmp")) and os.path.isfile("%s/%s" % (value, "delnodes.dmp"))): return value else: with click.Context(method) as ctx: click.echo(ctx.get_help()) raise click.BadParameter("Could not find names.dmp in taxdump, specify a value or make sure the files are present")
def cd(path): """ Context to temporarily change the working directory Args: path: working directory to cd into """ old_dir = os.getcwd() os.chdir(path) try: yield finally: os.chdir(old_dir)
def _print_version(ctx: click.Context, param, value): if not value or ctx.resilient_parsing: return ensure_repo() _get_version(ctx) exit(0) # @click.group(invoke_without_command=True)
def reqs(ctx: click.Context, prod, test, dev): """ Write requirements files """ if not find_executable('pip-compile'): click.secho('Missing module "pip-tools".\n' 'Install it manually with: "pip install pip-tools"\n' 'Or install all dependencies with: "pip install -r requirements-dev.txt"', err=True, fg='red') exit(-1) if prod: sys.path.insert(0, os.path.abspath('.')) from setup import install_requires _write_requirements( ctx, packages_list=install_requires, outfile='requirements.txt' ) sys.path.pop(0) if test: """Writes requirements-test.txt""" from setup import test_requires _write_requirements( ctx, packages_list=test_requires, outfile='requirements-test.txt', prefix_list=['-r requirements.txt'] ) if dev: """Writes requirements-dev.txt""" from setup import dev_requires _write_requirements( ctx, packages_list=dev_requires, outfile='requirements-dev.txt', prefix_list=['-r requirements.txt', '-r requirements-test.txt'] )
def pip_install(ctx, *specifiers): # type: (click.Context, str) -> None try: result = sh.pip.install(*specifiers, _err_to_out=True, _iter=True) for line in result: click.echo(line, nl=False) except sh.ErrorReturnCode: ctx.abort()
def main_build(ctx, # type: click.Context options, # type: Dict[str, Any] all, # type: bool tag, # type: Iterable[str] pip_compile_options, # type: Iterable[str] ): # type: (...) -> None """Build requirements with pip-compile.""" if not options['directory'].exists(): console.error('run `{} init\' first', ctx.find_root().info_name) ctx.abort() if not all and not tag: console.error('either --all or --tag must be provided.') ctx.abort() src_dir = options['directory'] / options['source_dir'] dest_dir = options['directory'] / options['build_dir'] if not dest_dir.exists(): dest_dir.mkdir() default_args = ['-r'] if not tag: pattern = '*{}'.format(options['extension']) tag = (path.stem for path in src_dir.glob(pattern)) for tag_name in tag: src = src_dir / ''.join((tag_name, options['extension'])) dest = dest_dir / '{}.txt'.format(tag_name) console.info('building {}', click.format_filename(str(dest))) args = default_args[:] args += [str(src)] args += list(pip_compile_options) with atomicwrites.AtomicWriter(str(dest), 'w', True).open() as f: f.write(reqwire.scaffold.MODELINES_HEADER) with tempfile.NamedTemporaryFile() as temp_file: args += ['-o', temp_file.name] sh.pip_compile(*args, _out=f, _tty_out=False)
def main_init(ctx, # type: click.Context options, # type: Dict[str, Any] force, # type: bool index_url, # type: str tag, # type: Iterable[str] extra_index_url, # type: Tuple[str] ): # type: (...) -> None """Initialize reqwire in the current directory.""" if not force and options['directory'].exists(): console.error('requirements directory already exists') ctx.abort() src_dir = reqwire.scaffold.init_source_dir( options['directory'], exist_ok=force, name=options['source_dir']) console.info('created {}', click.format_filename(str(src_dir))) build_dir = reqwire.scaffold.init_source_dir( options['directory'], exist_ok=force, name=options['build_dir']) console.info('created {}', click.format_filename(str(build_dir))) if not tag: tag = ('docs', 'main', 'qa', 'test') for tag_name in tag: filename = reqwire.scaffold.init_source_file( working_directory=options['directory'], tag_name=tag_name, extension=options['extension'], index_url=index_url, extra_index_urls=extra_index_url) console.info('created {}', click.format_filename(str(filename)))
def get_full_name(ctx): """ Return the fully qualified name of the command. >>> main_ctx = click.Context(click.Command('main')) >>> market_ctx = click.Context(click.Command('market'), parent=main_ctx) >>> join_ctx = click.Context(click.Command('join'), parent=market_ctx) >>> get_full_name(join_ctx) '21_market_join' """ if ctx.parent is None: # This is `two1.cli.main` return '21' else: return get_full_name(ctx.parent) + '_' + ctx.command.name
def cli(ctx, project_dir, debug=False): # type: (click.Context, str, bool) -> None if project_dir is None: project_dir = os.getcwd() ctx.obj['project_dir'] = project_dir ctx.obj['debug'] = debug ctx.obj['factory'] = CLIFactory(project_dir, debug) os.chdir(project_dir)
def local(ctx, host='127.0.0.1', port=8000, stage=DEFAULT_STAGE_NAME): # type: (click.Context, str, int, str) -> None factory = ctx.obj['factory'] # type: CLIFactory run_local_server(factory, host, port, stage, os.environ)
def delete(ctx, profile, stage): # type: (click.Context, str, str) -> None factory = ctx.obj['factory'] # type: CLIFactory factory.profile = profile config = factory.create_config_obj(chalice_stage_name=stage) session = factory.create_botocore_session() d = factory.create_default_deployer(session=session, ui=UI()) d.delete(config, chalice_stage_name=stage) remove_stage_from_deployed_values(stage, os.path.join( config.project_dir, '.chalice', 'deployed.json'))
def logs(ctx, num_entries, include_lambda_messages, stage, profile): # type: (click.Context, int, bool, str, str) -> None factory = ctx.obj['factory'] # type: CLIFactory factory.profile = profile config = factory.create_config_obj(stage, False) deployed = config.deployed_resources(stage) if deployed is not None: session = factory.create_botocore_session() retriever = factory.create_log_retriever( session, deployed.api_handler_arn) display_logs(retriever, num_entries, include_lambda_messages, sys.stdout)
def package(ctx, single_file, stage, out): # type: (click.Context, bool, str, str) -> None factory = ctx.obj['factory'] # type: CLIFactory config = factory.create_config_obj(stage) packager = factory.create_app_packager(config) if single_file: dirname = tempfile.mkdtemp() try: packager.package_app(config, dirname) create_zip_file(source_dir=dirname, outfile=out) finally: shutil.rmtree(dirname) else: packager.package_app(config, out)
def generate_pipeline(ctx, codebuild_image, source, buildspec_file, filename): # type: (click.Context, str, str, str, str) -> None """Generate a cloudformation template for a starter CD pipeline. This command will write a starter cloudformation template to the filename you provide. It contains a CodeCommit repo, a CodeBuild stage for packaging your chalice app, and a CodePipeline stage to deploy your application using cloudformation. You can use any AWS SDK or the AWS CLI to deploy this stack. Here's an example using the AWS CLI: \b $ chalice generate-pipeline pipeline.json $ aws cloudformation deploy --stack-name mystack \b --template-file pipeline.json --capabilities CAPABILITY_IAM """ from chalice import pipeline factory = ctx.obj['factory'] # type: CLIFactory config = factory.create_config_obj() p = pipeline.CreatePipelineTemplate() params = pipeline.PipelineParameters( app_name=config.app_name, lambda_python_version=config.lambda_python_version, codebuild_image=codebuild_image, code_source=source, ) output = p.create_template(params) if buildspec_file: extractor = pipeline.BuildSpecExtractor() buildspec_contents = extractor.extract_buildspec(output) with open(buildspec_file, 'w') as f: f.write(buildspec_contents) with open(filename, 'w') as f: f.write(serialize_to_json(output))
def convert(self, value, param, ctx: click.Context) -> t.List[str]: """ Convert method that makes this class usable as a click type. """ if isinstance(value, self): return value elif isinstance(value, str): value = str(value) return value.split(",") self.fail("{} is no valid comma separated string list".format(value), param, ctx)
def convert(self, value, param, ctx: click.Context) -> t.Optional[bool]: """ Convert method that makes this class usable as a click type. """ if isinstance(value, self): return value elif isinstance(value, str): value = value.lower() if value == "true" : return True elif value == "false": return False elif value == "none": return None self.fail("{} is no valid bool or 'none'".format(value), param, ctx)
def convert(self, value, param, ctx: click.Context) -> int: """ Convert method that makes this class usable as a click type. """ if isinstance(value, self): return value self.fail("{} is no valid time span".format(value), param, ctx)
def disable_click_colors(): """ Set a Click context where colors are disabled. Creates a throwaway BaseCommand to play nicely with the Context constructor. The intended side-effect here is that click.echo() checks this context and will suppress colors. https://github.com/pallets/click/blob/e1aa43a3/click/globals.py#L39 """ ctx = Context(BaseCommand('AllYourBaseAreBelongToUs')) ctx.color = False push_context(ctx)
def cli(ctx, config, access_token): # type: (click.Context, str, str) -> None """BOCCO API http://api-docs.bocco.me/ ? CLI ????????""" debug = False downloads = None if config: with open(config, 'r') as f: config_json = json.load(f) debug = config_json['debug'] downloads = config_json['downloads'] access_token = config_json['access_token'] ctx.obj['api'] = Client(access_token) ctx.obj['debug'] = debug ctx.obj['downloads'] = downloads
def rooms(ctx, verbose): # type: (click.Context, bool) -> None """???????""" api = ctx.obj['api'] template = u'{index}. {r[name]}\n\t{r[uuid]}' if verbose: template = u''' {index}. {r[name]} \tUUID: {r[uuid]} \tMembers({members_count}): {members} \tSensors({sensors_count}): {sensors} \tLast message: {last_message_id} \tUpdated at: {r[updated_at]} '''.strip() for i, r in enumerate(api.get_rooms()): member_names = [m['user']['nickname'] for m in r['members']] sensor_names = [s['nickname'] for s in r['sensors']] last_message_id = 0 if 0 < len(r['messages']): last_message_id = r['messages'][0]['id'] click.echo(template.format( # type: ignore index=i + 1, r=r, members_count=len(member_names), members=u', '.join(member_names), sensors_count=len(sensor_names), last_message_id=last_message_id, sensors=u', '.join(sensor_names)))
def send(ctx, room_uuid, text): # type: (click.Context, str, str) -> None """????????????.""" api = ctx.obj['api'] click.echo(u'????????...') # type: ignore api.post_text_message(uuid.UUID(room_uuid), text)
def web(ctx): # type: (click.Context) -> None """Web ????? API ?????????""" api = ctx.obj['api'] debug = ctx.obj['debug'] downloads = ctx.obj['downloads'] app.config.update(dict(DEBUG=debug, DOWNLOADS=downloads)) app.api = api app.run()
def _get_ctx(): @click.group() def cli(): """A sample command group.""" pass @cli.command() def hello(): """A sample command.""" pass return click.Context(cli, info_name='cli')
def ensure_project_path(f): """Ensure that the project path exists. In the event that it does not exist, prompt the user to specify the path """ @click.pass_context def new_func(ctx, *args, **kwargs): """ :param click.Context ctx: """ if get_project_path() is None: click.echo('Config project_path not set.') click.echo('You can do one of the following:') choice = click_utils.prompt_choices([ 'Clone the repository to {}.'.format(default_project_path()), 'Specify the path to an existing repo.' ]) if choice == 0: path = default_project_path() if os.path.exists(path): click.confirm('Path {} already exists. Continuing will remove this path.'.format( path), default=True, abort=True) if os.path.isdir(path): shutil.rmtree(path) else: os.remove(path) click.echo('Cloning to {}...'.format(path), nl=False) clone_project() click.secho('Done', fg='black', bg='green') save_project_path(path) else: value = click.prompt( 'Please enter the path to your project', type=str) value = path_utils.canonical_path(value) if not os.path.exists(value) or not os.path.isdir(value): click.echo('This directory does not exist.') ctx.exit(code=exit_codes.OTHER_FAILURE) click.confirm('Is your project at {}?'.format( value), default=True, abort=True) save_project_path(value) return ctx.invoke(f, *args, **kwargs) return update_wrapper(new_func, f)
def option_hosted_zone_id(f): """Similar to click.option for hosted-zone-id, but in the event that the user does not specify the option, try to retrieve the ID from AWS and only error out if that fails or is ambiguous. """ @click.option('--hosted-zone-id', '-hz', default=None, help='Route 53 Hosted Zone ID for {}'.format(settings.DOMAIN_NAME)) @click.pass_context def new_func(ctx, hosted_zone_id, *args, **kwargs): """ :param click.Context ctx: """ if hosted_zone_id is None: echo_heading('Trying to find hosted zone for {}.'.format(settings.DOMAIN_NAME), marker='-', marker_color='magenta') client = boto3.client('route53') response = client.list_hosted_zones_by_name( DNSName=settings.DOMAIN_NAME ) if len(response['HostedZones']) == 0: click.echo('Unable to find a Hosted Zone for {}. Please specify it explicitly by passing --hosted-zone-id/-hz.') ctx.exit(code=exit_codes.OTHER_FAILURE) elif len(response['HostedZones']) > 1: click.echo('Found multiple Hosted Zones for {}. Please specify one explicitly by passing --hosted-zone-id/-hz.') ctx.exit(code=exit_codes.OTHER_FAILURE) hosted_zone_id = response['HostedZones'][0]['Id'].split('/')[-1] click.echo('Done.') return ctx.invoke(f, hosted_zone_id=hosted_zone_id, *args, **kwargs) return update_wrapper(new_func, f)
def skip_if_debug(f): @click.pass_context def new_func(ctx, *args, **kwargs): """ :param click.Context ctx: """ if settings.DEBUG: click.echo('Skipping because DEBUG is True.') ctx.exit() return ctx.invoke(f, *args, **kwargs) return update_wrapper(new_func, f)
def get_stack(ctx, path): """ Parses the path to generate relevant Envrionment and Stack object. :param ctx: Cli context. :type ctx: click.Context :param path: Path to either stack config or environment folder. :type path: str """ return ConfigReader( ctx.obj["sceptre_dir"], ctx.obj["options"] ).construct_stack(path)
def test_current_config(): with click.Context(main).scope() as ctx: ctx.params['config'] = {'foo': 'bar'} assert config.current_config == {'foo': 'bar'}
def test_get_config_path(self): ctx = click.Context(click.Command('cli')) ctx.obj = {'project_dir': 'foo/bar/baz/'} path = _get_config_path(ctx) self.assertEqual(path, 'foo/bar/baz/.pipetree/config.json')
def cmd_select(ctx, # type: click.Context backend, # type: str host, # type: str port, # type: int filter_address, # type: str filter_service_name, # type: str filter_service_tags, # type: str hash_method, # type: proxenos.rendezvous.HashMethod key, # type: str ): # type: (...) -> None """Selects a node from a cluster.""" if port is None: port = DEFAULT_PORTS[backend] driver_manager = stevedore.driver.DriverManager( namespace=proxenos.mappers.NAMESPACE, name=backend, invoke_on_load=False) try: mapper = driver_manager.driver(host=host, port=port) except proxenos.errors.ServiceDiscoveryConnectionError as err: click.secho(str(err), fg='red') ctx.exit(proxenos.const.ExitCode.CONNECTION_FAILURE) mapper.update() cluster = mapper.cluster.copy() # TODO(darvid): Add filtering support in the mapper API itself if filter_service_tags: pattern = re.compile(filter_service_tags) cluster = {service for service in cluster if all(pattern.match(tag) for tag in service.tags)} if filter_service_name: pattern = re.compile(filter_service_name) cluster = {service for service in cluster if pattern.match(service.name)} if filter_address: pattern = re.compile(filter_address) cluster = {service for service in cluster if pattern.match(str(service.socket_address))} service = proxenos.rendezvous.select_node( cluster, key, hash_method=hash_method) if service is None: ctx.exit(proxenos.const.ExitCode.SERVICES_NOT_FOUND) click.echo(str(service.socket_address))
def do_ex(ctx: click.Context, cmd: typing.List[str], cwd: str = '.') -> typing.Tuple[str, str, int]: """ Executes a given command Args: ctx: Click context cmd: command to run cwd: working directory (defaults to ".") Returns: stdout, stderr, exit_code """ def _popen_pipes(cmd_, cwd_): def _always_strings(env_dict): """ On Windows and Python 2, environment dictionaries must be strings and not unicode. """ env_dict.update( (key, str(value)) for (key, value) in env_dict.items() ) return env_dict return subprocess.Popen( cmd_, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=str(cwd_), env=_always_strings( dict( os.environ, # try to disable i18n LC_ALL='C', LANGUAGE='', HGPLAIN='1', ) ) ) def _ensure_stripped_str(_, str_or_bytes): if isinstance(str_or_bytes, str): return '\n'.join(str_or_bytes.strip().splitlines()) else: return '\n'.join(str_or_bytes.decode('utf-8', 'surogate_escape').strip().splitlines()) exe = find_executable(cmd.pop(0)) if not exe: exit(-1) cmd.insert(0, exe) click.secho(f'{cmd}', nl=False, fg='magenta') p = _popen_pipes(cmd, cwd) out, err = p.communicate() click.secho(f' -> {p.returncode}', fg='magenta') return _ensure_stripped_str(ctx, out), _ensure_stripped_str(ctx, err), p.returncode
def do( ctx: click.Context, cmd: typing.List[str], cwd: str = '.', mute_stdout: bool = False, mute_stderr: bool = False, # @formatter:off filter_output: typing.Union[None, typing.Iterable[str]]=None # @formatter:on ) -> str: """ Executes a command and returns the result Args: ctx: click context cmd: command to execute cwd: working directory (defaults to ".") mute_stdout: if true, stdout will not be printed mute_stderr: if true, stderr will not be printed filter_output: gives a list of partial strings to filter out from the output (stdout or stderr) Returns: stdout """ def _filter_output(input_): def _filter_line(line): # noinspection PyTypeChecker for filter_str in filter_output: if filter_str in line: return False return True if filter_output is None: return input_ return '\n'.join(filter(_filter_line, input_.split('\n'))) if not isinstance(cmd, (list, tuple)): cmd = shlex.split(cmd) out, err, ret = do_ex(ctx, cmd, cwd) if out and not mute_stdout: click.secho(f'{_filter_output(out)}', fg='cyan') if err and not mute_stderr: click.secho(f'{_filter_output(err)}', fg='red') if ret: click.secho(f'command failed: {cmd}', err=True, fg='red') exit(ret) return out
def main_add(ctx, # type: click.Context options, # type: Dict[str, Any] build, # type: bool editable, # type: Iterable[str] tag, # type: Iterable[str] install, # type: bool pre, # type: bool resolve_canonical_names, # type: bool resolve_versions, # type: bool specifiers, # type: Tuple[str, ...] ): # type: (...) -> None """Add packages to requirement source files.""" if not options['directory'].exists(): console.error('run `{} init\' first', ctx.find_root().info_name) ctx.abort() specifiers = tuple('-e {}'.format(e) for e in editable) + specifiers if install: pip_args = tuple(shlex.split(' '.join(specifiers))) if pre: pip_args = ('--pre',) + pip_args pip_install(ctx, *pip_args) if not tag: tag = ('main',) pip_options, session = reqwire.helpers.requirements.build_pip_session() src_dir = options['directory'] / options['source_dir'] lookup_index_urls = set() # type: Set[str] for tag_name in tag: filename = src_dir / ''.join((tag_name, options['extension'])) if not filename.exists(): continue _, finder = reqwire.helpers.requirements.parse_requirements( filename=str(filename)) lookup_index_urls |= set(finder.index_urls) try: for tag_name in tag: console.info('saving requirement(s) to {}', tag_name) reqwire.scaffold.extend_source_file( working_directory=str(options['directory']), tag_name=tag_name, specifiers=specifiers, extension=options['extension'], lookup_index_urls=lookup_index_urls, prereleases=pre, resolve_canonical_names=resolve_canonical_names, resolve_versions=resolve_versions) except piptools.exceptions.NoCandidateFound as err: console.error(str(err)) ctx.abort() if build: ctx.invoke(main_build, all=False, tag=tag)
def catch_all(func): """ Catch (nearly) all exceptions unless in debug mode. Args: func (function): function being decorated """ def _catch_all(ctx, *args, **kwargs): """ Args: ctx (click.Context): cli context object args (tuple): tuple of args of the fuction kwargs (dict): keyword args of the function """ def stderr(msg): click.echo(click.style(msg, fg='red'), file=sys.stderr) try: return func(ctx, *args, **kwargs) except click.Abort: # on SIGINT click.prompt raises click.Abort logger.error('') # just to get a newline except click.ClickException: raise # Let click deal with it except Exception: # generic error string stderr( 'You have experienced a client-side technical error.') # only dump the stack traces if the debug flag is set if kwargs.get('debug') or ctx.obj['debug']: stderr("\nFunction: {}.{}".format(func.__module__, func.__name__)) stderr("Args: {}".format(args)) stderr("Kwargs: {}".format(kwargs)) stderr("{}".format(traceback.format_exc())) else: stderr( 'For more detail, run your command with the debug flag: ' '`21 --debug <command>.`') sys.exit(1) return functools.update_wrapper(_catch_all, func)
def test_basic_parameters(self): """Validate a combination of parameters. This exercises the code paths for a group with arguments, options and environment variables. """ @click.group() @click.option('--param', envvar='PARAM', help='A sample option') @click.argument('ARG', envvar='ARG') def cli(): """A sample command group.""" pass ctx = click.Context(cli, info_name='cli') output = list(ext._format_command(ctx, show_nested=False)) self.assertEqual(textwrap.dedent(""" .. program:: cli .. code-block:: shell cli [OPTIONS] ARG COMMAND [ARGS]... .. rubric:: Options .. option:: --param <param> A sample option .. rubric:: Arguments .. option:: ARG Required argument .. rubric:: Environment variables .. envvar:: PARAM Provide a default for :option:`--param` .. envvar:: ARG Provide a default for :option:`ARG` """).lstrip(), '\n'.join(output))