Python sqlalchemy.ext.declarative 模块,DeclarativeMeta() 实例源码

我们从 Python 开源项目中提取了以下 9 个代码示例,用于说明如何使用 sqlalchemy.ext.declarative.DeclarativeMeta()。

项目:xmusic-crawler    作者:rockers7414    | 项目源码 | 文件源码
def default(self, obj):
    """Flatten a query result into a plain ``dict`` of converted values.

    Two kinds of input are handled:

    * An instance whose class was created by ``DeclarativeMeta`` (an ORM
      model): every entry of ``obj.__dict__`` whose name does not start
      with ``'_'`` is included. List values are serialized element by
      element through a recursive call to ``self.default``; every other
      value goes through ``self.type_convert``.
    * Anything else (the raw-SQL case): each name returned by
      ``obj.keys()`` is read with ``getattr``, stringified, and passed
      through ``self.type_convert``.

    Args:
        obj: ORM model instance, or a row-like object exposing ``keys()``.

    Returns:
        dict: Mapping of field name to converted value.
    """
    if not isinstance(obj.__class__, DeclarativeMeta):
        # Raw SQL result: values are stringified before conversion.
        return {
            name: self.type_convert(str(getattr(obj, name)))
            for name in obj.keys()
        }

    # SQLAlchemy ORM instance.
    serialized = {}
    for attr, val in obj.__dict__.items():
        if attr.startswith('_'):
            # Skip underscore-prefixed entries of the instance dict.
            continue
        if isinstance(val, list):
            serialized[attr] = [self.default(item) for item in val]
        else:
            serialized[attr] = self.type_convert(val)
    return serialized
项目:AppServer    作者:skytoup    | 项目源码 | 文件源码
def decode(obj):
    """Turn a declarative model instance into a dict of simple values.

    Falsy objects, and objects whose class is not a ``DeclarativeMeta``,
    are returned unchanged. Otherwise every name from ``dir(obj)`` that
    neither starts nor ends with an underscore and is not ``'metadata'``
    is read and converted:

    * ``datetime.datetime`` -> ``timestamp()``
    * ``datetime.date``     -> ``isoformat()``
    * ``datetime.timedelta``-> ISO time of ``datetime.min + delta``
    * ``int``/``float``/``str`` -> kept as is
    * ``enum.Enum``         -> ``.value``
    * nested model instance -> ``AlchemyEncoder.decode`` (recursive)
    * ``list``              -> each element through ``AlchemyEncoder.decode``

    Values of any other type are left out of the result.

    Args:
        obj: Object to decode.

    Returns:
        dict for model instances, otherwise ``obj`` itself.
    """
    if not (obj and isinstance(obj.__class__, DeclarativeMeta)):
        return obj

    # Collect the candidate names up front, before touching any attribute.
    public_names = [
        name for name in dir(obj)
        if not (name.startswith('_')
                or name.endswith('_')
                or name == 'metadata')
    ]

    result = {}
    for name in public_names:
        data = obj.__getattribute__(name)
        # datetime must be tested before date: datetime is a date subclass.
        if isinstance(data, datetime.datetime):
            converted = data.timestamp()
        elif isinstance(data, datetime.date):
            converted = data.isoformat()
        elif isinstance(data, datetime.timedelta):
            converted = (datetime.datetime.min + data).time().isoformat()
        elif isinstance(data, (int, float, str)):
            converted = data
        elif isinstance(data, enum.Enum):
            converted = data.value
        elif isinstance(data.__class__, DeclarativeMeta):
            converted = AlchemyEncoder.decode(data)
        elif isinstance(data, list):
            converted = [AlchemyEncoder.decode(element) for element in data]
        else:
            # Unsupported type: omit the attribute entirely.
            continue
        result[name] = converted
    return result
