我们从 Python 开源项目中提取了以下 49 个代码示例,用于说明如何使用 shutil.Error()。
def _unsafe_writes(self, src, dest, exception): # sadly there are some situations where we cannot ensure atomicity, but only if # the user insists and we get the appropriate error we update the file unsafely if exception.errno == errno.EBUSY: #TODO: issue warning that this is an unsafe operation, but doing it cause user insists try: try: out_dest = open(dest, 'wb') in_src = open(src, 'rb') shutil.copyfileobj(in_src, out_dest) finally: # assuring closed files in 2.4 compatible way if out_dest: out_dest.close() if in_src: in_src.close() except (shutil.Error, OSError, IOError): e = get_exception() self.fail_json(msg='Could not write data to file (%s) from (%s): %s' % (dest, src, e)) else: self.fail_json(msg='Could not replace file: %s to %s: %s' % (src, dest, exception))
def do_last_steps():
    '''Runs last steps

    Restores automatic login if it was disabled, deletes the cleanup files
    and this application, then reboots the system.
    '''
    common.print_info("Starting last steps...")

    # Restore automatic logins if our package postinstall script disabled them:
    if os.path.exists(config_paths.AUTO_LOGIN_PASSWORD_FILE_MOVED):
        try:
            os.rename(config_paths.AUTO_LOGIN_PASSWORD_FILE_MOVED,
                      config_paths.AUTO_LOGIN_PASSWORD_FILE)
        except OSError:
            common.print_error("Could not restore automatic login by moving %s."
                               % config_paths.AUTO_LOGIN_PASSWORD_FILE_MOVED)

    # Remove preference key that disabled FileVault automatic logins.
    # Use defaults since the plist is probably binary.
    osx.defaults_delete('DisableFDEAutoLogin', config_paths.LOGIN_WINDOW_PREFS_FILE)

    # Cleanup files:
    # Work on a copy so the shared CLEANUP_FILES_ARRAY is not mutated.
    paths_list = list(config_paths.CLEANUP_FILES_ARRAY)
    paths_list.append(config_paths.THIS_LAUNCH_AGENT_PATH)
    common.delete_files_by_path(paths_list)

    if os.path.exists(config_paths.THIS_APP_PATH):
        try:
            shutil.rmtree(config_paths.THIS_APP_PATH)
        except (shutil.Error, OSError):
            # rmtree reports failures as OSError; removal stays best-effort
            # so the reboot below still happens, but the failure is shown.
            common.print_error("Could not remove %s." % config_paths.THIS_APP_PATH)

    common.print_info("Completed last steps. Rebooting...")

    # Reboot:
    osx.reboot_system()
def install_zip(self):
    """Install a zipped pack chosen through a file dialog.

    The archive must contain a top-level ``pack.mcmeta`` entry; if it does,
    the zip file is moved into ``self.resourcepack_location``. Problems are
    reported with message boxes. Nothing happens when no file is chosen.
    """
    pack = filedialog.askopenfile("r")
    if not pack:
        return

    with zipfile.ZipFile(pack.name, "r") as archive:
        # messagebox.showinfo("Information", "Found 'pack.mcmeta'.")
        found_pack = "pack.mcmeta" in archive.namelist()
    pack.close()

    if not found_pack:
        messagebox.showerror("Error", "Could not find 'pack.mcmeta'.")
        return

    try:
        shutil.move(pack.name, self.resourcepack_location)
    except shutil.Error:
        messagebox.showerror("Error", "This pack is already installed.")
def copy_files(src, dst, uid, gid):
    """Copy files recursively and set uid and gid.

    :param src: root of the tree to copy.
    :param dst: existing directory that receives the copy.
    :param uid: numeric owner applied to every created directory and file.
    :param gid: numeric group applied to every created directory and file.

    Failures are logged as warnings and the walk continues: ``OSError`` for
    directories, ``shutil.Error`` for files.
    """
    for root, dirs, files in os.walk(src):
        # os.walk roots always start with ``src``: swap only that leading
        # prefix, so a later path component equal to ``src`` is untouched.
        dst_root = dst + root[len(src):]
        for name in dirs:
            target = os.path.join(dst_root, name)
            try:
                logging.warning("%s|%s" % (dst_root, name))
                logging.warning(os.path.join(root, name))
                os.mkdir(target)
                os.chown(target, uid, gid)
            except OSError as e:
                logging.warning(e)
        for name in files:
            target = os.path.join(dst_root, name)
            try:
                shutil.copyfile(os.path.join(root, name), target)
                os.chown(target, uid, gid)
            except shutil.Error as e:
                # Bind the exception here; the handler used to log a name
                # that was never assigned in this branch.
                logging.warning(e)
