我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用getopt.gnu_getopt()。
def get_opt(argv):
    """Parse the jobs/env/tags command line.

    Recognised options are ``-j/--jobs``, ``-e/--env`` and ``-t/--tags``;
    each takes a value.  ``--env`` and ``--tags`` values are comma-separated
    lists, and empty entries are dropped from the tags list only.

    Args:
        argv: list of command-line tokens to parse.

    Returns:
        A ``(jobs_dir, job_tags, env_vars)`` tuple.

    Raises:
        Exception: if positional arguments are present, or if no non-empty
            jobs directory was supplied.
    """
    parsed, leftovers = getopt.gnu_getopt(
        argv, 'j:e:t:', ['jobs=', 'env=', 'tags='])
    if leftovers:
        raise Exception(
            'non-dashed options "%s" not recognized' % ','.join(leftovers))

    settings = {'jobs': None, 'env': [], 'tags': []}
    for flag, value in parsed:
        if flag in ('-j', '--jobs'):
            settings['jobs'] = value
        elif flag in ('-e', '--env'):
            settings['env'] = value.split(',')
        elif flag in ('-t', '--tags'):
            settings['tags'] = [tag for tag in value.split(',') if tag]

    if not settings['jobs']:
        raise Exception('at least -j or --jobs must be specified')
    return (settings['jobs'], settings['tags'], settings['env'])
def get_opt(argv):
    """Parse the source/refspec/destination/timeout command line.

    Args:
        argv: list of command-line tokens to parse.

    Returns:
        A ``(src, dst, refspec, timeout)`` tuple.  Options that were not
        given stay ``None``; ``timeout`` defaults to 600 and is converted
        with ``int()`` when supplied.

    Raises:
        Exception: if any positional (non-dashed) argument is present.
    """
    parsed, leftovers = getopt.gnu_getopt(
        argv,
        's:r:d:t:',
        ['source=', 'refspec=', 'destination=', 'timeout='],
    )
    if leftovers:
        raise Exception(
            'non-dashed options "%s" not recognized' % ','.join(leftovers))

    # Map every accepted spelling onto the slot it fills.
    slot_for = {
        '-s': 'src', '--source': 'src',
        '-r': 'refspec', '--refspec': 'refspec',
        '-d': 'dst', '--destination': 'dst',
        '-t': 'timeout', '--timeout': 'timeout',
    }
    values = {'src': None, 'dst': None, 'refspec': None, 'timeout': 600}
    for flag, value in parsed:
        slot = slot_for.get(flag)
        if slot == 'timeout':
            values[slot] = int(value)
        elif slot is not None:
            values[slot] = value

    return (values['src'], values['dst'], values['refspec'], values['timeout'])
def __init__(self):
    """Set up the empty flag registries and default to non-GNU parsing.

    All state is written straight into ``__dict__`` (see the note below),
    then ``UseGnuGetOpt(False)`` is called to select plain getopt scanning.
    """
    # Since everything in this class is so heavily overloaded, the only
    # way of defining and using fields is to access __dict__ directly.
    state = self.__dict__

    # flag name (string) -> Flag object.
    state['__flags'] = {}
    # module name (string) -> list of Flag objects defined by that module.
    state['__flags_by_module'] = {}
    # module id (int) -> list of Flag objects defined by that module.
    state['__flags_by_module_id'] = {}
    # module name (string) -> list of Flag objects that are key for that
    # module.
    state['__key_flags_by_module'] = {}

    # Whether to use new style gnu_getopt rather than getopt when parsing
    # the args.  Only possible with Python 2.3+
    self.UseGnuGetOpt(False)
def parse_args(args, options):
    """Parse arguments from command-line to set options.

    Args:
        args: list of command-line tokens.
        options: dict updated in place with the recognised switches; the
            remaining positional arguments are stored under ``extra_args``.

    Raises:
        SystemExit: with status 1 after printing the module docstring when
            ``-h``/``--help`` is given.
    """
    # Every accepted spelling -> (options key, value stored for it).
    switches = {
        '-o': ('oauth', True), '--oauth': ('oauth', True),
        '-r': ('followers', True), '--followers': ('followers', True),
        '-g': ('followers', False), '--following': ('followers', False),
        '-a': ('api-rate', True), '--api-rate': ('api-rate', True),
        '-i': ('show_id', True), '--ids': ('show_id', True),
    }
    parsed, extra_args = getopt(
        args,
        "horgai",
        ['help', 'oauth', 'followers', 'following', 'api-rate', 'ids'],
    )
    for flag, _unused in parsed:
        if flag in ('-h', '--help'):
            print(__doc__)
            raise SystemExit(1)
        if flag in switches:
            key, state = switches[flag]
            options[key] = state
    options['extra_args'] = extra_args
