我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用ssl.PROTOCOL_SSLv23()。
def create_ssl_context(proto=ssl.PROTOCOL_SSLv23, verify_mode=ssl.CERT_NONE, protocols=None, options=None, ciphers="ALL"): protocols = protocols or ('PROTOCOL_SSLv3','PROTOCOL_TLSv1', 'PROTOCOL_TLSv1_1','PROTOCOL_TLSv1_2') options = options or ('OP_CIPHER_SERVER_PREFERENCE','OP_SINGLE_DH_USE', 'OP_SINGLE_ECDH_USE','OP_NO_COMPRESSION') context = ssl.SSLContext(proto) context.verify_mode = verify_mode # reset protocol, options context.protocol = 0 context.options = 0 for p in protocols: context.protocol |= getattr(ssl, p, 0) for o in options: context.options |= getattr(ssl, o, 0) context.set_ciphers(ciphers) return context
def load_ssl_context(cert_file, pkey_file=None, protocol=None): """Loads SSL context from cert/private key files and optional protocol. Many parameters are directly taken from the API of :py:class:`ssl.SSLContext`. :param cert_file: Path of the certificate to use. :param pkey_file: Path of the private key to use. If not given, the key will be obtained from the certificate file. :param protocol: One of the ``PROTOCOL_*`` constants in the stdlib ``ssl`` module. Defaults to ``PROTOCOL_SSLv23``. """ if protocol is None: protocol = ssl.PROTOCOL_SSLv23 ctx = _SSLContext(protocol) ctx.load_cert_chain(cert_file, pkey_file) return ctx
def __init__(self, dispatcher, connection, address, use_ssl, ssl_certfile, ssl_keyfile): Session.__init__(self, dispatcher) self.use_ssl = use_ssl self.raw_connection = connection if use_ssl: import ssl self._connection = ssl.wrap_socket( connection, server_side=True, certfile=ssl_certfile, keyfile=ssl_keyfile, ssl_version=ssl.PROTOCOL_SSLv23, do_handshake_on_connect=False) else: self._connection = connection self.address = address[0] + ":%d"%address[1] self.name = "TCP " if not use_ssl else "SSL " self.timeout = 1000 self.dispatcher.add_session(self) self.response_queue = queue.Queue() self.message = '' self.retry_msg = '' self.handshake = not self.use_ssl self.need_write = True
def testHttpsContext(self): client = httplib2.Http(ca_certs=self.ca_certs_path) # Establish connection to local server client.request('https://localhost:%d/' % (self.port)) # Verify that connection uses a TLS context with the correct hostname conn = client.connections['https:localhost:%d' % self.port] self.assertIsInstance(conn.sock, ssl.SSLSocket) self.assertTrue(hasattr(conn.sock, 'context')) self.assertIsInstance(conn.sock.context, ssl.SSLContext) self.assertTrue(conn.sock.context.check_hostname) self.assertEqual(conn.sock.server_hostname, 'localhost') self.assertEqual(conn.sock.context.verify_mode, ssl.CERT_REQUIRED) self.assertEqual(conn.sock.context.protocol, ssl.PROTOCOL_SSLv23)
def ssl_wrap_socket(sock, keyfile=None, certfile=None, cert_reqs=CERT_NONE, ca_certs=None, server_hostname=None, ssl_version=PROTOCOL_SSLv23): """ All arguments except `server_hostname` have the same meaning as for :func:`ssl.wrap_socket` :param server_hostname: Hostname of the expected certificate """ context = SSLContext(ssl_version) context.verify_mode = cert_reqs if ca_certs: try: context.load_verify_locations(ca_certs) except TypeError as e: # Reraise as SSLError # FIXME: This block needs a test. raise SSLError(e) if certfile: # FIXME: This block needs a test. context.load_cert_chain(certfile, keyfile) if HAS_SNI: # Platform-specific: OpenSSL with enabled SNI return context.wrap_socket(sock, server_hostname=server_hostname) return context.wrap_socket(sock)
def wrap_socket(self, sock): try: if self.clientcert_req: ca_certs = self.interface[4] cert_reqs = ssl.CERT_OPTIONAL sock = ssl.wrap_socket(sock, keyfile=self.interface[2], certfile=self.interface[3], server_side=True, cert_reqs=cert_reqs, ca_certs=ca_certs, ssl_version=ssl.PROTOCOL_SSLv23) else: sock = ssl.wrap_socket(sock, keyfile=self.interface[2], certfile=self.interface[3], server_side=True, ssl_version=ssl.PROTOCOL_SSLv23) except SSLError: # Generally this happens when an HTTP request is received on a # secure socket. We don't do anything because it will be detected # by Worker and dealt with appropriately. pass return sock
def configure_client_socket(self): """This is the socket from mallory to the victim""" self.log.debug("SSLProto: Getting common name from socket") # Destination is an ssl socket to the server. cert_from_remote_server = self.destination.getpeercert(True) # fake_key is the private key and we need to store it somewhere. fake_cert, fake_key = cert_auth.ca.get_fake_cert_and_key_filename(cert_from_remote_server) self.log.debug("SSLProtocol: private key" + fake_key) self.log.debug("SSLProto: Starting Socket") try: self.source = ssl.wrap_socket(self.source, server_side=True, certfile=fake_cert, keyfile=fake_key, ssl_version=ssl.PROTOCOL_SSLv23) except: self.log.debug("SSLProto: Client Closed SSL Connection") traceback.print_exc() self.log.debug("SSLProto: WoWzer!!")
def serve(self): self.initialize() self.challenge_thread = ChallengeThread.ChallengeThread(self) self.challenge_thread.start() if self.ssl_on: ssl_context = ssl.SSLContext(protocol=ssl.PROTOCOL_SSLv23) ssl_context.load_cert_chain(self.ssl_cert) self.server_socket = websockets.serve( self.handle_connection, self.listen_address, self.port, ssl=ssl_context) else: self.server_socket = websockets.serve( self.handle_connection, self.listen_address, self.port) try: asyncio.get_event_loop().run_until_complete(self.server_socket) asyncio.get_event_loop().run_forever() except KeyboardInterrupt: print("Closing the server") asyncio.get_event_loop().close()
def connect(self): sock = socket.create_connection((self.host, self.port), self.timeout) if getattr(self, '_tunnel_host', False): self.sock = sock self._tunnel() if not hasattr(ssl, 'SSLContext'): # For 2.x if self.ca_certs: cert_reqs = ssl.CERT_REQUIRED else: cert_reqs = ssl.CERT_NONE self.sock = ssl.wrap_socket(sock, self.key_file, self.cert_file, cert_reqs=cert_reqs, ssl_version=ssl.PROTOCOL_SSLv23, ca_certs=self.ca_certs) else: # pragma: no cover context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) context.options |= ssl.OP_NO_SSLv2 if self.cert_file: context.load_cert_chain(self.cert_file, self.key_file) kwargs = {} if self.ca_certs: context.verify_mode = ssl.CERT_REQUIRED context.load_verify_locations(cafile=self.ca_certs) if getattr(ssl, 'HAS_SNI', False): kwargs['server_hostname'] = self.host self.sock = context.wrap_socket(sock, **kwargs) if self.ca_certs and self.check_domain: try: match_hostname(self.sock.getpeercert(), self.host) logger.debug('Host verified: %s', self.host) except CertificateError: # pragma: no cover self.sock.shutdown(socket.SHUT_RDWR) self.sock.close() raise
def resolve_ssl_version(candidate): """ like resolve_cert_reqs """ if candidate is None: return PROTOCOL_SSLv23 if isinstance(candidate, str): res = getattr(ssl, candidate, None) if res is None: res = getattr(ssl, 'PROTOCOL_' + candidate) return res return candidate
def get_ssl_context(*args): """Create and return an SSLContext object.""" certfile, keyfile, ca_certs, cert_reqs = args # Note PROTOCOL_SSLv23 is about the most misleading name imaginable. # This configures the server and client to negotiate the # highest protocol version they both support. A very good thing. ctx = SSLContext(ssl.PROTOCOL_SSLv23) if hasattr(ctx, "options"): # Explicitly disable SSLv2 and SSLv3. Note that up to # date versions of MongoDB 2.4 and above already do this, # python disables SSLv2 by default in >= 2.7.7 and >= 3.3.4 # and SSLv3 in >= 3.4.3. There is no way for us to do this # explicitly for python 2.6 or 2.7 before 2.7.9. ctx.options |= getattr(ssl, "OP_NO_SSLv2", 0) ctx.options |= getattr(ssl, "OP_NO_SSLv3", 0) if certfile is not None: ctx.load_cert_chain(certfile, keyfile) if ca_certs is not None: ctx.load_verify_locations(ca_certs) elif cert_reqs != ssl.CERT_NONE: # CPython >= 2.7.9 or >= 3.4.0, pypy >= 2.5.1 if hasattr(ctx, "load_default_certs"): ctx.load_default_certs() # Python >= 3.2.0, useless on Windows. elif (sys.platform != "win32" and hasattr(ctx, "set_default_verify_paths")): ctx.set_default_verify_paths() elif sys.platform == "win32" and HAVE_WINCERTSTORE: with _WINCERTSLOCK: if _WINCERTS is None: _load_wincerts() ctx.load_verify_locations(_WINCERTS.name) elif HAVE_CERTIFI: ctx.load_verify_locations(certifi.where()) else: raise ConfigurationError( "`ssl_cert_reqs` is not ssl.CERT_NONE and no system " "CA certificates could be loaded. `ssl_ca_certs` is " "required.") ctx.verify_mode = ssl.CERT_REQUIRED if cert_reqs is None else cert_reqs return ctx
def connect(self): sock = socket.create_connection((self.host, self.port), self.timeout) if getattr(self, '_tunnel_host', False): self.sock = sock self._tunnel() if not hasattr(ssl, 'SSLContext'): # For 2.x if self.ca_certs: cert_reqs = ssl.CERT_REQUIRED else: cert_reqs = ssl.CERT_NONE self.sock = ssl.wrap_socket(sock, self.key_file, self.cert_file, cert_reqs=cert_reqs, ssl_version=ssl.PROTOCOL_SSLv23, ca_certs=self.ca_certs) else: context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) context.options |= ssl.OP_NO_SSLv2 if self.cert_file: context.load_cert_chain(self.cert_file, self.key_file) kwargs = {} if self.ca_certs: context.verify_mode = ssl.CERT_REQUIRED context.load_verify_locations(cafile=self.ca_certs) if getattr(ssl, 'HAS_SNI', False): kwargs['server_hostname'] = self.host self.sock = context.wrap_socket(sock, **kwargs) if self.ca_certs and self.check_domain: try: match_hostname(self.sock.getpeercert(), self.host) logger.debug('Host verified: %s', self.host) except CertificateError: self.sock.shutdown(socket.SHUT_RDWR) self.sock.close() raise
def ssl_wrap_socket(self): # Allow sending of keep-alive messages - seems to prevent some servers # from closing SSL, leading to deadlocks. self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) try: import ssl if self.ca_certs is not None: cert_reqs = ssl.CERT_REQUIRED else: cert_reqs = ssl.CERT_NONE if self.ssl_version == "tls1": ssl_version = ssl.PROTOCOL_TLSv1 elif self.ssl_version == "ssl2": ssl_version = ssl.PROTOCOL_SSLv2 elif self.ssl_version == "ssl3": ssl_version = ssl.PROTOCOL_SSLv3 elif self.ssl_version == "ssl23" or self.ssl_version is None: ssl_version = ssl.PROTOCOL_SSLv23 else: raise socket.sslerror("Invalid SSL version requested: %s", self.ssl_version) self.sock = ssl.wrap_socket(self.sock, self.keyfile, self.certfile, ca_certs=self.ca_certs, cert_reqs=cert_reqs, ssl_version=ssl_version) ssl_exc = ssl.SSLError self.read_fd = self.sock.fileno() except ImportError: # No ssl module, and socket.ssl has no fileno(), and does not allow certificate verification raise socket.sslerror("imaplib2 SSL mode does not work without ssl module") if self.cert_verify_cb is not None: cert_err = self.cert_verify_cb(self.sock.getpeercert(), self.host) if cert_err: raise ssl_exc(cert_err)
def ssl_options_to_context(ssl_options): """Try to convert an ``ssl_options`` dictionary to an `~ssl.SSLContext` object. The ``ssl_options`` dictionary contains keywords to be passed to `ssl.wrap_socket`. In Python 2.7.9+, `ssl.SSLContext` objects can be used instead. This function converts the dict form to its `~ssl.SSLContext` equivalent, and may be used when a component which accepts both forms needs to upgrade to the `~ssl.SSLContext` version to use features like SNI or NPN. """ if isinstance(ssl_options, dict): assert all(k in _SSL_CONTEXT_KEYWORDS for k in ssl_options), ssl_options if (not hasattr(ssl, 'SSLContext') or isinstance(ssl_options, ssl.SSLContext)): return ssl_options context = ssl.SSLContext( ssl_options.get('ssl_version', ssl.PROTOCOL_SSLv23)) if 'certfile' in ssl_options: context.load_cert_chain(ssl_options['certfile'], ssl_options.get('keyfile', None)) if 'cert_reqs' in ssl_options: context.verify_mode = ssl_options['cert_reqs'] if 'ca_certs' in ssl_options: context.load_verify_locations(ssl_options['ca_certs']) if 'ciphers' in ssl_options: context.set_ciphers(ssl_options['ciphers']) if hasattr(ssl, 'OP_NO_COMPRESSION'): # Disable TLS compression to avoid CRIME and related attacks. # This constant wasn't added until python 3.3. context.options |= ssl.OP_NO_COMPRESSION return context