Python django.db.models 模块,QuerySet() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用django.db.models.QuerySet()

项目:django-actions-logger    作者:shtalinberg    | 项目源码 | 文件源码
def get_for_objects(self, queryset):
        """
        Get log entries for the objects in the specified queryset.
        :param queryset: The queryset to get the log entries for.
        :type queryset: QuerySet
        :return: The LogAction objects for the objects in the given queryset.
        :rtype: QuerySet
        """
        if not isinstance(queryset, QuerySet) or queryset.count() == 0:
            return self.none()

        content_type = ContentType.objects.get_for_model(queryset.model)
        primary_keys = queryset.values_list(queryset.model._meta.pk.name, flat=True)

        if isinstance(primary_keys[0], integer_types):
            return self.filter(content_type=content_type).filter(Q(object_id__in=primary_keys)).distinct()
        else:
            return self.filter(content_type=content_type).filter(Q(object_pk__in=primary_keys)).distinct()
项目:zing    作者:evernote    | 项目源码 | 文件源码
def _earliest_or_latest(self, field_name=None, direction="-"):
        """
        Overrides QuerySet._earliest_or_latest to add pk for secondary ordering
        """
        order_by = field_name or getattr(self.model._meta, 'get_latest_by')
        assert bool(order_by), "earliest() and latest() require either a "\
            "field_name parameter or 'get_latest_by' in the model"
        assert self.query.can_filter(), \
            "Cannot change a query once a slice has been taken."
        obj = self._clone()
        obj.query.set_limits(high=1)
        obj.query.clear_ordering(force_empty=True)
        # add pk as secondary ordering for Submissions
        obj.query.add_ordering('%s%s' % (direction, order_by),
                               '%s%s' % (direction, "pk"))
        return obj.get()
项目:pandachaika    作者:pandabuilder    | 项目源码 | 文件源码
def get_queryset(self) -> QuerySet:

        tag_clean = self.request.GET.get('q', '').replace(" ", "_")
        m = re.match(r"^([-^])", tag_clean)
        if m:
            self.modifier = m.group(1)
            tag_clean = tag_clean.replace(self.modifier, "")
        else:
            self.modifier = ''
        scope_name = tag_clean.split(":", maxsplit=1)

        if len(scope_name) > 1:
            results = Tag.objects.exclude(source='user').filter(
                Q(name__contains=scope_name[1]),
                Q(scope__contains=scope_name[0]))
        else:
            results = Tag.objects.exclude(source='user').filter(
                Q(name__contains=tag_clean) |
                Q(scope__contains=tag_clean))

        return results.distinct()
项目:pandachaika    作者:pandabuilder    | 项目源码 | 文件源码
def eligible_for_use(self, **kwargs: typing.Any) -> QuerySet:
        return self.get_queryset().eligible_for_use(**kwargs)

    # # This is commented because it didn't work in MySQL
    # def need_new_archive(self, **kwargs):
    #
    #     return self.annotate(
    #         num_archives=Count('archive')
    #     ).filter(
    #         (
    #             (
    #                 Q(num_archives=1) &
    #                 ~Q(filesize=F('archive__filesize')) &
    #                 ~Q(filesize=0)
    #             ) |
    #             Q(archive__isnull=True)
    #         ),
    #         **kwargs
    #     ).prefetch_related('archive_set').order_by('-create_date')
