我们从 Python 开源项目中提取了以下 28 个代码示例，用于说明如何使用 twisted.internet.ssl.DefaultOpenSSLContextFactory()。
def run(self):
    """
    Entry point (per the original note, called by twisted): load the
    config, schedule the periodic refresh, then listen for TLS connections
    and run the reactor.
    """
    # Nothing can be served without a usable initial configuration.
    self.refresh_config()
    if self.config is None:
        logging.critical("cannot start due to error in config file")
        return

    # Refresh and check status every event_period seconds; now=False
    # delays the first call by one full period.
    self.refresh_task = task.LoopingCall(self.refresh_loop)
    self.refresh_task.start(
        self.config['event_period'], now=False).addErrback(errorCallback)

    # TLS listener that receives blinded counts from the DC nodes and key
    # shares from the SK nodes; this object itself acts as the factory.
    port_number = self.config['listen_port']
    context_factory = ssl.DefaultOpenSSLContextFactory(
        self.config['key'], self.config['cert'])
    logging.info("Tally Server listening on port {}".format(port_number))
    reactor.listenSSL(port_number, self, context_factory)
    reactor.run()
def setUp(self):
    """
    Start a plain HTTP site and an HTTPS site on ephemeral loopback ports
    and register a chain of redirects that alternates between the two.
    """
    plainRoot = static.Data('not me', 'text/plain')
    tlsRoot = static.Data('me neither', 'text/plain')
    plainSite = server.Site(plainRoot, timeout=None)
    tlsSite = server.Site(tlsRoot, timeout=None)

    # The same PEM file shipped next to twisted.test is used as both the
    # private key and the certificate.
    from twisted import test
    pemPath = sibpath(test.__file__, 'server.pem')
    tlsContext = ssl.DefaultOpenSSLContextFactory(pemPath, pemPath)

    self.tlsPort = reactor.listenSSL(
        0, tlsSite, contextFactory=tlsContext, interface="127.0.0.1")
    self.plainPort = reactor.listenTCP(0, plainSite, interface="127.0.0.1")

    # Port 0 lets the OS choose; remember what was actually bound.
    self.plainPortno = self.plainPort.getHost().port
    self.tlsPortno = self.tlsPort.getHost().port

    # one (plain) -> two (tls) -> three (plain) -> four (tls, final data)
    plainRoot.putChild('one', util.Redirect(self.getHTTPS('two')))
    tlsRoot.putChild('two', util.Redirect(self.getHTTP('three')))
    plainRoot.putChild('three', util.Redirect(self.getHTTPS('four')))
    tlsRoot.putChild('four', static.Data('FOUND IT!', 'text/plain'))
def testOpenSSLBuffering(self):
    """
    Connect a client to a local TLS server and check that the client's
    buffer ends up holding exactly the expected line.
    """
    serverProto = self.serverProto = SingleLineServerProtocol()
    clientProto = self.clientProto = RecordingClientProtocol()

    # Both factories always hand back the single pre-built protocol
    # instance, so the test keeps a reference to each side.
    server = protocol.ServerFactory()
    client = self.client = protocol.ClientFactory()
    server.protocol = lambda: serverProto
    client.protocol = lambda: clientProto
    # presumably filled in by RecordingClientProtocol -- confirm there
    client.buffer = []

    sCTX = ssl.DefaultOpenSSLContextFactory(certPath, certPath)
    cCTX = ssl.ClientContextFactory()

    port = self.port = reactor.listenSSL(
        0, server, sCTX, interface='127.0.0.1')
    reactor.connectSSL('127.0.0.1', port.getHost().port, client, cCTX)

    # Spin the reactor by hand, giving up after 5000 iterations so a
    # broken connection cannot hang the test.
    for _ in range(5000):
        if client.buffer:
            break
        reactor.iterate()

    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(client.buffer, ["+OK <some crap>\r\n"])
def get_ssl_context():
    """Construct an SSL context factory from the user's private key/cert.

    The ``ssl_private_key_location`` and ``ssl_certificate_location``
    options of the ``SERVER`` config section are used as given, except
    that the value ``"0"`` selects the default file (``key.pem`` and
    ``cert.pem`` respectively) inside the ``ssl`` directory of the home
    directory.

    Exits the process with a non-zero status when a default location is
    requested but that ``ssl`` directory does not exist.

    TODO: document set up for server operators.
    """
    settings = cs_single()
    default_files = (("ssl_private_key_location", "key.pem"),
                     ("ssl_certificate_location", "cert.pem"))
    pkcdata = {}
    for option, filename in default_files:
        configured = settings.config.get("SERVER", option)
        if configured != "0":
            pkcdata[option] = configured
            continue
        sslpath = os.path.join(settings.homedir, "ssl")
        if not os.path.exists(sslpath):
            print("No ssl configuration in home directory, please read "
                  "installation instructions and try again.")
            # This is a fatal startup error: report failure to the caller
            # or supervisor rather than a success status.
            sys.exit(1)
        pkcdata[option] = os.path.join(sslpath, filename)
    return ssl.DefaultOpenSSLContextFactory(
        pkcdata["ssl_private_key_location"],
        pkcdata["ssl_certificate_location"])
