我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用tinydb.TinyDB()。
def __init__(self, db=None, topic=None, starter=None, enders=['.', '!', '?', ';', ',', ':']):
    """Open the backing TinyDB file and reset all derived-text state.

    NOTE(review): the mutable default for ``enders`` is shared across
    calls; kept as-is to preserve the public interface.
    """
    # Source database and generation parameters.
    self.db = TinyDB(db)
    self.topic = topic
    self.starter = starter
    self.enders = enders
    # Raw corpus text, populated later.
    self.text = None
    # Tokenised views of the corpus.
    self.sentences = []
    self.words = []
    self.bigrams = []
    self.trigrams = []
    self.starters = []
    # Transition table and frequency counters.
    self.transitions = defaultdict(list)
    self.wordFreq = defaultdict(Counter)
    self.bigramFreq = defaultdict(Counter)
    # Eagerly load everything from the database.
    self.getAll()
def insert_entity(projectId,product, categories, table_name, version="v1",prefix="",items="items"):
    # Fetch every item of a Google Cloud API listing and store it in a
    # per-project TinyDB table.
    #
    # projectId  -- GCP project id; also names the db file project_dbs/<id>.json
    # product    -- API name passed to discovery.build (e.g. "compute")
    # categories -- chain of attribute names used to drill into the service
    #               object; consumed destructively (pop(0))
    # table_name -- TinyDB table receiving the listed items
    # version    -- API version for discovery.build
    # prefix     -- optional prefix prepended to projectId for list(project=...)
    # items      -- key of the response field holding the listed objects
    db = TinyDB("project_dbs/" + projectId + ".json")
    service = discovery.build(product, version, credentials=storage.get())
    # Walk down the nested API surface: each category name is a callable
    # attribute returning the next resource object.
    while categories:
        api_entity = getattr(service, categories.pop(0))()
        service = api_entity
    request = api_entity.list(project=prefix+projectId)
    try:
        # Standard Google API pagination loop.
        while request is not None:
            response = request.execute()
            for item in response[items]:
                db.table(table_name).insert(item)
            try:
                request = api_entity.list_next(previous_request=request, previous_response=response)
            except AttributeError:
                # Resource has no list_next -> single-page listing.
                request = None
    except KeyError:
        # Response had no `items` field; treat as an empty listing.
        pass
def add_to_history(user_history, sender, receiver, amount, action, finish=False, tx_id="", tip_id=""):
    """Append one history record to the per-user TinyDB history file."""
    # Accept either a User object or a plain username string.
    if type(user_history) is models.User:
        user_history = user_history.username
    # No id supplied: generate a pseudo-random one.
    if tip_id == "":
        tip_id = random.randint(0, 99999999)
    bot_logger.logger.info("Save for history user=%s, sender=%s, receiver=%s, amount=%s, action=%s, finish=%s" % (
        user_history, sender, receiver, amount, action, finish))
    record = {
        "id": tip_id,
        "user": user_history,
        "sender": sender,
        "receiver": receiver,
        "amount": amount,
        "action": action,
        "finish": finish,
        "status": "",
        "tx_id": tx_id,
        'time': datetime.datetime.now().isoformat(),
    }
    db = TinyDB(config.history_path + user_history + '.json')
    db.insert(record)
    db.close()
def add_to_history_tip(user_history, action, tip):
    """Record a tip object in the user's TinyDB history file."""
    # Accept either a User object or a plain username string.
    if type(user_history) is models.User:
        user_history = user_history.username
    bot_logger.logger.info("Save for history user=%s, sender=%s, receiver=%s, amount=%s, action=%s, finish=%s" % (
        user_history, tip.sender.username, tip.receiver.username, tip.amount, action, tip.finish))
    entry = {
        "user": user_history,
        "id": tip.id,
        "sender": tip.sender.username,
        "receiver": tip.receiver.username,
        "amount": tip.amount,
        "action": action,
        "finish": tip.finish,
        "status": tip.status,
        "tx_id": tip.tx_id,
        'time': tip.time,
    }
    db = TinyDB(config.history_path + user_history + '.json')
    db.insert(entry)
    db.close()