def _check_locale(self): ''' Uses the locale module to test the currently set locale (per the LANG and LC_CTYPE environment settings) ''' try: # setting the locale to '' uses the default locale # as it would be returned by locale.getdefaultlocale() locale.setlocale(locale.LC_ALL, '') except locale.Error: # fallback to the 'C' locale, which may cause unicode # issues but is preferable to simply failing because # of an unknown locale locale.setlocale(locale.LC_ALL, 'C') os.environ['LANG'] = 'C' os.environ['LC_ALL'] = 'C' os.environ['LC_MESSAGES'] = 'C' except Exception: e = get_exception() self.fail_json(msg="An unknown error was encountered while attempting to validate the locale: %s" % e)
def test_dont_copy_file_onto_link_to_itself(self):
    # Temporarily disable test on Windows.
    if os.name == 'nt':
        return
    # bug 851123: copying a file onto a hard link to itself must raise
    # and leave the content untouched.
    os.mkdir(TESTFN)
    original = os.path.join(TESTFN, 'cheese')
    hard_link = os.path.join(TESTFN, 'shop')
    try:
        with open(original, 'w') as handle:
            handle.write('cheddar')
        os.link(original, hard_link)
        with self.assertRaises(shutil.Error):
            shutil.copyfile(original, hard_link)
        with open(original, 'r') as handle:
            content = handle.read()
        self.assertEqual(content, 'cheddar')
        os.remove(hard_link)
    finally:
        shutil.rmtree(TESTFN, ignore_errors=True)
def test_dont_copy_file_onto_symlink_to_itself(self):
    # bug 851123: copying a file onto a symlink to itself must raise and
    # leave the content untouched.
    os.mkdir(TESTFN)
    original = os.path.join(TESTFN, 'cheese')
    sym_link = os.path.join(TESTFN, 'shop')
    try:
        with open(original, 'w') as handle:
            handle.write('cheddar')
        # The link target is relative to the link's own directory. Using
        # `original` here would mean we end up with a symlink pointing
        # to TESTFN/TESTFN/cheese, while it should point at
        # TESTFN/cheese.
        os.symlink('cheese', sym_link)
        with self.assertRaises(shutil.Error):
            shutil.copyfile(original, sym_link)
        with open(original, 'r') as handle:
            content = handle.read()
        self.assertEqual(content, 'cheddar')
        os.remove(sym_link)
    finally:
        shutil.rmtree(TESTFN, ignore_errors=True)
def test_copytree_named_pipe(self):
    # copytree must report a named pipe inside the tree as one error entry.
    os.mkdir(TESTFN)
    try:
        nested_dir = os.path.join(TESTFN, "subdir")
        os.mkdir(nested_dir)
        fifo_path = os.path.join(nested_dir, "mypipe")
        os.mkfifo(fifo_path)
        try:
            shutil.copytree(TESTFN, TESTFN2)
        except shutil.Error as exc:
            reported = exc.args[0]
            self.assertEqual(len(reported), 1)
            # Each entry is a (source, destination, message) triple.
            _src, _dst, message = reported[0]
            self.assertEqual("`%s` is a named pipe" % fifo_path, message)
        else:
            self.fail("shutil.Error should have been raised")
    finally:
        for tree in (TESTFN, TESTFN2):
            shutil.rmtree(tree, ignore_errors=True)
def test_copytree_dangling_symlinks(self):
    source = self.mkdtemp()
    target = os.path.join(self.mkdtemp(), 'destination')
    os.symlink('IDONTEXIST', os.path.join(source, 'test.txt'))
    nested = os.path.join(source, 'test_dir')
    os.mkdir(nested)
    self._write_data(os.path.join(nested, 'test.txt'), '456')

    # a dangling symlink raises an error at the end
    with self.assertRaises(Error):
        shutil.copytree(source, target)

    # a dangling symlink is ignored with the proper flag
    target = os.path.join(self.mkdtemp(), 'destination2')
    shutil.copytree(source, target, ignore_dangling_symlinks=True)
    self.assertNotIn('test.txt', os.listdir(target))

    # a dangling symlink is copied if symlinks=True
    target = os.path.join(self.mkdtemp(), 'destination3')
    shutil.copytree(source, target, symlinks=True)
    self.assertIn('test.txt', os.listdir(target))
