我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用xml.etree.ElementTree.Element()。
def formatAlertsCount(numberofalerts, outformat): """ Create XML / Json Structure with number of Alerts in requested timespan """ if outformat == "xml": ewssimpleinfo = ET.Element('EWSSimpleIPInfo') alertCount = ET.SubElement(ewssimpleinfo, 'AlertCount') if numberofalerts: alertCount.text = str(numberofalerts) else: alertCount.text = str(0) prettify(ewssimpleinfo) alertcountxml = '<?xml version="1.0" encoding="UTF-8"?>' alertcountxml += (ET.tostring(ewssimpleinfo, encoding="utf-8", method="xml")).decode('utf-8') return alertcountxml else: return ({'AlertCount': numberofalerts})
def exportXML(self): elements = list() for node in anytree.PreOrderIter(self.tree): nodeName = parameters.encodeXMLName(node.name) if not elements: top = xmlElement(self.tree.name) top.attrib['name'] = self.tree.name top.attrib['is_checked'] = 'False' top.attrib['is_default'] = 'False' elements.append(top) continue for e in elements: if e.attrib['name'] == node.parent.name: se = xmlSubElement(e, nodeName) se.attrib['name'] = node.name se.attrib['is_checked'] = 'True' if node.is_checked else 'False' se.attrib['is_default'] = 'True' if node.is_default else 'False' se.attrib['parent'] = e.attrib['name'] elements.append(se) break # else: # print('could not find parent for {}'.format(node.name)) string = xmlElementTree.tostring(top, encoding='unicode', method='xml') return string
def to_xml(self): """Produces an XML out of the point data. Disregards the "action" field.""" el = etree.Element(self.osm_type, id=str(self.osm_id), version=str(self.version)) for tag, value in self.tags.items(): etree.SubElement(el, 'tag', k=tag, v=value) if self.osm_type == 'node': el.set('lat', str(self.lat)) el.set('lon', str(self.lon)) elif self.osm_type == 'way': for node_id in self.members: etree.SubElement(el, 'nd', ref=str(node_id)) elif self.osm_type == 'relation': for member in self.members: m = etree.SubElement(el, 'member') for i, n in enumerate(('type', 'ref', 'role')): m.set(n, str(member[i])) return el
def get_xml(self, encoding='utf-8'): """ Returns the XML-code of this schedule. The returned byte string contains the XML in the requested encoding. You save the byte sting to file in binary mode (the file will encoded the requested encoding). Or you can convert the byte string to a string with .decode(encoding). :param str encoding: The encoding of the XML. :rtype: bytes """ tree = Element(None) self.generate_xml(tree) xml_string = ElementTree.tostring(tree) document = minidom.parseString(xml_string) return document.toprettyxml(indent=' ', encoding=encoding) # ------------------------------------------------------------------------------------------------------------------
def to_xml(self): read = Element('read') if self.echo is not None: read.set('echo', self.echo) if self.length is not None: length = Element('length') length.text = str(self.length) read.append(length) for thing in ('delim', 'match', 'assign'): if getattr(self, thing) is not None: read.append(getattr(self, thing).to_xml()) if self.timeout is not None: timeout = Element('timeout') timeout.text = str(self.timeout) read.append(timeout) return read
def to_xml(self): root = Element('pov') cbid = Element('cbid') cbid.text = self.target root.append(cbid) seed_node = Element('seed') seed_node.text = self.seed root.append(seed_node) replay = Element('replay') root.append(replay) for action in self.actions: replay.append(action.to_xml()) # hack to make sure all crashes happen regardless of sockets closing # replay.append(Delay(500).to_xml()) return root
def to_xml(self): root = Element('pov') cbid = Element('cbid') cbid.text = self.target root.append(cbid) replay = Element('replay') root.append(replay) for action in self.actions: replay.append(action.to_xml()) # hack to make sure all crashes happen regardless of sockets closing # replay.append(Delay(500).to_xml()) return root
def test_to_xml(self): random_cbid = random_string(random.randint(8, 24)) random_actions = [Decl('a', Value([Data("abcdef")])), Delay(random.randint(0, 10000))] # create XML representation by hand element = Element('pov') cbid = Element('cbid') cbid.text = random_cbid element.append(cbid) replay = Element('replay') for action in random_actions: replay.append(action.to_xml()) element.append(replay) # create POV and XML representation automatically pov = CQE_POV(random_cbid, random_actions) self.assertTrue(repr(pov) == ElementTree.tostring(element))
def to_xml(self): if self.format is not None: tag = Element('data', attrib={'format': self.format}) else: tag = Element('data') if self.format == 'hex': tag.text = self.data.encode('hex') else: # asciic case encoded = '' for c in self.data: if c in _PRINTABLE: encoded += c elif c in ('\n', '\r', '\t'): encoded += { '\n': '\\n', '\r': '\\r', '\t': '\\t', }[c] else: encoded += '\\x{:02x}'.format(ord(c)) tag.text = encoded return tag
def as_xml(self): xml_output = Element(self.plugin_command, title=self.COMMAND_TITLE) xml_output.append(Element('vulnerable', isVulnerable=str(self.is_vulnerable))) if len(self.support_protocols) > 0: protocol_xml = Element('supportProtocols') for p in self.support_protocols: protocol_xml.append(Element('protocol',name=p)) xml_output.append(protocol_xml) if len(self.support_vulnerable_ciphers) > 0: cipher_xml = Element('vulnerableCipherSuites') for c in self.support_vulnerable_ciphers: cipher_xml.append(Element('cipherSuite',name=c)) xml_output.append(cipher_xml) return xml_output
def as_xml(self): xml_result = Element(self.plugin_command, title=self.COMMAND_TITLE) resumption_rate_xml = Element( 'sessionResumptionWithSessionIDs', attrib={'totalAttempts': str(self.attempted_resumptions_nb), 'errors': str(len(self.errored_resumptions_list)), 'isSupported': str(self.attempted_resumptions_nb == self.successful_resumptions_nb), 'successfulAttempts': str(self.successful_resumptions_nb), 'failedAttempts': str(self.failed_resumptions_nb)} ) # Add error messages if there was any for error_msg in self.errored_resumptions_list: resumption_error_xml = Element('error') resumption_error_xml.text = error_msg resumption_rate_xml.append(resumption_error_xml) xml_result.append(resumption_rate_xml) return xml_result
def as_xml(self): xml_result = Element(self.plugin_command, title=self.COMMAND_TITLE) # We keep the session resumption XML node resum_rate_xml = super(ResumptionResult, self).as_xml() session_resum_xml = resum_rate_xml[0] xml_result.append(session_resum_xml) # Add the ticket resumption node xml_resum_ticket_attr = {} if self.ticket_resumption_error: xml_resum_ticket_attr['error'] = self.ticket_resumption_error else: xml_resum_ticket_attr['isSupported'] = str(self.is_ticket_resumption_supported) if not self.is_ticket_resumption_supported: xml_resum_ticket_attr['reason'] = self.ticket_resumption_failed_reason xml_resum_ticket = Element('sessionResumptionWithTLSTickets', attrib=xml_resum_ticket_attr) xml_result.append(xml_resum_ticket) return xml_result
def as_xml(self): xml_result = Element(self.plugin_command, title=self.COMMAND_TITLE) is_hsts_supported = True if self.hsts_header else False xml_hsts_attr = {'isSupported': str(is_hsts_supported)} if is_hsts_supported: # Do some light parsing of the HSTS header hsts_header_split = self.hsts_header.split('max-age=')[1].split(';') hsts_max_age = hsts_header_split[0].strip() hsts_subdomains = False if len(hsts_header_split) > 1 and 'includeSubdomains' in hsts_header_split[1]: hsts_subdomains = True xml_hsts_attr['maxAge'] = hsts_max_age xml_hsts_attr['includeSubdomains'] = str(hsts_subdomains) xml_hsts = Element('httpStrictTransportSecurity', attrib=xml_hsts_attr) xml_result.append(xml_hsts) return xml_result
def UrhoWriteTriggers(triggersList, filename, fOptions): triggersElem = ET.Element('animation') for trigger in triggersList: triggerElem = ET.SubElement(triggersElem, "trigger") if trigger.time is not None: triggerElem.set("time", FloatToString(trigger.time)) if trigger.ratio is not None: triggerElem.set("normalizedtime", FloatToString(trigger.ratio)) # We use a string variant, for other types See typeNames[] in Variant.cpp # and XMLElement::GetVariant() triggerElem.set("type", "String") triggerElem.set("value", str(trigger.data)) WriteXmlFile(triggersElem, filename, fOptions) #-------------------- # Utils #-------------------- # Search for the most complete element mask
def save(self): root = ET.Element('TS') if self.version: root.attrib['version'] = self.version if self.language: root.attrib['language'] = self.language for ctx in sorted(self.__contexts.itervalues()): ctx.save(root) rough_string = ET.tostring(root, 'utf-8') reparsed = minidom.parseString(rough_string) text = reparsed.toprettyxml(indent=" ") text = text.encode('utf-8') with open(self.__file, 'wb') as f: f.write(text)
def xml(self): el = ET.Element(self.name) keys = self.attrs.keys() keys = sorted(keys) for a in keys: value = self.attrs[a] if isinstance(value, bool): el.set(a, str(value).lower()) else: el.set(a, str(value)) if self.body: el.text = self.body for verb in self.verbs: el.append(verb.xml()) return el
def export(self, filename): self.nodes, self.renderer = self.compute() initWidth, initHeight = (self.options['initialWidth'], self.options['initialHeight']) doc = ElementTree.Element('svg', width=str(initWidth), height=str(initHeight)) transform = self.getTranslation() trans = ElementTree.SubElement(doc, 'g', transform=transform) ElementTree.SubElement(trans, 'g', attrib={'class': 'dummy-layer'}) mainLayer = self.add_main(trans) self.add_timeline(mainLayer) if self.options['showTicks']: self.add_axis(mainLayer) self.add_links(mainLayer) self.add_labels(mainLayer) self.add_dots(mainLayer) svglines = ElementTree.tostring(doc) with open(filename, 'wb') as fid: fid.write(svglines) return svglines
def writeXML(dataObj, fname, dtdPath): if dataObj.predSev == 0: sev = "ABSENT" elif dataObj.predSev == 1: sev = "MILD" elif dataObj.predSev == 2: sev = "MODERATE" elif dataObj.predSev == 3: sev = "SEVERE" root = ET.Element("RDoC") ET.SubElement(root, "TEXT").text = dataObj.text.content tags = ET.SubElement(root, "TAGS") pos_val = ET.SubElement(tags, "POSITIVE_VALENCE") pos_val.set('score', sev) pos_val.set('annotated_by', dataObj.Nannotators) tree = ET.ElementTree(root) tree.write(fname)
def get_tdblock_as_xmlobj(self, db_details): """ To get the testdata blocks from the database as xml object """ rootobj = False td_collection = db_details.get('td_collection') td_document = db_details.get('td_document') if td_collection is not None and td_document is not None: tddoc = self.get_doc_from_db(td_collection, td_document) else: tddoc = False if tddoc is not False: root = ET.Element('data') self.convert_tddict_to_xmlobj(root, tddoc['data']) rootobj = root return rootobj
def get_globalblock_as_xmlobj(self, db_details): """ To get the global block from the database as xml object """ globalobj = False global_collection = db_details.get('global_collection') global_document = db_details.get('global_document') if global_collection is not None and global_document is not None: globaldoc = self.get_doc_from_db(global_collection, global_document) else: globaldoc = False if globaldoc is not False: global_root = ET.Element('global') self.convert_tddict_to_xmlobj(global_root, globaldoc['global']) globalobj = global_root return globalobj
def sub_from_varconfigfile(string, varconfigfile, start_pat="${", end_pat="}"): """ """ try: # when varconfigfile is an XMl object(root element) - this happens # only when varconfigfile is taken from database server if isinstance(varconfigfile, ElementTree.Element) is True: cfg_elem_obj = ConfigurationElement("Varconfig_from_database", start_pat, end_pat) cfg_elem_obj.parse_data(varconfigfile, elem_type="xml_object") else: cfg_elem_obj = ConfigurationElement(varconfigfile, start_pat, end_pat) cfg_elem_obj.parse_data(varconfigfile) newstring = cfg_elem_obj.expand_variables(string) except TypeError as exception: print_info("At least one of the variables in command string is not found in " + varconfigfile) #print_exception(exception) return False return newstring
def get_list_from_varconfigfile(string, varconfigfile, start_pat="${", end_pat="}"): """ """ try: # when varconfigfile is an XMl object(root element) - this happens # only when varconfigfile is taken from database server if isinstance(varconfigfile, ElementTree.Element) is True: cfg_elem_obj = ConfigurationElement("Varconfig_from_database", start_pat, end_pat) cfg_elem_obj.parse_data(varconfigfile, elem_type="xml_object") else: cfg_elem_obj = ConfigurationElement(varconfigfile, start_pat, end_pat) cfg_elem_obj.parse_data(varconfigfile) newstring = cfg_elem_obj.get_list(string) except TypeError as exception: print_info("At least one of the variables in command string is not found in " + varconfigfile) #print_exception(exception) return False return newstring
def getElementWithTagAttribValueMatch(start, tag, attrib, value): """ When start is an xml datafile, it finds the root and first element with: tag, attrib, value. Or when it's an xml element, it finds the first child element with: tag, attrib, value. If there is not a match, it returns False. """ node = False if isinstance(start, (file, str)): # check if file exist here if file_Utils.fileExists(start): node = ElementTree.parse(start).getroot() else: print_warning('The file={0} is not found.'.format(start)) elif isinstance(start, ElementTree.Element): node = start if node is not False and node is not None: elementName = ".//%s[@%s='%s']" % (tag, attrib, value) element = node.find(elementName) else: element = node return element
def getChildElementWithSpecificXpath(start, xpath): """ This method takes a xml file or parent element as input and finds the first child containing specified xpath Returns the child element. Arguments: start = xml file or parent element xpath = a valid xml path value as supported by python, refer https://docs.python.org/2/library/xml.etree.elementtree.html """ node = False if isinstance(start, (file, str)): # check if file exist here if file_Utils.fileExists(start): node = ElementTree.parse(start).getroot() else: print_warning('The file={0} is not found.'.format(start)) elif isinstance(start, ElementTree.Element): node = start if node is not False or node is not None: element = node.find(xpath) else: element = False return element
def confirm_url(question, attrib_value): """ This function recursively checks whether a given url is a valid repository or not. If it isn't, it promps the user to enter a new url and checks that. :Arguments: 1. question (xml.etree.ElementTree.Element) = The question tag from data.xml 2. attrib_value (str) = the url to be checked :Returns: 1. attrib_value (str) = valid url """ if not check_url_is_a_valid_repo(attrib_value): attrib_value = raw_input("Please enter a valid URL: ") attrib_value = confirm_url(question, attrib_value) return attrib_value
def add_drivers_to_tags(tag, drivers, driver_numbers): """ This function appends the driver tags sets the attributes and attribute names to the corresponding driver tag :Arguments: 1. tag (xml.etree.ElementTree.Element) = Current tag to which the newly formed driver tags may be appended 2. drivers (list[str]) = list of driver names available to the user 3. driver_numbers (list[int]) = list of the numbers which correspond to the driver names that the user wants. """ print "Selected drivers:" for driver_number in driver_numbers: driver_number = driver_number * 2 - 1 if driver_number > len(drivers): print "Corresponding driver for " + str((driver_number+1)/2) +\ " not found." continue print str((driver_number+1)/2) + ". " + drivers[driver_number] driver_tag = Element("driver") driver_tag.set("name", drivers[driver_number]) tag.append(driver_tag)
def diff_attributes_values(root, node, tag, attribute, values): """ This function creates tags in the new xml file which contain information about the value tags from data.xml :Arguments: 1. root (xml.etree.ElementTree.Element) = parent of the current node from data.xml 2. node (xml.etree.ElementTree.Element) = current node from data.xml 3. tag (xml.etree.ElementTree.Element) = current node that would be added to the new xml file 4. attribute (xml.etree.ElementTree.Element) = The current attribure tag 5. values (list[xml.etree.ElementTree.Element]) = complete list of value tags in that particular nesting from data.xml """ for i in range(0, len(values)): info = values[i].find("info") if info is not None: print info.text print "Warrior recommends that all these dependencies be installed on" \ " your machine." get_answer_for_depen(root, node, tag, attribute, values)
def main(): """ This function basically creates the xml file by calling various other functions, runs the file and then saves it. """ root = Element('data') dir_path = os.path.dirname(os.path.realpath(sys.argv[0])) rel_path = get_relative_path(dir_path, "data.xml") tree = xml.etree.ElementTree.parse(rel_path) input_root = tree.getroot() nodes = get_firstlevel_children(input_root, "tag") populate_xml(root, nodes) temp_xml = 'temp.xml' pretty_xml = minidom.parseString(xml.etree.ElementTree.tostring(root))\ .toprettyxml(indent=" ") with open(temp_xml, "w") as config_file: config_file.write(pretty_xml) config_file.flush() config_file.close() save_file(temp_xml)
def _getqrtag(self, text, rect): qr = qrcode.QRCode(error_correction=qrcode.constants.ERROR_CORRECT_L) qr.add_data(text) qr.make(fit=True) img = qr.make_image() bio = io.BytesIO() img.save(bio) pngqr = bio.getvalue() base64qr = base64.b64encode(pngqr) #<image x="110" y="20" width="280px" height="160px" xlink:href="data:image/png;base64,……"/> imagetag = ElementTree.Element("image") imagetag.set("xlink:href", "data:image/png;base64,"+base64qr.decode("ascii")) imagetag.set("x", str(rect.x)) imagetag.set("y", str(rect.y)) imagetag.set("width", str(rect.w)) imagetag.set("height", str(rect.h)) return imagetag
def _prepare_document(self): """ Build the main document node and set xml namespaces. """ self._xml = ET.Element("Document") self._xml.set("xmlns", "urn:iso:std:iso:20022:tech:xsd:" + self.schema) self._xml.set("xmlns:xsi", "http://www.w3.org/2001/XMLSchema-instance") ET.register_namespace("", "urn:iso:std:iso:20022:tech:xsd:" + self.schema) ET.register_namespace("xsi", "http://www.w3.org/2001/XMLSchema-instance") CstmrDrctDbtInitn_node = ET.Element("CstmrDrctDbtInitn") self._xml.append(CstmrDrctDbtInitn_node)
def mountIso(self,instance,dev, image): tree = ElementTree.fromstring(self.getInsXMLDesc(instance,0)) for disk in tree.findall('devices/disk'): if disk.get('device') == 'cdrom': for elm in disk: if elm.tag == 'target': if elm.get('dev') == dev: src_media = ElementTree.Element('source') src_media.set('file', image) disk.append(src_media) if instance.state()[0] == 1: xml_disk = ElementTree.tostring(disk) try: instance.attachDevice(xml_disk) except libvirt.libvirtError,e: return '??????????{result}'.format(result=e.get_error_message()) xmldom = self.getInsXMLDesc(instance,1) if instance.state()[0] == 5: xmldom = ElementTree.tostring(tree) try: return self.defineXML(xmldom) except libvirt.libvirtError,e: return '??????????{result}'.format(result=e.get_error_message())
def setInterfaceBandwidth(self,instance,port,bandwidth): '''????''' domXml = instance.XMLDesc(0) root = ElementTree.fromstring(domXml) try: for dev in root.findall('.//devices/'): if dev.tag == 'interface': for iter in dev: if iter.tag == 'target' and iter.get('dev') == port: bwXml = ElementTree.SubElement(dev,'bandwidth') inbdXml = ElementTree.Element('inbound') inbdXml.set('average',str(int(bandwidth)*1024)) inbdXml.set('peak',str(int(bandwidth)*1024)) inbdXml.set('burst','1024') outbdXml = ElementTree.Element('outbound') outbdXml.set('average',str(int(bandwidth)*1024)) outbdXml.set('peak',str(int(bandwidth)*1024)) outbdXml.set('burst','1024') bwXml.append(inbdXml) bwXml.append(outbdXml) domXml = ElementTree.tostring(root) except Exception,e: return {"status":"faild",'data':e} if self.defineXML(domXml):return {"status":"success",'data':None}
def create_xml_page(dictionary: dict, creator_name='DocSeg') -> 'Page': page = Page.from_dict(dictionary) # Create xml root = ET.Element('PcGts') root.set('xmlns', _ns['p']) # Metadata generated_on = str(datetime.datetime.now()) metadata = ET.SubElement(root, 'Metadata') creator = ET.SubElement(metadata, 'Creator') creator.text = creator_name created = ET.SubElement(metadata, 'Created') created.text = generated_on last_change = ET.SubElement(metadata, 'LastChange') last_change.text = generated_on root.append(page.to_xml()) for k, v in _attribs.items(): root.attrib[k] = v return root
def _build_xml(self, tag, val=None, children=Void()): """Construct an xml element with a given tag, value, and children.""" attribs = {'val': val} if val is not None else {} # If children is a list then convert each element separately, if it is # a tuple, treat it as a single element if isinstance(children, self.Void): children = () elif isinstance(children, list): children = [self._from_value(child) for child in children] else: children = (self._from_value(children),) xml = ET.Element(tag, attribs) xml.extend(children) return xml
def query(self, cmd, state, *args, **kwargs): """Create an XML string for the 'interp' command.""" # Attrs: # raw - ? # bool - Verbose output # int - The current state id # Args: # string - The query to evaluate elt = ET.Element('call', {'val': 'interp', 'raw': 'true', 'verbose': 'true', 'id': str(state)}) elt.text = cmd return ('Query', ET.tostring(elt, kwargs.get('encoding', 'utf-8')))
def send_feedback(self): """Print stored items to console/Alfred as XML.""" root = ET.Element('items') for item in self._items: root.append(item.elem) sys.stdout.write('<?xml version="1.0" encoding="utf-8"?>\n') sys.stdout.write(ET.tostring(root).encode('utf-8')) sys.stdout.flush() #################################################################### # Updating methods ####################################################################
def build_soap_xml(items, namespace=None): """Build a SOAP XML. :param items: a list of dictionaries where key is the element name and the value is the element text. :param namespace: the namespace for the elements, None for no namespace. Defaults to None :returns: a XML string. """ def _create_element(name, value=None): xml_string = name if namespace: xml_string = "{%(namespace)s}%(item)s" % {'namespace': namespace, 'item': xml_string} element = ElementTree.Element(xml_string) element.text = value return element soap_namespace = "http://www.w3.org/2003/05/soap-envelope" envelope_element = ElementTree.Element("{%s}Envelope" % soap_namespace) body_element = ElementTree.Element("{%s}Body" % soap_namespace) for item in items: for i in item: insertion_point = _create_element(i) if isinstance(item[i], dict): for j, value in item[i].items(): insertion_point.append(_create_element(j, value)) else: insertion_point.text = item[i] body_element.append(insertion_point) envelope_element.append(body_element) return ElementTree.tostring(envelope_element)
def onFinishBuilding(app, exception): currentVersion = app.env.config["version"] if "latest_docs_version" in app.env.config["html_context"].keys(): latestVersion = app.env.config["html_context"]["latest_docs_version"] else: latestVersion = "dev" base_domain = app.env.config["html_context"]["SITEMAP_DOMAIN"] file_path = "./_build/algolia_index/index.json" sitemap_path = "./_build/sitemap/sitemap_" + currentVersion + ".xml" checkDirectory(file_path) checkDirectory(sitemap_path) f = open(file_path, 'w+') root = ET.Element("urlset") root.set("xmlns", "http://www.sitemaps.org/schemas/sitemap/0.9") for link in indexObjs: url = ET.SubElement(root, "url") ET.SubElement(url, "loc").text = base_domain + str(currentVersion) + "/" + link["url"] ET.SubElement(url, "changefreq").text = "daily" ET.SubElement(url, "priority").text = "1" if ( currentVersion == latestVersion ) else "0.5" ET.ElementTree(root).write(sitemap_path) f.write(json.dumps(indexObjs))
def eccu_doc_for_purge_dir(path_to_purge): root = ET.Element('eccu') parent = root names = (name for name in path_to_purge.split('/') if name) for name in names: parent = ET.SubElement(parent, 'match:recursive-dirs', {'value': name}) revalidate = ET.SubElement(parent, 'revalidate') revalidate.text = 'now' return ET.tostring(root, encoding='ascii') #'<?xml version="1.0"?>\n' + ET.tostring(root) #xml = StringIO() #xml.write('<?xml version="1.0"?>\n') #xml.write(ET.tostring(root)) #return xml
def add_to_document(self, parent): """Adds an ``Argument`` object to this ElementTree document. Adds an <arg> subelement to the parent element, typically <args> and sets up its subelements with their respective text. :param parent: An ``ET.Element`` to be the parent of a new <arg> subelement :returns: An ``ET.Element`` object representing this argument. """ arg = ET.SubElement(parent, "arg") arg.set("name", self.name) if self.title is not None: ET.SubElement(arg, "title").text = self.title if self.description is not None: ET.SubElement(arg, "description").text = self.description if self.validation is not None: ET.SubElement(arg, "validation").text = self.validation # add all other subelements to this Argument, represented by (tag, text) subelements = [ ("data_type", self.data_type), ("required_on_edit", self.required_on_edit), ("required_on_create", self.required_on_create) ] for name, value in subelements: ET.SubElement(arg, name).text = str(value).lower() return arg
def to_xml(self): """Creates an ``ET.Element`` representing self, then returns it. :returns root, an ``ET.Element`` representing this scheme. """ root = ET.Element("scheme") ET.SubElement(root, "title").text = self.title # add a description subelement if it's defined if self.description is not None: ET.SubElement(root, "description").text = self.description # add all other subelements to this Scheme, represented by (tag, text) subelements = [ ("use_external_validation", self.use_external_validation), ("use_single_instance", self.use_single_instance), ("streaming_mode", self.streaming_mode) ] for name, value in subelements: ET.SubElement(root, name).text = str(value).lower() endpoint = ET.SubElement(root, "endpoint") args = ET.SubElement(endpoint, "args") # add arguments as subelements to the <args> element for arg in self.arguments: arg.add_to_document(args) return root
def write_to(self, stream): """Write an XML representation of self, an ``Event`` object, to the given stream. The ``Event`` object will only be written if its data field is defined, otherwise a ``ValueError`` is raised. :param stream: stream to write XML to. """ if self.data is None: raise ValueError("Events must have at least the data field set to be written to XML.") event = ET.Element("event") if self.stanza is not None: event.set("stanza", self.stanza) event.set("unbroken", str(int(self.unbroken))) # if a time isn't set, let Splunk guess by not creating a <time> element if self.time is not None: ET.SubElement(event, "time").text = str(self.time) # add all other subelements to this Event, represented by (tag, text) subelements = [ ("source", self.source), ("sourcetype", self.sourceType), ("index", self.index), ("host", self.host), ("data", self.data) ] for node, value in subelements: if value is not None: ET.SubElement(event, node).text = value if self.done: ET.SubElement(event, "done") stream.write(ET.tostring(event)) stream.flush()
def CDATA(text=None): """ A CDATA element factory function that uses the function itself as the tag (based on the Comment factory function in the ElementTree implementation). """ element = etree.Element(CDATA) element.text = text return element # We're replacing the _write method of the ElementTree class so that it would # recognize and correctly print out CDATA sections.
def __walk(self, node, parent): name = self.__genName(node.tag) tag = self.__getNamespace(node.tag) if parent is None: self.root = name self.lines.append("%s = ET.Element(%s)" % (name, tag)) else: self.lines.append("%s = ET.SubElement(%s, %s)" % (name, parent, tag)) # handles text try: t = node.text.strip() if t == '': t = None except: t = None if t is not None: self.lines.append("%s.text = kwargs.get('', '%s') # PARAMETERIZE" % (name, t)) # handles attributes for key,val in node.items(): key = self.__getNamespace(key) self.lines.append("%s.set(%s, kwargs.get('', '%s')) # PARAMETERIZE" % (name, key, val)) for i in node.getchildren(): self.__walk(i, name)