我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用click.Path()。
def parse_requirement_line(line, filename, line_number, session, finder):
    """Turn one requirements-file line into an InstallRequirement.

    :param line: A single line taken from a requirements.txt file.
    :param filename: Path of the requirements.txt file the line came from.
    :param line_number: Integer position of the line within that file.
    :param session: Instance of pip.download.PipSession.
    :param finder: Instance of pip.download.PackageFinder.
    :return: The first item produced by ``req_file.process_line`` for this
        line, or ``None`` when the line is empty or nothing is produced.
    """
    if not line:
        return None

    # Collect everything process_line produces before picking the first item.
    parsed = list(
        req_file.process_line(
            line, filename, line_number, session=session, finder=finder
        )
    )
    if parsed:
        return parsed[0]
    return None
def create_potree_page(work_dir, server_url, tablename, column):
    '''Write an html demo page that uses the potree viewer.

    The potree build is downloaded and unzipped into ``work_dir`` when no
    ``potree`` entry exists there yet. The page is written to
    ``work_dir/potree-<table>.html`` where ``<table>`` is the last
    dot-separated component of ``tablename``.
    '''
    potree_dir = os.path.join(work_dir, 'potree')
    potree_archive = os.path.join(work_dir, 'potree.zip')

    # Fetch and unpack the potree build only when it is not present yet.
    if not os.path.exists(potree_dir):
        download('Getting potree code', 'http://3d.oslandia.com/potree.zip', potree_archive)
        with ZipFile(potree_archive) as archive:
            archive.extractall(path=work_dir)

    # The page name uses the table name without its schema prefix.
    table_only = tablename.split('.')[-1]
    sample_page = os.path.join(work_dir, 'potree-{}.html'.format(table_only))
    pending('Creating a potree demo page : file://{}'.format(
        str(Path(sample_page).absolute())))

    resource = '{}.{}'.format(tablename, column)
    # The template receives the server address without the http:// scheme.
    server_url = server_url.replace('http://', '')
    with io.open(sample_page, 'wb') as page_file:
        rendered = potree_page.format(resource=resource, server_url=server_url)
        page_file.write(rendered.encode())
    ok()
def create_cesium_page(work_dir, tablename, column):
    '''Write an html demo page that uses the cesium viewer.

    The cesium build is downloaded and unzipped into ``work_dir`` when no
    ``cesium`` entry exists there yet. The page is written to
    ``work_dir/cesium-<table>.html`` where ``<table>`` is the last
    dot-separated component of ``tablename``.
    '''
    cesium_dir = os.path.join(work_dir, 'cesium')
    cesium_archive = os.path.join(work_dir, 'cesium.zip')

    # Fetch and unpack the cesium build only when it is not present yet.
    if not os.path.exists(cesium_dir):
        download('Getting cesium code', 'http://3d.oslandia.com/cesium.zip', cesium_archive)
        with ZipFile(cesium_archive) as archive:
            archive.extractall(path=work_dir)

    # The page name uses the table name without its schema prefix.
    table_only = tablename.split('.')[-1]
    sample_page = os.path.join(work_dir, 'cesium-{}.html'.format(table_only))
    pending('Creating a cesium demo page : file://{}'.format(
        str(Path(sample_page).absolute())))

    resource = '{}.{}'.format(tablename, column)
    with io.open(sample_page, 'wb') as page_file:
        rendered = cesium_page.format(resource=resource)
        page_file.write(rendered.encode())
    ok()
def up(ctx, targets, run_id):
    """
    Provisions nodes from the given target(s) in the given PinFile.

    pinfile:    Path to pinfile (Default: workspace path)

    targets:    Provision ONLY the listed target(s). If omitted, ALL targets in
    the appropriate PinFile will be provisioned.
    """
    pinfile_path = _get_pinfile_path()

    try:
        outcome = lpcli.lp_up(pinfile_path, targets, run_id=run_id)
        return_code, results = outcome
        _handle_results(ctx, results, return_code)
    except LinchpinError as err:
        # Record the failure through the context, then exit with status 1.
        ctx.log_state(err)
        sys.exit(1)
def destroy(ctx, targets, run_id):
    """
    Destroys nodes from the given target(s) in the given PinFile.

    pinfile:    Path to pinfile (Default: workspace path)

    targets:    Destroy ONLY the listed target(s). If omitted, ALL targets in
    the appropriate PinFile will be destroyed.
    """
    pinfile_path = _get_pinfile_path()

    try:
        outcome = lpcli.lp_destroy(pinfile_path, targets, run_id=run_id)
        return_code, results = outcome
        _handle_results(ctx, results, return_code)
    except LinchpinError as err:
        # Record the failure through the context, then exit with status 1.
        ctx.log_state(err)
        sys.exit(1)
