我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 lxml.etree.XMLSyntaxError()。
def get_searx_version(response_container):
    """Extract the searx version advertised in an HTML response.

    The value is looked up with the XPath
    ``/html/head/meta[@name='generator']/@content``.  When that value has
    exactly one ``/`` (``name/version``) only the part after the slash is
    returned; otherwise the whole value is returned unchanged.

    :param response_container: object whose ``content`` attribute holds the
        response body as bytes.
    :return: the version string, or ``''`` when the body cannot be parsed
        or ``extract_text_from_dom`` returns ``None``.
    """
    response_html = response_container.content.decode()
    try:
        dom = html.fromstring(response_html)
    except (etree.XMLSyntaxError, etree.ParserError):
        # Not a valid HTML document.  lxml.html.fromstring() signals an
        # empty document with ParserError, which is not a subclass of
        # XMLSyntaxError, so both have to be caught here.
        # TODO workaround with regex ?
        return ''

    searx_full_version = extract_text_from_dom(
        dom, "/html/head/meta[@name='generator']/@content")
    if searx_full_version is None:
        return ''

    parts = searx_full_version.split('/')
    if len(parts) == 2:
        return parts[1]
    return searx_full_version
def xml_to_xsd_validation(file_xml, file_xsd):
    """
    Verify that the XML complies with the XSD.

    Arguments:
        1. file_xml: Input xml file
        2. file_xsd: xsd file which needs to be validated against xml
    Return:
        True when both files parse and the schema assertion passes,
        False when an XMLSyntaxError or AssertionError is raised (the
        error is reported through print_error).
    """
    try:
        print_info("Validating:{0}".format(file_xml))
        print_info("xsd_file:{0}".format(file_xsd))
        xml_doc = parse(file_xml)
        xsd_doc = parse(file_xsd)
        xmlschema = XMLSchema(xsd_doc)
        # assert_() raises AssertionError when the document does not
        # conform to the schema.
        xmlschema.assert_(xml_doc)
        return True
    except XMLSyntaxError as err:
        print_error("PARSING ERROR:{0}".format(err))
        return False
    # "except X as err" works on Python 2.6+ and Python 3; the old
    # "except X, err" form is a SyntaxError on Python 3.
    except AssertionError as err:
        print_error("Incorrect XML schema: {0}".format(err))
        return False
def check_output(self, want, got, optionflags):
    """Compare expected and actual doctest output as parsed documents.

    If ``self._temp_override_self`` is set, that object replaces ``self``
    for the rest of the call and ``_temp_call_super_check_output`` is used
    as the fallback checker; otherwise ``OutputChecker.check_output`` is.

    Returns the fallback's result when ``get_parser`` yields no parser,
    ``False`` when either text fails to parse, and otherwise the result of
    ``compare_docs``.
    """
    override = getattr(self, '_temp_override_self', None)
    if override is None:
        fallback = OutputChecker.check_output
    else:
        # Grab the fallback from the original object before swapping.
        fallback = self._temp_call_super_check_output
        self = override

    parser = self.get_parser(want, got, optionflags)
    if not parser:
        return fallback(self, want, got, optionflags)

    # A syntax error in either text means the outputs cannot match.
    try:
        want_doc = parser(want)
        got_doc = parser(got)
    except etree.XMLSyntaxError:
        return False
    return self.compare_docs(want_doc, got_doc)
