我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用errno.ENOENT。
def log(message, level=None):
    """Write a message to the juju log.

    Non-string messages are logged as their ``repr()``. When ``level`` is
    truthy it is passed to ``juju-log`` through the ``-l`` option.

    If the ``juju-log`` executable cannot be found (ENOENT) the message is
    printed to stderr instead; any other ``OSError`` is re-raised.
    """
    text = message if isinstance(message, six.string_types) else repr(message)

    argv = ['juju-log']
    if level:
        argv.extend(['-l', level])
    argv.append(text)

    try:
        subprocess.call(argv)
    except OSError as exc:
        if exc.errno != errno.ENOENT:
            raise
        # Missing juju-log should not cause failures in unit tests:
        # send the log output to stderr instead.
        if level:
            text = "{}: {}".format(level, text)
        text = "juju-log: {}".format(text)
        print(text, file=sys.stderr)
def status_get():
    """Retrieve the previously set juju workload state and message.

    Runs ``status-get --format=json --include-data`` and returns the
    ``(status, message)`` pair taken from its JSON output.

    If the status-get command is not found then assume this is juju < 1.23
    and return ``('unknown', "")``. Any other ``OSError`` is re-raised.
    """
    argv = ['status-get', "--format=json", "--include-data"]
    try:
        output = subprocess.check_output(argv)
    except OSError as exc:
        if exc.errno != errno.ENOENT:
            raise
        return ('unknown', "")

    parsed = json.loads(output.decode("UTF-8"))
    return (parsed["status"], parsed["message"])
def monitor_key_exists(service, key):
    """
    Searches for the existence of a key in the monitor cluster.

    Runs ``ceph --id <service> config-key exists <key>``.

    :param service: six.string_types. The Ceph user name to run the command
        under
    :param key: six.string_types. The key to search for
    :return: Returns True if the key exists, False if not and raises an
        exception if an unknown error occurs.
    :raise: CalledProcessError if an unknown error occurs
    """
    try:
        check_call(
            ['ceph', '--id', service, 'config-key', 'exists', str(key)])
    except CalledProcessError as e:
        # Ceph returns ENOENT as the exit status if the key wasn't found.
        if e.returncode == errno.ENOENT:
            return False
        # The message names the subcommand that was actually run
        # (config-key exists), so the log matches the failing command.
        log("Unknown error from ceph config-key exists: {} {}".format(
            e.returncode, e.output))
        raise
    # Any zero exit status means the key is present.
    return True
def run_command(self, cmd, show_stdout=True, cwd=None,
                on_returncode='raise',
                command_desc=None,
                extra_environ=None, spinner=None):
    """
    Run a VCS subcommand.

    Thin wrapper around call_subprocess: the VCS command name (``self.name``)
    is prepended to ``cmd`` and the remaining arguments are forwarded
    unchanged. Raises BadCommand when the VCS executable cannot be found;
    every other OSError propagates untouched.
    """
    full_cmd = [self.name] + cmd
    try:
        result = call_subprocess(full_cmd, show_stdout, cwd,
                                 on_returncode,
                                 command_desc, extra_environ,
                                 spinner)
    except OSError as exc:
        # Anything other than "no such file or directory" is not ours to
        # interpret: let it propagate as-is.
        if exc.errno != errno.ENOENT:
            raise
        # ENOENT here means the VCS executable isn't available.
        raise BadCommand('Cannot find command %r' % self.name)
    return result
def add_metric(*args, **kwargs):
    """Add metric values.

    Values may be expressed with keyword arguments. For metric names
    containing dashes, these may be expressed as one or more 'key=value'
    positional arguments. May only be called from the collect-metrics hook.

    If the ``add-metric`` executable is missing (ENOENT) the failure is
    logged at INFO level instead of being raised.
    """
    pairs = list(args)
    pairs += ['{}={}'.format(k, v) for k, v in kwargs.items()]
    argv = ['add-metric'] + sorted(pairs)

    try:
        subprocess.check_call(argv)
    except EnvironmentError as exc:
        if exc.errno != errno.ENOENT:
            raise
    else:
        return

    # Only reached when add-metric itself could not be found.
    log('add-metric failed: {}'.format(' '.join(pairs)), level='INFO')
