我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 xml.parsers.expat.ExpatError()。
def __init__(self, response, _message=None):
    """Initialise the error from an HTTP response object.

    The whole response body is read. When the body parses as XML, the text
    of ``./messages/msg`` is used as the detail; when it does not parse, the
    raw body is used instead. The exception message is ``_message`` if given,
    otherwise ``HTTP <status> <reason>`` followed by `` -- <detail>`` when a
    detail was found.

    :param response: object exposing ``status``, ``reason``, ``headers`` and
        a ``body`` with a ``read()`` method.
    :param _message: optional message overriding the generated one.
    """
    status = response.status
    reason = response.reason
    body = response.body.read()

    try:
        detail = XML(body).findtext("./messages/msg")
    except ParseError:
        # Not well-formed XML: surface the body verbatim.
        detail = body

    suffix = "" if detail is None else " -- %s" % detail
    message = "HTTP %d %s%s" % (status, reason, suffix)
    Exception.__init__(self, _message or message)

    self.status = status
    self.reason = reason
    self.headers = response.headers
    self.body = body
    self._response = response
def _upload_instances(self, path):
    """Upload every instance found in the sub-directories of ``path``.

    Each immediate sub-directory of ``path`` (as listed by
    ``default_storage``) is inspected; when it contains a ``submission.xml``
    file, that file is opened and handed to ``self._upload_instance`` along
    with the directory path and its file listing.

    Uploading is best effort: an instance whose upload raises is skipped and
    not counted, and processing continues with the next directory.

    :param path: storage path whose sub-directories hold the instances.
    :return: number of instances uploaded without raising.
    """
    instances_count = 0
    dirs, _files = default_storage.listdir(path)

    for instance_dir in dirs:
        instance_dir_path = os.path.join(path, instance_dir)
        _sub_dirs, files = default_storage.listdir(instance_dir_path)
        if 'submission.xml' not in files:
            continue

        xml_file = default_storage.open(
            os.path.join(instance_dir_path, 'submission.xml'))
        try:
            self._upload_instance(xml_file, instance_dir_path, files)
        except ExpatError:
            # Malformed submission XML: skip this instance.
            continue
        except Exception:
            # Deliberate best effort: one failing instance must not abort
            # the whole batch, it is simply not counted.
            pass
        else:
            instances_count += 1
        finally:
            # Always release the storage handle; it used to be leaked.
            xml_file.close()

    return instances_count
def index():
    """
    /convert endpoint accepts a URL and converts it to json

    :param url: url to resource containing xml/rss document
    :return: json representation of the xml/rss document
    :raises BadRequestError: when the ``url`` query parameter is missing or
        the fetched document is not XML
    """
    request = app.current_request
    # query_params is None (not an empty dict) when the request carries no
    # query string, so normalise it before the membership test.
    query_params = request.query_params or {}
    if 'url' not in query_params:
        raise BadRequestError("Missing url parameter")

    url = query_params['url']
    try:
        response = transform(get_rss(url))
    except ExpatError:
        raise BadRequestError("Input is not XML")
    else:
        return response
def initRedirection(self):
    """Parse the redirection section of the config and build tunnel objects.

    ``edfmeta.parse_redirection`` is run on ``self.xmlInConfig``. Every
    remote/local entry lacking a ``name`` key receives a generated one
    (``remote-tunnel-N`` / ``local-tunnel-N``, where N counts only the
    unnamed entries of that kind, starting at 1). The result is stored in
    ``self.redirection`` as a dict with ``'remote'`` and ``'local'`` lists.

    If the redirection information cannot be parsed, the error is reported
    through ``self.io.print_error`` and an empty redirection set is used;
    previously the method went on to index ``None`` and raised ``TypeError``.
    """
    try:
        redir = edfmeta.parse_redirection(self.xmlInConfig)
    except expat.ExpatError:
        self.io.print_error("Error parsing redirection information")
        # Fall back to "no tunnels" so the rest of the method stays valid.
        redir = {'remote': [], 'local': []}

    # Give every anonymous tunnel a stable, kind-specific default name.
    for kind in ('remote', 'local'):
        counter = 1
        for entry in redir[kind]:
            if 'name' not in entry:
                entry['name'] = '%s-tunnel-%d' % (kind, counter)
                counter += 1

    self.redirection = {
        'remote': [RemoteRedirection(**r) for r in redir['remote']],
        'local': [LocalRedirection(**l) for l in redir['local']],
    }
