我们从 Python 开源项目中提取了以下 17 个代码示例，用于说明如何使用 django.db.models.OuterRef()。
def get_activity(self, limit=20, offset=0, distinct=False, friends_only=False, request=None):
    """Return a slice of posts for this user's activity feed, newest first.

    The feed contains posts whose creator is one of: the other side of each
    friendship returned by ``Friendship.get_friendships(self, 0)``, every
    ``target`` in ``self.follow_source`` and, unless ``friends_only`` is set,
    this user itself.

    Every post is annotated with ``num_yeahs`` and ``num_comments``.
    ``yeah_given`` is annotated only when ``request`` carries an
    authenticated user; the subquery needs that user's id, and without one
    the annotation is skipped instead of raising.

    :param limit: maximum number of posts in the returned slice.
    :param offset: index of the first post in the returned slice.
    :param distinct: when true, keep only posts whose ``created`` equals the
        ``Max('creator__post__created')`` annotation.
    :param friends_only: when true, this user's own id is left out.
    :param request: optional request; when truthy each post gets
        ``setup(request)`` and its ``recent_comment`` resolved.
    :returns: the sliced queryset of posts.
    """
    # TODO: make distinct work; combine friends and following, but then get posts from them
    creators = [friend.other(self) for friend in Friendship.get_friendships(self, 0)]
    if not friends_only:
        creators.append(self.id)
    # NOTE(review): the flattened source does not show whether the follows
    # were added only when ``friends_only`` is false; they are added
    # unconditionally here -- confirm against the upstream project.
    creators.extend(self.follow_source.filter().values_list('target', flat=True))

    annotations = {
        'num_yeahs': Count('yeah', distinct=True),
        'num_comments': Count('comment', distinct=True),
    }
    if request is not None and request.user.is_authenticated:
        viewer_yeah = Yeah.objects.filter(post=OuterRef('id'), by=request.user.id)
        annotations['yeah_given'] = Exists(viewer_yeah, distinct=True)

    posts = Post.objects.select_related('creator').select_related('community').annotate(**annotations)
    if distinct:
        posts = posts.annotate(max_created=Max('creator__post__created')).filter(created=F('max_created'))
    posts = posts.filter(creator__in=creators).order_by('-created')[offset:offset + limit]

    if request:
        for post in posts:
            post.setup(request)
            # Replace the bound method with its result on this instance.
            post.recent_comment = post.recent_comment()
    return posts
def relevant_reqs_count(params):
    """Build a per-policy count subquery of the matching public requirements.

    ``params`` is first reduced with ``subfilter_params(params,
    'requirements')``. The reduced parameters then drive ``RequirementFilter``
    and the topic / agency / agency-group / all-agency filters, applied in
    that order. The result is wrapped in an integer ``Subquery`` correlated
    with the outer row through ``OuterRef('pk')``.
    """
    req_params = subfilter_params(params, 'requirements')
    correlated = Requirement.objects.filter(policy=OuterRef('pk'))
    reqs = RequirementFilter(req_params, queryset=correlated).qs

    narrowing_steps = (
        filter_by_topic,
        filter_by_agency,
        filter_by_agency_group,
        filter_by_all_agency,
    )
    for narrow in narrowing_steps:
        reqs = narrow(req_params, reqs)

    counts = (
        reqs.filter(public=True)
        .values('policy')
        .annotate(count=Count('policy'))
        .values('count')
        .order_by()  # clear default order
    )
    return Subquery(counts, output_field=IntegerField())
def get_user_queryset(self):
    """Return the parent user queryset, annotated with ``followed`` on retrieve.

    The annotation is added only for the ``retrieve`` action and only when
    the requesting user is authenticated. It counts rows of the
    ``User.following`` through table with ``to_user_id`` equal to the
    requesting user's id and ``from_user_id`` equal to the outer row's id,
    and exposes that count through a ``BooleanField``.
    """
    qs = super(UserViewSet, self).get_user_queryset()
    # ``is_authenticated`` is read as an attribute: Django 1.10 made it a
    # property and Django 2.0 removed the callable form, so calling it
    # raises TypeError on current releases.
    if self.action == 'retrieve' and self.request.user.is_authenticated:
        follow_rows = User.following.through.objects.filter(
            to_user_id=self.request.user.id,
            from_user_id=models.OuterRef('id'),
        ).values('id')
        qs = qs.annotate(
            followed=models.ExpressionWrapper(
                models.Count(models.Subquery(follow_rows)),
                output_field=models.BooleanField(),
            )
        )
    return qs
