Python io 模块,RawIOBase() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用io.RawIOBase()

项目:annotated-py-asyncio    作者:hhstore    | 项目源码 | 文件源码
def setUp(self):
        """Build the fixture: a test loop, a test protocol and a mocked pipe.

        ``asyncio.unix_events._set_nonblocking`` is patched out and
        ``os.fstat`` is patched to return an object whose ``st_mode`` is
        ``stat.S_IFIFO``; both patches are undone through ``addCleanup``.
        """
        self.loop = self.new_test_loop()
        self.protocol = test_utils.make_test_protocol(asyncio.Protocol)

        # Mock restricted to the io.RawIOBase attribute set; fileno() yields 5.
        pipe = mock.Mock(spec_set=io.RawIOBase)
        pipe.fileno.return_value = 5
        self.pipe = pipe

        nonblocking_patch = mock.patch('asyncio.unix_events._set_nonblocking')
        nonblocking_patch.start()
        self.addCleanup(nonblocking_patch.stop)

        fstat_patch = mock.patch('os.fstat')
        fake_fstat = fstat_patch.start()
        fake_fstat.return_value = mock.Mock(st_mode=stat.S_IFIFO)
        self.addCleanup(fstat_patch.stop)
项目:annotated-py-asyncio    作者:hhstore    | 项目源码 | 文件源码
def setUp(self):
        """Build the fixture: a test loop, a test protocol and a mocked pipe.

        ``asyncio.unix_events._set_nonblocking`` is patched out and
        ``os.fstat`` is patched to return an object whose ``st_mode`` is
        ``stat.S_IFSOCK``; both patches are undone through ``addCleanup``.
        """
        self.loop = self.new_test_loop()
        self.protocol = test_utils.make_test_protocol(asyncio.BaseProtocol)

        # Mock restricted to the io.RawIOBase attribute set; fileno() yields 5.
        pipe = mock.Mock(spec_set=io.RawIOBase)
        pipe.fileno.return_value = 5
        self.pipe = pipe

        nonblocking_patch = mock.patch('asyncio.unix_events._set_nonblocking')
        nonblocking_patch.start()
        self.addCleanup(nonblocking_patch.stop)

        fstat_patch = mock.patch('os.fstat')
        fake_fstat = fstat_patch.start()
        fake_fstat.return_value = mock.Mock(st_mode=stat.S_IFSOCK)
        self.addCleanup(fstat_patch.stop)
项目:device-updater    作者:spark    | 项目源码 | 文件源码
def wait_until_ready(self, channel:RawIOBase, timeout=60):
        """
        Sends ' ' (space) and checks whether the reply starts with the ACK byte, repeating until two
        such replies have been counted. The count is cumulative: a non-ACK reply does not reset it.

        :param channel: stream that is first drained with readline(), then written to and read from
        :param timeout: currently unused by this method
        :return: None
        """
        # Discard whatever is already queued on the channel.
        while channel.readline():
            pass

        # NOTE(review): the only exit from this loop is receiving two ACK replies.
        acks_seen = 0
        while acks_seen < 2:
            channel.write(b' ')
            reply = channel.read()
            if reply and reply[0] == LightYModemProtocol.ack:
                acks_seen += 1
项目:RealtimePythonChat    作者:quangtqag    | 项目源码 | 文件源码
def __init__(self, fileno, mode='r', closefd=True):
        """Set up state and hub watchers for the descriptor *fileno*.

        :param fileno: descriptor passed to ``make_nonblocking`` and to the
            hub loop's ``io`` watcher factory.
        :param mode: the object is readable when this contains ``'r'`` and
            writable when it contains ``'w'``.
        :param closefd: stored as ``_closefd``.
        """
        RawIOBase.__init__(self) # Python 2: pylint:disable=no-member,non-parent-init-called
        self._closed = False
        self._closefd = closefd
        self._fileno = fileno
        make_nonblocking(fileno)
        self._readable = 'r' in mode
        self._writable = 'w' in mode
        self.hub = get_hub()

        # One watcher per enabled direction.
        # NOTE(review): 1 and 2 look like the loop's read/write event masks -- confirm.
        make_watcher = self.hub.loop.io
        if self._readable:
            self._read_event = make_watcher(fileno, 1)
        if self._writable:
            self._write_event = make_watcher(fileno, 2)

        self._seekable = None
