Python marshmallow.fields 模块,Nested() 实例源码


项目:QualquerMerdaAPI    作者:tiagovizoto    | 项目源码 | 文件源码
def dumps(self, obj, many=None, update_fields=True, *args, **kwargs):
        """Same as :meth:`dump`, except return a JSON-encoded string.

        Extra positional and keyword arguments are forwarded unchanged to
        ``self.opts.json_module.dumps``.

        :param obj: The object to serialize.
        :param bool many: Whether to serialize `obj` as a collection. If `None`, the value
            for `self.many` is used.
        :param bool update_fields: Whether to update the schema's field classes. Typically
            set to `True`, but may be `False` when serializing a homogenous collection.
            This parameter is used by `fields.Nested` to avoid multiple updates.
        :return: A tuple of the form (``data``, ``errors``)
        :rtype: `MarshalResult`, a `collections.namedtuple`

        .. versionadded:: 1.0.0
        """
        # `dump` produces the serialized (not deserialized) representation.
        serialized, errors = self.dump(obj, many=many, update_fields=update_fields)
        encoded = self.opts.json_module.dumps(serialized, *args, **kwargs)
        return MarshalResult(encoded, errors)
项目:umongo    作者:Scille    | 项目源码 | 文件源码
def _deserialize(self, value, attr, data):
        """Turn ``value`` into an embedded document.

        * An instance of ``self.embedded_document_cls`` is returned unchanged.
        * A dict carrying a ``'cls'`` key (when the embedded document class has
          offspring) is instantiated as the named child class; note that the
          ``'cls'`` key is popped from ``value`` in place.
        * Anything else is loaded through ``self.schema`` with ``partial=True``
          and handed to ``self._deserialize_from_mongo``.

        :raises ValidationError: if the ``'cls'`` hint names an unknown or
            unregistered document, or if the schema reports errors.
        """
        embedded_document_cls = self.embedded_document_cls
        if isinstance(value, embedded_document_cls):
            return value
        # Handle inheritance deserialization here using `cls` field as hint
        if embedded_document_cls.opts.offspring and isinstance(value, dict) and 'cls' in value:
            to_use_cls_name = value.pop('cls')
            if not any(o.__name__ == to_use_cls_name
                       for o in embedded_document_cls.opts.offspring):
                raise ValidationError(_('Unknown document `{document}`.').format(
                    document=to_use_cls_name))
            try:
                to_use_cls = embedded_document_cls.opts.instance.retrieve_embedded_document(
                    to_use_cls_name)
            except NotRegisteredDocumentError as e:
                raise ValidationError(str(e))
            return to_use_cls(**value)
        else:
            # `Nested._deserialize` calls schema.load without partial=True
            data, errors = self.schema.load(value, partial=True)
            if errors:
                raise ValidationError(errors, data=data)
            return self._deserialize_from_mongo(data)
项目:microcosm-flask    作者:globality-corp    | 项目源码 | 文件源码
def build_parameter(field):
    """
    Build a parameter from a marshmallow field.

    The base parameter dict comes from the first matching resolver (tagged
    field, enum, numeric-as-string, timestamp, otherwise the default
    resolver); description, default, ``$ref`` and ``items`` entries are then
    layered on top.

    :param field: a marshmallow field instance
    :returns: the parameter dictionary for `field`

    """
    if hasattr(field, SWAGGER_TYPE):
        parameter = resolve_tagged_field(field)
    elif getattr(field, "enum", None):
        parameter = resolve_enum_field(field)
    elif getattr(field, 'as_string', None):
        parameter = resolve_numeric_string_field(field)
    elif isinstance(field, TimestampField):
        parameter = resolve_timestamp_field(field)
    else:
        # Fallback only: without this `else` the default resolver would
        # overwrite the result of every branch above.
        parameter = resolve_default_for_field(field)

    if field.metadata.get("description"):
        parameter["description"] = field.metadata["description"]

    # Truthiness test: falsy defaults (0, False, "") are not emitted.
    if field.default:
        parameter["default"] = field.default

    # nested
    if isinstance(field, fields.Nested):
        parameter["$ref"] = "#/definitions/{}".format(type_name(name_for(field.schema)))

    # arrays
    if isinstance(field, fields.List):
        parameter["items"] = build_parameter(field.container)

    return parameter
