Python errno 模块,EFAULT 实例源码

我们从Python开源项目中,提取了以下13个代码示例,用于说明如何使用errno.EFAULT

项目:annotated-py-asyncio    作者:hhstore    | 项目源码 | 文件源码
def test_add_signal_handler_install_error(self, m_signal):
        m_signal.NSIG = signal.NSIG

        def set_wakeup_fd(fd):
            if fd == -1:
                raise ValueError()
        m_signal.set_wakeup_fd = set_wakeup_fd

        class Err(OSError):
            errno = errno.EFAULT
        m_signal.signal.side_effect = Err

        self.assertRaises(
            Err,
            self.loop.add_signal_handler,
            signal.SIGINT, lambda: True)
项目:nfcpy    作者:nfcpy    | 项目源码 | 文件源码
def test_bind_by_addr(self, llc, raw, ldl, dlc):
            llc.bind(raw, 16)
            assert llc.getsockname(raw) == 16
            for i, sock in enumerate([ldl, dlc]):
                with pytest.raises(nfc.llcp.Error) as excinfo:
                    llc.bind(sock, 16)
                assert excinfo.value.errno == errno.EACCES
            llc.bind(ldl, 63)
            assert llc.getsockname(ldl) == 63
            with pytest.raises(nfc.llcp.Error) as excinfo:
                llc.bind(dlc, 63)
            assert excinfo.value.errno == errno.EADDRINUSE
            with pytest.raises(nfc.llcp.Error) as excinfo:
                llc.bind(dlc, 64)
            assert excinfo.value.errno == errno.EFAULT
            with pytest.raises(nfc.llcp.Error) as excinfo:
                llc.bind(dlc, -1)
            assert excinfo.value.errno == errno.EFAULT
            llc.bind(dlc, 62)
            assert llc.getsockname(dlc) == 62
项目:nfcpy    作者:nfcpy    | 项目源码 | 文件源码
def test_bind_by_name(self, llc, raw, ldl, dlc):
            with pytest.raises(nfc.llcp.Error) as excinfo:
                llc.bind(dlc, 'urn:nfc:snep')
            assert excinfo.value.errno == errno.EFAULT
            llc.bind(dlc, 'urn:nfc:sn:snep')
            assert llc.getsockname(dlc) == 4
            with pytest.raises(nfc.llcp.Error) as excinfo:
                llc.bind(ldl, 'urn:nfc:sn:snep')
            assert excinfo.value.errno == errno.EADDRINUSE
            llc.bind(ldl, 'urn:nfc:xsn:nfcpy.org:service')
            assert llc.getsockname(ldl) == 16
            for sap in range(17, 32):
                sock = llc.socket(nfc.llcp.llc.RAW_ACCESS_POINT)
                llc.bind(sock, 'urn:nfc:sn:use_sap-{}'.format(sap))
                assert llc.getsockname(sock) == sap
            with pytest.raises(nfc.llcp.Error) as excinfo:
                llc.bind(raw, 'urn:nfc:sn:sap-32')
            assert excinfo.value.errno == errno.EADDRNOTAVAIL
项目:nfcpy    作者:nfcpy    | 项目源码 | 文件源码
def _bind_by_name(self, socket, name):
        if not service_name_format.match(name):
            raise err.Error(errno.EFAULT)

        with self.lock:
            if self.snl.get(name) is not None:
                raise err.Error(errno.EADDRINUSE)
            addr = wks_map.get(name)
            if addr is None:
                try:
                    addr = 16 + self.sap[16:32].index(None)
                except ValueError:
                    raise err.Error(errno.EADDRNOTAVAIL)
            socket.bind(addr)
            self.sap[addr] = ServiceAccessPoint(addr, self)
            self.sap[addr].insert_socket(socket)
            self.snl[name] = addr
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_add_signal_handler_install_error(self, m_signal):
        m_signal.NSIG = signal.NSIG

        def set_wakeup_fd(fd):
            if fd == -1:
                raise ValueError()
        m_signal.set_wakeup_fd = set_wakeup_fd

        class Err(OSError):
            errno = errno.EFAULT
        m_signal.signal.side_effect = Err

        self.assertRaises(
            Err,
            self.loop.add_signal_handler,
            signal.SIGINT, lambda: True)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_add_signal_handler_install_error(self, m_signal):
        m_signal.NSIG = signal.NSIG

        def set_wakeup_fd(fd):
            if fd == -1:
                raise ValueError()
        m_signal.set_wakeup_fd = set_wakeup_fd

        class Err(OSError):
            errno = errno.EFAULT
        m_signal.signal.side_effect = Err

        self.assertRaises(
            Err,
            self.loop.add_signal_handler,
            signal.SIGINT, lambda: True)
