我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 OpenSSL.SSL.Context()。
def __init__(self, privateKeyFileName, certificateFileName,
             sslmethod=SSL.SSLv23_METHOD, _contextFactory=SSL.Context):
    """
    Record the key/certificate locations and build the context eagerly.

    @param privateKeyFileName: Name of a file containing a private key
    @param certificateFileName: Name of a file containing a certificate
    @param sslmethod: The SSL method to use
    @param _contextFactory: Callable stored on the instance as
        C{_contextFactory}; defaults to C{SSL.Context}.
    """
    self._contextFactory = _contextFactory
    self.sslmethod = sslmethod
    self.certificateFileName = certificateFileName
    self.privateKeyFileName = privateKeyFileName

    # Build the context immediately so that invalid parameters are
    # reported at construction time instead of at first use.
    self.cacheContext()
def __init__(self, server_address, HandlerClass, logRequests=True):
    """Secure XML-RPC server.

    Initialised like SimpleXMLRPCServer, but the listening socket is wrapped
    in an SSL connection so the XML data travels over HTTPS.
    """
    self.logRequests = logRequests

    SimpleXMLRPCServer.SimpleXMLRPCDispatcher.__init__(self)
    SocketServer.BaseServer.__init__(self, server_address, HandlerClass)

    # Key and certificate come from the module-level KEYFILE / CERTFILE.
    context = SSL.Context(SSL.SSLv23_METHOD)
    context.use_privatekey_file(KEYFILE)
    context.use_certificate_file(CERTFILE)

    self.socket = SSL.Connection(
        context,
        socket.socket(self.address_family, self.socket_type),
    )

    self.server_bind()
    self.server_activate()
def test03_ssl_verification_of_peer_fails(self):
    """A request must raise SSL.Error when no CA certificates are available."""

    def verify_callback(conn, x509, errnum, errdepth, preverify_ok):
        # Log the subject and pass OpenSSL's own verdict straight through.
        log.debug('SSL peer certificate verification failed for %r',
                  x509.get_subject())
        return preverify_ok

    context = SSL.Context(SSL.TLSv1_METHOD)
    context.set_verify(SSL.VERIFY_PEER, verify_callback)
    context.set_verify_depth(9)

    # Set bad location - unit test dir has no CA certs to verify with
    context.load_verify_locations(None, Constants.UNITTEST_DIR)

    connection = HTTPSConnection(Constants.HOSTNAME,
                                 port=Constants.PORT,
                                 ssl_context=context)
    connection.connect()
    self.assertRaises(SSL.Error, connection.request, 'GET', '/')
def test04_ssl_verification_with_subj_alt_name(self):
    """Fetch '/' using the default ServerSSLCertVerification for 'localhost'."""
    verifier = ServerSSLCertVerification(hostname='localhost')

    context = SSL.Context(SSL.TLSv1_METHOD)
    context.set_verify(SSL.VERIFY_PEER,
                       verifier.get_verify_server_cert_func())
    context.set_verify_depth(9)

    # Set correct location for CA certs to verify with
    context.load_verify_locations(None, Constants.CACERT_DIR)

    connection = HTTPSConnection(Constants.HOSTNAME,
                                 port=Constants.PORT,
                                 ssl_context=context)
    connection.connect()
    connection.request('GET', '/')
    response = connection.getresponse()
    print('Response = %s' % response.read())
def test04_ssl_verification_with_subj_common_name(self):
    """Fetch '/' with subject-alt-name matching switched off."""
    # Explicitly set verification of peer hostname using peer certificate
    # subject common name
    verifier = ServerSSLCertVerification(hostname='localhost',
                                         subj_alt_name_match=False)

    context = SSL.Context(SSL.TLSv1_METHOD)
    context.set_verify(SSL.VERIFY_PEER,
                       verifier.get_verify_server_cert_func())
    context.set_verify_depth(9)

    # Set correct location for CA certs to verify with
    context.load_verify_locations(None, Constants.CACERT_DIR)

    connection = HTTPSConnection(Constants.HOSTNAME,
                                 port=Constants.PORT,
                                 ssl_context=context)
    connection.connect()
    connection.request('GET', '/')
    response = connection.getresponse()
    print('Response = %s' % response.read())
