我们从Python开源项目中,提取了以下16个代码示例,用于说明如何使用django.db.models.expressions.RawSQL()。
def annotate_running_scans_count(self) -> 'ScanListQuerySet':
    """Annotate every row with the number of its sites' unfinished scans.

    The annotation is named ``running_scans__count``. It is computed by a
    correlated subquery that counts scans whose ``end`` column is NULL and
    whose site is linked to the current scan list through the
    ``Site.scan_lists`` many-to-many table.

    Returns:
        The annotated queryset.
    """
    # Table names are taken from model metadata so the SQL follows the
    # models' actual ``db_table`` settings.
    table_names = {
        'Scan': Scan._meta.db_table,
        'Site_ScanLists': Site.scan_lists.through._meta.db_table,
        'ScanList': ScanList._meta.db_table,
    }
    sql_template = (
        ' SELECT COUNT("{Scan}"."id") FROM "{Scan}"'
        ' WHERE "{Scan}"."end" IS NULL'
        ' AND "{Scan}"."site_id" IN ('
        'SELECT "{Site_ScanLists}"."site_id" FROM "{Site_ScanLists}"'
        ' WHERE "{Site_ScanLists}"."scanlist_id" = "{ScanList}"."id"'
        ' GROUP BY "{Site_ScanLists}"."site_id") '
    )
    running_scans = RawSQL(sql_template.format(**table_names), ())
    return self.annotate(running_scans__count=running_scans)
def annotate_most_recent_scan_error_count(self) -> 'ScanListQuerySet':
    """Annotate every row with the error count of its last scan.

    The annotation is named ``last_scan__error_count`` and is computed by a
    correlated subquery counting the ``ScanError`` rows whose ``scan_id``
    equals the ``last_scan_id`` column of the ``Site`` table.

    Returns:
        The annotated queryset.
    """
    # NOTE(review): the return annotation names ScanListQuerySet although the
    # SQL correlates on the Site table -- confirm which queryset class this
    # method belongs to.
    sql = (
        ' SELECT COUNT("id") FROM "{ScanError}"'
        ' WHERE "{ScanError}"."scan_id" = "{Site}"."last_scan_id" '
    ).format(
        # Only the placeholders the template actually uses are supplied.
        Site=Site._meta.db_table,
        ScanError=ScanError._meta.db_table)
    return self.annotate(last_scan__error_count=RawSQL(sql, ()))
def products_changed_handler(sender, **kwargs):
    """
    Signal handler that recounts the visible products of categories.

    The ``categories`` keyword argument selects the categories to refresh
    and may be a ``ShopCategory`` instance, a single ID (int or str), a
    collection of IDs, or a ``ShopCategory`` queryset. Anything else raises
    ``TypeError``.

    For the selected categories ``product_count`` is recomputed with a
    subquery over visible products, ``total_product_count`` is reset to the
    same value, and the ``categories_changed`` signal is sent afterwards.
    """
    target = kwargs.get('categories')
    if isinstance(target, ShopCategory):
        # A single category instance
        queryset = ShopCategory.objects.filter(pk=target.pk)
    elif isinstance(target, (int, str)):
        # A number or string holding one category ID
        queryset = ShopCategory.objects.filter(pk=target)
    elif isinstance(target, (list, tuple, set, ValuesListQuerySet)):
        # A collection of category IDs
        queryset = ShopCategory.objects.filter(pk__in=target)
    elif isinstance(target, QuerySet) and target.model is ShopCategory:
        # Already a queryset of categories
        queryset = target
    else:
        raise TypeError('Invalid categories for signal "products_changed"')

    visible_products_sql = (
        '(SELECT COUNT(*) '
        'FROM shop_shopproduct AS ssp '
        'WHERE ssp.category_id = shop_shopcategory.id '
        'AND ssp.is_visible = TRUE)'
    )
    with transaction.atomic():
        queryset.update(product_count=RawSQL(visible_products_sql, ()))
        queryset.update(total_product_count=F('product_count'))

    categories_changed.send(ShopCategory, categories=queryset)
