我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用os.utime()。
def __setitem__(self, key, message):
    """Replace the message stored under `key`; raise KeyError if it doesn't exist.

    The replacement is first added under a temporary key, the old message
    is discarded, and the temporary file is then renamed so that it takes
    over `key`.
    """
    previous_subpath = self._lookup(key)
    staged_subpath = self._lookup(self.add(message))
    is_maildir_message = isinstance(message, MaildirMessage)
    # A MaildirMessage dictated the staged file's subdir and suffix itself;
    # for any other message add() only applied defaults, so the previous
    # location wins.
    reference_subpath = staged_subpath if is_maildir_message else previous_subpath
    subdir = os.path.dirname(reference_subpath)
    suffix = ''
    if self.colon in reference_subpath:
        suffix = self.colon + reference_subpath.split(self.colon)[-1]
    self.discard(key)
    final_path = os.path.join(self._path, subdir, key + suffix)
    os.rename(os.path.join(self._path, staged_subpath), final_path)
    if is_maildir_message:
        # Keep the current atime; the mtime comes from the message's date.
        os.utime(final_path,
                 (os.path.getatime(final_path), message.get_date()))
def _caches_to_file(cache_path, start, end, name, cb, concat):
    """Persist the numbered cache files ``start`` .. ``end - 1``, then call ``cb``.

    With ``concat`` set, every ``<i>.jb`` file under ``cache_path`` is
    loaded, the loaded sequences are joined into a single list and that
    list is dumped to ``name``. Otherwise the files are moved into the
    directory ``cache_path/<name without its last three characters>`` and
    an empty ``.finished`` marker file is touched inside it.
    """
    began = time()
    indices = range(start, end)
    if not concat:
        target_path = os.path.join(cache_path, name[:-3])
        if not os.path.exists(target_path):
            os.makedirs(target_path)
        for index in indices:
            source = os.path.join(cache_path, "{0}.jb".format(index))
            destination = os.path.join(target_path, os.path.basename(source))
            shutil.move(source, destination)
        # Create the marker if needed and bump its timestamps to "now".
        marker = os.path.join(target_path, '.finished')
        with open(marker, 'a'):
            os.utime(marker, None)
    else:
        merged = []
        for index in indices:
            merged.extend(load(os.path.join(cache_path, "{0}.jb".format(index))))
        dump(merged, name, 3)
    elapsed = time() - began
    logging.debug("Finished saving data to {0}. Took {1}s".format(name, elapsed))
    cb()
def run_if_needed(base_file, then_file, now_file):
    """Refresh both stamp files unless ``then_file`` already mirrors ``base_file``.

    Nothing is done when ``then_file`` and ``now_file`` both exist and the
    mtime of ``then_file`` equals the rounded-up mtime of ``base_file``.
    Otherwise ``now_file`` is created/touched with the current time and
    ``then_file`` is created and given the rounded-up atime/mtime of
    ``base_file``.
    """
    # Python uses doubles for mtime so it can't precisely represent linux's
    # nanosecond precision. Round up to next whole second to ensure we get a
    # stable timestamp that is guaranteed to be >= the timestamp of the
    # compiler. This also avoids issues if the compiler is on a file system
    # with high-precision timestamps, but the build directory isn't.
    reference = os.stat(base_file)
    mtime = math.ceil(reference.st_mtime)
    atime = math.ceil(reference.st_atime)

    already_current = (
        os.path.exists(then_file)
        and os.path.exists(now_file)
        and os.stat(then_file).st_mtime == mtime
    )
    if already_current:
        return  # Don't need to do anything.

    createIfNeeded(now_file)
    os.utime(now_file, None)  # None means now

    createIfNeeded(then_file)
    os.utime(then_file, (atime, mtime))
def test_file_save_outside_roamer(self):
    """Append to egg.txt outside the session and check the content after processing.

    The file in TEST_DIR is extended and given fixed timestamps, the first
    session is processed, and a second session over DOC_DIR re-adds the
    entry with the digest taken before the change. The copy in DOC_DIR
    must then exist and contain the appended text.
    """
    original_digest = self.session.get_digest('egg.txt')

    source_path = os.path.join(TEST_DIR, 'egg.txt')
    with open(source_path, 'a') as handle:
        handle.write(' extra content')
    os.utime(source_path, (1330712280, 1330712292))
    self.session.process()

    follow_up = MockSession(DOC_DIR)
    follow_up.add_entry('egg.txt', original_digest)
    follow_up.process()

    doc_path = os.path.join(DOC_DIR, 'egg.txt')
    self.assertTrue(os.path.exists(doc_path))
    with open(doc_path, 'r') as handle:
        self.assertEqual(handle.read(), 'egg file content extra content')
