Python xml.etree.ElementTree 模块,iterparse() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用xml.etree.ElementTree.iterparse()

项目:gml_application_schema_toolbox    作者:BRGM    | 项目源码 | 文件源码
def xml_parse(xml_file):
    """
    Parse an XML file and collect its namespace declarations.

    Every collected prefix is also registered globally through
    ``ET.register_namespace``.

    :param xml_file: the input XML file (path or file object)
    :returns: (doc, ns_map) where doc is an ``ET.ElementTree`` and ns_map
              maps each declared prefix to its namespace URI
    """
    ns_map = {}  # prefix -> ns_uri
    root = None
    for event, payload in ET.iterparse(xml_file, ['start-ns', 'start', 'end']):
        if event == 'start-ns':
            # For 'start-ns' the payload is a (prefix, ns_uri) pair.
            prefix, ns_uri = payload
            ns_map[prefix] = ns_uri
        elif event == 'start' and root is None:
            # The first element that starts is the document root.
            root = payload

    for prefix in ns_map:
        ET.register_namespace(prefix, ns_map[prefix])

    return (ET.ElementTree(root), ns_map)
项目:defusedxml    作者:tiran    | 项目源码 | 文件源码
def _get_py3_cls():
    """Return the pure-Python ``XMLParser``, ``iterparse`` and ``ParseError``.

    Python 3.3 hides the pure Python code but defusedxml requires it.
    The code is based on test.support.import_fresh_module().

    The C accelerator ``_elementtree`` is temporarily blocked in
    ``sys.modules`` so that a fresh import of ``xml.etree.ElementTree``
    yields the pure-Python implementation.  The previous module state is
    restored afterwards, even when the import fails.

    :returns: tuple ``(XMLParser, iterparse, ParseError)`` taken from the
              freshly imported pure-Python module
    """
    pymodname = "xml.etree.ElementTree"
    cmodname = "_elementtree"

    pymod = sys.modules.pop(pymodname, None)
    cmod = sys.modules.pop(cmodname, None)

    # A None entry in sys.modules makes ``import _elementtree`` fail, which
    # forces ElementTree to fall back to its Python implementation.
    sys.modules[cmodname] = None
    try:
        pure_pymod = importlib.import_module(pymodname)
    finally:
        # Restore the accelerator entry (or remove the blocking placeholder).
        if cmod is not None:
            sys.modules[cmodname] = cmod
        else:
            sys.modules.pop(cmodname, None)
        # Restore the original module.  Never leave a None placeholder for
        # ElementTree: that would make every later import of it fail.
        if pymod is not None:
            sys.modules[pymodname] = pymod
        else:
            sys.modules.pop(pymodname, None)
        # The fresh import also rebinds the attribute on the parent package.
        etree_pkg = sys.modules.get("xml.etree")
        if etree_pkg is not None:
            if pymod is not None:
                etree_pkg.ElementTree = pymod
            elif hasattr(etree_pkg, "ElementTree"):
                del etree_pkg.ElementTree

    _XMLParser = pure_pymod.XMLParser
    _iterparse = pure_pymod.iterparse
    ParseError = pure_pymod.ParseError

    return _XMLParser, _iterparse, ParseError
项目:tinysearch    作者:jorendorff    | 项目源码 | 文件源码
def articles():
    """Yield an ``Article`` for each usable page in ``articles.xml.bz2``.

    A page is skipped when it lacks a title, revision or text element,
    when the title or text is empty, or when the title contains ``':'``.
    Yielded articles are numbered consecutively from 0.
    """
    ns = '{http://www.mediawiki.org/xml/export-0.10/}'
    n = 0
    with bz2.BZ2File("articles.xml.bz2", 'r') as infile:
        for event, elem in iterparse(infile, events=("start", "end")):
            if event == 'start':
                if elem.tag == ns + 'mediawiki':
                    root = elem
                continue
            if event != 'end':
                continue

            if elem.tag == ns + 'page':
                title_elem = elem.find(ns + 'title')
                if title_elem is None:
                    continue
                title = title_elem.text
                if title is None or ':' in title:
                    continue
                revision = elem.find(ns + 'revision')
                if revision is None:
                    continue
                text_elem = revision.find(ns + 'text')
                if text_elem is None:
                    continue
                text = text_elem.text
                if text is None:
                    continue

                yield Article(n, title, text)
                n += 1
            # Drop everything attached to the root so far to bound memory.
            root.clear()
项目:meter    作者:qianqians    | 项目源码 | 文件源码
def process_stream_iterparse(self, stream, heading=None):
        """Append the text of every ``si`` element in ``stream`` to the SST.

        ``stream`` is parsed incrementally with ``ET.iterparse``.  Each
        element whose tag equals ``U_SSML12 + 'si'`` is converted with
        ``get_text_from_si_or_is`` and appended to
        ``self.bk._sharedstrings``.  ``heading`` is only used for logging.
        """
        if self.verbosity >= 2 and heading is not None:
            fprintf(self.logfile, "\n=== %s ===\n", heading)
        shared_strings = self.bk._sharedstrings
        wanted_tag = U_SSML12 + 'si'
        seen = 0
        for _event, node in ET.iterparse(stream):
            if node.tag == wanted_tag:
                if self.verbosity >= 3:
                    fprintf(self.logfile, "element #%d\n", seen)
                    self.dump_elem(node)
                shared_strings.append(get_text_from_si_or_is(self, node))
                node.clear()  # release the children that were just consumed
                seen += 1
        if self.verbosity >= 2:
            self.dumpout('Entries in SST: %d', len(shared_strings))
        if self.verbosity >= 3:
            for index, text in enumerate(shared_strings):
                fprintf(self.logfile, "SST x=%d s=%r\n", index, text)
项目:route-plotter    作者:perimosocordiae    | 项目源码 | 文件源码
def gpx_parser(fh):
  """Yield ``(latlon, time, elev)`` for every ``trkpt`` element in a GPX source.

  ``latlon`` is a ``(lat, lon)`` tuple of floats read from the element's
  attributes, ``time`` is the text of the ``time`` child (``None`` when
  absent) and ``elev`` is the float value of the ``ele`` child (NaN when
  absent).

  :param fh: file object or path accepted by ``ElementTree.iterparse``
  :raises ValueError: if no element whose tag ends in ``}gpx`` is found
  """
  it = ElementTree.iterparse(fh, events=('start','end'))
  # look for the start gpx tag to fail fast
  for event, elem in it:
    if event == 'start' and elem.tag.endswith('}gpx'):
      break
  else:
    # Not every source has a .name (e.g. BytesIO or a plain path string);
    # fall back to the object itself so the intended ValueError is raised.
    raise ValueError('Not a gpx file: %s' % (getattr(fh, 'name', fh),))

  # do the main parse
  for event, elem in it:
    if event == 'end' and elem.tag.endswith('}trkpt'):
      latlon = (float(elem.attrib['lat']),
                float(elem.attrib['lon']))
      elev = np.nan
      time = None
      for child in elem:
        # [-1] also copes with children that carry no namespace.
        tag_name = child.tag.rsplit('}', 1)[-1]
        if tag_name == 'ele':
          elev = float(child.text)
        elif tag_name == 'time':
          time = child.text
      yield latlon, time, elev
      elem.clear()
项目:OneClickDTU    作者:satwikkansal    | 项目源码 | 文件源码
def process_stream_iterparse(self, stream, heading=None):
        """Append the text of every ``si`` element in ``stream`` to the SST.

        ``stream`` is parsed incrementally with ``ET.iterparse``.  Each
        element whose tag equals ``U_SSML12 + 'si'`` is converted with
        ``get_text_from_si_or_is`` and appended to
        ``self.bk._sharedstrings``.  ``heading`` is only used for logging.
        """
        if self.verbosity >= 2 and heading is not None:
            fprintf(self.logfile, "\n=== %s ===\n", heading)
        shared_strings = self.bk._sharedstrings
        wanted_tag = U_SSML12 + 'si'
        seen = 0
        for _event, node in ET.iterparse(stream):
            if node.tag == wanted_tag:
                if self.verbosity >= 3:
                    fprintf(self.logfile, "element #%d\n", seen)
                    self.dump_elem(node)
                shared_strings.append(get_text_from_si_or_is(self, node))
                node.clear()  # release the children that were just consumed
                seen += 1
        if self.verbosity >= 2:
            self.dumpout('Entries in SST: %d', len(shared_strings))
        if self.verbosity >= 3:
            for index, text in enumerate(shared_strings):
                fprintf(self.logfile, "SST x=%d s=%r\n", index, text)
