我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 lxml.etree._Element()。
def extract_value(self, html):
    """
    Extract this field's value from a parsed node.

    ``self.css_select`` is used when it is set, otherwise
    ``self.xpath_select``. When the selector yields a list holding exactly
    one ``etree._Element``, the stripped text of every text node below that
    element is concatenated and returned as a string. Any other selector
    result is returned unchanged.

    :param html: object exposing ``cssselect()`` / ``xpath()``
    :return: concatenated text, or the raw selector result
    :raises ValueError: if neither selector is configured
    """
    if self.css_select:
        value = html.cssselect(self.css_select)
    elif self.xpath_select:
        value = html.xpath(self.xpath_select)
    else:
        raise ValueError('%s field: css_select or xpath_select is expected' % self.__class__.__name__)
    if isinstance(value, list) and len(value) == 1:
        if isinstance(value[0], etree._Element):
            # join once instead of growing a string with += per text node
            value = ''.join(node.strip() for node in value[0].itertext())
    return value
def xpathOne(self,xpath):
    '''
    :param xpath: <class str|xpath expression>
    :function: evaluate the expression and wrap its first match
    :return: <class Parser>; wraps an empty string when the evaluation
             raises or matches nothing
    '''
    def wrap(payload):
        # Every Parser built here shares this instance's settings.
        return Parser(data=payload,url=self.url,logFile=self.logFile,color=self.color,debug=self.debug)

    try:
        matches = self._html.xpath(xpath)
    except Exception as e:
        printText("[Error]parser.py Parser xpathOne:%s %s %s"%(e,xpath,self.url),logFile=self.logFile,color=self.color,debug=self.debug)
        return wrap('')

    if not len(matches) > 0:
        printText("[WARING]parser.py Parser xpathOne parse None:%s %s"%(xpath,self.url),logFile=self.logFile,color=self.color,debug=self.debug)
        return wrap('')

    first = matches[0]
    if isinstance(first,etree._Element):
        # Elements are serialised to HTML text; other results pass through.
        return wrap(etree.tostring(first,encoding="unicode",method="html"))
    return wrap(first)
def xpathAll(self,xpath):
    '''
    :param xpath: <class str|xpath expression>
    :function: evaluate the expression and wrap every match
    :return: [<class Parser>,<class Parser>...]; an empty list when the
             evaluation raises or matches nothing
    '''
    def wrap(match):
        # Elements are serialised to HTML text; other results pass through.
        if isinstance(match,etree._Element):
            payload = etree.tostring(match,encoding="unicode",method="html")
        else:
            payload = match
        return Parser(data=payload,url=self.url,logFile=self.logFile,color=self.color,debug=self.debug)

    try:
        matches = self._html.xpath(xpath)
    except Exception as e:
        printText("[Error]parser.py Parser xpathAll:%s %s %s"%(e,xpath,self.url),logFile=self.logFile,color=self.color,debug=self.debug)
        return []

    if len(matches)>0:
        return [wrap(match) for match in matches]

    printText("[WARING]parser.py Parser xpathAll parse None:%s %s"%(xpath,self.url),logFile=self.logFile,color=self.color,debug=self.debug)
    return []
def _get_root(self, value):
    """Turn ``value`` into a root node and return ``(root, root_text)``.

    Strings are wrapped in a ``<root>`` tag and parsed with ``self.parser``,
    elements are copied with ``self._copy``, and PyQuery objects are used
    as they are. ``root_text`` is ``root.text`` when that attribute exists
    and is a string, otherwise ``''``.

    :raises TypeError: for any other kind of value
    """
    if isinstance(value, basestring):
        wrapped = unicode('<root>') + value + unicode('</root>')
        root = fromstring(wrapped, self.parser)[0]
    elif isinstance(value, etree._Element):
        root = self._copy(value)
    elif isinstance(value, PyQuery):
        root = value
    else:
        raise TypeError(
            'Value must be string, PyQuery or Element. Got %r' % value)

    has_string_text = hasattr(root, 'text') and isinstance(root.text, basestring)
    root_text = root.text if has_string_text else ''
    return root, root_text
def __init__(self, name=None, element=None, icon=None):
    """Wrap an existing ``Group`` element or build a new one.

    When ``element`` is None, a fresh ``Group`` element is created and
    given a UUID child, a name child and, if ``icon`` is truthy, an icon
    child (appended in that order). The resulting element must be one of
    the accepted lxml element types and carry the tag ``Group``.
    """
    if element is None:
        element = Element('Group')
        name_el = xmlfactory.create_name_element(name)
        uuid_el = xmlfactory.create_uuid_element()
        element.append(uuid_el)
        element.append(name_el)
        if icon:
            element.append(xmlfactory.create_icon_element(icon))

    accepted_types = [_Element, Element, ObjectifiedElement]
    assert type(element) in accepted_types, \
        'The provided element is not an LXML Element, but {}'.format(
            type(element)
        )
    assert element.tag == 'Group', \
        'The provided element is not a Group element, but a {}'.format(element.tag)

    self._element = element
