Python pycurl 模块,WRITEFUNCTION 实例源码

我们从 Python 开源项目中提取了以下 47 个代码示例,用于说明如何使用 pycurl.WRITEFUNCTION。

项目:Instagram-API    作者:danleyb2    | 项目源码 | 文件源码
def request(self, endpoint, post=None):
    """Send a request to ``Constants.API_URL + endpoint``.

    A GET is issued unless ``post`` is given, in which case the payload is
    sent as POST fields. Cookies are read from and written to
    ``<IGDataPath>/<username>/<username>-cookies.dat``.

    :param endpoint: path appended to ``Constants.API_URL``.
    :param post: optional POST payload handed to ``POSTFIELDS``.
    :return: two-element list ``[raw_header, json_decode(body)]``.
    :raises pycurl.error: if the transfer fails (the handle is still closed).
    """
    response = BytesIO()
    cookie_path = os.path.join(self.IGDataPath, self.username,
                               self.username + "-cookies.dat")

    ch = pycurl.Curl()
    try:
        ch.setopt(pycurl.URL, Constants.API_URL + endpoint)
        ch.setopt(pycurl.USERAGENT, self.userAgent)
        ch.setopt(pycurl.WRITEFUNCTION, response.write)
        ch.setopt(pycurl.FOLLOWLOCATION, True)
        # Headers are written into the same buffer as the body and split
        # apart afterwards using HEADER_SIZE.
        ch.setopt(pycurl.HEADER, True)
        ch.setopt(pycurl.VERBOSE, False)
        ch.setopt(pycurl.COOKIEFILE, cookie_path)
        ch.setopt(pycurl.COOKIEJAR, cookie_path)

        if post is not None:
            ch.setopt(pycurl.POST, True)
            ch.setopt(pycurl.POSTFIELDS, post)

        if self.proxy:
            ch.setopt(pycurl.PROXY, self.proxyHost)
            if self.proxyAuth:
                ch.setopt(pycurl.PROXYUSERPWD, self.proxyAuth)

        ch.perform()
        resp = response.getvalue()
        header_len = ch.getinfo(pycurl.HEADER_SIZE)
    finally:
        # Release the handle even when the transfer raises.
        ch.close()

    header = resp[:header_len]
    body = resp[header_len:]

    if self.debug:
        print("REQUEST: " + endpoint)
        if post is not None and not isinstance(post, list):
            print("DATA: " + str(post))
        # The buffer yields bytes; decode for display so the concatenation
        # also works where bytes and str are distinct types.
        shown = body if isinstance(body, str) else body.decode("utf-8", "replace")
        print("RESPONSE: " + shown)

    return [header, json_decode(body)]
项目:QQBot    作者:springhack    | 项目源码 | 文件源码
def CurlPOST(url, data, cookie):
    """POST ``data`` to ``url`` with a JSON content type and return the body.

    :param url: target URL.
    :param data: request body passed to ``POSTFIELDS``.
    :param cookie: path used both as cookie file and cookie jar.
    :return: the response body collected from the write callback.
    :raises pycurl.error: if the transfer fails; handle and buffer are
        released either way.
    """
    c = pycurl.Curl()
    b = StringIO.StringIO()
    try:
        c.setopt(pycurl.URL, url)
        c.setopt(pycurl.POST, 1)
        c.setopt(pycurl.HTTPHEADER, ['Content-Type: application/json'])
        c.setopt(pycurl.WRITEFUNCTION, b.write)
        c.setopt(pycurl.COOKIEFILE, cookie)
        c.setopt(pycurl.COOKIEJAR, cookie)
        c.setopt(pycurl.POSTFIELDS, data)
        c.perform()
        return b.getvalue()
    finally:
        b.close()
        c.close()
项目:landscape-client    作者:CanonicalLtd    | 项目源码 | 文件源码
def test_post(self):
    """fetch() with post=True returns the stub result and sets pycurl.POST."""
    expected_options = {
        pycurl.URL: b"http://example.com",
        pycurl.FOLLOWLOCATION: 1,
        pycurl.MAXREDIRS: 5,
        pycurl.CONNECTTIMEOUT: 30,
        pycurl.LOW_SPEED_LIMIT: 1,
        pycurl.LOW_SPEED_TIME: 600,
        pycurl.NOSIGNAL: 1,
        pycurl.WRITEFUNCTION: Any(),
        pycurl.POST: True,
        pycurl.DNS_CACHE_TIMEOUT: 0,
        pycurl.ENCODING: b"gzip,deflate",
    }
    stub = CurlStub(b"result")

    fetched = fetch("http://example.com", post=True, curl=stub)

    self.assertEqual(fetched, b"result")
    self.assertEqual(stub.options, expected_options)
项目:landscape-client    作者:CanonicalLtd    | 项目源码 | 文件源码
def test_post_data(self):
    """fetch() with data installs a read callback and the payload size."""
    expected_options = {
        pycurl.URL: b"http://example.com",
        pycurl.FOLLOWLOCATION: 1,
        pycurl.MAXREDIRS: 5,
        pycurl.CONNECTTIMEOUT: 30,
        pycurl.LOW_SPEED_LIMIT: 1,
        pycurl.LOW_SPEED_TIME: 600,
        pycurl.NOSIGNAL: 1,
        pycurl.WRITEFUNCTION: Any(),
        pycurl.POST: True,
        pycurl.POSTFIELDSIZE: 4,
        pycurl.READFUNCTION: Any(),
        pycurl.DNS_CACHE_TIMEOUT: 0,
        pycurl.ENCODING: b"gzip,deflate",
    }
    stub = CurlStub(b"result")

    fetched = fetch("http://example.com", post=True, data="data", curl=stub)

    self.assertEqual(fetched, b"result")
    # The registered read callback must hand back the payload as bytes.
    read_callback = stub.options[pycurl.READFUNCTION]
    self.assertEqual(read_callback(), b"data")
    self.assertEqual(stub.options, expected_options)
项目:landscape-client    作者:CanonicalLtd    | 项目源码 | 文件源码
def test_cainfo(self):
    """fetch() with cainfo passes it on as pycurl.CAINFO in bytes form."""
    expected_options = {
        pycurl.URL: b"https://example.com",
        pycurl.FOLLOWLOCATION: 1,
        pycurl.MAXREDIRS: 5,
        pycurl.CONNECTTIMEOUT: 30,
        pycurl.LOW_SPEED_LIMIT: 1,
        pycurl.LOW_SPEED_TIME: 600,
        pycurl.NOSIGNAL: 1,
        pycurl.WRITEFUNCTION: Any(),
        pycurl.CAINFO: b"cainfo",
        pycurl.DNS_CACHE_TIMEOUT: 0,
        pycurl.ENCODING: b"gzip,deflate",
    }
    stub = CurlStub(b"result")

    fetched = fetch("https://example.com", cainfo="cainfo", curl=stub)

    self.assertEqual(fetched, b"result")
    self.assertEqual(stub.options, expected_options)
项目:landscape-client    作者:CanonicalLtd    | 项目源码 | 文件源码
def test_timeouts(self):
    """connect_timeout and total_timeout override the timeout options.

    The expected dict shows connect_timeout landing in CONNECTTIMEOUT and
    total_timeout in LOW_SPEED_TIME.
    """
    expected_options = {
        pycurl.URL: b"http://example.com",
        pycurl.FOLLOWLOCATION: 1,
        pycurl.MAXREDIRS: 5,
        pycurl.CONNECTTIMEOUT: 5,
        pycurl.LOW_SPEED_LIMIT: 1,
        pycurl.LOW_SPEED_TIME: 30,
        pycurl.NOSIGNAL: 1,
        pycurl.WRITEFUNCTION: Any(),
        pycurl.DNS_CACHE_TIMEOUT: 0,
        pycurl.ENCODING: b"gzip,deflate",
    }
    stub = CurlStub(b"result")

    fetched = fetch("http://example.com", connect_timeout=5,
                    total_timeout=30, curl=stub)

    self.assertEqual(fetched, b"result")
    self.assertEqual(stub.options, expected_options)