def _from_xml(self, datastring):
    """Deserialize an XML string into a dictionary.

    Returns ``None`` when ``datastring`` is ``None``. Otherwise the string is
    parsed and converted with ``self._from_xml_node``; the converted value is
    returned as-is when the root tag is ``constants.VIRTUAL_ROOT_KEY``, and
    otherwise wrapped as ``{root_tag: result}`` merged with the links found
    on the root node.

    Raises ``exception.MalformedResponseBody`` when the failure is an XML
    parse error (``etree.ParseError`` if the etree module has it, or
    ``expat.ExpatError``); any other exception propagates unchanged.
    """
    if datastring is None:
        return None

    plurals = set(self.metadata.get('plurals', {}))
    try:
        node = etree.fromstring(datastring)
        root_tag = self._get_key(node.tag)
        links = self._get_links(root_tag, node)
        result = self._from_xml_node(node, plurals)
        if root_tag == constants.VIRTUAL_ROOT_KEY:
            return result
        return dict({root_tag: result}, **links)
    except Exception as exc:
        # ParseError is looked up dynamically because not every etree
        # implementation bound to ``etree`` is guaranteed to define it.
        is_parse_error = (
            (hasattr(etree, 'ParseError')
             and isinstance(exc, getattr(etree, 'ParseError')))
            or isinstance(exc, expat.ExpatError)
        )
        if not is_parse_error:
            raise
        msg = _("Cannot understand XML")
        raise exception.MalformedResponseBody(reason=msg)
def setOutputFile(self):
    '''
    Initiates the xml file from the configuration.

    Does nothing when conf.xmlFile is not set. Otherwise an existing output
    file is parsed and its first child node reused as the root; if it cannot
    be parsed a fresh Document is used. The file is then opened "w+" and,
    when no root was recovered, a new results root element is created and
    appended to the document.

    Raises SqlmapFilePathException when an IOError occurs along the way.
    '''
    if not conf.xmlFile:
        return

    try:
        self._outputFile = conf.xmlFile
        self.__root = None

        if os.path.exists(self._outputFile):
            try:
                self.__doc = xml.dom.minidom.parse(self._outputFile)
                self.__root = self.__doc.childNodes[0]
            except ExpatError:
                # Existing file is not valid XML: start from an empty document.
                self.__doc = Document()

        self._outputFP = codecs.open(self._outputFile, "w+", UNICODE_ENCODING)

        if self.__root is None:
            self.__root = self.__doc.createElementNS(NAME_SPACE_ATTR, RESULTS_ELEM_NAME)
            for attr_name, attr_value in (
                (XMLNS_ATTR, NAME_SPACE_ATTR),
                (SCHEME_NAME_ATTR, SCHEME_NAME),
            ):
                self.__root.setAttributeNode(self._createAttribute(attr_name, attr_value))
            self.__doc.appendChild(self.__root)
    except IOError:
        raise SqlmapFilePathException("Wrong filename provided for saving the xml file: %s" % conf.xmlFile)
def GetLocalTicket(si, user):
    """Acquire a local ticket for ``user`` and return its credentials.

    Fetches the session manager from ``si.content``, asks it for a local
    ticket for ``user`` and reads the ticket's password file.

    :param si: service instance exposing ``content.sessionManager``.
    :param user: user name the ticket is requested for.
    :return: ``(userName, password_file_content)`` tuple.
    :raises vim.fault.HostConnectFault: if the session manager cannot be
        obtained; the message distinguishes a malformed response
        (an exception whose class is named ``ExpatError``) from any other
        failure.
    """
    try:
        session_manager = si.content.sessionManager
    except Exception as exc:
        # The exception is recognised by class name rather than by type --
        # presumably to avoid importing expat here; confirm before changing.
        if type(exc).__name__ == 'ExpatError':
            template = 'Malformed response while querying for local ticket: "%s"'
        else:
            template = 'Failed to query for local ticket: "%s"'
        raise vim.fault.HostConnectFault(msg=template % exc)

    ticket = session_manager.AcquireLocalTicket(userName=user)
    with open(ticket.passwordFilePath) as password_file:
        password = password_file.read()
    return ticket.userName, password


## Private method that performs the actual Connect and returns a
## connected service instance object.
def recognizes(cls, file):
    """
    Decide whether ``file`` can be handled as XML.

    The parent class check runs first. If it passes, the file at
    ``file.path`` is opened and parsed with ``_parse``; on success the
    parsed result is stored on ``file.parsed``.

    Args:
        file - a diffoscope.comparators.utils.file.File object

    Returns:
        False if the parent check fails or parsing raises ExpatError or
        UnicodeDecodeError, True otherwise
    """
    if not super().recognizes(file):
        return False

    with open(file.path) as handle:
        try:
            file.parsed = _parse(handle)
        except (ExpatError, UnicodeDecodeError):
            return False
        else:
            return True