def merge_nodes(src: etree._Element, dst: etree._Element):
    """
    Merge ``src`` and all of its subelements into ``dst``.

    Two nodes count as the same node, and are merged recursively, when
    their fully qualified names are identical. Children of ``src`` that
    were not merged into a child of ``dst`` are appended to ``dst`` as deep
    copies. Other matching and merging strategies will be added as needed.
    """
    def find_twin(parent: etree._Element, wanted: etree.QName):
        # First child of ``parent`` with the wanted qualified name, else None.
        candidates = (
            candidate
            for candidate in parent.iterchildren(wanted.text)
            if etree.QName(candidate).text == wanted.text
        )
        return next(candidates, None)

    already_merged = set()
    for dst_child in dst.iterchildren():
        counterpart = find_twin(src, etree.QName(dst_child))
        if counterpart is None:
            continue
        merge_nodes(counterpart, dst_child)
        already_merged.add(counterpart)

    for src_child in src.iterchildren():
        if src_child not in already_merged:
            dst.append(deepcopy(src_child))
def __call__(self, transformation_root: etree._Element, copy: bool = None, **context: AnyType) -> AnyType:
    """Run every step of this transformation on ``transformation_root``.

    ``copy`` falls back to ``self.config.copy`` when it is None. Steps run
    in order; a step raising ``AbortTransformation`` stops the remaining
    ones. The return value is looked up via ``self.config.result_object``
    when that is set, otherwise it is None. The lookup happens before the
    transformation is finalized.
    """
    if copy is None:
        copy = self.config.copy
    self._init_transformation(transformation_root, copy, context)

    for step in self.steps:
        if hasattr(step, 'name'):
            label = step.name
        else:
            label = step.__name__
        dbg("Processing rule '{}'.".format(label))

        self.states.current_step = step
        try:
            runner = self._apply_rule if isinstance(step, Rule) else self._apply_handlers
            runner(step)
        except AbortTransformation:
            dbg("Aborting due to 'AbortTransformation'.")
            break

    result = None
    if self.config.result_object:
        result = dot_lookup(self, self.config.result_object)
    self._finalize_transformation()
    return result
def xpathone(self, xpath):
    """
    :param xpath: xpath expression
    :function: evaluate the expression and wrap its first match
    :return: class Parser; wraps an empty string when the evaluation
             raises or matches nothing
    """
    try:
        matches = self.__html.xpath(xpath)
    except Exception as e:
        Logger.error("%s [-] <xpath> %s <url> %s" % (e, xpath, self.url))
        return Parser(data='', url=self.url)

    if not len(matches) > 0:
        Logger.warning("The parsing result is None [-] <xpath> %s <url> %s" % (xpath, self.url))
        return Parser(data='', url=self.url)

    first = matches[0]
    if isinstance(first, etree._Element):
        # Elements are serialised to HTML text; other results pass through.
        serialised = etree.tostring(first, encoding="unicode", method="html")
        return Parser(data=serialised, url=self.url)
    return Parser(data=first, url=self.url)
def xpathall(self, xpath):
    """
    :param xpath: xpath expression
    :function: evaluate the expression and wrap every match
    :return: [class Parser, class Parser, ...]; an empty list when the
             evaluation raises or matches nothing
    """
    def wrap(match):
        # Elements are serialised to HTML text; other results pass through.
        if isinstance(match, etree._Element):
            return Parser(data=etree.tostring(match, encoding="unicode", method="html"), url=self.url)
        return Parser(data=match, url=self.url)

    try:
        matches = self.__html.xpath(xpath)
    except Exception as e:
        Logger.error("%s [-] <xpath> %s <url> %s" % (e, xpath, self.url))
        return []

    if len(matches) > 0:
        return [wrap(match) for match in matches]

    Logger.warning("The parsing result is None [-] <xpath> %s <url> %s" % (xpath, self.url))
    return []
def Query(self, uri):
    """Run a query by delegating to ``self.Get`` and return its result.

    Args:
      uri: A string representing the URI of the feed that is to be queried.

    Returns:
      Whatever ``self.Get(uri)`` returns. The original documentation
      describes it as a tuple:
        on success (boolean succeeded=True, ElementTree._Element result)
        on failure (boolean succeeded=False, {'status': HTTP status code
                    from server, 'reason': HTTP reason from the server,
                    'body': HTTP body of the server's response})
    """
    return self.Get(uri)
def _get_elements_by_xpath(filter_, data, expression):
    """Yield, as text, each result of evaluating ``expression`` on ``data``.

    ``data`` is parsed with a recovering UTF-8 HTML parser. Element results
    are serialised with ``etree.tostring``; other results are converted
    with ``str``. Byte strings are decoded as UTF-8 before being yielded.

    :raises common.FilterError: when lxml cannot be imported
    """
    try:
        from lxml import etree
    except ImportError:
        raise common.FilterError(filter_, "module lxml not found")

    # pylint: disable=no-member
    parser = etree.HTMLParser(encoding='utf-8', recover=True,
                              strip_cdata=True)
    tree = etree.fromstringlist([data], parser)
    for match in tree.xpath(expression):
        # pylint: disable=protected-access
        rendered = etree.tostring(match) if isinstance(match, etree._Element) else str(match)
        yield rendered if isinstance(rendered, str) else rendered.decode('utf-8')
