Python twisted.internet.ssl 模块,CertificateOptions() 实例源码

我们从Python开源项目中,提取了以下14个代码示例,用于说明如何使用twisted.internet.ssl.CertificateOptions()

项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def options(self, *authorities):
        """
        Behaves like L{twisted.internet.ssl.PrivateCertificate.options}().
        """
        if not self.client:
            # do some crud with sslverify to generate a temporary self-signed
            # certificate.  This is SLOOOWWWWW so it is only in the absolute
            # worst, most naive case.

            # We have to do this because OpenSSL will not let both the server
            # and client be anonymous.
            sharedDN = DN(CN='TEMPORARY CERTIFICATE')
            key = KeyPair.generate()
            cr = key.certificateRequest(sharedDN)
            sscrd = key.signCertificateRequest(sharedDN, cr, lambda dn: True, 1)
            cert = key.newCertificate(sscrd)
            return cert.options(*authorities)
        options = dict()
        if authorities:
            options.update(dict(verify=True,
                                requireCertificate=True,
                                caCerts=[auth.original for auth in authorities]))
        occo = CertificateOptions(**options)
        return occo
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def options(self, *authorities):
        """
        Behaves like L{twisted.internet.ssl.PrivateCertificate.options}().
        """
        if not self.client:
            # do some crud with sslverify to generate a temporary self-signed
            # certificate.  This is SLOOOWWWWW so it is only in the absolute
            # worst, most naive case.

            # We have to do this because OpenSSL will not let both the server
            # and client be anonymous.
            sharedDN = DN(CN='TEMPORARY CERTIFICATE')
            key = KeyPair.generate()
            cr = key.certificateRequest(sharedDN)
            sscrd = key.signCertificateRequest(sharedDN, cr, lambda dn: True, 1)
            cert = key.newCertificate(sscrd)
            return cert.options(*authorities)
        options = dict()
        if authorities:
            options.update(dict(verify=True,
                                requireCertificate=True,
                                caCerts=[auth.original for auth in authorities]))
        occo = CertificateOptions(**options)
        return occo
项目:deb-python-autobahn    作者:openstack    | 项目源码 | 文件源码
def _check_native_endpoint(self, endpoint):
        if IStreamClientEndpoint.providedBy(endpoint):
            pass
        elif isinstance(endpoint, dict):
            if u'tls' in endpoint:
                tls = endpoint[u'tls']
                if isinstance(tls, (dict, bool)):
                    pass
                elif IOpenSSLClientConnectionCreator.providedBy(tls):
                    pass
                elif isinstance(tls, CertificateOptions):
                    pass
                else:
                    raise ValueError(
                        "'tls' configuration must be a dict, CertificateOptions or"
                        " IOpenSSLClientConnectionCreator provider"
                    )
        else:
            raise ValueError(
                "'endpoint' configuration must be a dict or IStreamClientEndpoint"
                " provider"
            )
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def onProceed(self, obj):
        """
        Proceed with TLS negotiation and reset the XML stream.
        """

        self.xmlstream.removeObserver('/failure', self.onFailure)
        ctx = ssl.CertificateOptions()
        self.xmlstream.transport.startTLS(ctx)
        self.xmlstream.reset()
        self.xmlstream.sendHeader()
        self._deferred.callback(Reset)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def createServer(self, address, portNumber, factory):
        contextFactory = ssl.CertificateOptions()
        return reactor.listenSSL(
            portNumber, factory, contextFactory, interface=address)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def connectClient(self, address, portNumber, clientCreator):
        contextFactory = ssl.CertificateOptions()
        return clientCreator.connectSSL(address, portNumber, contextFactory)
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def onProceed(self, obj):
        """
        Proceed with TLS negotiation and reset the XML stream.
        """

        self.xmlstream.removeObserver('/failure', self.onFailure)
        ctx = ssl.CertificateOptions()
        self.xmlstream.transport.startTLS(ctx)
        self.xmlstream.reset()
        self.xmlstream.sendHeader()
        self._deferred.callback(Reset)
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def createServer(self, address, portNumber, factory):
        contextFactory = ssl.CertificateOptions()
        return reactor.listenSSL(
            portNumber, factory, contextFactory, interface=address)
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def connectClient(self, address, portNumber, clientCreator):
        contextFactory = ssl.CertificateOptions()
        return clientCreator.connectSSL(address, portNumber, contextFactory)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def onProceed(self, obj):
        """
        Proceed with TLS negotiation and reset the XML stream.
        """

        self.xmlstream.removeObserver('/failure', self.onFailure)
        ctx = ssl.CertificateOptions()
        self.xmlstream.transport.startTLS(ctx)
        self.xmlstream.reset()
        self.xmlstream.sendHeader()
        self._deferred.callback(Reset)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def _parseClientSSLOptions(kwargs):
    """
    Parse common arguments for SSL endpoints, creating an L{CertificateOptions}
    instance.

    @param kwargs: A dict of keyword arguments to be parsed, potentially
        containing keys C{certKey}, C{privateKey}, C{caCertsDir}, and
        C{hostname}.  See L{_parseClientSSL}.
    @type kwargs: L{dict}

    @return: The remaining arguments, including a new key C{sslContextFactory}.
    """
    hostname = kwargs.pop('hostname', None)
    clientCertificate = _privateCertFromPaths(kwargs.pop('certKey', None),
                                              kwargs.pop('privateKey', None))
    trustRoot = _parseTrustRootPath(kwargs.pop('caCertsDir', None))
    if hostname is not None:
        configuration = optionsForClientTLS(
            _idnaText(hostname), trustRoot=trustRoot,
            clientCertificate=clientCertificate
        )
    else:
        # _really_ though, you should specify a hostname.
        if clientCertificate is not None:
            privateKeyOpenSSL = clientCertificate.privateKey.original
            certificateOpenSSL = clientCertificate.original
        else:
            privateKeyOpenSSL = None
            certificateOpenSSL = None
        configuration = CertificateOptions(
            trustRoot=trustRoot,
            privateKey=privateKeyOpenSSL,
            certificate=certificateOpenSSL,
        )
    kwargs['sslContextFactory'] = configuration
    return kwargs
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def connectClient(self, address, portNumber, clientCreator):
        """
        Create an SSL client using L{IReactorSSL.connectSSL}.
        """
        contextFactory = ssl.CertificateOptions()
        return clientCreator.connectSSL(address, portNumber, contextFactory)
项目:txacme    作者:twisted    | 项目源码 | 文件源码
def start_responding(self, server_name, challenge, response):
        """
        Put a context into the mapping.
        """
        server_name = response.z_domain.decode('ascii')
        cert, pkey = generate_tls_sni_01_cert(
            server_name, _generate_private_key=self._generate_private_key)
        server_name = server_name.encode('utf-8')
        self._challenge_options[server_name] = CertificateOptions(
            certificate=cert_cryptography_to_pyopenssl(cert),
            privateKey=key_cryptography_to_pyopenssl(pkey))
项目:txkube    作者:LeastAuthority    | 项目源码 | 文件源码
def __invariant__(self):
        certs = list(
            ssl.Certificate.loadPEM(cert.as_bytes())
            for cert
            in self.chain.certificates
        )
        key = ssl.KeyPair.load(self.key.as_bytes(), FILETYPE_PEM)

        # Invoke CertificateOptions' key/certificate match checking logic.
        ssl.CertificateOptions(
            privateKey=key.original,
            certificate=certs[0].original,
            extraCertChain=list(cert.original for cert in certs[1:]),
        ).getContext()
        return (True, "")