def priority_ordering(self, request, queryset):
    """Order ``queryset`` by the number of requested topics each row matches.

    Topic IDs are read from the comma-separated ``topics__id__in`` query
    parameter; entries that are not plain decimal integers are ignored.
    When at least one valid ID remains, the queryset is annotated with
    ``kw_count`` (number of matching topics) and ordered by ``-kw_count``
    then ``req_id``. Otherwise the queryset is returned unchanged.

    Args:
        request: Object exposing a ``query_params`` mapping.
        queryset: The queryset to order.

    Returns:
        The (possibly annotated and re-ordered) queryset.
    """
    kw_param = request.query_params.get('topics__id__in', '')
    # str.isdecimal() only accepts characters int() can parse. str.isdigit()
    # also accepts e.g. superscript digits, for which int() raises
    # ValueError on user-supplied input.
    topics = tuple(int(kw) for kw in kw_param.split(',') if kw.isdecimal())
    if topics:
        sql = (
            ' SELECT count(*) FROM ('
            ' SELECT topic_id FROM reqs_requirement_topics'
            ' WHERE topic_id IN %s'
            ' AND requirement_id = reqs_requirement.id'
            ' GROUP BY topic_id'
            ' ) AS subq '
        )
        # The IDs are passed as a query parameter, not interpolated.
        queryset = queryset.annotate(kw_count=RawSQL(sql, (topics,)))
        queryset = queryset.order_by('-kw_count', 'req_id')
    return queryset
def order_by_json_path(self, json_path, language_code=None, order='asc'):
    """
    Sort the queryset by a value stored inside the `translations` JSON field.

    The path handed to the `#>>` operator has the form
    `{<language>,<json_path>}`. The language is the first truthy value among
    the `language_code` argument, `self._language_code` and
    `self.get_language_key(language_code)`.

    Passing `order='desc'` sorts descending; any other value uses the
    expression as is.

    `#>>` operator and path syntax:
    https://www.postgresql.org/docs/current/static/functions-json.html
    Raw SQL expressions:
    https://docs.djangoproject.com/en/dev/ref/models/expressions/#raw-sql-expressions

    Usage example:
        MyModel.objects.language('en_us').filter(is_active=True).order_by_json_path('title')
    """
    language = language_code or self._language_code
    if not language:
        language = self.get_language_key(language_code)

    path_literal = '{%s,%s}' % (language, json_path)
    # The path is passed as a query parameter; `translations` is the column.
    expression = RawSQL("translations#>>%s", (path_literal,))
    ordering = expression.desc() if order == 'desc' else expression
    return self.order_by(ordering)
def annotate_most_recent_scan_start(self) -> 'SiteQuerySet':
    """Annotate every site with the ``start`` of its most recent scan.

    The annotation is named ``last_scan__start``. The subquery picks, among
    the scans of the site, the first row when ordered by ``end`` descending
    with NULLs first, so a scan without an ``end`` value ranks before all
    finished scans.

    Returns:
        The annotated queryset.
    """
    sql = (
        ' SELECT DISTINCT ON (site_id) "start" FROM "{Scan}"'
        # The Site table name is quoted like every other identifier so the
        # reference matches the model's exact db_table.
        ' WHERE site_id="{Site}"."id"'
        ' ORDER BY "site_id", "end" DESC NULLS FIRST'
        ' LIMIT 1 '
    ).format(
        Scan=Scan._meta.db_table,
        Site=Site._meta.db_table)
    return self.annotate(last_scan__start=RawSQL(sql, ()))
def annotate_most_recent_scan_end_or_null(self) -> 'SiteQuerySet':
    """Annotate every site with the ``end`` of its most recent scan.

    The annotation is named ``last_scan__end_or_null``. The subquery picks,
    among the scans of the site, the first row when ordered by ``end``
    descending with NULLs first, so the value is NULL when the site has a
    scan without an ``end`` value.

    Returns:
        The annotated queryset.
    """
    sql = (
        ' SELECT DISTINCT ON (site_id) "end" FROM "{Scan}"'
        # The Site table name is quoted like every other identifier so the
        # reference matches the model's exact db_table.
        ' WHERE site_id="{Site}"."id"'
        ' ORDER BY "site_id", "end" DESC NULLS FIRST'
        ' LIMIT 1 '
    ).format(
        Scan=Scan._meta.db_table,
        Site=Site._meta.db_table)
    return self.annotate(last_scan__end_or_null=RawSQL(sql, ()))
def annotate_most_recent_scan_result(self) -> 'SiteQuerySet':
    """Annotate every site with the result of its last scan.

    The annotation is named ``last_scan__result``. It holds the ``result``
    column of one ``ScanResult`` row whose ``scan_id`` equals the site's
    ``last_scan_id``.

    Returns:
        The annotated queryset.
    """
    sql = (
        ' SELECT "{ScanResult}"."result" FROM "{ScanResult}"'
        ' WHERE "{ScanResult}"."scan_id"="{Site}"."last_scan_id"'
        ' LIMIT 1 '
    ).format(
        ScanResult=ScanResult._meta.db_table,
        Site=Site._meta.db_table)
    return self.annotate(last_scan__result=RawSQL(sql, ()))
