Python twisted.internet.ssl 模块,DefaultOpenSSLContextFactory() 实例源码

我们从Python开源项目中,提取了以下28个代码示例,用于说明如何使用twisted.internet.ssl.DefaultOpenSSLContextFactory()

项目:privcount    作者:privcount    | 项目源码 | 文件源码
def run(self):
        '''
        Called by twisted
        '''
        # load initial config
        self.refresh_config()
        if self.config is None:
            logging.critical("cannot start due to error in config file")
            return

        # refresh and check status every event_period seconds
        self.refresh_task = task.LoopingCall(self.refresh_loop)
        refresh_deferred = self.refresh_task.start(self.config['event_period'], now=False)
        refresh_deferred.addErrback(errorCallback)

        # setup server for receiving blinded counts from the DC nodes and key shares from the SK nodes
        listen_port = self.config['listen_port']
        key_path = self.config['key']
        cert_path = self.config['cert']
        ssl_context = ssl.DefaultOpenSSLContextFactory(key_path, cert_path)

        logging.info("Tally Server listening on port {}".format(listen_port))
        reactor.listenSSL(listen_port, self, ssl_context)
        reactor.run()
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def setUp(self):
        plainRoot = static.Data('not me', 'text/plain')
        tlsRoot = static.Data('me neither', 'text/plain')

        plainSite = server.Site(plainRoot, timeout=None)
        tlsSite = server.Site(tlsRoot, timeout=None)

        from twisted import test
        self.tlsPort = reactor.listenSSL(0, tlsSite,
                                         contextFactory=ssl.DefaultOpenSSLContextFactory(
            sibpath(test.__file__, 'server.pem'),
            sibpath(test.__file__, 'server.pem'),
            ),
                                         interface="127.0.0.1")
        self.plainPort = reactor.listenTCP(0, plainSite, interface="127.0.0.1")

        self.plainPortno = self.plainPort.getHost().port
        self.tlsPortno = self.tlsPort.getHost().port

        plainRoot.putChild('one', util.Redirect(self.getHTTPS('two')))
        tlsRoot.putChild('two', util.Redirect(self.getHTTP('three')))
        plainRoot.putChild('three', util.Redirect(self.getHTTPS('four')))
        tlsRoot.putChild('four', static.Data('FOUND IT!', 'text/plain'))
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def testOpenSSLBuffering(self):
        serverProto = self.serverProto = SingleLineServerProtocol()
        clientProto = self.clientProto = RecordingClientProtocol()

        server = protocol.ServerFactory()
        client = self.client = protocol.ClientFactory()

        server.protocol = lambda: serverProto
        client.protocol = lambda: clientProto
        client.buffer = []

        sCTX = ssl.DefaultOpenSSLContextFactory(certPath, certPath)
        cCTX = ssl.ClientContextFactory()

        port = self.port = reactor.listenSSL(0, server, sCTX, interface='127.0.0.1')
        reactor.connectSSL('127.0.0.1', port.getHost().port, client, cCTX)

        i = 0
        while i < 5000 and not client.buffer:
            i += 1
            reactor.iterate()

        self.assertEquals(client.buffer, ["+OK <some crap>\r\n"])
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def setUp(self):
        plainRoot = static.Data('not me', 'text/plain')
        tlsRoot = static.Data('me neither', 'text/plain')

        plainSite = server.Site(plainRoot, timeout=None)
        tlsSite = server.Site(tlsRoot, timeout=None)

        from twisted import test
        self.tlsPort = reactor.listenSSL(0, tlsSite,
                                         contextFactory=ssl.DefaultOpenSSLContextFactory(
            sibpath(test.__file__, 'server.pem'),
            sibpath(test.__file__, 'server.pem'),
            ),
                                         interface="127.0.0.1")
        self.plainPort = reactor.listenTCP(0, plainSite, interface="127.0.0.1")

        self.plainPortno = self.plainPort.getHost().port
        self.tlsPortno = self.tlsPort.getHost().port

        plainRoot.putChild('one', util.Redirect(self.getHTTPS('two')))
        tlsRoot.putChild('two', util.Redirect(self.getHTTP('three')))
        plainRoot.putChild('three', util.Redirect(self.getHTTPS('four')))
        tlsRoot.putChild('four', static.Data('FOUND IT!', 'text/plain'))
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def testOpenSSLBuffering(self):
        serverProto = self.serverProto = SingleLineServerProtocol()
        clientProto = self.clientProto = RecordingClientProtocol()

        server = protocol.ServerFactory()
        client = self.client = protocol.ClientFactory()

        server.protocol = lambda: serverProto
        client.protocol = lambda: clientProto
        client.buffer = []

        sCTX = ssl.DefaultOpenSSLContextFactory(certPath, certPath)
        cCTX = ssl.ClientContextFactory()

        port = self.port = reactor.listenSSL(0, server, sCTX, interface='127.0.0.1')
        reactor.connectSSL('127.0.0.1', port.getHost().port, client, cCTX)

        i = 0
        while i < 5000 and not client.buffer:
            i += 1
            reactor.iterate()

        self.assertEquals(client.buffer, ["+OK <some crap>\r\n"])
