我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用ssl.CERT_OPTIONAL。
def __init__(self, ssl_keyfile=None, ssl_certfile=None, ssl_cert_reqs=None, ssl_ca_certs=None, **kwargs): if not ssl_available: raise RedisError("Python wasn't built with SSL support") super(SSLConnection, self).__init__(**kwargs) self.keyfile = ssl_keyfile self.certfile = ssl_certfile if ssl_cert_reqs is None: ssl_cert_reqs = ssl.CERT_NONE elif isinstance(ssl_cert_reqs, basestring): CERT_REQS = { 'none': ssl.CERT_NONE, 'optional': ssl.CERT_OPTIONAL, 'required': ssl.CERT_REQUIRED } if ssl_cert_reqs not in CERT_REQS: raise RedisError( "Invalid SSL Certificate Requirements Flag: %s" % ssl_cert_reqs) ssl_cert_reqs = CERT_REQS[ssl_cert_reqs] self.cert_reqs = ssl_cert_reqs self.ca_certs = ssl_ca_certs
def _verify_cert(self, peer_cert): """Returns True if peercert is valid according to the configured validation mode and hostname. The ssl handshake already tested the certificate for a valid CA signature; the only thing that remains is to check the hostname. """ verify_mode = self.ssl_options.verify_mode assert verify_mode in (ssl.CERT_NONE, ssl.CERT_REQUIRED, ssl.CERT_OPTIONAL) if verify_mode == ssl.CERT_NONE or self.host is None: return True if peer_cert is None and verify_mode == ssl.CERT_REQUIRED: logger.warning("No SSL certificate given") return False try: ssl_match_hostname(peer_cert, self.host) except SSLCertificateError: logger.warning("Invalid SSL certificate", exc_info=True) return False else: return True
def wrap_socket(self, sock): try: if self.clientcert_req: ca_certs = self.interface[4] cert_reqs = ssl.CERT_OPTIONAL sock = ssl.wrap_socket(sock, keyfile=self.interface[2], certfile=self.interface[3], server_side=True, cert_reqs=cert_reqs, ca_certs=ca_certs, ssl_version=ssl.PROTOCOL_SSLv23) else: sock = ssl.wrap_socket(sock, keyfile=self.interface[2], certfile=self.interface[3], server_side=True, ssl_version=ssl.PROTOCOL_SSLv23) except SSLError: # Generally this happens when an HTTP request is received on a # secure socket. We don't do anything because it will be detected # by Worker and dealt with appropriately. pass return sock
def create_secure_socket(self, cert_dir, host='localhost', port=8001): context = ssl.SSLContext() # Defaults to SSL/TLS support with PROTOCOL_TLS (best for now for compatibility) context.verify_mode = ssl.CERT_OPTIONAL # ssl.CERT_REQUIRED is more secure context.check_hostname = False # Hostname verification on certs (Dont want for now) context.load_default_certs(purpose=ssl.Purpose.CLIENT_AUTH) # Load the public CA certs for the server socket (need CLIENT_AUTH param) self.generate_server_self_cert(cert_dir) context.load_cert_chain(join(cert_dir, "KnowledgeManagement.crt"), keyfile=join(cert_dir, "KnowledgeManagement.key")) # Create the secure socket that will be listening for connections. # Note that the SSL handshake is NOT performed upon connection so the server can securely transfer the cert # if required. self.server_proc = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.server_proc.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) secureSocket = context.wrap_socket(self.server_proc, server_side=True) secureSocket.bind((host, port)) print("SSL Server started on {}:{}".format(host, port)) #self.server_negotiate_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) #self.server_negotiate_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) #self.server_negotiate_sock.bind((host, port+1)) self.is_listening=True return secureSocket
def run(self, handler): context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2) context.load_cert_chain(handler.certificate_file, handler.private_key_file) context.load_verify_locations(cafile=handler.certificate_file) context.load_verify_locations(cafile=handler.crl_file) context.options &= ssl.OP_NO_SSLv3 context.options &= ssl.OP_NO_SSLv2 context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF context.verify_mode = ssl.CERT_OPTIONAL self.options['ssl_context'] = context logger.info('Starting server on host %s port %d.', self.host, self.port) server = pywsgi.WSGIServer( (self.host, self.port), handler, ssl_context=context, handler_class=RequestHandler, log=logger, error_log=logger, ) server.serve_forever()
def _connect(self): if self.server_tls: raise Exception("TBD") print(self.client.tls_set(self.server_tls.server_cert, cert_reqs=ssl.CERT_OPTIONAL)) print(self.client.connect(self.host, self.port)) else: self.client.connect(self.host, self.port) self.client.subscribe(self.topics) def on_connect(client, userdata, flags, rc): print("Connected with result code "+str(rc)) self.client.on_connect = on_connect def on_publish(client, userdata, mid): print("Successfully published mid %d" % mid) self.client.on_publish = on_publish
def __init__(self, keyfile=None, certfile=None, cert_reqs=None, ca_certs=None): self.keyfile = keyfile self.certfile = certfile if cert_reqs is None: self.cert_reqs = ssl.CERT_NONE elif isinstance(cert_reqs, str): CERT_REQS = { 'none': ssl.CERT_NONE, 'optional': ssl.CERT_OPTIONAL, 'required': ssl.CERT_REQUIRED } if cert_reqs not in CERT_REQS: raise RedisError( "Invalid SSL Certificate Requirements Flag: %s" % cert_reqs) self.cert_reqs = CERT_REQS[cert_reqs] self.ca_certs = ca_certs self.context = None
def test_cert_reqs_options(self): import ssl with pytest.raises(TypeError) as e: pool = aredis.ConnectionPool.from_url( 'rediss://?ssl_cert_reqs=none&ssl_keyfile=test') assert e.message == 'certfile should be a valid filesystem path' assert pool.get_connection().ssl_context.verify_mode == ssl.CERT_NONE with pytest.raises(TypeError) as e: pool = aredis.ConnectionPool.from_url( 'rediss://?ssl_cert_reqs=optional&ssl_keyfile=test') assert e.message == 'certfile should be a valid filesystem path' assert pool.get_connection().ssl_context.verify_mode == ssl.CERT_OPTIONAL with pytest.raises(TypeError) as e: pool = aredis.ConnectionPool.from_url( 'rediss://?ssl_cert_reqs=required&ssl_keyfile=test') assert e.message == 'certfile should be a valid filesystem path' assert pool.get_connection().ssl_context.verify_mode == ssl.CERT_REQUIRED
def validate_cert_reqs(option, value): """Validate the cert reqs are valid. It must be None or one of the three values ``ssl.CERT_NONE``, ``ssl.CERT_OPTIONAL`` or ``ssl.CERT_REQUIRED``. """ if value is None: return value elif isinstance(value, string_type) and hasattr(ssl, value): value = getattr(ssl, value) if value in (ssl.CERT_NONE, ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED): return value raise ValueError("The value of %s must be one of: " "`ssl.CERT_NONE`, `ssl.CERT_OPTIONAL` or " "`ssl.CERT_REQUIRED" % (option,))
def __get_verify_mode(self): """Whether to try to verify other peers' certificates and how to behave if verification fails. This attribute must be one of ssl.CERT_NONE, ssl.CERT_OPTIONAL or ssl.CERT_REQUIRED. """ return self._verify_mode
def validate_cert_reqs(option, value): """Validate the cert reqs are valid. It must be None or one of the three values ``ssl.CERT_NONE``, ``ssl.CERT_OPTIONAL`` or ``ssl.CERT_REQUIRED``""" if value is None: return value if HAS_SSL: if value in (ssl.CERT_NONE, ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED): return value raise ConfigurationError("The value of %s must be one of: " "`ssl.CERT_NONE`, `ssl.CERT_OPTIONAL` or " "`ssl.CERT_REQUIRED" % (option,)) else: raise ConfigurationError("The value of %s is set but can't be " "validated. The ssl module is not available" % (option,))
def _verify_cert(self, peercert): """Returns True if peercert is valid according to the configured validation mode and hostname. The ssl handshake already tested the certificate for a valid CA signature; the only thing that remains is to check the hostname. """ if isinstance(self._ssl_options, dict): verify_mode = self._ssl_options.get('cert_reqs', ssl.CERT_NONE) elif isinstance(self._ssl_options, ssl.SSLContext): verify_mode = self._ssl_options.verify_mode assert verify_mode in (ssl.CERT_NONE, ssl.CERT_REQUIRED, ssl.CERT_OPTIONAL) if verify_mode == ssl.CERT_NONE or self._server_hostname is None: return True cert = self.socket.getpeercert() if cert is None and verify_mode == ssl.CERT_REQUIRED: gen_log.warning("No SSL certificate given") return False try: ssl_match_hostname(peercert, self._server_hostname) except SSLCertificateError as e: gen_log.warning("Invalid SSL certificate: %s" % e) return False else: return True
def set_cert(self, key_file=None, cert_file=None, cert_reqs='CERT_NONE', ca_certs=None): ssl_req_scheme = { 'CERT_NONE': ssl.CERT_NONE, 'CERT_OPTIONAL': ssl.CERT_OPTIONAL, 'CERT_REQUIRED': ssl.CERT_REQUIRED } self.key_file = key_file self.cert_file = cert_file self.cert_reqs = ssl_req_scheme.get(cert_reqs) or ssl.CERT_NONE self.ca_certs = ca_certs
def validate_cert_reqs(option, value): """Validate the cert reqs are valid. It must be None or one of the three values ``ssl.CERT_NONE``, ``ssl.CERT_OPTIONAL`` or ``ssl.CERT_REQUIRED``. """ if value is None: return value elif isinstance(value, string_type) and hasattr(ssl, value): value = getattr(ssl, value) if value in (ssl.CERT_NONE, ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED): return value raise ValueError("The value of %s must be one of: " "`ssl.CERT_NONE`, `ssl.CERT_OPTIONAL` or " "`ssl.CERT_REQUIRED`" % (option,))
def _connect(self): if self.server_tls: raise Exception("TBD") print(self.client.tls_set(self.server_tls.server_cert, cert_reqs=ssl.CERT_OPTIONAL)) print(self.client.connect(self.host, self.port)) else: self.client.connect(self.host, self.port) def on_connect(client, userdata, flags, rc): print("Connected with result code "+str(rc)) # Subscribing in on_connect() means that if we lose the connection and # reconnect then subscriptions will be renewed. client.subscribe(self.topics) self.client.on_connect = on_connect
def _verify_cert(self, peercert): """Returns True if peercert is valid according to the configured validation mode and hostname. The ssl handshake already tested the certificate for a valid CA signature; the only thing that remains is to check the hostname. """ if isinstance(self._ssl_options, dict): verify_mode = self._ssl_options.get('cert_reqs', ssl.CERT_NONE) elif isinstance(self._ssl_options, ssl.SSLContext): verify_mode = self._ssl_options.verify_mode assert verify_mode in (ssl.CERT_NONE, ssl.CERT_REQUIRED, ssl.CERT_OPTIONAL) if verify_mode == ssl.CERT_NONE or self._server_hostname is None: return True cert = self.socket.getpeercert() if cert is None and verify_mode == ssl.CERT_REQUIRED: gen_log.warning("No SSL certificate given") return False try: ssl_match_hostname(peercert, self._server_hostname) except SSLCertificateError: gen_log.warning("Invalid SSL certificate", exc_info=True) return False else: return True
def _wrap_ssl(self): assert self.config['security_protocol'] in ('SSL', 'SASL_SSL') if self._ssl_context is None: log.debug('%s: configuring default SSL Context', self) self._ssl_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) # pylint: disable=no-member self._ssl_context.options |= ssl.OP_NO_SSLv2 # pylint: disable=no-member self._ssl_context.options |= ssl.OP_NO_SSLv3 # pylint: disable=no-member self._ssl_context.verify_mode = ssl.CERT_OPTIONAL if self.config['ssl_check_hostname']: self._ssl_context.check_hostname = True if self.config['ssl_cafile']: log.info('%s: Loading SSL CA from %s', self, self.config['ssl_cafile']) self._ssl_context.load_verify_locations(self.config['ssl_cafile']) self._ssl_context.verify_mode = ssl.CERT_REQUIRED if self.config['ssl_certfile'] and self.config['ssl_keyfile']: log.info('%s: Loading SSL Cert from %s', self, self.config['ssl_certfile']) log.info('%s: Loading SSL Key from %s', self, self.config['ssl_keyfile']) self._ssl_context.load_cert_chain( certfile=self.config['ssl_certfile'], keyfile=self.config['ssl_keyfile'], password=self.config['ssl_password']) if self.config['ssl_crlfile']: if not hasattr(ssl, 'VERIFY_CRL_CHECK_LEAF'): error = 'No CRL support with this version of Python.' log.error('%s: %s Disconnecting.', self, error) self.close(Errors.ConnectionError(error)) return log.info('%s: Loading SSL CRL from %s', self, self.config['ssl_crlfile']) self._ssl_context.load_verify_locations(self.config['ssl_crlfile']) # pylint: disable=no-member self._ssl_context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF log.debug('%s: wrapping socket in ssl context', self) try: self._sock = self._ssl_context.wrap_socket( self._sock, server_hostname=self.hostname, do_handshake_on_connect=False) except ssl.SSLError as e: log.exception('%s: Failed to wrap socket in SSLContext!', self) self.close(e)