Python tinydb 模块,where() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用tinydb.where()

项目:ESPEI    作者:PhasesResearchLab    | 项目源码 | 文件源码
def test_adding_bibtex_entries_to_bibliography_db(datasets_db):
    """BibTeX entries can be loaded into a database and looked up by their ID."""
    bibtex_source = """@article{Roe1952gamma,
author = {Roe, W. P. and Fishel, W. P.},
journal = {Trans. Am. Soc. Met.},
keywords = {Fe-Cr,Fe-Ti,Fe-Ti-Cr},
pages = {1030--1041},
title = {{Gamma Loop Studies in the Fe-Ti, Fe-Cr, and Fe-Ti-Cr Systems}},
volume = {44},
year = {1952}
}

@phdthesis{shin2007thesis,
author = {Shin, D},
keywords = {Al-Cu,Al-Cu-Mg,Al-Cu-Si,Al-Mg,Al-Mg-Si,Al-Si,Cu-Mg,Mg-Si,SQS},
number = {May},
school = {The Pennsylvania State University},
title = {{Thermodynamic properties of solid solutions from special quasirandom structures and CALPHAD modeling: Application to aluminum-copper-magnesium-silicon and hafnium-silicon-oxygen}},
year = {2007}
}"""
    bib_db = add_bibtex_to_bib_database(bibtex_source, datasets_db)

    # Exactly one stored entry carries the first citation key ...
    matches = bib_db.search(where('ID') == 'Roe1952gamma')
    assert len(matches) == 1

    # ... and both entries of the source string ended up in the database.
    stored_entries = bib_db.all()
    assert len(stored_entries) == 2
项目:mpsign    作者:abrasumente233    | 项目源码 | 文件源码
def info(name=None):
    """Print a table of stored users, or of the single user called ``name``.

    With no ``name`` every row of ``user_table`` is listed; otherwise only the
    row whose ``name`` field equals ``name`` is looked up.

    Raises:
        UserNotFound: if ``name`` is given but the lookup returns ``None``.
    """
    rows = user_table.all() if name is None else [user_table.get(where('name') == name)]

    if not rows:
        print('No user yet.')
        return

    # A named lookup always produces a one-element list; a ``None`` inside it
    # means the requested user is missing.
    if rows[0] is None:
        raise UserNotFound

    template = '{:>15}{:>15}{:>20}'
    print(template.format('Name', 'EXP', 'is BDUSS valid'))

    for row in rows:
        user_name = row['name']
        user_exp = row['exp']
        bduss_state = str(User(row['bduss']).validation)
        print(template.format(user_name, user_exp, bduss_state))
项目:annotated-py-tinydb    作者:hhstore    | 项目源码 | 文件源码
def test_json_readwrite(tmpdir):
    """Regression test for issue #1: insert/remove cycles on a JSON-backed database."""
    db_file = str(tmpdir.join('test.db'))

    # Create TinyDB instance
    db = TinyDB(db_file, storage=JSONStorage)

    long_entry = {'name': 'A very long entry'}
    short_entry = {'name': 'A short one'}

    def fetch(name):
        # Look a single element up by its 'name' field.
        return db.get(where('name') == name)

    db.insert(long_entry)
    assert fetch('A very long entry') == long_entry

    db.remove(where('name') == 'A very long entry')
    assert fetch('A very long entry') is None

    # The second, shorter entry is written after the longer one was removed.
    db.insert(short_entry)
    assert fetch('A short one') == short_entry

    db.remove(where('name') == 'A short one')
    assert fetch('A short one') is None
项目:annotated-py-tinydb    作者:hhstore    | 项目源码 | 文件源码
def test_multiple_tables(db):
    """Three named tables hold their own rows and are all emptied by ``purge_tables``."""
    layout = (('table1', 'a'), ('table2', 'b'), ('table3', 'c'))
    tables = [db.table(table_name) for table_name, _ in layout]

    for tbl, (_, char) in zip(tables, layout):
        tbl.insert({'int': 1, 'char': char})

    # Each table sees only the row that was inserted into it.
    for tbl, (_, char) in zip(tables, layout):
        assert tbl.count(where('char') == char) == 1

    db.purge_tables()

    for tbl in tables:
        assert len(tbl) == 0
项目:annotated-py-tinydb    作者:hhstore    | 项目源码 | 文件源码
def test_lru_cache(db):
    """Check the table's query cache against a ``cache_size`` of two."""
    # Test integration into TinyDB
    cached_table = db.table('table3', cache_size=2)
    first_query = where('int') == 1

    # Three different searches are run; the test expects the first one to
    # have dropped out of the cache afterwards.
    cached_table.search(first_query)
    cached_table.search(where('int') == 2)
    cached_table.search(where('int') == 3)
    assert first_query not in cached_table._query_cache

    # After a removal the cache's ``lru`` attribute is expected to be empty.
    cached_table.remove(where('int') == 1)
    assert not cached_table._query_cache.lru

    cached_table.search(first_query)
    assert len(cached_table._query_cache) == 1

    cached_table.clear_cache()
    assert len(cached_table._query_cache) == 0
