Python tinydb 模块,TinyDB() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用tinydb.TinyDB()

项目:ask_data_science    作者:AngelaVC    | 项目源码 | 文件源码
def __init__(self, db=None, topic=None, starter=None, enders=None):
    """Open the TinyDB store, reset all model state, then call ``getAll()``.

    Args:
        db: Passed straight to ``TinyDB(...)``.
        topic: Stored on ``self.topic``.
        starter: Stored on ``self.starter``.
        enders: Stored on ``self.enders`` (presumably sentence-ending
            punctuation). When omitted, a fresh
            ``['.', '!', '?', ';', ',', ':']`` list is created per instance.
    """
    # A list literal in the signature is built once at definition time and
    # would be shared (and mutable) across every instance using the default.
    if enders is None:
        enders = ['.', '!', '?', ';', ',', ':']

    self.db = TinyDB(db)
    self.enders = enders
    self.topic = topic
    self.starter = starter
    self.text = None
    self.sentences = []
    self.words = []
    self.bigrams = []
    self.trigrams = []
    self.transitions = defaultdict(list)
    self.starters = []
    self.wordFreq = defaultdict(Counter)
    self.bigramFreq = defaultdict(Counter)

    self.getAll()
项目:G-Scout    作者:nccgroup    | 项目源码 | 文件源码
def insert_entity(projectId, product, categories, table_name, version="v1", prefix="", items="items"):
    """List every entity of one API resource and insert each into a TinyDB table.

    Args:
        projectId: Project identifier; also names the ``project_dbs/<id>.json`` file.
        product: API name passed to ``discovery.build``.
        categories: Attribute names resolved one after another on the service
            object to reach the resource exposing ``list``. Must be non-empty.
        table_name: TinyDB table that receives the items.
        version: API version passed to ``discovery.build``.
        prefix: String prepended to ``projectId`` in the ``list`` call.
        items: Key of the response dict that holds the result list.
    """
    db = TinyDB("project_dbs/" + projectId + ".json")
    service = discovery.build(product, version, credentials=storage.get())

    # Walk the resource chain. Iterate over a copy so the caller's list is
    # left intact and can be reused for another project.
    for category in list(categories):
        api_entity = getattr(service, category)()
        service = api_entity

    request = api_entity.list(project=prefix + projectId)
    table = db.table(table_name)
    try:
        while request is not None:
            response = request.execute()
            for item in response[items]:
                table.insert(item)
            try:
                request = api_entity.list_next(previous_request=request, previous_response=response)
            except AttributeError:
                # Resource has no list_next method: treat the first page as the last.
                request = None
    except KeyError:
        # Response without the `items` key: nothing (more) to store.
        pass
项目:sodogetip    作者:just-an-dev    | 项目源码 | 文件源码
def add_to_history(user_history, sender, receiver, amount, action, finish=False, tx_id="", tip_id=""):
    """Append one entry to the JSON history file of ``user_history``.

    Args:
        user_history: Username string or ``models.User`` (its ``username`` is used).
        sender, receiver, amount, action: Stored verbatim in the entry.
        finish: Stored as the ``finish`` field.
        tx_id: Stored as the ``tx_id`` field.
        tip_id: Entry id; when left as ``""`` a random integer id is generated.
    """
    # convert object to string of name if necessary
    if type(user_history) is models.User:
        user_history = user_history.username

    if tip_id == "":
        tip_id = random.randint(0, 99999999)

    bot_logger.logger.info("Save for history user=%s, sender=%s, receiver=%s, amount=%s, action=%s, finish=%s" % (
        user_history, sender, receiver, amount, action, finish))

    db = TinyDB(config.history_path + user_history + '.json')
    try:
        db.insert({
            "id": tip_id,
            "user": user_history,
            "sender": sender,
            "receiver": receiver,
            "amount": amount,
            "action": action,
            "finish": finish,
            "status": "",
            "tx_id": tx_id,
            'time': datetime.datetime.now().isoformat(),
        })
    finally:
        # Release the file handle even if the insert fails.
        db.close()
项目:sodogetip    作者:just-an-dev    | 项目源码 | 文件源码
def add_to_history_tip(user_history, action, tip):
    """Append a history entry built from the fields of a tip object.

    Args:
        user_history: Username string or ``models.User`` (its ``username`` is used).
        action: Stored as the ``action`` field.
        tip: Object providing ``id``, ``sender``, ``receiver``, ``amount``,
            ``finish``, ``status``, ``tx_id`` and ``time``.
    """
    # convert object to string of name if necessary
    if type(user_history) is models.User:
        user_history = user_history.username

    bot_logger.logger.info("Save for history user=%s, sender=%s, receiver=%s, amount=%s, action=%s, finish=%s" % (
        user_history, tip.sender.username, tip.receiver.username, tip.amount, action, tip.finish))

    db = TinyDB(config.history_path + user_history + '.json')
    try:
        db.insert({
            "user": user_history,
            "id": tip.id,
            "sender": tip.sender.username,
            "receiver": tip.receiver.username,
            "amount": tip.amount,
            "action": action,
            "finish": tip.finish,
            "status": tip.status,
            "tx_id": tip.tx_id,
            'time': tip.time,
        })
    finally:
        # Release the file handle even if the insert fails.
        db.close()
