我们从Python开源项目中,提取了以下30个代码示例,用于说明如何使用ssl.HAS_SNI。
def connect(self):
    """Connect to a host on a given (SSL) port.

    Opens the TCP connection, runs the tunnel setup when a tunnel host is
    configured, then wraps the socket with the stored SSL context.  When
    ``_check_hostname`` is set, the peer certificate is matched against
    ``self.host``; if that step raises, the TLS socket is shut down and
    closed before the exception is re-raised.
    """
    endpoint = (self.host, self.port)
    plain_sock = socket_create_connection(endpoint, self.timeout,
                                          self.source_address)

    if self._tunnel_host:
        # Publish the plain socket on self.sock before _tunnel() runs.
        self.sock = plain_sock
        self._tunnel()

    # Only request SNI when the ssl module reports support for it.
    if ssl.HAS_SNI:
        sni_name = self.host
    else:
        sni_name = None
    self.sock = self._context.wrap_socket(plain_sock,
                                          server_hostname=sni_name)

    try:
        if self._check_hostname:
            peer_cert = self.sock.getpeercert()
            ssl.match_hostname(peer_cert, self.host)
    except Exception:
        # Tear the connection down, then let the caller see the failure.
        self.sock.shutdown(socket.SHUT_RDWR)
        self.sock.close()
        raise
def connect(self):
    """Connect to a host on a given (SSL) port.

    The TCP connection is created with ``socket.create_connection`` using
    the instance's host, port, timeout and source address.  If a tunnel
    host is configured, the tunnel setup runs on the plain socket first.
    The socket is then wrapped by ``self._context``.  With
    ``_check_hostname`` set, the peer certificate is matched against
    ``self.host``; on failure the socket is shut down, closed, and the
    exception propagates.
    """
    raw = socket.create_connection(
        (self.host, self.port),
        self.timeout,
        self.source_address,
    )

    if self._tunnel_host:
        # Publish the plain socket on self.sock before _tunnel() runs.
        self.sock = raw
        self._tunnel()

    # Pass the host name for SNI only when the ssl module supports it.
    hostname_for_sni = self.host if ssl.HAS_SNI else None
    tls_sock = self._context.wrap_socket(raw,
                                         server_hostname=hostname_for_sni)
    self.sock = tls_sock

    try:
        if self._check_hostname:
            ssl.match_hostname(self.sock.getpeercert(), self.host)
    except Exception:
        # Close the half-validated connection, then re-raise unchanged.
        self.sock.shutdown(socket.SHUT_RDWR)
        self.sock.close()
        raise
def test_sni(self):
    """Check Server Name Indication against a third-party test site.

    Currently disabled: the first statement skips the test because no
    test server is available, so nothing below it runs.
    """
    self.skipTest("test disabled - test server needed")

    # Checks that Server Name Indication works, if supported by the
    # OpenSSL linked to.
    # The ssl module itself doesn't have server-side support for SNI,
    # so we rely on a third-party test site.
    sni_available = ssl.HAS_SNI
    with support.transient_internet("XXX"):
        response = urllib.request.urlopen("XXX")
        page = response.readall()
        if not sni_available:
            # Without SNI the page is expected to report the failure.
            self.assertNotIn(b"Great", page)
            self.assertIn(b"Unfortunately", page)
        else:
            self.assertIn(b"Great", page)
            self.assertNotIn(b"Unfortunately", page)
def starttls(self, ssl_context=None):
    """Send STARTTLS and switch the connection to TLS.

    ssl_context - SSL context used to wrap the socket; a default one is
                  created when None is passed.

    Raises ``self.error`` when SSL support is missing or the server does
    not answer 'OK', and ``self.abort`` when TLS is already established
    or STARTTLS is not among the server capabilities.

    Returns the untagged response for the STARTTLS command.
    """
    command = 'STARTTLS'

    if not HAVE_SSL:
        raise self.error('SSL support missing')
    if self._tls_established:
        raise self.abort('TLS session already established')
    if command not in self.capabilities:
        raise self.abort('TLS not supported by server')

    # Generate a default SSL context if none was passed.
    if ssl_context is None:
        ssl_context = ssl._create_stdlib_context()

    typ, dat = self._simple_command(command)
    if typ != 'OK':
        raise self.error("Couldn't establish TLS session")

    # Only request SNI when the ssl module reports support for it.
    if ssl.HAS_SNI:
        sni_name = self.host
    else:
        sni_name = None
    self.sock = ssl_context.wrap_socket(self.sock,
                                        server_hostname=sni_name)
    self.file = self.sock.makefile('rb')
    self._tls_established = True
    # Capabilities are fetched again over the encrypted channel.
    self._get_capabilities()

    return self._untagged_response(typ, dat, command)
