我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 ssl.CertificateError()。
def connect(self):
    """Open a certificate-verified TLS connection and store it in ``self.sock``.

    A TCP connection is made to ``(self.host, self.port)``; if a proxy
    tunnel is configured it is established first.  The socket is then
    wrapped with TLS, requiring a certificate that validates against
    ``self.ca_bundle``, and the peer certificate is matched against the
    destination host name.

    Raises:
        CertificateError: the peer certificate does not match the
            destination host.  The socket is shut down and closed before
            the exception propagates.
    """
    sock = socket.create_connection(
        (self.host, self.port), getattr(self, 'source_address', None)
    )

    # Handle the socket if a (proxy) tunnel is present
    if hasattr(self, '_tunnel') and getattr(self, '_tunnel_host', None):
        self.sock = sock
        self._tunnel()
        # http://bugs.python.org/issue7776: Python>=3.4.1 and >=2.7.7
        # change self.host to mean the proxy server host when tunneling is
        # being used.  The certificate must be checked against the
        # destination host, not the proxy.
        actual_host = self._tunnel_host
    else:
        actual_host = self.host

    self.sock = ssl.wrap_socket(
        sock, cert_reqs=ssl.CERT_REQUIRED, ca_certs=self.ca_bundle
    )
    try:
        match_hostname(self.sock.getpeercert(), actual_host)
    except CertificateError:
        # Do not leave a half-validated connection open.
        self.sock.shutdown(socket.SHUT_RDWR)
        self.sock.close()
        raise
def connect(self):
    """Open a new connection to Mongo and wrap it in a SocketInfo.

    Can raise ConnectionFailure or CertificateError.

    The pool keeps no reference to the returned socket, so the caller is
    responsible for handing it back with return_socket() when finished.
    """
    sock = None
    try:
        sock = _configured_socket(self.address, self.opts)

        ismaster = None
        if self.handshake:
            # Run the initial ismaster command on the fresh socket.
            reply = command(sock, 'admin', {'ismaster': 1}, False, False,
                            ReadPreference.PRIMARY, DEFAULT_CODEC_OPTIONS)
            ismaster = IsMaster(reply)

        return SocketInfo(sock, self, ismaster, self.address)
    except socket.error as exc:
        # Close whatever was opened before reporting the failure.
        if sock is not None:
            sock.close()
        _raise_connection_failure(self.address, exc)
def test_ssl_hostname_mismatch_repeat(self):
    """Request a URL whose host mismatches the certificate, twice in a row.

    Each request is expected to fail; the error is printed together with
    its ``errno`` (if any).  A request that unexpectedly succeeds fails the
    test.
    """
    # https://github.com/httplib2/httplib2/issues/5
    # FIXME(temoto): as of 2017-01-05 this is only a reference code, not useful test.
    # Because it doesn't provoke described error on my machine.
    # Instead `SSLContext.wrap_socket` raises `ssl.CertificateError`
    # which was also added to original patch.

    # url host is intentionally different, we provoke ssl hostname mismatch error
    url = 'https://127.0.0.1:%d/' % (self.port,)
    http = httplib2.Http(ca_certs=self.ca_certs_path, proxy_info=None)

    def once():
        try:
            http.request(url)
        except Exception as e:
            print('%s errno=%s' % (repr(e), getattr(e, 'errno', None)))
        else:
            # Raised outside the ``try`` so the broad ``except Exception``
            # above cannot swallow the failure.
            raise AssertionError('expected certificate hostname mismatch error')

    once()
    once()
def test_https_with_cafile(self):
    """Exercise urlopen()'s ``cafile`` argument against a local HTTPS server."""
    handler = self.start_https_server(certfile=CERT_localhost)
    import ssl

    # The CA file matches the server certificate: the request succeeds.
    url = "https://localhost:%s/bizarre" % handler.port
    data = self.urlopen(url, cafile=CERT_localhost)
    self.assertEqual(data, b"we care a bit")

    # The CA file does not validate the server certificate.
    url = "https://localhost:%s/bizarre" % handler.port
    with self.assertRaises(urllib.error.URLError):
        self.urlopen(url, cafile=CERT_fakehostname)

    # The certificate validates, but it was issued for another host name.
    handler = self.start_https_server(certfile=CERT_fakehostname)
    url = "https://localhost:%s/bizarre" % handler.port
    with self.assertRaises(ssl.CertificateError):
        self.urlopen(url, cafile=CERT_fakehostname)
