我们从Python开源项目中,提取了以下48个代码示例,用于说明如何使用pycurl.Curl()。
def api_login(): global token url = settings.MASTER_API_URL_LOGIN ch = pycurl.Curl() #????pycurl????? ch.setopt(ch.URL, url) #??????url info = StringIO.StringIO() ch.setopt(ch.WRITEFUNCTION, info.write) ch.setopt(ch.POST, True) #???https??????? ch.setopt(ch.SSL_VERIFYPEER, 0) ch.setopt(ch.SSL_VERIFYHOST, 2) ch.setopt(ch.HTTPHEADER, ['Accept: application/x-yaml']) ch.setopt(ch.POSTFIELDS, 'username=%s&password=%s&eauth=pam' %(settings.SALT_API_AUTH_USER, settings.SALT_API_AUTH_PASS)) #????? #ch.setopt(ch.HEADER, True) #?????? ch.setopt(ch.HEADER,False) ch.perform() html = info.getvalue() #??token token = html.split("\n")[-3].replace("\n", '') token = token.split(' ')[3] info.close() ch.close()
def api_key(fun='key.list_all', match='', arg_num=0): api_login() url = settings.MASTER_API_URL ch = pycurl.Curl() ch.setopt(ch.URL, url) info = StringIO.StringIO() ch.setopt(ch.WRITEFUNCTION, info.write) ch.setopt(ch.POST, True) ch.setopt(ch.SSL_VERIFYPEER, 0) ch.setopt(ch.SSL_VERIFYHOST, 2) ch.setopt(ch.HTTPHEADER, ['Accept: application/json', "X-Auth-Token: %s" %(token)]) if arg_num == 0: ch.setopt(ch.POSTFIELDS, "client=wheel&fun=%s" %(fun)) elif arg_num == 1: ch.setopt(ch.POSTFIELDS, "client=wheel&fun=%s&match=%s" %(fun, match)) else: pass ch.setopt(ch.HEADER,False) ch.perform() html = info.getvalue() info.close() ch.close() return json.loads(html)
def put (url, data, headers={}): """Make a PUT request to the url, using data in the message body, with the additional headers, if any""" reply = -1 # default, non-http response curl = pycurl.Curl() curl.setopt(pycurl.URL, url) if len(headers) > 0: curl.setopt(pycurl.HTTPHEADER, [k+': '+v for k,v in headers.items()]) curl.setopt(pycurl.PUT, 1) curl.setopt(pycurl.INFILESIZE, len(data)) databuffer = StringIO(data) curl.setopt(pycurl.READFUNCTION, databuffer.read) try: curl.perform() reply = curl.getinfo(pycurl.HTTP_CODE) except Exception: pass curl.close() return reply
def request(self, endpoint, post=None): buffer = BytesIO() ch = pycurl.Curl() ch.setopt(pycurl.URL, Constants.API_URL + endpoint) ch.setopt(pycurl.USERAGENT, self.userAgent) ch.setopt(pycurl.WRITEFUNCTION, buffer.write) ch.setopt(pycurl.FOLLOWLOCATION, True) ch.setopt(pycurl.HEADER, True) ch.setopt(pycurl.VERBOSE, False) ch.setopt(pycurl.COOKIEFILE, os.path.join(self.IGDataPath, self.username, self.username + "-cookies.dat")) ch.setopt(pycurl.COOKIEJAR, os.path.join(self.IGDataPath, self.username, self.username + "-cookies.dat")) if post is not None: ch.setopt(pycurl.POST, True) ch.setopt(pycurl.POSTFIELDS, post) if self.proxy: ch.setopt(pycurl.PROXY, self.proxyHost) if self.proxyAuth: ch.setopt(pycurl.PROXYUSERPWD, self.proxyAuth) ch.perform() resp = buffer.getvalue() header_len = ch.getinfo(pycurl.HEADER_SIZE) header = resp[0: header_len] body = resp[header_len:] ch.close() if self.debug: print("REQUEST: " + endpoint) if post is not None: if not isinstance(post, list): print("DATA: " + str(post)) print("RESPONSE: " + body) return [header, json_decode(body)]
def CurlPOST(url, data, cookie): c = pycurl.Curl() b = StringIO.StringIO() c.setopt(pycurl.URL, url) c.setopt(pycurl.POST, 1) c.setopt(pycurl.HTTPHEADER,['Content-Type: application/json']) # c.setopt(pycurl.TIMEOUT, 10) c.setopt(pycurl.WRITEFUNCTION, b.write) c.setopt(pycurl.COOKIEFILE, cookie) c.setopt(pycurl.COOKIEJAR, cookie) c.setopt(pycurl.POSTFIELDS, data) c.perform() html = b.getvalue() b.close() c.close() return html
def get_connection(): # pycurl initialization h = pycurl.Curl() # follow redirects h.setopt(pycurl.FOLLOWLOCATION, False) # enable compression h.setopt(pycurl.ENCODING, 'gzip, deflate') # certifi h.setopt(pycurl.CAINFO, certifi.where()) # no signal h.setopt(pycurl.NOSIGNAL, 1) # certificate informations h.setopt(pycurl.OPT_CERTINFO, 1) return h
def ipv6_asn(ifname): try: c = pycurl.Curl() body = BytesIO() c.setopt(c.URL, "https://stat.ripe.net/data/prefix-overview/data.json?resource={}" .format(ipv6_address_public(ifname))) c.setopt(c.INTERFACE, ifname) c.setopt(c.WRITEDATA, body) c.perform() asns = json.loads((body.getvalue()).decode('utf-8'))['data']['asns'] if len(asns) == 1: return asns[0]['asn'] else: return None except pycurl.error: return None
def __authorize(self): headers = ["Authorization: Basic " + self.user_info_base64] post_field = urlencode(self.parameter) buf = BytesIO() client = pycurl.Curl() client.setopt(client.URL, self.spotify_authorize_url) client.setopt(client.HTTPHEADER, headers) client.setopt(client.POSTFIELDS, post_field) client.setopt(client.WRITEFUNCTION, buf.write) client.setopt(pycurl.SSL_VERIFYPEER, 0) client.perform() client.close() self.response = json.loads(buf.getvalue().decode()) buf.close() self.access_token = self.response.get("access_token")
def perform(self): self.__performHead="" self.__performBody="" self.__headersSent="" try: conn = Request.to_pycurl_object(pycurl.Curl(), self) conn.perform() self.response_from_conn_object(conn, self.__performHead, self.__performBody) except pycurl.error, error: errno, errstr = error raise ReqRespException(ReqRespException.FATAL, errstr) finally: conn.close() ######### ESTE conjunto de funciones no es necesario para el uso habitual de la clase
def _curl(self, payload, computer_id, exchange_token, message_api): # There are a few "if _PY3" checks below, because for Python 3 we # want to convert a number of values from bytes to string, before # assigning them to the headers. if _PY3 and isinstance(message_api, bytes): message_api = message_api.decode("ascii") headers = {"X-Message-API": message_api, "User-Agent": "landscape-client/%s" % VERSION, "Content-Type": "application/octet-stream"} if computer_id: if _PY3 and isinstance(computer_id, bytes): computer_id = computer_id.decode("ascii") headers["X-Computer-ID"] = computer_id if exchange_token: if _PY3 and isinstance(exchange_token, bytes): exchange_token = exchange_token.decode("ascii") headers["X-Exchange-Token"] = str(exchange_token) curl = pycurl.Curl() return (curl, fetch(self._url, post=True, data=payload, headers=headers, cainfo=self._pubkey, curl=curl))
def get_user_init(self): ''' Description: (re)initialize the nonce of current session and fetch the session ID Return: { "success": true, "jsessionid": "3185591CD191F18D1551440AE1BEF86A-n1.frontend3", "nonce": "GTPSLZUcDyjEBqeL" } ''' uri = "user/init" api_url = self.url + uri c = pycurl.Curl() output_init = BytesIO() c.setopt(c.URL, api_url) ### Create the cookie File c.setopt(pycurl.COOKIEJAR, 'cookie.txt') c.setopt(c.WRITEFUNCTION, output_init.write) c.perform() return json.loads(output_init.getvalue())
def get_user_login(self, token): ''' Description: login ''' self.token = token uri = "user/login?username="+self.login+"&token="+self.token api_url = self.url + uri c = pycurl.Curl() output = BytesIO() c.setopt(c.URL, api_url) ### Read the cookie File c.setopt(pycurl.COOKIEFILE, 'cookie.txt') c.setopt(c.WRITEFUNCTION, output.write) c.perform() return json.loads(output.getvalue())
def get_all_virtual_endpoints(self): ''' Description: get all virtual endpoints ''' uri = "virtualEndpoints" api_url = self.url + uri c = pycurl.Curl() output_init = BytesIO() c.setopt(c.URL, api_url) ### Create the cookie File c.setopt(pycurl.COOKIEFILE, 'cookie.txt') c.setopt(c.WRITEFUNCTION, output_init.write) c.perform() return json.loads(output_init.getvalue())
def create_virtual_endpoints(self, data, category): ''' Description: create a virtual endpoints category: SENSOR METER GAUGE ONOFF LEVEL_CONTROL ''' self.data = data self.category = category uri = "virtualEndpoints/?category=" + self.category api_url = self.url + uri c = pycurl.Curl() c.setopt(pycurl.URL, api_url) c.setopt(pycurl.HTTPHEADER, ['Accept: application/json','Content-Type: application/json','charset=UTF-8']) c.setopt(pycurl.COOKIEFILE, 'cookie.txt') c.setopt(pycurl.POST, 1) c.setopt(pycurl.POSTFIELDS, self.data) c.setopt(pycurl.VERBOSE, 1) c.perform()
def get_virtual_endpoints_config(self, uuid): ''' Description: get virtual endpoints config ''' self.uuid = uuid uri = "virtualEndpoints/"+self.uuid+"/config" api_url = self.url + uri c = pycurl.Curl() output_init = BytesIO() c.setopt(c.URL, api_url) ### Create the cookie File c.setopt(pycurl.COOKIEFILE, 'cookie.txt') c.setopt(c.WRITEFUNCTION, output_init.write) c.perform() return json.loads(output_init.getvalue())
def create_rooms(self, data): ''' Description: create a room ''' self.data = data uri = "rooms/" api_url = self.url + uri c = pycurl.Curl() c.setopt(pycurl.URL, api_url) c.setopt(pycurl.HTTPHEADER, ['Accept: application/json','Content-Type: application/json','charset=UTF-8']) c.setopt(pycurl.COOKIEFILE, 'cookie.txt') c.setopt(pycurl.POST, 1) c.setopt(pycurl.POSTFIELDS, self.data) c.setopt(pycurl.VERBOSE, 1) c.perform()
def put_attributes_config(self, data, uuid): ''' Description: modify an attribute ''' self.data = data self.uuid = uuid uri = "attributes/" + self.uuid + "/config" api_url = self.url + uri c = pycurl.Curl() c.setopt(pycurl.URL, api_url) c.setopt(pycurl.HTTPHEADER, ['Accept: application/json','Content-Type: application/json','charset=UTF-8']) c.setopt(pycurl.COOKIEFILE, 'cookie.txt') c.setopt(pycurl.POST, 1) c.setopt(pycurl.POSTFIELDS, self.data) c.setopt(pycurl.VERBOSE, 1) c.perform()
def save_and_synchronize(self, wait="false", timeout=30): ''' Description: synchronize Zipato with the Server ''' self.wait = wait self.timeout = timeout uri = "box/saveAndSynchronize?wait=" + self.wait + "&timeout=" + str(self.timeout) api_url = self.url + uri c = pycurl.Curl() output_init = BytesIO() c.setopt(c.URL, api_url) ### Create the cookie File c.setopt(pycurl.COOKIEFILE, 'cookie.txt') c.setopt(c.WRITEFUNCTION, output_init.write) c.perform() c.close() return json.loads(output_init.getvalue())
def synchronize(self, ifneeded="false", wait="false", timeout=30): ''' Description: synchronize Zipato with the Server ''' self.ifneeded = ifneeded self.wait = wait self.timeout = timeout uri = "box/synchronize?ifNeeded=" + self.ifneeded + "wait=" + self.wait + "&timeout=" + str(self.timeout) api_url = self.url + uri c = pycurl.Curl() output_init = BytesIO() c.setopt(c.URL, api_url) ### Create the cookie File c.setopt(pycurl.COOKIEFILE, 'cookie.txt') c.setopt(c.WRITEFUNCTION, output_init.write) c.perform() c.close() return json.loads(output_init.getvalue())
def get_orcid_token(): #set request variables client_id = config.orcid_client_id client_secret = config.orcid_client_secret token_endpoint = config.token_endpoint data = BytesIO() #create post data post_data = {'client_id': client_id, 'client_secret': client_secret, 'scope': '/read-public', 'grant_type': 'client_credentials'} #url encode post data postfields = urllib.urlencode(post_data) #create and send http request c = pycurl.Curl() c.setopt(c.URL, token_endpoint) c.setopt(c.HTTPHEADER, ['Accept: application/json']) c.setopt(c.POSTFIELDS, postfields) c.setopt(c.WRITEFUNCTION, data.write) c.perform() c.close() #get request response json_object = json.loads(data.getvalue()) token = json_object['access_token'] return token
def get_request(url, outfpath=None): global PYCURL if PYCURL: # outfpath must be set import pycurl from io import BytesIO buffer = BytesIO() c = pycurl.Curl() c.setopt(c.URL, url) c.setopt(c.WRITEDATA, buffer) c.setopt(c.COOKIEJAR, '/tmp/cookie.jar') c.setopt(c.NETRC, True) c.setopt(c.FOLLOWLOCATION, True) #c.setopt(c.REMOTE_NAME, outfpath) c.perform() c.close() return buffer.getvalue() resp = requests.get(url) return resp.text
def __init__(self, p_request, p_timeoutMs = 1000, p_curlOpts=None): if isinstance(p_request, str): p_request = HTTPRequest(p_url=p_request) self.m_request = p_request self.m_timeoutMs = p_timeoutMs self.m_response = None self.m_handle = None self.m_data = None self.m_headers = None self.m_handle = pycurl.Curl() self.m_opts = p_curlOpts if p_curlOpts is None: self.m_opts = {} self.cleanup() self._init_opt() self._init_url() self._init_method() self._init_headers() self.m_handle.setopt(pycurl.USERAGENT, self.m_request.m_agent) self.m_handle.setopt(pycurl.HEADERFUNCTION, self._read_header) if self.m_timeoutMs: self.m_handle.setopt(pycurl.TIMEOUT_MS, self.m_timeoutMs) self.m_handle.setopt(pycurl.FOLLOWLOCATION, True)
def use_cloud(token): fp = wave.open('output.wav','r') nf = fp.getnframes() f_len = nf * 2 audio_data = fp.readframes(nf) cuid = "123456" #my xiaomi phone MAC srv_url = 'http://vop.baidu.com/server_api' + '?cuid=' + cuid + '&token=' + token http_header = [ 'Content-Type: audio/pcm; rate=8000', 'Content-Length: %d' % f_len ] print srv_url c = pycurl.Curl() c.setopt(pycurl.URL, str(srv_url)) #curl doesn't support unicode c.setopt(c.HTTPHEADER, http_header) #must be list, not dict c.setopt(c.POST, 1) c.setopt(c.CONNECTTIMEOUT, 30) c.setopt(c.TIMEOUT, 30) c.setopt(c.WRITEFUNCTION, dump_res) c.setopt(c.POSTFIELDS, audio_data) c.setopt(c.POSTFIELDSIZE, f_len) c.perform()
def use_cloud(token): fp = wave.open('output.wav', 'rb') nf = fp.getnframes() f_len = nf * 2 audio_data = fp.readframes(nf) cuid = "xxxxxxxxxx" #my xiaomi phone MAC srv_url = 'http://vop.baidu.com/server_api' + '?cuid=' + cuid + '&token=' + token http_header = [ 'Content-Type: audio/pcm; rate=8000', 'Content-Length: %d' % f_len ] print srv_url c = pycurl.Curl() c.setopt(pycurl.URL, str(srv_url)) #curl doesn't support unicode #c.setopt(c.RETURNTRANSFER, 1) c.setopt(c.HTTPHEADER, http_header) #must be list, not dict c.setopt(c.POST, 1) c.setopt(c.CONNECTTIMEOUT, 30) c.setopt(c.TIMEOUT, 30) c.setopt(c.WRITEFUNCTION, dump_res) c.setopt(c.POSTFIELDS, audio_data) c.setopt(c.POSTFIELDSIZE, f_len) c.perform() #pycurl.perform() has no return val
def use_cloud(token): fp = wave.open('output.wav','r') nf = fp.getnframes() f_len = nf * 2 audio_data = fp.readframes(nf) cuid = "123456" #my xiaomi phone MAC srv_url = 'http://vop.baidu.com/server_api' + '?cuid=' + cuid + '&token=' + token http_header = [ 'Content-Type: audio/pcm; rate=8000', 'Content-Length: %d' % f_len ] c = pycurl.Curl() c.setopt(pycurl.URL, str(srv_url)) #curl doesn't support unicode c.setopt(c.HTTPHEADER, http_header) #must be list, not dict c.setopt(c.POST, 1) c.setopt(c.CONNECTTIMEOUT, 30) c.setopt(c.TIMEOUT, 30) c.setopt(c.WRITEFUNCTION, dump_res) c.setopt(c.POSTFIELDS, audio_data) c.setopt(c.POSTFIELDSIZE, f_len) c.perform()
def _get_url(self, url): if self.API_TOKEN == None: logging.error('none token') # 3 For ERROR level return try: c = pycurl.Curl() c.setopt(pycurl.CAINFO, certifi.where()) c.setopt(pycurl.URL, url) b = StringIO.StringIO() c.setopt(pycurl.WRITEFUNCTION, b.write) c.setopt(pycurl.USERAGENT, "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; .NET CLR 1.1.4322)") c.setopt(pycurl.HTTPHEADER, ['Authorization: JWT %s' % self.API_TOKEN.encode()]) c.setopt(pycurl.CUSTOMREQUEST, "GET") c.setopt(pycurl.FOLLOWLOCATION, 1) c.perform() result = b.getvalue() logging.debug('result') except Exception as e: logging.error(e.message) logging.error('go error') pass return result
def use_cloud(token): fp=wave.open(u'01.wav','rb') nf=fp.getnframes() print 'sampwidth:',fp.getnframes() print 'framerate:',fp.getframerate() print 'channels:',fp.getnchannels() f_len=nf*2 audio_data=fp.readframes(nf) cuid="10:2A:B3:58:28:88" #my redmi phone MAC srv_url='http://vop.baidu.com/server_api'+'?cuid='+cuid+'&token='+token http_header=[ 'Content-Type:audio/pcm; rate=8000', 'Content-length:%d' % f_len ] c=pycurl.Curl() c.setopt(pycurl.URL,str(srv_url)) c.setopt(c.HTTPHEADER,http_header) c.setopt(c.CONNECTTIMEOUT,80) c.setopt(c.TIMEOUT,80) c.setopt(c.WRITEFUNCTION,dump_res) c.setopt(c.POSTFIELDS,audio_data) c.setopt(c.POSTFIELDSIZE,f_len) c.perform() #pycurl.perform() has no return val
def voice_to_text(self, voice_file, callback): # print 'xdc::::::::voice:::::', voice_file, callback fp = wave.open(voice_file, 'rb') nf = fp.getnframes() f_len = nf * 2 audio_data = fp.readframes(nf) cuid = "xxxxxxxxxx" #my xiaomi phone MAC srv_url = 'http://vop.baidu.com/server_api' + '?cuid=' + cuid + '&token=' + self.get_token() http_header = [ 'Content-Type: audio/pcm; rate=8000', 'Content-Length: %d' % f_len ] c = pycurl.Curl() c.setopt(pycurl.URL, str(srv_url)) #curl doesn't support unicode #c.setopt(c.RETURNTRANSFER, 1) c.setopt(c.HTTPHEADER, http_header) #must be list, not dict c.setopt(c.POST, 1) c.setopt(c.CONNECTTIMEOUT, 30) c.setopt(c.TIMEOUT, 30) c.setopt(c.WRITEFUNCTION, callback) c.setopt(c.POSTFIELDS, audio_data) c.setopt(c.POSTFIELDSIZE, f_len) c.perform() #pycurl.perform() has no return val
def __init__(self, uploader): self.handle = pycurl.Curl() self.response_headers = {} self.output = six.StringIO() self.status_code = None self.handle.setopt(pycurl.CAINFO, certifi.where()) self.handle.setopt(pycurl.URL, uploader.url) self.handle.setopt(pycurl.HEADERFUNCTION, self._prepare_response_header) self.handle.setopt(pycurl.UPLOAD, 1) self.handle.setopt(pycurl.CUSTOMREQUEST, 'PATCH') self.file = uploader.get_file_stream() self.file.seek(uploader.offset) self.handle.setopt(pycurl.READFUNCTION, self.file.read) self.handle.setopt(pycurl.WRITEFUNCTION, self.output.write) self.handle.setopt(pycurl.INFILESIZE, uploader.request_length) headers = ["upload-offset: {}".format(uploader.offset), "Content-Type: application/offset+octet-stream"] + uploader.headers_as_list self.handle.setopt(pycurl.HTTPHEADER, headers)
def transfer(ipaddr, username, password, commandfile): #transfers commandfile to camera storage = StringIO() c = pycurl.Curl() c.setopt(c.URL, 'http://' + ipaddr + '/admin/remoteconfig') c.setopt(c.POST, 1) c.setopt(c.CONNECTTIMEOUT, 5) c.setopt(c.TIMEOUT, TIMEOUT) filesize = os.path.getsize(commandfile) f = open(commandfile, 'rb') c.setopt(c.FAILONERROR, True) c.setopt(pycurl.POSTFIELDSIZE, filesize) c.setopt(pycurl.READFUNCTION, FileReader(f).read_callback) c.setopt(c.WRITEFUNCTION, storage.write) c.setopt(pycurl.HTTPHEADER, ["application/x-www-form-urlencoded"]) c.setopt(c.VERBOSE, VERBOSE) c.setopt(pycurl.HTTPAUTH, pycurl.HTTPAUTH_BASIC) c.setopt(pycurl.USERPWD, username + ':' + password) try: c.perform() except pycurl.error, error: errno, errstr = error print 'An error occurred: ', errstr return False, '' c.close() content = storage.getvalue() f.close() return True, content # *************************************************************** # *** Main program *** # ***************************************************************
def get_page_data(url, head = None, curl = None): stream_buffer = StringIO() if not curl: curl = pycurl.Curl() curl.setopt(pycurl.URL, url)#curl doesn't support unicode if head: curl.setopt(pycurl.HTTPHEADER, head)#must be list, not dict curl.setopt(pycurl.WRITEFUNCTION, stream_buffer.write) curl.setopt(pycurl.CUSTOMREQUEST,"GET") curl.setopt(pycurl.CONNECTTIMEOUT, 30) curl.setopt(pycurl.TIMEOUT, 30) curl.setopt(pycurl.SSL_VERIFYPEER, 0) curl.setopt(pycurl.SSL_VERIFYHOST, 0) curl.perform() page_data =stream_buffer.getvalue() stream_buffer.close() return page_data
def post_page_data(url, data = None, head = None, curl = None): stream_buffer = StringIO() if not curl: curl = pycurl.Curl() curl.setopt(pycurl.URL, url)#curl doesn't support unicode if head: curl.setopt(pycurl.HTTPHEADER, head)#must be list, not dict curl.setopt(pycurl.POSTFIELDS, data) curl.setopt(pycurl.WRITEFUNCTION, stream_buffer.write) curl.setopt(pycurl.CUSTOMREQUEST,"POST") # curl.setopt(pycurl.CONNECTTIMEOUT, 30) # curl.setopt(pycurl.TIMEOUT, 30) curl.perform() page_data = stream_buffer.getvalue() stream_buffer.close() return page_data
def getlat4city(): avglat = -1 sp_url = "http://www.super-ping.com/ping.php?node=" + CITY + "&ping=" + WAN_IP sp_refer_url = "http://www.super-ping.com/?ping=" + WAN_IP + "&locale=en" sp_http_headers = [ 'Referer: ' + sp_refer_url, 'X-Requested-With: XMLHttpRequest'] crl = pyc.Curl() sio = StringIO() crl.setopt(pyc.URL, sp_url) crl.setopt(pyc.HTTPHEADER, sp_http_headers) crl.setopt(pyc.WRITEFUNCTION, sio.write) crl.perform() crl.close() lat_http_result = sio.getvalue() #process http result only if html if lat_http_result.strip() != "-" and lat_http_result.strip() != "super-ping.com": fstring="ping-avg'>" lstring="</div>" start = lat_http_result.index(fstring) + len(fstring) end = lat_http_result.index(lstring,start) avglat = lat_http_result[start:end] return float(avglat)
def getWant(line):#?????? keyword = keyword_list[line] url = url_list[line] try: c = pycurl.Curl() c.setopt(c.URL,url) c.setopt(c.CONNECTTIMEOUT, 60) c.setopt(c.TIMEOUT,120) b = StringIO.StringIO() c.setopt(c.WRITEFUNCTION,b.write) c.perform() html = b.getvalue() mutex.acquire() global match global all global percentage if(getIfmatch(html)): match += 1 else: pass all += 1 print 'all: '+str(all)+' match: '+str(match)+', percentage '+'%.1f'%((float(match)/all)*100)+'%' mutex.release() except: print '%s Empty reply from server' %keyword
def Curl(url,headers): while 1: try: c = pycurl.Curl() c.setopt(pycurl.REFERER, 'http://weixin.sogou.com/') c.setopt(pycurl.FOLLOWLOCATION, True) c.setopt(pycurl.MAXREDIRS,5) c.setopt(pycurl.CONNECTTIMEOUT, 60) c.setopt(pycurl.TIMEOUT,120) c.setopt(pycurl.ENCODING, 'gzip,deflate') c.fp = StringIO.StringIO() c.setopt(pycurl.URL, url) c.setopt(pycurl.HTTPHEADER,headers) c.setopt(c.WRITEFUNCTION, c.fp.write) c.perform() html = c.fp.getvalue() if '??????' in html: print u'??????,??10??' time.sleep(600) else: return html except Exception, e: print url,'curl(url)',e continue #????????
def getArticleInfo(url): headers = [ "User-Agent:Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36", "Cookie:RK=EPnrvyKs33; pgv_pvi=9085208576; pac_uid=1_2273358437; noticeLoginFlag=1; ts_uid=864776808; ptcz=5eb5cdef0bba881b5fdad31c18353d40fd0242f5a873f155197343eaec8b3730; pt2gguin=o2273358437; o_cookie=2273358437; tvfe_boss_uuid=1a889f93cefc98b9; pgv_pvid=2616135824", ] html = Curl(url,headers) title = search('<title>(.*?)</title>',html).decode('utf-8','ignore') datetime = search('<em id="post-date" class="rich_media_meta rich_media_meta_text">(.*?)</em>',html).decode('utf-8','ignore') num_url= url.replace('http://mp.weixin.qq.com/s','http://mp.weixin.qq.com/mp/getcomment') +'&&uin=&key=&pass_ticket=&wxtoken=&devicetype=&clientversion=0&x5=0'#?????/????? num_html = Curl(num_url,headers) dict_weixin = eval(num_html) read_num = dict_weixin['read_num'] like_num = dict_weixin['like_num'] return title,read_num,like_num,datetime #?????
def curl(url, debug=False, **kwargs): while 1: try: s = StringIO.StringIO() c = pycurl.Curl() c.setopt(pycurl.URL, url) c.setopt(pycurl.REFERER, url) c.setopt(pycurl.FOLLOWLOCATION, True) c.setopt(pycurl.TIMEOUT, 60) c.setopt(pycurl.ENCODING, 'gzip') c.setopt(pycurl.USERAGENT, 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/31.0.1650.63 Safari/537.36') c.setopt(pycurl.NOSIGNAL, True) c.setopt(pycurl.WRITEFUNCTION, s.write) for k, v in kwargs.iteritems(): c.setopt(vars(pycurl)[k], v) c.perform() c.close() return s.getvalue() except: if debug: raise continue
def curl(source_url, is_post, cookie): """ ??pycurl?????WEB??????? :source_url: ??URL :is_post: ???POST :cookie: cookie """ buffer = BytesIO() c = pycurl.Curl() c.setopt(c.ENCODING, 'gzip,deflate') c.setopt(c.COOKIE, cookie) c.setopt(c.USERAGENT, 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.11; rv:42.0) Gecko/20100101 Firefox/42.0') try: if is_post == 2: # post url, query = source_url.split('?', 1) c.setopt(c.URL, url) c.setopt(c.POSTFIELDS, query) else: c.setopt(c.URL, source_url) c.setopt(c.WRITEDATA, buffer) c.perform() c.close() return buffer.getvalue() except: return ''
def close(self): """ Cleanup. """ for chunk in self.chunks: self.close_chunk(chunk) # Workaround: pycurl segfaults when closing multi, that never had # any curl handles if hasattr(self, 'manager'): with closing(pycurl.Curl()) as c: self.__manager.add_handle(c) self.__manager.remove_handle(c) self.chunks = [] if hasattr(self, 'manager'): self.__manager.close() del self.__manager if hasattr(self, "info"): del self.info
def __init__(self, *args, **kwargs): self.c = pycurl.Curl() Request.__init__(self, *args, **kwargs) self.rep = io.StringIO() self.last_url = None self.last_effective_url = None self.header = "" # cookiejar defines the context self.cj = self.context self.setopt(pycurl.WRITEFUNCTION, self.write) self.setopt(pycurl.HEADERFUNCTION, self.write_header) # TODO: Rename to curl
def _curl_a_link(self, target_url,post_target, commit_date=None): ''' ?????????????????????json??????????curl get??????????????????????????????????elastic??????? ''' buffer = StringIO() c = Curl() c.setopt(c.URL,target_url) c.setopt(c.WRITEDATA, buffer) c.perform() c.close() load_target=json.loads(buffer.getvalue()) return load_target pass
def test_gzip(url): t = Test() c = pycurl.Curl() c.setopt(pycurl.WRITEFUNCTION,t.callback) c.setopt(pycurl.ENCODING, 'gzip') c.setopt(pycurl.URL,url) c.setopt(pycurl.USERAGENT,"User-Agent':'EMAO_OPS_MONITOR) Gecko/20091201 Firefox/3.5.6)") c.perform() TOTAL_TIME = c.getinfo(c.TOTAL_TIME) #print "????????%.2f ms" %(TOTAL_TIME*1000) return TOTAL_TIME * 1000
def curl_read(url): try: c = pycurl.Curl() c.setopt(c.URL, url) resp = StringIO() headers = StringIO() c.setopt(c.WRITEFUNCTION, resp.write) c.setopt(c.HEADERFUNCTION, headers.write) c.setopt(pycurl.CONNECTTIMEOUT, 20) c.setopt(pycurl.TIMEOUT, 20) c.perform() if c.getinfo(c.RESPONSE_CODE) == 200: c.close() is_hit = handle_response(resp, headers) size = len(resp) return True, is_hit, size return False, False, 0 except: return False, False, 0
def end_all_async_unsafe(self): if not Config.RECORDING_ACTIVATED: return for rtmp_name in self._recording_rtmps: curl = pycurl.Curl() try: self._set_def_curl_opts(curl) curl.setopt(pycurl.URL, self._end_url(rtmp_name)) curl.setopt(pycurl.WRITEDATA, self._end_buffer) curl.perform() except pycurl.error as e: console.warning( 'Pycurl error in end_all() for racer <{0}>: Tried to curl <{1}>. Error {2}.'.format( rtmp_name, self._end_url(rtmp_name), e)) finally: curl.close() self._recording_rtmps.clear()
def _end_record_nolock(self, rtmp_name): rtmp_name = rtmp_name.lower() if rtmp_name not in self._recording_rtmps: return curl = pycurl.Curl() try: self._set_def_curl_opts(curl) curl.setopt(pycurl.URL, self._end_url(rtmp_name)) curl.setopt(pycurl.WRITEDATA, self._end_buffer) curl.perform() self._recording_rtmps = [r for r in self._recording_rtmps if r != rtmp_name] except pycurl.error as e: console.warning( 'Pycurl error in end_record({0}): Tried to curl <{1}>. Error {2}.'.format( rtmp_name, self._end_url(rtmp_name), e)) finally: curl.close()
def ccurl_setcookie(url): hdr = "Mozilla/5.0 (X11; Ubuntu; Linux i686; rv:45.0) Gecko/20100101 Firefox/45.0" c = pycurl.Curl() c.setopt(c.FOLLOWLOCATION, True) c.setopt(c.USERAGENT, hdr) c.setopt(c.COOKIEJAR, '/tmp/AnimeWatch/cookie.txt') url = str(url) c.setopt(c.URL, url) storage = BytesIO() c.setopt(c.WRITEDATA, storage) c.perform() c.close() content = storage.getvalue() content = getContentUnicode(content) return (content)