我们从Python开源项目中,提取了以下48个代码示例,用于说明如何使用xml.etree.ElementTree.fromstring()。
def _set_boot_device(conn, domain, device): """Set the boot device. :param conn: active libvirt connection. :param domain: libvirt domain object. :raises: LibvirtError if failed update domain xml. """ parsed = ET.fromstring(domain.XMLDesc()) os = parsed.find('os') boot_list = os.findall('boot') # Clear boot list for boot_el in boot_list: os.remove(boot_el) boot_el = ET.SubElement(os, 'boot') boot_el.set('dev', device) try: conn.defineXML(ET.tostring(parsed)) except libvirt.libvirtError as e: raise isd_exc.LibvirtError(err=e)
def choose_aws_role(assertion): """ Choose AWS role from SAML assertion """ aws_attribute_role = 'https://aws.amazon.com/SAML/Attributes/Role' attribute_value_urn = '{urn:oasis:names:tc:SAML:2.0:assertion}AttributeValue' roles = [] role_tuple = namedtuple("RoleTuple", ["principal_arn", "role_arn"]) root = ET.fromstring(base64.b64decode(assertion)) for saml2attribute in root.iter('{urn:oasis:names:tc:SAML:2.0:assertion}Attribute'): if saml2attribute.get('Name') == aws_attribute_role: for saml2attributevalue in saml2attribute.iter(attribute_value_urn): roles.append(role_tuple(*saml2attributevalue.text.split(','))) for index, role in enumerate(roles): role_name = role.role_arn.split('/')[1] print("%d: %s" % (index+1, role_name)) role_choice = input('Please select the AWS role: ')-1 return roles[role_choice]
def getFileURLs(file_ids): ''' Retrieve the ftp location for a list of file IDs @param file_ids: List of file IDs @return List of ftp locations ''' info_url='http://modwebsrv.modaps.eosdis.nasa.gov/axis2/services/MODAPSservices/getFileUrls?fileIds=' for file_id in file_ids: info_url += str(file_id) + ',' info_url = info_url[:-1] url = urlopen(info_url) tree = ET.fromstring(url.read().decode()) url.close() return [ child.text for child in tree ]
def checkCobertura(config, checkoutSteps, buildSteps, packageSteps, **kwargs): found = False for s in checkoutSteps: if s.getPackage().getName().endswith("unittests"): found = True for s in buildSteps: if s.getPackage().getName().endswith("unittests"): found = True for s in packageSteps: if s.getPackage().getName().endswith("unittests"): found = True if found: root = ElementTree.fromstring(config) publishers = root.find("publishers") if publishers.find("hudson.plugins.cobertura.CoberturaPublisher") is None: publishers.append(PLUGIN) config = ElementTree.tostring(root, encoding="UTF-8") return config
def _scanDir(self, workspace, dir): self.__dir = dir try: info = ElementTree.fromstring(subprocess.check_output( ["svn", "info", "--xml", dir], cwd=workspace, universal_newlines=True)) self.__url = info.find('entry/url').text self.__revision = int(info.find('entry').get('revision')) self.__repoRoot = info.find('entry/repository/root').text self.__repoUuid = info.find('entry/repository/uuid').text status = subprocess.check_output(["svn", "status", dir], cwd=workspace, universal_newlines=True) self.__dirty = status != "" except subprocess.CalledProcessError as e: raise BuildError("Svn audit failed: " + str(e)) except OSError as e: raise BuildError("Error calling git: " + str(e)) except ElementTree.ParseError as e: raise BuildError("Invalid XML received from svn")
def testSetGitShallowClone(self): self.executeBobJenkinsCmd("set-options -o scm.git.shallow=42 myTestJenkins") self.executeBobJenkinsCmd("push -q myTestJenkins") send = self.jenkinsMock.getServerData() config = ElementTree.fromstring(send[0][1]) for clone in config.iter('hudson.plugins.git.extensions.impl.CloneOption'): found = 0 for a in clone.getiterator(): if a.tag == 'shallow': assert(a.text == 'true') found += 1 if a.tag == 'depth': assert(a.text == '42') found += 1 assert(found == 2) self.executeBobJenkinsCmd("set-options -o scm.git.shallow=-1 myTestJenkins") with self.assertRaises(Exception) as c: self.executeBobJenkinsCmd("push -q myTestJenkins") assert(type(c.exception) == BuildError)
def testShortDescription(self): self.createComplexRecipes() self.executeBobJenkinsCmd("set-options --shortdescription myTestJenkinsComplex") self.executeBobJenkinsCmd("push -q myTestJenkinsComplex") send = self.jenkinsMock.getServerData() result_set = set() try: for i in send: if i[0] == '/createItem?name=dependency-two': for items in ElementTree.fromstring(i[1]).iter('description'): for line in [x for x in items.itertext()][0].splitlines(): if line.startswith('<li>') and line.endswith('</li>'): result_set.add(line[4:-5]) except: print("Malformed Data Recieved") self.assertEqual(result_set, {'root/dependency-one/dependency-two'})
def FromXml(self, xml): if not xml: raise WxPayException("xml?????") # ??ElementTree try: import xml.etree.cElementTree as ET except ImportError: import xml.etree.ElementTree as ET # ??xml xml2dict = {} xml_tree = ET.fromstring(xml) for child_node in xml_tree: xml2dict[child_node.tag] = child_node.text self.values = xml2dict return self.values # ?????????url??
def _get_defects(self, xml_string): '''evaluate the xml string returned by cppcheck (on sdterr) and use it to create a list of defects. ''' defects = [] for error in ElementTree.fromstring(xml_string).iter('error'): defect = {} defect['id'] = error.get('id') defect['severity'] = error.get('severity') defect['msg'] = str(error.get('msg')).replace('<','<') defect['verbose'] = error.get('verbose') for location in error.findall('location'): defect['file'] = location.get('file') defect['line'] = str(int(location.get('line')) - 1) defects.append(defect) return defects
def _create_html_index(self, files): name = self.generator.get_name() root = ElementTree.fromstring(CPPCHECK_HTML_FILE) title = root.find('head/title') title.text = 'cppcheck - report - %s' % name body = root.find('body') for div in body.findall('div'): if div.get('id') == 'page': page = div break for div in page.findall('div'): if div.get('id') == 'header': h1 = div.find('h1') h1.text = 'cppcheck report - %s' % name if div.get('id') == 'content': content = div self._create_html_table(content, files) s = ElementTree.tostring(root, method='html') s = CCPCHECK_HTML_TYPE + s node = self.generator.path.get_bld().find_or_declare('cppcheck/index.html') node.write(s) return node
def _create_html_table(self, content, files): table = ElementTree.fromstring(CPPCHECK_HTML_TABLE) for name, val in files.items(): f = val['htmlfile'] s = '<tr><td colspan="4"><a href="%s">%s</a></td></tr>\n' % (f,name) row = ElementTree.fromstring(s) table.append(row) errors = sorted(val['errors'], key=lambda e: int(e['line']) if e.has_key('line') else sys.maxint) for e in errors: if not e.has_key('line'): s = '<tr><td></td><td>%s</td><td>%s</td><td>%s</td></tr>\n' % (e['id'], e['severity'], e['msg']) else: attr = '' if e['severity'] == 'error': attr = 'class="error"' s = '<tr><td><a href="%s#line-%s">%s</a></td>' % (f, e['line'], e['line']) s+= '<td>%s</td><td>%s</td><td %s>%s</td></tr>\n' % (e['id'], e['severity'], attr, e['msg']) row = ElementTree.fromstring(s) table.append(row) content.append(table)
def get_xml_trees(db, host, pid, usernames, graceful=False): """ Params: db, host, paragraph id, the list of usernames wanted, Optional: graceful: True if no excpetions are to be raised excpetion raised if a user did not submit an annotation for the passage returns a list of xml roots elements """ cur = get_cursor(db, host) xmls = [] for username in usernames: username = str(username) # precaution for cases bad input e.g. 101 cur_uid = get_uid(db, host, username) cur.execute("SELECT xml FROM xmls WHERE paid=" + PLACE_HOLDER + " AND uid=" + PLACE_HOLDER + "ORDER BY ts DESC", (pid, cur_uid)) raw_xml = cur.fetchone() if not raw_xml and not graceful: raise Exception("The user " + username + " did not submit an annotation for this passage") else: xmls.append(fromstring(raw_xml[0])) return xmls
def get_by_xid(db, host, xid, graceful=False): """Returns the passages that correspond to the xid Optional: graceful: True if no excpetions are to be raised excpetion raised if xid does no exist """ # precaution for bad input e.g. 101->'101' xid = str(xid) cur = get_cursor(db, host) cur.execute("SELECT xml FROM xmls WHERE id" + "=" + PLACE_HOLDER, (int(xid),)) raw_xml = cur.fetchone() if not raw_xml and not graceful: raise Exception("The xid " + xid + " does not exist") elif raw_xml: return fromstring(raw_xml[0])
def get_mac_address(conn, domain_name): """Get MAC address from domain XML.""" domain = conn.lookupByName(domain_name) domain_xml = domain.XMLDesc() domain_tree = etree.fromstring(domain_xml) devices = domain_tree.find('devices') interfaces = devices.iterfind('interface') for interface in interfaces: source = interface.find('source') if source is None or source.get('network') != 'vagrant-private-dhcp': continue mac_element = interface.find('mac') mac_address = mac_element.get('address') return mac_address raise NoPrivateDHCPInterfaceException()
def __init__(self, path, *args, **kwargs): cls = self.__class__ super(cls, self).__init__(path, *args, **kwargs) while True: sess = PeektaggedSectionHeader(self.fh) if sess.tag == 'pkts': break sess.payload = self.fh.read(sess.len) if sess.tag == PEEKTAGGED_FILE_MAGIC: root = ET.fromstring(sess.payload) if root.tag != 'VersionInfo': raise PeektaggedException("Corrupted version info") for child in root: setattr(self, child.tag, child.text) elif sess.tag == 'sess': root = ET.fromstring(sess.payload) for child in root: if child.tag == 'PacketCount': setattr(self, 'total_packets', int(child.text)) break
def CreateClassFromXMLString(target_class, xml_string, string_encoding=None): """Creates an instance of the target class from the string contents. Args: target_class: class The class which will be instantiated and populated with the contents of the XML. This class must have a _tag and a _namespace class variable. xml_string: str A string which contains valid XML. The root element of the XML string should match the tag and namespace of the desired class. string_encoding: str The character encoding which the xml_string should be converted to before it is interpreted and translated into objects. The default is None in which case the string encoding is not changed. Returns: An instance of the target class with members assigned according to the contents of the XML - or None if the root XML tag and namespace did not match those of the target class. """ encoding = string_encoding or XML_STRING_ENCODING if encoding and isinstance(xml_string, unicode): xml_string = xml_string.encode(encoding) tree = ElementTree.fromstring(xml_string) return _CreateClassFromElementTree(target_class, tree)
def parse(xml_string, target_class=None, version=1, encoding=None): """Parses the XML string according to the rules for the target_class. Args: xml_string: str or unicode target_class: XmlElement or a subclass. If None is specified, the XmlElement class is used. version: int (optional) The version of the schema which should be used when converting the XML into an object. The default is 1. encoding: str (optional) The character encoding of the bytes in the xml_string. Default is 'UTF-8'. """ if target_class is None: target_class = XmlElement if isinstance(xml_string, unicode): if encoding is None: xml_string = xml_string.encode(STRING_ENCODING) else: xml_string = xml_string.encode(encoding) tree = ElementTree.fromstring(xml_string) return _xml_element_from_tree(tree, target_class, version)
def doSomethingWithResult(response): if response is None: return "KO" else: tree = ET.fromstring(response.text) status = "Free" # arrgh, namespaces!! elems=tree.findall(".//{http://schemas.microsoft.com/exchange/services/2006/types}BusyType") for elem in elems: status=elem.text tree2=ET.fromstring(response.request.body) elems2=tree2.findall(".//{http://schemas.microsoft.com/exchange/services/2006/types}Address") for e in elems2: room=e.text #sys.stderr.write(str(now.isoformat())+": Get BusyType for room "+str(rooms[room])+": len: "+str(len(elems))+" => "+str(elems[0])+"\n") sys.stderr.write(str(now.isoformat())+": Status for room "+str(rooms[room])+": "+str(room)+" => "+status+"\n") result.append((status, rooms[room], room))
def snabb_state(query_output): root = ET.fromstring(query_output) print ("<snabb>") for instance in root: # In each instance, we need to query the id, pci, pid. print ("<instance>") for child in instance: PRINT_TAG(child, "id") PRINT_TAG(child,"pid") PRINT_TAG(child,"next_hop_mac_v4") PRINT_TAG(child,"next_hop_mac_v6") #child = None #for child in instance: if child.tag == "pci": for pcis in child: for pci_child in pcis: PRINT_TAG(pci_child,"rxpackets") PRINT_TAG(pci_child,"txpackets") PRINT_TAG(pci_child,"rxdrop") PRINT_TAG(pci_child,"txdrop") print "</instance>" print ("</snabb>") return
def snabb_statistics(output,argv): root = ET.fromstring(output) print "<snabb>" found = 0 instance_id = "" for i in range(0,len(argv)): if argv[i] == "id": instance_id = argv[i+1] break for instance in root: if instance_id != '' and instance.findall("./id")[0].text != argv[2]: pass else: found += 1 stats_per_instance(instance) if found == 0: print "<statistics><id_error>no instance found</id_error></statistics>" print "</snabb>" return
def _http_get_and_parse(self, cmd, payload = {}): """ Call http_get and parse the returned Foscam data into a hash. The data all looks like: var id='000C5DDC9D6C'; """ ret = {} data = self._http_get(cmd,payload) self.parent.logger.debug("FoscamHD2:_http_get_and_parse:%s: data=%s" % (self.name,data)) if data is False: code = -1 else: # Return code is good, unless CGI result changes it later code = 0 root = ET.fromstring(data) for child in root.iter(): if child.tag == 'result': code = int(child.text) elif child.tag != 'CGI_Result': ret[child.tag] = child.text self.parent.logger.debug("FoscamHD2:_http_get_and_parse:%s: code=%d, ret=%s" % (self.name,code,ret)) return code,ret
def ip(self, name): leases = {} conn = self.conn for network in conn.listAllNetworks(): for lease in network.DHCPLeases(): ip = lease['ipaddr'] mac = lease['mac'] leases[mac] = ip try: vm = conn.lookupByName(name) xml = vm.XMLDesc(0) root = ET.fromstring(xml) except: return None for element in root.getiterator('{kvirt}info'): e = element.find('{kvirt}ip') if e is not None: return e.text nic = root.getiterator('interface')[0] mac = nic.find('mac').get('address') if vm.isActive() and mac in leases: return leases[mac] else: return None
def volumes(self, iso=False): isos = [] templates = [] default_templates = [os.path.basename(t) for t in defaults.TEMPLATES.values() if t is not None] conn = self.conn for storage in conn.listStoragePools(): storage = conn.storagePoolLookupByName(storage) storage.refresh(0) storagexml = storage.XMLDesc(0) root = ET.fromstring(storagexml) for element in root.getiterator('path'): storagepath = element.text break for volume in storage.listVolumes(): if volume.endswith('iso'): isos.append("%s/%s" % (storagepath, volume)) elif volume.endswith('qcow2') or volume.endswith('qc2') or volume in default_templates: templates.append("%s/%s" % (storagepath, volume)) if iso: return sorted(isos, key=lambda s: s.lower()) else: return sorted(templates, key=lambda s: s.lower())
def _uploadimage(self, name, pool='default', origin='/tmp', suffix='.ISO'): name = "%s%s" % (name, suffix) conn = self.conn poolxml = pool.XMLDesc(0) root = ET.fromstring(poolxml) for element in root.getiterator('path'): poolpath = element.text break imagepath = "%s/%s" % (poolpath, name) imagexml = self._xmlvolume(path=imagepath, size=0, diskformat='raw') pool.createXML(imagexml, 0) imagevolume = conn.storageVolLookupByPath(imagepath) stream = conn.newStream(0) imagevolume.upload(stream, 0, 0) with open("%s/%s" % (origin, name)) as ori: stream.sendAll(self.handler, ori) stream.finish()
def remove_cloudinit(self, name): conn = self.conn try: vm = conn.lookupByName(name) xml = vm.XMLDesc(0) root = ET.fromstring(xml) except: print("VM %s not found" % name) return {'result': 'failure', 'reason': "VM %s not found" % name} for element in root.getiterator('disk'): disktype = element.get('device') if disktype == 'cdrom': source = element.find('source') path = source.get('file') if source is None: break volume = conn.storageVolLookupByPath(path) volume.delete(0) element.remove(source) newxml = ET.tostring(root) conn.defineXML(newxml)
def vm_ports(self, name): conn = self.conn networks = [] try: vm = conn.lookupByName(name) except: common.pprint("VM %s not found" % name, color='red') return networks xml = vm.XMLDesc(0) root = ET.fromstring(xml) for element in root.getiterator('interface'): networktype = element.get('type') if networktype == 'bridge': network = element.find('source').get('bridge') else: network = element.find('source').get('network') networks.append(network) return networks
def _get_bridge(self, name): conn = self.conn bridges = [interface.name() for interface in conn.listAllInterfaces()] if name in bridges: return name try: net = self.conn.networkLookupByName(name) except: return None netxml = net.XMLDesc(0) root = ET.fromstring(netxml) bridge = root.getiterator('bridge') if bridge: attributes = bridge[0].attrib bridge = attributes.get('name') return bridge
def _le_xml(self, arquivo): if arquivo is None: return False if not isinstance(arquivo, basestring): arquivo = etree.tounicode(arquivo) if arquivo is not None: if isinstance(arquivo, basestring): if NAMESPACE_NFSE in arquivo: arquivo = por_acentos(arquivo) if u'<' in arquivo: self._xml = etree.fromstring(tira_abertura(arquivo)) else: arq = open(arquivo) txt = ''.join(arq.readlines()) txt = tira_abertura(txt) arq.close() self._xml = etree.fromstring(txt) else: self._xml = etree.parse(arquivo) return True return False
def validar(self): arquivo_esquema = self.caminho_esquema + self.arquivo_esquema # Aqui é importante remover a declaração do encoding # para evitar erros de conversão unicode para ascii xml = tira_abertura(self.xml).encode(u'utf-8') esquema = etree.XMLSchema(etree.parse(arquivo_esquema)) if not esquema.validate(etree.fromstring(xml)): for e in esquema.error_log: if e.level == 1: self.alertas.append(e.message.replace('{http://www.portalfiscal.inf.br/nfe}', '')) elif e.level == 2: self.erros.append(e.message.replace('{http://www.portalfiscal.inf.br/nfe}', '')) return esquema.error_log
def xml_to_dict(data): try: root = ET.fromstring(data) except ET.ParseError: root = [] d = {} for item in root: dd = {} for subitem in item: m = {} m['text'] = subitem.text m.update(subitem.attrib) dd[subitem.tag] = m d[dd['title']['text']] = dd return d
def test_check_host_status_by_cibadmin( self, mock_get_cib_xml, mock_set_cib_xml, mock_have_quorum, mock_get_node_state_tag_list, mock_check_if_status_changed): mock_get_cib_xml.return_value = STATUS_TAG_XML mock_set_cib_xml.return_value = None mock_have_quorum.return_value = 1 status_tag = ElementTree.fromstring(STATUS_TAG_XML) node_state_tag_list = status_tag.getchildren() mock_get_node_state_tag_list.return_value = node_state_tag_list mock_check_if_status_changed.return_value = None obj = handle_host.HandleHost() ret = obj._check_host_status_by_cibadmin() self.assertEqual(0, ret) mock_get_cib_xml.assert_called_once_with() mock_set_cib_xml.assert_called_once_with(STATUS_TAG_XML) mock_have_quorum.assert_called_once_with() mock_get_node_state_tag_list.assert_called_once_with() mock_check_if_status_changed.assert_called_once_with( node_state_tag_list)
def test_check_host_status_by_cibadmin_no_quorum( self, mock_get_cib_xml, mock_set_cib_xml, mock_have_quorum, mock_get_node_state_tag_list, mock_check_if_status_changed): mock_get_cib_xml.return_value = STATUS_TAG_XML mock_set_cib_xml.return_value = None mock_have_quorum.return_value = 0 status_tag = ElementTree.fromstring(STATUS_TAG_XML) node_state_tag_list = status_tag.getchildren() mock_get_node_state_tag_list.return_value = node_state_tag_list mock_check_if_status_changed.return_value = None obj = handle_host.HandleHost() ret = obj._check_host_status_by_cibadmin() self.assertEqual(0, ret) mock_get_cib_xml.assert_called_once_with() mock_set_cib_xml.assert_called_once_with(STATUS_TAG_XML) mock_have_quorum.assert_called_once_with() mock_get_node_state_tag_list.assert_called_once_with() mock_check_if_status_changed.assert_called_once_with( node_state_tag_list)
def testParseClassNode(self): example_xml_string = ( '<class name="Class1" extends="java.lang.Object">' '<method name="method1">' '</method>' '<method name="method2">' '</method>' '</class>') actual = dexdump._ParseClassNode( ElementTree.fromstring(example_xml_string)) expected = { 'methods': ['method1', 'method2'], 'superclass': 'java.lang.Object', } self.assertEquals(expected, actual)
def del_tags_from_xml(xml, tag_list=[]): """ It deletes the tags either by their names or xpath Arguments: 1.xml: It takes xml file path or xml string as input 2.tag_list: It contains list of tags which needs to be removed Returns: It returns xml string """ if os.path.exists(xml): tree = ElementTree.parse(xml) root = tree.getroot() else: root = ElementTree.fromstring(xml) for tag in tag_list: if 'xpath=' in tag: tag = tag.strip('xpath=') req_tags = getChildElementsListWithSpecificXpath(root, tag) else: req_tags = getChildElementsListWithSpecificXpath(root, ".//{0}".format(tag)) recursive_delete_among_children(root, req_tags) xml_string = ElementTree.tostring(root, encoding='utf-8', method='xml') return xml_string
def _update_interfaces(self): self._interfaces = {} root = etree.fromstring(self._introspect_xml) for e in root.iter('interface'): name = e.attrib['name'] self._interfaces[name] = Interface( self, self._service, self._address, self._path, e, self._camel_convert, self._timeout_ms, self._changed_coroutines.get(name, None) ) for e in root.iter('node'): try: self._nodes.append(e.attrib['name']) except KeyError: pass
def do_discover_reports(sdk_client): url = 'https://adwords.google.com/api/adwords/reportdownload/{}/reportDefinition.xsd'.format(VERSION) #pylint: disable=line-too-long xsd = request_xsd(url) root = ET.fromstring(xsd) nodes = list(root.find(".//*[@name='ReportDefinition.ReportType']/*")) stream_names = [p.attrib['value'] for p in nodes if p.attrib['value'] in VERIFIED_REPORTS] #pylint: disable=line-too-long streams = [] LOGGER.info("Starting report discovery") for stream_name in stream_names: schema, mdata = create_schema_for_report(stream_name, sdk_client) streams.append({'stream': stream_name, 'tap_stream_id': stream_name, 'metadata' : metadata.to_list(mdata), 'schema': schema}) LOGGER.info("Report discovery complete") return streams
def mountIso(self,instance,dev, image): tree = ElementTree.fromstring(self.getInsXMLDesc(instance,0)) for disk in tree.findall('devices/disk'): if disk.get('device') == 'cdrom': for elm in disk: if elm.tag == 'target': if elm.get('dev') == dev: src_media = ElementTree.Element('source') src_media.set('file', image) disk.append(src_media) if instance.state()[0] == 1: xml_disk = ElementTree.tostring(disk) try: instance.attachDevice(xml_disk) except libvirt.libvirtError,e: return '??????????{result}'.format(result=e.get_error_message()) xmldom = self.getInsXMLDesc(instance,1) if instance.state()[0] == 5: xmldom = ElementTree.tostring(tree) try: return self.defineXML(xmldom) except libvirt.libvirtError,e: return '??????????{result}'.format(result=e.get_error_message())
def getNetUsage(self,instance): devices = [] dev_usage = [] tree = ElementTree.fromstring(self.getInsXMLDesc(instance, flag=1)) if instance.state()[0] == 1: tree = ElementTree.fromstring(self.getInsXMLDesc(instance, flag=1)) for target in tree.findall("devices/interface/target"): devices.append(target.get("dev")) for i, dev in enumerate(devices): rx_use_ago = instance.interfaceStats(dev)[0] tx_use_ago = instance.interfaceStats(dev)[4] time.sleep(1) rx_use_now = instance.interfaceStats(dev)[0] tx_use_now = instance.interfaceStats(dev)[4] rx_diff_usage = (rx_use_now - rx_use_ago) * 8 tx_diff_usage = (tx_use_now - tx_use_ago) * 8 dev_usage.append({'dev': i, 'rx': rx_diff_usage, 'tx': tx_diff_usage}) else: for i, dev in enumerate(self.get_net_device(instance)): dev_usage.append({'dev': i, 'rx': 0, 'tx': 0}) return dev_usage
def delInstanceCdrom(self,instance,cdrom): '''????''' raw_xml = instance.XMLDesc(0) root = ElementTree.fromstring(raw_xml) for dk in root.findall('./devices'): devs = dk.getchildren() for dev in devs: if dev.tag == 'disk'and dev.get('device')=='cdrom': for iter in dev: if iter.tag == 'target' and iter.get('dev') == cdrom: devs.remove(dev) diskXml = ElementTree.tostring(root) try: return self.defineXML(diskXml) except libvirt.libvirtError,e: return '??????????????{result}'.format(result=e.get_error_message())
def addInstanceInterface(self,instance,brName): netk = self.getNetwork(brName) if netk: xml = netk.XMLDesc(0) tree = ElementTree.fromstring(xml) try: mode = tree.find('virtualport').get('type') except: mode = 'brctl' model = tree.find('forward').get('mode') interXml = Const.CreateNetcard(nkt_br=brName, ntk_name=brName +'-'+CommTools.radString(length=4), data={'type':model,'mode':mode}) try: return instance.attachDeviceFlags(interXml,3)#??????????flags?3?????????????? except libvirt.libvirtError,e: return '??????????????{result}'.format(result=e.get_error_message()) else:return False
def setInterfaceBandwidth(self,instance,port,bandwidth): '''????''' domXml = instance.XMLDesc(0) root = ElementTree.fromstring(domXml) try: for dev in root.findall('.//devices/'): if dev.tag == 'interface': for iter in dev: if iter.tag == 'target' and iter.get('dev') == port: bwXml = ElementTree.SubElement(dev,'bandwidth') inbdXml = ElementTree.Element('inbound') inbdXml.set('average',str(int(bandwidth)*1024)) inbdXml.set('peak',str(int(bandwidth)*1024)) inbdXml.set('burst','1024') outbdXml = ElementTree.Element('outbound') outbdXml.set('average',str(int(bandwidth)*1024)) outbdXml.set('peak',str(int(bandwidth)*1024)) outbdXml.set('burst','1024') bwXml.append(inbdXml) bwXml.append(outbdXml) domXml = ElementTree.tostring(root) except Exception,e: return {"status":"faild",'data':e} if self.defineXML(domXml):return {"status":"success",'data':None}
def setUpClass(cls): xml = """ <temporal> <fluent_changes> <fluent_change energy="60.281" fluent="PHONE_ACTIVE" frame="0" new_value="off"/> <fluent_change energy="60.281" fluent="PHONE_ACTIVE" frame="3890" new_value="on" old_value="off"/> <fluent_change energy="60.281" fluent="PHONE_ACTIVE" frame="3990" new_value="off" old_value="on"/> </fluent_changes> <actions> <event action="makecall_START" energy="60.281" frame="3718"/> <event action="makecall_END" energy="60.281" frame="3767"/> </actions> </temporal>""" cls.parsexml = ET.fromstring(xml)