我们从 Python 开源项目中,提取了以下 50 个代码示例,用于说明如何使用 lxml.etree.iterparse()。
def iso_info(self, iso):
    """Extract product metadata from an ISO image's embedded XML descriptor.

    Returns a dict with keys 'product', 'version' and 'build'; any field
    missing from the descriptor stays None.
    """
    result = {'product': None, 'version': None, 'build': None}
    # XML element tag -> result dict key
    field_map = {
        'productName': 'product',
        'version': 'version',
        'buildNumber': 'build',
    }
    parsed = isoparser.parse(iso)
    payload = io.BytesIO(self._find_iso_content(parsed))
    for action, node in etree.iterparse(payload):
        if node.text and node.tag in field_map:
            result[field_map[node.tag]] = node.text
    return result
def download_replication(state):
    """Download the replication archive for a given state and return a list
    of changeset dicts to process.

    :param state: integer replication sequence number.
    :returns: list of dicts, one per changeset that passes filter_changeset();
        tag key/value pairs plus 'id', 'user', 'uid', 'timestamp'.
    """
    changesets = []
    # Sequence number NNNMMMKKK maps to the path NNN/MMM/KKK.osm.gz
    url = '{0}/{1:03}/{2:03}/{3:03}.osm.gz'.format(
        REPLICATION_BASE_URL,
        int(state / 1000000), int(state / 1000) % 1000, state % 1000)
    response = urllib2.urlopen(url)
    data = response.read()
    gz = gzip.GzipFile(fileobj=StringIO(data))
    chdata = {}
    for event, element in etree.iterparse(gz, events=('start', 'end')):
        if event == 'start':
            if element.tag == 'changeset':
                chdata = {}
            elif element.tag == 'tag':
                chdata[element.get('k')] = element.get('v')
        elif event == 'end' and element.tag == 'changeset':
            chdata['id'] = int(element.get('id'))
            # The maximum length of the field is 190 characters due to a
            # MySQL index limitation.
            # BUG FIX: anonymous (old) changesets carry no user/uid
            # attributes; element.get() then returns None and the original
            # code crashed on None[:190] / int(None). Guard both.
            user = element.get('user')
            chdata['user'] = user[:190] if user else None
            uid = element.get('uid')
            chdata['uid'] = int(uid) if uid else None
            chdata['timestamp'] = element.get('created_at')
            if filter_changeset(chdata):
                changesets.append(chdata)
            # iterparse keeps the parsed tree in memory; drop each finished
            # changeset element to keep memory bounded on large archives.
            element.clear()
    return changesets
def parse(self, filename):
    """Parse an NVD feed file and index each vulnerability by product."""
    logger.debug("Parsing {}".format(filename))
    entry_tag = '{' + NS['feed'] + '}entry'
    parser = etree.iterparse(filename, tag=entry_tag)
    # Commit a savepoint every 500 entries.
    for event, node in batch(parser, 500, transaction.savepoint):
        vx = Vulnerability.from_node(node)
        # We don't use a ZODB set here as we a) won't ever change this
        # again in the future (we just rebuild the tree) and also I want to
        # avoid making millions of micro-records.
        for cpe in vx.affected_products:
            self.products.setdefault(cpe.product, set()).add(vx)
        # We need to explicitly clear this node. iterparse only builds the
        # tree incrementally but does not remove data that isn't needed any
        # longer. See
        # http://www.ibm.com/developerworks/xml/library/x-hiperfparse/
        node.clear()
        while node.getprevious() is not None:
            del node.getparent()[0]
def unicode_version(self):
    """Return the Unicode version declared in the document's meta element."""
    logger.debug("Get unicode version from meta")
    # Restrict parsing to the "meta" element; comments are skipped as we
    # do not care about them here.
    meta_context = etree.iterparse(self.source, tag=META_TAG,
                                   **self.PARSER_OPTIONS)
    self._fast_iter(meta_context)
    version = self._lgr.metadata.unicode_version
    self._lgr = None
    # The FD is now potentially at the end of the document; rewind it so
    # a full parse can follow.
    seek = getattr(self.source, "seek", None)
    if seek is not None:
        seek(0)
    return version
