我们从 Python 开源项目中提取了以下 17 个代码示例，用于说明如何使用 click.clear()。
def resources(cls, jobs_resources):
    """Clear the terminal and print a resource-usage table for the given jobs.

    Args:
        jobs_resources: A single resource payload or a collection of them.
            It is normalised with ``to_list`` and every item is parsed with
            ``ContainerResourcesConfig.from_dict``.
    """
    payloads = to_list(jobs_resources)
    click.clear()

    bytes_per_gib = 1024 ** 3
    table = [['Job', 'Mem Usage/Limit', 'CPU%-CPUs', 'GPU Mem', 'GPU Usage']]
    for payload in payloads:
        config = ContainerResourcesConfig.from_dict(payload)
        job_id = config.job_uuid.hex
        memory = '{} / {}'.format(config.memory_used / bytes_per_gib,
                                  config.memory_limit / bytes_per_gib)
        cpu = '{} - {}'.format(config.cpu_percentage,
                               len(config.percpu_percentage))
        row = [job_id, memory, cpu]
        if config.gpu_resources:
            # The header declares GPU columns, but nothing is added for them.
            pass
        table.append(row)

    click.echo(tabulate(table, headers="firstrow"))
    sys.stdout.flush()
def procpkt(pkt):
    """Record the sender of an ARP packet and redraw the table of seen hosts.

    Packets without an ARP layer are ignored. For ARP packets the sender's
    IP is (re)registered in the module-level ``hosts`` mapping together with
    its hardware address, vendor comment and the time it was seen; the screen
    is then cleared and one tab-separated row per known host is printed,
    sorted by IP string.
    """
    started = time()
    row_template = '{seconds}\t{ip}\t{hwaddr}\t{vendor}'

    if 'ARP' not in pkt:
        return

    # Start from an empty record so stale fields from an earlier sighting
    # of the same IP are discarded.
    entry = hosts[pkt[ARP].psrc] = {}
    entry['hwaddr'] = pkt[ARP].hwsrc
    entry['vendor'] = mac2vendor.get_comment(pkt[ARP].hwsrc)
    entry['time'] = time()

    click.clear()

    for address in sorted(hosts):
        known = hosts[address]
        print(output_row(row_template, started, address, known)
              if False else row_template.format(
                  seconds=int(started - known['time']),
                  ip=address,
                  hwaddr=known['hwaddr'],
                  vendor=known['vendor'],
              ))
def draw(self):
    """Rebuild the execution view and repaint the terminal.

    The layout consists of a header row, a stat row and a ``=`` divider,
    followed by as many of the most recent events as fit in the remaining
    height (two rows are kept spare).
    """
    execution = self.log_manager.execution
    events = self.events

    layout = Layout()
    layout.add(self.get_header_flex(execution))
    layout.add(self.get_stat_flex(execution))
    layout.add(Divider('='))

    spare_rows = layout.height - len(layout.rows) - 2
    if spare_rows > 0:
        for event in events[-spare_rows:]:
            row = Flex(style=stream_styles.get(event['stream']))
            # Keep only the part of the timestamp after the 'T', minus its
            # last four characters.
            clock = event['time'].split('T')[1][:-4] + ' '
            row = row.add(clock, flex=0)
            row = row.add(event['message'], flex=4)
            layout.add(row)

    click.clear()
    layout.draw()
def monitor(index, delta, query_string):
    """Show a progress bar tracking how many records match a query.

    The matching-record count is polled every five seconds until the user
    interrupts the command (CTRL-C).

    Args:
        index: Name of the index to search.
        delta: When truthy, progress is the drop in the matching count since
            the command started; otherwise progress is the current matching
            count, measured against the index total.
        query_string: Query-string expression selecting the records.
    """
    click.clear()

    def cnt():
        q = Q('query_string', query=query_string)
        s = Search(using=es.client, index=index).query(q)
        return s.count()

    N = cnt()
    tot = Search(using=es.client, index=index).count()
    if not delta:
        N = tot
    log.info('Processing %d records (total: %d)', N, tot)
    click.echo('You can exit by CTRL-C: results will still process')
    bar = SlowOverallFancyBar('', max=N, grand_total=tot)
    try:
        while True:
            time.sleep(5.0)
            try:
                n = cnt()
                if isinstance(n, int):
                    done = N - n if delta else n
                    bar.goto(done)
            except Exception as e:
                # A failed poll is not fatal: report it and try again.
                log.warn('Cannot count: %s', e)
    finally:
        # The loop only ends through an exception (typically CTRL-C), so
        # the bar has to be finalised here to be finalised at all.
        bar.finish()
