我们从Python开源项目中,提取了以下13个代码示例,用于说明如何使用ssl.html()。
def close(self, code=1000, reason=''): """ Call this method to initiate the websocket connection closing by sending a close frame to the connected peer. The ``code`` is the status code representing the termination's reason. Once this method is called, the ``server_terminated`` attribute is set. Calling this method several times is safe as the closing frame will be sent only the first time. .. seealso:: Defined Status Codes http://tools.ietf.org/html/rfc6455#section-7.4.1 """ if not self.server_terminated: self.server_terminated = True try: self._write(self.stream.close(code=code, reason=reason).single(mask=self.stream.always_mask)) except Exception as ex: logger.error("Error when terminating the connection: %s", str(ex))
def get_ssl_certificate(self, binary_form=False): """Returns the client's SSL certificate, if any. To use client certificates, the HTTPServer must have been constructed with cert_reqs set in ssl_options, e.g.:: server = HTTPServer(app, ssl_options=dict( certfile="foo.crt", keyfile="foo.key", cert_reqs=ssl.CERT_REQUIRED, ca_certs="cacert.crt")) By default, the return value is a dictionary (or None, if no client certificate is present). If ``binary_form`` is true, a DER-encoded form of the certificate is returned instead. See SSLSocket.getpeercert() in the standard library for more details. http://docs.python.org/library/ssl.html#sslsocket-objects """ try: return self.connection.stream.socket.getpeercert( binary_form=binary_form) except ssl.SSLError: return None
def __init__(self, max_window=4, timeout=8, proxy='', ssl_ciphers=None, max_retry=2): # http://docs.python.org/dev/library/ssl.html # http://blog.ivanristic.com/2009/07/examples-of-the-information-collected-from-ssl-handshakes.html # http://src.chromium.org/svn/trunk/src/net/third_party/nss/ssl/sslenum.c # http://www.openssl.org/docs/apps/ciphers.html # openssl s_server -accept 443 -key CA.crt -cert CA.crt # set_ciphers as Modern Browsers BaseHTTPUtil.__init__(self, GC.LINK_OPENSSL, os.path.join(cert_dir, 'cacerts'), ssl_ciphers) self.max_window = max_window self.max_retry = max_retry self.timeout = timeout self.proxy = proxy self.tcp_connection_time = LRUCache(512 if self.gws else 4096) self.ssl_connection_time = LRUCache(512 if self.gws else 4096) if self.gws and GC.GAE_ENABLEPROXY: self.gws_front_connection_time = LRUCache(128) self.gws_front_connection_time.set('ip', LRUCache(128), noexpire=True) self.create_ssl_connection = self.create_gws_connection_withproxy #if self.proxy: # dns_resolve = self.__dns_resolve_withproxy # self.create_connection = self.__create_connection_withproxy # self.create_ssl_connection = self.__create_ssl_connection_withproxy
def set_cert_credentials_provider(self, cert_credentials_provider): # History issue from Yun SDK where AR9331 embedded Linux only have Python 2.7.3 # pre-installed. In this version, TLSv1_2 is not even an option. # SSLv23 is a work-around which selects the highest TLS version between the client # and service. If user installs opensslv1.0.1+, this option will work fine for Mutual # Auth. # Note that we cannot force TLSv1.2 for Mutual Auth. in Python 2.7.3 and TLS support # in Python only starts from Python2.7. # See also: https://docs.python.org/2/library/ssl.html#ssl.PROTOCOL_SSLv23 if self._use_wss: ca_path = cert_credentials_provider.get_ca_path() self._paho_client.tls_set(ca_certs=ca_path, cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_SSLv23) else: ca_path = cert_credentials_provider.get_ca_path() cert_path = cert_credentials_provider.get_cert_path() key_path = cert_credentials_provider.get_key_path() self._paho_client.tls_set(ca_certs=ca_path,certfile=cert_path, keyfile=key_path, cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_SSLv23)
def index(): ''' Index page for listing all possibilities ''' htmlpage = ''' <html> <HEAD> <link rel="stylesheet" href="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.7/css/bootstrap.min.css" integrity="sha384-BVYiiSIFeK1dGmJRAkycuHAHRg32OmUcww7on3RYdg4Va+PmSTsz/K68vbdEjh4u" crossorigin="anonymous"> </HEAD> <BODY> <div class='container'> <h1> Welcome to flask tests, here are links to test func calls: </h1> <p> <a href='https://"+lhost+":"+str(lport)+"/listCert'> List of certificate in the pki database </a> </p> <p> <a href='https://"+lhost+":"+str(lport)+"/help'> PKI documentation</a> </p> </div> </BODY> </html> ''' # allow us to modify headers resp = make_response(htmlpage) resp.headers['Server'] = 'PyKI amaibach API' return(resp)
def help(): ''' Documentation page ''' info = open(curScriptDir + "/help.html", 'rt') helpContent = info.read() info.close() # allow us to modify headers resp = make_response(helpContent) resp.headers['Server'] = 'PyKI amaibach API' return(resp)
def closed(self, code, reason=None): """ Called when the websocket stream and connection are finally closed. The provided ``code`` is status set by the other point and ``reason`` is a human readable message. .. seealso:: Defined Status Codes http://tools.ietf.org/html/rfc6455#section-7.4.1 """ pass
def get_cluster_version(dcos_url): """Given a cluster url, return its version string, such as 1.7, 1.8, 1.8.8, 1.9, 1.9-dev, etc""" version_url = "%s/%s" % (dcos_url, "dcos-metadata/dcos-version.json") # Since our test clusters have weird certs that won't validate, turn off # validation try: # this will handle a variety of versions, including some 2.7.x # (haven't investigated which) and all recent 3.x noverify_context = ssl.SSLContext() except: if hasattr(ssl, 'PROTOCOL_TLS'): # 2.7.13 noverify_context = ssl.SSLContext(ssl.PROTOCOL_TLS) elif hasattr(ssl, 'OP_NO_SSLv2'): # 2.7.9-2.7.12 logger.warn("Old python; Asking for something better than sslv2 or 3") noverify_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) # explicitly switch these off. This is recommended for ancient # versions as per https://docs.python.org/2/library/ssl.html#protocol-versions noverify_context.options |= ssl.OP_NO_SSLv2 noverify_context.options |= ssl.OP_NO_SSLv3 else: # before 2.7.9 logger.error("*VERY* old python. Very weak/unsafe encryption ahoy.") noverify_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) response = urllib.request.urlopen(version_url, context=noverify_context) encoding = 'utf-8' # default try: #python3 provided_encoding = response.headers.get_content_charset() if provided_encoding: encoding = provided_encoding except: pass json_s = response.read().decode(encoding) ver_s = json.loads(json_s)['version'] return ver_s
def create_ssl_context(**kwargs): """ A helper function around creating an SSL context https://docs.python.org/3/library/ssl.html#context-creation Accepts kwargs in the same manner as `create_default_context`. """ ctx = ssl.create_default_context(**kwargs) return ctx
def connect(self, keepAliveInterval=30): if keepAliveInterval is None : self._log.error("connect: None type inputs detected.") raise TypeError("None type inputs detected.") if not isinstance(keepAliveInterval, int): self._log.error("connect: Wrong input type detected. KeepAliveInterval must be an integer.") raise TypeError("Non-integer type inputs detected.") # Return connect succeeded/failed ret = False # TLS configuration if self._useWebsocket: # History issue from Yun SDK where AR9331 embedded Linux only have Python 2.7.3 # pre-installed. In this version, TLSv1_2 is not even an option. # SSLv23 is a work-around which selects the highest TLS version between the client # and service. If user installs opensslv1.0.1+, this option will work fine for Mutal # Auth. # Note that we cannot force TLSv1.2 for Mutual Auth. in Python 2.7.3 and TLS support # in Python only starts from Python2.7. # See also: https://docs.python.org/2/library/ssl.html#ssl.PROTOCOL_SSLv23 self._pahoClient.tls_set(ca_certs=self._cafile, cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_SSLv23) self._log.info("Connection type: Websocket") else: self._pahoClient.tls_set(self._cafile, self._cert, self._key, ssl.CERT_REQUIRED, ssl.PROTOCOL_SSLv23) # Throw exception... self._log.info("Connection type: TLSv1.2 Mutual Authentication") # Connect self._pahoClient.connect(self._host, self._port, keepAliveInterval) # Throw exception... self._pahoClient.loop_start() TenmsCount = 0 while(TenmsCount != self._connectdisconnectTimeout * 100 and self._connectResultCode == sys.maxsize): TenmsCount += 1 time.sleep(0.01) if(self._connectResultCode == sys.maxsize): self._log.error("Connect timeout.") self._pahoClient.loop_stop() raise connectTimeoutException() elif(self._connectResultCode == 0): ret = True self._log.info("Connected to AWS IoT.") self._log.debug("Connect time consumption: " + str(float(TenmsCount) * 10) + "ms.") else: self._log.error("A connect error happened: " + str(self._connectResultCode)) self._pahoClient.loop_stop() raise connectError(self._connectResultCode) return ret
def _get_from_pending(self): """ The SSL socket object provides the same interface as the socket interface but behaves differently. When data is sent over a SSL connection more data may be read than was requested from by the ws4py websocket object. In that case, the data may have been indeed read from the underlying real socket, but not read by the application which will expect another trigger from the manager's polling mechanism as if more data was still on the wire. This will happen only when new data is sent by the other peer which means there will be some delay before the initial read data is handled by the application. Due to this, we have to rely on a non-public method to query the internal SSL socket buffer if it has indeed more data pending in its buffer. Now, some people in the Python community `discourage <https://bugs.python.org/issue21430>`_ this usage of the ``pending()`` method because it's not the right way of dealing with such use case. They advise `this approach <https://docs.python.org/dev/library/ssl.html#notes-on-non-blocking-sockets>`_ instead. Unfortunately, this applies only if the application can directly control the poller which is not the case with the WebSocket abstraction here. We therefore rely on this `technic <http://stackoverflow.com/questions/3187565/select-and-ssl-in-python>`_ which seems to be valid anyway. This is a bit of a shame because we have to process more data than what wanted initially. """ data = b"" pending = self.sock.pending() while pending: data += self.sock.recv(pending) pending = self.sock.pending() return data