Python xmltodict 模块,parse() 实例源码

我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用xmltodict.parse()

项目:python-qnapstats    作者:colinodell    | 项目源码 | 文件源码
def _handle_response(self, resp, force_list=None):
        """Ensure response is successful and return body as XML."""
        self._debuglog("Request executed: " + str(resp.status_code))
        if resp.status_code != 200:
            return None

        if resp.headers["Content-Type"] != "text/xml":
            # JSON requests not currently supported
            return None
        self._debuglog("Response Text: " + resp.text)
        data = xmltodict.parse(resp.content, force_list=force_list)['QDocRoot']

        auth_passed = data['authPassed']
        if auth_passed is not None and len(auth_passed) == 1 and auth_passed == "0":
            self._session_error = True
            return None

        return data
项目:django-wechat-example    作者:wechatpy    | 项目源码 | 文件源码
def preprocess_message(self, request):
        '''
        ????????
        '''
        component = get_component()
        content = component.crypto.decrypt_message(
            request.body,
            request.query_params['msg_signature'],
            int(request.query_params['timestamp']),
            int(request.query_params['nonce'])
        )
        message = xmltodict.parse(to_text(content))['xml']
        cc = json.loads(json.dumps(message))
        cc['CreateTime'] = int(cc['CreateTime'])
        cc['CreateTime'] = datetime.fromtimestamp(cc['CreateTime'])
        if 'MsgId' in cc:
            cc['MsgId'] = int(cc['MsgId'])
        return cc
项目:jcsclient    作者:jiocloudservices    | 项目源码 | 文件源码
def web_response_to_json(response):
    """
    Modify the web response output to json format

    param response: response object from requests library

    return: json object representing the response content
    """
    try:
        if response:
            resp_dict = json.loads(response.content)
    except:
        try:
            resp_ordereddict = xmltodict.parse(response.content)
            resp_json = json.dumps(resp_ordereddict, indent=4,
                                   sort_keys=True)
            resp_dict = json.loads(resp_json)
        except:
            raise exception.UnknownOutputFormat()
    return resp_dict
项目:ApkParser    作者:yigitozgumus    | 项目源码 | 文件源码
def working_directory(directory):
    owd = os.getcwd()
    try:
        os.chdir(directory)
        yield directory
    finally:
        os.chdir(owd)


# def extractXML(project_dir, apk_location):
#     """
#     Parses AndroidManifest file and returns a dictionary object
#     :param project_dir: Project Location
#     :param apk_location: Apk location
#     :return: Parsed AndroidManifest Dictionary
#     """
#     with working_directory(project_dir):
#         subprocess.check_output(["./gradlew", "assembleRelease"])
#     with working_directory("/tmp"):
#         subprocess.call(["apktool", "d", apk_location])
#     with working_directory("/tmp" + "/app-release/"):
#         with open("AndroidManifest.xml") as fd:
#             obj_file = xmltodict.parse(fd.read())
#             return ast.literal_eval(json.dumps(obj_file))
项目:ApkParser    作者:yigitozgumus    | 项目源码 | 文件源码
def extractXML(apk_location,config_location):
    """

    @param project_dir:
    @param apk_location:
    @return:
    """
    with working_directory("/tmp"):
        subprocess.call(["apktool", "d", apk_location])
    config = ConfigParser.ConfigParser()
    config.read(config_location)
    app_name = "app-external-release"
    temp = config.get("APP_NAME","app_flavor_name")
    if temp != None:
        app_name = temp
    with working_directory("/tmp/" + app_name):
        with open("AndroidManifest.xml") as fd:
            obj_file = xmltodict.parse(fd.read())
            return ast.literal_eval(json.dumps(obj_file))
项目:BirdCLEF2017    作者:kahst    | 项目源码 | 文件源码
def calcAvgPrecision(p, gt):

    precision = 0
    rcnt = 0

    #parse ordered predictions
    for i in range(0, len(p)):

        #relevant species
        if p[i][0] in gt:            
            rcnt += 1
            precision += float(rcnt) / (i + 1)

    #precision by relevant species from ground truth
    avgp = precision / len(gt)

    return avgp
