Python click 模块,get_text_stream() 实例源码

我们从Python开源项目中,提取了以下22个代码示例,用于说明如何使用click.get_text_stream()

项目:nlppln    作者:nlppln    | 项目源码 | 文件源码
def command(dir_in, recursive):
    files_out = []

    if recursive:
        for root, dirs, files in os.walk(os.path.abspath(dir_in)):
            for f in files:
                files_out.append(cwl_file(os.path.join(root, f)))
    else:
        for f in os.listdir(dir_in):
            fi = os.path.join(dir_in, f)
            if os.path.isfile(fi):
                files_out.append(cwl_file(fi))

    # order alphabetically on file name
    files_out = sorted(files_out, key=lambda x: x.get('path'))

    stdout_text = click.get_text_stream('stdout')
    stdout_text.write(json.dumps({'out_files': files_out}))
项目:notifiers    作者:liiight    | 项目源码 | 文件源码
def notify(ctx, provider):
    """Send a notification to a passed provider.

    Data should be passed via a key=value input like so:

        notifiers notify pushover token=foo user=bar message=test

    """
    p = get_notifier(provider)
    data = {}
    for item in ctx.args:
        data.update([item.split('=')])
    if 'message' not in data:
        message = click.get_text_stream('stdin').read()
        if not message:
            raise click.ClickException(
                "'message' option is required. "
                "Either pass it explicitly or pipe into the command"
            )
        data['message'] = message

    rsp = p.notify(**data)
    rsp.raise_on_errors()
项目:twtxt    作者:buckket    | 项目源码 | 文件源码
def validate_text(ctx, param, value):
    conf = click.get_current_context().obj["conf"]
    if isinstance(value, tuple):
        value = " ".join(value)

    if not value and not sys.stdin.isatty():
        value = click.get_text_stream("stdin").read()

    if value:
        value = value.strip()
        if conf.character_warning and len(value) > conf.character_warning:
            click.confirm("? Warning: Tweet is longer than {0} characters. Are you sure?".format(
                conf.character_warning), abort=True)
        return value
    else:
        raise click.BadArgumentUsage("Text can’t be empty.")
项目:flora    作者:Lamden    | 项目源码 | 文件源码
def staging():
    stdin_text = click.get_text_stream('stdin')
    for line in stdin_text:
        print(line)

# registers a new username
项目:panoptes-cli    作者:zooniverse    | 项目源码 | 文件源码
def download_classifications(
    workflow_id,
    output_file,
    generate,
    generate_timeout
):
    """
    Downloads a workflow-specific classifications export for the given workflow.

    OUTPUT_FILE will be overwritten if it already exists. Set OUTPUT_FILE to -
    to output to stdout.
    """

    workflow = Workflow.find(workflow_id)

    if generate:
        click.echo("Generating new export...", err=True)

    export = workflow.get_export(
        'classifications',
        generate=generate,
        wait_timeout=generate_timeout
    )

    with click.progressbar(
        export.iter_content(chunk_size=1024),
        label='Downloading',
        length=(int(export.headers.get('content-length')) / 1024 + 1),
        file=click.get_text_stream('stderr'),
    ) as chunks:
        for chunk in chunks:
            output_file.write(chunk)
项目:panoptes-cli    作者:zooniverse    | 项目源码 | 文件源码
def download(project_id, output_file, generate, generate_timeout, data_type):
    """
    Downloads project-level data exports.

    OUTPUT_FILE will be overwritten if it already exists. Set OUTPUT_FILE to -
    to output to stdout.
    """

    project = Project.find(project_id)

    if generate:
        click.echo("Generating new export...", err=True)

    export = project.get_export(
        data_type,
        generate=generate,
        wait_timeout=generate_timeout
    )

    with click.progressbar(
        export.iter_content(chunk_size=1024),
        label='Downloading',
        length=(int(export.headers.get('content-length')) / 1024 + 1),
        file=click.get_text_stream('stderr'),
    ) as chunks:
        for chunk in chunks:
            output_file.write(chunk)
