Python stat 模块,S_IFREG 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用stat.S_IFREG

项目:kit-ilias-fuse    作者:brantsch    | 项目源码 | 文件源码
def getattr(self, path, fh=None):
        try:
            node = self.__path_to_object(path)
            if node:
                is_file = (type(node) == File)
                st_mode_ft_bits = stat.S_IFREG if is_file else stat.S_IFDIR
                st_mode_permissions = 0o444 if is_file else 0o555
                return {
                    'st_mode': st_mode_ft_bits | st_mode_permissions,
                    'st_uid': self.uid,
                    'st_gid': self.gid,
                    'st_size': node.size if is_file else 0,
                    'st_ctime': node.time if is_file else 0,
                    'st_mtime': node.time if is_file else 0
                }
            else:
                raise FuseOSError(ENOENT)
        except IliasFSError as e:
            raise_ilias_error_to_fuse(e)
项目:differ    作者:MadRussian    | 项目源码 | 文件源码
def get_file_type(path):
  """Retrieve the file type of the path

  :param path: The path to get the file type for
  :return: The file type as a string or None on error
  """
  f_types = {
    'socket':           stat.S_IFSOCK,
    'regular':          stat.S_IFREG,
    'block':            stat.S_IFBLK,
    'directory':        stat.S_IFDIR,
    'character_device': stat.S_IFCHR,
    'fifo':             stat.S_IFIFO,
  }
  if not path or not os.path.exists(path):
    return None

  obj = os.stat(path).st_mode
  for key,val in f_types.items():
    if obj & val == val:
      return key
项目:dlna_live_streaming    作者:mfoetsch    | 项目源码 | 文件源码
def getattr(self, path):
        print "getattr", path
        st = MyStat()
        st.st_atime = int(time.time())
        st.st_mtime = st.st_atime
        st.st_ctime = st.st_atime
        pe = path.split(os.path.sep)[1:]
        if path == "/":         # root of the FUSE filesystem
            pass
        elif pe[-1] in ["a", "b", "c"]: # first level is directories
            pass
        elif len(pe) == 2 and pe[-1] == "fuse_live.mkv":
            st.st_mode = stat.S_IFREG | 0666
            st.st_nlink = 1
            st.st_size = 1024**3 #self.fileSize
        else:
            return -errno.ENOENT
        return st
项目:rvmi-rekall    作者:fireeye    | 项目源码 | 文件源码
def getattr(self, path):
        # The path represents a pid.
        components = os.path.split(path)
        if len(components) > 2:
            return

        if len(components) == 2 and components[1] in self.tasks:
            s = make_stat(components[1])
            s.st_mode = stat.S_IFREG
            s.st_size = self.address_space_size

            return s

        elif components[0] == "/":
            s = make_stat(2)
            s.st_mode = stat.S_IFDIR

            return s
项目:obnam    作者:obnam-mirror    | 项目源码 | 文件源码
def test_committing_remembers_file_removal(self):
        gen_id = self.create_generation()
        self.repo.add_file(gen_id, '/foo/bar')
        self.repo.set_file_key(
            gen_id, '/foo/bar', obnamlib.REPO_FILE_MODE, stat.S_IFREG)
        self.repo.commit_client('fooclient')
        self.repo.unlock_client('fooclient')

        self.repo.lock_client('fooclient')
        gen_id_2 = self.repo.create_generation('fooclient')
        self.assertTrue(self.repo.file_exists(gen_id_2, '/foo/bar'))
        self.repo.remove_file(gen_id_2, '/foo/bar')
        self.repo.commit_client('fooclient')
        self.repo.unlock_client('fooclient')

        self.assertTrue(self.repo.file_exists(gen_id, '/foo/bar'))
        self.assertFalse(self.repo.file_exists(gen_id_2, '/foo/bar'))
项目:obnam    作者:obnam-mirror    | 项目源码 | 文件源码
def test_unlocking_client_forgets_modified_file_chunk_ids(self):
        gen_id = self.create_generation()
        self.repo.add_file(gen_id, '/foo/bar')
        self.repo.set_file_key(
            gen_id, '/foo/bar', obnamlib.REPO_FILE_MODE, stat.S_IFREG)
        self.repo.append_file_chunk_id(gen_id, '/foo/bar', 1)
        self.repo.commit_client('fooclient')
        self.repo.unlock_client('fooclient')

        self.repo.lock_client('fooclient')
        gen_id_2 = self.repo.create_generation('fooclient')
        self.repo.append_file_chunk_id(gen_id_2, '/foo/bar', 2)
        self.assertEqual(
            self.repo.get_file_chunk_ids(gen_id_2, '/foo/bar'),
            [1, 2])

        self.repo.unlock_client('fooclient')
        self.assertEqual(
            self.repo.get_file_chunk_ids(gen_id, '/foo/bar'),
            [1])