def get_click_options(self):
    """Yield the click option for this handler's metadata file(s).

    The option is named ``--<cli_name>``, accepts existing readable files
    (not directories) and is declared with ``multiple=True``.
    """
    import click
    import q2cli
    import q2cli.core

    # Path type for a single value, wrapped in q2cli's MultipleType.
    single_path = click.Path(exists=True, file_okay=True, dir_okay=False,
                             readable=True)
    option_type = q2cli.core.MultipleType(single_path)
    help_text = ('Metadata file or artifact viewable as metadata. This '
                 'option may be supplied multiple times to merge metadata.')

    # The requirement label depends only on whether ``self.default`` is None.
    requirement = '[optional]' if self.default is None else '[required]'

    option = q2cli.Option(['--' + self.cli_name], type=option_type,
                          help=help_text, multiple=True)
    yield self._add_description(option, requirement)
def main(ctx,               # type: click.Context
         directory,         # type: click.Path
         quiet,             # type: bool
         extension,         # type: str
         source_directory,  # type: str
         build_directory,   # type: str
         ):
    # type: (...) -> None
    """reqwire: micromanages your requirements."""
    # Normalise the requirements directory to a pathlib.Path.
    requirements_dir = pathlib.Path(str(directory))

    # Console output is verbose unless quiet mode was requested.
    console.verbose = not quiet

    # Shared state read by subcommands through the click context.
    ctx.obj = dict(
        build_dir=build_directory,
        directory=requirements_dir,
        extension=extension,
        source_dir=source_directory,
    )
def config_examples(dest, user_dir):
    """ Copy the example workflows to a directory.

    \b
    DEST: Path to which the examples should be copied.
    """
    source_dir = Path(lightflow.__file__).parents[1] / 'examples'
    if not source_dir.exists():
        click.echo('The examples source path does not exist')
        return

    # Unless user_dir is set, copy into an 'examples' folder below DEST.
    target_dir = Path(dest).resolve()
    if not user_dir:
        target_dir = target_dir / 'examples'

    if not target_dir.exists():
        target_dir.mkdir()
    elif not click.confirm('Directory already exists. Overwrite existing files?',
                           default=True, abort=True):
        return

    for example in source_dir.glob('*.py'):
        shutil.copy(str(example), str(target_dir / example.name))
    click.echo('Copied examples to {}'.format(str(target_dir)))
def prepare(sets):
    """Prepare sticker sets to be uploaded by this tool.

    Reads any given directories, process any file Telegram won't accept.
    Generates and overwrites existing .ssd file.
    """
    # TODO: do first-run configurations to setting paths to executables of pngquant and/or waifu2x
    from pytgasu.prepare import SetDefGenerator, PrepareImageFiles

    for raw_dir in sets:
        resolved_dir = Path(raw_dir).resolve()
        # Process the images first, then generate the set definition.
        PrepareImageFiles(set_dir=resolved_dir)
        SetDefGenerator(set_dir=resolved_dir)

    print(NOTICE_GO_EDIT_DEFS)
def logout():
    """Logout from Telegram."""
    session_file = Path(PATH_TGSESSION_FILE)
    if not session_file.exists():
        # Return early because TelegramClient creates a session first,
        # but only when you can log_out() the session file gets deleted.
        # If it's not there, don't create it and waste time talking to Telegram.
        print(ERROR_NOT_LOGGEDIN)
        return

    from telethon import TelegramClient
    from pytgasu.upload import CustomisedSession

    tc = TelegramClient(
        session=CustomisedSession.try_load_or_create_new(),
        api_id=TG_API_ID,
        api_hash=TG_API_HASH)
    tc.connect()

    # I guess even if user is not authorised, invoking LogOutRequest does not cause problems
    tc.log_out()

    # The session file's parent is a directory, so it must be removed with
    # rmdir(); Path.unlink() only works on files and symlinks. Removal is
    # best-effort: a directory that still has other content is left alone.
    try:
        session_file.parent.rmdir()
    except OSError:
        pass
