Python shutil 模块,Error() 实例源码

我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用shutil.Error()

项目:DevOps    作者:YoLoveLife    | 项目源码 | 文件源码
def _unsafe_writes(self, src, dest, exception):
    """Copy ``src`` over ``dest`` in place, without any atomicity guarantee.

    The copy is only attempted when ``exception.errno`` is ``errno.EBUSY``;
    for any other error the failure is reported through ``self.fail_json``.

    :param src: path of the file whose bytes are copied
    :param dest: path of the file that is overwritten
    :param exception: error object whose ``errno`` decides whether to copy
    """
    # sadly there are some situations where we cannot ensure atomicity, but only if
    # the user insists and we get the appropriate error we update the file unsafely
    if exception.errno == errno.EBUSY:
        # TODO: issue warning that this is an unsafe operation, but doing it cause user insists
        # Bind both handles up front: if one of the open() calls fails, the
        # cleanup below must not trip over an unbound name and hide the real error.
        out_dest = None
        in_src = None
        try:
            try:
                out_dest = open(dest, 'wb')
                in_src = open(src, 'rb')
                shutil.copyfileobj(in_src, out_dest)
            finally:  # assuring closed files in 2.4 compatible way
                if out_dest:
                    out_dest.close()
                if in_src:
                    in_src.close()
        except (shutil.Error, OSError, IOError):
            e = get_exception()
            self.fail_json(msg='Could not write data to file (%s) from (%s): %s' % (dest, src, e))

    else:
        self.fail_json(msg='Could not replace file: %s to %s: %s' % (src, dest, exception))
项目:munki-enrollment-client    作者:gerritdewitt    | 项目源码 | 文件源码
def do_last_steps():
    '''Runs last steps.

    Restores the moved automatic-login password file if present, deletes the
    DisableFDEAutoLogin preference key, removes the cleanup files and this
    app's directory, then reboots the system.
    '''
    common.print_info("Starting last steps...")
    # Restore automatic logins if our package postinstall script disabled them:
    if os.path.exists(config_paths.AUTO_LOGIN_PASSWORD_FILE_MOVED):
        try:
            os.rename(config_paths.AUTO_LOGIN_PASSWORD_FILE_MOVED,config_paths.AUTO_LOGIN_PASSWORD_FILE)
        except OSError:
            common.print_error("Could not restore automatic login by moving %s." % config_paths.AUTO_LOGIN_PASSWORD_FILE_MOVED)
    # Remove preference key that disabled FileVault automatic logins.
    # Use defaults since the plist is probably binary.
    osx.defaults_delete('DisableFDEAutoLogin', config_paths.LOGIN_WINDOW_PREFS_FILE)
    # Cleanup files:
    # Work on a copy so the shared configuration list does not grow each call.
    paths_list = list(config_paths.CLEANUP_FILES_ARRAY)
    paths_list.append(config_paths.THIS_LAUNCH_AGENT_PATH)
    common.delete_files_by_path(paths_list)
    if os.path.exists(config_paths.THIS_APP_PATH):
        try:
            shutil.rmtree(config_paths.THIS_APP_PATH)
        except (shutil.Error, OSError):
            # rmtree reports failures as OSError; removal is best effort and
            # must not prevent the reboot below.
            common.print_error("Could not remove %s." % config_paths.THIS_APP_PATH)
    common.print_info("Completed last steps. Rebooting...")
    # Reboot:
    osx.reboot_system()
项目:Quiver    作者:DeflatedPickle    | 项目源码 | 文件源码
def install_zip(self):
    """Ask the user for a zip file and install it as a resource pack.

    The chosen archive must list a top-level 'pack.mcmeta' entry; if it does,
    the file is moved to ``self.resourcepack_location``. Problems are shown
    in an error dialog. Nothing happens when the dialog is cancelled.
    """
    pack = filedialog.askopenfile("r")
    if not pack:
        return

    with zipfile.ZipFile(pack.name, "r") as archive:
        found_pack = "pack.mcmeta" in archive.namelist()
        # Release the handle opened by the file dialog before moving the file.
        pack.close()

    if not found_pack:
        messagebox.showerror("Error", "Could not find 'pack.mcmeta'.")
        return

    try:
        shutil.move(pack.name, self.resourcepack_location)
    except shutil.Error:
        messagebox.showerror("Error", "This pack is already installed.")
项目:nc-backup-py    作者:ChinaNetCloud    | 项目源码 | 文件源码
def copy_files(src, dst, uid, gid):
    """Copy files recursively and set uid and gid.

    Walks ``src`` and recreates every sub-directory and file at the matching
    location under ``dst``, chown-ing each created entry to ``uid``/``gid``.
    ``dst`` itself is not created here. OSError while creating a directory and
    shutil.Error while copying a file are logged as warnings and the walk
    continues.
    """
    for root, dirs, files in os.walk(src):
        # Swap only the leading ``src`` prefix for ``dst``; str.replace would
        # also rewrite any later occurrence of ``src`` inside the path.
        dst_root = dst + root[len(src):]

        for name in dirs:
            target = os.path.join(dst_root, name)
            try:
                logging.warning("%s|%s" % (dst_root, name))
                logging.warning(os.path.join(root, name))
                os.mkdir(target)
                os.chown(target, uid, gid)
            except OSError as e:
                logging.warning(e)
        for name in files:
            target = os.path.join(dst_root, name)
            try:
                shutil.copyfile(os.path.join(root, name), target)
                os.chown(target, uid, gid)
            except shutil.Error as e:
                # Bind the exception here: the name was previously undefined
                # in this handler.
                logging.warning(e)
