我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 shutil.move()。
def _copy_file(filename, location, link):
    """Copy *filename* into *location* under the name ``link.filename``.

    When the destination already exists the user is prompted and may
    ignore (skip the copy), wipe (delete the existing file), back up
    (move the existing file aside) or abort (exit the process).
    """
    target = os.path.join(location, link.filename)
    if os.path.exists(target):
        prompt = (
            'The file %s exists. (i)gnore, (w)ipe, (b)ackup, (a)abort' %
            display_path(target)
        )
        answer = ask_path_exists(prompt, ('i', 'w', 'b', 'a'))
        if answer == 'i':
            # Keep the existing file untouched; nothing is copied.
            return
        if answer == 'a':
            sys.exit(-1)
        if answer == 'w':
            logger.warning('Deleting %s', display_path(target))
            os.remove(target)
        elif answer == 'b':
            backup_path = backup_dir(target)
            logger.warning(
                'Backing up %s to %s',
                display_path(target),
                display_path(backup_path),
            )
            shutil.move(target, backup_path)
    shutil.copy(filename, target)
    logger.info('Saved %s', display_path(target))
def _build_one(self, req, output_dir, python_tag=None):
    """Build one wheel.

    The wheel is built into a fresh temporary directory and, on success,
    moved into *output_dir*. The temporary directory is always removed.

    :return: The filename of the built wheel, or None if the build failed.
    """
    tempd = tempfile.mkdtemp('pip-wheel-')
    try:
        if self.__build_one(req, tempd, python_tag=python_tag):
            try:
                wheel_name = os.listdir(tempd)[0]
                wheel_path = os.path.join(output_dir, wheel_name)
                shutil.move(os.path.join(tempd, wheel_name), wheel_path)
                logger.info('Stored in directory: %s', output_dir)
                return wheel_path
            except Exception as exc:
                # Best effort: fall through to the failure path below, but
                # do not swallow KeyboardInterrupt/SystemExit and do leave
                # a trace of what went wrong.
                logger.warning(
                    'Failed to store built wheel in %s: %s', output_dir, exc
                )
        # Ignore return, we can't do anything else useful.
        self._clean_one(req)
        return None
    finally:
        rmtree(tempd)
def _replace_zip_directory_cache_data(normalized_path):
    """Refresh cached zip directory data for *normalized_path* in place.

    Delegates to ``_update_zipimporter_cache`` with an updater that empties
    each existing cache entry and refills it from the entry found in
    ``zipimport._zip_directory_cache`` after a new ``zipimporter`` has been
    constructed for that path.
    """
    def _reload_entry_in_place(path, old_entry):
        # N.B. Loading the zip directory information once and patching the
        # path strings for every spelling would be possible in theory, but
        # this cache structure is undocumented and may change between
        # Python releases, so that is too invasive for no real benefit.
        old_entry.clear()
        # Constructing the importer is done only for its effect on the
        # module-level cache, which is read on the next line.
        zipimport.zipimporter(path)
        fresh = zipimport._zip_directory_cache[path]
        old_entry.update(fresh)
        return old_entry

    directory_cache = zipimport._zip_directory_cache
    _update_zipimporter_cache(
        normalized_path,
        directory_cache,
        updater=_reload_entry_in_place,
    )
def delete_account(self, account):
    """
    "Delete" `account` by moving its file out of the keystore directory.

    The file at `account.path` is moved into the directory returned by
    `PyWalib.deleted_account_dir()` (created if missing), then `account`
    is removed from the `accounts` list of the object returned by
    `self.get_account_list()`.
    """
    keystore_dir = self.app.services.accounts.keystore_dir
    trash_dir = PyWalib.deleted_account_dir(keystore_dir)
    # Make sure the holding directory for removed accounts is there.
    if not os.path.exists(trash_dir):
        os.makedirs(trash_dir)
    # The file keeps its name; only its parent directory changes.
    trashed_path = os.path.join(trash_dir, os.path.basename(account.path))
    shutil.move(account.path, trashed_path)
    # Drop the account from the in-memory list as well.
    self.get_account_list().accounts.remove(account)
