我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 ssl.SSLSocket()。
def test_context(self):
    """Check that FTP_TLS uses an explicitly supplied SSL context.

    Passing ``context`` together with ``keyfile`` and/or ``certfile`` must
    raise ValueError.  After reconnecting with the context, the control
    socket (once AUTH has been issued) and the data socket (once PROT P has
    been issued) must both be ``ssl.SSLSocket`` objects bound to it.
    """
    self.client.quit()
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)

    # Key/cert arguments are rejected when a context is also given.
    conflicting = (
        {'keyfile': CERTFILE},
        {'certfile': CERTFILE},
        {'certfile': CERTFILE, 'keyfile': CERTFILE},
    )
    for extra in conflicting:
        self.assertRaises(ValueError, ftplib.FTP_TLS, context=ctx, **extra)

    client = ftplib.FTP_TLS(context=ctx, timeout=10)
    self.client = client
    client.connect(self.server.host, self.server.port)

    # Before auth() the control socket is expected not to be an SSL socket.
    self.assertNotIsInstance(client.sock, ssl.SSLSocket)
    client.auth()
    self.assertIs(client.sock.context, ctx)
    self.assertIsInstance(client.sock, ssl.SSLSocket)

    client.prot_p()
    with client.transfercmd('list') as data_sock:
        self.assertIs(data_sock.context, ctx)
        self.assertIsInstance(data_sock, ssl.SSLSocket)
def retrlines(self, cmd, callback = None): if callback is None: callback = print_line resp = self.sendcmd('TYPE A') conn = self.transfercmd(cmd) fp = conn.makefile('rb') try: while 1: line = fp.readline() if self.debugging > 2: print '*retr*', repr(line) if not line: break if line[-2:] == CRLF: line = line[:-2] elif line[-1:] == '\n': line = line[:-1] callback(line) # shutdown ssl layer if isinstance(conn, ssl.SSLSocket): conn.unwrap() finally: fp.close() conn.close() return self.voidresp()
def ssl_wrap_socket(socket, ssl_options, server_hostname=None, **kwargs):
    """Returns an ``ssl.SSLSocket`` wrapping the given socket.

    ``ssl_options`` may be either an `ssl.SSLContext` object or a
    dictionary (as accepted by `ssl_options_to_context`).  Additional
    keyword arguments are passed to ``wrap_socket`` (either the
    `~ssl.SSLContext` method or the `ssl` module function as
    appropriate).

    ``server_hostname`` is forwarded to ``SSLContext.wrap_socket`` only
    when it is not None and the ``ssl`` module reports SNI support.
    """
    context = ssl_options_to_context(ssl_options)
    if hasattr(ssl, 'SSLContext') and isinstance(context, ssl.SSLContext):
        # The explicit ``False`` default matters: without it ``getattr``
        # raises AttributeError when ``ssl.HAS_SNI`` is missing instead of
        # falling through to the plain (non-SNI) call below.
        if server_hostname is not None and getattr(ssl, 'HAS_SNI', False):
            # Python doesn't have server-side SNI support so we can't
            # really unittest this, but it can be manually tested with
            # python3.2 -m tornado.httpclient https://sni.velox.ch
            return context.wrap_socket(socket,
                                       server_hostname=server_hostname,
                                       **kwargs)
        return context.wrap_socket(socket, **kwargs)
    # ``context`` is not an SSLContext here; it is merged with ``kwargs``
    # as a mapping and passed to the module-level ``ssl.wrap_socket``.
    return ssl.wrap_socket(socket, **dict(context, **kwargs))
