我们从Python开源项目中,提取了以下23个代码示例,用于说明如何使用pycurl.EFFECTIVE_URL。
def response_from_conn_object(self, conn, header, body):
    """Build a Response from a completed pycurl handle.

    Records the final URL when redirects were followed, strips the
    header blocks accumulated before the last response, and — for
    proxied HTTPS requests — re-parses the tunneled response out of
    the body content.
    """
    # followlocation: remember where the redirect chain ended up.
    final_url = conn.getinfo(pycurl.EFFECTIVE_URL)
    if final_url != self.completeUrl:
        self.setFinalUrl(final_url)

    # pycurl's header data includes the original (pre-redirect)
    # response headers => keep only what follows the first blank line.
    header = header[header.find("\r\n\r\n") + 1:]

    self.totaltime = conn.getinfo(pycurl.TOTAL_TIME)

    parsed = Response()
    parsed.parseResponse(header)
    parsed.addContent(body)

    if self.schema == "https" and self.__proxy:
        # CONNECT tunnel: the real response rides inside the content.
        self.response = Response()
        self.response.parseResponse(parsed.getContent())
    else:
        self.response = parsed
    return parsed
def fetch(self, request, **kwargs): """Executes an HTTPRequest, returning an HTTPResponse. If an error occurs during the fetch, we raise an HTTPError. """ if not isinstance(request, HTTPRequest): request = HTTPRequest(url=request, **kwargs) buffer = cStringIO.StringIO() headers = httputil.HTTPHeaders() try: _curl_setup_request(self._curl, request, buffer, headers) self._curl.perform() code = self._curl.getinfo(pycurl.HTTP_CODE) effective_url = self._curl.getinfo(pycurl.EFFECTIVE_URL) buffer.seek(0) response = HTTPResponse( request=request, code=code, headers=headers, buffer=buffer, effective_url=effective_url) if code < 200 or code >= 300: raise HTTPError(code, response=response) return response except pycurl.error, e: buffer.close() raise CurlError(*e)
def _finish(self, curl, curl_error=None, curl_message=None):
    """Tear down a finished easy handle and deliver its HTTPResponse.

    Called for both successful transfers and libcurl failures; the
    handle is recycled onto the free list either way.  Exceptions
    raised by the user callback go to handle_callback_exception().
    """
    meta = curl.info
    curl.info = None
    self._multi.remove_handle(curl)
    self._free_list.append(curl)

    body = meta["buffer"]
    if not curl_error:
        error = None
        code = curl.getinfo(pycurl.HTTP_CODE)
        effective_url = curl.getinfo(pycurl.EFFECTIVE_URL)
        body.seek(0)
    else:
        # Transport failure: no body or final URL to report.
        error = CurlError(curl_error, curl_message)
        code = error.code
        effective_url = None
        body.close()
        body = None

    # the various curl timings are documented at
    # http://curl.haxx.se/libcurl/c/curl_easy_getinfo.html
    time_info = dict(
        queue=meta["curl_start_time"] - meta["request"].start_time,
        namelookup=curl.getinfo(pycurl.NAMELOOKUP_TIME),
        connect=curl.getinfo(pycurl.CONNECT_TIME),
        pretransfer=curl.getinfo(pycurl.PRETRANSFER_TIME),
        starttransfer=curl.getinfo(pycurl.STARTTRANSFER_TIME),
        total=curl.getinfo(pycurl.TOTAL_TIME),
        redirect=curl.getinfo(pycurl.REDIRECT_TIME),
    )
    try:
        meta["callback"](HTTPResponse(
            request=meta["request"],
            code=code,
            headers=meta["headers"],
            buffer=body,
            effective_url=effective_url,
            error=error,
            reason=meta["headers"].get("X-Http-Reason", None),
            request_time=time.time() - meta["curl_start_time"],
            time_info=time_info))
    except Exception:
        self.handle_callback_exception(meta["callback"])
def load(self, url, get={}, post={}, referer=True, cookies=True, just_header=False, multipart=False, decode=False):
    """Fetch *url* and return the response body (or raw headers).

    With just_header set, the transfer is performed body-less and the
    collected header text is returned instead of the page content.
    """
    self.setRequestContext(url, get, post, referer, cookies, multipart)
    self.header = ""
    self.c.setopt(pycurl.HTTPHEADER, self.headers)

    if not just_header:
        self.c.perform()
        rep = self.getResponse()
    else:
        # Header-only probe: disable redirects and the body, then
        # restore both options for subsequent requests.
        self.c.setopt(pycurl.FOLLOWLOCATION, 0)
        self.c.setopt(pycurl.NOBODY, 1)
        self.c.perform()
        rep = self.header
        self.c.setopt(pycurl.FOLLOWLOCATION, 1)
        self.c.setopt(pycurl.NOBODY, 0)

    self.c.setopt(pycurl.POSTFIELDS, "")
    self.lastEffectiveURL = self.c.getinfo(pycurl.EFFECTIVE_URL)
    self.code = self.verifyHeader()
    self.addCookies()

    return self.decodeResponse(rep) if decode else rep