def parse_document(self):
    """Parse the whole LGR document and return the populated LGR object."""
    logger.debug('Start parsing of file: %s', self.filename)
    # PARSER_OPTIONS keeps CDATA sections intact (used in the
    # <meta>/<description> element), does not resolve entities, and skips
    # comments.
    full_context = etree.iterparse(self.source, **self.PARSER_OPTIONS)
    self._fast_iter(full_context)
    # The FD is now potentially at the end of the document; rewind it for
    # any subsequent reads.
    seek = getattr(self.source, "seek", None)
    if seek is not None:
        seek(0)
    return self._lgr
def lxml_trace(data, html=True, **kwargs):
    """Print out the lxml events that occur during parsing.

    This lets you see how lxml parses a document when no Beautiful Soup
    code is running.
    """
    from lxml import etree
    stream = StringIO(data)
    for ev, el in etree.iterparse(stream, html=html, **kwargs):
        line = "%s, %4s, %s" % (ev, el.tag, el.text)
        print(line)
def lxml_trace(data, html=True, **kwargs):
    """Print out the lxml events that occur during parsing.

    This lets you see how lxml parses a document when no Beautiful Soup
    code is running.
    """
    from lxml import etree
    parse_events = etree.iterparse(StringIO(data), html=html, **kwargs)
    for event, element in parse_events:
        print("%s, %4s, %s" % (event, element.tag, element.text))
def iterparse(self, file):
    """Build a fresh automaton and delegate iterative parsing to it."""
    # I need a better name
    automaton = self.create_fa()
    return automaton.iterparse(file, self.validate_dtd)
def handler_parse(self, file, state=None):
    """Drain parse(); registered callbacks fire as a side effect of iteration."""
    # I plan to implement 'iterparse' as a near copy of 'parse'
    # but without any references to callbacks
    for _ in self.parse(file, state):
        pass
def iterparse(self, file, validate_dtd=False):
    """Parse *file* without a callback handler; see parse()."""
    result = self.parse(file, None, validate_dtd)
    return result
def test_parse():
    # NOTE(review): this is Python 2 code (print statements, iterator
    # .next()); it will not run on Python 3 as-is.
    # Smoke test that parses an iTunes library plist twice: once via the
    # callback API, once via the iterator API.
    import os
    # Hard-coded to the author's machine; the test is skipped elsewhere.
    filename = "/Users/dalke/Music/iTunes/iTunes Music Library.xml"
    if not os.path.exists(filename):
        print "Cannot find %r: skipping test" % (filename,)
        return
    # Work through callbacks
    ef = IterParseFilter()
    def print_info(event, ele, state):
        # A plist <dict> is a flat sequence of alternating <key>/<value>
        # children; pair them up into a regular dict.
        d = {}
        children = iter(ele)
        for child in children:
            key = child.text
            value = children.next().text
            d[key] = value
        print "%r is by %r" % (d["Name"], d.get("Artist", "<unknown>"))
        # Free the element's subtree; iterparse keeps it otherwise.
        ele.clear()
    ef.on_end("/plist/dict/dict/dict", print_info)
    ef.handler_parse(open(filename))
    # Work through iterators
    ef = IterParseFilter()
    ef.iter_end("/plist/dict/dict/dict")
    for (event, ele) in ef.iterparse(open(filename)):
        d = {}
        children = iter(ele)
        for child in children:
            key = child.text
            value = children.next().text
            d[key] = value
        print "%r is a %r song" % (d["Name"], d.get("Genre", "<unknown>"))
        ele.clear()
