我们从Python开源项目中,提取了以下14个代码示例,用于说明如何使用twisted.internet.ssl.CertificateOptions()。
def options(self, *authorities): """ Behaves like L{twisted.internet.ssl.PrivateCertificate.options}(). """ if not self.client: # do some crud with sslverify to generate a temporary self-signed # certificate. This is SLOOOWWWWW so it is only in the absolute # worst, most naive case. # We have to do this because OpenSSL will not let both the server # and client be anonymous. sharedDN = DN(CN='TEMPORARY CERTIFICATE') key = KeyPair.generate() cr = key.certificateRequest(sharedDN) sscrd = key.signCertificateRequest(sharedDN, cr, lambda dn: True, 1) cert = key.newCertificate(sscrd) return cert.options(*authorities) options = dict() if authorities: options.update(dict(verify=True, requireCertificate=True, caCerts=[auth.original for auth in authorities])) occo = CertificateOptions(**options) return occo
def _check_native_endpoint(self, endpoint): if IStreamClientEndpoint.providedBy(endpoint): pass elif isinstance(endpoint, dict): if u'tls' in endpoint: tls = endpoint[u'tls'] if isinstance(tls, (dict, bool)): pass elif IOpenSSLClientConnectionCreator.providedBy(tls): pass elif isinstance(tls, CertificateOptions): pass else: raise ValueError( "'tls' configuration must be a dict, CertificateOptions or" " IOpenSSLClientConnectionCreator provider" ) else: raise ValueError( "'endpoint' configuration must be a dict or IStreamClientEndpoint" " provider" )
def onProceed(self, obj): """ Proceed with TLS negotiation and reset the XML stream. """ self.xmlstream.removeObserver('/failure', self.onFailure) ctx = ssl.CertificateOptions() self.xmlstream.transport.startTLS(ctx) self.xmlstream.reset() self.xmlstream.sendHeader() self._deferred.callback(Reset)
def createServer(self, address, portNumber, factory): contextFactory = ssl.CertificateOptions() return reactor.listenSSL( portNumber, factory, contextFactory, interface=address)
def connectClient(self, address, portNumber, clientCreator): contextFactory = ssl.CertificateOptions() return clientCreator.connectSSL(address, portNumber, contextFactory)
def _parseClientSSLOptions(kwargs): """ Parse common arguments for SSL endpoints, creating an L{CertificateOptions} instance. @param kwargs: A dict of keyword arguments to be parsed, potentially containing keys C{certKey}, C{privateKey}, C{caCertsDir}, and C{hostname}. See L{_parseClientSSL}. @type kwargs: L{dict} @return: The remaining arguments, including a new key C{sslContextFactory}. """ hostname = kwargs.pop('hostname', None) clientCertificate = _privateCertFromPaths(kwargs.pop('certKey', None), kwargs.pop('privateKey', None)) trustRoot = _parseTrustRootPath(kwargs.pop('caCertsDir', None)) if hostname is not None: configuration = optionsForClientTLS( _idnaText(hostname), trustRoot=trustRoot, clientCertificate=clientCertificate ) else: # _really_ though, you should specify a hostname. if clientCertificate is not None: privateKeyOpenSSL = clientCertificate.privateKey.original certificateOpenSSL = clientCertificate.original else: privateKeyOpenSSL = None certificateOpenSSL = None configuration = CertificateOptions( trustRoot=trustRoot, privateKey=privateKeyOpenSSL, certificate=certificateOpenSSL, ) kwargs['sslContextFactory'] = configuration return kwargs
def connectClient(self, address, portNumber, clientCreator): """ Create an SSL client using L{IReactorSSL.connectSSL}. """ contextFactory = ssl.CertificateOptions() return clientCreator.connectSSL(address, portNumber, contextFactory)
def start_responding(self, server_name, challenge, response): """ Put a context into the mapping. """ server_name = response.z_domain.decode('ascii') cert, pkey = generate_tls_sni_01_cert( server_name, _generate_private_key=self._generate_private_key) server_name = server_name.encode('utf-8') self._challenge_options[server_name] = CertificateOptions( certificate=cert_cryptography_to_pyopenssl(cert), privateKey=key_cryptography_to_pyopenssl(pkey))
def __invariant__(self): certs = list( ssl.Certificate.loadPEM(cert.as_bytes()) for cert in self.chain.certificates ) key = ssl.KeyPair.load(self.key.as_bytes(), FILETYPE_PEM) # Invoke CertificateOptions' key/certificate match checking logic. ssl.CertificateOptions( privateKey=key.original, certificate=certs[0].original, extraCertChain=list(cert.original for cert in certs[1:]), ).getContext() return (True, "")