我们从Python开源项目中,提取了以下24个代码示例,用于说明如何使用errno.ENOTEMPTY。
def delete_object(self, bucket, obj): fname = self._get_storage_path(bucket, obj) try: os.unlink(fname) except OSError, e: if e.errno == errno.ENOENT: raise Error(404, 'Not Found') bname = self._get_storage_path(bucket) fname = os.path.dirname(fname) while fname and len(fname) > len(bname): try: os.rmdir(fname) except OSError, e: if e.errno == errno.ENOTEMPTY: break else: raise Error(500, str(e)) fname = os.path.dirname(fname)
def _delete_measures_files_for_metric(self, metric_id, files): for f in files: try: os.unlink(self._build_measure_path(metric_id, f)) except OSError as e: # Another process deleted it in the meantime, no prob' if e.errno != errno.ENOENT: raise try: os.rmdir(self._build_measure_path(metric_id)) except OSError as e: # ENOENT: ok, it has been removed at almost the same time # by another process # ENOTEMPTY: ok, someone pushed measure in the meantime, # we'll delete the measures and directory later # EEXIST: some systems use this instead of ENOTEMPTY if e.errno not in (errno.ENOENT, errno.ENOTEMPTY, errno.EEXIST): raise
def rmdir(self, parent_inode, name, ctx): assert self._initialized assert_or_errno(self.access(parent_inode, WX_OK, ctx), EPERM, 'no wx perm to parent inode') pn = self.forest.inodes.get_by_value(parent_inode) n = self._lookup(pn, name, ctx) assert_or_errno(n, ENOENT, 'no entry in tree') try: assert_or_errno(n.leaf_node.is_dir, ENOENT, 'non-directory target') self._ctx_sticky_mutate_check(ctx, pn, n) assert_or_errno(not n.node.children, ENOTEMPTY) n.leaf_node.root.remove_from_tree(n.leaf_node) pn.change_times() # TBD: Also check the subdir permission? finally: n.deref()
def rmdir(self, parent, name): # parent is a directory if Not(self.is_dir(parent)): return 0, errno.ENOTDIR on = self._mach.create_on([]) ino = self._dirfn(parent, name[0]) assertion(self.is_dir(ino)) if UGT(self._attrs.nlink(ino), 2): return BitVecVal(0, 32), errno.ENOTEMPTY # Wipe data _fn = self._datafn._fn fn = lambda i, b, o: If( And(on, ino == i), BitVecVal(0, 64), _fn(i, b, o)) self._datafn._fn = fn self._attrs.set_nlink(parent, self._attrs.nlink(parent) - 1, guard=on) self._attrs.set_nlink(ino, 1, guard=on) return ino, BitVecVal(0, 32)
def cmd_rmdir(self, cmdict): shelf = self._path2shelf(cmdict['path']) children = self.db.get_directory_shelves(shelf) self.errno = errno.ENOTEMPTY assert not children, 'Directory is not empty' shelf = self.db.delete_shelf(shelf) # handle link counts. put after the delete call, so that if # the delete fails, the link counts remain consistent parent_shelf = self._path2shelf(self._path2list( cmdict['path'])[:-1]) # fingers crossed parent_shelf.link_count -= 1 parent_shelf.matchfields = ('link_count', ) self.db.modify_shelf(parent_shelf, commit=True) return shelf
def rmdir_p(self): """ Like :meth:`rmdir`, but does not raise an exception if the directory is not empty or does not exist. """ try: self.rmdir() except OSError: _, e, _ = sys.exc_info() if e.errno != errno.ENOTEMPTY and e.errno != errno.EEXIST: raise return self
def removedirs_p(self): """ Like :meth:`removedirs`, but does not raise an exception if the directory is not empty or does not exist. """ try: self.removedirs() except OSError: _, e, _ = sys.exc_info() if e.errno != errno.ENOTEMPTY and e.errno != errno.EEXIST: raise return self # --- Modifying operations on files
def rmdir_p(self): """ Like :meth:`rmdir`, but does not raise an exception if the directory is not empty or does not exist. """ try: self.rmdir() except OSError: # pragma: no cover (temporary) _, e, _ = sys.exc_info() if e.errno != errno.ENOTEMPTY and e.errno != errno.ENOENT: raise return self
def RmDirs(dir_name): """Removes dir_name and every subsequently empty directory above it. Unlike os.removedirs and shutil.rmtree, this function doesn't raise an error if the directory does not exist. Args: dir_name: Directory to be removed. """ try: shutil.rmtree(dir_name) except OSError, err: if err.errno != errno.ENOENT: raise try: parent_directory = os.path.dirname(dir_name) while parent_directory: try: os.rmdir(parent_directory) except OSError, err: if err.errno != errno.ENOENT: raise parent_directory = os.path.dirname(parent_directory) except OSError, err: if err.errno not in (errno.EACCES, errno.ENOTEMPTY, errno.EPERM): raise
def delete_bucket(self, bucket): path = self._get_storage_path(bucket) try: os.rmdir(path) except OSError, e: if e.errno == errno.ENOENT: raise Error(404, 'Not Found') elif e.errno == errno.ENOTEMPTY: raise Error(409, 'Confict') else: raise Error(500, str(e))
def post_publish_cleanup(workingdirs): '''clean up empty directories left under --builddir''' for d in workingdirs: if os.path.isdir(d): try: logger.debug("removing build dir %s", d) os.rmdir(d) except OSError as e: if e.errno != errno.ENOTEMPTY: raise logger.debug("Could not remove %s; files still present", d)
def _call_windows_retry(func, args=(), retry_max=5, retry_delay=0.5): """ It's possible to see spurious errors on Windows due to various things keeping a handle to the directory open (explorer, virus scanners, etc) So we try a few times if it fails with a known error. """ retry_count = 0 while True: try: func(*args) except OSError as e: # Error codes are defined in: # http://docs.python.org/2/library/errno.html#module-errno if e.errno not in (errno.EACCES, errno.ENOTEMPTY): raise if retry_count == retry_max: raise retry_count += 1 LOG.info('%s() failed for "%s". Reason: %s (%s). Retrying...', func.__name__, args, e.strerror, e.errno) time.sleep(retry_delay) else: # If no exception has been thrown it should be done break
def smbComDeleteDirectory(connId, smbServer, SMBCommand, recvPacket ): connData = smbServer.getConnectionData(connId) respSMBCommand = smb.SMBCommand(smb.SMB.SMB_COM_DELETE_DIRECTORY) respParameters = '' respData = '' comDeleteDirectoryData= smb.SMBDeleteDirectory_Data(flags = recvPacket['Flags2'], data = SMBCommand['Data']) # Get the Tid associated if connData['ConnectedShares'].has_key(recvPacket['Tid']): errorCode = STATUS_SUCCESS path = connData['ConnectedShares'][recvPacket['Tid']]['path'] fileName = os.path.normpath(decodeSMBString(recvPacket['Flags2'],comDeleteDirectoryData['DirectoryName']).replace('\\','/')) if len(fileName) > 0 and (fileName[0] == '/' or fileName[0] == '\\'): # strip leading '/' fileName = fileName[1:] pathName = os.path.join(path,fileName) if os.path.exists(pathName) is not True: errorCode = STATUS_NO_SUCH_FILE # TODO: More checks here in the future.. Specially when we support # user access else: try: os.rmdir(pathName) except OSError, e: smbServer.log("smbComDeleteDirectory: %s" % e,logging.ERROR) if e.errno == errno.ENOTEMPTY: errorCode = STATUS_DIRECTORY_NOT_EMPTY else: errorCode = STATUS_ACCESS_DENIED else: errorCode = STATUS_SMB_BAD_TID if errorCode > 0: respParameters = '' respData = '' respSMBCommand['Parameters'] = respParameters respSMBCommand['Data'] = respData smbServer.setConnectionData(connId, connData) return [respSMBCommand], None, errorCode
def rmtree(path, selinux=False, exclude=()): """Version of shutil.rmtree that ignores no-such-file-or-directory errors, tries harder if it finds immutable files and supports excluding paths""" if os.path.islink(path): raise OSError("Cannot call rmtree on a symbolic link") try_again = True retries = 0 failed_to_handle = False failed_filename = None if path in exclude: return while try_again: try_again = False try: names = os.listdir(path) for name in names: fullname = os.path.join(path, name) if fullname not in exclude: try: mode = os.lstat(fullname).st_mode except OSError: mode = 0 if stat.S_ISDIR(mode): try: rmtree(fullname, selinux=selinux, exclude=exclude) except OSError as e: if e.errno in (errno.EPERM, errno.EACCES, errno.EBUSY): # we alrady tried handling this on lower level and failed, # there's no point in trying again now failed_to_handle = True raise else: os.remove(fullname) os.rmdir(path) except OSError as e: if failed_to_handle: raise if e.errno == errno.ENOENT: # no such file or directory pass elif exclude and e.errno == errno.ENOTEMPTY: # there's something excluded left pass elif selinux and (e.errno == errno.EPERM or e.errno == errno.EACCES): try_again = True if failed_filename == e.filename: raise failed_filename = e.filename os.system("chattr -R -i %s" % path) elif e.errno == errno.EBUSY: retries += 1 if retries > 1: raise try_again = True getLog().debug("retrying failed tree remove after sleeping a bit") time.sleep(2) else: raise
def rmdir(self, parent, name): assertion(self.is_dir(parent), "rmdir: parent is not a directory") assertion(name[0] != 0, "rmdir: name is null") self._inode.begin_tx() parent_block, parent_bid, off, valid = self.locate_dentry_ino(parent, name) if Not(valid): self._inode.commit_tx() return 0, errno.ENOENT assertion(valid, "rmdir: dentry off not valid") ino = Extract(31, 0, parent_block[off]) if Not(self.is_dir(ino)): self._inode.commit_tx() return 0, errno.ENOTDIR assertion(self.is_dir(ino), "rmdir: ino is not dir") attr = self._inode.get_iattr(ino) if UGT(attr.nlink, 2): self._inode.commit_tx() return BitVecVal(0, 32), errno.ENOTEMPTY attr = self._inode.get_iattr(parent) assertion(UGE(attr.nlink, 2), "rmdir: nlink is not greater than 1: " + str(attr.nlink)) attr.nlink -= 1 self._inode.set_iattr(parent, attr) self.clear_dentry(parent_block, off) self._inode.write(parent_bid, parent_block) attr = self._inode.get_iattr(ino) attr.nlink = 1 self._inode.set_iattr(ino, attr) # append the inode to the orphan list self._orphans.append(Extend(ino, 64)) self._inode.commit_tx() return ino, 0
def remove(self, pkgplan): path = self.get_installed_path(pkgplan.image.get_root()) try: os.rmdir(path) except OSError as e: if e.errno == errno.ENOENT: pass elif e.errno in (errno.EEXIST, errno.ENOTEMPTY): # Cannot remove directory since it's # not empty. pkgplan.salvage(path) elif e.errno == errno.ENOTDIR: # Either the user or another package has changed # this directory into a link or file. Salvage # what's there and drive on. pkgplan.salvage(path) elif e.errno == errno.EBUSY and os.path.ismount(path): # User has replaced directory with mountpoint, # or a package has been poorly implemented. if not self.attrs.get("implicit"): err_txt = _("Unable to remove {0}; it is " "in use as a mountpoint. To " "continue, please unmount the " "filesystem at the target " "location and try again.").format( path) raise apx.ActionExecutionError(self, details=err_txt, error=e, fmri=pkgplan.origin_fmri) elif e.errno == errno.EBUSY: # os.path.ismount() is broken for lofs # filesystems, so give a more generic # error. if not self.attrs.get("implicit"): err_txt = _("Unable to remove {0}; it " "is in use by the system, another " "process, or as a " "mountpoint.").format(path) raise apx.ActionExecutionError(self, details=err_txt, error=e, fmri=pkgplan.origin_fmri) elif e.errno != errno.EACCES: # this happens on Windows raise
def remove_fsobj(self, pkgplan, path): """Shared logic for removing file and link objects.""" # Necessary since removal logic is reused by install. fmri = pkgplan.destination_fmri if not fmri: fmri = pkgplan.origin_fmri try: portable.remove(path) except EnvironmentError as e: if e.errno == errno.ENOENT: # Already gone; don't care. return elif e.errno == errno.EBUSY and os.path.ismount(path): # User has replaced item with mountpoint, or a # package has been poorly implemented. err_txt = _("Unable to remove {0}; it is in use " "as a mountpoint. To continue, please " "unmount the filesystem at the target " "location and try again.").format(path) raise apx.ActionExecutionError(self, details=err_txt, error=e, fmri=fmri) elif e.errno == errno.EBUSY: # os.path.ismount() is broken for lofs # filesystems, so give a more generic # error. err_txt = _("Unable to remove {0}; it is in " "use by the system, another process, or " "as a mountpoint.").format(path) raise apx.ActionExecutionError(self, details=err_txt, error=e, fmri=fmri) elif e.errno == errno.EPERM and \ not stat.S_ISDIR(os.lstat(path).st_mode): # Was expecting a directory in this failure # case, it is not, so raise the error. raise elif e.errno in (errno.EACCES, errno.EROFS): # Raise these permissions exceptions as-is. raise elif e.errno != errno.EPERM: # An unexpected error. raise apx.ActionExecutionError(self, error=e, fmri=fmri) # Attempting to remove a directory as performed above # gives EPERM. First, try to remove the directory, # if it isn't empty, salvage it. try: os.rmdir(path) except OSError as e: if e.errno in (errno.EPERM, errno.EACCES): # Raise permissions exceptions as-is. raise elif e.errno not in (errno.EEXIST, errno.ENOTEMPTY): # An unexpected error. raise apx.ActionExecutionError(self, error=e, fmri=fmri) pkgplan.salvage(path)
def main(): options, days_old = _parse_args() if not os.path.exists(BASE): print >> sys.stderr, "error: '%s' doesn't exist. Make sure you're"\ " running this on the dom0." % BASE sys.exit(1) lockpaths_removed = 0 nspaths_removed = 0 for nsname in os.listdir(BASE)[:options.limit]: nspath = os.path.join(BASE, nsname) if not os.path.isdir(nspath): continue # Remove old lockfiles removed = 0 locknames = os.listdir(nspath) for lockname in locknames: lockpath = os.path.join(nspath, lockname) lock_age_days = _get_age_days(os.path.getmtime(lockpath)) if lock_age_days > days_old: lockpaths_removed += 1 removed += 1 if options.verbose: print 'Removing old lock: %03d %s' % (lock_age_days, lockpath) if not options.dry_run: os.unlink(lockpath) # Remove empty namespace paths if len(locknames) == removed: nspaths_removed += 1 if options.verbose: print 'Removing empty namespace: %s' % nspath if not options.dry_run: try: os.rmdir(nspath) except OSError, e: if e.errno == errno.ENOTEMPTY: print >> sys.stderr, "warning: directory '%s'"\ " not empty" % nspath else: raise if options.dry_run: print "** Dry Run **" print "Total locks removed: ", lockpaths_removed print "Total namespaces removed: ", nspaths_removed