项目:manticore    作者:trailofbits    | 项目源码 | 文件源码
def sys_read(self, fd, buf, count):
        data = ''
        if count != 0:
            # TODO check count bytes from buf
            if not buf in self.current.memory: # or not  self.current.memory.isValid(buf+count):
                logger.info("READ: buf points to invalid address. Returning EFAULT")
                return -errno.EFAULT

            try:
                # Read the data and put in tin memory
                data = self._get_fd(fd).read(count)
            except BadFd:
                logger.info(("READ: Not valid file descriptor on read."
                             " Returning EBADF"))
                return -errno.EBADF
            self.syscall_trace.append(("_read", fd, data))
            self.current.write_bytes(buf, data)

        return len(data)
项目:manticore    作者:trailofbits    | 项目源码 | 文件源码
def sys_readlink(self, path, buf, bufsize):
        '''
        Read
        :rtype: int

        :param path: the "link path id"
        :param buf: the buffer where the bytes will be putted.
        :param bufsize: the max size for read the link.
        :todo: Out eax number of bytes actually sent | EAGAIN | EBADF | EFAULT | EINTR | errno.EINVAL | EIO | ENOSPC | EPIPE
        '''
        if bufsize <= 0:
            return -errno.EINVAL
        filename = self.current.read_string(path)
        if filename == '/proc/self/exe':
            data = os.path.abspath(self.program)
        else:
            data = os.readlink(filename)[:bufsize]
        self.current.write_bytes(buf, data)
        return len(data)
项目:nfcpy    作者:nfcpy    | 项目源码 | 文件源码
def bind(self, socket, addr_or_name=None):
        if not isinstance(socket, tco.TransmissionControlObject):
            raise err.Error(errno.ENOTSOCK)
        if socket.addr is not None:
            raise err.Error(errno.EINVAL)
        if addr_or_name is None:
            self._bind_by_none(socket)
        elif isinstance(addr_or_name, int):
            self._bind_by_addr(socket, addr_or_name)
        elif isinstance(addr_or_name, bytes):
            self._bind_by_name(socket, addr_or_name)
        else:
            raise err.Error(errno.EFAULT)
项目:nfcpy    作者:nfcpy    | 项目源码 | 文件源码
def _bind_by_addr(self, socket, addr):
        if addr < 0 or addr > 63:
            raise err.Error(errno.EFAULT)
        with self.lock:
            if addr in range(32, 64) or isinstance(socket, tco.RawAccessPoint):
                if self.sap[addr] is None:
                    socket.bind(addr)
                    self.sap[addr] = ServiceAccessPoint(addr, self)
                    self.sap[addr].insert_socket(socket)
                else:
                    raise err.Error(errno.EADDRINUSE)
            else:
                raise err.Error(errno.EACCES)
