我们从Python开源项目中，提取了以下50个代码示例，用于说明如何使用ctypes.get_errno()。
def watch(self, path):
    """Register a watch for the file or directory named ``path``.

    The path is first watched as a directory; when that attempt fails with
    ENOTDIR it is watched as a file instead. Paths that are already present
    in ``self.watches`` are left untouched.

    Raises an OSError if path does not exist.
    """
    path = realpath(path)
    with self.lock:
        if path in self.watches:
            return
        if isinstance(path, bytes):
            encoded = path
        else:
            encoded = path.encode(self.fenc)
        mask = self.MOVE_SELF | self.DELETE_SELF
        c_path = ctypes.c_char_p(encoded)
        # First attempt: treat path as a directory.
        descriptor = self._add_watch(
            self._inotify_fd, c_path, mask | self.ONLYDIR)
        if descriptor == -1:
            if ctypes.get_errno() != errno.ENOTDIR:
                self.handle_error()
            # Second attempt: treat path as a file.
            mask |= (self.MODIFY | self.ATTRIB)
            descriptor = self._add_watch(self._inotify_fd, c_path, mask)
            if descriptor == -1:
                self.handle_error()
        self.watches[path] = descriptor
        self.modified[path] = False
def add_watch(self, path):
    """Add a watch for the directory ``path``.

    Returns True once the watch descriptor has been recorded in
    ``self.watched_dirs`` / ``self.watched_rmap``, and False when the add
    call fails with ENOTDIR. Any other failure raises OSError.
    """
    if isinstance(path, bytes):
        encoded = path
    else:
        encoded = path.encode(self.fenc)
    # Ignore symlinks and watch only directories
    mask = (
        self.DONT_FOLLOW | self.ONLYDIR | self.MODIFY | self.CREATE |
        self.DELETE | self.MOVE_SELF | self.MOVED_FROM | self.MOVED_TO |
        self.ATTRIB | self.DELETE_SELF
    )
    descriptor = self._add_watch(
        self._inotify_fd, ctypes.c_char_p(encoded), mask)
    if descriptor != -1:
        self.watched_dirs[path] = descriptor
        self.watched_rmap[descriptor] = path
        return True
    code = ctypes.get_errno()
    if code == errno.ENOTDIR:
        return False
    raise OSError(code, 'Failed to add watch for: {0}: {1}'.format(path, self.os.strerror(code)))
def __init__(self, cloexec=True, nonblock=True):
    """Create the inotify descriptor and the state used to read from it.

    ``cloexec`` and ``nonblock`` select whether ``self.CLOEXEC`` and
    ``self.NONBLOCK`` are passed to the init call.

    Raises INotifyError if the init call returns -1.
    """
    (self._init1, self._add_watch,
     self._rm_watch, self._read) = load_inotify()
    init_flags = 0
    for enabled, flag_name in ((cloexec, 'CLOEXEC'), (nonblock, 'NONBLOCK')):
        if enabled:
            init_flags |= getattr(self, flag_name)
    self._inotify_fd = self._init1(init_flags)
    if self._inotify_fd == -1:
        raise INotifyError(os.strerror(ctypes.get_errno()))

    self._buf = ctypes.create_string_buffer(5000)
    self.fenc = get_preferred_file_name_encoding()
    self.hdr = struct.Struct(b'iIII')
    # We keep a reference to os to prevent it from being deleted
    # during interpreter shutdown, which would lead to errors in the
    # __del__ method
    self.os = os
def read(self, get_name=True):
    """Drain the inotify descriptor and dispatch every event found.

    Reading stops when the read call returns 0 or fails with EAGAIN, and is
    retried on EINTR; any other failure raises OSError. Each decoded event
    is passed to ``self.process_event(wd, mask, cookie, name)``, where
    ``name`` is None unless ``get_name`` is true.
    """
    chunks = []
    while True:
        count = self._read(self._inotify_fd, self._buf, len(self._buf))
        if count == 0:
            break
        if count > 0:
            chunks.append(self._buf.raw[:count])
            continue
        code = ctypes.get_errno()
        if code == errno.EAGAIN:
            break  # No more data
        if code == errno.EINTR:
            continue  # Interrupted, try again
        raise OSError(code, self.os.strerror(code))

    data = b''.join(chunks)
    total = len(data)
    offset = 0
    while total - offset >= self.hdr.size:
        wd, mask, cookie, name_len = self.hdr.unpack_from(data, offset)
        offset += self.hdr.size
        if get_name:
            name = data[offset:offset + name_len].rstrip(b'\0')
        else:
            name = None
        # The name bytes are skipped even when they are not extracted.
        offset += name_len
        self.process_event(wd, mask, cookie, name)
