我们从Python开源项目中，提取了以下50个代码示例，用于说明如何使用ssl.SSLContext()。
def create_ssl_context(proto=ssl.PROTOCOL_SSLv23, verify_mode=ssl.CERT_NONE,
                       protocols=None, options=None, ciphers="ALL"):
    """Create an ``ssl.SSLContext`` limited to the requested protocols.

    ``SSLContext.protocol`` is a read-only attribute and the ``PROTOCOL_*``
    constants are enumeration values rather than bit flags, so the allowed
    protocol versions are expressed through ``OP_NO_*`` option flags: every
    known protocol that is *not* listed in ``protocols`` is disabled.

    :param proto: ``PROTOCOL_*`` constant handed to ``ssl.SSLContext``.
    :param verify_mode: Value assigned to ``context.verify_mode``.
    :param protocols: Iterable of ``ssl`` attribute names (``'PROTOCOL_*'``)
        that should stay enabled. Defaults to SSLv3 through TLSv1.2.
    :param options: Iterable of ``ssl`` attribute names (``'OP_*'``) to set.
        Names the running ``ssl`` module does not define are ignored.
    :param ciphers: OpenSSL cipher string passed to ``set_ciphers``.
    :return: The configured ``ssl.SSLContext``.
    """
    protocols = protocols or ('PROTOCOL_SSLv3', 'PROTOCOL_TLSv1',
                              'PROTOCOL_TLSv1_1', 'PROTOCOL_TLSv1_2')
    options = options or ('OP_CIPHER_SERVER_PREFERENCE', 'OP_SINGLE_DH_USE',
                          'OP_SINGLE_ECDH_USE', 'OP_NO_COMPRESSION')
    # Protocol name -> option flag that switches that protocol off.
    disable_flags = {
        'PROTOCOL_SSLv2': 'OP_NO_SSLv2',
        'PROTOCOL_SSLv3': 'OP_NO_SSLv3',
        'PROTOCOL_TLSv1': 'OP_NO_TLSv1',
        'PROTOCOL_TLSv1_1': 'OP_NO_TLSv1_1',
        'PROTOCOL_TLSv1_2': 'OP_NO_TLSv1_2',
    }

    context = ssl.SSLContext(proto)
    context.verify_mode = verify_mode

    # Build the option mask from scratch (this also resets the defaults the
    # context was created with, as the original code intended).
    flags = 0
    for option_name in options:
        flags |= getattr(ssl, option_name, 0)
    for protocol_name, flag_name in disable_flags.items():
        if protocol_name not in protocols:
            flags |= getattr(ssl, flag_name, 0)
    context.options = flags

    context.set_ciphers(ciphers)
    return context
def _getConnection(self):
    """Return the cached connection, creating it on first use.

    The connection type is chosen from the scheme of the stored URL:
    ``http`` gives an ``HTTPConnection`` and ``https`` gives an
    ``HTTPSConnection`` built on a bare ``SSLContext``.

    :raises BuildError: if the URL scheme is neither ``http`` nor ``https``.
    """
    if self.__connection is not None:
        return self.__connection

    url = self.__url
    if url.scheme == 'http':
        connection = http.client.HTTPConnection(url.hostname, url.port)
    elif url.scheme == 'https':
        ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        connection = http.client.HTTPSConnection(url.hostname, url.port,
                                                 context=ctx)
    else:
        # The attribute is ``scheme``; the former ``url.schema`` raised
        # AttributeError and hid this error message.
        raise BuildError("Unsupported URL scheme: '{}'".format(url.scheme))

    self.__connection = connection
    return connection
def _load_wincerts():
    """Set _WINCERTS to a CertFile loaded with the "CA" and "ROOT" stores."""
    global _WINCERTS

    store = CertFile()
    for store_name in ("CA", "ROOT"):
        store.addstore(store_name)
    # Make sure the store is closed when the interpreter exits.
    atexit.register(store.close)

    _WINCERTS = store


