The following 50 code examples, extracted from open-source Python projects, illustrate how to use argparse.Namespace().
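Before the project examples, here is a minimal, self-contained sketch (not drawn from any of the projects below) of the two ways a Namespace typically shows up: built directly as a lightweight attribute container, and returned by ArgumentParser.parse_args().

import argparse

# Build a Namespace directly -- handy in tests for faking parsed options.
opts = argparse.Namespace(verbose=True, output="result.txt")
print(opts.verbose)   # True
print(vars(opts))     # {'verbose': True, 'output': 'result.txt'}

# The same object type is what parse_args() returns.
parser = argparse.ArgumentParser()
parser.add_argument("--verbose", action="store_true")
args = parser.parse_args(["--verbose"])
print(args.verbose)   # True
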
def _parse_args(args):
    parser = argparse.ArgumentParser()
    if any([arg == '--version' for arg in args]):
        return argparse.Namespace(version=True)
    parser.add_argument('script', help='Script to run')
    parser.add_argument('target', nargs='?', default='build',
                        help='Target object to build; defaults to \'build\'')
    parser.add_argument('--version', action='store_true',
                        help='Print version info and exit')
    parser.add_argument('--clear', action='store_true',
                        help='Clear output directory')
    parser.add_argument('--clear-cache', action='store_true',
                        help='Clear cache before compiling')
    parser.add_argument('--threads', '-t', type=int,
                        help='Set thread count; defaults to cores*2')
    parser.add_argument('--no-threading', '-nt', action='store_true',
                        help='Disable multithreaded compiling')
    # TODO: Make target '*' instead of '?' so multiple targets could be ran from the same command
    return parser.parse_args(args)

def parse(self, argv):
    """
    Parse arguments.

    :param argv: arguments.
    """
    values = dict()
    self.data = None
    self._errors = list()
    for idx, param in enumerate(self.params):
        try:
            values[param['dest'] or param['name']] = self.parse_parameter(param, argv, idx)
        except ParamException as e:
            self._errors.append(str(e))
    self.data = Namespace(**values)

async def list_action_remove(self, player, values, map_dictionary, view, **kwargs):
    # Check permission.
    if not await self.instance.permission_manager.has_permission(player, 'admin:remove_map'):
        await self.instance.chat(
            '$f00You don\'t have the permission to perform this action!',
            player
        )
        return

    # Ask for confirmation.
    cancel = bool(await ask_confirmation(player, 'Are you sure you want to remove the map \'{}\'$z$s from the server?'.format(
        map_dictionary['name']
    ), size='sm'))
    if cancel is True:
        return

    # Simulate command.
    await self.remove_map(player, Namespace(nr=map_dictionary['id']))

    # Reload parent view.
    await view.refresh(player)

def run_with_args(args, parser):
    # type: (argparse.Namespace, argparse.ArgumentParser) -> int
    set_logging_parameters(args, parser)
    start_time = time.time()
    ret = OK
    try:
        if args.profile:
            outline("Profiling...")
            profile("ret = whatstyle(args, parser)", locals(), globals())
        else:
            ret = whatstyle(args, parser)
    except IOError as exc:
        # If the output is piped into a pager like 'less' we get a broken pipe when
        # the pager is quit early and that is ok.
        if exc.errno == errno.EPIPE:
            pass
        elif str(exc) == 'Stream closed':
            pass
        else:
            raise
    if not PY2:
        sys.stderr.close()
    iprint(INFO_TIME, 'Run time: %s seconds' % (time.time() - start_time))
    return ret

def test_start_object(self):
    server = PJFServer(configuration=PJFConfiguration(Namespace(
        ports={"servers": {"HTTP_PORT": 8080, "HTTPS_PORT": 8443}},
        html=False, level=6, command=["radamsa"], stdin=True,
        json={"a": "test"}, indent=True, strong_fuzz=False,
        url_encode=False, parameters=[], notify=False, debug=False,
        content_type="text/plain", utf8=False, nologo=True)))
    server.run()
    json_http = urllib2.urlopen("http://127.0.0.1:8080").read()
    try:
        import requests
        requests.packages.urllib3.disable_warnings()
        json_https = requests.get('https://127.0.0.1:8443', verify=False).content
        self.assertTrue(json_https)
    except ImportError:
        pass
    self.assertTrue(json_http)
    server.stop()

def cli_args():
    """Parse the command line arguments.

    :return: The parsed arguments.
    :rtype: argparse.Namespace
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('-c', '--check', action='store_true',
                        help="check if the system is vulnerable to WCry")
    parser.add_argument('-m', '--mitigate', action='store_true',
                        help="mitigate the system's vulnerability by disabling the"
                             " SMBv1 protocol, if necessary; implies --check")
    parser.add_argument('-f', '--fix', action='store_true')
    parser.add_argument('--download-directory',
                        help="Optionally specify a directory where the Microsoft"
                             " KB update is saved when using --fix")
    if len(sys.argv) == 1:
        parser.print_help()
        sys.exit(1)
    # else:
    return parser.parse_args()

def register_command(handler: Callable[[argparse.Namespace], None],
                     main_parser: Optional[ArgParserType] = None,
                     ) -> Callable[[argparse.Namespace], None]:
    if main_parser is None:
        main_parser = global_argparser
    if id(main_parser) not in _subparsers:
        subparsers = main_parser.add_subparsers(title='commands', dest='command')
        _subparsers[id(main_parser)] = subparsers
    else:
        subparsers = _subparsers[id(main_parser)]

    @functools.wraps(handler)
    def wrapped(args):
        handler(args)

    doc_summary = handler.__doc__.split('\n\n')[0]
    inner_parser = subparsers.add_parser(handler.__name__.replace('_', '-'),
                                         description=handler.__doc__,
                                         help=doc_summary)
    inner_parser.set_defaults(function=wrapped)
    wrapped.register_command = functools.partial(register_command,
                                                 main_parser=inner_parser)
    wrapped.add_argument = inner_parser.add_argument
    return wrapped

def __call__(
        self,
        parser,             # type: argparse.ArgumentParser
        namespace,          # type: argparse.Namespace
        values,             # type: Union[ARGPARSE_TEXT, Sequence[Any], None]
        option_string=None  # type: Optional[ARGPARSE_TEXT]
):
    # type: (...) -> None
    """Checks to make sure that the destination is empty before writing.

    :raises parser.error: if destination is already set
    """
    if getattr(namespace, self.dest) is not None:  # type: ignore # typeshed doesn't know about Action.dest yet?
        parser.error('{} argument may not be specified more than once'.format(option_string))
        return
    setattr(namespace, self.dest, values)  # type: ignore # typeshed doesn't know about Action.dest yet?

def start(self):
    self.setup_sockets()
    import StaticUPnP_Settings
    permissions = Namespace(**StaticUPnP_Settings.permissions)
    print(permissions)
    if permissions.drop_permissions:
        self.drop_privileges(permissions.user, permissions.group)

    self.running = Value(ctypes.c_int, 1)
    self.queue = Queue()
    self.reciever_thread = Process(target=self.socket_handler, args=(self.queue, self.running))
    self.reciever_thread.start()
    self.schedule_thread = Process(target=self.schedule_handler, args=(self.running,))
    self.schedule_thread.start()
    self.response_thread = Process(target=self.response_handler, args=(self.queue, self.running))
    self.response_thread.start()

def get_interface_addresses(logger):
    import StaticUPnP_Settings
    interface_config = Namespace(**StaticUPnP_Settings.interfaces)
    ip_addresses = StaticUPnP_Settings.ip_addresses
    if len(ip_addresses) == 0:
        import netifaces
        ifs = netifaces.interfaces()
        if len(interface_config.include) > 0:
            ifs = interface_config.include
        if len(interface_config.exclude) > 0:
            for iface in interface_config.exclude:
                ifs.remove(iface)
        for i in ifs:
            addrs = netifaces.ifaddresses(i)
            if netifaces.AF_INET in addrs:
                for addr in addrs[netifaces.AF_INET]:
                    ip_addresses.append(addr['addr'])
                    logger.info("Registering multicast on %s: %s" % (i, addr['addr']))
    return ip_addresses

def parse_args() -> argparse.Namespace:  # pragma: no cover
    """
    Parses the Command Line Arguments using argparse

    :return: The parsed arguments
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("connection",
                        help="The Type of Connection to use")
    parser.add_argument("-v", "--verbose", action="store_true",
                        help="Activates verbose output")
    parser.add_argument("-d", "--debug", action="store_true",
                        help="Activates debug-level logging output")
    parser.add_argument("-q", "--quiet", action="store_true",
                        help="Disables all text output")
    parser.add_argument("-c", "--config",
                        default=os.path.join(os.path.expanduser("~"), ".kudubot"),
                        help="Overrides the configuration directory location")
    return parser.parse_args()

def test_init_2(tmpdir):
    "it should open sam file if provided"
    make_bam(tmpdir.strpath, """
        123456789_123456789_
        r1 + ...........
        r1 - ......*....
        r2 + .........*.
        r2 - .....*.......
    """)
    o = Namespace(query="test.vcf", cfdna=tmpdir.join("test.bam").strpath,
                  gdna=None, output=None)
    init(o)
    assert isinstance(o.cfdna, AlignmentFile)
    assert o.gdna == None

def test_init_3(tmpdir):
    "it should generate a proper output file name if not provided"
    make_bam(tmpdir.strpath, """
        123456789_123456789_
        r1 + ...........
        r1 - ......*....
        r2 + .........*.
        r2 - .....*.......
    """)
    o = Namespace(query="test.vcf", cfdna=tmpdir.join("test.bam").strpath,
                  gdna=None, output=None)
    init(o)
    assert o.output != None

def test_aggregate_reads_3():
    "it should ignore when 3+ reads share the same name"
    o = Namespace(verbos=False, qual=20, mismatch_limit=-1)
    reads = (
        ("r1", 'A', 60, 2, 11, -1, 2, 9, False, True),
        ("r1", 'C', 60, 2, 11, -1, 2, -9, True, True),
        ("r1", 'C', 60, 2, 11, -1, 2, 9, False, True),
        ("r2", 'C', 60, 2, 11, -1, 0, 0, True, False)
    )
    unique_pairs, unique_single, _, nerror, *_ = aggregate_reads(o, reads)
    assert len(unique_pairs) == 0
    assert len(unique_single) == 1
    assert nerror == 3

def test_aggregate_reads_4():
    "it should ignore when base in overlap area inconsistent between two reads"
    o = Namespace(verbos=False, qual=20, mismatch_limit=-1)
    reads = (
        ("r1", 'A', 60, 2, 11, -1, 4, 11, False, True),
        ("r1", 'C', 60, 4, 13, -1, 2, -11, True, True),
        ("r2", 'C', 60, 3, 12, -1, 5, 11, False, True),
        ("r2", 'C', 60, 5, 14, -1, 3, -11, True, True)
    )
    unique_pairs, unique_single, *_, ninconsis = aggregate_reads(o, reads)
    assert len(unique_pairs) == 1
    assert ninconsis == 2

def test_aggregate_reads_5():
    "it should drop reads that has too much mismatch"
    o = Namespace(verbos=False, qual=20, mismatch_limit=2)
    reads = (
        ("r1", 'C', 60, 2, 11, 1, 4, 11, False, True),
        ("r1", 'C', 60, 4, 13, 1, 2, -11, True, True),
        ("r2", 'C', 60, 3, 12, 3, 5, 11, False, True),
        ("r2", 'C', 60, 5, 14, 1, 3, -11, True, True),
        ("r3", 'C', 60, 6, 14, 3, 0, 0, True, False),
        ("r4", 'C', 60, 7, 14, 1, 0, 0, True, False)
    )
    unique_pairs, unique_single, *_, nlowq, ninconsis = aggregate_reads(o, reads)
    assert len(unique_pairs) == 1
    assert len(unique_single) == 1
    assert nlowq == 3

def test_count_different_type_1():
    "it should NOT count dna that more than 10% reads have different bases"
    o = Namespace(verbos=False, allow_inconsist=False)
    pair = {
        ('ref', 2, True): [
            ('A', 60), ('T', 60), ('T', 60)
        ],
        ('ref', 3, False): [
            ('A', 60), ('T', 60), ('T', 60)
        ]
    }
    mor, mnr, msr, oor, onr, osr, moa, mna, msa, ooa, ona, osa, inconsis = \
        count_different_type(o, pair, {}, 'T', 'A')
    assert inconsis == 6
    assert sum((mor, mnr, msr, oor, onr, osr, moa, mna, msa, ooa, ona, osa)) == 0

def test_get_reads_1(tmpdir):
    "it should get all but only the reads that covers the given position"
    make_bam(tmpdir.strpath, """
        123456789_123456789_12
        r1 + ...........
        r1 - ......*....
        r2 + .........*.
        r2 - .....*.......
        r3 + ...........
        r3 - ....*......
        r4 + ...........
        r4 - ...........
        123456789_123456789_12
    """)
    o = Namespace(verbos=False, mismatch_limit=-1)
    sam = AlignmentFile(tmpdir.join("test.bam").strpath)
    assert sum(1 for _ in get_reads(o, sam, 'ref', '4')) == 2
    assert sum(1 for _ in get_reads(o, sam, 'ref', '12')) == 7
    assert sum(1 for _ in get_reads(o, sam, 'ref', '20')) == 2

def test_get_reads_2(tmpdir):
    "it should read properties correctly"
    make_bam(tmpdir.strpath, """
        123456789_123
        r1 + ...*.......
        r1 - .*.........
    """)
    o = Namespace(verbos=False, mismatch_limit=-1)
    sam = AlignmentFile(tmpdir.join("test.bam").strpath)
    r = next(get_reads(o, sam, 'ref', '4'))
    assert r[0] == "r1"    # name
    assert r[3] == 0       # 0-based pos
    assert r[4] == 11      # length
    assert r[5] == -1      # mismatch, not calculated
    assert r[6] == 2       # mate pos
    assert r[7] == 13      # template length
    assert r[8] == False   # is_reverse
    assert r[9] == True    # paired and mapped

def test_pad_softclip_2(tmpdir):
    "it should ignore more than two reads which share the same name"
    make_bam(tmpdir.strpath, """
        r1 + __.*.......
        r1 - .*.......__
        r1 - .*.......__
        r2 + .*.......__
        r2 - .*.......__
    """)
    o = Namespace(verbos=False, mismatch_limit=-1)
    sam = AlignmentFile(tmpdir.join("test.bam").strpath)
    adjusted_pos = pad_softclip(sam)
    assert sum(1 for startpos, length in adjusted_pos.values() if startpos != -1) == 1

def test_pad_softclip_3(tmpdir):
    "it should pad softclipped bases"
    make_bam(tmpdir.strpath, """
        123456789_123
        r1 + __.*.......
        r1 - .*.........
        r2 - ...*.......
        r2 + .*.......__
    """)
    o = Namespace(verbos=False, mismatch_limit=-1)
    sam = AlignmentFile(tmpdir.join("test.bam").strpath)
    adjusted_pos = pad_softclip(sam)
    assert adjusted_pos["r1"] == (0, 13)  # 0-based position
    assert adjusted_pos["r2"] == (0, 13)

def get_api_id(config, args):
    """
    Get the API ID from Terraform, or from AWS if that fails.

    :param config: configuration
    :type config: :py:class:`~.Config`
    :param args: command line arguments
    :type args: :py:class:`argparse.Namespace`
    :return: API Gateway ID
    :rtype: str
    """
    try:
        logger.debug('Trying to get Terraform rest_api_id output')
        runner = TerraformRunner(config, args.tf_path)
        outputs = runner._get_outputs()
        depl_id = outputs['rest_api_id']
        logger.debug("Terraform rest_api_id output: '%s'", depl_id)
    except Exception:
        logger.info('Unable to find API rest_api_id from Terraform state;'
                    ' querying AWS.', exc_info=1)
        aws = AWSInfo(config)
        depl_id = aws.get_api_id()
        logger.debug("AWS API ID: '%s'", depl_id)
    return depl_id

def _predict(predictors: Dict[str, str]):
    def predict_inner(args: argparse.Namespace) -> None:
        predictor = _get_predictor(args, predictors)
        output_file = None

        if args.silent and not args.output_file:
            print("--silent specified without --output-file.")
            print("Exiting early because no output will be created.")
            sys.exit(0)

        # ExitStack allows us to conditionally context-manage `output_file`, which may or may not exist
        with ExitStack() as stack:
            input_file = stack.enter_context(args.input_file)  # type: ignore
            if args.output_file:
                output_file = stack.enter_context(args.output_file)  # type: ignore
            _run(predictor, input_file, output_file, args.batch_size, not args.silent, args.cuda_device)

    return predict_inner

def get_parsed_args(self, comp_words):
    """
    gets the parsed args from a patched parser
    """
    active_parsers = self._patch_argument_parser()
    parsed_args = argparse.Namespace()
    self.completing = True
    if USING_PYTHON2:
        # Python 2 argparse only properly works with byte strings.
        comp_words = [ensure_bytes(word) for word in comp_words]
    try:
        stderr = sys.stderr
        sys.stderr = io.open(os.devnull, "w")
        active_parsers[0].parse_known_args(comp_words, namespace=parsed_args)
        sys.stderr.close()
        sys.stderr = stderr
    except BaseException:
        pass
    self.completing = False
    return parsed_args

def test_with_config(self, is_file, open_):
    # Create our stand-in config file.
    config_file = textwrap.dedent(u"""\
        ---
        local_paths:
          reporoot: ~/Code
        publish: local
    """)
    is_file.return_value = True
    open_.return_value = io.StringIO(config_file)

    # Get the config and test the result.
    flags = Namespace(user_config='/bogus/file.yaml', command='not_init')
    user_config = main.read_user_config(flags)
    assert user_config == {
        'local_paths': {'reporoot': '~/Code'},
        'publish': 'local',
    }

def test_with_ssh_repo(self, login):
    # Set up test data to return when we attempt to make the
    # pull request.
    gh = mock.MagicMock(spec=github3.github.GitHub)
    login.return_value = gh
    url = 'https://github.com/me/repo/pulls/1/'
    gh.repository().create_pull.return_value = Namespace(html_url=url)

    # Run the task.
    task = github.CreateGitHubPullRequest()
    pr = task.execute(**self.task_kwargs)

    # Assert we got the correct result.
    assert pr.html_url == url

    # Assert that the correct methods were called.
    login.assert_called_once_with('lukesneeringer', '1335020400')
    gh.repository.assert_called_with('me', 'repo')
    gh.repository().create_pull.assert_called_once_with(
        base='master',
        body='This pull request was generated by artman. '
             'Please review it thoroughly before merging.',
        head='pubsub-python-v1',
        title='Python GAPIC: Pubsub v1',
    )

def test_with_http_url(self, login):
    # Set up test data to return when we attempt to make the
    # pull request.
    gh = mock.MagicMock(spec=github3.github.GitHub)
    login.return_value = gh
    url = 'https://github.com/me/repo/pulls/1/'
    gh.repository().create_pull.return_value = Namespace(html_url=url)

    # Run the task.
    task = github.CreateGitHubPullRequest()
    pr = task.execute(**dict(self.task_kwargs, git_repo={
        'location': 'https://github/me/repo/',
    }))

    # Assert we got the correct result.
    assert pr.html_url == url

    # Assert that the correct repository method was still called.
    gh.repository.assert_called_with('me', 'repo')

def print_args(self):
    """Print out all the arguments in this parser."""
    if not self.opt:
        self.parse_args(print_args=False)
    values = {}
    for key, value in self.opt.items():
        values[str(key)] = str(value)
    for group in self._action_groups:
        group_dict = {
            a.dest: getattr(self.args, a.dest, None)
            for a in group._group_actions
        }
        namespace = argparse.Namespace(**group_dict)
        count = 0
        for key in namespace.__dict__:
            if key in values:
                if count == 0:
                    print('[ ' + group.title + ': ] ')
                count += 1
                print('[ ' + key + ': ' + values[key] + ' ]')

def _test_standalone_sequana(qtbot, tmpdir):
    wkdir = TemporaryDirectory()
    inputdir = os.path.realpath(
        sequana_data("Hm2_GTGAAA_L005_R1_001.fastq.gz")).rsplit(os.sep, 1)[0]

    # Standalone for sequana given a wkdir and pipeline and input_dir
    args = Namespace(pipeline="quality_control", wkdir=wkdir.name,
                     input_directory=inputdir)
    widget = sequana_gui.SequanaGUI(ipython=False, user_options=args)
    qtbot.addWidget(widget)
    assert widget.mode == "sequana"

    widget.force = True
    widget.save_project()
    widget.click_run()
    count = 0
    while widget.process.state() and count < 5:
        time.sleep(0.5)
        count += 0.5
    widget.click_stop()
    time.sleep(1)

def _setup_arg_parser(argv):
    """
    Parse arguments from command line

    Args:
        argv list: by default it's command line arguments

    Returns:
        argparse.Namespace: parsed argument
    """
    parser = argparse.ArgumentParser(
        description='MySQL binlog to Google Cloud Pub/Sub')
    parser.add_argument('conf', help='configuration file for publishing')
    parser.add_argument('--loglevel', '-l', default=None,
                        help='log level for root')
    if os.path.isfile('logging.ini'):
        _log_file = 'logging.ini'
    else:
        _log_file = None
    parser.add_argument('--logconf', default=_log_file,
                        help='INI file log configuration')
    args = parser.parse_args(argv)
    return args

def RunTestsCommand(args):
    """Checks test type and dispatches to the appropriate function.

    Args:
        args: argparse.Namespace object.

    Returns:
        Integer indicated exit code.

    Raises:
        Exception: Unknown command name passed in, or an exception from an
            individual test runner.
    """
    command = args.command

    ProcessCommonOptions(args)
    logging.info('command: %s', ' '.join(sys.argv))
    if args.enable_platform_mode or command in _DEFAULT_PLATFORM_MODE_TESTS:
        return RunTestsInPlatformMode(args)

    if command == 'python':
        return _RunPythonTests(args)
    else:
        raise Exception('Unknown test type.')

def __init__(self, args=None, parse_args=True):
    self.cli = CLI()

    self.args, self.unknown_args = argparse.Namespace(), []
    if parse_args:
        self.args, self.unknown_args = self.parse_arguments(args=args)

    self.settings = self.args.settings or os.environ.get('CLINNER_SETTINGS')

    # Inject parameters related to current stage as environment variables
    self.inject()

    # Load settings
    settings.build_from_module(self.args.settings)

    # Apply quiet mode
    if self.args.quiet:
        self.cli.disable()

def add_domains(args_or_config, domains):
    """Registers new domains to be used during the current client run.

    Domains are not added to the list of requested domains if they have
    already been registered.

    :param args_or_config: parsed command line arguments
    :type args_or_config: argparse.Namespace or configuration.NamespaceConfig
    :param str domain: one or more comma separated domains

    :returns: domains after they have been normalized and validated
    :rtype: `list` of `str`
    """
    validated_domains = []
    for domain in domains.split(","):
        domain = util.enforce_domain_sanity(domain.strip())
        validated_domains.append(domain)
        if domain not in args_or_config.domains:
            args_or_config.domains.append(domain)

    return validated_domains

def update(namespace, username=None):
    """
    This updates a Namespace (as returned by ArgumentParser) with config values
    if they aren't present in the Namespace already.
    """
    if not os.path.isfile(_get_config_path()):
        return namespace

    if not username:
        username = "DEFAULT"

    conf = _get_config()
    if not username in conf:
        print("User {} is not configured.".format(username))
        sys.exit(1)

    return update_namespace(namespace, conf[username])

def test_remove_values_having_hyphen(self):
    api_exts = "dvr, l3-flavors, rbac-policies, project-id"
    remove_exts = ["dvr", "project-id"]
    args = Namespace(
        remove={
            "network-feature-enabled.api-extensions": remove_exts
        }
    )
    self.conf = self._get_conf("v2.0", "v3")
    self.conf.set("network-feature-enabled", "api-extensions", api_exts)
    self.conf.remove_values(args)
    conf_exts = self.conf.get("network-feature-enabled", "api-extensions")
    conf_exts = conf_exts.split(',')
    for ext in api_exts.split(','):
        if ext in remove_exts:
            self.assertFalse(ext in conf_exts)
        else:
            self.assertTrue(ext in conf_exts)

def parse_arguments(args: list) -> Namespace:
    """
    Parses the arguments passed on invocation in a dict and return it
    """
    parser = ArgumentParser(description="A tool to combine multiple merge tools")
    parser.add_argument('-b', '--base', required=True)
    parser.add_argument('-l', '--local', required=True)
    parser.add_argument('-r', '--remote', required=True)
    parser.add_argument('-m', '--merged', required=True)

    # convert to absolute path
    parsed_arg = parser.parse_args(args)
    parsed_arg.base = os.path.abspath(parsed_arg.base)
    parsed_arg.local = os.path.abspath(parsed_arg.local)
    parsed_arg.remote = os.path.abspath(parsed_arg.remote)
    parsed_arg.merged = os.path.abspath(parsed_arg.merged)
    return parsed_arg

def merge(config: RawConfigParser, args: Namespace, launcher: ToolsLauncher,
          analyser: ConflictedFileAnalyser) -> int:
    """
    Handle the merge tools chain for the given argument

    config -- the current amt configuration
    args -- the arguments with the base, local, remote and merged file names
    launcher -- the launcher helper
    """
    if not (config.has_option(SECT_AMT, OPT_TOOLS)):
        raise RuntimeError('Missing the {0}.{1} configuration'.format(SECT_AMT, OPT_TOOLS))
    tools = config.get(SECT_AMT, OPT_TOOLS).split(';')
    merge_result = ERROR_NO_TOOL
    for tool in tools:
        merge_result = merge_with_tool(tool, config, args, launcher, analyser)
        if merge_result == 0:
            return 0
    print(" [AMT] ? Sorry, it seems we can't solve it this time")
    return merge_result

def setUp(self):
    options.cfg = argparse.Namespace()
    options.cfg.corpus = None
    options.cfg.current_server = "MockConnection"
    options.cfg.MODE = QUERY_MODE_TOKENS
    options.cfg.stopword_list = []
    options.cfg.context_mode = CONTEXT_NONE
    options.cfg.drop_on_na = True
    options.cfg.drop_duplicates = True
    options.cfg.benchmark = False
    options.cfg.verbose = False

    self.Session = Session()
    self.Session.Resource = BaseResource()
    self.manager = Manager()

def setUp(self):
    self.maxDiff = None
    options.cfg = argparse.Namespace()
    options.cfg.number_of_tokens = 0
    options.cfg.limit_matches = False
    options.cfg.regexp = False
    options.cfg.query_case_sensitive = False
    options.get_configuration_type = lambda: SQL_MYSQL
    options.get_resource = _monkeypatch_get_resource
    self.Session = MockOptions()
    self.Session.Resource = self.resource
    self.Session.Lexicon = None
    self.Session.Corpus = None

    self.link = coquery.links.Link(
        self.resource.name, "corpus_word",
        self.external.name, "word_label",
        join="LEFT JOIN")
    options.cfg.current_server = "Default"
    options.cfg.table_links = {}
    options.cfg.table_links[options.cfg.current_server] = [self.link]

def setUp(self):
    self.maxDiff = None
    options.cfg = argparse.Namespace()
    options.cfg.number_of_tokens = 0
    options.cfg.limit_matches = False
    options.cfg.regexp = False
    options.cfg.query_case_sensitive = False
    options.get_configuration_type = lambda: SQL_MYSQL
    options.get_resource = _monkeypatch_get_resource
    self.Session = MockOptions()
    self.Session.Resource = self.resource
    self.Session.Lexicon = None
    self.Session.Corpus = None

    self.link = coquery.links.Link(
        self.resource.name, "word_label",
        self.external.name, "word_label",
        join="LEFT JOIN")
    options.cfg.current_server = "Default"
    options.cfg.table_links = {}
    options.cfg.table_links[options.cfg.current_server] = [self.link]

def setUp(self):
    self.server = mock.Mock()
    self.flow = mock.Mock()
    self.storage = mock.Mock()
    self.credentials = mock.Mock()

    self.flow.step1_get_authorize_url.return_value = (
        'http://example.com/auth')
    self.flow.step2_exchange.return_value = self.credentials

    self.flags = argparse.Namespace(
        noauth_local_webserver=True, logging_level='INFO')
    self.server_flags = argparse.Namespace(
        noauth_local_webserver=False,
        logging_level='INFO',
        auth_host_port=[8080, ],
        auth_host_name='localhost')

def parse(self, args):
    """
    Parse the command line arguments.

    @param args: the list of user-provided command line arguments -- normally sys.argv[1:]
    @type args: tuple of str
    @return: an object initialized with the parsed arguments
    @rtype: argparse.Namespace
    """
    try:
        return self.parser.parse_args(args)
    except SystemExit as se:
        if se.code > 0:
            raise AppExit('Bad command line', ExitCodes.USAGE_ERROR)
        else:
            raise AppExit(code=ExitCodes.CLEAN)

def __init__(self, args):
    """Min Mapping Quality Zero constructor."""
    # This needs to happen first, because threshold is initialised here.
    super(MQ0FFilter, self).__init__(args)

    # Change the threshold to custom gq value.
    self.threshold = self._default_threshold
    if isinstance(args, argparse.Namespace):
        self.threshold = args.mq_score
    elif isinstance(args, dict):
        try:
            self.threshold = float(args.get(self.parameter))
        except (TypeError, ValueError):
            logging.error("Could not retrieve threshold from %s",
                          args.get(self.parameter))
            logging.error("This parameter requires to be a float!")
            raise Exception("Could not create MQ0F filter from parameters: %s" % args)

def __init__(self, args):
    """Min Mapping Quality Zero constructor."""
    # This needs to happen first, because threshold is initialised here.
    super(MQ0Filter, self).__init__(args)

    # Change the threshold to custom gq value.
    self.threshold = self._default_threshold
    if isinstance(args, argparse.Namespace):
        self.threshold = args.mq_score
    elif isinstance(args, dict):
        try:
            self.threshold = float(args.get(self.parameter))
        except (TypeError, ValueError):
            logging.error("Could not retrieve threshold from %s",
                          args.get(self.parameter))
            logging.error("This parameter requires to be a float!")
            raise Exception("Could not create MQ0 filter from parameters: %s" % args)

def __init__(self, args):
    """AD Ratio constructor."""
    # This needs to happen first, because threshold is initialised here.
    super(DP4Filter, self).__init__(args)

    # Change the threshold to custom dp value.
    self.threshold = self._default_threshold
    if isinstance(args, argparse.Namespace):
        self.threshold = args.ad_ratio
    elif isinstance(args, dict):
        try:
            self.threshold = float(args.get(self.parameter))
        except (TypeError, ValueError):
            logging.error("Could not retrieve threshold from %s",
                          args.get(self.parameter))
            logging.error("This parameter requires to be a float!")
            raise Exception("Could not create DP4 filter from parameters: %s" % args)

def __init__(self, args):
    """Min Mapping Quality constructor."""
    # This needs to happen first, because threshold is initialised here.
    super(MQFilter, self).__init__(args)

    # Change the threshold to custom gq value.
    self.threshold = self._default_threshold
    if isinstance(args, argparse.Namespace):
        self.threshold = args.mq_score
    elif isinstance(args, dict):
        try:
            self.threshold = int(args.get(self.parameter))
        except (TypeError, ValueError):
            logging.error("Could not retrieve threshold from %s",
                          args.get(self.parameter))
            logging.error("This parameter requires to be an integer!")
            raise Exception("Could not create MQ filter from parameters: %s" % args)

def __init__(self, args):
    """Min Depth constructor."""
    # This needs to happen first, because threshold is initialised here.
    super(GQFilter, self).__init__(args)

    # Change the threshold to custom gq value.
    self.threshold = self._default_threshold
    if isinstance(args, argparse.Namespace):
        self.threshold = args.gq_score
    elif isinstance(args, dict):
        try:
            self.threshold = int(args.get(self.parameter))
        except (TypeError, ValueError):
            logging.error("Could not retrieve threshold from %s",
                          args.get(self.parameter))
            logging.error("This parameter requires to be an integer!")
            raise Exception("Could not create GQ filter from parameters: %s" % args)

def __init__(self, args):
    """Min Depth constructor."""
    # This needs to happen first, because threshold is initialised here.
    super(QualFilter, self).__init__(args)

    # Change the threshold to custom gq value.
    self.threshold = self._default_threshold
    if isinstance(args, argparse.Namespace):
        self.threshold = args.gq_score
    elif isinstance(args, dict):
        try:
            self.threshold = float(args.get(self.parameter))
        except (TypeError, ValueError):
            logging.error("Could not retrieve threshold from %s",
                          args.get(self.parameter))
            logging.error("This parameter requires to be a float!")
            raise Exception("Could not create QUAL filter from parameters: %s" % args)

def __init__(self, args):
    """Min Depth constructor."""
    # This needs to happen first, because threshold is initialised here.
    super(UncallableGTFilter, self).__init__(args)

    # Change the threshold to custom gq value.
    self.threshold = self._default_threshold
    if isinstance(args, argparse.Namespace):
        self.threshold = args.gq_score
    elif isinstance(args, dict):
        try:
            self.threshold = str(args.get(self.parameter))
        except (TypeError, ValueError):
            logging.error("Could not retrieve threshold from %s",
                          args.get(self.parameter))
            logging.error("This parameter requires to be a string!")
            self.threshold = None

def parse_linter_args():
    """Parse command-line arguments and set up the command-line interface and help.

    Defaults to current working directory for the directory arg and the copy of
    rules.py in the directory this module is in for the rules arg.

    Returns
    -------
    argparse.Namespace object subclass with arguments as attributes for each
    given argument
    """
    parser = argparse.ArgumentParser()
    group = parser.add_mutually_exclusive_group()
    group.add_argument('-d', '--directory',
                       help="The local path to your repository's base directory. "
                            "Defaults to the current working directory.",
                       default=os.getcwd()
                       )
    parser.add_argument('-r', '--rules',
                        help='The path to the rules configuration file, a YAML file '
                             'containing the rules you would like to check for. '
                             'Defaults to path/to/openlinter/rules.yml.',
                        default=os.path.join(get_current_script_dir(), 'rules.yml')
                        )
    parser.add_argument('-v', '--version', action='version', version='1.0.1')
    return parser.parse_args()