我们从 Python 开源项目中，提取了以下 49 个代码示例，用于说明如何使用 pyparsing.Group()。
def parser(self):
    """Build the grammar for a block of one or more join declarations.

    A single declaration has the shape
    ``<TYPE>(<child>[<cols>] ==> <parent>[<cols>])`` where ``<TYPE>`` is one
    of ``LEFT``, ``RIGHT``, ``INNER`` or ``OUTER``. Each matched declaration
    is handed to ``self._add_join``.

    :returns: a pyparsing element matching one or more join declarations
    """
    name_body_chars = pp.alphanums + "_$"

    join_type = (pp.Literal("LEFT")
                 | pp.Literal("RIGHT")
                 | pp.Literal("INNER")
                 | pp.Literal("OUTER"))
    node_name = pp.Word(pp.alphas, name_body_chars)
    col_name = pp.Word(pp.alphas, name_body_chars)
    # Grouped so that each column list arrives as a single token.
    col_name_list = pp.Group(pp.delimitedList(col_name, delim=","))
    open_bracket = pp.Suppress("[")
    close_bracket = pp.Suppress("]")

    def register_join(tokens):
        # Token order: join type, child node, child columns,
        # parent node, parent columns.
        return self._add_join(join_type=tokens[0],
                              child_node_name=tokens[1],
                              child_cols=tokens[2],
                              parent_node_name=tokens[3],
                              parent_cols=tokens[4])

    single_join = (join_type
                   + pp.Suppress("(")
                   + node_name + open_bracket + col_name_list + close_bracket
                   + pp.Suppress("==>")
                   + node_name + open_bracket + col_name_list + close_bracket
                   + pp.Suppress(")"))
    single_join.addParseAction(register_join)

    return pp.OneOrMore(single_join)
def __init__(self, identifier_parser=None):
    """Build the fusion grammar and initialise the base parser with it.

    :param IdentifierParser identifier_parser: An identifier parser for
        checking the 3P and 5P partners. A fresh ``IdentifierParser()`` is
        created when this is ``None``.
    """
    if identifier_parser is None:
        identifier_parser = IdentifierParser()
    self.identifier_parser = identifier_parser

    identifier = self.identifier_parser.language

    reference_seq = oneOf(['r', 'p', 'c'])
    coordinate = pyparsing_common.integer | '?'
    missing = Keyword('?')

    # A range is either a lone '?' or "<reference>.<start>_<stop>".
    explicit_range = (
        reference_seq(FUSION_REFERENCE)
        + Suppress('.')
        + coordinate(FUSION_START)
        + Suppress('_')
        + coordinate(FUSION_STOP)
    )
    range_coordinate = missing(FUSION_MISSING) | explicit_range

    self.language = fusion_tags + nest(
        Group(identifier)(PARTNER_5P),
        Group(range_coordinate)(RANGE_5P),
        Group(identifier)(PARTNER_3P),
        Group(range_coordinate)(RANGE_3P),
    )

    super(FusionParser, self).__init__(self.language)
def _parse_atat_lattice(lattice_in):
    """Parse an ATAT-style `lat.in` string.

    The parse result holds three groups, in order:
    (coordinate system) (lattice) (atoms).
    The atoms group contains one subgroup per atom, each made of a position
    vector followed by a group of atom names.

    :param lattice_in: the `lat.in` contents as a string
    :returns: the pyparsing parse result for ``lattice_in``
    """
    number_pattern = r'[-+]?[0-9]*\.?[0-9]+([eE][-+]?[0-9]+)?'
    float_number = Regex(number_pattern).setParseAction(lambda t: [float(t[0])])

    vector = Group(float_number + float_number + float_number)
    angles = vector
    vector_line = vector + Suppress(LineEnd())

    # The coordinate system is either three vectors on separate lines, or a
    # vector followed by a triple of angles on a single line.
    three_vector_lines = vector_line + vector_line + vector_line
    lengths_and_angles = vector + angles + Suppress(LineEnd())
    coord_sys = Group(three_vector_lines | lengths_and_angles)

    lattice = Group(vector + vector + vector)
    atom = Group(vector + Group(OneOrMore(Word(alphas + '_'))))

    atat_lattice_grammar = coord_sys + lattice + Group(OneOrMore(atom))
    return atat_lattice_grammar.parseString(lattice_in)
def expression(self):
    """Build the recursive grammar for an expression.

    An expression is one or more of: a function call such as ``FOO(2 , 3)``,
    a parenthesised sub-expression such as ``(1 + (2 + 3))``, a token, or a
    "not" token. The matched pieces are combined into a single string joined
    with spaces.

    :returns: a ``pyparsing.Combine`` element wrapping the expression
    """
    expr = pyparsing.Forward()

    # Parenthesised sub-expression, e.g. (1 + (2 + 3)).
    nested = pyparsing.nestedExpr("(", ")", expr)
    nested_expression = nested.setParseAction(self._combine_lists)

    # Function call, e.g. FOO(2 , 3); each argument is itself an expression.
    call_argument = pyparsing.Combine(expr, adjacent=False, joinString=" ")
    function_call = (
        _TOKEN().setResultsName("function")
        + _OPEN_PARENTHESIS()
        + pyparsing.delimitedList(
            call_argument, delim=",").setResultsName("func_args")
        + _CLOSE_PARENTHESIS()
    )

    expr << pyparsing.OneOrMore(
        function_call.setParseAction(self._is_known_function)
        | pyparsing.Group(nested_expression)
        | _TOKEN()
        | _NOT_TOKEN()
    )

    return pyparsing.Combine(expr, adjacent=False, joinString=" ")
def _maybe_attributes(self):
    """Possibly match some attributes.

    The syntax of attributes is described here:
    https://gcc.gnu.org/onlinedocs/gcc/Attribute-Syntax.html

    :returns: a group named "attributes" holding zero or more attribute
        clauses, post-processed by ``self._make_attribute``
    """
    # One attribute: a name, optionally followed by parenthesised arguments.
    optional_args = pyparsing.Optional(
        _OPEN_PARENTHESIS
        + parsers.anything_beetween("()")("args")
        + _CLOSE_PARENTHESIS
    )
    single_attribute = pyparsing.Group(
        self._identifier()("name") + optional_args
    )

    # One clause: the attribute keyword and a double-parenthesised,
    # comma-separated list of attributes.
    attribute_clause = (
        _ATTRIBUTE
        + _DOUBLE_OPEN_PARENTHESIS
        + pyparsing.delimitedList(single_attribute)
        + _DOUBLE_CLOSE_PARENTHESIS
    )

    clauses = pyparsing.ZeroOrMore(attribute_clause).setParseAction(
        self._make_attribute)
    return pyparsing.Group(clauses)("attributes")
