The following 50 code examples, extracted from open-source Python projects, illustrate how to use lxml.etree.fromstring().
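Before the project examples below, here is a minimal, self-contained sketch of the pattern they all share: parse XML from bytes (or a string) into an Element with etree.fromstring(), then navigate it with find()/findall() or XPath. The sample document and tag names here are hypothetical, chosen only for illustration.

from lxml import etree

# Hypothetical sample document; fromstring() accepts bytes or str.
# Note: a str carrying an XML encoding declaration raises ValueError,
# so raw server responses are usually parsed as bytes, as in several
# of the examples below.
data = b'<root><item name="a">1</item><item name="b">2</item></root>'
root = etree.fromstring(data)

# find() returns the first match, findall() returns all matches
first = root.find('item')
print(first.get('name'), first.text)   # a 1

# XPath queries work on any element
print(root.xpath('//item/@name'))      # ['a', 'b']

Most of the examples that follow apply exactly this pattern to real payloads: NETCONF replies, WS-Man SOAP envelopes, SAML pages, MediaInfo output, NITF documents, and test fixtures.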
def test_update_id(self):
    """Check use of family_id on update

    when family id is different from item_id (i.e. on updated item),
    family_id should be used for doc-id and ntbid
    """
    article = copy.deepcopy(self.article)
    family_id = "test_family_id"
    article['family_id'] = family_id
    article['rewrite_sequence'] = 3
    formatter_output = self.formatter.format(article, {'name': 'Test NTBNITF'})
    doc = formatter_output[0]['encoded_item']
    nitf_xml = etree.fromstring(doc)
    head = nitf_xml.find('head')
    ntb_id = head.find('meta[@name="NTBID"]')
    self.assertEqual(ntb_id.get('content'), 'NTB' + family_id)
    doc_id = nitf_xml.find('head/docdata/doc-id')
    self.assertEqual(doc_id.get('regsrc'), 'NTB')
    self.assertEqual(doc_id.get('id-string'), 'NTB{}_{:02}'.format(family_id, 3))

def test_body_none(self):
    article = copy.deepcopy(self.article)
    article['body_html'] = None
    formatter_output = self.formatter.format(article, {'name': 'Test NTBNITF'})
    # the test will raise an exception during self.formatter.format
    # if SDNTB-420 bug is still present,
    # but we also check that body.content is there
    doc = formatter_output[0]['encoded_item']
    nitf_xml = etree.fromstring(doc)
    expected = ("""
        <body.content>
            <p class="lead" lede="true">This is the abstract</p>
            <p class="txt">footer text</p>
            <media media-type="image" class="illustrasjonsbilde">
                <media-reference mime-type="image/jpeg" source="test_id"/>
                <media-caption>test feature media</media-caption>
            </media>
        </body.content>""").replace('\n', '').replace(' ', '')
    content = etree.tostring(
        nitf_xml.find('body/body.content'),
        encoding="unicode").replace('\n', '').replace(' ', '')
    self.assertEqual(content, expected)

def _get_remote_md5(self):
    """Return the md5 sum of the remote file, if it exists."""
    E = action_element_maker()
    top = E.top(
        E.FileSystem(
            E.Files(
                E.File(
                    E.SrcName(self.dst),
                    E.Operations(
                        E.md5sum()
                    )
                )
            )
        )
    )
    nc_get_reply = self.device.action(top)
    reply_ele = etree.fromstring(nc_get_reply.xml)
    md5sum = find_in_action('md5sum', reply_ele)
    if md5sum is not None:
        return md5sum.text.strip()

def create_remote_dir(self):
    """Create the remote directory.

    Raises:
        FileCreateDirectoryError: if the directory could not be created.
    """
    E = action_element_maker()
    top = E.top(
        E.FileSystem(
            E.Files(
                E.File(
                    E.SrcName(self._remote_dir.strip('/')),
                    E.Operations(
                        E.MkDir()
                    )
                )
            )
        )
    )
    nc_get_reply = self.device.action(top)
    reply_ele = etree.fromstring(nc_get_reply.xml)
    self.remote_dir_exists = True

def __init__(self, to_url, action_ns_prefix, action, resource_uri,
             additional_namespaces=None):
    self._nsmap = copy.deepcopy(NS)
    if additional_namespaces:
        self._nsmap.update(additional_namespaces)

    # NS shortcuts
    self._action_ns_prefix = action_ns_prefix
    self._resource_uri = resource_uri

    # Use a WSMAN SOAP Template to save on the boiler plate
    self._root = etree.fromstring(self.ENVELOPE_TEMPLATE)

    # Update the To
    self._set_text("/s:Envelope/s:Header/wsa:To", to_url)

    # Set the action
    action_uri = "{}/{}".format(self._nsmap[action_ns_prefix], action)
    self._set_text("/s:Envelope/s:Header/wsa:Action", action_uri)

    # Set the Resource URI
    self._set_text("/s:Envelope/s:Header/wsman:ResourceURI", resource_uri)

def _download(self):
    """
    Downloads the data from the WFS.

    :return: Top element of the parsed XML document.
    :rtype: xml.etree.ElementTree
    """
    logger.info('Getting data from the server.')
    try:
        wfs = WebFeatureService(
            url='https://kartta.hel.fi/ws/geoserver/avoindata/wfs',
            version='2.0.0',
        )
        response = wfs.getfeature(
            typename='avoindata:liikennemerkkipilotti_pysakointipaikat',
        )
        return etree.fromstring(bytes(response.getvalue(), 'UTF-8'))
    except Exception:
        logger.error('Unable to get data from the server.', exc_info=True)

def fields_view_get(self, view_id=None, view_type='form',
                    toolbar=False, submenu=False):
    """For configurable products switch the name field with the config_name
    so as to keep the view intact in whatever form it is at the moment of
    execution and not duplicate the original just for the sole purpose of
    displaying the proper name"""
    res = super(ProductProduct, self).fields_view_get(
        view_id=view_id, view_type=view_type,
        toolbar=toolbar, submenu=submenu
    )
    if self.env.context.get('default_config_ok'):
        xml_view = etree.fromstring(res['arch'])
        xml_name = xml_view.xpath("//field[@name='name']")
        xml_label = xml_view.xpath("//label[@for='name']")
        if xml_name:
            xml_name[0].attrib['name'] = 'config_name'
        if xml_label:
            xml_label[0].attrib['for'] = 'config_name'
        view_obj = self.env['ir.ui.view']
        xarch, xfields = view_obj.postprocess_and_fields(
            self._name, xml_view, view_id)
        res['arch'] = xarch
        res['fields'] = xfields
    return res

async def get_user_data(self, user: str) -> UserInfo:
    """
    :param user: username whose information we're getting
    :return: a UserInfo object for that user
    """
    with aiohttp.ClientSession(auth=self._auth,
                               headers={"User-Agent": self.user_agent}) as session:
        async with session.get(MAL_APP_INFO, params={"u": user}) as response:
            # Raise an error if we get the wrong response code
            if response.status != 200:
                raise ResponseError(response.status)
            response_data = await response.read()
            # We want the [0] index as myanimelist always returns the user data first
            user_info = etree.fromstring(response_data)[0]
            # Build and return the UserInfo object from the XML fields
            return UserInfo(
                id=user_info.find("user_id").text,
                username=user_info.find("user_name").text,
                watching=user_info.find("user_watching").text,
                completed=user_info.find("user_completed").text,
                on_hold=user_info.find("user_onhold").text,
                dropped=user_info.find("user_dropped").text,
                plan_to_watch=user_info.find("user_plantowatch").text,
                days_spent_watching=user_info.find("user_days_spent_watching").text
            )

def getAudioMetadata(fileRef):
    args = [config.mediaInfoExe]
    args.append("--Output=EBUCore")
    args.append(fileRef)

    # Command line as string (used for logging purposes only)
    cmdStr = " ".join(args)

    status, out, err = shared.launchSubProcess(args)

    # Configure XML parser to get rid of blank lines in MediaInfo output
    parser = etree.XMLParser(remove_blank_text=True)

    # Parse string to element
    #outElt = etree.fromstring(out.encode('utf-8'))
    outElt = etree.XML(out.encode('utf-8'), parser=parser)

    # Main results to dictionary
    dictOut = {}
    dictOut["cmdStr"] = cmdStr
    dictOut["status"] = status
    dictOut["outElt"] = outElt
    dictOut["stderr"] = err

    return dictOut

def _le_xml(self, arquivo):
    if arquivo is None:
        return False

    if not isinstance(arquivo, basestring):
        arquivo = etree.tounicode(arquivo)

    if arquivo is not None:
        if isinstance(arquivo, basestring):
            if NAMESPACE_NFSE in arquivo:
                arquivo = por_acentos(arquivo)
            if u'<' in arquivo:
                self._xml = etree.fromstring(tira_abertura(arquivo))
            else:
                arq = open(arquivo)
                txt = ''.join(arq.readlines())
                txt = tira_abertura(txt)
                arq.close()
                self._xml = etree.fromstring(txt)
        else:
            self._xml = etree.parse(arquivo)
        return True

    return False

def validar(self):
    arquivo_esquema = self.caminho_esquema + self.arquivo_esquema

    # It is important to remove the encoding declaration here
    # to avoid unicode-to-ascii conversion errors
    xml = tira_abertura(self.xml).encode(u'utf-8')

    esquema = etree.XMLSchema(etree.parse(arquivo_esquema))

    if not esquema.validate(etree.fromstring(xml)):
        for e in esquema.error_log:
            if e.level == 1:
                self.alertas.append(e.message.replace('{http://www.portalfiscal.inf.br/nfe}', ''))
            elif e.level == 2:
                self.erros.append(e.message.replace('{http://www.portalfiscal.inf.br/nfe}', ''))

    return esquema.error_log

def parse_rsc_html(htmlstring):
    """Messy RSC HTML needs this special parser to fix problems before creating selector."""
    converted = UnicodeDammit(htmlstring)
    if not converted.unicode_markup:
        raise UnicodeDecodeError('Failed to detect encoding, tried [%s]')
    root = fromstring(htmlstring, parser=HTMLParser(recover=True, encoding=converted.original_encoding))
    # Add p.otherpara tags around orphan text
    newp = None
    for child in root.get_element_by_id('wrapper'):
        if newp is not None:
            if child.tag in BLOCK_ELEMENTS or child.get('id', '').startswith('sect') or child.getnext() is None:
                child.addprevious(newp)
                newp = None
            else:
                newp.append(child)
        if newp is None and child.tag in BLOCK_ELEMENTS and child.tail and child.tail.strip():
            newp = Element('p', **{'class': 'otherpara'})
            newp.text = child.tail
            child.tail = ''
    return root

def clean_markup(self, markup, parser=None):
    """Apply ``Cleaner`` to markup string or document and return a cleaned string or document."""
    result_type = type(markup)
    if isinstance(markup, six.string_types):
        doc = fromstring(markup, parser=parser)
    else:
        doc = copy.deepcopy(markup)
    self(doc)
    if issubclass(result_type, six.binary_type):
        return tostring(doc, encoding='utf-8')
    elif issubclass(result_type, six.text_type):
        return tostring(doc, encoding='unicode')
    else:
        return doc


#: A default Cleaner instance, which kills comments, processing instructions, script tags, style tags.

def getTagsFromFile(text):
    '''get statistics on general tags and properties tags'''
    tree = etree.fromstring(text)
    #root = tree.getroot()
    gentags = {}
    proptags = {}
    for item in tree.xpath('//*'):
        proptag = ''
        if getTagString(item) in propertysTagList:
            for child in item.getchildren():
                childstr = getTagString(child)
                if childstr != '':
                    proptag = ptag(item) + "-" + childstr + getAttrString(child)
                    if proptag != '':
                        if proptag in proptags:
                            proptags[proptag] += 1
                        else:
                            proptags[proptag] = 1
        gentag = ptag(item)
        if gentag != '':
            if gentag in gentags:
                gentags[gentag] += 1
            else:
                gentags[gentag] = 1
    return gentags, proptags

def del_tags_from_xml(xml, tag_list=[]):
    """
    Deletes the tags either by their names or xpath

    Arguments:
        1. xml: xml file path or xml string as input
        2. tag_list: list of tags which need to be removed
    Returns:
        xml string
    """
    if os.path.exists(xml):
        tree = ElementTree.parse(xml)
        root = tree.getroot()
    else:
        root = ElementTree.fromstring(xml)
    for tag in tag_list:
        if 'xpath=' in tag:
            tag = tag.strip('xpath=')
            req_tags = getChildElementsListWithSpecificXpath(root, tag)
        else:
            req_tags = getChildElementsListWithSpecificXpath(root, ".//{0}".format(tag))
        recursive_delete_among_children(root, req_tags)
    xml_string = ElementTree.tostring(root, encoding='utf-8', method='xml')
    return xml_string

def pull(self, resource_uri, context, max_elems=100):
    """Executes pull operation over WSMan.

    :param resource_uri: URI of resource to pull
    :param context: enumeration context
    :param max_elems: maximum number of elements returned by the operation
    :returns: an lxml.etree.Element object of the response received
    :raises: WSManRequestFailure on request failures
    :raises: WSManInvalidResponse when receiving invalid response
    """
    payload = _PullPayload(self.endpoint, resource_uri, context, max_elems)
    resp = self._do_request(payload)
    resp_xml = ElementTree.fromstring(resp.content)
    return resp_xml

def invoke(self, resource_uri, method, selectors, properties):
    """Executes invoke operation over WSMan.

    :param resource_uri: URI of resource to invoke
    :param method: name of the method to invoke
    :param selectors: dict of selectors
    :param properties: dict of properties
    :returns: an lxml.etree.Element object of the response received.
    :raises: WSManRequestFailure on request failures
    :raises: WSManInvalidResponse when receiving invalid response
    """
    payload = _InvokePayload(self.endpoint, resource_uri, method,
                             selectors, properties)
    resp = self._do_request(payload)
    resp_xml = ElementTree.fromstring(resp.content)
    return resp_xml

def extract_svg_content(filename):
    prefix = unique_prefix() + "_"
    root = etree.parse(filename).getroot()
    # We have to ensure all Ids in SVG are unique. Let's make it nasty by
    # collecting all ids and doing search & replace
    # Potentially dangerous (can break user text)
    ids = []
    for el in root.getiterator():
        if "id" in el.attrib and el.attrib["id"] != "origin":
            ids.append(el.attrib["id"])
    with open(filename) as f:
        content = f.read()
    for i in ids:
        content = content.replace("#" + i, "#" + prefix + i)
    root = etree.fromstring(content)
    # Remove SVG namespace to ease our lives and change ids
    for el in root.getiterator():
        if "id" in el.attrib and el.attrib["id"] != "origin":
            el.attrib["id"] = prefix + el.attrib["id"]
        if '}' in str(el.tag):
            el.tag = el.tag.split('}', 1)[1]
    return [x for x in root if x.tag and x.tag not in ["title", "desc"]]

def get_core_last_version(self, url, version_core):
    last_version_core = ""
    major = version_core.split(".")[0]
    url_release = url + major + ".x"
    try:
        response = requests.get(url_release)
        response.raise_for_status()
        if response.status_code == 200:
            tree = etree.fromstring(response.content)
            last_version_core = tree.xpath("/project/releases/release/tag")[0].text
            log.print_cms("info", "[+] Last CMS version: " + last_version_core, "", 0)
            self.core_details["infos"]["last_version"] = last_version_core
    except requests.exceptions.HTTPError as e:
        msg = "Unable to retrieve last wordpress version. Search manually !"
        log.print_cms("alert", "[-] " + msg, "", 1)
        return "", e
    return last_version_core, None

def sign_request(self):
    '''
    Calculates the signature to the header of the SOAP request which can
    be used by the STS to verify that the SOAP message originated from a
    trusted service.
    '''
    base_xml = etree.fromstring(self._xml_text)
    request_tree = _extract_element(
        base_xml, 'Body',
        {'SOAP-ENV': "http://schemas.xmlsoap.org/soap/envelope/"})
    request = _canonicalize(etree.tostring(request_tree))
    request_tree = _extract_element(
        base_xml, 'Timestamp',
        {'ns3': "http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-utility-1.0.xsd"})
    timestamp = _canonicalize(etree.tostring(request_tree))
    self._request_digest = _make_hash(request.encode(UTF_8)).decode(UTF_8)  # pylint: disable=W0612
    self._timestamp_digest = _make_hash(timestamp.encode(UTF_8)).decode(UTF_8)  # pylint: disable=W0612
    self._algorithm = SHA256
    self._signed_info = _canonicalize(SIGNED_INFO_TEMPLATE % self.__dict__)
    self._signature_value = _sign(self._private_key, self._signed_info).decode(UTF_8)
    self._signature_text = _canonicalize(SIGNATURE_TEMPLATE % self.__dict__)
    self.embed_signature()

def _canonicalize(xml_string):
    '''
    Given an xml string, canonicalize the string per
    U{http://www.w3.org/2001/10/xml-exc-c14n#}

    @type xml_string: C{str}
    @param xml_string: The XML string that needs to be canonicalized.

    @rtype: C{str}
    @return: Canonicalized string in Unicode.
    '''
    parser = etree.XMLParser(remove_blank_text=True)
    tree = etree.fromstring(xml_string, parser=parser).getroottree()
    string = BytesIO()
    tree.write_c14n(string, exclusive=True, with_comments=False)
    return string.getvalue().decode(UTF_8)

def get_ntp_servers(self):
    ntp_servers = {}
    rpc_command = '<Get><Configuration><NTP></NTP></Configuration></Get>'
    result_tree = ETREE.fromstring(self.device.make_rpc_call(rpc_command))
    for version in ['IPV4', 'IPV6']:
        xpath = './/Peer{version}Table/Peer{version}'.format(version=version)
        for peer in result_tree.xpath(xpath):
            peer_type = napalm_base.helpers.find_txt(
                peer, 'PeerType{version}/Naming/PeerType'.format(version=version))
            if peer_type != 'Server':
                continue
            server_address = napalm_base.helpers.find_txt(
                peer, 'Naming/Address{version}'.format(version=version))
            if not server_address:
                continue
            ntp_servers[server_address] = {}
    return ntp_servers

def get_account_names(saml_assertion):
    saml_url = "https://signin.aws.amazon.com:443/saml"
    headers = {
        'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8'
    }
    response = requests.post(saml_url, headers=headers, data={
        'SAMLResponse': saml_assertion.assertion
    })
    response.raise_for_status()
    html_response = ET.fromstring(response.text, ET.HTMLParser())
    account_names = {}
    for element in html_response.findall('.//div[@class="saml-account-name"]'):
        account_id = element.text.split(' ')[2].replace('(', '').replace(')', '')
        account_name = element.text.split(' ')[1]
        account_names[account_id] = account_name
    return account_names

def _introspect(self, service, device_instance, path, publish=True):
    value = self._dbus_conn.call_blocking(service, path, None, 'Introspect', '', [])
    tree = etree.fromstring(value)
    nodes = tree.findall('node')
    if len(nodes) == 0:
        for iface in tree.findall('interface'):
            if iface.attrib.get('name') == 'com.victronenergy.BusItem':
                self._add_item(service, device_instance, path, publish=publish)
    else:
        for child in nodes:
            name = child.attrib.get('name')
            if name is not None:
                if path.endswith('/'):
                    p = path + name
                else:
                    p = path + '/' + name
                self._introspect(service, device_instance, p, publish=publish)

def import_gemini_object(self, gemini_string):
    '''Imports the Gemini metadata into CKAN.

    The harvest_source_reference is an ID that the harvest_source uses
    for the metadata document. It is the same ID the Coupled Resources
    use to link dataset and service records.

    Some errors raise Exceptions.
    '''
    log = logging.getLogger(__name__ + '.import')
    xml = etree.fromstring(gemini_string)

    valid, profile, errors = self._get_validator().is_valid(xml)
    if not valid:
        out = errors[0][0] + ':\n' + '\n'.join(e[0] for e in errors[1:])
        log.error('Errors found for object with GUID %s:' % self.obj.guid)
        self._save_object_error(out, self.obj, 'Import')

    unicode_gemini_string = etree.tostring(xml, encoding=unicode, pretty_print=True)

    # may raise Exception for errors
    package_dict = self.write_package_from_gemini_string(unicode_gemini_string)

def escape(self, tags):
    output = []
    for tag in tags:
        if re.match(r'<.+ >$', tag):
            try:
                etree.fromstring(tag)
                output.append(tag)
            except:
                tag = re.sub(r'(<)(.+) (>)', r'\1\n\2\n\3', tag)
                tag = self.tagger.tag_text(
                    tag, notagdns=True, notagip=True, notagurl=True,
                    notagemail=True, tagonly=True)
                tag = [html.escape(t) for t in tag]
                output += tag
        elif not re.match(r'<.+>$', tag):
            output.append(html.escape(tag))
        else:
            test = re.match(r'<rep(.+?) text="(.+)"', tag)
            if test is not None:
                output.append('<rep{} text="{}"/>'.format(test.group(1), html.escape(test.group(2))))
            else:
                if re.match(r'[<>]\t', tag):
                    output.append(html.escape(tag))
                else:
                    output.append(tag)
    return output

async def _validate_2(resp):
    """Validates for CASv2"""
    nsmap = {'cas': 'http://www.yale.edu/tp/cas'}
    text = await resp.text()
    tree = etree.fromstring(text)
    failure = tree.find('cas:authenticationFailure', nsmap)
    if failure is not None:
        # Authentication failed!
        return False
    success = tree.find('cas:authenticationSuccess', nsmap)
    if success is not None:
        attrs = {'user': tree.find('*/cas:user', nsmap).text}
        return attrs
    else:
        # Neither success nor failure?
        raise InvalidCasResponse('Neither success nor failure on login!', resp)

async def _validate_3(resp):
    """Validates for CASv3"""
    nsmap = {'cas': 'http://www.yale.edu/tp/cas'}
    text = await resp.text()
    tree = etree.fromstring(text)
    failure = tree.find('cas:authenticationFailure', nsmap)
    if failure is not None:
        # Authentication failed!
        return False
    success = tree.find('cas:authenticationSuccess', nsmap)
    if success is not None:
        attrs = process_attributes(tree)
        user = tree.find('*/cas:user', nsmap)
        attrs['user'] = user.text
        return attrs
    else:
        # Neither success nor failure?
        raise InvalidCasResponse("Neither success nor failure on login!", resp)

def populate(self):
    data = self.runner.get_command('show version | display-xml')
    xml_data = ET.fromstring(data)

    self.facts['name'] = self.parse_name(xml_data)
    self.facts['version'] = self.parse_version(xml_data)

    data = self.runner.get_command('show system | display-xml')
    xml_data = ET.fromstring(data)

    self.facts['servicetag'] = self.parse_serialnum(xml_data)
    self.facts['model'] = self.parse_model(xml_data)

    data = self.runner.get_command('show running-configuration | grep hostname')
    self.facts['hostname'] = self.parse_hostname(data)

def from_string(ujml_string, file_name="<ujml_input>", globals=None) -> UjmlNode:
    """
    Used to load in ujml code from a string.

    :param string ujml_string: String containing the ujml code.
    :param string file_name: Source code file name.
    :param dict globals: Optional dictionary containing global values
        available in the ujml local python interpreter.
    :return: Ujml root node.
    :rtype: urban_journey.UjmlNode
    """
    parser = etree.XMLParser()
    lookup = etree.ElementDefaultClassLookup(element=UjmlElement)
    parser.set_element_class_lookup(lookup)

    root_elem = etree.fromstring(ujml_string, parser)
    ujml_node = UjmlNode(root_elem, file_name, globals or {})
    return ujml_node

def from_file(file_path, globals=None):
    """
    Used to load in ujml code from a file.

    :param string file_path: Path to ujml file.
    :param dict globals: Optional dictionary containing global values
        available in the ujml local python interpreter.
    :return: Ujml root node.
    :rtype: urban_journey.UjmlNode
    """
    file_path = os.path.abspath(file_path)
    with open(file_path) as f:
        source = f.read()

    parser = etree.XMLParser()
    lookup = etree.ElementDefaultClassLookup(element=UjmlElement)
    parser.set_element_class_lookup(lookup)

    root_elem = etree.fromstring(source, parser)
    ujml_node = UjmlNode(root_elem, file_path, globals or {})
    return ujml_node

def get_xml_data(req_string, headers, data=None):
    req = urllib2.Request(req_string, headers=headers)
    html_data = _get_html_data(req, data)

    # Clean chunked data
    html_data = clean_chunked_data(html_data)
    #log_user_action(req.get_host(), 'chunked data', html_data, {})

    try:
        data = etree.fromstring(html_data)
    except XMLSyntaxError:
        # lxml cannot handle encoding declarations :(
        data = etree.HTML(html_data, etree.HTMLParser())
        # data is None when it was not XML, like 404 page without 404 code
        if data is not None:
            data = data.getroottree()
        else:
            raise urllib2.HTTPError(req_string, 404, "Not an XML", None, None)

    # TODO: check valid
    #if not data.find('.//prestashop'):
    #    raise urllib2.HTTPError(req_string, 404, "Not an XML", None, None)

    return data

def get_proper_citation(xml):
    root = etree.fromstring(xml)
    if root.findall('error'):
        proper_citation = ''
    else:
        data_elements = root.findall('data')[0]
        data_elements = [(e.find('name').text, e.find('value').text)
                         for e in data_elements]  # these shouldn't duplicate
        a = [v for n, v in data_elements if n == 'Proper Citation']
        proper_citation = a[0] if a else ''
    return proper_citation

def submit_to_h(target_uri, found, resolved, h, found_rrids, existing):
    prefix, exact, exact_for_hypothesis, suffix = found
    xml, status_code, resolver_uri = resolved
    new_tags = []
    if exact in existing:
        new_tags.append('RRIDCUR:Duplicate')
    else:
        existing.append(exact)
    if status_code < 300:
        root = etree.fromstring(xml)
        if root.findall('error'):
            s = 'Resolver lookup failed.'
            s += '<hr><p><a href="%s">resolver lookup</a></p>' % resolver_uri
            r = h.create_annotation_with_target_using_only_text_quote(
                url=target_uri, prefix=prefix,
                exact=exact_for_hypothesis, suffix=suffix,
                text=s, tags=new_tags + ['RRIDCUR:Unresolved'])
            print('ERROR, rrid unresolved')
        else:
            data_elements = root.findall('data')[0]
            s = ''
            data_elements = [(e.find('name').text, e.find('value').text)
                             for e in data_elements]  # these shouldn't duplicate
            citation = [(n, v) for n, v in data_elements if n == 'Proper Citation']
            name = [(n, v) for n, v in data_elements if n == 'Name']
            data_elements = citation + name + sorted(
                [(n, v) for n, v in data_elements
                 if (n != 'Proper Citation' or n != 'Name') and v is not None])
            for name, value in data_elements:
                if ((name == 'Reference' or name == 'Mentioned In Literature')
                        and value is not None and value.startswith('<a class')):
                    if len(value) > 500:
                        continue  # nif-0000-30467 fix keep those pubmed links short!
                s += '<p>%s: %s</p>' % (name, value)
            s += '<hr><p><a href="%s">resolver lookup</a></p>' % resolver_uri
            r = h.create_annotation_with_target_using_only_text_quote(
                url=target_uri, prefix=prefix,
                exact=exact_for_hypothesis, suffix=suffix,
                text=s, tags=new_tags + [exact])
    elif status_code >= 500:
        s = 'Resolver lookup failed due to server error.'
        s += '<hr><p><a href="%s">resolver lookup</a></p>' % resolver_uri
    else:
        s = 'Resolver lookup failed.'
        s += '<hr><p><a href="%s">resolver lookup</a></p>' % resolver_uri
        r = h.create_annotation_with_target_using_only_text_quote(
            url=target_uri, prefix=prefix,
            exact=exact_for_hypothesis, suffix=suffix,
            text=s, tags=new_tags + ['RRIDCUR:Unresolved'])
    found_rrids[exact] = r.json()['links']['incontext']
    return r

def create_tf_record(output_filename,
                     label_map_dict,
                     annotations_dir,
                     image_dir,
                     examples):
    """Creates a TFRecord file from examples.

    Args:
      output_filename: Path to where output file is saved.
      label_map_dict: The label map dictionary.
      annotations_dir: Directory where annotation files are stored.
      image_dir: Directory where image files are stored.
      examples: Examples to parse and save to tf record.
    """
    writer = tf.python_io.TFRecordWriter(output_filename)
    for idx, example in enumerate(examples):
        if idx % 100 == 0:
            logging.info('On image %d of %d', idx, len(examples))
        path = os.path.join(annotations_dir, 'xmls', example + '.xml')

        if not os.path.exists(path):
            logging.warning('Could not find %s, ignoring example.', path)
            continue
        with tf.gfile.GFile(path, 'r') as fid:
            xml_str = fid.read()
        xml = etree.fromstring(xml_str)
        data = dataset_util.recursive_parse_xml_to_dict(xml)['annotation']

        tf_example = dict_to_tf_example(data, label_map_dict, image_dir)
        writer.write(tf_example.SerializeToString())

    writer.close()


# TODO: Add test for pet/PASCAL main files.

def test_re_encode(self):
    """Tests that re-encoding objects works properly."""
    for (om, xml) in object_examples:
        omx = encode_xml(decode_xml(encode_xml(om)))
        xn = etree.fromstring(xml)
        self.assertTrue(elements_equal(omx, xn),
                        'encode(decode(encode(om))) === xml')

def test_re_decode(self):
    """Tests that re-decoding objects works properly."""
    for (om, xml) in object_examples:
        xn = decode_xml(encode_xml(decode_xml(etree.fromstring(xml))))
        self.assertEqual(om, xn, 'decode(encode(decode(xml))) === om')

def test_example(self):
    """Tests the decoder based on an example."""
    # try to parse the xml
    with open(os.path.join(os.path.dirname(__file__), 'example.om')) as f:
        xmlnode = etree.fromstring(f.read())
    omnode = decode_xml(xmlnode)

    # and check that they are as expected
    self.assertEqual(omnode, expected, "Decoding an OpenMath object")

def test_example(self):
    """Tests the encoder based on an example."""
    with open(os.path.join(os.path.dirname(__file__), 'example.om')) as f:
        xmlnode = etree.fromstring(f.read())
    encoded = encode_xml(expected, 'om')
    print(etree.tostring(encoded, pretty_print=True).decode())

    # and check that they are as expected
    self.assertTrue(elements_equal(encoded, xmlnode),
                    "Encoding an OpenMath object")

def setUp(self):
    super(TestCase, self).setUp()
    article_legacy = ARTICLE.copy()
    article_legacy['anpa_category'] = [{'name': 'service1'},
                                       {'name': 'service2'},
                                       {'name': 'service3'}]
    self.formatter = NTBNITFLegacyFormatter()
    self.base_formatter = Formatter()
    init_app(self.app)
    self.tz = pytz.timezone(self.app.config['DEFAULT_TIMEZONE'])
    if self.article is None:
        # formatting is done once for all tests to save time
        # as long as used attributes are not modified, it's fine
        self.article = article_legacy
        self.formatter_output = self.formatter.format(self.article, {'name': 'Test NTBNITF'})
        self.docs = [formatter['encoded_item'] for formatter in self.formatter_output]
        self.nitf_xmls = [etree.fromstring(doc) for doc in self.docs]
        self.nitf_xml = self.nitf_xmls[0]

def setUp(self):
    super().setUp()
    self.formatter = NTBNITFFormatter()
    self.base_formatter = Formatter()
    init_app(self.app)
    self.tz = pytz.timezone(self.app.config['DEFAULT_TIMEZONE'])
    if self.article is None:
        # formatting is done once for all tests to save time
        # as long as used attributes are not modified, it's fine
        self.article = ARTICLE
        self.formatter_output = self.formatter.format(self.article, {'name': 'Test NTBNITF'})
        self.doc = self.formatter_output[0]['encoded_item']
        self.nitf_xml = etree.fromstring(self.doc)

def test_empty_dateline(self):
    """SDNTB-293 regression test"""
    article = copy.deepcopy(self.article)
    article['dateline'] = {'located': None}
    formatter_output = self.formatter.format(article, {'name': 'Test NTBNITF'})
    doc = formatter_output[0]['encoded_item']
    nitf_xml = etree.fromstring(doc)
    self.assertEqual(nitf_xml.find('body/body.head/dateline'), None)

def test_prefix_cleaning(self):
    """SDNTB-313 regression test"""
    article = copy.deepcopy(self.article)
    article['abstract'] = ''
    del article['associations']
    article['body_html'] = "<pref:h1><other_pref:body.content><t:t/>toto</other_pref:body.content></pref:h1>"
    expected = (b'<body.content><p class="lead" lede="true" />toto<p class="txt">'
                b'footer text</p></body.content>')
    formatter_output = self.formatter.format(article, {'name': 'Test NTBNITF'})
    doc = formatter_output[0]['encoded_item']
    nitf_xml = etree.fromstring(doc)
    body_content = nitf_xml.find("body/body.content")
    self.assertEqual(b''.join(etree.tostring(body_content).split()),
                     b''.join(expected.split()))

def test_single_counter(self):
    """SDNTB-338 regression test"""
    # media counter should appear once and only once when no image is present
    article = copy.deepcopy(self.article)
    article['body_html'] = "<p/>"
    del article['associations']
    formatter_output = self.formatter.format(article, {'name': 'Test NTBNITF'})
    doc = formatter_output[0]['encoded_item']
    nitf_xml = etree.fromstring(doc)
    head = nitf_xml.find('head')
    media_counters = head.findall('meta[@name="NTBBilderAntall"]')
    self.assertEqual(len(media_counters), 1)
    self.assertEqual(media_counters[0].get('content'), '0')

def test_355(self):
    """SDNTB-355 regression test

    formatter should not crash when featuremedia is None
    """
    article = copy.deepcopy(self.article)
    article['associations']['featuremedia'] = None
    formatter_output = self.formatter.format(article, {'name': 'Test NTBNITF'})
    doc = formatter_output[0]['encoded_item']
    nitf_xml = etree.fromstring(doc)
    # the test will raise an exception during self.formatter.format
    # if SDNTB-355 bug is still present,
    # but we check in addition that media counter is as expected
    media_counter = nitf_xml.find('head').find('meta[@name="NTBBilderAntall"]')
    self.assertEqual(media_counter.get('content'), '2')

def test_358(self):
    """SDNTB-358 regression test

    invalid characters should be stripped
    """
    article = copy.deepcopy(self.article)
    bad_char_txt = "SKJÆ\x12R I SJØEN: Kirken Gospa od Skrpjela"
    article['associations']['embedded10005446043']["description_text"] = bad_char_txt
    article['body_html'] += bad_char_txt
    # formatting in next line will fail with body_html if invalid chars are not stripped
    formatter_output = self.formatter.format(article, {'name': 'Test NTBNITF'})
    doc = formatter_output[0]['encoded_item']
    # next line will fail if SDNTB-358 is still present
    etree.fromstring(doc)

def test_388(self):
    """SDNTB-388 regression test

    check that a <br/> between 2 words is not resulting in the 2 words being merged
    """
    article = copy.deepcopy(self.article)
    article['abstract'] = ''
    del article['associations']
    del article['body_footer']
    article['body_html'] = "<p>word1<br/>word2</p>"
    formatter_output = self.formatter.format(article, {'name': 'Test NTBNITF'})
    doc = formatter_output[0]['encoded_item']
    nitf_xml = etree.fromstring(doc)
    p_content = nitf_xml.find("body/body.content/p[@class='txt-ind']").text
    # there must be a space between the two words
    self.assertEqual(p_content, "word1 word2")

def test_390(self):
    """SDNTB-390 regression test

    formatter should not crash when an embedded is None
    """
    article = copy.deepcopy(self.article)
    article['associations']['embedded18237840351'] = None
    formatter_output = self.formatter.format(article, {'name': 'Test NTBNITF'})
    doc = formatter_output[0]['encoded_item']
    nitf_xml = etree.fromstring(doc)
    media_counter = nitf_xml.find('head').find('meta[@name="NTBBilderAntall"]')
    # the test will raise an exception during self.formatter.format
    # if SDNTB-390 bug is still present, but we check in addition
    # that media counter is as expected (same as for test_355)
    self.assertEqual(media_counter.get('content'), '2')

def test_rewrite_sequence_none(self):
    article = copy.deepcopy(self.article)
    article['rewrite_sequence'] = None
    formatter_output = self.formatter.format(article, {'name': 'Test NTBNITF'})
    doc = formatter_output[0]['encoded_item']
    nitf_xml = etree.fromstring(doc)
    doc_id = nitf_xml.find('head/docdata/doc-id')
    self.assertEqual(doc_id.get('id-string'),
                     'NTB{}_{:02}'.format(article['family_id'], 0))

def testEntityReplacement(self):
    doc = """<!DOCTYPE html SYSTEM "about:legacy-compat"><html>&beta;</html>"""
    tree = etree.fromstring(doc, parser=self.parser).getroottree()
    result = serializer.serialize(tree, tree="lxml", omit_optional_tags=False)
    self.assertEquals(
        u"""<!DOCTYPE html SYSTEM "about:legacy-compat"><html>\u03B2</html>""",
        result)