Python xml.etree.ElementTree 模块,fromstring() 实例源码

我们从Python开源项目中,提取了以下48个代码示例,用于说明如何使用xml.etree.ElementTree.fromstring()

项目:ironic-staging-drivers    作者:openstack    | 项目源码 | 文件源码
def _set_boot_device(conn, domain, device):
    """Set the boot device.

    :param conn: active libvirt connection.
    :param domain: libvirt domain object.
    :raises: LibvirtError if failed update domain xml.
    """

    parsed = ET.fromstring(domain.XMLDesc())
    os = parsed.find('os')
    boot_list = os.findall('boot')

    # Clear boot list
    for boot_el in boot_list:
        os.remove(boot_el)

    boot_el = ET.SubElement(os, 'boot')
    boot_el.set('dev', device)

    try:
        conn.defineXML(ET.tostring(parsed))
    except libvirt.libvirtError as e:
        raise isd_exc.LibvirtError(err=e)
项目:okta-awscli    作者:jmhale    | 项目源码 | 文件源码
def choose_aws_role(assertion):
        """ Choose AWS role from SAML assertion """
        aws_attribute_role = 'https://aws.amazon.com/SAML/Attributes/Role'
        attribute_value_urn = '{urn:oasis:names:tc:SAML:2.0:assertion}AttributeValue'
        roles = []
        role_tuple = namedtuple("RoleTuple", ["principal_arn", "role_arn"])
        root = ET.fromstring(base64.b64decode(assertion))
        for saml2attribute in root.iter('{urn:oasis:names:tc:SAML:2.0:assertion}Attribute'):
            if saml2attribute.get('Name') == aws_attribute_role:
                for saml2attributevalue in saml2attribute.iter(attribute_value_urn):
                    roles.append(role_tuple(*saml2attributevalue.text.split(',')))

        for index, role in enumerate(roles):
            role_name = role.role_arn.split('/')[1]
            print("%d: %s" % (index+1, role_name))
        role_choice = input('Please select the AWS role: ')-1
        return roles[role_choice]
项目:scikit-dataaccess    作者:MITHaystack    | 项目源码 | 文件源码
def getFileURLs(file_ids):
    '''
    Retrieve the ftp location for a list of file IDs

    @param file_ids: List of file IDs
    @return List of ftp locations
    '''



    info_url='http://modwebsrv.modaps.eosdis.nasa.gov/axis2/services/MODAPSservices/getFileUrls?fileIds='

    for file_id in file_ids:
        info_url += str(file_id) + ','

    info_url = info_url[:-1]


    url = urlopen(info_url)
    tree = ET.fromstring(url.read().decode())
    url.close()

    return [ child.text for child in tree ]
项目:bob    作者:BobBuildTool    | 项目源码 | 文件源码
def checkCobertura(config, checkoutSteps, buildSteps, packageSteps, **kwargs):
    found = False
    for s in checkoutSteps:
        if s.getPackage().getName().endswith("unittests"): found = True
    for s in buildSteps:
        if s.getPackage().getName().endswith("unittests"): found = True
    for s in packageSteps:
        if s.getPackage().getName().endswith("unittests"): found = True

    if found:
        root = ElementTree.fromstring(config)
        publishers = root.find("publishers")
        if publishers.find("hudson.plugins.cobertura.CoberturaPublisher") is None:
            publishers.append(PLUGIN)
            config = ElementTree.tostring(root, encoding="UTF-8")

    return config
项目:bob    作者:BobBuildTool    | 项目源码 | 文件源码
def _scanDir(self, workspace, dir):
        self.__dir = dir
        try:
            info = ElementTree.fromstring(subprocess.check_output(
                ["svn", "info", "--xml", dir],
                cwd=workspace, universal_newlines=True))
            self.__url = info.find('entry/url').text
            self.__revision = int(info.find('entry').get('revision'))
            self.__repoRoot = info.find('entry/repository/root').text
            self.__repoUuid = info.find('entry/repository/uuid').text

            status = subprocess.check_output(["svn", "status", dir],
                cwd=workspace, universal_newlines=True)
            self.__dirty = status != ""
        except subprocess.CalledProcessError as e:
            raise BuildError("Svn audit failed: " + str(e))
        except OSError as e:
            raise BuildError("Error calling git: " + str(e))
        except ElementTree.ParseError as e:
            raise BuildError("Invalid XML received from svn")
项目:bob    作者:BobBuildTool    | 项目源码 | 文件源码
def testSetGitShallowClone(self):
        self.executeBobJenkinsCmd("set-options -o scm.git.shallow=42 myTestJenkins")
        self.executeBobJenkinsCmd("push -q myTestJenkins")
        send = self.jenkinsMock.getServerData()
        config = ElementTree.fromstring(send[0][1])
        for clone in config.iter('hudson.plugins.git.extensions.impl.CloneOption'):
            found = 0
            for a in clone.getiterator():
                if a.tag == 'shallow':
                    assert(a.text == 'true')
                    found += 1
                if a.tag == 'depth':
                    assert(a.text == '42')
                    found += 1
            assert(found == 2)
        self.executeBobJenkinsCmd("set-options -o scm.git.shallow=-1 myTestJenkins")
        with self.assertRaises(Exception) as c:
            self.executeBobJenkinsCmd("push -q myTestJenkins")
        assert(type(c.exception) == BuildError)
