我们从Python开源项目中,提取了以下30个代码示例,用于说明如何使用rest_framework.serializers.ListSerializer()。
def many_init(cls, *args, **kwargs): """ This method implements the creation of a `ListSerializer` parent class when `many=True` is used. You can customize it if you need to control which keyword arguments are passed to the parent, and which are passed to the child. Note that we're over-cautious in passing most arguments to both parent and child classes in order to try to cover the general case. If you're overriding this method you'll probably want something much simpler, eg: @classmethod def many_init(cls, *args, **kwargs): kwargs['child'] = cls() return CustomListSerializer(*args, **kwargs) """ kwargs['child'] = BasePageSerializer(*args, **kwargs) return ListSerializer(*args, **kwargs)
def to_internal_value(self, data): """ This implements the same relevant logic as ListSerializer except that if one or more items fail validation, processing for other items that did not fail will continue. """ if not isinstance(data, list): message = self.error_messages['not_a_list'].format( input_type=type(data).__name__ ) raise serializers.ValidationError({ api_settings.NON_FIELD_ERRORS_KEY: [message] }) ret = [] for item in data: try: validated = self.child.run_validation(item) except serializers.ValidationError as exc: ret.append(exc.detail) else: ret.append(validated) return ret
def exclude_omitted_fields(self, request): field_names = self.get_requested_field_names(request) self._requested_fields = field_names if field_names is not None: # Drop any fields that are not specified in passed query param allowed = set(field_names) existing = set(self.fields.keys()) for field_name in existing - allowed: self.fields.pop(field_name) for field_name in self.fields: field = self.fields[field_name] if isinstance(field, serializers.ListSerializer): if isinstance(field.child, DynModelSerializer): field.child.exclude_omitted_fields(request) elif isinstance(field, DynModelSerializer): field.exclude_omitted_fields(request)
def data(self): # The parent class returns ReturnList return ReturnDict( { LINKS_FIELD_NAME: { URL_FIELD_NAME: { 'href': self.context['request'].build_absolute_uri() } }, EMBEDDED_FIELD_NAME: { # `items` mirrors hardcoded value in pagination classes 'items': super(ListSerializer, self).data } }, serializer=self )
def react_user_list(users, project, identifier): users = users.order_by(Lower('username')) user_list = ListSerializer(users, child=UserWithMailSerializer()).data format_strings = { 'users': json.dumps(user_list), 'project': project.pk, 'listen_to': identifier, } return format_html( ( '<span data-euth-widget="userlist"' ' data-users="{users}"' ' data-project="{project}"' ' data-listen-to="{listen_to}"></span>' ), **format_strings )
def assert_required_fields(self, field_names, parent_getter, field_parent_getter): """ Helper function to assert required fields """ for key in field_names: field = field_parent_getter(ProfileFilledOutSerializer().fields)[key] is_generated = isinstance(field, (ListSerializer, SerializerMethodField, ReadOnlyField)) is_skippable = (field.read_only or field.allow_null or field.allow_blank) # skip fields that are skippable, generated, read only, or that tie # to other serializers which are tested elsewhere. if is_generated or is_skippable: continue clone = deepcopy(self.data) clone["image"] = self.profile.image clone["image_small"] = self.profile.image_small clone["image_medium"] = self.profile.image_medium parent_getter(clone)[key] = None with self.assertRaises(ValidationError) as ex: ProfileFilledOutSerializer(data=clone).is_valid(raise_exception=True) assert parent_getter(ex.exception.args[0]) == {key: ['This field may not be null.']} if isinstance(field, CharField): # test blank string too parent_getter(clone)[key] = "" with self.assertRaises(ValidationError) as ex: ProfileFilledOutSerializer(data=clone).is_valid(raise_exception=True) assert parent_getter(ex.exception.args[0]) == {key: ['This field may not be blank.']}
def test_patch_own_profile(self): """ A user PATCHes their own profile """ with mute_signals(post_save): ProfileFactory.create(user=self.user1, filled_out=False, agreed_to_terms_of_service=False) self.client.force_login(self.user1) with mute_signals(post_save): new_profile = ProfileFactory.create(filled_out=False) new_profile.user.social_auth.create( provider=EdxOrgOAuth2.name, uid="{}_edx".format(new_profile.user.username) ) patch_data = ProfileSerializer(new_profile).data del patch_data['image'] resp = self.client.patch(self.url1, content_type="application/json", data=json.dumps(patch_data)) assert resp.status_code == 200 old_profile = Profile.objects.get(user__username=self.user1.username) for key, value in patch_data.items(): field = ProfileSerializer().fields[key] if isinstance(field, (ListSerializer, SerializerMethodField, ReadOnlyField)) or field.read_only is True: # these fields are readonly continue elif isinstance(field, DateField): assert getattr(old_profile, key) == parse(value).date() else: assert getattr(old_profile, key) == value
def get_serializer_fields(self, path, method, view): """ Return a list of `coreapi.Field` instances corresponding to any request body input, as determined by the serializer class. """ if method not in ('PUT', 'PATCH', 'POST'): return [] if not hasattr(view, 'get_serializer'): return [] serializer = view.get_serializer() if isinstance(serializer, serializers.ListSerializer): return [ coreapi.Field( name='data', location='body', required=True, schema=coreschema.Array() ) ] if not isinstance(serializer, serializers.Serializer): return [] fields = [] for field in serializer.fields.values(): if field.read_only or isinstance(field, serializers.HiddenField): continue required = field.required and method != 'PATCH' field = coreapi.Field( name=field.field_name, location='form', required=required, schema=field_to_schema(field) ) fields.append(field) return fields
def __init__(self, field, parent, only_fields, include_fields): self.field = field self.parent = parent self.is_many = isinstance(field, serializers.ListSerializer) and isinstance(field.child, serializers.Serializer) self.has_context = isinstance(field, serializers.Serializer) or self.is_many if self.has_context: self.old_context = None self.only_fields = self.filter_fields(field.field_name, only_fields) self.include_fields = self.filter_fields(field.field_name, include_fields) self.on_exit_delete_fields = False self.on_exit_delete_include_fields = False self.old_fields = None self.old_include_fields = None
def get(self, request): """ Returns all the suggestions. """ serializer = rf_serializers.ListSerializer(models.Suggestion.objects.all(), child=serializers.SimpleSuggestionSerializer()) return Response(serializer.data)
def list(self, request): """ Returns all the beer styles. Do not show suggested beer styles by default. """ styles = models.Style.objects.all() if request.auth is None: styles = styles[:settings.UNAUTHENTICATED_RESULTS_COUNT] serializer = rf_serializers.ListSerializer( styles, child=serializers.SimpleStyleSerializer() ) return Response(serializer.data)
def list(self, request): """ Returns all approved yeast types by default. """ serializer = rf_serializers.ListSerializer( models.YeastType.objects.all() if request.auth is not None else models.YeastType.objects.all()[:settings.UNAUTHENTICATED_RESULTS_COUNT], child=serializers.YeastTypeSerializer() ) return Response(serializer.data)
def list(self, request): """ Returns all approved yeast strains. """ serializer = rf_serializers.ListSerializer( models.Yeast.objects.all() if request.auth is not None else models.Yeast.objects.all()[:settings.UNAUTHENTICATED_RESULTS_COUNT], child=serializers.SimpleYeastSerializer() ) return Response(serializer.data)
def list(self, request): """ Returns all countries in the system. """ serializer = rf_serializers.ListSerializer( models.CountryCode.objects.all() if request.auth is not None else models.CountryCode.objects.all()[:settings.UNAUTHENTICATED_RESULTS_COUNT], child=serializers.CountryCodeSerializer() ) return Response(serializer.data)
def list(self, request): """ Returns all hops that are approved in the system by default. """ serializer = rf_serializers.ListSerializer( models.Hop.objects.all() if request.auth is not None else models.Hop.objects.all()[:settings.UNAUTHENTICATED_RESULTS_COUNT], child=serializers.SimpleHopSerializer() ) return Response(serializer.data)
def list(self, request): """ Returns all fermentable types in the system """ fermentable_types = fermentable_types = models.FermentableType.objects.filter(is_active=True) # if the user is authenticated return all valid fermentable types if request.auth is None: fermentable_types = fermentable_types[:settings.UNAUTHENTICATED_RESULTS_COUNT] serializer = rf_serializers.ListSerializer( fermentable_types, child=serializers.FermentableType() ) return Response(serializer.data)
def list(self, request): """ Returns all fermentables that are approved in the system by default. """ fermentables = models.Fermentable.objects.filter(is_active=True) if request.auth is None: fermentables = fermentables[:settings.UNAUTHENTICATED_RESULTS_COUNT] serializer = rf_serializers.ListSerializer( fermentables, child=serializers.Fermentable() ) return Response(serializer.data)
def test_build_serializer_with_depth(self): class VehicleSerializer(ModelSerializer): class Meta: model = Vehicle session = session fields = '__all__' depth = 3 serializer = VehicleSerializer() self.assertEqual(len(serializer.fields), 11) self.assertEqual( set(serializer.fields.keys()), { Vehicle.created_at.key, Vehicle.engine.key, Vehicle.id.key, Vehicle.name.key, Vehicle.options.key, Vehicle.other.key, Vehicle.owner.key, Vehicle.paint.key, Vehicle.type.key, Vehicle.is_used.key, 'url' } ) engine_serializer = serializer.fields['engine'] self.assertEqual(len(engine_serializer.fields), 4) self.assertEqual(set(engine_serializer.fields.keys()), {'type_', 'displacement', 'fuel_type', 'cylinders'}) owner_serializer = serializer.fields['owner'] self.assertEqual(len(owner_serializer.fields), 3) self.assertEqual(set(owner_serializer.fields.keys()), {'id', 'first_name', 'last_name'}) self.assertEqual(set(f.label for f in owner_serializer.fields.values()), {'Id', 'First name', 'Last name'}) options_serializer = serializer.fields['options'] self.assertTrue(options_serializer.many) self.assertIsInstance(options_serializer, ListSerializer) option_serializer = options_serializer.child self.assertEqual(len(option_serializer.fields), 2) self.assertEqual(set(option_serializer.fields.keys()), {'id', 'name'})
def perform_update(self, instance, validated_data, errors): """ The main nested update logic implementation using nested fields and serializer """ for field in self._writable_fields: if field.field_name not in validated_data: continue try: value = validated_data.get(field.field_name) if isinstance(field, BaseSerializer): child_instance = getattr(instance, field.field_name, None) child_instance = field.get_object(value, child_instance) if child_instance and field.allow_nested_updates: value = field.perform_update(child_instance, value, errors) else: value = child_instance elif isinstance(field, ListSerializer) and isinstance(field.child, BaseSerializer): value = [] for item in validated_data.get(field.field_name): child_instance = field.child.get_object(item) if child_instance and (field.child.allow_create or field.child.allow_nested_updates): v = field.child.perform_update(child_instance, item, errors) else: v = child_instance if v: value.append(v) self.update_attribute(instance, field, value) except Exception as e: errors.setdefault(field.field_name, []).append(' '.join(e.args)) return instance
def many_init(cls, *args, **kwargs): kwargs['child'] = PageSerializer(*args, **kwargs) return ListSerializer(*args, **kwargs) # TODO: decide if we need this
def __init__(self, serializer): """ :param serializer: DjangoRestFramework Serializer """ if isinstance(serializer, ListSerializer): serializer = serializer.child self.serializer = serializer self.name = self._get_name() self.fields = self._collect_fields()
def get_raw_type_from_serializer_field(field): if isinstance(field, ListSerializer): return 'list' if isinstance(field, ModelSerializer): return 'model' # return smart_text(field.Meta.model.__name__).lower() name = field.__class__.__name__ if name.endswith('Field'): name = smart_text(name[:-5]) return smart_text(name).lower()
def set_field_property(self, segments, fields, property_key): if len(segments) == 1: field = fields.get(segments[0]) if field: setattr(field, property_key, True) return serializer = fields.get(segments[0]) fields = serializer.child.fields\ if type(serializer) is serializers.ListSerializer else\ serializer.fields return self.set_field_property( segments[1:], fields, property_key)
def field_to_schema(field): title = force_text(field.label) if field.label else '' description = force_text(field.help_text) if field.help_text else '' if isinstance(field, serializers.ListSerializer): child_schema = field_to_schema(field.child) return coreschema.Array( items=child_schema, title=title, description=description ) elif isinstance(field, serializers.Serializer): return coreschema.Object( properties=OrderedDict([ (key, field_to_schema(value)) for key, value in field.fields.items() ]), title=title, description=description ) elif isinstance(field, serializers.ManyRelatedField): return coreschema.Array( items=coreschema.String(), title=title, description=description ) elif isinstance(field, serializers.RelatedField): return coreschema.String(title=title, description=description) elif isinstance(field, serializers.MultipleChoiceField): return coreschema.Array( items=coreschema.Enum(enum=list(field.choices.keys())), title=title, description=description ) elif isinstance(field, serializers.ChoiceField): return coreschema.Enum( enum=list(field.choices.keys()), title=title, description=description ) elif isinstance(field, serializers.BooleanField): return coreschema.Boolean(title=title, description=description) elif isinstance(field, (serializers.DecimalField, serializers.FloatField)): return coreschema.Number(title=title, description=description) elif isinstance(field, serializers.IntegerField): return coreschema.Integer(title=title, description=description) if field.style.get('base_template') == 'textarea.html': return coreschema.String( title=title, description=description, format='textarea' ) return coreschema.String(title=title, description=description)
def run_autooptimization_discovery(serializer, prefix, select_related_set, prefetch_related_set, is_prefetch, only_fields, include_fields): if not hasattr(serializer, "Meta") or not hasattr(serializer.Meta, "model"): return model_class = serializer.Meta.model if hasattr(serializer, "get_on_demand_fields"): on_demand_fields = serializer.get_on_demand_fields() else: on_demand_fields = set() def filter_field_name(field_name, fields_to_serialize): if fields_to_serialize is not None: return ContextPassing.filter_fields(field_name, fields_to_serialize) return None for field_name, field in serializer.fields.items(): if hasattr(serializer, "check_if_needs_serialization"): if not serializer.check_if_needs_serialization(field_name, only_fields, include_fields, on_demand_fields): continue if isinstance(field, ListSerializer): if "." not in field.source and hasattr(model_class, field.source): model_field = getattr(model_class, field.source) if check_if_prefetch_object(model_field): prefetch_related_set.add(prefix + field.source) run_autooptimization_discovery(field.child, prefix + field.source + "__", select_related_set, prefetch_related_set, True, filter_field_name(field_name, only_fields), filter_field_name(field_name, include_fields)) elif isinstance(field, Serializer): if "." not in field.source and hasattr(model_class, field.source): model_field = getattr(model_class, field.source) if check_if_related_object(model_field): if is_prefetch: prefetch_related_set.add(prefix + field.source) else: select_related_set.add(prefix + field.source) run_autooptimization_discovery(field, prefix + field.source + "__", select_related_set, prefetch_related_set, is_prefetch, filter_field_name(field_name, only_fields), filter_field_name(field_name, include_fields)) elif "." in field.source: field_name = field.source.split(".", 1)[0] if hasattr(model_class, field_name): model_field = getattr(model_class, field_name) if check_if_related_object(model_field): select_related_set.add(prefix + field_name)
def _extract_field_info(self, model, field_name, field, fields, relationships, adapter, target_app, allow_recursion=False): if field_name == 'id': return None field_item = { 'name': field_name, 'type': adapter.field_type_mapping[field.__class__.__name__] } if isinstance(field, PrimaryKeyRelatedField) or isinstance(field, ManyRelatedField) \ or isinstance(field, ModelSerializer): if model is None: field_item['related_model'] = field.queryset.model._meta.model_name.lower() field_item['app'] = target_app if target_app is not None else \ field.queryset.model._meta.app_label.lower() relationships.append(field_item) else: model_field = model._meta.get_field(field_name) field_item['related_model'] = model_field.related_model._meta.model_name.lower() field_item['app'] = target_app if target_app is not None else \ model_field.related_model._meta.app_label.lower() relationships.append(field_item) if hasattr(model_field, 'field'): field_item['inverse'] = model_field.field.name elif hasattr(model_field, 'remote_field') and \ getattr(model_field.remote_field, 'related_name', None) is not None: field_item['inverse'] = model_field.remote_field.related_name if field_item.get('inverse', '-')[-1] == '+': field_item.pop('inverse') if isinstance(field, ModelSerializer): if hasattr(field, 'many') and field.many: field_item['type'] = adapter.field_type_mapping['ManyRelatedField'] else: field_item['type'] = adapter.field_type_mapping['PrimaryKeyRelatedField'] elif isinstance(field, ModelSerializer): field_item['related_model'] = field.queryset.model._meta.model_name.lower() field_item['app'] = target_app if target_app is not None else \ field.queryset.model._meta.app_label.lower() relationships.append(field_item) if field.many: field_item['type'] = adapter.field_type_mapping['ManyRelatedField'] else: field_item['type'] = adapter.field_type_mapping['PrimaryKeyRelatedField'] elif isinstance(field, ListSerializer): child_rels = [] child_fields = [] self._extract_field_info(model, field_name, field.child, child_fields, child_rels, adapter, target_app) if len(child_rels) > 0: for item in child_rels: item['type'] = adapter.field_type_mapping['ManyRelatedField'] item.pop('inverse', None) relationships.append(item) else: field_item['type'] = adapter.field_type_mapping('ListField') fields.append(field_item) else: fields.append(field_item) return field_item
def get_serializer_fields(self, path, method, view, version=None, method_func=None): """ Return a list of `coreapi.Field` instances corresponding to any request body input, as determined by the serializer class. """ if method not in ('PUT', 'PATCH', 'POST'): return [] serializer_class = self.get_serializer_class(view, method_func) if not serializer_class: return [] serializer = serializer_class() if isinstance(serializer, serializers.ListSerializer): return [ Field( name='data', location='body', required=True, schema=coreschema.Array() ) ] if not isinstance(serializer, serializers.Serializer): return [] fields = [] for field in serializer.fields.values(): if field.read_only or isinstance(field, serializers.HiddenField): continue required = field.required and method != 'PATCH' # if the attribute ('help_text') of this field is a lazy translation object, force it to generate a string description = str(field.help_text) if isinstance(field.help_text, Promise) else field.help_text fallback_schema = self.fallback_schema_from_field(field) field = Field( name=field.field_name, location='form', required=required, schema=fallback_schema if fallback_schema else field_to_schema(field), description=description, ) fields.append(field) return fields
def update(self, instance, validated_data): reverse_relations = OrderedDict() relations = OrderedDict() # Sort fields by create priority fields = self.get_sorted_by_create_priority(self.fields) # Remove related fields from validated data for future manipulations for field_name, field in fields.items(): if field.read_only: continue if isinstance(field, serializers.ListSerializer): if isinstance(field.child, serializers.ModelSerializer): if validated_data.pop(field.source, None) is None: # Skip field if field is not required or null allowed continue reverse_relations[field_name] = field.child if isinstance(field, serializers.ModelSerializer): if validated_data.pop(field.source, None) is None: # Skip field if field is not required or null allowed continue relations[field_name] = field nested_related_kwarg = getattr(self.Meta, 'nested_related_kwarg', None) if reverse_relations: assert nested_related_kwarg, \ "Set `nested_related_kwarg` in Meta options for use nested " \ "create feature" if relations: raise NotImplementedError("NestedUpdateMixin not provide update " "for direct relations") # Update instance instance = super(NestedUpdateMixin, self).update( instance, validated_data) if reverse_relations: self.update_reverse_relations(instance, reverse_relations) self.delete_reverse_relations_if_need(instance, reverse_relations) return instance
def create(self, validated_data): reverse_relations = OrderedDict() relations = OrderedDict() # Sort fields by create priority fields = self.get_sorted_by_create_priority(self.fields) # Remove related fields from validated data for future manipulations for field_name, field in fields.items(): if field.read_only or field_name in self._ignore_creation: continue if isinstance(field, serializers.ListSerializer): if isinstance(field.child, serializers.ModelSerializer): if validated_data.pop(field.source, None) is None: # Skip field if field is not required or null allowed continue reverse_relations[field_name] = field.child if isinstance(field, serializers.ModelSerializer): if validated_data.pop(field.source, None) is None: # Skip field if field is not required or null allowed continue relations[field_name] = field nested_related_kwarg = getattr(self.Meta, 'nested_related_kwarg', None) if reverse_relations: assert nested_related_kwarg, \ "Set `nested_related_kwarg` in Meta options for use nested " \ "create feature" # Create direct relations (foreign key) for field_name, field in relations.items(): serializer = self._get_new_serializer( field, data=self.initial_data[field_name]) serializer.is_valid(raise_exception=True) validated_data[field.source] = serializer.save() # Create instance instance = super(NestedCreateMixin, self).create(validated_data) if reverse_relations: self.create_reverse_relations(instance, reverse_relations) return instance