def _filter(self, item: str, result: common.Result) -> ty.Iterable[str]:
    """Yield, as text, every element of ``item`` whose ``id`` attribute
    equals ``self._conf["sel"]``.

    ``item`` is parsed with a recovering UTF-8 HTML parser. A matched
    element is serialised with ``etree.tostring``. An empty serialisation
    is skipped, and byte output is decoded as UTF-8.

    :raises common.FilterError: when lxml cannot be imported
    """
    try:
        from lxml import etree
    except ImportError:
        raise common.FilterError(self, "module lxml not found")

    # pylint: disable=no-member
    parser = etree.HTMLParser(encoding='utf-8', recover=True,
                              strip_cdata=True)
    tree = etree.fromstringlist([item], parser)
    query = ".//*[@id='" + self._conf["sel"] + "']"
    for match in tree.findall(query):
        # pylint: disable=protected-access
        if not isinstance(match, etree._Element):
            yield str(match)
            continue
        serialised = etree.tostring(match)  # type: ty.Union[str, bytes]
        if not serialised:
            continue
        if hasattr(serialised, 'decode'):
            yield serialised.decode('utf-8')
        else:
            yield str(serialised)
def test_any_value_element_tree():
    """A raw lxml element passed as an "any" value survives render + parse.

    The element is handed to the ``container`` schema element, rendered
    into a ``document`` node, compared against the expected XML, and then
    parsed back. The parsed ``_value_1`` must again be an lxml element
    with the original tag.
    """
    schema = get_any_schema()

    item = etree.Element('{http://tests.python-zeep.org}lxml')
    etree.SubElement(item, 'node').text = 'foo'

    container_elm = schema.get_element('{http://tests.python-zeep.org/}container')

    # Create via arg
    obj = container_elm(item)
    node = etree.Element('document')
    container_elm.render(node, obj)
    # Adjacent literals concatenate to exactly the text of the original
    # single-line literal.
    expected = (
        ' <document>'
        ' <ns0:container xmlns:ns0="http://tests.python-zeep.org/">'
        ' <ns0:lxml xmlns:ns0="http://tests.python-zeep.org">'
        ' <node>foo</node>'
        ' </ns0:lxml>'
        ' </ns0:container>'
        ' </document> '
    )
    assert_nodes_equal(expected, node)

    # Index the element directly: getchildren() is deprecated in lxml.
    item = container_elm.parse(node[0], schema)
    assert isinstance(item._value_1, etree._Element)
    assert item._value_1.tag == '{http://tests.python-zeep.org}lxml'
def _render_value_item(self, parent, value, render_path): if value is None: # can be an lxml element return elif isinstance(value, etree._Element): parent.append(value) elif self.restrict: if isinstance(value, list): for val in value: self.restrict.render(parent, val, None, render_path) else: self.restrict.render(parent, value, None, render_path) else: if isinstance(value.value, list): for val in value.value: value.xsd_elm.render(parent, val, render_path) else: value.xsd_elm.render(parent, value.value, render_path)
def _validate_item(self, value, render_path): if value is None: # can be an lxml element return # Check if we received a proper value object. If we receive the wrong # type then return a nice error message if self.restrict: expected_types = (etree._Element, dict,) + self.restrict.accepted_types else: expected_types = (etree._Element, dict, AnyObject) if not isinstance(value, expected_types): type_names = [ '%s.%s' % (t.__module__, t.__name__) for t in expected_types ] err_message = "Any element received object of type %r, expected %s" % ( type(value).__name__, ' or '.join(type_names)) raise TypeError('\n'.join(( err_message, "See http://docs.python-zeep.org/en/master/datastructures.html" "#any-objects for more information" )))
def load(self, schema, node):
    """Load the XML Schema passed in via the node attribute.

    Does nothing when ``node`` is None. Otherwise the node is walked with a
    ``SchemaVisitor`` built from ``schema`` and this document.

    :type schema: zeep.xsd.schema.Schema
    :type node: etree._Element
    :raises RuntimeError: if the schema has no document registered for
                          this document's target namespace
    """
    if node is None:
        return

    registered = schema.documents.has_schema_document_for_ns(self._target_namespace)
    if not registered:
        raise RuntimeError(
            "The document needs to be registered in the schema before " +
            "it can be loaded")

    # Disable XML schema validation for now
    # if len(node) > 0:
    #     self.xml_schema = etree.XMLSchema(node)
    SchemaVisitor(schema, self).visit_schema(node)
def map_node_to_class(self, node):
    """Return the ``nodes`` wrapper class for an implementation node.

    The type checks run in a fixed order and the first match wins.
    Processing instructions, comments and element trees are tested before
    the plain ``etree._Element`` check.

    :raises exceptions.Xml4hImplementationBug: for an unrecognised node
    """
    if isinstance(node, etree._ProcessingInstruction):
        return nodes.ProcessingInstruction
    if isinstance(node, etree._Comment):
        return nodes.Comment
    if isinstance(node, etree._ElementTree):
        return nodes.Document
    if isinstance(node, etree._Element):
        return nodes.Element
    if isinstance(node, LXMLAttribute):
        return nodes.Attribute
    if isinstance(node, LXMLText):
        return nodes.CDATA if node.is_cdata else nodes.Text
    raise exceptions.Xml4hImplementationBug(
        'Unrecognized type for implementation node: %s' % node)
def new_impl_element(self, tagname, ns_uri=None, parent=None):
    """Create a new lxml element, optionally in the namespace ``ns_uri``.

    Without ``ns_uri`` a plain element named ``tagname`` is returned. With
    it, any ``prefix:`` part of ``tagname`` is dropped, ``ns_uri`` becomes
    the default namespace of the new element, and prefix mappings found in
    xmlns attributes on ``parent`` and its ancestors are added to the
    element's nsmap.
    """
    if ns_uri is None:
        return etree.Element(tagname)

    if ':' in tagname:
        tagname = tagname.split(':')[1]
    my_nsmap = {None: ns_uri}
    # Add any xmlns attribute prefix mappings from parent's document
    # TODO This doesn't seem to help
    xmlns_marker = '{%s}' % nodes.Node.XMLNS_URI
    ancestor = parent
    while ancestor.__class__ == etree._Element:
        for attr_name, attr_value in ancestor.attrib.items():
            if xmlns_marker in attr_name:
                _, prefix = attr_name.split('}')
                my_nsmap[prefix] = attr_value
        ancestor = self.get_node_parent(ancestor)
    return etree.Element('{%s}%s' % (ns_uri, tagname), nsmap=my_nsmap)
