我们从Python开源项目中，提取了以下50个代码示例，用于说明如何使用click.prompt()。
def ask_for_configs():
    """Interactively collect the sync settings and build a psync config.

    Prompts for the remote path, the SSH host, the SSH user and the list of
    paths to ignore, then passes the answers to ``psync.generate_config``.

    Returns:
        The value returned by ``psync.generate_config``.
    """
    remote_path = click.prompt("Remote path", default="~/remote/path")
    ssh_host = click.prompt("SSH host", default="ssh_host")
    ssh_user = click.prompt("SSH username or enter '-' to skip",
                            default="ssh_user")
    ignores = click.prompt("Files or folders to ignore "
                           "(separated by space)", default=" ")

    # "-" is the sentinel answer meaning "no SSH user".
    if ssh_user == "-":
        ssh_user = None

    # split() without a separator collapses runs of whitespace, so repeated,
    # leading or trailing spaces no longer produce empty "" entries, and a
    # blank answer yields an empty list.
    ignores = ignores.split()

    return psync.generate_config(ssh_user=ssh_user,
                                 ssh_host=ssh_host,
                                 remote_path=remote_path,
                                 ignores=ignores)
def delete(ctx, service_name, dry_run):
    """
    Delete the service SERVICE_NAME from AWS.
    """
    # Resolve the service definition from the config/env files on the context.
    config = Config(filename=ctx.obj['CONFIG_FILE'], env_file=ctx.obj['ENV_FILE'])
    service = Service(yml=config.get_service(service_name))

    # Always show what would be deleted, even on a dry run.
    print()
    click.secho('Deleting service "{}":'.format(service.serviceName), fg="white")
    click.secho(' Service info:', fg="green")
    print_service_info(service)
    click.secho(' Task Definition info:', fg="green")
    print_task_definition(service.active_task_definition)
    print()

    if dry_run:
        return

    # The user must type the service name back before anything is destroyed.
    click.echo('If you really want to do this, answer "{}" to the question below.\n'.format(service.serviceName))
    answer = click.prompt("What service do you want to delete? ")
    if answer != service.serviceName:
        click.echo('\nNot deleting service "{}"'.format(service.serviceName))
        return

    # Scale to zero and wait for stability before deleting the service.
    service.scale(0)
    print(" Waiting for our existing containers to die ...")
    service.wait_until_stable()
    print(" All containers dead.")
    service.delete()
    print(" Deleted service {} from cluster {}.".format(service.serviceName, service.clusterName))
def cluster_ssh(ctx, service_name):
    """
    SSH to the specified EC2 system in the ECS cluster running SERVICE_NAME.
    """
    config = Config(filename=ctx.obj['CONFIG_FILE'], env_file=ctx.obj['ENV_FILE'])
    service = Service(yml=config.get_service(service_name))
    ips = service.get_host_ips()

    # The menu is 1-based.
    for number, ip in enumerate(ips, 1):
        click.echo("Instance {}: {}".format(number, ip))
    instance = click.prompt("Which instance to ssh to?", type=int)

    # Reject 0 and negative numbers as well as too-large ones: with only an
    # upper-bound check, 0 or -1 would silently pick a host from the end of
    # the list through negative indexing.
    if not 1 <= instance <= len(ips):
        click.echo("That is not a valid instance.")
        return

    service.cluster_ssh(ips[instance - 1])
def select_stream(graylog_api, stream):
    """Resolve the stream to query, prompting the user when none is set.

    Uses ``stream`` if given, otherwise ``graylog_api.default_stream``. If
    neither is set, the available streams are listed and the user picks one
    by number. Admin users get an extra "All Streams" entry.

    Returns:
        ``"streams:<id>"`` for a concrete stream, or ``None`` when no stream
        filter applies (empty selection or ``'*'``).
    """
    is_admin = graylog_api.user["permissions"] == ["*"] or 'Admin' in graylog_api.user["roles"]
    stream = stream if stream is not None else graylog_api.default_stream

    if not stream:
        streams = graylog_api.streams()["streams"]
        click.echo("Please select a stream to query:")
        for i, st in enumerate(streams):
            stream_id = st["id"].encode(utils.UTF8)
            message = "{}: Stream '{}' (id: {})".format(i, st["title"].encode(utils.UTF8), stream_id)
            click.echo(message)

        # Admins get one extra entry, numbered just past the last stream.
        if is_admin:
            message = "{}: Stream '{}'".format(len(streams), 'All Streams')
            click.echo(message)

        stream_index = click.prompt("Enter stream number:", type=int, default=0)
        if stream_index < len(streams):
            stream = streams[stream_index]["id"]
        elif not is_admin:
            # Out-of-range answer from a non-admin: ask again and return that
            # answer. Previously the retry's result was dropped, so the
            # function fell through and returned None.
            return CliInterface.select_stream(graylog_api, stream)

    if stream and stream != '*':
        return "streams:{}".format(stream)
def pair(config):
    """Pair to your account.

    This is still useless, but will be needed for certain remote commands
    on the vehicle.
    """
    api = api_from_config(config)
    if not api.request_pairing():
        raise click.ClickException("Unable to request pairing")

    # The two literals are concatenated, so the first needs a trailing space;
    # without it the message read "...your phone.Please enter...".
    click.echo(
        "A Pairing code has been requested to your phone. "
        "Please enter it below!")
    code = click.prompt('Registration Code', type=int)

    # NOTE(review): request_pairing() is treated as truthy-on-success above,
    # while a truthy confirm_pairing() result is treated as a failure here.
    # Confirm against the API client that this is not inverted.
    if api.confirm_pairing(code):
        raise click.ClickException("Error pairing client")
    click.echo(click.style("Pairing sucessfull!", fg='green'))