项目:wsgicli    作者:kobinpy    | 项目源码 | 文件源码
def _sqlalchemy_model():
    """Import and return ``sessionmaker`` and ``DeclarativeMeta``.

    The imports happen inside the function, so SQLAlchemy is only
    required when this function is actually called.

    Returns:
        list: ``[sessionmaker, DeclarativeMeta]``, in that order.
    """
    from sqlalchemy.ext.declarative import DeclarativeMeta
    from sqlalchemy.orm import sessionmaker

    exported = [sessionmaker, DeclarativeMeta]
    return exported
项目:sqlservice    作者:dgilland    | 项目源码 | 文件源码
def test_declarative_base():
    """Test declarative_base()."""
    class MetaClass(DeclarativeMeta):
        pass

    shared_metadata = MetaData()
    Base = declarative_base(metadata=shared_metadata, metaclass=MetaClass)

    # The generated base class derives directly from ModelBase ...
    assert Base.__bases__[0] is ModelBase
    # ... reuses the exact MetaData object that was passed in ...
    assert Base.metadata is shared_metadata
    # ... and exposes the custom metaclass it was built with.
    assert Base.metaclass is MetaClass
项目:temporal-sqlalchemy    作者:CloverHealth    | 项目源码 | 文件源码
def build_history_class(
        cls: declarative.DeclarativeMeta,
        prop: T_PROPS,
        schema: str = None) -> nine.Type[TemporalProperty]:
    """Create the history model class for one property of ``cls``.

    The class is named ``<cls name>History_<prop key>``, is mapped to the
    table returned by ``build_history_table(cls, prop, schema)``, and
    inherits from ``TemporalProperty`` plus a fresh declarative base bound
    to that table's metadata.

    Args:
        cls: The declarative model the property belongs to.
        prop: The property to build a history class for.
        schema: Optional schema name, forwarded to ``build_history_table``.

    Returns:
        The newly created history model class.
    """
    history_name = '%sHistory_%s' % (cls.__name__, prop.key)
    history_table = build_history_table(cls, prop, schema)
    bases = (
        TemporalProperty,
        declarative.declarative_base(metadata=history_table.metadata),
    )

    # 'entity' links a history row to ``cls``; the backref adds a
    # '<prop key>_history' dynamic relationship on ``cls``.
    entity_backref = orm.backref('%s_history' % prop.key, lazy='dynamic')
    namespace = {
        '__table__': history_table,
        'entity': orm.relationship(lambda: cls, backref=entity_backref),
    }

    if isinstance(prop, orm.RelationshipProperty):
        # Relationship properties are mirrored on the history class under
        # the same key, with lazy='noload'.
        namespace[prop.key] = orm.relationship(prop.argument, lazy='noload')

    return type(history_name, bases, namespace)
项目:sqlalchemy_zdb    作者:skftn    | 项目源码 | 文件源码
def compile_zdb_score(element, compiler, **kw):
    """Render a ``zdb_score('<table>', <table>.ctid)`` SQL fragment.

    ``element.clauses`` must contain exactly one clause, and that clause
    must be a ``BindParameter`` whose value is a declarative model class;
    the table name is taken from the class's ``__tablename__``.

    Args:
        element: Clause element exposing ``clauses``.
        compiler: Unused here.
        **kw: Unused here.

    Returns:
        str: The rendered SQL fragment.

    Raises:
        ValueError: If the number of clauses is not one, or the single
            clause is not a bound declarative model class.
    """
    params = list(element.clauses)
    if len(params) != 1:
        raise ValueError("Incorrect params")

    (param,) = params
    is_model_param = (isinstance(param, BindParameter)
                      and isinstance(param.value, DeclarativeMeta))
    if not is_model_param:
        raise ValueError("Incorrect param")

    table_name = param.value.__tablename__
    return "zdb_score('%s', %s.ctid)" % (table_name, table_name)
