我们从Python开源项目中,提取了以下15个代码示例,用于说明如何使用urllib2.addinfourl()。
def http_response(self, req, resp):
    """Transparently decode a gzip- or deflate-encoded HTTP response.

    The Content-Encoding value is matched case-insensitively and with
    surrounding whitespace ignored, because HTTP content-coding tokens
    are case-insensitive ("GZIP" and "gzip" name the same coding).

    :param req: The request that produced ``resp``; not used here.
    :param resp: The response object as received.
    :return: ``resp`` itself when the body is neither gzip nor deflate
        encoded; otherwise a new ``urllib2.addinfourl`` that yields the
        decoded body and carries the original headers, url, code and msg.
    """
    old_resp = resp
    encoding = (old_resp.headers.get("content-encoding") or "").strip().lower()

    if encoding == "gzip":
        # GzipFile decompresses lazily, so the whole compressed body is
        # buffered in memory first and inflated as the caller reads.
        body = GzipFile(fileobj=StringIO(old_resp.read()), mode="r")
    elif encoding == "deflate":
        # self.deflate is defined outside this block; it is expected to
        # return the inflated bytes -- TODO confirm against the class.
        body = StringIO(self.deflate(old_resp.read()))
    else:
        return resp

    # The headers are passed through unchanged, so Content-Encoding still
    # names the coding that has just been removed.
    resp = urllib2.addinfourl(body, old_resp.headers, old_resp.url, old_resp.code)
    resp.msg = old_resp.msg
    return resp
def http_response(self, req, resp):
    """Decode a gzip- or deflate-compressed response body for the caller.

    HTTP content-coding names are case-insensitive, so the
    Content-Encoding value is lower-cased and stripped before it is
    compared; a server answering ``GZIP`` is decoded just like ``gzip``.

    :param req: The originating request; unused.
    :param resp: The response as received.
    :return: ``resp`` unchanged when no supported coding is present, else
        a fresh ``urllib2.addinfourl`` over the decoded body that reuses
        the original response's headers, url, code and msg.
    """
    old_resp = resp
    coding = old_resp.headers.get("content-encoding")
    coding = coding.strip().lower() if coding else ""
    if coding not in ("gzip", "deflate"):
        return resp

    compressed = old_resp.read()
    if coding == "gzip":
        # Buffer the body so GzipFile gets a seekable file object;
        # decompression itself happens when the result is read.
        decoded = GzipFile(fileobj=StringIO(compressed), mode="r")
    else:
        # `deflate` is a module-level helper defined outside this block;
        # presumably it returns the inflated bytes -- verify there.
        decoded = StringIO(deflate(compressed))

    # Headers are reused as-is, including the now-stale Content-Encoding.
    resp = urllib2.addinfourl(decoded, old_resp.headers, old_resp.url, old_resp.code)
    resp.msg = old_resp.msg
    return resp
def dispatch_query(self, request_url, headers, method="GET", data=None):
    """Synchronously send a request to L{request_url} and return the reply.

    Unless the caller already supplied an Accept-Encoding header (matched
    case-insensitively), ``Accept-encoding: gzip`` is added.  Only in that
    case is a gzip-encoded reply decompressed here; when the caller chose
    the encoding, the response is returned exactly as received.

    :param request_url: The URL to which the request is to be sent.
    :param headers: Headers to include in the request.  The mapping is
        copied, so the caller's object is never modified.
    :type headers: A dict.
    :param method: The HTTP method, e.g. C{GET}, C{POST}, etc.  Passed
        through to C{RequestWithMethod}.
    :param data: The data to send, if any.
    :type data: A byte string.
    :return: An open file-like object that contains the response.
    """
    outgoing = dict(headers)

    # Header names are case-insensitive, so every key has to be inspected.
    caller_chose_encoding = any(
        name.lower() == 'accept-encoding' for name in outgoing)
    if not caller_chose_encoding:
        outgoing['Accept-encoding'] = 'gzip'

    response = urllib2.urlopen(
        RequestWithMethod(request_url, data, outgoing, method=method))

    # The body is only decoded when gzip was requested by this method.
    if caller_chose_encoding:
        return response
    if response.info().get('Content-Encoding') != 'gzip':
        return response

    # gzip.GzipFile wants to seek in its file object, which the live
    # response cannot do, so the body is buffered in memory first.
    buffered_body = BytesIO(response.read())
    unzipped = gzip.GzipFile(mode='rb', fileobj=buffered_body)
    return urllib2.addinfourl(
        unzipped, response.headers, response.url, response.code)
def to_addinfourl(response):
    """Wrap a `django.http.HttpResponse` in a `urllib2.addinfourl`.

    The response's serialized headers are parsed into an
    `httplib.HTTPMessage`, and its content is exposed through an
    in-memory byte stream.  The resulting object has no URL (``None``)
    and carries the response's status code.
    """
    header_stream = io.BytesIO(response.serialize_headers())
    parsed_headers = httplib.HTTPMessage(header_stream)
    body_stream = io.BytesIO(response.content)
    return urllib2.addinfourl(
        body_stream, parsed_headers, None, response.status_code)