项目:tumanov_castleoaks    作者:Roamdev    | 项目源码 | 文件源码
def products_changed_handler(sender, **kwargs):
    """
        ?????????? ??????? ????????? ???-?? ??????? ?????????,
        ??????????? ??????????????? ? ?????????.

        ???????????? ??? ?????????? ????????? ??????? ? ?????? ?????????.
    """
    categories = kwargs.get('categories')
    if isinstance(categories, ShopCategory):
        # ????????? ?????????
        categories = ShopCategory.objects.filter(pk=categories.pk)
    elif isinstance(categories, (int, str)):
        # ?????? ??? ?????, ?????????? ID ?????????
        categories = ShopCategory.objects.filter(pk=categories)
    elif isinstance(categories, (list, tuple, set, ValuesListQuerySet)):
        # ?????? ????? ??? ?????, ?????????? ID ?????????
        categories = ShopCategory.objects.filter(pk__in=categories)
    elif isinstance(categories, QuerySet) and categories.model is ShopCategory:
        # QuerySet ?????????
        pass
    else:
        raise TypeError('Invalid categories for signal "products_changed"')

    with transaction.atomic():
        categories.update(
            product_count=RawSQL(
                '(SELECT COUNT(*) '
                'FROM shop_shopproduct AS ssp '
                'WHERE ssp.category_id = shop_shopcategory.id '
                'AND ssp.is_visible = TRUE)',
                ()
            )
        )
        categories.update(
            total_product_count=F('product_count')
        )

    categories_changed.send(ShopCategory, categories=categories)
项目:django-chemtrails    作者:inonit    | 项目源码 | 文件源码
def test_get_group_filters(self):
        group = Group.objects.create(name='test group')
        group.permissions.add(*Permission.objects.filter(codename__in=['add_user', 'change_user']))

        user = User.objects.create_user(username='testuser', password='test123.')
        user.groups.add(group)

        # Get group filters for group
        checker = utils.GraphPermissionChecker(group)
        filters = checker.get_group_filters()
        permissions = Permission.objects.filter(**filters)
        self.assertIsInstance(permissions, QuerySet)
        self.assertEqual(permissions.count(), 2)

        # Get group filters for use
        checker = utils.GraphPermissionChecker(user)
        filters = checker.get_group_filters()
        permissions = Permission.objects.filter(**filters)
        self.assertIsInstance(permissions, QuerySet)
        self.assertEqual(permissions.count(), 2)
项目:django-channels-router    作者:Monadical-SAS    | 项目源码 | 文件源码
def group(self):
        """
        get a django channels Group consisting of all the
        reply_channels in the QuerySet
        """
        self._channel_names = self.values_list('channel_name', flat=True)

        if not self._channel_names:
            empty_group = Group('emptyname')
            empty_group.empty = True
            return empty_group

        # group name is the hash of all the channel_names
        combined_names = b''.join(
            i.encode('utf-8', errors='replace')
            for i in self._channel_names
        )
        group_id = md5(combined_names).hexdigest()
        combined_group = Group(name=group_id)

        for channel_name in self._channel_names:
            if channel_name.startswith('bot-'):
                continue
            combined_group.add(Channel(channel_name))
        return combined_group
项目:montage    作者:storyful    | 项目源码 | 文件源码
def get_obj_or_api_404(model_class_or_queryset, **lookups):
    """
        Gets a object as per args in lookups dict.

        Can use either a model or a queryset to query on
    """
    if isinstance(model_class_or_queryset, QuerySet):
        qs = model_class_or_queryset
        model_class = model_class_or_queryset.model
    else:
        qs = model_class_or_queryset.objects
        model_class = model_class_or_queryset

    try:
        return qs.get(**lookups)

    except model_class.DoesNotExist:
        raise NotFoundException(
            "{0} instance with {1} not found".format(
                model_class.__name__, str(lookups)))
项目:mk42    作者:Peer-Lab    | 项目源码 | 文件源码
def active(self, *args, **kwargs):
        """
        Return active groups.

        :param args: additional args.
        :type args: list.
        :param kwargs: additional args.
        :type kwargs: dict.
        :return: queryset with active groups.
        :rtype: django.db.models.query.QuerySet.
        """

        return self.filter(active=True)
项目:mk42    作者:Peer-Lab    | 项目源码 | 文件源码
def inactive(self, *args, **kwargs):
        """
        Return inactive groups.

        :param args: additional args.
        :type args: list.
        :param kwargs: additional args.
        :type kwargs: dict.
        :return: queryset with inactive groups.
        :rtype: django.db.models.query.QuerySet.
        """

        return self.filter(active=False)