def connect(self):
    """Create SSL socket and connect to peer.

    Uses the instance's ``ssl_context`` attribute when it is set (it must be
    an OpenSSL.SSL.Context), otherwise builds a context from the class-level
    ``default_ssl_method``.
    """
    configured = getattr(self, 'ssl_context', None)
    if not configured:
        ssl_context = SSL.Context(self.__class__.default_ssl_method)
    elif isinstance(configured, SSL.Context):
        ssl_context = configured
    else:
        raise TypeError('Expecting OpenSSL.SSL.Context type for "'
                        'ssl_context" attribute; got %r instead' %
                        configured)

    plain_sock = socket.create_connection((self.host, self.port),
                                          self.timeout)

    # Tunnel if using a proxy - ONLY available for Python 2.6.2 and above
    if getattr(self, '_tunnel_host', None):
        self.sock = plain_sock
        self._tunnel()

    self.sock = SSLSocket(ssl_context, plain_sock)

    # Go to client mode.
    self.sock.set_connect_state()
def _test_dtls_ciphersuite(self, server_connectivity_info, dtls_version, cipher, port):
    """Check whether the server accepts one cipher suite over DTLS.

    Intended to be run from worker threads. Attempts a DTLS handshake that
    offers only ``cipher`` and reports the outcome.

    Args:
        server_connectivity_info (ServerConnectivityInfo): connection
            information for the server; ``ip_address`` is read from it.
        dtls_version (str): protocol identifier handed to ``SSL.Context``.
        cipher (str): OpenSSL name of the cipher suite to test.
        port (int): port number to connect to.

    Returns:
        AcceptCipher if the handshake succeeded, RejectCipher (carrying the
        error text) if it raised ``SSL.Error``.
    """
    cnx = SSL.Context(dtls_version)
    cnx.set_cipher_list(cipher)
    conn = SSL.Connection(cnx, socket.socket(socket.AF_INET, socket.SOCK_DGRAM))
    try:
        conn.connect((server_connectivity_info.ip_address, port))
        conn.do_handshake()
    except SSL.Error as e:
        # A plain SSL.Error carries [(lib, func, reason), ...] as its first
        # argument, but subclasses such as SysCallError carry (errno, text)
        # or nothing at all, so fall back to the generic string form.
        try:
            error_msg = e.args[0][0][2]
        except (IndexError, TypeError):
            error_msg = str(e)
        cipher_result = RejectCipher(TLS_OPENSSL_TO_RFC_NAMES_MAPPING[cipher], error_msg)
    else:
        cipher_result = AcceptCipher(TLS_OPENSSL_TO_RFC_NAMES_MAPPING[cipher])
    finally:
        # shutdown() can itself fail when the handshake never completed;
        # that must neither hide the result nor skip closing the socket.
        try:
            conn.shutdown()
        except SSL.Error:
            pass
        finally:
            conn.close()
    return cipher_result
def test_dtls_protocol_support(self, server_connectivity_info, dtls_version, port):
    """Test whether the server supports a given DTLS protocol version.

    Args:
        server_connectivity_info (ServerConnectivityInfo): connection
            information for the server; ``ip_address`` is read from it.
        dtls_version (str): DTLS protocol identifier handed to
            ``SSL.Context``.
        port (int): port number to connect to.

    Returns:
        True if the handshake succeeded, False if it failed with a
        ``SSL.SysCallError`` other than "connection refused".

    Raises:
        ValueError: the connection was refused, i.e. the port is wrong.
    """
    # Local import keeps this block self-contained.
    import errno

    cnx = SSL.Context(dtls_version)
    cnx.set_cipher_list('ALL:COMPLEMENTOFALL')
    conn = SSL.Connection(cnx, socket.socket(socket.AF_INET, socket.SOCK_DGRAM))
    try:
        conn.connect((server_connectivity_info.ip_address, port))
        conn.do_handshake()
    except SSL.SysCallError as ex:
        # Use the symbolic errno rather than the Linux-specific value 111,
        # and read it from .args (exceptions are not indexable on Python 3).
        if ex.args and ex.args[0] == errno.ECONNREFUSED:
            raise ValueError('LuckyThirteenVulnerabilityTesterPlugin: It is entered wrong port for DTLS connection.')
        support = False
    else:
        support = True
    finally:
        # shutdown() may raise after a failed handshake; do not let that
        # replace the result or leave the socket open.
        try:
            conn.shutdown()
        except SSL.Error:
            pass
        finally:
            conn.close()
    return support