项目:landscape-client    作者:CanonicalLtd    | 项目源码 | 文件源码
def test_pycurl_insecure(self):
    """fetch() with insecure=True sets pycurl.SSL_VERIFYPEER to False."""
    expected_options = {
        pycurl.URL: b"http://example.com/get-ca-cert",
        pycurl.FOLLOWLOCATION: 1,
        pycurl.MAXREDIRS: 5,
        pycurl.CONNECTTIMEOUT: 30,
        pycurl.LOW_SPEED_LIMIT: 1,
        pycurl.LOW_SPEED_TIME: 600,
        pycurl.NOSIGNAL: 1,
        pycurl.WRITEFUNCTION: Any(),
        pycurl.SSL_VERIFYPEER: False,
        pycurl.DNS_CACHE_TIMEOUT: 0,
        pycurl.ENCODING: b"gzip,deflate",
    }
    stub = CurlStub(b"result")

    fetched = fetch("http://example.com/get-ca-cert", curl=stub,
                    insecure=True)

    self.assertEqual(fetched, b"result")
    self.assertEqual(stub.options, expected_options)
项目:GOKU    作者:bingweichen    | 项目源码 | 文件源码
def postXmlSSL(self, xml, url, second=30, cert=True, post=True):
    """Send ``xml`` to ``url`` through ``self.curl`` and return the response.

    :param xml: request body, sent as POST fields when ``post`` is true.
    :param url: target URL.
    :param second: value for the handle's TIMEOUT option.
    :param cert: when true, configure the PEM client key and certificate
        from ``WxPayConf_pub``.
    :param post: when true, issue the request as a POST carrying ``xml``.
    :return: everything the write callback collected for this request.
    """
    handle = self.curl
    handle.setopt(pycurl.URL, url)
    handle.setopt(pycurl.TIMEOUT, second)

    if cert:
        # Both the client key and the client certificate are PEM files.
        handle.setopt(pycurl.SSLKEYTYPE, "PEM")
        handle.setopt(pycurl.SSLKEY, WxPayConf_pub.SSLKEY_PATH)
        handle.setopt(pycurl.SSLCERTTYPE, "PEM")
        handle.setopt(pycurl.SSLCERT, WxPayConf_pub.SSLCERT_PATH)

    if post:
        handle.setopt(pycurl.POST, True)
        handle.setopt(pycurl.POSTFIELDS, xml)

    # A fresh buffer per call; the handle itself is reused across calls.
    collected = StringIO()
    handle.setopt(pycurl.WRITEFUNCTION, collected.write)

    handle.perform()
    return collected.getvalue()
项目:ZPoc    作者:zidier215    | 项目源码 | 文件源码
def _get_url(self, url):
    """GET ``url`` with the configured JWT token and return the body.

    :param url: URL to fetch.
    :return: the response body, or ``None`` when no token is configured
        or the request fails (failures are logged, not raised).
    """
    if self.API_TOKEN is None:
        logging.error('none token')
        return None

    # Initialised up front so the failure path returns None instead of
    # hitting an unbound local.
    result = None
    c = None
    try:
        c = pycurl.Curl()
        c.setopt(pycurl.CAINFO, certifi.where())
        c.setopt(pycurl.URL, url)
        b = StringIO.StringIO()
        c.setopt(pycurl.WRITEFUNCTION, b.write)
        c.setopt(pycurl.USERAGENT, "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; .NET CLR 1.1.4322)")
        c.setopt(pycurl.HTTPHEADER, ['Authorization: JWT %s' % self.API_TOKEN.encode()])
        c.setopt(pycurl.CUSTOMREQUEST, "GET")
        c.setopt(pycurl.FOLLOWLOCATION, 1)
        c.perform()
        result = b.getvalue()
        logging.debug('result')
    except Exception as e:
        # Log the exception itself; ``e.message`` is not available on all
        # exception types or Python versions.
        logging.error(e)
        logging.error('go error')
    finally:
        if c is not None:
            c.close()
    return result
项目:tus-py-client    作者:tus    | 项目源码 | 文件源码
def __init__(self, uploader):
    """Build a curl handle configured to PATCH-upload ``uploader``'s file.

    The file stream is positioned at ``uploader.offset`` and used as the
    read callback; the response body goes to ``self.output`` and response
    headers go through ``self._prepare_response_header``.
    """
    handle = self.handle = pycurl.Curl()
    self.response_headers = {}
    self.output = six.StringIO()
    self.status_code = None

    handle.setopt(pycurl.CAINFO, certifi.where())
    handle.setopt(pycurl.URL, uploader.url)
    handle.setopt(pycurl.HEADERFUNCTION, self._prepare_response_header)
    handle.setopt(pycurl.UPLOAD, 1)
    handle.setopt(pycurl.CUSTOMREQUEST, 'PATCH')

    stream = self.file = uploader.get_file_stream()
    stream.seek(uploader.offset)
    handle.setopt(pycurl.READFUNCTION, stream.read)
    handle.setopt(pycurl.WRITEFUNCTION, self.output.write)
    handle.setopt(pycurl.INFILESIZE, uploader.request_length)

    offset_header = "upload-offset: {}".format(uploader.offset)
    content_type_header = "Content-Type: application/offset+octet-stream"
    request_headers = [offset_header, content_type_header] + uploader.headers_as_list
    handle.setopt(pycurl.HTTPHEADER, request_headers)
项目:python-Speech_Recognition    作者:zthxxx    | 项目源码 | 文件源码
def get_page_data(url, head=None, curl=None):
    """GET ``url`` and return the response body.

    :param url: URL to fetch.
    :param head: optional request headers; must be a list, not a dict.
    :param curl: optional existing handle to reuse. When omitted a handle
        is created for this call and closed before returning; a
        caller-supplied handle is left open.
    :return: the response body collected from the write callback.
    :raises pycurl.error: if the transfer fails.
    """
    owns_handle = not curl
    if owns_handle:
        curl = pycurl.Curl()
    stream_buffer = StringIO()
    try:
        curl.setopt(pycurl.URL, url)  # curl doesn't support unicode
        if head:
            curl.setopt(pycurl.HTTPHEADER, head)
        curl.setopt(pycurl.WRITEFUNCTION, stream_buffer.write)
        curl.setopt(pycurl.CUSTOMREQUEST, "GET")
        curl.setopt(pycurl.CONNECTTIMEOUT, 30)
        curl.setopt(pycurl.TIMEOUT, 30)
        # NOTE(review): certificate and host verification are switched off,
        # which exposes HTTPS requests to interception. Kept as-is because
        # callers may depend on it; confirm whether it is still required.
        curl.setopt(pycurl.SSL_VERIFYPEER, 0)
        curl.setopt(pycurl.SSL_VERIFYHOST, 0)
        curl.perform()
        return stream_buffer.getvalue()
    finally:
        stream_buffer.close()
        # Only release handles created here; a shared handle stays open.
        if owns_handle:
            curl.close()
项目:python-Speech_Recognition    作者:zthxxx    | 项目源码 | 文件源码
def post_page_data(url, data=None, head=None, curl=None):
    """POST ``data`` to ``url`` and return the response body.

    :param url: URL to post to.
    :param data: request body handed to ``POSTFIELDS``.
    :param head: optional request headers; must be a list, not a dict.
    :param curl: optional existing handle to reuse. When omitted a handle
        is created for this call and closed before returning; a
        caller-supplied handle is left open.
    :return: the response body collected from the write callback.
    :raises pycurl.error: if the transfer fails.
    """
    owns_handle = not curl
    if owns_handle:
        curl = pycurl.Curl()
    stream_buffer = StringIO()
    try:
        curl.setopt(pycurl.URL, url)  # curl doesn't support unicode
        if head:
            curl.setopt(pycurl.HTTPHEADER, head)
        curl.setopt(pycurl.POSTFIELDS, data)
        curl.setopt(pycurl.WRITEFUNCTION, stream_buffer.write)
        curl.setopt(pycurl.CUSTOMREQUEST, "POST")
        curl.perform()
        return stream_buffer.getvalue()
    finally:
        stream_buffer.close()
        # Only release handles created here; a shared handle stays open.
        if owns_handle:
            curl.close()