项目:prlworkflows    作者:PhasesResearchLab    | 项目源码 | 文件源码
def _parse_atat_lattice(lattice_in):
    """Parse an ATAT-style `lat.in` string and return the parse results.

    The grammar consists of three groups, in order:
    (coordinate system) (lattice) (atoms).

    * The coordinate system is either three vectors, each ending its line, or
      two vectors (the second one standing for the angles) on a single line.
    * The lattice is three vectors.
    * The atoms group holds one subgroup per atom: a position vector followed
      by a group of one or more names made of letters and underscores.
    """
    # A signed decimal with optional exponent, converted to ``float`` on match.
    number = Regex(r'[-+]?[0-9]*\.?[0-9]+([eE][-+]?[0-9]+)?')
    number = number.setParseAction(lambda t: [float(t[0])])

    triple = Group(number + number + number)
    triple_line = triple + Suppress(LineEnd())

    three_axes = triple_line + triple_line + triple_line
    # Lengths followed by angles; the angles use the same three-number shape.
    lengths_and_angles = triple + triple + Suppress(LineEnd())
    coordinate_system = Group(three_axes | lengths_and_angles)

    lattice_vectors = Group(triple + triple + triple)
    atom_entry = Group(triple + Group(OneOrMore(Word(alphas + '_'))))

    grammar = coordinate_system + lattice_vectors + Group(OneOrMore(atom_entry))
    # Run the grammar over the input and hand back whatever it produced.
    return grammar.parseString(lattice_in)
项目:genorch    作者:genorch    | 项目源码 | 文件源码
def create_service(self, opts):
    """Start a service on every target, through a Docker sub-driver or a plain container run.

    A Docker client is opened against each entry of ``self.targets``. When
    ``opts`` names a ``'sub_driver'``, the attribute of that name is taken
    from the client; only the ``'swarm'`` sub-driver is acted upon. Without a
    sub-driver, ``opts`` is forwarded to ``containers.run`` with
    ``detach=True``.

    Args:
        opts: dict of options. The ``'sub_driver'`` key, if present, is
            removed from the dict. For the swarm sub-driver ``opts['opts']``
            must hold ``'type'`` (``'manager'`` or ``'worker'``) and, for
            workers, a ``'managers'`` list whose first entry is the manager
            to join.
    """
    # Decide once, before the loop, whether a sub-driver was requested. The
    # key used to be deleted inside the loop, so from the second target
    # onwards the membership check failed and the remaining swarm options
    # were wrongly passed to ``containers.run``.
    use_sub_driver = 'sub_driver' in opts
    sub_driver_name = opts.pop('sub_driver', None)

    for target in self.targets:
        docker_client = docker.DockerClient('tcp://' + common.translate_id(target)[0] +
                                            ':' + cfg.docker['API_PORT'])
        if not use_sub_driver:
            docker_client.containers.run(**opts, detach=True)
            continue

        sub_driver = getattr(docker_client, sub_driver_name)
        if sub_driver_name != 'swarm':
            continue

        sub_driver_opts = opts['opts']
        node_type = sub_driver_opts['type']

        if node_type == 'manager':
            sub_driver.init('eth0:' + cfg.docker['SWARM_PORT'], '0.0.0.0:' + cfg.docker['SWARM_PORT'])
            # Store the join tokens on the VM record so workers can find them.
            db.vms.update(insert_join_token(sub_driver.attrs['JoinTokens']), where('name') == target)
        elif node_type == 'worker':
            first_manager = sub_driver_opts['managers'][0]
            manager = db.vms.get(where('name') == first_manager)
            sub_driver.join([common.id_to_swarm(first_manager)],
                            manager['docker']['join_tokens']['Worker'],
                            '0.0.0.0:' + cfg.docker['SWARM_PORT'])
项目:genorch    作者:genorch    | 项目源码 | 文件源码
def create_cluster(self):
    """Initialise a swarm on the first manager VM, then add every remaining VM.

    The first entry of ``self.vms`` whose role is ``'manager'`` gets
    ``swarm.init`` called on it, has its join tokens written to ``db.vms``,
    is removed from ``self.vms`` and is kept in ``self.init``. All VMs left in
    ``self.vms`` are then passed to ``add_manager`` or ``add_worker``
    according to their role.
    """
    self.init = {}
    for candidate in self.vms:
        endpoint = ('tcp://' + common.translate_id(candidate['id'])[0]
                    + ':' + cfg.docker['API_PORT'])
        swarm_client = docker.DockerClient(endpoint).swarm
        if candidate['role'] != 'manager':
            continue

        swarm_client.init('eth0:' + cfg.docker['SWARM_PORT'],
                          '0.0.0.0:' + cfg.docker['SWARM_PORT'])
        tokens = swarm_client.attrs['JoinTokens']
        db.vms.update(insert_join_token(tokens), where('name') == candidate['id'])

        # Removing from the list being iterated is safe only because the
        # loop stops immediately afterwards.
        self.vms.remove(candidate)
        self.init = candidate
        break

    for member in self.vms:
        role = member['role']
        if role == 'manager':
            self.add_manager(member)
        elif role == 'worker':
            self.add_worker(member)