项目:nlppln    作者:nlppln    | 项目源码 | 文件源码
def lowercase(in_file, out_dir):
    create_dirs(out_dir)

    text = in_file.read()
    text = text.lower()

    stdout_text = click.get_text_stream('stdout')
    stdout_text.write(text)
项目:browserstacker    作者:Stranger6667    | 项目源码 | 文件源码
def echo_stdout():
    stdout = click.get_text_stream('stdout')
    if stdout.readable():
        stdout.read()
项目:aws-adfs    作者:venth    | 项目源码 | 文件源码
def _stdin_user_credentials():
    stdin = click.get_text_stream('stdin').read()
    stdin_lines = stdin.strip().splitlines()
    try:
        username, password = stdin_lines[:2]
    except ValueError:
        raise click.ClickException("Failed to read newline separated "
                                   "username and password from stdin.")
    return username, password
项目:textkit    作者:learntextvis    | 项目源码 | 文件源码
def write_csv(rows, delim):
    writer = csv.writer(click.get_text_stream('stdout'), delimiter=delim, lineterminator='\n')
    try:
        [writer.writerow(row) for row in rows]
    except (OSError, IOError):
        sys.stderr.close()
项目:push2clip    作者:kf5grd    | 项目源码 | 文件源码
def cli(ctx, message, key, config, verbose):
    """Use simplepush and tasker to push text to remote phone's clipboard."""

    if ctx.invoked_subcommand is None:
        if not os.path.exists(config):
            click.echo("Config file: {}".format(config))
            click.echo("Config file does not exist. Please choose a different file, or initialize with push2clip init <key>")
            quit()

        with open(config, 'r') as f:
            conf = json.load(f)
            f.close()

        if not key:
            if not conf['key']:
                print("Error: Simplepush key is missing. Unable to send push.")
                quit()
            key = conf['key']

        if not message:
            message = click.get_text_stream('stdin').read()

        event = conf.get("event", '')
        title = conf.get("title", '')

        sp = simplepush(key, event=event, title=title, message=message) 
        if not sp:
            click.echo("An error was encountered while pushing message.")
        else:
            if verbose:
                click.echo("Push sent successfully.")
                click.echo("Key: {}".format(key))
                click.echo("Event: {}".format(event))
                click.echo("Title: {}".format(title))
                click.echo("Message: {}".format(message))
项目:smtk    作者:Data4Democracy    | 项目源码 | 文件源码
def cli(ctx, config):
    csv_config = {}

    try:
        with open(config, 'r') as config_file:
            csv_config = json.load(config_file)
            l.INFO("Using custom CSV configuration: %s" % (csv_config))
    except TypeError:
        l.WARN("Using default CSV configuration: %s" % (CSV_DEFAULT_CONFIG))

    input_ = click.get_text_stream('stdin')
    convert(input_, configuration=csv_config)
项目:smtk    作者:Data4Democracy    | 项目源码 | 文件源码
def cli(ctx, config):
    s3_config = {}

    try:
        with open(config, 'r') as config_file:
            s3_config = json.load(config_file)
            l.INFO("Using custom CSV configuration: %s" % (s3_config))
    except TypeError:
        l.WARN("Unable to parse s3 config")

    input_ = click.get_text_stream('stdin')
    convert(input_, configuration=s3_config)
项目:smtk    作者:Data4Democracy    | 项目源码 | 文件源码
def cli(ctx, config):
    client = MongoClient('localhost', 27017)
    db = client.twitter_connections
    collection = db.relationships

    input_ = click.get_text_stream('stdin')
    insert_lines(collection, input_)
项目:cosmolog    作者:planetlabs    | 项目源码 | 文件源码
def human(verbosity, datefmt, no_color):
    '''Use this command to format machine-readable logs for humans.

    `human` reads stdin and writes formatted lines to stdout.

    Verbosity Levels:
    FATAL=100 | ERROR=200 | WARN=300 | INFO=400 | DEBUG=500 |
    TRACE=600 (DEFAULT)

    USAGE:

    tail -f myapp.log | human

    cat myapp.log | human -v 400 --datefmt "%Y-%m-%d %H:%M:%S"

    '''
    f = CosmologgerHumanFormatter(origin='todo',
                                  version=0,
                                  datefmt=datefmt,
                                  color=not no_color)
    with click.get_text_stream('stdin') as stdin:
        for line in stdin:
            line = line.strip()
            try:
                process(line, verbosity, f)
            except CosmologgerException as e:
                msg = _format_exception(line, e, no_color)
                click.echo(msg, err=True)