def connect(self):
    """Connect to a host on a given (SSL) port.

    The parent class establishes the connection; this method then wraps
    ``self.sock`` with the stored SSL context.  The name used for SNI and
    certificate matching is the tunnel host when one is set, otherwise
    ``self.host``.  The explicit ``ssl.match_hostname`` check only runs
    when the context itself is not checking host names and
    ``_check_hostname`` is set; if it raises, the socket is shut down and
    closed before the exception propagates.
    """
    super().connect()

    # Falls back to self.host whenever the tunnel host is unset/falsy.
    server_hostname = self._tunnel_host or self.host

    # Only request SNI when the ssl module reports support for it.
    if ssl.HAS_SNI:
        sni_hostname = server_hostname
    else:
        sni_hostname = None
    self.sock = self._context.wrap_socket(self.sock,
                                          server_hostname=sni_hostname)

    if self._context.check_hostname or not self._check_hostname:
        # Either the context already verifies the name or no check is wanted.
        return

    try:
        ssl.match_hostname(self.sock.getpeercert(), server_hostname)
    except Exception:
        self.sock.shutdown(socket.SHUT_RDWR)
        self.sock.close()
        raise
def stls(self, context=None):
    """Start a TLS session on the active connection (RFC 2595 STLS).

    context - an ssl.SSLContext; a default one is created when None.

    Raises error_proto when TLS support is missing, when a TLS session is
    already established, or when the server's capabilities do not list
    STLS.  Returns the server response to the STLS command.
    """
    if not HAVE_SSL:
        raise error_proto('-ERR TLS support missing')
    if self._tls_established:
        raise error_proto('-ERR TLS session already established')

    capabilities = self.capa()
    if 'STLS' not in capabilities:
        raise error_proto('-ERR STLS not supported by server')

    if context is None:
        context = ssl._create_stdlib_context()

    response = self._shortcmd('STLS')

    # Only request SNI when the ssl module reports support for it.
    sni_name = None
    if ssl.HAS_SNI:
        sni_name = self.host
    self.sock = context.wrap_socket(self.sock, server_hostname=sni_name)
    self.file = self.sock.makefile('rb')
    self._tls_established = True
    return response
def test_https_sni(self):
    """Check that the client sends "localhost" as the SNI server name.

    Skipped when the ssl module is unavailable or lacks SNI support.
    """
    if ssl is None:
        self.skipTest("ssl module required")
    if not ssl.HAS_SNI:
        self.skipTest("SNI support required in OpenSSL")

    # Mutable holder so the callback can publish the name it received.
    seen = {'server_name': None}

    def record_server_name(ssl_sock, server_name, initial_context):
        seen['server_name'] = server_name

    # Server side: a TLSv1 context that reports the requested name.
    server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    server_context.set_servername_callback(record_server_name)
    handler = self.start_https_server(context=server_context,
                                      certfile=CERT_localhost)

    # Client side: default context with the test certificate as CA file.
    client_context = ssl.create_default_context(cafile=CERT_localhost)
    url = "https://localhost:%s" % handler.port
    self.urlopen(url, context=client_context)

    self.assertEqual(seen['server_name'], "localhost")
def test_https_sni(self):
    """Check that the client sends "localhost" as the SNI server name.

    Skipped when the ssl module is unavailable or lacks SNI support.
    """
    if ssl is None:
        self.skipTest("ssl module required")
    if not ssl.HAS_SNI:
        self.skipTest("SNI support required in OpenSSL")

    received_name = None

    def remember_name(ssl_sock, server_name, initial_context):
        # Publish the name the server-side handshake was asked for.
        nonlocal received_name
        received_name = server_name

    # Server side: a TLSv1 context that reports the requested name.
    server_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    server_ctx.set_servername_callback(remember_name)
    handler = self.start_https_server(context=server_ctx,
                                      certfile=CERT_localhost)

    # Client side: default context with the test certificate as CA file.
    client_ctx = ssl.create_default_context(cafile=CERT_localhost)
    target = "https://localhost:%s" % handler.port
    self.urlopen(target, context=client_ctx)

    self.assertEqual(received_name, "localhost")
