Python tinydb 模块,Query() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用tinydb.Query()

项目:aDTN-python    作者:megfault    | 项目源码 | 文件源码
def get_data(self):
        """
        Retrieve the data objects sorted by increasing popularity, namely in increasing receive_count, then send_count
        and finally the last time they were sent by the current aDTN node.
        :return: data objects sorted by increasing popularity.
        """
        with self.lock:
            Stats = Query()
            stats = self.stats.search(Stats.deleted == False)
            res = sorted(stats, key=lambda x: (x['receive_count'], x['send_count'], x['last_sent']))[:10]
            now = int(time.time())
            objects = []
            for r in res:
                idx = r['idx']
                Objects = Query()
                obj = self.data.search(Objects.idx == idx)[0]['content']
                objects.append(obj)
                self.stats.update({'last_sent': now}, Objects.idx == idx)
                self.stats.update(increment('send_count'), Objects.idx == idx)
        return objects
项目:aDTN-python    作者:megfault    | 项目源码 | 文件源码
def delete_data(self, object_id):
        """
        Delete a data object given its ID.
        :param object_id: ID of the data object to delete.
        """
        with self.lock:
            Stats = Query()
            Message = Query()
            res = self.stats.search(Stats.idx == object_id)
            self.stats.update({'deleted': True}, Stats.idx == object_id)
            record = self.data.get(Message.idx == object_id)
            if record is not None:
                self.data.remove(eids=[record.eid])
                log_debug("Deleted message: {}".format(object_id))
            else:
                log_debug("No data to delete: {}".format(object_id))
项目:sodogetip    作者:just-an-dev    | 项目源码 | 文件源码
def update_tip(user_history, tip):
        # convert object to string of name if necessary
        if type(user_history) is models.User:
            user_history = user_history.username

        # update only finish tips
        bot_logger.logger.info("update history for user=%s, tip.tx_id=%s" % (user_history, tip.tx_id))
        if tip.id is not None:
            bot_logger.logger.info("update history for user=%s, tip.id=%s" % (user_history, tip.id))

            db = TinyDB(config.history_path + user_history + '.json')
            tip_query = Query()
            db.update({'finish': tip.finish}, tip_query.id == tip.id)
            db.update({'tx_id': tip.tx_id}, tip_query.id == tip.id)
            db.update({'status': tip.status}, tip_query.id == tip.id)
            db.close()
        else:
            bot_logger.logger.warn("update history fail user=%s, tip.id=%s" % (user_history, tip.id))
项目:sodogetip    作者:just-an-dev    | 项目源码 | 文件源码
def update_withdraw(user_history, status, tx_id, tip_id):
        # convert object to string of name if necessary
        if type(user_history) is models.User:
            user_history = user_history.username

        # update only finish tips
        if tip_id is not None:
            bot_logger.logger.info("update history for user=%s, tip.id=%s" % (user_history, tip_id))

            db = TinyDB(config.history_path + user_history + '.json')
            tip_query = Query()
            db.update({'finish': status}, tip_query.id == tip_id)
            db.update({'tx_id': tx_id}, tip_query.id == tip_id)
            db.update({'status': "finish"}, tip_query.id == tip_id)
            db.close()
        else:
            bot_logger.logger.warn("update history fail user=%s, tip.id=%s" % (user_history, tip_id))
项目:sodogetip    作者:just-an-dev    | 项目源码 | 文件源码
def add_address(username, address, active=True):
        # sanitize (just lower)
        username = str(unicode(username).lower())

        db = TinyDB(config.user_file)
        table = db.table(username)

        # check if address not already exist
        user_db = Query()
        data = table.count(user_db.address == address)

        if data == 0:
            table.insert({"type": "simple", "address": address, "coin": "doge", "enable": False})

            if active is True:
                UserStorage.active_user_address(username, address)
        else:
            bot_logger.logger.error("address %s already registered for  %s " % (str(address), str(username)))
项目:sodogetip    作者:just-an-dev    | 项目源码 | 文件源码
def get_user_address(cls, username):
        # sanitize (just lower)
        username = str(unicode(username).lower())

        if UserStorage.exist(username):
            db = TinyDB(config.user_file)
            table = db.table(username)
            user_db = Query()
            data = table.search(user_db.enable == True)
            if len(data) > 0:
                return data[0].get('address')
            else:
                # username not found
                return None
        else:
            bot_logger.logger.error("get address of un-registered user  %s " % (str(username)))
