我们从Python开源项目中,提取了以下41个代码示例,用于说明如何使用docopt.DocoptExit()。
def decorate_docopt_usage(func):
    """Turn a docopt usage error raised by ``func`` into an exit code.

    :param func: function to protect
    :type func: function
    :return: wrapper that returns ``func``'s result, or ``1`` after
        publishing the error when ``docopt.DocoptExit`` is raised
    :rtype: function
    """

    @wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except docopt.DocoptExit as usage_error:
            # Publish a short notice first, then the exception itself.
            emitter.publish("Command not recognized\n")
            emitter.publish(usage_error)
            return 1

    return wrapper
def docopt_cmd(func):
    """Decorate a command handler so it receives docopt-parsed options.

    The handler's docstring is used as the docopt usage text. The returned
    function takes ``(self, arg)``, parses ``arg`` and calls
    ``func(self, opt)``; on a parse failure or a docopt-triggered exit it
    returns ``None`` without calling ``func``.
    """

    def fn(self, arg):
        # Fix for negative numbers: "do_pub" is parsed with options_first.
        extra = {'options_first': True} if fn.__name__ == "do_pub" else {}
        try:
            opt = docopt(fn.__doc__, arg, **extra)
        except DocoptExit as error:
            print('Command is invalid. See :')
            print(error)
            return
        except SystemExit:
            # The SystemExit exception prints the usage for --help,
            # so nothing needs to be printed here.
            return
        return func(self, opt)

    # Make the wrapper look like the wrapped handler.
    fn.__name__ = func.__name__
    fn.__doc__ = func.__doc__
    fn.__dict__.update(func.__dict__)
    return fn
def docopt_parse(doc, argv=None, version=None):
    """Parse ``argv`` against ``doc`` with docopt's ``extras`` hook replaced.

    The replacement raises ``DocoptExit`` carrying the help text or the
    version string. Note that ``docopt.extras`` is reassigned on the module
    itself, so the patch stays in place after this call returns.
    """
    import docopt as _docopt

    def extras_patch(help, version, options, doc):
        """Patch of docopt.extra handler.

        Patch for docopt's 'extras' function is needed, as it is hard
        sys-exiting the bot after '--help/-h' or '--version' commands
        """

        def flag_given(names):
            return any((o.name in names) and o.value for o in options)

        message = None
        if help and flag_given(('-h', '--help')):
            message = doc.strip("\n")
        if version and flag_given(('--version',)):
            # When both flags are given, the version text wins.
            message = version
        if message is not None:
            exc = _docopt.DocoptExit()
            exc.args = (message, )
            raise exc

    # Apply the patch above to docopt.extras
    _docopt.extras = extras_patch
    return _docopt.docopt(doc, argv=argv, version=version)
def test_without_command(self):
    """Running Poco with an empty-string argument is expected to raise DocoptExit."""
    with self.assertRaises(DocoptExit) as context:
        StateHolder.skip_docker = True
        Poco(home_dir=self.tmpdir, argv=[""]).run()
    self.assertIsNotNone(context.exception)
def test_wrong_parameters(self):
    """Running Poco with the argument "notexistcommand" is expected to raise DocoptExit."""
    with self.assertRaises(DocoptExit) as context:
        StateHolder.skip_docker = True
        Poco(home_dir=self.tmpdir, argv=["notexistcommand"]).run()
    self.assertIsNotNone(context.exception)
def cliparser(func):
    """Decorator that parses the argument string with docopt.

    The decorated action's docstring is the usage text; the parsed result
    is handed to the action in place of the raw argument.
    """

    def fn(self, arg):
        try:
            opt = docopt(fn.__doc__, arg)
        except DocoptExit as mismatch:
            # Raised when the args do not match the usage: tell the user
            # and show the usage block.
            print('Invalid Command!')
            print(mismatch)
            return
        except SystemExit:
            # The SystemExit exception prints the usage for --help,
            # so there is nothing left to print here.
            return
        else:
            return func(self, opt)

    fn.__name__ = func.__name__
    fn.__doc__ = func.__doc__
    fn.__dict__.update(func.__dict__)
    return fn