项目:check_wan_latency    作者:computingbee    | 项目源码 | 文件源码
def getlat4city():
    """Ask super-ping.com for the average latency from CITY to WAN_IP.

    :return: the value found between ``ping-avg'>`` and ``</div>`` in the
        response, as a float; ``-1.0`` when the stripped response is just
        ``"-"`` or ``"super-ping.com"``.
    :raises ValueError: if the response is neither of those placeholders
        and lacks the expected markers or a numeric value.
    :raises pycurl.error: if the transfer fails.
    """
    avglat = -1

    sp_url = "http://www.super-ping.com/ping.php?node=" + CITY + "&ping=" + WAN_IP
    sp_refer_url = "http://www.super-ping.com/?ping=" + WAN_IP + "&locale=en"
    sp_http_headers = ['Referer: ' + sp_refer_url, 'X-Requested-With: XMLHttpRequest']

    crl = pyc.Curl()
    sio = StringIO()
    try:
        crl.setopt(pyc.URL, sp_url)
        crl.setopt(pyc.HTTPHEADER, sp_http_headers)
        crl.setopt(pyc.WRITEFUNCTION, sio.write)
        crl.perform()
    finally:
        crl.close()

    # Only parse the result when it is real HTML rather than a placeholder.
    lat_http_result = sio.getvalue()
    stripped = lat_http_result.strip()
    if stripped != "-" and stripped != "super-ping.com":
        fstring = "ping-avg'>"
        lstring = "</div>"
        start = lat_http_result.index(fstring) + len(fstring)
        end = lat_http_result.index(lstring, start)
        avglat = lat_http_result[start:end]

    return float(avglat)
