我们从 Python 开源项目中提取了以下 26 个代码示例，用于说明如何使用 stat.S_IFIFO。
def setUp(self):
    """Prepare the loop, protocol and a mocked pipe for each test.

    ``asyncio.unix_events._set_nonblocking`` and ``os.fstat`` are patched
    for the duration of the test; ``os.fstat`` returns a mock whose
    ``st_mode`` is ``stat.S_IFIFO``.
    """
    self.loop = self.new_test_loop()
    self.protocol = test_utils.make_test_protocol(asyncio.Protocol)

    # Stand-in pipe object restricted to the RawIOBase API, reporting fd 5.
    self.pipe = mock.Mock(spec_set=io.RawIOBase)
    self.pipe.fileno.return_value = 5

    nonblocking_patch = mock.patch('asyncio.unix_events._set_nonblocking')
    nonblocking_patch.start()
    self.addCleanup(nonblocking_patch.stop)

    fstat_patch = mock.patch('os.fstat')
    fake_fstat = fstat_patch.start()
    fake_fstat.return_value = mock.Mock(st_mode=stat.S_IFIFO)
    self.addCleanup(fstat_patch.stop)
def _mkfifo(self, fifo_path):
    """Create a FIFO node at ``fifo_path``, removing any existing entry first.

    The node is created with ``os.mknod``. If that fails with ``EPERM`` the
    work is delegated to ``self._run_shell_cmd`` running ``mknod <path> p``.

    :param fifo_path: filesystem path of the FIFO to create
    :raises RuntimeError: if the shell fallback returns a non-zero code
    :raises OSError: for any failure other than ``EEXIST`` or ``EPERM``
    """
    fifo_mode = 0o666 | stat.S_IFIFO
    try:
        # Drop whatever currently sits at the path before creating the node.
        if os.path.exists(fifo_path):
            os.remove(fifo_path)
        os.mknod(fifo_path, fifo_mode)
    except OSError as err:
        code = err.errno
        if code == errno.EEXIST:
            # The node already exists; skip this step.
            return
        if code != errno.EPERM:
            raise err
        # Not permitted to create the node directly; try the shell command.
        status = self._run_shell_cmd("mknod %s p" % fifo_path, wait=True)
        if status != 0:
            raise RuntimeError("mknod returns %s" % str(status))
def get_file_type(path):
    """Retrieve the file type of the path

    The type is taken from the file-type field of ``st_mode`` as returned
    by ``os.stat``, so symbolic links are followed.

    :param path: The path to get the file type for
    :return: One of ``'socket'``, ``'regular'``, ``'block'``,
        ``'directory'``, ``'character_device'`` or ``'fifo'``; ``None`` if
        the path is empty, cannot be stat'ed, or has a type outside that
        list
    """
    if not path:
        return None

    try:
        mode = os.stat(path).st_mode
    except (OSError, ValueError):
        # Missing path, permission problem, invalid path, or the file
        # disappeared between a check and the stat call.
        return None

    # The S_IF* constants are values of one multi-bit field, not
    # independent flags (e.g. S_IFSOCK contains every bit of S_IFREG), so
    # the masked field is compared for equality via a direct lookup
    # instead of testing each constant with a bitwise AND.
    names_by_type = {
        stat.S_IFSOCK: 'socket',
        stat.S_IFREG: 'regular',
        stat.S_IFBLK: 'block',
        stat.S_IFDIR: 'directory',
        stat.S_IFCHR: 'character_device',
        stat.S_IFIFO: 'fifo',
    }
    return names_by_type.get(stat.S_IFMT(mode))
