我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用os.replace()。
def __save(self): if self.__asynchronous == 0: state = { "version" : _BobState.CUR_VERSION, "byNameDirs" : self.__byNameDirs, "results" : self.__results, "inputs" : self.__inputs, "jenkins" : self.__jenkins, "dirStates" : self.__dirStates, "buildState" : self.__buildState, } tmpFile = self.__path+".new" try: with open(tmpFile, "wb") as f: pickle.dump(state, f) f.flush() os.fsync(f.fileno()) os.replace(tmpFile, self.__path) except OSError as e: raise ParseError("Error saving workspace state: " + str(e)) self.__dirty = False else: self.__dirty = True
def save_json(self, filename, data): """Atomically saves json file""" rnd = randint(1000, 9999) path, ext = os.path.splitext(filename) tmp_file = "{}-{}.tmp".format(path, rnd) self._save_json(tmp_file, data) try: self._read_json(tmp_file) except json.decoder.JSONDecodeError: self.logger.exception("Attempted to write file {} but JSON " "integrity check on tmp file has failed. " "The original file is unaltered." "".format(filename)) return False os.replace(tmp_file, filename) return True
def zip_file(self): amount = functions.folder_files(self.parent.directory) progress = dialog.ProgressWindow(self.parent, title="Zipping Pack", maximum=amount) count = 0 with zipfile.ZipFile(self.parent.directory + ".zip", "w") as z: for root, dirs, files in os.walk(self.parent.directory.replace("\\", "/"), topdown=True): new_root = root.replace("\\", "/").split("/") # print(root, dirs, files) for name in files: z.write(os.path.join(root, name), "/".join(new_root[new_root.index(self.parent.directory.split("/")[-1]) + 1:]) + "/" + name) count += 1 progress.variable_name.set("Current File: " + name) progress.variable_percent.set("{}% Complete".format(round(100 * float(count) / float(amount)))) progress.variable_progress.set(progress.variable_progress.get() + 1) z.close() progress.destroy() messagebox.showinfo(title="Information", message="Zipping complete.")
def __exit__(self, exc_type, exc_value, tback): # Pass to tempfile, which also closes(). temp_path = self.temp.name self.temp.__exit__(exc_type, exc_value, tback) self.temp = None if exc_type is not None: # An exception occurred, clean up. try: _os.remove(temp_path) except FileNotFoundError: pass else: # No exception, commit changes _os.replace(temp_path, self.filename) return False # Don't cancel the exception. # Import these, so people can reference 'srctools.Vec' instead of # 'srctools.vec.Vec'. # Should be done after other code, so everything's initialised. # Not all classes are imported, just most-used ones.
def temp_move_path(path, d): if os.path.exists(path): dst = shutil.move(path, d) try: yield dst finally: try: os.replace(dst, path) except OSError: # no cov shutil.move(dst, path) else: try: yield finally: remove_path(path)
def _run_fileset(self, env, file_mapper): stash_dir = env.format('{root_dir}/.nimp/stash') nimp.system.safe_makedirs(stash_dir) stash_file = os.path.join(stash_dir, env.fileset) nimp.system.safe_delete(stash_file) with open(stash_file, 'w') as stash: for src, _ in file_mapper(): src = nimp.system.sanitize_path(src) if not os.path.isfile(src): continue if src.endswith('.stash'): continue md5 = hashlib.md5(src.encode('utf8')).hexdigest() os.replace(src, os.path.join(stash_dir, md5)) logging.info('Stashing %s as %s', src, md5) stash.write('%s %s\n' % (md5, src)) return True
def _run_fileset(self, env, file_mapper): stash_dir = env.format('{root_dir}/.nimp/stash') stash_file = os.path.join(stash_dir, env.fileset) success = True with open(stash_file, 'r') as stash: for dst in stash.readlines(): try: md5, dst = dst.strip().split() src = os.path.join(stash_dir, md5) logging.info('Unstashing %s as %s', md5, dst) nimp.system.safe_delete(dst) os.replace(src, dst) except Exception as ex: #pylint: disable=broad-except logging.error(ex) success = False nimp.system.safe_delete(stash_file) return success
def atomic_write(filepath, binary=False, fsync=False): """ Writeable file object that atomically updates a file (using a temporary file). In some cases (namely Python < 3.3 on Windows), this could result in an existing file being temporarily unlinked. :param filepath: the file path to be opened :param binary: whether to open the file in a binary mode instead of textual :param fsync: whether to force write the file to disk """ tmppath = filepath + '~' while os.path.isfile(tmppath): tmppath += '~' try: with open(tmppath, 'wb' if binary else 'w') as file: yield file if fsync: file.flush() os.fsync(file.fileno()) replace(tmppath, filepath) finally: try: os.remove(tmppath) except (IOError, OSError): pass
def rename(srcpath, destpath, data): import glob import os episodes = sorted(data.keys()) name_pattern = '{date} Showroom - AKB48 no Myonichi Yoroshiku! #{ep} ({name}).mp4' long_date_pattern = '20{}-{}-{}' for file in glob.glob('{}/*.mp4'.format(srcpath)): match = file_re.match(os.path.basename(file)) date = match.groupdict()['date'] long_date = long_date_pattern.format(*[date[i:i+2] for i in range(0, 6, 2)]) new_file = name_pattern.format( date=date, ep=episodes.index(long_date)+1, name=data[long_date]['engName'], ) os.replace( file, '{}/{}'.format(destpath, new_file) )
def prune_folder(folder, needed_list): oldcwd = os.getcwd() os.chdir(folder) os.makedirs('unneeded'.format(folder), exist_ok=True) files = glob.glob('*.mp4'.format(folder)) for file in files: # print(repr(file)) if file not in needed_list: # print('{} -> {}'.format(file, 'unneeded/{}'.format(file))) os.replace(file, 'unneeded/{}'.format(file)) else: # print('Needed:', repr(file)) pass os.chdir(oldcwd)
def update_streaming_url_web(self): """Updates streaming urls from the showroom website. Fallback if api changes again""" r = self._session.get(self._room.long_url) if r.ok: match = hls_url_re1.search(r.text) # TODO: check if there was a match if not match: # no url found in the page # probably the stream has ended but is_live returned true # just don't update the urls # except what happens if they are still "" ? return hls_url = match.group(0) rtmps_url = match.group(1).replace('https', 'rtmps') rtmp_url = "rtmp://{}.{}.{}.{}:1935/liveedge/{}".format(*match.groups()[1:]) with self._lock: self._rtmp_url = rtmp_url self._hls_url = hls_url self._rtmps_url = rtmps_url
def concurrency_safe_rename(src, dst): """Renames ``src`` into ``dst`` overwriting ``dst`` if it exists. On Windows os.replace (or for Python 2.7 its implementation through MoveFileExW) can yield permission errors if executed by two different processes. """ max_sleep_time = 1 total_sleep_time = 0 sleep_time = 0.001 while total_sleep_time < max_sleep_time: try: replace(src, dst) break except Exception as exc: if getattr(exc, 'winerror', None) == error_access_denied: time.sleep(sleep_time) total_sleep_time += sleep_time sleep_time *= 2 else: raise else: raise
def replace(src, dst): # argument names match stdlib docs, docstring below try: # ReplaceFile fails if the dest file does not exist, so # first try to rename it into position os.rename(src, dst) return except WindowsError as we: if we.errno == errno.EEXIST: pass # continue with the ReplaceFile logic below else: raise src = path_to_unicode(src) dst = path_to_unicode(dst) res = _ReplaceFile(c_wchar_p(dst), c_wchar_p(src), None, 0, None, None) if not res: raise OSError('failed to replace %r with %r' % (dst, src)) return
def close(self): try: if self.__inFile: self.__inFile.close() if self.__outFile: self.__outFile.close() os.replace(self.__outFile.name, self.__cachePath) except OSError as e: raise BuildError("Error closing hash cache: " + str(e))
def __generatePackages(self, nameFormatter, env, cacheKey, sandboxEnabled): # use separate caches with and without sandbox if sandboxEnabled: cacheName = ".bob-packages-sb.pickle" else: cacheName = ".bob-packages.pickle" # try to load the persisted packages states = { n:s() for (n,s) in self.__states.items() } rootPkg = Package() rootPkg.construct("<root>", [], nameFormatter, None, [], [], states, {}, {}, None, None, [], {}, -1) try: with open(cacheName, "rb") as f: persistedCacheKey = f.read(len(cacheKey)) if cacheKey == persistedCacheKey: tmp = PackageUnpickler(f, self.getRecipe, self.__plugins, nameFormatter).load() return tmp.toStep(nameFormatter, rootPkg).getPackage() except (EOFError, OSError, pickle.UnpicklingError): pass # not cached -> calculate packages result = self.__rootRecipe.prepare(nameFormatter, env, sandboxEnabled, states)[0] # save package tree for next invocation tmp = CoreStepRef(rootPkg, result.getPackageStep()) try: newCacheName = cacheName + ".new" with open(newCacheName, "wb") as f: f.write(cacheKey) PackagePickler(f, nameFormatter).dump(tmp) os.replace(newCacheName, cacheName) except OSError as e: print("Error saving internal state:", str(e), file=sys.stderr) return result
def _make_text_stream(stream, encoding, errors): if encoding is None: encoding = get_best_encoding(stream) if errors is None: errors = 'replace' return _NonClosingTextIOWrapper(stream, encoding, errors, line_buffering=True)
def filename_to_ui(value): if isinstance(value, bytes): value = value.decode(get_filesystem_encoding(), 'replace') return value
def _force_correct_text_reader(text_reader, encoding, errors): if _is_binary_reader(text_reader, False): binary_reader = text_reader else: # If there is no target encoding set, we need to verify that the # reader is not actually misconfigured. if encoding is None and not _stream_is_misconfigured(text_reader): return text_reader if _is_compatible_text_stream(text_reader, encoding, errors): return text_reader # If the reader has no encoding, we try to find the underlying # binary reader for it. If that fails because the environment is # misconfigured, we silently go with the same reader because this # is too common to happen. In that case, mojibake is better than # exceptions. binary_reader = _find_binary_reader(text_reader) if binary_reader is None: return text_reader # At this point, we default the errors to replace instead of strict # because nobody handles those errors anyways and at this point # we're so fundamentally fucked that nothing can repair it. if errors is None: errors = 'replace' return _make_text_stream(binary_reader, encoding, errors)
def _force_correct_text_writer(text_writer, encoding, errors): if _is_binary_writer(text_writer, False): binary_writer = text_writer else: # If there is no target encoding set, we need to verify that the # writer is not actually misconfigured. if encoding is None and not _stream_is_misconfigured(text_writer): return text_writer if _is_compatible_text_stream(text_writer, encoding, errors): return text_writer # If the writer has no encoding, we try to find the underlying # binary writer for it. If that fails because the environment is # misconfigured, we silently go with the same writer because this # is too common to happen. In that case, mojibake is better than # exceptions. binary_writer = _find_binary_writer(text_writer) if binary_writer is None: return text_writer # At this point, we default the errors to replace instead of strict # because nobody handles those errors anyways and at this point # we're so fundamentally fucked that nothing can repair it. if errors is None: errors = 'replace' return _make_text_stream(binary_writer, encoding, errors)
def filename_to_ui(value): if isinstance(value, bytes): value = value.decode(get_filesystem_encoding(), 'replace') else: value = value.encode('utf-8', 'surrogateescape') \ .decode('utf-8', 'replace') return value
def close(self): if self.directory_real: # print(self.directory, self.directory_real) with zipfile.ZipFile(self.directory_real, "w") as z: for root, dirs, files in os.walk(self.directory.replace("\\", "/"), topdown=True): new_root = root.replace("\\", "/").split("/") # print(root, dirs, files) for name in files: z.write(os.path.join(root, name), "/".join(new_root[new_root.index( self.directory.replace("\\", "/").split("/")[-1]) + 1:]) + "/" + name) self.d.cleanup() self.destroy()
def replace_file(self): old_file = self.widget_tree.item(self.widget_tree.focus())["tags"][0] file = filedialog.askopenfile() if file: new_file = file.name # os.replace(new_file, old_file) shutil.copy2(new_file, old_file) self.cmd.tree_refresh()
def force_rename(oldpath, newpath): """ Move the file at ``oldpath`` to ``newpath``, deleting ``newpath`` beforehand if necessary """ if hasattr(os, 'replace'): # Python 3.3+ os.replace(oldpath, newpath) else: if sys.platform.startswith('win'): try_unlink(newpath) os.rename(oldpath, newpath)
def parse_parts(self, parts): if six.PY2: parts = _py2_fsencode(parts) parsed = [] sep = self.sep altsep = self.altsep drv = root = '' it = reversed(parts) for part in it: if not part: continue if altsep: part = part.replace(altsep, sep) drv, root, rel = self.splitroot(part) if sep in rel: for x in reversed(rel.split(sep)): if x and x != '.': parsed.append(intern(x)) else: if rel and rel != '.': parsed.append(intern(rel)) if drv or root: if not drv: # If no drive is present, try to find one in the previous # parts. This makes the result of parsing e.g. # ("C:", "/", "a") reasonably intuitive. for part in it: if not part: continue if altsep: part = part.replace(altsep, sep) drv = self.splitroot(part)[0] if drv: break break if drv or root: parsed.append(drv + root) parsed.reverse() return drv, root, parsed
def as_posix(self): """Return the string representation of the path with forward (/) slashes.""" f = self._flavour return str(self).replace(f.sep, '/')
def replace(self, target): """ Rename this path to the given path, clobbering the existing destination if it exists. """ if sys.version_info < (3, 3): raise NotImplementedError("replace() is only available " "with Python 3.3 and later") if self._closed: self._raise_closed() self._accessor.replace(self, target)
def _dump(self): temp = '%s-%s.tmp' % (uuid.uuid4(), self.name) with open(temp, 'w', encoding='utf-8') as tmp: json.dump(self._db.copy(), tmp, ensure_ascii=True, cls=self.encoder, separators=(',', ':')) # atomically move the file os.replace(temp, self.name)
def create_final_from_temp(self, temp_parfile_name): """Move newly created parfile to its final filename.""" # Python 2 doesn't have os.replace, so use os.rename which is # not atomic in all cases. os.chmod(temp_parfile_name, 0o0755) os.rename(temp_parfile_name, self.output_filename)