Python errno 模块,ENOTTY 实例源码

我们从Python开源项目中,提取了以下15个代码示例,用于说明如何使用errno.ENOTTY

项目:iotronic    作者:openstack    | 项目源码 | 文件源码
def _is_daemon():
    # The process group for a foreground process will match the
    # process group of the controlling terminal. If those values do
    # not match, or ioctl() fails on the stdout file handle, we assume
    # the process is running in the background as a daemon.
    # http://www.gnu.org/software/bash/manual/bashref.html#Job-Control-Basics
    try:
        is_daemon = os.getpgrp() != os.tcgetpgrp(sys.stdout.fileno())
    except OSError as err:
        if err.errno == errno.ENOTTY:
            # Assume we are a daemon because there is no terminal.
            is_daemon = True
        else:
            raise
    except UnsupportedOperation:
        # Could not get the fileno for stdout, so we must be a daemon.
        is_daemon = True
    return is_daemon
项目:weibo    作者:windskyer    | 项目源码 | 文件源码
def _is_daemon():
    # The process group for a foreground process will match the
    # process group of the controlling terminal. If those values do
    # not match, or ioctl() fails on the stdout file handle, we assume
    # the process is running in the background as a daemon.
    # http://www.gnu.org/software/bash/manual/bashref.html#Job-Control-Basics
    try:
        is_daemon = os.getpgrp() != os.tcgetpgrp(sys.stdout.fileno())
    except io.UnsupportedOperation:
        # Could not get the fileno for stdout, so we must be a daemon.
        is_daemon = True
    except OSError as err:
        if err.errno == errno.ENOTTY:
            # Assume we are a daemon because there is no terminal.
            is_daemon = True
        else:
            raise
    return is_daemon
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_does_not_crash(self):
        """Check if get_terminal_size() returns a meaningful value.

        There's no easy portable way to actually check the size of the
        terminal, so let's check if it returns something sensible instead.
        """
        try:
            size = os.get_terminal_size()
        except OSError as e:
            if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
                # Under win32 a generic OSError can be thrown if the
                # handle cannot be retrieved
                self.skipTest("failed to query terminal size")
            raise

        self.assertGreaterEqual(size.columns, 0)
        self.assertGreaterEqual(size.lines, 0)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_stty_match(self):
        """Check if stty returns the same results

        stty actually tests stdin, so get_terminal_size is invoked on
        stdin explicitly. If stty succeeded, then get_terminal_size()
        should work too.
        """
        try:
            size = subprocess.check_output(['stty', 'size']).decode().split()
        except (FileNotFoundError, subprocess.CalledProcessError):
            self.skipTest("stty invocation failed")
        expected = (int(size[1]), int(size[0])) # reversed order

        try:
            actual = os.get_terminal_size(sys.__stdin__.fileno())
        except OSError as e:
            if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
                # Under win32 a generic OSError can be thrown if the
                # handle cannot be retrieved
                self.skipTest("failed to query terminal size")
            raise
        self.assertEqual(expected, actual)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_does_not_crash(self):
        """Check if get_terminal_size() returns a meaningful value.

        There's no easy portable way to actually check the size of the
        terminal, so let's check if it returns something sensible instead.
        """
        try:
            size = os.get_terminal_size()
        except OSError as e:
            if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
                # Under win32 a generic OSError can be thrown if the
                # handle cannot be retrieved
                self.skipTest("failed to query terminal size")
            raise

        self.assertGreaterEqual(size.columns, 0)
        self.assertGreaterEqual(size.lines, 0)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_stty_match(self):
        """Check if stty returns the same results

        stty actually tests stdin, so get_terminal_size is invoked on
        stdin explicitly. If stty succeeded, then get_terminal_size()
        should work too.
        """
        try:
            size = subprocess.check_output(['stty', 'size']).decode().split()
        except (FileNotFoundError, subprocess.CalledProcessError):
            self.skipTest("stty invocation failed")
        expected = (int(size[1]), int(size[0])) # reversed order

        try:
            actual = os.get_terminal_size(sys.__stdin__.fileno())
        except OSError as e:
            if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
                # Under win32 a generic OSError can be thrown if the
                # handle cannot be retrieved
                self.skipTest("failed to query terminal size")
            raise
        self.assertEqual(expected, actual)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_does_not_crash(self):
        """Check if get_terminal_size() returns a meaningful value.

        There's no easy portable way to actually check the size of the
        terminal, so let's check if it returns something sensible instead.
        """
        try:
            size = os.get_terminal_size()
        except OSError as e:
            if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
                # Under win32 a generic OSError can be thrown if the
                # handle cannot be retrieved
                self.skipTest("failed to query terminal size")
            raise

        self.assertGreaterEqual(size.columns, 0)
        self.assertGreaterEqual(size.lines, 0)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_stty_match(self):
        """Check if stty returns the same results

        stty actually tests stdin, so get_terminal_size is invoked on
        stdin explicitly. If stty succeeded, then get_terminal_size()
        should work too.
        """
        try:
            size = subprocess.check_output(['stty', 'size']).decode().split()
        except (FileNotFoundError, subprocess.CalledProcessError):
            self.skipTest("stty invocation failed")
        expected = (int(size[1]), int(size[0])) # reversed order

        try:
            actual = os.get_terminal_size(sys.__stdin__.fileno())
        except OSError as e:
            if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
                # Under win32 a generic OSError can be thrown if the
                # handle cannot be retrieved
                self.skipTest("failed to query terminal size")
            raise
        self.assertEqual(expected, actual)