def test_mknod_dir_fd(self):
    # mknodat() is exercised only for FIFO creation, the sole use POSIX
    # specifies.
    target = support.TESTFN
    support.unlink(target)
    fifo_mode = stat.S_IFIFO | stat.S_IRUSR | stat.S_IWUSR
    cwd_fd = posix.open(posix.getcwd(), posix.O_RDONLY)
    try:
        posix.mknod(target, fifo_mode, 0, dir_fd=cwd_fd)
    except OSError as exc:
        # Some old systems refuse mknod() for unprivileged users, or only
        # support creating device nodes.
        self.assertIn(exc.errno, (errno.EPERM, errno.EINVAL))
    else:
        created_mode = posix.stat(target).st_mode
        self.assertTrue(stat.S_ISFIFO(created_mode))
    finally:
        # Always release the directory descriptor opened above.
        posix.close(cwd_fd)
def test_mknod(self):
    # mknod() is exercised only for FIFO creation, the sole use POSIX
    # specifies.
    target = support.TESTFN
    tolerated_errnos = (errno.EPERM, errno.EINVAL)
    fifo_mode = stat.S_IFIFO | stat.S_IRUSR | stat.S_IWUSR

    support.unlink(target)
    try:
        posix.mknod(target, fifo_mode, 0)
    except OSError as exc:
        # Some old systems refuse mknod() for unprivileged users, or only
        # support creating device nodes.
        self.assertIn(exc.errno, tolerated_errnos)
    else:
        created_mode = posix.stat(target).st_mode
        self.assertTrue(stat.S_ISFIFO(created_mode))

    # The same call spelled with keyword arguments must be accepted too.
    support.unlink(target)
    try:
        posix.mknod(path=target, mode=fifo_mode, device=0, dir_fd=None)
    except OSError as exc:
        self.assertIn(exc.errno, tolerated_errnos)
def _initialize_aci(self, mode, fileType):
    """Create and save a new ACI record owned by the current process user.

    :param mode: permission bits as an octal string (parsed with
        ``int(mode, 8)``)
    :param fileType: one of the ``stat.S_IF*`` file-type constants
    :return: the ``key`` of the saved ACI object
    :raises RuntimeError: if ``fileType`` is not one of the accepted types
    """
    accepted_types = (
        stat.S_IFREG,
        stat.S_IFDIR,
        stat.S_IFCHR,
        stat.S_IFBLK,
        stat.S_IFIFO,
        stat.S_IFLNK,
        stat.S_IFSOCK,
    )
    if fileType not in accepted_types:
        raise RuntimeError("Invalid file type.")

    record = self.aciCollection.new()

    # Owner information comes from the uid/gid of the running process.
    owner_uid = os.getuid()
    record.dbobj.id = owner_uid
    record.dbobj.uname = pwd.getpwuid(owner_uid).pw_name
    owner_group = grp.getgrgid(os.getgid())
    record.dbobj.gname = owner_group.gr_name

    # Stored mode = parsed permission bits plus the file-type constant.
    permission_bits = int(mode, 8)
    record.dbobj.mode = permission_bits + fileType

    record.save()
    return record.key
def test_mknod(self):
    # Only FIFO creation is checked here: it is the single use of mknod()
    # that POSIX specifies.
    node_path = support.TESTFN
    support.unlink(node_path)
    requested_mode = stat.S_IFIFO | stat.S_IRUSR | stat.S_IWUSR
    try:
        posix.mknod(node_path, requested_mode, 0)
    except OSError as exc:
        # Some old systems refuse mknod() for unprivileged users, or only
        # support creating device nodes.
        acceptable = (errno.EPERM, errno.EINVAL)
        self.assertIn(exc.errno, acceptable)
    else:
        node_stat = posix.stat(node_path)
        self.assertTrue(stat.S_ISFIFO(node_stat.st_mode))
