我们从Python开源项目中,提取了以下23个代码示例,用于说明如何使用errno.ENODATA。
def _copyxattr(src, dst, *, follow_symlinks=True): """Copy extended filesystem attributes from `src` to `dst`. Overwrite existing attributes. If `follow_symlinks` is false, symlinks won't be followed. """ try: names = os.listxattr(src, follow_symlinks=follow_symlinks) except OSError as e: if e.errno not in (errno.ENOTSUP, errno.ENODATA): raise return for name in names: try: value = os.getxattr(src, name, follow_symlinks=follow_symlinks) os.setxattr(dst, name, value, follow_symlinks=follow_symlinks) except OSError as e: if e.errno not in (errno.EPERM, errno.ENOTSUP, errno.ENODATA): raise
def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs): fn = support.TESTFN open(fn, "wb").close() with self.assertRaises(OSError) as cm: getxattr(fn, s("user.test"), **kwargs) self.assertEqual(cm.exception.errno, errno.ENODATA) init_xattr = listxattr(fn) self.assertIsInstance(init_xattr, list) setxattr(fn, s("user.test"), b"", **kwargs) xattr = set(init_xattr) xattr.add("user.test") self.assertEqual(set(listxattr(fn)), xattr) self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"") setxattr(fn, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs) self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"hello") with self.assertRaises(OSError) as cm: setxattr(fn, s("user.test"), b"bye", os.XATTR_CREATE, **kwargs) self.assertEqual(cm.exception.errno, errno.EEXIST) with self.assertRaises(OSError) as cm: setxattr(fn, s("user.test2"), b"bye", os.XATTR_REPLACE, **kwargs) self.assertEqual(cm.exception.errno, errno.ENODATA) setxattr(fn, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs) xattr.add("user.test2") self.assertEqual(set(listxattr(fn)), xattr) removexattr(fn, s("user.test"), **kwargs) with self.assertRaises(OSError) as cm: getxattr(fn, s("user.test"), **kwargs) self.assertEqual(cm.exception.errno, errno.ENODATA) xattr.remove("user.test") self.assertEqual(set(listxattr(fn)), xattr) self.assertEqual(getxattr(fn, s("user.test2"), **kwargs), b"foo") setxattr(fn, s("user.test"), b"a"*1024, **kwargs) self.assertEqual(getxattr(fn, s("user.test"), **kwargs), b"a"*1024) removexattr(fn, s("user.test"), **kwargs) many = sorted("user.test{}".format(i) for i in range(100)) for thing in many: setxattr(fn, thing, b"x", **kwargs) self.assertEqual(set(listxattr(fn)), set(init_xattr) | set(many))
def read_from_rp(self, rp): """Set the extended attributes from an rpath""" try: attr_list = rp.conn.xattr.listxattr(encode(rp.path), rp.issym()) except IOError, exc: if exc[0] in (errno.EOPNOTSUPP, errno.EPERM, errno.ETXTBSY): return # if not supported, consider empty if exc[0] in (errno.EACCES, errno.ENOENT, errno.ELOOP): log.Log("Warning: listattr(%s): %s" % (repr(rp.path), exc), 4) return raise for attr in attr_list: if attr.startswith('system.'): # Do not preserve system extended attributes continue if not rp.isdir() and attr == 'com.apple.ResourceFork': # Resource Fork handled elsewhere, except for directories continue try: self.attr_dict[attr] = \ rp.conn.xattr.getxattr(encode(rp.path), attr, rp.issym()) except IOError, exc: # File probably modified while reading, just continue if exc[0] == errno.ENODATA: continue elif exc[0] == errno.ENOENT: break # Handle bug in pyxattr < 0.2.2 elif exc[0] == errno.ERANGE: continue else: raise
def get_gfid(path): try: return uuid.UUID(bytes=xattr.get(path, "trusted.gfid", nofollow=True)) except (IOError, OSError) as e: if e.errno == ENODATA: return uuid.UUID(bytes=xattr.get(path, "glusterfs.gfid", nofollow=True)) else: raise
def get_xattrs_as_blob(fs, filename): # pragma: no cover tracing.trace('filename=%s' % filename) try: names = fs.llistxattr(filename) except (OSError, IOError), e: if e.errno in (errno.EOPNOTSUPP, errno.EACCES): return None raise tracing.trace('names=%s' % repr(names)) if not names: return None values = [] for name in names[:]: tracing.trace('trying name %s' % repr(name)) try: value = fs.lgetxattr(filename, name) except OSError, e: # On btrfs, at least, this can happen: the filesystem returns # a list of attribute names, but then fails when looking up # the value for one or more of the names. We pretend that the # name was never returned in that case. # # Obviously this can happen due to race conditions as well. if e.errno == errno.ENODATA: names.remove(name) logging.warning( '%s has extended attribute named %s without value, ' 'ignoring attribute', filename, name) else: raise else: tracing.trace('lgetxattr(%s)=%s' % (name, value)) values.append(value) assert len(names) == len(values) name_blob = ''.join('%s\0' % name for name in names) lengths = [len(v) for v in values] fmt = '!' + 'Q' * len(values) value_blob = struct.pack(fmt, *lengths) + ''.join(values) return ('%s%s%s' % (struct.pack('!Q', len(name_blob)), name_blob, value_blob))
def run(args): # Volmark from Master side fmt_string = "!" + "B" * 19 + "II" try: vm = struct.unpack(fmt_string, xattr.get( args.path, "trusted.glusterfs.volume-mark", nofollow=True)) print "UUID : %s" % uuid.UUID( "".join(['%02x' % x for x in vm[2:18]])) print "VERSION : %s.%s" % vm[0:2] print "RETVAL : %s" % vm[18] print "VOLUME MARK : %s.%s (%s)" % ( vm[19], vm[20], human_time("%s.%s" % (vm[19], vm[20]))) except (OSError, IOError) as e: if e.errno == ENODATA: pass else: print "[Error %s] %s" % (e.errno, os.strerror(e.errno)) sys.exit(-1) # Volmark from slave side all_xattrs = xattr.list(args.path) fmt_string = "!" + "B" * 19 + "II" + "I" volmark_xattrs = [] for x in all_xattrs: if x.startswith("trusted.glusterfs.volume-mark."): volmark_xattrs.append(x) for vx in volmark_xattrs: try: vm = struct.unpack(fmt_string, xattr.get( args.path, vx)) print "UUID : %s" % uuid.UUID( "".join(['%02x' % x for x in vm[2:18]])) print "VERSION : %s.%s" % vm[0:2] print "RETVAL : %s" % vm[18] print "VOLUME MARK : %s.%s (%s)" % ( vm[19], vm[20], human_time("%s.%s" % (vm[19], vm[20]))) print "TIMEOUT : %s (%s)" % (vm[-1], human_time(vm[-1])) except (OSError, IOError) as e: if e.errno == ENODATA: pass else: print "[Error %s] %s" % (e.errno, os.strerror(e.errno)) sys.exit(-1)
def getxattr(self, path, xattr, position=0): """Called with a specific namespace.name xattr. Can return either a bytes array OR an int.""" if position: raise TmfsOSError(errno.ENOSYS) # never saw this in 8 months rsp = self.librarian(self.lcp('get_shelf', path=path)) shelf = TMShelf(rsp) # Does this also need changed to support path instead of name? # Piggy back for queries by kernel (globals & fault handling). if xattr.startswith('_obtain_'): # this will need some work data = self.shadow.getxattr(shelf, xattr) try: return bytes(data.encode()) except AttributeError as e: # probably the "encode()" self._ret_is_string = False return bytes(data) # "ls" starts with simple getattr but then comes here for # security.selinux, system.posix_acl_access, and posix_acl_default. # ls -l can also do the same thing on '/'. Save the round trips. # if xattr.startswith('security.') or not shelf_name: # path == '/' if xattr.startswith('security.'): # path == '/' is legal now return bytes(0) try: rsp = self.librarian( self.lcp('get_xattr', path=path, xattr=xattr)) value = rsp['value'] assert value is not None # 'No such attribute' if isinstance(value, int): return value elif isinstance(value, str): # http://stackoverflow.com/questions/606191/convert-bytes-to-a-python-string return bytes(value.encode('cp437')) else: bytes(value.encode()) except Exception as e: raise TmfsOSError(errno.ENODATA) # syn for ENOATTR