项目:bob    作者:BobBuildTool    | 项目源码 | 文件源码
def testShortDescription(self):
        self.createComplexRecipes()
        self.executeBobJenkinsCmd("set-options --shortdescription myTestJenkinsComplex")
        self.executeBobJenkinsCmd("push -q myTestJenkinsComplex")
        send = self.jenkinsMock.getServerData()
        result_set = set()
        try:
            for i in send:
                if i[0] == '/createItem?name=dependency-two':
                    for items in ElementTree.fromstring(i[1]).iter('description'):

                        for line in [x for x in items.itertext()][0].splitlines():
                            if line.startswith('<li>') and line.endswith('</li>'):
                                result_set.add(line[4:-5])
        except:
            print("Malformed Data Recieved")

        self.assertEqual(result_set, {'root/dependency-one/dependency-two'})
项目:weichatpay    作者:river291    | 项目源码 | 文件源码
def FromXml(self, xml):
        if not xml:
            raise WxPayException("xml?????")

        # ??ElementTree
        try:
            import xml.etree.cElementTree as ET
        except ImportError:
            import xml.etree.ElementTree as ET
        # ??xml
        xml2dict = {}

        xml_tree = ET.fromstring(xml)
        for child_node in xml_tree:
            xml2dict[child_node.tag] = child_node.text

        self.values = xml2dict
        return self.values

    # ?????????url??
项目:SoCFoundationFlow    作者:mattaw    | 项目源码 | 文件源码
def _get_defects(self, xml_string):
        '''evaluate the xml string returned by cppcheck (on sdterr) and use it to create
        a list of defects.
        '''
        defects = []
        for error in ElementTree.fromstring(xml_string).iter('error'):
            defect = {}
            defect['id'] = error.get('id')
            defect['severity'] = error.get('severity')
            defect['msg'] = str(error.get('msg')).replace('<','&lt;')
            defect['verbose'] = error.get('verbose')
            for location in error.findall('location'):
                defect['file'] = location.get('file')
                defect['line'] = str(int(location.get('line')) - 1)
            defects.append(defect)
        return defects
项目:SoCFoundationFlow    作者:mattaw    | 项目源码 | 文件源码
def _create_html_index(self, files):
        name = self.generator.get_name()
        root = ElementTree.fromstring(CPPCHECK_HTML_FILE)
        title = root.find('head/title')
        title.text = 'cppcheck - report - %s' % name

        body = root.find('body')
        for div in body.findall('div'):
            if div.get('id') == 'page':
                page = div
                break
        for div in page.findall('div'):
            if div.get('id') == 'header':
                h1 = div.find('h1')
                h1.text = 'cppcheck report - %s' % name
            if div.get('id') == 'content':
                content = div
                self._create_html_table(content, files)

        s = ElementTree.tostring(root, method='html')
        s = CCPCHECK_HTML_TYPE + s
        node = self.generator.path.get_bld().find_or_declare('cppcheck/index.html')
        node.write(s)
        return node
项目:SoCFoundationFlow    作者:mattaw    | 项目源码 | 文件源码
def _create_html_table(self, content, files):
        table = ElementTree.fromstring(CPPCHECK_HTML_TABLE)
        for name, val in files.items():
            f = val['htmlfile']
            s = '<tr><td colspan="4"><a href="%s">%s</a></td></tr>\n' % (f,name)
            row = ElementTree.fromstring(s)
            table.append(row)

            errors = sorted(val['errors'], key=lambda e: int(e['line']) if e.has_key('line') else sys.maxint)
            for e in errors:
                if not e.has_key('line'):
                    s = '<tr><td></td><td>%s</td><td>%s</td><td>%s</td></tr>\n' % (e['id'], e['severity'], e['msg'])
                else:
                    attr = ''
                    if e['severity'] == 'error':
                        attr = 'class="error"'
                    s = '<tr><td><a href="%s#line-%s">%s</a></td>' % (f, e['line'], e['line'])
                    s+= '<td>%s</td><td>%s</td><td %s>%s</td></tr>\n' % (e['id'], e['severity'], attr, e['msg'])
                row = ElementTree.fromstring(s)
                table.append(row)
        content.append(table)
项目:SoCFoundationFlow    作者:mattaw    | 项目源码 | 文件源码
def _get_defects(self, xml_string):
        '''evaluate the xml string returned by cppcheck (on sdterr) and use it to create
        a list of defects.
        '''
        defects = []
        for error in ElementTree.fromstring(xml_string).iter('error'):
            defect = {}
            defect['id'] = error.get('id')
            defect['severity'] = error.get('severity')
            defect['msg'] = str(error.get('msg')).replace('<','&lt;')
            defect['verbose'] = error.get('verbose')
            for location in error.findall('location'):
                defect['file'] = location.get('file')
                defect['line'] = str(int(location.get('line')) - 1)
            defects.append(defect)
        return defects