def inotify_init(flags=0):
    """Initialize a new inotify instance.

    :param ``INInitFlags`` flags:
        Optional flag to control the inotify_init behavior.
    :returns:
        The file descriptor returned by the underlying init call.
    :raises OSError:
        If the underlying call returns a negative value.
    """
    descriptor = _INOTIFY_INIT1(flags)
    if descriptor >= 0:
        return descriptor
    code = ctypes.get_errno()
    raise OSError(code, os.strerror(code), 'inotify_init1(%r)' % flags)


###############################################################################
# Constants copied from sys/inotify.h
#
# See man inotify(7) for more details.
#
def mount(source, target, fs_type, mnt_flags=()):
    """Mount ``source`` on ``target``.

    Uses filesystem type ``fs_type`` and mount flags ``mnt_flags``.

    NOTE: Mount data argument is not supported.

    :returns: the non-negative result of the underlying mount call.
    :raises OSError: if the underlying call returns a negative value.
    """
    status = _MOUNT(source, target, fs_type, mnt_flags, 0)
    if status >= 0:
        return status
    code = ctypes.get_errno()
    call_repr = 'mount(%r, %r, %r, %r)' % (source, target, fs_type, mnt_flags)
    raise OSError(code, os.strerror(code), call_repr)


# int umount(const char *target);
def eventfd(initval, flags):
    """Create a file descriptor for event notification.

    :raises ValueError: if ``initval`` is outside ``0 .. 2**64 - 1``.
    :raises OSError: if the underlying call returns a negative value.
    :returns: the new file descriptor.
    """
    largest = 2**64 - 1
    if initval > largest or initval < 0:
        raise ValueError('Invalid initval: %r' % initval)

    descriptor = _EVENTFD(initval, flags)
    if descriptor >= 0:
        return descriptor
    code = ctypes.get_errno()
    raise OSError(code, os.strerror(code),
                  'eventfd(%r, %r)' % (initval, flags))


###############################################################################
# Constants copied from sys/eventfd.h
#
# See man eventfd(2) for more details.
#
def _remove_watch_for_path(self, path):
    """Remove the watch on ``path`` and, recursively, on its subdirectories.

    Must be called with _inotify_fd_lock held.

    A failing inotify_rm_watch call is only logged. The bookkeeping entries
    for ``path`` (watch descriptor, reverse mapping and subdirectory set)
    are deleted after all recorded subdirectories have been processed.

    Raises KeyError if ``path`` has no recorded watch descriptor.
    """
    logging.debug('_remove_watch_for_path(%r)', path)
    wd = self._directory_to_watch_descriptor[path]

    if _libc.inotify_rm_watch(self._inotify_fd, wd) < 0:
        # If the directory is deleted then the watch will be removed
        # automatically and inotify_rm_watch will fail. Just log the error.
        # errno is read once and looked up with a fallback: the arguments
        # are evaluated before logging runs, so an errno value missing from
        # errno.errorcode (such as 0) would otherwise raise KeyError here
        # and abort the cleanup below.
        err = ctypes.get_errno()
        logging.debug('inotify_rm_watch failed for %r: %d [%r]',
                      path, err, errno.errorcode.get(err, 'unknown'))

    parent_path = os.path.dirname(path)
    if parent_path in self._directory_to_subdirs:
        self._directory_to_subdirs[parent_path].remove(path)

    # _directory_to_subdirs must be copied because it is mutated in the
    # recursive call.
    for subdir in frozenset(self._directory_to_subdirs[path]):
        self._remove_watch_for_path(subdir)

    del self._watch_to_directory[wd]
    del self._directory_to_watch_descriptor[path]
    del self._directory_to_subdirs[path]
