我们从Python开源项目中,提取了以下22个代码示例,用于说明如何使用click.get_text_stream()。
def command(dir_in, recursive): files_out = [] if recursive: for root, dirs, files in os.walk(os.path.abspath(dir_in)): for f in files: files_out.append(cwl_file(os.path.join(root, f))) else: for f in os.listdir(dir_in): fi = os.path.join(dir_in, f) if os.path.isfile(fi): files_out.append(cwl_file(fi)) # order alphabetically on file name files_out = sorted(files_out, key=lambda x: x.get('path')) stdout_text = click.get_text_stream('stdout') stdout_text.write(json.dumps({'out_files': files_out}))
def notify(ctx, provider): """Send a notification to a passed provider. Data should be passed via a key=value input like so: notifiers notify pushover token=foo user=bar message=test """ p = get_notifier(provider) data = {} for item in ctx.args: data.update([item.split('=')]) if 'message' not in data: message = click.get_text_stream('stdin').read() if not message: raise click.ClickException( "'message' option is required. " "Either pass it explicitly or pipe into the command" ) data['message'] = message rsp = p.notify(**data) rsp.raise_on_errors()
def validate_text(ctx, param, value): conf = click.get_current_context().obj["conf"] if isinstance(value, tuple): value = " ".join(value) if not value and not sys.stdin.isatty(): value = click.get_text_stream("stdin").read() if value: value = value.strip() if conf.character_warning and len(value) > conf.character_warning: click.confirm("? Warning: Tweet is longer than {0} characters. Are you sure?".format( conf.character_warning), abort=True) return value else: raise click.BadArgumentUsage("Text can’t be empty.")
def staging(): stdin_text = click.get_text_stream('stdin') for line in stdin_text: print(line) # registers a new username
def download_classifications( workflow_id, output_file, generate, generate_timeout ): """ Downloads a workflow-specific classifications export for the given workflow. OUTPUT_FILE will be overwritten if it already exists. Set OUTPUT_FILE to - to output to stdout. """ workflow = Workflow.find(workflow_id) if generate: click.echo("Generating new export...", err=True) export = workflow.get_export( 'classifications', generate=generate, wait_timeout=generate_timeout ) with click.progressbar( export.iter_content(chunk_size=1024), label='Downloading', length=(int(export.headers.get('content-length')) / 1024 + 1), file=click.get_text_stream('stderr'), ) as chunks: for chunk in chunks: output_file.write(chunk)
def download(project_id, output_file, generate, generate_timeout, data_type): """ Downloads project-level data exports. OUTPUT_FILE will be overwritten if it already exists. Set OUTPUT_FILE to - to output to stdout. """ project = Project.find(project_id) if generate: click.echo("Generating new export...", err=True) export = project.get_export( data_type, generate=generate, wait_timeout=generate_timeout ) with click.progressbar( export.iter_content(chunk_size=1024), label='Downloading', length=(int(export.headers.get('content-length')) / 1024 + 1), file=click.get_text_stream('stderr'), ) as chunks: for chunk in chunks: output_file.write(chunk)
def lowercase(in_file, out_dir): create_dirs(out_dir) text = in_file.read() text = text.lower() stdout_text = click.get_text_stream('stdout') stdout_text.write(text)
def echo_stdout(): stdout = click.get_text_stream('stdout') if stdout.readable(): stdout.read()
def _stdin_user_credentials(): stdin = click.get_text_stream('stdin').read() stdin_lines = stdin.strip().splitlines() try: username, password = stdin_lines[:2] except ValueError: raise click.ClickException("Failed to read newline separated " "username and password from stdin.") return username, password
def write_csv(rows, delim): writer = csv.writer(click.get_text_stream('stdout'), delimiter=delim, lineterminator='\n') try: [writer.writerow(row) for row in rows] except (OSError, IOError): sys.stderr.close()
def cli(ctx, message, key, config, verbose): """Use simplepush and tasker to push text to remote phone's clipboard.""" if ctx.invoked_subcommand is None: if not os.path.exists(config): click.echo("Config file: {}".format(config)) click.echo("Config file does not exist. Please choose a different file, or initialize with push2clip init <key>") quit() with open(config, 'r') as f: conf = json.load(f) f.close() if not key: if not conf['key']: print("Error: Simplepush key is missing. Unable to send push.") quit() key = conf['key'] if not message: message = click.get_text_stream('stdin').read() event = conf.get("event", '') title = conf.get("title", '') sp = simplepush(key, event=event, title=title, message=message) if not sp: click.echo("An error was encountered while pushing message.") else: if verbose: click.echo("Push sent successfully.") click.echo("Key: {}".format(key)) click.echo("Event: {}".format(event)) click.echo("Title: {}".format(title)) click.echo("Message: {}".format(message))
def cli(ctx, config): csv_config = {} try: with open(config, 'r') as config_file: csv_config = json.load(config_file) l.INFO("Using custom CSV configuration: %s" % (csv_config)) except TypeError: l.WARN("Using default CSV configuration: %s" % (CSV_DEFAULT_CONFIG)) input_ = click.get_text_stream('stdin') convert(input_, configuration=csv_config)
def cli(ctx, config): s3_config = {} try: with open(config, 'r') as config_file: s3_config = json.load(config_file) l.INFO("Using custom CSV configuration: %s" % (s3_config)) except TypeError: l.WARN("Unable to parse s3 config") input_ = click.get_text_stream('stdin') convert(input_, configuration=s3_config)
def cli(ctx, config): client = MongoClient('localhost', 27017) db = client.twitter_connections collection = db.relationships input_ = click.get_text_stream('stdin') insert_lines(collection, input_)
def human(verbosity, datefmt, no_color): '''Use this command to format machine-readable logs for humans. `human` reads stdin and writes formatted lines to stdout. Verbosity Levels: FATAL=100 | ERROR=200 | WARN=300 | INFO=400 | DEBUG=500 | TRACE=600 (DEFAULT) USAGE: tail -f myapp.log | human cat myapp.log | human -v 400 --datefmt "%Y-%m-%d %H:%M:%S" ''' f = CosmologgerHumanFormatter(origin='todo', version=0, datefmt=datefmt, color=not no_color) with click.get_text_stream('stdin') as stdin: for line in stdin: line = line.strip() try: process(line, verbosity, f) except CosmologgerException as e: msg = _format_exception(line, e, no_color) click.echo(msg, err=True)
def main(args=None): """ Console script for renodiff Example usage: git show | renodiff """ stdin_text = click.get_text_stream('stdin') try: run(stdin_text.read()) return 0 except: return -1
def main( telegram_token: str, ns_login: str, ns_password: str, log_file: click.File, verbose: bool, ): logging.basicConfig( datefmt="%Y-%m-%d %H:%M:%S", format="%(asctime)s [%(levelname).1s] %(message)s", level=(logging.INFO if not verbose else logging.DEBUG), stream=(log_file or click.get_text_stream("stderr")), ) logging.info("Starting bot…") with ExitStack() as exit_stack: telegram = exit_stack.enter_context(closing(Telegram(telegram_token))) ns = exit_stack.enter_context(closing(Ns(ns_login, ns_password))) bot = exit_stack.enter_context(closing(Bot(telegram, ns))) try: asyncio.ensure_future(bot.run()) asyncio.get_event_loop().run_forever() finally: bot.stop() # Bot response phrases. # ----------------------------------------------------------------------------------------------------------------------
def ner(**kwargs): kwargs["no_standardize"] = not kwargs["no_standardize"] kwargs["no_convert_ions"] = not kwargs["no_convert_ions"] kwargs["no_header"] = not kwargs["no_header"] kwargs["no_normalize_text"] = not kwargs["no_normalize_text"] kwargs["no_annotation"] = not kwargs["no_annotation"] is_output_file = bool(kwargs["output"]) stdin = click.get_text_stream("stdin") input_text = "" if not stdin.isatty(): kwargs["input_file"] = "" input_text = click.get_text_stream("stdin").read().strip() if not input_text and not kwargs["input_file"]: raise click.UsageError("Cannot perform NER: stdin is empty and input file is not provided.") kwargs["opsin_types"] = get_opsin_types(kwargs["opsin_types"]) init_kwargs = get_kwargs(kwargs, KWARGS_CHS_INIT) process_kwargs = get_kwargs(kwargs, KWARGS_CHS_PROCESS) chemspot = ChemSpot(**init_kwargs) result = chemspot.process(input_text=input_text, **process_kwargs) if kwargs["dry_run"]: print(result) exit(0) if kwargs["raw_output"]: print(result["stdout"]) eprint(result["stderr"]) exit(0) if not is_output_file: print(dict_to_csv(result["content"], csv_delimiter=kwargs["delimiter"], write_header=kwargs["no_header"]))
def convert(**kwargs): kwargs["no_header"] = not kwargs["no_header"] kwargs["no_normalize_plurals"] = not kwargs["no_normalize_plurals"] kwargs["no_standardize"] = not kwargs["no_standardize"] kwargs["opsin_no_allow_acids_without_acid"] = not kwargs["opsin_no_allow_acids_without_acid"] kwargs["opsin_no_detailed_failure_analysis"] = not kwargs["opsin_no_detailed_failure_analysis"] kwargs["opsin_no_allow_radicals"] = not kwargs["opsin_no_allow_radicals"] kwargs["opsin_no_allow_uninterpretable_stereo"] = not kwargs["opsin_no_allow_uninterpretable_stereo"] is_output_file = bool(kwargs["output"]) stdin = click.get_text_stream("stdin") input_text = "" if not stdin.isatty(): kwargs["input_file"] = "" input_text = click.get_text_stream("stdin").read().strip() if not input_text and not kwargs["input_file"]: raise click.UsageError("Cannot do conversion: stdin is empty and input file is not provided.") init_kwargs = get_kwargs(kwargs, KWARGS_OPSIN_INIT) process_kwargs = get_kwargs(kwargs, KWARGS_OPSIN_PROCESS) opsin = OPSIN(**init_kwargs) result = opsin.process(input=input_text, output_formats=["smiles", "inchi", "inchikey"], **process_kwargs) if kwargs["dry_run"]: print(result) exit(0) if kwargs["raw_output"]: print(result["stdout"]) eprint(result["stderr"]) exit(0) if not is_output_file: print(dict_to_csv(result["content"], csv_delimiter=kwargs["delimiter"], write_header=kwargs["no_header"]))
def get_stream(name, file_type): stream = None stream_type = get_io_type(file_type) if stream_type == "string": stream = click.get_text_stream(name) else: stream = click.get_binary_stream(name) return stream
def append(ctx, docid, password, content): """ Append string to a document Trys to find the object, (decrypt,) adds content to the bottom, (encrypt,) update date fields and regenerates the title. :docid: str (bson object) :returns: bool """ coll = db.get_document_collection(ctx) doc, docid = db.get_document_by_id(ctx, docid) template, c = db.get_content(ctx, doc, password=password) d = datetime.datetime.now() if not content: content = "" for l in click.get_text_stream('stdin'): content = content + l content = template + content if isinstance(content, unicode): content = content.encode("utf-8") if doc["encrypted"] is True: title = utils.get_title_from_content(content) content = c.encrypt_content(content) else: if not "links" in doc["categories"]: title = utils.get_title_from_content(content) if content != template: doc["content"] = content doc["title"] = title doc["updated"] = d if validate(doc): coll.save(doc) transaction.log(ctx, docid, "append", title) utils.log_info("Content appended to \"%s\"." % title) else: utils.log_error("Validation of the updated object did not succeed") else: utils.log_info("No changes detected for \"%s\"" % title) return True
def import_mail(ctx, tag, category): content = "" for l in click.get_text_stream('stdin'): content = content + l msg = email.message_from_string(content) # title subject, encoding = email.header.decode_header(msg['Subject'])[0] if encoding is None: encoding = "utf-8" title = subject.decode(encoding) # content content = msg.get_payload(decode=False) content = quopri.decodestring(content) content = "# " + title + '\n\n' + content date = datetime.datetime.now() coll = db.get_document_collection(ctx) config = ctx.obj["config"] item = { "title": title, "content": content, "tags": list(tag), "categories": list(category), "created": date, "updated": date, "encrypted": False, } # insert item if its valid if validate(item): coll = db.get_document_collection(ctx) docid = coll.insert_one(item).inserted_id transaction.log(ctx, str(docid), "import", title) utils.log_info("Document \"%s\" created." % title) else: utils.log_error("Validation of the updated object did not succeed")