def parse_commands(docstring):
    # type: (str) -> Generator[Tuple[List[str], List[str]], None, None]
    """Parse a docopt-style string for commands and subcommands.

    Args:
        docstring: A docopt-style string to parse. If the string is not a
            valid docopt-style string, it will not yield any values.

    Yields:
        All tuples of commands and subcommands found in the docopt
        docstring: the leading run of command tokens, followed by the
        remaining tokens of the usage line.
    """
    try:
        docopt.docopt(docstring, argv=())
    except (TypeError, docopt.DocoptLanguageError):
        return
    except docopt.DocoptExit:
        # The docstring is valid; the empty argv just does not match it.
        pass
    for command in _parse_section('usage', docstring):
        args = command.split()
        # Default to "everything is a command". Previously, when no token
        # ended the command run, the last command was also yielded as an
        # argument.
        split_at = len(args)
        for i, arg in enumerate(args):
            # A command token starts with a letter that is not upper-case.
            if not (arg[0].isalpha() and not arg[0].isupper()):
                split_at = i
                break
        yield args[:split_at], args[split_at:]
def settable_options(doc, argv, ignore, options_first):
    """Work out which options may be set, which are flags, and which repeat.

    Every item in the returned sets is an option long name.

    :param str doc: Docstring from docoptcfg().
    :param iter argv: CLI arguments from docoptcfg().
    :param iter ignore: Options to ignore from docoptcfg().
    :param bool options_first: docopt argument from docoptcfg().

    :return: Settable options, boolean options, repeatable options, and short to long option name mapping.
    :rtype: tuple
    """
    settable, booleans, repeatable = set(), set(), set()

    # Options given on the command line are "overridden" and not settable.
    options = docopt.parse_defaults(doc)
    short_map = {o.short: o.long for o in options}
    tokens = docopt.TokenStream(argv, docopt.DocoptExit)
    parsed_argv = docopt.parse_argv(tokens, list(options), options_first)
    overridden = [o.long for o in parsed_argv if hasattr(o, 'long')]

    for option in options:
        if option.long in overridden:
            continue
        if option.long in ignore or option.short in ignore:
            continue
        if option.argcount == 0:
            # No argument means the option is a flag/boolean.
            booleans.add(option.long)
        settable.add(option.long)

    # Determine which options are repeatable.
    if settable and '...' in doc:
        usage = docopt.formal_usage(docopt.DocoptExit.usage)
        pattern = docopt.parse_pattern(usage, options)
        for option in pattern.fix().flat():
            if not hasattr(option, 'long'):
                continue  # Positional argument or sub-command.
            name = option.long
            if name not in settable:
                continue  # Don't care about this if we can't set it.
            value = option.value
            if (name in booleans and value == 0) or hasattr(value, '__iter__'):
                repeatable.add(name)

    return settable, booleans, repeatable, short_map
def docopt_cmd(func):
    """Wrap an action so docopt parsing and its error handling are shared.

    The wrapped action is called with the docopt result instead of the raw
    argument; it is not called at all when parsing fails or docopt exits.
    """

    def fn(self, arg):
        try:
            parsed = docopt(fn.__doc__, arg)
        except DocoptExit as problem:
            # DocoptExit means the args did not match: show a message
            # followed by the usage block.
            print('Invalid Command!')
            print(problem)
            return
        except SystemExit:
            # --help already printed the usage via SystemExit.
            return
        return func(self, parsed)

    # Copy identity and documentation from the wrapped action.
    for attribute in ('__name__', '__doc__'):
        setattr(fn, attribute, getattr(func, attribute))
    fn.__dict__.update(func.__dict__)
    return fn
def runtest(self):
    """Run docopt on ``self.doc``/``self.argv`` and compare with ``self.expect``.

    A ``DocoptExit`` is recorded as the string 'user-error'. Raises
    ``DocoptTestException`` when the outcome differs from the expectation.
    """
    try:
        outcome = docopt.docopt(self.doc, argv=self.argv)
    except docopt.DocoptExit:
        outcome = 'user-error'

    if self.expect != outcome:
        raise DocoptTestException(self, outcome)
def test_required_argument_search(self):
    """Passing only ``--search`` is expected to make docopt raise DocoptExit."""
    sys.argv = ['prog_name', '--search']
    with self.assertRaises(docopt.DocoptExit):
        docopt.docopt(__doc__)
def test_required_argument_target(self):
    """Passing only ``--target`` is expected to make docopt raise DocoptExit."""
    sys.argv = ['prog_name', '--target']
    with self.assertRaises(docopt.DocoptExit):
        docopt.docopt(__doc__)