def maybe_move(self, spec, dist_filename, setup_base):
    """Move the unpacked source tree into ``self.build_directory``.

    Returns the new location, or *setup_base* unchanged (after a warning)
    when ``<build_directory>/<spec.key>`` already exists.
    """
    target = os.path.join(self.build_directory, spec.key)
    if os.path.exists(target):
        log.warn(
            "%r already exists in %s; build directory %s will not be kept",
            spec.key, self.build_directory, setup_base,
        )
        return setup_base

    source = setup_base
    if os.path.isdir(dist_filename):
        source = dist_filename
    else:
        if os.path.dirname(dist_filename) == setup_base:
            # get the downloaded file out of the tmp dir
            os.unlink(dist_filename)
        entries = os.listdir(setup_base)
        if len(entries) == 1:
            only_entry = os.path.join(setup_base, entries[0])
            if os.path.isdir(only_entry):
                # a lone directory is moved instead of its parent
                source = only_entry

    ensure_directory(target)
    shutil.move(source, target)
    return target
def join(args, outs, chunk_args, chunk_outs):
    """Publish the outputs of the first chunk as the stage outputs.

    For the FASTQ and interop outputs: when the matching override in
    *args* is set, the output path is simply pointed at it; otherwise the
    chunk's result is moved to the standard output location. The remaining
    chunk attributes are copied across unchanged. *chunk_args* is unused.
    """
    first = chunk_outs[0]

    if args.output_path:
        outs.fastq_path = args.output_path
    else:
        shutil.move(first.fastq_path, outs.fastq_path)

    if args.interop_output_path:
        outs.interop_path = args.interop_output_path
    else:
        shutil.move(first.interop_path, outs.interop_path)

    passthrough = (
        "rc_i2_read",
        "file_read_types_map",
        "bcl2fastq_version",
        "bcl2fastq_args",
    )
    for attr in passthrough:
        setattr(outs, attr, getattr(first, attr))
def paste(self):
    """Grab an image via the helper script and save it to a new file.

    Runs ``bin/imageutil.py grab /tmp/imagepaste1.png`` (relative to this
    module) through ``self.run_command``. If the output starts with
    ``"grab"`` and the user confirms the dialog, the temp image is moved to
    the absolute path from ``self.get_filename()`` and the relative path is
    returned. In every other case ``None`` is returned.
    """
    script = os.path.join(os.path.dirname(__file__), 'bin/imageutil.py')
    abs_fn, rel_fn = self.get_filename()
    grabbed_path = "/tmp/imagepaste1.png"
    cmdline = " ".join(['/usr/bin/python3', script, 'grab', grabbed_path])
    output = self.run_command(cmdline)

    if output[:4] != "grab":
        print('clipboard buffer is not image!')
        return None

    confirmed = sublime.ok_cancel_dialog("save to file?")
    print("ret %r" % confirmed)
    if not confirmed:
        return None
    shutil.move(grabbed_path, abs_fn)
    return rel_fn
def save_to_well_known_file(credentials, well_known_file=None):
    """Save the provided GoogleCredentials to the well known file.

    Args:
        credentials: the credentials to be saved to the well known file;
                     it should be an instance of GoogleCredentials
        well_known_file: the name of the file where the credentials are to
                         be saved; this parameter is supposed to be used
                         for testing only

    Raises:
        OSError: if the directory that should contain the file does not
                 exist.
    """
    # TODO(orestica): move this method to tools.py
    # once the argparse import gets fixed (it is not present in Python 2.6)
    if well_known_file is None:
        target = _get_well_known_file()
    else:
        target = well_known_file

    parent_dir = os.path.dirname(target)
    if os.path.isdir(parent_dir):
        _save_private_file(target, credentials.serialization_data)
        return
    raise OSError('Config directory does not exist: %s' % parent_dir)
def _mv_contents(path, new_path):
    """
    Move every entry of the directory `path` into `new_path`.

    Each entry is moved with a separate `mv` command issued through
    `_run`. Returns False as soon as one command reports a non-zero return
    code (earlier entries stay moved), True otherwise.

    NOTE: `mv` is invoked because shutil.move() was not preserving
    ownership metadata.
    """
    for name in os.listdir(path):
        status, _out, _err = _run("mv {}/{} {}".format(path, name, new_path))
        if status == 0:
            continue
        return False
    return True
