我们从Python开源项目中提取了以下21个代码示例,用于说明如何使用click.File()。
def run(infile, outfile, time_interval, quiet):
    """Replay a JSON array from *infile* into *outfile* one element at a time.

    After each ``time_interval``-second sleep, the prefix ``data[:i+1]`` is
    rewritten to *outfile*, simulating a slowly growing data feed.

    :param infile: path to a JSON file containing a list
    :param outfile: path the growing JSON prefix is dumped to (overwritten
        on every iteration)
    :param time_interval: seconds to sleep between dumps
    :param quiet: when true, only WARN-level logs are emitted
    """
    logging.basicConfig(level=logging.WARN if quiet else logging.INFO)
    logger = logging.getLogger(__name__)
    logger.info('loading input file %s ...', infile)
    # Do not use click.File because we want to close the file asap.
    with open(infile) as fin:
        data = json.load(fin)
    n = len(data)
    logger.info('loading input file %s done. %d data found.', infile, n)
    # Bug fix: the original used xrange(), which does not exist on Python 3.
    for i in range(n):
        logger.info('Sleeping for %d sec [%d/%d] ...', time_interval, i + 1, n)
        time.sleep(time_interval)
        with open(outfile, 'w') as fout:
            json.dump(data[:(i + 1)], fout)
        logger.info('Dumped %dth/%d data to %s', i + 1, n, outfile)
def cfdv32mx(config):
    """Format cfdi v3.2 for Mexico.

    \b
    File where the files will be written document.xml.
        cfdicli --in_file /path/to/yout/json/documnt.json cfdv32mx

    \b
    File where the files will be written from document.json.
        cfdicli --out_file ./document.xml cfdv32mx
    """
    import ast
    # TODO: Implement json option also.
    # Security fix: ast.literal_eval() only evaluates Python literals
    # (dicts, lists, strings, numbers, ...), unlike eval(), which would
    # execute arbitrary code embedded in the input file.
    dict_input = ast.literal_eval(config.in_file.read())
    invoice = cfdv32.get_invoice(dict_input)
    if invoice.valid:
        config.out_file.write(invoice.document)
        config.out_file.flush()
        click.echo('Document %s has been created.' % config.out_file.name)
    else:
        click.echo(invoice.ups.message)
def pull(project, run, kind, entity):
    # Resolve the slug, then fetch the download map for every file of the run.
    project, run = api.parse_slug(run, project=project)
    urls = api.download_urls(project, run=run, entity=entity)
    if not urls:
        raise ClickException("Run has no files")
    click.echo("Downloading: {project}/{run}".format(
        project=click.style(project, bold=True), run=run
    ))
    for name in urls:
        meta = urls[name]
        # Skip files whose local md5 already matches the remote one.
        if api.file_current(name, meta['md5']):
            click.echo("File %s is up to date" % name)
            continue
        length, response = api.download_file(meta['url'])
        with click.progressbar(length=length,
                               label='File %s' % name,
                               fill_char=click.style('&', fg='green')) as bar:
            # Stream the body to disk in 4 KiB chunks, advancing the bar.
            with open(name, "wb") as f:
                for chunk in response.iter_content(chunk_size=4096):
                    f.write(chunk)
                    bar.update(len(chunk))
def codegen_options(f):
    """Attach the shared codegen CLI options to the click command *f*.

    Applied options: ``--categories``, ``--param_file``, ``--plugin``,
    ``--target`` and ``--status_update_interval``.  Returns the decorated
    command so this can be used as a decorator itself.
    """
    f = click.option(
        "-c", "--categories", multiple=True, default=default_categories,
        type=click.Choice(all_categories),
        help="A list of the categories of inputs and outputs that should "
        "be enabled"
    )(f)
    # Bug fix: a stray adjacent string literal "code" was being concatenated
    # onto the end of this help text ("...launch file.code").
    f = click.option(
        "-f", "--param_file", type=click.File(),
        help="""YAML or JSON file describing the firmware module configuration
 to be flashed. This is the same file that is used for rosparam in the launch
 file."""
    )(f)
    f = click.option(
        "-p", "--plugin", multiple=True, help="Enable a specific plugin"
    )(f)
    f = click.option(
        "-t", "--target", help="PlatformIO target (e.g. upload)"
    )(f)
    f = click.option(
        "--status_update_interval", default=5,
        help="Minimum interval between driver status updates (in seconds)"
    )(f)
    return f