def update_tip(user_history, tip):
    """Update the finish/tx_id/status fields of a stored tip record."""
    # Accept either a User object or a plain username string.
    if type(user_history) is models.User:
        user_history = user_history.username
    # update only finish tips
    bot_logger.logger.info("update history for user=%s, tip.tx_id=%s" % (user_history, tip.tx_id))
    if tip.id is None:
        bot_logger.logger.warn("update history fail user=%s, tip.id=%s" % (user_history, tip.id))
        return
    bot_logger.logger.info("update history for user=%s, tip.id=%s" % (user_history, tip.id))
    db = TinyDB(config.history_path + user_history + '.json')
    tip_query = Query()
    # One update call per field, matching records by tip id.
    for field, value in (('finish', tip.finish), ('tx_id', tip.tx_id), ('status', tip.status)):
        db.update({field: value}, tip_query.id == tip.id)
    db.close()
def update_withdraw(user_history, status, tx_id, tip_id):
    """Mark a stored withdraw record as finished and attach its tx id."""
    # Accept either a User object or a plain username string.
    if type(user_history) is models.User:
        user_history = user_history.username
    # update only finish tips
    if tip_id is None:
        bot_logger.logger.warn("update history fail user=%s, tip.id=%s" % (user_history, tip_id))
        return
    bot_logger.logger.info("update history for user=%s, tip.id=%s" % (user_history, tip_id))
    db = TinyDB(config.history_path + user_history + '.json')
    tip_query = Query()
    # One update call per field, matching records by tip id.
    for field, value in (('finish', status), ('tx_id', tx_id), ('status', "finish")):
        db.update({field: value}, tip_query.id == tip_id)
    db.close()
def add_address(username, address, active=True):
    """Register an address for a user unless it is already stored."""
    # sanitize (just lower); `unicode` indicates a Python 2 code base
    username = str(unicode(username).lower())
    table = TinyDB(config.user_file).table(username)
    user_db = Query()
    # Refuse duplicates.
    if table.count(user_db.address == address) != 0:
        bot_logger.logger.error("address %s already registered for %s " % (str(address), str(username)))
        return
    table.insert({"type": "simple", "address": address, "coin": "doge", "enable": False})
    if active is True:
        UserStorage.active_user_address(username, address)
def get_user_address(cls, username):
    """Return the user's currently enabled address, or None."""
    # sanitize (just lower); `unicode` indicates a Python 2 code base
    username = str(unicode(username).lower())
    if not UserStorage.exist(username):
        bot_logger.logger.error("get address of un-registered user %s " % (str(username)))
        return None
    table = TinyDB(config.user_file).table(username)
    user_db = Query()
    matches = table.search(user_db.enable == True)
    if not matches:
        # username not found
        return None
    return matches[0].get('address')
def active_user_address(cls, username, address):
    # Mark `address` as the single enabled address for `username`,
    # disabling any previously enabled ones first.
    #
    # sanitize (just lower); `unicode` indicates a Python 2 code base
    username = str(unicode(username).lower())
    if UserStorage.exist(username):
        db = TinyDB(config.user_file)
        table = db.table(username)
        # check if address not already exist
        user_db = Query()
        data = table.count(user_db.address == address)
        if data == 1:
            # disable all other address
            enabled_address = table.search(user_db.enable == True)
            for item in enabled_address:
                # `eid`/`eids=` is the legacy TinyDB (<4.0) element-id API.
                table.update({"enable": False}, eids=[item.eid])
            # enable only one
            table.update({"enable": True}, user_db.address == address)
        else:
            bot_logger.logger.error("active a not found address (%s) of user %s " % (str(address), str(username)))
    else:
        bot_logger.logger.error("active address of un-registered user %s " % (str(username)))