# pylint: disable=too-many-return-statements
def _compile_module_file(template, text, filename, outputpath, module_writer):
    """Compile *text* and write the generated module source to *outputpath*.

    Text results from ``_compile`` are encoded with the lexer's encoding
    (``'ascii'`` when it has none). If *module_writer* is truthy it is
    called with the encoded source and *outputpath*; otherwise the source
    is written to a temp file next to *outputpath* and moved into place.
    """
    source, lexer = _compile(
        template, text, filename, generate_magic_comment=True
    )
    if isinstance(source, compat.text_type):
        encoding = lexer.encoding or 'ascii'
        source = source.encode(encoding)

    if module_writer:
        module_writer(source, outputpath)
        return

    # Create the temp file in the directory of the final location so both
    # are on the same filesystem, avoiding synchronization issues.
    out_dir = os.path.dirname(outputpath)
    fd, tmp_name = tempfile.mkstemp(dir=out_dir)
    os.write(fd, source)
    os.close(fd)
    shutil.move(tmp_name, outputpath)
def _copy_file(filename, location, content_type, link):
    """Copy *filename* into *location* under the name ``link.filename``.

    If the destination already exists the user is asked whether to ignore
    it (no copy), wipe it, or back it up before copying.
    *content_type* is accepted but not used here.
    """
    destination = os.path.join(location, link.filename)
    should_copy = True
    if os.path.exists(destination):
        choice = ask_path_exists(
            'The file %s exists. (i)gnore, (w)ipe, (b)ackup '
            % display_path(destination), ('i', 'w', 'b'))
        if choice == 'w':
            logger.warn('Deleting %s' % display_path(destination))
            os.remove(destination)
        elif choice == 'b':
            backup_path = backup_dir(destination)
            logger.warn('Backing up %s to %s'
                        % (display_path(destination),
                           display_path(backup_path)))
            shutil.move(destination, backup_path)
        elif choice == 'i':
            should_copy = False
    if not should_copy:
        return
    shutil.copy(filename, destination)
    logger.notify('Saved %s' % display_path(destination))
def maybe_move(self, spec, dist_filename, setup_base):
    """Relocate the unpacked distribution under ``self.build_directory``.

    If ``<build_directory>/<spec.key>`` exists already, a warning is logged
    and *setup_base* is returned untouched. Otherwise the chosen source
    directory is moved there and the new path is returned.
    """
    dst = os.path.join(self.build_directory, spec.key)
    if os.path.exists(dst):
        log.warn(
            "%r already exists in %s; build directory %s will not be kept",
            spec.key, self.build_directory, setup_base)
        return setup_base

    if os.path.isdir(dist_filename):
        setup_base = dist_filename
    else:
        tmp_dir = setup_base
        if os.path.dirname(dist_filename) == tmp_dir:
            os.unlink(dist_filename)  # get it out of the tmp dir
        children = os.listdir(tmp_dir)
        if len(children) == 1:
            candidate = os.path.join(tmp_dir, children[0])
        else:
            candidate = None
        # if the only thing there is a directory, move it instead
        if candidate is not None and os.path.isdir(candidate):
            setup_base = candidate

    ensure_directory(dst)
    shutil.move(setup_base, dst)
    return dst
def _build_one_inside_env(self, req, output_dir, python_tag=None,
                          isolate=False):
    """Build one wheel in a temporary directory and move it to *output_dir*.

    :return: The path of the stored wheel, or None if building or storing
        it failed (in which case ``self._clean_one(req)`` is called).
    """
    with TempDirectory(kind="wheel") as temp_dir:
        if self.__build_one(req, temp_dir.path, python_tag=python_tag,
                            isolate=isolate):
            try:
                wheel_name = os.listdir(temp_dir.path)[0]
                wheel_path = os.path.join(output_dir, wheel_name)
                shutil.move(
                    os.path.join(temp_dir.path, wheel_name), wheel_path
                )
                logger.info('Stored in directory: %s', output_dir)
                return wheel_path
            except Exception as exc:
                # Best effort: fall through to the failure path below, but
                # do not swallow KeyboardInterrupt/SystemExit and do leave
                # a trace of what went wrong.
                logger.warning(
                    'Failed to store built wheel in %s: %s', output_dir, exc
                )
        # Ignore return, we can't do anything else useful.
        self._clean_one(req)
        return None