def parse_args(query=''):
    """Split a query string into update flag, platform and command.

    The query is split on whitespace and scanned GNU-style, so options may
    appear anywhere: ``-u`` sets ``isUpdate`` and ``-o VALUE`` overrides the
    platform.  The remaining words are joined with ``-`` to form the command.

    Args:
        query: raw query string.

    Returns:
        A dict with keys ``isUpdate``, ``platform`` and ``command``.  When
        the options cannot be parsed the defaults are returned unchanged.
    """
    tokens = query.split()
    dic = {
        'isUpdate': False,
        'platform': default_platform,
        'command': ''
    }
    try:
        opts, args = getopt.gnu_getopt(tokens, 'uo:')
    except getopt.GetoptError:
        # Only malformed options mean "fall back to defaults"; anything else
        # (e.g. KeyboardInterrupt) must not be swallowed here.
        return dic

    for opt, arg in opts:
        if opt == '-u':
            dic['isUpdate'] = True
        elif opt == '-o':
            dic['platform'] = arg
    dic['command'] = '-'.join(args)
    return dic
def test_gnu_getopt(self):
    """Check GNU-style scanning and both ways of forcing POSIX behaviour."""
    # Test handling of GNU style scanning mode.
    cmdline = ['-a', 'arg1', '-b', '1', '--alpha', '--beta=2']
    long_names = ['alpha', 'beta=']
    # In POSIX mode scanning stops at the first non-option argument.
    posix_opts = [('-a', '')]
    posix_args = ['arg1', '-b', '1', '--alpha', '--beta=2']

    # GNU style
    opts, args = getopt.gnu_getopt(cmdline, 'ab:', long_names)
    self.assertEqual(args, ['arg1'])
    self.assertEqual(
        opts,
        [('-a', ''), ('-b', '1'), ('--alpha', ''), ('--beta', '2')])

    # recognize "-" as an argument
    opts, args = getopt.gnu_getopt(['-a', '-', '-b', '-'], 'ab:', [])
    self.assertEqual(args, ['-'])
    self.assertEqual(opts, [('-a', ''), ('-b', '-')])

    # Posix style via +
    opts, args = getopt.gnu_getopt(cmdline, '+ab:', long_names)
    self.assertEqual(opts, posix_opts)
    self.assertEqual(args, posix_args)

    # Posix style via POSIXLY_CORRECT
    self.env["POSIXLY_CORRECT"] = "1"
    opts, args = getopt.gnu_getopt(cmdline, 'ab:', long_names)
    self.assertEqual(opts, posix_opts)
    self.assertEqual(args, posix_args)
def parse_args(args, options):
    """Parse arguments from command-line to set options.

    Args:
        args: list of command-line tokens.
        options: dict updated in place; positional arguments end up under
            ``extra_args``.

    ``-h``/``--help`` prints the module docstring and exits with status 0.
    """
    # Options that carry a value -> options key.
    valued = {
        '-s': 'save-dir', '--save-dir': 'save-dir',
        '-t': 'timeline', '--timeline': 'timeline',
        '-m': 'mentions', '--mentions': 'mentions',
        '-p': 'privatemsg', '--privatemsg': 'privatemsg',
    }
    # Plain switches -> options key (stored as True).
    switches = {
        '-f': 'favorites', '--favorites': 'favorites',
        '-i': 'isoformat', '--isoformat': 'isoformat',
    }
    parsed, extra_args = gnu_getopt(
        args,
        's:t:m:fp:ih',
        ['save-dir=', 'timeline=', 'mentions=', 'favorites',
         'privatemsg=', 'isoformat', 'help'],
    )
    for flag, value in parsed:
        if flag in valued:
            options[valued[flag]] = value
        elif flag in switches:
            options[switches[flag]] = True
        elif flag in ('-h', '--help'):
            print(__doc__)
            sys.exit(0)
    options['extra_args'] = extra_args
def main():
    """Fetch a user's tweets, optionally writing them to an output file.

    Command line: ``[-h|--help] [-o FILE|--output=FILE] USER``.  Usage is
    printed and the process exits with status 2 when the options are
    invalid, when no user is given, or when help is requested.
    """
    try:
        # '-o' takes a value, exactly like its long form '--output='; the
        # trailing ':' is what makes getopt capture it.
        opts, args = getopt.gnu_getopt(sys.argv[1:], 'ho:', ['help', 'output='])
    except getopt.GetoptError:
        Usage()
        sys.exit(2)

    if not args:
        # The user name is the first positional argument and is mandatory.
        Usage()
        sys.exit(2)
    user = args[0]

    output = None
    for o, a in opts:
        if o in ("-h", "--help"):
            Usage()
            sys.exit(2)
        if o in ("-o", "--output"):
            output = a
    FetchTwitter(user, output)
def attach(dname='REDHAWK_DEV', debug=False):
    """Attach to a domain and store the result in the global ``dom``.

    If ``sys.argv`` contains ``-d NAME`` that name overrides ``dname`` and
    ``sys.argv`` is replaced by the remaining (non-option) arguments.  When
    the command line has no ``-d`` or cannot be parsed, ``dname`` and
    ``sys.argv`` are left untouched.

    Args:
        dname: domain name to attach to when ``-d`` is not given.
        debug: when true, ``redhawk.setDebug(True)`` is called after
            attaching.
    """
    global dom
    try:
        olist, remaining = getopt.gnu_getopt(sys.argv, "d:")
    except getopt.GetoptError:
        # Best effort: an unparsable command line just means "no override".
        olist, remaining = [], None

    domain_names = [value for flag, value in olist if flag == '-d']
    if domain_names:
        dname = domain_names[0]
        sys.argv = remaining

    print("Attach:" + str(dname))
    dom = redhawk.attach(dname)
    if debug:
        redhawk.setDebug(True)