def copy_plugin_contents(path, plugin):
    """Copy the resources of a plugin's top-level module into ``path/plugin``.

    ``path`` must offer ``joinpath``; the ``plugin`` sub-directory is created
    here. Every resource listed for the top-level module is copied, except
    ``__pycache__``.
    """
    top_module = plugin.module_name.split(".", 1)[0]
    target_root = path.joinpath("plugin")
    target_root.mkdir()

    for entry in plugin.dist.resource_listdir(top_module):
        if entry == "__pycache__":
            continue

        resource = pathlib.Path(
            plugin.dist.get_resource_filename(
                pkg_resources._manager, os.path.join(top_module, entry)
            )
        ).absolute()
        source = resource.as_posix()
        target = target_root.joinpath(resource.name).as_posix()

        # Directories are copied recursively with symlinks kept as symlinks;
        # single files are copied without following symlinks.
        if resource.is_dir():
            shutil.copytree(source, target, symlinks=True)
        else:
            shutil.copy2(source, target, follow_symlinks=False)
def init():
    """Return top level command handler."""

    @click.command()
    @click.option('--api', required=False, help='API url to use.',
                  envvar='TREADMILL_RESTAPI')
    @click.option('-m', '--manifest', help='App manifest file (stream)',
                  type=click.Path(exists=True, readable=True))
    @click.option('--delete', help='Delete the app.',
                  is_flag=True, default=False)
    @click.argument('appname', required=False)
    @cli.handle_exceptions(restclient.CLI_REST_EXCEPTIONS)
    def configure(api, manifest, delete, appname):
        """Configure a Treadmill app"""
        restapi = context.GLOBAL.admin_api(api)

        # Without an app name there is nothing to configure: list instead.
        if not appname:
            return _list(restapi)
        if delete:
            return _delete(restapi, appname)
        return _configure(restapi, manifest, appname)

    return configure
def init():
    """Return top level command handler."""

    @click.command()
    @click.argument('inputfile', type=click.Path(exists=True))
    @click.argument('params', nargs=-1,
                    type=click.Path(exists=True, readable=True))
    def interpolate(inputfile, params):
        """Interpolate input file template."""
        # Templates are looked up relative to the input file's directory.
        env = jinja2.Environment(
            loader=jinja2.FileSystemLoader(os.path.dirname(inputfile)),
            keep_trailing_newline=True
        )

        # Merge the parameter files in order; later files win on key clashes.
        data = {}
        for param in params:
            with io.open(param, 'rb') as fd:
                # NOTE(review): yaml.load without an explicit Loader can
                # construct arbitrary objects from the file; consider
                # yaml.safe_load if parameter files may be untrusted.
                loaded = yaml.load(stream=fd)
            # An empty YAML document loads as None, which dict.update rejects.
            if loaded is not None:
                data.update(loaded)

        cli.out(env.get_template(os.path.basename(inputfile)).render(data))

    return interpolate
def init():
    """Return top level command handler"""

    @click.command()
    @click.option(
        '--treadmill-root',
        type=click.Path(exists=True),
        envvar='TREADMILL_APPROOT',
        required=True,
        help='Treadmill root path.',
    )
    @click.option(
        '--upload-user',
        envvar='TREADMILL_ID',
        required=True,
        help='Upload postmortem statistics with this user.',
    )
    @click.option(
        '--upload-url',
        help='Upload postmortem statistics to this file url.',
    )
    def collect(treadmill_root, upload_user, upload_url):
        """Collect Treadmill node data"""
        # Export the upload user through the environment before collecting.
        os.environ['TREADMILL_ID'] = upload_user
        postmortem.run(treadmill_root, upload_url)

    return collect
def init():
    """Top level command handler."""

    @click.command()
    @click.option(
        '--approot',
        type=click.Path(exists=True),
        envvar='TREADMILL_APPROOT',
        required=True,
    )
    @click.option('--runtime', envvar='TREADMILL_RUNTIME', required=True)
    @click.argument('container_dir', type=click.Path(exists=True))
    def finish(approot, runtime, container_dir):
        """Finish treadmill application on the node."""
        container_name = os.path.basename(container_dir)

        # Run with finish context as finish runs in cleanup.
        with lc.LogContext(_LOGGER, container_name, lc.ContainerAdapter) as log:
            log.info('finish (approot %s)', approot)

            tm_env = appenv.AppEnvironment(approot)
            runtime_impl = app_runtime.get_runtime(runtime, tm_env, container_dir)
            runtime_impl.finish()

    return finish
