我们从 Python 开源项目中提取了以下 16 个代码示例，用于说明如何使用 errno.ENXIO。
def _open_ipipes(wds, fifos, input_pipes):
    """
    Try to open each pending named pipe in ``fifos`` for non-blocking writing.

    Opening only succeeds once the subprocess has opened the pipe for
    reading. For every fifo that opens, the new descriptor is appended to
    ``wds``, the fifo is removed from ``fifos``, and the value it mapped to
    is stored in ``input_pipes`` under that descriptor. A fifo whose open
    fails with ``ENXIO`` stays in ``fifos``; any other ``OSError`` propagates.

    Returns the (mutated) ``wds``, ``fifos`` and ``input_pipes``.
    """
    open_flags = os.O_WRONLY | os.O_NONBLOCK
    # Iterate over a copy because entries are popped from ``fifos`` as we go.
    for name in fifos.copy():
        try:
            write_fd = os.open(name, open_flags)
        except OSError as err:
            if err.errno == errno.ENXIO:
                # Nobody is reading yet; leave this fifo pending.
                continue
            raise
        input_pipes[write_fd] = fifos.pop(name)
        wds.append(write_fd)
    return wds, fifos, input_pipes
def is_sparse(path):
    """Return True if the file at ``path`` holds no data at all.

    LP: #1656371 - stat().st_blocks is not a reliable indicator: on ZFS it
    seems to report 1 where EXT4 reports 0. Instead of hard-coding values
    per file system, seek to the first data block at or after offset 0 with
    SEEK_DATA. When there is no data, lseek(2) fails with ENXIO (at least on
    the modern Linux kernels this code cares about).

    Any OSError other than ENXIO is propagated.
    """
    with open(path, 'r') as stream:
        try:
            os.lseek(stream.fileno(), 0, os.SEEK_DATA)
        except OSError as exc:
            # There is no dedicated OSError subclass for ENXIO, so the
            # errno has to be inspected by hand.
            if exc.errno == errno.ENXIO:
                # No data anywhere in the file: it is entirely sparse.
                return True
            raise
    # The seek found data, so the file is not entirely sparse.
    return False
def myconnect(sock, host, port):
    '''myconnect(sock, host, port) -- attempt to make a network
    connection with the given socket and host-port pair; raises our
    new NetworkError exception and collates error number and reason.

    sock -- socket object whose connect() is called
    host -- host name or address (string)
    port -- port number

    Raises NetworkError built from the socket error's arguments with
    "host:port" appended.  When the socket error carries only a
    message, errno.ENXIO is used as the error number.
    '''
    try:
        # connect() takes ONE address argument, so host and port must be
        # passed as a tuple.
        sock.connect((host, port))
    except socket.error as err:
        myargs = updArgs(err)       # convert inst to tuple
        if len(myargs) == 1:        # no #s on some errors
            myargs = (errno.ENXIO, myargs[0])
        # updArgs is expected to return a tuple (TODO confirm against its
        # definition); unpacking it matches the old
        # "raise NetworkError, <tuple>" form.
        raise NetworkError(*updArgs(myargs, host + ':' + str(port)))

# myopen() --> file object
def open(self, flags):
    """Open the input fifo at ``self.path`` with ``flags`` if not yet open.

    Does nothing when ``self._fd`` is already set. Otherwise the path must
    exist and be a fifo; the descriptor returned by ``os.open`` is stored in
    ``self._fd``. An ``ENXIO`` failure from ``os.open`` is tolerated and
    leaves ``self._fd`` as ``None``.

    Raises:
        Exception: if the path does not exist or is not a fifo.
        OSError: for any ``os.open`` failure other than ``ENXIO``.
    """
    if self._fd is not None:
        return

    if not os.path.exists(self.path):
        raise Exception('Input pipe does not exist: %s' % self._path)

    mode = os.stat(self.path).st_mode
    if not stat.S_ISFIFO(mode):
        raise Exception('Input pipe must be a fifo object: %s' % self._path)

    try:
        self._fd = os.open(self.path, flags)
    except OSError as err:
        # ENXIO is swallowed on purpose; everything else is a real error.
        if err.errno != errno.ENXIO:
            raise
def pty_make_controlling_tty(tty_fd):
    '''Make the pseudo-terminal behind ``tty_fd`` the controlling tty.

    This should be more portable than the pty.fork() function.
    Specifically, this should work on Solaris.

    Raises ExceptionPexpect if /dev/tty can still be opened after
    os.setsid(); any OSError other than ENXIO from the /dev/tty probes is
    propagated, as is any OSError from the final verification opens.
    '''
    child_name = os.ttyname(tty_fd)
    dev_tty = "/dev/tty"
    probe_flags = os.O_RDWR | os.O_NOCTTY

    # Disconnect from controlling tty, if any. The open raises OSError of
    # ENXIO if there was no controlling tty to begin with, such as when
    # executed by a cron(1) job.
    try:
        os.close(os.open(dev_tty, probe_flags))
    except OSError as exc:
        if exc.errno != errno.ENXIO:
            raise

    os.setsid()

    # Verify we are disconnected from the controlling tty by attempting to
    # open it again. OSError of ENXIO is the expected outcome.
    try:
        os.close(os.open(dev_tty, probe_flags))
    except OSError as exc:
        if exc.errno != errno.ENXIO:
            raise
    else:
        raise ExceptionPexpect("OSError of errno.ENXIO should be raised.")

    # Verify we can open child pty.
    os.close(os.open(child_name, os.O_RDWR))

    # Verify we now have a controlling tty.
    os.close(os.open(dev_tty, os.O_WRONLY))