# XXX: Possible future work.
# - Support CRL files? Only supported by CPython >= 2.7.9 and >= 3.4
#   http://bugs.python.org/issue8813
# - OCSP? Not supported by python at all.
#   http://bugs.python.org/issue17123
# - Setting OP_NO_COMPRESSION? The server doesn't yet.
# - Adding an ssl_context keyword argument to MongoClient? This might
#   be useful for sites that have unusual requirements rather than
#   trying to expose every SSLContext option through a keyword/uri
#   parameter.
def test_context(self):
    """FTP_TLS rejects key/cert files next to a context and uses the context it is given."""
    self.client.quit()
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)

    # Passing keyfile and/or certfile together with a context is an error.
    conflicting_arguments = (
        {'keyfile': CERTFILE},
        {'certfile': CERTFILE},
        {'certfile': CERTFILE, 'keyfile': CERTFILE},
    )
    for extra in conflicting_arguments:
        self.assertRaises(ValueError, ftplib.FTP_TLS, context=ctx, **extra)

    self.client = ftplib.FTP_TLS(context=ctx, timeout=10)
    self.client.connect(self.server.host, self.server.port)
    # The control connection is plain until auth() is called.
    self.assertNotIsInstance(self.client.sock, ssl.SSLSocket)
    self.client.auth()
    self.assertIs(self.client.sock.context, ctx)
    self.assertIsInstance(self.client.sock, ssl.SSLSocket)

    self.client.prot_p()
    with self.client.transfercmd('list') as data_sock:
        self.assertIs(data_sock.context, ctx)
        self.assertIsInstance(data_sock, ssl.SSLSocket)
def load_ssl_context(cert_file, pkey_file=None, protocol=None):
    """Build an SSL context from a certificate, an optional key and a protocol.

    Many parameters are directly taken from the API of
    :py:class:`ssl.SSLContext`.

    :param cert_file: Path of the certificate to use.
    :param pkey_file: Path of the private key to use. If not given, the key
                      will be obtained from the certificate file.
    :param protocol: One of the ``PROTOCOL_*`` constants in the stdlib ``ssl``
                     module. Defaults to ``PROTOCOL_SSLv23``.
    """
    chosen_protocol = ssl.PROTOCOL_SSLv23 if protocol is None else protocol
    context = _SSLContext(chosen_protocol)
    context.load_cert_chain(cert_file, pkey_file)
    return context
def upgradetotls(self):
    """
    Upgrade the connection to TLS over the in-memory BIO queues.

    Repeats the server-side handshake until it completes.
    :return: None
    """
    # TODO: newer TLS version?
    # noinspection PyUnresolvedReferences
    tls_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    # TODO: PLATFORM STAGECERTIFICATEFILE is not the correct name for this value, move to handler or set a different
    # variable in TRANSPORT with the same initial value?
    platform_options = self.handler.platform.options
    pem_path = sanatizefilename(platform_options['STAGECERTIFICATEFILE']['Value'])
    # The same file supplies both the certificate and the private key.
    tls_context.load_cert_chain(certfile=pem_path, keyfile=pem_path)
    self.conn = tls_context.wrap_bio(
        self.recvdataqueue.memorybio,
        self.senddataqueue.memorybio,
        server_side=True,
    )
    print_message("Waiting for connection and TLS handshake...")
    handshake_complete = False
    while not handshake_complete:
        try:
            self.conn.do_handshake()
        except (ssl.SSLWantReadError, ssl.SSLSyscallError):
            # Not enough handshake data yet; try again.
            continue
        handshake_complete = True
    print_message("Upgrade to TLS done")
def upgradetotls(self):
    """
    Replace ``self.conn`` with a server-side TLS wrapper around it.

    :return: None
    """
    print_debug(DEBUG_MODULE, "upgrading to TLS context")
    # TODO: newer TLS version?
    # noinspection PyUnresolvedReferences
    tls_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    # TODO: PLATFORM STAGECERTIFICATEFILE is not the correct name for this value, move to handler or set a different
    # variable in TRANSPORT with the same initial value?
    platform_options = self.handler.platform.options
    pem_path = sanatizefilename(platform_options['STAGECERTIFICATEFILE']['Value'])
    # The same file supplies both the certificate and the private key.
    tls_context.load_cert_chain(certfile=pem_path, keyfile=pem_path)
    plain_conn = self.conn
    self.conn = tls_context.wrap_socket(plain_conn, server_side=True)
    print_message("Upgrade to TLS done")
def __init__(self, *args, **kwargs):
    """Initialise the stream and, if already connected, schedule the handshake.

    The ``ssl_options`` keyword argument may either be an
    `ssl.SSLContext` object or a dictionary of keywords arguments
    for `ssl.wrap_socket`
    """
    self._ssl_options = kwargs.pop('ssl_options', _client_ssl_defaults)
    super(SSLIOStream, self).__init__(*args, **kwargs)
    self._ssl_accepting = True
    self._handshake_reading = self._handshake_writing = False
    self._ssl_connect_callback = self._server_hostname = None

    # If the socket is already connected, attempt to start the handshake.
    try:
        self.socket.getpeername()
    except socket.error:
        return
    # Indirectly start the handshake, which will run on the next
    # IOLoop iteration and then the real IO state will be set in
    # _handle_events.
    self._add_io_state(self.io_loop.WRITE)