项目:network_discovery    作者:plucena24    | 项目源码 | 文件源码
def neighbor_discover_regex(self):
        '''
        regex to parse output of discovery_command
        looking for:
            device_device
            device_ip
            device_ipv6
            device_model
            local_interface
            device_interface
            device_version
        '''
        return "Device ID: (?P<device_device>[\w\d\_\-\.]+)[\W\w]+?\n"\
           "\s+IP [Aa]ddress: (?P<device_ip>[0-9\.]+)\n" \
           "(?:\s+IPv6 address: (?P<device_ipv6>[a-z0-9\:]+)(?:\s+\(global unicast\)\n)?)?" \
           "[\n\W\w]*?" \
           "Platform:\s*[Cc]isco\s(?P<device_model>[\w\d\-\_\.]+)[\W\w\s]+?\n" \
           "Interface: (?P<local_interface>[A-Za-z0-9/\-]+)" \
           ".*: (?P<device_interface>[A-Za-z0-9/\-]+)\n" \
           "[\n\W\w\S\s]*?" \
           "Version.*\n" \
           "(?P<device_version>[\w\W]+?)\n"
项目:network_discovery    作者:plucena24    | 项目源码 | 文件源码
def neighbor_discover_regex(self):
        '''
        regex to parse output of discovery_command
        looking for:
            device_device
            device_ip
            device_ipv6
            device_model
            local_interface
            device_interface
            device_version
        '''
        return "Device ID:(?P<device_name>[\w\d\_\-\.]+)[\W\w]+?\n"\
               "\s+IPv4 [Aa]ddress: (?P<device_ip>[0-9\.]+)\n" \
               "(?:\s+IPv6 [Aa]ddress: (?!fe80)(?P<device_ipv6>[a-z0-9\:]+)\n)?" \
               "[\n\W\w]*?" \
               "Platform:\s*(?P<device_model>[\w\d\-\_\.]+)[\W\w\s]+?\n" \
               "Interface: (?P<local_interface>[A-Za-z0-9/]+)" \
               ".*: (?P<device_interface>[A-Za-z0-9/\-]+)\n" \
               "[\n\W\w\S\s]*?" \
               "Version.*\n" \
               "(?P<device_version>[\w\W]+?)\n"
项目:TPN    作者:myfavouritekk    | 项目源码 | 文件源码
def load_gt(xml_file):
    res = []
    with open(xml_file) as f:
        xml = dict(xmltodict.parse(f.read())['annotation'])
        try:
            obj = xml['object']
        except KeyError:
            print "xml {} has no objects.".format(xml_file)
            return np.asarray(res)
        if type(obj) is not list:
            boxes = [obj]
        else:
            boxes = obj
        for box in boxes:
            track_id = str(box['trackid'])
            bbox = map(int, [box['bndbox']['xmin'],
                             box['bndbox']['ymin'],
                             box['bndbox']['xmax'],
                             box['bndbox']['ymax'],
                             track_id])
            res.append(bbox)
    return np.asarray(res)
项目:django-sofortueberweisung    作者:ParticulateSolutions    | 项目源码 | 文件源码
def mock_urlopen(request, cafile=None):
    response = {}
    url = request.get_full_url()
    try:
        data = xmltodict.parse(request.data)
    except:
        data = {}
    try:
        if url == 'https://api.sofort.com/api/xml':
            if 'transaction_request' in data:
                if 'transaction' in data['transaction_request']:
                    if data['transaction_request']['transaction'] == '123-abc-received':
                        response = TEST_RESPONSES['123-abc-received']
                    elif data['transaction_request']['transaction'] == '123-abc-loss':
                        response = TEST_RESPONSES['123-abc-loss']
    except KeyError:
        response = False
        result = MockResponse(response)
    else:
        result = MockResponse(response)
        result.headers.update({'Content-type': 'application/xml; charset=UTF-8'})
        result.headers.update({'Accept': 'application/xml; charset=UTF-8'})
    return result
项目:traffic_video_analysis    作者:polltooh    | 项目源码 | 文件源码
def get_density_map(data_name, save_data, show_image = False):
    assert(data_name.endswith(".xml"))
    #xml_data = data_name + '.xml'

    with open(data_name) as xml_d:
        doc = xmltodict.parse(xml_d.read())

    img = np.zeros((240,352), np.float32)

    def add_to_image(image, bbox):
        xmin = int(bbox['xmin'])
        ymin = int(bbox['ymin'])
        xmax = int(bbox['xmax'])
        ymax = int(bbox['ymax'])
        density = 1/ float((ymax - ymin) * (xmax - xmin))   
        image[ymin:ymax, xmin:xmax] += density
        return image

    for vehicle in doc['annotation']['vehicle']:
        add_to_image(img, vehicle['bndbox'])

    if show_image: 
        show_mask(img, data_name.replace('xml','jpg'))
    if save_data:
        img.tofile(data_name.replace('xml','desmap'))
