我们从Python开源项目中,提取了以下9个代码示例,用于说明如何使用pycurl.IPRESOLVE。
def _process_queue(self): while True: started = 0 while self._free_list and self._requests: started += 1 curl = self._free_list.pop() (request, callback) = self._requests.popleft() curl.info = { "headers": httputil.HTTPHeaders(), "buffer": cStringIO.StringIO(), "request": request, "callback": callback, "start_time": time.time(), } # Disable IPv6 to mitigate the effects of this bug # on curl versions <= 7.21.0 # http://sourceforge.net/tracker/?func=detail&aid=3017819&group_id=976&atid=100976 if pycurl.version_info()[2] <= 0x71500: # 7.21.0 curl.setopt(pycurl.IPRESOLVE, pycurl.IPRESOLVE_V4) _curl_setup_request(curl, request, curl.info["buffer"], curl.info["headers"]) self._multi.add_handle(curl) if not started: break
def setInterface(self, options): interface, proxy, ipv6 = options["interface"], options["proxies"], options["ipv6"] if interface and interface.lower() != "none": self.c.setopt(pycurl.INTERFACE, str(interface)) if proxy: if proxy["type"] == "socks4": self.c.setopt(pycurl.PROXYTYPE, pycurl.PROXYTYPE_SOCKS4) elif proxy["type"] == "socks5": self.c.setopt(pycurl.PROXYTYPE, pycurl.PROXYTYPE_SOCKS5) else: self.c.setopt(pycurl.PROXYTYPE, pycurl.PROXYTYPE_HTTP) self.c.setopt(pycurl.PROXY, str(proxy["address"])) self.c.setopt(pycurl.PROXYPORT, proxy["port"]) if proxy["username"]: self.c.setopt(pycurl.PROXYUSERPWD, str("%s:%s" % (proxy["username"], proxy["password"]))) if ipv6: self.c.setopt(pycurl.IPRESOLVE, pycurl.IPRESOLVE_WHATEVER) else: self.c.setopt(pycurl.IPRESOLVE, pycurl.IPRESOLVE_V4) if "auth" in options: self.c.setopt(pycurl.USERPWD, str(options["auth"])) if "timeout" in options: self.c.setopt(pycurl.LOW_SPEED_TIME, options["timeout"])
def _process_queue(self): with stack_context.NullContext(): while True: started = 0 while self._free_list and self._requests: started += 1 curl = self._free_list.pop() (request, callback) = self._requests.popleft() curl.info = { "headers": httputil.HTTPHeaders(), "buffer": cStringIO.StringIO(), "request": request, "callback": callback, "curl_start_time": time.time(), } # Disable IPv6 to mitigate the effects of this bug # on curl versions <= 7.21.0 # http://sourceforge.net/tracker/?func=detail&aid=3017819&group_id=976&atid=100976 if pycurl.version_info()[2] <= 0x71500: # 7.21.0 curl.setopt(pycurl.IPRESOLVE, pycurl.IPRESOLVE_V4) _curl_setup_request(curl, request, curl.info["buffer"], curl.info["headers"]) self._multi.add_handle(curl) if not started: break
def _process_queue(self): with stack_context.NullContext(): while True: started = 0 while self._free_list and self._requests: started += 1 curl = self._free_list.pop() (request, callback) = self._requests.popleft() curl.info = { "headers": httputil.HTTPHeaders(), "buffer": BytesIO(), "request": request, "callback": callback, "curl_start_time": time.time(), } # Disable IPv6 to mitigate the effects of this bug # on curl versions <= 7.21.0 # http://sourceforge.net/tracker/?func=detail&aid=3017819&group_id=976&atid=100976 if pycurl.version_info()[2] <= 0x71500: # 7.21.0 curl.setopt(pycurl.IPRESOLVE, pycurl.IPRESOLVE_V4) _curl_setup_request(curl, request, curl.info["buffer"], curl.info["headers"]) self._multi.add_handle(curl) if not started: break
def set_interface(self, options): interface, proxy, ipv6 = options[ 'interface'], options['proxies'], options['ipv6'] if interface and interface.lower() != "none": self.setopt(pycurl.INTERFACE, interface) if proxy: if proxy['type'] == "socks4": self.setopt(pycurl.PROXYTYPE, pycurl.PROXYTYPE_SOCKS4) elif proxy['type'] == "socks5": self.setopt(pycurl.PROXYTYPE, pycurl.PROXYTYPE_SOCKS5) else: self.setopt(pycurl.PROXYTYPE, pycurl.PROXYTYPE_HTTP) self.setopt(pycurl.PROXY, proxy['host']) self.setopt(pycurl.PROXYPORT, proxy['port']) if proxy['username']: userpwd = "{0}:{1}".format( proxy['username'], proxy['password']) self.setopt(pycurl.PROXYUSERPWD, userpwd) if ipv6: self.setopt(pycurl.IPRESOLVE, pycurl.IPRESOLVE_WHATEVER) else: self.setopt(pycurl.IPRESOLVE, pycurl.IPRESOLVE_V4) if "timeout" in options: self.setopt(pycurl.LOW_SPEED_TIME, options['timeout']) if "auth" in options: self.setopt(pycurl.USERPWD, self.options['auth'])
def curl_common_init(buf): handle = pycurl.Curl() handle.setopt(pycurl.WRITEDATA, buf) handle.setopt(pycurl.HEADERFUNCTION, curl_hdr) handle.setopt(pycurl.DEBUGFUNCTION, curl_debug) handle.setopt(pycurl.USERPWD, '{}:{}'.format(_g.conf._user,_g.conf._pass)) handle.setopt(pycurl.FOLLOWLOCATION, True) # avoid FTP CWD for fastest directory transversal handle.setopt(pycurl.FTP_FILEMETHOD, pycurl.FTPMETHOD_NOCWD) # we always set this flag and let the logging module # handle filtering. handle.setopt(pycurl.VERBOSE, True) # use ipv4 for VPNs handle.setopt(pycurl.IPRESOLVE, pycurl.IPRESOLVE_V4) handle.setopt(pycurl.USE_SSL, True) handle.setopt(pycurl.SSL_VERIFYPEER, False) # XXX handle.setopt(pycurl.SSL_VERIFYHOST, 0) return handle
def __init__(self, handle=None, options=None): self.setCurlHandle(handle) if options == None: options = Options() self.setOptions(options) # To start with, disable FOLLOWLOCATION since we'll handle it self._handle.setopt(pycurl.FOLLOWLOCATION, False) # Force IPv4, since this class isn't yet compatible with IPv6 self._handle.setopt(pycurl.IPRESOLVE, pycurl.IPRESOLVE_V4)