def get_xml_data(req_string, headers, data=None):
    """Fetch ``req_string`` and return the response as an lxml tree.

    The body is obtained through ``_get_html_data`` and passed through
    ``clean_chunked_data`` before parsing.  Parsing is tried as XML first
    and falls back to lxml's HTML parser on XMLSyntaxError.

    :param req_string: URL to request.
    :param headers: headers passed to ``urllib2.Request``.
    :param data: optional payload forwarded to ``_get_html_data``.
    :return: the result of ``getroottree()`` on the parsed root.
    :raises urllib2.HTTPError: with code 404 when the fallback parser
        returns ``None``.
    """
    request = urllib2.Request(req_string, headers=headers)
    # Clean chunked data
    html_data = clean_chunked_data(_get_html_data(request, data))
    #log_user_action(req.get_host() ,'chunked data', html_data, {})
    try:
        parsed = etree.fromstring(html_data)
    except XMLSyntaxError:
        # lxml cannot handle encoding declarations :(
        parsed = etree.HTML(html_data, etree.HTMLParser())

    # NOTE(review): the source layout does not show whether this check was
    # nested inside the except branch; it is applied to both paths here --
    # confirm against the upstream file.
    # data is None when it was not XML, like 404 page without 404 code
    if parsed is None:
        raise urllib2.HTTPError(req_string, 404, "Not an XML", None, None)

    # TODO: check valid
    #if not data.find('.//prestashop'):
    #    raise urllib2.HTTPError(req_string, 404, "Not an XML", None, None)
    return parsed.getroottree()
def test_restricted_element1(self):
    """Parse ``self.xml_bomb`` with DTDs and entities allowed.

    The test is skipped if the parser raises XMLSyntaxError; otherwise the
    root must expose no text, no children, and no siblings through any of
    the traversal APIs.
    """
    try:
        parsed = self.module.parse(
            self.xml_bomb, forbid_dtd=False, forbid_entities=False)
    except XMLSyntaxError:
        self.skipTest("lxml detects entityt reference loop")

    top = parsed.getroot()

    # Content of the root itself.
    self.assertEqual(top.text, None)
    self.assertEqual(list(top), [])
    self.assertEqual(top.getchildren(), [])

    # Every iteration API must yield nothing beyond the root.
    self.assertEqual(list(top.iter()), [top])
    self.assertEqual(list(top.iterchildren()), [])
    self.assertEqual(list(top.iterdescendants()), [])
    self.assertEqual(list(top.itersiblings()), [])
    self.assertEqual(list(top.getiterator()), [top])
    self.assertEqual(top.getnext(), None)
def test_restricted_element2(self):
    """Parse ``self.xml_bomb2`` with DTDs and entities allowed.

    The test is skipped if the parser raises XMLSyntaxError; otherwise the
    root must have the text ``"text"`` and exactly two children, reachable
    consistently through every traversal API.
    """
    try:
        parsed = self.module.parse(
            self.xml_bomb2, forbid_dtd=False, forbid_entities=False)
    except XMLSyntaxError:
        self.skipTest("lxml detects entityt reference loop")

    top = parsed.getroot()
    # Unpacking fails unless the root has exactly two children.
    first, second = top
    children = [first, second]

    # Root content and child listings.
    self.assertEqual(top.text, "text")
    self.assertEqual(list(top), children)
    self.assertEqual(top.getchildren(), children)

    # Iteration from the root.
    self.assertEqual(list(top.iter()), [top, first, second])
    self.assertEqual(list(top.iterchildren()), children)
    self.assertEqual(list(top.iterdescendants()), children)
    self.assertEqual(list(top.itersiblings()), [])
    self.assertEqual(list(top.getiterator()), [top, first, second])
    self.assertEqual(top.getnext(), None)
    self.assertEqual(top.getprevious(), None)

    # Sibling navigation between the two children.
    self.assertEqual(list(first.itersiblings()), [second])
    self.assertEqual(first.getnext(), second)
    self.assertEqual(first.getprevious(), None)
    self.assertEqual(second.getnext(), None)
    self.assertEqual(second.getprevious(), first)
def validateonefile(fIn):
    """Validate one document against its schema and print the outcome.

    ``getXML(fIn)`` supplies the parsed document and its schema.  When both
    are ``None`` a message is printed and the process exits.  Otherwise the
    document is validated; on failure the schema assertion is re-run so the
    error details can be printed.

    :param fIn: input handed to ``getXML``.
    """
    doc, xsd = getXML(fIn)
    if doc is None and xsd is None:
        print("Invalid CityGML document: not a CityGML document.")
        sys.exit()

    xmlschema = etree.XMLSchema(xsd)
    if doc.xmlschema(xsd):
        print("Document is valid.")
        return

    # Validation failed: assert_() raises with the reason, which is what
    # gets reported below.
    # Single-argument print() calls give the same output on Python 2 and 3.
    try:
        xmlschema.assert_(doc)
    except etree.XMLSyntaxError as e:
        print("Invalid document")
        print("Error %s" % (e,))
        print(xmlschema.error_log)
    except AssertionError as e:
        print("INVALID DOCUMENT")
        print("================")
        print(e)