项目:renodiff    作者:honza    | 项目源码 | 文件源码
def main(args=None):
    """
    Console script for renodiff

    Example usage:

    git show | renodiff
    """
    stdin_text = click.get_text_stream('stdin')
    try:
        run(stdin_text.read())
        return 0
    except:
        return -1
项目:ns-bot    作者:eigenein    | 项目源码 | 文件源码
def main(
    telegram_token: str,
    ns_login: str,
    ns_password: str,
    log_file: click.File,
    verbose: bool,
):
    logging.basicConfig(
        datefmt="%Y-%m-%d %H:%M:%S",
        format="%(asctime)s [%(levelname).1s] %(message)s",
        level=(logging.INFO if not verbose else logging.DEBUG),
        stream=(log_file or click.get_text_stream("stderr")),
    )

    logging.info("Starting bot…")
    with ExitStack() as exit_stack:
        telegram = exit_stack.enter_context(closing(Telegram(telegram_token)))
        ns = exit_stack.enter_context(closing(Ns(ns_login, ns_password)))
        bot = exit_stack.enter_context(closing(Bot(telegram, ns)))
        try:
            asyncio.ensure_future(bot.run())
            asyncio.get_event_loop().run_forever()
        finally:
            bot.stop()


# Bot response phrases.
# ----------------------------------------------------------------------------------------------------------------------
项目:molminer    作者:gorgitko    | 项目源码 | 文件源码
def ner(**kwargs):
    kwargs["no_standardize"] = not kwargs["no_standardize"]
    kwargs["no_convert_ions"] = not kwargs["no_convert_ions"]
    kwargs["no_header"] = not kwargs["no_header"]
    kwargs["no_normalize_text"] = not kwargs["no_normalize_text"]
    kwargs["no_annotation"] = not kwargs["no_annotation"]

    is_output_file = bool(kwargs["output"])

    stdin = click.get_text_stream("stdin")
    input_text = ""
    if not stdin.isatty():
        kwargs["input_file"] = ""
        input_text = click.get_text_stream("stdin").read().strip()
        if not input_text and not kwargs["input_file"]:
            raise click.UsageError("Cannot perform NER: stdin is empty and input file is not provided.")

    kwargs["opsin_types"] = get_opsin_types(kwargs["opsin_types"])

    init_kwargs = get_kwargs(kwargs, KWARGS_CHS_INIT)
    process_kwargs = get_kwargs(kwargs, KWARGS_CHS_PROCESS)

    chemspot = ChemSpot(**init_kwargs)
    result = chemspot.process(input_text=input_text, **process_kwargs)

    if kwargs["dry_run"]:
        print(result)
        exit(0)

    if kwargs["raw_output"]:
        print(result["stdout"])
        eprint(result["stderr"])
        exit(0)

    if not is_output_file:
        print(dict_to_csv(result["content"], csv_delimiter=kwargs["delimiter"], write_header=kwargs["no_header"]))
