我们从Python开源项目中,提取了以下28个代码示例,用于说明如何使用stat.S_IFLNK。
def _write_symlink(self, zf, link_target, link_path): """Package symlinks with appropriate zipfile metadata.""" info = zipfile.ZipInfo() info.filename = link_path info.create_system = 3 # Magic code for symlinks / py2/3 compat # 27166663808 = (stat.S_IFLNK | 0755) << 16 info.external_attr = 2716663808 zf.writestr(info, link_target)
def symlink(self, parent_inode, name, target, ctx):
    """Create the symbolic link ``name`` inside ``parent_inode``.

    The link is created as an ``S_IFLNK | 0o777`` entry and ``target`` is
    written into it at offset 0.  Returns the second element of the pair
    produced by ``self.create``.
    """
    assert self._initialized

    may_modify_parent = self.access(parent_inode, WX_OK, ctx)
    assert_or_errno(may_modify_parent, EPERM, 'no wx perm to parent inode')

    existing = self._lookup(parent_inode, name, ctx, noref=True)
    assert_or_errno(not existing, EEXIST, 'link target exists')

    # Best-effort removal; only a "no such entry" failure is ignored.
    try:
        self.unlink(parent_inode, name, ctx)
    except llfuse.FUSEError as exc:
        if exc.errno != ENOENT:
            raise

    link_mode = stat.S_IFLNK | 0o777
    handle, attrs = self.create(parent_inode, name, link_mode,
                                os.O_WRONLY, ctx)
    try:
        self.write(handle, 0, target)
    finally:
        # The handle is released whether or not the write succeeded.
        self.release(handle)
    return attrs
def verify(self, img, **args):
    """Check the installed link against this action's ``target`` attribute.

    Returns a tuple of lists of the form (errors, warnings, info).  The
    error list will be empty if the action has been correctly installed
    in the given image.
    """
    target = self.attrs["target"]
    path = self.get_installed_path(img.get_root())

    common = self.verify_fsobj_common(img, stat.S_IFLNK)
    lstat, errors, warnings, info, abort = common
    outcome = (errors, warnings, info)

    if abort:
        # The common check already failed; it must have recorded why.
        assert errors
        return outcome

    atarget = os.readlink(path)
    if target != atarget:
        message = _("Target: '{found}' should be "
                    "'{expected}'").format(found=atarget, expected=target)
        errors.append(message)
    return outcome
def cleanup_mode(mode):
    """Cleanup a mode value.

    This will return a mode that can be stored in a tree object.

    Symlinks, directories and gitlinks collapse to their bare type constant.
    Everything else becomes a regular file with permissions 0o644, or 0o755
    when the owner-execute bit is set, so the result is always one of the
    regular-file modes a tree accepts.

    :param mode: Mode to clean up.
    :return: The normalised mode.
    """
    if stat.S_ISLNK(mode):
        return stat.S_IFLNK
    elif stat.S_ISDIR(mode):
        return stat.S_IFDIR
    elif S_ISGITLINK(mode):
        return S_IFGITLINK
    ret = stat.S_IFREG | 0o644
    # Copying the raw exec bits (mode & 0o111) could produce e.g. 0o100744,
    # which is not a valid tree mode; key off the owner bit instead.
    if mode & 0o100:
        ret |= 0o111
    return ret