项目:sodogetip    作者:just-an-dev    | 项目源码 | 文件源码
def update_tip(user_history, tip):
    """Copy ``finish``, ``tx_id`` and ``status`` from ``tip`` onto its history entry.

    Args:
        user_history: Username string or ``models.User`` (its ``username`` is used).
        tip: Tip object; entries whose ``id`` equals ``tip.id`` are updated.
            Nothing is written when ``tip.id`` is ``None``.
    """
    # convert object to string of name if necessary
    if type(user_history) is models.User:
        user_history = user_history.username

    # update only finish tips
    bot_logger.logger.info("update history for user=%s, tip.tx_id=%s" % (user_history, tip.tx_id))
    if tip.id is not None:
        bot_logger.logger.info("update history for user=%s, tip.id=%s" % (user_history, tip.id))

        db = TinyDB(config.history_path + user_history + '.json')
        try:
            # One update call sets all three fields on the matching entries
            # instead of three separate passes over the table.
            db.update(
                {'finish': tip.finish, 'tx_id': tip.tx_id, 'status': tip.status},
                Query().id == tip.id,
            )
        finally:
            db.close()
    else:
        bot_logger.logger.warn("update history fail user=%s, tip.id=%s" % (user_history, tip.id))
项目:sodogetip    作者:just-an-dev    | 项目源码 | 文件源码
def update_withdraw(user_history, status, tx_id, tip_id):
    """Mark the history entry ``tip_id`` as finished and record its transaction id.

    Args:
        user_history: Username string or ``models.User`` (its ``username`` is used).
        status: Value written to the entry's ``finish`` field.
        tx_id: Value written to the entry's ``tx_id`` field.
        tip_id: Id of the entry to update; nothing is written when ``None``.
    """
    # convert object to string of name if necessary
    if type(user_history) is models.User:
        user_history = user_history.username

    # update only finish tips
    if tip_id is not None:
        bot_logger.logger.info("update history for user=%s, tip.id=%s" % (user_history, tip_id))

        db = TinyDB(config.history_path + user_history + '.json')
        try:
            # One update call sets all three fields on the matching entries.
            db.update(
                {'finish': status, 'tx_id': tx_id, 'status': "finish"},
                Query().id == tip_id,
            )
        finally:
            db.close()
    else:
        bot_logger.logger.warn("update history fail user=%s, tip.id=%s" % (user_history, tip_id))
项目:sodogetip    作者:just-an-dev    | 项目源码 | 文件源码
def add_address(username, address, active=True):
    """Register ``address`` in the user's table unless it is already present.

    A new address is inserted disabled; when ``active`` is ``True`` it is then
    activated through ``UserStorage.active_user_address``. A duplicate address
    only produces an error log entry.
    """
    # sanitize (just lower)
    username = str(unicode(username).lower())

    db = TinyDB(config.user_file)
    user_table = db.table(username)

    # An address may be registered only once per user.
    known_count = user_table.count(Query().address == address)
    if known_count != 0:
        bot_logger.logger.error("address %s already registered for  %s " % (str(address), str(username)))
        return

    user_table.insert({"type": "simple", "address": address, "coin": "doge", "enable": False})
    if active is True:
        UserStorage.active_user_address(username, address)
项目:sodogetip    作者:just-an-dev    | 项目源码 | 文件源码
def get_user_address(cls, username):
    """Return the first enabled address of ``username``, or ``None``.

    ``None`` is returned both when the user has no enabled address and when
    the user is not registered (the latter is also logged as an error).
    """
    # sanitize (just lower)
    username = str(unicode(username).lower())

    if not UserStorage.exist(username):
        bot_logger.logger.error("get address of un-registered user  %s " % (str(username)))
        return None

    db = TinyDB(config.user_file)
    enabled_rows = db.table(username).search(Query().enable == True)
    # No enabled row means there is no address to hand out.
    return enabled_rows[0].get('address') if len(enabled_rows) > 0 else None
项目:sodogetip    作者:just-an-dev    | 项目源码 | 文件源码
def active_user_address(cls, username, address):
    """Make ``address`` the only enabled address of ``username``.

    Nothing is changed (an error is logged instead) when the user is not
    registered or the address is not stored exactly once in the user's table.
    """
    # sanitize (just lower)
    username = str(unicode(username).lower())

    if not UserStorage.exist(username):
        bot_logger.logger.error("active address of un-registered user  %s " % (str(username)))
        return

    db = TinyDB(config.user_file)
    user_table = db.table(username)
    row = Query()

    # The address must be present exactly once before it can be activated.
    if user_table.count(row.address == address) != 1:
        bot_logger.logger.error("active a not found address (%s)  of user  %s " % (str(address), str(username)))
        return

    # Switch off every currently enabled address ...
    for enabled_row in user_table.search(row.enable == True):
        user_table.update({"enable": False}, eids=[enabled_row.eid])

    # ... then switch on the requested one.
    user_table.update({"enable": True}, row.address == address)