def __init__(self, ip, port, repository: Repository):
    """Store the listening address and repository and try to prepare TLS.

    The SSL context is best-effort: it is built from the
    ``SSL_CERTIFICATE`` and ``SSL_KEY`` entries of ``./config.json``. If
    anything goes wrong while loading them, ``self._ssl_context`` is set to
    ``None`` and the failure is logged.

    :param ip: Address stored in ``self._ip``.
    :param port: Port stored in ``self._port``.
    :param repository: Repository instance stored in ``self._repository``.
    """
    self._ip = ip
    self._port = port
    self._loop = asyncio.get_event_loop()
    self._client_protocols = {}
    self._service_protocols = {}
    self._repository = repository
    self._tcp_pingers = {}
    self._http_pingers = {}
    self.logger = logging.getLogger()
    try:
        config = json_file_to_dict('./config.json')
        self._ssl_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        self._ssl_context.load_cert_chain(config['SSL_CERTIFICATE'],
                                          config['SSL_KEY'])
    except Exception as exc:
        # ``Exception`` rather than a bare ``except`` so KeyboardInterrupt
        # and SystemExit still propagate; the fallback stays best-effort.
        self.logger.warning(
            "Could not build SSL context from ./config.json: %s", exc)
        self._ssl_context = None
def _do_ssl_handshake(self):
    """Run a client-side TLS handshake tunnelled through PRELOGIN packets.

    Returns the tuple ``(sslobj, incoming, outgoing)``: the SSL object and
    the two memory BIOs it reads from and writes to.
    """
    incoming, outgoing = ssl.MemoryBIO(), ssl.MemoryBIO()
    sslobj = ssl.SSLContext().wrap_bio(incoming, outgoing, False)

    # do_handshake()
    handshaking = True
    while handshaking:
        try:
            sslobj.do_handshake()
        except ssl.SSLWantReadError:
            # Send what the SSL object produced, then feed it the reply.
            self._send_message(TDS_PRELOGIN, outgoing.read())
            tag, _, _, buf = self._read_response_packet()
            assert tag == TDS_PRELOGIN
            incoming.write(buf)
        else:
            handshaking = False
    return sslobj, incoming, outgoing
def testHttpsContext(self):
    """The connection to the local HTTPS server uses a verifying TLS context."""
    client = httplib2.Http(ca_certs=self.ca_certs_path)

    # Establish connection to local server
    client.request('https://localhost:%d/' % (self.port))

    # Verify that connection uses a TLS context with the correct hostname
    conn = client.connections['https:localhost:%d' % self.port]
    tls_sock = conn.sock

    self.assertIsInstance(tls_sock, ssl.SSLSocket)
    self.assertTrue(hasattr(tls_sock, 'context'))
    tls_context = tls_sock.context
    self.assertIsInstance(tls_context, ssl.SSLContext)
    self.assertTrue(tls_context.check_hostname)
    self.assertEqual(tls_sock.server_hostname, 'localhost')
    self.assertEqual(tls_context.verify_mode, ssl.CERT_REQUIRED)
    self.assertEqual(tls_context.protocol, ssl.PROTOCOL_SSLv23)
def test_ssl_hostname_mismatch_repeat(self):
    """Request a mismatching hostname twice and expect an error both times."""
    # https://github.com/httplib2/httplib2/issues/5

    # FIXME(temoto): as of 2017-01-05 this is only a reference code, not useful test.
    # Because it doesn't provoke described error on my machine.
    # Instead `SSLContext.wrap_socket` raises `ssl.CertificateError`
    # which was also added to original patch.

    # url host is intentionally different, we provoke ssl hostname mismatch error
    url = 'https://127.0.0.1:%d/' % (self.port,)
    http = httplib2.Http(ca_certs=self.ca_certs_path, proxy_info=None)

    def once():
        try:
            http.request(url)
        except Exception as e:
            print('%s errno=%s' % (repr(e), getattr(e, 'errno', None)))
        else:
            # Raised outside the ``try`` body: an assertion placed inside it
            # is itself an ``Exception`` and was swallowed by the handler.
            raise AssertionError('expected certificate hostname mismatch error')

    once()
    once()
