The following 30 code examples, extracted from open-source Python projects, illustrate how to use click.open_file().
def do_start_build_stuff(ctx):
    """Write a default build into the HLS Tcl build script.

    Creates ``run_hls.tcl`` in the current directory, writes the project
    setup commands from ``ctx.obj.config``, and returns the still-open
    file handle so the caller can append further commands.

    Raises ``click.Abort`` if the file cannot be created.
    """
    config = ctx.obj.config
    solution_num = ctx.obj.solution_num
    try:
        # Handle is intentionally returned open; the caller keeps writing to it.
        tcl_file = click.open_file("run_hls.tcl", "w")
        tcl_file.write("open_project " + config["project_name"] + "\n")
        tcl_file.write("set_top " + config["top_level_function_name"] + "\n")
        for src_file in config["src_files"]:
            tcl_file.write("add_files " + config["src_dir_name"] + "/" + src_file + "\n")
        for tb_file in config["tb_files"]:
            tcl_file.write("add_files -tb " + config["tb_dir_name"] + "/" + tb_file + "\n")
        # --keep resets the solution so stale results don't linger.
        if ctx.params['keep']:
            tcl_file.write("open_solution -reset \"solution" + str(solution_num) + "\"" + "\n")
        else:
            tcl_file.write("open_solution \"solution" + str(solution_num) + "\"" + "\n")
        # Raw strings keep the literal backslash-brace sequence the script
        # emitted before, without Python's invalid-escape-sequence warning
        # ("\{" is not a recognised escape and warns on modern Python).
        tcl_file.write(r"set_part \{" + config["part_name"] + r"\}" + "\n")
        tcl_file.write("create_clock -period " + config["clock_period"] + " -name default" + "\n")
        return tcl_file
    except OSError:
        click.echo("Woah! Couldn't create a Tcl run file in the current folder!")
        raise click.Abort()


# Function to write a default build into the HLS Tcl build script.
def synthesize(access_key, secret_key, output_file, voice_name, voice_language, codec, text):
    """Synthesize passed text and save it as an audio file"""
    # Build the API client first; any configuration problem surfaces here.
    try:
        api = IvonaAPI(
            access_key,
            secret_key,
            voice_name=voice_name,
            language=voice_language,
            codec=codec,
        )
    except (ValueError, IvonaAPIException) as exc:
        raise click.ClickException("Something went wrong: {}".format(repr(exc)))

    # Stream the synthesized speech straight into the output file.
    with click.open_file(output_file, 'wb') as audio_file:
        api.text_to_speech(text, audio_file)

    click.secho(
        "File successfully saved as '{}'".format(output_file),
        fg='green',
    )
def get_blocks_fast(start, end, chunksize, max_workers, url):
    """Request blocks from steemd in JSON format"""
    rpc = SimpleSteemAPIClient(url)
    # end == 0 means "up to the last irreversible block".
    if end == 0:
        end = rpc.last_irreversible_block_num()
    # '-' writes to stdout via click's wrapper.
    with click.open_file('-', 'w', encoding='utf8') as out:
        fetched = _get_blocks_fast(
            start=start,
            end=end,
            chunksize=chunksize,
            max_workers=max_workers,
            rpc=rpc,
            url=url)
        for encoded in (json.dumps(block) for block in fetched):
            click.echo(encoded.encode('utf8'), file=out)


# pylint: disable=too-many-arguments
def load_blocks_from_checkpoints(checkpoints_path, start, end):
    """Load blocks from checkpoints"""
    # Determine which checkpoint files cover the requested block range.
    checkpoint_set = sbds.checkpoints.required_checkpoints_for_range(
        path=checkpoints_path, start=start, end=end)
    total_blocks_to_load = end - start
    with fileinput.FileInput(
            mode='r',
            files=checkpoint_set.checkpoint_paths,
            openhook=checkpoint_opener_wrapper(encoding='utf8')) as blocks:
        # Skip the lines that precede `start` inside the first checkpoint file.
        blocks = toolz.itertoolz.drop(checkpoint_set.initial_checkpoint_offset,
                                      blocks)
        if total_blocks_to_load > 0:
            # Bounded range: stop once the requested number of blocks is out.
            with click.open_file('-', 'w', encoding='utf8') as f:
                for i, block in enumerate(blocks, 1):
                    click.echo(block.strip().encode('utf8'), file=f)
                    if i == total_blocks_to_load:
                        break
        else:
            # Non-positive range: stream every remaining block to stdout.
            with click.open_file('-', 'w', encoding='utf8') as f:
                for block in blocks:
                    click.echo(block.strip().encode('utf8'), file=f)