项目:DevOps    作者:YoLoveLife    | 项目源码 | 文件源码
def _check_locale(self):
    '''
    Activate the default locale (per the LANG and LC_CTYPE environment
    settings), falling back to the 'C' locale when it is rejected.
    Any other error is reported through self.fail_json.
    '''
    try:
        # an empty locale string selects the user's default settings
        locale.setlocale(locale.LC_ALL, '')
    except locale.Error:
        # the 'C' locale may cause unicode issues, but that is preferable
        # to failing outright because of an unknown locale
        locale.setlocale(locale.LC_ALL, 'C')
        for variable in ('LANG', 'LC_ALL', 'LC_MESSAGES'):
            os.environ[variable] = 'C'
    except Exception:
        e = get_exception()
        self.fail_json(msg="An unknown error was encountered while attempting to validate the locale: %s" % e)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_dont_copy_file_onto_link_to_itself(self):
    # bug 851123: copyfile() onto a hard link of the source must raise
    # shutil.Error and leave the file content untouched.
    # Temporarily disable test on Windows.
    if os.name == 'nt':
        return
    os.mkdir(TESTFN)
    original = os.path.join(TESTFN, 'cheese')
    hard_link = os.path.join(TESTFN, 'shop')
    try:
        with open(original, 'w') as handle:
            handle.write('cheddar')
        os.link(original, hard_link)
        with self.assertRaises(shutil.Error):
            shutil.copyfile(original, hard_link)
        with open(original, 'r') as handle:
            content = handle.read()
        self.assertEqual(content, 'cheddar')
        os.remove(hard_link)
    finally:
        shutil.rmtree(TESTFN, ignore_errors=True)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_dont_copy_file_onto_symlink_to_itself(self):
    # bug 851123: copyfile() onto a symlink of the source must raise
    # shutil.Error and leave the file content untouched.
    os.mkdir(TESTFN)
    original = os.path.join(TESTFN, 'cheese')
    link_path = os.path.join(TESTFN, 'shop')
    try:
        with open(original, 'w') as handle:
            handle.write('cheddar')
        # The link target is relative to the link's own directory: using
        # `original` here would point at TESTFN/TESTFN/cheese instead of
        # TESTFN/cheese.
        os.symlink('cheese', link_path)
        with self.assertRaises(shutil.Error):
            shutil.copyfile(original, link_path)
        with open(original, 'r') as handle:
            content = handle.read()
        self.assertEqual(content, 'cheddar')
        os.remove(link_path)
    finally:
        shutil.rmtree(TESTFN, ignore_errors=True)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_copytree_named_pipe(self):
    # Expects copytree() to raise shutil.Error carrying exactly one entry
    # whose message names the FIFO inside the source tree.
    os.mkdir(TESTFN)
    try:
        fifo_dir = os.path.join(TESTFN, "subdir")
        os.mkdir(fifo_dir)
        fifo = os.path.join(fifo_dir, "mypipe")
        os.mkfifo(fifo)
        try:
            shutil.copytree(TESTFN, TESTFN2)
        except shutil.Error as raised:
            failures = raised.args[0]
            self.assertEqual(len(failures), 1)
            _src, _dst, reason = failures[0]
            self.assertEqual("`%s` is a named pipe" % fifo, reason)
        else:
            self.fail("shutil.Error should have been raised")
    finally:
        for tree in (TESTFN, TESTFN2):
            shutil.rmtree(tree, ignore_errors=True)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_copytree_dangling_symlinks(self):
    # Source tree: one dangling symlink plus a regular file in a sub-directory.
    source = self.mkdtemp()
    target = os.path.join(self.mkdtemp(), 'destination')
    os.symlink('IDONTEXIST', os.path.join(source, 'test.txt'))
    nested = os.path.join(source, 'test_dir')
    os.mkdir(nested)
    self._write_data(os.path.join(nested, 'test.txt'), '456')

    # a dangling symlink raises an error at the end
    with self.assertRaises(Error):
        shutil.copytree(source, target)

    # a dangling symlink is ignored with the proper flag
    target = os.path.join(self.mkdtemp(), 'destination2')
    shutil.copytree(source, target, ignore_dangling_symlinks=True)
    self.assertNotIn('test.txt', os.listdir(target))

    # a dangling symlink is copied if symlinks=True
    target = os.path.join(self.mkdtemp(), 'destination3')
    shutil.copytree(source, target, symlinks=True)
    self.assertIn('test.txt', os.listdir(target))
项目:Polyglot    作者:UniversalDevicesInc    | 项目源码 | 文件源码
def write(self):
    """ Writes configuration file

    Waits up to 5 seconds for ``self._writing`` to clear, then dumps
    ``self.encode()`` as JSON to ``self._filetmp``, restricts that file to
    owner read/write and moves it over ``self._file``. The ``_writing``
    flag is always cleared again, even if one of these steps raises.
    """
    # wait for file to unlock. Timeout after 5 seconds
    for _ in range(5):
        if not self._writing:
            self._writing = True
            break
        time.sleep(1)
    else:
        _LOGGER.error('Could not write to configuration file. It is busy.')
        return

    try:
        # dump JSON to config file and unlock
        encoded = self.encode()
        # Close the temp file explicitly so its content is flushed to disk
        # before the file is moved into place.
        with open(self._filetmp, 'w') as tmp_file:
            json.dump(encoded, tmp_file, sort_keys=True, indent=4,
                      separators=(',', ': '))
        os.chmod(self._filetmp, stat.S_IRUSR | stat.S_IWUSR)
        try:
            shutil.move(self._filetmp, self._file)
            _LOGGER.debug('Config file succesfully updated.')
        except shutil.Error as e:
            _LOGGER.error('Failed to move temp config file to original error: ' + str(e))
    finally:
        # Without this, any exception above would leave the flag set and
        # every later write() would time out as "busy".
        self._writing = False

    _LOGGER.debug('Wrote configuration file')
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_copytree_named_pipe(self):
    # Expects copytree() to raise shutil.Error with a single entry that
    # reports the named pipe found in the source tree.
    os.mkdir(TESTFN)
    try:
        pipe_parent = os.path.join(TESTFN, "subdir")
        os.mkdir(pipe_parent)
        pipe_path = os.path.join(pipe_parent, "mypipe")
        os.mkfifo(pipe_path)
        try:
            shutil.copytree(TESTFN, TESTFN2)
        except shutil.Error as caught:
            problems = caught.args[0]
            self.assertEqual(len(problems), 1)
            message = problems[0][2]
            # Each entry must be a (src, dst, message) triple.
            self.assertEqual(len(problems[0]), 3)
            self.assertEqual("`%s` is a named pipe" % pipe_path, message)
        else:
            self.fail("shutil.Error should have been raised")
    finally:
        shutil.rmtree(TESTFN, ignore_errors=True)
        shutil.rmtree(TESTFN2, ignore_errors=True)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_copytree_named_pipe(self):
    # A FIFO in the source tree must make copytree() raise shutil.Error
    # with exactly one (src, dst, message) entry describing the pipe.
    os.mkdir(TESTFN)
    try:
        fifo_dir = os.path.join(TESTFN, "subdir")
        os.mkdir(fifo_dir)
        fifo = os.path.join(fifo_dir, "mypipe")
        os.mkfifo(fifo)
        try:
            shutil.copytree(TESTFN, TESTFN2)
        except shutil.Error as raised:
            failures = raised.args[0]
            self.assertEqual(len(failures), 1)
            _src, _dst, reason = failures[0]
            self.assertEqual("`%s` is a named pipe" % fifo, reason)
        else:
            self.fail("shutil.Error should have been raised")
    finally:
        for tree in (TESTFN, TESTFN2):
            shutil.rmtree(tree, ignore_errors=True)
