The following 49 code examples, extracted from open-source Python projects, illustrate how to use xmltodict.parse().
def _handle_response(self, resp, force_list=None):
    """Ensure response is successful and return body as XML."""
    self._debuglog("Request executed: " + str(resp.status_code))
    if resp.status_code != 200:
        return None
    # NOTE(review): exact match fails if the server appends a charset
    # (e.g. "text/xml; charset=utf-8") -- confirm against the device API.
    if resp.headers["Content-Type"] != "text/xml":
        # JSON requests not currently supported
        return None
    self._debuglog("Response Text: " + resp.text)
    data = xmltodict.parse(resp.content, force_list=force_list)['QDocRoot']
    auth_passed = data['authPassed']
    if auth_passed is not None and len(auth_passed) == 1 and auth_passed == "0":
        # authPassed == "0" means the session is no longer valid
        self._session_error = True
        return None
    return data
def preprocess_message(self, request):
    """Decrypt an incoming WeChat component request and normalize it to a dict."""
    component = get_component()
    content = component.crypto.decrypt_message(
        request.body,
        request.query_params['msg_signature'],
        int(request.query_params['timestamp']),
        int(request.query_params['nonce'])
    )
    message = xmltodict.parse(to_text(content))['xml']
    # Round-trip through JSON to turn the OrderedDict into plain dicts.
    cc = json.loads(json.dumps(message))
    cc['CreateTime'] = int(cc['CreateTime'])
    cc['CreateTime'] = datetime.fromtimestamp(cc['CreateTime'])
    if 'MsgId' in cc:
        cc['MsgId'] = int(cc['MsgId'])
    return cc
def web_response_to_json(response):
    """Modify the web response output to json format.

    param response: response object from requests library
    return: json object representing the response content
    raises exception.UnknownOutputFormat: when the body is neither JSON nor XML,
        or when *response* is empty.
    """
    # Fixed: the original left resp_dict unbound (NameError) when response
    # was falsy; fail explicitly instead.
    if not response:
        raise exception.UnknownOutputFormat()
    try:
        resp_dict = json.loads(response.content)
    except Exception:
        # Not JSON -- fall back to XML, normalizing through a JSON round-trip.
        try:
            resp_ordereddict = xmltodict.parse(response.content)
            resp_json = json.dumps(resp_ordereddict, indent=4, sort_keys=True)
            resp_dict = json.loads(resp_json)
        except Exception:
            raise exception.UnknownOutputFormat()
    return resp_dict
def working_directory(directory):
    """Context-manager generator: chdir into *directory*, restore the old cwd on exit."""
    previous = os.getcwd()
    try:
        os.chdir(directory)
        yield directory
    finally:
        os.chdir(previous)

# def extractXML(project_dir, apk_location):
#     """
#     Parses AndroidManifest file and returns a dictionary object
#     :param project_dir: Project Location
#     :param apk_location: Apk location
#     :return: Parsed AndroidManifest Dictionary
#     """
#     with working_directory(project_dir):
#         subprocess.check_output(["./gradlew", "assembleRelease"])
#     with working_directory("/tmp"):
#         subprocess.call(["apktool", "d", apk_location])
#     with working_directory("/tmp" + "/app-release/"):
#         with open("AndroidManifest.xml") as fd:
#             obj_file = xmltodict.parse(fd.read())
#     return ast.literal_eval(json.dumps(obj_file))
def extractXML(apk_location, config_location):
    """Decompile an APK with apktool and parse its AndroidManifest.xml.

    :param apk_location: path to the APK to decompile
    :param config_location: path to the config file whose APP_NAME section
        names the app flavor directory (defaults to "app-external-release")
    :return: the manifest as a plain dict (literal-eval of the JSON dump)
    """
    with working_directory("/tmp"):
        subprocess.call(["apktool", "d", apk_location])
    config = ConfigParser.ConfigParser()
    config.read(config_location)
    app_name = "app-external-release"
    temp = config.get("APP_NAME", "app_flavor_name")
    # Fixed: compare to None with `is not None` (PEP 8) instead of `!= None`.
    if temp is not None:
        app_name = temp
    with working_directory("/tmp/" + app_name):
        with open("AndroidManifest.xml") as fd:
            obj_file = xmltodict.parse(fd.read())
    return ast.literal_eval(json.dumps(obj_file))
def calcAvgPrecision(p, gt):
    """Average precision of ordered predictions *p* against ground truth *gt*.

    Each element of *p* is indexable; p[i][0] is the predicted species.
    """
    hits = 0
    running = 0.0
    # walk predictions in ranked order, accumulating precision at each hit
    for rank, prediction in enumerate(p, start=1):
        if prediction[0] in gt:
            hits += 1
            running += float(hits) / rank
    # normalize by the number of relevant species in the ground truth
    return running / len(gt)
