The following 50 code examples, extracted from open-source Python projects, illustrate how to use xml.etree.ElementTree.iterparse().
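Before the project code, here is a minimal, self-contained sketch of the pattern most of the examples below share: stream the document with iterparse(), handle elements on their 'end' events, and clear what you no longer need so memory stays flat. The function name iter_items and the inline SAMPLE document are illustrative placeholders, not taken from any of the projects.

import io
import xml.etree.ElementTree as ET

# Hypothetical sample input; any file-like XML source works the same way.
SAMPLE = io.BytesIO(b"<items><item id='1'>a</item><item id='2'>b</item></items>")

def iter_items(source, tag='item'):
    """Yield (id, text) for each matching element without building the full tree."""
    # By default iterparse() reports ('end', elem) once an element and all of
    # its children have been fully parsed; asking for 'start' as well means
    # the very first event hands us the root element.
    context = ET.iterparse(source, events=('start', 'end'))
    _, root = next(context)  # ('start', <root>)
    for event, elem in context:
        if event == 'end' and elem.tag == tag:
            yield elem.get('id'), elem.text
            root.clear()  # drop already-processed children to keep memory flat

print(list(iter_items(SAMPLE)))  # -> [('1', 'a'), ('2', 'b')]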
def xml_parse(xml_file):
    """
    Parse an XML file, returns a tree of nodes and a dict of namespaces

    :param xml_file: the input XML file
    :returns: (doc, ns_map)
    """
    root = None
    ns_map = {}  # prefix -> ns_uri
    for event, elem in ET.iterparse(xml_file, ['start-ns', 'start', 'end']):
        if event == 'start-ns':
            # elem = (prefix, ns_uri)
            ns_map[elem[0]] = elem[1]
        elif event == 'start':
            if root is None:
                root = elem
    for prefix, uri in ns_map.items():
        ET.register_namespace(prefix, uri)
    return (ET.ElementTree(root), ns_map)
def _get_py3_cls():
    """Python 3.3 hides the pure Python code but defusedxml requires it.

    The code is based on test.support.import_fresh_module().
    """
    pymodname = "xml.etree.ElementTree"
    cmodname = "_elementtree"
    pymod = sys.modules.pop(pymodname, None)
    cmod = sys.modules.pop(cmodname, None)
    sys.modules[cmodname] = None
    pure_pymod = importlib.import_module(pymodname)
    if cmod is not None:
        sys.modules[cmodname] = cmod
    else:
        sys.modules.pop(cmodname)
    sys.modules[pymodname] = pymod
    _XMLParser = pure_pymod.XMLParser
    _iterparse = pure_pymod.iterparse
    ParseError = pure_pymod.ParseError
    return _XMLParser, _iterparse, ParseError
def articles():
    n = 0
    with bz2.BZ2File("articles.xml.bz2", 'r') as infile:
        for event, elem in iterparse(infile, events=("start", "end")):
            if event == 'start':
                if elem.tag == '{http://www.mediawiki.org/xml/export-0.10/}mediawiki':
                    root = elem
            elif event == 'end':
                if elem.tag == '{http://www.mediawiki.org/xml/export-0.10/}page':
                    title_elem = elem.find('{http://www.mediawiki.org/xml/export-0.10/}title')
                    if title_elem is None:
                        continue
                    title = title_elem.text
                    if title is None or ':' in title:
                        continue
                    revision = elem.find('{http://www.mediawiki.org/xml/export-0.10/}revision')
                    if revision is None:
                        continue
                    text_elem = revision.find('{http://www.mediawiki.org/xml/export-0.10/}text')
                    if text_elem is None:
                        continue
                    text = text_elem.text
                    if text is None:
                        continue
                    yield Article(n, title, text)
                    n += 1
                    #if title == 'Zhang Heng':
                    #    break
                    root.clear()
def process_stream_iterparse(self, stream, heading=None):
    if self.verbosity >= 2 and heading is not None:
        fprintf(self.logfile, "\n=== %s ===\n", heading)
    si_tag = U_SSML12 + 'si'
    elemno = -1
    sst = self.bk._sharedstrings
    for event, elem in ET.iterparse(stream):
        if elem.tag != si_tag:
            continue
        elemno = elemno + 1
        if self.verbosity >= 3:
            fprintf(self.logfile, "element #%d\n", elemno)
            self.dump_elem(elem)
        result = get_text_from_si_or_is(self, elem)
        sst.append(result)
        elem.clear()  # destroy all child elements
    if self.verbosity >= 2:
        self.dumpout('Entries in SST: %d', len(sst))
    if self.verbosity >= 3:
        for x, s in enumerate(sst):
            fprintf(self.logfile, "SST x=%d s=%r\n", x, s)
def gpx_parser(fh):
    it = ElementTree.iterparse(fh, events=('start', 'end'))
    # look for the start gpx tag to fail fast
    for event, elem in it:
        if event == 'start' and elem.tag.endswith('}gpx'):
            break
    else:
        raise ValueError('Not a gpx file: %s' % fh.name)
    # do the main parse
    for event, elem in it:
        if event == 'end' and elem.tag.endswith('}trkpt'):
            latlon = (float(elem.attrib['lat']), float(elem.attrib['lon']))
            elev = np.nan
            time = None
            for child in elem:
                tag_name = child.tag.rsplit('}', 1)[1]
                if tag_name == 'ele':
                    elev = float(child.text)
                elif tag_name == 'time':
                    time = child.text
            yield latlon, time, elev
            elem.clear()
def _xml_to_dict(self, stream):
    """
    Reads the locally saved xml file and converts to a Python dictionary.
    """
    print('[%%] Parsing catalogue to Dict')
    catalog = defaultdict(dict)
    for event, symbol in ET.iterparse(XML_FILE):
        if symbol.tag == 'symbol':
            if symbol.get('price-stream') == stream:
                main_key = symbol.get('name')
                values = symbol.attrib
                catalog[main_key]['attribs'] = values
                catalog[main_key]['time-frames'] = {}
                child = symbol.getchildren()[0]
                for tf in child.findall('timeframe'):
                    time_frame = tf.get('name')
                    v = tf.attrib
                    catalog[main_key]['time-frames'][time_frame] = v
                #symbol.clear()
    return dict(catalog)
def parse(self):
    # get an iterable from XML
    context = iter(ET.iterparse(self.__input, events=("start", "end")))
    # get the root element
    _event, root = next(context)
    # extract namespace
    end = root.tag.find('}')
    if end > 0:
        self._namespace = root.tag[:end + 1]
    for event, elem in context:
        if event == 'start':
            if elem.tag.endswith('body'):
                break  # do not parse body
            else:
                continue  # skip node, not fully populated
        self.handle_tag(elem)
def attach_xml_body(tei_file, xml_entries):
    """Read given TEI XML file until the body tag. From there, insert the
    given entries. The result is a full TEI XML structure."""
    events = ET.iterparse(tei_file, events=["start"])
    root = next(events)[1]
    for _, elem in events:
        if elem.tag == 'body':
            break
    text = next(n for n in root if n.tag.endswith('text'))
    text.clear()  # throw away all potential content
    body = ET.SubElement(text, 'body')
    for entry in xml_entries:
        body.append(entry)
    ET.register_namespace('', 'http://www.tei-c.org/ns/1.0')
    return ET.ElementTree(root)
def _parse(self, filebytes):
    tf = tempfile.TemporaryFile()
    tf.write(filebytes)
    tf.seek(0)
    events = "start", "start-ns"
    root = None
    ns_map = []
    for event, elem in ET.iterparse(tf, events):
        if event == "start-ns":
            ns_map.append(elem)
        elif event == "start":
            if root is None:
                root = elem
            for prefix, uri in ns_map:
                elem.set("{}:{}".format(self.xmlns_str, prefix), uri)
            ns_map = []
    tf.close()
    return ET.ElementTree(root)
def mzmlToSqlite(xmlfile, sqlitefile):
    parser = xml.iterparse(xmlfile)
    writeQueue = multiprocessing.Queue()
    writerProc = multiprocessing.Process(target=mzmlToSqlite_writer,
                                         args=(sqlitefile, writeQueue))
    writerProc.start()
    for evt, obj in parser:
        if obj.tag == ns('spectrum'):
            writeQueue.put(('spectrum', readSpectrumXML(obj)))
            obj.clear()
        elif obj.tag == ns('chromatogram'):
            writeQueue.put(('chromatogram', readChromatoXML(obj)))
            obj.clear()
    writeQueue.put(('stop', None))
    writerProc.join()
    return sqlitefile
def clean_dataset(path):
    """Remove duplicates from the dataset and write clean data in .tsv files

    Args:
        path: a path to the dataset
    """
    with open(path, 'r') as labels_file:
        context = ET.iterparse(labels_file, events=("start", "end"))
        # turn it into an iterator
        context = iter(context)
        # get the root element
        event, root = next(context)
        with open(os.path.splitext(path)[0] + '.tsv', 'w') as tsv_file:
            writer = csv.writer(tsv_file, delimiter='\t')
            same_set = set()
            for event, elem in context:
                if event == "end" and elem.tag == "paraphrase":
                    question = []
                    y = None
                    for child in elem.iter():
                        if child.get('name') == 'text_1':
                            question.append(child.text)
                        if child.get('name') == 'text_2':
                            question.append(child.text)
                        if child.get('name') == 'class':
                            y = 1 if int(child.text) >= 0 else 0
                    root.clear()
                    check_string = "\n".join(question)
                    if check_string not in same_set:
                        writer.writerow([y, question[0], question[1]])
                        same_set.add(check_string)
def iterparse(self, file):
    return self.create_fa().iterparse(file, self.validate_dtd)

# I need a better name
def handler_parse(self, file, state=None):
    for x in self.parse(file, state):
        pass

# I plan to implement 'iterparse' as a near copy of 'parse'
# but without any references to callbacks
def iterparse(self, file, validate_dtd=False):
    return self.parse(file, None, validate_dtd)
# NOTE: legacy Python 2 example (print statements, iterator .next()).
def test_parse():
    import os
    filename = "/Users/dalke/Music/iTunes/iTunes Music Library.xml"
    if not os.path.exists(filename):
        print "Cannot find %r: skipping test" % (filename,)
        return

    # Work through callbacks
    ef = IterParseFilter()
    def print_info(event, ele, state):
        d = {}
        children = iter(ele)
        for child in children:
            key = child.text
            value = children.next().text
            d[key] = value
        print "%r is by %r" % (d["Name"], d.get("Artist", "<unknown>"))
        ele.clear()
    ef.on_end("/plist/dict/dict/dict", print_info)
    ef.handler_parse(open(filename))

    # Work through iterators
    ef = IterParseFilter()
    ef.iter_end("/plist/dict/dict/dict")
    for (event, ele) in ef.iterparse(open(filename)):
        d = {}
        children = iter(ele)
        for child in children:
            key = child.text
            value = children.next().text
            d[key] = value
        print "%r is a %r song" % (d["Name"], d.get("Genre", "<unknown>"))
        ele.clear()
def document_corpus_as_iterable(corpus):
    stats = defaultdict(int)
    with smart_file_open(corpus) as infile:
        LOGGER.info("Loading documents from solr xml file: %s" % corpus)
        # reader = UnicodeRecoder(infile, encoding='utf-8')
        for event, element in ET.iterparse(infile):
            if event == 'end' and element.tag == 'doc':
                stats['num_xml_entries'] += 1
                yield _parse_doc_elements(element)
def parse_new_asx(data):
    # Copied from mopidy.audio.playlists
    try:
        for _, element in elementtree.iterparse(data):
            element.tag = element.tag.lower()  # normalize

        for ref in element.findall('entry/ref[@href]'):
            yield fix_asf_uri(ref.get('href', '').strip())

        for entry in element.findall('entry[@href]'):
            yield fix_asf_uri(entry.get('href', '').strip())
    except elementtree.ParseError:
        return
# NOTE: legacy Python 2 example (print statements).
def main(argv):
    file_obj = open(argv[1])
    print "Reading XML file ",
    sys.stdout.flush()
    level = 0
    sim_list = []
    for event, elem in ElementTree.iterparse(file_obj, events=("start", "end")):
        if event == "start":
            level += 1
        if event == "end":
            level -= 1
            if level == 0 and elem.tag == 'FlowMonitor':
                sim = Simulation(elem)
                sim_list.append(sim)
                elem.clear()  # won't need this any more
                sys.stdout.write(".")
                sys.stdout.flush()
    print " done."
    for sim in sim_list:
        for flow in sim.flows:
            t = flow.fiveTuple
            proto = {6: 'TCP', 17: 'UDP'}[t.protocol]
            print "FlowID: %i (%s %s/%s --> %s/%i)" % \
                (flow.flowId, proto, t.sourceAddress, t.sourcePort,
                 t.destinationAddress, t.destinationPort)
            print "\tTX bitrate: %.2f kbit/s" % (flow.txBitrate * 1e-3,)
            print "\tRX bitrate: %.2f kbit/s" % (flow.rxBitrate * 1e-3,)
            print "\tMean Delay: %.2f ms" % (flow.delayMean * 1e3,)
            print "\tPacket Loss Ratio: %.2f %%" % (flow.packetLossRatio * 100)
def _get_elements(self, fp, tag):
    '''
    Convenience and memory management function that iterates required tags
    '''
    context = iter(ET.iterparse(fp, events=('start', 'end')))
    _, root = next(context)  # get root element
    for event, elem in context:
        if event == 'end' and elem.tag == tag:
            yield elem
            root.clear()  # preserve memory
def iterate_dc_xml(**kwargs):
    from bibcat.ingesters.ingester import new_graph
    import xml.etree.ElementTree as etree
    filepath = kwargs.get("in_file")
    ingester = kwargs.get("ingester")
    shard_size = kwargs.get("shard_size", -1)
    output_dir = kwargs.get("output_dir",
                            os.path.abspath(os.path.join(PROJECT_BASE, "output")))
    start = datetime.datetime.utcnow()
    click.echo("Starting DC XML at {} for records at {}".format(
        start, filepath))
    count = 0
    shard_template = "dc-{}k-{}k.ttl"
    if shard_size is not None and shard_size > 0:
        shard_name = shard_template.format(count, shard_size)
    shard_graph = new_graph()
    for event, elem in etree.iterparse(filepath):
        if event.startswith('end') and \
                elem.tag.endswith("Description"):
            ingester.transform(etree.tostring(elem))
            shard_graph += ingester.graph
            if not count % 10 and count > 0:
                click.echo(".", nl=False)
                #! DEBUG code
                with open(os.path.join(output_dir, "dpl-dc-test.ttl"), "wb+") as fo:
                    fo.write(shard_graph.serialize(format='turtle'))
                break
            if not count % 100:
                click.echo(count, nl=False)
            if shard_size is not None and shard_size > 0 and not count % shard_size:
                with open(os.path.join(output_dir, shard_name), 'wb+') as fo:
                    fo.write(shard_graph.serialize(format='turtle'))
                shard_graph = new_graph()
                shard_name = shard_template.format(count, count + shard_size)
            count += 1
    end = datetime.datetime.utcnow()
    click.echo("Finished DC ingestion at {} total time of {} mins for {}".format(
        end, (end - start).seconds / 60.0, count))
# NOTE: legacy Python 2 example (print statements).
def parse(self):
    tree = iterparse(self.path)
    file_size = int(os.path.getsize(self.path))
    print "\n[*] PARSING FILE: " \
          + colored(self.path.split("\\")[-1], 'yellow', attrs=['bold'])
    print "[*] FILE SIZE: " + \
          colored("%d MB" % (file_size / 1024 / 1024), 'yellow', attrs=['bold'])
    print "[*] BUILDING THE STRUCTURES WILL TAKE SOME TIME"
    try:
        for event, elem in tree:
            operation = elem.find('Operation')
            if elem.tag == 'event' and operation is not None:
                if ('Reg' in operation.text or 'CreateFile' in operation.text) \
                        and 'HKLM' not in elem.find('Path').text \
                        and 'HKCR' not in elem.find('Path').text \
                        and 'NAME NOT FOUND' in elem.find('Result').text:
                    self.events[operation.text].append(elem)
                else:
                    elem.clear()
        print colored("[*] PARSING FINISHED CORRECTLY\n", 'green', attrs=['bold'])
        return self.events
    except Exception as error:
        print colored("[*] PARSING FAILED", 'red', attrs=['bold'])
        print colored(" => " + str(error), 'red', attrs=['bold'])
def get_etree_iter(xml):
    return iter(ET.iterparse(io.BytesIO(xml), events=('start', 'end')))
def set_fields_xblock(self, path_to_file):
    path_index_page = 'index.html'
    try:
        tree = ET.parse('{}/imsmanifest.xml'.format(path_to_file))
    except IOError:
        pass
    else:
        namespace = ''
        for node in [node for _, node in ET.iterparse(
                '{}/imsmanifest.xml'.format(path_to_file), events=['start-ns'])]:
            if node[0] == '':
                namespace = node[1]
                break
        root = tree.getroot()
        if namespace:
            resource = root.find('{{{0}}}resources/{{{0}}}resource'.format(namespace))
            schemaversion = root.find('{{{0}}}metadata/{{{0}}}schemaversion'.format(namespace))
        else:
            resource = root.find('resources/resource')
            schemaversion = root.find('metadata/schemaversion')
        if resource:
            path_index_page = resource.get('href')
        if (not schemaversion is None) and (re.match('^1.2$', schemaversion.text) is None):
            self.version_scorm = 'SCORM_2004'
    self.scorm_file = os.path.join(
        settings.PROFILE_IMAGE_BACKEND['options']['base_url'],
        '{}/{}'.format(self.location.block_id, path_index_page))
def process_file(self, file_name):
    data = self.read_file(file_name, self.encoding)
    data = self.preprocess_data(data)
    try:
        stream = IO_Stream(bytearray("\n".join(data), encoding="utf-8"))
        self.tree = ET.iterparse(stream)
        if self._strip_namespace:
            for _, element in self.tree:
                element.tag = element.tag.rpartition("}")[-1]
    except Exception as e:
        print(self._current_file)
        print_error_context(str(e), "\n".join(data).split("\n"))
        raise e
    self.process_tree(self.tree)
def __xml_iter(file, tag):
    """
    :param file: the XML file to read.
    :param tag: the tag of interest.
    :return: a generator over the matching elements found in the XML.
    """
    return (elem for event, elem in etree.iterparse(file, events=['end'])
            if event == 'end' and elem.tag == tag)
def parse_and_remove(filename, path):
    path_parts = path.split('/')
    doc = iterparse(filename, ('start', 'end'))
    # Skip the root element
    next(doc)
    tag_stack = []
    elem_stack = []
    for event, elem in doc:
        if event == 'start':
            tag_stack.append(elem.tag)
            elem_stack.append(elem)
        elif event == 'end':
            if tag_stack == path_parts:
                yield elem
                elem_stack[-2].remove(elem)
            try:
                tag_stack.pop()
                elem_stack.pop()
            except IndexError:
                pass

# Find zip code with most potholes
def iterators():
    """
    Test iterators.

    >>> e = ET.XML("<html><body>this is a <i>paragraph</i>.</body>..</html>")
    >>> summarize_list(e.iter())
    ['html', 'body', 'i']
    >>> summarize_list(e.find("body").iter())
    ['body', 'i']
    >>> summarize(next(e.iter()))
    'html'
    >>> "".join(e.itertext())
    'this is a paragraph...'
    >>> "".join(e.find("body").itertext())
    'this is a paragraph.'
    >>> next(e.itertext())
    'this is a '

    Method iterparse should return an iterator. See bug 6472.

    >>> sourcefile = serialize(e, to_string=False)
    >>> next(ET.iterparse(sourcefile))  # doctest: +ELLIPSIS
    ('end', <Element 'i' at 0x...>)

    >>> tree = ET.ElementTree(None)
    >>> tree.iter()
    Traceback (most recent call last):
    AttributeError: 'NoneType' object has no attribute 'iter'
    """
def getDataFromExternal2(self, xmltvFile, date, ch_list, progress_callback=None):
    if xbmcvfs.exists(xmltvFile):
        f = FileWrapper(xmltvFile)
        if f:
            context = ElementTree.iterparse(f, events=("start", "end"))
            size = f.size
            return self.parseXMLTV(context, f, size, self.logoFolder, progress_callback)
def ensure_elementtree_imported(verbosity, logfile):
    global ET, ET_has_iterparse, Element_has_iter
    if ET is not None:
        return
    if "IronPython" in sys.version:
        import xml.etree.ElementTree as ET
        #### 2.7.2.1: fails later with
        #### NotImplementedError: iterparse is not supported on IronPython. (CP #31923)
    else:
        try:
            import xml.etree.cElementTree as ET
        except ImportError:
            try:
                import cElementTree as ET
            except ImportError:
                try:
                    import lxml.etree as ET
                except ImportError:
                    try:
                        import xml.etree.ElementTree as ET
                    except ImportError:
                        try:
                            import elementtree.ElementTree as ET
                        except ImportError:
                            raise Exception("Failed to import an ElementTree implementation")
    if hasattr(ET, 'iterparse'):
        _dummy_stream = BYTES_IO(b'')
        try:
            ET.iterparse(_dummy_stream)
            ET_has_iterparse = True
        except NotImplementedError:
            pass
    Element_has_iter = hasattr(ET.ElementTree, 'iter')
    if verbosity:
        etree_version = repr([
            (item, getattr(ET, item))
            for item in ET.__dict__.keys()
            if item.lower().replace('_', '') == 'version'
        ])
        print(ET.__file__, ET.__name__, etree_version, ET_has_iterparse, file=logfile)
def own_process_stream(self, stream, heading=None):
    if self.verbosity >= 2 and heading is not None:
        fprintf(self.logfile, "\n=== %s ===\n", heading)
    getmethod = self.tag2meth.get
    row_tag = U_SSML12 + "row"
    self_do_row = self.do_row
    for event, elem in ET.iterparse(stream):
        if elem.tag == row_tag:
            self_do_row(elem)
            elem.clear()  # destroy all child elements (cells)
        elif elem.tag == U_SSML12 + "dimension":
            self.do_dimension(elem)
        elif elem.tag == U_SSML12 + "mergeCell":
            self.do_merge_cell(elem)
    self.finish_off()
def tcx_parser(fh):
    it = ElementTree.iterparse(fh, events=('start', 'end'))
    # look for the start TrainingCenterDatabase tag to fail fast
    for event, elem in it:
        if event == 'start' and elem.tag.endswith('}TrainingCenterDatabase'):
            break
    else:
        raise ValueError('Not a tcx file: %s' % fh.name)
    # do the main parse
    for event, elem in it:
        if event == 'end' and elem.tag.endswith('}Trackpoint'):
            latlon = None
            elev = np.nan
            time = None
            for child in elem:
                tag_name = child.tag.rsplit('}', 1)[1]
                if tag_name == 'Time':
                    time = child.text
                elif tag_name == 'AltitudeMeters':
                    elev = float(child.text)
                elif tag_name == 'Position':
                    vals = dict((c.tag.rsplit('}', 1)[1], float(c.text))
                                for c in child)
                    latlon = (vals['LatitudeDegrees'], vals['LongitudeDegrees'])
            if latlon is not None:
                yield latlon, time, elev
            elem.clear()
def detect_xspf_header(data):
    data = data[0:150]
    if b'xspf' not in data.lower():
        return False
    try:
        data = io.BytesIO(data)
        for event, element in elementtree.iterparse(data, events=(b'start',)):
            return element.tag.lower() == '{http://xspf.org/ns/0/}playlist'
    except elementtree.ParseError:
        pass
    return False
def detect_asx_header(data: bytes):
    data = data[0:50]
    if b'asx' not in data.lower():
        return False
    try:
        bytesIO = io.BytesIO(data)
        for event, element in elementtree.iterparse(bytesIO, events=(b'start',)):
            return element.tag.lower() == 'asx'
    except elementtree.ParseError:
        pass
    return False
def parse_xspf(data: bytes):
    try:
        # Last element will be root.
        element = None
        for event, element in elementtree.iterparse(io.BytesIO(data)):
            element.tag = element.tag.lower()  # normalize

        if element is not None:
            ns = 'http://xspf.org/ns/0/'
            for track in element.iterfind('{%s}tracklist/{%s}track' % (ns, ns)):
                yield track.findtext('{%s}location' % ns)
    except elementtree.ParseError:
        return
def parse_asx(data):
    try:
        # Last element will be root.
        element = None
        for event, element in elementtree.iterparse(io.BytesIO(data)):
            element.tag = element.tag.lower()  # normalize

        if element is not None:
            for ref in element.findall('entry/ref[@href]'):
                yield ref.get('href', '').strip()

            for entry in element.findall('entry[@href]'):
                yield entry.get('href', '').strip()
    except elementtree.ParseError:
        return