我们从 Python 开源项目中提取了以下 18 个代码示例，用于说明如何使用 OpenSSL.SSL.VERIFY_NONE。
def ssl(sock, keyfile=None, certfile=None):
    """Wrap ``sock`` in an ``SSLObject`` backed by a pyOpenSSL connection.

    An ``SSLv23_METHOD`` context is created, ``certfile`` and ``keyfile`` are
    loaded into it when they are not ``None``, and verification is set to
    ``SSL.VERIFY_NONE`` with a callback that always answers ``True``.

    The timeout of the incoming socket is copied onto the wrapper.  When the
    socket already has a peer (``getpeername()`` succeeds) the handshake is
    performed before the wrapper is returned.
    """
    ctx = SSL.Context(SSL.SSLv23_METHOD)
    if certfile is not None:
        ctx.use_certificate_file(certfile)
    if keyfile is not None:
        ctx.use_privatekey_file(keyfile)
    ctx.set_verify(SSL.VERIFY_NONE, lambda *x: True)

    # Read the timeout before the socket is (possibly) unwrapped below.
    timeout = sock.gettimeout()
    # Use the underlying ``_sock`` when the object exposes one.
    sock = getattr(sock, '_sock', sock)

    wrapped = SSLObject(SSL.Connection(ctx, sock))
    wrapped.settimeout(timeout)

    try:
        sock.getpeername()
    except Exception:
        # No connection yet, so there is nothing to handshake with.
        return wrapped

    # Already connected: do the handshake right away.
    wrapped.do_handshake()
    return wrapped
def test_start_tls_smtp(self):
    """Upgrade both ends to TLS in the middle of an SMTP-style exchange.

    Plain-text lines are exchanged first, then both sides start TLS, and a
    further pair of lines is exchanged over the upgraded streams.  The client
    uses ``SSL.VERIFY_NONE`` with a callback that accepts every certificate.
    """
    def verify_cb(conn, x509, err_num, err_depth, err_code):
        return True

    # This flow is simplified from RFC 3207 section 5.
    # We don't really need all of this, but it helps to make sure
    # that after realistic back-and-forth traffic the buffers end up
    # in a sane state.
    yield self.server_send_line(b"220 mail.example.com ready\r\n")
    yield self.client_send_line(b"EHLO mail.example.com\r\n")
    yield self.server_send_line(b"250-mail.example.com welcome\r\n")
    yield self.server_send_line(b"250 STARTTLS\r\n")
    yield self.client_send_line(b"STARTTLS\r\n")
    yield self.server_send_line(b"220 Go ahead\r\n")

    # Kick off both upgrades before waiting on either one.
    client_future = self.client_start_tls(
        _client_ssl_options(SSL.VERIFY_NONE, verify_cb))
    server_future = self.server_start_tls(_server_ssl_options())
    self.client_stream = yield client_future
    self.server_stream = yield server_future

    # assertIsInstance reports the actual type on failure, unlike
    # assertTrue(isinstance(...)).
    self.assertIsInstance(self.client_stream, MicroProxySSLIOStream)
    self.assertIsInstance(self.server_stream, MicroProxySSLIOStream)

    # Traffic must still flow once both streams are encrypted.
    yield self.client_send_line(b"EHLO mail.example.com\r\n")
    yield self.server_send_line(b"250 mail.example.com welcome\r\n")
def create_dest_sslcontext(insecure=False, trusted_ca_certs="", alpn=None):
    """Build the SSL context used for the destination side.

    :param insecure: when true, install ``SSL.VERIFY_NONE``; otherwise load
        CA certificates and require a verified peer certificate.
    :param trusted_ca_certs: CA location passed to ``load_verify_locations``;
        a falsy value falls back to ``certifi.where()``.
    :param alpn: protocols handed to ``set_alpn_protos`` when truthy and
        ``HAS_ALPN`` is set.
    :return: the configured context from ``create_basic_sslcontext()``.
    """
    context = create_basic_sslcontext()

    if insecure:
        context.set_verify(SSL.VERIFY_NONE, certificate_verify_cb)
    else:
        ca_location = trusted_ca_certs if trusted_ca_certs else certifi.where()
        context.load_verify_locations(ca_location)
        strict_flags = SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT
        context.set_verify(strict_flags, certificate_verify_cb)

    if alpn:
        if HAS_ALPN:
            context.set_alpn_protos(alpn)

    return context