项目:SoCFoundationFlow    作者:mattaw    | 项目源码 | 文件源码
def _create_html_index(self, files):
        name = self.generator.get_name()
        root = ElementTree.fromstring(CPPCHECK_HTML_FILE)
        title = root.find('head/title')
        title.text = 'cppcheck - report - %s' % name

        body = root.find('body')
        for div in body.findall('div'):
            if div.get('id') == 'page':
                page = div
                break
        for div in page.findall('div'):
            if div.get('id') == 'header':
                h1 = div.find('h1')
                h1.text = 'cppcheck report - %s' % name
            if div.get('id') == 'content':
                content = div
                self._create_html_table(content, files)

        s = ElementTree.tostring(root, method='html')
        s = CCPCHECK_HTML_TYPE + s
        node = self.generator.path.get_bld().find_or_declare('cppcheck/index.html')
        node.write(s)
        return node
项目:SoCFoundationFlow    作者:mattaw    | 项目源码 | 文件源码
def _create_html_table(self, content, files):
        table = ElementTree.fromstring(CPPCHECK_HTML_TABLE)
        for name, val in files.items():
            f = val['htmlfile']
            s = '<tr><td colspan="4"><a href="%s">%s</a></td></tr>\n' % (f,name)
            row = ElementTree.fromstring(s)
            table.append(row)

            errors = sorted(val['errors'], key=lambda e: int(e['line']) if e.has_key('line') else sys.maxint)
            for e in errors:
                if not e.has_key('line'):
                    s = '<tr><td></td><td>%s</td><td>%s</td><td>%s</td></tr>\n' % (e['id'], e['severity'], e['msg'])
                else:
                    attr = ''
                    if e['severity'] == 'error':
                        attr = 'class="error"'
                    s = '<tr><td><a href="%s#line-%s">%s</a></td>' % (f, e['line'], e['line'])
                    s+= '<td>%s</td><td>%s</td><td %s>%s</td></tr>\n' % (e['id'], e['severity'], attr, e['msg'])
                row = ElementTree.fromstring(s)
                table.append(row)
        content.append(table)
项目:SoCFoundationFlow    作者:mattaw    | 项目源码 | 文件源码
def _create_html_index(self, files):
        name = self.generator.get_name()
        root = ElementTree.fromstring(CPPCHECK_HTML_FILE)
        title = root.find('head/title')
        title.text = 'cppcheck - report - %s' % name

        body = root.find('body')
        for div in body.findall('div'):
            if div.get('id') == 'page':
                page = div
                break
        for div in page.findall('div'):
            if div.get('id') == 'header':
                h1 = div.find('h1')
                h1.text = 'cppcheck report - %s' % name
            if div.get('id') == 'content':
                content = div
                self._create_html_table(content, files)

        s = ElementTree.tostring(root, method='html')
        s = CCPCHECK_HTML_TYPE + s
        node = self.generator.path.get_bld().find_or_declare('cppcheck/index.html')
        node.write(s)
        return node
项目:SoCFoundationFlow    作者:mattaw    | 项目源码 | 文件源码
def _create_html_table(self, content, files):
        table = ElementTree.fromstring(CPPCHECK_HTML_TABLE)
        for name, val in files.items():
            f = val['htmlfile']
            s = '<tr><td colspan="4"><a href="%s">%s</a></td></tr>\n' % (f,name)
            row = ElementTree.fromstring(s)
            table.append(row)

            errors = sorted(val['errors'], key=lambda e: int(e['line']) if e.has_key('line') else sys.maxint)
            for e in errors:
                if not e.has_key('line'):
                    s = '<tr><td></td><td>%s</td><td>%s</td><td>%s</td></tr>\n' % (e['id'], e['severity'], e['msg'])
                else:
                    attr = ''
                    if e['severity'] == 'error':
                        attr = 'class="error"'
                    s = '<tr><td><a href="%s#line-%s">%s</a></td>' % (f, e['line'], e['line'])
                    s+= '<td>%s</td><td>%s</td><td %s>%s</td></tr>\n' % (e['id'], e['severity'], attr, e['msg'])
                row = ElementTree.fromstring(s)
                table.append(row)
        content.append(table)
项目:nn4nlp-code    作者:neubig    | 项目源码 | 文件源码
def get_xml_trees(db, host, pid, usernames, graceful=False):
    """ Params: db, host, paragraph id, the list of usernames wanted,
        Optional:
        graceful: True if no excpetions are to be raised
        excpetion raised if a user did not submit an annotation for the passage
        returns a list of xml roots elements
        """
    cur = get_cursor(db, host)
    xmls = []
    for username in usernames:
        username = str(username)  # precaution for cases bad input e.g. 101
        cur_uid = get_uid(db, host, username)
        cur.execute("SELECT xml FROM xmls WHERE paid=" + PLACE_HOLDER +
                    " AND uid=" + PLACE_HOLDER + "ORDER BY ts DESC",
                    (pid, cur_uid))
        raw_xml = cur.fetchone()
        if not raw_xml and not graceful:
            raise Exception("The user " + username +
                            " did not submit an annotation for this passage")
        else:
            xmls.append(fromstring(raw_xml[0]))
    return xmls