def _create_socket(self):
    """Create the connection socket and wrap it with ``self.ssl_context``.

    The plain socket comes from ``IMAP4._create_socket``.  ``self.host``
    is passed as the SNI server name only when ``ssl.HAS_SNI`` is true.
    """
    plain_sock = IMAP4._create_socket(self)
    if ssl.HAS_SNI:
        sni_name = self.host
    else:
        sni_name = None
    return self.ssl_context.wrap_socket(plain_sock,
                                        server_hostname=sni_name)
def _get_socket(self, host, port, timeout):
    """Open a TCP connection to (host, port) and return it wrapped in TLS.

    A 'connect:' trace line is printed to stderr when ``debuglevel`` is
    positive.  Note that the SNI name is taken from ``self._host``, not
    from the ``host`` argument, and only when ``ssl.HAS_SNI`` is true.
    """
    address = (host, port)
    if self.debuglevel > 0:
        print('connect:', address, file=stderr)

    plain_sock = socket.create_connection(address, timeout,
                                          self.source_address)

    sni_name = None
    if ssl.HAS_SNI:
        sni_name = self._host
    return self.context.wrap_socket(plain_sock, server_hostname=sni_name)
def auth(self):
    """Set up a secure control connection by using TLS/SSL.

    Sends 'AUTH TLS' when ``ssl_version`` equals ``ssl.PROTOCOL_TLSv1``
    and 'AUTH SSL' otherwise, then wraps the control socket with
    ``self.context`` and reopens ``self.file`` on the wrapped socket.

    Raises ValueError if the control socket is already an SSLSocket.
    Returns the server response to the AUTH command.
    """
    if isinstance(self.sock, ssl.SSLSocket):
        raise ValueError("Already using TLS")

    if self.ssl_version == ssl.PROTOCOL_TLSv1:
        auth_command = 'AUTH TLS'
    else:
        auth_command = 'AUTH SSL'
    response = self.voidcmd(auth_command)

    # Only request SNI when the ssl module reports support for it.
    sni_name = None
    if ssl.HAS_SNI:
        sni_name = self.host
    self.sock = self.context.wrap_socket(self.sock,
                                         server_hostname=sni_name)
    self.file = self.sock.makefile(mode='r', encoding=self.encoding)
    return response
def ntransfercmd(self, cmd, rest=None):
    """Start a transfer and return ``(connection, size)``.

    Delegates to ``FTP.ntransfercmd``.  When ``self._prot_p`` is set, the
    returned data connection is wrapped with ``self.context``, passing
    ``self.host`` as the SNI name if ``ssl.HAS_SNI`` is true.
    """
    data_conn, size = FTP.ntransfercmd(self, cmd, rest)
    if not self._prot_p:
        # Data channel protection is off: hand back the plain connection.
        return data_conn, size

    if ssl.HAS_SNI:
        sni_name = self.host
    else:
        sni_name = None
    secured_conn = self.context.wrap_socket(data_conn,
                                            server_hostname=sni_name)
    return secured_conn, size
def test_https_sni(self):
    """Check that the client sends "localhost" as the SNI server name.

    Unlike a variant that builds a client context, this one calls
    ``self.urlopen`` with the URL only.  Skipped when the ssl module is
    unavailable or lacks SNI support.
    """
    if ssl is None:
        self.skipTest("ssl module required")
    if not ssl.HAS_SNI:
        self.skipTest("SNI support required in OpenSSL")

    requested_name = None

    def on_servername(ssl_sock, server_name, initial_context):
        # Publish the name the server-side handshake was asked for.
        nonlocal requested_name
        requested_name = server_name

    server_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    server_ctx.set_servername_callback(on_servername)
    handler = self.start_https_server(context=server_ctx,
                                      certfile=CERT_localhost)

    endpoint = "https://localhost:%s" % handler.port
    self.urlopen(endpoint)

    self.assertEqual(requested_name, "localhost")
