我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 OpenSSL.SSL.TLSv1_METHOD(这是一个传给 Context 的协议方法常量,而不是可调用对象)。
def test_set_passwd_cb(self):
    """
    L{Context.set_passwd_cb} accepts a callable which will be invoked when a
    private key is loaded from an encrypted PEM.
    """
    key = PKey()
    key.generate_key(TYPE_RSA, 128)
    pemFile = self.mktemp()
    passphrase = "foobar"
    # Write the encrypted key inside a ``with`` block so the file is closed
    # (and therefore flushed) even if serialising the key raises.
    with open(pemFile, 'w') as fObj:
        fObj.write(dump_privatekey(FILETYPE_PEM, key, "blowfish", passphrase))

    calledWith = []

    def passphraseCallback(maxlen, verify, extra):
        # Record the arguments so they can be inspected after the load.
        calledWith.append((maxlen, verify, extra))
        return passphrase

    context = Context(TLSv1_METHOD)
    context.set_passwd_cb(passphraseCallback)
    context.use_privatekey_file(pemFile)

    # assertEqual rather than assertTrue: assertTrue(x, 1) treats ``1`` as
    # the failure message and only checks that ``x`` is truthy, so it would
    # pass for any non-zero number of callback invocations.
    self.assertEqual(len(calledWith), 1)
    self.assertTrue(isinstance(calledWith[0][0], int))
    self.assertTrue(isinstance(calledWith[0][1], int))
    self.assertEqual(calledWith[0][2], None)
def _client(self, sock):
    """
    Build the client-side SSL L{Connection} wrapped around C{sock}.

    The context uses TLSv1, requires and verifies a peer certificate via
    C{verify_cb}, presents the client key/certificate pair and trusts the
    root certificate.  The returned connection is already in connect state.
    """
    # Same boilerplate as the server side, but with the client credentials.
    ctx = Context(TLSv1_METHOD)
    ctx.set_options(OP_NO_SSLv2 | OP_NO_SSLv3 | OP_SINGLE_DH_USE)

    verify_mode = (
        VERIFY_PEER | VERIFY_FAIL_IF_NO_PEER_CERT | VERIFY_CLIENT_ONCE)
    ctx.set_verify(verify_mode, verify_cb)

    trust_store = ctx.get_cert_store()

    private_key = load_privatekey(FILETYPE_PEM, client_key_pem)
    ctx.use_privatekey(private_key)
    certificate = load_certificate(FILETYPE_PEM, client_cert_pem)
    ctx.use_certificate(certificate)
    ctx.check_privatekey()

    root_certificate = load_certificate(FILETYPE_PEM, root_cert_pem)
    trust_store.add_cert(root_certificate)

    connection = Connection(ctx, sock)
    connection.set_connect_state()
    return connection
def main(): """ Connect to an SNI-enabled server and request a specific hostname, specified by argv[1], of it. """ if len(argv) < 2: print 'Usage: %s <hostname>' % (argv[0],) return 1 client = socket() print 'Connecting...', stdout.flush() client.connect(('127.0.0.1', 8443)) print 'connected', client.getpeername() client_ssl = Connection(Context(TLSv1_METHOD), client) client_ssl.set_connect_state() client_ssl.set_tlsext_host_name(argv[1]) client_ssl.do_handshake() print 'Server subject is', client_ssl.get_peer_certificate().get_subject() client_ssl.close()
def main(): """ Run an SNI-enabled server which selects between a few certificates in a C{dict} based on the handshake request it receives from a client. """ port = socket() port.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) port.bind(('', 8443)) port.listen(3) print 'Accepting...', stdout.flush() server, addr = port.accept() print 'accepted', addr server_context = Context(TLSv1_METHOD) server_context.set_tlsext_servername_callback(pick_certificate) server_ssl = Connection(server_context, server) server_ssl.set_accept_state() server_ssl.do_handshake() server.close()
def _loopback(self):
    """
    Return a C{(server, client)} pair of handshaken, blocking L{Connection}
    objects joined by a socket pair.  The server side uses the server
    key and certificate; the client side uses a bare TLSv1 context.
    """
    server_sock, client_sock = socket_pair()

    server_ctx = Context(TLSv1_METHOD)
    server_ctx.use_privatekey(load_privatekey(FILETYPE_PEM, server_key_pem))
    server_ctx.use_certificate(
        load_certificate(FILETYPE_PEM, server_cert_pem))
    server_conn = Connection(server_ctx, server_sock)
    server_conn.set_accept_state()

    client_conn = Connection(Context(TLSv1_METHOD), client_sock)
    client_conn.set_connect_state()

    handshake(client_conn, server_conn)

    # Hand back blocking connections to the caller.
    server_conn.setblocking(True)
    client_conn.setblocking(True)
    return server_conn, client_conn