def ssl(sock, keyfile=None, certfile=None):
    """Wrap ``sock`` in an SSLObject, handshaking if it is already connected.

    ``certfile`` and ``keyfile`` are loaded into the context when given.
    Peer verification is disabled (VERIFY_NONE). The timeout of the
    incoming socket is copied onto the returned object.
    """
    ctx = SSL.Context(SSL.SSLv23_METHOD)
    if certfile is not None:
        ctx.use_certificate_file(certfile)
    if keyfile is not None:
        ctx.use_privatekey_file(keyfile)
    ctx.set_verify(SSL.VERIFY_NONE, lambda *x: True)

    timeout = sock.gettimeout()
    # Unwrap to the underlying socket object when the wrapper exposes one.
    raw_sock = getattr(sock, '_sock', sock)

    wrapped = SSLObject(SSL.Connection(ctx, raw_sock))
    wrapped.settimeout(timeout)

    try:
        raw_sock.getpeername()
    except Exception:
        # no, no connection yet
        return wrapped

    # yes, do the handshake
    wrapped.do_handshake()
    return wrapped
def main(): """ Connect to an SNI-enabled server and request a specific hostname, specified by argv[1], of it. """ if len(argv) < 2: print 'Usage: %s <hostname>' % (argv[0],) return 1 client = socket() print 'Connecting...', stdout.flush() client.connect(('127.0.0.1', 8443)) print 'connected', client.getpeername() client_ssl = Connection(Context(TLSv1_METHOD), client) client_ssl.set_connect_state() client_ssl.set_tlsext_host_name(argv[1]) client_ssl.do_handshake() print 'Server subject is', client_ssl.get_peer_certificate().get_subject() client_ssl.close()
def main(): """ Run an SNI-enabled server which selects between a few certificates in a C{dict} based on the handshake request it receives from a client. """ port = socket() port.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) port.bind(('', 8443)) port.listen(3) print 'Accepting...', stdout.flush() server, addr = port.accept() print 'accepted', addr server_context = Context(TLSv1_METHOD) server_context.set_tlsext_servername_callback(pick_certificate) server_ssl = Connection(server_context, server) server_ssl.set_accept_state() server_ssl.do_handshake() server.close()
def __init__(self, server_address, RequestHandlerClass):
    """Set up the server with an SSL-wrapped listening socket.

    The key and certificate are read from 'server.pkey' and 'server.cert'
    relative to os.curdir. SSLv2 is disabled on the context.
    """
    SocketServer.BaseServer.__init__(self, server_address, RequestHandlerClass)

    # Same as normal, but make it secure:
    context = SSL.Context(SSL.SSLv23_METHOD)
    context.set_options(SSL.OP_NO_SSLv2)

    base_dir = os.curdir
    context.use_privatekey_file(os.path.join(base_dir, 'server.pkey'))
    context.use_certificate_file(os.path.join(base_dir, 'server.cert'))

    secure_connection = SSL.Connection(
        context,
        socket.socket(self.address_family, self.socket_type),
    )
    self.socket = SSLWrapper(secure_connection)

    self.server_bind()
    self.server_activate()
def __init__(self, ssl_context, debuglevel=0):
    """
    Store the SSL context to use; a TLSv1 context is created when none is
    supplied.

    @param ssl_context: SSL context, or None for a default one
    @type ssl_context: OpenSSL.SSL.Context
    @param debuglevel: debug level for HTTPSHandler
    @type debuglevel: int
    @raise TypeError: ssl_context is neither None nor an SSL.Context
    """
    AbstractHTTPHandler.__init__(self, debuglevel)

    if ssl_context is None:
        self.ssl_context = SSL.Context(SSL.TLSv1_METHOD)
        return

    if not isinstance(ssl_context, SSL.Context):
        raise TypeError('Expecting OpenSSL.SSL.Context type for "'
                        'ssl_context" keyword; got %r instead' %
                        ssl_context)

    self.ssl_context = ssl_context
