Python pycurl 模块,URL 实例源码

我们从Python开源项目中,提取了以下47个代码示例,用于说明如何使用pycurl.URL

项目:recipebook    作者:dpapathanasiou    | 项目源码 | 文件源码
def put (url, data, headers={}):
    """Send ``data`` as the body of an HTTP PUT to ``url``.

    ``headers`` is a mapping of header name to value; each entry is sent
    as ``"name: value"``. Returns the HTTP status code reported by curl,
    or -1 when the transfer (or reading the status) raised an exception.
    """
    status = -1  # stays -1 unless curl reports an HTTP status

    handle = pycurl.Curl()
    handle.setopt(pycurl.URL, url)
    if len(headers):
        header_lines = []
        for name, value in headers.items():
            header_lines.append(name + ': ' + value)
        handle.setopt(pycurl.HTTPHEADER, header_lines)
    handle.setopt(pycurl.PUT, 1)
    handle.setopt(pycurl.INFILESIZE, len(data))
    # curl pulls the request body from this in-memory stream.
    body_stream = StringIO(data)
    handle.setopt(pycurl.READFUNCTION, body_stream.read)
    try:
        handle.perform()
        status = handle.getinfo(pycurl.HTTP_CODE)
    except Exception:
        # Best effort: any failure is reported to the caller as -1.
        pass
    handle.close()

    return status
项目:Instagram-API    作者:danleyb2    | 项目源码 | 文件源码
def request(self, endpoint, post=None):
        """Perform a request against ``Constants.API_URL + endpoint``.

        When ``post`` is not None it is sent as the POST body. Cookies are
        read from and written to ``<IGDataPath>/<username>/<username>-cookies.dat``.
        Optional proxy settings are taken from ``self.proxy``,
        ``self.proxyHost`` and ``self.proxyAuth``.

        Returns ``[header, json_decode(body)]`` where ``header`` is the raw
        response header block and ``body`` the rest of the response.
        """
        buffer = BytesIO()
        cookie_path = os.path.join(self.IGDataPath, self.username, self.username + "-cookies.dat")

        ch = pycurl.Curl()
        try:
            ch.setopt(pycurl.URL, Constants.API_URL + endpoint)
            ch.setopt(pycurl.USERAGENT, self.userAgent)
            ch.setopt(pycurl.WRITEFUNCTION, buffer.write)
            ch.setopt(pycurl.FOLLOWLOCATION, True)
            # Headers are written into the same buffer as the body and
            # split off afterwards using HEADER_SIZE.
            ch.setopt(pycurl.HEADER, True)
            ch.setopt(pycurl.VERBOSE, False)
            ch.setopt(pycurl.COOKIEFILE, cookie_path)
            ch.setopt(pycurl.COOKIEJAR, cookie_path)

            if post is not None:
                ch.setopt(pycurl.POST, True)
                ch.setopt(pycurl.POSTFIELDS, post)

            if self.proxy:
                ch.setopt(pycurl.PROXY, self.proxyHost)
                if self.proxyAuth:
                    ch.setopt(pycurl.PROXYUSERPWD, self.proxyAuth)

            ch.perform()
            header_len = ch.getinfo(pycurl.HEADER_SIZE)
        finally:
            # Always release the handle, even when the transfer fails.
            ch.close()

        resp = buffer.getvalue()
        header = resp[0: header_len]
        body = resp[header_len:]

        if self.debug:
            print("REQUEST: " + endpoint)
            if post is not None:
                if not isinstance(post, list):
                    print("DATA: " + str(post))
            # NOTE(review): body comes from a BytesIO; on Python 3 this
            # concatenation would need a decode -- confirm target version.
            print("RESPONSE: " + body)

        return [header, json_decode(body)]
项目:QQBot    作者:springhack    | 项目源码 | 文件源码
def CurlPOST(url, data, cookie):
    """POST ``data`` to ``url`` as JSON and return the response body.

    ``cookie`` is a file path used both to load cookies (COOKIEFILE) and
    to store them (COOKIEJAR).
    """
    handle = pycurl.Curl()
    sink = StringIO.StringIO()
    options = (
        (pycurl.URL, url),
        (pycurl.POST, 1),
        (pycurl.HTTPHEADER, ['Content-Type: application/json']),
        (pycurl.WRITEFUNCTION, sink.write),
        (pycurl.COOKIEFILE, cookie),
        (pycurl.COOKIEJAR, cookie),
        (pycurl.POSTFIELDS, data),
    )
    for option, value in options:
        handle.setopt(option, value)
    handle.perform()
    response_text = sink.getvalue()
    sink.close()
    handle.close()
    return response_text
项目:darkc0de-old-stuff    作者:tuwid    | 项目源码 | 文件源码
def getXML(self,obj):
        """Build a ``<request>`` element describing this request.

        ``obj`` is the DOM document used to create nodes. The element
        carries the method as an attribute, a ``URL`` child, a
        ``PostData`` child for POST requests and a ``Cookie`` child when
        a Cookie header is present.
        """
        def text_element(tag, text):
            # Create <tag>text</tag> using the supplied document.
            node = obj.createElement(tag)
            node.appendChild(obj.createTextNode(text))
            return node

        root = obj.createElement("request")
        root.setAttribute("method", self.method)
        root.appendChild(text_element("URL", self.completeUrl))

        if self.method == "POST":
            root.appendChild(text_element("PostData", self.postdata))

        if "Cookie" in self.__headers:
            root.appendChild(text_element("Cookie", self.__headers["Cookie"]))

        return root
项目:ceiba-dl    作者:lantw44    | 项目源码 | 文件源码
def file(self, path, output, args={}, progress_callback=lambda *x: None):
        """Download ``path`` (relative to ``self.file_url``) into ``output``.

        ``args`` are appended as a URL-encoded query string and recorded in
        ``self.web_cache[path]``. ``progress_callback`` is installed as the
        curl transfer-info callback. Raises ``ServerError`` with the status
        code when the response code is not 200.
        """
        self.logger.debug('??????????')
        self.web_cache[path] = dict(args)
        target = urllib.parse.urljoin(self.file_url, urllib.parse.quote(path))
        if len(args) > 0:
            target = target + '?' + urllib.parse.urlencode(args)
        self.logger.debug('HTTP ?????{}'.format(target))

        handle = self.curl
        options = (
            (pycurl.URL, target),
            (pycurl.COOKIE, self.web_cookie),
            (pycurl.NOBODY, False),
            (pycurl.NOPROGRESS, False),
            (pycurl.WRITEDATA, output),
            # Response headers are discarded.
            (pycurl.HEADERFUNCTION, lambda *x: None),
            (pycurl.XFERINFOFUNCTION, progress_callback),
        )
        for option, value in options:
            handle.setopt(option, value)
        handle.perform()

        code = handle.getinfo(pycurl.RESPONSE_CODE)
        if code == 200:
            return
        raise ServerError(code)
项目:ceiba-dl    作者:lantw44    | 项目源码 | 文件源码
def file_size(self, path, args={}):
        """Return the download content length curl reports for ``path``.

        The request is made with NOBODY set, so no body is transferred.
        ``args`` are appended as a query string and recorded in
        ``self.web_cache[path]``. Raises ``ServerError`` when the response
        code is not 200.
        """
        self.logger.debug('????????????')
        self.web_cache[path] = dict(args)
        target = urllib.parse.urljoin(self.file_url, urllib.parse.quote(path))
        if len(args) > 0:
            target = target + '?' + urllib.parse.urlencode(args)
        self.logger.debug('HTTP ?????{}'.format(target))

        handle = self.curl
        options = (
            (pycurl.URL, target),
            (pycurl.COOKIE, self.web_cookie),
            (pycurl.NOBODY, True),
            (pycurl.NOPROGRESS, True),
            (pycurl.WRITEDATA, io.BytesIO()),
            (pycurl.HEADERFUNCTION, lambda *x: None),
            (pycurl.XFERINFOFUNCTION, lambda *x: None),
        )
        for option, value in options:
            handle.setopt(option, value)
        handle.perform()

        code = handle.getinfo(pycurl.RESPONSE_CODE)
        if code != 200:
            raise ServerError(code)
        return handle.getinfo(pycurl.CONTENT_LENGTH_DOWNLOAD)
