我们从 Python 开源项目中提取了以下 27 个代码示例,用于说明如何使用 termios.FIONREAD。
def in_waiting(self):
    """Return the number of bytes currently in the input buffer.

    Issues the ``TIOCINQ`` ioctl on ``self.fd`` and decodes the reply as a
    single native unsigned int.
    """
    # Disabled alternative kept from the original listing: the same call
    # with termios.FIONREAD in place of TIOCINQ.
    # TIOCM_zero_str is the argument buffer handed to the ioctl --
    # presumably a packed zero integer; verify against its definition.
    reply = fcntl.ioctl(self.fd, TIOCINQ, TIOCM_zero_str)
    (pending,) = struct.unpack('I', reply)
    return pending

# select based implementation, proved to work on many systems
def out_waiting(self):
    """Return the number of bytes currently in the output buffer.

    Issues the ``TIOCOUTQ`` ioctl on ``self.fd`` and decodes the reply as a
    single native unsigned int.
    """
    # The original listing also carried a disabled variant of this call that
    # used termios.FIONREAD; only the TIOCOUTQ request is live.
    # TIOCM_zero_str is the argument buffer handed to the ioctl --
    # presumably a packed zero integer; verify against its definition.
    reply = fcntl.ioctl(self.fd, TIOCOUTQ, TIOCM_zero_str)
    (queued,) = struct.unpack('I', reply)
    return queued
def read(self, timeout=None, read_delay=None):
    """Read the inotify file descriptor and return the resulting list of
    :attr:`~inotify_simple.Event` namedtuples (wd, mask, cookie, name).

    Args:
        timeout (int): The time in milliseconds to wait for events if there
            are none. If negative or ``None``, block until there are
            events.

        read_delay (int): The time in milliseconds to wait after the first
            event arrives before reading the buffer. This allows further
            events to accumulate before reading, which allows the kernel
            to consolidate like events and can enhance performance when
            there are many similar events.

    Returns:
        list: list of :attr:`~inotify_simple.Event` namedtuples; an empty
        list when the poll times out without any event."""
    # Poll for the first event; an empty result means the wait timed out.
    if not self._poller.poll(timeout):
        return []

    # Optionally give further events time to pile up before draining.
    if read_delay is not None:
        time.sleep(read_delay / 1000.0)

    # FIONREAD stores the number of readable bytes into the C int.
    pending_bytes = ctypes.c_int()
    ioctl(self.fd, FIONREAD, pending_bytes)

    # Drain exactly that many bytes and decode them into events.
    return parse_events(os.read(self.fd, pending_bytes.value))
def __call__(self):
    '''Selector poll loop

    Runs for as long as ``self._running`` is true. Each pass applies the
    queued socket (un)registrations to ``self.selector``, waits up to 0.1 s
    for readable sockets, receives one datagram from every ready socket and
    hands it to the owning object's ``received(bytes, address)`` method.

    NOTE(review): the extent of the two ``with self._socket_map_lock``
    blocks was reconstructed from an unindented listing -- confirm that
    ``select()`` and the receive loop are meant to run without the lock.
    '''
    # Single-int buffer reused on every FIONREAD call below.
    avail_buf = array.array('i', [0])
    while self._running:
        # Apply the queued unregistrations / registrations, then clear
        # both queues.
        with self._socket_map_lock:
            for sock in self._unregister_sockets:
                self.selector.unregister(sock)
            self._unregister_sockets.clear()

            for sock in self._register_sockets:
                self.selector.register(sock, selectors.EVENT_READ)
            self._register_sockets.clear()

        events = self.selector.select(timeout=0.1)
        with self._socket_map_lock:
            # Something was queued while we were selecting: start the pass
            # over so the selector is updated before events are used.
            if self._unregister_sockets or self._register_sockets:
                continue

            # Resolve each ready socket to its id, then to the
            # (object, socket) pair that will process the data.
            ready_ids = [self.socket_to_id[key.fileobj]
                         for key, mask in events]
            ready_objs = [(self.objects[obj_id], self.id_to_socket[obj_id])
                          for obj_id in ready_ids]

        for obj, sock in ready_objs:
            # TODO: consider thread pool for recv and command_loop
            # FIONREAD writes the readable byte count into avail_buf[0];
            # a negative return code skips this socket.
            if fcntl.ioctl(sock, termios.FIONREAD, avail_buf) < 0:
                continue

            bytes_available = avail_buf[0]

            try:
                # Always offer at least a 4096-byte receive buffer.
                bytes_recv, address = sock.recvfrom(
                    max((4096, bytes_available)))
            except OSError as ex:
                # EAGAIN: skip this socket for this pass.
                if ex.errno == errno.EAGAIN:
                    continue
                # Any other OSError is reported to the object as an empty
                # payload with no sender address.
                bytes_recv, address = b'', None

            obj.received(bytes_recv, address)