def copystat(cls, src, dest, copy_own=True, copy_xattr=True):
    """
    Copy all stat info (mode bits, atime, mtime, flags) from `src` to `dest`.

    If `copy_own=True`, the uid and gid are also copied.
    If `copy_xattr=True`, the extended attributes are also copied
    (only available on Linux).
    """
    source_stat = os.stat(src)

    os.chmod(dest, mode=stat.S_IMODE(source_stat.st_mode))
    # Nanosecond variant, so no precision is lost through float seconds.
    os.utime(dest, ns=(source_stat.st_atime_ns, source_stat.st_mtime_ns))

    # st_flags is only present in the stat result on some platforms.
    if hasattr(source_stat, "st_flags"):
        os.chflags(dest, flags=source_stat.st_flags)

    if copy_own:
        os.chown(dest, uid=source_stat.st_uid, gid=source_stat.st_gid)

    if copy_xattr:
        cls.copyxattr(src, dest)
def copystat(src, dst):
    """Copy all stat info (mode bits, atime, mtime, flags) from src to dst.

    Each step is skipped when the running platform's ``os`` module lacks
    the corresponding function. A failure to copy the flags is ignored
    only when the OSError carries EOPNOTSUPP or ENOTSUP; any other error
    is re-raised.
    """
    st = os.stat(src)
    mode = stat.S_IMODE(st.st_mode)
    if hasattr(os, 'utime'):
        os.utime(dst, (st.st_atime, st.st_mtime))
    if hasattr(os, 'chmod'):
        os.chmod(dst, mode)
    if hasattr(os, 'chflags') and hasattr(st, 'st_flags'):
        try:
            os.chflags(dst, st.st_flags)
        except OSError as why:  # "as" form works on Python 2.6+ and 3
            # Not every errno constant exists on every platform, so only
            # compare against the ones that are actually defined.
            unsupported = [getattr(errno, name)
                           for name in ('EOPNOTSUPP', 'ENOTSUP')
                           if hasattr(errno, name)]
            if why.errno not in unsupported:
                raise
def blankAFile(file_path):
    '''
    Truncate a file to zero bytes, and preserve its original access and
    modification times.

    Adapted from 'Keeping Large intermediate files'
    (http://www.ruffus.org.uk/faq.html)

    :param file_path: Input file path
    :return: None
    :raises SystemExit: with status 1 when ``file_path`` does not exist
    '''
    if not os.path.exists(file_path):
        print('blankAFile: ' + file_path + ' not found.')
        sys.exit(1)

    timeInfo = os.stat(file_path)  # retrieve current time stamp of the file
    try:
        f = open(file_path, 'w')
    except IOError:
        # Best effort: a file that cannot be opened for writing is left
        # untouched and nothing is reported.
        return
    with f:
        f.truncate(0)
    # change the time of the file back to what it was
    os.utime(file_path, (timeInfo.st_atime, timeInfo.st_mtime))
    # Single-argument print() behaves the same on Python 2 and 3.
    print(file_path + ' blanked to save disk-space.')
def try_utime(self, filename, last_modified_hdr):
    """Try to set the last-modified time of the given file.

    Returns the timestamp converted from `last_modified_hdr`, or None when
    the header is missing, `filename` is not a regular file, or the
    conversion yields None or 0. A failure of os.utime() itself is ignored.
    """
    if last_modified_hdr is None or not os.path.isfile(encodeFilename(filename)):
        return None
    filetime = timeconvert(last_modified_hdr)
    # Skip unparsable headers and obviously invalid dates
    if filetime is None or filetime == 0:
        return None
    try:
        os.utime(filename, (time.time(), filetime))
    except Exception:
        # Deliberately best-effort: the timestamp is still returned.
        pass
    return filetime