def retrlines(self, cmd, callback = None): if callback is None: callback = print_line resp = self.sendcmd('TYPE A') conn = self.transfercmd(cmd) fp = conn.makefile('rb') try: while 1: line = fp.readline(self.maxline + 1) if len(line) > self.maxline: raise Error("got more than %d bytes" % self.maxline) if self.debugging > 2: print '*retr*', repr(line) if not line: break if line[-2:] == CRLF: line = line[:-2] elif line[-1:] == '\n': line = line[:-1] callback(line) # shutdown ssl layer if isinstance(conn, ssl.SSLSocket): conn.unwrap() finally: fp.close() conn.close() return self.voidresp()
def testHttpsContext(self):
    """Verify the TLS settings of an HTTPS connection to the local server.

    After one request, the cached connection's socket must be an
    ``ssl.SSLSocket`` whose context checks hostnames, requires a
    certificate, uses ``PROTOCOL_SSLv23`` and whose ``server_hostname``
    is ``'localhost'``.
    """
    client = httplib2.Http(ca_certs=self.ca_certs_path)

    # Establish connection to local server
    client.request('https://localhost:%d/' % (self.port))

    # Verify that connection uses a TLS context with the correct hostname
    conn = client.connections['https:localhost:%d' % self.port]
    sock = conn.sock
    self.assertIsInstance(sock, ssl.SSLSocket)
    self.assertTrue(hasattr(sock, 'context'))

    # Only read the context after the hasattr() assertion above has passed.
    context = sock.context
    self.assertIsInstance(context, ssl.SSLContext)
    self.assertTrue(context.check_hostname)
    self.assertEqual(sock.server_hostname, 'localhost')
    self.assertEqual(context.verify_mode, ssl.CERT_REQUIRED)
    self.assertEqual(context.protocol, ssl.PROTOCOL_SSLv23)
def ssl_wrap_socket(cls, s, ssl_options, server_hostname=None, **kwargs):
    """Returns an ``ssl.SSLSocket`` wrapping the given socket.

    ``ssl_options`` may be either a dictionary (as accepted by
    `ssl_options_to_context`) or an `ssl.SSLContext` object.
    Additional keyword arguments are passed to ``wrap_socket``
    (either the `~ssl.SSLContext` method or the `ssl` module function
    as appropriate).

    ``s`` is the socket to wrap; ``cls`` is not used by this body.
    ``server_hostname`` is forwarded to ``SSLContext.wrap_socket`` only
    when it is not None and the ``ssl`` module reports SNI support.
    """
    context = ssl_options_to_context(ssl_options)
    if hasattr(ssl, 'SSLContext') and isinstance(context, ssl.SSLContext):
        # The explicit ``False`` default matters: without it ``getattr``
        # raises AttributeError when ``ssl.HAS_SNI`` is missing instead of
        # falling through to the plain (non-SNI) call below.
        if server_hostname is not None and getattr(ssl, 'HAS_SNI', False):
            # Python doesn't have server-side SNI support so we can't
            # really unittest this, but it can be manually tested with
            # python3.2 -m tornado.httpclient https://sni.velox.ch
            return context.wrap_socket(s, server_hostname=server_hostname,
                                       **kwargs)
        return context.wrap_socket(s, **kwargs)
    # ``context`` is not an SSLContext here; it is merged with ``kwargs``
    # as a mapping and passed to the module-level ``ssl.wrap_socket``.
    return ssl.wrap_socket(s, **dict(context, **kwargs))
def auth(self):
    """Set up secure control connection by using TLS/SSL.

    Raises ValueError if ``self.sock`` is already an ``ssl.SSLSocket``.
    Otherwise sends ``AUTH TLS`` (when ``self.ssl_version`` equals
    ``ssl.PROTOCOL_TLSv1``) or ``AUTH SSL``, wraps the control socket,
    rebuilds ``self.file`` on top of the wrapped socket and returns the
    server response to the AUTH command.
    """
    if isinstance(self.sock, ssl.SSLSocket):
        raise ValueError("Already using TLS")

    if self.ssl_version == ssl.PROTOCOL_TLSv1:
        command = 'AUTH TLS'
    else:
        command = 'AUTH SSL'
    resp = self.voidcmd(command)

    # Use the configured context when there is one; otherwise fall back to
    # the module-level wrapper with the stored key/cert files.
    if self.context is None:
        secured = ssl.wrap_socket(self.sock, self.keyfile, self.certfile,
                                  ssl_version=self.ssl_version)
    else:
        secured = self.context.wrap_socket(self.sock)
    self.sock = secured
    self.file = secured.makefile(mode='r', encoding=self.encoding)
    return resp