def report(self) -> t.Optional[str]:
    """
    Create a CSV report from ``self._table()`` and write it to the
    configured output (``misc["out"]``; "-" means stdout).
    """
    # Unless writing to stdout ("-"), the report's parent folder must exist.
    if not self.misc["out"] == "-" and not os.path.exists(os.path.dirname(self.misc["out"])):
        logging.error("Folder for report ({}) doesn't exist".format(os.path.dirname(self.misc["out"])))
        exit(1)
    with click.open_file(self.misc["out"], mode='w') as f:
        import tablib
        # NOTE(review): `misc["columns"]` is passed positionally; tablib's
        # Dataset treats positional args as rows, headers go via the
        # `headers=` keyword — confirm this produces the intended header row.
        data = tablib.Dataset(self.misc["columns"])
        for row in self._table():
            data.append(row)
        f.write(data.csv)
def find_last_article(content_dir):
    """Return the path of the newest article, tie-breaking by its 'Date:' header."""
    articles = sorted(glob(os.path.join(content_dir, '*.md')))
    if not articles:
        return None

    # Filenames begin with a YYYY-MM-DD prefix; gather every article
    # sharing the most recent date.
    latest_date = os.path.basename(articles[-1])[0:10]
    candidates = [p for p in articles
                  if os.path.basename(p)[0:10] == latest_date]
    if len(candidates) == 1:
        return candidates[0]

    # Several articles on the same day: compare the full 'Date:' metadata
    # line inside each file and take the latest one.
    dated = []
    for path in candidates:
        with click.open_file(path) as f:
            creation_date = re.search(r'Date: (.+)', f.read()).group(1)
        dated.append((creation_date, path))
    return max(dated)[1]
def load_json(ctx, filename):
    """Parse JSON from `filename` or stdin; show usage when idle on a TTY."""
    if filename is None:
        if sys.stdin.isatty():
            # Interactive session with no input: print usage and quit cleanly.
            click.echo(ctx.get_usage())
            click.echo("Try `jsoncut --help' for more information.")
            sys.exit(0)
        filename = '-'
    try:
        with click.open_file(filename) as fp:
            return json.load(fp)
    except EnvironmentError as err:
        if not sys.stdin.isatty():
            # Drain piped stdin so the upstream writer doesn't get EPIPE.
            sys.stdin.read()
        click.echo(exc.default_error_mesg_fmt(err), err=True)
        sys.exit(1)
    except json.JSONDecodeError as err:
        click.echo(exc.default_error_mesg_fmt(err), err=True)
        sys.exit(1)
def prefixlist(ctx, as_set, output, output_format, proto, **kwargs):
    """ get prefix list for specified as-sets """
    if kwargs.get('debug', False):
        logging.basicConfig(level=logging.DEBUG)

    # "Fancy" mode collects into an aggregating PrefixSet,
    # otherwise a flat PrefixList is used.
    if kwargs.get('fancy', 0):
        prefixes = PrefixSet(aggregate=kwargs['aggregate'])
    else:
        prefixes = PrefixList()

    with bgpfu.client.IRRClient() as client:
        if kwargs.get('sources', False):
            client.set_sources(kwargs['sources'])
        # Accumulate prefixes chunk by chunk as the IRR query streams in.
        for chunk in client.iter_prefixes(as_set, proto):
            prefixes.iter_add(chunk)
        # Flat lists aggregate explicitly; PrefixSet handled it at build time.
        if not kwargs['fancy'] and kwargs['aggregate']:
            prefixes = prefixes.aggregate()
        with click.open_file(output, 'w') as fobj:
            outfmt.write(output_format, fobj, prefixes.str_list())
        print("LEN {}".format(len(prefixes)))