项目:obnam    作者:obnam-mirror    | 项目源码 | 文件源码
def test_committing_child_remembers_modified_file_chunk_ids(self):
        gen_id = self.create_generation()
        self.repo.add_file(gen_id, '/foo/bar')
        self.repo.set_file_key(
            gen_id, '/foo/bar', obnamlib.REPO_FILE_MODE, stat.S_IFREG)
        self.repo.append_file_chunk_id(gen_id, '/foo/bar', 1)
        self.repo.commit_client('fooclient')
        self.repo.unlock_client('fooclient')

        self.repo.lock_client('fooclient')
        gen_id_2 = self.repo.create_generation('fooclient')
        self.repo.append_file_chunk_id(gen_id_2, '/foo/bar', 2)
        self.assertEqual(
            self.repo.get_file_chunk_ids(gen_id_2, '/foo/bar'),
            [1, 2])

        self.repo.commit_client('fooclient')
        self.repo.unlock_client('fooclient')
        self.assertEqual(
            self.repo.get_file_chunk_ids(gen_id, '/foo/bar'),
            [1])
        self.assertEqual(
            self.repo.get_file_chunk_ids(gen_id_2, '/foo/bar'),
            [1, 2])
项目:obnam    作者:obnam-mirror    | 项目源码 | 文件源码
def setUp(self):
        self.now = None
        self.tempdir = tempfile.mkdtemp()
        fs = obnamlib.LocalFS(self.tempdir)
        self.hooks = obnamlib.HookManager()
        self.hooks.new('repository-toplevel-init')
        self.client = obnamlib.ClientMetadataTree(
            fs, 'clientid', obnamlib.DEFAULT_NODE_SIZE,
            obnamlib.DEFAULT_UPLOAD_QUEUE_SIZE, obnamlib.DEFAULT_LRU_SIZE,
            self)
        # Force use of filename hash collisions.
        self.client.default_file_id = self.client._bad_default_file_id
        self.client.start_generation()
        self.clientid = self.client.get_generation_id(self.client.tree)
        self.file_metadata = obnamlib.Metadata(st_mode=stat.S_IFREG | 0666)
        self.file_encoded = obnamlib.fmt_6.metadata_codec.encode_metadata(
            self.file_metadata)
        self.dir_metadata = obnamlib.Metadata(st_mode=stat.S_IFDIR | 0777)
        self.dir_encoded = obnamlib.fmt_6.metadata_codec.encode_metadata(
            self.dir_metadata)
项目:starfuse    作者:Qix-    | 项目源码 | 文件源码
def make_file_struct(self, size, isfile=True, ctime=time(), mtime=time(), atime=time(), read_only=False):
        stats = dict()
        # TODO replace uncommented modes with commented when write ability is added
        if isfile:
            stats['st_mode'] = S_IFREG | 0o0444
        else:
            stats['st_mode'] = S_IFDIR | 0o0555

        if not self.pakfile.read_only:
            stats['st_mode'] |= 0o0200

        stats['st_uid'] = os.getuid()
        stats['st_gid'] = os.getgid()
        stats['st_nlink'] = 1
        stats['st_ctime'] = ctime
        stats['st_mtime'] = mtime
        stats['st_atime'] = atime
        stats['st_size'] = size
        return stats
项目:maas    作者:maas    | 项目源码 | 文件源码
def test_atomic_write_preserves_ownership_before_moving_into_place(self):
        atomic_file = self.make_file('atomic')

        self.patch(fs_module, 'isfile').return_value = True
        self.patch(fs_module, 'chown')
        self.patch(fs_module, 'rename')
        self.patch(fs_module, 'stat')

        ret_stat = fs_module.stat.return_value
        ret_stat.st_uid = sentinel.uid
        ret_stat.st_gid = sentinel.gid
        ret_stat.st_mode = stat.S_IFREG

        atomic_write(factory.make_bytes(), atomic_file)

        self.assertThat(fs_module.stat, MockCalledOnceWith(atomic_file))
        self.assertThat(fs_module.chown, MockCalledOnceWith(
            ANY, sentinel.uid, sentinel.gid))
项目:qgis_resources_sharing    作者:akbargumbira    | 项目源码 | 文件源码
def cleanup_mode(mode):
    """Cleanup a mode value.

    This will return a mode that can be stored in a tree object.

    :param mode: Mode to clean up.
    """
    if stat.S_ISLNK(mode):
        return stat.S_IFLNK
    elif stat.S_ISDIR(mode):
        return stat.S_IFDIR
    elif S_ISGITLINK(mode):
        return S_IFGITLINK
    ret = stat.S_IFREG | 0o644
    ret |= (mode & 0o111)
    return ret
项目:GDBFS    作者:USTC-OS-group33    | 项目源码 | 文件源码
def getattr(self, path, fh=None):
        if path == '/' or GDBFS.is_dir(path):
            st = dict(st_mode=(S_IFDIR | 0755), st_nlink=2)
        else:
            # file = GDBFS.get_file(path)
            if len(GDBFS.Ndb.get_node_label(GDBFS.get_node_name(path)))!=0:
                filesize=0
                try:
                    filesize=GDBFS.get_file_length(path)
                except:
                    filesize=0

                st = dict(st_mode=(S_IFREG | 0644), st_size=filesize)
            else:
                raise FuseOSError(ENOENT)
        st['st_ctime'] = st['st_mtime'] = st['st_atime'] = time()
        st['st_uid'], st['st_gid'], pid = fuse_get_context()
        print "\n\nin getattr, path =",path,"\nst =", st
        return st