def _get_xml_field(xml, field, data_field=None): """return the string specified, or None if not present""" try: if isinstance(xml, dict): xml_dict = xml else: xml_dict = lxml_to_dict.parse(xml) try: response_dict = xml_dict["Response"] if data_field is None: msg = response_dict[field] else: data = response_dict["Success"][field] msg = data[data_field] return msg except KeyError: return {} # To retain compatibility with the ExpatError # that xmltodict occasionally raised from LM responses except etree.XMLSyntaxError: return {}
def test_download_default_transcript(self, backend, download_transcript_mock, params):
    """
    Check default transcript is downloaded from a video platform API.

    For every outcome of ``download_transcript_mock`` the mock is applied,
    the player's ``download_default_transcript`` is called with the
    matching ``params`` entry, and either the returned transcript or the
    resulting error message is checked against the mock's expected values.
    """
    player_cls = self.player[backend]
    for position, outcome in enumerate(download_transcript_mock.get_outcomes()):
        outcome_mock = download_transcript_mock(event=outcome)
        self.mocked_objects = outcome_mock.apply_mock(self.mocked_objects)

        try:
            transcript = player_cls(self.xblock).download_default_transcript(**params[position])
            message = ''
            self.assertIsInstance(transcript, unicode)
            self.assertEqual(transcript, outcome_mock.expected_value[0])
        except VideoXBlockException as ex:
            message = ex.message
        except etree.XMLSyntaxError:
            message = 'XMLSyntaxError exception'

        # The last expected value is the message for this outcome.
        self.assertIn(outcome_mock.expected_value[-1], message)
        self.restore_mocked()
def from_xmlfile(cls, session, filename, validator=None):
    """Build an instance from a MARS node XML file.

    The file is parsed, optionally validated, and its ``ID``, ``NodeName``,
    ``NodeType`` and ``CustomerID`` attributes are mapped onto the ``ogit``
    data dict that is handed to ``cls``.

    :param session: passed through to ``cls``.
    :param filename: path of the XML file to read.
    :param validator: optional object whose ``validate`` is called on the
        root element.
    :raises MARSNodeError: when validation fails or the XML is malformed.
    """
    try:
        node = et.parse(filename).getroot()
        if validator:
            validator.validate(node)

        attrs = node.attrib
        node_id = attrs['ID']
        node_name = attrs['NodeName']
        node_type = attrs['NodeType']
        #ogitid = hashlib.md5(ogit_id).hexdigest()
        data = {
            'ogit/Automation/marsNodeFormalRepresentation': et.tostring(node),
            'ogit/_owner': attrs['CustomerID'],
            'ogit/_id': node_id,
            'ogit/_type': 'ogit/Automation/MARSNode',
            'ogit/name': node_name,
            'ogit/Automation/marsNodeType': node_type,
            'ogit/id': node_name,
        }
    except XMLValidateError:
        raise MARSNodeError(
            "ERROR: {f} does not contain a valid MARS node".format(f=filename))
    except et.XMLSyntaxError:
        raise MARSNodeError(
            "ERROR: {f} does not contain valid XML".format(f=filename))
    return cls(session, data)