项目:pyoozie    作者:Shopify    | 项目源码 | 文件源码
def xml_to_comparable_dict(xml):

    def _sort_key(value):
        """Recursively sort lists embedded within dicts."""
        if hasattr(value, 'items'):
            return six.text_type(sorted([(k, _sort_key(v)) for k, v in value.items()]))
        elif isinstance(value, (tuple, set, list)):
            return six.text_type(sorted(value, key=_sort_key))
        else:
            return six.text_type(value)

    def _unorder(value):
        """Convert from a `collections.OrderedDict` to a `dict` with predictably sorted lists."""
        if hasattr(value, 'items'):
            return {k: _unorder(v) for k, v in value.items()}
        elif isinstance(value, (tuple, set, list)):
            return sorted(tuple(_unorder(v) for v in value), key=_sort_key)
        else:
            return value

    return _unorder(xmltodict.parse(xml))
项目:refer-parser2    作者:lichengunc    | 项目源码 | 文件源码
def parse_bracketed(s):
    '''Parse word features [abc=... def = ...]
    Also manages to parse out features that have XML within them
    '''
    word = None
    attrs = {}
    temp = {}
    # Substitute XML tags, to replace them later
    for i, tag in enumerate(re.findall(r"(<[^<>]+>.*<\/[^<>]+>)", s)):
        temp["^^^%d^^^" % i] = tag
        s = s.replace(tag, "^^^%d^^^" % i)
    # Load key-value pairs, substituting as necessary
    for attr, val in re.findall(r"([^=\s]*)=([^\s]*)", s):
        if val in temp:
            val = remove_escapes(temp[val])
        if attr == 'Text':
            word = remove_escapes(val)
        else:
            attrs[attr] = remove_escapes(val)
    return (word, attrs)
项目:unsonic    作者:redshodan    | 项目源码 | 文件源码
def toDict(self, body):
        root = xmltodict.parse(body, attr_prefix="")

        def walker(d):
            if not isinstance(d, list) and not isinstance(d, OrderedDict):
                return
            for key, val in d.items():
                if isinstance(val, list):
                    for val2 in val:
                        walker(val2)
                elif isinstance(val, OrderedDict):
                    walker(val)
                elif val is None:
                    d[key] = OrderedDict()
                elif val == "false":
                    d[key] = False
                elif val == "true":
                    d[key] = True

        walker(root)
        return root
项目:huawei-hilink-status    作者:trick77    | 项目源码 | 文件源码
def call_api(device_ip, token, resource, xml_attribs=True):
    headers = {}
    if token is not None:
        headers = {'__RequestVerificationToken': token}
    try:
        r = requests.get(url='http://' + device_ip + resource, headers=headers, allow_redirects=False, timeout=(2.0,2.0))
    except requests.exceptions.RequestException as e:
        print ("Error: "+str(e))
        return False;

    if r.status_code == 200:
        d = xmltodict.parse(r.text, xml_attribs=xml_attribs)
        if 'error' in d:
            raise Exception('Received error code ' + d['error']['code'] + ' for URL ' + r.url)
        return d            
    else:
      raise Exception('Received status code ' + str(r.status_code) + ' for URL ' + r.url)
项目:python-zillow    作者:seme0021    | 项目源码 | 文件源码
def test_getcomps_comps(self):
        RAW_XML = ""
        with open('./testdata/get_comps.xml', 'r') as f:
            RAW_XML = ''.join(f.readlines())

        data = xmltodict.parse(RAW_XML)
        comps = data.get('Comps:comps')['response']['properties']['comparables']['comp']

        comp_places = []

        for datum in comps:
            place = Place()
            place.set_data(datum)
            comp_places.append(place)

        self.assertEqual(10, len(comp_places))
项目:mobileinsight-core    作者:mobile-insight    | 项目源码 | 文件源码
def decode_json(self):
        """
        Decode the message and convert to a standard JSON dictionary.

        :returns: a string that contains the converted JSON document.
        """

        if not self.decoded_json_cache:
            cls = self.__class__

            d = self.decode()

            try:
                import xmltodict
                if "Msg" in d:
                    d["Msg"] = xmltodict.parse(d["Msg"])
            except ImportError:
                pass
            self.decoded_json_cache = json.dumps(d, cls=SuperEncoder)
        return self.decoded_json_cache
项目:wechat_sdk    作者:lyroge    | 项目源码 | 文件源码
def _dispatch(self, xml):
        logger.info('post xml:%s', xml)

        # ?????????
        d = xmltodict.parse(xml)

        # ?????????????????
        root = d['xml']
        msg_type = root['MsgType']

        meth = getattr(self, 'on_%s' % msg_type, None)
        r = meth(root)

        logger.info('return: %s', r)

        return r