def write(self):
    """ Writes configuration file

    Waits up to five seconds for the ``_writing`` flag to clear, dumps
    ``self.encode()`` as JSON to ``self._filetmp``, restricts the temp file
    to owner read/write and moves it over ``self._file``. A move that fails
    with ``shutil.Error`` is logged rather than raised.
    """
    # wait for file to unlock. Timeout after 5 seconds
    for _ in range(5):
        if not self._writing:
            self._writing = True
            break
        time.sleep(1)
    else:
        _LOGGER.error('Could not write to configuration file. It is busy.')
        return

    try:
        # dump JSON to config file and unlock
        encoded = self.encode()
        # Close (and thereby flush) the temp file before it is moved;
        # otherwise buffered data may be missing from the final file.
        with open(self._filetmp, 'w') as tmp_file:
            json.dump(encoded, tmp_file, sort_keys=True, indent=4,
                      separators=(',', ': '))
        os.chmod(self._filetmp, stat.S_IRUSR | stat.S_IWUSR)
        try:
            shutil.move(self._filetmp, self._file)
            _LOGGER.debug('Config file succesfully updated.')
        except shutil.Error as e:
            _LOGGER.error('Failed to move temp config file to original error: ' + str(e))
    finally:
        # Always release the flag, even on an unexpected error, so later
        # writes are not blocked.
        self._writing = False
    _LOGGER.debug('Wrote configuration file')
def CreateDirectories(path):
    """Create directory if the path to a file doesn't exist.

    Args:
      path: The full file path to where a file will be placed.

    Raises:
      Error: Failure creating the requested directory.
    """
    dirname = os.path.dirname(path)
    # A bare file name has no directory component, so there is nothing to
    # create (os.makedirs('') would fail).
    if not dirname or os.path.isdir(dirname):
        return
    logging.debug('Creating directory %s ', dirname)
    try:
        os.makedirs(dirname)
    except (shutil.Error, OSError):
        # Losing a race with another creator is fine as long as the
        # directory exists afterwards.
        if not os.path.isdir(dirname):
            raise Error('Unable to make directory: %s' % dirname)
def _rename_temp_folder(name, target_folder, tmp_path):
    """
    Rename the temp folder of the cloned repo
    Return the name of the path to install

    :param name: name of the module, used as the final folder name
    :param target_folder: folder that receives the module
    :param tmp_path: temporary folder holding the cloned repo
    :return: path to install, None if already exists
    """
    logger.debug("[ResourcesManager] Rename temp folder")
    new_absolute_neuron_path = target_folder + os.sep + name
    # shutil.move() does not fail when the destination is an existing
    # directory: it moves the source *inside* it. Check explicitly so an
    # already installed module is detected.
    already_installed = os.path.exists(new_absolute_neuron_path)
    if not already_installed:
        try:
            shutil.move(tmp_path, new_absolute_neuron_path)
            return new_absolute_neuron_path
        except shutil.Error:
            already_installed = True

    # the folder already exist
    Utils.print_warning("The module %s already exist in the path %s" % (name, target_folder))
    # remove the cloned repo
    logger.debug("[ResourcesManager] Deleting temp folder %s" % str(tmp_path))
    shutil.rmtree(tmp_path)
    return None
def test_copytree_dangling_symlinks(self):
    source = self.mkdtemp()
    target = os.path.join(self.mkdtemp(), 'destination')
    os.symlink('IDONTEXIST', os.path.join(source, 'test.txt'))
    os.mkdir(os.path.join(source, 'test_dir'))
    write_file((source, 'test_dir', 'test.txt'), '456')

    # a dangling symlink raises an error at the end
    with self.assertRaises(Error):
        shutil.copytree(source, target)

    # a dangling symlink is ignored with the proper flag
    target = os.path.join(self.mkdtemp(), 'destination2')
    shutil.copytree(source, target, ignore_dangling_symlinks=True)
    self.assertNotIn('test.txt', os.listdir(target))

    # a dangling symlink is copied if symlinks=True
    target = os.path.join(self.mkdtemp(), 'destination3')
    shutil.copytree(source, target, symlinks=True)
    self.assertIn('test.txt', os.listdir(target))