项目:sodogetip    作者:just-an-dev    | 项目源码 | 文件源码
def number_gold_credit():
    """Return the credit balance computed from ``reddit_gold.json``.

    Every record with status ``"refill"`` adds its ``quantity`` to the
    balance, every record with status ``"buy"`` subtracts it; records with
    any other status are ignored.
    """
    db = TinyDB(config.DATA_PATH + 'reddit_gold.json')
    records = db.all()
    db.close()

    balance = 0
    for record in records:
        status = record['status']
        if status == "buy":
            # credits spent
            balance -= int(record['quantity'])
        elif status == "refill":
            # credits added
            balance += int(record['quantity'])
    return balance
项目:annotated-py-tinydb    作者:hhstore    | 项目源码 | 文件源码
def test_caching_json_write(tmpdir):
    """Data written through CachingMiddleware is on disk after close() and readable after reopening."""
    db_path = str(tmpdir.join('test.db'))

    first = TinyDB(db_path, storage=CachingMiddleware(JSONStorage))
    first.insert({'key': 'value'})
    first.close()

    # After close() the database file must not be empty ...
    assert os.stat(db_path).st_size != 0
    # ... and the underlying JSON file handle must be closed.
    assert first._storage._handle.closed

    del first

    # Reopen the database and read the record back.
    reopened = TinyDB(db_path, storage=CachingMiddleware(JSONStorage))
    assert reopened.all() == [{'key': 'value'}]
项目:annotated-py-tinydb    作者:hhstore    | 项目源码 | 文件源码
def test_json_kwargs(tmpdir):
    """Extra keyword arguments given to TinyDB shape the JSON text written to the file."""
    db_file = tmpdir.join('test.db')
    json_options = dict(sort_keys=True, indent=4, separators=(',', ': '))
    db = TinyDB(str(db_file), **json_options)

    # Write contents
    for document in ({'b': 1}, {'a': 1}):
        db.insert(document)

    expected = '''{
    "_default": {
        "1": {
            "b": 1
        },
        "2": {
            "a": 1
        }
    }
}'''
    assert db_file.read() == expected
项目:annotated-py-tinydb    作者:hhstore    | 项目源码 | 文件源码
def test_json_readwrite(tmpdir):
    """
    Regression test for issue #1: insert, look up and remove a long entry,
    then do the same with a shorter one, on a JSONStorage-backed database.
    """
    # Create TinyDB instance
    db = TinyDB(str(tmpdir.join('test.db')), storage=JSONStorage)

    def fetch(name):
        # Look a document up by its 'name' field.
        return db.get(where('name') == name)

    long_item = {'name': 'A very long entry'}
    short_item = {'name': 'A short one'}

    # The long entry goes first, the shorter one second.
    for document in (long_item, short_item):
        name = document['name']

        db.insert(document)
        assert fetch(name) == document

        db.remove(where('name') == name)
        assert fetch(name) is None
项目:Xueqiu    作者:OliangchenO    | 项目源码 | 文件源码
def get_xueqiu_hold(cube_symbol,cube_weight):
    """Download a cube page and store its current holdings in TinyDB.

    The page is fetched from ``cube_hold_url + cube_symbol``, the JSON object
    assigned to ``SNB.cubeInfo`` in an inline script is parsed, and its
    ``["view_rebalancing"]["holdings"]`` value is inserted into the table
    named after ``cube_symbol`` in ``data/db_holding.json`` (the table is
    purged first).

    Args:
        cube_symbol: Cube identifier; used in the URL and as the table name.
        cube_weight: Currently unused.
    """
    db = TinyDB('data/db_holding.json')
    print(cube_symbol)
    table = db.table(cube_symbol)
    db.purge_table(cube_symbol)
    req = urllib.request.Request(cube_hold_url+cube_symbol,headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 5.1; rv:33.0) Gecko/20100101 Firefox/33.0',
                'cookie':cookie
               })
    html = urllib.request.urlopen(req).read().decode('utf-8')
    soup = BeautifulSoup(html, 'lxml')
    # Raw string: '\.' in a plain literal is an invalid escape sequence.
    script = soup.find('script', text=re.compile(r'SNB\.cubeInfo'))
    # Capture the object literal assigned to SNB.cubeInfo.
    json_text = re.search(r'^\s*SNB\.cubeInfo\s*=\s*({.*?})\s*;\s*$',
                      script.string, flags=re.DOTALL | re.MULTILINE).group(1)
    data = json.loads(json_text)
    table.insert({"cube_symbol":data["view_rebalancing"]["holdings"]})
