我们从 Python 开源项目中提取了以下 13 个代码示例，用于说明如何使用 os.O_NDELAY。
def flags(self, *which):
    """Set and/or query descriptor flags on the wrapped file.

    At most one positional argument is accepted: a string of flag
    letters.  With no argument the call behaves like ``flags('?')``.

    Letters understood:
        n -- os.O_NDELAY
        a -- os.O_APPEND
        s -- os.O_SYNC
        c -- the close-on-exec descriptor flag
        ! -- clear the listed status flags instead of setting them; for
             'c', clear close-on-exec instead of setting it
        = -- use exactly the listed status flags instead of combining
             them with the current ones ('!' is then ignored for the
             status flags)
        ? -- return the current flags

    Returns:
        When '?' is requested, a string containing the letters of the
        flags currently set, in the order 'a', 'c', 'n', 's'.
        Otherwise None.

    Raises:
        TypeError: if more than one argument is passed.
    """
    import fcntl
    import os

    if len(which) > 1:
        # Call form of raise: valid on both Python 2 and Python 3.
        raise TypeError('Too many arguments')
    which = which[0] if which else '?'

    l_flags = 0
    if 'n' in which:
        l_flags |= os.O_NDELAY
    if 'a' in which:
        l_flags |= os.O_APPEND
    if 's' in which:
        l_flags |= os.O_SYNC

    fd = self._file_.fileno()

    if '=' not in which:
        # Combine with what is already set: mask out on '!', OR in otherwise.
        cur_fl = fcntl.fcntl(fd, fcntl.F_GETFL, 0)
        if '!' in which:
            l_flags = cur_fl & ~l_flags
        else:
            l_flags = cur_fl | l_flags

    # Always written back, even for a pure query (then it rewrites the
    # current value unchanged).
    fcntl.fcntl(fd, fcntl.F_SETFL, l_flags)

    if 'c' in which:
        # F_SETFD replaces the descriptor flags: FD_CLOEXEC to enable
        # close-on-exec, 0 to disable it.
        cloexec = fcntl.FD_CLOEXEC if '!' not in which else 0
        fcntl.fcntl(fd, fcntl.F_SETFD, cloexec)

    if '?' in which:
        # Report the flags as they are after any changes made above.
        l_flags = fcntl.fcntl(fd, fcntl.F_GETFL, 0)
        current = ''
        if os.O_APPEND & l_flags:
            current += 'a'
        if fcntl.fcntl(fd, fcntl.F_GETFD, 0) & fcntl.FD_CLOEXEC:
            current += 'c'
        if os.O_NDELAY & l_flags:
            current += 'n'
        if os.O_SYNC & l_flags:
            current += 's'
        return current
def __init__(self, device="/dev/ttyAMA0", baudrate=9600):
    """Open a serial device and configure its line settings.

    Args:
        device: Device path, or a bare name that gets "/dev/" prepended
            when it does not already start with it.
        baudrate: Line speed as an int, or a str that is converted with
            int().  termios must expose a matching ``B<baudrate>``
            constant.

    Raises:
        Exception: if termios has no constant for the requested speed.
    """
    if not device.startswith("/dev/"):
        device = "/dev/%s" % device

    if isinstance(baudrate, str):
        baudrate = int(baudrate)

    # termios names its speed constants B<rate> (e.g. B9600).
    speed_name = "B%d" % baudrate
    speed = getattr(termios, speed_name, None)
    if speed is None:
        raise Exception("Unsupported baudrate")

    self.baudrate = baudrate
    # presumably Bus.__init__ opens the device with these flags and sets
    # self.fd -- confirm against the Bus class.
    Bus.__init__(self, "UART", device, os.O_RDWR | os.O_NOCTTY)
    # NOTE(review): this writes O_NDELAY as the complete status-flag
    # value rather than OR-ing it into the current flags -- confirm that
    # is intended.
    fcntl.fcntl(self.fd, fcntl.F_SETFL, os.O_NDELAY)

    attrs = termios.tcgetattr(self.fd)

    # Control flags: ignore modem control lines, enable the receiver,
    # no parity, one stop bit, 8 data bits.
    cflag = attrs[2] | termios.CLOCAL | termios.CREAD
    cflag &= ~(termios.PARENB | termios.CSTOPB | termios.CSIZE)
    cflag |= termios.CS8

    # Slot order: iflag, oflag, cflag, lflag, input speed, output speed.
    # Input, output and local flags are all cleared; anything after the
    # sixth slot is left as returned by tcgetattr().
    attrs[:6] = [0, 0, cflag, 0, speed, speed]

    termios.tcsetattr(self.fd, termios.TCSADRAIN, attrs)
def _setup_non_block_io(self, io): outfd = io.fileno() file_flags = fcntl.fcntl(outfd, fcntl.F_GETFL) fcntl.fcntl(outfd, fcntl.F_SETFL, file_flags | os.O_NDELAY)
def __init__(self):
    """Launch ``self.cmd`` and start a thread running ``self.sendData``.

    The child's stdout is captured through a pipe whose descriptor has
    O_NDELAY and O_NONBLOCK added to its status flags.  The pipe is
    exposed as ``self.out``, the process as ``self.p``, and a fresh
    queue as ``self.queue``.
    """
    self.queue = Queue()

    child = subprocess.Popen(self.cmd, stdout=subprocess.PIPE)
    self.p = child

    # Keep the flags already on the pipe and add both non-blocking
    # spellings.
    stdout_fd = child.stdout.fileno()
    current_flags = fcntl.fcntl(stdout_fd, fcntl.F_GETFL)
    fcntl.fcntl(
        stdout_fd,
        fcntl.F_SETFL,
        current_flags | os.O_NDELAY | os.O_NONBLOCK,
    )
    self.out = child.stdout

    # presumably sendData reads self.out and feeds self.queue -- confirm
    # against its definition.
    worker = threading.Thread(target=self.sendData)
    worker.start()