def neighbor_discover_regex(self):
    """Regex to parse output of discovery_command.

    Captures: device_device, device_ip, device_ipv6, device_model,
    local_interface, device_interface, device_version.
    """
    return (
        "Device ID: (?P<device_device>[\w\d\_\-\.]+)[\W\w]+?\n"
        "\s+IP [Aa]ddress: (?P<device_ip>[0-9\.]+)\n"
        "(?:\s+IPv6 address: (?P<device_ipv6>[a-z0-9\:]+)(?:\s+\(global unicast\)\n)?)?"
        "[\n\W\w]*?"
        "Platform:\s*[Cc]isco\s(?P<device_model>[\w\d\-\_\.]+)[\W\w\s]+?\n"
        "Interface: (?P<local_interface>[A-Za-z0-9/\-]+)"
        ".*: (?P<device_interface>[A-Za-z0-9/\-]+)\n"
        "[\n\W\w\S\s]*?"
        "Version.*\n"
        "(?P<device_version>[\w\W]+?)\n"
    )
def neighbor_discover_regex(self):
    """Regex to parse output of discovery_command.

    Captures: device_name, device_ip, device_ipv6, device_model,
    local_interface, device_interface, device_version.
    (NOTE(review): the group here is device_name, unlike the sibling
    implementation's device_device -- confirm which the caller expects.)
    """
    return (
        "Device ID:(?P<device_name>[\w\d\_\-\.]+)[\W\w]+?\n"
        "\s+IPv4 [Aa]ddress: (?P<device_ip>[0-9\.]+)\n"
        "(?:\s+IPv6 [Aa]ddress: (?!fe80)(?P<device_ipv6>[a-z0-9\:]+)\n)?"
        "[\n\W\w]*?"
        "Platform:\s*(?P<device_model>[\w\d\-\_\.]+)[\W\w\s]+?\n"
        "Interface: (?P<local_interface>[A-Za-z0-9/]+)"
        ".*: (?P<device_interface>[A-Za-z0-9/\-]+)\n"
        "[\n\W\w\S\s]*?"
        "Version.*\n"
        "(?P<device_version>[\w\W]+?)\n"
    )
def load_gt(xml_file):
    """Load tracked bounding boxes from a VOC-style annotation file.

    :param xml_file: path to the annotation XML
    :return: numpy array of [xmin, ymin, xmax, ymax, track_id] rows;
        empty array when the file contains no objects
    """
    res = []
    with open(xml_file) as f:
        xml = dict(xmltodict.parse(f.read())['annotation'])
    try:
        obj = xml['object']
    except KeyError:
        # Fixed: print-function form works on both Python 2 and 3
        # (the original used a Python-2-only print statement).
        print("xml {} has no objects.".format(xml_file))
        return np.asarray(res)
    # a single object is parsed as a dict, not a one-element list
    boxes = obj if isinstance(obj, list) else [obj]
    for box in boxes:
        track_id = str(box['trackid'])
        # Fixed: wrap map() in list() so Python 3's lazy map object is not
        # appended to res (identical behavior on Python 2).
        bbox = list(map(int, [box['bndbox']['xmin'], box['bndbox']['ymin'],
                              box['bndbox']['xmax'], box['bndbox']['ymax'],
                              track_id]))
        res.append(bbox)
    return np.asarray(res)
def mock_urlopen(request, cafile=None):
    """Mock urlopen returning canned Sofort API responses for known transactions."""
    response = {}
    url = request.get_full_url()
    try:
        data = xmltodict.parse(request.data)
    except:
        data = {}
    try:
        if url == 'https://api.sofort.com/api/xml':
            if 'transaction_request' in data:
                if 'transaction' in data['transaction_request']:
                    transaction = data['transaction_request']['transaction']
                    if transaction == '123-abc-received':
                        response = TEST_RESPONSES['123-abc-received']
                    elif transaction == '123-abc-loss':
                        response = TEST_RESPONSES['123-abc-loss']
    except KeyError:
        response = False
        result = MockResponse(response)
    else:
        result = MockResponse(response)
    result.headers.update({'Content-type': 'application/xml; charset=UTF-8'})
    result.headers.update({'Accept': 'application/xml; charset=UTF-8'})
    return result