def UseGnuGetOpt(self, use_gnu_getopt=True):
    """Select GNU-style scanning for argument parsing.

    GNU-style scanning allows flag and non-flag arguments to be mixed.
    See http://docs.python.org/library/getopt.html#getopt.gnu_getopt

    Args:
      use_gnu_getopt: whether or not to use GNU style scanning.
    """
    # The setting is written straight into the instance __dict__.
    state = self.__dict__
    state['__use_gnu_getopt'] = use_gnu_getopt
def parse_args(args, options):
    """Parse arguments from command-line to set options.

    Args:
        args: list of command-line tokens.
        options: dict updated in place; positional arguments end up under
            ``extra_args``.

    Raises:
        SystemExit: with status 0 after printing the module docstring when
            ``-h``/``--help`` is given.
    """
    # Options that carry a value -> options key.
    valued = {
        '-s': 'save-dir', '--save-dir': 'save-dir',
        '-t': 'timeline', '--timeline': 'timeline',
        '-m': 'mentions', '--mentions': 'mentions',
        '-r': 'redirect-sites', '--redirect-sites': 'redirect-sites',
        '-d': 'dms', '--dms': 'dms',
    }
    # Plain switches -> options key (stored as True).
    switches = {
        '-o': 'oauth', '--oauth': 'oauth',
        '-a': 'api-rate', '--api-rate': 'api-rate',
        '-v': 'favorites', '--favorites': 'favorites',
        '-f': 'follow-redirects', '--follow-redirects': 'follow-redirects',
        '-i': 'isoformat', '--isoformat': 'isoformat',
    }
    parsed, extra_args = getopt(
        args,
        "hos:at:m:vfr:d:i",
        ['help', 'oauth', 'save-dir=', 'api-rate', 'timeline=', 'mentions=',
         'favorites', 'follow-redirects', "redirect-sites=", 'dms=',
         'isoformat'],
    )
    for flag, value in parsed:
        if flag in ('-h', '--help'):
            print(__doc__)
            raise SystemExit(0)
        if flag in switches:
            options[switches[flag]] = True
        elif flag in valued:
            options[valued[flag]] = value
    options['extra_args'] = extra_args
def parse_args(args, options):
    """Fill ``options`` from the command line.

    Positional arguments that expose ``decode`` (byte strings) are decoded
    with the locale's preferred encoding.  Unless help was requested, the
    first positional argument becomes ``options['action']``; the rest are
    stored under ``options['extra_args']``.

    Args:
        args: list of command-line tokens.
        options: dict updated in place.
    """
    # Options whose value is stored verbatim -> options key.
    text_keys = {
        '-f': 'format', '--format': 'format',
        '-c': 'config_filename', '--config': 'config_filename',
        '--oauth': 'oauth_filename',
    }
    # Options whose value is converted with int() -> options key.
    int_keys = {
        '-R': 'refresh_rate', '--refresh-rate': 'refresh_rate',
        '-l': 'length', '--length': 'length',
    }
    # Switches -> (options key, constant stored for it).
    fixed = {
        '-r': ('refresh', True), '--refresh': ('refresh', True),
        '-t': ('timestamp', True), '--timestamp': ('timestamp', True),
        '-d': ('datestamp', True), '--datestamp': ('datestamp', True),
        '-?': ('action', 'help'), '-h': ('action', 'help'),
        '--help': ('action', 'help'),
        '--no-ssl': ('secure', False),
        '--force-ansi': ('force-ansi', True),
    }

    parsed, extra_args = getopt(
        args,
        "e:p:f:h?rR:c:l:td",
        ['help', 'format=', 'refresh', 'oauth=', 'refresh-rate=',
         'config=', 'length=', 'timestamp', 'datestamp', 'no-ssl',
         'force-ansi'],
    )

    if extra_args and hasattr(extra_args[0], 'decode'):
        extra_args = [arg.decode(locale.getpreferredencoding())
                      for arg in extra_args]

    # Flags accepted by the option string but absent from the tables
    # (-e, -p) are parsed and then ignored.
    for flag, value in parsed:
        if flag in text_keys:
            options[text_keys[flag]] = value
        elif flag in int_keys:
            options[int_keys[flag]] = int(value)
        elif flag in fixed:
            key, constant = fixed[flag]
            options[key] = constant

    if extra_args and ('action' not in options
                       or options['action'] != 'help'):
        options['action'] = extra_args[0]
    options['extra_args'] = extra_args[1:]