项目:nn4nlp-code    作者:neubig    | 项目源码 | 文件源码
def get_by_xid(db, host, xid, graceful=False):
    """Returns the passages that correspond to the xid

        Optional:
        graceful: True if no excpetions are to be raised
        excpetion raised if xid does no exist
    """
    # precaution for bad input e.g. 101->'101'
    xid = str(xid)
    cur = get_cursor(db, host)
    cur.execute("SELECT xml FROM xmls WHERE id" + "=" +
                PLACE_HOLDER, (int(xid),))
    raw_xml = cur.fetchone()
    if not raw_xml and not graceful:
        raise Exception("The xid " + xid + " does not exist")
    elif raw_xml:
        return fromstring(raw_xml[0])
项目:kolla-kubernetes-personal    作者:rthallisey    | 项目源码 | 文件源码
def get_mac_address(conn, domain_name):
    """Get MAC address from domain XML."""
    domain = conn.lookupByName(domain_name)
    domain_xml = domain.XMLDesc()
    domain_tree = etree.fromstring(domain_xml)
    devices = domain_tree.find('devices')
    interfaces = devices.iterfind('interface')

    for interface in interfaces:
        source = interface.find('source')
        if source is None or source.get('network') != 'vagrant-private-dhcp':
            continue
        mac_element = interface.find('mac')
        mac_address = mac_element.get('address')
        return mac_address

    raise NoPrivateDHCPInterfaceException()
项目:wltrace    作者:jhshi    | 项目源码 | 文件源码
def __init__(self, path, *args, **kwargs):
        cls = self.__class__
        super(cls, self).__init__(path, *args, **kwargs)

        while True:
            sess = PeektaggedSectionHeader(self.fh)
            if sess.tag == 'pkts':
                break

            sess.payload = self.fh.read(sess.len)

            if sess.tag == PEEKTAGGED_FILE_MAGIC:
                root = ET.fromstring(sess.payload)
                if root.tag != 'VersionInfo':
                    raise PeektaggedException("Corrupted version info")

                for child in root:
                    setattr(self, child.tag, child.text)

            elif sess.tag == 'sess':
                root = ET.fromstring(sess.payload)
                for child in root:
                    if child.tag == 'PacketCount':
                        setattr(self, 'total_packets', int(child.text))
                        break
项目:GAMADV-XTD    作者:taers232c    | 项目源码 | 文件源码
def CreateClassFromXMLString(target_class, xml_string, string_encoding=None):
  """Creates an instance of the target class from the string contents.

  Args:
    target_class: class The class which will be instantiated and populated
        with the contents of the XML. This class must have a _tag and a
        _namespace class variable.
    xml_string: str A string which contains valid XML. The root element
        of the XML string should match the tag and namespace of the desired
        class.
    string_encoding: str The character encoding which the xml_string should
        be converted to before it is interpreted and translated into
        objects. The default is None in which case the string encoding
        is not changed.

  Returns:
    An instance of the target class with members assigned according to the
    contents of the XML - or None if the root XML tag and namespace did not
    match those of the target class.
  """
  encoding = string_encoding or XML_STRING_ENCODING
  if encoding and isinstance(xml_string, unicode):
    xml_string = xml_string.encode(encoding)
  tree = ElementTree.fromstring(xml_string)
  return _CreateClassFromElementTree(target_class, tree)
项目:GAMADV-XTD    作者:taers232c    | 项目源码 | 文件源码
def parse(xml_string, target_class=None, version=1, encoding=None):
  """Parses the XML string according to the rules for the target_class.

  Args:
    xml_string: str or unicode
    target_class: XmlElement or a subclass. If None is specified, the
        XmlElement class is used.
    version: int (optional) The version of the schema which should be used when
        converting the XML into an object. The default is 1.
    encoding: str (optional) The character encoding of the bytes in the
        xml_string. Default is 'UTF-8'.
  """
  if target_class is None:
    target_class = XmlElement
  if isinstance(xml_string, unicode):
    if encoding is None:
      xml_string = xml_string.encode(STRING_ENCODING)
    else:
      xml_string = xml_string.encode(encoding)
  tree = ElementTree.fromstring(xml_string)
  return _xml_element_from_tree(tree, target_class, version)
项目:roomfinder    作者:GuillaumeMorini    | 项目源码 | 文件源码
def doSomethingWithResult(response):
    if response is None:
        return "KO"
    else:
        tree = ET.fromstring(response.text)

        status = "Free"
        # arrgh, namespaces!!
        elems=tree.findall(".//{http://schemas.microsoft.com/exchange/services/2006/types}BusyType")
        for elem in elems:
            status=elem.text

        tree2=ET.fromstring(response.request.body)
        elems2=tree2.findall(".//{http://schemas.microsoft.com/exchange/services/2006/types}Address")
        for e in elems2:
            room=e.text

        #sys.stderr.write(str(now.isoformat())+": Get BusyType for room "+str(rooms[room])+": len: "+str(len(elems))+" => "+str(elems[0])+"\n")
        sys.stderr.write(str(now.isoformat())+": Status for room "+str(rooms[room])+": "+str(room)+" => "+status+"\n")
        result.append((status, rooms[room], room))