def get_density_map(data_name, save_data, show_image=False):
    """Build a per-pixel vehicle density map from a VOC-style annotation file.

    Each vehicle's bounding box contributes 1/area to every pixel it covers.
    Optionally displays the mask and/or dumps it to a .desmap file.
    """
    assert(data_name.endswith(".xml"))
    with open(data_name) as xml_d:
        doc = xmltodict.parse(xml_d.read())
    # assumes 240x352 frames -- TODO confirm against the dataset
    img = np.zeros((240, 352), np.float32)

    def add_to_image(image, bbox):
        # spread one unit of mass uniformly over the box area
        xmin = int(bbox['xmin'])
        ymin = int(bbox['ymin'])
        xmax = int(bbox['xmax'])
        ymax = int(bbox['ymax'])
        density = 1 / float((ymax - ymin) * (xmax - xmin))
        image[ymin:ymax, xmin:xmax] += density
        return image

    for vehicle in doc['annotation']['vehicle']:
        add_to_image(img, vehicle['bndbox'])
    if show_image:
        show_mask(img, data_name.replace('xml', 'jpg'))
    if save_data:
        img.tofile(data_name.replace('xml', 'desmap'))
def xml_to_comparable_dict(xml):
    """Parse *xml* into plain dicts with deterministically ordered lists,
    so two semantically equal documents compare equal."""
    def _sort_key(value):
        """Recursively sort lists embedded within dicts."""
        if hasattr(value, 'items'):
            return six.text_type(sorted([(k, _sort_key(v)) for k, v in value.items()]))
        elif isinstance(value, (tuple, set, list)):
            return six.text_type(sorted(value, key=_sort_key))
        else:
            return six.text_type(value)

    def _unorder(value):
        """Convert from a `collections.OrderedDict` to a `dict` with predictably sorted lists."""
        if hasattr(value, 'items'):
            return {k: _unorder(v) for k, v in value.items()}
        elif isinstance(value, (tuple, set, list)):
            return sorted(tuple(_unorder(v) for v in value), key=_sort_key)
        else:
            return value

    return _unorder(xmltodict.parse(xml))
def parse_bracketed(s):
    '''Parse word features [abc=... def = ...]
    Also manages to parse out features that have XML within them
    '''
    word = None
    attrs = {}
    placeholders = {}
    # Substitute XML tags with placeholders, to replace them later
    for idx, tag in enumerate(re.findall(r"(<[^<>]+>.*<\/[^<>]+>)", s)):
        placeholders["^^^%d^^^" % idx] = tag
        s = s.replace(tag, "^^^%d^^^" % idx)
    # Load key-value pairs, substituting placeholders back as necessary
    for attr, val in re.findall(r"([^=\s]*)=([^\s]*)", s):
        if val in placeholders:
            val = remove_escapes(placeholders[val])
        if attr == 'Text':
            word = remove_escapes(val)
        else:
            attrs[attr] = remove_escapes(val)
    return (word, attrs)
def toDict(self, body):
    """Parse *body* XML into nested OrderedDicts, converting "true"/"false"
    strings to booleans and None values to empty OrderedDicts in place."""
    root = xmltodict.parse(body, attr_prefix="")

    def walker(node):
        if not isinstance(node, (list, OrderedDict)):
            return
        for key, val in node.items():
            if isinstance(val, list):
                for element in val:
                    walker(element)
            elif isinstance(val, OrderedDict):
                walker(val)
            elif val is None:
                node[key] = OrderedDict()
            elif val == "false":
                node[key] = False
            elif val == "true":
                node[key] = True

    walker(root)
    return root
def call_api(device_ip, token, resource, xml_attribs=True):
    """GET *resource* from the device and return the parsed XML body.

    Returns False on a connection error; raises on a non-200 status or an
    API-level <error> element in the response.
    """
    headers = {}
    if token is not None:
        headers = {'__RequestVerificationToken': token}
    try:
        r = requests.get(url='http://' + device_ip + resource, headers=headers,
                         allow_redirects=False, timeout=(2.0, 2.0))
    except requests.exceptions.RequestException as e:
        print("Error: " + str(e))
        return False
    if r.status_code == 200:
        d = xmltodict.parse(r.text, xml_attribs=xml_attribs)
        if 'error' in d:
            raise Exception('Received error code ' + d['error']['code'] + ' for URL ' + r.url)
        return d
    raise Exception('Received status code ' + str(r.status_code) + ' for URL ' + r.url)
def test_getcomps_comps(self):
    """Parsing the canned comps fixture should yield exactly 10 Place objects."""
    with open('./testdata/get_comps.xml', 'r') as f:
        RAW_XML = ''.join(f.readlines())
    data = xmltodict.parse(RAW_XML)
    comps = data.get('Comps:comps')['response']['properties']['comparables']['comp']
    comp_places = []
    for datum in comps:
        place = Place()
        place.set_data(datum)
        comp_places.append(place)
    self.assertEqual(10, len(comp_places))
def decode_json(self):
    """
    Decode the message and convert to a standard JSON dictionary.

    :returns: a string that contains the converted JSON document.
    """
    if not self.decoded_json_cache:
        cls = self.__class__
        d = self.decode()
        # xmltodict is optional: fall back to the raw Msg if unavailable
        try:
            import xmltodict
            if "Msg" in d:
                d["Msg"] = xmltodict.parse(d["Msg"])
        except ImportError:
            pass
        self.decoded_json_cache = json.dumps(d, cls=SuperEncoder)
    return self.decoded_json_cache