项目:blacklion    作者:Foohx    | 项目源码 | 文件源码
def _fncChatBotAlly_cmdWhoPlays(self, m):
        """Handle the ``!who_plays`` command by reporting users whose points changed.

        The rows stored in the ``server_<serveur>`` table of ``self.db['ranks']``
        are compared with the ranking returned by ``self.account.getRanking()``.
        Every user present in both whose points differ is listed, and the
        report is sent to the alliance in one or more messages.

        ``m`` is not used by this method.
        """
        self.log("+", "\t\tReceive command !who_plays")
        table = self.db['ranks'].table('server_'+str(self.account.account['serveur']))

        # scan_date = table.get(where('id'), len(table))
        # scan_date = scan_date['scan_date']

        ranks_old = table.all()
        ranks_now = self.account.getRanking()
        self.log("?", "\t\tSearching for actives users..")
        # NOTE(review): the "5 days" window in the header is not computed here;
        # presumably it reflects how old the stored ranks are -- confirm.
        rapport = "Active players for last 5 days :\n\n"
        count = 0
        for rn in ranks_now:
            # Find the stored row of the same user; only the first match is used.
            for ro in ranks_old:
                if rn['user'] == ro['user']:
                    if (ro['points'] - rn['points']) != 0:
                        count = count +1
                        # The reported figure is current points minus stored points.
                        rapport = rapport + "- @" + rn['user'] + " ("+str((rn['points'] - ro['points']))+" pts)\n"
                    # Flush the report after every seven listed users; later
                    # chunks start empty, so only the first carries the header.
                    if count >= 7:
                        count = 0
                        self.account.rSendMessageToAlliance(rapport)
                        rapport = ""
                    break
        # The final message always ends with the "OVER !" marker, even when
        # no further users are pending.
        rapport = rapport + "OVER !"
        self.account.rSendMessageToAlliance(rapport)
项目:PokeBot    作者:bcc5160    | 项目源码 | 文件源码
def test_json_readwrite(tmpdir):
    """Regression test for issue #1 (JSON storage read/write after removals)."""
    storage_path = str(tmpdir.join('test.db'))

    # Create TinyDB instance
    database = TinyDB(storage_path, storage=JSONStorage)

    first_item = {'name': 'A very long entry'}
    second_item = {'name': 'A short one'}

    def by_name(value):
        return database.get(where('name') == value)

    # Round trip of the long entry: present after insert, gone after remove.
    database.insert(first_item)
    found = by_name('A very long entry')
    assert found == first_item

    database.remove(where('name') == 'A very long entry')
    found = by_name('A very long entry')
    assert found is None

    # Same round trip for the short entry.
    database.insert(second_item)
    found = by_name('A short one')
    assert found == second_item

    database.remove(where('name') == 'A short one')
    found = by_name('A short one')
    assert found is None
项目:PokeBot    作者:bcc5160    | 项目源码 | 文件源码
def test_multiple_tables(db):
    """Rows inserted into separate tables stay separate until ``purge_tables`` empties them."""
    first = db.table('table1')
    second = db.table('table2')
    third = db.table('table3')

    first.insert({'int': 1, 'char': 'a'})
    second.insert({'int': 1, 'char': 'b'})
    third.insert({'int': 1, 'char': 'c'})

    # One hit per table for the character that table received.
    hits_first = first.count(where('char') == 'a')
    assert hits_first == 1
    hits_second = second.count(where('char') == 'b')
    assert hits_second == 1
    hits_third = third.count(where('char') == 'c')
    assert hits_third == 1

    db.purge_tables()

    assert len(first) == 0
    assert len(second) == 0
    assert len(third) == 0
项目:PokeBot    作者:bcc5160    | 项目源码 | 文件源码
def test_lru_cache(db):
    """Exercise the query cache of a table created with ``cache_size=2``."""
    # Test integration into TinyDB
    tbl = db.table('table3', cache_size=2)
    initial = where('int') == 1

    tbl.search(initial)
    tbl.search(where('int') == 2)
    tbl.search(where('int') == 3)
    # With room for two entries, the first query is expected to be gone.
    cache = tbl._query_cache
    assert initial not in cache

    tbl.remove(where('int') == 1)
    assert not tbl._query_cache.lru

    tbl.search(initial)
    entries_after_search = len(tbl._query_cache)
    assert entries_after_search == 1

    tbl.clear_cache()
    entries_after_clear = len(tbl._query_cache)
    assert entries_after_clear == 0
