Python pycurl module, WRITEDATA example source code

We extracted the following 16 code examples from open-source Python projects to illustrate how to use pycurl.WRITEDATA.
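
Before the individual examples, here is a minimal, self-contained sketch of the pattern they all share (the URL is a placeholder): WRITEDATA directs the response body to a write target. In recent pycurl releases that can be any object with a write() method, typically an io.BytesIO buffer or a file opened in binary mode; older releases required a real file object, with WRITEFUNCTION as the fallback.

import io
import pycurl

buffer = io.BytesIO()
curl = pycurl.Curl()
curl.setopt(pycurl.URL, 'http://example.com/')  # placeholder URL
curl.setopt(pycurl.WRITEDATA, buffer)           # write the response body into the buffer
curl.perform()
status = curl.getinfo(pycurl.RESPONSE_CODE)
curl.close()
body = buffer.getvalue()                        # raw response bytes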

Project: ceiba-dl    Author: lantw44
def file(self, path, output, args={}, progress_callback=lambda *x: None):
        self.logger.debug('Preparing to download the file')
        self.web_cache[path] = dict(args)
        url = urllib.parse.urljoin(self.file_url, urllib.parse.quote(path))
        if len(args) > 0:
            url += '?' + urllib.parse.urlencode(args)
        self.logger.debug('HTTP request URL: {}'.format(url))
        self.curl.setopt(pycurl.URL, url)
        self.curl.setopt(pycurl.COOKIE, self.web_cookie)
        self.curl.setopt(pycurl.NOBODY, False)
        self.curl.setopt(pycurl.NOPROGRESS, False)
        self.curl.setopt(pycurl.WRITEDATA, output)
        self.curl.setopt(pycurl.HEADERFUNCTION, lambda *x: None)
        self.curl.setopt(pycurl.XFERINFOFUNCTION, progress_callback)
        self.curl.perform()
        status = self.curl.getinfo(pycurl.RESPONSE_CODE)
        if status != 200:
            raise ServerError(status)
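
A note on the progress_callback argument above: because NOPROGRESS is set to False, libcurl invokes the XFERINFOFUNCTION callback during the transfer with four integers: total bytes expected to download, bytes downloaded so far, total bytes expected to upload, and bytes uploaded so far. A hedged sketch of a compatible callback:

def progress(download_total, downloaded, upload_total, uploaded):
    # download_total stays 0 until the server reports a Content-Length
    if download_total > 0:
        print('downloaded {} of {} bytes'.format(downloaded, download_total))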
Project: ceiba-dl    Author: lantw44
def file_size(self, path, args={}):
        self.logger.debug('Preparing to get the file size')
        self.web_cache[path] = dict(args)
        url = urllib.parse.urljoin(self.file_url, urllib.parse.quote(path))
        if len(args) > 0:
            url += '?' + urllib.parse.urlencode(args)
        self.logger.debug('HTTP request URL: {}'.format(url))
        self.curl.setopt(pycurl.URL, url)
        self.curl.setopt(pycurl.COOKIE, self.web_cookie)
        self.curl.setopt(pycurl.NOBODY, True)
        self.curl.setopt(pycurl.NOPROGRESS, True)
        self.curl.setopt(pycurl.WRITEDATA, io.BytesIO())
        self.curl.setopt(pycurl.HEADERFUNCTION, lambda *x: None)
        self.curl.setopt(pycurl.XFERINFOFUNCTION, lambda *x: None)
        self.curl.perform()
        status = self.curl.getinfo(pycurl.RESPONSE_CODE)
        if status != 200:
            raise ServerError(status)
        return self.curl.getinfo(pycurl.CONTENT_LENGTH_DOWNLOAD)
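
Two details make file_size work: NOBODY turns the transfer into a HEAD-style request, so the BytesIO sink never receives anything, and CONTENT_LENGTH_DOWNLOAD returns the reported Content-Length as a float, or -1.0 when the server did not send one. The same pattern as a standalone sketch (placeholder argument handling, no cookies):

import io
import pycurl

def content_length(url):
    curl = pycurl.Curl()
    curl.setopt(pycurl.URL, url)
    curl.setopt(pycurl.NOBODY, True)             # headers only, no body transfer
    curl.setopt(pycurl.WRITEDATA, io.BytesIO())  # sink for a body that never arrives
    curl.perform()
    size = curl.getinfo(pycurl.CONTENT_LENGTH_DOWNLOAD)  # float; -1.0 if unknown
    curl.close()
    return size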
Project: ceiba-dl    Author: lantw44
def web_redirect(self, path, args={}):
        self.logger.debug('Preparing to get the redirect target')
        self.web_cache[path] = dict(args)
        url = urllib.parse.urljoin(self.web_url, urllib.parse.quote(path))
        if len(args) > 0:
            url += '?' + urllib.parse.urlencode(args)
        self.logger.debug('HTTP request URL: {}'.format(url))
        headers = io.BytesIO()
        self.curl.setopt(pycurl.URL, url)
        self.curl.setopt(pycurl.COOKIE, self.web_cookie)
        self.curl.setopt(pycurl.NOBODY, False)
        self.curl.setopt(pycurl.NOPROGRESS, True)
        self.curl.setopt(pycurl.WRITEDATA, NoneIO())
        self.curl.setopt(pycurl.HEADERFUNCTION, headers.write)
        self.curl.setopt(pycurl.XFERINFOFUNCTION, lambda *x: None)
        self.curl.perform()
        status = self.curl.getinfo(pycurl.RESPONSE_CODE)
        if status != 302:
            raise ServerError(status)
        for header_line in headers.getvalue().split(b'\r\n'):
            if header_line.startswith(b'Location:'):
                return header_line.split(b':', maxsplit=1)[1].strip().decode()
        return None
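
web_redirect parses the Location header by hand because the caller wants the redirect target itself, not the page behind it. When only the final URL matters, an alternative (a sketch, not what ceiba-dl does) is to let libcurl follow the redirect and read EFFECTIVE_URL afterwards:

import io
import pycurl

curl = pycurl.Curl()
curl.setopt(pycurl.URL, 'http://example.com/old-path')  # placeholder URL
curl.setopt(pycurl.FOLLOWLOCATION, True)                # follow 3xx responses
curl.setopt(pycurl.WRITEDATA, io.BytesIO())
curl.perform()
final_url = curl.getinfo(pycurl.EFFECTIVE_URL)          # URL after all redirects
curl.close()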
Project: necrobot    Author: incnone
def end_all_async_unsafe(self):
        if not Config.RECORDING_ACTIVATED:
            return

        for rtmp_name in self._recording_rtmps:
            curl = pycurl.Curl()
            try:
                self._set_def_curl_opts(curl)

                curl.setopt(pycurl.URL, self._end_url(rtmp_name))
                curl.setopt(pycurl.WRITEDATA, self._end_buffer)

                curl.perform()
            except pycurl.error as e:
                console.warning(
                    'Pycurl error in end_all() for racer <{0}>: Tried to curl <{1}>. Error {2}.'.format(
                        rtmp_name,
                        self._end_url(rtmp_name),
                        e))
            finally:
                curl.close()

        self._recording_rtmps.clear()
Project: necrobot    Author: incnone
def _end_record_nolock(self, rtmp_name):
        rtmp_name = rtmp_name.lower()
        if rtmp_name not in self._recording_rtmps:
            return

        curl = pycurl.Curl()
        try:
            self._set_def_curl_opts(curl)
            curl.setopt(pycurl.URL, self._end_url(rtmp_name))
            curl.setopt(pycurl.WRITEDATA, self._end_buffer)

            curl.perform()
            self._recording_rtmps = [r for r in self._recording_rtmps if r != rtmp_name]
        except pycurl.error as e:
            console.warning(
                'Pycurl error in end_record({0}): Tried to curl <{1}>. Error {2}.'.format(
                    rtmp_name,
                    self._end_url(rtmp_name),
                    e))
        finally:
            curl.close()
Project: cntv-video-downloader    Author: cls1991
def get_html(url, user_agent, refer_url):
    """
    curl html
    :param url:
    :param user_agent:
    :param refer_url:
    :return:
    """
    curl = pycurl.Curl()
    curl.setopt(pycurl.USERAGENT, user_agent)
    curl.setopt(pycurl.REFERER, refer_url)

    buffers = StringIO()
    curl.setopt(pycurl.URL, url)
    curl.setopt(pycurl.WRITEDATA, buffers)
    curl.perform()
    body = buffers.getvalue()
    buffers.close()
    curl.close()

    return body
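
get_html is Python 2 code: StringIO works because pycurl writes byte strings there. Under Python 3, pycurl writes bytes objects, so the buffer must be io.BytesIO and the body decoded explicitly. A sketch of the equivalent (the UTF-8 charset is an assumption; real code should take it from the Content-Type header):

import io
import pycurl

def get_html_py3(url, user_agent, refer_url):
    curl = pycurl.Curl()
    curl.setopt(pycurl.USERAGENT, user_agent)
    curl.setopt(pycurl.REFERER, refer_url)
    buffers = io.BytesIO()                      # pycurl writes bytes in Python 3
    curl.setopt(pycurl.URL, url)
    curl.setopt(pycurl.WRITEDATA, buffers)
    curl.perform()
    curl.close()
    return buffers.getvalue().decode('utf-8')  # assumed charset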
Project: baidu-dropbox-sniffer    Author: cls1991
def get_download_link(fs_id):
    """
    Get the download link for the given file.
    :param fs_id:
    :return:
    """
    curl = pycurl.Curl()
    curl.setopt(pycurl.USERAGENT, const.USER_AGENT)
    curl.setopt(pycurl.REFERER, const.PAN_REFER_URL)

    buffers = StringIO()
    request_dict = {
        'channel': 'chunlei',
        'timestamp': '1473685224',
        'fidlist': [fs_id],
        'type': 'dlink',
        'web': 1,
        'clienttype': 0,
        'bdstoken': 'e0e895bb3ef7b0cb70899ee66b74e809',
        'sign': decode_sign(parse_sign2('d76e889b6aafd3087ac3bd56f4d4053a', '3545d271c5d07ba27355d39da0c62a4ee06d2d25'))
    }
    target_url = const.PAN_API_URL + 'download?' + urllib.urlencode(request_dict)
    curl.setopt(pycurl.URL, target_url)
    curl.setopt(pycurl.WRITEDATA, buffers)
    curl.setopt(pycurl.COOKIEFILE, "cookie.txt")
    curl.perform()
    body = buffers.getvalue()
    buffers.close()
    curl.close()
    data = json.loads(body)
    if data['errno']:
        return None

    return data['dlink'][0]['dlink']
Project: ceiba-dl    Author: lantw44
def api(self, args, encoding='utf-8', allow_return_none=False):
        self.logger.debug('Preparing to send the API request')
        if args.get('mode', '') == 'semester':
            semester = args.get('semester', '')
            if allow_return_none and self.api_cache == semester:
                self.logger.debug('Reusing the cached API result for semester {}'.format(semester))
                return
            self.api_cache = semester
        query_args = dict()
        query_args.update(self.api_args)
        query_args.update(args)
        url = self.api_url + '?' + urllib.parse.urlencode(query_args)
        data = io.BytesIO()
        self.logger.debug('HTTP request URL: {}'.format(url))
        self.curl.setopt(pycurl.URL, url)
        self.curl.setopt(pycurl.COOKIE, self.api_cookie)
        self.curl.setopt(pycurl.NOBODY, False)
        self.curl.setopt(pycurl.NOPROGRESS, True)
        self.curl.setopt(pycurl.WRITEDATA, data)
        self.curl.setopt(pycurl.HEADERFUNCTION, lambda *x: None)
        self.curl.setopt(pycurl.XFERINFOFUNCTION, lambda *x: None)
        self.curl.perform()
        status = self.curl.getinfo(pycurl.RESPONSE_CODE)
        if status != 200:
            raise ServerError(status)
        try:
            value = data.getvalue()
            return json.loads(value.decode(encoding))
        except json.decoder.JSONDecodeError:
            raise NotJSONError(value.decode(encoding))
Project: ceiba-dl    Author: lantw44
def web(self, path, args={}, encoding=None, allow_return_none=False):
        self.logger.debug('Preparing to fetch the web page')
        if allow_return_none:
            if path in self.web_cache and self.web_cache[path] == args:
                self.logger.debug('Reusing the cached page for {}'.format(path))
                self.logger.debug('Arguments: {}'.format(args))
                return
        self.web_cache[path] = dict(args)
        url = urllib.parse.urljoin(self.web_url, urllib.parse.quote(path))
        if len(args) > 0:
            url += '?' + urllib.parse.urlencode(args)
        self.logger.debug('HTTP request URL: {}'.format(url))
        data = io.BytesIO()
        self.curl.setopt(pycurl.URL, url)
        self.curl.setopt(pycurl.COOKIE, self.web_cookie)
        self.curl.setopt(pycurl.NOBODY, False)
        self.curl.setopt(pycurl.NOPROGRESS, True)
        self.curl.setopt(pycurl.WRITEDATA, data)
        self.curl.setopt(pycurl.HEADERFUNCTION, lambda *x: None)
        self.curl.setopt(pycurl.XFERINFOFUNCTION, lambda *x: None)
        self.curl.perform()
        status = self.curl.getinfo(pycurl.RESPONSE_CODE)
        if status != 200:
            raise ServerError(status)
        data.seek(io.SEEK_SET)
        return etree.parse(data, etree.HTMLParser(
            encoding=encoding, remove_comments=True))
Project: wa    Author: wateraccounting
def Download_ETens_from_WA_FTP(output_folder, Lat_tiles, Lon_tiles):           
    """
    This function retrieves ETensV1.0 data for a given date from the
    ftp.wateraccounting.unesco-ihe.org server.

    Restrictions:
    The data and this python file may not be distributed to others without
    permission of the WA+ team.

    Keyword arguments:
    output_folder -- Directory of the outputs
    Lat_tiles -- [Lat_min, Lat_max] Tile number of the max and min latitude tile number
    Lon_tiles -- [Lon_min, Lon_max] Tile number of the max and min longitude tile number                
    """      
    for v_tile in range(Lat_tiles[0], Lat_tiles[1]+1):
        for h_tile in range(Lon_tiles[0], Lon_tiles[1]+1):                                                  

            Tilename = "h%sv%s.zip" %(h_tile, v_tile)
            if not os.path.exists(os.path.join(output_folder,Tilename)): 
                try:  
                    # Collect account and FTP information           
                    username, password = WebAccounts.Accounts(Type = 'FTP_WA')
                    FTP_name = "ftp://ftp.wateraccounting.unesco-ihe.org//WaterAccounting_Guest/ETensV1.0/%s" % Tilename
                    local_filename = os.path.join(output_folder, Tilename)   

                    # Download data from FTP    
                    curl = pycurl.Curl()
                    curl.setopt(pycurl.URL, FTP_name)   
                    curl.setopt(pycurl.USERPWD, '%s:%s' %(username, password))                              
                    fp = open(local_filename, "wb")                              
                    curl.setopt(pycurl.WRITEDATA, fp)
                    curl.perform()
                    curl.close()
                    fp.close()  

                except:
                    print "tile %s is not found and will be replaced by NaN values" % Tilename

    return()
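
One fragile spot in the snippet above: if curl.perform() raises, fp is never closed. A hedged variant of the download step that wraps the file in a with block so the handle is released on every exit path:

import pycurl

def ftp_download(url, username, password, local_filename):
    curl = pycurl.Curl()
    try:
        curl.setopt(pycurl.URL, url)
        curl.setopt(pycurl.USERPWD, '%s:%s' % (username, password))
        with open(local_filename, 'wb') as fp:  # closed even if perform() raises
            curl.setopt(pycurl.WRITEDATA, fp)
            curl.perform()
    finally:
        curl.close()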
Project: necrobot    Author: incnone
def _start_record_nolock(self, rtmp_name):
        rtmp_name = rtmp_name.lower()
        if rtmp_name in self._recording_rtmps:
            self._end_record_nolock(rtmp_name)
        if rtmp_name in self._recording_rtmps:
            console.warning(
                'Error: tried to start a recording of racer <{0}>, but failed to end a previously '
                'started recording.'.format(rtmp_name))
            return None

        curl = pycurl.Curl()
        try:
            new_buffer = BytesIO()
            self._vodstart_buffers[rtmp_name] = new_buffer

            self._set_def_curl_opts(curl)
            curl.setopt(pycurl.URL, self._start_url(rtmp_name))
            curl.setopt(pycurl.WRITEDATA, new_buffer)

            curl.perform()
            self._recording_rtmps.append(rtmp_name)
        except pycurl.error as e:
            console.warning(
                'Pycurl error in start_record({0}): Tried to curl <{1}>. Error: {2}.'.format(
                    rtmp_name,
                    self._start_url(rtmp_name),
                    e))
        finally:
            curl.close()
Project: madodl    Author: miezak
def curl_common_init(buf):
    handle = pycurl.Curl()

    handle.setopt(pycurl.WRITEDATA, buf)
    handle.setopt(pycurl.HEADERFUNCTION, curl_hdr)
    handle.setopt(pycurl.DEBUGFUNCTION, curl_debug)

    handle.setopt(pycurl.USERPWD, '{}:{}'.format(_g.conf._user,_g.conf._pass))

    handle.setopt(pycurl.FOLLOWLOCATION, True)

    # avoid FTP CWD for fastest directory traversal
    handle.setopt(pycurl.FTP_FILEMETHOD, pycurl.FTPMETHOD_NOCWD)

    # we always set this flag and let the logging module
    # handle filtering.
    handle.setopt(pycurl.VERBOSE, True)

    # use ipv4 for VPNs
    handle.setopt(pycurl.IPRESOLVE, pycurl.IPRESOLVE_V4)
    handle.setopt(pycurl.USE_SSL, True)
    handle.setopt(pycurl.SSL_VERIFYPEER, False)
    # XXX
    handle.setopt(pycurl.SSL_VERIFYHOST, 0)

    return handle
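
A possible usage of this helper (the URL is hypothetical, and the call assumes the module's _g configuration is in place). Note that the helper disables SSL_VERIFYPEER and SSL_VERIFYHOST, which turns off certificate validation entirely; that trades security for convenience and is only defensible against hosts you already trust.

import io

buf = io.BytesIO()
handle = curl_common_init(buf)
handle.setopt(pycurl.URL, 'https://example.com/some/file')  # hypothetical URL
handle.perform()
handle.close()
data = buf.getvalue()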
Project: LinuxBashShellScriptForOps    Author: DingGuodong
def set_hosts_file(hosts="/etc/hosts"):
    import socket
    if not os.path.exists(hosts):
        if not os.path.exists(os.path.dirname(hosts)):
            os.makedirs(os.path.dirname(hosts))

    with open(hosts, "w") as f:
        hosts_url = "https://raw.githubusercontent.com/racaljk/hosts/master/hosts"
        conn = requests.head(hosts_url)
        if conn.status_code != 200:
            hosts_url = "https://coding.net/u/scaffrey/p/hosts/git/raw/master/hosts"
        curl = pycurl.Curl()
        curl.setopt(pycurl.URL, hosts_url)
        curl.setopt(pycurl.CAINFO, certifi.where())
        curl.setopt(pycurl.WRITEDATA, f)
        curl.perform()
        curl.close()

    hostname = socket.gethostname()  # socket.getfqdn()
    print hostname
    try:
        ip = socket.gethostbyname(socket.gethostname())  # TODO(Guodong Ding) this fails on Ubuntu but works on CentOS
    except Exception as _:
        del _
        ip = None
    with open(hosts, "a") as f:
        if ip is not None:
            appended_content = "\n" + "127.0.0.1 " + hostname + "\n" + ip + " " + hostname + "\n"
        else:
            appended_content = "\n" + "127.0.0.1 " + hostname + "\n"
        f.write(appended_content)
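
set_hosts_file hands a file opened in text mode ('w') to WRITEDATA, which works because this is Python 2 code. Under Python 3, pycurl writes bytes and a text-mode write() would raise TypeError, so the file must be opened in binary mode; a sketch of the download step:

import certifi
import pycurl

hosts_url = 'https://raw.githubusercontent.com/racaljk/hosts/master/hosts'
with open('/etc/hosts', 'wb') as f:  # binary mode: pycurl writes bytes
    curl = pycurl.Curl()
    curl.setopt(pycurl.URL, hosts_url)
    curl.setopt(pycurl.CAINFO, certifi.where())
    curl.setopt(pycurl.WRITEDATA, f)
    curl.perform()
    curl.close()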
Project: baidu-dropbox-sniffer    Author: cls1991
def list_dir(dir_name):
    """
    List the files in a directory.
    :param dir_name: directory path
    :return:
    """
    result = list()
    curl = pycurl.Curl()
    curl.setopt(pycurl.USERAGENT, const.USER_AGENT)
    curl.setopt(pycurl.REFERER, const.PAN_REFER_URL)

    buffers = StringIO()
    request_dict = {
        'channel': 'chunlei',
        'clienttype': 0,
        'showempty': 0,
        'web': 1,
        'order': 'time',
        'desc': 1,
        'page': 1,
        'num': 100,
        'dir': dir_name,
        'bdstoken': 'e0e895bb3ef7b0cb70899ee66b74e809'
    }
    target_url = const.PAN_API_URL + 'list?' + urllib.urlencode(request_dict)
    curl.setopt(pycurl.URL, target_url)
    curl.setopt(pycurl.WRITEDATA, buffers)
    curl.setopt(pycurl.COOKIEFILE, "cookie.txt")
    curl.perform()
    body = buffers.getvalue()
    print body
    buffers.close()
    curl.close()
    data = json.loads(body)
    if data['errno'] == 0:
        for a_list in data['list']:
            dlink = get_download_link(a_list['fs_id'])
            if dlink:
                dlink = dlink.replace('\\', '')
                result.append(dlink)

    return result
Project: wa    Author: wateraccounting
def Download_data(Date, Version, output_folder, Var):        
    """
    This function downloads CFSR data from the FTP server
                For - CFSR:    ftp://nomads.ncdc.noaa.gov/CFSR/HP_time_series/
                    - CFSRv2:  http://nomads.ncdc.noaa.gov/modeldata/cfsv2_analysis_timeseries/

    Keyword arguments:
    Date -- pandas timestamp day
    Version -- 1 or 2 (1 = CFSR, 2 = CFSRv2)            
    output_folder -- The directory for storing the downloaded files
    Var -- The variable that must be downloaded from the server ('dlwsfc','uswsfc','dswsfc','ulwsfc')
    """
    # Define the filename that must be downloaded     
    if Version == 1:
        filename = Var + '.gdas.' + str(Date.strftime('%Y')) + str(Date.strftime('%m')) + '.grb2'
    if Version == 2:
        filename = Var + '.gdas.' + str(Date.strftime('%Y')) + str(Date.strftime('%m')) + '.grib2'

    try:
        # download the file if it does not exist
        local_filename = os.path.join(output_folder, filename)
        if not os.path.exists(local_filename):
            Downloaded = 0
            Times = 0
            while Downloaded == 0:              
                # Create the command and run the command in cmd
                if Version == 1:
                    FTP_name = 'ftp://nomads.ncdc.noaa.gov/CFSR/HP_time_series/' + Date.strftime('%Y') + Date.strftime('%m')+ '/' + filename

                if Version == 2:
                    FTP_name = 'https://nomads.ncdc.noaa.gov/modeldata/cfsv2_analysis_timeseries/' + Date.strftime('%Y') + '/' + Date.strftime('%Y') + Date.strftime('%m')+ '/' + filename

                curl = pycurl.Curl()
                curl.setopt(pycurl.URL, FTP_name)
                fp = open(local_filename, "wb")
                curl.setopt(pycurl.SSL_VERIFYPEER, 0)
                curl.setopt(pycurl.SSL_VERIFYHOST, 0)
                curl.setopt(pycurl.WRITEDATA, fp)
                curl.perform()
                curl.close()
                fp.close()  
                statinfo = os.stat(local_filename)      
                if int(statinfo.st_size) > 10000:
                    Downloaded = 1  
                else:
                    Times += 1
                    if Times == 10:
                        Downloaded = 1
    except:
        print 'Was not able to download the CFSR file from the FTP server'

    return(local_filename)
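
The while loop above re-downloads until the file exceeds 10000 bytes or ten attempts have been made. The same retry pattern, factored into a hedged standalone helper (size threshold and attempt budget copied from the original; SSL checks disabled as in the original, which is insecure):

import os
import pycurl

def download_with_retries(url, local_filename, min_size=10000, max_attempts=10):
    for _ in range(max_attempts):
        curl = pycurl.Curl()
        try:
            curl.setopt(pycurl.URL, url)
            curl.setopt(pycurl.SSL_VERIFYPEER, 0)  # as in the original; insecure
            curl.setopt(pycurl.SSL_VERIFYHOST, 0)
            with open(local_filename, 'wb') as fp:
                curl.setopt(pycurl.WRITEDATA, fp)
                curl.perform()
        finally:
            curl.close()
        # accept the download once the file looks complete
        if os.stat(local_filename).st_size > min_size:
            break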
Project: baidu-pictures-downloader    Author: cls1991
def get_dlinks(search_target, get_dlinks_only=True):
    """
    Fetch picture download links for a search keyword.
    :param search_target: search keyword
    :param get_dlinks_only: whether to fetch download links only
    :return: list of download links
    """
    refer_url = const.REFER_URL % search_target
    curl = pycurl.Curl()
    curl.setopt(pycurl.USERAGENT, const.USER_AGENT)
    curl.setopt(pycurl.REFERER, refer_url)

    result = []
    ll = 0
    record_start_cursor = get_record_start_cursor(const.CURSOR_FILE)
    if record_start_cursor:
        ll = int(record_start_cursor)
    print('start')
    # page through the search results until no more pictures are returned
    while True:
        print('crawling pictures of page %d' % (ll / 30 + 1))
        # buffer for the raw response body
        buffers = StringIO()
        target_url = const.API_URL % (search_target, search_target, ll)
        curl.setopt(pycurl.URL, target_url)
        curl.setopt(pycurl.WRITEDATA, buffers)
        curl.perform()

        body = buffers.getvalue()
        body = body.replace('null', 'None')
        data = eval(body)
        if 'data' in data:
            has_data = False
            for a_data in data['data']:
                obj_url = None
                if 'objURL' in a_data:
                    obj_url = a_data['objURL']
                if obj_url:
                    has_data = True
                    result.append(obj_url)
            if not has_data:
                print('no more pic')
                break
            ll += 30
        else:
            print('no more pic')
            break

    print('done')
    curl.close()
    # save the paging cursor (page_num) for the next run
    if ll:
        set_record_start_cursor(str(ll), const.CURSOR_FILE)
    for index, data in enumerate(result):
        result[index] = decode_url(data)

    if not get_dlinks_only:
        save_to_file(result, search_target + '.txt', const.BASE_FOLDER)
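
A note on the parsing step in the loop above: body.replace('null', 'None') followed by eval() executes whatever the server sends, which is unsafe, and the replace can also corrupt legitimate data containing the substring 'null'. json.loads handles null natively; a sketch of the safer equivalent:

import json

data = json.loads(body)  # JSON null maps to None; no code execution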