Python xmltodict 模块,unparse() 实例源码

我们从Python开源项目中,提取了以下28个代码示例,用于说明如何使用xmltodict.unparse()

项目:PyPlanet    作者:PyPlanet    | 项目源码 | 文件源码
def send_properties(self, **kwargs):
        if not self._properties or 'ui_properties' not in self._properties:
            return

        # Decide the method to use.
        if self._instance.game.game == 'tm':
            method = 'Trackmania.UI.SetProperties'
        else:
            method = 'Shootmania.UI.SetProperties'

        # Create XML document
        try:
            xml = xd.unparse(self._properties, full_document=False, short_empty_elements=True)
        except Exception as e:
            logger.warning('Can\'t convert UI Properties to XML document! Error: {}'.format(str(e)))
            return

        try:
            await self._instance.gbx(method, xml, encode_json=False, response_id=False)
        except Exception as e:
            logger.warning('Can\'t send UI Properties! Error: {}'.format(str(e)))
            return
项目:wechat_mall    作者:a741424975game    | 项目源码 | 文件源码
def prepare_request(self, method, path, params):
        kwargs = {}
        _params = self.get_base_params()
        params.update(_params)
        newparams, prestr = params_filter(params)
        sign = build_mysign(prestr, self.partner_key)
        # ??????unicode xmltodict ???unicode
        newparams = params_encoding(newparams)
        newparams['sign'] = sign
        xml_dict = {'xml': newparams}
        kwargs['data'] = smart_str(xmltodict.unparse(xml_dict))
        url = self._full_url(path)
        if self.mch_cert and self.mch_key:
            kwargs['cert'] = (self.mch_cert, self.mch_key)
        return method, url, kwargs

    # ????
    # https://pay.weixin.qq.com/wiki/doc/api/jsapi.php?chapter=9_1
项目:wechat_mall    作者:a741424975game    | 项目源码 | 文件源码
def prepare_request(self, method, path, params):
        kwargs = {}
        _params = self.get_base_params()
        params.update(_params)
        newparams, prestr = params_filter(params)
        sign = build_mysign(prestr, key=self.partner_key)
        # ??????unicode xmltodict ???unicode
        newparams = params_encoding(newparams)
        newparams['sign'] = sign
        xml_dict = {'xml': newparams}
        kwargs['data'] = smart_str(xmltodict.unparse(xml_dict))
        url = self._full_url(path)
        if self.mch_cert and self.mch_key:
            kwargs['cert'] = (self.mch_cert, self.mch_key)
        return method, url, kwargs

    # ????
    # https://pay.weixin.qq.com/wiki/doc/api/jsapi.php?chapter=9_1
项目:TRX    作者:krmaxwell    | 项目源码 | 文件源码
def setUpClass(self):
        self.m_xml = ""
        self.m_data = {}
        self.m_data['MaltegoMessage'] = {}
        self.m_data['MaltegoMessage']['MaltegoTransformRequestMessage'] = {}
        self.m_data['MaltegoMessage']['MaltegoTransformRequestMessage']['Entities'] = {}
        self.m_data['MaltegoMessage']['MaltegoTransformRequestMessage']['Limits'] = {}
        self.m_data['MaltegoMessage']['MaltegoTransformRequestMessage']['Limits']["@HardLimit"] = 50
        self.m_data['MaltegoMessage']['MaltegoTransformRequestMessage']['Limits']["@SoftLimit"] = 50
        self.m_data['MaltegoMessage']['MaltegoTransformRequestMessage']['TransformFields'] = {}
        self.m_data['MaltegoMessage']['MaltegoTransformRequestMessage']['TransformFields']['Field'] = []
        self.m_data['MaltegoMessage']['MaltegoTransformRequestMessage']['TransformFields']['Field'].append({"@Name": "api", "#text": "JUSTKIDDING"})
        ent_data = {}
        ent_data['@Type'] = "IPAddress"
        ent_data['Value'] = "127.0.0.1"
        ent_data['Weight'] = "100"
        ent_data['AdditionalFields'] = {}
        ent_data['AdditionalFields']['Field'] = []
        ent_data['AdditionalFields']['Field'].append({"@Name": "ipv4-address", "@DisplayName": "IP Address", "#text": "127.0.0.1"})
        ent_data['AdditionalFields']['Field'].append({"@Name": "ipaddress.internal", "@DisplayName": "Internal", "#text": "true"})
        self.m_data['MaltegoMessage']['MaltegoTransformRequestMessage']['Entities']['Entity'] = ent_data
        self.m_xml = xmltodict.unparse(self.m_data)