def test_required_argument_filter(self):
    """Passing only ``--filter`` is expected to make docopt raise DocoptExit."""
    sys.argv = ['prog_name', '--filter']
    with self.assertRaises(docopt.DocoptExit):
        docopt.docopt(__doc__)
def test_required_argument_results(self):
    """Passing only ``--results`` is expected to make docopt raise DocoptExit."""
    sys.argv = ['prog_name', '--results']
    with self.assertRaises(docopt.DocoptExit):
        docopt.docopt(__doc__)
def test_required_argument_quality(self):
    """Passing only ``--quality`` is expected to make docopt raise DocoptExit."""
    sys.argv = ['prog_name', '--quality']
    with self.assertRaises(docopt.DocoptExit):
        docopt.docopt(__doc__)
def test_required_argument_sort_type(self):
    """Passing only ``--sort-type`` is expected to make docopt raise DocoptExit."""
    sys.argv = ['prog_name', '--sort-type']
    with self.assertRaises(docopt.DocoptExit):
        docopt.docopt(__doc__)
def test_no_required_argument_list(self):
    """Passing only ``--list`` must not make docopt raise DocoptExit."""
    sys.argv = ['prog_name', '--list']
    rejected = False
    try:
        docopt.docopt(__doc__)
    except docopt.DocoptExit:
        rejected = True
    if rejected:
        self.fail('--list required argument!')
def test_no_required_argument_links(self):
    """Passing only ``--links`` must not make docopt raise DocoptExit."""
    sys.argv = ['prog_name', '--links']
    rejected = False
    try:
        docopt.docopt(__doc__)
    except docopt.DocoptExit:
        rejected = True
    if rejected:
        self.fail('--links required argument!')
def test_no_required_argument_get_list(self):
    """Passing only ``--get-list`` must not make docopt raise DocoptExit."""
    sys.argv = ['prog_name', '--get-list']
    rejected = False
    try:
        docopt.docopt(__doc__)
    except docopt.DocoptExit:
        rejected = True
    if rejected:
        self.fail('--get-list required argument!')
def test_no_required_argument_version(self):
    """Passing only ``--version`` must not make docopt raise DocoptExit."""
    sys.argv = ['prog_name', '--version']
    rejected = False
    try:
        docopt.docopt(__doc__)
    except docopt.DocoptExit:
        rejected = True
    if rejected:
        self.fail('--version required argument!')
def handle(self, arguments, user, **kwargs):
    """Parse a command string and dispatch to the matching ``handle_<subcommand>``.

    :param arguments: raw command string; tokenised with ``shlex.split``
    :param user: passed through to the subcommand handler
    :param kwargs: passed through to the subcommand handler
    :return: the handler's result with 'parsed_arguments' and 'subcommand' added
    :raises exceptions.HandleError: with code 'invalid_arg' when docopt
        rejects the arguments
    """
    parsed = shlex.split(arguments)
    try:
        args = docopt.docopt(self.usage, parsed)
    except docopt.DocoptExit as e:
        # NOTE(review): the docopt message can echo user-supplied tokens;
        # confirm that marking it safe cannot inject markup.
        raise exceptions.HandleError(safestring.mark_safe(str(e)), code='invalid_arg')

    # Use the same tokens docopt saw. Splitting the raw string on ' '
    # yields '' for leading whitespace and keeps shell quotes.
    subcommand = parsed[0]
    handler = getattr(self, 'handle_{0}'.format(subcommand))
    r = handler(args, user, **kwargs)
    r['parsed_arguments'] = args
    r['subcommand'] = subcommand
    return r
def test_noop():
    """Ensure docopt exit (displays usage) for no arguments and for an unknown one."""
    for argv in ([], ['foobar']):
        with pytest.raises(DocoptExit):
            main(argv)
def respond(self, cmd, msg_json):
    """Respond to a bot command.

    The command is parsed as if it were a command line for the bot
    program. When parsing raises DocoptExit, its text is sent back inside
    a text code fence.
    """
    try:
        # Create a 'fake' CLI execution of the actual bot program;
        # doubled backticks are read as double quotes.
        command_line = cmd.replace('``', '"')
        args = docopt_parse(C.DOC, argv=shlex.split(command_line), version=__version__)
        bot_main(self, args, msg_json)
    except docopt.DocoptExit as exit_info:
        reply = "```text\n{0}```".format(str(exit_info))
        self.send(reply)