def sync(self):
    """Write dict to disk.

    Does nothing when ``self.flag`` is ``'r'``. Otherwise the data is
    dumped to ``<filename>.tmp`` and then moved over ``self.filename``, so
    a failing dump leaves the existing file untouched. If ``self.mode`` is
    not None it is applied to the final file with ``os.chmod``.

    Any exception raised by ``self.dump`` is re-raised after the temporary
    file has been closed and removed.
    """
    if self.flag == 'r':
        return
    filename = self.filename
    tempname = filename + '.tmp'
    fileobj = open(tempname, 'wb' if self.format == 'pickle' else 'w')
    try:
        # The with-block closes the file before the handler below runs;
        # removing a file that is still open fails on Windows.
        with fileobj:
            self.dump(fileobj)
    except Exception:
        os.remove(tempname)
        raise
    # The temp file sits beside the target, so this is normally a rename.
    shutil.move(tempname, self.filename)
    if self.mode is not None:
        os.chmod(self.filename, self.mode)
def stripAxisCode(fileName):
    """Truncate *fileName* at its first Axis-specific line.

    Lines are copied to ``<fileName>.tmp`` up to the first line that
    contains 'org.apache.axis' but neither 'extends' nor 'implements';
    a closing brace is then written in place of the rest. The temporary
    file finally replaces the original.

    If no such line is found the file is copied unchanged, so the
    operation is idempotent.
    """
    outName = fileName + '.tmp'
    hasAxisCode = False
    # Both handles are closed (and the output flushed) before the move.
    with open(fileName, 'r') as fin, open(outName, 'w') as fout:
        for line in fin:
            if ('org.apache.axis' in line
                    and 'extends' not in line
                    and 'implements' not in line):
                hasAxisCode = True
                break
            fout.write(line)
        if hasAxisCode:
            fout.write("}\n")
    shutil.move(outName, fileName)
def create_backup(self):
    """Copy the addon folder into ``<updater_path>/backup``.

    An existing backup directory is deleted first. The copy is staged in a
    sibling of the addon folder (so it is not created inside the tree being
    copied) and then moved into place. The backup date is recorded in
    ``self._json`` and the updater JSON is saved.
    """
    if self._verbose:
        print("Backing up current addon folder")

    backup_dir = os.path.join(self._updater_path, "backup")
    staging_dir = os.path.join(
        self._addon_root, os.pardir, self._addon + "_updater_backup_temp")

    if os.path.isdir(backup_dir):
        shutil.rmtree(backup_dir)
    if self._verbose:
        print("Backup destination path: ", backup_dir)

    # copy to the staging area, then move it to its final place
    shutil.copytree(self._addon_root, staging_dir)
    shutil.move(staging_dir, backup_dir)

    # remember when this backup was taken
    today = datetime.now()
    stamp = "{m}-{d}-{yr}".format(
        m=today.strftime("%B"), d=today.day, yr=today.year)
    self._json["backup_date"] = stamp
    self.save_updater_json()
def restore_backup(self):
    """Replace the addon folder with the contents of the stored backup.

    The backup directory is moved to a staging path next to the addon
    folder, the addon folder is deleted, and the staged directory is
    renamed to take its place. The JSON flags are then reset and saved,
    and the addon is reloaded.
    """
    if self._verbose:
        print("Restoring backup")
    if self._verbose:
        print("Backing up current addon folder")

    backup_dir = os.path.join(self._updater_path, "backup")
    staging_dir = os.path.abspath(os.path.join(
        self._addon_root, os.pardir, self._addon + "_updater_backup_temp"))

    # The backup is moved out of the addon tree before that tree is removed.
    shutil.move(backup_dir, staging_dir)
    shutil.rmtree(self._addon_root)
    os.rename(staging_dir, self._addon_root)

    new_state = (
        ("backup_date", ""),
        ("just_restored", True),
        ("just_updated", True),
    )
    for key, value in new_state:
        self._json[key] = value
    self.save_updater_json()
    self.reload_addon()
