The following 49 code examples, extracted from open-source Python projects, illustrate how to use os.stat().
def mounts(self, detectdev=False):
    """Return a list of mounted disk filesystems with usage figures.

    Parses /proc/mounts, keeping only common on-disk filesystem types,
    then reports human-readable total/free/used space and a used
    percentage per mount point. When *detectdev* is true, the device
    major/minor numbers are included as well.
    """
    entries = []
    with open('/proc/mounts', 'r') as f:
        for line in f:
            dev, path, fstype = line.split()[0:3]
            # simfs: filesystem in OpenVZ
            if fstype not in ('ext2', 'ext3', 'ext4', 'xfs', 'jfs',
                              'reiserfs', 'btrfs', 'simfs'):
                continue
            if not os.path.isdir(path):
                continue
            entries.append({'dev': dev, 'path': path, 'fstype': fstype})
    for entry in entries:
        vfs = os.statvfs(entry['path'])
        total = vfs.f_blocks * vfs.f_bsize
        free = vfs.f_bfree * vfs.f_bsize
        used = (vfs.f_blocks - vfs.f_bfree) * vfs.f_bsize
        entry['total'] = b2h(total)
        entry['free'] = b2h(free)
        entry['used'] = b2h(used)
        entry['used_rate'] = div_percent(used, total)
        if detectdev:
            devno = os.stat(entry['path']).st_dev
            entry['major'], entry['minor'] = os.major(devno), os.minor(devno)
    return entries
def _warn_unsafe_extraction_path(path): """ If the default extraction path is overridden and set to an insecure location, such as /tmp, it opens up an opportunity for an attacker to replace an extracted file with an unauthorized payload. Warn the user if a known insecure location is used. See Distribute #375 for more details. """ if os.name == 'nt' and not path.startswith(os.environ['windir']): # On Windows, permissions are generally restrictive by default # and temp directories are not writable by other users, so # bypass the warning. return mode = os.stat(path).st_mode if mode & stat.S_IWOTH or mode & stat.S_IWGRP: msg = ("%s is writable by group/others and vulnerable to attack " "when " "used with get_resource_filename. Consider a more secure " "location (set with .set_extraction_path or the " "PYTHON_EGG_CACHE environment variable)." % path) warnings.warn(msg, UserWarning)
def copy_from_host(module):
    """Read a local file and return it base64-encoded through the
    Ansible-style `module` object.

    Fails the module run when the file is missing or unreadable. The
    payload is zlib-compressed first when the 'compress' param is set,
    and the result includes the sha1 of the raw bytes plus the file's
    permission bits.
    """
    params = module.params
    compress = params.get('compress')
    src = params.get('src')
    if not os.path.exists(src):
        module.fail_json(msg="file not found: {}".format(src))
    if not os.access(src, os.R_OK):
        module.fail_json(msg="file is not readable: {}".format(src))
    mode = oct(os.stat(src).st_mode & 0o777)
    with open(src, 'rb') as f:
        raw_data = f.read()
    sha1 = hashlib.sha1(raw_data).hexdigest()
    data = zlib.compress(raw_data) if compress else raw_data
    module.exit_json(content=base64.b64encode(data), sha1=sha1,
                     mode=mode, source=src)
def check_is_readonly(self, fname):
    """
    Return `True` if @fname is read-only on the filesystem.

    @fname Path to a file.

    Returns None when @fname is empty or does not exist.
    """
    if not fname:
        return
    try:
        st = os.stat(fname)
    except FileNotFoundError:
        return
    # Read-only here means the owner-write bit is absent from the mode.
    return stat.S_IMODE(st.st_mode) & stat.S_IWUSR != stat.S_IWUSR
def place_data_on_block_device(blk_device, data_src_dst):
    """Migrate data in data_src_dst to blk_device and then remount."""
    # Stage the block device at /mnt and copy the data across to it.
    mount(blk_device, '/mnt')
    copy_files(data_src_dst, '/mnt')
    umount('/mnt')
    # Remember the original ownership before mounting on top of the path.
    src_stat = os.stat(data_src_dst)
    owner_uid = src_stat.st_uid
    owner_gid = src_stat.st_gid
    # Re-mount where the data should originally be.
    # TODO: persist is currently a NO-OP in core.host
    mount(blk_device, data_src_dst, persist=True)
    # Ensure original ownership of new mount.
    os.chown(data_src_dst, owner_uid, owner_gid)