def fallocate(self, mode, offset, length):
    """
    This is a Linux-specific sys call, unlike posix_fallocate()

    Allows the caller to directly manipulate the allocated disk space for
    the file for the byte range starting at offset and continuing for
    length bytes.

    :param mode: Operation to be performed on the given range
    :param offset: Starting offset
    :param length: Size in bytes, starting at offset
    :raises OSError: if glfs_fallocate returns a negative value
    """
    status = api.client.glfs_fallocate(self.fd, mode, offset, length)
    if status >= 0:
        return
    code = ctypes.get_errno()
    raise OSError(code, os.strerror(code))
def fgetxattr(self, key, size=0):
    """
    Retrieve the value of the extended attribute identified by key
    for the file.

    :param key: Key of extended attribute
    :param size: If size is specified as zero, we first determine the
                 size of xattr and then allocate a buffer accordingly.
                 If size is non-zero, it is assumed the caller knows
                 the size of xattr.
    :returns: Value of extended attribute corresponding to key specified.
    :raises OSError: if either glfs_fgetxattr call returns a negative value
    """
    if size == 0:
        # Passing no buffer asks for the size needed to hold the value.
        size = api.glfs_fgetxattr(self.fd, key, None, size)
        if size < 0:
            err = ctypes.get_errno()
            raise OSError(err, os.strerror(err))

    buf = ctypes.create_string_buffer(size)
    rc = api.glfs_fgetxattr(self.fd, key, buf, size)
    if rc < 0:
        err = ctypes.get_errno()
        raise OSError(err, os.strerror(err))
    # Slice the raw buffer: ``buf.value`` stops at the first NUL byte and
    # would silently truncate attribute values that contain one.
    return buf.raw[:rc]
def fsetxattr(self, key, value, flags=0):
    """
    Set extended attribute of file.

    :param key: The key of extended attribute.
    :param value: The value of extended attribute.
    :param flags: Possible values are 0 (default), 1 and 2
                  0: xattr will be created if it does not exist, or the
                     value will be replaced if the xattr exists.
                  1: Perform a pure create, which fails if the named
                     attribute already exists.
                  2: Perform a pure replace operation, which fails if the
                     named attribute does not already exist.
    :raises OSError: if glfs_fsetxattr returns a negative value
    """
    value_len = len(value)
    status = api.glfs_fsetxattr(self.fd, key, value, value_len, flags)
    if status >= 0:
        return
    code = ctypes.get_errno()
    raise OSError(code, os.strerror(code))
def lseek(self, pos, how):
    """
    Set the read/write offset position of this file.
    The new position is defined by 'pos' relative to 'how'

    :param pos: sets new offset position according to 'how'
    :param how: SEEK_SET, sets offset position 'pos' bytes relative to
                beginning of file, SEEK_CUR, the position is set relative
                to the current position and SEEK_END sets the position
                relative to the end of the file.
    :returns: the new offset position
    :raises OSError: if glfs_lseek returns a negative value
    """
    new_offset = api.glfs_lseek(self.fd, pos, how)
    if new_offset >= 0:
        return new_offset
    code = ctypes.get_errno()
    raise OSError(code, os.strerror(code))
def read(self, size=-1):
    """
    Read at most size bytes from the file.

    :param size: length of read buffer. If less than 0, the value
                 returned by self.fgetsize() is used instead.
                 Default is -1.
    :returns: the bytes that were read (at most 'size' of them), or None
              when glfs_read reports that 0 bytes were read
    :raises OSError: if glfs_read returns a negative value
    """
    if size < 0:
        size = self.fgetsize()
    rbuf = ctypes.create_string_buffer(size)
    ret = api.glfs_read(self.fd, rbuf, size, 0)
    if ret > 0:
        # In python 2.x, read() always returns a string. It's really upto
        # the consumer to decode this string into whatever encoding it was
        # written with.
        # The raw buffer is sliced because ``rbuf.value`` stops at the
        # first NUL byte, which would truncate binary file content.
        return rbuf.raw[:ret]
    elif ret < 0:
        err = ctypes.get_errno()
        raise OSError(err, os.strerror(err))
    # ret == 0: nothing was read; None is returned, as before.
    return None