def _finish(self, curl, curl_error=None, curl_message=None):
    """Finish one easy handle: recycle it onto the free list and
    deliver an HTTPResponse (with curl timing info) to the callback
    stored on the handle; callback exceptions are routed to
    handle_callback_exception()."""
    task = curl.info
    curl.info = None
    self._multi.remove_handle(curl)
    self._free_list.append(curl)

    buf = task["buffer"]
    if curl_error:
        # libcurl reported a transport error; no body is available.
        error = CurlError(curl_error, curl_message)
        code = error.code
        effective_url = None
        buf.close()
        buf = None
    else:
        error = None
        code = curl.getinfo(pycurl.HTTP_CODE)
        effective_url = curl.getinfo(pycurl.EFFECTIVE_URL)
        buf.seek(0)

    # the various curl timings are documented at
    # http://curl.haxx.se/libcurl/c/curl_easy_getinfo.html
    time_info = dict(
        queue=task["curl_start_time"] - task["request"].start_time,
        namelookup=curl.getinfo(pycurl.NAMELOOKUP_TIME),
        connect=curl.getinfo(pycurl.CONNECT_TIME),
        pretransfer=curl.getinfo(pycurl.PRETRANSFER_TIME),
        starttransfer=curl.getinfo(pycurl.STARTTRANSFER_TIME),
        total=curl.getinfo(pycurl.TOTAL_TIME),
        redirect=curl.getinfo(pycurl.REDIRECT_TIME),
    )
    try:
        task["callback"](HTTPResponse(
            request=task["request"], code=code, headers=task["headers"],
            buffer=buf, effective_url=effective_url, error=error,
            request_time=time.time() - task["curl_start_time"],
            time_info=time_info))
    except Exception:
        self.handle_callback_exception(task["callback"])
def _finish(self, curl, curl_error=None, curl_message=None):
    """Complete a finished easy handle and run its callback.

    Recycles the handle onto the free list, then builds an
    HTTPResponse (carrying a CurlError on transport failure) and
    passes it to the callback stored in ``curl.info``.  Callback
    exceptions are logged rather than propagated into the I/O loop.
    """
    info = curl.info
    curl.info = None
    self._multi.remove_handle(curl)
    self._free_list.append(curl)
    buffer = info["buffer"]
    if curl_error:
        # Transport-level failure: no body or final URL available.
        # (Fixed: removed dead `body = None` local that was never read.)
        error = CurlError(curl_error, curl_message)
        code = error.code
        effective_url = None
        buffer.close()
        buffer = None
    else:
        error = None
        code = curl.getinfo(pycurl.HTTP_CODE)
        effective_url = curl.getinfo(pycurl.EFFECTIVE_URL)
        buffer.seek(0)
    try:
        info["callback"](HTTPResponse(
            request=info["request"], code=code, headers=info["headers"],
            buffer=buffer, effective_url=effective_url, error=error,
            request_time=time.time() - info["start_time"]))
    except (KeyboardInterrupt, SystemExit):
        raise
    except:
        # Deliberately broad: a misbehaving callback must not kill
        # the client loop; log with traceback instead.
        logging.error("Exception in callback %r", info["callback"],
                      exc_info=True)
def _finish(self, curl, curl_error=None, curl_message=None):
    """Retire a completed easy handle and invoke its callback.

    Any exception escaping the callback (other than interpreter
    shutdown signals) is logged with its traceback.
    """
    req_info = curl.info
    curl.info = None
    self._multi.remove_handle(curl)
    self._free_list.append(curl)

    buf = req_info["buffer"]
    if not curl_error:
        error = None
        code = curl.getinfo(pycurl.HTTP_CODE)
        effective_url = curl.getinfo(pycurl.EFFECTIVE_URL)
        buf.seek(0)
    else:
        # Transport failure: report a CurlError and drop the buffer.
        error = CurlError(curl_error, curl_message)
        code = error.code
        effective_url = None
        buf.close()
        buf = None

    try:
        req_info["callback"](HTTPResponse(
            request=req_info["request"],
            code=code,
            headers=req_info["headers"],
            buffer=buf,
            effective_url=effective_url,
            error=error,
            request_time=time.time() - req_info["start_time"]))
    except (KeyboardInterrupt, SystemExit):
        raise
    except:
        logging.error("Exception in callback %r", req_info["callback"],
                      exc_info=True)
