我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用lxml.etree.XMLParser()。
def produce_output(inputf, outfile):
    """Append one PRECONDITION line per generated event pair to ``outfile``.

    ``inputf`` is parsed with lxml (blank text removed).  Event mentions come
    from ``extract_event_CAT``, are grouped by ``event_sentence`` and paired
    by ``generate_event_pairs``.  Every pair becomes one tab-separated line:
    the two mentions (each joined with ``_``) followed by ``PRECONDITION``.

    :param inputf: source accepted by ``etree.parse``
    :param outfile: path of the file to append to
    """
    ecbplus = etree.parse(inputf, etree.XMLParser(remove_blank_text=True))

    event_mentions = extract_event_CAT(ecbplus)
    event_per_sentence = event_sentence(ecbplus, event_mentions)
    event_pairs = generate_event_pairs(event_per_sentence)

    lines = [
        "_".join(event_mentions[pair[0]]) + "\t"
        + "_".join(event_mentions[pair[1]]) + "\tPRECONDITION" + "\n"
        for pairs in event_pairs.values()
        for pair in pairs
    ]

    # Open the file once (and only when there is something to write, so an
    # empty result still does not create the file) instead of once per pair.
    if lines:
        with open(outfile, "a") as output:
            output.writelines(lines)
def parseXML(self):
    """Parse the annotation file at ``self.filepath`` and register its shapes.

    Sets ``self.verified`` from the root element's ``verified`` attribute and
    calls ``self.addShape(label, bndbox, difficult)`` for every ``object``
    element.

    :return: True once all objects have been processed
    :raises AssertionError: if the path does not end with ``XML_EXT``
    """
    assert self.filepath.endswith(XML_EXT), "Unsupport file format"
    parser = etree.XMLParser(encoding=ENCODE_METHOD)
    xmltree = ElementTree.parse(self.filepath, parser=parser).getroot()
    # The value is unused, but a missing <filename> element still aborts
    # parsing here exactly as it did before.
    filename = xmltree.find('filename').text

    # Always assign the flag: previously a present-but-not-'yes' attribute
    # left whatever value self.verified held before this call.
    self.verified = xmltree.attrib.get('verified') == 'yes'

    for object_iter in xmltree.findall('object'):
        bndbox = object_iter.find("bndbox")
        label = object_iter.find('name').text
        # Add chris
        difficult_elt = object_iter.find('difficult')
        difficult = False
        if difficult_elt is not None:
            difficult = bool(int(difficult_elt.text))
        self.addShape(label, bndbox, difficult)
    return True
def __init__(self, file_like):
    """Parse a GEXF document and extract its meta and graph objects.

    On an unexpected or missing element the problem is reported through
    ``self.msg_unexpected_tag`` and initialisation stops early.

    :param file_like: source accepted by ``etree.parse``
    """
    parser = etree.XMLParser(ns_clean=True)
    tree = etree.parse(file_like, parser)
    gexf_xml = tree.getroot()
    tag = self.ns_clean(gexf_xml.tag).lower()
    # "!=" instead of the Python-2-only "<>" operator.
    if tag != "gexf":
        self.msg_unexpected_tag("gexf", tag)
        return

    self.gexf_obj = None
    graph_xml = None
    for child in gexf_xml:
        tag = self.ns_clean(child.tag).lower()
        if tag == "meta":
            self.gexf_obj = self.extract_gexf_obj(child)
        if tag == "graph":
            graph_xml = child

    if self.gexf_obj is None:
        self.msg_unexpected_tag("meta", tag)
        return
    # Previously a document without a <graph> child crashed with an unbound
    # local; report it the same way as a missing <meta>.
    if graph_xml is None:
        self.msg_unexpected_tag("graph", tag)
        return
    self.graph_obj = self.extract_graph_obj(graph_xml)
def getAudioMetadata(fileRef):
    """Run MediaInfo on ``fileRef`` and return its EBUCore output.

    :param fileRef: file reference passed to the MediaInfo executable
    :return: dict with keys ``cmdStr`` (command line, for logging only),
        ``status``, ``outElt`` (parsed output element) and ``stderr``
    """
    args = [config.mediaInfoExe, "--Output=EBUCore", fileRef]
    # Command line as a single string (used for logging purposes only)
    cmdStr = " ".join(args)

    status, out, err = shared.launchSubProcess(args)

    # Strip blank text so MediaInfo's blank lines do not end up in the tree
    blank_stripper = etree.XMLParser(remove_blank_text=True)
    outElt = etree.XML(out.encode('utf-8'), parser=blank_stripper)

    return {
        "cmdStr": cmdStr,
        "status": status,
        "outElt": outElt,
        "stderr": err,
    }
def set_bing_wallpaper():
    """Fetch the feed at ``URL01`` and set the referenced image as background.

    Nothing happens unless the HTTP status is 200.  Any exception raised
    while parsing or downloading is printed rather than propagated.
    """
    r = requests.get(URL01)
    if r.status_code != 200:
        return
    try:
        lenient_parser = etree.XMLParser(recover=True)
        xml = etree.XML(r.content, lenient_parser)
        print(etree.tostring(xml))
        print('===========')
        urlBase = xml.find('image').find('urlBase')
        url = 'http://www.bing.com%s_1920x1200.jpg' % (urlBase.text)
        if download(url) is True:
            set_background(comun.POTD)
        print('===========')
    except Exception as e:
        print(e)
