我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 ssl.CERT_REQUIRED。
def connect(self):
    """Open a TCP connection, wrap it in TLS and verify the peer hostname.

    The socket is wrapped with ``cert_reqs=ssl.CERT_REQUIRED`` against
    ``self.ca_bundle``, then the peer certificate is checked with
    ``match_hostname``.

    Raises:
        CertificateError: the peer certificate does not match the expected
            host; the TLS socket is shut down and closed before re-raising.
    """
    # ``create_connection(address, timeout, source_address)``: the second
    # positional parameter is the timeout, so the optional source address
    # must be passed by keyword (and only when it is actually set).
    connect_kwargs = {}
    if hasattr(self, 'timeout'):
        connect_kwargs['timeout'] = self.timeout
    source_address = getattr(self, 'source_address', None)
    if source_address is not None:
        connect_kwargs['source_address'] = source_address
    sock = socket.create_connection((self.host, self.port), **connect_kwargs)

    # Handle the socket if a (proxy) tunnel is present
    if hasattr(self, '_tunnel') and getattr(self, '_tunnel_host', None):
        self.sock = sock
        self._tunnel()
        # The TLS peer is the tunnel destination, not the host we dialled.
        expected_host = self._tunnel_host
    else:
        expected_host = self.host

    try:
        self.sock = ssl.wrap_socket(
            sock,
            cert_reqs=ssl.CERT_REQUIRED,
            ca_certs=self.ca_bundle,
        )
    except Exception:
        # Do not leak the raw socket when the handshake fails.
        sock.close()
        raise

    try:
        match_hostname(self.sock.getpeercert(), expected_host)
    except CertificateError:
        self.sock.shutdown(socket.SHUT_RDWR)
        self.sock.close()
        raise
def resolve_cert_reqs(candidate):
    """
    Resolve the argument to a numeric constant that can be passed to the
    wrap_socket function/method from the ssl module.

    * ``None`` resolves to :data:`ssl.CERT_NONE`.
    * A string is looked up as an attribute of the :mod:`ssl` module,
      first verbatim and then with a ``CERT_`` prefix, so the abbreviation
      ``REQUIRED`` works as well as ``CERT_REQUIRED``.
    * Anything else is assumed to already be the numeric constant and is
      returned unchanged.
    """
    if candidate is None:
        return CERT_NONE

    if not isinstance(candidate, str):
        return candidate

    resolved = getattr(ssl, candidate, None)
    if resolved is not None:
        return resolved
    # Fall back to the abbreviated spelling; raises AttributeError when the
    # prefixed name does not exist either.
    return getattr(ssl, 'CERT_' + candidate)