项目:glazier    作者:google    | 项目源码 | 文件源码
def CreateDirectories(path):
  """Create directory if the path to a file doesn't exist.

  A path without a directory component (a bare file name) needs no
  directory, so nothing is created for it.

  Args:
    path: The full file path to where a file will be placed.

  Raises:
    Error: Failure creating the requested directory.
  """
  dirname = os.path.dirname(path)
  # An empty dirname would make os.makedirs('') fail, which used to surface
  # as a misleading "Unable to make directory" error.
  if not dirname or os.path.isdir(dirname):
    return
  logging.debug('Creating directory %s ', dirname)
  try:
    os.makedirs(dirname)
  except (shutil.Error, OSError):
    raise Error('Unable to make directory: %s' % dirname)
项目:kalliope    作者:kalliope-project    | 项目源码 | 文件源码
def _rename_temp_folder(name, target_folder, tmp_path):
    """
    Move the temp folder of the cloned repo to <target_folder>/<name>.
    If shutil.move refuses (shutil.Error), a warning is printed and the
    temp folder is deleted instead.
    :return: path to install, None if the move was refused
    """
    logger.debug("[ResourcesManager] Rename temp folder")
    install_path = os.sep.join((target_folder, name))
    try:
        shutil.move(tmp_path, install_path)
    except shutil.Error:
        # the folder already exist
        Utils.print_warning("The module %s already exist in the path %s" % (name, target_folder))
        # remove the cloned repo
        logger.debug("[ResourcesManager] Deleting temp folder %s" % str(tmp_path))
        shutil.rmtree(tmp_path)
        return None
    return install_path
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_dont_copy_file_onto_link_to_itself(self):
    # bug 851123: copying a file onto its own hard link must raise
    # shutil.Error without altering the file.
    # Temporarily disable test on Windows.
    if os.name == 'nt':
        return
    os.mkdir(TESTFN)
    source_file = os.path.join(TESTFN, 'cheese')
    linked_file = os.path.join(TESTFN, 'shop')
    try:
        with open(source_file, 'w') as writer:
            writer.write('cheddar')
        os.link(source_file, linked_file)
        with self.assertRaises(shutil.Error):
            shutil.copyfile(source_file, linked_file)
        with open(source_file, 'r') as reader:
            content = reader.read()
        self.assertEqual(content, 'cheddar')
        os.remove(linked_file)
    finally:
        shutil.rmtree(TESTFN, ignore_errors=True)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_dont_copy_file_onto_symlink_to_itself(self):
    # bug 851123: copying a file onto a symlink that points back at it must
    # raise shutil.Error without altering the file.
    os.mkdir(TESTFN)
    source_file = os.path.join(TESTFN, 'cheese')
    symlink_file = os.path.join(TESTFN, 'shop')
    try:
        with open(source_file, 'w') as writer:
            writer.write('cheddar')
        # The target is resolved relative to the link's directory, so the
        # bare name is used: `source_file` would resolve to
        # TESTFN/TESTFN/cheese rather than TESTFN/cheese.
        os.symlink('cheese', symlink_file)
        with self.assertRaises(shutil.Error):
            shutil.copyfile(source_file, symlink_file)
        with open(source_file, 'r') as reader:
            content = reader.read()
        self.assertEqual(content, 'cheddar')
        os.remove(symlink_file)
    finally:
        shutil.rmtree(TESTFN, ignore_errors=True)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_copytree_named_pipe(self):
    # Expects copytree() to raise shutil.Error carrying exactly one entry
    # whose message names the FIFO inside the source tree.
    os.mkdir(TESTFN)
    try:
        pipe_parent = os.path.join(TESTFN, "subdir")
        os.mkdir(pipe_parent)
        pipe_path = os.path.join(pipe_parent, "mypipe")
        os.mkfifo(pipe_path)
        try:
            shutil.copytree(TESTFN, TESTFN2)
        except shutil.Error as raised:
            failures = raised.args[0]
            self.assertEqual(len(failures), 1)
            _src, _dst, reason = failures[0]
            self.assertEqual("`%s` is a named pipe" % pipe_path, reason)
        else:
            self.fail("shutil.Error should have been raised")
    finally:
        for tree in (TESTFN, TESTFN2):
            shutil.rmtree(tree, ignore_errors=True)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_copytree_dangling_symlinks(self):
    # Source tree: one dangling symlink plus a regular file in a sub-directory.
    source = self.mkdtemp()
    target = os.path.join(self.mkdtemp(), 'destination')
    os.symlink('IDONTEXIST', os.path.join(source, 'test.txt'))
    os.mkdir(os.path.join(source, 'test_dir'))
    write_file((source, 'test_dir', 'test.txt'), '456')

    # a dangling symlink raises an error at the end
    with self.assertRaises(Error):
        shutil.copytree(source, target)

    # a dangling symlink is ignored with the proper flag
    target = os.path.join(self.mkdtemp(), 'destination2')
    shutil.copytree(source, target, ignore_dangling_symlinks=True)
    self.assertNotIn('test.txt', os.listdir(target))

    # a dangling symlink is copied if symlinks=True
    target = os.path.join(self.mkdtemp(), 'destination3')
    shutil.copytree(source, target, symlinks=True)
    self.assertIn('test.txt', os.listdir(target))