项目:Xueqiu    作者:OliangchenO    | 项目源码 | 文件源码
def get_xueqiu_cube_list(category,count,orderby):
    """Fetch a cube ranking list and replace the ``Cube`` table with it.

    ``category``, ``count`` and ``orderby`` are concatenated into the query
    string as-is, so all three must already be strings. Every element of the
    response's ``"list"`` is inserted into ``data/db_cube.json``.
    """
    url = "".join([
        cube_list_url,
        "?category=", category,
        "&count=", count,
        "&market=cn&profit=", orderby,
    ])
    response = request(url,cookie)
    payload = json.loads(response.read())

    db = TinyDB('data/db_cube.json')
    cube_table = db.table("Cube")
    # Drop what an earlier run stored before inserting the fresh list.
    db.purge_table("Cube")
    for cube in payload["list"]:
        cube_table.insert(cube)
项目:prlworkflows    作者:PhasesResearchLab    | 项目源码 | 文件源码
def SQSDatabase(path, name_constraint=''):
    """
    Convenience function to build an in-memory TinyDB from the SQS JSON files under `path`.

    Parameters
    ----------
    path : path-like
        Folder that is searched with ``recursive_glob(path, '*.json')``.
    name_constraint : str
        Only files whose name contains this substring (compared
        case-insensitively) are loaded.

    Returns
    -------
    TinyDB
        Database holding one document per loaded JSON file.

    Raises
    ------
    ValueError
        If a file cannot be parsed as JSON; the message names the file.
    """
    db = TinyDB(storage=MemoryStorage)
    matching_files = [
        candidate
        for candidate in recursive_glob(path, '*.json')
        if name_constraint.upper() in candidate.upper()
    ]
    for json_path in matching_files:
        with open(json_path) as handle:
            try:
                document = json.load(handle)
                db.insert(document)
            except ValueError as e:
                raise ValueError('JSON Error in {}: {}'.format(json_path, e))
    return db
项目:Worksets    作者:DozyDolphin    | 项目源码 | 文件源码
def __init__(self, publisher, settings):
    """Ensure the db directory and file exist, then open the database.

    Args:
        publisher: Stored on ``self.publisher``.
        settings: Object providing ``db_dir``, the directory of ``db.json``.
    """
    self.logger = logging.getLogger(' Db')
    try:
        if not os.path.exists(settings.db_dir):
            self.logger.info("db directory doesn't exist - creating...")
            os.makedirs(settings.db_dir)
    except IOError as e:
        self.logger.critical("Couldn't create directory " + settings.db_dir + " : " + str(e))
    self.db_file = 'db.json'
    db_path = settings.db_dir + '/' + self.db_file
    self.publisher = publisher
    self.db = None
    try:
        if not os.path.isfile(db_path):
            self.logger.info("db file doesn't exist - creating...")
            self.db = TinyDB(db_path)
            self.db.table('worksets')
            self.db.purge_table('default')
    except IOError as e:
        self.logger.critical("Couldn't create db file: " + str(e))
    # Open the file only if the branch above did not already do so; opening
    # it a second time would drop the first handle without closing it.
    if self.db is None:
        self.db = TinyDB(db_path)
    self.w_query = Query()
项目:PokeBot    作者:bcc5160    | 项目源码 | 文件源码
def test_caching_json_write(tmpdir):
    """Data written through CachingMiddleware is on disk after close() and readable after reopening."""
    db_path = str(tmpdir.join('test.db'))

    first = TinyDB(db_path, storage=CachingMiddleware(JSONStorage))
    first.insert({'key': 'value'})
    first.close()

    # After close() the database file must not be empty ...
    assert os.stat(db_path).st_size != 0
    # ... and the underlying JSON file handle must be closed.
    assert first._storage._handle.closed

    del first

    # Reopen the database and read the record back.
    reopened = TinyDB(db_path, storage=CachingMiddleware(JSONStorage))
    assert reopened.all() == [{'key': 'value'}]
项目:PokeBot    作者:bcc5160    | 项目源码 | 文件源码
def test_json_kwargs(tmpdir):
    """Extra keyword arguments given to TinyDB shape the JSON text written to the file."""
    db_file = tmpdir.join('test.db')
    json_options = dict(sort_keys=True, indent=4, separators=(',', ': '))
    db = TinyDB(str(db_file), **json_options)

    # Write contents
    for document in ({'b': 1}, {'a': 1}):
        db.insert(document)

    expected = '''{
    "_default": {
        "1": {
            "b": 1
        },
        "2": {
            "a": 1
        }
    }
}'''
    assert db_file.read() == expected
