我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用ssl.create_default_context()。
def init_mtls(config):
    """Build the mutual-TLS context for this node.

    Reads ``config["ca_dir"]`` and ``config["ip"]``, prompts on the terminal
    for the keystore password, generates the node certificate through
    ``ca_util`` when the certificate file is missing, and returns an
    ``ssl.SSLContext`` that requires a peer certificate.
    """
    logger.info("Setting up mTLS...")

    tls_dir = config["ca_dir"]
    # A path that does not start with '/' is resolved against WORK_DIR.
    if tls_dir[0] != '/':
        tls_dir = os.path.abspath('%s/%s' % (common.WORK_DIR, tls_dir))

    # We need to securely pull in the ca password
    key_password = getpass.getpass("Please enter the password to decrypt your keystore: ")
    ca_util.setpassword(key_password)

    node_ip = config["ip"]
    cert_file = "%s/%s-cert.crt" % (tls_dir, node_ip)

    # Create HIL Server Connect certs (if not already present)
    if not os.path.exists(cert_file):
        logger.info("Generating new Node Monitor TLS Certs in %s for connecting" % tls_dir)
        ca_util.cmd_mkcert(tls_dir, node_ip)

    ca_file = "%s/cacert.crt" % (tls_dir)
    key_file = "%s/%s-private.pem" % (tls_dir, node_ip)

    mtls_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
    mtls_context.load_verify_locations(cafile=ca_file)
    mtls_context.load_cert_chain(certfile=cert_file, keyfile=key_file,
                                 password=key_password)
    mtls_context.verify_mode = ssl.CERT_REQUIRED
    return mtls_context
def urlopen(url, data=None, timeout=socket._GLOBAL_DEFAULT_TIMEOUT,
            cafile=None, capath=None, cadefault=False, context=None):
    """Open *url* and return whatever the selected opener's ``open()`` returns.

    When any of *cafile*, *capath* or *cadefault* is given, a fresh
    server-auth SSL context is built from them; combining those with an
    explicit *context* raises ``ValueError``, as does requesting them when
    SSL support is unavailable. With only *context*, a dedicated opener is
    built around it. Otherwise the module-level cached opener is used
    (and created on first use).
    """
    global _opener

    ca_requested = cafile or capath or cadefault
    if ca_requested:
        if context is not None:
            raise ValueError(
                "You can't pass both context and any of cafile, capath, and "
                "cadefault"
            )
        if not _have_ssl:
            raise ValueError('SSL support not available')
        context = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH,
                                             cafile=cafile,
                                             capath=capath)

    if ca_requested or context:
        # One-off opener carrying the caller's (or freshly built) context.
        opener = build_opener(HTTPSHandler(context=context))
    else:
        if _opener is None:
            _opener = build_opener()
        opener = _opener

    return opener.open(url, data, timeout)
def __init__(self, auth_url, username, tenant, loop=None, log=None,
             cafile=None, token_renew_delay=3300):
    """Store the connection settings and open an aiohttp client session.

    :param auth_url: stored as ``self.auth_url``.
    :param username: stored as ``self.username``.
    :param tenant: stored as ``self.tenant``.
    :param loop: event loop to use; defaults to ``asyncio.get_event_loop()``.
    :param log: stored as ``self.log``.
    :param cafile: optional CA bundle path; when given, the session uses a
        connector whose SSL context trusts that bundle.
    :param token_renew_delay: stored as ``self.token_renew_delay``.
    """
    self.auth_url = auth_url
    self.username = username
    self.tenant = tenant
    self.log = log
    self.token_renew_delay = token_renew_delay
    self.loop = loop or asyncio.get_event_loop()
    self.headers = {"content-type": "application/json",
                    "accept": "application/json"}
    if cafile:
        sslcontext = ssl.create_default_context(cafile=cafile)
        conn = aiohttp.TCPConnector(ssl_context=sslcontext)
        self.session = aiohttp.ClientSession(connector=conn, loop=self.loop)
    else:
        # Must be stored on the instance: the original bound a local
        # ``session`` here, leaving ``self.session`` undefined.
        self.session = aiohttp.ClientSession(loop=self.loop)
def _create_transport_context(server_side, server_hostname):
    """Return a default client-side SSL context.

    Raises ``ValueError`` when *server_side* is true, because a server
    needs an explicitly configured context. Hostname checking is turned
    off when *server_hostname* is falsy.
    """
    if server_side:
        raise ValueError('Server side SSL needs a valid SSLContext')

    # Client side may pass ssl=True to use a default context; in that case
    # the sslcontext passed is None. The default is secure for client
    # connections.
    if not hasattr(ssl, 'create_default_context'):
        # Fallback for Python 3.3.
        legacy_ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        legacy_ctx.options |= ssl.OP_NO_SSLv2
        legacy_ctx.options |= ssl.OP_NO_SSLv3
        legacy_ctx.set_default_verify_paths()
        legacy_ctx.verify_mode = ssl.CERT_REQUIRED
        return legacy_ctx

    # Python 3.4+: use up-to-date strong settings.
    default_ctx = ssl.create_default_context()
    if not server_hostname:
        default_ctx.check_hostname = False
    return default_ctx