def create_config():
    '''Create the config file with default close statuses and an empty github auth_token'''
    CONFIG['close_status'] = {'Invalid': ['invalid', 'wontfix', 'worksforme'],
                              'Insufficient data': ['insufficient_info'],
                              'Duplicate': ['duplicate']}
    CONFIG['github'] = {'auth_token': ''}
    # Overwriting an existing config destroys the user's customisations,
    # so ask first; exit non-zero if they decline. (The confirmed and
    # fresh-file paths previously duplicated the write block.)
    if os.path.exists(CFG_PATH) and not click.confirm(
            'You already have a config file, if you continue '
            'your custom settings will be lost'):
        sys.exit(1)
    with click.open_file(CFG_PATH, 'w+') as config_file:
        CONFIG.write(config_file)
def cli(gh, path, method, data, header, do_paginate):
    """ Make an arbitrary API request """
    # Parse "Name: value" header arguments into a dict.
    extra_headers = {}
    for hdr in header:
        name, value = re.split(r'\s*:\s*', hdr, maxsplit=1)
        extra_headers[name] = value

    # "@filename" for the data argument means "read the body from that file".
    if data is not None and len(data) > 1 and data[0] == '@':
        with click.open_file(data[1:]) as fp:
            data = fp.read()

    response = gh[path][method](decode=False, data=data, headers=extra_headers)
    if not response.ok:
        die(response)
    elif do_paginate and method.lower() == 'get' and 'next' in response.links:
        # Follow pagination links and emit each page as JSON.
        for page in gh.paginate(response):
            print_json(page)
    else:
        echo_response(response)
def check_for_syn_results(proj_name, solution_num, top_level_function_name):
    """Return True if a C-synthesis report already exists for this solution."""
    report_path = (proj_name + "/solution" + str(solution_num)
                   + "/syn/report/" + top_level_function_name + "_csynth.rpt")
    # Opening the report is the existence check; absence raises OSError.
    try:
        with click.open_file(report_path):
            return True
    except OSError:
        return False


# Function to check if C synthesis is going to be required but may have been forgotten by the user.
def get_vars_from_file(filename):
    # Load the user's hls_config.py as a module so its variables can be read.
    try:
        with click.open_file(filename) as f:
            # NOTE(review): the `imp` module is deprecated, and
            # imp.load_source's second argument is a pathname — passing ''
            # together with an open file relies on legacy behaviour; confirm
            # before porting this to importlib.
            config = imp.load_source('config', '', f)
            return config
    except OSError:
        click.echo("Error: No hls_config.py found, please create a config file for your project. For an example config file please see the 'examples' folder within the hlsclt install directory.")
        raise click.Abort()


# Function to parse a loaded config structure and overwrite the config dictionary defaults.
def stream_blocks(url, block_nums, start, end): """Stream blocks from steemd in JSON format \b Which Steemd: \b 1. CLI "--url" option if provided 2. ENV var "STEEMD_HTTP_URL" if provided 3. Default: "https://steemd.steemitdev.com" \b Which Blocks To Output: \b - Stream blocks beginning with current block by omitting --start, --end, and BLOCKS - Fetch a range of blocks using --start and/or --end - Fetch list of blocks by passing BLOCKS a JSON array of block numbers (either filename or "-" for STDIN) Where To Output Blocks: \b 2. ENV var "BLOCKS_OUT" if provided 3. Default: STDOUT """ # Setup steemd source rpc = SimpleSteemAPIClient(url) with click.open_file('-', 'w', encoding='utf8') as f: if block_nums: block_nums = json.load(block_nums) blocks = _stream_blocks(rpc, block_nums) elif start and end: blocks = _stream_blocks(rpc, range(start, end)) else: blocks = rpc.stream(start) json_blocks = map(json.dumps, blocks) for block in json_blocks: click.echo(block, file=f)
def _cli(input, output):
    """Merge the given LaTeX inputs and write the result to `output`.

    A ``.pdf`` output name triggers PDF compilation; any other name gets
    the merged LaTeX source written as text.
    """
    from catex.core import merge
    text = merge(*input).__repr__()
    if output[-4:] == '.pdf':
        from latex import build_pdf
        pdf = build_pdf(text)
        pdf.save_to(output)
    else:
        # Bug fix: the file must be opened for *writing* — click.open_file's
        # default mode is 'r', so the subsequent write() failed. Using a
        # `with` block also closes the handle instead of leaking it.
        with click.open_file(output, 'w') as file_out:
            file_out.write(text)
def from_file(filename):
    """Build a LaTeX object from the stripped, newline-free lines of `filename`."""
    # Fix: close the file deterministically instead of leaking the handle
    # until garbage collection.
    with open_file(filename, 'r', encoding='utf8') as f:
        lines = [line.replace('\n', '').strip() for line in f]
    return LaTeX(lines)