def bind(self, family, type, proto=0):
    """Create (or recreate) the actual socket object and bind it.

    When both an SSL certificate and a private key are configured the
    socket is wrapped in an SSLConnection before binding to
    ``self.bind_addr``.
    """
    self.socket = socket.socket(family, type, proto)
    self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    # Left disabled, as in the original:
    ## self.socket.setsockopt(socket.SOL_SOCKET, socket.TCP_NODELAY, 1)

    https_configured = self.ssl_certificate and self.ssl_private_key
    if https_configured:
        if SSL is None:
            raise ImportError("You must install pyOpenSSL to use HTTPS.")

        # See http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/442473
        context = SSL.Context(SSL.SSLv23_METHOD)
        context.use_privatekey_file(self.ssl_private_key)
        context.use_certificate_file(self.ssl_certificate)
        self.socket = SSLConnection(context, self.socket)
        self.populate_ssl_environ()

    self.socket.bind(self.bind_addr)
def __init__(self, hostname, ctx):
    """
    Initialize L{ClientTLSOptions}.

    Stores the context and several encodings of the hostname, then installs
    the identity-verifying info callback on the context.

    @param hostname: The hostname to verify as input by a human.
    @type hostname: L{unicode}

    @param ctx: an L{OpenSSL.SSL.Context} to use for new connections.
    @type ctx: L{OpenSSL.SSL.Context}.
    """
    self._ctx = ctx
    self._hostname = hostname

    encoded = _idnaBytes(hostname)
    self._hostnameBytes = encoded
    self._hostnameASCII = encoded.decode("ascii")

    info_callback = _tolerateErrors(self._identityVerifyingInfoCallback)
    ctx.set_info_callback(info_callback)
def _identityVerifyingInfoCallback(self, connection, where, ret):
    """
    U{info_callback
    <http://pythonhosted.org/pyOpenSSL/api/ssl.html#OpenSSL.SSL.Context.set_info_callback>
    } for pyOpenSSL. At handshake start it sets the TLS server-name
    extension to the stored hostname; at handshake completion it verifies
    the hostname against the connection and reports a mismatch to the
    transport.

    @param connection: the connection which is handshaking.
    @type connection: L{OpenSSL.SSL.Connection}

    @param where: flags indicating progress through a TLS handshake.
    @type where: L{int}

    @param ret: ignored
    @type ret: ignored
    """
    if where & SSL.SSL_CB_HANDSHAKE_START:
        connection.set_tlsext_host_name(self._hostnameBytes)
        return

    if not (where & SSL.SSL_CB_HANDSHAKE_DONE):
        return

    try:
        verifyHostname(connection, self._hostnameASCII)
    except VerificationError:
        # Failure() must be built inside the handler so that it captures
        # the exception currently being handled.
        failure = Failure()
        connection.get_app_data().failVerification(failure)
def test_extraChainFilesAreAddedIfSupplied(self):
    """
    When C{extraCertChain} is supplied alongside a key and certificate, the
    created C{Context} receives the key, the certificate and the extra
    chain certificates.
    """
    options = sslverify.OpenSSLCertificateOptions(
        privateKey=self.sKey,
        certificate=self.sCert,
        extraCertChain=self.extraCertChain,
    )
    # Swap in the fake so the values handed to the context can be inspected.
    options._contextFactory = FakeContext

    context = options.getContext()

    self.assertEqual(self.sKey, context._privateKey)
    self.assertEqual(self.sCert, context._certificate)
    self.assertEqual(self.extraCertChain, context._extraCertChain)
def main():
    """Start the UDP DNS listener and the HTTPS listener, then run the reactor.

    Ports and certificate/key paths are read from SERVER_CONFIG.
    """
    resolver = DNSResolver()
    dns_factory = server.DNSServerFactory(clients=[resolver])
    dns_protocol = dns.DNSDatagramProtocol(controller=dns_factory)
    site = webserver.Site(HTTPServer(resolver))

    tls_context = Context(TLSv1_METHOD)
    tls_context.use_certificate_chain_file(SERVER_CONFIG["ssl_crt"])
    tls_context.use_privatekey_file(SERVER_CONFIG["ssl_key"])

    reactor.listenUDP(SERVER_CONFIG["dns_port"], dns_protocol)
    reactor.listenSSL(
        SERVER_CONFIG["http_port"],
        site,
        ContextFactory(tls_context),
    )
    reactor.run()