项目:vmx-docker-lwaftr    作者:Juniper    | 项目源码 | 文件源码
def snabb_state(query_output):
    root = ET.fromstring(query_output)
    print ("<snabb>")
    for instance in root:
        # In each instance, we need to query the id, pci, pid.
        print ("<instance>")
        for child in instance:
            PRINT_TAG(child, "id")
            PRINT_TAG(child,"pid")
        PRINT_TAG(child,"next_hop_mac_v4")
        PRINT_TAG(child,"next_hop_mac_v6")
        #child = None
        #for child in instance:
        if child.tag == "pci":
                for pcis in child:
                    for pci_child in pcis:
                            PRINT_TAG(pci_child,"rxpackets")
                            PRINT_TAG(pci_child,"txpackets")
                            PRINT_TAG(pci_child,"rxdrop")
                            PRINT_TAG(pci_child,"txdrop")
        print "</instance>"
    print ("</snabb>")
    return
项目:vmx-docker-lwaftr    作者:Juniper    | 项目源码 | 文件源码
def snabb_statistics(output,argv):
    root = ET.fromstring(output)
    print "<snabb>"
    found = 0
    instance_id = ""
    for i in range(0,len(argv)):
        if argv[i] == "id":
            instance_id = argv[i+1]
            break

    for instance in root:
        if instance_id != '' and instance.findall("./id")[0].text != argv[2]:
            pass
        else:
        found += 1
            stats_per_instance(instance)

    if found == 0:
        print "<statistics><id_error>no instance found</id_error></statistics>"
    print "</snabb>"
    return
项目:vmx-docker-lwaftr    作者:Juniper    | 项目源码 | 文件源码
def snabb_statistics(output,argv):
    root = ET.fromstring(output)
    print "<snabb>"
    found = 0
    instance_id = ""
    for i in range(0,len(argv)):
        if argv[i] == "id":
            instance_id = argv[i+1]
            break

    for instance in root:
        if instance_id != '' and instance.findall("./id")[0].text != argv[2]:
            pass
        else:
        found += 1
            stats_per_instance(instance)

    if found == 0:
        print "<statistics><id_error>no instance found</id_error></statistics>"
    print "</snabb>"
    return
项目:camera-polyglot    作者:jimboca    | 项目源码 | 文件源码
def _http_get_and_parse(self, cmd, payload = {}):
        """ 
        Call http_get and parse the returned Foscam data into a hash.  The data 
        all looks like:  var id='000C5DDC9D6C';
        """
        ret  = {}
        data = self._http_get(cmd,payload)
        self.parent.logger.debug("FoscamHD2:_http_get_and_parse:%s: data=%s" % (self.name,data))
        if data is False:
            code = -1
        else:
            # Return code is good, unless CGI result changes it later
            code = 0
            root = ET.fromstring(data)
            for child in root.iter():
                if child.tag == 'result':
                    code = int(child.text)
                elif child.tag != 'CGI_Result':
                    ret[child.tag] = child.text
        self.parent.logger.debug("FoscamHD2:_http_get_and_parse:%s: code=%d, ret=%s" % (self.name,code,ret))
        return code,ret
项目:kcli    作者:karmab    | 项目源码 | 文件源码
def ip(self, name):
        leases = {}
        conn = self.conn
        for network in conn.listAllNetworks():
            for lease in network.DHCPLeases():
                ip = lease['ipaddr']
                mac = lease['mac']
                leases[mac] = ip
        try:
            vm = conn.lookupByName(name)
            xml = vm.XMLDesc(0)
            root = ET.fromstring(xml)
        except:
            return None
        for element in root.getiterator('{kvirt}info'):
            e = element.find('{kvirt}ip')
            if e is not None:
                return e.text
        nic = root.getiterator('interface')[0]
        mac = nic.find('mac').get('address')
        if vm.isActive() and mac in leases:
            return leases[mac]
        else:
            return None
项目:kcli    作者:karmab    | 项目源码 | 文件源码
def volumes(self, iso=False):
        isos = []
        templates = []
        default_templates = [os.path.basename(t) for t in defaults.TEMPLATES.values() if t is not None]
        conn = self.conn
        for storage in conn.listStoragePools():
            storage = conn.storagePoolLookupByName(storage)
            storage.refresh(0)
            storagexml = storage.XMLDesc(0)
            root = ET.fromstring(storagexml)
            for element in root.getiterator('path'):
                storagepath = element.text
                break
            for volume in storage.listVolumes():
                if volume.endswith('iso'):
                    isos.append("%s/%s" % (storagepath, volume))
                elif volume.endswith('qcow2') or volume.endswith('qc2') or volume in default_templates:
                    templates.append("%s/%s" % (storagepath, volume))
        if iso:
            return sorted(isos, key=lambda s: s.lower())
        else:
            return sorted(templates, key=lambda s: s.lower())
项目:kcli    作者:karmab    | 项目源码 | 文件源码
def _uploadimage(self, name, pool='default', origin='/tmp', suffix='.ISO'):
        name = "%s%s" % (name, suffix)
        conn = self.conn
        poolxml = pool.XMLDesc(0)
        root = ET.fromstring(poolxml)
        for element in root.getiterator('path'):
            poolpath = element.text
            break
        imagepath = "%s/%s" % (poolpath, name)
        imagexml = self._xmlvolume(path=imagepath, size=0, diskformat='raw')
        pool.createXML(imagexml, 0)
        imagevolume = conn.storageVolLookupByPath(imagepath)
        stream = conn.newStream(0)
        imagevolume.upload(stream, 0, 0)
        with open("%s/%s" % (origin, name)) as ori:
            stream.sendAll(self.handler, ori)
            stream.finish()