def copyfile(src, dst):
    """Copy data from src to dst"""
    if _samefile(src, dst):
        raise Error("`%s` and `%s` are the same file" % (src, dst))
    for name in (src, dst):
        try:
            st = os.stat(name)
        except OSError:
            # File most likely does not exist
            continue
        # XXX What about other special files? (sockets, devices...)
        if stat.S_ISFIFO(st.st_mode):
            raise SpecialFileError("`%s` is a named pipe" % name)
    with open(src, 'rb') as fsrc, open(dst, 'wb') as fdst:
        copyfileobj(fsrc, fdst)
def newer(self, source, target):
    """Tell if the target is newer than the source.

    Returns true if 'source' exists and is more recently modified than
    'target', or if 'source' exists and 'target' doesn't.

    Returns false if both exist and 'target' is the same age or younger
    than 'source'. Raise PackagingFileError if 'source' does not exist.

    Note that this test is not very accurate: files created in the same
    second will have the same "age".
    """
    if not os.path.exists(source):
        # %r already wraps the path in quotes; the original "'%r'"
        # produced a doubled-quoted path in the message.
        raise DistlibException("file %r does not exist" %
                               os.path.abspath(source))
    if not os.path.exists(target):
        return True
    return os.stat(source).st_mtime > os.stat(target).st_mtime
def postprocess(self, tempname, filename):
    """Perform any platform-specific postprocessing of `tempname`

    This is where Mac header rewrites should be done; other platforms
    don't have anything special they should do.

    Resource providers should call this method ONLY after successfully
    extracting a compressed resource. They must NOT call it on resources
    that are already in the filesystem.

    `tempname` is the current (temporary) name of the file, and `filename`
    is the name it will be renamed to by the caller after this routine
    returns.
    """
    if os.name != 'posix':
        return
    # Make the resource executable: add r-x for user/group/other on top
    # of the existing mode; & 0o7777 keeps only the permission bits
    # (drops the file-type bits) before handing the value to chmod.
    new_mode = (os.stat(tempname).st_mode | 0o555) & 0o7777
    os.chmod(tempname, new_mode)
def unpack_and_compile(self, egg_path, destination):
    """Unpack an egg archive, byte-compile its Python sources and make
    extension libraries readable/executable."""
    to_compile = []
    to_chmod = []

    def pf(src, dst):
        # Collect work while the archive is extracted; EGG-INFO entries
        # are metadata and are never byte-compiled.
        if dst.endswith('.py') and not src.startswith('EGG-INFO/'):
            to_compile.append(dst)
        elif dst.endswith('.dll') or dst.endswith('.so'):
            to_chmod.append(dst)
        self.unpack_progress(src, dst)
        return not self.dry_run and dst or None

    unpack_archive(egg_path, destination, pf)
    self.byte_compile(to_compile)
    if self.dry_run:
        return
    for f in to_chmod:
        mode = (os.stat(f)[stat.ST_MODE] | 0o555) & 0o7755
        chmod(f, mode)
def commonprefix(m):
    "Given a list of pathnames, returns the longest common leading component"
    if not m:
        return ''
    # Some people pass in a list of pathname parts to operate in an OS-agnostic
    # fashion; don't try to translate in that case as that's an abuse of the
    # API and they are already doing what they need to be OS-agnostic and so
    # they most likely won't be using an os.PathLike object in the sublists.
    if not isinstance(m[0], (list, tuple)):
        m = tuple(map(os.fspath, m))
    # The common prefix of the lexicographically smallest and largest
    # items is the common prefix of the whole collection.
    lo, hi = min(m), max(m)
    for i, ch in enumerate(lo):
        if ch != hi[i]:
            return lo[:i]
    return lo