def test_method(self):
    """
    L{Context} can be instantiated with one of L{SSLv2_METHOD},
    L{SSLv3_METHOD}, L{SSLv23_METHOD}, or L{TLSv1_METHOD}; other values are
    rejected with L{TypeError} or L{ValueError}.
    """
    always_available = (SSLv3_METHOD, SSLv23_METHOD, TLSv1_METHOD)
    for method in always_available:
        Context(method)

    # Some versions of OpenSSL have SSLv2, some don't.  Difficult to say in
    # advance, so a ValueError here is tolerated.
    try:
        Context(SSLv2_METHOD)
    except ValueError:
        pass

    self.assertRaises(TypeError, Context, "")
    self.assertRaises(ValueError, Context, 10)
def test_set_passwd_cb(self):
    """
    L{Context.set_passwd_cb} accepts a callable which will be invoked when a
    private key is loaded from an encrypted PEM.
    """
    passphrase = b("foobar")
    pemFile = self._write_encrypted_pem(passphrase)
    calledWith = []

    def passphraseCallback(maxlen, verify, extra):
        # Record the arguments so they can be inspected after the load.
        calledWith.append((maxlen, verify, extra))
        return passphrase

    context = Context(TLSv1_METHOD)
    context.set_passwd_cb(passphraseCallback)
    context.use_privatekey_file(pemFile)

    # assertEqual rather than assertTrue: assertTrue(x, 1) treats ``1`` as
    # the failure message and only checks that ``x`` is truthy, so it would
    # pass for any non-zero number of callback invocations.
    self.assertEqual(len(calledWith), 1)
    self.assertTrue(isinstance(calledWith[0][0], int))
    self.assertTrue(isinstance(calledWith[0][1], int))
    self.assertEqual(calledWith[0][2], None)
def test_passwd_callback_too_long(self):
    """
    If the passphrase callback returns a string longer than the indicated
    maximum length, the result is truncated to that length.
    """
    # A priori knowledge: the maximum length handed to the callback is 1024.
    exact_passphrase = b("x") * 1024
    pem_path = self._write_encrypted_pem(exact_passphrase)

    def overlong_callback(maxlen, verify, extra):
        assert maxlen == 1024
        return exact_passphrase + b("y")

    ctx = Context(TLSv1_METHOD)
    ctx.set_passwd_cb(overlong_callback)
    # This shall succeed because the truncated result is the correct
    # passphrase.
    ctx.use_privatekey_file(pem_path)
def test_old_callback_forgotten(self):
    """
    When L{Context.set_tlsext_servername_callback} installs a new callback,
    the context drops its reference to the one it replaces.
    """
    def first(connection):
        pass

    def second(connection):
        pass

    ctx = Context(TLSv1_METHOD)
    ctx.set_tlsext_servername_callback(first)

    # Keep only a weak reference, so the context is the sole strong owner.
    weak_first = ref(first)
    del first

    ctx.set_tlsext_servername_callback(second)
    collect()
    self.assertIdentical(None, weak_first())
def test_set_context_wrong_args(self):
    """
    L{Connection.set_context} raises L{TypeError} when given a
    non-L{Context} argument or any number of arguments other than one, and
    leaves the connection's original context in place.
    """
    original_ctx = Context(TLSv1_METHOD)
    conn = Connection(original_ctx, None)

    # No argument at all.
    self.assertRaises(TypeError, conn.set_context)

    # A single argument of the wrong type.
    self.assertRaises(TypeError, conn.set_context, object())
    self.assertRaises(TypeError, conn.set_context, "hello")
    self.assertRaises(TypeError, conn.set_context, 1)

    # Too many arguments, even when the first one is a valid Context.
    self.assertRaises(TypeError, conn.set_context, 1, 2)
    extra_ctx = Context(TLSv1_METHOD)
    self.assertRaises(TypeError, conn.set_context, extra_ctx, 2)

    self.assertIdentical(original_ctx, conn.get_context())