def set_query(self, value):
    """Store ``value`` as the object's query, injecting the time filter.

    Nothing happens when ``value`` equals the current query. A value that is
    not well-formed XML is stored verbatim. For an XML value, the text of
    every ``Select`` element is rewritten: an existing ``TimeCreated[...]``
    predicate is replaced by ``TIME_CREATED``; otherwise ``INSERT_TIME`` is
    inserted right after a leading ``*[System[``. The document is then
    serialised with ``prettify_xml``, single-quoted ``name='value'`` pairs
    are switched to double quotes, the XML header is stripped and ``&amp;``
    entities are turned back into ``&``.

    :param value: query text, either plain or an XML query list.
    """
    if self._object.query == value:
        return

    try:
        in_filter_xml = xml.dom.minidom.parseString(value)
    except ExpatError:
        # Not XML: keep the query exactly as supplied.
        self._object.query = value
        return

    for node in in_filter_xml.getElementsByTagName('Select'):
        filter_text = node.childNodes[0].data
        time_match = re.match(r'(.*TimeCreated)(\[.*?\])(.*)', filter_text)
        if time_match:
            # easy to replace with our time filter
            filter_text = time_match.group(1) + TIME_CREATED + time_match.group(3)
        else:
            # need to insert our time filter
            notime_match = re.match(r'(\*\[System\[)(.*)', filter_text)
            if notime_match:
                filter_text = notime_match.group(1) + INSERT_TIME + notime_match.group(2)
            # else: the text has neither shape; leave it untouched instead of
            # failing with AttributeError on a None match.
        node.childNodes[0].data = filter_text

    xml_query = prettify_xml(in_filter_xml)
    # undo replacement of single quotes with double
    xml_query = re.sub(r"(\w+)='(\w+)'", r'\1="\2"', xml_query)
    # remove the xml header and replace any "&amp;" with "&"
    # (the previous replace('&', '&') was a no-op)
    header = '<?xml version=\'1.0\' encoding=\'UTF-8\'?>\n'
    xml_query = xml_query.replace(header, '').replace('&amp;', '&')
    self._object.query = xml_query
def params(cls, datasource, context):
    """Build the parameter dictionary for ``datasource``.

    The datasource query is joined into a single line and TALES-evaluated
    against ``context``; if that fails for any reason the raw
    ``datasource.query`` is kept and ``query_error`` is reported as True.
    ``use_xml`` tells whether the resulting query parses as XML.

    :return: dict with keys ``eventlog``, ``query``, ``query_error``,
        ``max_age``, ``eventid``, ``use_xml`` and ``eventClass``.
    """
    def evaluate(expression):
        return datasource.talesEval(expression, context)

    query = datasource.query
    query_error = True
    try:
        query = evaluate(' '.join(string_to_lines(datasource.query)))
    except Exception:
        # Keep the unevaluated query and leave the error flag set.
        pass
    else:
        query_error = False

    try:
        xml.dom.minidom.parseString(query)
    except ExpatError:
        use_xml = False
    else:
        use_xml = True

    return {
        'eventlog': evaluate(datasource.eventlog),
        'query': query,
        'query_error': query_error,
        'max_age': evaluate(datasource.max_age),
        'eventid': evaluate(datasource.id),
        'use_xml': use_xml,
        'eventClass': datasource.eventClass,
    }
def parse_payment_result(self, xml):
    """Parse a payment result XML document and verify its signature.

    The document is parsed with ``xmltodict``; the content of its ``xml``
    root is taken as the payload. The ``sign`` field is removed, the
    signature is recomputed over the remaining fields with ``self.api_key``
    and compared with the received one.

    :param xml: XML text of the payment result.
    :return: the payload dict, with ``sign`` put back.
    :raises InvalidSignatureException: if the document cannot be parsed, has
        no ``xml`` root, or the signature does not match.
    """
    try:
        parsed = xmltodict.parse(xml)
    except (xmltodict.ParsingInterrupted, ExpatError):
        raise InvalidSignatureException()

    if not parsed or 'xml' not in parsed:
        raise InvalidSignatureException()

    payload = parsed['xml']
    received_sign = payload.pop('sign', None)
    expected_sign = calculate_signature(payload, self.api_key)
    if received_sign != expected_sign:
        raise InvalidSignatureException()

    payload['sign'] = received_sign
    return payload