def _create_simple_statements():
    """Create the grammar elements for single statements.

    Assigns the module-level ``callrpc_stmt``, ``streamer_stmt``,
    ``simple_statement`` and ``generic_statement``. Returns immediately when
    ``simple_statement`` has already been created.
    """
    global binary, ident, rvalue, simple_statement, semi, comp, number
    global slot_id, callrpc_stmt, generic_statement, streamer_stmt
    global stream, selector

    if simple_statement is not None:
        return

    # meta <ident> = <rvalue>;
    meta_body = (Literal('meta').suppress() + ident
                 + Literal('=').suppress() + rvalue + semi)
    meta_statement = Group(meta_body).setResultsName('meta_statement')

    # require <ident> <comp> <rvalue>;
    require_body = Literal('require').suppress() + ident + comp + rvalue + semi
    require_statement = Group(require_body).setResultsName('require_statement')

    # set <ident|number> to <rvalue|binary> [as <config_type>];
    set_type_clause = Optional(Literal('as').suppress() + config_type)
    set_body = (Literal('set').suppress()
                - (ident | number)
                - Literal("to").suppress()
                - (rvalue | binary)
                - set_type_clause
                + semi)
    set_statement = Group(set_body).setResultsName('set_statement')

    # call <ident|number> on <slot_id> [=> <stream>];
    call_output_clause = Optional(Literal("=>").suppress()
                                  + stream('explicit_stream'))
    call_body = (Literal("call").suppress()
                 + (ident | number)
                 + Literal("on").suppress()
                 + slot_id
                 + call_output_clause
                 + semi)
    callrpc_stmt = Group(call_body).setResultsName('call_statement')

    # [manual] [encrypted|signed] [realtime] streamer on <selector>
    #     [to <slot_id>] [with streamer <number>];
    streamer_target_clause = Optional(Literal('to').suppress()
                                      - slot_id('explicit_tile'))
    streamer_with_clause = Optional(Literal('with').suppress()
                                    - Literal('streamer').suppress()
                                    - number('with_other'))
    streamer_body = (Optional(Literal("manual")('manual'))
                     + Optional(oneOf(u'encrypted signed')('security'))
                     + Optional(Literal(u'realtime')('realtime'))
                     + Literal('streamer').suppress()
                     - Literal('on').suppress()
                     - selector('selector')
                     - streamer_target_clause
                     - streamer_with_clause
                     - semi)
    streamer_stmt = Group(streamer_body).setResultsName('streamer_statement')

    # copy [all|count|average] [<stream>|<number>] => <stream>;
    copy_modifier = Optional(oneOf("all count average")('modifier'))
    copy_source = Optional(stream('explicit_input') | number('constant_input'))
    copy_body = (Literal("copy").suppress()
                 - copy_modifier
                 - copy_source
                 - Literal("=>")
                 - stream("output")
                 - semi)
    copy_statement = Group(copy_body).setResultsName('copy_statement')

    # trigger streamer <number>;
    trigger_body = (Literal("trigger") - Literal("streamer")
                    - number('index') - semi)
    trigger_statement = Group(trigger_body).setResultsName('trigger_statement')

    simple_statement = (meta_statement
                        | require_statement
                        | set_statement
                        | callrpc_stmt
                        | streamer_stmt
                        | trigger_statement
                        | copy_statement)

    # In generic statements, keep track of the location where the match
    # started for error handling.
    locator = Empty().setParseAction(lambda s, l, t: l)('location')
    raw_text = Group(ZeroOrMore(Regex(u"[^{};]+")) + Literal(u';'))('match')
    generic_statement = Group(locator + raw_text).setResultsName(
        'unparsed_statement')
def _create_block_bnf():
    """Create the grammar elements for blocks and their headers.

    Assigns the module-level ``block_id``, ``block_bnf`` and ``statement``.
    Returns immediately when ``block_bnf`` has already been created.
    """
    global block_bnf, time_interval, slot_id, statement, block_id, ident, stream

    if block_bnf is not None:
        return

    # A trigger is a stream trigger, a bare stream or an identifier.
    always_trigger = Group(stream).setResultsName('stream_always')
    named_trigger = Group(ident).setResultsName('identifier')
    trigger_clause = Group(stream_trigger | always_trigger | named_trigger)

    # every <time_interval>
    every_block_id = Group(
        Literal(u'every').suppress() - time_interval
    ).setResultsName('every_block')

    # when connected to <slot_id>
    when_block_id = Group(
        Literal(u'when').suppress()
        + Literal("connected").suppress()
        - Literal("to").suppress()
        - slot_id
    ).setResultsName('when_block')

    # when <stream_trigger>
    latch_block_id = Group(
        Literal(u'when').suppress() - stream_trigger
    ).setResultsName('latch_block')

    # config <slot_id>
    config_block_id = Group(
        Literal(u'config').suppress() - slot_id
    ).setResultsName('config_block')

    # on <trigger> [and|or <trigger>]
    second_trigger = Optional(
        (Literal("and") | Literal("or"))
        - trigger_clause.setResultsName('triggerB')
    )
    on_block_id = Group(
        Literal(u'on').suppress()
        - trigger_clause.setResultsName('triggerA')
        - second_trigger
    ).setResultsName('on_block')

    block_id = (every_block_id
                | when_block_id
                | latch_block_id
                | config_block_id
                | on_block_id)

    # Blocks nest, so the block grammar is declared before it is defined.
    block_bnf = Forward()
    statement = generic_statement | block_bnf

    block_body = Group(Literal(u'{').suppress()
                       + ZeroOrMore(statement)
                       + Literal(u'}').suppress())
    block_bnf << Group(block_id + block_body).setResultsName('block')
def __init__(self, identifier_parser=None):
    """Build the gmod grammar and initialise the base parser with it.

    :param IdentifierParser identifier_parser: An identifier parser for
        checking the 3P and 5P partners. A fresh ``IdentifierParser()`` is
        created when this is ``None``.
    """
    if identifier_parser is None:
        identifier_parser = IdentifierParser()
    self.identifier_parser = identifier_parser

    # Names from the default gmod namespace are passed through
    # ``self.handle_gmod_default``.
    default_names = list(language.gmod_namespace.keys())
    gmod_default_ns = oneOf(default_names).setParseAction(
        self.handle_gmod_default)

    qualified = Group(self.identifier_parser.identifier_qualified)
    gmod_identifier = qualified | Group(gmod_default_ns)

    self.language = gmod_tag + nest(gmod_identifier(IDENTIFIER))

    super(GmodParser, self).__init__(self.language)
def build_legacy_fusion(identifier, reference):
    """Build the grammar for the legacy fusion syntax.

    :param identifier: grammar element used for both fusion partners
    :param reference: value forwarded to ``fusion_break_handler_wrapper``
        for both breakpoints
    :returns: the grammar element, with ``fusion_legacy_handler`` attached
        as its parse action
    """
    start_handler = fusion_break_handler_wrapper(reference, start=True)
    break_start = (ppc.integer | '?').setParseAction(start_handler)

    stop_handler = fusion_break_handler_wrapper(reference, start=False)
    break_end = (ppc.integer | '?').setParseAction(stop_handler)

    # The two breakpoints are optional and only appear together.
    breakpoints = Optional(
        WCW + Group(break_start)(RANGE_5P)
        + WCW + Group(break_end)(RANGE_3P)
    )

    res = (identifier(PARTNER_5P)
           + WCW
           + fusion_tags
           + nest(identifier(PARTNER_3P) + breakpoints))
    res.setParseAction(fusion_legacy_handler)
    return res
def __init__(self, identifier_parser=None):
    """Build the location grammar and initialise the base parser with it.

    :param IdentifierParser identifier_parser: An identifier parser for
        checking the 3P and 5P partners. A fresh ``IdentifierParser()`` is
        created when this is ``None``.
    """
    if identifier_parser is None:
        identifier_parser = IdentifierParser()

    location = Group(location_tag + nest(identifier_parser.language))
    super(LocationParser, self).__init__(location(LOCATION))
def parse_filter_str(self, filter_str):
    """Parse a filter string.

    A basic filter has the shape
    ``(prop,value[,type='<op>'][,flag='<flag>'])``; basic filters can be
    combined with ``not``, ``and`` and ``or``.

    :param filter_str: the filter string to parse
    :returns: the pyparsing result of parsing ``filter_str``
    """
    prop = (pp.WordStart(pp.alphas)
            + pp.Word(pp.alphanums + "_").setResultsName("prop"))

    single_quoted = pp.QuotedString("'")
    double_quoted = pp.QuotedString('"')
    unquoted = pp.Word(pp.printables, excludeChars=",")
    value = (single_quoted | double_quoted | unquoted).setResultsName("value")

    types_ = pp.oneOf("re eq ne gt ge lt le").setResultsName("types")
    flags = pp.oneOf("C I").setResultsName("flags")
    comma = pp.Literal(',')
    quote = (pp.Literal("'") | pp.Literal('"')).setResultsName("quote")

    type_exp = pp.Group(
        pp.Literal("type") + pp.Literal("=") + quote + types_ + quote
    ).setResultsName("type_exp")
    flag_exp = pp.Group(
        pp.Literal("flag") + pp.Literal("=") + quote + flags + quote
    ).setResultsName("flag_exp")

    # One parenthesised filter, converted by self.parse_filter_obj.
    filter_body = pp.Group(
        pp.Literal("(")
        + prop + comma + value
        + pp.Optional(comma + type_exp)
        + pp.Optional(comma + flag_exp)
        + pp.Literal(")")
    )
    semi_expression = pp.Forward()
    semi_expression << filter_body.setParseAction(
        self.parse_filter_obj).setResultsName("semi_expression")

    operators = [
        ("not", 1, pp.opAssoc.RIGHT, self.not_operator),
        ("and", 2, pp.opAssoc.LEFT, self.and_operator),
        ("or", 2, pp.opAssoc.LEFT, self.or_operator)
    ]
    expr = pp.Forward()
    expr << pp.operatorPrecedence(semi_expression, operators)

    return expr.parseString(filter_str)