项目:PokeBot    作者:bcc5160    | 项目源码 | 文件源码
def test_json_readwrite(tmpdir):
    """
    Regression test for issue #1: insert, look up and remove a long entry,
    then do the same with a shorter one, on a JSONStorage-backed database.
    """
    # Create TinyDB instance
    db = TinyDB(str(tmpdir.join('test.db')), storage=JSONStorage)

    def fetch(name):
        # Look a document up by its 'name' field.
        return db.get(where('name') == name)

    long_item = {'name': 'A very long entry'}
    short_item = {'name': 'A short one'}

    # The long entry goes first, the shorter one second.
    for document in (long_item, short_item):
        name = document['name']

        db.insert(document)
        assert fetch(name) == document

        db.remove(where('name') == name)
        assert fetch(name) is None
项目:minihydra    作者:VillanCh    | 项目源码 | 文件源码
def test_caching_json_write(tmpdir):
    """Data written through CachingMiddleware is on disk after close() and readable after reopening."""
    db_path = str(tmpdir.join('test.db'))

    first = TinyDB(db_path, storage=CachingMiddleware(JSONStorage))
    first.insert({'key': 'value'})
    first.close()

    # After close() the database file must not be empty ...
    assert os.stat(db_path).st_size != 0
    # ... and the underlying JSON file handle must be closed.
    assert first._storage._handle.closed

    del first

    # Reopen the database and read the record back.
    reopened = TinyDB(db_path, storage=CachingMiddleware(JSONStorage))
    assert reopened.all() == [{'key': 'value'}]
项目:minihydra    作者:VillanCh    | 项目源码 | 文件源码
def test_json_kwargs(tmpdir):
    """Extra keyword arguments given to TinyDB shape the JSON text written to the file."""
    db_file = tmpdir.join('test.db')
    json_options = dict(sort_keys=True, indent=4, separators=(',', ': '))
    db = TinyDB(str(db_file), **json_options)

    # Write contents
    for document in ({'b': 1}, {'a': 1}):
        db.insert(document)

    expected = '''{
    "_default": {
        "1": {
            "b": 1
        },
        "2": {
            "a": 1
        }
    }
}'''
    assert db_file.read() == expected
项目:minihydra    作者:VillanCh    | 项目源码 | 文件源码
def test_json_readwrite(tmpdir):
    """
    Regression test for issue #1: insert, look up and remove a long entry,
    then do the same with a shorter one, on a JSONStorage-backed database.
    """
    # Create TinyDB instance
    db = TinyDB(str(tmpdir.join('test.db')), storage=JSONStorage)

    def fetch(name):
        # Look a document up by its 'name' field.
        return db.get(where('name') == name)

    long_item = {'name': 'A very long entry'}
    short_item = {'name': 'A short one'}

    # The long entry goes first, the shorter one second.
    for document in (long_item, short_item):
        name = document['name']

        db.insert(document)
        assert fetch(name) == document

        db.remove(where('name') == name)
        assert fetch(name) is None
项目:wat-bridge    作者:rmed    | 项目源码 | 文件源码
def init_bridge():
    """Read the file named by ``WAT_CONF`` into SETTINGS and open the global DB.

    Exits the process with an error message when the path taken from the
    ``WAT_CONF`` environment variable is not an existing file.
    """
    global DB

    conf_path = os.path.abspath(os.getenv('WAT_CONF', ''))
    if not (conf_path and os.path.isfile(conf_path)):
        sys.exit('Could not find configuration file')

    parser = configparser.ConfigParser()
    parser.read(conf_path)

    # (SETTINGS key, reader, section, option) - WhatsApp first, then Telegram.
    option_map = (
        ('wa_phone', parser.get, 'wa', 'phone'),
        ('wa_password', parser.get, 'wa', 'password'),
        ('owner', parser.getint, 'tg', 'owner'),
        ('tg_token', parser.get, 'tg', 'token'),
    )
    for key, read_option, section, option in option_map:
        SETTINGS[key] = read_option(section, option)

    # TinyDB
    DB = TinyDB(parser.get('db', 'path'))
    DB.table_class = SmartCacheTable
项目:sanskrit_parser    作者:kmadathil    | 项目源码 | 文件源码
def generate_db(tsv_file, db_file):
    """Create a fresh TinyDB file from a tab-separated file.

    The first line supplies the column names (at most the first eight
    columns are used); every following line becomes one document mapping
    those names to the line's values. Names and values are passed through
    ``SanskritObject(...).canonical()``. An existing ``db_file`` is deleted
    first.

    Args:
        tsv_file: Path of the UTF-8 encoded input file.
        db_file: Path of the TinyDB file to (re)create.
    """
    logger.info("Converting tsv %s to db file %s", tsv_file, db_file)
    if os.path.exists(db_file):
        os.remove(db_file)
    db = TinyDB(db_file)
    try:
        with codecs.open(tsv_file, "rb", encoding="utf-8") as f:
            row = f.readline().split("\t")
            headers = [SanskritObject(x).canonical() for x in row[0:8]]
            logger.info("Found dhatu tsv headers: {}".format(str(headers)))
            # FIXME - Rewrite from here
            for row in f:
                entries = row.split("\t")[:len(headers)]
                entries = [SanskritObject(e).canonical() for e in entries]
                db.insert(dict(zip(headers, entries)))
    finally:
        # Close the database even when reading or converting a row fails.
        db.close()
    logger.info("Saved dhatus database")