项目:kcli    作者:karmab    | 项目源码 | 文件源码
def remove_cloudinit(self, name):
        conn = self.conn
        try:
            vm = conn.lookupByName(name)
            xml = vm.XMLDesc(0)
            root = ET.fromstring(xml)
        except:
            print("VM %s not found" % name)
            return {'result': 'failure', 'reason': "VM %s not found" % name}
        for element in root.getiterator('disk'):
            disktype = element.get('device')
            if disktype == 'cdrom':
                source = element.find('source')
                path = source.get('file')
                if source is None:
                    break
                volume = conn.storageVolLookupByPath(path)
                volume.delete(0)
                element.remove(source)
        newxml = ET.tostring(root)
        conn.defineXML(newxml)
项目:kcli    作者:karmab    | 项目源码 | 文件源码
def vm_ports(self, name):
        conn = self.conn
        networks = []
        try:
            vm = conn.lookupByName(name)
        except:
            common.pprint("VM %s not found" % name, color='red')
            return networks
        xml = vm.XMLDesc(0)
        root = ET.fromstring(xml)
        for element in root.getiterator('interface'):
            networktype = element.get('type')
            if networktype == 'bridge':
                network = element.find('source').get('bridge')
            else:
                network = element.find('source').get('network')
            networks.append(network)
        return networks
项目:kcli    作者:karmab    | 项目源码 | 文件源码
def _get_bridge(self, name):
        conn = self.conn
        bridges = [interface.name() for interface in conn.listAllInterfaces()]
        if name in bridges:
            return name
        try:
            net = self.conn.networkLookupByName(name)
        except:
            return None
        netxml = net.XMLDesc(0)
        root = ET.fromstring(netxml)
        bridge = root.getiterator('bridge')
        if bridge:
            attributes = bridge[0].attrib
            bridge = attributes.get('name')
        return bridge
项目:PySIGNFe    作者:thiagopena    | 项目源码 | 文件源码
def _le_xml(self, arquivo):
        if arquivo is None:
            return False

        if not isinstance(arquivo, basestring):
            arquivo = etree.tounicode(arquivo)

        if arquivo is not None:
            if isinstance(arquivo, basestring): 
                if NAMESPACE_NFSE in arquivo:
                    arquivo = por_acentos(arquivo)
                if u'<' in arquivo:
                    self._xml = etree.fromstring(tira_abertura(arquivo))
                else:
                    arq = open(arquivo)
                    txt = ''.join(arq.readlines())
                    txt = tira_abertura(txt)
                    arq.close()
                    self._xml = etree.fromstring(txt)
            else:
                self._xml = etree.parse(arquivo)
            return True

        return False
项目:PySIGNFe    作者:thiagopena    | 项目源码 | 文件源码
def validar(self):
        arquivo_esquema = self.caminho_esquema + self.arquivo_esquema

        # Aqui é importante remover a declaração do encoding
        # para evitar erros de conversão unicode para ascii
        xml = tira_abertura(self.xml).encode(u'utf-8')

        esquema = etree.XMLSchema(etree.parse(arquivo_esquema))

        if not esquema.validate(etree.fromstring(xml)):
            for e in esquema.error_log:
                if e.level == 1:
                    self.alertas.append(e.message.replace('{http://www.portalfiscal.inf.br/nfe}', ''))
                elif e.level == 2:
                    self.erros.append(e.message.replace('{http://www.portalfiscal.inf.br/nfe}', ''))

        return esquema.error_log
项目:eitbapi    作者:erral    | 项目源码 | 文件源码
def xml_to_dict(data):
    try:
        root = ET.fromstring(data)
    except ET.ParseError:
        root = []
    d = {}
    for item in root:
        dd = {}
        for subitem in item:
            m = {}
            m['text'] = subitem.text
            m.update(subitem.attrib)
            dd[subitem.tag] = m

        d[dd['title']['text']] = dd

    return d
项目:masakari-monitors    作者:openstack    | 项目源码 | 文件源码
def test_check_host_status_by_cibadmin(
        self, mock_get_cib_xml, mock_set_cib_xml, mock_have_quorum,
        mock_get_node_state_tag_list, mock_check_if_status_changed):
        mock_get_cib_xml.return_value = STATUS_TAG_XML
        mock_set_cib_xml.return_value = None
        mock_have_quorum.return_value = 1
        status_tag = ElementTree.fromstring(STATUS_TAG_XML)
        node_state_tag_list = status_tag.getchildren()
        mock_get_node_state_tag_list.return_value = node_state_tag_list
        mock_check_if_status_changed.return_value = None

        obj = handle_host.HandleHost()
        ret = obj._check_host_status_by_cibadmin()

        self.assertEqual(0, ret)
        mock_get_cib_xml.assert_called_once_with()
        mock_set_cib_xml.assert_called_once_with(STATUS_TAG_XML)
        mock_have_quorum.assert_called_once_with()
        mock_get_node_state_tag_list.assert_called_once_with()
        mock_check_if_status_changed.assert_called_once_with(
            node_state_tag_list)