def https_open(self, request):
    """Answer an HTTPS request from the recorded responses on disk.

    The request is appended to ``requests_seen`` and its digest is used
    as a file name under ``RESPONSE_DATA_DIR``.  If that file exists it is
    returned as a 200 'Success' JSON response; the file is handed to the
    response object still open.

    If the file does not exist, a ``<digest>.missing`` JSON record holding
    the url, request data and digest is written next to where the response
    would have been, and ``URLError`` is raised.
    """
    full_url = request.get_full_url()
    payload = GetRequestData(request)
    request_digest = DigestFromRequest(request)
    recorded_path = os.path.join(RESPONSE_DATA_DIR, request_digest)
    requests_seen.append(request)

    if not os.path.exists(recorded_path):
        # Nothing recorded for this request: describe it in a
        # "<digest>.missing" file, then fail the request.
        placeholder_path = os.path.join(
            RESPONSE_DATA_DIR, '{}.missing'.format(request_digest))
        with open(placeholder_path, mode='wb') as placeholder:
            record = json.dumps({
                "url": full_url,
                "data": StringFromBytes(payload),
                "digest": request_digest
            })
            placeholder.write(record.encode('utf-8'))
        raise URLError(
            'URL is not cached. Created missing request record: {}.missing'.format(
                request_digest))

    # Replay the recorded body; the response object takes over the file.
    recorded_file = open(recorded_path, mode='rb')
    json_headers = Message()
    json_headers.add_header('Content-Type', 'application/json')
    response = addinfourl(
        recorded_file, headers=json_headers, url=full_url, code=200)
    response.msg = 'Success'
    response.code = 200
    return response
def https_open(self, request):
    """Answer an HTTPS request from recorded responses, if allowed.

    When the module-level ``disable_network`` flag is true, ``URLError``
    is raised immediately and nothing else happens.

    Otherwise the request is appended to ``requests_seen`` and looked up
    by digest under ``RESPONSE_DATA_DIR``.  An existing file is returned
    as a 200 'Success' JSON response, with the file left open for the
    response object to read.  A missing file causes a ``<digest>.missing``
    JSON record (url, data, digest) to be written, followed by
    ``URLError``.
    """
    # The flag is only read here, so no `global` declaration is needed.
    if disable_network:
        raise URLError('Network access is disabled')

    target_url = request.get_full_url()
    body = GetRequestData(request)
    key = DigestFromRequest(request)
    cached_path = os.path.join(RESPONSE_DATA_DIR, key)
    requests_seen.append(request)

    if not os.path.exists(cached_path):
        # No recorded response: write a "<digest>.missing" record that
        # describes the request, then report the failure.
        missing_path = os.path.join(
            RESPONSE_DATA_DIR, '{}.missing'.format(key))
        with open(missing_path, mode='wb') as missing_file:
            description = json.dumps({
                "url": target_url,
                "data": StringFromBytes(body),
                "digest": key
            })
            missing_file.write(description.encode('utf-8'))
        raise URLError(
            'URL is not cached. Created missing request record: {}.missing'.format(
                key))

    # The open file becomes the body of the returned response.
    cached_file = open(cached_path, mode='rb')
    reply_headers = Message()
    reply_headers.add_header('Content-Type', 'application/json')
    reply = addinfourl(
        cached_file, headers=reply_headers, url=target_url, code=200)
    reply.msg = 'Success'
    reply.code = 200
    return reply
def http_error_default(self, req, fp, code, msg, hdrs):
    """Return the error reply as a response object tagged with its code.

    ``fp`` and ``hdrs`` are wrapped in a ``urllib.addinfourl`` for the
    request's full URL, and ``code`` is stored on it as both ``status``
    and ``code``.  ``msg`` is accepted but not used.
    """
    response = urllib.addinfourl(fp, hdrs, req.get_full_url())
    # Targets are assigned left to right: status first, then code.
    response.status = response.code = code
    return response
def http_error_302(self, req, fp, code, msg, headers):
    """Return the 302 reply itself as a response object.

    ``fp`` and ``headers`` are wrapped in a ``urllib.addinfourl`` for the
    request's full URL; ``code`` is recorded under both the ``status``
    and ``code`` attributes.  ``msg`` is accepted but not used.
    """
    wrapped = urllib.addinfourl(fp, headers, req.get_full_url())
    for attribute in ("status", "code"):
        setattr(wrapped, attribute, code)
    return wrapped
