Python ssl 模块：MemoryBIO() 实例源码

我们从 Python 开源项目中提取了以下 24 个代码示例，用于说明如何使用 ssl.MemoryBIO()。

项目:minitds    作者:nakagami    | 项目源码 | 文件源码
def _do_ssl_handshake(self):
        """Run a TLS handshake whose records travel inside TDS_PRELOGIN messages.

        The TLS engine is an SSL object bound to two in-memory BIOs, so this
        method never touches a socket itself. Each time the engine reports
        that it needs more input, whatever it has queued in ``outgoing`` is
        sent with ``self._send_message`` and the payload of the next response
        packet is written into ``incoming``.

        Returns:
            The tuple ``(sslobj, incoming, outgoing)``.
        """
        incoming = ssl.MemoryBIO()
        outgoing = ssl.MemoryBIO()
        # server_side=False: this end acts as the TLS client.
        sslobj = ssl.SSLContext().wrap_bio(incoming, outgoing, server_side=False)

        handshake_complete = False
        while not handshake_complete:
            try:
                sslobj.do_handshake()
            except ssl.SSLWantReadError:
                # The engine is waiting for peer data: flush what it produced,
                # then feed it the body of the reply packet.
                self._send_message(TDS_PRELOGIN, outgoing.read())
                tag, _, _, buf = self._read_response_packet()
                assert tag == TDS_PRELOGIN
                incoming.write(buf)
            else:
                handshake_complete = True

        return sslobj, incoming, outgoing
项目:annotated-py-asyncio    作者:hhstore    | 项目源码 | 文件源码
def __init__(self, context, server_side, server_hostname=None):
        """
        Store the SSL parameters and create the two memory BIOs.

        *context* is the ssl.SSLContext to use.

        *server_side* tells whether this is the server side or the client
        side of the transport.

        *server_hostname* is optional and names the host being connected to.
        It may only be given when the _ssl module supports Server Name
        Indication (SNI).
        """
        self._context = context
        self._server_side = server_side
        self._server_hostname = server_hostname
        # Starts in the _UNWRAPPED state; no SSL object exists yet.
        self._state = _UNWRAPPED
        self._incoming, self._outgoing = ssl.MemoryBIO(), ssl.MemoryBIO()
        self._sslobj = self._handshake_cb = self._shutdown_cb = None
        self._need_ssldata = False
项目:golightan    作者:shirou    | 项目源码 | 文件源码
def __init__(self, context, server_side, server_hostname=None):
        """
        Store the SSL parameters and create the two memory BIOs.

        *context* is the ssl.SSLContext to use.

        *server_side* tells whether this is the server side or the client
        side of the transport.

        *server_hostname* is optional and names the host being connected to.
        It may only be given when the _ssl module supports Server Name
        Indication (SNI).
        """
        self._context = context
        self._server_side = server_side
        self._server_hostname = server_hostname
        # Starts in the _UNWRAPPED state; no SSL object exists yet.
        self._state = _UNWRAPPED
        self._incoming, self._outgoing = ssl.MemoryBIO(), ssl.MemoryBIO()
        self._sslobj = self._handshake_cb = self._shutdown_cb = None
        self._need_ssldata = False
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def __init__(self, context, server_side, server_hostname=None):
        """
        Store the SSL parameters and create the two memory BIOs.

        *context* is the ssl.SSLContext to use.

        *server_side* tells whether this is the server side or the client
        side of the transport.

        *server_hostname* is optional and names the host being connected to.
        It may only be given when the _ssl module supports Server Name
        Indication (SNI).
        """
        self._context = context
        self._server_side = server_side
        self._server_hostname = server_hostname
        # Starts in the _UNWRAPPED state; no SSL object exists yet.
        self._state = _UNWRAPPED
        self._incoming, self._outgoing = ssl.MemoryBIO(), ssl.MemoryBIO()
        self._sslobj = self._handshake_cb = self._shutdown_cb = None
        self._need_ssldata = False
项目:outis    作者:SySS-Research    | 项目源码 | 文件源码
def __init__(self):
        """
        Set up a new storage queue backed by a fresh, empty ssl.MemoryBIO.
        """
        backing_buffer = ssl.MemoryBIO()
        self.memorybio = backing_buffer
项目:annotated-py-asyncio    作者:hhstore    | 项目源码 | 文件源码
def _is_sslproto_available():
    """Return True when the imported ssl module exposes ``MemoryBIO``."""
    memory_bio_present = hasattr(ssl, "MemoryBIO")
    return memory_bio_present