def init():
    """Top level command handler."""

    @click.command()
    @click.option(
        '--approot',
        type=click.Path(exists=True),
        envvar='TREADMILL_APPROOT',
        required=True,
    )
    @click.option('--runtime', envvar='TREADMILL_RUNTIME', required=True)
    @click.argument('eventfile', type=click.Path(exists=True))
    def configure(approot, runtime, eventfile):
        """Configure local manifest and schedule app to run."""
        app_environment = appenv.AppEnvironment(root=approot)

        # app_cfg.configure returns the container directory, which is logged.
        container_dir = app_cfg.configure(app_environment, eventfile, runtime)
        _LOGGER.info('Configured %r', container_dir)

    return configure
def clone(force):
    """Clone a copy of the TigerHost project to a private location and set
    the project path to the cloned location.
    """
    target = default_project_path()

    if os.path.exists(target):
        # Ask before deleting unless force was given; declining aborts.
        if not force:
            click.confirm(
                'Path {} already exists. Continuing will remove this path.'.format(target),
                default=True,
                abort=True,
            )
        # Pick the removal routine matching what is at the target path.
        remove = shutil.rmtree if os.path.isdir(target) else os.remove
        remove(target)

    click.echo('Cloning to {}...'.format(target), nl=False)
    clone_project()
    click.secho('Done', fg='black', bg='green')
    save_project_path(target)
def modpack_file(path: Path) -> Generator[ModPack, None, None]:
    """Load a mod-pack for modification and write it back afterwards.

    Keyword arguments:
        path: Location of an existing ModPack file.

    Yields:
        The ModPack loaded from ``path``. The write-back step is not wrapped
        in ``try``/``finally``, so the file is rewritten (with any changes)
        only when the consumer finishes without raising.
    """
    with path.open(mode='r', encoding='utf-8') as source:
        modpack = ModPack.load(source)

    yield modpack

    with path.open(mode='w', encoding='utf-8') as sink:
        modpack.dump(sink)
def new(ctx, pack, path, gamever):
    """Create and initialize a new mod-pack."""
    # Check file system state
    pack_file = Path(pack)
    mods_dir = Path(path)
    pack_dir = pack_file.parent
    if not pack_dir.exists():
        raise UserReport(
            _('Mod-pack directory does not exists: {}').format(pack_dir)
        )
    # Mods path existence is checked by click

    # Set up the game for the mod-pack, optionally overriding its version.
    game = ctx['default_game']
    if gamever is not None:
        game.version = gamever

    with pack_file.open(mode='w', encoding='utf-8') as stream:
        # The mods location is stored relative to the pack file's directory.
        ModPack(game, mods_dir.relative_to(pack_dir)).dump(stream)
def install(ctx, pack, release, mod):
    """Install new MOD into a mod-pack."""
    with modpack_file(Path(pack)) as modpack:
        # Look the requested mod up in the pack's game database.
        target_mod = Mod.find(modpack.game.database.session(), mod)

        # HTTP session authorised with the stored token.
        proxy_session = requests.Session()
        with ctx['token_path'].open(encoding='utf-8') as token:
            proxy_session.auth = Authorization.load(token)

        changes = modpack.install_changes(
            mod=target_mod,
            min_release=Release[release.capitalize()],
            session=proxy_session,
        )
        modpack.apply(changes)
def upgrade(ctx, pack, release, mod):
    """Upgrade MOD and its dependencies."""
    with modpack_file(Path(pack)) as modpack:
        # Look the requested mod up in the pack's game database.
        target_mod = Mod.find(modpack.game.database.session(), mod)

        # HTTP session authorised with the stored token.
        proxy_session = requests.Session()
        with ctx['token_path'].open(encoding='utf-8') as token:
            proxy_session.auth = Authorization.load(token)

        changes = modpack.upgrade_changes(
            mod=target_mod,
            min_release=Release[release.capitalize()],
            session=proxy_session,
        )
        # No changes means there is nothing to upgrade.
        if not changes:
            raise AlreadyUpToDate(target_mod.name)
        modpack.apply(changes)
def load_config(func):
    """Decorate a click command with a ``--config``/``-c`` option.

    The wrapped command receives the loaded configuration as its ``config``
    keyword argument instead of the file name. When no file name is given,
    an error is printed to stderr and the process exits with status 1.
    """

    @functools.wraps(func)
    def internal(*args, **kwargs):
        config_path = kwargs.pop('config')
        if config_path is None:
            click.echo('--config option is required.', err=True)
            raise SystemExit(1)

        # Replace the raw path with the loaded configuration.
        kwargs['config'] = load(pathlib.Path(config_path))
        return func(*args, **kwargs)

    add_config_option = click.option(
        '--config', '-c',
        type=click.Path(exists=True),
        envvar='YUI_CONFIG_FILE_PATH',
    )
    return add_config_option(internal)