def read_file(path):
    """Return the entire text content of `path`, decoded as UTF-8."""
    with open_file(path, 'r', encoding='utf8') as f:
        # read() yields the same string as joining readlines(), in one call.
        return f.read()
def write(context):
    """Starts a new article"""
    config = context.obj
    title = click.prompt('Title')
    author = click.prompt('Author', default=config.get('DEFAULT_AUTHOR'))

    created = datetime.now()
    slug = slugify(title)
    basename = '{:%Y-%m-%d}_{}.md'.format(created, slug)

    # Metadata header, rendered in this exact order.
    meta = (
        ('Title', title),
        ('Date', '{:%Y-%m-%d %H:%M}:00'.format(created)),
        ('Modified', '{:%Y-%m-%d %H:%M}:00'.format(created)),
        ('Author', author),
    )
    header_lines = ['{}: {}\n'.format(key, value) for key, value in meta]

    file_content = ''.join(header_lines)
    file_content += '\n\n'
    file_content += 'Text...\n\n'
    file_content += '![image description]((unknown)/images/my-photo.jpg)\n\n'
    file_content += 'Text...\n\n'

    os.makedirs(config['CONTENT_DIR'], exist_ok=True)
    path = os.path.join(config['CONTENT_DIR'], basename)
    with click.open_file(path, 'w') as f:
        f.write(file_content)

    click.echo(path)
    click.launch(path)
def texts2json(ids, names, field, text_docs):
    '''Convert a set of text documents into a JSON array of document objects.'''
    docs = []
    names = read_names(names)
    ids = read_names(ids)
    for idx, path in enumerate(text_docs):
        # Fix: the original opened each document twice (a plain open() that
        # was read, plus an unused click.open_file context) and leaked the
        # first handle; open once and close deterministically.
        with click.open_file(path, 'r') as tokens_doc:
            content = tokens_doc.read()
        # ordered so that these attributes stay at the top
        doc = OrderedDict()
        # Fall back to the file path when no explicit id/name was supplied.
        # (Bug fix: the original `idx < len(ids) - 1` off-by-one dropped the
        # last supplied id and name.)
        doc['id'] = ids[idx] if idx < len(ids) else path
        doc['name'] = names[idx] if idx < len(names) else path
        doc[field] = content
        docs.append(doc)
    out_content = json.dumps(docs, indent=2)
    output(out_content)
def batchgetlinks(ctx, finput):
    '''
    Invokes getlinks command for a list of products/releases
    defined in a YAML file passed as argument
    '''
    click.echo('\n ####### BATCH GET LINKS #######')
    with click.open_file(finput, 'r') as f:
        # SECURITY NOTE(review): yaml.load without an explicit Loader can
        # construct arbitrary Python objects from the file; prefer
        # yaml.safe_load unless the input is fully trusted.
        products = yaml.load(f)  # load getlinks product/rels file
    # create a 'docs' dir which will hold enclosed dirs with docs per product
    if not os.path.isdir('docs'):
        os.mkdir('docs')
    os.chdir('docs')
    for product in products:
        if not os.path.isdir(product):
            os.mkdir(product)
        os.chdir(product)
        # pprint(os.getcwd())
        for release in products[product]['releases']:
            ctx.obj['PRODUCT'] = product
            # pprint(product + " " + release)
            if release is None:
                release = ''
            # Delegate the actual download to the getlinks command.
            ctx.invoke(getlinks, product=product, release=release)
        os.chdir('..')
def normalize_input(input):
    """Normalize file or string input.

    Returns a list of lines: the file's lines when `input` names a
    readable file (or '-' for stdin), otherwise a one-element list
    containing the raw string.
    """
    try:
        # Fix: the original leaked the file handle; `with` closes it
        # (click's stdin wrapper is safe to use as a context manager).
        with click.open_file(input) as f:
            return f.readlines()
    except IOError:
        # Not a readable path: treat the argument itself as the source.
        return [input]