项目:mk42    作者:Peer-Lab    | 项目源码 | 文件源码
def active(self, *args, **kwargs):
        """
        Return active membership.

        :param args: additional args.
        :type args: list.
        :param kwargs: additional args.
        :type kwargs: dict.
        :return: queryset with active memberships.
        :rtype: django.db.models.query.QuerySet.
        """

        return self.filter(active=True)
项目:mk42    作者:Peer-Lab    | 项目源码 | 文件源码
def inactive(self, *args, **kwargs):
        """
        Return inactive membership.

        :param args: additional args.
        :type args: list.
        :param kwargs: additional args.
        :type kwargs: dict.
        :return: queryset with inactive memberships.
        :rtype: django.db.models.query.QuerySet.
        """

        return self.filter(active=False)
项目:mk42    作者:Peer-Lab    | 项目源码 | 文件源码
def active(self, *args, **kwargs):
        """
        Return users with is_active == True.

        :param args: additional args.
        :type args: list.
        :param kwargs: additional args.
        :type kwargs: dict.
        :return: queryset of users with is_active == True.
        :rtype: django.db.models.query.QuerySet.
        """

        return self.filter(is_active=True)
项目:drf-metadata    作者:night-crawler    | 项目源码 | 文件源码
def get_NAME_serializer(self, field: models.Field, qs: models.QuerySet, obj=None) -> Serializer:
        """
        Returns serializer <NAME> field. This method (`get_NAME_serializer`) is not supposed to use directly.
        :param obj: optional obj passed to method
        :param field: model field
        :param qs: queryset
        :return: Serializer
        """
        raise Exception()

    # noinspection PyPep8Naming,PyMethodMayBeStatic
项目:drf-metadata    作者:night-crawler    | 项目源码 | 文件源码
def get_NAME_queryset(self, field: models.Field, obj=None) -> models.QuerySet:
        """
        Redefines field <NAME> queryset (fk, m2m, etc. querysets).
            This method (`get_NAME_queryset`) is not supposed to use directly.
        :param field:
        :param obj: optional obj passed to method
        :return: QuerySet()
        """
        raise Exception()

    # noinspection PyPep8Naming,PyMethodMayBeStatic
项目:drf-metadata    作者:night-crawler    | 项目源码 | 文件源码
def get_NAME_dataset_url(self, field: models.Field, obj=None) -> models.QuerySet:
        """
        Redefines all dataset_url bindings. Supposed to use in runtime
            (when there's no ability to pass reverse_lazy, etc.)
        :param field:
        :param obj: optional obj passed to method
        :return: QuerySet()
        """
        raise Exception()

    # noinspection PyProtectedMember
项目:drf-metadata    作者:night-crawler    | 项目源码 | 文件源码
def default_queryset_serializer(qs: models.QuerySet, many: bool = False) -> DRFMimicSerializer:
        """
        Simple serializer that looks like default DRF serializer
        :param qs: queryset
        :param many: not used, mimic to DRF serializer
        :return:
        """
        res = [{'id': item.id, 'name': str(item)} for item in qs]
        return DRFMimicSerializer(data=res)
项目:drf-metadata    作者:night-crawler    | 项目源码 | 文件源码
def serialize_queryset(self, field: models.Field, qs: models.QuerySet) -> t.Dict:
        serializer = self.get_serializer(field)
        return serializer(qs, many=True).data
项目:django-actions-logger    作者:shtalinberg    | 项目源码 | 文件源码
def get_for_model(self, model):
        """
        Get log entries for all objects of a specified type.
        :param model: The model to get log entries for.
        :type model: class
        :return: QuerySet of log entries for the given model.
        :rtype: QuerySet
        """
        # Return empty queryset if the given object is not valid.
        if not issubclass(model, models.Model):
            return self.none()

        ct = ContentType.objects.get_for_model(model)

        return self.filter(content_type=ct)