项目:lainonlife    作者:barrucadu    | 项目源码 | 文件源码
def make_user(username, admin=False):
    check_query = Query()
    check_if_user_exists = THE_DB.search(check_query.id == username)
    if len(check_if_user_exists) > 0:
        return
    # generates a random 32 hex digit password
    password = "%032x" % random.getrandbits(128)
    new_user = {
        'id': username,
        'password': password,
        'banned': False,
        'admin': admin,
        'dj_name': username,
        'dj_pic': '',
        'stream_title': '',
        'stream_desc': '',
    }

    THE_DB.insert(new_user)
    return (username, password)
项目:democracybot    作者:LiaungYip    | 项目源码 | 文件源码
def make_token(user_id: int, poll_tag: str):
    q = Query()
    while True:
        # generate a unique token, made of 8 characters a-z.
        token = "".join(
            [random.choice(string.ascii_lowercase) for i in range(8)])
        if not tokens_db.contains(q.token == token):
            break

    tokens_db.insert({
        "user_id": user_id,
        "poll_tag": poll_tag,
        "token": token
    })

    return token
项目:ghost    作者:nir0s    | 项目源码 | 文件源码
def get(self, key_name):
        """Return a dictionary consisting of the key itself

        e.g.
        {u'created_at': u'2016-10-10 08:31:53',
         u'description': None,
         u'metadata': None,
         u'modified_at': u'2016-10-10 08:31:53',
         u'name': u'aws',
         u'uid': u'459f12c0-f341-413e-9d7e-7410f912fb74',
         u'value': u'the_value'}

        """
        result = self.db.search(Query().name == key_name)
        if not result:
            return {}
        return result[0]
项目:ghost    作者:nir0s    | 项目源码 | 文件源码
def list(self):
        """Return a list of all keys (not just key names, but rather the keys
        themselves).

        e.g.
         {u'created_at': u'2016-10-10 08:31:53',
          u'description': None,
          u'metadata': None,
          u'modified_at': u'2016-10-10 08:31:53',
          u'name': u'aws',
          u'uid': u'459f12c0-f341-413e-9d7e-7410f912fb74',
          u'value': u'the_value'},
         {u'created_at': u'2016-10-10 08:32:29',
          u'description': u'my gcp token',
          u'metadata': {u'owner': u'nir'},
          u'modified_at': u'2016-10-10 08:32:29',
          u'name': u'gcp',
          u'uid': u'a51a0043-f241-4d52-93c1-266a3c5de15e',
          u'value': u'the_value'}]

        """
        # TODO: Return only the key names from all storages
        return self.db.search(Query().name.matches('.*'))
项目:Worksets    作者:DozyDolphin    | 项目源码 | 文件源码
def __init__(self, publisher, settings):
        self.logger = logging.getLogger(' Db')
        try:
            if not os.path.exists(settings.db_dir):
                self.logger.info("db directory doesn't exist - creating...")
                os.makedirs(settings.db_dir)
        except IOError as e:
            self.logger.critical("Couldn't create directory " + settings.db_dir + " : " + str(e))
        self.db_file = 'db.json'
        db_path = settings.db_dir + '/' + self.db_file
        self.publisher = publisher
        try:
            if not os.path.isfile(db_path):
                self.logger.info("db file doesn't exist - creating...")
                self.db = TinyDB(db_path)
                self.db.table('worksets')
                self.db.purge_table('default')
        except IOError as e:
            self.logger.critical("Couldn't create db file: " + str(e))
        self.db = TinyDB(db_path)
        self.w_query = Query()
项目:tinymongo    作者:schapman1974    | 项目源码 | 文件源码
def parse_query(self, query):
        """
        Creates a tinydb Query() object from the query dict

        :param query: object containing the dictionary representation of the
        query
        :return: composite Query()
        """
        logger.debug(u'query to parse2: {}'.format(query))

        # this should find all records
        if query == {} or query is None:
            return Query()._id != u'-1'  # noqa

        q = None
        # find the final result of the generator
        for c in self.parse_condition(query):
            if q is None:
                q = c
            else:
                q = q & c

        logger.debug(u'new query item2: {}'.format(q))

        return q
项目:KeyCounter    作者:Microcore    | 项目源码 | 文件源码
def save(self, day, count):
        '''
        Save count for given day.
        Args:
          - day: <datetime>
          - count: <int>
        Returns: None
        '''
        date = day.strftime(self.__date_format)
        count = int(count)
        Day = tinydb.Query()

        if self.__db.contains(Day.Date == date):
            self.__db.update({'Count': count}, Day.Date == date)
        else:
            self.__db.insert({'Date': date, 'Count': count})