def init(ctx, directory):
    "Initialize new configuration directory."
    from sentry.runner.settings import discover_configs, generate_settings

    if directory is not None:
        ctx.obj['config'] = directory

    directory, py, yaml = discover_configs(ctx)

    # In this case, the config is pointing directly to a file, so we
    # must maintain old behavior, and just abort
    if yaml is None and os.path.isfile(py):
        # TODO: Link to docs explaining about new behavior of SENTRY_CONF?
        raise click.ClickException("Found legacy '%s' file, so aborting." % click.format_filename(py))

    if yaml is None:
        raise click.ClickException("DIRECTORY must not be a file.")

    if directory and not os.path.exists(directory):
        os.makedirs(directory)

    py_contents, yaml_contents = generate_settings()

    # Write each settings file, asking before clobbering an existing one
    # (yaml first, then py — same order as before).
    for target, contents in ((yaml, yaml_contents), (py, py_contents)):
        if os.path.isfile(target):
            click.confirm("File already exists at '%s', overwrite?" % click.format_filename(target), abort=True)
        with click.open_file(target, 'wb') as fp:
            fp.write(contents)
def get_auth_token(github):
    '''
    Checks the .pgimport file for github authentication key, if it is not present, creates it.
    '''
    if os.path.exists(CFG_PATH):
        CONFIG.read(CFG_PATH)
        otp_auth = CONFIG['github']['auth_token']
        if not otp_auth:
            # No token stored yet: mint one via the API and persist it so
            # subsequent runs reuse it.
            otp_auth = create_auth_token(github)
            CONFIG['github']['auth_token'] = otp_auth.token
            with click.open_file(CFG_PATH, 'w+') as config_file:
                CONFIG.write(config_file)
        return otp_auth
    # NOTE(review): implicitly returns None when the config file does not
    # exist — callers presumably run create_config() first; confirm.
def pull(ctx, filename, **kwargs):
    """ pull config from a device """
    # set filename before kwargs get mangled
    if filename != '-':
        filename = os.path.join(kwargs['output_dir'],
                                filename.format(**kwargs))
    update_context(ctx, kwargs)
    device = ctx.connect(kwargs)
    # Fetch the config before opening the output, so a failed pull
    # doesn't leave an empty file behind.
    pulled = device.pull()
    with click.open_file(filename, 'w') as fobj:
        fobj.write(pulled)
def _compile_report(self):
    """ Builds the report csv. """
    with click.open_file(config.outfile, "w") as report_file:
        writer = csv.writer(report_file)
        # Header row first, then one numbered row per collected error.
        writer.writerow(['index', 'type', 'message'])
        for idx, err in enumerate(self.errors):
            writer.writerow([idx, err.type, err.message])
def password_prompt(ctx, param, value):
    """Prompt for password if ``--password-stdin`` is not used."""
    if ctx.resilient_parsing:
        # Shell-completion parsing pass: do nothing.
        return
    if not value:
        if 'password_stdin' in ctx.params:
            # Read the password from stdin, stripping trailing newlines.
            with click.open_file('-') as fp:
                value = fp.read().strip('\n')
        else:
            value = click.prompt('Password', hide_input=True)
    # NOTE(review): this echoes the password back to the terminal, which
    # defeats hide_input above — confirm this is intentional.
    click.echo(value)
    return value
def gather_project_status(ctx):
    """Inspect the HLS project's report files and return a list of status tags.

    Tags that may appear: csim_pass/csim_fail/csim_done, syn_done,
    cosim_pass/cosim_fail/cosim_done, export_ip_done, export_sysgen_done,
    evaluate_done.
    """
    config = ctx.obj.config
    solution_num = ctx.obj.solution_num
    project_status = []
    # All reports live under the active solution directory; build the
    # common prefix once instead of repeating the concatenation.
    solution_dir = config["project_name"] + "/solution" + str(solution_num)
    top_func = config["top_level_function_name"]
    # Pull details from csim report
    try:
        with click.open_file(solution_dir + "/csim/report/" + top_func + "_csim.log", "r") as f:
            # Pass/Fail info is always in the second last line of the csim report
            status_line = f.readlines()[-2]
            if "0 errors" in status_line.lower():
                project_status.append("csim_pass")
            elif "fail" in status_line.lower():
                project_status.append("csim_fail")
            else:
                project_status.append("csim_done")
        # (redundant f.close() removed — the `with` block closes the file)
    except OSError:
        pass
    # Pull details from csynth report
    if os.path.isfile(solution_dir + "/syn/report/" + top_func + "_csynth.rpt"):
        project_status.append('syn_done')
    # Pull details from cosim report
    try:
        with click.open_file(solution_dir + "/sim/report/" + top_func + "_cosim.rpt", "r") as f:
            # search through cosim report to find out pass/fail status for each language
            for line in f:
                if config["language"] in line.lower():
                    if "pass" in line.lower():
                        project_status.append('cosim_pass')
                    elif "fail" in line.lower():
                        project_status.append('cosim_fail')
            project_status.append('cosim_done')
    except OSError:
        pass
    # Pull details from implementation directory, first the presence of an export...
    if os.path.isdir(solution_dir + "/impl/ip"):
        project_status.append('export_ip_done')
    if os.path.isdir(solution_dir + "/impl/sysgen"):
        project_status.append('export_sysgen_done')
    # ... then the presence of a Vivado evaluate run
    if os.path.isfile(solution_dir + "/impl/report/" + config["language"] + "/" + top_func + "_export.rpt"):
        project_status.append('evaluate_done')
    return project_status