def umount(self):
    """
    Unmount a mounted GlusterFS volume.

    Provides users a way to free resources instead of just waiting for
    python garbage collector to call __del__() at some point later.

    Does nothing when ``self.fs`` is falsy.

    :raises LibgfapiException: if glfs_fini returns a negative value
    """
    if not self.fs:
        return
    status = self._api.glfs_fini(self.fs)
    if status < 0:
        code = ctypes.get_errno()
        raise LibgfapiException("glfs_fini(%s) failed: %s" %
                                (self.fs, os.strerror(code)))
    # Succeeded. Protect against multiple umount() calls.
    self._mounted = False
    self.fs = None
def getxattr(self, path, key, size=0):
    """
    Retrieve the value of the extended attribute identified by key
    for path specified.

    :param path: Path to file or directory
    :param key: Key of extended attribute
    :param size: If size is specified as zero, we first determine the
                 size of xattr and then allocate a buffer accordingly.
                 If size is non-zero, it is assumed the caller knows
                 the size of xattr.
    :returns: Value of extended attribute corresponding to key specified.
    :raises OSError: if either glfs_getxattr call returns a negative value
    """
    if size == 0:
        # Passing no buffer asks for the size needed to hold the value.
        size = api.glfs_getxattr(self.fs, path, key, None, 0)
        if size < 0:
            err = ctypes.get_errno()
            raise OSError(err, os.strerror(err))

    buf = ctypes.create_string_buffer(size)
    rc = api.glfs_getxattr(self.fs, path, key, buf, size)
    if rc < 0:
        err = ctypes.get_errno()
        raise OSError(err, os.strerror(err))
    # Slice the raw buffer: ``buf.value`` stops at the first NUL byte and
    # would silently truncate attribute values that contain one.
    return buf.raw[:rc]
def setxattr(self, path, key, value, flags=0):
    """
    Set extended attribute of the path.

    :param path: Path to file or directory.
    :param key: The key of extended attribute.
    :param value: The value of extended attribute.
    :param flags: Possible values are 0 (default), 1 and 2
                  0: xattr will be created if it does not exist, or the
                     value will be replaced if the xattr exists.
                  1: Perform a pure create, which fails if the named
                     attribute already exists.
                  2: Perform a pure replace operation, which fails if the
                     named attribute does not already exist.
    :raises OSError: if glfs_setxattr returns a negative value
    """
    value_len = len(value)
    status = api.glfs_setxattr(self.fs, path, key, value, value_len, flags)
    if status >= 0:
        return
    code = ctypes.get_errno()
    raise OSError(code, os.strerror(code))
def ptrace(command, pid=0, arg1=0, arg2=0, check_errno=False):
    """Invoke ptrace and return its raw result.

    Two back ends are used depending on HAS_CPTRACE. In both, errno is
    reset before the call so that a value left over from an earlier call
    cannot be mistaken for a failure of this one.

    :param check_errno: when true, a result of -1 is only treated as an
        error if errno is non-zero afterwards (peek operations may
        legitimately return -1).
    :raises PtraceError: when the call fails.
    """
    if HAS_CPTRACE:
        try:
            set_errno(0)
            result = _ptrace(command, pid, arg1, arg2, check_errno)
        except ValueError as errobj:
            message = str(errobj)
            errno = get_errno()
            raise PtraceError(message, errno=errno, pid=pid)
    else:
        # Clear errno first: with check_errno the decision below relies on
        # errno being 0 when a peek operation legitimately returns -1.
        set_errno(0)
        result = _ptrace(command, pid, arg1, arg2)
        result_signed = c_long(result).value
        if result_signed == -1:
            errno = get_errno()
            # peek operations may return -1 with errno=0:
            # it's not an error. For other operations, -1
            # is always an error
            if not(check_errno) or errno:
                message = "ptrace(cmd=%s, pid=%s, %r, %r) error #%s: %s" % (
                    command, pid, arg1, arg2,
                    errno, strerror(errno))
                raise PtraceError(message, errno=errno, pid=pid)
    return result