def cmd_status(self):
    """Print the active branch and one status line per entry.

    Each line shows three flags (working, cache, depot), the first eight
    characters of the digest, the human-readable size and the relative path.
    """
    click.echo('On branch %s' % self.repo.active_branch)
    click.echo()
    for header_line in (' Working', ' Cache', ' Depot', ' SHA-256 Size Path'):
        click.echo(header_line)

    row_format = '[ {} {} {} ] {} {:>6} {}'
    for entry in self._entries():
        # Let the depot, when configured, update the entry's status first.
        if self.depot:
            self.depot.get_status(entry)

        # Working flag: 'W' when linked, '*' when only in working, else blank.
        if entry.is_linked:
            working_flag = 'W'
        else:
            working_flag = '*' if entry.in_working else ' '
        cache_flag = 'C' if entry.in_cache else ' '
        depot_flag = 'D' if entry.in_depot else ' '

        click.echo(row_format.format(
            working_flag, cache_flag, depot_flag,
            entry.digest[:8], human_size(entry.size), entry.rel_path))
    click.echo()
def sync_source_option(func):
    """
    Attach the ``--src``/``-s`` sync source option to a command.

    :param func: Click CLI command to decorate
    :return: decorated CLI command
    """
    # Must be an existing directory; the path is resolved before use.
    source_type = Path(
        exists=True,
        file_okay=False,
        resolve_path=True,
    )
    add_option = option(
        '--src',
        '-s',
        'sync_source',
        type=source_type,
        help='Local directory from which to sync. Optional.',
    )
    return add_option(func)
def sync_destination_option(func):
    """
    Attach the ``--dest``/``-d`` sync destination option to a command.

    :param func: Click CLI command to decorate
    :return: decorated CLI command
    """
    # Plain files are rejected; existence is not required by this type.
    destination_type = Path(file_okay=False)
    add_option = option(
        '--dest',
        '-d',
        'sync_destination',
        type=destination_type,
        help='Remote directory to sync to. Optional.',
    )
    return add_option(func)
def make_directory_override_option(func):
    """
    Attach the ``--dest``/``-d`` make destination option to a command.

    :param func: Click CLI command to decorate
    :return: decorated CLI command
    """
    # Plain files are rejected; existence is not required by this type.
    destination_type = Path(file_okay=False)
    add_option = option(
        '--dest',
        '-d',
        'make_destination',
        type=destination_type,
        help='Remote directory to run make in. Optional.',
    )
    return add_option(func)
def extract_start(source_id, sources_config):
    """
    Start extraction for a pipeline specified by ``source_id`` defined in
    ``--sources-config``. ``--sources-config`` defaults to
    ``settings.SOURCES_CONFIG_FILE``.

    :param sources_config: Path to file containing pipeline definitions.
        Defaults to the value of ``settings.SOURCES_CONFIG_FILE``
    :param source_id: identifier used in ``--sources_config`` to describe
        pipeline
    """
    sources = load_sources_config(sources_config)

    # Find the requested source definition in the list of available sources.
    # Stop at the first match: the previous trailing ``continue`` was a no-op
    # that kept scanning the whole list.
    source = None
    for candidate_source in sources:
        if candidate_source['id'] == source_id:
            source = candidate_source
            break

    # Without a config we can't do anything, notify the user and exit
    if not source:
        click.echo('Error: unable to find source with id "%s" in sources '
                   'config' % source_id)
        return

    setup_pipeline(source)
def run(model, collect, filename, directory, ignore_git, pytest_args):
    """
    Run the test suite and collect results.

    MODEL: Path to model file. Can also be supplied via the environment variable
    MEMOTE_MODEL or configured in 'setup.cfg' or 'memote.ini'.
    """
    repo = None if ignore_git else callbacks.probe_git()

    # Work on a private list: the defaults below must not mutate the caller's
    # sequence, and a tuple cannot be concatenated to a list or appended to.
    pytest_args = list(pytest_args)
    if not any(a.startswith("--tb") for a in pytest_args):
        pytest_args = ["--tb", "short"] + pytest_args
    if not any(a.startswith("-v") for a in pytest_args):
        pytest_args.append("-vv")

    if collect:
        # With both a git repository and an output directory available, the
        # result file is named after the active branch's commit hash.
        if repo is not None and directory is not None:
            filename = join(
                directory,
                "{}.json".format(repo.active_branch.commit.hexsha))
        code = api.test_model(model, filename, pytest_args=pytest_args)
    else:
        code = api.test_model(model, pytest_args=pytest_args)

    # The value returned by the test run becomes the process exit status.
    sys.exit(code)