def send_reminder_messages():
    """Send a "still open" message for every open task past its deadline.

    A task qualifies when no reminder has been sent yet, a reminder timeout
    is set, and ``created_at + reminder_message_timeout`` (computed in SQL)
    lies before the current time. After the message is sent the task's
    ``reminder_message_sent_at`` is stored so it is not selected again.
    """
    current_time = timezone.now()
    candidates = Task.objects.open().filter(
        reminder_message_sent_at=None,
        reminder_message_timeout__isnull=False,
    )
    overdue = candidates.annotate(
        deadline=RawSQL('created_at + reminder_message_timeout', ()),
    ).filter(deadline__lt=current_time)

    for overdue_task in overdue:
        send_task_message(overdue_task, _('{task} still open'), 'still_open.txt')
        overdue_task.reminder_message_sent_at = current_time
        # Only the timestamp column is written back.
        overdue_task.save(update_fields=('reminder_message_sent_at',))
def get_queryset(self):
    """Return all bands annotated with membership and size information.

    Annotations added:
      * ``user_joined`` -- 1 when the requesting user has a row in
        ``api_member`` for the band, otherwise NULL;
      * ``compositions_count`` -- distinct count of ``compositions``;
      * ``members_count`` -- distinct count of ``members``.

    The leader's member and user rows are fetched via ``select_related``.
    """
    membership_sql = (
        'SELECT 1 FROM api_member '
        'WHERE api_member.band_id = api_band.id AND api_member.user_id = %s '
        'LIMIT 1'
    )
    bands = Band.objects.annotate(
        user_joined=RawSQL(membership_sql, (self.request.user.id,)),
    )
    bands = bands.annotate(compositions_count=Count('compositions', distinct=True))
    bands = bands.annotate(members_count=Count('members', distinct=True))
    return bands.select_related('leader__member__user')
def get_queryset(self):
    """Restrict entries to those with no schema or the current schema.

    A row is kept when ``object_schema_id`` is NULL or equals the value of
    the SQL function ``current_schema()``.
    """
    base = super(LogEntryManager, self).get_queryset()
    without_schema = Q(object_schema_id=None)
    in_current_schema = Q(
        object_schema_id=expressions.RawSQL('current_schema()', []))
    return base.filter(without_schema | in_current_schema)
def get_queryset(self):
    """
    Return IP addresses ordered by family and then by host address.

    PostgreSQL's default INET ordering puts shorter (larger) prefix lengths
    ahead of longer (smaller) masks, which is not meaningful for individual
    IPs. HOST() strips the mask from the address and the result is cast back
    with INET(), so each address is effectively compared as a /32 or /128.
    The computed value is exposed as the `host` annotation.
    """
    base = super(IPAddressManager, self).get_queryset()
    host_only = RawSQL('INET(HOST(ipam_ipaddress.address))', [])
    annotated = base.annotate(host=host_only)
    return annotated.order_by('family', 'host')
def categories_changed_handler(sender, **kwargs):
    """
    Signal handler that refreshes ``total_product_count`` up the tree.

    The ``categories`` keyword argument selects the starting categories and
    may be a ``ShopCategory`` instance, a single ID (int or str), a
    collection of IDs, or a ``ShopCategory`` queryset. Anything else raises
    ``TypeError``. ``include_self`` (default ``True``) is forwarded to
    ``get_ancestors``.

    For each visible ancestor, ``total_product_count`` is set to the
    category's own ``product_count`` plus the sum of ``total_product_count``
    over its visible direct children.
    """
    target = kwargs.get('categories')
    with_self = kwargs.get('include_self', True)

    if isinstance(target, ShopCategory):
        # A single category instance
        queryset = ShopCategory.objects.filter(pk=target.pk)
    elif isinstance(target, (int, str)):
        # A number or string holding one category ID
        queryset = ShopCategory.objects.filter(pk=target)
    elif isinstance(target, (list, tuple, set, ValuesListQuerySet)):
        # A collection of category IDs
        queryset = ShopCategory.objects.filter(pk__in=target)
    elif isinstance(target, QuerySet) and target.model is ShopCategory:
        # Already a queryset of categories
        queryset = target
    else:
        raise TypeError('Invalid categories for signal "categories_changed"')

    # Ordered by descending level within each tree -- presumably so deeper
    # categories are recomputed before the parents that sum them up.
    ancestor_ids = (
        queryset.get_ancestors(include_self=with_self)
        .filter(is_visible=True)
        .order_by('tree_id', '-level')
        .values_list('id', flat=True)
    )

    subtotal_sql = (
        'SELECT shop_shopcategory.product_count + '
        'COALESCE(SUM(ssc.total_product_count), 0) '
        'FROM shop_shopcategory AS ssc '
        'WHERE ssc.parent_id = shop_shopcategory.id '
        'AND ssc.is_visible = TRUE'
    )
    with transaction.atomic():
        for ancestor_id in ancestor_ids:
            row = ShopCategory.objects.filter(pk=ancestor_id)
            row.update(total_product_count=RawSQL(subtotal_sql, ()))