def _dispatch(self, xml):
    """Parse an incoming XML message and dispatch it to the on_<MsgType> handler."""
    logger.info('post xml:%s', xml)
    # parse the raw XML payload
    d = xmltodict.parse(xml)
    # route on MsgType to the matching on_* handler method
    root = d['xml']
    msg_type = root['MsgType']
    meth = getattr(self, 'on_%s' % msg_type, None)
    r = meth(root)
    logger.info('return: %s', r)
    return r
def get_shelves(gr_user_id, goodreads_key):
    """Pulls user's shelves out of their user info."""
    # NOTE(review): the '$' in 'key=$%s' looks suspicious -- confirm the
    # Goodreads API actually expects it.
    user = requests.get('https://www.goodreads.com/user/show.xml?key=$%s&v=2&id=%s'
                        % (goodreads_key, gr_user_id))
    user_info = xmltodict.parse(user.content)
    # initialize user shelf dictionary
    shelves = {}
    # extract user shelves: name, id, book count; eventually this should save to DB
    for user_shelf in user_info['GoodreadsResponse']['user']['user_shelves']['user_shelf']:
        shelf_name = user_shelf['name']
        shelf_id = user_shelf['id']['#text']
        num_of_books = int(user_shelf['book_count']['#text'])
        # NOTE(review): this assumes Python 2 integer division; on Python 3
        # it yields a float page count -- confirm the target interpreter.
        num_pages = (num_of_books / 200) + 1
        shelves[shelf_name] = {'id': shelf_id, 'item_count': num_of_books,
                               'pages': num_pages}
    return shelves
def get_books_from_shelves(shelves, goodreads_key):
    """Takes in dictionary of user's shelves; returns list of all books on shelves.

    Return list: books stored in tuples: (shelf name, book info)."""
    all_books = []
    for shelf in shelves.keys():
        pages = shelves[shelf]['pages']
        for page in range(1, pages + 1):
            # NOTE(review): gr_user_id is not a parameter of this function --
            # it must be a module-level global, otherwise this raises
            # NameError at runtime; verify against the module.
            shelf_response = requests.get(
                'https://www.goodreads.com/review/list.xml?key=$%s&v=2&id=%s&shelf=%s&per_page=200&page=%d'
                % (goodreads_key, gr_user_id, shelf, page))
            parsed_shelf = xmltodict.parse(shelf_response.content)
            for book in parsed_shelf['GoodreadsResponse']['reviews']['review']:
                all_books.append((shelf, book))
    return all_books

##### ADD BOOK ####
def fetch_book_data():
    """Based on book's Goodreads ID, fetch language & original publication year.

    Uses GR method book.show; saves data to library table."""
    to_update = Book.query.filter((Book.language.is_(None)) |
                                  (Book.original_pub_year.is_(None))).all()
    for book in to_update:
        response = requests.get("https://www.goodreads.com/book/show/%s?key=%s&format=xml"
                                % (book.goodreads_bid, goodreads_key))
        parsed_response = xmltodict.parse(response.content)
        book_info = parsed_response['GoodreadsResponse']
        book.original_pub_year = int(book_info['book']['work']['original_publication_year']['#text'])
        book.language = book_info['book']['language_code']
        db.session.add(book)
    db.session.commit()
    return

###### ADD USERBOOK ######
def fetch_book_data():
    """Based on book's Goodreads ID, fetch language & original publication year.

    Uses GR method book.show; saves data to library table."""
    missing_fields = (Book.language.is_(None)) | (Book.original_pub_year.is_(None))
    for book in Book.query.filter(missing_fields).all():
        response = requests.get("https://www.goodreads.com/book/show/%s?key=%s&format=xml"
                                % (book.goodreads_bid, goodreads_key))
        book_info = xmltodict.parse(response.content)['GoodreadsResponse']
        book.original_pub_year = int(book_info['book']['work']['original_publication_year']['#text'])
        book.language = book_info['book']['language_code']
        db.session.add(book)
    db.session.commit()

###################################
# FUNCTION CALLS
# connect_to_db(app)
# fetch_book_metadata()
def parser(self, response):
    """Extract hash values from a TotalHash XML response into self.hash_list."""
    if response:
        match = re.findall(r'API limit', response.text)
        if match:
            self.error_log.error_log('Sorry API Limit reached (300 q/month). Get new: https://totalhash.cymru.com/contact-us/', self.station_name)
            return []
        xml_dict = xmltodict.parse(response.text)
        if 'response' in xml_dict:
            if 'result' in xml_dict['response']:
                if 'doc' in xml_dict['response']['result']:
                    try:
                        result = xml_dict['response']['result']
                        # Unpacking xmltodict
                        for records in result['doc']:
                            self.hash_list.append(records['str']['#text'])  # Append hash_values
                    except:
                        pass
    return self.hash_list  # Return empty
