We extracted the following 49 code examples from open-source Python projects to illustrate how to use pyparsing.ParseException().
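Before the project examples, here is a minimal, self-contained sketch (not taken from any of the projects below) of the basic pattern they all share: build a pyparsing grammar, call parseString(), and catch pyparsing.ParseException to report where the input stopped matching. The toy grammar and messages are illustrative only.

import pyparsing as pp

# A toy grammar: one or more digits.
integer = pp.Word(pp.nums)

try:
    # parseAll=True forces the whole string to match, so the trailing
    # letters make parseString() raise a ParseException.
    integer.parseString("42abc", parseAll=True)
except pp.ParseException as err:
    # The exception carries the failing position (loc, lineno, col),
    # the offending line, and a short message.
    print("Parse failed at line {0}, col {1}: {2}".format(err.lineno, err.col, err.msg))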
def parse(self, content, root=None, path_info=None):
    if not root:
        root = block.Root()

    try:
        parsed = self.parser.parse(content)
    except ParseException as e:
        error_msg = 'char {char} (line:{line}, col:{col})'.format(
            char=e.loc, line=e.lineno, col=e.col)
        if path_info:
            LOG.error('Failed to parse config "{file}": {error}'.format(
                file=path_info, error=error_msg))
        else:
            LOG.error('Failed to parse config: {error}'.format(error=error_msg))
        raise InvalidConfiguration(error_msg)

    if len(parsed) and parsed[0].getName() == 'file_delimiter':
        # We are parsing an nginx configuration dump
        LOG.info('Switched to parse nginx configuration dump.')
        root_filename = self._prepare_dump(parsed)
        self.is_dump = True
        self.cwd = os.path.dirname(root_filename)
        parsed = self.configs[root_filename]

    self.parse_block(parsed, root)
    return root
def __init__(self, requirement_string):
    try:
        req = REQUIREMENT.parseString(requirement_string)
    except ParseException as e:
        raise InvalidRequirement(
            "Invalid requirement, parse error at \"{0!r}\"".format(
                requirement_string[e.loc:e.loc + 8]))

    self.name = req.name
    if req.url:
        parsed_url = urlparse.urlparse(req.url)
        if not (parsed_url.scheme and parsed_url.netloc) or (
                not parsed_url.scheme and not parsed_url.netloc):
            raise InvalidRequirement("Invalid URL given")
        self.url = req.url
    else:
        self.url = None
    self.extras = set(req.extras.asList() if req.extras else [])
    self.specifier = SpecifierSet(req.specifier)
    self.marker = req.marker if req.marker else None
def test_nicks(self):
    """Test the nicks regex."""
    nicks = [
        'FranciscoD_',
        'FranciscoD|Uni',
        'FranciscoD^|Uni',
        'FranciscoD-_Uni',
    ]
    result = None
    for entry in nicks:
        try:
            result = nick.parseString(entry)
        except pp.ParseException as x:
            pass

        self.assertEqual(entry, result.nick)
def try_compute_type_table(banana):
    """
    Compute the type table for the provided banana string
    if possible. Does not throw any exception if it fails.

    :type banana: str
    :param banana: The string to parse and type check.
    """
    try:
        # Convert the grammar into an AST
        parser = grammar.banana_grammar()
        ast = parser.parse(banana)
        # Compute the type table for the given AST
        return typeck.typeck(ast)
    except exception.BananaException:
        return None
    except p.ParseSyntaxException:
        return None
    except p.ParseFatalException:
        return None
    except p.ParseException:
        return None
def validate_expression(expr_string):
    """
    Validate the provided expression string.

    :type expr_string: str
    :param expr_string: Expression string to validate.
    :returns: Returns a handle that can be used to validate name usage
        against an environment.
    :raises: exception.BananaInvalidExpression
    """
    if not isinstance(expr_string, basestring):
        raise exception.BananaArgumentTypeError(
            expected_type=basestring,
            received_type=type(expr_string)
        )
    parser = ExpressionParser()
    try:
        res = parser.parse_tree(expr_string)
        return ExpressionHandle(res, expr_string)
    except p.ParseException as e:
        raise exception.BananaInvalidExpression(str(e))
def build_filter_string(self, filter_input_value):
    """
    TODO: document this function
    :param filter_input_value:
    :return:
    """
    try:
        parsed_filter = parse_filter(filter_input_value)
    except pyparsing.ParseException:
        raise ACFilterParsingException('Could not parse filter: "{0}"'.format(filter_input_value))
    out_filter_list = list()
    self.process_filter_element(parsed_filter[0], out_filter_list)
    if out_filter_list[0] == '(':
        # If the filter list starts with an opening parenthesis, remove the first and last
        # positions, as both correspond to redundant parentheses
        out_filter_list = out_filter_list[1:-1]
    filter_string = ''.join(out_filter_list)
    return filter_string
def _calculate_value(self, calculation, values):
    """
    Parse and perform a calculation, interpolating values
    from the given dict of field names.

    Returns None if the calculation cannot be performed,
    e.g. because of incorrect field names.
    """
    nsp = NumericStringParser()
    field_regex = r'\{(.+?)\}'
    interpolated_calculation = re.sub(field_regex, self._replace_fields(values), calculation)
    try:
        result = nsp.eval(interpolated_calculation)
    except ParseException:
        return None
    return result
def _perform_calculation(self, calculation):
    """
    Parse and perform a calculation, interpolating field values
    into the expression by field name.

    Returns None if the calculation cannot be performed,
    e.g. because of incorrect field names.
    """
    nsp = NumericStringParser()
    field_regex = r'\{(.+?)\}'
    interpolated_calculation = re.sub(field_regex, self._replace_fields, calculation)
    try:
        result = nsp.eval(interpolated_calculation)
    except ParseException:
        return None
    return result
def post_query(self, q=None):
    if q is not None:
        try:
            query = query_parser.parseString(q)
        except pyparsing.ParseException:
            api.abort(501, {"cause": "Not implemented error",
                            "detail": "q",
                            "reason": "Query not implemented"})
        resource_type = query[0]
        api.enforce("create resource type", {"name": resource_type})
        schema = pecan.request.indexer.get_resource_type_schema()
        rt = schema.resource_type_from_dict(resource_type, {}, 'creating')
        try:
            pecan.request.indexer.create_resource_type(rt)
        except indexer.ResourceTypeAlreadyExists:
            pass
    pecan.response.status = 204
def __init__(self, session, fname, ions=None, ds_short_name="ku_latest"):
    self.session = session
    self.gfall_reader = GFALLReader(fname)

    if ions is not None:
        try:
            ions = parse_selected_species(ions)
        except ParseException:
            raise ValueError('Input is not a valid species string {}'.format(ions))
        ions = pd.DataFrame.from_records(ions, columns=["atomic_number", "ion_charge"])
        self.ions = ions.set_index(['atomic_number', 'ion_charge'])
    else:
        self.ions = None

    self.data_source = DataSource.as_unique(self.session, short_name=ds_short_name)
    if self.data_source.data_source_id is None:
        # To get the id if a new data source was created
        self.session.flush()
def compile(self):
    manipulation_set = pp.Optional(pp.Suppress(pp.Keyword("THEN")) +
                                   pp.Suppress("|") +
                                   pp.SkipTo(pp.Suppress(";"), include=True))
    manipulation_set.setParseAction(lambda x: self._add_manipulation_set(x[0]))

    parser = (pp.Keyword("CONNECT") + self.connect_block.parser() +
              pp.Keyword("RETRIEVE") + self.retrieve_block.parser() +
              pp.Optional(pp.Keyword("JOIN") + self.join_block.parser()))

    try:
        parser.parseString(self.qgl_str)
    except pp.ParseException as e:
        raise QGLSyntaxError("Couldn't parse query: \n %s" % e)

    self._create_connectors()
    self._create_query_nodes()

    if self.join_block:
        self._create_joins()

    if self.manipulation_set_str:
        self.query_graph.manipulation_set.append_from_str(self.manipulation_set_str)
def _parse(self, df=None, independent_param_vals=None):
    expr_evaluator = Evaluator(df=df, name_dict=independent_param_vals)
    param_expr = expr_evaluator.parser()

    render_as_type = pp.Word(pp.alphas, pp.alphanums + "_$")
    render_as_type.setParseAction(lambda x: self._set_render_type(value=x[0]))

    container_type = pp.Optional(pp.Word(pp.alphas, pp.alphanums + "_$") + pp.Suppress(":"), default=None)
    container_type.setParseAction(lambda x: self._set_container_type(value=x[0]))

    parser = param_expr + pp.Suppress("->") + container_type + render_as_type

    try:
        parser.parseString(self.parameter_str)
    except pp.ParseException as e:
        raise ParameterRenderError("Error parsing parameter string: \n %s" % e)

    python_value = expr_evaluator.output_value()
    return python_value
def tokenize_str(buf):
    r = reduce(lambda a, b: a ^ b, map(lambda x: x[1], token_lst))
    i = 0
    last_token = None
    while i < len(buf):
        buf = buf[i:]
        try:
            tbuf = buf.lstrip()
            # A leading "/" that does not start a comment ("//" or "/*") is treated
            # as the start of a regex literal when the previous token allows one.
            if len(tbuf) > 2 and tbuf[0] == "/" and tbuf[1] != "/" and tbuf[1] != "*" \
                    and last_token in token_before_regex:
                t = token.regex.parseString(buf)
            else:
                t = r.parseString(buf)
            t = t.asDict().items()[0]
            if t[0] not in ("single_line_comment", "multi_line_comment"):
                last_token = t[0]
            yield t
            i = buf.find(t[1]) + len(t[1])
        except pp.ParseException as e:
            print(e)
            return
def test_positional_sexpr(self):
    AST.from_schema('sexpr', {
        '0': {
            '_tag': 'start',
            '_parser': number + number,
        },
        '1': {
            '_tag': 'end',
            '_parser': number + number
        }
    })
    ast = AST.parse('(sexpr (start 1 1) (end 2 2))')
    assert ast.start == [1.0, 1.0]
    assert ast.end == [2.0, 2.0]
    assert AST.parse(ast.to_string()) == ast
    with raises(ParseException):
        AST.parse('(sexpr (end 2 2) (start 1 1))')
def XXXX_create_cast_expression(self, tok):
    if tok.typeof_arg:
        type_expression = self.type_manager.get_type_of(
            tok.typeof_arg.first)
    else:
        type_expression = tok.simple_type

    # Check that casting makes sense.
    target = self.type_manager.get_type_of(type_expression)
    if not target:
        raise pyparsing.ParseException(
            "%s is not a type" % target)

    return c_ast.CFunctionCall(
        function_name='()',
        arguments=[
            c_ast.CLiteral(target),
            tok.expression,
        ],
    )
def _make_attribute(self, tok):
    """Compose a c_ast.CAttribute() object for each attribute."""
    result = []
    for attr_specifier in tok:
        expression = []
        if attr_specifier.args:
            # Try to parse the expression if possible.
            try:
                expression = [self.expression_parser.parse(
                    attr_specifier.args)]
            except pyparsing.ParseException:
                pass

        result.append(c_ast.CAttribute(
            attr_specifier.name.first,
            *expression))

    return result
def FromString(cls, desc):
    """Parse this stop condition from a string representation.

    The string needs to match:
    run_time number [seconds|minutes|hours|days|months|years]

    Args:
        desc (str): The description

    Returns:
        TimeBasedStopCondition
    """
    parse_exp = Literal(u'run_time').suppress() + time_interval(u'interval')

    try:
        data = parse_exp.parseString(desc)
        return TimeBasedStopCondition(data[u'interval'][0])
    except ParseException:
        raise ArgumentError(u"Could not parse time based stop condition")
def _parse_line(self, line_no, line):
    """Parse a line in a TileBus file.

    Args:
        line_no (int): The line number for printing useful error messages
        line (string): The line that we are trying to parse
    """
    try:
        matched = statement.parseString(line)
    except ParseException as exc:
        raise DataError("Error parsing line in TileBus file", line_number=line_no,
                        column=exc.col, contents=line)

    if 'symbol' in matched:
        self._parse_cmd(matched)
    elif 'filename' in matched:
        self._parse_include(matched)
    elif 'variable' in matched:
        self._parse_assignment(matched)
    elif 'interface' in matched:
        self._parse_interface(matched)
    elif 'configvar' in matched:
        self._parse_configvar(matched)
def prepare(self, fp):
    def _read_and_parse():
        for i, line in enumerate((x.strip() for x in fp.readlines())):
            if i == 0:
                continue
            m = None
            for p in patterns:
                try:
                    m = p.parseString(line, parseAll=True)
                    fields = m['group'], m['host'], m['graph'], m.get('subgraph', None), m.get('datarow', None), \
                        m['attribute'], m['value']
                    #logging.debug("%-30s%-30s%-30s%-30s%-30s%-30s%s", *fields)
                    yield Row(*fields)
                    break
                except ParseException:
                    continue
            if not m:
                logger.error("No pattern matched line: %s", line)

    for f in _read_and_parse():
        yield f
async def dice(self, *, input: str = '6'):
    '''
    Roll dice

    Inputs:                                     Examples:
    S     | S - number of sides (default is 6)  [6 | 12]
    AdS   | A - amount (default is 1)           [5d6 | 2d10]
    AdSt  | t - return total                    [2d6t | 20d5t]
    AdSs  | s - return sorted                   [4d6s | 5d8s]
    AdS^H | ^H - return highest H rolls         [10d6^4 | 2d7^1]
    AdSvL | vL - return lowest L rolls          [15d7v2 | 8d9v2]
    '''
    # TODO: Add documentation on arithmetic/basic integer operations
    if 'd' not in input:
        input = 'd' + input
    with multiprocessing.Pool(1) as pool:
        async_result = pool.apply_async(dice.roll, (input,))
        future = self.bot.loop.run_in_executor(None, async_result.get, 10.0)
        try:
            result = await asyncio.wait_for(future, 10.0, loop=self.bot.loop)
            if type(result) is int:
                await self.bot.embed_reply(result)
            else:
                await self.bot.embed_reply(", ".join(str(roll) for roll in result))
        except discord.errors.HTTPException:
            await self.bot.embed_reply(":no_entry: Output too long")
        except pyparsing.ParseException:
            await self.bot.embed_reply(":no_entry: Invalid input")
        except (concurrent.futures.TimeoutError, multiprocessing.context.TimeoutError):
            await self.bot.embed_reply(":no_entry: Execution exceeded time limit")
def parseAssert(self, text):
    try:
        parsed = ASSERT.parseString(text, parseAll=True).asList()
    except pp.ParseException as e:
        raise ParseException('Check for the appropriate syntax!', e.lineno, e.line)
    # Returns the AST built by the parser.
    return parsed
def parseRetract(self, text):
    try:
        parsed = RETRACT.parseString(text, parseAll=True).asList()
    except pp.ParseException as e:
        raise ParseException('Check for the appropriate syntax!', e.lineno, e.line)
    # Returns the AST built by the parser.
    return parsed
def parseProgram(self, text):
    try:
        parsed = PROGRAM.parseString(text, parseAll=True).asList()
    except pp.ParseException as e:
        raise ParseException('Check for the appropriate syntax!', e.lineno, e.line)
    # Returns the AST built by the parser.
    return parsed
def getkerneldescriptors(self, ast, var_name='meta_args'):
    descs = ast.get_variable(var_name)
    if descs is None:
        raise ParseError(
            "kernel call does not contain a {0} type".format(var_name))
    try:
        nargs = int(descs.shape[0])
    except AttributeError:
        raise ParseError(
            "kernel metadata {0}: {1} variable must be an array".
            format(self._name, var_name))
    if len(descs.shape) != 1:
        raise ParseError(
            "kernel metadata {0}: {1} variable must be a 1 dimensional "
            "array".format(self._name, var_name))
    if descs.init.find("[") != -1 and descs.init.find("]") != -1:
        # there is a bug in f2py
        raise ParseError(
            "Parser does not currently support [...] initialisation for "
            "{0}, please use (/.../) instead".format(var_name))
    try:
        inits = expr.FORT_EXPRESSION.parseString(descs.init)[0]
    except ParseException:
        raise ParseError("kernel metadata has an invalid format {0}".
                         format(descs.init))
    nargs = int(descs.shape[0])
    if len(inits) != nargs:
        raise ParseError(
            "Error, in {0} specification, the number of args {1} and "
            "number of dimensions {2} do not match".
            format(var_name, nargs, len(inits)))
    return inits
def test_abort_on_parse_failure(self):
    with open(util.get_data_filename('broken.conf')) as handle:
        self.assertRaises(ParseException, load, handle)
def _parse_files(self, filepath, override=False):
    """Parse files from a glob

    :param str filepath: Nginx config file path
    :param bool override: Whether to parse a file that has been parsed
    :returns: list of parsed tree structures
    :rtype: list
    """
    files = glob.glob(filepath)  # nginx on unix calls glob(3) for this
    # XXX Windows nginx uses FindFirstFile, and
    # should have a narrower call here
    trees = []
    for item in files:
        if item in self.parsed and not override:
            continue
        try:
            with open(item) as _file:
                parsed = nginxparser.load(_file)
            self.parsed[item] = parsed
            trees.append(parsed)
        except IOError:
            logger.warning("Could not open file: %s", item)
        except pyparsing.ParseException as err:
            logger.debug("Could not parse file: %s due to %s", item, err)
    return trees
def test_predicate_failure(self):
    """Checks that if there's a problem with the relation/object, an error gets thrown"""
    statement = 'composite(p(HGNC:CASP8),p(HGNC:FADD),a(ADO:"Abeta_42")) -> nope(GOBP:"neuron apoptotic process")'
    with self.assertRaises(ParseException):
        self.parser.relation.parseString(statement)
def parse_statements(graph, statements, bel_parser):
    """Parses a list of statements from a BEL Script.

    :param BELGraph graph: A BEL graph
    :param iter[str] statements: An enumerated iterable over the lines in the statements section of a BEL script
    :param BelParser bel_parser: A BEL parser
    """
    t = time.time()

    for line_number, line in statements:
        try:
            bel_parser.parseString(line, line_number=line_number)
        except ParseException as e:
            parse_log.error('Line %07d - General Parser Failure: %s', line_number, line)
            graph.add_warning(line_number, line, BelSyntaxError(line_number, line, e.loc),
                              bel_parser.get_annotations())
        except PyBelWarning as e:
            parse_log.warning('Line %07d - %s: %s', line_number, e.__class__.__name__, e)
            graph.add_warning(line_number, line, e, bel_parser.get_annotations())
        except Exception as e:
            parse_log.exception('Line %07d - General Failure: %s', line_number, line)
            graph.add_warning(line_number, line, e, bel_parser.get_annotations())

    log.info('Parsed statements section in %.02f seconds with %d warnings', time.time() - t,
             len(graph.warnings))

    for k, v in sorted(Counter(e.__class__.__name__ for _, _, e, _ in graph.warnings).items(), reverse=True):
        log.debug('  %s: %d', k, v)
def match(cmp_value, spec):
    """Match a given value to a given spec DSL."""
    expr = make_grammar()
    try:
        ast = expr.parseString(spec)
    except pyparsing.ParseException:
        ast = [spec]

    if len(ast) == 1:
        return ast[0] == cmp_value
    op = op_methods[ast[0]]
    return op(cmp_value, *ast[1:])
def split_by_commas(value):
    """Split values by commas and quotes according to api-wg

    :param value: value to be split

    .. versionadded:: 3.17
    """
    word = (pp.QuotedString(quoteChar='"', escChar='\\') |
            pp.Word(pp.printables, excludeChars='",'))
    grammar = pp.stringStart + pp.delimitedList(word) + pp.stringEnd

    try:
        return list(grammar.parseString(value))
    except pp.ParseException:
        raise ValueError("Invalid value: %s" % value)
def __init__(self, marker):
    try:
        self._markers = _coerce_parse_result(MARKER.parseString(marker))
    except ParseException as e:
        err_str = "Invalid marker: {0!r}, parse error at {1!r}".format(
            marker, marker[e.loc:e.loc + 8])
        raise InvalidMarker(err_str)