# Are two stat buffers (obtained from stat, fstat or lstat)
# describing the same file?
def serve(self, request, path):
    """Serve a file from PRIVATE_MEDIA_ROOT, honouring If-Modified-Since."""
    # the following code is largely borrowed from `django.views.static.serve`
    # and django-filetransfers: filetransfers.backends.default
    fullpath = os.path.join(settings.PRIVATE_MEDIA_ROOT, path)
    if not os.path.exists(fullpath):
        raise Http404('"{0}" does not exist'.format(fullpath))
    statobj = os.stat(fullpath)
    content_type = (mimetypes.guess_type(fullpath)[0]
                    or 'application/octet-stream')
    # Respect the If-Modified-Since header.
    if not was_modified_since(request.META.get('HTTP_IF_MODIFIED_SINCE'),
                              statobj[stat.ST_MTIME],
                              statobj[stat.ST_SIZE]):
        return HttpResponseNotModified(content_type=content_type)
    # NOTE(review): the file object opened here is never explicitly
    # closed; django.http.FileResponse would stream and close it.
    response = HttpResponse(open(fullpath, 'rb').read(),
                            content_type=content_type)
    response["Last-Modified"] = http_date(statobj[stat.ST_MTIME])
    # filename = os.path.basename(path)
    # response['Content-Disposition'] = smart_str(u'attachment; filename={0}'.format(filename))
    return response
async def add_local_charm_dir(self, charm_dir, series):
    """Upload a local charm to the model.

    This will automatically generate an archive from the charm dir.

    :param charm_dir: Path to the charm directory
    :param series: Charm series
    """
    archive = tempfile.NamedTemporaryFile()
    CharmArchiveGenerator(charm_dir).make_archive(archive.name)
    with archive:
        # The blocking upload runs in an executor so the event loop is
        # not stalled while the archive streams to the controller.
        upload = partial(
            self.add_local_charm, archive, series,
            os.stat(archive.name).st_size)
        charm_url = await self._connector.loop.run_in_executor(None, upload)

    log.debug('Uploaded local charm: %s -> %s', charm_dir, charm_url)
    return charm_url
def _write_symlink(self, zf, link_target, link_path):
    """Package symlinks with appropriate zipfile metadata."""
    info = zipfile.ZipInfo()
    info.filename = link_path
    info.create_system = 3  # Magic code for symlinks / py2/3 compat
    # 2716663808 == (stat.S_IFLNK | 0o755) << 16: zip external_attr
    # carries the Unix mode in its high 16 bits, marking this entry as
    # a symlink with rwxr-xr-x permissions.
    # (The previous comment's "27166663808" had a typo — an extra 6.)
    info.external_attr = 2716663808
    zf.writestr(info, link_target)
def Send_File_Client():
    # Send the file named by the module-level `filename` to ADDR over a
    # TCP socket. A fixed header is sent first: struct format 'IdI' =
    # (flag=1, current timestamp as double, file size from os.stat),
    # followed by the raw file bytes in BUFSIZE chunks.
    sendSock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sendSock.connect(ADDR)
    fhead = struct.pack('IdI', 1, float(time.time()), os.stat(filename).st_size)
    print(fhead)
    sendSock.send(fhead)
    fp = open(filename, 'rb')
    while 1:
        filedata = fp.read(BUFSIZE)
        if not filedata:
            break
        sendSock.send(filedata)
    # NOTE(review): the block below is dead code (a bare string
    # statement); it never runs, and fp/sendSock are therefore never
    # closed. The original print text was mojibake before extraction.
    '''
    print u"?????????????...\n"
    fp.close()
    sendSock.close()
    print u"?????...\n"
    '''
def maybe_download_and_extract():
    """Download and extract the tarball from Alex's website."""
    dest_directory = FLAGS.data_dir
    if not os.path.exists(dest_directory):
        os.makedirs(dest_directory)
    filename = DATA_URL.split('/')[-1]
    filepath = os.path.join(dest_directory, filename)
    if not os.path.exists(filepath):
        def _progress(count, block_size, total_size):
            # urlretrieve reporthook: show a single-line percentage.
            done = float(count * block_size) / float(total_size) * 100.0
            sys.stdout.write('\r>> Downloading %s %.1f%%' % (filename, done))
            sys.stdout.flush()

        filepath, _ = urllib.request.urlretrieve(DATA_URL, filepath,
                                                 reporthook=_progress)
        print()
        statinfo = os.stat(filepath)
        print('Successfully downloaded', filename, statinfo.st_size, 'bytes.')
    tarfile.open(filepath, 'r:gz').extractall(dest_directory)
