我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用OpenSSL.SSL.Error()。
def writeSomeData(self, data): try: return Connection.writeSomeData(self, data) except SSL.WantWriteError: return 0 except SSL.WantReadError: self.writeBlockedOnRead = 1 Connection.stopWriting(self) Connection.startReading(self) return 0 except SSL.ZeroReturnError: return main.CONNECTION_LOST except SSL.SysCallError, e: if e[0] == -1 and data == "": # errors when writing empty strings are expected # and can be ignored return 0 else: return main.CONNECTION_LOST except SSL.Error, e: return e
def test_doesNotSwallowOtherSSLErrors(self): """ Only no cipher matches get swallowed, every other SSL error gets propagated. """ def raiser(_): # Unfortunately, there seems to be no way to trigger a real SSL # error artificially. raise SSL.Error([['', '', '']]) ctx = FakeContext(SSL.SSLv23_METHOD) ctx.set_cipher_list = raiser self.patch(sslverify.SSL, 'Context', lambda _: ctx) self.assertRaises( SSL.Error, sslverify._expandCipherString, u'ALL', SSL.SSLv23_METHOD, 0 )
def __init__(self, host, port, verify, cert_path, pkey_path, pkey_passphrase=''): self.host = host self.port = port try: self.pkey = crypto.load_privatekey(crypto.FILETYPE_PEM, open(pkey_path, 'rb').read(), pkey_passphrase) except IOError: raise eStreamerKeyError("Unable to locate key file {}".format(pkey_path)) except crypto.Error: raise eStreamerKeyError("Invalid key file or bad passphrase {}".format(pkey_path)) try: self.cert = crypto.load_certificate(crypto.FILETYPE_PEM, open(cert_path, 'rb').read()) except IOError: raise eStreamerCertError("Unable to locate cert file {}".format(cert_path)) except crypto.Error: raise eStreamerCertError("Invalid certificate {}".format(cert_path)) self.verify = verify self.ctx = None self.sock = None self._bytes = None
def test03_ssl_verification_of_peer_fails(self): ctx = SSL.Context(SSL.TLSv1_METHOD) def verify_callback(conn, x509, errnum, errdepth, preverify_ok): log.debug('SSL peer certificate verification failed for %r', x509.get_subject()) return preverify_ok ctx.set_verify(SSL.VERIFY_PEER, verify_callback) ctx.set_verify_depth(9) # Set bad location - unit test dir has no CA certs to verify with ctx.load_verify_locations(None, Constants.UNITTEST_DIR) conn = HTTPSConnection(Constants.HOSTNAME, port=Constants.PORT, ssl_context=ctx) conn.connect() self.assertRaises(SSL.Error, conn.request, 'GET', '/')
def doRead(self): if self.writeBlockedOnRead: self.writeBlockedOnRead = 0 self._resetReadWrite() try: return Connection.doRead(self) except SSL.ZeroReturnError: return main.CONNECTION_DONE except SSL.WantReadError: return except SSL.WantWriteError: self.readBlockedOnWrite = 1 Connection.startWriting(self) Connection.stopReading(self) return except SSL.SysCallError, (retval, desc): if ((retval == -1 and desc == 'Unexpected EOF') or retval > 0): return main.CONNECTION_LOST log.err() return main.CONNECTION_LOST except SSL.Error, e: return e
def _cbLostConns(self, results): (sSuccess, sResult), (cSuccess, cResult) = results self.failIf(sSuccess) self.failIf(cSuccess) acceptableErrors = [SSL.Error] # Rather than getting a verification failure on Windows, we are getting # a connection failure. Without something like sslverify proxying # in-between we can't fix up the platform's errors, so let's just # specifically say it is only OK in this one case to keep the tests # passing. Normally we'd like to be as strict as possible here, so # we're not going to allow this to report errors incorrectly on any # other platforms. if platform.isWindows(): from twisted.internet.error import ConnectionLost acceptableErrors.append(ConnectionLost) sResult.trap(*acceptableErrors) cResult.trap(*acceptableErrors) return self.serverPort.stopListening()
def testFailedCertificateVerification(self): onServerLost = defer.Deferred() onClientLost = defer.Deferred() self.loopback(sslverify.OpenSSLCertificateOptions(privateKey=self.sKey, certificate=self.sCert, verify=False, requireCertificate=False), sslverify.OpenSSLCertificateOptions(verify=True, requireCertificate=False, caCerts=[self.cCert]), onServerLost=onServerLost, onClientLost=onClientLost) d = defer.DeferredList([onClientLost, onServerLost], consumeErrors=True) def afterLost(((cSuccess, cResult), (sSuccess, sResult))): self.failIf(cSuccess) self.failIf(sSuccess) # Twisted trunk will do the correct thing here, and not log any # errors. Twisted 2.1 will do the wrong thing. We're flushing # errors until the buildbot is updated to a reasonable facsimilie # of 2.2. log.flushErrors(SSL.Error)
def log_request(self, code='-', size='-'): msg = self.requestline code = str(code) if termcolor: color = termcolor.colored if code[0] == '1': # 1xx - Informational msg = color(msg, attrs=['bold']) if code[0] == '2': # 2xx - Success msg = color(msg, color='white') elif code == '304': # 304 - Resource Not Modified msg = color(msg, color='cyan') elif code[0] == '3': # 3xx - Redirection msg = color(msg, color='green') elif code == '404': # 404 - Resource Not Found msg = color(msg, color='yellow') elif code[0] == '4': # 4xx - Client Error msg = color(msg, color='red', attrs=['bold']) else: # 5xx, or any other response msg = color(msg, color='magenta', attrs=['bold']) self.log('info', '"%s" %s %s', msg, code, size)
def _test_dtls_ciphersuite(self, server_connectivity_info, dtls_version, cipher, port): """This function is used by threads to it investigates with support the cipher suite on server, when DTLS protocol(s) is/are tested. Returns instance of class AcceptCipher or RejectCipher. Args: server_connectivity_info (ServerConnectivityInfo): contains information for connection on server dtls_version (str): contains SSL/TLS protocol version, which is used to connect cipher (str): contains OpenSSL shortcut for identification cipher suite port (int): contains port number for connecting comunication. """ cnx = SSL.Context(dtls_version) cnx.set_cipher_list(cipher) conn = SSL.Connection(cnx, socket.socket(socket.AF_INET, socket.SOCK_DGRAM)) try: conn.connect((server_connectivity_info.ip_address, port)) conn.do_handshake() except SSL.Error as e: error_msg = ((e[0])[0])[2] cipher_result = RejectCipher(TLS_OPENSSL_TO_RFC_NAMES_MAPPING[cipher], error_msg) else: cipher_result = AcceptCipher(TLS_OPENSSL_TO_RFC_NAMES_MAPPING[cipher]) finally: conn.shutdown() conn.close() return cipher_result
def _safe_ssl_call(self, suppress_ragged_eofs, call, *args, **kwargs): """Wrap the given call with SSL error-trapping.""" start = time.time() while True: try: return call(*args, **kwargs) except (ossl.WantReadError, ossl.WantWriteError): # Sleep and try again. This is dangerous, because it means # the rest of the stack has no way of differentiating # between a "new handshake" error and "client dropped". # Note this isn't an endless loop: there's a timeout below. time.sleep(self.SSL_RETRY) except ossl.Error as e: if suppress_ragged_eofs and e.args == (-1, 'Unexpected EOF'): return b'' raise socket.error(e.args[0]) if time.time() - start > self.SSL_TIMEOUT: raise socket.timeout('timed out')
def send(self, data, flags=0, timeout=timeout_default): if timeout is timeout_default: timeout = self.timeout while True: try: return self._sock.send(data, flags) except SSL.WantWriteError, ex: if self.timeout == 0.0: raise timeout(str(ex)) else: sys.exc_clear() wait_write(self.fileno(), timeout=timeout) except SSL.WantReadError, ex: if self.timeout == 0.0: raise timeout(str(ex)) else: sys.exc_clear() wait_read(self.fileno(), timeout=timeout) except SSL.SysCallError, ex: if ex[0] == -1 and data == "": # errors when writing empty strings are expected and can be ignored return 0 raise sslerror(SysCallError_code_mapping.get(ex.args[0], ex.args[0]), ex.args[1]) except SSL.Error, ex: raise sslerror(str(ex))
def recv(self, buflen): pending = self._sock.pending() if pending: return self._sock.recv(min(pending, buflen)) while True: try: return self._sock.recv(buflen) except SSL.WantReadError, ex: if self.timeout == 0.0: raise timeout(str(ex)) else: sys.exc_clear() wait_read(self.fileno(), timeout=self.timeout) except SSL.WantWriteError, ex: if self.timeout == 0.0: raise timeout(str(ex)) else: sys.exc_clear() wait_read(self.fileno(), timeout=self.timeout) except SSL.ZeroReturnError: return '' except SSL.SysCallError, ex: raise sslerror(SysCallError_code_mapping.get(ex.args[0], ex.args[0]), ex.args[1]) except SSL.Error, ex: raise sslerror(str(ex))
def do_handshake(self): self.update_flags() if not isinstance(self.sock, SSL.Connection): return self.handshaking = True self.ssl_write = None try: self.sock.do_handshake() except SSL.WantWriteError: self.ssl_write = True except SSL.WantReadError: pass except SSL.Error: self.close() else: self.handshaking = False
def _cbLostConns(self, results): (sSuccess, sResult), (cSuccess, cResult) = results self.assertFalse(sSuccess) self.assertFalse(cSuccess) acceptableErrors = [SSL.Error] # Rather than getting a verification failure on Windows, we are getting # a connection failure. Without something like sslverify proxying # in-between we can't fix up the platform's errors, so let's just # specifically say it is only OK in this one case to keep the tests # passing. Normally we'd like to be as strict as possible here, so # we're not going to allow this to report errors incorrectly on any # other platforms. if platform.isWindows(): from twisted.internet.error import ConnectionLost acceptableErrors.append(ConnectionLost) sResult.trap(*acceptableErrors) cResult.trap(*acceptableErrors) return self.serverPort.stopListening()
def test_validHostnameInvalidCertificate(self): """ When an invalid certificate containing a perfectly valid hostname is received, the connection is aborted with an OpenSSL error. """ cProto, sProto, pump = self.serviceIdentitySetup( u"valid.example.com", u"valid.example.com", validCertificate=False, ) self.assertEqual(cProto.wrappedProtocol.data, b'') self.assertEqual(sProto.wrappedProtocol.data, b'') cErr = cProto.wrappedProtocol.lostReason.value sErr = sProto.wrappedProtocol.lostReason.value self.assertIsInstance(cErr, SSL.Error) self.assertIsInstance(sErr, SSL.Error)
def test_realCAsBetterNotSignOurBogusTestCerts(self): """ If we use the default trust from the platform, our dinky certificate should I{really} fail. """ cProto, sProto, pump = self.serviceIdentitySetup( u"valid.example.com", u"valid.example.com", validCertificate=False, useDefaultTrust=True, ) self.assertEqual(cProto.wrappedProtocol.data, b'') self.assertEqual(sProto.wrappedProtocol.data, b'') cErr = cProto.wrappedProtocol.lostReason.value sErr = sProto.wrappedProtocol.lostReason.value self.assertIsInstance(cErr, SSL.Error) self.assertIsInstance(sErr, SSL.Error)
def test_clientPresentsBadCertificate(self): """ When the server verifies and the client presents an invalid certificate for that verification by passing it to L{sslverify.optionsForClientTLS}, the connection cannot be established with an SSL error. """ cProto, sProto, pump = self.serviceIdentitySetup( u"valid.example.com", u"valid.example.com", validCertificate=True, serverVerifies=True, validClientCertificate=False, clientPresentsCertificate=True, ) self.assertEqual(cProto.wrappedProtocol.data, b'') cErr = cProto.wrappedProtocol.lostReason.value sErr = sProto.wrappedProtocol.lostReason.value self.assertIsInstance(cErr, SSL.Error) self.assertIsInstance(sErr, SSL.Error)
def test_surpriseFromInfoCallback(self): """ pyOpenSSL isn't always so great about reporting errors. If one occurs in the verification info callback, it should be logged and the connection should be shut down (if possible, anyway; the app_data could be clobbered but there's no point testing for that). """ cProto, sProto, pump = self.serviceIdentitySetup( u"correct-host.example.com", u"correct-host.example.com", buggyInfoCallback=True, ) self.assertEqual(cProto.wrappedProtocol.data, b'') self.assertEqual(sProto.wrappedProtocol.data, b'') cErr = cProto.wrappedProtocol.lostReason.value sErr = sProto.wrappedProtocol.lostReason.value self.assertIsInstance(cErr, ZeroDivisionError) self.assertIsInstance(sErr, (ConnectionClosed, SSL.Error)) errors = self.flushLoggedErrors(ZeroDivisionError) self.assertTrue(errors)
def _checkHandshakeStatus(self): """ Ask OpenSSL to proceed with a handshake in progress. Initially, this just sends the ClientHello; after some bytes have been stuffed in to the C{Connection} object by C{dataReceived}, it will then respond to any C{Certificate} or C{KeyExchange} messages. """ # The connection might already be aborted (eg. by a callback during # connection setup), so don't even bother trying to handshake in that # case. if self._aborted: return try: self._tlsConnection.do_handshake() except WantReadError: self._flushSendBIO() except Error: self._tlsShutdownFinished(Failure()) else: self._handshakeDone = True if IHandshakeListener.providedBy(self.wrappedProtocol): self.wrappedProtocol.handshakeCompleted()
def _shutdownTLS(self): """ Initiate, or reply to, the shutdown handshake of the TLS layer. """ try: shutdownSuccess = self._tlsConnection.shutdown() except Error: # Mid-handshake, a call to shutdown() can result in a # WantWantReadError, or rather an SSL_ERR_WANT_READ; but pyOpenSSL # doesn't allow us to get at the error. See: # https://github.com/pyca/pyopenssl/issues/91 shutdownSuccess = False self._flushSendBIO() if shutdownSuccess: # Both sides have shutdown, so we can start closing lower-level # transport. This will also happen if we haven't started # negotiation at all yet, in which case shutdown succeeds # immediately. self.transport.loseConnection()