def parse_args(args):
    """Build an options dict from the command line.

    Positional arguments that expose ``decode`` (Python 2 byte strings) are
    decoded with the locale's preferred encoding.  If no action was set by
    an option, the first positional argument becomes ``action``; the rest
    are stored under ``extra-args``.

    Args:
        args: list of command-line tokens.

    Returns:
        The populated options dict.
    """
    # Options whose value is stored verbatim -> options key.
    text_keys = {
        '-f': 'format', '--format': 'format',
        '-c': 'config-filename', '--config': 'config-filename',
        '--oauth': 'oauth-filename',
    }
    # Options whose value is converted with int() -> options key.
    int_keys = {
        '-R': 'refresh-rate', '--refresh-rate': 'refresh-rate',
        '-l': 'length', '--length': 'length',
    }
    # Switches -> (options key, constant stored for it).
    fixed = {
        '-r': ('refresh', True), '--refresh': ('refresh', True),
        '-t': ('timestamp', True), '--timestamp': ('timestamp', True),
        '-d': ('datestamp', True), '--datestamp': ('datestamp', True),
        '-h': ('action', 'help'), '--help': ('action', 'help'),
        '--invert-split': ('invert-split', True),
        '--force-ansi': ('force-ansi', True),
    }

    parsed, extra_args = gnu_getopt(
        args,
        'rR:f:l:tdc:h',
        ['refresh', 'refresh-rate=', 'format=', 'length=', 'timestamp',
         'datestamp', 'config=', 'oauth=', 'help', 'invert-split',
         'force-ansi'],
    )

    # decode Non-ASCII args for Python 2
    if extra_args and hasattr(extra_args[0], 'decode'):
        extra_args = [arg.decode(locale.getpreferredencoding())
                      for arg in extra_args]

    options = {}
    for flag, value in parsed:
        if flag in fixed:
            key, constant = fixed[flag]
            options[key] = constant
        elif flag in int_keys:
            options[int_keys[flag]] = int(value)
        elif flag in text_keys:
            options[text_keys[flag]] = value

    if extra_args and 'action' not in options:
        options['action'] = extra_args[0]
    options['extra-args'] = extra_args[1:]
    return options
def parse_parameters(cmd_args, short_option_definition, long_option_definition):
    """Parse ``cmd_args`` GNU-style, exiting the process on bad options.

    Args:
        cmd_args: list of command-line tokens.
        short_option_definition: getopt short option string.
        long_option_definition: list of getopt long option names.

    Returns:
        The ``(opts, args)`` pair produced by ``getopt.gnu_getopt``.

    On a ``getopt.GetoptError`` the error is printed and the process exits
    via ``sys.exit(-1)``.
    """
    # gnu_getopt performs no import of its own, so GetoptError is the only
    # failure worth handling here (the former ImportError handler could
    # never fire).
    try:
        opts, args = getopt.gnu_getopt(
            cmd_args, short_option_definition, long_option_definition)
    except getopt.GetoptError as e:
        print("error when parsing command line arguments")
        print(e)
        sys.exit(-1)
    return opts, args
def main(argv):
    """Parse the command line and start the broker through ``run()``.

    The four behaviour switches (``-p/--publish_on_pubrel``,
    ``-o/--overlapping_single``, ``-d/--dropQoS0``,
    ``-z/--zero_length_clientids``) default to True and are turned off by
    the values ``off``, ``false`` or ``0``.  ``--port`` (long form only)
    defaults to 1883.

    Args:
        argv: full argument vector; ``argv[0]`` is skipped.
    """
    try:
        opts, args = getopt.gnu_getopt(
            argv[1:], "hp:o:d:z:",
            ["help", "publish_on_pubrel=", "overlapping_single=",
             "dropQoS0=", "port=", "zero_length_clientids="])
    except getopt.GetoptError as err:
        print(err)  # will print something like "option -a not recognized"
        usage()
        sys.exit(2)

    off_values = ("off", "false", "0")
    publish_on_pubrel = overlapping_single = dropQoS0 = zero_length_clientids = True
    port = 1883
    for o, a in opts:
        if o in ("-h", "--help"):
            usage()
            sys.exit()
        elif o in ("-p", "--publish_on_pubrel"):
            publish_on_pubrel = a not in off_values
        elif o in ("-o", "--overlapping_single"):
            overlapping_single = a not in off_values
        elif o in ("-d", "--dropQoS0"):
            dropQoS0 = a not in off_values
        elif o in ("-z", "--zero_length_clientids"):
            zero_length_clientids = a not in off_values
        elif o == "--port":
            # Exact comparison: `o in ("--port")` was a substring test
            # against a plain string, not tuple membership.
            port = int(a)
        else:
            # Raised explicitly so the check survives `python -O`.
            raise AssertionError("unhandled option")

    run(publish_on_pubrel=publish_on_pubrel,
        overlapping_single=overlapping_single,
        dropQoS0=dropQoS0,
        port=port,
        zero_length_clientids=zero_length_clientids)