项目:microcosm-flask    作者:globality-corp    | 项目源码 | 文件源码
def iter_schemas(marshmallow_schema):
    """
    Build zero or more JSON schemas for a marshmallow schema.

    Generates: name, schema pairs.

    The schema itself is yielded first, followed by the schema of every
    ``Nested`` field (or ``List`` of ``Nested``) and, recursively, whatever
    those nested schemas generate. The same name may therefore be yielded
    more than once.

    """
    if not marshmallow_schema:
        # Nothing to describe: generate zero pairs.
        return

    base_schema = build_schema(marshmallow_schema)
    base_schema_name = type_name(name_for(marshmallow_schema))
    yield base_schema_name, base_schema

    for _name, field in iter_fields(marshmallow_schema):
        if isinstance(field, fields.Nested):
            nested_schema = build_schema(field.schema)
            nested_schema_name = type_name(name_for(field.schema))
            yield nested_schema_name, nested_schema
            for subname, subschema in iter_schemas(field.schema):
                yield subname, subschema
        if isinstance(field, fields.List) and isinstance(field.container, fields.Nested):
            nested_schema = build_schema(field.container.schema)
            nested_schema_name = type_name(name_for(field.container.schema))
            yield nested_schema_name, nested_schema
            for subname, subschema in iter_schemas(field.container.schema):
                yield subname, subschema
项目:lymph-schema    作者:deliveryhero    | 项目源码 | 文件源码
def dump_schema(schema_obj):
    """Describe a marshmallow schema instance as a JSON-schema style dict.

    :param schema_obj: schema instance; its ``TYPE_MAPPING`` and ``fields``
        attributes are read.
    :returns: dict with ``"type": "object"``, a ``"properties"`` mapping of
        output name to field schema, and a ``"required"`` list of names.
    :raises ValueError: if a field cannot be mapped to any known type.
    """
    json_schema = {
        "type": "object",
        "properties": {},
        "required": [],
    }
    # Invert TYPE_MAPPING (python type -> field class) into
    # field class -> python type, then add the classes it does not cover.
    mapping = {v: k for k, v in schema_obj.TYPE_MAPPING.items()}
    mapping[fields.Email] = text_type
    mapping[fields.Dict] = dict
    mapping[fields.List] = list
    mapping[fields.Url] = text_type
    mapping[fields.LocalDateTime] = datetime.datetime

    for field_name, field in sorted(schema_obj.fields.items()):
        schema = None

        if field.__class__ in mapping:
            pytype = mapping[field.__class__]
            schema = _from_python_type(field, pytype)
        elif isinstance(field, fields.Nested):
            schema = _from_nested_schema(field)
        elif issubclass(field.__class__, fields.Field):
            # Custom field: fall back to any mapped class it derives from.
            # There is no `break`, so the last matching class iterated wins.
            for cls in mapping.keys():
                if issubclass(field.__class__, cls):
                    pytype = mapping[cls]
                    schema = _from_python_type(field, pytype)

        if schema is None:
            raise ValueError('unsupported field type %s' % field)

        # Publish the field under its serialized name when one is set.
        field_name = field.dump_to or field_name
        json_schema['properties'][field_name] = schema
        if field.required:
            json_schema['required'].append(field_name)
    return json_schema
项目:toasted-marshmallow    作者:lyft    | 项目源码 | 文件源码
def nested_circular_ref_schema():
    """Return an instance of a schema that nests itself.

    The ``me`` field refers to the enclosing class by name (a string),
    because the class object does not exist yet while its body executes.
    """
    class NestedStringSchema(Schema):
        key = fields.String()
        me = fields.Nested('NestedStringSchema')

    instance = NestedStringSchema()
    return instance
项目:toasted-marshmallow    作者:lyft    | 项目源码 | 文件源码
def nested_schema():
    """Return an instance of a three-level nested schema.

    ``NestedSchema`` nests ``SubSchema`` twice: once restricted with ``only``
    and once as a collection (``many=True``) with ``exclude``. ``SubSchema``
    in turn nests ``GrandChildSchema``.
    """
    class GrandChildSchema(Schema):
        bar = fields.String()
        raz = fields.String()

    class SubSchema(Schema):
        name = fields.String()
        value = fields.Nested(GrandChildSchema)

    class NestedSchema(Schema):
        key = fields.String()
        # An empty-string entry in `only` matches no field; the dotted name
        # selects `bar` of the grandchild schema and leaves `raz` out.
        value = fields.Nested(SubSchema, only=('name', 'value.bar'))
        values = fields.Nested(SubSchema, exclude=('value', ), many=True)

    return NestedSchema()