def __init__(self, content_types_file):
    """Constructor

    @param content_types_file: a file like object of [Content_Types].xml
    """
    # {subpart content type: [xml file, ...], ...}
    self.overrides = {}
    context = etree.iterparse(content_types_file,
                              tag='{%s}Override' % ns.CONTENT_TYPES)
    for dummy, override in context:
        key = override.get('ContentType')
        # BUG FIX: dict.has_key() was removed in Python 3; setdefault()
        # handles the first-seen and repeat cases in one lookup and works
        # on both Python versions.
        self.overrides.setdefault(key, []).append(override.get('PartName'))
def _get_elements(self, fp, tag):
    """Yield every element named *tag* from *fp*, clearing the document
    root between matches so memory stays bounded on large files."""
    events = iter(ET.iterparse(fp, events=('start', 'end')))
    _, root = next(events)  # first event delivers the root element
    for event, element in events:
        if event != 'end' or element.tag != tag:
            continue
        yield element
        root.clear()  # drop already-consumed subtrees
def __init__(self, sheet, window=None, namespace_tags=None):
    """Initialise a row set over one ODS sheet fragment.

    :param sheet: raw XML bytes/text of the sheet's table element.
    :param window: row window size (defaults to 1000).
    :param namespace_tags: optional (header, footer) wrapper pair; when
        omitted it is built from DEFAULT_NAMESPACES.
    """
    self.sheet = sheet
    self.name = "Unknown"
    m = ODS_TABLE_NAME.match(self.sheet)
    if m:
        self.name = m.groups(0)[0]
    if not PY2 and isinstance(self.name, bytes):
        self.name = self.name.decode('utf-8')
    self.window = window or 1000
    # We must wrap the XML fragments in a valid header otherwise iterparse
    # will explode with certain (undefined) versions of libxml2. The
    # namespaces are in the ODS file, and change with the libreoffice
    # version saving it, so get them from the ODS file if possible. The
    # default namespaces are an option to preserve backwards compatibility
    # of ODSRowSet.
    if namespace_tags:
        self.namespace_tags = namespace_tags
    else:
        namespaces = DEFAULT_NAMESPACES
        # BUG FIX: dict.iteritems() does not exist on Python 3, yet this
        # code explicitly supports Python 3 (see the PY2 check above);
        # .items() works on both versions.
        ods_header = u"<wrapper {0}>"\
            .format(" ".join('xmlns:{0}="{1}"'.format(k, v)
                             for k, v in namespaces.items())).encode('utf-8')
        ods_footer = u"</wrapper>".encode('utf-8')
        self.namespace_tags = (ods_header, ods_footer)
    self._row_matcher = ODS_ROW_MATCH
def raw(self, sample=False):
    """Iterate over all rows in this sheet, yielding one list of cells
    per row."""
    for row in self._row_matcher.findall(self.sheet):
        # Wrap the bare row fragment so libxml2 accepts it.
        block = self.namespace_tags[0] + row + self.namespace_tags[1]
        fragment = io.BytesIO(block)
        cells = []
        for action, element in etree.iterparse(fragment, ('end',)):
            if element.tag != _tag(NS_OPENDOCUMENT_TABLE, TABLE_CELL):
                continue
            cell = _read_cell(element)
            repeat = element.attrib.get(
                _tag(NS_OPENDOCUMENT_TABLE, COLUMN_REPEAT))
            if repeat:
                # A repeated cell stands for several identical columns.
                cells.extend([cell] * int(repeat))
            else:
                cells.append(cell)
        del fragment
        yield cells