def woeid_search(query):
    """
    Find the first Where On Earth ID for the given query. Result is the etree
    node for the result, so that location data can still be retrieved. Returns
    None if there is no result, or the woeid field is empty.
    """
    glados.log('woeid_search for: "{}"'.format(query))
    query = urllib.parse.quote('select * from geo.places where text="{}"'.format(query))
    query = 'http://query.yahooapis.com/v1/public/yql?q=' + query
    glados.log('Request: {}'.format(query))
    body = urllib.request.urlopen(query).read()
    parsed = xmltodict.parse(body).get('query')
    results = parsed.get('results')
    if results is None or results.get('place') is None:
        return None
    place = results.get('place')
    # multiple matches come back as a list; return the first
    if type(place) is list:
        return place[0]
    return place
async def get_woeid(self, message, user):
    """Look up the stored WOEID for *user* and report their location in chat."""
    if user == '':
        user = message.author.name
    try:
        woeid = self.woeid_db[user.lower()]
        query = urllib.parse.quote('select * from weather.forecast where woeid="{}" and u=\'c\''.format(woeid))
        query = 'http://query.yahooapis.com/v1/public/yql?q=' + query
        glados.log('Request: {}'.format(query))
        body = urllib.request.urlopen(query).read()
        parsed = xmltodict.parse(body).get('query')
        results = parsed.get('results')
        if results is None:
            await self.client.send_message(message.channel, 'Couldn\'t look up location. The WOEID of {} is: {}'.format(user, woeid))
            return
        location = results.get('channel').get('title')
        await self.client.send_message(message.channel, 'Location of {} is {}'.format(user, location))
    except KeyError:
        await self.client.send_message(message.channel, 'No location set. You can use .setlocation to set one')
def _transform_yang_to_dict(yang_model_string):
    """Convert a YANG model string to a dict by emitting YIN XML and parsing it."""
    class Opts(object):
        # minimal stand-in for pyang's option object expected by the plugin
        def __init__(self, yin_canonical=False, yin_pretty_strings=True):
            self.yin_canonical = yin_canonical
            self.yin_pretty_strings = yin_pretty_strings

    ctx = Context(FileRepository())
    yang_mod = ctx.add_module('yang', yang_model_string, format='yang')
    yin = YINPlugin()
    modules = [yang_mod]
    ctx.opts = Opts()
    yin_string = StringIO()
    yin.emit(ctx=ctx, modules=modules, fd=yin_string)
    xml = yin_string.getvalue()
    return xmltodict.parse(xml)
def import_user_xml(xml_file='', _user=None):
    '''
    Import an XML file created by the ShakeCast workbook; Users

    Args:
        xml_file (string): The filepath to the xml_file that will be uploaded
        _user (int): User id of admin making inventory changes

    Returns:
        dict: a dictionary that contains information about the function run
        ::
            data = {'status': either 'finished' or 'failed',
                    'message': message to be returned to the UI,
                    'log': message to be added to ShakeCast log and should
                           contain info on error}
    '''
    with open(xml_file, 'r') as xml_str:
        # JSON round-trip converts the OrderedDicts into plain dicts
        user_xml_dict = json.loads(json.dumps(xmltodict.parse(xml_str)))
    user_list = user_xml_dict['UserTable']['UserRow']
    # a single row parses as a dict; normalize to a list
    if not isinstance(user_list, list):
        user_list = [user_list]
    return import_user_dicts(user_list, _user)
def determine_xml(xml_file=''):
    '''
    Determine what type of XML file this is; facility, group, user,
    master, or unknown
    '''
    root = ET.parse(xml_file).getroot()
    # str(Element) includes the tag name, e.g. "<Element 'UserTable' at ...>"
    root_repr = str(root)
    if 'FacilityTable' in root_repr:
        return 'facility'
    if 'GroupTable' in root_repr:
        return 'group'
    if 'UserTable' in root_repr:
        return 'user'
    if 'Inventory' in root_repr:
        return 'master'
    return 'unknown'
def submit_file(self, file_obj, filename="sample"):
    """Submits a file to WildFire for analysis

    Args:
        file_obj (file): The file to send
        filename (str): An optional filename

    Returns:
        dict: Analysis results

    Raises:
        WildFireException: If an API error occurs
    """
    url = "{0}{1}".format(self.api_root, "/submit/file")
    payload = dict(apikey=self.api_key)
    attachments = dict(file=(filename, file_obj))
    response = self.session.post(url, data=payload, files=attachments)
    return xmltodict.parse(response.text)['wildfire']['upload-file-info']