项目:next-book    作者:EmmaOnThursday    | 项目源码 | 文件源码
def get_shelves(gr_user_id, goodreads_key):
    """Pulls user's shelves out of their user info."""

    user = requests.get('https://www.goodreads.com/user/show.xml?key=$%s&v=2&id=%s' % (goodreads_key, gr_user_id))
    user_info = xmltodict.parse(user.content)
    # initialize user shelf dictionary
    shelves = {}
    # extract user shelves: name, id, book count; eventually this should save to DB
    for user_shelf in user_info['GoodreadsResponse']['user']['user_shelves']['user_shelf']:
        shelf_name = user_shelf['name']
        shelf_id = user_shelf['id']['#text']
        num_of_books = int(user_shelf['book_count']['#text'])
        num_pages = (num_of_books/200) + 1
        shelves[shelf_name] = {'id': shelf_id, 'item_count': num_of_books, 'pages': num_pages}

    return shelves
项目:next-book    作者:EmmaOnThursday    | 项目源码 | 文件源码
def get_books_from_shelves(shelves, goodreads_key):
    """Takes in dictionary of user's shelves; returns list of all books on shelves.
    Return list: books stored in tuples: (shelf name, book info)."""

    all_books = []

    for shelf in shelves.keys():
        pages = shelves[shelf]['pages']
        for page in range(1,pages+1):
            shelf_response = requests.get('https://www.goodreads.com/review/list.xml?key=$%s&v=2&id=%s&shelf=%s&per_page=200&page=%d' 
                                            % (goodreads_key, gr_user_id, shelf, page))
            parsed_shelf = xmltodict.parse(shelf_response.content)
            for book in parsed_shelf['GoodreadsResponse']['reviews']['review']:
                all_books.append((shelf, book))
    return all_books



##### ADD BOOK ####
项目:next-book    作者:EmmaOnThursday    | 项目源码 | 文件源码
def fetch_book_data():
    """Based on book's Goodreads ID, fetch language & original publication year.
    Uses GR method book.show; saves data to library table."""

    to_update = Book.query.filter((Book.language.is_(None)) | (Book.original_pub_year.is_(None))).all()   

    # for book in need_language:
    for book in to_update:
        response = requests.get("https://www.goodreads.com/book/show/%s?key=%s&format=xml" % (book.goodreads_bid, goodreads_key))
        parsed_response = xmltodict.parse(response.content)
        book_info = parsed_response['GoodreadsResponse']
        book.original_pub_year = int(book_info['book']['work']['original_publication_year']['#text'])
        book.language = book_info['book']['language_code']
        db.session.add(book)

    db.session.commit()
    return




###### ADD USERBOOK ######
项目:next-book    作者:EmmaOnThursday    | 项目源码 | 文件源码
def fetch_book_data():
    """Based on book's Goodreads ID, fetch language & original publication year.
    Uses GR method book.show; saves data to library table."""

    to_update = Book.query.filter((Book.language.is_(None)) | (Book.original_pub_year.is_(None))).all()

    for book in to_update:
        response = requests.get("https://www.goodreads.com/book/show/%s?key=%s&format=xml" % (book.goodreads_bid, goodreads_key))
        parsed_response = xmltodict.parse(response.content)
        book_info = parsed_response['GoodreadsResponse']
        book.original_pub_year = int(book_info['book']['work']['original_publication_year']['#text'])
        book.language = book_info['book']['language_code']
        db.session.add(book)

    db.session.commit()


###################################
# FUNCTION CALLS

# connect_to_db(app)
# fetch_book_metadata()
项目:QRadio    作者:QTek    | 项目源码 | 文件源码
def parser(self, response):
        if response:
            match = re.findall(r'API limit', response.text) #
            if match:
                self.error_log.error_log('Sorry API Limit reached (300 q/month). Get new: https://totalhash.cymru.com/contact-us/',
                                         self.station_name)
                return []

            xml_dict = xmltodict.parse(response.text)
            if 'response' in xml_dict:
                if 'result' in xml_dict['response']:
                    if 'doc' in xml_dict['response']['result']:
                        try:
                            response = xml_dict['response']['result'] # Unpacking xmltodict
                            for records in response['doc']:
                                self.hash_list.append(records['str']['#text']) # Append hash_values
                        except: pass
        return self.hash_list # Return empty
项目:GLaDOS2    作者:TheComet    | 项目源码 | 文件源码
def woeid_search(query):
    """
    Find the first Where On Earth ID for the given query. Result is the etree
    node for the result, so that location data can still be retrieved. Returns
    None if there is no result, or the woeid field is empty.
    """
    glados.log('woeid_search for: "{}"'.format(query))
    query = urllib.parse.quote('select * from geo.places where text="{}"'.format(query))
    query = 'http://query.yahooapis.com/v1/public/yql?q=' + query
    glados.log('Request: {}'.format(query))
    body = urllib.request.urlopen(query).read()
    parsed = xmltodict.parse(body).get('query')
    results = parsed.get('results')
    if results is None or results.get('place') is None:
        return None
    if type(results.get('place')) is list:
        return results.get('place')[0]
    return results.get('place')