def number_gold_credit():
    """Compute the remaining gold credit balance from the purchase log."""
    db = TinyDB(config.DATA_PATH + 'reddit_gold.json')
    data = db.all()
    db.close()
    # "buy" entries consume credits, "refill" entries add them;
    # any other status contributes nothing.
    sign = {"buy": -1, "refill": 1}
    return sum(sign[gold['status']] * int(gold['quantity'])
               for gold in data if gold['status'] in sign)
def test_caching_json_write(tmpdir):
    """CachingMiddleware must flush its cache to disk on close()."""
    db_path = str(tmpdir.join('test.db'))
    db = TinyDB(db_path, storage=CachingMiddleware(JSONStorage))
    db.insert({'key': 'value'})
    db.close()
    # Verify database filesize
    assert os.stat(db_path).st_size != 0
    # Assert JSON file has been closed
    assert db._storage._handle.closed
    del db
    # Repoen database
    reopened = TinyDB(db_path, storage=CachingMiddleware(JSONStorage))
    assert reopened.all() == [{'key': 'value'}]
def test_json_kwargs(tmpdir):
    # Keyword arguments given to TinyDB (sort_keys/indent/separators) must
    # be forwarded to json.dump, shaping the on-disk representation.
    db_file = tmpdir.join('test.db')
    db = TinyDB(str(db_file), sort_keys=True, indent=4, separators=(',', ': '))
    # Write contents
    db.insert({'b': 1})
    db.insert({'a': 1})
    # NOTE(review): the expected literal below appears whitespace-mangled
    # (originally a multi-line pretty-printed JSON document) — confirm
    # against the upstream test before relying on it.
    assert db_file.read() == '''{ "_default": { "1": { "b": 1 }, "2": { "a": 1 } } }'''
def test_json_readwrite(tmpdir):
    """ Regression test for issue #1 """
    path = str(tmpdir.join('test.db'))
    # Create TinyDB instance
    db = TinyDB(path, storage=JSONStorage)

    def find_by_name(name):
        return db.get(where('name') == name)

    item = {'name': 'A very long entry'}
    item2 = {'name': 'A short one'}

    db.insert(item)
    assert find_by_name('A very long entry') == item
    db.remove(where('name') == 'A very long entry')
    assert find_by_name('A very long entry') is None

    db.insert(item2)
    assert find_by_name('A short one') == item2
    db.remove(where('name') == 'A short one')
    assert find_by_name('A short one') is None
def get_xueqiu_hold(cube_symbol,cube_weight):
    # Scrape the holdings of a Xueqiu "cube" (portfolio) page and snapshot
    # them into data/db_holding.json under a table named after the symbol.
    #
    # cube_symbol -- portfolio identifier appended to cube_hold_url
    # cube_weight -- unused except by the commented-out experiments below
    db = TinyDB('data/db_holding.json')
    print(cube_symbol)
    table = db.table(cube_symbol)
    # Drop any previously stored snapshot for this cube.
    db.purge_table(cube_symbol)
    req = urllib.request.Request(cube_hold_url+cube_symbol,headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 5.1; rv:33.0) Gecko/20100101 Firefox/33.0',
        'cookie':cookie
    })
    soup = urllib.request.urlopen(req).read().decode('utf-8')
    soup = BeautifulSoup(soup, 'lxml')
    # The holdings live in an inline <script> as `SNB.cubeInfo = {...};`.
    script = soup.find('script', text=re.compile('SNB\.cubeInfo'))
    json_text = re.search(r'^\s*SNB\.cubeInfo\s*=\s*({.*?})\s*;\s*$', script.string, flags=re.DOTALL | re.MULTILINE).group(1)
    # json_text.append({'cube_symbol':cube_symbol}).append({'cube_weight':cube_weight})
    data = json.loads(json_text)
    # data.update({'cube_symbol',cube_symbol}).update({'cube_weight',cube_weight})
    # data["view_rebalancing"]["holdings"].append("cube_symbol : "+cube_symbol)
    # data["view_rebalancing"]["holdings"].append("cube_weight : "+cube_weight)
    # print(data["view_rebalancing"]["holdings"])
    # data["view_rebalancing"]["holdings"]
    # Store the raw holdings list as a single record.
    table.insert({"cube_symbol":data["view_rebalancing"]["holdings"]})
    # for row in table:
    #     print(row)
