我们从Python开源项目中,提取了以下16个代码示例,用于说明如何使用click.BadOptionUsage()。
def cli(*args, **kwargs):
    """ CSVtoTable commandline utility. """
    # Flow: convert the input CSV, then either serve the result in a browser
    # or save it to the requested output file. Exactly one of the two
    # destinations is used; "serve" wins when both are given.
    content = convert.convert(kwargs["input_file"], **kwargs)

    if kwargs["serve"]:
        # Serve the temporary file in the browser; nothing is written to disk
        # by this function in that case.
        convert.serve(content)
        return

    output_file = kwargs["output_file"]
    if not output_file:
        # Neither a server nor an output file was requested.
        # NOTE(review): click >= 7.0 expects
        # BadOptionUsage(option_name, message); confirm the pinned version.
        raise click.BadOptionUsage("Missing argument \"output_file\".")

    # Only ask for confirmation when the caller did not pass "overwrite".
    if not (kwargs["overwrite"] or prompt_overwrite(output_file)):
        raise click.Abort()

    convert.save(output_file, content)
    click.secho("File converted successfully: {}".format(output_file),
                fg="green")
def onlycopy(ctx, srcdir, srcfile, checkdir, relpath, failonerror,
             include_regex, include_src_regex, include_dst_regex):
    """ onlycopy command prints list of all the files that aren't in the srcdir """
    # At least one source (directory or file list) is required. The previous
    # message here was copy-pasted from the regex check and was misleading.
    # NOTE(review): click >= 7.0 expects BadOptionUsage(option_name, message);
    # confirm the pinned click version.
    if not (srcdir or srcfile):
        raise click.BadOptionUsage(
            "Either srcdir or srcfile must be specified.")

    # Validate the regex options. A general include_regex, when given,
    # overrides both the source and the destination regex.
    check_regex(include_regex)
    if include_regex is not None:
        include_src_regex = include_regex
        include_dst_regex = include_regex
    check_regex(include_src_regex)
    check_regex(include_dst_regex)

    # Capture the base directory before producing any results so relative
    # paths are computed against the directory the command started in.
    basepath = os.getcwd() if relpath else None
    for path in onlycopy_yield(srcdir, checkdir, failonerror,
                               src_regex=include_src_regex,
                               dst_regex=include_dst_regex,
                               srcfile=srcfile):
        click.echo(os.path.relpath(path, basepath) if relpath else path)
def debug_type(s):
    """Parse a debug-flag string into a list of flag names.

    *s* is split on runs of spaces, commas and semicolons; empty pieces are
    dropped. Every remaining name must be in ``VALID_DEBUGS``. On success the
    list is stored in the module-level ``DEBUGS`` and returned.

    Raises:
        click.BadOptionUsage: for the first name not in ``VALID_DEBUGS``
            (``DEBUGS`` is left untouched in that case).
    """
    # There is probably a more native way to get click to do this.
    global DEBUGS

    requested = []
    for piece in re.split(r"[ ,;]+", s):
        name = piece.strip()
        if not name:
            continue
        if name not in VALID_DEBUGS:
            # NOTE(review): click >= 7.0 expects
            # BadOptionUsage(option_name, message); confirm the pinned version.
            raise click.BadOptionUsage(f"--debug={name}?? Choose from {', '.join(VALID_DEBUGS)}")
        requested.append(name)

    DEBUGS = requested
    return requested