def delete_monitor(ctx, monitor, confirm):
    # Delete a monitor after the user has typed its name as confirmation.
    # The confirmation comes from the `confirm` argument, or from an
    # interactive prompt when that argument is empty.
    # NOTE(review): line breaks inside the warning literal were lost in the
    # collapsed source; confirm the intended layout.
    if not confirm:
        warning = ''' ! WARNING: Destructive Action ! This command will destroy the monitor: {monitor} ! To proceed, type "{monitor}" or re-run this command with --confirm={monitor} '''.format(monitor=monitor)
        confirm = click.prompt(warning, prompt_suffix='> ')

    # Anything other than the exact monitor name aborts with exit status 1.
    if confirm.strip() != monitor:
        print('abort')
        sys.exit(1)

    progress_label = 'Deleting monitor {}: '.format(monitor)
    with Spinner(progress_label, remove_message=False):
        newrelic.delete_monitor(ctx.obj['ACCOUNT'], monitor)
    print(click.style(u'OK', fg='green', bold=True))
def get_vm_options():
    """Prompt for the 21 VM settings and return them as a VmConfiguration.

    Each prompt falls back to the matching ``Two1MachineVirtual`` default
    when the user just presses return.
    """
    logger.info(click.style("Configure 21 virtual machine.", fg=TITLE_COLOR))
    logger.info("Press return to accept defaults.")

    disk_default = Two1MachineVirtual.DEFAULT_VDISK_SIZE
    memory_default = Two1MachineVirtual.DEFAULT_VM_MEMORY
    port_default = Two1MachineVirtual.DEFAULT_SERVICE_PORT

    # The default is already part of each prompt text, so click's own
    # "[default]" suffix is turned off. Keyword arguments are evaluated in
    # order, so the questions are asked as disk, memory, port.
    return VmConfiguration(
        disk_size=click.prompt(" Virtual disk size in MB (default = %s)" % disk_default,
                               type=int, default=disk_default, show_default=False),
        vm_memory=click.prompt(" Virtual machine memory in MB (default = %s)" % memory_default,
                               type=int, default=memory_default, show_default=False),
        server_port=click.prompt(" Port for micropayments server (default = %s)" % port_default,
                                 type=int, default=port_default, show_default=False),
    )
def get_server_port():
    """Prompt for the micropayments server port and return it as an int.

    Falls back to ``Two1Machine.DEFAULT_SERVICE_PORT`` when the user just
    presses return.
    """
    logger.info(click.style("Configure 21 micropayments server:", fg=TITLE_COLOR))
    logger.info("Press return to accept default.")

    fallback_port = Two1Machine.DEFAULT_SERVICE_PORT
    # The default is shown in the prompt text, so click's suffix is disabled.
    return click.prompt(" Port for micropayments server (default = %s)" % fallback_port,
                        type=int, default=fallback_port, show_default=False)
def allow(ctx, foreign_account, permission, weight, threshold, account):
    """ Add a key/account to an account's permission
    """
    if not foreign_account:
        # No key/account given: derive a public key from a password that the
        # user types twice.
        from musebase.account import PasswordKey
        passphrase = click.prompt(
            "Password for Key Derivation",
            hide_input=True,
            confirmation_prompt=True,
        )
        derived_key = PasswordKey(account, passphrase, permission).get_public()
        foreign_account = format(derived_key, "MUSE")

    result = ctx.muse.allow(
        foreign_account,
        weight=weight,
        account=account,
        permission=permission,
        threshold=threshold,
    )
    pprint(result)
def unlockWallet(f):
    """Decorator: unlock (or create) the wallet before invoking ``f``.

    Nothing is done when ``ctx.obj["unsigned"]`` is truthy. Otherwise an
    existing wallet is unlocked with the ``UNLOCK`` environment variable if
    it is set, or with a prompted passphrase. If no wallet exists, one is
    created with a passphrase that the user enters twice.
    """
    @click.pass_context
    def new_func(ctx, *args, **kwargs):
        signing_required = not ctx.obj.get("unsigned", False)
        if signing_required:
            if not ctx.muse.wallet.created():
                click.echo("No wallet installed yet. Creating ...")
                passphrase = click.prompt("Wallet Encryption Passphrase",
                                          hide_input=True,
                                          confirmation_prompt=True)
                ctx.muse.wallet.create(passphrase)
            else:
                try:
                    # The environment variable wins even when it is empty.
                    passphrase = os.environ["UNLOCK"]
                except KeyError:
                    passphrase = click.prompt("Current Wallet Passphrase",
                                              hide_input=True)
                ctx.muse.wallet.unlock(passphrase)
        return ctx.invoke(f, *args, **kwargs)
    return update_wrapper(new_func, f)
def init(ovh, force):
    """Generate the configuration file."""
    if config_file_exists():
        if force:
            ovh.warning('The configuration file will be erased.\n')
        else:
            ovh.error('A configuration file already exists '
                      '(use --force to erase it).')
            ovh.exit()

    # Display the welcome message
    ovh.echo(WELCOME_MESSAGE)

    # Run the setup routine that matches the user's choice.
    selected = click.prompt('Your choice', default=1, value_proc=check_choice)
    ovh.echo('')
    launch_setup_by_choice(ovh, selected)
def launch_setup_2(ctx):
    """Set up the config for a user who already has the AK and AS tokens.

    Asks for the endpoint, application key and application secret, shows a
    link where the user validates the CK token, waits for a key press, then
    writes the configuration file.
    """
    endpoint = click.prompt('Endpoint', default='ovh-eu',
                            value_proc=check_endpoint)
    app_key = click.prompt('Application key')
    app_secret = click.prompt('Application secret')
    ctx.echo('')

    validation = get_ck_validation(endpoint, app_key, app_secret)
    ctx.info("Please visit the following link to authenticate you and "
             "validate the token :")
    ctx.info(validation['validationUrl'])
    # Block until the user has finished in the browser.
    click.pause()

    create_config_file(endpoint, app_key, app_secret,
                       validation['consumerKey'])
    ctx.success('Configuration file created.')
def extract(html_response, ssl_verification_enabled, session):
    """Ask for the Symantec VIP code and request the roles page with it.

    :param html_response: html result of parsing http response
    :param ssl_verification_enabled: forwarded to ``_retrieve_roles_page``
    :param session: forwarded to ``_retrieve_roles_page``
    :return: whatever ``_retrieve_roles_page`` returns
    """
    target_url = _action_url_on_validation_success(html_response)
    access_code = click.prompt(text='Enter your Symantec VIP Access code',
                               type=str)
    click.echo('Going for aws roles')

    page_context = _context(html_response)
    return _retrieve_roles_page(
        target_url,
        page_context,
        session,
        ssl_verification_enabled,
        access_code,
    )
