我们从Python开源项目中,提取了以下39个代码示例,用于说明如何使用stat.ST_INO。
def emit(self, record): """ Emit a record. First check if the underlying file has changed, and if it has, close the old stream and reopen the file to get the current stream. """ if not os.path.exists(self.baseFilename): stat = None changed = 1 else: stat = os.stat(self.baseFilename) changed = (stat[ST_DEV] != self.dev) or (stat[ST_INO] != self.ino) if changed and self.stream is not None: self.stream.flush() self.stream.close() self.stream = self._open() if stat is None: stat = os.stat(self.baseFilename) self.dev, self.ino = stat[ST_DEV], stat[ST_INO] logging.FileHandler.emit(self, record)
def get_last_offset(self, f): st = os.fstat(f.fileno()) ino = st[stat.ST_INO] size = st[stat.ST_SIZE] try: last = self.read_last_stat() except IOError: # no such file last = {'inode': 0, 'offset': 0} if last['inode'] == ino and last['offset'] <= size: return last['offset'] else: return 0
def __init__(self, filename, mode='a', encoding=None, delay=0): logging.FileHandler.__init__(self, filename, mode, encoding, delay) if not os.path.exists(self.baseFilename): self.dev, self.ino = -1, -1 else: stat = os.stat(self.baseFilename) self.dev, self.ino = stat[ST_DEV], stat[ST_INO]
def _statstream(self): if self.stream: sres = os.fstat(self.stream.fileno()) self.dev, self.ino = sres[ST_DEV], sres[ST_INO]
def write_last_stat(self, f, offset): st = os.fstat(f.fileno()) ino = st[stat.ST_INO] last = { "inode": ino, "offset": offset, } fsutil.write_file(self.stat_path(), utfjson.dump(last)) logger.info('position written fn=%s inode=%d offset=%d' % ( self.fn, ino, offset))
def emit(self, record): """ Emit a record. First check if the underlying file has changed, and if it has, close the old stream and reopen the file to get the current stream. """ # Reduce the chance of race conditions by stat'ing by path only # once and then fstat'ing our new fd if we opened a new log stream. # See issue #14632: Thanks to John Mulligan for the problem report # and patch. try: # stat the file by path, checking for existence sres = os.stat(self.baseFilename) except OSError as err: if err.errno == errno.ENOENT: sres = None else: raise # compare file system stat with that of our stream file handle if not sres or sres[ST_DEV] != self.dev or sres[ST_INO] != self.ino: if self.stream is not None: # we have an open file handle, clean it up self.stream.flush() self.stream.close() self.stream = None # See Issue #21742: _open () might fail. # open a new file handle and get new stat info from that fd self.stream = self._open() self._statstream() logging.FileHandler.emit(self, record)
def fetch_js(): libs = { 'd3.v3.min.js': 'https://d3js.org/d3.v3.min.js', 'd3.v4.min.js': 'https://d3js.org/d3.v4.min.js', 'svg-crowbar.js': 'https://nytimes.github.io/svg-crowbar/svg-crowbar.js', } for js, url in libs.iteritems(): target = os.path.sep.join(('LemonGraph', 'data', js)) source = os.path.sep.join(('deps', 'js', js)) dotsource = os.path.sep.join(('deps', 'js', '.%s' % js)) try: os.mkdir(os.path.sep.join(('deps', 'js'))) except OSError: pass try: s1 = os.stat(source) except OSError: with open(dotsource, 'wb') as fh: for chunk in do_curl(url): fh.write(chunk) s1 = os.fstat(fh.fileno()) fh.close() os.link(dotsource, source) os.unlink(dotsource) try: s2 = os.stat(target) except OSError: print >> sys.stderr, "Hard linking: %s -> %s" % (source, target) os.link(source, target) continue for i in (stat.ST_INO, stat.ST_DEV): if s1[i] != s2[i]: os.unlink(target) print >> sys.stderr, "Hard linking: %s -> %s" % (source, target) os.link(source, target) break
def sameopenfile(a, b): """ Test whether two open file objects reference the same file. """ return os.fstat(a)[stat.ST_INO]==os.fstat(b)[stat.ST_INO] ## Path canonicalisation # 'user directory' is taken as meaning the User Root Directory, which is in # practice never used, for anything.
def emit(self, record): """ Emit a record. First check if the underlying file has changed, and if it has, close the old stream and reopen the file to get the current stream. """ # Reduce the chance of race conditions by stat'ing by path only # once and then fstat'ing our new fd if we opened a new log stream. # See issue #14632: Thanks to John Mulligan for the problem report # and patch. try: # stat the file by path, checking for existence sres = os.stat(self.baseFilename) except OSError as err: if err.errno == errno.ENOENT: sres = None else: raise # compare file system stat with that of our stream file handle if not sres or sres[ST_DEV] != self.dev or sres[ST_INO] != self.ino: if self.stream is not None: # we have an open file handle, clean it up self.stream.flush() self.stream.close() # open a new file handle and get new stat info from that fd self.stream = self._open() self._statstream() logging.FileHandler.emit(self, record)
def emit(self, record): """ Emit a record. First check if the underlying file has changed, and if it has, close the old stream and reopen the file to get the current stream. """ # Reduce the chance of race conditions by stat'ing by path only # once and then fstat'ing our new fd if we opened a new log stream. # See issue #14632: Thanks to John Mulligan for the problem report # and patch. try: # stat the file by path, checking for existence sres = os.stat(self.baseFilename) except FileNotFoundError: sres = None # compare file system stat with that of our stream file handle if not sres or sres[ST_DEV] != self.dev or sres[ST_INO] != self.ino: if self.stream is not None: # we have an open file handle, clean it up self.stream.flush() self.stream.close() self.stream = None # See Issue #21742: _open () might fail. # open a new file handle and get new stat info from that fd self.stream = self._open() self._statstream() logging.FileHandler.emit(self, record)
def delete_files_by_inode(inode_list, del_img_flag): """scan the downloaded list of files and delete any whose inode matches one in the list""" file_list = os.listdir(my_settings.get(SETTINGS_SECTION, 'iplayer_directory')) for file_name in sorted(file_list): full_file = os.path.join(my_settings.get(SETTINGS_SECTION, 'iplayer_directory'), file_name) if os.path.isfile(full_file): # need to check file exists in case its a jpg we already deleted file_stat = os.stat(full_file) #print 'considering file %s which has inode %d\n<br >' % (full_file, file_stat[stat.ST_INO], ) if str(file_stat[stat.ST_INO]) in inode_list: print 'file %s is being deleted \n<br >' % (full_file, ) try: os.remove(full_file) except OSError: # for some reason the above works but throws exception print 'error deleting %s\n<br />' % full_file if del_img_flag: file_prefix, _ignore = os.path.splitext(full_file) image_file_name = file_prefix + '.jpg' if os.path.isfile(image_file_name): print 'image %s is being deleted \n<br >' % image_file_name try: os.remove(image_file_name) except OSError: # for some reason the above works but throws exception print 'error deleting %s\n<br />' % (image_file_name, ) else: print 'there was no image file %s to be deleted\n<br >' % (image_file_name, ) #####################################################################################################################
def index_file(db, file_path, update = False): global _index_count if six.PY2: if not isinstance(file_path, unicode): # noqa file_path = unicode(file_path, encoding="UTF-8") # noqa con = db[0] # Bad symlinks etc. try: stat_res = os.stat(file_path) except OSError: if not update: print("%s: not found, skipping" % (file_path,)) return if not stat.S_ISREG(stat_res[stat.ST_MODE]): if not update: print("%s: not a plain file, skipping" % (file_path,)) return inode_mod = (stat_res[stat.ST_INO] * stat_res[stat.ST_MTIME]) % 2 ** 62 old_inode_mod = None old_md5 = None file_ = None with con: res = con.execute(_find_file, (file_path,)).fetchall() if res: file_ = res[0][0] old_inode_mod = res[0][1] old_md5 = res[0][2] if old_inode_mod != inode_mod: do_index, file_ = check_file( con, file_, file_path, inode_mod, old_md5, update ) if not do_index: return encoding = read_index(db, file_, file_path, update) con.execute(_update_file_info, (encoding, file_path)) else: if not update: print("%s: uptodate" % (file_path,)) with con: con.execute(_mark_found, (file_path,))
def emit(self, record): """ Emit a record. First check if the underlying file has changed, and if it has, close the old stream and reopen the file to get the current stream. """ # Reduce the chance of race conditions by stat'ing by path only # once and then fstat'ing our new fd if we opened a new log stream. # See issue #14632: Thanks to John Mulligan for the problem report # and patch. try: # stat the file by path, checking for existence sres = os.stat(self.baseFilename) except OSError as e: if e.errno == errno.ENOENT: sres = None else: raise # compare file system stat with that of our stream file handle if not sres or sres[ST_DEV] != self.dev or sres[ST_INO] != self.ino: # Fixed by xp 2017 Apr 03: # os.fstat still gets OSError(errno=2), although it operates # directly on fd instead of path. The same for stream.flush(). # Thus we keep on trying this close/open/stat loop until no # OSError raises. for ii in range(16): try: if self.stream is not None: # we have an open file handle, clean it up self.stream.flush() self.stream.close() # See Issue #21742: _open () might fail. self.stream = None # open a new file handle and get new stat info from # that fd self.stream = self._open() self._statstream() break except OSError as e: if e.errno == errno.ENOENT: continue else: raise logging.FileHandler.emit(self, record)
def make_file_dict_python(filename): """Create the data dictionary using a Python call to os.lstat We do this on Windows since Python's implementation is much better than the one in cmodule.c Eventually, we will move to using this on all platforms since CPUs have gotten much faster than they were when it was necessary to write cmodule.c """ try: statblock = os.lstat(filename) except os.error: return {'type':None} data = {} mode = statblock[stat.ST_MODE] if stat.S_ISREG(mode): type_ = 'reg' elif stat.S_ISDIR(mode): type_ = 'dir' elif stat.S_ISCHR(mode): type_ = 'dev' s = statblock.st_rdev data['devnums'] = ('c',) + (s >> 8, s & 0xff) elif stat.S_ISBLK(mode): type_ = 'dev' s = statblock.st_rdev data['devnums'] = ('b',) + (s >> 8, s & 0xff) elif stat.S_ISFIFO(mode): type_ = 'fifo' elif stat.S_ISLNK(mode): type_ = 'sym' data['linkname'] = os.readlink(filename) elif stat.S_ISSOCK(mode): type_ = 'sock' else: raise C.UnknownFileError(filename) data['type'] = type_ data['size'] = statblock[stat.ST_SIZE] data['perms'] = stat.S_IMODE(mode) data['uid'] = statblock[stat.ST_UID] data['gid'] = statblock[stat.ST_GID] data['inode'] = statblock[stat.ST_INO] data['devloc'] = statblock[stat.ST_DEV] data['nlink'] = statblock[stat.ST_NLINK] if os.name == 'nt': global type if type(filename) == unicode: attribs = win32file.GetFileAttributesW(filename) else: attribs = win32file.GetFileAttributes(filename) if attribs & winnt.FILE_ATTRIBUTE_REPARSE_POINT: data['type'] = 'sym' data['linkname'] = None if not (type_ == 'sym' or type_ == 'dev'): # mtimes on symlinks and dev files don't work consistently data['mtime'] = long(statblock[stat.ST_MTIME]) data['atime'] = long(statblock[stat.ST_ATIME]) data['ctime'] = long(statblock[stat.ST_CTIME]) return data
def page_transcode_inode(p_submit, p_inode, p_trnscd_cmd_method, p_trnscd_rez): """scan the downloaded list of files and transcode any whose inode matches one in the list""" print 'transcoding files with inode %s\n<br />' % (p_inode, ) if p_submit == '' or p_submit != 'Transcode' or p_trnscd_cmd_method == '': print '<p>Confirm transcode options:</p>' print '<form method="get" action="">' print '<select name="trnscd_cmd_method">\n' print ' <option value="trnscd_cmd_video">%s</option>' % (my_settings.get(SETTINGS_SECTION, 'trnscd_cmd_video'), ) print ' <option value="trnscd_cmd_audio">%s</option>' % (my_settings.get(SETTINGS_SECTION, 'trnscd_cmd_audio'), ) print '</select>\n<br />' print_select_resolution() print '<br />' print '<input type="hidden" name="inode" value="%s">' % (p_inode, ) print '<input type="hidden" name="page" value="transcode_inode">' print '<input type="submit" name="submit" value="Transcode">' print '</form>' else: file_list = os.listdir(my_settings.get(SETTINGS_SECTION, 'iplayer_directory')) for file_name in sorted(file_list): full_file_name = os.path.join(my_settings.get(SETTINGS_SECTION, 'iplayer_directory'), file_name) file_root, file_extension = os.path.splitext(full_file_name) file_root = file_root.replace('original', 'transcoded') file_root = file_root.replace('default', 'transcoded') file_root = file_root.replace('editorial', 'transcoded') file_stat = os.stat(full_file_name) #print 'considering file %s which has inode %d\n<br >' % (full_file_name, file_stat[stat.ST_INO], ) if str(file_stat[stat.ST_INO]) == p_inode: rezopts = '' fnameadd = '' if p_trnscd_rez != '' and p_trnscd_rez != 'original': rezopts = ' -s %s' % (p_trnscd_rez, ) fnameadd = '-%s' % (p_trnscd_rez, ) #full_file_mp4 = '%s.mp4' % ( os.path.splitext(full_file_name)[0], ) cmd = "%s %s %s %s%s.%s 2>&1" % (my_settings.get(SETTINGS_SECTION, p_trnscd_cmd_method), rezopts, full_file_name, file_root, fnameadd, TRANSCODE_COMMANDS[p_trnscd_cmd_method]['outext'], ) #print 'file %s is being transcoded with command %s\n<br ><pre>\n' % (full_file_name, cmd, ) print 'file transcoding<pre>\n%s\n' % (cmd, ) sys.stdout.flush() sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0) # capture stdout sys.stderr = os.fdopen(sys.stderr.fileno(), 'w', 0) # and stderr os.system(cmd) print '</pre>' print '<p>Finished!</p>' #####################################################################################################################
def run(self): """At the end of the install function, we need to rename some files because distutils provides no way to rename files as they are placed in their install locations. """ _install.run(self) for o_src, o_dest in hardlink_modules: for e in [".py", ".pyc"]: src = util.change_root(self.root_dir, o_src + e) dest = util.change_root( self.root_dir, o_dest + e) if ostype == "posix": if os.path.exists(dest) and \ os.stat(src)[stat.ST_INO] != \ os.stat(dest)[stat.ST_INO]: os.remove(dest) file_util.copy_file(src, dest, link="hard", update=1) else: file_util.copy_file(src, dest, update=1) # XXX Uncomment it when we need to deliver python 3.4 version # of modules. # Don't install the scripts for python 3.4. if py_version == '3.4': return for d, files in six.iteritems(scripts[osname]): for (srcname, dstname) in files: dst_dir = util.change_root(self.root_dir, d) dst_path = util.change_root(self.root_dir, os.path.join(d, dstname)) dir_util.mkpath(dst_dir, verbose=True) file_util.copy_file(srcname, dst_path, update=True) # make scripts executable os.chmod(dst_path, os.stat(dst_path).st_mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH) for target, o_dest in symlink_modules: dest = util.change_root(self.root_dir, o_dest) dir_util.mkpath(os.path.dirname(dest), verbose=True) try: os.unlink(dest) except Exception: pass os.symlink(target, dest)