项目:manticore    作者:trailofbits    | 项目源码 | 文件源码
def sys_getcwd(self, buf, size):
        '''
        getcwd - Get the current working directory
        :param int buf: Pointer to dest array
        :param size: size in bytes of the array pointed to by the buf 
        :return: buf (Success), or 0
        '''

        try:
            current_dir = os.getcwd()          
            length = len(current_dir) + 1

            if size > 0 and size < length:
                logger.info("GETCWD: size is greater than 0, but is smaller than the length"  
                            "of the path + 1. Returning ERANGE")
                return -errno.ERANGE

            if not self.current.memory.access_ok(slice(buf, buf+length), 'w'):
                logger.info("GETCWD: buf within invalid memory. Returning EFAULT")
                return -errno.EFAULT

            self.current.write_string(buf, current_dir)
            logger.debug("getcwd(0x%08x, %u) -> <%s> (Size %d)", buf, size, current_dir, length)
            return length

        except OSError as e:
            return -e.errno
项目:trio    作者:python-trio    | 项目源码 | 文件源码
def test_SocketListener_accept_errors():
    class FakeSocket(tsocket.SocketType):
        def __init__(self, events):
            self._events = iter(events)

        type = tsocket.SOCK_STREAM

        # Fool the check for SO_ACCEPTCONN in SocketListener.__init__
        def getsockopt(self, level, opt):
            return True

        def setsockopt(self, level, opt, value):
            pass

        # Fool the check for connection in SocketStream.__init__
        def getpeername(self):
            pass

        async def accept(self):
            await _core.checkpoint()
            event = next(self._events)
            if isinstance(event, BaseException):
                raise event
            else:
                return event, None

    fake_server_sock = FakeSocket([])

    fake_listen_sock = FakeSocket(
        [
            OSError(errno.ECONNABORTED, "Connection aborted"),
            OSError(errno.EPERM, "Permission denied"),
            OSError(errno.EPROTO, "Bad protocol"),
            fake_server_sock,
            OSError(errno.EMFILE, "Out of file descriptors"),
            OSError(errno.EFAULT, "attempt to write to read-only memory"),
            OSError(errno.ENOBUFS, "out of buffers"),
            fake_server_sock,
        ]
    )

    l = SocketListener(fake_listen_sock)

    with assert_checkpoints():
        s = await l.accept()
        assert s.socket is fake_server_sock

    for code in [errno.EMFILE, errno.EFAULT, errno.ENOBUFS]:
        with assert_checkpoints():
            with pytest.raises(OSError) as excinfo:
                await l.accept()
            assert excinfo.value.errno == code

    with assert_checkpoints():
        s = await l.accept()
        assert s.socket is fake_server_sock
项目:manticore    作者:trailofbits    | 项目源码 | 文件源码
def sys_write(self, fd, buf, count):
        ''' write - send bytes through a file descriptor
          The write system call writes up to count bytes from the buffer pointed
          to by buf to the file descriptor fd. If count is zero, write returns 0
          and optionally sets *tx_bytes to zero.

          :param fd            a valid file descriptor
          :param buf           a memory buffer
          :param count         number of bytes to send
          :return: 0          Success
                    EBADF      fd is not a valid file descriptor or is not open.
                    EFAULT     buf or tx_bytes points to an invalid address.
        '''
        data = []
        cpu = self.current
        if count != 0:
            try:
                write_fd = self._get_fd(fd)
            except BadFd:
                logger.error("WRITE: Not valid file descriptor. Returning EBADFD %d", fd)
                return -errno.EBADF

            # TODO check count bytes from buf
            if buf not in cpu.memory or buf + count not in cpu.memory:
                logger.debug("WRITE: buf points to invalid address. Returning EFAULT")
                return -errno.EFAULT

            if fd > 2 and write_fd.is_full():
                cpu.PC -= cpu.instruction.size
                self.wait([], [fd], None)
                raise RestartSyscall()

            data = cpu.read_bytes(buf, count)
            data = self._transform_write_data(data)
            write_fd.write(data)

            for line in ''.join([str(x) for x in data]).split('\n'):
                logger.debug("WRITE(%d, 0x%08x, %d) -> <%.48r>",
                             fd,
                             buf,
                             count,
                             line)
            self.syscall_trace.append(("_write", fd, data))
            self.signal_transmit(fd)

        return len(data)