def http_response(self, req, resp):
    """Strip trailing gzip content-codings from a response.

    The Content-Encoding header (found case-insensitively) is split on
    commas.  Working backwards from the last listed coding, each "gzip"
    layer is removed by buffering the body and wrapping it in a
    ``gzip.GzipFile``.  Decoding stops at the first coding that is not
    gzip; the remaining codings are written back to the header and a
    warning is logged.

    Returns ``resp`` unchanged when the header is absent or lists no
    codings.  Otherwise returns a new ``urllib2.addinfourl`` with the
    (mutated) original headers and the original url, code and msg.
    """
    headers = resp.headers

    # Locate the header under whatever capitalisation it arrived with;
    # only the first match is considered.
    encoding_header = None
    for name in headers:
        if name.lower() == "content-encoding":
            encoding_header = name
            break
    if encoding_header is None:
        return resp

    encodings = [
        token.strip()
        for token in headers[encoding_header].split(",")
        if token.strip()
    ]
    if not encodings:
        return resp

    # The header is removed up front and only restored below if some
    # codings could not be undone.
    del headers[encoding_header]

    fp = resp
    while encodings and encodings[-1].lower() == "gzip":
        # GzipFile gets an in-memory copy of the still-encoded body.
        fp = gzip.GzipFile(fileobj=cStringIO.StringIO(fp.read()), mode="r")
        encodings.pop()

    if encodings:
        headers[encoding_header] = ", ".join(encodings)
        logger.warning("Unrecognized Content-Encoding: %s", encodings[-1])

    msg = resp.msg
    if sys.version_info < (2, 6):
        # Before Python 2.6 the code is not passed to addinfourl(); it is
        # set on the new object afterwards instead.
        decoded = urllib2.addinfourl(fp, headers, resp.url)
        decoded.code = resp.code
    else:
        decoded = urllib2.addinfourl(fp, headers, resp.url, resp.code)
    decoded.msg = msg
    return decoded
def do_open(self, http_class, req):
    """Open `req`, reusing a cached connection to its host when possible.

    A connection stored in ``self._connections`` is tried first; if it
    cannot be restarted or gives no usable response it is closed and a
    new ``http_class(host)`` connection is created and cached instead.

    Returns a ``urllib2.addinfourl`` wrapping the response when the
    status is 200 or ``HANDLE_ERRORS`` is false; otherwise returns
    whatever ``self.parent.error(...)`` returns.

    Raises ``urllib2.URLError`` when the request has no host or when a
    ``socket.error`` occurs while talking to the server.

    NOTE: this block uses Python 2 syntax (``except X, e`` and the
    ``print`` statement).
    """
    h = None
    host = req.get_host()
    if not host:
        raise urllib2.URLError('no host given')

    try:
        need_new_connection = 1
        key = self._get_connection_key(host)
        h = self._connections.get(key)
        if not h is None:
            # A connection is cached for this key: try to send the
            # request over it before opening a new one.
            try:
                self._start_connection(h, req)
            except:
                # Any failure to start is treated as "no response", which
                # sends control to the reconnect path below.
                r = None
            else:
                try: r = h.getresponse()
                except httplib.ResponseNotReady, e: r = None
                except httplib.BadStatusLine, e: r = None

            if r is None or r.version == 9:
                # httplib falls back to assuming HTTP 0.9 if it gets a
                # bad header back. This is most likely to happen if
                # the socket has been closed by the server since we
                # last used the connection.
                if DEBUG: print "failed to re-use connection to %s" % host
                h.close()
            else:
                if DEBUG: print "re-using connection to %s" % host
                need_new_connection = 0
        if need_new_connection:
            if DEBUG: print "creating new connection to %s" % host
            h = http_class(host)
            # The new connection replaces any stale one under the same key.
            self._connections[key] = h
            self._start_connection(h, req)
            r = h.getresponse()
    except socket.error, err:
        # Close whichever connection was in use and report the socket
        # failure as a URLError.
        if h: h.close()
        raise urllib2.URLError(err)

    # if not a persistent connection, don't try to reuse it
    # NOTE(review): connections are stored under `key` but removed by
    # `host` -- confirm _remove_connection expects the host.
    if r.will_close: self._remove_connection(host)

    if DEBUG:
        print "STATUS: %s, %s" % (r.status, r.reason)
    # Tag the raw response with this handler, the host and the full URL.
    r._handler = self
    r._host = host
    r._url = req.get_full_url()

    # Earlier version, kept for reference, returned the raw response:
    #if r.status == 200 or not HANDLE_ERRORS:
        #return r
    if r.status == 200 or not HANDLE_ERRORS:
        # [speedplane] Must return an addinfourl object
        resp = urllib2.addinfourl(r, r.msg, req.get_full_url())
        resp.code = r.status
        resp.msg = r.reason
        return resp;
    else:
        # Other statuses are handed to the parent opener's error handling,
        # with the status also exposed as `code` on the raw response.
        r.code = r.status
        return self.parent.error('http', req, r, r.status, r.reason, r.msg)