def parse_argv_module_id(argv):
    """Extract the module-identification parameters from ``argv``.

    Every ``--name`` token found in ``argv`` is registered as a long option
    that takes a value, so arbitrary parameters parse without error; only
    the names handled below influence the result.  Both ``--name value``
    and ``--name=value`` are accepted.

    Args:
        argv: list of command-line tokens.

    Returns:
        ``(module_name, lsid, cache_path, execution_id, module_libdir, cwd,
        sg_prepare_template)``.

    Raises:
        Exception: if the same parameter is given more than once.
    """
    #baseline_args = ['module_name','lsid','cache_path','execution_id','cwd']
    # Keep only the part before '=' as the option name: registering
    # 'name=value=' would make getopt report the option as '--name=value',
    # which matches none of the parameters below and is silently dropped.
    arg_names = []
    for token in argv:
        if token[:2] != '--':
            continue
        name = token[2:].partition('=')[0]
        if name:
            arg_names.append(name + '=')
    (param_value_pairs, remnants) = getopt.gnu_getopt(argv, "", arg_names)

    #Set defaults
    module_name = ''
    lsid = ''
    cache_path = common.cache_path
    execution_id = '0'
    module_libdir = None
    cwd = os.getcwd()
    sg_prepare_template = ''

    args_used = set()
    for arg_pair in param_value_pairs:
        param_name = arg_pair[0][2:]  #strip off '--' at start
        param_value = arg_pair[1]
        if param_name in args_used:
            raise Exception('duplicated parameter: %s' % param_name)
        args_used.add(param_name)

        if param_name == 'module_name':
            module_name = param_value
        elif param_name == 'lsid':
            lsid = param_value
        elif param_name == 'cache_path':
            cache_path = param_value
        elif param_name == 'execution_id':
            execution_id = param_value
        elif param_name == 'sg_prepare_template':
            sg_prepare_template = param_value
        elif param_name == 'module_libdir':
            module_libdir = param_value
            # The libdir's last path component also serves as module name.
            module_name = os.path.basename(module_libdir)
        elif param_name == 'cwd':
            cwd = param_value

    # test harness
    #module_name = "CancerBirdseedSNPsToGeli"
    #lsid = "urn_lsid_broadinstitute_org_cancer_genome_analysis_00038_12"
    #cache_path = common.cache_path
    #execution_id = "0"
    #cwd = "/xchip/tcga_scratch/gsaksena/test_run_gp_module"
    return (module_name, lsid, cache_path, execution_id, module_libdir, cwd,
            sg_prepare_template)
def main(args):
    # Command-line driver: read one source file, tokenize and parse it, run
    # the context checks, then emit the result for the selected target.
    #   -T / -P / -X  select the target (default 'P')
    #   -o FILE       write output to FILE instead of sys.stdout
    # NOTE(review): block nesting below is reconstructed from a flattened
    # listing -- confirm against the original file.
    try:
        (opts, args) = getopt(args, 'o:TPX')
    except GetoptError:
        # NOTE(review): presumably usage() terminates the process; otherwise
        # `opts` would be unbound further down -- confirm.
        usage()
    # Exactly one positional argument (the input file) is expected.
    if len(args) != 1:
        usage()
    from tokenizer import Tokenizer
    from parser import Parser
    from error import JtError
    import context
    from os.path import abspath
    filename = abspath(args[0])
    stdin = file(filename, 'r')
    target = 'P'
    stdout = sys.stdout
    for (ok, ov) in opts:
        if ok in ('-T', '-P', '-X'):
            # Keep just the letter after the dash as the target code.
            target = ok[1]
        elif ok == '-o':
            stdout = file(ov, 'w')
    contents = stdin.read()
    tokenizer = Tokenizer()
    tokenizer.build()
    tokenizer.input(contents)
    parser = Parser(tokenizer)
    result_tree = None
    try:
        result_tree = parser.parse()
    except JtError, error:
        failure(error)
    context.add_pdf(result_tree)
    # `ok` ends up true only if both inspect() and validate() report success.
    ok = context.inspect(result_tree)
    ok &= context.validate(result_tree)
    if target == 'T':
        # Target 'T' prints the tree itself to the output stream.
        print >>stdout, result_tree
    if not ok:
        failure()
    result_tree.filename = filename
    if target != 'T':
        # The remaining targets are guarded against writing to a terminal.
        if stdout.isatty():
            failure('Prevented from printing binary garbage to the terminal.')
        if target == 'P':
            result_tree.compile_pyc(stdout)
        elif target == 'X':
            result_tree.compile_x86(stdout)
        else:
            raise NotImplementedError()
# vim:ts=4 sts=4 sw=4 et
def main():
    """Command line interface for the ``qpass`` program."""
    # Initialize logging to the terminal.
    coloredlogs.install()

    # Defaults used when the command line does not override them.
    action = show_matching_entry
    program_opts = {}
    show_opts = {'use_clipboard': is_clipboard_supported()}

    # Parse the command line arguments.
    try:
        options, arguments = getopt.gnu_getopt(
            sys.argv[1:],
            'elnp:vqh',
            ['edit', 'list', 'no-clipboard', 'password-store=',
             'verbose', 'quiet', 'help'],
        )
        for option, value in options:
            if option in ('-e', '--edit'):
                action = edit_matching_entry
            elif option in ('-l', '--list'):
                action = list_matching_entries
            elif option in ('-n', '--no-clipboard'):
                show_opts['use_clipboard'] = False
            elif option in ('-p', '--password-store'):
                # May be repeated: every store is appended to one list.
                program_opts.setdefault('stores', []).append(
                    PasswordStore(directory=value))
            elif option in ('-v', '--verbose'):
                coloredlogs.increase_verbosity()
            elif option in ('-q', '--quiet'):
                coloredlogs.decrease_verbosity()
            elif option in ('-h', '--help'):
                usage(__doc__)
                return
            else:
                raise Exception("Unhandled option! (programming error)")
        # Every action except listing needs at least one positional argument.
        if not arguments and action != list_matching_entries:
            usage(__doc__)
            return
    except Exception as e:
        warning("Error: %s", e)
        sys.exit(1)

    # Only the "show" action receives the clipboard options.
    if action == show_matching_entry:
        action_kwargs = show_opts
    else:
        action_kwargs = {}

    # Execute the requested action.
    try:
        action(QuickPass(**program_opts), arguments, **action_kwargs)
    except PasswordStoreError as e:
        # Known issues don't get a traceback.
        logger.error("%s", e)
        sys.exit(1)
    except KeyboardInterrupt:
        # If the user interrupted an interactive prompt they most likely did
        # so intentionally, so there's no point in generating more output.
        sys.exit(1)
