我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 click.secho()。
def write(self, file_path: Path, template: str, context: dict = None, preserve: bool = False):
    """Render ``template`` into ``file_path`` with the given context.

    Args:
        file_path: Destination handed to ``self._write``.
        template: Name of the template file to render.
        context: Values for the template. When omitted or empty,
            ``self.context`` is used instead.
        preserve: Forwarded unchanged to ``self._write``.

    Template errors are not raised: they are printed in red, and the
    process exits with status -1 afterwards if ``Generator.strict`` is set.
    """
    # ``None`` replaces the former ``{}`` default so no dict object is shared
    # between calls; an empty/omitted context still falls back to self.context.
    if not context:
        context = self.context
    message = None
    try:
        self._write(file_path, template, context, preserve)
    except TemplateSyntaxError as exc:
        message = '{0}:{1} error: {2}'.format(exc.filename, exc.lineno, exc.message)
    except TemplateNotFound as exc:
        message = '{0} error: Template not found'.format(exc.name)
    except TemplateError as exc:
        message = 'error: {0}'.format(exc.message)
    if message is not None:
        click.secho(message, fg='red')
        if Generator.strict:
            sys.exit(-1)
def print_task_definition(task_definition, indent=" "):
    """Print a cyan, indented summary of a task definition.

    The ARN and task role ARN lines are printed only when those attributes
    are truthy. Every container is listed with its image, cpu and memory,
    followed by any port mappings and extra hosts it has.
    """
    def show(template, *values):
        # Every line starts with ``indent`` and is printed in cyan.
        click.secho(template.format(indent, *values), fg="cyan")

    if task_definition.arn:
        show('{} arn : {}', task_definition.arn)
    show('{} family : {}', task_definition.family)
    show('{} network_mode : {}', task_definition.networkMode)
    if task_definition.taskRoleArn:
        show('{} task_role_arn : {}', task_definition.taskRoleArn)
    show('{} containers:')
    for container in task_definition.containers:
        show('{} {}:', container.name)
        show('{} image : {}', container.image)
        show('{} cpu : {}', container.cpu)
        show('{} memory : {}', container.memory)
        # A falsy collection (None or empty) means there is nothing to list.
        for mapping in container.portMappings or ():
            show('{} port : {}', mapping)
        for host in container.extraHosts or ():
            show('{} extra_host : {}', host)
def scale(ctx, service_name, count, dry_run, wait, asg, force_asg):
    """
    Set the desired count for service SERVICE_NAME to COUNT.
    """
    # Build the service from the config/env files recorded on the click context.
    config = Config(filename=ctx.obj['CONFIG_FILE'], env_file=ctx.obj['ENV_FILE'])
    service = Service(yml=config.get_service(service_name))
    # Blank separator line. Written as a call (like the ``delete`` command
    # does) because the Python 2 ``print`` statement form is a syntax error on
    # Python 3.
    print()
    manage_asg_count(service, count, asg, force_asg)
    click.secho('Updating desiredCount on "{}" service in cluster "{}" to {}.'.format(
        service.serviceName,
        service.clusterName,
        count
    ), fg="white")
    if not dry_run:
        service.scale(count)
        if wait:
            click.secho(" Waiting until the service is stable with our new count ...", fg='cyan')
            if service.wait_until_stable():
                click.secho(" Done.", fg='white')
            else:
                # Signal the failure to the calling shell.
                click.secho(" FAILURE: the service failed to start.", fg='red')
                sys.exit(1)
def delete(ctx, service_name, dry_run):
    """
    Delete the service SERVICE_NAME from AWS.
    """
    service_yml = Config(
        filename=ctx.obj['CONFIG_FILE'],
        env_file=ctx.obj['ENV_FILE'],
    ).get_service(service_name)
    service = Service(yml=service_yml)

    # Always show what is about to be removed, even on a dry run.
    print()
    click.secho('Deleting service "{}":'.format(service.serviceName), fg="white")
    click.secho(' Service info:', fg="green")
    print_service_info(service)
    click.secho(' Task Definition info:', fg="green")
    print_task_definition(service.active_task_definition)
    print()

    if dry_run:
        return

    # Require the operator to type the service name back as confirmation.
    click.echo("If you really want to do this, answer \"{}\" to the question below.\n".format(service.serviceName))
    answer = click.prompt("What service do you want to delete? ")
    if answer != service.serviceName:
        click.echo("\nNot deleting service \"{}\"".format(service.serviceName))
        return

    # Scale to zero and wait for that to settle before deleting the service.
    service.scale(0)
    print(" Waiting for our existing containers to die ...")
    service.wait_until_stable()
    print(" All containers dead.")
    service.delete()
    print(" Deleted service {} from cluster {}.".format(service.serviceName, service.clusterName))