项目:Electrify    作者:jyapayne    | 项目源码 | 文件源码
def copy_files_to_project_folder(self):
    """Copy file settings into the project directory.

    For every setting that has ``copy`` set, is of type 'file' and whose
    value is still an absolute path after the project directory is stripped
    from it, the file is copied into ``self.project_dir()`` and the setting
    value is reduced to its base name. The process working directory is
    switched to the project directory for the duration and set back to
    ``CWD`` afterwards, even if an error occurs.
    """
    old_dir = CWD
    os.chdir(self.project_dir())
    try:
        self.logger.info(u'Copying files to {}'.format(self.project_dir()))
        for sgroup in self.settings['setting_groups']:
            for setting in sgroup.values():
                if not (setting.copy and setting.type == 'file' and setting.value):
                    continue
                f_path = setting.value.replace(self.project_dir(), '')
                if not os.path.isabs(f_path):
                    continue
                try:
                    utils.copy(setting.value, self.project_dir())
                    self.logger.info(u'Copying file {} to {}'.format(setting.value, self.project_dir()))
                except shutil.Error as e:  # same file warning
                    self.logger.warning(u'Warning: {}'.format(e))
                finally:
                    setting.value = os.path.basename(setting.value)
    finally:
        # Restore the working directory even when a copy fails unexpectedly,
        # so the rest of the process is not left inside the project folder.
        os.chdir(old_dir)
项目:treadmill    作者:Morgan-Stanley    | 项目源码 | 文件源码
def collect_cgroup(approot, destroot):
    """Get host treadmill cgroups information.

    Copies ``<approot>/cgroup_svc`` and every directory matching
    ``/cgroup/*/treadmill/core`` to the same path below ``destroot``.
    A tree that cannot be copied is skipped with a warning.
    """
    src = "%s/cgroup_svc" % approot
    dest = "%s%s" % (destroot, src)

    try:
        shutil.copytree(src, dest)
    except (shutil.Error, OSError):
        _LOGGER.warning('skip %s => %s', src, dest)

    pattern = '/cgroup/*/treadmill/core'
    for cgrp_core in glob.glob(pattern):
        core_dest = '%s%s' % (destroot, cgrp_core)

        try:
            shutil.copytree(cgrp_core, core_dest)
        except (shutil.Error, OSError):
            # Report the tree that actually failed, not the cgroup_svc pair.
            _LOGGER.warning('skip %s => %s', cgrp_core, core_dest)
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_copytree_named_pipe(self):
    # A FIFO in the source tree must make copytree() raise shutil.Error
    # with exactly one (src, dst, message) entry describing the pipe.
    os.mkdir(TESTFN)
    try:
        fifo_dir = os.path.join(TESTFN, "subdir")
        os.mkdir(fifo_dir)
        fifo = os.path.join(fifo_dir, "mypipe")
        os.mkfifo(fifo)
        try:
            shutil.copytree(TESTFN, TESTFN2)
        except shutil.Error as raised:
            failures = raised.args[0]
            self.assertEqual(len(failures), 1)
            _src, _dst, reason = failures[0]
            self.assertEqual("`%s` is a named pipe" % fifo, reason)
        else:
            self.fail("shutil.Error should have been raised")
    finally:
        for tree in (TESTFN, TESTFN2):
            shutil.rmtree(tree, ignore_errors=True)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_copytree_named_pipe(self):
    # Expects copytree() to raise shutil.Error with a single entry that
    # reports the named pipe found in the source tree.
    os.mkdir(TESTFN)
    try:
        pipe_parent = os.path.join(TESTFN, "subdir")
        os.mkdir(pipe_parent)
        pipe_path = os.path.join(pipe_parent, "mypipe")
        os.mkfifo(pipe_path)
        try:
            shutil.copytree(TESTFN, TESTFN2)
        except shutil.Error as raised:
            failures = raised.args[0]
            self.assertEqual(len(failures), 1)
            _src, _dst, reason = failures[0]
            self.assertEqual("`%s` is a named pipe" % pipe_path, reason)
        else:
            self.fail("shutil.Error should have been raised")
    finally:
        for tree in (TESTFN, TESTFN2):
            shutil.rmtree(tree, ignore_errors=True)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_copytree_dangling_symlinks(self):
    # Source tree: one dangling symlink plus a regular file in a sub-directory.
    tree = self.mkdtemp()
    copy_to = os.path.join(self.mkdtemp(), 'destination')
    os.symlink('IDONTEXIST', os.path.join(tree, 'test.txt'))
    os.mkdir(os.path.join(tree, 'test_dir'))
    write_file((tree, 'test_dir', 'test.txt'), '456')

    # a dangling symlink raises an error at the end
    with self.assertRaises(Error):
        shutil.copytree(tree, copy_to)

    # a dangling symlink is ignored with the proper flag
    copy_to = os.path.join(self.mkdtemp(), 'destination2')
    shutil.copytree(tree, copy_to, ignore_dangling_symlinks=True)
    self.assertNotIn('test.txt', os.listdir(copy_to))

    # a dangling symlink is copied if symlinks=True
    copy_to = os.path.join(self.mkdtemp(), 'destination3')
    shutil.copytree(tree, copy_to, symlinks=True)
    self.assertIn('test.txt', os.listdir(copy_to))
项目:chef_community_cookbooks    作者:DennyZhang    | 项目源码 | 文件源码
def copytree(src, dst, symlinks=False, ignore=None):
    """Copy each entry of ``src`` into ``dst``.

    Regular entries are copied with shutil.copy2; directories are handed to
    shutil.copytree together with ``symlinks`` and ``ignore``. A directory
    that fails to copy is only logged as a warning.
    """
    for entry in os.listdir(src):
        source_path = os.path.join(src, entry)
        target_path = os.path.join(dst, entry)
        if not os.path.isdir(source_path):
            shutil.copy2(source_path, target_path)
            continue
        # TODO: better way to copy while skip symbol links
        try:
            shutil.copytree(source_path, target_path, symlinks, ignore)
        except (shutil.Error, OSError):
            logging.warning('Warning: Some directories not copied under %s.' % (source_path))