项目:fxcmminer    作者:JamesKBowler    | 项目源码 | 文件源码
def _xml_to_dict(self, stream):
        """
        Read the locally saved XML file (``XML_FILE``) and convert it to a
        Python dictionary.

        Only ``symbol`` elements whose ``price-stream`` attribute equals
        ``stream`` are kept.

        :param stream: value the ``price-stream`` attribute must match
        :returns: dict mapping each symbol name to
                  ``{'attribs': <symbol attributes>,
                     'time-frames': {<timeframe name>: <timeframe attributes>}}``
        """
        print('[%%] Parsing catalogue to Dict')
        catalog = defaultdict(dict)
        for event, symbol in ET.iterparse(XML_FILE):
            if symbol.tag != 'symbol' or symbol.get('price-stream') != stream:
                continue
            entry = catalog[symbol.get('name')]
            entry['attribs'] = symbol.attrib
            time_frames = entry['time-frames'] = {}
            # Element.getchildren() was removed in Python 3.9; indexing the
            # element returns the same first child on every version.
            child = symbol[0]
            for tf in child.findall('timeframe'):
                time_frames[tf.get('name')] = tf.attrib
        return dict(catalog)
项目:fusion360-dxf-export    作者:opendesk    | 项目源码 | 文件源码
def process_stream_iterparse(self, stream, heading=None):
        """Append the text of every ``si`` element in ``stream`` to the SST.

        ``stream`` is parsed incrementally with ``ET.iterparse``.  Each
        element whose tag equals ``U_SSML12 + 'si'`` is converted with
        ``get_text_from_si_or_is`` and appended to
        ``self.bk._sharedstrings``.  ``heading`` is only used for logging.
        """
        if self.verbosity >= 2 and heading is not None:
            fprintf(self.logfile, "\n=== %s ===\n", heading)
        shared_strings = self.bk._sharedstrings
        wanted_tag = U_SSML12 + 'si'
        seen = 0
        for _event, node in ET.iterparse(stream):
            if node.tag == wanted_tag:
                if self.verbosity >= 3:
                    fprintf(self.logfile, "element #%d\n", seen)
                    self.dump_elem(node)
                shared_strings.append(get_text_from_si_or_is(self, node))
                node.clear()  # release the children that were just consumed
                seen += 1
        if self.verbosity >= 2:
            self.dumpout('Entries in SST: %d', len(shared_strings))
        if self.verbosity >= 3:
            for index, text in enumerate(shared_strings):
                fprintf(self.logfile, "SST x=%d s=%r\n", index, text)
项目:InternationalizationScript-iOS    作者:alexfeng    | 项目源码 | 文件源码
def process_stream_iterparse(self, stream, heading=None):
        """Append the text of every ``si`` element in ``stream`` to the SST.

        ``stream`` is parsed incrementally with ``ET.iterparse``.  Each
        element whose tag equals ``U_SSML12 + 'si'`` is converted with
        ``get_text_from_si_or_is`` and appended to
        ``self.bk._sharedstrings``.  ``heading`` is only used for logging.
        """
        if self.verbosity >= 2 and heading is not None:
            fprintf(self.logfile, "\n=== %s ===\n", heading)
        shared_strings = self.bk._sharedstrings
        wanted_tag = U_SSML12 + 'si'
        seen = 0
        for _event, node in ET.iterparse(stream):
            if node.tag == wanted_tag:
                if self.verbosity >= 3:
                    fprintf(self.logfile, "element #%d\n", seen)
                    self.dump_elem(node)
                shared_strings.append(get_text_from_si_or_is(self, node))
                node.clear()  # release the children that were just consumed
                seen += 1
        if self.verbosity >= 2:
            self.dumpout('Entries in SST: %d', len(shared_strings))
        if self.verbosity >= 3:
            for index, text in enumerate(shared_strings):
                fprintf(self.logfile, "SST x=%d s=%r\n", index, text)
项目:InternationalizationScript-iOS    作者:alexfeng    | 项目源码 | 文件源码
def process_stream_iterparse(self, stream, heading=None):
        """Append the text of every ``si`` element in ``stream`` to the SST.

        ``stream`` is parsed incrementally with ``ET.iterparse``.  Each
        element whose tag equals ``U_SSML12 + 'si'`` is converted with
        ``get_text_from_si_or_is`` and appended to
        ``self.bk._sharedstrings``.  ``heading`` is only used for logging.
        """
        if self.verbosity >= 2 and heading is not None:
            fprintf(self.logfile, "\n=== %s ===\n", heading)
        shared_strings = self.bk._sharedstrings
        wanted_tag = U_SSML12 + 'si'
        seen = 0
        for _event, node in ET.iterparse(stream):
            if node.tag == wanted_tag:
                if self.verbosity >= 3:
                    fprintf(self.logfile, "element #%d\n", seen)
                    self.dump_elem(node)
                shared_strings.append(get_text_from_si_or_is(self, node))
                node.clear()  # release the children that were just consumed
                seen += 1
        if self.verbosity >= 2:
            self.dumpout('Entries in SST: %d', len(shared_strings))
        if self.verbosity >= 3:
            for index, text in enumerate(shared_strings):
                fprintf(self.logfile, "SST x=%d s=%r\n", index, text)
项目:gml_application_schema_toolbox    作者:BRGM    | 项目源码 | 文件源码
def xml_parse(xml_file):
    """
    Parse an XML file and collect its namespace declarations.

    Every collected prefix is also registered globally through
    ``ET.register_namespace``.

    :param xml_file: the input XML file (path or file object)
    :returns: (doc, ns_map) where doc is an ``ET.ElementTree`` and ns_map
              maps each declared prefix to its namespace URI
    """
    ns_map = {}  # prefix -> ns_uri
    root = None
    for event, payload in ET.iterparse(xml_file, ['start-ns', 'start', 'end']):
        if event == 'start-ns':
            # For 'start-ns' the payload is a (prefix, ns_uri) pair.
            prefix, ns_uri = payload
            ns_map[prefix] = ns_uri
        elif event == 'start' and root is None:
            # The first element that starts is the document root.
            root = payload

    for prefix in ns_map:
        ET.register_namespace(prefix, ns_map[prefix])

    return (ET.ElementTree(root), ns_map)
项目:xls2lua-tools    作者:pepsigit    | 项目源码 | 文件源码
def process_stream_iterparse(self, stream, heading=None):
        """Append the text of every ``si`` element in ``stream`` to the SST.

        ``stream`` is parsed incrementally with ``ET.iterparse``.  Each
        element whose tag equals ``U_SSML12 + 'si'`` is converted with
        ``get_text_from_si_or_is`` and appended to
        ``self.bk._sharedstrings``.  ``heading`` is only used for logging.
        """
        if self.verbosity >= 2 and heading is not None:
            fprintf(self.logfile, "\n=== %s ===\n", heading)
        shared_strings = self.bk._sharedstrings
        wanted_tag = U_SSML12 + 'si'
        seen = 0
        for _event, node in ET.iterparse(stream):
            if node.tag == wanted_tag:
                if self.verbosity >= 3:
                    fprintf(self.logfile, "element #%d\n", seen)
                    self.dump_elem(node)
                shared_strings.append(get_text_from_si_or_is(self, node))
                node.clear()  # release the children that were just consumed
                seen += 1
        if self.verbosity >= 2:
            self.dumpout('Entries in SST: %d', len(shared_strings))
        if self.verbosity >= 3:
            for index, text in enumerate(shared_strings):
                fprintf(self.logfile, "SST x=%d s=%r\n", index, text)
