我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用errno.ENOSYS。
def rlimit(self, resource, limits=None):
    """Get or set a resource limit of this process via prlimit().

    With ``limits`` left as None the current limit for ``resource`` is
    returned. Otherwise ``limits`` must be a ``(soft, hard)`` pair which
    is applied to the process, and None is returned.

    Raises ValueError for PID 0 or for a ``limits`` value that does not
    hold exactly two items, and ZombieProcess when the call fails with
    ENOSYS while the PID still exists. Any other OSError propagates.
    """
    # prlimit() interprets PID 0 as "the calling process", which must
    # never be targeted by accident. PID 0 is not supported on Linux, so
    # this is purely a safety net.
    if self.pid == 0:
        raise ValueError("can't use prlimit() against PID 0 process")

    querying = limits is None
    try:
        if querying:
            return cext.linux_prlimit(self.pid, resource)
        if len(limits) != 2:
            raise ValueError(
                "second argument must be a (soft, hard) tuple, "
                "got %s" % repr(limits))
        soft, hard = limits
        cext.linux_prlimit(self.pid, resource, soft, hard)
    except OSError as err:
        # ENOSYS for a PID that still exists was observed on Travis:
        # https://travis-ci.org/giampaolo/psutil/jobs/51368273
        looks_like_zombie = (
            err.errno == errno.ENOSYS and pid_exists(self.pid))
        if not looks_like_zombie:
            raise
        raise ZombieProcess(self.pid, self._name, self._ppid)
def test_select_module_defines_does_not_implement_poll(self):
    """A platform may advertise ``poll`` without really implementing it.

    When ``select.poll`` objects fail with ENOSYS, the selector that gets
    built must be a ``SelectSelector``.
    """
    # Stand-in for a `poll` object whose poll() is not implemented.
    class BadPoll(object):
        def poll(self, timeout):
            raise OSError(errno.ENOSYS)

    # Forget any cached default so selection runs as if for the first time.
    selectors2._DEFAULT_SELECTOR = None

    # Remove all selectors except `select.select` and replace `select.poll`.
    patch_select_module(self, 'select', poll=BadPoll)

    chosen = self.make_selector()
    self.assertIsInstance(chosen, selectors2.SelectSelector)
def test_rmlinkError(self):
    """
    An exception raised by L{rmlink} other than C{ENOENT} is passed up
    to the caller of L{FilesystemLock.lock}.
    """
    # PID written into the lock link; the fake kill reports it as gone.
    stalePid = 43125

    def failingRmlink(name):
        raise OSError(errno.ENOSYS, None)

    def probeKill(pid, signal):
        if signal != 0:
            raise OSError(errno.EPERM, None)
        if pid == stalePid:
            raise OSError(errno.ESRCH, None)

    self.patch(lockfile, 'rmlink', failingRmlink)
    self.patch(lockfile, 'kill', probeKill)

    lockPath = self.mktemp()
    # Make it appear locked so it has to use readlink
    lockfile.symlink(str(stalePid), lockPath)

    lock = lockfile.FilesystemLock(lockPath)
    raised = self.assertRaises(OSError, lock.lock)
    self.assertEqual(raised.errno, errno.ENOSYS)
    self.assertFalse(lock.locked)