def readXml(self, simType):
    """Load the SIM file description that sits next to this module.

    ``sim_files_3g.xml`` is used when ``simType`` equals ``types.TYPE_USIM``,
    ``sim_files_2g.xml`` otherwise.  When the file does not exist an empty
    root element (``sim_3G`` / ``sim_2G``) is created instead.

    :return: tuple ``(path, root)``
    """
    is_usim = simType == types.TYPE_USIM
    file_name = "sim_files_3g.xml" if is_usim else "sim_files_2g.xml"
    path = os.path.join(os.path.dirname(__file__), file_name)

    if os.path.exists(path):
        parser = etree.XMLParser(remove_blank_text=True)
        root = etree.parse(path, parser).getroot()
    else:
        # Lazy %-style arguments; the emitted messages are unchanged.
        logging.warning("File %s not exists", path)
        logging.info("Create xml")
        root = etree.Element('sim_3G' if is_usim else 'sim_2G')
    return path, root
def remove_resource_value(issue, filepath, ignore_layouts_value):
    """
    Read an xml file and remove an element which is unused, then save the file back to the filesystem

    Nothing is removed when the file does not exist, or when
    ``ignore_layouts_value`` is not ``False`` and the first issue element is
    of type ``'layout'``.
    """
    # Parenthesised single-argument print works on Python 2 and 3 alike.
    print("remove_resource_value()...%s --> %s" % (issue.elements[0][0], filepath))
    if os.path.exists(filepath) and (ignore_layouts_value is False or issue.elements[0][0] != 'layout'):
        doCheck(filepath=filepath, issue=issue)
        for element in issue.elements:
            print('removing {0} from resource {1}'.format(element, filepath))
            # Keep the document as close to the original as possible.
            parser = etree.XMLParser(remove_blank_text=False, remove_comments=False,
                                     remove_pis=False, strip_cdata=False, resolve_entities=False)
            tree = etree.parse(filepath, parser)
            root = tree.getroot()
            for unused_value in root.findall('.//{0}[@name="{1}"]'.format(element[0], element[1])):
                # './/' matches at any depth, so remove from the actual
                # parent; root.remove() only accepts direct children.
                unused_value.getparent().remove(unused_value)
            with open(filepath, 'wb') as resource:
                tree.write(resource, encoding='utf-8', xml_declaration=True)
def _canonicalize(xml_string):
    '''
    Canonicalize an XML string using exclusive C14N
    (U{http://www.w3.org/2001/10/xml-exc-c14n#}), without comments.

    @type xml_string: C{str}
    @param xml_string: The XML string that needs to be canonicalized.

    @rtype: C{str}
    @return: Canonicalized string in Unicode.
    '''
    blank_stripper = etree.XMLParser(remove_blank_text=True)
    root = etree.fromstring(xml_string, parser=blank_stripper)
    buf = BytesIO()
    root.getroottree().write_c14n(buf, exclusive=True, with_comments=False)
    canonical_bytes = buf.getvalue()
    return canonical_bytes.decode(UTF_8)
def from_string(ujml_string, file_name="<ujml_input>", globals=None) -> UjmlNode:
    """
    Load ujml code from a string.

    :param string ujml_string: String containing the ujml code.
    :param string file_name: Source code file name.
    :param dict globals: Optional dictionary containing global values available in ujml local python interpreter
    :return: Ujml root node.
    :rtype: urban_journey.UjmlNode
    """
    # Every parsed element is instantiated as a UjmlElement.
    element_lookup = etree.ElementDefaultClassLookup(element=UjmlElement)
    ujml_parser = etree.XMLParser()
    ujml_parser.set_element_class_lookup(element_lookup)

    root_elem = etree.fromstring(ujml_string, ujml_parser)
    return UjmlNode(root_elem, file_name, globals or {})
def from_file(file_path, globals=None):
    """
    Load ujml code from a file.

    :param string file_path: Path to ujml file
    :param dict globals: Optional dictionary containing global values available in ujml local python interpreter
    :return: Ujml root node.
    :rtype: urban_journey.UjmlNode
    """
    file_path = os.path.abspath(file_path)
    # Read raw bytes: lxml refuses already-decoded text that carries an
    # XML encoding declaration, and with bytes it decodes the document
    # itself according to that declaration.
    with open(file_path, 'rb') as f:
        source = f.read()

    parser = etree.XMLParser()
    lookup = etree.ElementDefaultClassLookup(element=UjmlElement)
    parser.set_element_class_lookup(lookup)
    root_elem = etree.fromstring(source, parser)
    ujml_node = UjmlNode(root_elem, file_path, globals or {})
    return ujml_node
def __init__(self, filepath):
    # Only names ending in ".ski" or "_parameters.xml" (case-insensitive) are accepted.
    accepted_suffixes = (".ski", "_parameters.xml")
    if not filepath.lower().endswith(accepted_suffixes):
        raise ValueError("Invalid filename extension for ski file")

    # Remember the user-expanded path to the ski file
    self.path = os.path.expanduser(filepath)

    # Blank text is dropped while loading so the pretty printer is not confused when saving
    blank_stripper = etree.XMLParser(remove_blank_text=True)
    self.tree = etree.parse(arch.opentext(self.path), parser=blank_stripper)

    # From here on keep the full, absolute path
    self.path = os.path.abspath(self.path)

