我们从Python开源项目中,提取了以下27个代码示例,用于说明如何使用click.echo_via_pager()。
def output(python_object, format="raw", pager=False): if format == 'yaml': output_string = yaml.safe_dump(python_object, default_flow_style=False, encoding='utf-8', allow_unicode=True) elif format == 'json': output_string = json.dumps(python_object, sort_keys=4, indent=4) elif format == 'raw': output_string = str(python_object) elif format == 'pformat': output_string = pprint.pformat(python_object) else: raise Exception("No valid output format provided. Supported: 'yaml', 'json', 'raw', 'pformat'") if pager: click.echo_via_pager(output_string) else: click.echo(output_string)
def write(self, data): if isinstance(data, six.binary_type): data = data.decode('utf-8') # echo_via_pager() already appends a '\n' at the end of text, # so we use rstrip() to remove extra newlines (#89) click.echo_via_pager(data.rstrip())
def cli(sort): """ Lists all available problems. """ problems = sorted(data.problems, key=lambda problem: problem[sort.lower()]) problem_list = ((problem['id'], problem['name'], '%d%%' % problem['difficulty']) for problem in problems) table = tabulate(problem_list, TABLE_HEADERS, tablefmt='fancy_grid') click.echo_via_pager(table)
def config_list(): lines = [] for option in config: value = config.get(option) lines.append('{} = {}'.format(option, value)) output = '\n'.join(lines) click.echo_via_pager(output)
def _show_file(file_path, header_name='Shown'): if file_path.startswith('file:/'): file_path = file_path[6:] click.secho("%s file: %s" % (header_name, file_path), fg='blue') if not os.path.exists(file_path): click.secho(' The file does not exist', fg='yellow') else: with open(file_path) as fd: click.echo_via_pager(fd.read())
def log(ctx, execution_id, client): """Get execution log (plain text from ansible-playbook) for a certain execution.""" response = client.get_execution_log(execution_id, headers={"Content-Type": "text/plain"}, raw_response=True, stream=True) if ctx.obj["pager"]: click.echo_via_pager(response.text) else: for line in response.iter_lines(decode_unicode=True): click.echo(line)
def format_output_json(ctx, response, error=False): response = json_dumps(response) response = colorize(response, ctx.obj["color"], "json") if error: click.echo(response, err=True) elif ctx.obj["pager"]: click.echo_via_pager(response) else: click.echo(response)
def build_table(self, view_entries, limit, pager, format_method, build_urls=True, print_output=True): """Build the table used for the gh view command. :type view_entries: list :param view_entries: A list of `github3` items. :type limit: int :param limit: Determines the number of items to show. :type format_method: callable :param format_method: A method called to format each item in the table. :type build_urls: bool :param build_urls: Determines whether to build urls for the gh view # command. :type print_output: bool :param print_output: determines whether to print the output (True) or return the output as a string (False). :rtype: str :return: the output if print_output is True, else, return None. """ if build_urls: self.build_table_urls(view_entries) index = 0 output = '' for view_entry in view_entries: index += 1 view_entry.index = index output += format_method(view_entry) + '\n' if index >= limit: break if build_urls: if len(view_entries) > limit: output += click.style((' <Hiding ' + str(len(view_entries) - limit) + ' item(s) with the -l/--limit flag>\n'), fg=self.config.clr_message) if index == 0: output += click.style('No results found', fg=self.config.clr_message) elif build_urls: output += click.style(self.create_tip(index)) else: output += click.style('') if print_output: if pager: color = None if platform.system() == 'Windows': color = True # Strip out Unicode, which seems to have issues on # Windows with click.echo_via_pager. output = re.sub(r'[^\x00-\x7F]+', '', output) click.echo_via_pager(output, color) else: click.secho(output) return None else: return output
def generate_url_contents(self, url): """Generate the formatted contents of the given item's url. Converts the HTML to text using HTML2Text, colors it, then displays the output in a pager. :type url: str :param url: The url whose contents to fetch. :rtype: str :return: The string representation of the formatted url contents. """ try: headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36'} # NOQA raw_response = requests.get(url, headers=headers, verify=self.config.verify_ssl) except (requests.exceptions.SSLError, requests.exceptions.ConnectionError) as e: contents = 'Error: ' + str(e) + '\n' contents += 'Try running gh view # with the --browser/-b flag\n' return contents contents = self.html_to_text.handle(raw_response.text) # Strip out Unicode, which seems to have issues when html2txt is # coupled with click.echo_via_pager. contents = re.sub(r'[^\x00-\x7F]+', '', contents) contents = self.format_markdown(contents) return contents
def view_url(self, url): """View the given url. :type index: int :param index: The index for the given item, used with the gh view [index] commend. :type url: str :param url: The url to view """ contents = self.generate_url_contents(url) header = click.style('Viewing ' + url + '\n\n', fg=self.config.clr_message) contents = header + contents contents += click.style(('\nView this article in a browser with' ' the -b/--browser flag.\n'), fg=self.config.clr_message) contents += click.style(('\nPress q to quit viewing this ' 'article.\n'), fg=self.config.clr_message) if contents == '{"error":"Not Found"}\n': click.secho('Invalid user/repo combination.') return color = None if platform.system() == 'Windows': color = True click.echo_via_pager(contents, color)
def delete(config, a, se): """ Deletes all the directory content (files, dirs) """ if a and click.confirm("Delete all contents of " + config.dir_to_use + " ?"): click.echo("Attempting to delete: " + str(analyzer.get_entries_count()) + " entries...\n") cleaner.delete_dir_content(config.dir_to_use) filemanager.write_cleanup_report(cleaner.cleanup_data, config.app_dir) filemanager.pickle_data("last-cleanup", cleaner.cleanup_data, config.app_dir) # Make clean up data persistent click.echo("\nDeletion complete!") click.echo("* Deletions: " + str(cleaner.cleanup_data["deletions"])) click.echo("* Deletion size: " + converter.human_readable_size(cleaner.cleanup_data["size"])) click.echo("* Errors: " + str(cleaner.cleanup_data["error_count"])) if se: try: last_cleanup = filemanager.unpickle_data("last-cleanup") click.echo("Errors encountered during the last deletion [" + last_cleanup["datetime"] + "]:") click.echo("Total: " + str(last_cleanup["error_count"]) + "\n") click.echo_via_pager("\n\n".join("* %s" % error for error in last_cleanup["errors"])) except FileNotFoundError: click.echo("No error data was found.")
def get_news_articles(source): data = requests.get( "https://newsapi.org/v1/articles?source=" + source + "&apiKey=ba4d03d4a6f84f10962a79fd977e43b2") if data.status_code == 200: news_json = data.json() articles = news_json["articles"] for article in articles: author = article['author'] title = article['title'] description = article['description'] click.echo_via_pager(" TITLE: {} \n DESCRIPTION: {} \n AUTHOR: {}".format(title, description, author)) click.secho("=" * 130, fg='blue') else: click.echo("An error occurred try again later")
def readme(): """ View the README """ click.echo_via_pager( highlight( pkg_resources.resource_string("cuv", "README.rst"), get_lexer_by_name('rst'), formatter=TerminalFormatter(), ) )
def __exit__(self, a, b, c): msg = ''.join(self._lines) click.echo_via_pager(msg, color=True)
def paged_echo(): """ Unfortunately, to use .echo_via_pager() in Click, you have to feed it all of the lines at once. This returns a context-manager that lets you incrementally call '.echo' (same as 'click.echo') incrementally, only outputting (to the pager) when the context closes. """ return _PagedEcho()
def _echo(output, min_lines=10): ctx = click.get_current_context() if ctx.obj.get('use_pager') and output.count('\n') > min_lines: _func = click.echo_via_pager else: _func = click.echo _func(output, sys.stdout)
def pager(self, text, end=None): return echo_via_pager(text)
def readme(packages): """ View packages' long descriptions. If stdout is a terminal, the descriptions are passed to a pager program (e.g., `less(1)`). Packages can be specified as either ``packagename`` to show the latest version or as ``packagename==version`` to show the long description for ``version``. """ for pkg in packages: click.echo_via_pager(pkg["info"]["description"])
def list(ctx, **kwargs): jobs = JobAPI(ecx_session=ctx.ecx_session).list() if ctx.json: ctx.print_response(jobs) return job_table_info = [(x['name'], x['id'], x['status'], format_last_run_time(x['lastRunTime'])) for x in jobs] if not job_table_info: return print click.echo_via_pager(tabulate(job_table_info, headers=["Name","ID", "Status", "Last run"])) print
def list(ctx, **kwargs): resp = ctx.ecx_session.get(restype=ctx.restype, endpoint=ctx.endpoint) list_field = kwargs.get('listfield') or resource_to_listfield.get(ctx.restype) or (ctx.restype + 's') if ctx.json or list_field not in resp: ctx.print_response(resp) return resources = resp[list_field] if kwargs['fields']: fields = [x.strip() for x in kwargs['fields'].split(',')] else: fields = ["name", "id"] table_data = [] for res in resources: row = [] for field in fields: row.append(res.get(field, None)) table_data.append(row) if not table_data: return print click.echo_via_pager(tabulate(table_data, headers=fields)) print
def usedby(ctx, id, **kwargs): resp = AssociationAPI(ecx_session=ctx.ecx_session).get_using_resources(ctx.restype, id)["resources"] if ctx.json: ctx.print_response(resp) return table_data = [(x["type"], x["resourceId"], x["name"]) for x in resp] print click.echo_via_pager(tabulate(table_data, headers=["Type", "ID", "Name"])) print
def print_response(self, resp): if not self.links: remove_links(resp) click.echo_via_pager(json.dumps(resp, indent=4))
def cli(issue, since): """ View comments on an issue/PR """ output = show_comment(issue.data) for comment in issue.comments.get(params={"since": since}): output += '\n' + show_comment(comment) # echo_via_pager adds a newline, so remove the "extra" newline at the end click.echo_via_pager(output.rstrip('\r\n'))
def log(self): history = self.get_history() outputs = [] for i, record in enumerate(history): output = '' record['timestamp'] = time.strftime( '%a, %d %b %Y %H:%M:%S +0000', time.strptime(record['updated'], '%Y%m%d-%H%M%S')) checksum = ( ' ({algorithm} {checksum})\nDate: {timestamp}'.format( **record)) if self.versioned: output = output + '\n' + ( click.style( 'version {}'.format(record['version']) + checksum, fg='green')) else: output = output + '\n' + ( click.style( 'update {}'.format(len(history) - i) + checksum, fg='green')) for attr, val in sorted(record['user_config'].items()): output = output + '\n' + ( '{:<10} {}'.format(attr+':', val)) if record.get('message', None) is not None: wrapper = textwrap.TextWrapper( initial_indent=' ', subsequent_indent=' ', width=66) output = output + '\n\n' output = output + '\n'.join( wrapper.wrap(str(record['message']))) outputs.append(output) click.echo_via_pager('\n\n'.join(reversed(outputs)) + '\n')
def emit(self, record): """ Echos the record.msg string by using click.echo() The record will have attributes set to it when a user logs a message with any kwargs given. This function looks for any attributes that are in ECHO_KWARGS. If record has any kwargs attributes, the record will be echoed with the given kwargs. This makes click logging with a logger very easy. A user can add {"pager": True} as an extra param to any of the log commands to print the content to a pager. Args: record (logging.LogRecord): record which gets echoed with click.echo() """ try: # first format the record which adds the click style formatted_record = self.format(record) # user wants to use a pager to show message if hasattr(record, "pager") and record.pager: # save the original eviron dict original_env = dict(os.environ) # Force system to use pager and add default prompt at bottom left of the screen os.environ['PAGER'] = "less" os.environ['LESS'] = LESS_ENV try: click.echo_via_pager(formatted_record.msg, color=record.color if hasattr(record, "color") else None) # being paranoid here because we do NOT want to mess with people's environment except: os.environ.clear() os.environ.update(original_env) raise else: os.environ.clear() os.environ.update(original_env) else: # Sets the default kwargs kwargs = dict() # if user added a known kwarg, change the defaults for kwarg_name in self.ECHO_KWARGS: if hasattr(record, kwarg_name): kwargs[kwarg_name] = getattr(record, kwarg_name) # echo to console click.echo(formatted_record.msg, **kwargs) except (KeyboardInterrupt, SystemExit): raise except: self.handleError(record)
def timeline(ctx, pager, limit, twtfile, sorting, timeout, porcelain, source, cache, force_update): """Retrieve your personal timeline.""" if source: source_obj = ctx.obj["conf"].get_source_by_nick(source) if not source_obj: logger.debug("Not following {0}, trying as URL".format(source)) source_obj = Source(source, source) sources = [source_obj] else: sources = ctx.obj["conf"].following tweets = [] if cache: try: with Cache.discover(update_interval=ctx.obj["conf"].timeline_update_interval) as cache: force_update = force_update or not cache.is_valid if force_update: tweets = get_remote_tweets(sources, limit, timeout, cache) else: logger.debug("Multiple calls to 'timeline' within {0} seconds. Skipping update".format( cache.update_interval)) # Behold, almighty list comprehensions! (I might have gone overboard here…) tweets = list(chain.from_iterable([cache.get_tweets(source.url) for source in sources])) except OSError as e: logger.debug(e) tweets = get_remote_tweets(sources, limit, timeout) else: tweets = get_remote_tweets(sources, limit, timeout) if twtfile and not source: source = Source(ctx.obj["conf"].nick, ctx.obj["conf"].twturl, file=twtfile) tweets.extend(get_local_tweets(source, limit)) if not tweets: return tweets = sort_and_truncate_tweets(tweets, sorting, limit) if pager: click.echo_via_pager(style_timeline(tweets, porcelain)) else: click.echo(style_timeline(tweets, porcelain))