项目:RealtimePythonChat    作者:quangtqag    | 项目源码 | 文件源码
def close(self):
        """Flush, cancel the hub waits and, when ``_closefd`` is set, close the descriptor.

        Returns immediately when ``_closed`` is already true.
        """
        if self._closed:
            return
        self.flush()
        self._closed = True

        # Cancel the wait on each watcher that was created for this object.
        if self._readable:
            self.hub.cancel_wait(self._read_event, cancel_wait_ex)
        if self._writable:
            self.hub.cancel_wait(self._write_event, cancel_wait_ex)

        if not self._closefd:
            return
        # Forget the descriptor before handing it to os.close().
        fd, self._fileno = self._fileno, None
        os.close(fd)

    # RawIOBase provides a 'read' method that will call readall() if
    # the `size` was missing or -1 and otherwise call readinto(). We
    # want to take advantage of this to avoid single byte reads when
    # possible. This is highlighted by a bug in BufferedIOReader that
    # calls read() in a loop when its readall() method is invoked;
    # this was fixed in Python 3.3. See
    # https://github.com/gevent/gevent/issues/675
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def setUp(self):
        """Build the fixture: a test loop, a test protocol and a mocked pipe.

        ``asyncio.unix_events._set_nonblocking`` is patched out and
        ``os.fstat`` is patched to return an object whose ``st_mode`` is
        ``stat.S_IFIFO``; both patches are undone through ``addCleanup``.
        """
        self.loop = self.new_test_loop()
        self.protocol = test_utils.make_test_protocol(asyncio.Protocol)

        # Mock restricted to the io.RawIOBase attribute set; fileno() yields 5.
        pipe = mock.Mock(spec_set=io.RawIOBase)
        pipe.fileno.return_value = 5
        self.pipe = pipe

        nonblocking_patch = mock.patch('asyncio.unix_events._set_nonblocking')
        nonblocking_patch.start()
        self.addCleanup(nonblocking_patch.stop)

        fstat_patch = mock.patch('os.fstat')
        fake_fstat = fstat_patch.start()
        fake_fstat.return_value = mock.Mock(st_mode=stat.S_IFIFO)
        self.addCleanup(fstat_patch.stop)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def setUp(self):
        """Build the fixture: a test loop, a test protocol and a mocked pipe.

        ``asyncio.unix_events._set_nonblocking`` is patched out and
        ``os.fstat`` is patched to return an object whose ``st_mode`` is
        ``stat.S_IFSOCK``; both patches are undone through ``addCleanup``.
        """
        self.loop = self.new_test_loop()
        self.protocol = test_utils.make_test_protocol(asyncio.BaseProtocol)

        # Mock restricted to the io.RawIOBase attribute set; fileno() yields 5.
        pipe = mock.Mock(spec_set=io.RawIOBase)
        pipe.fileno.return_value = 5
        self.pipe = pipe

        nonblocking_patch = mock.patch('asyncio.unix_events._set_nonblocking')
        nonblocking_patch.start()
        self.addCleanup(nonblocking_patch.stop)

        fstat_patch = mock.patch('os.fstat')
        fake_fstat = fstat_patch.start()
        fake_fstat.return_value = mock.Mock(st_mode=stat.S_IFSOCK)
        self.addCleanup(fstat_patch.stop)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def __init__(self, fileno, mode='r', closefd=True):
        """Set up state and hub watchers for the descriptor *fileno*.

        :param fileno: descriptor passed to ``make_nonblocking`` and to the
            hub loop's ``io`` watcher factory.
        :param mode: the object is readable when this contains ``'r'`` and
            writable when it contains ``'w'``.
        :param closefd: stored as ``_closefd``.
        """
        RawIOBase.__init__(self) # Python 2: pylint:disable=no-member,non-parent-init-called
        self._closed = False
        self._closefd = closefd
        self._fileno = fileno
        make_nonblocking(fileno)
        self._readable = 'r' in mode
        self._writable = 'w' in mode
        self.hub = get_hub()

        # One watcher per enabled direction.
        # NOTE(review): 1 and 2 look like the loop's read/write event masks -- confirm.
        make_watcher = self.hub.loop.io
        if self._readable:
            self._read_event = make_watcher(fileno, 1)
        if self._writable:
            self._write_event = make_watcher(fileno, 2)

        self._seekable = None
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def close(self):
        """Flush, cancel the hub waits and, when ``_closefd`` is set, close the descriptor.

        Returns immediately when ``_closed`` is already true.
        """
        if self._closed:
            return
        self.flush()
        self._closed = True

        # Cancel the wait on each watcher that was created for this object.
        if self._readable:
            self.hub.cancel_wait(self._read_event, cancel_wait_ex)
        if self._writable:
            self.hub.cancel_wait(self._write_event, cancel_wait_ex)

        if not self._closefd:
            return
        # Forget the descriptor before handing it to os.close().
        fd, self._fileno = self._fileno, None
        os.close(fd)

    # RawIOBase provides a 'read' method that will call readall() if
    # the `size` was missing or -1 and otherwise call readinto(). We
    # want to take advantage of this to avoid single byte reads when
    # possible. This is highlighted by a bug in BufferedIOReader that
    # calls read() in a loop when its readall() method is invoked;
    # this was fixed in Python 3.3. See
    # https://github.com/gevent/gevent/issues/675
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def setUp(self):
        """Build the fixture: a test loop, a test protocol and a mocked pipe.

        ``asyncio.unix_events._set_nonblocking`` is patched out and
        ``os.fstat`` is patched to return an object whose ``st_mode`` is
        ``stat.S_IFIFO``; both patches are undone through ``addCleanup``.
        """
        self.loop = self.new_test_loop()
        self.protocol = test_utils.make_test_protocol(asyncio.Protocol)

        # Mock restricted to the io.RawIOBase attribute set; fileno() yields 5.
        pipe = mock.Mock(spec_set=io.RawIOBase)
        pipe.fileno.return_value = 5
        self.pipe = pipe

        nonblocking_patch = mock.patch('asyncio.unix_events._set_nonblocking')
        nonblocking_patch.start()
        self.addCleanup(nonblocking_patch.stop)

        fstat_patch = mock.patch('os.fstat')
        fake_fstat = fstat_patch.start()
        fake_fstat.return_value = mock.Mock(st_mode=stat.S_IFIFO)
        self.addCleanup(fstat_patch.stop)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def setUp(self):
        """Build the fixture: a test loop, a test protocol and a mocked pipe.

        ``asyncio.unix_events._set_nonblocking`` is patched out and
        ``os.fstat`` is patched to return an object whose ``st_mode`` is
        ``stat.S_IFSOCK``; both patches are undone through ``addCleanup``.
        """
        self.loop = self.new_test_loop()
        self.protocol = test_utils.make_test_protocol(asyncio.BaseProtocol)

        # Mock restricted to the io.RawIOBase attribute set; fileno() yields 5.
        pipe = mock.Mock(spec_set=io.RawIOBase)
        pipe.fileno.return_value = 5
        self.pipe = pipe

        nonblocking_patch = mock.patch('asyncio.unix_events._set_nonblocking')
        nonblocking_patch.start()
        self.addCleanup(nonblocking_patch.stop)

        fstat_patch = mock.patch('os.fstat')
        fake_fstat = fstat_patch.start()
        fake_fstat.return_value = mock.Mock(st_mode=stat.S_IFSOCK)
        self.addCleanup(fstat_patch.stop)