def copy_files_to_project_folder(self):
    """Copy file-type settings that live outside the project into it.

    For every setting flagged ``copy`` with type ``'file'`` and a value
    that is still an absolute path once the project directory is stripped,
    the file is copied into the project directory and the setting is
    reduced to the bare file name. The working directory is switched to the
    project directory for the duration and restored to ``CWD`` afterwards.
    """
    old_dir = CWD
    os.chdir(self.project_dir())
    try:
        self.logger.info(u'Copying files to {}'.format(self.project_dir()))
        for sgroup in self.settings['setting_groups']:
            for setting in sgroup.values():
                if setting.copy and setting.type == 'file' and setting.value:
                    f_path = setting.value.replace(self.project_dir(), '')
                    if os.path.isabs(f_path):
                        try:
                            utils.copy(setting.value, self.project_dir())
                            self.logger.info(u'Copying file {} to {}'.format(setting.value, self.project_dir()))
                        except shutil.Error as e:  # same file warning
                            self.logger.warning(u'Warning: {}'.format(e))
                        finally:
                            setting.value = os.path.basename(setting.value)
    finally:
        # Restore the working directory even when a copy raises something
        # other than shutil.Error.
        os.chdir(old_dir)
def collect_cgroup(approot, destroot):
    """Get host treadmill cgroups information.

    Copies ``<approot>/cgroup_svc`` and every directory matching
    ``/cgroup/*/treadmill/core`` below ``destroot``, keeping each source
    path as the relative layout. Trees that cannot be copied are skipped
    with a warning.
    """
    src = "%s/cgroup_svc" % approot
    dest = "%s%s" % (destroot, src)

    try:
        shutil.copytree(src, dest)
    except (shutil.Error, OSError):
        _LOGGER.warning('skip %s => %s', src, dest)

    pattern = '/cgroup/*/treadmill/core'
    for cgrp_core in glob.glob(pattern):
        core_dest = '%s%s' % (destroot, cgrp_core)

        try:
            shutil.copytree(cgrp_core, core_dest)
        except (shutil.Error, OSError):
            # Report the pair that actually failed, not the cgroup_svc
            # paths from the first copy.
            _LOGGER.warning('skip %s => %s', cgrp_core, core_dest)
def copytree(src, dst, symlinks=False, ignore=None):
    """Copy each entry of ``src`` into ``dst``.

    Sub-directories go through ``shutil.copytree`` and everything else
    through ``shutil.copy2``. A sub-directory that cannot be copied is
    logged as a warning and skipped.
    """
    for entry in os.listdir(src):
        source_path = os.path.join(src, entry)
        target_path = os.path.join(dst, entry)
        if not os.path.isdir(source_path):
            shutil.copy2(source_path, target_path)
            continue
        # TODO: better way to copy while skip symbol links
        try:
            shutil.copytree(source_path, target_path, symlinks, ignore)
        except (shutil.Error, OSError):
            logging.warning('Warning: Some directories not copied under %s.' % (source_path))
            # logging.warning('Some directories not copied. Error: %s' % e)

################################################################################
def copytree(src, dst):
    """
    Variant of shutil.copytree that may copy into a directory that already
    exists.

    :param src: directory whose content is copied
    :param dst: target directory, created when missing
    :return: ``dst``
    :raises shutil.Error: with the collected ``(source, destination,
        message)`` entries when any part of the copy failed
    """
    os.makedirs(dst, exist_ok=True)
    failures = []

    for entry in os.listdir(src):
        source_path = os.path.join(src, entry)
        target_path = os.path.join(dst, entry)
        # Directories recurse, everything else is a plain file copy.
        copier = copytree if os.path.isdir(source_path) else shutil.copy
        try:
            copier(source_path, target_path)
        except shutil.Error as err:
            # Nested failures already carry a list of entries.
            failures.extend(err.args[0])
        except OSError as err:
            failures.append((source_path, target_path, str(err)))

    if failures:
        raise shutil.Error(failures)
    return dst