项目:CoinSwapCS    作者:AdamISZ    | 项目源码 | 文件源码
def get_ssl_context():
    """Construct an SSL context factory from the user's privatekey/cert.
    TODO: document set up for server operators.
    """
    pkcdata = {}
    for x, y in zip(["ssl_private_key_location", "ssl_certificate_location"],
                    ["key.pem", "cert.pem"]):
        if cs_single().config.get("SERVER", x) == "0":
            sslpath = os.path.join(cs_single().homedir, "ssl")
            if not os.path.exists(sslpath):
                print("No ssl configuration in home directory, please read "
                      "installation instructions and try again.")
                sys.exit(0)
            pkcdata[x] = os.path.join(sslpath, y)
        else:
            pkcdata[x] = cs_single().config.get("SERVER", x)
    return ssl.DefaultOpenSSLContextFactory(pkcdata["ssl_private_key_location"],
                                            pkcdata["ssl_certificate_location"])
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def setUp(self):
        plainRoot = Data(b'not me', 'text/plain')
        tlsRoot = Data(b'me neither', 'text/plain')

        plainSite = server.Site(plainRoot, timeout=None)
        tlsSite = server.Site(tlsRoot, timeout=None)

        self.tlsPort = reactor.listenSSL(
            0, tlsSite,
            contextFactory=ssl.DefaultOpenSSLContextFactory(
                serverPEMPath, serverPEMPath),
            interface="127.0.0.1")
        self.plainPort = reactor.listenTCP(0, plainSite, interface="127.0.0.1")

        self.plainPortno = self.plainPort.getHost().port
        self.tlsPortno = self.tlsPort.getHost().port

        plainRoot.putChild(b'one', Redirect(self.getHTTPS('two')))
        tlsRoot.putChild(b'two', Redirect(self.getHTTP('three')))
        plainRoot.putChild(b'three', Redirect(self.getHTTPS('four')))
        tlsRoot.putChild(b'four', Data(b'FOUND IT!', 'text/plain'))
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def _parseSSL(factory, port, privateKey="server.pem", certKey=None,
              sslmethod=None, interface='', backlog=50):
    from twisted.internet import ssl
    if certKey is None:
        certKey = privateKey
    kw = {}
    if sslmethod is not None:
        kw['sslmethod'] = getattr(ssl.SSL, sslmethod)
    cf = ssl.DefaultOpenSSLContextFactory(privateKey, certKey, **kw)
    return ((int(port), factory, cf),
            {'interface': interface, 'backlog': int(backlog)})
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def _listen(self, site):
        from twisted import test
        return reactor.listenSSL(0, site,
                                 contextFactory=ssl.DefaultOpenSSLContextFactory(
            sibpath(test.__file__, 'server.pem'),
            sibpath(test.__file__, 'server.pem'),
            ),
                                 interface="127.0.0.1")
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def testSSL(self, ssl=ssl):
            pem = util.sibpath(__file__, 'server.pem')
            p = reactor.listenSSL(0, protocol.ServerFactory(), ssl.DefaultOpenSSLContextFactory(pem, pem))
            portNo = p.getHost().port
            self.assertNotEqual(str(p).find(str(portNo)), -1,
                                "%d not found in %s" % (portNo, p))
            return p.stopListening()
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def makeContextFactory(self, org, orgUnit, *args, **kwArgs):
        base = self.mktemp()
        generateCertificateFiles(base, org, orgUnit)
        serverCtxFactory = ssl.DefaultOpenSSLContextFactory(
            os.extsep.join((base, 'key')),
            os.extsep.join((base, 'cert')),
            *args, **kwArgs)

        return base, serverCtxFactory
