我们从Python开源项目中，提取了以下23个代码示例，用于说明如何使用eventlet.wrap_ssl()。
def test_012_ssl_server(self):
    """Check that a POST body sent over SSL is echoed back by the server.

    A listening socket is wrapped with ``eventlet.wrap_ssl`` in server mode,
    using the ``test_server.crt`` / ``test_server.key`` pair located next to
    this file. A server is spawned on it, an SSL-wrapped client posts the
    three-byte body ``abc``, and the response must end with that payload.
    """
    def wsgi_app(environ, start_response):
        # PEP 3333 requires the headers argument to be a list of
        # (name, value) tuples, so pass an empty list rather than a dict.
        start_response('200 OK', [])
        return [environ['wsgi.input'].read()]

    certificate_file = os.path.join(os.path.dirname(__file__), 'test_server.crt')
    private_key_file = os.path.join(os.path.dirname(__file__), 'test_server.key')

    server_sock = eventlet.wrap_ssl(
        eventlet.listen(('localhost', 0)),
        certfile=certificate_file,
        keyfile=private_key_file,
        server_side=True)
    self.spawn_server(sock=server_sock, site=wsgi_app)

    sock = eventlet.connect(self.server_addr)
    sock = eventlet.wrap_ssl(sock)
    sock.write(
        b'POST /foo HTTP/1.1\r\nHost: localhost\r\n'
        b'Connection: close\r\nContent-length:3\r\n\r\nabc')
    result = recvall(sock)
    assert result.endswith(b'abc')
def test_013_empty_return(self):
    """Check the SSL response when the WSGI app returns an empty body.

    The app yields a single empty bytes chunk, so the raw response read by
    the client is expected to end with the blank line that terminates the
    HTTP headers.
    """
    def wsgi_app(environ, start_response):
        start_response("200 OK", [])
        return [b""]

    # The certificate and key live in the same directory as this file.
    here = os.path.dirname(__file__)
    certificate_file = os.path.join(here, 'test_server.crt')
    private_key_file = os.path.join(here, 'test_server.key')

    listener = eventlet.listen(('localhost', 0))
    server_sock = eventlet.wrap_ssl(
        listener,
        certfile=certificate_file,
        keyfile=private_key_file,
        server_side=True,
    )
    self.spawn_server(sock=server_sock, site=wsgi_app)

    # Port 0 was requested above, so ask the socket which port it received.
    port = server_sock.getsockname()[1]
    sock = eventlet.wrap_ssl(eventlet.connect(('localhost', port)))
    sock.write(b'GET /foo HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n')

    response = recvall(sock)
    assert response.endswith(b'\r\n\r\n')
def handle(self, listener, client, addr):
    """Handle one accepted connection, wrapping it in SSL when configured.

    When ``self.cfg.is_ssl`` is true the client socket is replaced by a
    server-side SSL wrapper built from ``self.cfg.ssl_options``; the
    connection is then passed to the parent class's ``handle``.
    """
    conn = client
    if self.cfg.is_ssl:
        conn = eventlet.wrap_ssl(conn, server_side=True, **self.cfg.ssl_options)
    super(EventletWorker, self).handle(listener, conn, addr)
def test_ssl_sending_messages(self):
    """Check WebSocket echo over an SSL-wrapped connection.

    Spawns a server on an SSL listener, performs the WebSocket upgrade
    handshake from an SSL client, verifies that the location header in the
    response uses the ``wss://`` scheme, and then checks that framed
    messages (sent whole and in two pieces) are echoed back unchanged.
    """
    s = eventlet.wrap_ssl(
        eventlet.listen(('localhost', 0)),
        certfile=tests.certificate_file,
        keyfile=tests.private_key_file,
        server_side=True)
    self.spawn_server(sock=s)
    connect = [
        "GET /echo HTTP/1.1",
        "Upgrade: WebSocket",
        "Connection: Upgrade",
        "Host: %s:%s" % self.server_addr,
        "Origin: http://%s:%s" % self.server_addr,
        "Sec-WebSocket-Protocol: ws",
        "Sec-WebSocket-Key1: 4 @1 46546xW%0l 1 5",
        "Sec-WebSocket-Key2: 12998 5 Y3 1 .P00",
    ]
    sock = eventlet.wrap_ssl(eventlet.connect(self.server_addr))

    sock.sendall(six.b('\r\n'.join(connect) + '\r\n\r\n^n:ds[4U'))
    first_resp = b''
    while b'\r\n\r\n' not in first_resp:
        chunk = sock.recv()
        # An empty read means the peer closed the connection. Without this
        # check the loop would spin forever, since recv() keeps returning
        # b'' and the header terminator can never arrive.
        assert chunk, (
            "Connection closed before handshake response completed: %r"
            % (first_resp,))
        first_resp += chunk
        print('resp now:')
        print(first_resp)

    # make sure it sets the wss: protocol on the location header
    loc_line = [x for x in first_resp.split(b"\r\n")
                if x.lower().startswith(b'sec-websocket-location')][0]
    expect_wss = ('wss://%s:%s' % self.server_addr).encode()
    assert expect_wss in loc_line, "Expecting wss protocol in location: %s" % loc_line

    # A complete frame sent in one call is echoed back as-is.
    sock.sendall(b'\x00hello\xFF')
    result = sock.recv(1024)
    self.assertEqual(result, b'\x00hello\xff')

    # A frame split across two sends is echoed back as one message.
    sock.sendall(b'\x00start')
    eventlet.sleep(0.001)
    sock.sendall(b' end\xff')
    result = sock.recv(1024)
    self.assertEqual(result, b'\x00start end\xff')

    greenio.shutdown_safe(sock)
    sock.close()
    eventlet.sleep(0.01)
