The following 50 code examples, extracted from open source Python projects, illustrate how to use click.UsageError().
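Before the collected examples, here is a minimal, self-contained sketch of the typical pattern: raise click.UsageError from inside a command when the combination of options the user passed does not make sense, and let click print the message together with the command's usage line and exit with status 2. The `greet` command and its options are hypothetical and are not taken from any of the projects below.

import click


@click.command()
@click.option('--name', help='Name to greet.')
@click.option('--from-file', 'from_file', type=click.Path(exists=True),
              help='Read the name from a file instead.')
def greet(name, from_file):
    """Minimal sketch: reject conflicting or missing options with UsageError."""
    if name and from_file:
        # Conflicting options: click prints the message and the usage line,
        # then exits with status 2.
        raise click.UsageError('--name and --from-file are mutually exclusive.')
    if not name and not from_file:
        raise click.UsageError('Provide either --name or --from-file.')
    if from_file:
        with open(from_file) as fh:
            name = fh.read().strip()
    click.echo('Hello, {}!'.format(name))


if __name__ == '__main__':
    greet()
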
def main(qatif, stats, outdir, cloudmask):
    if not stats and not outdir and not cloudmask:
        raise click.UsageError(
            "Specify --stats, --cloudmask MASK, or --outdir DIR")

    if outdir:
        stats = True

    with rasterio.open(qatif) as src:
        arr = src.read(1)
        profile = src.profile

    if cloudmask:
        write_cloud_mask(arr, profile=profile, cloudmask=cloudmask)

    if stats:
        base = os.path.basename(qatif)
        summary = summary_stats(arr, basename=base, outdir=outdir,
                                profile=profile)
        click.echo(json.dumps(summary, indent=2))

    if outdir:
        click.echo("QA variables written as uint8 tifs to {}".format(outdir),
                   err=True)

def rename_command(source, destination):
    """
    Executor for `globus rename`
    """
    source_ep, source_path = source
    dest_ep, dest_path = destination

    if source_ep != dest_ep:
        raise click.UsageError(('rename requires that the source and dest '
                                'endpoints are the same, {} != {}')
                               .format(source_ep, dest_ep))
    endpoint_id = source_ep

    client = get_client()
    autoactivate(client, endpoint_id, if_expires_in=60)

    res = client.operation_rename(endpoint_id, oldpath=source_path,
                                  newpath=dest_path)
    formatted_print(res, text_format=FORMAT_TEXT_RAW, response_key='message')

def endpoint_search(filter_fulltext, filter_owner_id, filter_scope):
    """
    Executor for `globus endpoint search`
    """
    if filter_scope == 'all' and not filter_fulltext:
        raise click.UsageError(
            'When searching all endpoints (--filter-scope=all, the default), '
            'a full-text search filter is required. Other scopes (e.g. '
            '--filter-scope=recently-used) may be used without specifying '
            'an additional filter.')

    client = get_client()

    owner_id = filter_owner_id
    if owner_id:
        owner_id = maybe_lookup_identity_id(owner_id)

    search_iterator = client.endpoint_search(
        filter_fulltext=filter_fulltext, filter_scope=filter_scope,
        filter_owner_id=owner_id)

    formatted_print(search_iterator, fields=ENDPOINT_LIST_FIELDS,
                    json_converter=iterable_response_to_dict)

def role_create(role, principal, endpoint_id):
    """
    Executor for `globus endpoint role create`
    """
    principal_type, principal_val = principal

    client = get_client()

    if principal_type == 'identity':
        principal_val = maybe_lookup_identity_id(principal_val)
        if not principal_val:
            raise click.UsageError(
                'Identity does not exist. '
                'Use --provision-identity to auto-provision an identity.')
    elif principal_type == 'provision-identity':
        principal_val = maybe_lookup_identity_id(principal_val, provision=True)
        principal_type = 'identity'

    role_doc = assemble_generic_doc(
        'role', principal_type=principal_type, principal=principal_val,
        role=role)

    res = client.add_endpoint_role(endpoint_id, role_doc)
    formatted_print(res, simple_text='ID: {}'.format(res['id']))

def copy_template(template_path: Path, path: Path, variables: dict):
    for d in template_path.iterdir():
        target_path = path / d.relative_to(template_path)
        if d.is_dir():
            copy_template(d, target_path, variables)
        elif target_path.exists():
            # better not overwrite any existing files!
            raise click.UsageError('Target file "{}" already exists. Aborting!'.format(target_path))
        else:
            with Action('Writing {}..'.format(target_path)):
                target_path.parent.mkdir(parents=True, exist_ok=True)
                with d.open() as fd:
                    contents = fd.read()
                template = string.Template(contents)
                contents = template.safe_substitute(variables)
                with target_path.open('w') as fd:
                    fd.write(contents)

def main(source, out, raw, format, view):
    """
    Generate CRCDiagrams from SOURCE, saving them as OUT.
    \n
    The default output format is png.
    \n
    Example:\n
        crc-diagram source_file.py output.png
    """
    if os.path.isdir(source):
        crc_cards = crc_diagram.folder_to_crc(source)
    else:
        crc_cards = crc_diagram.to_crc(source)

    if raw:
        out = path_to_stream(out or sys.stdout, 'w')
        json.dump([crc.to_dict() for crc in crc_cards], out, indent=4)
    else:
        if out is None:
            raise click.UsageError('Missing argument "out".')
        DotRender(crc_cards, format=format).render(out, view=view)

def generate(filename, data_file, output_dir, template_file, skip_first_row):
    if not os.path.exists(filename):
        raise click.UsageError("Layout not found: %s\n" % filename)

    try:
        module = _import_file(filename)
    except (ImportError, ValueError) as e:
        raise click.UsageError("Unable to load %r: %s\n" % (filename, e))

    layout_classes = list(_iter_layout_classes(module))
    if not layout_classes:
        raise click.UsageError("No layout found in file: %s\n" % filename)
    layout_cls = layout_classes.pop()

    click.secho('Generating documents...', fg='white')

    layout = layout_cls(data_file, output_dir, template_file, skip_first_row)

    # Check files/paths
    layout._check_paths()

    data = csv.reader(open(layout.data_file))
    if layout.skip_first_row:
        next(data)

    # TODO: Show info about rows count
    for row in data:
        layout.generate_document(row)

def resolve_command(self, ctx, args):
    """
    Override click's ``resolve_command`` method and append
    *Did you mean ...* suggestions to the raised exception message.
    """
    try:
        return super(AliasedGroup, self).resolve_command(ctx, args)
    except click.exceptions.UsageError as error:
        error_msg = str(error)
        original_cmd_name = click.utils.make_str(args[0])
        matches = difflib.get_close_matches(
            original_cmd_name, self.list_commands(ctx), self.max_suggestions,
            self.cutoff)
        if matches:
            error_msg += '{0}{0}Did you mean one of these?{0} {1}'.format(
                os.linesep, '{0} '.format(os.linesep).join(matches))

        raise click.exceptions.UsageError(error_msg, error.ctx)

def freeze_app(app, freezer, path, base_url):
    if not base_url:
        raise click.UsageError('No base URL provided, use --base-url')
    print('Generating HTML...')
    app.config['FREEZER_DESTINATION'] = path
    app.config['FREEZER_BASE_URL'] = base_url
    app.config['SERVER_NAME'] = urllib.parse.urlparse(base_url).netloc

    # make sure Frozen Flask warnings are treated as errors
    warnings.filterwarnings('error', category=flask_frozen.FrozenFlaskWarning)
    try:
        freezer.freeze()
    except flask_frozen.FrozenFlaskWarning as w:
        print('Error:', w, file=sys.stderr)
        sys.exit(1)

def get_dns_name_for_hosted_zone_id(self, hosted_zone_id):
    """
    gets the dns name associated with the 'hosted_zone_id'.

    ensure:
        self.hosted_zone_id == hosted_zone_id
        self.dns_name[-1] == '.'
    """
    assert hosted_zone_id is not None

    self.hosted_zone_id = hosted_zone_id
    try:
        response = self.route53.get_hosted_zone(Id=hosted_zone_id)
        self.dns_name = response['HostedZone']['Name']
    except ClientError as e:
        raise click.UsageError('%s' % e)

    log.info('found hosted zone %s for dns name %s' % (self.hosted_zone_id, self.dns_name))

    assert self.dns_name[-1] == '.'
    assert self.hosted_zone_id is not None

def get_hosted_zone_id_for_dns_name(self, dns_name):
    """
    gets the hosted_zone_id associated with the 'dns_name'.

    require:
        there is exactly 1 hosted zone for 'dns_name' in Route53.
    ensure:
        self.hosted_zone_id == hosted_zone_id
        self.dns_name[-1] == '.'
        self.dns_name[-1] == dns_name
    """
    self.dns_name = dns_name if dns_name[-1] == '.' else '%s.' % dns_name
    response = self.route53.list_hosted_zones_by_name(DNSName=self.dns_name)
    zones = [r for r in response['HostedZones'] if r['Name'] == self.dns_name]
    if len(zones) == 1:
        self.hosted_zone_id = zones[0]['Id'].split('/')[-1]
    elif len(zones) > 1:
        raise click.UsageError(
            'There are %d hosted zones for the DNS name %s, please specify '
            '--hosted-zone-id' % (len(zones), self.dns_name))
    else:
        raise click.UsageError('There are no hosted zones for the DNS name %s' % self.dns_name)

    assert self.dns_name[-1] == '.'
    assert self.hosted_zone_id is not None

    log.info('found dns name %s for hosted zone id %s' % (self.dns_name, self.hosted_zone_id))

def import_(ctx, dirs):
    for dir in dirs:
        if not os.path.isdir(dir):
            raise click.UsageError('<%s> must be a directory' % dir)

    def do_import(report_name, instances):
        count = 0
        try:
            for instance in instances:
                import_report_instance(ctx, report_name, instance)
                count += 1
        finally:
            echo_info('Imported %d %s instances' % (count, report_name))

    def instances(dir):
        for filename in glob.glob(os.path.join(dir, '*.json')):
            with open(filename) as f:
                yield json.loads(f.read())

    for dir in dirs:
        report_name = dir.rstrip('/').split('/')[-1]
        do_import(report_name, instances(dir))

def jsoncsv(output, input, expand_, restore_, safe, separator):
    if expand_ and restore_:
        raise click.UsageError('can not choose both, default is `-e`')

    func = expand
    if restore_:
        func = restore

    for line in input:
        obj = json.loads(line)
        new = func(obj, separator=separator, safe=safe)
        content = json.dumps(new, ensure_ascii=False).encode('utf-8')
        output.write(content)
        output.write('\n')

    input.close()
    output.close()

def parent(ctx, input, depth):
    """Takes a [x, y, z] tile as input and writes its parent to stdout
    in the same form.

    $ echo "[486, 332, 10]" | mercantile parent

    Output:

    [243, 166, 9]
    """
    src = normalize_input(input)
    for line in iter_lines(src):
        tile = json.loads(line)[:3]
        if tile[2] - depth < 0:
            raise click.UsageError("Invalid parent level: {0}".format(tile[2] - depth))
        for i in range(depth):
            tile = mercantile.parent(tile)
        output = json.dumps(tile)
        click.echo(output)

def run(s, p):
    if s:
        server = micro_server("s1", auri=AMQ_URI)

        @server.service("foobar")
        def h(a):
            print(a, os.getpid())
            return {"b": a}

        server.start_service(2, daemon=False)
    elif p:
        if not os.path.isfile(p) or os.path.splitext(p)[1] != '.yaml':
            raise click.BadParameter('the param must be yaml config')
        w = WORK_FRAME(auri=AMQ_URI, service_group_conf=p)
        w.frame_start()
    else:
        raise click.UsageError(
            'Could not find other command. You can run kael run --help to see more information')

def stop(counters, all=False):
    """
    Stop one or more in-progress executions.
    """
    project = get_project(require=True)
    params = {'project': project.id}
    if counters and all:
        raise click.UsageError('Pass either an execution # or `--all`, not both.')
    elif counters:
        params['counter'] = sorted(IntegerRange.parse(counters).as_set())
    elif all:
        params['status'] = 'incomplete'
    else:
        warn('Nothing to stop (pass #s or `--all`)')
        return 1

    for execution in request('get', '/api/v0/executions/', params=params).json()['results']:
        click.echo('Stopping #{counter}... '.format(counter=execution['counter']), nl=False)
        resp = request('post', execution['urls']['stop'])
        click.echo(resp.text)

    success('Done.')

def validate_git_specifier(refspec, branch, commit, tag):
    """
    Validate that the set of specifiers given is consistent. The set is valid if:
      - only a branch is given
      - only a commit is given
      - only a tag is given
      - a refspec and target non-master branch is given

    :param refspec: provided refspec like 'pull/1/head'
    :param branch: provided branch like 'master'
    :param commit: provided commit SHA like '2cbd73cbd5aacc965ecfa480fa90164a85191489'
    :param tag: provided tag like 'v1.3.0-rc2'
    """
    if commit and (refspec or branch or tag):
        raise UsageError('If a commit is specified, neither a refspec, branch, or tag can also be specified.')

    if tag and (commit or refspec or branch):
        raise UsageError('If a tag is specified, neither a refspec, branch, or commit can also be specified.')

    if refspec and not branch:
        raise UsageError('If a refspec is specified, the name of the branch to create for it is required.')

    if refspec and branch == 'master':
        raise UsageError('The branch specified for a refspec cannot be the master branch.')

def stop_executions(config, context_id, endpoint, all_contexts):
    """Stop running executions."""
    client = from_config(config, endpoint=endpoint)

    if not bool(context_id) ^ all_contexts:
        raise click.UsageError(
            'Either specify context id or use --all-contexts')

    if context_id:
        contexts = (client.contexts[cid] for cid in context_id)
    else:
        contexts = client.contexts

    for context in contexts:
        for execution in context.executions:
            try:
                click.echo(
                    'Stopping execution {0.id} on context {1.id} ... '.format(
                        execution, context),
                    nl=False)
                execution.stop()
                click.secho('OK', fg='green')
            except errors.APIError:
                click.secho('FAIL', fg='red')

def extend(from_region: str, to_region: str, cluster_name: str, ring_size: int,
           dc_suffix: str, num_tokens: int, instance_type: str,
           volume_type: str, volume_size: int, volume_iops: int,
           no_termination_protection: bool, use_dmz: bool, hosted_zone: str,
           artifact_name: str, docker_image: str, environment: list,
           sns_topic: str, sns_email: str):
    if from_region != to_region and not use_dmz:
        raise click.UsageError('Extending to a new region requires --use-dmz')
    extend_cluster(options=locals())

def validate_artifact_version(options: dict) -> dict:
    conflict_options_msg = """Conflicting options: --artifact-name and --docker-image cannot be specified at the same time"""

    if not options['docker_image']:
        if not options['artifact_name']:
            options['artifact_name'] = 'planb-cassandra-3.0'
        image_version = get_latest_docker_image_version(options['artifact_name'])
        docker_image = 'registry.opensource.zalan.do/stups/{}:{}' \
                       .format(options['artifact_name'], image_version)
        info('Using docker image: {}'.format(docker_image))
    else:
        if options['artifact_name']:
            raise click.UsageError(conflict_options_msg)
        image_version = options['docker_image'].split(':')[-1]
        docker_image = options['docker_image']

    return dict(options,
                docker_image=docker_image,
                image_version=image_version)

def backport(self):
    if not self.branches:
        raise click.UsageError("At least one branch must be specified.")
    self.fetch_upstream()

    for maint_branch in self.sorted_branches:
        click.echo(f"Now backporting '{self.commit_sha1}' into '{maint_branch}'")

        cherry_pick_branch = self.get_cherry_pick_branch(maint_branch)
        self.checkout_branch(maint_branch)
        commit_message = ""
        try:
            self.cherry_pick()
            commit_message = self.amend_commit_message(cherry_pick_branch)
        except subprocess.CalledProcessError as cpe:
            click.echo(cpe.output)
            click.echo(self.get_exit_message(maint_branch))
        except CherryPickException:
            click.echo(self.get_exit_message(maint_branch))
            raise
        else:
            if self.push:
                self.push_to_remote(maint_branch, cherry_pick_branch, commit_message)
                self.cleanup_branch(cherry_pick_branch)
            else:
                click.echo(f"""
Finished cherry-pick {self.commit_sha1} into {cherry_pick_branch} \U0001F600

--no-push option used.
... Stopping here.
To continue and push the changes:
    $ cherry_picker --continue

To abort the cherry-pick and cleanup:
    $ cherry_picker --abort
""")

def do_pylintcmd(load_plugins, rcfile, module, expected, pylint_options):
    # import pdb; pdb.set_trace()
    if not module:
        module = []
        candidate_addons_dirs = (
            opj('odoo', 'addons'),
            'odoo_addons',
            '.',
        )
        for candidate_addons_dir in candidate_addons_dirs:
            if os.path.isdir(candidate_addons_dir):
                module.extend(
                    opj(candidate_addons_dir, addon)
                    for addon in get_installable_addons(candidate_addons_dir)
                )
    if not module:
        raise click.UsageError("Please provide module or package "
                               "to lint (--module).")
    cmd = [
        '--load-plugins', load_plugins,
        '--rcfile', rcfile,
    ] + list(pylint_options) + list(module)
    log_cmd(['pylint'] + cmd, level=logging.INFO)
    lint_res = pylint.lint.Run(cmd[:], exit=False)
    sys.stdout.flush()
    sys.stderr.flush()
    expected = _consolidate_expected(rcfile, expected)
    fails, no_fails = _get_failures(lint_res.linter.stats, expected)
    if fails or no_fails:
        msg = cmd_string(['pylint'] + cmd)
        msg += '\n'
        msg += _failures_to_str(fails, no_fails)
        click.echo('\n')
        click.echo(msg)
    if fails:
        raise click.ClickException("pylint errors detected.")

def remove(ctx, service_names, is_all):
    """
    Remove a service from 21 sell

    \b
    Removing a service
    $ 21 sell remove <service_name>

    \b
    Removing all services from 21 sell
    $ 21 sell remove --all
    """
    if not service_names and is_all is False:
        raise click.UsageError('No service selected.', ctx=ctx)

    manager = ctx.obj['manager']
    logger.info(click.style("Removing services.", fg=cli_helpers.TITLE_COLOR))

    def service_successfully_removed_hook(tag):
        cli_helpers.print_str(tag, ["Removed"], "TRUE", True)

    def service_does_not_exists_hook(tag):
        cli_helpers.print_str(tag, ["Doesn't exist"], "FALSE", False)

    def service_failed_to_remove_hook(tag):
        cli_helpers.print_str(tag, ["Failed to remove"], "FALSE", False)

    if is_all:
        services_to_remove = manager.available_user_services()
    else:
        services_to_remove = service_names

    for service_name in services_to_remove:
        manager.remove_service(service_name,
                               service_successfully_removed_hook,
                               service_does_not_exists_hook,
                               service_failed_to_remove_hook)

def task_event_list(task_id, limit, filter_errors, filter_non_errors):
    """
    Executor for `globus task-event-list`
    """
    client = get_client()

    # cannot filter by both errors and non errors
    if filter_errors and filter_non_errors:
        raise click.UsageError("Cannot filter by both errors and non errors")
    elif filter_errors:
        filter_string = "is_error:1"
    elif filter_non_errors:
        filter_string = "is_error:0"
    else:
        filter_string = ""

    event_iterator = client.task_event_list(
        task_id, num_results=limit, filter=filter_string)

    formatted_print(event_iterator,
                    fields=(('Time', 'time'), ('Code', 'code'),
                            ('Is Error', 'is_error'), ('Details', 'details')),
                    json_converter=iterable_response_to_dict)

def create_command(principal, permissions, endpoint_plus_path):
    """
    Executor for `globus endpoint permission create`
    """
    if not principal:
        raise click.UsageError(
            'A security principal is required for this command')

    endpoint_id, path = endpoint_plus_path
    principal_type, principal_val = principal

    client = get_client()

    if principal_type == 'identity':
        principal_val = maybe_lookup_identity_id(principal_val)
        if not principal_val:
            raise click.UsageError(
                'Identity does not exist. '
                'Use --provision-identity to auto-provision an identity.')
    elif principal_type == 'provision-identity':
        principal_val = maybe_lookup_identity_id(principal_val, provision=True)
        principal_type = 'identity'

    rule_data = assemble_generic_doc(
        'access', permissions=permissions, principal=principal_val,
        principal_type=principal_type, path=path)

    res = client.add_endpoint_acl_rule(endpoint_id, rule_data)
    formatted_print(res, text_format=FORMAT_TEXT_RECORD,
                    fields=[('Message', 'message'), ('Rule ID', 'access_id')])

def exclusive(ctx_params, exclusive_params, error_message):
    """
    Enables defining mutually exclusive options.

    https://gist.github.com/thebopshoobop/51c4b6dce31017e797699030e3975dbf

    :param ctx_params:
    :param exclusive_params:
    :param error_message:
    :return:
    """
    if sum([1 if ctx_params[p] else 0 for p in exclusive_params]) > 1:
        raise click.UsageError(error_message)

def _globals(command):
    def proxy(verbose, alwaysprompt, neverprompt, **kwargs):
        # handle these global options
        setverbose(verbose)
        if alwaysprompt:
            if neverprompt:
                raise UsageError("--alwaysprompt and --neverprompt options"
                                 " cannot be used together")
            setwantprompt(PROMPT_ALWAYS)
        elif neverprompt:
            setwantprompt(PROMPT_NEVER)
        # pass all other arguments on to the command
        command(**kwargs)

    proxy.__name__ = command.__name__
    proxy.__doc__ = command.__doc__
    proxy = option('-a', '--alwaysprompt', is_flag=True,
                   help="Always prompt the user to answer questions, even"
                        " named questions that they have answered on previous runs"
                   )(proxy)
    proxy = option('-n', '--neverprompt', is_flag=True,
                   help="Never prompt the user to answer questions. Questions"
                        " will be answered automatically using the user's previous"
                        " answer or the `noprompt` value.")(proxy)
    proxy = option('-v', '--verbose', 'verbose', is_flag=True,
                   help="Produce extra output")(proxy)
    return proxy

def _check_paths(self):
    if not os.path.exists(self.data_file):
        raise click.UsageError("Data file not found: %s\n" % self.data_file)

    if self.template_file is not None and not os.path.exists(self.template_file):
        raise click.UsageError("Template file not found: %s\n" % self.template_file)

    if os.path.exists(self.output_dir) and not os.access(self.output_dir, os.W_OK):
        raise click.UsageError("Output is not writable: %s\n" % self.output_dir)

    if not os.path.exists(self.output_dir):
        os.makedirs(self.output_dir)

def handle_parse_result(self, ctx, opts, args):
    if (self.name in opts) and self.mutually_exclusive.intersection(opts):
        raise click.UsageError('Illegal usage: {0}'.format(self._message))
    return super(MutuallyExclusiveOption, self).handle_parse_result(ctx, opts, args)

def main(az, workflow, resume, config):
    try:
        c = Config(az=az, workflow=workflow, config_file=config)
    except ConfigError as e:
        raise click.UsageError(str(e))
    p = Provisioner(c)
    p.run(resume)

def validate_parent_id(ctx, param, value):
    if ctx.params['resource_id'].startswith('sg-') and not value:
        raise click.UsageError(
            "Security Group lock status requires --parent-id flag")
    return value

def check_resp(r):
    if r.json()['success']:
        return
    if r.status_code == 401:
        raise click.UsageError('API call error: wrong API key')
    raise click.UsageError('API call error:\n%s' % pformat(r.json()['details']))

def req_api_key(self):
    if self.api_key is None:
        raise click.UsageError('"moniqueio --api-key" argument is not specified')
    return self.api_key

def _check_reserve_usage(empty, memory, cpu, disk):
    """Checks params constraints for reserve verb."""
    if empty:
        if memory:
            raise click.UsageError('Cannot combine --empty and --memory')
        if cpu:
            raise click.UsageError('Cannot combine --empty and --cpu')
        if disk:
            raise click.UsageError('Cannot combine --empty and --disk')

def _check_tenant_exists(restapi, allocation):
    """Check if tenant exists."""
    tenant_url = '/tenant/{}'.format(allocation)

    # Check if tenant exists.
    try:
        restclient.get(restapi, tenant_url).json()
    except restclient.NotFoundError:
        raise click.UsageError(
            'Allocation not found, '
            'run allocation configure {} --systems ...'.format(allocation))

def init():
    """Return top level command handler."""

    @click.command()
    @click.option('--run/--no-run', is_flag=True, default=False)
    @click.option('--treadmill-id', help='Treadmill admin user.')
    @click.pass_context
    def spawn(ctx, treadmill_id, run):
        """Installs Treadmill spawn."""
        ctx.obj['PARAMS']['zookeeper'] = context.GLOBAL.zk.url
        ctx.obj['PARAMS']['ldap'] = context.GLOBAL.ldap.url

        dst_dir = ctx.obj['PARAMS']['dir']
        profile = ctx.obj['PARAMS'].get('profile')

        bootstrap.wipe(
            os.path.join(dst_dir, 'wipe_me'),
            os.path.join(dst_dir, 'bin', 'wipe_spawn.sh')
        )

        run_script = None
        if run:
            run_script = os.path.join(dst_dir, 'bin', 'run.sh')

        if treadmill_id:
            ctx.obj['PARAMS']['treadmillid'] = treadmill_id

        if not ctx.obj['PARAMS'].get('treadmillid'):
            raise click.UsageError(
                '--treadmill-id is required, '
                'unable to derive treadmill-id from context.')

        bootstrap.install(
            'spawn',
            dst_dir,
            ctx.obj['PARAMS'],
            run=run_script,
            profile=profile,
        )

    return spawn

def _blackout_server(zkclient, server, reason):
    """Blackout server."""
    if not reason:
        raise click.UsageError('--reason is required.')

    path = z.path.blackedout_server(server)
    zkutils.ensure_exists(
        zkclient,
        path,
        acl=[zkutils.make_host_acl(server, 'rwcda')],
        data=str(reason)
    )
    presence.kill_node(zkclient, server)

def handle_parse_result(self, ctx, opts, args):
    if self.mutually_exclusive.intersection(opts) and self.name in opts:
        raise click.UsageError(
            "Illegal usage: `{}` is mutually exclusive with "
            "arguments `{}`.".format(
                self.name,
                ', '.join(self.mutually_exclusive)
            )
        )
    if self.name == _OPTIONS_FILE and self.name in opts:
        _file = opts.pop(_OPTIONS_FILE)
        for _param in ctx.command.params:
            opts[_param.name] = _param.default or \
                _param.value_from_envvar(ctx) or ''
        with open(_file, 'r') as stream:
            data = yaml.load(stream)
        _command_name = ctx.command.name
        if data.get(_command_name, None):
            opts.update(data[_command_name])
        else:
            raise click.BadParameter(
                'Manifest file should have %s scope' % _command_name
            )
        opts['vpc_id'] = opts.pop('vpc_name')
        ctx.params = opts
    return super().handle_parse_result(ctx, opts, args)

def error(self, message):
    raise click.UsageError(message)

def get_auto_shell():
    """Return the shell that is calling this process"""
    try:
        import psutil
        parent = psutil.Process(os.getpid()).parent()
        if platform.system() == 'Windows':
            parent = parent.parent() or parent
        return parent.name().replace('.exe', '')
    except ImportError:
        raise click.UsageError("Please explicitly give the shell type or install the psutil package to activate the"
                               " automatic shell detection.")

def ner(**kwargs):
    kwargs["no_standardize"] = not kwargs["no_standardize"]
    kwargs["no_convert_ions"] = not kwargs["no_convert_ions"]
    kwargs["no_header"] = not kwargs["no_header"]
    kwargs["no_normalize_text"] = not kwargs["no_normalize_text"]
    kwargs["no_annotation"] = not kwargs["no_annotation"]

    is_output_file = bool(kwargs["output"])

    stdin = click.get_text_stream("stdin")
    input_text = ""
    if not stdin.isatty():
        kwargs["input_file"] = ""
        input_text = click.get_text_stream("stdin").read().strip()

    if not input_text and not kwargs["input_file"]:
        raise click.UsageError("Cannot perform NER: stdin is empty and input file is not provided.")

    kwargs["opsin_types"] = get_opsin_types(kwargs["opsin_types"])

    init_kwargs = get_kwargs(kwargs, KWARGS_CHS_INIT)
    process_kwargs = get_kwargs(kwargs, KWARGS_CHS_PROCESS)

    chemspot = ChemSpot(**init_kwargs)
    result = chemspot.process(input_text=input_text, **process_kwargs)

    if kwargs["dry_run"]:
        print(result)
        exit(0)

    if kwargs["raw_output"]:
        print(result["stdout"])
        eprint(result["stderr"])
        exit(0)

    if not is_output_file:
        print(dict_to_csv(result["content"], csv_delimiter=kwargs["delimiter"],
                          write_header=kwargs["no_header"]))

def convert(**kwargs):
    kwargs["no_header"] = not kwargs["no_header"]
    kwargs["no_normalize_plurals"] = not kwargs["no_normalize_plurals"]
    kwargs["no_standardize"] = not kwargs["no_standardize"]
    kwargs["opsin_no_allow_acids_without_acid"] = not kwargs["opsin_no_allow_acids_without_acid"]
    kwargs["opsin_no_detailed_failure_analysis"] = not kwargs["opsin_no_detailed_failure_analysis"]
    kwargs["opsin_no_allow_radicals"] = not kwargs["opsin_no_allow_radicals"]
    kwargs["opsin_no_allow_uninterpretable_stereo"] = not kwargs["opsin_no_allow_uninterpretable_stereo"]

    is_output_file = bool(kwargs["output"])

    stdin = click.get_text_stream("stdin")
    input_text = ""
    if not stdin.isatty():
        kwargs["input_file"] = ""
        input_text = click.get_text_stream("stdin").read().strip()

    if not input_text and not kwargs["input_file"]:
        raise click.UsageError("Cannot do conversion: stdin is empty and input file is not provided.")

    init_kwargs = get_kwargs(kwargs, KWARGS_OPSIN_INIT)
    process_kwargs = get_kwargs(kwargs, KWARGS_OPSIN_PROCESS)

    opsin = OPSIN(**init_kwargs)
    result = opsin.process(input=input_text,
                           output_formats=["smiles", "inchi", "inchikey"],
                           **process_kwargs)

    if kwargs["dry_run"]:
        print(result)
        exit(0)

    if kwargs["raw_output"]:
        print(result["stdout"])
        eprint(result["stderr"])
        exit(0)

    if not is_output_file:
        print(dict_to_csv(result["content"], csv_delimiter=kwargs["delimiter"],
                          write_header=kwargs["no_header"]))

def main(word, word_dist, word_dist_rate, keyboard_name):
    """
    keedi, keyboard usage stats for words

    Invoke keedi on a word with the --word <word> option or pass words in
    on stdin.
    """
    if word:
        words = [word]
    elif not sys.stdin.isatty():
        words = filter(None, (transform_word(w) for w in sys.stdin))
    else:
        raise click.ClickException("no --word specified or standard input given.")

    for w in words:
        word_dist_computation = None
        keyboard = KEYBOARDS[keyboard_name]

        wd = None
        if word_dist:
            wd = word_distance(w, keyboard)

        wdr = None
        if word_dist_rate:
            try:
                wdr = word_distance_rate(w, keyboard,
                                         precomputation=word_dist_computation)
            except IncomputableRateException as e:
                click.UsageError(e).show()
                if sys.stdin.isatty():
                    sys.exit(1)
                else:
                    continue

        click.echo('\t'.join(item for item in (str(wd), str(wdr), w) if item))

    sys.exit(0)

def main():
    try:
        cli.add_command(sync_command)
        cli.add_command(update_password)
        cli.add_command(daemon)
        cli.add_command(edit)
        cli.add_command(browserhelp)
        cli(standalone_mode=False)
    except click.UsageError as e:
        e.show()
        exit(1)
    except (exceptions.Error, click.ClickException) as e:
        logger.error(e)
        logger.debug(e, exc_info=True)
        exit(1)

def init(ctx):
    """Initializes the database."""
    db = ctx.obj.db
    db_url = db.url

    # Check if the database already exists
    if is_sqlitedb_url(db_url) and sqlitedb_present(db_url):
        raise click.UsageError("Refusing to overwrite database "
                               "at {}".format(db_url))
    db.reset()


# -------------------------------------------------------------------------
# User commands

def user(ctx):
    """Subcommand to manage users."""
    db = ctx.obj.db
    db_url = db.url

    # sqlite driver for sqlalchemy creates an empty file on commit as a side
    # effect. We don't want this creation to happen, so before attempting
    # the creation we stop short if we already find out that the file is
    # missing and cannot possibly be initialized.
    if is_sqlitedb_url(db_url) and not sqlitedb_present(db_url):
        raise click.UsageError("Could not find database at {}".format(db_url))

def app(ctx):
    """Subcommand to manage applications."""
    db = ctx.obj.db
    db_url = db.url

    if is_sqlitedb_url(db_url) and not sqlitedb_present(db_url):
        raise click.UsageError("Could not find database at {}".format(db_url))

def cuv(ctx, coverage_fname, exclude, branch):
    """
    Cuv'ner provides ways to visualize your project's coverage data.

    Everything works on the console and assumes a unicode and 256-color
    capable terminal.

    There must be a .coverage file which is loaded for coverage data; it
    is assumed to be in the top level of your source code checkout.
    """
    if coverage_fname is None:
        coverage_fname = find_coverage_data('.')
        # coverage_fname still could be None

    cfg = Config()
    ctx.obj = cfg

    cfg.nice_width = min(80, click.get_terminal_size()[0])
    cfg.exclude = exclude
    cfg.branch = branch

    if coverage_fname is not None:
        cfg.data = coverage.Coverage(data_file=coverage_fname)
        cfg.data.load()
    else:
        raise click.UsageError(
            "No coverage data. Do you have a .coverage file?"
        )

def url_validation(cls, url):
    r = re.match(r"^https?://[\w\-\.]+\.[a-z]{2,6}\.?(/[\w\.]*)*/?$", url)
    if r is None:
        raise click.UsageError('Please, type valid URL')
    return url

def test_persistfile_exists(self, filename, add_postfix=True):
    filename = self._get_persist_filename(filename, add_postfix=add_postfix)
    if os.path.isfile(filename):
        raise click.UsageError("File '%s' already exists" % filename)