项目:knittingpattern    作者:fossasia    | 项目源码 | 文件源码
def instruction_to_svg(self, instruction):
        """:return: an SVG representing the instruction.

        The SVG file is determined by the type attribute of the instruction.
        An instruction of type ``"knit"`` is looked for in a file named
        ``"knit.svg"``.

        Every element inside a group labeled ``"color"`` of mode ``"layer"``
        that has a ``"fill"`` style gets this fill replaced by the color of
        the instruction.
        Example of a recangle that gets filled like the instruction:

        .. code:: xml

            <g inkscape:label="color" inkscape:groupmode="layer">
                <rect style="fill:#ff0000;fill-opacity:1;fill-rule:nonzero"
                      id="rectangle1" width="10" height="10" x="0" y="0" />
            </g>

        If nothing was loaded to display this instruction, a default image is
        be generated by :meth:`default_instruction_to_svg`.
        """
        return xmltodict.unparse(self.instruction_to_svg_dict(instruction))
项目:cloudpunch    作者:target    | 项目源码 | 文件源码
def write_jmeter_config(self, jconfig, target):
        with open(ORIGINAL_JMETER_FILE, 'r') as f:
            default_jmeter_config = f.read()

        parsed_xml = xmltodict.parse(default_jmeter_config)
        xml_short = parsed_xml['jmeterTestPlan']['hashTree']['hashTree']

        # Change thread number
        xml_short['ThreadGroup']['stringProp'][1]['#text'] = str(jconfig['threads'])
        # Change ramp up time
        xml_short['ThreadGroup']['stringProp'][2]['#text'] = str(jconfig['ramp-up'])
        # Change duration
        xml_short['ThreadGroup']['stringProp'][3]['#text'] = str(jconfig['duration'])
        # Change target
        xml_short['hashTree']['HTTPSamplerProxy']['stringProp'][0]['#text'] = target
        # Change port
        xml_short['hashTree']['HTTPSamplerProxy']['stringProp'][1]['#text'] = str(jconfig['port'])
        # Change path
        xml_short['hashTree']['HTTPSamplerProxy']['stringProp'][6]['#text'] = str(jconfig['path'])

        with open(NEW_JMETER_FILE, 'w') as f:
            f.write(xmltodict.unparse(parsed_xml).encode('utf-8'))
项目:Server    作者:malaonline    | 项目源码 | 文件源码
def wx_dict2xml(d):
    return xmltodict.unparse({'xml': d}, full_document=False)
项目:pyChomikBox    作者:JuniorJPDJ    | 项目源码 | 文件源码
def pack(name, dict_data, *args, **kwargs):
        if '@xmlns' not in dict_data:
            dict_data['@xmlns'] = 'http://chomikuj.pl/'
        data = {
            's:Envelope': {'s:Body': {name: dict_data}, '@s:encodingStyle': 'http://schemas.xmlsoap.org/soap/encoding/',
                           '@xmlns:s': 'http://schemas.xmlsoap.org/soap/envelope/'}}
        return xmltodict.unparse(data, *args, **kwargs)
项目:PiModules    作者:modmypi    | 项目源码 | 文件源码
def write_config_xml(xmlfile, dict):
    try:
            with open(xmlfile, "wt") as fo:
                    xmltodict.unparse(dict, fo, pretty=True)
    except IOError as e:
        print "Error writing XML file: ", e
        return False

    return True
项目:django-sofortueberweisung    作者:ParticulateSolutions    | 项目源码 | 文件源码
def test_known_transaction_known_at_sofort_received(self):
        client = Client()
        self._create_test_transaction(transaction_id='123-abc-received')
        post_data = {'status_notification': {'transaction': '123-abc-received'}}
        xml_data = xmltodict.unparse(post_data)
        response = client.post('/sofort/notify/', data=xml_data, content_type='application/hal+json')
        self.assertEqual(response.status_code, 202)
项目:django-sofortueberweisung    作者:ParticulateSolutions    | 项目源码 | 文件源码
def test_known_transaction_known_at_sofort_loss(self):
        client = Client()
        self._create_test_transaction(transaction_id='123-abc-loss')
        post_data = {'status_notification': {'transaction': '123-abc-loss'}}
        xml_data = xmltodict.unparse(post_data)
        response = client.post('/sofort/notify/', data=xml_data, content_type='application/hal+json')
        self.assertEqual(response.status_code, 202)