def submit_remote_file(self, url):
    """Submits a file from a remote URL for analysis

    Args:
        url (str): The URL where the file is located

    Returns:
        dict: Analysis results

    Raises:
        WildFireException: If an API error occurs

    Notes:
        This is for submitting files located at remote URLs, not web pages.

    See Also:
        submit_urls(self, urls)
    """
    request_url = "{0}{1}".format(self.api_root, "/submit/url")
    payload = dict(apikey=self.api_key, url=url)
    response = self.session.post(request_url, data=payload)
    return xmltodict.parse(response.text)['wildfire']['upload-file-info']
def lambda_handler(self, event, context):
    """Run MediaInfo on an uploaded S3 asset and persist its technical metadata."""
    # Loop through records provided by S3 Event trigger
    self.logger.info("Working on bucket-key in S3...")
    # Extract the Key and Bucket names for the asset uploaded to S3
    key = event['key']
    bucket = event['bucket']
    self.logger.info("Bucket: {} \t Key: {}".format(bucket, key))
    # Generate a signed URL for the uploaded asset
    signed_url = self.get_signed_url(self.SIGNED_URL_EXPIRATION, bucket, key)
    self.logger.info("Signed URL: {}".format(signed_url))
    # MediaInfo reads the asset via the signed URL and emits the extracted
    # technical metadata as XML
    xml_output = subprocess.check_output(["mediainfo", "--full", "--output=XML", signed_url])
    self.logger.info("Output: {}".format(xml_output))
    xml_json = xmltodict.parse(xml_output)
    return self.write_job_spec_to_file(xml_json, bucket, key)
def _decrypt_message(self, msg, msg_signature, timestamp, nonce):
    """Verify the signature of an encrypted message and decrypt it.

    :param msg: raw POST body (XML string or already-parsed dict)
    :param msg_signature: msg_signature query parameter from the URL
    :param timestamp: timestamp query parameter from the URL
    :param nonce: nonce query parameter from the URL
    :return: the decrypted plaintext
    """
    timestamp = to_binary(timestamp)
    nonce = to_binary(nonce)
    if isinstance(msg, six.string_types):
        try:
            msg = xmltodict.parse(to_text(msg))['xml']
        except Exception as e:
            raise ParseError(e)
    encrypt = msg['Encrypt']
    signature = get_sha1_signature(self.__token, timestamp, nonce, encrypt)
    if signature != msg_signature:
        raise ValidateSignatureError()
    return self.__pc.decrypt(encrypt, self.__id)
def _parse(self, xml_file):
    """Build self.id_map (supplier_id -> name) from an IceCat supplier XML file."""
    if xml_file:
        data = ET.parse(xml_file).getroot()
    else:
        self.log.error("Failed to retrieve suppliers")
        return False
    # data is an XML ElementTree object
    self.id_map = {}
    self.catid = ''
    self.catname = ''
    for elem in data.iter('SupplierMapping'):
        self.mfrid = elem.attrib['supplier_id']
        self.mfrname = elem.attrib['name']
        if not self.mfrname:
            self.mfrname = "Unknown"
        self.id_map[self.mfrid] = self.mfrname
    self.log.info("Parsed {} Manufacturers from IceCat Supplier Map".format(str(len(self.id_map.keys()))))
def _parse(self, xml_file):
    """Build self.id_map (category ID -> localized name) from an IceCat
    CategoriesList file (plain or gzipped)."""
    if xml_file.endswith('.gz'):
        with gzip.open(xml_file, 'rb') as f:
            data = ET.parse(f).getroot()
    else:
        data = ET.parse(xml_file).getroot()
    # data is an XML ElementTree object
    self.id_map = {}
    self.catid = ''
    self.catname = ''
    # NOTE(review): `langid` is not defined in this method -- presumably a
    # module-level global or it should be self.langid; verify.
    self.findpath = 'Name[@langid="' + langid + '"]'
    for elem in data.iter('Category'):
        self.catid = elem.attrib['ID']
        for name in elem.iterfind(self.findpath):
            self.catname = name.attrib['Value']
            # only need one match
            break
        if not self.catname:
            self.catname = "Unknown"
        self.id_map[self.catid] = self.catname
    self.log.info("Parsed {} Categories from IceCat CategoriesList".format(str(len(self.id_map.keys()))))