项目:ceiba-dl    作者:lantw44    | 项目源码 | 文件源码
def web_redirect(self, path, args={}):
        """Request ``path`` under ``self.web_url`` and return its redirect target.

        Expects a 302 response; any other status raises ``ServerError``.
        Returns the decoded value of the first ``Location:`` response
        header, or ``None`` when no such header is present.
        """
        self.logger.debug('????????????')
        self.web_cache[path] = dict(args)
        target = urllib.parse.urljoin(self.web_url, urllib.parse.quote(path))
        if len(args) > 0:
            target = target + '?' + urllib.parse.urlencode(args)
        self.logger.debug('HTTP ?????{}'.format(target))

        raw_headers = io.BytesIO()
        handle = self.curl
        options = (
            (pycurl.URL, target),
            (pycurl.COOKIE, self.web_cookie),
            (pycurl.NOBODY, False),
            (pycurl.NOPROGRESS, True),
            # The body is sent to a NoneIO sink; only headers are kept.
            (pycurl.WRITEDATA, NoneIO()),
            (pycurl.HEADERFUNCTION, raw_headers.write),
            (pycurl.XFERINFOFUNCTION, lambda *x: None),
        )
        for option, value in options:
            handle.setopt(option, value)
        handle.perform()

        code = handle.getinfo(pycurl.RESPONSE_CODE)
        if code != 302:
            raise ServerError(code)

        prefix = b'Location:'
        for line in raw_headers.getvalue().split(b'\r\n'):
            if not line.startswith(prefix):
                continue
            # Everything after the first colon is the header value.
            return line.partition(b':')[2].strip().decode()
        return None
项目:pathspider    作者:mami-project    | 项目源码 | 文件源码
def connect_https(source, job, conn_timeout, curlopts=None, curlinfos=None):
    """Run ``connect_http`` against an HTTPS URL with certificate checks off.

    ``job`` must provide ``'dip'`` and ``'dp'``, and may provide
    ``'domain'``. Unless the caller already supplied ``pycurl.URL`` in
    ``curlopts``, a URL is built from the domain (preferred) or the
    destination IP (bracketed when it contains ':', i.e. IPv6) and stored
    in ``curlopts``. ``SSL_VERIFYHOST`` and ``SSL_VERIFYPEER`` default to 0
    when not supplied.

    Returns whatever ``connect_http`` returns.
    """
    if curlopts is None:
        curlopts = {}

    if ':' in job['dip']:
        ipString = '[' + job['dip'] + ']'
    else:
        ipString = job['dip']

    if pycurl.URL not in curlopts:
        host = job['domain'] if 'domain' in job else ipString
        # Previously the URL was computed but never stored, and the else
        # branch read an unbound name when the caller supplied a URL.
        # NOTE(review): scheme switched to https to match the function's
        # purpose -- confirm against connect_http's expectations.
        curlopts[pycurl.URL] = "https://" + host + ":" + str(job['dp']) + "/"

    curlopts.setdefault(pycurl.SSL_VERIFYHOST, 0)
    curlopts.setdefault(pycurl.SSL_VERIFYPEER, 0)

    return connect_http(source, job, conn_timeout, curlopts, curlinfos)
项目:defcon-workshop    作者:devsecops    | 项目源码 | 文件源码
def getXML(self,obj):
        """Return a ``<request>`` DOM element for this request.

        ``obj`` is the DOM document used as node factory. Children are
        added in order: ``URL``, then ``PostData`` (POST only), then
        ``Cookie`` (only when a Cookie header is set).
        """
        request_node = obj.createElement("request")
        request_node.setAttribute("method", self.method)

        children = [("URL", self.completeUrl)]
        if self.method == "POST":
            children.append(("PostData", self.postdata))
        if "Cookie" in self._headers:
            children.append(("Cookie", self._headers["Cookie"]))

        for tag, text in children:
            child = obj.createElement(tag)
            child.appendChild(obj.createTextNode(text))
            request_node.appendChild(child)

        return request_node
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def fetch(self, url, body=None, headers=None):
        """Fetch ``url`` with urllib2 and return the wrapped response.

        ``body`` is passed as the request data and ``headers`` as the
        request headers; a ``User-Agent`` entry is added when the caller
        did not provide one. Raises ``ValueError`` when ``_allowedURL``
        rejects the URL. An ``HTTPError`` is converted into a response via
        ``self._makeResponse`` instead of propagating.
        """
        if not _allowedURL(url):
            raise ValueError('Bad URL scheme: %r' % (url,))

        if headers is None:
            headers = {}

        headers.setdefault(
            'User-Agent',
            "%s Python-urllib/%s" % (USER_AGENT, urllib2.__version__,))

        req = urllib2.Request(url, data=body, headers=headers)
        try:
            f = self.urlopen(req)
            try:
                return self._makeResponse(f)
            finally:
                f.close()
        # "except X as name" is valid on Python 2.6+ and Python 3; the
        # older comma form is a syntax error on Python 3.
        except urllib2.HTTPError as why:
            try:
                return self._makeResponse(why)
            finally:
                why.close()
项目:landscape-client    作者:CanonicalLtd    | 项目源码 | 文件源码
def fetch_many_async(urls, callback=None, errback=None, **kwargs):
    """
    Retrieve a list of URLs asynchronously.

    @param callback: Optionally, a function that will be fired one time for
        each successful URL, and will be passed its content and the URL itself.
    @param errback: Optionally, a function that will be fired one time for each
        failing URL, and will be passed the failure and the URL itself.
    @return: A C{DeferredList} whose callback chain will be fired as soon as
        all downloads have terminated. If an error occurs, the errback chain
        of the C{DeferredList} will be fired immediately.
    """
    def start(url):
        # Kick off one download and hook up the optional per-URL handlers.
        deferred = fetch_async(url, **kwargs)
        if callback:
            deferred.addCallback(callback, url)
        if errback:
            deferred.addErrback(errback, url)
        return deferred

    pending = [start(url) for url in urls]
    return DeferredList(pending, fireOnOneErrback=True, consumeErrors=True)
项目:landscape-client    作者:CanonicalLtd    | 项目源码 | 文件源码
def fetch_to_files(urls, directory, logger=None, **kwargs):
    """
    Retrieve a list of URLs and save their content as files in a directory.

    @param urls: The list URLs to fetch.
    @param directory: The directory to save the files to, the name of the file
        will equal the last fragment of the URL.
    @param logger: Optional function to be used to log errors for failed URLs.
    @return: The result of L{fetch_many_async} for the given URLs.
    """

    def write(data, url):
        filename = url_to_filename(url, directory=directory)
        # The context manager closes the file even if the write fails.
        with open(filename, "wb") as fd:
            fd.write(data)

    def log_error(failure, url):
        if logger:
            logger("Couldn't fetch file from %s (%s)" % (
                url, str(failure.value)))
        # Hand the failure back so it keeps propagating down the chain.
        return failure

    return fetch_many_async(urls, callback=write, errback=log_error, **kwargs)