def read_events(self):
    """
    Read events from device, build _RawEvents, and enqueue them.

    The number of readable bytes is queried with the FIONREAD ioctl on
    ``self._fd``. Nothing is read while that count is below
    ``self._threshold``. Otherwise the whole queue is read in one call and
    split into records: a 16-byte header unpacked as ``'iIII'``
    (wd, mask, cookie, fname_len) followed by ``fname_len`` bytes of name.
    Each record becomes a ``_RawEvent`` appended to ``self._eventq``; when
    ``self._coalesce`` is set, an event whose ``str()`` is already in
    ``self._eventset`` is dropped.

    Returns None.

    Raises NotifierError if reading the file descriptor fails.
    """
    buf_ = array.array('i', [0])
    # get event queue size (the ioctl stores it into buf_[0])
    if fcntl.ioctl(self._fd, termios.FIONREAD, buf_, 1) == -1:
        return
    queue_size = buf_[0]
    if queue_size < self._threshold:
        log.debug('(fd: %d) %d bytes available to read but threshold is '
                  'fixed to %d bytes', self._fd, queue_size,
                  self._threshold)
        return

    try:
        # Read content from file
        r = os.read(self._fd, queue_size)
    except Exception as msg:
        # 'as' form parses on Python 2.6+ and 3.x alike; the former
        # 'except Exception, msg' spelling is a syntax error on Python 3.
        raise NotifierError(msg)
    log.debug('Event queue size: %d', queue_size)

    s_size = 16  # length of the fixed header in front of every name
    rsum = 0  # offset of the record currently being decoded
    while rsum < queue_size:
        # Retrieve wd, mask, cookie and fname_len
        wd, mask, cookie, fname_len = struct.unpack('iIII',
                                                    r[rsum:rsum+s_size])
        # Retrieve name
        fname, = struct.unpack('%ds' % fname_len,
                               r[rsum + s_size:rsum + s_size + fname_len])
        rawevent = _RawEvent(wd, mask, cookie, fname)
        if self._coalesce:
            # Only enqueue new (unique) events.
            raweventstr = str(rawevent)
            if raweventstr not in self._eventset:
                self._eventset.add(raweventstr)
                self._eventq.append(rawevent)
        else:
            self._eventq.append(rawevent)
        # Advance past this record's header and name.
        rsum += s_size + fname_len
def EstimateUnreadBytes(fd):
    """Return the byte count that the FIONREAD ioctl reports for ``fd``."""
    # One-element C int array; with the mutate flag set, ioctl writes its
    # answer straight into this buffer.
    pending = array('i', [0])
    ioctl(fd, FIONREAD, pending, True)
    (count,) = pending
    return count
def read_events(self):
    """
    Read events from device, build _RawEvents, and enqueue them.

    The FIONREAD ioctl on ``self._fd`` gives the number of queued bytes.
    Below ``self._threshold`` nothing is read. Otherwise the queue is read
    in one call and walked record by record: a 16-byte ``'iIII'`` header
    (wd, mask, cookie, name length) followed by the name bytes, which are
    decoded to text. Each record is appended to ``self._eventq`` as a
    ``_RawEvent``; with ``self._coalesce`` set, events whose ``str()`` is
    already in ``self._eventset`` are skipped.

    Raises NotifierError if reading the file descriptor fails.
    """
    size_buf = array.array('i', [0])
    # Ask how many bytes are queued; the ioctl stores it in size_buf[0].
    status = fcntl.ioctl(self._fd, termios.FIONREAD, size_buf, 1)
    if status == -1:
        return
    queue_size = size_buf[0]
    if queue_size < self._threshold:
        log.debug('(fd: %d) %d bytes available to read but threshold is '
                  'fixed to %d bytes', self._fd, queue_size,
                  self._threshold)
        return

    try:
        # Pull the whole queue in a single read.
        data = os.read(self._fd, queue_size)
    except Exception as msg:
        raise NotifierError(msg)
    log.debug('Event queue size: %d', queue_size)

    header_len = 16  # fixed header in front of every name
    offset = 0
    while offset < queue_size:
        name_start = offset + header_len
        # Header fields: wd, mask, cookie and length of the name.
        wd, mask, cookie, fname_len = struct.unpack(
            'iIII', data[offset:name_start])
        name_end = name_start + fname_len
        # Name bytes that follow the header.
        (bname,) = struct.unpack('%ds' % fname_len,
                                 data[name_start:name_end])
        # The next record starts right after this name.
        offset = name_end

        # FIXME: should we explicitly call sys.getdefaultencoding() here ??
        uname = bname.decode()
        rawevent = _RawEvent(wd, mask, cookie, uname)

        if self._coalesce:
            # Only enqueue new (unique) events.
            event_key = str(rawevent)
            if event_key in self._eventset:
                continue
            self._eventset.add(event_key)
        self._eventq.append(rawevent)