def info(c):
    """Return a dictionary with all info on the last response."""
    # Map result keys to the pycurl info constants they come from;
    # a single comprehension replaces the long run of getinfo() calls.
    # Key order matches the original hand-written version.
    fields = (
        ('effective-url', pycurl.EFFECTIVE_URL),
        ('http-code', pycurl.HTTP_CODE),
        ('total-time', pycurl.TOTAL_TIME),
        ('namelookup-time', pycurl.NAMELOOKUP_TIME),
        ('connect-time', pycurl.CONNECT_TIME),
        ('pretransfer-time', pycurl.PRETRANSFER_TIME),
        ('redirect-time', pycurl.REDIRECT_TIME),
        ('redirect-count', pycurl.REDIRECT_COUNT),
        ('size-download', pycurl.SIZE_DOWNLOAD),
        ('header-size', pycurl.HEADER_SIZE),
        ('request-size', pycurl.REQUEST_SIZE),
        ('content-length-download', pycurl.CONTENT_LENGTH_DOWNLOAD),
        ('content-length-upload', pycurl.CONTENT_LENGTH_UPLOAD),
        ('content-type', pycurl.CONTENT_TYPE),
        ('response-code', pycurl.RESPONSE_CODE),
        ('speed-download', pycurl.SPEED_DOWNLOAD),
        ('filetime', pycurl.INFO_FILETIME),
        ('starttransfer-time', pycurl.STARTTRANSFER_TIME),
        ('http-connectcode', pycurl.HTTP_CONNECTCODE),
        ('num-connects', pycurl.NUM_CONNECTS),
    )
    return {key: c.getinfo(const) for key, const in fields}
def info(self):
    """Return a dictionary with all info on the last response."""
    handle = self.handle
    # key -> pycurl constant; one comprehension instead of ~30
    # near-identical getinfo() lines.  Key order matches the
    # original hand-written version.
    fields = (
        ('effective-url', pycurl.EFFECTIVE_URL),
        ('http-code', pycurl.HTTP_CODE),
        ('total-time', pycurl.TOTAL_TIME),
        ('namelookup-time', pycurl.NAMELOOKUP_TIME),
        ('connect-time', pycurl.CONNECT_TIME),
        ('pretransfer-time', pycurl.PRETRANSFER_TIME),
        ('redirect-time', pycurl.REDIRECT_TIME),
        ('redirect-count', pycurl.REDIRECT_COUNT),
        ('size-upload', pycurl.SIZE_UPLOAD),
        ('size-download', pycurl.SIZE_DOWNLOAD),
        ('speed-upload', pycurl.SPEED_UPLOAD),
        ('header-size', pycurl.HEADER_SIZE),
        ('request-size', pycurl.REQUEST_SIZE),
        ('content-length-download', pycurl.CONTENT_LENGTH_DOWNLOAD),
        ('content-length-upload', pycurl.CONTENT_LENGTH_UPLOAD),
        ('content-type', pycurl.CONTENT_TYPE),
        ('response-code', pycurl.RESPONSE_CODE),
        ('speed-download', pycurl.SPEED_DOWNLOAD),
        ('ssl-verifyresult', pycurl.SSL_VERIFYRESULT),
        ('filetime', pycurl.INFO_FILETIME),
        ('starttransfer-time', pycurl.STARTTRANSFER_TIME),
        ('http-connectcode', pycurl.HTTP_CONNECTCODE),
        ('httpauth-avail', pycurl.HTTPAUTH_AVAIL),
        ('proxyauth-avail', pycurl.PROXYAUTH_AVAIL),
        ('os-errno', pycurl.OS_ERRNO),
        ('num-connects', pycurl.NUM_CONNECTS),
        ('ssl-engines', pycurl.SSL_ENGINES),
        ('cookielist', pycurl.INFO_COOKIELIST),
        ('lastsocket', pycurl.LASTSOCKET),
        ('ftp-entry-path', pycurl.FTP_ENTRY_PATH),
    )
    return {key: handle.getinfo(const) for key, const in fields}
