The following 16 code examples, extracted from open-source Python projects, demonstrate how to use pycurl.WRITEDATA.
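Before the project examples, here is a minimal self-contained sketch of the basic pattern: pycurl.WRITEDATA directs the response body to any writable file-like object. The URL below is a placeholder, not taken from any of the projects.

import io
import pycurl

buffer = io.BytesIO()
curl = pycurl.Curl()
curl.setopt(pycurl.URL, 'https://example.com/')  # placeholder URL
curl.setopt(pycurl.WRITEDATA, buffer)  # libcurl writes the response body here
curl.perform()
status = curl.getinfo(pycurl.RESPONSE_CODE)
curl.close()
body = buffer.getvalue()  # raw bytes of the response

The same object can be an open file handle (as in several examples below), an io.BytesIO, or anything with a write() method.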
def file(self, path, output, args={}, progress_callback=lambda *x: None):
    self.logger.debug('Requesting file download')
    self.web_cache[path] = dict(args)
    url = urllib.parse.urljoin(self.file_url, urllib.parse.quote(path))
    if len(args) > 0:
        url += '?' + urllib.parse.urlencode(args)
    self.logger.debug('HTTP request: {}'.format(url))
    self.curl.setopt(pycurl.URL, url)
    self.curl.setopt(pycurl.COOKIE, self.web_cookie)
    self.curl.setopt(pycurl.NOBODY, False)
    self.curl.setopt(pycurl.NOPROGRESS, False)
    self.curl.setopt(pycurl.WRITEDATA, output)
    self.curl.setopt(pycurl.HEADERFUNCTION, lambda *x: None)
    self.curl.setopt(pycurl.XFERINFOFUNCTION, progress_callback)
    self.curl.perform()
    status = self.curl.getinfo(pycurl.RESPONSE_CODE)
    if status != 200:
        raise ServerError(status)
def file_size(self, path, args={}):
    self.logger.debug('Requesting file size')
    self.web_cache[path] = dict(args)
    url = urllib.parse.urljoin(self.file_url, urllib.parse.quote(path))
    if len(args) > 0:
        url += '?' + urllib.parse.urlencode(args)
    self.logger.debug('HTTP request: {}'.format(url))
    self.curl.setopt(pycurl.URL, url)
    self.curl.setopt(pycurl.COOKIE, self.web_cookie)
    self.curl.setopt(pycurl.NOBODY, True)
    self.curl.setopt(pycurl.NOPROGRESS, True)
    self.curl.setopt(pycurl.WRITEDATA, io.BytesIO())
    self.curl.setopt(pycurl.HEADERFUNCTION, lambda *x: None)
    self.curl.setopt(pycurl.XFERINFOFUNCTION, lambda *x: None)
    self.curl.perform()
    status = self.curl.getinfo(pycurl.RESPONSE_CODE)
    if status != 200:
        raise ServerError(status)
    return self.curl.getinfo(pycurl.CONTENT_LENGTH_DOWNLOAD)
def web_redirect(self, path, args={}):
    self.logger.debug('Requesting redirect location')
    self.web_cache[path] = dict(args)
    url = urllib.parse.urljoin(self.web_url, urllib.parse.quote(path))
    if len(args) > 0:
        url += '?' + urllib.parse.urlencode(args)
    self.logger.debug('HTTP request: {}'.format(url))
    headers = io.BytesIO()
    self.curl.setopt(pycurl.URL, url)
    self.curl.setopt(pycurl.COOKIE, self.web_cookie)
    self.curl.setopt(pycurl.NOBODY, False)
    self.curl.setopt(pycurl.NOPROGRESS, True)
    self.curl.setopt(pycurl.WRITEDATA, NoneIO())
    self.curl.setopt(pycurl.HEADERFUNCTION, headers.write)
    self.curl.setopt(pycurl.XFERINFOFUNCTION, lambda *x: None)
    self.curl.perform()
    status = self.curl.getinfo(pycurl.RESPONSE_CODE)
    if status != 302:
        raise ServerError(status)
    for header_line in headers.getvalue().split(b'\r\n'):
        if header_line.startswith(b'Location:'):
            return header_line.split(b':', maxsplit=1)[1].strip().decode()
    return None
def end_all_async_unsafe(self):
    if not Config.RECORDING_ACTIVATED:
        return
    for rtmp_name in self._recording_rtmps:
        curl = pycurl.Curl()
        try:
            self._set_def_curl_opts(curl)
            curl.setopt(pycurl.URL, self._end_url(rtmp_name))
            curl.setopt(pycurl.WRITEDATA, self._end_buffer)
            curl.perform()
        except pycurl.error as e:
            console.warning(
                'Pycurl error in end_all() for racer <{0}>: Tried to curl <{1}>. Error {2}.'.format(
                    rtmp_name, self._end_url(rtmp_name), e))
        finally:
            curl.close()
    self._recording_rtmps.clear()
def _end_record_nolock(self, rtmp_name):
    rtmp_name = rtmp_name.lower()
    if rtmp_name not in self._recording_rtmps:
        return
    curl = pycurl.Curl()
    try:
        self._set_def_curl_opts(curl)
        curl.setopt(pycurl.URL, self._end_url(rtmp_name))
        curl.setopt(pycurl.WRITEDATA, self._end_buffer)
        curl.perform()
        self._recording_rtmps = [r for r in self._recording_rtmps if r != rtmp_name]
    except pycurl.error as e:
        console.warning(
            'Pycurl error in end_record({0}): Tried to curl <{1}>. Error {2}.'.format(
                rtmp_name, self._end_url(rtmp_name), e))
    finally:
        curl.close()
def get_html(url, user_agent, refer_url):
    """
    curl html
    :param url:
    :param user_agent:
    :param refer_url:
    :return:
    """
    curl = pycurl.Curl()
    curl.setopt(pycurl.USERAGENT, user_agent)
    curl.setopt(pycurl.REFERER, refer_url)
    buffers = StringIO()
    curl.setopt(pycurl.URL, url)
    curl.setopt(pycurl.WRITEDATA, buffers)
    curl.perform()
    body = buffers.getvalue()
    buffers.close()
    curl.close()
    return body
def get_download_link(fs_id):
    """
    Get the download link for a file
    :param fs_id:
    :return:
    """
    curl = pycurl.Curl()
    curl.setopt(pycurl.USERAGENT, const.USER_AGENT)
    curl.setopt(pycurl.REFERER, const.PAN_REFER_URL)
    buffers = StringIO()
    request_dict = {
        'channel': 'chunlei',
        'timestamp': '1473685224',
        'fidlist': [fs_id],
        'type': 'dlink',
        'web': 1,
        'clienttype': 0,
        'bdstoken': 'e0e895bb3ef7b0cb70899ee66b74e809',
        'sign': decode_sign(parse_sign2('d76e889b6aafd3087ac3bd56f4d4053a',
                                        '3545d271c5d07ba27355d39da0c62a4ee06d2d25'))
    }
    target_url = const.PAN_API_URL + 'download?' + urllib.urlencode(request_dict)
    curl.setopt(pycurl.URL, target_url)
    curl.setopt(pycurl.WRITEDATA, buffers)
    curl.setopt(pycurl.COOKIEFILE, "cookie.txt")
    curl.perform()
    body = buffers.getvalue()
    buffers.close()
    curl.close()
    data = json.loads(body)
    if data['errno']:
        return None
    return data['dlink'][0]['dlink']
def api(self, args, encoding='utf-8', allow_return_none=False):
    self.logger.debug('Sending API request')
    if args.get('mode', '') == 'semester':
        semester = args.get('semester', '')
        if allow_return_none and self.api_cache == semester:
            self.logger.debug('Semester {} already cached; skipping API request'.format(semester))
            return
        self.api_cache = semester
    query_args = dict()
    query_args.update(self.api_args)
    query_args.update(args)
    url = self.api_url + '?' + urllib.parse.urlencode(query_args)
    data = io.BytesIO()
    self.logger.debug('HTTP request: {}'.format(url))
    self.curl.setopt(pycurl.URL, url)
    self.curl.setopt(pycurl.COOKIE, self.api_cookie)
    self.curl.setopt(pycurl.NOBODY, False)
    self.curl.setopt(pycurl.NOPROGRESS, True)
    self.curl.setopt(pycurl.WRITEDATA, data)
    self.curl.setopt(pycurl.HEADERFUNCTION, lambda *x: None)
    self.curl.setopt(pycurl.XFERINFOFUNCTION, lambda *x: None)
    self.curl.perform()
    status = self.curl.getinfo(pycurl.RESPONSE_CODE)
    if status != 200:
        raise ServerError(status)
    try:
        value = data.getvalue()
        return json.loads(value.decode(encoding))
    except json.decoder.JSONDecodeError:
        raise NotJSONError(value.decode(encoding))
def web(self, path, args={}, encoding=None, allow_return_none=False):
    self.logger.debug('Requesting web page')
    if allow_return_none:
        if path in self.web_cache and self.web_cache[path] == args:
            self.logger.debug('Page {} already cached; skipping request'.format(path))
            self.logger.debug('Args: {}'.format(args))
            return
    self.web_cache[path] = dict(args)
    url = urllib.parse.urljoin(self.web_url, urllib.parse.quote(path))
    if len(args) > 0:
        url += '?' + urllib.parse.urlencode(args)
    self.logger.debug('HTTP request: {}'.format(url))
    data = io.BytesIO()
    self.curl.setopt(pycurl.URL, url)
    self.curl.setopt(pycurl.COOKIE, self.web_cookie)
    self.curl.setopt(pycurl.NOBODY, False)
    self.curl.setopt(pycurl.NOPROGRESS, True)
    self.curl.setopt(pycurl.WRITEDATA, data)
    self.curl.setopt(pycurl.HEADERFUNCTION, lambda *x: None)
    self.curl.setopt(pycurl.XFERINFOFUNCTION, lambda *x: None)
    self.curl.perform()
    status = self.curl.getinfo(pycurl.RESPONSE_CODE)
    if status != 200:
        raise ServerError(status)
    data.seek(io.SEEK_SET)
    return etree.parse(data, etree.HTMLParser(
        encoding=encoding, remove_comments=True))
def Download_ETens_from_WA_FTP(output_folder, Lat_tiles, Lon_tiles):
    """
    This function retrieves ETensV1.0 data for a given date from the
    ftp.wateraccounting.unesco-ihe.org server.

    Restrictions:
    The data and this python file may not be distributed to others without
    permission of the WA+ team.

    Keyword arguments:
    output_folder -- Directory of the outputs
    Lat_tiles -- [Lat_min, Lat_max] Tile number of the max and min latitude tile number
    Lon_tiles -- [Lon_min, Lon_max] Tile number of the max and min longitude tile number
    """
    for v_tile in range(Lat_tiles[0], Lat_tiles[1] + 1):
        for h_tile in range(Lon_tiles[0], Lon_tiles[1] + 1):
            Tilename = "h%sv%s.zip" % (h_tile, v_tile)
            if not os.path.exists(os.path.join(output_folder, Tilename)):
                try:
                    # Collect account and FTP information
                    username, password = WebAccounts.Accounts(Type='FTP_WA')
                    FTP_name = "ftp://ftp.wateraccounting.unesco-ihe.org//WaterAccounting_Guest/ETensV1.0/%s" % Tilename
                    local_filename = os.path.join(output_folder, Tilename)
                    # Download data from FTP
                    curl = pycurl.Curl()
                    curl.setopt(pycurl.URL, FTP_name)
                    curl.setopt(pycurl.USERPWD, '%s:%s' % (username, password))
                    fp = open(local_filename, "wb")
                    curl.setopt(pycurl.WRITEDATA, fp)
                    curl.perform()
                    curl.close()
                    fp.close()
                except:
                    print "tile %s is not found and will be replaced by NaN values" % Tilename
    return()
def _start_record_nolock(self, rtmp_name):
    rtmp_name = rtmp_name.lower()
    if rtmp_name in self._recording_rtmps:
        self._end_record_nolock(rtmp_name)
    if rtmp_name in self._recording_rtmps:
        console.warning(
            'Error: tried to start a recording of racer <{0}>, but failed to end a previously '
            'started recording.'.format(rtmp_name))
        return None
    curl = pycurl.Curl()
    try:
        new_buffer = BytesIO()
        self._vodstart_buffers[rtmp_name] = new_buffer
        self._set_def_curl_opts(curl)
        curl.setopt(pycurl.URL, self._start_url(rtmp_name))
        curl.setopt(pycurl.WRITEDATA, new_buffer)
        curl.perform()
        self._recording_rtmps.append(rtmp_name)
    except pycurl.error as e:
        console.warning(
            'Pycurl error in start_record({0}): Tried to curl <{1}>. Error: {2}.'.format(
                rtmp_name, self._start_url(rtmp_name), e))
    finally:
        curl.close()
def curl_common_init(buf):
    handle = pycurl.Curl()
    handle.setopt(pycurl.WRITEDATA, buf)
    handle.setopt(pycurl.HEADERFUNCTION, curl_hdr)
    handle.setopt(pycurl.DEBUGFUNCTION, curl_debug)
    handle.setopt(pycurl.USERPWD, '{}:{}'.format(_g.conf._user, _g.conf._pass))
    handle.setopt(pycurl.FOLLOWLOCATION, True)
    # avoid FTP CWD for fastest directory traversal
    handle.setopt(pycurl.FTP_FILEMETHOD, pycurl.FTPMETHOD_NOCWD)
    # we always set this flag and let the logging module
    # handle filtering.
    handle.setopt(pycurl.VERBOSE, True)
    # use ipv4 for VPNs
    handle.setopt(pycurl.IPRESOLVE, pycurl.IPRESOLVE_V4)
    handle.setopt(pycurl.USE_SSL, True)
    handle.setopt(pycurl.SSL_VERIFYPEER, False)  # XXX
    handle.setopt(pycurl.SSL_VERIFYHOST, 0)
    return handle
def set_hosts_file(hosts="/etc/hosts"):
    import socket
    if not os.path.exists(hosts):
        if not os.path.exists(os.path.dirname(hosts)):
            os.makedirs(os.path.dirname(hosts))
        with open(hosts, "w") as f:
            hosts_url = "https://raw.githubusercontent.com/racaljk/hosts/master/hosts"
            conn = requests.head(hosts_url)
            if conn.status_code != 200:
                hosts_url = "https://coding.net/u/scaffrey/p/hosts/git/raw/master/hosts"
            curl = pycurl.Curl()
            curl.setopt(pycurl.URL, hosts_url)
            curl.setopt(pycurl.CAINFO, certifi.where())
            curl.setopt(pycurl.WRITEDATA, f)
            curl.perform()
            curl.close()
    hostname = socket.gethostname()  # socket.getfqdn()
    print hostname
    try:
        ip = socket.gethostbyname(socket.gethostname())
        # TODO(Guodong Ding) Ubuntu not passed here, but CentOS passed!
    except Exception as _:
        del _
        ip = None
    with open(hosts, "a") as f:
        if ip is not None:
            appended_content = "\n" + "127.0.0.1 " + hostname + "\n" + ip + " " + hostname + "\n"
        else:
            appended_content = "\n" + "127.0.0.1 " + hostname + "\n"
        f.write(appended_content)
def list_dir(dir_name):
    """
    List the files under a directory
    :param dir_name: directory path
    :return:
    """
    result = list()
    curl = pycurl.Curl()
    curl.setopt(pycurl.USERAGENT, const.USER_AGENT)
    curl.setopt(pycurl.REFERER, const.PAN_REFER_URL)
    buffers = StringIO()
    request_dict = {
        'channel': 'chunlei',
        'clienttype': 0,
        'showempty': 0,
        'web': 1,
        'order': 'time',
        'desc': 1,
        'page': 1,
        'num': 100,
        'dir': dir_name,
        'bdstoken': 'e0e895bb3ef7b0cb70899ee66b74e809'
    }
    target_url = const.PAN_API_URL + 'list?' + urllib.urlencode(request_dict)
    curl.setopt(pycurl.URL, target_url)
    curl.setopt(pycurl.WRITEDATA, buffers)
    curl.setopt(pycurl.COOKIEFILE, "cookie.txt")
    curl.perform()
    body = buffers.getvalue()
    print body
    buffers.close()
    curl.close()
    data = json.loads(body)
    if data['errno'] == 0:
        for a_list in data['list']:
            dlink = get_download_link(a_list['fs_id'])
            if dlink:
                dlink = dlink.replace('\\', '')
                result.append(dlink)
    return result
def Download_data(Date, Version, output_folder, Var):
    """
    This function downloads CFSR data from the FTP server
    For
    - CFSR:   ftp://nomads.ncdc.noaa.gov/CFSR/HP_time_series/
    - CFSRv2: http://nomads.ncdc.noaa.gov/modeldata/cfsv2_analysis_timeseries/

    Keyword arguments:
    Date -- pandas timestamp day
    Version -- 1 or 2 (1 = CFSR, 2 = CFSRv2)
    output_folder -- The directory for storing the downloaded files
    Var -- The variable that must be downloaded from the server ('dlwsfc','uswsfc','dswsfc','ulwsfc')
    """
    # Define the filename that must be downloaded
    if Version == 1:
        filename = Var + '.gdas.' + str(Date.strftime('%Y')) + str(Date.strftime('%m')) + '.grb2'
    if Version == 2:
        filename = Var + '.gdas.' + str(Date.strftime('%Y')) + str(Date.strftime('%m')) + '.grib2'

    try:
        # download the file if it does not exist yet
        local_filename = os.path.join(output_folder, filename)
        if not os.path.exists(local_filename):
            Downloaded = 0
            Times = 0
            while Downloaded == 0:
                # Create the command and run the command in cmd
                if Version == 1:
                    FTP_name = 'ftp://nomads.ncdc.noaa.gov/CFSR/HP_time_series/' + Date.strftime('%Y') + Date.strftime('%m') + '/' + filename
                if Version == 2:
                    FTP_name = 'https://nomads.ncdc.noaa.gov/modeldata/cfsv2_analysis_timeseries/' + Date.strftime('%Y') + '/' + Date.strftime('%Y') + Date.strftime('%m') + '/' + filename
                curl = pycurl.Curl()
                curl.setopt(pycurl.URL, FTP_name)
                fp = open(local_filename, "wb")
                curl.setopt(pycurl.SSL_VERIFYPEER, 0)
                curl.setopt(pycurl.SSL_VERIFYHOST, 0)
                curl.setopt(pycurl.WRITEDATA, fp)
                curl.perform()
                curl.close()
                fp.close()
                statinfo = os.stat(local_filename)
                if int(statinfo.st_size) > 10000:
                    Downloaded = 1
                else:
                    Times += 1
                    if Times == 10:
                        Downloaded = 1
    except:
        print 'Was not able to download the CFSR file from the FTP server'
    return(local_filename)
def get_dlinks(search_target, get_dlinks_only=True):
    """
    Crawl the picture download links (objURL) for a search keyword
    :param search_target: search keyword
    :param get_dlinks_only: only collect the download links, without saving them to a file
    :return:
    """
    refer_url = const.REFER_URL % search_target
    curl = pycurl.Curl()
    curl.setopt(pycurl.USERAGENT, const.USER_AGENT)
    curl.setopt(pycurl.REFERER, refer_url)
    result = []
    ll = 0
    record_start_cursor = get_record_start_cursor(const.CURSOR_FILE)
    if record_start_cursor:
        ll = int(record_start_cursor)
    print('start')
    # page through the search results until no more data comes back
    while True:
        print('crawling pictures of page %d' % (ll / 30 + 1))
        # buffer that receives the response body as a str
        buffers = StringIO()
        target_url = const.API_URL % (search_target, search_target, ll)
        curl.setopt(pycurl.URL, target_url)
        curl.setopt(pycurl.WRITEDATA, buffers)
        curl.perform()
        body = buffers.getvalue()
        body = body.replace('null', 'None')
        data = eval(body)
        if 'data' in data:
            has_data = False
            for a_data in data['data']:
                obj_url = None
                if 'objURL' in a_data:
                    obj_url = a_data['objURL']
                if obj_url:
                    has_data = True
                    result.append(obj_url)
            if not has_data:
                print('no more pic')
                break
            ll += 30
        else:
            print('no more pic')
            break
    print('done')
    curl.close()
    # persist the page cursor
    if ll:
        set_record_start_cursor(str(ll), const.CURSOR_FILE)
    for index, data in enumerate(result):
        result[index] = decode_url(data)
    if not get_dlinks_only:
        save_to_file(result, search_target + '.txt', const.BASE_FOLDER)