项目:NetSpark-Scripts    作者:admiralspark    | 项目源码 | 文件源码
def populate():
    ''' Populates the DB with data. This is old code, needs cleanup '''
    with open(filename, mode='r') as f:
        reader = csv.DictReader(f)
    # Now iterate through every row in the CSVfile and set variables
        for row in reader:
            ip = row['IP_Address']
            hostname = row['SysName']
            device_type = row['device_type']
            department = row['Department']
            switch = {
                'ip': row['IP_Address'],
                'hostname': row['SysName'],
                'device_type': row['device_type'],
                'department': row['Department']
            }

            dbf = Query()
            resultf = db.search(dbf.ip == row['IP_Address'])
            if str(resultf) != "[]":
                print("Skipping " + row['IP_Address'] + " as it already exists.")
            else:
                db.insert(switch)
                print ("Added " + row['IP_Address'])
项目:dontwi    作者:vocalodon    | 项目源码 | 文件源码
def has_result_of_status(self, status, results):
        inbound_str = self.items["operation"]["inbound"]
        query = Query()
        result_q = reduce(or_, [
            query.result == a_result for a_result in results])
        querys = [query.inbound == inbound_str,
                  query.inbound_status_id == status.get_status_id(), result_q]
        combined_query = reduce(and_, querys)
        return self.search_db(combined_query)
项目:dontwi    作者:vocalodon    | 项目源码 | 文件源码
def get_result_summaries_by_status(self, status):
        inbound_str = self.items["operation"]["inbound"]
        query = Query()
        combined_query = (query.inbound == inbound_str) & (
            query.inbound_status_id == status.get_status_id())
        return self.search_db(combined_query)
项目:dontwi    作者:vocalodon    | 项目源码 | 文件源码
def get_result_summaries_by_results(self, results):
        query = Query()
        querys = [query.result == a_result for a_result in results]
        combined_query = reduce(or_, querys)
        return self.search_db(combined_query)
项目:sketal    作者:vk-brain    | 项目源码 | 文件源码
def get_user(self, user_id):
        User = Query()

        user = self.tinydb.get(User.user_id == user_id)

        return user["data"] if user else {}
项目:sketal    作者:vk-brain    | 项目源码 | 文件源码
def save_user(self, user_id, data):
        User = Query()

        if not self.tinydb.update({'user_id': user_id, 'data': data}, User.user_id == user_id):
            self.tinydb.insert({'user_id': user_id, 'data': data})
项目:sketal    作者:vk-brain    | 项目源码 | 文件源码
def delete_user(self, user_id):
        User = Query()

        self.tinydb.remove(User.user_id == user_id)
项目:telegram-xkcd-password-generator    作者:MasterGroosha    | 项目源码 | 文件源码
def user_exists(user_id):
    return True if len(db.search(Query().user_id == user_id)) > 0 else False
项目:telegram-xkcd-password-generator    作者:MasterGroosha    | 项目源码 | 文件源码
def get_person(user_id):
    # Check if user exists
    S = Query()
    person = db.search(S.user_id == user_id)
    if len(person) is 0:
        usr = {"user_id": user_id,
               "word_count": DEFAULT_WORD_COUNT,
               "prefixes": DEFAULT_PREFIX_SUFFIX,
               "separators": DEFAULT_SEPARATOR}
        db.insert(usr)
        return usr
    return person[0]
项目:telegram-xkcd-password-generator    作者:MasterGroosha    | 项目源码 | 文件源码
def change_word_count(user_id, increase):
    S = Query()
    if increase:
        db.update(increment("word_count"), S.user_id == user_id)
    else:
        db.update(decrement("word_count"), S.user_id == user_id)
    return db.search(S.user_id == user_id)[0]
项目:telegram-xkcd-password-generator    作者:MasterGroosha    | 项目源码 | 文件源码
def change_prefixes(user_id, enable_prefixes):
    S = Query()
    if enable_prefixes:
        db.update({"prefixes": True}, S.user_id == user_id)
    else:
        db.update({"prefixes": False}, S.user_id == user_id)
    return db.search(S.user_id == user_id)[0]
项目:telegram-xkcd-password-generator    作者:MasterGroosha    | 项目源码 | 文件源码
def change_separators(user_id, enable_separators):
    S = Query()
    if enable_separators:
        db.update({"separators": True}, S.user_id == user_id)
    else:
        db.update({"separators": False}, S.user_id == user_id)
    return db.search(S.user_id == user_id)[0]