def print_project_status(ctx):
    """Print the project details and the status of each build stage.

    Reads ``config`` and ``solution_num`` from ``ctx.obj`` and the set of
    status flags from ``gather_project_status(ctx)``.
    """
    config = ctx.obj.config
    solution_num = ctx.obj.solution_num
    project_status = gather_project_status(ctx)

    def simulation_label(pass_key, fail_key, done_key):
        # First matching flag wins: pass, then fail, then merely "done".
        if pass_key in project_status:
            return click.style("Pass", fg='green')
        if fail_key in project_status:
            return click.style("Fail", fg='red')
        if done_key in project_status:
            return click.style("Run (Can't get status)", fg='yellow')
        return click.style("Not Run", fg='yellow')

    def run_label(done_key):
        # Stages without pass/fail information are either run or not run.
        if done_key in project_status:
            return click.style("Run", fg='green')
        return click.style("Not Run", fg='yellow')

    # Print out a 'pretty' message showing project status, first up some project details
    click.secho("Project Details", bold=True)
    click.echo(" Project Name: " + config["project_name"])
    click.echo(" Number of solutions generated: " + str(solution_num))
    click.echo(" Latest Solution Folder: '" + config["project_name"] + "/solution" + str(solution_num) + "'")
    click.echo(" Language Choice: " + config["language"])

    # And now details about what builds have been run/are passing.
    click.secho("Build Status", bold=True)
    click.echo(" C Simulation: " + simulation_label("csim_pass", "csim_fail", "csim_done"))
    click.echo(" C Synthesis: " + run_label("syn_done"))
    click.echo(" Cosimulation: " + simulation_label("cosim_pass", "cosim_fail", "cosim_done"))
    click.echo(" Export:")
    click.echo(" IP Catalog: " + run_label("export_ip_done"))
    click.echo(" System Generator: " + run_label("export_sysgen_done"))
    click.echo(" Export Evaluation: " + run_label("evaluate_done"))


### Click Command Definitions ###
# Report Command
def lumogon(file):
    """
    Lumogon scans the output from the lumogon container inspection tool
    """
    cve_data = load_vulnerability_database()
    containers = load_data(file)["containers"]
    for key in containers:
        entry = containers[key]
        click.secho("==> Scanning %s" % entry["container_name"], fg="blue")
        capabilities = entry["capabilities"]
        packages = capabilities["dpkg"]["payload"]
        host = capabilities["host"]["payload"]
        # Only the integer part of the platform version selects the codename.
        release = DEBIAN_CODENAMES[math.floor(float(host["platformversion"]))]
        for package in sorted(packages):
            found = determine_cves(package, packages[package], release, cve_data)
            print_vulns(package, found)
def run_add_system(name, token, org, system, prompt):
    """
    Add a new system to the repo by creating a label for it.

    After a successful creation, and only when ``prompt`` is truthy, the user
    is asked whether to run the update step. A GitHub error with status 422
    is reported as "already exists"; any other GitHub error is re-raised.
    """
    repo = get_repo(token=token, org=org, name=name)
    try:
        repo.create_label(name=system.strip(), color=SYSTEM_LABEL_COLOR)
        click.secho("Successfully added new system {}".format(system), fg="green")
        if prompt and click.confirm("Run update to re-generate the page?"):
            run_update(name=name, token=token, org=org)
    except GithubException as error:
        if error.status != 422:
            raise
        click.secho(
            "Unable to add new system {}, it already exists.".format(system),
            fg="yellow")
def get_config(repo):
    """
    Get the config for the repo, merged with the default config.

    Returns a new dict holding the defaults, overridden by the values from
    ``config.json`` (read from the ``gh-pages`` ref) when that file is listed
    by ``get_files(repo)``. If the file cannot be parsed, a warning is printed
    and the defaults are returned.
    """
    # Work on a copy: updating DEFAULT_CONFIG itself would leak one repo's
    # overrides into every later call in the same process.
    config = dict(DEFAULT_CONFIG)
    if "config.json" in get_files(repo):
        # get the config file, parse JSON and merge it with the default config
        config_file = repo.get_file_contents('/config.json', ref="gh-pages")
        try:
            repo_config = json.loads(config_file.decoded_content.decode("utf-8"))
            config.update(repo_config)
        except ValueError:
            click.secho("WARNING: Unable to parse config file. Using defaults.", fg="yellow")
    return config
def export(mark, file, fenv):
    """ Export U-Boot environment variables """
    # Any failure is reported on the console and ends the process with
    # ERROR_CODE.
    try:
        image = uboot.EnvImgOld(start_string=mark)
        image.open_img(file)
        with open(fenv, 'w') as out:
            out.write(image.store())
    except Exception as err:
        # Exceptions with an empty message get a generic text instead.
        click.echo(str(err) or "Unknown Error !")
        sys.exit(ERROR_CODE)

    click.secho("Environment variables saved into: %s" % fenv)


# U-Boot envimg: Update U-Boot environment variables
def create(size, redundant, bigendian, infile, outfile):
    """ Create new image from attached file """
    # Any failure is reported on the console and ends the process with
    # ERROR_CODE.
    try:
        blob = uboot.EnvBlob(size=size, redundant=redundant, bigendian=bigendian)
        with open(infile, 'r') as src:
            blob.load(src.read())
        with open(outfile, 'wb') as dst:
            dst.write(blob.export())
    except Exception as err:
        # Exceptions with an empty message get a generic text instead.
        click.echo(str(err) or "Unknown Error !")
        sys.exit(ERROR_CODE)

    click.secho(" Successfully created: %s" % outfile)


# U-Boot mkenv: Extract image content
def extract(offset, size, file):
    """ Extract image content """
    # The output is written next to the input: same name, ".txt" extension.
    try:
        fileName, _ = os.path.splitext(file)
        blob = uboot.EnvBlob(size=size)
        with open(file, "rb") as src:
            # Skip ahead to the requested offset before parsing.
            src.seek(offset)
            blob.parse(src.read())
        with open(fileName + '.txt', 'w') as dst:
            dst.write(blob.store())
    except Exception as err:
        # Exceptions with an empty message get a generic text instead.
        click.echo(str(err) or "Unknown Error !")
        sys.exit(ERROR_CODE)

    click.secho(" Successfully extracted: %s.txt" % fileName)