def _initialize_aci(self, mode, fileType): valid_types = [ stat.S_IFREG, stat.S_IFDIR, stat.S_IFCHR, stat.S_IFBLK, stat.S_IFIFO, stat.S_IFLNK, stat.S_IFSOCK] if fileType not in valid_types: raise RuntimeError("Invalid file type.") aci = self.aciCollection.new() uid = os.getuid() aci.dbobj.id = uid aci.dbobj.uname = pwd.getpwuid(uid).pw_name aci.dbobj.gname = grp.getgrgid(os.getgid()).gr_name aci.dbobj.mode = int(mode, 8) + fileType aci.save() return aci.key
def AddToZipHermetic(zip_file, zip_path, src_path=None, data=None,
                     compress=None):
    """Adds a file to the given ZipFile with a hard-coded modified time.

    Args:
      zip_file: ZipFile instance to add the file to.
      zip_path: Destination path within the zip file.
      src_path: Path of the source file. Mutually exclusive with |data|.
      data: File data as a string.
      compress: Whether to enable compression. Default is taken from ZipFile
          constructor.
    """
    assert (src_path is None) != (data is None), (
        '|src_path| and |data| are mutually exclusive.')
    CheckZipPath(zip_path)
    zipinfo = zipfile.ZipInfo(filename=zip_path, date_time=HERMETIC_TIMESTAMP)
    zipinfo.external_attr = _HERMETIC_FILE_ATTR

    if src_path and os.path.islink(src_path):
        zipinfo.filename = zip_path
        # Plain int instead of the Python-2-only ``16L`` long literal.
        zipinfo.external_attr |= stat.S_IFLNK << 16  # mark as a symlink
        zip_file.writestr(zipinfo, os.readlink(src_path))
        return

    if src_path:
        # open() replaces the removed file() builtin; binary mode keeps the
        # archived bytes identical to the source file.
        with open(src_path, 'rb') as f:
            data = f.read()

    # zipfile will deflate even when it makes the file bigger. To avoid
    # growing files, disable compression at an arbitrary cut off point.
    if len(data) < 16:
        compress = False

    # None converts to ZIP_STORED, when passed explicitly rather than the
    # default passed to the ZipFile constructor.
    compress_type = zip_file.compression
    if compress is not None:
        compress_type = zipfile.ZIP_DEFLATED if compress else zipfile.ZIP_STORED
    zip_file.writestr(zipinfo, data, compress_type)
def AddToZipHermetic(zip_file, zip_path, src_path=None, data=None,
                     compress=None):
    """Adds a file to the given ZipFile with a hard-coded modified time.

    Args:
      zip_file: ZipFile instance to add the file to.
      zip_path: Destination path within the zip file.
      src_path: Path of the source file. Mutually exclusive with |data|.
      data: File data as a string.
      compress: Whether to enable compression. Default is taken from ZipFile
          constructor.
    """
    assert (src_path is None) != (data is None), (
        '|src_path| and |data| are mutually exclusive.')
    CheckZipPath(zip_path)
    zipinfo = zipfile.ZipInfo(filename=zip_path, date_time=HERMETIC_TIMESTAMP)
    zipinfo.external_attr = _HERMETIC_FILE_ATTR

    is_symlink = bool(src_path) and os.path.islink(src_path)
    if is_symlink:
        zipinfo.filename = zip_path
        # ``16L`` is a Python-2-only long literal; a plain int is equivalent.
        zipinfo.external_attr |= stat.S_IFLNK << 16  # mark as a symlink
        zip_file.writestr(zipinfo, os.readlink(src_path))
        return

    if src_path:
        # The file() builtin no longer exists; read raw bytes so the archive
        # content matches the source file exactly.
        with open(src_path, 'rb') as f:
            data = f.read()

    # zipfile will deflate even when it makes the file bigger. To avoid
    # growing files, disable compression at an arbitrary cut off point.
    if len(data) < 16:
        compress = False

    # None converts to ZIP_STORED, when passed explicitly rather than the
    # default passed to the ZipFile constructor.
    if compress is None:
        compress_type = zip_file.compression
    elif compress:
        compress_type = zipfile.ZIP_DEFLATED
    else:
        compress_type = zipfile.ZIP_STORED
    zip_file.writestr(zipinfo, data, compress_type)