# Function for printing out the project status
def do_checklog(filename, ignore, echo):
    """Scan an Odoo log (file or '-') and fail when non-ignored error records remain."""
    # Drop commented-out patterns before compiling the ignore regexes.
    ignore = [i for i in ignore if not i.startswith('#')]
    _logger.debug("ignored regular expressions:\n%s", '\n'.join(ignore))
    ignore_regexes = [re.compile(i, re.MULTILINE) for i in ignore]
    # Default to echoing the log through when reading from stdin.
    if echo is None and filename == '-':
        echo = True
    with click.open_file(filename) as logfile:
        cur_rec_mo = None   # match object that opened the current record
        cur_rec = []        # accumulated lines of the current record
        error_records = []
        ignored_error_records = []

        def _process_cur_rec():
            # record start, process current record
            if cur_rec_mo and \
                    cur_rec_mo.group('loglevel') not in NON_ERROR_LEVELS:
                record = ''.join(cur_rec)
                for ignore_regex in ignore_regexes:
                    if ignore_regex.search(record):
                        ignored_error_records.append(record)
                        break
                else:
                    # no ignore pattern matched: this is a genuine error
                    error_records.append(record)

        reccount = 0
        for line in logfile:
            if echo:
                click.echo(line, nl=False, color=True)
                sys.stdout.flush()
            line = ANSI_CSI_RE.sub('', line)  # strip ANSI colors
            mo = LOG_START_RE.match(line)
            if mo:
                # a new record begins: flush the previous one first
                reccount += 1
                _process_cur_rec()
                cur_rec_mo = mo
                cur_rec = [line]
            else:
                # continuation line (e.g. traceback) of the current record
                cur_rec.append(line)
        _process_cur_rec()  # last record
        if not reccount:
            raise click.ClickException("No Odoo log record found in input.")
        if error_records or ignored_error_records:
            msg = _render_errors(error_records, ignored_error_records)
            click.echo(msg)
        if error_records:
            raise click.ClickException("Errors detected in log.")
def report(self, with_tester_results: bool = True, to_string: bool = False) -> t.Optional[str]:
    """
    Create an report and output it as configured.

    :param with_tester_results: include the hypothesis tester results
    :param to_string: return the report as a string and don't output it?
    :return: the report string if ``to_string == True``
    """
    # One-element list so the nested closure can mutate the accumulator.
    output = [""]

    def string_printer(line: str, **args):
        # Drop-in replacement for print() that appends to `output` instead;
        # extra kwargs (like file=) are accepted and ignored.
        output[0] += str(line) + "\n"

    print_func = string_printer if to_string else print
    with click.open_file(self.misc["out"], mode='w') as f:
        for block in self.stats_helper.valid_runs():
            assert isinstance(block, RunData)
            print_func("{descr:<20} ({num:>5} single benchmarks)"
                       .format(descr=block.description(),
                               num=len(block.data[block.properties[0]])), file=f)
            for prop in sorted(block.properties):
                mean = np.mean(block[prop])
                # relative standard deviation (stdev / mean)
                stdev = np.std(block[prop]) / mean
                mean_str = str(FNumber(mean, rel_deviation=stdev))
                # NOTE(review): `dev_perc` is passed but unused by the format
                # string; `dev` already carries the relative deviation.
                print_func("\t {prop:<18} mean = {mean:>15s}, deviation = {dev:>5.2%}".format(
                    prop=prop, mean=mean_str, dev=stdev, dev_perc=stdev/mean
                ), file=f)
        if with_tester_results:
            # Equal / unequal / uncertain groupings from the hypothesis tester.
            self._report_list("Equal program blocks",
                              self.stats_helper.get_evaluation(with_equal=True,
                                                               with_uncertain=False,
                                                               with_unequal=False),
                              f, print_func)
            self._report_list("Unequal program blocks",
                              self.stats_helper.get_evaluation(with_equal=False,
                                                               with_uncertain=False,
                                                               with_unequal=True),
                              f, print_func)
            self._report_list("Uncertain program blocks",
                              self.stats_helper.get_evaluation(with_equal=False,
                                                               with_uncertain=True,
                                                               with_unequal=False),
                              f, print_func)
    if to_string:
        return output[0]