项目:tools    作者:freedict    | 项目源码 | 文件源码
def parse(self):
        """Feed every completed element that precedes the body to ``handle_tag``.

        The namespace prefix of the root tag (up to and including ``}``),
        when present, is stored in ``self._namespace``.  Parsing stops as
        soon as an element whose tag ends in ``body`` starts.
        """
        events = iter(ET.iterparse(self.__input, events=("start", "end")))
        # The first event is the start of the root element.
        _first_event, root = next(events)
        brace = root.tag.find('}')
        if brace > 0:
            self._namespace = root.tag[:brace + 1]

        for kind, node in events:
            if kind != 'start':
                # 'end' event: the element is fully populated.
                self.handle_tag(node)
            elif node.tag.endswith('body'):
                break  # do not parse body
项目:tools    作者:freedict    | 项目源码 | 文件源码
def attach_xml_body(tei_file, xml_entries):
    """Build a TEI tree whose text element contains only the given entries.

    The TEI XML file is read from its root until an element tagged exactly
    ``body`` starts.  The first child of the root whose tag ends in ``text``
    is then emptied and receives a new ``body`` element holding
    ``xml_entries``.  The TEI namespace is registered as the default
    namespace.

    :returns: an ``ET.ElementTree`` rooted at the file's root element
    """
    stream = ET.iterparse(tei_file, events=["start"])
    _, root = next(stream)
    # NOTE: this is an exact comparison, so a namespaced body tag does not
    # match and the loop then simply consumes the remaining input.
    for _, node in stream:
        if node.tag == 'body':
            break

    text = next(child for child in root if child.tag.endswith('text'))
    text.clear()  # discard whatever content the file already had
    body = ET.SubElement(text, 'body')
    for entry in xml_entries:
        body.append(entry)
    ET.register_namespace('', 'http://www.tei-c.org/ns/1.0')
    return ET.ElementTree(root)
项目:pyopendoc    作者:mbwk    | 项目源码 | 文件源码
def _parse(self, filebytes):
        """Parse ``filebytes`` and keep namespace declarations as attributes.

        Each namespace declared on an element is written back onto that
        element as an attribute named ``"<self.xmlns_str>:<prefix>"`` whose
        value is the namespace URI.

        :param filebytes: the raw XML document as bytes
        :returns: an ``ET.ElementTree`` rooted at the document's root element
        """
        events = "start", "start-ns"
        root = None
        ns_map = []

        # The context manager closes the temporary file even when the
        # parser raises on malformed input.
        with tempfile.TemporaryFile() as tf:
            tf.write(filebytes)
            tf.seek(0)

            for event, elem in ET.iterparse(tf, events):
                if event == "start-ns":
                    # 'start-ns' events arrive before the 'start' event of
                    # the element that declares them; buffer until then.
                    ns_map.append(elem)
                elif event == "start":
                    if root is None:
                        root = elem
                    for prefix, uri in ns_map:
                        elem.set("{}:{}".format(self.xmlns_str, prefix), uri)
                    ns_map = []

        return ET.ElementTree(root)
项目:xls2lua    作者:jiangzhhhh    | 项目源码 | 文件源码
def process_stream_iterparse(self, stream, heading=None):
        """Append the text of every ``si`` element in ``stream`` to the SST.

        ``stream`` is parsed incrementally with ``ET.iterparse``.  Each
        element whose tag equals ``U_SSML12 + 'si'`` is converted with
        ``get_text_from_si_or_is`` and appended to
        ``self.bk._sharedstrings``.  ``heading`` is only used for logging.
        """
        if self.verbosity >= 2 and heading is not None:
            fprintf(self.logfile, "\n=== %s ===\n", heading)
        shared_strings = self.bk._sharedstrings
        wanted_tag = U_SSML12 + 'si'
        seen = 0
        for _event, node in ET.iterparse(stream):
            if node.tag == wanted_tag:
                if self.verbosity >= 3:
                    fprintf(self.logfile, "element #%d\n", seen)
                    self.dump_elem(node)
                shared_strings.append(get_text_from_si_or_is(self, node))
                node.clear()  # release the children that were just consumed
                seen += 1
        if self.verbosity >= 2:
            self.dumpout('Entries in SST: %d', len(shared_strings))
        if self.verbosity >= 3:
            for index, text in enumerate(shared_strings):
                fprintf(self.logfile, "SST x=%d s=%r\n", index, text)
项目:multiplierz    作者:BlaisProteomics    | 项目源码 | 文件源码
def mzmlToSqlite(xmlfile, sqlitefile):
    """Stream spectra and chromatograms from ``xmlfile`` to a writer process.

    A separate process running ``mzmlToSqlite_writer`` receives
    ``(kind, data)`` tuples through a queue, followed by a final
    ``('stop', None)`` marker; this function then waits for it to finish.

    :returns: ``sqlitefile``, unchanged
    """
    events = xml.iterparse(xmlfile)

    queue = multiprocessing.Queue()
    writer = multiprocessing.Process(target=mzmlToSqlite_writer,
                                     args=(sqlitefile, queue))
    writer.start()

    for _evt, node in events:
        if node.tag == ns('spectrum'):
            record = ('spectrum', readSpectrumXML(node))
        elif node.tag == ns('chromatogram'):
            record = ('chromatogram', readChromatoXML(node))
        else:
            continue
        queue.put(record)
        node.clear()  # the element's data has been extracted

    queue.put(('stop', None))

    writer.join()
    return sqlitefile
项目:mes    作者:osess    | 项目源码 | 文件源码
def process_stream_iterparse(self, stream, heading=None):
        """Append the text of every ``si`` element in ``stream`` to the SST.

        ``stream`` is parsed incrementally with ``ET.iterparse``.  Each
        element whose tag equals ``U_SSML12 + 'si'`` is converted with
        ``get_text_from_si_or_is`` and appended to
        ``self.bk._sharedstrings``.  ``heading`` is only used for logging.
        """
        if self.verbosity >= 2 and heading is not None:
            fprintf(self.logfile, "\n=== %s ===\n", heading)
        shared_strings = self.bk._sharedstrings
        wanted_tag = U_SSML12 + 'si'
        seen = 0
        for _event, node in ET.iterparse(stream):
            if node.tag == wanted_tag:
                if self.verbosity >= 3:
                    fprintf(self.logfile, "element #%d\n", seen)
                    self.dump_elem(node)
                shared_strings.append(get_text_from_si_or_is(self, node))
                node.clear()  # release the children that were just consumed
                seen += 1
        if self.verbosity >= 2:
            self.dumpout('Entries in SST: %d', len(shared_strings))
        if self.verbosity >= 3:
            for index, text in enumerate(shared_strings):
                fprintf(self.logfile, "SST x=%d s=%r\n", index, text)
项目:deeppavlov    作者:deepmipt    | 项目源码 | 文件源码
def clean_dataset(path):
    """Remove duplicates from the dataset and write clean data in .tsv files

    Every ``paraphrase`` element contributes one row ``[y, text_1, text_2]``
    where ``y`` is 1 when the ``class`` value is >= 0 and 0 otherwise.  A
    pair of texts that was already written is skipped.  The output file has
    the same name as ``path`` with its extension replaced by ``.tsv``.

    Args:
        path: a path to the dataset
    """

    with open(path, 'r') as labels_file:
        context = ET.iterparse(labels_file, events=("start", "end"))
        # turn it into an iterator
        context = iter(context)
        # get the root element
        event, root = next(context)

        # newline='' is what the csv module requires for files handed to
        # csv.writer; without it extra '\r' characters appear on platforms
        # that translate line endings.
        with open(os.path.splitext(path)[0] + '.tsv', 'w', newline='') as tsv_file:
            writer = csv.writer(tsv_file, delimiter='\t')

            same_set = set()

            for event, elem in context:
                if event == "end" and elem.tag == "paraphrase":
                    question = []
                    y = None
                    for child in elem.iter():
                        name = child.get('name')
                        if name in ('text_1', 'text_2'):
                            question.append(child.text)
                        elif name == 'class':
                            y = 1 if int(child.text) >= 0 else 0
                    # Everything needed has been read; free parsed elements.
                    root.clear()
                    check_string = "\n".join(question)
                    if check_string not in same_set:
                        writer.writerow([y, question[0], question[1]])
                        same_set.add(check_string)
项目:setlr    作者:tetherless-world    | 项目源码 | 文件源码
def iterparse(self, file):
        """Return ``create_fa().iterparse(file, self.validate_dtd)``."""
        fa = self.create_fa()
        return fa.iterparse(file, self.validate_dtd)
    # I need a better name