项目:minihydra    作者:VillanCh    | 项目源码 | 文件源码
def test_json_readwrite(tmpdir):
    """Regression test for issue #1: each entry must be readable after insert and absent after removal."""
    path = str(tmpdir.join('test.db'))

    # Create TinyDB instance
    db = TinyDB(path, storage=JSONStorage)

    def lookup(name):
        return db.get(where('name') == name)

    # The long entry goes first, the short one second; each is inserted,
    # read back, removed and checked to be gone before the next one starts.
    for entry in ({'name': 'A very long entry'}, {'name': 'A short one'}):
        entry_name = entry['name']

        db.insert(entry)
        assert lookup(entry_name) == entry

        db.remove(where('name') == entry_name)
        assert lookup(entry_name) is None
项目:minihydra    作者:VillanCh    | 项目源码 | 文件源码
def test_multiple_tables(db):
    """Each of three tables keeps its own single row; ``purge_tables`` clears all of them."""
    table_a = db.table('table1')
    table_b = db.table('table2')
    table_c = db.table('table3')
    char_for = ((table_a, 'a'), (table_b, 'b'), (table_c, 'c'))

    for tbl, char in char_for:
        tbl.insert({'int': 1, 'char': char})

    for tbl, char in char_for:
        assert tbl.count(where('char') == char) == 1

    db.purge_tables()

    # Every table must report zero rows once the database was purged.
    for tbl, _ in char_for:
        assert len(tbl) == 0
项目:minihydra    作者:VillanCh    | 项目源码 | 文件源码
def test_lru_cache(db):
    """A two-entry query cache drops its oldest query and can be cleared explicitly."""
    # Test integration into TinyDB
    small_cache_table = db.table('table3', cache_size=2)
    q_one = where('int') == 1
    small_cache_table.search(q_one)

    q_two = where('int') == 2
    small_cache_table.search(q_two)

    q_three = where('int') == 3
    small_cache_table.search(q_three)

    # Three searches were issued against a cache of size two.
    assert q_one not in small_cache_table._query_cache

    small_cache_table.remove(where('int') == 1)
    assert not small_cache_table._query_cache.lru

    small_cache_table.search(q_one)
    assert len(small_cache_table._query_cache) == 1

    small_cache_table.clear_cache()
    assert len(small_cache_table._query_cache) == 0
项目:tinydb-jsonorm    作者:abnerjacobsen    | 项目源码 | 文件源码
def __init__(self, db='nonedb.json'):
    """Open the TinyDB database ``db`` and expose ``table`` / ``where`` shortcuts.

    Args:
        db: value handed to ``tinydb.TinyDB`` as its first argument;
            defaults to ``'nonedb.json'``.
    """
    # JSON storage wrapped in the serialization middleware, with the
    # DateTimeSerializer registered under the name 'TinyDateTime'.
    storage_middleware = SerializationMiddleware(tinydb.storages.JSONStorage)
    storage_middleware.register_serializer(DateTimeSerializer(), 'TinyDateTime')

    # Keep a reference to the actual database object.
    connection = tinydb.TinyDB(db, storage=storage_middleware)
    self._conn = connection

    # Activate SmartCache for the tables of this connection.
    connection.table_class = SmartCacheTable

    # Shortcut to the ``tinydb.TinyDB.table`` method, see
    # http://tinydb.readthedocs.org/en/latest/usage.html#tables
    self.table = connection.table

    # Shortcut to the ``tinydb.where`` object, see
    # http://tinydb.readthedocs.org/en/latest/usage.html#queries
    self.where = tinydb.where
项目:telebot    作者:DronMDF    | 项目源码 | 文件源码
def set(self, name, value):
    """Store ``value`` under ``name``, replacing an existing entry of that name.

    Args:
        name: key of the setting; matched against the ``'name'`` field.
        value: value saved in the ``'value'`` field.
    """
    # Open the database for this call only. The ``with`` block closes it
    # again; previously every call left an open handle behind.
    with TinyDB(self.filename) as db:
        db.upsert({'name': name, 'value': value}, where('name') == name)
项目:telebot    作者:DronMDF    | 项目源码 | 文件源码
def get(self, name, default=None):
    """Return the value stored under ``name``, or ``default``.

    Args:
        name: key of the setting; matched against the ``'name'`` field.
        default: returned when no entry matches or the entry has no
            ``'value'`` field.
    """
    # Open the database for this call only. The ``with`` block closes it
    # again; previously every call left an open handle behind.
    with TinyDB(self.filename) as db:
        item = db.get(where('name') == name)
    if item is None:
        return default
    return item.get('value', default)