def __enter__(self):
    """Build the TLS context, open the connection and return ``self``.

    When ``self.verify`` is set it is used as the CA file: peer verification
    is enabled with ``self.validate_cert`` as the callback, and the same
    file is parsed as a PEM certificate into ``self.trusted_cert``.
    ``self.pkey`` and ``self.cert`` are installed as the local identity
    before connecting to ``(self.host, self.port)``.
    """
    self.ctx = SSL.Context(SSL.TLSv1_METHOD)
    if self.verify:
        self.ctx.set_verify(SSL.VERIFY_PEER, self.validate_cert)
        self.ctx.load_verify_locations(self.verify)
        # Read through a context manager so the file handle is closed
        # deterministically instead of being left to the garbage collector.
        with open(self.verify, 'rb') as pem_file:
            pem_data = pem_file.read()
        self.trusted_cert = crypto.load_certificate(crypto.FILETYPE_PEM, pem_data)
    self.ctx.use_privatekey(self.pkey)
    self.ctx.use_certificate(self.cert)
    self.sock = SSL.Connection(self.ctx, socket.socket(socket.AF_INET, socket.SOCK_STREAM))
    self.sock.connect((self.host, self.port))
    return self
def __init__(self, registerInstance, server_address, keyFile=DEFAULTKEYFILE,
             certFile=DEFAULTCERTFILE, logRequests=True):
    """Secure Documenting XML-RPC server.

    Initialised like DocXMLRPCServer, but the listening socket is wrapped
    in an SSL connection so the XML data travels over HTTPS. The title,
    name and documentation shown by the doc server are taken from
    ``registerInstance`` when it provides them.
    """
    DocXMLRPCServer.__init__(self, server_address,
                             SecureDocXMLRpcRequestHandler, logRequests)
    self.logRequests = logRequests

    # Doc-server metadata, with fallbacks when the instance lacks it.
    try:
        self.set_server_title(registerInstance.title)
    except AttributeError:
        self.set_server_title('default title')

    try:
        self.set_server_name(registerInstance.name)
    except AttributeError:
        self.set_server_name('default name')

    self.set_server_documentation(
        registerInstance.__doc__ or 'default documentation')

    self.register_introspection_functions()

    # Dispatcher initialisation differs between Python versions.
    try:
        SimpleXMLRPCServer.SimpleXMLRPCDispatcher.__init__(self)
    except TypeError:
        # Python 2.5 changed the __init__ prototype to take
        # (self, allow_none, encoding).
        SimpleXMLRPCServer.SimpleXMLRPCDispatcher.__init__(self, False, None)

    SocketServer.BaseServer.__init__(self, server_address,
                                     SecureDocXMLRpcRequestHandler)

    # for some reason, have to register instance down here!
    self.register_instance(registerInstance)

    # SSL socket stuff
    context = SSL.Context(SSL.SSLv23_METHOD)
    context.use_privatekey_file(keyFile)
    context.use_certificate_file(certFile)
    self.socket = SSL.Connection(
        context,
        socket.socket(self.address_family, self.socket_type),
    )
    self.server_bind()
    self.server_activate()

    # requests count and condition, to allow for keyboard quit via CTL-C
    self.requests = 0
    self.rCondition = Condition()
def __init__(self, server_address, certfile, keyfile, RequestHandlerClass,
             bind_and_activate=True):
    """Create the server with an SSL-wrapped listening socket.

    ``keyfile`` and ``certfile`` are loaded into an SSLv23 context. The
    socket is bound and activated only when ``bind_and_activate`` is true.
    """
    from OpenSSL import SSL

    SocketServer.BaseServer.__init__(self, server_address, RequestHandlerClass)

    context = SSL.Context(SSL.SSLv23_METHOD)
    context.use_privatekey_file(keyfile)
    context.use_certificate_file(certfile)

    self.socket = SSL.Connection(
        context,
        socket.socket(self.address_family, self.socket_type),
    )

    if not bind_and_activate:
        return
    self.server_bind()
    self.server_activate()
def test01_configuration(self):
    """Configuration exposes the context and debug flag it was built with."""
    context = SSL.Context(SSL.TLSv1_METHOD)
    config = Configuration(context, True)

    self.assertTrue(config.ssl_context)
    self.assertEqual(config.debug, True)