def required_exactly_one_of(*arg_names):
    """Build a decorator that requires exactly one of *arg_names* to be set.

    The decorated function is called unchanged when exactly one of the named
    keyword arguments is present with a value other than ``None``; otherwise
    ``click.BadOptionUsage`` is raised. Only keyword arguments are inspected.
    """
    def decorator(fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            supplied = [name for name in arg_names
                        if kwargs.get(name) is not None]
            if len(supplied) != 1:
                # NOTE(review): click >= 7.0 expects
                # BadOptionUsage(option_name, message); confirm the version.
                raise click.BadOptionUsage('Please specify exactly one of %s ' % ', '.join(arg_names))
            return fn(*args, **kwargs)
        return wrapper
    return decorator
def clip(bounds, reference, **kwargs):
    # Clip elevation data to explicit bounds, or to bounds imported from a
    # reference when no explicit bounds are given. Remaining keyword
    # arguments are forwarded to elevation.clip unchanged.
    if not (bounds or reference):
        # NOTE(review): click >= 7.0 expects
        # BadOptionUsage(option_name, message); confirm the pinned version.
        raise click.BadOptionUsage("One of --bounds or --reference must be supplied.")

    clip_bounds = bounds if bounds else spatial.import_bounds(reference)
    elevation.clip(clip_bounds, **kwargs)
def channel_mask(channels):
    """Return an integer bitmask with bit ``c`` set for every channel ``c``.

    Each channel must lie in the inclusive range 11..26; the first one that
    does not raises ``click.BadOptionUsage``. An empty iterable yields 0.
    """
    bits = 0
    for number in channels:
        if not (11 <= number <= 26):
            # NOTE(review): click >= 7.0 expects
            # BadOptionUsage(option_name, message); confirm the version.
            raise click.BadOptionUsage("channels must be from 11 to 26")
        bits = bits | (1 << number)
    return bits
def separator_type(sep):
    """Validate a separator value and return it unchanged.

    Rejects values longer than one character and values equal to the
    module-level ``unit_char``; both cases raise ``click.BadOptionUsage``.
    """
    import click

    if len(sep) > 1:
        problem = 'separator can only be a char'
    elif sep == unit_char:
        problem = 'separator can not be `\\` '
    else:
        return sep

    # NOTE(review): click >= 7.0 expects BadOptionUsage(option_name, message);
    # confirm the pinned click version.
    raise click.BadOptionUsage(problem)
def check_regex(regexstring):
    """Check that *regexstring* compiles as a regular expression.

    ``None`` is accepted and skipped. The pattern is compiled with
    ``re.IGNORECASE``; the compiled object is discarded because only
    validity matters here.

    Raises:
        click.BadOptionUsage: if compiling the pattern fails.
    """
    if regexstring is None:
        return

    try:
        re.compile(regexstring, flags=re.IGNORECASE)
    except Exception:
        # ``Exception`` rather than a bare ``except`` so KeyboardInterrupt
        # and SystemExit are not turned into a usage error.
        # NOTE(review): click >= 7.0 expects
        # BadOptionUsage(option_name, message); confirm the pinned version.
        raise click.BadOptionUsage("The regex didn't compile. For correct usage"
                                   " see: https://docs.python.org/2/library/re.html")
def revoke(ctx, image, user, revoke_all, allow_home, allow_view, volume):
    """Revokes access to application identified by IMAGE to a specific user USER and specified parameters."""
    # A volume specification implies the "common" policy flag; without one the
    # volume source/target/mode stay None and are matched as such.
    allow_common = volume is not None
    source = target = mode = None
    if allow_common:
        try:
            source, target, mode = parse_volume_string(volume)
        except ValueError as e:
            raise click.BadOptionUsage("volume", str(e))

    session = ctx.obj.session
    with orm.transaction(session):
        # .one() raises if the application or user row is missing/ambiguous.
        orm_app = session.query(orm.Application).filter(
            orm.Application.image == image).one()
        orm_user = session.query(orm.User).filter(
            orm.User.name == user).one()

        # Every deletion is scoped to this application/user pair.
        criteria = [
            orm.Accounting.application == orm_app,
            orm.Accounting.user == orm_user,
        ]

        if not revoke_all:
            # Narrow the deletion to the single policy matching the options.
            orm_policy = session.query(orm.ApplicationPolicy).filter(
                orm.ApplicationPolicy.allow_home == allow_home,
                orm.ApplicationPolicy.allow_common == allow_common,
                orm.ApplicationPolicy.allow_view == allow_view,
                orm.ApplicationPolicy.volume_source == source,
                orm.ApplicationPolicy.volume_target == target,
                orm.ApplicationPolicy.volume_mode == mode).one()
            criteria.append(orm.Accounting.application_policy == orm_policy)

        session.query(orm.Accounting).filter(*criteria).delete()
def audit(anchore_config, ctx, image, imagefile, include_allanchore):
    """
    Image IDs can be specified as hash ids, repo names (e.g. centos), or tags (e.g. centos:latest).
    """
    # Stores the configuration and the resolved image list in module-level
    # globals. Any failure while resolving images is reported and ends the
    # process with exit status 1.
    global config, imagelist, nav

    config = anchore_config

    if image and imagefile:
        # NOTE(review): click >= 7.0 expects
        # BadOptionUsage(option_name, message); confirm the pinned version.
        raise click.BadOptionUsage('Can only use one of --image, --imagefile')

    try:
        # With neither --image nor --imagefile, fall back to all local images.
        use_all_local = not (image or imagefile)
        imagedict = build_image_list(anchore_config, image, imagefile,
                                     use_all_local, include_allanchore)
        imagelist = imagedict.keys()
        imagelist = anchore_utils.discover_imageIds(imagelist)
    except Exception:
        anchore_print_err("could not load input images")
        sys.exit(1)
def get_command(self, ctx, name):
    """Resolve the command called *name* from the configured API spec.

    Loads the configuration from the path given by the ``config`` parameter
    in *ctx*, builds the CLI from its ``root`` and ``spec`` entries, and
    returns the base command when *name* is ``None``, otherwise the matching
    endpoint group (``None`` if there is no such group).

    Raises:
        click.BadOptionUsage: if loading the configuration raises
            ``ex.ApimasException``.
    """
    try:
        conf = config.configure(path=ctx.params.get('config'))
    except ex.ApimasException as e:
        # NOTE(review): click >= 7.0 expects
        # BadOptionUsage(option_name, message); confirm the pinned version.
        raise click.BadOptionUsage(str(e))

    cli = _construct_cli(conf['root'], conf['spec'])
    return (cli.get_base_command() if name is None
            else cli.endpoint_groups.get(name))
def main(paths, pipelines, pipelines_skip, pipeline_tags, infra_provider,
         live_log, all_tests, cluster_debug, junit_xml, test):
    # Entry point of the integration-test runner: validates the option
    # combination, discovers and filters pipelines, runs them on a thread
    # pool sized by the infra provider's slots, then reports results.
    #
    # NOTE(review): click >= 7.0 expects BadOptionUsage(option_name, message);
    # confirm the pinned click version for the three raises below.
    # NOTE(review): the original indentation of the last statements was lost;
    # confirm that the JUnit report and the failure check are meant to run
    # after the log handler has been closed.

    # Exactly one of --test / --all-tests must be given.
    if bool(all_tests) == bool(test):
        raise click.BadOptionUsage(
            'Should specify either --test NAME or --all-tests')

    if pipelines and pipelines_skip:
        raise click.BadOptionUsage(
            'Can not use both --pipelines and --pipelines-skip')

    if cluster_debug and infra_provider == 'aws':
        # FIXME in AC-5210
        raise click.BadOptionUsage(
            'AWS does not have an automatic cleaner thus debug is forbidden.'
        )

    with closing(multilogger.init_handler(logger, live_log)) as multilog:
        search_paths = paths or [INTEGRATION_TESTS_PATH]
        discovered = discover_integration_tests(search_paths)
        print_msg(u'Discovered tests in:\n{}\n'.format('\n'.join(discovered)))

        selected = _filter_by_infra_provider(registered_pipelines,
                                             infra_provider)
        if not pipelines:
            # Tags take effect only if pipelines to run are not set
            # explicitly.
            selected = _filter_by_pipeline_tags(selected, pipeline_tags)
        selected = _filter_by_include_exclude(selected, pipelines,
                                              pipelines_skip)
        selected = _filter_by_test_name(selected, test)

        pipeline_count = len(selected)
        if os.environ.get('BUILD_CLUSTER') != '1' and pipeline_count > 1:
            sys.exit('Can not run multiple pipelines without BUILD_CLUSTER')

        slots = infra_provider_slots[infra_provider]
        print_msg(u'Requested pipelines: {}'.format(pipeline_count))
        print_msg(u'Infra provider "{}" slots: {}'.format(infra_provider,
                                                          slots))

        # Longest queue any single slot will have to work through.
        q_len = int(math.ceil(pipeline_count / float(slots)))
        if q_len > 1:
            print_msg(u'Pipelines will be queued into slots.')
            print_msg(u'Estimated longest queue: {} pipelines'.format(q_len))

        with ThreadPoolExecutor(max_workers=slots) as executor:
            for pipe, tests in selected.items():
                executor.submit(run_tests_in_a_pipeline,
                                u'{}_{}'.format(*pipe), tests, cluster_debug)
            _print_logs(multilog, live_log)

    if junit_xml:
        write_junit_xml(junit_xml, test_results)

    if test_results.has_any_failures():
        sys.exit(1)
def __determine_device_id(ctx_params):
    """Pick the id of the device to operate on.

    Resolution order: an explicit ``device`` id, an explicit ``index`` into
    the list of attached devices, the configured default device, and finally
    the only attached device when there is exactly one.

    Raises:
        click.BadOptionUsage: both ``device`` and ``index`` were given.
        click.ClickException: nothing is attached, the requested device or
            index is not available, or no candidate could be determined.
    """
    # All attached devices
    attached = Adb.get_devices_as_list()
    if not attached:
        raise click.ClickException("No devices attached")

    device_param = ctx_params["device"]
    index_param = ctx_params["index"]
    has_device = device_param is not None
    has_index = index_param is not None

    if has_device and has_index:
        # Only one of the two options can be passed.
        # NOTE(review): click >= 7.0 expects
        # BadOptionUsage(option_name, message); confirm the pinned version.
        raise click.BadOptionUsage(
            "Only one of the options '--device' or '--index' is allowed")

    if has_device:
        # An explicit id is accepted only if that device is attached.
        if not __is_device_id_attached(attached, device_param):
            raise click.ClickException(
                "The device '%s' isn't attached" % device_param)
        return device_param

    if has_index:
        # Translate the index into the id of the attached device.
        try:
            return attached[index_param][0]
        except IndexError:
            raise click.ClickException(
                "No device with the index '%s' available" % index_param)

    # No option given: fall back to the configured default, if attached.
    default_id = Config.read_value(Config.SECTION_DEVICE, Config.KEY_DEFAULT)
    if default_id and __is_device_id_attached(attached, default_id):
        return default_id

    # Last resort: the only attached device.
    if len(attached) != 1:
        raise click.ClickException("Can't determine the best matching device")
    return attached[0][0]
def config(ctx, config, all_):
    """Get/set configuration on the NCP"""
    # Three modes, selected by the arguments:
    #   --all         print every configuration value the device accepts
    #   NAME=VALUE    set one value (NAME may be an enum name or numeric id)
    #   NAME          read one value
    # The connection opened by util.setup is closed in a ``finally`` so it is
    # released on every path, including the plain read and argument errors
    # (previously those paths left it open).
    click.secho(
        "NOTE: Configuration changes do not persist across resets",
        fg='red'
    )

    # NOTE(review): click >= 7.0 expects BadOptionUsage(option_name, message);
    # confirm the pinned click version.
    if config and all_:
        raise click.BadOptionUsage("Specify a config or --all, not both")
    if not (config or all_):
        raise click.BadOptionUsage("One of config or --all must be specified")

    s = yield from util.setup(ctx.obj['device'], ctx.obj['baudrate'],
                              util.print_cb)
    try:
        if all_:
            for config_id in t.EzspConfigId:
                v = yield from s.getConfigurationValue(config_id)
                # Skip ids this device reports as invalid.
                if v[0] == t.EzspStatus.ERROR_INVALID_ID:
                    continue
                click.echo("%s=%s" % (config_id.name, v[1]))
            return

        if '=' in config:
            config, value = config.split("=", 1)
            if config.isdigit():
                try:
                    config = t.EzspConfigId(int(config))
                except ValueError:
                    raise click.BadArgumentUsage("Invalid config ID: %s" % (
                        config,
                    ))
            else:
                try:
                    config = t.EzspConfigId[config]
                except KeyError:
                    raise click.BadArgumentUsage("Invalid config name: %s" % (
                        config,
                    ))
            try:
                value = t.uint16_t(value)
                if not (0 <= value <= 65535):
                    raise ValueError("%s out of allowed range 0..65535" % (
                        value,
                    ))
            except ValueError as e:
                raise click.BadArgumentUsage("Invalid value: %s" % (e, ))

            v = yield from s.setConfigurationValue(config, value)
            click.echo(v)
            return

        # NOTE(review): on this read path ``config`` is passed through as the
        # raw argument string; confirm getConfigurationValue accepts that.
        v = yield from s.getConfigurationValue(config)
        click.echo(v)
    finally:
        s.close()
def query(anchore_config, image, imagefile, include_allanchore, module):
    """
    Image IDs can be specified as hash ids, repo names (e.g. centos), or tags (e.g. centos:latest).

    Execute the specified query (module) with any parameters it requires. Modules are scripts in a specific location.

    Each query has its own parameters and outputs.

    Examples using pre-defined queries:

    'anchore query --image nginx:latest list-packages all'

    'anchore query has-package wget'

    'anchore query --image nginx:latest list-files-detail all'

    'anchore query cve-scan all'
    """
    # Exit status: 0 on success, 1 if loading images or running the query
    # failed, 2 if the query result carries warnings.
    # NOTE(review): the original indentation was lost; this layout assumes
    # only the image-loading step is conditional on ``module`` - confirm.
    global config, imagelist, nav

    exit_code = 0
    config = anchore_config

    if module:
        if image and imagefile:
            # NOTE(review): click >= 7.0 expects
            # BadOptionUsage(option_name, message); confirm the version.
            raise click.BadOptionUsage('Can only use one of --image, --imagefile')

        try:
            # With neither --image nor --imagefile, use all local images.
            use_all_local = not (image or imagefile)
            imagedict = build_image_list(anchore_config, image, imagefile,
                                         use_all_local, include_allanchore)
            imagelist = imagedict.keys()
            imagelist = anchore_utils.discover_imageIds(imagelist)
        except Exception:
            anchore_print_err("could not load input images")
            sys.exit(1)

    try:
        nav = init_nav_contexts()
        outcome = nav.run_query(list(module))
        if outcome:
            anchore_utils.print_result(config, outcome)
        if nav.check_for_warnings(outcome):
            exit_code = 2
    except:
        # NOTE(review): a bare except also traps SystemExit and
        # KeyboardInterrupt; confirm that is intended.
        anchore_print_err("query operation failed")
        exit_code = 1

    contexts['anchore_allimages'].clear()
    sys.exit(exit_code)
def build_image_list(config, image, imagefile, all_local, include_allanchore, dockerfile=None, exclude_file=None):
    """Build a dict of image ids from the CLI image-source options.

    Args:
        config: not used by this function.
        image: a single image id; stored together with *dockerfile*.
        imagefile: path handed to ``anchore_utils.read_kvfile_tolist``; each
            entry supplies an image id and, optionally, a dockerfile.
        all_local: when true, add every non-dangling image reported by the
            local docker client.
        include_allanchore: when true, add every image known to the anchore
            database.
        dockerfile: dockerfile recorded for *image*.
        exclude_file: path of a file listing one image name per line to
            remove from the result.

    Returns:
        dict mapping image id to ``{'dockerfile': <value or None>}``.

    Raises:
        click.BadOptionUsage: no image source was given, or both *image* and
            *imagefile* were given.
        Exception: the docker client is needed but not available.
    """
    # NOTE(review): click >= 7.0 expects BadOptionUsage(option_name, message);
    # confirm the pinned click version.
    if not image and not (imagefile or all_local):
        raise click.BadOptionUsage('No input found for image source. One of <image>, <imagefile>, or <all> must be specified')

    if image and imagefile:
        raise click.BadOptionUsage('Only one of <image> and <imagefile> can be specified')

    filter_images = []
    if exclude_file:
        with open(exclude_file) as f:
            filter_images = [line.strip() for line in f]

    imagelist = {}
    if image:
        imagelist[image] = {'dockerfile': dockerfile}

    if imagefile:
        for entry in anchore_utils.read_kvfile_tolist(imagefile):
            image_id = entry[0]
            # The dockerfile column is optional.
            try:
                dfile = entry[1]
            except IndexError:
                dfile = None
            imagelist[image_id] = {'dockerfile': dfile}

    if all_local:
        docker_cli = contexts['docker_cli']
        if not docker_cli:
            raise Exception("Could not load any images from local docker host - is docker running?")
        for local_id in docker_cli.images(all=True, quiet=True, filters={'dangling': False}):
            if local_id not in imagelist and local_id not in filter_images:
                imagelist[local_id] = {'dockerfile': None}

    if include_allanchore:
        ret = contexts['anchore_db'].load_all_images().keys()
        if ret:
            # NOTE(review): this resets the dockerfile of images that were
            # already in the list (e.g. from <image>/<imagefile>) to None;
            # kept as-is, confirm whether that is intended.
            for image_id in set(imagelist) | set(ret):
                imagelist[image_id] = {'dockerfile': None}

    # Remove excluded items. The docker client is only required when there is
    # something to exclude.
    if filter_images:
        docker_cli = contexts['docker_cli']
        if not docker_cli:
            raise Exception("Could not query docker - is docker running?")
        for excluded in filter_images:
            for img in docker_cli.images(name=excluded, quiet=True):
                imagelist.pop(img, None)

    return imagelist