项目:landscape-client    作者:CanonicalLtd    | 项目源码 | 文件源码
def test_post(self):
        """A POST without data records pycurl.POST among the curl options."""
        stub = CurlStub(b"result")
        body = fetch("http://example.com", post=True, curl=stub)
        self.assertEqual(body, b"result")
        expected_options = {
            pycurl.URL: b"http://example.com",
            pycurl.FOLLOWLOCATION: 1,
            pycurl.MAXREDIRS: 5,
            pycurl.CONNECTTIMEOUT: 30,
            pycurl.LOW_SPEED_LIMIT: 1,
            pycurl.LOW_SPEED_TIME: 600,
            pycurl.NOSIGNAL: 1,
            pycurl.WRITEFUNCTION: Any(),
            pycurl.POST: True,
            pycurl.DNS_CACHE_TIMEOUT: 0,
            pycurl.ENCODING: b"gzip,deflate",
        }
        self.assertEqual(stub.options, expected_options)
项目:landscape-client    作者:CanonicalLtd    | 项目源码 | 文件源码
def test_post_data(self):
        """A POST with data records the payload size and a read function."""
        stub = CurlStub(b"result")
        body = fetch("http://example.com", post=True, data="data", curl=stub)
        self.assertEqual(body, b"result")
        # The registered read function must hand back the encoded payload.
        reader = stub.options[pycurl.READFUNCTION]
        self.assertEqual(reader(), b"data")
        expected_options = {
            pycurl.URL: b"http://example.com",
            pycurl.FOLLOWLOCATION: 1,
            pycurl.MAXREDIRS: 5,
            pycurl.CONNECTTIMEOUT: 30,
            pycurl.LOW_SPEED_LIMIT: 1,
            pycurl.LOW_SPEED_TIME: 600,
            pycurl.NOSIGNAL: 1,
            pycurl.WRITEFUNCTION: Any(),
            pycurl.POST: True,
            pycurl.POSTFIELDSIZE: 4,
            pycurl.READFUNCTION: Any(),
            pycurl.DNS_CACHE_TIMEOUT: 0,
            pycurl.ENCODING: b"gzip,deflate",
        }
        self.assertEqual(stub.options, expected_options)
项目:landscape-client    作者:CanonicalLtd    | 项目源码 | 文件源码
def test_cainfo(self):
        """Passing cainfo records pycurl.CAINFO among the curl options."""
        stub = CurlStub(b"result")
        body = fetch("https://example.com", cainfo="cainfo", curl=stub)
        self.assertEqual(body, b"result")
        expected_options = {
            pycurl.URL: b"https://example.com",
            pycurl.FOLLOWLOCATION: 1,
            pycurl.MAXREDIRS: 5,
            pycurl.CONNECTTIMEOUT: 30,
            pycurl.LOW_SPEED_LIMIT: 1,
            pycurl.LOW_SPEED_TIME: 600,
            pycurl.NOSIGNAL: 1,
            pycurl.WRITEFUNCTION: Any(),
            pycurl.CAINFO: b"cainfo",
            pycurl.DNS_CACHE_TIMEOUT: 0,
            pycurl.ENCODING: b"gzip,deflate",
        }
        self.assertEqual(stub.options, expected_options)
项目:landscape-client    作者:CanonicalLtd    | 项目源码 | 文件源码
def test_headers(self):
        """A headers dict is recorded as a list of "name: value" strings."""
        stub = CurlStub(b"result")
        request_headers = {"a": "1", "b": "2"}
        body = fetch("http://example.com", headers=request_headers, curl=stub)
        self.assertEqual(body, b"result")
        expected_options = {
            pycurl.URL: b"http://example.com",
            pycurl.FOLLOWLOCATION: 1,
            pycurl.MAXREDIRS: 5,
            pycurl.CONNECTTIMEOUT: 30,
            pycurl.LOW_SPEED_LIMIT: 1,
            pycurl.LOW_SPEED_TIME: 600,
            pycurl.NOSIGNAL: 1,
            pycurl.WRITEFUNCTION: Any(),
            pycurl.HTTPHEADER: ["a: 1", "b: 2"],
            pycurl.DNS_CACHE_TIMEOUT: 0,
            pycurl.ENCODING: b"gzip,deflate",
        }
        self.assertEqual(stub.options, expected_options)
项目:landscape-client    作者:CanonicalLtd    | 项目源码 | 文件源码
def test_timeouts(self):
        """connect_timeout and total_timeout show up in the curl options.

        The expected options map connect_timeout to CONNECTTIMEOUT and
        total_timeout to LOW_SPEED_TIME.
        """
        stub = CurlStub(b"result")
        body = fetch("http://example.com", connect_timeout=5,
                     total_timeout=30, curl=stub)
        self.assertEqual(body, b"result")
        expected_options = {
            pycurl.URL: b"http://example.com",
            pycurl.FOLLOWLOCATION: 1,
            pycurl.MAXREDIRS: 5,
            pycurl.CONNECTTIMEOUT: 5,
            pycurl.LOW_SPEED_LIMIT: 1,
            pycurl.LOW_SPEED_TIME: 30,
            pycurl.NOSIGNAL: 1,
            pycurl.WRITEFUNCTION: Any(),
            pycurl.DNS_CACHE_TIMEOUT: 0,
            pycurl.ENCODING: b"gzip,deflate",
        }
        self.assertEqual(stub.options, expected_options)
项目:landscape-client    作者:CanonicalLtd    | 项目源码 | 文件源码
def test_pycurl_insecure(self):
        """insecure=True records SSL_VERIFYPEER as False in the curl options."""
        stub = CurlStub(b"result")
        body = fetch("http://example.com/get-ca-cert", curl=stub,
                     insecure=True)
        self.assertEqual(body, b"result")
        expected_options = {
            pycurl.URL: b"http://example.com/get-ca-cert",
            pycurl.FOLLOWLOCATION: 1,
            pycurl.MAXREDIRS: 5,
            pycurl.CONNECTTIMEOUT: 30,
            pycurl.LOW_SPEED_LIMIT: 1,
            pycurl.LOW_SPEED_TIME: 600,
            pycurl.NOSIGNAL: 1,
            pycurl.WRITEFUNCTION: Any(),
            pycurl.SSL_VERIFYPEER: False,
            pycurl.DNS_CACHE_TIMEOUT: 0,
            pycurl.ENCODING: b"gzip,deflate",
        }
        self.assertEqual(stub.options, expected_options)
项目:Zipatoapi    作者:ggruner    | 项目源码 | 文件源码
def get_user_init(self):
        '''
        Description:
         (re)initialize the nonce of current session and fetch the session ID.
         Cookies received are stored in 'cookie.txt' (COOKIEJAR).
        Return:
         the decoded JSON response, e.g.
         {
            "success": true,
            "jsessionid": "3185591CD191F18D1551440AE1BEF86A-n1.frontend3",
            "nonce": "GTPSLZUcDyjEBqeL"
         }
        '''
        uri = "user/init"
        api_url = self.url + uri
        output_init = BytesIO()

        c = pycurl.Curl()
        try:
            c.setopt(c.URL, api_url)
            ### Create the cookie File
            c.setopt(pycurl.COOKIEJAR, 'cookie.txt')
            c.setopt(c.WRITEFUNCTION, output_init.write)
            c.perform()
        finally:
            # Closing the handle releases it and lets libcurl write the
            # cookie jar instead of relying on garbage collection.
            c.close()

        return json.loads(output_init.getvalue())
