我们从Python开源项目中,提取了以下36个代码示例,用于说明如何使用OpenSSL.SSL.VERIFY_PEER。
def test03_ssl_verification_of_peer_fails(self):
    """Check that a request fails when the CA directory cannot verify the peer.

    The context is given ``Constants.UNITTEST_DIR`` as its CA directory
    and the subsequent request is expected to raise ``SSL.Error``.
    """
    ctx = SSL.Context(SSL.TLSv1_METHOD)

    def verify_callback(conn, x509, errnum, errdepth, preverify_ok):
        # Log a failure only when OpenSSL's own check rejected the
        # certificate; logging unconditionally would mislabel certificates
        # that verified correctly.
        if not preverify_ok:
            log.debug('SSL peer certificate verification failed for %r',
                      x509.get_subject())
        return preverify_ok

    ctx.set_verify(SSL.VERIFY_PEER, verify_callback)
    ctx.set_verify_depth(9)

    # Set bad location - unit test dir has no CA certs to verify with
    ctx.load_verify_locations(None, Constants.UNITTEST_DIR)

    conn = HTTPSConnection(Constants.HOSTNAME, port=Constants.PORT,
                           ssl_context=ctx)
    conn.connect()
    self.assertRaises(SSL.Error, conn.request, 'GET', '/')
def test03_ssl_verification_of_peer_succeeds(self):
    """Fetch '/' over HTTPS with peer verification against CACERT_DIR.

    The response body is printed; the test passes if nothing raises.
    """
    def accept_preverified(conn, x509, errnum, errdepth, preverify_ok):
        # Defer entirely to the result OpenSSL already computed.
        return preverify_ok

    context = SSL.Context(SSL.TLSv1_METHOD)
    context.set_verify(SSL.VERIFY_PEER, accept_preverified)
    context.set_verify_depth(9)

    # Point the context at the directory holding the CA certificates.
    context.load_verify_locations(None, Constants.CACERT_DIR)

    connection = HTTPSConnection(Constants.HOSTNAME,
                                 port=Constants.PORT,
                                 ssl_context=context)
    connection.connect()
    connection.request('GET', '/')
    response = connection.getresponse()
    print('Response = %s' % response.read())
def test04_ssl_verification_with_subj_alt_name(self):
    """Fetch '/' using the ServerSSLCertVerification callback.

    The callback is built for hostname 'localhost' and installed with
    VERIFY_PEER; CA certificates come from ``Constants.CACERT_DIR``.
    """
    verifier = ServerSSLCertVerification(hostname='localhost')

    context = SSL.Context(SSL.TLSv1_METHOD)
    context.set_verify(SSL.VERIFY_PEER,
                       verifier.get_verify_server_cert_func())
    context.set_verify_depth(9)

    # Point the context at the directory holding the CA certificates.
    context.load_verify_locations(None, Constants.CACERT_DIR)

    connection = HTTPSConnection(Constants.HOSTNAME,
                                 port=Constants.PORT,
                                 ssl_context=context)
    connection.connect()
    connection.request('GET', '/')
    response = connection.getresponse()
    print('Response = %s' % response.read())
def _server(self, sock):
    """
    Build the server-side SSL L{Connection} that wraps C{sock} and put it
    into accept state.
    """
    # Context boilerplate: TLSv1, peer verification required, and the
    # server's own key/certificate pair.
    verify_mode = (VERIFY_PEER
                   | VERIFY_FAIL_IF_NO_PEER_CERT
                   | VERIFY_CLIENT_ONCE)
    context = Context(TLSv1_METHOD)
    context.set_options(OP_NO_SSLv2 | OP_NO_SSLv3 | OP_SINGLE_DH_USE)
    context.set_verify(verify_mode, verify_cb)
    trust_store = context.get_cert_store()

    private_key = load_privatekey(FILETYPE_PEM, server_key_pem)
    context.use_privatekey(private_key)
    certificate = load_certificate(FILETYPE_PEM, server_cert_pem)
    context.use_certificate(certificate)
    context.check_privatekey()

    # The root certificate is what peer certificates are verified against.
    trust_store.add_cert(load_certificate(FILETYPE_PEM, root_cert_pem))

    # The Connection itself.  If None is passed as the 2nd parameter, it
    # indicates a memory BIO should be created.
    connection = Connection(context, sock)
    connection.set_accept_state()
    return connection