项目:django-sofortueberweisung    作者:ParticulateSolutions    | 项目源码 | 文件源码
def test_known_transaction_unknown_at_sofort(self):
        client = Client()
        self._create_test_transaction(transaction_id='123-abc-unknown')
        post_data = {'status_notification': {'transaction': '123-abc-unknown'}}
        xml_data = xmltodict.unparse(post_data)
        response = client.post('/sofort/notify/', data=xml_data, content_type='application/hal+json')
        self.assertEqual(response.status_code, 400)
项目:DevOps    作者:YoLoveLife    | 项目源码 | 文件源码
def _winrm_send_input(self, protocol, shell_id, command_id, stdin, eof=False):
        rq = {'env:Envelope': protocol._get_soap_header(
            resource_uri='http://schemas.microsoft.com/wbem/wsman/1/windows/shell/cmd',
            action='http://schemas.microsoft.com/wbem/wsman/1/windows/shell/Send',
            shell_id=shell_id)}
        stream = rq['env:Envelope'].setdefault('env:Body', {}).setdefault('rsp:Send', {})\
            .setdefault('rsp:Stream', {})
        stream['@Name'] = 'stdin'
        stream['@CommandId'] = command_id
        stream['#text'] = base64.b64encode(to_bytes(stdin))
        if eof:
            stream['@End'] = 'true'
        protocol.send_message(xmltodict.unparse(rq))
项目:cetusshop    作者:icetusorg    | 项目源码 | 文件源码
def _output_convert(output_type, data):
    output_switch = {'dict': data,
                     'raw': data,
                     'json': json.dumps(data, indent=4),
                     'xml': xmltodict.unparse({'root': data})}
    #output_switch['json'] = output_switch['json'].replace('\n',''),replace('\\','')
    #print(output_switch['json'])
    return output_switch.get(output_type, None)
项目:automatic-repo    作者:WZQ1397    | 项目源码 | 文件源码
def pythonJsonToXml():
    dictVal = {
        'page': {
            'title': 'King Crimson',
            'ns': 0,
            'revision': {
                'id': 547909091,
            }
        }
    }
    convertedXml = xmltodict.unparse(dictVal)
    print("convertedXml=\n",convertedXml)
###############################################################################
项目:table2xml    作者:phiedulxp    | 项目源码 | 文件源码
def html2xml(html):
    soup = BeautifulSoup(html)
    json_ = get_deep_table(soup.table)
    return xmltodict.unparse(json_, pretty=True)
项目:knittingpattern    作者:fossasia    | 项目源码 | 文件源码
def default_instruction_to_svg(self, instruction):
        """As :meth:`instruction_to_svg` but it only takes the ``default.svg``
        file into account.

        In case no file is found for an instruction in
        :meth:`instruction_to_svg`,
        this method is used to determine the default svg for it.

        The content is created by replacing the text ``{instruction.type}`` in
        the whole svg file named ``default.svg``.

        If no file ``default.svg`` was loaded, an empty string is returned.
        """
        svg_dict = self.default_instruction_to_svg_dict(instruction)
        return xmltodict.unparse(svg_dict)
项目:knittingpattern    作者:fossasia    | 项目源码 | 文件源码
def write_to_file(self, file):
        """Writes the current SVG to the :paramref:`file`.

        :param file: a file-like object
        """
        xmltodict.unparse(self._structure, file, pretty=True)
项目:knittingpattern    作者:fossasia    | 项目源码 | 文件源码
def _dump_to_file(self, file):
        """dump to the file"""
        xmltodict.unparse(self.object(), file, pretty=True)
项目:ansible-provider-docs    作者:alibaba    | 项目源码 | 文件源码
def _winrm_send_input(self, protocol, shell_id, command_id, stdin, eof=False):
        rq = {'env:Envelope': protocol._get_soap_header(
            resource_uri='http://schemas.microsoft.com/wbem/wsman/1/windows/shell/cmd',
            action='http://schemas.microsoft.com/wbem/wsman/1/windows/shell/Send',
            shell_id=shell_id)}
        stream = rq['env:Envelope'].setdefault('env:Body', {}).setdefault('rsp:Send', {})\
            .setdefault('rsp:Stream', {})
        stream['@Name'] = 'stdin'
        stream['@CommandId'] = command_id
        stream['#text'] = base64.b64encode(to_bytes(stdin))
        if eof:
            stream['@End'] = 'true'
        protocol.send_message(xmltodict.unparse(rq))