def _parse(self, xml_file):
    """Parse the IceCat product index file into self.o and return the product count."""
    self.xml_file = xml_file
    self.key_count = 0
    # lazily construct the supplier/category mappings the postprocessor needs
    if not self.suppliers:
        self.suppliers = IceCatSupplierMapping(log=self.log, auth=self.auth, data_dir=self.data_dir)
    if not self.categories:
        self.categories = IceCatCategoryMapping(log=self.log, data_dir=self.data_dir, auth=self.auth)
    print("Parsing products from index file:", xml_file)
    with progressbar.ProgressBar(max_value=progressbar.UnknownLength) as self.bar:
        with open(self.xml_file, 'rb') as f:
            self.o = xmltodict.parse(f, attr_prefix='', postprocessor=self._postprocessor,
                                     namespace_separator='', process_namespaces=True,
                                     namespaces=self._namespaces)
        f.closed  # no-op kept from the original; the with-block closes the file
    # peel down to file key
    self.o = self.o['icecat-interface']['files.index']['file']
    self.log.info("Parsed {} products from IceCat catalog".format(str(len(self.o))))
    return len(self.o)
def list_publish_profiles(resource_group_name, name, slot=None):
    """Return the site's publishing profiles as a list of plain dicts.

    Fetches the publish-settings XML (including secrets) and flattens each
    <publishProfile> element's attributes, stripping the leading '@' that
    xmltodict adds to attribute keys.
    """
    import xmltodict
    content = _generic_site_operation(resource_group_name, name,
                                      'list_publishing_profile_xml_with_secrets', slot)
    full_xml = ''
    for f in content:
        full_xml += f.decode()

    profiles = xmltodict.parse(full_xml, xml_attribs=True)['publishData']['publishProfile']
    # Fixed: xmltodict returns a single dict (not a list) when there is
    # exactly one <publishProfile>; the loop below would then iterate over
    # the dict's keys. Normalize to a list.
    if not isinstance(profiles, list):
        profiles = [profiles]

    converted = []
    for profile in profiles:
        new = {}
        for key in profile:
            # strip the leading '@' xmltodict put in for attributes
            new[key.lstrip('@')] = profile[key]
        converted.append(new)
    return converted
def validate_result(self, test, result, traceback=""):
    """Validate adding result gives the expected output.

    Args:
        test (rotest.core.case.TestCase): the test its result was added.
        result (str): result to add to the test.
        traceback (str): the traceback of the test.

    Raises:
        AssertionError. the result wasn't added as expected.
    """
    if isinstance(test, TestBlock):
        return
    result_xml_file = os.path.join(test.work_dir, XMLHandler.XML_REPORT_PATH)
    # NOTE(review): .next() is Python 2 only; kept as-is since the file
    # appears to target Python 2.
    expected_xml_file = self.expected_xml_files.next()
    # Fixed: the original leaked both file handles (open(...).read() with no
    # close); use context managers instead.
    with open(expected_xml_file, "rt") as expected_file:
        expected_xml = xmltodict.parse(expected_file.read(),
                                       dict_constructor=dict)
    with open(result_xml_file, "rt") as result_file:
        result_xml = xmltodict.parse(result_file.read(),
                                     dict_constructor=dict)
    self.assertEqual(expected_xml, result_xml)
def post(self, request, *args, **kwargs):
    """Handle a WeChat-pay notification callback and answer with result XML."""
    pay = PayApi()
    data = dict(xmltodict.parse(request.body)['xml'])
    result = {}
    sign = data['sign']
    del data['sign']
    # check_sign = wx.get_sign(data)
    if sign:
        order_id = data['out_trade_no'][10:]
        pay_number = data['transaction_id']
        result = self.handle_order(order_id, pay_number)
    else:
        result['return_code'] = 'FAIL'
        result['return_msg'] = 'ERROR'
    return HttpResponse(pay.dict_to_xml(result))
async def refresh_properties(self):
    """Fetch and cache UI properties from the dedicated server, per game title."""
    if self._instance.game.game == 'tm':
        method = 'Trackmania.UI.GetProperties'
    else:
        method = 'Shootmania.UI.GetProperties'
    try:
        self._raw = await self._instance.gbx(method)
        self._properties = xd.parse(self._raw['raw_1'])
    except Exception:
        # best-effort: fall back to empty properties on any failure
        self._properties = dict()
        self._raw = None
def _transform_res(res, transform: str='xml'): if transform == 'xml': content = xmltodict.parse(res.text) return content['GoodreadsResponse'] if transform == 'json': content = json.loads(res.text) # This is just for consistency of return values across # different methods in this class - the ordering is not meaningful return OrderedDict(content.items()) return res.text
def getJsonFromPlex(url):
    """GET a Plex URL and return its XML payload converted to plain JSON dicts."""
    response = requests.get(url)
    parsed_xml = xmltodict.parse(response.text)
    # JSON round-trip converts the OrderedDicts into plain dicts
    return json.loads(json.dumps(parsed_xml))
def _parse_xml(self, xml):
    """Extract the SSID name from a WLAN profile XML document onto self.ssid."""
    profile = xmltodict.parse(xml)
    self.ssid = profile['WLANProfile']['SSIDConfig']['SSID']['name']