## This function saves the (possibly updated) contents of the SkiFile instance into the specified file.
# The filename \em must end with ".ski". Saving to and thus replacing the ski file from which this
# SkiFile instance was originally constructed is allowed, but often not the intention.
def get_total_pages(self):
    """Fetch the first review page and store the total number of pages.

    The count is read from the "Page 1 of N" label; when a label cannot be
    parsed the count falls back to 1.  The result is stored as an int in
    ``self.total_pages`` and copied to ``self.finish_page``.
    """
    page_number = 0
    url = "http://itunes.apple.com/WebObjects/MZStore.woa/wa/viewContentsUserReviews?id=%s&pageNumber=%d&sortOrdering=4&onlyLatestVersion=false&type=Purple+Software" % (self.app_id, page_number)
    headers = {"X-Apple-Store-Front": self.front, "User-Agent": self.user_agent}
    # NOTE(review): verify=False disables TLS certificate verification; kept as-is.
    u = requests.get(url, timeout=5, verify=False, headers=headers)
    u.raise_for_status()
    page = u.content

    parser = etree.XMLParser(recover=True)
    root = etree.fromstring(page, parser=parser)

    ns = '{http://www.apple.com/itms/}'
    label_path = '/'.join(ns + tag for tag in (
        'View', 'ScrollView', 'VBoxView', 'View', 'MatrixView', 'VBoxView',
        'VBoxView', 'HBoxView', 'TextView', 'SetFontStyle', 'b'))

    # Keep any previously stored value, but default to 1 so that a page
    # without a matching label no longer fails on an unset attribute.
    total = getattr(self, 'total_pages', 1)
    for node in root.findall(label_path):
        match = re.search(r'Page 1 of (\d+)', node.text or '')
        total = match.group(1) if match else 1

    self.total_pages = int(total)
    self.finish_page = self.total_pages
def get_total_pages(self):
    """Fetch the first review page and return the total number of pages.

    The count is read from the "Page 1 of N" label; when a label cannot be
    parsed the count falls back to 1.  The last raw value is also stored in
    ``self.total_pages``.

    :return: total number of pages as an int
    """
    page_number = 0
    url = "https://itunes.apple.com/WebObjects/MZStore.woa/wa/viewContentsUserReviews?id=%s&pageNumber=%d&sortOrdering=4&onlyLatestVersion=false&type=Purple+Software" % (self.app_id, page_number)
    headers = {"X-Apple-Store-Front": self.front, "User-Agent": self.user_agent}
    # NOTE(review): verify=False disables TLS certificate verification; kept as-is.
    u = requests.get(url, verify=False, headers=headers)
    u.raise_for_status()
    page = u.content

    parser = etree.XMLParser(recover=True)
    root = etree.fromstring(page, parser=parser)

    ns = '{http://www.apple.com/itms/}'
    label_path = '/'.join(ns + tag for tag in (
        'View', 'ScrollView', 'VBoxView', 'View', 'MatrixView', 'VBoxView',
        'VBoxView', 'HBoxView', 'TextView', 'SetFontStyle', 'b'))

    # Keep any previously stored value, but default to 1 so that a page
    # without a matching label no longer fails on an unset attribute.
    self.total_pages = getattr(self, 'total_pages', 1)
    for node in root.findall(label_path):
        # A text pattern is required here: the former bytes pattern raised
        # TypeError against str text on Python 3, which the bare except
        # silently turned into "1 page" every time.
        match = re.search(r'Page 1 of (\d+)', node.text or '')
        self.total_pages = match.group(1) if match else 1
    return int(self.total_pages)
def __init__(self, xml):
    """Parse ``xml`` and build ``self.vm_dict`` from the resulting tree.

    :param xml: XML document as text
    :raises Exception: the parse error is logged and then re-raised
    """
    parser = etree.XMLParser(remove_blank_text=True)
    try:
        self.tree = etree.parse(StringIO(xml), parser)
    except Exception as e:
        log.error('Exception when parse xml: {}'.format(e))
        log.error('xml that fail: \n{}'.format(xml))
        log.error('Traceback: {}'.format(traceback.format_exc()))
        # __init__ must not return a value: the former "return False"
        # raised TypeError and hid the real parse error from the caller.
        raise
    self.vm_dict = self.dict_from_xml(self.tree)

# def update_xml(self,**kwargs):
# if kwargs.__contains__('vcpus'):
# log.debug(1.)
def parsexml_(infile, parser=None, **kwargs):
    """Parse ``infile`` with ``etree_.parse`` and return the document.

    When no ``parser`` is supplied one is created; extra keyword arguments
    are forwarded to ``etree_.parse`` unchanged.
    """
    if parser is None:
        # Use the lxml ElementTree compatible parser so that, e.g.,
        # we ignore comments.
        try:
            parser = etree_.ETCompatXMLParser()
        except AttributeError:
            # fallback to xml.etree
            parser = etree_.XMLParser()
    doc = etree_.parse(infile, parser=parser, **kwargs)
    return doc

#
# User methods
#
# Calls to the methods in these classes are generated by generateDS.py.
# You can replace these methods by re-implementing the following class
# in a module named generatedssuper.py.
def emit(self, outfile=None, versioned=True, ns=None):
    """Render the configuration template and write it as pretty-printed XML.

    :param outfile: path to write to; ``None`` writes to ``sys.stdout``
    :param versioned: when True the path is first passed through
        ``self.version_filename``
    :param ns: optional substitution namespace; the generated sections are
        added to it (a caller-supplied dict is updated in place, as before)
    """
    # A fresh dict per call: the former shared "ns={}" default was mutated
    # below and therefore leaked entries between calls.
    if ns is None:
        ns = {}

    opened_here = False
    if outfile is None:
        outfile = sys.stdout
    else:
        if versioned:
            outfile = self.version_filename(outfile, self.conf)
        outfile = open(outfile, "w")
        opened_here = True

    try:
        ns['includes'] = self.build_includes()
        ns['provisioners'] = self.build_provisioners()
        ns['devices'] = self.build_devices()
        ns['iterators'] = self.build_iterators()
        ns['templated_changes'] = self.build_templated_changes()
        ns['non_templated_changes'] = self.build_non_templated_changes()

        output = self.config_template.safe_substitute(ns)
        # Round-trip through lxml so the result is pretty-printed.
        output = etree.fromstring(output, parser=etree.XMLParser())
        # NOTE(review): etree.tostring() without an encoding returns bytes on
        # Python 3 -- confirm this write is correct for a text-mode file.
        outfile.write(etree.tostring(output, pretty_print=True))
        outfile.write('\n')
    finally:
        # Only close what this method opened; never close sys.stdout.
        if opened_here:
            outfile.close()