def get_queryset(self):
    """Return the relation selected by the current action, ordered by ``id``.

    The attribute name is looked up in ``self.allowed_actions`` under
    ``self.action`` and read from the object returned by
    ``self.get_user_object()``. For an authenticated requester each row is
    additionally annotated with ``followed`` (see below).

    :raises NotFound: when resolving the relation raises ``KeyError``.
    """
    user = self.get_user_object()
    try:
        rel_field = getattr(user, self.allowed_actions[self.action])
    except KeyError:
        raise NotFound

    # ``is_authenticated`` is read as an attribute: Django 1.10 made it a
    # property and Django 2.0 removed the callable form, so calling it
    # raises TypeError on current releases.
    if self.request.user.is_authenticated:
        # Through-table rows linking the outer row (from_user) to the
        # requesting user (to_user).
        follow_rows = User.following.through.objects.filter(
            to_user_id=self.request.user.id,
            from_user_id=models.OuterRef('id'),
        ).values('id')
        rel_field = rel_field.annotate(
            followed=models.ExpressionWrapper(
                models.Count(models.Subquery(follow_rows)),
                output_field=models.BooleanField(),
            )
        )
    return rel_field.order_by('id')
def feeds(username):
    """Return the feed of posts for ``username``, newest first.

    A post is included when its ``owner_id`` is among the distinct
    ``follow_id`` values of the user's ``Relationship`` rows, or when the
    post is owned by the user. Each post carries two boolean annotations:
    ``is_reacted`` (a ``Reaction`` by the user exists for the post) and
    ``is_added`` (a ``Post`` by the user exists whose ``origin`` is the post).
    """
    followed_owner_ids = (
        Relationship.objects
        .filter(owner__user__username=username)
        .values_list('follow_id', flat=True)
        .distinct()
    )
    reacted = models.Exists(
        queryset=Reaction.objects.filter(
            post=models.OuterRef('pk'), owner__user__username=username))
    added = models.Exists(
        queryset=Post.objects.filter(
            origin=models.OuterRef('pk'), owner__user__username=username))

    in_feed = models.Q(owner_id__in=followed_owner_ids) | models.Q(owner__user__username=username)
    return (
        Post.objects.filter(in_feed)
        .annotate(is_reacted=reacted)
        .annotate(is_added=added)
        .order_by('-created_at')
    )
def links_by_user(username, user=None):
    """Return the posts owned by ``username``, newest first.

    Each post is annotated relative to ``user`` (which may be ``None``):
    ``is_reacted`` tells whether a ``Reaction`` owned by ``user`` exists for
    the post, ``is_added`` whether a ``Post`` owned by ``user`` has the post
    as its ``origin``.
    """
    reacted = models.Exists(
        queryset=Reaction.objects.filter(post=models.OuterRef('pk'), owner=user))
    added = models.Exists(
        queryset=Post.objects.filter(origin=models.OuterRef('pk'), owner=user))

    authored = Post.objects.filter(owner__user__username=username)
    return (
        authored
        .annotate(is_reacted=reacted)
        .annotate(is_added=added)
        .order_by('-created_at')
    )
def test_values(self):
    """Evaluate an ``include()`` queryset filtered by a correlated subquery.

    The test passes as long as building and executing the query does not
    raise; the result list itself is discarded.
    """
    cats = models.Cat.objects.include('archetype')
    own_pk = cats.filter(id=OuterRef('pk')).values('pk')
    filtered = cats.filter(id__in=Subquery(own_pk))
    list(filtered)  # force evaluation
def _with_has_successful_render_annotation(self):
    """Annotate each row with ``has_successful_render``.

    The flag is true when a ``Render`` exists whose ``paper`` is the row
    and whose ``state`` is ``Render.STATE_SUCCESS``.
    """
    successful = models.Exists(
        Render.objects.filter(
            paper=models.OuterRef('pk'),
            state=Render.STATE_SUCCESS,
        )
    )
    return self.annotate(has_successful_render=successful)
def _with_has_not_expired_render_annotation(self):
    """Annotate each row with ``has_not_expired_render``.

    The flag is true when a ``Render`` exists whose ``paper`` is the row
    and whose ``is_expired`` is false.
    """
    not_expired = models.Exists(
        Render.objects.filter(
            paper=models.OuterRef('pk'),
            is_expired=False,
        )
    )
    return self.annotate(has_not_expired_render=not_expired)
def get_queryset(self, *args, **kwargs):
    """Return the event's submissions ordered by ``-state``, ``-avg_score``.

    ``has_override`` marks submissions that have a review with a non-null
    ``override_vote``. ``avg_score`` is the event setting
    ``review_max_score + 1`` for those submissions and the average of
    ``reviews__score`` for all others.
    """
    overridden = Review.objects.filter(
        override_vote__isnull=False,
        submission_id=models.OuterRef('pk'),
    )
    submissions = (
        self.request.event.submissions
        .order_by('review_id')
        .annotate(has_override=models.Exists(overridden))
    )
    score = models.Case(
        models.When(
            has_override=True,
            then=self.request.event.settings.review_max_score + 1,
        ),
        default=models.Avg('reviews__score'),
    )
    return submissions.annotate(avg_score=score).order_by('-state', '-avg_score')