def make_filesystem(blk_device, fstype='ext4', timeout=10):
    """Make a new filesystem on the specified block device.

    Polls for ``blk_device`` to exist, sleeping one second between checks,
    and gives up after ``timeout`` unsuccessful checks. Once the device is
    present it is formatted with ``mkfs -t <fstype> <blk_device>``.

    :param blk_device: path of the block device to format
    :param fstype: filesystem type handed to ``mkfs -t``
    :param timeout: number of one-second waits before giving up
    :raises IOError: with errno ENOENT if the device never appears
    """
    # os.errno was an undocumented alias that no longer exists on
    # Python 3.7+; use the errno module directly.
    import errno

    e_noent = errno.ENOENT
    count = 0
    while not os.path.exists(blk_device):
        if count >= timeout:
            log('Gave up waiting on block device %s' % blk_device,
                level=ERROR)
            raise IOError(e_noent, os.strerror(e_noent), blk_device)

        log('Waiting for block device %s to appear' % blk_device,
            level=DEBUG)
        count += 1
        time.sleep(1)

    # The loop has no ``break``; reaching this point means the device exists.
    log('Formatting block device %s as filesystem %s.' %
        (blk_device, fstype), level=INFO)
    check_call(['mkfs', '-t', fstype, blk_device])
def __init__(self, conn_or_path, calendar, daily_bar_reader, overwrite=False):
    """Store the sqlite connection, calendar and daily bar reader.

    ``conn_or_path`` is either an open ``sqlite3.Connection`` (used as-is)
    or a ``str`` path to connect to. With ``overwrite`` set, an existing
    file at that path is removed before connecting. Any other type raises
    TypeError.
    """
    if not isinstance(conn_or_path, (sqlite3.Connection, str)):
        raise TypeError("Unknown connection type %s" % type(conn_or_path))

    if isinstance(conn_or_path, sqlite3.Connection):
        self.conn = conn_or_path
    else:
        if overwrite and exists(conn_or_path):
            try:
                remove(conn_or_path)
            except OSError as exc:
                # The file may vanish between the exists() check and the
                # remove(); that is fine, anything else is a real error.
                if exc.errno != ENOENT:
                    raise
        self.conn = sqlite3.connect(conn_or_path)

    self._daily_bar_reader = daily_bar_reader
    self._calendar = calendar
def __init__(self, path, factory=None, create=True):
    """Initialize a single-file mailbox.

    Opens ``self._path`` for reading and writing in binary mode. If the
    file does not exist it is created when ``create`` is true, otherwise
    NoSuchMailboxError is raised. If the file cannot be opened for writing
    (EACCES or EROFS) it is opened read-only instead. Any other IOError
    propagates.
    """
    Mailbox.__init__(self, path, factory, create)
    try:
        f = open(self._path, 'rb+')
    except IOError as e:  # "as" form is valid on Python 2.6+ and Python 3
        if e.errno == errno.ENOENT:
            if create:
                f = open(self._path, 'wb+')
            else:
                raise NoSuchMailboxError(self._path)
        elif e.errno in (errno.EACCES, errno.EROFS):
            # No write permission / read-only filesystem: fall back to a
            # read-only handle.
            f = open(self._path, 'rb')
        else:
            raise
    self._file = f
    self._toc = None
    self._next_key = 0
    self._pending = False   # No changes require rewriting the file.
    self._locked = False
    self._file_length = None    # Used to record mailbox size
def __setitem__(self, key, message):
    """Replace the keyed message; raise KeyError if it doesn't exist.

    The message file is ``<self._path>/<key>``. It is truncated and then
    rewritten with ``message``. When the mailbox is locked, the file is
    locked for the duration of the rewrite.
    """
    path = os.path.join(self._path, str(key))
    try:
        f = open(path, 'rb+')
    except IOError as e:  # "as" form is valid on Python 2.6+ and Python 3
        if e.errno == errno.ENOENT:
            raise KeyError('No message with key: %s' % key)
        else:
            raise
    try:
        if self._locked:
            _lock_file(f)
        try:
            # Truncate the existing file through a separate descriptor
            # before the new content is dumped into f.
            os.close(os.open(path, os.O_WRONLY | os.O_TRUNC))
            self._dump_message(message, f)
            if isinstance(message, MHMessage):
                self._dump_sequences(message, key)
        finally:
            if self._locked:
                _unlock_file(f)
    finally:
        _sync_close(f)