def _client(self, sock):
    """
    Build the client-side SSL L{Connection} that wraps C{sock} and put it
    into connect state.
    """
    # Context boilerplate, using the client's key/certificate pair.
    verify_mode = (VERIFY_PEER
                   | VERIFY_FAIL_IF_NO_PEER_CERT
                   | VERIFY_CLIENT_ONCE)
    context = Context(TLSv1_METHOD)
    context.set_options(OP_NO_SSLv2 | OP_NO_SSLv3 | OP_SINGLE_DH_USE)
    context.set_verify(verify_mode, verify_cb)
    trust_store = context.get_cert_store()

    private_key = load_privatekey(FILETYPE_PEM, client_key_pem)
    context.use_privatekey(private_key)
    certificate = load_certificate(FILETYPE_PEM, client_cert_pem)
    context.use_certificate(certificate)
    context.check_privatekey()

    # The root certificate is what peer certificates are verified against.
    trust_store.add_cert(load_certificate(FILETYPE_PEM, root_cert_pem))

    connection = Connection(context, sock)
    connection.set_connect_state()
    return connection
def create_dest_sslcontext(insecure=False, trusted_ca_certs="", alpn=None):
    """Return an SSL context for connecting to a destination server.

    With ``insecure`` false, CA certificates are loaded from
    ``trusted_ca_certs`` (falling back to ``certifi.where()`` when it is
    empty) and the peer must present a certificate that verifies.  With
    ``insecure`` true, the verify mode is ``SSL.VERIFY_NONE``.  ALPN
    protocols are set only when ``alpn`` is given and ``HAS_ALPN`` is true.
    """
    context = create_basic_sslcontext()

    if insecure:
        context.set_verify(SSL.VERIFY_NONE, certificate_verify_cb)
    else:
        ca_location = trusted_ca_certs or certifi.where()
        context.load_verify_locations(ca_location)
        strict_mode = SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT
        context.set_verify(strict_mode, certificate_verify_cb)

    if alpn and HAS_ALPN:
        context.set_alpn_protos(alpn)

    return context
def __enter__(self):
    """Create the SSL context, connect to ``(self.host, self.port)`` and
    return ``self``.

    When ``self.verify`` is set it is used as the CA file: peer
    verification is enabled with ``self.validate_cert`` as the callback
    and the same file is parsed into ``self.trusted_cert``.
    """
    self.ctx = SSL.Context(SSL.TLSv1_METHOD)
    if self.verify:
        self.ctx.set_verify(SSL.VERIFY_PEER, self.validate_cert)
        self.ctx.load_verify_locations(self.verify)
        # ``file()`` only exists on Python 2 and never closed the handle;
        # ``open`` in a ``with`` block works everywhere and releases it.
        with open(self.verify) as ca_file:
            pem_data = ca_file.read()
        self.trusted_cert = crypto.load_certificate(crypto.FILETYPE_PEM,
                                                    pem_data)
    self.ctx.use_privatekey(self.pkey)
    self.ctx.use_certificate(self.cert)
    self.sock = SSL.Connection(
        self.ctx, socket.socket(socket.AF_INET, socket.SOCK_STREAM))
    try:
        self.sock.connect((self.host, self.port))
    except Exception:
        # __exit__ is not invoked when __enter__ raises, so release the
        # socket here before propagating the error.
        self.sock.close()
        raise
    return self
def set_peer_verification_for_url_hostname(ssl_context, url,
                                           if_verify_enabled=False):
    '''Install a ServerSSLCertVerification callback for the URL's hostname.

    When ``if_verify_enabled`` is true the callback is installed only if
    the context's current verify mode already includes
    ``SSL.VERIFY_PEER``; otherwise it is installed unconditionally.  The
    verify mode is set to ``SSL.VERIFY_PEER`` in either case.
    '''
    # Guard clause: the caller asked to respect the existing mode and peer
    # verification is currently off, so leave the context untouched.
    if if_verify_enabled and not (ssl_context.get_verify_mode()
                                  & SSL.VERIFY_PEER):
        return

    hostname = urlparse_.urlparse(url).hostname
    verifier = ServerSSLCertVerification(hostname=hostname)
    ssl_context.set_verify(SSL.VERIFY_PEER,
                           verifier.get_verify_server_cert_func())