################################################################################
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def test_copytree_named_pipe(self):
    # A FIFO in the source tree must make copytree() raise shutil.Error
    # with exactly one (src, dst, message) entry describing the pipe.
    os.mkdir(TESTFN)
    try:
        fifo_dir = os.path.join(TESTFN, "subdir")
        os.mkdir(fifo_dir)
        fifo = os.path.join(fifo_dir, "mypipe")
        os.mkfifo(fifo)
        try:
            shutil.copytree(TESTFN, TESTFN2)
        except shutil.Error as raised:
            failures = raised.args[0]
            self.assertEqual(len(failures), 1)
            _src, _dst, reason = failures[0]
            self.assertEqual("`%s` is a named pipe" % fifo, reason)
        else:
            self.fail("shutil.Error should have been raised")
    finally:
        for tree in (TESTFN, TESTFN2):
            shutil.rmtree(tree, ignore_errors=True)
项目:Slivka    作者:warownia1    | 项目源码 | 文件源码
def copytree(src, dst):
    """
    Recursively copy ``src`` into ``dst``, creating ``dst`` if needed and
    accepting a ``dst`` that already exists (unlike shutil.copytree).
    Copying continues past failures; they are collected and raised together
    at the end.
    :param src: directory to copy from
    :param dst: directory to copy into
    :return: ``dst``
    :raise shutil.Error: with a list of (source, destination, message) entries
    """
    os.makedirs(dst, exist_ok=True)
    failures = []
    for entry in os.listdir(src):
        source = os.path.join(src, entry)
        target = os.path.join(dst, entry)
        # Directories recurse, everything else is a plain file copy.
        copier = copytree if os.path.isdir(source) else shutil.copy
        try:
            copier(source, target)
        except shutil.Error as nested:
            # A nested call already produced a list of failures.
            failures.extend(nested.args[0])
        except OSError as problem:
            failures.append((source, target, str(problem)))
    if failures:
        raise shutil.Error(failures)
    return dst
项目:spamscope    作者:SpamScope    | 项目源码 | 文件源码
def ack(self, tup_id):
    """Acknowledge tup_id, that is the path_mail.

    If the mail file still exists it is deleted when ``self._what`` is
    "remove", otherwise moved to ``self._where`` (and deleted if the move
    raises shutil.Error). The mail is then marked done on ``self._queue``
    and removed from ``self._queue_tail``; a KeyError during that step is
    passed to ``self.raise_exception``.
    """

    if os.path.exists(tup_id):
        if self._what == "remove":
            os.remove(tup_id)
        else:
            try:
                shutil.move(tup_id, self._where)
            except shutil.Error:
                # The move was refused; drop the mail so it is not
                # processed again.
                os.remove(tup_id)

    try:
        # Remove from tail analyzed mail
        self._queue.task_done()
        self._queue_tail.remove(tup_id)
        self.log("Mails to process: {}".format(len(self._queue_tail)))
    except KeyError as e:  # "as" form works on Python 2.6+ and Python 3
        self.raise_exception(e, tup_id)
项目:grade-oven    作者:mikelmcdaniel    | 项目源码 | 文件源码
def from_zip(cls, file_obj, stages_name, stages_root):
    """Unpack zip from file_obj into os.path.join(stages_root, stages_name).

    The target directory is created, the archive is checked with testzip()
    and every member is extracted into it. Returns cls(<target directory>)
    after calling save_main_script() on each of its stages. Raises Error for
    a corrupt member or when zipfile reports a bad or too large archive.
    """
    assignment_root = os.path.join(stages_root, stages_name)
    try:
      os.mkdir(assignment_root)
      with zipfile.ZipFile(file_obj, 'r') as archive:
        corrupt_member = archive.testzip()
        if corrupt_member is not None:
          raise Error('Corrupt file in zip: ' + corrupt_member)
        # TODO: Handle case where zf.namelist() uses a lot of memory
        for member in archive.namelist():
          archive.extract(member, assignment_root)
        # TODO: The stage.save_main_script() code below is used as a workaround
        # to ensure that the main script is executable. Ideally, file
        # permissions would be preserved.
        stages = cls(assignment_root)
        for stage in stages.stages.itervalues():
          stage.save_main_script()
        return stages
    except (zipfile.BadZipfile, zipfile.LargeZipFile) as zip_error:
      raise Error(zip_error)
项目:voiceid    作者:sih4sing5hong5    | 项目源码 | 文件源码
def _set_filename(self, filename):
    """Set the filename of the current working file.

    A sanitized copy of ``filename`` is created: whitespace runs become
    '_' and only alphanumerics plus '.', '_', ':', '-' and the path
    separator are kept. The copy becomes ``self._filename``;
    ``self._basename`` and ``self._ext`` are derived from it.

    Raises:
        shutil.Error: if the copy fails for a reason other than source and
            destination being the same file.
    """
    tmp_file = '_'.join(filename.split())
    pathsep = os.path.sep
    if sys.platform == 'win32':
        pathsep = '/'
    allowed = ('.', '_', ':', pathsep, '-')
    new_file = ''.join(
        char for char in tmp_file if char.isalnum() or char in allowed)
    try:
        shutil.copy(filename, new_file)
    except shutil.Error as err:
        # The name needed no sanitizing, so source and target coincide.
        # Python 2 and 3 quote the paths differently in this message, so
        # only the invariant tail is matched.
        if 'are the same file' not in str(err):
            raise
    utils.ensure_file_exists(new_file)
    self._filename = new_file
    self._basename, self._ext = os.path.splitext(self._filename)