项目:swjtu-pyscraper    作者:Desgard    | 项目源码 | 文件源码
def isatty(self):
        """Report the stream as a terminal once the base-class call has run."""
        # The base result is discarded on purpose; the call is kept for the
        # checks the base implementation performs (CPython raises ValueError
        # when the stream is closed).
        _ = io.RawIOBase.isatty(self)
        return True
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def _gettextwriter(out, encoding):
    """Return a text writer that encodes to *encoding* and writes to *out*.

    *out* falls back to ``sys.stdout`` when it is None.  The wrapper is
    created with the ``xmlcharrefreplace`` error handler and with
    ``newline`` fixed to a line feed.
    """
    if out is None:
        import sys
        out = sys.stdout

    if not isinstance(out, io.RawIOBase):
        # Not a raw stream: it may sit outside the IOBase hierarchy and only
        # offer write(), so graft its methods onto a bare buffered object.
        byte_sink = io.BufferedIOBase()
        byte_sink.writable = lambda: True
        byte_sink.write = out.write
        try:
            # TextIOWrapper consults these to decide whether a BOM
            # (UTF-16 and similar) should be emitted.
            byte_sink.seekable = out.seekable
            byte_sink.tell = out.tell
        except AttributeError:
            pass
    else:
        # NOTE(review): io.BufferedIOBase is the abstract buffered base --
        # confirm that constructing it with `out` really routes writes there.
        byte_sink = io.BufferedIOBase(out)
        # close() becomes a no-op so the caller's stream stays open when the
        # text wrapper is destroyed.
        byte_sink.close = lambda: None

    # Layer the text encoder over the binary writer.
    return _UnbufferedTextIOWrapper(byte_sink, encoding=encoding,
                                    errors='xmlcharrefreplace',
                                    newline='\n')
项目:Sci-Finder    作者:snverse    | 项目源码 | 文件源码
def isatty(self):
        """Report the stream as a terminal once the base-class call has run."""
        # The base result is discarded on purpose; the call is kept for the
        # checks the base implementation performs (CPython raises ValueError
        # when the stream is closed).
        _ = io.RawIOBase.isatty(self)
        return True
项目:Sci-Finder    作者:snverse    | 项目源码 | 文件源码
def isatty(self):
        """Report the stream as a terminal once the base-class call has run."""
        # The base result is discarded on purpose; the call is kept for the
        # checks the base implementation performs (CPython raises ValueError
        # when the stream is closed).
        _ = io.RawIOBase.isatty(self)
        return True