def _make_client_iostream(self, connection, **kwargs):
    """Return a ``MicroProxySSLIOStream`` wrapping ``connection``.

    The SSL options come from ``_client_ssl_options`` with
    ``SSL.VERIFY_NONE`` and a callback that accepts every certificate.
    Extra keyword arguments are forwarded to the stream constructor.
    """
    def accept_all(conn, x509, err_num, err_depth, err_code):
        return True

    ssl_options = _client_ssl_options(SSL.VERIFY_NONE, accept_all)
    stream = MicroProxySSLIOStream(
        connection,
        io_loop=self.io_loop,
        ssl_options=ssl_options,
        **kwargs)
    return stream
def getContext(self):
    """Return the base factory's context with peer verification disabled."""
    context = ssl.ClientContextFactory.getContext(self)
    # TODO: switch VERIFY_NONE to VERIFY_PEER once there is a real server
    # with a valid CA-signed certificate.  Should that not work, distributed
    # self-signed certificates are an option: put the cert.pem file and its
    # location in the config and enable the load_verify_locations line below.
    # Right now the certificates are not authenticated, so the connection is
    # exposed to man-in-the-middle attacks.
    context.set_verify(SSL.VERIFY_NONE, verifyCallback)
    # context.load_verify_locations("/path/to/cert.pem")
    return context
def _makeContext(self):
    """Create and configure the ``SSL.Context`` described by this object.

    The context uses ``self.method`` and carries a fresh
    ``_SSLApplicationData`` as application data.  When both
    ``self.certificate`` and ``self.privateKey`` are set they are installed
    and checked against each other.  With ``self.verify`` set, the peer must
    present a certificate and ``self.caCerts`` are added to the trust store;
    otherwise ``SSL.VERIFY_NONE`` is used.  A session id is set when
    ``self.enableSessions`` is true.

    :return: the configured context.
    """
    ctx = SSL.Context(self.method)
    ctx.set_app_data(_SSLApplicationData())

    if self.certificate is not None and self.privateKey is not None:
        ctx.use_certificate(self.certificate)
        ctx.use_privatekey(self.privateKey)
        # Sanity check
        ctx.check_privatekey()

    verifyFlags = SSL.VERIFY_NONE
    if self.verify:
        verifyFlags = SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT
        if self.caCerts:
            store = ctx.get_cert_store()
            for cert in self.caCerts:
                store.add_cert(cert)

    def _trackVerificationProblems(conn, cert, errno, depth, preverify_ok):
        # Hand OpenSSL's own verdict back unchanged.  Answering True
        # unconditionally would accept certificates that failed
        # verification even when self.verify is set.
        return preverify_ok

    ctx.set_verify(verifyFlags, _trackVerificationProblems)

    if self.enableSessions:
        sessionName = md5.md5("%s-%d" % (reflect.qual(self.__class__), _sessionCounter())).hexdigest()
        ctx.set_session_id(sessionName)

    return ctx