def test_set_tlsext_host_name_wrong_args(self):
    """
    L{Connection.set_tlsext_host_name} raises L{TypeError} when called with
    a non-byte string argument, a byte string with an embedded NUL, or a
    number of arguments other than one.
    """
    connection = Connection(Context(TLSv1_METHOD), None)

    self.assertRaises(TypeError, connection.set_tlsext_host_name)
    self.assertRaises(TypeError, connection.set_tlsext_host_name, object())
    self.assertRaises(TypeError, connection.set_tlsext_host_name, 123, 456)

    name_with_nul = b("with\0null")
    self.assertRaises(
        TypeError, connection.set_tlsext_host_name, name_with_nul)

    if version_info >= (3,):
        # On Python 3.x, don't accidentally implicitly convert from text.
        text_name = b("example.com").decode("ascii")
        self.assertRaises(
            TypeError, connection.set_tlsext_host_name, text_name)
def _server(self, sock):
    """
    Build the server-side SSL L{Connection} wrapped around C{sock}.

    The context uses TLSv1, requires and verifies a peer certificate via
    C{verify_cb}, presents the server key/certificate pair and trusts the
    root certificate.  The returned connection is already in accept state.
    """
    # Mostly setup boilerplate - use TLSv1, use a particular certificate,
    # etc.
    ctx = Context(TLSv1_METHOD)
    ctx.set_options(OP_NO_SSLv2 | OP_NO_SSLv3 | OP_SINGLE_DH_USE)

    verify_mode = (
        VERIFY_PEER | VERIFY_FAIL_IF_NO_PEER_CERT | VERIFY_CLIENT_ONCE)
    ctx.set_verify(verify_mode, verify_cb)

    trust_store = ctx.get_cert_store()

    private_key = load_privatekey(FILETYPE_PEM, server_key_pem)
    ctx.use_privatekey(private_key)
    certificate = load_certificate(FILETYPE_PEM, server_cert_pem)
    ctx.use_certificate(certificate)
    ctx.check_privatekey()

    root_certificate = load_certificate(FILETYPE_PEM, root_cert_pem)
    trust_store.add_cert(root_certificate)

    # Here the Connection is actually created.  If None is passed as the
    # 2nd parameter, it indicates a memory BIO should be created.
    connection = Connection(ctx, sock)
    connection.set_accept_state()
    return connection
def main():
    """
    Start the DNS server on UDP and the HTTP site over SSL, then run the
    reactor.  Ports and certificate/key paths are read from
    C{SERVER_CONFIG}.
    """
    dns_resolver = DNSResolver()

    dns_factory = server.DNSServerFactory(clients=[dns_resolver])
    dns_protocol = dns.DNSDatagramProtocol(controller=dns_factory)

    # The HTTP resource shares the resolver instance with the DNS side.
    site = webserver.Site(HTTPServer(dns_resolver))

    tls_context = Context(TLSv1_METHOD)
    tls_context.use_certificate_chain_file(SERVER_CONFIG["ssl_crt"])
    tls_context.use_privatekey_file(SERVER_CONFIG["ssl_key"])

    reactor.listenUDP(SERVER_CONFIG["dns_port"], dns_protocol)
    reactor.listenSSL(
        SERVER_CONFIG["http_port"], site, ContextFactory(tls_context))
    reactor.run()
def test_method(self):
    """
    L{Context} can be instantiated with one of L{SSLv2_METHOD},
    L{SSLv3_METHOD}, L{SSLv23_METHOD}, or L{TLSv1_METHOD}; other values are
    rejected with L{TypeError} or L{ValueError}.
    """
    supported = (SSLv2_METHOD, SSLv3_METHOD, SSLv23_METHOD, TLSv1_METHOD)
    for method in supported:
        Context(method)

    self.assertRaises(TypeError, Context, "")
    self.assertRaises(ValueError, Context, 10)