async def listen(self):
    """
    Listen to socket.

    Runs forever: reads an 8-byte header (two little-endian unsigned 32-bit
    values: body size and handle), reads the body, decodes it and schedules
    ``self.handle_payload`` on the event loop. A decoded ``Fault`` is passed
    along to the handler; a body that is not valid XML is reported through
    ``handle_exception`` and skipped.

    Declared ``async`` because the body awaits the stream reader; a plain
    ``def`` containing ``await`` does not compile.

    On ``ConnectionResetError`` the process exits with code 10. Any other
    exception is reported and re-raised.
    """
    try:
        while True:
            head = await self.reader.readexactly(8)
            size, handle = struct.unpack_from('<LL', head)
            body = await self.reader.readexactly(size)
            data = method = fault = None

            try:
                data, method = loads(body, use_builtin_types=True)
            except Fault as e:
                fault = e
            except ExpatError as e:
                # See #121 for this solution.
                handle_exception(exception=e, module_name=__name__, func_name='listen', extra_data={'body': body})
                continue

            # Unwrap single-value parameter lists.
            if data and len(data) == 1:
                data = data[0]

            self.event_loop.create_task(self.handle_payload(handle, method, data, fault))
    except ConnectionResetError as e:
        logger.critical(
            'Connection with the dedicated server has been closed, we will now close down the subprocess! {}'.format(str(e))
        )
        # When the connection has been reset, we will close the controller process so it can be restarted by the god
        # process. Exit code 10 gives the information to the god process.
        exit(10)
    except Exception as e:
        handle_exception(exception=e, module_name=__name__, func_name='listen')
        raise
def get_config(self):
    """
    Sets the different RTMP properties.

    Fetches the room configuration (using the password URL when
    ``self.room_pass`` is set), parses the XML response and stores the
    ``result`` attribute of the ``response`` element in
    ``self._config_status``. Unless that status is ``'PW'`` or ``'CLOSED'``,
    the room type, RTMP url, greenroom flag and broadcast password are read
    from the same element.

    Nothing is set when the response has no content or is not valid XML
    (the latter is logged).
    """
    if self.room_pass:
        _url = self._config_url_pw.format(self.room_name, self.room_pass)
    else:
        _url = self._config_url.format(self.room_name)

    _response = util.web.http_get(url=_url, proxy=self.proxy)
    log.debug('room config response: %s' % _response)

    content = _response['content']
    if content is None:
        return

    try:
        _xml = parseString(content)
    except ExpatError as e:
        log.error('ExpatError: %s' % e)
        return

    root = _xml.getElementsByTagName('response')[0]
    # set the config_status property.
    self._config_status = root.getAttribute('result')

    # The previous test (`!= 'PW' or != 'CLOSED'`) was true for every status,
    # so the guard never excluded anything.
    if self._config_status not in ('PW', 'CLOSED'):
        # set the roomtype property.
        self._roomtype = root.getAttribute('roomtype')
        # set the tc_url property.
        self._tc_url = root.getAttribute('rtmp')

        if root.getAttribute('greenroom'):
            # set the is_greenroom property.
            self._greenroom = True

        if 'bpassword' in content:
            # set the bpassword property.
            self._bpassword = root.getAttribute('bpassword')
def log(self, rev_start=None, rev_end=1, verbose=False):
    """ return a list of LogEntry instances for this path.
    rev_start is the starting revision (defaulting to the first one).
    rev_end is the last revision (defaulting to HEAD).
    if verbose is True, then the LogEntry instances also know which files changed.

    Raises ValueError('no such revision') when the output of ``svn log``
    cannot be parsed as XML.
    """
    assert self.check()  # make it simpler for the pipe

    start = "HEAD" if rev_start is None else rev_start
    end = "HEAD" if rev_end is None else rev_end
    if start == "HEAD" and end == 1:
        rev_opt = ""
    else:
        rev_opt = "-r %s:%s" % (start, end)
    verbose_opt = "-v" if verbose else ""

    locale_env = fixlocale()
    # some blather on stderr
    auth_opt = self._makeauthoptions()

    # NOTE(review): the command is a shell string built from the path and
    # options; inputs are assumed to be trusted.
    cmd = locale_env + 'svn log --xml %s %s %s "%s"' % (
        rev_opt, verbose_opt, auth_opt, self.strpath)
    popen = subprocess.Popen(cmd,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE,
                             shell=True,
                             )
    stdout, stderr = popen.communicate()
    stdout = py.builtin._totext(stdout, sys.getdefaultencoding())

    minidom, ExpatError = importxml()
    try:
        tree = minidom.parseString(stdout)
    except ExpatError:
        raise ValueError('no such revision')

    return [
        LogEntry(entry)
        for entry in filter(None, tree.firstChild.childNodes)
        if entry.nodeType == entry.ELEMENT_NODE
    ]
