我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用errno.ENOTDIR。
def safe_listdir(path): """ Attempt to list contents of path, but suppress some exceptions. """ try: return os.listdir(path) except (PermissionError, NotADirectoryError): pass except OSError as e: # Ignore the directory if does not exist, not a directory or # permission denied ignorable = ( e.errno in (errno.ENOTDIR, errno.EACCES, errno.ENOENT) # Python 2 on Windows needs to be handled this way :( or getattr(e, "winerror", None) == 267 ) if not ignorable: raise return ()
def _get_candidates(self, link, package_name): can_not_cache = ( not self.cache_dir or not package_name or not link ) if can_not_cache: return [] canonical_name = canonicalize_name(package_name) formats = index.fmt_ctl_formats( self.format_control, canonical_name ) if not self.allowed_formats.intersection(formats): return [] root = self.get_path_for_link(link) try: return os.listdir(root) except OSError as err: if err.errno in {errno.ENOENT, errno.ENOTDIR}: return [] raise
def install_other(subdir): cwd = os.getcwd() path = os.path.join(cwd, subdir) try: os.chdir(path) except OSError, error: if error.errno not in (errno.ENOENT, errno.ENOTDIR): raise print >> sys.stderr, "Could not find directory %r" % path return try: module_info = imp.find_module("setup", ["."]) imp.load_module("setup", *module_info) finally: os.chdir(cwd)
def copy(src, dst): """ Handle the copying of a file or directory. The destination basedir _must_ exist. :param src: A string containing the path of the source to copy. If the source ends with a '/', will become a recursive directory copy of source. :param dst: A string containing the path to the destination. If the destination ends with a '/', will copy into the target directory. :return: None """ try: shutil.copytree(src, dst) except OSError as exc: if exc.errno == errno.ENOTDIR: shutil.copy(src, dst) else: raise
def process_item(self, source, destination): ''' Backup a single item ''' try: copytree(source, destination, ignore=ignore_patterns(*self.ignored_extensions)) except OSError as e: if e.errno == ENOTDIR: try: copy(source, destination) except: log.update_log("Error processing file <%s>: <%s>" % (source, e)) else: self.log.update_log("Source <%s> could not be copied to <%s>: <%s>" % (source, destination, e)) except BaseException as e: self.log.update_log("Unkown error copying <%s>: <%s>" % (source, e))
def set_directory(self, directory): """Set the directory where the downloaded file will be placed. May raise OSError if the supplied directory path is not suitable. """ if not path.exists(directory): raise OSError(errno.ENOENT, "You see no directory there.", directory) if not path.isdir(directory): raise OSError(errno.ENOTDIR, "You cannot put a file into " "something which is not a directory.", directory) if not os.access(directory, os.X_OK | os.W_OK): raise OSError(errno.EACCES, "This directory is too hard to write in to.", directory) self.destDir = directory
def watch(self, path): ''' Register a watch for the file/directory named path. Raises an OSError if path does not exist. ''' path = realpath(path) with self.lock: if path not in self.watches: bpath = path if isinstance(path, bytes) else path.encode(self.fenc) flags = self.MOVE_SELF | self.DELETE_SELF buf = ctypes.c_char_p(bpath) # Try watching path as a directory wd = self._add_watch(self._inotify_fd, buf, flags | self.ONLYDIR) if wd == -1: eno = ctypes.get_errno() if eno != errno.ENOTDIR: self.handle_error() # Try watching path as a file flags |= (self.MODIFY | self.ATTRIB) wd = self._add_watch(self._inotify_fd, buf, flags) if wd == -1: self.handle_error() self.watches[path] = wd self.modified[path] = False
def add_watch(self, path): bpath = path if isinstance(path, bytes) else path.encode(self.fenc) wd = self._add_watch( self._inotify_fd, ctypes.c_char_p(bpath), # Ignore symlinks and watch only directories self.DONT_FOLLOW | self.ONLYDIR | self.MODIFY | self.CREATE | self.DELETE | self.MOVE_SELF | self.MOVED_FROM | self.MOVED_TO | self.ATTRIB | self.DELETE_SELF ) if wd == -1: eno = ctypes.get_errno() if eno == errno.ENOTDIR: return False raise OSError(eno, 'Failed to add watch for: {0}: {1}'.format(path, self.os.strerror(eno))) self.watched_dirs[path] = wd self.watched_rmap[wd] = path return True
def convert_errno(e): """ Convert an errno value (as from an C{OSError} or C{IOError}) into a standard SFTP result code. This is a convenience function for trapping exceptions in server code and returning an appropriate result. @param e: an errno code, as from C{OSError.errno}. @type e: int @return: an SFTP error code like L{SFTP_NO_SUCH_FILE}. @rtype: int """ if e == errno.EACCES: # permission denied return SFTP_PERMISSION_DENIED elif (e == errno.ENOENT) or (e == errno.ENOTDIR): # no such file return SFTP_NO_SUCH_FILE else: return SFTP_FAILURE
def chdir(self, path): """ Change the "current directory" of this SFTP session. Since SFTP doesn't really have the concept of a current working directory, this is emulated by paramiko. Once you use this method to set a working directory, all operations on this SFTPClient object will be relative to that path. You can pass in C{None} to stop using a current working directory. @param path: new current working directory @type path: str @raise IOError: if the requested path doesn't exist on the server @since: 1.4 """ if path is None: self._cwd = None return if not stat.S_ISDIR(self.stat(path).st_mode): raise SFTPError(errno.ENOTDIR, "%s: %s" % (os.strerror(errno.ENOTDIR), path)) self._cwd = self.normalize(path).encode('utf-8')
def register(self,name,URI): origname,name=name,self.validateName(name) URI=self.validateURI(URI) fn=self.translate(name) self.lock.acquire() try: if os.access(fn,os.R_OK): Log.msg('NameServer','name already exists:',name) raise Pyro.errors.NamingError('name already exists',name) try: open(fn,'w').write(str(URI)+'\n') self._dosynccall("register",origname,URI) Log.msg('NameServer','registered',name,'with URI',str(URI)) except IOError,x: if x.errno==errno.ENOENT: raise Pyro.errors.NamingError('(parent)group not found') elif x.errno==errno.ENOTDIR: raise Pyro.errors.NamingError('parent is no group') else: raise Pyro.errors.NamingError(str(x)) finally: self.lock.release()
def deleteGroup(self,groupname): groupname=self.validateName(groupname) if groupname==':': Log.msg('NameServer','attempt to deleteGroup root group') raise Pyro.errors.NamingError('not allowed to delete root group') dirnam = self.translate(groupname) self.lock.acquire() try: if not os.access(dirnam,os.R_OK): raise Pyro.errors.NamingError('group not found',groupname) try: shutil.rmtree(dirnam) self._dosynccall("deleteGroup",groupname) Log.msg('NameServer','deleted group',groupname) except OSError,x: if x.errno==errno.ENOENT: raise Pyro.errors.NamingError('group not found',groupname) elif x.errno==errno.ENOTDIR: raise Pyro.errors.NamingError('is no group',groupname) else: raise Pyro.errors.NamingError(str(x)) finally: self.lock.release()
def convert_errno(e): """ Convert an errno value (as from an ``OSError`` or ``IOError``) into a standard SFTP result code. This is a convenience function for trapping exceptions in server code and returning an appropriate result. :param int e: an errno code, as from ``OSError.errno``. :return: an `int` SFTP error code like ``SFTP_NO_SUCH_FILE``. """ if e == errno.EACCES: # permission denied return SFTP_PERMISSION_DENIED elif (e == errno.ENOENT) or (e == errno.ENOTDIR): # no such file return SFTP_NO_SUCH_FILE else: return SFTP_FAILURE
def chdir(self, path=None): """ Change the "current directory" of this SFTP session. Since SFTP doesn't really have the concept of a current working directory, this is emulated by Paramiko. Once you use this method to set a working directory, all operations on this `.SFTPClient` object will be relative to that path. You can pass in ``None`` to stop using a current working directory. :param str path: new current working directory :raises IOError: if the requested path doesn't exist on the server .. versionadded:: 1.4 """ if path is None: self._cwd = None return if not stat.S_ISDIR(self.stat(path).st_mode): raise SFTPError(errno.ENOTDIR, "%s: %s" % (os.strerror(errno.ENOTDIR), path)) self._cwd = b(self.normalize(path))
def make_dest_dir(dest): """Make directories if they do not already exist. Raises: OSError: An error occurred while creating the directories. A specific exception is if a directory that is being created already exists as a file. """ dest = os.path.abspath(dest) try: os.makedirs(dest) except OSError as exc: if exc.errno == errno.ENOTDIR: raise OSError(errno.ENOTDIR, 'a parent directory of \'%s\' already exists as a file' % dest) elif exc.errno != errno.EEXIST or os.path.isfile(dest): raise
def check_path(path, ptype=None, access=None): """Checks that a path exists, is of the specified type, and allows the specified access. Args: ptype: 'f' for file or 'd' for directory. access (int): One of the access values from :module:`os` Raises: IOError if the path does not exist, is not of the specified type, or doesn't allow the specified access. """ if ( ptype == 'f' and not path.startswith("/dev/") and not os.path.isfile(path)): raise IOError(errno.EISDIR, "{} is not a file".format(path), path) elif ptype == 'd' and not os.path.isdir(path): raise IOError(errno.ENOTDIR, "{} is not a directory".format(path), path) elif not os.path.exists(path): raise IOError(errno.ENOENT, "{} does not exist".format(path), path) if access is not None and not os.access(path, access): raise IOError(errno.EACCES, "{} is not accessable".format(path), path) return path
def returns_sftp_error(func): def wrapped(*args, **kwargs): try: return func(*args, **kwargs) except OSError as err: LOG.debug("Error calling %s(%s, %s): %s", func, args, kwargs, err, exc_info=True) errno = err.errno if errno in {EACCES, EDQUOT, EPERM, EROFS}: return paramiko.SFTP_PERMISSION_DENIED if errno in {ENOENT, ENOTDIR}: return paramiko.SFTP_NO_SUCH_FILE return paramiko.SFTP_FAILURE except Exception as err: LOG.debug("Error calling %s(%s, %s): %s", func, args, kwargs, err, exc_info=True) return paramiko.SFTP_FAILURE return wrapped
def cached_wheel(cache_dir, link, format_control, package_name): if not cache_dir: return link if not link: return link if link.is_wheel: return link if not link.is_artifact: return link if not package_name: return link canonical_name = canonicalize_name(package_name) formats = pip.index.fmt_ctl_formats(format_control, canonical_name) if "binary" not in formats: return link root = _cache_for_link(cache_dir, link) try: wheel_names = os.listdir(root) except OSError as e: if e.errno in (errno.ENOENT, errno.ENOTDIR): return link raise candidates = [] for wheel_name in wheel_names: try: wheel = Wheel(wheel_name) except InvalidWheelFilename: continue if not wheel.supported(): # Built for a different python/arch/etc continue candidates.append((wheel.support_index_min(), wheel_name)) if not candidates: return link candidates.sort() path = os.path.join(root, candidates[0][1]) return pip.index.Link(path_to_url(path))
def _execvpe(file, args, env=None): if env is not None: exec_func = execve argrest = (args, env) else: exec_func = execv argrest = (args,) env = environ head, tail = path.split(file) if head: exec_func(file, *argrest) return last_exc = saved_exc = None saved_tb = None path_list = get_exec_path(env) if name != 'nt': file = fsencode(file) path_list = map(fsencode, path_list) for dir in path_list: fullname = path.join(dir, file) try: exec_func(fullname, *argrest) except OSError as e: last_exc = e tb = sys.exc_info()[2] if (e.errno != errno.ENOENT and e.errno != errno.ENOTDIR and saved_exc is None): saved_exc = e saved_tb = tb if saved_exc: raise saved_exc.with_traceback(saved_tb) raise last_exc.with_traceback(tb)
def _execvpe(file, args, env=None): if env is not None: func = execve argrest = (args, env) else: func = execv argrest = (args,) env = environ head, tail = path.split(file) if head: func(file, *argrest) return if 'PATH' in env: envpath = env['PATH'] else: envpath = defpath PATH = envpath.split(pathsep) saved_exc = None saved_tb = None for dir in PATH: fullname = path.join(dir, file) try: func(fullname, *argrest) except error, e: tb = sys.exc_info()[2] if (e.errno != errno.ENOENT and e.errno != errno.ENOTDIR and saved_exc is None): saved_exc = e saved_tb = tb if saved_exc: raise error, saved_exc, saved_tb raise error, e, tb # Change environ to automatically call putenv() if it exists
def copyDirectoy(src, dst, ignorePattern=None): try: if ignorePattern: shutil.copytree(src, dst, ignore=shutil.ignore_patterns(*ignorePattern)) return shutil.copytree(src, dst) except OSError as exc: if exc.errno == errno.ENOTDIR: shutil.copy(src, dst) else: logger.error("Failed to copy directory {} to destination: {}".format(src, dst), exc_info=True) raise
def copy_content(src, dest): if os.path.exists(src): try: if os.path.exists(dest): shutil.rmtree(dest) shutil.copytree(src, dest) except OSError as exc: try: if exc.errno == errno.ENOTDIR: shutil.copy(src, dest) else: raise except OSError as e: print("Couldn't copy: %s" % str(e)) else: print("Source path doesn't exist: %s" % src)
def errnoToFailure(e, path): if e == errno.ENOENT or e == errno.EISDIR: return defer.fail(FileNotFoundError(path)) elif e == errno.EACCES or e == errno.EPERM: return defer.fail(PermissionDeniedError(path)) elif e == errno.ENOTDIR: return defer.fail(IsNotADirectoryError(path)) else: return defer.fail() # Generic VFS exceptions
def copyfolder(src, dst): try: shutil.copytree(src, dst) except FileExistsError as exc: lecho('Warning', exc, red) sys.exit(1) except OSError as exc: if exc.errno == errno.ENOTDIR: shutil.copy(src, dst) else: raise
def add_watches(self, base, top_level=True): ''' Add watches for this directory and all its descendant directories, recursively. ''' base = realpath(base) # There may exist a link which leads to an endless # add_watches loop or to maximum recursion depth exceeded if not top_level and base in self.watched_dirs: return try: is_dir = self.add_watch(base) except OSError as e: if e.errno == errno.ENOENT: # The entry could have been deleted between listdir() and # add_watch(). if top_level: raise NoSuchDir('The dir {0} does not exist'.format(base)) return if e.errno == errno.EACCES: # We silently ignore entries for which we dont have permission, # unless they are the top level dir if top_level: raise NoSuchDir('You do not have permission to monitor {0}'.format(base)) return raise else: if is_dir: try: files = os.listdir(base) except OSError as e: if e.errno in (errno.ENOTDIR, errno.ENOENT): # The dir was deleted/replaced between the add_watch() # and listdir() if top_level: raise NoSuchDir('The dir {0} does not exist'.format(base)) return raise for x in files: self.add_watches(os.path.join(base, x), top_level=False) elif top_level: # The top level dir is a file, not good. raise NoSuchDir('The dir {0} does not exist'.format(base))