项目:lib9    作者:Jumpscale    | 项目源码 | 文件源码
def addFile(self, parent_path, name, size=0, mode="644"):
        _, parentObj = self._search_db(parent_path)
        if parentObj.dbobj.state != "":
            raise RuntimeError("%s: No such file or directory" % parent_path)

        aciKey = self._initialize_aci(mode, stat.S_IFREG)
        fileDict = {
            "aclkey": aciKey,
            "blocksize": size,
            "creationTime": calendar.timegm(time.gmtime()),
            "modificationTime": calendar.timegm(time.gmtime()),
            "name": name,
            "size": size,
        }
        self._addObj(parentObj, fileDict, "F")
        parentObj.save()
项目:lib9    作者:Jumpscale    | 项目源码 | 文件源码
def _initialize_aci(self, mode, fileType):
        valid_types = [
            stat.S_IFREG,
            stat.S_IFDIR,
            stat.S_IFCHR,
            stat.S_IFBLK,
            stat.S_IFIFO,
            stat.S_IFLNK,
            stat.S_IFSOCK]
        if fileType not in valid_types:
            raise RuntimeError("Invalid file type.")

        aci = self.aciCollection.new()
        uid = os.getuid()
        aci.dbobj.id = uid
        aci.dbobj.uname = pwd.getpwuid(uid).pw_name
        aci.dbobj.gname = grp.getgrgid(os.getgid()).gr_name
        aci.dbobj.mode = int(mode, 8) + fileType
        aci.save()
        return aci.key
项目:tm-librarian    作者:FabricAttachedMemory    | 项目源码 | 文件源码
def create_shelf(self, shelf):
        """ Create one new shelf in the database.
            Input---
              shelf_data - list of shelf data to insert
            Output---
              shelf_data or error message
        """
        shelf.id = None     # DB engine will autochoose next id
        if not shelf.mode:
            shelf.mode = stat.S_IFREG + 0o666
        tmp = int(time.time())
        shelf.ctime = shelf.mtime = tmp
        if shelf.parent_id == 0:
            shelf.parent_id = 2  # is now a default for if it given to go in root
        shelf.id = self._cur.INSERT('shelves', shelf.tuple())
        return shelf
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def cmp(f1, f2, shallow=1):
    """Compare two files.

    Arguments:

    f1 -- First file name

    f2 -- Second file name

    shallow -- Just check stat signature (do not read the files).
               defaults to 1.

    Return value:

    True if the files are the same, False otherwise.

    This function uses a cache for past comparisons and the results,
    with a cache invalidation mechanism relying on stale signatures.

    """

    s1 = _sig(os.stat(f1))
    s2 = _sig(os.stat(f2))
    if s1[0] != stat.S_IFREG or s2[0] != stat.S_IFREG:
        return False
    if shallow and s1 == s2:
        return True
    if s1[1] != s2[1]:
        return False

    result = _cache.get((f1, f2))
    if result and (s1, s2) == result[:2]:
        return result[2]
    outcome = _do_cmp(f1, f2)
    _cache[f1, f2] = s1, s2, outcome
    return outcome
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def cmp(f1, f2, shallow=1):
    """Compare two files.

    Arguments:

    f1 -- First file name

    f2 -- Second file name

    shallow -- Just check stat signature (do not read the files).
               defaults to 1.

    Return value:

    True if the files are the same, False otherwise.

    This function uses a cache for past comparisons and the results,
    with a cache invalidation mechanism relying on stale signatures.

    """

    s1 = _sig(os.stat(f1))
    s2 = _sig(os.stat(f2))
    if s1[0] != stat.S_IFREG or s2[0] != stat.S_IFREG:
        return False
    if shallow and s1 == s2:
        return True
    if s1[1] != s2[1]:
        return False

    outcome = _cache.get((f1, f2, s1, s2))
    if outcome is None:
        outcome = _do_cmp(f1, f2)
        if len(_cache) > 100:      # limit the maximum size of the cache
            _cache.clear()
        _cache[f1, f2, s1, s2] = outcome
    return outcome
项目:weibo    作者:windskyer    | 项目源码 | 文件源码
def _setup_logging_from_conf(product_name):
    log_root = getLogger(None).logger
    for handler in log_root.handlers:
        log_root.removeHandler(handler)

    if CONF.use_syslog:
        facility = _find_facility_from_conf()
        syslog = logging.handlers.SysLogHandler(address='/dev/log',
                                                facility=facility)
        log_root.addHandler(syslog)

    logpath = _get_log_file_path()
    if logpath:
        filelog = logging.handlers.WatchedFileHandler(logpath)
        log_root.addHandler(filelog)

        mode = int('0644', 8)
        st = os.stat(logpath)
        if st.st_mode != (stat.S_IFREG | mode):
            os.chmod(logpath, mode)

    if CONF.use_stderr:
        streamlog = ColorHandler()
        log_root.addHandler(streamlog)
    elif not CONF.log_file:
        # pass sys.stdout as a positional argument
        # python2.6 calls the argument strm, in 2.7 it's stream
        streamlog = logging.StreamHandler(sys.stdout)
        log_root.addHandler(streamlog)

    for handler in log_root.handlers:
        datefmt = CONF.log_date_format
        if CONF.log_format:
            handler.setFormatter(logging.Formatter(fmt=CONF.log_format,
                                                   datefmt=datefmt))
        handler.setFormatter(LegacyFormatter(datefmt=datefmt))

    if CONF.verbose or CONF.debug:
        log_root.setLevel(logging.DEBUG)
    else:
        log_root.setLevel(logging.INFO)