def delete_index(index):
    """Delete an index after an explicit, highlighted confirmation.

    The confirmation is asked with ``abort=True``; the deletion is only
    reached when ``click.confirm`` returns.
    """
    click.clear()
    question = click.style(
        'Really DELETE index "%s"?' % index, fg='white', bg='red')
    click.confirm(question, abort=True)
    es.client.indices.delete(index=index)
    log.info('Index deleted')
def clone_index(use_helper, from_index, to_index):
    """Clone an index.

    The target index must already exist; the user is asked to confirm
    before its contents are overwritten.

    Args:
        use_helper: When truthy, copy with ``elasticsearch.helpers.reindex``;
            otherwise submit a reindex request through the client with
            ``wait_for_completion=False``.
        from_index: Name of the source index.
        to_index: Name of the target index.

    Returns:
        ``1`` when the target index does not exist, otherwise ``None``.
    """
    from elasticsearch_dsl import Search
    from elasticsearch.helpers import reindex

    click.clear()

    if not es.client.indices.exists(index=to_index):
        # The original mixed a %-placeholder with str.format(), so the index
        # name was never interpolated into the message.
        click.secho('{} not existing!'.format(to_index), fg='red')
        return 1

    cnt = Search(using=es.client, index=to_index).count()
    message = 'Index %s already exists (%d records). Overwrite?' % (
        to_index, cnt)
    click.confirm(message, abort=True)

    if use_helper:
        reindex(
            client=es.client,
            source_index=from_index,
            target_index=to_index)
    else:
        es.client.reindex(
            body=dict(
                source=dict(index=from_index),
                dest=dict(index=to_index)),
            wait_for_completion=False)
def monitor_clone_index(from_index, to_index):
    """Monitor the size of an index.

    The record count of ``from_index`` is read once and used as the bar's
    maximum; the count of ``to_index`` is then polled every two seconds
    until the command is interrupted.

    Args:
        from_index: Index whose record count defines 100%.
        to_index: Index whose record count is displayed as progress.
    """
    from elasticsearch_dsl import Search

    click.clear()
    cnt = Search(using=es.client, index=from_index).count()
    bar = SlowFancyBar('', max=cnt)
    try:
        while True:
            time.sleep(2.0)
            _cnt = Search(using=es.client, index=to_index).count()
            bar.goto(_cnt)
    finally:
        # The loop only ends through an exception (typically CTRL-C), so
        # the bar has to be finalised here to be finalised at all.
        bar.finish()
def monitor(app):
    """Set up application monitoring.

    Opens the app's dashboard and the MTurk HIT management page in a
    browser, opens the app logs and the database URI, then redraws a
    summary of the experiment every ten seconds for as long as
    ``_keep_running()`` is truthy.
    """
    heroku_app = HerokuApp(dallinger_uid=app)
    pages = (
        heroku_app.dashboard_url,
        "https://requester.mturk.com/mturk/manageHITs",
    )
    for page in pages:
        webbrowser.open(page)
    heroku_app.open_logs()
    check_call(["open", heroku_app.db_uri])

    while _keep_running():
        summary = get_summary(app)
        click.clear()
        for text in (header, "\nExperiment {}\n".format(app), summary):
            click.echo(text)
        time.sleep(10)
def start(self, cards):
    """
    Start a StudySession with the cards given.

    For every card the screen is cleared, then its question and its answer
    are shown in turn.

    :param cards: the cards to study; must support ``len()`` and iteration.
    """
    # The counter starts at 1 for display convenience.
    self.question_num, self.question_count = len(cards), 1

    for current in cards:
        click.clear()
        self.show_question(current.question)
        self.show_answer(current.answer)
def main(ch):
    """Clear the terminal, then hand ``ch`` over to ``getScore``."""
    click.clear()
    getScore(ch)
def getLiveEvents():
    """Clear the screen and print a numbered list of the live events.

    Each entry of the module-level ``tableHeads`` contributes one line made
    of its 1-based position and the text of its ``h2`` element.
    """
    click.clear()
    click.secho("Events Going On Live Right Now", bold=True, fg='green')
    for position, head in enumerate(tableHeads, start=1):
        label = "\t" + str(position) + "| " + str(head.h2.text)
        click.secho(label, fg='cyan')