def bake(input_file, output_file, data):
    """
    This command bakes Open Badges data into a file and saves
    the result to an output file.

    Positional Arguments:

    \b
    Input filename:    File must exist.

    \b
    Output filename:   If file exists, it will be overwritten.
    """
    # Delegate the actual baking; the return value is unused but kept for parity.
    output = utils.bake(input_file, data, output_file)
    message = "{} is done baking. Remember to let it cool".format(output_file.name)
    click.echo(message)
def generate_crowdflower_interface_template(input_csv, output_html):
    """ Generate CrowFlower interface template based on input data spreadsheet

        :param file input_csv: CSV file with the input data
        :param output_html: File in which to write the output
        :type output_html: file
        :return: 0 on success
    """
    # Read only the header row of the input spreadsheet.
    reader = csv.DictReader(input_csv)
    columns = reader.fieldnames
    # Columns named "fe_NN" hold frame elements; "chunk_NN" hold tokens.
    fe_fields = [c for c in columns if re.match(r'fe_[0-9]{2}$', c)]
    token_fields = [c for c in columns if re.match(r'chunk_[0-9]{2}$', c)]
    # One FE block per FE column, shared by every question.
    fe_blocks = ''.join(FE_TEMPLATE % {'fe_field': fe} for fe in fe_fields)
    parts = [HEADER]
    # One question block per token column, numbered from 1.
    for question_num, token_field in enumerate(token_fields, start=1):
        parts.append(TOKEN_TEMPLATE % {
            'question_num': question_num,
            'token_field': token_field,
            'fe_blocks': fe_blocks,
        })
    parts.append(FOOTER)
    output_html.write(''.join(parts))
    return 0
def parse_files(ctx, param, values):
    """Click callback: parse ``FIELD=VALUE`` items into (field, file) pairs.

    Each VALUE is opened as a binary file via ``click.File('rb')``.

    :param ctx: click context (forwarded to the converter)
    :param param: the click parameter being processed
    :param values: iterable of ``FIELD=VALUE`` strings
    :raises click.BadParameter: when an item contains no ``=``
    :return: list of ``(field, open_file)`` tuples
    """
    ret = []
    converter = click.File('rb')
    for item in values:
        if '=' not in item:
            # Bug fix: the original never interpolated the offending item,
            # so the user literally saw '"%s"' in the error message.
            raise click.BadParameter(
                'String parameter "%s" should be in form of FIELD=VALUE' % item)
        field, value = item.split('=', 1)
        input_file = converter.convert(value, param, ctx)
        ret.append((field, input_file))
    return ret
def file(ctx, data_dir, data_file):
    """Use the File SWAG Backend

    Fills in ``ctx.data_file`` / ``ctx.data_dir`` from the CLI options when
    they are not already set, and marks the backend type as ``'file'``.
    """
    # Bug fix: the original tested ctx.file (an attribute this function never
    # sets) before assigning ctx.data_file; guard on the attribute actually
    # being defaulted.
    if not ctx.data_file:
        ctx.data_file = data_file
    if not ctx.data_dir:
        ctx.data_dir = data_dir
    ctx.type = 'file'
def tpflist(campaign, channel, sc, wget):
    """Prints the Target Pixel File URLS for a given CAMPAIGN/QUARTER and
    ccd CHANNEL.

    CAMPAIGN can refer to a K2 Campaign (e.g. 'C4')
    or a Kepler Quarter (e.g. 'Q4').
    """
    try:
        urls = mast.get_tpf_urls(campaign, channel=channel, short_cadence=sc)
        if wget:
            # Prefix each URL with a ready-to-paste wget invocation.
            prefix = 'wget -nH --cut-dirs=6 -c -N '
            print('\n'.join(prefix + url for url in urls))
        else:
            print('\n'.join(urls))
    except mast.NoDataFoundException as e:
        # No TPFs for this campaign/channel combination: report and return.
        click.echo(e)
def load_hdf5(path):
    '''Load a Dataset object from an HDF5 file.
    '''
    # Materialize everything with [...] so the file can be closed before
    # the Dataset is constructed.
    with h5py.File(path, 'r') as f:
        features = f['x'][...]
        labels = f['y'][...].astype(np.int32)
        vocabulary = f['vocab'][...]
    num_samples = features.shape[0]
    return Dataset(
        x=features.reshape(num_samples, -1),  # flatten each image to a vector
        y=labels,
        vocab=vocabulary,
        height=features.shape[1],
        width=features.shape[2],
    )