def ack(self, tup_id):
    """Acknowledge tup_id, that is the path_mail.

    An existing mail file is deleted when ``self._what`` is ``"remove"``;
    otherwise it is moved to ``self._where`` and deleted only if that move
    fails with ``shutil.Error``. The queue is then told the task is done
    and the path is dropped from the tail of analyzed mails.
    """
    if os.path.exists(tup_id):
        discard = self._what == "remove"
        if not discard:
            try:
                shutil.move(tup_id, self._where)
            except shutil.Error:
                discard = True
        if discard:
            os.remove(tup_id)

    try:
        # Remove from tail analyzed mail
        self._queue.task_done()
        self._queue_tail.remove(tup_id)
        remaining = len(self._queue_tail)
        self.log("Mails to process: {}".format(remaining))
    except KeyError as e:
        self.raise_exception(e, tup_id)
def from_zip(cls, file_obj, stages_name, stages_root):
    "Extract the zip in file_obj to os.path.join(stages_root, stages_name) and build stages from it."
    try:
        assignment_root = os.path.join(stages_root, stages_name)
        os.mkdir(assignment_root)

        with zipfile.ZipFile(file_obj, 'r') as archive:
            # testzip() names the first corrupt member, or None.
            corrupt_member = archive.testzip()
            if corrupt_member is not None:
                raise Error('Corrupt file in zip: ' + corrupt_member)
            # TODO: Handle case where namelist() uses a lot of memory
            for member in archive.namelist():
                archive.extract(member, assignment_root)

        # TODO: The stage.save_main_script() code below is used as a workaround
        # to ensure that the main script is executable. Ideally, file
        # permissions would be preserved.
        stages = cls(assignment_root)
        for stage in stages.stages.itervalues():
            stage.save_main_script()
        return stages
    except (zipfile.BadZipfile, zipfile.LargeZipFile) as zip_error:
        raise Error(zip_error)
def _set_filename(self, filename):
    """Set the filename of the current working file

    Whitespace runs in ``filename`` become underscores and every character
    that is not alphanumeric or one of ``. _ : -`` or the path separator is
    dropped. The original file is copied to the sanitized name, which then
    becomes ``self._filename``; ``self._basename`` and ``self._ext`` are
    derived from it.

    Raises:
        shutil.Error: if the copy fails for any reason other than source
            and destination being the same file.
    """
    tmp_file = '_'.join(filename.split())
    pathsep = os.path.sep
    if sys.platform == 'win32':
        pathsep = '/'
    allowed = ('.', '_', ':', pathsep, '-')
    new_file = ''.join(char for char in tmp_file
                       if char.isalnum() or char in allowed)
    try:
        shutil.copy(filename, new_file)
    except shutil.Error as err:
        # Copying a file onto itself is harmless here. Python 3 signals it
        # with SameFileError; older versions only offer the message text.
        same_file_error = getattr(shutil, 'SameFileError', None)
        legacy_msg = "`%s` and `%s` are the same file" % (filename, new_file)
        is_same_file = (
            (same_file_error is not None and isinstance(err, same_file_error))
            or str(err) == legacy_msg
        )
        if not is_same_file:
            raise
    utils.ensure_file_exists(new_file)
    self._filename = new_file
    self._basename, self._ext = os.path.splitext(self._filename)
def delete_dir_content(dir_path=analyzer.TEMP_DIR):
    """Delete every entry of ``dir_path``.

    Entries that cannot be removed are recorded in the module-level
    ``errors`` list. Data is gathered before and after the deletion.
    """
    global errors
    gather_data_before_delete(dir_path)
    dir_content = analyzer.get_dir_content(dir_path)
    for entry in dir_content.keys():
        entry_path = os.path.join(dir_path, entry)
        try:
            # Regular files are unlinked; anything else is treated as a tree.
            if not os.path.isfile(entry_path):
                print("Deleting dir:", entry)
                shutil.rmtree(entry_path)
            else:
                print("Deleting file:", entry)
                os.remove(entry_path)
        except (os.error, shutil.Error) as failure:
            errors.append(str(failure))
            print("Unable to delete")
    gather_cleanup_data(dir_path)