def dbpcs(mode):
    """Return the ``ls -l`` style type character for a stat mode.

    The file type is extracted with ``stat.S_IFMT`` and compared exactly,
    so type constants that share bits cannot be confused.  Symbolic links
    yield 'l'; previously they yielded 's', the same character as sockets.

    :param mode: An ``st_mode`` value.
    :return: One of 'l', 's', '-', 'b', 'd', 'p', 'c', or '?' when the
        file type is not recognised.
    """
    type_chars = {
        stat.S_IFLNK: 'l',
        stat.S_IFSOCK: 's',
        stat.S_IFREG: '-',
        stat.S_IFBLK: 'b',
        stat.S_IFDIR: 'd',
        stat.S_IFIFO: 'p',
        stat.S_IFCHR: 'c',
    }
    return type_chars.get(stat.S_IFMT(mode), '?')
def getattr(self, inode, ctx=None):
    """Build the ``llfuse.EntryAttributes`` for ``inode``.

    Raises ``llfuse.FUSEError(ENOENT)`` when the inode is unknown or its
    stored attributes describe none of: tree, blob, link, fifo.
    """
    attrs = self.inodes.get(inode)
    if attrs is None:
        raise llfuse.FUSEError(errno.ENOENT)  # FIXME

    node_type = attrs.get('type')
    if node_type == 'tree':
        mode_filetype = stat.S_IFDIR
    elif node_type == 'blob':
        mode_filetype = stat.S_IFREG
    else:
        # Neither tree nor blob: fall back to the 'filetype' attribute.
        special_type = attrs.get('filetype')
        if special_type == 'link':
            mode_filetype = stat.S_IFLNK
        elif special_type == 'fifo':
            mode_filetype = stat.S_IFIFO
        else:
            raise llfuse.FUSEError(errno.ENOENT)  # FIXME

    entry = llfuse.EntryAttributes()
    permissions = attrs.get('mode', MartyFSHandler.DEFAULT_MODE)
    entry.st_mode = mode_filetype | permissions

    # Only blobs that carry a storage reference have a non-zero size.
    if node_type == 'blob' and 'ref' in attrs:
        entry.st_size = self.storage.size(attrs['ref'])
    else:
        entry.st_size = 0

    # Every entry reports the same fixed timestamp, in nanoseconds.
    fixed_ns = int(1438467123.985654 * 1e9)
    entry.st_atime_ns = fixed_ns
    entry.st_ctime_ns = fixed_ns
    entry.st_mtime_ns = fixed_ns

    entry.st_gid = 0
    entry.st_uid = 0
    entry.st_ino = inode
    return entry
def getattr(self, path, fh=None):
    """Return the stat dictionary for ``path``.

    Root, histories, datasets and dataset collections are reported as
    read-only directories stamped with the current time.  Individual
    datasets are reported as symlinks whose size is the length of the
    escaped file name and whose timestamps come from ``update_time``.

    Raises FuseOSError(ENOENT) for any other path type.
    """
    (typ, kw) = path_type(path)
    now = time.time()

    def directory_stat():
        # Simple directory.  ``0o555`` replaces the Python-2-only ``0555``.
        return dict(st_mode=(S_IFDIR | 0o555), st_nlink=2,
                    st_ctime=now, st_mtime=now, st_atime=now)

    def symlink_stat(d):
        # A file, will be a symlink to a galaxy dataset.
        t = time.mktime(time.strptime(d['update_time'],
                                      '%Y-%m-%dT%H:%M:%S.%f'))
        fname = esc_filename(d.get('file_path', d['file_name']))
        return dict(st_mode=(S_IFLNK | 0o444), st_nlink=1,
                    st_size=len(fname), st_ctime=t, st_mtime=t, st_atime=t)

    if typ in ('root', 'histories', 'datasets'):
        st = directory_stat()
    elif typ == 'historydataorcoll':
        # Dataset or collection
        d = self._dataset(kw)
        if d['history_content_type'] == 'dataset_collection':
            # A collection, will be a simple directory.
            st = directory_stat()
        else:
            st = symlink_stat(d)
    elif typ == 'collectiondataset':
        # A file within a collection, will be a symlink to a galaxy dataset.
        d = self._dataset(kw, display=False)
        st = symlink_stat(d)
    else:
        raise FuseOSError(ENOENT)
    return st