def navigate_to(self, going):
    """Run the menu loop for ``going`` until the user leaves it.

    On every pass the header and ``going``'s menu are redrawn and a response
    is read; a callable response is invoked to obtain the real one. The
    response then decides what happens next:

    * ``responses.QUIT``: say goodbye, stop navigating and return.
    * ``responses.UP``: leave this menu (back to the caller).
    * ``responses.NOOP``: redraw without re-initializing ``going``.
    * ``responses.PLAYER``: descend into ``self.player``.
    * anything other than ``going`` itself: descend into that response.
    """
    logger.debug('navigating to: %s' % going)
    self.session.process_events()
    going.initialize()

    while self.navigating:
        self.check_spotipy_me()
        click.clear()
        self.print_header()
        self.print_menu(going.get_ui())

        response = going.get_response()
        if callable(response):
            response = response()
            logger.debug('Got response %s after evaluation' % response)

        if response == responses.QUIT:
            click.clear()
            click.echo('Thanks, bye!')
            self.navigating = False
            return
        if response == responses.UP:
            break
        if response == responses.NOOP:
            continue

        if response == responses.PLAYER:
            self.navigate_to(self.player)
        elif response != going:
            self.navigate_to(response)

        # Control is back with `going`: forget the previous query and
        # rebuild the menu's options.
        going.initialize()
def main(user, flag):
    """Show the manager's office menu and dispatch the chosen action.

    Args:
        user: The manager; stored in the module-level ``_user`` and used to
            look up the league (``_league``) and the team name.
        flag: When truthy, the screen is cleared before the menu is drawn.
    """
    global _user, _league
    _user = user
    _league = user.get_league()

    if flag:
        click.clear()

    print('%s Managerial office' % _user.get_team().get_name())
    print('-' * 39)

    options = ['help', 'inbox', 'schedule', 'squad', 'standings',
               'personal', 'save', 'exit']
    nav = click.prompt('What would you like to do?',
                       type=click.Choice(options))

    if nav == 'help':
        print('\nWhile in your office, you have a variety of resources available to you. You may:\n\n'
              'inbox : access any new mail you\'ve received, whether they be newsletters, injury news, player communications, or transfer offers\n'
              'schedule : take a look at upcoming games and past results of both your team and other teams in the league\n'
              'squad : check on how your players are doing, including their stats based on recent training sessions\n'
              'standings : see how your team ranks up on the table, along with other useful information and stats\n'
              'personal : see your own personal stats and information\n'
              'save : save your in-game progress\n'
              'exit : exit out of the game, although why would you do that?\n')
        # Show the menu again, without clearing the help text just printed.
        main(_user, False)
        return

    if nav == 'exit':
        return

    actions = {
        'inbox': inbox,
        'schedule': schedule,
        'squad': squad,
        'standings': standings,
        'personal': personal,
        'save': save,
    }
    action = actions.get(nav)
    if action is not None:
        action()
def inbox():
    """Open the inbox view; as written this only clears the terminal."""
    click.clear()
def new_game():
    """Create the manager for a new game and list the teams to pick from.

    The manager's name is prompted for (with a confirmation prompt) and
    stored as a ``gen.Manager`` in the module-level ``user``. The first ten
    teams of the user's league are then printed, numbered from 1, before
    control passes to ``team_selection``.
    """
    global user
    manager_name = click.prompt(
        '\nCongratulations on becoming a manager! What is your name?',
        confirmation_prompt=True)
    user = gen.Manager(manager_name)

    print('\n')
    click.clear()
    print('Here are the list of teams in the league:')
    for number in range(1, 11):
        team = user.get_league().get_team_list()[number - 1]
        print('%s) %s' % (str(number), team.get_name()))

    team_selection()