def _lock(self):
    """Lock the entire multistore.

    Acquires ``self._thread_lock`` and then opens and locks the backing
    file. An IOError whose errno is ENOSYS, ENOLCK, EDEADLK or EACCES is
    only logged; any other IOError propagates. If the file did not end up
    locked the store is marked read-only. A zero-length file is
    initialised with empty data, otherwise the cached data is refreshed
    when the store is writable or nothing has been cached yet.

    On success the thread lock is still held when this returns. If
    anything raises, the thread lock is released before the exception
    propagates so that a failed lock attempt cannot block every later
    caller. The state of the file lock is not touched on that path.
    """
    # NOTE(review): `logger.warn` is a deprecated alias of `warning` if
    # `logger` is a stdlib logging.Logger - confirm before switching.
    self._thread_lock.acquire()
    try:
        try:
            self._file.open_and_lock()
        except IOError as e:
            if e.errno == errno.ENOSYS:
                logger.warn('File system does not support locking the '
                            'credentials file.')
            elif e.errno == errno.ENOLCK:
                logger.warn('File system is out of resources for writing the '
                            'credentials file (is your disk full?).')
            elif e.errno == errno.EDEADLK:
                logger.warn('Lock contention on multistore file, opening '
                            'in read-only mode.')
            elif e.errno == errno.EACCES:
                logger.warn('Cannot access credentials file.')
            else:
                raise
        if not self._file.is_locked():
            self._read_only = True
            if self._warn_on_readonly:
                logger.warn('The credentials file (%s) is not writable. '
                            'Opening in read-only mode. Any refreshed '
                            'credentials will only be '
                            'valid for this run.', self._file.filename())
        if os.path.getsize(self._file.filename()) == 0:
            logger.debug('Initializing empty multistore file')
            # The multistore is empty so write out an empty file.
            self._data = {}
            self._write()
        elif not self._read_only or self._data is None:
            # Only refresh the data if we are read/write or we haven't
            # cached the data yet. If we are readonly, we assume it isn't
            # changing out from under us and that we only have to read it
            # once. This prevents us from whacking any new access keys that
            # we have cached in memory but were unable to write out.
            self._refresh_data_cache()
    except BaseException:
        # Do not leave the thread lock held after a failed attempt.
        self._thread_lock.release()
        raise
def test_select_module_defines_does_not_implement_epoll(self):
    """A platform may advertise ``epoll`` without really implementing it.

    When calling ``select.epoll`` fails with ENOSYS, the selector that
    gets built must be a ``SelectSelector``.
    """
    # Stand-in for an `epoll` factory that is not implemented.
    def bad_epoll(*args, **kwargs):
        raise OSError(errno.ENOSYS)

    # Forget any cached default so selection runs as if for the first time.
    selectors2._DEFAULT_SELECTOR = None

    # Remove all selectors except `select.select` and replace `select.epoll`.
    patch_select_module(self, 'select', epoll=bad_epoll)

    chosen = self.make_selector()
    self.assertIsInstance(chosen, selectors2.SelectSelector)
def _lock(self):
    """Lock the entire multistore.

    Acquires ``self._thread_lock`` and then opens and locks the backing
    file. An IOError or OSError whose errno is ENOSYS, ENOLCK, EDEADLK or
    EACCES is only logged; any other one propagates. If the file did not
    end up locked the store is marked read-only. A zero-length file is
    initialised with empty data, otherwise the cached data is refreshed
    when the store is writable or nothing has been cached yet.

    On success the thread lock is still held when this returns. If
    anything raises, the thread lock is released before the exception
    propagates so that a failed lock attempt cannot block every later
    caller. The state of the file lock is not touched on that path.
    """
    # NOTE(review): `logger.warn` is a deprecated alias of `warning` if
    # `logger` is a stdlib logging.Logger - confirm before switching.
    self._thread_lock.acquire()
    try:
        try:
            self._file.open_and_lock()
        except (IOError, OSError) as e:
            if e.errno == errno.ENOSYS:
                logger.warn('File system does not support locking the '
                            'credentials file.')
            elif e.errno == errno.ENOLCK:
                logger.warn('File system is out of resources for writing the '
                            'credentials file (is your disk full?).')
            elif e.errno == errno.EDEADLK:
                logger.warn('Lock contention on multistore file, opening '
                            'in read-only mode.')
            elif e.errno == errno.EACCES:
                logger.warn('Cannot access credentials file.')
            else:
                raise
        if not self._file.is_locked():
            self._read_only = True
            if self._warn_on_readonly:
                logger.warn('The credentials file (%s) is not writable. '
                            'Opening in read-only mode. Any refreshed '
                            'credentials will only be '
                            'valid for this run.', self._file.filename())
        if os.path.getsize(self._file.filename()) == 0:
            logger.debug('Initializing empty multistore file')
            # The multistore is empty so write out an empty file.
            self._data = {}
            self._write()
        elif not self._read_only or self._data is None:
            # Only refresh the data if we are read/write or we haven't
            # cached the data yet. If we are readonly, we assume it isn't
            # changing out from under us and that we only have to read it
            # once. This prevents us from whacking any new access keys that
            # we have cached in memory but were unable to write out.
            self._refresh_data_cache()
    except BaseException:
        # Do not leave the thread lock held after a failed attempt.
        self._thread_lock.release()
        raise