# Return a symlink for the given dataset
def test_islink_returns_true_for_symlink(self):
    """A Metadata built with st_mode=S_IFLNK must report islink()."""
    metadata = obnamlib.Metadata(st_mode=stat.S_IFLNK)
    # assertTrue replaces the deprecated assert_ alias, which newer
    # unittest versions no longer provide.
    self.assertTrue(metadata.islink())
def test_returns_symlink_fields_correctly(self):
    """read_metadata must copy st_mode and target from a symlink."""
    self.fakefs.st_mode |= stat.S_IFLNK
    metadata = obnamlib.read_metadata(
        self.fakefs,
        'foo',
        getpwuid=self.fakefs.getpwuid,
        getgrgid=self.fakefs.getgrgid)
    for field in ('st_mode', 'target'):
        actual = getattr(metadata, field)
        expected = getattr(self.fakefs, field)
        # The field name doubles as the failure message.
        self.assertEqual(actual, expected, field)
def test_sets_symlink_target(self):
    """set_metadata must create a symlink pointing at metadata.target."""
    self.fs.remove(self.filename)
    # ``0o777`` is accepted by Python 2.6+ and 3; the old ``0777`` spelling
    # is a syntax error on Python 3.
    self.metadata.st_mode = 0o777 | stat.S_IFLNK
    self.metadata.target = 'target'
    obnamlib.set_metadata(self.fs, self.filename, self.metadata)
    self.assertEqual(self.fs.readlink(self.filename), 'target')
def test_sets_symlink_mtime_perms(self):
    """set_metadata must apply st_mode and mtime to the symlink itself."""
    self.fs.remove(self.filename)
    # ``0o777`` is accepted by Python 2.6+ and 3; the old ``0777`` spelling
    # is a syntax error on Python 3.
    self.metadata.st_mode = 0o777 | stat.S_IFLNK
    self.metadata.target = 'target'
    obnamlib.set_metadata(self.fs, self.filename, self.metadata)
    # lstat inspects the link rather than whatever it points at.
    st = os.lstat(self.filename)
    self.assertEqual(st.st_mode, self.metadata.st_mode)
    self.assertEqual(st.st_mtime, self.metadata.st_mtime_sec)
def check(self):
    """Check this object for internal consistency.

    Each entry must have a valid hex sha, a name that is non-empty, is not
    '.' or '..' and contains no '/', and an allowed mode.  Entries must be
    sorted according to ``key_entry`` and names must not repeat.

    :raise ObjectFormatException: if the object is malformed in some way
    """
    super(Tree, self).check()
    allowed_modes = (
        stat.S_IFREG | 0o755,
        stat.S_IFREG | 0o644,
        stat.S_IFLNK,
        stat.S_IFDIR,
        S_IFGITLINK,
        # TODO: optionally exclude as in git fsck --strict
        stat.S_IFREG | 0o664,
    )
    forbidden_names = (b'', b'.', b'..')
    raw_text = b''.join(self._chunked_text)

    previous = None
    for name, mode, sha in parse_tree(raw_text, True):
        check_hexsha(sha, 'invalid sha %s' % sha)
        if b'/' in name or name in forbidden_names:
            raise ObjectFormatException('invalid name %s' % name)

        if mode not in allowed_modes:
            raise ObjectFormatException('invalid mode %06o' % mode)

        current = (name, (mode, sha))
        if previous:
            # Compare against the entry seen immediately before this one.
            if key_entry(previous) > key_entry(current):
                raise ObjectFormatException('entries not sorted')
            if name == previous[0]:
                raise ObjectFormatException('duplicate entry %s' % name)
        previous = current