def _caches_to_file(cache_path, start, end, name, cb, concat):
    """Persist the cache files numbered ``start`` to ``end - 1``.

    With *concat* set, every ``<i>.jb`` file under *cache_path* is loaded,
    the contents are joined into one list and dumped to *name*. Otherwise
    the files are moved into ``<cache_path>/<name minus its last three
    characters>`` and a ``.finished`` marker file is touched there.
    *cb* is called with no arguments once the work is done.
    """
    began = time()
    if not concat:
        out_dir = os.path.join(cache_path, name[:-3])
        if not os.path.exists(out_dir):
            os.makedirs(out_dir)
        for index in range(start, end):
            source = os.path.join(cache_path, "{0}.jb".format(index))
            destination = os.path.join(out_dir, os.path.basename(source))
            shutil.move(source, destination)
        # Touch the marker file that flags this directory as complete.
        marker = os.path.join(out_dir, '.finished')
        with open(marker, 'a'):
            os.utime(marker, None)
    else:
        merged = []
        for index in range(start, end):
            part = load(os.path.join(cache_path, "{0}.jb".format(index)))
            merged.extend(part)
        dump(merged, name, 3)
    logging.debug("Finished saving data to {0}. Took {1}s".format(name, time()-began))
    cb()
def restore_config_file():
    """Put back the config files that were set aside for unit testing.

    ``config.py`` and ``custom_func.py`` are deleted, then each is restored
    from its ``._unittest_raw`` copy when that copy exists. Finally the
    whitelist/log files are removed on a best-effort basis.
    """
    config_names = ('config.py', 'custom_func.py')
    for name in config_names:
        os.remove(zmirror_file(name))
    for name in config_names:
        raw_copy = zmirror_file(name + '._unittest_raw')
        if os.path.exists(raw_copy):
            shutil.move(raw_copy, zmirror_file(name))

    leftovers = (
        'ip_whitelist.txt',
        'ip_whitelist.log',
        'automatic_domains_whitelist.log',
    )
    for name in leftovers:
        try:
            os.remove(zmirror_file(name))
        except OSError:
            # These files may legitimately be absent; anything that is not
            # a filesystem error should still surface.
            pass
def test_obtain_netmiko_filename():
    """Test file name and that directory is created."""
    create_dir_test = True
    expected_name = '/home/gituser/.netmiko/tmp/test_device.txt'
    assert obtain_netmiko_filename('test_device') == expected_name

    if not create_dir_test:
        return

    junk_dir_base = '/home/gituser/JUNK/netmiko'
    junk_dir = '{}/{}'.format(junk_dir_base, str(uuid.uuid1()))
    base_dir, full_dir = find_netmiko_dir()
    print(base_dir)

    # Move base_dir out of the way, then check that it gets recreated
    if os.path.isdir(base_dir) and os.path.isdir(junk_dir_base):
        shutil.move(src=base_dir, dst=junk_dir)
    for missing in (base_dir, full_dir):
        assert not os.path.exists(missing)

    obtain_netmiko_filename('test_device')
    ensure_dir_exists(base_dir)
    ensure_dir_exists(full_dir)
    for present in (base_dir, full_dir):
        assert os.path.exists(present)
def add_group(self, name=None, move=True):
    """
    Add a new, empty group of tasks/task generators.

    The group is appended to ``self.groups`` and registered in
    ``self.group_names`` under *name*. An error is logged when *name* is
    already registered; the new group is still added and takes over that
    name.

    :param name: name for this group
    :type name: string
    :param move: set the group created as default group (True by default)
    :type move: bool
    """
    already_named = bool(name) and name in self.group_names
    if already_named:
        Logs.error('add_group: name %s already present' % name)

    new_group = []
    self.group_names[name] = new_group
    self.groups.append(new_group)
    if not move:
        return
    # The group just appended is the last one.
    self.current_group = len(self.groups) - 1
