我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用stat.S_IFREG。
def getattr(self, path, fh=None): try: node = self.__path_to_object(path) if node: is_file = (type(node) == File) st_mode_ft_bits = stat.S_IFREG if is_file else stat.S_IFDIR st_mode_permissions = 0o444 if is_file else 0o555 return { 'st_mode': st_mode_ft_bits | st_mode_permissions, 'st_uid': self.uid, 'st_gid': self.gid, 'st_size': node.size if is_file else 0, 'st_ctime': node.time if is_file else 0, 'st_mtime': node.time if is_file else 0 } else: raise FuseOSError(ENOENT) except IliasFSError as e: raise_ilias_error_to_fuse(e)
def get_file_type(path): """Retrieve the file type of the path :param path: The path to get the file type for :return: The file type as a string or None on error """ f_types = { 'socket': stat.S_IFSOCK, 'regular': stat.S_IFREG, 'block': stat.S_IFBLK, 'directory': stat.S_IFDIR, 'character_device': stat.S_IFCHR, 'fifo': stat.S_IFIFO, } if not path or not os.path.exists(path): return None obj = os.stat(path).st_mode for key,val in f_types.items(): if obj & val == val: return key
def getattr(self, path): print "getattr", path st = MyStat() st.st_atime = int(time.time()) st.st_mtime = st.st_atime st.st_ctime = st.st_atime pe = path.split(os.path.sep)[1:] if path == "/": # root of the FUSE filesystem pass elif pe[-1] in ["a", "b", "c"]: # first level is directories pass elif len(pe) == 2 and pe[-1] == "fuse_live.mkv": st.st_mode = stat.S_IFREG | 0666 st.st_nlink = 1 st.st_size = 1024**3 #self.fileSize else: return -errno.ENOENT return st
def getattr(self, path): # The path represents a pid. components = os.path.split(path) if len(components) > 2: return if len(components) == 2 and components[1] in self.tasks: s = make_stat(components[1]) s.st_mode = stat.S_IFREG s.st_size = self.address_space_size return s elif components[0] == "/": s = make_stat(2) s.st_mode = stat.S_IFDIR return s
def test_committing_remembers_file_removal(self): gen_id = self.create_generation() self.repo.add_file(gen_id, '/foo/bar') self.repo.set_file_key( gen_id, '/foo/bar', obnamlib.REPO_FILE_MODE, stat.S_IFREG) self.repo.commit_client('fooclient') self.repo.unlock_client('fooclient') self.repo.lock_client('fooclient') gen_id_2 = self.repo.create_generation('fooclient') self.assertTrue(self.repo.file_exists(gen_id_2, '/foo/bar')) self.repo.remove_file(gen_id_2, '/foo/bar') self.repo.commit_client('fooclient') self.repo.unlock_client('fooclient') self.assertTrue(self.repo.file_exists(gen_id, '/foo/bar')) self.assertFalse(self.repo.file_exists(gen_id_2, '/foo/bar'))
def test_unlocking_client_forgets_modified_file_chunk_ids(self): gen_id = self.create_generation() self.repo.add_file(gen_id, '/foo/bar') self.repo.set_file_key( gen_id, '/foo/bar', obnamlib.REPO_FILE_MODE, stat.S_IFREG) self.repo.append_file_chunk_id(gen_id, '/foo/bar', 1) self.repo.commit_client('fooclient') self.repo.unlock_client('fooclient') self.repo.lock_client('fooclient') gen_id_2 = self.repo.create_generation('fooclient') self.repo.append_file_chunk_id(gen_id_2, '/foo/bar', 2) self.assertEqual( self.repo.get_file_chunk_ids(gen_id_2, '/foo/bar'), [1, 2]) self.repo.unlock_client('fooclient') self.assertEqual( self.repo.get_file_chunk_ids(gen_id, '/foo/bar'), [1])
def test_committing_child_remembers_modified_file_chunk_ids(self): gen_id = self.create_generation() self.repo.add_file(gen_id, '/foo/bar') self.repo.set_file_key( gen_id, '/foo/bar', obnamlib.REPO_FILE_MODE, stat.S_IFREG) self.repo.append_file_chunk_id(gen_id, '/foo/bar', 1) self.repo.commit_client('fooclient') self.repo.unlock_client('fooclient') self.repo.lock_client('fooclient') gen_id_2 = self.repo.create_generation('fooclient') self.repo.append_file_chunk_id(gen_id_2, '/foo/bar', 2) self.assertEqual( self.repo.get_file_chunk_ids(gen_id_2, '/foo/bar'), [1, 2]) self.repo.commit_client('fooclient') self.repo.unlock_client('fooclient') self.assertEqual( self.repo.get_file_chunk_ids(gen_id, '/foo/bar'), [1]) self.assertEqual( self.repo.get_file_chunk_ids(gen_id_2, '/foo/bar'), [1, 2])
def setUp(self): self.now = None self.tempdir = tempfile.mkdtemp() fs = obnamlib.LocalFS(self.tempdir) self.hooks = obnamlib.HookManager() self.hooks.new('repository-toplevel-init') self.client = obnamlib.ClientMetadataTree( fs, 'clientid', obnamlib.DEFAULT_NODE_SIZE, obnamlib.DEFAULT_UPLOAD_QUEUE_SIZE, obnamlib.DEFAULT_LRU_SIZE, self) # Force use of filename hash collisions. self.client.default_file_id = self.client._bad_default_file_id self.client.start_generation() self.clientid = self.client.get_generation_id(self.client.tree) self.file_metadata = obnamlib.Metadata(st_mode=stat.S_IFREG | 0666) self.file_encoded = obnamlib.fmt_6.metadata_codec.encode_metadata( self.file_metadata) self.dir_metadata = obnamlib.Metadata(st_mode=stat.S_IFDIR | 0777) self.dir_encoded = obnamlib.fmt_6.metadata_codec.encode_metadata( self.dir_metadata)
def make_file_struct(self, size, isfile=True, ctime=time(), mtime=time(), atime=time(), read_only=False): stats = dict() # TODO replace uncommented modes with commented when write ability is added if isfile: stats['st_mode'] = S_IFREG | 0o0444 else: stats['st_mode'] = S_IFDIR | 0o0555 if not self.pakfile.read_only: stats['st_mode'] |= 0o0200 stats['st_uid'] = os.getuid() stats['st_gid'] = os.getgid() stats['st_nlink'] = 1 stats['st_ctime'] = ctime stats['st_mtime'] = mtime stats['st_atime'] = atime stats['st_size'] = size return stats
def test_atomic_write_preserves_ownership_before_moving_into_place(self): atomic_file = self.make_file('atomic') self.patch(fs_module, 'isfile').return_value = True self.patch(fs_module, 'chown') self.patch(fs_module, 'rename') self.patch(fs_module, 'stat') ret_stat = fs_module.stat.return_value ret_stat.st_uid = sentinel.uid ret_stat.st_gid = sentinel.gid ret_stat.st_mode = stat.S_IFREG atomic_write(factory.make_bytes(), atomic_file) self.assertThat(fs_module.stat, MockCalledOnceWith(atomic_file)) self.assertThat(fs_module.chown, MockCalledOnceWith( ANY, sentinel.uid, sentinel.gid))
def cleanup_mode(mode): """Cleanup a mode value. This will return a mode that can be stored in a tree object. :param mode: Mode to clean up. """ if stat.S_ISLNK(mode): return stat.S_IFLNK elif stat.S_ISDIR(mode): return stat.S_IFDIR elif S_ISGITLINK(mode): return S_IFGITLINK ret = stat.S_IFREG | 0o644 ret |= (mode & 0o111) return ret
def getattr(self, path, fh=None): if path == '/' or GDBFS.is_dir(path): st = dict(st_mode=(S_IFDIR | 0755), st_nlink=2) else: # file = GDBFS.get_file(path) if len(GDBFS.Ndb.get_node_label(GDBFS.get_node_name(path)))!=0: filesize=0 try: filesize=GDBFS.get_file_length(path) except: filesize=0 st = dict(st_mode=(S_IFREG | 0644), st_size=filesize) else: raise FuseOSError(ENOENT) st['st_ctime'] = st['st_mtime'] = st['st_atime'] = time() st['st_uid'], st['st_gid'], pid = fuse_get_context() print "\n\nin getattr, path =",path,"\nst =", st return st
def addFile(self, parent_path, name, size=0, mode="644"): _, parentObj = self._search_db(parent_path) if parentObj.dbobj.state != "": raise RuntimeError("%s: No such file or directory" % parent_path) aciKey = self._initialize_aci(mode, stat.S_IFREG) fileDict = { "aclkey": aciKey, "blocksize": size, "creationTime": calendar.timegm(time.gmtime()), "modificationTime": calendar.timegm(time.gmtime()), "name": name, "size": size, } self._addObj(parentObj, fileDict, "F") parentObj.save()
def _initialize_aci(self, mode, fileType): valid_types = [ stat.S_IFREG, stat.S_IFDIR, stat.S_IFCHR, stat.S_IFBLK, stat.S_IFIFO, stat.S_IFLNK, stat.S_IFSOCK] if fileType not in valid_types: raise RuntimeError("Invalid file type.") aci = self.aciCollection.new() uid = os.getuid() aci.dbobj.id = uid aci.dbobj.uname = pwd.getpwuid(uid).pw_name aci.dbobj.gname = grp.getgrgid(os.getgid()).gr_name aci.dbobj.mode = int(mode, 8) + fileType aci.save() return aci.key
def create_shelf(self, shelf): """ Create one new shelf in the database. Input--- shelf_data - list of shelf data to insert Output--- shelf_data or error message """ shelf.id = None # DB engine will autochoose next id if not shelf.mode: shelf.mode = stat.S_IFREG + 0o666 tmp = int(time.time()) shelf.ctime = shelf.mtime = tmp if shelf.parent_id == 0: shelf.parent_id = 2 # is now a default for if it given to go in root shelf.id = self._cur.INSERT('shelves', shelf.tuple()) return shelf
def cmp(f1, f2, shallow=1): """Compare two files. Arguments: f1 -- First file name f2 -- Second file name shallow -- Just check stat signature (do not read the files). defaults to 1. Return value: True if the files are the same, False otherwise. This function uses a cache for past comparisons and the results, with a cache invalidation mechanism relying on stale signatures. """ s1 = _sig(os.stat(f1)) s2 = _sig(os.stat(f2)) if s1[0] != stat.S_IFREG or s2[0] != stat.S_IFREG: return False if shallow and s1 == s2: return True if s1[1] != s2[1]: return False result = _cache.get((f1, f2)) if result and (s1, s2) == result[:2]: return result[2] outcome = _do_cmp(f1, f2) _cache[f1, f2] = s1, s2, outcome return outcome
def cmp(f1, f2, shallow=1): """Compare two files. Arguments: f1 -- First file name f2 -- Second file name shallow -- Just check stat signature (do not read the files). defaults to 1. Return value: True if the files are the same, False otherwise. This function uses a cache for past comparisons and the results, with a cache invalidation mechanism relying on stale signatures. """ s1 = _sig(os.stat(f1)) s2 = _sig(os.stat(f2)) if s1[0] != stat.S_IFREG or s2[0] != stat.S_IFREG: return False if shallow and s1 == s2: return True if s1[1] != s2[1]: return False outcome = _cache.get((f1, f2, s1, s2)) if outcome is None: outcome = _do_cmp(f1, f2) if len(_cache) > 100: # limit the maximum size of the cache _cache.clear() _cache[f1, f2, s1, s2] = outcome return outcome
def _setup_logging_from_conf(product_name): log_root = getLogger(None).logger for handler in log_root.handlers: log_root.removeHandler(handler) if CONF.use_syslog: facility = _find_facility_from_conf() syslog = logging.handlers.SysLogHandler(address='/dev/log', facility=facility) log_root.addHandler(syslog) logpath = _get_log_file_path() if logpath: filelog = logging.handlers.WatchedFileHandler(logpath) log_root.addHandler(filelog) mode = int('0644', 8) st = os.stat(logpath) if st.st_mode != (stat.S_IFREG | mode): os.chmod(logpath, mode) if CONF.use_stderr: streamlog = ColorHandler() log_root.addHandler(streamlog) elif not CONF.log_file: # pass sys.stdout as a positional argument # python2.6 calls the argument strm, in 2.7 it's stream streamlog = logging.StreamHandler(sys.stdout) log_root.addHandler(streamlog) for handler in log_root.handlers: datefmt = CONF.log_date_format if CONF.log_format: handler.setFormatter(logging.Formatter(fmt=CONF.log_format, datefmt=datefmt)) handler.setFormatter(LegacyFormatter(datefmt=datefmt)) if CONF.verbose or CONF.debug: log_root.setLevel(logging.DEBUG) else: log_root.setLevel(logging.INFO)
def _setup_logging_from_flags(): ops_root = getLogger().logger for handler in ops_root.handlers: ops_root.removeHandler(handler) logpath = _get_log_file_path() if logpath: filelog = logging.handlers.WatchedFileHandler(logpath) ops_root.addHandler(filelog) mode = int(options.logfile_mode, 8) st = os.stat(logpath) if st.st_mode != (stat.S_IFREG | mode): os.chmod(logpath, mode) for handler in ops_root.handlers: handler.setFormatter(logging.Formatter(fmt=options.log_format, datefmt=options.log_date_format)) if options.verbose or options.debug: ops_root.setLevel(logging.DEBUG) else: ops_root.setLevel(logging.INFO) root = logging.getLogger() for handler in root.handlers: root.removeHandler(handler) handler = NullHandler() handler.setFormatter(logging.Formatter()) root.addHandler(handler)
def stat(self) -> Stat: # Return a regular file with read permissions to everybody and ZERO # length! now = time.time() return dict(st_mode=(S_IFREG | 0o644), st_nlink=1, st_ctime=now, st_mtime=now, st_attime=now)
def cmp(f1, f2, shallow=True): """Compare two files. Arguments: f1 -- First file name f2 -- Second file name shallow -- Just check stat signature (do not read the files). defaults to 1. Return value: True if the files are the same, False otherwise. This function uses a cache for past comparisons and the results, with a cache invalidation mechanism relying on stale signatures. """ s1 = _sig(os.stat(f1)) s2 = _sig(os.stat(f2)) if s1[0] != stat.S_IFREG or s2[0] != stat.S_IFREG: return False if shallow and s1 == s2: return True if s1[1] != s2[1]: return False outcome = _cache.get((f1, f2, s1, s2)) if outcome is None: outcome = _do_cmp(f1, f2) if len(_cache) > 100: # limit the maximum size of the cache _cache.clear() _cache[f1, f2, s1, s2] = outcome return outcome
def test_mode(self): with open(TESTFN, 'w'): pass if os.name == 'posix': os.chmod(TESTFN, 0o700) st_mode = self.get_mode() self.assertS_IS("REG", st_mode) self.assertEqual(stat.S_IMODE(st_mode), stat.S_IRWXU) os.chmod(TESTFN, 0o070) st_mode = self.get_mode() self.assertS_IS("REG", st_mode) self.assertEqual(stat.S_IMODE(st_mode), stat.S_IRWXG) os.chmod(TESTFN, 0o007) st_mode = self.get_mode() self.assertS_IS("REG", st_mode) self.assertEqual(stat.S_IMODE(st_mode), stat.S_IRWXO) os.chmod(TESTFN, 0o444) st_mode = self.get_mode() self.assertS_IS("REG", st_mode) self.assertEqual(stat.S_IMODE(st_mode), 0o444) else: os.chmod(TESTFN, 0o700) st_mode = self.get_mode() self.assertS_IS("REG", st_mode) self.assertEqual(stat.S_IFMT(st_mode), stat.S_IFREG)
def test_mode(self): with open(TESTFN, 'w'): pass if os.name == 'posix': os.chmod(TESTFN, 0o700) st_mode, modestr = self.get_mode() self.assertEqual(modestr, '-rwx------') self.assertS_IS("REG", st_mode) self.assertEqual(stat.S_IMODE(st_mode), stat.S_IRWXU) os.chmod(TESTFN, 0o070) st_mode, modestr = self.get_mode() self.assertEqual(modestr, '----rwx---') self.assertS_IS("REG", st_mode) self.assertEqual(stat.S_IMODE(st_mode), stat.S_IRWXG) os.chmod(TESTFN, 0o007) st_mode, modestr = self.get_mode() self.assertEqual(modestr, '-------rwx') self.assertS_IS("REG", st_mode) self.assertEqual(stat.S_IMODE(st_mode), stat.S_IRWXO) os.chmod(TESTFN, 0o444) st_mode, modestr = self.get_mode() self.assertS_IS("REG", st_mode) self.assertEqual(modestr, '-r--r--r--') self.assertEqual(stat.S_IMODE(st_mode), 0o444) else: os.chmod(TESTFN, 0o700) st_mode, modestr = self.get_mode() self.assertEqual(modestr[:3], '-rw') self.assertS_IS("REG", st_mode) self.assertEqual(stat.S_IFMT(st_mode), stat.S_IFREG)
def _to_mode(attr): m = 0 if (attr & FILE_ATTRIBUTE_DIRECTORY): m |= stdstat.S_IFDIR | 0o111 else: m |= stdstat.S_IFREG if (attr & FILE_ATTRIBUTE_READONLY): m |= 0o444 else: m |= 0o666 return m
def dbpcs(mode): if mode & stat.S_IFLNK == stat.S_IFLNK: return 's' if mode & stat.S_IFSOCK == stat.S_IFSOCK: return 's' if mode & stat.S_IFREG == stat.S_IFREG: return '-' if mode & stat.S_IFBLK == stat.S_IFBLK: return 'b' if mode & stat.S_IFDIR == stat.S_IFDIR: return 'd' if mode & stat.S_IFIFO == stat.S_IFIFO: return 'p' if mode & stat.S_IFCHR == stat.S_IFCHR: return 'c' return '?'
def getattr(self, inode, ctx=None): attrs = self.inodes.get(inode) if attrs is None: raise llfuse.FUSEError(errno.ENOENT) # FIXME if attrs.get('type') == 'tree': mode_filetype = stat.S_IFDIR elif attrs.get('type') == 'blob': mode_filetype = stat.S_IFREG elif attrs.get('filetype') == 'link': mode_filetype = stat.S_IFLNK elif attrs.get('filetype') == 'fifo': mode_filetype = stat.S_IFIFO else: raise llfuse.FUSEError(errno.ENOENT) # FIXME entry = llfuse.EntryAttributes() entry.st_mode = mode_filetype | attrs.get('mode', MartyFSHandler.DEFAULT_MODE) if attrs.get('type') == 'blob' and 'ref' in attrs: entry.st_size = self.storage.size(attrs['ref']) else: entry.st_size = 0 stamp = int(1438467123.985654 * 1e9) entry.st_atime_ns = stamp entry.st_ctime_ns = stamp entry.st_mtime_ns = stamp entry.st_gid = 0 entry.st_uid = 0 entry.st_ino = inode return entry
def __init__(self, is_dir=False, is_file=False, size=0, ctime=time(), mtime=time(), atime=time()): self.is_dir = is_dir self.is_file = is_file if self.is_dir: self.attr = dict(st_mode=(S_IFDIR | 0o755), st_nlink=2) if self.is_file: self.attr = dict(st_mode=(S_IFREG | 0o755), st_nlink=1, st_size=size, st_ctime=ctime, st_mtime=mtime, st_atime=atime) self.attr["attrs"] = {}
def test_mknod_c(oc): a = oc.ops.mknod(llfuse.ROOT_INODE, b'cdev', stat.S_IFCHR, 42, oc.rctx_user) assert a.st_rdev == 42 oc.ops.forget1(a.st_ino) a = oc.ops.mknod(llfuse.ROOT_INODE, b'bdev', stat.S_IFBLK, 43, oc.rctx_user) assert a.st_rdev == 43 oc.ops.forget1(a.st_ino) a = oc.ops.mknod(llfuse.ROOT_INODE, b'regfile', stat.S_IFREG, 44, oc.rctx_user) assert not a.st_rdev oc.ops.forget1(a.st_ino)
def create_file(self, dir_inode, name, *, mode=0): if not stat.S_IFMT(mode): mode |= stat.S_IFREG dir_inode.change_times() _debug('create_file %s 0x%x', name, mode) return self._create(mode, dir_inode, name)
def cmp(f1, f2, shallow=True): """Compare two files. Arguments: f1 -- First file name f2 -- Second file name shallow -- Just check stat signature (do not read the files). defaults to True. Return value: True if the files are the same, False otherwise. This function uses a cache for past comparisons and the results, with cache entries invalidated if their stat information changes. The cache may be cleared by calling clear_cache(). """ s1 = _sig(os.stat(f1)) s2 = _sig(os.stat(f2)) if s1[0] != stat.S_IFREG or s2[0] != stat.S_IFREG: return False if shallow and s1 == s2: return True if s1[1] != s2[1]: return False outcome = _cache.get((f1, f2, s1, s2)) if outcome is None: outcome = _do_cmp(f1, f2) if len(_cache) > 100: # limit the maximum size of the cache clear_cache() _cache[f1, f2, s1, s2] = outcome return outcome
def getattr(self, path, fh=None): first_dir = common.get_first_dir(path) if first_dir in self.dirs: return self.dirs[first_dir].getattr(common.remove_first_dir(path), fh) uid, gid, pid = fuse_get_context() if path == '/' or path in self.dirs: st = {'st_mode': (stat.S_IFDIR | 0o555), 'st_nlink': 2} elif path.lower() in self.files: st = {'st_mode': (stat.S_IFREG | 0o444), 'st_size': self.files[path.lower()]['size'], 'st_nlink': 1} else: raise FuseOSError(errno.ENOENT) return {**st, **self.g_stat, 'st_uid': uid, 'st_gid': gid}
def getattr(self, path, fh=None): first_dir = common.get_first_dir(path) if first_dir in self.dirs: return self.dirs[first_dir].getattr(common.remove_first_dir(path), fh) uid, gid, pid = fuse_get_context() if path == '/': st = {'st_mode': (stat.S_IFDIR | 0o555), 'st_nlink': 2} elif path.lower() in self.files: st = {'st_mode': (stat.S_IFREG | 0o444), 'st_size': self.files[path.lower()]['size'], 'st_nlink': 1} else: raise FuseOSError(errno.ENOENT) return {**st, **self.g_stat, 'st_uid': uid, 'st_gid': gid}
def getattr(self, path, fh=None): uid, gid, pid = fuse_get_context() if path == '/': st = {'st_mode': (stat.S_IFDIR | (0o555 if self.readonly else 0o777)), 'st_nlink': 2} elif path.lower() in self.files: st = {'st_mode': (stat.S_IFREG | (0o444 if (self.readonly or path.lower() == '/_nandinfo.txt') else 0o666)), 'st_size': self.files[path.lower()]['size'], 'st_nlink': 1} else: raise FuseOSError(errno.ENOENT) return {**st, **self.g_stat, 'st_uid': uid, 'st_gid': gid}
def getattr(self, path, fh=None): uid, gid, pid = fuse_get_context() try: item = self.romfs_reader.get_info_from_path(path) except romfs.RomFSFileNotFoundException: raise FuseOSError(errno.ENOENT) if item.type == 'dir': st = {'st_mode': (stat.S_IFDIR | 0o555), 'st_nlink': 2} elif item.type == 'file': st = {'st_mode': (stat.S_IFREG | 0o444), 'st_size': item.size, 'st_nlink': 1} else: # this won't happen unless I fucked up raise FuseOSError(errno.ENOENT) return {**st, **self.g_stat, 'st_uid': uid, 'st_gid': gid}
def getattr(self): return { 'st_size': len(self.get_data()), 'st_atime': self.get_st_atime(), 'st_mtime': self.get_st_mtime(), 'st_ctime': self.get_st_ctime(), 'st_uid': os.getuid(), 'st_gid': os.getegid(), 'st_mode': S_IFREG | self.mode, 'st_nlink': 1, }
def stat(self): st = os.lstat(self.full_path) stat = { key: getattr(st, key) for key in ( 'st_atime', 'st_ctime', 'st_gid', 'st_mtime', 'st_nlink', 'st_size', 'st_uid' ) } stat['st_mode'] = S_IFREG | 0o770 return stat
def getattr(self, submission=None, path=None): if self.stat is None: super(File, self).getattr(submission, path) self.stat['st_mode'] = S_IFREG | 0o770 self.stat['st_nlink'] = 1 if self.stat['st_size'] is None: self.stat['st_size'] = len(self.data) return self.stat
def _read_value(self, path_components): self.key_name = "\\".join(path_components[1:-1]) self.value_name = path_components[-1] with OpenKey(self._hive_handle, self.key_name) as key: # We are a value - we can be read but we can not be listed. self.value, value_type = QueryValueEx(key, self.value_name) self.st_mode = stat.S_IFREG self.value_type = self.registry_map[value_type] self.st_size = len(utils.SmartStr(self.value))
def readdir(self, path, offset): if path == "/": for pid in self.tasks: result = fuse.Direntry(str(pid)) result.type = stat.S_IFREG result.st_size = self.address_space_size yield result