def _lock(self):
    """Lock the entire multistore.

    Acquires ``self._thread_lock`` and then opens and locks the backing
    file. An IOError whose errno is ENOSYS or ENOLCK is only logged; any
    other IOError propagates. If the file did not end up locked the store
    is marked read-only. A zero-length file is initialised with empty
    data, otherwise the cached data is refreshed when the store is
    writable or nothing has been cached yet.

    On success the thread lock is still held when this returns. If
    anything raises, the thread lock is released before the exception
    propagates so that a failed lock attempt cannot block every later
    caller. The state of the file lock is not touched on that path.
    """
    # NOTE(review): `logger.warn` is a deprecated alias of `warning` if
    # `logger` is a stdlib logging.Logger - confirm before switching.
    self._thread_lock.acquire()
    try:
        try:
            self._file.open_and_lock()
        except IOError as e:
            if e.errno == errno.ENOSYS:
                logger.warn('File system does not support locking the '
                            'credentials file.')
            elif e.errno == errno.ENOLCK:
                logger.warn('File system is out of resources for writing the '
                            'credentials file (is your disk full?).')
            else:
                raise
        if not self._file.is_locked():
            self._read_only = True
            if self._warn_on_readonly:
                logger.warn('The credentials file (%s) is not writable. '
                            'Opening in read-only mode. Any refreshed '
                            'credentials will only be '
                            'valid for this run.', self._file.filename())
        if os.path.getsize(self._file.filename()) == 0:
            logger.debug('Initializing empty multistore file')
            # The multistore is empty so write out an empty file.
            self._data = {}
            self._write()
        elif not self._read_only or self._data is None:
            # Only refresh the data if we are read/write or we haven't
            # cached the data yet. If we are readonly, we assume it isn't
            # changing out from under us and that we only have to read it
            # once. This prevents us from whacking any new access keys that
            # we have cached in memory but were unable to write out.
            self._refresh_data_cache()
    except BaseException:
        # Do not leave the thread lock held after a failed attempt.
        self._thread_lock.release()
        raise
def test_statvfs_result_pickle(self): try: result = os.statvfs(self.fname) except OSError as e: # On AtheOS, glibc always returns ENOSYS if e.errno == errno.ENOSYS: self.skipTest('os.statvfs() failed with ENOSYS') for proto in range(pickle.HIGHEST_PROTOCOL + 1): p = pickle.dumps(result, proto) self.assertIn(b'statvfs_result', p) if proto < 4: self.assertIn(b'cos\nstatvfs_result\n', p) unpickled = pickle.loads(p) self.assertEqual(result, unpickled)
def os_error_not_implemented(*unused_args, **unused_kwargs):
    """Raise OSError(ENOSYS, 'Function not implemented').

    All positional and keyword arguments are accepted and ignored.
    """
    not_implemented = OSError(errno.ENOSYS, 'Function not implemented')
    raise not_implemented
def test_os_error_not_implemented(self):
    """stubs.os_error_not_implemented() raises OSError(ENOSYS)."""
    with self.assertRaises(OSError) as caught:
        stubs.os_error_not_implemented()
    self.mox.VerifyAll()

    error = caught.exception
    self.assertEqual(errno.ENOSYS, error.errno)
    self.assertEqual('Function not implemented', error.strerror)
    self.assertIsNone(error.filename)