def respond(self, cmd, msg_json, room_id=None):
    """Respond to a bot command.

    The command is split on whitespace and parsed as a command line for
    the bot program, which then runs with a ``GitterClient`` for
    ``room_id``. When parsing raises DocoptExit, its text is sent back
    inside a text code fence.
    """
    try:
        # Create a 'fake' CLI execution of the actual bot program
        args = docopt_parse(C.DOC, argv=cmd.split())
        bot_main(GitterClient(self.token, room_id), args, msg_json)
    except docopt.DocoptExit as exit_info:
        reply = "```text\n{0}```".format(str(exit_info))
        self.send(reply)
def test_commit_status_no_args(self):
    """``commit_status.main()`` without arguments must raise DocoptExit with a usage message."""
    # assertRaises reports "DocoptExit not raised" on failure, which is
    # clearer than the "False is not true" from assertTrue(False).
    with self.assertRaises(DocoptExit) as raised:
        githubtools.commit_status.main()
    self.assertTrue(str(raised.exception).startswith("Usage:"))
def test_create_pull_request_no_args(self):
    """``create_pull_request.main()`` without arguments must raise DocoptExit with a usage message."""
    # assertRaises reports "DocoptExit not raised" on failure, which is
    # clearer than the "False is not true" from assertTrue(False).
    with self.assertRaises(DocoptExit) as raised:
        githubtools.create_pull_request.main()
    self.assertTrue(str(raised.exception).startswith("Usage:"))
def test_merge_pull_request_no_args(self):
    """``merge_pull_request.main()`` without arguments must raise DocoptExit with a usage message."""
    # assertRaises reports "DocoptExit not raised" on failure, which is
    # clearer than the "False is not true" from assertTrue(False).
    with self.assertRaises(DocoptExit) as raised:
        githubtools.merge_pull_request.main()
    self.assertTrue(str(raised.exception).startswith("Usage:"))
def test_merge_no_args(self):
    """``merge.main()`` without arguments must raise DocoptExit with a usage message."""
    # assertRaises reports "DocoptExit not raised" on failure, which is
    # clearer than the "False is not true" from assertTrue(False).
    with self.assertRaises(DocoptExit) as raised:
        githubtools.merge.main()
    self.assertTrue(str(raised.exception).startswith("Usage:"))
def main(args=None):
    """Parse the command line, pair media with transcripts, and run ``miau``.

    Raises ``DocoptExit`` when no media was found, when the number of
    media files and transcripts differs, or when ``miau`` raises
    ``ValueError``.
    """
    args = docopt(__doc__, argv=args, version=VERSION)

    # Split the expanded inputs into media files and transcriptions.
    # Media and transcripts are matched by position, so they must come in
    # the same order: e.g. "*.mp4 *.txt" or "macri_gato.*" for
    # macri_gato.mp4 and its transcription macri_gato.txt.
    media, transcripts = [], []
    patterns = args['<input_files>']
    for filename in chain.from_iterable(glob.iglob(pattern) for pattern in patterns):
        extension = os.path.splitext(filename)[1][1:]
        bucket = media if extension in extensions_dict else transcripts
        bucket.append(filename)

    info = "Audio/Video:\n {}\nTranscripts/subtitle:\n {}".format(
        ' \n'.join(media), ' \n'.join(transcripts)
    )
    logging.info(info)

    if not media or len(media) != len(transcripts):
        raise DocoptExit(
            "Input mismatch: the quantity of inputs and transcriptions differs"
        )

    try:
        return miau(
            media,
            transcripts,
            args['--remix'],
            args['--output'],
            args['--dump'],
            debug=args['--debug'],
            force_language=args['--lang']
        )
    except ValueError as error:
        raise DocoptExit(str(error))
def docopt_full_help(docstring, *args, **kwargs):
    """Call docopt, exiting with the whole docstring on a usage error.

    All arguments are forwarded to ``docopt``. A ``DocoptExit`` is replaced
    by ``SystemExit(docstring)``.
    """
    try:
        parsed = docopt(docstring, *args, **kwargs)
    except DocoptExit:
        raise SystemExit(docstring)
    return parsed