项目:blacklib    作者:halfss    | 项目源码 | 文件源码
def _setup_logging_from_flags():
    ops_root = getLogger().logger
    for handler in ops_root.handlers:
        ops_root.removeHandler(handler)

    logpath = _get_log_file_path()
    if logpath:
        filelog = logging.handlers.WatchedFileHandler(logpath)
        ops_root.addHandler(filelog)

        mode = int(options.logfile_mode, 8)
        st = os.stat(logpath)
        if st.st_mode != (stat.S_IFREG | mode):
            os.chmod(logpath, mode)

    for handler in ops_root.handlers:
        handler.setFormatter(logging.Formatter(fmt=options.log_format,
            datefmt=options.log_date_format))

    if options.verbose or options.debug:
        ops_root.setLevel(logging.DEBUG)
    else:
        ops_root.setLevel(logging.INFO)

    root = logging.getLogger()
    for handler in root.handlers:
        root.removeHandler(handler)
    handler = NullHandler()
    handler.setFormatter(logging.Formatter())
    root.addHandler(handler)
项目:blobfs-poc    作者:eddieantonio    | 项目源码 | 文件源码
def stat(self) -> Stat:
        # Return a regular file with read permissions to everybody and ZERO
        # length!
        now = time.time()
        return dict(st_mode=(S_IFREG | 0o644),
                    st_nlink=1,
                    st_ctime=now,
                    st_mtime=now,
                    st_attime=now)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def cmp(f1, f2, shallow=True):
    """Compare two files.

    Arguments:

    f1 -- First file name

    f2 -- Second file name

    shallow -- Just check stat signature (do not read the files).
               defaults to 1.

    Return value:

    True if the files are the same, False otherwise.

    This function uses a cache for past comparisons and the results,
    with a cache invalidation mechanism relying on stale signatures.

    """

    s1 = _sig(os.stat(f1))
    s2 = _sig(os.stat(f2))
    if s1[0] != stat.S_IFREG or s2[0] != stat.S_IFREG:
        return False
    if shallow and s1 == s2:
        return True
    if s1[1] != s2[1]:
        return False

    outcome = _cache.get((f1, f2, s1, s2))
    if outcome is None:
        outcome = _do_cmp(f1, f2)
        if len(_cache) > 100:      # limit the maximum size of the cache
            _cache.clear()
        _cache[f1, f2, s1, s2] = outcome
    return outcome
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_mode(self):
        with open(TESTFN, 'w'):
            pass
        if os.name == 'posix':
            os.chmod(TESTFN, 0o700)
            st_mode = self.get_mode()
            self.assertS_IS("REG", st_mode)
            self.assertEqual(stat.S_IMODE(st_mode),
                             stat.S_IRWXU)

            os.chmod(TESTFN, 0o070)
            st_mode = self.get_mode()
            self.assertS_IS("REG", st_mode)
            self.assertEqual(stat.S_IMODE(st_mode),
                             stat.S_IRWXG)

            os.chmod(TESTFN, 0o007)
            st_mode = self.get_mode()
            self.assertS_IS("REG", st_mode)
            self.assertEqual(stat.S_IMODE(st_mode),
                             stat.S_IRWXO)

            os.chmod(TESTFN, 0o444)
            st_mode = self.get_mode()
            self.assertS_IS("REG", st_mode)
            self.assertEqual(stat.S_IMODE(st_mode), 0o444)
        else:
            os.chmod(TESTFN, 0o700)
            st_mode = self.get_mode()
            self.assertS_IS("REG", st_mode)
            self.assertEqual(stat.S_IFMT(st_mode),
                             stat.S_IFREG)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def cmp(f1, f2, shallow=1):
    """Compare two files.

    Arguments:

    f1 -- First file name

    f2 -- Second file name

    shallow -- Just check stat signature (do not read the files).
               defaults to 1.

    Return value:

    True if the files are the same, False otherwise.

    This function uses a cache for past comparisons and the results,
    with a cache invalidation mechanism relying on stale signatures.

    """

    s1 = _sig(os.stat(f1))
    s2 = _sig(os.stat(f2))
    if s1[0] != stat.S_IFREG or s2[0] != stat.S_IFREG:
        return False
    if shallow and s1 == s2:
        return True
    if s1[1] != s2[1]:
        return False

    outcome = _cache.get((f1, f2, s1, s2))
    if outcome is None:
        outcome = _do_cmp(f1, f2)
        if len(_cache) > 100:      # limit the maximum size of the cache
            _cache.clear()
        _cache[f1, f2, s1, s2] = outcome
    return outcome
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_mode(self):
        with open(TESTFN, 'w'):
            pass
        if os.name == 'posix':
            os.chmod(TESTFN, 0o700)
            st_mode = self.get_mode()
            self.assertS_IS("REG", st_mode)
            self.assertEqual(stat.S_IMODE(st_mode),
                             stat.S_IRWXU)

            os.chmod(TESTFN, 0o070)
            st_mode = self.get_mode()
            self.assertS_IS("REG", st_mode)
            self.assertEqual(stat.S_IMODE(st_mode),
                             stat.S_IRWXG)

            os.chmod(TESTFN, 0o007)
            st_mode = self.get_mode()
            self.assertS_IS("REG", st_mode)
            self.assertEqual(stat.S_IMODE(st_mode),
                             stat.S_IRWXO)

            os.chmod(TESTFN, 0o444)
            st_mode = self.get_mode()
            self.assertS_IS("REG", st_mode)
            self.assertEqual(stat.S_IMODE(st_mode), 0o444)
        else:
            os.chmod(TESTFN, 0o700)
            st_mode = self.get_mode()
            self.assertS_IS("REG", st_mode)
            self.assertEqual(stat.S_IFMT(st_mode),
                             stat.S_IFREG)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def cmp(f1, f2, shallow=1):
    """Compare two files.

    Arguments:

    f1 -- First file name

    f2 -- Second file name

    shallow -- Just check stat signature (do not read the files).
               defaults to 1.

    Return value:

    True if the files are the same, False otherwise.

    This function uses a cache for past comparisons and the results,
    with a cache invalidation mechanism relying on stale signatures.

    """

    s1 = _sig(os.stat(f1))
    s2 = _sig(os.stat(f2))
    if s1[0] != stat.S_IFREG or s2[0] != stat.S_IFREG:
        return False
    if shallow and s1 == s2:
        return True
    if s1[1] != s2[1]:
        return False

    outcome = _cache.get((f1, f2, s1, s2))
    if outcome is None:
        outcome = _do_cmp(f1, f2)
        if len(_cache) > 100:      # limit the maximum size of the cache
            _cache.clear()
        _cache[f1, f2, s1, s2] = outcome
    return outcome
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def cmp(f1, f2, shallow=1):
    """Compare two files.

    Arguments:

    f1 -- First file name

    f2 -- Second file name

    shallow -- Just check stat signature (do not read the files).
               defaults to 1.

    Return value:

    True if the files are the same, False otherwise.

    This function uses a cache for past comparisons and the results,
    with a cache invalidation mechanism relying on stale signatures.

    """

    s1 = _sig(os.stat(f1))
    s2 = _sig(os.stat(f2))
    if s1[0] != stat.S_IFREG or s2[0] != stat.S_IFREG:
        return False
    if shallow and s1 == s2:
        return True
    if s1[1] != s2[1]:
        return False

    outcome = _cache.get((f1, f2, s1, s2))
    if outcome is None:
        outcome = _do_cmp(f1, f2)
        if len(_cache) > 100:      # limit the maximum size of the cache
            _cache.clear()
        _cache[f1, f2, s1, s2] = outcome
    return outcome
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_mode(self):
        with open(TESTFN, 'w'):
            pass
        if os.name == 'posix':
            os.chmod(TESTFN, 0o700)
            st_mode, modestr = self.get_mode()
            self.assertEqual(modestr, '-rwx------')
            self.assertS_IS("REG", st_mode)
            self.assertEqual(stat.S_IMODE(st_mode),
                             stat.S_IRWXU)

            os.chmod(TESTFN, 0o070)
            st_mode, modestr = self.get_mode()
            self.assertEqual(modestr, '----rwx---')
            self.assertS_IS("REG", st_mode)
            self.assertEqual(stat.S_IMODE(st_mode),
                             stat.S_IRWXG)

            os.chmod(TESTFN, 0o007)
            st_mode, modestr = self.get_mode()
            self.assertEqual(modestr, '-------rwx')
            self.assertS_IS("REG", st_mode)
            self.assertEqual(stat.S_IMODE(st_mode),
                             stat.S_IRWXO)

            os.chmod(TESTFN, 0o444)
            st_mode, modestr = self.get_mode()
            self.assertS_IS("REG", st_mode)
            self.assertEqual(modestr, '-r--r--r--')
            self.assertEqual(stat.S_IMODE(st_mode), 0o444)
        else:
            os.chmod(TESTFN, 0o700)
            st_mode, modestr = self.get_mode()
            self.assertEqual(modestr[:3], '-rw')
            self.assertS_IS("REG", st_mode)
            self.assertEqual(stat.S_IFMT(st_mode),
                             stat.S_IFREG)