def make_HTTPS_handler(params, **kwargs):
    """Create a ``YoutubeDLHTTPSHandler`` suited to the running interpreter.

    Certificate and hostname verification are disabled when
    ``params['nocheckcertificate']`` is truthy. Three construction paths
    are tried in order: a default context, no context at all (interpreters
    older than 3.2), and a hand-built TLSv1 context.
    """
    skip_verification = params.get('nocheckcertificate', False)

    if hasattr(ssl, 'create_default_context'):  # Python >= 3.4 or 2.7.9
        default_ctx = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
        if skip_verification:
            default_ctx.check_hostname = False
            default_ctx.verify_mode = ssl.CERT_NONE
        try:
            return YoutubeDLHTTPSHandler(params, context=default_ctx, **kwargs)
        except TypeError:
            # Python 2.7.8
            # (create_default_context present but HTTPSHandler has no context=)
            pass

    if sys.version_info < (3, 2):
        return YoutubeDLHTTPSHandler(params, **kwargs)

    # Python < 3.4
    legacy_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    if skip_verification:
        legacy_ctx.verify_mode = ssl.CERT_NONE
    else:
        legacy_ctx.verify_mode = ssl.CERT_REQUIRED
    legacy_ctx.set_default_verify_paths()
    return YoutubeDLHTTPSHandler(params, context=legacy_ctx, **kwargs)
def _create_ssl_ctx(self, sslp):
    """Turn *sslp* into an ``ssl.SSLContext``.

    An existing ``SSLContext`` is returned unchanged. Otherwise *sslp* is
    read as a mapping with optional keys ``ca``, ``capath``,
    ``check_hostname``, ``cert``, ``key`` and ``cipher``. When neither
    ``ca`` nor ``capath`` is supplied, certificate verification and
    hostname checking are both switched off.
    """
    if isinstance(sslp, ssl.SSLContext):
        return sslp

    ca_file = sslp.get('ca')
    ca_dir = sslp.get('capath')
    have_ca = ca_file is not None or ca_dir is not None

    context = ssl.create_default_context(cafile=ca_file, capath=ca_dir)
    # check_hostname has to be cleared before verify_mode may drop to
    # CERT_NONE, so the assignment order matters.
    context.check_hostname = have_ca and sslp.get('check_hostname', True)
    if have_ca:
        context.verify_mode = ssl.CERT_REQUIRED
    else:
        context.verify_mode = ssl.CERT_NONE

    if 'cert' in sslp:
        context.load_cert_chain(sslp['cert'], keyfile=sslp.get('key'))
    if 'cipher' in sslp:
        context.set_ciphers(sslp['cipher'])

    context.options |= ssl.OP_NO_SSLv2
    context.options |= ssl.OP_NO_SSLv3
    return context
def __init__(self, timeout=None, proxy=None, cacert=None, sessions=False):
    """Set up a urllib2-based opener for this transport.

    Raises ``RuntimeError`` for options this transport cannot honour:
    a *timeout* when the ``timeout`` feature is unsupported, any *proxy*,
    and any *cacert*. On interpreters matching the version gate below, the
    HTTPS handler is built with certificate and hostname verification
    disabled. When *sessions* is true a cookie processor is installed.
    """
    if timeout is not None and not self.supports_feature('timeout'):
        raise RuntimeError('timeout is not supported with urllib2 transport')
    if proxy:
        raise RuntimeError('proxy is not supported with urllib2 transport')
    if cacert:
        raise RuntimeError('cacert is not support with urllib2 transport')

    version = sys.version_info
    major = version[0]
    context_capable = ((major == 2 and version >= (2, 7, 9)) or
                       (major == 3 and version >= (3, 2, 0)))

    opener_handlers = []
    if context_capable:
        unverified_ctx = ssl.create_default_context()
        unverified_ctx.check_hostname = False
        unverified_ctx.verify_mode = ssl.CERT_NONE
        opener_handlers.append(urllib2.HTTPSHandler(context=unverified_ctx))
    if sessions:
        opener_handlers.append(urllib2.HTTPCookieProcessor(CookieJar()))

    self.request_opener = urllib2.build_opener(*opener_handlers).open
    self._timeout = timeout
def urlopen_maybe_no_check_cert(URL):
    """
    Similar to urllib.request.urlopen, but disables certificate verification on
    Mac.

    The request uses a 10 second timeout. If ``urlopen`` raises ``TypeError``
    when given a ``context`` argument, the call is retried without it.
    """
    from urllib.request import urlopen

    ssl_context = None
    if mac():
        from ssl import create_default_context, CERT_NONE
        ssl_context = create_default_context()
        ssl_context.check_hostname = False
        ssl_context.verify_mode = CERT_NONE

    timeout_seconds = 10
    try:
        return urlopen(URL, timeout=timeout_seconds, context=ssl_context)
    except TypeError:
        return urlopen(URL, timeout=timeout_seconds)
def __init__(self, username, password, provider):
    """Constructor used to init the config and authenticate.

    Loads the provider configuration, prepares a cookie-aware URL opener
    whose HTTPS handler does not verify certificates or hostnames, and
    stores the result of ``self.authenticate()`` in ``self.is_logged``.
    """
    self.config = oneandoneemailconfig.OneAndOneConfig(provider)
    self.url = self.config.get_config()
    self.account_cached = False
    self.oneandoneuser = username
    self.onenandonepassword = password
    self.headers = {'User-Agent': EmailAccountManager.userAgent}
    self.cookies = http.cookiejar.LWPCookieJar()

    unverified_ctx = ssl.create_default_context()
    unverified_ctx.check_hostname = False
    unverified_ctx.verify_mode = ssl.CERT_NONE

    # The context-carrying HTTPS handler is passed first; a plain
    # HTTPSHandler is still passed after it, exactly as before.
    opener_handlers = [
        urllib.request.HTTPSHandler(context=unverified_ctx),
        urllib.request.HTTPHandler(),
        urllib.request.HTTPSHandler(),
        urllib.request.HTTPCookieProcessor(self.cookies),
    ]
    self.opener = urllib.request.build_opener(*opener_handlers)
    self.is_logged = self.authenticate()