def __init__(self, ffilter, queue_out):
    """Set up the filter thread and, when available, its filter grammar.

    :param ffilter: filter parameters, stored as ``self.hideparams``; its
        ``'codes'`` entry gets ``"0"`` appended when it contains ``"XXX"``
    :param queue_out: output queue, passed to ``FuzzQueue.__init__``
    """
    FuzzQueue.__init__(self, queue_out)
    Thread.__init__(self)
    self.setName('filter_thread')

    self.queue_out = queue_out

    # NOTE(review): the original layout was lost; the grammar set-up is
    # assumed to be the whole body of this conditional - confirm.
    if PYPARSING:
        element = oneOf("c l w h")
        digits = "XB0123456789"
        integer = Word(digits)
        comparison = oneOf("= != < > >= <=")
        elementRef = Group(element + comparison + integer)
        operator = oneOf("and or")
        definition = elementRef + ZeroOrMore(operator + elementRef)

        # Parentheses around a formula are optional and are dropped.
        open_paren = Suppress(Optional(Literal("(")))
        close_paren = Suppress(Optional(Literal(")")))
        nestedformula = Group(open_paren + definition + close_paren)

        self.finalformula = nestedformula + ZeroOrMore(operator + nestedformula)

        elementRef.setParseAction(self.__compute_element)
        nestedformula.setParseAction(self.__compute_formula)
        self.finalformula.setParseAction(self.__myreduce)

    self.res = None
    self.hideparams = ffilter

    codes = self.hideparams['codes']
    if "XXX" in codes:
        codes.append("0")

    self.baseline = None
def __init__(self):
    """Build the category filter grammar when pyparsing is available.

    The grammar accepts category names combined with ``and``, ``or`` or
    ``,``, optionally parenthesised and optionally preceded by ``not``.
    """
    if not PYPARSING:
        return

    name_chars = "_-*"
    category = Word(alphas + name_chars, alphanums + name_chars)
    operator = oneOf("and or ,")
    neg_operator = "not"

    elementRef = category
    definition = elementRef + ZeroOrMore(operator + elementRef)

    # Parentheses around a formula are optional and are dropped.
    open_paren = Suppress(Optional(Literal("(")))
    close_paren = Suppress(Optional(Literal(")")))
    nestedformula = Group(open_paren + definition + close_paren)
    neg_nestedformula = Optional(neg_operator) + nestedformula

    self.finalformula = neg_nestedformula + ZeroOrMore(
        operator + neg_nestedformula)

    elementRef.setParseAction(self.__compute_element)
    neg_nestedformula.setParseAction(self.__compute_neg_formula)
    nestedformula.setParseAction(self.__compute_formula)
    self.finalformula.setParseAction(self.__myreduce)
def getchunk():
    """
    Using pyparsing, create chunk reader for chunk strings.

    The reader matches one or more ``slot value`` pairs, each pair in its
    own group.
    """
    word_chars = pp.alphanums + "_"

    slot = pp.Word(pp.alphas + "_", word_chars)

    # A special value is one of the marker prefixes followed by a word that
    # may also contain quote characters.
    prefixes = [
        ACTRVARIABLE,
        ACTRNEG + ACTRVARIABLE,
        ACTRNEG,
        VISIONGREATER,
        VISIONSMALLER,
        VISIONGREATER + ACTRVARIABLE,
        VISIONSMALLER + ACTRVARIABLE,
    ]
    special_value = pp.Group(
        pp.oneOf(prefixes) + pp.Word(word_chars + '"' + "'"))

    # Quoted strings keep their quote characters.
    strvalue = pp.QuotedString('"', unquoteResults=False)
    strvalue2 = pp.QuotedString("'", unquoteResults=False)
    varvalue = pp.Word(word_chars)

    value = varvalue | special_value | strvalue | strvalue2
    return pp.OneOrMore(pp.Group(slot + value))
def getrule():
    """
    Using pyparsing, get rule out of a string.

    A rule is a group of left-hand-side buffer entries, the ``==>`` arrow,
    and a group of right-hand-side buffer entries. Each entry is a special
    marker, a buffer name, ``>`` and an optional chunk.
    """
    arrow = pp.Literal("==>")
    buff = pp.Word(pp.alphas, pp.alphanums + "_")
    end_buffer = pp.Literal(">")
    chunk = getchunk()

    special_valueLHS = pp.oneOf(list(_LHSCONVENTIONS.keys()))
    special_valueRHS = pp.oneOf(list(_RHSCONVENTIONS.keys()))

    lhs_entry = pp.Group(
        special_valueLHS + buff + end_buffer + pp.Group(pp.Optional(chunk)))
    rhs_entry = pp.Group(
        special_valueRHS + buff + end_buffer + pp.Group(pp.Optional(chunk)))

    left_side = pp.Group(pp.OneOrMore(lhs_entry))
    right_side = pp.Group(pp.OneOrMore(rhs_entry))
    return left_side + arrow + right_side
def group(cls, expr):
    """Group ``expr`` and wrap its matched tokens in a ``cls`` instance.

    :param cls: callable invoked with the list of matched tokens
    :param expr: the pyparsing element to group
    :returns: ``Group(expr)`` whose parse action yields ``[cls(tokens)]``
    """
    def wrap_tokens(s, l, t):
        # Prefer the plain-list form of the first token; fall back to the
        # raw tokens when there is no first token or it has no asList().
        try:
            items = t[0].asList()
        except (IndexError, AttributeError):
            items = t
        return [cls(items)]

    grouped = Group(expr)
    return grouped.setParseAction(wrap_tokens)
def main():
    """Parse a set of sample home-automation events and run their actions.

    Each event has the form ``command -> device [-> argument]``. Events
    without an argument open/close or turn on/off a device; events with an
    argument change a temperature by the leading number of the argument.
    """
    word = Word(alphanums)
    command = Group(OneOrMore(word))
    token = Suppress("->")
    device = Group(OneOrMore(word))
    argument = Group(OneOrMore(word))
    event = command + token + device + Optional(token + argument)

    gate = Gate()
    garage = Garage()
    airco = Aircondition()
    heating = Heating()
    boiler = Boiler()
    fridge = Fridge()

    tests = ('open -> gate',
             'close -> garage',
             'turn on -> aircondition',
             'turn off -> heating',
             'increase -> boiler temperature -> 5 degrees',
             'decrease -> fridge temperature -> 2 degrees')

    open_actions = {'gate': gate.open,
                    'garage': garage.open,
                    'aircondition': airco.turn_on,
                    'heating': heating.turn_on,
                    'boiler temperature': boiler.increase_temperature,
                    'fridge temperature': fridge.increase_temperature}
    close_actions = {'gate': gate.close,
                     'garage': garage.close,
                     'aircondition': airco.turn_off,
                     'heating': heating.turn_off,
                     'boiler temperature': boiler.decrease_temperature,
                     'fridge temperature': fridge.decrease_temperature}

    for t in tests:
        # Parse once per event instead of re-parsing for every check.
        parsed = event.parseString(t)

        if len(parsed) == 2:  # no argument
            cmd, dev = parsed
            cmd_str, dev_str = ' '.join(cmd), ' '.join(dev)
            if 'open' in cmd_str or 'turn on' in cmd_str:
                open_actions[dev_str]()
            elif 'close' in cmd_str or 'turn off' in cmd_str:
                close_actions[dev_str]()
        elif len(parsed) == 3:  # argument
            cmd, dev, arg = parsed
            cmd_str, dev_str, arg_str = ' '.join(cmd), ' '.join(dev), ' '.join(arg)
            # The numeric part is the first word of the argument.
            first_word = arg_str.split()[0]
            num_arg = 0
            try:
                num_arg = int(first_word)
            except ValueError:
                # Report the whole offending word, not just its first
                # character.
                print("expected number but got: '{}'".format(first_word))
            if 'increase' in cmd_str and num_arg > 0:
                open_actions[dev_str](num_arg)
            elif 'decrease' in cmd_str and num_arg > 0:
                close_actions[dev_str](num_arg)