def symlink(self, target, source):
    """Record ``target`` as a symbolic link whose content is ``source``.

    Stores a stat dictionary (link type, mode 0o777, one link, size equal
    to ``len(source)``) in ``self.files`` and the link content in
    ``self.data``, both keyed by ``target``.
    """
    link_attrs = {
        'st_mode': S_IFLNK | 0o777,
        'st_nlink': 1,
        'st_size': len(source),
    }
    self.files[target] = link_attrs
    self.data[target] = source
def __str__(self): "create a unix-style long description of the file (like ls -l)" if self.st_mode is not None: kind = stat.S_IFMT(self.st_mode) if kind == stat.S_IFIFO: ks = 'p' elif kind == stat.S_IFCHR: ks = 'c' elif kind == stat.S_IFDIR: ks = 'd' elif kind == stat.S_IFBLK: ks = 'b' elif kind == stat.S_IFREG: ks = '-' elif kind == stat.S_IFLNK: ks = 'l' elif kind == stat.S_IFSOCK: ks = 's' else: ks = '?' ks += self._rwx((self.st_mode & 0700) >> 6, self.st_mode & stat.S_ISUID) ks += self._rwx((self.st_mode & 070) >> 3, self.st_mode & stat.S_ISGID) ks += self._rwx(self.st_mode & 7, self.st_mode & stat.S_ISVTX, True) else: ks = '?---------' # compute display date if (self.st_mtime is None) or (self.st_mtime == 0xffffffffL): # shouldn't really happen datestr = '(unknown date)' else: if abs(time.time() - self.st_mtime) > 15552000: # (15552000 = 6 months) datestr = time.strftime('%d %b %Y', time.localtime(self.st_mtime)) else: datestr = time.strftime('%d %b %H:%M', time.localtime(self.st_mtime)) filename = getattr(self, 'filename', '?') # not all servers support uid/gid uid = self.st_uid gid = self.st_gid if uid is None: uid = 0 if gid is None: gid = 0 return '%s 1 %-8d %-8d %8d %-12s %s' % (ks, uid, gid, self.st_size, datestr, filename)
def __str__(self): """create a unix-style long description of the file (like ls -l)""" if self.st_mode is not None: kind = stat.S_IFMT(self.st_mode) if kind == stat.S_IFIFO: ks = 'p' elif kind == stat.S_IFCHR: ks = 'c' elif kind == stat.S_IFDIR: ks = 'd' elif kind == stat.S_IFBLK: ks = 'b' elif kind == stat.S_IFREG: ks = '-' elif kind == stat.S_IFLNK: ks = 'l' elif kind == stat.S_IFSOCK: ks = 's' else: ks = '?' ks += self._rwx((self.st_mode & o700) >> 6, self.st_mode & stat.S_ISUID) ks += self._rwx((self.st_mode & o70) >> 3, self.st_mode & stat.S_ISGID) ks += self._rwx(self.st_mode & 7, self.st_mode & stat.S_ISVTX, True) else: ks = '?---------' # compute display date if (self.st_mtime is None) or (self.st_mtime == xffffffff): # shouldn't really happen datestr = '(unknown date)' else: if abs(time.time() - self.st_mtime) > 15552000: # (15552000 = 6 months) datestr = time.strftime('%d %b %Y', time.localtime(self.st_mtime)) else: datestr = time.strftime('%d %b %H:%M', time.localtime(self.st_mtime)) filename = getattr(self, 'filename', '?') # not all servers support uid/gid uid = self.st_uid gid = self.st_gid if uid is None: uid = 0 if gid is None: gid = 0 return '%s 1 %-8d %-8d %8d %-12s %s' % (ks, uid, gid, self.st_size, datestr, filename)
def __str__(self): """create a unix-style long description of the file (like ls -l)""" if self.st_mode is not None: kind = stat.S_IFMT(self.st_mode) if kind == stat.S_IFIFO: ks = 'p' elif kind == stat.S_IFCHR: ks = 'c' elif kind == stat.S_IFDIR: ks = 'd' elif kind == stat.S_IFBLK: ks = 'b' elif kind == stat.S_IFREG: ks = '-' elif kind == stat.S_IFLNK: ks = 'l' elif kind == stat.S_IFSOCK: ks = 's' else: ks = '?' ks += self._rwx((self.st_mode & o700) >> 6, self.st_mode & stat.S_ISUID) ks += self._rwx((self.st_mode & o70) >> 3, self.st_mode & stat.S_ISGID) ks += self._rwx(self.st_mode & 7, self.st_mode & stat.S_ISVTX, True) else: ks = '?---------' # compute display date if (self.st_mtime is None) or (self.st_mtime == xffffffff): # shouldn't really happen datestr = '(unknown date)' else: if abs(time.time() - self.st_mtime) > 15552000: # (15552000 = 6 months) datestr = time.strftime('%d %b %Y', time.localtime(self.st_mtime)) else: datestr = time.strftime('%d %b %H:%M', time.localtime(self.st_mtime)) filename = getattr(self, 'filename', '?') # not all servers support uid/gid uid = self.st_uid gid = self.st_gid size = self.st_size if uid is None: uid = 0 if gid is None: gid = 0 if size is None: size = 0 return '%s 1 %-8d %-8d %8d %-12s %s' % (ks, uid, gid, size, datestr, filename)
def __str__(self): """create a unix-style long description of the file (like ls -l)""" if self.st_mode is not None: kind = stat.S_IFMT(self.st_mode) if kind == stat.S_IFIFO: ks = 'p' elif kind == stat.S_IFCHR: ks = 'c' elif kind == stat.S_IFDIR: ks = 'd' elif kind == stat.S_IFBLK: ks = 'b' elif kind == stat.S_IFREG: ks = '-' elif kind == stat.S_IFLNK: ks = 'l' elif kind == stat.S_IFSOCK: ks = 's' else: ks = '?' ks += self._rwx( (self.st_mode & o700) >> 6, self.st_mode & stat.S_ISUID) ks += self._rwx( (self.st_mode & o70) >> 3, self.st_mode & stat.S_ISGID) ks += self._rwx( self.st_mode & 7, self.st_mode & stat.S_ISVTX, True) else: ks = '?---------' # compute display date if (self.st_mtime is None) or (self.st_mtime == xffffffff): # shouldn't really happen datestr = '(unknown date)' else: if abs(time.time() - self.st_mtime) > 15552000: # (15552000 = 6 months) datestr = time.strftime( '%d %b %Y', time.localtime(self.st_mtime)) else: datestr = time.strftime( '%d %b %H:%M', time.localtime(self.st_mtime)) filename = getattr(self, 'filename', '?') # not all servers support uid/gid uid = self.st_uid gid = self.st_gid size = self.st_size if uid is None: uid = 0 if gid is None: gid = 0 if size is None: size = 0 return '%s 1 %-8d %-8d %8d %-12s %s' % ( ks, uid, gid, size, datestr, filename)
def init_root(self):
    """Populate ``self.rootlist`` and ``self.rootstat`` for the FUSE root.

    ``self.rootlist`` receives one stat entry per usable non-checkpoint
    generation, plus '/latest' (a symlink entry targeting the last usable
    generation) and '/.pid' (a read-only regular file entry).
    """
    # we need the list of all real (non-checkpoint) generations
    client_name = self.obnam.app.settings['client-name']
    generations = []
    for gen_id in self.obnam.repo.get_client_generation_ids(client_name):
        is_checkpoint = self.obnam.repo.get_generation_key(
            gen_id, obnamlib.REPO_GENERATION_IS_CHECKPOINT)
        if not is_checkpoint:
            generations.append(gen_id)

    # self.rootlist holds the stat information for each entry at
    # the root of the FUSE filesystem: /.pid, /latest, and one for
    # each generation.
    self.rootlist = {}

    used_generations = []
    for gen_id in generations:
        genspec = self.obnam.repo.make_generation_spec(gen_id)
        path = '/' + genspec
        try:
            genstat = self.get_stat_in_generation(path)
            ended = self.obnam.repo.get_generation_key(
                gen_id, obnamlib.REPO_GENERATION_ENDED)
            genstat.st_ctime = ended
            genstat.st_mtime = ended
            self.rootlist[path] = genstat
            used_generations.append(gen_id)
        except obnamlib.ObnamError as e:
            # A generation that cannot be read is skipped, not fatal.
            logging.warning('Ignoring error %s', str(e))

    assert used_generations

    # self.rootstat is the stat information for the root of the
    # FUSE filesystem. We set it to the same as that of the latest
    # generation.
    newest_id = used_generations[-1]
    newest_spec = self.obnam.repo.make_generation_spec(newest_id)
    newest_root_stat = self.rootlist['/' + newest_spec]
    self.rootstat = fuse.Stat(**newest_root_stat.__dict__)

    # Add an entry for /latest to self.rootlist.
    latest_link = fuse.Stat(target=newest_spec, **newest_root_stat.__dict__)
    latest_link.st_mode &= ~(stat.S_IFDIR | stat.S_IFREG)
    latest_link.st_mode |= stat.S_IFLNK
    self.rootlist['/latest'] = latest_link

    # Add an entry for /.pid to self.rootlist.
    pid_entry = fuse.Stat(**self.rootstat.__dict__)
    read_only = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
    pid_entry.st_mode = stat.S_IFREG | read_only
    self.rootlist['/.pid'] = pid_entry