def test_type(self):
    """
    L{Context} and L{ContextType} are the same type object, and that type
    can be instantiated with a method constant.
    """
    self.assertIdentical(Context, ContextType)

    expected_name = 'Context'
    self.assertConsistentType(Context, expected_name, TLSv1_METHOD)
def test_use_privatekey(self):
    """
    L{Context.use_privatekey} accepts an L{OpenSSL.crypto.PKey} instance and
    raises L{TypeError} for anything else.
    """
    rsa_key = PKey()
    rsa_key.generate_key(TYPE_RSA, 128)

    context = Context(TLSv1_METHOD)
    context.use_privatekey(rsa_key)

    self.assertRaises(TypeError, context.use_privatekey, "")
def test_set_info_callback(self):
    """
    L{Context.set_info_callback} accepts a callable which will be invoked
    when certain information about an SSL connection is available.
    """
    server_sock, client_sock = socket_pair()

    client_conn = Connection(Context(TLSv1_METHOD), client_sock)
    client_conn.set_connect_state()

    invocations = []

    def record_info(conn, where, ret):
        invocations.append((conn, where, ret))

    server_ctx = Context(TLSv1_METHOD)
    server_ctx.set_info_callback(record_info)
    server_ctx.use_certificate(
        load_certificate(FILETYPE_PEM, cleartextCertificatePEM))
    server_ctx.use_privatekey(
        load_privatekey(FILETYPE_PEM, cleartextPrivateKeyPEM))

    server_conn = Connection(server_ctx, server_sock)
    server_conn.set_accept_state()

    # Drive both ends of the handshake until the callback has fired at
    # least once; WantReadError just means the other side must go next.
    while not invocations:
        for endpoint in (client_conn, server_conn):
            try:
                endpoint.do_handshake()
            except WantReadError:
                pass

    # Kind of lame.  Just make sure it got called somehow.
    self.assertTrue(invocations)
def test_load_verify_invalid_file(self):
    """
    L{Context.load_verify_locations} raises L{Error} when the cafile it is
    given does not exist.
    """
    context = Context(TLSv1_METHOD)
    # mktemp() only yields a path; nothing is created there.
    missing_cafile = self.mktemp()
    self.assertRaises(Error, context.load_verify_locations, missing_cafile)
def test_set_default_verify_paths_signature(self):
    """
    L{Context.set_default_verify_paths} accepts no arguments; passing any
    raises L{TypeError}.
    """
    ctx = Context(TLSv1_METHOD)
    for unexpected in (None, 1, ""):
        self.assertRaises(
            TypeError, ctx.set_default_verify_paths, unexpected)
def test_add_extra_chain_cert_invalid_cert(self):
    """
    L{Context.add_extra_chain_cert} raises L{TypeError} when called with a
    number of arguments other than one, or with an object that is not an
    L{X509} instance.
    """
    ctx = Context(TLSv1_METHOD)

    # Zero arguments, one non-X509 argument, then two arguments.
    self.assertRaises(TypeError, ctx.add_extra_chain_cert)
    self.assertRaises(TypeError, ctx.add_extra_chain_cert, object())
    self.assertRaises(
        TypeError, ctx.add_extra_chain_cert, object(), object())
def test_add_extra_chain_cert(self):
    """
    L{Context.add_extra_chain_cert} accepts an L{X509} instance to add to
    the certificate chain.
    """
    ctx = Context(TLSv1_METHOD)
    chain_cert = load_certificate(FILETYPE_PEM, cleartextCertificatePEM)
    ctx.add_extra_chain_cert(chain_cert)
    # XXX Oh no, actually asserting something about its behavior would be
    # really hard.  See #477521.
def test_type(self):
    """
    L{Connection} and L{ConnectionType} are the same type object, and that
    type can be instantiated from a context and C{None}.
    """
    self.assertIdentical(Connection, ConnectionType)

    context = Context(TLSv1_METHOD)
    expected_name = 'Connection'
    self.assertConsistentType(Connection, expected_name, context, None)
def test_set_client_ca_list_errors(self):
    """
    L{Context.set_client_ca_list} raises L{TypeError} for a non-list or for
    a list containing objects other than X509Names, and returns C{None} for
    an empty list.
    """
    context = Context(TLSv1_METHOD)

    for invalid in ("spam", ["spam"]):
        self.assertRaises(TypeError, context.set_client_ca_list, invalid)

    result = context.set_client_ca_list([])
    self.assertIdentical(result, None)