def build_parser(self):
    """Build the search-query grammar.

    A term is a wildcard (``abc*``), a field expression (``name:value``), a
    negated term (``-abc``) or a plain word. Terms can be quoted,
    parenthesised, and combined with ``and`` (explicit or by juxtaposition)
    and ``or``.

    :returns: the bound ``parseString`` method of the top-level grammar
    """
    alnum = pyparsing.alphanums
    value_chars = alnum + "._/"

    # abc*
    wildcard = pyparsing.Group(
        pyparsing.Combine(pyparsing.Word(alnum) + pyparsing.Suppress('*'))
    ).setResultsName('wildcard')

    # name:value, where the value may carry quotes and comparison symbols.
    field_value = pyparsing.Group(
        pyparsing.Optional("\"")
        + pyparsing.Optional("<")
        + pyparsing.Optional(">")
        + pyparsing.Optional("=")
        + pyparsing.Optional("-")
        + pyparsing.Word(value_chars)
        + pyparsing.Optional("&")
        + pyparsing.Optional("<")
        + pyparsing.Optional(">")
        + pyparsing.Optional("=")
        + pyparsing.Optional("-")
        + pyparsing.Optional(pyparsing.Word(value_chars))
        + pyparsing.Optional("\"")
    )
    fields = pyparsing.Group(
        pyparsing.Combine(
            pyparsing.Word(alnum + "._") + pyparsing.Word(':') + field_value
        )
    ).setResultsName('fields')

    # -abc
    not_term = pyparsing.Group(
        pyparsing.Combine(pyparsing.Suppress('-') + pyparsing.Word(alnum + "."))
    ).setResultsName('not_term')

    # abc
    term = pyparsing.Group(pyparsing.Word(alnum)).setResultsName('term')

    parsed_term = wildcard | fields | not_term | term

    parsed_or = pyparsing.Forward()

    # One or more terms in a row, as found between double quotes.
    parsed_quote_block = pyparsing.Forward()
    parsed_quote_block << ((parsed_term + parsed_quote_block) | parsed_term)

    quoted = pyparsing.Group(
        pyparsing.Suppress('"') + parsed_quote_block + pyparsing.Suppress('"')
    ).setResultsName("quotes")
    parsed_quote = quoted | parsed_term

    parenthesised = pyparsing.Group(
        (pyparsing.Suppress("(") + parsed_or + pyparsing.Suppress(")"))
    ).setResultsName("parenthesis")
    parsed_parenthesis = parenthesised | parsed_quote

    parsed_and = pyparsing.Forward()
    explicit_and = pyparsing.Group(
        parsed_parenthesis
        + pyparsing.Suppress(pyparsing.Keyword("and"))
        + parsed_and
    ).setResultsName("and")
    # Juxtaposed operands (no operator in between) also form an "and".
    implicit_and = pyparsing.Group(
        parsed_parenthesis
        + pyparsing.OneOrMore(~pyparsing.oneOf("or and") + parsed_and)
    ).setResultsName("and")
    parsed_and << (explicit_and | implicit_and | parsed_parenthesis)

    explicit_or = pyparsing.Group(
        parsed_and + pyparsing.Suppress(pyparsing.Keyword("or")) + parsed_or
    ).setResultsName("or")
    parsed_or << (explicit_or | parsed_and)

    return parsed_or.parseString
def entry():
    """Build the grammar for one ``name = value(uncertainty)`` entry.

    The value is stored under ``nominal_value`` and the parenthesised
    digits under ``std_dev``.

    :returns: a ``Dict`` element wrapping the grouped entry
    """
    name = Word(alphas, alphas + '_')
    value = Word(nums, nums + ".").setResultsName('nominal_value')
    uncert = Word(nums).setResultsName('std_dev')

    value_uncert = value + Suppress("(") + uncert + Suppress(")")
    assignment = Group(name + Suppress("=") + value_uncert)
    return Dict(assignment)
def parser(cls):
    """Build the grammar for ``mutate(<col> = <expr>, ...)``.

    Each assignment becomes a dict with ``col_name`` and ``col_expr`` keys;
    the whole match is turned into a ``Mutate`` instance.

    :returns: the pyparsing element for a mutate call
    """
    mutate = pp.Suppress('mutate')
    lpar = pp.Suppress("(")
    rpar = pp.Suppress(")")

    col_name = pp.Word(pp.alphas, pp.alphanums + "_$")
    expr_evaluator = Evaluator(deferred_eval=True)
    col_expr = expr_evaluator.parser()

    def as_mutation(tokens):
        return {'col_name': tokens[0], 'col_expr': tokens[1]}

    mutation = col_name + pp.Suppress("=") + col_expr
    mutation.setParseAction(as_mutation)
    mutations = pp.Group(pp.delimitedList(mutation))

    parser = mutate + lpar + mutations + rpar
    parser.setParseAction(lambda tokens: Mutate(mutations=tokens))
    return parser
def parser(cls):
    """Build the grammar for ``rename(<col> = <col>, ...)``.

    The whole match is turned into a ``Rename`` instance built from the
    first resulting token.

    :returns: the pyparsing element for a rename call
    """
    rename = pp.Suppress("rename")

    def as_pair(tokens):
        return {tokens[0]: tokens[1]}

    def merge_pairs(tokens):
        return {k: v for d in tokens for k, v in d.items()}

    rename_kwarg = (common_parsers.column
                    + pp.Suppress("=")
                    + common_parsers.column)
    rename_kwarg.setParseAction(as_pair)

    kwargs = pp.Group(pp.delimitedList(rename_kwarg))
    kwargs.setParseAction(merge_pairs)

    parser = rename + pp.Suppress("(") + kwargs + pp.Suppress(")")
    parser.setParseAction(lambda tokens: Rename(columns=tokens[0]))
    return parser
def parser(cls):
    """Build the grammar for ``select(<col>, ...)``.

    The grouped column list is used to build a ``Select`` instance.

    :returns: the pyparsing element for a select call
    """
    select = pp.Suppress("select")
    column = common_parsers.column
    column_list = pp.Group(pp.delimitedList(column))

    def build_select(tokens):
        return Select(columns=tokens[0])

    parser = select + pp.Suppress("(") + column_list + pp.Suppress(")")
    parser.setParseAction(build_select)
    return parser
def parser(cls):
    """Build the grammar for ``remove(<col>, ...)``.

    The grouped column list is used to build a ``Remove`` instance.

    :returns: the pyparsing element for a remove call
    """
    remove = pp.Suppress("remove")
    column = common_parsers.column
    column_list = pp.Group(pp.delimitedList(column))

    def build_remove(tokens):
        return Remove(columns=tokens[0])

    parser = remove + pp.Suppress("(") + column_list + pp.Suppress(")")
    parser.setParseAction(build_remove)
    return parser
def parser(cls):
    """Build the grammar for ``unpack(<new> = <packed>['key']..., ...)``.

    Each argument becomes a dict with ``packed_col``, ``key_list`` and
    ``new_col_name`` keys; the whole match is turned into an ``Unpack``
    instance.

    :returns: the pyparsing element for an unpack call
    """
    unpack = pp.Suppress("unpack")

    packed_col_name = common_parsers.column
    new_col_name = common_parsers.column

    # One or more ['key'] lookups, collected into a single group.
    dict_key = (pp.Suppress("[")
                + pp.QuotedString(quoteChar="'")
                + pp.Suppress("]"))
    dict_key_grp = pp.Group(pp.OneOrMore(dict_key))

    def as_unpack_arg(tokens):
        return {'packed_col': tokens[1],
                'key_list': tokens[2],
                'new_col_name': tokens[0]}

    unpack_arg = new_col_name + pp.Suppress("=") + packed_col_name + dict_key_grp
    unpack_arg.setParseAction(as_unpack_arg)

    parser = (unpack
              + pp.Suppress("(")
              + pp.delimitedList(unpack_arg)
              + pp.Suppress(")"))
    parser.setParseAction(lambda tokens: Unpack(unpack_list=tokens))
    return parser
def parser(self):
    """Build the grammar for a block of one or more query nodes.

    A query node has the shape::

        QUERY | <text>; [FIELDS <cols>] USING <connector>
            [THEN | <text>;] AS <node>

    and may be followed by a ``---`` separator. Each matched node is handed
    to ``self._add_query_node``.

    :returns: a pyparsing element matching one or more query nodes
    """
    name_body_chars = pp.alphanums + "_$"

    # QUERY | ... ;
    query_key = pp.Keyword("QUERY")
    query_value = pp.Suppress("|") + pp.SkipTo(pp.Suppress(";"), include=True)

    # FIELDS a, b, c
    fields_key = pp.Keyword("FIELDS")
    field_name = common_parsers.column
    field_name_list = pp.Group(
        pp.delimitedList(field_name, delim=",")
    ).setParseAction(lambda tokens: tokens.asList())
    fields_block = (pp.Suppress(fields_key) + field_name_list)

    # USING connector
    connector_name = pp.Word(pp.alphas, name_body_chars)
    using_block = pp.Suppress("USING") + connector_name

    # THEN | ... ;
    then_key = pp.Suppress("THEN")
    manipulation_set = (pp.Suppress("|")
                        + pp.SkipTo(pp.Suppress(";"), include=True))
    then_block = then_key + manipulation_set

    # AS node
    as_key = pp.Suppress("AS")
    node_name = pp.Word(pp.alphas, name_body_chars)
    as_block = as_key + node_name

    def register_node(tokens):
        # Token order: query text, fields, connector, manipulations, node.
        return self._add_query_node(query_value=tokens[0],
                                    connector_name=tokens[2],
                                    node_name=tokens[4],
                                    fields=tokens[1],
                                    manipulation_set=tokens[3])

    # The optional blocks default to None so token positions stay fixed.
    query_node_block = (pp.Suppress(query_key)
                        + query_value
                        + pp.Optional(fields_block, default=None)
                        + using_block
                        + pp.Optional(then_block, default=None)
                        + as_block)
    query_node_block.setParseAction(register_node)

    single_query_node = query_node_block + pp.Optional(pp.Suppress("---"))
    return pp.OneOrMore(single_query_node)