项目:tm-librarian    作者:FabricAttachedMemory    | 项目源码 | 文件源码
def setxattr(self, path, xattr, valbytes, flags, position=0):
        # flags from linux/xattr.h: XATTR_CREATE = 1, XATTR_REPLACE = 2
        if flags or position:
            raise TmfsOSError(errno.ENOSYS)     # haven't actually seen it yet

        # 'Extend' user.xxxx syntax and screen for it here
        elems = xattr.split('.')
        if elems[0] != 'user' or len(elems) < 2:
            raise TmfsOSError(errno.EINVAL)

        # Don't forget the setfattr command, and the shell it runs in, does
        # things to a "numeric" argument.  setfattr processes a leading
        # 0x and does a byte-by-byte conversion, yielding a byte array.
        # It needs pairs of digits and can be of arbitrary length.  Any
        # other argument ends up here as a pure string (well, byte array).
        try:
            value = valbytes.decode()
        except ValueError as e:
            # http://stackoverflow.com/questions/606191/convert-bytes-to-a-python-string
            value = valbytes.decode('cp437')

        rsp = self.librarian(
                self.lcp('set_xattr', path=path,
                         xattr=xattr, value=value))
        if rsp is not None:  # unexpected
            raise TmfsOSError(errno.ENOTTY)
项目:tm-librarian    作者:FabricAttachedMemory    | 项目源码 | 文件源码
def removexattr(self, path, xattr):
        rsp = self.librarian(
            self.lcp('remove_xattr', path=path, xattr=xattr))
        if rsp is not None:  # unexpected
            raise TmfsOSError(errno.ENOTTY)