def __init__(self, host, port=None, key_file=None, cert_file=None,
             timeout=None, proxy_info=None,
             ca_certs=None, disable_ssl_certificate_validation=False):
    """Set up an HTTPS connection with an explicit SSL context.

    :param host: Host passed to ``HTTPSConnection.__init__``.
    :param port: Port passed to ``HTTPSConnection.__init__``.
    :param key_file: Private key file for the client certificate.
    :param cert_file: Client certificate file.
    :param timeout: Timeout passed to ``HTTPSConnection.__init__``.
    :param proxy_info: Stored on the instance; not otherwise used here.
    :param ca_certs: CA bundle path; falls back to ``CA_CERTS``.
    :param disable_ssl_certificate_validation: When true, the context does
        not verify the peer and hostname checking is turned off.
    :raises CertificateValidationUnsupportedInPython31: if the ``ssl``
        module has no ``SSLContext``.
    """
    # TODO: implement proxy_info
    self.proxy_info = proxy_info
    context = None
    if ca_certs is None:
        ca_certs = CA_CERTS
    if (cert_file or ca_certs):
        if not hasattr(ssl, 'SSLContext'):
            raise CertificateValidationUnsupportedInPython31()
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        if disable_ssl_certificate_validation:
            context.verify_mode = ssl.CERT_NONE
        else:
            context.verify_mode = ssl.CERT_REQUIRED
        if cert_file:
            context.load_cert_chain(cert_file, key_file)
        if ca_certs:
            context.load_verify_locations(ca_certs)
    # ``not`` instead of ``^ True``: XOR misbehaves for truthy values that
    # are not exactly ``True``/``1`` and raises TypeError for non-integers.
    http.client.HTTPSConnection.__init__(
        self, host, port=port, key_file=key_file,
        cert_file=cert_file, timeout=timeout, context=context,
        check_hostname=not disable_ssl_certificate_validation)
def _create_transport_context(server_side, server_hostname):
    """Return a default client-side SSL context.

    Raises ValueError for server-side use, which needs a caller-supplied
    context. Hostname checking is switched off when no *server_hostname*
    is given.
    """
    if server_side:
        raise ValueError('Server side SSL needs a valid SSLContext')

    # Client side may pass ssl=True to use a default
    # context; in that case the sslcontext passed is None.
    # The default is secure for client connections.
    make_default = getattr(ssl, 'create_default_context', None)
    if make_default is not None:
        # Python 3.4+: use up-to-date strong settings.
        default_context = make_default()
        if not server_hostname:
            default_context.check_hostname = False
        return default_context

    # Fallback for Python 3.3.
    fallback_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    fallback_context.options |= ssl.OP_NO_SSLv2
    fallback_context.options |= ssl.OP_NO_SSLv3
    fallback_context.set_default_verify_paths()
    fallback_context.verify_mode = ssl.CERT_REQUIRED
    return fallback_context
def __init__(self, context, server_side, server_hostname=None):
    """
    Set up an unwrapped SSL pipe with fresh in-memory BIOs.

    The *context* argument specifies the ssl.SSLContext to use.

    The *server_side* argument indicates whether this is a server side or
    client side transport.

    The optional *server_hostname* argument can be used to specify the
    hostname you are connecting to. You may only specify this parameter if
    the _ssl module supports Server Name Indication (SNI).
    """
    # Connection parameters supplied by the caller.
    self._context, self._server_side = context, server_side
    self._server_hostname = server_hostname
    # Initial state: nothing wrapped yet, no SSL object.
    self._state = _UNWRAPPED
    self._incoming, self._outgoing = ssl.MemoryBIO(), ssl.MemoryBIO()
    self._sslobj = None
    self._need_ssldata = False
    self._handshake_cb = self._shutdown_cb = None
def start(loop, host, port):
    """Create the server on *loop* (with TLS if ``args.tls``) and wait until it closes."""
    global server

    sslctx = None
    if args.tls:
        import ssl
        # TODO: take cert/key from args as well.
        tests_dir = os.path.join(os.path.dirname(__file__), '..', 'tests')
        cert_path = os.path.join(tests_dir, 'ssl_cert.pem')
        key_path = os.path.join(tests_dir, 'ssl_key.pem')
        sslctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        sslctx.options |= ssl.OP_NO_SSLv2
        sslctx.load_cert_chain(certfile=cert_path, keyfile=key_path)

    server = yield from loop.create_server(Service, host, port, ssl=sslctx)
    banner = 'serving TLS' if sslctx else 'serving'
    listening_on = [sock.getsockname() for sock in server.sockets]
    dprint(banner, listening_on)
    yield from server.wait_closed()