def retrlines(self, cmd, callback=None):
    """Run ``cmd`` in ASCII mode and hand each received line to ``callback``.

    Sends ``TYPE A``, opens the transfer with ``self.transfercmd(cmd)`` and
    reads it as text decoded with ``self.encoding``.  A trailing CRLF (or a
    lone newline) is removed before the line is passed to ``callback``;
    when ``callback`` is None, ``print_line`` is used.  If the data
    connection is an ``ssl.SSLSocket`` it is unwrapped once all lines have
    been read.  The file object and the connection are always closed.

    Returns the value of ``self.voidresp()``.
    """
    if callback is None:
        callback = print_line
    self.sendcmd('TYPE A')
    data_conn = self.transfercmd(cmd)
    reader = data_conn.makefile('r', encoding=self.encoding)
    try:
        while True:
            text = reader.readline()
            if self.debugging > 2:
                print('*retr*', repr(text))
            if not text:
                break
            # Strip the line terminator before invoking the callback.
            if text[-2:] == CRLF:
                text = text[:-2]
            elif text.endswith('\n'):
                text = text[:-1]
            callback(text)
        # shutdown ssl layer
        if isinstance(data_conn, ssl.SSLSocket):
            data_conn.unwrap()
    finally:
        reader.close()
        data_conn.close()
    return self.voidresp()
def test_starttls(self):
    """Check that starttls() swaps in an SSL socket and cannot be repeated.

    The test is skipped when the server answers STARTTLS with
    ``nntplib.NNTPPermanentError``.
    """
    previous_file = self.server.file
    previous_sock = self.server.sock
    try:
        self.server.starttls()
    except nntplib.NNTPPermanentError:
        self.skipTest("STARTTLS not supported by server.")
        return

    # Check that the socket and internal pseudo-file really were
    # changed.
    self.assertNotEqual(previous_file, self.server.file)
    self.assertNotEqual(previous_sock, self.server.sock)
    # Check that the new socket really is an SSL one
    self.assertIsInstance(self.server.sock, ssl.SSLSocket)
    # Check that trying starttls when it's already active fails.
    self.assertRaises(ValueError, self.server.starttls)
def test_context(self):
    """Check that POP3_SSL uses an explicitly supplied SSL context.

    Passing ``context`` together with ``keyfile`` and/or ``certfile`` must
    raise ValueError.  A client created with only the context must expose
    an ``ssl.SSLSocket`` bound to that context and get a ``+OK`` reply to
    NOOP.
    """
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)

    # Key/cert arguments are rejected when a context is also given.
    conflicting = (
        {'keyfile': CERTFILE},
        {'certfile': CERTFILE},
        {'keyfile': CERTFILE, 'certfile': CERTFILE},
    )
    for extra in conflicting:
        self.assertRaises(ValueError, poplib.POP3_SSL, self.server.host,
                          self.server.port, context=ctx, **extra)

    self.client.quit()
    client = poplib.POP3_SSL(self.server.host, self.server.port,
                             context=ctx)
    self.client = client
    self.assertIsInstance(client.sock, ssl.SSLSocket)
    self.assertIs(client.sock.context, ctx)
    self.assertTrue(client.noop().startswith(b'+OK'))
def test_data_connection(self):
    """Check the data socket type in clear text, after PROT P and after PROT C.

    Each phase issues a ``list`` transfer, checks whether the data socket
    is an ``ssl.SSLSocket`` and expects the ``226 transfer complete``
    reply afterwards.
    """
    def run_listing(type_assertion):
        # One LIST transfer: check the socket type, then the final reply.
        with self.client.transfercmd('list') as data_sock:
            type_assertion(data_sock, ssl.SSLSocket)
        self.assertEqual(self.client.voidresp(), "226 transfer complete")

    # clear text
    run_listing(self.assertNotIsInstance)

    # secured, after PROT P
    self.client.prot_p()
    run_listing(self.assertIsInstance)

    # PROT C is issued, the connection must be in cleartext again
    self.client.prot_c()
    run_listing(self.assertNotIsInstance)
def test_data_connection(self):
    """Check the data socket type in clear text, after PROT P and after PROT C.

    Each phase issues a ``list`` transfer, checks whether the data socket
    is an ``ssl.SSLSocket``, closes it and expects the
    ``226 transfer complete`` reply afterwards.
    """
    def run_listing(type_assertion):
        # One LIST transfer: check the socket type, then the final reply.
        data_sock = self.client.transfercmd('list')
        try:
            type_assertion(data_sock, ssl.SSLSocket)
        finally:
            # Close even when the assertion fails, so a failing test does
            # not leave the data socket open.
            data_sock.close()
        self.assertEqual(self.client.voidresp(), "226 transfer complete")

    # clear text
    run_listing(self.assertNotIsInstance)

    # secured, after PROT P
    self.client.prot_p()
    run_listing(self.assertIsInstance)

    # PROT C is issued, the connection must be in cleartext again
    self.client.prot_c()
    run_listing(self.assertNotIsInstance)