项目:landscape-client    作者:CanonicalLtd    | 项目源码 | 文件源码
def test_ssl_verification_positive(self):
        """
        The client transport should complete an upload of messages to
        a host which provides SSL data which can be verified by the
        public key specified.
        """
        resource = DataCollectingResource()
        context_factory = DefaultOpenSSLContextFactory(PRIVKEY, PUBKEY)
        port = reactor.listenSSL(0, server.Site(resource), context_factory,
                                 interface="127.0.0.1")
        self.ports.append(port)
        transport = HTTPTransport(
            None, "https://localhost:%d/" % (port.getHost().port,), PUBKEY)
        result = deferToThread(transport.exchange, "HI", computer_id="34",
                               message_api="X.Y")

        def got_result(ignored):
            try:
                get_header = resource.request.requestHeaders.getRawHeaders
            except AttributeError:
                # For backwards compatibility with Twisted versions
                # without requestHeaders
                def get_header(header):
                    return [resource.request.received_headers[header]]
            self.assertEqual(get_header("x-computer-id"), ["34"])
            self.assertEqual(
                get_header("user-agent"), ["landscape-client/%s" % (VERSION,)])
            self.assertEqual(get_header("x-message-api"), ["X.Y"])
            self.assertEqual(bpickle.loads(resource.content), "HI")
        result.addCallback(got_result)
        return result
项目:landscape-client    作者:CanonicalLtd    | 项目源码 | 文件源码
def test_ssl_verification_negative(self):
        """
        If the SSL server provides a key which is not verified by the
        specified public key, then the client should immediately end
        the connection without uploading any message data.
        """
        self.log_helper.ignore_errors(PyCurlError)
        r = DataCollectingResource()
        context_factory = DefaultOpenSSLContextFactory(
            BADPRIVKEY, BADPUBKEY)
        port = reactor.listenSSL(0, server.Site(r), context_factory,
                                 interface="127.0.0.1")
        self.ports.append(port)
        transport = HTTPTransport(None, "https://localhost:%d/"
                                  % (port.getHost().port,), pubkey=PUBKEY)

        result = deferToThread(transport.exchange, "HI", computer_id="34",
                               message_api="X.Y")

        def got_result(ignored):
            self.assertIs(r.request, None)
            self.assertIs(r.content, None)
            self.assertTrue("server certificate verification failed"
                            in self.logfile.getvalue())
        result.addErrback(got_result)
        return result
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def _parseSSL(factory, port, privateKey="server.pem", certKey=None,
              sslmethod=None, interface='', backlog=50):
    from twisted.internet import ssl
    if certKey is None:
        certKey = privateKey
    kw = {}
    if sslmethod is not None:
        kw['sslmethod'] = getattr(ssl.SSL, sslmethod)
    cf = ssl.DefaultOpenSSLContextFactory(privateKey, certKey, **kw)
    return ((int(port), factory, cf),
            {'interface': interface, 'backlog': int(backlog)})
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def _listen(self, site):
        from twisted import test
        return reactor.listenSSL(0, site,
                                 contextFactory=ssl.DefaultOpenSSLContextFactory(
            sibpath(test.__file__, 'server.pem'),
            sibpath(test.__file__, 'server.pem'),
            ),
                                 interface="127.0.0.1")
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def testSSL(self, ssl=ssl):
            pem = util.sibpath(__file__, 'server.pem')
            p = reactor.listenSSL(0, protocol.ServerFactory(), ssl.DefaultOpenSSLContextFactory(pem, pem))
            portNo = p.getHost().port
            self.assertNotEqual(str(p).find(str(portNo)), -1,
                                "%d not found in %s" % (portNo, p))
            return p.stopListening()
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def makeContextFactory(self, org, orgUnit, *args, **kwArgs):
        base = self.mktemp()
        generateCertificateFiles(base, org, orgUnit)
        serverCtxFactory = ssl.DefaultOpenSSLContextFactory(
            os.extsep.join((base, 'key')),
            os.extsep.join((base, 'cert')),
            *args, **kwArgs)

        return base, serverCtxFactory
项目:Parlay    作者:PromenadeSoftware    | 项目源码 | 文件源码
def getContext(self):
            """Return a SSL.Context object. override in subclasses."""

            ssl_context_factory = ssl.DefaultOpenSSLContextFactory(PARLAY_PATH+'/keys/broker.key',
                                                                   PARLAY_PATH+'/keys/broker.crt')
            # We only want to use 'High' and 'Medium' ciphers, not 'Weak' ones. We want *actual* security here.
            ssl_context = ssl_context_factory.getContext()
            # perfect forward secrecy ciphers
            ssl_context.set_cipher_list('EECDH+ECDSA+AESGCM EECDH+aRSA+AESGCM EECDH+ECDSA+SHA384 EECDH+ECDSA+SHA256 EECDH' +
                                        '+aRSA+SHA384 EECDH+aRSA+SHA256 EECDH+aRSA+RC4 EECDH EDH+aRSA RC4 !aNULL' +
                                        '!eNULL !LOW !3DES !MD5 !EXP !PSK !SRP !DSS')
            return ssl_context