def setUp(self):
    """
    Bring up a plain HTTP site and an HTTPS site on ephemeral loopback
    ports, linked by a chain of redirects that hops between them.
    """
    plainRoot = Data(b'not me', 'text/plain')
    tlsRoot = Data(b'me neither', 'text/plain')
    plainSite = server.Site(plainRoot, timeout=None)
    tlsSite = server.Site(tlsRoot, timeout=None)

    # serverPEMPath serves as both the private key and the certificate.
    tlsContext = ssl.DefaultOpenSSLContextFactory(
        serverPEMPath, serverPEMPath)
    self.tlsPort = reactor.listenSSL(
        0, tlsSite, contextFactory=tlsContext, interface="127.0.0.1")
    self.plainPort = reactor.listenTCP(0, plainSite, interface="127.0.0.1")

    # Port 0 lets the OS choose; record the ports actually bound.
    self.plainPortno = self.plainPort.getHost().port
    self.tlsPortno = self.tlsPort.getHost().port

    # one (plain) -> two (tls) -> three (plain) -> four (tls, final data)
    plainRoot.putChild(b'one', Redirect(self.getHTTPS('two')))
    tlsRoot.putChild(b'two', Redirect(self.getHTTP('three')))
    plainRoot.putChild(b'three', Redirect(self.getHTTPS('four')))
    tlsRoot.putChild(b'four', Data(b'FOUND IT!', 'text/plain'))
def _parseSSL(factory, port, privateKey="server.pem", certKey=None,
              sslmethod=None, interface='', backlog=50):
    """
    Turn SSL endpoint parameters into an ``(args, kwargs)`` pair.

    @param factory: the factory placed second in the positional arguments.
    @param port: port number; converted with C{int}.
    @param privateKey: path of the private key file.
    @param certKey: path of the certificate file; defaults to
        C{privateKey} when C{None}.
    @param sslmethod: optional name of an attribute of C{ssl.SSL} to pass
        on as the context factory's C{sslmethod}.
    @param interface: passed through under the C{'interface'} key.
    @param backlog: converted with C{int} and passed under C{'backlog'}.

    @return: C{((port, factory, contextFactory), {'interface': ...,
        'backlog': ...})}
    """
    from twisted.internet import ssl

    certificate = privateKey if certKey is None else certKey

    # Only forward sslmethod when the caller actually named one, so the
    # context factory's own default applies otherwise.
    options = {}
    if sslmethod is not None:
        options['sslmethod'] = getattr(ssl.SSL, sslmethod)
    contextFactory = ssl.DefaultOpenSSLContextFactory(
        privateKey, certificate, **options)

    positional = (int(port), factory, contextFactory)
    keywords = {'interface': interface, 'backlog': int(backlog)}
    return (positional, keywords)
def _listen(self, site):
    """
    Serve C{site} over SSL on an ephemeral loopback port and return the
    listening port.
    """
    from twisted import test
    # One PEM file next to twisted.test doubles as key and certificate.
    pemPath = sibpath(test.__file__, 'server.pem')
    contextFactory = ssl.DefaultOpenSSLContextFactory(pemPath, pemPath)
    return reactor.listenSSL(
        0, site, contextFactory=contextFactory, interface="127.0.0.1")
def testSSL(self, ssl=ssl):
    """
    The string form of an SSL listening port contains its port number.
    """
    pemPath = util.sibpath(__file__, 'server.pem')
    serverFactory = protocol.ServerFactory()
    contextFactory = ssl.DefaultOpenSSLContextFactory(pemPath, pemPath)
    p = reactor.listenSSL(0, serverFactory, contextFactory)

    portNo = p.getHost().port
    position = str(p).find(str(portNo))
    failureMessage = "%d not found in %s" % (portNo, p)
    self.assertNotEqual(position, -1, failureMessage)

    # Hand the result of stopListening back to the test runner.
    return p.stopListening()