def kubectl_or_oc(server: str) -> str:
    """
    Return "kubectl" or "oc", the command-line tool we should use.

    "oc" is chosen only when an ``oc`` executable is found and the server
    answers the ``/version/openshift`` endpoint without an HTTP error.
    The probe does not verify the server certificate or hostname.

    :param server: The URL of the cluster API server.
    """
    if which("oc") is None:
        return "kubectl"

    # We've got oc, and possibly kubectl as well. We only want oc for
    # OpenShift servers, so check for an OpenShift API endpoint:
    probe_ctx = ssl.create_default_context()
    probe_ctx.check_hostname = False
    probe_ctx.verify_mode = ssl.CERT_NONE

    probe_url = server + "/version/openshift"
    try:
        with urlopen(probe_url, context=probe_ctx) as reply:
            reply.read()
    except HTTPError:
        return "kubectl"
    return "oc"
def get_opener():
    """Build a urllib2 opener that routes HTTP and HTTPS through the local proxy.

    When the ssl module offers ``create_default_context``, an HTTPS handler
    is added whose context trusts ``<data_root>/gae_proxy/CA.crt`` if that
    file exists (otherwise ``cafile`` is passed as None).
    """
    import ssl

    autoproxy = '127.0.0.1:8087'
    opener_handlers = [urllib2.ProxyHandler({'http': autoproxy, 'https': autoproxy})]

    if getattr(ssl, "create_default_context", None):
        ca_path = os.path.join(data_root, "gae_proxy", "CA.crt")
        cafile = ca_path if os.path.isfile(ca_path) else None
        context = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH,
                                             cafile=cafile)
        opener_handlers.append(urllib2.HTTPSHandler(context=context))

    return urllib2.build_opener(*opener_handlers)
def start_connection(self) -> None:
    """Open the FTPS connection described by ``self.settings.ftps``.

    With ``no_certificate_check`` set, a TLS 1.2 context without
    certificate or hostname verification is used; otherwise the default
    verifying context. The result is stored in ``self.ftps`` and ``prot_p()``
    is called on it.
    """
    ftps_cfg = self.settings.ftps

    if not ftps_cfg['no_certificate_check']:
        tls_context = ssl.create_default_context()
    else:
        tls_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
        tls_context.verify_mode = ssl.CERT_NONE
        tls_context.check_hostname = False

    self.ftps = FTP_TLS(
        host=self.settings.ftps['address'],
        user=self.settings.ftps['user'],
        passwd=self.settings.ftps['passwd'],
        context=tls_context,
        source_address=self.settings.ftps['source_address'],
        timeout=self.settings.timeout_timer,
    )

    # Hath downloads
    self.ftps.prot_p()
def server_thread_fn():
    """Echo data over a TLS-wrapped ``server_sock`` until the peer sends EOF.

    On EOF the function waits for ``client_done`` and then calls
    ``unwrap()`` on the TLS socket before returning.
    """
    tls_ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
    tls_ctx.load_cert_chain("trio-test-1.pem")
    tls_sock = tls_ctx.wrap_socket(
        server_sock,
        server_side=True,
        suppress_ragged_eofs=False,
    )

    chunk = tls_sock.recv(4096)
    print("server got:", chunk)
    while chunk:
        tls_sock.sendall(chunk)
        chunk = tls_sock.recv(4096)
        print("server got:", chunk)

    # An empty read ended the loop.
    print("server waiting for client to finish everything")
    client_done.wait()
    print("server attempting to send back close-notify")
    tls_sock.unwrap()
    print("server ok")
async def test_ssl_handshake_failure_during_aclose():
    """aclose() on a never-handshaken stream must surface the handshake failure.

    Declared ``async def`` because the body uses ``async with`` and
    ``await``, which are not valid inside a plain function.
    """
    # Weird scenario: aclose() triggers an automatic handshake, and this
    # fails. This also exercises a bit of code in aclose() that was otherwise
    # uncovered, for re-raising exceptions after calling aclose_forcefully on
    # the underlying transport.
    async with ssl_echo_server_raw(expect_fail=True) as sock:
        # Don't configure trust correctly
        client_ctx = stdlib_ssl.create_default_context()
        s = tssl.SSLStream(
            sock, client_ctx, server_hostname="trio-test-1.example.org"
        )
        # It's a little unclear here whether aclose should swallow the error
        # or let it escape. We *do* swallow the error if it arrives when we're
        # sending close_notify, because both sides closing the connection
        # simultaneously is allowed. But I guess when https_compatible=False
        # then it's bad if we can get through a whole connection with a peer
        # that has no valid certificate, and never raise an error.
        with pytest.raises(BrokenStreamError):
            await s.aclose()