def importxml(cache=[]):
    """Return ``[minidom, ExpatError]``, importing them on first use.

    The mutable default argument is intentional: it is the memo that keeps
    the imported objects between calls, so the imports run only while the
    list is still empty. The list itself is returned.
    """
    if not cache:
        from xml.dom import minidom
        from xml.parsers.expat import ExpatError
        cache.extend((minidom, ExpatError))
    return cache
def retrieve_data(self):
    """Parse ``<refid>.xml`` from ``self._xml_path`` into ``self._retrieved_data``.

    When the file is not well-formed XML, a message is printed,
    ``self._error`` is set to True and ``self._retrieved_data`` becomes None.
    """
    filename = os.path.join(self._xml_path, self.refid + '.xml')
    try:
        parsed = compound.parse(filename)
    except ExpatError:
        print('Error in xml in file %s' % filename)
        self._error = True
        parsed = None
    self._retrieved_data = parsed
def parse(inFilename):
    """Parse the XML file ``inFilename`` and build its root object.

    :param inFilename: file name (or file object) accepted by ``minidom.parse``.
    :return: a ``supermod.DoxygenType`` instance built from the document element.
    :raises FileIOError: when reading the file raises ``IOError``.
    :raises ParseError: when the file is not well-formed XML.
    """
    try:
        document = minidom.parse(inFilename)
    except IOError as io_error:
        raise FileIOError(io_error)
    except ExpatError as xml_error:
        raise ParseError(xml_error)

    root_obj = supermod.DoxygenType.factory()
    root_obj.build(document.documentElement)
    return root_obj
def parse_pw_xml_output(data, dir_with_bands=None):
    """
    Parse the xml data of QE v5.0.x
    Input data must be a single string, as returned by file.read()
    Returns a dictionary with parsed values

    ``dir_with_bands`` is accepted but not used in this function body.

    NOTE(review): the malformed-XML path returns a 3-tuple while the normal
    path returns ``(parsed_data, structure_dict)``; confirm which shape
    callers expect.
    """
    import copy
    from xml.parsers.expat import ExpatError

    # NOTE : I often assume that if the xml file has been written, it has no
    # internal errors.
    try:
        dom = xml.dom.minidom.parseString(data)
    except ExpatError:
        return {'xml_warnings': "Error in XML parseString: bad format"}, {}, {}

    parsed_data = {'xml_warnings': []}

    # CARD CELL
    structure_dict, lattice_vectors, volume = copy.deepcopy(
        xml_card_cell({}, dom))

    # CARD IONS
    structure_dict = copy.deepcopy(
        xml_card_ions(structure_dict, dom, lattice_vectors, volume))

    # fermi energy
    target_tags = read_xml_card(dom, 'BAND_STRUCTURE_INFO')
    tagname = 'FERMI_ENERGY'
    energy_key = tagname.replace('-', '_').lower()
    parsed_data[energy_key] = parse_xml_child_float(tagname, target_tags) * hartree_to_ev
    parsed_data[tagname.lower() + units_suffix] = default_energy_units

    return parsed_data, structure_dict
def test1(self):
    """Parsing a NUL byte followed by CRLF must raise ExpatError with the
    expected "unclosed token" message; not raising fails the test."""
    document = "\0\r\n"
    parser = expat.ParserCreate()
    try:
        parser.Parse(document, True)
    except expat.ExpatError as err:
        self.assertEqual(str(err), 'unclosed token: line 2, column 0')
    else:
        self.fail()
def test2(self):
    """An XML declaration with stray characters after ``version`` must raise
    ExpatError with the expected "not well-formed" message; not raising
    fails the test."""
    document = "<?xml version\xc2\x85='1.0'?>\r\n"
    parser = expat.ParserCreate()
    try:
        parser.Parse(document, True)
    except expat.ExpatError as err:
        self.assertEqual(str(err), 'XML declaration not well-formed: line 1, column 14')
    else:
        self.fail()
def test_expaterror(self):
    """A lone ``<`` must raise ExpatError whose ``code`` is the numeric code
    registered for XML_ERROR_UNCLOSED_TOKEN; not raising fails the test."""
    document = '<'
    parser = expat.ParserCreate()
    try:
        parser.Parse(document, True)
    except expat.ExpatError as err:
        expected_code = errors.codes[errors.XML_ERROR_UNCLOSED_TOKEN]
        self.assertEqual(err.code, expected_code)
    else:
        self.fail()