def leaderboard(request, challenge_phase_split_id):
    """Return the paginated leaderboard of a Challenge Phase Split.

    Only public, unflagged submissions are considered. Entries are ranked by
    the value stored in ``result`` under the schema's ``default_order_by``
    key (highest first) and only the best entry per participant team is
    kept. Each entry's ``result`` is then replaced by a list of values in
    the order of the schema's ``labels``.

    Args:
        request: The incoming request, forwarded to the paginator.
        challenge_phase_split_id: Primary key of the ChallengePhaseSplit.

    Returns:
        A paginated response on success, or a 400 response with an
        ``error`` message when the split does not exist, is not public, or
        the schema lacks ``default_order_by``.
    """
    # Check that the challenge phase split exists
    try:
        challenge_phase_split = ChallengePhaseSplit.objects.get(
            pk=challenge_phase_split_id)
    except ChallengePhaseSplit.DoesNotExist:
        response_data = {'error': 'Challenge Phase Split does not exist'}
        return Response(response_data, status=status.HTTP_400_BAD_REQUEST)

    # Check that the challenge phase split is publicly visible
    if challenge_phase_split.visibility != ChallengePhaseSplit.PUBLIC:
        response_data = {'error': 'Sorry, leaderboard is not public yet for this Challenge Phase Split!'}
        return Response(response_data, status=status.HTTP_400_BAD_REQUEST)

    # Leaderboard associated with the challenge phase split
    board = challenge_phase_split.leaderboard

    # Key used to rank the entries. Only lookup failures are handled (missing
    # key, schema or leaderboard); a bare ``except`` would also swallow
    # KeyboardInterrupt/SystemExit.
    try:
        default_order_by = board.schema['default_order_by']
    except (AttributeError, KeyError, TypeError):
        response_data = {'error': 'Sorry, Default filtering key not found in leaderboard schema!'}
        return Response(response_data, status=status.HTTP_400_BAD_REQUEST)

    # All public, unflagged submissions of the challenge phase split
    leaderboard_data = LeaderboardData.objects.filter(
        challenge_phase_split=challenge_phase_split,
        submission__is_public=True,
        submission__is_flagged=False).order_by('created_at')
    leaderboard_data = leaderboard_data.annotate(
        filtering_score=RawSQL('result->>%s', (default_order_by, ), output_field=FloatField())).values(
            'id', 'submission__participant_team__team_name',
            'challenge_phase_split', 'result', 'filtering_score', 'leaderboard__schema')

    sorted_leaderboard_data = sorted(
        leaderboard_data, key=lambda k: float(k['filtering_score']), reverse=True)

    # Keep only the first (best-scoring) entry of every team. A set makes the
    # membership test O(1) instead of scanning a list.
    distinct_sorted_leaderboard_data = []
    seen_teams = set()
    for data in sorted_leaderboard_data:
        team_name = data['submission__participant_team__team_name']
        if team_name in seen_teams:
            continue
        seen_teams.add(team_name)
        distinct_sorted_leaderboard_data.append(data)

    # Flatten each result mapping into a list following the label order
    leaderboard_labels = board.schema['labels']
    for item in distinct_sorted_leaderboard_data:
        item['result'] = [item['result'][index.lower()] for index in leaderboard_labels]

    paginator, result_page = paginated_queryset(
        distinct_sorted_leaderboard_data,
        request,
        pagination_class=StandardResultSetPagination())
    response_data = result_page
    return paginator.get_paginated_response(response_data)