def get_xueqiu_cube_list(category,count,orderby):
    """Download the Xueqiu cube ranking list and snapshot it into TinyDB."""
    url = cube_list_url + "?category=" + category + "&count=" + count + "&market=cn&profit=" + orderby
    payload = json.loads(request(url, cookie).read())
    db = TinyDB('data/db_cube.json')
    table = db.table("Cube")
    # Replace the previous snapshot entirely.
    db.purge_table("Cube")
    for cube in payload["list"]:
        table.insert(cube)
def SQSDatabase(path, name_constraint=''):
    """Convenience function to create a TinyDB for the SQS database found at `path`.

    Parameters
    ----------
    path : path-like of the folder containing the SQS database.
    name_constraint : Any name constraint to add into the recursive glob. Not case sensitive. Exact substring.

    Returns
    -------
    TinyDB
        Database of abstract SQS.
    """
    db = TinyDB(storage=MemoryStorage)
    constraint = name_constraint.upper()
    for fname in recursive_glob(path, '*.json'):
        # Case-insensitive substring filter on the file name.
        if constraint not in fname.upper():
            continue
        with open(fname) as file_:
            try:
                db.insert(json.load(file_))
            except ValueError as e:
                raise ValueError('JSON Error in {}: {}'.format(fname, e))
    return db
def __init__(self, publisher, settings):
    # Prepare the on-disk TinyDB used to persist worksets.
    self.logger = logging.getLogger(' Db')
    try:
        if not os.path.exists(settings.db_dir):
            self.logger.info("db directory doesn't exist - creating...")
            os.makedirs(settings.db_dir)
    except IOError as e:
        self.logger.critical("Couldn't create directory " + settings.db_dir + " : " + str(e))
    self.db_file = 'db.json'
    db_path = settings.db_dir + '/' + self.db_file
    self.publisher = publisher
    try:
        if not os.path.isfile(db_path):
            self.logger.info("db file doesn't exist - creating...")
            # First-time setup: create the file, add the worksets table and
            # drop the implicit default table.
            self.db = TinyDB(db_path)
            self.db.table('worksets')
            self.db.purge_table('default')
    except IOError as e:
        self.logger.critical("Couldn't create db file: " + str(e))
    # (Re)open the database unconditionally.
    # NOTE(review): indentation reconstructed from a collapsed source —
    # confirm these last two statements sit outside the try block upstream.
    self.db = TinyDB(db_path)
    self.w_query = Query()
def init_bridge():
    """Parse the configuration file and set relevant variables."""
    global DB
    conf_path = os.path.abspath(os.getenv('WAT_CONF', ''))
    if not conf_path or not os.path.isfile(conf_path):
        sys.exit('Could not find configuration file')
    parser = configparser.ConfigParser()
    parser.read(conf_path)
    # Whatsapp settings
    SETTINGS['wa_phone'] = parser.get('wa', 'phone')
    SETTINGS['wa_password'] = parser.get('wa', 'password')
    # Telegram settings
    SETTINGS['owner'] = parser.getint('tg', 'owner')
    SETTINGS['tg_token'] = parser.get('tg', 'token')
    # TinyDB backing store, with the smart-caching table implementation.
    DB = TinyDB(parser.get('db', 'path'))
    DB.table_class = SmartCacheTable
