我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用os.statvfs()。
def mounts(self, detectdev=False): mounts = [] with open('/proc/mounts', 'r') as f: for line in f: dev, path, fstype = line.split()[0:3] if fstype in ('ext2', 'ext3', 'ext4', 'xfs', 'jfs', 'reiserfs', 'btrfs', 'simfs'): # simfs: filesystem in OpenVZ if not os.path.isdir(path): continue mounts.append({'dev': dev, 'path': path, 'fstype': fstype}) for mount in mounts: stat = os.statvfs(mount['path']) total = stat.f_blocks*stat.f_bsize free = stat.f_bfree*stat.f_bsize used = (stat.f_blocks-stat.f_bfree)*stat.f_bsize mount['total'] = b2h(total) mount['free'] = b2h(free) mount['used'] = b2h(used) mount['used_rate'] = div_percent(used, total) if detectdev: dev = os.stat(mount['path']).st_dev mount['major'], mount['minor'] = os.major(dev), os.minor(dev) return mounts
def used_inodes_percent(self, check_config): check_config = self._strip_percent_sign_from_check_config(check_config) for part in self.psutil.disk_partitions(all=False): s = os.statvfs(part.mountpoint) try: inodes_usage = int((s.f_files - s.f_favail) * 100 / s.f_files) except ZeroDivisionError: continue status = self._value_to_status_less( inodes_usage, check_config, self._strip_percent_sign ) if status != self.STATUS_OK: return ( status, 'Partition {} uses {}% of inodes'.format(part.mountpoint, inodes_usage) ) return self.STATUS_OK, 'Inodes usage correct'
def get_path_usage(path): space_st = os.statvfs(path) # f_bavail: without blocks reserved for super users # f_bfree: with blocks reserved for super users avail = space_st.f_frsize * space_st.f_bavail capa = space_st.f_frsize * space_st.f_blocks used = capa - avail return { 'total': capa, 'used': used, 'available': avail, 'percent': float(used) / capa, }
def _check_dir_free_space(chk_dir, required_space=1): """Check that directory has some free space. :param chk_dir: Directory to check :param required_space: amount of space to check for in MiB. :raises InsufficientDiskSpace: if free space is < required space """ # check that we have some free space stat = os.statvfs(chk_dir) # get dir free space in MiB. free_space = float(stat.f_bsize * stat.f_bavail) / 1024 / 1024 # check for at least required_space MiB free if free_space < required_space: raise exception.InsufficientDiskSpace(path=chk_dir, required=required_space, actual=free_space)
def select(self): currentFolder = self.getPreferredFolder() # Do nothing unless current Directory is valid if currentFolder is not None: # Check if we need to have a minimum of free Space available if self.minFree is not None: # Try to read fs stats try: s = os.statvfs(currentFolder) if (s.f_bavail * s.f_bsize) / 1000000 > self.minFree: # Automatically confirm if we have enough free disk Space available return self.selectConfirmed(True) except OSError: pass # Ask User if he really wants to select this folder self.session.openWithCallback( self.selectConfirmed, MessageBox, _("There might not be enough Space on the selected Partition.\nDo you really want to continue?"), type = MessageBox.TYPE_YESNO ) # No minimum free Space means we can safely close else: self.selectConfirmed(True)
def update(self): try: stat = statvfs(self.path) except OSError: return -1 if self.type == self.FREE: try: percent = '(' + str((100 * stat.f_bavail) // stat.f_blocks) + '%)' free = stat.f_bfree * stat.f_bsize if free < 10000000: free = _("%d kB") % (free >> 10) elif free < 10000000000: free = _("%d MB") % (free >> 20) else: free = _("%d Gb") % (free >> 30) self.setText(" ".join((free, percent, _("free diskspace")))) except: # occurs when f_blocks is 0 or a similar error self.setText("-?-")
def getDiskInfo(self, path): def isMountPoint(): try: fd = open('/proc/mounts', 'r') for line in fd: l = line.split() if len(l) > 1 and l[1] == path: return True fd.close() except: return None return False result = [0,0,0,0] # (size, used, avail, use%) if isMountPoint(): try: st = statvfs(path) except: st = None if not st is None and not 0 in (st.f_bsize, st.f_blocks): result[0] = st.f_bsize * st.f_blocks # size result[2] = st.f_bsize * st.f_bavail # avail result[1] = result[0] - result[2] # used result[3] = result[1] * 100 / result[0] # use% return result
def diskSize(self): cap = 0 try: line = readFile(self.sysfsPath('size')) cap = int(line) return cap / 1000 * 512 / 1000 except: dev = self.findMount() if dev: try: stat = os.statvfs(dev) cap = int(stat.f_blocks * stat.f_bsize) return cap / 1000 / 1000 except: pass return cap
def diskUsage(path): """Return disk usage statistics about the given path. Inspired by: https://stackoverflow.com/questions/4274899/get-actual-disk-space Returned values is a named tuple with attributes 'total', 'used' and 'free', which are the amount of total, used and free space, in bytes. """ st = os.statvfs(path) free = st.f_bavail * st.f_frsize total = st.f_blocks * st.f_frsize used = (st.f_blocks - st.f_bfree) * st.f_frsize return {'total':total, 'used':used, 'free':free} ######################## ### Convert to FASTQ ### ######################## ### Before beginning, check there is enough memory on disk
def getFreeSpace(self): free_space = 0 if "statvfs" in dir(os): # Unix statvfs = os.statvfs(config.data_dir) free_space = statvfs.f_frsize * statvfs.f_bavail else: # Windows try: import ctypes free_space_pointer = ctypes.c_ulonglong(0) ctypes.windll.kernel32.GetDiskFreeSpaceExW( ctypes.c_wchar_p(config.data_dir), None, None, ctypes.pointer(free_space_pointer) ) free_space = free_space_pointer.value except Exception, err: self.log.debug("GetFreeSpace error: %s" % err) return free_space
def stream_host_stats(): while True: net = psutil.net_io_counters(pernic=True) time.sleep(1) net1 = psutil.net_io_counters(pernic=True) net_stat_download = {} net_stat_upload = {} for k, v in net.items(): for k1, v1 in net1.items(): if k1 == k: net_stat_download[k] = (v1.bytes_recv - v.bytes_recv) / 1000. net_stat_upload[k] = (v1.bytes_sent - v.bytes_sent) / 1000. ds = statvfs('/') disk_str = {"Used": ((ds.f_blocks - ds.f_bfree) * ds.f_frsize) / 10 ** 9, "Unused": (ds.f_bavail * ds.f_frsize) / 10 ** 9} yield '[{"cpu":"%s","memory":"%s","memTotal":"%s","net_stats_down":"%s","net_stats_up":"%s","disk":"%s"}],' \ % (psutil.cpu_percent(interval=1), psutil.virtual_memory().used, psutil.virtual_memory().free, \ net_stat_download, net_stat_upload, disk_str)
def disk_usage(path): """Return disk usage associated with path.""" try: st = os.statvfs(path) except UnicodeEncodeError: if not PY3 and isinstance(path, unicode): # this is a bug with os.statvfs() and unicode on # Python 2, see: # - https://github.com/giampaolo/psutil/issues/416 # - http://bugs.python.org/issue18695 try: path = path.encode(sys.getfilesystemencoding()) except UnicodeEncodeError: pass st = os.statvfs(path) else: raise free = (st.f_bavail * st.f_frsize) total = (st.f_blocks * st.f_frsize) used = (st.f_blocks - st.f_bfree) * st.f_frsize percent = usage_percent(used, total, _round=1) # NB: the percentage is -5% than what shown by df due to # reserved blocks that we are currently not considering: # http://goo.gl/sWGbH return sdiskusage(total, used, free, percent)
def statfs(self, path): full_path = self.join_path(path) stv = os.statvfs(full_path) stat = dict((key, getattr(stv, key)) for key in ('f_bavail', 'f_bfree', 'f_blocks', 'f_bsize', 'f_favail', 'f_ffree', 'f_files', 'f_flag', 'f_frsize', 'f_namemax' ) ) return stat
def statfs(self, path): full_path = self._full_path(path) stv = os.statvfs(full_path) stat = dict((key, getattr(stv, key)) for key in ('f_bavail', 'f_bfree', 'f_blocks', 'f_bsize', 'f_favail', 'f_ffree', 'f_files', 'f_flag', 'f_frsize', 'f_namemax' ) ) return stat
def _get_stats(self, mount_point): data = os.statvfs(mount_point) total = (data.f_blocks * data.f_bsize) free = (data.f_bfree * data.f_bsize) used_percent = 100 - (100.0 * free / total) total_inode = data.f_files free_inode = data.f_ffree used_percent_inode = 100 - (100.0 * free_inode / total_inode) used = total - free used_inode = total_inode - free_inode return {'total': total, 'free': free, 'used_percent': used_percent, 'total_inode': total_inode, 'free_inode': free_inode, 'used_inode': used_inode, 'used': used, 'used_percent_inode': used_percent_inode}
def update(self): try: stat = statvfs(self.path) except OSError: return -1 if self.type == self.FREE: try: percent = '(' + str((100 * stat.f_bavail) // stat.f_blocks) + '%)' free = stat.f_bfree * stat.f_bsize if free < 10000000: free = "%d kB" % (free >> 10) elif free < 10000000000: free = "%d MB" % (free >> 20) else: free = "%d GB" % (free >> 30) self.setText(_("%s %s free disk space") % (free, percent)) except: # occurs when f_blocks is 0 or a similar error self.setText("-?-")
def main(): args = parse_args() mountpoints = [] with open('/proc/mounts', 'r') as fp: for line in fp: a, mountpoint, fstype, a = line.split(' ', 3) if fstype in ['ext2', 'ext3', 'ext4', 'xfs']: mountpoints.append(mountpoint) template = args.prefix + '.{}.{} {} ' + str(int(time())) for mp in mountpoints: stat = os.statvfs(mp) used = stat.f_frsize * stat.f_blocks - stat.f_bfree * stat.f_bsize size = stat.f_frsize * stat.f_blocks if mp == '/': mp = 'rootfs' mp = mp.replace('/', '_').lstrip('_') print(template.format(mp, 'used', used)) print(template.format(mp, 'size', size))
def test_structseq(self): import time import os t = time.localtime() for proto in protocols: s = self.dumps(t, proto) u = self.loads(s) self.assertEqual(t, u) if hasattr(os, "stat"): t = os.stat(os.curdir) s = self.dumps(t, proto) u = self.loads(s) self.assertEqual(t, u) if hasattr(os, "statvfs"): t = os.statvfs(os.curdir) s = self.dumps(t, proto) u = self.loads(s) self.assertEqual(t, u) # Tests for protocol 2
def test_sample_free_space(self): """Test collecting information about free space.""" counter = mock_counter(1) def statvfs(path, multiplier=lambda: next(counter)): return os.statvfs_result( (4096, 0, mb(1000), mb(multiplier() * 100), 0, 0, 0, 0, 0, 0)) plugin = self.get_mount_info( statvfs=statvfs, create_time=self.reactor.time) step_size = self.monitor.step_size self.monitor.add(plugin) self.reactor.advance(step_size) message = plugin.create_free_space_message() self.assertTrue(message) self.assertEqual(message.get("type"), "free-space") free_space = message.get("free-space", ()) self.assertEqual(len(free_space), 1) self.assertEqual(free_space[0], (step_size, "/", 409600))
def test_resynchronize(self): """ On the reactor "resynchronize" event, new mount-info messages should be sent. """ plugin = self.get_mount_info( create_time=self.reactor.time, statvfs=statvfs_result_fixture) self.monitor.add(plugin) plugin.run() plugin.exchange() self.reactor.fire("resynchronize", scopes=["disk"]) plugin.run() plugin.exchange() messages = self.mstore.get_pending_messages() messages = [message for message in messages if message["type"] == "mount-info"] expected_message = { "type": "mount-info", "mount-info": [(0, {"device": "/dev/hda1", "mount-point": "/", "total-space": 4096000, "filesystem": "ext3"})]} self.assertMessages(messages, [expected_message, expected_message])
def __init__(self, interval=300, monitor_interval=60 * 60, mounts_file="/proc/mounts", create_time=time.time, statvfs=None, mtab_file="/etc/mtab"): self.run_interval = interval self._monitor_interval = monitor_interval self._create_time = create_time self._mounts_file = mounts_file self._mtab_file = mtab_file if statvfs is None: statvfs = os.statvfs self._statvfs = statvfs self._create_time = create_time self._free_space = [] self._mount_info = [] self._mount_info_to_persist = None self.is_device_removable = is_device_removable
def _get_drive_usage(path): """ Use Python libraries to get drive space/usage statistics. Prior to v3.3, use `os.statvfs`; on v3.3+, use the more accurate `shutil.disk_usage`. """ if sys.version_info >= (3, 3): usage = shutil.disk_usage(path) return { "total": usage.total, "used": usage.used, "free": usage.free, } else: # with os.statvfs, we need to multiple block sizes by block counts to get bytes stats = os.statvfs(path) total = stats.f_frsize * stats.f_blocks free = stats.f_frsize * stats.f_bavail return { "total": total, "free": free, "used": total - free, }
def get_free_space(path=settings.KOLIBRI_HOME): if sys.platform.startswith('win'): import ctypes free = ctypes.c_ulonglong(0) check = ctypes.windll.kernel32.GetDiskFreeSpaceExW(ctypes.c_wchar_p(path), None, None, ctypes.pointer(free)) if check == 0: raise ctypes.winError() result = free.value else: st = os.statvfs(path) result = st.f_bavail * st.f_frsize return result # Utility functions for pinging or killing PIDs
def fsbsize(path): """ Get optimal file system buffer size (in bytes) for I/O calls """ path = encode(path) if os.name == "nt": import ctypes drive = "%s\\" % os.path.splitdrive(path)[0] cluster_sectors, sector_size = ctypes.c_longlong(0) ctypes.windll.kernel32.GetDiskFreeSpaceW(ctypes.c_wchar_p(drive), ctypes.pointer( cluster_sectors), ctypes.pointer(sector_size), None, None) return cluster_sectors * sector_size else: return os.statvfs(path).f_frsize
def test_structseq(self): import time import os t = time.localtime() for proto in protocols: s = self.dumps(t, proto) u = self.loads(s) self.assertEqual(t, u) if hasattr(os, "stat"): t = os.stat(os.curdir) s = self.dumps(t, proto) u = self.loads(s) self.assertEqual(t, u) if hasattr(os, "statvfs"): t = os.statvfs(os.curdir) s = self.dumps(t, proto) u = self.loads(s) self.assertEqual(t, u)
def bestRecordingLocation(candidates): path = '' biggest = 0 for candidate in candidates: try: stat = os.statvfs(candidate[1]) # must have some free space (i.e. not read-only) if stat.f_bavail: # Free space counts double size = (stat.f_blocks + stat.f_bavail) * stat.f_bsize if size > biggest: path = candidate[1] biggest = size except Exception, e: print "[DRL]", e return path
def select(self): currentFolder = self.getPreferredFolder() # Do nothing unless current Directory is valid if currentFolder is not None: # Check if we need to have a minimum of free Space available if self.minFree is not None: # Try to read fs stats try: s = os.statvfs(currentFolder) if (s.f_bavail * s.f_bsize) / 1000000 > self.minFree: # Automatically confirm if we have enough free disk Space available return self.selectConfirmed(True) except OSError: pass # Ask User if he really wants to select this folder self.session.openWithCallback( self.selectConfirmed, MessageBox, _("There might not be enough space on the selected partition..\nDo you really want to continue?"), type = MessageBox.TYPE_YESNO ) # No minimum free Space means we can safely close else: self.selectConfirmed(True)
def freespace(self): self.MountPath = None if not self.dirname: dirname = findSafeRecordPath(defaultMoviePath()) else: dirname = findSafeRecordPath(self.dirname) if dirname is None: dirname = findSafeRecordPath(defaultMoviePath()) self.dirnameHadToFallback = True if not dirname: return False self.MountPath = dirname mountwriteable = os.access(dirname, os.W_OK) if not mountwriteable: self.log(0, ("Mount '%s' is not writeable." % dirname)) return False s = os.statvfs(dirname) if (s.f_bavail * s.f_bsize) / 1000000 < 1024: self.log(0, "Not enough free space to record") return False else: self.log(0, "Found enough free space to record") return True
def disk_metrics(): """Get the disk usage, in bytes.""" def add_usage(ms, dus, dname): try: for i, val in enumerate(dus): if val.decode("utf-8") == dname: ms[dname] = int(dus[i-1]) except: pass # Overall disk usage statinfo = os.statvfs("/") metrics = {"used": statinfo.f_frsize * (statinfo.f_blocks - statinfo.f_bfree)} # Per-directory disk usage dirs = ["/home", "/nix", "/srv", "/tmp", "/var"] argv = ["du", "-s", "-b"] argv.extend(dirs) # why doesn't python have an expression variant of this!? dus = subprocess.check_output(argv).split() for dname in dirs: add_usage(metrics, dus, dname) return metrics
def disk(): c = statsd.StatsClient(STATSD_HOST, 8125, prefix=PREFIX + 'system.disk') while True: for path, label in PATHS: disk_usage = psutil.disk_usage(path) st = os.statvfs(path) total_inode = st.f_files free_inode = st.f_ffree inode_percentage = int(100*(float(total_inode - free_inode) / total_inode)) c.gauge('%s.inodes.percent' % label, inode_percentage) c.gauge('%s.total' % label, disk_usage.total) c.gauge('%s.used' % label, disk_usage.used) c.gauge('%s.free' % label, disk_usage.free) c.gauge('%s.percent' % label, disk_usage.percent) time.sleep(GRANULARITY)
def df(fs=".", blocksize=1024): "returns free space on a filesystem (in bytes)" try: from os import statvfs statvfs_found = 1 except: statvfs_found = 0 if statvfs_found: (f_bsize, f_frsize, f_blocks, f_bfree, f_bavail, f_files, f_ffree, f_favail, f_flag, f_namemax) = statvfs(fs) return long(f_bavail) * long(f_bsize) else: # Not very portable p = os.popen("df " + fs) s = string.split(string.rstrip(p.readline())) for i in range(len(s)): if s[i] == "Available": s = string.split(string.rstrip(p.readline())) p.close() return int(s[i]) * long(blocksize) p.close()
def test_structseq(self): import time import os t = time.localtime() for proto in protocols: s = self.dumps(t, proto) u = self.loads(s) self.assert_is_copy(t, u) if hasattr(os, "stat"): t = os.stat(os.curdir) s = self.dumps(t, proto) u = self.loads(s) self.assert_is_copy(t, u) if hasattr(os, "statvfs"): t = os.statvfs(os.curdir) s = self.dumps(t, proto) u = self.loads(s) self.assert_is_copy(t, u)
def statfs(self): """ Should return an object with statvfs attributes (f_bsize, f_frsize...). Eg., the return value of os.statvfs() is such a thing (since py 2.2). If you are not reusing an existing statvfs object, start with fuse.StatVFS(), and define the attributes. To provide usable information (ie., you want sensible df(1) output, you are suggested to specify the following attributes: - f_bsize - preferred size of file blocks, in bytes - f_frsize - fundamental size of file blcoks, in bytes [if you have no idea, use the same as blocksize] - f_blocks - total number of blocks in the filesystem - f_bfree - number of free blocks - f_files - total number of file inodes - f_ffree - nunber of free file inodes """ return os.statvfs(".")
def statfs(self, path): if common.windows: lpSectorsPerCluster = ctypes.c_ulonglong(0) lpBytesPerSector = ctypes.c_ulonglong(0) lpNumberOfFreeClusters = ctypes.c_ulonglong(0) lpTotalNumberOfClusters = ctypes.c_ulonglong(0) ret = windll.kernel32.GetDiskFreeSpaceW(ctypes.c_wchar_p(path), ctypes.pointer(lpSectorsPerCluster), ctypes.pointer(lpBytesPerSector), ctypes.pointer(lpNumberOfFreeClusters), ctypes.pointer(lpTotalNumberOfClusters)) if not ret: raise WindowsError free_blocks = lpNumberOfFreeClusters.value * lpSectorsPerCluster.value result = {'f_bavail': free_blocks, 'f_bfree': free_blocks, 'f_bsize': lpBytesPerSector.value, 'f_frsize': lpBytesPerSector.value, 'f_blocks': lpTotalNumberOfClusters.value * lpSectorsPerCluster.value, 'f_namemax': wintypes.MAX_PATH} return result else: stv = os.statvfs(path) # f_flag causes python interpreter crashes in some cases. i don't get it. return dict((key, getattr(stv, key)) for key in ('f_bavail', 'f_bfree', 'f_blocks', 'f_bsize', 'f_favail', 'f_ffree', 'f_files', 'f_frsize', 'f_namemax'))
def _get_stats(mount_point): data = os.statvfs(mount_point) total = (data.f_blocks * data.f_bsize) free = (data.f_bfree * data.f_bsize) used_percent = 100 - (100.0 * free / total) total_inode = data.f_files free_inode = data.f_ffree used_percent_inode = 100 - (100.0 * free_inode / total_inode) used = total - free used_inode = total_inode - free_inode return {'total': total, 'free': free, 'used_percent': used_percent, 'total_inode': total_inode, 'free_inode': free_inode, 'used_inode': used_inode, 'used': used, 'used_percent_inode': used_percent_inode}
def get_total_disk_space(): cwd = os.getcwd() # Windows is the only platform that doesn't support os.statvfs, so # we need to special case this. if sys.platform.startswith('win'): _, total, free = (ctypes.c_ulonglong(), ctypes.c_ulonglong(), \ ctypes.c_ulonglong()) if sys.version_info >= (3,) or isinstance(cwd, unicode): fn = ctypes.windll.kernel32.GetDiskFreeSpaceExW else: fn = ctypes.windll.kernel32.GetDiskFreeSpaceExA ret = fn(cwd, ctypes.byref(_), ctypes.byref(total), ctypes.byref(free)) if ret == 0: # WinError() will fetch the last error code. raise ctypes.WinError() return (total.value, free.value) else: st = os.statvfs(cwd) free = st.f_bavail * st.f_frsize total = st.f_blocks * st.f_frsize return (total, free)
def _getStats(mountPoint): data = os.statvfs(mountPoint) total = (data.f_blocks * data.f_bsize) / ONE_GB_BYTES free = (data.f_bfree * data.f_bsize) / ONE_GB_BYTES used_percent = 100 - (100.0 * free / total) total_inode = data.f_files free_inode = data.f_ffree used_percent_inode = 100 - (100.0 * free_inode / total_inode) used = total - free used_inode = total_inode - free_inode return {'total': total, 'free': free, 'used_percent': used_percent, 'total_inode': total_inode, 'free_inode': free_inode, 'used_inode': used_inode, 'used': used, 'used_percent_inode': used_percent_inode}
def get_free_space_bytes(folder): """ Return folder/drive free space (in bytes) """ if platform.system() == 'Windows': _free_bytes = ctypes.c_ulonglong(0) _total_bytes = ctypes.c_ulonglong(0) ctypes.windll.kernel32.GetDiskFreeSpaceExW(folder, None, ctypes.pointer(_total_bytes), ctypes.pointer(_free_bytes)) total_bytes = _total_bytes.value free_bytes = _free_bytes.value else: try: st = os.statvfs(folder) total_bytes = st.f_blocks * st.f_frsize free_bytes = st.f_bavail * st.f_frsize except: total_bytes = 0 free_bytes = 0 return total_bytes, free_bytes