项目:demo-day-vikings    作者:Mester    | 项目源码 | 文件源码
def update_score(db, json_object):
    """
    Method to update the score of the song
    """
    q = Query()
    j = db.search(q.url == json_object['url'])[0]
    logger.debug("Duplicate post found: {}".format(j['title'].encode('ascii', 'ignore')))
    logger.debug("Old Score: {}".format(j['score']))
    logger.debug("Now Score: {}".format(json_object['score']))
    if int(j['score']) != int(json_object['score']):
        logger.debug("Updating Score for {}".format(j['title'].encode('ascii', 'ignore')))
        db.update({'score':json_object['score']}, q.url == json_object['url'])
    else:
        logger.debug("The scores are still the same, not updating")
项目:demo-day-vikings    作者:Mester    | 项目源码 | 文件源码
def is_json_unique(db, json_object):
    """
    Checks if the url of this post is present in 
    the database already
    """
    q = Query()
    if len(db.search(q.url == json_object['url'])) == 0:
        return True
    return False
项目:stf-selector    作者:RedQA    | 项目源码 | 文件源码
def where(key):
    return Query([key])
项目:EasyStorj    作者:lakewik    | 项目源码 | 文件源码
def get_public_download_indicators(self, public_download_hash_url):
        query = Query()
        public_download_indicators = self.table.search(query.public_download_hash_url.search(public_download_hash_url))

        return public_download_indicators
项目:EasyStorj    作者:lakewik    | 项目源码 | 文件源码
def get_public_file_details_by_local_hash(self, local_file_hash):
        query = Query()
        public_download_indicators = self.table.search(query.public_download_hash.search(local_file_hash))

        return public_download_indicators
项目:simple-environment-monitor-system    作者:diegorubin    | 项目源码 | 文件源码
def load(self, label):
        query = Query()
        self.__dict__.update(self.table.search(query.label == label)[0])
项目:simple-environment-monitor-system    作者:diegorubin    | 项目源码 | 文件源码
def save(self):
        attributes = self.get_attributes()
        if self.__new_record__():
            self.table.insert(attributes)
        else:
            query = Query()
            self.table.update(attributes, query.label == self.label)
        return True
项目:simple-environment-monitor-system    作者:diegorubin    | 项目源码 | 文件源码
def destroy(self):
        query = Query()
        self.table.remove(eids=[self.table.search(query.label == self.label)[0].eid])
项目:simple-environment-monitor-system    作者:diegorubin    | 项目源码 | 文件源码
def __new_record__(self):
        query = Query()
        return len(self.table.search(query.label == self.label)) == 0
项目:G-Scout    作者:nccgroup    | 项目源码 | 文件源码
def add_network_rules(projectId, db):
    for firewall in db.table('Firewall').all():
        if not firewall.get('sourceRanges'):
            firewall['sourceRanges'] = firewall['sourceTags']
        db.table('Network').update(
                    add_rule({
                        "name":firewall['name'], 
                        "allowed":firewall['allowed'],
                        "sourceRanges":firewall['sourceRanges'],
                        "tags":firewall.get('targetTags')
                        }),
                    eids=[db.table('Network').get(
                        Query().selfLink==firewall['network']
                        ).eid])
项目:G-Scout    作者:nccgroup    | 项目源码 | 文件源码
def add_affected_instances(projectId, db):
    for firewall in db.table('Firewall').all():
        try:
            for instance in db.table('Network').get(Query().selfLink==firewall['network'])['members']:
                try:
                    if not firewall.get('targetTags'):
                        db.table('Firewall').update(
                        add_instance({
                        "kind":instance['kind'],
                        "selfLink":instance['selfLink'],
                        "tags":instance.get('tags'),
                        "name":instance['name']
                        }),eids=[firewall.eid])
                    try:
                        for tag in instance.get('tags'):
                            if tag in firewall.get('targetTags'):
                                db.table('Firewall').update(
                            add_instance({
                            "kind":instance['kind'],
                            "selfLink":instance['selfLink'],
                            "tags":instance.get('tags'),
                            "name":instance['name']
                            }),eids=[firewall.eid])
                    except TypeError:
                        continue
                except KeyError:
                    continue
        except KeyError:
            continue

