我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用click.style()。
def print_project_status(ctx): config = ctx.obj.config solution_num = ctx.obj.solution_num project_status = gather_project_status(ctx) # Print out a 'pretty' message showing project status, first up some project details click.secho("Project Details", bold=True) click.echo(" Project Name: " + config["project_name"]) click.echo(" Number of solutions generated: " + str(solution_num)) click.echo(" Latest Solution Folder: '" + config["project_name"] + "/solution" + str(solution_num) + "'") click.echo(" Language Choice: " + config["language"]) # And now details about what builds have been run/are passing. # This section uses lots (too many!) 'conditional expressions' to embed formatting into the output. click.secho("Build Status", bold=True) click.echo(" C Simulation: " + (click.style("Pass", fg='green') if "csim_pass" in project_status else (click.style("Fail", fg='red') if "csim_fail" in project_status else (click.style("Run (Can't get status)", fg='yellow') if "csim_done" in project_status else click.style("Not Run", fg='yellow'))))) click.echo(" C Synthesis: " + (click.style("Run", fg='green') if "syn_done" in project_status else click.style("Not Run", fg='yellow'))) click.echo(" Cosimulation: " + (click.style("Pass", fg='green') if "cosim_pass" in project_status else (click.style("Fail", fg='red') if "cosim_fail" in project_status else (click.style("Run (Can't get status)", fg='yellow') if "cosim_done" in project_status else click.style("Not Run", fg='yellow'))))) click.echo(" Export:" ) click.echo(" IP Catalog: " + (click.style("Run", fg='green') if "export_ip_done" in project_status else click.style("Not Run", fg='yellow'))) click.echo(" System Generator: " + (click.style("Run", fg='green') if "export_sysgen_done" in project_status else click.style("Not Run", fg='yellow'))) click.echo(" Export Evaluation: " + (click.style("Run", fg='green') if "evaluate_done" in project_status else click.style("Not Run", fg='yellow'))) ### Click Command Definitions ### # Report Command
def run(self): click.echo('{now}\n{bottery} version {version}'.format( now=datetime.now().strftime('%B %m, %Y - %H:%M:%S'), bottery=click.style('Bottery', fg='green'), version=bottery.__version__ )) self.loop.run_until_complete(self.configure()) if self._server is not None: handler = self.server.make_handler() setup_server = self.loop.create_server(handler, '0.0.0.0', 7000) self.loop.run_until_complete(setup_server) click.echo('Server running at http://localhost:7000') if not self.tasks: click.secho('No tasks found.', fg='red') self.stop() sys.exit(1) for task in self.tasks: self.loop.create_task(task()) click.echo('Quit the bot with CONTROL-C') self.loop.run_forever()
def metrics(period, resolution, tag, threads, metricsfile): try: pool = Pool(processes=threads) period_seconds = period * 3600 resolution_seconds = resolution * 3600 if metricsfile: with open(metricsfile) as fp: m = json.loads(fp.read().replace('][', ',')) else: m = metrics_api.get_tag_metrics(tag_name=tag, **context.settings) click.echo(click.style('Found: %s metrics', fg='green') % (len(m))) expire = partial(_expire_metric_path, period_seconds, resolution_seconds, tag) expired_paths = tqdm(pool.imap_unordered(expire, m)) expired_paths = sum(filter(None, expired_paths)) click.echo(click.style('Expired: %s metric paths', fg='green') % expired_paths) except Exception, e: print 'Cleanup metrics failed. %s' % e finally: pool.terminate() pool.join()
def check_name(ctx, param, value): """ Checks the project name to be a valid name :param ctx: app context :param param: parameter :param value: the parameter value :return: """ regex = re.compile(r'^[a-zA-Z_]*') if regex.match(value): return value while True: try: click.echo( click.style("{}".format(value), bold=True) + ' is not a valid python package name. try something else', err=True) value = param.prompt_for_value(ctx) if regex.match(value): return value continue except (KeyboardInterrupt, EOFError): raise ctx.Abort()
def check_template(ctx, _, value): """ Checks the template to be valid template :param ctx: app context :param value: the parameter value :return: """ if not value: # TODO: get list and show raise ctx.abort() else: url = urlparse(value) if not url.netloc: url = url._replace(netloc='github.com') if url.path[-4:] == '.git': url = url._replace(path=url.path[:-4]) path = os.path.join(os.environ['HOME'], '.flactory/templates', url.netloc, url.path) if os.path.isdir(path): return path # TODO: if not exist pull it automatically repr_name = click.style("[{}] {}".format(url.netloc, url.path[:-4]), bold=True) raise ctx.fail(repr_name + " doesn't exist.")
def list_boards(self): """Return a list with all the supported boards""" # Print table click.echo('\nSupported boards:\n') BOARDLIST_TPL = ('{board:22} {fpga:20} {type:<5} {size:<5} {pack:<10}') terminal_width, _ = click.get_terminal_size() click.echo('-' * terminal_width) click.echo(BOARDLIST_TPL.format( board=click.style('Board', fg='cyan'), fpga='FPGA', type='Type', size='Size', pack='Pack')) click.echo('-' * terminal_width) for board in self.boards: fpga = self.boards[board]['fpga'] click.echo(BOARDLIST_TPL.format( board=click.style(board, fg='cyan'), fpga=fpga, type=self.fpgas[fpga]['type'], size=self.fpgas[fpga]['size'], pack=self.fpgas[fpga]['pack'])) click.secho(BOARDS_MSG, fg='green')
def list_fpgas(self): """Return a list with all the supported FPGAs""" # Print table click.echo('\nSupported FPGAs:\n') FPGALIST_TPL = ('{fpga:30} {type:<5} {size:<5} {pack:<10}') terminal_width, _ = click.get_terminal_size() click.echo('-' * terminal_width) click.echo(FPGALIST_TPL.format( fpga=click.style('FPGA', fg='cyan'), type='Type', size='Size', pack='Pack')) click.echo('-' * terminal_width) for fpga in self.fpgas: click.echo(FPGALIST_TPL.format( fpga=click.style(fpga, fg='cyan'), type=self.fpgas[fpga]['type'], size=self.fpgas[fpga]['size'], pack=self.fpgas[fpga]['pack']))
def pick_item(zot, item_id): if not ID_PAT.match(item_id): items = tuple(zot.search(item_id)) if len(items) > 1: click.echo("Multiple matches available.") item_descriptions = [] for it in items: desc = click.style(it.title, fg='blue') if it.creator: desc = click.style(it.creator + u': ', fg="cyan") + desc if it.date: desc += click.style(" ({})".format(it.date), fg='yellow') item_descriptions.append(desc) return select(zip(items, item_descriptions)).key elif items: return items[0].key else: raise ValueError("Could not find any items for the query.") return item_id
def _failures_to_str(fails, no_fails): def _r(l): for msg, count, expected_count in l: res.append(' {}: {}'.format(msg, count)) if expected_count is not None: res.append(' (expected {})'.format(expected_count)) res.append('\n') res = [] if no_fails: res.append(click.style( 'messages that did not cause failure:\n', bold=True)) _r(no_fails) if fails: res.append(click.style( 'messages that caused failure:\n', bold=True)) _r(fails) return ''.join(res)
def add_monitor(ctx, name, uri, location, frequency, email, validation_string, bypass_head_request, verify_ssl, redirect_is_failure, sla_threshold, raw): if validation_string: # We must bypass head request we're to validate string. bypass_head_request = True with Spinner('Creating monitor: ', remove_message=raw): status, message, monitor = newrelic.create_monitor( ctx.obj['ACCOUNT'], name, uri, frequency, location, email, validation_string, bypass_head_request, verify_ssl, redirect_is_failure, sla_threshold) if raw: print(json.dumps(monitor)) return if status == 0: print(click.style(u'OK', fg='green', bold=True)) print('Monitor: ' + message) else: print(click.style(u'Error', fg='red', bold=True)) raise click.ClickException(message)
def delete_monitor(ctx, monitor, confirm): if not confirm: confirm = click.prompt(''' ! WARNING: Destructive Action ! This command will destroy the monitor: {monitor} ! To proceed, type "{monitor}" or re-run this command with --confirm={monitor} '''.format(monitor=monitor), prompt_suffix='> ') if confirm.strip() != monitor: print('abort') sys.exit(1) with Spinner('Deleting monitor {}: '.format(monitor), remove_message=False): newrelic.delete_monitor(ctx.obj['ACCOUNT'], monitor) print(click.style(u'OK', fg='green', bold=True))
def get_vm_options(): """ Get user-selected config options for the 21 VM. """ logger.info(click.style("Configure 21 virtual machine.", fg=TITLE_COLOR)) logger.info("Press return to accept defaults.") default_disk = Two1MachineVirtual.DEFAULT_VDISK_SIZE default_memory = Two1MachineVirtual.DEFAULT_VM_MEMORY default_port = Two1MachineVirtual.DEFAULT_SERVICE_PORT disk_size = click.prompt(" Virtual disk size in MB (default = %s)" % default_disk, type=int, default=default_disk, show_default=False) vm_memory = click.prompt(" Virtual machine memory in MB (default = %s)" % default_memory, type=int, default=default_memory, show_default=False) server_port = click.prompt(" Port for micropayments server (default = %s)" % default_port, type=int, default=default_port, show_default=False) return VmConfiguration(disk_size=disk_size, vm_memory=vm_memory, server_port=server_port)
def get_zerotier_address(marketplace): """ Gets the zerotier IP address from the given marketplace name Args: marketplace (str): name of the marketplace network to lookup ip Returns: str: a string representation of the zerotier IP address Raises: UnloggedException: if the zt network doesn't exist """ logger.info("You might need to enter your superuser password.") address = zerotier.get_address(marketplace) if not address: join_cmd = click.style("21 join", bold=True, reset=False) no_zt_network = click.style( "You are not part of the {}. Use {} to join the market.", fg="red") raise UnloggedException(no_zt_network.format(marketplace, join_cmd)) return address
def add(ctx, service_name, docker_image_name): """ Add a new service to 21 sell \b Adding a new service $ 21 sell add <service_name> <docker_image_name> """ manager = ctx.obj['manager'] logger.info(click.style("Adding services.", fg=cli_helpers.TITLE_COLOR)) def service_successfully_added_hook(tag): cli_helpers.print_str(tag, ["Added"], "TRUE", True) def service_already_exists_hook(tag): cli_helpers.print_str(tag, ["Already exists"], "FALSE", False) def service_failed_to_add_hook(tag): cli_helpers.print_str(tag, ["Failed to add"], "FALSE", False) manager.add_service(service_name, docker_image_name, service_successfully_added_hook, service_already_exists_hook, service_failed_to_add_hook)
def _leave(client, network): """Join the given network. Args: client (two1.server.rest_client.TwentyOneRestClient) an object for sending authenticated requests to the TwentyOne backend network (str): the name of the network being joined. Defaults to 21mkt """ # ensures the zerotier daemon is running zerotier.start_daemon() try: nwid = zerotier.get_network_id(network) except KeyError: logger.info('not in network') return {'left': False, 'reason': 'not in network'} try: zerotier.leave_network(nwid) logger.info(uxstring.UxString.successful_leave.format(click.style(network, fg="magenta"))) return {'left': True} except subprocess.CalledProcessError as e: logger.info(str(e)) return {'left': False, 'reason': str(e)}
def get_or_create_wallet(wallet_path): """Create a new wallet or return the currently existing one.""" data_provider = twentyone_provider.TwentyOneProvider(two1.TWO1_PROVIDER_HOST) if wallet.Two1Wallet.check_wallet_file(wallet_path): return wallet.Wallet(wallet_path=wallet_path, data_provider=data_provider) # configure wallet with default options click.pause(uxstring.UxString.create_wallet) wallet_options = dict(data_provider=data_provider, wallet_path=wallet_path) if not wallet.Two1Wallet.configure(wallet_options): raise click.ClickException(uxstring.UxString.Error.create_wallet_failed) # Display the wallet mnemonic and tell user to back it up. # Read the wallet JSON file and extract it. with open(wallet_path, 'r') as f: wallet_config = json.load(f) mnemonic = wallet_config['master_seed'] click.pause(uxstring.UxString.create_wallet_done % click.style(mnemonic, fg='green')) if wallet.Two1Wallet.check_wallet_file(wallet_path): return wallet.Wallet(wallet_path=wallet_path, data_provider=data_provider)
def click_echo_kvp(key, value, padding=20, color='green'): """ Helper function for pretty printing key value pairs in click :param key: item key :type key: str :param value: item value :type value: any :param padding: key padding :type padding: int :param color: key color (ANSI compliant) :type color: str :returns: proper `click.echo` call to stdout :rtype: None """ return click.echo( click.style('{key:<{padding}}'.format( key=key + ':', padding=padding ), fg=color) + str(value) )
def authenticate_account(get_auth_url=False, code=None, for_business=False): if for_business: error(translator['od_pref.authenticate_account.for_business_unsupported']) return authenticator = od_auth.OneDriveAuthenticator() click.echo(translator['od_pref.authenticate_account.permission_note']) if code is None: click.echo(translator['od_pref.authenticate_account.paste_url_note']) click.echo('\n' + click.style(authenticator.get_auth_url(), underline=True) + '\n') if get_auth_url: return click.echo(translator['od_pref.authenticate_account.paste_url_instruction'].format( redirect_url=click.style(authenticator.APP_REDIRECT_URL, bold=True))) url = click.prompt(translator['od_pref.authenticate_account.paste_url_prompt'], type=str) code = extract_qs_param(url, 'code') if code is None: error(translator['od_pref.authenticate_account.error.code_not_found_in_url']) return try: authenticator.authenticate(code) success(translator['od_pref.authenticate_account.success.authorized']) save_account(authenticator) except Exception as e: error(translator['od_pref.authenticate_account.error.authorization'].format(error_message=str(e)))
def print_saved_drives(): click.echo(click.style('Drives that have been set up:', bold=True)) all_drive_ids = context.all_drives() if len(all_drive_ids) > 0: click.echo() for i, drive_id in enumerate(all_drive_ids): curr_drive_config = context.get_drive(drive_id) curr_account = context.get_account(curr_drive_config.account_id) click.echo(' ' + click.style('#%d - Drive "%s":' % (i, curr_drive_config.drive_id), underline=True)) click.echo(' Account: %s (%s)' % (curr_account.account_email, curr_drive_config.account_id)) click.echo(' Local root: %s' % curr_drive_config.localroot_path) click.echo(' Ignore file: %s' % curr_drive_config.ignorefile_path) click.echo() else: click.echo(' No Drive has been setup with onedrived.\n') return all_drive_ids
def debug_request(request): def request_echo(fmt, *args): click.echo(click.style('> ', fg='blue') + expand_args(fmt, args)) headers = request.headers headers['host'] = urlparse.urlparse(request.url).hostname request_echo(click.style('%s %s HTTP/1.1', bold=True), request.method, request.path_url) for key, value in sorted(headers.items()): request_echo('%s: %s', key.title(), value) if request.body: body_text = request.body if isinstance(body_text, bytes): body_text = body_text.decode('utf-8') request_echo('') for line in body_text.splitlines(): request_echo(line)
def package(self, output=None): """ Only build the package """ # Make sure we're in a venv. self.check_venv() # force not to delete the local zip self.override_stage_config_setting('delete_local_zip', False) # Execute the prebuild script if self.prebuild_script: self.execute_prebuild_script() # Create the Lambda Zip self.create_package(output) self.callback('zip') size = human_size(os.path.getsize(self.zip_path)) click.echo(click.style("Package created", fg="green", bold=True) + ": " + click.style(self.zip_path, bold=True) + " (" + size + ")")
def collision_warning(self, item): """ Given a string, print a warning if this could collide with a Zappa core package module. Use for app functions and events. """ namespace_collisions = [ "zappa.", "wsgi.", "middleware.", "handler.", "util.", "letsencrypt.", "cli." ] for namespace_collision in namespace_collisions: if namespace_collision in item: click.echo(click.style("Warning!", fg="red", bold=True) + " You may have a namespace collision with " + click.style(item, bold=True) + "! You may want to rename that file.")
def decide_rule(rule, controller, idbased): click.secho('we could extend DAG with rule', fg='blue') if idbased: rule = next(x for x in controller.adageobj.rules if x.identifier == rule) click.secho('rule: {}/{} ({})'.format(rule.offset, rule.rule.name, rule.identifier)) # IPython.embed() try: resp = raw_input(click.style("Shall we? (y/N) ", fg='blue')) except NameError: resp = input(click.style("Shall we? (y/N) ", fg='blue')) shall = resp.lower() == 'y' if shall: click.secho('ok we will extend.', fg='green') else: click.secho('maybe another time... your response: {}'.format( resp), fg='yellow') return shall
def decide_step(node, controller, idbased): if idbased: node = controller.adageobj.dag.getNode(node) click.echo('we could submit a DAG node {}'.format(node)) # IPython.embed() try: resp = raw_input(click.style("Shall we? (y/N) ", fg='magenta')) except NameError: resp = input(click.style("Shall we? (y/N) ", fg='magenta')) shall = resp.lower() == 'y' if shall: click.secho('ok we will submit.', fg='green') else: click.secho('will not submit for now... your response: {}'.format( resp), fg='yellow') return shall
def cli(ctx, dirpath, pythonversion, timeout): ctx.obj = conf( python_version=pythonversion, dir_path=dirpath or getcwd(), timeout=timeout ) cireqs.set_log_level(click_log.get_level()) splash = r""" o8o `"' .ooooo. oooo oooo d8b .ooooo. .ooooo oo .oooo.o d88' `"Y8 `888 `888""8P d88' `88b d88' `888 d88( "8 888 888 888 888ooo888 888 888 `"Y88b. 888 .o8 888 888 888 .o 888 888 o. )88b `Y8bod8P' o888o d888b `Y8bod8P' `V8bod888 8""888P' 888. 8P' """ splash = click.style(splash, fg='green') + click.style( "v{}".format(cireqs.__version__.__version__), fg='red') + "\n" if ctx.invoked_subcommand is None: click.echo("\n".join([splash, ctx.get_help()])) return
def _style(enabled, text, **kwargs): """ Helper function to enable/disable styled output text. Args: enable (bool): Turn on or off styling. text (string): The string that should be styled. kwargs (dict): Parameters that are passed through to click.style Returns: string: The input with either the styling applied (enabled=True) or just the text (enabled=False) """ if enabled: return click.style(text, **kwargs) else: return text
def follow(ctx, nick, url, force): """Add a new source to your followings.""" source = Source(nick, url) sources = ctx.obj['conf'].following if not force: if source.nick in (source.nick for source in sources): click.confirm("? You’re already following {0}. Overwrite?".format( click.style(source.nick, bold=True)), default=False, abort=True) _, status = get_remote_status([source])[0] if not status or status.status_code != 200: click.confirm("? The feed of {0} at {1} is not available. Follow anyway?".format( click.style(source.nick, bold=True), click.style(source.url, bold=True)), default=False, abort=True) ctx.obj['conf'].add_source(source) click.echo("? You’re now following {0}.".format( click.style(source.nick, bold=True)))
def unfollow(ctx, nick): """Remove an existing source from your followings.""" source = ctx.obj['conf'].get_source_by_nick(nick) try: with Cache.discover() as cache: cache.remove_tweets(source.url) except OSError as e: logger.debug(e) ret_val = ctx.obj['conf'].remove_source_by_nick(nick) if ret_val: click.echo("? You’ve unfollowed {0}.".format( click.style(source.nick, bold=True))) else: click.echo("? You’re not following {0}.".format( click.style(nick, bold=True)))
def style_tweet(tweet, porcelain=False): conf = click.get_current_context().obj["conf"] limit = conf.character_limit if porcelain: return "{nick}\t{url}\t{tweet}".format( nick=tweet.source.nick, url=tweet.source.url, tweet=str(tweet)) else: if sys.stdout.isatty() and not tweet.text.isprintable(): return None styled_text = format_mentions(tweet.text) len_styling = len(styled_text) - len(click.unstyle(styled_text)) final_text = textwrap.shorten(styled_text, limit + len_styling) if limit else styled_text timestamp = tweet.absolute_datetime if conf.use_abs_time else tweet.relative_datetime return "? {nick} ({time}):\n{tweet}".format( nick=click.style(tweet.source.nick, bold=True), tweet=final_text, time=click.style(timestamp, dim=True))
def style_source_with_status(source, status, porcelain=False): if porcelain: return "{nick}\t{url}\t{status}\t{content_length}\t{last_modified}".format( nick=source.nick, url=source.url, status=status.status_code, content_length=status.content_length, last_modified=status.last_modified) else: if status.status_code == 200: scolor, smessage = "green", str(status.status_code) elif status: scolor, smessage = "red", str(status.status_code) else: scolor, smessage = "red", "ERROR" return "? {nick} @ {url} [{content_length}, {last_modified}] ({status})".format( nick=click.style(source.nick, bold=True, fg=scolor), url=source.url, status=click.style(smessage, fg=scolor), content_length=status.natural_content_length, last_modified=status.natural_last_modified)
def cli(packages, virtualenv, python, use_ipython, shell, keep, use_editor, tmpdir, index): # pylint: disable=too-many-arguments """Easily try out python packages.""" if not packages: raise click.BadArgumentUsage("At least one package is required.") if not shell and use_ipython: shell = "ipython" click.echo("==> Use python {0}".format(click.style(python, bold=True))) if shell: click.echo("==> Use shell {0}".format(click.style(shell, bold=True))) click.echo("[*] Downloading packages: {0}".format(click.style(",".join(p.url for p in packages), bold=True))) try: envdir = try_packages(packages, virtualenv, python, shell, use_editor, keep, tmpdir, index) except TryError as error: click.secho("[*] {0}".format(error), fg="red") sys.exit(1) if keep: click.echo("==> Have a look at the try environment at: {0}".format(envdir))
def _github_ask_credentials(): click.echo(''' We need your GitHub credentials to create an access token to be stored in AerisCloud. Your credentials won't be stored anywhere. The access token will allow AerisCloud to access your private repositories as well as those owned by any organization you might be a member of. You can revoke this access token at any time by going to this URL: %s ''' % (click.style('https://github.com/settings/applications', bold=True))) user = None pwd = None while not user: user = click.prompt('Github username') while not pwd: pwd = click.prompt('Github password', hide_input=True) return user, pwd
def _show_service_list(service_list, enabled_services): """ Show the service list, coloring enabled services, return the list of non default services TODO: find a better command name, as it doesn't just show services """ if not enabled_services: enabled_services = [] service_complete = {} for service, service_info in sorted(service_list.iteritems()): if not service_info['default']: service_complete[service] = service_info['description'] if service in enabled_services: click.secho('* %s - %s' % ( style(service, bold=True, fg='green'), service_info['description'] )) else: click.secho('* %s - %s' % ( style(service, bold=True), service_info['description'] )) return service_complete
def _up(box, make): click.echo('\nStarting box %s\n' % style(box.name(), bold=True)) if not box.is_running(): start_box(box) else: box.vagrant('provision') if make: click.echo('\nRunning make %s in your box\n' % style(make, bold=True)) if box.ssh_shell('make %s' % make) != 0: fatal('make command failed!') if sync(box, 'down') is False: # sync failed, message should already be displayed, exit sys.exit(1)
def _clone_project(project_name): gh = Github() (gh_repo, forked) = gh.get_repo(project_name, fork=True) if not gh_repo: click.echo('error: no repository named %s was found on ' 'your user and configured organizations' % click.style(project_name, bold=True)) sys.exit(1) if forked: click.echo('We found a repository named %s and forked it to your ' 'user, it now accessible at the following url:\n' '%s' % (forked.full_name, gh_repo.html_url)) dest_path = os.path.join(projects_path(), project_name) click.echo('We will clone %s in %s\n' % (gh_repo.ssh_url, dest_path)) vgit('clone', gh_repo.ssh_url, dest_path) # add our upstream remote if gh_repo.parent: repo = Repo(dest_path) repo.create_remote('upstream', gh_repo.parent.ssh_url)
def list(output_json): """ List the available base boxes """ boxes = baseboxes() if output_json: json.dump(boxes, sys.stdout) return click.secho('The following boxes are available:\n', fg='yellow') if not boxes: click.secho(' No box found', fg='cyan') sys.exit(0) for infra in boxes: click.secho('* %s:' % (infra), fg='blue', bold=True) for box in boxes[infra]: click.echo('\t* %s' % (click.style(box, fg='green')))
def cli(boxes): """ Suspend a running box, see resume """ for project, project_boxes in boxes.iteritems(): running_boxes = project_boxes.running() project_name = click.style(project.name(), fg='magenta') if not running_boxes: click.secho('No running boxes found for %s' % project_name, fg='yellow', bold=True) continue click.secho('Suspending boxes for %s' % project_name, fg='blue', bold=True) for box in running_boxes: box.suspend() click.echo(''.join([ click.style('\tBox ', fg='green'), click.style(box.name(), bold=True), click.style(' has been suspended.', fg='green') ]))
def queues(): """Report on progress by queues.""" conn = worker.connection failure_q = None def _repr(q): return "running:%d pending:%d finished:%d" % ( StartedJobRegistry(q.name, conn).count, q.count, FinishedJobRegistry(q.name, conn).count) for q in Queue.all(conn): if q.name == 'failed': failure_q = q continue click.echo("%s %s" % (q.name, _repr(q))) if failure_q: click.echo( click.style(failure_q.name, fg='red') + ' %s' % _repr(failure_q))
def log_message(self, line, character, error, message, reposition=True): """ Log a linter message to stderr, ignoring duplicates. """ if error[0] == 'E': formatted_code = click.style(error, fg='red') else: formatted_code = click.style(error, fg='yellow') if reposition: line, character = self.reposition(line, character) message = '{}:{}:{} {} {}'.format(self.filename, line, character, formatted_code, message) if message in self.logged_messages: return self.exit_code = 1 self.logged_messages.append(message) click.echo(message)
def check_api(self): reply = self.connection.check_api() if reply.status_code == 200: reply_json = reply.json() if reply_json == {'api_available': 'True'}: click.echo("nebula responding as expected") else: click.echo( click.style( "nebula api not responding, are you logged in?", fg="red")) else: click.echo( click.style( "nebula api not responding, are you logged in?", fg="red"))
def worker_accountCreator(counters): while counters[0] != counters[3]: Account = creatorQueue.get() Account.setupMailbox() Account = makeClubAccount(Account) if Account.errorState == None: logQueue.put(click.style('Created account %s.' % Account.username, fg = 'green', bold=True)) Account.save() counters[0]+=1 logQueue.put('WRITE BLYAD') tosQueue.put(Account) verifierQueue.put(Account) else: logQueue.put(False) logQueue.put(click.style('Error occured at stage %d when creating account %s' % (Account.errorState, Account.username), fg='red', bold=True)) return True
def accept(self): counter = 0 while counter != 60: emails = requests.get(self.inbox, verify=False).text if len(emails) <= 1: time.sleep(0.2) counter += 1 continue for line in emails.split('\n'): if line.startswith('https://'): try: if 'Thank you for signing up! Your account is now active.' in requests.get(line).text: self.accepted = True return True except requests.ConnectionError: time.sleep(requestSleepTimerB) if requestSleepTimerB == 5.1: logQueue.put(click.style('[Mail accepter] Received slow down warning. Using max delay of 5.1 already.', fg='red', bold=True)) else: logQueue.put(click.style('[Mail accepter] Received slow down warning. Increasing delay from %d to %d.' % (requestSleepTimerB, requestSleepTimerB+0.2), fg='red', bold=True)) requestSleepTimerB += 0.2 return False return False
def push(ctx, run, project, description, entity, force, files): # TODO: do we support the case of a run with the same name as a file? if os.path.exists(run): raise BadParameter("Run id is required if files are specified.") project, run = api.parse_slug(run, project=project) click.echo("Updating run: {project}/{run}".format( project=click.style(project, bold=True), run=run)) candidates = [] if len(files) == 0: raise BadParameter("No files specified") # TODO: Deal with files in a sub directory api.push(project, files=[f.name for f in files], run=run, description=description, entity=entity, force=force, progress=sys.stdout) #@cli.command(context_settings=CONTEXT, help="Pull files from Weights & Biases")
def add_color(value, color): return click.style('{}'.format(value), fg=color)
def format(self, record): import click s = super(ColorFormatter, self).format(record) if not record.exc_info: level = record.levelname.lower() if level in self.colors: s = click.style(s, **self.colors[level]) return s
def delete_index(index): """Delete an index""" click.clear() click.confirm( click.style( 'Really DELETE index "%s"?' % index, fg='white', bg='red'), abort=True) es.client.indices.delete(index=index) log.info('Index deleted')
def print_vulns(package, vulns): if vulns: click.secho("%s has vulnerabilities" % package, fg="green") click.echo(" Currently installed %s" % vulns[0]["installed"]) click.echo(" Latest version %s" % vulns[0]["latest"]) for vuln in vulns: click.echo(" %s is %s" % (vuln["cve"], click.style(vuln["urgency"], fg=COLOR_MAPPING[vuln["urgency"].replace("*", "")]))) if vuln["fixed"]: click.echo(" Vulnerability fixed in %s" % vuln["fixed"])
def colored_text(text, color, bold=False): """ Color text :param text: :param color: :param bold: :return: """ return click.style(str(text), fg=color, bold=bold)
def version(): """ Display albt version """ click.echo(click.style('albt: ', fg='white') + click.style('1.0.2b4', fg='green'))