项目:OSPTF    作者:xSploited    | 项目源码 | 文件源码
def _gettextwriter(out, encoding):
    """Return a text writer that encodes to *encoding* and writes to *out*.

    *out* falls back to ``sys.stdout`` when it is None.  The wrapper is
    created with the ``xmlcharrefreplace`` error handler and with
    ``newline`` fixed to a line feed.
    """
    if out is None:
        import sys
        out = sys.stdout

    if not isinstance(out, io.RawIOBase):
        # Not a raw stream: it may sit outside the IOBase hierarchy and only
        # offer write(), so graft its methods onto a bare buffered object.
        byte_sink = io.BufferedIOBase()
        byte_sink.writable = lambda: True
        byte_sink.write = out.write
        try:
            # TextIOWrapper consults these to decide whether a BOM
            # (UTF-16 and similar) should be emitted.
            byte_sink.seekable = out.seekable
            byte_sink.tell = out.tell
        except AttributeError:
            pass
    else:
        # NOTE(review): io.BufferedIOBase is the abstract buffered base --
        # confirm that constructing it with `out` really routes writes there.
        byte_sink = io.BufferedIOBase(out)
        # close() becomes a no-op so the caller's stream stays open when the
        # text wrapper is destroyed.
        byte_sink.close = lambda: None

    # Layer the text encoder over the binary writer.
    return _UnbufferedTextIOWrapper(byte_sink, encoding=encoding,
                                    errors='xmlcharrefreplace',
                                    newline='\n')
项目:watchmen    作者:lycclsltt    | 项目源码 | 文件源码
def __init__(self, sock, mode):
        """Store *sock* and derive the read/write flags from *mode*.

        :param sock: object kept as ``_sock``.
        :param mode: one of "r", "w", "rw", "rb", "wb", "rwb"; a "b" is
            appended to the stored mode when absent.
        :raises ValueError: if *mode* is not one of the accepted values.
        """
        accepted_modes = ("r", "w", "rw", "rb", "wb", "rwb")
        if mode not in accepted_modes:
            raise ValueError("invalid mode: %r" % mode)
        io.RawIOBase.__init__(self)
        self._sock = sock
        # The stored mode always carries the binary marker.
        self._mode = mode if "b" in mode else mode + "b"
        self._reading = "r" in mode
        self._writing = "w" in mode
        self._timeout_occurred = False
项目:watchmen    作者:lycclsltt    | 项目源码 | 文件源码
def close(self):
        """Close this object and drop its reference to the socket.

        The socket itself is not closed here; it is only told, through
        ``_decref_socketios()``, that this object is done with it.  Calling
        close() on an already closed object does nothing.
        """
        if not self.closed:
            io.RawIOBase.close(self)
            sock = self._sock
            sock._decref_socketios()
            self._sock = None
项目:touch-pay-client    作者:HackPucBemobi    | 项目源码 | 文件源码
def __init__(self, sock, mode):
        """Store *sock* and derive the read/write flags from *mode*.

        :param sock: object kept as ``_sock``.
        :param mode: one of "r", "w", "rw", "rb", "wb", "rwb"; a "b" is
            appended to the stored mode when absent.
        :raises ValueError: if *mode* is not one of the accepted values.
        """
        accepted_modes = ("r", "w", "rw", "rb", "wb", "rwb")
        if mode not in accepted_modes:
            raise ValueError("invalid mode: %r" % mode)
        io.RawIOBase.__init__(self)
        self._sock = sock
        # The stored mode always carries the binary marker.
        self._mode = mode if "b" in mode else mode + "b"
        self._reading = "r" in mode
        self._writing = "w" in mode
        self._timeout_occurred = False
项目:touch-pay-client    作者:HackPucBemobi    | 项目源码 | 文件源码
def close(self):
        """Close this object and drop its reference to the socket.

        The socket itself is not closed here; it is only told, through
        ``_decref_socketios()``, that this object is done with it.  Calling
        close() on an already closed object does nothing.
        """
        if not self.closed:
            io.RawIOBase.close(self)
            sock = self._sock
            sock._decref_socketios()
            self._sock = None
项目:device-updater    作者:spark    | 项目源码 | 文件源码
def transfer(self, file, channel:RawIOBase, progress:ProgressSpan):
        """
        Transfers a single file to the device and closes the session, so the device places the data in the
        target location.

        file: the file to transfer via ymodem; it must be seekable and is always closed before this
              method returns or raises
        channel: the ymodem endpoint
        progress: notification of progress; its min/max are set to 0 and the file size

        Returns True on success. Raises LightYModemException when the final response is not
        LightYModemClient.eot.
        """
        self.seq = 0
        self.channel = channel
        self.progress = progress

        try:
            # Measure the file by seeking to its end, then rewind for sending.
            file.seek(0, os.SEEK_END)
            size = file.tell()
            file.seek(0, os.SEEK_SET)
            progress.min = 0
            progress.max = size
            progress.update(0)

            # Keep sending packets for as long as the device acknowledges them.
            response = self.send_filename_header(self.default_file_name, size)
            while response == LightYModemClient.ack:
                response = self.send_packet(file)
        finally:
            # Release the file even when sending fails part-way through.
            file.close()

        if response != LightYModemClient.eot:
            raise LightYModemException("Unable to transfer the file to the device. Code=%d" % response)
        self._send_close()
        return True