项目:Zipatoapi    作者:ggruner    | 项目源码 | 文件源码
def get_user_login(self, token):
        '''
        Description:
         login with self.login and the given token (stored on self.token);
         cookies are read from 'cookie.txt'.
        Return:
         the decoded JSON response
        '''
        self.token = token

        # NOTE(review): username and token are concatenated unescaped;
        # values with reserved URL characters would need quoting.
        uri = "user/login?username="+self.login+"&token="+self.token

        api_url = self.url + uri
        output = BytesIO()

        c = pycurl.Curl()
        try:
            c.setopt(c.URL, api_url)
            ### Read the cookie File
            c.setopt(pycurl.COOKIEFILE, 'cookie.txt')
            c.setopt(c.WRITEFUNCTION, output.write)
            c.perform()
        finally:
            # Release the handle even when the transfer fails.
            c.close()

        return json.loads(output.getvalue())
项目:Zipatoapi    作者:ggruner    | 项目源码 | 文件源码
def get_all_virtual_endpoints(self):
        '''
        Description:
         get all virtual endpoints; cookies are read from 'cookie.txt'.
        Return:
         the decoded JSON response
        '''
        uri = "virtualEndpoints"
        api_url = self.url + uri
        output_init = BytesIO()

        c = pycurl.Curl()
        try:
            c.setopt(c.URL, api_url)
            ### Read the cookie File
            c.setopt(pycurl.COOKIEFILE, 'cookie.txt')
            c.setopt(c.WRITEFUNCTION, output_init.write)
            c.perform()
        finally:
            # Release the handle even when the transfer fails.
            c.close()

        return json.loads(output_init.getvalue())
项目:Zipatoapi    作者:ggruner    | 项目源码 | 文件源码
def create_virtual_endpoints(self, data, category):
        '''
        Description:
         create a virtual endpoints by POSTing data (stored on self.data)
         to virtualEndpoints/?category=<category>. Nothing is returned;
         no write callback is installed and VERBOSE is enabled.
         category:
          SENSOR
          METER
          GAUGE
          ONOFF
          LEVEL_CONTROL
        '''
        self.data = data
        self.category = category

        uri = "virtualEndpoints/?category=" + self.category
        api_url = self.url + uri

        c = pycurl.Curl()
        try:
            c.setopt(pycurl.URL, api_url)
            # NOTE(review): 'charset=UTF-8' is passed as its own header line
            # without a name; it was probably meant to be part of Content-Type.
            c.setopt(pycurl.HTTPHEADER, ['Accept: application/json','Content-Type: application/json','charset=UTF-8'])
            c.setopt(pycurl.COOKIEFILE, 'cookie.txt')
            c.setopt(pycurl.POST, 1)
            c.setopt(pycurl.POSTFIELDS, self.data)
            c.setopt(pycurl.VERBOSE, 1)
            c.perform()
        finally:
            # Release the handle even when the transfer fails.
            c.close()
项目:Zipatoapi    作者:ggruner    | 项目源码 | 文件源码
def get_virtual_endpoints_config(self, uuid):
        '''
        Description:
         get virtual endpoints config for the given uuid (stored on
         self.uuid); cookies are read from 'cookie.txt'.
        Return:
         the decoded JSON response
        '''
        self.uuid = uuid

        uri = "virtualEndpoints/"+self.uuid+"/config"
        api_url = self.url + uri
        output_init = BytesIO()

        c = pycurl.Curl()
        try:
            c.setopt(c.URL, api_url)
            ### Read the cookie File
            c.setopt(pycurl.COOKIEFILE, 'cookie.txt')
            c.setopt(c.WRITEFUNCTION, output_init.write)
            c.perform()
        finally:
            # Release the handle even when the transfer fails.
            c.close()

        return json.loads(output_init.getvalue())
项目:Zipatoapi    作者:ggruner    | 项目源码 | 文件源码
def create_rooms(self, data):
        '''
        Description:
         create a room by POSTing data (stored on self.data) to rooms/.
         Nothing is returned; no write callback is installed and VERBOSE
         is enabled.
        '''
        self.data = data

        uri = "rooms/"
        api_url = self.url + uri

        c = pycurl.Curl()
        try:
            c.setopt(pycurl.URL, api_url)
            # NOTE(review): 'charset=UTF-8' is passed as its own header line
            # without a name; it was probably meant to be part of Content-Type.
            c.setopt(pycurl.HTTPHEADER, ['Accept: application/json','Content-Type: application/json','charset=UTF-8'])
            c.setopt(pycurl.COOKIEFILE, 'cookie.txt')
            c.setopt(pycurl.POST, 1)
            c.setopt(pycurl.POSTFIELDS, self.data)
            c.setopt(pycurl.VERBOSE, 1)
            c.perform()
        finally:
            # Release the handle even when the transfer fails.
            c.close()
项目:Zipatoapi    作者:ggruner    | 项目源码 | 文件源码
def put_attributes_config(self, data, uuid):
        '''
        Description:
         modify an attribute by sending data (stored on self.data) to
         attributes/<uuid>/config. Nothing is returned.
        '''
        self.data = data
        self.uuid = uuid

        uri = "attributes/" + self.uuid + "/config"
        api_url = self.url + uri

        c = pycurl.Curl()
        try:
            c.setopt(pycurl.URL, api_url)
            # NOTE(review): 'charset=UTF-8' is passed as its own header line
            # without a name; it was probably meant to be part of Content-Type.
            c.setopt(pycurl.HTTPHEADER, ['Accept: application/json','Content-Type: application/json','charset=UTF-8'])
            c.setopt(pycurl.COOKIEFILE, 'cookie.txt')
            # NOTE(review): despite the method name the request is a POST,
            # not a PUT -- confirm which verb the API expects.
            c.setopt(pycurl.POST, 1)
            c.setopt(pycurl.POSTFIELDS, self.data)
            c.setopt(pycurl.VERBOSE, 1)
            c.perform()
        finally:
            # Release the handle even when the transfer fails.
            c.close()
项目:Zipatoapi    作者:ggruner    | 项目源码 | 文件源码
def save_and_synchronize(self, wait="false", timeout=30):
        '''
        Description:
         synchronize Zipato with the Server via box/saveAndSynchronize.
         wait and timeout are stored on self and sent as query parameters.
        Return:
         the decoded JSON response
        '''
        self.wait = wait
        self.timeout = timeout

        uri = "box/saveAndSynchronize?wait=" + self.wait + "&timeout=" + str(self.timeout)

        api_url = self.url + uri
        output_init = BytesIO()

        c = pycurl.Curl()
        try:
            c.setopt(c.URL, api_url)
            ### Read the cookie File
            c.setopt(pycurl.COOKIEFILE, 'cookie.txt')
            c.setopt(c.WRITEFUNCTION, output_init.write)
            c.perform()
        finally:
            # Close the handle on the failure path too.
            c.close()

        return json.loads(output_init.getvalue())
项目:Zipatoapi    作者:ggruner    | 项目源码 | 文件源码
def synchronize(self, ifneeded="false", wait="false", timeout=30):
        '''
        Description:
         synchronize Zipato with the Server via box/synchronize.
         ifneeded, wait and timeout are stored on self and sent as the
         ifNeeded, wait and timeout query parameters.
        Return:
         the decoded JSON response
        '''
        self.ifneeded = ifneeded
        self.wait = wait
        self.timeout = timeout

        # The "&" before "wait=" was missing, which fused the two
        # parameters into a single "ifNeeded=<value>wait=<value>" pair.
        uri = "box/synchronize?ifNeeded=" + self.ifneeded + "&wait=" + self.wait + "&timeout=" + str(self.timeout)

        api_url = self.url + uri
        output_init = BytesIO()

        c = pycurl.Curl()
        try:
            c.setopt(c.URL, api_url)
            ### Read the cookie File
            c.setopt(pycurl.COOKIEFILE, 'cookie.txt')
            c.setopt(c.WRITEFUNCTION, output_init.write)
            c.perform()
        finally:
            # Close the handle on the failure path too.
            c.close()

        return json.loads(output_init.getvalue())