def yang_register(self, name, raw_yang_module):
    """Register the containers declared by a YANG module.

    The module is added to ``self.ctx``, emitted as YIN, and the YIN XML is
    inspected for its namespace URI and its container names, which are
    then passed to ``self.register`` as ``{namespace: [names]}``.

    Nothing is registered (and ``None`` is returned) when the statement is
    not a ``module``, the YIN output is not well-formed XML, or no
    containers are found.

    :param name: module name handed to ``ctx.add_module``.
    :param raw_yang_module: module source handed to ``ctx.add_module``.
    """
    yin_output = _MetaString()
    yang_module = self.ctx.add_module(name, raw_yang_module)
    if yang_module.keyword != 'module':
        return
    emit_yin(self.ctx, yang_module, yin_output)
    # stripping namespaces
    yin_output = str(yin_output).replace('<xr:', '<')
    try:
        yin_tree = etree.fromstring(yin_output)
    except etree.XMLSyntaxError:
        return
    namespace = yin_tree.xpath('*[name()="namespace"]')[0].attrib.get('uri')
    # with these containers
    _containers = yin_tree.xpath('*[name()="container"] | *[name()="grouping"]/container')
    if not _containers:
        # no containers, no phun
        return
    # Build a real list: on Python 3 map() is a one-shot iterator, which
    # would be exhausted after the first pass over the registered value.
    containers = [self._get_container_name(item) for item in _containers]
    self.register({namespace: containers})
def _xml_obj_from_str(xml_str, dev):
    """Turn a request string into an lxml element.

    The string is parsed as XML first; if that raises XMLSyntaxError it is
    handed to ``_build_xml`` instead.

    :raises InvalidRequestError: when neither step produced an element.
    """
    try:
        request_tree = etree.fromstring(xml_str)
    except etree.XMLSyntaxError:
        request_tree = _build_xml(xml_str, dev)

    if etree.iselement(request_tree):
        return request_tree

    # still not XML obj, but should
    raise InvalidRequestError(
        dev,
        err='Invalid request "{req}"'.format(req=xml_str)
    )
def Open(self):
    """
    Opens a session to the nexpose appliance by logging in.
    This function will raise an exception on error or if the session is already open.
    """
    if self._session_id:
        raise SessionIsNotClosedException("Please close the session first!")

    try:
        response = self._Execute_APIv1d1(self._login_request)
    except NexposeConnectionException as ex:
        # A reply that is not parseable XML gets a more specific error.
        if isinstance(ex.inner_exception, etree.XMLSyntaxError):
            raise NexposeException("Unexpected error! Is the Nexpose appliance activated?")
        raise ex

    login_succeeded = (response.tag == "LoginResponse"
                       and response.attrib["success"] == "1")
    if login_succeeded:
        self._session_id = response.attrib["session-id"]

    if not self._session_id:
        raise NexposeFailureException("Login failure!")
def update_storage_details(node, xmlbytes):
    """Set node storage from lshw output.

    This has been copied into this migration so that it can be modified in
    its original location without breaking this migration.

    The XML is evaluated with ``_xpath_storage_bytes``; a falsy or NaN
    result is stored as ``0``.  Only the ``storage`` field is saved.

    :raises ValidationError: when ``xmlbytes`` is not well-formed XML.
    """
    try:
        document = etree.XML(xmlbytes)
    except etree.XMLSyntaxError as e:
        raise ValidationError(
            {'hardware_details': ['Invalid XML: %s' % (e,)]})

    total = etree.XPathEvaluator(document)(_xpath_storage_bytes)
    if total and not math.isnan(total):
        node.storage = total
    else:
        node.storage = 0
    node.save(update_fields=['storage'])
def _details_make_backwards_compatible(details, root):
    """Make parsed lshw details the root of the composite document.

    For backward-compatibility, if lshw details are available, these
    should form the root of the composite document.

    :param details: mapping that may contain an ``"lshw"`` entry; the
        entry is deleted in place if it is not well-formed XML.
    :param root: current root element.
    :return: ``(details, root)`` -- both may have been mutated/replaced.
    """
    xmldata = details.get("lshw")
    if xmldata is None:
        return details, root

    try:
        lshw = etree.fromstring(xmldata)
    except etree.XMLSyntaxError as e:
        maaslog.warning("Invalid lshw details: %s", e)
        del details["lshw"]  # Don't process again later.
        return details, root

    # We're throwing away the existing root, but we can adopt
    # its nsmap by becoming its child.
    root.append(lshw)
    return details, lshw