def handle(self):
    """Download the item to a temporary file, verify its SHA-1, and move it into place.

    When the remote item exposes no SHA-1 the file is kept with a warning;
    on a hash mismatch an error is logged and the method returns early.
    After the rename, the file's atime/mtime are set from the item's
    modified time, ownership is changed to OS_USER_ID/OS_USER_GID and the
    item is recorded as DOWNLOADED. I/O and OneDrive API errors are logged,
    not raised.
    """
    tmp_path = self.local_parent_path + get_tmp_filename(self.item_name)
    try:
        with open(tmp_path, 'wb') as tmp_file:
            self.drive.download_file(file=tmp_file, size=self._item.size, item_id=self._item.id)
        local_sha1 = hasher.hash_value(tmp_path)

        remote_sha1 = None
        props = self._item.file_props
        if props is not None and props.hashes is not None:
            remote_sha1 = props.hashes.sha1

        if remote_sha1 is None:
            self.logger.warn('Remote file %s has not sha1 property, we keep the file but cannot check correctness of it',
                             self.local_path)
        elif local_sha1 != remote_sha1:
            # NOTE(review): the temporary file is not removed on this path - confirm intended.
            self.logger.error('Mismatch hash of download file %s : remote:%s,%d local:%s %d',
                              self.local_path, remote_sha1, self._item.size,
                              local_sha1, os.path.getsize(tmp_path))
            return

        os.rename(tmp_path, self.local_path)
        modified = datetime_to_timestamp(self._item.modified_time)
        os.utime(self.local_path, (modified, modified))
        os.chown(self.local_path, OS_USER_ID, OS_USER_GID)
        self.items_store.update_item(self._item, ItemRecordStatuses.DOWNLOADED)
    except (IOError, OSError):
        self.logger.error('An IO error occurred when downloading "%s":\n%s.',
                          self.local_path, traceback.format_exc())
    except errors.OneDriveError:
        self.logger.error('An API error occurred when downloading "%s":\n%s.',
                          self.local_path, traceback.format_exc())