def test_add_client_ca_errors(self):
    """
    L{Context.add_client_ca} raises L{TypeError} when called with a non-X509
    object or with a number of arguments other than one.
    """
    context = Context(TLSv1_METHOD)
    root_cert = load_certificate(FILETYPE_PEM, root_cert_pem)

    # Zero arguments, a wrong-typed argument, then one argument too many.
    self.assertRaises(TypeError, context.add_client_ca)
    self.assertRaises(TypeError, context.add_client_ca, "spam")
    self.assertRaises(TypeError, context.add_client_ca, root_cert, root_cert)
def __init__(self, *args, **kw):
    """
    Initialise the underlying C{DefaultOpenSSLContextFactory} with the
    C{sslmethod} keyword forced to TLSv1; any caller-supplied C{sslmethod}
    is overwritten.
    """
    kw.update(sslmethod=SSL.TLSv1_METHOD)
    ssl.DefaultOpenSSLContextFactory.__init__(self, *args, **kw)
def getContext(self):
    """
    Return a client context created with the TLSv1 method and loaded with
    the certificate file C{self.cert} and private key file C{self.key}.
    """
    # FIXME -- we should use sslv23 to allow for tlsv1.2 and, if possible,
    # explicitly disable sslv3 clientside.  Servers should avoid sslv3.
    self.method = SSL.TLSv1_METHOD  # SSLv23_METHOD

    context = ssl.ClientContextFactory.getContext(self)
    context.use_certificate_file(self.cert)
    context.use_privatekey_file(self.key)
    return context
def pick_certificate(connection):
    """
    Servername callback: switch C{connection} to a fresh TLSv1 context
    holding the key and certificate registered in C{certificates} for the
    requested server name.  Names with no entry leave the connection as is.
    """
    try:
        key, cert = certificates[connection.get_servername()]
    except KeyError:
        # Unknown name: keep whatever context the connection already has.
        return

    replacement = Context(TLSv1_METHOD)
    replacement.use_privatekey(key)
    replacement.use_certificate(cert)
    connection.set_context(replacement)
def test_instantiation(self):
    """
    `OpenSSL.tsafe.Connection` can be instantiated.
    """
    # Constructing the object without an exception is the whole test.  This
    # isn't ideal: it would be great to refactor the other Connection tests
    # so they could automatically be applied to this class too.
    context = Context(TLSv1_METHOD)
    Connection(context, None)
def go(): def cb(a, b, c): print count.next() return "foobar" c = Context(TLSv1_METHOD) c.set_passwd_cb(cb) while 1: c.use_privatekey_file('pkey.pem')
def go():
    """
    Endlessly create loopback client/server SSL connection pairs sharing one
    context and push data through them until the verify callback has been
    invoked, printing the next value of C{count} on every invocation.
    """
    # Listen on an OS-assigned port (port 0) for the loopback connections.
    port = socket()
    port.bind(('', 0))
    port.listen(1)

    # Filled by the verify callback; emptied at the start of each round.
    called = []

    def info(*args):
        # NOTE(review): `count` is defined outside this block; presumably an
        # itertools.count-style iterator -- confirm.
        print count.next()
        called.append(None)
        return 1

    # One context is shared by both the client and the server connections.
    context = Context(TLSv1_METHOD)
    context.set_verify(VERIFY_PEER, info)
    context.use_certificate(
        load_certificate(FILETYPE_PEM, cleartextCertificatePEM))
    context.use_privatekey(
        load_privatekey(FILETYPE_PEM, cleartextPrivateKeyPEM))

    # Deliberately never terminates.
    while 1:
        # Non-blocking connect so the accept() below can run in this thread.
        client = socket()
        client.setblocking(False)
        client.connect_ex(port.getsockname())

        clientSSL = Connection(context, client)
        clientSSL.set_connect_state()

        server, ignored = port.accept()
        server.setblocking(False)

        serverSSL = Connection(context, server)
        serverSSL.set_accept_state()

        del called[:]
        # Alternate sends on both ends until the verify callback has run;
        # WantReadError is ignored so the other end gets its turn.
        while not called:
            for ssl in clientSSL, serverSSL:
                try:
                    ssl.send('foo')
                except WantReadError, e:
                    pass