项目:gcodeplot    作者:arpruss    | 项目源码 | 文件源码
def open(self):
        """\
        Open port with current settings. This may throw a SerialException
        if the port cannot be opened."""
        if self._port is None:
            raise SerialException("Port must be configured before it can be used.")
        if self.is_open:
            raise SerialException("Port is already open.")
        self.fd = None
        # open
        try:
            self.fd = os.open(self.portstr, os.O_RDWR | os.O_NOCTTY | os.O_NONBLOCK)
        except OSError as msg:
            self.fd = None
            raise SerialException(msg.errno, "could not open port {}: {}".format(self._port, msg))
        #~ fcntl.fcntl(self.fd, fcntl.F_SETFL, 0)  # set blocking

        try:
            self._reconfigure_port(force_update=True)
        except:
            try:
                os.close(self.fd)
            except:
                # ignore any exception when closing the port
                # also to keep original exception that happened when setting up
                pass
            self.fd = None
            raise
        else:
            self.is_open = True
        try:
            if not self._dsrdtr:
                self._update_dtr_state()
            if not self._rtscts:
                self._update_rts_state()
        except IOError as e:
            if e.errno in (errno.EINVAL, errno.ENOTTY):
                # ignore Invalid argument and Inappropriate ioctl
                pass
            else:
                raise
        self.reset_input_buffer()
        self.pipe_abort_read_r, self.pipe_abort_read_w = os.pipe()
        self.pipe_abort_write_r, self.pipe_abort_write_w = os.pipe()
        fcntl.fcntl(self.pipe_abort_read_r, fcntl.F_SETFL, os.O_NONBLOCK)
        fcntl.fcntl(self.pipe_abort_write_r, fcntl.F_SETFL, os.O_NONBLOCK)
项目:bitio    作者:whaleygeek    | 项目源码 | 文件源码
def open(self):
        """\
        Open port with current settings. This may throw a SerialException
        if the port cannot be opened."""
        if self._port is None:
            raise SerialException("Port must be configured before it can be used.")
        if self.is_open:
            raise SerialException("Port is already open.")
        self.fd = None
        # open
        try:
            self.fd = os.open(self.portstr, os.O_RDWR | os.O_NOCTTY | os.O_NONBLOCK)
        except OSError as msg:
            self.fd = None
            raise SerialException(msg.errno, "could not open port {}: {}".format(self._port, msg))
        #~ fcntl.fcntl(self.fd, fcntl.F_SETFL, 0)  # set blocking

        try:
            self._reconfigure_port(force_update=True)
        except:
            try:
                os.close(self.fd)
            except:
                # ignore any exception when closing the port
                # also to keep original exception that happened when setting up
                pass
            self.fd = None
            raise
        else:
            self.is_open = True
        try:
            if not self._dsrdtr:
                self._update_dtr_state()
            if not self._rtscts:
                self._update_rts_state()
        except IOError as e:
            if e.errno in (errno.EINVAL, errno.ENOTTY):
                # ignore Invalid argument and Inappropriate ioctl
                pass
            else:
                raise
        self.reset_input_buffer()
        self.pipe_abort_read_r, self.pipe_abort_read_w = os.pipe()
        self.pipe_abort_write_r, self.pipe_abort_write_w = os.pipe()
        fcntl.fcntl(self.pipe_abort_read_r, fcntl.F_SETFL, os.O_NONBLOCK)
        fcntl.fcntl(self.pipe_abort_write_r, fcntl.F_SETFL, os.O_NONBLOCK)
项目:ddt4all    作者:cedricp    | 项目源码 | 文件源码
def open(self):
        """\
        Open port with current settings. This may throw a SerialException
        if the port cannot be opened."""
        if self._port is None:
            raise SerialException("Port must be configured before it can be used.")
        if self.is_open:
            raise SerialException("Port is already open.")
        self.fd = None
        # open
        try:
            self.fd = os.open(self.portstr, os.O_RDWR | os.O_NOCTTY | os.O_NONBLOCK)
        except OSError as msg:
            self.fd = None
            raise SerialException(msg.errno, "could not open port {0}: {1}".format(self._port, msg))
        #~ fcntl.fcntl(self.fd, fcntl.F_SETFL, 0)  # set blocking

        try:
            self._reconfigure_port(force_update=True)
        except:
            try:
                os.close(self.fd)
            except:
                # ignore any exception when closing the port
                # also to keep original exception that happened when setting up
                pass
            self.fd = None
            raise
        else:
            self.is_open = True
        try:
            if not self._dsrdtr:
                self._update_dtr_state()
            if not self._rtscts:
                self._update_rts_state()
        except IOError as e:
            if e.errno in (errno.EINVAL, errno.ENOTTY):
                # ignore Invalid argument and Inappropriate ioctl
                pass
            else:
                raise
        self.reset_input_buffer()
        self.pipe_abort_read_r, self.pipe_abort_read_w = os.pipe()
        self.pipe_abort_write_r, self.pipe_abort_write_w = os.pipe()
        fcntl.fcntl(self.pipe_abort_read_r, fcntl.F_SETFL, os.O_NONBLOCK)
        fcntl.fcntl(self.pipe_abort_write_r, fcntl.F_SETFL, os.O_NONBLOCK)