def start(self):
    """Start the device handler, then run the client for ``self.protocol``.

    * ``"udp"``: loads state, binds a UDP server on an ephemeral port,
      schedules ``self.repeat`` on a 0.5 s daemon timer and serves forever.
    * ``"ssl"``: loops forever connecting to ``self.serverAddr``
      (``host:port``) over TLS with the configured CA/device certificates,
      starts a daemon listener thread and sends the device payload every
      ``self.sslIntervalSeconds`` seconds. Any exception is logged and the
      connection is retried after 10 seconds.

    Any other protocol value does nothing further.
    """
    self.deviceHandler.start()

    if self.protocol == "udp":
        self.loadState()
        self.logger.debug("udpHeartbeatSeconds = {0}".format(self.udpHeartbeatSeconds))
        self.logger.debug("udpDataPacketInterval = {0}".format(self.udpDataPacketInterval))

        self.udpServer = SocketServer.UDPServer(('0.0.0.0', 0), IotUDPHandler)
        self.udpServer.service = self
        self.udpServer.role = IotUDPHandler.CLIENT
        bound_host, bound_port = self.udpServer.server_address[:2]
        self.logger.info(
            "starting UDP client at {0}:{1} connecting to {2}, state at {3}".format(
                bound_host, bound_port, self.serverAddr, self.stateFile))

        heartbeat = threading.Timer(0.5, self.repeat)
        heartbeat.daemon = True
        heartbeat.start()
        self.udpServer.serve_forever()
        return

    if self.protocol != "ssl":
        return

    while True:
        self.logger.info("Connecting by SSL to server at {0}".format(self.serverAddr))
        try:
            raw_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.logger.debug(
                "using caCertFile={0}, deviceCertFile={1}, deviceKeyFile={2}".format(
                    self.caCertFile, self.deviceCertFile, self.deviceKeyFile))
            tls_sock = ssl.wrap_socket(
                raw_sock,
                ca_certs=self.caCertFile,
                cert_reqs=ssl.CERT_REQUIRED,
                certfile=self.deviceCertFile,
                keyfile=self.deviceKeyFile,
                ssl_version=ssl.PROTOCOL_TLSv1,
            )

            addr_parts = self.serverAddr.split(':')
            tls_sock.connect((addr_parts[0], int(addr_parts[1])))

            peer_cert = tls_sock.getpeercert()
            subject = dict(rdn[0] for rdn in peer_cert['subject'])
            self.logger.info(
                "Connected to server with valid certificate, CN={0}".format(
                    subject['commonName']))

            self.sslSocket = tls_sock
            listener = threading.Thread(target=self.sslListen, args=(self.sslSocket,))
            listener.daemon = True
            listener.start()

            # Send loop: only left through an exception.
            while True:
                payload = self.deviceHandler.getMessagePayload()
                self.logger.debug(
                    "Sending payload to {0} by SSL: {1}".format(self.serverAddr, payload))
                iotcommon.sendMessage(self.sslSocket, payload)
                time.sleep(self.sslIntervalSeconds)
        except Exception as e:
            self.logger.exception(e)
            time.sleep(10)
def set_cert(self, key_file=None, cert_file=None,
             cert_reqs=None, ca_certs=None,
             assert_hostname=None, assert_fingerprint=None,
             ca_cert_dir=None):
    """
    Store the TLS settings on this connection.

    This method should only be called once, before the connection is used.

    When ``cert_reqs`` is not given it is guessed: a CA file or directory
    implies ``'CERT_REQUIRED'``; otherwise the ``verify_mode`` of
    ``self.ssl_context`` is used if a context is set. ``ca_certs`` and
    ``ca_cert_dir`` are passed through ``os.path.expanduser`` when truthy.
    """
    if cert_reqs is None:
        has_trust_store = ca_certs or ca_cert_dir
        if has_trust_store:
            # A certificate database was supplied, so assume it should be used.
            cert_reqs = 'CERT_REQUIRED'
        elif self.ssl_context is not None:
            # Otherwise defer to whatever the supplied context demands.
            cert_reqs = self.ssl_context.verify_mode

    self.key_file = key_file
    self.cert_file = cert_file
    self.cert_reqs = cert_reqs
    self.assert_hostname = assert_hostname
    self.assert_fingerprint = assert_fingerprint
    self.ca_certs = os.path.expanduser(ca_certs) if ca_certs else ca_certs
    self.ca_cert_dir = (
        os.path.expanduser(ca_cert_dir) if ca_cert_dir else ca_cert_dir
    )
def __init__(self, ssl_keyfile=None, ssl_certfile=None, ssl_cert_reqs=None,
             ssl_ca_certs=None, **kwargs):
    """Initialise the connection with its SSL settings.

    ``ssl_cert_reqs`` may be ``None`` (stored as ``ssl.CERT_NONE``), one of
    the strings ``'none'``, ``'optional'`` or ``'required'``, or any other
    non-string value, which is stored unchanged. Remaining keyword
    arguments are forwarded to the parent initialiser.

    Raises:
        RedisError: SSL support is unavailable, or ``ssl_cert_reqs`` is a
            string that is not one of the recognised flags.
    """
    if not ssl_available:
        raise RedisError("Python wasn't built with SSL support")

    super(SSLConnection, self).__init__(**kwargs)

    self.keyfile = ssl_keyfile
    self.certfile = ssl_certfile

    requirement = ssl_cert_reqs
    if requirement is None:
        requirement = ssl.CERT_NONE
    elif isinstance(requirement, basestring):
        known_flags = {
            'none': ssl.CERT_NONE,
            'optional': ssl.CERT_OPTIONAL,
            'required': ssl.CERT_REQUIRED,
        }
        if requirement not in known_flags:
            raise RedisError(
                "Invalid SSL Certificate Requirements Flag: %s" %
                requirement)
        requirement = known_flags[requirement]

    self.cert_reqs = requirement
    self.ca_certs = ssl_ca_certs