项目:setlr    作者:tetherless-world    | 项目源码 | 文件源码
def handler_parse(self, file, state=None):
        """Run ``self.parse(file, state)`` to completion, discarding its items."""
        pending = self.parse(file, state)
        for _ignored in pending:
            pass

    # I plan to implement 'iterparse' as a near copy of 'parse'
    # but without any references to callbacks
项目:setlr    作者:tetherless-world    | 项目源码 | 文件源码
def iterparse(self, file, validate_dtd=False):
        """Return ``self.parse`` for ``file`` with ``None`` as the state argument."""
        state = None
        return self.parse(file, state, validate_dtd)
项目:setlr    作者:tetherless-world    | 项目源码 | 文件源码
def test_parse():
    """Manual smoke test that prints track info from a local iTunes library.

    The test is skipped (with a message) when the hard-coded library file
    does not exist.  It exercises ``IterParseFilter`` twice: once through an
    ``on_end`` callback and once through the iterator interface.

    NOTE(review): uses Python 2 syntax (``print`` statement, ``.next()``).
    """
    import os
    filename = "/Users/dalke/Music/iTunes/iTunes Music Library.xml"
    if not os.path.exists(filename):
        print "Cannot find %r: skipping test" % (filename,)
        return

    # Work through callbacks
    ef = IterParseFilter()
    def print_info(event, ele, state):
        # Children are consumed pairwise: one child supplies the key text
        # and the child that follows it supplies the value text.
        d = {}
        children = iter(ele)
        for child in children:
            key = child.text
            value = children.next().text
            d[key] = value
        print "%r is by %r" % (d["Name"], d.get("Artist", "<unknown>"))
        ele.clear()

    ef.on_end("/plist/dict/dict/dict", print_info)
    ef.handler_parse(open(filename))

    # Work through iterators
    ef = IterParseFilter()
    ef.iter_end("/plist/dict/dict/dict")
    for (event, ele) in ef.iterparse(open(filename)):
        # Same pairwise key/value walk as in print_info above.
        d = {}
        children = iter(ele)
        for child in children:
            key = child.text
            value = children.next().text
            d[key] = value
        print "%r is a %r song" % (d["Name"], d.get("Genre", "<unknown>"))
        ele.clear()
项目:retrieve-and-rank-tuning    作者:rchaks    | 项目源码 | 文件源码
def document_corpus_as_iterable(corpus):
    """Yield ``_parse_doc_elements(element)`` for every ``doc`` element.

    ``corpus`` is opened with ``smart_file_open`` and parsed incrementally;
    only completed (``'end'``) elements tagged ``doc`` are converted.
    """
    stats = defaultdict(int)
    with smart_file_open(corpus) as infile:
        LOGGER.info("Loading documents from solr xml file: %s" % corpus)
        for event, element in ET.iterparse(infile):
            if event != 'end' or element.tag != 'doc':
                continue
            stats['num_xml_entries'] += 1
            yield _parse_doc_elements(element)
项目:AlexaPi    作者:alexa-pi    | 项目源码 | 文件源码
def parse_new_asx(data):
    """Yield the fixed-up ``href`` of every ASX entry reference in ``data``.

    Copied from mopidy.audio.playlists.  Tags are lower-cased as elements
    complete.  For each element, hrefs of ``entry/ref`` matches are yielded
    first, then hrefs found directly on ``entry`` children.  A parse error
    silently ends the iteration.
    """
    xpaths = ('entry/ref[@href]', 'entry[@href]')
    try:
        for _, node in elementtree.iterparse(data):
            node.tag = node.tag.lower()  # normalize
            for xpath in xpaths:
                for match in node.findall(xpath):
                    yield fix_asf_uri(match.get('href', '').strip())
    except elementtree.ParseError:
        return
项目:ns3-rdma    作者:bobzhuyb    | 项目源码 | 文件源码
def main(argv):
    """Read the XML file named by ``argv[1]`` and print per-flow statistics.

    Each ``FlowMonitor`` element that closes at nesting level 0 is wrapped
    in a ``Simulation``.  A summary is then printed for every flow of every
    simulation.

    NOTE(review): uses Python 2 ``print`` statements.
    """
    file_obj = open(argv[1])
    print "Reading XML file ",

    sys.stdout.flush()        
    # Nesting depth: +1 on every start event, -1 on every end event, so it
    # is 0 exactly when the outermost element closes.
    level = 0
    sim_list = []
    for event, elem in ElementTree.iterparse(file_obj, events=("start", "end")):
        if event == "start":
            level += 1
        if event == "end":
            level -= 1
            if level == 0 and elem.tag == 'FlowMonitor':
                sim = Simulation(elem)
                sim_list.append(sim)
                elem.clear() # won't need this any more
                # One progress dot per processed element.
                sys.stdout.write(".")
                sys.stdout.flush()
    print " done."


    for sim in sim_list:
        for flow in sim.flows:
            t = flow.fiveTuple
            # Only protocol numbers 6 and 17 are mapped; any other value
            # raises KeyError here.
            proto = {6: 'TCP', 17: 'UDP'} [t.protocol]
            print "FlowID: %i (%s %s/%s --> %s/%i)" % \
                (flow.flowId, proto, t.sourceAddress, t.sourcePort, t.destinationAddress, t.destinationPort)
            print "\tTX bitrate: %.2f kbit/s" % (flow.txBitrate*1e-3,)
            print "\tRX bitrate: %.2f kbit/s" % (flow.rxBitrate*1e-3,)
            print "\tMean Delay: %.2f ms" % (flow.delayMean*1e3,)
            print "\tPacket Loss Ratio: %.2f %%" % (flow.packetLossRatio*100)
项目:gennotes    作者:madprime    | 项目源码 | 文件源码
def _get_elements(self, fp, tag):
        '''
            Lazily yield every completed element of ``fp`` whose tag equals
            ``tag``.

            After the consumer is done with a yielded element (that is, when
            the next one is requested) the root element is cleared to keep
            memory usage low.
        '''
        events = iter(ET.iterparse(fp, events=('start', 'end')))
        root = next(events)[1]  # the first event carries the root element
        for kind, node in events:
            if kind != 'end' or node.tag != tag:
                continue
            yield node
            root.clear()  # preserve memory
项目:dpla-service-hub    作者:KnowledgeLinks    | 项目源码 | 文件源码
def iterate_dc_xml(**kwargs):
    """Transform Dublin Core ``Description`` records and serialize as turtle.

    Keyword arguments read from ``kwargs``:
        in_file: path of the XML file to read
        ingester: object providing ``transform(xml_bytes)`` and ``graph``
        shard_size: records per output shard; a value <= 0 (default -1) or
            ``None`` disables sharding
        output_dir: directory for output files (default
            ``<PROJECT_BASE>/output``)

    NOTE(review): the block marked "DEBUG code" below writes a test file and
    stops the loop once ``count`` reaches 10 — confirm this is still wanted.
    """
    from bibcat.ingesters.ingester import new_graph
    import xml.etree.ElementTree as etree
    filepath = kwargs.get("in_file")
    ingester = kwargs.get("ingester")
    shard_size = kwargs.get("shard_size", -1)
    output_dir = kwargs.get("output_dir",
        os.path.abspath(os.path.join(PROJECT_BASE, "output")))
    start = datetime.datetime.utcnow()
    click.echo("Starting DC XML at {} for records at {}".format(
        start,
        filepath))
    count = 0
    shard_template = "dc-{}k-{}k.ttl"
    if shard_size is not None and shard_size > 0:
        shard_name = shard_template.format(count, shard_size)
    shard_graph = new_graph()
    for event, elem in etree.iterparse(filepath):
        if event.startswith('end') and \
           elem.tag.endswith("Description"):
            ingester.transform(etree.tostring(elem))
            shard_graph += ingester.graph
            if not count%10 and count > 0:
                click.echo(".", nl=False)
                #! DEBUG code
                with open(os.path.join(output_dir, "dpl-dc-test.ttl"), "wb+") as fo:
                    fo.write(shard_graph.serialize(format='turtle'))
                break
            if not count%100:
                click.echo(count, nl=False)
            if shard_size is not None and shard_size > 0 and not count%shard_size:
                with open(os.path.join(output_dir, shard_name), 'wb+') as fo:
                    fo.write(shard_graph.serialize(format='turtle'))
                shard_graph = new_graph()
                shard_name = shard_template.format(count, count+shard_size)
            count += 1
    end = datetime.datetime.utcnow()
    # total_seconds() covers the whole duration; timedelta.seconds holds
    # only the sub-day remainder and would under-report runs over 24 hours.
    click.echo("Finished DC ingestion at {} total time of {} mins for {}".format(
        end,
        (end-start).total_seconds() / 60.0,
        count))