项目:tempy    作者:Dascr32    | 项目源码 | 文件源码
def delete_dir_content(dir_path=analyzer.TEMP_DIR):
    """Delete every entry that analyzer.get_dir_content reports for dir_path.

    Files are removed with os.remove, anything else with shutil.rmtree.
    Failures are appended (as strings) to the module-level ``errors`` list
    and do not stop the loop. Data gathering hooks run before and after.
    """
    global errors
    gather_data_before_delete(dir_path)

    entries = analyzer.get_dir_content(dir_path)

    for item in entries.keys():
        item_path = os.path.join(dir_path, item)

        try:
            if not os.path.isfile(item_path):
                print("Deleting dir:", item)
                shutil.rmtree(item_path)
            else:
                print("Deleting file:", item)
                os.remove(item_path)
        except (OSError, shutil.Error) as failure:
            errors.append(str(failure))
            print("Unable to delete")

    gather_cleanup_data(dir_path)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_copytree_named_pipe(self):
    # Expects copytree() to raise shutil.Error with a single entry that
    # reports the named pipe found in the source tree.
    os.mkdir(TESTFN)
    try:
        pipe_parent = os.path.join(TESTFN, "subdir")
        os.mkdir(pipe_parent)
        pipe_path = os.path.join(pipe_parent, "mypipe")
        os.mkfifo(pipe_path)
        try:
            shutil.copytree(TESTFN, TESTFN2)
        except shutil.Error as raised:
            failures = raised.args[0]
            self.assertEqual(len(failures), 1)
            _src, _dst, reason = failures[0]
            self.assertEqual("`%s` is a named pipe" % pipe_path, reason)
        else:
            self.fail("shutil.Error should have been raised")
    finally:
        for tree in (TESTFN, TESTFN2):
            shutil.rmtree(tree, ignore_errors=True)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_copytree_dangling_symlinks(self):
    # Source tree: one dangling symlink plus a regular file in a sub-directory.
    origin = self.mkdtemp()
    destination = os.path.join(self.mkdtemp(), 'destination')
    os.symlink('IDONTEXIST', os.path.join(origin, 'test.txt'))
    os.mkdir(os.path.join(origin, 'test_dir'))
    write_file((origin, 'test_dir', 'test.txt'), '456')

    # a dangling symlink raises an error at the end
    with self.assertRaises(Error):
        shutil.copytree(origin, destination)

    # a dangling symlink is ignored with the proper flag
    destination = os.path.join(self.mkdtemp(), 'destination2')
    shutil.copytree(origin, destination, ignore_dangling_symlinks=True)
    self.assertNotIn('test.txt', os.listdir(destination))

    # a dangling symlink is copied if symlinks=True
    destination = os.path.join(self.mkdtemp(), 'destination3')
    shutil.copytree(origin, destination, symlinks=True)
    self.assertIn('test.txt', os.listdir(destination))
项目:ansible-provider-docs    作者:alibaba    | 项目源码 | 文件源码
def _check_locale(self):
    '''
    Activate the default locale (per the LANG and LC_CTYPE environment
    settings), falling back to the 'C' locale when it is rejected.
    Any other error is reported through self.fail_json with a traceback.
    '''
    try:
        # an empty locale string selects the user's default settings
        locale.setlocale(locale.LC_ALL, '')
    except locale.Error:
        # the 'C' locale may cause unicode issues, but that is preferable
        # to failing outright because of an unknown locale
        locale.setlocale(locale.LC_ALL, 'C')
        for variable in ('LANG', 'LC_ALL', 'LC_MESSAGES'):
            os.environ[variable] = 'C'
    except Exception as e:
        message = ("An unknown error was encountered while attempting to validate the locale: %s" %
                   to_native(e))
        self.fail_json(msg=message, exception=traceback.format_exc())
项目:ansible-provider-docs    作者:alibaba    | 项目源码 | 文件源码
def _unsafe_writes(self, src, dest):
    """Copy ``src`` over ``dest`` in place, without any atomicity guarantee.

    Failures while opening or copying are reported through
    ``self.fail_json`` together with the formatted traceback.

    :param src: path of the file whose bytes are copied
    :param dest: path of the file that is overwritten
    """
    # sadly there are some situations where we cannot ensure atomicity, but only if
    # the user insists and we get the appropriate error we update the file unsafely
    # Bind both handles up front: if one of the open() calls fails, the
    # cleanup below must not trip over an unbound name and hide the real error.
    out_dest = None
    in_src = None
    try:
        try:
            out_dest = open(dest, 'wb')
            in_src = open(src, 'rb')
            shutil.copyfileobj(in_src, out_dest)
        finally:  # assuring closed files in 2.4 compatible way
            if out_dest:
                out_dest.close()
            if in_src:
                in_src.close()
    except (shutil.Error, OSError, IOError) as e:
        self.fail_json(msg='Could not write data to file (%s) from (%s): %s' % (dest, src, to_native(e)),
                       exception=traceback.format_exc())
项目:solaris-ips    作者:oracle    | 项目源码 | 文件源码
def setlocale(category, loc=None, printer=None):
        """Set the locale via locale.setlocale(), reverting to the C locale
        when the requested one is broken or unavailable.  'printer' is a
        function that takes a string and displays it; when it is 'None'
        (the default) the message goes through emsg."""

        if printer is None:
                printer = emsg

        try:
                locale.setlocale(category, loc)
                # Because of Python bug 813449, getdefaultlocale may fail
                # with a ValueError even if setlocale succeeds. So we call
                # it here to prevent having this error raised if it is
                # called later by other non-pkg(7) code.
                locale.getdefaultlocale()
        except (locale.Error, ValueError):
                # Name the default locale in the message when it can be
                # determined; otherwise leave that part out.
                try:
                        default_desc = " '{0}.{1}'".format(
                            *locale.getdefaultlocale())
                except ValueError:
                        default_desc = ""
                message = ("Unable to set locale{0}; locale package may be broken "
                    "or\nnot installed.  Reverting to C locale.").format(default_desc)
                printer(message)
                locale.setlocale(category, "C")
项目:indy-plenum    作者:hyperledger    | 项目源码 | 文件源码
def moveKeyFilesToCorrectLocations(keys_dir, pkdir, skdir):
    """Sort the key files found in ``keys_dir`` into their directories.

    Files ending in ".key" are moved to ``pkdir`` and files ending in
    ".key_secret" to ``skdir``; other files are left alone. A move that
    raises shutil.Error is skipped silently.
    """
    destinations = ((".key", pkdir), (".key_secret", skdir))
    for key_file in os.listdir(keys_dir):
        for suffix, target_dir in destinations:
            if not key_file.endswith(suffix):
                continue
            try:
                shutil.move(os.path.join(keys_dir, key_file),
                            os.path.join(target_dir, key_file))
            except shutil.Error:
                # best effort: the file stays where it is
                pass