def dbpcs(mode):
    """Return the ``ls -l`` style type character for a file mode.

    :param mode: an ``st_mode`` value; only its file-type field is used
    :return: ``'l'`` for a symbolic link, ``'s'`` for a socket, ``'-'`` for
        a regular file, ``'b'`` for a block device, ``'d'`` for a
        directory, ``'p'`` for a FIFO, ``'c'`` for a character device and
        ``'?'`` for anything else
    """
    # The file type is a multi-bit field rather than a set of independent
    # flags, so it is isolated with S_IFMT and matched by equality.
    # Symbolic links map to 'l'; they previously shared 's' with sockets.
    type_chars = {
        stat.S_IFLNK: 'l',
        stat.S_IFSOCK: 's',
        stat.S_IFREG: '-',
        stat.S_IFBLK: 'b',
        stat.S_IFDIR: 'd',
        stat.S_IFIFO: 'p',
        stat.S_IFCHR: 'c',
    }
    return type_chars.get(stat.S_IFMT(mode), '?')
def getattr(self, inode, ctx=None):
    """Build the ``llfuse.EntryAttributes`` for ``inode``.

    The file type comes from the inode's ``type`` (``tree`` or ``blob``) or,
    failing that, its ``filetype`` (``link`` or ``fifo``). Size is looked up
    in ``self.storage`` only for blobs carrying a ``ref``. All timestamps
    are set to one fixed constant and uid/gid to 0.

    :param inode: key into ``self.inodes``
    :param ctx: unused by this method
    :return: the populated ``llfuse.EntryAttributes``
    :raises llfuse.FUSEError: ``ENOENT`` if the inode is unknown or its
        type cannot be determined
    """
    attrs = self.inodes.get(inode)
    if attrs is None:
        raise llfuse.FUSEError(errno.ENOENT)  # FIXME

    object_type = attrs.get('type')
    if object_type == 'tree':
        type_bits = stat.S_IFDIR
    elif object_type == 'blob':
        type_bits = stat.S_IFREG
    else:
        special_type = attrs.get('filetype')
        if special_type == 'link':
            type_bits = stat.S_IFLNK
        elif special_type == 'fifo':
            type_bits = stat.S_IFIFO
        else:
            raise llfuse.FUSEError(errno.ENOENT)  # FIXME

    entry = llfuse.EntryAttributes()
    permission_bits = attrs.get('mode', MartyFSHandler.DEFAULT_MODE)
    entry.st_mode = type_bits | permission_bits

    # Only blobs that reference stored content have a non-zero size.
    has_content = object_type == 'blob' and 'ref' in attrs
    entry.st_size = self.storage.size(attrs['ref']) if has_content else 0

    # Every timestamp uses the same hard-coded instant, in nanoseconds.
    fixed_ns = int(1438467123.985654 * 1e9)
    entry.st_atime_ns = entry.st_ctime_ns = entry.st_mtime_ns = fixed_ns

    entry.st_gid = 0
    entry.st_uid = 0
    entry.st_ino = inode
    return entry
def filter_mknod(path, type):
    """Describe a pending ``mknod`` call, or skip it when ``path`` exists.

    :param path: path of the node that would be created
    :param type: the mode argument given to ``mknod`` (file type combined
        with permission bits)
    :return: ``(None, None)`` when ``path`` already exists; otherwise
        ``(message, 0)`` where ``message`` is the label passed through
        ``T.cyan`` followed by the path passed through ``T.underline``
    """
    # Imported locally because the file-type field has to be isolated with
    # S_IFMT, which the module-level imports may not provide.
    from stat import S_IFMT

    if exists(path):
        return None, None

    # The S_IF* constants share bits (S_IFBLK contains S_IFCHR, S_IFSOCK
    # contains S_IFREG), so a bitwise AND labels regular files as sockets
    # and block devices as character devices. Compare the masked file-type
    # field for equality instead.
    kind = S_IFMT(type)
    if kind == S_IFCHR:
        label = "create character special file"
    elif kind == S_IFBLK:
        label = "create block special file"
    elif kind == S_IFIFO:
        label = "create named pipe"
    elif kind == S_IFSOCK:
        label = "create socket"
    else:
        # mknod(2): "Zero file type is equivalent to type S_IFREG"
        label = "create file"
    return "%s %s" % (T.cyan(label), T.underline(path)), 0