def _check_locale(self): ''' Uses the locale module to test the currently set locale (per the LANG and LC_CTYPE environment settings) ''' try: # setting the locale to '' uses the default locale # as it would be returned by locale.getdefaultlocale() locale.setlocale(locale.LC_ALL, '') except locale.Error: # fallback to the 'C' locale, which may cause unicode # issues but is preferable to simply failing because # of an unknown locale locale.setlocale(locale.LC_ALL, 'C') os.environ['LANG'] = 'C' os.environ['LC_ALL'] = 'C' os.environ['LC_MESSAGES'] = 'C' except Exception as e: self.fail_json(msg="An unknown error was encountered while attempting to validate the locale: %s" % to_native(e), exception=traceback.format_exc())
def _unsafe_writes(self, src, dest): # sadly there are some situations where we cannot ensure atomicity, but only if # the user insists and we get the appropriate error we update the file unsafely try: try: out_dest = open(dest, 'wb') in_src = open(src, 'rb') shutil.copyfileobj(in_src, out_dest) finally: # assuring closed files in 2.4 compatible way if out_dest: out_dest.close() if in_src: in_src.close() except (shutil.Error, OSError, IOError) as e: self.fail_json(msg='Could not write data to file (%s) from (%s): %s' % (dest, src, to_native(e)), exception=traceback.format_exc())
def setlocale(category, loc=None, printer=None):
    """Call locale.setlocale(), reverting to the C locale when the
    requested locale is broken or unavailable.

    'printer' is a function that takes a string and displays it; it is
    used to report the fallback. When it is 'None' (the default) the
    message goes through emsg."""

    report = emsg if printer is None else printer

    try:
        locale.setlocale(category, loc)
        # Because of Python bug 813449, getdefaultlocale may fail
        # with a ValueError even if setlocale succeeds. So we call
        # it here to prevent having this error raised if it is
        # called later by other non-pkg(7) code.
        locale.getdefaultlocale()
    except (locale.Error, ValueError):
        # Name the default locale in the message when it can be determined.
        try:
            default_name = " '{0}.{1}'".format(*locale.getdefaultlocale())
        except ValueError:
            default_name = ""
        report("Unable to set locale{0}; locale package may be broken "
               "or\nnot installed.  Reverting to C locale.".format(default_name))
        locale.setlocale(category, "C")
def moveKeyFilesToCorrectLocations(keys_dir, pkdir, skdir):
    """Sort the key files found in ``keys_dir`` into their directories.

    Files ending in ``.key`` go to ``pkdir`` and files ending in
    ``.key_secret`` go to ``skdir``; a move failing with ``shutil.Error``
    is silently skipped.
    """
    destinations = ((".key", pkdir), (".key_secret", skdir))
    for key_file in os.listdir(keys_dir):
        for suffix, target_dir in destinations:
            if not key_file.endswith(suffix):
                continue
            try:
                shutil.move(os.path.join(keys_dir, key_file),
                            os.path.join(target_dir, key_file))
            except shutil.Error:
                # print(ex)
                pass
def post(self):
    """Move the current album to the folder named by the ``to`` argument.

    The target is resolved against the configured ``base_path``, expected
    to be a ``file://`` URI. Every outcome ends with a redirect to
    ``/auto``.
    """
    to = self.get_body_argument("to", False)
    if not to:
        logger.error('Destination not supplied, move canceled')
        return self.redirect('/auto')

    destination_uri = os.path.join(self.config['auto']['base_path'], to)
    # Strip the scheme, then turn the URL path into a local file path.
    uri_without_scheme = destination_uri.split('file://')[1]
    destination_path = urllib.url2pathname(urlparse.urlparse(uri_without_scheme).path)

    logger.info("Moving '{}' to '{}'".format(self.album_path, destination_path))

    self.core.tracklist.clear()
    try:
        shutil.move(self.album_path, destination_path)
    except shutil.Error:
        logger.error("Could not move album '{}' to '{}'".format(self.album_path, destination_path), exc_info=1)
    else:
        logger.info("Moved album '{}' to '{}'".format(self.album_path, destination_path))

    return self.redirect('/auto')
def copyPhoto(self, photo, target, recursive=True):
    """Attempt to copy photo from cache to given destination.

    NOTE(review): the body visible here is only ``pass``; the contract
    below is the documented intent and is not implemented in this block.

    Arguments:
        photo: photo object to download.
        target: target filename
        recursive (optional): attempt to download picture if it does not
            exist yet

    Returns:
        True if image was copied successfully,
        False if image did not exist and download was initiated,
        otherwise None.

    Raises:
        shutil.Error if an error occurred during moving the file.
    """
    pass