def main(args=None):
    """Parse the command line, run the selected commands, and exit.

    Exits with status 1 on a usage error, otherwise with 0 when the last
    executed command returned a truthy result and 1 when it did not.
    """
    try:
        arguments = docopt(__doc__, argv=args, version=APPVSN)
    except DocoptExit as usage:
        print(usage)
        sys.exit(1)

    path = os.getcwd()
    logger.configure(arguments['--log-level'])

    # Checked in this order; every matching command runs and the result of
    # the last one decides the exit status.
    dispatch = (
        ('create', lambda: create(path, arguments)),
        ('build', lambda: build(path, arguments)),
        ('version', lambda: version(path)),
        ('deps', lambda: deps(path)),
        ('release', lambda: release(path, arguments)),
        ('package', lambda: package(path, arguments)),
        ('upgrade', lambda: upgrade(path, arguments)),
        ('add_package', lambda: add_package(path, arguments)),
        ('eunit', lambda: eunit(path, arguments)),
        ('ct', lambda: ct(path, arguments)),
        ('fetch', lambda: fetch(arguments)),
        ('install', lambda: install(arguments)),
        ('uninstall', lambda: uninstall(arguments)),
        ('installed', lambda: installed()),
    )
    result = False
    for command, action in dispatch:
        if arguments[command]:
            result = action()

    sys.exit(0 if result else 1)
def test_args_none(self):
    """docopt is expected to raise DocoptExit when argv is passed as None."""
    usage = to_string(main_doc)
    with self.assertRaises(DocoptExit):
        docopt(usage, None, version=__version__)
def test_args_two_commands(self):
    """docopt is expected to raise DocoptExit for the argv 'lsa luhn'."""
    usage = to_string(main_doc)
    argv = 'lsa luhn'.split()
    with self.assertRaises(DocoptExit):
        docopt(usage, argv, version=__version__)
def test_args_url_and_file(self):
    """docopt is expected to raise DocoptExit when both --url and --file are given."""
    usage = to_string(main_doc)
    argv = 'lsa --url=URL --file=FILE'.split()
    with self.assertRaises(DocoptExit):
        docopt(usage, argv, version=__version__)
def main():
    """Command-line entry point of the subdomain finder.

    Parses the options, resolves subdomains of the target from a wordlist
    and, depending on the flags, lists unique IPs, scans them and runs a
    whois lookup. Exits with status 1 on a usage error.
    """
    try:
        cli.banner()
        arguments = docopt(__doc__, version="TWA Corp. SubDomain Finder - 2016")
    except DocoptExit:
        cli.banner()
        # NOTE(review): this re-runs the script through the shell and
        # depends on the current directory containing subdomain_finder.py.
        os.system('python3 subdomain_finder.py --help')
        sys.exit(1)

    target = arguments['--target']
    wordlist = arguments['--wordlist']
    threads = arguments['--threads']
    opt_scan = arguments['--scan']
    opt_whois = arguments['--whois']
    opt_scan_ports = arguments['--scan-ports']
    opt_scan_options = arguments['--scan-options']
    opt_uniq_ips = arguments['--uniq-ip']

    if not wordlist:
        # Fall back to the wordlist shipped next to this file.
        wordlist = os.path.join(os.getcwd(), os.path.dirname(__file__), 'data', 'wordlist.txt')

    try:
        domains_ips = subdomain_finder.finder(threads, wordlist, target)
    except Exception as error:
        # Only real errors are reported here; Ctrl-C and SystemExit are no
        # longer swallowed as they were by the bare "except:".
        print("Wordlist {0} ERROR: {1}".format(wordlist, error))
        # NOTE(review): status 0 on failure is kept from the original;
        # confirm whether a non-zero status is wanted.
        sys.exit(0)

    if opt_uniq_ips:
        print("\n IPs:")
        ips = subdomain_finder.ip_uniq(domains_ips)
        print(" Uniq: "+str(len(ips)))
        # Reuse the list computed above instead of calling ip_uniq again.
        for ip in ips:
            print(" "+ip)

    if opt_scan:
        subdomain_finder.ip_scan(domains_ips, opt_scan_ports, opt_scan_options)

    if opt_whois:
        subdomain_finder.domain_whois(target)

    cli.banner()