def _lock(self):
    """Lock the entire multistore.

    Acquires ``self._thread_lock`` and then opens and locks the backing
    file. An IOError whose errno is ENOSYS or ENOLCK is only logged; any
    other IOError propagates. If the file did not end up locked the store
    is marked read-only. A zero-length file is initialised with empty
    data, otherwise the cached data is refreshed when the store is
    writable or nothing has been cached yet.

    On success the thread lock is still held when this returns. If
    anything raises, the thread lock is released before the exception
    propagates so that a failed lock attempt cannot block every later
    caller. The state of the file lock is not touched on that path.
    """
    # NOTE(review): `logger.warn` is a deprecated alias of `warning` if
    # `logger` is a stdlib logging.Logger - confirm before switching.
    self._thread_lock.acquire()
    try:
        try:
            self._file.open_and_lock()
        except IOError as e:
            if e.errno == errno.ENOSYS:
                logger.warn('File system does not support locking the credentials '
                            'file.')
            elif e.errno == errno.ENOLCK:
                logger.warn('File system is out of resources for writing the '
                            'credentials file (is your disk full?).')
            else:
                raise
        if not self._file.is_locked():
            self._read_only = True
            if self._warn_on_readonly:
                logger.warn('The credentials file (%s) is not writable. Opening in '
                            'read-only mode. Any refreshed credentials will only be '
                            'valid for this run.', self._file.filename())
        if os.path.getsize(self._file.filename()) == 0:
            logger.debug('Initializing empty multistore file')
            # The multistore is empty so write out an empty file.
            self._data = {}
            self._write()
        elif not self._read_only or self._data is None:
            # Only refresh the data if we are read/write or we haven't
            # cached the data yet. If we are readonly, we assume it isn't
            # changing out from under us and that we only have to read it
            # once. This prevents us from whacking any new access keys that
            # we have cached in memory but were unable to write out.
            self._refresh_data_cache()
    except BaseException:
        # Do not leave the thread lock held after a failed attempt.
        self._thread_lock.release()
        raise
def open(self, mode):
    """
    Open this path, supporting only the C{"rb+"} mode.

    @raise OSError: With C{ENOSYS} if C{mode} is anything other than
        C{"rb+"}.

    @return: A L{_ReadFile} instance built from this path's C{contents}.
    """
    if mode != "rb+":
        raise OSError(ENOSYS, "Function not implemented")
    return _ReadFile(self.contents)
