我们从Python开源项目中,提取了以下11个代码示例,用于说明如何使用pycurl.PROXYUSERPWD。
def request(self, endpoint, post=None):
    """Perform an HTTP request against ``Constants.API_URL + endpoint``.

    The request is sent with ``self.userAgent``, follows redirects, and
    reads/writes cookies in
    ``<self.IGDataPath>/<self.username>/<self.username>-cookies.dat``.
    When ``self.proxy`` is truthy the request goes through
    ``self.proxyHost``, with ``self.proxyAuth`` as proxy credentials if set.

    Args:
        endpoint: Path appended to ``Constants.API_URL``.
        post: Optional payload. When not ``None`` the request is sent as a
            POST with this value as ``POSTFIELDS``.

    Returns:
        A two-item list ``[header, decoded]`` where ``header`` is the raw
        response header block and ``decoded`` is ``json_decode(body)``.

    Raises:
        pycurl.error: Propagated from ``perform()``; the curl handle is
            closed before the exception leaves this method.
    """
    cookie_path = os.path.join(
        self.IGDataPath, self.username, self.username + "-cookies.dat")

    buffer = BytesIO()
    ch = pycurl.Curl()
    try:
        ch.setopt(pycurl.URL, Constants.API_URL + endpoint)
        ch.setopt(pycurl.USERAGENT, self.userAgent)
        ch.setopt(pycurl.WRITEFUNCTION, buffer.write)
        ch.setopt(pycurl.FOLLOWLOCATION, True)
        # Headers are written into the same buffer as the body and are
        # split off below using HEADER_SIZE.
        ch.setopt(pycurl.HEADER, True)
        ch.setopt(pycurl.VERBOSE, False)
        ch.setopt(pycurl.COOKIEFILE, cookie_path)
        ch.setopt(pycurl.COOKIEJAR, cookie_path)

        if post is not None:
            ch.setopt(pycurl.POST, True)
            ch.setopt(pycurl.POSTFIELDS, post)

        if self.proxy:
            ch.setopt(pycurl.PROXY, self.proxyHost)
            if self.proxyAuth:
                ch.setopt(pycurl.PROXYUSERPWD, self.proxyAuth)

        ch.perform()
        # Must be read before the handle is closed.
        header_len = ch.getinfo(pycurl.HEADER_SIZE)
    finally:
        # Release the handle even when setopt()/perform() raises.
        ch.close()

    resp = buffer.getvalue()
    header = resp[:header_len]
    body = resp[header_len:]

    if self.debug:
        print("REQUEST: " + endpoint)
        if post is not None and not isinstance(post, list):
            print("DATA: " + str(post))
        # BytesIO yields bytes on Python 3; concatenating that to a str
        # raises TypeError, so decode for display only.
        printable = body if isinstance(body, str) else body.decode("utf-8", "replace")
        print("RESPONSE: " + printable)

    return [header, json_decode(body)]