项目:RPoint    作者:george17-meet    | 项目源码 | 文件源码
def isatty(self):
        """Report the stream as a terminal once the base-class call has run."""
        # The base result is discarded on purpose; the call is kept for the
        # checks the base implementation performs (CPython raises ValueError
        # when the stream is closed).
        _ = io.RawIOBase.isatty(self)
        return True
项目:Intranet-Penetration    作者:yuxiaokui    | 项目源码 | 文件源码
def _gettextwriter(out, encoding):
    """Return a text writer that encodes to *encoding* and writes to *out*.

    *out* falls back to ``sys.stdout`` when it is None.  The wrapper is
    created with the ``xmlcharrefreplace`` error handler and with
    ``newline`` fixed to a line feed.
    """
    if out is None:
        import sys
        out = sys.stdout

    if not isinstance(out, io.RawIOBase):
        # Not a raw stream: it may sit outside the IOBase hierarchy and only
        # offer write(), so graft its methods onto a bare buffered object.
        byte_sink = io.BufferedIOBase()
        byte_sink.writable = lambda: True
        byte_sink.write = out.write
        try:
            # TextIOWrapper consults these to decide whether a BOM
            # (UTF-16 and similar) should be emitted.
            byte_sink.seekable = out.seekable
            byte_sink.tell = out.tell
        except AttributeError:
            pass
    else:
        # NOTE(review): io.BufferedIOBase is the abstract buffered base --
        # confirm that constructing it with `out` really routes writes there.
        byte_sink = io.BufferedIOBase(out)
        # close() becomes a no-op so the caller's stream stays open when the
        # text wrapper is destroyed.
        byte_sink.close = lambda: None

    # Layer the text encoder over the binary writer.
    return _UnbufferedTextIOWrapper(byte_sink, encoding=encoding,
                                    errors='xmlcharrefreplace',
                                    newline='\n')
项目:MKFQ    作者:maojingios    | 项目源码 | 文件源码
def _gettextwriter(out, encoding):
    """Return a text writer that encodes to *encoding* and writes to *out*.

    *out* falls back to ``sys.stdout`` when it is None.  The wrapper is
    created with the ``xmlcharrefreplace`` error handler and with
    ``newline`` fixed to a line feed.
    """
    if out is None:
        import sys
        out = sys.stdout

    if not isinstance(out, io.RawIOBase):
        # Not a raw stream: it may sit outside the IOBase hierarchy and only
        # offer write(), so graft its methods onto a bare buffered object.
        byte_sink = io.BufferedIOBase()
        byte_sink.writable = lambda: True
        byte_sink.write = out.write
        try:
            # TextIOWrapper consults these to decide whether a BOM
            # (UTF-16 and similar) should be emitted.
            byte_sink.seekable = out.seekable
            byte_sink.tell = out.tell
        except AttributeError:
            pass
    else:
        # NOTE(review): io.BufferedIOBase is the abstract buffered base --
        # confirm that constructing it with `out` really routes writes there.
        byte_sink = io.BufferedIOBase(out)
        # close() becomes a no-op so the caller's stream stays open when the
        # text wrapper is destroyed.
        byte_sink.close = lambda: None

    # Layer the text encoder over the binary writer.
    return _UnbufferedTextIOWrapper(byte_sink, encoding=encoding,
                                    errors='xmlcharrefreplace',
                                    newline='\n')
项目:hakkuframework    作者:4shadoww    | 项目源码 | 文件源码
def readall(self):
        """Return all remaining data by delegating to ``read()``."""
        # Deliberately bypasses the default RawIOBase.readall() implementation.
        remaining = self.read()
        return remaining
项目:hakkuframework    作者:4shadoww    | 项目源码 | 文件源码
def __init__(self, sock, mode):
        """Store *sock* and derive the read/write flags from *mode*.

        :param sock: object kept as ``_sock``.
        :param mode: one of "r", "w", "rw", "rb", "wb", "rwb"; a "b" is
            appended to the stored mode when absent.
        :raises ValueError: if *mode* is not one of the accepted values.
        """
        accepted_modes = ("r", "w", "rw", "rb", "wb", "rwb")
        if mode not in accepted_modes:
            raise ValueError("invalid mode: %r" % mode)
        io.RawIOBase.__init__(self)
        self._sock = sock
        # The stored mode always carries the binary marker.
        self._mode = mode if "b" in mode else mode + "b"
        self._reading = "r" in mode
        self._writing = "w" in mode
        self._timeout_occurred = False