def test04_open_peer_cert_verification_fails(self):
    """Opening TEST_URI must raise SSL.Error when './' is the CA directory."""
    def accept_preverified(conn, x509, errnum, errdepth, preverify_ok):
        # Defer entirely to the result OpenSSL already computed.
        return preverify_ok

    # './' is deliberately used as the CA directory so that verification
    # has nothing to succeed against.
    context = SSL.Context(SSL.TLSv1_METHOD)
    context.set_verify(SSL.VERIFY_PEER, accept_preverified)
    context.load_verify_locations(None, './')

    url_opener = build_opener(ssl_context=context)
    self.assertRaises(SSL.Error, url_opener.open, Constants.TEST_URI)
def _load_verify_locations_test(self, *args):
    """
    Handshake over a socket pair with the client context's verify
    locations loaded from C{args}, then check the common name of the
    certificate the client received.
    """
    (serverSocket, clientSocket) = socket_pair()

    clientCtx = Context(TLSv1_METHOD)
    clientCtx.load_verify_locations(*args)

    # The server certificate must verify properly, otherwise the
    # connection fails.
    def requireVerified(conn, cert, errno, depth, preverify_ok):
        return preverify_ok
    clientCtx.set_verify(VERIFY_PEER, requireVerified)

    clientConn = Connection(clientCtx, clientSocket)
    clientConn.set_connect_state()

    serverCtx = Context(TLSv1_METHOD)
    serverCertificate = load_certificate(
        FILETYPE_PEM, cleartextCertificatePEM)
    serverCtx.use_certificate(serverCertificate)
    serverKey = load_privatekey(FILETYPE_PEM, cleartextPrivateKeyPEM)
    serverCtx.use_privatekey(serverKey)

    serverConn = Connection(serverCtx, serverSocket)
    serverConn.set_accept_state()

    # Alternate between both ends for three rounds.  Without the
    # load_verify_locations call above, the handshake will fail:
    # Error: [('SSL routines', 'SSL3_GET_SERVER_CERTIFICATE',
    #          'certificate verify failed')]
    endpoints = (clientConn, serverConn)
    for _ in range(3):
        for endpoint in endpoints:
            try:
                endpoint.do_handshake()
            except WantReadError:
                pass

    peerCertificate = clientConn.get_peer_certificate()
    self.assertEqual(peerCertificate.get_subject().CN, 'Testing Root CA')
def test_set_default_verify_paths(self):
    """
    L{Context.set_default_verify_paths} causes the platform-specific CA
    certificate locations to be used for verification purposes.
    """
    # Testing this requires a server with a certificate signed by one of
    # the CAs in the platform CA location.  Getting one of those costs
    # money.  Fortunately (or unfortunately, depending on your
    # perspective), it's easy to think of a public server on the
    # internet which has such a certificate.  Connecting to the network
    # in a unit test is bad, but it's the only way I can think of to
    # really test this. -exarkun

    # Arg, verisign.com doesn't speak TLSv1
    context = Context(SSLv3_METHOD)
    context.set_default_verify_paths()
    context.set_verify(
        VERIFY_PEER,
        lambda conn, cert, errno, depth, preverify_ok: preverify_ok)

    client = socket()
    client.connect(('verisign.com', 443))
    clientSSL = Connection(context, client)
    clientSSL.set_connect_state()
    clientSSL.do_handshake()
    # The payload is a bytes literal: identical to the plain literal on
    # Python 2, and the type Connection.send expects on Python 3.
    clientSSL.send(b'GET / HTTP/1.0\r\n\r\n')
    self.assertTrue(clientSSL.recv(1024))
def go():
    """Loop forever creating client/server SSL connection pairs over a
    local listening socket, driving each pair until the verify callback
    has been invoked at least once.

    Each callback invocation prints the next value from the module-level
    ``count`` iterator.  This function never returns.
    """
    port = socket()
    port.bind(('', 0))
    port.listen(1)

    called = []

    def info(*args):
        # print(...) and next(...) are valid on both Python 2.6+ and 3,
        # unlike the Python 2-only ``print x`` / ``x.next()`` forms.
        print(next(count))
        called.append(None)
        return 1

    context = Context(TLSv1_METHOD)
    context.set_verify(VERIFY_PEER, info)
    context.use_certificate(
        load_certificate(FILETYPE_PEM, cleartextCertificatePEM))
    context.use_privatekey(
        load_privatekey(FILETYPE_PEM, cleartextPrivateKeyPEM))

    while 1:
        client = socket()
        client.setblocking(False)
        client.connect_ex(port.getsockname())

        clientSSL = Connection(context, client)
        clientSSL.set_connect_state()

        server, ignored = port.accept()
        server.setblocking(False)

        serverSSL = Connection(context, server)
        serverSSL.set_accept_state()

        # Reset the marker, then pump both ends until the verify callback
        # fires for this pair.
        del called[:]
        while not called:
            for conn in clientSSL, serverSSL:
                try:
                    # Bytes payload: same value on Python 2, required
                    # type on Python 3.
                    conn.send(b'foo')
                except WantReadError:
                    pass