def find_node_elements(self, node, name='*', ns_uri='*'):
    """Return the descendant elements of ``node`` matching name and namespace.

    :param node: implementation node whose subtree is searched; the node
                 itself is never part of the result
    :param name: local name to match, or ``'*'`` for any name
    :param ns_uri: namespace URI to match, or ``'*'`` for any namespace
    :return: list of matching nodes in iteration order
    """
    # TODO Any proper way to find namespaced elements by name?
    # iter() replaces getiterator(), which lxml documents as deprecated.
    results = []
    for n in node.iter():
        # Ignore the current node
        if n == node:
            continue
        # Ignore non-Elements (exact class check, so subclasses are skipped)
        if not n.__class__ == etree._Element:
            continue
        if ns_uri != '*' and self.get_node_namespace_uri(n) != ns_uri:
            continue
        if name != '*' and self.get_node_local_name(n) != name:
            continue
        results.append(n)
    return results
def lookup_ns_prefix_for_uri(self, node, uri):
    """Return the namespace prefix bound to ``uri`` as seen from ``node``.

    The XMLNS namespace always maps to ``'xmlns'``. Otherwise the node's
    ``nsmap`` is consulted first. If that gives no prefix, or one that
    looks auto-generated (``ns`` followed by a digit), the xmlns attributes
    of the node and its ancestors are searched for one declared for
    ``uri``.

    :return: the prefix, or None when nothing better was found
    """
    if uri == nodes.Node.XMLNS_URI:
        return 'xmlns'
    result = None
    if hasattr(node, 'nsmap') and uri in node.nsmap.values():
        for n, v in node.nsmap.items():
            if v == uri:
                result = n
                break
    # TODO This is a slow hack necessary due to lxml's immutable nsmap
    # Raw string: '\d' in a plain literal is an invalid escape sequence.
    if result is None or re.match(r'ns\d', result):
        # We either have no namespace prefix in the nsmap, in which case we
        # will try looking for a matching xmlns attribute, or we have
        # a namespace prefix that was probably assigned automatically by
        # lxml and we'd rather use a human-assigned prefix if available.
        curr_node = node  # self.get_node_parent(node)
        while curr_node.__class__ == etree._Element:
            for n, v in curr_node.attrib.items():
                if v == uri and ('{%s}' % nodes.Node.XMLNS_URI) in n:
                    result = n.split('}')[1]
                    return result
            curr_node = self.get_node_parent(curr_node)
    return result
def format_payload(self, api_version, data):
    """Return ``data`` in the form the given QualysGuard API version expects.

    * API ``1`` / ``2``: a query string is parsed into a dict with
      ``urllib.parse.parse_qs`` after a leading ``?`` and trailing ``&``
      are stripped. Anything else is returned unchanged.
    * API ``'am'`` / ``'was'`` / ``'am2'``: an lxml element is serialised
      with ``etree.tostring``. Anything else is returned unchanged.
    * Any other API version: ``data`` is returned unchanged.

    :param api_version: API identifier (int or str)
    :param data: request payload
    :return: the converted payload, or the original ``data``
    """
    # Check if payload is for API v1 or API v2.
    if api_version in (1, 2):
        # isinstance (not type ==) so str subclasses are converted as well.
        if isinstance(data, str):
            # Convert to dictionary.
            logger.debug('Converting string to dict:\n%s', data)
            # Remove possible starting question mark & ending ampersands.
            data = data.lstrip('?').rstrip('&')
            # Convert to dictionary.
            data = urllib.parse.parse_qs(data)
            logger.debug('Converted:\n%s', data)
    elif api_version in ('am', 'was', 'am2'):
        if isinstance(data, etree._Element):
            logger.debug('Converting lxml.builder.E to string')
            data = etree.tostring(data)
            logger.debug('Converted:\n%s', data)
    return data
def __init__(self, content):
    """Parse raw XML ``content`` into dict and element form.

    The content is converted to UTF-8 bytes first. Non-empty content is
    parsed into ``self.dict`` and ``self.element``. Empty content yields an
    empty ``OrderedDict`` and a placeholder ``empty_content`` element.
    ``self.xml`` keeps the encoded content, and ``self.tag`` is the
    element's tag with any ``{...}`` part removed.
    """
    # we need to turn content into valid utf8
    content = to_unicode(content, encoding='iso-8859-1').encode('utf-8')

    # content is a string
    if len(content) == 0:
        self.dict = OrderedDict()
        self.element = etree.Element('empty_content')
    else:
        self.dict = xml2dict(content)
        # etree._Element
        self.element = etree.fromstring(content)

    # Raw XML
    self.xml = content
    self.tag = re.sub(pattern='{[^}]+}', repl='', string=self.element.tag, flags=0)