def request(self, url, method, body, headers):
    """Send an HTTP request with pycurl and return the buffered response.

    Proxy settings are taken from the ``self.proxy`` mapping
    (``proxy_host``, ``proxy_port``, ``proxy_user``/``proxy_pass``).
    Peer and host verification are enabled only when ``self.cacert`` is
    truthy, in which case it is also used as the CA bundle. ``self.timeout``
    is applied to both the connect phase and the whole transfer.

    Args:
        url: Target URL.
        method: Only ``'POST'`` is handled specially (``body`` is sent as
            the POST fields); any other value leaves the handle's default
            request method in place.
        body: Payload used when ``method == 'POST'``.
        headers: Mapping of header names to values; skipped when falsy.

    Returns:
        A tuple ``({}, content)``. Response headers are not captured, so
        the first item is always an empty dict; ``content`` is whatever
        was written to ``self.buf``.

    Raises:
        pycurl.error: Propagated from ``perform()``; the curl handle is
            closed before the exception leaves this method.
    """
    curl = pycurl.Curl()
    try:
        curl.setopt(pycurl.URL, url)

        if 'proxy_host' in self.proxy:
            curl.setopt(pycurl.PROXY, self.proxy['proxy_host'])
        if 'proxy_port' in self.proxy:
            curl.setopt(pycurl.PROXYPORT, self.proxy['proxy_port'])
        if 'proxy_user' in self.proxy:
            curl.setopt(pycurl.PROXYUSERPWD,
                        "%(proxy_user)s:%(proxy_pass)s" % self.proxy)

        # The buffer is kept on the instance, as in the original code.
        self.buf = StringIO()
        curl.setopt(pycurl.WRITEFUNCTION, self.buf.write)

        if self.cacert:
            curl.setopt(curl.CAINFO, self.cacert)
        # Verification is switched off entirely when no CA bundle is given.
        curl.setopt(pycurl.SSL_VERIFYPEER, 1 if self.cacert else 0)
        curl.setopt(pycurl.SSL_VERIFYHOST, 2 if self.cacert else 0)

        curl.setopt(pycurl.CONNECTTIMEOUT, self.timeout)
        curl.setopt(pycurl.TIMEOUT, self.timeout)

        if method == 'POST':
            curl.setopt(pycurl.POST, 1)
            curl.setopt(pycurl.POSTFIELDS, body)

        if headers:
            hdrs = ['%s: %s' % (k, v) for k, v in headers.items()]
            log.debug(hdrs)
            curl.setopt(pycurl.HTTPHEADER, hdrs)

        curl.perform()
    finally:
        # Release the handle even when setopt()/perform() raises.
        curl.close()

    return {}, self.buf.getvalue()
def request(self, url, method, body, headers):
    """Send an HTTP request with pycurl and return the buffered response.

    Proxy settings are taken from the ``self.proxy`` mapping
    (``proxy_host``, ``proxy_port``, ``proxy_user``/``proxy_pass``).
    Peer and host verification are enabled only when ``self.cacert`` is
    truthy, in which case it is also used as the CA bundle. The whole
    transfer is limited to ``self.timeout`` and the connect phase to one
    sixth of it.

    Args:
        url: Target URL.
        method: Only ``'POST'`` is handled specially (``body`` is sent as
            the POST fields); any other value leaves the handle's default
            request method in place.
        body: Payload used when ``method == 'POST'``.
        headers: Mapping of header names to values; skipped when falsy.

    Returns:
        A tuple ``({}, content)``. Response headers are not captured, so
        the first item is always an empty dict; ``content`` is whatever
        was written to ``self.buf``.

    Raises:
        pycurl.error: Propagated from ``perform()``; the curl handle is
            closed before the exception leaves this method.
    """
    curl = pycurl.Curl()
    try:
        curl.setopt(pycurl.URL, url)

        if 'proxy_host' in self.proxy:
            curl.setopt(pycurl.PROXY, self.proxy['proxy_host'])
        if 'proxy_port' in self.proxy:
            curl.setopt(pycurl.PROXYPORT, self.proxy['proxy_port'])
        if 'proxy_user' in self.proxy:
            curl.setopt(pycurl.PROXYUSERPWD,
                        "%(proxy_user)s:%(proxy_pass)s" % self.proxy)

        # The buffer is kept on the instance, as in the original code.
        self.buf = StringIO()
        curl.setopt(pycurl.WRITEFUNCTION, self.buf.write)

        if self.cacert:
            curl.setopt(curl.CAINFO, self.cacert)
        # Verification is switched off entirely when no CA bundle is given.
        curl.setopt(pycurl.SSL_VERIFYPEER, 1 if self.cacert else 0)
        curl.setopt(pycurl.SSL_VERIFYHOST, 2 if self.cacert else 0)

        # CONNECTTIMEOUT is an integer option and pycurl rejects floats.
        # On Python 3 ``self.timeout / 6`` is a float, so truncate it
        # explicitly (same value Python 2 integer division produced).
        # NOTE(review): a timeout below 6 truncates to 0, which libcurl
        # documents as "use the built-in default connect timeout".
        curl.setopt(pycurl.CONNECTTIMEOUT, int(self.timeout / 6))
        curl.setopt(pycurl.TIMEOUT, self.timeout)

        if method == 'POST':
            curl.setopt(pycurl.POST, 1)
            curl.setopt(pycurl.POSTFIELDS, body)

        if headers:
            hdrs = ['%s: %s' % (k, v) for k, v in headers.items()]
            log.debug(hdrs)
            curl.setopt(pycurl.HTTPHEADER, hdrs)

        curl.perform()
    finally:
        # Release the handle even when setopt()/perform() raises.
        curl.close()

    return {}, self.buf.getvalue()