def perform(cls):
    """Drive the curl multi handle and settle all finished transfers.

    Pending futures are resolved with a result dict on success, or
    failed with a CurlLoop.CurlException carrying the same dict plus
    the libcurl error code/description.
    """
    if not cls._futures:
        return

    # Pump the multi handle until libcurl stops asking for more calls.
    status = pycurl.E_CALL_MULTI_PERFORM
    while status == pycurl.E_CALL_MULTI_PERFORM:
        status, num_active = cls._multi.perform()

    # Drain completed transfers until info_read reports none ready.
    while True:
        num_ready, success, fail = cls._multi.info_read()

        for handle in success:
            future = cls._futures.pop(handle)
            result = curl_result(handle)
            result['url'] = handle._raw_url
            result['id'] = handle._raw_id
            result['state'] = 'normal'
            result['spider'] = 'pycurl'
            result['payload'] = handle._raw_payload
            future.set_result(result)

        for handle, err_num, err_msg in fail:
            print('error:', err_num, err_msg,
                  handle.getinfo(pycurl.EFFECTIVE_URL))
            result = curl_result(handle)
            result['url'] = handle._raw_url
            result['id'] = handle._raw_id
            result['state'] = 'error'
            result['spider'] = 'pycurl'
            result['error_code'] = err_num
            result['error_desc'] = err_msg
            result['payload'] = handle._raw_payload
            cls._futures.pop(handle).set_exception(
                CurlLoop.CurlException(code=err_num, desc=err_msg,
                                       data=result))

        if num_ready == 0:
            break
def curl_result(c):
    """Collect transfer statistics from a completed pycurl handle.

    Returns a dict keyed by stat name (final URL, peer/local address,
    timings, sizes, status codes) read via curl_easy_getinfo.
    """
    # name -> pycurl constant; comprehension replaces the long block
    # of individual getinfo() assignments.  Key order is unchanged.
    stats = (
        ('effective_url', pycurl.EFFECTIVE_URL),
        ('primary_ip', pycurl.PRIMARY_IP),
        ('primary_port', pycurl.PRIMARY_PORT),
        ('local_ip', pycurl.LOCAL_IP),
        ('local_port', pycurl.LOCAL_PORT),
        ('speed_download', pycurl.SPEED_DOWNLOAD),
        ('size_download', pycurl.SIZE_DOWNLOAD),
        ('redirect_time', pycurl.REDIRECT_TIME),
        ('redirect_count', pycurl.REDIRECT_COUNT),
        ('redirect_url', pycurl.REDIRECT_URL),
        ('http_code', pycurl.HTTP_CODE),
        ('response_code', pycurl.RESPONSE_CODE),
        ('total_time', pycurl.TOTAL_TIME),
        ('content_type', pycurl.CONTENT_TYPE),
        ('namelookup_time', pycurl.NAMELOOKUP_TIME),
        ('info_filetime', pycurl.INFO_FILETIME),
        ('http_connectcode', pycurl.HTTP_CONNECTCODE),
        ('starttransfer_time', pycurl.STARTTRANSFER_TIME),
        ('pretransfer_time', pycurl.PRETRANSFER_TIME),
        ('header_size', pycurl.HEADER_SIZE),
        ('request_size', pycurl.REQUEST_SIZE),
        ('ssl_verifyresult', pycurl.SSL_VERIFYRESULT),
        ('num_connects', pycurl.NUM_CONNECTS),
    )
    return {name: c.getinfo(const) for name, const in stats}
def load(self, url, get={}, post={}, referer=True, cookies=True, just_header=False, multipart=False, decode=False):
    """Perform the configured request and return the page.

    With just_header set, only the response headers are fetched and
    returned; otherwise the (optionally decoded) body is returned.
    """
    self.set_request_context(url, get, post, referer, cookies, multipart)
    self.header = ""  # TODO: use http/rfc message instead

    if "header" in self.options:
        # TODO: custom header handling not fully implemented
        self.setopt(pycurl.HTTPHEADER, self.options['header'])

    if just_header:
        self.setopt(pycurl.FOLLOWLOCATION, 0)
        self.setopt(pycurl.NOBODY, 1)  # TODO: nobody = no post?
        # Overwrite the implied HEAD request: we want the common
        # request type a normal transfer would use.
        self.setopt(pycurl.CUSTOMREQUEST, 'POST' if post else 'GET')
        try:
            self.c.perform()
            rep = self.header
        finally:
            # Always restore the handle for subsequent requests.
            self.setopt(pycurl.FOLLOWLOCATION, 1)
            self.setopt(pycurl.NOBODY, 0)
            self.unsetopt(pycurl.CUSTOMREQUEST)
    else:
        self.c.perform()
        rep = self.get_response()

    self.setopt(pycurl.POSTFIELDS, '')
    self.last_url = safequote(url)
    self.last_effective_url = self.c.getinfo(pycurl.EFFECTIVE_URL)
    if self.last_effective_url:
        # Prefer the URL libcurl actually ended up at.
        self.last_url = self.last_effective_url
    self.code = self.verify_header()

    if cookies:
        self.parse_cookies()

    return self.decode_response(rep) if decode else rep