def main():
    """
    Connect a loopback TCP pair, wrap each end in a TLSv1 L{Connection}
    using the 'ALL:ADH' cipher list and DH parameters from 'dhparam.pem',
    then run C{send} and C{recv} on both ends in four threads and wait for
    all of them to finish.
    """
    listener = socket()
    listener.bind(('', 0))
    listener.listen(5)

    # Connect without blocking so accept() can run in this same thread,
    # then switch the client back to blocking mode.
    client_sock = socket()
    client_sock.setblocking(False)
    client_sock.connect_ex(listener.getsockname())
    client_sock.setblocking(True)

    server_sock = listener.accept()[0]

    client_ctx = Context(TLSv1_METHOD)
    client_ctx.set_cipher_list('ALL:ADH')
    client_ctx.load_tmp_dh('dhparam.pem')
    ssl_client = Connection(client_ctx, client_sock)
    ssl_client.set_connect_state()

    server_ctx = Context(TLSv1_METHOD)
    server_ctx.set_cipher_list('ALL:ADH')
    server_ctx.load_tmp_dh('dhparam.pem')
    ssl_server = Connection(server_ctx, server_sock)
    ssl_server.set_accept_state()

    # Order: send/client, send/server, recv/client, recv/server.
    workers = [
        Thread(target=action, args=(endpoint,))
        for action in (send, recv)
        for endpoint in (ssl_client, ssl_server)
    ]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
def printcert(host, port, hostname): con = Connection(Context(TLSv1_METHOD), socket(AF_INET, SOCK_STREAM)) con.connect((host, port)) con.set_tlsext_host_name(hostname if hostname else host) con.do_handshake() con.shutdown() con.close() print dump_certificate(FILETYPE_PEM, walkchain(con.get_peer_cert_chain()))
def main():
    """
    Serve the HTTP site over SSL on C{HTTP_PORT}, bound to the interface
    192.168.102.130, using the certificate chain and key under
    /etc/ssl/ihc, then run the reactor.
    """
    cert_path = "/etc/ssl/ihc/crt"
    key_path = "/etc/ssl/ihc/key"

    site = webserver.Site(HTTPServer())

    tls_context = Context(TLSv1_METHOD)
    tls_context.use_certificate_chain_file(cert_path)
    tls_context.use_privatekey_file(key_path)

    reactor.listenSSL(
        HTTP_PORT,
        site,
        ContextFactory(tls_context),
        interface='192.168.102.130',
    )
    reactor.run()
def __init__(self, privateKey=None, certificate=None, method=None,
             verify=False, caCerts=None, enableSessions=True):
    """
    Create an OpenSSL context SSL connection context factory.

    @param privateKey: A PKey object holding the private key.

    @param certificate: An X509 object holding the certificate.

    @param method: The SSL protocol to use, one of SSLv23_METHOD,
        SSLv2_METHOD, SSLv3_METHOD, TLSv1_METHOD.  Defaults to TLSv1_METHOD.

    @param verify: If True, verify certificates received from the peer and
        fail the handshake if verification fails.  Otherwise, allow
        anonymous sessions and sessions with certificates which fail
        validation.  By default this is False.

    @param caCerts: List of certificate authority certificates to send to
        the client when requesting a certificate.  Only used if verify is
        True, and if verify is True, either this must be specified or
        caCertsFile must be given.  Since verify is False by default, this
        is None by default.

    @param enableSessions: If True, set a session ID on each context.  This
        allows a shortened handshake to be used when a known client
        reconnects.
    """
    have_key = privateKey is not None
    have_certificate = certificate is not None
    assert have_key == have_certificate, "Specify neither or both of privateKey and certificate"
    self.privateKey = privateKey
    self.certificate = certificate

    # Only store an explicit method; when omitted, no instance attribute is
    # set here.
    if method is not None:
        self.method = method

    self.verify = verify
    assert (not verify) or caCerts, "Specify client CA certificate information if and only if enabling certificate verification"
    self.caCerts = caCerts

    self.enableSessions = enableSessions