def test_mknod_creates_fifo(self):
    # mknod() with S_IFIFO must produce a node whose lstat() mode equals
    # the requested permission bits combined with the FIFO type.
    # The 0o prefix is used because a bare leading-zero octal literal
    # (0600) is a syntax error on Python 3.
    fifo_mode = 0o600 | stat.S_IFIFO
    self.fs.mknod('foo', fifo_mode)
    self.assertEqual(self.fs.lstat('foo').st_mode, fifo_mode)
def __str__(self):
    """Create a unix-style long description of the file (like ls -l).

    Unknown values are rendered with placeholders: a ``None`` mode becomes
    ``?---------``, a ``None`` or ``0xffffffff`` mtime becomes
    ``(unknown date)``, a ``None`` uid, gid or size is shown as ``0``, and
    a missing ``filename`` attribute as ``?``.
    """
    if self.st_mode is not None:
        type_chars = {
            stat.S_IFIFO: 'p',
            stat.S_IFCHR: 'c',
            stat.S_IFDIR: 'd',
            stat.S_IFBLK: 'b',
            stat.S_IFREG: '-',
            stat.S_IFLNK: 'l',
            stat.S_IFSOCK: 's',
        }
        ks = type_chars.get(stat.S_IFMT(self.st_mode), '?')
        # 0o-prefixed octal literals parse on Python 2.6+ and Python 3;
        # the bare leading-zero form is a syntax error on Python 3.
        ks += self._rwx((self.st_mode & 0o700) >> 6, self.st_mode & stat.S_ISUID)
        ks += self._rwx((self.st_mode & 0o70) >> 3, self.st_mode & stat.S_ISGID)
        ks += self._rwx(self.st_mode & 7, self.st_mode & stat.S_ISVTX, True)
    else:
        ks = '?---------'

    # compute display date
    if (self.st_mtime is None) or (self.st_mtime == 0xffffffff):
        # shouldn't really happen
        datestr = '(unknown date)'
    else:
        # Timestamps more than 15552000 s (6 months) away from now show
        # the year instead of the time of day.
        if abs(time.time() - self.st_mtime) > 15552000:
            date_format = '%d %b %Y'
        else:
            date_format = '%d %b %H:%M'
        datestr = time.strftime(date_format, time.localtime(self.st_mtime))

    filename = getattr(self, 'filename', '?')

    # not all servers support uid/gid; the size gets the same fallback so
    # that the %d conversions below never receive None
    uid = 0 if self.st_uid is None else self.st_uid
    gid = 0 if self.st_gid is None else self.st_gid
    size = 0 if self.st_size is None else self.st_size

    return '%s 1 %-8d %-8d %8d %-12s %s' % (ks, uid, gid, size, datestr, filename)
def __str__(self):
    """create a unix-style long description of the file (like ls -l)

    Unknown values are rendered with placeholders: a ``None`` mode becomes
    ``?---------``, a ``None`` mtime (or one equal to ``xffffffff``)
    becomes ``(unknown date)``, a ``None`` uid, gid or size is shown as
    ``0``, and a missing ``filename`` attribute as ``?``.
    """
    mode = self.st_mode
    if mode is None:
        ks = '?---------'
    else:
        type_chars = {
            stat.S_IFIFO: 'p',
            stat.S_IFCHR: 'c',
            stat.S_IFDIR: 'd',
            stat.S_IFBLK: 'b',
            stat.S_IFREG: '-',
            stat.S_IFLNK: 'l',
            stat.S_IFSOCK: 's',
        }
        ks = type_chars.get(stat.S_IFMT(mode), '?')
        # owner, group and other permission triplets, in that order
        ks += self._rwx((mode & o700) >> 6, mode & stat.S_ISUID)
        ks += self._rwx((mode & o70) >> 3, mode & stat.S_ISGID)
        ks += self._rwx(mode & 7, mode & stat.S_ISVTX, True)

    # compute display date
    mtime = self.st_mtime
    if (mtime is None) or (mtime == xffffffff):
        # shouldn't really happen
        datestr = '(unknown date)'
    else:
        # Timestamps more than 15552000 s (6 months) away from now show
        # the year instead of the time of day.
        is_distant = abs(time.time() - mtime) > 15552000
        date_format = '%d %b %Y' if is_distant else '%d %b %H:%M'
        datestr = time.strftime(date_format, time.localtime(mtime))

    filename = getattr(self, 'filename', '?')

    # not all servers support uid/gid; size gets the same fallback because
    # the %8d conversion below raises TypeError when handed None
    uid = 0 if self.st_uid is None else self.st_uid
    gid = 0 if self.st_gid is None else self.st_gid
    size = 0 if self.st_size is None else self.st_size

    return '%s 1 %-8d %-8d %8d %-12s %s' % (ks, uid, gid, size, datestr, filename)
