我们从 Python 开源项目中提取了以下 25 个代码示例,用于说明 urllib2 模块的用法(对应官方文档页面 urllib2.html)。
def test_http_doubleslash(self):
    """Double slashes anywhere in the path must not break Host parsing.

    Historically a double slash directly after the host could cause the
    URL to be parsed incorrectly.
    """
    handler = urllib2.AbstractHTTPHandler()
    handler.parent = MockOpener()
    body = ""
    urls = (
        "http://example.com/foo/bar/baz.html",
        "http://example.com//foo/bar/baz.html",
        "http://example.com/foo//bar/baz.html",
        "http://example.com/foo/bar//baz.html",
    )
    for url in urls:
        request = Request(url, body)
        # Without a proxy the Host header is derived straight from the URL.
        direct = handler.do_request_(request)
        self.assertEqual(direct.unredirected_hdrs["Host"], "example.com")
        # With a proxy the Host header must still name the origin server.
        request.set_proxy("someproxy:3128", None)
        proxied = handler.do_request_(request)
        self.assertEqual(proxied.unredirected_hdrs["Host"], "example.com")
def test_invalid_redirect(self):
    """Only http, https and ftp redirect targets may be followed."""
    source_url = "http://example.com/a.html"
    tail = "example.com/b.html"
    handler = urllib2.HTTPRedirectHandler()
    opener = handler.parent = MockOpener()
    request = Request(source_url)
    request.timeout = socket._GLOBAL_DEFAULT_TIMEOUT
    # Unsafe schemes must be rejected with an HTTPError.
    for scheme in ['file', 'imap', 'ldap']:
        self.assertRaises(urllib2.HTTPError,
                          handler.http_error_302,
                          request, MockFile(), 302, "Security Loophole",
                          MockHeaders({"location": scheme + '://' + tail}))
    # Safe schemes are followed and the opener records the new URL.
    for scheme in ['http', 'https', 'ftp']:
        destination = scheme + '://' + tail
        handler.http_error_302(request, MockFile(), 302, "That's fine",
                               MockHeaders({"location": destination}))
        self.assertEqual(opener.req.get_full_url(), destination)
def new_document(self, content, format="html", title=None, member_ids=None):
    """Creates a new document from the given content.

    To create a document in a folder, include the folder ID in the list
    of member_ids, e.g.,

        client = quip.QuipClient(...)
        user = client.get_authenticated_user()
        client.new_document(..., member_ids=[user["private_folder_id"]])

    Returns the parsed JSON response from the `threads/new-document`
    endpoint.
    """
    # `member_ids=[]` as a default would be a shared mutable default
    # argument; use None as the sentinel and treat it as "no members".
    return self._fetch_json("threads/new-document", post_data={
        "content": content,
        "format": format,
        "title": title,
        "member_ids": ",".join(member_ids or []),
    })
def edit_document(self, thread_id, content, operation=APPEND, format="html",
                  section_id=None, **kwargs):
    """Edits the given document, adding the given content.

    `operation` should be one of the constants described above. If
    `operation` is relative to another section of the document, you must
    also specify the `section_id`.
    """
    payload = {
        "thread_id": thread_id,
        "content": content,
        "location": operation,
        "format": format,
        "section_id": section_id,
    }
    # Caller-supplied extras may override any of the defaults above.
    payload.update(kwargs)
    return self._fetch_json("threads/edit-document", post_data=payload)
def setProxy(self, host, type='http'):
    """
    Record the proxy that all subsequent requests should use.

    @type type: C{string}
    @see: U{The Python Docs<http://docs.python.org/library/urllib2.html#
    urllib2.Request.set_proxy>}
    """
    # Stored as a (host, type) pair for later use by the request machinery.
    self.proxy_args = host, type
def test_redirect_fragment(self):
    """geturl() must report the redirect target, stripped of whitespace."""
    target = 'http://www.example.com/index.html#OK\r\n\r\n'
    http_handler = MockHTTPHandler(302, 'Location: ' + target)
    error_handler = urllib2.HTTPDefaultErrorHandler()
    redirect_handler = urllib2.HTTPRedirectHandler()
    opener = build_test_opener(http_handler, error_handler, redirect_handler)
    response = opener.open('http://www.example.com')
    self.assertEqual(response.geturl(), target.strip())
def test_url_fragment(self):
    """Fragments are dropped from the selector but kept by get_full_url()."""
    request = Request("http://www.python.org/?qs=query#fragment=true")
    self.assertEqual("/?qs=query", request.get_selector())
    request = Request("http://www.python.org/#fun=true")
    self.assertEqual("/", request.get_selector())
    # Issue 11703: geturl() omits fragment in the original URL.
    url = 'http://docs.python.org/library/urllib2.html#OK'
    self.assertEqual(Request(url).get_full_url(), url)
def copy_document(self, id, title=None, member_ids=None):
    """Creates a new document duplicating the given thread ID.

    To create the copy in a folder, include the folder ID in member_ids.
    Falls back to the source thread's title when `title` is not given.
    """
    # `member_ids=[]` as a default would be a shared mutable default
    # argument; use None as the sentinel instead.
    old_thread = self.get_thread(id)
    return self.new_document(
        old_thread["html"],
        title=title or old_thread["thread"]["title"],
        member_ids=member_ids if member_ids is not None else [])