def cmd_logs(data, buffer, args):
    """List files in Weechat's log dir.

    ``args`` is split on whitespace; ``-s``/``--size`` sorts the listing by
    file size, and any remaining words are passed to ``dir_list`` as the
    filter.  Output goes to the buffer returned by ``buffer_create()`` (the
    ``buffer`` argument is replaced by it).

    Returns:
        ``WEECHAT_RC_OK`` in every case, including argument errors.
    """
    cmd_init()
    global home_dir
    sort_by_size = False
    name_filter = []
    try:
        opts, args = getopt.gnu_getopt(args.split(), 's', ['size'])
        if args:
            name_filter = args
        for opt, _value in opts:
            if opt.strip('-') in ('size', 's'):
                sort_by_size = True
    except Exception as e:
        error('Argument error, %s' %e)
        return WEECHAT_RC_OK

    # if there's a filter, filter_excludes should be False
    file_list = dir_list(home_dir, name_filter, filter_excludes=not name_filter)
    if sort_by_size:
        file_list.sort(key=get_size)
    else:
        file_list.sort()
    file_sizes = [human_readable_size(get_size(path)) for path in file_list]

    # Column width: the longest path plus padding (unused when the list is
    # empty, because the loop below does not run).
    column_len = len(max(file_list, key=len)) + 3 if file_list else 0

    buffer = buffer_create()
    if get_config_boolean('clear_buffer'):
        weechat.buffer_clear(buffer)

    # Materialised so that len() and the second truth test work on both
    # Python 2 and Python 3.
    entries = list(zip(file_list, file_sizes))
    msg = 'Found %s logs.' % len(entries)
    print_line(msg, buffer, display=True)
    for path, size in entries:
        separator = '.' * (column_len - len(path))
        prnt(buffer, '%s %s %s' % (strip_home(path), separator, size))
    if entries:
        print_line(msg, buffer)
    return WEECHAT_RC_OK

### Completion ###
def twitter(bot, message):
    """#twitter [-p ??] -p : ???? """
    # The docstring above is sent to the user as the help text, so it is
    # runtime text and is kept exactly as found.
    #
    # Returns False when the message is not a 'twitter' command (so other
    # handlers may look at it) and True once it has been handled here.
    try:
        cmd, *args = shlex.split(message.text)
    except ValueError:
        # Unbalanced quotes, or no tokens at all.
        return False
    # An empty first token (e.g. the message '""') has no trigger character.
    if not cmd or cmd[0] not in config['trigger']:
        return False
    if cmd[1:] != 'twitter':
        return False

    try:
        options, args = getopt.gnu_getopt(args, 'hp:')
    except getopt.GetoptError:
        # Invalid options: answer with the help text.
        reply(bot, message, twitter.__doc__)
        return True

    days = 0
    for o, a in options:
        if o == '-p':
            # Number of days to look back; must be a non-negative integer.
            try:
                days = int(a)
                if days < 0:
                    raise ValueError
            except ValueError:
                reply(bot, message, twitter.__doc__)
                return True
        elif o == '-h':
            # Help requested.
            reply(bot, message, twitter.__doc__)
            return True

    # Sample the clock once so both bounds refer to the same day even when
    # the command runs across midnight.
    today = datetime.now().date()
    window_start = today + timedelta(days=-days)
    window_end = today + timedelta(days=-days + 1)
    tweets = Twitter.objects(Q(date__gte=window_start) & Q(date__lte=window_end))
    if tweets:
        reply(bot, message, '\n---------\n'.join([str(tweet) for tweet in tweets]))
    else:
        reply(bot, message, '??????...')
    return True
def poster(bot, message):
    """#poster [-h] [-f] -h : ????? -f : ???? """
    # The docstring above is sent to the user as the help text, so it is
    # runtime text and is kept exactly as found.
    #
    # Returns False when the message is not a 'poster' command and True once
    # it has been handled here.
    try:
        cmd, *args = shlex.split(message.text)
    except ValueError:
        # Unbalanced quotes, or no tokens at all.
        return False
    # An empty first token (e.g. the message '""') has no trigger character.
    if not cmd or cmd[0] not in config['trigger']:
        return False
    if cmd[1:] != 'poster':
        return False

    try:
        options, args = getopt.gnu_getopt(args, 'hf')
    except getopt.GetoptError:
        # Invalid options: answer with the help text.
        reply(bot, message, poster.__doc__)
        return True

    refresh = False  # -f forces a fresh download even if a copy is cached
    for o, a in options:
        if o == '-h':
            # Help requested.
            reply(bot, message, poster.__doc__)
            return True
        elif o == '-f':
            refresh = True

    # Most recent Thursday (weekday() == 3), today included.
    today = datetime.now()
    thu_date = today.date() - timedelta(days=(today.weekday() - 3) % 7)

    url = '{}event{}.jpg'.format(BASE_URL, thu_date.strftime('%Y%m%d'))
    filename = os.path.basename(url)
    poster_dir = os.path.join(config['cq_root_dir'], config['cq_image_dir'], 'poster')
    path = os.path.join(poster_dir, filename)

    # Creates missing parents too and does not fail if the directory
    # already exists.
    os.makedirs(poster_dir, exist_ok=True)

    if refresh or not os.path.exists(path):
        resp = requests.get(url, timeout=60, proxies=config.get('proxies'))
        if resp.status_code != 200:
            reply(bot, message, '?????...????????????')
            return True
        with open(path, 'wb') as f:
            f.write(resp.content)

    reply(bot, message, CQImage(os.path.join('poster', filename)))
    return True