def parser(self):
    """Build the grammar for a literal value.

    A value is a real, an integer, a ``datetime(...)`` call, a quoted
    string, a list, a tuple or a dict; containers may nest.

    :returns: the pyparsing element matching a single value
    """
    # Define punctuation as suppressed literals.
    (lparen, rparen, lbrack, rbrack,
     lbrace, rbrace, colon) = map(pp.Suppress, "()[]{}:")

    sign = pp.oneOf("+ -")
    integer = pp.Combine(pp.Optional(sign) + pp.Word(pp.nums))
    integer = integer.setName("integer")
    integer = integer.setParseAction(lambda toks: int(toks[0]))

    exponent = (pp.oneOf("e E")
                + pp.Optional(pp.oneOf("+ -"))
                + pp.Word(pp.nums))
    real = pp.Combine(pp.Optional(pp.oneOf("+ -"))
                      + pp.Word(pp.nums)
                      + "."
                      + pp.Optional(pp.Word(pp.nums))
                      + pp.Optional(exponent))
    real = real.setName("real")
    real = real.setParseAction(lambda toks: float(toks[0]))

    # datetime(<number>, ...), converted by self._make_datetime.
    _datetime_arg = (integer | real)
    datetime_args = pp.Group(pp.delimitedList(_datetime_arg))
    _datetime = (pp.Suppress(pp.Literal('datetime') + pp.Literal("("))
                 + datetime_args
                 + pp.Suppress(")"))
    _datetime.setParseAction(lambda toks: self._make_datetime(toks[0]))

    # Containers nest, so they are declared before being defined.
    tuple_str = pp.Forward()
    list_str = pp.Forward()
    dict_str = pp.Forward()

    quoted = pp.quotedString.setParseAction(pp.removeQuotes)
    list_item = (real | integer | _datetime | quoted
                 | pp.Group(list_str) | tuple_str | dict_str)

    # A trailing comma is accepted in all three container forms.
    trailing_comma = pp.Optional(pp.Suppress(","))

    tuple_str << (pp.Suppress("(")
                  + pp.Optional(pp.delimitedList(list_item))
                  + trailing_comma
                  + pp.Suppress(")"))
    tuple_str.setParseAction(lambda toks: tuple(toks.asList()))

    list_str << (lbrack
                 + pp.Optional(pp.delimitedList(list_item)
                               + pp.Optional(pp.Suppress(",")))
                 + rbrack)

    dict_entry = pp.Group(list_item + colon + list_item)
    dict_str << (lbrace
                 + pp.Optional(pp.delimitedList(dict_entry)
                               + pp.Optional(pp.Suppress(",")))
                 + rbrace)
    dict_str.setParseAction(lambda toks: dict(toks.asList()))

    return list_item
def __setattr__(self, k, v):
    """Register pyparsing elements assigned as attributes.

    When ``v`` is a ``pp.ParserElement`` it is grouped, given ``k`` as its
    results name and stored in ``Grammar.grm[k]``. Any other value is
    ignored.
    """
    if not isinstance(v, pp.ParserElement):
        return
    named_group = pp.Group(v).setResultsName(k)
    Grammar.grm[k] = named_group
def statement():
    """Build the grammar for one ``<identifier> = <value>`` statement.

    The identifier is stored as ``lhs``. The value (a curly-braced block, a
    single- or double-quoted string kept with its quotes, or a regex,
    followed by any number of keywords) is joined with spaces and stored as
    ``rhs``.

    :returns: a ``pyparsing.Group`` for the statement
    """
    single_quoted = pyparsing.QuotedString(
        "'", escChar="\\", unquoteResults=False)
    double_quoted = pyparsing.QuotedString(
        "\"", escChar="\\", unquoteResults=False)

    value = anything_in_curly() | single_quoted | double_quoted | _REGEX
    rhs = pyparsing.Combine(
        value + pyparsing.ZeroOrMore(_KEYWORD),
        adjacent=False,
        joinString=" ",
    ).setResultsName("rhs")

    lhs = _IDENTIFIER.setResultsName("lhs")
    return pyparsing.Group(lhs + _EQUALS + rhs)
def _arguments(self):
    """Build the grammar for a possibly empty argument list.

    :returns: a group holding zero or more comma-separated expressions
    """
    expression_list = pyparsing.delimitedList(self.expression())
    maybe_expressions = pyparsing.Optional(expression_list)
    return pyparsing.Group(maybe_expressions)
def _arguments(self):
    """Build the grammar for a possibly empty argument list.

    :returns: a group holding zero or more comma-separated arguments
    """
    argument_list = pyparsing.delimitedList(self._argument())
    maybe_arguments = pyparsing.Optional(argument_list)
    return pyparsing.Group(maybe_arguments)
def _multiword_argument(self):
    """Build the grammar for an argument made of two or more variables.

    The grouped match is passed through
    ``util.action(pre_ast.CompositeBlock)``.

    :returns: the pyparsing element for a multi-word argument
    """
    first_word = self._variable()
    more_words = pyparsing.OneOrMore(self._variable())
    words = pyparsing.Group(first_word + more_words)
    return words.setParseAction(util.action(pre_ast.CompositeBlock))
def _enum_definition(self):
    """Detect an enum definition.

    e.g.
        enum foo {
            OPTION_1: 1 + 2,
            OPTION_2
        }

    :returns: the pyparsing element, post-processed by
        ``self._process_enum_definition``
    """
    enum_name = pyparsing.Optional(self._identifier())("enum_name")

    # Skipping ahead to the next ',' or '}' allows us to get even invalid
    # expressions.
    field_value = pyparsing.Optional(
        _EQUALS + pyparsing.SkipTo(pyparsing.Word(",}"))("expression")
    )
    field = pyparsing.Group(
        self._identifier()("name")
        + field_value
        + pyparsing.Optional(_COMMA)
    )
    fields = pyparsing.ZeroOrMore(field)("fields")

    definition = (
        _ENUM
        + enum_name
        + _OPEN_CURLY
        + fields
        + _CLOSE_CURLY
        + self._maybe_attributes()("attributes")
    )
    return definition.setParseAction(self._process_enum_definition)
def grammar():
    """Define the query grammar.

    Backus-Naur form (BNF) of the grammar::

            <grammar> ::= <item> | <item> <boolean> <grammar>
               <item> ::= <hosts> | "(" <grammar> ")"
            <boolean> ::= "and not" | "and" | "xor" | "or"

    The pyparsing definitions below follow this BNF closely; see the code
    for the details of the tokens not specified above.

    Returns:
        pyparsing.ParserElement: the grammar parser.

    """
    # Boolean operators
    and_not = pp.CaselessKeyword('and not').leaveWhitespace()
    boolean = (and_not
               | pp.CaselessKeyword('and')
               | pp.CaselessKeyword('xor')
               | pp.CaselessKeyword('or'))('bool')

    # Parentheses
    lpar = pp.Literal('(')('open_subgroup')
    rpar = pp.Literal(')')('close_subgroup')

    # Hosts selection: clustershell (,!&^[]) syntax is allowed:
    # host10[10-42].domain
    host_chars = pp.alphanums + '-_.,!&^[]'
    hosts = (~(boolean) + pp.Word(host_chars))('hosts')

    # Final grammar: groups split the parsed results into one entry per
    # item for easy access.
    full_grammar = pp.Forward()
    item = hosts | lpar + full_grammar + rpar
    first_item = pp.Group(item)
    following_items = pp.ZeroOrMore(pp.Group(boolean + item))
    full_grammar << first_item + following_items  # pylint: disable=expression-not-assigned

    return full_grammar