def config_(action, **kw):
    """ Configure your launcher and game preferences.

    The 'action' argument is optional, and can be 'reset' or 'wizard'.
    If it's left blank, only given options will be set.
    Else, given options will be set AFTER the corresponding action is executed.
    """
    if action == 'reset':
        # This is a yes/no question, so use click.confirm. The previous
        # click.prompt(..., confirmation_prompt=True) accepted any text typed
        # twice, and since that text is always truthy even "no" triggered
        # the reset.
        if click.confirm('Are you sure you want to reset you settings?'):
            config.reset()
    elif action == 'wizard':
        _wizard()

    for k, v in kw.items():
        # Skip options that were not supplied and keep going. `break` here
        # used to drop every option after the first unset one.
        if v is None:
            continue
        # A leading '=' (as in "--opt=value" parsed as "=value") is removed.
        # startswith() is also safe for an empty string, where v[0] raised.
        if isinstance(v, str) and v.startswith('='):
            v = v[1:]
        config[k.replace('-', '_')] = v
    config.save()
def authenticate_account(get_auth_url=False, code=None, for_business=False):
    """Authorize a OneDrive account and save it.

    Without a ``code`` the auth URL is printed. If ``get_auth_url`` is set
    the function stops there; otherwise the user pastes the redirect URL and
    the ``code`` query parameter is taken from it. Business accounts are
    rejected up front.
    """
    if for_business:
        error(translator['od_pref.authenticate_account.for_business_unsupported'])
        return

    authenticator = od_auth.OneDriveAuthenticator()
    click.echo(translator['od_pref.authenticate_account.permission_note'])

    if code is None:
        click.echo(translator['od_pref.authenticate_account.paste_url_note'])
        auth_link = click.style(authenticator.get_auth_url(), underline=True)
        click.echo('\n' + auth_link + '\n')
        if get_auth_url:
            # The caller only wanted the URL printed.
            return
        redirect_hint = click.style(authenticator.APP_REDIRECT_URL, bold=True)
        click.echo(translator['od_pref.authenticate_account.paste_url_instruction'].format(
            redirect_url=redirect_hint))
        pasted_url = click.prompt(
            translator['od_pref.authenticate_account.paste_url_prompt'], type=str)
        code = extract_qs_param(pasted_url, 'code')
        if code is None:
            error(translator['od_pref.authenticate_account.error.code_not_found_in_url'])
            return

    try:
        authenticator.authenticate(code)
        success(translator['od_pref.authenticate_account.success.authorized'])
        save_account(authenticator)
    except Exception as exc:
        error(translator['od_pref.authenticate_account.error.authorization'].format(
            error_message=str(exc)))
def connect():
    """Show the server menu until a valid entry is chosen, then connect.

    Each ``SERVERS`` value is indexed as ``[0]`` (label) and ``[1]``
    (``"ip:port"``). Raises ``Exception`` when ``api.connect`` returns a
    falsy value.
    """
    divider = "-" * 20
    while True:
        click.secho("??????")
        click.secho(divider)
        for number, entry in SERVERS.items():
            click.secho("[%d] :%s (%s)" % (number, entry[0], entry[1]))
        click.secho(divider)

        num = click.prompt("????? ", type=int, default=1)
        if num not in SERVERS:
            # Unknown menu number: show the menu again.
            click.echo("????")
            continue

        ip, port = SERVERS[num][1].split(":")
        if api.connect(ip, int(port)):
            break
        raise Exception("????")
def run_function(df, value):
    """Run entry ``value`` of ``FUNCTION_LIST`` with prompted parameters.

    The entry is indexed as ``[0]`` title, ``[1]`` description, ``[2]``
    callable and ``[3]`` default parameter string. The comma-separated
    answer is split into stripped parameters and passed to the callable as
    one list.

    Returns:
        The callable's result, converted with ``api.to_df`` when ``df`` is
        truthy, or ``None`` if the call raised.
    """
    divider = "-" * 20
    entry = FUNCTION_LIST[value]
    click.secho("??????? " + str(value) + " : " + entry[0])
    click.secho(divider)
    click.secho(entry[1])

    raw_params = click.prompt("????? ", type=str, default=entry[3])
    params = [piece.strip() for piece in raw_params.split(",")]
    click.secho(divider)

    try:
        result = FUNCTION_LIST[value][2](params)
        if df:
            result = api.to_df(result)
            click.secho(str(result), bold=True)
        else:
            pprint.pprint(result)
        return result
    except Exception as e:
        # Print the traceback between rulers, then a short red summary.
        import traceback
        ruler = '-' * 60
        print(ruler)
        traceback.print_exc(file=sys.stdout)
        print(ruler)
        click.secho("??????????? " + str(e), fg='red')
def gen_config_file(real_trade_dll_name):
    """Build the server config text from a fresh key/IV and prompted address.

    The key and IV are the first and second 16 hex characters of a new
    ``uuid1`` value.

    Returns:
        ``(config_file_content, bind_ip, bind_port, enc_key, enc_iv)``
    """
    se("????????..")
    token = uuid.uuid1().hex
    enc_key, enc_iv = token[:16], token[16:]
    se("???enc_key = [{}] , enc_iv = [{}]".format(enc_key, enc_iv))

    bind_ip = click.prompt('??????ip??', default="127.0.0.1")
    bind_port = click.prompt('?????????', default="19820")

    # NOTE(review): line breaks inside this template were lost in the
    # collapsed source; confirm whether each key=value belongs on its own
    # line.
    template = """bind={} port={} trade_dll_path={} transport_enc_key={} transport_enc_iv={} """
    config_file_content = template.format(bind_ip, bind_port,
                                          real_trade_dll_name,
                                          enc_key, enc_iv)
    return config_file_content, bind_ip, bind_port, enc_key, enc_iv