def cli_render(input, output, size):
    '''Render a JSONlines dataset to numpy arrays, saved in an HDF5 file.

    Each input line is a JSON object with a 'target' character and a list of
    'strokes'; the strokes are rasterized to a (size, size) float image.
    Datasets written: 'vocab' (sorted unique chars), 'x' (images),
    'y' (index of each char in vocab).
    '''
    chars = []
    images = []
    for line in input:
        datum = json.loads(line)
        chars.append(datum['target'])
        images.append(render(
            [np.array(s) for s in datum['strokes']],
            size))
    vocab = list(sorted(set(chars)))
    char_to_index = {ch: y for y, ch in enumerate(vocab)}
    with h5py.File(output, 'a') as f:
        str_dt = h5py.special_dtype(vlen=str)
        f.require_dataset(
            'vocab', (len(vocab),), dtype=str_dt
        )[...] = vocab
        f.require_dataset(
            'x', shape=(len(images), size, size), dtype=np.float32
        )[...] = np.array(images)
        # Bug fix: np.int was deprecated in NumPy 1.20 and removed in 1.24;
        # it was always just an alias for the builtin int, used here as the
        # platform default integer dtype.
        f.require_dataset(
            'y', shape=(len(chars),), dtype=int
        )[...] = np.array([char_to_index[ch] for ch in chars])
def convert(self, param=None, ctx=None, value=None):
    """Resolve *value* into file content.

    Three sources are supported: an empty value reads stdin; an existing
    local path is opened (transparently gunzipping ``*.gz``); anything else
    is treated as a URL whose scheme must be in ``self.SUPPORTED_SCHEMES``.

    :param param: click parameter (unused here, kept for the converter API)
    :param ctx: click context (unused here, kept for the converter API)
    :param value: path/URL string, or falsy for stdin
    :raises click.BadParameter: for a missing file or unsupported scheme
    """
    if not value:
        # No value given: fall back to reading all of stdin.
        content = sys.stdin.readlines()
    else:
        is_compressed = value.endswith('.gz')
        is_binary = self.mode.endswith('b')
        if os.path.exists(value):
            # Gzip members must be read as raw bytes regardless of self.mode.
            local_mode = 'rb' if is_compressed else self.mode
            file_obj = open(value, local_mode)
        else:
            url = urlparse(value)
            if not url.scheme:
                # Neither a local file nor a URL.
                raise click.BadParameter(
                    'File \'{}\' not found'.format(url))
            elif url.scheme not in self.SUPPORTED_SCHEMES:
                raise click.BadParameter(
                    'Scheme \'{}\' not supported'.format(url.scheme))
            else:
                file_obj = urlopen(value)
        if is_compressed:
            # Wrap the raw stream so the caller sees decompressed content.
            with gzip.GzipFile(mode=self.mode, fileobj=file_obj) as file:
                content = file.read()
            if not is_binary:
                # NOTE(review): local_decode is a project helper; presumably
                # decodes the bytes to str — confirm against its definition.
                content = local_decode(content)
        else:
            content = file_obj.read()
        file_obj.close()
        return content
def ssh_command(func):
    """Decorator for click commands that connect over SSH.

    Adds ``--identity-file`` and ``--batch-size`` options, imports the
    private key, and stashes ``private_key``, ``batch_size`` and the asyncio
    event loop into ``ctx.obj`` before invoking the wrapped command.
    """
    func = click.option(
        "-i", "--identity-file",
        # lazy=False so the file is opened eagerly at parse time.
        type=click.File(lazy=False),
        default=str(get_private_key_path()),
        help="Path to the private key file",
        show_default=True
    )(func)
    func = click.option(
        "-b", "--batch-size",
        type=int,
        default=20,
        help="By default, command won't connect to all servers "
             "simultaneously, it is trying to process servers in batches. "
             "Negative number or 0 means connect to all hosts",
        show_default=True,
    )(func)

    @functools.wraps(func)
    @click.pass_context
    def decorator(ctx, identity_file, batch_size, *args, **kwargs):
        # Read and import the key once, then release the file handle.
        private_key = asyncssh.import_private_key(identity_file.read())
        # <= 0 is normalized to None, meaning "no batching, all hosts at once".
        batch_size = batch_size if batch_size > 0 else None
        identity_file.close()
        ctx.obj["private_key"] = private_key
        ctx.obj["batch_size"] = batch_size
        ctx.obj["event_loop"] = asyncio.get_event_loop()
        return func(*args, **kwargs)

    return decorator