项目:GLaDOS2    作者:TheComet    | 项目源码 | 文件源码
def get_woeid(self, message, user):
        if user == '':
            user = message.author.name

        try:
            woeid = self.woeid_db[user.lower()]

            query = urllib.parse.quote('select * from weather.forecast where woeid="{}" and u=\'c\''.format(woeid))
            query = 'http://query.yahooapis.com/v1/public/yql?q=' + query
            glados.log('Request: {}'.format(query))

            body = urllib.request.urlopen(query).read()
            parsed = xmltodict.parse(body).get('query')
            results = parsed.get('results')

            if results is None:
                await self.client.send_message(message.channel, 'Couldn\'t look up location. The WOEID of {} is: {}'.format(user, woeid))
                return

            location = results.get('channel').get('title')
            await self.client.send_message(message.channel, 'Location of {} is {}'.format(user, location))
        except KeyError:
            await self.client.send_message(message.channel, 'No location set. You can use .setlocation to set one')
项目:frog4-datastore    作者:netgroup-polito    | 项目源码 | 文件源码
def _transform_yang_to_dict(yang_model_string):
    class Opts(object):
        def __init__(self, yin_canonical=False, yin_pretty_strings=True):
            self.yin_canonical = yin_canonical
            self.yin_pretty_strings = yin_pretty_strings

    ctx = Context(FileRepository())
    yang_mod = ctx.add_module('yang', yang_model_string, format='yang')

    yin = YINPlugin()
    modules = []
    modules.append(yang_mod)
    ctx.opts = Opts()
    yin_string = StringIO()
    yin.emit(ctx=ctx, modules=modules, fd=yin_string)
    xml = yin_string.getvalue()
    return xmltodict.parse(xml)
项目:shakecast    作者:usgs    | 项目源码 | 文件源码
def import_user_xml(xml_file='', _user=None):
    '''
    Import an XML file created by the ShakeCast workbook; Users

    Args:
        xml_file (string): The filepath to the xml_file that will be uploaded
        _user (int): User id of admin making inventory changes

    Returns:
        dict: a dictionary that contains information about the function run
        ::
            data = {'status': either 'finished' or 'failed',
                    'message': message to be returned to the UI,
                    'log': message to be added to ShakeCast log
                           and should contain info on error}
    '''
    with open(xml_file, 'r') as xml_str:
        user_xml_dict = json.loads(json.dumps(xmltodict.parse(xml_str)))
        user_list = user_xml_dict['UserTable']['UserRow']
        if isinstance(user_list, list) is False:
            user_list = [user_list]

    data = import_user_dicts(user_list, _user)

    return data
项目:shakecast    作者:usgs    | 项目源码 | 文件源码
def determine_xml(xml_file=''):
    '''
    Determine what type of XML file this is; facility, group, 
    user, master, or unknown
    '''
    tree = ET.parse(xml_file)
    root = tree.getroot()

    xml_type = ''
    if 'FacilityTable' in str(root):
        xml_type = 'facility'
    elif 'GroupTable' in str(root):
        xml_type = 'group'
    elif 'UserTable' in str(root):
        xml_type = 'user'
    elif 'Inventory' in str(root):
        xml_type = 'master'
    else:
        xml_type = 'unknown'

    return xml_type
项目:pyldfire    作者:seanthegeek    | 项目源码 | 文件源码
def submit_file(self, file_obj, filename="sample"):
        """Submits a file to WildFire for analysis

        Args:
            file_obj (file): The file to send
            filename (str): An optional filename

        Returns:
            dict: Analysis results

        Raises:
             WildFireException: If an API error occurs
        """

        url = "{0}{1}".format(self.api_root, "/submit/file")
        data = dict(apikey=self.api_key)
        files = dict(file=(filename, file_obj))
        response = self.session.post(url, data=data, files=files)

        return xmltodict.parse(response.text)['wildfire']['upload-file-info']