def grammar():
    """Define the query grammar for the external backend used for testing.

    The grammar matches one or more hosts selections, each in its own
    group.
    """
    # Hosts selection: clustershell (,!&^[]) syntax is allowed:
    # host10[10-42].domain
    host_chars = pp.alphanums + '-_.,!&^[]'
    hosts = pp.Word(host_chars)('hosts')

    # Groups split the parsed results into one entry per hosts selection.
    full_grammar = pp.Forward()
    first_hosts = pp.Group(hosts)
    following_hosts = pp.ZeroOrMore(pp.Group(hosts))
    full_grammar << first_hosts + following_hosts  # pylint: disable=expression-not-assigned

    return full_grammar
def main():
    """Parse a set of sample home-automation events and run their actions.

    Each event has the form ``command -> device [-> argument]``. Events
    without an argument open/close or turn on/off a device; events with an
    argument change a temperature by the leading number of the argument.
    """
    word = Word(alphanums)
    command = Group(OneOrMore(word))
    token = Suppress("->")
    device = Group(OneOrMore(word))
    argument = Group(OneOrMore(word))
    event = command + token + device + Optional(token + argument)

    gate = Gate()
    garage = Garage()
    airco = Aircondition()
    heating = Heating()
    boiler = Boiler()
    fridge = Fridge()

    tests = ('open -> gate',
             'close -> garage',
             'turn on -> aircondition',
             'turn off -> heating',
             'increase -> boiler temperature -> 20 degrees',
             'decrease -> fridge temperature -> 6 degree')

    open_actions = {'gate': gate.open,
                    'garage': garage.open,
                    'aircondition': airco.turn_on,
                    'heating': heating.turn_on,
                    'boiler temperature': boiler.increase_temperature,
                    'fridge temperature': fridge.increase_temperature}
    close_actions = {'gate': gate.close,
                     'garage': garage.close,
                     'aircondition': airco.turn_off,
                     'heating': heating.turn_off,
                     'boiler temperature': boiler.decrease_temperature,
                     'fridge temperature': fridge.decrease_temperature}

    for t in tests:
        # Parse once per event instead of re-parsing for every check.
        parsed = event.parseString(t)

        if len(parsed) == 2:  # no argument
            cmd, dev = parsed
            cmd_str, dev_str = ' '.join(cmd), ' '.join(dev)
            if 'open' in cmd_str or 'turn on' in cmd_str:
                open_actions[dev_str]()
            elif 'close' in cmd_str or 'turn off' in cmd_str:
                close_actions[dev_str]()
        elif len(parsed) == 3:  # argument
            cmd, dev, arg = parsed
            cmd_str, dev_str, arg_str = ' '.join(cmd), ' '.join(dev), ' '.join(arg)
            # The numeric part is the first word of the argument.
            first_word = arg_str.split()[0]
            num_arg = 0
            try:
                num_arg = int(first_word)
            except ValueError:
                # Report the whole offending word, not just its first
                # character.
                print("expected number but got: '{}'".format(first_word))
            if 'increase' in cmd_str and num_arg > 0:
                open_actions[dev_str](num_arg)
            elif 'decrease' in cmd_str and num_arg > 0:
                close_actions[dev_str](num_arg)
def parse(cls, search=False):
    """Parse the main query text. This method will also set the
    class attribute `parsed_search` to the parsed query, and it will
    return it too.

    :param cls: The class object the method is called on
    :type  cls: object
    :param search: Search text string if a custom search string is to be
        used. False if the `cls.search` class attribute is to be used.
    :type  search: str
    :returns: Parsed query
    :rtype:  list

    >>> print(DocMatcher.parse('hello author = einstein'))
    [['hello'], ['author', '=', 'einstein']]
    >>> print(DocMatcher.parse(''))
    []
    >>> print(\
        DocMatcher.parse(\
            '"hello world whatever =" tags = \\\'hello ====\\\''))
    [['hello world whatever ='], ['tags', '=', 'hello ====']]
    >>> print(DocMatcher.parse('hello'))
    [['hello']]
    """
    import pyparsing

    cls.logger.debug('Parsing search')
    # Any falsy ``search`` (including the default) falls back to cls.search.
    search = search or cls.search

    papis_key = pyparsing.Word(pyparsing.alphanums + '-')
    # A value is a double-quoted string, a single-quoted string or a bare
    # key-like word; ``^`` picks the longest of the matching alternatives.
    papis_value = pyparsing.QuotedString(
        quoteChar='"', escChar='\\', escQuote='\\'
    ) ^ pyparsing.QuotedString(
        quoteChar="'", escChar='\\', escQuote='\\'
    ) ^ papis_key
    equal = pyparsing.ZeroOrMore(" ") + \
        pyparsing.Literal('=') + \
        pyparsing.ZeroOrMore(" ")

    # Each group is an optional run of "key =" prefixes and then a value.
    papis_query = pyparsing.ZeroOrMore(
        pyparsing.Group(
            pyparsing.ZeroOrMore(
                papis_key + equal
            ) + papis_value
        )
    )
    parsed = papis_query.parseString(search)
    # Lazy %-formatting: the message is only built when debug is enabled.
    cls.logger.debug('Parsed search = %s', parsed)
    cls.parsed_search = parsed
    return cls.parsed_search
def main():
    """Parse a set of sample home-automation events and run their actions.

    Each event has the form ``command -> device [-> argument]``. Events
    without an argument open/close or turn on/off a device; events with an
    argument change a temperature by the leading number of the argument.
    """
    word = Word(alphanums)
    command = Group(OneOrMore(word))
    token = Suppress("->")
    device = Group(OneOrMore(word))
    argument = Group(OneOrMore(word))
    event = command + token + device + Optional(token + argument)

    gate = Gate()
    garage = Garage()
    airco = Aircondition()
    heating = Heating()
    boiler = Boiler()
    fridge = Fridge()

    tests = ('open -> gate',
             'close -> garage',
             'turn on -> aircondition',
             'turn off -> heating',
             'increase -> boiler temperature -> 5 degrees',
             'decrease -> fridge temperature -> 2 degrees')

    open_actions = {'gate': gate.open,
                    'garage': garage.open,
                    'aircondition': airco.turn_on,
                    'heating': heating.turn_on,
                    'boiler temperature': boiler.increase_temperature,
                    'fridge temperature': fridge.increase_temperature}
    close_actions = {'gate': gate.close,
                     'garage': garage.close,
                     'aircondition': airco.turn_off,
                     'heating': heating.turn_off,
                     'boiler temperature': boiler.decrease_temperature,
                     'fridge temperature': fridge.decrease_temperature}

    for t in tests:
        # Parse once per event instead of re-parsing for every check.
        parsed = event.parseString(t)

        if len(parsed) == 2:  # no argument
            cmd, dev = parsed
            cmd_str, dev_str = ' '.join(cmd), ' '.join(dev)
            if 'open' in cmd_str or 'turn on' in cmd_str:
                open_actions[dev_str]()
            elif 'close' in cmd_str or 'turn off' in cmd_str:
                close_actions[dev_str]()
        elif len(parsed) == 3:  # argument
            cmd, dev, arg = parsed
            cmd_str, dev_str, arg_str = ' '.join(cmd), ' '.join(dev), ' '.join(arg)
            # The numeric part is the first word of the argument.
            first_word = arg_str.split()[0]
            num_arg = 0
            try:
                num_arg = int(first_word)
            except ValueError:
                # Report the whole offending word, not just its first
                # character.
                print("expected number but got: '{}'".format(first_word))
            if 'increase' in cmd_str and num_arg > 0:
                open_actions[dev_str](num_arg)
            elif 'decrease' in cmd_str and num_arg > 0:
                close_actions[dev_str](num_arg)
def meta_section():
    """Build the grammar for a ``meta:`` section.

    The section is the literal ``meta``, a colon and one or more
    statements (stored as ``statements``); the group is named ``meta``.

    :returns: the pyparsing element for the section
    """
    statements = pyparsing.OneOrMore(statement()).setResultsName("statements")
    section = pyparsing.Group(pyparsing.Literal("meta") + _COLON + statements)
    return section.setResultsName("meta")
def strings_section():
    """Build the grammar for a ``strings:`` section.

    The section is the literal ``strings``, a colon and one or more
    statements (stored as ``statements``); the group is named ``strings``.

    :returns: the pyparsing element for the section
    """
    statements = pyparsing.OneOrMore(statement()).setResultsName("statements")
    section = pyparsing.Group(
        pyparsing.Literal("strings") + _COLON + statements)
    return section.setResultsName("strings")