项目:molminer    作者:gorgitko    | 项目源码 | 文件源码
def convert(**kwargs):
    kwargs["no_header"] = not kwargs["no_header"]
    kwargs["no_normalize_plurals"] = not kwargs["no_normalize_plurals"]
    kwargs["no_standardize"] = not kwargs["no_standardize"]
    kwargs["opsin_no_allow_acids_without_acid"] = not kwargs["opsin_no_allow_acids_without_acid"]
    kwargs["opsin_no_detailed_failure_analysis"] = not kwargs["opsin_no_detailed_failure_analysis"]
    kwargs["opsin_no_allow_radicals"] = not kwargs["opsin_no_allow_radicals"]
    kwargs["opsin_no_allow_uninterpretable_stereo"] = not kwargs["opsin_no_allow_uninterpretable_stereo"]

    is_output_file = bool(kwargs["output"])

    stdin = click.get_text_stream("stdin")
    input_text = ""
    if not stdin.isatty():
        kwargs["input_file"] = ""
        input_text = click.get_text_stream("stdin").read().strip()
    if not input_text and not kwargs["input_file"]:
        raise click.UsageError("Cannot do conversion: stdin is empty and input file is not provided.")

    init_kwargs = get_kwargs(kwargs, KWARGS_OPSIN_INIT)
    process_kwargs = get_kwargs(kwargs, KWARGS_OPSIN_PROCESS)

    opsin = OPSIN(**init_kwargs)
    result = opsin.process(input=input_text, output_formats=["smiles", "inchi", "inchikey"], **process_kwargs)

    if kwargs["dry_run"]:
        print(result)
        exit(0)

    if kwargs["raw_output"]:
        print(result["stdout"])
        eprint(result["stderr"])
        exit(0)

    if not is_output_file:
        print(dict_to_csv(result["content"], csv_delimiter=kwargs["delimiter"], write_header=kwargs["no_header"]))
项目:pyexcel-cli    作者:pyexcel    | 项目源码 | 文件源码
def get_stream(name, file_type):
    stream = None
    stream_type = get_io_type(file_type)
    if stream_type == "string":
        stream = click.get_text_stream(name)
    else:
        stream = click.get_binary_stream(name)

    return stream
项目:rvo    作者:noqqe    | 项目源码 | 文件源码
def append(ctx, docid, password, content):
    """
    Append string to a document

    Trys to find the object, (decrypt,) adds content to the bottom,
    (encrypt,) update date fields and regenerates the title.

    :docid: str (bson object)
    :returns: bool
    """

    coll = db.get_document_collection(ctx)
    doc, docid = db.get_document_by_id(ctx, docid)

    template, c = db.get_content(ctx, doc, password=password)

    d = datetime.datetime.now()

    if not content:
        content = ""
        for l in click.get_text_stream('stdin'):
                content = content + l

    content = template + content

    if isinstance(content, unicode):
        content = content.encode("utf-8")

    if doc["encrypted"] is True:
        title = utils.get_title_from_content(content)
        content = c.encrypt_content(content)
    else:
        if not "links" in doc["categories"]:
            title = utils.get_title_from_content(content)


    if content != template:
        doc["content"] = content
        doc["title"] = title
        doc["updated"] = d
        if validate(doc):
            coll.save(doc)
            transaction.log(ctx, docid, "append", title)
            utils.log_info("Content appended to \"%s\"." % title)
        else:
            utils.log_error("Validation of the updated object did not succeed")
    else:
        utils.log_info("No changes detected for \"%s\"" % title)

    return True
项目:rvo    作者:noqqe    | 项目源码 | 文件源码
def import_mail(ctx, tag, category):
    content = ""
    for l in click.get_text_stream('stdin'):
        content = content + l
    msg = email.message_from_string(content)

    # title
    subject, encoding = email.header.decode_header(msg['Subject'])[0]
    if encoding is None:
        encoding = "utf-8"

    title = subject.decode(encoding)

    # content
    content = msg.get_payload(decode=False)
    content = quopri.decodestring(content)
    content = "# " + title + '\n\n' + content
    date = datetime.datetime.now()

    coll = db.get_document_collection(ctx)
    config = ctx.obj["config"]

    item = {
        "title": title,
        "content": content,
        "tags": list(tag),
        "categories": list(category),
        "created": date,
        "updated": date,
        "encrypted": False,
    }

    # insert item if its valid
    if validate(item):
        coll = db.get_document_collection(ctx)
        docid = coll.insert_one(item).inserted_id

        transaction.log(ctx, str(docid), "import", title)
        utils.log_info("Document \"%s\" created." % title)

    else:
        utils.log_error("Validation of the updated object did not succeed")