def _report_test(test, passed, slack):
    """Log, print and post to Slack the outcome of a single test.

    The Slack call arguments are the global ``_config`` overridden by the
    test's own ``slack`` settings. Any ``text`` entry is dropped because the
    message text is supplied here, and ``as_user`` is forced to ``'false'``.
    """
    # overwrite globals with test specifics
    merged = _config.copy()
    merged.update(test['slack'])
    merged.pop('text', None)
    merged['as_user'] = 'false'

    if passed:
        log_suffix, label, colour, slack_suffix = ": passed", "passed", 'green', ": passed. "
    else:
        log_suffix, label, colour, slack_suffix = ": failed", "failed", 'red', ": failed. "

    logger.info(test['test_name'] + log_suffix)
    click.secho(label, fg=colour)
    slack.api_call("chat.postMessage", text=test['test_name'] + slack_suffix, **merged)
def run(self):
    """Print the startup banner, configure the bot and run the event loop.

    A web server is started on port 7000 when ``self._server`` is set. If
    there are no tasks after configuration, the bot is stopped and the
    process exits with status 1. Otherwise every task is scheduled on the
    loop, which then runs forever.
    """
    click.echo('{now}\n{bottery} version {version}'.format(
        # ``%d`` is the day of the month. The banner previously used ``%m``
        # here, which printed the month number after the month name.
        now=datetime.now().strftime('%B %d, %Y - %H:%M:%S'),
        bottery=click.style('Bottery', fg='green'),
        version=bottery.__version__
    ))

    self.loop.run_until_complete(self.configure())

    if self._server is not None:
        handler = self.server.make_handler()
        setup_server = self.loop.create_server(handler, '0.0.0.0', 7000)
        self.loop.run_until_complete(setup_server)
        click.echo('Server running at http://localhost:7000')

    if not self.tasks:
        click.secho('No tasks found.', fg='red')
        self.stop()
        sys.exit(1)

    for task in self.tasks:
        self.loop.create_task(task())

    click.echo('Quit the bot with CONTROL-C')
    self.loop.run_forever()
def standings(self, league_table, league):
    """Print the league standings as a coloured, column-aligned table.

    Args:
        league_table: Mapping whose ``"standings"`` entry maps a group key to
            a list of team rows (dicts with the fields used in the format
            string below).
        league: Not used by this method.
    """
    ruler = "-" * 100
    click.secho(ruler, fg="green", bold=True)
    click.secho("%-2s %-5s %-25s %-10s %-10s %-11s %-11s %-10s" %
                ("GP", "POS", "CLUB", "PLAYED", "GOALS", "GOAL LOSS", "GOAL DIFF", "POINTS"))
    click.secho(ruler, fg="green", bold=True)
    groups = league_table["standings"]
    for key in sorted(groups.keys()):
        for team in groups[key]:
            # Format a copy so the caller's row keeps its numeric
            # goalDifference. Rewriting it in place made a second call fail
            # when the stored string was compared with 0.
            row = dict(team)
            if row["goalDifference"] >= 0:
                # Pad non-negative values so they line up with "-N".
                row["goalDifference"] = ' ' + str(row["goalDifference"])
            team_str = (u"{group:<3} {rank:<5} {team:<25} "
                        u"{playedGames:<10} {goals:<10} {goalsAgainst:<10}"
                        u" {goalDifference:<12} {points}").format(**row)
            click.secho(team_str, bold=True, fg=self.colors[team['group']])
    click.secho(ruler, fg="green", bold=True)
def fixtures(self, league_table):
    """Print the fixtures in ``league_table['fixtures']`` as an aligned table.

    Each row shows the teams, the result as ``home:away`` (or ``None`` when
    the away goals are missing), match day, date, status, id and the league
    name looked up in ``LEAGUE_NAMES``.
    """
    ruler = "-" * 120
    click.secho(ruler, fg="green", bold=True)
    click.secho("%-27s %-27s %-6s %-6s %-22s %-10s %-8s %-6s" %
                ("HOME TEAM", "AWAY TEAM", "RES", "MD", "DATE", "STATUS", "ID", "LEAGUE"))
    click.secho(ruler, fg="green", bold=True)
    for fixture in league_table['fixtures']:
        # Build the printable row on a copy. Overwriting fixture['result']
        # with a string in place made a second call fail when indexing it.
        row = dict(fixture)
        row['league'] = LEAGUE_NAMES[fixture['competitionId']]
        score = fixture['result']
        if score['goalsAwayTeam'] is None:
            row['result'] = 'None'
        else:
            row['result'] = '{0}:{1}'.format(score['goalsHomeTeam'],
                                             score['goalsAwayTeam'])
        fixture_str = (u"{homeTeamName:<27} {awayTeamName:<27} "
                       u"{result:<6} {matchday:<6} {date:<22} "
                       u"{status:<10} {id:<8} {league}").format(**row)
        click.secho(fixture_str, bold=True, fg=self.colors.TEXT)
    click.secho(ruler, fg="green", bold=True)
def ensure_module(module_name: str):
    """
    Makes sure that a module is importable.

    In case the module cannot be found, print an error on stderr and exit
    with status -1.

    Args:
        module_name: name of the module to look for
    """
    # Imported locally so this function does not depend on the module's
    # import block.
    import sys

    try:
        importlib.import_module(module_name)
    except ModuleNotFoundError:
        click.secho(
            f'Module not found: {module_name}\n'
            f'Install it manually with: "pip install {module_name}"\n'
            f'Or install all dependencies with: "pip install -r requirements-dev.txt"',
            fg='red', err=True)
        # sys.exit() instead of the bare exit() helper, which the site module
        # injects for interactive use and is not guaranteed to exist.
        sys.exit(-1)