项目:hakkuframework    作者:4shadoww    | 项目源码 | 文件源码
def close(self):
        """Close this object and drop its reference to the socket.

        The socket itself is not closed here; it is only told, through
        ``_decref_socketios()``, that this object is done with it.  Calling
        close() on an already closed object does nothing.
        """
        if not self.closed:
            io.RawIOBase.close(self)
            sock = self._sock
            sock._decref_socketios()
            self._sock = None
项目:trio    作者:python-trio    | 项目源码 | 文件源码
def test_unsupported_not_forwarded():
    """An attribute present on the wrapped file must not be reachable on the wrapper."""
    class FakeFile(io.RawIOBase):
        def unsupported_attr(self):  # pragma: no cover
            pass

    wrapper = trio.wrap_file(FakeFile())

    # The underlying object has the attribute...
    assert hasattr(wrapper.wrapped, 'unsupported_attr')

    # ...but looking it up on the wrapper itself fails.
    with pytest.raises(AttributeError):
        getattr(wrapper, 'unsupported_attr')
项目:pupy    作者:ru-faraon    | 项目源码 | 文件源码
def _gettextwriter(out, encoding):
    """Return a text writer that encodes to *encoding* and writes to *out*.

    *out* falls back to ``sys.stdout`` when it is None.  The wrapper is
    created with the ``xmlcharrefreplace`` error handler and with
    ``newline`` fixed to a line feed.
    """
    if out is None:
        import sys
        out = sys.stdout

    if not isinstance(out, io.RawIOBase):
        # Not a raw stream: it may sit outside the IOBase hierarchy and only
        # offer write(), so graft its methods onto a bare buffered object.
        byte_sink = io.BufferedIOBase()
        byte_sink.writable = lambda: True
        byte_sink.write = out.write
        try:
            # TextIOWrapper consults these to decide whether a BOM
            # (UTF-16 and similar) should be emitted.
            byte_sink.seekable = out.seekable
            byte_sink.tell = out.tell
        except AttributeError:
            pass
    else:
        # NOTE(review): io.BufferedIOBase is the abstract buffered base --
        # confirm that constructing it with `out` really routes writes there.
        byte_sink = io.BufferedIOBase(out)
        # close() becomes a no-op so the caller's stream stays open when the
        # text wrapper is destroyed.
        byte_sink.close = lambda: None

    # Layer the text encoder over the binary writer.
    return _UnbufferedTextIOWrapper(byte_sink, encoding=encoding,
                                    errors='xmlcharrefreplace',
                                    newline='\n')
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def isatty(self):
        """Report the stream as a terminal once the base-class call has run."""
        # The base result is discarded on purpose; the call is kept for the
        # checks the base implementation performs (CPython raises ValueError
        # when the stream is closed).
        _ = io.RawIOBase.isatty(self)
        return True
项目:RealtimePythonChat    作者:quangtqag    | 项目源码 | 文件源码
def isatty(self):
        """Report the stream as a terminal once the base-class call has run."""
        # The base result is discarded on purpose; the call is kept for the
        # checks the base implementation performs (CPython raises ValueError
        # when the stream is closed).
        _ = io.RawIOBase.isatty(self)
        return True
项目:PySocket    作者:falseen    | 项目源码 | 文件源码
def __init__(self, sock, mode):
        """Store *sock* and derive the read/write flags from *mode*.

        :param sock: object kept as ``_sock``.
        :param mode: one of "r", "w", "rw", "rb", "wb", "rwb"; a "b" is
            appended to the stored mode when absent.
        :raises ValueError: if *mode* is not one of the accepted values.
        """
        accepted_modes = ("r", "w", "rw", "rb", "wb", "rwb")
        if mode not in accepted_modes:
            raise ValueError("invalid mode: %r" % mode)
        io.RawIOBase.__init__(self)
        self._sock = sock
        # The stored mode always carries the binary marker.
        self._mode = mode if "b" in mode else mode + "b"
        self._reading = "r" in mode
        self._writing = "w" in mode
        self._timeout_occurred = False
项目:PySocket    作者:falseen    | 项目源码 | 文件源码
def close(self):
        """Close this object and drop its reference to the socket.

        The socket itself is not closed here; it is only told, through
        ``_decref_socketios()``, that this object is done with it.  Calling
        close() on an already closed object does nothing.
        """
        if not self.closed:
            io.RawIOBase.close(self)
            sock = self._sock
            sock._decref_socketios()
            self._sock = None
项目:Indushell    作者:SecarmaLabs    | 项目源码 | 文件源码
def isatty(self):
        """Report the stream as a terminal once the base-class call has run."""
        # The base result is discarded on purpose; the call is kept for the
        # checks the base implementation performs (CPython raises ValueError
        # when the stream is closed).
        _ = io.RawIOBase.isatty(self)
        return True