def make_ssl_context(key_file=None, cert_file=None, pem_file=None, ca_dir=None,
                     verify_peer=False, url=None, method=SSL.TLSv1_METHOD,
                     key_file_passphrase=None):
    """
    Creates SSL context containing certificate and key file locations.

    The private key is read from ``key_file`` or, when that is not given,
    from ``cert_file``.  ``pem_file`` / ``ca_dir`` are passed to
    ``load_verify_locations`` when either is set.  With ``verify_peer`` the
    verify depth is set to 9 and either hostname verification for ``url`` or
    plain ``SSL.VERIFY_PEER`` is installed; without it ``SSL.VERIFY_NONE``
    is used.
    """
    context = SSL.Context(method)

    if cert_file:
        context.use_certificate_file(cert_file)

    if key_file_passphrase:
        def _passphrase(max_passphrase_len, set_prompt, userdata):
            return key_file_passphrase

        context.set_passwd_cb(_passphrase)

    # Key file defaults to certificate file if present.
    private_key_path = key_file or cert_file
    if private_key_path:
        context.use_privatekey_file(private_key_path)

    if pem_file or ca_dir:
        context.load_verify_locations(pem_file, ca_dir)

    def _passthrough(conn, x509, errnum, errdepth, preverify_ok):
        """Default certificate verification callback.

        Performs no checks of its own and returns the status passed in.
        """
        return preverify_ok

    if not verify_peer:
        context.set_verify(SSL.VERIFY_NONE, _passthrough)
        return context

    context.set_verify_depth(9)
    if url:
        set_peer_verification_for_url_hostname(context, url)
    else:
        context.set_verify(SSL.VERIFY_PEER, _passthrough)
    return context
def _makeContext(self):
    """Assemble the ``SSL.Context`` from this object's settings.

    Installs the certificate/key pair when both are present, derives the
    verification flags from ``verify``, ``requireCertificate`` and
    ``verifyOnce``, adds ``caCerts`` to the trust store, and applies the
    optional verify depth, option flags and session id.  Verification
    failures are appended to the ``problems`` list of the context's
    application data.
    """
    context = SSL.Context(self.method)
    context.set_app_data(_SSLApplicationData())

    haveKeyPair = self.certificate is not None and self.privateKey is not None
    if haveKeyPair:
        context.use_certificate(self.certificate)
        context.use_privatekey(self.privateKey)
        # Make sure the key actually belongs to the certificate.
        context.check_privatekey()

    if not self.verify:
        flags = SSL.VERIFY_NONE
    else:
        flags = SSL.VERIFY_PEER
        if self.requireCertificate:
            flags = flags | SSL.VERIFY_FAIL_IF_NO_PEER_CERT
        if self.verifyOnce:
            flags = flags | SSL.VERIFY_CLIENT_ONCE
        if self.caCerts:
            trustStore = context.get_cert_store()
            for authority in self.caCerts:
                trustStore.add_cert(authority)

    def _trackVerificationProblems(conn, cert, errno, depth, preverify_ok):
        # preverify_ok is the answer OpenSSL's default verifier would have
        # given; note a problem when it is falsy and pass it through as-is.
        if preverify_ok:
            return preverify_ok
        context.get_app_data().problems.append(
            OpenSSLVerifyError(cert, errno, depth))
        return preverify_ok

    context.set_verify(flags, _trackVerificationProblems)

    if self.verifyDepth is not None:
        context.set_verify_depth(self.verifyDepth)

    if self.enableSingleUseKeys:
        context.set_options(SSL.OP_SINGLE_DH_USE)

    if self.fixBrokenPeers:
        context.set_options(self._OP_ALL)

    if self.enableSessions:
        sessionSeed = "%s-%d" % (reflect.qual(self.__class__), _sessionCounter())
        context.set_session_id(md5.md5(sessionSeed).hexdigest())

    return context