def remove_prefix(fname):
    """This removes namespace prefix from all the things in the xml.

    :param fname: source accepted by ``etree.parse``
    :return: the parsed tree with the ``{namespace}`` part stripped from
        every element tag
    """
    from lxml import etree, objectify

    parser = etree.XMLParser(remove_blank_text=True)
    tree = etree.parse(fname, parser)
    root = tree.getroot()
    # iter() replaces the deprecated getiterator().
    for elem in root.iter():
        # Nodes whose tag is not a string have nothing to strip.
        if not hasattr(elem.tag, 'find'):
            continue
        i = elem.tag.find('}')
        if i >= 0:
            elem.tag = elem.tag[i + 1:]
    objectify.deannotate(root, cleanup_namespaces=True)
    return tree
def get_stream_info(self):
    """Parse xml stream info returned by server.

    Returns a list with one dict per ``station`` element (its attributes),
    each carrying a ``'channel'`` list of attribute dicts for its ``stream``
    children.  ``self.stream_xml`` is rewritten without its
    ``encoding="utf-8"`` declaration.
    """
    self.stream_xml = self.stream_xml.replace('encoding="utf-8"', '')
    blank_stripper = etree.XMLParser(remove_blank_text=True)
    root = etree.parse(StringIO(self.stream_xml), blank_stripper).getroot()

    stream_info = []
    for station in root.iterchildren():
        if station.tag != "station":
            continue
        station_info = dict(station.items())
        station_info['channel'] = []
        stream_info.append(station_info)
        for stream in station.iterchildren():
            if stream.tag != "stream":
                continue
            station_info['channel'].append(dict(stream.items()))
    return stream_info
def process_schema_buffer(buf, table, db, mongodb):
    """Extract the primary-key fields from a schema buffer and store them.

    Every ``field`` child whose ``Key`` attribute is ``'PRI'`` contributes
    its ``Field`` attribute.  The resulting document is handed to
    ``mongodb.insert_primary_key``; any failure there is re-raised as
    ``SysException``.
    """
    lenient_parser = etree.XMLParser(recover=True)
    tnode = etree.fromstring(buf, parser=lenient_parser)

    doc = {
        '_id': db + '.' + table,
        'primary_key': [
            field.attrib['Field']
            for field in tnode
            if field.tag == 'field' and field.attrib['Key'] == 'PRI'
        ],
        'table': table,
        'db': db,
    }

    try:
        mongodb.insert_primary_key(doc)
    except Exception as e:
        raise SysException(e)
def GetAttachmentCollection(self, _id):
    """Get Attachments for given List Item ID

    Returns the list of attachment texts on HTTP 200, otherwise the raw
    response object.
    """
    # Build Request
    soap_request = soap('GetAttachmentCollection')
    for parameter, value in (('listName', self.listName), ('listItemID', _id)):
        soap_request.add_parameter(parameter, value)
    self.last_request = str(soap_request)

    # Send Request
    response = self._session.post(
        url=self._url('Lists'),
        headers=self._headers('GetAttachmentCollection'),
        data=str(soap_request),
        verify=False,
    )

    # Parse Response
    if response.status_code != 200:
        return response
    envelope_parser = etree.XMLParser(huge_tree=self.huge_tree)
    envelope = etree.fromstring(response.text.encode('utf-8'), parser=envelope_parser)
    attaches = envelope[0][0][0][0]
    return [attachment.text for attachment in attaches]
def sign(self):
    """Sign ``self._xml`` with an enveloped XML signature and verify it.

    The signed document (ISO-8859-1 encoded) replaces ``self._xml``, its
    ``ds:Signature`` element gets the Id ``SignSUNAT`` and the verification
    result is printed.
    """
    # TODO: change hardcoded key paths to environment variables
    # Context managers so the certificate/key handles are closed promptly.
    with open('cert.pem') as cert_file:
        cert = cert_file.read()
    with open('key.pem') as key_file:
        key = key_file.read()

    root = etree.fromstring(self._xml.encode('ISO-8859-1'),
                            parser=etree.XMLParser(encoding='ISO-8859-1'))
    signed_root = xmldsig(root, digest_algorithm='sha1').sign(algorithm='rsa-sha1', key=key, cert=cert)
    signed_root.xpath(
        '//ext:UBLExtensions/ext:UBLExtension/ext:ExtensionContent/ds:Signature',
        namespaces={'ext': 'urn:oasis:names:specification:ubl:schema:xsd:CommonExtensionComponents-2',
                    'ds': 'http://www.w3.org/2000/09/xmldsig#'})[0].attrib['Id'] = 'SignSUNAT'
    self._xml = etree.tostring(signed_root, encoding='ISO-8859-1')
    # NOTE(review): the private key is passed as ca_pem_file -- confirm this is intended.
    print(xmldsig(signed_root).verify(require_x509=True, x509_cert=cert, ca_pem_file=key,
                                      ca_path=None, hmac_key=None, validate_schema=True,
                                      parser=None, id_attribute=None))