项目:uac-a-mola    作者:ElevenPaths    | 项目源码 | 文件源码
def parse(self):
        """Parse ``self.path`` and collect matching events in ``self.events``.

        An ``event`` element is kept (appended under its Operation text)
        when the Operation contains ``Reg`` or ``CreateFile``, the Path
        contains neither ``HKLM`` nor ``HKCR`` and the Result contains
        ``NAME NOT FOUND``.  Other ``event`` elements that have an Operation
        child are cleared.

        Returns ``self.events`` on success.  On any exception a failure
        message is printed and ``None`` is returned implicitly.

        NOTE(review): uses Python 2 ``print`` statements.
        """

        tree = iterparse(self.path)

        file_size = int(os.path.getsize(self.path))

        # Only the last backslash-separated path component is displayed.
        print "\n[*] PARSING FILE: " \
            + colored(self.path.split("\\")[-1], 'yellow', attrs=['bold'])

        print "[*] FILE SIZE: " + \
            colored("%d MB" % (file_size / 1024 / 1024),
                    'yellow', attrs=['bold'])

        print "[*] BUILDING THE STRUCTURES WILL TAKE SOME TIME"

        try:
            for event, elem in tree:
                operation = elem.find('Operation')
                if elem.tag == 'event' and operation is not None:
                    if ('Reg' in operation.text or 'CreateFile' in operation.text) \
                       and 'HKLM' not in elem.find('Path').text \
                       and 'HKCR' not in elem.find('Path').text \
                       and 'NAME NOT FOUND' in elem.find('Result').text:
                        self.events[operation.text].append(elem)
                    else:
                        # Not interesting: drop the element's content.
                        elem.clear()

            print colored("[*] PARSING FINISHED CORRECTLY\n",
                          'green', attrs=['bold'])

            return self.events

        except Exception as error:
            print colored("[*] PARSING FAILED", 'red', attrs=['bold'])
            print colored(" => " + str(error), 'red', attrs=['bold'])
项目:nzb-monkey    作者:nzblnk    | 项目源码 | 文件源码
def get_etree_iter(xml):
        """Return an iterator of ``('start' | 'end', element)`` events for ``xml`` bytes."""
        source = io.BytesIO(xml)
        events = ET.iterparse(source, events=('start', 'end'))
        return iter(events)
项目:edx_xblock_scorm    作者:raccoongang    | 项目源码 | 文件源码
def set_fields_xblock(self, path_to_file):
        """Derive ``self.scorm_file`` and ``self.version_scorm`` from the manifest.

        Reads ``<path_to_file>/imsmanifest.xml``.  The ``href`` of the first
        ``resources/resource`` element becomes the index page (default
        ``index.html``).  ``self.version_scorm`` is set to ``'SCORM_2004'``
        when a ``metadata/schemaversion`` element exists and its text is not
        exactly ``1.2``.  If the manifest cannot be opened, the default
        index page is used and the version is left untouched.

        :param path_to_file: directory that contains ``imsmanifest.xml``
        """
        path_index_page = 'index.html'
        manifest = '{}/imsmanifest.xml'.format(path_to_file)
        try:
            tree = ET.parse(manifest)
        except IOError:
            # No readable manifest: fall back to the defaults.
            pass
        else:
            # The default namespace (empty prefix) qualifies the tag names
            # that are looked up below.
            namespace = ''
            for prefix, uri in [ns for _, ns in ET.iterparse(manifest, events=['start-ns'])]:
                if prefix == '':
                    namespace = uri
                    break
            root = tree.getroot()

            if namespace:
                resource = root.find('{{{0}}}resources/{{{0}}}resource'.format(namespace))
                schemaversion = root.find('{{{0}}}metadata/{{{0}}}schemaversion'.format(namespace))
            else:
                resource = root.find('resources/resource')
                schemaversion = root.find('metadata/schemaversion')

            # An Element's truth value reflects whether it has children, so
            # a childless <resource> would be ignored by ``if resource:``.
            if resource is not None:
                path_index_page = resource.get('href', path_index_page)

            # Escape the dot so that only the literal text "1.2" matches.
            if (schemaversion is not None) and (re.match(r'^1\.2$', schemaversion.text) is None):
                self.version_scorm = 'SCORM_2004'

        self.scorm_file = os.path.join(settings.PROFILE_IMAGE_BACKEND['options']['base_url'],
                                       '{}/{}'.format(self.location.block_id, path_index_page))
项目:coquery    作者:gkunter    | 项目源码 | 文件源码
def process_file(self, file_name):
        """Read, preprocess and parse ``file_name``, then process the result.

        The preprocessed lines are joined with newlines, encoded as UTF-8
        and handed to ``ET.iterparse``; the resulting iterator is stored in
        ``self.tree`` and passed to ``self.process_tree``.  Any exception
        raised while parsing is reported with context and re-raised.
        """
        data = self.read_file(file_name, self.encoding)
        data = self.preprocess_data(data)
        try:
            stream = IO_Stream(bytearray("\n".join(data), encoding="utf-8"))
            self.tree = ET.iterparse(stream)
            if self._strip_namespace:
                # Keep only the part of each tag after the last '}'.
                # NOTE(review): this loop consumes the iterparse iterator
                # before process_tree() receives it — confirm that
                # process_tree() does not need to iterate it again.
                for _, element in self.tree:
                    element.tag = element.tag.rpartition("}")[-1]
        except Exception as e:
            print(self._current_file)
            print_error_context(str(e), "\n".join(data).split("\n"))
            raise e
        self.process_tree(self.tree)
项目:rupo    作者:IlyaGusev    | 项目源码 | 文件源码
def __xml_iter(file, tag):
        """
        :param file: XML source (path or file object) handed to ``iterparse``.
        :param tag: tag name to look for.
        :return: generator over every completed element whose tag equals ``tag``.
        """
        events = etree.iterparse(file, events=['end'])
        return (node for kind, node in events if kind == 'end' and node.tag == tag)
项目:python-cookbook-3rd    作者:tuanavu    | 项目源码 | 文件源码
def parse_and_remove(filename, path):
    """Incrementally yield the elements found at ``path`` below the root.

    ``path`` is a ``/``-separated list of tag names relative to the root
    element (the root's own tag is not part of it).  Each matching element
    is yielded once it is complete and is detached from its parent when the
    generator resumes, so memory stays bounded on large documents.

    :param filename: path or file object accepted by ``iterparse``
    :param path: e.g. ``'rows/row'``; single-component paths also work
    """
    path_parts = path.split('/')
    doc = iterparse(filename, ('start', 'end'))
    # Skip the root element, but keep it: it is the parent of every
    # element matched by a single-component path.
    _, root = next(doc)

    tag_stack = []
    elem_stack = []
    for event, elem in doc:
        if event == 'start':
            tag_stack.append(elem.tag)
            elem_stack.append(elem)
        elif event == 'end':
            if tag_stack == path_parts:
                yield elem
                parent = elem_stack[-2] if len(elem_stack) > 1 else root
                parent.remove(elem)
            # The stacks are empty only for the root's own 'end' event.
            if tag_stack:
                tag_stack.pop()
                elem_stack.pop()