def __init__(self, url, proxy, cafile):
    """Remember the target URL and prepare proxy / CA settings.

    A truthy *proxy* installs a global urllib2 opener that sends HTTPS
    traffic through it. A truthy *cafile* (when the ssl module provides
    ``create_default_context``) yields an SSL context stored under
    ``self.kwargs['context']``; otherwise ``self.kwargs`` stays empty.
    """
    self.url = url
    self.proxy = proxy
    self.kwargs = {}

    if proxy:
        logging.info("Using HTTPS proxy: " + proxy)
        https_proxy = urllib2.ProxyHandler({'https': proxy})
        urllib2.install_opener(urllib2.build_opener(https_proxy))

    if cafile and hasattr(ssl, "create_default_context"):
        logging.info("Using CA file: " + cafile)
        trust_ctx = ssl.create_default_context()
        trust_ctx.load_verify_locations(cafile=cafile)
        self.kwargs['context'] = trust_ctx

# given an infoMap returned by the local node, call up the home server
def parse_and_run(args=None):
    """Parse command-line options and start ``app``.

    :param args: argument list handed to ``ArgumentParser.parse_args``
        (None lets argparse read ``sys.argv``).

    With ``--ssl`` the server certificate and key are loaded from the
    ``ssl`` directory under ``BASE_DIR`` and the session type is set to
    ``filesystem`` before the app is run with that context.
    """
    cert_dir = path.join(BASE_DIR, 'ssl')

    parser = ArgumentParser()
    parser.add_argument('--bind', '-b', action='store',
                        help='the address to bind to', default='127.0.0.1')
    parser.add_argument('--port', '-p', action='store', type=int,
                        help='the port to listen on', default=8080)
    parser.add_argument('--debug', '-d', action='store_true',
                        help='enable debugging (use with caution)',
                        default=False)
    parser.add_argument('--ssl', '-s', action='store_true',
                        help='enable ssl', default=False)
    options = parser.parse_args(args)

    run_kwargs = dict(host=options.bind, port=options.port, debug=options.debug)
    if options.ssl:
        server_ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
        server_ctx.load_cert_chain(path.join(cert_dir, 'server.crt'),
                                   path.join(cert_dir, 'server.key'))
        app.config['SESSION_TYPE'] = 'filesystem'
        run_kwargs['ssl_context'] = server_ctx

    app.run(**run_kwargs)
def parse_and_run(args=None):
    """Parse command-line options and start ``app``.

    :param args: argument list handed to ``ArgumentParser.parse_args``
        (None lets argparse read ``sys.argv``).

    With ``--ssl`` the server certificate and key are loaded from
    ``Hermes/Server/ssl`` next to this file and the session type is set to
    ``filesystem`` before the app is run with that context.
    """
    here = path.abspath(path.dirname(__file__))
    cert_dir = path.join(here, 'Hermes', 'Server', 'ssl')

    parser = ArgumentParser()
    parser.add_argument('--bind', '-b', action='store',
                        help='the address to bind to', default='127.0.0.1')
    parser.add_argument('--port', '-p', action='store', type=int,
                        help='the port to listen on', default=8080)
    parser.add_argument('--debug', '-d', action='store_true',
                        help='enable debugging (use with caution)',
                        default=False)
    parser.add_argument('--ssl', '-s', action='store_true',
                        help='enable ssl', default=False)
    options = parser.parse_args(args)

    run_kwargs = dict(host=options.bind, port=options.port, debug=options.debug)
    if options.ssl:
        server_ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
        server_ctx.load_cert_chain(path.join(cert_dir, 'server.crt'),
                                   path.join(cert_dir, 'server.key'))
        app.config['SESSION_TYPE'] = 'filesystem'
        run_kwargs['ssl_context'] = server_ctx

    app.run(**run_kwargs)
def __init__(self, cfg):
    """Connect to the IMAP server described by ``cfg['server']`` and log in.

    Reads ``hostname``, ``username`` and the optional ``port`` and
    ``ca_file`` entries; the password comes from ``secrets.get_password``.
    """
    super().__init__(cfg)

    server_cfg = cfg['server']

    # Use default client behavior if ca_file not provided.
    if 'ca_file' in server_cfg:
        tls_context = ssl.create_default_context(cafile=server_cfg['ca_file'])
    else:
        tls_context = None

    self._conn = imapclient.IMAPClient(
        server_cfg['hostname'],
        use_uid=True,
        ssl=True,
        port=server_cfg.get('port'),
        ssl_context=tls_context,
    )

    login_name = server_cfg['username']
    login_secret = secrets.get_password(cfg)
    self._conn.login(login_name, login_secret)
def wrap_socket(self, sock):
    """Wrap a socket in an SSL context (see `ssl.wrap_socket`)

    The wrapping callable is created on first use and cached in
    ``self._wrap_socket``. The context it comes from trusts ``self.cafile``
    and has hostname checking disabled; a client certificate is loaded when
    ``self.certfile`` is set.

    :param sock: Plain socket
    :type sock: :class:`socket.socket`
    """
    wrapper = self._wrap_socket
    if wrapper is None:
        if not hasattr(ssl, 'SSLContext'):
            wrapper = self._legacy_wrap_socket()
        else:
            context = ssl.create_default_context(cafile=self.cafile)
            context.check_hostname = False
            if self.certfile is not None:
                context.load_cert_chain(certfile=self.certfile,
                                        keyfile=self.keyfile,
                                        password=self.password)
            wrapper = context.wrap_socket
        self._wrap_socket = wrapper
    return self._wrap_socket(sock)