# Function to pass Tinydb for the update query
项目:G-Scout    作者:nccgroup    | 项目源码 | 文件源码
def add_finding(db,entity_table, entity_id, rule_title):
    finding_table = db.table('Finding')
    rule_table = db.table('Rule')
    finding_table.insert({
                "entity": {"table":entity_table,"id":entity_id} ,
                "rule": {"table":"rule","id":rule_table.search(Query().title==rule_title)[0].eid}
                })
项目:aws-pricing-tools    作者:concurrencylabs    | 项目源码 | 文件源码
def calculate(pdim):

  log.info("Calculating DynamoDB pricing with the following inputs: {}".format(str(pdim.__dict__)))

  ts = phelper.Timestamp()
  ts.start('totalCalculationKinesis')

  dbs, indexMetadata = phelper.loadDBs(consts.SERVICE_KINESIS, phelper.get_partition_keys(pdim.region,consts.SCRIPT_TERM_TYPE_ON_DEMAND))

  cost = 0
  pricing_records = []

  awsPriceListApiVersion = indexMetadata['Version']
  priceQuery = tinydb.Query()

  kinesisDb = dbs[phelper.create_file_key([consts.REGION_MAP[pdim.region], consts.TERM_TYPE_MAP[pdim.termType], consts.PRODUCT_FAMILY_KINESIS_STREAMS])]

  #Shard Hours
  query = ((priceQuery['Group'] == 'Provisioned shard hour'))
  pricing_records, cost = phelper.calculate_price(consts.SERVICE_KINESIS, kinesisDb, query, pdim.shardHours, pricing_records, cost)

  #PUT Payload Units
  query = ((priceQuery['Group'] == 'Payload Units'))
  pricing_records, cost = phelper.calculate_price(consts.SERVICE_KINESIS, kinesisDb, query, pdim.putPayloadUnits, pricing_records, cost)

  #Extended Retention Hours
  query = ((priceQuery['Group'] == 'Addon shard hour'))
  pricing_records, cost = phelper.calculate_price(consts.SERVICE_KINESIS, kinesisDb, query, pdim.extendedDataRetentionHours, pricing_records, cost)

  #TODO: add Enhanced (shard-level) metrics

  #Data Transfer - N/A
  #Note there is no charge for data transfer in Kinesis as per https://aws.amazon.com/kinesis/streams/pricing/

  pricing_result = PricingResult(awsPriceListApiVersion, pdim.region, cost, pricing_records)
  log.debug(json.dumps(vars(pricing_result),sort_keys=False,indent=4))

  log.debug("Total time to compute: [{}]".format(ts.finish('totalCalculationKinesis')))
  return pricing_result.__dict__
项目:aws-pricing-tools    作者:concurrencylabs    | 项目源码 | 文件源码
def calculate(pdim):

  log.info("Calculating DynamoDB pricing with the following inputs: {}".format(str(pdim.__dict__)))

  ts = phelper.Timestamp()
  ts.start('totalCalculationDynamoDB')

  #Load On-Demand DBs
  dbs, indexMetadata = phelper.loadDBs(consts.SERVICE_DYNAMODB, phelper.get_partition_keys(pdim.region,consts.SCRIPT_TERM_TYPE_ON_DEMAND))

  cost = 0
  pricing_records = []

  awsPriceListApiVersion = indexMetadata['Version']
  priceQuery = tinydb.Query()

  #TODO:add support for free-tier flag (include or exclude from calculation)

  iopsDb = dbs[phelper.create_file_key([consts.REGION_MAP[pdim.region], consts.TERM_TYPE_MAP[pdim.termType], consts.PRODUCT_FAMILY_DB_PIOPS])]

  #Read Capacity Units
  query = ((priceQuery['Group'] == 'DDB-ReadUnits'))
  pricing_records, cost = phelper.calculate_price(consts.SERVICE_DYNAMODB, iopsDb, query, pdim.readCapacityUnitHours, pricing_records, cost)

  #Write Capacity Units
  query = ((priceQuery['Group'] == 'DDB-WriteUnits'))
  pricing_records, cost = phelper.calculate_price(consts.SERVICE_DYNAMODB, iopsDb, query, pdim.writeCapacityUnitHours, pricing_records, cost)

  #DB Storage (TODO)

  #Data Transfer (TODO)
  #there is no additional charge for data transferred between Amazon DynamoDB and other Amazon Web Services within the same Region
  #data transferred across Regions (e.g., between Amazon DynamoDB in the US East (Northern Virginia) Region and Amazon EC2 in the EU (Ireland) Region), will be charged on both sides of the transfer.

  #API Requests (only applies for DDB Streams)(TODO)

  pricing_result = PricingResult(awsPriceListApiVersion, pdim.region, cost, pricing_records)
  log.debug(json.dumps(vars(pricing_result),sort_keys=False,indent=4))

  log.debug("Total time to compute: [{}]".format(ts.finish('totalCalculationDynamoDB')))
  return pricing_result.__dict__