def elements_equal(e1, e2):
    """
    Tell whether two XML elements are equal.

    Tag, attributes and number of children must match, as must text and
    tail once surrounding whitespace is stripped (``None`` counts as
    empty). Children are then compared pairwise by the same rules.

    :param e1: First element to check.
    :type e1: etree._Element
    :param e2: Second element to check.
    :type e2: etree._Element

    Adapted from http://stackoverflow.com/a/24349916.

    :rtype: bool
    """
    def normalised(text):
        # A missing text/tail is treated like an empty one.
        return '' if text is None else text.strip()

    if e1.tag != e2.tag:
        return False
    if normalised(e1.text) != normalised(e2.text):
        return False
    if normalised(e1.tail) != normalised(e2.tail):
        return False
    if e1.attrib != e2.attrib:
        return False
    if len(e1) != len(e2):
        return False
    return all(elements_equal(left, right) for left, right in zip(e1, e2))
def __init__(self, html):
    """Populate this item's fields from a parsed HTML element.

    For each entry of ``self._fields``, a ``BaseField`` is asked to extract
    its value from ``html`` and any other value is used as it is. If the
    instance has a ``tal_<field_name>`` attribute, the value is passed
    through it before being stored as the attribute ``<field_name>``.

    :raises ValueError: if ``html`` is not an ``etree._Element``
    """
    # isinstance() is already False for None, so one check covers both.
    if not isinstance(html, etree._Element):
        raise ValueError("etree._Element is expected")

    for field_name, field_value in self._fields.items():
        post_process = getattr(self, 'tal_%s' % field_name, None)
        if isinstance(field_value, BaseField):
            value = field_value.extract_value(html)
        else:
            value = field_value
        if post_process:
            value = post_process(value)
        setattr(self, field_name, value)
def staged_to_string(self):
    """Convert the staging area to a list of strings.

    Entries whose ``'config'`` value is an lxml element are serialised
    with ``etree.tostring``. Other values are passed through unchanged.

    Returns:
        A list of string representing the configuration in the staging
        area.
    """
    return [
        etree.tostring(entry['config'])
        if isinstance(entry['config'], etree._Element)
        else entry['config']
        for entry in self.staged
    ]
def __init__(self, title=None, username=None, password=None, url=None, notes=None, tags=None, expires=False, expiry_time=None, icon=None, element=None):
    """Wrap an existing ``Entry`` element or build a new one.

    When ``element`` is None, a fresh ``Entry`` element is created. The
    optional url, notes, tags and icon children are appended first (each
    only if its argument is truthy), followed by title, UUID, username,
    password and times. The resulting element must be one of the accepted
    lxml element types and carry the tag ``Entry``.
    """
    if element is None:
        element = Element('Entry')
        title_el = xmlfactory.create_title_element(title)
        uuid_el = xmlfactory.create_uuid_element()
        username_el = xmlfactory.create_username_element(username)
        password_el = xmlfactory.create_password_element(password)
        times_el = xmlfactory.create_times_element(expires, expiry_time)

        if url:
            element.append(xmlfactory.create_url_element(url))
        if notes:
            element.append(xmlfactory.create_notes_element(notes))
        if tags:
            element.append(xmlfactory.create_tags_element(tags))
        if icon:
            element.append(xmlfactory.create_icon_element(icon))

        for mandatory in (title_el, uuid_el, username_el, password_el, times_el):
            element.append(mandatory)

    accepted_types = [_Element, Element, ObjectifiedElement]
    assert type(element) in accepted_types, \
        'The provided element is not an LXML Element, but {}'.format(
            type(element)
        )
    assert element.tag == 'Entry', \
        'The provided element is not an Entry element, but a {}'.format(element.tag)

    self._element = element
def append(name, symbol=Ref('previous_result'), copy_element=False):
    """
    Build a handler that appends the object referenced by ``symbol``
    (default: the result of the previous :term:`handler function`) to the
    object referenced by ``name`` in the :term:`context` namespace. With
    ``copy_element`` set to ``True``, an element is deep-copied first and
    the copy is appended. The handler returns its ``previous_result``
    unchanged.
    """
    def handler(context, previous_result, transformation):
        payload = symbol(transformation)
        if copy_element and isinstance(payload, etree._Element):
            payload = deepcopy(payload)
        target = dot_lookup(context, name)
        target.append(payload)
        return previous_result

    return handler
def find(element, path):
    """
    Call :attr:`lxml.etree._Element.find` on ``element`` with ``path``,
    passing the element's own namespace mapping as ``namespaces``.
    """
    namespaces = element.nsmap
    return element.find(path, namespaces=namespaces)
def is_root_element(element: etree._Element) -> bool:
    """
    Tell whether ``element`` is the root of the tree object it belongs to.

    This is not the root element of a possible sub-document that a
    transformation may be called with.
    """
    tree_root = element.getroottree().getroot()
    return element is tree_root
def traverse_df_ltr_btt(root: etree._Element) -> Iterator[etree._Element]:
    """
    Yield the elements of the tree below ``root`` depth-first. Children
    are visited in document order, and every element is yielded only after
    all of its descendants, so ``root`` comes last.
    """
    def walk(node):
        for descendant in node:
            yield from walk(descendant)
        yield node

    yield from walk(root)
def traverse_root(root: etree._Element) -> Iterator[etree._Element]:
    """Yield ``root`` itself and nothing else."""
    yield root
def _is_root_condition(element: etree._Element, transformation: 'Transformation'):
    """Condition: ``element`` is the very object stored as ``transformation.root``."""
    root = transformation.root
    return element is root
