我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用os.error()。
def addsitedir(sitedir, known_paths=None): """Add 'sitedir' argument to sys.path if missing and handle .pth files in 'sitedir'""" if known_paths is None: known_paths = _init_pathinfo() reset = 1 else: reset = 0 sitedir, sitedircase = makepath(sitedir) if not sitedircase in known_paths: sys.path.append(sitedir) # Add path component try: names = os.listdir(sitedir) except os.error: return names.sort() for name in names: if name.endswith(os.extsep + "pth"): addpackage(sitedir, name, known_paths) if reset: known_paths = None return known_paths
def _handle_request_noblock(self): """Handle one request, without blocking. I assume that select.select has returned that the socket is readable before this function was called, so there should be no risk of blocking in get_request(). """ try: request, client_address = self.get_request() except socket.error: return if self.verify_request(request, client_address): try: self.process_request(request, client_address) except: self.handle_error(request, client_address) self.shutdown_request(request)
def shutdown_request(self, request): """Called to shutdown and close an individual request.""" try: #explicitly shutdown. socket.close() merely releases #the socket and waits for GC to perform the actual close. request.shutdown(socket.SHUT_WR) except socket.error: pass #some platforms may raise ENOTCONN here self.close_request(request)
def checkcache(filename=None): """Discard cache entries that are out of date. (This is not checked upon each call!)""" if filename is None: filenames = cache.keys() else: if filename in cache: filenames = [filename] else: return for filename in filenames: size, mtime, lines, fullname = cache[filename] if mtime is None: continue # no-op for files loaded via a __loader__ try: stat = os.stat(fullname) except os.error: del cache[filename] continue if size != stat.st_size or mtime != stat.st_mtime: del cache[filename]
def removedirs(name): """removedirs(path) Super-rmdir; remove a leaf directory and all empty intermediate ones. Works like rmdir except that, if the leaf directory is successfully removed, directories corresponding to rightmost path segments will be pruned away until either the whole path is consumed or an error occurs. Errors during this latter phase are ignored -- they generally mean that a directory was not empty. """ rmdir(name) head, tail = path.split(name) if not tail: head, tail = path.split(head) while head and tail: try: rmdir(head) except error: break head, tail = path.split(head)
def renames(old, new): """renames(old, new) Super-rename; create directories as necessary and delete any left empty. Works like rename, except creation of any intermediate directories needed to make the new pathname good is attempted first. After the rename, directories corresponding to rightmost path segments of the old name will be pruned way until either the whole path is consumed or a nonempty directory is found. Note: this function can fail with the new directory structure made if you lack permissions needed to unlink the leaf directory or file. """ head, tail = path.split(new) if head and tail and not path.exists(head): makedirs(head) rename(old, new) head, tail = path.split(old) if head and tail: try: removedirs(head) except error: pass
def _internal_poll(self, _deadstate=None, _waitpid=os.waitpid, _WNOHANG=os.WNOHANG, _os_error=os.error): """Check if child process has terminated. Returns returncode attribute. This method is called by __del__, so it cannot reference anything outside of the local scope (nor can any methods it calls). """ if self.returncode is None: try: pid, sts = _waitpid(self.pid, _WNOHANG) if pid == self.pid: self._handle_exitstatus(sts) except _os_error: if _deadstate is not None: self.returncode = _deadstate return self.returncode
def remove_tree(directory, verbose=1, dry_run=0): """Recursively remove an entire directory tree. Any errors are ignored (apart from being reported to stdout if 'verbose' is true). """ from distutils.util import grok_environment_error global _path_created if verbose >= 1: log.info("removing '%s' (and everything under it)", directory) if dry_run: return cmdtuples = [] _build_cmdtuple(directory, cmdtuples) for cmd in cmdtuples: try: cmd[0](cmd[1]) # remove dir from cache if it's already there abspath = os.path.abspath(cmd[1]) if abspath in _path_created: del _path_created[abspath] except (IOError, OSError), exc: log.warn(grok_environment_error( exc, "error removing %s: " % directory))
def write_file(self, new_text, filename, old_text, encoding): if not self.nobackups: # Make backup backup = filename + ".bak" if os.path.lexists(backup): try: os.remove(backup) except os.error, err: self.log_message("Can't remove backup %s", backup) try: os.rename(filename, backup) except os.error, err: self.log_message("Can't rename %s to %s", filename, backup) # Actually write the new file write = super(StdoutRefactoringTool, self).write_file write(new_text, filename, old_text, encoding) if not self.nobackups: shutil.copymode(backup, filename)
def write_file(self, new_text, filename, old_text, encoding=None): """Writes a string to a file. It first shows a unified diff between the old text and the new text, and then rewrites the file; the latter is only done if the write option is set. """ try: f = _open_with_encoding(filename, "w", encoding=encoding) except os.error as err: self.log_error("Can't create %s: %s", filename, err) return try: f.write(_to_system_newlines(new_text)) except os.error as err: self.log_error("Can't write %s: %s", filename, err) finally: f.close() self.log_debug("Wrote changes to %s", filename) self.wrote = True
def summarize(self): if self.wrote: were = "were" else: were = "need to be" if not self.files: self.log_message("No files %s modified.", were) else: self.log_message("Files that %s modified:", were) for file in self.files: self.log_message(file) if self.fixer_log: self.log_message("Warnings/messages while refactoring:") for message in self.fixer_log: self.log_message(message) if self.errors: if len(self.errors) == 1: self.log_message("There was 1 error:") else: self.log_message("There were %d errors:", len(self.errors)) for msg, args, kwds in self.errors: self.log_message(msg, *args, **kwds)
def walk(top, func, arg): """Directory tree walk with callback function. For each directory in the directory tree rooted at top (including top itself, but excluding '.' and '..'), call func(arg, dirname, fnames). dirname is the name of the directory, and fnames a list of the names of the files and subdirectories in dirname (excluding '.' and '..'). func may modify the fnames list in-place (e.g. via del or slice assignment), and walk will only recurse into the subdirectories whose names remain in fnames; this can be used to implement a filter, or to impose a specific order of visiting. No semantics are defined for, or required of, arg, beyond that arg is always passed to func. It can be used, e.g., to pass a filename pattern, or a mutable object designed to accumulate statistics. Passing None for arg is common.""" warnings.warnpy3k("In 3.x, os.path.walk is removed in favor of os.walk.", stacklevel=2) try: names = os.listdir(top) except os.error: return func(arg, top, names) for name in names: name = join(top, name) if isdir(name) and not islink(name): walk(name, func, arg)
def close(self, remove=os.unlink,error=os.error): if self.pipe: rc = self.pipe.close() else: rc = 255 if self.tmpfile: try: remove(self.tmpfile) except error: pass return rc # Alias
def _node(default=''): """ Helper to determine the node name of this machine. """ try: import socket except ImportError: # No sockets... return default try: return socket.gethostname() except socket.error: # Still not working... return default # os.path.abspath is new in Python 1.5.2:
def getsequences(self): """Return the set of sequences for the folder.""" sequences = {} fullname = self.getsequencesfilename() try: f = open(fullname, 'r') except IOError: return sequences while 1: line = f.readline() if not line: break fields = line.split(':') if len(fields) != 2: self.error('bad sequence in %s: %s' % (fullname, line.strip())) key = fields[0].strip() value = IntSet(fields[1].strip(), ' ').tolist() sequences[key] = value return sequences
def putsequences(self, sequences): """Write the set of sequences back to the folder.""" fullname = self.getsequencesfilename() f = None for key, seq in sequences.iteritems(): s = IntSet('', ' ') s.fromlist(seq) if not f: f = open(fullname, 'w') f.write('%s: %s\n' % (key, s.tostring())) if not f: try: os.unlink(fullname) except os.error: pass else: f.close()
def removemessages(self, list): """Remove one or more messages -- may raise os.error.""" errors = [] deleted = [] for n in list: path = self.getmessagefilename(n) commapath = self.getmessagefilename(',' + str(n)) try: os.unlink(commapath) except os.error: pass try: os.rename(path, commapath) except os.error, msg: errors.append(msg) else: deleted.append(n) if deleted: self.removefromallsequences(deleted) if errors: if len(errors) == 1: raise os.error, errors[0] else: raise os.error, ('multiple errors:', errors)
def copymessage(self, n, tofolder, ton): """Copy one message over a specific destination message, which may or may not already exist.""" path = self.getmessagefilename(n) # Open it to check that it exists f = open(path) f.close() del f topath = tofolder.getmessagefilename(ton) backuptopath = tofolder.getmessagefilename(',%d' % ton) try: os.rename(topath, backuptopath) except os.error: pass ok = 0 try: tofolder.setlast(None) shutil.copy2(path, topath) ok = 1 finally: if not ok: try: os.unlink(topath) except os.error: pass
def createmessage(self, n, txt): """Create a message, with text from the open file txt.""" path = self.getmessagefilename(n) backuppath = self.getmessagefilename(',%d' % n) try: os.rename(path, backuppath) except os.error: pass ok = 0 BUFSIZE = 16*1024 try: f = open(path, "w") while 1: buf = txt.read(BUFSIZE) if not buf: break f.write(buf) f.close() ok = 1 finally: if not ok: try: os.unlink(path) except os.error: pass
def check_version(version): # if application is binary then check for latest version if getattr(sys, 'frozen', False): try: url = "https://api.github.com/repos/stampery/mongoaudit/releases/latest" req = urllib2.urlopen(url) releases = json.loads(req.read()) latest = releases["tag_name"] if version < latest: print("mongoaudit version " + version) print("There's a new version " + latest) _upgrade(releases) except (urllib2.HTTPError, urllib2.URLError): print("Couldn't check for upgrades") except os.error: print("Couldn't write mongoaudit binary")
def write(path, data, kind='OTHER', dohex=False): assertType1(data) kind = kind.upper() try: os.remove(path) except os.error: pass err = 1 try: if kind == 'LWFN': writeLWFN(path, data) elif kind == 'PFB': writePFB(path, data) else: writeOther(path, data, dohex) err = 0 finally: if err and not DEBUG: try: os.remove(path) except os.error: pass # -- internal --
def save_pickle(self, dumpfile=DUMPFILE): if not self.changed: self.note(0, "\nNo need to save checkpoint") elif not dumpfile: self.note(0, "No dumpfile, won't save checkpoint") else: self.note(0, "\nSaving checkpoint to %s ...", dumpfile) newfile = dumpfile + ".new" f = open(newfile, "wb") pickle.dump(self, f) f.close() try: os.unlink(dumpfile) except os.error: pass os.rename(newfile, dumpfile) self.note(0, "Done.") return 1
def open_file(self, url): path = urllib.url2pathname(urllib.unquote(url)) if os.path.isdir(path): if path[-1] != os.sep: url = url + '/' indexpath = os.path.join(path, "index.html") if os.path.exists(indexpath): return self.open_file(url + "index.html") try: names = os.listdir(path) except os.error, msg: raise IOError, msg, sys.exc_traceback names.sort() s = MyStringIO("file:"+url, {'content-type': 'text/html'}) s.write('<BASE HREF="file:%s">\n' % urllib.quote(os.path.join(path, ""))) for name in names: q = urllib.quote(name) s.write('<A HREF="%s">%s</A>\n' % (q, q)) s.seek(0) return s return urllib.FancyURLopener.open_file(self, url)
def main(): verbose = webchecker.VERBOSE try: opts, args = getopt.getopt(sys.argv[1:], "qv") except getopt.error, msg: print msg print "usage:", sys.argv[0], "[-qv] ... [rooturl] ..." return 2 for o, a in opts: if o == "-q": verbose = 0 if o == "-v": verbose = verbose + 1 c = Sucker() c.setflags(verbose=verbose) c.urlopener.addheaders = [ ('User-agent', 'websucker/%s' % __version__), ] for arg in args: print "Adding root", arg c.addroot(arg) print "Run..." c.run()
def renames(old, new): """renames(old, new) Super-rename; create directories as necessary and delete any left empty. Works like rename, except creation of any intermediate directories needed to make the new pathname good is attempted first. After the rename, directories corresponding to rightmost path segments of the old name will be pruned until either the whole path is consumed or a nonempty directory is found. Note: this function can fail with the new directory structure made if you lack permissions needed to unlink the leaf directory or file. """ head, tail = path.split(new) if head and tail and not path.exists(head): makedirs(head) rename(old, new) head, tail = path.split(old) if head and tail: try: removedirs(head) except error: pass
def normalize_exception(exc): """ Given a SyntaxError from a marker evaluation, normalize the error message: - Remove indications of filename and line number. - Replace platform-specific error messages with standard error messages. """ subs = { 'unexpected EOF while parsing': 'invalid syntax', 'parenthesis is never closed': 'invalid syntax', } exc.filename = None exc.lineno = None exc.msg = subs.get(exc.msg, exc.msg) return exc
def addsitedir(sitedir): global _dirs_in_sys_path if _dirs_in_sys_path is None: _init_pathinfo() reset = 1 else: reset = 0 sitedir, sitedircase = makepath(sitedir) if not sitedircase in _dirs_in_sys_path: sys.path.append(sitedir) # Add path component try: names = os.listdir(sitedir) except os.error: return names.sort() for name in names: if name[-4:] == os.extsep + "pth": addpackage(sitedir, name) if reset: _dirs_in_sys_path = None
def test_remove_error(self): dirname = '/tmp/pykit-ut-fsutil-remove-on-error' if os.path.isdir(dirname): fsutil.remove(dirname) # OSError self.assertRaises(os.error, fsutil.remove, dirname, False) # ignore errors fsutil.remove(dirname, ignore_errors=True) def assert_error(exp_func): def onerror(func, path, exc_info): self.assertEqual(func, exp_func) return onerror # on error fsutil.remove(dirname, onerror=assert_error(os.remove))
def write_config(home, port, persist, demote): path = os.path.join(home, '.ave','config','adb_server.json') try: os.makedirs(os.path.join(home, '.ave','config')) except os.error, e: if e.errno != errno.EEXIST: raise config = {} if port != None: config['port'] = port if persist != None: config['persist'] = persist if demote != None: config['demote'] = demote with open(path, 'w') as f: json.dump(config, f)
def addsitedir(sitedir, known_paths=None): """Add 'sitedir' argument to sys.path if missing and handle .pth files in 'sitedir'""" if known_paths is None: known_paths = _init_pathinfo() reset = 1 else: reset = 0 sitedir, sitedircase = makepath(sitedir) if not sitedircase in known_paths: sys.path.append(sitedir) # Add path component try: names = os.listdir(sitedir) except os.error: return dotpth = os.extsep + "pth" names = [name for name in names if name.endswith(dotpth)] for name in sorted(names): addpackage(sitedir, name, known_paths) if reset: known_paths = None return known_paths
def extraction_error(self): """Give an error message for problems extracting file(s)""" old_exc = sys.exc_info()[1] cache_path = self.extraction_path or get_default_cache() tmpl = textwrap.dedent(""" Can't extract file(s) to egg cache The following error occurred while trying to extract file(s) to the Python egg cache: {old_exc} The Python egg cache directory is currently set to: {cache_path} Perhaps your account does not have write access to this directory? You can change the cache directory by setting the PYTHON_EGG_CACHE environment variable to point to an accessible directory. """).lstrip() err = ExtractionError(tmpl.format(**locals())) err.manager = self err.cache_path = cache_path err.original_error = old_exc raise err
def chmod(path, mode): log.debug("changing mode of %s to %o", path, mode) try: _chmod(path, mode) except os.error as e: log.debug("chmod failed: %s", e)
def _rlistdir(dirname): if not dirname: if isinstance(dirname, binary_type): dirname = binary_type(os.curdir, 'ASCII') else: dirname = os.curdir try: names = os.listdir(dirname) except os.error: return for x in names: yield x path = os.path.join(dirname, x) if dirname else x for y in _rlistdir(path): yield os.path.join(x, y)
def handle_error(self, request, client_address): """Handle an error gracefully. May be overridden. The default is to print a traceback and continue. """ print '-'*40 print 'Exception happened during processing of request from', print client_address import traceback traceback.print_exc() # XXX But this goes to stderr! print '-'*40