def parse_response(self, response):
    """Parse a raw XML response and dispatch it by kind.

    Messages, joins and leaves are forwarded to the matching ``process_*``
    method and its result is returned.  Ignored events, unknown stanzas
    and unparseable XML all yield ``None`` (the latter two are printed).
    """
    try:
        root = etree.fromstring(response)
    except etree.XMLSyntaxError:
        print('Bad XML:', response)
        return None

    if self.is_message(root):
        return self.process_message(root)
    if self.is_ignored_event(root):
        return None
    if self.is_join(root):
        return self.process_join(root)
    if self.is_leave(root):
        return self.process_leave(root)

    print('Unknown message: ', response)
    return None
def is_valid(self):
    """
    Validate if the NZB File is okay; this will generate some overhead
    but at the same time it caches a lot of the results it returns so
    future calls will be speedy

    The function returns True if the nzb file is valid, otherwise it
    returns False
    """
    if self._lazy_is_valid is None:
        if self.open():
            # Open DTD file and create dtd object.  The context manager
            # closes the handle once the DTD object has been built
            # (previously the file was never closed).
            with open(NZB_XML_DTD_FILE) as dtdfd:
                dtd = etree.DTD(dtdfd)

            # Verify our dtd file against our current stream
            try:
                nzb = etree.parse(self.filepath)

            except XMLSyntaxError as e:
                # We have corruption.  Every syntax error is treated as
                # invalid: indexing the exception (e[0]) only works on
                # Python 2, and falling through would reach validate()
                # with `nzb` unbound.
                logger.error(
                    "NZB-File '%s' is corrupt" % self.filepath)
                logger.debug(
                    'NZB-File XMLSyntaxError Exception %s' % str(e))
                # Mark situation
                self._lazy_is_valid = False
                # We failed
                return False

            self._lazy_is_valid = dtd.validate(nzb)

    return self._lazy_is_valid is True
def to_prov_document(content: str or bytes or model.ProvDocument) -> model.ProvDocument:
    """
    Takes a string, bytes or ProvDocument as argument and return a ProvDocument
    The strings or bytes can contain JSON or XML representations of PROV

    The format is guessed from markers in the payload, checked in this
    order: ``{`` (json), ``<?xml`` (xml), ``document`` (provn).

    :param content: String or BufferedReader or ProvDocument
    :return: ProvDocument (flattened, unless a ProvDocument was passed in,
        which is returned as-is)
    :rtype: ProvDocument
    :raises exceptions.ParseException: when no marker is found or the
        JSON/XML payload fails to parse.
    """
    # NOTE(review): the parameter annotation ``str or bytes or ...`` is an
    # ordinary boolean expression and evaluates to ``str``; it is left
    # untouched here to keep the signature unchanged.
    if isinstance(content, model.ProvDocument):
        return content

    content_bytes = str.encode(content) if isinstance(content, str) else content

    try:
        if content_bytes.find(b"{") != -1:
            detected = 'json'
        elif content_bytes.find(b'<?xml') != -1:
            detected = 'xml'
        elif content_bytes.find(b'document') != -1:
            detected = 'provn'
        else:
            raise exceptions.ParseException("Invalid PROV Document of type {}".format(type(content)))
        # The original object (not the encoded copy) is deserialized.
        return model.ProvDocument.deserialize(content=content, format=detected).flattened()
    except json.decoder.JSONDecodeError:
        raise exceptions.ParseException("Invalid PROV-JSON of type {}".format(type(content)))
    except etree.XMLSyntaxError:
        raise exceptions.ParseException("Invalid PROV-XML of type {}".format(type(content)))