项目:tinydb-jsonorm    作者:abnerjacobsen    | 项目源码 | 文件源码
def __init__(self, db='nonedb.json'):
    """Open ``db`` through a serialization middleware and expose shortcuts.

    After construction the instance offers ``table`` (bound
    ``TinyDB.table`` method of the connection) and ``where``
    (``tinydb.where``).
    """
    # Storage layer: JSON file wrapped in a middleware that has a
    # DateTimeSerializer registered under the name 'TinyDateTime'.
    storage = SerializationMiddleware(tinydb.storages.JSONStorage)
    storage.register_serializer(DateTimeSerializer(), 'TinyDateTime')

    # The actual database object; tables are created as SmartCacheTable.
    connection = tinydb.TinyDB(db, storage=storage)
    connection.table_class = SmartCacheTable
    self._conn = connection

    # Shortcuts, see
    # http://tinydb.readthedocs.org/en/latest/usage.html#tables and
    # http://tinydb.readthedocs.org/en/latest/usage.html#queries
    self.table = connection.table
    self.where = tinydb.where
项目:dontwi    作者:vocalodon    | 项目源码 | 文件源码
def search_db(self, query):
    """Run ``query`` against the result-log database and return the matches."""
    db_path = self.items["result log"]["db_file"]
    with TinyDB(db_path) as result_db:
        return result_db.search(query)
项目:dontwi    作者:vocalodon    | 项目源码 | 文件源码
def save_result_summaries(self, result_summaries):
    """Insert all ``result_summaries`` into the result-log database and return what ``insert_multiple`` returns."""
    db_path = self.items["result log"]["db_file"]
    with TinyDB(db_path) as result_db:
        return result_db.insert_multiple(result_summaries)
项目:dontwi    作者:vocalodon    | 项目源码 | 文件源码
def update_result_summary_in_db(self, result_summary, eids):
    """Apply ``result_summary`` to the documents with the given ``eids`` and return the result of ``update``."""
    db_path = self.items["result log"]["db_file"]
    with TinyDB(db_path) as result_db:
        updated = result_db.update(result_summary, eids=eids)
        return updated
    # Only reached if leaving the with-block swallows an exception.
    return True
项目:dontwi    作者:vocalodon    | 项目源码 | 文件源码
def dump_log(self):
    """Return every document stored in the result-log database."""
    db_path = self.items["result log"]["db_file"]
    with TinyDB(db_path) as result_db:
        everything = result_db.all()
        return everything
项目:dontwi    作者:vocalodon    | 项目源码 | 文件源码
def remove_summaries_by_eids(self, eids):
    """Remove the documents with the given ``eids`` from the result-log database and return the result of ``remove``."""
    db_path = self.items["result log"]["db_file"]
    with TinyDB(db_path) as result_db:
        removed = result_db.remove(eids=eids)
        return removed
项目:sketal    作者:vk-brain    | 项目源码 | 文件源码
def __init__(self):
    """Adds self to the `data` field of messages and events.
    The TinyDB instance is then reachable as data["tinydbproxy"].tinydb.
    This plugin should be included first!
    """

    super().__init__()

    # JSON file resolved through get_path(), wrapped in a caching middleware.
    db_path = self.get_path("tinydb_database.json")
    cached_storage = CachingMiddleware(JSONStorage)
    self.tinydb = TinyDB(path=db_path, storage=cached_storage)
项目:linchpin    作者:CentOS-PaaS-SIG    | 项目源码 | 文件源码
def _opendb(self):
    """Open ``self.conn_str`` as a TinyDB behind a caching middleware with WRITE_CACHE_SIZE set to 500."""
    caching_storage = CachingMiddleware(JSONStorage)
    caching_storage.WRITE_CACHE_SIZE = 500
    self.middleware = caching_storage

    self.db = TinyDB(
        self.conn_str,
        storage=caching_storage,
        default_table=self.default_table,
    )
项目:demo-day-vikings    作者:Mester    | 项目源码 | 文件源码
def main():
    """Fetch the "hot", "new" and "top" listings and store every valid post.

    Posts are taken from ``data['data']['children']`` of each element
    returned by ``get_json_from_subreddit``; a post is stored only when
    ``is_post_valid`` accepts it and ``convert_post_to_json`` yields a
    non-``None`` result.
    """
    db = TinyDB(os.path.join(os.getcwd(), DATABASE_NAME))
    try:
        for sort_type in ["hot", "new", "top"]:
            logger.debug("Fetching url: {}".format(sort_type))
            json_data = get_json_from_subreddit(sort_type, 100)
            for data in json_data:
                for post in data['data']['children']:
                    if is_post_valid(post):
                        j = convert_post_to_json(post)
                        if j is not None:
                            insert_into_database(db, j)
    finally:
        # The database was previously left open when main() returned or failed.
        db.close()