项目:sqlservice    作者:dgilland    | 项目源码 | 文件源码
def destroy(session, data, model_class=None, synchronize_session=False):
    """Delete bulk `data`.

    The `data` argument can be any of the following:

    - Single instance of `model_class`
    - List of `model_class` instances
    - Primary key value (single value or ``tuple`` of values for composite
      keys)
    - List of primary key values.
    - Dict containing primary key(s) mapping
    - List of dicts with primary key(s) mappings

    If non-`model_class` instances are passed in, then `model_class` is
    required to know which table to delete from.

    Args:
        session (Session): SQLAlchemy session object.
        data (mixed): Data to delete from database.
        model_class (DeclarativeMeta, optional): Declarative model class
            used for every item that is not itself a model instance.
            Defaults to ``None``.
        synchronize_session (bool|str): Argument passed to
            ``Query.delete``.

    Returns:
        int: Number of deleted records.

    Raises:
        TypeError: If an item is not a declarative model instance and
            `model_class` is not a declarative model class.
    """
    if not isinstance(data, list):
        data = [data]

    valid_model_class = isinstance(model_class, DeclarativeMeta)

    # Group items by the model class whose table they are deleted from.
    mapped_data = defaultdict(list)

    for idx, item in enumerate(data):
        item_class = type(item)

        if isinstance(item_class, DeclarativeMeta):
            # Model instances always map to their own class.
            class_ = item_class
        elif valid_model_class:
            # Primary key values / dicts fall back to the given class.
            class_ = model_class
        else:
            raise TypeError('Type of value given to destroy() function is not '
                            'a valid SQLAlchemy declarative class and/or '
                            'model class argument is not valid. '
                            'Item with index {0} and with value "{1}" is '
                            'an instance of "{2}" and model class is {3}.'
                            .format(idx, item, type(item), model_class))

        mapped_data[class_].append(item)

    delete_count = 0

    # One delete statement per model class, all inside one
    # ``transaction(session)`` block.
    with transaction(session):
        for class_, items in iteritems(mapped_data):
            count = (session.query(class_)
                     .filter(primary_key_filter(items, class_))
                     .options(orm.lazyload('*'))
                     .delete(synchronize_session=synchronize_session))
            delete_count += count

    return delete_count
项目:temporal-sqlalchemy    作者:CloverHealth    | 项目源码 | 文件源码
def build_history_table(
        cls: declarative.DeclarativeMeta,
        prop: T_PROPS,
        schema: str = None) -> sa.Table:
    """Create the history ``sa.Table`` for one property of ``cls``.

    The table holds an ``id`` UUID primary key, an ``effective``
    TSTZRANGE column, a ``vclock`` INT4RANGE column, the foreign key
    column(s) produced by ``util.foreign_key_to`` for ``cls``'s table,
    and copies of the property's own columns (``local_columns`` for a
    relationship, ``columns`` otherwise). A GiST index on ``effective``
    and two exclude constraints (on ``vclock`` and on ``effective``,
    each combined with the per-foreign-key exclusions) are attached.

    Args:
        cls: The declarative model the property belongs to.
        prop: The property to build a history table for.
        schema: Schema for the new table; falls back to the schema of
            ``cls.__table__`` when falsy.

    Returns:
        sa.Table: The history table, registered on the metadata of
        ``cls.__table__``.
    """
    if isinstance(prop, orm.RelationshipProperty):
        source_columns = prop.local_columns
    else:
        source_columns = prop.columns
    columns = [util.copy_column(col) for col in source_columns]

    local_table = cls.__table__
    raw_name = _generate_history_table_name(local_table, columns)
    table_name = util.truncate_identifier(raw_name)

    # Build the foreign key(s), specifically adding an index since we may use
    # a casted foreign key in our constraints. See _exclusion_in_uuid
    entity_foreign_keys = list(util.foreign_key_to(local_table, index=True))
    entity_constraints = [
        _exclusion_in(fk.type, fk.key) for fk in entity_foreign_keys
    ]

    effective_index = sa.Index(
        util.truncate_identifier('%s_effective_idx' % table_name),
        'effective',
        postgresql_using='gist'
    )
    vclock_exclusions = entity_constraints + [('vclock', '&&')]
    vclock_constraint = sap.ExcludeConstraint(
        *vclock_exclusions,
        name=util.truncate_identifier('%s_excl_vclock' % table_name)
    )
    effective_exclusions = entity_constraints + [('effective', '&&')]
    effective_constraint = sap.ExcludeConstraint(
        *effective_exclusions,
        name=util.truncate_identifier('%s_excl_effective' % table_name)
    )
    constraints = [effective_index, vclock_constraint, effective_constraint]

    id_column = sa.Column(
        'id',
        sap.UUID(as_uuid=True),
        default=uuid.uuid4,
        primary_key=True)
    effective_column = sa.Column(
        'effective',
        sap.TSTZRANGE,
        default=util.effective_now,
        nullable=False)
    vclock_column = sa.Column('vclock', sap.INT4RANGE, nullable=False)

    extra_items = list(
        itertools.chain(entity_foreign_keys, columns, constraints))

    # keep_existing=True is what the original author relied on for
    # memoization of the table.
    return sa.Table(
        table_name,
        local_table.metadata,
        id_column,
        effective_column,
        vclock_column,
        *extra_items,
        schema=schema or local_table.schema,
        keep_existing=True
    )