def _get_version(ctx: click.Context):
    """Store and print the project version, at most once per process.

    ``semver`` and ``pep440`` are copied into ``ctx.obj`` from the frozen
    version module. When that module is missing, ``pin_version`` is invoked
    through the context instead. The function attribute
    ``leave_me_alone_already`` marks that the work has been done.
    """
    if _get_version.leave_me_alone_already:
        return

    # A missing or None ``obj`` is replaced by a fresh dict.
    if getattr(ctx, 'obj', None) is None:
        ctx.obj = {}

    try:
        from emft.__version_frozen__ import __version__, __pep440__
    except ModuleNotFoundError:
        ctx.invoke(pin_version)
    else:
        ctx.obj['semver'] = __version__
        ctx.obj['pep440'] = __pep440__

    for label, key in (("Semver", 'semver'), ("PEP440", 'pep440')):
        click.secho(f"{label}: {ctx.obj[key]}", fg='green')
    _get_version.leave_me_alone_already = True
def handle_result(f):
    """Decorate ``f`` so that failures are reported instead of raised.

    The wrapper returns whatever ``f`` returns. On failure it returns
    ``None`` after:

    * ``UnauthorizedError``: asking the user to log in and running
      ``scm login`` with the ``url`` keyword argument;
    * ``requests.ConnectionError``: printing a connection error;
    * any other ``Exception``: printing its message.
    """
    @wraps(f)
    def wrapper(*args, **kwargs):
        try:
            return f(*args, **kwargs)
        except UnauthorizedError:
            url = kwargs.get('url')
            click.echo('Please login')
            subprocess.call(['scm', 'login', url])
        except requests.ConnectionError:
            click.secho('Can not connect to content manager!', fg='red')
        except Exception as e:
            # Report once and give up. This branch used to sit in a
            # ``while True`` loop without a ``break``, so a persistent error
            # re-ran ``f`` and printed the message forever.
            click.secho(str(e), fg='red')
        return None
    return wrapper
def cli(action, config_file, target, dev_addr, verbose):
    """
    harrier - Jinja2 & sass/scss aware site builder
    """
    serving = action == 'serve'
    is_live = serving  # TODO add watch
    is_served = serving
    setup_logging(verbose, times=is_live)
    try:
        config = Config(config_file)
        # The action name doubles as the target when none was given.
        target = target or action
        config.setup(target, served_direct=is_served)
        if action != 'serve':
            assert action == 'build'
            build(config)
        else:
            watch(config)
    except HarrierProblem as problem:
        # Point at --verbose only when it is not already switched on.
        hint = '' if verbose else ', use "--verbose" for more details'
        click.secho(('Error: {}' + hint).format(problem), fg='red', err=True)
def _check_sem_version(self, version, spec):
    """Return ``version`` if it is a semantic version contained in ``spec``.

    When ``version`` cannot be parsed or is not in ``spec``, an error naming
    ``self.specversion`` is printed and the process exits with status 1.
    """
    # Imported locally so this method does not depend on the module's
    # import block.
    import sys

    try:
        satisfied = semantic_version.Version(version) in spec
    except ValueError:
        # A ValueError here is reported exactly like a version outside the
        # spec.
        satisfied = False
    if satisfied:
        return version

    click.secho(
        'Error: Invalid semantic version ({0})'.format(
            self.specversion),
        fg='red')
    # sys.exit() instead of the bare exit() helper, which the site module
    # injects for interactive use and is not guaranteed to exist.
    sys.exit(1)
def _download(self, url):
    """Download the package archive from ``url`` when it is needed.

    Returns the path of the downloaded file, or ``None`` when the profile's
    version check says no download is needed and ``force_install`` is not
    set. Interrupting the download with Ctrl-C removes the partial file and
    exits with status 1.
    """
    # Imported locally so this method does not depend on the module's
    # import block.
    import sys

    # Note: here we check only for the version of locally installed
    # packages. For this reason we don't say what's the installation
    # path.
    wanted = (self.profile.check_package_version(self.package, self.version)
              or self.force_install)
    if not wanted:
        click.secho('Already installed. Version {0}'.format(
            self.profile.get_package_version(self.package)), fg='yellow')
        return None

    fd = FileDownloader(url, self.packages_dir)
    filepath = fd.get_filepath()
    click.secho('Download ' + basename(filepath))
    try:
        fd.start()
    except KeyboardInterrupt:
        # Do not leave a half-written archive behind.
        if isfile(filepath):
            remove(filepath)
        click.secho('Abort download!', fg='red')
        # sys.exit() instead of the bare exit() helper, which the site
        # module injects for interactive use and is not guaranteed to exist.
        sys.exit(1)
    return filepath
def copy_example_files(self, example, project_dir, sayno):
    """Copy the files of ``example`` into ``project_dir``.

    Returns 1 when the examples directory is missing, otherwise 0. That
    includes the case where the named example does not exist, for which only
    a notice is printed.
    """
    if not isdir(self.examples_dir):
        click.secho('Error: examples are not installed', fg='red')
        click.secho('Please run:\n'
                    ' apio install examples', fg='yellow')
        return 1

    destination = util.check_dir(project_dir)
    source = util.safe_join(self.examples_dir, example)
    if isdir(source):
        self._copy_files(example, source, destination, sayno)
    else:
        click.secho(EXAMPLE_NOT_FOUND_MSG, fg='yellow')
    return 0