项目:Liljimbo-Chatbot    作者:chrisjim316    | 项目源码 | 文件源码
def isatty(self):
        """Report the stream as a terminal once the base-class call has run."""
        # The base result is discarded on purpose; the call is kept for the
        # checks the base implementation performs (CPython raises ValueError
        # when the stream is closed).
        _ = io.RawIOBase.isatty(self)
        return True
项目:flask_system    作者:prashasy    | 项目源码 | 文件源码
def isatty(self):
        """Report the stream as a terminal once the base-class call has run."""
        # The base result is discarded on purpose; the call is kept for the
        # checks the base implementation performs (CPython raises ValueError
        # when the stream is closed).
        _ = io.RawIOBase.isatty(self)
        return True
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def __init__(self, handler):
        """Create the two in-memory channels and start *handler* on them.

        :param handler: object kept as ``handler``; its ``start`` is called
            with the ``readline`` of the client-to-server buffer and with
            this object's ``push_data``.
        """
        io.RawIOBase.__init__(self)
        # c2s carries data from the client, s2c carries data to the client.
        self.c2s, self.s2c = io.BytesIO(), io.BytesIO()
        self.handler = handler
        handler.start(self.c2s.readline, self.push_data)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def __init__(self, sock, mode):
        """Store *sock* and derive the read/write flags from *mode*.

        :param sock: object kept as ``_sock``.
        :param mode: one of "r", "w", "rw", "rb", "wb", "rwb"; a "b" is
            appended to the stored mode when absent.
        :raises ValueError: if *mode* is not one of the accepted values.
        """
        accepted_modes = ("r", "w", "rw", "rb", "wb", "rwb")
        if mode not in accepted_modes:
            raise ValueError("invalid mode: %r" % mode)
        io.RawIOBase.__init__(self)
        self._sock = sock
        # The stored mode always carries the binary marker.
        self._mode = mode if "b" in mode else mode + "b"
        self._reading = "r" in mode
        self._writing = "w" in mode
        self._timeout_occurred = False
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def close(self):
        """Close this object and drop its reference to the socket.

        The socket itself is not closed here; it is only told, through
        ``_decref_socketios()``, that this object is done with it.  Calling
        close() on an already closed object does nothing.
        """
        if not self.closed:
            io.RawIOBase.close(self)
            sock = self._sock
            sock._decref_socketios()
            self._sock = None
项目:ShelbySearch    作者:Agentscreech    | 项目源码 | 文件源码
def isatty(self):
        """Report the stream as a terminal once the base-class call has run."""
        # The base result is discarded on purpose; the call is kept for the
        # checks the base implementation performs (CPython raises ValueError
        # when the stream is closed).
        _ = io.RawIOBase.isatty(self)
        return True
项目:jltools    作者:ownport    | 项目源码 | 文件源码
def isatty(self):
        """Report the stream as a terminal once the base-class call has run."""
        # The base result is discarded on purpose; the call is kept for the
        # checks the base implementation performs (CPython raises ValueError
        # when the stream is closed).
        _ = io.RawIOBase.isatty(self)
        return True
项目:FileStoreGAE    作者:liantian-cn    | 项目源码 | 文件源码
def isatty(self):
        """Report the stream as a terminal once the base-class call has run."""
        # The base result is discarded on purpose; the call is kept for the
        # checks the base implementation performs (CPython raises ValueError
        # when the stream is closed).
        _ = io.RawIOBase.isatty(self)
        return True
项目:bawk    作者:jttwnsnd    | 项目源码 | 文件源码
def isatty(self):
        """Report the stream as a terminal once the base-class call has run."""
        # The base result is discarded on purpose; the call is kept for the
        # checks the base implementation performs (CPython raises ValueError
        # when the stream is closed).
        _ = io.RawIOBase.isatty(self)
        return True
项目:bawk    作者:jttwnsnd    | 项目源码 | 文件源码
def __init__(self, sock, mode):
        """Store *sock* and derive the read/write flags from *mode*.

        :param sock: object kept as ``_sock``.
        :param mode: one of "r", "w", "rw", "rb", "wb", "rwb"; a "b" is
            appended to the stored mode when absent.
        :raises ValueError: if *mode* is not one of the accepted values.
        """
        accepted_modes = ("r", "w", "rw", "rb", "wb", "rwb")
        if mode not in accepted_modes:
            raise ValueError("invalid mode: %r" % mode)
        io.RawIOBase.__init__(self)
        self._sock = sock
        # The stored mode always carries the binary marker.
        self._mode = mode if "b" in mode else mode + "b"
        self._reading = "r" in mode
        self._writing = "w" in mode
        self._timeout_occurred = False