项目:GOKU    作者:bingweichen    | 项目源码 | 文件源码
def postXmlSSL(self, xml, url, second=30, cert=True, post=True):
        """Send ``xml`` to ``url`` using ``self.curl`` and return the response body.

        ``second`` is the curl TIMEOUT value. When ``cert`` is true the PEM
        client key and certificate from ``WxPayConf_pub`` are configured;
        when ``post`` is true ``xml`` is sent as the POST body.
        """
        handle = self.curl
        handle.setopt(pycurl.URL, url)
        handle.setopt(pycurl.TIMEOUT, second)

        if cert:
            # Client key and certificate, both in PEM format.
            cert_options = (
                (pycurl.SSLKEYTYPE, "PEM"),
                (pycurl.SSLKEY, WxPayConf_pub.SSLKEY_PATH),
                (pycurl.SSLCERTTYPE, "PEM"),
                (pycurl.SSLCERT, WxPayConf_pub.SSLCERT_PATH),
            )
            for option, value in cert_options:
                handle.setopt(option, value)

        if post:
            handle.setopt(pycurl.POST, True)
            handle.setopt(pycurl.POSTFIELDS, xml)

        response = StringIO()
        handle.setopt(pycurl.WRITEFUNCTION, response.write)
        handle.perform()
        return response.getvalue()
项目:Opencv_learning    作者:wjb711    | 项目源码 | 文件源码
def use_cloud(token):
    """POST the audio frames of 'output.wav' to the Baidu speech server API.

    ``token`` is appended to the request URL. The response body is passed
    to ``dump_res`` through curl's write callback; nothing is returned.
    """
    fp = wave.open('output.wav','r')
    try:
        audio_data = fp.readframes(fp.getnframes())
    finally:
        # The wave file was previously left open.
        fp.close()
    # Use the real byte count rather than assuming 2 bytes per frame, so
    # the declared length always matches the body that is sent.
    f_len = len(audio_data)

    cuid = "123456" #my xiaomi phone MAC
    srv_url = 'http://vop.baidu.com/server_api' + '?cuid=' + cuid + '&token=' + token
    http_header = [
        'Content-Type: audio/pcm; rate=8000',
        'Content-Length: %d' % f_len
    ]
    print(srv_url)
    c = pycurl.Curl()
    try:
        c.setopt(pycurl.URL, str(srv_url)) #curl doesn't support unicode
        c.setopt(c.HTTPHEADER, http_header)   #must be list, not dict
        c.setopt(c.POST, 1)
        c.setopt(c.CONNECTTIMEOUT, 30)
        c.setopt(c.TIMEOUT, 30)
        c.setopt(c.WRITEFUNCTION, dump_res)
        c.setopt(c.POSTFIELDS, audio_data)
        c.setopt(c.POSTFIELDSIZE, f_len)
        c.perform()
    finally:
        # Release the handle even when the transfer fails.
        c.close()
项目:Opencv_learning    作者:wjb711    | 项目源码 | 文件源码
def use_cloud(token):
    """Upload the frames of 'output.wav' to the Baidu speech server API.

    ``token`` is appended to the request URL. The response body is passed
    to ``dump_res`` through curl's write callback; nothing is returned.
    """
    fp = wave.open('output.wav','r')
    try:
        audio_data = fp.readframes(fp.getnframes())
    finally:
        # The wave file was previously left open.
        fp.close()
    # Actual payload size; avoids assuming 2 bytes per frame.
    f_len = len(audio_data)

    cuid = "123456" #my xiaomi phone MAC
    srv_url = 'http://vop.baidu.com/server_api' + '?cuid=' + cuid + '&token=' + token
    http_header = [
        'Content-Type: audio/pcm; rate=8000',
        'Content-Length: %d' % f_len
    ]
    print(srv_url)
    c = pycurl.Curl()
    try:
        c.setopt(pycurl.URL, str(srv_url)) #curl doesn't support unicode
        c.setopt(c.HTTPHEADER, http_header)   #must be list, not dict
        c.setopt(c.POST, 1)
        c.setopt(c.CONNECTTIMEOUT, 30)
        c.setopt(c.TIMEOUT, 30)
        c.setopt(c.WRITEFUNCTION, dump_res)
        c.setopt(c.POSTFIELDS, audio_data)
        c.setopt(c.POSTFIELDSIZE, f_len)
        c.perform()
    finally:
        # Release the handle even when the transfer fails.
        c.close()
项目:Opencv_learning    作者:wjb711    | 项目源码 | 文件源码
def use_cloud(token):
    """Send the frames of 'output.wav' to the Baidu speech server API.

    ``token`` is appended to the request URL. The response body is passed
    to ``dump_res`` through curl's write callback; nothing is returned
    (``perform()`` itself has no return value).
    """
    fp = wave.open('output.wav', 'rb')
    try:
        audio_data = fp.readframes(fp.getnframes())
    finally:
        # The wave file was previously left open.
        fp.close()
    # Actual payload size; avoids assuming 2 bytes per frame.
    f_len = len(audio_data)

    cuid = "xxxxxxxxxx" #my xiaomi phone MAC
    srv_url = 'http://vop.baidu.com/server_api' + '?cuid=' + cuid + '&token=' + token
    http_header = [
        'Content-Type: audio/pcm; rate=8000',
        'Content-Length: %d' % f_len
    ]
    print(srv_url)
    c = pycurl.Curl()
    try:
        c.setopt(pycurl.URL, str(srv_url)) #curl doesn't support unicode
        c.setopt(c.HTTPHEADER, http_header)   #must be list, not dict
        c.setopt(c.POST, 1)
        c.setopt(c.CONNECTTIMEOUT, 30)
        c.setopt(c.TIMEOUT, 30)
        c.setopt(c.WRITEFUNCTION, dump_res)
        c.setopt(c.POSTFIELDS, audio_data)
        c.setopt(c.POSTFIELDSIZE, f_len)
        c.perform() #pycurl.perform() has no return val
    finally:
        # Release the handle even when the transfer fails.
        c.close()
项目:Opencv_learning    作者:wjb711    | 项目源码 | 文件源码
def use_cloud(token):
    """POST the frames of 'output.wav' to the Baidu speech server API.

    ``token`` is appended to the request URL. The response body is passed
    to ``dump_res`` through curl's write callback; nothing is returned.
    """
    fp = wave.open('output.wav','r')
    try:
        audio_data = fp.readframes(fp.getnframes())
    finally:
        # The wave file was previously left open.
        fp.close()
    # Actual payload size; avoids assuming 2 bytes per frame.
    f_len = len(audio_data)

    cuid = "123456" #my xiaomi phone MAC
    srv_url = 'http://vop.baidu.com/server_api' + '?cuid=' + cuid + '&token=' + token
    http_header = [
        'Content-Type: audio/pcm; rate=8000',
        'Content-Length: %d' % f_len
    ]

    c = pycurl.Curl()
    try:
        c.setopt(pycurl.URL, str(srv_url)) #curl doesn't support unicode
        c.setopt(c.HTTPHEADER, http_header)   #must be list, not dict
        c.setopt(c.POST, 1)
        c.setopt(c.CONNECTTIMEOUT, 30)
        c.setopt(c.TIMEOUT, 30)
        c.setopt(c.WRITEFUNCTION, dump_res)
        c.setopt(c.POSTFIELDS, audio_data)
        c.setopt(c.POSTFIELDSIZE, f_len)
        c.perform()
    finally:
        # Release the handle even when the transfer fails.
        c.close()