def photos(context, path):
    """Adds images to the last article"""
    config = context.obj

    header('Looking for the latest article...')
    article_filename = find_last_article(config['CONTENT_DIR'])
    if not article_filename:
        return click.secho('No articles.', fg='red')
    click.echo(os.path.basename(article_filename))

    header('Looking for images...')
    images = list(find_images(path))
    if not images:
        return click.secho('Found no images.', fg='red')
    for filename in images:
        click.secho(filename, fg='green')
    # Require explicit confirmation before touching the article.
    if not click.confirm('\nAdd these images to the latest article'):
        abort(config)

    url_prefix = os.path.join('(unknown)', IMAGES_PATH)
    images_dir = os.path.join(config['CONTENT_DIR'], IMAGES_PATH)
    os.makedirs(images_dir, exist_ok=True)

    header('Processing images...')
    urls = []
    for filename in images:
        # Normalise the basename (spaces -> dashes, lowercase) for URL use.
        image_basename = os.path.basename(filename).replace(' ', '-').lower()
        urls.append(os.path.join(url_prefix, image_basename))
        image_filename = os.path.join(images_dir, image_basename)
        print(filename, image_filename)
        import_image(filename, image_filename)

    # Build the Markdown snippet; URLs always use forward slashes even on
    # Windows, hence the backslash replacement.
    content = '\n'
    for url in urls:
        url = url.replace('\\', '/')
        content += '\n![image description]({})\n'.format(url)

    header('Adding to article: {}'.format(article_filename))
    # Append the snippet to the article and open it for review.
    with click.open_file(article_filename, 'a') as f:
        f.write(content)
    click.launch(article_filename)
def cmd(ctx, message, **kwargs):
    """The *load* command reads records or payloads from disk files or
    standard input. The files to read are searched with the pattern
    specified by PATH which in the simplest form is an existing file
    name. Other forms may include '*', '?' and character ranges expressed
    by '[]'. A single '-' may be used to read from standard input. Note
    that patterns containg wildcards may need to be escaped to avoid
    shell expansion.

    The default mode of operation is to load files containing NDEF
    records. In '--pack' mode the files are loaded into the payload of
    NDEF records with record type (NDEF Record TNF and TYPE) set to the
    mimetype discovered from the payload and record name (NDEF Record ID)
    set to the filename.

    \b
    Examples:
    ndeftool load message.ndef print
    ndeftool load '*.ndef' print
    cat message.ndef | ndeftool load - print
    ndeftool load --pack /etc/hostname print
    ndeftool load --pack '/etc/cron.daily/*' print
    """
    dmsg(__name__ + ' ' + str(kwargs))

    if message is None:
        message = []

    if kwargs['path'] == '-':
        # '-' bypasses globbing; iterating the one-character string '-'
        # yields exactly one "filename", which click.open_file maps to stdin.
        filenames = kwargs['path']
    else:
        filenames = sorted(glob.iglob(kwargs['path']))

    if len(filenames) == 0:
        info("No files selected by path '%s'." % kwargs['path'])

    for filename in filenames:
        try:
            f = click.open_file(filename, 'rb')
        except (OSError, IOError) as error:
            # Unreadable file: warn and continue with the remaining matches.
            warn(str(error))
        else:
            if kwargs['pack']:
                # --pack: wrap the raw payload in a new NDEF record.
                message.append(pack_file(f))
            else:
                # Default: parse the file as existing NDEF records.
                message.extend(load_file(f, ctx.meta['decode-errors']))
    return message