def get_section(self, section_id, thread_id=None, document_html=None):
    """Returns the document element whose id equals `section_id`.

    Fetches the thread HTML via `thread_id` when `document_html` is not
    supplied; returns None when the HTML or the section cannot be found.
    """
    html = document_html or self.get_thread(thread_id).get("html")
    if not html:
        return None
    tree = self.parse_document_html(html)
    matches = list(tree.iterfind(".//*[@id='%s']" % section_id))
    return matches[0] if matches else None
def _get_container(self, thread_id, document_html, container, index): if not document_html: document_html = self.get_thread(thread_id).get("html") if not document_html: return None tree = self.parse_document_html(document_html) lists = list(tree.iter(container)) if not lists: return None try: return lists[index] except IndexError: return None
def parse_document_html(self, document_html):
    """Returns the root `Element` for the given Quip document HTML.

    The fragment is wrapped in a synthetic <html> root so fromstring()
    accepts content with multiple top-level elements.
    """
    # xml.etree.cElementTree is deprecated since Python 3.3 and removed
    # in 3.9; plain ElementTree uses the C accelerator automatically.
    # Imported locally so this block does not depend on file-top imports.
    from xml.etree import ElementTree
    document_xml = "<html>" + document_html + "</html>"
    return ElementTree.fromstring(document_xml.encode("utf-8"))
def get_blob(self, thread_id, blob_id):
    """Returns a file-like object with the contents of the given blob from
    the given thread.

    The object is described in detail here:
    https://docs.python.org/2/library/urllib2.html#urllib2.urlopen

    Retries once per 503 "Over Rate Limit" response when
    `self.retry_rate_limit` is set; otherwise raises QuipError.
    """
    request = urllib2.Request(
        url=self._url("blob/%s/%s" % (thread_id, blob_id)))
    if self.access_token:
        request.add_header("Authorization", "Bearer " + self.access_token)
    try:
        return urllib2.urlopen(request, timeout=self.request_timeout)
    # Fixed: `except E, name` is legacy Python 2 syntax; `as` is valid on
    # Python 2.6+ and required for any forward compatibility.
    except urllib2.HTTPError as error:
        try:
            # Extract the developer-friendly error message from the response
            message = json.loads(error.read())["error_description"]
        except Exception:
            # Response body was not the expected JSON; surface the raw error.
            raise error
        if (self.retry_rate_limit and error.code == 503 and
                message == "Over Rate Limit"):
            # Sleep until the server's rate-limit window resets, then retry
            # recursively (each retry re-enters this same handling).
            reset_time = float(error.headers.get("X-RateLimit-Reset"))
            delay = max(2, reset_time - time.time() + 1)
            logging.warning("Rate Limit, delaying for %d seconds" % delay)
            time.sleep(delay)
            return self.get_blob(thread_id, blob_id)
        else:
            raise QuipError(error.code, message, error)
def merge_comments(self, original_id, children_ids):
    """Given an original document and a set of exact duplicates, copies all
    comments and messages on the duplicates to the original.
    """
    import re
    threads = self.get_threads(children_ids + [original_id])
    # Section ids appear as id='XXXXXXXXXXX' (11 alphanumerics) in the
    # document HTML. Since the children are exact duplicates, pairing the
    # ids positionally maps each child section to its original section.
    original_section_ids = re.findall(r" id='([a-zA-Z0-9]{11})'",
        threads[original_id]["html"])
    for thread_id in children_ids:
        thread = threads[thread_id]
        child_section_ids = re.findall(r" id='([a-zA-Z0-9]{11})'",
            thread["html"])
        parent_map = dict(zip(child_section_ids, original_section_ids))
        messages = self.get_messages(thread_id)
        # reversed(): presumably get_messages returns newest-first, so this
        # replays oldest-first to preserve conversation order — TODO confirm.
        for message in reversed(messages):
            kwargs = {}
            if "parts" in message:
                kwargs["parts"] = json.dumps(message["parts"])
            else:
                kwargs["content"] = message["text"]
            if "annotation" in message:
                section_id = None
                if "highlight_section_ids" in message["annotation"]:
                    # The first highlighted section anchors the comment.
                    section_id = message["annotation"][
                        "highlight_section_ids"][0]
                else:
                    # Fall back to locating the <annotation> tag in the raw
                    # HTML and reading the nearest preceding id= attribute.
                    anno_loc = thread["html"].find(
                        '<annotation id="%s"' % message["annotation"]["id"])
                    loc = thread["html"].rfind("id=", 0, anno_loc)
                    if anno_loc >= 0 and loc >= 0:
                        # Slice the 11-char id out of id='...': offset 4
                        # skips "id='", 4+11=15 ends before the closing quote.
                        section_id = thread["html"][loc+4:loc+15]
                # Only attach the comment if the section maps to the original.
                if section_id and section_id in parent_map:
                    kwargs["section_id"] = parent_map[section_id]
            if "files" in message:
                # Re-upload each attachment blob into the original thread.
                attachments = []
                for blob_info in message["files"]:
                    blob = self.get_blob(thread_id, blob_info["hash"])
                    new_blob = self.put_blob(
                        original_id, blob, name=blob_info["name"])
                    attachments.append(new_blob["id"])
                if attachments:
                    kwargs["attachments"] = ",".join(attachments)
            self.new_message(original_id, **kwargs)