项目:masakari-monitors    作者:openstack    | 项目源码 | 文件源码
def test_check_host_status_by_cibadmin_no_quorum(
        self, mock_get_cib_xml, mock_set_cib_xml, mock_have_quorum,
        mock_get_node_state_tag_list, mock_check_if_status_changed):
        mock_get_cib_xml.return_value = STATUS_TAG_XML
        mock_set_cib_xml.return_value = None
        mock_have_quorum.return_value = 0
        status_tag = ElementTree.fromstring(STATUS_TAG_XML)
        node_state_tag_list = status_tag.getchildren()
        mock_get_node_state_tag_list.return_value = node_state_tag_list
        mock_check_if_status_changed.return_value = None

        obj = handle_host.HandleHost()
        ret = obj._check_host_status_by_cibadmin()

        self.assertEqual(0, ret)
        mock_get_cib_xml.assert_called_once_with()
        mock_set_cib_xml.assert_called_once_with(STATUS_TAG_XML)
        mock_have_quorum.assert_called_once_with()
        mock_get_node_state_tag_list.assert_called_once_with()
        mock_check_if_status_changed.assert_called_once_with(
            node_state_tag_list)
项目:nojs    作者:chrisdickinson    | 项目源码 | 文件源码
def testParseClassNode(self):
    example_xml_string = (
        '<class name="Class1" extends="java.lang.Object">'
        '<method name="method1">'
        '</method>'
        '<method name="method2">'
        '</method>'
        '</class>')

    actual = dexdump._ParseClassNode(
        ElementTree.fromstring(example_xml_string))

    expected = {
      'methods': ['method1', 'method2'],
      'superclass': 'java.lang.Object',
    }
    self.assertEquals(expected, actual)
项目:warriorframework    作者:warriorframework    | 项目源码 | 文件源码
def del_tags_from_xml(xml, tag_list=[]):
    """
        It deletes the tags either by their names or xpath

        Arguments:
            1.xml: It takes xml file path or xml string as input
            2.tag_list: It contains list of tags which needs to be removed
        Returns:
            It returns xml string
    """
    if os.path.exists(xml):
        tree = ElementTree.parse(xml)
        root = tree.getroot()
    else:
        root = ElementTree.fromstring(xml)
    for tag in tag_list:
        if 'xpath=' in tag:
            tag = tag.strip('xpath=')
            req_tags = getChildElementsListWithSpecificXpath(root, tag)
        else:
            req_tags = getChildElementsListWithSpecificXpath(root, ".//{0}".format(tag))
        recursive_delete_among_children(root, req_tags)

    xml_string = ElementTree.tostring(root, encoding='utf-8', method='xml')
    return xml_string
项目:adbus    作者:ccxtechnologies    | 项目源码 | 文件源码
def _update_interfaces(self):
        self._interfaces = {}

        root = etree.fromstring(self._introspect_xml)
        for e in root.iter('interface'):
            name = e.attrib['name']
            self._interfaces[name] = Interface(
                    self, self._service, self._address, self._path, e,
                    self._camel_convert, self._timeout_ms,
                    self._changed_coroutines.get(name, None)
            )

        for e in root.iter('node'):
            try:
                self._nodes.append(e.attrib['name'])
            except KeyError:
                pass
项目:tap-adwords    作者:singer-io    | 项目源码 | 文件源码
def do_discover_reports(sdk_client):
    url = 'https://adwords.google.com/api/adwords/reportdownload/{}/reportDefinition.xsd'.format(VERSION) #pylint: disable=line-too-long
    xsd = request_xsd(url)
    root = ET.fromstring(xsd)
    nodes = list(root.find(".//*[@name='ReportDefinition.ReportType']/*"))

    stream_names = [p.attrib['value'] for p in nodes if p.attrib['value'] in VERIFIED_REPORTS] #pylint: disable=line-too-long
    streams = []
    LOGGER.info("Starting report discovery")
    for stream_name in stream_names:
        schema, mdata = create_schema_for_report(stream_name, sdk_client)
        streams.append({'stream': stream_name,
                        'tap_stream_id': stream_name,
                        'metadata' : metadata.to_list(mdata),
                        'schema': schema})

    LOGGER.info("Report discovery complete")
    return streams
项目:VManagePlatform    作者:welliamcao    | 项目源码 | 文件源码
def mountIso(self,instance,dev, image):
        tree = ElementTree.fromstring(self.getInsXMLDesc(instance,0))
        for disk in tree.findall('devices/disk'):
            if disk.get('device') == 'cdrom':
                for elm in disk:
                    if elm.tag == 'target':
                        if elm.get('dev') == dev:
                            src_media = ElementTree.Element('source')
                            src_media.set('file', image)
                            disk.append(src_media)
                            if instance.state()[0] == 1:
                                xml_disk = ElementTree.tostring(disk)
                                try:
                                    instance.attachDevice(xml_disk)
                                except libvirt.libvirtError,e:
                                    return '??????????{result}'.format(result=e.get_error_message()) 
                                xmldom = self.getInsXMLDesc(instance,1)
                            if instance.state()[0] == 5:
                                xmldom = ElementTree.tostring(tree)
                            try:
                                return self.defineXML(xmldom)
                            except libvirt.libvirtError,e:
                                return '??????????{result}'.format(result=e.get_error_message())
