我们从 Python 开源项目中，提取了以下 3 个代码示例，用于说明如何使用 ssl.get_default_verify_paths()。
def __connect(self, wsURL):
    """Connect to the websocket in a thread and wait for the socket to open.

    Creates ``self.ws`` (a ``websocket.WebSocketApp``) and runs its
    ``run_forever`` loop on the daemon thread ``self.wst``. The caller is
    then held while the connection state is polled, sleeping between checks
    up to five times, until the socket reports connected or ``self._error``
    is set. If the connection did not come up, the failure is logged,
    ``self.exit()`` is called and the process exits with status 1.

    Args:
        wsURL: URL handed to ``websocket.WebSocketApp``.
    """
    self.logger.debug("Starting thread")

    # Pass the default CA file reported by the ssl module to the websocket.
    ssl_defaults = ssl.get_default_verify_paths()
    sslopt_ca_certs = {'ca_certs': ssl_defaults.cafile}
    self.ws = websocket.WebSocketApp(wsURL,
                                     on_message=self.__on_message,
                                     on_close=self.__on_close,
                                     on_open=self.__on_open,
                                     on_error=self.__on_error,
                                     header=self.__get_auth()
                                     )

    setup_custom_logger('websocket', log_level=settings.LOG_LEVEL)
    self.wst = threading.Thread(target=lambda: self.ws.run_forever(sslopt=sslopt_ca_certs))
    self.wst.daemon = True
    self.wst.start()
    self.logger.info("Started thread")

    def is_connected():
        # Read the socket reference once so a concurrent reset by the
        # websocket thread cannot slip in between the two checks.
        sock = self.ws.sock
        return bool(sock and sock.connected)

    # Wait for connect before continuing
    conn_timeout = 5
    while not is_connected() and conn_timeout and not self._error:
        sleep(1)
        conn_timeout -= 1

    # Decide on the actual socket state, not on the counter: a connection
    # that comes up during the final wait leaves the counter at zero and
    # must not be reported as a failure.
    if self._error or not is_connected():
        self.logger.error("Couldn't connect to WS! Exiting.")
        self.exit()
        sys.exit(1)
def connect(self):
    """Authenticate through the API, then open the websocket and run its loop.

    Stores the ``_id`` field of the ``API.me()`` response in ``self.user_id``
    and the API token in ``self.token``, attaches a handler to the
    ``websocket`` logger according to ``self.logging``, builds the socket URL
    from ``self.host`` / ``self.secure`` / ``self.ptr``, and finally calls
    ``run_forever`` on a new ``WebSocketApp`` stored in ``self.ws``. The HTTP
    proxy from ``self.settings`` is used when ``http_proxy`` is set there.
    """
    api_client = API(u=self.user,
                     p=self.password,
                     ptr=self.ptr,
                     host=self.host,
                     secure=self.secure)
    account = api_client.me()
    self.user_id = account['_id']
    self.token = api_client.token

    # With logging on: StreamHandler plus tracing. Otherwise: NullHandler
    # and tracing switched off.
    trace_enabled = bool(self.logging)
    if trace_enabled:
        log_handler = logging.StreamHandler()
    else:
        log_handler = logging.NullHandler()
    logging.getLogger('websocket').addHandler(log_handler)
    websocket.enableTrace(trace_enabled)

    # An explicit host wins; otherwise pick the PTR or the main endpoint.
    if self.host:
        scheme = 'wss://' if self.secure else 'ws://'
        url = scheme + self.host + '/socket/websocket'
    elif self.ptr:
        url = 'wss://screeps.com/ptr/socket/websocket'
    else:
        url = 'wss://screeps.com/socket/websocket'

    self.ws = websocket.WebSocketApp(url=url,
                                     on_message=self.on_message,
                                     on_error=self.on_error,
                                     on_close=self.on_close,
                                     on_open=self.on_open)

    # Options shared by both the proxied and the direct call.
    run_options = {
        'ping_interval': 1,
        'sslopt': {'ca_certs': ssl.get_default_verify_paths().cafile},
    }

    if 'http_proxy' in self.settings and self.settings['http_proxy'] is not None:
        # The proxy port falls back to 8080 when the settings do not name one.
        if 'http_proxy_port' in self.settings:
            proxy_port = self.settings['http_proxy_port']
        else:
            proxy_port = 8080
        run_options['http_proxy_host'] = self.settings['http_proxy']
        run_options['http_proxy_port'] = proxy_port

    self.ws.run_forever(**run_options)