def _copy_temp_directory(self): """Copy the temp directory to the output directory.""" if os.path.exists(self.paths['output']): oldfiles = [f for f in os.listdir(self.paths['output'])] for oldfile in oldfiles: os.remove(os.path.join(self.paths['output'], oldfile)) newfiles = [f for f in os.listdir(self.paths['temp'])] for newfile in newfiles: shutil.copy2(os.path.join(self.paths['temp'], newfile), self.paths['output']) else: try: shutil.copytree(self.paths['temp'], self.paths['output']) except shutil.Error as err: self.exit(err) shutil.rmtree(self.paths['temp'])
def open_pack(self):
    """Open a pack directory chosen through a directory dialog.

    The directory must contain ``pack.mcmeta``; otherwise an error box is
    shown and nothing else happens.
    """
    pack = filedialog.askdirectory(initialdir=self.resourcepack_location)
    if not os.path.isfile(pack + "/pack.mcmeta"):
        messagebox.showerror("Error", "Could not find 'pack.mcmeta'.")
        return

    # messagebox.showinfo("Information", "Found 'pack.mcmeta'.")
    self.parent.directory = pack
    self.parent.cmd.tree_refresh()
    self.destroy()
def open_zip(self):
    # Lets the user pick a zip archive, checks it for a top-level
    # 'pack.mcmeta' entry and, if present, extracts every member into a
    # temporary directory while updating a progress window.
    # NOTE(review): the nesting below was reconstructed from a source whose
    # whitespace had been collapsed; confirm the block structure against
    # the upstream file.
    pack = filedialog.askopenfile("r", initialdir=self.resourcepack_location)
    found_pack = False

    if pack:
        # presumably the number of members in the archive, used as the
        # progress maximum; verify against functions.zip_files
        amount = functions.zip_files(pack.name)
        progress = dialog.ProgressWindow(self.parent, title="Opening Zip", maximum=amount)
        count = 0

        with zipfile.ZipFile(pack.name, "r") as z:
            for file in z.namelist():
                if file == "pack.mcmeta":
                    # messagebox.showinfo("Information", "Found 'pack.mcmeta'.")
                    found_pack = True
                    self.destroy()

            if found_pack:
                # Keep the TemporaryDirectory object on the parent so the
                # extracted files stay referenced after this method returns.
                self.parent.d = tempfile.TemporaryDirectory()
                for file in z.namelist():
                    z.extract(file, self.parent.d.name)
                    count += 1
                    progress.variable_name.set("Current File: " + file)
                    progress.variable_percent.set("{}% Complete".format(round(100 * float(count) / float(amount))))
                    progress.variable_progress.set(progress.variable_progress.get() + 1)

                # Pack name: the archive's file name up to its first dot.
                self.parent.name = pack.name.split("/")[-1].split(".")[0]
                self.parent.directory = self.parent.d.name
                self.parent.directory_real = pack.name
                self.parent.cmd.tree_refresh()
                self.destroy()
            elif not found_pack:
                messagebox.showerror("Error", "Could not find 'pack.mcmeta'.")

        pack.close()
        progress.destroy()
def install_pack(self):
    """Install a pack directory chosen through a directory dialog.

    The directory must contain ``pack.mcmeta``; it is then moved into
    ``self.resourcepack_location``. Problems are reported with message
    boxes.
    """
    pack = filedialog.askdirectory()
    if not os.path.isfile(pack + "/pack.mcmeta"):
        messagebox.showerror("Error", "Could not find 'pack.mcmeta'.")
        return

    # messagebox.showinfo("Information", "Found 'pack.mcmeta'.")
    try:
        shutil.move(pack, self.resourcepack_location)
    except shutil.Error:
        messagebox.showerror("Error", "This pack is already installed.")
def put_file(self, in_path, out_path):
    ''' copy a local file to another local path '''
    super(Connection, self).put_file(in_path, out_path)

    display.vvv(u"PUT {0} TO {1}".format(in_path, out_path), host=self._play_context.remote_addr)

    # Paths are handed to the OS as bytes.
    b_in_path = to_bytes(in_path, errors='surrogate_or_strict')
    if not os.path.exists(b_in_path):
        raise AnsibleFileNotFound("file or module does not exist: {0}".format(to_native(in_path)))

    b_out_path = to_bytes(out_path, errors='surrogate_or_strict')
    try:
        shutil.copyfile(b_in_path, b_out_path)
    except shutil.Error:
        raise AnsibleError("failed to copy: {0} and {1} are the same".format(to_native(in_path), to_native(out_path)))
    except IOError as e:
        raise AnsibleError("failed to transfer file to {0}: {1}".format(to_native(out_path), to_native(e)))