def chmod(self, ppath, mode):
    """
    Change mode for files or directories

    The stored mode is the parsed permission bits plus the file type
    constant of the entry, so the entry keeps its type.

    :param ppath: path of file/dir
    :param mode: string of the mode (octal digits)
    :raises RuntimeError: if the entry found for ppath has a non-empty state
    :raises ValueError: if mode is not a valid octal string

    Examples:
        flistmeta.chmod("/tmp/dir1", "777")
    """
    fType, dirObj = self._search_db(ppath)
    if dirObj.dbobj.state != "":
        raise RuntimeError("%s: No such file or directory" % ppath)

    try:
        mode = int(mode, 8)
    except ValueError:
        raise ValueError("Invalid mode.")

    if fType == "D":
        aclObj = self.aciCollection.get(dirObj.dbobj.aclkey)
        aclObj.dbobj.mode = mode + stat.S_IFDIR
        aclObj.save()
        return

    # Same for every candidate entry, so compute it once.
    base_name = j.sal.fs.getBaseName(ppath)

    if fType == "F" or fType == "L":
        _mode = mode + (stat.S_IFREG if fType == "F" else stat.S_IFLNK)
        _, propList = self._getPropertyList(dirObj.dbobj, fType)
        for entry in propList:
            if entry.name == base_name:
                aclObj = self.aciCollection.get(entry.aclkey)
                aclObj.dbobj.mode = _mode
                aclObj.save()
        return

    # Special files (sockets, devices, fifos) are looked up in ``links``.
    for entry in dirObj.dbobj.links:
        if entry.name == base_name:
            aclObj = self.aciCollection.get(entry.aclkey)
            # Keep whatever file type the entry already has.  S_IFMT yields
            # the same constant the per-type S_IS* tests selected, and also
            # covers types they did not list, which previously left the new
            # mode undefined.
            # NOTE(review): the type is read from ``st_mode`` while the
            # result is written to ``mode`` -- confirm both attributes exist.
            file_type = stat.S_IFMT(aclObj.dbobj.st_mode)
            aclObj.dbobj.mode = mode + file_type
            aclObj.save()