def test_context(self):
    """Check that FTP_TLS uses an explicitly supplied SSL context.

    Passing ``context`` together with ``keyfile`` and/or ``certfile`` must
    raise ValueError.  After reconnecting with the context, the control
    socket (once AUTH has been issued) and the data socket (once PROT P has
    been issued) must both be ``ssl.SSLSocket`` objects bound to it.
    """
    self.client.quit()
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)

    # Key/cert arguments are rejected when a context is also given.
    conflicting = (
        {'keyfile': CERTFILE},
        {'certfile': CERTFILE},
        {'certfile': CERTFILE, 'keyfile': CERTFILE},
    )
    for extra in conflicting:
        self.assertRaises(ValueError, ftplib.FTP_TLS, context=ctx, **extra)

    client = ftplib.FTP_TLS(context=ctx, timeout=TIMEOUT)
    self.client = client
    client.connect(self.server.host, self.server.port)

    # Before auth() the control socket is expected not to be an SSL socket.
    self.assertNotIsInstance(client.sock, ssl.SSLSocket)
    client.auth()
    self.assertIs(client.sock.context, ctx)
    self.assertIsInstance(client.sock, ssl.SSLSocket)

    client.prot_p()
    data_sock = client.transfercmd('list')
    try:
        self.assertIs(data_sock.context, ctx)
        self.assertIsInstance(data_sock, ssl.SSLSocket)
    finally:
        # Always release the data socket, even if an assertion failed.
        data_sock.close()
def test_context(self):
    """Check that FTP_TLS uses an explicitly supplied SSL context.

    Passing ``context`` together with ``keyfile`` and/or ``certfile`` must
    raise ValueError.  After reconnecting with the context, the control
    socket (once AUTH has been issued) and the data socket (once PROT P has
    been issued) must both be ``ssl.SSLSocket`` objects bound to it.
    """
    self.client.quit()
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)

    # Key/cert arguments are rejected when a context is also given.
    conflicting = (
        {'keyfile': CERTFILE},
        {'certfile': CERTFILE},
        {'certfile': CERTFILE, 'keyfile': CERTFILE},
    )
    for extra in conflicting:
        self.assertRaises(ValueError, ftplib.FTP_TLS, context=ctx, **extra)

    client = ftplib.FTP_TLS(context=ctx, timeout=2)
    self.client = client
    client.connect(self.server.host, self.server.port)

    # Before auth() the control socket is expected not to be an SSL socket.
    self.assertNotIsInstance(client.sock, ssl.SSLSocket)
    client.auth()
    self.assertIs(client.sock.context, ctx)
    self.assertIsInstance(client.sock, ssl.SSLSocket)

    client.prot_p()
    with client.transfercmd('list') as data_sock:
        self.assertIs(data_sock.context, ctx)
        self.assertIsInstance(data_sock, ssl.SSLSocket)
def ssl_wrap_socket(socket, ssl_options, server_hostname=None, **kwargs):
    """Returns an ``ssl.SSLSocket`` wrapping the given socket.

    ``ssl_options`` may be either a dictionary (as accepted by
    `ssl_options_to_context`) or an `ssl.SSLContext` object.
    Additional keyword arguments are passed to ``wrap_socket``
    (either the `~ssl.SSLContext` method or the `ssl` module function
    as appropriate).

    ``server_hostname`` is forwarded to ``SSLContext.wrap_socket`` only
    when it is not None and the ``ssl`` module reports SNI support.
    """
    context = ssl_options_to_context(ssl_options)
    if hasattr(ssl, 'SSLContext') and isinstance(context, ssl.SSLContext):
        # The explicit ``False`` default matters: without it ``getattr``
        # raises AttributeError when ``ssl.HAS_SNI`` is missing instead of
        # falling through to the plain (non-SNI) call below.
        if server_hostname is not None and getattr(ssl, 'HAS_SNI', False):
            # Python doesn't have server-side SNI support so we can't
            # really unittest this, but it can be manually tested with
            # python3.2 -m tornado.httpclient https://sni.velox.ch
            return context.wrap_socket(socket,
                                       server_hostname=server_hostname,
                                       **kwargs)
        return context.wrap_socket(socket, **kwargs)
    # ``context`` is not an SSLContext here; it is merged with ``kwargs``
    # as a mapping and passed to the module-level ``ssl.wrap_socket``.
    return ssl.wrap_socket(socket, **dict(context, **kwargs))