项目:django-ajax-views    作者:Pyco7    | 项目源码 | 文件源码
def default_filter(self, opts, *args, **kwargs):
        """
        This is called by the view's ``get_queryset`` method.

        It calls the :func:`ajax_filter` first to apply the filter and then :func:`ajax_sort` to sort the filtered
        QuerySet. Each of these methods can be overridden to manipulate the QuerySet.

        :param opts: Filter options passed through request.
        :param args: Positional filter options in addition to opts.
        :param kwargs: Keword filter options in addition to opts.
        :return: Filtered QuerySet
        """
        return self.ajax_filter(opts, *args, **kwargs).ajax_sorter(opts)
项目:django-tree    作者:BertrandBordage    | 项目源码 | 文件源码
def __init__(self, field, value):
        self.field = field
        self.attname = getattr(self.field, 'attname', None)
        self.field_bound = self.attname is not None
        self.qs = (self.field.model._default_manager.all()
                   if self.field_bound else QuerySet())
        self.value = value
项目:fantasie    作者:shawn1m    | 项目源码 | 文件源码
def get_random_song_list(self, n):
        # ?????n?QueryObject??,??QuerySet
        # ???????order_by('?')
        result = []
        count = self.aggregate(count=Count('id'))['count']
        for _ in range(n):
            try:
                random_index = randint(0, count - 1)
            except ValueError:
                raise MusicLibrary.DoesNotExist
            result.append(self.all()[random_index])
        return result
项目:PrivacyScore    作者:PrivacyScore    | 项目源码 | 文件源码
def ordered_columns(self) -> QuerySet:
        """Get the ordered column values of this site."""
        return self.sorted_columns
项目:postix    作者:c3cashdesk    | 项目源码 | 文件源码
def active(self, session=None) -> models.QuerySet:
        qs = self.get_queryset().filter(
            status=TroubleshooterNotification.STATUS_NEW,
            created__gt=now() - timedelta(minutes=10),
            session__end__isnull=True,
        )
        return qs.filter(session=session) if session else qs
项目:postix    作者:c3cashdesk    | 项目源码 | 文件源码
def get_queryset(self) -> QuerySet:
        return Cashdesk.objects.filter(is_active=True).order_by('name')
项目:postix    作者:c3cashdesk    | 项目源码 | 文件源码
def get_queryset(self) -> QuerySet:
        return CashdeskSession.objects.filter(end__isnull=False).order_by('-end')
项目:postix    作者:c3cashdesk    | 项目源码 | 文件源码
def get_queryset(self) -> QuerySet:
        queryset = PreorderPosition.objects.all().order_by('id')
        exact_param = self.request.GET.get('secret', None)
        search_param = self.request.GET.get('search', None)
        if exact_param is not None:
            queryset = queryset.filter(secret__iexact=exact_param)
        elif search_param is not None and len(search_param) >= 6:
            queryset = queryset.filter(secret__istartswith=search_param)
        else:
            queryset = queryset.none()
        return queryset
项目:postix    作者:c3cashdesk    | 项目源码 | 文件源码
def get_queryset(self) -> QuerySet:
        queryset = ListConstraintEntry.objects.all().order_by('id')
        listid_param = self.request.query_params.get('listid', None)
        if listid_param is not None:
            queryset = queryset.filter(list_id=listid_param)
            search_param = self.request.query_params.get('search', None)
            if search_param is not None and len(search_param) >= 3:
                queryset = queryset.filter(Q(name__icontains=search_param) | Q(identifier__icontains=search_param))
            elif search_param is not None:
                queryset = queryset.filter(identifier__iexact=search_param)
            else:
                queryset = queryset.none()
        else:
            queryset = queryset.none()
        return queryset