def Main():
    """Delete the files named on the command line, then touch a stamp file.

    Command line: ``files...`` (one or more), ``-f/--force`` to tolerate
    files that cannot be removed, and the required ``--stamp`` path.

    Returns 1 (after writing a message to stderr) as soon as a file cannot
    be removed and ``--force`` was not given; otherwise returns 0 after the
    stamp file has been created/truncated and its timestamps set to now.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('files', nargs='+')
    parser.add_argument('-f', '--force', action='store_true',
                        help="don't err on missing")
    parser.add_argument('--stamp', required=True,
                        help='touch this file')
    args = parser.parse_args()
    for f in args.files:
        try:
            os.remove(f)
        except OSError:
            if not args.force:
                # sys.stderr.write replaces the Python 2-only
                # "print >>sys.stderr" form and works on 2 and 3 alike.
                sys.stderr.write("'%s' does not exist\n" % f)
                return 1

    with open(args.stamp, 'w'):
        os.utime(args.stamp, None)

    return 0
def setUp(self):
    """Create temp dirs and install an executable fake hook for each name in ``self.fake_hooks``."""
    super(HeatConfigTest, self).setUp()

    self.fake_hook_path = self.relative_path(__file__, 'hook-fake.py')
    self.heat_config_path = self.relative_path(
        __file__,
        '..',
        'heat-config/os-refresh-config/configure.d/55-heat-config')

    self.hooks_dir = self.useFixture(fixtures.TempDir())
    self.deployed_dir = self.useFixture(fixtures.TempDir())

    with open(self.fake_hook_path) as source:
        hook_body = source.read()

    # Every fake hook is a copy of hook-fake.py, made executable.
    for hook in self.fake_hooks:
        hook_name = self.hooks_dir.join(hook)
        with open(hook_name, 'w') as target:
            os.utime(hook_name, None)
            target.write(hook_body)
            target.flush()
        os.chmod(hook_name, 0o755)

    self.env = os.environ.copy()
def setUp(self):
    """Create a temp dir and install an executable fake hook for each name in ``self.fake_hooks``."""
    super(HeatConfigDockerComposeORCTest, self).setUp()

    self.fake_hook_path = self.relative_path(__file__, 'hook-fake.py')
    self.heat_config_docker_compose_path = self.relative_path(
        __file__,
        '..',
        'heat-config-docker-compose/os-refresh-config/configure.d/'
        '50-heat-config-docker-compose')

    self.docker_compose_dir = self.useFixture(fixtures.TempDir())

    with open(self.fake_hook_path) as source:
        hook_body = source.read()

    # Every fake hook is a copy of hook-fake.py, made executable.
    for hook in self.fake_hooks:
        hook_name = self.docker_compose_dir.join(hook)
        with open(hook_name, 'w') as target:
            os.utime(hook_name, None)
            target.write(hook_body)
            target.flush()
        os.chmod(hook_name, 0o755)
def setUp(self):
    """Create a temp dir and install an executable fake hook for each name in ``self.fake_hooks``."""
    super(HeatConfigKubeletORCTest, self).setUp()

    self.fake_hook_path = self.relative_path(__file__, 'hook-fake.py')
    self.heat_config_kubelet_path = self.relative_path(
        __file__,
        '..',
        'heat-config-kubelet/os-refresh-config/configure.d/'
        '50-heat-config-kubelet')

    self.manifests_dir = self.useFixture(fixtures.TempDir())

    with open(self.fake_hook_path) as source:
        hook_body = source.read()

    # Every fake hook is a copy of hook-fake.py, made executable.
    for hook in self.fake_hooks:
        hook_name = self.manifests_dir.join(hook)
        with open(hook_name, 'w') as target:
            os.utime(hook_name, None)
            target.write(hook_body)
            target.flush()
        os.chmod(hook_name, 0o755)
def execute_run_box_basics(output="default"):
    """Run the 'makkus.box-basics' task once and leave a marker file on success.

    Returns ``{"return_code": -1}`` without running anything when the
    marker file already exists. Otherwise returns the runner's result; the
    marker is created/touched only when that result's ``return_code`` is 0.
    """
    marker = DEFAULT_LOCAL_FRECKLES_BOX_BASICS_MARKER
    if os.path.exists(marker):
        return {"return_code": -1}

    result = create_and_run_nsbl_runner(
        [{'tasks': ['makkus.box-basics']}],
        task_metadata={},
        output_format=output,
        ask_become_pass=True,
        run_box_basics=False,
    )

    succeeded = result["return_code"] == 0
    if succeeded:
        with open(marker, 'a'):
            os.utime(marker, None)

    return result
def touch(path, times=None, dirs=False):
    ''' perform UNIX touch with extra
    based on: http://stackoverflow.com/questions/12654772/create-empty-file-using-python

    Args:
        path: file path to touch
        times: a 2-tuple of the form (atime, mtime) where each member is an
            int or float expressing seconds. defaults to current time.
        dirs: if set, create the parent folders when they do not exist
    '''
    if dirs:
        basedir = os.path.dirname(path)
        # dirname() is '' for a bare file name; os.makedirs('') would raise,
        # and there is nothing to create in that case anyway.
        if basedir:
            os.makedirs(basedir, exist_ok=True)

    # Append mode creates the file when missing without truncating it.
    with open(path, 'a'):
        os.utime(path, times=times)
def touch(self, mode=0o666, exist_ok=True):
    """
    Create this file with the given access mode, if it doesn't exist.

    With `exist_ok` true, the timestamps are bumped first and the file is
    only created when that fails with OSError. With `exist_ok` false, the
    file is opened with O_EXCL in addition to O_CREAT | O_WRONLY.
    """
    if self._closed:
        self._raise_closed()

    open_flags = os.O_CREAT | os.O_WRONLY
    if exist_ok:
        # First try to bump modification time
        # Implementation note: GNU touch uses the UTIME_NOW option of
        # the utimensat() / futimens() functions.
        try:
            self._accessor.utime(self, None)
        except OSError:
            # Avoid exception chaining: fall through to the creation below
            pass
        else:
            return
    else:
        open_flags |= os.O_EXCL

    os.close(self._raw_open(open_flags, mode))
def run(self):
    """Install the npm build dependencies, then verify that every target exists.

    When npm is unavailable an error is logged and the install is skipped.
    Afterwards each path in ``self.targets`` must exist, otherwise a
    ValueError is raised; finally the package data is refreshed.
    """
    if self.has_npm():
        # NOTE(review): env is prepared but never handed to check_call -
        # confirm whether env=env was intended.
        env = os.environ.copy()
        env['PATH'] = npm_path
        log.info("Installing build dependencies with npm. This may take a while...")
        check_call(["npm", "install"], cwd=jsroot, stdout=sys.stdout, stderr=sys.stderr, **prckws)
        os.utime(node_modules, None)
    else:
        log.error("`npm` unavailable. If you're running this command using sudo, make sure `npm` is available to sudo")

    for target in self.targets:
        if os.path.exists(target):
            continue
        msg = "Missing file: %s" % target
        if not self.has_npm():
            msg += "\nnpm is required to build a development version of jupyter-" + name
        raise ValueError(msg)

    # update package data in case this created new files
    update_package_data(self.distribution)
def set_file_utime(filename, desired_time):
    """
    Set the utime of a file, and if it fails, raise a more explicit error.

    :param filename: the file to modify
    :param desired_time: the epoch timestamp to set for atime and mtime.
    :raises: SetFileUtimeError: if you do not have permission (errno 1)
    :raises: OSError: for all errors other than errno 1
    """
    timestamps = (desired_time, desired_time)
    try:
        os.utime(filename, timestamps)
    except OSError as e:
        # Only a permission problem gets the more explicit exception;
        # everything else propagates unchanged.
        if e.errno == errno.EPERM:
            raise SetFileUtimeError(
                ("The file was downloaded, but attempting to modify the "
                 "utime of the file failed. Is the file owned by another user?"))
        raise
def check_for_update():
    """Fetch the core version information at most once per UTC day.

    FILE_UPDATE's modification time records the last check. When the file
    was already modified today (UTC) nothing happens. Otherwise the file is
    touched, the current version is posted to CORE_VERSION_URL, and the
    response body is written to FILE_UPDATE. HTTP and URL errors are
    ignored.
    """
    if os.path.exists(FILE_UPDATE):
        last_check = datetime.utcfromtimestamp(os.path.getmtime(FILE_UPDATE))
        if last_check.date() == datetime.utcnow().date():
            return

    try:
        # Touch first, so the timestamp is updated before the request is made.
        with open(FILE_UPDATE, 'a'):
            os.utime(FILE_UPDATE, None)

        payload = urllib.urlencode({'version': main.__version__})
        response = urllib2.urlopen(urllib2.Request(CORE_VERSION_URL, payload))

        with open(FILE_UPDATE, 'w') as update_json:
            update_json.write(response.read())
    except (urllib2.HTTPError, urllib2.URLError):
        pass
def main(reactor, *args):
    """Check that a second download leaves an already-fetched file's mtime untouched.

    Generator-style routine: it yields the result of each ``httpRequest``
    call. URI is downloaded to a fresh file under ``tmp``, the file's
    atime/mtime are moved five minutes into the past, the download is
    repeated, and the mtime is asserted to be unchanged. ``tmp`` is removed
    at the end of a successful run.
    """
    client = http.HTTPClient()
    fname = os.path.join(tmp, str(uuid.uuid4()))
    yield httpRequest(client._agent, URI, method='GET', saveto=fname)
    filesize = os.path.getsize(fname)
    assert filesize > 1

    # touch file to 5 minutes in the past
    past = int(os.path.getmtime(fname)) - 300
    # %-formatting keeps the output identical to the former Python 2 print
    # statement while also being valid Python 3.
    print("PAST MTIME %s" % (past,))
    os.utime(fname, (past, past))
    assert os.path.getmtime(fname) == past

    yield httpRequest(client._agent, URI, method='GET', saveto=fname)

    # it was not modified
    current = os.path.getmtime(fname)
    print("CURRENT MTIME %s" % (current,))
    assert int(current) == past
    print('OK')
    shutil.rmtree(tmp)
def _create_dummy_file(self, base, size):
    """Create a ``size``-byte ``.dat`` file with a random name and random timestamps.

    The file is created under ``base``, its atime/mtime are taken from
    ``self.get_rand_time_pair()``, and ``size`` is added to
    ``self._total_size``. On macOS a ``SetFile`` process is additionally
    started to change the birth time; the process object is appended to
    ``self._tasks_birthtime_flags``.

    Returns a tuple ``(filepath, atime, mtime, st_ctime)``.
    """
    random_name = self._gen_rand_str(5)
    filepath = '{base}/{name}.dat'.format(base=base, name=random_name)
    with open(filepath, 'wb') as handle:
        handle.truncate(size)

    # Change file creation and modification to random time
    atime, mtime = self.get_rand_time_pair()
    os.utime(filepath, times=(atime, mtime), follow_symlinks=False)

    # Change st_birthtime on MacOS
    if platform == 'darwin':
        birth = datetime.fromtimestamp(atime).strftime('%m/%d/%Y %H:%M:%S')
        # subprocess.run with list arguments and shell=False behaves very strange
        # and sets its st_birthtime to earlier than given timestamp very weirdly
        # NOTE(review): filepath is interpolated into a shell command unquoted.
        command = 'SetFile -d "{0}" {1}'.format(birth, filepath)
        setfile = subprocess.Popen(command,
                                   shell=True,
                                   close_fds=True,
                                   stderr=subprocess.PIPE,
                                   stdout=subprocess.PIPE)
        self._tasks_birthtime_flags.append(setfile)

    self._total_size += size
    return filepath, atime, mtime, os.stat(filepath).st_ctime
def extractall(self, path=".", members=None):
    """Extract all members from the archive to the current working
       directory and set owner, modification time and permissions on
       directories afterwards. `path' specifies a different directory
       to extract to. `members' is optional and must be a subset of the
       list returned by getmembers().
    """
    if members is None:
        members = self

    deferred_dirs = []
    for member in members:
        is_dir = member.isdir()
        if is_dir:
            # Extract directories with a safe mode; the real attributes
            # are applied once everything has been extracted.
            deferred_dirs.append(member)
            member = copy.copy(member)
            member.mode = 0o700
        # Do not set_attrs directories, as we will do that further down
        self.extract(member, path, set_attrs=not is_dir)

    # Walk the directories in reverse name order.
    for member in reversed(sorted(deferred_dirs, key=lambda info: info.name)):
        dirpath = os.path.join(path, member.name)
        try:
            # Set correct owner, mtime and filemode on the directory.
            self.chown(member, dirpath)
            self.utime(member, dirpath)
            self.chmod(member, dirpath)
        except ExtractError as exc:
            if self.errorlevel > 1:
                raise
            self._dbg(1, "tarfile: %s" % exc)
def utime(self, tarinfo, targetpath):
    """Set modification time of targetpath according to tarinfo.

    Both atime and mtime are set to ``tarinfo.mtime``. Nothing happens on
    platforms without ``os.utime``; an EnvironmentError from the call is
    turned into ExtractError.
    """
    if hasattr(os, 'utime'):
        stamp = tarinfo.mtime
        try:
            os.utime(targetpath, (stamp, stamp))
        except EnvironmentError:
            raise ExtractError("could not change modification time")

#--------------------------------------------------------------------------
def copystat(src, dst):
    """Copy all stat info (mode bits, atime, mtime, flags) from src to dst.

    Steps whose ``os`` function is missing on the platform are skipped. An
    OSError from copying the flags is swallowed only when it is
    EOPNOTSUPP.
    """
    source_stat = os.stat(src)
    permission_bits = stat.S_IMODE(source_stat.st_mode)

    if hasattr(os, 'utime'):
        os.utime(dst, (source_stat.st_atime, source_stat.st_mtime))

    if hasattr(os, 'chmod'):
        os.chmod(dst, permission_bits)

    if not (hasattr(os, 'chflags') and hasattr(source_stat, 'st_flags')):
        return
    try:
        os.chflags(dst, source_stat.st_flags)
    except OSError as why:
        unsupported = (hasattr(errno, 'EOPNOTSUPP')
                       and why.errno == errno.EOPNOTSUPP)
        if not unsupported:
            raise
def extractall(self, path=".", members=None, *, numeric_owner=False):
    """Extract all members from the archive to the current working
       directory and set owner, modification time and permissions on
       directories afterwards. `path' specifies a different directory
       to extract to. `members' is optional and must be a subset of the
       list returned by getmembers(). If `numeric_owner` is True, only
       the numbers for user/group names are used and not the names.
    """
    if members is None:
        members = self

    deferred_dirs = []
    for member in members:
        is_dir = member.isdir()
        if is_dir:
            # Extract directories with a safe mode; the real attributes
            # are applied once everything has been extracted.
            deferred_dirs.append(member)
            member = copy.copy(member)
            member.mode = 0o700
        # Do not set_attrs directories, as we will do that further down
        self.extract(member, path, set_attrs=not is_dir,
                     numeric_owner=numeric_owner)

    # Walk the directories in reverse name order.
    for member in reversed(sorted(deferred_dirs, key=lambda info: info.name)):
        dirpath = os.path.join(path, member.name)
        try:
            # Set correct owner, mtime and filemode on the directory.
            self.chown(member, dirpath, numeric_owner=numeric_owner)
            self.utime(member, dirpath)
            self.chmod(member, dirpath)
        except ExtractError as exc:
            if self.errorlevel > 1:
                raise
            self._dbg(1, "tarfile: %s" % exc)
def utime(self, tarinfo, targetpath):
    """Set modification time of targetpath according to tarinfo.

    Both atime and mtime are set to ``tarinfo.mtime``. Nothing happens on
    platforms without ``os.utime``; an OSError from the call is turned
    into ExtractError.
    """
    if hasattr(os, 'utime'):
        stamp = tarinfo.mtime
        try:
            os.utime(targetpath, (stamp, stamp))
        except OSError:
            raise ExtractError("could not change modification time")

#--------------------------------------------------------------------------
def extractall(self, path=".", members=None):
    """Extract all members from the archive to the current working
       directory and set owner, modification time and permissions on
       directories afterwards. `path' specifies a different directory
       to extract to. `members' is optional and must be a subset of the
       list returned by getmembers().
    """
    directories = []

    if members is None:
        members = self

    for tarinfo in members:
        if tarinfo.isdir():
            # Extract directories with a safe mode.
            directories.append(tarinfo)
            tarinfo = copy.copy(tarinfo)
            # 0o700 is the octal spelling accepted by Python 2.6+ and 3
            # (the bare 0700 form is a syntax error on Python 3).
            tarinfo.mode = 0o700
        self.extract(tarinfo, path)

    # Reverse sort directories.
    directories.sort(key=operator.attrgetter('name'))
    directories.reverse()

    # Set correct owner, mtime and filemode on directories.
    for tarinfo in directories:
        dirpath = os.path.join(path, tarinfo.name)
        try:
            self.chown(tarinfo, dirpath)
            self.utime(tarinfo, dirpath)
            self.chmod(tarinfo, dirpath)
        except ExtractError as e:  # "as" form works on Python 2.6+ and 3
            # Attribute errors are fatal only at errorlevel > 1; otherwise
            # they are reported through the debug channel.
            if self.errorlevel > 1:
                raise
            else:
                self._dbg(1, "tarfile: %s" % e)
def utime(self, tarinfo, targetpath):
    """Set modification time of targetpath according to tarinfo.

    Both atime and mtime are set to ``tarinfo.mtime``. Nothing happens on
    platforms without ``os.utime``.

    Raises ExtractError when the underlying call fails with an
    EnvironmentError.
    """
    if not hasattr(os, 'utime'):
        return
    try:
        os.utime(targetpath, (tarinfo.mtime, tarinfo.mtime))
    except EnvironmentError:
        # The caught exception was never used, so the Python 2-only
        # "except X, e" binding is simply dropped.
        raise ExtractError("could not change modification time")

#--------------------------------------------------------------------------
def add(self, message):
    """Add message and return assigned key.

    The message is written to a temporary file which is then linked (or
    renamed, where ``os.link`` is unavailable) into its final subdirectory.
    For a MaildirMessage the subdirectory and info suffix come from the
    message and the file's mtime is set to the message's date; any other
    message goes to 'new' without a suffix.

    Raises ExternalClashError when the destination name already exists.
    """
    tmp_file = self._create_tmp()
    try:
        self._dump_message(message, tmp_file)
    except BaseException:
        # Never leave a half-written temporary file behind.
        tmp_file.close()
        os.remove(tmp_file.name)
        raise
    _sync_close(tmp_file)
    if isinstance(message, MaildirMessage):
        subdir = message.get_subdir()
        suffix = self.colon + message.get_info()
        if suffix == self.colon:
            # Empty info: no suffix at all rather than a bare colon.
            suffix = ''
    else:
        subdir = 'new'
        suffix = ''
    uniq = os.path.basename(tmp_file.name).split(self.colon)[0]
    dest = os.path.join(self._path, subdir, uniq + suffix)
    try:
        if hasattr(os, 'link'):
            os.link(tmp_file.name, dest)
            os.remove(tmp_file.name)
        else:
            os.rename(tmp_file.name, dest)
    except OSError as e:  # "as" form works on Python 2.6+ and 3
        os.remove(tmp_file.name)
        if e.errno == errno.EEXIST:
            raise ExternalClashError('Name clash with existing message: %s'
                                     % dest)
        else:
            raise
    if isinstance(message, MaildirMessage):
        # Keep the current atime; the mtime comes from the message's date.
        os.utime(dest, (os.path.getatime(dest), message.get_date()))
    return uniq
def setModificationTime(self):
    """
    Set the UFO modification time to the current time.
    This is never called automatically. It is up to the
    caller to call this when finished working on the UFO.
    """
    ufo_path = self._path
    # Passing None sets both access and modification time to "now".
    os.utime(ufo_path, None)

# metainfo.plist
def touch(self, path):
    """Mark `path` as changed.

    When ``self.touch_only.value`` is true only the timestamps are set to
    the current time; otherwise the file is rewritten with a ``#line 1``
    line prepended to its content.
    """
    if self.touch_only.value:
        os.utime(path, None)
        return

    with open(path) as reader:
        original = reader.read()
    with open(path, "w") as writer:
        writer.write("#line 1\n" + original)
def touch(filename):
    """Create `filename` if it is missing and set its timestamps to the current time."""
    # Append mode creates the file without truncating existing content.
    handle = open(filename, 'a')
    try:
        os.utime(filename, None)  # TODO: possibly None as 2nd param
    finally:
        handle.close()
def redate_by_exif(fn):
    """reads EXIF from jpg/jpeg and if file datetime differs from EXIF changes file date

    The EXIF date is read with ``get_exif_date_pil`` and, if that raises,
    with ``get_exif_date_exif``; when both raise, both tracebacks are
    printed. When an EXIF time is available, a ``YYYY/YYYY_MM_DD``
    directory is created from it, the file's atime/mtime are reset if they
    differ from the EXIF time by more than MAX_DIFF seconds, and the file
    is moved into that directory. ALL_CNT and CHANGED_CNT are updated.
    """
    global ALL_CNT, CHANGED_CNT
    ALL_CNT += 1
    exif_time = None
    s = os.stat(fn)
    file_time = s[8]  # index 8 of the stat tuple is the modification time
    if DEBUG:
        print(fn)
    # "except Exception" instead of a bare "except" so that
    # KeyboardInterrupt and SystemExit are no longer swallowed.
    try:
        exif_time = get_exif_date_pil(fn)
    except Exception:
        s1 = traceback.format_exc()
        try:
            exif_time = get_exif_date_exif(fn)
        except Exception:
            s2 = traceback.format_exc()
            print('Something is terribly wrong! Both PIL and exifread raises exception')
            print('-' * 20)
            print(s1)
            print('-' * 20)
            print(s2)
            print('-' * 20)
            print('-' * 20)
    if exif_time:
        dir_n = time.strftime("%Y/%Y_%m_%d", time.gmtime(exif_time))
        try:
            os.makedirs(dir_n)
        except OSError:
            # Typically the directory already exists; a real problem
            # surfaces in shutil.move below.
            pass
        secs_diff = file_time - exif_time
        print("%s %s -> %s (%s)" % (fn, show_fdt(file_time), show_fdt(exif_time), dir_n))
        if secs_diff > MAX_DIFF or secs_diff < -MAX_DIFF:
            os.utime(fn, (exif_time, exif_time))
            CHANGED_CNT += 1
        # NOTE(review): the nesting of this move was ambiguous in the
        # source layout; it is assumed to apply to every file with an
        # EXIF time - confirm.
        shutil.move(fn, dir_n)
def touch(path):
    """Set the access and modification time of `path` to now.

    When the timestamps cannot be set (normally because the file does not
    exist), the parent directory is created if it is missing, the file is
    created, and the timestamps are set again.
    """
    import os
    import time
    now = time.time()
    try:
        # assume it's there
        os.utime(path, (now, now))
    except os.error:
        # if it isn't, try creating the directory,
        # a file with that name
        parent = os.path.dirname(path)
        # Only create the directory when there is one to create: calling
        # makedirs on an existing directory (or on '') raises and would
        # prevent the file itself from ever being created.
        if parent and not os.path.isdir(parent):
            os.makedirs(parent)
        # Append mode: never truncate a file that exists but could not be
        # re-stamped for some other reason.
        open(path, "a").close()
        os.utime(path, (now, now))
def utimens(self, path, times=None):
    """Set the access/modification times of `path`, resolved through ``self._full_path``.

    `times` is handed to ``os.utime`` unchanged; None means "now".
    """
    resolved = self._full_path(path)
    return os.utime(resolved, times)

# File methods
# ============