项目:ZPoc    作者:zidier215    | 项目源码 | 文件源码
def _get_url(self, url):
        """GET ``url`` with a JWT Authorization header and return the body.

        Returns ``None`` when ``self.API_TOKEN`` is unset or when the
        transfer raises; in both cases the problem is logged at ERROR
        level.
        """
        if self.API_TOKEN is None:
            logging.error('none token') # 3 For ERROR level
            return
        # Pre-bind the result so the failure path returns None instead of
        # raising UnboundLocalError at the return statement.
        result = None
        c = None
        try:
            c = pycurl.Curl()
            c.setopt(pycurl.CAINFO, certifi.where())
            c.setopt(pycurl.URL, url)
            b = StringIO.StringIO()
            c.setopt(pycurl.WRITEFUNCTION, b.write)
            c.setopt(pycurl.USERAGENT, "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; .NET CLR 1.1.4322)")
            c.setopt(pycurl.HTTPHEADER, ['Authorization: JWT %s' % self.API_TOKEN.encode()])
            c.setopt(pycurl.CUSTOMREQUEST, "GET")
            c.setopt(pycurl.FOLLOWLOCATION, 1)
            c.perform()
            result = b.getvalue()
            logging.debug('result')
        except Exception as e:
            # Log the exception itself: the .message attribute does not
            # exist on Python 3 exceptions.
            logging.error('%s', e)
            logging.error('go error')
        finally:
            if c is not None:
                c.close()
        return result
项目:speech_rec_py    作者:YichiHuang    | 项目源码 | 文件源码
def use_cloud(token):
    """Send the frames of '01.wav' to the Baidu speech server API.

    Prints the sample width, frame rate and channel count of the file,
    then posts the raw frames. ``token`` is appended to the request URL.
    The response body is passed to ``dump_res`` through curl's write
    callback; nothing is returned.
    """
    fp=wave.open(u'01.wav','rb')
    try:
        nf=fp.getnframes()
        # The "sampwidth" line previously printed the frame count.
        print('sampwidth: %s' % fp.getsampwidth())
        print('framerate: %s' % fp.getframerate())
        print('channels: %s' % fp.getnchannels())
        audio_data=fp.readframes(nf)
    finally:
        # The wave file was previously left open.
        fp.close()
    # Actual payload size; avoids assuming 2 bytes per frame.
    f_len=len(audio_data)

    cuid="10:2A:B3:58:28:88" #my redmi phone MAC
    srv_url='http://vop.baidu.com/server_api'+'?cuid='+cuid+'&token='+token
    http_header=[
                 'Content-Type:audio/pcm; rate=8000',
                 'Content-length:%d' % f_len
    ]

    c=pycurl.Curl()
    try:
        c.setopt(pycurl.URL,str(srv_url))
        c.setopt(c.HTTPHEADER,http_header)
        c.setopt(c.CONNECTTIMEOUT,80)
        c.setopt(c.TIMEOUT,80)
        c.setopt(c.WRITEFUNCTION,dump_res)
        c.setopt(c.POSTFIELDS,audio_data)
        c.setopt(c.POSTFIELDSIZE,f_len)
        c.perform() #pycurl.perform() has no return val
    finally:
        # Release the handle even when the transfer fails.
        c.close()
项目:myRobot    作者:jesson20121020    | 项目源码 | 文件源码
def voice_to_text(self, voice_file, callback):
        """Post the frames of ``voice_file`` to the Baidu speech server API.

        The token comes from ``self.get_token()``. ``callback`` is
        installed as curl's write callback and receives the response
        data; nothing is returned.
        """
        fp = wave.open(voice_file, 'rb')
        try:
            audio_data = fp.readframes(fp.getnframes())
        finally:
            # The wave file was previously left open.
            fp.close()
        # Actual payload size; avoids assuming 2 bytes per frame.
        f_len = len(audio_data)

        cuid = "xxxxxxxxxx" #my xiaomi phone MAC
        srv_url = 'http://vop.baidu.com/server_api' + '?cuid=' + cuid + '&token=' + self.get_token()
        http_header = [
            'Content-Type: audio/pcm; rate=8000',
            'Content-Length: %d' % f_len
        ]
        c = pycurl.Curl()
        try:
            c.setopt(pycurl.URL, str(srv_url)) #curl doesn't support unicode
            c.setopt(c.HTTPHEADER, http_header)   #must be list, not dict
            c.setopt(c.POST, 1)
            c.setopt(c.CONNECTTIMEOUT, 30)
            c.setopt(c.TIMEOUT, 30)
            c.setopt(c.WRITEFUNCTION, callback)
            c.setopt(c.POSTFIELDS, audio_data)
            c.setopt(c.POSTFIELDSIZE, f_len)
            c.perform() #pycurl.perform() has no return val
        finally:
            # Release the handle even when the transfer fails.
            c.close()
项目:wfuzz    作者:gwen001    | 项目源码 | 文件源码
def getXML(self,obj):
        """Describe this request as a ``<request>`` DOM element.

        ``obj`` is the DOM document that creates the nodes. The result has
        a ``method`` attribute and a ``URL`` child, plus ``PostData`` for
        POST requests and ``Cookie`` when a Cookie header exists.
        """
        doc = obj
        node = doc.createElement("request")
        node.setAttribute("method", self.method)

        url_node = doc.createElement("URL")
        url_text = doc.createTextNode(self.completeUrl)
        url_node.appendChild(url_text)
        node.appendChild(url_node)

        is_post = self.method == "POST"
        if is_post:
            post_node = doc.createElement("PostData")
            post_text = doc.createTextNode(self.postdata)
            post_node.appendChild(post_text)
            node.appendChild(post_node)

        if "Cookie" in self._headers:
            cookie_node = doc.createElement("Cookie")
            cookie_text = doc.createTextNode(self._headers["Cookie"])
            cookie_node.appendChild(cookie_text)
            node.appendChild(cookie_node)

        return node
项目:tus-py-client    作者:tus    | 项目源码 | 文件源码
def __init__(self, uploader):
        """Prepare a curl handle that PATCH-uploads from ``uploader``.

        The handle reads the request body from the uploader's file stream,
        positioned at ``uploader.offset``, and sends
        ``uploader.request_length`` bytes. Response headers go through
        ``self._prepare_response_header`` and the response body is
        collected in ``self.output``.
        """
        handle = pycurl.Curl()
        self.handle = handle
        self.response_headers = {}
        self.output = six.StringIO()
        self.status_code = None

        handle.setopt(pycurl.CAINFO, certifi.where())
        handle.setopt(pycurl.URL, uploader.url)
        handle.setopt(pycurl.HEADERFUNCTION, self._prepare_response_header)
        handle.setopt(pycurl.UPLOAD, 1)
        # UPLOAD is combined with a custom verb so the request is a PATCH.
        handle.setopt(pycurl.CUSTOMREQUEST, 'PATCH')

        stream = uploader.get_file_stream()
        self.file = stream
        stream.seek(uploader.offset)
        handle.setopt(pycurl.READFUNCTION, stream.read)
        handle.setopt(pycurl.WRITEFUNCTION, self.output.write)
        handle.setopt(pycurl.INFILESIZE, uploader.request_length)

        request_headers = [
            "upload-offset: {}".format(uploader.offset),
            "Content-Type: application/offset+octet-stream",
        ]
        request_headers = request_headers + uploader.headers_as_list
        handle.setopt(pycurl.HTTPHEADER, request_headers)