def build_id2word(self, fname=None, save_to=None):
    """Build (or reuse) the id2word dictionary and load it.

    The words file is ``fname``, else ``self.words_fname``, else a prompted
    path. The dictionary is saved to ``save_to`` when given, otherwise next
    to the words file with an ``id2word`` extension. An existing dictionary
    file is only rebuilt after the user confirms.

    Returns:
        The loaded dictionary, also stored on ``self.id2word``.
    """
    # read words.csv file
    if not fname:
        fname = self.words_fname or click.prompt('words file')
    fname = self.__dest(fname)
    assert os.path.isfile(fname), 'No such file: %s' % fname

    if save_to:
        target = self.__dest(save_to)
    else:
        target = LdaUtils.change_ext(fname, 'id2word')
    self.id2word_fname = target

    # if there is no id2word file or the user wants to rebuild, build .id2word
    needs_build = (not os.path.isfile(self.id2word_fname)
                   or click.confirm('There alread is id2word. Do you want to rebuild?'))
    if needs_build:
        print('start building id2word')
        began = time()
        words = LdaUtils.filter_words(LdaUtils.iter_csv(fname, -1).split())
        id2word = corpora.Dictionary(words)
        id2word.save(self.id2word_fname)  # save
        print('building id2word takes: %s' % LdaUtils.human_readable_time(time() - began))

    self.id2word = corpora.Dictionary.load(self.id2word_fname)
    return self.id2word
def build_corpus(self, fname=None, save_to=None):
    """Build (or reuse) the serialized corpus and load it.

    The sentences file is ``fname`` or a prompted path. The corpus is saved
    to ``save_to`` when given, otherwise next to the sentences file with a
    ``corpus`` extension. An existing corpus file is only rebuilt after the
    user confirms.

    Returns:
        The loaded corpus, also stored on ``self.corpus``.
    """
    # read sentences file
    if not fname:
        fname = click.prompt('sentences file')
    fname = self.__dest(fname)
    assert os.path.isfile(fname), 'No such file: %s' % fname

    if save_to:
        target = self.__dest(save_to)
    else:
        target = LdaUtils.change_ext(fname, 'corpus')
    self.corpus_fname = target

    # if there is no corpus file or the user wants to rebuild, build .corpus
    needs_build = (not os.path.isfile(self.corpus_fname)
                   or click.confirm('There already is corpus. Do you want to rebuild?'))
    if needs_build:
        print('start building corpus')
        began = time()
        documents = self.__iter_doc2bow(LdaUtils.iter_csv(fname, -1).split())
        corpora.MmCorpus.serialize(self.corpus_fname, documents)  # save
        print('building corpus takes: %s' % LdaUtils.human_readable_time(time() - began))

    self.corpus = corpora.MmCorpus(self.corpus_fname)
    return self.corpus
def build_model(self, fname=None, save_to=None):
    """Train (or reuse) the LDA model file and load it.

    Uses ``self.id2word`` and ``self.corpus`` when they are truthy and
    builds them otherwise. The model file is ``fname`` or a prompted name.
    When training runs, the process count, epoch count and topic count are
    prompted for. ``save_to`` is not used by this method.

    Returns:
        The loaded model, also stored on ``self.model``.
    """
    id2word = self.id2word or self.build_id2word()
    corpus = self.corpus or self.build_corpus()

    # read model.lda file
    if not fname:
        fname = click.prompt('model file name', type=str, default='model.lda')
    fname = self.__dest(fname)

    # if there is no model file or the user wants to rebuild, build .model
    needs_training = (not os.path.isfile(fname)
                      or click.confirm('There already is %s. Do you want to re run lda?' % fname))
    if needs_training:
        num_procs = click.prompt('Number of processes to launch', type=int,
                                 default=multiprocessing.cpu_count())
        num_epochs = click.prompt('Number of epochs to run', type=int, default=20)
        num_topics = click.prompt('Number of topics', type=int, default=100)

        print('start building model')
        began = time()
        trained = LdaMulticore(corpus, id2word=id2word, num_topics=num_topics,
                               workers=num_procs, passes=num_epochs)
        trained.save(fname)  # save
        print('building model takes: %s' % LdaUtils.human_readable_time(time() - began))

    self.model = LdaMulticore.load(fname)
    return self.model
def assign(self, datafile=None, outputfile=None):
    """Tag every row of the data file and write the result as CSV.

    For each input row the first two columns are copied, and the last
    column is split on spaces, filtered and passed to ``self.query_tag``.
    The returned tags are appended as one space-joined column. Missing file
    names are prompted for.
    """
    if not datafile:
        datafile = click.prompt('Data file', type=str, default='sentences_all.csv')
    datafile = self.__dest(datafile)
    self.datafile = datafile

    if not outputfile:
        # Default output name: "<data file stem>_result<ext>".
        stem, ext = os.path.splitext(datafile)
        suggested = stem + '_result' + ext
        outputfile = click.prompt('output file', type=str, default=suggested)

    assert os.path.isfile(datafile), 'No such file: %s' % datafile

    print('start assiging')
    began = time()
    with open(datafile) as fi, open(outputfile, 'w') as fo:
        reader = csv.reader(fi, delimiter=',')
        writer = csv.writer(fo, delimiter=',')
        for row in reader:
            out_row = row[:2]  # post_id and sentence_seq
            words = LdaUtils.filter_words(row[-1].split(' '))
            tags = self.query_tag(words)
            out_row.append(' '.join(map(str, tags)))
            writer.writerow(out_row)
    print('assigning takes: %s' % LdaUtils.human_readable_time(time() - began))
def rm(ctx, pass_name, recursive, force):
    """Remove the password names `pass-name` from the password store.

    This command is alternatively named `remove` or `delete`. If
    `--recursive` or `-r` is specified, delete pass-name recursively
    if it is a directory. If `--force` or `-f` is specified, do not
    interactively prompt before removal.

    """
    # Each known failure maps to one message. Success returns None and any
    # handled failure returns 1.
    try:
        ctx.obj.remove_path(pass_name, recursive, force)
    except StoreNotInitialisedError:
        failure = MSG_STORE_NOT_INITIALISED_ERROR
    except FileNotFoundError:
        failure = '{0} is not in the password store.'.format(pass_name)
    except PermissionError:
        failure = MSG_PERMISSION_ERROR
    else:
        return None

    click.echo(failure)
    return 1