项目:Jackal_Velodyne_Duke    作者:MengGuo    | 项目源码 | 文件源码
def open(self):
        """\
        Open port with current settings. This may throw a SerialException
        if the port cannot be opened."""
        if self._port is None:
            raise SerialException("Port must be configured before it can be used.")
        if self.is_open:
            raise SerialException("Port is already open.")
        self.fd = None
        # open
        try:
            self.fd = os.open(self.portstr, os.O_RDWR | os.O_NOCTTY | os.O_NONBLOCK)
        except OSError as msg:
            self.fd = None
            raise SerialException(msg.errno, "could not open port {}: {}".format(self._port, msg))
        #~ fcntl.fcntl(self.fd, fcntl.F_SETFL, 0)  # set blocking

        try:
            self._reconfigure_port(force_update=True)
        except:
            try:
                os.close(self.fd)
            except:
                # ignore any exception when closing the port
                # also to keep original exception that happened when setting up
                pass
            self.fd = None
            raise
        else:
            self.is_open = True
        try:
            if not self._dsrdtr:
                self._update_dtr_state()
            if not self._rtscts:
                self._update_rts_state()
        except IOError as e:
            if e.errno in (errno.EINVAL, errno.ENOTTY):
                # ignore Invalid argument and Inappropriate ioctl
                pass
            else:
                raise
        self.reset_input_buffer()
        self.pipe_abort_read_r, self.pipe_abort_read_w = os.pipe()
        self.pipe_abort_write_r, self.pipe_abort_write_w = os.pipe()
        fcntl.fcntl(self.pipe_abort_read_r, fcntl.F_SETFL, os.O_NONBLOCK)
        fcntl.fcntl(self.pipe_abort_write_r, fcntl.F_SETFL, os.O_NONBLOCK)
项目:Jackal_Velodyne_Duke    作者:MengGuo    | 项目源码 | 文件源码
def open(self):
        """\
        Open port with current settings. This may throw a SerialException
        if the port cannot be opened."""
        if self._port is None:
            raise SerialException("Port must be configured before it can be used.")
        if self.is_open:
            raise SerialException("Port is already open.")
        self.fd = None
        # open
        try:
            self.fd = os.open(self.portstr, os.O_RDWR | os.O_NOCTTY | os.O_NONBLOCK)
        except OSError as msg:
            self.fd = None
            raise SerialException(msg.errno, "could not open port {}: {}".format(self._port, msg))
        #~ fcntl.fcntl(self.fd, fcntl.F_SETFL, 0)  # set blocking

        try:
            self._reconfigure_port(force_update=True)
        except:
            try:
                os.close(self.fd)
            except:
                # ignore any exception when closing the port
                # also to keep original exception that happened when setting up
                pass
            self.fd = None
            raise
        else:
            self.is_open = True
        try:
            if not self._dsrdtr:
                self._update_dtr_state()
            if not self._rtscts:
                self._update_rts_state()
        except IOError as e:
            if e.errno in (errno.EINVAL, errno.ENOTTY):
                # ignore Invalid argument and Inappropriate ioctl
                pass
            else:
                raise
        self.reset_input_buffer()
        self.pipe_abort_read_r, self.pipe_abort_read_w = os.pipe()
        self.pipe_abort_write_r, self.pipe_abort_write_w = os.pipe()
        fcntl.fcntl(self.pipe_abort_read_r, fcntl.F_SETFL, os.O_NONBLOCK)
        fcntl.fcntl(self.pipe_abort_write_r, fcntl.F_SETFL, os.O_NONBLOCK)