我们从 Python 开源项目中，提取了以下 47 个代码示例，用于说明如何使用 pycurl.WRITEFUNCTION。
def request(self, endpoint, post=None):
    """Send a request to ``Constants.API_URL + endpoint`` and decode the reply.

    POST is enabled when ``post`` is not ``None``.  Cookies are read from and
    written to ``<IGDataPath>/<username>/<username>-cookies.dat``.

    Args:
        endpoint: Path appended to ``Constants.API_URL``.
        post: Optional request body handed to ``POSTFIELDS``.

    Returns:
        ``[header, json_decode(body)]`` where ``header`` is the raw header
        block that precedes the body in the response.
    """
    cookie_path = os.path.join(
        self.IGDataPath, self.username, self.username + "-cookies.dat")
    buffer = BytesIO()
    ch = pycurl.Curl()
    try:
        ch.setopt(pycurl.URL, Constants.API_URL + endpoint)
        ch.setopt(pycurl.USERAGENT, self.userAgent)
        ch.setopt(pycurl.WRITEFUNCTION, buffer.write)
        ch.setopt(pycurl.FOLLOWLOCATION, True)
        # HEADER=True makes libcurl write the headers into the same buffer.
        ch.setopt(pycurl.HEADER, True)
        ch.setopt(pycurl.VERBOSE, False)
        ch.setopt(pycurl.COOKIEFILE, cookie_path)
        ch.setopt(pycurl.COOKIEJAR, cookie_path)

        if post is not None:
            ch.setopt(pycurl.POST, True)
            ch.setopt(pycurl.POSTFIELDS, post)

        if self.proxy:
            ch.setopt(pycurl.PROXY, self.proxyHost)
            if self.proxyAuth:
                ch.setopt(pycurl.PROXYUSERPWD, self.proxyAuth)

        ch.perform()
        resp = buffer.getvalue()
        header_len = ch.getinfo(pycurl.HEADER_SIZE)
    finally:
        # Release the handle even when setopt/perform raises.
        ch.close()

    header = resp[0:header_len]
    body = resp[header_len:]

    if self.debug:
        print("REQUEST: " + endpoint)
        if post is not None:
            if not isinstance(post, list):
                print("DATA: " + str(post))
        # NOTE(review): body comes from a BytesIO; on Python 3 this
        # concatenation needs a decode first - confirm the target interpreter.
        print("RESPONSE: " + body)

    return [header, json_decode(body)]
def CurlPOST(url, data, cookie):
    """POST ``data`` to ``url`` with a JSON content-type header.

    Args:
        url: Target address.
        data: Request body handed to ``POSTFIELDS``.
        cookie: Path used both as cookie file and cookie jar.

    Returns:
        The response body collected from the write callback.
    """
    c = pycurl.Curl()
    b = StringIO.StringIO()
    try:
        c.setopt(pycurl.URL, url)
        c.setopt(pycurl.POST, 1)
        c.setopt(pycurl.HTTPHEADER, ['Content-Type: application/json'])
        c.setopt(pycurl.WRITEFUNCTION, b.write)
        c.setopt(pycurl.COOKIEFILE, cookie)
        c.setopt(pycurl.COOKIEJAR, cookie)
        c.setopt(pycurl.POSTFIELDS, data)
        c.perform()
        return b.getvalue()
    finally:
        # Always release the buffer and the handle, also on transfer errors.
        b.close()
        c.close()
def test_post(self):
    """``fetch(post=True)`` returns the stub payload and sets ``POST``."""
    stub = CurlStub(b"result")
    body = fetch("http://example.com", post=True, curl=stub)
    self.assertEqual(body, b"result")

    expected_options = {
        pycurl.URL: b"http://example.com",
        pycurl.FOLLOWLOCATION: 1,
        pycurl.MAXREDIRS: 5,
        pycurl.CONNECTTIMEOUT: 30,
        pycurl.LOW_SPEED_LIMIT: 1,
        pycurl.LOW_SPEED_TIME: 600,
        pycurl.NOSIGNAL: 1,
        pycurl.WRITEFUNCTION: Any(),
        pycurl.POST: True,
        pycurl.DNS_CACHE_TIMEOUT: 0,
        pycurl.ENCODING: b"gzip,deflate",
    }
    self.assertEqual(stub.options, expected_options)
def test_post_data(self):
    """``fetch(post=True, data=...)`` installs a read callback for the body."""
    stub = CurlStub(b"result")
    body = fetch("http://example.com", post=True, data="data", curl=stub)
    self.assertEqual(body, b"result")

    # The read callback must hand back the payload as bytes.
    read_callback = stub.options[pycurl.READFUNCTION]
    self.assertEqual(read_callback(), b"data")

    expected_options = {
        pycurl.URL: b"http://example.com",
        pycurl.FOLLOWLOCATION: 1,
        pycurl.MAXREDIRS: 5,
        pycurl.CONNECTTIMEOUT: 30,
        pycurl.LOW_SPEED_LIMIT: 1,
        pycurl.LOW_SPEED_TIME: 600,
        pycurl.NOSIGNAL: 1,
        pycurl.WRITEFUNCTION: Any(),
        pycurl.POST: True,
        pycurl.POSTFIELDSIZE: 4,
        pycurl.READFUNCTION: Any(),
        pycurl.DNS_CACHE_TIMEOUT: 0,
        pycurl.ENCODING: b"gzip,deflate",
    }
    self.assertEqual(stub.options, expected_options)
def test_cainfo(self):
    """``fetch(cainfo=...)`` forwards the value as the ``CAINFO`` option."""
    stub = CurlStub(b"result")
    body = fetch("https://example.com", cainfo="cainfo", curl=stub)
    self.assertEqual(body, b"result")

    expected_options = {
        pycurl.URL: b"https://example.com",
        pycurl.FOLLOWLOCATION: 1,
        pycurl.MAXREDIRS: 5,
        pycurl.CONNECTTIMEOUT: 30,
        pycurl.LOW_SPEED_LIMIT: 1,
        pycurl.LOW_SPEED_TIME: 600,
        pycurl.NOSIGNAL: 1,
        pycurl.WRITEFUNCTION: Any(),
        pycurl.CAINFO: b"cainfo",
        pycurl.DNS_CACHE_TIMEOUT: 0,
        pycurl.ENCODING: b"gzip,deflate",
    }
    self.assertEqual(stub.options, expected_options)