def _type_instance(self):
    """A type declaration.

    The modifiers of a typedef:
        struct s *P[];
                 ^^^^<-    The type instance.

    :returns: a group holding the type instance followed by its optional
        attributes
    """
    # Function pointer (*f)(int foobar)
    function_pointer = (
        pyparsing.ZeroOrMore(_STAR)
        + _OPEN_PARENTHESIS
        + pyparsing.Optional(_STAR("function_pointer"))
        + self._identifier()("type_instance_name")
        + _CLOSE_PARENTHESIS
        + parsers.anything_in_parentheses()("function_args")
    )

    # Function object f(foo bar *)
    function_object = (
        pyparsing.ZeroOrMore(_STAR)
        + self._identifier()("type_instance_name")
        + parsers.anything_in_parentheses()("function_args")
    )

    # Possibly array: [] , [][]
    array_suffix = pyparsing.ZeroOrMore(
        _OPEN_BRACKET
        + pyparsing.SkipTo(_CLOSE_BRACKET)("brackets_with_expression_inside*")
        + _CLOSE_BRACKET
    )
    # Bitfields: int x: 7;
    bitfield_suffix = pyparsing.Optional(
        _COLON + pyparsing.SkipTo(_SEMICOLON | _COMMA)("bitfield")
    )
    # Simple form: *foo[10];
    simple_form = (
        pyparsing.ZeroOrMore(_STAR)("type_pointer")
        + self._identifier()("type_instance_name")
        + array_suffix
        + bitfield_suffix
    )

    type_instance = function_pointer | function_object | simple_form

    return pyparsing.Group(type_instance + self._maybe_attributes())
def _create_primitives():
    """Create the module-level pyparsing primitives on first use.

    The function assigns the module globals listed in the ``global``
    statements below. It returns immediately, without rebuilding anything,
    when ``ident`` is already set.

    Parse actions attached here convert matched text as follows:

    * ``number`` -> ``int`` (base inferred from the text, so ``0x..`` is hex)
    * ``binary`` -> bytes decoded from the hex digits after ``hex:``
    * ``time_interval`` -> an integer number of seconds
    * ``slot_id`` -> ``SlotIdentifier.FromString(...)``
    * ``stream`` -> ``DataStream.FromString(...)``
    * ``selector`` alternatives -> ``DataStreamSelector.FromString(...)``
    """
    global binary, ident, rvalue, number, quoted_string, semi, time_interval
    global slot_id, comp, config_type, stream, comment, stream_trigger, selector

    if ident is not None:
        return

    def join_tokens(tokens):
        """Return the matched tokens joined by single spaces."""
        return u' '.join([str(x) for x in tokens])

    semi = Literal(u';').suppress()
    ident = Word(alphas + u"_", alphas + nums + u"_")
    number = Regex(u'((0x[a-fA-F0-9]+)|[+-]?[0-9]+)').setParseAction(lambda s, l, t: [int(t[0], 0)])
    # t[0][4:] strips the leading "hex:" before decoding.
    binary = Regex(u'hex:([a-fA-F0-9][a-fA-F0-9])+').setParseAction(lambda s, l, t: [unhexlify(t[0][4:])])
    quoted_string = dblQuotedString
    comment = Literal('#') + restOfLine

    rvalue = number | quoted_string

    # Convert all time intervals into an integer number of seconds.
    # Every unit accepted by ``time_unit`` below needs an entry here;
    # otherwise the ``time_interval`` parse action raises KeyError.
    time_unit_multipliers = {
        u'second': 1,
        u'seconds': 1,
        u'minute': 60,
        u'minutes': 60,
        u'hour': 60*60,
        u'hours': 60*60,
        u'day': 60*60*24,
        u'days': 60*60*24,
        u'week': 60*60*24*7,
        u'weeks': 60*60*24*7,
        u'month': 60*60*24*30,
        u'months': 60*60*24*30,
        u'year': 60*60*24*365,
        u'years': 60*60*24*365,
    }

    config_type = oneOf('uint8_t uint16_t uint32_t int8_t int16_t int32_t uint8_t[] uint16_t[] uint32_t[] int8_t[] int16_t[] int32_t[] string binary')
    comp = oneOf('> < >= <= == ~=')

    time_unit = oneOf(u"second seconds minute minutes hour hours day days week weeks month months year years")
    time_interval = (number + time_unit).setParseAction(lambda s, l, t: [t[0]*time_unit_multipliers[t[1]]])

    slot_id = Literal(u"controller") | (Literal(u'slot') + number)
    slot_id.setParseAction(lambda s, l, t: [SlotIdentifier.FromString(join_tokens(t))])

    stream_modifier = Literal("system") | Literal("user") | Literal("combined")

    stream = Optional(Literal("system")) + oneOf("buffered unbuffered input output counter constant") + number + Optional(Literal("node"))
    stream.setParseAction(lambda s, l, t: [DataStream.FromString(join_tokens(t))])

    all_selector = Optional(Literal("all")) + Optional(stream_modifier) + oneOf("buffered unbuffered inputs outputs counters constants") + Optional(Literal("nodes"))
    all_selector.setParseAction(lambda s, l, t: [DataStreamSelector.FromString(join_tokens(t))])

    one_selector = Optional(Literal("system")) + oneOf("buffered unbuffered input output counter constant") + number + Optional(Literal("node"))
    one_selector.setParseAction(lambda s, l, t: [DataStreamSelector.FromString(join_tokens(t))])

    selector = one_selector | all_selector

    trigger_comp = oneOf('> < >= <= ==')
    stream_trigger = Group(
        (Literal(u'count') | Literal(u'value'))
        + Literal(u'(').suppress()
        - stream
        - Literal(u')').suppress()
        - trigger_comp
        - number
    ).setResultsName('stream_trigger')
def grammar():
    """Build the parser for the category/host query language.

    Backus-Naur form (BNF) of the grammar::

              <grammar> ::= <item> | <item> <and_or> <grammar>
                 <item> ::= [<neg>] <query-token> | [<neg>] "(" <grammar> ")"
          <query-token> ::= <token> | <hosts>
                <token> ::= <category>:<key> [<operator> <value>]

    pyparsing expresses grammars in a BNF-like style, so the tokens not
    spelled out above are best read directly from the code below.

    Returns:
        pyparsing.ParserElement: the grammar parser.

    """
    # Boolean operators. The label is 'neg' because 'not' is a reserved word
    # in Python and could not be used with dot notation on the results.
    and_or = (pp.CaselessKeyword('and') | pp.CaselessKeyword('or'))('bool')
    neg = pp.CaselessKeyword('not')('neg')

    # Comparison operators
    operator = pp.oneOf(OPERATORS, caseless=True)('operator')

    # Quoted strings: both single and double quotes are accepted
    quoted_string = pp.quotedString.copy().addParseAction(pp.removeQuotes)

    # Parentheses
    lpar = pp.Literal('(')('open_subgroup')
    rpar = pp.Literal(')')('close_subgroup')

    # Hosts selection: glob (*) and clustershell (,!&^[]) syntaxes are allowed,
    # i.e. host10[10-42].*.domain
    bare_hosts = ~(and_or | neg) + pp.Word(pp.alphanums + '-_.*,!&^[]')
    hosts = quoted_string | bare_hosts

    # Key-value token for the allowed categories using the available
    # comparison operators, i.e. F:key = value
    category = pp.oneOf(CATEGORIES, caseless=True)('category')
    key = pp.Word(pp.alphanums + '-_.%@:')('key')
    selector = pp.Combine(category + ':' + key)  # i.e. F:key

    # All printable characters except the parentheses and braces that belong
    # to this grammar or to the global one
    all_but_par = ''.join(c for c in pp.printables if c not in '(){}')
    value = (quoted_string | pp.Word(all_but_par))('value')
    token = selector + pp.Optional(operator + value)

    # Final grammar, see the docstring for its BNF.
    # Groups split the parsed results for easy access.
    full_grammar = pp.Forward()
    plain_item = pp.Group(pp.Optional(neg) + (token | hosts('hosts')))
    subgroup_item = pp.Group(pp.Optional(neg) + lpar + full_grammar + rpar)
    item = plain_item | subgroup_item
    full_grammar << (item + pp.ZeroOrMore(pp.Group(and_or) + full_grammar))  # pylint: disable=expression-not-assigned

    return full_grammar