def __init__(self, xml_doc):
    """Load a DOV groundwater-location XML document and derive its datasets."""
    with open(xml_doc) as fd:
        self._doc = xmltodict.parse(fd.read(), process_namespaces=True,
                                    namespaces=namespaces)
    self.peilmetingen = self._get_peilmetingen_df()
    self.observaties = self._get_observaties_df()
    self.metadata_locatie = self._doc["kern:dov-schema"]["grondwaterlocatie"]
    self.metadata_filters = self._get_filter_metadata()
def parse_wfs(response, layer, version):
    """A generator to parse the response from a wfs, depending on the
    server version

    Parameters
    ----------
    response : StringIO
        The response from a wfs.getfeature() query (OWSlib)
    layer : str
        The wfs layer that is queried
    version : str
        The version of the WFS server: only '1.1.0' and '2.0.0'
    """
    if version == "1.1.0":
        # convert layer preposition to null
        layer = 'null:' + layer.split(':')[1]
        doc = xmltodict.parse(response)
        for member in doc['wfs:FeatureCollection']['gml:featureMembers']:
            yield (member[layer])
    elif version == "2.0.0":
        doc = xmltodict.parse(response.read())
        for member in doc['wfs:FeatureCollection']['wfs:member']:
            yield (member[layer])
async def process_message(self, msg: Message):
    """Answer with help text or a random item from the selected RSS news feed."""
    command, text = self.parse_message(msg)
    if text.lower() in self.help_words:
        return await msg.answer("??????:\n" + "\n".join(self.description) +
                                "\n\n????????? ????:\n" +
                                ', '.join([k.capitalize() for k in self.news.keys()]))
    url = self.news["???????"]
    if text.lower() in self.news:
        url = self.news[text]
    async with aiohttp.ClientSession() as sess:
        async with sess.get(url) as resp:
            xml = xmltodict.parse(await resp.text())
    if "rss" not in xml or "channel" not in xml["rss"] or "item" not in xml["rss"]["channel"]:
        return await msg.answer(self.error)
    items = xml["rss"]["channel"]["item"]
    item = choice(items)
    if "title" not in item or "description" not in item:
        return await msg.answer(self.error)
    return await msg.answer(f'?? {item["title"]}\n'
                            f'?? {item["description"]}')
def preprocess_message(self, request):
    """Decrypt an incoming WeChat component request and normalize it to a dict."""
    params = request.query_params
    decrypted = get_component().crypto.decrypt_message(
        request.body,
        params['msg_signature'],
        int(params['timestamp']),
        int(params['nonce']),
    )
    payload = xmltodict.parse(to_text(decrypted))['xml']
    # JSON round-trip converts the OrderedDict into plain dicts
    result = json.loads(json.dumps(payload))
    result['CreateTime'] = datetime.fromtimestamp(int(result['CreateTime']))
    if 'MsgId' in result:
        result['MsgId'] = int(result['MsgId'])
    return result
def wx_xml2dict(xmlstr):
    """Parse a WeChat XML payload and return the contents of its <xml> root."""
    parsed = xmltodict.parse(xmlstr)
    return parsed['xml']
def display(self, response, headers, webobject=True):
    """Pretty-print a JSON or XML response body along with its request id."""
    resp_json = ""
    request_id = None
    try:
        # request id may arrive under either header name
        if headers and headers.get('x-jcs-request-id'):
            request_id = headers.get('x-jcs-request-id')
        elif headers and headers.get('request-id'):
            request_id = headers.get('request-id')
        if response:
            if webobject:
                resp_dict = json.loads(response)
            else:
                resp_dict = response
            if not request_id:
                request_id = utils.requestid_in_response(resp_dict)
            resp_json = json.dumps(resp_dict, indent=4, sort_keys=True)
    except:
        # not JSON -- fall back to parsing the body as XML
        try:
            resp_ordereddict = xmltodict.parse(response)
            resp_json = json.dumps(resp_ordereddict, indent=4, sort_keys=True)
            resp_dict = json.loads(resp_json)
            if not request_id:
                request_id = utils.requestid_in_response(resp_dict)
            resp_json = json.dumps(resp_dict, indent=4, sort_keys=True)
            resp_json = resp_json.replace("\\n", "\n")
            resp_json = resp_json.replace("\\", "")
        except Exception as e:
            raise e
            # raise exception.UnknownOutputFormat()
    # Handle request-id displaying
    if not request_id:
        raise exception.UnknownOutputFormat()
    output_msg = resp_json
    output_msg += "\nRequest-Id: " + request_id
    print(output_msg)
def load_xml_into_raw_dict(filename):
    """Returns a raw dict containing an xml dump using `xmltodict.parse`."""
    with open(filename) as xapi_dump:
        parsed = xmltodict.parse(xapi_dump.read())
    return parsed['database']