def test_timeouts(self):
    """Custom timeouts end up in ``CONNECTTIMEOUT`` and ``LOW_SPEED_TIME``."""
    stub = CurlStub(b"result")
    body = fetch("http://example.com", connect_timeout=5, total_timeout=30,
                 curl=stub)
    self.assertEqual(body, b"result")

    expected_options = {
        pycurl.URL: b"http://example.com",
        pycurl.FOLLOWLOCATION: 1,
        pycurl.MAXREDIRS: 5,
        pycurl.CONNECTTIMEOUT: 5,
        pycurl.LOW_SPEED_LIMIT: 1,
        pycurl.LOW_SPEED_TIME: 30,
        pycurl.NOSIGNAL: 1,
        pycurl.WRITEFUNCTION: Any(),
        pycurl.DNS_CACHE_TIMEOUT: 0,
        pycurl.ENCODING: b"gzip,deflate",
    }
    self.assertEqual(stub.options, expected_options)
def test_pycurl_insecure(self):
    """``fetch(insecure=True)`` sets ``SSL_VERIFYPEER`` to ``False``."""
    stub = CurlStub(b"result")
    body = fetch("http://example.com/get-ca-cert", curl=stub, insecure=True)
    self.assertEqual(body, b"result")

    expected_options = {
        pycurl.URL: b"http://example.com/get-ca-cert",
        pycurl.FOLLOWLOCATION: 1,
        pycurl.MAXREDIRS: 5,
        pycurl.CONNECTTIMEOUT: 30,
        pycurl.LOW_SPEED_LIMIT: 1,
        pycurl.LOW_SPEED_TIME: 600,
        pycurl.NOSIGNAL: 1,
        pycurl.WRITEFUNCTION: Any(),
        pycurl.SSL_VERIFYPEER: False,
        pycurl.DNS_CACHE_TIMEOUT: 0,
        pycurl.ENCODING: b"gzip,deflate",
    }
    self.assertEqual(stub.options, expected_options)
def postXmlSSL(self, xml, url, second=30, cert=True, post=True):
    """Send ``xml`` to ``url`` through ``self.curl`` and return the reply.

    Args:
        xml: Request body, used only when ``post`` is true.
        url: Target address.
        second: Value for the ``TIMEOUT`` option.
        cert: When true, configure the PEM client key and certificate from
            ``WxPayConf_pub``.
        post: When true, send ``xml`` as a POST body.

    Returns:
        Whatever the write callback collected during the transfer.
    """
    handle = self.curl
    handle.setopt(pycurl.URL, url)
    handle.setopt(pycurl.TIMEOUT, second)

    if cert:
        # Client key and certificate are both supplied as PEM files.
        handle.setopt(pycurl.SSLKEYTYPE, "PEM")
        handle.setopt(pycurl.SSLKEY, WxPayConf_pub.SSLKEY_PATH)
        handle.setopt(pycurl.SSLCERTTYPE, "PEM")
        handle.setopt(pycurl.SSLCERT, WxPayConf_pub.SSLCERT_PATH)

    if post:
        handle.setopt(pycurl.POST, True)
        handle.setopt(pycurl.POSTFIELDS, xml)

    sink = StringIO()
    handle.setopt(pycurl.WRITEFUNCTION, sink.write)
    handle.perform()
    reply = sink.getvalue()
    return reply
def _get_url(self, url):
    """GET ``url`` with the stored JWT token and return the response body.

    Args:
        url: Address to request.

    Returns:
        The response body, or ``None`` when no token is configured or the
        request fails (failures are logged, not raised).
    """
    if self.API_TOKEN is None:
        logging.error('none token')
        return None

    # Pre-bind so the final return cannot hit an unbound name on failure.
    result = None
    c = None
    try:
        c = pycurl.Curl()
        c.setopt(pycurl.CAINFO, certifi.where())
        c.setopt(pycurl.URL, url)
        b = StringIO.StringIO()
        c.setopt(pycurl.WRITEFUNCTION, b.write)
        c.setopt(pycurl.USERAGENT, "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; .NET CLR 1.1.4322)")
        c.setopt(pycurl.HTTPHEADER, ['Authorization: JWT %s' % self.API_TOKEN.encode()])
        c.setopt(pycurl.CUSTOMREQUEST, "GET")
        c.setopt(pycurl.FOLLOWLOCATION, 1)
        c.perform()
        result = b.getvalue()
        logging.debug('result')
    except Exception as e:
        logging.error(e.message)
        logging.error('go error')
    finally:
        if c is not None:
            c.close()
    return result
def __init__(self, uploader):
    """Configure a curl handle that PATCHes the uploader's file stream.

    Args:
        uploader: Object providing ``url``, ``offset``, ``request_length``,
            ``headers_as_list`` and ``get_file_stream()``.
    """
    handle = pycurl.Curl()
    self.handle = handle
    self.response_headers = {}
    self.output = six.StringIO()
    self.status_code = None

    handle.setopt(pycurl.CAINFO, certifi.where())
    handle.setopt(pycurl.URL, uploader.url)
    handle.setopt(pycurl.HEADERFUNCTION, self._prepare_response_header)
    handle.setopt(pycurl.UPLOAD, 1)
    handle.setopt(pycurl.CUSTOMREQUEST, 'PATCH')

    # Position the stream at the upload offset before libcurl reads from it.
    stream = uploader.get_file_stream()
    self.file = stream
    stream.seek(uploader.offset)

    handle.setopt(pycurl.READFUNCTION, stream.read)
    handle.setopt(pycurl.WRITEFUNCTION, self.output.write)
    handle.setopt(pycurl.INFILESIZE, uploader.request_length)

    fixed_headers = [
        "upload-offset: {}".format(uploader.offset),
        "Content-Type: application/offset+octet-stream",
    ]
    handle.setopt(pycurl.HTTPHEADER, fixed_headers + uploader.headers_as_list)
