我们从Python开源项目中,提取了以下13个代码示例,用于说明如何使用errno.EFAULT。
def test_add_signal_handler_install_error(self, m_signal): m_signal.NSIG = signal.NSIG def set_wakeup_fd(fd): if fd == -1: raise ValueError() m_signal.set_wakeup_fd = set_wakeup_fd class Err(OSError): errno = errno.EFAULT m_signal.signal.side_effect = Err self.assertRaises( Err, self.loop.add_signal_handler, signal.SIGINT, lambda: True)
def test_bind_by_addr(self, llc, raw, ldl, dlc): llc.bind(raw, 16) assert llc.getsockname(raw) == 16 for i, sock in enumerate([ldl, dlc]): with pytest.raises(nfc.llcp.Error) as excinfo: llc.bind(sock, 16) assert excinfo.value.errno == errno.EACCES llc.bind(ldl, 63) assert llc.getsockname(ldl) == 63 with pytest.raises(nfc.llcp.Error) as excinfo: llc.bind(dlc, 63) assert excinfo.value.errno == errno.EADDRINUSE with pytest.raises(nfc.llcp.Error) as excinfo: llc.bind(dlc, 64) assert excinfo.value.errno == errno.EFAULT with pytest.raises(nfc.llcp.Error) as excinfo: llc.bind(dlc, -1) assert excinfo.value.errno == errno.EFAULT llc.bind(dlc, 62) assert llc.getsockname(dlc) == 62
def test_bind_by_name(self, llc, raw, ldl, dlc): with pytest.raises(nfc.llcp.Error) as excinfo: llc.bind(dlc, 'urn:nfc:snep') assert excinfo.value.errno == errno.EFAULT llc.bind(dlc, 'urn:nfc:sn:snep') assert llc.getsockname(dlc) == 4 with pytest.raises(nfc.llcp.Error) as excinfo: llc.bind(ldl, 'urn:nfc:sn:snep') assert excinfo.value.errno == errno.EADDRINUSE llc.bind(ldl, 'urn:nfc:xsn:nfcpy.org:service') assert llc.getsockname(ldl) == 16 for sap in range(17, 32): sock = llc.socket(nfc.llcp.llc.RAW_ACCESS_POINT) llc.bind(sock, 'urn:nfc:sn:use_sap-{}'.format(sap)) assert llc.getsockname(sock) == sap with pytest.raises(nfc.llcp.Error) as excinfo: llc.bind(raw, 'urn:nfc:sn:sap-32') assert excinfo.value.errno == errno.EADDRNOTAVAIL
def _bind_by_name(self, socket, name): if not service_name_format.match(name): raise err.Error(errno.EFAULT) with self.lock: if self.snl.get(name) is not None: raise err.Error(errno.EADDRINUSE) addr = wks_map.get(name) if addr is None: try: addr = 16 + self.sap[16:32].index(None) except ValueError: raise err.Error(errno.EADDRNOTAVAIL) socket.bind(addr) self.sap[addr] = ServiceAccessPoint(addr, self) self.sap[addr].insert_socket(socket) self.snl[name] = addr
def sys_read(self, fd, buf, count): data = '' if count != 0: # TODO check count bytes from buf if not buf in self.current.memory: # or not self.current.memory.isValid(buf+count): logger.info("READ: buf points to invalid address. Returning EFAULT") return -errno.EFAULT try: # Read the data and put in tin memory data = self._get_fd(fd).read(count) except BadFd: logger.info(("READ: Not valid file descriptor on read." " Returning EBADF")) return -errno.EBADF self.syscall_trace.append(("_read", fd, data)) self.current.write_bytes(buf, data) return len(data)
def sys_readlink(self, path, buf, bufsize): ''' Read :rtype: int :param path: the "link path id" :param buf: the buffer where the bytes will be putted. :param bufsize: the max size for read the link. :todo: Out eax number of bytes actually sent | EAGAIN | EBADF | EFAULT | EINTR | errno.EINVAL | EIO | ENOSPC | EPIPE ''' if bufsize <= 0: return -errno.EINVAL filename = self.current.read_string(path) if filename == '/proc/self/exe': data = os.path.abspath(self.program) else: data = os.readlink(filename)[:bufsize] self.current.write_bytes(buf, data) return len(data)
def bind(self, socket, addr_or_name=None): if not isinstance(socket, tco.TransmissionControlObject): raise err.Error(errno.ENOTSOCK) if socket.addr is not None: raise err.Error(errno.EINVAL) if addr_or_name is None: self._bind_by_none(socket) elif isinstance(addr_or_name, int): self._bind_by_addr(socket, addr_or_name) elif isinstance(addr_or_name, bytes): self._bind_by_name(socket, addr_or_name) else: raise err.Error(errno.EFAULT)
def _bind_by_addr(self, socket, addr): if addr < 0 or addr > 63: raise err.Error(errno.EFAULT) with self.lock: if addr in range(32, 64) or isinstance(socket, tco.RawAccessPoint): if self.sap[addr] is None: socket.bind(addr) self.sap[addr] = ServiceAccessPoint(addr, self) self.sap[addr].insert_socket(socket) else: raise err.Error(errno.EADDRINUSE) else: raise err.Error(errno.EACCES)
def sys_getcwd(self, buf, size): ''' getcwd - Get the current working directory :param int buf: Pointer to dest array :param size: size in bytes of the array pointed to by the buf :return: buf (Success), or 0 ''' try: current_dir = os.getcwd() length = len(current_dir) + 1 if size > 0 and size < length: logger.info("GETCWD: size is greater than 0, but is smaller than the length" "of the path + 1. Returning ERANGE") return -errno.ERANGE if not self.current.memory.access_ok(slice(buf, buf+length), 'w'): logger.info("GETCWD: buf within invalid memory. Returning EFAULT") return -errno.EFAULT self.current.write_string(buf, current_dir) logger.debug("getcwd(0x%08x, %u) -> <%s> (Size %d)", buf, size, current_dir, length) return length except OSError as e: return -e.errno
def test_SocketListener_accept_errors(): class FakeSocket(tsocket.SocketType): def __init__(self, events): self._events = iter(events) type = tsocket.SOCK_STREAM # Fool the check for SO_ACCEPTCONN in SocketListener.__init__ def getsockopt(self, level, opt): return True def setsockopt(self, level, opt, value): pass # Fool the check for connection in SocketStream.__init__ def getpeername(self): pass async def accept(self): await _core.checkpoint() event = next(self._events) if isinstance(event, BaseException): raise event else: return event, None fake_server_sock = FakeSocket([]) fake_listen_sock = FakeSocket( [ OSError(errno.ECONNABORTED, "Connection aborted"), OSError(errno.EPERM, "Permission denied"), OSError(errno.EPROTO, "Bad protocol"), fake_server_sock, OSError(errno.EMFILE, "Out of file descriptors"), OSError(errno.EFAULT, "attempt to write to read-only memory"), OSError(errno.ENOBUFS, "out of buffers"), fake_server_sock, ] ) l = SocketListener(fake_listen_sock) with assert_checkpoints(): s = await l.accept() assert s.socket is fake_server_sock for code in [errno.EMFILE, errno.EFAULT, errno.ENOBUFS]: with assert_checkpoints(): with pytest.raises(OSError) as excinfo: await l.accept() assert excinfo.value.errno == code with assert_checkpoints(): s = await l.accept() assert s.socket is fake_server_sock
def sys_write(self, fd, buf, count): ''' write - send bytes through a file descriptor The write system call writes up to count bytes from the buffer pointed to by buf to the file descriptor fd. If count is zero, write returns 0 and optionally sets *tx_bytes to zero. :param fd a valid file descriptor :param buf a memory buffer :param count number of bytes to send :return: 0 Success EBADF fd is not a valid file descriptor or is not open. EFAULT buf or tx_bytes points to an invalid address. ''' data = [] cpu = self.current if count != 0: try: write_fd = self._get_fd(fd) except BadFd: logger.error("WRITE: Not valid file descriptor. Returning EBADFD %d", fd) return -errno.EBADF # TODO check count bytes from buf if buf not in cpu.memory or buf + count not in cpu.memory: logger.debug("WRITE: buf points to invalid address. Returning EFAULT") return -errno.EFAULT if fd > 2 and write_fd.is_full(): cpu.PC -= cpu.instruction.size self.wait([], [fd], None) raise RestartSyscall() data = cpu.read_bytes(buf, count) data = self._transform_write_data(data) write_fd.write(data) for line in ''.join([str(x) for x in data]).split('\n'): logger.debug("WRITE(%d, 0x%08x, %d) -> <%.48r>", fd, buf, count, line) self.syscall_trace.append(("_write", fd, data)) self.signal_transmit(fd) return len(data)