def _enable_darwin(self):
    """Install and link libftdi and libffi with homebrew.

    Only an error is printed when ``brew`` is not found. Otherwise the
    formulae are installed, unlinked and force-linked, and the
    ``macos_drivers`` setting is saved as True in the profile.
    """
    # Check homebrew
    if subprocess.call('which brew > /dev/null', shell=True) != 0:
        click.secho('Error: homebrew is required', fg='red')
        return

    click.secho('Enable FTDI drivers for FPGA')
    subprocess.call(['brew', 'update'])
    # Same install / unlink / link sequence for both libraries, in order.
    for formula in ('libftdi', 'libffi'):
        subprocess.call(['brew', 'install', '--force', formula])
        subprocess.call(['brew', 'unlink', formula])
        subprocess.call(['brew', 'link', '--force', formula])
    self.profile.add_setting('macos_drivers', True)
    self.profile.save()
    click.secho('FPGA drivers enabled', fg='green')
def api_request(command, organization='FPGAwars'):
    """Query the GitHub repos API and return the decoded JSON body.

    Args:
        command: Path appended after the organization in the API URL.
        organization: GitHub organization that owns the repository.

    Any connection problem, HTTP error status, undecodable body or empty
    result prints an error and exits the process with status 1.
    """
    # Imported locally so this function does not depend on the module's
    # import block.
    import sys

    result = None
    r = None
    try:
        r = requests.get(
            'https://api.github.com/repos/{0}/{1}'.format(
                organization, command),
            headers=_get_headers())
        # Check the status before reading the body, so an HTTP error is
        # reported as such.
        r.raise_for_status()
        result = r.json()
    except requests.exceptions.ConnectionError:
        click.secho(ERROR_MESSAGE, fg='red')
        sys.exit(1)
    except Exception as e:
        click.secho('Error: ' + str(e), fg='red')
        sys.exit(1)
    finally:
        # Compare with None: a Response is falsy for error statuses, so a
        # plain ``if r`` skipped close() on exactly the failed requests.
        if r is not None:
            r.close()
    if result is None:
        click.secho('Error: wrong data from GitHub API', fg='red')
        sys.exit(1)
    return result
def cli(ctx, lsftdi, lsusb, info):
    """System tools.\n Install with `apio install system`"""
    # The first option that is set decides what runs; with none set the help
    # text is shown. The exit status comes from the System call, else it is 0.
    status = 0
    if lsftdi:
        status = System().lsftdi()
    elif lsusb:
        status = System().lsusb()
    elif not info:
        click.secho(ctx.get_help())
    else:
        click.secho('Platform: ', nl=False)
        click.secho(get_systype(), fg='yellow')
    ctx.exit(status)
def cli(ctx, list, dir, files, project_dir, sayno):
    """Manage verilog examples.\n Install with `apio install examples`"""
    # The first option that is set decides what runs; with none set the help
    # and the usage examples are shown and the exit status stays 0.
    status = 0
    if list:
        status = Examples().list_examples()
    elif dir:
        status = Examples().copy_example_dir(dir, project_dir, sayno)
    elif files:
        status = Examples().copy_example_files(files, project_dir, sayno)
    else:
        click.secho(ctx.get_help())
        click.secho(Examples().examples_of_use_cad())
    ctx.exit(status)
def cli(ctx, packages, all, list, force, platform):
    """Install packages."""
    # Explicit package names win over --all, which wins over --list. With
    # nothing selected the help text is shown.
    if packages:
        for name in packages:
            Installer(name, platform, force).install()
    elif all:  # pragma: no cover
        for name in Resources(platform).packages:
            if name != 'pio-fpga':  # skip pio-fpga
                Installer(name, platform, force).install()
    elif list:
        Resources(platform).list_packages(installed=True, notinstalled=True)
    else:
        click.secho(ctx.get_help())
def cli(ctx):
    """Check the latest Apio version."""
    installed = get_distribution('apio').version
    latest_version = get_pypi_latest_version()

    # No version could be fetched: exit with a failure status.
    if latest_version is None:
        ctx.exit(1)

    if latest_version == installed:
        message = ('You\'re up-to-date!\nApio {} is currently the '
                   'newest version available.'.format(latest_version))
        colour = 'green'
    else:
        message = ('You\'re not updated\nPlease execute '
                   '`pip install -U apio` to upgrade.')
        colour = "yellow"
    click.secho(message, fg=colour)
def _check_package(name, path=''): is_dir = isdir(path) if not is_dir: click.secho( 'Error: {} toolchain is not installed'.format(name), fg='red') if config_data: # /etc/apio.json file exists if _check_apt_get(): click.secho('Please run:\n' ' apt-get install apio-{}'.format(name), fg='yellow') else: click.secho('Please run:\n' ' apio install {}'.format(name), fg='yellow') else: click.secho('Please run:\n' ' apio install {}'.format(name), fg='yellow') return is_dir
def get_pypi_latest_version():
    """Return the latest apio version published on PyPI.

    Returns ``None`` after printing an error when the request fails, the
    server answers with an error status, or the body does not have the
    expected shape.
    """
    r = None
    version = None
    try:
        r = requests.get('https://pypi.python.org/pypi/apio/json')
        # Check the status first so a version is never taken from an error
        # response.
        r.raise_for_status()
        version = r.json()['info']['version']
    except requests.exceptions.ConnectionError:
        click.secho('Error: Could not connect to Pypi.\n'
                    'Check your internet connection and try again', fg='red')
    except Exception as e:
        click.secho('Error: ' + str(e), fg='red')
    finally:
        # Compare with None: a Response is falsy for error statuses, so a
        # plain ``if r`` skipped close() on exactly the failed requests.
        if r is not None:
            r.close()
    return version