def get_string(self, key):
    """Return a string representation or raise a KeyError.

    Reads the whole file ``<self._path>/<key>`` and returns its content.
    A missing file is reported as KeyError; other IOErrors propagate.
    """
    path = os.path.join(self._path, str(key))
    # A locked mailbox opens the file read-write (presumably because the
    # lock helper needs a writable handle -- confirm against _lock_file).
    mode = 'r+' if self._locked else 'r'
    try:
        f = open(path, mode)
    except IOError as e:  # "as" form is valid on Python 2.6+ and Python 3
        if e.errno == errno.ENOENT:
            raise KeyError('No message with key: %s' % key)
        else:
            raise
    try:
        if self._locked:
            _lock_file(f)
        try:
            return f.read()
        finally:
            if self._locked:
                _unlock_file(f)
    finally:
        f.close()
def _findLib_gcc(name):
    """Ask the C compiler's linker which file it resolves ``-l<name>`` to.

    Runs ``gcc`` (or ``cc`` when gcc is unavailable) with ``-Wl,-t`` through
    the shell, capturing the combined output, and returns the first token
    in that output that looks like ``...lib<name>.<something>``. Returns
    None when nothing matches. The temporary output file handed to ``-o``
    is always removed.
    """
    expr = r'[^\(\)\s]*lib%s\.[^\(\)\s]*' % re.escape(name)
    # Reserve a scratch path for the linker output; only the name is needed.
    fdout, ccout = tempfile.mkstemp()
    os.close(fdout)
    cmd = 'if type gcc >/dev/null 2>&1; then CC=gcc; elif type cc >/dev/null 2>&1; then CC=cc;else exit 10; fi;' \
          '$CC -Wl,-t -o ' + ccout + ' 2>&1 -l' + name
    try:
        f = os.popen(cmd)
        try:
            trace = f.read()
        finally:
            rv = f.close()
    finally:
        try:
            os.unlink(ccout)
        except OSError as e:  # "as" form is valid on Python 2.6+ and 3
            # The compiler may already have removed the output file.
            if e.errno != errno.ENOENT:
                raise
    # NOTE(review): os.popen(...).close() appears to return a wait()-style
    # status on POSIX, in which case the shell's ``exit 10`` would not
    # compare equal to 10 here -- confirm before relying on this branch.
    if rv == 10:
        raise OSError('gcc or cc command not found')
    res = re.search(expr, trace)
    if not res:
        return None
    return res.group(0)
def pquery(command, stdin=None, **kwargs):
    """Run ``command`` and return its captured standard output.

    :param command: argument list handed to ``subprocess.Popen``
    :param stdin: data passed to ``proc.communicate``
    :param kwargs: extra keyword arguments forwarded to ``subprocess.Popen``
    :return: the process's stdout as returned by ``communicate``
    :raises ProcessException: if the process exits with a non-zero code
    :raises OSError: if the process cannot be started
    """
    if very_verbose:
        info('Query "'+' '.join(command)+'" in '+getcwd())
    try:
        proc = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, **kwargs)
    except OSError as e:
        # Use the errno attribute: exceptions are not subscriptable on
        # Python 3, so e[0] would raise TypeError there.
        if e.errno == errno.ENOENT:
            error(
                "Could not execute \"%s\".\n"
                "Please verify that it's installed and accessible from your current path by executing \"%s\".\n" % (command[0], command[0]), e.errno)
        # Re-raise with the original traceback. This also covers the case
        # where error() returns instead of terminating (presumably it
        # exits -- confirm), which would otherwise leave proc unbound.
        raise

    stdout, _ = proc.communicate(stdin)

    if very_verbose:
        log(str(stdout).strip()+"\n")

    if proc.returncode != 0:
        raise ProcessException(proc.returncode, command[0], ' '.join(command), getcwd())

    return stdout