# Find zip code with most potholes
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def iterators():
    # Doctest container: the docstring below is the test itself, so its
    # text must stay exactly as written.  It relies on the helpers
    # `summarize`, `summarize_list` and `serialize`, which are not defined
    # in this function.
    """
    Test iterators.

    >>> e = ET.XML("<html><body>this is a <i>paragraph</i>.</body>..</html>")
    >>> summarize_list(e.iter())
    ['html', 'body', 'i']
    >>> summarize_list(e.find("body").iter())
    ['body', 'i']
    >>> summarize(next(e.iter()))
    'html'
    >>> "".join(e.itertext())
    'this is a paragraph...'
    >>> "".join(e.find("body").itertext())
    'this is a paragraph.'
    >>> next(e.itertext())
    'this is a '

    Method iterparse should return an iterator. See bug 6472.

    >>> sourcefile = serialize(e, to_string=False)
    >>> next(ET.iterparse(sourcefile))  # doctest: +ELLIPSIS
    ('end', <Element 'i' at 0x...>)

    >>> tree = ET.ElementTree(None)
    >>> tree.iter()
    Traceback (most recent call last):
    AttributeError: 'NoneType' object has no attribute 'iter'
    """
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def iterators():
    # Doctest container: the docstring below is the test itself, so its
    # text must stay exactly as written.  It relies on the helpers
    # `summarize`, `summarize_list` and `serialize`, which are not defined
    # in this function.
    """
    Test iterators.

    >>> e = ET.XML("<html><body>this is a <i>paragraph</i>.</body>..</html>")
    >>> summarize_list(e.iter())
    ['html', 'body', 'i']
    >>> summarize_list(e.find("body").iter())
    ['body', 'i']
    >>> summarize(next(e.iter()))
    'html'
    >>> "".join(e.itertext())
    'this is a paragraph...'
    >>> "".join(e.find("body").itertext())
    'this is a paragraph.'
    >>> next(e.itertext())
    'this is a '

    Method iterparse should return an iterator. See bug 6472.

    >>> sourcefile = serialize(e, to_string=False)
    >>> next(ET.iterparse(sourcefile))  # doctest: +ELLIPSIS
    ('end', <Element 'i' at 0x...>)

    >>> tree = ET.ElementTree(None)
    >>> tree.iter()
    Traceback (most recent call last):
    AttributeError: 'NoneType' object has no attribute 'iter'
    """
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def iterators():
    # Doctest container: the docstring below is the test itself, so its
    # text must stay exactly as written.  It relies on the helpers
    # `summarize`, `summarize_list` and `serialize`, which are not defined
    # in this function.
    """
    Test iterators.

    >>> e = ET.XML("<html><body>this is a <i>paragraph</i>.</body>..</html>")
    >>> summarize_list(e.iter())
    ['html', 'body', 'i']
    >>> summarize_list(e.find("body").iter())
    ['body', 'i']
    >>> summarize(next(e.iter()))
    'html'
    >>> "".join(e.itertext())
    'this is a paragraph...'
    >>> "".join(e.find("body").itertext())
    'this is a paragraph.'
    >>> next(e.itertext())
    'this is a '

    Method iterparse should return an iterator. See bug 6472.

    >>> sourcefile = serialize(e, to_string=False)
    >>> next(ET.iterparse(sourcefile))  # doctest: +ELLIPSIS
    ('end', <Element 'i' at 0x...>)

    >>> tree = ET.ElementTree(None)
    >>> tree.iter()
    Traceback (most recent call last):
    AttributeError: 'NoneType' object has no attribute 'iter'
    """
项目:script.tvguide.fullscreen    作者:primaeval    | 项目源码 | 文件源码
def getDataFromExternal2(self, xmltvFile, date, ch_list, progress_callback=None):
        """Parse ``xmltvFile`` via ``self.parseXMLTV`` when the file exists.

        Returns the result of ``parseXMLTV``, or ``None`` when the file does
        not exist or its ``FileWrapper`` is falsy.  ``date`` and ``ch_list``
        are not used here.
        """
        if not xbmcvfs.exists(xmltvFile):
            return None
        wrapper = FileWrapper(xmltvFile)
        if not wrapper:
            return None
        events = ElementTree.iterparse(wrapper, events=("start", "end"))
        total_size = wrapper.size
        return self.parseXMLTV(events, wrapper, total_size, self.logoFolder, progress_callback)
项目:meter    作者:qianqians    | 项目源码 | 文件源码
def ensure_elementtree_imported(verbosity, logfile):
    """Bind the module-level ``ET`` to an ElementTree implementation (once).

    Does nothing when ``ET`` is already set.  Otherwise the first importable
    implementation from a fixed preference list is bound to ``ET``, and the
    globals ``ET_has_iterparse`` and ``Element_has_iter`` are updated.

    :param verbosity: when truthy, a diagnostic line is printed to ``logfile``
    :param logfile: file object that receives the diagnostic line
    :raises Exception: if no ElementTree implementation can be imported
    """
    global ET, ET_has_iterparse, Element_has_iter
    if ET is not None:
        return
    if "IronPython" in sys.version:
        import xml.etree.ElementTree as ET
        #### 2.7.2.1: fails later with 
        #### NotImplementedError: iterparse is not supported on IronPython. (CP #31923)
    else:
        # Try the candidates in order; the first successful import wins.
        try: import xml.etree.cElementTree as ET
        except ImportError:
            try: import cElementTree as ET
            except ImportError:
                try: import lxml.etree as ET
                except ImportError:
                    try: import xml.etree.ElementTree as ET
                    except ImportError:
                        try: import elementtree.ElementTree as ET
                        except ImportError:
                            raise Exception("Failed to import an ElementTree implementation")
    if hasattr(ET, 'iterparse'):
        # Probe with an empty stream: an implementation may expose
        # iterparse yet raise NotImplementedError when it is called.
        _dummy_stream = BYTES_IO(b'')
        try:
            ET.iterparse(_dummy_stream)
            ET_has_iterparse = True
        except NotImplementedError:
            pass
    Element_has_iter = hasattr(ET.ElementTree, 'iter')
    if verbosity:
        # Collect every module attribute whose name, lower-cased and with
        # underscores removed, equals 'version'.
        etree_version = repr([
            (item, getattr(ET, item))
            for item in ET.__dict__.keys()
            if item.lower().replace('_', '') == 'version'
            ])
        print(ET.__file__, ET.__name__, etree_version, ET_has_iterparse, file=logfile)
项目:meter    作者:qianqians    | 项目源码 | 文件源码
def own_process_stream(self, stream, heading=None):
        """Stream-parse *stream* and dispatch on three element tags.

        Each element completed by ``ET.iterparse`` is compared against tags
        formed by prefixing ``U_SSML12``: ``row`` elements are passed to
        ``self.do_row`` and then cleared, ``dimension`` elements are passed to
        ``self.do_dimension`` and ``mergeCell`` elements to
        ``self.do_merge_cell``.  Any other element is ignored.
        ``self.finish_off()`` is called once the stream is exhausted.

        :param stream: XML source accepted by ``ET.iterparse``
        :param heading: optional title; when given and ``self.verbosity >= 2``
            a banner containing it is written to ``self.logfile``
        """
        if self.verbosity >= 2 and heading is not None:
            fprintf(self.logfile, "\n=== %s ===\n", heading)
        # Build the tags once instead of concatenating them for every element.
        row_tag = U_SSML12 + "row"
        dimension_tag = U_SSML12 + "dimension"
        merge_cell_tag = U_SSML12 + "mergeCell"
        self_do_row = self.do_row
        for _event, elem in ET.iterparse(stream):
            tag = elem.tag
            if tag == row_tag:
                self_do_row(elem)
                elem.clear()  # destroy all child elements (cells)
            elif tag == dimension_tag:
                self.do_dimension(elem)
            elif tag == merge_cell_tag:
                self.do_merge_cell(elem)
        self.finish_off()
