我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用dbm.open()。
def __save(self): if self.__asynchronous == 0: state = { "version" : _BobState.CUR_VERSION, "byNameDirs" : self.__byNameDirs, "results" : self.__results, "inputs" : self.__inputs, "jenkins" : self.__jenkins, "dirStates" : self.__dirStates, "buildState" : self.__buildState, } tmpFile = self.__path+".new" try: with open(tmpFile, "wb") as f: pickle.dump(state, f) f.flush() os.fsync(f.fileno()) os.replace(tmpFile, self.__path) except OSError as e: raise ParseError("Error saving workspace state: " + str(e)) self.__dirty = False else: self.__dirty = True
def __init__(self, root, **config): self.root = root self.config = config store = self.config["dbm_path"] os.makedirs(store, exist_ok=True) self.http_path = config.get("http_path", "monitor") self.oauth = github.OAuth(**root.config.secrets[self.config["name"]]) self.tokens = dbm.open(os.path.join(store, "tokens.db"), "cs") self.sessions = dbm.open(os.path.join(store, "sessions.db"), "cs") self.acl = config["access"] self.logs_path = config["logs_path"] os.makedirs(self.logs_path, exist_ok=True) self.connections = [] self._ws_job_listeners = collections.defaultdict(list) self._jobws_cb_map = collections.defaultdict(list) self._ws_user_map = {} self._user_events = collections.defaultdict(list)
def dumpasn1(self): """ Pretty print an ASN.1 DER object using cryptlib dumpasn1 tool. Use a temporary file rather than popen4() because dumpasn1 uses seek() when decoding ASN.1 content nested in OCTET STRING values. """ ret = None fn = "dumpasn1.%d.tmp" % os.getpid() try: f = open(fn, "wb") f.write(self.get_DER()) f.close() p = subprocess.Popen(("dumpasn1", "-a", fn), stdout = subprocess.PIPE, stderr = subprocess.STDOUT) ret = "\n".join(x for x in p.communicate()[0].splitlines() if x.startswith(" ")) except Exception, e: ret = "[Could not run dumpasn1: %s]" % e finally: os.unlink(fn) return ret
def keys(self): """Return a list of usernames in the database. @rtype: list @return: The usernames in the database. """ if self.db == None: raise AssertionError("DB not open") self.lock.acquire() try: usernames = self.db.keys() finally: self.lock.release() usernames = [u for u in usernames if not u.startswith("--Reserved--")] return usernames
def find_test_run_time_diff(test_id, run_time): times_db_path = os.path.join(os.path.join(os.getcwd(), '.testrepository'), 'times.dbm') if os.path.isfile(times_db_path): try: test_times = dbm.open(times_db_path) except Exception: return False try: avg_runtime = float(test_times.get(str(test_id), False)) except Exception: try: avg_runtime = float(test_times[str(test_id)]) except Exception: avg_runtime = False if avg_runtime and avg_runtime > 0: run_time = float(run_time.rstrip('s')) perc_diff = ((run_time - avg_runtime) / avg_runtime) * 100 return perc_diff return False
def open(file, flag='r', mode=0666): """Open or create database at path given by *file*. Optional argument *flag* can be 'r' (default) for read-only access, 'w' for read-write access of an existing database, 'c' for read-write access to a new or existing database, and 'n' for read-write access to a new database. Note: 'r' and 'w' fail if the database doesn't exist; 'c' creates it only if it doesn't exist; and 'n' always creates a new database. """ # guess the type of an existing database from whichdb import whichdb result=whichdb(file) if result is None: # db doesn't exist if 'c' in flag or 'n' in flag: # file doesn't exist and the new # flag was used so use default type mod = _defaultmod else: raise error, "need 'c' or 'n' flag to open new db" elif result == "": # db type cannot be determined raise error, "db type could not be determined" else: mod = __import__(result) return mod.open(file, flag, mode)
def __getBIdCache(self): if self.__buildIdCache is None: self.__buildIdCache = dbm.open(".bob-buildids.dbm", 'c') #self.__buildIdCache = {} return self.__buildIdCache
def load(cls, cacheName, cacheKey): try: db = dbm.open(cacheName, "r") persistedCacheKey = db.get(b'vsn') if cacheKey == persistedCacheKey: return cls(db, db[b'root']) else: db.close() except OSError: pass except dbm.error: pass return None
def create(cls, cacheName, cacheKey, root): try: db = dbm.open(cacheName, 'n') try: db[b'root'] = rootKey = PkgGraphNode.__convertPackageToGraph(db, root) db[b'vsn'] = cacheKey finally: db.close() return cls(dbm.open(cacheName, 'r'), rootKey) except dbm.error as e: raise BobError("Cannot save internal state: " + str(e))
def __enter__(self): try: self.__db = dbm.open(".bob-adb", 'c') except dbm.error as e: raise BobError("Cannot open cache: " + str(e)) return self
def __init__(self, service_name, storage_name, **kwargs): path = os.path.join(kwargs["path"], service_name) os.makedirs(path, exist_ok=True) self.db = dbm.open(os.path.join(path, storage_name + ".db"), "cs")
def __init__(self, root, **kwargs): self.root = root self.cfg = kwargs self.http_path = kwargs.get("http_path", "github") self.oauth = github.OAuth(**root.config.secrets[self.cfg["name"]]) store = kwargs["data-path"] os.makedirs(store, exist_ok=True) self.users = dbm.open(os.path.join(store, "users.db"), "cs") self.orgs = dbm.open(os.path.join(store, "orgs.db"), "cs") session_store = dbm.open(os.path.join(store, "sessions.db"), "cs") self.ss = SessionStore(session_store, kwargs.get("cookie_name", "ghs"))
def cb_job_started(self, job): path = os.path.join(self.config["logs_path"], job.event.id, job.config["name"]) os.makedirs(path) path = os.path.join(path, "console.txt") outfile = open(path, "wb") job.console_callbacks.add(lambda s,d: outfile.write(d))
def setUp(self): self.db = shelfdb.open('test_data/db') self.assertIsInstance(self.db, shelfdb.shelf.DB)
def test_shelf_open_close(self): self.db.shelf('user') try: dbm.open('test_data/db/user') except dbm.gnu.error as e: self.assertEqual(e.errno, 11) self.db.close() self.assertIsInstance(self.db._shelf['user']._shelf.dict, shelve._ClosedDict)
def setUp(self): self.db = shelfdb.open('test_data/db') self.user_list = [ {'name': 'Jan'}, {'name': 'Feb'}, {'name': 'Mar'}, ] for user in self.user_list: user['_id'] = self.db.shelf('user').insert(user) # Check if _id is a valid uuid1 id_ = uuid.UUID(user['_id'], version=1) self.assertEqual(user['_id'].replace('-', ''), id_.hex)
def OnSeveResult(self,event): fn = open('catalog.txt','w+') fn2 = open('extract.txt',"w+") leftRule = self.lf.GetText() rightRule = self.rt.GetText() fn.writelines(leftRule) fn2.writelines(rightRule) fn.close() fn2.close()
def save(self, data): with open('NormalFilePersistence.txt', 'w') as open_file: open_file.write(data)
def load(self): with open('NormalFilePersistence.txt', 'r') as open_file: return open_file.read()
def save(self, key, value): try: dbm_file = dbm.open('DBMPersistence', 'c') dbm_file[key] = str(value) finally: dbm_file.close()
def load(self, key): try: dbm_file = dbm.open('DBMPersistence', 'r') if key in dbm_file: result = dbm_file[key] else: result = None finally: dbm_file.close() return result
def save(self, obj): with open('PicklePersistence', 'wb') as pickle_file: pickle.dump(obj, pickle_file)
def save(self, key, obj): try: shelve_file = shelve.open('ShelvePersistence') shelve_file[key] = obj finally: shelve_file.close()
def load(self, key): try: shelve_file = shelve.open('ShelvePersistence') if key in shelve_file: result = shelve_file[key] else: result = None finally: shelve_file.close() return result
def __init__(self, filename, flag='c', protocol=None, writeback=False): import dbm Shelf.__init__(self, dbm.open(filename, flag), protocol, writeback)
def open(filename, flag='c', protocol=None, writeback=False): """Open a persistent dictionary for reading and writing. The filename parameter is the base filename for the underlying database. As a side-effect, an extension may be added to the filename and more than one file may be created. The optional flag parameter has the same interpretation as the flag parameter of dbm.open(). The optional protocol parameter specifies the version of the pickle protocol (0, 1, or 2). See the module's __doc__ string for an overview of the interface. """ return DbfilenameShelf(filename, flag, protocol, writeback)
def __init__(self, filename, flag='c', protocol=None, keyencoding='utf-8'): self.db = filename self.flag = flag self.dict = {} with dbm.open(self.db, self.flag) as db: for k in db.keys(): v = BytesIO(db[k]) self.dict[k] = Unpickler(v).load() shelve.Shelf.__init__(self, self.dict, protocol, False, keyencoding)
def sync(self): with dbm.open(self.db, self.flag) as db: for k, v in self.dict.items(): f = BytesIO() p = Pickler(f, protocol=self._protocol) p.dump(v) db[k] = f.getvalue() db.sync()
def __init__(self, filename, mode, perm): import dbm self.db = dbm.open(filename, mode, perm)
def query(self, word): import requests from bs4 import BeautifulSoup sess = requests.Session() headers = { 'Host': 'open.iciba.com', 'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:50.0) Gecko/20100101 Firefox/50.0', 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8', 'Accept-Language': 'en-US,en;q=0.5', 'Accept-Encoding': 'gzip, deflate' } sess.headers.update(headers) url = 'http://open.iciba.com/huaci_new/dict.php?word=%s' % (word) try: resp = sess.get(url, timeout=100) text = resp.text pattern = r'(<div class=\\\"icIBahyI-group_pos\\\">[\s\S]+?</div>)' text = re.search(pattern, text).group(1) except: return None if (resp.status_code == 200) and (text): soup = BeautifulSoup(text, 'lxml') ps = soup.find_all('p') trans = [] for item in ps: transText = item.get_text() transText = re.sub( r'\s+', ' ', transText.replace('\t', '')).strip() if transText: trans.append(transText) return '\n'.join(trans) else: return None
def load(self, id): # overridable callback for external storage, do not call directly try: file = open(self.path(id), 'rb') except IOError: return None blob = file.read() file.close() return blob
def save(self, id, blob): # overridable callback for external storage, do not call directly file = open(self.path(id), 'wb') file.write(blob) file.close() return id
def startup(self, *args, **kwargs): '''Supports path= to specify a location, otherwise assumes the current directory. ''' path = kwargs.get('path', '.') self._path = '%s/%s' % (path, self.name) self._db = dbm.open(self._path, 'c') return [ ]
def set(self, **kw): """ Set this object by setting one of its known formats. This method only allows one to set one format at a time. Subsequent calls will clear the object first. The point of all this is to let the object's internal converters handle mustering the object into whatever format you need at the moment. """ if len(kw) == 1: name = kw.keys()[0] if name in self.formats: self.clear() setattr(self, name, kw[name]) return if name == "PEM": self.clear() self._set_PEM(kw[name]) return if name == "Base64": self.clear() self.DER = base64.b64decode(kw[name]) return if name == "Auto_update": self.filename = kw[name] self.check_auto_update() return if name in ("PEM_file", "DER_file", "Auto_file"): f = open(kw[name], "rb") value = f.read() f.close() self.clear() if name == "PEM_file" or (name == "Auto_file" and looks_like_PEM(value)): self._set_PEM(value) else: self.DER = value return raise rpki.exceptions.DERObjectConversionError("Can't honor conversion request %r" % (kw,))
def check_auto_update(self): """ Check for updates to a DER object that auto-updates from a file. """ if self.filename is None: return try: filename = self.filename timestamp = os.stat(self.filename).st_mtime if self.timestamp is None or self.timestamp < timestamp: logger.debug("Updating %s, timestamp %s", filename, rpki.sundial.datetime.fromtimestamp(timestamp)) f = open(filename, "rb") value = f.read() f.close() self.clear() if looks_like_PEM(value): self._set_PEM(value) else: self.DER = value self.filename = filename self.timestamp = timestamp except (IOError, OSError), e: now = rpki.sundial.now() if self.lastfail is None or now > self.lastfail + self.failure_threshold: logger.warning("Could not auto_update %r (last failure %s): %s", self, self.lastfail, e) self.lastfail = now else: self.lastfail = None
def __init__(self, filename, keyno = 0): try: try: import gdbm as dbm_du_jour except ImportError: import dbm as dbm_du_jour self.keyno = long(keyno) self.filename = filename self.db = dbm_du_jour.open(filename, "c") except: logger.warning("insecure_debug_only_rsa_key_generator initialization FAILED, hack inoperative") raise