项目:pyldfire    作者:seanthegeek    | 项目源码 | 文件源码
def submit_remote_file(self, url):
        """Submits a file from a remote URL for analysis

        Args:
            url (str): The URL where the file is located

        Returns:
            dict: Analysis results

        Raises:
             WildFireException: If an API error occurs

        Notes:
            This is for submitting files located at remote URLs, not web pages.

        See Also:
            submit_urls(self, urls)
        """

        request_url = "{0}{1}".format(self.api_root, "/submit/url")
        data = dict(apikey=self.api_key, url=url)
        response = self.session.post(request_url, data=data)

        return xmltodict.parse(response.text)['wildfire']['upload-file-info']
项目:mu    作者:excamera    | 项目源码 | 文件源码
def lambda_handler(self, event, context):
      # Loop through records provided by S3 Event trigger
      self.logger.info("Working on bucket-key in S3...")
      # Extract the Key and Bucket names for the asset uploaded to S3
      key = event['key']
      bucket = event['bucket']
      self.logger.info("Bucket: {} \t Key: {}".format(bucket, key))
      # Generate a signed URL for the uploaded asset
      signed_url = self.get_signed_url(self.SIGNED_URL_EXPIRATION, bucket, key)
      self.logger.info("Signed URL: {}".format(signed_url))
      # Launch MediaInfo
      # Pass the signed URL of the uploaded asset to MediaInfo as an input
      # MediaInfo will extract the technical metadata from the asset
      # The extracted metadata will be outputted in XML format and
      # stored in the variable xml_output
      xml_output = subprocess.check_output(["mediainfo", "--full", "--output=XML", signed_url])
      self.logger.info("Output: {}".format(xml_output))
      xml_json = xmltodict.parse(xml_output)
      return self.write_job_spec_to_file(xml_json, bucket, key)
项目:wechat-async-sdk    作者:ihjmh    | 项目源码 | 文件源码
def _decrypt_message(self, msg, msg_signature, timestamp, nonce):
        """???????????????????

        :param msg: ?????POST?????
        :param msg_signature: ??????URL???msg_signature
        :param timestamp: ??????URL???timestamp
        :param nonce: ??????URL???nonce
        :return: ??????
        """
        timestamp = to_binary(timestamp)
        nonce = to_binary(nonce)
        if isinstance(msg, six.string_types):
            try:
                msg = xmltodict.parse(to_text(msg))['xml']
            except Exception as e:
                raise ParseError(e)

        encrypt = msg['Encrypt']
        signature = get_sha1_signature(self.__token, timestamp, nonce, encrypt)
        if signature != msg_signature:
            raise ValidateSignatureError()
        return self.__pc.decrypt(encrypt, self.__id)
项目:pyIceCat    作者:moonlitesolutions    | 项目源码 | 文件源码
def _parse(self, xml_file):

        if xml_file:
            data = ET.parse(xml_file).getroot()
        else:
            self.log.error("Failed to retrieve suppliers")
            return False

        '''
        Data is an XML ElementTree Object
        '''
        self.id_map = {}
        self.catid = ''
        self.catname = ''

        for elem in data.iter('SupplierMapping'):
            self.mfrid = elem.attrib['supplier_id']
            self.mfrname = elem.attrib['name']
            if not self.mfrname:
                self.mfrname = "Unknown"
            self.id_map[self.mfrid] = self.mfrname
        self.log.info("Parsed {} Manufacturers from IceCat Supplier Map".format(str(len(self.id_map.keys()))))
项目:pyIceCat    作者:moonlitesolutions    | 项目源码 | 文件源码
def _parse(self, xml_file):
        if xml_file.endswith('.gz'):
            with gzip.open(xml_file, 'rb') as f:
                data = ET.parse(f).getroot()
        else:
             data = ET.parse(xml_file).getroot()

        '''
        Data is an XML ElementTree Object
        '''
        self.id_map = {}
        self.catid = ''
        self.catname = ''
        self.findpath = 'Name[@langid="' + langid + '"]'
        for elem in data.iter('Category'):
            self.catid = elem.attrib['ID']
            for name in elem.iterfind(self.findpath):
                self.catname = name.attrib['Value']
                # only need one match
                break
            if not self.catname:
                self.catname = "Unknown"
            self.id_map[self.catid] = self.catname

        self.log.info("Parsed {} Categories from IceCat CategoriesList".format(str(len(self.id_map.keys()))))
项目:pyIceCat    作者:moonlitesolutions    | 项目源码 | 文件源码
def _parse(self, xml_file):
        self.xml_file = xml_file
        self.key_count = 0

        if not self.suppliers:
            self.suppliers = IceCatSupplierMapping(log=self.log, auth=self.auth, data_dir=self.data_dir)
        if not self.categories:
            self.categories = IceCatCategoryMapping(log=self.log, data_dir=self.data_dir, auth=self.auth)


        print("Parsing products from index file:", xml_file)
        with progressbar.ProgressBar(max_value=progressbar.UnknownLength) as self.bar:
            with open(self.xml_file, 'rb') as f:
                self.o = xmltodict.parse(f, attr_prefix='', postprocessor=self._postprocessor,
                    namespace_separator='', process_namespaces=True, namespaces=self._namespaces)
            f.closed

            # peel down to file key
            self.o = self.o['icecat-interface']['files.index']['file']
            self.log.info("Parsed {} products from IceCat catalog".format(str(len(self.o))))
        return len(self.o)