def from_search_query(self, search_query):
    """
    Build a queryset of the objects in SearchQuery.results, **in hit order**.

    EXPERIMENTAL: a single QuerySet is returned, so this only works for
    results from a single index with a single doc_type.

    Relevance ordering is computed by the search engine and cannot be
    reproduced in the database, so each row is annotated from the hits using
    custom SQL produced by `self._raw_sql`:

        * `search_score` -- the score of the matching hit;
        * `search_rank` -- the 0-based position of the hit, used for ordering.

    The RawSQL clause has the form:

        SELECT CASE {{model}}.id WHEN {{id}} THEN {{score}} END

    with "WHEN x THEN y" repeated for every hit. The full statement looks
    like:

        SELECT "freelancer_freelancerprofile"."id",
            (SELECT CASE freelancer_freelancerprofile.id
                WHEN 25 THEN 1.0
                WHEN 26 THEN 1.0
                [...]
                ELSE 0
            END) AS "search_score"
        FROM "freelancer_freelancerprofile"
        WHERE "freelancer_freelancerprofile"."id" IN (25, 26, [...])
        ORDER BY "search_score" DESC

    No table lookup is involved, but the approach assumes the hits are a
    page of results rather than the whole database (ES itself caps results
    at 10,000). This is brittle, caveat emptor.
    """
    hits = search_query.hits
    score_sql = self._raw_sql([(hit['id'], hit['score']) for hit in hits])
    rank_sql = self._raw_sql(
        [(hit['id'], position) for position, hit in enumerate(hits)])
    hit_ids = [hit['id'] for hit in hits]

    matching = self.get_queryset().filter(pk__in=hit_ids)
    # Relevance score reported by the search backend
    scored = matching.annotate(search_score=RawSQL(score_sql, ()))
    # Position of the hit in the result list (0-based)
    ranked = scored.annotate(search_rank=RawSQL(rank_sql, ()))
    return ranked.order_by('search_rank')
def order_naturally(self, method=IFACE_ORDERING_POSITION):
    """
    Order interfaces naturally by components parsed from their name.

    `method` selects the sort priority and must be IFACE_ORDERING_POSITION
    or IFACE_ORDERING_NAME; any other value raises KeyError.

    The `name` column is split into the following annotations using SQL
    `SUBSTRING(... FROM <regex>)` expressions, following the layout

        {type}{slot}/{subslot}/{position}/{subposition}:{channel}.{vc}

      * `_type`        leading non-numeric text
      * `_id`          trailing number directly following the leading text
      * `_slot`        first number followed by a slash
      * `_subslot`     number after the first slash (0 when absent)
      * `_position`    number after the second slash (0 when absent)
      * `_subposition` number after the third slash (0 when absent)
      * `_channel`     number after a colon at the end (0 when absent)
      * `_vc`          number after a dot at the end (0 when absent)

    For example, GigabitEthernet1/2/3 yields type 'GigabitEthernet',
    slot 1, subslot 2, position 3 and 0 for subposition, channel and vc.

    The unmodified `name` is the last sort key, acting as a fallback for
    names that match none of the patterns.
    """
    column = '{}.name'.format(self.model._meta.db_table)

    sort_keys = {
        IFACE_ORDERING_POSITION: (
            '_slot', '_subslot', '_position', '_subposition', '_channel',
            '_type', '_vc', '_id', 'name',
        ),
        IFACE_ORDERING_NAME: (
            '_type', '_slot', '_subslot', '_position', '_subposition',
            '_channel', '_vc', '_id', 'name',
        ),
    }[method]

    # (annotation name, SQL template); `{}` receives the qualified column and
    # `{{n}}` is a literal regex repetition count escaped for str.format().
    templates = (
        ('_type', r"SUBSTRING({} FROM '^([^0-9]+)')"),
        ('_id', r"CAST(SUBSTRING({} FROM '^(?:[^0-9]+)([0-9]+)$') AS integer)"),
        ('_slot', r"CAST(SUBSTRING({} FROM '^(?:[^0-9]+)?([0-9]+)\/') AS integer)"),
        ('_subslot', r"COALESCE(CAST(SUBSTRING({} FROM '^(?:[^0-9]+)?(?:[0-9]+\/)([0-9]+)') AS integer), 0)"),
        ('_position', r"COALESCE(CAST(SUBSTRING({} FROM '^(?:[^0-9]+)?(?:[0-9]+\/){{2}}([0-9]+)') AS integer), 0)"),
        ('_subposition', r"COALESCE(CAST(SUBSTRING({} FROM '^(?:[^0-9]+)?(?:[0-9]+\/){{3}}([0-9]+)') AS integer), 0)"),
        ('_channel', r"COALESCE(CAST(SUBSTRING({} FROM ':([0-9]+)(\.[0-9]+)?$') AS integer), 0)"),
        ('_vc', r"COALESCE(CAST(SUBSTRING({} FROM '\.([0-9]+)$') AS integer), 0)"),
    )
    annotations = {
        alias: RawSQL(template.format(column), [])
        for alias, template in templates
    }

    return self.annotate(**annotations).order_by(*sort_keys)