def raw_to_vcs(self, b):
    """ Convert xml from the raw pbit to one suitable for version control - i.e. nicer encoding, pretty print, etc.

    :param b: raw XML as bytes
    :return: pretty-printed XML as UTF-8 bytes
    :raises ValueError: if the XML declares an encoding other than
        ``self.lxml_encoding``
    """
    parser = etree.XMLParser(remove_blank_text=True)

    # If no encoding is specified in the XML, all is well - we can decode it then pass the unicode to the parser.
    # However, if encoding is specified, then lxml won't accept an already decoded string - so we have to pass it
    # the bytes (and let it decode).
    # Raw literal: avoids invalid-escape warnings without changing the regex.
    # IGNORECASE: declarations such as encoding="UTF-8" must be detected too,
    # otherwise they fall into the decode branch that lxml rejects.
    m = re.match(br'^.{,4}\<\?xml [^\>]*encoding=[\'"]([a-z0-9_\-]+)[\'"]', b, re.IGNORECASE)

    if m:
        xml_encoding = m.group(1).decode('ascii')
        if xml_encoding.lower() != self.lxml_encoding.lower():
            raise ValueError("XML declares encoding %r but %r was expected"
                             % (xml_encoding, self.lxml_encoding))
        root = etree.fromstring(b, parser)
    else:
        root = etree.fromstring(b.decode(self.encoding), parser)

    # return pretty-printed, with XML, in UTF-8
    return etree.tostring(root, pretty_print=True, xml_declaration=self.xml_declaration, encoding='utf-8')
def GetInstanceList(self, root, name, debug=False):
    """Collect instance paths by running ``instances_path.xslt`` on ``root``.

    Returns an empty list when ``self.GetProject(debug)`` is None; otherwise
    the list filled by ``InstancesPathFactory`` during the transform.
    """
    instances = []
    if self.GetProject(debug) is None:
        return instances

    factory = InstancesPathFactory(instances)

    xslt_parser = etree.XMLParser()
    xslt_parser.resolvers.add(LibraryResolver(self, debug))

    stylesheet_path = os.path.join(ScriptDirectory, "plcopen", "instances_path.xslt")
    stylesheet = etree.parse(stylesheet_path, xslt_parser)
    extensions = {("instances_ns", "AddInstance"): factory.AddInstance}
    transform = etree.XSLT(stylesheet, extensions=extensions)

    transform(root, instance_type=etree.XSLT.strparam(name))
    return instances
def GetPouInstanceTagName(self, instance_path, debug=False):
    """Resolve the tag name for ``instance_path`` via ``instance_tagname.xslt``.

    The transform is run on the project returned by ``self.GetProject(debug)``
    and the result is read back from the ``InstanceTagName`` factory.
    """
    project = self.GetProject(debug)
    factory = InstanceTagName(self)

    xslt_parser = etree.XMLParser()
    xslt_parser.resolvers.add(LibraryResolver(self, debug))

    stylesheet_path = os.path.join(ScriptDirectory, "plcopen", "instance_tagname.xslt")
    stylesheet = etree.parse(stylesheet_path, xslt_parser)

    # Expose the factory callbacks to the stylesheet under instance_tagname_ns.
    callback_names = ["ConfigTagName", "ResourceTagName", "PouTagName",
                      "ActionTagName", "TransitionTagName"]
    extensions = {}
    for name in callback_names:
        extensions[("instance_tagname_ns", name)] = getattr(factory, name)

    transform = etree.XSLT(stylesheet, extensions=extensions)
    transform(project, instance_path=etree.XSLT.strparam(instance_path))
    return factory.GetTagName()
def GetVariableDictionary(self, object_with_vars, tree=False, debug=False):
    """Collect variable infos by running ``variables_infos.xslt``.

    The transform is applied to ``object_with_vars``; the list filled by
    ``VariablesInfosFactory`` is returned.
    """
    variables = []
    factory = VariablesInfosFactory(variables)

    xslt_parser = etree.XMLParser()
    xslt_parser.resolvers.add(LibraryResolver(self, debug))

    stylesheet_path = os.path.join(ScriptDirectory, "plcopen", "variables_infos.xslt")
    stylesheet = etree.parse(stylesheet_path, xslt_parser)

    # Expose the factory callbacks to the stylesheet under var_infos_ns.
    callback_names = ["SetType", "AddDimension", "AddTree", "AddVarToTree", "AddVariable"]
    extensions = {}
    for name in callback_names:
        extensions[("var_infos_ns", name)] = getattr(factory, name)

    transform = etree.XSLT(stylesheet, extensions=extensions)
    transform(object_with_vars, tree=etree.XSLT.strparam(str(tree)))
    return variables

# Add a global var to configuration to configuration
def parse_links_xml(self, content):
    """ Method parse links from xml

    Collects, for every element, its text and attribute values accepted by
    ``validate_uri_start`` plus the links that ``self.parse_links_html_re``
    finds in non-blank element text.
    """
    links = []
    tree = etree.XML(content, etree.XMLParser())

    for tag in tree.xpath('//*'):
        text = tag.text
        if text and validate_uri_start(text):
            links.append(text)

        for value in tag.attrib.values():
            if validate_uri_start(value):
                links.append(value)

        if not (text and text.strip()):
            continue
        try:
            links.extend(self.parse_links_html_re(text))
        except KeyError:
            Registry().get('logger').log("ENC: " + text)

    return links
