我们从 Python 开源项目中提取了以下 11 个代码示例，用于说明如何使用 dbm.error。
def markLocation(line, loc):
    """Render a query together with a caret pointing at the error position.

    Queries of at most 60 characters are shown in full. Longer ones are
    shortened to 60 characters of the original text, with ``[...]`` marking
    every side that was cut, and ``loc`` is adjusted so the caret still points
    at the same character.

    :param line: The query string that failed.
    :param loc: Zero-based index of the offending character in ``line``.
    :return: Two lines of text: the (possibly shortened) query and the marker.
    """
    length = len(line)
    if length > 60:
        if loc <= 30:
            # Error near the start: keep the head, caret position is unchanged.
            line = line[:60] + '[...]'
        elif loc >= length - 30:
            # Error near the end: keep the tail; +5 skips the '[...]' prefix.
            loc = 60 - (length - loc) + 5
            line = '[...]' + line[-60:]
        else:
            # Error in the middle: 30 characters on either side, so the caret
            # sits after the 5 character prefix plus 30 leading characters.
            line = '[...]' + line[loc-30:loc+30] + '[...]'
            loc = 35

    # 17 is the width of the "Offending query: " prefix on the first line.
    marker = (" " * (loc + 17)) + "^.-- Error location"
    return "Offending query: " + line + "\n" + marker
def doArchive(argv, bobRoot):
    """Entry point of ``bob archive``: parse *argv* and run the subcommand.

    The first positional argument selects an entry of ``availableArchiveCmds``;
    all remaining arguments are handed to that entry's callable. An unknown
    subcommand is reported through ``parser.error``.

    :param argv: Command line arguments following ``bob archive``.
    :param bobRoot: Not used by this function.
    """
    # One "<name> <description>" help line per registered subcommand.
    helpLines = []
    for name, entry in availableArchiveCmds.items():
        helpLines.append("{:8} {}".format(name, entry[1]))
    helpLines.sort()
    subHelp = "\n ... ".join(helpLines)

    # NOTE(review): the whitespace inside the string literals of this function
    # looks collapsed in the copy this was taken from - confirm against the
    # upstream file before relying on the rendered help layout.
    description = """Manage binary artifacts archive. The following subcommands are available: bob archive {} """.format(subHelp)

    parser = argparse.ArgumentParser(
        prog="bob archive",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description=description)
    parser.add_argument('subcommand', help="Subcommand")
    parser.add_argument('args', nargs=argparse.REMAINDER,
                        help="Arguments for subcommand")
    args = parser.parse_args(argv)

    if args.subcommand not in availableArchiveCmds:
        parser.error("Unknown subcommand '{}'".format(args.subcommand))
        return
    availableArchiveCmds[args.subcommand][0](args.args)
def load(cls, cacheName, cacheKey):
    """Open a persisted cache if it was written for ``cacheKey``.

    :param cacheName: File name handed to ``dbm.open`` (opened read-only).
    :param cacheKey: Expected value of the ``b'vsn'`` entry.
    :return: ``cls(db, db[b'root'])`` when the stored key matches, otherwise
        ``None`` (file cannot be opened, the stored key differs, or an
        ``OSError`` / ``dbm.error`` occurred while reading).
    """
    try:
        db = dbm.open(cacheName, "r")
    except OSError:
        return None
    except dbm.error:
        return None

    # The handle may only stay open when it has been handed to the returned
    # object. Every other way out of this function (key mismatch, handled
    # error, unexpected exception such as a missing b'root') closes it.
    handedOver = False
    try:
        if cacheKey == db.get(b'vsn'):
            instance = cls(db, db[b'root'])
            handedOver = True
            return instance
    except OSError:
        pass
    except dbm.error:
        pass
    finally:
        if not handedOver:
            db.close()
    return None
def create(cls, cacheName, cacheKey, root):
    """Write a fresh cache for ``root`` and return an instance reading it.

    A new database is created under ``cacheName``, filled through
    ``PkgGraphNode.__convertPackageToGraph`` and closed again. The root entry
    is stored before the ``b'vsn'`` key. The file is then reopened read-only
    for the returned object.

    :param cacheName: File name handed to ``dbm.open``.
    :param cacheKey: Value stored under ``b'vsn'``.
    :param root: Package passed on to the graph conversion.
    :return: ``cls(<read-only db>, rootKey)``.
    :raises BobError: If a ``dbm.error`` occurs.
    """
    try:
        writer = dbm.open(cacheName, 'n')
        try:
            rootKey = PkgGraphNode.__convertPackageToGraph(writer, root)
            writer[b'root'] = rootKey
            writer[b'vsn'] = cacheKey
        finally:
            # Always close the writable handle, even if filling it failed.
            writer.close()
        reader = dbm.open(cacheName, 'r')
        return cls(reader, rootKey)
    except dbm.error as exc:
        raise BobError("Cannot save internal state: " + str(exc))