def grammar():
    """Build the parser for the ``key:value`` query language.

    Backus-Naur form (BNF) of the grammar::

            <grammar> ::= "*" | <items>
              <items> ::= <item> | <item> <whitespace> <items>
               <item> ::= <key>:<value>

    pyparsing expresses grammars in a BNF-like style, so the tokens not
    spelled out above are best read directly from the code below.

    Returns:
        pyparsing.ParserElement: the grammar parser.

    """
    # Quoted strings: both single and double quotes are accepted
    quoted_string = pp.quotedString.copy().addParseAction(pp.removeQuotes)

    # Key-value tokens: key:value
    # Lowercase key of at least two characters.
    # NOTE(review): the srange() argument ends with a stray '"' after the
    # closing bracket; it is kept unchanged here - confirm it is unintended.
    key_chars = pp.srange('[a-z0-9-_.]"')
    key = pp.Word(key_chars, min=2)('key')

    # The value accepts all printable characters except the parentheses and
    # braces that belong to the global grammar.
    all_but_par = ''.join(c for c in pp.printables if c not in '(){}')
    value = (quoted_string | pp.Word(all_but_par))('value')
    item = pp.Combine(key + ':' + value)

    # Final grammar, see the docstring for its BNF.
    # Groups split the parsed results for easy access.
    select_all = pp.Group(pp.Literal('*')('all'))
    items = pp.OneOrMore(pp.Group(item))
    return select_all | items
def grammar(backend_keys):
    """Build the parser for the main multi-query language.

    Cumin provides a user-friendly generic query language that combines the
    results of subqueries for multiple backends:

    * Each query part can be composed with the others using the boolean
      operators ``and``, ``or``, ``and not``, ``xor``.
    * Multiple query parts can be grouped together with parentheses ``(``,
      ``)``.
    * A backend-specific query is written ``I{backend-specific query syntax}``,
      where ``I`` is the identifier of the backend.
    * Aliases defined in the configuration file are referenced as
      ``A:group1``.
    * The identifier ``A`` is reserved for alias replacement and cannot be
      used to identify a backend.
    * A complex query example:
      ``(D{host1 or host2} and (P{R:Class = Role::MyClass} and not A:group1)) or D{host3}``

    Backus-Naur form (BNF) of the grammar::

              <grammar> ::= <item> | <item> <boolean> <grammar>
                 <item> ::= <backend_query> | <alias> | "(" <grammar> ")"
        <backend_query> ::= <backend> "{" <query> "}"
                <alias> ::= A:<alias_name>
              <boolean> ::= "and not" | "and" | "xor" | "or"

    pyparsing expresses grammars in a BNF-like style, so the tokens not
    spelled out above are best read directly from the code below.

    Arguments:
        backend_keys (list): list of the GRAMMAR_PREFIX for each registered
            backend.

    Returns:
        pyparsing.ParserElement: the grammar parser.

    """
    # Boolean operators; 'and not' is listed first so it is tried before 'and'
    and_not = pp.CaselessKeyword('and not').leaveWhitespace()
    boolean = (
        and_not
        | pp.CaselessKeyword('and')
        | pp.CaselessKeyword('xor')
        | pp.CaselessKeyword('or')
    )('bool')

    # Parentheses
    lpar = pp.Literal('(')('open_subgroup')
    rpar = pp.Literal(')')('close_subgroup')

    # Backend query: P{PuppetDB specific query}
    backend = pp.oneOf(backend_keys, caseless=True)('backend')
    query_start = pp.Combine(backend + pp.Literal('{'))
    query_end = pp.Literal('}')

    # The backend-specific query may contain the query_end token as long as
    # it is inside a quoted string. Parsing fails if a query_start token is
    # found before the first query_end is reached.
    query = pp.SkipTo(query_end, ignore=pp.quotedString, failOn=query_start)('query')
    backend_query = pp.Combine(query_start + query + query_end)

    # Alias
    alias_name = pp.Word(pp.alphanums + '-_.+')('alias')
    alias = pp.Combine(pp.CaselessKeyword('A') + ':' + alias_name)

    # Final grammar, see the docstring for its BNF.
    # Groups give easy dictionary access to the parsed results.
    full_grammar = pp.Forward()
    item = backend_query | alias | lpar + full_grammar + rpar
    full_grammar << (pp.Group(item) + pp.ZeroOrMore(pp.Group(boolean + item)))  # pylint: disable=expression-not-assigned

    return full_grammar
def parse_table(attribute, string):
    """Extract the rows of the first ``mpc.<attribute> = [ ... ]`` table.

    Each row is one or more ``Float`` tokens terminated by ``;`` and
    optionally followed by ``Comments``. Only the first table found in
    ``string`` is used.

    Args:
        attribute: name appended to ``mpc.`` to form the table keyword.
        string: text to scan for the table.

    Returns:
        A list with one entry per row; each entry is the list of the row's
        values after conversion by ``int_else_float_except_string``.

    Raises:
        StopIteration: if ``string`` contains no matching table.
    """
    line = OneOrMore(Float)('data') + Literal(';') + Optional(Comments, default='')('name')
    table_start = Suppress(
        Keyword('mpc.{}'.format(attribute)) + Keyword('=') + Keyword('[') + Optional(Comments)
    )
    table_end = Suppress(Keyword(']') + Optional(Comments))
    table = table_start + OneOrMore(Group(line)) + table_end

    # scanString() yields (tokens, start, end) tuples lazily. The builtin
    # next() works on Python 2 and 3, unlike the Python 2-only .next() method.
    result, _start, _end = next(table.scanString(string))

    return [
        [int_else_float_except_string(s) for s in row['data'].asList()]
        for row in result
    ]
def __init__(self, obj, config=str()):
    """Initialise the plugin and resolve its target object and attribute.

    ``self.config`` (as set by the parent initialiser) is parsed as a search
    string of the form ``[/path/.../]@attr`` or ``[/path/.../]!attr``.

    * When both a path and an attribute are found, they are stored in
      ``self.targetobject`` and ``self.targetattribute``.
    * When only an attribute is found, it is read from ``obj``. Its first
      element is then parsed as the search string, which must contain both
      a path and an attribute.

    Args:
        obj: object the plugin is attached to; also used to look up the
            indirect attribute.
        config: search string, wrapped in single quotes before parsing.

    Raises:
        ValueError: if the config is not a non-empty string, if the
            indirect attribute does not exist or holds several values, or
            if the search string cannot be processed.
    """
    def process(config):
        """Return ``[path, attrib]`` or ``[attrib]`` parsed from ``config``."""
        pathexpr = p.Literal("'").suppress() + \
            p.Optional(
                p.Combine(
                    p.OneOrMore(p.Literal("/") + p.Word(p.alphanums)) + p.Literal("/").suppress()
                )
            ).setResultsName('path') + \
            p.Combine(
                (p.Literal('@').suppress() | p.Literal('!').suppress()) +
                p.Word(p.alphanums) +
                p.Literal("'").suppress()
            ).setResultsName('attrib')

        expr = p.Group(pathexpr).setResultsName('search')
        match = expr.parseString(config)

        _ret = []
        if 'search' in match:
            search = match['search']
            if 'path' in search:
                _ret.append(search['path'])
            if 'attrib' in search:
                _ret.append(search['attrib'])
        return _ret

    generic_error = 'An error occured when processing the search string for the Xattrib plugin function'

    super(Xattrib, self).__init__(obj, config=config, defer=True)

    # Check the type first so a non-sized config (e.g. an int) is reported
    # as ValueError instead of failing inside len().
    if not isinstance(self.config, str) or len(self.config) < 1:
        raise ValueError('Xattrib plugin function requires a config string')

    try:
        _result = process("'%s'" % self.config)

        if len(_result) == 1:
            # Only an attribute name was given: the real search string is
            # stored in that attribute of obj.
            _config = getattr(obj, _result[0], None)
            if _config is None:
                raise ValueError('Xattrib plugin received an attribute name that does not exist')
            # TODO len check only required for attributes, but not method plugins
            if len(_config) > 1:
                raise ValueError('Xattrib plugin received a attribute name that contains multiple values')
            _result = process("'%s'" % _config[0])

        if len(_result) != 2:
            raise ValueError(generic_error)

        self.targetobject, self.targetattribute = _result
    except ValueError:
        # Keep the specific messages raised above.
        raise
    except Exception:
        # Parse failures and lookup errors are reported uniformly. Unlike a
        # bare except, this lets KeyboardInterrupt/SystemExit propagate.
        raise ValueError(generic_error)