def download_default_transcript(self, url=None, language_code=None):  # pylint: disable=unused-argument
    """
    Download default transcript from Youtube API and format it to WebVTT-like unicode.

    Reference to `get_transcripts_from_youtube()`:
        https://github.com/edx/edx-platform/blob/ecc3473d36b3c7a360e260f8962e21cb01eb1c39/common/lib/xmodule/xmodule/video_module/transcripts_utils.py#L122
    """
    if url is None:
        raise VideoXBlockException(_('`url` parameter is required.'))

    utf8_parser = etree.XMLParser(encoding='utf-8')
    response = requests.get(url)
    xmltree = etree.fromstring(response.content, parser=utf8_parser)

    # Elements are numbered from 1 when formatted.
    sub = "".join(
        self.format_transcript_element(element, index)
        for index, element in enumerate(xmltree, 1)
    )

    # Prepend the WEBVTT header only when it is not already present.
    if "WEBVTT" in sub:
        return unicode(sub)
    return u"WEBVTT\n\n" + unicode(sub)
def xmla_authentication(self):
    # type: () -> bool
    """Check if excel need authentication to access cubes or not. (xmla_authentication tag in the config file).

    :return: True | False (False when the config file or the tag is missing)
    """
    # xmla authentication only in excel
    if not self.config_file_exists():
        return False

    with open(self.get_config_file_path()) as config_file:
        parser = etree.XMLParser()
        tree = etree.parse(config_file, parser)

    try:
        return tree.xpath('/cubes/xmla_authentication')[0].text == 'True'
    except Exception:
        # Exception rather than BaseException, so KeyboardInterrupt and
        # SystemExit are no longer swallowed.
        return False
def get_cubes_names(self):
    """Get all cubes names in the config file.

    :return: dict with dict name as key and cube source as value (csv | postgres | mysql | oracle | mssql)
    :raises ValueError: if a cube lacks its name or source tag
    """
    file_path = self.get_config_file_path()
    with open(file_path) as config_file:
        parser = etree.XMLParser()
        tree = etree.parse(config_file, parser)

    try:
        return {
            cube.find('name').text: cube.find('source').text
            for cube in tree.xpath('/cubes/cube')
        }
    except Exception:  # pragma: no cover
        # Exception rather than BaseException, so KeyboardInterrupt and
        # SystemExit are not converted into a ValueError.
        raise ValueError('missed name or source tags')
def default_parser(self, encoding):
    """Return the configured default parser, or build a new one.

    This can either return a parser object or a class, which will be
    instantiated with default arguments.
    """
    if self._default_parser is None:
        options = dict(
            target=self,
            strip_cdata=False,
            recover=True,
            encoding=encoding,
            remove_blank_text=False,
            attribute_defaults=False,
            dtd_validation=False,
            load_dtd=False,
            no_network=True,
            ns_clean=True,
            resolve_entities=False,
            remove_comments=False,
            remove_pis=False,
            collect_ids=False,
            compact=False,
        )
        return etree.XMLParser(**options)
    return self._default_parser
def check_document_units(cls, path):
    """Verify that the SVG at ``path`` uses absolute measurements.

    :raises util.UserError: if the root element has no ``height`` attribute,
        if the height has no unit or is in ``px``, or if there is no
        ``viewBox`` attribute
    """
    with open(path, 'r') as svg_file:
        big_tree_parser = etree.XMLParser(huge_tree = True)
        document = etree.parse(svg_file, parser = big_tree_parser)

    height_attr = document.getroot().get('height')
    if height_attr is None:
        raise util.UserError('SVG document has no height attribute. See https://github.com/Feuermurmel/openscad-template/wiki/Absolute-Measurements')

    height_unit = cls._parse_measure(height_attr)[1]
    if height_unit is None or height_unit == 'px':
        raise util.UserError('Height of SVG document is not an absolute measure. See https://github.com/Feuermurmel/openscad-template/wiki/Absolute-Measurements')

    view_box = document.getroot().get('viewBox')
    if view_box is None:
        raise util.UserError('SVG document has no viewBox attribute. See https://github.com/Feuermurmel/openscad-template/wiki/Absolute-Measurements')
def parse(cls, filename=None, huge_tree=True):
    """Parse an SVG file (or stdin) and return an SVGContext.

    Args:
        filename: The SVG file to parse. If this is None stdin will be
            read by default.
        huge_tree: Passed through to the XML parser's ``huge_tree`` option.

    Returns:
        An SVGContext
    """
    svg_parser = etree.XMLParser(huge_tree=huge_tree)
    if filename is not None:
        with open(filename, 'r') as stream:
            return cls(etree.parse(stream, parser=svg_parser))
    return cls(etree.parse(sys.stdin, parser=svg_parser))
def protectLand(infile, outfile, fraction, landClasses=None, otherArable=False,
                regions=None, unprotectFirst=False):
    """
    Write to `outfile` a copy of `infile` in which a `fraction` of
    `landClasses` in `regions` is protected.

    :param infile: the path of a GCAM land_input XML file
    :param outfile: the path of the XML file to create by modifying data from `infile`
    :param fraction: the fraction of land in the given land classes to protect
    :param landClasses: a string or a list of strings, or None. If None, all
        "standard" unmanaged land classes are modified.
    :param otherArable: (bool) if True, land class 'OtherArableLand' is
        included in default land classes.
    :param regions: a string or a list of strings, or None. If None, all
        regions are modified.
    :param unprotectFirst: (bool) if True, make all land "unprotected" before
        protecting.
    :return: None
    """
    blank_stripper = ET.XMLParser(remove_blank_text=True)
    land_tree = ET.parse(infile, blank_stripper)

    protection_options = dict(landClasses=landClasses, otherArable=otherArable,
                              regions=regions, unprotectFirst=unprotectFirst)
    createProtected(land_tree, fraction, **protection_options)

    land_tree.write(outfile, xml_declaration=True, pretty_print=True)
