我们从Python开源项目中,提取了以下29个代码示例,用于说明如何使用os.O_NOCTTY。
def open(self): """\ Open port with current settings. This may throw a SerialException if the port cannot be opened.""" if self._port is None: raise SerialException("Port must be configured before it can be used.") if self.is_open: raise SerialException("Port is already open.") self.fd = None # open try: self.fd = os.open(self.portstr, os.O_RDWR | os.O_NOCTTY | os.O_NONBLOCK) except OSError as msg: self.fd = None raise SerialException(msg.errno, "could not open port %s: %s" % (self._port, msg)) #~ fcntl.fcntl(self.fd, fcntl.F_SETFL, 0) # set blocking try: self._reconfigure_port(force_update=True) except: try: os.close(self.fd) except: # ignore any exception when closing the port # also to keep original exception that happened when setting up pass self.fd = None raise else: self.is_open = True if not self._dsrdtr: self._update_dtr_state() if not self._rtscts: self._update_rts_state() self.reset_input_buffer()
def pty_make_controlling_tty(tty_fd): '''This makes the pseudo-terminal the controlling tty. This should be more portable than the pty.fork() function. Specifically, this should work on Solaris. ''' child_name = os.ttyname(tty_fd) # Disconnect from controlling tty, if any. Raises OSError of ENXIO # if there was no controlling tty to begin with, such as when # executed by a cron(1) job. try: fd = os.open("/dev/tty", os.O_RDWR | os.O_NOCTTY) os.close(fd) except OSError as err: if err.errno != errno.ENXIO: raise os.setsid() # Verify we are disconnected from controlling tty by attempting to open # it again. We expect that OSError of ENXIO should always be raised. try: fd = os.open("/dev/tty", os.O_RDWR | os.O_NOCTTY) os.close(fd) raise ExceptionPexpect("OSError of errno.ENXIO should be raised.") except OSError as err: if err.errno != errno.ENXIO: raise # Verify we can open child pty. fd = os.open(child_name, os.O_RDWR) os.close(fd) # Verify we now have a controlling tty. fd = os.open("/dev/tty", os.O_WRONLY) os.close(fd)
def __init__(self, device="/dev/ttyAMA0", baudrate=9600): if not device.startswith("/dev/"): device = "/dev/%s" % device if isinstance(baudrate, str): baudrate = int(baudrate) aname = "B%d" % baudrate if not hasattr(termios, aname): raise Exception("Unsupported baudrate") self.baudrate = baudrate Bus.__init__(self, "UART", device, os.O_RDWR | os.O_NOCTTY) fcntl.fcntl(self.fd, fcntl.F_SETFL, os.O_NDELAY) #backup = termios.tcgetattr(self.fd) options = termios.tcgetattr(self.fd) # iflag options[0] = 0 # oflag options[1] = 0 # cflag options[2] |= (termios.CLOCAL | termios.CREAD) options[2] &= ~termios.PARENB options[2] &= ~termios.CSTOPB options[2] &= ~termios.CSIZE options[2] |= termios.CS8 # lflag options[3] = 0 speed = getattr(termios, aname) # input speed options[4] = speed # output speed options[5] = speed termios.tcsetattr(self.fd, termios.TCSADRAIN, options)
def __pty_make_controlling_tty(self, tty_fd): """This makes the pseudo-terminal the controlling tty. This should be more portable than the pty.fork() function. Specifically, this should work on Solaris. """ child_name = os.ttyname(tty_fd) # Disconnect from controlling tty if still connected. fd = os.open("/dev/tty", os.O_RDWR | os.O_NOCTTY); if fd >= 0: os.close(fd) os.setsid() # Verify we are disconnected from controlling tty try: fd = os.open("/dev/tty", os.O_RDWR | os.O_NOCTTY); if fd >= 0: os.close(fd) raise ExceptionPexpect, "Error! We are not disconnected from a controlling tty." except: # Good! We are disconnected from a controlling tty. pass # Verify we can open child pty. fd = os.open(child_name, os.O_RDWR); if fd < 0: raise ExceptionPexpect, "Error! Could not open child pty, " + child_name else: os.close(fd) # Verify we now have a controlling tty. fd = os.open("/dev/tty", os.O_WRONLY) if fd < 0: raise ExceptionPexpect, "Error! Could not open controlling tty, /dev/tty" else: os.close(fd)
def __pty_make_controlling_tty(self, tty_fd): '''This makes the pseudo-terminal the controlling tty. This should be more portable than the pty.fork() function. Specifically, this should work on Solaris. ''' child_name = os.ttyname(tty_fd) # Disconnect from controlling tty, if any. Raises OSError of ENXIO # if there was no controlling tty to begin with, such as when # executed by a cron(1) job. try: fd = os.open("/dev/tty", os.O_RDWR | os.O_NOCTTY) os.close(fd) except OSError as err: if err.errno != errno.ENXIO: raise os.setsid() # Verify we are disconnected from controlling tty by attempting to open # it again. We expect that OSError of ENXIO should always be raised. try: fd = os.open("/dev/tty", os.O_RDWR | os.O_NOCTTY) os.close(fd) raise ExceptionPexpect("OSError of errno.ENXIO should be raised.") except OSError as err: if err.errno != errno.ENXIO: raise # Verify we can open child pty. fd = os.open(child_name, os.O_RDWR) os.close(fd) # Verify we now have a controlling tty. fd = os.open("/dev/tty", os.O_WRONLY) os.close(fd)
def test_uses_tty_directly(self): with mock.patch('os.open') as open, \ mock.patch('io.FileIO') as fileio, \ mock.patch('io.TextIOWrapper') as textio: # By setting open's return value to None the implementation will # skip code we don't care about in this test. We can mock this out # fully if an alternate implementation works differently. open.return_value = None getpass.unix_getpass() open.assert_called_once_with('/dev/tty', os.O_RDWR | os.O_NOCTTY) fileio.assert_called_once_with(open.return_value, 'w+') textio.assert_called_once_with(fileio.return_value)
def __init__(self): """ Initialize data structures. """ self.actions = [] self.closed = [] self.pipeCount = 0 self.O_RDWR = -1 self.O_NOCTTY = -2 self.WNOHANG = -4 self.WEXITSTATUS = lambda x: 0 self.WIFEXITED = lambda x: 1 self.seteuidCalls = [] self.setegidCalls = []
def pty_make_controlling_tty(self,tty_fd): # borrow this code from pwntools child_name = os.ttyname(tty_fd) # Disconnect from controlling tty. Harmless if not already connected. try: fd = os.open("/dev/tty", os.O_RDWR | os.O_NOCTTY) if fd >= 0: os.close(fd) # which exception, shouldnt' we catch explicitly .. ? except OSError: # Already disconnected. This happens if running inside cron. pass os.setsid() # Verify we are disconnected from controlling tty # by attempting to open it again. try: fd = os.open("/dev/tty", os.O_RDWR | os.O_NOCTTY) if fd >= 0: os.close(fd) raise Exception('Failed to disconnect from ' + 'controlling tty. It is still possible to open /dev/tty.') # which exception, shouldnt' we catch explicitly .. ? except OSError: # Good! We are disconnected from a controlling tty. pass # Verify we can open child pty. fd = os.open(child_name, os.O_RDWR) if fd < 0: raise Exception("Could not open child pty, " + child_name) else: os.close(fd) # Verify we now have a controlling tty. fd = os.open("/dev/tty", os.O_WRONLY) if fd < 0: raise Exception("Could not open controlling tty, /dev/tty") else: os.close(fd)
def __pty_make_controlling_tty(self, tty_fd): """This makes the pseudo-terminal the controlling tty. This should be more portable than the pty.fork() function. Specifically, this should work on Solaris. """ child_name = os.ttyname(tty_fd) # Disconnect from controlling tty if still connected. fd = os.open("/dev/tty", os.O_RDWR | os.O_NOCTTY) if fd >= 0: os.close(fd) os.setsid() # Verify we are disconnected from controlling tty try: fd = os.open("/dev/tty", os.O_RDWR | os.O_NOCTTY) if fd >= 0: os.close(fd) raise ExceptionPexpect, "Error! We are not disconnected from a controlling tty." except: # Good! We are disconnected from a controlling tty. pass # Verify we can open child pty. fd = os.open(child_name, os.O_RDWR) if fd < 0: raise ExceptionPexpect, "Error! Could not open child pty, " + child_name else: os.close(fd) # Verify we now have a controlling tty. fd = os.open("/dev/tty", os.O_WRONLY) if fd < 0: raise ExceptionPexpect, "Error! Could not open controlling tty, /dev/tty" else: os.close(fd)
def open(self): """\ Open port with current settings. This may throw a SerialException if the port cannot be opened.""" if self._port is None: raise SerialException("Port must be configured before it can be used.") if self.is_open: raise SerialException("Port is already open.") self.fd = None # open try: self.fd = os.open(self.portstr, os.O_RDWR | os.O_NOCTTY | os.O_NONBLOCK) except OSError as msg: self.fd = None raise SerialException(msg.errno, "could not open port {}: {}".format(self._port, msg)) #~ fcntl.fcntl(self.fd, fcntl.F_SETFL, 0) # set blocking try: self._reconfigure_port(force_update=True) except: try: os.close(self.fd) except: # ignore any exception when closing the port # also to keep original exception that happened when setting up pass self.fd = None raise else: self.is_open = True try: if not self._dsrdtr: self._update_dtr_state() if not self._rtscts: self._update_rts_state() except IOError as e: if e.errno in (errno.EINVAL, errno.ENOTTY): # ignore Invalid argument and Inappropriate ioctl pass else: raise self.reset_input_buffer() self.pipe_abort_read_r, self.pipe_abort_read_w = os.pipe() self.pipe_abort_write_r, self.pipe_abort_write_w = os.pipe() fcntl.fcntl(self.pipe_abort_read_r, fcntl.F_SETFL, os.O_NONBLOCK) fcntl.fcntl(self.pipe_abort_write_r, fcntl.F_SETFL, os.O_NONBLOCK)
def __pty_make_controlling_tty(self, tty_fd): '''This makes the pseudo-terminal the controlling tty. This should be more portable than the pty.fork() function. Specifically, this should work on Solaris. ''' child_name = os.ttyname(tty_fd) # Disconnect from controlling tty. Harmless if not already connected. try: fd = os.open("/dev/tty", os.O_RDWR | os.O_NOCTTY) if fd >= 0: os.close(fd) # which exception, shouldnt' we catch explicitly .. ? except OSError: # Already disconnected. This happens if running inside cron. pass os.setsid() # Verify we are disconnected from controlling tty # by attempting to open it again. try: fd = os.open("/dev/tty", os.O_RDWR | os.O_NOCTTY) if fd >= 0: os.close(fd) raise Exception('Failed to disconnect from ' + 'controlling tty. It is still possible to open /dev/tty.') # which exception, shouldnt' we catch explicitly .. ? except OSError: # Good! We are disconnected from a controlling tty. pass # Verify we can open child pty. fd = os.open(child_name, os.O_RDWR) if fd < 0: raise Exception("Could not open child pty, " + child_name) else: os.close(fd) # Verify we now have a controlling tty. fd = os.open("/dev/tty", os.O_WRONLY) if fd < 0: raise Exception("Could not open controlling tty, /dev/tty") else: os.close(fd)
def open(self): """\ Open port with current settings. This may throw a SerialException if the port cannot be opened.""" if self._port is None: raise SerialException("Port must be configured before it can be used.") if self.is_open: raise SerialException("Port is already open.") self.fd = None # open try: self.fd = os.open(self.portstr, os.O_RDWR | os.O_NOCTTY | os.O_NONBLOCK) except OSError as msg: self.fd = None raise SerialException(msg.errno, "could not open port {0}: {1}".format(self._port, msg)) #~ fcntl.fcntl(self.fd, fcntl.F_SETFL, 0) # set blocking try: self._reconfigure_port(force_update=True) except: try: os.close(self.fd) except: # ignore any exception when closing the port # also to keep original exception that happened when setting up pass self.fd = None raise else: self.is_open = True try: if not self._dsrdtr: self._update_dtr_state() if not self._rtscts: self._update_rts_state() except IOError as e: if e.errno in (errno.EINVAL, errno.ENOTTY): # ignore Invalid argument and Inappropriate ioctl pass else: raise self.reset_input_buffer() self.pipe_abort_read_r, self.pipe_abort_read_w = os.pipe() self.pipe_abort_write_r, self.pipe_abort_write_w = os.pipe() fcntl.fcntl(self.pipe_abort_read_r, fcntl.F_SETFL, os.O_NONBLOCK) fcntl.fcntl(self.pipe_abort_write_r, fcntl.F_SETFL, os.O_NONBLOCK)
def open(self): """\ Open port with current settings. This may throw a SerialException if the port cannot be opened.""" if self._port is None: raise SerialException("Port must be configured before it can be used.") if self.is_open: raise SerialException("Port is already open.") self.fd = None # open try: self.fd = os.open(self.portstr, os.O_RDWR | os.O_NOCTTY | os.O_NONBLOCK) except OSError as msg: self.fd = None raise SerialException(msg.errno, "could not open port {}: {}".format(self._port, msg)) #~ fcntl.fcntl(self.fd, fcntl.F_SETFL, 0) # set blocking try: self._reconfigure_port(force_update=True) except: try: os.close(self.fd) except: # ignore any exception when closing the port # also to keep original exception that happened when setting up pass self.fd = None raise else: self.is_open = True try: if not self._dsrdtr: self._update_dtr_state() if not self._rtscts: self._update_rts_state() except IOError as e: if e.errno == 22: # ignore Invalid argument pass else: raise self.reset_input_buffer() self.pipe_abort_read_r, self.pipe_abort_read_w = os.pipe() self.pipe_abort_write_r, self.pipe_abort_write_w = os.pipe() fcntl.fcntl(self.pipe_abort_read_r, fcntl.F_SETFL, os.O_NONBLOCK) fcntl.fcntl(self.pipe_abort_write_r, fcntl.F_SETFL, os.O_NONBLOCK)
def open(self): """\ Open port with current settings. This may throw a SerialException if the port cannot be opened.""" if self._port is None: raise SerialException("Port must be configured before it can be used.") if self.is_open: raise SerialException("Port is already open.") self.fd = None # open try: self.fd = os.open(self.portstr, os.O_RDWR | os.O_NOCTTY | os.O_NONBLOCK) except OSError as msg: self.fd = None raise SerialException(msg.errno, "could not open port {}: {}".format(self._port, msg)) #~ fcntl.fcntl(self.fd, fcntl.F_SETFL, 0) # set blocking try: self._reconfigure_port(force_update=True) except: try: os.close(self.fd) except: # ignore any exception when closing the port # also to keep original exception that happened when setting up pass self.fd = None raise else: self.is_open = True try: if not self._dsrdtr: self._update_dtr_state() if not self._rtscts: self._update_rts_state() except IOError as e: if e.errno in (22, 25): # ignore Invalid argument and Inappropriate ioctl pass else: raise self.reset_input_buffer() self.pipe_abort_read_r, self.pipe_abort_read_w = os.pipe() self.pipe_abort_write_r, self.pipe_abort_write_w = os.pipe() fcntl.fcntl(self.pipe_abort_read_r, fcntl.F_SETFL, os.O_NONBLOCK) fcntl.fcntl(self.pipe_abort_write_r, fcntl.F_SETFL, os.O_NONBLOCK)
def pty_make_controlling_tty(tty_fd): """ This makes the pseudo-terminal the controlling tty. This should be more portable than the pty.fork() function. Specifically, this should work on Solaris. Thanks to pexpect: http://pexpect.sourceforge.net/pexpect.html """ child_name = os.ttyname(tty_fd) # Disconnect from controlling tty. Harmless if not already connected. try: fd = os.open("/dev/tty", os.O_RDWR | os.O_NOCTTY) if fd >= 0: os.close(fd) # which exception, shouldnt' we catch explicitly .. ? except: # Already disconnected. This happens if running inside cron. pass os.setsid() # Verify we are disconnected from controlling tty # by attempting to open it again. try: fd = os.open("/dev/tty", os.O_RDWR | os.O_NOCTTY) if fd >= 0: os.close(fd) raise Exception('Failed to disconnect from controlling ' 'tty. It is still possible to open /dev/tty.') # which exception, shouldnt' we catch explicitly .. ? except: # Good! We are disconnected from a controlling tty. pass # Verify we can open child pty. fd = os.open(child_name, os.O_RDWR) if fd < 0: raise Exception("Could not open child pty, " + child_name) else: os.close(fd) # Verify we now have a controlling tty. if os.name != 'posix': # Skip this on BSD-like systems since it will break. fd = os.open("/dev/tty", os.O_WRONLY) if fd < 0: raise Exception("Could not open controlling tty, /dev/tty") else: os.close(fd)