def index_documents(ctx, hocr_files, autocomplete_min_count):
    # Ingest every given hOCR file into the module-level repository while
    # showing a progress bar; a failing file is logged and then skipped.

    def show_fn(hocr_path):
        # The progress bar passes None when there is no current item.
        return '' if hocr_path is None else hocr_path.name

    # Create the shared repository on first use.
    global repository
    if repository is None:
        repository = DatabaseRepository(ctx.obj['DB_PATH'])

    paths = tuple(pathlib.Path(p) for p in hocr_files)
    with click.progressbar(paths, item_show_func=show_fn) as progress:
        for hocr_path in progress:
            try:
                repository.ingest_document(hocr_path, autocomplete_min_count)
            except Exception as e:
                logger.error("Could not ingest {}".format(hocr_path))
                logger.exception(e)
def get_requirements_and_latest(filename, force=False):
    """Walk a requirements file and look up the latest version per entry.

    Yields tuples of (original line, InstallRequirement instance,
    spec_versions, latest_version). Lines that do not produce a usable
    requirement are yielded as ``(original line, None, None, None)``.

    :param filename: Path to a requirements.txt file.
    :param force: Look up the latest version even for packages that have no
        version specified.
    """
    session = PipSession()
    finder = PackageFinder(
        session=session, find_links=[], index_urls=[PyPI.simple_url])
    _, content = get_file_content(filename, session=session)

    for line_number, line, orig_line in yield_lines(content):
        # Drop comments and surrounding whitespace before parsing.
        cleaned = req_file.COMMENT_RE.sub('', line).strip()
        req = parse_requirement_line(cleaned, filename, line_number,
                                     session, finder)

        if req is None or req.name is None or req_file.SCHEME_RE.match(req.name):
            yield (orig_line, None, None, None)
            continue

        spec_ver = current_version(req)
        # NOTE(review): a requirement without a specified version is not
        # yielded at all unless ``force`` is set -- confirm this is intended.
        if not (spec_ver or force):
            continue
        latest_ver = latest_version(req, session, finder)
        yield (orig_line, req, spec_ver, latest_ver)
def tileset(table, column, server_url, work_dir):
    """
    (Re)build a tileset.json for a given table
    """
    # initialize flask application
    create_app()

    work_dir = Path(work_dir)
    # Table names without a schema default to the public schema.
    if '.' not in table:
        table = 'public.{}'.format(table)

    lpsession = Session(table, column)

    # initialize range for level of details: min corner then max corner
    fullbbox = lpsession.boundingbox
    corner_keys = ('xmin', 'ymin', 'zmin', 'xmax', 'ymax', 'zmax')
    bbox = [fullbbox[key] for key in corner_keys]

    pending('Building tileset from database')
    hierarchy = threedtiles.build_hierarchy_from_pg(lpsession, server_url, bbox)
    ok()

    output_path = os.path.join(
        str(work_dir.resolve()),
        'tileset-{}.{}.json'.format(table, column))

    pending('Writing tileset to disk')
    with io.open(output_path, 'wb') as out:
        out.write(hierarchy.encode())
    ok()
def demo(sample, work_dir, server_url, usewith):
    '''
    download sample lidar data, load it into pgpointcloud
    '''
    # A sample entry is either a download link, or a (srid, link) sequence.
    entry = samples[sample]
    if isinstance(entry, (list, tuple)):
        srid, download_link = entry[0], entry[1]
    else:
        srid, download_link = None, entry

    filepath = Path(download_link)
    pending('Using sample data {}: {}'.format(sample, filepath.name))
    dest = os.path.join(work_dir, filepath.name)
    ok()

    # Skip the download when the file is already present in work_dir.
    if not os.path.exists(dest):
        download('Downloading sample', download_link, dest)

    # now load data; srid is forwarded only when one was given
    extra = {'srid': srid} if srid else {}
    _load(dest, sample, 'points', work_dir, server_url, 400, usewith, **extra)

    click.echo(
        'Now you can test lopocs server by executing "lopocs serve"'
        .format(sample)
    )