def generate_db(tsv_file, db_file):
    """ Create db from tsv file """
    logger.info("Converting tsv %s to db file %s", tsv_file, db_file)
    # Always start from a clean database file.
    if os.path.exists(db_file):
        os.remove(db_file)
    db = TinyDB(db_file)
    with codecs.open(tsv_file, "rb", encoding="utf-8") as f:
        header_row = f.readline().split("\t")
        headers = [SanskritObject(col).canonical() for col in header_row[0:8]]
        logger.info("Found dhatu tsv headers: {}".format(str(headers)))
        # FIXME - Rewrite from here
        for line in f:
            values = [SanskritObject(v).canonical() for v in line.split("\t")[:len(headers)]]
            db.insert(dict(zip(headers, values)))
    db.close()
    logger.info("Saved dhatus database")
def __init__(self, db='nonedb.json'):
    """Open the TinyDB file with datetime serialization and a smart cache."""
    # Storage and serialization: register a serializer so datetime values
    # survive the JSON round-trip.
    storage = SerializationMiddleware(tinydb.storages.JSONStorage)
    storage.register_serializer(DateTimeSerializer(), 'TinyDateTime')
    # A reference to the actual database object.
    self._conn = tinydb.TinyDB(db, storage=storage)
    # Activate SmartCache
    self._conn.table_class = SmartCacheTable
    # A shortcut to ``tinydb.TinyDB.table`` method.
    # See http://tinydb.readthedocs.org/en/latest/usage.html#tables for reference.
    self.table = self._conn.table
    # A shortcut to ``tinydb.where`` object.
    # See http://tinydb.readthedocs.org/en/latest/usage.html#queries for reference.
    self.where = tinydb.where
def search_db(self, query):
    """Run `query` against the result-log database.

    Returns the list of matching records (possibly empty; TinyDB's
    `search` already returns [] when nothing matches).
    """
    file_name = self.items["result log"]["db_file"]
    with TinyDB(file_name) as db_entity:
        # Bug fix: removed an unreachable trailing `return None` that
        # followed this return statement.
        return db_entity.search(query)
def save_result_summaries(self, result_summaries):
    """Insert multiple result summaries; return their new element ids."""
    f_name = self.items["result log"]["db_file"]
    with TinyDB(f_name) as db_entity:
        # Bug fix: removed an unreachable trailing `return None` that
        # followed this return statement.
        return db_entity.insert_multiple(result_summaries)
def update_result_summary_in_db(self, result_summary, eids):
    """Update the records identified by `eids`; return the updated ids.

    Uses the legacy TinyDB `eids=` keyword (TinyDB < 4).
    """
    f_name = self.items["result log"]["db_file"]
    with TinyDB(f_name) as db_entity:
        # Bug fix: removed an unreachable trailing `return True` that
        # followed this return statement.
        return db_entity.update(result_summary, eids=eids)
def dump_log(self):
    """Return every record stored in the result-log database."""
    db_file = self.items["result log"]["db_file"]
    with TinyDB(db_file) as db_entity:
        return db_entity.all()
def remove_summaries_by_eids(self, eids):
    """Delete the result summaries with the given (legacy) element ids."""
    db_file = self.items["result log"]["db_file"]
    with TinyDB(db_file) as db_entity:
        return db_entity.remove(eids=eids)
def __init__(self):
    """Adds self to messages and event's `data` field.

    Through this instance you can access TinyDB instance
    (data["tinydbproxy"].tinydb). This plugin should be included first!
    """
    super().__init__()
    # Cache writes in memory; CachingMiddleware flushes them on close().
    storage = CachingMiddleware(JSONStorage)
    self.tinydb = TinyDB(path=self.get_path("tinydb_database.json"), storage=storage)
def _opendb(self):
    """Open the TinyDB behind a write-caching middleware."""
    middleware = CachingMiddleware(JSONStorage)
    # Flush to disk only every 500 cached writes.
    middleware.WRITE_CACHE_SIZE = 500
    self.middleware = middleware
    self.db = TinyDB(self.conn_str, storage=self.middleware, default_table=self.default_table)