项目:demo-day-vikings    作者:Mester    | 项目源码 | 文件源码
def get_genres(database_name):
    """Utility method to get all the genres as a set.

    Each song's ``genre`` value is split on ``'/'`` and the collected parts
    are passed through ``_strip_spaces`` before being returned.

    Args:
        database_name: File name of the TinyDB database, relative to the
            current working directory.
    """
    db = TinyDB(os.path.join(os.getcwd(), database_name))
    try:
        all_genres = {song['genre'] for song in db.all()}
    finally:
        # Close the file even if a record has no 'genre' key.
        db.close()

    specific_genres = set()
    for genre in all_genres:
        specific_genres.update(genre.strip().split('/'))
    return _strip_spaces(specific_genres)
项目:demo-day-vikings    作者:Mester    | 项目源码 | 文件源码
def get_total_songs(database_name):
    """Utility method to get the total number of songs in the database.

    Args:
        database_name: File name of the TinyDB database, relative to the
            current working directory.
    """
    db = TinyDB(os.path.join(os.getcwd(), database_name))
    try:
        return len(db.all())
    finally:
        # Close the file even if reading the database fails.
        db.close()
项目:demo-day-vikings    作者:Mester    | 项目源码 | 文件源码
def setUp(self):
    """Load the JSON fixture and write it into a fresh ``test.db`` in the working directory."""
    with open(os.path.join(BASE_DIR, 'tests', 'fixtures', 'test.json')) as f:
        self.j = json.load(f)
    self.database_name = os.path.join(os.getcwd(), 'test.db')
    db = TinyDB(self.database_name)
    try:
        db.insert_multiple(self.j)
    finally:
        # `db.close` without parentheses only referenced the method and
        # never closed the database.
        db.close()
项目:ask_data_science    作者:AngelaVC    | 项目源码 | 文件源码
def __init__(self, page=None, db=None):
    """Open the TinyDB at ``db`` and immediately store ``page`` in it.

    The stored document holds the page's ``link`` (under ``'url'``),
    ``title``, ``text`` and ``links``.
    """
    self.db = TinyDB(db)
    self.page = page
    # TODO need to do some check that database is well formed
    # TODO and that the page is a well-formed WebPage object

    stored_page = self.page
    document = {
        'url': stored_page.link,
        'title': stored_page.title,
        'text': stored_page.text,
        'links': stored_page.links,
    }
    self.db.insert(document)
项目:ask_data_science    作者:AngelaVC    | 项目源码 | 文件源码
def gatherText(self):
    ''' Combine the cleaned "text" field of every document in ``self.db``
    into one string and store it on ``self.text``.

    Each cleaned text is preceded by a single space, so a non-empty
    result starts with a space.
    '''
    # join() builds the result in one pass instead of re-copying the
    # growing string on every iteration.
    self.text = "".join(" " + clean_text(item['text']) for item in self.db)
项目:telebot    作者:DronMDF    | 项目源码 | 文件源码
def set(self, name, value):
    """Store ``value`` under ``name``, updating an existing entry or inserting a new one."""
    store = TinyDB(self.filename)
    entry = {'name': name, 'value': value}
    same_name = where('name') == name
    store.upsert(entry, same_name)
项目:telebot    作者:DronMDF    | 项目源码 | 文件源码
def get(self, name, default=None):
    """Return the value stored under ``name``.

    ``default`` is returned when no entry matches or the entry has no
    ``'value'`` key.
    """
    store = TinyDB(self.filename)
    entry = store.get(where('name') == name)
    return default if entry is None else entry.get('value', default)
项目:EasyStorj    作者:lakewik    | 项目源码 | 文件源码
def __init__(self):
    """Open ``public_files.json`` and keep a handle to its ``public_files`` table."""
    self.node_details_content = None

    files_db = TinyDB('public_files.json')
    self.public_files_db = files_db
    self.table = files_db.table('public_files')
项目:EasyStorj    作者:lakewik    | 项目源码 | 文件源码
def __init__(self):
    """Open ``ownstorj_playlists.json`` and keep handles to its ``playlists`` and ``tracks`` tables."""
    playlists_db = TinyDB('ownstorj_playlists.json')
    self.ownstorj_playlists_db = playlists_db
    self.playlists_table = playlists_db.table('playlists')
    self.tracks_table = playlists_db.table('tracks')
项目:simple-environment-monitor-system    作者:diegorubin    | 项目源码 | 文件源码
def __init__(self, **attributes):
    """Use the keyword arguments as the instance attributes and attach the class's table.

    ``db`` and ``table`` are assigned afterwards, so keyword arguments
    with those names are overwritten.
    """
    # The kwargs dict itself becomes the instance namespace.
    self.__dict__ = attributes

    database = TinyDB(TINY_DB_PATH)
    self.db = database
    # One table per model class, named after the class.
    self.table = database.table(self.__class__.__name__)