def applicationCollectMac():
    """Collect installed-application info on OS X via system_profiler.

    Returns a list of dicts, one per application entry found in the
    SPApplicationsDataType XML (plist) report.
    """
    appArray = []
    # Execute system profiler; it emits a plist-style XML document on
    # stdout. (A sample-file variant existed for testing, as the
    # system_profiler command is a little slow.)
    appCollect = Popen(["system_profiler", "-detailLevel", "full",
                        "SPApplicationsDataType", "-xml"],
                       stdout=PIPE).communicate()[0]
    # BUG FIX: communicate()[0] already returns the output as a string;
    # the original called .read() on it, which only worked for the
    # commented-out open()-a-sample-file variant and raised
    # AttributeError here.
    xmlApp = appCollect
    xmlTree = etree.parse(StringIO(xmlApp))
    xmlRoot = xmlTree.getroot()
    # plist nesting: plist > array > dict > array (_items) > dict per app.
    # The original reused one shadowed loop variable for every level;
    # distinct names below preserve the same traversal.
    for arrayNode in xmlRoot:
        for dictNode in arrayNode:
            for itemsNode in dictNode:
                for appNode in itemsNode:
                    if appNode.tag == "dict":
                        appDict = {}
                        # Plist dicts alternate <key> and value elements.
                        tagKey = None
                        for field in appNode:
                            if field.tag == "key":
                                tagKey = field.text
                            else:
                                tagText = field.text
                                try:
                                    if tagText and tagKey:
                                        appDict[str(tagKey)] = str(tagText)
                                except:
                                    pass
                        appArray.append(appDict)
    return appArray
def process_file(self, file_name):
    """Read, preprocess and parse one XML file, then process the tree.

    On a parse failure, prints the current file name and the error context
    before re-raising the original exception.
    """
    data = self.read_file(file_name, self.encoding)
    data = self.preprocess_data(data)
    try:
        stream = IO_Stream(bytearray("\n".join(data), encoding="utf-8"))
        self.tree = ET.iterparse(stream)
        if self._strip_namespace:
            for _, element in self.tree:
                # '{namespace-uri}tag' -> 'tag'
                element.tag = element.tag.rpartition("}")[-1]
    except Exception as e:
        print(self._current_file)
        print_error_context(str(e), "\n".join(data).split("\n"))
        # BUG FIX: bare `raise` re-raises the active exception with its
        # original traceback intact; `raise e` restarts it from here.
        raise
    self.process_tree(self.tree)
def parse_xmlns(file, ns_map):
    """Parse *file*, appending each namespace declaration (prefix, uri)
    to *ns_map*, and return an ElementTree over the document root."""
    root = None
    for event, elem in etree.iterparse(file, ("start", "start-ns")):
        if event == "start-ns":
            ns_map.append(elem)
        elif event == "start" and root is None:
            # Remember the first element seen: the document root.
            root = elem
    return etree.ElementTree(root)
def get_root(self):
    """Parse self.file, strip the namespace from every tag, and return
    the document root."""
    iterator = ET.iterparse(self.file)
    for _, element in iterator:
        # Clark notation is '{uri}local'; keep only the local part.
        if '}' in element.tag:
            element.tag = element.tag.split('}', 1)[1]
    return iterator.root
def get_tag_attributes(source, tag_name):
    """Return the attribute dict of the first element named *tag_name*
    in *source*, or None when no such element exists."""
    events = etree.iterparse(source, ('start', 'end'))
    for event, node in events:
        if event == 'start':
            if xml._local_name(node) == tag_name:
                return node.attrib
        else:
            # Finished elements we are not interested in: free them.
            node.clear()
    return None
def iterparse_until(source, target_name, quit_name):
    """Yield each finished element named *target_name* from *source*,
    stopping as soon as an element named *quit_name* starts."""
    for event, node in etree.iterparse(source, ('start', 'end')):
        if event == 'start':
            if xml._local_name(node) == quit_name:
                break
            continue
        # 'end' events: hand matches to the caller, free everything else.
        if xml._local_name(node) == target_name:
            yield node
        else:
            node.clear()
def iter_tagseq(self, text, with_closing=False):
    """Yield the sequence of tag names encountered while parsing *text*
    as HTML; closing tags are prefixed with self.tagseq_close when
    *with_closing* is set."""
    if not isinstance(text, bytes):
        text = text.encode('utf8')
    events = ET.iterparse(BytesIO(text), events=("start", "end"),
                          tag="*", html=True, no_network=True)
    for action, node in events:
        if isinstance(node, HtmlComment):
            continue
        if action == 'end' and with_closing:
            yield self.tagseq_close + node.tag
        else:
            yield node.tag