项目:python-Speech_Recognition    作者:zthxxx    | 项目源码 | 文件源码
def get_page_data(url, head = None, curl = None):
    """GET ``url`` and return the response body.

    ``head`` is an optional list of header lines. ``curl`` is an optional
    existing handle to reuse; when omitted a temporary handle is created
    and closed before returning. A caller-supplied handle is left open.

    NOTE(review): SSL peer and host verification are switched off here.
    """
    stream_buffer = StringIO()
    owns_handle = not curl
    if owns_handle:
        curl = pycurl.Curl()
    try:
        curl.setopt(pycurl.URL, url)#curl doesn't support unicode
        if head:
            curl.setopt(pycurl.HTTPHEADER,  head)#must be list, not dict
        curl.setopt(pycurl.WRITEFUNCTION, stream_buffer.write)
        curl.setopt(pycurl.CUSTOMREQUEST,"GET")
        curl.setopt(pycurl.CONNECTTIMEOUT, 30)
        curl.setopt(pycurl.TIMEOUT, 30)
        curl.setopt(pycurl.SSL_VERIFYPEER, 0)
        curl.setopt(pycurl.SSL_VERIFYHOST, 0)
        curl.perform()
    finally:
        # Only close handles created here; the caller owns its own.
        if owns_handle:
            curl.close()
    page_data =stream_buffer.getvalue()
    stream_buffer.close()
    return page_data
项目:python-Speech_Recognition    作者:zthxxx    | 项目源码 | 文件源码
def post_page_data(url, data = None, head = None, curl = None):
    """POST ``data`` to ``url`` and return the response body.

    ``head`` is an optional list of header lines. ``curl`` is an optional
    existing handle to reuse; when omitted a temporary handle is created
    and closed before returning. A caller-supplied handle is left open.
    No timeouts are configured for this request.
    """
    stream_buffer = StringIO()
    owns_handle = not curl
    if owns_handle:
        curl = pycurl.Curl()
    try:
        curl.setopt(pycurl.URL, url)#curl doesn't support unicode
        if head:
            curl.setopt(pycurl.HTTPHEADER,  head)#must be list, not dict
        curl.setopt(pycurl.POSTFIELDS,  data)
        curl.setopt(pycurl.WRITEFUNCTION, stream_buffer.write)
        curl.setopt(pycurl.CUSTOMREQUEST,"POST")
        curl.perform()
    finally:
        # Only close handles created here; the caller owns its own.
        if owns_handle:
            curl.close()
    page_data = stream_buffer.getvalue()
    stream_buffer.close()
    return page_data
项目:check_wan_latency    作者:computingbee    | 项目源码 | 文件源码
def getlat4city():
    """Ask super-ping.com for the average ping latency from CITY to WAN_IP.

    The request URL and Referer header are built from the module-level
    ``CITY`` and ``WAN_IP`` values. The average is cut out of the response
    between the ``ping-avg'>`` marker and the following ``</div>``.

    Returns:
        The extracted average as a float, or ``-1.0`` when the stripped
        response is exactly ``"-"`` or ``"super-ping.com"``.

    Raises:
        ValueError: If either marker is missing from the response, or the
            text between them is not a valid float.
        pycurl.error: Propagated from ``perform()`` when the transfer fails.
    """
    avglat = -1  # sentinel returned when the service gives no usable result

    sp_url = "http://www.super-ping.com/ping.php?node=" + CITY + "&ping=" + WAN_IP
    sp_refer_url = "http://www.super-ping.com/?ping=" + WAN_IP + "&locale=en"
    sp_http_headers = ['Referer: ' + sp_refer_url, 'X-Requested-With: XMLHttpRequest']

    crl = pyc.Curl()
    sio = StringIO()
    try:
        crl.setopt(pyc.URL, sp_url)
        crl.setopt(pyc.HTTPHEADER, sp_http_headers)
        crl.setopt(pyc.WRITEFUNCTION, sio.write)
        crl.perform()
    finally:
        # Release the handle even when the transfer raises.
        crl.close()

    lat_http_result = sio.getvalue()
    stripped = lat_http_result.strip()
    # Only parse the body when it is not one of the two placeholder replies.
    if stripped != "-" and stripped != "super-ping.com":
        fstring = "ping-avg'>"
        lstring = "</div>"
        start = lat_http_result.index(fstring) + len(fstring)
        end = lat_http_result.index(lstring, start)
        avglat = lat_http_result[start:end]

    return float(avglat)
项目:hzlgithub    作者:hzlRises    | 项目源码 | 文件源码
def Curl(url,headers):
    """Fetch ``url`` with pycurl, retrying until an acceptable page arrives.

    Each attempt uses a fresh handle with the Referer set to
    ``http://weixin.sogou.com/``, redirects followed (at most 5), a 60 s
    connect timeout, a 120 s total timeout and gzip/deflate encoding.

    If the page contains the marker substring, a message is printed and the
    function sleeps 600 seconds before trying again. Any exception raised
    during an attempt is printed and the attempt is repeated immediately.
    The loop has no retry limit.

    Args:
        url: URL to fetch.
        headers: List of ``"Name: value"`` header strings for
            ``pycurl.HTTPHEADER``.

    Returns:
        The body of the first response that does not contain the marker.
    """
    while 1:
        try:
            c = pycurl.Curl()
            try:
                buf = StringIO.StringIO()
                c.setopt(pycurl.REFERER, 'http://weixin.sogou.com/')
                c.setopt(pycurl.FOLLOWLOCATION, True)
                c.setopt(pycurl.MAXREDIRS, 5)
                c.setopt(pycurl.CONNECTTIMEOUT, 60)
                c.setopt(pycurl.TIMEOUT, 120)
                c.setopt(pycurl.ENCODING, 'gzip,deflate')
                c.setopt(pycurl.URL, url)
                c.setopt(pycurl.HTTPHEADER, headers)
                c.setopt(c.WRITEFUNCTION, buf.write)
                c.perform()
                html = buf.getvalue()
            finally:
                # A new handle is created on every iteration, so release it
                # here instead of leaking one per attempt.
                c.close()
            # NOTE(review): the marker and message literals below look like
            # non-ASCII text that was lost to an encoding problem; confirm
            # the intended strings against the upstream project.
            if '??????' in html:
                print(u'??????,??10??')
                time.sleep(600)
            else:
                return html
        except Exception as e:
            print('%s curl(url) %s' % (url, e))
#????????
项目:hzlgithub    作者:hzlRises    | 项目源码 | 文件源码
def getHtml(url,headers):
    """POST to ``url`` with pycurl and return the response body.

    The handle follows redirects (at most 5), uses a 60 s connect timeout
    and a 120 s total timeout, and requests gzip/deflate encoding.

    Args:
        url: URL to send the request to.
        headers: List of ``"Name: value"`` header strings for
            ``pycurl.HTTPHEADER``.

    Returns:
        Everything libcurl wrote to the in-memory buffer (the response body).

    Raises:
        pycurl.error: Propagated from ``perform()`` when the transfer fails.
    """
    c = pycurl.Curl()
    try:
        buf = StringIO.StringIO()                   # collects the response body
        c.setopt(pycurl.URL, url)
        c.setopt(pycurl.FOLLOWLOCATION, True)       # follow HTTP redirects
        c.setopt(pycurl.MAXREDIRS, 5)               # but at most five of them
        c.setopt(pycurl.CONNECTTIMEOUT, 60)
        c.setopt(pycurl.TIMEOUT, 120)
        c.setopt(pycurl.ENCODING, 'gzip,deflate')
        c.setopt(pycurl.HTTPHEADER, headers)
        c.setopt(pycurl.POST, 1)                    # issue a POST request
        # NOTE(review): ``data`` is neither a parameter nor a local of this
        # function, so it must come from an enclosing/module scope. Confirm
        # it is defined there; otherwise this line raises NameError.
        c.setopt(pycurl.POSTFIELDS, data)
        c.setopt(c.WRITEFUNCTION, buf.write)
        c.perform()
        return buf.getvalue()
    finally:
        # Release the handle whether or not the transfer succeeded.
        c.close()
