我们从Python开源项目中,提取了以下46个代码示例,用于说明如何使用pyparsing.Optional()。
def lexical_analysis(self, src): delimited = re.sub(r'\s+', ' ', ' '.join(src.strip().split('\n'))).split(';') result = [] for stmt in delimited: if stmt == '': return result string = pp.Regex('[a-zA-Z0-9=_]+') nums = pp.Regex('[0-9]+') ws = pp.OneOrMore(pp.White()).suppress() lp = pp.Regex('[(]').suppress() rp = pp.Regex('[)]').suppress() c = pp.Regex('[,]').suppress() q = pp.Regex("[']").suppress() table_name = string.setResultsName('table_name') create_table = (pp.Keyword('CREATE', caseless = True) + ws + pp.Keyword('TABLE', caseless = True) + ws + pp.Optional(pp.Keyword('IF', caseless = True) + ws + pp.Keyword('NOT', caseless = True) + ws + pp.Keyword('EXISTS', caseless = True))).suppress() + table_name + lp column_name = string.setResultsName('column_name') data_type = string.setResultsName('data_type') length = lp + nums.setResultsName('length') + rp nullable = (pp.Optional(pp.Keyword('NOT', caseless = True) + ws) + pp.Keyword('NULL', caseless = True)).setResultsName('nullable') default_value = pp.Keyword('DEFAULT', caseless = True).suppress() + ws + string.setResultsName('default_value') auto_increment = pp.Keyword('AUTO_INCREMENT', caseless = True).setResultsName('auto_increment') column = pp.Optional(ws) + column_name + ws + data_type + pp.Optional(pp.MatchFirst([length, ws + nullable, ws + default_value, ws + auto_increment])) + pp.Optional(pp.MatchFirst([ws + nullable, ws + default_value, ws + auto_increment])) + pp.Optional(pp.MatchFirst([ws + default_value, ws + auto_increment])) + pp.Optional(ws + auto_increment) + pp.Optional(ws) + c primary_key = pp.Keyword('PRIMARY KEY', caseless = True).suppress() + lp + pp.OneOrMore(q + string.setResultsName('primary_key') + q + pp.Optional(c)) + rp + pp.Optional(c) key = pp.Keyword('KEY', caseless = True).suppress() + lp + q + string.setResultsName('key') + q + pp.Optional(c) + rp + pp.Optional(c) parser = create_table + pp.OneOrMore(pp.Group(column)) + pp.Optional(primary_key) + pp.Optional(key) + rp + pp.OneOrMore(ws + string).suppress() result.append(parser.parseString(stmt, parseAll=True)) return result
def __init__(self, identifier_parser=None): """ :param IdentifierParser identifier_parser: An identifier parser for checking the 3P and 5P partners """ self.identifier_parser = identifier_parser if identifier_parser is not None else IdentifierParser() pmod_default_ns = oneOf(list(pmod_namespace.keys())).setParseAction(self.handle_pmod_default_ns) pmod_legacy_ns = oneOf(list(pmod_legacy_labels.keys())).setParseAction(self.handle_pmod_legacy_ns) pmod_identifier = MatchFirst([ Group(self.identifier_parser.identifier_qualified), Group(pmod_default_ns), Group(pmod_legacy_ns) ]) self.language = pmod_tag + nest(pmod_identifier(IDENTIFIER) + Optional( WCW + amino_acid(PMOD_CODE) + Optional(WCW + ppc.integer(PMOD_POSITION)))) super(PmodParser, self).__init__(self.language)
def compile(self): manipulation_set = pp.Optional(pp.Suppress(pp.Keyword("THEN")) + pp.Suppress("|") + pp.SkipTo(pp.Suppress(";"), include=True)) manipulation_set.setParseAction(lambda x: self._add_manipulation_set(x[0])) parser = (pp.Keyword("CONNECT") + self.connect_block.parser() + pp.Keyword("RETRIEVE") + self.retrieve_block.parser() + pp.Optional(pp.Keyword("JOIN") + self.join_block.parser())) try: parser.parseString(self.qgl_str) except pp.ParseException, e: raise QGLSyntaxError("Couldn't parse query: \n %s" % e) self._create_connectors() self._create_query_nodes() if self.join_block: self._create_joins() if self.manipulation_set_str: self.query_graph.manipulation_set.append_from_str(self.manipulation_set_str)
def _parse(self, df=None, independent_param_vals=None): expr_evaluator = Evaluator(df=df, name_dict=independent_param_vals) param_expr = expr_evaluator.parser() render_as_type = pp.Word(pp.alphas, pp.alphanums + "_$") render_as_type.setParseAction(lambda x: self._set_render_type(value=x[0])) container_type = pp.Optional(pp.Word(pp.alphas, pp.alphanums + "_$") + pp.Suppress(":"), default=None) container_type.setParseAction(lambda x: self._set_container_type(value=x[0])) parser = param_expr + pp.Suppress("->") + container_type + render_as_type try: parser.parseString(self.parameter_str) except pp.ParseException, e: raise ParameterRenderError("Error parsing parameter string: \n %s" % e) python_value = expr_evaluator.output_value() return python_value
def _struct_definition_possibly_with_fields(self): """Detect a struct/enum/union definition. e.g. struct foobar { int v[100]; } __attribute__((packed)) """ return ( (_STRUCT | _UNION)("type") + pyparsing.Optional(self._identifier())("type_name") + _OPEN_CURLY + pyparsing.ZeroOrMore( self.element )("fields") + _CLOSE_CURLY + self._maybe_attributes()("attributes") ).setParseAction(self._process_struct_definition)
def _create_simple_statements(): global binary, ident, rvalue, simple_statement, semi, comp, number, slot_id, callrpc_stmt, generic_statement, streamer_stmt, stream, selector if simple_statement is not None: return meta_stmt = Group(Literal('meta').suppress() + ident + Literal('=').suppress() + rvalue + semi).setResultsName('meta_statement') require_stmt = Group(Literal('require').suppress() + ident + comp + rvalue + semi).setResultsName('require_statement') set_stmt = Group(Literal('set').suppress() - (ident | number) - Literal("to").suppress() - (rvalue | binary) - Optional(Literal('as').suppress() + config_type) + semi).setResultsName('set_statement') callrpc_stmt = Group(Literal("call").suppress() + (ident | number) + Literal("on").suppress() + slot_id + Optional(Literal("=>").suppress() + stream('explicit_stream')) + semi).setResultsName('call_statement') streamer_stmt = Group(Optional(Literal("manual")('manual')) + Optional(oneOf(u'encrypted signed')('security')) + Optional(Literal(u'realtime')('realtime')) + Literal('streamer').suppress() - Literal('on').suppress() - selector('selector') - Optional(Literal('to').suppress() - slot_id('explicit_tile')) - Optional(Literal('with').suppress() - Literal('streamer').suppress() - number('with_other')) - semi).setResultsName('streamer_statement') copy_stmt = Group(Literal("copy").suppress() - Optional(oneOf("all count average")('modifier')) - Optional(stream('explicit_input') | number('constant_input')) - Literal("=>") - stream("output") - semi).setResultsName('copy_statement') trigger_stmt = Group(Literal("trigger") - Literal("streamer") - number('index') - semi).setResultsName('trigger_statement') simple_statement = meta_stmt | require_stmt | set_stmt | callrpc_stmt | streamer_stmt | trigger_stmt | copy_stmt # In generic statements, keep track of the location where the match started for error handling locator = Empty().setParseAction(lambda s, l, t: l)('location') generic_statement = Group(locator + Group(ZeroOrMore(Regex(u"[^{};]+")) + Literal(u';'))('match')).setResultsName('unparsed_statement')
def _create_block_bnf(): global block_bnf, time_interval, slot_id, statement, block_id, ident, stream if block_bnf is not None: return trigger_clause = Group(stream_trigger | Group(stream).setResultsName('stream_always') | Group(ident).setResultsName('identifier')) every_block_id = Group(Literal(u'every').suppress() - time_interval).setResultsName('every_block') when_block_id = Group(Literal(u'when').suppress() + Literal("connected").suppress() - Literal("to").suppress() - slot_id).setResultsName('when_block') latch_block_id = Group(Literal(u'when').suppress() - stream_trigger).setResultsName('latch_block') config_block_id = Group(Literal(u'config').suppress() - slot_id).setResultsName('config_block') on_block_id = Group(Literal(u'on').suppress() - trigger_clause.setResultsName('triggerA') - Optional((Literal("and") | Literal("or")) - trigger_clause.setResultsName('triggerB'))).setResultsName('on_block') block_id = every_block_id | when_block_id | latch_block_id | config_block_id | on_block_id block_bnf = Forward() statement = generic_statement | block_bnf block_bnf << Group(block_id + Group(Literal(u'{').suppress() + ZeroOrMore(statement) + Literal(u'}').suppress())).setResultsName('block')
def build_legacy_fusion(identifier, reference): break_start = (ppc.integer | '?').setParseAction(fusion_break_handler_wrapper(reference, start=True)) break_end = (ppc.integer | '?').setParseAction(fusion_break_handler_wrapper(reference, start=False)) res = identifier(PARTNER_5P) + WCW + fusion_tags + nest(identifier(PARTNER_3P) + Optional( WCW + Group(break_start)(RANGE_5P) + WCW + Group(break_end)(RANGE_3P))) res.setParseAction(fusion_legacy_handler) return res
def __init__(self): self.fragment_range = (ppc.integer | '?')(FRAGMENT_START) + '_' + (ppc.integer | '?' | '*')(FRAGMENT_STOP) self.missing_fragment = Keyword('?')(FRAGMENT_MISSING) self.language = fragment_tag + nest( (self.fragment_range | self.missing_fragment(FRAGMENT_MISSING)) + Optional( WCW + word(FRAGMENT_DESCRIPTION))) super(FragmentParser, self).__init__(self.language)
def parse_filter_str(self, filter_str): """ method to parse filter string """ prop = pp.WordStart(pp.alphas) + pp.Word(pp.alphanums + "_").setResultsName("prop") value = (pp.QuotedString("'") | pp.QuotedString('"') | pp.Word( pp.printables, excludeChars=",")).setResultsName("value") types_ = pp.oneOf("re eq ne gt ge lt le").setResultsName("types") flags = pp.oneOf("C I").setResultsName("flags") comma = pp.Literal(',') quote = (pp.Literal("'") | pp.Literal('"')).setResultsName("quote") type_exp = pp.Group(pp.Literal("type") + pp.Literal( "=") + quote + types_ + quote).setResultsName("type_exp") flag_exp = pp.Group(pp.Literal("flag") + pp.Literal( "=") + quote + flags + quote).setResultsName("flag_exp") semi_expression = pp.Forward() semi_expression << pp.Group(pp.Literal("(") + prop + comma + value + pp.Optional(comma + type_exp) + pp.Optional(comma + flag_exp) + pp.Literal(")") ).setParseAction( self.parse_filter_obj).setResultsName("semi_expression") expr = pp.Forward() expr << pp.operatorPrecedence(semi_expression, [ ("not", 1, pp.opAssoc.RIGHT, self.not_operator), ("and", 2, pp.opAssoc.LEFT, self.and_operator), ("or", 2, pp.opAssoc.LEFT, self.or_operator) ]) result = expr.parseString(filter_str) return result
def parseformat(classname=None, formatstring=None): attribmarker = (p.Literal('@')|p.Literal('!')).suppress() cellseparator = '||' concatmarker = p.Optional(p.Literal('+')) attribgroup = attribmarker + concatmarker + p.Word(p.alphanums) cells = [] _splitstring = [cell.strip() for cell in formatstring.split(cellseparator)] for cell in _splitstring: _scan = attribgroup.scanString(cell) _templist = [] prestart = 0 end = 0 for match in _scan: start = match[1] end = match[2] _start = cell[prestart:start] if len(_start) > 0: # conditional logic avoids empty leading output cells _templist.append(om.Filler(_start)) _templist.append(om.AttributeMatch(cell[start + 1:end])) #, classname=classname)) prestart = end # print('templist:', _templist) _end = cell[end:] if len(_end) > 0: # conditional logic avoids empty trailing output cells _templist.append(om.Filler(cell[end:])) cells.append(_templist) return cells # --- static ---
def _build_input_source_parser(legalChars, commentInProgress): """Builds a PyParsing parser for alternate user input sources (from file, pipe, etc.)""" input_mark = pyparsing.Literal('<') input_mark.setParseAction(lambda x: '') file_name = pyparsing.Word(legalChars + '/\\') input_from = file_name('inputFrom') input_from.setParseAction(replace_with_file_contents) # a not-entirely-satisfactory way of distinguishing < as in "import from" from < # as in "lesser than" inputParser = input_mark + pyparsing.Optional(input_from) + pyparsing.Optional('>') + \ pyparsing.Optional(file_name) + (pyparsing.stringEnd | '|') inputParser.ignore(commentInProgress) return inputParser
def __init__(self, ffilter, queue_out): FuzzQueue.__init__(self, queue_out) Thread.__init__(self) self.setName('filter_thread') self.queue_out = queue_out if PYPARSING: element = oneOf("c l w h") digits = "XB0123456789" integer = Word( digits )#.setParseAction( self.__convertIntegers ) elementRef = Group(element + oneOf("= != < > >= <=") + integer) operator = oneOf("and or") definition = elementRef + ZeroOrMore( operator + elementRef) nestedformula = Group(Suppress(Optional(Literal("("))) + definition + Suppress(Optional(Literal(")")))) self.finalformula = nestedformula + ZeroOrMore( operator + nestedformula) elementRef.setParseAction(self.__compute_element) nestedformula.setParseAction(self.__compute_formula) self.finalformula.setParseAction(self.__myreduce) self.res = None self.hideparams = ffilter if "XXX" in self.hideparams['codes']: self.hideparams['codes'].append("0") self.baseline = None
def __init__(self): if PYPARSING: category = Word( alphas + "_-*", alphanums + "_-*" ) operator = oneOf("and or ,") neg_operator = "not" elementRef = category definition = elementRef + ZeroOrMore( operator + elementRef) nestedformula = Group(Suppress(Optional(Literal("("))) + definition + Suppress(Optional(Literal(")")))) neg_nestedformula = Optional(neg_operator) + nestedformula self.finalformula = neg_nestedformula + ZeroOrMore( operator + neg_nestedformula) elementRef.setParseAction(self.__compute_element) neg_nestedformula.setParseAction(self.__compute_neg_formula) nestedformula.setParseAction(self.__compute_formula) self.finalformula.setParseAction(self.__myreduce)
def __init__(self, get_obfuscated): """BNF grammar for Kivy source statements. Parameters ---------- get_obfuscated : function Function to return the obfuscated name for an identifier. """ super(ObfuscateKivyBNF, self).__init__(get_obfuscated) self.kivy_import = ( ZeroOrMore( (self.directive | Literal('import') | self.ident | self.separator.suppress() | self.fnumber.suppress() | self.string.suppress() ) + Optional(self.comment).suppress() )) self.kivy_import.setParseAction(self.add_kivy_import) ############### # Parse actions ###############
def __init__(self): real_word_dashes = Word(pyparsing.alphas + '-') punctuation = Word('.!?:,;-') punctuation_no_dash = Word('.!?:,;') punctuation_reference_letter = Word('.:,;-') printable = Word(pyparsing.printables, exact=1) letter = Word(pyparsing.alphas, exact=1) letter_reference = punctuation_reference_letter + letter nums = Word(pyparsing.nums) + Optional(letter) + \ ZeroOrMore(letter_reference) word_end = pyparsing.ZeroOrMore(Word(')') | Word('}') | Word(']')) + \ WordEnd() self.single_number = ( WordStart() + real_word_dashes + nums + word_end ) self.single_number_parens = ( printable + letter + Optional(punctuation_no_dash) + pyparsing.OneOrMore( Word('([{', exact=1) + pyparsing.OneOrMore(nums | Word('-')) + Word(')]}', exact=1) ) + word_end ) self.number_then_punctuation = ( printable + letter + nums + punctuation + pyparsing.ZeroOrMore(nums | punctuation) + word_end ) self.punctuation_then_number = ( printable + letter + punctuation_no_dash + nums + pyparsing.ZeroOrMore(punctuation | nums) + word_end )
def main(): word = Word(alphanums) command = Group(OneOrMore(word)) token = Suppress("->") device = Group(OneOrMore(word)) argument = Group(OneOrMore(word)) event = command + token + device + Optional(token + argument) gate = Gate() garage = Garage() airco = Aircondition() heating = Heating() boiler = Boiler() fridge = Fridge() tests = ('open -> gate', 'close -> garage', 'turn on -> aircondition', 'turn off -> heating', 'increase -> boiler temperature -> 5 degrees', 'decrease -> fridge temperature -> 2 degrees') open_actions = {'gate':gate.open, 'garage':garage.open, 'aircondition':airco.turn_on, 'heating':heating.turn_on, 'boiler temperature':boiler.increase_temperature, 'fridge temperature':fridge.increase_temperature} close_actions = {'gate':gate.close, 'garage':garage.close, 'aircondition':airco.turn_off, 'heating':heating.turn_off, 'boiler temperature':boiler.decrease_temperature, 'fridge temperature':fridge.decrease_temperature} for t in tests: if len(event.parseString(t)) == 2: # no argument cmd, dev = event.parseString(t) cmd_str, dev_str = ' '.join(cmd), ' '.join(dev) if 'open' in cmd_str or 'turn on' in cmd_str: open_actions[dev_str]() elif 'close' in cmd_str or 'turn off' in cmd_str: close_actions[dev_str]() elif len(event.parseString(t)) == 3: # argument cmd, dev, arg = event.parseString(t) cmd_str, dev_str, arg_str = ' '.join(cmd), ' '.join(dev), ' '.join(arg) num_arg = 0 try: num_arg = int(arg_str.split()[0]) # extract the numeric part except ValueError as err: print("expected number but got: '{}'".format(arg_str[0])) if 'increase' in cmd_str and num_arg > 0: open_actions[dev_str](num_arg) elif 'decrease' in cmd_str and num_arg > 0: close_actions[dev_str](num_arg)
def build_parser(self): parsed_term = pyparsing.Group(pyparsing.Combine(pyparsing.Word(pyparsing.alphanums) + \ pyparsing.Suppress('*'))).setResultsName('wildcard') | \ pyparsing.Group(pyparsing.Combine(pyparsing.Word(pyparsing.alphanums+"._") + \ pyparsing.Word(':') + pyparsing.Group(pyparsing.Optional("\"") + \ pyparsing.Optional("<") + pyparsing.Optional(">") + pyparsing.Optional("=") + \ pyparsing.Optional("-") + pyparsing.Word(pyparsing.alphanums+"._/") + \ pyparsing.Optional("&") + pyparsing.Optional("<") + pyparsing.Optional(">") + \ pyparsing.Optional("=") + pyparsing.Optional("-") + \ pyparsing.Optional(pyparsing.Word(pyparsing.alphanums+"._/")) + \ pyparsing.Optional("\"")))).setResultsName('fields') | \ pyparsing.Group(pyparsing.Combine(pyparsing.Suppress('-')+ \ pyparsing.Word(pyparsing.alphanums+"."))).setResultsName('not_term') | \ pyparsing.Group(pyparsing.Word(pyparsing.alphanums)).setResultsName('term') parsed_or = pyparsing.Forward() parsed_quote_block = pyparsing.Forward() parsed_quote_block << ((parsed_term + parsed_quote_block) | parsed_term ) parsed_quote = pyparsing.Group(pyparsing.Suppress('"') + parsed_quote_block + \ pyparsing.Suppress('"')).setResultsName("quotes") | parsed_term parsed_parenthesis = pyparsing.Group((pyparsing.Suppress("(") + parsed_or + \ pyparsing.Suppress(")"))).setResultsName("parenthesis") | parsed_quote parsed_and = pyparsing.Forward() parsed_and << (pyparsing.Group(parsed_parenthesis + pyparsing.Suppress(pyparsing.Keyword("and")) + \ parsed_and).setResultsName("and") | \ pyparsing.Group(parsed_parenthesis + pyparsing.OneOrMore(~pyparsing.oneOf("or and") + \ parsed_and)).setResultsName("and") | parsed_parenthesis) parsed_or << (pyparsing.Group(parsed_and + pyparsing.Suppress(pyparsing.Keyword("or")) + \ parsed_or).setResultsName("or") | parsed_and) return parsed_or.parseString
def ResourceSchema(schema): base_schema = { voluptuous.Optional('started_at'): utils.to_datetime, voluptuous.Optional('ended_at'): utils.to_datetime, voluptuous.Optional('user_id'): voluptuous.Any(None, six.text_type), voluptuous.Optional('project_id'): voluptuous.Any(None, six.text_type), voluptuous.Optional('metrics'): MetricsSchema, } base_schema.update(schema) return base_schema
def parser(self): query_key = pp.Keyword("QUERY") query_value = pp.Suppress("|") + pp.SkipTo(pp.Suppress(";"), include=True) fields_key = pp.Keyword("FIELDS") field_name = common_parsers.column field_name_list = pp.Group(pp.delimitedList(field_name, delim=",")).setParseAction(lambda x: x.asList()) fields_block = (pp.Suppress(fields_key) + field_name_list) connector_name = pp.Word(pp.alphas, pp.alphanums + "_$") using_block = pp.Suppress("USING") + connector_name then_key = pp.Suppress("THEN") manipulation_set = pp.Suppress("|") + pp.SkipTo(pp.Suppress(";"), include=True) then_block = then_key + manipulation_set as_key = pp.Suppress("AS") node_name = pp.Word(pp.alphas, pp.alphanums + "_$") as_block = as_key + node_name query_node_block = (pp.Suppress(query_key) + query_value + pp.Optional(fields_block, default=None) + using_block + pp.Optional(then_block, default=None) + as_block) query_node_block.setParseAction(lambda x: self._add_query_node(query_value=x[0], connector_name=x[2], node_name=x[4], fields=x[1], manipulation_set=x[3])) single_query_node = query_node_block + pp.Optional(pp.Suppress("---")) retrieve_block = pp.OneOrMore(single_query_node) return retrieve_block
def parser(self): # Define punctuation as suppressed literals. lparen, rparen, lbrack, rbrack, lbrace, rbrace, colon = \ map(pp.Suppress, "()[]{}:") integer = pp.Combine(pp.Optional(pp.oneOf("+ -")) + pp.Word(pp.nums)) \ .setName("integer") \ .setParseAction(lambda toks: int(toks[0])) real = pp.Combine(pp.Optional(pp.oneOf("+ -")) + pp.Word(pp.nums) + "." + pp.Optional(pp.Word(pp.nums)) + pp.Optional(pp.oneOf("e E") + pp.Optional(pp.oneOf("+ -")) + pp.Word(pp.nums))) \ .setName("real") \ .setParseAction(lambda toks: float(toks[0])) _datetime_arg = (integer | real) datetime_args = pp.Group(pp.delimitedList(_datetime_arg)) _datetime = pp.Suppress(pp.Literal('datetime') + pp.Literal("(")) + datetime_args + pp.Suppress(")") _datetime.setParseAction(lambda x: self._make_datetime(x[0])) tuple_str = pp.Forward() list_str = pp.Forward() dict_str = pp.Forward() list_item = real | integer | _datetime | pp.quotedString.setParseAction(pp.removeQuotes) | \ pp.Group(list_str) | tuple_str | dict_str tuple_str << (pp.Suppress("(") + pp.Optional(pp.delimitedList(list_item)) + pp.Optional(pp.Suppress(",")) + pp.Suppress(")")) tuple_str.setParseAction(lambda toks : tuple(toks.asList())) list_str << (lbrack + pp.Optional(pp.delimitedList(list_item) + pp.Optional(pp.Suppress(","))) + rbrack) dict_entry = pp.Group(list_item + colon + list_item) dict_str << (lbrace + pp.Optional(pp.delimitedList(dict_entry) + pp.Optional(pp.Suppress(","))) + rbrace) dict_str.setParseAction(lambda toks: dict(toks.asList())) return list_item
def number_parser(): point = pp.Literal(".") e = pp.CaselessLiteral("e") plusorminus = pp.Literal("+") ^ pp.Literal("-") num = pp.Word(pp.nums) dec = pp.Combine(num + pp.Optional(point + pp.Optional(num)) + pp.Optional(e + pp.Optional(plusorminus) + num)) ^\ pp.Combine(point + pp.Optional(num) + pp.Optional(e + pp.Optional(plusorminus) + num)) bin = pp.Combine(pp.Literal("0") + pp.CaselessLiteral("b") + pp.Word("01")) hex = pp.Combine(pp.Literal("0") + pp.CaselessLiteral("x") + pp.Word(pp.hexnums)) oct = pp.Combine(pp.Literal("0") + pp.Optional(pp.CaselessLiteral("o")) + pp.Word("01234567")) return dec ^ bin ^ hex ^ oct
def _arguments(self): return pyparsing.Group( pyparsing.Optional( pyparsing.delimitedList(self.expression())))
def _arguments(self): return pyparsing.Group( pyparsing.Optional(pyparsing.delimitedList(self._argument())) )
def _enum_definition(self): """Detect an enum definition. e.g. enum foo { OPTION_1: 1 + 2, OPTION_2 } """ return ( _ENUM + pyparsing.Optional(self._identifier())("enum_name") + _OPEN_CURLY + pyparsing.ZeroOrMore( pyparsing.Group( self._identifier()("name") + pyparsing.Optional( _EQUALS # This allows us to get even invalid expressions. + pyparsing.SkipTo(pyparsing.Word(",}"))("expression") ) + pyparsing.Optional(_COMMA) ) )("fields") + _CLOSE_CURLY + self._maybe_attributes()("attributes") ).setParseAction(self._process_enum_definition)
def _numeric_type_identifier(self): with_sign_identifier = ( self._number_sign_identifier() + pyparsing.Optional(self._number_size_identifier()) ) with_size_identifier = ( pyparsing.Optional(self._number_sign_identifier()) + self._number_size_identifier() ) return with_sign_identifier | with_size_identifier
def _number_size_identifier(self): may_have_int_suffix = _LONG_LONG | _SHORT | _LONG return _INT | _CHAR | (may_have_int_suffix + pyparsing.Optional(_INT))
def _struct_typedef(): return ( _TYPEDEF + (_STRUCT.setResultsName("type") | _UNION.setResultsName("type")) + pyparsing.Optional(_IDENTIFIER).setResultsName("id") + parsers.anything_in_curly() + pyparsing.Optional(_STAR) + _IDENTIFIER.setResultsName("typedef_name") + pyparsing.SkipTo(_SEMICOLON) + _SEMICOLON ).setResultsName("_struct_typedef")
def _enum(): return ( _ENUM + pyparsing.Optional(_IDENTIFIER).setResultsName("id") + parsers.anything_in_curly() + _SEMICOLON ).setResultsName("_enum")
def _define_function_like(self): return ( (_IDENTIFIER.setResultsName("name") + _OPEN_PARENTHESES).leaveWhitespace() + pyparsing.Optional( pyparsing.delimitedList( _IDENTIFIER | pyparsing.Literal("...") # vararg macro. )).setResultsName("arguments") + _CLOSE_PARENTHESES + pyparsing.restOfLine.setResultsName("replacement") ).setParseAction(self._add_function_like)
def parse_line(attribute, string): Grammar = Suppress(Keyword('mpc.{}'.format(attribute)) + Keyword('=')) + String('data') + Suppress(Literal(';') + Optional(Comments)) result, i, j = Grammar.scanString(string).next() return [int_else_float_except_string(s) for s in result['data'].asList()]
def compute(self): def getname(obj, name): _val = None if hasattr(obj, name): _val = getattr(obj, name, None) if _val is None: return _val try: if _val.isdynamic: #TODO make this work for non-attributes, non-dynamics (use .issingleton? - what about a concat mode?) raise ValueError('Combine plugin cannot process %s because it contains a dynamic class' % name) except AttributeError: raise TypeError('Expected an attribute but got a %s' % type(_val)) if _val.issingleton(): _ret = '%s' % _val[0].raw() else: _ret = ', '.join(['%s' % v.raw() for v in _val]) return _ret attrmarker = (p.Literal('@') | p.Literal('!')) attrmatch = attrmarker.suppress() + p.Word(p.alphanums) for i in attrmatch.scanString(self.config): x = i[0][0] self.__attribs__[x] = getname(self.targetobject, x) if all(v is not None for v in self.__attribs__.values()): self.computable = True if self.computable: attrmatch = p.Literal('@').suppress() + p.Word(p.alphanums) attrmatch.setParseAction(self.substitute) attrlist = p.ZeroOrMore(p.Optional(p.White()) + attrmatch + p.Optional(p.White())) self.__result__ = attrlist.transformString(self.config)
def parseTerms(): """ expop :: '^' multop :: '*' | '/' addop :: '+' | '-' integer :: ['+' | '-'] '0'..'9'+ atom :: PI | E | real | fn '(' expr ')' | '(' expr ')' factor :: atom [ expop factor ]* term :: factor [ multop factor ]* expr :: term [ addop term ]* """ global terms if not terms: point = Literal( "." ) e = CaselessLiteral( "E" ) fnumber = Combine( Word( "+-"+nums, nums ) + Optional( point + Optional( Word( nums ) ) ) + Optional( e + Word( "+-"+nums, nums ) ) ) ident = Word(alphas, alphas+nums+"_$") plus = Literal( "+" ) minus = Literal( "-" ) mult = Literal( "*" ) div = Literal( "/" ) lpar = Literal( "(" ).suppress() rpar = Literal( ")" ).suppress() addop = plus | minus multop = mult | div expop = Literal( "^" ) pi = CaselessLiteral( "PI" ) expr = Forward() atom = (Optional("-") + ( pi | e | fnumber | ident + lpar + expr + rpar ).setParseAction( pushFirst ) | ( lpar + expr.suppress() + rpar )).setParseAction(pushUMinus) # by defining exponentiation as "atom [ ^ factor ]..." instead of "atom [ ^ atom ]...", we get right-to-left exponents, instead of left-to-righ # that is, 2^3^2 = 2^(3^2), not (2^3)^2. factor = Forward() factor << atom + ZeroOrMore( ( expop + factor ).setParseAction( pushFirst ) ) term = factor + ZeroOrMore( ( multop + factor ).setParseAction( pushFirst ) ) expr << term + ZeroOrMore( ( addop + term ).setParseAction( pushFirst ) ) terms = expr return terms
def main(): word = Word(alphanums) command = Group(OneOrMore(word)) token = Suppress("->") device = Group(OneOrMore(word)) argument = Group(OneOrMore(word)) event = command + token + device + Optional(token + argument) gate = Gate() garage = Garage() airco = Aircondition() heating = Heating() boiler = Boiler() fridge = Fridge() tests = ('open -> gate', 'close -> garage', 'turn on -> aircondition', 'turn off -> heating', 'increase -> boiler temperature -> 20 degrees', 'decrease -> fridge temperature -> 6 degree') open_actions = {'gate':gate.open, 'garage':garage.open, 'aircondition':airco.turn_on, 'heating':heating.turn_on, 'boiler temperature':boiler.increase_temperature, 'fridge temperature':fridge.increase_temperature} close_actions = {'gate':gate.close, 'garage':garage.close, 'aircondition':airco.turn_off, 'heating':heating.turn_off, 'boiler temperature':boiler.decrease_temperature, 'fridge temperature':fridge.decrease_temperature} for t in tests: if len(event.parseString(t)) == 2: # no argument cmd, dev = event.parseString(t) cmd_str, dev_str = ' '.join(cmd), ' '.join(dev) if 'open' in cmd_str or 'turn on' in cmd_str: open_actions[dev_str]() elif 'close' in cmd_str or 'turn off' in cmd_str: close_actions[dev_str]() elif len(event.parseString(t)) == 3: # argument cmd, dev, arg = event.parseString(t) cmd_str, dev_str, arg_str = ' '.join(cmd), ' '.join(dev), ' '.join(arg) num_arg = 0 try: num_arg = int(arg_str.split()[0]) # extract the numeric part except ValueError as err: print("expected number but got: '{}'".format(arg_str[0])) if 'increase' in cmd_str and num_arg > 0: open_actions[dev_str](num_arg) elif 'decrease' in cmd_str and num_arg > 0: close_actions[dev_str](num_arg)
def MetricSchema(definition): creator = pecan.request.auth_helper.get_current_user( pecan.request) # First basic validation schema = voluptuous.Schema({ "archive_policy_name": six.text_type, "resource_id": functools.partial(ResourceID, creator=creator), "name": six.text_type, voluptuous.Optional("unit"): voluptuous.All(six.text_type, voluptuous.Length(max=31)), }) definition = schema(definition) archive_policy_name = definition.get('archive_policy_name') name = definition.get('name') if name and '/' in name: abort(400, "'/' is not supported in metric name") if archive_policy_name is None: try: ap = pecan.request.indexer.get_archive_policy_for_metric(name) except indexer.NoArchivePolicyRuleMatch: # NOTE(jd) Since this is a schema-like function, we # should/could raise ValueError, but if we do so, voluptuous # just returns a "invalid value" with no useful message – so we # prefer to use abort() to make sure the user has the right # error message abort(400, "No archive policy name specified " "and no archive policy rule found matching " "the metric name %s" % name) else: definition['archive_policy_name'] = ap.name resource_id = definition.get('resource_id') if resource_id is None: original_resource_id = None else: if name is None: abort(400, {"cause": "Attribute value error", "detail": "name", "reason": "Name cannot be null " "if resource_id is not null"}) original_resource_id, resource_id = resource_id enforce("create metric", { "creator": creator, "archive_policy_name": archive_policy_name, "resource_id": resource_id, "original_resource_id": original_resource_id, "name": name, "unit": definition.get('unit'), }) return definition
def main(): word = Word(alphanums) command = Group(OneOrMore(word)) token = Suppress("->") device = Group(OneOrMore(word)) argument = Group(OneOrMore(word)) event = command + token + device + Optional(token + argument) gate = Gate() garage = Garage() airco = Aircondition() heating = Heating() boiler = Boiler() fridge = Fridge() tests = ('open -> gate', 'close -> garage', 'turn on -> aircondition', 'turn off -> heating', 'increase -> boiler temperature -> 5 degrees', 'decrease -> fridge temperature -> 2 degrees') open_actions = {'gate': gate.open, 'garage': garage.open, 'aircondition': airco.turn_on, 'heating': heating.turn_on, 'boiler temperature': boiler.increase_temperature, 'fridge temperature': fridge.increase_temperature} close_actions = {'gate': gate.close, 'garage': garage.close, 'aircondition': airco.turn_off, 'heating': heating.turn_off, 'boiler temperature': boiler.decrease_temperature, 'fridge temperature': fridge.decrease_temperature} for t in tests: if len(event.parseString(t)) == 2: # ???? cmd, dev = event.parseString(t) cmd_str, dev_str = ' '.join(cmd), ' '.join(dev) if 'open' in cmd_str or 'turn on' in cmd_str: open_actions[dev_str]() elif 'close' in cmd_str or 'turn off' in cmd_str: close_actions[dev_str]() elif len(event.parseString(t)) == 3: # ??? cmd, dev, arg = event.parseString(t) cmd_str, dev_str, arg_str = ' '.join(cmd), ' '.join(dev), ' '.join(arg) num_arg = 0 try: num_arg = int(arg_str.split()[0]) # ?????? except ValueError as err: print("expected number but got: '{}'".format(arg_str[0])) if 'increase' in cmd_str and num_arg > 0: open_actions[dev_str](num_arg) elif 'decrease' in cmd_str and num_arg > 0: close_actions[dev_str](num_arg)
def _type_instance(self): """A type declaration. The modifiers of a typedef: struct s *P[]; ^^^^<- The type instance. """ type_instance = ( # Function pointer (*f)(int foobar) pyparsing.ZeroOrMore(_STAR) + _OPEN_PARENTHESIS + pyparsing.Optional(_STAR("function_pointer")) + self._identifier()("type_instance_name") + _CLOSE_PARENTHESIS + parsers.anything_in_parentheses()("function_args") ) | ( # Function object f(foo bar *) pyparsing.ZeroOrMore(_STAR) + self._identifier()("type_instance_name") + parsers.anything_in_parentheses()("function_args") ) | ( # Simple form: *foo[10]; pyparsing.ZeroOrMore(_STAR)("type_pointer") + self._identifier()("type_instance_name") # Possibly array: [] , [][] + pyparsing.ZeroOrMore( _OPEN_BRACKET + pyparsing.SkipTo(_CLOSE_BRACKET)( "brackets_with_expression_inside*") + _CLOSE_BRACKET) # Bitfields: int x: 7; + pyparsing.Optional( _COLON + pyparsing.SkipTo( _SEMICOLON | _COMMA)("bitfield") ) ) return pyparsing.Group( type_instance + self._maybe_attributes() )
def _create_primitives(): global binary, ident, rvalue, number, quoted_string, semi, time_interval, slot_id, comp, config_type, stream, comment, stream_trigger, selector if ident is not None: return semi = Literal(u';').suppress() ident = Word(alphas+u"_", alphas + nums + u"_") number = Regex(u'((0x[a-fA-F0-9]+)|[+-]?[0-9]+)').setParseAction(lambda s, l, t: [int(t[0], 0)]) binary = Regex(u'hex:([a-fA-F0-9][a-fA-F0-9])+').setParseAction(lambda s, l, t: [unhexlify(t[0][4:])]) quoted_string = dblQuotedString comment = Literal('#') + restOfLine rvalue = number | quoted_string # Convert all time intervals into an integer number of seconds time_unit_multipliers = { u'second': 1, u'seconds': 1, u'minute': 60, u'minutes': 60, u'hour': 60*60, u'hours': 60*60, u'day': 60*60*24, u'days': 60*60*24, u'month': 60*60*24*30, u'months': 60*60*24*30, u'year': 60*60*24*365, u'years': 60*60*24*365, } config_type = oneOf('uint8_t uint16_t uint32_t int8_t int16_t int32_t uint8_t[] uint16_t[] uint32_t[] int8_t[] int16_t[] int32_t[] string binary') comp = oneOf('> < >= <= == ~=') time_unit = oneOf(u"second seconds minute minutes hour hours day days week weeks month months year years") time_interval = (number + time_unit).setParseAction(lambda s, l, t: [t[0]*time_unit_multipliers[t[1]]]) slot_id = Literal(u"controller") | (Literal(u'slot') + number) slot_id.setParseAction(lambda s,l,t: [SlotIdentifier.FromString(u' '.join([str(x) for x in t]))]) stream_modifier = Literal("system") | Literal("user") | Literal("combined") stream = Optional(Literal("system")) + oneOf("buffered unbuffered input output counter constant") + number + Optional(Literal("node")) stream.setParseAction(lambda s,l,t: [DataStream.FromString(u' '.join([str(x) for x in t]))]) all_selector = Optional(Literal("all")) + Optional(stream_modifier) + oneOf("buffered unbuffered inputs outputs counters constants") + Optional(Literal("nodes")) all_selector.setParseAction(lambda s,l,t: [DataStreamSelector.FromString(u' '.join([str(x) for x in t]))]) one_selector = Optional(Literal("system")) + oneOf("buffered unbuffered input output counter constant") + number + Optional(Literal("node")) one_selector.setParseAction(lambda s,l,t: [DataStreamSelector.FromString(u' '.join([str(x) for x in t]))]) selector = one_selector | all_selector trigger_comp = oneOf('> < >= <= ==') stream_trigger = Group((Literal(u'count') | Literal(u'value')) + Literal(u'(').suppress() - stream - Literal(u')').suppress() - trigger_comp - number).setResultsName('stream_trigger')
def grammar(): """Define the query grammar. Backus-Naur form (BNF) of the grammar:: <grammar> ::= <item> | <item> <and_or> <grammar> <item> ::= [<neg>] <query-token> | [<neg>] "(" <grammar> ")" <query-token> ::= <token> | <hosts> <token> ::= <category>:<key> [<operator> <value>] Given that the pyparsing library defines the grammar in a BNF-like style, for the details of the tokens not specified above check directly the source code. Returns: pyparsing.ParserElement: the grammar parser. """ # Boolean operators and_or = (pp.CaselessKeyword('and') | pp.CaselessKeyword('or'))('bool') # 'neg' is used as label to allow the use of dot notation, 'not' is a reserved word in Python neg = pp.CaselessKeyword('not')('neg') operator = pp.oneOf(OPERATORS, caseless=True)('operator') # Comparison operators quoted_string = pp.quotedString.copy().addParseAction(pp.removeQuotes) # Both single and double quotes are allowed # Parentheses lpar = pp.Literal('(')('open_subgroup') rpar = pp.Literal(')')('close_subgroup') # Hosts selection: glob (*) and clustershell (,!&^[]) syntaxes are allowed: # i.e. host10[10-42].*.domain hosts = quoted_string | (~(and_or | neg) + pp.Word(pp.alphanums + '-_.*,!&^[]')) # Key-value token for allowed categories using the available comparison operators # i.e. F:key = value category = pp.oneOf(CATEGORIES, caseless=True)('category') key = pp.Word(pp.alphanums + '-_.%@:')('key') selector = pp.Combine(category + ':' + key) # i.e. F:key # All printables characters except the parentheses that are part of this or the global grammar all_but_par = ''.join([c for c in pp.printables if c not in ('(', ')', '{', '}')]) value = (quoted_string | pp.Word(all_but_par))('value') token = selector + pp.Optional(operator + value) # Final grammar, see the docstring for its BNF based on the tokens defined above # Groups are used to split the parsed results for an easy access full_grammar = pp.Forward() item = pp.Group(pp.Optional(neg) + (token | hosts('hosts'))) | pp.Group( pp.Optional(neg) + lpar + full_grammar + rpar) full_grammar << item + pp.ZeroOrMore(pp.Group(and_or) + full_grammar) # pylint: disable=expression-not-assigned return full_grammar
def parse_table(attribute, string): Line = OneOrMore(Float)('data') + Literal(';') + Optional(Comments, default='')('name') Grammar = Suppress(Keyword('mpc.{}'.format(attribute)) + Keyword('=') + Keyword('[') + Optional(Comments)) + OneOrMore(Group(Line)) + Suppress(Keyword(']') + Optional(Comments)) result, i, j = Grammar.scanString(string).next() _list = list() for r in result: _list.append([int_else_float_except_string(s) for s in r['data'].asList()]) return _list
def __init__(self, comm_file_path): expression_spaced = Forward() expression = Forward() args_spaced = Forward() cb = Optional(',') + ')' # closing_brackets might include a ',' ob = Optional(' ') + '(' + Optional(' ') # closing_brackets might include a ' ' value = (Or([pyparsing_common.identifier.copy().setResultsName('id'), pyparsing_common.number.copy().setResultsName('number'), QuotedString("'").setResultsName('string')])).setParseAction(Value).setResultsName('value') values = (ZeroOrMore(value.setResultsName('valueList', listAllMatches=True) + Optional(','))).setParseAction( Values) keyword = pyparsing_common.identifier.copy() keyword_argument = ( keyword.setResultsName('keyword') + '=' + expression_spaced.setResultsName('expression') ).setParseAction(Keyword_argument) keyword_arguments = ( keyword_argument.setResultsName('keyword_argument', listAllMatches=True) + ZeroOrMore(',' + keyword_argument.setResultsName('keyword_argument', listAllMatches=True)) ).setParseAction(Keyword_arguments) expression << (Or([ value, (ob + values.setResultsName('values') + cb), '_F' + ob + keyword_arguments.setResultsName('keyword_arguments') + cb, ob + expression.setResultsName('expression') + cb ])).setParseAction(Expression) expression_spaced << (Or([expression, ob + expression_spaced + cb])) left_side = pyparsing_common.identifier.setResultsName('left_side') operator_name = pyparsing_common.identifier.setResultsName('operator_name') paragraph = (Optional(left_side + "=") + operator_name + ob + Optional(keyword_arguments .setResultsName( 'keyword_arguments')) + cb + Optional(';')).setParseAction(Paragraph) file = OneOrMore(paragraph).setResultsName('paragraphs').setParseAction(File) self.beam_data_model = file.parseFile(comm_file_path)
def __init__(self, obj, config=str()): def process(config): pathexpr = p.Literal("'").suppress() + \ p.Optional( p.Combine( p.OneOrMore(p.Literal("/") + p.Word(p.alphanums)) + p.Literal("/").suppress() ) ).setResultsName('path') + \ p.Combine( (p.Literal('@').suppress() | p.Literal('!').suppress()) + p.Word(p.alphanums) + p.Literal("'").suppress() ).setResultsName('attrib') expr = p.Group(pathexpr).setResultsName('search') match = expr.parseString(config) _ret = [] if 'search' in match: if 'path' in match['search']: _ret.append(match['search']['path']) if 'attrib' in match['search']: _ret.append(match['search']['attrib']) return _ret super(Xattrib, self).__init__(obj, config=config, defer=True) if self.config is None or len(self.config) < 1 or not isinstance(self.config, str): raise ValueError('Xattrib plugin function requires a config string') try: _result = process("'%s'" % self.config) if len(_result) == 2: self.targetobject, self.targetattribute = _result elif len(_result) == 1: _config = getattr(obj, _result[0], None) if _config is None: raise ValueError('Xattrib plugin received an attribute name that does not exist') # TODO len check only required for attributes, but not method plugins if len(_config) > 1: raise ValueError('Xattrib plugin received a attribute name that contains multiple values') self.targetobject, self.targetattribute = process("'%s'" % _config[0]) else: raise Exception() except: raise ValueError('An error occured when processing the search string for the Xattrib plugin function')