我们从Python开源项目中,提取了以下22个代码示例,用于说明如何使用xml.etree.ElementTree.dump()。
def get_media_servers(self): try: requests.packages.urllib3.disable_warnings() url = "http://"+self.server_ip+":47365/getMediaServers" print(url) r = requests.get(url) root = ET.fromstring(r.content) #print(ET.dump(root)) #for atype in root.findall('mediaserver'): for child in root: print("+", child.tag, child.get('name'), child.get('udn')) return r except Exception as err: print("Exception get_media_servers: {0}".format(err)) return None
def get_zones(self): try: requests.packages.urllib3.disable_warnings() url = "http://"+self.server_ip+":47365/getZones" print(url) r = requests.get(url) root = ET.fromstring(r.content) #print(ET.dump(root)) for atype in root.findall('zones'): for child in atype: print(child.tag, child.get('udn')) for room in child: print("+", room.tag, room.get('name'), room.get('udn')) for renderer in room: print(" +", renderer.tag, renderer.get('name'), renderer.get('udn')) return r except Exception as err: print("Exception get_zones: {0}".format(err)) return None
def _dl_first_xml(self, first_url): log.o(log_text.method_got_first_url(first_url)) # NOTE download and parse as xml first_xml = b.dl_html(first_url) var._['_raw_first_xml'] = first_xml # save raw xml text try: first = ET.fromstring(first_xml) except Exception as e: er = err.ParseXMLError('parse first xml text failed, first_url ', first_url) er.text = first raise er from e # check first Error if first.find('error') != None: raise err.MethodError('first xml info Error', ET.dump(first.find('error'))) var._['_vid_info']['vip'] = str(first.find('channel').get('vip')) return first
def show_xml(document): from xml.etree import ElementTree as XML xml_document = XML.Element("results") legs_xml = XML.SubElement(xml_document, 'legs') for n, leg in enumerate(document['legs'], start=1): leg_xml = XML.SubElement(legs_xml, 'leg', n=str(n)) leg_xml.text = leg teams_xml = XML.SubElement(xml_document, 'teams') for team in document['teams']: team_xml = XML.SubElement(teams_xml, "team") name_xml = XML.SubElement(team_xml, "name") name_xml.text = team['name'] position_xml = XML.SubElement(team_xml, "position") for n, position in enumerate(team['position'], start=1): leg_xml = XML.SubElement(position_xml, "leg", n=str(n)) leg_xml.text = str(position) pi = XML.ProcessingInstruction("xml", 'version="1.0"') XML.dump(pi) XML.dump(xml_document)
def _analyze_entity_node(self, node, entity): interface = node.get("interface") if interface is not None: mac_address = self.mac_addr_helper.get_interface_mac_addr(interface) entity.set_interface(interface, mac_address) for child in node.getchildren(): if child.tag == "packet": # pre processing "include" node # TODO: Possible to be deleted #self.__add_include_to_packet_node(child) # analyze packet node! self._set_packet_node(child, entity) if self.print_tc_preprocessing is True: self.__indent(node) ET.dump(node)
def writeToExternalFile( self, filename, metadata ): tree = self.convertMetadataToXML( self, metadata ) #ET.dump(tree) tree.write(filename, encoding='utf-8')
def bug_xmltoolkitX1(): """ dump() doesn't flush the output buffer >>> tree = ET.XML("<doc><table><tbody/></table></doc>") >>> ET.dump(tree); print("tail") <doc><table><tbody /></table></doc> tail """
def __callback_emm_state(self,msg): """ Given the EMM message, update EMM state and substate. :param msg: the NAS signaling message that carries EMM state """ self.__emm_status.state = msg.data["EMM State"] self.__emm_status.substate = msg.data["EMM Substate"] tmp = msg.data["PLMN"].split('-') self.__emm_status.guti.mcc = tmp[0] self.__emm_status.guti.mnc = tmp[1] self.__emm_status.guti.mme_group_id = msg.data["GUTI MME Group ID"] self.__emm_status.guti.mme_code = msg.data["GUTI MME Code"] self.__emm_status.guti.m_tmsi = msg.data["GUTI M-TMSI"] self.log_info(self.__emm_status.dump()) #broadcast state = { 'conn state': self.__emm_status.state, 'conn substate': self.__emm_status.substate, } # self.log_info('EMM_STATE', str(state)) self.broadcast_info('EMM_STATE', state) if self.callflow_state_machine.update_state(msg): self.log_info("Call flow status: " + str(self.callflow_state_machine.get_current_state()))
def __callback_emm(self,msg): """ Extract EMM status and configurations from the NAS messages :param msg: the EMM NAS message """ for field in msg.data.iter('field'): if field.get('show') == "UE network capability": # print str(ET.dump(field)) for val in field.iter('field'): if val.get('name') == 'nas_eps.emm.acc_csfb_cap': csfb_cap = True if val.get('show') == '1' else False self.log_info("CSFB Capbility: " + str(csfb_cap)) self.broadcast_info('CSFB_CAP', str(csfb_cap)) if field.get('show')=="EPS mobile identity - GUTI": field_val={} field_val['e212.mcc']=None field_val['e212.mnc']=None field_val['nas_eps.emm.mme_grp_id']=None field_val['nas_eps.emm.mme_code']=None field_val['nas_eps.emm.m_tmsi']=None for val in field.iter('field'): field_val[val.get('name')]=val.get('show') self.__emm_status.guti.mcc=field_val['e212.mcc'] self.__emm_status.guti.mnc=field_val['e212.mnc'] self.__emm_status.guti.mme_group_id=field_val['nas_eps.emm.mme_grp_id'] self.__emm_status.guti.mme_code=field_val['nas_eps.emm.mme_code'] self.__emm_status.guti.m_tmsi=field_val['nas_eps.emm.m_tmsi']
def dump(self): """ Report the EMM status :returns: a string that encodes EMM status """ return (self.__class__.__name__ + ' EMM.state='+xstr(self.state) + ' EMM.substate='+xstr(self.substate) + ' MCC=' + xstr(self.guti.mcc) + ' MNC=' + xstr(self.guti.mnc) + ' MMEGI=' + xstr(self.guti.mme_group_id) + ' MMEC=' + xstr(self.guti.mme_code) + ' TMSI=' + xstr(self.guti.m_tmsi))
def dump(self): # TODO: deal with potential KeyError here dump_text = ' EPS_ID=' + xstr(self.eps_id) + ' type=' + xstr(bearer_type[self.type]) \ + ":\n\t"+self.qos.dump_rate()+'\n\t'+self.qos.dump_delivery() return dump_text
def bug_xmltoolkitX1(): """ dump() doesn't flush the output buffer >>> tree = ET.XML("<doc><table><tbody/></table></doc>") >>> ET.dump(tree); sys.stdout.write("tail") <doc><table><tbody /></table></doc> tail """
def compare_resources(module, res1, res2): # we now have 2 nodes that we can compare, so lets dump them into files for comparring n1_file_fd, n1_tmp_path = tempfile.mkstemp() n2_file_fd, n2_tmp_path = tempfile.mkstemp() n1_file = open(n1_tmp_path, 'w') n2_file = open(n2_tmp_path, 'w') ## dump the XML resource definitions into temporary files sys.stdout = n1_file ET.dump(res1) sys.stdout = n2_file ET.dump(res2) sys.stdout = sys.__stdout__ ## n1_file.close() n2_file.close() ## normalize the files and store results in new files - this also removes some unimportant spaces and stuff n3_file_fd, n3_tmp_path = tempfile.mkstemp() n4_file_fd, n4_tmp_path = tempfile.mkstemp() rc, out, err = module.run_command('xmllint --format --output ' + n3_tmp_path + ' ' + n1_tmp_path) rc, out, err = module.run_command('xmllint --format --output ' + n4_tmp_path + ' ' + n2_tmp_path) ## addd files that should be cleaned up module.add_cleanup_file(n1_tmp_path) module.add_cleanup_file(n2_tmp_path) module.add_cleanup_file(n3_tmp_path) module.add_cleanup_file(n4_tmp_path) ## now compare files diff = '' rc, out, err = module.run_command('diff ' + n3_tmp_path + ' ' + n4_tmp_path) if rc != 0: # if there was difference then show the diff n3_file = open(n3_tmp_path, 'r+') n4_file = open(n4_tmp_path, 'r+') diff = { 'before_header': '', 'before': to_native(b('').join(n3_file.readlines())), 'after_header': '', 'after': to_native(b('').join(n4_file.readlines())), } return rc, diff
def create_xml_file(self, processed_object): root = ET.Element('data') for keys in processed_object: child = ET.SubElement(root, keys) child.text = str(processed_object[keys]) ET.dump(root) tree = ET.ElementTree(root) tree.write('results.xml')
def _write_xml(root, xml_file): from xml.dom import minidom #print(ElementTree.dump(root)) dom = minidom.parseString(ElementTree.tostring(root)) output = open(xml_file, 'w') output.write(dom.toprettyxml(indent=" ")) output.close() return xml_file
def printToConsole(self): ''' FOR DEBUGGING ONLY! Print the full XML document to the console. ''' ET.dump(self.root)
def check(self, fileName, data): suffix = QFileInfo(fileName).suffix().lower() if suffix=='': if type(data)!=list: suffix = 'xml' else: suffix = 'csv' print("Get: [{}]".format(fileName)) try: if suffix=='xml': if bool(data)==False: tree = ET.ElementTree(file=fileName) data = tree.getroot() if self.args.file_data: ET.dump(data) n = data.tag=='pyslvs' and data.find('info')!=None elif suffix=='csv': if bool(data)==False: with open(fileName, newline=str()) as stream: reader = csv.reader(stream, delimiter=' ', quotechar='|') for row in reader: data += ' '.join(row).split('\t,') if self.args.file_data: print(data) n = (len([e for e, x in enumerate(data) if x=='_info_'])==4 and len([e for e, x in enumerate(data) if x=='_table_'])==8 and len([e for e, x in enumerate(data) if x=='_design_'])==2) return n, data except: return False, list()
def __callback_esm_state(self,msg): """ Given the ESM message, update ESM state :param msg: the NAS signaling message that carries EMM state """ self.__cur_eps_id = msg.data["EPS bearer ID"] if self.__cur_eps_id not in self.__esm_status: self.__esm_status[self.__cur_eps_id] = EsmStatus() # print self.__cur_eps_id, str(self.__esm_status), str(bearer_type), msg.data["EPS bearer type"], str(msg.data['timestamp']) self.__esm_status[self.__cur_eps_id].eps_id = int(msg.data["EPS bearer ID"]) self.__esm_status[self.__cur_eps_id].type = int(msg.data["EPS bearer type"]) if self.__esm_status[self.__cur_eps_id].type == 255: self.__esm_status[self.__cur_eps_id].type = 1 self.__esm_status[self.__cur_eps_id].qos.qci = msg.data["QCI"] self.__esm_status[self.__cur_eps_id].qos.max_bitrate_ulink = msg.data["UL MBR"] self.__esm_status[self.__cur_eps_id].qos.max_bitrate_dlink = msg.data["DL MBR"] self.__esm_status[self.__cur_eps_id].qos.guaranteed_bitrate_ulink=msg.data["UL GBR"] self.__esm_status[self.__cur_eps_id].qos.guaranteed_bitrate_dlink=msg.data["DL MBR"] self.__esm_status[self.__cur_eps_id].qos.max_bitrate_ulink_ext=msg.data["UL MBR ext"] self.__esm_status[self.__cur_eps_id].qos.max_bitrate_dlink_ext=msg.data["DL MBR ext"] self.__esm_status[self.__cur_eps_id].qos.guaranteed_bitrate_ulink_ext=msg.data["UL GBR ext"] self.__esm_status[self.__cur_eps_id].qos.guaranteed_bitrate_dlink_ext=msg.data["DL MBR ext"] self.__esm_status[self.__cur_eps_id].timestamp = msg.data["timestamp"] self.log_info(self.__esm_status[self.__cur_eps_id].dump()) self.profile.update("LteNasProfile:"+self.__emm_status.profile_id()+".eps.qos:"+bearer_type[self.__esm_status[self.__cur_eps_id].type], {'qci':self.__esm_status[self.__cur_eps_id].qos.qci, 'max_bitrate_ulink':self.__esm_status[self.__cur_eps_id].qos.max_bitrate_ulink, 'max_bitrate_dlink':self.__esm_status[self.__cur_eps_id].qos.max_bitrate_dlink, 'guaranteed_bitrate_ulink':self.__esm_status[self.__cur_eps_id].qos.guaranteed_bitrate_ulink, 'guaranteed_bitrate_dlink':self.__esm_status[self.__cur_eps_id].qos.guaranteed_bitrate_dlink, 'max_bitrate_ulink_ext':self.__esm_status[self.__cur_eps_id].qos.max_bitrate_ulink_ext, 'max_bitrate_dlink_ext':self.__esm_status[self.__cur_eps_id].qos.max_bitrate_dlink_ext, 'guaranteed_bitrate_ulink_ext':self.__esm_status[self.__cur_eps_id].qos.guaranteed_bitrate_ulink_ext, 'guaranteed_bitrate_dlink_ext':self.__esm_status[self.__cur_eps_id].qos.guaranteed_bitrate_dlink_ext, }) # broadcast state = { 'conn state': esm_state[int(msg.data["EPS bearer state"]) - 1], 'timestamp' : str(msg.data['timestamp']) } # self.log_info(str(state)) self.broadcast_info('ESM_STATE', state)