def create_socket(self):
    """Create ``self.sock`` according to the global ``config``.

    Chooses IPv6 or IPv4, optionally goes through a SOCKS5 proxy given as
    ``host:port``, optionally binds to the configured vhost, and optionally
    wraps the socket in TLS (with or without certificate verification).
    """
    conn_cfg = config.connection
    family = socket.AF_INET6 if conn_cfg.ipv6 else socket.AF_INET

    if not conn_cfg.proxy:
        self.sock = socket.socket(family, socket.SOCK_STREAM)
    else:
        proxy_host, proxy_port = conn_cfg.proxy.split(':')
        self.sock = socks.socksocket(family, socket.SOCK_STREAM)
        self.sock.setblocking(0)
        self.sock.settimeout(15)
        self.sock.setproxy(socks.PROXY_TYPE_SOCKS5, proxy_host, int(proxy_port))

    if conn_cfg.vhost:
        self.sock.bind((conn_cfg.vhost, 0))

    if not conn_cfg.ssl:
        return

    tls_ctx = ssl.create_default_context()
    if config.cert.file:
        tls_ctx.load_cert_chain(config.cert.file, config.cert.key,
                                config.cert.password)
    if conn_cfg.ssl_verify:
        tls_ctx.verify_mode = ssl.CERT_REQUIRED
        tls_ctx.load_default_certs()
    else:
        tls_ctx.check_hostname = False
        tls_ctx.verify_mode = ssl.CERT_NONE
    # NOTE(review): no server_hostname is passed here while the verify
    # branch leaves check_hostname at its default - confirm this path works.
    self.sock = tls_ctx.wrap_socket(self.sock)
def is_SOAP(ip, port, timeout):
    """Probe ``ip:port`` over HTTPS, then HTTP, for the marker ``rO0AB``.

    The marker is looked for in a successful response body, or in the body
    of an HTTP 500 error response.

    :param ip: host to probe.
    :param port: port to probe.
    :param timeout: per-request timeout passed to ``urllib2.urlopen``.
    :returns: ``(1, True)`` if the marker was found over HTTPS,
        ``(0, True)`` if found over HTTP, ``(2, False)`` otherwise.

    Probe failures are deliberately ignored (best effort), but only
    ``Exception`` subclasses are swallowed so Ctrl-C and interpreter exit
    still propagate.
    """
    try:
        # The HTTPS probe does not verify the certificate or hostname.
        ctx = ssl.create_default_context()
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE
        output = urllib2.urlopen('https://' + ip + ":" + str(port),
                                 context=ctx, timeout=timeout).read()
        if "rO0AB" in output:
            return (1, True)
    except urllib2.HTTPError as e:
        if e.getcode() == 500 and "rO0AB" in e.read():
            return (1, True)
    except Exception:
        pass

    try:
        output = urllib2.urlopen('http://' + ip + ":" + str(port),
                                 timeout=timeout).read()
        if "rO0AB" in output:
            return (0, True)
    except urllib2.HTTPError as e:
        if e.getcode() == 500 and "rO0AB" in e.read():
            return (0, True)
    except Exception:
        pass

    return (2, False)
def make_https_server(case, context=None, certfile=CERTFILE, host=HOST,
                      handler_class=None):
    """Start an ``HTTPSServerThread`` and register its shutdown on *case*.

    A default context is created when *context* is None; *certfile* is
    loaded into whichever context is used. The call blocks until the server
    thread sets its start-up event, then returns the server object.
    """
    if context is None:
        context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
    # We assume the certfile contains both private key and certificate
    context.load_cert_chain(certfile)

    https_thread = HTTPSServerThread(context, host, handler_class)
    started = threading.Event()
    https_thread.start(started)
    started.wait()

    def cleanup():
        verbose = support.verbose
        if verbose:
            sys.stdout.write('stopping HTTPS server\n')
        https_thread.stop()
        if support.verbose:
            sys.stdout.write('joining HTTPS thread\n')
        https_thread.join()

    case.addCleanup(cleanup)
    return https_thread
def init_from_url(self, snapshot=-1, thingpedia_url=None):
    """Load device and entity data for *snapshot* from a Thingpedia server.

    :param snapshot: snapshot identifier appended to both request URLs.
    :param thingpedia_url: base URL; when None, the ``THINGPEDIA_URL``
        environment variable is used, falling back to the public server.

    The ``data`` member of each JSON response is handed to
    ``self._process_devices`` and ``self._process_entities`` respectively.
    """
    if thingpedia_url is None:
        thingpedia_url = os.getenv('THINGPEDIA_URL',
                                   'https://thingpedia.stanford.edu/thingpedia')

    tls_context = ssl.create_default_context()
    snapshot_id = str(snapshot)

    devices_url = thingpedia_url + '/api/snapshot/' + snapshot_id + '?meta=1'
    with urllib.request.urlopen(devices_url, context=tls_context) as response:
        devices_payload = json.load(response)
        self._process_devices(devices_payload['data'])

    entities_url = thingpedia_url + '/api/entities?snapshot=' + snapshot_id
    with urllib.request.urlopen(entities_url, context=tls_context) as response:
        entities_payload = json.load(response)
        self._process_entities(entities_payload['data'])