def unalias_command(bot, message):
    """#unalias [-h] ?? -h : ????? ?? : ?????? """
    # The docstring above is sent to the user as the help text, so it is
    # runtime text and is kept exactly as found.
    #
    # Returns False when the message is not an 'unalias' command and True
    # once it has been handled here.
    try:
        cmd, *args = shlex.split(message.text)
    except ValueError:
        # Unbalanced quotes, or no tokens at all.
        return False
    # An empty first token (e.g. the message '""') has no trigger character.
    if not cmd or cmd[0] not in config['trigger']:
        return False
    if cmd[1:] != 'unalias':
        return False

    try:
        options, args = getopt.gnu_getopt(args, 'hd:')
    except getopt.GetoptError:
        # Invalid options: answer with the help text.
        reply(bot, message, unalias_command.__doc__)
        return True

    for o, a in options:
        if o == '-h':
            # Help requested.
            reply(bot, message, unalias_command.__doc__)
            return True

    # The alias to remove is the first positional argument.
    if not args:
        reply(bot, message, unalias_command.__doc__)
        return True
    command = args[0]

    # NOTE(review): assumes Alias.objects(...) returns a MongoEngine-style
    # query set whose first() yields a single document or None -- confirm.
    alias = Alias.objects(alias=command).first()
    if not alias:
        reply(bot, message, '?????{}'.format(command))
        # Nothing to delete: stop instead of falling through to the
        # attribute access below.
        return True

    reply(bot, message, '????? {} => {}'.format(alias.alias, alias.origin))
    alias.delete()
    return True
def main():
    """Post the command-line words as a status update.

    Credentials come from, in order of precedence: the ``--consumer-key``,
    ``--consumer-secret``, ``--access-key`` and ``--access-secret`` flags,
    the ``Get*Env()`` helpers, and the ``TweetRc`` object.  Usage is printed
    (via ``PrintUsageAndExit``) when options are invalid, the message is
    empty, or any credential is missing.
    """
    try:
        shortflags = 'h'
        longflags = ['help', 'consumer-key=', 'consumer-secret=',
                     'access-key=', 'access-secret=', 'encoding=']
        opts, args = getopt.gnu_getopt(sys.argv[1:], shortflags, longflags)
    except getopt.GetoptError:
        PrintUsageAndExit()

    consumer_keyflag = None
    consumer_secretflag = None
    access_keyflag = None
    access_secretflag = None
    encoding = None
    for o, a in opts:
        # Exact comparisons: `o in ("--flag")` was a substring test against
        # a plain string, not tuple membership.
        if o in ("-h", "--help"):
            PrintUsageAndExit()
        if o == "--consumer-key":
            consumer_keyflag = a
        if o == "--consumer-secret":
            consumer_secretflag = a
        if o == "--access-key":
            access_keyflag = a
        if o == "--access-secret":
            access_secretflag = a
        if o == "--encoding":
            encoding = a

    message = ' '.join(args)
    if not message:
        PrintUsageAndExit()

    rc = TweetRc()
    consumer_key = consumer_keyflag or GetConsumerKeyEnv() or rc.GetConsumerKey()
    consumer_secret = consumer_secretflag or GetConsumerSecretEnv() or rc.GetConsumerSecret()
    access_key = access_keyflag or GetAccessKeyEnv() or rc.GetAccessKey()
    access_secret = access_secretflag or GetAccessSecretEnv() or rc.GetAccessSecret()
    if not consumer_key or not consumer_secret or not access_key or not access_secret:
        PrintUsageAndExit()

    api = twitter.Api(consumer_key=consumer_key,
                      consumer_secret=consumer_secret,
                      access_token_key=access_key,
                      access_token_secret=access_secret,
                      input_encoding=encoding)
    try:
        status = api.PostUpdate(message)
    except UnicodeDecodeError:
        # Single-argument print calls behave the same on Python 2 and 3.
        print("Your message could not be encoded. Perhaps it contains non-ASCII characters? ")
        print("Try explicitly specifying the encoding with the --encoding flag")
        sys.exit(2)
    print("%s just posted: %s" % (status.user.name, status.text))