def docopt_cmd(func):
    """Decorator handing the docopt parse result to the decorated action.

    Parsing uses the action's docstring as usage text. If the arguments do
    not match, a red notice and the usage are printed and the action is
    skipped; it is also skipped when docopt exits (e.g. for --help).
    """

    def fn(self, arg):
        usage_text = fn.__doc__
        try:
            opt = docopt(usage_text, arg)
        except DocoptExit as mismatch:
            # The args did not match: print a message and the usage block.
            print(colored("Invalid Command!", "red"))
            print(mismatch)
            return None
        except SystemExit:
            # The usage for --help was already printed by docopt.
            return None
        return func(self, opt)

    fn.__name__, fn.__doc__ = func.__name__, func.__doc__
    fn.__dict__.update(func.__dict__)
    return fn
def values_from_file(docopt_dict, config_option, settable, booleans, repeatable):
    """Read settable option values from the config file named on the CLI.

    Values read here can be overridden by both command line arguments and
    environment variables. The config section used is the second
    whitespace-separated token of ``docopt.DocoptExit.usage``.

    :raise DocoptcfgError: If `config_option` isn't found in docstring.
    :raise DocoptcfgFileError: On any error while trying to read and parse config file.

    :param dict docopt_dict: Dictionary from docopt with environment variable defaults merged in by docoptcfg().
    :param str config_option: Config option long name with file path as its value.
    :param iter settable: Option long names available to set by config file.
    :param iter booleans: Option long names of boolean/flag types.
    :param iter repeatable: Option long names of repeatable options.

    :return: Settable values.
    :rtype: dict
    """
    section = docopt.DocoptExit.usage.split()[1]
    # The config-file option itself cannot be set from the config file.
    candidates = {name for name in settable if name != config_option}
    config = ConfigParser()

    # Sanity checks.
    if config_option not in docopt_dict:
        raise DocoptcfgError
    if docopt_dict[config_option] is None or not candidates:
        return {}

    # Read config file.
    path = docopt_dict[config_option]
    DocoptcfgFileError.FILE_PATH = path
    try:
        with open(path) as handle:
            # Use read_file when the parser has it, else fall back to readfp.
            if hasattr(config, 'read_file'):
                config.read_file(handle)
            else:
                getattr(config, 'readfp')(handle)
    except Error as exc:
        raise DocoptcfgFileError('Unable to parse config file.', str(exc))
    except IOError as exc:
        raise DocoptcfgFileError('Unable to read config file.', str(exc))

    # Make sure section is in config file.
    if not config.has_section(section):
        raise DocoptcfgFileError('Section [{0}] not in config file.'.format(section))

    # Parse config file; keys in the file omit the leading two characters
    # of the long option name.
    return {
        key: get_opt(key, config, section, booleans, repeatable)
        for key in candidates
        if config.has_option(section, key[2:])
    }
def main():
    """Start the node: parse options, create pipes and keys, run the net engine.

    Sets the module globals ``in_pipe``, ``out_pipe``, ``public_key`` and
    ``private_key``. Returns early, after printing the message, when the
    command line does not match the usage.
    """
    global in_pipe, out_pipe, public_key, private_key

    try:
        docopt_config = "Usage: my_program.py [--port=PORT] [--connect=PORT]"
        arguments = docopt.docopt(docopt_config)
    except docopt.DocoptExit as e:
        # Python 3 exceptions have no ``.message``; print the exception.
        print(e)
        return

    port = arguments["--port"]
    if port is None:
        # NOTE(review): the default is an int while a supplied value is a
        # str; confirm what the net engine expects.
        port = 5555
    connect_dest = arguments["--connect"]

    context = zmq.Context()

    in_pipe = zpipe(context)
    out_pipe = zpipe(context)

    loop = asyncio.get_event_loop()

    net_config = {"port": port}

    # Generate Node Keys & Id.
    private_key = enc.generate_RSA(4096)
    public_key = private_key.publickey()

    node_id = enc.generate_ID(public_key.exportKey("DER"))
    debug("node_id=[%s]." % node_id.hexdigest())

    # Start Net Engine.
    zmq_future = loop.run_in_executor(None, engageNet, loop, context, out_pipe[0], in_pipe[1], net_config)

    # Connect for testing.
    if connect_dest is not None:
        out_pipe[1].send_multipart([b"conn", "tcp://{}".format(connect_dest).encode()])

    try:
        loop.run_until_complete(zmq_future)
    except BaseException as e:
        # NOTE(review): the source lost its indentation; the shutdown steps
        # are assumed to belong to this handler. Confirm against the original.
        handleException("loop.run_until_complete()", e)
        out_pipe[1].send_multipart([b"shutdown"])
        loop.stop()
        loop.close()
        zmq_future.cancel()
        sys.exit(1)