def test_create_unix_server_ssl_verified(self):
    """A verifying client connects to the signed UNIX-socket SSL server."""
    proto = MyProto(loop=self.loop)
    server, path = self._make_ssl_unix_server(lambda: proto, SIGNED_CERTFILE)

    client_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    client_context.options |= ssl.OP_NO_SSLv2
    client_context.verify_mode = ssl.CERT_REQUIRED
    client_context.load_verify_locations(cafile=SIGNING_CA)
    if hasattr(client_context, 'check_hostname'):
        client_context.check_hostname = True

    # Connection succeeds with correct CA and server hostname.
    connecting = self.loop.create_unix_connection(
        MyProto, path,
        ssl=client_context,
        server_hostname='localhost')
    client, pr = self.loop.run_until_complete(connecting)

    # close connection
    proto.transport.close()
    client.close()
    server.close()
    self.loop.run_until_complete(proto.done)
def test_create_server_ssl_verified(self):
    """A verifying client connects to the signed TCP SSL server."""
    proto = MyProto(loop=self.loop)
    server, host, port = self._make_ssl_server(lambda: proto, SIGNED_CERTFILE)

    client_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    client_context.options |= ssl.OP_NO_SSLv2
    client_context.verify_mode = ssl.CERT_REQUIRED
    client_context.load_verify_locations(cafile=SIGNING_CA)
    if hasattr(client_context, 'check_hostname'):
        client_context.check_hostname = True

    # Connection succeeds with correct CA and server hostname.
    connecting = self.loop.create_connection(
        MyProto, host, port,
        ssl=client_context,
        server_hostname='localhost')
    client, pr = self.loop.run_until_complete(connecting)

    # close connection
    proto.transport.close()
    client.close()
    server.close()
    self.loop.run_until_complete(proto.done)
def make_HTTPS_handler(params, **kwargs):
    """Build a YoutubeDLHTTPSHandler using the best SSL setup available.

    Certificate checking is disabled when ``params['nocheckcertificate']``
    is true.
    """
    skip_verification = params.get('nocheckcertificate', False)

    if hasattr(ssl, 'create_default_context'):  # Python >= 3.4 or 2.7.9
        default_context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
        if skip_verification:
            default_context.check_hostname = False
            default_context.verify_mode = ssl.CERT_NONE
        try:
            return YoutubeDLHTTPSHandler(params, context=default_context, **kwargs)
        except TypeError:
            # Python 2.7.8
            # (create_default_context present but HTTPSHandler has no context=)
            pass

    if sys.version_info < (3, 2):
        return YoutubeDLHTTPSHandler(params, **kwargs)

    # Python < 3.4
    legacy_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    if skip_verification:
        legacy_context.verify_mode = ssl.CERT_NONE
    else:
        legacy_context.verify_mode = ssl.CERT_REQUIRED
    legacy_context.set_default_verify_paths()
    return YoutubeDLHTTPSHandler(params, context=legacy_context, **kwargs)
def ssl_wrap_socket(cls, s, ssl_options, server_hostname=None, **kwargs):
    """Returns an ``ssl.SSLSocket`` wrapping the given socket.

    ``ssl_options`` may be either a dictionary (as accepted by
    `ssl_options_to_context`) or an `ssl.SSLContext` object.
    Additional keyword arguments are passed to ``wrap_socket``
    (either the `~ssl.SSLContext` method or the `ssl` module function
    as appropriate).
    """
    context = ssl_options_to_context(ssl_options)

    have_context = hasattr(ssl, 'SSLContext') and isinstance(context, ssl.SSLContext)
    if not have_context:
        # ``context`` is still a plain options dict here.
        return ssl.wrap_socket(s, **dict(context, **kwargs))

    if server_hostname is not None and getattr(ssl, 'HAS_SNI'):
        # Python doesn't have server-side SNI support so we can't
        # really unittest this, but it can be manually tested with
        # python3.2 -m tornado.httpclient https://sni.velox.ch
        return context.wrap_socket(s, server_hostname=server_hostname,
                                   **kwargs)
    return context.wrap_socket(s, **kwargs)