def get_page_data(url, head=None, curl=None):
    """Issue a GET for ``url`` and return the collected response body.

    Args:
        url: Address to fetch (the original note says curl does not
            support unicode here).
        head: Optional request headers; must be a list, not a dict.
        curl: Optional existing handle; a new one is created when falsy.

    Returns:
        The data written by libcurl during the transfer.
    """
    sink = StringIO()
    handle = curl or pycurl.Curl()

    handle.setopt(pycurl.URL, url)
    if head:
        handle.setopt(pycurl.HTTPHEADER, head)
    handle.setopt(pycurl.WRITEFUNCTION, sink.write)
    handle.setopt(pycurl.CUSTOMREQUEST, "GET")
    handle.setopt(pycurl.CONNECTTIMEOUT, 30)
    handle.setopt(pycurl.TIMEOUT, 30)
    # Certificate and host verification are switched off for this request.
    handle.setopt(pycurl.SSL_VERIFYPEER, 0)
    handle.setopt(pycurl.SSL_VERIFYHOST, 0)

    handle.perform()
    fetched = sink.getvalue()
    sink.close()
    return fetched
def post_page_data(url, data=None, head=None, curl=None):
    """Send ``data`` to ``url`` with a POST request and return the reply.

    Args:
        url: Address to post to (the original note says curl does not
            support unicode here).
        data: Request body handed to ``POSTFIELDS``.
        head: Optional request headers; must be a list, not a dict.
        curl: Optional existing handle; a new one is created when falsy.

    Returns:
        The data written by libcurl during the transfer.
    """
    sink = StringIO()
    handle = curl or pycurl.Curl()

    handle.setopt(pycurl.URL, url)
    if head:
        handle.setopt(pycurl.HTTPHEADER, head)
    handle.setopt(pycurl.POSTFIELDS, data)
    handle.setopt(pycurl.WRITEFUNCTION, sink.write)
    handle.setopt(pycurl.CUSTOMREQUEST, "POST")
    # No connect/total timeout is configured here.

    handle.perform()
    fetched = sink.getvalue()
    sink.close()
    return fetched
def getlat4city():
    """Ask super-ping.com to ping ``WAN_IP`` from ``CITY`` and parse the reply.

    Returns:
        The number found between ``ping-avg'>`` and the following
        ``</div>`` as a float, or ``-1.0`` when the stripped reply is ``-``
        or ``super-ping.com``.
    """
    request_url = ("http://www.super-ping.com/ping.php?node=" + CITY
                   + "&ping=" + WAN_IP)
    referer_url = "http://www.super-ping.com/?ping=" + WAN_IP + "&locale=en"
    request_headers = ['Referer: ' + referer_url,
                       'X-Requested-With: XMLHttpRequest']

    handle = pyc.Curl()
    sink = StringIO()
    handle.setopt(pyc.URL, request_url)
    handle.setopt(pyc.HTTPHEADER, request_headers)
    handle.setopt(pyc.WRITEFUNCTION, sink.write)
    handle.perform()
    handle.close()

    reply = sink.getvalue()
    latency = -1
    # Only parse the reply when it is an HTML fragment.
    if reply.strip() not in ("-", "super-ping.com"):
        open_marker = "ping-avg'>"
        close_marker = "</div>"
        begin = reply.index(open_marker) + len(open_marker)
        finish = reply.index(close_marker, begin)
        latency = reply[begin:finish]
    return float(latency)