def find_element(yaml_dict, search_str):
    """Find the element addressed by a yaml path string.

    The path is split on ``/``. A part of the form ``name[3]`` is treated as
    the key ``name`` followed by the integer list index ``3``.

    :param yaml_dict: Dictionary representing a yaml document.
    :param search_str: Path string such as ``a/b[0]/c``.
    :return: The node found at the path, with its ``key`` attribute set to
        the last parsed key or index.
    :raises exceptions.InvalidSearchStringException: If the path cannot be
        followed through the document.

    When the node found has no ``line`` attribute, an error is printed to
    stderr and the process exits with status 1.
    """
    # Raw string: "\[" is an invalid escape sequence in a normal literal.
    indexed_part = re.compile(r"(.*)(\[([0-9]+)\])")

    # First split on / to determine which yaml dict we are searching in
    parsed_parts = []
    for dict_part in search_str.split("/"):
        matches = indexed_part.match(dict_part)
        if matches:
            list_name, _, list_index = matches.groups()
            parsed_parts.append(list_name)
            parsed_parts.append(int(list_index))
        else:
            parsed_parts.append(dict_part)

    # traverse the yaml path
    node = yaml_dict
    try:
        for key in parsed_parts:
            node = node[key]
    except (KeyError, IndexError, TypeError):
        raise exceptions.InvalidSearchStringException(search_str)

    # Try accessing the line of the path we are currently on. If we can't
    # access it, it means that the user has specified a path to a dict or
    # list, without indicating an item within the dictionary or list.
    try:
        node.line
        node.key = parsed_parts[-1]  # add the last parsed key as the node's key
    except AttributeError:
        click.echo("ERROR: Path exists but not specific enough (%s)." % search_str, err=True)
        exit(1)
    return node
def clut_option(**attrs):
    """Build the ``--clut`` option; callers may override any attribute."""
    attrs.setdefault("help", "Color lookup table (path to PNG or JPEG image)")
    # Must be an existing non-directory path; it is resolved before use.
    clut_path = click.Path(exists=True, dir_okay=False, resolve_path=True)
    return str_option("--clut", type=clut_path, **attrs)
def input_dir_option(**attrs):
    """Build the ``--input-dir`` option; callers may override any attribute."""
    attrs.setdefault("help", "Input directory containing .jpg and/or .png images, for collage functions")
    # Must be an existing directory; the path is resolved before use.
    input_dir = click.Path(exists=True, file_okay=False, resolve_path=True)
    return str_option("--input-dir", type=input_dir, **attrs)
def name_option(default=None, **attrs):
    """Build the ``--name`` option for the image output filename.

    A falsy ``default`` is replaced by ``"noise.png"``.
    """
    attrs.setdefault("help", "Filename for image output (should end with .png or .jpg)")
    output_name = click.Path(dir_okay=False)
    effective_default = default or "noise.png"
    return str_option("--name", type=output_name, default=effective_default, **attrs)
def get_click_options(self):
    """Yield the click option that names an output directory."""
    import click
    import q2cli

    # A writable directory path; it does not have to exist yet.
    output_dir = click.Path(exists=False, dir_okay=True, file_okay=False,
                            writable=True)
    option_name = '--' + self.cli_name
    yield q2cli.Option(
        [option_name],
        type=output_dir,
        help='Output unspecified results to a directory')
def get_click_options(self):
    """Yield the click option that names a command config file."""
    import click
    import q2cli

    # An existing, readable file (not a directory).
    config_file = click.Path(exists=True, dir_okay=False, file_okay=True,
                             readable=True)
    option_name = '--' + self.cli_name
    yield q2cli.Option(
        [option_name],
        type=config_file,
        help='Use config file for command options')
def load_envfile(config_path):
    """Load an Envfile.yaml into a envfile.System given a Path.

    On a ValidationError every reported error is echoed and the process
    exits with status 1.
    """
    try:
        with config_path.open() as handle:
            parsed = safe_load(handle.read())
            return _load_envfile(parsed)
    except ValidationError as err:
        click.echo("Error loading Envfile.yaml:")
        for problem in err.errors:
            click.echo("---\n" + problem)
        exit(1)
def cli_deploy(logfile, directory, envfile_path):
    # Load the Envfile, deploy once from `directory`, then print the
    # service URL.
    envfile = load_envfile(Path(envfile_path))
    source_dir = Path(directory)

    run_local = start(logfile)
    redeploy(run_local, envfile, source_dir)
    print_service_url(run_local, envfile)