def Any(*conditions: Sequence[ConditionType]) -> Callable:
    """
    Build a callable that runs the given test functions against an element
    and returns ``True`` as soon as one of them returns a truthy value.
    """
    tests = tuple(map(_condition_factory, _flatten_sequence(conditions)))

    def evaluator(element: etree._Element, transformation: Transformation) -> bool:
        return any(test(element, transformation) for test in tests)

    return evaluator
def OneOf(*conditions: Sequence[ConditionType]) -> Callable:
    """
    Build a callable that runs all given test functions against an element
    and returns ``True`` if exactly one of them returned ``True``.
    """
    tests = tuple(map(_condition_factory, _flatten_sequence(conditions)))

    def evaluator(element: etree._Element, transformation: Transformation) -> bool:
        # Every test is evaluated; there is no short-circuiting here.
        outcomes = [test(element, transformation) for test in tests]
        return outcomes.count(True) == 1

    return evaluator
def Not(*conditions: Sequence[ConditionType]) -> Callable:
    """
    Returns a callable that evaluates the provided test functions and
    returns ``True`` only if none of them returned a truthy value. It is
    the negation of :func:`Any`, not "any of them returned ``False``". A
    single passing test makes the result ``False``.
    """
    conditions = tuple(_condition_factory(x) for x in _flatten_sequence(conditions))

    def evaluator(element: etree._Element, transformation: Transformation) -> bool:
        # any() stops at the first passing test, so later tests may not run.
        return not any(x(element, transformation) for x in conditions)

    return evaluator
def HasLocalname(tag: AnyStr) -> Callable:
    """
    Build a callable that checks whether an element's local tag name (its
    name without the namespace part) equals ``tag``.
    """
    def evaluator(element: etree._Element, _) -> bool:
        local_name = etree.QName(element).localname
        return local_name == tag

    return evaluator
def MatchesXPath(xpath: Union[str, Callable]) -> Callable:
    """
    Build a callable that checks whether an element is among the results
    of evaluating an XPath expression with the transformation's
    ``xpath_evaluator``. If ``xpath`` is a callable, it is called with the
    current transformation on every test to obtain the expression.
    """
    def callable_evaluator(element: etree._Element, transformation: Transformation) -> bool:
        resolved = xpath(transformation)
        dbg("Resolved XPath from callable: '{}'".format(resolved))
        matches = transformation.xpath_evaluator(resolved)
        return element in matches

    def string_evaluator(element: etree._Element, transformation: Transformation) -> bool:
        matches = transformation.xpath_evaluator(xpath)
        return element in matches

    if callable(xpath):
        return callable_evaluator
    return string_evaluator
def _test_conditions(self, element: etree._Element, conditions: Sequence[Callable]) -> bool:
    """
    Run ``conditions`` in order against ``element`` and this object.
    Return ``False`` at the first one that does not hold, or ``True`` if
    all hold (also when there are none).
    """
    # there's no dependency injection here because its overhead
    # shall be avoided during testing conditions
    for check in conditions:
        dbg("Testing condition '{}'.".format(check))
        if condition_holds := check(element, self):
            dbg('The condition applied.')
            continue
        dbg('The condition did not apply.')
        return False
    return True
def assertUnorderedXmlEquivalentOutputs(self, data, expected, excepted_elements = ('lastBuildDate', 'generator')):
    """
    Fail unless ``data`` and ``expected`` are the same XML up to ordering.

    Children and text subnodes of each element are compared as an
    unordered set, so two documents that only differ in the order of the
    same elements are equal. Elements named in ``excepted_elements`` (one
    name or a sequence of names) are removed from both documents before
    comparing. Inputs that are not already lxml elements are parsed first.
    """
    if not excepted_elements:
        excepted_elements = ()
    if isinstance(excepted_elements, six.string_types):
        excepted_elements = (excepted_elements,)

    def normalise(document):
        # Parse if needed, drop the excepted elements, build the tuple form.
        if isinstance(document, etree._Element):
            root = document
        else:
            root = etree.fromstring(self._str_to_bytes(document))
        for tag_name in excepted_elements:
            for match in root.xpath('//{}'.format(tag_name)):
                match.getparent().remove(match)
        return root, self._xml_to_tuple(root)

    data, data_tuple = normalise(data)
    expected, expected_tuple = normalise(expected)

    if data_tuple != expected_tuple:
        message = (
            'Feeds are not equivalent accurate within ordering '
            '(taking into consideration excepted nodes {excepted_elements}):\n'
            'Given: {given}\n'
            'Expected: {expected}'
        )
        self.fail(message.format(excepted_elements=excepted_elements,
                                 given=etree.tostring(data),
                                 expected=etree.tostring(expected)))
def annotate(elements):
    """Surround ``elements`` with begin/end "added by maintainer" comments.

    For a single lxml element the comments become its first and last
    children. Non-blank leading text of the element is moved, stripped, to
    the tail of the begin comment, and the element itself is returned.
    Any other input is treated as a list, and a new list with the comments
    at both ends is returned.
    """
    begin_comment = etree.Comment(' begin of code added by maintainer ')
    end_comment = etree.Comment(' end of code added by maintainer ')

    if not isinstance(elements, etree._Element):
        return [begin_comment] + elements + [end_comment]

    if elements.text and elements.text.strip():
        # Keep the leading text after the begin marker, not before it.
        begin_comment.tail = elements.text.strip()
        elements.text = ''
    elements[:0] = [begin_comment]
    elements.append(end_comment)
    return elements