def _configure_ssl(self, properties):
    """Build the proton SSL object for this connection from *properties*.

    Args:
        properties: Mapping of connection properties. The keys read here are
            ``x-ssl-server`` / ``x-server``, ``x-ssl-identity``,
            ``x-ssl-ca-file``, ``x-ssl``, ``x-ssl-peer-name`` / ``hostname``,
            ``x-ssl-verify-mode`` and ``x-ssl-allow-cleartext``.

    Returns:
        The ``proton.SSL`` object bound to ``self._pn_transport``, or ``None``
        when *properties* is empty or shares no key with ``self._SSL_PROPS``.

    Raises:
        proton.SSLException: If ``x-ssl-verify-mode`` is not a key of
            ``self._VERIFY_MODES``, or the selected mode lacks the CA file
            (or peer name) it requires.
    """
    # Nothing to do unless at least one SSL-related property was supplied.
    if not properties:
        return None
    if not self._SSL_PROPS.intersection(set(properties)):
        return None

    acts_as_server = properties.get('x-ssl-server', properties.get('x-server'))
    if acts_as_server:
        mode = proton.SSLDomain.MODE_SERVER
    else:
        mode = proton.SSLDomain.MODE_CLIENT

    identity = properties.get('x-ssl-identity')

    ca_file = properties.get('x-ssl-ca-file')
    use_default_ca = (not ca_file
                      and properties.get('x-ssl')
                      and hasattr(ssl, 'get_default_verify_paths'))
    if use_default_ca:
        # No CA file given but SSL requested: fall back to the ssl module's
        # default CA file.
        ca_file = ssl.get_default_verify_paths().cafile

    hostname = properties.get('x-ssl-peer-name', properties.get('hostname'))

    # default to most secure level of certificate validation
    if ca_file and hostname:
        default_mode_name = 'verify-peer'
    elif ca_file:
        default_mode_name = 'verify-cert'
    else:
        default_mode_name = 'no-verify'

    mode_name = properties.get('x-ssl-verify-mode', default_mode_name)
    try:
        vmode = self._VERIFY_MODES[mode_name]
    except KeyError:
        raise proton.SSLException("bad value for x-ssl-verify-mode: '%s'" %
                                  mode_name)

    # Reject modes whose prerequisites were not supplied.
    if vmode == proton.SSLDomain.VERIFY_PEER_NAME:
        if not (hostname and ca_file):
            raise proton.SSLException("verify-peer needs x-ssl-peer-name"
                                      " and x-ssl-ca-file")
    elif vmode == proton.SSLDomain.VERIFY_PEER and not ca_file:
        raise proton.SSLException("verify-cert needs x-ssl-ca-file")

    # This will throw proton.SSLUnavailable if SSL support is not installed
    domain = proton.SSLDomain(mode)

    if identity:
        # our identity:
        domain.set_credentials(identity[0], identity[1], identity[2])

    if ca_file:
        # how we verify peers:
        domain.set_trusted_ca_db(ca_file)
        domain.set_peer_authentication(vmode, ca_file)

    if (mode == proton.SSLDomain.MODE_SERVER
            and properties.get('x-ssl-allow-cleartext')):
        domain.allow_unsecured_client()

    pn_ssl = proton.SSL(self._pn_transport, domain)
    if hostname:
        pn_ssl.peer_hostname = hostname

    LOG.debug("SSL configured for connection %s", self._name)
    return pn_ssl