# States of an _SSLPipe.
项目:annotated-py-asyncio    作者:hhstore    | 项目源码 | 文件源码
def _make_legacy_ssl_transport(self, rawsock, protocol, sslcontext,
                                   waiter, *,
                                   server_side=False, server_hostname=None,
                                   extra=None, server=None):
        """Create and return a ``_SelectorSslTransport`` for *rawsock*.

        All arguments are forwarded positionally, with this object passed
        first.
        """
        # Legacy path built on SSL_write, SSL_read, etc. It is the one used
        # on Python 3.4 and older, where ssl.MemoryBIO does not exist.
        legacy_transport = _SelectorSslTransport(
            self, rawsock, protocol, sslcontext, waiter,
            server_side, server_hostname, extra, server)
        return legacy_transport
项目:annotated-py-asyncio    作者:hhstore    | 项目源码 | 文件源码
def test_create_ssl_connection(self):
                # As shown, the test is skipped unconditionally; the message
                # says it needs Python 3.5's ssl.MemoryBIO.
                # NOTE(review): the 16-space indent suggests this line was
                # excerpted from deeper inside the original test body --
                # confirm against the upstream file before relying on it.
                raise unittest.SkipTest("need python 3.5 (ssl.MemoryBIO)")
项目:annotated-py-asyncio    作者:hhstore    | 项目源码 | 文件源码
def test_create_server_ssl(self):
                # As shown, the test is skipped unconditionally; the message
                # says it needs Python 3.5's ssl.MemoryBIO.
                # NOTE(review): the 16-space indent suggests this line was
                # excerpted from deeper inside the original test body --
                # confirm against the upstream file before relying on it.
                raise unittest.SkipTest("need python 3.5 (ssl.MemoryBIO)")
项目:annotated-py-asyncio    作者:hhstore    | 项目源码 | 文件源码
def test_create_server_ssl_match_failed(self):
                # As shown, the test is skipped unconditionally; the message
                # says it needs Python 3.5's ssl.MemoryBIO.
                # NOTE(review): the 16-space indent suggests this line was
                # excerpted from deeper inside the original test body --
                # confirm against the upstream file before relying on it.
                raise unittest.SkipTest("need python 3.5 (ssl.MemoryBIO)")
项目:annotated-py-asyncio    作者:hhstore    | 项目源码 | 文件源码
def test_create_server_ssl_verified(self):
                # As shown, the test is skipped unconditionally; the message
                # says it needs Python 3.5's ssl.MemoryBIO.
                # NOTE(review): the 16-space indent suggests this line was
                # excerpted from deeper inside the original test body --
                # confirm against the upstream file before relying on it.
                raise unittest.SkipTest("need python 3.5 (ssl.MemoryBIO)")
项目:trio    作者:python-trio    | 项目源码 | 文件源码
def __init__(self, ctx, sock, **kwargs):
        """Pair *sock* with an SSL object that reads and writes memory BIOs.

        *ctx* must provide ``wrap_bio``; any extra keyword arguments are
        forwarded to it unchanged.
        """
        inbound = ssl.MemoryBIO()
        outbound = ssl.MemoryBIO()
        self.incoming = inbound
        self.outgoing = outbound
        self.obj = ctx.wrap_bio(inbound, outbound, **kwargs)
        self.sock = sock
项目:trio    作者:python-trio    | 项目源码 | 文件源码
def __init__(
            self,
            transport_stream,
            ssl_context,
            *,
            server_hostname=None,
            server_side=False,
            https_compatible=False,
            max_refill_bytes=_default_max_refill_bytes
    ):
        """Attach an SSL object, driven through memory BIOs, to *transport_stream*.

        *ssl_context* supplies ``wrap_bio``; *server_hostname* and
        *server_side* are forwarded to it. *https_compatible* and
        *max_refill_bytes* are stored on the instance unchanged.
        """
        self.transport_stream = transport_stream
        self._state = _State.OK
        self._max_refill_bytes = max_refill_bytes
        self._https_compatible = https_compatible

        self._outgoing = outgoing_bio = _stdlib_ssl.MemoryBIO()
        self._incoming = incoming_bio = _stdlib_ssl.MemoryBIO()
        self._ssl_object = ssl_context.wrap_bio(
            incoming_bio,
            outgoing_bio,
            server_hostname=server_hostname,
            server_side=server_side,
        )

        # Remembers whether the initial handshake has already been performed.
        self._handshook = _Once(self._do_handshake)

        # Synchronization for access to self.transport_stream.
        self._inner_send_lock = _sync.StrictFIFOLock()
        self._inner_recv_count = 0
        self._inner_recv_lock = _sync.Lock()

        # Guards against the caller issuing several concurrent calls to
        # send_all/wait_send_all_might_not_block, or to receive_some.
        send_conflict_msg = "another task is currently sending data on this SSLStream"
        recv_conflict_msg = "another task is currently receiving data on this SSLStream"
        self._outer_send_conflict_detector = ConflictDetector(send_conflict_msg)
        self._outer_recv_conflict_detector = ConflictDetector(recv_conflict_msg)
