我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用click.confirm()。
def delete(experiment, project): """Delete experiment group.""" user, project_name = get_project_or_local(project) if not click.confirm("Are sure you want to delete experiment `{}`".format(experiment)): click.echo('Existing without deleting experiment.') sys.exit(1) try: response = PolyaxonClients().experiment.delete_experiment( user, project_name, experiment) except (PolyaxonHTTPError, PolyaxonShouldExitError) as e: Printer.print_error('Could not delete experiment `{}`.'.format(experiment)) Printer.print_error('Error message `{}`.'.format(e)) sys.exit(1) if response.status_code == 204: Printer.print_success("Experiment `{}` was delete successfully".format(experiment))
def stop(experiment, project): """Get experiment by uuid. Examples:
polyaxon experiment stop 2 ``` """ user, project_name = get_project_or_local(project) if not click.confirm("Are sure you want to stop experiment `{}`".format(experiment)): click.echo('Existing without stopping experiment.') sys.exit(0) try: PolyaxonClients().experiment.stop(user, project_name, experiment) except (PolyaxonHTTPError, PolyaxonShouldExitError) as e: Printer.print_error('Could not stop experiment `{}`.'.format(experiment)) Printer.print_error('Error message `{}`.'.format(e)) sys.exit(1) Printer.print_success("Experiment is being stopped.")
```
def delete(project): """Delete project.""" user, project_name = get_project_or_local(project) if not click.confirm("Are sure you want to delete project `{}/{}`".format(user, project_name)): click.echo('Existing without deleting project.') sys.exit(1) try: response = PolyaxonClients().project.delete_project(user, project_name) except (PolyaxonHTTPError, PolyaxonShouldExitError) as e: Printer.print_error('Could not delete project `{}`.'.format(project)) Printer.print_error('Error message `{}`.'.format(e)) sys.exit(1) if response.status_code == 204: Printer.print_success("Project `{}` was delete successfully".format(project))
def build_end_callback(ctx,sub_command_returns,keep,report): # Catch the case where no subcommands have been issued and offer a default build if not sub_command_returns: if click.confirm("No build stages specified, would you like to run a default sequence using all the build stages?", abort=True): do_default_build(ctx) ctx.obj.file.write("exit" + "\n") ctx.obj.file.close() # Call the Vivado HLS process hls_processs = subprocess.run(["vivado_hls", "-f", "run_hls.tcl"]) # Check return status of the HLS process. if hls_processs.returncode < 0: raise click.Abort() elif hls_processs.returncode > 0: click.echo("Warning: HLS Process returned an error, skipping report opening!") raise click.Abort() else: do_end_build_stuff(ctx,sub_command_returns,report) # csim subcommand
def init(banner, hidden, backup): """Initialize a manage shell in current directory $ manage init --banner="My awesome app shell" initializing manage... creating manage.yml """ manage_file = HIDDEN_MANAGE_FILE if hidden else MANAGE_FILE if os.path.exists(manage_file): if not click.confirm('Rewrite {0}?'.format(manage_file)): return if backup: bck = '.bck_{0}'.format(manage_file) with open(manage_file, 'r') as source, open(bck, 'w') as bck_file: bck_file.write(source.read()) with open(manage_file, 'w') as output: data = default_manage_dict if banner: data['shell']['banner']['message'] = banner output.write(yaml.dump(data, default_flow_style=False))
def run_add_system(name, token, org, system, prompt): """ Adds a new system to the repo. """ repo = get_repo(token=token, org=org, name=name) try: repo.create_label(name=system.strip(), color=SYSTEM_LABEL_COLOR) click.secho("Successfully added new system {}".format(system), fg="green") if prompt and click.confirm("Run update to re-generate the page?"): run_update(name=name, token=token, org=org) except GithubException as e: if e.status == 422: click.secho( "Unable to add new system {}, it already exists.".format(system), fg="yellow") return raise
def call(self, args, devnull=False): """Call other processes. args - list of command args devnull - whether to pipe stdout to /dev/null (or equivalent) """ if self.debug: click.echo(subprocess.list2cmdline(args)) click.confirm('Continue?', default=True, abort=True) try: kwargs = {} if devnull: # Pipe to /dev/null (or equivalent). kwargs['stderr'] = subprocess.STDOUT kwargs['stdout'] = self.FNULL ret_code = subprocess.call(args, **kwargs) except subprocess.CalledProcessError: return False return ret_code
def undeploy(ctx): """Removes OpenShift from the cluster.""" print("Preparing to remove OpenShift...") if not ctx.init_with_checks(): print("Failed cursory checks, exiting.") exit(1) if ctx.auto_confirm: print("Auto confirm (-y) option set, clearing existing installation.") else: print("Really consider the decision you're about to make.") if not click.confirm("Do you want to clear the existing installation?"): print("Okay, cancelling.") exit(1) ctx.delete_namespace_byname("openshift-origin") print("Note: namespaces with openshift finalizer will need to be manually deleted if desired.") print("See: https://github.com/paralin/openshift-under-kubernetes/blob/master/REMOVING_OPENSHIFT.md")
def do_tag(config, force, src, requirement, yes): tag = config.version if not yes: click.confirm('Tag project with {}?'.format(tag), abort=True) if force: force_cmd = ['-f'] else: force_cmd = [] if call(['git', 'diff', '--exit-code']) != 0: raise click.ClickException("Please commit first.") if call(['git', 'diff', '--exit-code', '--cached']) != 0: raise click.ClickException("Please commit first.") out = check_output(['git', 'ls-files', '--other', '--exclude-standard', '--directory']) if out: click.echo(out) raise click.ClickException("Please commit first.") do_tag_requirements(config, force, src, requirement, yes=True) check_call(['git', 'tag'] + force_cmd + [tag]) check_call(['git', 'push', '-q'] + force_cmd + ['origin', 'tag', tag])
def handle_unexpected_errors(f): """Decorator that catches unexpected errors.""" @wraps(f) def call_f(*args, **kwargs): try: return f(*args, **kwargs) except Exception as e: errorf = StringIO() print_exc(file=errorf) error = errorf.getvalue() click.echo( "Looks like there's a bug in our code. Sorry about that!" " Here's the traceback:\n" + error) if click.confirm( "Would you like to file an issue in our issue tracker?", default=True, abort=True): url = "https://github.com/datawire/pib/issues/new?body=" body = quote_plus(BUG_REPORT_TEMPLATE.format( os.getcwd(), __version__, python_version, run_result(["uname", "-a"]), error)) webbrowser.open_new(url + body) return call_f
def send(ctx, address, amount, denomination, use_unconfirmed, verbose): """Send a specified address some satoshis. \b Usage ----- Send 5000 satoshi from your on-chain balance to the Apache Foundation. $ 21 send 1BtjAzWGLyAavUkbw3QsyzzNDKdtPXk95D 5000 satoshis You can use the following denominations: satoshis, bitcoins, and USD. By default, this command uses only confirmed transactions and UTXOs to send coins. To use unconfirmed transactions, use the --use-unconfirmed flag. """ if denomination == '': confirmed = click.confirm(uxstring.UxString.default_price_denomination, default=True) if not confirmed: raise exceptions.Two1Error(uxstring.UxString.cancel_command) denomination = currency.Price.SAT price = currency.Price(amount, denomination) return _send(ctx.obj['wallet'], address, price.satoshis, verbose, use_unconfirmed)
def convert_amount_to_satoshis_with_prompt(amount, denomination): """ Converts and amount with denomination to satoshis. Prompts user if no denomination is specified. Args: amount (float): representing the amount to flush denomination (str): One of [satoshis, bitcoins, usd] Returns (int): converted amount to satoshis. """ if amount != 0.0: if denomination == '': confirmed = click.confirm(uxstring.UxString.default_price_denomination, default=True) if not confirmed: raise exceptions.Two1Error(uxstring.UxString.cancel_command) denomination = Price.SAT amount = Price(amount, denomination).satoshis else: amount = None return amount
def build_id2word(self, fname=None, save_to=None): # read words.csv file if not fname: fname = self.words_fname or click.prompt('words file') fname = self.__dest(fname) assert os.path.isfile(fname), 'No such file: %s' % fname if save_to: self.id2word_fname = self.__dest(save_to) else: self.id2word_fname = LdaUtils.change_ext(fname, 'id2word') # if there is no id2word file or the user wants to rebuild, build .id2word if not os.path.isfile(self.id2word_fname) or click.confirm('There alread is id2word. Do you want to rebuild?'): print 'start building id2word' start = time() id2word = corpora.Dictionary(LdaUtils.filter_words(LdaUtils.iter_csv(fname, -1).split())) id2word.save(self.id2word_fname) # save print 'building id2word takes: %s' % LdaUtils.human_readable_time(time() - start) self.id2word = corpora.Dictionary.load(self.id2word_fname) return self.id2word
def build_corpus(self, fname=None, save_to=None): # read sentences file if not fname: fname = click.prompt('sentences file') fname = self.__dest(fname) assert os.path.isfile(fname), 'No such file: %s' % fname if save_to: self.corpus_fname = self.__dest(save_to) else: self.corpus_fname = LdaUtils.change_ext(fname, 'corpus') # if there is no corpus file or the user wants to rebuild, build .corpus if not os.path.isfile(self.corpus_fname) or click.confirm('There already is corpus. Do you want to rebuild?'): print 'start building corpus' start = time() corpora.MmCorpus.serialize(self.corpus_fname, self.__iter_doc2bow(LdaUtils.iter_csv(fname, -1).split())) # save print 'building corpus takes: %s' % LdaUtils.human_readable_time(time() - start) self.corpus = corpora.MmCorpus(self.corpus_fname) return self.corpus
def build_model(self, fname=None, save_to=None): id2word = self.id2word or self.build_id2word() corpus = self.corpus or self.build_corpus() # read model.lda file if not fname: fname = click.prompt('model file name', type=str, default='model.lda') fname = self.__dest(fname) # if there is no model file or the user wants to rebuild, build .model if not os.path.isfile(fname) or click.confirm('There already is %s. Do you want to re run lda?' % fname): num_procs = click.prompt('Number of processes to launch', type=int, default=multiprocessing.cpu_count()) num_epochs = click.prompt('Number of epochs to run', type=int, default=20) num_topics = click.prompt('Number of topics', type=int, default=100) print 'start building model' start = time() model = LdaMulticore(corpus, id2word=id2word, num_topics=num_topics, workers=num_procs, passes=num_epochs) model.save(fname) #save print 'building model takes: %s' % LdaUtils.human_readable_time(time() - start) self.model = LdaMulticore.load(fname) return self.model
def generate_resources(resources, path): if len(resources) > 1 and not os.path.isdir(path): if os.path.exists(path): sys.exit('%s needs to be a directory to create multiple ' 'resource files' % click.format_filename(path)) os.mkdir(path) for resource in resources: if len(resources) > 1 or os.path.isdir(path): resource_path = '%s/%s' % (path, resource) else: resource_path = path if os.path.exists(resource_path) and not \ click.confirm('%s already exists. Do you want to overwrite it?' % click.format_filename(resource_path)): continue shutil.copy('%s/resources/%s' % (paths.DATA, resource), path) click.echo('Created %s at path %s' % (resource, click.format_filename(path)))
def config_examples(dest, user_dir): """ Copy the example workflows to a directory. \b DEST: Path to which the examples should be copied. """ examples_path = Path(lightflow.__file__).parents[1] / 'examples' if examples_path.exists(): dest_path = Path(dest).resolve() if not user_dir: dest_path = dest_path / 'examples' if dest_path.exists(): if not click.confirm('Directory already exists. Overwrite existing files?', default=True, abort=True): return else: dest_path.mkdir() for example_file in examples_path.glob('*.py'): shutil.copy(str(example_file), str(dest_path / example_file.name)) click.echo('Copied examples to {}'.format(str(dest_path))) else: click.echo('The examples source path does not exist')
def follow(ctx, nick, url, force): """Add a new source to your followings.""" source = Source(nick, url) sources = ctx.obj['conf'].following if not force: if source.nick in (source.nick for source in sources): click.confirm("? You’re already following {0}. Overwrite?".format( click.style(source.nick, bold=True)), default=False, abort=True) _, status = get_remote_status([source])[0] if not status or status.status_code != 200: click.confirm("? The feed of {0} at {1} is not available. Follow anyway?".format( click.style(source.nick, bold=True), click.style(source.url, bold=True)), default=False, abort=True) ctx.obj['conf'].add_source(source) click.echo("? You’re now following {0}.".format( click.style(source.nick, bold=True)))
def validate_text(ctx, param, value): conf = click.get_current_context().obj["conf"] if isinstance(value, tuple): value = " ".join(value) if not value and not sys.stdin.isatty(): value = click.get_text_stream("stdin").read() if value: value = value.strip() if conf.character_warning and len(value) > conf.character_warning: click.confirm("? Warning: Tweet is longer than {0} characters. Are you sure?".format( conf.character_warning), abort=True) return value else: raise click.BadArgumentUsage("Text can’t be empty.")
def remove(inventory_name): """ Remove inventories. """ if not inventory_name.isalnum(): fatal("Your inventory name should only contains alphanumeric " "characters.") dest_path = os.path.join(inventory_path, inventory_name) if not os.path.exists(dest_path): fatal("The %s inventory doesn't exist." % inventory_name) if not click.confirm("Do you want to delete the %s inventory?" % inventory_name, default=False): info("Aborted. Nothing has been done.") return if os.path.isdir(dest_path) and not os.path.islink(dest_path): shutil.rmtree(dest_path) else: os.remove(dest_path) success("The %s inventory has been removed." % inventory_name)
def remove(organization_name): """ Remove organizations. """ if not organization_name.isalnum(): fatal("Your organization name should only contains alphanumeric " "characters.") dest_path = get_env_path(organization_name) if not os.path.exists(dest_path): fatal("The %s organization doesn't exist." % organization_name) if not click.confirm("Do you want to delete the %s organization?" % organization_name, default=False): info("Aborted. Nothing has been done.") return if os.path.isdir(dest_path) and not os.path.islink(dest_path): shutil.rmtree(dest_path) else: os.remove(dest_path) success("The %s organization has been removed." % organization_name)
def _setup_github(): if (config.has('github', 'enabled') and config.get('github', 'enabled') == 'false'): return if (config.has('github', 'token') and config.has('github', 'organizations')): return if not click.confirm('Do you wish to enable Github integration?', default=True): config.set('github', 'enabled', 'false') return config.set('github', 'enabled', 'true') for i in range(0, 3): try: _try_setup_github() break except GitHubError: pass else: sys.exit(1)
def _ask_project_details(git_root, project): if not project.initialized(): (app_name, organization, app_id) = _ask_general_info(git_root) services = _ask_services(organization) basebox, basebox_url = _ask_basebox() project.set_name(app_name) project.set_organization(organization) project.set_id(app_id) project.add_box({ 'basebox': basebox }, basebox_url) else: if not click.confirm('There is already a project configured in this ' 'folder, do you want to modify it\'s config?'): fatal('aborting') services = _ask_services(project.organization(), project.services()) project.set_services(services)
def upload(ctx, yes, local_path, remote_path): """Upload files to remote host. When uploading a single file or directory, the remote path can be either the full path to upload data into or the path to an existing directory where the data should be placed. In the latter case, the base file name from the local path will be used as the remote name. When uploading multiple files, the remote path must refer to an existing directory. Local path could be glob. """ ask_confirm, to_upload = get_files_to_upload(ctx, local_path) if not yes and ask_confirm: if not click.confirm("Some files cannot be uploaded. Proceed?"): ctx.exit(0) ctx.obj["event_loop"].run_until_complete( upload_files(ctx, remote_path, sorted(to_upload)))
def init(path, github_client_id, github_client_secret): path = os.path.expanduser(path) if not os.path.exists(path): os.makedirs(path) config_path = os.path.join(path, 'zeus.config.py') if os.path.exists(config_path): click.confirm( 'Configuration file already present at [{}]. Overwrite it?'.format( config_path), abort=True ) with open(config_path, 'wb') as fp: fp.write( CONFIG.format( secret_key=repr(binascii.hexlify(os.urandom(24))), github_client_id=repr(github_client_id), github_client_secret=repr(github_client_secret), workspace_root=repr(path), ).encode('utf-8') ) click.echo('Configuration written at {}'.format(config_path))
def create_database(): """Delete then create all the database tables.""" if not click.confirm('Are you sure?'): click.secho('Aborted', fg='red') return click.echo('Dropping everything') db.drop_all() click.echo('Creating tables') db.create_all() click.secho('Done', fg='green')
def clone(force): """Clone a copy of the TigerHost project to a private location and set the project path to the cloned location. """ path = default_project_path() if os.path.exists(path): if not force: click.confirm('Path {} already exists. Continuing will remove this path.'.format( path), default=True, abort=True) if os.path.isdir(path): shutil.rmtree(path) else: os.remove(path) click.echo('Cloning to {}...'.format(path), nl=False) clone_project() click.secho('Done', fg='black', bg='green') save_project_path(path)
def copy_config(source_path): if not os.path.isabs(source_path): source_path = os.path.join(os.getcwd(), source_path) destination_path = os.path.join(config_path, config_filename) if source_path and source_path != destination_path: if os.path.exists(source_path): if not os.path.exists(destination_path): common.mkdir(config_path) copyfile(source_path, destination_path) else: output.warning("Destination file %s already exists." % destination_path) if click.confirm('Do you want to overwrite it?'): os.remove(destination_path) copyfile(source_path, destination_path) else: output.abort("To run osxstrap without copying config, use the osxstrap command.") else: output.abort("Input file %s does not exist." % source_path)
def sample_context(ctx, app, sample_context, config): with open(sample_context) as fd: sample_ctx = fd.read() try: ujson.loads(sample_ctx) except ValueError as e: ctx.fail('Invalid JSON, %s: %s' % (str(e), sample_ctx)) with open(config, 'r') as config_file: config = yaml.safe_load(config_file) click.echo('Setting sample contenxt for %s to:\n%s' % (app, sample_ctx)) click.confirm('Do you want to continue?', abort=True) with db_from_config(config) as (conn, cursor): cursor.execute('UPDATE `application` SET `sample_context`=%s WHERE `name`=%s;', (sample_ctx, app)) conn.commit() click.echo(click.style('All done!', fg='green'))
def delete(ctx, name): if not click.confirm('Delete cluster {}?'.format(name)): return conf = load_config(name) region = conf['cluster']['zone'] zone = '-'.join(region.split('-')[:2]) context = get_context_from_settings(name) jupyter, jport, jlport, scheduler, sport, bport = services_in_context(context) cmd = "kubectl delete services --all --context {0}".format(context) logger.info(cmd) call(cmd) cmd = 'gcloud compute forwarding-rules list --format json' logger.info(cmd) out = check_output(cmd) items = json.loads(out) for item in items: if item["IPAddress"] in [jupyter, scheduler]: assert ('jupyter-notebook' in item['description'] or 'dask-scheduler' in item['description']) cmd = ('gcloud compute forwarding-rules delete ' + item['name'] + ' --region ' + zone) logger.info(cmd) call(cmd) call("gcloud container clusters delete {0}".format(name))
def stop(id): """ Stop a run before it can finish. """ try: experiment = ExperimentClient().get(normalize_job_name(id)) except FloydException: experiment = ExperimentClient().get(id) if experiment.state not in ["queued", "running"]: floyd_logger.info("Job in {} state cannot be stopped".format(experiment.state)) return if ExperimentClient().stop(experiment.id): floyd_logger.info("Experiment shutdown request submitted. Check status to confirm shutdown") else: floyd_logger.error("Failed to stop job")
def input_createntials(config): """ Read user input and check if credentials are valid. """ while True: try: config = IO.input_jira_credentials(config.url, config.username, config.password) BaseFactory.create_jira(config) except Exception: IO.error('Credentials not valid') if click.confirm('Try again?', default=True, abort=True): continue return config
def create_pub_issue(self, sk_issue): """ Migrate SK issue to PUB Jira :param sk_issue: :return: """ fields = self.convert_fields(sk_issue) click.echo("\nPlease confirm migration:") io.print_dict(fields, indent=1) if not click.confirm('\nMigrate?', default=False): return None return self._pub_jira.create_issue(fields=fields)
def list_todo(name): os.system('clear') todo_title() s = select([Category]) result = s.execute() for r in result: if r[1] == name: q = session.query(Items).filter(Items.category_id==r[0]).all() click.secho(name , fg='cyan', bold=True) for i in q: click.secho('>>>' + i.items , fg='white', bold=True) if click.confirm('Do you want to sync?'): # click.secho('Well done!' , fg='green', bold=True) F = Firebase() F.upload_firebase()
def __init__(self, load=True): if load: self._parse_cli_args() self._read_ini_settings() self._check_ocp_vars() if not os.path.exists(self.inventory_file) or self.args.create_inventory: if self.no_confirm: self._create_inventory_file() else: if click.confirm('Overwrite the existing inventory file?'): self._create_inventory_file() if self.args.create_ocp_vars or "load_balancer_hostname:" in self.lb_config: if self.no_confirm: self._create_ocp_vars() else: if click.confirm('Update the OCP install variables?'): self._create_ocp_vars() if os.path.exists(self.inventory_file): self._launch_refarch_env()
def _confirm_and_delete(base_dir, unknown_jobs, confirm, jenkins_url): utils.sechowrap('The following jobs are present on the server but not in ' 'the local repository:', fg='yellow') click.secho(' ' + '\n '.join(sorted(unknown_jobs)), fg='yellow') if confirm: utils.sechowrap('') delete = click.confirm(click.style(click.wrap_text( 'Do you really want to delete these jobs? THIS CANNOT BE ' 'RECOVERED!' ), fg='red', bold=True)) else: delete = True if delete: for job in unknown_jobs: _, jenkins_url = jenkins_api.handle_auth(base_dir, jenkins_api.delete_job, jenkins_url, job)
def create_first_user(app, created_models, verbosity, db, **kwargs): if User not in created_models: return if not router.allow_syncdb(db, User): return if not kwargs.get('interactive', True): return import click if not click.confirm('\nWould you like to create a user account now?', default=True): # Not using `abort=1` because we don't want to exit out from further execution click.echo('\nRun `sentry createuser` to do this later.\n') return from sentry.runner import call_command call_command('sentry.runner.commands.createuser.createuser')
def deploy_app(ctx, tag, force, yes): """ trigger a deployment """ ctx.set_git_hash(git_hash=tag) if not yes: status.echo(ctx) click.confirm("Continue?", abort=True, default=True) app = App(ctx.name, ctx.docker, ctx.tag) if not ecr.image_exists(app.image): click.echo("Can't find a docker for {}\naborting..".format(app.image)) return click.echo("Deploying {}".format(ctx.tag)) deployments = deploy.execute(app, force) deploy.wait_for(deployments)
def config_envdir(ctx, target_dir): env = Environment(ctx.name) cfg = env.secret target_dir = os.path.abspath(target_dir) if not click.confirm("found {} vars, will write to `{}/*`".format(len(cfg), target_dir)): click.echo("Exiting..") sys.exit(1) if not os.path.exists(target_dir): click.echo("{} did not exists, creating".format(target_dir)) os.mkdir(target_dir) else: if os.path.exists(target_dir) and not os.path.isdir(target_dir): click.echo("{} exists but isn't a directory, unable to continue".format(target_dir)) sys.exit(2) config.write_envdir(cfg, target_dir)
def create_config(): '''Create the config file with default close statuses and an empty github auth_token''' CONFIG['close_status'] = {'Invalid': ['invalid', 'wontfix', 'worksforme'], 'Insufficient data': ['insufficient_info'], 'Duplicate': ['duplicate']} CONFIG['github'] = {'auth_token': ''} if os.path.exists(CFG_PATH): if click.confirm('You already have a config file, if you continue ' 'your custom settings will be lost'): with click.open_file(CFG_PATH, 'w+') as config_file: CONFIG.write(config_file) else: sys.exit(1) else: with click.open_file(CFG_PATH, 'w+') as config_file: CONFIG.write(config_file)
def yaml_wizard(directory): while True: command = choose_command(directory) image = choose_image() yaml = YAML_SKELLINGTON.format( image=image, command=command, ) click.secho('Here\'s a preview of the Valohai.yaml file I\'m going to create.', fg='cyan') print(yaml) yaml_path = os.path.join(directory, 'valohai.yaml') if not click.confirm('Write this to {path}?'.format(path=click.style(yaml_path, bold=True))): # pragma: no cover click.echo('Okay, let\'s try again...') continue with codecs.open(yaml_path, 'w', 'UTF-8') as out_fp: out_fp.write(yaml) success('All done! Wrote {path}.'.format(path=yaml_path)) break
def choose_image(): click.echo( 'Now let\'s pick a Docker image to use with your code.\n' 'Here are some recommended choices, but feel free to type in one of your own.' ) while True: image = prompt_from_list( IMAGE_SUGGESTIONS, 'Choose a number or enter a Docker image name.', nonlist_validator=lambda s: s.strip() ) if isinstance(image, dict): image = image['name'] if click.confirm('Is {image} correct?'.format(image=click.style(image, bold=True))): break success('Great! Using {image}.'.format(image=image)) return image
def unlink(yes): """ Unlink a linked Valohai project. """ dir = get_project_directory() project = get_project() if not project: click.echo('{dir} or its parents do not seem linked to a project.'.format(dir=dir)) return 1 if not yes: click.confirm( 'Unlink {dir} from {name}?'.format( dir=click.style(project.directory, bold=True), name=click.style(project.name, bold=True), ), abort=True, ) links = settings.get('links', {}) links.pop(dir) settings['links'] = links settings.save() success('Unlinked {dir} from {name}.'.format( dir=click.style(dir, bold=True), name=click.style(project.name, bold=True) ))
def check_cli_version(): """Check if the current cli version satisfies the server requirements""" try: server_version = PolyaxonClients().version.get_cli_version() except (PolyaxonHTTPError, PolyaxonShouldExitError) as e: Printer.print_error('Could not get cli version.') Printer.print_error('Error message `{}`.'.format(e)) sys.exit(1) current_version = pkg_resources.get_distribution(PROJECT_CLI_NAME).version if LooseVersion(current_version) < LooseVersion(server_version.min_version): click.echo("""Your version of CLI ({}) is no longer compatible with server.""".format( current_version)) if click.confirm("Do you want to upgrade to " "version {} now?".format(server_version.latest_version)): pip_upgrade() sys.exit(0) else: clint.textui.puts("Your can manually run:") with clint.textui.indent(4): clint.textui.puts("pip install -U polyaxon-cli") clint.textui.puts( "to upgrade to the latest version `{}`".format(server_version.latest_version)) sys.exit(0) elif LooseVersion(current_version) < LooseVersion(server_version.latest_version): clint.textui.puts("New version of CLI ({}) is now available. To upgrade run:".format( server_version.latest_version )) with clint.textui.indent(4): clint.textui.puts("pip install -U polyaxon-cli")
def get_key(env): bucket_name, key_path = get_bucket_name_and_key_path(env) try: bucket = bucketstore.get(bucket_name) except ValueError: if click.confirm("The bucket {} does not exist, would you like to create it?".format(bucket_name)): try: bucket = bucketstore.get(bucket_name, create=True) except botocore.exceptions.ClientError as e: if (e.response["Error"]["Code"] == "BucketAlreadyExists"): raise click.ClickException("The bucket {} already exists.\nIf you created this bucket, you currently credentials do not have access, else try renaming to a different bucket".format(bucket_name)) else: raise e else: return try: key = bucket.key(key_path) # Call the meta, will raise the error if does not exist key.meta except botocore.exceptions.ClientError as e: if (e.response["Error"]["Code"] == "NoSuchKey"): bucket.set(key_path, "{}") key = bucket.key(key_path) else: raise e return key
def Confirm(self, text: str, default=True, abort=True): """Ask for a confirmation, either yes or no to a question. Args: text (str): prompts the user for a confirmation, with the given test as the question default (bool): the default for the confirmation answer. If True the default is Y(es), if False the default is N(o) abort (bool): if the program should abort if the user answer to the confirm prompt is no. The default is an abort. Returns: bool: False if the user entered no, True if the user entered yes """ return click.confirm(text, abort=abort, default=default)
def cli(homedir): """pakalolo key management tool""" chain_updated = False storedir, gpg = open_keying(homedir) chain = load_chain(storedir) if chain is None: if click.confirm("No claimchain was found in directory " + storedir + ", would you like to create one?", abort=True): chain, chain_updated = first_run(gpg) if len(chain.store) == 0: if click.confirm("Your claimchain contains no keys, would you like to create one?", abort=True): chain, chain_updated = first_run(gpg) if chain_updated and click.confirm('Do you want to save your changes?', abort=True): store_chain(chain, storedir)
def create_index(index, mappings_and_settings): """Create an index given mappings and settings as a JSON""" body = simplejson.load(mappings_and_settings) click.confirm('Create index "%s"?' % index, abort=True) es.client.indices.create(index=index, body=body) log.info('Index created')