def test_symlinkError(self): """ An exception raised by C{symlink} other than C{EEXIST} is passed up to the caller of L{FilesystemLock.lock}. """ self._symlinkErrorTest(errno.ENOSYS)
def test_readlinkError(self): """ An exception raised by C{readlink} other than C{ENOENT} is passed up to the caller of L{FilesystemLock.lock}. """ self._readlinkErrorTest(OSError, errno.ENOSYS) self._readlinkErrorTest(IOError, errno.ENOSYS)
def test_readlinkErrorPOSIX(self): """ Any L{IOError} raised by C{readlink} on a POSIX platform passed to the caller of L{FilesystemLock.lock}. On POSIX, unlike on Windows, these are unexpected errors which cannot be handled by L{FilesystemLock}. """ self._readlinkErrorTest(IOError, errno.ENOSYS) self._readlinkErrorTest(IOError, errno.EACCES)
def test_statvfs_result_pickle(self): try: result = os.statvfs(self.fname) except OSError as e: # On AtheOS, glibc always returns ENOSYS if e.errno == errno.ENOSYS: self.skipTest('os.statvfs() failed with ENOSYS') p = pickle.dumps(result) self.assertIn(b'\x03cos\nstatvfs_result\n', p) unpickled = pickle.loads(p) self.assertEqual(result, unpickled)
def _lock(self):
    """Lock the entire multistore.

    Acquires ``self._thread_lock`` and then opens and locks the backing
    file. An IOError whose errno is ENOSYS, ENOLCK or EDEADLK is only
    logged; any other IOError propagates. If the file did not end up
    locked the store is marked read-only. A zero-length file is
    initialised with empty data, otherwise the cached data is refreshed
    when the store is writable or nothing has been cached yet.

    On success the thread lock is still held when this returns. If
    anything raises, the thread lock is released before the exception
    propagates so that a failed lock attempt cannot block every later
    caller. The state of the file lock is not touched on that path.
    """
    # NOTE(review): `logger.warn` is a deprecated alias of `warning` if
    # `logger` is a stdlib logging.Logger - confirm before switching.
    self._thread_lock.acquire()
    try:
        try:
            self._file.open_and_lock()
        except IOError as e:
            if e.errno == errno.ENOSYS:
                logger.warn('File system does not support locking the '
                            'credentials file.')
            elif e.errno == errno.ENOLCK:
                logger.warn('File system is out of resources for writing the '
                            'credentials file (is your disk full?).')
            elif e.errno == errno.EDEADLK:
                logger.warn('Lock contention on multistore file, opening '
                            'in read-only mode.')
            else:
                raise
        if not self._file.is_locked():
            self._read_only = True
            if self._warn_on_readonly:
                logger.warn('The credentials file (%s) is not writable. '
                            'Opening in read-only mode. Any refreshed '
                            'credentials will only be '
                            'valid for this run.', self._file.filename())
        if os.path.getsize(self._file.filename()) == 0:
            logger.debug('Initializing empty multistore file')
            # The multistore is empty so write out an empty file.
            self._data = {}
            self._write()
        elif not self._read_only or self._data is None:
            # Only refresh the data if we are read/write or we haven't
            # cached the data yet. If we are readonly, we assume it isn't
            # changing out from under us and that we only have to read it
            # once. This prevents us from whacking any new access keys that
            # we have cached in memory but were unable to write out.
            self._refresh_data_cache()
    except BaseException:
        # Do not leave the thread lock held after a failed attempt.
        self._thread_lock.release()
        raise
def statfs(self):
    """Build the statvfs structure reported for this filesystem.

    The block count is the sum of the TOTAL_DATA generation key over all
    of the configured client's generations, expressed in 65536-byte
    blocks; the file count is the sum of the FILE_COUNT key. All free and
    available counters are reported as zero.

    Returns the populated ``fuse.StatVfs`` object.
    """
    tracing.trace('called')

    block_size = 65536
    repo = self.obnam.repo
    client_name = self.obnam.app.settings['client-name']
    # Materialise the ids once so both sums walk the same generations.
    gen_ids = list(repo.get_client_generation_ids(client_name))

    total_data = sum(
        repo.get_generation_key(gen, obnamlib.REPO_GENERATION_TOTAL_DATA)
        for gen in gen_ids)
    files = sum(
        repo.get_generation_key(gen, obnamlib.REPO_GENERATION_FILE_COUNT)
        for gen in gen_ids)

    stv = fuse.StatVfs()
    stv.f_bsize = block_size
    stv.f_frsize = 0
    # Floor division keeps the block count an integer on Python 3 too.
    stv.f_blocks = total_data // block_size
    stv.f_bfree = 0
    stv.f_bavail = 0
    stv.f_files = files
    stv.f_ffree = 0
    stv.f_favail = 0
    stv.f_flag = 0
    stv.f_namemax = 255
    return stv