项目:twopi-flask-utils    作者:TwoPiCode    | 项目源码 | 文件源码
def paginated(basequery, schema_type, offset=None, limit=None):
    """
    Paginate a sqlalchemy query

    :param basequery: The base query to be iterated upon
    :param schema_type: The ``Marshmallow`` schema to dump data with
    :param offset: (Optional) The offset into the data. If omitted it will
                  be read from the query string in the ``?offset=`` argument. If
                  not query string, defaults to 0.
    :param limit: (Optional) The maximum results per page. If omitted it will
                  be read from the query string in the ``?limit=`` argument. If
                  not query string, defaults to 20.

    :returns: The page's data in a namedtuple form ``(data=, errors=)``
    """
    # Only parse the request when at least one value was not passed in.
    if offset is None or limit is None:
        args = parser.parse(pagination_args, request)
        if offset is None:
            offset = args['offset']

        if limit is None:
            limit = args['limit']

    data = {
        'offset': offset,
        'limit': limit,
        'items': basequery.limit(limit).offset(offset),
        # Counted on the unsliced query.
        'totalItems': basequery.count()
    }

    # Envelope schema wrapping one page of `schema_type` items.
    class _Pagination(Schema):
        offset = fields.Integer()
        limit = fields.Integer()
        totalItems = fields.Integer()
        items = fields.Nested(schema_type, many=True)

    return _Pagination().dump(data)
项目:mybookshelf2    作者:izderadicka    | 项目源码 | 文件源码
def PartialSchemaFactory(schema_cls, **kwargs):
    """Instantiate ``schema_cls`` with its nested schemas marked partial.

    Each ``fields.Nested`` entry of the new instance is swapped for a deep
    copy whose ``schema.partial`` is set to ``True``; the field objects the
    instance was created with are not modified.

    :param schema_cls: schema class to instantiate
    :param kwargs: forwarded to ``schema_cls``
    :returns: the schema instance
    """
    schema = schema_cls(**kwargs)
    nested_names = [
        name for name, candidate in schema.fields.items()
        if isinstance(candidate, fields.Nested)
    ]
    for name in nested_names:
        clone = deepcopy(schema.fields[name])
        clone.schema.partial = True
        schema.fields[name] = clone
    return schema
项目:pydantic    作者:samuelcolvin    | 项目源码 | 文件源码
def __init__(self, allow_extra):
        """Build the marshmallow schema and store an instance on ``self.schema``.

        :param allow_extra: stored on ``self.allow_extra``; not otherwise used
            here.
        """
        class LocationSchema(Schema):
            latitude = fields.Float(allow_none=True)
            longitude = fields.Float(allow_none=True)

        class SkillSchema(Schema):
            subject = fields.Str(required=True)
            subject_id = fields.Integer(required=True)
            category = fields.Str(required=True)
            qual_level = fields.Str(required=True)
            qual_level_id = fields.Integer(required=True)
            qual_level_ranking = fields.Float(default=0)

        class Model(Schema):
            id = fields.Integer(required=True)
            client_name = fields.Str(validate=validate.Length(max=255), required=True)
            sort_index = fields.Float(required=True)
            # client_email = fields.Email()
            client_phone = fields.Str(validate=validate.Length(max=255), allow_none=True)

            # A bare schema instance assigned as a class attribute is not a
            # field, so `location` would never be validated; wrap it in Nested.
            location = fields.Nested(LocationSchema)

            contractor = fields.Integer(validate=validate.Range(min=0), allow_none=True)
            upstream_http_referrer = fields.Str(validate=validate.Length(max=1023), allow_none=True)
            grecaptcha_response = fields.Str(validate=validate.Length(min=20, max=1000), required=True)
            last_updated = fields.DateTime(allow_none=True)
            skills = fields.Nested(SkillSchema(many=True))

        self.allow_extra = allow_extra  # unused
        self.schema = Model()
项目:umongo    作者:Scille    | 项目源码 | 文件源码
def as_marshmallow_field(self, params=None, mongo_world=False):
        """Return a marshmallow ``Nested`` field wrapping the embedded schema.

        :param params: optional dict; its ``'params'`` entry is popped (the
            dict is modified in place) and passed on to the embedded
            document's schema conversion.
        :param mongo_world: forwarded to the field-parameter extraction and
            to the nested schema conversion.
        :returns: ``ma_fields.Nested`` built from the embedded document schema.
        """
        # Overwrite default `as_marshmallow_field` to handle nesting
        kwargs = self._extract_marshmallow_field_params(mongo_world)
        if params:
            nested_params = params.pop('params')
        else:
            # Without this branch the popped value was immediately discarded
            # and `nested_params` was unbound when `params` was empty.
            nested_params = None
        nested_ma_schema = self._embedded_document_cls.schema.as_marshmallow_schema(
            params=nested_params, mongo_world=mongo_world)
        return ma_fields.Nested(nested_ma_schema, **kwargs)