def test_handshake_fail(self):
    """A client verify callback that always rejects must fail the
    handshake on the client, and on the server with either an SSL or a
    socket error.
    """
    def reject_all(conn, x509, err_num, err_depth, preverify_ok):
        return False

    server_options = _server_ssl_options()
    server_future = self.server_start_tls(server_options)

    client_options = _client_ssl_options(SSL.VERIFY_PEER, reject_all)
    client_future = self.client_start_tls(client_options)

    with self.assertRaises(SSL.Error):
        yield client_future

    expected_server_errors = (SSL.Error, socket.error)
    with self.assertRaises(expected_server_errors):
        yield server_future
def test_check_hostname(self):
    """With a verify callback that accepts everything, starting TLS on
    the client with ``server_hostname=b'localhost'`` is still expected to
    raise ``VerificationError``.
    """
    def accept_all(conn, x509, err_num, err_depth, preverify_ok):
        return True

    server_options = _server_ssl_options()
    server_future = self.server_start_tls(server_options)

    client_options = _client_ssl_options(SSL.VERIFY_PEER, accept_all)
    client_future = self.client_start_tls(client_options,
                                          server_hostname=b'localhost')

    with self.assertRaises(VerificationError):
        yield client_future

    # TODO: server will not raise.
    # with self.assertRaises(Exception):
    yield server_future
def _makeContext(self):
    """Build and return the ``SSL.Context`` described by this object.

    The context carries a fresh ``_SSLApplicationData`` as app data, the
    configured certificate/private key (when both are set), the verify
    mode derived from ``self.verify``, any ``self.caCerts`` in its
    certificate store, and a session id when ``self.enableSessions`` is
    true.
    """
    ctx = SSL.Context(self.method)
    ctx.set_app_data(_SSLApplicationData())

    if self.certificate is not None and self.privateKey is not None:
        ctx.use_certificate(self.certificate)
        ctx.use_privatekey(self.privateKey)
        # Sanity check
        ctx.check_privatekey()

    verifyFlags = SSL.VERIFY_NONE
    if self.verify:
        verifyFlags = SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT
        if self.caCerts:
            store = ctx.get_cert_store()
            for cert in self.caCerts:
                store.add_cert(cert)

    def _trackVerificationProblems(conn, cert, errno, depth, preverify_ok):
        # Propagate OpenSSL's own verdict.  Returning True unconditionally
        # would accept every peer certificate and make VERIFY_PEER
        # meaningless.
        # NOTE(review): if callers intentionally verify peers at the
        # application layer instead, confirm they do not rely on the
        # handshake accepting unverifiable certificates.
        return preverify_ok

    ctx.set_verify(verifyFlags, _trackVerificationProblems)

    if self.enableSessions:
        sessionName = md5.md5("%s-%d" % (reflect.qual(self.__class__),
                                         _sessionCounter())).hexdigest()
        ctx.set_session_id(sessionName)

    return ctx
