The following 42 code examples, extracted from open source Python projects, illustrate how to use sqlalchemy.sql.expression.desc().
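Before the extracted examples, here is a minimal, self-contained sketch of the pattern they all share: pass a column attribute (or a label string) to desc() inside order_by(). The User model and the in-memory SQLite engine below are illustrative assumptions for this sketch, not taken from any of the quoted projects; it targets SQLAlchemy 1.4+.

from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base
from sqlalchemy.sql.expression import desc

Base = declarative_base()

class User(Base):  # hypothetical model, for illustration only
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    name = Column(String)
    score = Column(Integer)

engine = create_engine('sqlite://')  # throwaway in-memory database
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add_all([User(name='low', score=1), User(name='high', score=2)])
    session.commit()
    # desc() wraps a column (or a label string) so order_by sorts descending
    top = session.query(User).order_by(desc(User.score)).first()
    print(top.name)  # -> 'high'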
def query_records(build, classification, severity, machine_id, limit, interval_sec=None):
    records = Record.query
    if build is not None:
        records = records.join(Record.build).filter_by(build=build)
    if classification is not None:
        records = records.join(Record.classification).filter_by(classification=classification)
    if severity is not None:
        records = records.filter(Record.severity == severity)
    if machine_id is not None:
        records = records.filter(Record.machine_id == machine_id)
    if interval_sec is not None:
        current_time = time()
        secs_in_past = current_time - interval_sec
        # Due to time skew on client systems, delayed sends due to
        # spooling, etc, tsp_server works better as the reference
        # timestamp.
        records = records.filter(Record.tsp_server > secs_in_past)
    records = records.order_by(Record.id.desc())
    if limit is not None:
        records = records.limit(limit)
    return records.all()
def get_top_crash_guilties(classes=None):
    q = db.session.query(Guilty.function, Guilty.module, Build.build,
                         db.func.count(Record.id).label('total'),
                         Guilty.id, Guilty.comment)
    q = q.join(Record)
    q = q.join(Build)
    q = q.join(Classification).filter(Record.classification_id == Classification.id)
    if not classes:
        classes = ['org.clearlinux/crash/clr']
    q = q.filter(Classification.classification.in_(classes))
    q = q.filter(Build.build.op('~')('^[0-9][0-9]+$'))
    q = q.filter(Guilty.hide == False)
    q = q.group_by(Guilty.function, Guilty.module, Guilty.comment, Guilty.id, Build.build)
    q = q.order_by(desc(cast(Build.build, db.Integer)), desc('total'))
    # query for records created in the last week (~ 10 Clear builds)
    q = q.filter(Build.build.in_(sorted(tuple(set([x[2] for x in q.all()])), key=lambda x: int(x))[-8:]))
    interval_sec = 24 * 60 * 60 * 7
    current_time = time()
    sec_in_past = current_time - interval_sec
    q = q.filter(Record.tsp > sec_in_past)
    return q.all()
def _sortQuery(self):
    ''' Sort the query by a given parameter '''

    if not isinstance(self.sort, type(None)):
        # set the sort variable ModelClass parameter
        sortparam = self.marvinform._param_form_lookup.mapToColumn(self.sort)

        # If order is specified, then do the sort
        if self.order:
            assert self.order in ['asc', 'desc'], 'Sort order parameter must be either "asc" or "desc"'

            # Check if order by already applied
            if 'ORDER' in str(self.query.statement):
                self.query = self.query.order_by(None)

            # Do the sorting
            if 'desc' in self.order:
                self.query = self.query.order_by(desc(sortparam))
            else:
                self.query = self.query.order_by(sortparam)
def server_fault_get_by_server_uuids(self, context, server_uuids):
    """Get all server faults for the provided server_uuids."""
    if not server_uuids:
        return {}

    rows = model_query(context, models.ServerFault).\
        filter(models.ServerFault.server_uuid.in_(server_uuids)).\
        order_by(desc("created_at"), desc("id")).all()

    output = {}
    for server_uuid in server_uuids:
        output[server_uuid] = []

    for row in rows:
        data = dict(row)
        output[row['server_uuid']].append(data)

    return output
def instance_fault_get_by_instance_uuids(context, instance_uuids):
    """Get all instance faults for the provided instance_uuids."""
    if not instance_uuids:
        return {}

    rows = model_query(context, models.InstanceFault, read_deleted='no').\
        filter(models.InstanceFault.instance_uuid.in_(instance_uuids)).\
        order_by(desc("created_at"), desc("id")).\
        all()

    output = {}
    for instance_uuid in instance_uuids:
        output[instance_uuid] = []

    for row in rows:
        data = dict(row)
        output[row['instance_uuid']].append(data)

    return output
def list_web_hook(self):
    session = SessionManager.Session()
    try:
        web_hook_list = session.query(WebHook).\
            options(joinedload(WebHook.created_by)).\
            order_by(desc(getattr(WebHook, 'register_time'))).\
            all()

        web_hook_dict_list = []

        for web_hook in web_hook_list:
            web_hook_dict = row2dict(web_hook)
            web_hook_dict.pop('shared_secret', None)
            self.__process_user_obj_in_web_hook(web_hook, web_hook_dict)
            web_hook_dict_list.append(web_hook_dict)

        return json_resp({
            'data': web_hook_dict_list,
            'total': len(web_hook_list)
        })
    finally:
        SessionManager.Session.remove()
def get_top_player(self):
    return Database.session.query(MatchSummary).join(MatchSummary.player). \
        order_by(desc(MatchSummary.matches)). \
        order_by(desc(MatchSummary.player_win)). \
        limit(LIMIT_DATA).all()
def get_match_hero_summary(self, account_id):
    return Database.session.query(MatchHeroSummary).join(MatchHeroSummary.hero). \
        filter(MatchHeroSummary.account_id == account_id). \
        order_by(desc(MatchHeroSummary.matches)). \
        order_by(desc(MatchHeroSummary.player_win)). \
        limit(LIMIT_DATA).all()
def get_match_item_summary(self, account_id):
    return Database.session.query(MatchItemSummary).join(MatchItemSummary.item). \
        filter(MatchItemSummary.account_id == account_id). \
        order_by(desc(MatchItemSummary.matches)). \
        order_by(desc(MatchItemSummary.player_win)). \
        limit(LIMIT_DATA).all()
def get_last_history(self):
    return Database.session.query(History).order_by(desc(History.id)).first()
def _build_order_expressions(self, criteria, relationships):
    """
    :param criteria: criteria dictionary
    :type criteria: dict

    :param relationships: a dict with all joins to apply, describes current state in recurrent calls
    :type relationships: dict

    :return: expressions list
    :rtype: list
    """
    expressions = []
    if isinstance(criteria, dict):
        criteria = list(criteria.items())
    for arg in criteria:
        if isinstance(arg, tuple):
            arg, value = arg
        else:
            value = None
        is_ascending = True
        # parenthesised so an empty column name cannot raise IndexError
        if len(arg) and (arg[0] == '+' or arg[0] == '-'):
            is_ascending = arg[:1] == '+'
            arg = arg[1:]
        expression = self._parse_tokens(self.objects_class, arg.split('__'), value, relationships,
                                        lambda c, n, v: n)
        if expression is not None:
            expressions.append(expression if is_ascending else desc(expression))
    return expressions
def actions_get(self, context, container_uuid):
    """Get all container actions for the provided uuid."""
    query = model_query(models.ContainerAction).\
        filter_by(container_uuid=container_uuid)
    actions = _paginate_query(models.ContainerAction, sort_dir='desc',
                              sort_key='created_at', query=query)
    return actions
def _action_get_last_created_by_container_uuid(self, context, container_uuid):
    result = model_query(models.ContainerAction).\
        filter_by(container_uuid=container_uuid).\
        order_by(desc("created_at"), desc("id")).\
        first()
    return result
def action_events_get(self, context, action_id):
    query = model_query(models.ContainerActionEvent).\
        filter_by(action_id=action_id)
    events = _paginate_query(models.ContainerActionEvent, sort_dir='desc',
                             sort_key='created_at', query=query)
    return events
def filter_records(build, classification, severity, machine_id=None, os_name=None,
                   limit=None, from_date=None, to_date=None, payload=None,
                   not_payload=None, data_source=None):
    records = Record.query
    if build is not None:
        records = records.join(Record.build).filter_by(build=build)
    if classification is not None:
        if isinstance(classification, list):
            records = records.join(Record.classification).filter(Classification.classification.in_(classification))
        else:
            records = records.join(Record.classification).filter(Classification.classification.like(classification))
    if severity is not None:
        records = records.filter(Record.severity == severity)
    if os_name is not None:
        records = records.filter(Record.os_name == os_name)
    if machine_id is not None:
        records = records.filter(Record.machine_id == machine_id)
    if from_date is not None:
        from_date = mktime(strptime(from_date, "%Y-%m-%d"))
        records = records.filter(Record.tsp >= from_date)
    if to_date is not None:
        to_date = mktime(strptime(to_date, "%Y-%m-%d"))
        records = records.filter(Record.tsp < to_date)
    if payload is not None:
        records = records.filter(Record.backtrace.op('~')(payload))
    if not_payload is not None:
        records = records.filter(~Record.backtrace.op('~')(not_payload))
    if data_source is not None:
        if data_source == "external":
            records = records.filter(Record.external == True)
        elif data_source == "internal":
            records = records.filter(Record.external == False)
    records = records.order_by(Record.id.desc())
    if limit is not None:
        records = records.limit(limit)
    return records
def get_recordcnts_by_classification():
    q = db.session.query(Classification.classification, db.func.count(Record.id).label('total'))
    q = q.join(Record.classification)
    q = q.group_by(Classification.classification)
    q = q.order_by(desc('total'))
    return q.all()
def get_recordcnts_by_machine_type():
    q = db.session.query(Record.machine, db.func.count(Record.id).label('total'))
    q = q.group_by(Record.machine)
    q = q.order_by(desc('total'))
    return q.all()
def get_crashcnts_by_build(classes=None):
    q = db.session.query(Build.build, db.func.count(Record.id)).join(Record).join(Classification)
    if not classes:
        classes = ['org.clearlinux/crash/clr']
    q = q.filter(Classification.classification.in_(classes))
    q = q.filter(Build.build.op('~')('^[0-9]+$'))
    q = q.group_by(Build.build)
    q = q.order_by(desc(cast(Build.build, db.Integer)))
    q = q.limit(10)
    return q.all()
def get_machine_ids_for_guilty(id, most_recent=None):
    q = db.session.query(Build.build, Record.machine_id,
                         db.func.count(Record.id).label('total'), Record.guilty_id)
    q = q.join(Record)
    q = q.filter(Record.guilty_id == id)
    q = q.filter(Record.os_name == 'clear-linux-os')
    q = q.filter(Build.build.op('~')('^[0-9][0-9]+$'))
    q = q.group_by(Build.build, Record.machine_id, Record.guilty_id)
    q = q.order_by(desc(cast(Build.build, db.Integer)), desc('total'))
    if most_recent:
        interval_sec = 24 * 60 * 60 * int(most_recent)
        current_time = time()
        sec_in_past = current_time - interval_sec
        q = q.filter(Record.tsp > sec_in_past)
    return q.all()
def get_swupd_msgs(most_recent=None):
    q = db.session.query(Record.tsp, Record.machine_id, Record.backtrace).join(Classification)
    q = q.filter(Classification.classification.like('org.clearlinux/swupd-client/%'))
    if most_recent:
        interval_sec = 24 * 60 * 60 * int(most_recent)
        current_time = time()
        sec_in_past = current_time - interval_sec
        q = q.filter(Record.tsp > sec_in_past)
    q = q.order_by(desc(Record.tsp))
    return q
def __query__(self):
    if self.order == self.DESCENDANT:
        query = desc(self.attribute)
    elif self.order == self.ASCENDANT:
        query = asc(self.attribute)
    return query
def test_activation_mixin(self):
    activated_student = Student()
    activated_student.name = 'activated-student'
    activated_student.activated_at = datetime.now()
    DBSession.add(activated_student)

    deactivated_student = Student()
    deactivated_student.name = 'deactivated-student'
    deactivated_student.activated_at = None
    DBSession.add(deactivated_student)

    DBSession.commit()

    # Test ordering:
    student_list = Student.query.order_by(desc(Student.is_active)).all()
    self.assertIsNotNone(student_list[0].activated_at)
    self.assertIsNone(student_list[-1].activated_at)

    student_list = Student.query.order_by(asc(Student.is_active)).all()
    self.assertIsNotNone(student_list[-1].activated_at)
    self.assertIsNone(student_list[0].activated_at)

    # Test filtering:
    student_list = Student.query.filter(Student.is_active).all()
    for student in student_list:
        self.assertIsNotNone(student.activated_at)

    student_list = Student.query.filter(not_(Student.is_active)).all()
    for student in student_list:
        self.assertIsNone(student.activated_at)
def actions_get(context, instance_uuid):
    """Get all instance actions for the provided uuid."""
    actions = model_query(context, models.InstanceAction).\
        filter_by(instance_uuid=instance_uuid).\
        order_by(desc("created_at"), desc("id")).\
        all()
    return actions
def _action_get_last_created_by_instance_uuid(context, instance_uuid):
    result = (model_query(context, models.InstanceAction).
              filter_by(instance_uuid=instance_uuid).
              order_by(desc("created_at"), desc("id")).
              first())
    return result
def action_events_get(context, action_id):
    events = model_query(context, models.InstanceActionEvent).\
        filter_by(action_id=action_id).\
        order_by(desc("created_at"), desc("id")).\
        all()
    return events
def recent_update(self, days):
    current = datetime.now()
    # from one week ago
    start_time = current - timedelta(days=days)
    session = SessionManager.Session()
    try:
        result = session.query(Episode, Bangumi).\
            join(Bangumi).\
            filter(Episode.delete_mark == None).\
            filter(Episode.status == Episode.STATUS_DOWNLOADED).\
            filter(Episode.update_time >= start_time).\
            filter(Episode.update_time <= current).\
            order_by(desc(Episode.update_time))

        episode_list = []

        for eps, bgm in result:
            episode = row2dict(eps)
            episode['thumbnail'] = utils.generate_thumbnail_link(eps, bgm)
            episode['bangumi'] = row2dict(bgm)
            episode['bangumi']['cover'] = utils.generate_cover_link(bgm)
            episode_list.append(episode)

        return json_resp({'data': episode_list})
    finally:
        SessionManager.Session.remove()
def list_episode(self, page, count, sort_field, sort_order, status):
    try:
        session = SessionManager.Session()
        query_object = session.query(Episode).\
            filter(Episode.delete_mark == None)

        if status is not None:
            query_object = query_object.filter(Episode.status == status)
            # count total rows
            total = session.query(func.count(Episode.id)).filter(Episode.status == status).scalar()
        else:
            total = session.query(func.count(Episode.id)).scalar()

        offset = (page - 1) * count

        if sort_order == 'desc':
            episode_list = query_object.\
                order_by(desc(getattr(Episode, sort_field))).\
                offset(offset).\
                limit(count).\
                all()
        else:
            episode_list = query_object.\
                order_by(asc(getattr(Episode, sort_field))).\
                offset(offset).limit(count).\
                all()

        episode_dict_list = [row2dict(episode) for episode in episode_list]

        return json_resp({'data': episode_dict_list, 'total': total})
    finally:
        SessionManager.Session.remove()
def _build_total_expressions(self, queryset, totals):
    mapper = inspect(self.objects_class)
    primary_keys = mapper.primary_key
    relationships = {
        'aliases': {},
        'join_chains': [],
        'prefix': 'totals_',
    }
    aggregates = []
    group_cols = OrderedDict()
    group_by = []
    group_limit = None
    for total in totals:
        for aggregate, columns in total.items():
            if aggregate == self.AGGR_GROUPLIMIT:
                if not isinstance(columns, int):
                    raise HTTPBadRequest('Invalid attribute',
                                         'Group limit option requires an integer value')
                group_limit = columns
                continue
            if not columns:
                if aggregate == self.AGGR_GROUPBY:
                    raise HTTPBadRequest('Invalid attribute',
                                         'Group by option requires at least one column name')
                if len(primary_keys) > 1:
                    aggregates.append(Function(aggregate, func.row(*primary_keys)).label(aggregate))
                else:
                    aggregates.append(Function(aggregate, *primary_keys).label(aggregate))
                continue
            if not isinstance(columns, list):
                columns = [columns]
            for column in columns:
                expression = self._parse_tokens(self.objects_class, column.split('__'), None,
                                                relationships, lambda c, n, v: n)
                if expression is not None:
                    if aggregate == self.AGGR_GROUPBY:
                        group_cols[column] = expression.label(column)
                        group_by.append(expression)
                    else:
                        aggregates.append(Function(aggregate, expression).label(aggregate))
    agg_query = self._apply_joins(queryset, relationships, distinct=False)
    group_cols_expr = list(group_cols.values())
    columns = group_cols_expr + aggregates
    if group_limit:
        row_order = list(map(lambda c: c.desc(), aggregates))
        columns.append(func.row_number().over(partition_by=group_cols_expr[:-1],
                                              order_by=row_order).label('row_number'))
    order = ','.join(list(map(str, range(1, len(group_cols_expr) + 1))) +
                     list(map(lambda c: str(c) + ' DESC',
                              range(1 + len(group_cols_expr),
                                    len(aggregates) + len(group_cols_expr) + 1))))
    agg_query = agg_query.statement.with_only_columns(columns).order_by(None).order_by(order)
    if group_by:
        agg_query = agg_query.group_by(*group_by)
    if group_limit:
        subquery = agg_query.alias()
        agg_query = select([subquery]).where(subquery.c.row_number <= group_limit)
    return agg_query, list(group_cols.keys())
def list_bangumi(self, page, count, sort_field, sort_order, name, user_id, bangumi_type):
    try:
        session = SessionManager.Session()
        query_object = session.query(Bangumi).\
            options(joinedload(Bangumi.cover_image)).\
            filter(Bangumi.delete_mark == None)

        if bangumi_type != -1:
            query_object = query_object.filter(Bangumi.type == bangumi_type)

        if name is not None:
            name_pattern = '%{0}%'.format(name.encode('utf-8'),)
            logger.debug(name_pattern)
            query_object = query_object.\
                filter(or_(Bangumi.name.ilike(name_pattern), Bangumi.name_cn.ilike(name_pattern)))
            # count total rows
            total = session.query(func.count(Bangumi.id)).\
                filter(or_(Bangumi.name.ilike(name_pattern), Bangumi.name_cn.ilike(name_pattern))).\
                scalar()
        else:
            total = session.query(func.count(Bangumi.id)).scalar()

        if sort_order == 'desc':
            query_object = query_object.\
                order_by(desc(getattr(Bangumi, sort_field)))
        else:
            query_object = query_object.\
                order_by(asc(getattr(Bangumi, sort_field)))

        if count == -1:
            bangumi_list = query_object.all()
        else:
            offset = (page - 1) * count
            bangumi_list = query_object.offset(offset).limit(count).all()

        bangumi_id_list = [bgm.id for bgm in bangumi_list]

        favorites = session.query(Favorites).\
            filter(Favorites.bangumi_id.in_(bangumi_id_list)).\
            filter(Favorites.user_id == user_id).\
            all()

        bangumi_dict_list = []

        for bgm in bangumi_list:
            bangumi = row2dict(bgm)
            bangumi['cover'] = utils.generate_cover_link(bgm)
            utils.process_bangumi_dict(bgm, bangumi)
            for fav in favorites:
                if fav.bangumi_id == bgm.id:
                    bangumi['favorite_status'] = fav.status
            bangumi_dict_list.append(bangumi)

        return json_resp({'data': bangumi_dict_list, 'total': total})
    finally:
        SessionManager.Session.remove()
def list_bangumi(self, page, count, sort_field, sort_order, name, bangumi_type):
    try:
        session = SessionManager.Session()
        query_object = session.query(Bangumi).\
            options(joinedload(Bangumi.cover_image)).\
            options(joinedload(Bangumi.created_by)).\
            options(joinedload(Bangumi.maintained_by)).\
            filter(Bangumi.delete_mark == None)

        if bangumi_type != -1:
            query_object = query_object.filter(Bangumi.type == bangumi_type)

        if name is not None:
            name_pattern = '%{0}%'.format(name.encode('utf-8'),)
            logger.debug(name_pattern)
            query_object = query_object.\
                filter(or_(Bangumi.name.ilike(name_pattern), Bangumi.name_cn.ilike(name_pattern)))
            # count total rows
            total = session.query(func.count(Bangumi.id)).\
                filter(or_(Bangumi.name.ilike(name_pattern), Bangumi.name_cn.ilike(name_pattern))).\
                scalar()
        else:
            total = session.query(func.count(Bangumi.id)).scalar()

        if sort_order == 'desc':
            query_object = query_object.\
                order_by(desc(getattr(Bangumi, sort_field)))
        else:
            query_object = query_object.\
                order_by(asc(getattr(Bangumi, sort_field)))

        # we now support query all method by passing count = -1
        if count == -1:
            bangumi_list = query_object.all()
        else:
            offset = (page - 1) * count
            bangumi_list = query_object.offset(offset).limit(count).all()

        bangumi_dict_list = []

        for bgm in bangumi_list:
            bangumi = row2dict(bgm)
            bangumi['cover'] = utils.generate_cover_link(bgm)
            utils.process_bangumi_dict(bgm, bangumi)
            self.__process_user_obj_in_bangumi(bgm, bangumi)
            bangumi_dict_list.append(bangumi)

        return json_resp({'data': bangumi_dict_list, 'total': total})
        # raise ClientError('something happened')
    finally:
        SessionManager.Session.remove()