def main():
    """Method to put it all together"""
    db = TinyDB(os.path.join(os.getcwd(), DATABASE_NAME))
    for sort_type in ["hot", "new", "top"]:
        logger.debug("Fetching url: {}".format(sort_type))
        json_data = get_json_from_subreddit(sort_type, 100)
        # (removed an unused `json_list` accumulator that was never read)
        for data in json_data:
            for post in data['data']['children']:
                # Skip posts that fail validation or can't be converted.
                if is_post_valid(post):
                    j = convert_post_to_json(post)
                    if j is not None:
                        insert_into_database(db, j)
def get_genres(database_name):
    """Utility method to get all the genres as a set"""
    db = TinyDB(os.path.join(os.getcwd(), database_name))
    raw_genres = {song['genre'] for song in db.all()}
    # Split compound entries like "A/B" into individual genres.
    specific_genres = {part for genre in raw_genres for part in genre.strip().split('/')}
    db.close()
    return _strip_spaces(specific_genres)
def get_total_songs(database_name):
    """Utility Method to get the total number of songs in the database"""
    db = TinyDB(os.path.join(os.getcwd(), database_name))
    count = len(db.all())
    db.close()
    return count
def setUp(self):
    """Load the JSON fixture and populate a fresh test database."""
    with open(os.path.join(BASE_DIR, 'tests', 'fixtures', 'test.json')) as f:
        self.j = json.load(f)
    self.database_name = os.path.join(os.getcwd(), 'test.db')
    db = TinyDB(self.database_name)
    db.insert_multiple(self.j)
    # Bug fix: `db.close` was a bare attribute reference and never actually
    # called, leaving the database handle open.
    db.close()
def __init__(self, page=None, db=None):
    """Store a crawled WebPage's fields in the given TinyDB file."""
    self.db = TinyDB(db)
    self.page = page
    # TODO need to do some check that database is well formed
    # TODO and that the page is a well-formed WebPage object
    record = {
        'url': self.page.link,
        'title': self.page.title,
        'text': self.page.text,
        'links': self.page.links,
    }
    self.db.insert(record)
def gatherText(self):
    '''
    Takes a TinyDB with a "text" field and combines all the text
    into a one list of texts
    '''
    # Performance fix: join once instead of repeated `+` concatenation
    # (which is quadratic). Each cleaned chunk keeps its leading space,
    # so the result is byte-identical to the original loop's output.
    self.text = "".join(" " + clean_text(item['text']) for item in self.db)
def set(self, name, value):
    """Insert or overwrite the setting `name` with `value`."""
    TinyDB(self.filename).upsert({'name': name, 'value': value}, where('name') == name)
def get(self, name, default=None):
    """Return the stored value for `name`, or `default` if absent."""
    record = TinyDB(self.filename).get(where('name') == name)
    if record is None:
        return default
    return record.get('value', default)
def __init__(self):
    """Open the public-files database and its table."""
    self.node_details_content = None
    db = TinyDB('public_files.json')
    self.public_files_db = db
    self.table = db.table('public_files')
def __init__(self):
    """Open the playlists database and its two tables."""
    db = TinyDB('ownstorj_playlists.json')
    self.ownstorj_playlists_db = db
    self.playlists_table = db.table('playlists')
    self.tracks_table = db.table('tracks')
def __init__(self, **attributes):
    """Build a model instance backed by a TinyDB table named after its class."""
    # Replace the instance dict wholesale with the supplied attributes.
    # This must happen first: the assignments below repopulate it.
    self.__dict__ = attributes
    self.db = TinyDB(TINY_DB_PATH)
    # One table per model class.
    table_name = self.__class__.__name__
    self.table = self.db.table(table_name)
def list_log_services():
    # Query the (v1beta3) Cloud Logging REST API for the project's log
    # services and return their names.
    #
    # NOTE(review): `.all()` returns a *list* of records, not a project-id
    # string, yet it is concatenated into the URL below — this looks broken
    # (TypeError at runtime); confirm what the "Project" table stores.
    projectId = TinyDB('projects.json').table("Project").all()
    resp, content = storage.get().authorize(Http()).request("https://logging.googleapis.com/v1beta3/projects/" + projectId + "/logServices","GET")
    return [service['name'] for service in json.loads(content)['logServices']]
