我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用stat.S_IFDIR。
def getattr(self, path, fh=None): try: node = self.__path_to_object(path) if node: is_file = (type(node) == File) st_mode_ft_bits = stat.S_IFREG if is_file else stat.S_IFDIR st_mode_permissions = 0o444 if is_file else 0o555 return { 'st_mode': st_mode_ft_bits | st_mode_permissions, 'st_uid': self.uid, 'st_gid': self.gid, 'st_size': node.size if is_file else 0, 'st_ctime': node.time if is_file else 0, 'st_mtime': node.time if is_file else 0 } else: raise FuseOSError(ENOENT) except IliasFSError as e: raise_ilias_error_to_fuse(e)
def get_file_type(path): """Retrieve the file type of the path :param path: The path to get the file type for :return: The file type as a string or None on error """ f_types = { 'socket': stat.S_IFSOCK, 'regular': stat.S_IFREG, 'block': stat.S_IFBLK, 'directory': stat.S_IFDIR, 'character_device': stat.S_IFCHR, 'fifo': stat.S_IFIFO, } if not path or not os.path.exists(path): return None obj = os.stat(path).st_mode for key,val in f_types.items(): if obj & val == val: return key
def __init__(self, filename=None, **kwargs): filename = common.FileSpec(filename, filesystem="Reg", path_sep="\\") super(RegistryKeyInformation, self).__init__( filename=filename, **kwargs) self.hive = self.key_name = self.value_name = self.value = "" self.value_type = "REG_NONE" self.st_mode = stat.S_IFDIR path_components = self.filename.components() if not path_components: return # The first component MUST be a hive self.hive = path_components[0] self._hive_handle = KeyHandle(getattr(_winreg, self.hive, None)) if self._hive_handle is None: raise IOError("Unknown hive name %s" % self.hive) # Maybe its a key. try: self._read_key(path_components) except exceptions.WindowsError: # Nop - maybe its a value then. self._read_value(path_components)
def getattr(self, path): # The path represents a pid. components = os.path.split(path) if len(components) > 2: return if len(components) == 2 and components[1] in self.tasks: s = make_stat(components[1]) s.st_mode = stat.S_IFREG s.st_size = self.address_space_size return s elif components[0] == "/": s = make_stat(2) s.st_mode = stat.S_IFDIR return s
def backup_parents(self, root): '''Back up parents of root, non-recursively.''' root = self.fs.abspath(root) tracing.trace('backing up parents of %s', root) dummy_metadata = obnamlib.Metadata(st_mode=0777 | stat.S_IFDIR) while True: parent = os.path.dirname(root) try: metadata = obnamlib.read_metadata(self.fs, root) except OSError, e: logging.warning( 'Failed to get metadata for %s: %s: %s', root, e.errno or 0, e.strerror) logging.warning('Using fake metadata instead for %s', root) metadata = dummy_metadata if not self.pretend: self.add_file_to_generation(root, metadata) if root == parent: break root = parent
def setUp(self): self.now = None self.tempdir = tempfile.mkdtemp() fs = obnamlib.LocalFS(self.tempdir) self.hooks = obnamlib.HookManager() self.hooks.new('repository-toplevel-init') self.client = obnamlib.ClientMetadataTree( fs, 'clientid', obnamlib.DEFAULT_NODE_SIZE, obnamlib.DEFAULT_UPLOAD_QUEUE_SIZE, obnamlib.DEFAULT_LRU_SIZE, self) # Force use of filename hash collisions. self.client.default_file_id = self.client._bad_default_file_id self.client.start_generation() self.clientid = self.client.get_generation_id(self.client.tree) self.file_metadata = obnamlib.Metadata(st_mode=stat.S_IFREG | 0666) self.file_encoded = obnamlib.fmt_6.metadata_codec.encode_metadata( self.file_metadata) self.dir_metadata = obnamlib.Metadata(st_mode=stat.S_IFDIR | 0777) self.dir_encoded = obnamlib.fmt_6.metadata_codec.encode_metadata( self.dir_metadata)
def make_file_struct(self, size, isfile=True, ctime=time(), mtime=time(), atime=time(), read_only=False): stats = dict() # TODO replace uncommented modes with commented when write ability is added if isfile: stats['st_mode'] = S_IFREG | 0o0444 else: stats['st_mode'] = S_IFDIR | 0o0555 if not self.pakfile.read_only: stats['st_mode'] |= 0o0200 stats['st_uid'] = os.getuid() stats['st_gid'] = os.getgid() stats['st_nlink'] = 1 stats['st_ctime'] = ctime stats['st_mtime'] = mtime stats['st_atime'] = atime stats['st_size'] = size return stats
def cleanup_mode(mode): """Cleanup a mode value. This will return a mode that can be stored in a tree object. :param mode: Mode to clean up. """ if stat.S_ISLNK(mode): return stat.S_IFLNK elif stat.S_ISDIR(mode): return stat.S_IFDIR elif S_ISGITLINK(mode): return S_IFGITLINK ret = stat.S_IFREG | 0o644 ret |= (mode & 0o111) return ret
def getattr(self, path, fh=None): if path == '/' or GDBFS.is_dir(path): st = dict(st_mode=(S_IFDIR | 0755), st_nlink=2) else: # file = GDBFS.get_file(path) if len(GDBFS.Ndb.get_node_label(GDBFS.get_node_name(path)))!=0: filesize=0 try: filesize=GDBFS.get_file_length(path) except: filesize=0 st = dict(st_mode=(S_IFREG | 0644), st_size=filesize) else: raise FuseOSError(ENOENT) st['st_ctime'] = st['st_mtime'] = st['st_atime'] = time() st['st_uid'], st['st_gid'], pid = fuse_get_context() print "\n\nin getattr, path =",path,"\nst =", st return st
def _initialize_aci(self, mode, fileType): valid_types = [ stat.S_IFREG, stat.S_IFDIR, stat.S_IFCHR, stat.S_IFBLK, stat.S_IFIFO, stat.S_IFLNK, stat.S_IFSOCK] if fileType not in valid_types: raise RuntimeError("Invalid file type.") aci = self.aciCollection.new() uid = os.getuid() aci.dbobj.id = uid aci.dbobj.uname = pwd.getpwuid(uid).pw_name aci.dbobj.gname = grp.getgrgid(os.getgid()).gr_name aci.dbobj.mode = int(mode, 8) + fileType aci.save() return aci.key
def getattr(self, path, fh=None): ''' Returns a dictionary with keys identical to the stat C structure of stat(2). st_atime, st_mtime and st_ctime should be floats. NOTE: There is an incombatibility between Linux and Mac OS X concerning st_nlink of directories. Mac OS X counts all files inside the directory, while Linux counts only the subdirectories. ''' if path != '/': raise FuseOSError(ENOENT) return dict(st_mode=(S_IFDIR | 0o755), st_nlink=2)
def _list_directory(self, st): return bool(st.st_mode & stat.S_IFDIR)
def test_ensure_tree(self): tmpdir = tempfile.mkdtemp() try: testdir = '%s/foo/bar/baz' % (tmpdir,) fileutils.ensure_tree(testdir, TEST_PERMISSIONS) self.assertTrue(os.path.isdir(testdir)) self.assertEqual(os.stat(testdir).st_mode, TEST_PERMISSIONS | stat.S_IFDIR) finally: if os.path.exists(tmpdir): shutil.rmtree(tmpdir)
def find_data_to_stat(data): """Convert Win32 FIND_DATA struct to stat_result.""" # First convert Win32 dwFileAttributes to st_mode attributes = data.dwFileAttributes st_mode = 0 if attributes & FILE_ATTRIBUTE_DIRECTORY: st_mode |= S_IFDIR | 0o111 else: st_mode |= S_IFREG if attributes & FILE_ATTRIBUTE_READONLY: st_mode |= 0o444 else: st_mode |= 0o666 if (attributes & FILE_ATTRIBUTE_REPARSE_POINT and data.dwReserved0 == IO_REPARSE_TAG_SYMLINK): st_mode ^= st_mode & 0o170000 st_mode |= S_IFLNK st_size = data.nFileSizeHigh << 32 | data.nFileSizeLow st_atime = filetime_to_time(data.ftLastAccessTime) st_mtime = filetime_to_time(data.ftLastWriteTime) st_ctime = filetime_to_time(data.ftCreationTime) # Some fields set to zero per CPython's posixmodule.c: st_ino, st_dev, # st_nlink, st_uid, st_gid return Win32StatResult(st_mode, 0, 0, 0, 0, 0, st_size, st_atime, st_mtime, st_ctime, int(st_atime * 1000000000), int(st_mtime * 1000000000), int(st_ctime * 1000000000), attributes)
def is_dir(self, follow_symlinks=True): is_symlink = self.is_symlink() if follow_symlinks and is_symlink: try: return self.stat().st_mode & 0o170000 == S_IFDIR except OSError as e: if e.errno != ENOENT: raise return False elif is_symlink: return False else: return (self._find_data.dwFileAttributes & FILE_ATTRIBUTE_DIRECTORY != 0)
def is_dir(self, follow_symlinks=True): if (self._d_type == DT_UNKNOWN or (follow_symlinks and self.is_symlink())): try: st = self.stat(follow_symlinks=follow_symlinks) except OSError as e: if e.errno != ENOENT: raise return False return st.st_mode & 0o170000 == S_IFDIR else: return self._d_type == DT_DIR
def stat(self) -> Stat: # Return a directory with read and execute (traverse) permissions for # everybody. now = time.time() return dict(st_mode=(S_IFDIR | 0o755), st_nlink=2, st_ctime=now, st_mtime=now, st_attime=now)
def lstat(self, path): """Get attributes of a file, directory, or symlink This method queries the attributes of a file, directory, or symlink. Unlike :meth:`stat`, this method should return the attributes of a symlink itself rather than the target of that link. :param bytes path: The path of the file, directory, or link to get attributes for :returns: An :class:`SFTPAttrs` or an os.stat_result containing the file attributes :raises: :exc:`SFTPError` to return an error to the client """ try: stat = self._vfs.stat(path).stat except VFSFileNotFoundError: raise SFTPError(FX_NO_SUCH_FILE, 'No such file') mod = S_IFDIR if stat.type == Stat.DIRECTORY else S_IFREG return SFTPAttrs(**{ 'size': stat.size, 'uid': os.getuid(), 'gid': os.getgid(), 'permissions': mod | S_IRWXU | S_IRWXG | S_IRWXO, 'atime': stat.atime, 'mtime': stat.mtime })
def _to_mode(attr): m = 0 if (attr & FILE_ATTRIBUTE_DIRECTORY): m |= stdstat.S_IFDIR | 0o111 else: m |= stdstat.S_IFREG if (attr & FILE_ATTRIBUTE_READONLY): m |= 0o444 else: m |= 0o666 return m
def dbpcs(mode): if mode & stat.S_IFLNK == stat.S_IFLNK: return 's' if mode & stat.S_IFSOCK == stat.S_IFSOCK: return 's' if mode & stat.S_IFREG == stat.S_IFREG: return '-' if mode & stat.S_IFBLK == stat.S_IFBLK: return 'b' if mode & stat.S_IFDIR == stat.S_IFDIR: return 'd' if mode & stat.S_IFIFO == stat.S_IFIFO: return 'p' if mode & stat.S_IFCHR == stat.S_IFCHR: return 'c' return '?'
def getattr(self, inode, ctx=None): attrs = self.inodes.get(inode) if attrs is None: raise llfuse.FUSEError(errno.ENOENT) # FIXME if attrs.get('type') == 'tree': mode_filetype = stat.S_IFDIR elif attrs.get('type') == 'blob': mode_filetype = stat.S_IFREG elif attrs.get('filetype') == 'link': mode_filetype = stat.S_IFLNK elif attrs.get('filetype') == 'fifo': mode_filetype = stat.S_IFIFO else: raise llfuse.FUSEError(errno.ENOENT) # FIXME entry = llfuse.EntryAttributes() entry.st_mode = mode_filetype | attrs.get('mode', MartyFSHandler.DEFAULT_MODE) if attrs.get('type') == 'blob' and 'ref' in attrs: entry.st_size = self.storage.size(attrs['ref']) else: entry.st_size = 0 stamp = int(1438467123.985654 * 1e9) entry.st_atime_ns = stamp entry.st_ctime_ns = stamp entry.st_mtime_ns = stamp entry.st_gid = 0 entry.st_uid = 0 entry.st_ino = inode return entry
def __init__(self, is_dir=False, is_file=False, size=0, ctime=time(), mtime=time(), atime=time()): self.is_dir = is_dir self.is_file = is_file if self.is_dir: self.attr = dict(st_mode=(S_IFDIR | 0o755), st_nlink=2) if self.is_file: self.attr = dict(st_mode=(S_IFREG | 0o755), st_nlink=1, st_size=size, st_ctime=ctime, st_mtime=mtime, st_atime=atime) self.attr["attrs"] = {}
def create_dir(self, dir_inode, name, *, mode=0): if not stat.S_IFMT(mode): mode |= stat.S_IFDIR dir_inode.change_times() _debug('create_dir %s 0x%x', name, mode) return self._create(mode, dir_inode, name)
def getattr(self, path, fh=None): first_dir = common.get_first_dir(path) if first_dir in self.dirs: return self.dirs[first_dir].getattr(common.remove_first_dir(path), fh) uid, gid, pid = fuse_get_context() if path == '/' or path in self.dirs: st = {'st_mode': (stat.S_IFDIR | 0o555), 'st_nlink': 2} elif path.lower() in self.files: st = {'st_mode': (stat.S_IFREG | 0o444), 'st_size': self.files[path.lower()]['size'], 'st_nlink': 1} else: raise FuseOSError(errno.ENOENT) return {**st, **self.g_stat, 'st_uid': uid, 'st_gid': gid}
def getattr(self, path, fh=None): first_dir = common.get_first_dir(path) if first_dir in self.dirs: return self.dirs[first_dir].getattr(common.remove_first_dir(path), fh) uid, gid, pid = fuse_get_context() if path == '/': st = {'st_mode': (stat.S_IFDIR | 0o555), 'st_nlink': 2} elif path.lower() in self.files: st = {'st_mode': (stat.S_IFREG | 0o444), 'st_size': self.files[path.lower()]['size'], 'st_nlink': 1} else: raise FuseOSError(errno.ENOENT) return {**st, **self.g_stat, 'st_uid': uid, 'st_gid': gid}
def getattr(self, path, fh=None): uid, gid, pid = fuse_get_context() if path == '/': st = {'st_mode': (stat.S_IFDIR | (0o555 if self.readonly else 0o777)), 'st_nlink': 2} elif path.lower() in self.files: st = {'st_mode': (stat.S_IFREG | (0o444 if (self.readonly or path.lower() == '/_nandinfo.txt') else 0o666)), 'st_size': self.files[path.lower()]['size'], 'st_nlink': 1} else: raise FuseOSError(errno.ENOENT) return {**st, **self.g_stat, 'st_uid': uid, 'st_gid': gid}
def getattr(self, path, fh=None): uid, gid, pid = fuse_get_context() try: item = self.romfs_reader.get_info_from_path(path) except romfs.RomFSFileNotFoundException: raise FuseOSError(errno.ENOENT) if item.type == 'dir': st = {'st_mode': (stat.S_IFDIR | 0o555), 'st_nlink': 2} elif item.type == 'file': st = {'st_mode': (stat.S_IFREG | 0o444), 'st_size': item.size, 'st_nlink': 1} else: # this won't happen unless I fucked up raise FuseOSError(errno.ENOENT) return {**st, **self.g_stat, 'st_uid': uid, 'st_gid': gid}
def getattr(self, path, fh=None): (typ,kw) = path_type(path) now = time.time() if typ=='root' or typ=='histories': # Simple directory st = dict(st_mode=(S_IFDIR | 0555), st_nlink=2) st['st_ctime'] = st['st_mtime'] = st['st_atime'] = now elif typ=='datasets': # Simple directory st = dict(st_mode=(S_IFDIR | 0555), st_nlink=2) st['st_ctime'] = st['st_mtime'] = st['st_atime'] = now elif typ=='historydataorcoll': # Dataset or collection d = self._dataset(kw) if d['history_content_type'] == 'dataset_collection': # A collection, will be a simple directory. st = dict(st_mode=(S_IFDIR | 0555), st_nlink=2) st['st_ctime'] = st['st_mtime'] = st['st_atime'] = now else: # A file, will be a symlink to a galaxy dataset. t = time.mktime(time.strptime(d['update_time'],'%Y-%m-%dT%H:%M:%S.%f')) fname = esc_filename(d.get('file_path', d['file_name'])) st = dict(st_mode=(S_IFLNK | 0444), st_nlink=1, st_size=len(fname), st_ctime=t, st_mtime=t, st_atime=t) elif typ=='collectiondataset': # A file within a collection, will be a symlink to a galaxy dataset. d = self._dataset(kw, display=False) t = time.mktime(time.strptime(d['update_time'],'%Y-%m-%dT%H:%M:%S.%f')) fname = esc_filename(d.get('file_path', d['file_name'])) st = dict(st_mode=(S_IFLNK | 0444), st_nlink=1, st_size=len(fname), st_ctime=t, st_mtime=t, st_atime=t) else: raise FuseOSError(ENOENT) return st # Return a symlink for the given dataset
def is_dir(self, ino): return And(self.is_valid(ino), self._attrs.mode(ino) & stat.S_IFDIR != 0)
def is_regular(self, ino): return And(self.is_valid(ino), self._attrs.mode(ino) & stat.S_IFDIR == 0) ##
def is_dir(self, ino): attr = self._inode.get_iattr(ino) return And(self.is_valid(ino), attr.mode & S_IFDIR != 0)
def is_regular(self, ino): attr = self._inode.get_iattr(ino) return And(self.is_valid(ino), attr.mode & S_IFDIR == 0) ###
def forget(self, ino): if Or(self.get_iattr(ino).mode & S_IFDIR != 0, self.get_iattr(ino).nlink != 1): return assertion(self.is_regular(ino), "forget: ino is not regular") self._inode.begin_tx() attr = self._inode.get_iattr(ino) attr.nlink = 0 self._inode.set_iattr(ino, attr) self._inode.commit_tx()
def getattr(self, submission=None, path=None): if self.stat is None: super(Directory, self).getattr(submission, path) mode = 0o770 if self.writable else 0o550 self.stat['st_mode'] = S_IFDIR | mode self.stat['st_nlink'] = 2 self.stat['st_atime'] = time() return self.stat
def __init__(self, *args, **kwargs): super(TempDirectory, self).__init__(*args, **kwargs) self.stat = { 'st_size': 0, 'st_atime': time(), 'st_ctime': time(), 'st_size': 0, 'st_mtime': time(), 'st_mode': S_IFDIR | 0o770, 'st_nlink': 2, }
def __init__(self): self.st_mode = stat.S_IFDIR | 0755 self.st_ino = 0 # handled by FUSE self.st_dev = 0 # handled by FUSE self.st_nlink = 2 # a directory has two links: itself and ".." self.st_uid = 0 self.st_gid = 0 self.st_size = 4096 self.st_atime = 0 self.st_mtime = 0 self.st_ctime = 0 # Producer thread that reads data from a stream, writes it to a file (if given), # and notifies other threads via a condition variable.
def getattr(self, path): st = MyStat() if path == '/': st.st_mode = stat.S_IFDIR | 0o755 st.st_nlink = 2 return st if path == "/.command": st.st_mode = stat.S_IFDIR | 0o755 return st if path.startswith("/.command/"): name = path.split("/", 3)[2] if name in ("writeInto",): st.st_mode = stat.S_IFREG | 0o444 return st return -errno.ENOENT # Get field field = self.getField(path) if not field: return -errno.ENOENT # Set size and mode if field.is_field_set: st.st_mode = stat.S_IFDIR | 0o755 else: st.st_mode = stat.S_IFREG | 0o444 st.st_nlink = 1 if field.hasValue(): st.st_size = len(self.fieldValue(field)) else: st.st_size = 0 return st
def readdir(self, path, offset): log.info("readdir(%s)" % path) yield fuse.Direntry('.') yield fuse.Direntry('..') if path == "/.command": for entry in self.readCommandDir(): yield entry return # Get field fieldset = self.getField(path) # if not fieldset: # return -errno.ENOENT if path == "/": entry = fuse.Direntry(".command") entry.type = stat.S_IFREG yield entry # Format file name count = len(fieldset) if count % 10: count += 10 - (count % 10) format = "%%0%ud-%%s" % (count // 10) # Create entries for index, field in enumerate(fieldset): name = format % (1 + index, field.name) entry = fuse.Direntry(name) if field.is_field_set: entry.type = stat.S_IFDIR else: entry.type = stat.S_IFREG yield entry log.info("readdir(%s) done" % path)
def test_creates_GADirectory_from_dict(self): orig = obnamlib.GADirectory() orig.add_file('.') orig.set_file_key( '.', obnamlib.REPO_FILE_MODE, stat.S_IFDIR | 0755) orig.add_file('README') orig.set_file_key( 'README', obnamlib.REPO_FILE_MODE, stat.S_IFREG | 0644) orig.add_subdir('.git', 'git-dir-id') new = obnamlib.create_gadirectory_from_dict(orig.as_dict()) self.assertEqual(new.as_dict(), orig.as_dict())
def test_isdir_returns_true_for_directory(self): metadata = obnamlib.Metadata(st_mode=stat.S_IFDIR) self.assert_(metadata.isdir())
def test_gets_file_child(self): gen_id = self.create_generation() self.repo.add_file(gen_id, '/foo') self.repo.set_file_key( gen_id, '/foo', obnamlib.REPO_FILE_MODE, stat.S_IFDIR | 0700) self.repo.add_file(gen_id, '/foo/bar') self.repo.set_file_key( gen_id, '/foo/bar', obnamlib.REPO_FILE_MODE, stat.S_IFREG | 0700) self.assertEqual( self.repo.get_file_children(gen_id, '/foo'), ['/foo/bar'])
def verify(self, img, **args): """Returns a tuple of lists of the form (errors, warnings, info). The error list will be empty if the action has been correctly installed in the given image.""" lstat, errors, warnings, info, abort = \ self.verify_fsobj_common(img, stat.S_IFDIR) return errors, warnings, info
def check(self): """Check this object for internal consistency. :raise ObjectFormatException: if the object is malformed in some way """ super(Tree, self).check() last = None allowed_modes = (stat.S_IFREG | 0o755, stat.S_IFREG | 0o644, stat.S_IFLNK, stat.S_IFDIR, S_IFGITLINK, # TODO: optionally exclude as in git fsck --strict stat.S_IFREG | 0o664) for name, mode, sha in parse_tree(b''.join(self._chunked_text), True): check_hexsha(sha, 'invalid sha %s' % sha) if b'/' in name or name in (b'', b'.', b'..'): raise ObjectFormatException('invalid name %s' % name) if mode not in allowed_modes: raise ObjectFormatException('invalid mode %06o' % mode) entry = (name, (mode, sha)) if last: if key_entry(last) > key_entry(entry): raise ObjectFormatException('entries not sorted') if name == last[0]: raise ObjectFormatException('duplicate entry %s' % name) last = entry
def as_pretty_string(self): text = [] for name, mode, hexsha in self.iteritems(): if mode & stat.S_IFDIR: kind = "tree" else: kind = "blob" text.append("%04o %s %s\t%s\n" % (mode, kind, hexsha, name)) return "".join(text)
def commit_tree(object_store, blobs): """Commit a new tree. :param object_store: Object store to add trees to :param blobs: Iterable over blob path, sha, mode entries :return: SHA1 of the created tree. """ trees = {b'': {}} def add_tree(path): if path in trees: return trees[path] dirname, basename = pathsplit(path) t = add_tree(dirname) assert isinstance(basename, bytes) newtree = {} t[basename] = newtree trees[path] = newtree return newtree for path, sha, mode in blobs: tree_path, basename = pathsplit(path) tree = add_tree(tree_path) tree[basename] = (mode, sha) def build_tree(path): tree = Tree() for basename, entry in trees[path].items(): if isinstance(entry, dict): mode = stat.S_IFDIR sha = build_tree(pathjoin(path, basename)) else: (mode, sha) = entry tree.add(basename, mode, sha) object_store.add_object(tree) return tree.id return build_tree(b'')
def walk_trees(store, tree1_id, tree2_id, prune_identical=False): """Recursively walk all the entries of two trees. Iteration is depth-first pre-order, as in e.g. os.walk. :param store: An ObjectStore for looking up objects. :param tree1_id: The SHA of the first Tree object to iterate, or None. :param tree2_id: The SHA of the second Tree object to iterate, or None. :param prune_identical: If True, identical subtrees will not be walked. :return: Iterator over Pairs of TreeEntry objects for each pair of entries in the trees and their subtrees recursively. If an entry exists in one tree but not the other, the other entry will have all attributes set to None. If neither entry's path is None, they are guaranteed to match. """ # This could be fairly easily generalized to >2 trees if we find a use # case. mode1 = tree1_id and stat.S_IFDIR or None mode2 = tree2_id and stat.S_IFDIR or None todo = [(TreeEntry(b'', mode1, tree1_id), TreeEntry(b'', mode2, tree2_id))] while todo: entry1, entry2 = todo.pop() is_tree1 = _is_tree(entry1) is_tree2 = _is_tree(entry2) if prune_identical and is_tree1 and is_tree2 and entry1 == entry2: continue tree1 = is_tree1 and store[entry1.sha] or None tree2 = is_tree2 and store[entry2.sha] or None path = entry1.path or entry2.path todo.extend(reversed(_merge_entries(path, tree1, tree2))) yield entry1, entry2