项目:siphon-cli    作者:getsiphon    | 项目源码 | 文件源码
def _to_mode(attr):
    m = 0
    if (attr & FILE_ATTRIBUTE_DIRECTORY):
        m |= stdstat.S_IFDIR | 0o111
    else:
        m |= stdstat.S_IFREG
    if (attr & FILE_ATTRIBUTE_READONLY):
        m |= 0o444
    else:
        m |= 0o666
    return m
项目:lshapy    作者:kuking    | 项目源码 | 文件源码
def dbpcs(mode):
    if mode & stat.S_IFLNK == stat.S_IFLNK: return 's'
    if mode & stat.S_IFSOCK == stat.S_IFSOCK: return 's'
    if mode & stat.S_IFREG == stat.S_IFREG: return '-'
    if mode & stat.S_IFBLK == stat.S_IFBLK: return 'b'
    if mode & stat.S_IFDIR == stat.S_IFDIR: return 'd'
    if mode & stat.S_IFIFO == stat.S_IFIFO: return 'p'
    if mode & stat.S_IFCHR == stat.S_IFCHR: return 'c'
    return '?'
项目:Marty    作者:NaPs    | 项目源码 | 文件源码
def getattr(self, inode, ctx=None):
        attrs = self.inodes.get(inode)
        if attrs is None:
            raise llfuse.FUSEError(errno.ENOENT)  # FIXME

        if attrs.get('type') == 'tree':
            mode_filetype = stat.S_IFDIR
        elif attrs.get('type') == 'blob':
            mode_filetype = stat.S_IFREG
        elif attrs.get('filetype') == 'link':
            mode_filetype = stat.S_IFLNK
        elif attrs.get('filetype') == 'fifo':
            mode_filetype = stat.S_IFIFO
        else:
            raise llfuse.FUSEError(errno.ENOENT)  # FIXME

        entry = llfuse.EntryAttributes()
        entry.st_mode = mode_filetype | attrs.get('mode', MartyFSHandler.DEFAULT_MODE)

        if attrs.get('type') == 'blob' and 'ref' in attrs:
            entry.st_size = self.storage.size(attrs['ref'])
        else:
            entry.st_size = 0

        stamp = int(1438467123.985654 * 1e9)
        entry.st_atime_ns = stamp
        entry.st_ctime_ns = stamp
        entry.st_mtime_ns = stamp
        entry.st_gid = 0
        entry.st_uid = 0
        entry.st_ino = inode

        return entry