项目:joinmarket-clientserver    作者:JoinMarket-Org    | 项目源码 | 文件源码
def start_daemon(host, port, factory, usessl=False, sslkey=None, sslcert=None):
    if usessl:
        assert sslkey
        assert sslcert
        reactor.listenSSL(
            port, factory, ssl.DefaultOpenSSLContextFactory(sslkey, sslcert),
            interface=host)
    else:
        reactor.listenTCP(port, factory, interface=host)
项目:BF4BlazeEmulator    作者:buchacho    | 项目源码 | 文件源码
def Start():
    Globals.serverIP = "192.168.1.40"

    #MySQL Data
    Globals.dbHost = "127.0.0.1"
    Globals.dbUser = "user"
    Globals.dbPass = "pass"
    Globals.dbDatabase = "db"

    CheckMySqlConn()

    SSLInfo = ssl.DefaultOpenSSLContextFactory('crt/privkey.pem', 'crt/cacert.pem')

    factory = Factory()
    factory.protocol = GosRedirector.GOSRedirector
    reactor.listenSSL(42127, factory, SSLInfo)
    print("[SSL REACTOR] GOSREDIRECTOR STARTED [42127]")

    factory = Factory()
    factory.protocol = BlazeMain_Client.BLAZEHUB
    reactor.listenTCP(10041, factory)
    print("[TCP REACTOR] BLAZE CLIENT [10041]")

    factory = Factory()
    factory.protocol = BlazeMain_Server.BLAZEHUB
    reactor.listenTCP(10071, factory)
    print("[TCP REACTOR] BLAZE SERVER [10071]")

    sites = server.Site(Https.Simple())
    reactor.listenSSL(443, sites, SSLInfo)
    print("[WEB REACTOR] Https [443]")

    reactor.run()
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def _listen(self, site):
        return reactor.listenSSL(
            0, site,
            contextFactory=ssl.DefaultOpenSSLContextFactory(
                serverPEMPath, serverPEMPath),
            interface="127.0.0.1")
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def testSSL(self, ssl=ssl):
            pem = util.sibpath(__file__, 'server.pem')
            p = reactor.listenSSL(0, protocol.ServerFactory(), ssl.DefaultOpenSSLContextFactory(pem, pem))
            portNo = p.getHost().port
            self.assertNotEqual(str(p).find(str(portNo)), -1,
                                "%d not found in %s" % (portNo, p))
            return p.stopListening()
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def makeContextFactory(self, org, orgUnit, *args, **kwArgs):
        base = self.mktemp()
        generateCertificateFiles(base, org, orgUnit)
        serverCtxFactory = ssl.DefaultOpenSSLContextFactory(
            os.extsep.join((base, 'key')),
            os.extsep.join((base, 'cert')),
            *args, **kwArgs)

        return base, serverCtxFactory
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def __init__(self, *args, **kw):
            kw['sslmethod'] = SSL.TLSv1_METHOD
            ssl.DefaultOpenSSLContextFactory.__init__(self, *args, **kw)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def setUp(self):
        # pyOpenSSL Context objects aren't introspectable enough.  Pass in
        # an alternate context factory so we can inspect what is done to it.
        self.contextFactory = ssl.DefaultOpenSSLContextFactory(
            certPath, certPath, _contextFactory=FakeContext)
        self.context = self.contextFactory.getContext()
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_method(self):
        """
        L{ssl.DefaultOpenSSLContextFactory.getContext} returns an SSL context
        which can use SSLv3 or TLSv1 but not SSLv2.
        """
        # SSLv23_METHOD allows SSLv2, SSLv3, or TLSv1
        self.assertEqual(self.context._method, SSL.SSLv23_METHOD)

        # And OP_NO_SSLv2 disables the SSLv2 support.
        self.assertTrue(self.context._options & SSL.OP_NO_SSLv2)

        # Make sure SSLv3 and TLSv1 aren't disabled though.
        self.assertFalse(self.context._options & SSL.OP_NO_SSLv3)
        self.assertFalse(self.context._options & SSL.OP_NO_TLSv1)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_missingCertificateFile(self):
        """
        Instantiating L{ssl.DefaultOpenSSLContextFactory} with a certificate
        filename which does not identify an existing file results in the
        initializer raising L{OpenSSL.SSL.Error}.
        """
        self.assertRaises(
            SSL.Error,
            ssl.DefaultOpenSSLContextFactory, certPath, self.mktemp())
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_missingPrivateKeyFile(self):
        """
        Instantiating L{ssl.DefaultOpenSSLContextFactory} with a private key
        filename which does not identify an existing file results in the
        initializer raising L{OpenSSL.SSL.Error}.
        """
        self.assertRaises(
            SSL.Error,
            ssl.DefaultOpenSSLContextFactory, self.mktemp(), certPath)