def post_view(request, post):
    """Render the page for a single post.

    ``post`` arrives as the post id and is replaced by the annotated
    ``Post`` instance. Responds with ``Http404`` when no such post exists.
    When more than 20 comments exist, ``all_comment_count - 20`` is passed
    to ``get_comments`` as its third argument; otherwise the method is
    called with the request only.
    """
    viewer_yeah = Yeah.objects.filter(post=OuterRef('id'), by=request.user.id)
    annotated_posts = Post.objects.annotate(
        num_yeahs=Count('yeah', distinct=True),
        num_comments=Count('comment', distinct=True),
        yeah_given=Exists(viewer_yeah, distinct=True),
    )
    try:
        post = annotated_posts.get(id=post)
    except Post.DoesNotExist:
        raise Http404()

    post.setup(request)
    if post.poll:
        post.poll.setup(request.user)
    if request.user.is_authenticated:
        # Each bound method is replaced by its result on this instance.
        post.can_rm = post.can_rm(request)
        post.is_favorite = post.is_favorite(request.user)
        post.can_comment = post.can_comment(request)

    title = 'Your post' if post.is_mine else '{0}\'s post'.format(post.creator.nickname)

    all_comment_count = post.number_comments()
    extra_args = (None, all_comment_count - 20) if all_comment_count > 20 else ()
    comments = post.get_comments(request, *extra_args)

    context = {
        'title': title,
        #CSS might not be that friendly with this / 'classes': ['post-permlink'],
        'post': post,
        'yeahs': post.get_yeahs(request),
        'comments': comments,
        'all_comment_count': all_comment_count,
        'ogdata': {
            'title': title,
            'description': post.trun(),
            'date': str(post.created),
            'image': post.creator.do_avatar(post.feeling),
        },
    }
    return render(request, 'closedverse_main/post-view.html', context)
def get_posts(self, limit=50, offset=0, request=None):
    """Return a slice of ``self.post_set``, newest first.

    Every post is annotated with ``num_yeahs`` and ``num_comments``.
    ``yeah_given`` is annotated only when ``request`` carries an
    authenticated user. ``request`` may be omitted: the signature defaults
    it to ``None``, so it is checked before ``request.user`` is read.

    :param limit: maximum number of posts in the returned slice.
    :param offset: index of the first post in the returned slice.
    :param request: optional request; when truthy each post gets
        ``setup(request)`` and its ``recent_comment`` resolved.
    :returns: the sliced queryset of posts.
    """
    annotations = {
        'num_yeahs': Count('yeah', distinct=True),
        'num_comments': Count('comment', distinct=True),
    }
    if request is not None and request.user.is_authenticated:
        viewer_yeah = Yeah.objects.filter(post=OuterRef('id'), by=request.user.id)
        annotations['yeah_given'] = Exists(viewer_yeah, distinct=True)

    posts = (
        self.post_set
        .select_related('community')
        .select_related('creator')
        .annotate(**annotations)
        .filter()
        .order_by('-created')[offset:offset + limit]
    )
    if request:
        for post in posts:
            post.setup(request)
            # Replace the bound method with its result on this instance.
            post.recent_comment = post.recent_comment()
    return posts
def get_comments(self, limit=50, offset=0, request=None):
    """Return a slice of ``self.comment_set``, newest first.

    Every comment is annotated with ``num_yeahs``. ``yeah_given`` is
    annotated only when ``request`` carries an authenticated user.
    ``request`` may be omitted: the signature defaults it to ``None``, so it
    is checked before ``request.user`` is read.

    :param limit: maximum number of comments in the returned slice.
    :param offset: index of the first comment in the returned slice.
    :param request: optional request; when truthy each comment gets
        ``setup(request)``.
    :returns: the sliced queryset of comments.
    """
    annotations = {'num_yeahs': Count('yeah', distinct=True)}
    if request is not None and request.user.is_authenticated:
        viewer_yeah = Yeah.objects.filter(comment=OuterRef('id'), by=request.user.id)
        annotations['yeah_given'] = Exists(viewer_yeah, distinct=True)

    posts = (
        self.comment_set
        .select_related('original_post')
        .select_related('creator')
        .select_related('original_post__creator')
        .annotate(**annotations)
        .filter()
        .order_by('-created')[offset:offset + limit]
    )
    if request:
        for post in posts:
            post.setup(request)
    return posts