def main(
    telegram_token: str,
    ns_login: str,
    ns_password: str,
    log_file: click.File,  # at runtime this is an open file object (or None)
    verbose: bool,
):
    """Entry point: configure logging and run the bot until interrupted."""
    logging.basicConfig(
        datefmt="%Y-%m-%d %H:%M:%S",
        format="%(asctime)s [%(levelname).1s] %(message)s",
        level=(logging.INFO if not verbose else logging.DEBUG),
        # Log to the given file, or stderr when no log file was provided.
        stream=(log_file or click.get_text_stream("stderr")),
    )
    logging.info("Starting bot…")
    with ExitStack() as exit_stack:
        # closing() ensures each component's close() runs (in reverse order)
        # when the stack unwinds.
        telegram = exit_stack.enter_context(closing(Telegram(telegram_token)))
        ns = exit_stack.enter_context(closing(Ns(ns_login, ns_password)))
        bot = exit_stack.enter_context(closing(Bot(telegram, ns)))
        try:
            asyncio.ensure_future(bot.run())
            asyncio.get_event_loop().run_forever()
        finally:
            # Stop the bot even when run_forever() exits via KeyboardInterrupt.
            bot.stop()


# Bot response phrases.
# ----------------------------------------------------------------------------------------------------------------------
def fetch_samples_from_obserations(features, exact, from_, output, context):
    """Fetch sample data containing features."""
    # Imports are deferred so the CLI stays fast when this command is unused.
    import redbiom.util
    feature_iter = redbiom.util.from_or_nargs(from_, features)

    import redbiom.fetch
    table, ambiguity_map = redbiom.fetch.data_from_features(
        context, feature_iter, exact)

    import h5py
    with h5py.File(output, 'w') as fp:
        table.to_hdf5(fp, 'redbiom')
    _write_ambig(ambiguity_map, output)
def fetch_samples_from_samples(samples, from_, output, context):
    """Fetch sample data."""
    # Imports are deferred so the CLI stays fast when this command is unused.
    import redbiom.util
    sample_iter = redbiom.util.from_or_nargs(from_, samples)

    import redbiom.fetch
    table, ambiguity_map = redbiom.fetch.data_from_samples(context, sample_iter)

    import h5py
    with h5py.File(output, 'w') as fp:
        table.to_hdf5(fp, 'redbiom')
    _write_ambig(ambiguity_map, output)
def test_persistfile_exists(self, filename, add_postfix=True):
    # Refuse to proceed when the resolved persist file already exists,
    # so we never silently clobber previous output.
    target = self._get_persist_filename(filename, add_postfix=add_postfix)
    if os.path.isfile(target):
        raise click.UsageError("File '%s' already exists" % target)
def cli_decrypt(ctx, filename, key_file, passphrase, public_keyfile, verify, keep, progress, leave_progress_bar):
    """Decrypt file(s) with private key."""
    # Default: show progress only when attached to an interactive terminal.
    if progress is None:
        progress = sys.stdout.isatty()
    if passphrase is None:
        if progress:
            passphrase = click.prompt('Passphrase', hide_input=True)
        else:
            # Non-interactive run with no passphrase: cannot prompt, fail fast.
            raise click.ClickException("No passphrase given.")
    ed_prv = load_private_keyfile(key_file, passphrase)
    sender_pubkey = None
    if public_keyfile:
        # Sender's public key is only needed for signature verification.
        sender_pubkey = load_public_keyfile(public_keyfile)
    for fh in filename:
        if not fh.name.endswith(".s11"):
            # TODO: implement output-filename as cli option
            raise click.UsageError("File '%s' does not end with .s11" % fh.name)
        # Output name is the input name with the ".s11" suffix stripped.
        output_filename = fh.name[:-4]
        _decrypt(
            f=fh,
            ed_prv=ed_prv,
            sender_pubkey=sender_pubkey,
            verify=verify,
            progress=progress,
            leave_progress_bar=leave_progress_bar,
            output_filename=output_filename,
        )
        fh.close()
        if verify is False and not keep:
            # Delete the encrypted source unless --keep was given; kept when
            # verification was requested.
            os.unlink(fh.name)