项目:prestashop_api    作者:savaskoc    | 项目源码 | 文件源码
def _request(self, method, path, params=None, data=None, files=None):
        if data is not None:
            data = xmltodict.unparse({'prestashop': data}).encode('utf-8')
        res = requests.request(method, self._get_url(path), auth=(self.key, ''), params=params, data=data, files=files)
        return self._check_response(res, xmltodict.parse(res.text)['prestashop'] if not files and res.text else None)
项目:course-data-tools    作者:StoDevX    | 项目源码 | 文件源码
def xmlify(data):
    for course in data:
        if 'revisions' in course:
            course['revisions'] = [OrderedDict(sorted(rev.items()))
                                   for rev in course['revisions']]
    data = [OrderedDict(sorted(c.items())) for c in data]
    massaged = {'root': {'course': data}}
    return xmltodict.unparse(massaged, pretty=True)
项目:course-data-tools    作者:StoDevX    | 项目源码 | 文件源码
def load_data_from_server(term, dry_run=False):
    try:
        url = build_static_term_url(term)
        parsed_data = request_data(url, term)
    except BadDataException:
        print(f'{term}: static file is invlid xml; attempting database query')
        url = build_term_url(term)
        try:
            parsed_data = request_data(url, term)
        except BadDataException:
            print(f'{term}: fetching attempt #1 failed')
            try:
                parsed_data = request_data(url, term)
            except BadDataException:
                print(f'{term}: fetching attempt #2 failed')
                try:
                    parsed_data = request_data(url, term)
                except BadDataException:
                    print(f'{term}: fetching attempt #3 failed')
                    print(f'{term}: no xml returned after three tries')
                    return None

    if not parsed_data['searchresults']:
        logging.info(f'No data returned for {term}')
        return None

    # We sort the courses here, before we save it to disk, so that we don't
    # need to re-sort every time we load from disk.
    parsed_data['searchresults']['course'].sort(key=lambda c: c['clbid'])

    # Embed the term into each course individually
    for course in parsed_data['searchresults']['course']:
        course['term'] = term

    if not dry_run:
        destination = make_xml_term_path(term)
        serialized_data = xmltodict.unparse(parsed_data, pretty=True)
        save_data(serialized_data, destination)
        logging.debug(f'Fetched {destination}')

    return parsed_data
项目:localstack    作者:localstack    | 项目源码 | 文件源码
def get_cors(bucket_name):
    response = Response()
    cors = BUCKET_CORS.get(bucket_name)
    if not cors:
        # TODO: check if bucket exists, otherwise return 404-like error
        cors = {
            'CORSConfiguration': []
        }
    body = xmltodict.unparse(cors)
    response._content = body
    response.status_code = 200
    return response
项目:localstack    作者:localstack    | 项目源码 | 文件源码
def get_lifecycle(bucket_name):
    response = Response()
    lifecycle = BUCKET_LIFECYCLE.get(bucket_name)
    if not lifecycle:
        # TODO: check if bucket exists, otherwise return 404-like error
        lifecycle = {
            'LifecycleConfiguration': []
        }
    body = xmltodict.unparse(lifecycle)
    response._content = body
    response.status_code = 200
    return response
项目:fnapy    作者:alexandriagroup    | 项目源码 | 文件源码
def dict2xml(_dict):
    """Returns a XML string from the input dictionary"""
    xml = xmltodict.unparse(_dict, pretty=True)
    xmlepured = remove_namespace(xml)
    return xmlepured
项目:lago    作者:lago-project    | 项目源码 | 文件源码
def dict_to_xml(spec, full_document=False):
    """
    Convert dict to XML

    Args:
        spec(dict): dict to convert
        full_document(bool): whether to add XML headers

    Returns:
        lxml.etree.Element: XML tree
    """

    middle = xmltodict.unparse(spec, full_document=full_document, pretty=True)
    return lxml.etree.fromstring(middle)
项目:localstack    作者:atlassian    | 项目源码 | 文件源码
def get_cors(bucket_name):
    response = Response()
    cors = BUCKET_CORS.get(bucket_name)
    if not cors:
        # TODO: check if bucket exists, otherwise return 404-like error
        cors = {
            'CORSConfiguration': []
        }
    body = xmltodict.unparse(cors)
    response._content = body
    response.status_code = 200
    return response