def parse_schedule(xml, filename):
    """
    Parses a schedule definition in XML.

    The XML is parsed with a parser bound to the schema read from
    ``etc/enarksh.xsd`` under ``C.HOME``.  The root must be a ``Schedule``
    element and the resulting node must pass its own ``validate()``.

    :param str xml: The XML with a schedule definition
    :param str filename: Only used in the error message for an invalid
        schedule.

    :rtype: enarksh.xml_reader.node.ScheduleNode
    """
    xsd_path = os.path.join(C.HOME, 'etc/enarksh.xsd')
    with open(xsd_path, 'rb') as handle:
        xsd = handle.read()

    etree.clear_error_log()
    schema = etree.XMLSchema(etree.XML(xsd))
    parser = etree.XMLParser(schema=schema, encoding='utf8')

    try:
        root = etree.fromstring(bytes(xml, 'utf8'), parser)

        # Root element must be a schedule.
        if root.tag != 'Schedule':
            raise Exception("Root element must be 'Schedule' but '{0!s}' was found.".format(root.tag))

        schedule = create_node('Schedule')
        schedule.read_xml(root)

        error = schedule.validate()
        if error:
            raise Exception(
                "File '{0!s}' is not a valid schedule configuration file.\n{1!s}".format(filename, error))

        # Set recursion and dependency levels.
        schedule.set_levels()
    except etree.XMLSyntaxError as exception:
        # Log the parser's messages from WARNING level up, then re-raise.
        logging.getLogger('enarksh').error(
            exception.error_log.filter_from_level(etree.ErrorLevels.WARNING))
        raise exception

    return schedule

# ------------------------------------------------------------------------------------------------------------------
def scrape_page_for_open_location(self, my_webpage):
    """Scrape a webpage and record an open location if one is found.

    Errors reported by the webpage are appended to ``self.error``.  Any
    exception raised while scraping is also appended to ``self.error``
    (with a prefix naming the first matching exception category) and
    logged; nothing is re-raised.
    """
    # logger.info(u"scraping", url)
    try:
        my_webpage.scrape_for_fulltext_link()

        if my_webpage.error:
            self.error += my_webpage.error

        if my_webpage.is_open:
            self.open_locations.append(my_webpage.mint_open_location())
            # logger.info(u"found open version at", webpage.url)
        # else:
        #     logger.info(u"didn't find open version at", webpage.url)

    except Exception as e:
        # Ordered most-specific first; the first matching entry wins,
        # mirroring a chain of except clauses in the same order.
        templates = (
            (requests.Timeout,
             "Timeout in scrape_page_for_open_location on {}: {}"),
            (requests.exceptions.ConnectionError,
             "ConnectionError in scrape_page_for_open_location on {}: {}"),
            (requests.exceptions.ChunkedEncodingError,
             "ChunkedEncodingError in scrape_page_for_open_location on {}: {}"),
            (requests.exceptions.RequestException,
             "RequestException in scrape_page_for_open_location on {}: {}"),
            (etree.XMLSyntaxError,
             "XMLSyntaxError in scrape_page_for_open_location on {}: {}"),
            (Exception,
             "Exception in scrape_page_for_open_location on {}: {}"),
        )
        template = next(
            text for exc_type, text in templates if isinstance(e, exc_type))
        self.error += template.format(my_webpage, unicode(e.message).encode("utf-8"))
        logger.info(self.error)
def get_tree(page):
    """Parse an HTML page into an lxml tree.

    :param page: HTML text.
    :return: the parsed tree, or ``None`` when lxml raises
        XMLSyntaxError or ParserError (the error is printed).
    """
    # NOTE(review): as visible here both arguments look like a plain
    # space; the first was presumably a non-breaking space or the
    # ``&nbsp;`` entity upstream -- confirm against the original file.
    page = page.replace(" ", " ")  # otherwise starts-with for lxml doesn't work
    try:
        tree = html.fromstring(page)
    except (etree.XMLSyntaxError, etree.ParserError) as e:
        # print() with a single argument behaves the same on Python 2 and
        # Python 3; the bare print statement is a SyntaxError on Python 3.
        print(u"not parsing, beause etree error in get_tree: {}".format(e))
        tree = None
    return tree