def ssl_context(cacert, srvcrt, srvkey):
    """Return a TLSv1.2 context that requires a verified peer certificate.

    ``cacert`` is added to the context's certificate store.  ``srvcrt``
    is split on single spaces: the first entry becomes the context's own
    certificate and the remaining entries are added as extra chain
    certificates.  ``srvkey`` is the matching private key.
    """
    def _accept_preverified(_conn, _cert, _errno, _depth, ok):
        return ok

    # General setup: TLSv1.2, no compression, project cipher list.
    context = SSL.Context(SSL.TLSv1_2_METHOD)
    context.set_verify_depth(9)
    context.set_options(SSL.OP_NO_COMPRESSION)
    write_modes = (_ssl_lib.SSL_MODE_ENABLE_PARTIAL_WRITE
                   | _ssl_lib.SSL_MODE_ACCEPT_MOVING_WRITE_BUFFER)
    context.set_mode(write_modes)
    context.set_cipher_list(libmu.defs.Defs.cipher_list)
    context.set_verify(SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT,
                       _accept_preverified)

    # Trust the CA certificate supplied by the caller.
    ca_pem = format_ssl_cert(cacert)
    ca_x509 = crypto.load_certificate(crypto.FILETYPE_PEM, ca_pem)
    context.get_cert_store().add_cert(ca_x509)

    # Own certificate chain: the first entry is the leaf, the rest are
    # extra chain certificates.
    for position, entry in enumerate(srvcrt.split(' ')):
        chain_x509 = crypto.load_certificate(crypto.FILETYPE_PEM,
                                             format_ssl_cert(entry))
        if position == 0:
            context.use_certificate(chain_x509)
        else:
            context.add_extra_chain_cert(chain_x509)

    # Private key, then confirm it matches the certificate.
    key_pem = format_ssl_key(srvkey)
    context.use_privatekey(
        crypto.load_privatekey(crypto.FILETYPE_PEM, key_pem))
    context.check_privatekey()

    return context

###
#  SSLize a connected socket, requiring a supplied cacert
###
def testFailedVerify(self):
    """
    Connect a client whose verify callback always returns False to a
    local SSL server and pass both connection-lost results to
    C{self._cbLostConns}.
    """
    org = "twisted.test.test_ssl"
    self.setupServerAndClient(
        (org, org + ", client"), {},
        (org, org + ", server"), {})

    def rejectEverything(*a):
        return False
    clientContext = self.clientCtxFactory.getContext()
    clientContext.set_verify(SSL.VERIFY_PEER, rejectEverything)

    def lossTracker():
        # A protocol whose connectionLost fires the returned Deferred.
        lost = defer.Deferred()
        proto = protocol.Protocol()
        proto.connectionLost = lost.callback
        return lost, proto

    serverLost, serverProto = lossTracker()
    serverFactory = protocol.ServerFactory()
    serverFactory.protocol = lambda: serverProto
    listeningPort = reactor.listenSSL(0, serverFactory,
                                      self.serverCtxFactory)
    self.serverPort = listeningPort

    clientLost, clientProto = lossTracker()
    clientFactory = protocol.ClientFactory()
    clientFactory.protocol = lambda: clientProto
    reactor.connectSSL('127.0.0.1', listeningPort.getHost().port,
                       clientFactory, self.clientCtxFactory)

    bothLost = defer.DeferredList([serverLost, clientLost],
                                  consumeErrors=True)
    return bothLost.addCallback(self._cbLostConns)
def make_ssl_context(key_file=None, cert_file=None, pem_file=None,
                     ca_dir=None, verify_peer=False, url=None,
                     method=SSL.TLSv1_METHOD, key_file_passphrase=None):
    """
    Create an SSL context from certificate, key and CA locations.

    The private key is read from ``key_file``, or from ``cert_file`` when
    no key file is given.  ``pem_file`` and ``ca_dir`` are passed to
    ``load_verify_locations`` when either is set.  With ``verify_peer``
    true the verify depth is 9 and peer verification is enabled, using a
    hostname-checking callback when ``url`` is supplied; otherwise the
    verify mode is ``SSL.VERIFY_NONE``.
    """
    def _passphrase(max_passphrase_len, set_prompt, userdata):
        return key_file_passphrase

    def _preverify_passthrough(conn, x509, errnum, errdepth, preverify_ok):
        """Default verification callback: performs no checks of its own
        and returns the status passed in.
        """
        return preverify_ok

    context = SSL.Context(method)

    if cert_file:
        context.use_certificate_file(cert_file)

    if key_file_passphrase:
        context.set_passwd_cb(_passphrase)

    # The key file defaults to the certificate file if present.
    private_key_path = key_file or cert_file
    if private_key_path:
        context.use_privatekey_file(private_key_path)

    if pem_file or ca_dir:
        context.load_verify_locations(pem_file, ca_dir)

    if not verify_peer:
        context.set_verify(SSL.VERIFY_NONE, _preverify_passthrough)
        return context

    context.set_verify_depth(9)
    if url:
        set_peer_verification_for_url_hostname(context, url)
    else:
        context.set_verify(SSL.VERIFY_PEER, _preverify_passthrough)
    return context