项目:postix    作者:c3cashdesk    | 项目源码 | 文件源码
def get_queryset(self) -> QuerySet:
        qs = TransactionPosition.objects.all()\
            .order_by('-transaction__datetime')\
            .select_related('transaction')
        if 'cashdesk' in self.filter:
            qs = qs.filter(transaction__session__cashdesk=self.filter['cashdesk'])
        if 'type' in self.filter and self.filter['type']:
            qs = qs.filter(type=self.filter['type'])
        if 'receipt' in self.filter and self.filter['receipt']:
            qs = qs.filter(transaction__receipt_id=self.filter['receipt'])
        return qs
项目:csv_generator    作者:fatboystring    | 项目源码 | 文件源码
def for_content_type_id(self, content_type_id):
        """
        Method to return a queryset of CsvGenerator
        model instances for a given content type

        :param content_type_id: ContentType model id
        :type content_type_id: int

        :return: QuerySet of CsvGenerator model instances
        """
        return self.filter(content_type_id=content_type_id)
项目:csv_generator    作者:fatboystring    | 项目源码 | 文件源码
def for_content_type(self, content_type):
        """
        Method to return a queryset of CsvGenerator
        model instances for a given content type

        :param content_type: ContentType model
        :type content_type: django.contrib.contenttype.models.ContentType

        :return: QuerySet of CsvGenerator model instances
        """
        return self.filter(content_type=content_type)
项目:csv_generator    作者:fatboystring    | 项目源码 | 文件源码
def for_model(self, model):
        """
        Method to return a queryset of CsvGenerator
        model instances for a given type of model

        :param model: Model class or instance
        :type model: django.db.models.Model

        :return: QuerySet of CsvGenerator model instances
        """
        return self.for_content_type(ContentType.objects.get_for_model(model))
项目:lifter    作者:EliotBerriot    | 项目源码 | 文件源码
def setup():
    """
    This is a bit dirty, but to make lifter available on django querysets,
    we simply add a custom method to django base queryset class.
    """

    method_name = 'locally'
    setattr(QuerySet, method_name, _locally)
项目:autostew    作者:Autostew    | 项目源码 | 文件源码
def get_connected_members(self) -> QuerySet:
        return self.member_set.filter(still_connected=True)
项目:autostew    作者:Autostew    | 项目源码 | 文件源码
def get_members_who_finished_race(self) -> QuerySet:
        if self.parent_or_self.is_result:
            return self.parent_or_self.member_set.filter(still_connected=True)
        return None
项目:pandachaika    作者:pandabuilder    | 项目源码 | 文件源码
def crawl_urls_caller(self, urls: List[str], wanted_filters: QuerySet=None, wanted_only: bool=False):
        try:
            self.crawl_urls(urls, wanted_filters=wanted_filters, wanted_only=wanted_only)
        except BaseException:
            thread_logger = logging.getLogger('viewer.threads')
            thread_logger.error(traceback.format_exc())
项目:pandachaika    作者:pandabuilder    | 项目源码 | 文件源码
def crawl_urls(self, urls: List[str], wanted_filters: QuerySet = None, wanted_only: bool = False) -> None:
        pass
项目:pandachaika    作者:pandabuilder    | 项目源码 | 文件源码
def sort_tags(tag_list: QuerySet) -> List[Tuple[str, List['Tag']]]:

    prioritized_tag_list = list()

    for scope in scope_priorities:
        matched_tags = sorted([x for x in tag_list if x.scope == scope], key=str)
        if matched_tags:
            prioritized_tag_list.append((scope, matched_tags))

    remaining_tags = sorted([x for x in tag_list if x.scope not in scope_priorities], key=str)
    if remaining_tags:
        prioritized_tag_list.append((None, remaining_tags))

    return prioritized_tag_list