def __str__(self):
    """Render the attributes as one ``ls -l`` style line.

    A ``None`` mode is shown as ``?---------``; a ``None`` mtime (or one
    equal to ``xffffffff``) as ``(unknown date)``; a ``None`` uid, gid or
    size as ``0``; a missing ``filename`` attribute as ``?``.
    """
    mode = self.st_mode
    if mode is None:
        ks = '?---------'
    else:
        type_chars = {
            stat.S_IFIFO: 'p',
            stat.S_IFCHR: 'c',
            stat.S_IFDIR: 'd',
            stat.S_IFBLK: 'b',
            stat.S_IFREG: '-',
            stat.S_IFLNK: 'l',
            stat.S_IFSOCK: 's',
        }
        ks = type_chars.get(stat.S_IFMT(mode), '?')
        # owner, group and other permission triplets, in that order
        owner_bits = (mode & o700) >> 6
        group_bits = (mode & o70) >> 3
        other_bits = mode & 7
        ks += self._rwx(owner_bits, mode & stat.S_ISUID)
        ks += self._rwx(group_bits, mode & stat.S_ISGID)
        ks += self._rwx(other_bits, mode & stat.S_ISVTX, True)

    # compute display date
    mtime = self.st_mtime
    if (mtime is None) or (mtime == xffffffff):
        # shouldn't really happen
        datestr = '(unknown date)'
    else:
        # Timestamps more than 15552000 s (6 months) away from now show
        # the year instead of the time of day.
        is_distant = abs(time.time() - mtime) > 15552000
        date_format = '%d %b %Y' if is_distant else '%d %b %H:%M'
        datestr = time.strftime(date_format, time.localtime(mtime))

    filename = getattr(self, 'filename', '?')

    # not all servers support uid/gid
    uid = 0 if self.st_uid is None else self.st_uid
    gid = 0 if self.st_gid is None else self.st_gid
    size = 0 if self.st_size is None else self.st_size

    return '%s 1 %-8d %-8d %8d %-12s %s' % (ks, uid, gid, size, datestr, filename)