def get_posts(self, limit=50, offset=0, request=None, favorite=False):
    """Return a slice of posts whose ``community_id`` is ``self.id``, newest first.

    Every post is annotated with ``num_yeahs`` and ``num_comments``.
    ``yeah_given`` is annotated only when ``request`` carries an
    authenticated user. ``request`` may be omitted: the signature defaults
    it to ``None``, so it is checked before ``request.user`` is read.

    :param limit: maximum number of posts in the returned slice.
    :param offset: index of the first post in the returned slice.
    :param request: optional request; when truthy each post gets
        ``setup(request)`` and its ``recent_comment`` resolved.
    :param favorite: accepted for interface compatibility; not used by
        this implementation.
    :returns: the sliced queryset of posts.
    """
    annotations = {
        'num_yeahs': Count('yeah', distinct=True),
        'num_comments': Count('comment', distinct=True),
    }
    if request is not None and request.user.is_authenticated:
        viewer_yeah = Yeah.objects.filter(post=OuterRef('id'), by=request.user.id)
        annotations['yeah_given'] = Exists(viewer_yeah, distinct=True)

    posts = (
        Post.objects
        .select_related('creator')
        .annotate(**annotations)
        .filter(community_id=self.id)
        .order_by('-created')[offset:offset + limit]
    )
    if request:
        for post in posts:
            post.setup(request)
            # Replace the bound method with its result on this instance.
            post.recent_comment = post.recent_comment()
    return posts
def get_queryset(self):
    """Return public rows that have at least one relevant requirement.

    Rows are annotated with ``has_docnode`` (a ``DocNode`` pointing at the
    row exists), ``total_reqs`` (``relevant_reqs_count`` with no parameters)
    and ``relevant_reqs`` (``relevant_reqs_count`` with the request's GET
    parameters). Only rows with ``relevant_reqs`` greater than zero are kept.
    """
    public_rows = super().get_queryset().filter(public=True)
    docnodes = DocNode.objects.filter(
        policy=OuterRef('pk'),
    )
    annotated = public_rows.annotate(
        has_docnode=Exists(docnodes),
        total_reqs=relevant_reqs_count({}),
        relevant_reqs=relevant_reqs_count(self.request.GET),
    )
    return annotated.filter(relevant_reqs__gt=0)
def with_voted_flag(self, user):
    """Annotate each row with ``voted``.

    The flag is true when the ``Experience.voted_users`` through table
    holds a row linking ``user.id`` to the annotated row.
    """
    votes_by_user = Experience.voted_users.through.objects.filter(
        user=user.id,
        experience=models.OuterRef('pk'),
    )
    return self.annotate(voted=models.Exists(votes_by_user))
def get_queryset(self):
    """Return activities, optionally filtered, ordered by ``-acttime``.

    For the ``timeline`` action the activities are limited to those whose
    user is a ``from_user_id`` of a ``following`` through row pointing at
    the requester, or whose ``brick_name`` is among the requester's
    ``bricks_watching`` bricks and whose user is not the requester.

    The ``user`` query parameter filters by username. The ``type`` query
    parameter is a comma-separated list of accepted types. Every row gets a
    ``score`` annotation: for ``type == 'Watch'`` it is the ``rate_score``
    of the ``BiobrickMeta`` whose ``part_name`` matches the row's
    ``brick_name``, otherwise ``None``.
    """
    queryset = Activity.objects.all()

    if self.action == 'timeline':
        viewer = self.request.user
        linked_user_ids = models.Subquery(
            viewer.following.through.objects
            .filter(to_user_id=viewer.id)
            .values('from_user_id')
        )
        watched_bricks = models.Subquery(
            viewer.bricks_watching.through.objects
            .filter(user=viewer.id)
            .values('brick')
        )
        from_linked_users = models.Q(user__in=linked_user_ids)
        on_watched_bricks = models.Q(brick_name__in=watched_bricks) & ~models.Q(user=viewer.id)
        queryset = queryset.filter(from_linked_users | on_watched_bricks)

    username = self.request.query_params.get('user', None)
    type_csv = self.request.query_params.get('type', None)
    if username is not None:
        queryset = queryset.filter(user__username=username)
    if type_csv is not None:
        queryset = queryset.filter(type__in=type_csv.split(','))

    brick_score = models.Subquery(
        BiobrickMeta.objects
        .filter(part_name=models.OuterRef('brick_name'))
        .values('rate_score')
    )
    scored = queryset.annotate(
        score=models.Case(
            models.When(type='Watch', then=brick_score),
            default=None,
        )
    )
    return scored.order_by('-acttime')