项目:dashbase-tools    作者:dashbase    | 项目源码 | 文件源码
def __init__(self, is_dir=False, is_file=False, size=0, ctime=time(), mtime=time(), atime=time()):
        self.is_dir = is_dir
        self.is_file = is_file
        if self.is_dir:
            self.attr = dict(st_mode=(S_IFDIR | 0o755), st_nlink=2)
        if self.is_file:
            self.attr = dict(st_mode=(S_IFREG | 0o755), st_nlink=1, st_size=size,
                             st_ctime=ctime, st_mtime=mtime, st_atime=atime)
        self.attr["attrs"] = {}
项目:tfhfs    作者:fingon    | 项目源码 | 文件源码
def test_mknod_c(oc):
    a = oc.ops.mknod(llfuse.ROOT_INODE, b'cdev',
                     stat.S_IFCHR, 42, oc.rctx_user)
    assert a.st_rdev == 42
    oc.ops.forget1(a.st_ino)
    a = oc.ops.mknod(llfuse.ROOT_INODE, b'bdev',
                     stat.S_IFBLK, 43, oc.rctx_user)
    assert a.st_rdev == 43
    oc.ops.forget1(a.st_ino)
    a = oc.ops.mknod(llfuse.ROOT_INODE, b'regfile',
                     stat.S_IFREG, 44, oc.rctx_user)
    assert not a.st_rdev
    oc.ops.forget1(a.st_ino)
项目:tfhfs    作者:fingon    | 项目源码 | 文件源码
def create_file(self, dir_inode, name, *, mode=0):
        if not stat.S_IFMT(mode):
            mode |= stat.S_IFREG
        dir_inode.change_times()
        _debug('create_file %s 0x%x', name, mode)
        return self._create(mode, dir_inode, name)
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_mode(self):
        with open(TESTFN, 'w'):
            pass
        if os.name == 'posix':
            os.chmod(TESTFN, 0o700)
            st_mode = self.get_mode()
            self.assertS_IS("REG", st_mode)
            self.assertEqual(stat.S_IMODE(st_mode),
                             stat.S_IRWXU)

            os.chmod(TESTFN, 0o070)
            st_mode = self.get_mode()
            self.assertS_IS("REG", st_mode)
            self.assertEqual(stat.S_IMODE(st_mode),
                             stat.S_IRWXG)

            os.chmod(TESTFN, 0o007)
            st_mode = self.get_mode()
            self.assertS_IS("REG", st_mode)
            self.assertEqual(stat.S_IMODE(st_mode),
                             stat.S_IRWXO)

            os.chmod(TESTFN, 0o444)
            st_mode = self.get_mode()
            self.assertS_IS("REG", st_mode)
            self.assertEqual(stat.S_IMODE(st_mode), 0o444)
        else:
            os.chmod(TESTFN, 0o700)
            st_mode = self.get_mode()
            self.assertS_IS("REG", st_mode)
            self.assertEqual(stat.S_IFMT(st_mode),
                             stat.S_IFREG)
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def cmp(f1, f2, shallow=1):
    """Compare two files.

    Arguments:

    f1 -- First file name

    f2 -- Second file name

    shallow -- Just check stat signature (do not read the files).
               defaults to 1.

    Return value:

    True if the files are the same, False otherwise.

    This function uses a cache for past comparisons and the results,
    with a cache invalidation mechanism relying on stale signatures.

    """

    s1 = _sig(os.stat(f1))
    s2 = _sig(os.stat(f2))
    if s1[0] != stat.S_IFREG or s2[0] != stat.S_IFREG:
        return False
    if shallow and s1 == s2:
        return True
    if s1[1] != s2[1]:
        return False

    outcome = _cache.get((f1, f2, s1, s2))
    if outcome is None:
        outcome = _do_cmp(f1, f2)
        if len(_cache) > 100:      # limit the maximum size of the cache
            _cache.clear()
        _cache[f1, f2, s1, s2] = outcome
    return outcome
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def cmp(f1, f2, shallow=True):
    """Compare two files.

    Arguments:

    f1 -- First file name

    f2 -- Second file name

    shallow -- Just check stat signature (do not read the files).
               defaults to True.

    Return value:

    True if the files are the same, False otherwise.

    This function uses a cache for past comparisons and the results,
    with cache entries invalidated if their stat information
    changes.  The cache may be cleared by calling clear_cache().

    """

    s1 = _sig(os.stat(f1))
    s2 = _sig(os.stat(f2))
    if s1[0] != stat.S_IFREG or s2[0] != stat.S_IFREG:
        return False
    if shallow and s1 == s2:
        return True
    if s1[1] != s2[1]:
        return False

    outcome = _cache.get((f1, f2, s1, s2))
    if outcome is None:
        outcome = _do_cmp(f1, f2)
        if len(_cache) > 100:      # limit the maximum size of the cache
            clear_cache()
        _cache[f1, f2, s1, s2] = outcome
    return outcome