项目:azure-cli    作者:Azure    | 项目源码 | 文件源码
def list_publish_profiles(resource_group_name, name, slot=None):
    import xmltodict

    content = _generic_site_operation(resource_group_name, name,
                                      'list_publishing_profile_xml_with_secrets', slot)
    full_xml = ''
    for f in content:
        full_xml += f.decode()

    profiles = xmltodict.parse(full_xml, xml_attribs=True)['publishData']['publishProfile']
    converted = []
    for profile in profiles:
        new = {}
        for key in profile:
            # strip the leading '@' xmltodict put in for attributes
            new[key.lstrip('@')] = profile[key]
        converted.append(new)

    return converted
项目:rotest    作者:gregoil    | 项目源码 | 文件源码
def validate_result(self, test, result, traceback=""):
        """Validate adding result gives the expected output.

        Args:
            test (rotest.core.case.TestCase): the test its result was added.
            result (str): result to add to the test.
            traceback (str): the traceback of the test.

        Raises:
            AssertionError. the result wasn't added as expected.
        """
        if isinstance(test, TestBlock):
            return

        result_xml_file = os.path.join(test.work_dir,
                                       XMLHandler.XML_REPORT_PATH)

        expected_xml_file = self.expected_xml_files.next()

        expected_xml = xmltodict.parse(open(expected_xml_file, "rt").read(),
                                       dict_constructor=dict)
        result_xml = xmltodict.parse(open(result_xml_file, "rt").read(),
                                     dict_constructor=dict)

        self.assertEqual(expected_xml, result_xml)
项目:django-wechat-pay    作者:ChanMo    | 项目源码 | 文件源码
def post(self, request, *args, **kwargs):
        pay = PayApi()
        data = request.body
        data = dict(xmltodict.parse(data)['xml'])
        result = {}
        sign = data['sign']
        del data['sign']
        #check_sign = wx.get_sign(data)
        if sign:
            order_id = data['out_trade_no'][10:]
            pay_number = data['transaction_id']
            result = self.handle_order(order_id, pay_number)
        else:
            result['return_code'] = 'FAIL'
            result['return_msg'] = 'ERROR'

        result_xml = pay.dict_to_xml(result)
        return HttpResponse(result_xml)
项目:PyPlanet    作者:PyPlanet    | 项目源码 | 文件源码
def refresh_properties(self):
        if self._instance.game.game == 'tm':
            method = 'Trackmania.UI.GetProperties'
        else:
            method = 'Shootmania.UI.GetProperties'
        try:
            self._raw = await self._instance.gbx(method)
            self._properties = xd.parse(self._raw['raw_1'])
        except Exception as e:
            self._properties = dict()
            self._raw = None
项目:goodreads-api-client-python    作者:mdzhang    | 项目源码 | 文件源码
def _transform_res(res, transform: str='xml'):
        if transform == 'xml':
            content = xmltodict.parse(res.text)
            return content['GoodreadsResponse']
        if transform == 'json':
            content = json.loads(res.text)
            # This is just for consistency of return values across
            # different methods in this class - the ordering is not meaningful
            return OrderedDict(content.items())
        return res.text
项目:plexMusicPlayer    作者:Tyzer34    | 项目源码 | 文件源码
def getJsonFromPlex(url):
    response = requests.get(url)
    xml_obj = xmltodict.parse(response.text)
    json_obj = json.loads(json.dumps(xml_obj))
    return json_obj
项目:win32wifi    作者:kedos    | 项目源码 | 文件源码
def _parse_xml(self, xml):
        d = xmltodict.parse(xml)
        self.ssid = d['WLANProfile']['SSIDConfig']['SSID']['name']
项目:pydov    作者:DOV-Vlaanderen    | 项目源码 | 文件源码
def __init__(self, xml_doc):
        with open(xml_doc) as fd:
            self._doc = xmltodict.parse(fd.read(), process_namespaces=True,
                                        namespaces=namespaces)
            self.peilmetingen = self._get_peilmetingen_df()
            self.observaties = self._get_observaties_df()
            self.metadata_locatie = \
                self._doc["kern:dov-schema"]["grondwaterlocatie"]
            self.metadata_filters = self._get_filter_metadata()