def connect(self, addr=('irc.freenode.net', 6667), use_ssl=False):
    '''Connect to an IRC server.

    addr    - tuple of (server, port); the server part is passed through
              rmnlsp() before use.
    use_ssl - when true, the socket is created as an SSL socket.  How it
              is built depends on the running Python version; on 3.4+ a
              default SSL context is used and the server name is sent
              for SNI when ssl.HAS_SNI is true.

    Every address returned by getaddrinfo() is tried in turn until one
    connects (each socket gets a 300 second timeout).  On success the
    nick and the receive/send buffers are reset.

    Raises socket.error with errno ENOTSOCK when no address could be
    connected.  The lock taken at entry is always released on exit,
    including when an unexpected exception propagates.
    '''
    self.acquire_lock()
    try:
        self.addr = (rmnlsp(addr[0]), addr[1])
        candidates = socket.getaddrinfo(self.addr[0], self.addr[1],
                                        socket.AF_UNSPEC,
                                        socket.SOCK_STREAM)
        for af, socktype, proto, _canonname, sa in candidates:
            # Step 1: create the (possibly SSL) socket for this address.
            try:
                if use_ssl:
                    if (3,) <= sys.version_info < (3, 3):
                        self.sock = ssl.SSLSocket(af, socktype, proto)
                    elif sys.version_info >= (3, 4):
                        ctx = ssl.create_default_context()
                        if ssl.HAS_SNI:
                            self.sock = ctx.wrap_socket(
                                socket.socket(af, socktype, proto),
                                server_hostname=self.addr[0])
                        else:
                            self.sock = ctx.wrap_socket(
                                socket.socket(af, socktype, proto))
                    else:
                        self.sock = ssl.SSLSocket(
                            sock=socket.socket(af, socktype, proto))
                else:
                    self.sock = socket.socket(af, socktype, proto)
            except socket.error:
                self.sock = None
                continue

            # Step 2: try to connect; on failure move to the next address.
            try:
                self.sock.settimeout(300)
                self.sock.connect(sa)
            except socket.error:
                self.sock.close()
                self.sock = None
                continue
            break

        if self.sock is None:
            e = socket.error(
                '[errno %d] Socket operation on non-socket' % errno.ENOTSOCK)
            e.errno = errno.ENOTSOCK
            raise e

        self.nick = None
        self.recvbuf = b''
        self.sendbuf = b''
    finally:
        # Previously the lock was released only on the two expected exits,
        # so any other exception (e.g. from getaddrinfo or wrap_socket)
        # left it held.
        self.lock.release()
def starttls(self, keyfile=None, certfile=None, context=None):
    """Put the connection to the SMTP server into TLS mode.

    ``ehlo_or_helo_if_needed()`` is called first, so a session without a
    previous EHLO/HELO gets one.  If the server answers STARTTLS with
    220, the socket is wrapped with ``context`` (or with a default
    context built from ``certfile``/``keyfile``) and everything learned
    from the server before the TLS negotiation is discarded.

    keyfile, certfile - used only to build the default context.
    context           - an SSL context; mutually exclusive with keyfile
                        and certfile.

    Raises:
      SMTPException  the server does not advertise the STARTTLS extension.
      RuntimeError   the server accepted STARTTLS but SSL support is
                     missing.
      ValueError     context was given together with keyfile or certfile.
      SMTPHeloError  per the original documentation: the server didn't
                     reply properly to the helo greeting.

    Returns the ``(resp, reply)`` pair of the STARTTLS command.
    """
    self.ehlo_or_helo_if_needed()
    if not self.has_extn("starttls"):
        raise SMTPException("STARTTLS extension not supported by server.")

    (resp, reply) = self.docmd("STARTTLS")
    if resp != 220:
        # Server refused: report its answer and leave the session as is.
        return (resp, reply)

    if not _have_ssl:
        raise RuntimeError("No SSL support included in this Python")
    if context is not None:
        if keyfile is not None:
            raise ValueError(
                "context and keyfile arguments are mutually exclusive")
        if certfile is not None:
            raise ValueError(
                "context and certfile arguments are mutually exclusive")
    else:
        context = ssl._create_stdlib_context(certfile=certfile,
                                             keyfile=keyfile)

    # Only request SNI when the ssl module reports support for it.
    sni_name = None
    if ssl.HAS_SNI:
        sni_name = self._host
    self.sock = context.wrap_socket(self.sock, server_hostname=sni_name)
    self.file = None

    # RFC 3207:
    # The client MUST discard any knowledge obtained from
    # the server, such as the list of SMTP service extensions,
    # which was not obtained from the TLS negotiation itself.
    self.helo_resp = None
    self.ehlo_resp = None
    self.esmtp_features = {}
    self.does_esmtp = 0
    return (resp, reply)