项目:hzlgithub    作者:hzlRises    | 项目源码 | 文件源码
def curl(url, debug=False, **kwargs):
    """Fetch ``url`` and return the response body, retrying until it works.

    :param url: URL to fetch; also sent as the Referer.
    :param debug: when true, the first failure is re-raised instead of
        being retried.
    :param kwargs: extra options, keyed by the pycurl constant name
        (e.g. ``TIMEOUT=10``), applied after the defaults.
    :return: the response body collected from the write callback.
    :raises AttributeError: if a keyword is not a pycurl attribute. This is
        checked once up front, so a typo no longer retries forever.
    """
    extra_options = [(getattr(pycurl, k), v) for k, v in kwargs.items()]

    while True:
        c = None
        try:
            s = StringIO.StringIO()
            c = pycurl.Curl()
            c.setopt(pycurl.URL, url)
            c.setopt(pycurl.REFERER, url)
            c.setopt(pycurl.FOLLOWLOCATION, True)
            c.setopt(pycurl.TIMEOUT, 60)
            c.setopt(pycurl.ENCODING, 'gzip')
            c.setopt(pycurl.USERAGENT, 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/31.0.1650.63 Safari/537.36')
            c.setopt(pycurl.NOSIGNAL, True)
            c.setopt(pycurl.WRITEFUNCTION, s.write)
            for option, value in extra_options:
                c.setopt(option, value)
            c.perform()
            return s.getvalue()
        except Exception:
            # ``Exception`` rather than a bare except, so KeyboardInterrupt
            # and SystemExit can still break out of the retry loop.
            if debug:
                raise
        finally:
            # Release the handle of every attempt, including failed ones.
            if c is not None:
                c.close()
项目:pyload-requests    作者:pyload    | 项目源码 | 文件源码
def __init__(self, *args, **kwargs):
        """Create the curl handle, run the base initializer, and hook up callbacks.

        All positional and keyword arguments are forwarded unchanged to
        ``Request.__init__``.
        """
        # The handle is created before the base initializer runs.
        # NOTE(review): presumably Request.__init__ relies on self.c
        # already existing; confirm before reordering.
        self.c = pycurl.Curl()
        Request.__init__(self, *args, **kwargs)

        # Text buffer plus bookkeeping for the last request.
        self.rep = io.StringIO()
        self.last_url = None
        self.last_effective_url = None
        self.header = ""

        # cookiejar defines the context
        self.cj = self.context

        # Route body and header data through this object's own callbacks.
        self.setopt(pycurl.WRITEFUNCTION, self.write)
        self.setopt(pycurl.HEADERFUNCTION, self.write_header)

    # TODO: Rename to curl
项目:multiplierz    作者:BlaisProteomics    | 项目源码 | 文件源码
def scan(self, time):
    """Return the scan whose time is closest to ``time`` as an ``mzScan``.

    The scan's text file is fetched with up to five attempts, stopping as
    soon as any data arrives. Each line is parsed into a tuple of floats.
    """
    def time_distance(scan_info):
        return abs(time - scan_info[0])

    scan_time, mz, scan_name, st, scan_mode = min(self.scans(), key=time_distance)

    self.crl.setopt(pycurl.HTTPGET, True)
    self.crl.setopt(pycurl.URL, str(scan_name + '.txt'))

    payload = cStringIO.StringIO()
    self.crl.setopt(pycurl.WRITEFUNCTION, payload.write)

    for _attempt in range(5):
        self.crl.perform()
        if payload.getvalue():
            break

    rows = payload.getvalue().splitlines()
    points = [tuple(float(field) for field in row.split()) for row in rows]

    # how to get charge for an mzURL scan?
    return mzScan(points, scan_time, mode=scan_mode, mz=mz)
项目:multiplierz    作者:BlaisProteomics    | 项目源码 | 文件源码
def xic(self, start_time, stop_time, start_mz, stop_mz, filter=None):
    """Fetch the ion chromatogram for a time and m/z window.

    The request is attempted up to five times, stopping as soon as any
    data arrives. ``filter`` is accepted but not used here.

    :return: list of tuples of floats, one per response line.
    """
    window_path = '/ric/%s-%s/%s-%s.txt' % (start_time, stop_time,
                                            start_mz, stop_mz)
    xic_url = str(self.data_file + window_path)

    self.crl.setopt(pycurl.HTTPGET, True)
    self.crl.setopt(pycurl.URL, xic_url)

    payload = cStringIO.StringIO()
    self.crl.setopt(pycurl.WRITEFUNCTION, payload.write)

    for _attempt in range(5):
        self.crl.perform()
        if payload.getvalue():
            break

    rows = payload.getvalue().splitlines()
    return [tuple(float(field) for field in row.split()) for row in rows]
项目:tools    作者:chenshiyang2015    | 项目源码 | 文件源码
def test_gzip(url):
    """Fetch ``url`` with gzip encoding and return the total time in ms.

    The body is handed to ``Test().callback``; only the timing is returned.

    :param url: URL to fetch.
    :return: ``TOTAL_TIME`` reported by curl, multiplied by 1000.
    :raises pycurl.error: if the transfer fails (the handle is still closed).
    """
    t = Test()
    c = pycurl.Curl()
    try:
        c.setopt(pycurl.WRITEFUNCTION, t.callback)
        c.setopt(pycurl.ENCODING, 'gzip')
        c.setopt(pycurl.URL, url)
        c.setopt(pycurl.USERAGENT, "User-Agent':'EMAO_OPS_MONITOR) Gecko/20091201 Firefox/3.5.6)")
        c.perform()
        # Must be read before the handle is closed.
        total_time = c.getinfo(c.TOTAL_TIME)
    finally:
        c.close()

    return total_time * 1000
项目:pymsf    作者:s0m30ne    | 项目源码 | 文件源码
def searchIP(self, query, pages, queue, STOP_ME):
    """Query the search API page by page and enqueue every matching IP.

    :param query: query string inserted into the request URL as-is.
    :param pages: number of result pages to fetch, starting at page 1.
    :param queue: object with ``put``; receives each host's ``"ip"`` value.
    :param STOP_ME: mutable sequence; element 0 is set to ``True`` once all
        pages have been processed.
    :raises SystemExit: if no API token is configured.
    """
    if self.API_TOKEN is None:
        # Parenthesised form is valid as both a statement and a function call.
        print("please config your API_TOKEN")
        sys.exit()

    for page in range(1, pages + 1):
        b = StringIO.StringIO()
        c = pycurl.Curl()
        try:
            c.setopt(pycurl.URL, "%s?query=%s&page=%s" % (self.API_URL, query, page))
            c.setopt(pycurl.WRITEFUNCTION, b.write)
            c.setopt(pycurl.FOLLOWLOCATION, 1)
            c.setopt(pycurl.CUSTOMREQUEST, "GET")
            c.setopt(pycurl.HTTPHEADER, ['Authorization: JWT %s' % self.API_TOKEN.encode()])
            c.perform()
        finally:
            # One handle per page; release it so handles do not pile up.
            c.close()

        hosts = json.loads(b.getvalue())
        for host in hosts['matches']:
            queue.put(host["ip"])

    STOP_ME[0] = True
项目:recipebook    作者:dpapathanasiou    | 项目源码 | 文件源码
def get (url, user_agent=UA, referrer=None):
    """Fetch ``url`` with a GET request via pycurl.

    Returns the response data, or None when the transfer raised."""

    sink = StringIO()
    handle = pycurl.Curl()

    handle.setopt(pycurl.URL, url)
    handle.setopt(pycurl.FOLLOWLOCATION, 1)
    handle.setopt(pycurl.CONNECTTIMEOUT, 5)
    handle.setopt(pycurl.TIMEOUT, 8)
    handle.setopt(pycurl.WRITEFUNCTION, sink.write)
    handle.setopt(pycurl.COOKIEFILE, '')
    if user_agent:
        handle.setopt(pycurl.USERAGENT, user_agent)
    if referrer is not None:
        handle.setopt(pycurl.REFERER, referrer)

    # Best effort: any failure during the transfer leaves the result at None.
    data = None
    try:
        handle.perform()
        data = sink.getvalue()
    except Exception:
        pass
    handle.close()

    return data
项目:Instagram-API    作者:danleyb2    | 项目源码 | 文件源码
def request(self, endpoint, headers=None, post=None, first=True):
    """Send a request to ``endpoint`` and return ``[header, decoded_body]``.

    A GET is issued unless ``post`` is truthy, in which case it is
    URL-encoded and sent as POST fields. Cookies are read from and written
    to ``<settingsPath><username>-cookies.dat``.

    :param endpoint: full request URL.
    :param headers: optional list of request header lines.
    :param post: optional mapping or sequence of pairs to URL-encode.
    :param first: unused in this method; kept for interface compatibility.
    :return: two-element list ``[raw_header, json_decode(body)]``.
    :raises pycurl.error: if the transfer fails (the handle is still closed).
    """
    # These helpers live in different modules depending on Python version.
    try:
        from urllib import urlencode, unquote_plus
    except ImportError:
        from urllib.parse import urlencode, unquote_plus

    response = BytesIO()
    cookie_path = self.settingsPath + self.username + '-cookies.dat'

    ch = pycurl.Curl()
    try:
        ch.setopt(pycurl.URL, endpoint)
        ch.setopt(pycurl.USERAGENT, self.userAgent)
        ch.setopt(pycurl.WRITEFUNCTION, response.write)
        ch.setopt(pycurl.FOLLOWLOCATION, True)
        # Headers share the buffer with the body; split via HEADER_SIZE below.
        ch.setopt(pycurl.HEADER, True)
        if headers:
            ch.setopt(pycurl.HTTPHEADER, headers)

        ch.setopt(pycurl.VERBOSE, self.debug)
        # NOTE(review): certificate and host verification are disabled,
        # which exposes the session to interception. Kept as-is; confirm
        # whether this is still required.
        ch.setopt(pycurl.SSL_VERIFYPEER, False)
        ch.setopt(pycurl.SSL_VERIFYHOST, False)
        ch.setopt(pycurl.COOKIEFILE, cookie_path)
        ch.setopt(pycurl.COOKIEJAR, cookie_path)

        if post:
            # POST is an on/off switch, not a field count.
            ch.setopt(pycurl.POST, True)
            ch.setopt(pycurl.POSTFIELDS, urlencode(post))

        ch.perform()
        resp = response.getvalue()
        header_len = ch.getinfo(pycurl.HEADER_SIZE)
    finally:
        # Release the handle even when the transfer raises.
        ch.close()

    header = resp[:header_len]
    body = resp[header_len:]

    if self.debug:
        print("REQUEST: " + endpoint)
        if post is not None and not isinstance(post, list):
            print('DATA: ' + unquote_plus(json.dumps(post)))
        # Decode for display where bytes and str are distinct types.
        shown = body if isinstance(body, str) else body.decode("utf-8", "replace")
        print("RESPONSE: " + shown + "\n")

    return [header, json_decode(body)]
项目:QQBot    作者:springhack    | 项目源码 | 文件源码
def CurlGET(url, cookie):
    """GET ``url`` using ``cookie`` as cookie file and jar; return the body.

    :param url: target URL.
    :param cookie: path used both as cookie file and cookie jar.
    :return: the response body collected from the write callback.
    :raises pycurl.error: if the transfer fails; handle and buffer are
        released either way.
    """
    c = pycurl.Curl()
    b = StringIO.StringIO()
    try:
        c.setopt(pycurl.URL, url)
        c.setopt(pycurl.WRITEFUNCTION, b.write)
        c.setopt(pycurl.COOKIEFILE, cookie)
        c.setopt(pycurl.COOKIEJAR, cookie)
        c.perform()
        return b.getvalue()
    finally:
        b.close()
        c.close()
项目:touch-pay-client    作者:HackPucBemobi    | 项目源码 | 文件源码
def request(self, url, method, body, headers):
    """Perform an HTTP request with pycurl.

    :param url: target URL.
    :param method: ``'POST'`` sends ``body`` as POST fields; any other
        value leaves the handle's default method in place.
    :param body: request payload (used for POST only).
    :param headers: mapping of header names to values, or a falsy value.
    :return: tuple ``({}, response_body)``; response headers are not
        collected.
    :raises pycurl.error: if the transfer fails (the handle is still closed).
    """
    c = pycurl.Curl()
    try:
        c.setopt(pycurl.URL, url)
        if 'proxy_host' in self.proxy:
            c.setopt(pycurl.PROXY, self.proxy['proxy_host'])
        if 'proxy_port' in self.proxy:
            c.setopt(pycurl.PROXYPORT, self.proxy['proxy_port'])
        if 'proxy_user' in self.proxy:
            c.setopt(pycurl.PROXYUSERPWD, "%(proxy_user)s:%(proxy_pass)s" % self.proxy)
        self.buf = StringIO()
        c.setopt(pycurl.WRITEFUNCTION, self.buf.write)
        # Peer and host verification are enabled only when a CA bundle is set.
        if self.cacert:
            c.setopt(c.CAINFO, self.cacert)
        c.setopt(pycurl.SSL_VERIFYPEER, 1 if self.cacert else 0)
        c.setopt(pycurl.SSL_VERIFYHOST, 2 if self.cacert else 0)
        c.setopt(pycurl.CONNECTTIMEOUT, self.timeout)
        c.setopt(pycurl.TIMEOUT, self.timeout)
        if method == 'POST':
            c.setopt(pycurl.POST, 1)
            c.setopt(pycurl.POSTFIELDS, body)
        if headers:
            hdrs = ['%s: %s' % (k, v) for k, v in headers.items()]
            log.debug(hdrs)
            c.setopt(pycurl.HTTPHEADER, hdrs)
        c.perform()
    finally:
        # Release the handle even when the transfer raises.
        c.close()
    return {}, self.buf.getvalue()
项目:xmusic-crawler    作者:rockers7414    | 项目源码 | 文件源码
def __search_video_id(self, query):
    """Search YouTube for ``query`` and return the first result's video id.

    Raises ``Exception`` when the response contains an ``"error"`` key or
    when no items are returned.
    """
    params = {
        "key": self.key,
        "part": "id,snippet",
        "order": "relevance",
        "type": "video",
        "videoSyndicated": "true",
        "maxResults": 3,
        "q": query
    }

    sink = BytesIO()

    curl = pycurl.Curl()
    search_url = self.SERVICE_HOST + "/youtube/v3/search?" + urlencode(params)
    curl.setopt(pycurl.URL, search_url)
    curl.setopt(pycurl.WRITEFUNCTION, sink.write)
    curl.perform()
    curl.close()

    raw = sink.getvalue()
    body = json.loads(raw.decode("utf-8"))
    sink.close()

    if "error" in body:
        raise Exception("query error: {0}".format(body))

    items = body["items"]
    if len(items) == 0:
        raise Exception("result not found")

    return items[0]["id"]["videoId"]
项目:xmusic-crawler    作者:rockers7414    | 项目源码 | 文件源码
def __get_video_duration(self, video_id):
    """Return the duration of a YouTube video in whole seconds.

    :param video_id: id of the video to look up.
    :return: duration in seconds as an int, including any whole days.
    :raises Exception: when the response contains an ``"error"`` key or no
        items are returned.
    :raises pycurl.error: if the transfer fails (the handle is still closed).
    """
    params = {
        "key": self.key,
        "part": "contentDetails",
        "id": video_id
    }

    buf = BytesIO()

    client = pycurl.Curl()
    try:
        client.setopt(pycurl.URL, self.SERVICE_HOST + "/youtube/v3/videos?" +
                      urlencode(params))
        client.setopt(pycurl.WRITEFUNCTION, buf.write)
        client.perform()
    finally:
        client.close()

    body = json.loads(buf.getvalue().decode("utf-8"))
    buf.close()

    if "error" in body:
        raise Exception("query error: {0}".format(body))

    if len(body["items"]) == 0:
        raise Exception("result not found")

    duration = body["items"][0]["contentDetails"]["duration"]
    # ``.seconds`` is only the sub-day remainder of a timedelta, so a
    # duration of a day or more would be truncated; use the full total.
    return int(isodate.parse_duration(duration).total_seconds())
项目:xmusic-crawler    作者:rockers7414    | 项目源码 | 文件源码
def __search(self, keyword, search_type):
    """Collect every search result of ``search_type`` matching ``keyword``.

    Results are fetched in pages of 50. ``offset`` is the index of the
    first item of a page, so it advances by the page size; stepping it by
    one would re-fetch overlapping windows and return duplicates.

    :param keyword: search term.
    :param search_type: result type; the response is read under the key
        ``search_type + "s"``.
    :return: list of the raw item dicts from all pages.
    :raises pycurl.error: if a transfer fails (the handle is still closed).
    """
    limit = 50
    offset = 0
    result_key = search_type + "s"
    result = []

    while True:
        self.logger.info("search " + keyword + ", offset = " + str(offset))
        params = {
            "q": keyword,
            "type": search_type,
            "offset": offset,
            "limit": limit
        }

        buf = BytesIO()

        client = pycurl.Curl()
        try:
            client.setopt(pycurl.URL, self.service_host +
                          "/v1/search" + "?" + urlencode(params))
            client.setopt(pycurl.WRITEFUNCTION, buf.write)
            client.perform()
        finally:
            client.close()

        body = json.loads(buf.getvalue().decode("utf-8"))
        buf.close()

        page = body[result_key]
        items = page["items"]
        result.extend(items)

        offset += limit
        # Stop at the end of the result set; an empty page also ends the
        # loop so an inconsistent "total" cannot spin forever.
        if not items or offset >= page["total"]:
            break

    self.logger.info("Done")
    return result
项目:xmusic-crawler    作者:rockers7414    | 项目源码 | 文件源码
def get_tracks_by_album_id(self, album_id):
    """Return a ``Track`` for every track of the album ``album_id``.

    Tracks are fetched in pages of 50. ``offset`` is the index of the
    first item of a page, so it advances by the page size; stepping it by
    one would re-fetch overlapping windows and return duplicates.

    :param album_id: album identifier; a falsy value yields ``None``.
    :return: list of ``Track`` objects, or ``None`` when ``album_id`` is
        falsy.
    :raises pycurl.error: if a transfer fails (the handle is still closed).
    """
    # Guard first: building the log message from None would raise.
    if not album_id:
        return None
    self.logger.info("Get tracks of the album(" +
                     album_id + ") from Spotify.")

    limit = 50
    offset = 0
    tracks = []
    while True:
        params = {
            "offset": offset,
            "limit": limit
        }

        buf = BytesIO()

        client = pycurl.Curl()
        try:
            client.setopt(pycurl.URL, self.service_host + "/v1/albums/" +
                          album_id + "/tracks" + "?" + urlencode(params))
            client.setopt(pycurl.WRITEFUNCTION, buf.write)
            client.perform()
        finally:
            client.close()

        body = json.loads(buf.getvalue().decode("utf-8"))
        buf.close()

        items = body["items"]
        tracks.extend(
            Track(data["name"], None, data["track_number"]) for data in items)

        offset += limit
        # Stop at the end of the album; an empty page also ends the loop
        # so an inconsistent "total" cannot spin forever.
        if not items or offset >= body["total"]:
            break

    return tracks
项目:defcon-workshop    作者:devsecops    | 项目源码 | 文件源码
def process(self, prio, obj):
    """Prepare a curl handle for ``obj`` and register it with the multi handle.

    Waits on ``self.pause`` first, takes a handle from ``self.freelist``,
    applies the proxy when proxies are configured, attaches fresh body and
    header buffers, and adds the handle under ``self.mutex_multi``.
    ``prio`` is not used here.
    """
    self.pause.wait()

    handle = obj.to_http_object(self.freelist.get())
    if self._proxies:
        handle = self._set_proxy(handle, obj)

    # Element 0 receives the body, element 1 the headers; the originating
    # object rides along as element 2.
    body_sink = StringIO()
    header_sink = StringIO()
    handle.response_queue = (body_sink, header_sink, obj)
    handle.setopt(pycurl.WRITEFUNCTION, handle.response_queue[0].write)
    handle.setopt(pycurl.HEADERFUNCTION, handle.response_queue[1].write)

    with self.mutex_multi:
        self.m.add_handle(handle)
项目:defcon-workshop    作者:devsecops    | 项目源码 | 文件源码
def head(self):
    """Issue a HEAD request for ``self.completeUrl`` and store the response.

    TLS peer and host verification are turned off for this request. The
    parsed result is stored in ``self.response``.
    """
    curl = pycurl.Curl()
    curl.setopt(pycurl.SSL_VERIFYPEER, False)
    curl.setopt(pycurl.SSL_VERIFYHOST, 0)
    curl.setopt(pycurl.URL, self.completeUrl)

    # NOBODY turns the request into a HEAD.
    curl.setopt(pycurl.NOBODY, True)

    curl.setopt(pycurl.WRITEFUNCTION, self.header_callback)
    curl.perform()

    parsed = Response()
    parsed.parseResponse(self.__performHead)
    self.response = parsed
项目:true_review_web2py    作者:lucadealfaro    | 项目源码 | 文件源码
def request(self, url, method, body, headers):
    """Perform an HTTP request with pycurl.

    :param url: target URL.
    :param method: ``'POST'`` sends ``body`` as POST fields; any other
        value leaves the handle's default method in place.
    :param body: request payload (used for POST only).
    :param headers: mapping of header names to values, or a falsy value.
    :return: tuple ``({}, response_body)``; response headers are not
        collected.
    :raises pycurl.error: if the transfer fails (the handle is still closed).
    """
    c = pycurl.Curl()
    try:
        c.setopt(pycurl.URL, url)
        if 'proxy_host' in self.proxy:
            c.setopt(pycurl.PROXY, self.proxy['proxy_host'])
        if 'proxy_port' in self.proxy:
            c.setopt(pycurl.PROXYPORT, self.proxy['proxy_port'])
        if 'proxy_user' in self.proxy:
            c.setopt(pycurl.PROXYUSERPWD, "%(proxy_user)s:%(proxy_pass)s" % self.proxy)
        self.buf = StringIO()
        c.setopt(pycurl.WRITEFUNCTION, self.buf.write)
        # Peer and host verification are enabled only when a CA bundle is set.
        if self.cacert:
            c.setopt(c.CAINFO, self.cacert)
        c.setopt(pycurl.SSL_VERIFYPEER, 1 if self.cacert else 0)
        c.setopt(pycurl.SSL_VERIFYHOST, 2 if self.cacert else 0)
        # Floor division keeps the value an integer; true division yields
        # a float, which is not a valid value for this option.
        c.setopt(pycurl.CONNECTTIMEOUT, int(self.timeout // 6))
        c.setopt(pycurl.TIMEOUT, self.timeout)
        if method == 'POST':
            c.setopt(pycurl.POST, 1)
            c.setopt(pycurl.POSTFIELDS, body)
        if headers:
            hdrs = ['%s: %s' % (k, v) for k, v in headers.items()]
            log.debug(hdrs)
            c.setopt(pycurl.HTTPHEADER, hdrs)
        c.perform()
    finally:
        # Release the handle even when the transfer raises.
        c.close()
    return {}, self.buf.getvalue()
项目:spc    作者:whbrewer    | 项目源码 | 文件源码
def request(self, url, method, body, headers):
    """Perform an HTTP request with pycurl.

    :param url: target URL.
    :param method: ``'POST'`` sends ``body`` as POST fields; any other
        value leaves the handle's default method in place.
    :param body: request payload (used for POST only).
    :param headers: mapping of header names to values, or a falsy value.
    :return: tuple ``({}, response_body)``; response headers are not
        collected.
    :raises pycurl.error: if the transfer fails (the handle is still closed).
    """
    c = pycurl.Curl()
    try:
        c.setopt(pycurl.URL, url)
        if 'proxy_host' in self.proxy:
            c.setopt(pycurl.PROXY, self.proxy['proxy_host'])
        if 'proxy_port' in self.proxy:
            c.setopt(pycurl.PROXYPORT, self.proxy['proxy_port'])
        if 'proxy_user' in self.proxy:
            c.setopt(pycurl.PROXYUSERPWD, "%(proxy_user)s:%(proxy_pass)s" % self.proxy)
        self.buf = StringIO()
        c.setopt(pycurl.WRITEFUNCTION, self.buf.write)
        # Peer and host verification are enabled only when a CA bundle is set.
        if self.cacert:
            c.setopt(c.CAINFO, self.cacert)
        c.setopt(pycurl.SSL_VERIFYPEER, 1 if self.cacert else 0)
        c.setopt(pycurl.SSL_VERIFYHOST, 2 if self.cacert else 0)
        # Floor division keeps the value an integer; true division yields
        # a float, which is not a valid value for this option.
        c.setopt(pycurl.CONNECTTIMEOUT, int(self.timeout // 6))
        c.setopt(pycurl.TIMEOUT, self.timeout)
        if method == 'POST':
            c.setopt(pycurl.POST, 1)
            c.setopt(pycurl.POSTFIELDS, body)
        if headers:
            hdrs = ['%s: %s' % (k, v) for k, v in headers.items()]
            log.debug(hdrs)
            c.setopt(pycurl.HTTPHEADER, hdrs)
        c.perform()
    finally:
        # Release the handle even when the transfer raises.
        c.close()
    return {}, self.buf.getvalue()
项目:landscape-client    作者:CanonicalLtd    | 项目源码 | 文件源码
def perform(self):
    """Simulate a transfer by feeding the canned result to the write callback.

    Raises the configured ``self.error`` if one is set, and an
    ``AssertionError`` if the stub has already been performed.
    """
    if self.error:
        raise self.error
    if self.performed:
        raise AssertionError("Can't perform twice")

    write_callback = self.options[pycurl.WRITEFUNCTION]
    write_callback(self.result)
    self.performed = True
项目:landscape-client    作者:CanonicalLtd    | 项目源码 | 文件源码
def test_basic(self):
    """fetch() with only a URL returns the stub result and sets the base options."""
    expected_options = {
        pycurl.URL: b"http://example.com",
        pycurl.FOLLOWLOCATION: 1,
        pycurl.MAXREDIRS: 5,
        pycurl.CONNECTTIMEOUT: 30,
        pycurl.LOW_SPEED_LIMIT: 1,
        pycurl.LOW_SPEED_TIME: 600,
        pycurl.NOSIGNAL: 1,
        pycurl.WRITEFUNCTION: Any(),
        pycurl.DNS_CACHE_TIMEOUT: 0,
        pycurl.ENCODING: b"gzip,deflate",
    }
    stub = CurlStub(b"result")

    fetched = fetch("http://example.com", curl=stub)

    self.assertEqual(fetched, b"result")
    self.assertEqual(stub.options, expected_options)
项目:landscape-client    作者:CanonicalLtd    | 项目源码 | 文件源码
def test_create_curl(self):
    """fetch() without a curl argument creates its own via pycurl.Curl.

    ``pycurl.Curl`` is temporarily replaced by a factory that records the
    stubs it hands out, and is restored afterwards.
    """
    created = []

    def stub_factory():
        stub = CurlStub(b"result")
        created.append(stub)
        return stub

    real_curl = pycurl.Curl
    try:
        pycurl.Curl = stub_factory
        fetched = fetch("http://example.com")
        first_stub = created[0]
        self.assertEqual(fetched, b"result")
        expected_options = {
            pycurl.URL: b"http://example.com",
            pycurl.FOLLOWLOCATION: 1,
            pycurl.MAXREDIRS: 5,
            pycurl.CONNECTTIMEOUT: 30,
            pycurl.LOW_SPEED_LIMIT: 1,
            pycurl.LOW_SPEED_TIME: 600,
            pycurl.NOSIGNAL: 1,
            pycurl.WRITEFUNCTION: Any(),
            pycurl.DNS_CACHE_TIMEOUT: 0,
            pycurl.ENCODING: b"gzip,deflate",
        }
        self.assertEqual(first_stub.options, expected_options)
    finally:
        pycurl.Curl = real_curl
项目:Problematica-public    作者:TechMaz    | 项目源码 | 文件源码
def request(self, url, method, body, headers):
    """Perform an HTTP request with pycurl.

    :param url: target URL.
    :param method: ``'POST'`` sends ``body`` as POST fields; any other
        value leaves the handle's default method in place.
    :param body: request payload (used for POST only).
    :param headers: mapping of header names to values, or a falsy value.
    :return: tuple ``({}, response_body)``; response headers are not
        collected.
    :raises pycurl.error: if the transfer fails (the handle is still closed).
    """
    c = pycurl.Curl()
    try:
        c.setopt(pycurl.URL, url)
        if 'proxy_host' in self.proxy:
            c.setopt(pycurl.PROXY, self.proxy['proxy_host'])
        if 'proxy_port' in self.proxy:
            c.setopt(pycurl.PROXYPORT, self.proxy['proxy_port'])
        if 'proxy_user' in self.proxy:
            c.setopt(pycurl.PROXYUSERPWD, "%(proxy_user)s:%(proxy_pass)s" % self.proxy)
        self.buf = StringIO()
        c.setopt(pycurl.WRITEFUNCTION, self.buf.write)
        # Peer and host verification are enabled only when a CA bundle is set.
        if self.cacert:
            c.setopt(c.CAINFO, self.cacert)
        c.setopt(pycurl.SSL_VERIFYPEER, 1 if self.cacert else 0)
        c.setopt(pycurl.SSL_VERIFYHOST, 2 if self.cacert else 0)
        # Floor division keeps the value an integer; true division yields
        # a float, which is not a valid value for this option.
        c.setopt(pycurl.CONNECTTIMEOUT, int(self.timeout // 6))
        c.setopt(pycurl.TIMEOUT, self.timeout)
        if method == 'POST':
            c.setopt(pycurl.POST, 1)
            c.setopt(pycurl.POSTFIELDS, body)
        if headers:
            hdrs = ['%s: %s' % (k, v) for k, v in headers.items()]
            log.debug(hdrs)
            c.setopt(pycurl.HTTPHEADER, hdrs)
        c.perform()
    finally:
        # Release the handle even when the transfer raises.
        c.close()
    return {}, self.buf.getvalue()
项目:download-manager    作者:thispc    | 项目源码 | 文件源码
def __init__(self, cookies=None, options=None):
        """Create the curl handle and reset all per-request state.

        Args:
            cookies: Stored on the instance as ``cj`` (the cookie jar).
            options: Passed straight to ``setInterface``.
        """
        # Transfer plumbing.
        self.c = pycurl.Curl()
        self.rep = StringIO()
        self.cj = cookies  # cookiejar

        # Book-keeping for the most recent request.
        self.lastURL = None
        self.lastEffectiveURL = None
        self.abort = False
        self.code = 0  # last http code
        self.header = ""
        self.headers = []  # temporary request header

        # Handle configuration happens before the callbacks are attached.
        self.initHandle()
        self.setInterface(options)

        handle = self.c
        handle.setopt(pycurl.WRITEFUNCTION, self.write)
        handle.setopt(pycurl.HEADERFUNCTION, self.writeHeader)

        self.log = getLogger("log")
项目:xtdpy    作者:psycofdj    | 项目源码 | 文件源码
def cleanup(self):
    """Reset the response state and point the handle at a fresh body buffer."""
    body_sink = io.BytesIO()
    self.m_data = body_sink
    self.m_headers = dict()
    self.m_response = TCPResponse()
    # Body bytes delivered by curl are appended to the new buffer.
    self.m_handle.setopt(pycurl.WRITEFUNCTION, body_sink.write)
项目:AppBackend    作者:540871129    | 项目源码 | 文件源码
def handle_request(self):
        """Send the configured request and store the parsed response on ``self``.

        Reads ``request_url``, ``useragent``, ``curlopts``, ``request_headers``
        and ``request_body`` from the instance. On success it sets
        ``response_code``, ``response_headers`` and ``response_body``.

        Raises:
            ChannelException: If the transfer fails (wraps the ``pycurl.error``
                with code 5).
        """
        curl_handle = pycurl.Curl()
        response = StringIO.StringIO()
        try:
            # set default options.
            curl_handle.setopt(pycurl.URL, self.request_url)
            curl_handle.setopt(pycurl.REFERER, self.request_url)
            curl_handle.setopt(pycurl.USERAGENT, self.useragent)
            curl_handle.setopt(pycurl.TIMEOUT, self.curlopts['TIMEOUT'])
            curl_handle.setopt(pycurl.CONNECTTIMEOUT, self.curlopts['CONNECTTIMEOUT'])
            # Headers are written into the same buffer as the body and split
            # apart below using HEADER_SIZE.
            curl_handle.setopt(pycurl.HEADER, True)
            curl_handle.setopt(pycurl.FOLLOWLOCATION, 1)
            curl_handle.setopt(pycurl.MAXREDIRS, 5)
            if self.request_headers:
                header_lines = [key + ':' + value
                                for key, value in self.request_headers.items()]
                curl_handle.setopt(pycurl.HTTPHEADER, header_lines)
            # Setting POSTFIELDS makes the request a POST.
            curl_handle.setopt(pycurl.HTTPPROXYTUNNEL, 1)
            curl_handle.setopt(pycurl.POSTFIELDS, self.request_body)

            curl_handle.setopt(pycurl.WRITEFUNCTION, response.write)

            try:
                curl_handle.perform()
            except pycurl.error as error:
                raise ChannelException(error, 5)

            self.response_code = curl_handle.getinfo(curl_handle.HTTP_CODE)
            header_size = curl_handle.getinfo(curl_handle.HEADER_SIZE)
            resp_str = response.getvalue()
            self.response_headers = resp_str[0:header_size]
            self.response_body = resp_str[header_size:]
        finally:
            # Release the buffer and handle on both the success and error paths.
            response.close()
            curl_handle.close()
项目:ZPoc    作者:zidier215    | 项目源码 | 文件源码
def _login(self):
        """POST the login payload and store the returned access token.

        Sends ``self.post_data`` to ``self.url``. When the response is
        non-empty, its JSON ``access_token`` field is stored in
        ``self.API_TOKEN`` and ``self.save_token()`` is called. All failures
        are logged rather than raised.
        """
        c = None
        b = None
        try:
            c = pycurl.Curl()
            c.setopt(pycurl.CAINFO, certifi.where())
            c.setopt(pycurl.URL, self.url)
            b = StringIO.StringIO()
            c.setopt(pycurl.USERAGENT, "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; .NET CLR 1.1.4322)")
            c.setopt(pycurl.WRITEFUNCTION, b.write)
            c.setopt(pycurl.FOLLOWLOCATION, 1)
            c.setopt(pycurl.MAXREDIRS, 5)
            c.setopt(pycurl.CUSTOMREQUEST, "POST")
            c.setopt(pycurl.POSTFIELDS, self.post_data)
            c.perform()
            payload = b.getvalue()
            if payload:
                logging.info('success login')
                self.API_TOKEN = json.loads(payload)["access_token"]
                self.save_token()
            else:
                logging.warning('success fail,get null result')
            logging.debug(self.API_TOKEN)
        except pycurl.error as e:
            # pycurl.E_HTTP_POST_ERROR is an integer error code, not an
            # exception class, so it has to be compared against the code
            # carried by pycurl.error instead of being named in ``except``.
            if e.args and e.args[0] == pycurl.E_HTTP_POST_ERROR:
                logging.error(str(pycurl.E_HTTP_POST_ERROR))
            else:
                logging.error('please check your password or username')
                logging.error(str(e))
        except Exception as e:
            logging.error('please check your password or username')
            # str(e) instead of e.message, which not every exception provides.
            logging.error(str(e))
        finally:
            # Release the buffer and handle whether or not the login worked.
            if b is not None:
                b.close()
            if c is not None:
                c.close()
项目:rekall-agent-server    作者:rekall-innovations    | 项目源码 | 文件源码
def request(self, url, method, body, headers):
            """Send one HTTP request with pycurl and return the response body.

            Args:
                url: Target URL.
                method: HTTP verb. Only ``'POST'`` alters the request (``body``
                    is sent as POST fields); any other value leaves the
                    handle's default method in place.
                body: Payload used when ``method == 'POST'``.
                headers: Mapping of header names to values, or a falsy value
                    to send no extra headers.

            Returns:
                A ``(headers, body)`` tuple. Response headers are not
                collected, so the first item is always an empty dict.

            Raises:
                pycurl.error: If the transfer fails.
            """
            c = pycurl.Curl()
            try:
                c.setopt(pycurl.URL, url)
                if 'proxy_host' in self.proxy:
                    c.setopt(pycurl.PROXY, self.proxy['proxy_host'])
                if 'proxy_port' in self.proxy:
                    c.setopt(pycurl.PROXYPORT, self.proxy['proxy_port'])
                if 'proxy_user' in self.proxy:
                    c.setopt(pycurl.PROXYUSERPWD, "%(proxy_user)s:%(proxy_pass)s" % self.proxy)
                self.buf = StringIO()
                c.setopt(pycurl.WRITEFUNCTION, self.buf.write)
                if self.cacert:
                    c.setopt(pycurl.CAINFO, self.cacert)
                # Certificate checks are only enabled when a CA bundle is configured.
                c.setopt(pycurl.SSL_VERIFYPEER, 1 if self.cacert else 0)
                c.setopt(pycurl.SSL_VERIFYHOST, 2 if self.cacert else 0)
                # The same budget applies to connecting and to the whole transfer.
                c.setopt(pycurl.CONNECTTIMEOUT, self.timeout)
                c.setopt(pycurl.TIMEOUT, self.timeout)
                if method == 'POST':
                    c.setopt(pycurl.POST, 1)
                    c.setopt(pycurl.POSTFIELDS, body)
                if headers:
                    hdrs = ['%s: %s' % (k, v) for k, v in headers.items()]
                    log.debug(hdrs)
                    c.setopt(pycurl.HTTPHEADER, hdrs)
                c.perform()
            finally:
                # Release the handle even when configuration or transfer fails.
                c.close()
            return {}, self.buf.getvalue()
项目:Video-Downloader    作者:EvilCult    | 项目源码 | 文件源码
def getPage (self, url, requestHeader = None) :
        """Fetch ``url`` and return its response header lines and body.

        Args:
            url: Address to fetch; surrounding whitespace is stripped.
            requestHeader: Optional list of raw header lines to send. The list
                is copied, so neither the caller's list nor a shared default
                grows between calls.

        Returns:
            A ``(header, body)`` tuple: ``header`` is the header block split on
            CRLF and ``body`` is the remaining text. If anything fails during
            the transfer both items are empty strings.
        """
        resultFormate = StringIO.StringIO()

        fakeIp = self.fakeIp()
        headerLines = list(requestHeader) if requestHeader is not None else []
        headerLines.append('CLIENT-IP:' + fakeIp)
        headerLines.append('X-FORWARDED-FOR:' + fakeIp)

        curl = None
        try:
            curl = pycurl.Curl()
            curl.setopt(pycurl.URL, url.strip())
            curl.setopt(pycurl.ENCODING, 'gzip,deflate')
            # Headers land in the same buffer as the body; HEADER_SIZE below
            # tells us where to split them.
            curl.setopt(pycurl.HEADER, 1)
            curl.setopt(pycurl.TIMEOUT, 120)
            # NOTE(review): certificate and hostname verification are disabled.
            curl.setopt(pycurl.SSL_VERIFYPEER, 0)
            curl.setopt(pycurl.SSL_VERIFYHOST, 0)
            curl.setopt(pycurl.HTTPHEADER, headerLines)
            curl.setopt(pycurl.WRITEFUNCTION, resultFormate.write)
            curl.perform()
            headerSize = curl.getinfo(pycurl.HEADER_SIZE)

            raw = resultFormate.getvalue()
            header = raw[0 : headerSize].split('\r\n')
            body = raw[headerSize : ]
        except Exception:
            # Best effort: any failure yields an empty result instead of raising.
            header = ''
            body = ''
        finally:
            # Close the handle on the failure path as well.
            if curl is not None:
                curl.close()

        return header, body
项目:wfuzz    作者:gwen001    | 项目源码 | 文件源码
def process(self, prio, obj):
    """Prepare a curl handle for ``obj`` and add it to the multi handle.

    Waits on ``self.pause``, builds the handle from a free-list entry,
    applies a proxy when ``self._proxies`` is truthy, attaches fresh body and
    header buffers, then registers the handle while holding
    ``self.mutex_multi``. ``prio`` is not used here.
    """
    self.pause.wait()

    handle = obj.to_http_object(self.freelist.get())
    if self._proxies:
        handle = self._set_proxy(handle, obj)

    # The buffers travel with the handle as (body, headers, request).
    body_buf = StringIO()
    header_buf = StringIO()
    handle.response_queue = (body_buf, header_buf, obj)
    handle.setopt(pycurl.WRITEFUNCTION, body_buf.write)
    handle.setopt(pycurl.HEADERFUNCTION, header_buf.write)

    with self.mutex_multi:
        self.m.add_handle(handle)
项目:wfuzz    作者:gwen001    | 项目源码 | 文件源码
def head(self):
        """Issue a HEAD request to ``self.completeUrl`` and store the response.

        Received data is passed to ``self.header_callback``. Afterwards a
        ``Response`` is built from ``self.__performHead`` and assigned to
        ``self.response``.
        """
        conn = pycurl.Curl()
        try:
            # NOTE(review): certificate and hostname verification are disabled.
            conn.setopt(pycurl.SSL_VERIFYPEER, False)
            conn.setopt(pycurl.SSL_VERIFYHOST, 0)
            conn.setopt(pycurl.URL, self.completeUrl)

            conn.setopt(pycurl.NOBODY, True)  # makes this a HEAD request

            # NOTE(review): pycurl.HEADER is not enabled here, so the write
            # callback may receive no header data -- confirm this is intended.
            conn.setopt(pycurl.WRITEFUNCTION, self.header_callback)
            conn.perform()
        finally:
            # Release the handle explicitly instead of relying on garbage collection.
            conn.close()

        rp = Response()
        rp.parseResponse(self.__performHead)
        self.response = rp
项目:VWGen    作者:qazbnm456    | 项目源码 | 文件源码
def setUrl(self, url):
        """Point the curl handle at ``url`` with a fresh response buffer."""
        sink = StringIO.StringIO()
        self.b = sink

        handle = self.c
        # The body of the next transfer is collected in the new buffer.
        handle.setopt(pycurl.WRITEFUNCTION, sink.write)
        handle.setopt(pycurl.URL, url)
项目:slugiot-client    作者:slugiot    | 项目源码 | 文件源码
def request(self, url, method, body, headers):
            """Send one HTTP request with pycurl and return the response body.

            Args:
                url: Target URL.
                method: HTTP verb. Only ``'POST'`` alters the request (``body``
                    is sent as POST fields); any other value leaves the
                    handle's default method in place.
                body: Payload used when ``method == 'POST'``.
                headers: Mapping of header names to values, or a falsy value
                    to send no extra headers.

            Returns:
                A ``(headers, body)`` tuple. Response headers are not
                collected, so the first item is always an empty dict.

            Raises:
                pycurl.error: If the transfer fails.
            """
            c = pycurl.Curl()
            try:
                c.setopt(pycurl.URL, url)
                if 'proxy_host' in self.proxy:
                    c.setopt(pycurl.PROXY, self.proxy['proxy_host'])
                if 'proxy_port' in self.proxy:
                    c.setopt(pycurl.PROXYPORT, self.proxy['proxy_port'])
                if 'proxy_user' in self.proxy:
                    c.setopt(pycurl.PROXYUSERPWD, "%(proxy_user)s:%(proxy_pass)s" % self.proxy)
                self.buf = StringIO()
                c.setopt(pycurl.WRITEFUNCTION, self.buf.write)
                if self.cacert:
                    c.setopt(pycurl.CAINFO, self.cacert)
                # Certificate checks are only enabled when a CA bundle is configured.
                c.setopt(pycurl.SSL_VERIFYPEER, 1 if self.cacert else 0)
                c.setopt(pycurl.SSL_VERIFYHOST, 2 if self.cacert else 0)
                # Connecting gets a sixth of the overall budget; int() keeps the
                # value integral under true division, as pycurl expects.
                c.setopt(pycurl.CONNECTTIMEOUT, int(self.timeout / 6))
                c.setopt(pycurl.TIMEOUT, self.timeout)
                if method == 'POST':
                    c.setopt(pycurl.POST, 1)
                    c.setopt(pycurl.POSTFIELDS, body)
                if headers:
                    hdrs = ['%s: %s' % (k, v) for k, v in headers.items()]
                    log.debug(hdrs)
                    c.setopt(pycurl.HTTPHEADER, hdrs)
                c.perform()
            finally:
                # Release the handle even when configuration or transfer fails.
                c.close()
            return {}, self.buf.getvalue()
项目:polproxy    作者:kevinlekiller    | 项目源码 | 文件源码
def curlRequest(self, url, headers = False, post = False, returnHeaders=True):
        """Fetch ``url`` from poloniex.com and return the response as text.

        Args:
            url: Address to request.
            headers: Extra raw header lines (a list), or ``False`` for none.
            post: Form-encoded POST body, or ``False`` for a non-POST request.
            returnHeaders: When true, response headers are written into the
                same buffer as the body and so are part of the returned text.

        Returns:
            The collected response decoded as ISO-8859-1, or ``""`` if the
            transfer fails.
        """
        ch = pycurl.Curl()
        try:
            ch.setopt(pycurl.URL, url)
            hdrs = [
                    "Host: poloniex.com",
                    "Connection: close",
                    "User-Agent: Mozilla/5.0 (CLI; Linux x86_64) polproxy",
                    "accept: application/json"
            ]
            # ``False`` is the "not supplied" sentinel for both optional inputs.
            if post != False:
                ch.setopt(pycurl.POSTFIELDS, post)
                hdrs = hdrs + ["content-type: application/x-www-form-urlencoded", "content-length: " + str(len(post))]
            if headers != False:
                hdrs = hdrs + headers
            ch.setopt(pycurl.HTTPHEADER, hdrs)
            # NOTE(review): hostname verification is disabled here.
            ch.setopt(pycurl.SSL_VERIFYHOST, 0)
            ch.setopt(pycurl.FOLLOWLOCATION, True)
            ch.setopt(pycurl.CONNECTTIMEOUT, 5)
            ch.setopt(pycurl.TIMEOUT, 5)
            ret = BytesIO()
            if returnHeaders:
                ch.setopt(pycurl.HEADERFUNCTION, ret.write)
            ch.setopt(pycurl.WRITEFUNCTION, ret.write)
            try:
                ch.perform()
            except pycurl.error:
                # Transfer failures are reported as an empty response; unlike
                # a bare ``except`` this lets KeyboardInterrupt/SystemExit through.
                return ""
        finally:
            # Close the handle on the failure path as well.
            ch.close()
        return ret.getvalue().decode("ISO-8859-1")
项目:Crawlers    作者:mi-minus    | 项目源码 | 文件源码
def getpage(url):
    """Download ``url`` with pycurl and return the response body.

    Progress markers are printed to stdout before and after each step.

    Args:
        url: Address to fetch.

    Returns:
        Everything curl wrote to the in-memory buffer.

    Raises:
        pycurl.error: If the transfer fails.
    """
    c = pycurl.Curl()
    try:
        # Parenthesised single-argument prints produce the same output on
        # Python 2 and Python 3.
        print(url)
        c.setopt(pycurl.URL, url)

        b = StringIO.StringIO()
        print('1')
        c.setopt(pycurl.WRITEFUNCTION, b.write)
        print('2')
        c.perform()
        print('-----')
    finally:
        # Release the handle whether or not the transfer succeeded.
        c.close()
    # f=open('%s/posts.txt' % setting.CURRENT_PATH,'wb')
    # f.write(b.getvalue())
    # f.close()
    return b.getvalue()