def output_difference(self, example, got, optionflags):
    """Describe how the actual output differs from the expected one.

    When no parser applies, or either text fails to parse, the result of
    ``OutputChecker.output_difference`` is returned (preceded by the parse
    errors, if any).  Otherwise a report with ``Expected:``, ``Got:`` and
    ``Diff:`` sections built from the parsed documents is returned.
    """
    want = example.want
    parser = self.get_parser(want, got, optionflags)

    errors = []
    if parser is not None:
        # Parse both sides independently so each failure is reported.
        try:
            want_doc = parser(want)
        except etree.XMLSyntaxError as exc:
            errors.append('In example: %s' % exc)
        try:
            got_doc = parser(got)
        except etree.XMLSyntaxError as exc:
            errors.append('In actual output: %s' % exc)

    if parser is None or errors:
        plain_diff = OutputChecker.output_difference(
            self, example, got, optionflags)
        if not errors:
            return plain_diff
        errors.append(plain_diff)
        return '\n'.join(errors)

    is_html = parser is html_fromstring
    return '\n'.join([
        'Expected:',
        self.format_doc(want_doc, is_html, 2),
        'Got:',
        self.format_doc(got_doc, is_html, 2),
        'Diff:',
        self.collect_diff(want_doc, got_doc, is_html, 2),
    ])
def test_empty_parse(self):
    """Parsing an empty string must raise XMLSyntaxError."""
    with self.assertRaises(etree.XMLSyntaxError):
        etree.fromstring('')
def _validate_document(self, document_string, harvest_object, validator=None):
    '''
    Validates an XML document with the default, or if present, the
    provided validators.

    It will create a HarvestObjectError for each validation error found,
    so they can be shown properly on the frontend.

    Returns a tuple, with a boolean showing whether the validation passed
    or not, the profile used and a list of errors (tuples with error
    message and error lines if present).
    '''
    if not validator:
        validator = self._get_validator()

    # Strip "<?xml ...?>" instructions before parsing.  The pattern is a
    # raw string (so "\?" is not an invalid string escape) and non-greedy,
    # so it stops at the first "?>" instead of swallowing everything up to
    # the last "?>" on the same line.
    document_string = re.sub(r'<\?xml(.*?)\?>', '', document_string)

    try:
        xml = etree.fromstring(document_string)
    except etree.XMLSyntaxError as e:
        self._save_object_error('Could not parse XML file: {0}'.format(str(e)), harvest_object, 'Import')
        return False, None, []

    valid, profile, errors = validator.is_valid(xml)
    if not valid:
        log.error('Validation errors found using profile {0} for object with GUID {1}'.format(profile, harvest_object.guid))
        for error in errors:
            self._save_object_error(error[0], harvest_object, 'Validation', line=error[1])

    return valid, profile, errors
def parse(self, xmlfile, **kwargs):
    """Parse ``xmlfile`` with the module under test and serialize it back.

    The test is skipped when the parser raises XMLSyntaxError.
    """
    try:
        document = self.module.parse(xmlfile, **kwargs)
    except XMLSyntaxError:
        self.skipTest("lxml detects entityt reference loop")
    else:
        return self.module.tostring(document)
def parseString(self, xmlstring, **kwargs):
    """Parse ``xmlstring`` with the module under test and serialize it back.

    The test is skipped when the parser raises XMLSyntaxError.
    """
    try:
        document = self.module.fromstring(xmlstring, **kwargs)
    except XMLSyntaxError:
        self.skipTest("lxml detects entityt reference loop")
    else:
        return self.module.tostring(document)
def parse_xml(self, file_path):
    """Parse ``file_path`` with a ``huge_tree`` parser and return the tree.

    :raises TypeError: when the file is not well-formed XML.
    """
    huge_parser = etree.XMLParser(huge_tree=True)
    try:
        document = etree.parse(file_path, huge_parser)
    except etree.XMLSyntaxError:
        # probably corrupt
        raise TypeError()
    return document
def next(self):
    """Return the next item from ``self._producer``.

    An XMLSyntaxError raised by the producer is converted into
    StopIteration carrying a hint about resetting the iterator.
    """
    try:
        item = next(self._producer)
    except XMLSyntaxError:
        raise StopIteration(
            "This iterator may need to be reset by calling `reset` to continue using it after"
            " using a random-access function like `get_by_id`")
    return item