def run():
    """Start the server: load config, listen, drop privileges, load models.

    Steps, in order: seed NumPy's RNG, load ``./server.conf``, create the
    query thread pool, start listening (with TLS when ``config.ssl_key`` is
    set), switch group and user when ``config.user`` is set, notify the
    service manager if ``sd`` is available, start the tokenizer service,
    load every configured language, and enter the Tornado IO loop.
    """
    np.random.seed(42)
    config = ServerConfig.load(('./server.conf',))

    # thread_name_prefix was added to ThreadPoolExecutor in Python 3.6.
    # Compare the (major, minor) pair; the original tested the *micro*
    # component (sys.version_info[2]), which is unrelated to that feature.
    if sys.version_info >= (3, 6):
        thread_pool = ThreadPoolExecutor(thread_name_prefix='query-thread-')
    else:
        thread_pool = ThreadPoolExecutor(max_workers=32)

    app = Application(config, thread_pool)

    if config.ssl_key:
        ssl_ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
        ssl_ctx.load_cert_chain(config.ssl_chain, config.ssl_key)
        app.listen(config.port, ssl_options=ssl_ctx)
    else:
        app.listen(config.port)

    if config.user:
        # Group first: after setuid the process may no longer be allowed
        # to change its group.
        os.setgid(grp.getgrnam(config.user)[2])
        os.setuid(pwd.getpwnam(config.user)[2])

    if sd:
        sd.notify('READY=1')

    tokenizer_service = TokenizerService()
    tokenizer_service.run()

    for language in config.languages:
        load_language(app, tokenizer_service, language,
                      config.get_model_directory(language))

    sys.stdout.flush()
    tornado.ioloop.IOLoop.current().start()
def _get_ssl(self, cert=None):
    """Return a default ``CLIENT_AUTH`` SSL context.

    :param cert: optional certificate data passed as ``cadata``.
    """
    context_options = {
        'purpose': ssl.Purpose.CLIENT_AUTH,
        'cadata': cert,
    }
    return ssl.create_default_context(**context_options)
def __init__(self, output_spec):
    """
    Uses HTTP chunked transfer-encoding to stream a request body to a server.
    Unfortunately requests does not support hooking into this logic easily, so
    we use the lower-level httplib module.

    Reads ``url`` plus the optional ``method`` (default ``POST``) and
    ``headers`` entries of *output_spec*. If sending the request line or
    headers fails, the connection is closed and the error re-raised.
    """
    super(HttpStreamPushAdapter, self).__init__(output_spec)
    self._closed = False

    target = urlparse.urlparse(output_spec['url'])
    if target.scheme != 'https':
        connection = httplib.HTTPConnection(target.netloc)
    else:
        tls_context = ssl.create_default_context()
        connection = httplib.HTTPSConnection(target.netloc, context=tls_context)

    try:
        http_method = output_spec.get('method', 'POST').upper()
        connection.putrequest(http_method, target.path,
                              skip_accept_encoding=True)

        extra_headers = output_spec.get('headers', {})
        for name, value in extra_headers.items():
            connection.putheader(name, value)

        connection.putheader('Transfer-Encoding', 'chunked')
        connection.endheaders()  # This actually flushes the headers to the server
    except Exception:
        print('HTTP connection to "%s" failed.' % output_spec['url'])
        connection.close()
        raise

    self.conn = connection
def __init__(self, url, headers=None):
    """
    Uses HTTP chunked transfer-encoding to stream a request body to a server.
    Unfortunately requests does not support hooking into this logic easily, so
    we use the lower-level httplib module.

    :param url: destination URL; its query string, if any, is sent along
        with the path.
    :param headers: optional mapping of extra request headers.

    A ``POST`` request line and the headers are sent immediately. If that
    fails, a message is written to stderr, the connection is closed and the
    error re-raised.
    """
    self._url = url
    self._closed = False

    # Avoid a shared mutable default argument.
    if headers is None:
        headers = {}

    parts = urlparse.urlparse(self._url)
    if parts.scheme == 'https':
        ssl_context = ssl.create_default_context()
        conn = httplib.HTTPSConnection(parts.netloc, context=ssl_context)
    else:
        conn = httplib.HTTPConnection(parts.netloc)

    try:
        target = parts.path
        # urlparse() yields '' (never None) when there is no query string,
        # so test truthiness; otherwise a bare '?' would always be appended.
        if parts.query:
            target = '%s?%s' % (target, parts.query)

        conn.putrequest('POST', target, skip_accept_encoding=True)
        for header, value in headers.items():
            conn.putheader(header, value)
        conn.putheader('Transfer-Encoding', 'chunked')
        conn.endheaders()  # This actually flushes the headers to the server
    except Exception:
        sys.stderr.write('HTTP connection to "%s" failed.\n' % self._url)
        conn.close()
        raise

    self.conn = conn