def checkcache(filename=None):
    """Discard cache entries that are out of date.
    (This is not checked upon each call!)"""
    if filename is None:
        # Snapshot the keys: entries may be deleted below, and mutating
        # a dict while iterating its live key view raises RuntimeError
        # on Python 3.
        filenames = list(cache.keys())
    elif filename in cache:
        filenames = [filename]
    else:
        return
    for filename in filenames:
        size, mtime, lines, fullname = cache[filename]
        if mtime is None:
            continue  # no-op for files loaded via a __loader__
        try:
            stat = os.stat(fullname)
        except os.error:
            # File vanished: the cached lines can no longer be refreshed.
            del cache[filename]
            continue
        if size != stat.st_size or mtime != stat.st_mtime:
            del cache[filename]
def newer(source, target):
    """Tells if the target is newer than the source.

    Return true if 'source' exists and is more recently modified than
    'target', or if 'source' exists and 'target' doesn't.

    Return false if both exist and 'target' is the same age or younger
    than 'source'. Raise DistutilsFileError if 'source' does not exist.

    Note that this test is not very accurate: files created in the same
    second will have the same "age".
    """
    if not os.path.exists(source):
        raise DistutilsFileError("file '%s' does not exist" %
                                 os.path.abspath(source))
    if not os.path.exists(target):
        return True
    source_mtime = os.stat(source).st_mtime
    target_mtime = os.stat(target).st_mtime
    return source_mtime > target_mtime
def load_stats(self, arg):
    # Load profiler statistics from `arg`, which may be: falsy (start
    # with empty stats), a filename of marshalled pstats data, or a
    # Profile-like object exposing create_stats().
    # NOTE: Python 2 code — `basestring` and `raise TypeError, ...`
    # syntax do not run on Python 3.
    if not arg:
        self.stats = {}
    elif isinstance(arg, basestring):
        f = open(arg, 'rb')
        self.stats = marshal.load(f)
        f.close()
        try:
            file_stats = os.stat(arg)
            # Label the data source with its modification time for reports.
            arg = time.ctime(file_stats.st_mtime) + " " + arg
        except:  # in case this is not unix
            pass
        self.files = [ arg ]
    elif hasattr(arg, 'create_stats'):
        arg.create_stats()
        self.stats = arg.stats
        # Detach the stats from the profiler object once copied.
        arg.stats = {}
    if not self.stats:
        raise TypeError, "Cannot create or construct a %r object from '%r''" % (
            self.__class__, arg)
    return
def synopsis(filename, cache={}):
    """Get the one-line summary out of a module file."""
    # NOTE: the mutable default `cache` is deliberate memoization shared
    # across calls, keyed by filename and invalidated via st_mtime.
    # NOTE: Python 2 era code — `imp` and inspect.getmoduleinfo were
    # removed in modern Python 3.
    mtime = os.stat(filename).st_mtime
    lastupdate, result = cache.get(filename, (0, None))
    if lastupdate < mtime:
        info = inspect.getmoduleinfo(filename)
        try:
            file = open(filename)
        except IOError:
            # module can't be opened, so skip it
            return None
        if info and 'b' in info[2]:  # binary modules have to be imported
            try:
                module = imp.load_module('__temp__', file, filename, info[1:])
            except:
                return None
            result = (module.__doc__ or '').splitlines()[0]
            del sys.modules['__temp__']
        else:  # text modules can be directly examined
            result = source_synopsis(file)
            file.close()
        cache[filename] = (mtime, result)
    return result
def listsubfolders(self, name): """Return the names of the subfolders in a given folder (prefixed with the given folder name).""" fullname = os.path.join(self.path, name) # Get the link count so we can avoid listing folders # that have no subfolders. nlinks = os.stat(fullname).st_nlink if nlinks <= 2: return [] subfolders = [] subnames = os.listdir(fullname) for subname in subnames: fullsubname = os.path.join(fullname, subname) if os.path.isdir(fullsubname): name_subname = os.path.join(name, subname) subfolders.append(name_subname) # Stop looking for subfolders when # we've seen them all nlinks = nlinks - 1 if nlinks <= 2: break subfolders.sort() return subfolders
def get_latest_data_subdir(pattern=None, take=-1):
    """Return a subdirectory name of BASE_DATA_DIR selected by
    modification time (the most recent by default, or another position
    via `take`), optionally filtered to names containing `pattern`.
    Returns None when BASE_DATA_DIR is missing or nothing matches."""
    def mtime_of(entry):
        return os.stat(os.path.join(BASE_DATA_DIR, entry)).st_mtime

    try:
        subdirs = next(os.walk(BASE_DATA_DIR))[1]
    except StopIteration:
        # os.walk yielded nothing: the base directory does not exist.
        return None
    if pattern is not None:
        subdirs = [d for d in subdirs if pattern in d]
    subdirs.sort(key=mtime_of)
    if not subdirs:
        return None
    return subdirs[take]