def get_options():
    '''
    Extracts command line arguments

    Recognised options: -i/--input-file, -o/--output-file, -t/--test-file,
    --max-candidates N (integer >= 1), --check-only, --silent, -h/--help.

    Returns the tuple (input_fname, output_fname, test_fname,
    max_candidates, check_only, silent).  Any invalid option or option
    combination writes a message to stderr and exits with status 1;
    -h/--help prints usage and exits with status 0.
    '''
    input_fname = None
    output_fname = None
    test_fname = None
    max_candidates = MAX_CANDIDATES
    check_only = False
    silent = False

    try:
        # 'max-candidates=' must be declared here: the loop below handles
        # the option, but getopt rejects any name missing from this list.
        opts, args = getopt.gnu_getopt(
            sys.argv[1:], 'hi:o:t:',
            ['help', 'input-file=', 'output-file=', 'test-file=',
             'max-candidates=', 'check-only', 'silent'])
    except getopt.GetoptError as err:
        sys.stderr.write('Error: %s\n' % err)
        usage()
        sys.exit(1)

    for o, a in opts:
        if o in ('-i', '--input-file'):
            input_fname = a
        elif o in ('-o', '--output-file'):
            output_fname = a
        elif o in ('-t', '--test-file'):
            test_fname = a
        elif o in ('-h', '--help'):
            usage()
            sys.exit()
        elif o == '--check-only':
            check_only = True
        elif o == '--silent':
            silent = True
        elif o == '--max-candidates':
            try:
                max_candidates = int(a)
            except ValueError:
                sys.stderr.write('Error: --max-candidates takes integer argument (you provided %s).\n' % a)
                sys.exit(1)
            if max_candidates < 1:
                sys.stderr.write('Error: --max-candidates must be above 0.\n')
                sys.exit(1)
        else:
            sys.stderr.write('Error: unknown option %s. Type --help to see the options.\n' % o)
            sys.exit(1)

    if check_only:
        # Checking the input format needs neither a test nor an output file.
        if test_fname or output_fname:
            sys.stderr.write('No test file or output file is required to check the input format.\n')
            sys.exit(1)
    elif not test_fname:
        sys.stderr.write('Error: no test file provided.\n')
        sys.exit(1)

    return input_fname, output_fname, test_fname, max_candidates, check_only, silent
def main(argv):
    """Parse the command line and start the broker through ``run()``.

    The four behaviour switches (``-p/--publish_on_pubrel``,
    ``-o/--overlapping_single``, ``-d/--dropQoS0``,
    ``-z/--zero_length_clientids``) default to True and are turned off by
    the values ``off``, ``false`` or ``0``.  The numeric settings
    (``-t/--topicAliasMaximum``, ``-m/--maximumPacketSize``,
    ``-r/--receiveMaximum``, ``-s/--serverKeepAlive``) keep their defaults
    unless the value consists of decimal digits.  ``--port`` (long form
    only) defaults to 1883.

    Args:
        argv: full argument vector; ``argv[0]`` is skipped.
    """
    try:
        # Each long option is its own list element.  A missing comma used to
        # fuse "zero_length_clientids=" and "topicAliasMaximum=" into one
        # bogus name, breaking both options.
        opts, args = getopt.gnu_getopt(
            argv[1:], "hp:o:d:z:t:m:r:s:",
            ["help", "port=", "publish_on_pubrel=", "overlapping_single=",
             "dropQoS0=", "zero_length_clientids=", "topicAliasMaximum=",
             "maximumPacketSize=", "receiveMaximum=", "serverKeepAlive="])
    except getopt.GetoptError as err:
        print(err)  # will print something like "option -a not recognized"
        usage()
        sys.exit(2)

    off_values = ("off", "false", "0")
    publish_on_pubrel = overlapping_single = dropQoS0 = zero_length_clientids = True
    topicAliasMaximum = 2
    maximumPacketSize = 1000
    receiveMaximum = 20
    serverKeepAlive = 60
    port = 1883
    for o, a in opts:
        if o in ("-h", "--help"):
            usage()
            sys.exit()
        elif o in ("-p", "--publish_on_pubrel"):
            publish_on_pubrel = a not in off_values
        elif o in ("-o", "--overlapping_single"):
            overlapping_single = a not in off_values
        elif o in ("-d", "--dropQoS0"):
            dropQoS0 = a not in off_values
        elif o in ("-z", "--zero_length_clientids"):
            zero_length_clientids = a not in off_values
        # Numeric settings are converted to int so that they have the same
        # type as their defaults; non-decimal input leaves the default.
        elif o in ("-t", "--topicAliasMaximum"):
            if a.isdecimal():
                topicAliasMaximum = int(a)
        elif o in ("-m", "--maximumPacketSize"):
            if a.isdecimal():
                maximumPacketSize = int(a)
        elif o in ("-r", "--receiveMaximum"):
            if a.isdecimal():
                receiveMaximum = int(a)
        elif o in ("-s", "--serverKeepAlive"):
            if a.isdecimal():
                serverKeepAlive = int(a)
        elif o == "--port":
            # Exact comparison: `o in ("--port")` was a substring test
            # against a plain string, not tuple membership.
            port = int(a)
        else:
            # Raised explicitly so the check survives `python -O`.
            raise AssertionError("unhandled option")

    run(publish_on_pubrel=publish_on_pubrel,
        overlapping_single=overlapping_single,
        dropQoS0=dropQoS0,
        port=port,
        zero_length_clientids=zero_length_clientids,
        topicAliasMaximum=topicAliasMaximum,
        maximumPacketSize=maximumPacketSize,
        receiveMaximum=receiveMaximum,
        serverKeepAlive=serverKeepAlive)