async def http_post(shark, url, data):
    """POST *data* as JSON to *url*, retrying according to the HTTP config.

    Declared ``async def`` because the body uses ``await`` and
    ``async with``, which are not valid in a plain function.

    :param shark: object providing ``log`` (bindable logger) and
        ``config['HTTP']`` with the keys ``wait``, ``tries``, ``timeout``
        and optionally ``ssl_cafile``.
    :param url: request URL.
    :param data: JSON-serialisable request payload.
    :returns: the decoded JSON response, or an error dict carrying
        ``c.ERR_SERVICE_UNAVAILABLE`` once all tries are used up.

    A 429 response is retried after the delay computed by
    ``_get_rate_limit_wait``; client errors and timeouts are logged and
    retried after ``opts['wait']`` or the last rate-limit delay.
    """
    log = shark.log.bind(url=url)
    opts = shark.config['HTTP']

    if opts.get('ssl_cafile'):
        ssl_context = ssl.create_default_context(cafile=opts['ssl_cafile'])
    else:
        ssl_context = None

    conn = aiohttp.TCPConnector(ssl_context=ssl_context)
    async with aiohttp.ClientSession(connector=conn) as session:
        wait = opts['wait']
        for n in range(opts['tries']):
            if n > 0:
                await asyncio.sleep(wait)
            try:
                log.debug('http request', data=data)
                async with session.post(url, json=data,
                                        timeout=opts['timeout']) as resp:
                    if resp.status == 429:  # Too many requests.
                        wait = _get_rate_limit_wait(log, resp, opts)
                        continue
                    else:
                        wait = opts['wait']
                    resp.raise_for_status()
                    # Keep the request payload intact; use a separate name
                    # for the response body.
                    result = await resp.json()
                    log.debug('http response', data=result)
                    return result
            except aiohttp.ClientError:
                log.exception('unhandled exception in http_post')
            except asyncio.TimeoutError:
                log.exception('timeout in http_post')
        return {'status': 'error', 'error': c.ERR_SERVICE_UNAVAILABLE}
def get_ssl_context(self):
    """Return a server SSL context built from ``self.config['WS_SSL']``.

    The setting is expected to provide ``cert`` and ``key`` entries.
    Returns None when the setting is missing or empty.
    """
    ws_ssl = self.config.get('WS_SSL')
    if not ws_ssl:
        return None

    server_ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
    server_ctx.load_cert_chain(certfile=ws_ssl['cert'], keyfile=ws_ssl['key'])
    return server_ctx
def test_check_hostname(self):
    """Check that a hostname mismatch makes the client's TLS start-up fail.

    The client side is started with a default (verifying) context and
    ``server_hostname=b'127.0.0.1'``; the test expects an ``ssl.SSLError``
    on the client future, an "SSL Error" log entry, and some exception on
    the server future.
    """
    # Test that server_hostname parameter to start_tls is being used.
    # The check_hostname functionality is only available in python 2.7 and
    # up and in python 3.4 and up.
    server_future = self.server_start_tls(_server_ssl_options())
    client_future = self.client_start_tls(
        ssl.create_default_context(),
        server_hostname=b'127.0.0.1')
    with ExpectLog(gen_log, "SSL Error"):
        with self.assertRaises(ssl.SSLError):
            # The client fails to connect with an SSL error.
            yield client_future
    with self.assertRaises(Exception):
        # The server fails to connect, but the exact error is unspecified.
        yield server_future
def get_tls_context(self):
    """Build the client TLS context from the ``tenant`` config section.

    Reads ``ca_cert``, ``my_cert``, ``private_key``, ``private_key_pw`` and
    ``tls_dir``. A ``tls_dir`` of ``'default'`` selects the built-in file
    names inside ``cv_ca``; a directory not starting with '/' is resolved
    against ``common.WORK_DIR``. Hostname checking follows the
    ``general/tls_check_hostnames`` setting.
    """
    ca_name = config.get('tenant', 'ca_cert')
    cert_name = config.get('tenant', 'my_cert')
    key_name = config.get('tenant', 'private_key')
    key_password = config.get('tenant', 'private_key_pw')
    tls_dir = config.get('tenant', 'tls_dir')

    if tls_dir == 'default':
        ca_name, cert_name, key_name = (
            'cacert.crt', 'client-cert.crt', 'client-private.pem')
        tls_dir = 'cv_ca'

    # this is relative path, convert to absolute in WORK_DIR
    if tls_dir[0] != '/':
        tls_dir = os.path.abspath('%s/%s' % (common.WORK_DIR, tls_dir))

    if key_password == 'default':
        logger.warning("CAUTION: using default password for private key, please set private_key_pw to a strong password")

    logger.info("Setting up client TLS in %s" % (tls_dir))

    ca_file = "%s/%s" % (tls_dir, ca_name)
    cert_file = "%s/%s" % (tls_dir, cert_name)
    key_file = "%s/%s" % (tls_dir, key_name)

    tls_context = ssl.create_default_context()
    tls_context.load_verify_locations(cafile=ca_file)
    tls_context.load_cert_chain(certfile=cert_file, keyfile=key_file,
                                password=key_password)
    tls_context.verify_mode = ssl.CERT_REQUIRED
    tls_context.check_hostname = config.getboolean('general', 'tls_check_hostnames')
    return tls_context
def test_dir(url, depth, max_depth, dir_list):
    """Probe ``url/<name>`` for every name in *dir_list*, recursing in threads.

    :param url: base URL to extend.
    :param depth: current recursion depth.
    :param max_depth: probing stops once *depth* exceeds this value.
    :param dir_list: iterable of directory names to try.

    Every URL that answers 200 is reported and scanned one level deeper in
    a new thread. HTTP errors are reported by status code. Certificates and
    hostnames are not verified.
    """
    if depth > max_depth:
        return
    depth = depth + 1

    # make a SSL handler that ignores SSL CERT issues
    # (built once; it does not depend on the directory being tested)
    ctx = ssl.create_default_context()
    ctx.check_hostname = False
    ctx.verify_mode = ssl.CERT_NONE

    status_messages = {
        401: "[!] Authorization Required %s ",
        403: "[!] Forbidden %s ",
        404: "[-] Not Found %s ",
        503: "[!] Service Unavailable %s ",
    }

    for d in dir_list:
        new_url = url + "/" + d
        try:
            response = urllib2.urlopen(new_url, context=ctx)
            if response and response.getcode() == 200:
                print("[+] FOUND %s" % (new_url))
                thread.start_new_thread(
                    test_dir, (new_url, depth, max_depth, dir_list,))
        except urllib2.HTTPError as e:
            template = status_messages.get(e.code)
            if template is None:
                print("[?] Unknwon")
            else:
                print(template % (new_url))