def run_cli(self):
    """Run the interactive kube-shell prompt loop until the user exits.

    On every pass the kubeconfig is re-parsed to refresh the cluster, user
    and namespace stored on ``Kubeshell``, the completer's namespace is
    updated, and one line of input is read. EOF or CTRL-C at the prompt,
    or the input ``exit``, terminates the process via ``sys.exit()``.

    Input handling:

    * ``clear`` clears the screen through click; the text is not consumed,
      so it is also passed on to the shell like any other input.
    * A leading ``!`` is stripped before execution.
    * Input containing both ``-o`` and ``json`` is piped through
      ``pygmentize -l json``.
    * Any non-empty input is run through the system shell, and the loop
      waits for the command to finish.
    """
    global inline_help

    def get_title():
        return "kube-shell"

    logger.info("running kube-shell event loop")
    if not os.path.exists(os.path.expanduser("~/.kube/config")):
        click.secho('Kube-shell uses ~/.kube/config for server side completion. Could not find ~/.kube/config. '
                    'Server side completion functionality may not work.',
                    fg='red', blink=True, bold=True)

    while True:
        try:
            (Kubeshell.clustername,
             Kubeshell.user,
             Kubeshell.namespace) = KubeConfig.parse_kubeconfig()
        except Exception:
            # Best effort: keep the shell usable without a parsable config.
            # Catching Exception (not a bare except) lets CTRL-C and
            # SystemExit through. The message no longer carries a stray
            # "%s" placeholder that had no matching argument.
            logger.error("unable to parse ~/.kube/config", exc_info=1)
        completer.set_namespace(self.namespace)

        try:
            user_input = prompt('kube-shell> ',
                                history=self.history,
                                auto_suggest=AutoSuggestFromHistory(),
                                style=StyleFactory("vim").style,
                                lexer=KubectlLexer,
                                get_title=get_title,
                                enable_history_search=False,
                                get_bottom_toolbar_tokens=self.toolbar.handler,
                                vi_mode=True,
                                key_bindings_registry=registry,
                                completer=completer)
        except (EOFError, KeyboardInterrupt):
            sys.exit()

        if user_input == "clear":
            click.clear()
        elif user_input == "exit":
            sys.exit()

        # A leading "!" marks a plain shell command; strip the marker.
        if user_input.startswith("!"):
            user_input = user_input[1:]

        if user_input:
            if '-o' in user_input and 'json' in user_input:
                user_input += ' | pygmentize -l json'
            # NOTE(review): shell=True is deliberate here; the user's own
            # typed command line is what gets executed.
            p = subprocess.Popen(user_input, shell=True)
            p.communicate()
def watch(limit):
    """watch scan rates across the cluster

    Every ``period`` seconds a fresh snapshot is taken from ``db.db()`` and
    compared with the previous one. Buckets with nothing scanned, or whose
    scanned count did not change since the previous snapshot, are left out.
    The remaining buckets are listed by descending ``gkrate`` under a
    leading totals row.

    Args:
        limit: When truthy, the maximum number of bucket rows shown (the
            totals row is added on top of that).
    """
    period = 5.0
    prev = db.db()
    prev_totals = None

    while True:
        click.clear()
        time.sleep(period)

        cur = db.db()
        cur.data['gkrate'] = {}
        progress = []
        prev_buckets = {b.bucket_id: b for b in prev.buckets()}
        totals = {'scanned': 0, 'krate': 0, 'lrate': 0, 'bucket_id': 'totals'}

        for bucket in cur.buckets():
            if not bucket.scanned:
                continue

            for field in ('scanned', 'krate', 'lrate'):
                totals[field] += getattr(bucket, field)

            if bucket.bucket_id not in prev_buckets:
                # First sighting: count everything scanned so far.
                scanned_delta = bucket.scanned
            else:
                before = prev_buckets[bucket.bucket_id].scanned
                if bucket.scanned == before:
                    # No movement since the last snapshot; not listed, but
                    # already counted in the totals above.
                    continue
                scanned_delta = bucket.scanned - before
            bucket.data['gkrate'][bucket.bucket_id] = scanned_delta / period
            progress.append(bucket)

        if prev_totals is None:
            # No earlier totals to compare against on the first pass.
            totals['gkrate'] = '...'
        else:
            totals['gkrate'] = (
                totals['scanned'] - prev_totals['scanned']) / period
        prev = cur
        prev_totals = totals

        progress.sort(key=lambda x: x.gkrate, reverse=True)
        if limit:
            progress = progress[:limit]
        progress.insert(0, utils.Bag(totals))

        format_plain(
            progress, None,
            explicit_only=True,
            keys=['bucket_id', 'scanned', 'gkrate', 'lrate', 'krate'])