项目:bawk    作者:jttwnsnd    | 项目源码 | 文件源码
def close(self):
        """Close this object and drop its reference to the socket.

        The socket itself is not closed here; it is only told, through
        ``_decref_socketios()``, that this object is done with it.  Calling
        close() on an already closed object does nothing.
        """
        if not self.closed:
            io.RawIOBase.close(self)
            sock = self._sock
            sock._decref_socketios()
            self._sock = None
项目:infinite-lorem-ipsum    作者:patjm1992    | 项目源码 | 文件源码
def isatty(self):
        """Report the stream as a terminal once the base-class call has run."""
        # The base result is discarded on purpose; the call is kept for the
        # checks the base implementation performs (CPython raises ValueError
        # when the stream is closed).
        _ = io.RawIOBase.isatty(self)
        return True
项目:Price-Comparator    作者:Thejas-1    | 项目源码 | 文件源码
def isatty(self):
        """Report the stream as a terminal once the base-class call has run."""
        # The base result is discarded on purpose; the call is kept for the
        # checks the base implementation performs (CPython raises ValueError
        # when the stream is closed).
        _ = io.RawIOBase.isatty(self)
        return True
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def _gettextwriter(out, encoding):
    """Return a text writer that encodes to *encoding* and writes to *out*.

    *out* falls back to ``sys.stdout`` when it is None.  The wrapper is
    created with the ``xmlcharrefreplace`` error handler and with
    ``newline`` fixed to a line feed.
    """
    if out is None:
        import sys
        out = sys.stdout

    if not isinstance(out, io.RawIOBase):
        # Not a raw stream: it may sit outside the IOBase hierarchy and only
        # offer write(), so graft its methods onto a bare buffered object.
        byte_sink = io.BufferedIOBase()
        byte_sink.writable = lambda: True
        byte_sink.write = out.write
        try:
            # TextIOWrapper consults these to decide whether a BOM
            # (UTF-16 and similar) should be emitted.
            byte_sink.seekable = out.seekable
            byte_sink.tell = out.tell
        except AttributeError:
            pass
    else:
        # NOTE(review): io.BufferedIOBase is the abstract buffered base --
        # confirm that constructing it with `out` really routes writes there.
        byte_sink = io.BufferedIOBase(out)
        # close() becomes a no-op so the caller's stream stays open when the
        # text wrapper is destroyed.
        byte_sink.close = lambda: None

    # Layer the text encoder over the binary writer.
    return _UnbufferedTextIOWrapper(byte_sink, encoding=encoding,
                                    errors='xmlcharrefreplace',
                                    newline='\n')
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def _gettextwriter(out, encoding):
    """Return a text writer that encodes to *encoding* and writes to *out*.

    *out* falls back to ``sys.stdout`` when it is None.  The wrapper is
    created with the ``xmlcharrefreplace`` error handler and with
    ``newline`` fixed to a line feed.
    """
    if out is None:
        import sys
        out = sys.stdout

    if not isinstance(out, io.RawIOBase):
        # Not a raw stream: it may sit outside the IOBase hierarchy and only
        # offer write(), so graft its methods onto a bare buffered object.
        byte_sink = io.BufferedIOBase()
        byte_sink.writable = lambda: True
        byte_sink.write = out.write
        try:
            # TextIOWrapper consults these to decide whether a BOM
            # (UTF-16 and similar) should be emitted.
            byte_sink.seekable = out.seekable
            byte_sink.tell = out.tell
        except AttributeError:
            pass
    else:
        # NOTE(review): io.BufferedIOBase is the abstract buffered base --
        # confirm that constructing it with `out` really routes writes there.
        byte_sink = io.BufferedIOBase(out)
        # close() becomes a no-op so the caller's stream stays open when the
        # text wrapper is destroyed.
        byte_sink.close = lambda: None

    # Layer the text encoder over the binary writer.
    return _UnbufferedTextIOWrapper(byte_sink, encoding=encoding,
                                    errors='xmlcharrefreplace',
                                    newline='\n')
项目:packaging    作者:blockstack    | 项目源码 | 文件源码
def __init__(self, sock, mode):
        """Store *sock* and derive the read/write flags from *mode*.

        :param sock: object kept as ``_sock``.
        :param mode: one of "r", "w", "rw", "rb", "wb", "rwb"; a "b" is
            appended to the stored mode when absent.
        :raises ValueError: if *mode* is not one of the accepted values.
        """
        accepted_modes = ("r", "w", "rw", "rb", "wb", "rwb")
        if mode not in accepted_modes:
            raise ValueError("invalid mode: %r" % mode)
        io.RawIOBase.__init__(self)
        self._sock = sock
        # The stored mode always carries the binary marker.
        self._mode = mode if "b" in mode else mode + "b"
        self._reading = "r" in mode
        self._writing = "w" in mode
        self._timeout_occurred = False