项目:sqlalchemy_zdb    作者:skftn    | 项目源码 | 文件源码
def compile_zdb_query(element, compiler, **kw):
    """Render a ``zdb('<table>', ctid) ==> '<query>'`` SQL fragment.

    Each clause in ``element.clauses`` is inspected:

    * ``BinaryExpression`` contributes the name of its left column's table.
    * ``BindParameter`` holding a declarative model class names the table
      directly; it is only allowed as the first clause and is not part of
      the query text.
    * Other ``BindParameter`` values, ``BooleanClauseList`` and ``Column``
      clauses are accepted as they are.

    Every clause except the table parameter is compiled with
    ``compile_clause`` and the pieces are joined with ``" and "``. When
    ``element`` carries a ``_zdb_order_by`` that is a ``UnaryExpression``
    or ``ZdbScore``, the output of ``compile_limit`` is prepended to the
    query text.

    Args:
        element: Clause element exposing ``clauses``.
        compiler: SQL compiler, forwarded to ``compile_clause``.
        **kw: Unused here.

    Returns:
        str: The rendered SQL fragment.

    Raises:
        ValueError: On an unsupported clause type, a table parameter that
            is not first, no table at all, or more than one table.
    """
    fragments = []
    table_names = set()
    format_args = []

    for position, clause in enumerate(element.clauses):
        if isinstance(clause, BinaryExpression):
            table_names.add(clause.left.table.name)
        elif isinstance(clause, BindParameter):
            value = clause.value
            if not isinstance(value, str) and isinstance(value, DeclarativeMeta):
                if position > 0:
                    raise ValueError("Table can be specified only as first param")
                table_names.add(value.__tablename__)
                # The table parameter only selects the table; it is not
                # rendered into the query text.
                continue
        elif not isinstance(clause, (BooleanClauseList, Column)):
            raise ValueError("Unsupported filter")

        fragments.append(
            compile_clause(clause, compiler, table_names, format_args))

    if not table_names:
        raise ValueError("No filters passed")
    if len(table_names) > 1:
        raise ValueError("Different tables passed")
    table = table_names.pop()

    limit = ""
    order_by = getattr(element, "_zdb_order_by", None)
    if isinstance(order_by, (UnaryExpression, ZdbScore)):
        limit = compile_limit(order_by=order_by,
                              offset=element._zdb_offset,
                              limit=element._zdb_limit)

    joined = " and ".join(fragments)
    prefix = "zdb('%s', ctid) ==> " % table
    if format_args:
        # Arguments collected while compiling clauses: wrap the query in
        # a format(...) call that receives them.
        body = "'%sformat('%s', %s)'" % (limit, joined, ", ".join(format_args))
    else:
        body = "'%s%s'" % (limit, joined)
    return prefix + body