项目:golightan    作者:shirou    | 项目源码 | 文件源码
def _is_sslproto_available():
    """Return True when the imported ssl module exposes ``MemoryBIO``."""
    memory_bio_present = hasattr(ssl, "MemoryBIO")
    return memory_bio_present


# States of an _SSLPipe.
项目:golightan    作者:shirou    | 项目源码 | 文件源码
def _make_legacy_ssl_transport(self, rawsock, protocol, sslcontext,
                                   waiter, *,
                                   server_side=False, server_hostname=None,
                                   extra=None, server=None):
        """Create and return a ``_SelectorSslTransport`` for *rawsock*.

        All arguments are forwarded positionally, with this object passed
        first.
        """
        # Legacy path built on SSL_write, SSL_read, etc. It is the one used
        # on Python 3.4 and older, where ssl.MemoryBIO does not exist.
        legacy_transport = _SelectorSslTransport(
            self, rawsock, protocol, sslcontext, waiter,
            server_side, server_hostname, extra, server)
        return legacy_transport
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def _is_sslproto_available():
    """Return True when the imported ssl module exposes ``MemoryBIO``."""
    memory_bio_present = hasattr(ssl, "MemoryBIO")
    return memory_bio_present


# States of an _SSLPipe.
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def _make_legacy_ssl_transport(self, rawsock, protocol, sslcontext,
                                   waiter, *,
                                   server_side=False, server_hostname=None,
                                   extra=None, server=None):
        """Create and return a ``_SelectorSslTransport`` for *rawsock*.

        All arguments are forwarded positionally, with this object passed
        first.
        """
        # Legacy path built on SSL_write, SSL_read, etc. It is the one used
        # on Python 3.4 and older, where ssl.MemoryBIO does not exist.
        legacy_transport = _SelectorSslTransport(
            self, rawsock, protocol, sslcontext, waiter,
            server_side, server_hostname, extra, server)
        return legacy_transport
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_create_ssl_connection(self):
                # As shown, the test is skipped unconditionally; the message
                # says it needs Python 3.5's ssl.MemoryBIO.
                # NOTE(review): the 16-space indent suggests this line was
                # excerpted from deeper inside the original test body --
                # confirm against the upstream file before relying on it.
                raise unittest.SkipTest("need python 3.5 (ssl.MemoryBIO)")
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_create_server_ssl(self):
                # As shown, the test is skipped unconditionally; the message
                # says it needs Python 3.5's ssl.MemoryBIO.
                # NOTE(review): the 16-space indent suggests this line was
                # excerpted from deeper inside the original test body --
                # confirm against the upstream file before relying on it.
                raise unittest.SkipTest("need python 3.5 (ssl.MemoryBIO)")
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_create_server_ssl_match_failed(self):
                # As shown, the test is skipped unconditionally; the message
                # says it needs Python 3.5's ssl.MemoryBIO.
                # NOTE(review): the 16-space indent suggests this line was
                # excerpted from deeper inside the original test body --
                # confirm against the upstream file before relying on it.
                raise unittest.SkipTest("need python 3.5 (ssl.MemoryBIO)")
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_create_server_ssl_verified(self):
                # As shown, the test is skipped unconditionally; the message
                # says it needs Python 3.5's ssl.MemoryBIO.
                # NOTE(review): the 16-space indent suggests this line was
                # excerpted from deeper inside the original test body --
                # confirm against the upstream file before relying on it.
                raise unittest.SkipTest("need python 3.5 (ssl.MemoryBIO)")
项目:pyopenvpn    作者:0xa    | 项目源码 | 文件源码
def init_tls(self):
        """Create the TLS context, its two memory BIOs and a client-side SSL object.

        Sets ``self.tls_ctx``, ``self.tls_in``, ``self.tls_out`` and
        ``self.tls``; nothing is returned.
        """
        self.log.debug("initializing TLS context...")
        # NOTE(review): PROTOCOL_TLSv1 pins the context to TLS 1.0, which the
        # Python ssl documentation marks as deprecated -- confirm the peer
        # really requires it.
        self.tls_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        self.tls_in, self.tls_out = ssl.MemoryBIO(), ssl.MemoryBIO()
        self.tls = self.tls_ctx.wrap_bio(
            self.tls_in,
            self.tls_out,
            server_side=False,
            server_hostname=None,
        )