def test_local_bad_hostname(self):
    """A valid certificate issued for another host name must be rejected."""
    import ssl
    from test.ssl_servers import make_https_server

    server = make_https_server(self, CERT_fakehostname)

    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    context.verify_mode = ssl.CERT_REQUIRED
    context.load_verify_locations(CERT_fakehostname)

    # Default behaviour first, then the same with an explicit
    # check_hostname=True: both must refuse the mismatching certificate.
    for extra in ({}, {'check_hostname': True}):
        h = client.HTTPSConnection('localhost', server.port,
                                   context=context, **extra)
        with self.assertRaises(ssl.CertificateError):
            h.request('GET', '/')

    # With check_hostname=False the mismatch is ignored and the request
    # reaches the server.
    h = client.HTTPSConnection('localhost', server.port, context=context,
                               check_hostname=False)
    h.request('GET', '/nonexistent')
    resp = h.getresponse()
    self.assertEqual(resp.status, 404)
def match_hostname(cert, hostname):
    """Check that *cert* is valid for *hostname*.

    *cert* is a decoded certificate as returned by
    SSLSocket.getpeercert().  Only ``DNS`` entries of ``subjectAltName``
    are consulted, and the subject's ``commonName`` values are used only
    when no such entry exists.  IP addresses are not accepted for
    *hostname*.

    Returns nothing on success.  Raises ValueError for an empty
    certificate and CertificateError when no name matches.
    """
    if not cert:
        raise ValueError("empty or no certificate, match_hostname needs a "
                         "SSL socket or SSL context with either "
                         "CERT_OPTIONAL or CERT_REQUIRED")

    candidates = []

    for kind, name in cert.get('subjectAltName', ()):
        if kind != 'DNS':
            continue
        if _dnsname_match(name, hostname):
            return
        candidates.append(name)

    if not candidates:
        # The subject is only checked when there is no dNSName entry
        # in subjectAltName
        for rdn in cert.get('subject', ()):
            for kind, name in rdn:
                # XXX according to RFC 2818, the most specific Common Name
                # must be used.
                if kind != 'commonName':
                    continue
                if _dnsname_match(name, hostname):
                    return
                candidates.append(name)

    if not candidates:
        raise CertificateError("no appropriate commonName or "
                               "subjectAltName fields were found")
    if len(candidates) == 1:
        raise CertificateError("hostname %r "
                               "doesn't match %r"
                               % (hostname, candidates[0]))
    raise CertificateError("hostname %r "
                           "doesn't match either of %s"
                           % (hostname, ', '.join(map(repr, candidates))))
def match_hostname(cert, hostname):
    """Validate a decoded peer certificate against *hostname*.

    *cert* has the format returned by SSLSocket.getpeercert().  ``DNS``
    entries of ``subjectAltName`` take precedence; the subject's
    ``commonName`` values are a fallback used only when there is no such
    entry.  IP addresses are not accepted for *hostname*.

    The function returns None when a name matches, raises ValueError if
    *cert* is empty, and raises CertificateError otherwise.
    """
    if not cert:
        raise ValueError("empty or no certificate")

    seen = []

    alt_names = cert.get('subjectAltName', ())
    for entry_type, entry_value in alt_names:
        if entry_type == 'DNS':
            if _dnsname_match(entry_value, hostname):
                return None
            seen.append(entry_value)

    if len(seen) == 0:
        # The subject is only checked when there is no dNSName entry
        # in subjectAltName
        subject = cert.get('subject', ())
        for attributes in subject:
            for attr_type, attr_value in attributes:
                # XXX according to RFC 2818, the most specific Common Name
                # must be used.
                if attr_type == 'commonName':
                    if _dnsname_match(attr_value, hostname):
                        return None
                    seen.append(attr_value)

    count = len(seen)
    if count > 1:
        listing = ', '.join(map(repr, seen))
        raise CertificateError("hostname %r "
                               "doesn't match either of %s"
                               % (hostname, listing))
    if count == 1:
        raise CertificateError("hostname %r "
                               "doesn't match %r"
                               % (hostname, seen[0]))
    raise CertificateError("no appropriate commonName or "
                           "subjectAltName fields were found")