def get_vcxproj_data(vs_project):
    """
    Return xml data from vcxproj file

    On any failure (unreadable file, malformed XML, or a namespace that
    does not contain ``http://schemas.microsoft.com``) an error is sent
    and the process exits with status 1.

    :param vs_project: the vcxproj file
    :type vs_project: str
    :return: dict with VS Project data (keys ``tree`` and ``ns``)
    :rtype: dict
    """

    vcxproj = {}

    try:
        tree = etree.parse(vs_project)
        # NOTE(review): the namespace is taken from the text between the
        # first and last quote of str(nsmap); this presumably relies on
        # the root declaring a single namespace -- confirm.
        namespace = str(tree.getroot().nsmap)
        ns = {'ns': namespace.partition('\'')[-1].rpartition('\'')[0]}
        vcxproj['tree'] = tree
        vcxproj['ns'] = ns
    except (OSError, IOError):  # pragma: no cover
        send(
            '.vcxproj file cannot be import. '
            'Please, verify you have rights to this directory or file exists !',
            'error'
        )
        exit(1)
    except etree.XMLSyntaxError:  # pragma: no cover
        send('This file is not a ".vcxproj" file or XML is broken !', 'error')
        exit(1)

    # Explicit check instead of ``assert``: assertions are stripped when
    # Python runs with -O, which would silently disable this validation.
    if 'http://schemas.microsoft.com' not in ns['ns']:  # pragma: no cover
        send(
            '.vcxproj file cannot be import, because this file does not seem to comply with'
            ' Microsoft xml data !',
            'error'
        )
        exit(1)

    return vcxproj
def validate_xml(xml):
    """Parse ``xml`` with ``lxml.etree.parse``.

    Returns the parsed tree, or ``False`` when the input is not
    well-formed XML.
    """
    from lxml import etree

    try:
        document = etree.parse(xml)
    except etree.XMLSyntaxError:
        return False
    return document
def test_data_fail(self):
    """``read_html`` must raise XMLSyntaxError for both sample files."""
    from lxml.etree import XMLSyntaxError

    sample_paths = [
        os.path.join(DATA_PATH, file_name)
        for file_name in ('spam.html', 'banklist.html')
    ]
    for sample in sample_paths:
        with tm.assertRaises(XMLSyntaxError):
            self.read_html(sample)
def test_parsing_note_error(xmldata_note_error, parser):
    """Iterating malformed ``note`` data must raise a parse error.

    Any of ParseError, cParseError or XMLSyntaxError is accepted.
    """
    # ``parsing_method`` belongs to xmliter(), not to pytest.raises();
    # current pytest rejects unknown keyword arguments to raises() with a
    # TypeError, so it is passed only where it is consumed.
    with pytest.raises((ParseError, cParseError, XMLSyntaxError)):
        for doc in xmliter(xmldata_note_error, 'note', parsing_method=parser):
            pass
def test_parsing_note_error(xmldata_note_error, parser):
    """Parsing malformed ``note`` data must raise a parse error.

    Any of ParseError, cParseError or XMLSyntaxError is accepted.
    """
    expected_errors = (ParseError, cParseError, XMLSyntaxError)
    with pytest.raises(expected_errors):
        xmlparse(xmldata_note_error, parsing_method=parser)
def parse_to_dict(xml):
    """Return the XML as an OrderedDict.

    The conversion is delegated to ``lxml_to_dict.parse``.  ``None`` is
    returned when the XML is not well-formed.
    """
    try:
        parsed = lxml_to_dict.parse(xml)
    # To retain compatibility with the ExpatError
    # that xmltodict occasionally raised from LM responses
    except etree.XMLSyntaxError:
        return None
    return parsed
def test_get_xml_field_ExpatError_returns_empty_dict():
    """``_get_xml_field`` returns ``{}`` when parsing raises XMLSyntaxError."""
    with patch.object(lxml_to_dict, "parse") as fake_parse:
        # Inject dummy values into XMLSyntaxError constructor
        fake_parse.side_effect = XMLSyntaxError(*range(5))
        result = api_xml._get_xml_field('any_xml', 'myfield')
        assert_equal(result, {})