def check_dir(_dir):
    """Return the project directory to use, creating it when it is missing.

    Args:
        _dir: Directory path, or ``None`` for the current working directory.

    Returns:
        The directory path that was checked.

    Exits with status 1 when the path is an existing regular file.
    """
    # Imported locally so this function does not depend on the module's
    # import block.
    import sys

    if _dir is None:
        _dir = os.getcwd()

    if isfile(_dir):
        click.secho(
            'Error: project directory is already a file: {0}'.format(_dir),
            fg='red')
        # sys.exit() instead of the bare exit() helper, which the site module
        # injects for interactive use and is not guaranteed to exist.
        sys.exit(1)

    if not exists(_dir):
        try:
            os.makedirs(_dir)
        except OSError:
            # Kept best-effort: a failed creation is not reported here.
            pass
    return _dir
def synthesize(access_key, secret_key, output_file, voice_name, voice_language, codec, text):
    """Synthesize passed text and save it as an audio file"""
    voice_options = dict(
        voice_name=voice_name,
        language=voice_language,
        codec=codec,
    )
    try:
        ivona_api = IvonaAPI(access_key, secret_key, **voice_options)
    except (ValueError, IvonaAPIException) as exc:
        # Surface client set-up problems as a regular click error.
        raise click.ClickException("Something went wrong: {}".format(repr(exc)))

    with click.open_file(output_file, 'wb') as audio:
        ivona_api.text_to_speech(text, audio)

    click.secho(
        "File successfully saved as '{}'".format(output_file),
        fg='green',
    )
def list_voices(access_key, secret_key, voice_language, voice_gender):
    """List available Ivona voices"""
    try:
        ivona_api = IvonaAPI(access_key, secret_key)
    except (ValueError, IvonaAPIException) as exc:
        # Surface client set-up problems as a regular click error.
        raise click.ClickException("Something went wrong: {}".format(repr(exc)))

    click.echo("Listing available voices...")
    voices_list = ivona_api.get_available_voices(
        language=voice_language,
        gender=voice_gender,
    )

    def language_of(voice):
        return voice['Language']

    # Group voices by language; groupby needs its input sorted by that key.
    ordered = sorted(voices_list, key=language_of)
    by_language = {
        language: list(group)
        for language, group in groupby(ordered, key=language_of)
    }

    for language, voices in by_language.items():
        names = [voice['Name'] for voice in voices]
        click.echo("{}: {}".format(language, ', '.join(names)))

    click.secho("All done", fg='green')
def info(ctx, spec_file):
    """
    Describes the specification provided.
    """
    if ctx.obj.debug:
        click.secho(
            "Specification: " + click.format_filename(spec_file),
            fg="yellow"
        )

    spec = Specification(spec_file)
    if 'info' not in spec:
        click.secho(f"No info was found in {spec}.", fg="red")
        return

    # Banner: title, version and license name ('Unknown' when the name is
    # empty).
    version = spec['info']['version']
    title = spec['info']['title']
    spec_license = spec['info']['license']['name'] or 'Unknown'
    banner = f"{title} - {version}. {spec_license} licensed"
    click.secho(banner, fg='green')
def combine_command(ctx, spec_file):
    """
    Combines the specification provided and prints its schema as YAML.
    """
    # The docstring above was a copy of the describe command's text. It now
    # states what this command does, since it may also be shown as help text.
    if ctx.obj.debug:
        click.secho(
            "Specification: " + click.format_filename(spec_file),
            fg="yellow"
        )

    spec = Specification(spec_file)
    combined_spec_schema = spec.combine().schema()

    # TODO: If -y, format as yaml
    print(yaml.dump(combined_spec_schema))
def spec_command(ctx, spec_file):
    """
    Describes the specification provided.
    """
    if ctx.obj.debug:
        click.secho(
            "Specification: " + click.format_filename(spec_file),
            fg="yellow"
        )

    spec = Specification(spec_file)

    # This is brittle as it assumes info fields are defined in the spec.
    if 'info' not in spec:
        click.secho(f"No info was found in {spec}.", fg="red")
    else:
        version = spec['info']['version']
        title = spec['info']['title']
        spec_license = spec['info']['license']['name'] or 'Unknown'
        banner = f"{title} - v{version}. {spec_license} licensed"
        click.secho(banner, fg='green')

    # TODO: Implement linting of a spec.
    # TODO: implement validation of a spec
def update(taxids, conn, force_download, silent):
    """Update local UniProt database"""
    # Unless silenced, warn about the run time and, when no taxonomy filter
    # was given, show how to pass one.
    if not silent:
        click.secho("WARNING: Update is very time consuming and can take several "
                    "hours depending which organisms you are importing!", fg="yellow")

        if not taxids:
            click.echo("Please note that you can restrict import to organisms by "
                       "NCBI taxonomy IDs")
            click.echo("Example (human, mouse, rat):\n")
            click.secho("\tpyuniprot update --taxids 9606,10090,10116\n\n", fg="green")

    if taxids:
        # Keep only the comma-separated entries that are plain integers.
        # The pattern is a raw string: "\d" in a normal literal is an invalid
        # escape sequence that newer Python versions warn about.
        taxids = [int(taxid.strip()) for taxid in taxids.strip().split(',')
                  if re.search(r'^ *\d+ *$', taxid)]

    database.update(taxids=taxids, connection=conn,
                    force_download=force_download, silent=silent)