def test02_fetch_from_url(self):
    """fetch_from_url returns a truthy result for the test URI."""
    context = SSL.Context(SSL.TLSv1_METHOD)
    config = Configuration(context, True)

    fetched = fetch_from_url(Constants.TEST_URI, config)

    self.assertTrue(fetched)
def test03_open_url(self):
    """open_url reports 200 as the first element of its result."""
    context = SSL.Context(SSL.TLSv1_METHOD)
    config = Configuration(context, True)

    result = open_url(Constants.TEST_URI, config)

    self.assertEqual(result[0], 200,
                     'open_url for %r failed' % Constants.TEST_URI)
def test04_open_peer_cert_verification_fails(self):
    """Opening the test URI raises SSL.Error when './' is the CA directory."""

    def verify_callback(conn, x509, errnum, errdepth, preverify_ok):
        # Defer entirely to OpenSSL's own verification verdict.
        return preverify_ok

    # Explicitly set empty CA directory to make verification fail
    context = SSL.Context(SSL.TLSv1_METHOD)
    context.set_verify(SSL.VERIFY_PEER, verify_callback)
    context.load_verify_locations(None, './')

    opener = build_opener(ssl_context=context)
    self.assertRaises(SSL.Error, opener.open, Constants.TEST_URI)
def __init__(self, host, port=None, strict=None,
             timeout=socket._GLOBAL_DEFAULT_TIMEOUT, ssl_context=None):
    """Initialise the HTTP connection and remember an optional SSL context.

    ``ssl_context`` must be an OpenSSL.SSL.Context when given; otherwise a
    TypeError is raised. If the instance has no ``ssl_context`` attribute
    after the base initialiser runs, one is created and set to None.
    """
    HTTPConnection.__init__(self, host, port, strict, timeout)

    if not hasattr(self, 'ssl_context'):
        self.ssl_context = None

    if ssl_context is None:
        return

    if not isinstance(ssl_context, SSL.Context):
        raise TypeError('Expecting OpenSSL.SSL.Context type for "'
                        'ssl_context" keyword; got %r instead' %
                        ssl_context)

    self.ssl_context = ssl_context
def connect (self, host, port): if self.state == 1: print "Already has an active connection" elif self.type == 0: # TCP if self.ssl == 1: ctx = SSL.Context (SSL.SSLv23_METHOD) s = SSL.Connection (ctx, socket(AF_INET, SOCK_STREAM)) try: err = s.connect_ex ((host, port)) except: print "Couldn't connect SSL socket" return if err == 0: self.skt = s self.state = 1 else: s = socket (AF_INET, SOCK_STREAM) try: err = s.connect_ex ((host, port)) except: print "Couldn't connect TCP socket" return if err == 0: self.skt = s self.state = 1 elif self.type == 1: # UDP s = socket (AF_INET, SOCK_DGRAM) try: err = s.connect_ex ((host, port)) except: print "Couldn't create UDP socket" return if err == 0: self.skt = s self.state = 1 else: print "RAW sockets not implemented yet" if self.state == 1: return "OK"
def getContext(self):
    """Return the cached SSL.Context, creating it on first use."""
    cached = self._context
    if cached is not None:
        return cached
    self._context = self._makeContext()
    return self._context
def getContext(self):
    """Return a SSL.Context object.

    Abstract hook: subclasses must override it. This base version always
    raises NotImplementedError.
    """
    raise NotImplementedError()
def cacheContext(self):
    """Build a context from the stored file names and keep it in ``_context``.

    The certificate is loaded before the private key.
    """
    context = SSL.Context(self.sslmethod)
    context.use_certificate_file(self.certificateFileName)
    context.use_privatekey_file(self.privateKeyFileName)
    self._context = context
def getContext(self):
    """Return a new SSL.Context built for ``self.method``."""
    make_context = SSL.Context
    return make_context(self.method)
def getContext(self):
    """Return a TLSv1 context loaded from ``self.filename``.

    The same file is used for both the certificate and the private key.
    """
    pem_path = self.filename
    context = SSL.Context(SSL.TLSv1_METHOD)
    context.use_certificate_file(pem_path)
    context.use_privatekey_file(pem_path)
    return context