def __call__(self, books_needed):
    '''Look up the appropriate routine or throw an error.

    The routine is the entry named "_policy_<self.name>" in this class's
    own __dict__ and is called as routine(self, books_needed); its return
    value is returned. self.LCEobj.errno is set to ENOSYS before the
    lookup.

    Raises AssertionError if no such routine exists. Exceptions raised
    by the routine itself, including KeyError, propagate unchanged.
    '''
    self.LCEobj.errno = errno.ENOSYS
    # Only the lookup is guarded: a KeyError raised inside the routine
    # must not be misreported as "not implemented".
    try:
        policy_func = self.__class__.__dict__['_policy_' + self.name]
    except KeyError:
        # AssertionError is a "gentler" reporting path back to user
        raise AssertionError('"%s" is not implemented' % self.name)
    return policy_func(self, books_needed)

###########################################################################
# This is NOT for testing, just a quick entry without the full Librarian.
def read(self, shelf, length, offset, fd):
    """Not implemented: always raises TmfsOSError(errno.ENOSYS)."""
    not_implemented = TmfsOSError(errno.ENOSYS)
    raise not_implemented
def write(self, shelf, buf, offset, fd):
    """Not implemented: always raises TmfsOSError(errno.ENOSYS)."""
    not_implemented = TmfsOSError(errno.ENOSYS)
    raise not_implemented
def setxattr(self, path, xattr, valbytes, flags, position=0):
    """Store one extended attribute through the librarian.

    Only names of the form "user.<something>" are accepted. The raw
    bytes are decoded with the default codec, falling back to cp437, and
    sent as a 'set_xattr' command.

    Raises TmfsOSError with ENOSYS when flags or position are non-zero,
    EINVAL for a name outside the "user." namespace, and ENOTTY when the
    librarian returns anything other than None.
    """
    # flags from linux/xattr.h: XATTR_CREATE = 1, XATTR_REPLACE = 2
    if flags or position:
        raise TmfsOSError(errno.ENOSYS)   # haven't actually seen it yet

    # 'Extend' user.xxxx syntax and screen for it here: the name must
    # contain a dot and everything before the first dot must be 'user'.
    namespace, dot, _ = xattr.partition('.')
    if namespace != 'user' or not dot:
        raise TmfsOSError(errno.EINVAL)

    # Don't forget the setfattr command, and the shell it runs in, does
    # things to a "numeric" argument. setfattr processes a leading
    # 0x and does a byte-by-byte conversion, yielding a byte array.
    # It needs pairs of digits and can be of arbitrary length. Any
    # other argument ends up here as a pure string (well, byte array).
    try:
        value = valbytes.decode()
    except ValueError:
        # http://stackoverflow.com/questions/606191/convert-bytes-to-a-python-string
        value = valbytes.decode('cp437')

    request = self.lcp('set_xattr', path=path, xattr=xattr, value=value)
    rsp = self.librarian(request)
    if rsp is not None:     # unexpected
        raise TmfsOSError(errno.ENOTTY)
def flush(self, path, fh):
    '''Report success without doing any work.

    May be called zero, one, or more times per shelf open. It's a chance
    to report delayed errors, not a syscall passthru. Always returns 0.
    '''
    status = 0
    return status

# @prentry
# def opendir(self, path, *args, **kwargs):
#     raise TmfsOSError(errno.ENOSYS)
def fsync(self, path, datasync, fh):
    """Not implemented: always raises TmfsOSError(errno.ENOSYS)."""
    not_implemented = TmfsOSError(errno.ENOSYS)
    raise not_implemented
def fsyncdir(self, path, datasync, fh):
    """Not implemented: always raises TmfsOSError(errno.ENOSYS)."""
    not_implemented = TmfsOSError(errno.ENOSYS)
    raise not_implemented
def bmap(self, path, blocksize, blockno):
    '''Not implemented: always raises TmfsOSError(errno.ENOSYS).

    Only if "target" is a filesystem on a block device. Convert
    file-relative blockno to device-relative block.
    '''
    not_implemented = TmfsOSError(errno.ENOSYS)
    raise not_implemented
def chmod(self, path, mode, **kwargs):
    """Not implemented: always raises TmfsOSError(errno.ENOSYS)."""
    not_implemented = TmfsOSError(errno.ENOSYS)
    raise not_implemented