def setInterface(self, options):
    """Configure interface, proxy, IP resolution and auth on ``self.c``.

    Args:
        options: Mapping that must contain ``"interface"``, ``"proxies"``
            and ``"ipv6"``; ``"auth"`` and ``"timeout"`` are optional.
            A truthy ``"proxies"`` value is itself a mapping with
            ``"type"``, ``"address"``, ``"port"``, ``"username"`` and
            ``"password"`` keys.
    """
    iface = options["interface"]
    proxy_cfg = options["proxies"]
    use_ipv6 = options["ipv6"]
    handle = self.c

    # The literal string "none" (any case) means "no interface binding".
    if iface and iface.lower() != "none":
        handle.setopt(pycurl.INTERFACE, str(iface))

    if proxy_cfg:
        kind = proxy_cfg["type"]
        if kind == "socks4":
            curl_proxy_type = pycurl.PROXYTYPE_SOCKS4
        elif kind == "socks5":
            curl_proxy_type = pycurl.PROXYTYPE_SOCKS5
        else:
            # Anything that is not an explicit SOCKS type is treated as HTTP.
            curl_proxy_type = pycurl.PROXYTYPE_HTTP
        handle.setopt(pycurl.PROXYTYPE, curl_proxy_type)

        handle.setopt(pycurl.PROXY, str(proxy_cfg["address"]))
        handle.setopt(pycurl.PROXYPORT, proxy_cfg["port"])

        if proxy_cfg["username"]:
            credentials = "%s:%s" % (proxy_cfg["username"], proxy_cfg["password"])
            handle.setopt(pycurl.PROXYUSERPWD, str(credentials))

    resolve_mode = pycurl.IPRESOLVE_WHATEVER if use_ipv6 else pycurl.IPRESOLVE_V4
    handle.setopt(pycurl.IPRESOLVE, resolve_mode)

    if "auth" in options:
        handle.setopt(pycurl.USERPWD, str(options["auth"]))

    if "timeout" in options:
        handle.setopt(pycurl.LOW_SPEED_TIME, options["timeout"])
def set_interface(self, options):
    """Configure interface, proxy, IP resolution, timeout and auth via ``self.setopt``.

    Args:
        options: Mapping that must contain ``'interface'``, ``'proxies'``
            and ``'ipv6'``; ``'timeout'`` and ``'auth'`` are optional.
            A truthy ``'proxies'`` value is itself a mapping with
            ``'type'``, ``'host'``, ``'port'``, ``'username'`` and
            ``'password'`` keys.
    """
    interface = options['interface']
    proxy = options['proxies']
    ipv6 = options['ipv6']

    # The literal string "none" (any case) means "no interface binding".
    if interface and interface.lower() != "none":
        self.setopt(pycurl.INTERFACE, interface)

    if proxy:
        if proxy['type'] == "socks4":
            self.setopt(pycurl.PROXYTYPE, pycurl.PROXYTYPE_SOCKS4)
        elif proxy['type'] == "socks5":
            self.setopt(pycurl.PROXYTYPE, pycurl.PROXYTYPE_SOCKS5)
        else:
            # Anything that is not an explicit SOCKS type is treated as HTTP.
            self.setopt(pycurl.PROXYTYPE, pycurl.PROXYTYPE_HTTP)

        self.setopt(pycurl.PROXY, proxy['host'])
        self.setopt(pycurl.PROXYPORT, proxy['port'])

        if proxy['username']:
            userpwd = "{0}:{1}".format(proxy['username'], proxy['password'])
            self.setopt(pycurl.PROXYUSERPWD, userpwd)

    if ipv6:
        self.setopt(pycurl.IPRESOLVE, pycurl.IPRESOLVE_WHATEVER)
    else:
        self.setopt(pycurl.IPRESOLVE, pycurl.IPRESOLVE_V4)

    if "timeout" in options:
        self.setopt(pycurl.LOW_SPEED_TIME, options['timeout'])

    if "auth" in options:
        # Read from the same mapping the membership test inspected; the
        # previous code looked the value up on ``self.options`` instead,
        # which is a different object from the ``options`` argument.
        self.setopt(pycurl.USERPWD, options['auth'])
