我们从 Python 开源项目中，提取了以下 24 个代码示例，用于说明如何使用 ssl.MemoryBIO()。
def _do_ssl_handshake(self):
    """Drive a TLS handshake over in-memory BIOs, relayed in TDS_PRELOGIN messages.

    Each time the SSL object needs more input, the bytes it has queued in
    the outgoing BIO are sent with ``self._send_message(TDS_PRELOGIN, ...)``
    and the payload of the next response packet is fed into the incoming BIO.

    Returns:
        A ``(sslobj, incoming, outgoing)`` tuple: the SSL object and the two
        memory BIOs it is attached to.

    Raises:
        AssertionError: if a response packet is not tagged ``TDS_PRELOGIN``.
    """
    incoming = ssl.MemoryBIO()
    outgoing = ssl.MemoryBIO()
    # Third positional argument is server_side=False (client role).
    sslobj = ssl.SSLContext().wrap_bio(incoming, outgoing, False)

    while True:
        try:
            sslobj.do_handshake()
        except ssl.SSLWantReadError:
            # The handshake needs peer data: flush our side, then read a reply.
            self._send_message(TDS_PRELOGIN, outgoing.read())
            tag, _, _, buf = self._read_response_packet()
            # Explicit check instead of ``assert``: the tag comes from the
            # peer, and ``assert`` statements are removed under ``python -O``.
            if tag != TDS_PRELOGIN:
                raise AssertionError(
                    "unexpected packet tag during TLS handshake: "
                    "{!r}".format(tag)
                )
            incoming.write(buf)
        else:
            break
    return sslobj, incoming, outgoing
def __init__(self, context, server_side, server_hostname=None):
    """Store the configuration and create the two in-memory BIO buffers.

    *context* is the ssl.SSLContext to use.

    *server_side* tells whether this is a server side or a client side
    transport.

    *server_hostname* is optional and names the host being connected to.
    It may only be given if the _ssl module supports Server Name
    Indication (SNI).
    """
    # Configuration handed in by the caller.
    self._context = context
    self._server_side = server_side
    self._server_hostname = server_hostname

    # In-memory buffers; no SSL object exists yet.
    self._incoming = ssl.MemoryBIO()
    self._outgoing = ssl.MemoryBIO()
    self._sslobj = None

    # Initial state and bookkeeping.
    self._state = _UNWRAPPED
    self._need_ssldata = False
    self._handshake_cb = None
    self._shutdown_cb = None
def __init__(self):
    """Initialize a new storage queue held in an ssl.MemoryBIO."""
    queue_bio = ssl.MemoryBIO()
    self.memorybio = queue_bio
def _is_sslproto_available():
    """Return True when the ssl module exposes ``MemoryBIO``, else False."""
    try:
        ssl.MemoryBIO
    except AttributeError:
        return False
    return True


# States of an _SSLPipe.
def _make_legacy_ssl_transport(self, rawsock, protocol, sslcontext, waiter,
                               *, server_side=False, server_hostname=None,
                               extra=None, server=None):
    """Create an SSL transport through the legacy API.

    The legacy API (SSL_write, SSL_read, etc.) is used on Python 3.4 and
    older, when ssl.MemoryBIO is not available.
    """
    legacy_transport = _SelectorSslTransport(
        self, rawsock, protocol, sslcontext, waiter,
        server_side, server_hostname, extra, server,
    )
    return legacy_transport
def test_create_ssl_connection(self): raise unittest.SkipTest("need python 3.5 (ssl.MemoryBIO)")
def test_create_server_ssl(self): raise unittest.SkipTest("need python 3.5 (ssl.MemoryBIO)")
def test_create_server_ssl_match_failed(self): raise unittest.SkipTest("need python 3.5 (ssl.MemoryBIO)")
def test_create_server_ssl_verified(self): raise unittest.SkipTest("need python 3.5 (ssl.MemoryBIO)")
def __init__(self, ctx, sock, **kwargs):
    """Create a memory-BIO pair and the SSL object attached to it.

    *ctx* provides ``wrap_bio()``; any extra keyword arguments are
    forwarded to that call unchanged. *sock* is stored as given.
    """
    bio_in = ssl.MemoryBIO()
    bio_out = ssl.MemoryBIO()
    self.incoming = bio_in
    self.outgoing = bio_out
    self.obj = ctx.wrap_bio(bio_in, bio_out, **kwargs)
    self.sock = sock
def __init__(
    self,
    transport_stream,
    ssl_context,
    *,
    server_hostname=None,
    server_side=False,
    https_compatible=False,
    max_refill_bytes=_default_max_refill_bytes
):
    """Wrap *transport_stream* with an SSL object built from *ssl_context*.

    The SSL object is attached to a pair of memory BIOs rather than to a
    socket. *server_hostname* and *server_side* are passed through to
    ``ssl_context.wrap_bio()``; *https_compatible* and *max_refill_bytes*
    are stored for later use.
    """
    self.transport_stream = transport_stream
    self._state = _State.OK
    self._max_refill_bytes = max_refill_bytes
    self._https_compatible = https_compatible

    outgoing_bio = _stdlib_ssl.MemoryBIO()
    incoming_bio = _stdlib_ssl.MemoryBIO()
    self._outgoing = outgoing_bio
    self._incoming = incoming_bio
    self._ssl_object = ssl_context.wrap_bio(
        incoming_bio,
        outgoing_bio,
        server_side=server_side,
        server_hostname=server_hostname,
    )

    # Tracks whether we've already done the initial handshake
    self._handshook = _Once(self._do_handshake)

    # These are used to synchronize access to self.transport_stream
    self._inner_send_lock = _sync.StrictFIFOLock()
    self._inner_recv_count = 0
    self._inner_recv_lock = _sync.Lock()

    # These are used to make sure that our caller doesn't attempt to make
    # multiple concurrent calls to send_all/wait_send_all_might_not_block
    # or to receive_some.
    send_conflict_msg = "another task is currently sending data on this SSLStream"
    recv_conflict_msg = "another task is currently receiving data on this SSLStream"
    self._outer_send_conflict_detector = ConflictDetector(send_conflict_msg)
    self._outer_recv_conflict_detector = ConflictDetector(recv_conflict_msg)