项目:ESPEI    作者:PhasesResearchLab    | 项目源码 | 文件源码
def get_data(comps, phase_name, configuration, symmetry, datasets, prop):
    """
    Select the datasets for one phase and configuration and trim them to
    the matching entries.

    Parameters
    ----------
    comps : collection of component names. A dataset matches when its
            'components' form a subset of these.
    phase_name : a dataset matches when its 'phases' equal ``[phase_name]``.
    configuration : the sublattice configuration of interest.
    symmetry : groups of equivalent sublattice indices, or a falsy value.
               Passed through ``list_to_tuple`` for the query when truthy.
    datasets : database object providing ``search``.
    prop : container of accepted 'output' values.

    Returns
    -------
    list
        Deep copies of the matching datasets. In each one, 'values' and the
        'solver' sublattice configurations (and occupancies, when present)
        keep only the entries matching ``configuration``, and temperatures
        below 298.15 K are dropped from 'conditions' and 'values'.

    """
    desired_data = datasets.search((tinydb.where('output').test(lambda x: x in prop)) &
                                   (tinydb.where('components').test(lambda x: set(x).issubset(comps))) &
                                   (tinydb.where('solver').test(symmetry_filter, configuration, list_to_tuple(symmetry) if symmetry else symmetry)) &
                                   (tinydb.where('phases') == [phase_name]))
    # This seems to be necessary because the 'values' member does not modify 'datasets'
    # But everything else does!
    desired_data = copy.deepcopy(desired_data)

    for idx, data in enumerate(desired_data):
        # Filter output values to only contain data for matching sublattice configurations
        matching_configs = np.array([(canonicalize(sblconf, symmetry) == canonicalize(configuration, symmetry))
                                     for sblconf in data['solver']['sublattice_configurations']])
        # Turn the boolean mask into the integer positions of the matches
        matching_configs = np.arange(len(data['solver']['sublattice_configurations']))[matching_configs]
        # Rewrite output values with filtered data.
        # The builtin ``float`` / ``object`` replace the ``np.float`` /
        # ``np.object`` aliases, which no longer exist in current NumPy.
        desired_data[idx]['values'] = np.array(data['values'], dtype=float)[..., matching_configs]
        desired_data[idx]['solver']['sublattice_configurations'] = list_to_tuple(np.array(data['solver']['sublattice_configurations'],
                                                                                          dtype=object)[matching_configs].tolist())
        try:
            desired_data[idx]['solver']['sublattice_occupancies'] = np.array(data['solver']['sublattice_occupancies'],
                                                                             dtype=object)[matching_configs].tolist()
        except KeyError:
            # Occupancies are optional; datasets without them are left as they are
            pass
        # Filter out temperatures below 298.15 K (for now, until better refstates exist)
        temp_filter = np.atleast_1d(data['conditions']['T']) >= 298.15
        desired_data[idx]['conditions']['T'] = np.atleast_1d(data['conditions']['T'])[temp_filter]
        # Don't use data['values'] because we rewrote it above; not sure what 'data' references now
        desired_data[idx]['values'] = desired_data[idx]['values'][..., temp_filter, :]
    return desired_data
项目:ESPEI    作者:PhasesResearchLab    | 项目源码 | 文件源码
def symmetry_filter(x, config, symmetry):
    """
    Tell whether any sublattice configuration of a candidate dataset matches
    the configuration of interest once symmetry is taken into account.

    Parameters
    ----------
    x : the candidate dataset 'solver' dict. Must contain the "mode" key and,
        for mode 'manual', the "sublattice_configurations" key
    config : the configuration of interest: e.g. ['AL', ['AL', 'NI'], 'VA']
    symmetry : tuple of tuples where each inner tuple is a group of equivalent
               sublattices. A value of ((0, 1), (2, 3, 4)) means that sublattices
               at indices 0 and 1 are symmetrically equivalent to each other and
               sublattices at indices 2, 3, and 4 are symmetrically equivalent to
               each other.

    Returns
    -------
    bool
        Always False for any mode other than 'manual', or when the number of
        sublattices differs from that of the first candidate configuration.

    """
    if x['mode'] != 'manual':
        return False

    candidates = x['sublattice_configurations']
    if len(config) != len(candidates[0]):
        return False

    # A single matching configuration is enough here;
    # more filtering happens downstream.
    return any(canonicalize(config, symmetry) == canonicalize(candidate, symmetry)
               for candidate in candidates)
项目:ESPEI    作者:PhasesResearchLab    | 项目源码 | 文件源码
def test_pickelable_tinydb_can_be_pickled_and_unpickled():
    """A PickleableTinyDB keeps its contents across a pickle round trip."""
    payload = {'test_key': ['test', 'values']}

    original_db = PickleableTinyDB(storage=MemoryStorage)
    original_db.insert(payload)

    # Serialise and immediately restore the database object.
    restored_db = pickle.loads(pickle.dumps(original_db))

    found = restored_db.search(where('test_key').exists())
    assert found[0] == payload
项目:EasyStorj    作者:lakewik    | 项目源码 | 文件源码
def is_file_public(self, bucket_id, file_id):
    """Return True when ``self.table`` holds a row for this bucket/file pair.

    Both ids are converted to ``str`` before being compared.
    """
    same_bucket = where('bucket_id') == str(bucket_id)
    same_file = where('file_id') == str(file_id)
    return len(self.table.search(same_bucket & same_file)) > 0