def filemetadata(filename):
    # type: (str) -> Optional[FileMeta]
    """Build a FileMeta record (path, size, mtime, sha, version string)
    for the executable `filename`, or None when it cannot be resolved."""
    resolved = which(filename)
    if resolved is None:
        return None
    filename = resolved
    s = os.stat(filename)
    if filename == sys.executable:
        # filename is the Python interpreter itself
        versionstring = bytestr(sys.version)
    else:
        versionstring = run_executable(filename, ['--version']).stdout
    return FileMeta(filename, s.st_size, s.st_mtime, filesha(filename),
                    versionstring)


# ----------------------------------------------------------------------
def handle(self, *args, **options):
    """Install (or extend) the git commit-msg hook and mark it executable."""
    commit_msg_path = os.path.join(self.HOOK_PATH, 'commit-msg')
    if os.path.exists(commit_msg_path):
        with open(commit_msg_path, 'r') as fp:
            hook_content = fp.read()
    else:
        hook_content = '#!/usr/bin/env bash\n\n'
    # Append our hook only once, keyed on its marker string.
    if 'ZERODOWNTIME_COMMIT_MSG_HOOK' not in hook_content:
        hook_content += COMMIT_MSG_HOOK
    with open(commit_msg_path, 'w') as fp:
        fp.write(hook_content)
    st = os.stat(commit_msg_path)
    os.chmod(commit_msg_path, st.st_mode | stat.S_IEXEC)
def _clean_upgrade(binary_ok, binary_path, path, temp_path):
    """Swap in a freshly downloaded binary and re-exec it, or clean up
    the download on failure."""
    if not binary_ok:
        os.remove(temp_path)
        print("couldn't download the latest binary")
        return
    import stat
    # save the permissions from the current binary
    old_stat = os.stat(binary_path)
    # rename the current binary in order to replace it with the latest
    os.rename(binary_path, path + "/old")
    os.rename(temp_path, binary_path)
    # set the same permissions that had the previous binary
    os.chmod(binary_path, old_stat.st_mode | stat.S_IEXEC)
    # delete the old binary
    os.remove(path + "/old")
    print("mongoaudit updated, restarting...")
    # Replace the current process image with the updated binary.
    os.execl(binary_path, binary_path, *sys.argv)
def handle_ref_error(self):
    """Validate the selected FASTA reference file and pop a message box
    describing any problem (missing, empty, or malformed).

    Checks that the first line is a FASTA header ('>') and that the
    second line contains only A/T/G/C/N characters.
    """
    try:
        if os.stat(self.ref).st_size > 0:
            with open(self.ref) as f:
                for i in range(2):
                    # next(f) works on both Python 2 and 3; the original
                    # f.next() is Python-2 only.
                    line = next(f).strip()
                    if i == 0 and line[0] != '>':
                        return QtGui.QMessageBox.question(
                            self, 'Error !',
                            'Please check your input reference !',
                            QtGui.QMessageBox.Ok)
                    if i == 1 and len(re.findall("[^ATGCN]", line.upper())) > 0:
                        return QtGui.QMessageBox.question(
                            self, 'Error !',
                            'Please check your input reference !',
                            QtGui.QMessageBox.Ok)
        else:
            return QtGui.QMessageBox.question(
                self, 'Warning !',
                'The selected reference file is empty, please check !',
                QtGui.QMessageBox.Ok)
    except Exception:
        # Broad catch kept deliberately: any failure (no file selected,
        # unreadable path, file shorter than two lines) is reported the
        # same way to the user. Narrowed from a bare `except:` so that
        # KeyboardInterrupt/SystemExit still propagate.
        return QtGui.QMessageBox.question(
            self, 'Error !', 'Please input a reference file !',
            QtGui.QMessageBox.Ok)