def connect(self):
    """Open a certificate-verified TLS connection and store it in ``self.sock``.

    When a proxy tunnel is configured it is established first and the
    certificate is matched against the tunnel's destination host;
    otherwise it is matched against ``self.host``.  On a host name
    mismatch the socket is shut down and closed, and CertificateError is
    re-raised.
    """
    endpoint = (self.host, self.port)
    source_address = getattr(self, 'source_address', None)
    raw_sock = socket.create_connection(endpoint, source_address)

    # Handle the socket if a (proxy) tunnel is present
    tunnelling = hasattr(self, '_tunnel') and getattr(self, '_tunnel_host', None)
    if tunnelling:
        self.sock = raw_sock
        self._tunnel()
        # http://bugs.python.org/issue7776: Python>=3.4.1 and >=2.7.7
        # change self.host to mean the proxy server host when tunneling is
        # being used. Adapt, since we are interested in the destination
        # host for the match_hostname() comparison.
        actual_host = self._tunnel_host
    else:
        actual_host = self.host

    self.sock = ssl.wrap_socket(
        raw_sock,
        cert_reqs=ssl.CERT_REQUIRED,
        ca_certs=self.ca_bundle,
    )

    try:
        peer_cert = self.sock.getpeercert()
        match_hostname(peer_cert, actual_host)
    except CertificateError:
        self.sock.shutdown(socket.SHUT_RDWR)
        self.sock.close()
        raise
def _configured_socket(address, options):
    """Return a connected socket for (host, port) set up per PoolOptions.

    Can raise socket.error, ConnectionFailure, or CertificateError.

    When the options carry an SSL context the socket is wrapped with it,
    and the peer certificate is matched against the host when both the
    context's verify mode and ``ssl_match_hostname`` are set.  The socket
    timeout is applied last.
    """
    sock = _create_connection(address, options)
    context = options.ssl_context

    if context is not None:
        try:
            sock = context.wrap_socket(sock)
        except IOError as exc:
            sock.close()
            message = "SSL handshake failed: %s" % (str(exc),)
            raise ConnectionFailure(message)

        if context.verify_mode and options.ssl_match_hostname:
            try:
                match_hostname(sock.getpeercert(), hostname=address[0])
            except CertificateError:
                # Close the socket, then let the caller see the error.
                sock.close()
                raise

    sock.settimeout(options.socket_timeout)
    return sock


# Do *not* explicitly inherit from object or Jython won't call __del__
# http://bugs.jython.org/issue1057
def _get_socket_no_auth(self):
    """Return a pooled SocketInfo, creating one if the pool is empty.

    Can raise ConnectionFailure.
    """
    # We use the pid here to avoid issues with fork / multiprocessing.
    # See test.test_client:TestClient.test_fork for an example of
    # what could go wrong otherwise
    if self.pid != os.getpid():
        self.reset()

    # Get a free socket or create one.
    acquired = self._socket_semaphore.acquire(
        True, self.opts.wait_queue_timeout)
    if not acquired:
        self._raise_wait_queue_timeout()

    # We've now acquired the semaphore and must release it on error.
    try:
        try:
            # set.pop() isn't atomic in Jython less than 2.7, see
            # http://bugs.jython.org/issue1854
            with self.lock:
                sock_info = self.sockets.pop()
            from_pool = True
        except KeyError:
            # Can raise ConnectionFailure or CertificateError.
            sock_info = self.connect()
            from_pool = False

        if from_pool:
            # Can raise ConnectionFailure.
            sock_info = self._check(sock_info)
    except:
        # Deliberately catch everything: the semaphore slot has to be
        # given back whatever went wrong, then the error propagates.
        self._socket_semaphore.release()
        raise

    sock_info.last_checkout = _time()
    return sock_info
def match_hostname(cert, hostname):
    """Confirm that *cert* was issued for *hostname*.

    *cert* is the decoded certificate returned by
    SSLSocket.getpeercert().  Each ``DNS`` entry of ``subjectAltName`` is
    converted to a pattern and matched against *hostname*; the subject's
    ``commonName`` values are tried only when no ``DNS`` entry is present.
    IP addresses are not accepted for *hostname*.

    Returns nothing on success.  Raises ValueError when *cert* is empty
    and CertificateError when nothing matches.
    """
    if not cert:
        raise ValueError("empty or no certificate")

    tried = []

    for field, pattern_source in cert.get('subjectAltName', ()):
        if field != 'DNS':
            continue
        if _dnsname_to_pat(pattern_source).match(hostname):
            return
        tried.append(pattern_source)

    if not tried:
        # The subject is only checked when there is no dNSName entry
        # in subjectAltName
        for component in cert.get('subject', ()):
            for field, pattern_source in component:
                # XXX according to RFC 2818, the most specific Common Name
                # must be used.
                if field != 'commonName':
                    continue
                if _dnsname_to_pat(pattern_source).match(hostname):
                    return
                tried.append(pattern_source)

    if len(tried) > 1:
        names = ', '.join(map(repr, tried))
        raise CertificateError("hostname %r "
                               "doesn't match either of %s"
                               % (hostname, names))
    if tried:
        raise CertificateError("hostname %r "
                               "doesn't match %r"
                               % (hostname, tried[0]))
    raise CertificateError("no appropriate commonName or "
                           "subjectAltName fields were found")