def curl(url, debug=False, **kwargs):
    """Fetch ``url`` and return the response body, retrying until it works.

    Args:
        url: Address to fetch; also sent as the ``Referer``.
        debug: When true, a failed attempt is re-raised instead of retried.
        **kwargs: Extra options keyed by ``pycurl`` constant name, applied
            after the defaults.

    Returns:
        The data written by libcurl during the successful attempt.

    Raises:
        KeyError: If a keyword is not the name of a ``pycurl`` attribute.
    """
    # Resolve option names once, up front: a misspelled name now fails
    # immediately instead of being swallowed by the retry loop forever.
    extra_options = [(vars(pycurl)[name], value)
                     for name, value in kwargs.items()]

    while 1:
        s = StringIO.StringIO()
        c = pycurl.Curl()
        try:
            c.setopt(pycurl.URL, url)
            c.setopt(pycurl.REFERER, url)
            c.setopt(pycurl.FOLLOWLOCATION, True)
            c.setopt(pycurl.TIMEOUT, 60)
            c.setopt(pycurl.ENCODING, 'gzip')
            c.setopt(pycurl.USERAGENT, 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/31.0.1650.63 Safari/537.36')
            c.setopt(pycurl.NOSIGNAL, True)
            c.setopt(pycurl.WRITEFUNCTION, s.write)
            for option, value in extra_options:
                c.setopt(option, value)
            c.perform()
            return s.getvalue()
        except Exception:
            # Exception (not a bare except) so Ctrl-C can stop the loop.
            if debug:
                raise
        finally:
            # One handle per attempt; release it whether or not it worked.
            c.close()
def __init__(self, *args, **kwargs):
    """Create the curl handle, run ``Request.__init__`` and hook callbacks."""
    # The handle is created before the base initialiser runs.
    self.c = pycurl.Curl()
    Request.__init__(self, *args, **kwargs)

    self.rep = io.StringIO()
    self.last_url = self.last_effective_url = None
    self.header = ""

    # cookiejar defines the context
    self.cj = self.context

    # Body and header data are both delivered to methods on this object.
    for option, callback in (
            (pycurl.WRITEFUNCTION, self.write),
            (pycurl.HEADERFUNCTION, self.write_header)):
        self.setopt(option, callback)

# TODO: Rename to curl
def scan(self, time):
    """Download the scan whose time is nearest to ``time``.

    Args:
        time: Target time compared against the first field of each entry
            returned by ``self.scans()``.

    Returns:
        An ``mzScan`` built from the whitespace-separated float rows of the
        downloaded ``<scan_name>.txt`` resource.
    """
    nearest = min(self.scans(), key=lambda entry: abs(time - entry[0]))
    scan_time, mz, scan_name, _, scan_mode = nearest

    self.crl.setopt(pycurl.HTTPGET, True)
    self.crl.setopt(pycurl.URL, str(scan_name + '.txt'))
    response = cStringIO.StringIO()
    self.crl.setopt(pycurl.WRITEFUNCTION, response.write)

    # Try up to five times, stopping as soon as any data has arrived.
    attempts_left = 5
    while attempts_left:
        attempts_left -= 1
        self.crl.perform()
        if response.getvalue():
            break

    rows = []
    for line in response.getvalue().splitlines():
        rows.append(tuple(float(field) for field in line.split()))

    # how to get charge for an mzURL scan?
    return mzScan(rows, scan_time, mode=scan_mode, mz=mz)
def xic(self, start_time, stop_time, start_mz, stop_mz, filter=None):
    """Download the ``/ric/`` resource for a time window and m/z window.

    Args:
        start_time, stop_time: Bounds placed in the first URL segment.
        start_mz, stop_mz: Bounds placed in the second URL segment.
        filter: Accepted but not used by this method.

    Returns:
        A list of float tuples, one per line of the downloaded text.
    """
    resource = '/ric/%s-%s/%s-%s.txt' % (start_time, stop_time,
                                         start_mz, stop_mz)
    xic_url = str(self.data_file + resource)

    self.crl.setopt(pycurl.HTTPGET, True)
    self.crl.setopt(pycurl.URL, xic_url)
    response = cStringIO.StringIO()
    self.crl.setopt(pycurl.WRITEFUNCTION, response.write)

    # Try up to five times, stopping as soon as any data has arrived.
    attempts_left = 5
    while attempts_left:
        attempts_left -= 1
        self.crl.perform()
        if response.getvalue():
            break

    points = []
    for line in response.getvalue().splitlines():
        points.append(tuple(float(field) for field in line.split()))
    return points
def test_gzip(url):
    """Fetch ``url`` with gzip encoding and return the total time taken.

    The response body is delivered to ``Test().callback``.

    Args:
        url: Address to fetch.

    Returns:
        libcurl's ``TOTAL_TIME`` for the transfer multiplied by 1000.
    """
    t = Test()
    c = pycurl.Curl()
    try:
        c.setopt(pycurl.WRITEFUNCTION, t.callback)
        c.setopt(pycurl.ENCODING, 'gzip')
        c.setopt(pycurl.URL, url)
        c.setopt(pycurl.USERAGENT, "User-Agent':'EMAO_OPS_MONITOR) Gecko/20091201 Firefox/3.5.6)")
        c.perform()
        TOTAL_TIME = c.getinfo(c.TOTAL_TIME)
    finally:
        # The handle was never released before; close it on every path.
        c.close()
    return TOTAL_TIME * 1000
def searchIP(self, query, pages, queue, STOP_ME):
    """Query pages ``1..pages`` of the API and queue every matched IP.

    Args:
        query: Search expression placed in the ``query`` URL parameter.
        pages: Number of result pages to request.
        queue: Object whose ``put`` receives each ``host["ip"]``.
        STOP_ME: Mutable sequence; index 0 is set to ``True`` at the end.

    Exits the process via ``sys.exit()`` when no API token is configured.
    """
    if self.API_TOKEN is None:
        print("please config your API_TOKEN")
        sys.exit()

    auth_header = ['Authorization: JWT %s' % self.API_TOKEN.encode()]
    for page in range(1, pages + 1):
        b = StringIO.StringIO()
        c = pycurl.Curl()
        try:
            c.setopt(pycurl.URL, "%s?query=%s&page=%s" % (self.API_URL, query, page))
            c.setopt(pycurl.WRITEFUNCTION, b.write)
            c.setopt(pycurl.FOLLOWLOCATION, 1)
            c.setopt(pycurl.CUSTOMREQUEST, "GET")
            c.setopt(pycurl.HTTPHEADER, auth_header)
            c.perform()
        finally:
            # A new handle is made for every page, so release each one.
            c.close()

        hosts = json.loads(b.getvalue())
        for host in hosts['matches']:
            queue.put(host["ip"])

    STOP_ME[0] = True
def get(url, user_agent=UA, referrer=None):
    """Fetch ``url`` with pycurl.

    Redirects are followed, the connect timeout is 5 seconds and the total
    timeout is 8 seconds.

    Args:
        url: Address to fetch.
        user_agent: Sent as the User-Agent when truthy.
        referrer: Sent as the Referer when not ``None``.

    Returns:
        The response data, or ``None`` when the transfer raised.
    """
    sink = StringIO()
    handle = pycurl.Curl()

    handle.setopt(pycurl.URL, url)
    handle.setopt(pycurl.FOLLOWLOCATION, 1)
    handle.setopt(pycurl.CONNECTTIMEOUT, 5)
    handle.setopt(pycurl.TIMEOUT, 8)
    handle.setopt(pycurl.WRITEFUNCTION, sink.write)
    handle.setopt(pycurl.COOKIEFILE, '')
    if user_agent:
        handle.setopt(pycurl.USERAGENT, user_agent)
    if referrer is not None:
        handle.setopt(pycurl.REFERER, referrer)

    payload = None
    try:
        handle.perform()
        payload = sink.getvalue()
    except Exception:
        # Best effort: a failed transfer simply yields None.
        pass

    handle.close()
    return payload
def request(self, endpoint, headers=None, post=None, first=True):
    """Send a request to ``endpoint`` and decode the JSON reply.

    Args:
        endpoint: Full URL to request.
        headers: Optional list handed to ``HTTPHEADER``.
        post: Optional mapping; when truthy it is URL-encoded and sent as
            the POST body.
        first: Accepted but not used by this method.

    Returns:
        ``[header, json_decode(body)]`` where ``header`` is the raw header
        block that precedes the body in the response.
    """
    import urllib

    cookie_file = self.settingsPath + self.username + '-cookies.dat'
    buffer = BytesIO()
    ch = pycurl.Curl()
    try:
        ch.setopt(pycurl.URL, endpoint)
        ch.setopt(pycurl.USERAGENT, self.userAgent)
        ch.setopt(pycurl.WRITEFUNCTION, buffer.write)
        ch.setopt(pycurl.FOLLOWLOCATION, True)
        # HEADER=True makes libcurl write the headers into the same buffer.
        ch.setopt(pycurl.HEADER, True)
        if headers:
            ch.setopt(pycurl.HTTPHEADER, headers)

        ch.setopt(pycurl.VERBOSE, self.debug)
        # NOTE(review): TLS peer and host verification are disabled here.
        ch.setopt(pycurl.SSL_VERIFYPEER, False)
        ch.setopt(pycurl.SSL_VERIFYHOST, False)
        ch.setopt(pycurl.COOKIEFILE, cookie_file)
        ch.setopt(pycurl.COOKIEJAR, cookie_file)

        if post:
            ch.setopt(pycurl.POST, len(post))
            ch.setopt(pycurl.POSTFIELDS, urllib.urlencode(post))

        ch.perform()
        resp = buffer.getvalue()
        header_len = ch.getinfo(pycurl.HEADER_SIZE)
    finally:
        # Release the handle even when setopt/perform raises.
        ch.close()

    header = resp[0:header_len]
    body = resp[header_len:]

    if self.debug:
        print("REQUEST: " + endpoint)
        if post is not None:
            if not isinstance(post, list):
                print('DATA: ' + urllib.unquote_plus(json.dumps(post)))
        print("RESPONSE: " + body + "\n")

    return [header, json_decode(body)]
def CurlGET(url, cookie):
    """Fetch ``url`` using ``cookie`` as cookie file and cookie jar.

    Args:
        url: Address to fetch.
        cookie: Path used both as cookie file and cookie jar.

    Returns:
        The response body collected from the write callback.
    """
    c = pycurl.Curl()
    b = StringIO.StringIO()
    try:
        c.setopt(pycurl.URL, url)
        c.setopt(pycurl.WRITEFUNCTION, b.write)
        c.setopt(pycurl.COOKIEFILE, cookie)
        c.setopt(pycurl.COOKIEJAR, cookie)
        c.perform()
        return b.getvalue()
    finally:
        # Always release the buffer and the handle, also on transfer errors.
        b.close()
        c.close()
def request(self, url, method, body, headers):
    """Perform an HTTP request through pycurl.

    Args:
        url: Address to request.
        method: ``'POST'`` sends ``body`` as POST fields; any other value
            leaves the method at libcurl's default.
        body: Request body, used only for POST.
        headers: Optional mapping rendered as ``"key: value"`` header lines.

    Returns:
        A tuple ``({}, response_body)``; response headers are not captured.
    """
    c = pycurl.Curl()
    try:
        c.setopt(pycurl.URL, url)
        if 'proxy_host' in self.proxy:
            c.setopt(pycurl.PROXY, self.proxy['proxy_host'])
        if 'proxy_port' in self.proxy:
            c.setopt(pycurl.PROXYPORT, self.proxy['proxy_port'])
        if 'proxy_user' in self.proxy:
            c.setopt(pycurl.PROXYUSERPWD, "%(proxy_user)s:%(proxy_pass)s" % self.proxy)

        self.buf = StringIO()
        c.setopt(pycurl.WRITEFUNCTION, self.buf.write)

        # Certificate checks are enabled only when a CA bundle is configured.
        if self.cacert:
            c.setopt(c.CAINFO, self.cacert)
        c.setopt(pycurl.SSL_VERIFYPEER, 1 if self.cacert else 0)
        c.setopt(pycurl.SSL_VERIFYHOST, 2 if self.cacert else 0)
        c.setopt(pycurl.CONNECTTIMEOUT, self.timeout)
        c.setopt(pycurl.TIMEOUT, self.timeout)

        if method == 'POST':
            c.setopt(pycurl.POST, 1)
            c.setopt(pycurl.POSTFIELDS, body)
        if headers:
            hdrs = ['%s: %s' % (k, v) for k, v in headers.items()]
            log.debug(hdrs)
            c.setopt(pycurl.HTTPHEADER, hdrs)

        c.perform()
    finally:
        # Release the handle even when the transfer raises.
        c.close()
    return {}, self.buf.getvalue()
def __search_video_id(self, query):
    """Return the ``videoId`` of the first search hit for ``query``.

    Args:
        query: Search text sent as the ``q`` parameter.

    Raises:
        Exception: If the reply contains an ``error`` key or has no items.
    """
    query_string = urlencode({
        "key": self.key,
        "part": "id,snippet",
        "order": "relevance",
        "type": "video",
        "videoSyndicated": "true",
        "maxResults": 3,
        "q": query
    })

    sink = BytesIO()
    handle = pycurl.Curl()
    handle.setopt(pycurl.URL,
                  self.SERVICE_HOST + "/youtube/v3/search?" + query_string)
    handle.setopt(pycurl.WRITEFUNCTION, sink.write)
    handle.perform()
    handle.close()

    reply = json.loads(sink.getvalue().decode("utf-8"))
    sink.close()

    if "error" in reply:
        raise Exception("query error: {0}".format(reply))
    hits = reply["items"]
    if len(hits) == 0:
        raise Exception("result not found")

    first_hit = hits[0]
    return first_hit["id"]["videoId"]
def __get_video_duration(self, video_id):
    """Return the duration of ``video_id`` in whole seconds.

    Args:
        video_id: Identifier sent as the ``id`` parameter.

    Returns:
        The ISO 8601 ``contentDetails.duration`` value converted to an
        integer number of seconds.

    Raises:
        Exception: If the reply contains an ``error`` key or has no items.
    """
    params = {
        "key": self.key,
        "part": "contentDetails",
        "id": video_id
    }
    buf = BytesIO()
    client = pycurl.Curl()
    client.setopt(pycurl.URL, self.SERVICE_HOST + "/youtube/v3/videos?" + urlencode(params))
    client.setopt(pycurl.WRITEFUNCTION, buf.write)
    client.perform()
    client.close()

    body = json.loads(buf.getvalue().decode("utf-8"))
    buf.close()

    if "error" in body:
        raise Exception("query error: {0}".format(body))
    if len(body["items"]) == 0:
        raise Exception("result not found")

    duration = body["items"][0]["contentDetails"]["duration"]
    # ``timedelta.seconds`` excludes whole days, so a duration of 24 hours
    # or more came out too short; ``total_seconds()`` covers the full span.
    duration_seconds = int(isodate.parse_duration(duration).total_seconds())
    return duration_seconds
def __search(self, keyword, search_type):
    """Collect every search result for ``keyword`` across all pages.

    Args:
        keyword: Search text sent as the ``q`` parameter.
        search_type: Value of the ``type`` parameter; results are read
            from the ``<search_type>s`` key of each reply.

    Returns:
        A list with the ``items`` of every page, in request order.
    """
    result_key = search_type + "s"
    offset = 0
    result = []

    while True:
        self.logger.info("search " + keyword + ", offset = " + str(offset))

        params = {
            "q": keyword,
            "type": search_type,
            "offset": offset,
            "limit": 50
        }
        buf = BytesIO()
        client = pycurl.Curl()
        try:
            client.setopt(pycurl.URL, self.service_host + "/v1/search" + "?" + urlencode(params))
            client.setopt(pycurl.WRITEFUNCTION, buf.write)
            client.perform()
        finally:
            # A new handle is made for every page, so release each one.
            client.close()

        body = json.loads(buf.getvalue().decode("utf-8"))
        buf.close()

        page = body[result_key]
        items = page["items"]
        result.extend(items)

        # NOTE(review): assumes ``offset`` is the index of the first item
        # (Spotify-style paging), so it advances by the page size; stepping
        # by 1 re-fetched overlapping windows - confirm against the service.
        next_offset = offset + page["limit"]
        if not items or page["total"] <= next_offset:
            break
        offset = next_offset

    self.logger.info("Done")
    return result
def get_tracks_by_album_id(self, album_id):
    """Fetch every track of an album from Spotify.

    Args:
        album_id: Album identifier placed in the request path.

    Returns:
        A list of ``Track(name, None, track_number)`` objects, or ``None``
        when ``album_id`` is falsy.
    """
    # Guard first: the log line below concatenates album_id and would raise
    # TypeError for None before the check was ever reached.
    if not album_id:
        return None

    self.logger.info("Get tracks of the album(" + album_id + ") from Spotify.")

    offset = 0
    tracks = []

    while True:
        params = {
            "offset": offset,
            "limit": 50
        }
        buf = BytesIO()
        client = pycurl.Curl()
        try:
            client.setopt(pycurl.URL, self.service_host + "/v1/albums/" + album_id + "/tracks" + "?" + urlencode(params))
            client.setopt(pycurl.WRITEFUNCTION, buf.write)
            client.perform()
        finally:
            # A new handle is made for every page, so release each one.
            client.close()

        body = json.loads(buf.getvalue().decode("utf-8"))
        buf.close()

        items = body["items"]
        tracks.extend(
            Track(data["name"], None, data["track_number"]) for data in items)

        # Spotify's ``offset`` is the index of the first item, so advance by
        # the page size; stepping by 1 re-fetched overlapping windows.
        next_offset = offset + body["limit"]
        if not items or body["total"] <= next_offset:
            break
        offset = next_offset

    return tracks
def process(self, prio, obj):
    """Prepare a curl handle for ``obj`` and add it to ``self.m``.

    Args:
        prio: Accepted but not used by this method.
        obj: Request object; ``to_http_object`` turns a free handle into
            the handle used for the transfer.
    """
    # Blocks here while ``self.pause`` is not set.
    self.pause.wait()

    handle = obj.to_http_object(self.freelist.get())
    if self._proxies:
        handle = self._set_proxy(handle, obj)

    # Body and header output go to separate buffers kept on the handle.
    body_sink = StringIO()
    header_sink = StringIO()
    handle.response_queue = (body_sink, header_sink, obj)
    handle.setopt(pycurl.WRITEFUNCTION, body_sink.write)
    handle.setopt(pycurl.HEADERFUNCTION, header_sink.write)

    with self.mutex_multi:
        self.m.add_handle(handle)
def head(self):
    """Issue a HEAD request for ``self.completeUrl`` and store the response.

    TLS peer and host verification are disabled for the request.  The
    parsed result is stored in ``self.response``.
    """
    handle = pycurl.Curl()
    handle.setopt(pycurl.SSL_VERIFYPEER, False)
    handle.setopt(pycurl.SSL_VERIFYHOST, 0)
    handle.setopt(pycurl.URL, self.completeUrl)
    # NOBODY turns the transfer into a HEAD request.
    handle.setopt(pycurl.NOBODY, True)
    handle.setopt(pycurl.WRITEFUNCTION, self.header_callback)
    handle.perform()

    parsed = Response()
    parsed.parseResponse(self.__performHead)
    self.response = parsed
def request(self, url, method, body, headers):
    """Perform an HTTP request through pycurl.

    The connect timeout is one sixth of ``self.timeout``.

    Args:
        url: Address to request.
        method: ``'POST'`` sends ``body`` as POST fields; any other value
            leaves the method at libcurl's default.
        body: Request body, used only for POST.
        headers: Optional mapping rendered as ``"key: value"`` header lines.

    Returns:
        A tuple ``({}, response_body)``; response headers are not captured.
    """
    c = pycurl.Curl()
    try:
        c.setopt(pycurl.URL, url)
        if 'proxy_host' in self.proxy:
            c.setopt(pycurl.PROXY, self.proxy['proxy_host'])
        if 'proxy_port' in self.proxy:
            c.setopt(pycurl.PROXYPORT, self.proxy['proxy_port'])
        if 'proxy_user' in self.proxy:
            c.setopt(pycurl.PROXYUSERPWD, "%(proxy_user)s:%(proxy_pass)s" % self.proxy)

        self.buf = StringIO()
        c.setopt(pycurl.WRITEFUNCTION, self.buf.write)

        # Certificate checks are enabled only when a CA bundle is configured.
        if self.cacert:
            c.setopt(c.CAINFO, self.cacert)
        c.setopt(pycurl.SSL_VERIFYPEER, 1 if self.cacert else 0)
        c.setopt(pycurl.SSL_VERIFYHOST, 2 if self.cacert else 0)
        # CONNECTTIMEOUT is an integer option; under true division
        # ``timeout / 6`` is a float, so truncate explicitly.
        c.setopt(pycurl.CONNECTTIMEOUT, int(self.timeout / 6))
        c.setopt(pycurl.TIMEOUT, self.timeout)

        if method == 'POST':
            c.setopt(pycurl.POST, 1)
            c.setopt(pycurl.POSTFIELDS, body)
        if headers:
            hdrs = ['%s: %s' % (k, v) for k, v in headers.items()]
            log.debug(hdrs)
            c.setopt(pycurl.HTTPHEADER, hdrs)

        c.perform()
    finally:
        # Release the handle even when the transfer raises.
        c.close()
    return {}, self.buf.getvalue()
def perform(self):
    """Simulate a transfer by feeding ``self.result`` to the write callback.

    Raises:
        self.error: If an error was configured on the stub.
        AssertionError: If the stub has already performed once.
    """
    if self.error:
        raise self.error
    if self.performed:
        raise AssertionError("Can't perform twice")

    write_callback = self.options[pycurl.WRITEFUNCTION]
    write_callback(self.result)
    self.performed = True
def test_basic(self):
    """A plain ``fetch`` returns the stub payload and sets the defaults."""
    stub = CurlStub(b"result")
    body = fetch("http://example.com", curl=stub)
    self.assertEqual(body, b"result")

    expected_options = {
        pycurl.URL: b"http://example.com",
        pycurl.FOLLOWLOCATION: 1,
        pycurl.MAXREDIRS: 5,
        pycurl.CONNECTTIMEOUT: 30,
        pycurl.LOW_SPEED_LIMIT: 1,
        pycurl.LOW_SPEED_TIME: 600,
        pycurl.NOSIGNAL: 1,
        pycurl.WRITEFUNCTION: Any(),
        pycurl.DNS_CACHE_TIMEOUT: 0,
        pycurl.ENCODING: b"gzip,deflate",
    }
    self.assertEqual(stub.options, expected_options)
def test_create_curl(self):
    """Without a ``curl`` argument, ``fetch`` builds one via ``pycurl.Curl``."""
    created = []

    def pycurl_Curl():
        # Stand-in factory that records every stub it hands out.
        stub = CurlStub(b"result")
        created.append(stub)
        return stub

    real_factory = pycurl.Curl
    try:
        pycurl.Curl = pycurl_Curl
        body = fetch("http://example.com")
        stub = created[0]
        self.assertEqual(body, b"result")

        expected_options = {
            pycurl.URL: b"http://example.com",
            pycurl.FOLLOWLOCATION: 1,
            pycurl.MAXREDIRS: 5,
            pycurl.CONNECTTIMEOUT: 30,
            pycurl.LOW_SPEED_LIMIT: 1,
            pycurl.LOW_SPEED_TIME: 600,
            pycurl.NOSIGNAL: 1,
            pycurl.WRITEFUNCTION: Any(),
            pycurl.DNS_CACHE_TIMEOUT: 0,
            pycurl.ENCODING: b"gzip,deflate",
        }
        self.assertEqual(stub.options, expected_options)
    finally:
        # Restore the real factory whatever the outcome.
        pycurl.Curl = real_factory
def __init__(self, cookies=None, options=None):
    """Create the curl handle, reset request state and hook the callbacks.

    Args:
        cookies: Cookie jar stored as ``self.cj``.
        options: Passed to ``self.setInterface``.
    """
    self.c = pycurl.Curl()
    self.rep = StringIO()

    self.cj = cookies  # cookiejar

    self.lastURL = self.lastEffectiveURL = None
    self.abort = False
    self.code = 0  # last http code

    self.header = ""
    self.headers = []  # temporary request header

    self.initHandle()
    self.setInterface(options)

    # Body and header data are both delivered to methods on this object.
    handle = self.c
    handle.setopt(pycurl.WRITEFUNCTION, self.write)
    handle.setopt(pycurl.HEADERFUNCTION, self.writeHeader)

    self.log = getLogger("log")
def cleanup(self):
    """Reset the response state and point the handle at a fresh buffer."""
    fresh_buffer = io.BytesIO()
    self.m_data = fresh_buffer
    self.m_headers = {}
    self.m_response = TCPResponse()
    # The handle must write into the new buffer, not the discarded one.
    self.m_handle.setopt(pycurl.WRITEFUNCTION, fresh_buffer.write)
def handle_request(self):
    """Send ``self.request_body`` to ``self.request_url`` and store the reply.

    On return ``self.response_code``, ``self.response_headers`` and
    ``self.response_body`` hold the status code, the raw header block and
    the body of the response.

    Raises:
        ChannelException: With code 5 when the transfer fails.
    """
    curl_handle = pycurl.Curl()
    response = StringIO.StringIO()
    try:
        # set default options.
        curl_handle.setopt(pycurl.URL, self.request_url)
        curl_handle.setopt(pycurl.REFERER, self.request_url)
        curl_handle.setopt(pycurl.USERAGENT, self.useragent)
        curl_handle.setopt(pycurl.TIMEOUT, self.curlopts['TIMEOUT'])
        curl_handle.setopt(pycurl.CONNECTTIMEOUT, self.curlopts['CONNECTTIMEOUT'])
        # HEADER=True makes libcurl write the headers into the same buffer.
        curl_handle.setopt(pycurl.HEADER, True)
        curl_handle.setopt(pycurl.FOLLOWLOCATION, 1)
        curl_handle.setopt(pycurl.MAXREDIRS, 5)

        if self.request_headers:
            header_lines = [key + ':' + value
                            for key, value in self.request_headers.items()]
            curl_handle.setopt(pycurl.HTTPHEADER, header_lines)

        # The body is always sent through POSTFIELDS.
        curl_handle.setopt(pycurl.HTTPPROXYTUNNEL, 1)
        curl_handle.setopt(pycurl.POSTFIELDS, self.request_body)
        curl_handle.setopt(pycurl.WRITEFUNCTION, response.write)

        try:
            curl_handle.perform()
        except pycurl.error as error:
            raise ChannelException(error, 5)

        self.response_code = curl_handle.getinfo(curl_handle.HTTP_CODE)
        header_size = curl_handle.getinfo(curl_handle.HEADER_SIZE)
        resp_str = response.getvalue()
        self.response_headers = resp_str[0:header_size]
        self.response_body = resp_str[header_size:]
    finally:
        # Release buffer and handle on the error path as well.
        response.close()
        curl_handle.close()
def _login(self):
    """POST ``self.post_data`` to ``self.url`` and store the access token.

    On a non-empty reply the ``access_token`` field of the JSON body is
    stored in ``self.API_TOKEN`` and ``self.save_token()`` is called.
    Failures are logged and not raised.
    """
    c = None
    b = None
    try:
        c = pycurl.Curl()
        c.setopt(pycurl.CAINFO, certifi.where())
        c.setopt(pycurl.URL, self.url)
        b = StringIO.StringIO()
        c.setopt(pycurl.USERAGENT, "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; .NET CLR 1.1.4322)")
        c.setopt(pycurl.WRITEFUNCTION, b.write)
        c.setopt(pycurl.FOLLOWLOCATION, 1)
        c.setopt(pycurl.MAXREDIRS, 5)
        c.setopt(pycurl.CUSTOMREQUEST, "POST")
        c.setopt(pycurl.POSTFIELDS, self.post_data)
        c.perform()

        if b.getvalue():
            logging.info('success login')
            self.API_TOKEN = json.loads(b.getvalue())["access_token"]
            self.save_token()
        else:
            logging.warning('success fail,get null result')
        logging.debug(self.API_TOKEN)
    except pycurl.error as e:
        # ``pycurl.E_HTTP_POST_ERROR`` is an integer error code, not an
        # exception class, so it cannot be used in an except clause;
        # transfer failures are raised as ``pycurl.error``.
        logging.error(str(e))
    except Exception as e:
        logging.error('please check your password or username')
        logging.error(e.message)
    finally:
        # Release buffer and handle on every path.
        if b is not None:
            b.close()
        if c is not None:
            c.close()
def getPage(self, url, requestHeader=None):
    """Fetch ``url`` with spoofed client-IP headers.

    Args:
        url: Address to fetch; surrounding whitespace is stripped.
        requestHeader: Optional list of extra header lines.  The list is
            copied, so neither the caller's list nor a shared default
            accumulates headers from one call to the next.

    Returns:
        ``(header, body)`` where ``header`` is the list of response header
        lines and ``body`` the rest of the response, or ``('', '')`` when
        the request fails.
    """
    headers = list(requestHeader) if requestHeader else []
    fakeIp = self.fakeIp()
    headers.append('CLIENT-IP:' + fakeIp)
    headers.append('X-FORWARDED-FOR:' + fakeIp)

    resultFormate = StringIO.StringIO()
    curl = None
    try:
        curl = pycurl.Curl()
        curl.setopt(pycurl.URL, url.strip())
        curl.setopt(pycurl.ENCODING, 'gzip,deflate')
        # HEADER=1 makes libcurl write the headers into the same buffer.
        curl.setopt(pycurl.HEADER, 1)
        curl.setopt(pycurl.TIMEOUT, 120)
        # NOTE(review): TLS peer and host verification are disabled here.
        curl.setopt(pycurl.SSL_VERIFYPEER, 0)
        curl.setopt(pycurl.SSL_VERIFYHOST, 0)
        curl.setopt(pycurl.HTTPHEADER, headers)
        curl.setopt(pycurl.WRITEFUNCTION, resultFormate.write)
        curl.perform()
        headerSize = curl.getinfo(pycurl.HEADER_SIZE)

        raw = resultFormate.getvalue()
        header = raw[0:headerSize].split('\r\n')
        body = raw[headerSize:]
    except Exception:
        # Best effort: any failure yields an empty header and body.
        header = ''
        body = ''
    finally:
        if curl is not None:
            curl.close()

    return header, body
def setUrl(self, url):
    """Point ``self.c`` at ``url`` and give it a fresh response buffer."""
    fresh_buffer = StringIO.StringIO()
    self.b = fresh_buffer
    handle = self.c
    handle.setopt(pycurl.WRITEFUNCTION, fresh_buffer.write)
    handle.setopt(pycurl.URL, url)
def curlRequest(self, url, headers=False, post=False, returnHeaders=True):
    """Request ``url`` and return the response decoded as ISO-8859-1.

    Args:
        url: Address to request.
        headers: Optional list of extra header lines appended to the
            defaults.
        post: Optional form-encoded body; when given it is sent through
            ``POSTFIELDS`` with matching content headers.
        returnHeaders: When true, response headers are written to the same
            buffer as the body.

    Returns:
        The decoded response text, or ``""`` when the transfer fails.
    """
    ch = pycurl.Curl()
    try:
        ch.setopt(pycurl.URL, url)
        hdrs = [
            "Host: poloniex.com",
            "Connection: close",
            "User-Agent: Mozilla/5.0 (CLI; Linux x86_64) polproxy",
            "accept: application/json"
        ]
        if post != False:
            ch.setopt(pycurl.POSTFIELDS, post)
            hdrs = hdrs + ["content-type: application/x-www-form-urlencoded",
                           "content-length: " + str(len(post))]
        if headers != False:
            hdrs = hdrs + headers

        ch.setopt(pycurl.HTTPHEADER, hdrs)
        # NOTE(review): TLS host verification is disabled here.
        ch.setopt(pycurl.SSL_VERIFYHOST, 0)
        ch.setopt(pycurl.FOLLOWLOCATION, True)
        ch.setopt(pycurl.CONNECTTIMEOUT, 5)
        ch.setopt(pycurl.TIMEOUT, 5)

        ret = BytesIO()
        if returnHeaders:
            ch.setopt(pycurl.HEADERFUNCTION, ret.write)
        ch.setopt(pycurl.WRITEFUNCTION, ret.write)

        try:
            ch.perform()
        except Exception:
            # Best effort: a failed transfer yields an empty string.
            return ""
        return ret.getvalue().decode("ISO-8859-1")
    finally:
        # The early return above used to skip closing the handle.
        ch.close()
def getpage(url):
    """Fetch ``url`` and return the response body.

    Progress markers are printed to standard output while the request is
    being set up and performed.

    Args:
        url: Address to fetch.

    Returns:
        The data written by libcurl during the transfer.
    """
    c = pycurl.Curl()
    try:
        print(url)
        c.setopt(pycurl.URL, url)
        b = StringIO.StringIO()
        print('1')
        c.setopt(pycurl.WRITEFUNCTION, b.write)
        print('2')
        c.perform()
        print('-----')
        return b.getvalue()
    finally:
        # The handle was never released before; close it on every path.
        c.close()