def test_can_connect_with_ssl_ca_host_match(self):
    """
    Test to validate that we are able to connect to a cluster using ssl, and host matching

    test_can_connect_with_ssl_ca_host_match performs a simple sanity check to ensure that we can connect to a cluster
    with ssl authentication via a simple server-side shared certificate authority. It also validates that the host ip
    matches what is expected.

    @since 3.3
    @jira_ticket PYTHON-296
    @expected_result The client can connect via SSL and perform some basic operations, with check_hostname specified

    @test_category connection:ssl
    """
    # The options carry the absolute path to the client CA certificates.
    ca_bundle = os.path.abspath(CLIENT_CA_CERTS)
    ssl_options = dict(
        ca_certs=ca_bundle,
        ssl_version=ssl.PROTOCOL_TLSv1,
        cert_reqs=ssl.CERT_REQUIRED,
        check_hostname=True,
    )

    validate_ssl_options(ssl_options=ssl_options)
def init_mtls(config):
    """Build and return the SSL context used for mutual TLS.

    Reads ``config["ca_dir"]`` (made absolute under ``common.WORK_DIR``
    when it does not start with ``/``) and ``config["ip"]``. Prompts for
    the keystore password, generates the ``<ip>-cert.crt`` certificate via
    ``ca_util`` when it is missing, then loads the CA file and the
    certificate/key pair into a context that requires peer certificates.
    """
    logger.info("Setting up mTLS...")

    tls_dir = config["ca_dir"]
    if tls_dir[0] != '/':
        tls_dir = os.path.abspath('%s/%s' % (common.WORK_DIR, tls_dir))

    # We need to securely pull in the ca password
    keystore_password = getpass.getpass("Please enter the password to decrypt your keystore: ")
    ca_util.setpassword(keystore_password)

    node_ip = config["ip"]
    cert_path = "%s/%s-cert.crt" % (tls_dir, node_ip)

    # Create the connect certs only if they are not already present.
    if not os.path.exists(cert_path):
        logger.info("Generating new Node Monitor TLS Certs in %s for connecting" % tls_dir)
        ca_util.cmd_mkcert(tls_dir, node_ip)

    ca_path = "%s/cacert.crt" % (tls_dir)
    key_path = "%s/%s-private.pem" % (tls_dir, node_ip)

    context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
    context.load_verify_locations(cafile=ca_path)
    context.load_cert_chain(certfile=cert_path, keyfile=key_path, password=keystore_password)
    context.verify_mode = ssl.CERT_REQUIRED
    return context
def open(self, host="", port=_DEFAULT_PORT_IMAP4_SSL):
    """Connect to ``host:port`` over TLS and verify the server certificate.

    The TCP connection honours ``self._timeout``. The socket is wrapped
    with ``ssl.CERT_REQUIRED`` using the CA bundle yielded by
    ``ca_certs(self.ca_certs)``, the peer certificate is checked against
    ``host`` with ``match_hostname``, and a binary file object over the TLS
    socket is stored on ``self.file``.
    """
    self.host = host
    self.port = port
    self.sock = socket.create_connection((host, port), timeout=self._timeout)

    with ca_certs(self.ca_certs) as bundle:
        tls_options = dict(
            keyfile=self.keyfile,
            certfile=self.certfile,
            cert_reqs=ssl.CERT_REQUIRED,
            ca_certs=bundle,
        )
        self.sslobj = ssl.wrap_socket(self.sock, **tls_options)

    peer_cert = self.sslobj.getpeercert()
    match_hostname(peer_cert, host)

    self.file = self.sslobj.makefile("rb")