项目:EasyStorj    作者:lakewik    | 项目源码 | 文件源码
def get_public_file_hash(self, bucket_id, file_id):
    """Return the ``public_download_hash`` of the first row matching this bucket/file pair.

    Both ids are converted to ``str`` before being compared. An empty search
    result makes the ``[0]`` lookup raise ``IndexError``.
    """
    same_bucket = where('bucket_id') == str(bucket_id)
    same_file = where('file_id') == str(file_id)
    rows = self.table.search(same_bucket & same_file)
    first_row = rows[0]
    return first_row["public_download_hash"]
项目:EasyStorj    作者:lakewik    | 项目源码 | 文件源码
def get_playlist_tracks_list(self, playlist_id):
    """Return every row of ``self.tracks_table`` whose ``playlist_id`` equals ``str(playlist_id)``."""
    in_playlist = where('playlist_id') == str(playlist_id)
    return self.tracks_table.search(in_playlist)
项目:EasyStorj    作者:lakewik    | 项目源码 | 文件源码
def count_tracks_in_playlist(self, playlist_id):
    """Return how many rows of ``self.tracks_table`` belong to ``str(playlist_id)``."""
    in_playlist = where('playlist_id') == str(playlist_id)
    tracks = self.tracks_table.search(in_playlist)
    return len(tracks)
项目:EasyStorj    作者:lakewik    | 项目源码 | 文件源码
def is_file_in_playlist(self, local_file_id):
    """Return True when any row of ``self.tracks_table`` refers to ``str(local_file_id)``."""
    same_file = where('local_file_id') == str(local_file_id)
    return len(self.tracks_table.search(same_file)) > 0
项目:mpsign    作者:abrasumente233    | 项目源码 | 文件源码
def __init__(self, name):
    """Load the stored user called ``name`` into this object.

    Raises:
        UserNotFound: if ``is_user_existent(name)`` is false.
    """
    if not is_user_existent(name):
        raise UserNotFound()

    record = user_table.get(where('name') == name)
    # Copy the row's id and fields, and wrap its BDUSS in a ``User`` object.
    self.eid = record.eid
    self.name = record['name']
    self.exp = record['exp']
    self.obj = User(record['bduss'])
项目:mpsign    作者:abrasumente233    | 项目源码 | 文件源码
def is_user_existent(name):
    """Return True when exactly one row of ``user_table`` has this ``name``.

    Returns False when no row has a ``name`` field at all, when nothing
    matches, and also when more than one row matches.
    """
    # No row carries a 'name' field, so nothing can match.
    if not user_table.search(where('name').exists()):
        return False

    matches = user_table.search(where('name') == name)
    # Compare by value. The previous ``is 1`` tested object identity against
    # an int literal, which is not a reliable equality check.
    return len(matches) == 1
项目:mpsign    作者:abrasumente233    | 项目源码 | 文件源码
def delete(user):
    """Remove ``user`` and the bars stored for it, then print a confirmation."""
    user_name = user.name
    user_table.remove(where('name') == user_name)
    # Bars are linked to their owner through the owner's element id.
    bar_table.remove(where('user') == user.eid)
    print('finished deleting {0}'.format(user.name))
项目:mpsign    作者:abrasumente233    | 项目源码 | 文件源码
def update(user):
    """Replace the stored bars of ``user`` with a freshly fetched list.

    Returns:
        The number of bars that were fetched.
    """
    fetched_bars = User(user.obj.bduss).bars
    rows = []

    # Turn each Bar object into a {kw, fid, user} dict, collected in a list.
    for fetched in fetched_bars:
        print('found {name}\'s bar {bar}'.format(bar=fetched.kw, name=user.name))
        row = {'kw': fetched.kw, 'fid': fetched.fid, 'user': user.eid}
        rows.append(row)

    print('{name} has {count} bars.'.format(name=user.name, count=len(fetched_bars)))

    # Drop the user's old rows before inserting the new ones.
    bar_table.remove(where('user') == user.eid)
    bar_table.insert_multiple(rows)
    return len(fetched_bars)
项目:mpsign    作者:abrasumente233    | 项目源码 | 文件源码
def sign(user, delay=None):
    """Sign every stored bar of ``user`` and return the total exp gained.

    Args:
        user: object providing ``eid`` and ``name``.
        delay: when not None, passed to ``time.sleep`` after each bar.
    """
    rows = bar_table.search(where('user') == user.eid)
    total_exp = 0

    for row in rows:
        target_bar = Bar(row['kw'], row['fid'])
        total_exp += sign_bar(user, target_bar)
        if delay is not None:
            time.sleep(delay)

    summary = '{name}\'s {count} bars was signed, exp +{exp}.'.format(
        name=user.name, count=len(rows), exp=total_exp)
    print(summary)
    return total_exp
项目:mpsign    作者:abrasumente233    | 项目源码 | 文件源码
def modify(user, bduss):
    """Overwrite the ``bduss`` field of the row whose name equals ``user.name``."""
    new_fields = {'bduss': bduss}
    same_name = where('name') == user.name
    user_table.update(new_fields, same_name)