def _do_ssl_handshake(self):
    """Advance the SSL handshake on ``self.socket`` by one step.

    When the handshake needs more I/O, the matching ``_handshake_reading`` /
    ``_handshake_writing`` flag is set and the method returns.  On failure
    the stream is closed, except that a ``SysCallError`` with an unexpected
    code is re-raised.  On success the hostname is verified when
    verification is enabled and ``_server_hostname`` is set, then the
    connect callback is run.
    """
    try:
        self._handshake_reading = False
        self._handshake_writing = False
        self.socket.do_handshake()
    except SSL.WantReadError:
        self._handshake_reading = True
        return
    except SSL.WantWriteError:
        self._handshake_writing = True
        return
    except SSL.SysCallError as e:
        # Read the code from e.args: indexing the exception object itself
        # (e[0]) only works on Python 2.  abs() drops the sign before the
        # comparison with the errno constants.
        err_num = abs(e.args[0])
        if err_num in (errno.EBADF, errno.ENOTCONN, errno.EPERM):
            return self.close(exc_info=True)
        raise
    except SSL.Error as err:
        try:
            peer = self.socket.getpeername()
        except Exception:
            peer = '(not connected)'
        logger.warning("SSL Error on %s %s: %s",
                       self.socket.fileno(), peer, err)
        return self.close(exc_info=True)
    except AttributeError:
        return self.close(exc_info=True)
    else:
        self._ssl_accepting = False
        verify_mode = self.socket.get_context().get_verify_mode()
        # Hostname checking only applies when peer verification is on and
        # a hostname to check against is known.
        if (verify_mode != SSL.VERIFY_NONE and
                self._server_hostname is not None):
            try:
                verify_hostname(self.socket, self._server_hostname)
            except VerificationError as e:
                logger.warning("Invalid SSL certificate: {0}".format(e))
                self.close(exc_info=True)
                return
        self._run_ssl_connect_callback()
def _makeContext(self):
    """Create and configure the SSL context described by this object.

    The context is produced by ``self._contextFactory(self.method)`` and
    then given, in order: options and mode, the certificate / private key /
    extra chain (when both certificate and key are set), verification flags
    and trust roots, optional verify depth, session id, DH parameters, the
    cipher list, an optional EC curve and the acceptable protocols.

    :return: the configured context.
    """
    ctx = self._contextFactory(self.method)
    ctx.set_options(self._options)
    ctx.set_mode(self._mode)

    if self.certificate is not None and self.privateKey is not None:
        ctx.use_certificate(self.certificate)
        ctx.use_privatekey(self.privateKey)
        for extraCert in self.extraCertChain:
            ctx.add_extra_chain_cert(extraCert)
        # Sanity check
        ctx.check_privatekey()

    verifyFlags = SSL.VERIFY_NONE
    if self.verify:
        verifyFlags = SSL.VERIFY_PEER
        if self.requireCertificate:
            verifyFlags |= SSL.VERIFY_FAIL_IF_NO_PEER_CERT
        if self.verifyOnce:
            verifyFlags |= SSL.VERIFY_CLIENT_ONCE
        self.trustRoot._addCACertsToContext(ctx)

    # It'd be nice if pyOpenSSL let us pass None here for this behavior (as
    # the underlying OpenSSL API call allows NULL to be passed).  It
    # doesn't, so we'll supply a function which does the same thing.
    def _verifyCallback(conn, cert, errno, depth, preverify_ok):
        return preverify_ok

    ctx.set_verify(verifyFlags, _verifyCallback)
    if self.verifyDepth is not None:
        ctx.set_verify_depth(self.verifyDepth)

    if self.enableSessions:
        name = "%s-%d" % (reflect.qual(self.__class__), _sessionCounter())
        sessionName = md5(networkString(name)).hexdigest()
        ctx.set_session_id(sessionName.encode('ascii'))

    if self.dhParameters:
        ctx.load_tmp_dh(self.dhParameters._dhFile.path)
    ctx.set_cipher_list(self._cipherString.encode('ascii'))

    if self._ecCurve is not None:
        try:
            self._ecCurve.addECKeyToContext(ctx)
        except Exception:
            # ECDHE support is best effort only.  Catch Exception rather
            # than BaseException so KeyboardInterrupt and SystemExit are
            # not swallowed here.
            pass

    if self._acceptableProtocols:
        # Try to set NPN and ALPN.  _acceptableProtocols cannot be set by
        # the constructor unless at least one mechanism is supported.
        _setAcceptableProtocols(ctx, self._acceptableProtocols)

    return ctx