def test_existing_file_inside_dest_dir(self):
    # Moving must fail when the destination dir already holds a file
    # with the same name.
    open(self.dst_file, "wb").close()
    with self.assertRaises(shutil.Error):
        shutil.move(self.src_file, self.dst_dir)
def sort(self):
    """Move every element of the drop folder to its destination.

    The configuration file is left in place. Elements without a
    destination, or whose move fails, are logged as warnings.
    """
    candidates = (entry for entry in os.listdir(self.path)
                  if entry != CONFIG_FILE_NAME)
    for entry in candidates:
        try:
            target = self._get_destination(entry)
            move(os.path.join(self.path, entry), target)
            logging.info("%s moved to %s", entry, target)
        except LookupError as lookup_error:
            logging.warning(str(lookup_error))
        except Error:
            logging.warning("error while moving the element %s", entry)
def show_bug_reports(self, *v):
    """Ask for the submitter's email and open the matching bug search.

    Shows a modal dialog with an email entry; when it is confirmed, a
    search URL on www.solfege.org is opened in the web browser. A failure
    to start the browser is shown as an error message.
    """
    dialog = Gtk.Dialog(_("Question"), self, 0)
    dialog.add_button(Gtk.STOCK_CANCEL, Gtk.ResponseType.CANCEL)
    dialog.add_button(Gtk.STOCK_OK, Gtk.ResponseType.OK)
    vbox = Gtk.VBox()
    dialog.vbox.pack_start(vbox, False, False, 0)
    vbox.set_spacing(18)
    vbox.set_border_width(12)
    label = Gtk.Label(label=_("Please enter the email used when you submitted the bugs:"))
    vbox.pack_start(label, False, False, 0)
    self.g_email = Gtk.Entry()
    dialog.action_area.get_children()[0].grab_default()
    # Pressing Enter in the entry activates the default button.
    self.g_email.set_activates_default(True)
    vbox.pack_start(self.g_email, False, False, 0)
    dialog.show_all()
    ret = dialog.run()
    dialog.destroy()
    if ret == Gtk.ResponseType.OK:
        # The decoded text is a string, not a callable: the stray call
        # parentheses that used to follow decode() raised TypeError.
        email = self.g_email.get_text().decode("utf-8")
        params = urllib.urlencode({
            'pagename': 'SITS-Incoming/SearchBugs',
            'q': 'SITS-Incoming/"Submitter: %s"' % utils.mangle_email(email),
        })
        try:
            webbrowser.open_new("http://www.solfege.org?%s" % params)
        except Exception as e:
            self.display_error_message2(_("Error opening web browser"), str(e))
def display_docfile(self, fn):
    """
    Open the HTML help file named by fn in the web browser.

    The user manual language is tried first, then "C"; the last candidate
    is opened even if neither file exists.
    """
    for language in (solfege.app.m_userman_language, "C"):
        filename = os.path.join(os.getcwdu(), u"help", language, fn)
        if os.path.isfile(filename):
            break
    try:
        webbrowser.open(filename)
    except Exception as e:
        self.display_error_message2(_("Error opening web browser"), str(e))
def test_dont_copy_file_onto_link_to_itself(self):
    # bug 851123: copying a file onto a hard link or a symlink to itself
    # must raise and leave the content untouched.
    os.mkdir(TESTFN)
    original = os.path.join(TESTFN, 'cheese')
    link_path = os.path.join(TESTFN, 'shop')
    try:
        with open(original, 'w') as handle:
            handle.write('cheddar')

        # Hard link case.
        os.link(original, link_path)
        self.assertRaises(shutil.Error, shutil.copyfile, original, link_path)
        with open(original, 'r') as handle:
            self.assertEqual(handle.read(), 'cheddar')
        os.remove(link_path)

        # Symlink case. Using `original` here would mean we end up with a
        # symlink pointing to TESTFN/TESTFN/cheese, while it should point
        # at TESTFN/cheese.
        os.symlink('cheese', link_path)
        self.assertRaises(shutil.Error, shutil.copyfile, original, link_path)
        with open(original, 'r') as handle:
            self.assertEqual(handle.read(), 'cheddar')
        os.remove(link_path)
    finally:
        try:
            shutil.rmtree(TESTFN)
        except OSError:
            pass