def docopt_wrapper(usage, real_usage, **keywords):
    """A wrapper around the real docopt parser.

    Silences docopt's own output while parsing and prints the proper
    ``real_usage`` message instead of just the usage string from ``usage``.

    :param usage: The simplified usage string to parse
    :type usage: str
    :param real_usage: The original usage string to parse
    :type real_usage: str
    :param keywords: The keyword arguments to pass to docopt; must also
        contain 'base_subcommand' and 'subcommand', which are removed
        before docopt is called
    :type keywords: dict
    :returns: The parsed arguments
    :rtype: dict
    """
    base_subcommand = keywords.pop('base_subcommand')
    subcommand = keywords.pop('subcommand')

    stdout = sys.stdout
    try:
        # We run docopt twice (once with the real usage string and
        # once with the modified usage string) in order to populate
        # the 'real' arguments properly.
        with open(os.devnull, 'w') as nullfile:
            sys.stdout = nullfile
            try:
                real_arguments = docopt.docopt(
                    real_usage, argv=[base_subcommand])
                arguments = docopt.docopt(usage, **keywords)
            finally:
                # Always restore stdout, including for exceptions that
                # neither handler below catches.
                sys.stdout = stdout

        real_arguments.update(arguments)
        real_arguments[subcommand] = True
        return real_arguments

    except docopt.DocoptExit:
        print(real_usage.strip(), file=sys.stderr)
        sys.exit(1)

    except SystemExit:
        # docopt exited for --help or --version; print the real text.
        argv = keywords.get("argv") or ()
        if any(h in ("-h", "--help") for h in argv):
            print(real_usage.strip())
        elif "version" in keywords and "--version" in argv:
            # Exact match: the old ``v in ("--version")`` was a substring
            # test against a plain string.
            print(keywords["version"].strip())

        sys.exit()
def beradio_cmd():
    """
    Usage:
      beradio forward --source=serial:///dev/ttyUSB0 --target=mqtt://localhost [--protocol=<version>] [--debug]
      beradio decode <payload> [--protocol=<version>] [--debug]
      beradio info
      beradio --version
      beradio (-h | --help)

    Options:
      --source=<source>         Data source, e.g. serial:///dev/ttyUSB0
      --target=<target>         Data sink, e.g. mqtt://localhost
      --protocol=<version>      Protocol version: 1 or 2 [default: 2]
      --version                 Show version information
      --debug                   Enable debug messages
      -h --help                 Show this screen

    """
    # The docstring above is the docopt usage text and is parsed at runtime.
    # forward: sends serial or inline data to MQTT.
    # decode:  returns the decoded payload as indented JSON.
    # info:    writes identifiers and the config file path to stderr.
    options = docopt(beradio_cmd.__doc__, version=APP_NAME)

    source = options.get('--source')
    target = options.get('--target')
    protocol = options.get('--protocol')

    boot_logging(options)

    if options.get('forward'):
        if source.startswith('serial://') and target.startswith('mqtt://'):
            source = source.replace('serial://', '')
            SerialToMQTT(serial_device=source, mqtt_broker=target, protocol=protocol).setup().forward()

        elif source.startswith('data://') and target.startswith('mqtt://'):
            data = source.replace('data://', '')
            if data == 'stdin':
                data = sys.stdin.read().strip()
            DataToMQTT(mqtt_broker=target, protocol=protocol).setup().publish(data)

        else:
            raise DocoptExit('Unable to handle combination of {} and {} in forwarding mode'.format(options.get('--source'), options.get('--target')))

    elif options.get('decode'):
        p = protocol_factory(protocol)
        payload = options.get('<payload>')
        return json.dumps(p.decode_safe(payload), indent=4)

    elif options.get('info'):
        network_id = str(NetworkIdentifier())
        gateway_id = str(GatewayIdentifier())

        def emit(line):
            # sys.stderr.write works on Python 2 and 3, unlike the
            # Python 2 only ``print >>sys.stderr`` statement.
            sys.stderr.write(line + '\n')

        emit('-' * 50)
        emit(APP_NAME.center(50))
        emit('-' * 50)
        emit('config file: {}'.format(BERadioConfig().config_file))
        emit('network_id: {}'.format(network_id))
        emit('gateway_id: {}'.format(gateway_id))