项目:mopidy-auto    作者:gotling    | 项目源码 | 文件源码
def post(self):
    """Move the current album to the destination named by the "to" argument.

    The destination is resolved below the configured auto base path. The
    tracklist is cleared before the move; a shutil.Error is logged. The
    request always ends with a redirect to '/auto'.
    """
    to = self.get_body_argument("to", False)

    if not to:
        logger.error('Destination not supplied, move canceled')
        return self.redirect('/auto')

    destination_uri = os.path.join(self.config['auto']['base_path'], to)
    # Strip the file:// scheme and decode the rest into a filesystem path.
    uri_path = urlparse.urlparse(destination_uri.split('file://')[1]).path
    destination_path = urllib.url2pathname(uri_path)

    logger.info("Moving '{}' to '{}'".format(self.album_path, destination_path))

    self.core.tracklist.clear()
    try:
        shutil.move(self.album_path, destination_path)
        logger.info("Moved album '{}' to '{}'".format(self.album_path, destination_path))
    except shutil.Error:
        logger.error("Could not move album '{}' to '{}'".format(self.album_path, destination_path), exc_info=1)

    return self.redirect('/auto')
项目:enigma2-plugins    作者:opendreambox    | 项目源码 | 文件源码
def copyPhoto(self, photo, target, recursive=True):
        """Copy a photo from the cache to the given destination.

        NOTE: the body visible here is an empty stub that always returns
        None; the contract below is the documented intent only.

        Arguments:
        photo: photo object to copy.
        target: name of the destination file.
        recursive (optional): try to download the picture when it is not
        available yet.

        Returns:
        True if the image was copied successfully,
        False if the image did not exist and a download was started,
        otherwise None.

        Raises:
        shutil.Error if an error occurred while moving the file.
        """
        return None
项目:whitewater-encoder    作者:samiare    | 项目源码 | 文件源码
def _copy_temp_directory(self):
        """Copy the temp directory to the output directory."""

        if os.path.exists(self.paths['output']):

            oldfiles = [f for f in os.listdir(self.paths['output'])]
            for oldfile in oldfiles:
                os.remove(os.path.join(self.paths['output'], oldfile))

            newfiles = [f for f in os.listdir(self.paths['temp'])]
            for newfile in newfiles:
                shutil.copy2(os.path.join(self.paths['temp'], newfile), self.paths['output'])

        else:
            try:
                shutil.copytree(self.paths['temp'], self.paths['output'])
            except shutil.Error as err:
                self.exit(err)

        shutil.rmtree(self.paths['temp'])
项目:Quiver    作者:DeflatedPickle    | 项目源码 | 文件源码
def open_pack(self):
        """Let the user pick a resource pack directory and load it.

        The directory chooser starts at ``self.resourcepack_location``.  A
        directory counts as a pack only if it contains a ``pack.mcmeta``
        file; otherwise an error box is shown and nothing else happens.
        On success the parent's ``directory`` is set, its tree is refreshed
        and this window is destroyed.
        """
        chosen_dir = filedialog.askdirectory(initialdir=self.resourcepack_location)

        if not os.path.isfile(chosen_dir + "/pack.mcmeta"):
            messagebox.showerror("Error", "Could not find 'pack.mcmeta'.")
            return

        self.parent.directory = chosen_dir
        self.parent.cmd.tree_refresh()
        self.destroy()
项目:Quiver    作者:DeflatedPickle    | 项目源码 | 文件源码
def open_zip(self):
        """Let the user pick a zipped resource pack and open it from a temp dir.

        Nothing happens when the file chooser returns nothing.  Otherwise the
        archive is scanned for a top-level ``pack.mcmeta`` entry.  If one is
        found, every member is extracted into a new temporary directory kept
        on the parent as ``d`` while a progress window is updated, and the
        parent's ``name``, ``directory`` and ``directory_real`` are set before
        its tree is refreshed.  If none is found, an error box is shown.
        The progress window is destroyed at the end in both cases.
        """
        pack = filedialog.askopenfile("r", initialdir=self.resourcepack_location)
        found_pack = False

        if not pack:
            return

        amount = functions.zip_files(pack.name)
        progress = dialog.ProgressWindow(self.parent, title="Opening Zip", maximum=amount)

        with zipfile.ZipFile(pack.name, "r") as archive:
            for member in archive.namelist():
                if member == "pack.mcmeta":
                    found_pack = True
                    self.destroy()

            if found_pack:
                self.parent.d = tempfile.TemporaryDirectory()
                for count, member in enumerate(archive.namelist(), 1):
                    archive.extract(member, self.parent.d.name)

                    percent = round(100 * float(count) / float(amount))
                    progress.variable_name.set("Current File: " + member)
                    progress.variable_percent.set("{}% Complete".format(percent))
                    progress.variable_progress.set(progress.variable_progress.get() + 1)

                # Pack name: last path component of the archive, up to its first dot.
                archive_file_name = pack.name.split("/")[-1]
                self.parent.name = archive_file_name.split(".")[0]
                self.parent.directory = self.parent.d.name
                self.parent.directory_real = pack.name
                self.parent.cmd.tree_refresh()
                self.destroy()
            else:
                messagebox.showerror("Error", "Could not find 'pack.mcmeta'.")

            pack.close()
        progress.destroy()
项目:Quiver    作者:DeflatedPickle    | 项目源码 | 文件源码
def install_pack(self):
        """Let the user pick a pack directory and move it into the resource pack folder.

        The chosen directory must contain a ``pack.mcmeta`` file, otherwise
        an error box is shown.  A ``shutil.Error`` from the move is reported
        to the user as the pack already being installed.
        """
        source_dir = filedialog.askdirectory()

        if not os.path.isfile(source_dir + "/pack.mcmeta"):
            messagebox.showerror("Error", "Could not find 'pack.mcmeta'.")
            return

        try:
            shutil.move(source_dir, self.resourcepack_location)
        except shutil.Error:
            messagebox.showerror("Error", "This pack is already installed.")