项目:annotated-py-tinydb    作者:hhstore    | 项目源码 | 文件源码
def test_delete(db):
    """The ``delete`` operation removes the 'int' field from the element with char 'a'."""
    db.update(delete('int'), where('char') == 'a')

    element = db.get(where('char') == 'a')
    assert 'int' not in element
项目:annotated-py-tinydb    作者:hhstore    | 项目源码 | 文件源码
def test_increment(db):
    """The ``increment`` operation leaves 'int' at 2 for the element with char 'a'."""
    db.update(increment('int'), where('char') == 'a')

    element = db.get(where('char') == 'a')
    assert element['int'] == 2
项目:annotated-py-tinydb    作者:hhstore    | 项目源码 | 文件源码
def test_decrement(db):
    """The ``decrement`` operation leaves 'int' at 0 for the element with char 'a'."""
    db.update(decrement('int'), where('char') == 'a')

    element = db.get(where('char') == 'a')
    assert element['int'] == 0
项目:annotated-py-tinydb    作者:hhstore    | 项目源码 | 文件源码
def test_insert(db):
    """Inserted elements are counted correctly, for one element and for three."""
    # A single element in an emptied database.
    db.purge()
    db.insert({'int': 1, 'char': 'a'})
    assert db.count(where('int') == 1) == 1

    # Three elements sharing 'int' but differing in 'char'.
    db.purge()
    for char in ('a', 'b', 'c'):
        db.insert({'int': 1, 'char': char})

    assert db.count(where('int') == 1) == 3
    assert db.count(where('char') == 'a') == 1
项目:annotated-py-tinydb    作者:hhstore    | 项目源码 | 文件源码
def test_remove(db):
    """Removing the element with char 'b' leaves two elements, both with int 1."""
    db.remove(where('char') == 'b')

    remaining = len(db)
    assert remaining == 2

    still_int_one = db.count(where('int') == 1)
    assert still_int_one == 2
项目:annotated-py-tinydb    作者:hhstore    | 项目源码 | 文件源码
def test_remove_multiple(db):
    """Removing by a condition that matches every element empties the database."""
    matches_int_one = where('int') == 1
    db.remove(matches_int_one)

    assert len(db) == 0
项目:annotated-py-tinydb    作者:hhstore    | 项目源码 | 文件源码
def test_remove_returns_ids(db):
    """``remove`` returns the list of ids it removed (here the single id 2)."""
    removed_ids = db.remove(where('char') == 'b')
    assert removed_ids == [2]
项目:annotated-py-tinydb    作者:hhstore    | 项目源码 | 文件源码
def test_update(db):
    """Updating one element changes its 'int' and leaves the other two untouched."""
    count_before = db.count(where('int') == 1)
    assert count_before == 3

    db.update({'int': 2}, where('char') == 'a')

    changed = db.count(where('int') == 2)
    assert changed == 1
    unchanged = db.count(where('int') == 1)
    assert unchanged == 2
项目:annotated-py-tinydb    作者:hhstore    | 项目源码 | 文件源码
def test_update_returns_ids(db):
    """``insert`` returns consecutive ids and ``update`` returns the ids it touched."""
    db.purge()

    first_id = db.insert({'int': 1, 'char': 'a'})
    assert first_id == 1
    second_id = db.insert({'int': 1, 'char': 'a'})
    assert second_id == 2

    updated_ids = db.update({'char': 'b'}, where('int') == 1)
    assert updated_ids == [1, 2]
项目:annotated-py-tinydb    作者:hhstore    | 项目源码 | 文件源码
def test_update_ids(db):
    """Updating by explicit element ids changes exactly those two elements."""
    target_ids = [1, 2]
    db.update({'int': 2}, eids=target_ids)

    updated = db.count(where('int') == 2)
    assert updated == 2
项目:annotated-py-tinydb    作者:hhstore    | 项目源码 | 文件源码
def test_search(db):
    """A search returns all three matches and leaves one entry in the query cache."""
    assert not db._query_cache

    first_hits = db.search(where('int') == 1)
    assert len(first_hits) == 3
    assert len(db._query_cache) == 1

    # Query result from cache
    second_hits = db.search(where('int') == 1)
    assert len(second_hits) == 3
项目:annotated-py-tinydb    作者:hhstore    | 项目源码 | 文件源码
def test_get(db):
    """``get`` returns the element whose 'char' field equals 'b'."""
    char_is_b = where('char') == 'b'
    element = db.get(char_is_b)
    assert element['char'] == 'b'
项目:annotated-py-tinydb    作者:hhstore    | 项目源码 | 文件源码
def test_count(db):
    """``count`` reports three matches for int 1 and none for char 'd'."""
    with_int_one = db.count(where('int') == 1)
    assert with_int_one == 3

    with_char_d = db.count(where('char') == 'd')
    assert with_char_d == 0