def request(self, method, url, headers, post_data=None):
    """Issue an HTTP request through pycurl.

    Args:
        method: Lower-case HTTP verb. ``'get'`` and ``'post'`` use the
            dedicated curl options; any other value is upper-cased and
            sent as a custom request method.
        url: Target URL; passed through ``util.utf8`` before use.
        headers: Mapping of header names to values.
        post_data: Body sent when ``method == 'post'``.

    Returns:
        A tuple ``(rbody, rcode, rheaders)``: the buffered response body,
        the response status code reported by curl, and the result of
        ``self.parse_headers`` on the raw header text.

    Raises:
        Whatever ``self._handle_request_error`` raises when
        ``perform()`` fails with ``pycurl.error``. The curl handle is
        closed in every case.
    """
    s = util.StringIO.StringIO()
    rheaders = util.StringIO.StringIO()
    curl = pycurl.Curl()

    try:
        proxy = self._get_proxy(url)
        if proxy:
            if proxy.hostname:
                curl.setopt(pycurl.PROXY, proxy.hostname)
            if proxy.port:
                curl.setopt(pycurl.PROXYPORT, proxy.port)
            if proxy.username or proxy.password:
                # Either part may be None; substitute an empty string so
                # the literal text "None" is never sent as a credential.
                curl.setopt(
                    pycurl.PROXYUSERPWD,
                    "%s:%s" % (proxy.username or "", proxy.password or ""))

        if method == 'get':
            curl.setopt(pycurl.HTTPGET, 1)
        elif method == 'post':
            curl.setopt(pycurl.POST, 1)
            curl.setopt(pycurl.POSTFIELDS, post_data)
        else:
            curl.setopt(pycurl.CUSTOMREQUEST, method.upper())

        # pycurl doesn't like unicode URLs
        curl.setopt(pycurl.URL, util.utf8(url))

        curl.setopt(pycurl.WRITEFUNCTION, s.write)
        curl.setopt(pycurl.HEADERFUNCTION, rheaders.write)
        curl.setopt(pycurl.NOSIGNAL, 1)
        curl.setopt(pycurl.CONNECTTIMEOUT, 30)
        curl.setopt(pycurl.TIMEOUT, 80)
        curl.setopt(pycurl.HTTPHEADER,
                    ['%s: %s' % (k, v) for k, v in headers.items()])

        if self._verify_ssl_certs:
            curl.setopt(pycurl.CAINFO, os.path.join(
                os.path.dirname(__file__), 'data/ca-certificates.crt'))
        else:
            curl.setopt(pycurl.SSL_VERIFYHOST, False)

        try:
            curl.perform()
        except pycurl.error as e:
            self._handle_request_error(e)

        # Must be read before the handle is closed.
        rcode = curl.getinfo(pycurl.RESPONSE_CODE)
    finally:
        # The original never closed the handle; release it on every path.
        curl.close()

    rbody = s.getvalue()
    return rbody, rcode, self.parse_headers(rheaders.getvalue())