项目:pandachaika    作者:pandabuilder    | 项目源码 | 文件源码
def create_matches_wanted_galleries_from_providers(wanted_galleries: QuerySet, provider: str, logger: OptionalLogger=None) -> None:

    try:
        matchers = crawler_settings.provider_context.get_matchers(crawler_settings, logger, filter_name=provider, force=True, matcher_type='title')
        for matcher_element in matchers:
            matcher = matcher_element[0]
            for wanted_gallery in wanted_galleries:
                results = matcher.create_closer_matches_values(wanted_gallery.search_title)
                for gallery_data in results:
                    gallery_data[1]['dl_type'] = 'gallery_match'
                    gallery = Gallery.objects.update_or_create_from_values(gallery_data[1])
                    if gallery:
                        GalleryMatch.objects.get_or_create(
                            wanted_gallery=wanted_gallery,
                            gallery=gallery,
                            defaults={'match_accuracy': gallery_data[2]})
                if logger:
                    if results:
                        logger.info(
                            'Searched for wanted gallery: {} in panda, total possible matches: {}'.format(
                                wanted_gallery.title,
                                wanted_gallery.possible_matches.count()
                            )
                        )
                    else:
                        logger.info(
                            'Searched for wanted gallery: {} in panda, no match found'.format(wanted_gallery.title)
                        )
                time.sleep(crawler_settings.wait_timer)
        if logger:
            logger.info('Search ended.')
    except BaseException:
        thread_logger = logging.getLogger('viewer.threads')
        thread_logger.error(traceback.format_exc())


# WantedGallery
项目:pandachaika    作者:pandabuilder    | 项目源码 | 文件源码
def are_custom(self) -> QuerySet:
        return self.filter(source='user')
项目:pandachaika    作者:pandabuilder    | 项目源码 | 文件源码
def not_custom(self) -> QuerySet:
        return self.exclude(source='user')
项目:pandachaika    作者:pandabuilder    | 项目源码 | 文件源码
def first_artist_tag(self) -> QuerySet:
        return self.filter(scope__exact='artist').first()
项目:pandachaika    作者:pandabuilder    | 项目源码 | 文件源码
def several_archives(self) -> QuerySet:
        return self.annotate(
            num_archives=Count('archive')
        ).filter(num_archives__gt=1).order_by('-id').prefetch_related('archive_set')
项目:pandachaika    作者:pandabuilder    | 项目源码 | 文件源码
def non_used_galleries(self, **kwargs: typing.Any) -> QuerySet:
        return self.filter(
            ~Q(status=Gallery.DELETED),
            ~Q(dl_type__contains='skipped'),
            Q(archive__isnull=True),
            **kwargs
        ).order_by('-create_date')
项目:pandachaika    作者:pandabuilder    | 项目源码 | 文件源码
def eligible_for_use(self, **kwargs: typing.Any) -> QuerySet:
        return self.filter(
            ~Q(status=Gallery.DELETED),
            **kwargs
        )
项目:pandachaika    作者:pandabuilder    | 项目源码 | 文件源码
def filter_order_created(self, **kwargs: typing.Any) -> QuerySet:
        return self.filter(**kwargs).order_by('-create_date')
项目:pandachaika    作者:pandabuilder    | 项目源码 | 文件源码
def filter_order_modified(self, **kwargs: typing.Any) -> QuerySet:
        return self.filter(**kwargs).order_by('-last_modified')
项目:pandachaika    作者:pandabuilder    | 项目源码 | 文件源码
def after_posted_date(self, date: datetime) -> QuerySet:
        return self.filter(posted__gte=date).order_by('posted')
项目:pandachaika    作者:pandabuilder    | 项目源码 | 文件源码
def filter_non_existent(self) -> QuerySet:
        return self.get_queryset().several_archives()
项目:pandachaika    作者:pandabuilder    | 项目源码 | 文件源码
def non_used_galleries(self, **kwargs: typing.Any) -> QuerySet:
        return self.get_queryset().non_used_galleries(**kwargs)