def unbake(input_file, output_file):
    """
    This command extracts Open Badges data from an image and
    prints it to a file or the standard output.

    Positional Arguments:

    \b
    Input filename:    File must exist.

    \b
    Output filename:   If file exists, it will be overwritten.
    """
    click.echo('')
    # Extract the embedded badge data and write it to the destination.
    extracted = utils.unbake(input_file)
    output_file.write(extracted)
    click.echo('\n')
def status(run, settings, project):
    """Print login state and the current settings; file sync status is TODO."""
    if settings:
        click.echo(click.style("Logged in?", bold=True) +
                   " %s" % bool(api.api_key))
        click.echo(click.style("Current Settings", bold=True) +
                   " (%s)" % api.settings_file)
        settings = api.settings()
        # Pretty-print the settings dict as stable, indented JSON.
        click.echo(json.dumps(
            settings,
            sort_keys=True,
            indent=2,
            separators=(',', ': ')
        ))
    # The remainder is an unfinished sync-status report, kept as a sketch:
    # project, run = api.parse_slug(run, project=project)
    # existing = set()  # TODO: populate this set with the current files in the run dir
    # remote = api.download_urls(project, run)
    # not_synced = set()
    # remote_names = set([name for name in remote])
    # for file in existing:
    #     meta = remote.get(file)
    #     if meta and not api.file_current(file, meta['md5']):
    #         not_synced.add(file)
    #     elif not meta:
    #         not_synced.add(file)
    # TODO: remove items that exists and have the md5
    # only_remote = remote_names.difference(existing)
    # up_to_date = existing.difference(only_remote).difference(not_synced)
    # click.echo('File status for ' + click.style('"%s/%s" ' %
    #            (project, run), bold=True))
    # if len(not_synced) > 0:
    #     click.echo(click.style('Push needed: ', bold=True) +
    #                click.style(", ".join(not_synced), fg="red"))
    # if len(only_remote) > 0:
    #     click.echo(click.style('Pull needed: ', bold=True) +
    #                click.style(", ".join(only_remote), fg="red"))
    # if len(up_to_date) > 0:
    #     click.echo(click.style('Up to date: ', bold=True) +
    #                click.style(", ".join(up_to_date), fg="green"))


#@cli.command(context_settings=CONTEXT, help="Store notes for a future training run")
def put(local, remote):
    """Put a file or folder and its contents on the board.

    Put will upload a local file or folder to the board.  If the file already
    exists on the board it will be overwritten with no warning!  You must pass
    at least one argument which is the path to the local file/folder to
    upload.  If the item to upload is a folder then it will be copied to the
    board recursively with its entire child structure.  You can pass a second
    optional argument which is the path and name of the file/folder to put to
    on the connected board.

    For example to upload a main.py from the current directory to the board's
    root run:

      ampy --port /board/serial/port put main.py

    Or to upload a board_boot.py from a ./foo subdirectory and save it as
    boot.py in the board's root run:

      ampy --port /board/serial/port put ./foo/board_boot.py boot.py

    To upload a local folder adafruit_library and all of its child
    files/folders as an item under the board's root run:

      ampy --port /board/serial/port put adafruit_library

    Or to put a local folder adafruit_library on the board under the path
    /lib/adafruit_library on the board run:

      ampy --port /board/serial/port put adafruit_library /lib/adafruit_library
    """
    # Default the remote name to the basename of the local path.
    if remote is None:
        remote = os.path.basename(os.path.abspath(local))
    if not os.path.isdir(local):
        # Plain file: read it locally and write its bytes to the board.
        with open(local, 'rb') as source:
            board_files = files.Files(_board)
            board_files.put(remote, source.read())
        return
    # Directory: mirror the whole tree onto the board, creating each remote
    # directory and then copying the files it contains.
    board_files = files.Files(_board)
    for parent, _, child_files in os.walk(local):
        # Board paths are always POSIX style, regardless of the host OS.
        relative = os.path.relpath(parent, local)
        remote_parent = posixpath.normpath(posixpath.join(remote, relative))
        try:
            board_files.mkdir(remote_parent)
            for child in child_files:
                with open(os.path.join(parent, child), 'rb') as source:
                    board_files.put(
                        posixpath.join(remote_parent, child),
                        source.read())
        except files.DirectoryExistsError:
            # Directory already exists on the board; like the original code,
            # skip it (and the files queued for it in this batch).
            pass