def _makeContext(self):
    """
    Build and return the C{SSL.Context} described by this object's
    attributes: certificate and key, verification flags and CA
    certificates, verify depth, option flags and session id.
    """
    context = SSL.Context(self.method)
    context.set_app_data(_SSLApplicationData())

    haveIdentity = (self.certificate is not None
                    and self.privateKey is not None)
    if haveIdentity:
        context.use_certificate(self.certificate)
        context.use_privatekey(self.privateKey)
        # Make sure the key actually matches the certificate.
        context.check_privatekey()

    if self.verify:
        flags = SSL.VERIFY_PEER
        if self.requireCertificate:
            flags = flags | SSL.VERIFY_FAIL_IF_NO_PEER_CERT
        if self.verifyOnce:
            flags = flags | SSL.VERIFY_CLIENT_ONCE
        if self.caCerts:
            trustStore = context.get_cert_store()
            for caCert in self.caCerts:
                trustStore.add_cert(caCert)
    else:
        flags = SSL.VERIFY_NONE

    def _trackVerificationProblems(conn, cert, errno, depth, preverify_ok):
        # preverify_ok is the answer OpenSSL's default verifier would have
        # given, had we allowed it to run.  Record each rejection in the
        # context's app data and pass the verdict through unchanged.
        if preverify_ok:
            return preverify_ok
        problem = OpenSSLVerifyError(cert, errno, depth)
        context.get_app_data().problems.append(problem)
        return preverify_ok

    context.set_verify(flags, _trackVerificationProblems)

    if self.verifyDepth is not None:
        context.set_verify_depth(self.verifyDepth)

    if self.enableSingleUseKeys:
        context.set_options(SSL.OP_SINGLE_DH_USE)

    if self.fixBrokenPeers:
        context.set_options(self._OP_ALL)

    if self.enableSessions:
        rawName = "%s-%d" % (reflect.qual(self.__class__),
                             _sessionCounter())
        context.set_session_id(md5.md5(rawName).hexdigest())

    return context
def _makeContext(self):
    """
    Build and return the SSL context described by this object's
    attributes.

    The context is created by C{self._contextFactory(self.method)} and
    configured with options and mode, the certificate chain and private
    key (when both certificate and key are set), verification flags and
    trust roots, verify depth, session id, DH parameters, cipher list, EC
    curve and acceptable next protocols.
    """
    ctx = self._contextFactory(self.method)
    ctx.set_options(self._options)
    ctx.set_mode(self._mode)

    if self.certificate is not None and self.privateKey is not None:
        ctx.use_certificate(self.certificate)
        ctx.use_privatekey(self.privateKey)
        for extraCert in self.extraCertChain:
            ctx.add_extra_chain_cert(extraCert)
        # Sanity check
        ctx.check_privatekey()

    verifyFlags = SSL.VERIFY_NONE
    if self.verify:
        verifyFlags = SSL.VERIFY_PEER
        if self.requireCertificate:
            verifyFlags |= SSL.VERIFY_FAIL_IF_NO_PEER_CERT
        if self.verifyOnce:
            verifyFlags |= SSL.VERIFY_CLIENT_ONCE
        self.trustRoot._addCACertsToContext(ctx)

    # It'd be nice if pyOpenSSL let us pass None here for this behavior (as
    # the underlying OpenSSL API call allows NULL to be passed).  It
    # doesn't, so we'll supply a function which does the same thing.
    def _verifyCallback(conn, cert, errno, depth, preverify_ok):
        return preverify_ok
    ctx.set_verify(verifyFlags, _verifyCallback)

    if self.verifyDepth is not None:
        ctx.set_verify_depth(self.verifyDepth)

    if self.enableSessions:
        name = "%s-%d" % (reflect.qual(self.__class__), _sessionCounter())
        sessionName = md5(networkString(name)).hexdigest()
        ctx.set_session_id(sessionName.encode('ascii'))

    if self.dhParameters:
        ctx.load_tmp_dh(self.dhParameters._dhFile.path)
    ctx.set_cipher_list(self._cipherString.encode('ascii'))

    if self._ecCurve is not None:
        try:
            self._ecCurve.addECKeyToContext(ctx)
        except Exception:
            # ECDHE support is best effort only, so ordinary failures are
            # ignored.  Catching Exception rather than BaseException lets
            # KeyboardInterrupt and SystemExit propagate.
            pass

    if self._acceptableProtocols:
        # Try to set NPN and ALPN.  _acceptableProtocols cannot be set by
        # the constructor unless at least one mechanism is supported.
        _setAcceptableProtocols(ctx, self._acceptableProtocols)

    return ctx