def _read_conf(conf, version, format):
    """Parse the configuration and fill in defaults.

    Args:
        conf: Configuration source handed to ``parse_yaml``.
        version: Stored under the ``version`` key.
        format: Stored under the ``format`` key.

    Returns:
        The parsed configuration dict with defaults applied.

    Exits with status 1 when the configuration cannot be parsed.
    """
    try:
        conf = parse_yaml(conf)
    except Exception:
        # ``except Exception`` rather than a bare ``except`` so that Ctrl-C
        # and SystemExit are not reported as a missing configuration.
        click.secho(
            'Missing configuration. Did you put a `beeper.yml` file?',
            blink=True,
            fg='red'
        )
        sys.exit(1)

    conf.setdefault('language', 'python')
    conf.setdefault('python', 'python')
    conf.setdefault('postinstall', [])
    conf.setdefault('postinstall_commands', '\n'.join(conf.get('postinstall')))
    conf.setdefault('manifest', set())
    conf.setdefault('current_dir', os.environ.get('WORK_DIR') or os.getcwd())
    conf.setdefault('scripts', [])
    # ``postbuild`` always mirrors ``scripts``; the manifest is forced to a set.
    conf['postbuild'] = conf['scripts']
    conf['version'] = version
    conf['manifest'] = set(conf['manifest'])
    conf['format'] = format
    return conf
def prompt_overwrite(file_name):
    """Return whether ``file_name`` may be written.

    A path that does not exist is always writable. For an existing path the
    user is asked for a single key press, and only ``y`` or ``Y`` counts as
    consent.
    """
    # Skip if file doesn't exist
    if not os.path.exists(file_name):
        return True

    # Prompt for file overwrite if outfile already exists
    question = "File ({}) already exists. Do you want to overwrite? (y/n): ".format(file_name)
    click.secho(question, nl=False, fg="red")
    answer = click.getchar()
    click.echo()
    return answer in ("y", "Y")
def cli(*args, **kwargs):
    """
    CSVtoTable commandline utility.
    """
    # Convert CSV file
    content = convert.convert(kwargs["input_file"], **kwargs)

    # Serve the temporary file in browser.
    if kwargs["serve"]:
        convert.serve(content)
    # Write to output file
    elif kwargs["output_file"]:
        # Check if file can be overwritten
        if (not kwargs["overwrite"] and
                not prompt_overwrite(kwargs["output_file"])):
            raise click.Abort()

        convert.save(kwargs["output_file"], content)
        click.secho("File converted successfully: {}".format(
            kwargs["output_file"]), fg="green")
    else:
        # Neither serving nor an output file was requested. UsageError takes
        # just the message; BadOptionUsage requires an option name as its
        # first argument in newer click releases, so the old one-argument
        # call fails there before the message is shown.
        raise click.UsageError("Missing argument \"output_file\".")
def history(config, num_pages, start, output, arena):
    """Get your game history"""
    _check_creds(config)

    games = []
    fetched = 0
    # Fetch consecutive pages from ``start`` until ``num_pages`` pages were
    # read or the account's last page has been passed.
    while fetched != num_pages:
        config.logger.debug('Getting page %d of history', start)
        fetch = config.trackobot.arena_history if arena else config.trackobot.history
        page = fetch(page=start)
        config.logger.debug('Extending games list')
        games.extend(page['history'])
        fetched += 1
        start += 1
        if start > page['meta']['total_pages']:
            config.logger.info('Hit max pages on account')
            break

    config.logger.debug('Dumping game history to %s', output)
    with open(output, 'w') as out:
        json.dump(games, out)
    click.secho('Wrote {} games to {}'.format(len(games), output), fg='green')
def __init__(self, *args, **kwargs):
    """Reject typographic quotes/dashes in argv, then initialise the parent.

    If any command-line argument contains a curly quote or an em dash, an
    error listing those arguments is printed on stderr and the process exits
    with status -1.
    """
    import sys

    unicodes = ["\u2018", "\u2019", "\u201C", "\u201D", "\u2014"]
    invalid = [command for command in sys.argv
               if any(x in command for x in unicodes)]

    if invalid:
        click.secho("Error: Detected invalid character in: %s\n"
                    "Verify the correct quotes or dashes (ASCII) are "
                    "being used." % ', '.join(invalid),
                    err=True, fg='red', bold=True)
        sys.exit(-1)

    super().__init__(*args, **kwargs)

    # Plugin state for current deployment that will be loaded from cache.
    # Used to construct the dynamic CLI.
    self._plugins = None
def validate(path, level):
    """Load ``path`` as a QIIME 2 artifact and validate it at ``level``.

    Failures are passed to ``q2cli.util.exit_with_error``. For a validation
    error the debug output goes to the null device and the footer is
    suppressed. Success is reported in green.
    """
    import qiime2.sdk

    try:
        artifact = qiime2.sdk.Artifact.load(path)
    except Exception as err:
        q2cli.util.exit_with_error(
            err,
            header='There was a problem loading %s as a QIIME 2 Artifact:' % path)

    try:
        artifact.validate(level)
    except qiime2.plugin.ValidationError as err:
        header = 'Artifact %s does not appear to be valid at level=%s:' % (
            path, level)
        with open(os.devnull, 'w') as sink:
            q2cli.util.exit_with_error(err, header=header, file=sink,
                                       suppress_footer=True)
    except Exception as err:
        header = ('An unexpected error has occurred while attempting to '
                  'validate artifact %s:' % path)
        q2cli.util.exit_with_error(err, header=header)
    else:
        click.secho('Artifact %s appears to be valid at level=%s.'
                    % (path, level), fg="green")