def __str__(self):
    """Format the attributes as a single unix-style listing line (like ls -l).

    A ``None`` mode is shown as ``?---------``; a ``None`` mtime (or one
    equal to ``xffffffff``) as ``(unknown date)``; a ``None`` uid, gid or
    size as ``0``; a missing ``filename`` attribute as ``?``.
    """
    if self.st_mode is None:
        ks = '?---------'
    else:
        kind = stat.S_IFMT(self.st_mode)
        type_table = (
            (stat.S_IFIFO, 'p'),
            (stat.S_IFCHR, 'c'),
            (stat.S_IFDIR, 'd'),
            (stat.S_IFBLK, 'b'),
            (stat.S_IFREG, '-'),
            (stat.S_IFLNK, 'l'),
            (stat.S_IFSOCK, 's'),
        )
        # First matching type wins; unknown types fall back to '?'.
        ks = next((char for bits, char in type_table if kind == bits), '?')
        ks += self._rwx(
            (self.st_mode & o700) >> 6, self.st_mode & stat.S_ISUID)
        ks += self._rwx(
            (self.st_mode & o70) >> 3, self.st_mode & stat.S_ISGID)
        ks += self._rwx(
            self.st_mode & 7, self.st_mode & stat.S_ISVTX, True)

    # compute display date
    date_known = not ((self.st_mtime is None) or (self.st_mtime == xffffffff))
    if date_known:
        # Timestamps more than 15552000 s (6 months) away from now show
        # the year instead of the time of day.
        if abs(time.time() - self.st_mtime) > 15552000:
            date_format = '%d %b %Y'
        else:
            date_format = '%d %b %H:%M'
        datestr = time.strftime(date_format, time.localtime(self.st_mtime))
    else:
        # shouldn't really happen
        datestr = '(unknown date)'

    filename = getattr(self, 'filename', '?')

    # not all servers support uid/gid
    uid, gid, size = self.st_uid, self.st_gid, self.st_size
    if uid is None:
        uid = 0
    if gid is None:
        gid = 0
    if size is None:
        size = 0

    fields = (ks, uid, gid, size, datestr, filename)
    return '%s 1 %-8d %-8d %8d %-12s %s' % fields
def chmod(self, ppath, mode):
    """
    Change the stored mode of a file or directory entry.

    :param ppath: path of file/dir
    :param mode: string of the mode, parsed as octal
    :raises RuntimeError: if the entry found for ``ppath`` has a non-empty
        ``state``
    :raises ValueError: if ``mode`` is not a valid octal string

    Examples:
        flistmeta.chmod("/tmp/dir1", "777")
    """
    entry_type, dir_entry = self._search_db(ppath)
    if dir_entry.dbobj.state != "":
        raise RuntimeError("%s: No such file or directory" % ppath)

    try:
        permission_bits = int(mode, 8)
    except ValueError:
        raise ValueError("Invalid mode.")

    if entry_type == "D":
        # A directory's ACL key lives on the directory object itself.
        acl = self.aciCollection.get(dir_entry.dbobj.aclkey)
        acl.dbobj.mode = permission_bits + stat.S_IFDIR
        acl.save()
        return

    if entry_type in ("F", "L"):
        type_bits = stat.S_IFREG if entry_type == "F" else stat.S_IFLNK
        full_mode = permission_bits + type_bits
        _, entries = self._getPropertyList(dir_entry.dbobj, entry_type)
        for item in entries:
            if item.name != j.sal.fs.getBaseName(ppath):
                continue
            acl = self.aciCollection.get(item.aclkey)
            acl.dbobj.mode = full_mode
            acl.save()
        return

    # Every other entry type is looked up among the directory's links and
    # keeps whichever special file type its ACL currently reports.
    type_checks = (
        (stat.S_ISSOCK, stat.S_IFSOCK),
        (stat.S_ISBLK, stat.S_IFBLK),
        (stat.S_ISCHR, stat.S_IFCHR),
        (stat.S_ISFIFO, stat.S_IFIFO),
    )
    for item in dir_entry.dbobj.links:
        if item.name != j.sal.fs.getBaseName(ppath):
            continue
        acl = self.aciCollection.get(item.aclkey)
        # NOTE(review): the type is read from ``dbobj.st_mode`` but the
        # result is written to ``dbobj.mode`` - confirm the attribute name.
        for is_type, type_bits in type_checks:
            if is_type(acl.dbobj.st_mode):
                full_mode = permission_bits + type_bits
                break
        # NOTE(review): when no check above matches, ``full_mode`` is not
        # assigned in this iteration (unbound on the first match, otherwise
        # left over from the previous one) - confirm whether that can occur.
        acl.dbobj.mode = full_mode
        acl.save()