def prepend(line, path):
    """
    Inserts *line* at the _beginning_ of the file at the given *path*.

    If *line* is a str it is encoded as UTF-8. If it doesn't end in a
    newline one will be appended to the end of it. The new content is
    assembled in a temporary file which then replaces the original; the
    original's permission bits are carried over.
    """
    import os

    if isinstance(line, str):
        line = line.encode('utf-8')
    if not line.endswith(b'\n'):
        line += b'\n'
    # delete=False keeps the file that was actually created, instead of
    # closing (and deleting) it and reopening the same name later, which
    # leaves a window for another process to claim that path.
    temp = tempfile.NamedTemporaryFile('wb', delete=False)
    try:
        with temp:
            temp.write(line)
            with open(path, 'rb') as original:
                # Stream the old content rather than loading it all at once.
                shutil.copyfileobj(original, temp)
        # Temp files are created private; restore the target's own mode.
        shutil.copymode(path, temp.name)
    except Exception:
        os.remove(temp.name)
        raise
    # Now replace the original with the modified version
    shutil.move(temp.name, path)
def _copy_file(filename, location, link):
    """Copy *filename* into *location* under the name ``link.filename``.

    If the destination already exists the user chooses between ignoring it
    (nothing is copied), wiping it, or backing it up before the copy.
    """
    download_location = os.path.join(location, link.filename)

    def wipe_existing():
        logger.warning('Deleting %s', display_path(download_location))
        os.remove(download_location)

    def back_up_existing():
        dest_file = backup_dir(download_location)
        logger.warning(
            'Backing up %s to %s',
            display_path(download_location),
            display_path(dest_file),
        )
        shutil.move(download_location, dest_file)

    if os.path.exists(download_location):
        response = ask_path_exists(
            'The file %s exists. (i)gnore, (w)ipe, (b)ackup ' %
            display_path(download_location), ('i', 'w', 'b'))
        if response == 'i':
            return
        action = {'w': wipe_existing, 'b': back_up_existing}.get(response)
        if action is not None:
            action()

    shutil.copy(filename, download_location)
    logger.info('Saved %s', display_path(download_location))
def maybe_move(self, spec, dist_filename, setup_base):
    """Move the build tree to ``<self.build_directory>/<spec.key>``.

    Returns the destination path after the move. If the destination is
    already present, only a warning is logged and *setup_base* is returned.
    """
    def pick_source():
        # A distribution that is itself a directory is moved as-is.
        if os.path.isdir(dist_filename):
            return dist_filename
        if os.path.dirname(dist_filename) == setup_base:
            os.unlink(dist_filename)  # get it out of the tmp dir
        contents = os.listdir(setup_base)
        if len(contents) != 1:
            return setup_base
        lone_entry = os.path.join(setup_base, contents[0])
        # if the only thing there is a directory, move it instead
        return lone_entry if os.path.isdir(lone_entry) else setup_base

    dst = os.path.join(self.build_directory, spec.key)
    if os.path.exists(dst):
        msg = ("%r already exists in %s; build directory %s will not be "
               "kept")
        log.warn(msg, spec.key, self.build_directory, setup_base)
        return setup_base

    source = pick_source()
    ensure_directory(dst)
    shutil.move(source, dst)
    return dst
def video_recorder(test_case):
    """Yield a started ``VideoRecorder`` and finalize it afterwards.

    After the consumer resumes the generator the recording is stopped. For
    a failed test case the video file is moved into the test report
    directory as ``video.mp4`` (a warning is logged if the file is
    missing); otherwise the recorder is cleared.
    """
    rec = VideoRecorder()
    rec.start()

    yield rec

    LOGGER.info("Stop video recording")
    rec.stop()

    if not test_case.is_failed:
        rec.clear()
        return

    if not os.path.isfile(rec.file_path):
        LOGGER.warn(
            "Can't move video from {!r}".format(rec.file_path))
        return

    with test_case.log_exception("Attach video"):
        report_video = os.path.join(test_case._test_report_dir, 'video.mp4')
        shutil.move(rec.file_path, report_video)