def get_errno(self):
    """Return the value reported by ``ctypes.get_errno()``."""
    current = ctypes.get_errno()
    return current
def get_errno():
    """Placeholder that always raises NotImplementedError."""
    reason = 'Your python version does not support errno/last_error'
    raise NotImplementedError(reason)
def getPollFDList(self):
    """
    Return file descriptors to be used to poll USB events.
    You should not have to call this method, unless you are integrating
    this class with a polling mechanism.

    The result is a list of ``(fd, events)`` tuples.

    Raises OSError when libusb_get_pollfds returns nothing and errno is
    set, and NotImplementedError when it returns nothing and errno is 0.
    """
    import os

    pollfd_p_p = libusb1.libusb_get_pollfds(self.__context_p)
    if not pollfd_p_p:
        errno = get_errno()
        if errno:
            # Pass the message too: OSError built from a single argument
            # leaves its ``errno`` and ``strerror`` attributes unset.
            raise OSError(errno, os.strerror(errno))
        else:
            # Assume not implemented
            raise NotImplementedError(
                'Your libusb does not seem to implement pollable FDs')
    try:
        result = []
        append = result.append
        fd_index = 0
        # The loop stops at the first false entry of the returned array.
        while pollfd_p_p[fd_index]:
            append((
                pollfd_p_p[fd_index].contents.fd,
                pollfd_p_p[fd_index].contents.events,
            ))
            fd_index += 1
    finally:
        # The array is released whether or not reading it succeeded.
        _free(pollfd_p_p)
    return result
def monotonic():
    """Monotonic clock, cannot go backward.

    Returns the CLOCK_MONOTONIC reading in seconds as a float.
    Raises OSError if clock_gettime returns a non-zero value.
    """
    spec = timespec()
    failed = clock_gettime(CLOCK_MONOTONIC, ctypes.pointer(spec))
    if failed:
        code = ctypes.get_errno()
        raise OSError(code, os.strerror(code))
    seconds = spec.tv_sec
    nanoseconds = spec.tv_nsec
    return seconds + nanoseconds / 1.0e9


# Perform a sanity-check.
def handle_error(self):
    """Raise OSError describing the current ctypes errno.

    For ENOSPC the message is extended with a hint about the inotify
    limits. This method always raises.
    """
    eno = ctypes.get_errno()
    message = self.os.strerror(eno)
    if eno == errno.ENOSPC:
        # Separate the hint from the system message; previously the two
        # were concatenated without any space between them.
        message += ' You may need to increase the inotify limits on your system, via /proc/sys/fs/inotify/max_user_*'
    raise OSError(eno, message)
def errno():
    """Return the value reported by ``ctypes.get_errno()``."""
    current = ctypes.get_errno()
    return current
def mprotect_libc(addr, size, flags):
    """Call libc ``mprotect`` on the pages covering ``addr .. addr + size``.

    The start address is rounded down and the end address rounded up to a
    multiple of PAGE_SIZE before the call.

    :param addr: start address of the region (integer)
    :param size: length of the region in bytes
    :param flags: protection flags passed through to mprotect
    :raises OSError: if mprotect returns -1
    """
    # find_library() expects the name without the "lib" prefix.
    libc = ctypes.CDLL(ctypes.util.find_library('c'), use_errno=True)
    libc.mprotect.argtypes = [c_void_p, c_size_t, c_int]
    libc.mprotect.restype = c_int

    page_mask = ~(PAGE_SIZE - 1)
    addr_align = addr & page_mask
    mem_end = (addr + size) & page_mask
    if (addr + size) > mem_end:
        # The region ends inside a page: include that whole page.
        mem_end += PAGE_SIZE

    memlen = mem_end - addr_align
    ret = libc.mprotect(addr_align, memlen, flags)
    if ret == -1:
        e = ctypes.get_errno()
        # The previous code looked up ``errno.errorcodes``, which does not
        # exist, so this path raised AttributeError instead of OSError.
        raise OSError(e, os.strerror(e))