def replace_xml(self, replaced, content):
    """Replace the node ``replaced`` with the children of ``content``.

    If ``replaced`` is not an lxml element, the children of ``content``
    are appended to its parent, and the parent's text is set to
    ``content.text`` unless ``content`` is a comment. Otherwise
    ``replaced`` is removed, the children of ``content`` are inserted at
    its position, and the inserted slice is handed to ``self.reformat``.
    """
    container = replaced.getparent()
    if not isinstance(replaced, etree._Element):
        container.extend(content)
        if content.tag is not etree.Comment:
            container.text = content.text
        return

    position = container.index(replaced)
    # Counted before the loop below starts inserting the children.
    child_count = len(content)
    del container[position]
    for offset, child in enumerate(content):
        container.insert(position + offset, child)
    self.reformat(container, container[position: position + child_count])
def Post(self, data, uri, extra_headers=None, url_params=None, escape_params=True, redirects_remaining=4, media_source=None, converter=None):
    """Insert or update data into a GData service at the given URI.

    Thin wrapper that forwards every argument to
    ``GDataService.PostOrPut`` with the verb ``'POST'``.

    Args:
      data: string, ElementTree._Element, atom.Entry, or gdata.GDataEntry.
            The XML to be sent to the uri.
      uri: string. The location (feed) to which the data should be
           inserted. Example: '/base/feeds/items'.
      extra_headers: dict (optional). HTTP headers which are to be
                     included. The client automatically sets the
                     Content-Type, Authorization, and Content-Length
                     headers.
      url_params: dict (optional). Additional URL parameters to be included
                  in the URI. These are translated into query arguments in
                  the form '&dict_key=value&...'.
                  Example: {'max-results': '250'} becomes &max-results=250
      escape_params: boolean (optional). If false, the calling code has
                     already ensured that the query will form a valid URL
                     (all reserved characters have been escaped). If true,
                     this method will escape the query and any URL
                     parameters provided.
      redirects_remaining: int (optional). Passed through unchanged to
                           ``PostOrPut``.
      media_source: MediaSource (optional). Container for the media to be
                    sent along with the entry, if provided.
      converter: func (optional). A function which will be executed on the
                 server's response. Often this is a function like
                 GDataEntryFromString which will parse the body of the
                 server's response and return a GDataEntry.

    Returns:
      If the post succeeded, this method will return a GDataFeed,
      GDataEntry, or the results of running converter on the server's
      result body (if converter was specified).
    """
    forwarded = dict(
        extra_headers=extra_headers,
        url_params=url_params,
        escape_params=escape_params,
        redirects_remaining=redirects_remaining,
        media_source=media_source,
        converter=converter,
    )
    return GDataService.PostOrPut(self, 'POST', data, uri, **forwarded)
def Put(self, data, uri, extra_headers=None, url_params=None, escape_params=True, redirects_remaining=3, media_source=None, converter=None):
    """Updates an entry at the given URI.

    Thin wrapper that forwards every argument to
    ``GDataService.PostOrPut`` with the verb ``'PUT'``.

    Args:
      data: string, ElementTree._Element, or xml_wrapper.ElementWrapper.
            The XML containing the updated data.
      uri: string. A URI indicating entry to which the update will be
           applied. Example: '/base/feeds/items/ITEM-ID'
      extra_headers: dict (optional). HTTP headers which are to be
                     included. The client automatically sets the
                     Content-Type, Authorization, and Content-Length
                     headers.
      url_params: dict (optional). Additional URL parameters to be included
                  in the URI. These are translated into query arguments in
                  the form '&dict_key=value&...'.
                  Example: {'max-results': '250'} becomes &max-results=250
      escape_params: boolean (optional). If false, the calling code has
                     already ensured that the query will form a valid URL
                     (all reserved characters have been escaped). If true,
                     this method will escape the query and any URL
                     parameters provided.
      redirects_remaining: int (optional). Passed through unchanged to
                           ``PostOrPut``.
      media_source: MediaSource (optional). Passed through unchanged to
                    ``PostOrPut``.
      converter: func (optional). A function which will be executed on the
                 server's response. Often this is a function like
                 GDataEntryFromString which will parse the body of the
                 server's response and return a GDataEntry.

    Returns:
      If the put succeeded, this method will return a GDataFeed,
      GDataEntry, or the results of running converter on the server's
      result body (if converter was specified).
    """
    forwarded = dict(
        extra_headers=extra_headers,
        url_params=url_params,
        escape_params=escape_params,
        redirects_remaining=redirects_remaining,
        media_source=media_source,
        converter=converter,
    )
    return GDataService.PostOrPut(self, 'PUT', data, uri, **forwarded)