def readConditionalFile(xmlFile, varDict, removeComments=True):
    """
    Read a conditional XML file, interpreting tests using the values in `varDict`,
    and returning the resulting XML tree after replacing conditional elements with
    the elements the expression evaluates to.

    :param xmlFile: (str) pathname of an XML file to read.
    :param varDict: (dict) values to use when interpreting <TEST> nodes
    :param removeComments: (bool) passed to the parser's ``remove_comments``
    :return: an XML tree
    """
    parser = ET.XMLParser(remove_blank_text=True, remove_comments=removeComments)
    tree = ET.parse(xmlFile, parser)
    root = tree.getroot()

    # NOTE(review): the return value was never used; presumably this rewrites
    # `root` in place -- confirm against evaluateConditional.
    evaluateConditional(root, varDict)

    # lxml elements have no gettree() method (only getroottree()); return the
    # tree that was parsed above, which is the tree that owns `root`.
    return tree
def default_parser(self):
    """Build the default parser that feeds events to this object.

    This can either return a parser object or a class, which will be
    instantiated with default arguments.
    """
    options = {'target': self, 'strip_cdata': False, 'recover': True}
    return etree.XMLParser(**options)
def __init__(self, app, image_sample):
    """Store the app and sample image and set up empty working state."""
    # Plain state first
    self._app, self._testimage = app, image_sample
    self.current = None
    self.image_list = []
    # Helper objects
    self.temporary = tempfile.NamedTemporaryFile()
    self.parser = etree.XMLParser(remove_blank_text=True)
def default_parser(self, encoding):
    """Return the configured default parser, or build a new one.

    This can either return a parser object or a class, which will be
    instantiated with default arguments.
    """
    if self._default_parser is None:
        options = dict(target=self, strip_cdata=False, recover=True, encoding=encoding)
        return etree.XMLParser(**options)
    return self._default_parser
def setUp(self):
    """Create the parser, lxml tree walker and HTML serializer fixtures."""
    self.serializer = serializer.HTMLSerializer()
    self.treewalker = html5lib.getTreeWalker("lxml")
    # Entities are left unresolved by this parser.
    self.parser = etree.XMLParser(resolve_entities=False)
def __init__(self, document, additional_namespaces=None):
    """Parse ``document``, then run the fault check and the main parse step.

    :param document: XML document passed to ``etree.fromstring``
    :param additional_namespaces: optional mapping merged into a private
        copy of ``NS``
    """
    self._logger = logging.getLogger(__name__)

    namespaces = copy.deepcopy(NS)
    if additional_namespaces:
        namespaces.update(additional_namespaces)
    self._nsmap = namespaces

    self._dict = {}
    self._document = document

    lenient_parser = etree.XMLParser(ns_clean=True, recover=True, encoding='utf-8')
    self._root = etree.fromstring(self._document, parser=lenient_parser)

    self._check_fault()
    self._parse()
def read_input(catff, naff, pairs_same_sentence_ppmi, pairs_cross_sentence_ppmi):
    """Build the de-duplicated list of candidate plot links for one document.

    :param catff: CAT annotation file, parsed with lxml
    :param naff: NAF file, parsed with lxml
    :param pairs_same_sentence_ppmi: passed to ``candidate_pairs_same_sent``
    :param pairs_cross_sentence_ppmi: passed to ``candidate_pairs_cross_sent``
    :return: same-sentence links followed by cross-sentence links, each
        distinct link kept once in first-seen order
    """
    ecbplus = etree.parse(catff, etree.XMLParser(remove_blank_text=True))
    doc_naf = etree.parse(naff, etree.XMLParser(remove_blank_text=True))
    naf_root = doc_naf.getroot()

    event_tokens, event_lemmas, event_same_sentence = read_cat_naf(ecbplus, naf_root)

    event_lemma_pairs_same_sentence = sentence_coocc(event_lemmas, event_same_sentence)
    event_lemma_pairs_cross_sentence = cross_sentence(event_tokens)

    plot_link_same_sent = candidate_pairs_same_sent(
        pairs_same_sentence_ppmi, event_lemma_pairs_same_sentence,
        event_lemmas, event_same_sentence, event_tokens)
    plot_link_cross_sent = candidate_pairs_cross_sent(
        pairs_cross_sentence_ppmi, event_lemma_pairs_cross_sentence,
        event_lemmas, event_tokens)

    plot_link = plot_link_same_sent + plot_link_cross_sent

    # The former comprehension tested membership against a list that was
    # still empty while it ran, so nothing was ever filtered out.  An
    # explicit loop performs the intended order-preserving de-duplication
    # (list membership, so links need not be hashable).
    plot_link_cleaned = []
    for link in plot_link:
        if link not in plot_link_cleaned:
            plot_link_cleaned.append(link)
    return plot_link_cleaned