def link(self, target, name):
    """Not implemented: always raises TmfsOSError(errno.ENOSYS)."""
    not_implemented = TmfsOSError(errno.ENOSYS)
    raise not_implemented
def test_statvfs_attributes(self): if not hasattr(os, "statvfs"): return try: result = os.statvfs(self.fname) except OSError as e: # On AtheOS, glibc always returns ENOSYS if e.errno == errno.ENOSYS: return # Make sure direct access works self.assertEqual(result.f_bfree, result[3]) # Make sure all the attributes are there. members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files', 'ffree', 'favail', 'flag', 'namemax') for value, member in enumerate(members): self.assertEqual(getattr(result, 'f_' + member), result[value]) # Make sure that assignment really fails try: result.f_bfree = 1 self.fail("No exception thrown") except AttributeError: pass try: result.parrot = 1 self.fail("No exception thrown") except AttributeError: pass # Use the constructor with a too-short tuple. try: result2 = os.statvfs_result((10,)) self.fail("No exception thrown") except TypeError: pass # Use the constructor with a too-long tuple. try: result2 = os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14)) except TypeError: pass
def test_statvfs_attributes(self): try: result = os.statvfs(self.fname) except OSError, e: # On AtheOS, glibc always returns ENOSYS if e.errno == errno.ENOSYS: self.skipTest('glibc always returns ENOSYS on AtheOS') # Make sure direct access works self.assertEqual(result.f_bfree, result[3]) # Make sure all the attributes are there. members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files', 'ffree', 'favail', 'flag', 'namemax') for value, member in enumerate(members): self.assertEqual(getattr(result, 'f_' + member), result[value]) # Make sure that assignment really fails try: result.f_bfree = 1 self.fail("No exception raised") except TypeError: pass try: result.parrot = 1 self.fail("No exception raised") except AttributeError: pass # Use the constructor with a too-short tuple. try: result2 = os.statvfs_result((10,)) self.fail("No exception raised") except TypeError: pass # Use the constructor with a too-long tuple. try: result2 = os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14)) except TypeError: pass
def test_statvfs_attributes(self): if not hasattr(os, "statvfs"): return try: result = os.statvfs(self.fname) except OSError as e: # On AtheOS, glibc always returns ENOSYS if e.errno == errno.ENOSYS: return # Make sure direct access works self.assertEqual(result.f_bfree, result[3]) # Make sure all the attributes are there. members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files', 'ffree', 'favail', 'flag', 'namemax') for value, member in enumerate(members): self.assertEqual(getattr(result, 'f_' + member), result[value]) # Make sure that assignment really fails try: result.f_bfree = 1 self.fail("No exception raised") except AttributeError: pass try: result.parrot = 1 self.fail("No exception raised") except AttributeError: pass # Use the constructor with a too-short tuple. try: result2 = os.statvfs_result((10,)) self.fail("No exception raised") except TypeError: pass # Use the constructor with a too-long tuple. try: result2 = os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14)) except TypeError: pass
def test_statvfs_attributes(self): try: result = os.statvfs(self.fname) except OSError, e: # On AtheOS, glibc always returns ENOSYS if e.errno == errno.ENOSYS: self.skipTest('glibc always returns ENOSYS on AtheOS') # Make sure direct access works self.assertEqual(result.f_bfree, result[3]) # Make sure all the attributes are there. members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files', 'ffree', 'favail', 'flag', 'namemax') for value, member in enumerate(members): self.assertEqual(getattr(result, 'f_' + member), result[value]) # Make sure that assignment really fails try: result.f_bfree = 1 self.fail("No exception raised") except (TypeError, AttributeError): pass try: result.parrot = 1 self.fail("No exception raised") except AttributeError: pass # Use the constructor with a too-short tuple. try: result2 = os.statvfs_result((10,)) self.fail("No exception raised") except TypeError: pass # Use the constructor with a too-long tuple. try: result2 = os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14)) except TypeError: pass
def test_statvfs_attributes(self): try: result = os.statvfs(self.fname) except OSError as e: # On AtheOS, glibc always returns ENOSYS if e.errno == errno.ENOSYS: self.skipTest('os.statvfs() failed with ENOSYS') # Make sure direct access works self.assertEqual(result.f_bfree, result[3]) # Make sure all the attributes are there. members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files', 'ffree', 'favail', 'flag', 'namemax') for value, member in enumerate(members): self.assertEqual(getattr(result, 'f_' + member), result[value]) # Make sure that assignment really fails try: result.f_bfree = 1 self.fail("No exception raised") except AttributeError: pass try: result.parrot = 1 self.fail("No exception raised") except AttributeError: pass # Use the constructor with a too-short tuple. try: result2 = os.statvfs_result((10,)) self.fail("No exception raised") except TypeError: pass # Use the constructor with a too-long tuple. try: result2 = os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14)) except TypeError: pass
def test_statvfs_attributes(self): if not hasattr(os, "statvfs"): return try: result = os.statvfs(self.fname) except OSError, e: # On AtheOS, glibc always returns ENOSYS if e.errno == errno.ENOSYS: return # Make sure direct access works self.assertEqual(result.f_bfree, result[3]) # Make sure all the attributes are there. members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files', 'ffree', 'favail', 'flag', 'namemax') for value, member in enumerate(members): self.assertEqual(getattr(result, 'f_' + member), result[value]) # Make sure that assignment really fails try: result.f_bfree = 1 self.fail("No exception raised") except TypeError: pass try: result.parrot = 1 self.fail("No exception raised") except AttributeError: pass # Use the constructor with a too-short tuple. try: result2 = os.statvfs_result((10,)) self.fail("No exception raised") except TypeError: pass # Use the constructor with a too-long tuple. try: result2 = os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14)) except TypeError: pass
def getxattr(self, path, xattr, position=0):
    """Called with a specific namespace.name xattr.

    Can return either a bytes array OR an int.

    Names starting with '_obtain_' are answered from self.shadow, names
    starting with 'security.' get an empty bytes object, and everything
    else is fetched from the librarian with a 'get_xattr' command.

    Raises TmfsOSError with ENOSYS when position is non-zero, and with
    ENODATA when the librarian lookup or the conversion of its value
    fails for any reason.
    """
    if position:
        raise TmfsOSError(errno.ENOSYS)    # never saw this in 8 months

    rsp = self.librarian(self.lcp('get_shelf', path=path))
    shelf = TMShelf(rsp)

    # Does this also need changed to support path instead of name?
    # Piggy back for queries by kernel (globals & fault handling).
    if xattr.startswith('_obtain_'):
        # this will need some work
        data = self.shadow.getxattr(shelf, xattr)
        try:
            return bytes(data.encode())
        except AttributeError:      # probably the "encode()"
            self._ret_is_string = False
            return bytes(data)

    # "ls" starts with simple getattr but then comes here for
    # security.selinux, system.posix_acl_access, and posix_acl_default.
    # ls -l can also do the same thing on '/'. Save the round trips.
    if xattr.startswith('security.'):   # path == '/' is legal now
        return bytes(0)

    try:
        rsp = self.librarian(
            self.lcp('get_xattr', path=path, xattr=xattr))
        value = rsp['value']
        if value is None:
            # 'No such attribute'. Raised explicitly rather than via
            # assert so the check survives "python -O"; the handler
            # below converts it to ENODATA.
            raise ValueError('No such attribute')
        if isinstance(value, int):
            return value
        if isinstance(value, str):
            # http://stackoverflow.com/questions/606191/convert-bytes-to-a-python-string
            return bytes(value.encode('cp437'))
        # The encoded value must be returned, not computed and dropped.
        return bytes(value.encode())
    except Exception:
        raise TmfsOSError(errno.ENODATA)    # syn for ENOATTR