项目:G-Scout    作者:nccgroup    | 项目源码 | 文件源码
def list_log_services():
    """Return the ``name`` of every entry under ``logServices`` in the logging API response."""
    projectId = TinyDB('projects.json').table("Project").all()
    http = storage.get().authorize(Http())
    # NOTE(review): TinyDB's all() returns a list of documents, so this
    # string concatenation looks like it would raise TypeError - confirm
    # what the Project table holds and which field is the project id.
    url = "https://logging.googleapis.com/v1beta3/projects/" + projectId + "/logServices"
    _, content = http.request(url, "GET")
    log_services = json.loads(content)['logServices']
    return [entry['name'] for entry in log_services]
项目:G-Scout    作者:nccgroup    | 项目源码 | 文件源码
def insert_templates():
    """Page through ``instanceTemplates.list`` and insert every item into ``template_table``.

    Pagination stops when ``list_next`` returns ``None`` or when a response
    has no ``'items'`` key.
    """
    # NOTE(review): all() returns a list of documents, not a project id
    # string - confirm that this is what the API call expects.
    projectId = TinyDB('projects.json').table("Project").all()
    request = instanceTemplates.list(project=projectId)
    # The try block used to be indented deeper than the statement before
    # it, which Python rejects with an IndentationError.
    try:
        while request is not None:
            response = request.execute()
            for instanceTemplate in response['items']:
                template_table.insert(instanceTemplate)
            request = instanceTemplates.list_next(previous_request=request, previous_response=response)
    except KeyError:
        # Response without 'items': nothing (more) to store.
        pass
项目:G-Scout    作者:nccgroup    | 项目源码 | 文件源码
def insert_instance_groups():
    """For every zone from ``get_zones()``, page through ``instanceGroups.list`` and insert each item into ``group_table``.

    A response without an ``'items'`` key ends the pagination for that zone
    only; the remaining zones are still processed.
    """
    projectId = TinyDB('projects.json').table("Project").all()
    for zone_name in get_zones():
        page_request = instanceGroups.list(project=projectId, zone=zone_name)
        try:
            while page_request is not None:
                page = page_request.execute()
                for group in page['items']:
                    group_table.insert(group)
                page_request = instanceGroups.list_next(
                    previous_request=page_request,
                    previous_response=page,
                )
        except KeyError:
            # Response without 'items': move on to the next zone.
            pass
项目:G-Scout    作者:nccgroup    | 项目源码 | 文件源码
def get_zones():
    """Return the ``name`` of every item in the first page of ``zones.list``."""
    projectId = TinyDB('projects.json').table("Project").all()
    zone_items = zones.list(project=projectId).execute()['items']
    return [zone['name'] for zone in zone_items]
项目:aws-pricing-tools    作者:concurrencylabs    | 项目源码 | 文件源码
def loadDBs(service, indexFiles):
    """Open one TinyDB per index name, filling empty ones from the matching CSV file.

    Args:
        service: Service name; selects the data directory, the index
            metadata and the ``/tmp/<service>/data`` working directory.
        indexFiles: Iterable of index names.

    Returns:
        Tuple ``(dBs, indexMetadata)`` where ``dBs`` maps each index name to
        its TinyDB instance.
    """
    dBs = {}
    datadir = get_data_directory(service)
    indexMetadata = getIndexMetadata(service)

    # Files in Lambda can only be created in the /tmp filesystem - create
    # the working directory if it doesn't exist.
    lambdaFileSystem = '/tmp/'+service+'/data'
    if not os.path.exists(lambdaFileSystem):
        os.makedirs(lambdaFileSystem)

    for index_name in indexFiles:
        index_db = tinydb.TinyDB(lambdaFileSystem+'/'+index_name+'.json')
        dBs[index_name] = index_db
        # TODO: remove circular dependency from utils, so the method get_index_file_name can be used
        # TODO: initial tests show it is faster (by a few milliseconds) to populate the file from scratch; see if it should always load from scratch
        # TODO: create an index of the files that have been generated, to avoid creating unnecessary empty .json files
        if len(index_db) != 0:
            # Already populated by an earlier call.
            continue
        try:
            with open(datadir+index_name+'.csv', 'rb') as csvfile:
                rows = csv.DictReader(csvfile, delimiter=',', quotechar='"')
                index_db.insert_multiple(rows)
        except IOError:
            # CSV file cannot be opened/read: leave this database empty.
            pass

    return dBs, indexMetadata
项目:househunt    作者:althor880    | 项目源码 | 文件源码
def __init__(self):
    """Open the cache database ``ListCache.DB_FILE`` located next to this module."""
    # Directory of this file, resolved against the current working directory.
    module_dir = os.path.join(os.getcwd(), os.path.dirname(__file__))
    self.db = TinyDB(os.path.join(module_dir, ListCache.DB_FILE))