我们从Python开源项目中,提取了以下37个代码示例,用于说明如何使用ssl.PROTOCOL_TLSv1_2()。
def loop(self): """Main server loop for accepting connections. Better call it on its own thread""" while True: try: (csock, (ipaddr, port)) = self.connection["sock"].accept() self._log("L", "New connection from %s:%s" % (str(ipaddr), str(port))) except sock_error: raise sock_error try: csock = ssl.wrap_socket(csock, server_side=True, certfile="server.crt", keyfile="server.key", ssl_version=ssl.PROTOCOL_TLSv1_2) except AttributeError: # All PROTOCOL consts are merged on TLS in Python2.7.13 csock = ssl.wrap_socket(csock, server_side=True, certfile="server.crt", keyfile="server.key", ssl_version=ssl.PROTOCOL_TLS) self.clients["hosts"][str(self.clients["serial"])] = Host(csock, ipaddr, port, self.clients["serial"]) self.clients["serial"] += 1
def _get_ssl_options(cls, cert_options): ssl_options = {} if cert_options['validate_cert']: ssl_options["cert_reqs"] = ssl.CERT_REQUIRED if cert_options['ca_certs'] is not None: ssl_options["ca_certs"] = cert_options['ca_certs'] else: ssl_options["ca_certs"] = _default_ca_certs() if cert_options['client_key'] is not None: ssl_options["keyfile"] = cert_options['client_key'] if cert_options['client_cert'] is not None: ssl_options["certfile"] = cert_options['client_cert'] # according to REC 7540: # deployments of HTTP/2 that use TLS 1.2 MUST # support TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256 ssl_options["ciphers"] = "ECDH+AESGCM" ssl_options["ssl_version"] = ssl.PROTOCOL_TLSv1_2 ssl_options = ssl_options_to_context(ssl_options) ssl_options.set_alpn_protocols(['h2']) return ssl_options
def _get_ssl_options(cls, cert_options): ssl_options = {} if cert_options['validate_cert']: ssl_options["cert_reqs"] = ssl.CERT_REQUIRED if cert_options['ca_certs'] is not None: ssl_options["ca_certs"] = cert_options['ca_certs'] else: ssl_options["ca_certs"] = simple_httpclient._default_ca_certs() if cert_options['client_key'] is not None: ssl_options["keyfile"] = cert_options['client_key'] if cert_options['client_cert'] is not None: ssl_options["certfile"] = cert_options['client_cert'] # according to REC 7540: # deployments of HTTP/2 that use TLS 1.2 MUST # support TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256 ssl_options["ciphers"] = "ECDH+AESGCM" ssl_options["ssl_version"] = ssl.PROTOCOL_TLSv1_2 ssl_options = netutil.ssl_options_to_context(ssl_options) ssl_options.set_alpn_protocols(['h2']) return ssl_options
def __init__(self, listen_host, listen_port, server_key, server_cert, ca_cert, image_path, image_types, exec_handlers, static_paths): websockets = WebSockets() wm = pyinotify.WatchManager() inotify_handler = INotifyHandler(websockets) self._notifier = pyinotify.Notifier(wm, inotify_handler) for image_type in image_types: type_path = os.path.join(image_path, image_type) wm.add_watch(type_path, pyinotify.IN_MOVED_TO) exec_handlers = dict(x.split('=', 1) for x in (exec_handlers or [])) static_paths = dict(x.split('=', 1) for x in (static_paths or [])) http_handler = HTTPRequestHandler(image_path, image_types, exec_handlers, static_paths, websockets) self._httpd = geventserver.WSGIServer( (listen_host, listen_port), http_handler, keyfile=server_key, certfile=server_cert, ca_certs=ca_cert, cert_reqs=ssl.CERT_REQUIRED, ssl_version=ssl.PROTOCOL_TLSv1_2)
def get_ssl_version(sock): # Seth behaves differently depeding on the TLS protocol # https://bugs.python.org/issue31453 # This is an ugly hack (as if the rest of this wasn't...) versions = [ ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_2, ] firstbytes = sock.recv(16, socket.MSG_PEEK) try: return versions[firstbytes[10]-1] except IndexError: print("Unexpected SSL version: %s" % hexlify(firstbytes)) return versions[-1] # def launch_rdp_client(): # time.sleep(1) # p = subprocess.Popen( # ["xfreerdp", # "/v:%s:%d" % (args.bind_ip, consts.RELAY_PORT), # "/u:%s\\%s" % (domain, user), # ], # )
def connect(self): time.sleep(random.randrange(0, 2**self.connection_attempts)) self.connection_attempts += 1 # websocket.enableTrace(True) ws = websocket.WebSocketApp(self.config_server_url, on_message=self.on_message, on_error=self.on_error, on_close=self.on_close) ws.on_open = self.on_open if self.config_server_url.startswith("wss://"): ws.run_forever(sslopt={"cert_reqs": ssl.CERT_REQUIRED, "ca_certs": ca_cert, "ssl_version": ssl.PROTOCOL_TLSv1_2, "keyfile": client_pem, "certfile": client_crt}) else: ws.run_forever()
def main(): # websocket.enableTrace(True) if len(sys.argv) < 2: host = ws_url else: host = sys.argv[1] ws = websocket.WebSocketApp(host, on_message=on_message, on_error=on_error, on_close=on_close) ws.on_open = on_open if host.startswith("wss://"): ws.run_forever(sslopt={"cert_reqs": ssl.CERT_REQUIRED, "ca_certs": ca_cert, "ssl_version": ssl.PROTOCOL_TLSv1_2, "keyfile": client_pem, "certfile": client_crt}) else: ws.run_forever()
def run(self): """ Start the RPyC server """ if self.certfile is not None and self.keyfile is not None: authenticator = SSLAuthenticator(self.certfile, self.keyfile) self.server = ThreadedServer( self.serviceClass, hostname=self.host, port=self.port, protocol_config={'allow_all_attrs': True}, authenticator=authenticator, cert_reqs=ssl.CERT_REQUIRED, ciphers='EECDH+AESGCM:EDH+AESGCM:AES256+EECDH:AES256+EDH', ssl_version=ssl.PROTOCOL_TLSv1_2) else: self.server = ThreadedServer( self.serviceClass, hostname=self.host, port=self.port, protocol_config={'allow_all_attrs': True}) self.server.start()
def parse_authorized_certs(): if os.path.isfile(AUTHORIZED_CERTS_PATH): with open(AUTHORIZED_CERTS_PATH, 'r') as f: try: authorized_certs = b''.join([base64.b64decode(line.split(' ')[1]) for line in f.readlines() if len(line.split(' ')) == 2]) # testing if valid certificates ssl.SSLContext(protocol=ssl.PROTOCOL_TLSv1_2).load_verify_locations(cadata=authorized_certs) return authorized_certs except Exception as e: print_with_timestamp('Corrupted authorized_certs file: {}'.format(e)) print_with_timestamp('Please look at authorized_certs and ' 'search for obvious errors located at {}'.format(AUTHORIZED_CERTS_PATH)) print_with_timestamp('Or delete the file altogether, ' 'but then you would have to pair your phone(s) again') return b'' else: return b''
def startSocket(): bindsocket = socket.socket() bindsocket.bind(('localhost', 12345)) bindsocket.listen(1) while True: newsocket, fromaddr = bindsocket.accept() connstream = ssl.wrap_socket(newsocket, server_side=True, certfile=certificate, keyfile=key, ssl_version = ssl.PROTOCOL_TLSv1_2) try: data = deal_with_client(connstream) finally: connstream.shutdown(socket.SHUT_RDWR) connstream.close() newsocket.close() break writeMailTofile(data) checkMail(data)
def test_ipv4_secure_websocket_connection_by_router_instance( self, config_path, router ): try: ssl.PROTOCOL_TLSv1_2 except AttributeError: pytest.skip('Python Environment does not support TLS') with DateService(router=router) as service: wait_for_registrations(service, 1) client = Client(router=router) with client: wait_for_session(client) result = client.rpc.get_todays_date() today = date.today() assert result == today.isoformat()
def test_ipv4_secure_websocket_connection_by_router_url(self, router): assert router.url == "wss://localhost:9443" try: ssl.PROTOCOL_TLSv1_2 except AttributeError: pytest.skip('Python Environment does not support TLS') with DateService( url="wss://localhost:9443", cert_path="./wampy/testing/keys/server_cert.pem", ) as service: wait_for_registrations(service, 1) client = Client( url="wss://localhost:9443", cert_path="./wampy/testing/keys/server_cert.pem", ) with client: wait_for_session(client) result = client.rpc.get_todays_date() today = date.today() assert result == today.isoformat()
def set_host(self, **kwargs): """ Set host to connect to. :param kwargs: Host Configuration :type kwargs: dict """ user = kwargs["user"] self._status_topic = "{}agents/{}".format(self._base, user) self.client.reinitialise(client_id=user, clean_session=self._clean_session) self.client.username_pw_set(username=user, password=kwargs["password"]) self.client.will_set(self._status_topic, payload=b'\x00'.decode(), qos=2, retain=True) self.client.on_connect = self._on_connect self.client.on_disconnect = self._on_disconnect self.client.on_message = self._on_message if "ca" in kwargs: self.client.tls_set(ca_certs=kwargs["ca"], cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_TLSv1_2, ciphers=None) self.client.connect_async(kwargs["host"], kwargs["port"], self._keepalive // 1000)
def run(self): global httpd, ui_player print('starting server...') server_address = ('', self.port) server_start = False try: httpd = ThreadedHTTPServer(server_address, testHTTPServer_RequestHandler) if self.https_allow and self.cert_file: if os.path.exists(self.cert_file): httpd.socket = ssl.wrap_socket( httpd.socket, certfile=self.cert_file, ssl_version=ssl.PROTOCOL_TLSv1_2) server_start = True except: txt = 'Your local IP changed..or port is blocked\n..Trying to find new IP' send_notification(txt) self.ip = get_ip() txt = 'Your New Address is '+self.ip + '\n Please restart the player' send_notification(txt) change_config_file(self.ip, self.port) server_address = (self.ip, self.port) self.ui.local_ip = self.ip #httpd = ThreadedHTTPServer(server_address, testHTTPServer_RequestHandler) if server_start: print('running server...at..'+self.ip+':'+str(self.port)) httpd.serve_forever() else: print('server..not..started..')
def test_tls1_2_enabled(self): self._connect_socket(ssl_version=ssl.PROTOCOL_TLSv1_2)
def test_good_cipher(self): self._connect_socket(ssl_version=ssl.PROTOCOL_TLSv1_2, ciphers='ECDHE-RSA-AES128-GCM-SHA256')
def test_bad_ciphers(self): bad_ciphers = ['DH+3DES', 'ADH', 'AECDH', 'RC4', 'aNULL', 'MD5'] for bad_cipher in bad_ciphers: self.assertRaises(socket.error, self._connect_socket, ssl_version=ssl.PROTOCOL_TLSv1_2, ciphers=bad_cipher)
def on_message(client, userdata, msg): print "Alert..." print (msg.topic+" " +str(msg.payload)) #client.on_connect = on_connect #client.on_message = on_message #client.tls_insecure_set(True) #client.tls_set("airmap.io.crt",cert_reqs=ssl.CERT_NONE,tls_version=ssl.PROTOCOL_TLSv1_2) #client.tls_insecure_set(True)
def test_protocol(self): suite = auth.TLS12AuthenticationSuite() protocol = suite.protocol self.assertIsInstance(protocol, int) self.assertEqual(ssl.PROTOCOL_TLSv1_2, suite.protocol)
def __init__(self): """ Create a TLS12AuthenticationSuite object. """ super(TLS12AuthenticationSuite, self).__init__() self._protocol = ssl.PROTOCOL_TLSv1_2 self._ciphers = ':'.join(( 'AES128-SHA256', 'AES256-SHA256', 'DH-DSS-AES256-SHA256', 'DH-DSS-AES128-SHA256', 'DH-RSA-AES128-SHA256', 'DHE-DSS-AES128-SHA256', 'DHE-RSA-AES128-SHA256', 'DH-DSS-AES256-SHA256', 'DH-RSA-AES256-SHA256', 'DHE-DSS-AES256-SHA256', 'DHE-RSA-AES256-SHA256', 'ECDH-ECDSA-AES128-SHA256', 'ECDH-ECDSA-AES256-SHA256', 'ECDHE-ECDSA-AES128-SHA256', 'ECDHE-ECDSA-AES256-SHA384', 'ECDH-RSA-AES128-SHA256', 'ECDH-RSA-AES256-SHA384', 'ECDHE-RSA-AES128-SHA256', 'ECDHE-RSA-AES256-SHA384', 'ECDHE-ECDSA-AES128-GCM-SHA256', 'ECDHE-ECDSA-AES256-GCM-SHA384', 'ECDHE-ECDSA-AES128-SHA256', 'ECDHE-ECDSA-AES256-SHA384', ))
def set_args(self, args): DaneChecker.set_args(self, args) sslcontext = SSLContext(PROTOCOL_TLSv1_2) sslcontext.verify_mode = CERT_REQUIRED sslcontext.load_verify_locations(args.castore) self._sslcontext = sslcontext
def init(): if os.environ.get('XDG_CONFIG_HOME') is None or os.environ.get('XDG_CONFIG_HOME') == '': XDG_CONFIG_HOME = os.path.join(os.path.expanduser('~'), '.config') else: XDG_CONFIG_HOME = os.environ.get('XDG_CONFIG_HOME') CONF_DIR_PATH = os.path.join(XDG_CONFIG_HOME, 'an2linux') CONF_FILE_PATH = os.path.join(CONF_DIR_PATH, 'config') CERTIFICATE_PATH = os.path.join(CONF_DIR_PATH, 'certificate.pem') RSA_PRIVATE_KEY_PATH = os.path.join(CONF_DIR_PATH, 'rsakey.pem') AUTHORIZED_CERTS_PATH = os.path.join(CONF_DIR_PATH, 'authorized_certs') DHPARAM_PATH = os.path.join(CONF_DIR_PATH, 'dhparam.pem') TMP_DIR_PATH = os.path.join(tempfile.gettempdir(), 'an2linux') if not os.path.exists(CONF_DIR_PATH): os.makedirs(CONF_DIR_PATH) if not os.path.exists(TMP_DIR_PATH): os.makedirs(TMP_DIR_PATH) if not os.path.isfile(CERTIFICATE_PATH) or not os.path.isfile(RSA_PRIVATE_KEY_PATH): generate_server_private_key_and_certificate(CERTIFICATE_PATH, RSA_PRIVATE_KEY_PATH) else: # test if valid private key / certificate try: ssl.SSLContext(protocol=ssl.PROTOCOL_TLSv1_2).load_cert_chain(CERTIFICATE_PATH, RSA_PRIVATE_KEY_PATH) ssl.PEM_cert_to_DER_cert(open(CERTIFICATE_PATH, 'r').read()) except (ssl.SSLError, ValueError) as e: print_with_timestamp('Something went wrong trying to load your private key and certificate: {}'.format(e)) print_with_timestamp('Will generate new key overwriting old key and certificate') generate_server_private_key_and_certificate(CERTIFICATE_PATH, RSA_PRIVATE_KEY_PATH) return CONF_FILE_PATH, CERTIFICATE_PATH, RSA_PRIVATE_KEY_PATH, AUTHORIZED_CERTS_PATH, DHPARAM_PATH, TMP_DIR_PATH
def connect_mqtt_client(self): """Connect to the AWS IoT endpoint.""" # Create MQTT client self.mqtt_client = mqtt.Client(self.client_id) self.mqtt_client.on_log = self._on_mqtt_log # Authenticate using TLS mutual authentication with a client certificate cafile = self.get_abs_path(CAFILE) certificate_cert = self.get_abs_path(CERTIFICATE_CERT) certificate_key = self.get_abs_path(CERTIFICATE_KEY) self.mqtt_client.tls_set(cafile, certificate_cert, certificate_key, ssl.CERT_REQUIRED, ssl.PROTOCOL_TLSv1_2) # 8883 is the default port for MQTT over SSL/TLS self.mqtt_client.connect(self.endpoint_address, port=8883)
def init_poolmanager(self, connections, maxsize, block=False): self.poolmanager = PoolManager(num_pools=connections, maxsize=maxsize, block=block, ssl_version=ssl.PROTOCOL_TLSv1_2)
def run(self): global httpd,home logger.info('starting server...') httpd = None try: cert = os.path.join(home,'cert.pem') if ui.https_media_server: if not os.path.exists(cert): self.cert_signal.emit(cert) if not ui.https_media_server: server_address = (self.ip,self.port) httpd = ThreadedHTTPServerLocal(server_address, HTTPServer_RequestHandler) self.media_server_start.emit('http') elif ui.https_media_server and os.path.exists(cert): server_address = (self.ip,self.port) httpd = ThreadedHTTPServerLocal(server_address, HTTPServer_RequestHandler) httpd.socket = ssl.wrap_socket(httpd.socket,certfile=cert,ssl_version=ssl.PROTOCOL_TLSv1_2) self.media_server_start.emit('https') #httpd = MyTCPServer(server_address, HTTPServer_RequestHandler) except OSError as e: e_str = str(e) logger.info(e_str) if 'errno 99' in e_str.lower(): txt = 'Your local IP changed..or port is blocked.\n..Trying to find new IP' send_notification(txt) self.ip = get_lan_ip() txt = 'Your New Address is '+self.ip+':'+str(self.port) + '\n Please restart the player' send_notification(txt) change_config_file(self.ip,self.port) server_address = (self.ip,self.port) #httpd = MyTCPServer(server_address, HTTPServer_RequestHandler) httpd = ThreadedHTTPServerLocal(server_address, HTTPServer_RequestHandler) else: pass if httpd: logger.info('running server...at..'+self.ip+':'+str(self.port)) #httpd.allow_reuse_address = True httpd.serve_forever() logger.info('quitting http server') else: logger.info('server not started')
def register_router(self, router): super(SecureWebSocket, self).register_router(router) self.ipv = router.ipv # PROTOCOL_TLSv1_1 and PROTOCOL_TLSv1_2 are only available if Python is # linked with OpenSSL 1.0.1 or later. try: self.ssl_version = ssl.PROTOCOL_TLSv1_2 except AttributeError: raise WampyError("Your Python Environment does not support TLS") self.certificate = router.certificate
def __init__(self, ip_address=None, port=None, ssl_version=ssl.PROTOCOL_TLSv1_2): self.ssl_version = ssl_version super(BaseSslTcpFingerprinter, self).__init__(ip_address=ip_address, port=port) # Static Methods # Class Methods # Public Methods
def connect(self): print("Connecting to {host}:{port}...".format(**self.mqtt_config)) if 'ca_certs' in self.mqtt_config: self.mqtt.tls_set(self.mqtt_config['ca_certs'], tls_version=ssl.PROTOCOL_TLSv1_2) if 'user' in self.mqtt_config: self.mqtt.username_pw_set(self.mqtt_config['user'], self.mqtt_config['password']) self.mqtt.connect(self.mqtt_config['host'], self.mqtt_config['port'])
def run(self): logger.info('starting server...') try: cert = os.path.join(home, 'cert.pem') if ui.https_media_server: if not os.path.exists(cert): self.cert_signal.emit(cert) if not ui.https_media_server: server_address = ('', self.port) self.httpd = ThreadedHTTPServerLocal(server_address, HTTPServer_RequestHandler) self.set_local_ip_val() self.media_server_start.emit('http') elif ui.https_media_server and os.path.exists(cert): server_address = ('', self.port) self.httpd = ThreadedHTTPServerLocal(server_address, HTTPServer_RequestHandler) self.httpd.socket = ssl.wrap_socket(self.httpd.socket, certfile=cert, ssl_version=ssl.PROTOCOL_TLSv1_2) self.set_local_ip_val() self.media_server_start.emit('https') #httpd = MyTCPServer(server_address, HTTPServer_RequestHandler) except OSError as e: e_str = str(e) logger.info(e_str) if 'errno 99' in e_str.lower(): txt = 'Your local IP changed..or port is blocked.\n..Trying to find new IP' send_notification(txt) self.ip = get_lan_ip() txt = 'Your New Address is '+self.ip+':'+str(self.port) + '\n Please restart the application' send_notification(txt) change_config_file(self.ip, self.port) server_address = (self.ip, self.port) ui.local_ip_stream = self.ip #httpd = MyTCPServer(server_address, HTTPServer_RequestHandler) #httpd = ThreadedHTTPServerLocal(server_address, HTTPServer_RequestHandler) else: pass if self.httpd: logger.info('running server...at..'+self.ip+':'+str(self.port)) #httpd.allow_reuse_address = True self.httpd.serve_forever() logger.info('quitting http server') else: logger.info('server not started')
def ngrok_auth(options): host = 'www.ngrok.cc' port = 443 try: client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) ssl_client = ssl.wrap_socket(client, ssl_version=ssl.PROTOCOL_TLSv1) # ssl.PROTOCOL_TLSv1_2 ssl_client.connect((host, port)) except Exception: print('???????: https://www.ngrok.cc ??.') time.sleep(10) sys.exit() header = "POST " + "/api/clientid/clientid/%s" + " HTTP/1.1" + "\r\n" header += "Content-Type: text/html" + "\r\n" header += "Host: %s" + "\r\n" header += "\r\n" buf = header % (options, host) ssl_client.sendall(buf.encode('utf-8')) # ????? fd = ssl_client.makefile('rb', 0) body = bytes() while True: line = fd.readline().decode('utf-8') if line == "\n" or line == "\r\n": chunk_size = int(fd.readline(), 16) if chunk_size > 0: body = fd.read(chunk_size).decode('utf-8') break ssl_client.close() authData = json.loads(body) if authData['status'] != 200: print('????:%s, ErrorCode:%s' % (authData['msg'], authData['status'])) time.sleep(10) sys.exit() print('????,???????...') # ??????,?????[???id] ngrok_adds(authData['data']) proto = authData['server'].split(':') return proto
def natapp_auth(options): host = 'auth.natapp.cn' port = 443 try: client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) ssl_client = ssl.wrap_socket(client, ssl_version=ssl.PROTOCOL_TLSv1) # ssl.PROTOCOL_TLSv1_2 ssl_client.connect((host, port)) except Exception: print('???????: https://auth.natapp.cn ??.') time.sleep(10) sys.exit() data = { 'Authtoken': options['authtoken'], 'Clienttoken': options['clienttoken'], 'Token': 'fffeephptokenkhd672' } query = json.dumps(data) header = "POST " + "/auth" + " HTTP/1.1" + "\r\n" header += "Content-Type: text/html" + "\r\n" header += "Host: auth.natapp.cn" + "\r\n" header += "Content-Length: %d" + "\r\n" header += "\r\n" + "%s" buf = header % (len(query), query) ssl_client.sendall(buf.encode('utf-8')) # ????? fd = ssl_client.makefile('rb', 0) body = bytes() while True: line = fd.readline().decode('utf-8') if line == "\n" or line == "\r\n": chunk_size = int(fd.readline(), 16) if chunk_size > 0: body = fd.read(chunk_size).decode('utf-8') break ssl_client.close() authData = json.loads(body) if authData['success'] == False: print('????:%s, ErrorCode:%s' % (authData['msg'], authData['errorCode'])) time.sleep(10) sys.exit() print('????,???????...') proto = authData['data']['ServerAddr'].split(':') return proto
def main(self): # Check Python version py_ver = sys.version_info if ( py_ver.major < 2 or ( py_ver.major == 2 and ( py_ver.minor < 7 or (py_ver.minor >= 7 and py_ver.micro < 10) ) ) ): raise Exception('Your version of Python and Python-ssl are too old. Please upgrade to more "current" versions') # Set up SSL/TLS context tls_version_table = { 'SSLv3': ssl.PROTOCOL_SSLv23, 'TLSv1': ssl.PROTOCOL_TLSv1, 'TLSv1.1': ssl.PROTOCOL_TLSv1_1, 'TLSv1.2': ssl.PROTOCOL_TLSv1_2, } tls_version = tls_version_table[self.version] ctx = ssl.SSLContext(tls_version) if not isinstance(self.alpn, type(None)): ctx.set_alpn_protocols(','.join(self.alpn)) ctx.set_ciphers(self.cipher_suites) if not isinstance(self.cacert_file, type(None)): ctx.load_verify_locations(cafile=self.cacert_file) ctx.load_cert_chain(self.cert_file, self.key_file) if self.protocol == 'IPv4': server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) else: server_sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) ssl_sock = ctx.wrap_socket(server_sock, server_side=True) ssl_sock.bind(('' if isinstance(self.ip_dst, type(None)) else self.ip_dst, self.port_dst)) ssl_sock.listen(self.backlog_size) ssl_sock.settimeout(self.timeout) self._serve(ssl_sock) try: server_sock = ssl_sock.unwrap() server_sock.shutdown(socket.SHUT_RDWR) except: pass finally: server_sock.close()
def __init__(self, bind_host, bind_port, dispatcher, tls_pem_file=None, tls_clientverify_file=None): """ Initializes a new CommissaireHttpServer instance. :param bind_host: Host adapter to listen on. :type bind_host: str :param bind_port: Host port to listen on. :type bind_port: int :param dispatcher: Dispatcher instance (WSGI) to route and respond. :type dispatcher: commissaire_http.dispatcher.Dispatcher :param tls_pem_file: Full path to the PEM file for TLS. :type tls_pem_file: str :param tls_clientverify_file: Full path to CA to verify certs. :type tls_clientverify_file: str """ self._bind_host = bind_host self._bind_port = bind_port self._tls_pem_file = tls_pem_file self._tls_clientverify_file = tls_clientverify_file self.dispatcher = dispatcher self._httpd = make_server( self._bind_host, self._bind_port, RoutesMiddleware( self.dispatcher.dispatch, self.dispatcher.router), server_class=ThreadedWSGIServer, handler_class=CommissaireRequestHandler) # If we are given a PEM file then wrap the socket if tls_pem_file: import ssl client_side_cert_kwargs = {} if self._tls_clientverify_file: client_side_cert_kwargs = { 'cert_reqs': ssl.CERT_REQUIRED, 'ca_certs': self._tls_clientverify_file, } self.logger.info( 'Requiring client side certificate CA validation.') self._httpd.socket = ssl.wrap_socket( self._httpd.socket, certfile=self._tls_pem_file, ssl_version=ssl.PROTOCOL_TLSv1_2, server_side=True, **client_side_cert_kwargs) self.logger.info('Using TLS with %s', self._tls_pem_file) self.logger.debug( 'Created httpd server: %s:%s', self._bind_host, self._bind_port)
def __get_ssl_context(cls, sslca=None): """Make an SSLConext for this Python version using public or sslca """ if ((version_info[0] == 2 and (version_info[1] >= 7 and version_info[2] >= 5)) or (version_info[0] == 3 and version_info[1] >= 4)): logger.debug('SSL method for 2.7.5+ / 3.4+') # pylint: disable=no-name-in-module from ssl import SSLContext, PROTOCOL_TLSv1_2, CERT_REQUIRED, OP_NO_COMPRESSION ctx = SSLContext(PROTOCOL_TLSv1_2) ctx.set_ciphers('HIGH:!SSLv3:!TLSv1:!aNULL:@STRENGTH') # see CRIME security exploit ctx.options |= OP_NO_COMPRESSION # the following options are used to verify the identity of the broker if sslca: ctx.load_verify_locations(sslca) ctx.verify_mode = CERT_REQUIRED ctx.check_hostname = False else: # Verify public certifcates if sslca is None (default) from ssl import Purpose # pylint: disable=no-name-in-module ctx.load_default_certs(purpose=Purpose.SERVER_AUTH) ctx.verify_mode = CERT_REQUIRED ctx.check_hostname = True elif version_info[0] == 3 and version_info[1] < 4: logger.debug('Using SSL method for 3.2+, < 3.4') # pylint: disable=no-name-in-module from ssl import SSLContext, CERT_REQUIRED, PROTOCOL_SSLv23, OP_NO_SSLv2, OP_NO_SSLv3, OP_NO_TLSv1 ctx = SSLContext(PROTOCOL_SSLv23) ctx.options |= (OP_NO_SSLv2 | OP_NO_SSLv3 | OP_NO_TLSv1) ctx.set_ciphers('HIGH:!SSLv3:!TLSv1:!aNULL:@STRENGTH') # the following options are used to verify the identity of the broker if sslca: ctx.load_verify_locations(sslca) ctx.verify_mode = CERT_REQUIRED else: # Verify public certifcates if sslca is None (default) ctx.set_default_verify_paths() ctx.verify_mode = CERT_REQUIRED else: raise Exception("Unsupported Python version %s" % '.'.join(str(item) for item in version_info[:3])) return ctx