def find_INSTALL(Makefile, flags):
    '''
    See the doc-string for find_prefix as well.

    Sets Makefile['INSTALL'] if needed.

    $(INSTALL) is normally "install", but on Solares it needs to be
    "/usr/ucb/install".
    '''
    if 'INSTALL' in Makefile:
        # Already configured by the caller; leave it untouched.
        return
    # os.path.exists replaces the original try/os.stat/bare-except
    # probe, which silently swallowed every exception type.
    for candidate in ('/usr/ucb/install',):
        if not os.path.exists(candidate):
            continue
        if flags['v']:
            sys.stdout.write('Using "' + candidate + '" as `install`\n.')
        Makefile['INSTALL'] = candidate
        return False
    Makefile['INSTALL'] = 'install'
    return False
def __hashEntry(self, prefix, entry, s):
    """Return the digest bytes for `entry`, dispatching on the file type
    encoded in the stat record `s`."""
    mode = s.st_mode
    if stat.S_ISREG(mode):
        return self.__index.check(prefix, entry, s, hashFile)
    if stat.S_ISDIR(mode):
        return self.__hashDir(prefix, entry)
    if stat.S_ISLNK(mode):
        return self.__index.check(prefix, entry, s, DirHasher.__hashLink)
    if stat.S_ISBLK(mode) or stat.S_ISCHR(mode):
        # Device nodes are fingerprinted by their device number.
        return struct.pack("<L", s.st_rdev)
    if stat.S_ISFIFO(mode):
        return b''
    # Sockets and anything else: empty digest, but leave a trace.
    logging.getLogger(__name__).warning("Unknown file: %s", entry)
    return b''
def download(url, filename):
    """Download .su3 file, return True on success"""
    USER_AGENT = "Wget/1.11.4"
    url = "{}i2pseeds.su3".format(url)
    req = urllib.request.Request(url, headers={"User-Agent": USER_AGENT})
    try:
        with urllib.request.urlopen(req) as resp:
            with open(filename, 'wb') as f:
                f.write(resp.read())
        # A zero-byte file counts as a failed download.
        return os.stat(filename).st_size > 0
    except URLError:
        return False
def assert_file_contents_equal(a, b):
    '''
    Assert that two files have the same size and hash.
    '''
    def describe_mismatch(path_a, path_b):
        '''
        This creates the error message for the assertion error
        '''
        size_a = os.stat(path_a).st_size
        size_b = os.stat(path_b).st_size
        if size_a != size_b:
            return "Files have different sizes: a:%d b: %d" % (size_a, size_b)
        return "Files have the same size but different contents"

    assert file_digest(a) == file_digest(b), describe_mismatch(a, b)
def cached_data_age(self, name):
    """Return age of data cached at `name` in seconds or 0 if cache
    doesn't exist

    :param name: name of datastore
    :type name: ``unicode``
    :returns: age of datastore in seconds
    :rtype: ``int``
    """
    cache_path = self.cachefile('%s.%s' % (name, self.cache_serializer))
    if os.path.exists(cache_path):
        # Age is simply "now minus last modification time".
        return time.time() - os.stat(cache_path).st_mtime
    return 0
def seek(self, offset, whence=os.SEEK_SET):
    """Similar to 'seek' of file descriptor; works only for regular
    files.
    """
    if whence == os.SEEK_SET:
        new_offset = offset
    elif whence == os.SEEK_CUR:
        new_offset = self._overlap.Offset + offset
    else:
        assert whence == os.SEEK_END
        if isinstance(self._path, str):
            # Only a real path lets us resolve the file size for an
            # end-relative seek; otherwise fall back to the raw offset.
            new_offset = os.stat(self._path).st_size + offset
        else:
            new_offset = offset
    self._overlap.Offset = new_offset
def is_block_device(path):
    '''
    Confirm device at path is a valid block device node.

    :returns: boolean: True if path is a block device, False if not.
    '''
    # A missing path short-circuits to False before the stat call.
    return os.path.exists(path) and S_ISBLK(os.stat(path).st_mode)
def maybe_download(filename, work_directory):
    """Download the data from Yann's website, unless it's already here."""
    if not os.path.exists(work_directory):
        os.mkdir(work_directory)
    filepath = os.path.join(work_directory, filename)
    if os.path.exists(filepath):
        # Cached copy: skip the network entirely.
        return filepath
    filepath, _ = urllib.request.urlretrieve(SOURCE_URL + filename, filepath)
    statinfo = os.stat(filepath)
    print('Succesfully downloaded', filename, statinfo.st_size, 'bytes.')
    return filepath