def connect_to_vc(vchost, user, pwd):
    """Connect with ``SmartConnect`` and return the retrieved content.

    :param vchost: ``"host"`` or ``"host:port"``; the port defaults to 443.
    :param user: User name passed to ``SmartConnect``.
    :param pwd: Password passed to ``SmartConnect``.
    :return: Result of ``RetrieveContent()`` on the service instance.
    :raises ValueError: if the port part of ``vchost`` is not an integer.
    """
    # Disabling SSL certificate verification
    if hasattr(ssl, 'SSLContext'):
        context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        context.verify_mode = ssl.CERT_NONE
    else:
        context = None

    # Always hand over an integer port: it used to be a string when parsed
    # from "host:port" but an int when the 443 default applied.
    host, separator, port_text = vchost.partition(':')
    port = int(port_text) if separator else 443

    connect_args = dict(host=host, port=port, user=user, pwd=pwd)
    if context is not None:
        connect_args['sslContext'] = context
    service_instance = SmartConnect(**connect_args)
    return service_instance.RetrieveContent()
def _create_ssl_ctx(self, sslp):
    """Return an SSL context for *sslp*.

    *sslp* is either a ready ``ssl.SSLContext`` (returned unchanged) or a
    mapping with the optional keys ``ca``, ``capath``, ``check_hostname``,
    ``cert``, ``key`` and ``cipher``. Without ``ca`` and ``capath`` the
    context performs no certificate or hostname verification.
    """
    if isinstance(sslp, ssl.SSLContext):
        return sslp

    ca, capath = sslp.get('ca'), sslp.get('capath')
    verify_peer = ca is not None or capath is not None

    ctx = ssl.create_default_context(cafile=ca, capath=capath)
    # check_hostname has to be cleared before verify_mode may be CERT_NONE.
    ctx.check_hostname = verify_peer and sslp.get('check_hostname', True)
    ctx.verify_mode = ssl.CERT_REQUIRED if verify_peer else ssl.CERT_NONE

    if 'cert' in sslp:
        ctx.load_cert_chain(sslp['cert'], keyfile=sslp.get('key'))
    if 'cipher' in sslp:
        ctx.set_ciphers(sslp['cipher'])

    ctx.options |= ssl.OP_NO_SSLv2
    ctx.options |= ssl.OP_NO_SSLv3
    return ctx
def get_ssl_context(self, protocol=None):
    """
    This method returns a SSL context based on the file that was specified
    when this object was created.

    Arguments:
     - An optional protocol. ``ssl.PROTOCOL_SSLv23`` (negotiate the highest
       version both peers support) is used by default. The previous
       default, SSLv2, is insecure and not defined by current ``ssl``
       modules.

    Returns:
     - The SSL context
    """
    # Validate the arguments
    if protocol is None:
        protocol = ssl.PROTOCOL_SSLv23

    # Create an SSL context from the stored file and password.
    ssl_context = ssl.SSLContext(protocol)
    ssl_context.load_cert_chain(self._cert_filename,
                                password=self._password)

    # Return the context
    return ssl_context
def __init__(self, target):
    """Open an HTTP(S) session object for *target*.

    :param target: URL of the form ``protocol://host[:port]/path``. The
        port, when present, is now honoured (it used to be dropped), and
        both the port and the path are optional.
    """
    # Target comes as protocol://target:port/path
    self.target = target
    proto, _, remainder = target.partition('://')
    # ``netloc`` keeps any ":port" suffix; the connection classes parse it.
    netloc, _, path = remainder.partition('/')
    self.path = '/' + path
    if proto.lower() == 'https':
        # Create unverified (insecure) context
        try:
            uv_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
            self.session = HTTPSConnection(netloc, context=uv_context)
        except AttributeError:
            # This does not exist on python < 2.7.11
            self.session = HTTPSConnection(netloc)
    else:
        self.session = HTTPConnection(netloc)
    self.lastresult = None