def _github_ask_credentials():
    """Explain why credentials are needed, then prompt for them.

    Each prompt repeats until it yields a non-empty value. The password is
    read without echo.

    Returns:
        ``(user, pwd)``
    """
    # NOTE(review): line breaks inside this notice were lost in the collapsed
    # source; confirm the intended layout.
    settings_link = click.style('https://github.com/settings/applications', bold=True)
    click.echo(''' We need your GitHub credentials to create an access token to be stored in AerisCloud. Your credentials won't be stored anywhere. The access token will allow AerisCloud to access your private repositories as well as those owned by any organization you might be a member of. You can revoke this access token at any time by going to this URL: %s ''' % (settings_link))

    while True:
        user = click.prompt('Github username')
        if user:
            break
    while True:
        pwd = click.prompt('Github password', hide_input=True)
        if pwd:
            break
    return user, pwd
def login():
    """Open the profile page, ask for an API key and store it in netrc."""
    # Import in here for performance reasons
    import webbrowser
    # TODO: use Oauth and a local webserver: https://community.auth0.com/questions/6501/authenticating-an-installed-cli-with-oidc-and-a-th
    url = api.app_url + '/profile'

    # TODO: google cloud SDK check_browser.py
    if webbrowser.open_new_tab(url):
        notice = 'Opening [{0}] in a new tab in your default browser.'.format(url)
    else:
        notice = "You can find your API keys here: {0}".format(url)
    click.echo(notice)

    label = "{warning} Paste an API key from your profile".format(
        warning=click.style("Not authenticated!", fg="red"))
    key = click.prompt(label, value_proc=lambda x: x.strip())

    if key:
        # TODO: get the username here...
        # username = api.viewer().get('entity', 'models')
        write_netrc(api.api_url, "user", key)
def choose_license(licenses, author, year):
    """List the matching licenses and return the one the user picks.

    The menu is 1-based and ``click.IntRange`` limits the answer to it.
    ``author`` and ``year`` are accepted but not used here.
    """
    click.echo("Found the following matching licenses:")
    listing = "\n".join(
        '{index}: {name}'.format(index=position, name=lic.name)
        for position, lic in enumerate(licenses, 1)
    )
    click.echo(green(listing))

    allowed = click.IntRange(1, len(licenses))
    choice = click.prompt("Please choose which one you'd like to add",
                          default=1, type=allowed)
    return licenses[choice - 1]
def login(ctx, username, api_key):
    """Logs the user in by asking for username and api_key
    """
    # Prompt only for the values that were not passed in.
    if username is None:
        username = click.prompt('Username (netID)')
        click.echo()

    if api_key is None:
        key_link = click.style(_api_key_url, underline=True)
        click.echo('Please get your API key from ' + key_link)
        api_key = click.prompt('API key')
        click.echo()

    click.echo('Checking your credentials...', nl=False)
    client = ApiClient(api_server_url=settings.API_SERVER_URL,
                       username=username,
                       api_key=api_key)
    try:
        client.test_api_key()
    except ApiClientAuthenticationError:
        click.secho('invalid', bg='red', fg='black')
        click.echo('Please try again.')
        ctx.exit(code=exit_codes.OTHER_FAILURE)
    else:
        # Credentials were accepted: save them.
        click.secho('OK', bg='green', fg='black')
        save_user(User(username=username, api_key=api_key))
def prompt_choices(choices):
    """Print a numbered menu and return the index the user picked.

    :param list choices: the choices for the user to choose from
    :rtype: int
    :returns: the zero-based index of the chosen choice
    :raises click.ClickException: if the number is outside the menu
    """
    assert len(choices) > 1

    for number, choice in enumerate(choices, 1):
        click.echo('{number}) {choice}'.format(number=number, choice=choice))

    # The menu is 1-based and the returned index is 0-based.
    picked = click.prompt('1-{}'.format(len(choices)), type=int) - 1
    if not 0 <= picked < len(choices):
        raise click.ClickException('Invalid choice.')
    return picked
def get_gh_api():
    """Return a GitHub API client, prompting for missing credentials.

    The username and token come from ``OSXSTRAP_GITHUB_USERNAME`` and
    ``OSXSTRAP_GITHUB_API_TOKEN``. A missing or empty value is prompted for
    and saved with ``common.set_dotenv_key``.

    Returns:
        The object returned by ``login(token=...)``.
    """
    # The statements that followed the return (a sample gist upload that
    # referenced an undefined `config_filename`) could never run and were
    # removed.
    username = os.environ.get("OSXSTRAP_GITHUB_USERNAME")
    if not username:
        username = click.prompt('Please enter your Github username')
        common.set_dotenv_key("OSXSTRAP_GITHUB_USERNAME", username)

    token = os.environ.get("OSXSTRAP_GITHUB_API_TOKEN")
    if not token:
        token = click.prompt('Please create a Github access token by going to https://github.com/settings/tokens/new?scopes=gist&description=osxstrap+gist+cli and enter it here')
        common.set_dotenv_key("OSXSTRAP_GITHUB_API_TOKEN", token)

    return login(token=token)
def allow(ctx, foreign_account, permission, weight, threshold, account):
    """ Add a key/account to an account's permission
    """
    if not foreign_account:
        # No key/account given: derive a public key from a password that the
        # user types twice.
        from bitsharesbase.account import PasswordKey
        passphrase = click.prompt(
            "Password for Key Derivation",
            hide_input=True,
            confirmation_prompt=True,
        )
        derived_key = PasswordKey(account, passphrase, permission).get_public()
        foreign_account = format(derived_key, "BTS")

    result = ctx.bitshares.allow(
        foreign_account,
        weight=weight,
        account=account,
        permission=permission,
        threshold=threshold,
    )
    pprint(result)
