我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用io.RawIOBase()。
def setUp(self): self.loop = self.new_test_loop() self.protocol = test_utils.make_test_protocol(asyncio.Protocol) self.pipe = mock.Mock(spec_set=io.RawIOBase) self.pipe.fileno.return_value = 5 blocking_patcher = mock.patch('asyncio.unix_events._set_nonblocking') blocking_patcher.start() self.addCleanup(blocking_patcher.stop) fstat_patcher = mock.patch('os.fstat') m_fstat = fstat_patcher.start() st = mock.Mock() st.st_mode = stat.S_IFIFO m_fstat.return_value = st self.addCleanup(fstat_patcher.stop)
def setUp(self): self.loop = self.new_test_loop() self.protocol = test_utils.make_test_protocol(asyncio.BaseProtocol) self.pipe = mock.Mock(spec_set=io.RawIOBase) self.pipe.fileno.return_value = 5 blocking_patcher = mock.patch('asyncio.unix_events._set_nonblocking') blocking_patcher.start() self.addCleanup(blocking_patcher.stop) fstat_patcher = mock.patch('os.fstat') m_fstat = fstat_patcher.start() st = mock.Mock() st.st_mode = stat.S_IFSOCK m_fstat.return_value = st self.addCleanup(fstat_patcher.stop)
def wait_until_ready(self, channel:RawIOBase, timeout=60): """ sends ' ' (space) and waits for the corresponding ACK message. Once we have 3 of these in a row we can be fairly certain the device is ready for ymodem. :param channel: :param timeout: :return: """ success_count = 0 while channel.readline(): # flush any existing data success_count = 0 while success_count < 2: channel.write(b' ') result = channel.read() if result and result[0]==LightYModemProtocol.ack: success_count += 1
def __init__(self, fileno, mode='r', closefd=True): RawIOBase.__init__(self) # Python 2: pylint:disable=no-member,non-parent-init-called self._closed = False self._closefd = closefd self._fileno = fileno make_nonblocking(fileno) self._readable = 'r' in mode self._writable = 'w' in mode self.hub = get_hub() io_watcher = self.hub.loop.io if self._readable: self._read_event = io_watcher(fileno, 1) if self._writable: self._write_event = io_watcher(fileno, 2) self._seekable = None
def close(self): if self._closed: return self.flush() self._closed = True if self._readable: self.hub.cancel_wait(self._read_event, cancel_wait_ex) if self._writable: self.hub.cancel_wait(self._write_event, cancel_wait_ex) fileno = self._fileno if self._closefd: self._fileno = None os.close(fileno) # RawIOBase provides a 'read' method that will call readall() if # the `size` was missing or -1 and otherwise call readinto(). We # want to take advantage of this to avoid single byte reads when # possible. This is highlighted by a bug in BufferedIOReader that # calls read() in a loop when its readall() method is invoked; # this was fixed in Python 3.3. See # https://github.com/gevent/gevent/issues/675)
def isatty(self): io.RawIOBase.isatty(self) return True
def _gettextwriter(out, encoding): if out is None: import sys out = sys.stdout if isinstance(out, io.RawIOBase): buffer = io.BufferedIOBase(out) # Keep the original file open when the TextIOWrapper is # destroyed buffer.close = lambda: None else: # This is to handle passed objects that aren't in the # IOBase hierarchy, but just have a write method buffer = io.BufferedIOBase() buffer.writable = lambda: True buffer.write = out.write try: # TextIOWrapper uses this methods to determine # if BOM (for UTF-16, etc) should be added buffer.seekable = out.seekable buffer.tell = out.tell except AttributeError: pass # wrap a binary writer with TextIOWrapper return _UnbufferedTextIOWrapper(buffer, encoding=encoding, errors='xmlcharrefreplace', newline='\n')
def __init__(self, sock, mode): if mode not in ("r", "w", "rw", "rb", "wb", "rwb"): raise ValueError("invalid mode: %r" % mode) io.RawIOBase.__init__(self) self._sock = sock if "b" not in mode: mode += "b" self._mode = mode self._reading = "r" in mode self._writing = "w" in mode self._timeout_occurred = False
def close(self): """Close the SocketIO object. This doesn't close the underlying socket, except if all references to it have disappeared. """ if self.closed: return io.RawIOBase.close(self) self._sock._decref_socketios() self._sock = None
def transfer(self, file, channel:RawIOBase, progress:ProgressSpan): """ Transfers a single file to the device and closes the session, so the device places the data in the target location. file: the file to transfer via ymodem channel: the ymodem endpoint progress: notification of progress """ self.seq = 0 self.channel = channel self.progress = progress file.seek(0, os.SEEK_END) size = file.tell() file.seek(0, os.SEEK_SET) progress.min = 0 progress.max = size progress.update(0) response = self.send_filename_header(self.default_file_name, size) while response == LightYModemClient.ack: response = self.send_packet(file) file.close() if response != LightYModemClient.eot: raise LightYModemException("Unable to transfer the file to the device. Code=%d" % response) self._send_close() return True
def readall(self): """Read all remaining data""" # avoid RawIOBase default impl return self.read()
def test_unsupported_not_forwarded(): class FakeFile(io.RawIOBase): def unsupported_attr(self): # pragma: no cover pass async_file = trio.wrap_file(FakeFile()) assert hasattr(async_file.wrapped, 'unsupported_attr') with pytest.raises(AttributeError): getattr(async_file, 'unsupported_attr')
def __init__(self, handler): io.RawIOBase.__init__(self) # The channel from the client self.c2s = io.BytesIO() # The channel to the client self.s2c = io.BytesIO() self.handler = handler self.handler.start(self.c2s.readline, self.push_data)