项目:uvio    作者:srossross    | 项目源码 | 文件源码
def first_test():
    """Walk through the first steps of a TLS handshake with google.com by hand.

    An SSL object bound to two memory BIOs produces and consumes the TLS
    bytes, while a plain TCP socket carries them. After each step the
    pending byte counts of the SSL object and of both BIOs are printed.

    The socket is closed when the function exits, including on error.
    Nothing is returned.
    """
    host = 'google.com'

    def report_pending():
        # The same three-line status dump is printed after every step.
        print('ssl_obj', ssl_obj.pending())
        print('incoming', incoming.pending)
        print('outgoing', outgoing.pending)

    # The context manager guarantees the socket is closed even if a step raises.
    with socket.create_connection((host, 443)) as sock:
        incoming = ssl.MemoryBIO()
        outgoing = ssl.MemoryBIO()

        ctx = ssl.create_default_context()

        # Passing server_hostname sends SNI and gives the default context's
        # hostname check a name to verify the certificate against.
        ssl_obj = ctx.wrap_bio(incoming, outgoing, server_hostname=host)

        try:
            # Presumably this kicks off the handshake: the SSL object queues
            # its first flight in `outgoing` and then asks for peer data.
            ssl_obj.write(b'')
        except ssl.SSLWantReadError as err:
            print("err", err)

        report_pending()

        # sendall() keeps writing until every queued byte is on the wire;
        # send() may stop after a partial write.
        sock.sendall(outgoing.read())

        report_pending()

        got = sock.recv(10240)
        print('sock.recv got', len(got))

        incoming.write(got)

        try:
            ssl_obj.do_handshake()
        except ssl.SSLWantReadError as err:
            print("err", err)

        report_pending()
项目:an2linuxserver    作者:rootkiwi    | 项目源码 | 文件源码
def handle_notification_connection(self):
        """Do the TLS handshake for a notification connection, then show one notification.

        Fresh memory BIOs are created and wrapped server-side by the TLS
        context selected through ``bluetooth_support_kitkat``, after loading
        the authorized certificates into that context. If anything in that
        setup or in the handshake raises, the failure is printed and the
        method returns without reading further.

        Every field that follows arrives as a 4-byte big-endian length and
        then that many encrypted bytes: first a one-byte flags value, then
        (depending on the flags) the title/message text joined by ``|||``,
        and finally the icon bytes.
        """
        self.incoming = ssl.MemoryBIO()
        self.outgoing = ssl.MemoryBIO()
        try:
            tls_ctx = notif_tls_ctx_kitkat_bt if bluetooth_support_kitkat else notif_tls_ctx
            tls_ctx.load_verify_locations(cadata=parse_authorized_certs())
            self.tls_bio = tls_ctx.wrap_bio(incoming=self.incoming, outgoing=self.outgoing,
                                            server_side=True)
            self.do_handshake()
        except Exception as e:
            print_with_timestamp('(Bluetooth) Failed TLS handshake notif_conn: {}'.format(e))
            return

        # A single recv must not block for more than 10 seconds.
        self.socket.settimeout(10)

        def recv_decrypted():
            # Read the length prefix, then exactly that many encrypted bytes,
            # and hand them to the TLS layer for decryption.
            (size,) = struct.unpack('>I', recvall(self.socket, 4))
            return self.tls_decrypt(recvall(self.socket, size))

        (notification_flags,) = struct.unpack('>B', recv_decrypted())

        include_title = chkflags(notification_flags, FLAG_INCLUDE_TITLE)
        include_message = chkflags(notification_flags, FLAG_INCLUDE_MESSAGE)
        include_icon = chkflags(notification_flags, FLAG_INCLUDE_ICON)

        title = ''
        message = ''

        if include_title or include_message:
            parts = recv_decrypted().decode().split('|||')
            if include_title:
                title = parts[0]
            if include_message:
                message = parts[1]

        if include_icon:
            # The temp file is created before the icon is read from the socket.
            icon_tmp_file = tempfile.NamedTemporaryFile(buffering=0, dir=TMP_DIR_PATH)
            icon = recv_decrypted()
            try:
                icon_tmp_file.write(icon)
                digest_with_icon = hashlib.sha256(title.encode() + message.encode() + icon).digest()
                Notification(title, message, digest_with_icon, icon_tmp_file).show()
            except Exception:
                # Fall back to an icon-less notification if the icon path fails.
                text_digest = hashlib.sha256(title.encode() + message.encode()).digest()
                Notification(title, message, text_digest).show()
        else:
            text_digest = hashlib.sha256(title.encode() + message.encode()).digest()
            Notification(title, message, text_digest).show()