Python click 模块,get_binary_stream() 实例源码

我们从Python开源项目中,提取了以下3个代码示例,用于说明如何使用click.get_binary_stream()

项目:clickhouse-cli    作者:hatarist    | 项目源码 | 文件源码
def run_cli(host, port, user, password, database, settings, query, format,
            format_stdin, multiline, stacktrace, version, files):
    """
    A third-party client for the ClickHouse DBMS.
    """
    if version:
        return show_version()

    if password:
        password = click.prompt("Password", hide_input=True, show_default=False, type=str)

    data_input = ()

    # Read from STDIN if non-interactive mode
    stdin = click.get_binary_stream('stdin')
    if not stdin.isatty():
        data_input += (stdin,)

    # Read the given file
    if files:
        data_input += files

    # TODO: Rename the CLI's instance into something more feasible
    cli = CLI(
        host, port, user, password, database, settings,
        format, format_stdin, multiline, stacktrace
    )
    cli.run(query, data_input)
项目:pyexcel-cli    作者:pyexcel    | 项目源码 | 文件源码
def get_stream(name, file_type):
    stream = None
    stream_type = get_io_type(file_type)
    if stream_type == "string":
        stream = click.get_text_stream(name)
    else:
        stream = click.get_binary_stream(name)

    return stream
项目:omnihash    作者:Miserlou    | 项目源码 | 文件源码
def main(click_context, hashmes, s, v, c, f, m, j):
    """
    If there is a file at hashme, read and omnihash that file.
    Elif hashme is a string, omnihash that.
    """

    # Print version and quit
    if v:
        version = pkg_resources.require("omnihash")[0].version
        click.echo(version)
        return

    intialize_plugins()

    results = []
    if not hashmes:
        # If no stdin, just help and quit.
        if not sys.stdin.isatty():
            digesters = make_digesters(None, f, c)
            stdin = click.get_binary_stream('stdin')
            bytechunks = iter(lambda: stdin.read(io.DEFAULT_BUFFER_SIZE), b'')
            if not j:
                click.echo("Hashing " + click.style("standard input", bold=True) + "..", err=True)
            results.append([produce_hashes(bytechunks, digesters, match=m, use_json=j)])
        else:
            print(click_context.get_help())
            return
    else:
        hash_many = len(hashmes) > 1
        for hashme in hashmes:
            result = {}
            digesters = make_digesters(hashme, f, c)
            bytechunks = iterate_bytechunks(hashme, s, j, hash_many)
            if bytechunks:
                result = produce_hashes(bytechunks, digesters, match=m, use_json=j)
            if result:
                result['NAME'] = hashme
                results.append(result)

    if results and j:
        print(json.dumps(results, indent=4, sort_keys=True))


##
# Main Logic
##