项目:route-plotter    作者:perimosocordiae    | 项目源码 | 文件源码
def tcx_parser(fh):
  """Yield ``(latlon, time, elev)`` for every positioned Trackpoint in a TCX source.

  The source is parsed incrementally.  Parsing first advances to a start tag
  whose name ends in ``}TrainingCenterDatabase``; after that each completed
  ``Trackpoint`` element is inspected and then cleared.

  :param fh: file object or path accepted by ``ElementTree.iterparse``
  :returns: generator of ``((lat, lon), time, elev)`` tuples where ``lat`` and
      ``lon`` are floats, ``time`` is the text of the ``Time`` child (``None``
      when absent) and ``elev`` is the ``AltitudeMeters`` value as a float
      (``nan`` when absent).  Trackpoints without a ``Position`` are skipped.
  :raises ValueError: if no ``TrainingCenterDatabase`` start tag is found
  """
  it = ElementTree.iterparse(fh, events=('start', 'end'))
  # look for the start TrainingCenterDatabase tag to fail fast
  for event, elem in it:
    if event == 'start' and elem.tag.endswith('}TrainingCenterDatabase'):
      break
  else:
    # fh may be a path or a nameless stream (e.g. BytesIO); describe it with
    # its name when it has one and with the object itself otherwise.
    raise ValueError('Not a tcx file: %s' % (getattr(fh, 'name', fh),))

  # do the main parse
  for event, elem in it:
    if event != 'end' or not elem.tag.endswith('}Trackpoint'):
      continue
    latlon = None
    elev = np.nan
    time = None
    for child in elem:
      # [-1] keeps the whole tag when it carries no '{namespace}' part.
      tag_name = child.tag.rsplit('}', 1)[-1]
      if tag_name == 'Time':
        time = child.text
      elif tag_name == 'AltitudeMeters':
        elev = float(child.text)
      elif tag_name == 'Position':
        vals = {c.tag.rsplit('}', 1)[-1]: float(c.text) for c in child}
        latlon = (vals['LatitudeDegrees'], vals['LongitudeDegrees'])
    if latlon is not None:
      yield latlon, time, elev
    # release the children so memory stays flat on long tracks
    elem.clear()
项目:OneClickDTU    作者:satwikkansal    | 项目源码 | 文件源码
def ensure_elementtree_imported(verbosity, logfile):
    """Bind the module-level ``ET`` to the first importable ElementTree module.

    Does nothing when ``ET`` is already set.  On IronPython only
    ``xml.etree.ElementTree`` is used; otherwise the candidates are tried in
    this order: ``xml.etree.cElementTree``, ``cElementTree``, ``lxml.etree``,
    ``xml.etree.ElementTree``, ``elementtree.ElementTree``.

    Afterwards two module-level flags are updated: ``ET_has_iterparse`` is set
    to True when ``ET.iterparse`` exists and calling it on ``BYTES_IO(b'')``
    does not raise ``NotImplementedError``; ``Element_has_iter`` records
    whether ``ET.ElementTree`` has an ``iter`` attribute.

    :param verbosity: when truthy, one diagnostic line describing the chosen
        module is printed to ``logfile``
    :param logfile: file-like object that receives the diagnostic line
    :raises Exception: if none of the candidate modules can be imported
    """
    global ET, ET_has_iterparse, Element_has_iter
    if ET is not None:
        return
    if "IronPython" in sys.version:
        import xml.etree.ElementTree as ET
        #### 2.7.2.1: fails later with
        #### NotImplementedError: iterparse is not supported on IronPython. (CP #31923)
    else:
        # ET is still None at this point and a failed import leaves it
        # untouched, so every fallback runs only while nothing is bound yet.
        try:
            import xml.etree.cElementTree as ET
        except ImportError:
            pass
        if ET is None:
            try:
                import cElementTree as ET
            except ImportError:
                pass
        if ET is None:
            try:
                import lxml.etree as ET
            except ImportError:
                pass
        if ET is None:
            try:
                import xml.etree.ElementTree as ET
            except ImportError:
                pass
        if ET is None:
            try:
                import elementtree.ElementTree as ET
            except ImportError:
                pass
        if ET is None:
            raise Exception("Failed to import an ElementTree implementation")

    if hasattr(ET, 'iterparse'):
        probe_stream = BYTES_IO(b'')
        try:
            ET.iterparse(probe_stream)
        except NotImplementedError:
            pass
        else:
            ET_has_iterparse = True

    Element_has_iter = hasattr(ET.ElementTree, 'iter')

    if verbosity:
        # Collect every module attribute whose name spells "version" once
        # case and underscores are ignored.
        version_items = []
        for attr_name in ET.__dict__:
            if attr_name.lower().replace('_', '') == 'version':
                version_items.append((attr_name, getattr(ET, attr_name)))
        etree_version = repr(version_items)
        print(ET.__file__, ET.__name__, etree_version, ET_has_iterparse, file=logfile)
项目:OneClickDTU    作者:satwikkansal    | 项目源码 | 文件源码
def own_process_stream(self, stream, heading=None):
        """Stream-parse *stream* and dispatch on three element tags.

        Each element completed by ``ET.iterparse`` is compared against tags
        formed by prefixing ``U_SSML12``: ``row`` elements are passed to
        ``self.do_row`` and then cleared, ``dimension`` elements are passed to
        ``self.do_dimension`` and ``mergeCell`` elements to
        ``self.do_merge_cell``.  Any other element is ignored.
        ``self.finish_off()`` is called once the stream is exhausted.

        :param stream: XML source accepted by ``ET.iterparse``
        :param heading: optional title; when given and ``self.verbosity >= 2``
            a banner containing it is written to ``self.logfile``
        """
        if self.verbosity >= 2 and heading is not None:
            fprintf(self.logfile, "\n=== %s ===\n", heading)
        # Build the tags once instead of concatenating them for every element.
        row_tag = U_SSML12 + "row"
        dimension_tag = U_SSML12 + "dimension"
        merge_cell_tag = U_SSML12 + "mergeCell"
        self_do_row = self.do_row
        for _event, elem in ET.iterparse(stream):
            tag = elem.tag
            if tag == row_tag:
                self_do_row(elem)
                elem.clear()  # destroy all child elements (cells)
            elif tag == dimension_tag:
                self.do_dimension(elem)
            elif tag == merge_cell_tag:
                self.do_merge_cell(elem)
        self.finish_off()
项目:fusion360-dxf-export    作者:opendesk    | 项目源码 | 文件源码
def ensure_elementtree_imported(verbosity, logfile):
    """Bind the module-level ``ET`` to the first importable ElementTree module.

    Does nothing when ``ET`` is already set.  On IronPython only
    ``xml.etree.ElementTree`` is used; otherwise the candidates are tried in
    this order: ``xml.etree.cElementTree``, ``cElementTree``, ``lxml.etree``,
    ``xml.etree.ElementTree``, ``elementtree.ElementTree``.

    Afterwards two module-level flags are updated: ``ET_has_iterparse`` is set
    to True when ``ET.iterparse`` exists and calling it on ``BYTES_IO(b'')``
    does not raise ``NotImplementedError``; ``Element_has_iter`` records
    whether ``ET.ElementTree`` has an ``iter`` attribute.

    :param verbosity: when truthy, one diagnostic line describing the chosen
        module is printed to ``logfile``
    :param logfile: file-like object that receives the diagnostic line
    :raises Exception: if none of the candidate modules can be imported
    """
    global ET, ET_has_iterparse, Element_has_iter
    if ET is not None:
        return
    if "IronPython" in sys.version:
        import xml.etree.ElementTree as ET
        #### 2.7.2.1: fails later with
        #### NotImplementedError: iterparse is not supported on IronPython. (CP #31923)
    else:
        # ET is still None at this point and a failed import leaves it
        # untouched, so every fallback runs only while nothing is bound yet.
        try:
            import xml.etree.cElementTree as ET
        except ImportError:
            pass
        if ET is None:
            try:
                import cElementTree as ET
            except ImportError:
                pass
        if ET is None:
            try:
                import lxml.etree as ET
            except ImportError:
                pass
        if ET is None:
            try:
                import xml.etree.ElementTree as ET
            except ImportError:
                pass
        if ET is None:
            try:
                import elementtree.ElementTree as ET
            except ImportError:
                pass
        if ET is None:
            raise Exception("Failed to import an ElementTree implementation")

    if hasattr(ET, 'iterparse'):
        probe_stream = BYTES_IO(b'')
        try:
            ET.iterparse(probe_stream)
        except NotImplementedError:
            pass
        else:
            ET_has_iterparse = True

    Element_has_iter = hasattr(ET.ElementTree, 'iter')

    if verbosity:
        # Collect every module attribute whose name spells "version" once
        # case and underscores are ignored.
        version_items = []
        for attr_name in ET.__dict__:
            if attr_name.lower().replace('_', '') == 'version':
                version_items.append((attr_name, getattr(ET, attr_name)))
        etree_version = repr(version_items)
        print(ET.__file__, ET.__name__, etree_version, ET_has_iterparse, file=logfile)