项目:fuse-3ds    作者:ihaveamac    | 项目源码 | 文件源码
def getattr(self, path, fh=None):
        first_dir = common.get_first_dir(path)
        if first_dir in self.dirs:
            return self.dirs[first_dir].getattr(common.remove_first_dir(path), fh)
        uid, gid, pid = fuse_get_context()
        if path == '/' or path in self.dirs:
            st = {'st_mode': (stat.S_IFDIR | 0o555), 'st_nlink': 2}
        elif path.lower() in self.files:
            st = {'st_mode': (stat.S_IFREG | 0o444), 'st_size': self.files[path.lower()]['size'], 'st_nlink': 1}
        else:
            raise FuseOSError(errno.ENOENT)
        return {**st, **self.g_stat, 'st_uid': uid, 'st_gid': gid}
项目:fuse-3ds    作者:ihaveamac    | 项目源码 | 文件源码
def getattr(self, path, fh=None):
        first_dir = common.get_first_dir(path)
        if first_dir in self.dirs:
            return self.dirs[first_dir].getattr(common.remove_first_dir(path), fh)
        uid, gid, pid = fuse_get_context()
        if path == '/':
            st = {'st_mode': (stat.S_IFDIR | 0o555), 'st_nlink': 2}
        elif path.lower() in self.files:
            st = {'st_mode': (stat.S_IFREG | 0o444), 'st_size': self.files[path.lower()]['size'], 'st_nlink': 1}
        else:
            raise FuseOSError(errno.ENOENT)
        return {**st, **self.g_stat, 'st_uid': uid, 'st_gid': gid}
项目:fuse-3ds    作者:ihaveamac    | 项目源码 | 文件源码
def getattr(self, path, fh=None):
        uid, gid, pid = fuse_get_context()
        if path == '/':
            st = {'st_mode': (stat.S_IFDIR | (0o555 if self.readonly else 0o777)), 'st_nlink': 2}
        elif path.lower() in self.files:
            st = {'st_mode': (stat.S_IFREG | (0o444 if (self.readonly or path.lower() == '/_nandinfo.txt') else 0o666)),
                  'st_size': self.files[path.lower()]['size'], 'st_nlink': 1}
        else:
            raise FuseOSError(errno.ENOENT)
        return {**st, **self.g_stat, 'st_uid': uid, 'st_gid': gid}
项目:fuse-3ds    作者:ihaveamac    | 项目源码 | 文件源码
def getattr(self, path, fh=None):
        first_dir = common.get_first_dir(path)
        if first_dir in self.dirs:
            return self.dirs[first_dir].getattr(common.remove_first_dir(path), fh)
        uid, gid, pid = fuse_get_context()
        if path == '/' or path in self.dirs:
            st = {'st_mode': (stat.S_IFDIR | 0o555), 'st_nlink': 2}
        elif path.lower() in self.files:
            st = {'st_mode': (stat.S_IFREG | 0o444), 'st_size': self.files[path.lower()]['size'], 'st_nlink': 1}
        else:
            raise FuseOSError(errno.ENOENT)
        return {**st, **self.g_stat, 'st_uid': uid, 'st_gid': gid}
项目:fuse-3ds    作者:ihaveamac    | 项目源码 | 文件源码
def getattr(self, path, fh=None):
        uid, gid, pid = fuse_get_context()
        try:
            item = self.romfs_reader.get_info_from_path(path)
        except romfs.RomFSFileNotFoundException:
            raise FuseOSError(errno.ENOENT)
        if item.type == 'dir':
            st = {'st_mode': (stat.S_IFDIR | 0o555), 'st_nlink': 2}
        elif item.type == 'file':
            st = {'st_mode': (stat.S_IFREG | 0o444), 'st_size': item.size, 'st_nlink': 1}
        else:
            # this won't happen unless I fucked up
            raise FuseOSError(errno.ENOENT)
        return {**st, **self.g_stat, 'st_uid': uid, 'st_gid': gid}
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def cmp(f1, f2, shallow=1):
    """Compare two files.

    Arguments:

    f1 -- First file name

    f2 -- Second file name

    shallow -- Just check stat signature (do not read the files).
               defaults to 1.

    Return value:

    True if the files are the same, False otherwise.

    This function uses a cache for past comparisons and the results,
    with a cache invalidation mechanism relying on stale signatures.

    """

    s1 = _sig(os.stat(f1))
    s2 = _sig(os.stat(f2))
    if s1[0] != stat.S_IFREG or s2[0] != stat.S_IFREG:
        return False
    if shallow and s1 == s2:
        return True
    if s1[1] != s2[1]:
        return False

    outcome = _cache.get((f1, f2, s1, s2))
    if outcome is None:
        outcome = _do_cmp(f1, f2)
        if len(_cache) > 100:      # limit the maximum size of the cache
            _cache.clear()
        _cache[f1, f2, s1, s2] = outcome
    return outcome