def test_dir(url, depth, max_depth, dir_list):
    """Probe ``url/<name>`` for every name in *dir_list*, then recurse.

    :param url: base URL to extend.
    :param depth: current recursion depth.
    :param max_depth: probing stops once *depth* exceeds this value.
    :param dir_list: iterable of directory names to try.

    Every URL that answers 200 is reported and, after the whole list has
    been tried, scanned one level deeper. HTTP errors are reported by
    status code. Certificates and hostnames are not verified.
    """
    if depth > max_depth:
        return
    depth = depth + 1

    # make a SSL handler that ignores SSL CERT issues
    # (built once; it does not depend on the directory being tested)
    ctx = ssl.create_default_context()
    ctx.check_hostname = False
    ctx.verify_mode = ssl.CERT_NONE

    status_messages = {
        401: "[!] Authorization Required %s ",
        403: "[!] Forbidden %s ",
        404: "[-] Not Found %s ",
        503: "[!] Service Unavailable %s ",
    }

    found = list()
    for d in dir_list:
        new_url = url + "/" + d
        try:
            response = urllib2.urlopen(new_url, context=ctx)
            if response and response.getcode() == 200:
                print("[+] FOUND %s" % (new_url))
                found.append(new_url)
        except urllib2.HTTPError as e:
            template = status_messages.get(e.code)
            if template is None:
                print("[?] Unknwon")
            else:
                print(template % (new_url))

    # loop over each identified valid directory and recurse into it
    for new_url in found:
        test_dir(new_url, depth, max_depth, dir_list)
async def echo_client(server_address, messages):
    """Send *messages* to a TLS echo server and log everything echoed back.

    Declared ``async def`` because the body awaits the connection, the
    write drain and each read.

    :param server_address: ``(host, port)`` pair.
    :param messages: iterable of ``bytes`` chunks to send.
    """
    log = logging.getLogger('echo_client')

    # The certificate is created with pymotw.com as the hostname,
    # which will not match when the example code runs
    # elsewhere, so disable hostname verification.
    ssl_context = ssl.create_default_context(
        ssl.Purpose.SERVER_AUTH,
    )
    ssl_context.check_hostname = False
    ssl_context.load_verify_locations('pymotw.crt')

    log.debug('connecting to {} port {}'.format(*server_address))
    reader, writer = await asyncio.open_connection(
        *server_address, ssl=ssl_context)

    # This could be writer.writelines() except that
    # would make it harder to show each part of the message
    # being sent.
    for msg in messages:
        writer.write(msg)
        log.debug('sending {!r}'.format(msg))

    # SSL does not support EOF, so send a null byte to indicate
    # the end of the message.
    writer.write(b'\x00')
    await writer.drain()

    log.debug('waiting for response')
    while True:
        data = await reader.read(128)
        if data:
            log.debug('received {!r}'.format(data))
        else:
            log.debug('closing')
            writer.close()
            return
def _test_create_ssl_connection(self, httpd, create_connection,
                                check_sockname=True):
    """Exercise *create_connection* with three kinds of ``ssl`` argument.

    1. An explicit dummy context.
    2. ``ssl=True`` with ``ssl.create_default_context`` patched to return
       a dummy context (asserting it is called exactly once).
    3. ``ssl=True`` with the real default context, which is expected to
       fail with ``CERTIFICATE_VERIFY_FAILED``.

    *httpd* is not referenced in this body.
    """
    conn_fut = create_connection(ssl=test_utils.dummy_ssl_context())
    self._basetest_create_ssl_connection(conn_fut, check_sockname)

    # ssl.Purpose was introduced in Python 3.4
    if hasattr(ssl, 'Purpose'):
        def _dummy_ssl_create_context(purpose=ssl.Purpose.SERVER_AUTH, *,
                                      cafile=None, capath=None,
                                      cadata=None):
            """
            A ssl.create_default_context() replacement that doesn't enable
            cert validation.
            """
            self.assertEqual(purpose, ssl.Purpose.SERVER_AUTH)
            return test_utils.dummy_ssl_context()

        # With ssl=True, ssl.create_default_context() should be called
        with mock.patch('ssl.create_default_context',
                        side_effect=_dummy_ssl_create_context) as m:
            conn_fut = create_connection(ssl=True)
            self._basetest_create_ssl_connection(conn_fut, check_sockname)
            self.assertEqual(m.call_count, 1)

    # With the real ssl.create_default_context(), certificate
    # validation will fail
    with self.assertRaises(ssl.SSLError) as cm:
        conn_fut = create_connection(ssl=True)
        # Ignore the "SSL handshake failed" log in debug mode
        with test_utils.disable_logger():
            self._basetest_create_ssl_connection(conn_fut, check_sockname)

    self.assertEqual(cm.exception.reason, 'CERTIFICATE_VERIFY_FAILED')