我们从Python开源项目中,提取了以下28个代码示例,用于说明如何使用click.format_filename()。
def info(ctx, spec_file): """ Describes the specification provided. """ if ctx.obj.debug: click.secho( "Specification: " + click.format_filename(spec_file), fg="yellow" ) spec = Specification(spec_file) if 'info' in spec: version = spec['info']['version'] title = spec['info']['title'] spec_license = spec['info']['license']['name'] or 'Unknown' banner = f"{title} - {version}. {spec_license} licensed" click.secho(banner, fg='green') else: click.secho(f"No info was found in {spec}.", fg="red")
def combine_command(ctx, spec_file): """ Describes the specification provided. """ if ctx.obj.debug: click.secho( "Specification: " + click.format_filename(spec_file), fg="yellow" ) spec = Specification(spec_file) combined_spec_schema = spec.combine().schema() # TODO: If -y, format as yaml print(yaml.dump(combined_spec_schema))
def spec_command(ctx, spec_file): """ Describes the specification provided. """ if ctx.obj.debug: click.secho( "Specification: " + click.format_filename(spec_file), fg="yellow" ) spec = Specification(spec_file) # This is britle as it assumes info fields are defined in the spec. if 'info' in spec: version = spec['info']['version'] title = spec['info']['title'] spec_license = spec['info']['license']['name'] or 'Unknown' banner = f"{title} - v{version}. {spec_license} licensed" click.secho(banner, fg='green') else: click.secho(f"No info was found in {spec}.", fg="red") # TODO: Implement linting of a spec. # TODO: implement validation of a spec
def walk_files(ctx, paths, ignore_dir): for i, p in enumerate(paths): p = click.format_filename(p) if p == '.' or os.path.isdir(p): for root, dirs, files in os.walk(p): if is_dir_ignored(root.lstrip('./'), *ignore_dir): continue for fn in files: yield os.path.join(root, fn) elif os.path.exists(p): yield p elif i == 0: ctx.fail('{}: No such file or directory'.format(p)) break
def account(ctx, access_token, output_file, data_format, verbose): """Show basic account info""" if not ctx.obj: ctx.obj = get_do_manager(access_token) do_account = ctx.obj.get_account() dataset = api.create_accounts_dataset(do_account, verbose=verbose) # Export to file if output_file: export_kwargs = {'lineterminator': os.linesep} if data_format == 'csv' else {} output_file.write(dataset.export(data_format, **export_kwargs).encode()) click.secho( "{format} data was successfully exported to '{file_path}'".format( format=data_format.upper(), file_path=click.format_filename(output_file.name), ), fg='green', ) # Print dataset to stdout else: for key, value in dataset.dict[0].items(): click_echo_kvp(key, value)
def cli(paths, language, time, errors, recursive): """ Verify the solution to a problem. Runs the appropriate command for a language (specified in the configuration file) with the file path(s) as arguments. If the LANGUAGE option isn't specified, it will be identified based on the file extension. Similarly, the problem ID will be identified based on the file name. """ for path in paths: if os.path.isdir(path): if recursive: validate_directory(path, language, time, errors) else: click.echo('Skipping %s because it is a directory ' 'and --recursive was not specified' % click.format_filename(path)) else: validate_file(path, language, time, errors)
def generate_resources(resources, path): if len(resources) > 1 and not os.path.isdir(path): if os.path.exists(path): sys.exit('%s needs to be a directory to create multiple ' 'resource files' % click.format_filename(path)) os.mkdir(path) for resource in resources: if len(resources) > 1 or os.path.isdir(path): resource_path = '%s/%s' % (path, resource) else: resource_path = path if os.path.exists(resource_path) and not \ click.confirm('%s already exists. Do you want to overwrite it?' % click.format_filename(resource_path)): continue shutil.copy('%s/resources/%s' % (paths.DATA, resource), path) click.echo('Created %s at path %s' % (resource, click.format_filename(path)))
def generate(json_file): """Read the JSON_FILE and write the PBS files""" json_file = click.format_filename(json_file) settings = read_jsonfile(json_file) simulation = Simulation(settings) click.echo('Job length = {}'.format(simulation.job_length)) simulation.writeSimulationFiles()
def generate(app, force, no_optimize, verbose, quiet): """ app: this will be resolved to os.getcwd()/{app}.yml """ logging.basicConfig(level=logging.WARN + 10 * quiet - 10 * verbose) cwd = os.getcwd() this_dir = os.path.dirname(os.path.realpath(__file__)) templates_dir = os.path.join(this_dir, u"templates") yaml_raw_data = read_yaml_file(os.path.join(cwd, YAML_FILENAME)) for one_raw_app in yaml_raw_data.get(YamlSchemaKeywords.APPS): if one_raw_app.get(YamlSchemaKeywords.APP_NAME) == app: target_app = one_raw_app break else: raise click.BadArgumentUsage(u"App not found") normalized_data = normalize_schema(target_app) app_target_path = os.path.join(cwd, app) if os.path.exists(app_target_path): if force: logger.info(u"Deleting {}".format(app_target_path)) shutil.rmtree(app_target_path) else: raise click.ClickException(u'Path: %s already exists.' % click.format_filename(app_target_path)) structure_of_app = get_structure(normalized_data) app_generator = TemplateFileAppGenerator(cwd, templates_dir, normalized_data, structure_of_app) logger.info(u"Generating app") app_generator.generate_app() if not no_optimize: logger.info(u"Optimizing source code") app_generator.optimize_source_codes() click.echo(u'Done')
def _handle_build_failure(app, logfile_name): click.echo(RED("Build failed! Last 15 lines of log:")) # TODO: More efficient tailing lines = [] with open(logfile_name, "r") as fh: for line in fh: lines = lines[-14:] + [line] for line in lines: click.echo(" " + remove_ansi(line).rstrip()) click.echo("See full build log at {log}".format( log=click.format_filename(logfile_name)), err=True ) app.run_hooks(PluginHook.DOCKER_FAILURE) sys.exit(1)
def suplat(input, comment, volumn, ld, outmode): infile = click.format_filename(input) appsuperlattice = superlattice.App(infile, comment, volumn, ld, outmode) appsuperlattice.run()
def ocumaker(input, comment, element, speckle, nspeckle, trs, refined, outmode, mpr): infile = click.format_filename(input) appoccupymaker = occupymaker.App(infile, comment, element, speckle, nspeckle, trs, refined, outmode, mpr) appoccupymaker.run()
def ocubiter(input, comment, element, speckle, nspeckle, trs, refined, outmode, mpr): infile=click.format_filename(input) appoccupybiter = occupybiter.App(infile, comment, element, speckle, nspeckle, trs, refined, outmode, mpr) appoccupybiter.run()
def ocutenter(input, comment, element, speckle, nspeckle, zoom, trs, refined, outmode, mpr): infile=click.format_filename(input) y = yaml.load(open(infile, "r")) appoccupytenter = occupytenter.App(y, comment, element, speckle, nspeckle, zoom, trs, refined, outmode, mpr) appoccupytenter.run()
def ocumakert(input, comment, element, speckle, trs, refined, outmode): infile=click.format_filename(input) appoccupymakert = occupymakert.App(infile, comment, element, speckle, trs, refined, outmode) appoccupymakert.run()
def paths(ctx, spec_file): """ List paths within the specification provided """ if ctx.obj.debug: click.secho( "Specification: " + click.format_filename(spec_file), fg="yellow" ) spec = Specification(spec_file) spec_paths = spec.paths() click.echo(spec_paths)
def endpoints_command(ctx, spec_file): """ List paths within the specification provided """ if ctx.obj.debug: click.secho( "Specification: " + click.format_filename(spec_file), fg="yellow" ) click.echo("Paths:") spec = Specification(spec_file) for path in spec.paths(): click.echo(' '*4 + path)
def main_build(ctx, # type: click.Context options, # type: Dict[str, Any] all, # type: bool tag, # type: Iterable[str] pip_compile_options, # type: Iterable[str] ): # type: (...) -> None """Build requirements with pip-compile.""" if not options['directory'].exists(): console.error('run `{} init\' first', ctx.find_root().info_name) ctx.abort() if not all and not tag: console.error('either --all or --tag must be provided.') ctx.abort() src_dir = options['directory'] / options['source_dir'] dest_dir = options['directory'] / options['build_dir'] if not dest_dir.exists(): dest_dir.mkdir() default_args = ['-r'] if not tag: pattern = '*{}'.format(options['extension']) tag = (path.stem for path in src_dir.glob(pattern)) for tag_name in tag: src = src_dir / ''.join((tag_name, options['extension'])) dest = dest_dir / '{}.txt'.format(tag_name) console.info('building {}', click.format_filename(str(dest))) args = default_args[:] args += [str(src)] args += list(pip_compile_options) with atomicwrites.AtomicWriter(str(dest), 'w', True).open() as f: f.write(reqwire.scaffold.MODELINES_HEADER) with tempfile.NamedTemporaryFile() as temp_file: args += ['-o', temp_file.name] sh.pip_compile(*args, _out=f, _tty_out=False)
def main_init(ctx, # type: click.Context options, # type: Dict[str, Any] force, # type: bool index_url, # type: str tag, # type: Iterable[str] extra_index_url, # type: Tuple[str] ): # type: (...) -> None """Initialize reqwire in the current directory.""" if not force and options['directory'].exists(): console.error('requirements directory already exists') ctx.abort() src_dir = reqwire.scaffold.init_source_dir( options['directory'], exist_ok=force, name=options['source_dir']) console.info('created {}', click.format_filename(str(src_dir))) build_dir = reqwire.scaffold.init_source_dir( options['directory'], exist_ok=force, name=options['build_dir']) console.info('created {}', click.format_filename(str(build_dir))) if not tag: tag = ('docs', 'main', 'qa', 'test') for tag_name in tag: filename = reqwire.scaffold.init_source_file( working_directory=options['directory'], tag_name=tag_name, extension=options['extension'], index_url=index_url, extra_index_urls=extra_index_url) console.info('created {}', click.format_filename(str(filename)))
def droplets(ctx, access_token, output_file, data_format, verbose): """List your droplets""" if not ctx.obj: ctx.obj = get_do_manager(access_token) do_droplets = ctx.obj.get_all_droplets() dataset = api.create_droplets_dataset(do_droplets, verbose=verbose) # Export to file if output_file: export_kwargs = {'lineterminator': os.linesep} if data_format == 'csv' else {} output_file.write(dataset.export(data_format, **export_kwargs).encode()) click.secho( "{format} data was successfully exported to '{file_path}'".format( format=data_format.upper(), file_path=click.format_filename(output_file.name), ), fg='green', ) # Print dataset to stdout else: for n, row in enumerate(dataset.dict, start=1): droplet_name = row.pop('Name') droplet_status = row.pop('Status') click.secho( '# {} ({})'.format(droplet_name, droplet_status), fg='yellow', bold=True, ) for key, value in row.items(): click_echo_kvp(key, value) if n != len(dataset.dict): click.echo() # Print a new line between droplets
def domains(ctx, access_token, output_file, data_format, verbose): """List your domains""" if not ctx.obj: ctx.obj = get_do_manager(access_token) do_domains = ctx.obj.get_all_domains() dataset = api.create_domains_dataset(do_domains, verbose=verbose) # Export to file if output_file: export_kwargs = {'lineterminator': os.linesep} if data_format == 'csv' else {} output_file.write(dataset.export(data_format, **export_kwargs).encode()) click.secho( "{format} data was successfully exported to '{file_path}'".format( format=data_format.upper(), file_path=click.format_filename(output_file.name), ), fg='green', ) # Print dataset to stdout else: domain = None for n, row in enumerate(dataset.dict, start=1): # Group the record by the domain if domain != row['Domain']: if n > 1: click.echo() # Print a new line between droplets domain = row['Domain'] click.secho('# {}'.format(domain), fg='yellow', bold=True) click.echo( '{subdomain:<35} {record_type:<10} {destination}'.format( subdomain=row['Subdomain'], record_type=row['Record type'], destination=row['Destination'], ) )
def validate_file(path, language, time_execution, show_errors): problem = get_problem_from_path(path) if problem is None: click.echo('Skipping %s because it does not contain ' 'a valid problem ID' % click.format_filename(path)) return if language is None: language = get_language_from_path(path) or {} click.echo('Checking output of %s: ' % click.format_filename(path), nl=False) result = verify_solution(path, language, time_execution, problem) print_result(result, show_errors, time_execution)
def cli(problem, language, path): """ Create the file for a problem. Simply specify a valid problem ID and the file will be created at euler_<id>.<extension> (if the PATH option isn't specified). Optionally, the LANGUAGE argument can be specified, which will then be used to identify an appropriate template for the file. """ if path is None: filename_format = data.config['filename format'] path = filename_format.format(id=problem['id'], extension=language['extension']) if os.path.exists(path) and not \ click.confirm('%s already exists. Do you want to overwrite it?' % click.format_filename(path)): return try: write_to_file(problem, language, path) except (FileNotFoundError, PermissionError) as exception: sys.exit('An exception occurred: %s' % exception) click.echo('Written to %s' % click.format_filename(path)) if 'resources' in problem and \ click.confirm('Generate resources for this problem?'): resource_path = click.prompt('Path (default: current directory)', default='.', show_default=False, type=click.Path(writable=True, readable=False)) generate_resources(problem['resources'], resource_path)
def print_config(ctx, param, sample_netcdf): if not sample_netcdf or ctx.resilient_parsing: return the_config = Config.from_nc(click.format_filename(sample_netcdf)).to_dict() click.echo(json.dumps(the_config, sort_keys=True, indent=4)) ctx.exit()
def init(ctx, directory): "Initialize new configuration directory." from sentry.runner.settings import discover_configs, generate_settings if directory is not None: ctx.obj['config'] = directory directory, py, yaml = discover_configs(ctx) # In this case, the config is pointing directly to a file, so we # must maintain old behavior, and just abort if yaml is None and os.path.isfile(py): # TODO: Link to docs explaining about new behavior of SENTRY_CONF? raise click.ClickException("Found legacy '%s' file, so aborting." % click.format_filename(py)) if yaml is None: raise click.ClickException("DIRECTORY must not be a file.") if directory and not os.path.exists(directory): os.makedirs(directory) py_contents, yaml_contents = generate_settings() if os.path.isfile(yaml): click.confirm("File already exists at '%s', overwrite?" % click.format_filename(yaml), abort=True) with click.open_file(yaml, 'wb') as fp: fp.write(yaml_contents) if os.path.isfile(py): click.confirm("File already exists at '%s', overwrite?" % click.format_filename(py), abort=True) with click.open_file(py, 'wb') as fp: fp.write(py_contents)
def main_remove(ctx, options, tag, specifiers, ): # type: (...) -> None """Remove packages from requirement source files.""" if not options['directory'].exists(): console.error('run `{} init\' first', ctx.find_root().info_name) ctx.abort() if not tag: tag = ('main',) for tag_name in tag: filename = reqwire.scaffold.build_filename( working_directory=options['directory'], tag_name=tag_name, extension=options['extension']) if not filename.exists(): console.warn('"{}" does not exist', click.format_filename(str(filename))) continue req_file = reqwire.helpers.requirements.RequirementFile( str(filename)) for specifier in specifiers: hireq = (reqwire.helpers.requirements.HashableInstallRequirement .from_line(specifier)) for requirement in req_file.requirements: src_req_name = requirement.name target_req_name = hireq.name if src_req_name == target_req_name: req_file.requirements.remove(requirement) console.info('removed "{}" from {}', src_req_name, tag_name) reqwire.helpers.requirements.write_requirements( filename=str(filename), requirements=req_file.requirements, header=reqwire.scaffold.build_source_header( index_url=req_file.index_url, extra_index_urls=req_file.extra_index_urls, nested_cfiles=req_file.nested_cfiles, nested_rfiles=req_file.nested_rfiles))
def quickstart(): """Quickstart wizard for setting up twtxt.""" width = click.get_terminal_size()[0] width = width if width <= 79 else 79 click.secho("twtxt - quickstart", fg="cyan") click.secho("==================", fg="cyan") click.echo() help_text = "This wizard will generate a basic configuration file for twtxt with all mandatory options set. " \ "You can change all of these later with either twtxt itself or by editing the config file manually. " \ "Have a look at the docs to get information about the other available options and their meaning." click.echo(textwrap.fill(help_text, width)) click.echo() nick = click.prompt("? Please enter your desired nick", default=os.environ.get("USER", "")) def overwrite_check(path): if os.path.isfile(path): click.confirm("? '{0}' already exists. Overwrite?".format(path), abort=True) cfgfile = click.prompt("? Please enter the desired location for your config file", os.path.join(Config.config_dir, Config.config_name), type=click.Path(readable=True, writable=True, file_okay=True)) cfgfile = os.path.expanduser(cfgfile) overwrite_check(cfgfile) twtfile = click.prompt("? Please enter the desired location for your twtxt file", os.path.expanduser("~/twtxt.txt"), type=click.Path(readable=True, writable=True, file_okay=True)) twtfile = os.path.expanduser(twtfile) overwrite_check(twtfile) twturl = click.prompt("? Please enter the URL your twtxt file will be accessible from", default="https://example.org/twtxt.txt") disclose_identity = click.confirm("? Do you want to disclose your identity? Your nick and URL will be shared when " "making HTTP requests", default=False) click.echo() add_news = click.confirm("? Do you want to follow the twtxt news feed?", default=True) conf = Config.create_config(cfgfile, nick, twtfile, twturl, disclose_identity, add_news) twtfile_dir = os.path.dirname(twtfile) if not os.path.exists(twtfile_dir): os.makedirs(twtfile_dir) open(twtfile, "a").close() click.echo() click.echo("? Created config file at '{0}'.".format(click.format_filename(conf.config_file))) click.echo("? Created twtxt file at '{0}'.".format(click.format_filename(twtfile)))
def configure(ctx, py, yaml, skip_backend_validation=False): """ Given the two different config files, set up the environment. NOTE: Will only execute once, so it's safe to call multiple times. """ global __installed if __installed: return from .importer import install if yaml is None: # `yaml` will be None when SENTRY_CONF is pointed # directly to a file, in which case, this file must exist if not os.path.exists(py): if ctx: raise click.ClickException("Configuration file does not exist. Use 'sentry init' to initialize the file.") raise ValueError("Configuration file does not exist at '%s'" % click.format_filename(py)) elif not os.path.exists(yaml) and not os.path.exists(py): if ctx: raise click.ClickException("Configuration file does not exist. Use 'sentry init' to initialize the file.") raise ValueError("Configuration file does not exist at '%s'" % click.format_filename(yaml)) os.environ['DJANGO_SETTINGS_MODULE'] = 'sentry_config' install('sentry_config', py, DEFAULT_SETTINGS_MODULE) # HACK: we need to force access of django.conf.settings to # ensure we don't hit any import-driven recursive behavior from django.conf import settings hasattr(settings, 'INSTALLED_APPS') from .initializer import initialize_app, on_configure initialize_app({ 'config_path': py, 'settings': settings, 'options': yaml, }, skip_backend_validation=skip_backend_validation) on_configure({'settings': settings}) __installed = True