项目:fusion360-dxf-export    作者:opendesk    | 项目源码 | 文件源码
def own_process_stream(self, stream, heading=None):
        """Stream-parse *stream* and dispatch on three element tags.

        Each element completed by ``ET.iterparse`` is compared against tags
        formed by prefixing ``U_SSML12``: ``row`` elements are passed to
        ``self.do_row`` and then cleared, ``dimension`` elements are passed to
        ``self.do_dimension`` and ``mergeCell`` elements to
        ``self.do_merge_cell``.  Any other element is ignored.
        ``self.finish_off()`` is called once the stream is exhausted.

        :param stream: XML source accepted by ``ET.iterparse``
        :param heading: optional title; when given and ``self.verbosity >= 2``
            a banner containing it is written to ``self.logfile``
        """
        if self.verbosity >= 2 and heading is not None:
            fprintf(self.logfile, "\n=== %s ===\n", heading)
        # Build the tags once instead of concatenating them for every element.
        row_tag = U_SSML12 + "row"
        dimension_tag = U_SSML12 + "dimension"
        merge_cell_tag = U_SSML12 + "mergeCell"
        self_do_row = self.do_row
        for _event, elem in ET.iterparse(stream):
            tag = elem.tag
            if tag == row_tag:
                self_do_row(elem)
                elem.clear()  # destroy all child elements (cells)
            elif tag == dimension_tag:
                self.do_dimension(elem)
            elif tag == merge_cell_tag:
                self.do_merge_cell(elem)
        self.finish_off()
项目:aio    作者:pavhofman    | 项目源码 | 文件源码
def detect_xspf_header(data):
    """Return True if the start of *data* looks like an XSPF playlist.

    Only the first 150 bytes are examined.  They must contain ``xspf``
    (case-insensitively) and the first element the XML parser reports must
    be ``playlist`` in the ``http://xspf.org/ns/0/`` namespace, compared
    case-insensitively.

    :param data: bytes holding the beginning of the candidate document
    :returns: True on a match; False otherwise, including when the bytes
        cannot be parsed as XML
    """
    header = data[0:150]
    if b'xspf' not in header.lower():
        return False

    try:
        # iterparse event names are text strings; a bytes name such as
        # b'start' is not a documented value.
        for _event, element in elementtree.iterparse(io.BytesIO(header), events=('start',)):
            # Only the very first start tag (the root) matters.
            return element.tag.lower() == '{http://xspf.org/ns/0/}playlist'
    except elementtree.ParseError:
        pass
    return False
项目:aio    作者:pavhofman    | 项目源码 | 文件源码
def detect_asx_header(data: bytes):
    """Return True if the start of *data* looks like an ASX playlist.

    Only the first 50 bytes are examined.  They must contain ``asx``
    (case-insensitively) and the first element the XML parser reports must
    be named ``asx``, compared case-insensitively.

    :param data: bytes holding the beginning of the candidate document
    :returns: True on a match; False otherwise, including when the bytes
        cannot be parsed as XML
    """
    header = data[0:50]
    if b'asx' not in header.lower():
        return False

    try:
        # iterparse event names are text strings; a bytes name such as
        # b'start' is not a documented value.
        for _event, element in elementtree.iterparse(io.BytesIO(header), events=('start',)):
            # Only the very first start tag (the root) matters.
            return element.tag.lower() == 'asx'
    except elementtree.ParseError:
        pass
    return False
项目:aio    作者:pavhofman    | 项目源码 | 文件源码
def parse_xspf(data: bytes):
    """Yield the ``location`` text of every track in an XSPF document.

    All element tags are lower-cased while parsing, then tracks are looked up
    as ``tracklist/track`` in the ``http://xspf.org/ns/0/`` namespace below
    the root.  A track without a ``location`` child yields ``None``.

    :param data: the complete document as bytes
    :returns: generator of location strings; it yields nothing when the
        document cannot be parsed
    """
    namespace = 'http://xspf.org/ns/0/'
    try:
        root = None
        for _event, node in elementtree.iterparse(io.BytesIO(data)):
            node.tag = node.tag.lower()  # normalize
            # Elements are reported as they close, so the last one is the root.
            root = node

        if root is None:
            return

        track_path = '{%s}tracklist/{%s}track' % (namespace, namespace)
        location_tag = '{%s}location' % namespace
        for track in root.iterfind(track_path):
            yield track.findtext(location_tag)
    except elementtree.ParseError:
        return
项目:aio    作者:pavhofman    | 项目源码 | 文件源码
def parse_asx(data):
    """Yield the ``href`` values found in an ASX document.

    All element tags are lower-cased while parsing (attribute names are left
    as they are).  The stripped ``href`` of every ``entry/ref`` element that
    has one is yielded first, followed by the stripped ``href`` of every
    ``entry`` element that has one.

    :param data: the complete document as bytes
    :returns: generator of href strings; it yields nothing when the document
        cannot be parsed
    """
    try:
        root = None
        for _event, node in elementtree.iterparse(io.BytesIO(data)):
            node.tag = node.tag.lower()  # normalize
            # Elements are reported as they close, so the last one is the root.
            root = node

        if root is None:
            return

        # Same order as two consecutive loops: refs first, then entries.
        for path in ('entry/ref[@href]', 'entry[@href]'):
            for match in root.findall(path):
                yield match.get('href', '').strip()
    except elementtree.ParseError:
        return
项目:InternationalizationScript-iOS    作者:alexfeng    | 项目源码 | 文件源码
def ensure_elementtree_imported(verbosity, logfile):
    """Bind the module-level ``ET`` to the first importable ElementTree module.

    Does nothing when ``ET`` is already set.  On IronPython only
    ``xml.etree.ElementTree`` is used; otherwise the candidates are tried in
    this order: ``xml.etree.cElementTree``, ``cElementTree``, ``lxml.etree``,
    ``xml.etree.ElementTree``, ``elementtree.ElementTree``.

    Afterwards two module-level flags are updated: ``ET_has_iterparse`` is set
    to True when ``ET.iterparse`` exists and calling it on ``BYTES_IO(b'')``
    does not raise ``NotImplementedError``; ``Element_has_iter`` records
    whether ``ET.ElementTree`` has an ``iter`` attribute.

    :param verbosity: when truthy, one diagnostic line describing the chosen
        module is printed to ``logfile``
    :param logfile: file-like object that receives the diagnostic line
    :raises Exception: if none of the candidate modules can be imported
    """
    global ET, ET_has_iterparse, Element_has_iter
    if ET is not None:
        return
    if "IronPython" in sys.version:
        import xml.etree.ElementTree as ET
        #### 2.7.2.1: fails later with
        #### NotImplementedError: iterparse is not supported on IronPython. (CP #31923)
    else:
        # ET is still None at this point and a failed import leaves it
        # untouched, so every fallback runs only while nothing is bound yet.
        try:
            import xml.etree.cElementTree as ET
        except ImportError:
            pass
        if ET is None:
            try:
                import cElementTree as ET
            except ImportError:
                pass
        if ET is None:
            try:
                import lxml.etree as ET
            except ImportError:
                pass
        if ET is None:
            try:
                import xml.etree.ElementTree as ET
            except ImportError:
                pass
        if ET is None:
            try:
                import elementtree.ElementTree as ET
            except ImportError:
                pass
        if ET is None:
            raise Exception("Failed to import an ElementTree implementation")

    if hasattr(ET, 'iterparse'):
        probe_stream = BYTES_IO(b'')
        try:
            ET.iterparse(probe_stream)
        except NotImplementedError:
            pass
        else:
            ET_has_iterparse = True

    Element_has_iter = hasattr(ET.ElementTree, 'iter')

    if verbosity:
        # Collect every module attribute whose name spells "version" once
        # case and underscores are ignored.
        version_items = []
        for attr_name in ET.__dict__:
            if attr_name.lower().replace('_', '') == 'version':
                version_items.append((attr_name, getattr(ET, attr_name)))
        etree_version = repr(version_items)
        print(ET.__file__, ET.__name__, etree_version, ET_has_iterparse, file=logfile)