我们从Python开源项目中,提取了以下14个代码示例,用于说明如何使用pycurl.CAINFO。
def get_connection(): # pycurl initialization h = pycurl.Curl() # follow redirects h.setopt(pycurl.FOLLOWLOCATION, False) # enable compression h.setopt(pycurl.ENCODING, 'gzip, deflate') # certifi h.setopt(pycurl.CAINFO, certifi.where()) # no signal h.setopt(pycurl.NOSIGNAL, 1) # certificate informations h.setopt(pycurl.OPT_CERTINFO, 1) return h
def test_cainfo(self): curl = CurlStub(b"result") result = fetch("https://example.com", cainfo="cainfo", curl=curl) self.assertEqual(result, b"result") self.assertEqual(curl.options, {pycurl.URL: b"https://example.com", pycurl.FOLLOWLOCATION: 1, pycurl.MAXREDIRS: 5, pycurl.CONNECTTIMEOUT: 30, pycurl.LOW_SPEED_LIMIT: 1, pycurl.LOW_SPEED_TIME: 600, pycurl.NOSIGNAL: 1, pycurl.WRITEFUNCTION: Any(), pycurl.CAINFO: b"cainfo", pycurl.DNS_CACHE_TIMEOUT: 0, pycurl.ENCODING: b"gzip,deflate"})
def _get_url(self, url): if self.API_TOKEN == None: logging.error('none token') # 3 For ERROR level return try: c = pycurl.Curl() c.setopt(pycurl.CAINFO, certifi.where()) c.setopt(pycurl.URL, url) b = StringIO.StringIO() c.setopt(pycurl.WRITEFUNCTION, b.write) c.setopt(pycurl.USERAGENT, "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; .NET CLR 1.1.4322)") c.setopt(pycurl.HTTPHEADER, ['Authorization: JWT %s' % self.API_TOKEN.encode()]) c.setopt(pycurl.CUSTOMREQUEST, "GET") c.setopt(pycurl.FOLLOWLOCATION, 1) c.perform() result = b.getvalue() logging.debug('result') except Exception as e: logging.error(e.message) logging.error('go error') pass return result
def __init__(self, uploader): self.handle = pycurl.Curl() self.response_headers = {} self.output = six.StringIO() self.status_code = None self.handle.setopt(pycurl.CAINFO, certifi.where()) self.handle.setopt(pycurl.URL, uploader.url) self.handle.setopt(pycurl.HEADERFUNCTION, self._prepare_response_header) self.handle.setopt(pycurl.UPLOAD, 1) self.handle.setopt(pycurl.CUSTOMREQUEST, 'PATCH') self.file = uploader.get_file_stream() self.file.seek(uploader.offset) self.handle.setopt(pycurl.READFUNCTION, self.file.read) self.handle.setopt(pycurl.WRITEFUNCTION, self.output.write) self.handle.setopt(pycurl.INFILESIZE, uploader.request_length) headers = ["upload-offset: {}".format(uploader.offset), "Content-Type: application/offset+octet-stream"] + uploader.headers_as_list self.handle.setopt(pycurl.HTTPHEADER, headers)
def test_cainfo_on_http(self): curl = CurlStub(b"result") result = fetch("http://example.com", cainfo="cainfo", curl=curl) self.assertEqual(result, b"result") self.assertTrue(pycurl.CAINFO not in curl.options)
def enable_tls(self, p_cacert, p_cert, p_key): self.m_handle.setopt(pycurl.CAINFO, p_cacert) self.m_handle.setopt(pycurl.SSLCERT, p_cert) self.m_handle.setopt(pycurl.SSLKEY, p_key) self.m_handle.setopt(pycurl.SSL_VERIFYPEER, True)
def _login(self): try: c = pycurl.Curl() c.setopt(pycurl.CAINFO, certifi.where()) c.setopt(pycurl.URL, self.url) b = StringIO.StringIO() c.setopt(pycurl.USERAGENT, "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; .NET CLR 1.1.4322)") c.setopt(pycurl.WRITEFUNCTION, b.write) c.setopt(pycurl.FOLLOWLOCATION, 1) c.setopt(pycurl.MAXREDIRS, 5) c.setopt(pycurl.CUSTOMREQUEST, "POST") c.setopt(pycurl.POSTFIELDS, self.post_data) c.perform() if b.getvalue(): logging.info('success login') # For INFO level self.API_TOKEN = json.loads(b.getvalue())["access_token"] self.save_token() else: logging.warning('success fail,get null result') #2 For WARNING level logging.debug(self.API_TOKEN) b.close() c.close() except pycurl.E_HTTP_POST_ERROR: logging.error(str(pycurl.E_HTTP_POST_ERROR)) except Exception as e: logging.error('please check your password or username') logging.error(e.message) #3 For ERROR level pass
def _set_def_curl_opts(curl): curl.setopt(pycurl.CONNECTTIMEOUT, 8) curl.setopt(pycurl.CAINFO, certifi.where())
def cli(self): curl = pycurl.Curl() curl.setopt(pycurl.TIMEOUT, 10) curl.setopt(pycurl.URL, self.api_url) curl.setopt(pycurl.CAINFO, certifi.where()) curl.setopt(pycurl.HTTPHEADER, ['User-Agent: curl/7.35.0', 'Content-Type: text/plain; charset=utf-8']) return curl.perform()
def json(self): curl = pycurl.Curl() curl.setopt(pycurl.URL, self.api_url) curl.setopt(pycurl.CAINFO, certifi.where()) curl.setopt(pycurl.HTTPHEADER, ['User-Agent: curl/7.35.0', 'Accept: application/json']) return curl.perform()
def country(self): curl = pycurl.Curl() curl.setopt(pycurl.URL, self.api_country) curl.setopt(pycurl.CAINFO, certifi.where()) curl.setopt(pycurl.HTTPHEADER, ['User-Agent: HTTPie/0.8.0']) return curl.perform()
def city(self): curl = pycurl.Curl() curl.setopt(pycurl.URL, self.api_city) curl.setopt(pycurl.CAINFO, certifi.where()) curl.setopt(pycurl.HTTPHEADER, ['User-Agent: HTTPie/0.8.0']) return curl.perform()
def set_hosts_file(hosts="/etc/hosts"): import socket if not os.path.exists(hosts): if not os.path.exists(os.path.dirname(hosts)): os.makedirs(os.path.dirname(hosts)) with open(hosts, "w") as f: hosts_url = "https://raw.githubusercontent.com/racaljk/hosts/master/hosts" conn = requests.head(hosts_url) if conn.status_code != 200: hosts_url = "https://coding.net/u/scaffrey/p/hosts/git/raw/master/hosts" curl = pycurl.Curl() curl.setopt(pycurl.URL, hosts_url) curl.setopt(pycurl.CAINFO, certifi.where()) curl.setopt(pycurl.WRITEDATA, f) curl.perform() curl.close() hostname = socket.gethostname() # socket.getfqdn() print hostname try: ip = socket.gethostbyname(socket.gethostname()) # TODO(Guodong Ding) Ubuntu not passed here, but CentOS passed! except Exception as _: del _ ip = None with open(hosts, "a") as f: if ip is not None: appended_content = "\n" + "127.0.0.1 " + hostname + "\n" + ip + " " + hostname + "\n" else: appended_content = "\n" + "127.0.0.1 " + hostname + "\n" f.write(appended_content)
def request(self, method, url, headers, post_data=None): s = util.StringIO.StringIO() rheaders = util.StringIO.StringIO() curl = pycurl.Curl() proxy = self._get_proxy(url) if proxy: if proxy.hostname: curl.setopt(pycurl.PROXY, proxy.hostname) if proxy.port: curl.setopt(pycurl.PROXYPORT, proxy.port) if proxy.username or proxy.password: curl.setopt( pycurl.PROXYUSERPWD, "%s:%s" % (proxy.username, proxy.password)) if method == 'get': curl.setopt(pycurl.HTTPGET, 1) elif method == 'post': curl.setopt(pycurl.POST, 1) curl.setopt(pycurl.POSTFIELDS, post_data) else: curl.setopt(pycurl.CUSTOMREQUEST, method.upper()) # pycurl doesn't like unicode URLs curl.setopt(pycurl.URL, util.utf8(url)) curl.setopt(pycurl.WRITEFUNCTION, s.write) curl.setopt(pycurl.HEADERFUNCTION, rheaders.write) curl.setopt(pycurl.NOSIGNAL, 1) curl.setopt(pycurl.CONNECTTIMEOUT, 30) curl.setopt(pycurl.TIMEOUT, 80) curl.setopt(pycurl.HTTPHEADER, ['%s: %s' % (k, v) for k, v in headers.items()]) if self._verify_ssl_certs: curl.setopt(pycurl.CAINFO, os.path.join( os.path.dirname(__file__), 'data/ca-certificates.crt')) else: curl.setopt(pycurl.SSL_VERIFYHOST, False) try: curl.perform() except pycurl.error as e: self._handle_request_error(e) rbody = s.getvalue() rcode = curl.getinfo(pycurl.RESPONSE_CODE) return rbody, rcode, self.parse_headers(rheaders.getvalue())