def read_input(catff, naff, pairs_same_sentence_ppmi, pairs_cross_sentence_ppmi):
    """Build the de-duplicated list of candidate plot links for one document.

    Same pipeline as the plain variant, with the result of
    ``get_tanchor_cat`` additionally passed to both candidate builders.

    :param catff: CAT annotation file, parsed with lxml
    :param naff: NAF file, parsed with lxml
    :param pairs_same_sentence_ppmi: passed to ``candidate_pairs_same_sent``
    :param pairs_cross_sentence_ppmi: passed to ``candidate_pairs_cross_sent``
    :return: same-sentence links followed by cross-sentence links, each
        distinct link kept once in first-seen order
    """
    ecbplus = etree.parse(catff, etree.XMLParser(remove_blank_text=True))
    doc_naf = etree.parse(naff, etree.XMLParser(remove_blank_text=True))
    naf_root = doc_naf.getroot()

    event_tokens, event_lemmas, event_same_sentence = read_cat_naf(ecbplus, naf_root)
    contains_event = get_tanchor_cat(ecbplus)

    event_lemma_pairs_same_sentence = sentence_coocc(event_lemmas, event_same_sentence)
    event_lemma_pairs_cross_sentence = cross_sentence(event_tokens)

    plot_link_same_sent = candidate_pairs_same_sent(
        pairs_same_sentence_ppmi, event_lemma_pairs_same_sentence,
        event_lemmas, event_same_sentence, event_tokens, contains_event)
    plot_link_cross_sent = candidate_pairs_cross_sent(
        pairs_cross_sentence_ppmi, event_lemma_pairs_cross_sentence,
        event_lemmas, event_tokens, contains_event)

    plot_link = plot_link_same_sent + plot_link_cross_sent

    # The former comprehension tested membership against a list that was
    # still empty while it ran, so nothing was ever filtered out.  An
    # explicit loop performs the intended order-preserving de-duplication
    # (list membership, so links need not be hashable).
    plot_link_cleaned = []
    for link in plot_link:
        if link not in plot_link_cleaned:
            plot_link_cleaned.append(link)
    return plot_link_cleaned
def parse_schedule(xml, filename):
    """
    Parses a schedule definition in XML, validating it against enarksh.xsd.

    :param str xml: The XML with a schedule definition
    :param str filename: Name used in the error message for an invalid schedule.

    :rtype: enarksh.xml_reader.node.ScheduleNode
    """
    xsd_path = os.path.join(C.HOME, 'etc/enarksh.xsd')
    with open(xsd_path, 'rb') as f:
        xsd = f.read()

    etree.clear_error_log()
    schema = etree.XMLSchema(etree.XML(xsd))
    validating_parser = etree.XMLParser(schema=schema, encoding='utf8')

    try:
        root = etree.fromstring(bytes(xml, 'utf8'), validating_parser)

        # Root element must be a schedule.
        if root.tag != 'Schedule':
            raise Exception("Root element must be 'Schedule' but '{0!s}' was found.".format(root.tag))

        schedule = create_node('Schedule')
        schedule.read_xml(root)

        error = schedule.validate()
        if error:
            raise Exception(
                "File '{0!s}' is not a valid schedule configuration file.\n{1!s}".format(filename, error))

        # Set recursion and dependency levels.
        schedule.set_levels()
    except etree.XMLSyntaxError as exception:
        # Log everything from WARNING level upwards, then propagate.
        warnings_and_up = exception.error_log.filter_from_level(etree.ErrorLevels.WARNING)
        logging.getLogger('enarksh').error(warnings_and_up)
        raise exception

    return schedule

# ------------------------------------------------------------------------------------------------------------------
def parse_dynamic_worker(xml, parent):
    """
    Parses a dynamic inner worker definition in XML, validating it against enarksh.xsd.

    :param str xml: The XML with a dynamic inner worker definition
    :param parent: Passed to the worker's ``validate`` method.

    :rtype: enarksh.xml_reader.node.CompoundJobNode
    """
    xsd_path = os.path.join(C.HOME, 'etc/enarksh.xsd')
    with open(xsd_path, 'rb') as f:
        xsd = f.read()

    schema = etree.XMLSchema(etree.XML(xsd))
    validating_parser = etree.XMLParser(schema=schema, encoding='utf8')
    root = etree.fromstring(bytes(xml, 'utf8'), validating_parser)

    # Root element must be a dynamic inner worker.
    if root.tag != 'DynamicInnerWorker':
        raise Exception("Root element must be 'DynamicInnerWorker' but '{0!s}' was found.".format(root.tag))

    worker = create_node('DynamicInnerWorker')
    worker.read_xml(root)

    error = worker.validate(parent)
    if error:
        raise Exception("XML message is not a valid dynamic worker configuration.\n{0!s}".format(error))

    # Set recursion and dependency levels.
    worker.set_levels()

    return worker

# ------------------------------------------------------------------------------------------------------------------
def parse_host(filename):
    """
    Parses a host definition in XML, validating it against enarksh.xsd.

    :param str filename: The XML file with a host definition

    :rtype: enarksh.xml_reader.Host.Host
    """
    with open(filename, 'rt', encoding='utf-8') as stream:
        xml = stream.read()

    xsd_path = os.path.join(C.HOME, 'etc/enarksh.xsd')
    with open(xsd_path, 'rb') as stream:
        xsd = stream.read()

    schema = etree.XMLSchema(etree.XML(xsd))
    validating_parser = etree.XMLParser(schema=schema, encoding='utf8')
    root = etree.fromstring(bytes(xml, 'utf8'), validating_parser)

    # Root element must be a host.
    if root.tag != 'Host':
        raise Exception("Root element must be 'Host' but '{0!s}' was found.".format(root.tag))

    host = Host()
    host.read_xml(root)

    error = host.validate()
    if error:
        raise Exception("File '{0!s}' is not a valid host configuration file.\n{1!s}".format(filename, error))

    return host

# ----------------------------------------------------------------------------------------------------------------------