def __init__(self, host, port=None, key_file=None, cert_file=None,
             timeout=None, proxy_info=None,
             ca_certs=None, disable_ssl_certificate_validation=False):
    """Set up an HTTPS connection with an explicit SSL context.

    :param host: Host passed to ``HTTPSConnection.__init__``.
    :param port: Port passed to ``HTTPSConnection.__init__``.
    :param key_file: Private key file for the client certificate.
    :param cert_file: Client certificate file.
    :param timeout: Timeout passed to ``HTTPSConnection.__init__``.
    :param proxy_info: Stored on the instance; not otherwise used here.
    :param ca_certs: CA bundle path; falls back to ``CA_CERTS``.
    :param disable_ssl_certificate_validation: When true, the context does
        not verify the peer and hostname checking is turned off.
    :raises CertificateValidationUnsupportedInPython31: if validation is
        requested but the ``ssl`` module has no ``SSLContext``.
    """
    self.proxy_info = proxy_info
    context = None
    if ca_certs is None:
        ca_certs = CA_CERTS
    if cert_file or ca_certs:
        if hasattr(ssl, 'SSLContext'):
            context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
            # Build the context in both cases: with ``context=None`` the
            # base class falls back to its own verifying default, so the
            # "disable" flag used to have no effect.
            if disable_ssl_certificate_validation:
                context.verify_mode = ssl.CERT_NONE
            else:
                context.verify_mode = ssl.CERT_REQUIRED
            if cert_file:
                context.load_cert_chain(cert_file, key_file)
            if ca_certs:
                context.load_verify_locations(ca_certs)
        elif not disable_ssl_certificate_validation:
            raise CertificateValidationUnsupportedInPython31()
    http.client.HTTPSConnection.__init__(
        self, host, port=port, key_file=key_file,
        cert_file=cert_file, timeout=timeout, context=context,
        check_hostname=not disable_ssl_certificate_validation)
def start_connection(self) -> None:
    """Open the FTPS connection described by ``self.settings.ftps``.

    With ``no_certificate_check`` set, a TLS 1.2 context without
    certificate or hostname verification is used; otherwise the default
    verifying context is used.
    """
    ftps_settings = self.settings.ftps

    if ftps_settings['no_certificate_check']:
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
        context.verify_mode = ssl.CERT_NONE
        context.check_hostname = False
    else:
        context = ssl.create_default_context()

    self.ftps = FTP_TLS(
        host=ftps_settings['address'],
        user=ftps_settings['user'],
        passwd=ftps_settings['passwd'],
        context=context,
        source_address=ftps_settings['source_address'],
        timeout=self.settings.timeout_timer,
    )

    # Hath downloads
    self.ftps.prot_p()
def serve(self):
    """Initialise, start the challenge thread and run the websocket server.

    The server is given an SSL context loaded from ``self.ssl_cert`` when
    ``self.ssl_on`` is set. Runs the event loop until KeyboardInterrupt.
    """
    self.initialize()
    self.challenge_thread = ChallengeThread.ChallengeThread(self)
    self.challenge_thread.start()

    serve_options = {}
    if self.ssl_on:
        ssl_context = ssl.SSLContext(protocol=ssl.PROTOCOL_SSLv23)
        ssl_context.load_cert_chain(self.ssl_cert)
        serve_options['ssl'] = ssl_context
    self.server_socket = websockets.serve(
        self.handle_connection, self.listen_address, self.port,
        **serve_options)

    try:
        asyncio.get_event_loop().run_until_complete(self.server_socket)
        asyncio.get_event_loop().run_forever()
    except KeyboardInterrupt:
        print("Closing the server")
        asyncio.get_event_loop().close()
def __init__(self, address, username='root', password='ca$hc0w', port=443, sslContext=None):
    """Record the connection parameters and initialise empty caches.

    When no ``sslContext`` is supplied, a context with certificate
    verification turned off is created.
    """
    self.address, self.username = address, username
    self.password, self.port = password, port

    if sslContext:
        self.sslContext = sslContext
    else:
        self.sslContext = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        self.sslContext.verify_mode = ssl.CERT_NONE

    self._apiType = ''
    self._vms, self._datacenters = dict(), dict()
    self._hosts, self._datastores = dict(), dict()
def __init__(self, host, port=None, key_file=None, cert_file=None,
             strict=_strict_sentinel, timeout=socket._GLOBAL_DEFAULT_TIMEOUT,
             source_address=None, **_3to2kwargs):
    """Initialise the HTTPS connection and settle its SSL context.

    ``check_hostname`` and ``context`` are taken from the extra keyword
    arguments. Raises ValueError when ``check_hostname`` is requested with
    a context that does not verify certificates.
    """
    check_hostname = _3to2kwargs.pop('check_hostname', None)
    context = _3to2kwargs.pop('context', None)

    super(HTTPSConnection, self).__init__(host, port, strict, timeout,
                                          source_address)
    self.key_file = key_file
    self.cert_file = cert_file

    if context is None:
        # Some reasonable defaults
        context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        context.options |= ssl.OP_NO_SSLv2

    verifies_peer = context.verify_mode != ssl.CERT_NONE
    if check_hostname is None:
        check_hostname = verifies_peer
    elif check_hostname and not verifies_peer:
        raise ValueError("check_hostname needs a SSL context with "
                         "either CERT_OPTIONAL or CERT_REQUIRED")

    if key_file or cert_file:
        context.load_cert_chain(cert_file, key_file)

    self._context = context
    self._check_hostname = check_hostname