项目:annotated-py-tinydb    作者:hhstore    | 项目源码 | 文件源码
def test_contains(db):
    """``contains`` is truthy for a matching condition and falsy for a non-matching one."""
    has_int_one = db.contains(where('int') == 1)
    assert has_int_one

    has_int_zero = db.contains(where('int') == 0)
    assert not has_int_zero
项目:annotated-py-tinydb    作者:hhstore    | 项目源码 | 文件源码
def test_unique_ids(tmpdir):
    """
    Element ids stay unique across reopening the file and across removals.

    :type tmpdir: py._path.local.LocalPath
    """
    db_path = str(tmpdir.join('db.json'))

    # Verify ids are unique when reopening the DB and inserting:
    # the same document is written in two separate sessions.
    for _ in range(2):
        with TinyDB(db_path) as session:
            session.insert({'x': 1})

    with TinyDB(db_path) as session:
        elements = session.all()

        assert elements[0].eid != elements[1].eid

    # Verify ids stay unique when inserting/removing
    with TinyDB(db_path) as session:
        session.purge()
        session.insert_multiple({'x': i} for i in range(5))
        session.remove(where('x') == 2)

        assert len(session) == 4

        seen_ids = [element.eid for element in session.all()]
        assert len(seen_ids) == len(set(seen_ids))
项目:annotated-py-tinydb    作者:hhstore    | 项目源码 | 文件源码
def test_unicode_memory(db):
    """
    Regression test for issue #28

    A value stored as a decoded (unicode) string or as a plain string literal
    must be found when queried with either form.

    NOTE(review): ``.decode`` is called on a plain string literal, which only
    works on Python 2 byte strings -- presumably this test is meant to run
    under Python 2 only; confirm.
    """
    unic_str = 'ß'.decode('utf-8')
    byte_str = 'ß'

    # Stored in decoded form, queried with both forms.
    db.insert({'value': unic_str})
    assert db.contains(where('value') == byte_str)
    assert db.contains(where('value') == unic_str)

    # Stored as the plain literal, queried with both forms.
    db.purge()
    db.insert({'value': byte_str})
    assert db.contains(where('value') == byte_str)
    assert db.contains(where('value') == unic_str)
项目:annotated-py-tinydb    作者:hhstore    | 项目源码 | 文件源码
def test_unicode_json(tmpdir):
    """
    Regression test for issue #28

    Same check as for the in-memory case, but against a database file: values
    written as plain literals or as decoded strings must be found when queried
    with either form, for both an ASCII and a non-ASCII character.

    NOTE(review): ``.decode`` is called on plain string literals, which only
    works on Python 2 byte strings -- presumably this test is meant to run
    under Python 2 only; confirm.
    """
    unic_str1 = 'a'.decode('utf-8')
    byte_str1 = 'a'

    unic_str2 = 'ß'.decode('utf-8')
    byte_str2 = 'ß'

    path = str(tmpdir.join('db.json'))

    # First pass: store the plain literals.
    with TinyDB(path) as _db:
        _db.purge()
        _db.insert({'value': byte_str1})
        _db.insert({'value': byte_str2})
        assert _db.contains(where('value') == byte_str1)
        assert _db.contains(where('value') == unic_str1)
        assert _db.contains(where('value') == byte_str2)
        assert _db.contains(where('value') == unic_str2)

    # Second pass: reopen the same file and store the decoded strings.
    with TinyDB(path) as _db:
        _db.purge()
        _db.insert({'value': unic_str1})
        _db.insert({'value': unic_str2})
        assert _db.contains(where('value') == byte_str1)
        assert _db.contains(where('value') == unic_str1)
        assert _db.contains(where('value') == byte_str2)
        assert _db.contains(where('value') == unic_str2)
项目:annotated-py-tinydb    作者:hhstore    | 项目源码 | 文件源码
def test_eids_json(tmpdir):
    """Regression test for issue #45: element ids on a JSON-backed database."""
    db_path = str(tmpdir.join('db.json'))

    with TinyDB(db_path) as store:
        # Ids start at 1 after a purge and grow with every insert.
        store.purge()
        first_id = store.insert({'int': 1, 'char': 'a'})
        assert first_id == 1
        second_id = store.insert({'int': 1, 'char': 'a'})
        assert second_id == 2

        # A bulk insert after another purge returns the ids in order.
        store.purge()
        documents = [{'int': 1, 'char': 'a'},
                     {'int': 1, 'char': 'b'},
                     {'int': 1, 'char': 'c'}]
        assert store.insert_multiple(documents) == [1, 2, 3]

        assert store.contains(eids=[1, 2])
        assert not store.contains(eids=[88])

        store.update({'int': 2}, eids=[1, 2])
        assert store.count(where('int') == 2) == 2

        # Lookup by id returns the same element; a NaN id finds nothing.
        first_element = store.all()[0]
        assert store.get(eid=first_element.eid) == first_element
        assert store.get(eid=float('NaN')) is None

        store.remove(eids=[1, 2])
        assert len(store) == 1