项目:pydov    作者:DOV-Vlaanderen    | 项目源码 | 文件源码
def parse_wfs(response, layer, version):
        """A generator to parse the response from a wfs, depending on the
        server version

        Parameters
        ----------
        response : StringIO
            The response from a wfs.getfeature() query (OWSlib)
        layer : str
            The wfs layer that is queried
        version : str
            The version of the WFS server: only '1.1.0' and '2.0.0'

        """
        if version == "1.1.0":
            # convert layer preposition to null
            layer = 'null:' + layer.split(':')[1]
            # convert the response to a dictionary
            doc = xmltodict.parse(response)
            # yield the layers of the dict
            for a in doc['wfs:FeatureCollection']['gml:featureMembers']:
                yield (a[layer])
        elif version == "2.0.0":
            # convert the response to a dictionary
            doc = xmltodict.parse(response.read())
            # yield the layers of the dict
            for a in doc['wfs:FeatureCollection']['wfs:member']:
                yield (a[layer])
项目:sketal    作者:vk-brain    | 项目源码 | 文件源码
def process_message(self, msg: Message):
        command, text = self.parse_message(msg)

        if text.lower() in self.help_words:
            return await msg.answer("??????:\n" + "\n".join(self.description) + "\n\n????????? ????:\n" +
                                    ', '.join([k.capitalize() for k in self.news.keys()]))

        url = self.news["???????"]
        if text.lower() in self.news:
            url = self.news[text]

        async with aiohttp.ClientSession() as sess:
            async with sess.get(url) as resp:
                xml = xmltodict.parse(await resp.text())

                if "rss" not in xml or "channel" not in xml["rss"] or "item" not in xml["rss"]["channel"]:
                    return await msg.answer(self.error)

                items = xml["rss"]["channel"]["item"]
                item = choice(items)

                if "title" not in item or "description" not in item:
                    return await msg.answer(self.error)

                return await msg.answer(f'?? {item["title"]}\n'
                                        f'?? {item["description"]}')
项目:django-wechat-example    作者:wechatpy    | 项目源码 | 文件源码
def preprocess_message(self, request):
        component = get_component()
        content = component.crypto.decrypt_message(
            request.body,
            request.query_params['msg_signature'],
            int(request.query_params['timestamp']),
            int(request.query_params['nonce'])
        )
        message = xmltodict.parse(to_text(content))['xml']
        cc = json.loads(json.dumps(message))
        cc['CreateTime'] = int(cc['CreateTime'])
        cc['CreateTime'] = datetime.fromtimestamp(cc['CreateTime'])
        if 'MsgId' in cc:
            cc['MsgId'] = int(cc['MsgId'])
        return cc
项目:Server    作者:malaonline    | 项目源码 | 文件源码
def wx_xml2dict(xmlstr):
    return xmltodict.parse(xmlstr)['xml']
项目:jcsclient    作者:jiocloudservices    | 项目源码 | 文件源码
def display(self, response, headers, webobject=True):
        resp_json = ""
        request_id = None
        try:
            if headers and headers.get('x-jcs-request-id'):
                request_id = headers.get('x-jcs-request-id')
            elif headers and headers.get('request-id'):
                request_id = headers.get('request-id')
            if response:
                if webobject:
                    resp_dict = json.loads(response)
                else:
                    resp_dict = response
                if not request_id:
                    request_id = utils.requestid_in_response(resp_dict)
                resp_json = json.dumps(resp_dict, indent=4, sort_keys=True)
        except:
            try:
                resp_ordereddict = xmltodict.parse(response)
                resp_json = json.dumps(resp_ordereddict, indent=4,
                                       sort_keys=True)
                resp_dict = json.loads(resp_json)
                if not request_id:
                    request_id = utils.requestid_in_response(resp_dict)
                resp_json = json.dumps(resp_dict, indent=4, sort_keys=True)
                resp_json = resp_json.replace("\\n", "\n")
                resp_json = resp_json.replace("\\", "")
            except Exception as e:
                raise e
                #raise exception.UnknownOutputFormat()
        # Handle request-id displaying
        if not request_id:
            raise exception.UnknownOutputFormat()
        output_msg = resp_json
        output_msg += "\nRequest-Id: " + request_id
        print(output_msg)
项目:xapitodict    作者:mseri    | 项目源码 | 文件源码
def load_xml_into_raw_dict(filename):
    """
    Returns a raw dict containing an xml dump
    using `xmltodict.parse`.
    """
    with open(filename) as xapi_dump:
        dump = xmltodict.parse(xapi_dump.read())
        return dump['database']