def makeContextFactory(self, org, orgUnit, *args, **kwArgs):
    """
    Generate certificate files under a fresh temporary base name and build
    a server context factory from them.

    Extra positional and keyword arguments are forwarded to
    L{ssl.DefaultOpenSSLContextFactory}.

    @return: a C{(base, contextFactory)} tuple.
    """
    base = self.mktemp()
    generateCertificateFiles(base, org, orgUnit)

    # The key and certificate paths are the base name plus an extension.
    keyFile = os.extsep.join((base, 'key'))
    certFile = os.extsep.join((base, 'cert'))
    return base, ssl.DefaultOpenSSLContextFactory(
        keyFile, certFile, *args, **kwArgs)
def test_ssl_verification_positive(self):
    """
    When the server's SSL data can be verified by the specified public
    key, the client transport completes the upload and the server sees the
    expected headers and payload.
    """
    resource = DataCollectingResource()
    context_factory = DefaultOpenSSLContextFactory(PRIVKEY, PUBKEY)
    listener = reactor.listenSSL(
        0, server.Site(resource), context_factory, interface="127.0.0.1")
    self.ports.append(listener)

    url = "https://localhost:%d/" % (listener.getHost().port,)
    transport = HTTPTransport(None, url, PUBKEY)
    # The exchange runs in a worker thread via deferToThread.
    exchange = deferToThread(
        transport.exchange, "HI", computer_id="34", message_api="X.Y")

    def check_request(ignored):
        try:
            header_values = resource.request.requestHeaders.getRawHeaders
        except AttributeError:
            # Backwards compatibility with Twisted versions that lack
            # requestHeaders.
            def header_values(header):
                return [resource.request.received_headers[header]]
        self.assertEqual(header_values("x-computer-id"), ["34"])
        self.assertEqual(
            header_values("user-agent"),
            ["landscape-client/%s" % (VERSION,)])
        self.assertEqual(header_values("x-message-api"), ["X.Y"])
        self.assertEqual(bpickle.loads(resource.content), "HI")

    exchange.addCallback(check_request)
    return exchange
def test_ssl_verification_negative(self):
    """
    When the server presents a key that the specified public key does not
    verify, the client should drop the connection without uploading any
    message data.
    """
    self.log_helper.ignore_errors(PyCurlError)
    r = DataCollectingResource()
    bad_context = DefaultOpenSSLContextFactory(BADPRIVKEY, BADPUBKEY)
    listener = reactor.listenSSL(
        0, server.Site(r), bad_context, interface="127.0.0.1")
    self.ports.append(listener)

    url = "https://localhost:%d/" % (listener.getHost().port,)
    transport = HTTPTransport(None, url, pubkey=PUBKEY)
    exchange = deferToThread(
        transport.exchange, "HI", computer_id="34", message_api="X.Y")

    def check_failure(ignored):
        # The resource must not have recorded a request or any content.
        self.assertIs(r.request, None)
        self.assertIs(r.content, None)
        logged = self.logfile.getvalue()
        self.assertTrue("server certificate verification failed" in logged)

    # These checks are attached to the failure path only.
    exchange.addErrback(check_failure)
    return exchange
def getContext(self):
    """Return a SSL.Context object with a restricted cipher list.

    The context is built from the broker key and certificate under
    PARLAY_PATH. Override in subclasses.
    """
    ssl_context_factory = ssl.DefaultOpenSSLContextFactory(
        PARLAY_PATH+'/keys/broker.key', PARLAY_PATH+'/keys/broker.crt')
    ssl_context = ssl_context_factory.getContext()

    # We only want to use 'High' and 'Medium' ciphers, not 'Weak' ones.
    # Each rule is kept as its own item and joined with a space so that no
    # two rules can fuse into one token; the previous implicit string
    # concatenation produced '!aNULL!eNULL'.
    # NOTE(review): RC4 is still permitted below although it is widely
    # considered weak; kept unchanged for compatibility -- confirm whether
    # it can be dropped.
    cipher_rules = (
        # perfect forward secrecy ciphers
        'EECDH+ECDSA+AESGCM',
        'EECDH+aRSA+AESGCM',
        'EECDH+ECDSA+SHA384',
        'EECDH+ECDSA+SHA256',
        'EECDH+aRSA+SHA384',
        'EECDH+aRSA+SHA256',
        'EECDH+aRSA+RC4',
        'EECDH',
        'EDH+aRSA',
        'RC4',
        # exclusions
        '!aNULL',
        '!eNULL',
        '!LOW',
        '!3DES',
        '!MD5',
        '!EXP',
        '!PSK',
        '!SRP',
        '!DSS',
    )
    ssl_context.set_cipher_list(' '.join(cipher_rules))
    return ssl_context
def start_daemon(host, port, factory, usessl=False, sslkey=None,
                 sslcert=None):
    """Listen on ``host:port`` with ``factory``, over TCP or SSL.

    With ``usessl`` set, both ``sslkey`` and ``sslcert`` must be truthy;
    they are passed to the SSL context factory as private key and
    certificate.
    """
    if not usessl:
        reactor.listenTCP(port, factory, interface=host)
        return

    assert sslkey
    assert sslcert
    context_factory = ssl.DefaultOpenSSLContextFactory(sslkey, sslcert)
    reactor.listenSSL(port, factory, context_factory, interface=host)