def init_tls(self):
    """Create the TLS context, its memory-BIO pair and the SSL object.

    Sets ``self.tls_ctx``, ``self.tls_in``, ``self.tls_out`` and
    ``self.tls``.
    """
    self.log.debug("initializing TLS context...")

    # NOTE(review): the protocol is pinned to TLS 1.0, and PROTOCOL_TLSv1 is
    # deprecated in current Python -- confirm the peer really requires it.
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    bio_in = ssl.MemoryBIO()
    bio_out = ssl.MemoryBIO()

    self.tls_ctx = context
    self.tls_in = bio_in
    self.tls_out = bio_out
    # Trailing positional arguments: server_side=False, server_hostname=None.
    self.tls = context.wrap_bio(bio_in, bio_out, False, None)
def first_test():
    """Begin a TLS handshake with google.com:443 via memory BIOs, printing progress.

    This is an exploratory routine: it pushes one flight of handshake data
    to the server, feeds one chunk of the reply back into the SSL object,
    and prints the pending byte counts of the SSL object and both BIOs
    after each step. It does not complete the handshake and returns nothing.

    Changes from the earlier version:
      * the socket is closed on every exit path (``with`` block);
      * ``sendall`` is used so the whole handshake flight is transmitted
        (``send`` may write only part of the buffer);
      * ``server_hostname`` is passed to ``wrap_bio`` so SNI is sent and the
        certificate can be matched against the host name.
    """
    host = 'google.com'

    with socket.create_connection((host, 443)) as sock:
        incoming = ssl.MemoryBIO()
        outgoing = ssl.MemoryBIO()
        ctx = ssl.create_default_context()
        ssl_obj = ctx.wrap_bio(incoming, outgoing, server_hostname=host)

        def show_pending():
            # Print how many bytes are buffered at each layer.
            print('ssl_obj', ssl_obj.pending())
            print('incoming', incoming.pending)
            print('outgoing', outgoing.pending)

        # An empty write makes the SSL object start the handshake; it is
        # expected to ask for peer data via SSLWantReadError.
        try:
            ssl_obj.write(b'')
        except ssl.SSLWantReadError as err:
            print("err", err)
        show_pending()

        # Ship whatever the SSL object queued for the peer.
        data = outgoing.read()
        sock.sendall(data)
        show_pending()

        # Feed one chunk of the server's reply back in.
        got = sock.recv(10240)
        print('sock.recv got', len(got))
        incoming.write(got)

        try:
            ssl_obj.do_handshake()
        except ssl.SSLWantReadError as err:
            print("err", err)
        show_pending()
def handle_notification_connection(self):
    """Do the TLS handshake, then read one notification and show it.

    After the handshake the socket delivers length-prefixed encrypted
    chunks: a flags byte first, then (depending on the flags) a
    ``title|||message`` text chunk and an icon chunk. A ``Notification`` is
    built from the decoded parts and shown. If the handshake fails, the
    error is printed and the method returns without reading anything.
    """
    self.incoming = ssl.MemoryBIO()
    self.outgoing = ssl.MemoryBIO()
    try:
        tls_ctx = notif_tls_ctx_kitkat_bt if bluetooth_support_kitkat else notif_tls_ctx
        tls_ctx.load_verify_locations(cadata=parse_authorized_certs())
        self.tls_bio = tls_ctx.wrap_bio(incoming=self.incoming,
                                        outgoing=self.outgoing,
                                        server_side=True)
        self.do_handshake()
    except Exception as e:
        print_with_timestamp('(Bluetooth) Failed TLS handshake notif_conn: {}'.format(e))
        return

    # one recv should not take longer than 10 sec
    self.socket.settimeout(10)

    def read_decrypted_chunk():
        # 4-byte big-endian length, then that many encrypted bytes.
        size = struct.unpack('>I', recvall(self.socket, 4))[0]
        encrypted = recvall(self.socket, size)
        return self.tls_decrypt(encrypted)

    notification_flags = struct.unpack('>B', read_decrypted_chunk())[0]
    include_title = chkflags(notification_flags, FLAG_INCLUDE_TITLE)
    include_message = chkflags(notification_flags, FLAG_INCLUDE_MESSAGE)
    include_icon = chkflags(notification_flags, FLAG_INCLUDE_ICON)

    title = ''
    message = ''
    if include_title or include_message:
        # Title and message share one chunk, separated by '|||'.
        text_fields = read_decrypted_chunk().decode().split('|||')
        if include_title:
            title = text_fields[0]
        if include_message:
            message = text_fields[1]

    if include_icon:
        icon_tmp_file = tempfile.NamedTemporaryFile(buffering=0, dir=TMP_DIR_PATH)
        icon = read_decrypted_chunk()
        try:
            icon_tmp_file.write(icon)
            icon_digest = hashlib.sha256(title.encode() + message.encode() + icon).digest()
            Notification(title, message, icon_digest, icon_tmp_file).show()
        except Exception:
            # Fall back to a notification without the icon.
            plain_digest = hashlib.sha256(title.encode() + message.encode()).digest()
            Notification(title, message, plain_digest).show()
    else:
        plain_digest = hashlib.sha256(title.encode() + message.encode()).digest()
        Notification(title, message, plain_digest).show()