def unlock(f):
    """ This decorator will unlock the wallet by either asking for a
        passphrase or taking the environmental variable ``UNLOCK``
    """
    @click.pass_context
    def new_func(ctx, *args, **kwargs):
        signing_required = not ctx.obj.get("unsigned", False)
        if signing_required:
            if not ctx.bitshares.wallet.created():
                # First run: create a wallet with a passphrase typed twice.
                click.echo("No wallet installed yet. Creating ...")
                passphrase = click.prompt("Wallet Encryption Passphrase",
                                          hide_input=True,
                                          confirmation_prompt=True)
                ctx.bitshares.wallet.create(passphrase)
            else:
                try:
                    # The environment variable wins even when it is empty.
                    passphrase = os.environ["UNLOCK"]
                except KeyError:
                    passphrase = click.prompt("Current Wallet Passphrase",
                                              hide_input=True)
                ctx.bitshares.wallet.unlock(passphrase)
        return ctx.invoke(f, *args, **kwargs)
    return update_wrapper(new_func, f)
def set_config_file(config_file=DEFAULT_CONFIG_FILE):
    """Prompt for the SLR and ZMON URLs, probe them, and save them as JSON.

    Each URL is requested once with a 5 second timeout and without following
    redirects. The two URLs are then written to ``config_file`` (with ``~``
    expanded) and returned.

    Returns:
        dict with the keys ``'url'`` and ``'zmon_url'``.
    """
    # NOTE(review): the source had lost its indentation. `break` is placed
    # inside the second `with` block on the assumption that a failed check
    # is meant to repeat the prompts; confirm against the original layout
    # and against how `Action` treats exceptions.
    while True:
        url = click.prompt('Please enter the SLR base URL (e.g. https://slo-metrics.example.com)')
        with Action('Checking {}..'.format(url)):
            requests.get(url, timeout=5, allow_redirects=False)
        zmon_url = click.prompt('Please enter the ZMON URL (e.g. https://demo.zmon.io)')
        with Action('Checking {}..'.format(zmon_url)):
            requests.get(zmon_url, timeout=5, allow_redirects=False)
            break
    data = {
        'url': url,
        'zmon_url': zmon_url,
    }
    fn = os.path.expanduser(config_file)
    with Action('Writing configuration to {}..'.format(fn)):
        with open(fn, 'w') as fd:
            json.dump(data, fd)
    return data
def cli():
    """Show the intro, ask for a news source number and list its articles."""
    # Show the user the intro message
    get_intro()

    # Prompt the user to enter a news source
    news_source = click.prompt(
        "Please select a news source by entering its number or press enter for BBC",
        default=1)

    # Dictionary containing the news sources
    news_dict = get_news_sources()

    if news_source not in news_dict:
        click.echo(click.style("Wrong input try again", fg='red'))
        return
    get_news_articles(news_dict[news_source])
def upload_firebase(self):
    """Upload every category and its items to Firebase under a username.

    Builds a ``{category name: [item, ...]}`` mapping from the database,
    prompts for the username and stores the mapping at ``/todo-cli/<name>``.
    Exits the process when the upload returns a truthy value.
    """
    data = {}
    category = []
    for entry in session.query(Category).all():
        rows = session.query(Items).filter(Items.category_id==entry.id)
        data[entry.category] = [row.items for row in rows]

    label = click.style('Please enter your username:', fg='cyan', bold=True)
    name = click.prompt(label)
    print(Fore.GREEN + 'Syncing..... ')

    jsonData = firebase.put('/todo-cli', name, data)
    if not jsonData:
        print('Try again')
    else:
        print(Fore.CYAN + 'Done!')
        exit()
def get_firebase(self):
    """Download a user's lists from Firebase and insert them into the database.

    Prompts for the username, reads ``/todo-cli/<name>`` and, for every key,
    adds a ``Category`` row followed by an ``Items`` row for each value
    under the matching category id.
    """
    # NOTE(review): the source had lost its indentation. The nesting of the
    # commit/close calls below is the most plausible reading; confirm
    # against the original layout.
    name = click.prompt(click.style('Please enter username?', fg = 'cyan' , bold=True))
    jsonData = firebase.get('/todo-cli' , name)
    for k in jsonData:
        # Insert the category into the database.
        category = Category(category=k)
        session.add(category)
        session.commit()
        for i in jsonData[k]:
            # Re-read the categories to find the row whose second column
            # equals this key, and use its first column as the item's
            # category_id.
            s = select([Category])
            result = s.execute()
            for r in result:
                if r[1] == k:
                    data = Items(category_id=r[0], items=i)
                    session.add(data)
                    session.commit()
    session.close()
    click.secho('Done', fg = 'yellow' , bold=True)
def start_todo():
    """Run the interactive command loop until the user enters ``q``.

    Recognised phrases are ``create todo``, ``open todo`` and ``list todo``.
    Any other non-empty input calls ``exit_todo()``.
    """
    # click.echo(click.style("\n[1] Create new list of todo.", fg ='white', bold = True))
    # click.echo(click.style("[2] Add items to your category", fg ='white', bold = True))
    # click.echo(click.style("[q] Quit.", fg ='white', bold = True))
    prompt_text = click.style('>>>', fg = 'cyan', bold = True)
    choice = str(click.prompt(prompt_text))
    while choice != 'q':
        # Respond to the user's choice.
        if 'create todo' in choice:
            print('creating')
        elif 'open todo' in choice:
            print('openning')
        elif 'list todo' in choice:
            print('listing')
        elif choice:
            exit_todo()
        else:
            click.echo("\nTry again\n")
        # Read the next command. Previously `choice` was never refreshed, so
        # the loop repeated the first matching branch forever.
        choice = str(click.prompt(prompt_text))
###LOOPING FUNCTIONS ###
def open_todo(name):
    """Show the items of list ``name`` and read one line of input.

    Entering ``done`` calls ``list_todo(name)``. Any other input is passed
    to ``add_todo(name, item)``.
    """
    # NOTE(review): the source had lost its indentation. The prompt is placed
    # after the loop, which is the most plausible reading; confirm against
    # the original layout.
    os.system('clear')
    todo_title()
    s = select([Category])
    result = s.execute()
    for r in result:
        # Second column is compared with the list name; first column is used
        # as the category id.
        if r[1] == name:
            q = session.query(Items).filter(Items.category_id==r[0]).all()
            click.secho(name.upper(), fg='cyan', bold=True, underline=True)
            for i in q:
                click.secho('>>>' + i.items, fg='white', bold=True )
        # else:
        #     click.secho('oops list {} doesnt exist'.format(name), fg='white', bold=True )
    item = click.prompt(click.style('>>', fg='green', bold=True))
    if item == 'done':
        list_todo(name)
    else:
        add_todo(name, item)
