我们从Python开源项目中,提取了以下21个代码示例,用于说明如何使用pycurl.NOSIGNAL。
def get_connection(): # pycurl initialization h = pycurl.Curl() # follow redirects h.setopt(pycurl.FOLLOWLOCATION, False) # enable compression h.setopt(pycurl.ENCODING, 'gzip, deflate') # certifi h.setopt(pycurl.CAINFO, certifi.where()) # no signal h.setopt(pycurl.NOSIGNAL, 1) # certificate informations h.setopt(pycurl.OPT_CERTINFO, 1) return h
def test_post(self): curl = CurlStub(b"result") result = fetch("http://example.com", post=True, curl=curl) self.assertEqual(result, b"result") self.assertEqual(curl.options, {pycurl.URL: b"http://example.com", pycurl.FOLLOWLOCATION: 1, pycurl.MAXREDIRS: 5, pycurl.CONNECTTIMEOUT: 30, pycurl.LOW_SPEED_LIMIT: 1, pycurl.LOW_SPEED_TIME: 600, pycurl.NOSIGNAL: 1, pycurl.WRITEFUNCTION: Any(), pycurl.POST: True, pycurl.DNS_CACHE_TIMEOUT: 0, pycurl.ENCODING: b"gzip,deflate"})
def test_post_data(self): curl = CurlStub(b"result") result = fetch("http://example.com", post=True, data="data", curl=curl) self.assertEqual(result, b"result") self.assertEqual(curl.options[pycurl.READFUNCTION](), b"data") self.assertEqual(curl.options, {pycurl.URL: b"http://example.com", pycurl.FOLLOWLOCATION: 1, pycurl.MAXREDIRS: 5, pycurl.CONNECTTIMEOUT: 30, pycurl.LOW_SPEED_LIMIT: 1, pycurl.LOW_SPEED_TIME: 600, pycurl.NOSIGNAL: 1, pycurl.WRITEFUNCTION: Any(), pycurl.POST: True, pycurl.POSTFIELDSIZE: 4, pycurl.READFUNCTION: Any(), pycurl.DNS_CACHE_TIMEOUT: 0, pycurl.ENCODING: b"gzip,deflate"})
def test_cainfo(self): curl = CurlStub(b"result") result = fetch("https://example.com", cainfo="cainfo", curl=curl) self.assertEqual(result, b"result") self.assertEqual(curl.options, {pycurl.URL: b"https://example.com", pycurl.FOLLOWLOCATION: 1, pycurl.MAXREDIRS: 5, pycurl.CONNECTTIMEOUT: 30, pycurl.LOW_SPEED_LIMIT: 1, pycurl.LOW_SPEED_TIME: 600, pycurl.NOSIGNAL: 1, pycurl.WRITEFUNCTION: Any(), pycurl.CAINFO: b"cainfo", pycurl.DNS_CACHE_TIMEOUT: 0, pycurl.ENCODING: b"gzip,deflate"})
def test_headers(self): curl = CurlStub(b"result") result = fetch("http://example.com", headers={"a": "1", "b": "2"}, curl=curl) self.assertEqual(result, b"result") self.assertEqual(curl.options, {pycurl.URL: b"http://example.com", pycurl.FOLLOWLOCATION: 1, pycurl.MAXREDIRS: 5, pycurl.CONNECTTIMEOUT: 30, pycurl.LOW_SPEED_LIMIT: 1, pycurl.LOW_SPEED_TIME: 600, pycurl.NOSIGNAL: 1, pycurl.WRITEFUNCTION: Any(), pycurl.HTTPHEADER: ["a: 1", "b: 2"], pycurl.DNS_CACHE_TIMEOUT: 0, pycurl.ENCODING: b"gzip,deflate"})
def test_pycurl_insecure(self): curl = CurlStub(b"result") result = fetch("http://example.com/get-ca-cert", curl=curl, insecure=True) self.assertEqual(result, b"result") self.assertEqual(curl.options, {pycurl.URL: b"http://example.com/get-ca-cert", pycurl.FOLLOWLOCATION: 1, pycurl.MAXREDIRS: 5, pycurl.CONNECTTIMEOUT: 30, pycurl.LOW_SPEED_LIMIT: 1, pycurl.LOW_SPEED_TIME: 600, pycurl.NOSIGNAL: 1, pycurl.WRITEFUNCTION: Any(), pycurl.SSL_VERIFYPEER: False, pycurl.DNS_CACHE_TIMEOUT: 0, pycurl.ENCODING: b"gzip,deflate"})
def initHandle(self): """ sets common options to curl handle """ self.c.setopt(pycurl.FOLLOWLOCATION, 1) self.c.setopt(pycurl.MAXREDIRS, 5) self.c.setopt(pycurl.CONNECTTIMEOUT, 30) self.c.setopt(pycurl.NOSIGNAL, 1) self.c.setopt(pycurl.NOPROGRESS, 1) if hasattr(pycurl, "AUTOREFERER"): self.c.setopt(pycurl.AUTOREFERER, 1) self.c.setopt(pycurl.SSL_VERIFYPEER, 0) self.c.setopt(pycurl.LOW_SPEED_TIME, 30) self.c.setopt(pycurl.LOW_SPEED_LIMIT, 5) #self.c.setopt(pycurl.VERBOSE, 1) self.c.setopt(pycurl.USERAGENT, "Mozilla/5.0 (Windows NT 6.1; Win64; x64;en; rv:5.0) Gecko/20110619 Firefox/5.0") if pycurl.version_info()[7]: self.c.setopt(pycurl.ENCODING, "gzip, deflate") self.c.setopt(pycurl.HTTPHEADER, ["Accept: */*", "Accept-Language: en-US,en", "Accept-Charset: ISO-8859-1,utf-8;q=0.7,*;q=0.7", "Connection: keep-alive", "Keep-Alive: 300", "Expect:"])
def curl(url, debug=False, **kwargs): while 1: try: s = StringIO.StringIO() c = pycurl.Curl() c.setopt(pycurl.URL, url) c.setopt(pycurl.REFERER, url) c.setopt(pycurl.FOLLOWLOCATION, True) c.setopt(pycurl.TIMEOUT, 60) c.setopt(pycurl.ENCODING, 'gzip') c.setopt(pycurl.USERAGENT, 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/31.0.1650.63 Safari/537.36') c.setopt(pycurl.NOSIGNAL, True) c.setopt(pycurl.WRITEFUNCTION, s.write) for k, v in kwargs.iteritems(): c.setopt(vars(pycurl)[k], v) c.perform() c.close() return s.getvalue() except: if debug: raise continue
def test_basic(self): curl = CurlStub(b"result") result = fetch("http://example.com", curl=curl) self.assertEqual(result, b"result") self.assertEqual(curl.options, {pycurl.URL: b"http://example.com", pycurl.FOLLOWLOCATION: 1, pycurl.MAXREDIRS: 5, pycurl.CONNECTTIMEOUT: 30, pycurl.LOW_SPEED_LIMIT: 1, pycurl.LOW_SPEED_TIME: 600, pycurl.NOSIGNAL: 1, pycurl.WRITEFUNCTION: Any(), pycurl.DNS_CACHE_TIMEOUT: 0, pycurl.ENCODING: b"gzip,deflate"})
def test_create_curl(self): curls = [] def pycurl_Curl(): curl = CurlStub(b"result") curls.append(curl) return curl Curl = pycurl.Curl try: pycurl.Curl = pycurl_Curl result = fetch("http://example.com") curl = curls[0] self.assertEqual(result, b"result") self.assertEqual(curl.options, {pycurl.URL: b"http://example.com", pycurl.FOLLOWLOCATION: 1, pycurl.MAXREDIRS: 5, pycurl.CONNECTTIMEOUT: 30, pycurl.LOW_SPEED_LIMIT: 1, pycurl.LOW_SPEED_TIME: 600, pycurl.NOSIGNAL: 1, pycurl.WRITEFUNCTION: Any(), pycurl.DNS_CACHE_TIMEOUT: 0, pycurl.ENCODING: b"gzip,deflate"}) finally: pycurl.Curl = Curl
def init_handle(self): """ Sets common options to curl handle. """ self.setopt(pycurl.FOLLOWLOCATION, 1) self.setopt(pycurl.MAXREDIRS, 5) self.setopt(pycurl.CONNECTTIMEOUT, 30) self.setopt(pycurl.NOSIGNAL, 1) self.setopt(pycurl.NOPROGRESS, 1) if hasattr(pycurl, "AUTOREFERER"): self.setopt(pycurl.AUTOREFERER, 1) self.setopt(pycurl.SSL_VERIFYPEER, 0) # Interval for low speed, detects connection loss, but can abort dl if # hoster stalls the download self.setopt(pycurl.LOW_SPEED_TIME, 45) self.setopt(pycurl.LOW_SPEED_LIMIT, 5) # do not save the cookies self.setopt(pycurl.COOKIEFILE, '') self.setopt(pycurl.COOKIEJAR, '') # self.setopt(pycurl.VERBOSE, 1) self.setopt( pycurl.USERAGENT, 'Mozilla/5.0 (Windows NT 10.0; Win64; rv:53.0) ' 'Gecko/20100101 Firefox/53.0') if pycurl.version_info()[7]: self.setopt(pycurl.ENCODING, 'gzip,deflate') self.headers.update( {'Accept': "*/*", 'Accept-Language': "en-US,en", 'Accept-Charset': "ISO-8859-1,utf-8;q=0.7,*;q=0.7", 'Connection': "keep-alive", 'Keep-Alive': "300", 'Expect': ""})
def __init__(self, base_url="", fakeheaders=[]): self.handle = pycurl.Curl() # These members might be set. self.set_url(base_url) self.verbosity = 0 self.fakeheaders = fakeheaders # Nothing past here should be modified by the caller. self.payload = None self.payload_io = BytesIO() self.hrd = "" # Verify that we've got the right site; harmless on a non-SSL connect. self.set_option(pycurl.SSL_VERIFYHOST, 2) # Follow redirects in case it wants to take us to a CGI... self.set_option(pycurl.FOLLOWLOCATION, 1) self.set_option(pycurl.MAXREDIRS, 5) self.set_option(pycurl.NOSIGNAL, 1) # Setting this option with even a nonexistent file makes libcurl # handle cookie capture and playback automatically. self.set_option(pycurl.COOKIEFILE, "/dev/null") # Set timeouts to avoid hanging too long self.set_timeout(30) # Use password identification from .netrc automatically self.set_option(pycurl.NETRC, 1) self.set_option(pycurl.WRITEFUNCTION, self.payload_io.write) def header_callback(x): self.hdr += x.decode('ascii') self.set_option(pycurl.HEADERFUNCTION, header_callback)
def load_url(url, token, shape=(8, 256, 256)): """ Loads a geotiff url inside a thread and returns as an ndarray """ # print("calling load_url ({})".format(url)) _, ext = os.path.splitext(urlparse(url).path) success = False for i in xrange(MAX_RETRIES): thread_id = threading.current_thread().ident _curl = _curl_pool[thread_id] _curl.setopt(_curl.URL, url) _curl.setopt(pycurl.NOSIGNAL, 1) _curl.setopt(pycurl.HTTPHEADER, ['Authorization: Bearer {}'.format(token)]) with NamedTemporaryFile(prefix="gbdxtools", suffix=ext, delete=False) as temp: # TODO: apply correct file extension _curl.setopt(_curl.WRITEDATA, temp.file) _curl.perform() code = _curl.getinfo(pycurl.HTTP_CODE) try: if(code != 200): raise TypeError("Request for {} returned unexpected error code: {}".format(url, code)) temp.file.flush() temp.close() with rasterio.open(temp.name) as dataset: arr = dataset.read() success = True return arr except (TypeError, RasterioIOError) as e: print(e) _curl.close() del _curl_pool[thread_id] finally: temp.close() os.remove(temp.name) if success is False: arr = np.zeros(shape, dtype=np.float32) return arr
def load_url(url, shape=(8, 256, 256)): """ Loads a geotiff url inside a thread and returns as an ndarray """ thread_id = threading.current_thread().ident _curl = _curl_pool[thread_id] _curl.setopt(_curl.URL, url) _curl.setopt(pycurl.NOSIGNAL, 1) _, ext = os.path.splitext(urlparse(url).path) with NamedTemporaryFile(prefix="gbdxtools", suffix="."+ext, delete=False) as temp: # TODO: apply correct file extension _curl.setopt(_curl.WRITEDATA, temp.file) _curl.perform() code = _curl.getinfo(pycurl.HTTP_CODE) try: if(code != 200): raise TypeError("Request for {} returned unexpected error code: {}".format(url, code)) temp.file.flush() temp.close() with rasterio.open(temp.name) as dataset: arr = dataset.read() except (TypeError, RasterioIOError) as e: print(e) temp.seek(0) print(temp.read()) arr = np.zeros(shape, dtype=np.uint8) _curl.close() del _curl_pool[thread_id] finally: temp.close() os.remove(temp.name) return arr
def perform(self): self.__performHead="" self.__performBody="" conn=pycurl.Curl() conn.setopt(pycurl.SSL_VERIFYPEER,False) conn.setopt(pycurl.SSL_VERIFYHOST,1) conn.setopt(pycurl.URL,self.completeUrl) if self.__method or self.__userpass: if self.__method=="basic": conn.setopt(pycurl.HTTPAUTH, pycurl.HTTPAUTH_BASIC) elif self.__method=="ntlm": conn.setopt(pycurl.HTTPAUTH, pycurl.HTTPAUTH_NTLM) elif self.__method=="digest": conn.setopt(pycurl.HTTPAUTH, pycurl.HTTPAUTH_DIGEST) conn.setopt(pycurl.USERPWD, self.__userpass) if self.__timeout: conn.setopt(pycurl.CONNECTTIMEOUT, self.__timeout) conn.setopt(pycurl.NOSIGNAL, 1) if self.__totaltimeout: conn.setopt(pycurl.TIMEOUT, self.__totaltimeout) conn.setopt(pycurl.NOSIGNAL, 1) conn.setopt(pycurl.WRITEFUNCTION, self.body_callback) conn.setopt(pycurl.HEADERFUNCTION, self.header_callback) if self.__proxy!=None: conn.setopt(pycurl.PROXY,self.__proxy) if self.__headers.has_key("Proxy-Connection"): del self.__headers["Proxy-Connection"] conn.setopt(pycurl.HTTPHEADER,self.__getHeaders()) if self.method=="POST": conn.setopt(pycurl.POSTFIELDS,self.__postdata) conn.perform() rp=Response() rp.parseResponse(self.__performHead) rp.addContent(self.__performBody) self.response=rp ######### ESTE conjunto de funciones no es necesario para el uso habitual de la clase
def to_pycurl_object(c, req): c.setopt(pycurl.MAXREDIRS, 5) c.setopt(pycurl.WRITEFUNCTION, req.body_callback) c.setopt(pycurl.HEADERFUNCTION, req.header_callback) c.setopt(pycurl.NOSIGNAL, 1) c.setopt(pycurl.SSL_VERIFYPEER, False) c.setopt(pycurl.SSL_VERIFYHOST, 0) c.setopt(pycurl.URL,req.completeUrl) if req.getConnTimeout(): c.setopt(pycurl.CONNECTTIMEOUT, req.getConnTimeout()) if req.getTotalTimeout(): c.setopt(pycurl.TIMEOUT, req.getTotalTimeout()) authMethod, userpass = req.getAuth() if authMethod or userpass: if authMethod == "basic": c.setopt(pycurl.HTTPAUTH, pycurl.HTTPAUTH_BASIC) elif authMethod == "ntlm": c.setopt(pycurl.HTTPAUTH, pycurl.HTTPAUTH_NTLM) elif authMethod == "digest": c.setopt(pycurl.HTTPAUTH, pycurl.HTTPAUTH_DIGEST) c.setopt(pycurl.USERPWD, userpass) c.setopt(pycurl.HTTPHEADER, req.getHeaders()) if req.method == "POST": c.setopt(pycurl.POSTFIELDS, req.postdata) if req.method != "GET" and req.method != "POST": c.setopt(pycurl.CUSTOMREQUEST, req.method) if req.method == "HEAD": c.setopt(pycurl.NOBODY, True) if req.followLocation: c.setopt(pycurl.FOLLOWLOCATION, 1) proxy = req.getProxy() if proxy != None: c.setopt(pycurl.PROXY, proxy) if req.proxytype=="SOCKS5": c.setopt(pycurl.PROXYTYPE,pycurl.PROXYTYPE_SOCKS5) elif req.proxytype=="SOCKS4": c.setopt(pycurl.PROXYTYPE,pycurl.PROXYTYPE_SOCKS4) req.delHeader("Proxy-Connection") return c
def request(self, method, url, headers, post_data=None): s = util.StringIO.StringIO() rheaders = util.StringIO.StringIO() curl = pycurl.Curl() proxy = self._get_proxy(url) if proxy: if proxy.hostname: curl.setopt(pycurl.PROXY, proxy.hostname) if proxy.port: curl.setopt(pycurl.PROXYPORT, proxy.port) if proxy.username or proxy.password: curl.setopt( pycurl.PROXYUSERPWD, "%s:%s" % (proxy.username, proxy.password)) if method == 'get': curl.setopt(pycurl.HTTPGET, 1) elif method == 'post': curl.setopt(pycurl.POST, 1) curl.setopt(pycurl.POSTFIELDS, post_data) else: curl.setopt(pycurl.CUSTOMREQUEST, method.upper()) # pycurl doesn't like unicode URLs curl.setopt(pycurl.URL, util.utf8(url)) curl.setopt(pycurl.WRITEFUNCTION, s.write) curl.setopt(pycurl.HEADERFUNCTION, rheaders.write) curl.setopt(pycurl.NOSIGNAL, 1) curl.setopt(pycurl.CONNECTTIMEOUT, 30) curl.setopt(pycurl.TIMEOUT, 80) curl.setopt(pycurl.HTTPHEADER, ['%s: %s' % (k, v) for k, v in headers.items()]) if self._verify_ssl_certs: curl.setopt(pycurl.CAINFO, os.path.join( os.path.dirname(__file__), 'data/ca-certificates.crt')) else: curl.setopt(pycurl.SSL_VERIFYHOST, False) try: curl.perform() except pycurl.error as e: self._handle_request_error(e) rbody = s.getvalue() rcode = curl.getinfo(pycurl.RESPONSE_CODE) return rbody, rcode, self.parse_headers(rheaders.getvalue())
def version_update(self): if not obplayer.Config.setting('sync_url'): return obplayer.Log.log('sending player version to server: ' + obplayer.Config.version, 'sync') postfields = {} postfields['id'] = obplayer.Config.setting('sync_device_id') postfields['pw'] = obplayer.Config.setting('sync_device_password') postfields['version'] = obplayer.Config.version postfields['longitude'] = obplayer.Config.setting('location_longitude') postfields['latitude'] = obplayer.Config.setting('location_latitude') curl = pycurl.Curl() enc_postfields = urllib.urlencode(postfields) curl.setopt(pycurl.NOSIGNAL, 1) curl.setopt(pycurl.USERAGENT, 'OpenBroadcaster Player') curl.setopt(pycurl.URL, obplayer.Config.setting('sync_url') + '?action=version') curl.setopt(pycurl.HEADER, False) curl.setopt(pycurl.POST, True) curl.setopt(pycurl.POSTFIELDS, enc_postfields) curl.setopt(pycurl.LOW_SPEED_LIMIT, 10) curl.setopt(pycurl.LOW_SPEED_TIME, 60) curl.setopt(pycurl.NOPROGRESS, 0) curl.setopt(pycurl.PROGRESSFUNCTION, self.curl_progress) class CurlResponse: def __init__(self): self.buffer = u'' def __call__(self, data): self.buffer += data.decode('utf-8') curl_response = CurlResponse() curl.setopt(pycurl.WRITEFUNCTION, curl_response) try: curl.perform() except: obplayer.Log.log("exception in VersionUpdate thread", 'error') obplayer.Log.log(traceback.format_exc(), 'error') curl.close() if curl_response.buffer: version = json.loads(curl_response.buffer) obplayer.Log.log("server version reported as " + str(version), 'sync') if not self.check_min_version(version): obplayer.Log.log("minimum server version " + str(MIN_SERVER_VERSION) + " is required. Please update server software before continuing", 'error') else: obplayer.Log.log("server did not report a version number", 'warning')
def now_playing_update_thread(self, playlist_id, playlist_end, media_id, media_end, show_name): if not obplayer.Config.setting('sync_url'): return postfields = {} postfields['id'] = obplayer.Config.setting('sync_device_id') postfields['pw'] = obplayer.Config.setting('sync_device_password') postfields['playlist_id'] = playlist_id postfields['media_id'] = media_id postfields['show_name'] = show_name if playlist_end != '': postfields['playlist_end'] = int(round(playlist_end)) else: postfields['playlist_end'] = '' if media_end != '': postfields['media_end'] = int(round(media_end)) else: postfields['media_end'] = '' curl = pycurl.Curl() enc_postfields = urllib.urlencode(postfields) curl.setopt(pycurl.NOSIGNAL, 1) curl.setopt(pycurl.USERAGENT, 'OpenBroadcaster Player') curl.setopt(pycurl.URL, obplayer.Config.setting('sync_url') + '?action=now_playing') curl.setopt(pycurl.HEADER, False) curl.setopt(pycurl.POST, True) curl.setopt(pycurl.POSTFIELDS, enc_postfields) #curl.setopt(pycurl.FOLLOWLOCATION, 1) curl.setopt(pycurl.LOW_SPEED_LIMIT, 10) curl.setopt(pycurl.LOW_SPEED_TIME, 60) curl.setopt(pycurl.NOPROGRESS, 0) curl.setopt(pycurl.PROGRESSFUNCTION, self.curl_progress) try: curl.perform() except: obplayer.Log.log("exception in NowPlayingUpdate thread", 'error') obplayer.Log.log(traceback.format_exc(), 'error') curl.close() # # Request sync data from web application. # This is used by sync (with request_type='schedule') and sync_priority_broadcasts (with request_type='emerg'). # Function outputs XML response from server. #
def sync_request(self, request_type='', data=False): sync_url = obplayer.Config.setting('sync_url') if not sync_url: obplayer.Log.log("sync url is blank, skipping sync request", 'sync') return '' curl = pycurl.Curl() postfields = {} postfields['id'] = obplayer.Config.setting('sync_device_id') postfields['pw'] = obplayer.Config.setting('sync_device_password') postfields['hbuffer'] = obplayer.Config.setting('sync_buffer') if data: postfields['data'] = data enc_postfields = urllib.urlencode(postfields) curl.setopt(pycurl.NOSIGNAL, 1) curl.setopt(pycurl.USERAGENT, 'OpenBroadcaster Player') curl.setopt(pycurl.URL, sync_url + '?action=' + request_type) curl.setopt(pycurl.HEADER, False) curl.setopt(pycurl.POST, True) curl.setopt(pycurl.POSTFIELDS, enc_postfields) # some options so that it'll abort the transfer if the speed is too low (i.e., network problem) # low speed abort set to 0.01Kbytes/s for 60 seconds). curl.setopt(pycurl.LOW_SPEED_LIMIT, 10) curl.setopt(pycurl.LOW_SPEED_TIME, 60) curl.setopt(pycurl.NOPROGRESS, 0) curl.setopt(pycurl.PROGRESSFUNCTION, self.curl_progress) class CurlResponse: def __init__(self): self.buffer = u'' def __call__(self, data): self.buffer += data.decode('utf-8') curl_response = CurlResponse() curl.setopt(pycurl.WRITEFUNCTION, curl_response) try: curl.perform() #except pycurl.error as error: # (errno, errstr) = error # obplayer.Log.log('network error: ' + errstr, 'error') except: obplayer.Log.log("exception in sync " + request_type + " thread", 'error') obplayer.Log.log(traceback.format_exc(), 'error') curl.close() return curl_response.buffer # # Fetch media from web application. Saves under media directory. # media_id : id of the media we want # filename : filename to save under. #