我们从Python开源项目中,提取了以下44个代码示例,用于说明如何使用termios.VMIN。
def initialise(self): if not self.enabled: return if termios is None: self.enabled = False return try: self.tty = open("/dev/tty", "w+") os.tcgetpgrp(self.tty.fileno()) self.clean_tcattr = termios.tcgetattr(self.tty) iflag, oflag, cflag, lflag, ispeed, ospeed, cc = self.clean_tcattr new_lflag = lflag & (0xffffffff ^ termios.ICANON) new_cc = cc[:] new_cc[termios.VMIN] = 0 self.cbreak_tcattr = [ iflag, oflag, cflag, new_lflag, ispeed, ospeed, new_cc] except Exception: self.enabled = False return
def __enter__(self): # NOTE: On os X systems, using pty.setraw() fails. Therefor we are using this: try: newattr = termios.tcgetattr(self.fileno) except termios.error: pass else: newattr[tty.LFLAG] = self._patch_lflag(newattr[tty.LFLAG]) newattr[tty.IFLAG] = self._patch_iflag(newattr[tty.IFLAG]) # VMIN defines the number of characters read at a time in # non-canonical mode. It seems to default to 1 on Linux, but on # Solaris and derived operating systems it defaults to 4. (This is # because the VMIN slot is the same as the VEOF slot, which # defaults to ASCII EOT = Ctrl-D = 4.) newattr[tty.CC][termios.VMIN] = 1 termios.tcsetattr(self.fileno, termios.TCSANOW, newattr) # Put the terminal in cursor mode. (Instead of application mode.) os.write(self.fileno, b'\x1b[?1l')
def reconfigure(self, state=True): """ Allow HUPCL (hang up on close) VMIN, VTIME = 0 """ fd = self.fileno() orig_attr = termios.tcgetattr(fd) iflag, oflag, cflag, lflag, ispeed, ospeed, cc = orig_attr cflag |= termios.HUPCL cc[termios.VMIN] = 0 cc[termios.VTIME] = 0 termios.tcsetattr(fd, termios.TCSANOW, [iflag, oflag, cflag, lflag, ispeed, ospeed, cc])
def setupterm(): global settings update_geometry() hide_cursor() do('smkx') # keypad mode if not settings: settings = termios.tcgetattr(fd.fileno()) mode = termios.tcgetattr(fd.fileno()) IFLAG = 0 OFLAG = 1 CFLAG = 2 LFLAG = 3 ISPEED = 4 OSPEED = 5 CC = 6 mode[LFLAG] = mode[LFLAG] & ~(termios.ECHO | termios.ICANON | termios.IEXTEN) mode[CC][termios.VMIN] = 1 mode[CC][termios.VTIME] = 0 termios.tcsetattr(fd, termios.TCSAFLUSH, mode)
def setup(self): self.old = termios.tcgetattr(self.fd) new = termios.tcgetattr(self.fd) new[3] = new[3] & ~termios.ICANON & ~termios.ECHO & ~termios.ISIG new[6][termios.VMIN] = 1 new[6][termios.VTIME] = 0 termios.tcsetattr(self.fd, termios.TCSANOW, new)
def _reconfigure_port(self, force_update=True): """Set communication parameters on opened port.""" super(VTIMESerial, self)._reconfigure_port() fcntl.fcntl(self.fd, fcntl.F_SETFL, 0) # clear O_NONBLOCK if self._inter_byte_timeout is not None: vmin = 1 vtime = int(self._inter_byte_timeout * 10) else: vmin = 0 vtime = int(self._timeout * 10) try: orig_attr = termios.tcgetattr(self.fd) iflag, oflag, cflag, lflag, ispeed, ospeed, cc = orig_attr except termios.error as msg: # if a port is nonexistent but has a /dev file, it'll fail here raise serial.SerialException("Could not configure port: %s" % msg) if vtime < 0 or vtime > 255: raise ValueError('Invalid vtime: %r' % vtime) cc[termios.VTIME] = vtime cc[termios.VMIN] = vmin termios.tcsetattr( self.fd, termios.TCSANOW, [iflag, oflag, cflag, lflag, ispeed, ospeed, cc])
def setup(self): new = termios.tcgetattr(self.fd) new[3] = new[3] & ~termios.ICANON & ~termios.ECHO & ~termios.ISIG new[6][termios.VMIN] = 1 new[6][termios.VTIME] = 0 termios.tcsetattr(self.fd, termios.TCSANOW, new)
def prepare(self): # per-readline preparations: self.__svtermstate = tcgetattr(self.input_fd) raw = self.__svtermstate.copy() raw.iflag |= termios.ICRNL raw.iflag &= ~(termios.BRKINT | termios.INPCK | termios.ISTRIP | termios.IXON) raw.oflag &= ~termios.OPOST raw.cflag &= ~(termios.CSIZE | termios.PARENB) raw.cflag |= (termios.CS8) raw.lflag &= ~(termios.ICANON | termios.ECHO | termios.IEXTEN | (termios.ISIG * 1)) raw.cc[termios.VMIN] = 1 raw.cc[termios.VTIME] = 0 tcsetattr(self.input_fd, termios.TCSADRAIN, raw) self.screen = [] self.height, self.width = self.getheightwidth() self.__buffer = [] self.__posxy = 0, 0 self.__gone_tall = 0 self.__move = self.__move_short self.__offset = 0 self.__maybe_write_code(self._smkx) try: self.old_sigwinch = signal.signal( signal.SIGWINCH, self.__sigwinch) except ValueError: pass
def _reconfigure_port(self, force_update=True): """Set communication parameters on opened port.""" super(VTIMESerial, self)._reconfigure_port() fcntl.fcntl(self.fd, fcntl.F_SETFL, 0) # clear O_NONBLOCK if self._inter_byte_timeout is not None: vmin = 1 vtime = int(self._inter_byte_timeout * 10) elif self._timeout is None: vmin = 1 vtime = 0 else: vmin = 0 vtime = int(self._timeout * 10) try: orig_attr = termios.tcgetattr(self.fd) iflag, oflag, cflag, lflag, ispeed, ospeed, cc = orig_attr except termios.error as msg: # if a port is nonexistent but has a /dev file, it'll fail here raise serial.SerialException("Could not configure port: {}".format(msg)) if vtime < 0 or vtime > 255: raise ValueError('Invalid vtime: {!r}'.format(vtime)) cc[termios.VTIME] = vtime cc[termios.VMIN] = vmin termios.tcsetattr( self.fd, termios.TCSANOW, [iflag, oflag, cflag, lflag, ispeed, ospeed, cc])
def __init__(self): global CIO_STARTED if CIO_STARTED: raise Exception("cannot init cio twice") CIO_STARTED = True self.buf='' self.fh_ocfg=list() if os.name == 'posix': # for posix, need to set to non-canonical input. self.posix=True fh = sys.stdin.fileno() # if the following call fails, we are probably called with a stdin which is not a tty. ocfg = termios.tcgetattr(fh) cfg = termios.tcgetattr(fh) cfg[3] = cfg[3]&~termios.ICANON&~termios.ECHO #cfg[0] = cfg[0]&~termios.INLCR #cfg[1] = cfg[0]&~termios.OCRNL cfg[6][termios.VMIN] = 0 cfg[6][termios.VTIME] = 0 termios.tcsetattr(fh,termios.TCSAFLUSH,cfg) self.fh_ocfg.extend((fh,ocfg)) atexit.register(stop_canon_input,self.fh_ocfg) elif os.name == 'nt': # for windows, don't need to configure the terminal. self.posix=False else: # know only posix and windows... raise Exception("os variant %s not supported"%repr(os.name))
def run(self): if os.name != "posix": raise ValueError("Can only be run on Posix systems") return buffer = "" # While PySerial would be preferable and more machine-independant, # it does not support echo suppression with open(self.dbgser_tty_name, 'r+') as dbgser: # Config the debug serial port oldattrs = termios.tcgetattr(dbgser) newattrs = termios.tcgetattr(dbgser) newattrs[4] = termios.B115200 # ispeed newattrs[5] = termios.B115200 # ospeed newattrs[3] = newattrs[3] & ~termios.ICANON & ~termios.ECHO newattrs[6][termios.VMIN] = 0 newattrs[6][termios.VTIME] = 10 termios.tcsetattr(dbgser, termios.TCSANOW, newattrs) # As long as we weren't asked to stop, try to capture dbgserial # output and push each line up the result queue. try: while not self.stoprequest.isSet(): ch = dbgser.read(1) if ch: if ch == "\n": # Push the line (sans newline) into our queue # and clear the buffer for the next line. self.result_q.put(buffer) buffer = "" elif ch != "\r": buffer += ch except IOError: pass finally: # Restore previous settings termios.tcsetattr(dbgser, termios.TCSAFLUSH, oldattrs) # Flush any partial buffer if buffer: self.result_q.put(buffer)
def prepare_tty(): "set the terminal in char mode (return each keyboard press at once) and"\ " switch off echoing of this input; return the original settings" stdin_fd = sys.stdin.fileno() # will most likely be 0 ;-> old_stdin_config = termios.tcgetattr(stdin_fd) [ iflag, oflag, cflag, lflag, ispeed, ospeed, cc ] = \ termios.tcgetattr(stdin_fd) cc[termios.VTIME] = 1 cc[termios.VMIN] = 1 iflag = iflag & ~(termios.IGNBRK | termios.BRKINT | termios.PARMRK | termios.ISTRIP | termios.INLCR | termios.IGNCR | #termios.ICRNL | termios.IXON) # oflag = oflag & ~termios.OPOST cflag = cflag | termios.CS8 lflag = lflag & ~(termios.ECHO | termios.ECHONL | termios.ICANON | # termios.ISIG | termios.IEXTEN) termios.tcsetattr(stdin_fd, termios.TCSANOW, [ iflag, oflag, cflag, lflag, ispeed, ospeed, cc ]) return (stdin_fd, old_stdin_config)
def _reconfigure_port(self, force_update=True): """Set communication parameters on opened port.""" super(VTIMESerial, self)._reconfigure_port() fcntl.fcntl(self.fd, fcntl.F_SETFL, 0) # clear O_NONBLOCK if self._inter_byte_timeout is not None: vmin = 1 vtime = int(self._inter_byte_timeout * 10) else: vmin = 0 vtime = int(self._timeout * 10) try: orig_attr = termios.tcgetattr(self.fd) iflag, oflag, cflag, lflag, ispeed, ospeed, cc = orig_attr except termios.error as msg: # if a port is nonexistent but has a /dev file, it'll fail here raise serial.SerialException("Could not configure port: {}".format(msg)) if vtime < 0 or vtime > 255: raise ValueError('Invalid vtime: {!r}'.format(vtime)) cc[termios.VTIME] = vtime cc[termios.VMIN] = vmin termios.tcsetattr( self.fd, termios.TCSANOW, [iflag, oflag, cflag, lflag, ispeed, ospeed, cc])
def prepare(self): # per-readline preparations: self.__svtermstate = tcgetattr(self.input_fd) raw = self.__svtermstate.copy() raw.iflag &=~ (termios.BRKINT | termios.INPCK | termios.ISTRIP | termios.IXON) raw.oflag &=~ (termios.OPOST) raw.cflag &=~ (termios.CSIZE|termios.PARENB) raw.cflag |= (termios.CS8) raw.lflag &=~ (termios.ICANON|termios.ECHO| termios.IEXTEN|(termios.ISIG*1)) raw.cc[termios.VMIN] = 1 raw.cc[termios.VTIME] = 0 tcsetattr(self.input_fd, termios.TCSADRAIN, raw) self.screen = [] self.height, self.width = self.getheightwidth() self.__buffer = [] self.__posxy = 0, 0 self.__gone_tall = 0 self.__move = self.__move_short self.__offset = 0 self.__maybe_write_code(self._smkx) try: self.old_sigwinch = signal.signal( signal.SIGWINCH, self.__sigwinch) except ValueError: pass
def show (self, show_log = False): # init termios old_settings = termios.tcgetattr(sys.stdin) new_settings = termios.tcgetattr(sys.stdin) new_settings[3] = new_settings[3] & ~(termios.ECHO | termios.ICANON) # lflags new_settings[6][termios.VMIN] = 0 # cc new_settings[6][termios.VTIME] = 0 # cc termios.tcsetattr(sys.stdin, termios.TCSADRAIN, new_settings) self.pm.init(show_log) self.state = self.STATE_ACTIVE self.draw_policer = 0 try: while True: # draw and handle user input cont, force_draw = self.handle_key_input() self.draw_screen(force_draw) if not cont: break time.sleep(0.1) # regular state if self.state == self.STATE_ACTIVE: # if no connectivity - move to lost connecitivty if not self.stateless_client.async_client.is_alive(): self.stateless_client._invalidate_stats(self.pm.ports) self.state = self.STATE_LOST_CONT # lost connectivity elif self.state == self.STATE_LOST_CONT: # got it back if self.stateless_client.async_client.is_alive(): # move to state reconnect self.state = self.STATE_RECONNECT # restored connectivity - try to reconnect elif self.state == self.STATE_RECONNECT: try: self.stateless_client.connect() self.state = self.STATE_ACTIVE except STLError: self.state = self.STATE_LOST_CONT finally: # restore termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_settings) print("") # draw once