def parse_cli_args(self):
    ''' Command line argument processing '''
    # NOTE(review): line breaks inside tag_help were lost in the collapsed
    # source. RawTextHelpFormatter prints it verbatim, so confirm the
    # intended layout.
    tag_help = '''Skip to various parts of install valid tags include: - vms (create vms for adding nodes to cluster or CNS/CRS) - node-setup (install the proper packages on the CNS/CRS nodes) - heketi-setup (install heketi and config on the crs master/CRS ONLY) - heketi-ocp (install the heketi secret and storage class on OCP/CRS ONLY) - clean (remove vms and unregister them from RHN also remove storage classes or secrets'''

    parser = argparse.ArgumentParser(
        description='Add new nodes to an existing OCP deployment',
        formatter_class=RawTextHelpFormatter)

    # Options are registered in the order they should appear in --help.
    option_specs = (
        ('--node_type', dict(action='store', default='app',
                             help='Specify the node label: app, infra, storage')),
        ('--node_number', dict(action='store', default='1',
                               help='Specify the number of nodes to add')),
        ('--create_inventory', dict(action='store_true',
                                    help='Helper script to create json inventory file and exit')),
        ('--no_confirm', dict(default=None,
                              help='Skip confirmation prompt')),
        ('--tag', dict(default=None, help=tag_help)),
        ('--verbose', dict(default=None, action='store_true',
                           help='Verbosely display commands')),
    )
    for flag, spec in option_specs:
        parser.add_argument(flag, **spec)

    # Keep the parsed namespace and mirror --verbose onto the instance.
    self.args = parser.parse_args()
    self.verbose = self.args.verbose
def select_one_song(songs):
    """Display the songs returned by search api.

    A single result is chosen without prompting. Otherwise a numbered table
    is shown and the prompt repeats until a listed number is entered.

    :params songs: API['result']['songs']
    :return: a Song object.
    :raises click.ClickException: if ``songs`` is empty.
    """
    # An empty list has no valid answer, so the re-prompt loop below could
    # never finish. Fail fast instead.
    if not songs:
        raise click.ClickException('No songs to select from.')

    if len(songs) == 1:
        # Sequence numbers are 1-based. This was 0, which only worked because
        # songs[-1] happens to be the sole element.
        select_i = 1
    else:
        table = PrettyTable(['Sequence', 'Song Name', 'Artist Name'])
        for i, song in enumerate(songs, 1):
            table.add_row([i, song['name'], song['ar'][0]['name']])
        click.echo(table)

        select_i = click.prompt('Select one song', type=int, default=1)
        while select_i < 1 or select_i > len(songs):
            select_i = click.prompt('Error Select! Select Again', type=int)

    chosen = songs[select_i - 1]
    return Song(chosen['id'], chosen['name'])
def select_one_album(albums):
    """Display the albums returned by search api.

    A single result is chosen without prompting. Otherwise a numbered table
    is shown and the prompt repeats until a listed number is entered.

    :params albums: API['result']['albums']
    :return: a Album object.
    :raises click.ClickException: if ``albums`` is empty.
    """
    # An empty list has no valid answer, so the re-prompt loop below could
    # never finish. Fail fast instead.
    if not albums:
        raise click.ClickException('No albums to select from.')

    if len(albums) == 1:
        # Sequence numbers are 1-based. This was 0, which only worked because
        # albums[-1] happens to be the sole element.
        select_i = 1
    else:
        table = PrettyTable(['Sequence', 'Album Name', 'Artist Name'])
        for i, album in enumerate(albums, 1):
            table.add_row([i, album['name'], album['artist']['name']])
        click.echo(table)

        select_i = click.prompt('Select one album', type=int, default=1)
        while select_i < 1 or select_i > len(albums):
            select_i = click.prompt('Error Select! Select Again', type=int)

    chosen = albums[select_i - 1]
    return Album(chosen['id'], chosen['name'])
def select_one_artist(artists):
    """Display the artists returned by search api.

    A single result is chosen without prompting. Otherwise a numbered table
    is shown and the prompt repeats until a listed number is entered.

    :params artists: API['result']['artists']
    :return: a Artist object.
    :raises click.ClickException: if ``artists`` is empty.
    """
    # An empty list has no valid answer, so the re-prompt loop below could
    # never finish. Fail fast instead.
    if not artists:
        raise click.ClickException('No artists to select from.')

    if len(artists) == 1:
        # Sequence numbers are 1-based. This was 0, which only worked because
        # artists[-1] happens to be the sole element.
        select_i = 1
    else:
        table = PrettyTable(['Sequence', 'Artist Name'])
        for i, artist in enumerate(artists, 1):
            table.add_row([i, artist['name']])
        click.echo(table)

        select_i = click.prompt('Select one artist', type=int, default=1)
        while select_i < 1 or select_i > len(artists):
            select_i = click.prompt('Error Select! Select Again', type=int)

    chosen = artists[select_i - 1]
    return Artist(chosen['id'], chosen['name'])
def select_one_playlist(playlists):
    """Display the playlists returned by search api or user playlist.

    A single result is chosen without prompting. Otherwise a numbered table
    is shown and the prompt repeats until a listed number is entered.

    :params playlists: API['result']['playlists'] or API['playlist']
    :return: a Playlist object.
    :raises click.ClickException: if ``playlists`` is empty.
    """
    # An empty list has no valid answer, so the re-prompt loop below could
    # never finish. Fail fast instead.
    if not playlists:
        raise click.ClickException('No playlists to select from.')

    if len(playlists) == 1:
        # Sequence numbers are 1-based. This was 0, which only worked because
        # playlists[-1] happens to be the sole element.
        select_i = 1
    else:
        table = PrettyTable(['Sequence', 'Name'])
        for i, playlist in enumerate(playlists, 1):
            table.add_row([i, playlist['name']])
        click.echo(table)

        select_i = click.prompt('Select one playlist', type=int, default=1)
        while select_i < 1 or select_i > len(playlists):
            select_i = click.prompt('Error Select! Select Again', type=int)

    chosen = playlists[select_i - 1]
    return Playlist(chosen['id'], chosen['name'])