def posix_error(filename):
    """Build (without raising) an OSError for the current ctypes errno.

    The returned exception has its ``filename`` attribute set to
    ``filename``.
    """
    code = ctypes.get_errno()
    error = OSError(code, strerror(code))
    error.filename = filename
    return error
def umount(mount_point):
    """This function unmounts a mounted directory forcibly. This will
    be used for unmounting broken hard drive mounts which may hang.

    If umount returns EBUSY this will lazy unmount.

    :param mount_point: str. A String representing the filesystem mount point
    :returns: int. Returns 0 on success. errno otherwise.
    """
    import sys

    # Flag values for umount2(), from <sys/mount.h>.
    MNT_FORCE = 1
    MNT_DETACH = 2

    # The C function takes a ``const char *``; encode text paths so they
    # are not handed over as wide-character strings.
    if not isinstance(mount_point, bytes):
        mount_point = mount_point.encode(
            sys.getfilesystemencoding() or 'utf-8')

    libc_path = ctypes.util.find_library("c")
    libc = ctypes.CDLL(libc_path, use_errno=True)
    # umount() takes only the target, so flags passed to it are ignored;
    # umount2() is the variant that accepts flags.
    libc.umount2.argtypes = [ctypes.c_char_p, ctypes.c_int]
    libc.umount2.restype = ctypes.c_int

    # First try to umount with MNT_FORCE
    ret = libc.umount2(mount_point, MNT_FORCE)
    if ret < 0:
        err = ctypes.get_errno()
        if err == errno.EBUSY:
            # Detach from try. IE lazy umount
            ret = libc.umount2(mount_point, MNT_DETACH)
            if ret < 0:
                return ctypes.get_errno()
            return 0
        return err
    return 0
def sendfile(fdout, fdin, offset, nbytes):
    """Send ``nbytes`` bytes from ``fdin`` to ``fdout`` starting at ``offset``.

    The argument layout of the underlying ``_sendfile`` call depends on
    ``sys.platform``: one form for darwin, one for FreeBSD/DragonFly, and
    one for every other platform.

    :returns: the number of bytes sent. On darwin and FreeBSD/DragonFly an
        EAGAIN failure returns the byte count reported by the call
        instead of raising.
    :raises OSError: when the call fails for any other reason.
    """
    if sys.platform == 'darwin':
        _sendfile.argtypes = [ctypes.c_int, ctypes.c_int, ctypes.c_uint64,
                              ctypes.POINTER(ctypes.c_uint64), ctypes.c_voidp,
                              ctypes.c_int]
        _nbytes = ctypes.c_uint64(nbytes)
        result = _sendfile(fdin, fdout, offset, _nbytes, None, 0)
        if result == -1:
            e = ctypes.get_errno()
            # ``c_uint64.value`` is always an int, so no None check is
            # needed before returning it.
            if e == errno.EAGAIN:
                return _nbytes.value
            raise OSError(e, os.strerror(e))
        return _nbytes.value
    elif sys.platform.startswith(('freebsd', 'dragonfly')):
        # sys.platform may carry a version suffix (e.g. 'freebsd8'), so a
        # prefix match is required; an exact comparison never matches it.
        _sendfile.argtypes = [ctypes.c_int, ctypes.c_int, ctypes.c_uint64,
                              ctypes.c_uint64, ctypes.c_voidp,
                              ctypes.POINTER(ctypes.c_uint64), ctypes.c_int]
        _sbytes = ctypes.c_uint64()
        result = _sendfile(fdin, fdout, offset, nbytes, None, _sbytes, 0)
        if result == -1:
            e = ctypes.get_errno()
            if e == errno.EAGAIN:
                return _sbytes.value
            raise OSError(e, os.strerror(e))
        return _sbytes.value
    else:
        _sendfile.argtypes = [ctypes.c_int, ctypes.c_int,
                              ctypes.POINTER(ctypes.c_uint64), ctypes.c_size_t]
        _offset = ctypes.c_uint64(offset)
        sent = _sendfile(fdout, fdin, _offset, nbytes)
        if sent == -1:
            e = ctypes.get_errno()
            raise OSError(e, os.strerror(e))
        return sent
