我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用ssl.OP_NO_SSLv2()。
def init_poolmanager(self, connections, maxsize, block=DEFAULT_POOLBLOCK, **pool_kwargs): if self._cbapi_force_tls_1_2: # Force the use of TLS v1.2 when talking to this Cb Response server. context = create_urllib3_context(ciphers=('TLSv1.2:!aNULL:!eNULL:!MD5')) context.options |= ssl.OP_NO_SSLv2 context.options |= ssl.OP_NO_SSLv3 context.options |= ssl.OP_NO_TLSv1 context.options |= ssl.OP_NO_TLSv1_1 pool_kwargs['ssl_context'] = context if not self._cbapi_verify_hostname: # Provide the ability to validate a Carbon Black server's SSL certificate without validating the hostname # (by default Carbon Black certificates are "issued" as CN=Self-signed Carbon Black Enterprise Server # HTTPS Certificate) pool_kwargs["assert_hostname"] = False return super(CbAPISessionAdapter, self).init_poolmanager(connections, maxsize, block, **pool_kwargs)
def _create_transport_context(server_side, server_hostname): if server_side: raise ValueError('Server side SSL needs a valid SSLContext') # Client side may pass ssl=True to use a default # context; in that case the sslcontext passed is None. # The default is secure for client connections. if hasattr(ssl, 'create_default_context'): # Python 3.4+: use up-to-date strong settings. sslcontext = ssl.create_default_context() if not server_hostname: sslcontext.check_hostname = False else: # Fallback for Python 3.3. sslcontext = ssl.SSLContext(ssl.PROTOCOL_SSLv23) sslcontext.options |= ssl.OP_NO_SSLv2 sslcontext.options |= ssl.OP_NO_SSLv3 sslcontext.set_default_verify_paths() sslcontext.verify_mode = ssl.CERT_REQUIRED return sslcontext
def start(loop, host, port): global server sslctx = None if args.tls: import ssl # TODO: take cert/key from args as well. here = os.path.join(os.path.dirname(__file__), '..', 'tests') sslctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) sslctx.options |= ssl.OP_NO_SSLv2 sslctx.load_cert_chain( certfile=os.path.join(here, 'ssl_cert.pem'), keyfile=os.path.join(here, 'ssl_key.pem')) server = yield from loop.create_server(Service, host, port, ssl=sslctx) dprint('serving TLS' if sslctx else 'serving', [s.getsockname() for s in server.sockets]) yield from server.wait_closed()
def test_create_unix_server_ssl_verified(self): proto = MyProto(loop=self.loop) server, path = self._make_ssl_unix_server( lambda: proto, SIGNED_CERTFILE) sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_SSLv23) sslcontext_client.options |= ssl.OP_NO_SSLv2 sslcontext_client.verify_mode = ssl.CERT_REQUIRED sslcontext_client.load_verify_locations(cafile=SIGNING_CA) if hasattr(sslcontext_client, 'check_hostname'): sslcontext_client.check_hostname = True # Connection succeeds with correct CA and server hostname. f_c = self.loop.create_unix_connection(MyProto, path, ssl=sslcontext_client, server_hostname='localhost') client, pr = self.loop.run_until_complete(f_c) # close connection proto.transport.close() client.close() server.close() self.loop.run_until_complete(proto.done)
def test_create_server_ssl_verified(self): proto = MyProto(loop=self.loop) server, host, port = self._make_ssl_server( lambda: proto, SIGNED_CERTFILE) sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_SSLv23) sslcontext_client.options |= ssl.OP_NO_SSLv2 sslcontext_client.verify_mode = ssl.CERT_REQUIRED sslcontext_client.load_verify_locations(cafile=SIGNING_CA) if hasattr(sslcontext_client, 'check_hostname'): sslcontext_client.check_hostname = True # Connection succeeds with correct CA and server hostname. f_c = self.loop.create_connection(MyProto, host, port, ssl=sslcontext_client, server_hostname='localhost') client, pr = self.loop.run_until_complete(f_c) # close connection proto.transport.close() client.close() server.close() self.loop.run_until_complete(proto.done)
def _create_ssl_ctx(self, sslp): if isinstance(sslp, ssl.SSLContext): return sslp ca = sslp.get('ca') capath = sslp.get('capath') hasnoca = ca is None and capath is None ctx = ssl.create_default_context(cafile=ca, capath=capath) ctx.check_hostname = not hasnoca and sslp.get('check_hostname', True) ctx.verify_mode = ssl.CERT_NONE if hasnoca else ssl.CERT_REQUIRED if 'cert' in sslp: ctx.load_cert_chain(sslp['cert'], keyfile=sslp.get('key')) if 'cipher' in sslp: ctx.set_ciphers(sslp['cipher']) ctx.options |= ssl.OP_NO_SSLv2 ctx.options |= ssl.OP_NO_SSLv3 return ctx
def __init__(self, host, port=None, key_file=None, cert_file=None, strict=_strict_sentinel, timeout=socket._GLOBAL_DEFAULT_TIMEOUT, source_address=None, **_3to2kwargs): if 'check_hostname' in _3to2kwargs: check_hostname = _3to2kwargs['check_hostname']; del _3to2kwargs['check_hostname'] else: check_hostname = None if 'context' in _3to2kwargs: context = _3to2kwargs['context']; del _3to2kwargs['context'] else: context = None super(HTTPSConnection, self).__init__(host, port, strict, timeout, source_address) self.key_file = key_file self.cert_file = cert_file if context is None: # Some reasonable defaults context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) context.options |= ssl.OP_NO_SSLv2 will_verify = context.verify_mode != ssl.CERT_NONE if check_hostname is None: check_hostname = will_verify elif check_hostname and not will_verify: raise ValueError("check_hostname needs a SSL context with " "either CERT_OPTIONAL or CERT_REQUIRED") if key_file or cert_file: context.load_cert_chain(cert_file, key_file) self._context = context self._check_hostname = check_hostname
def urlopen(url, data=None, timeout=socket._GLOBAL_DEFAULT_TIMEOUT, **_3to2kwargs): if 'cadefault' in _3to2kwargs: cadefault = _3to2kwargs['cadefault']; del _3to2kwargs['cadefault'] else: cadefault = False if 'capath' in _3to2kwargs: capath = _3to2kwargs['capath']; del _3to2kwargs['capath'] else: capath = None if 'cafile' in _3to2kwargs: cafile = _3to2kwargs['cafile']; del _3to2kwargs['cafile'] else: cafile = None global _opener if cafile or capath or cadefault: if not _have_ssl: raise ValueError('SSL support not available') context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) context.options |= ssl.OP_NO_SSLv2 context.verify_mode = ssl.CERT_REQUIRED if cafile or capath: context.load_verify_locations(cafile, capath) else: context.set_default_verify_paths() https_handler = HTTPSHandler(context=context, check_hostname=True) opener = build_opener(https_handler) elif _opener is None: _opener = opener = build_opener() else: opener = _opener return opener.open(url, data, timeout)
def starttls(self, ssl_context=None): name = 'STARTTLS' if not HAVE_SSL: raise self.error('SSL support missing') if self._tls_established: raise self.abort('TLS session already established') if name not in self.capabilities: raise self.abort('TLS not supported by server') # Generate a default SSL context if none was passed. if ssl_context is None: ssl_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) # SSLv2 considered harmful. ssl_context.options |= ssl.OP_NO_SSLv2 typ, dat = self._simple_command(name) if typ == 'OK': self.sock = ssl_context.wrap_socket(self.sock) self.file = self.sock.makefile('rb') self._tls_established = True self._get_capabilities() else: raise self.error("Couldn't establish TLS session") return self._untagged_response(typ, dat, name)
def __init__(self, host, port=None, key_file=None, cert_file=None, strict=_strict_sentinel, timeout=socket._GLOBAL_DEFAULT_TIMEOUT, source_address=None, *, context=None, check_hostname=None): super(HTTPSConnection, self).__init__(host, port, strict, timeout, source_address) self.key_file = key_file self.cert_file = cert_file if context is None: # Some reasonable defaults context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) context.options |= ssl.OP_NO_SSLv2 will_verify = context.verify_mode != ssl.CERT_NONE if check_hostname is None: check_hostname = will_verify elif check_hostname and not will_verify: raise ValueError("check_hostname needs a SSL context with " "either CERT_OPTIONAL or CERT_REQUIRED") if key_file or cert_file: context.load_cert_chain(cert_file, key_file) self._context = context self._check_hostname = check_hostname
def _encrypt_on(sock, context): """Wrap a socket in SSL/TLS. Arguments: - sock: Socket to wrap - context: SSL context to use for the encrypted connection Returns: - sock: New, encrypted socket. """ # Generate a default SSL context if none was passed. if context is None: context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) # SSLv2 considered harmful. context.options |= ssl.OP_NO_SSLv2 return context.wrap_socket(sock) # The classes themselves
def urlopen(url, data=None, timeout=socket._GLOBAL_DEFAULT_TIMEOUT, *, cafile=None, capath=None): global _opener if cafile or capath: if not _have_ssl: raise ValueError('SSL support not available') context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) context.options |= ssl.OP_NO_SSLv2 if cafile or capath: context.verify_mode = ssl.CERT_REQUIRED context.load_verify_locations(cafile, capath) check_hostname = True else: check_hostname = False https_handler = HTTPSHandler(context=context, check_hostname=check_hostname) opener = build_opener(https_handler) elif _opener is None: _opener = opener = build_opener() else: opener = _opener return opener.open(url, data, timeout)
def connect(self): sock = socket.create_connection((self.host, self.port), self.timeout) if getattr(self, '_tunnel_host', False): self.sock = sock self._tunnel() if not hasattr(ssl, 'SSLContext'): # For 2.x if self.ca_certs: cert_reqs = ssl.CERT_REQUIRED else: cert_reqs = ssl.CERT_NONE self.sock = ssl.wrap_socket(sock, self.key_file, self.cert_file, cert_reqs=cert_reqs, ssl_version=ssl.PROTOCOL_SSLv23, ca_certs=self.ca_certs) else: # pragma: no cover context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) context.options |= ssl.OP_NO_SSLv2 if self.cert_file: context.load_cert_chain(self.cert_file, self.key_file) kwargs = {} if self.ca_certs: context.verify_mode = ssl.CERT_REQUIRED context.load_verify_locations(cafile=self.ca_certs) if getattr(ssl, 'HAS_SNI', False): kwargs['server_hostname'] = self.host self.sock = context.wrap_socket(sock, **kwargs) if self.ca_certs and self.check_domain: try: match_hostname(self.sock.getpeercert(), self.host) logger.debug('Host verified: %s', self.host) except CertificateError: # pragma: no cover self.sock.shutdown(socket.SHUT_RDWR) self.sock.close() raise
def connect(self): sock = socket.create_connection((self.host, self.port), self.timeout) if getattr(self, '_tunnel_host', False): self.sock = sock self._tunnel() if not hasattr(ssl, 'SSLContext'): # For 2.x if self.ca_certs: cert_reqs = ssl.CERT_REQUIRED else: cert_reqs = ssl.CERT_NONE self.sock = ssl.wrap_socket(sock, self.key_file, self.cert_file, cert_reqs=cert_reqs, ssl_version=ssl.PROTOCOL_SSLv23, ca_certs=self.ca_certs) else: context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) context.options |= ssl.OP_NO_SSLv2 if self.cert_file: context.load_cert_chain(self.cert_file, self.key_file) kwargs = {} if self.ca_certs: context.verify_mode = ssl.CERT_REQUIRED context.load_verify_locations(cafile=self.ca_certs) if getattr(ssl, 'HAS_SNI', False): kwargs['server_hostname'] = self.host self.sock = context.wrap_socket(sock, **kwargs) if self.ca_certs and self.check_domain: try: match_hostname(self.sock.getpeercert(), self.host) logger.debug('Host verified: %s', self.host) except CertificateError: self.sock.shutdown(socket.SHUT_RDWR) self.sock.close() raise
def main(): asyncio.set_event_loop(None) if args.iocp: from asyncio.windows_events import ProactorEventLoop loop = ProactorEventLoop() else: loop = asyncio.new_event_loop() sslctx = None if args.tls: import ssl # TODO: take cert/key from args as well. # - ????, ?????? # # here = os.path.join(os.path.dirname(__file__), '..', 'tests') here = os.path.join(os.path.dirname(__file__), '..', '..', 'tests') sslctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) sslctx.options |= ssl.OP_NO_SSLv2 sslctx.load_cert_chain( certfile=os.path.join(here, 'ssl_cert.pem'), keyfile=os.path.join(here, 'ssl_key.pem')) cache = Cache(loop) task = asyncio.streams.start_server(cache.handle_client, args.host, args.port, ssl=sslctx, loop=loop) svr = loop.run_until_complete(task) for sock in svr.sockets: logging.info('socket %s', sock.getsockname()) try: loop.run_forever() finally: loop.close()
def _create_ssl_context(self, certfile, keyfile=None): sslcontext = ssl.SSLContext(ssl.PROTOCOL_SSLv23) sslcontext.options |= ssl.OP_NO_SSLv2 sslcontext.load_cert_chain(certfile, keyfile) return sslcontext
def test_create_server_ssl_verify_failed(self): proto = MyProto(loop=self.loop) server, host, port = self._make_ssl_server( lambda: proto, SIGNED_CERTFILE) sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_SSLv23) sslcontext_client.options |= ssl.OP_NO_SSLv2 sslcontext_client.verify_mode = ssl.CERT_REQUIRED if hasattr(sslcontext_client, 'check_hostname'): sslcontext_client.check_hostname = True # no CA loaded f_c = self.loop.create_connection(MyProto, host, port, ssl=sslcontext_client) with mock.patch.object(self.loop, 'call_exception_handler'): with test_utils.disable_logger(): with self.assertRaisesRegex(ssl.SSLError, 'certificate verify failed '): self.loop.run_until_complete(f_c) # execute the loop to log the connection error test_utils.run_briefly(self.loop) # close connection self.assertIsNone(proto.transport) server.close()
def test_create_server_ssl_match_failed(self): proto = MyProto(loop=self.loop) server, host, port = self._make_ssl_server( lambda: proto, SIGNED_CERTFILE) sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_SSLv23) sslcontext_client.options |= ssl.OP_NO_SSLv2 sslcontext_client.verify_mode = ssl.CERT_REQUIRED sslcontext_client.load_verify_locations( cafile=SIGNING_CA) if hasattr(sslcontext_client, 'check_hostname'): sslcontext_client.check_hostname = True # incorrect server_hostname f_c = self.loop.create_connection(MyProto, host, port, ssl=sslcontext_client) with mock.patch.object(self.loop, 'call_exception_handler'): with test_utils.disable_logger(): with self.assertRaisesRegex( ssl.CertificateError, "hostname '127.0.0.1' doesn't match 'localhost'"): self.loop.run_until_complete(f_c) # close connection proto.transport.close() server.close()
def connect(self): """Overrides HTTPSConnection.connect to specify TLS version""" # Standard implementation from HTTPSConnection, which is not # designed for extension, unfortunately if sys.version_info >= (2, 7): sock = socket.create_connection((self.host, self.port), self.timeout, self.source_address) elif sys.version_info >= (2, 6): sock = socket.create_connection((self.host, self.port), self.timeout) else: sock = socket.create_connection((self.host, self.port)) if getattr(self, '_tunnel_host', None): self.sock = sock self._tunnel() if hasattr(ssl, 'SSLContext'): # Since python 2.7.9, tls 1.1 and 1.2 are supported via # SSLContext ssl_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) ssl_context.options |= ssl.OP_NO_SSLv2 ssl_context.options |= ssl.OP_NO_SSLv3 if self.key_file and self.cert_file: ssl_context.load_cert_chain(keyfile=self.key_file, certfile=self.cert_file) self.sock = ssl_context.wrap_socket(sock) else: # This is the only difference; default wrap_socket uses SSLv23 self.sock = ssl.wrap_socket(sock, self.key_file, self.cert_file, ssl_version=ssl.PROTOCOL_TLSv1)