def make_https_server(case, certfile=CERTFILE, host=HOST, handler_class=None):
    """Start an HTTPS server thread and register its shutdown as a cleanup on *case*.

    Returns the started server thread.
    """
    # we assume the certfile contains both private key and certificate
    context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    context.load_cert_chain(certfile)
    server = HTTPSServerThread(context, host, handler_class)

    # Block until the server thread sets the event passed to start().
    ready = threading.Event()
    server.start(ready)
    ready.wait()

    def announce(message):
        if support.verbose:
            sys.stdout.write(message)

    def cleanup():
        announce('stopping HTTPS server\n')
        server.stop()
        announce('joining HTTPS thread\n')
        server.join()

    case.addCleanup(cleanup)
    return server
def urlopen(url, data=None, timeout=socket._GLOBAL_DEFAULT_TIMEOUT, **_3to2kwargs):
    """Open *url*, optionally verifying HTTPS against the given CA settings.

    ``cafile``, ``capath`` and ``cadefault`` are read from the extra keyword
    arguments. If any is set, a one-off opener with a verifying HTTPS
    handler is used; otherwise the shared module-level opener is used and
    created on first call.
    """
    global _opener

    cadefault = _3to2kwargs.pop('cadefault', False)
    capath = _3to2kwargs.pop('capath', None)
    cafile = _3to2kwargs.pop('cafile', None)

    if not (cafile or capath or cadefault):
        if _opener is None:
            _opener = build_opener()
        return _opener.open(url, data, timeout)

    if not _have_ssl:
        raise ValueError('SSL support not available')
    context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    context.options |= ssl.OP_NO_SSLv2
    context.verify_mode = ssl.CERT_REQUIRED
    if cafile or capath:
        context.load_verify_locations(cafile, capath)
    else:
        context.set_default_verify_paths()
    https_handler = HTTPSHandler(context=context, check_hostname=True)
    return build_opener(https_handler).open(url, data, timeout)
def setup_client(self):
    """Create and configure the MQTT client if there is none yet.

    Returns True when a client was created, and False when one already
    exists or paho-mqtt is not installed.
    """
    if self.client is not None:
        return False

    if not HAVE_MQTT:
        print("Please install paho-mqtt 'pip install paho-mqtt' "
              "to use this library")
        return False

    self.client = mqtt.Client(
        client_id=self.blid, clean_session=self.clean,
        protocol=mqtt.MQTTv311)
    # Assign event callbacks
    self.client.on_message = self.on_message
    self.client.on_connect = self.on_connect
    self.client.on_publish = self.on_publish
    self.client.on_subscribe = self.on_subscribe
    self.client.on_disconnect = self.on_disconnect

    # Uncomment to enable debug messages
    # client.on_log = self.on_log

    # set TLS, self.cert_name is required by paho-mqtt, even if the
    # certificate is not used...
    # but v1.3 changes all this, so have to do the following:

    self.log.info("Seting TLS")
    try:
        self.client.tls_set(
            self.cert_name, cert_reqs=ssl.CERT_NONE,
            tls_version=ssl.PROTOCOL_TLSv1)
    except ValueError:   # try V1.3 version
        self.log.warn("TLS Setting failed - trying 1.3 version")
        self.client._ssl_context = None
        fallback_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        fallback_context.verify_mode = ssl.CERT_NONE
        fallback_context.load_default_certs()
        self.client.tls_set_context(fallback_context)

    # disables peer verification
    self.client.tls_insecure_set(True)
    self.client.username_pw_set(self.blid, self.password)
    return True
def wrap_socket(self, sock):
    """Wrap a socket in an SSL context (see `ssl.wrap_socket`)

    The wrapping callable is built on first use and cached on the instance.

    :param sock: Plain socket
    :type sock: :class:`socket.socket`
    """
    if self._wrap_socket is not None:
        return self._wrap_socket(sock)

    if not hasattr(ssl, 'SSLContext'):
        self._wrap_socket = self._legacy_wrap_socket()
        return self._wrap_socket(sock)

    context = ssl.create_default_context(cafile=self.cafile)
    context.check_hostname = False
    if self.certfile is not None:
        context.load_cert_chain(certfile=self.certfile,
                                keyfile=self.keyfile,
                                password=self.password)
    self._wrap_socket = context.wrap_socket
    return self._wrap_socket(sock)