项目:VManagePlatform    作者:welliamcao    | 项目源码 | 文件源码
def getNetUsage(self,instance):
        devices = []
        dev_usage = []
        tree = ElementTree.fromstring(self.getInsXMLDesc(instance, flag=1))
        if instance.state()[0] == 1:
            tree = ElementTree.fromstring(self.getInsXMLDesc(instance, flag=1))
            for target in tree.findall("devices/interface/target"):
                devices.append(target.get("dev"))
            for i, dev in enumerate(devices):
                rx_use_ago = instance.interfaceStats(dev)[0]
                tx_use_ago = instance.interfaceStats(dev)[4]
                time.sleep(1)
                rx_use_now = instance.interfaceStats(dev)[0]
                tx_use_now = instance.interfaceStats(dev)[4]
                rx_diff_usage = (rx_use_now - rx_use_ago) * 8
                tx_diff_usage = (tx_use_now - tx_use_ago) * 8
                dev_usage.append({'dev': i, 'rx': rx_diff_usage, 'tx': tx_diff_usage})
        else:
            for i, dev in enumerate(self.get_net_device(instance)):
                dev_usage.append({'dev': i, 'rx': 0, 'tx': 0})
        return dev_usage
项目:VManagePlatform    作者:welliamcao    | 项目源码 | 文件源码
def delInstanceCdrom(self,instance,cdrom):
        '''????'''
        raw_xml = instance.XMLDesc(0) 
        root = ElementTree.fromstring(raw_xml) 
        for dk in root.findall('./devices'):
            devs = dk.getchildren()
            for dev in devs:
                if dev.tag == 'disk'and dev.get('device')=='cdrom':
                    for iter in dev:
                        if iter.tag == 'target' and iter.get('dev') == cdrom:
                            devs.remove(dev)
        diskXml = ElementTree.tostring(root)      
        try:
            return self.defineXML(diskXml)
        except libvirt.libvirtError,e:
            return '??????????????{result}'.format(result=e.get_error_message())
项目:VManagePlatform    作者:welliamcao    | 项目源码 | 文件源码
def addInstanceInterface(self,instance,brName):
        netk = self.getNetwork(brName)
        if netk:     
            xml = netk.XMLDesc(0)
            tree = ElementTree.fromstring(xml)
            try:
                mode = tree.find('virtualport').get('type') 
            except:
                mode = 'brctl'
            model = tree.find('forward').get('mode')               
            interXml = Const.CreateNetcard(nkt_br=brName, ntk_name=brName +'-'+CommTools.radString(length=4), data={'type':model,'mode':mode})
            try:
                return instance.attachDeviceFlags(interXml,3)#??????????flags?3?????????????? 
            except libvirt.libvirtError,e:
                return '??????????????{result}'.format(result=e.get_error_message()) 
        else:return False
项目:VManagePlatform    作者:welliamcao    | 项目源码 | 文件源码
def setInterfaceBandwidth(self,instance,port,bandwidth):
        '''????'''
        domXml = instance.XMLDesc(0)
        root = ElementTree.fromstring(domXml)
        try:
            for dev in root.findall('.//devices/'):
                if dev.tag == 'interface':
                    for iter in dev:
                        if iter.tag == 'target' and iter.get('dev') == port:
                            bwXml = ElementTree.SubElement(dev,'bandwidth')   
                            inbdXml = ElementTree.Element('inbound')
                            inbdXml.set('average',str(int(bandwidth)*1024))
                            inbdXml.set('peak',str(int(bandwidth)*1024))
                            inbdXml.set('burst','1024')
                            outbdXml = ElementTree.Element('outbound')
                            outbdXml.set('average',str(int(bandwidth)*1024))
                            outbdXml.set('peak',str(int(bandwidth)*1024))
                            outbdXml.set('burst','1024')
                            bwXml.append(inbdXml)
                            bwXml.append(outbdXml)
            domXml = ElementTree.tostring(root)
        except Exception,e:
            return {"status":"faild",'data':e}
        if self.defineXML(domXml):return {"status":"success",'data':None}
项目:Causality    作者:vcla    | 项目源码 | 文件源码
def setUpClass(cls):
        xml = """
<temporal>
        <fluent_changes>
                <fluent_change energy="60.281" fluent="PHONE_ACTIVE" frame="0" new_value="off"/>
                <fluent_change energy="60.281" fluent="PHONE_ACTIVE" frame="3890" new_value="on" old_value="off"/>
                <fluent_change energy="60.281" fluent="PHONE_ACTIVE" frame="3990" new_value="off" old_value="on"/>
        </fluent_changes>
        <actions>
                <event action="makecall_START" energy="60.281" frame="3718"/>
                <event action="makecall_END" energy="60.281" frame="3767"/>
        </actions>
</temporal>"""
        cls.parsexml = ET.fromstring(xml)