项目:DevOps    作者:YoLoveLife    | 项目源码 | 文件源码
def put_file(self, in_path, out_path):
        ''' copy a file from one local path to another local path

        Raises AnsibleFileNotFound when in_path does not exist, and
        AnsibleError when shutil reports an error or the copy fails with
        an IOError.
        '''

        super(Connection, self).put_file(in_path, out_path)

        display.vvv(u"PUT {0} TO {1}".format(in_path, out_path), host=self._play_context.remote_addr)

        # The filesystem calls below are given byte paths.
        source = to_bytes(in_path, errors='surrogate_or_strict')
        if not os.path.exists(source):
            raise AnsibleFileNotFound("file or module does not exist: {0}".format(to_native(in_path)))

        target = to_bytes(out_path, errors='surrogate_or_strict')
        try:
            shutil.copyfile(source, target)
        except shutil.Error:
            raise AnsibleError("failed to copy: {0} and {1} are the same".format(to_native(in_path), to_native(out_path)))
        except IOError as e:
            raise AnsibleError("failed to transfer file to {0}: {1}".format(to_native(out_path), to_native(e)))
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_existing_file_inside_dest_dir(self):
        """shutil.move must raise shutil.Error when the name is already taken."""
        # Create an empty file at dst_file so that a file with the same name
        # already exists inside the destination dir.
        open(self.dst_file, "wb").close()

        with self.assertRaises(shutil.Error):
            shutil.move(self.src_file, self.dst_dir)
项目:sorta    作者:loics2    | 项目源码 | 文件源码
def sort(self):
        """Move every entry of the drop folder to its destination.

        The configuration file itself is skipped.  An entry whose
        destination lookup raises ``LookupError``, or whose move raises
        ``Error``, is logged as a warning and left in place; the loop then
        continues with the next entry.
        """
        for entry in os.listdir(self.path):
            if entry == CONFIG_FILE_NAME:
                continue

            source = os.path.join(self.path, entry)
            try:
                target = self._get_destination(entry)
                move(source, target)
            except LookupError as exc:
                logging.warning(str(exc))
            except Error:
                logging.warning("error while moving the element %s", entry)
            else:
                logging.info("%s moved to %s", entry, target)
项目:Solfege    作者:RannyeriDev    | 项目源码 | 文件源码
def show_bug_reports(self, *v):
        """Ask for the submitter's email and open the matching bug search.

        A dialog with Cancel/OK buttons and a text entry is run.  If it is
        closed with OK, a search URL on www.solfege.org is built from the
        mangled email address and opened in the web browser.  A failure to
        start the browser is reported with display_error_message2().

        *v is accepted and ignored (presumably so the method can be used
        directly as a callback -- confirm against the callers).
        """
        m = Gtk.Dialog(_("Question"), self, 0)
        m.add_button(Gtk.STOCK_CANCEL, Gtk.ResponseType.CANCEL)
        m.add_button(Gtk.STOCK_OK, Gtk.ResponseType.OK)
        vbox = Gtk.VBox()
        m.vbox.pack_start(vbox, False, False, 0)
        vbox.set_spacing(18)
        vbox.set_border_width(12)
        l = Gtk.Label(label=_("Please enter the email used when you submitted the bugs:"))
        vbox.pack_start(l, False, False, 0)
        self.g_email = Gtk.Entry()
        m.action_area.get_children()[0].grab_default()
        # Pressing Enter in the entry activates the dialog's default button.
        self.g_email.set_activates_default(True)
        vbox.pack_start(self.g_email, False, False, 0)
        m.show_all()
        ret = m.run()
        # Read the text before the dialog goes away: the entry is a child of
        # the dialog, so its content should not be relied on after destroy().
        email_text = self.g_email.get_text()
        m.destroy()
        if ret != Gtk.ResponseType.OK:
            return
        # The decoded string must be passed as-is; calling it (a stray "()"
        # in the earlier code) raised TypeError on every OK.
        email = email_text.decode("utf-8")
        params = urllib.urlencode({
                'pagename': 'SITS-Incoming/SearchBugs',
                'q': 'SITS-Incoming/"Submitter: %s"' % utils.mangle_email(email),
            })
        try:
            webbrowser.open_new("http://www.solfege.org?%s" % params)
        except Exception as e:
            self.display_error_message2(_("Error opening web browser"), str(e))
项目:Solfege    作者:RannyeriDev    | 项目源码 | 文件源码
def display_docfile(self, fn):
        """
        Open the help file named by fn in the web browser.

        The file is looked up under help/<language>/ below the current
        working directory, first for the user manual language and then
        for "C".  If neither file exists, the "C" path is still the one
        passed to the browser.  A failure to open the browser is reported
        with display_error_message2().
        """
        candidates = [
            os.path.join(os.getcwdu(), u"help", lang, fn)
            for lang in (solfege.app.m_userman_language, "C")
        ]
        # First existing candidate wins; otherwise fall back to the last one.
        filename = next(
            (path for path in candidates if os.path.isfile(path)),
            candidates[-1])
        try:
            webbrowser.open(filename)
        except Exception as e:
            self.display_error_message2(_("Error opening web browser"), str(e))
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_dont_copy_file_onto_link_to_itself(self):
            """copyfile must refuse to copy a file onto a link to itself (bug 851123)."""
            os.mkdir(TESTFN)
            src = os.path.join(TESTFN, 'cheese')
            dst = os.path.join(TESTFN, 'shop')

            def check_copy_refused():
                # The copy has to fail, leave the source content untouched,
                # and then the link is removed again for the next scenario.
                self.assertRaises(shutil.Error, shutil.copyfile, src, dst)
                with open(src, 'r') as f:
                    self.assertEqual(f.read(), 'cheddar')
                os.remove(dst)

            try:
                with open(src, 'w') as f:
                    f.write('cheddar')

                # Scenario 1: dst is a hard link to src.
                os.link(src, dst)
                check_copy_refused()

                # Scenario 2: dst is a symlink to src.
                # Using `src` here would mean we end up with a symlink pointing
                # to TESTFN/TESTFN/cheese, while it should point at
                # TESTFN/cheese.
                os.symlink('cheese', dst)
                check_copy_refused()
            finally:
                try:
                    shutil.rmtree(TESTFN)
                except OSError:
                    pass
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_existing_file_inside_dest_dir(self):
        """Moving into a directory that already holds the same name raises shutil.Error."""
        # Put an empty file at dst_file first, so that a file with the same
        # name inside the destination dir already exists.
        clashing = open(self.dst_file, "wb")
        clashing.close()

        with self.assertRaises(shutil.Error):
            shutil.move(self.src_file, self.dst_dir)