The following 50 code examples, extracted from open-source Python projects, illustrate how to use tinydb.where().
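Before the project examples, here is a minimal self-contained sketch of the pattern they all share; the field names and values are invented for illustration:

from tinydb import TinyDB, where
from tinydb.storages import MemoryStorage

db = TinyDB(storage=MemoryStorage)
db.insert({'name': 'alice', 'exp': 10})
db.insert({'name': 'bob', 'exp': 3})

# where('field') builds a Query object that search(), get(), count(),
# remove(), and update() all accept as a condition.
assert db.get(where('name') == 'alice')['exp'] == 10
assert db.count(where('exp') > 5) == 1
assert db.count(where('name').exists()) == 2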
def test_adding_bibtex_entries_to_bibliography_db(datasets_db):
    """Adding BibTeX entries to a database works and the database can be searched."""
    TEST_BIBTEX = """@article{Roe1952gamma,
author = {Roe, W. P. and Fishel, W. P.},
journal = {Trans. Am. Soc. Met.},
keywords = {Fe-Cr,Fe-Ti,Fe-Ti-Cr},
pages = {1030--1041},
title = {{Gamma Loop Studies in the Fe-Ti, Fe-Cr, and Fe-Ti-Cr Systems}},
volume = {44},
year = {1952}
}

@phdthesis{shin2007thesis,
author = {Shin, D},
keywords = {Al-Cu,Al-Cu-Mg,Al-Cu-Si,Al-Mg,Al-Mg-Si,Al-Si,Cu-Mg,Mg-Si,SQS},
number = {May},
school = {The Pennsylvania State University},
title = {{Thermodynamic properties of solid solutions from special quasirandom structures and CALPHAD modeling: Application to aluminum-copper-magnesium-silicon and hafnium-silicon-oxygen}},
year = {2007}
}"""
    db = add_bibtex_to_bib_database(TEST_BIBTEX, datasets_db)
    search_res = db.search(where('ID') == 'Roe1952gamma')
    assert len(search_res) == 1
    assert len(db.all()) == 2
def info(name=None):
    if name is None:
        user_rows = user_table.all()
    else:
        user_rows = [user_table.get(where('name') == name)]
    if len(user_rows) == 0:
        print('No user yet.')
        return
    if user_rows[0] is None:
        raise UserNotFound
    row_format = '{:>15}{:>15}{:>20}'
    print(row_format.format('Name', 'EXP', 'is BDUSS valid'))
    for user_row in user_rows:
        print(row_format.format(user_row['name'], user_row['exp'],
                                str(User(user_row['bduss']).validation)))
def test_json_readwrite(tmpdir):
    """
    Regression test for issue #1
    """
    path = str(tmpdir.join('test.db'))

    # Create TinyDB instance
    db = TinyDB(path, storage=JSONStorage)

    item = {'name': 'A very long entry'}
    item2 = {'name': 'A short one'}

    get = lambda s: db.get(where('name') == s)

    db.insert(item)
    assert get('A very long entry') == item

    db.remove(where('name') == 'A very long entry')
    assert get('A very long entry') is None

    db.insert(item2)
    assert get('A short one') == item2

    db.remove(where('name') == 'A short one')
    assert get('A short one') is None
def test_multiple_tables(db):
    table1 = db.table('table1')
    table2 = db.table('table2')
    table3 = db.table('table3')

    table1.insert({'int': 1, 'char': 'a'})
    table2.insert({'int': 1, 'char': 'b'})
    table3.insert({'int': 1, 'char': 'c'})

    assert table1.count(where('char') == 'a') == 1
    assert table2.count(where('char') == 'b') == 1
    assert table3.count(where('char') == 'c') == 1

    db.purge_tables()

    assert len(table1) == 0
    assert len(table2) == 0
    assert len(table3) == 0
def test_lru_cache(db):
    # Test integration into TinyDB
    table = db.table('table3', cache_size=2)
    query = where('int') == 1

    table.search(query)
    table.search(where('int') == 2)
    table.search(where('int') == 3)
    assert query not in table._query_cache

    table.remove(where('int') == 1)
    assert not table._query_cache.lru

    table.search(query)
    assert len(table._query_cache) == 1

    table.clear_cache()
    assert len(table._query_cache) == 0
def _parse_atat_lattice(lattice_in):
    """Parse an ATAT-style `lat.in` string.

    The parsed string will be in three groups: (Coordinate system) (lattice) (atoms)
    where the atom group is split up into subgroups, each describing the position and atom name.
    """
    float_number = Regex(r'[-+]?[0-9]*\.?[0-9]+([eE][-+]?[0-9]+)?').setParseAction(lambda t: [float(t[0])])
    vector = Group(float_number + float_number + float_number)
    angles = vector
    vector_line = vector + Suppress(LineEnd())
    coord_sys = Group((vector_line + vector_line + vector_line) | (vector + angles + Suppress(LineEnd())))
    lattice = Group(vector + vector + vector)
    atom = Group(vector + Group(OneOrMore(Word(alphas + '_'))))
    atat_lattice_grammer = coord_sys + lattice + Group(OneOrMore(atom))
    # parse the input string and return the coordinate system, lattice, and atom groups
    return atat_lattice_grammer.parseString(lattice_in)
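A brief usage sketch for the parser above; the lat.in-style input is invented for illustration and is not from the project's test data:

# Hypothetical input: axis lengths and angles, three lattice vectors, then atom positions.
sample = """1 1 1 90 90 90
1 0 0
0 1 0
0 0 1
0 0 0 Al
0.5 0.5 0.5 Ni
"""
coord_sys, lattice, atoms = _parse_atat_lattice(sample)
# 'atoms' holds one (position, [atom_names]) group per atom line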
def create_service(self, opts):
    for target in self.targets:
        docker_client = docker.DockerClient('tcp://' + common.translate_id(target)[0] + ':' + cfg.docker['API_PORT'])
        if "sub_driver" in opts:
            sub_driver_name = opts['sub_driver']
            sub_driver = getattr(docker_client, sub_driver_name)
            del opts['sub_driver']
            if sub_driver_name == 'swarm':
                sub_driver_opts = opts['opts']
                node_type = sub_driver_opts['type']
                if node_type == 'manager':
                    sub_driver.init('eth0:' + cfg.docker['SWARM_PORT'], '0.0.0.0:' + cfg.docker['SWARM_PORT'])
                    db.vms.update(insert_join_token(sub_driver.attrs['JoinTokens']),
                                  where('name') == target)
                elif node_type == 'worker':
                    manager = db.vms.get(where('name') == sub_driver_opts['managers'][0])
                    sub_driver.join([common.id_to_swarm(sub_driver_opts['managers'][0])],
                                    manager['docker']['join_tokens']['Worker'],
                                    '0.0.0.0:' + cfg.docker['SWARM_PORT'])
        else:
            docker_client.containers.run(**opts, detach=True)
def create_cluster(self):
    self.init = {}
    for vm in self.vms:
        docker_client = docker.DockerClient('tcp://' + common.translate_id(vm['id'])[0] + ':' + cfg.docker['API_PORT'])
        swarm_client = docker_client.swarm
        if vm['role'] == 'manager':
            swarm_client.init('eth0:' + cfg.docker['SWARM_PORT'], '0.0.0.0:' + cfg.docker['SWARM_PORT'])
            db.vms.update(insert_join_token(swarm_client.attrs['JoinTokens']),
                          where('name') == vm['id'])
            self.vms.remove(vm)
            self.init = vm
            break
    for vm in self.vms:
        if vm['role'] == 'manager':
            self.add_manager(vm)
        elif vm['role'] == 'worker':
            self.add_worker(vm)
def _fncChatBotAlly_cmdWhoPlays(self, m):
    self.log("+", "\t\tReceive command !who_plays")
    table = self.db['ranks'].table('server_' + str(self.account.account['serveur']))
    # scan_date = table.get(where('id'), len(table))
    # scan_date = scan_date['scan_date']
    ranks_old = table.all()
    ranks_now = self.account.getRanking()
    self.log("?", "\t\tSearching for active users..")
    rapport = "Active players for last 5 days :\n\n"
    count = 0
    for rn in ranks_now:
        for ro in ranks_old:
            if rn['user'] == ro['user']:
                if (ro['points'] - rn['points']) != 0:
                    count = count + 1
                    rapport = rapport + "- @" + rn['user'] + " (" + str(rn['points'] - ro['points']) + " pts)\n"
                    if count >= 7:
                        count = 0
                        self.account.rSendMessageToAlliance(rapport)
                        rapport = ""
                break
    rapport = rapport + "OVER !"
    self.account.rSendMessageToAlliance(rapport)
def __init__(self, db='nonedb.json'):
    # Storage and serialization
    serializer = SerializationMiddleware(tinydb.storages.JSONStorage)
    serializer.register_serializer(DateTimeSerializer(), 'TinyDateTime')

    # A reference to the actual database object.
    self._conn = tinydb.TinyDB(db, storage=serializer)
    # Activate SmartCacheTable for faster queries.
    self._conn.table_class = SmartCacheTable

    # A shortcut to the ``tinydb.TinyDB.table`` method.
    # See http://tinydb.readthedocs.org/en/latest/usage.html#tables
    # for reference.
    self.table = self._conn.table

    # A shortcut to the ``tinydb.where`` object.
    # See http://tinydb.readthedocs.org/en/latest/usage.html#queries
    # for reference.
    self.where = tinydb.where
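A short usage sketch for the wrapper above. The Database class name and the inserted record are assumptions for illustration; the table/where shortcuts and the registered DateTimeSerializer come from the constructor itself:

import datetime

# 'Database' is a hypothetical name for the class whose constructor is shown above.
store = Database('nonedb.json')
events = store.table('events')          # shortcut to TinyDB.table
events.insert({'kind': 'login', 'when': datetime.datetime.now()})
# DateTimeSerializer lets the datetime value round-trip through JSON storage
assert events.count(store.where('kind') == 'login') == 1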
def set(self, name, value):
    db = TinyDB(self.filename)
    db.upsert({'name': name, 'value': value}, where('name') == name)
def get(self, name, default=None):
    db = TinyDB(self.filename)
    item = db.get(where('name') == name)
    if item is not None:
        return item.get('value', default)
    return default
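A quick sketch of how this get/set pair behaves together: upsert, used in set() above, updates the row matching the name instead of inserting a duplicate. The Settings class name is an assumption; any class with a filename attribute and the two methods above would do:

# 'Settings' is a hypothetical wrapper class holding the set()/get() methods above.
settings = Settings('settings.json')
settings.set('theme', 'dark')
settings.set('theme', 'light')   # upsert: replaces the existing 'theme' row, no duplicate
assert settings.get('theme') == 'light'
assert settings.get('missing', default='n/a') == 'n/a'   # falls back to the default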
def get_data(comps, phase_name, configuration, symmetry, datasets, prop):
    desired_data = datasets.search(
        (tinydb.where('output').test(lambda x: x in prop)) &
        (tinydb.where('components').test(lambda x: set(x).issubset(comps))) &
        (tinydb.where('solver').test(symmetry_filter, configuration,
                                     list_to_tuple(symmetry) if symmetry else symmetry)) &
        (tinydb.where('phases') == [phase_name]))
    # This seems to be necessary because the 'values' member does not modify 'datasets'.
    # But everything else does!
    desired_data = copy.deepcopy(desired_data)

    def recursive_zip(a, b):
        if isinstance(a, (list, tuple)) and isinstance(b, (list, tuple)):
            return list(recursive_zip(x, y) for x, y in zip(a, b))
        else:
            return list(zip(a, b))

    for idx, data in enumerate(desired_data):
        # Filter output values to only contain data for matching sublattice configurations
        matching_configs = np.array([(canonicalize(sblconf, symmetry) == canonicalize(configuration, symmetry))
                                     for sblconf in data['solver']['sublattice_configurations']])
        matching_configs = np.arange(len(data['solver']['sublattice_configurations']))[matching_configs]
        # Rewrite output values with filtered data
        desired_data[idx]['values'] = np.array(data['values'], dtype=float)[..., matching_configs]
        desired_data[idx]['solver']['sublattice_configurations'] = list_to_tuple(
            np.array(data['solver']['sublattice_configurations'], dtype=object)[matching_configs].tolist())
        try:
            desired_data[idx]['solver']['sublattice_occupancies'] = \
                np.array(data['solver']['sublattice_occupancies'], dtype=object)[matching_configs].tolist()
        except KeyError:
            pass
        # Filter out temperatures below 298.15 K (for now, until better refstates exist)
        temp_filter = np.atleast_1d(data['conditions']['T']) >= 298.15
        desired_data[idx]['conditions']['T'] = np.atleast_1d(data['conditions']['T'])[temp_filter]
        # Don't use data['values'] because we rewrote it above; not sure what 'data' references now
        desired_data[idx]['values'] = desired_data[idx]['values'][..., temp_filter, :]
    return desired_data
def symmetry_filter(x, config, symmetry):
    """
    Return True if the candidate sublattice configuration has any symmetry
    which matches the phase model symmetry.

    Parameters
    ----------
    x : the candidate dataset 'solver' dict. Must contain the "sublattice_configurations" key
    config : the configuration of interest, e.g. ['AL', ['AL', 'NI'], 'VA']
    symmetry : tuple of tuples where each inner tuple is a group of equivalent sublattices.
        A value of ((0, 1), (2, 3, 4)) means that sublattices at indices 0 and 1 are
        symmetrically equivalent to each other and sublattices at indices 2, 3, and 4
        are symmetrically equivalent to each other.

    Returns
    -------
    bool
    """
    if x['mode'] == 'manual':
        if len(config) != len(x['sublattice_configurations'][0]):
            return False
        # If even one matches, it's a match.
        # We do more filtering downstream.
        for data_config in x['sublattice_configurations']:
            if canonicalize(config, symmetry) == canonicalize(data_config, symmetry):
                return True
    return False
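The two functions above rely on TinyDB's Query.test(), which calls an arbitrary predicate with the stored field value followed by any extra arguments. A minimal sketch of that mechanism, with invented dataset contents:

from tinydb import TinyDB, where
from tinydb.storages import MemoryStorage

db = TinyDB(storage=MemoryStorage)
db.insert({'components': ['AL', 'NI'], 'output': 'HM_FORM'})
db.insert({'components': ['CU', 'MG'], 'output': 'SM_FORM'})

def subset_of(value, allowed):
    # the stored field value arrives first, extra test() arguments follow
    return set(value).issubset(allowed)

hits = db.search(where('components').test(subset_of, {'AL', 'NI', 'VA'}))
assert len(hits) == 1 and hits[0]['output'] == 'HM_FORM'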
def test_pickelable_tinydb_can_be_pickled_and_unpickled():
    """PickleableTinyDB should be able to be pickled and unpickled."""
    test_dict = {'test_key': ['test', 'values']}
    db = PickleableTinyDB(storage=MemoryStorage)
    db.insert(test_dict)
    db = pickle.loads(pickle.dumps(db))
    assert db.search(where('test_key').exists())[0] == test_dict
def is_file_public(self, bucket_id, file_id):
    if len(self.table.search((where('bucket_id') == str(bucket_id)) &
                             (where('file_id') == str(file_id)))) > 0:
        return True
    else:
        return False
def get_public_file_hash(self, bucket_id, file_id):
    public_file_data = self.table.search((where('bucket_id') == str(bucket_id)) &
                                         (where('file_id') == str(file_id)))
    return public_file_data[0]["public_download_hash"]
def get_playlist_tracks_list(self, playlist_id):
    playlist_tracks_list = self.tracks_table.search(where('playlist_id') == str(playlist_id))
    return playlist_tracks_list
def count_tracks_in_playlist(self, playlist_id):
    playlist_tracks_list = self.tracks_table.search(where('playlist_id') == str(playlist_id))
    return len(playlist_tracks_list)
def is_file_in_playlist(self, local_file_id):
    if len(self.tracks_table.search(where('local_file_id') == str(local_file_id))) > 0:
        return True
    else:
        return False
def __init__(self, name):
    if not is_user_existent(name):
        raise UserNotFound()
    user_row = user_table.get(where('name') == name)
    self.eid = user_row.eid
    self.name = user_row['name']
    self.exp = user_row['exp']
    self.obj = User(user_row['bduss'])
def is_user_existent(name):
    field_existence = user_table.search(where('name').exists())
    if not field_existence:
        return False
    user_existence = user_table.search(where('name') == name)
    return len(user_existence) == 1
def delete(user):
    user_table.remove(where('name') == user.name)
    bar_table.remove(where('user') == user.eid)
    print('finished deleting {0}'.format(user.name))
def update(user):
    bars = User(user.obj.bduss).bars
    bars_as_list = []
    # Convert each Bar object into a {kw: str, fid: str, user: int} dict and collect them into a list
    for bar in bars:
        print('found {name}\'s bar {bar}'.format(bar=bar.kw, name=user.name))
        bars_as_list.append({'kw': bar.kw, 'fid': bar.fid, 'user': user.eid})
    print('{name} has {count} bars.'.format(name=user.name, count=len(bars)))
    bar_table.remove(where('user') == user.eid)  # remove the user's old bar records before re-inserting
    bar_table.insert_multiple(bars_as_list)
    return len(bars)
def sign(user, delay=None):
    bar_rows = bar_table.search(where('user') == user.eid)
    exp = 0
    for bar_row in bar_rows:
        exp += sign_bar(user, Bar(bar_row['kw'], bar_row['fid']))
        if delay is not None:
            time.sleep(delay)
    print('{name}\'s {count} bars were signed, exp +{exp}.'.format(name=user.name, count=len(bar_rows), exp=exp))
    return exp
def modify(user, bduss):
    user_table.update({'bduss': bduss}, where('name') == user.name)
def test_delete(db):
    db.update(delete('int'), where('char') == 'a')
    assert 'int' not in db.get(where('char') == 'a')
def test_increment(db):
    db.update(increment('int'), where('char') == 'a')
    assert db.get(where('char') == 'a')['int'] == 2
def test_decrement(db):
    db.update(decrement('int'), where('char') == 'a')
    assert db.get(where('char') == 'a')['int'] == 0
def test_insert(db):
    db.purge()

    db.insert({'int': 1, 'char': 'a'})
    assert db.count(where('int') == 1) == 1

    db.purge()

    db.insert({'int': 1, 'char': 'a'})
    db.insert({'int': 1, 'char': 'b'})
    db.insert({'int': 1, 'char': 'c'})

    assert db.count(where('int') == 1) == 3
    assert db.count(where('char') == 'a') == 1
def test_remove(db):
    db.remove(where('char') == 'b')
    assert len(db) == 2
    assert db.count(where('int') == 1) == 2
def test_remove_multiple(db):
    db.remove(where('int') == 1)
    assert len(db) == 0
def test_remove_returns_ids(db):
    assert db.remove(where('char') == 'b') == [2]
def test_update(db):
    assert db.count(where('int') == 1) == 3

    db.update({'int': 2}, where('char') == 'a')

    assert db.count(where('int') == 2) == 1
    assert db.count(where('int') == 1) == 2
def test_update_returns_ids(db):
    db.purge()
    assert db.insert({'int': 1, 'char': 'a'}) == 1
    assert db.insert({'int': 1, 'char': 'a'}) == 2
    assert db.update({'char': 'b'}, where('int') == 1) == [1, 2]
def test_update_ids(db):
    db.update({'int': 2}, eids=[1, 2])
    assert db.count(where('int') == 2) == 2
def test_search(db):
    assert not db._query_cache
    assert len(db.search(where('int') == 1)) == 3
    assert len(db._query_cache) == 1
    assert len(db.search(where('int') == 1)) == 3  # Query result from cache
def test_get(db):
    item = db.get(where('char') == 'b')
    assert item['char'] == 'b'
def test_count(db):
    assert db.count(where('int') == 1) == 3
    assert db.count(where('char') == 'd') == 0
def test_contains(db):
    assert db.contains(where('int') == 1)
    assert not db.contains(where('int') == 0)
def test_unique_ids(tmpdir):
    """
    :type tmpdir: py._path.local.LocalPath
    """
    path = str(tmpdir.join('db.json'))

    # Verify ids are unique when reopening the DB and inserting
    with TinyDB(path) as _db:
        _db.insert({'x': 1})
    with TinyDB(path) as _db:
        _db.insert({'x': 1})
    with TinyDB(path) as _db:
        data = _db.all()
        assert data[0].eid != data[1].eid

    # Verify ids stay unique when inserting/removing
    with TinyDB(path) as _db:
        _db.purge()
        _db.insert_multiple({'x': i} for i in range(5))
        _db.remove(where('x') == 2)
        assert len(_db) == 4

        ids = [e.eid for e in _db.all()]
        assert len(ids) == len(set(ids))
def test_unicode_memory(db):
    """
    Regression test for issue #28
    """
    # Python 2 test: decode the UTF-8 byte string into a unicode string
    unic_str = 'ß'.decode('utf-8')
    byte_str = 'ß'

    db.insert({'value': unic_str})
    assert db.contains(where('value') == byte_str)
    assert db.contains(where('value') == unic_str)

    db.purge()
    db.insert({'value': byte_str})
    assert db.contains(where('value') == byte_str)
    assert db.contains(where('value') == unic_str)
def test_unicode_json(tmpdir):
    """
    Regression test for issue #28
    """
    # Python 2 test: byte strings and their decoded unicode counterparts
    unic_str1 = 'a'.decode('utf-8')
    byte_str1 = 'a'
    unic_str2 = 'ß'.decode('utf-8')
    byte_str2 = 'ß'

    path = str(tmpdir.join('db.json'))

    with TinyDB(path) as _db:
        _db.purge()
        _db.insert({'value': byte_str1})
        _db.insert({'value': byte_str2})
        assert _db.contains(where('value') == byte_str1)
        assert _db.contains(where('value') == unic_str1)
        assert _db.contains(where('value') == byte_str2)
        assert _db.contains(where('value') == unic_str2)

    with TinyDB(path) as _db:
        _db.purge()
        _db.insert({'value': unic_str1})
        _db.insert({'value': unic_str2})
        assert _db.contains(where('value') == byte_str1)
        assert _db.contains(where('value') == unic_str1)
        assert _db.contains(where('value') == byte_str2)
        assert _db.contains(where('value') == unic_str2)
def test_eids_json(tmpdir):
    """
    Regression test for issue #45
    """
    path = str(tmpdir.join('db.json'))

    with TinyDB(path) as _db:
        _db.purge()
        assert _db.insert({'int': 1, 'char': 'a'}) == 1
        assert _db.insert({'int': 1, 'char': 'a'}) == 2

        _db.purge()
        assert _db.insert_multiple([{'int': 1, 'char': 'a'},
                                    {'int': 1, 'char': 'b'},
                                    {'int': 1, 'char': 'c'}]) == [1, 2, 3]

        assert _db.contains(eids=[1, 2])
        assert not _db.contains(eids=[88])

        _db.update({'int': 2}, eids=[1, 2])
        assert _db.count(where('int') == 2) == 2

        el = _db.all()[0]
        assert _db.get(eid=el.eid) == el
        assert _db.get(eid=float('NaN')) is None

        _db.remove(eids=[1, 2])
        assert len(_db) == 1