def walkPackagePath(self, path):
    """Legacy path walking.

    Does not support any advanced query features. Guaranteed to return only
    a single package. If any path element is not found an error is thrown.
    """
    # Expand aliases before walking.
    path = self.__substAlias(path)

    current = self.getRootPackage()
    names = [part for part in path.split("/") if part]
    if not names:
        raise BobError("'{}' is not a valid package path".format(path))

    for name in names:
        # Direct dependencies take precedence; indirect ones only fill in
        # names that no direct dependency provides.
        reachable = {
            dep.getPackage().getName(): dep.getPackage()
            for dep in current.getDirectDepSteps()
        }
        for dep in current.getIndirectDepSteps():
            pkg = dep.getPackage()
            reachable.setdefault(pkg.getName(), pkg)

        if name in reachable:
            current = reachable[name]
            continue

        stack = current.getStack()
        raise BobError("Package '{}' not found under '{}'"
            .format(name, "/".join(stack) if stack != [''] else "/"))

    return current
def __enter__(self):
    """Open (creating it if necessary) the ``.bob-adb`` database.

    The handle is kept on the instance and the instance itself is returned
    for use as the ``with`` target.

    :raises BobError: If ``dbm.open`` fails with a ``dbm.error``.
    """
    try:
        handle = dbm.open(".bob-adb", 'c')
    except dbm.error as exc:
        raise BobError("Cannot open cache: " + str(exc))
    self.__db = handle
    return self
def set(self, key, value):
    """Store ``value`` under ``key``, degrading gracefully on failures.

    The cache is opened first. Without a lock nothing is stored. On a
    ``dbm.error`` the cache is destroyed, reopened and the store is retried
    once; a failure of that retry propagates. Any other exception is logged.

    :return: The parent class' ``set`` result when storing succeeded,
        otherwise ``(time.time(), value)``.
    """
    self.open()
    if self.lock:
        try:
            return super(FileCache, self).set(key, value)
        except dbm.error:
            # Start over with an empty cache and try exactly once more.
            logger.exception("Failed to save to cache, flushing cache")
            self.destroy()
            self.open()
            return super(FileCache, self).set(key, value)
        except Exception:
            logger.exception("Failed to save to cache.")
    # Not locked, or the store failed: report the value as uncached.
    return time.time(), value
def __del__(self):
    """Log an error if the object is finalized while still marked as open.

    ``__del__`` also runs for instances whose ``__init__`` raised before
    ``opened`` was assigned, so a missing attribute is treated as "not open"
    instead of raising ``AttributeError`` inside the finalizer.
    """
    if getattr(self, "opened", False):
        logger.error("CACHE not closed propery.")
def _shelve_compat(name, *args, **kwargs):
    """Open a shelf, retrying without a trailing ``.db`` if that fails.

    All extra arguments are forwarded to ``shelve.open`` unchanged. When the
    first attempt raises ``dbm.error[0]`` and ``name`` ends in ``.db``, the
    suffix is stripped and the open is attempted once more; otherwise the
    original error is re-raised.
    """
    try:
        return shelve.open(name, *args, **kwargs)
    except dbm.error[0]:
        if not name.endswith('.db'):
            raise
        # Python 3 whichdb needs to try .db to determine type
        return shelve.open(name[:-len('.db')], *args, **kwargs)
def create(self):
    """Create a new database, on disk or in memory.

    With a filename set, a new database file is created, tagged with this
    object's type under the reserved key and synced. Without a filename a
    plain dict is used instead.

    @raise anydbm.error: If there's a problem creating the database.
    """
    if not self.filename:
        self.db = {}
        return

    self.db = anydbm.open(self.filename, "n")  # raises anydbm.error
    self.db["--Reserved--type"] = self.type
    self.db.sync()
def open(self):
    """Open a pre-existing on-disk database.

    The database is validated before it is published on ``self.db``: when the
    reserved type entry is missing or does not match, the freshly opened
    handle is closed again and ``self.db`` is left untouched.

    @raise anydbm.error: If there's a problem opening the database.
    @raise ValueError: If the database is not of the right type.
    """
    if not self.filename:
        raise ValueError("Can only open on-disk databases")

    db = anydbm.open(self.filename, "w")  # raises anydbm.error
    accepted = False
    try:
        try:
            storedType = db["--Reserved--type"]
        except KeyError:
            raise ValueError("Not a recognized database")
        if storedType != self.type:
            raise ValueError("Not a %s database" % self.type)
        accepted = True
    finally:
        # Do not leak the handle of a database that was rejected.
        if not accepted:
            db.close()
    self.db = db