def select_one_user(users):
    """Display the users returned by search api.

    A single result is chosen without prompting. Otherwise a numbered table
    is shown and the prompt repeats until a listed number is entered.

    :params users: API['result']['userprofiles']
    :return: a User object.
    :raises click.ClickException: if ``users`` is empty.
    """
    # An empty list has no valid answer, so the re-prompt loop below could
    # never finish. Fail fast instead.
    if not users:
        raise click.ClickException('No users to select from.')

    if len(users) == 1:
        # Sequence numbers are 1-based. This was 0, which only worked because
        # users[-1] happens to be the sole element.
        select_i = 1
    else:
        table = PrettyTable(['Sequence', 'Name'])
        for i, user in enumerate(users, 1):
            table.add_row([i, user['nickname']])
        click.echo(table)

        select_i = click.prompt('Select one user', type=int, default=1)
        while select_i < 1 or select_i > len(users):
            select_i = click.prompt('Error Select! Select Again', type=int)

    chosen = users[select_i - 1]
    return User(chosen['userId'], chosen['nickname'])
def listen(self):
    """Read a question, send it to the AI and return the parsed response.

    The question comes from ``self._google_stt()`` when ``input_system`` is
    ``'google'``, otherwise from a text prompt.
    """
    # TODO: Input plugin system goes here
    # Plugins should be used for getting input so users can build their
    # own custom front-end to the application.
    # This will allow the speech to text functions to be moved out of the
    # main class and into their own plugins.
    use_speech = self.input_system == 'google'
    self.question = (self._google_stt() if use_speech
                     else click.prompt("Input your query"))

    # TODO: Loading plugin system goes here
    # Eventually I want people to be able to build custom behavior
    # that executes when the response is being fetched. So there could
    # be a loading screen that plays music or something for example.
    click.echo("Wait for response...")

    # Get a response from the AI using the api interface.
    self.ai.get_response(self.question)

    # Clean up the response and turn it into a Python object that can be read
    parsed = self.ai.parse()
    self.response = parsed
    return self.response
def allow(ctx, foreign_account, permission, weight, threshold, account):
    """ Add a key/account to an account's permission
    """
    if not foreign_account:
        # No key/account given: derive a public key from a password that the
        # user types twice.
        from decentbase.account import PasswordKey
        passphrase = click.prompt(
            "Password for Key Derivation",
            hide_input=True,
            confirmation_prompt=True,
        )
        derived_key = PasswordKey(account, passphrase, permission).get_public()
        foreign_account = format(derived_key, "DCT")

    result = ctx.decent.allow(
        foreign_account,
        weight=weight,
        account=account,
        permission=permission,
        threshold=threshold,
    )
    pprint(result)
def unlockWallet(f):
    """Decorator: unlock (or create) the wallet before invoking ``f``.

    Nothing is done when ``ctx.obj["unsigned"]`` is truthy. Otherwise an
    existing wallet is unlocked with the ``UNLOCK`` environment variable if
    it is set, or with a prompted passphrase. If no wallet exists, one is
    created with a passphrase that the user enters twice.
    """
    @click.pass_context
    def new_func(ctx, *args, **kwargs):
        signing_required = not ctx.obj.get("unsigned", False)
        if signing_required:
            if not ctx.decent.wallet.created():
                click.echo("No wallet installed yet. Creating ...")
                passphrase = click.prompt("Wallet Encryption Passphrase",
                                          hide_input=True,
                                          confirmation_prompt=True)
                ctx.decent.wallet.create(passphrase)
            else:
                try:
                    # The environment variable wins even when it is empty.
                    passphrase = os.environ["UNLOCK"]
                except KeyError:
                    passphrase = click.prompt("Current Wallet Passphrase",
                                              hide_input=True)
                ctx.decent.wallet.unlock(passphrase)
        return ctx.invoke(f, *args, **kwargs)
    return update_wrapper(new_func, f)
def init_app():
    """
    create a new app, or sync with an existing one
    """
    cwd = os.path.abspath('.')

    # Exit status 1: the current directory is not inside a git repository.
    try:
        repo = git.Repo.discover(cwd)
    except NotGitRepository:
        click.echo("No repository found")
        sys.exit(1)

    # Exit status 2: no default name could be derived.
    suggested_name = init.get_default_name(cwd)
    if suggested_name is None:
        click.echo("Unable to derive a name from the current git repository or directory!")
        sys.exit(2)

    # click shows the default itself. The old .format(suggested_name) call on
    # a literal without placeholders did nothing and was dropped.
    name = click.prompt("Enter environment name", default=suggested_name)
    init.initialize(name, repo.head())
def main():
    """pyFootball v.1 Early Alpha

    A python based football management simulator,
    very similar to and heavily based on Football Manager created by Sports Interactive.

    To start or load a game, run this file without any parameters.

    Coded by Justin Auger
    http://justnaugr.github.io\n
    Credits to Click for the great CLI.
    """
    # click.Choice limits the answer to the two listed modes.
    mode = click.prompt('Would you like to start a new game or load a saved game?',
                        type=click.Choice(['new','load']))
    if mode == 'load':
        load_game()
    elif mode == 'new':
        new_game()
def connect(self, kwargs):
    """Open a driver connection described by ``kwargs`` and return it.

    The plugin class is looked up by the part of ``config['type']`` before
    the first ``':'``. On ``AuthenticationError`` a password is prompted
    for and the connection is retried; if no password is entered the error
    is re-raised.
    """
    try:
        # get config
        config = self.get_connect_config(kwargs)

        # Only the part before any ':' subtype selects the plugin.
        plugin_name = config['type'].split(':', 1)[0]
        driver_cls = ngage.plugin.get_plugin_class(plugin_name)
        drv = driver_cls(config)

        self.log.debug("connecting to {host}".format(**config))
        drv.open()
        return drv
    except AuthenticationError:
        config['password'] = click.prompt('password', hide_input=True)
        if not config['password']:
            raise
        # NOTE(review): this calls a bare `connect`, not `self.connect`.
        # Confirm that a module-level `connect` exists; otherwise the retry
        # raises NameError.
        return connect(config)