def Start():
    """Configure globals, open all listeners and run the reactor.

    Listeners: the GOS redirector over SSL on 42127, the Blaze client hub
    on TCP 10041, the Blaze server hub on TCP 10071 and the HTTPS site on
    443. Both SSL listeners share one context factory.
    """
    Globals.serverIP = "192.168.1.40"

    # MySQL connection settings, checked before any port is opened.
    Globals.dbHost = "127.0.0.1"
    Globals.dbUser = "user"
    Globals.dbPass = "pass"
    Globals.dbDatabase = "db"
    CheckMySqlConn()

    ssl_context = ssl.DefaultOpenSSLContextFactory(
        'crt/privkey.pem', 'crt/cacert.pem')

    redirector_factory = Factory()
    redirector_factory.protocol = GosRedirector.GOSRedirector
    reactor.listenSSL(42127, redirector_factory, ssl_context)
    print("[SSL REACTOR] GOSREDIRECTOR STARTED [42127]")

    client_factory = Factory()
    client_factory.protocol = BlazeMain_Client.BLAZEHUB
    reactor.listenTCP(10041, client_factory)
    print("[TCP REACTOR] BLAZE CLIENT [10041]")

    server_factory = Factory()
    server_factory.protocol = BlazeMain_Server.BLAZEHUB
    reactor.listenTCP(10071, server_factory)
    print("[TCP REACTOR] BLAZE SERVER [10071]")

    https_site = server.Site(Https.Simple())
    reactor.listenSSL(443, https_site, ssl_context)
    print("[WEB REACTOR] Https [443]")

    reactor.run()
def _listen(self, site):
    """
    Serve C{site} over SSL on an ephemeral loopback port and return the
    listening port.
    """
    # serverPEMPath doubles as private key and certificate.
    contextFactory = ssl.DefaultOpenSSLContextFactory(
        serverPEMPath, serverPEMPath)
    return reactor.listenSSL(
        0, site, contextFactory=contextFactory, interface="127.0.0.1")
def __init__(self, *args, **kw):
    """
    Initialise the base context factory with C{sslmethod} forced to
    C{SSL.TLSv1_METHOD}, overriding any value the caller passed.
    """
    kw.update(sslmethod=SSL.TLSv1_METHOD)
    ssl.DefaultOpenSSLContextFactory.__init__(self, *args, **kw)
def setUp(self):
    """
    Build a context factory around L{FakeContext} and keep the context it
    produces for the tests to inspect.
    """
    # Real pyOpenSSL Context objects cannot be introspected enough, so an
    # alternate context factory is substituted to see what is done to it.
    factory = ssl.DefaultOpenSSLContextFactory(
        certPath, certPath, _contextFactory=FakeContext)
    self.contextFactory = factory
    self.context = self.contextFactory.getContext()
def test_method(self):
    """
    The context from L{ssl.DefaultOpenSSLContextFactory.getContext} may
    negotiate SSLv3 or TLSv1, but never SSLv2.
    """
    context = self.context
    # SSLv23_METHOD by itself allows SSLv2, SSLv3, or TLSv1 ...
    self.assertEqual(context._method, SSL.SSLv23_METHOD)

    options = context._options
    # ... so OP_NO_SSLv2 has to be set to rule SSLv2 out,
    self.assertTrue(options & SSL.OP_NO_SSLv2)
    # while SSLv3 and TLSv1 must be left enabled.
    self.assertFalse(options & SSL.OP_NO_SSLv3)
    self.assertFalse(options & SSL.OP_NO_TLSv1)
def test_missingCertificateFile(self):
    """
    L{ssl.DefaultOpenSSLContextFactory} raises L{OpenSSL.SSL.Error} from
    its initializer when the certificate filename does not name an
    existing file.
    """
    # mktemp returns a path at which nothing has been created.
    missingCertificate = self.mktemp()
    self.assertRaises(
        SSL.Error,
        ssl.DefaultOpenSSLContextFactory, certPath, missingCertificate)
def test_missingPrivateKeyFile(self):
    """
    L{ssl.DefaultOpenSSLContextFactory} raises L{OpenSSL.SSL.Error} from
    its initializer when the private key filename does not name an
    existing file.
    """
    # mktemp returns a path at which nothing has been created.
    missingPrivateKey = self.mktemp()
    self.assertRaises(
        SSL.Error,
        ssl.DefaultOpenSSLContextFactory, missingPrivateKey, certPath)