def YouTubeQuery(self, query):
    """Performs a YouTube specific query and returns a resulting feed or entry.

    The query's URI is run through ``self.Query``. For the three
    YouTube-specific query classes, the result is re-parsed from its
    string form into the matching feed type.

    Args:
      query: A Query object or one if its sub-classes (YouTubeVideoQuery,
          YouTubeUserQuery or YouTubePlaylistQuery).

    Returns:
      Depending on the type of Query object submitted returns either a
        YouTubeVideoFeed, a YouTubeUserFeed, a YouTubePlaylistFeed. If the
        Query object provided was not YouTube-related, the raw result of
        ``self.Query`` is returned. The original documentation describes
        it as a tuple:
          on success (boolean succeeded=True, ElementTree._Element result)
          on failure (boolean succeeded=False, {'status': HTTP status code
                      from server, 'reason': HTTP reason from the server,
                      'body': HTTP body of the server response})
    """
    raw_result = self.Query(query.ToUri())
    if isinstance(query, YouTubeUserQuery):
        return gdata.youtube.YouTubeUserFeedFromString(raw_result.ToString())
    if isinstance(query, YouTubePlaylistQuery):
        return gdata.youtube.YouTubePlaylistFeedFromString(raw_result.ToString())
    if isinstance(query, YouTubeVideoQuery):
        return gdata.youtube.YouTubeVideoFeedFromString(raw_result.ToString())
    return raw_result
def Post(self, data, uri, extra_headers=None, url_params=None, escape_params=True, content_type='application/atom+xml'):
    """Insert data into an APP server at the given URI.

    Args:
      data: string, ElementTree._Element, or something with a __str__
            method. The XML to be sent to the uri.
      uri: string. The location (feed) to which the data should be
           inserted. Example: '/base/feeds/items'.
      extra_headers: dict (optional). HTTP headers which are to be
                     included. The dict passed in is not modified; the
                     Content-Type header is added to a copy.
      url_params: dict (optional). Additional URL parameters to be included
                  in the URI. These are translated into query arguments in
                  the form '&dict_key=value&...'.
                  Example: {'max-results': '250'} becomes &max-results=250
      escape_params: boolean (optional). Accepted for interface
                     compatibility; this method does not forward it to
                     ``self.request``.
      content_type: string (optional). Value for the Content-Type header.
                    A falsy value leaves the header unset.

    Returns:
      httplib.HTTPResponse Server's response to the POST request.
    """
    # Work on a copy so the caller's dict does not gain a Content-Type
    # entry as a side effect of this call.
    headers = dict(extra_headers) if extra_headers is not None else {}
    if content_type:
        headers['Content-Type'] = content_type
    return self.request('POST', uri, data=data, headers=headers, url_params=url_params)
def _BecomeChildElement(self, element_tree): """Converts this object into an etree element and adds it as a child node. Adds self to the ElementTree. This method is required to avoid verbose XML which constantly redefines the namespace. Args: element_tree: ElementTree._Element The element to which this object's XML will be added. """ new_element = ElementTree.Element('tag__') # uh, uhm... empty tag name - sorry google, this is bogus? (c)https://github.com/lqc element_tree.append(new_element) self._TransferToElementTree(new_element)
def parse_xmlelements(self, xmlelements, schema, name=None, context=None):
    """Consume matching xmlelements and call parse() on each of them

    Elements are popped from the left of ``xmlelements``, at most as many
    as ``max_occurs_iter(self.max_occurs)`` allows, stopping early when the
    deque is empty. Parse results that are ``None`` are dropped.

    :param xmlelements: Dequeue of XML element objects
    :type xmlelements: collections.deque of lxml.etree._Element
    :param schema: The parent XML schema
    :type schema: zeep.xsd.Schema
    :param name: The name of the parent element
    :type name: str
    :param context: Optional parsing context (for inline schemas)
    :type context: zeep.xsd.context.XmlParserContext
    :return: list of parsed items when ``self.accepts_multiple`` is set,
             otherwise the first parsed item or None

    """
    parsed = []
    for _unused in max_occurs_iter(self.max_occurs):
        if not xmlelements:
            break
        current = xmlelements.popleft()
        item = self.parse(current, schema, context=context)
        if item is not None:
            parsed.append(item)

    if self.accepts_multiple:
        return parsed
    return parsed[0] if parsed else None
def get_node_namespace_uri(self, node):
    """Return the namespace URI of ``node``, or None when it has none.

    A tag containing ``}`` yields the text between its first character and
    the first ``}``. Otherwise attributes report their own
    ``namespace_uri``, element trees have no namespace, and plain elements
    are resolved through ``self._unpack_name``.
    """
    if '}' in node.tag:
        before_brace = node.tag.partition('}')[0]
        return before_brace[1:]
    if isinstance(node, LXMLAttribute):
        return node.namespace_uri
    if isinstance(node, etree._ElementTree):
        return None
    if isinstance(node, etree._Element):
        _qname, ns_uri = self._unpack_name(node.tag, node)[:2]
        return ns_uri
    return None
def ExecuteWithPostData_FORM(session_id, uri, sub_url, timeout, post_data):
    """POST ``post_data`` to ``uri + sub_url`` using the session's headers.

    An lxml element is sent as ``text/xml`` after conversion with
    ``as_string``. Anything else is URL-encoded and sent as
    ``application/x-www-form-urlencoded``. Returns whatever
    ``ExecuteWebRequest`` returns.
    """
    headers = CreateHeadersWithSessionCookieAndCustomHeader(session_id)
    # TODO: another clue that refactor/redesign is required, xml testing if form code
    is_xml = isinstance(post_data, etree._Element)
    if is_xml:
        headers["Content-Type"] = "text/xml; charset=UTF-8"
        body = as_string(post_data)
    else:
        headers["Content-Type"] = "application/x-www-form-urlencoded; charset=UTF-8"
        encoded_form = urllib.parse.urlencode(utf8_encoded(post_data))
        body = encoded_form.encode(encoding="utf-8")
    target = uri + sub_url
    return ExecuteWebRequest(target, body, headers, timeout, lambda: 'POST')