def __pty_make_controlling_tty(self, tty_fd):
    '''Make the pseudo-terminal referred to by ``tty_fd`` the controlling tty.

    This should be more portable than the pty.fork() function.
    Specifically, this should work on Solaris.

    Raises ExceptionPexpect when /dev/tty is still openable after
    os.setsid(). An OSError from either /dev/tty probe is re-raised unless
    its errno is ENXIO; the two final verification opens are not guarded.
    '''
    child_name = os.ttyname(tty_fd)
    no_ctty_flags = os.O_RDWR | os.O_NOCTTY

    # Step 1: drop the current controlling tty, if there is one. ENXIO
    # means there was none to begin with (e.g. when run from cron(1)).
    try:
        probe = os.open("/dev/tty", no_ctty_flags)
        os.close(probe)
    except OSError as error:
        if error.errno != errno.ENXIO:
            raise

    os.setsid()

    # Step 2: confirm the disconnect. Opening /dev/tty again is expected
    # to fail with ENXIO; succeeding is an error.
    try:
        probe = os.open("/dev/tty", no_ctty_flags)
        os.close(probe)
    except OSError as error:
        if error.errno != errno.ENXIO:
            raise
    else:
        raise ExceptionPexpect("OSError of errno.ENXIO should be raised.")

    # Step 3: verify we can open child pty.
    child = os.open(child_name, os.O_RDWR)
    os.close(child)

    # Step 4: verify we now have a controlling tty.
    controlling = os.open("/dev/tty", os.O_WRONLY)
    os.close(controlling)
def myconnect(sock, host, port):
    """Connect ``sock`` to ``(host, port)``, translating socket errors.

    Any ``socket.error`` is converted into a ``NetworkError`` whose
    arguments are the original error arguments followed by "host:port".
    When the original error carries only a message, ``errno.ENXIO`` is used
    as the error number.
    """
    try:
        sock.connect((host, port))
    except socket.error as err:
        myargs = updArgs(err)       # convert inst to tuple
        if len(myargs) == 1:        # no #s on some errors
            myargs = (errno.ENXIO, myargs[0])
        # updArgs is expected to return a tuple (TODO confirm against its
        # definition); unpacking it matches the Python 2
        # "raise NetworkError, <tuple>" form while staying valid on Python 3.
        raise NetworkError(*updArgs(myargs, host + ':' + str(port)))
def run(self):
    """Open the FIFO for writing and hand it to ``self.feeder``.

    The FIFO is opened non-blocking in a retry loop so that the join
    request can be checked between attempts: an ENXIO failure is retried
    unless ``self._want_join`` is set, in which case the method returns
    without feeding anything. The value returned by the feeder is put on
    ``self.end_nl_q``. Any exception raised along the way is stored in
    ``self._exception`` instead of propagating.
    """
    try:
        fd = None
        while fd is None:
            try:
                fd = os.open(self.fifo_path, os.O_WRONLY | os.O_NONBLOCK)
            except OSError as exc:
                if exc.errno != errno.ENXIO:
                    raise
                if self._want_join.is_set():
                    # The main thread wants us to wind down, so there is
                    # no more need for the FIFO.
                    return

        # Clear the non-blocking flag so that writes block normally.
        fcntl.fcntl(fd, fcntl.F_SETFL, 0)

        with open(fd, 'wb') as sink:
            # The queue works around a unified diff limitation: if neither
            # side ends with a newline, that should not count as a
            # difference.
            self.end_nl_q.put(self.feeder(sink))
    except Exception as exc:
        self._exception = exc
def myconnect(sock, host, port):
    """Connect ``sock`` to ``(host, port)`` and normalise failures.

    A ``socket.error`` raised by ``connect`` is re-raised as
    ``NetworkError``. Its arguments are the original error's arguments
    (with ``errno.ENXIO`` prepended when only a message was supplied) plus a
    trailing "host:port" string.
    """
    try:
        sock.connect((host, port))
    except socket.error as err:
        myargs = updArgs(err)       # conv inst2tuple
        if len(myargs) == 1:        # no #s on some errs
            myargs = (errno.ENXIO, myargs[0])
        # updArgs is expected to return a tuple (TODO confirm against its
        # definition); unpacking it reproduces the Python 2
        # "raise NetworkError, <tuple>" semantics in syntax that also
        # parses on Python 3.
        raise NetworkError(*updArgs(myargs, host + ':' + str(port)))
def recvfd(self):
    """Receive a file descriptor via the pipe.

    Returns the received descriptor, or -1 when the ioctl fails with
    ENXIO (the other end of the connection was closed). If the received
    descriptor refers to a regular file, its file offset is reset to the
    start before it is returned.

    Raises IOError if this object is already closed, or if the ioctl
    fails for any reason other than ENXIO.
    """
    if self.closed:
        self.debug_msg("recvfd", "failed (closed)")
        # The message used to name sendfd(), a copy-paste slip.
        raise IOError(
            "recvfd() called for closed {0}".format(
            type(self).__name__))

    try:
        fcntl_args = struct.pack('i', -1)
        fcntl_rv = fcntl.ioctl(self.__pipefd,
            fcntl.I_RECVFD, fcntl_args)
        fd = struct.unpack('i', fcntl_rv)[0]
    except IOError as e:
        if e.errno == errno.ENXIO:
            # other end of the connection was closed
            return -1
        self.debug_msg("recvfd", "failed")
        raise
    assert fd != -1

    # debugging
    self.debug_dumpfd("recvfd", fd)

    # reset the current file pointer
    si = os.fstat(fd)
    if stat.S_ISREG(si.st_mode):
        # os.lseek takes (fd, pos, whence). The old call had pos and
        # whence swapped and only worked because os.SEEK_SET == 0.
        os.lseek(fd, 0, os.SEEK_SET)

    return fd