def connect(self):
    """Connect via the plain HTTP base class, then upgrade the socket to TLS.

    The peer certificate is required only when ``self.require_cert`` is
    set. In that case it is also matched against the tunnel host if one is
    configured, otherwise against ``self.host``.
    """
    httplib.HTTPConnection.connect(self)

    with ca_certs(self.ca_certs) as bundle:
        if self.require_cert:
            verify_mode = ssl.CERT_REQUIRED
        else:
            verify_mode = ssl.CERT_NONE
        self.sock = ssl.wrap_socket(
            self.sock,
            certfile=self.certfile,
            keyfile=self.keyfile,
            cert_reqs=verify_mode,
            ca_certs=bundle,
        )

    if not self.require_cert:
        return

    if self._tunnel_host:
        expected_host = self._tunnel_host
    else:
        expected_host = self.host
    peer_cert = self.sock.getpeercert()
    match_hostname(peer_cert, expected_host)
def starttls(self, keyfile=None, certfile=None):
    """Issue STARTTLS and, on a 220 reply, switch the session to verified TLS.

    The socket is wrapped with ``ssl.CERT_REQUIRED`` using the CA bundle
    from ``ca_certs(self._ca_certs)`` and the peer certificate is matched
    against ``self._host``. The cached HELO/EHLO state is then reset.

    Returns:
        The ``(code, message)`` pair from the STARTTLS command.

    Raises:
        smtplib.SMTPException: the server does not advertise STARTTLS.
    """
    self.ehlo_or_helo_if_needed()
    if not self.has_extn("starttls"):
        raise smtplib.SMTPException("server doesn't support STARTTLS")

    code, message = self.docmd("STARTTLS")
    if code != 220:
        return code, message

    with ca_certs(self._ca_certs) as bundle:
        self.sock = ssl.wrap_socket(
            self.sock,
            certfile=certfile,
            keyfile=keyfile,
            ca_certs=bundle,
            cert_reqs=ssl.CERT_REQUIRED,
        )
    peer_cert = self.sock.getpeercert()
    match_hostname(peer_cert, self._host)

    self.file = smtplib.SSLFakeFile(self.sock)
    # Forget what was learned before the TLS upgrade.
    self.helo_resp = None
    self.ehlo_resp = None
    self.esmtp_features = {}
    self.does_esmtp = 0
    return code, message
def testHttpsContext(self):
    """Check that an HTTPS request uses a verifying TLS context for localhost."""
    client = httplib2.Http(ca_certs=self.ca_certs_path)

    # Establish connection to local server
    url = 'https://localhost:%d/' % (self.port)
    client.request(url)

    # Verify that connection uses a TLS context with the correct hostname
    connection_key = 'https:localhost:%d' % self.port
    conn = client.connections[connection_key]

    self.assertIsInstance(conn.sock, ssl.SSLSocket)
    self.assertTrue(hasattr(conn.sock, 'context'))
    self.assertIsInstance(conn.sock.context, ssl.SSLContext)
    self.assertTrue(conn.sock.context.check_hostname)
    self.assertEqual(conn.sock.server_hostname, 'localhost')
    self.assertEqual(conn.sock.context.verify_mode, ssl.CERT_REQUIRED)
    self.assertEqual(conn.sock.context.protocol, ssl.PROTOCOL_SSLv23)