def __init__(self, loop, rawsock, protocol, sslcontext, waiter=None,
             server_side=False, server_hostname=None, extra=None,
             server=None):
    """Wrap ``rawsock`` in SSL and start the handshake.

    sslcontext      - SSL context to use.  Required on the server side;
                      on the client side a falsy value selects a default
                      context with certificate verification required.
    server_hostname - sent for SNI on the client side when ssl.HAS_SNI is
                      true; also enables host name checking in the
                      default client context.

    Raises RuntimeError when the ssl module is unavailable and ValueError
    when a server-side transport is created without an SSL context.
    """
    if ssl is None:
        raise RuntimeError('stdlib ssl module not available')

    if not sslcontext:
        if server_side:
            raise ValueError('Server side ssl needs a valid SSLContext')
        # Client side may pass ssl=True to use a default
        # context; in that case the sslcontext passed is None.
        # The default is the same as used by urllib with
        # cadefault=True.
        if hasattr(ssl, '_create_stdlib_context'):
            sslcontext = ssl._create_stdlib_context(
                cert_reqs=ssl.CERT_REQUIRED,
                check_hostname=bool(server_hostname))
        else:
            # Fallback for Python 3.3.
            sslcontext = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
            sslcontext.options |= ssl.OP_NO_SSLv2
            sslcontext.set_default_verify_paths()
            sslcontext.verify_mode = ssl.CERT_REQUIRED

    # The handshake is driven later by _on_handshake(), not by wrap_socket().
    wrap_kwargs = dict(server_side=server_side,
                       do_handshake_on_connect=False)
    wants_sni = server_hostname and not server_side and ssl.HAS_SNI
    if wants_sni:
        wrap_kwargs['server_hostname'] = server_hostname
    sslsock = sslcontext.wrap_socket(rawsock, **wrap_kwargs)

    super().__init__(loop, sslsock, protocol, extra, server)

    self._server_hostname = server_hostname
    self._waiter = waiter
    self._sslcontext = sslcontext
    self._paused = False

    # SSL-specific extra info. (peercert is set later)
    self._extra.update(sslcontext=sslcontext)

    # The handshake start time is only recorded in debug mode.
    start_time = None
    if self._loop.get_debug():
        logger.debug("%r starts SSL handshake", self)
        start_time = self._loop.time()
    self._on_handshake(start_time)
def __init__(self, wsgi, prefix_resolver=None, download_host=None,
             proxy_host=None, proxy_options=None, proxy_apps=None):
    """Set up the proxy wrapper around a WSGI application.

    wsgi            - stored as ``self._wsgi``.
    prefix_resolver - a resolver object, or a string that is wrapped in a
                      FixedResolver; defaults to ``FixedResolver()``.
    download_host   - host registered for the CertDownloader app when
                      certificate download is enabled; defaults to
                      ``self.DEFAULT_HOST``.
    proxy_host      - defaults to ``self.DEFAULT_HOST``; registered in
                      ``proxy_apps`` with a None app if not present.
    proxy_options   - optional dict; keys read here are 'ca_name',
                      'ca_file_cache', 'use_wildcard_certs',
                      'enable_cert_download' and 'enable_websockets'.
    proxy_apps      - optional mapping of host name to app.
    """
    self._wsgi = wsgi
    self.set_connection_class()

    if isinstance(prefix_resolver, str):
        prefix_resolver = FixedResolver(prefix_resolver)
    self.prefix_resolver = prefix_resolver or FixedResolver()

    self.proxy_apps = proxy_apps or {}
    self.proxy_host = proxy_host or self.DEFAULT_HOST

    # Disabled capability probes, kept for reference:
    #self.has_sni = hasattr(ssl, 'HAS_SNI') and ssl.HAS_SNI
    #self.has_alpn = hasattr(ssl, 'HAS_ALPN') and ssl.HAS_ALPN

    if self.proxy_host not in self.proxy_apps:
        self.proxy_apps[self.proxy_host] = None

    # HTTPS Only Options
    options = proxy_options or {}
    self.ca = CertificateAuthority(
        ca_name=options.get('ca_name', self.CA_ROOT_NAME),
        ca_file_cache=options.get('ca_file_cache', self.CA_ROOT_FILE),
        cert_cache=None,
        cert_not_before=-3600,
    )

    # Best effort: any failure here leaves root_ca_file as None.
    try:
        self.root_ca_file = self.ca.get_root_pem_filename()
    except Exception:
        self.root_ca_file = None

    self.use_wildcard = options.get('use_wildcard_certs', True)

    if options.get('enable_cert_download', True):
        cert_host = download_host or self.DEFAULT_HOST
        self.proxy_apps[cert_host] = CertDownloader(self.ca)

    self.enable_ws = options.get('enable_websockets', True)
    # NOTE(review): presumably WebSocketHandler is bound to `object` when
    # the websocket dependency is missing -- confirm against the imports.
    if WebSocketHandler == object:
        self.enable_ws = None