项目:househunt    作者:althor880    | 项目源码 | 文件源码
def listing_in_cache(self, listing):
        lquery = Query()
        return self.db.contains(lquery.hsh == listing.hsh)
项目:househunt    作者:althor880    | 项目源码 | 文件源码
def retrieve_listing(self, listing):
        lquery = Query()
        list_dict = self.db.get(lquery.hsh == listing.hsh)
        return Listing.from_dict(list_dict)
项目:househunt    作者:althor880    | 项目源码 | 文件源码
def remove_listing(self, listing):
        lquery = Query()
        self.db.remove(lquery.hsh == listing.hsh)
项目:househunt    作者:althor880    | 项目源码 | 文件源码
def update_listing(self, listing):
        lquery = Query()
        if self.listing_in_cache(listing):
            self.remove_listing(listing)
        self.insert_listing(listing)
项目:aDTN-python    作者:megfault    | 项目源码 | 文件源码
def add_object(self, data):
        """
        Attempt to insert a data object into the store. If it does not exist, it gets initialized. Otherwise the
        statistics are updated by increasing the receive count and the time of the last reception if the message has
        not been flagged as deleted.
        :param data: data object to store
        """
        idx = hash_string(data)
        now = int(time.time())
        with self.lock:
            Stats = Query()
            res = self.stats.search(Stats.idx == idx)
            if len(res) == 0:
                self.data.insert({'idx': idx, 'content': data})
                self.stats.insert({'idx': idx,
                                   'first_seen': now,
                                   'receive_count': 0,
                                   'send_count': 0,
                                   'last_received': None,
                                   'last_sent': None,
                                   'deleted': False})
                log_debug("Data object created: {}".format(data))
            else:
                deleted = res[0]['deleted']
                if deleted:
                    log_debug("Received deleted data object: {}".format(data))
                self.stats.update({'last_received': now}, Stats.idx == idx)
                self.stats.update(increment('receive_count'), Stats.idx == idx)
                log_debug("Data object updated: {}".format(data))
项目:sodogetip    作者:just-an-dev    | 项目源码 | 文件源码
def update_data(self):
        db = TinyDB(config.vanitygen)
        vanity_db = Query()

        db.update(set("finish", True), vanity_db.id == self.id)
        db.update(set("difficulty", self.difficulty), vanity_db.id == self.id)
        db.update(set("duration", self.duration), vanity_db.id == self.id)
项目:sodogetip    作者:just-an-dev    | 项目源码 | 文件源码
def remove_pending_tip(id_tip):
    db = TinyDB(config.unregistered_tip_user)
    tip = Query()
    db.remove(tip.id == id_tip)
    db.close()
项目:tus-py-client    作者:tus    | 项目源码 | 文件源码
def __init__(self, fp):
        self._db = TinyDB(fp)
        self._urls = Query()
项目:lainonlife    作者:barrucadu    | 项目源码 | 文件源码
def get_dj_info(username):
    check_query = Query()
    check_if_user_exists = THE_DB.search(check_query.id == username)
    if len(check_if_user_exists) == 0:
        return
    return check_if_user_exists[0]
项目:lainonlife    作者:barrucadu    | 项目源码 | 文件源码
def update_dj_info(username, form_dict):
    check_query = Query()
    check_if_user_exists = THE_DB.search(check_query.id == username)
    if len(check_if_user_exists) == 0:
        return False
    # trust no one, even if someone modified their response we don't want them to
    # most of these require different levels of permission
    dont_touch = ['admin', 'banned', 'id', 'password']
    for k in dont_touch:
        if k in form_dict:
            del form_dict[k]
    THE_DB.update(form_dict, check_query.id == username)
    return True
项目:lainonlife    作者:barrucadu    | 项目源码 | 文件源码
def update_dj_status(username, status_key, new_status):
    check_query = Query()
    check_if_user_exists = THE_DB.search(check_query.id == username)
    if len(check_if_user_exists) == 0:
        return
    THE_DB.update({status_key: new_status}, check_query.id == username)
    return new_status