def __init__(self, host, port=None, key_file=None, cert_file=None,
             timeout=None, proxy_info=None,
             ca_certs=None, disable_ssl_certificate_validation=False):
    """Initialise the HTTPS connection with an explicit SSL context.

    ``ca_certs`` defaults to the module-level ``CA_CERTS``. When a client
    certificate or CA bundle is in use, a TLSv1 context is built whose
    verify mode is ``CERT_NONE`` if validation is disabled and
    ``CERT_REQUIRED`` otherwise. Hostname checking is enabled exactly when
    validation is not disabled.

    Raises:
        CertificateValidationUnsupportedInPython31: a context is needed but
            the ``ssl`` module has no ``SSLContext``.
    """
    # TODO: implement proxy_info
    self.proxy_info = proxy_info

    if ca_certs is None:
        ca_certs = CA_CERTS

    context = None
    needs_context = cert_file or ca_certs
    if needs_context:
        if not hasattr(ssl, 'SSLContext'):
            raise CertificateValidationUnsupportedInPython31()
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        context.verify_mode = (
            ssl.CERT_NONE if disable_ssl_certificate_validation
            else ssl.CERT_REQUIRED
        )
        if cert_file:
            context.load_cert_chain(cert_file, key_file)
        if ca_certs:
            context.load_verify_locations(ca_certs)

    parent_options = dict(
        port=port,
        key_file=key_file,
        cert_file=cert_file,
        timeout=timeout,
        context=context,
        # XOR with True inverts the flag.
        check_hostname=disable_ssl_certificate_validation ^ True,
    )
    http.client.HTTPSConnection.__init__(self, host, **parent_options)
def _create_transport_context(server_side, server_hostname): if server_side: raise ValueError('Server side SSL needs a valid SSLContext') # Client side may pass ssl=True to use a default # context; in that case the sslcontext passed is None. # The default is secure for client connections. if hasattr(ssl, 'create_default_context'): # Python 3.4+: use up-to-date strong settings. sslcontext = ssl.create_default_context() if not server_hostname: sslcontext.check_hostname = False else: # Fallback for Python 3.3. sslcontext = ssl.SSLContext(ssl.PROTOCOL_SSLv23) sslcontext.options |= ssl.OP_NO_SSLv2 sslcontext.options |= ssl.OP_NO_SSLv3 sslcontext.set_default_verify_paths() sslcontext.verify_mode = ssl.CERT_REQUIRED return sslcontext
def test_create_unix_server_ssl_verified(self):
    """A client that verifies the CA and hostname can connect to the SSL unix server."""
    proto = MyProto(loop=self.loop)
    server, path = self._make_ssl_unix_server(
        lambda: proto, SIGNED_CERTFILE)

    client_ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    client_ctx.options |= ssl.OP_NO_SSLv2
    client_ctx.verify_mode = ssl.CERT_REQUIRED
    client_ctx.load_verify_locations(cafile=SIGNING_CA)
    if hasattr(client_ctx, 'check_hostname'):
        client_ctx.check_hostname = True

    # Connection succeeds with correct CA and server hostname.
    connecting = self.loop.create_unix_connection(
        MyProto, path,
        ssl=client_ctx,
        server_hostname='localhost')
    client, _client_proto = self.loop.run_until_complete(connecting)

    # close connection
    proto.transport.close()
    client.close()
    server.close()
    self.loop.run_until_complete(proto.done)
def make_HTTPS_handler(params, **kwargs):
    """Build a ``YoutubeDLHTTPSHandler`` suited to the running interpreter.

    Certificate verification is disabled when ``params['nocheckcertificate']``
    is truthy. A default context is preferred; if the handler rejects the
    ``context`` argument with ``TypeError``, or no default context exists,
    the version-specific fallbacks below are used.
    """
    skip_verification = params.get('nocheckcertificate', False)

    if hasattr(ssl, 'create_default_context'):  # Python >= 3.4 or 2.7.9
        default_ctx = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
        if skip_verification:
            default_ctx.check_hostname = False
            default_ctx.verify_mode = ssl.CERT_NONE
        try:
            return YoutubeDLHTTPSHandler(params, context=default_ctx, **kwargs)
        except TypeError:
            # Python 2.7.8
            # (create_default_context present but HTTPSHandler has no context=)
            pass

    if sys.version_info < (3, 2):
        return YoutubeDLHTTPSHandler(params, **kwargs)

    # Python < 3.4
    legacy_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    if skip_verification:
        legacy_ctx.verify_mode = ssl.CERT_NONE
    else:
        legacy_ctx.verify_mode = ssl.CERT_REQUIRED
    legacy_ctx.set_default_verify_paths()
    return YoutubeDLHTTPSHandler(params, context=legacy_ctx, **kwargs)