def _echo_citations():
    """Print citation text for the framework and each cached plugin.

    Plugins are listed in name order with their version. When the cache
    holds no plugins, a pointer to the plugin listing is printed instead.
    """
    import q2cli.cache

    click.secho('\nCitations', fg='green')
    click.secho('QIIME 2 framework and command line interface', fg='cyan')
    click.secho('Pending a QIIME 2 publication, please cite QIIME using the '
                'original publication: '
                'http://www.ncbi.nlm.nih.gov/pubmed/20383131')

    plugins = q2cli.cache.CACHE.plugins
    if not plugins:
        click.secho('\nNo plugins are currently installed.\nYou can browse '
                    'the official QIIME 2 plugins at https://qiime2.org')
        return

    for name, plugin in sorted(plugins.items()):
        click.secho('\n%s %s' % (name, plugin['version']), fg='cyan')
        click.secho(plugin['citation_text'])
def exit_with_error(e, header='An error has been encountered:', file=None, suppress_footer=False):
    """Write the traceback of ``e``, print a red summary and exit with 1.

    Args:
        e: The exception to report.
        header: First paragraph of the summary.
        file: Stream that receives the traceback; stderr when ``None``.
        suppress_footer: Leave out the closing hint about the debug info.
    """
    import sys
    import traceback
    import textwrap
    import click

    to_stderr = file is None
    if to_stderr:
        file = sys.stderr
    # The footer says where the traceback went.
    footer = ('See above for debug info.' if to_stderr
              else 'Debug info has been saved to %s' % file.name)

    segments = [header, textwrap.indent(str(e), ' ')]
    if not suppress_footer:
        segments.append(footer)

    traceback.print_exception(type(e), e, e.__traceback__, file=file)
    file.write('\n')

    click.secho('\n\n'.join(segments), fg='red', bold=True, err=True)
    click.get_current_context().exit(1)
def init(directory: str, dsn: str):
    """
    Initializes a migrations directory.
    """
    try:
        os.makedirs(directory)
    except FileExistsError:
        # Only reported; the files below are still written.
        click.secho("Unable to make directory (it exists)!", fg='red')

    # A given DSN goes into the template wrapped in double quotes; None is
    # passed through unchanged.
    if dsn is not None:
        dsn = '"{}"'.format(dsn)

    root = Path(directory)
    click.secho("Writing env.py...", fg='cyan')
    (root / "env.py").write_text(env_file.format(dsn=dsn))
    click.secho("Making versions directory...", fg='cyan')
    (root / "versions").mkdir(mode=0o755)
    (root / "README").write_text("Basic asql-migrate setup.")
    click.secho("Done!", fg='green')
def new(message: str):
    """
    Creates a new migration file.
    """
    files = _get_files()

    # build the message filename
    next_num = len(files) + 1
    # The space-joined message is cut to 32 characters, lower-cased, spaces
    # become underscores, and anything but a-z and "_" is dropped.
    shortened = ' '.join(message)[:32].lower().replace(" ", "_")
    allowed = string.ascii_lowercase + "_"
    filename_message = ''.join(ch for ch in shortened if ch in allowed)
    f_name = "{:03d}_{}.py".format(next_num, filename_message)

    # format the template
    formatted_file = migration_template.format(revision=next_num, message=' '.join(message))
    target = migrations_dir / "versions" / f_name
    target.write_text(formatted_file)
    click.secho("Created new migration file {}.".format(f_name))
def validate(text, file):
    """Validate JSON input using dependencies-schema"""
    # Imported locally so this function does not depend on the module's
    # import block.
    import sys

    content = None
    if text:
        print('Validating text input...')
        content = text
    if file:
        # When both are given, the file content replaces the text input.
        print('Validating file input...')
        content = file.read()
    if content is None:
        click.secho('Please give either text input or a file path. See help for more details.', fg='red')
        # sys.exit() instead of the bare exit() helper, which the site module
        # injects for interactive use and is not guaranteed to exist.
        sys.exit(1)
    try:
        validate_json(content)
        click.secho('Valid JSON schema!', fg='green')
    except Exception:
        click.secho('Invalid JSON schema!', fg='red')
        # Bare ``raise`` re-raises the active exception unchanged.
        raise
def connect(config):
    """Connect to Cassandra and store the session in the module global.

    Args:
        config: Mapping read with ``.get`` for ``user``, ``password``,
            ``seeds`` and ``port``. The port defaults to 9042 and password
            authentication is used only when ``user`` is set.

    Returns:
        The connected session, also stored in the global ``session``.

    Exits with status 1 when the connection cannot be established.
    """
    global session

    # Connect to cassandra
    auth_provider = None
    if config.get('user'):
        auth_provider = PlainTextAuthProvider(username=config.get('user'),
                                              password=config.get('password'))
    cluster = Cluster(
        contact_points=config.get('seeds'),
        port=(int(config.get('port')) if config.get('port') else 9042),
        auth_provider=auth_provider
    )
    try:
        session = cluster.connect()
    except Exception:
        # ``except Exception`` keeps Ctrl-C from being reported as a
        # connection failure. The exit status is non-zero because a bare
        # sys.exit() reports success to the calling shell.
        click.secho("Unable to connect to Cassandra", fg='red')
        sys.exit(1)
    return session
def create_migration_table(keyspace):
    """Create the ``shift_migrations`` table in ``keyspace`` if it is missing.

    Returns ``(True, None)`` on success and ``(False, exception)`` when the
    statement fails. Progress is printed as "OK" or "ERROR" on the same line.
    """
    session.set_keyspace(keyspace)
    click.echo("Creating shift_migrations table... ", nl=False)
    statement = """ CREATE TABLE IF NOT EXISTS shift_migrations( type text, time timeuuid, migration text, hash text, PRIMARY KEY (type, time) ) WITH CLUSTERING ORDER BY(time DESC) """
    try:
        session.execute(statement)
    except Exception as error:
        click.secho('ERROR', fg='red', bold=True)
        return (False, error)
    click.secho('OK', fg='green', bold=True)
    return (True, None)