async def fetch(self, url):
    """Load a webpage and return its body as text.

    The request follows redirects, sends ``self.headers`` and is bounded
    by ``self.page_load_timeout``.

    Raises:
        SorterResponseCodeError: the response status is not 200.
        SorterTimeoutError: the page did not load within the timeout.
        SorterConnectionError: a SOCKS error, a server disconnect or a
            client response error occurred.
        SorterCertError: a certificate error occurred, either as
            ``ssl.CertificateError`` or as a ``ClientOSError`` whose
            message mentions "SSL".
        aiohttp.errors.ClientOSError: any other OS-level client error is
            re-raised unchanged.
    """
    self.logger.info("{url}: loading...".format(**locals()))
    try:
        with aiohttp.Timeout(self.page_load_timeout, loop=self.loop):
            async with self.session.get(url,
                                        allow_redirects=True,
                                        headers=self.headers) as resp:
                if resp.status != 200:
                    self.logger.warning("{url} was not reachable. HTTP "
                                        "error code {resp.status} was "
                                        "returned".format(**locals()))
                    raise SorterResponseCodeError

                self.logger.info("{url}: loaded "
                                 "successfully.".format(**locals()))
                return await resp.text()
    except asyncio.TimeoutError:
        self.logger.warning("{url}: timed out after "
                            "{self.page_load_timeout}.".format(**locals()))
        raise SorterTimeoutError
    except (aiosocks.errors.SocksError,
            aiohttp.errors.ServerDisconnectedError,
            aiohttp.errors.ClientResponseError) as exc:
        self.logger.warning("{url} was not reachable: "
                            "{exc}".format(**locals()))
        raise SorterConnectionError
    except aiohttp.errors.ClientOSError as exception_msg:
        # Test the exception's text: ``in`` on the exception object itself
        # raises TypeError.
        if "SSL" in str(exception_msg):
            self.logger.warning("{url}: certificate error (probably due to "
                                "use of a self-signed "
                                "cert.".format(**locals()))
            raise SorterCertError
        else:
            raise
    except ssl.CertificateError:
        # ClientOSError is fully handled by the clause above, so only the
        # plain certificate error is caught here.
        self.logger.warning("{url}: certificate error (probably due to "
                            "use of a self-signed "
                            "cert.".format(**locals()))
        raise SorterCertError
def test_create_server_ssl_match_failed(self):
    """A client checking host names must reject a mismatching server cert."""
    proto = MyProto(loop=self.loop)
    server, host, port = self._make_ssl_server(
        lambda: proto, SIGNED_CERTFILE)

    client_ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    client_ctx.options |= ssl.OP_NO_SSLv2
    client_ctx.verify_mode = ssl.CERT_REQUIRED
    client_ctx.load_verify_locations(cafile=SIGNING_CA)
    if hasattr(client_ctx, 'check_hostname'):
        client_ctx.check_hostname = True

    # incorrect server_hostname
    f_c = self.loop.create_connection(MyProto, host, port,
                                      ssl=client_ctx)

    expected = "hostname '127.0.0.1' doesn't match 'localhost'"
    with mock.patch.object(self.loop, 'call_exception_handler'), \
            test_utils.disable_logger(), \
            self.assertRaisesRegex(ssl.CertificateError, expected):
        self.loop.run_until_complete(f_c)

    # close connection
    proto.transport.close()
    server.close()
def ready(self):
    """Return the result of the readiness request, or False if it fails.

    An SSL or certificate error is not treated as "not ready": the task
    status is set to "SSL error" and the exception is re-raised.  The
    connection is always closed before returning or raising.
    """
    connection = self._get_connection()
    try:
        outcome = self._ready_request(connection)
    except (ssl.SSLError, ssl.CertificateError):
        # If there is a problem with the cert or SSL connection, error out immediately
        self.task.update(status="SSL error")
        raise
    except Exception:
        outcome = False
    finally:
        connection.close()
    return outcome