def use_iso(self, iso):
    """Read version/build metadata from an ISO's embedded XML descriptor
    into self._values."""
    parsed = isoparser.parse(iso)
    payload = io.BytesIO(self._find_iso_content(parsed))
    for action, node in etree.iterparse(payload):
        text = node.text
        if not text:
            continue
        if node.tag == 'version':
            self._values['version'] = text
        elif node.tag == 'buildNumber':
            self._values['build'] = text
def process_notes():
    """Download the OSM notes dump and record #mapsme notes as Change rows.

    Streams the bz2-compressed notes file from NOTES_URI through a
    temporary file, then scans it for notes whose first comment mentions
    '#mapsme'. New notes are inserted; already-known notes are marked
    processed once they are closed.
    """
    database.connect()
    if not check_update():
        return
    response = urllib2.urlopen(NOTES_URI)
    # Parsing bz2 through a temporary file (the dump is too large to
    # hold in memory comfortably).
    tmpfile = TemporaryFile()
    while True:
        chunk = response.read(512*1024)
        if not chunk:
            break
        tmpfile.write(chunk)
    tmpfile.seek(0)
    # One DB transaction for the whole scan.
    with database.atomic():
        with BZ2File(tmpfile) as f:
            for event, element in etree.iterparse(f):
                if element.tag == 'note':
                    # element[0] is the opening comment; element[-1] is the
                    # latest comment/action on the note.
                    if len(element) > 0 and element[0].text and '#mapsme' in element[0].text:
                        note_id = element.get('id')
                        try:
                            # Known note: record closure time once closed.
                            ch = Change.get(Change.changeset == note_id, Change.action == 'n')
                            if element[-1].get('action') == 'closed' and ch.processed is None:
                                print('Found closed note {0}'.format(note_id))
                                ch.processed = hour_difference(ch.timestamp, element[-1].get('timestamp'))
                                ch.save()
                        except Change.DoesNotExist:
                            # New note: create a Change row of action 'n'.
                            ch = Change()
                            ch.action = 'n'
                            ch.version = ''
                            ch.changeset = note_id
                            # Anonymous notes have no uid on the first comment.
                            ch.user = element[0].get('user') if element[0].get('uid') else 'Anonymous Note'
                            print('Found new note {0} by {1}'.format(note_id, ch.user.encode('utf-8')))
                            ch.timestamp = datetime.strptime(element[0].get('timestamp'), '%Y-%m-%dT%H:%M:%SZ')
                            if element[-1].get('action') == 'closed' and ch.processed is None:
                                ch.processed = hour_difference(ch.timestamp, element[-1].get('timestamp'))
                            # Store (lon, lat) plus the note text as JSON.
                            changes = [(element.get('lon'), element.get('lat')), {'note': element[0].text}]
                            ch.changes = json.dumps(changes, ensure_ascii=False)
                            ch.save()
                # Free the parsed element; iterparse retains the tree
                # otherwise. (NOTE(review): placement reconstructed from a
                # single-line original — confirm against upstream.)
                element.clear()
def lxml_trace(data, html=True):
    """Print out the lxml events that occur during parsing.

    This lets you see how lxml parses a document when no Beautiful Soup
    code is running.
    """
    from lxml import etree
    document = StringIO(data)
    for evt, node in etree.iterparse(document, html=html):
        print("%s, %4s, %s" % (evt, node.tag, node.text))
def __init__(self, source):
    """Parse *source* and dispatch every (event, element) pair to
    self.handle()."""
    for pair in etree.iterparse(source):
        self.handle(*pair)
def get_namespace(message):
    """Return the XML namespace of the first 'Document' element found in
    the *message* bytes, or None when no such element starts."""
    stream = BytesIO(message)
    for _, element in etree.iterparse(stream, events=('start',)):
        qname = etree.QName(element)
        if qname.localname == 'Document':
            return qname.namespace