def getContext(self):
    """Create an SSL context.

    ``self.filename`` is used for both the certificate and the private key.
    """
    from OpenSSL import SSL

    context = SSL.Context(SSL.SSLv23_METHOD)
    for load in (context.use_certificate_file, context.use_privatekey_file):
        load(self.filename)
    return context
def ssl():
    """Run ``app`` over TLS using 'ssl.key' and 'ssl.cert'."""
    from OpenSSL import SSL

    context = SSL.Context(SSL.SSLv23_METHOD)
    context.use_privatekey_file('ssl.key')
    context.use_certificate_file('ssl.cert')

    app.run(ssl_context=context)
def run_app():
    """Prepare the database, start the scheduler and run the web app.

    A missing database file is created and stamped at "head"; an existing
    one is upgraded. Both go through ``manage.py db`` in a child process.
    When ``config.USE_SSL`` is set the app is served over TLS, provided
    pyOpenSSL can be imported and the key/certificate can be loaded;
    otherwise it falls back to plain HTTP and logs the reason.
    """
    from subprocess import Popen
    import sys

    here = os.path.abspath(os.path.dirname(__file__))
    os.chdir(here)
    path = os.path.join(here, "manage.py")
    args = [sys.executable, path, "db"]

    if not os.path.exists(os.path.join(config.DATA_DIR, "plexivity.db")):
        from app import db
        db.create_all()
        args.append("stamp")
        args.append("head")
    else:
        args.append("upgrade")

    Popen(args)
    helper.startScheduler()

    context = None
    if config.USE_SSL:
        helper.generateSSLCert()
        try:
            from OpenSSL import SSL
        except ImportError:
            logger.error("plexivity should use SSL but OpenSSL was not found, starting without SSL")
        else:
            try:
                context = SSL.Context(SSL.SSLv23_METHOD)
                context.use_privatekey_file(os.path.join(config.DATA_DIR, "plexivity.key"))
                context.use_certificate_file(os.path.join(config.DATA_DIR, "plexivity.crt"))
            except Exception:
                # Keep the best-effort fallback, but report the real cause
                # instead of blaming a missing OpenSSL install.
                logger.exception("plexivity could not load its SSL key/certificate, starting without SSL")
                context = None

    # app.run() stays outside every try block: an error or Ctrl-C while
    # serving must not be mistaken for "OpenSSL missing" and trigger a
    # second, unencrypted server start.
    if context is not None:
        app.run(host="0.0.0.0", port=config.PORT, debug=False, ssl_context=context)
    else:
        app.run(host="0.0.0.0", port=config.PORT, debug=False)
def get_support_dtls_protocols_by_client(self):
    """Return the DTLS protocol versions that the client can use.

    For every entry in ``self.DTLS_MODULE`` the named attribute is looked up
    on ``_lib`` and registered in ``SSL.Context._methods``. Versions for
    which that fails are left out of the result.
    """
    supported = []
    for version in self.DTLS_MODULE:
        try:
            method = getattr(_lib, self.DTLS_MODULE[version])
            SSL.Context._methods[version] = method
        except Exception:
            continue
        supported.append(version)
    return supported
def get_dtls_cipher_list(self, dtls_version):
    """Return the cipher suites available for a DTLS protocol version.

    Args:
        dtls_version (str): protocol identifier handed to ``SSL.Context``.

    Returns:
        The list reported by ``Connection.get_cipher_list()`` after the
        context's cipher list is set to 'ALL:COMPLEMENTOFALL'.
    """
    cnx = SSL.Context(dtls_version)
    cnx.set_cipher_list('ALL:COMPLEMENTOFALL')
    conn = SSL.Connection(cnx, socket.socket(socket.AF_INET, socket.SOCK_DGRAM))
    try:
        return conn.get_cipher_list()
    finally:
        # The connection exists only to query the list; close it so the
        # UDP socket is not leaked on every call.
        conn.close()
def generate_adhoc_ssl_context():
    """Generates an adhoc SSL context for the development server.

    The certificate and key come from ``generate_adhoc_ssl_pair()``.
    """
    from OpenSSL import SSL

    certificate, private_key = generate_adhoc_ssl_pair()

    context = SSL.Context(SSL.SSLv23_METHOD)
    context.use_privatekey(private_key)
    context.use_certificate(certificate)
    return context