def test_017_ssl_zeroreturnerror(self):
    """Check that the server copes with an SSL client that hangs up early.

    The client sends a single byte, shuts the connection down and closes
    it. The server greenlet accepts and processes that one connection; the
    test passes only if processing finished without raising.
    """
    def server(sock, site, log):
        # Accept exactly one connection and report whether processing it
        # completed without an exception.
        try:
            serv = wsgi.Server(sock, sock.getsockname(), site, log)
            client_socket, addr = sock.accept()
            serv.process_request([addr, client_socket, wsgi.STATE_IDLE])
        except Exception:
            traceback.print_exc()
            return False
        else:
            return True

    def wsgi_app(environ, start_response):
        start_response('200 OK', [])
        return [environ['wsgi.input'].read()]

    # The certificate and key live in the same directory as this file.
    here = os.path.dirname(__file__)
    certificate_file = os.path.join(here, 'test_server.crt')
    private_key_file = os.path.join(here, 'test_server.key')

    listener = eventlet.listen(('localhost', 0))
    sock = eventlet.wrap_ssl(
        listener,
        certfile=certificate_file,
        keyfile=private_key_file,
        server_side=True,
    )
    server_coro = eventlet.spawn(server, sock, wsgi_app, self.logfile)

    port = sock.getsockname()[1]
    client = eventlet.wrap_ssl(eventlet.connect(('localhost', port)))
    client.write(b'X')  # non-empty payload so that SSL handshake occurs
    greenio.shutdown_safe(client)
    client.close()

    success = server_coro.wait()
    assert success
def test_028_ssl_handshake_errors(self):
    """Check that failed SSL handshakes do not take down ``wsgi.server``.

    Two misbehaving clients are tried in turn against a fresh SSL listener:
    one that closes immediately after connecting, and one that sends a
    plain-text HTTP request to the SSL port. After each, the server
    greenlet must not have recorded an error, and a proper SSL request must
    still receive the ``hello world`` response.
    """
    # Single-element list so the nested function can report a failure
    # message back to the enclosing test.
    errored = [False]

    def server(sock):
        try:
            wsgi.server(sock=sock, site=hello_world, log=self.logfile)
            # wsgi.server returning at all means it stopped serving.
            errored[0] = 'SSL handshake error caused wsgi.server to exit.'
        except greenthread.greenlet.GreenletExit:
            # Raised by the explicit kill at the end of each iteration.
            pass
        except Exception as e:
            errored[0] = 'SSL handshake error raised exception %s.' % e
            raise

    for data in ('', 'GET /non-ssl-request HTTP/1.0\r\n\r\n'):
        srv_sock = eventlet.wrap_ssl(
            eventlet.listen(('localhost', 0)),
            certfile=certificate_file,
            keyfile=private_key_file,
            server_side=True)
        addr = srv_sock.getsockname()
        g = eventlet.spawn_n(server, srv_sock)
        client = eventlet.connect(addr)
        if data:  # send non-ssl request
            client.sendall(data.encode())
        else:  # close sock prematurely
            client.close()
        eventlet.sleep(0)  # let context switch back to server
        assert not errored[0], errored[0]
        # make another request to ensure the server's still alive
        try:
            # Use eventlet.wrap_ssl, matching the server side above;
            # ssl.wrap_socket was deprecated in Python 3.7 and removed
            # from the standard library in Python 3.12.
            client = eventlet.wrap_ssl(eventlet.connect(addr))
            client.write(b'GET / HTTP/1.0\r\nHost: localhost\r\n\r\n')
            result = recvall(client)
            assert result.startswith(b'HTTP'), result
            assert result.endswith(b'hello world')
        except ImportError:
            pass  # TODO(openssl): should test with OpenSSL
        greenthread.kill(g)
def test_wrap_ssl(self):
    """Check a simple echo round-trip through ``eventlet.wrap_ssl``.

    An SSL listener is served with a handler that echoes one read back to
    the client and then stops the server; the SSL client must receive the
    same bytes it sent.
    """
    def handle(sock, addr):
        payload = sock.recv(1024)
        sock.sendall(payload)
        # Serve only this one connection.
        raise eventlet.StopServe()

    listener = eventlet.listen(('localhost', 0))
    server = eventlet.wrap_ssl(
        listener,
        certfile=certificate_file,
        keyfile=private_key_file,
        server_side=True,
    )
    # Port 0 was requested above, so ask the socket which port it received.
    port = server.getsockname()[1]
    eventlet.spawn(eventlet.serve, server, handle)

    raw_client = eventlet.connect(('localhost', port))
    client = eventlet.wrap_ssl(raw_client)
    client.sendall(b"echo")
    self.assertEqual(b"echo", client.recv(1024))