def test_context(self):
    """Check that FTP_TLS uses an explicitly supplied SSL context.

    Passing ``context`` together with ``keyfile`` and/or ``certfile`` must
    raise ValueError.  After reconnecting with the context, the control
    socket (once AUTH has been issued) and the data socket (once PROT P has
    been issued) must both be ``ssl.SSLSocket`` objects bound to it.
    """
    self.client.quit()
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)

    # Key/cert arguments are rejected when a context is also given.
    conflicting = (
        {'keyfile': CERTFILE},
        {'certfile': CERTFILE},
        {'certfile': CERTFILE, 'keyfile': CERTFILE},
    )
    for extra in conflicting:
        self.assertRaises(ValueError, ftplib.FTP_TLS, context=ctx, **extra)

    client = ftplib.FTP_TLS(context=ctx, timeout=TIMEOUT)
    self.client = client
    client.connect(self.server.host, self.server.port)

    # Before auth() the control socket is expected not to be an SSL socket.
    self.assertNotIsInstance(client.sock, ssl.SSLSocket)
    client.auth()
    self.assertIs(client.sock.context, ctx)
    self.assertIsInstance(client.sock, ssl.SSLSocket)

    client.prot_p()
    with client.transfercmd('list') as data_sock:
        self.assertIs(data_sock.context, ctx)
        self.assertIsInstance(data_sock, ssl.SSLSocket)
def test_prot(self):
    """Check PROT handling on insecure and secure control connections.

    On an insecure login ``prot p`` must be refused with a 503 error.
    After a secure login, a LIST data socket must be an ``ssl.SSLSocket``
    following PROT P and must not be one following PROT C.
    """
    def drain(data_sock):
        # Read until recv() returns nothing, then collect the final reply.
        while data_sock.recv(1024):
            pass
        self.client.voidresp()

    self.client.login(secure=False)
    msg = "503 PROT not allowed on insecure control connection."
    self.assertRaisesWithMsg(ftplib.error_perm, msg,
                             self.client.sendcmd, 'prot p')
    self.client.login(secure=True)

    # secured
    self.client.prot_p()
    sock = self.client.transfercmd('list')
    with contextlib.closing(sock):
        drain(sock)
        self.assertTrue(isinstance(sock, ssl.SSLSocket))
        # unsecured
        self.client.prot_c()

    sock = self.client.transfercmd('list')
    with contextlib.closing(sock):
        drain(sock)
        self.assertFalse(isinstance(sock, ssl.SSLSocket))
def ssl_wrap_socket(socket, ssl_options, server_hostname=None, **kwargs):
    """Returns an ``ssl.SSLSocket`` wrapping the given socket.

    ``ssl_options`` may be either an `ssl.SSLContext` object or a
    dictionary (as accepted by `ssl_options_to_context`).  Additional
    keyword arguments are passed to ``wrap_socket`` (either the
    `~ssl.SSLContext` method or the `ssl` module function as
    appropriate).

    ``server_hostname`` is forwarded to ``SSLContext.wrap_socket`` only
    when it is not None and the ``ssl`` module reports SNI support.
    """
    context = ssl_options_to_context(ssl_options)
    if hasattr(ssl, 'SSLContext') and isinstance(context, ssl.SSLContext):
        # The explicit ``False`` default matters: without it ``getattr``
        # raises AttributeError when ``ssl.HAS_SNI`` is missing instead of
        # falling through to the plain (non-SNI) call below.
        if server_hostname is not None and getattr(ssl, 'HAS_SNI', False):
            # Python doesn't have server-side SNI support so we can't
            # really unittest this, but it can be manually tested with
            # python3.2 -m tornado.httpclient https://sni.velox.ch
            return context.wrap_socket(socket,
                                       server_hostname=server_hostname,
                                       **kwargs)
        return context.wrap_socket(socket, **kwargs)
    # ``context`` is not an SSLContext here; it is merged with ``kwargs``
    # as a mapping and passed to the module-level ``ssl.wrap_socket``.
    return ssl.wrap_socket(socket, **dict(context, **kwargs))  # type: ignore