def make_subprocess(cmdline, stdout=False, stderr=False, stdin=False,
                    universal_newlines=False, close_fds=True, env=None):
    """Start a subprocess for the given command line and return it.

    ``cmdline`` is joined with spaces for the log message and passed
    unchanged to ``subprocess.Popen``. Each of ``stdout``, ``stderr`` and
    ``stdin`` requests a pipe for that stream when truthy; otherwise the
    stream is left as ``None``. The remaining arguments are forwarded to
    ``Popen`` as-is.

    Raises CommandNotFound when the executable does not exist (ENOENT);
    any other OSError propagates.
    """
    LOG.info("Running cmd '%s'", " ".join(cmdline))
    kwargs = {
        'stdout': subprocess.PIPE if stdout else None,
        'stderr': subprocess.PIPE if stderr else None,
        'stdin': subprocess.PIPE if stdin else None,
        'universal_newlines': universal_newlines,
        'close_fds': close_fds,
        'env': env,
    }
    try:
        proc = subprocess.Popen(cmdline, **kwargs)
    except OSError as e:  # noqa
        if e.errno == errno.ENOENT:
            raise CommandNotFound
        else:
            raise
    return proc
def safe_listdir(path):
    """
    Attempt to list contents of path, but suppress some exceptions.

    Returns the ``os.listdir`` result on success and an empty tuple when
    the path does not exist, is not a directory, or cannot be read.
    Every other OSError is re-raised.
    """
    try:
        entries = os.listdir(path)
    except (PermissionError, NotADirectoryError):
        return ()
    except OSError as exc:
        # Ignore the directory if does not exist, not a directory or
        # permission denied
        if exc.errno in (errno.ENOTDIR, errno.EACCES, errno.ENOENT):
            return ()
        # Python 2 on Windows needs to be handled this way :(
        if getattr(exc, "winerror", None) == 267:
            return ()
        raise
    return entries
def _get_candidates(self, link, package_name):
    """List the entries of the cache directory that corresponds to ``link``.

    Returns an empty list when caching is not possible (no cache dir,
    package name or link), when none of the formats selected for the
    package are allowed, or when the directory for the link does not
    exist or is not a directory.
    """
    if not (self.cache_dir and package_name and link):
        return []

    canonical_name = canonicalize_name(package_name)
    formats = index.fmt_ctl_formats(self.format_control, canonical_name)
    if not self.allowed_formats.intersection(formats):
        return []

    root = self.get_path_for_link(link)
    try:
        entries = os.listdir(root)
    except OSError as err:
        if err.errno not in {errno.ENOENT, errno.ENOTDIR}:
            raise
        return []
    return entries
def copy(self):
    """Copy the file to the local directory.

    The source is ``self.filename``; the target has the same base name
    inside ``self.destination``. An existing target is opened for in-place
    update, a missing one is created. Two ``phase_copy`` passes are run
    (phase1..phase2, then phase2..phase3) and ``record_state`` is always
    called afterwards, even if a pass fails. Both files are closed before
    returning.
    """
    with open(self.filename, "rb") as fpi:
        fpo_filename = os.path.join(
            self.destination, os.path.basename(self.filename))
        try:
            fpo = open(fpo_filename, "r+b")
        except IOError as exc:  # "as" form is valid on Python 2.6+ and 3
            if exc.errno != errno.ENOENT:
                raise
            # Target does not exist yet: create it.
            fpo = open(fpo_filename, "wb")
        with fpo:
            try:
                self.phase_copy(fpi, fpo, self.phase1, self.phase2)
                self.phase_copy(fpi, fpo, self.phase2, self.phase3)
            finally:
                # State is recorded while the files are still open, as in
                # the original ordering; they are closed right after.
                self.record_state()
def do_install(self, src, tgt, **kw):
    """See :py:meth:`waflib.Build.InstallContext.do_install`

    This variant removes ``tgt`` instead of installing it: the target is
    appended to ``self.uninstall``, deleted from disk, and
    ``self.rm_empty_dirs`` is then called on it. A target that is already
    missing is ignored; other removal errors are reported through
    ``Logs.warn`` rather than raised.
    """
    if not self.progress_bar:
        Logs.info('- remove %s' % tgt)

    self.uninstall.append(tgt)
    try:
        os.remove(tgt)
    except OSError as err:
        already_gone = err.errno == errno.ENOENT
        if not already_gone:
            # Emit the generic warning only once per context.
            if not getattr(self, 'uninstall_error', None):
                self.uninstall_error = True
                Logs.warn('build: some files could not be uninstalled (retry with -vv to list them)')
            if Logs.verbose > 1:
                Logs.warn('Could not remove %s (error code %r)' % (err.filename, err.errno))

    self.rm_empty_dirs(tgt)