def get_errno(self):
    """
    Return the value produced by ``self._get_errno()``.

    Return None if no errno code is available.
    """
    code = self._get_errno()
    return code
def str_errno(self):
    """Return a human-readable description of ``self.get_errno()``.

    The text contains the ``os.strerror`` message and the symbolic name
    from ``errno.errorcode``; ``unknown`` is used as the name when the
    code has no entry there. When ``get_errno()`` returns None, a fixed
    "no errno support" message is returned.
    """
    code = self.get_errno()
    if code is None:
        return 'Errno: no errno support'
    # A plain index lookup raises KeyError for codes missing from the
    # table (0, for instance), which would hide the error being reported.
    symbol = errno.errorcode.get(code, 'unknown')
    return 'Errno=%s (%s)' % (os.strerror(code), symbol)
def init(self):
    """Load the C library and declare the inotify function prototypes.

    The library name looked up is ``inotify`` when ``sys.platform`` starts
    with ``freebsd`` and ``c`` otherwise. The loaded library is stored in
    ``self._libc`` and ``ctypes.get_errno`` in ``self._get_errno_func``.

    Returns False if the library lacks any of inotify_init,
    inotify_add_watch or inotify_rm_watch, True otherwise.
    """
    assert ctypes

    if sys.platform.startswith('freebsd'):
        candidate = 'inotify'
    else:
        candidate = 'c'

    try:
        resolved = ctypes.util.find_library(candidate)
    except (OSError, IOError):
        # Will attempt to load it with None anyway.
        resolved = None

    self._libc = ctypes.CDLL(resolved, use_errno=True)
    self._get_errno_func = ctypes.get_errno

    # Eventually check that libc has needed inotify bindings.
    for symbol in ('inotify_init', 'inotify_add_watch', 'inotify_rm_watch'):
        if not hasattr(self._libc, symbol):
            return False

    init_fn = self._libc.inotify_init
    init_fn.argtypes = []
    init_fn.restype = ctypes.c_int

    add_fn = self._libc.inotify_add_watch
    add_fn.argtypes = [ctypes.c_int, ctypes.c_char_p, ctypes.c_uint32]
    add_fn.restype = ctypes.c_int

    rm_fn = self._libc.inotify_rm_watch
    rm_fn.argtypes = [ctypes.c_int, ctypes.c_int]
    rm_fn.restype = ctypes.c_int
    return True
def _monotonic():  # noqa
    """Return the CLOCK_MONOTONIC reading in seconds as a float.

    Raises OSError if clock_gettime returns a non-zero value.
    """
    spec = timespec()
    status = clock_gettime(CLOCK_MONOTONIC, ctypes.pointer(spec))
    if status == 0:
        return spec.tv_sec + spec.tv_nsec * 1e-9
    code = ctypes.get_errno()
    raise OSError(code, os.strerror(code))
def die_if_parent_dies(signum=9):
    """Ask the kernel to send ``signum`` to this process when its parent dies.

    Only acts when ``sys.platform`` contains ``linux``; elsewhere it
    returns None without doing anything.

    :param signum: signal number passed to prctl(PR_SET_PDEATHSIG, ...)
    :returns: True when prctl succeeds, None otherwise. A prctl failure is
        logged, and any exception is written to stderr instead of being
        propagated.
    """
    if 'linux' not in sys.platform:
        return
    try:
        import ctypes
        libc = ctypes.CDLL('libc.so.6', use_errno=True)
        PR_SET_PDEATHSIG = 1
        result = libc.prctl(PR_SET_PDEATHSIG, signum)
        if result == 0:
            return True
        else:
            log('prctl failed: %s', os.strerror(ctypes.get_errno()))
    # ``except StandardError, ex`` is Python-2-only syntax and name; this
    # spelling is valid on Python 2.6+ and Python 3.
    except Exception as ex:
        sys.stderr.write(str(ex) + '\n')