def insert_templates():
    # Page through the project's instance templates and store each one in
    # the module-level `template_table`.
    #
    # NOTE(review): `.all()` yields a list of records; passing it as
    # `project=` is suspicious — confirm the expected shape.
    projectId = TinyDB('projects.json').table("Project").all()
    request = instanceTemplates.list(project=projectId)
    try:
        # Standard Google API pagination loop.
        while request is not None:
            response = request.execute()
            for instanceTemplate in response['items']:
                template_table.insert(instanceTemplate)
            request = instanceTemplates.list_next(previous_request=request, previous_response=response)
    except KeyError:
        # Response without 'items' => nothing to store.
        pass
def insert_instance_groups():
    # Page through every zone's instance groups and store each one in the
    # module-level `group_table`.
    #
    # NOTE(review): `.all()` yields a list of records; passing it as
    # `project=` is suspicious — confirm the expected shape.
    projectId = TinyDB('projects.json').table("Project").all()
    for zone in get_zones():
        request = instanceGroups.list(project=projectId, zone=zone)
        try:
            # Standard Google API pagination loop.
            while request is not None:
                response = request.execute()
                for instanceGroup in response['items']:
                    group_table.insert(instanceGroup)
                request = instanceGroups.list_next(previous_request=request, previous_response=response)
        except KeyError:
            # Zone without 'items' => no groups to store.
            pass
def get_zones():
    """Return the names of all compute zones visible to the project."""
    # NOTE(review): `.all()` returns a list of records; passing it as
    # `project=` looks suspicious but is preserved as-is.
    projectId = TinyDB('projects.json').table("Project").all()
    request = zones.list(project=projectId)
    items = request.execute()['items']
    return [zone['name'] for zone in items]
def loadDBs(service, indexFiles):
    # Build one TinyDB per index file for `service`, populating each empty
    # database from its matching CSV price list.
    #
    # service    -- service name; selects the data directory and /tmp path
    # indexFiles -- iterable of index-file base names (no extension)
    # Returns (dict of name -> TinyDB, index metadata).
    dBs = {}
    datadir = get_data_directory(service)
    indexMetadata = getIndexMetadata(service)
    #Files in Lambda can only be created in the /tmp filesystem - If it doesn't exist, create it.
    lambdaFileSystem = '/tmp/'+service+'/data'
    if not os.path.exists(lambdaFileSystem):
        os.makedirs(lambdaFileSystem)
    for i in indexFiles:
        db = tinydb.TinyDB(lambdaFileSystem+'/'+i+'.json')
        #TODO: remove circular dependency from utils, so I can use the method get_index_file_name
        #TODO: initial tests show that is faster (by a few milliseconds) to populate the file from scratch). See if I should load from scratch all the time
        #TODO:Create a file that is an index of those files that have been generated, so the code knows which files to look for and avoid creating unnecesary empty .json files
        # Only (re)populate databases that are still empty.
        if len(db) == 0:
            try:
                # NOTE(review): 'rb' + csv.DictReader is Python 2 style;
                # under Python 3 this would need mode 'r' with newline=''.
                with open(datadir+i+'.csv', 'rb') as csvfile:
                    pricelist = csv.DictReader(csvfile, delimiter=',', quotechar='"')
                    db.insert_multiple(pricelist)
            except IOError:
                # Missing CSV: leave this index empty.
                pass
        dBs[i]=db
    return dBs, indexMetadata
def __init__(self):
    """Open the cache database stored next to this module."""
    module_dir = os.path.join(os.getcwd(), os.path.dirname(__file__))
    self.db = TinyDB(os.path.join(module_dir, ListCache.DB_FILE))