项目:empyrion-python-api    作者:huhlig    | 项目源码 | 文件源码
def cmp(f1, f2, shallow=1):
    """Compare two files.

    Arguments:

    f1 -- First file name

    f2 -- Second file name

    shallow -- Just check stat signature (do not read the files).
               defaults to 1.

    Return value:

    True if the files are the same, False otherwise.

    This function uses a cache for past comparisons and the results,
    with a cache invalidation mechanism relying on stale signatures.

    """

    s1 = _sig(os.stat(f1))
    s2 = _sig(os.stat(f2))
    if s1[0] != stat.S_IFREG or s2[0] != stat.S_IFREG:
        return False
    if shallow and s1 == s2:
        return True
    if s1[1] != s2[1]:
        return False

    outcome = _cache.get((f1, f2, s1, s2))
    if outcome is None:
        outcome = _do_cmp(f1, f2)
        if len(_cache) > 100:      # limit the maximum size of the cache
            _cache.clear()
        _cache[f1, f2, s1, s2] = outcome
    return outcome
项目:CodeGra.fs    作者:CodeGra-de    | 项目源码 | 文件源码
def getattr(self):
        return {
            'st_size': len(self.get_data()),
            'st_atime': self.get_st_atime(),
            'st_mtime': self.get_st_mtime(),
            'st_ctime': self.get_st_ctime(),
            'st_uid': os.getuid(),
            'st_gid': os.getegid(),
            'st_mode': S_IFREG | self.mode,
            'st_nlink': 1,
        }
项目:CodeGra.fs    作者:CodeGra-de    | 项目源码 | 文件源码
def stat(self):
        st = os.lstat(self.full_path)
        stat = {
            key: getattr(st, key)
            for key in (
                'st_atime', 'st_ctime', 'st_gid', 'st_mtime', 'st_nlink',
                'st_size', 'st_uid'
            )
        }
        stat['st_mode'] = S_IFREG | 0o770
        return stat
项目:CodeGra.fs    作者:CodeGra-de    | 项目源码 | 文件源码
def getattr(self, submission=None, path=None):
        if self.stat is None:
            super(File, self).getattr(submission, path)
            self.stat['st_mode'] = S_IFREG | 0o770
            self.stat['st_nlink'] = 1

        if self.stat['st_size'] is None:
            self.stat['st_size'] = len(self.data)
        return self.stat
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def cmp(f1, f2, shallow=True):
    """Compare two files.

    Arguments:

    f1 -- First file name

    f2 -- Second file name

    shallow -- Just check stat signature (do not read the files).
               defaults to True.

    Return value:

    True if the files are the same, False otherwise.

    This function uses a cache for past comparisons and the results,
    with cache entries invalidated if their stat information
    changes.  The cache may be cleared by calling clear_cache().

    """

    s1 = _sig(os.stat(f1))
    s2 = _sig(os.stat(f2))
    if s1[0] != stat.S_IFREG or s2[0] != stat.S_IFREG:
        return False
    if shallow and s1 == s2:
        return True
    if s1[1] != s2[1]:
        return False

    outcome = _cache.get((f1, f2, s1, s2))
    if outcome is None:
        outcome = _do_cmp(f1, f2)
        if len(_cache) > 100:      # limit the maximum size of the cache
            clear_cache()
        _cache[f1, f2, s1, s2] = outcome
    return outcome
项目:hacker-scripts    作者:restran    | 项目源码 | 文件源码
def _to_mode(attr):
    m = 0
    if (attr & FILE_ATTRIBUTE_DIRECTORY):
        m |= stdstat.S_IFDIR | 0o111
    else:
        m |= stdstat.S_IFREG
    if (attr & FILE_ATTRIBUTE_READONLY):
        m |= 0o444
    else:
        m |= 0o666
    return m
项目:rvmi-rekall    作者:fireeye    | 项目源码 | 文件源码
def _read_value(self, path_components):
        self.key_name = "\\".join(path_components[1:-1])
        self.value_name = path_components[-1]
        with OpenKey(self._hive_handle, self.key_name) as key:
            # We are a value - we can be read but we can not be listed.
            self.value, value_type = QueryValueEx(key, self.value_name)
            self.st_mode = stat.S_IFREG
            self.value_type = self.registry_map[value_type]
            self.st_size = len(utils.SmartStr(self.value))
项目:rvmi-rekall    作者:fireeye    | 项目源码 | 文件源码
def readdir(self, path, offset):
        if path == "/":
            for pid in self.tasks:
                result = fuse.Direntry(str(pid))
                result.type = stat.S_IFREG
                result.st_size = self.address_space_size

                yield result