def cli_watch(logfile, directory, envfile_path):
    # Same initial deployment as cli_deploy, followed by a call to watch()
    # on the same directory.
    envfile = load_envfile(Path(envfile_path))
    source_dir = Path(directory)

    run_local = start(logfile)
    redeploy(run_local, envfile, source_dir)
    print_service_url(run_local, envfile)

    watch(run_local, envfile, source_dir)
def antlr():
    """generate a new parser based on the grammar using antlr"""
    # Run antlr4 from inside the parser package directory.
    parser_dir = Path('qface/idl/parser').abspath()
    command = (
        'antlr4 -Dlanguage=Python3 -Werror -package qface.idl.parser '
        '-o . -listener -visitor T.g4'
    )
    sh(command, cwd=str(parser_dir))
def on_any_event(self, event):
    """Run pytest when a ``.py`` file (not a directory) is reported."""
    is_python_file = (
        not event.is_directory and Path(event.src_path).ext == '.py'
    )
    if is_python_file:
        sh('python3 -m pytest')
def run(self):
    """Run ``self.script`` from the current directory unless already running.

    ``self.is_running`` guards against overlapping runs. It is reset in a
    ``finally`` clause so that a failing script does not leave the flag set
    and block every later run.
    """
    if self.is_running:
        return

    self.is_running = True
    try:
        sh(self.script, cwd=Path.getcwd())
    finally:
        self.is_running = False
def reload(script, input, output):
    """
    reloads the generator script when the script files
    or the input files changes
    """
    script_path = Path(script).expand().abspath()
    output_dir = Path(output).expand().abspath()

    # Accept a single input as well as a list/tuple of inputs.
    if isinstance(input, (list, tuple)):
        inputs = input
    else:
        inputs = [input]

    output_dir.makedirs_p()
    _script_reload(script_path, inputs, output_dir)
def _script_reload(script, input, output):
    """run the named generator and monitor the input and generator folder"""
    inputs = [Path(entry).expand().abspath() for entry in input]
    output = Path(output).expand().abspath()

    cmd = 'python3 {0} {1} {2}'.format(script, ' '.join(inputs), output)
    event_handler = RunScriptChangeHandler(cmd)
    event_handler.run()  # run always once

    observer = Observer()

    def watch(directory):
        # Announce the directory, then schedule it recursively.
        click.secho('watch: {0}'.format(directory), fg='blue')
        observer.schedule(event_handler, directory, recursive=True)

    # Watched: the script's folder, each input's folder, and the 'qface'
    # folder next to this file.
    watch(script.dirname().expand().abspath())
    for entry in inputs:
        watch(entry.dirname().expand().abspath())
    watch(Path(__file__).parent / 'qface')

    observer.start()
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()
def upload():
    # Upload everything under dist/ with twine, then remove the build folder.
    sh('twine upload dist/*')
    build_dir = Path('build')
    build_dir.rmtree_p()
def pack():
    # Start from clean build/dist folders, build the wheel, then list the
    # wheel's contents.
    for folder in ('build', 'dist'):
        Path(folder).rmtree_p()
    for command in ('python3 setup.py bdist_wheel', 'unzip -l dist/*.whl'):
        sh(command)
def clean():
    # Remove the packaging artefact folders.
    for folder in ('build', 'dist', 'qface.egg-info'):
        Path(folder).rmtree_p()
def _wizard():
    """Prompt for each configuration value and save the result.

    Every prompt shows the current value as its default. The two path
    prompts only accept paths that exist, and the memory prompt only accepts
    an integer.
    """
    # Implicit concatenation instead of a backslash continuation inside the
    # literal, which leaked the next line's layout into the message.
    click.echo('Running the setup wizard. On each line, a default value is '
               'shown in the brackets if valid. '
               'Leave blank to use it, or enter a new value.')

    config.mc_dir = click.prompt('Your \'.minecraft\' folder path',
                                 show_default=True, default=config.mc_dir,
                                 type=click.Path(exists=True))
    config.java_dir = click.prompt('Your \'javaw\' file path',
                                   show_default=True, default=config.java_dir,
                                   type=click.Path(exists=True))
    config.max_mem = click.prompt('Maximum memory allocated to Minecraft in MB',
                                  show_default=True, default=config.max_mem,
                                  type=int)
    config.username = click.prompt('Your Minecraft username',
                                   show_default=True, default=config.username)

    click.echo('Done! More entries can be reached later manually.\n')
    config.save()