项目:hzlgithub    作者:hzlRises    | 项目源码 | 文件源码
def curl(url, debug=False, **kwargs):
    """Fetch ``url`` with pycurl, retrying until a transfer succeeds.

    Each attempt uses a fresh handle with the Referer set to ``url`` itself,
    redirects followed, a 60 s timeout, gzip encoding, a fixed Chrome
    User-Agent string and ``NOSIGNAL`` enabled.

    Args:
        url: URL to fetch.
        debug: When true, the first exception raised by an attempt is
            re-raised instead of being retried.
        **kwargs: Extra options, keyed by the name of a ``pycurl`` module
            attribute (for example ``TIMEOUT=10``). They are applied after
            the defaults above and can therefore override them.

    Returns:
        The response body of the first successful transfer.

    Raises:
        KeyError: If a keyword name is not an attribute of ``pycurl``.
    """
    # Resolve option names once, before the retry loop. An unknown name is a
    # programming error that retrying can never fix, so fail immediately.
    extra_options = [(vars(pycurl)[name], value) for name, value in kwargs.items()]

    while 1:
        try:
            s = StringIO.StringIO()
            c = pycurl.Curl()
            try:
                c.setopt(pycurl.URL, url)
                c.setopt(pycurl.REFERER, url)
                c.setopt(pycurl.FOLLOWLOCATION, True)
                c.setopt(pycurl.TIMEOUT, 60)
                c.setopt(pycurl.ENCODING, 'gzip')
                c.setopt(pycurl.USERAGENT, 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/31.0.1650.63 Safari/537.36')
                c.setopt(pycurl.NOSIGNAL, True)
                c.setopt(pycurl.WRITEFUNCTION, s.write)
                for option, value in extra_options:
                    c.setopt(option, value)
                c.perform()
            finally:
                # Release the handle on failed attempts too, not only on success.
                c.close()
            return s.getvalue()
        except Exception:
            # ``Exception`` rather than a bare ``except`` so KeyboardInterrupt
            # and SystemExit can still break out of the retry loop.
            if debug:
                raise
项目:multiplierz    作者:BlaisProteomics    | 项目源码 | 文件源码
def scan_info(self, start_time, stop_time=0, start_mz=0, stop_mz=99999):
        """Return the scans inside a time window, filtering MS/MS scans by m/z.

        Each result is a (time, mz, scan_name, scan_type, scan_mode) tuple,
        where scan_name is the URL of the scan (its HTML version).

        A scan is kept when its time lies in [start_time, stop_time] and it
        is either an 'MS1' scan or its mz lies in [start_mz, stop_mz]. A
        stop_time of 0 means "same as start_time".

        Example:
        >>> scan_info = my_peakfile.scan_info(30.0, 35.0, 435.82, 436.00)
        """
        last_time = start_time if stop_time == 0 else stop_time

        selected = []
        for t, mz, sn, st, sm in self.scans():
            # Outside the requested time window: never included.
            if not (start_time <= t <= last_time):
                continue
            # Full MS scans pass on time alone; others must also match on m/z.
            if st != 'MS1' and not (start_mz <= mz <= stop_mz):
                continue
            selected.append((t, mz, sn, st, sm))
        return selected
项目:multiplierz    作者:BlaisProteomics    | 项目源码 | 文件源码
def xic(self, start_time, stop_time, start_mz, stop_mz, filter=None):
        """Download the '/ric/' text resource for a time and m/z window.

        The URL is self.data_file followed by
        '/ric/<start_time>-<stop_time>/<start_mz>-<stop_mz>.txt'. The request
        is issued on self.crl up to five times, stopping as soon as a
        non-empty body has been received.

        The filter argument is accepted but not used here.

        Returns a list with one tuple of floats per line of the response,
        built from that line's whitespace-separated fields.
        """
        ric_path = '/ric/%s-%s/%s-%s.txt' % (start_time, stop_time,
                                             start_mz, stop_mz)
        xic_url = str(self.data_file + ric_path)

        body = cStringIO.StringIO()
        self.crl.setopt(pycurl.HTTPGET, True)
        self.crl.setopt(pycurl.URL, xic_url)
        self.crl.setopt(pycurl.WRITEFUNCTION, body.write)

        # Repeat the transfer while the body is still empty, five tries at most.
        attempts_left = 5
        while attempts_left > 0:
            self.crl.perform()
            if body.getvalue():
                break
            attempts_left -= 1

        points = []
        for line in body.getvalue().splitlines():
            points.append(tuple(map(float, line.split())))
        return points
项目:micro-blog    作者:nickChenyx    | 项目源码 | 文件源码
def fetch(self, url, body=None, headers=None):
        """Fetch url through urllib2 and return the wrapped response.

        Args:
            url: URL to request; it must be accepted by _allowedURL.
            body: Optional request body, passed as the Request's data.
            headers: Optional mapping of extra request headers. A default
                'User-Agent' is added when none is supplied. The caller's
                mapping is not modified.

        Returns:
            The result of self._makeResponse for the opened response. An
            urllib2.HTTPError is also converted with self._makeResponse and
            returned instead of being raised.

        Raises:
            ValueError: If _allowedURL rejects the URL.
        """
        if not _allowedURL(url):
            raise ValueError('Bad URL scheme: %r' % (url,))

        # Work on a private copy so setdefault() below cannot leak the
        # User-Agent entry into a dict owned by the caller.
        headers = {} if headers is None else dict(headers)

        headers.setdefault(
            'User-Agent',
            "%s Python-urllib/%s" % (USER_AGENT, urllib2.__version__,))

        req = urllib2.Request(url, data=body, headers=headers)
        try:
            f = self.urlopen(req)
            try:
                return self._makeResponse(f)
            finally:
                f.close()
        except urllib2.HTTPError as why:
            # An HTTP error still carries a response; hand it back to the
            # caller in the same shape as a successful one.
            try:
                return self._makeResponse(why)
            finally:
                why.close()
项目:tools    作者:chenshiyang2015    | 项目源码 | 文件源码
def test_gzip(url):
        """Time a gzip-enabled request to url and return the duration in ms.

        The response body is passed to the callback of a Test instance, and
        the duration is libcurl's TOTAL_TIME for the transfer multiplied by
        1000.

        Args:
            url: URL to request.

        Returns:
            The total transfer time in milliseconds.

        Raises:
            pycurl.error: Propagated from perform() when the transfer fails.
        """
        t = Test()
        c = pycurl.Curl()
        try:
            c.setopt(pycurl.WRITEFUNCTION, t.callback)
            c.setopt(pycurl.ENCODING, 'gzip')
            c.setopt(pycurl.URL, url)
            # NOTE(review): this User-Agent value looks like a fragment of a
            # dict literal; confirm the intended string before changing it.
            c.setopt(pycurl.USERAGENT, "User-Agent':'EMAO_OPS_MONITOR) Gecko/20091201 Firefox/3.5.6)")
            c.perform()
            total_time = c.getinfo(c.TOTAL_TIME)
        finally:
            # Release the handle whether or not the transfer succeeded.
            c.close()

        return total_time * 1000