我们从Python开源项目中,提取了以下9个代码示例,用于说明如何使用sqlalchemy.ext.declarative.DeclarativeMeta()。
def default(self, obj):
    """Serialize *obj* into a plain dict of JSON-friendly values.

    ORM instances (their class is a ``DeclarativeMeta``) are walked through
    ``__dict__``; anything else is treated as a raw-SQL row exposing
    ``keys()``.
    """
    fields = {}
    if isinstance(obj.__class__, DeclarativeMeta):
        # SQLAlchemy ORM instance: walk its public attributes.
        for key, value in obj.__dict__.items():
            if key.startswith('_'):
                # Skip SQLAlchemy internals such as _sa_instance_state.
                continue
            if isinstance(value, list):
                # Relationship collection: serialize each child recursively.
                fields[key] = [self.default(item) for item in value]
            else:
                fields[key] = self.type_convert(value)
    else:
        # Raw SQL result row: stringify and convert every column.
        for name in obj.keys():
            fields[name] = self.type_convert(str(getattr(obj, name)))
    return fields
def decode(obj):
    """Recursively convert an ORM instance into JSON-serializable primitives.

    Falsy values and non-ORM objects are returned unchanged. Attributes whose
    type is not handled are silently omitted from the result.
    """
    if not (obj and isinstance(obj.__class__, DeclarativeMeta)):
        return obj

    public_names = (
        name for name in dir(obj)
        if not name.startswith('_') and not name.endswith('_')
        and name != 'metadata'
    )

    fields = {}
    for name in public_names:
        value = getattr(obj, name)
        # Branch order matters: datetime is a date subclass, so test it first.
        if isinstance(value, datetime.datetime):
            converted = value.timestamp()
        elif isinstance(value, datetime.date):
            converted = value.isoformat()
        elif isinstance(value, datetime.timedelta):
            # Render a duration as a wall-clock time-of-day string.
            converted = (datetime.datetime.min + value).time().isoformat()
        elif isinstance(value, (int, float, str)):
            converted = value
        elif isinstance(value, enum.Enum):
            converted = value.value
        elif isinstance(value.__class__, DeclarativeMeta):
            converted = AlchemyEncoder.decode(value)
        elif isinstance(value, list):
            converted = [AlchemyEncoder.decode(item) for item in value]
        else:
            continue  # unhandled type: omit, matching original behavior
        fields[name] = converted
    return fields
def _sqlalchemy_model():
    """Lazily import SQLAlchemy and return [sessionmaker, DeclarativeMeta]."""
    # Imports live inside the function so the module loads without SQLAlchemy.
    from sqlalchemy.ext.declarative import DeclarativeMeta
    from sqlalchemy.orm import sessionmaker

    hooks = [sessionmaker, DeclarativeMeta]
    return hooks
def test_declarative_base():
    """declarative_base() must honour the metadata and metaclass arguments."""

    class CustomMeta(DeclarativeMeta):
        pass

    shared_metadata = MetaData()
    Base = declarative_base(metadata=shared_metadata, metaclass=CustomMeta)

    # The generated base derives from ModelBase and retains both arguments.
    assert Base.__bases__[0] is ModelBase
    assert Base.metadata is shared_metadata
    assert Base.metaclass is CustomMeta
def build_history_class(
        cls: declarative.DeclarativeMeta,
        prop: T_PROPS,
        schema: str = None) -> nine.Type[TemporalProperty]:
    """Create the temporal-history model class tracking *prop* on *cls*."""
    history_table = build_history_table(cls, prop, schema)

    # A fresh declarative base bound to the history table's metadata.
    bases = (
        TemporalProperty,
        declarative.declarative_base(metadata=history_table.metadata),
    )

    attrs = {
        '__table__': history_table,
        # Back-reference from the tracked entity to its history rows.
        'entity': orm.relationship(
            lambda: cls,
            backref=orm.backref('%s_history' % prop.key, lazy='dynamic'),
        ),
    }
    if isinstance(prop, orm.RelationshipProperty):
        # Mirror the relationship on the history model, never loading eagerly.
        attrs[prop.key] = orm.relationship(prop.argument, lazy='noload')

    name = "%s%s_%s" % (cls.__name__, 'History', prop.key)
    return type(name, bases, attrs)
def compile_zdb_score(element, compiler, **kw):
    """Render a zdb_score(...) SQL fragment for one bound declarative model.

    Raises ValueError unless exactly one clause is given and it is a
    BindParameter wrapping a declarative model class.
    """
    args = list(element.clauses)
    if len(args) != 1:
        raise ValueError("Incorrect params")

    param = args[0]
    is_model_bind = (isinstance(param, BindParameter)
                     and isinstance(param.value, DeclarativeMeta))
    if not is_model_bind:
        raise ValueError("Incorrect param")

    table = param.value.__tablename__
    return "zdb_score(\'%s\', %s.ctid)" % (table, table)
def destroy(session, data, model_class=None, synchronize_session=False):
    """Delete bulk `data`.

    The `data` argument can be any of the following:

    - Single instance of `model_class`
    - List of `model_class` instances
    - Primary key value (single value or ``tuple`` of values for composite
      keys)
    - List of primary key values.
    - Dict containing primary key(s) mapping
    - List of dicts with primary key(s) mappings

    If non-`model_class` instances are passed in, then `model_class` is
    required to know which table to delete from.

    Args:
        session (Session): SQLAlchemy session object.
        data (mixed): Data to delete from database.
        model_class (class, optional): SQLAlchemy declarative class used to
            resolve the target table for non-model `data` items. Defaults to
            ``None``.
        synchronize_session (bool|str): Argument passed to ``Query.delete``.

    Raises:
        TypeError: When an item's table cannot be determined, i.e. the item
            is not a declarative model instance and `model_class` is not a
            valid declarative class.

    Returns:
        int: Number of deleted records.
    """
    if not isinstance(data, list):
        data = [data]

    valid_model_class = isinstance(model_class, DeclarativeMeta)

    # Group items by the declarative class whose table they belong to.
    mapped_data = defaultdict(list)

    for idx, item in enumerate(data):
        item_class = type(item)

        if not isinstance(item_class, DeclarativeMeta) and valid_model_class:
            # Raw primary-key values/dicts: fall back to the explicit class.
            class_ = model_class
        else:
            class_ = item_class

        if not isinstance(class_, DeclarativeMeta):
            # Fixed typos in the original message: "destory" -> "destroy",
            # "SQLALchemy" -> "SQLAlchemy".
            raise TypeError('Type of value given to destroy() function is not '
                            'a valid SQLAlchemy declarative class and/or '
                            'model class argument is not valid. '
                            'Item with index {0} and with value "{1}" is '
                            'an instance of "{2}" and model class is {3}.'
                            .format(idx, item, type(item), model_class))

        mapped_data[class_].append(item)

    delete_count = 0

    with transaction(session):
        # Use fresh loop names so the `model_class`/`data` parameters are not
        # shadowed (the original reused both names here).
        for target_class, items in iteritems(mapped_data):
            count = (session.query(target_class)
                     .filter(primary_key_filter(items, target_class))
                     .options(orm.lazyload('*'))
                     .delete(synchronize_session=synchronize_session))
            delete_count += count

    return delete_count
def build_history_table(
        cls: declarative.DeclarativeMeta,
        prop: T_PROPS,
        schema: str = None) -> sa.Table:
    """Build the SQLAlchemy history table shadowing *prop* on *cls*.

    Each history row carries an ``effective`` TSTZRANGE and a ``vclock``
    INT4RANGE; exclusion constraints prevent overlapping ranges per entity.
    """
    # Copy the column(s) the property maps: the local FK columns for a
    # relationship, the mapped columns otherwise.
    if isinstance(prop, orm.RelationshipProperty):
        columns = [util.copy_column(column) for column in prop.local_columns]
    else:
        columns = [util.copy_column(column) for column in prop.columns]

    local_table = cls.__table__
    # Derived name may exceed identifier limits, so truncate defensively.
    table_name = util.truncate_identifier(
        _generate_history_table_name(local_table, columns)
    )
    # Build the foreign key(s), specifically adding an index since we may use
    # a casted foreign key in our constraints. See _exclusion_in_uuid
    entity_foreign_keys = list(util.foreign_key_to(local_table, index=True))
    entity_constraints = [
        _exclusion_in(fk.type, fk.key)
        for fk in entity_foreign_keys
    ]

    constraints = [
        # GiST index supports range-overlap queries on `effective`.
        sa.Index(
            util.truncate_identifier('%s_effective_idx' % table_name),
            'effective',
            postgresql_using='gist'
        ),
        # Per entity, vclock ranges must not overlap ...
        sap.ExcludeConstraint(
            *itertools.chain(entity_constraints, [('vclock', '&&')]),
            name=util.truncate_identifier('%s_excl_vclock' % table_name)
        ),
        # ... and neither may effective time ranges.
        sap.ExcludeConstraint(
            *itertools.chain(entity_constraints, [('effective', '&&')]),
            name=util.truncate_identifier('%s_excl_effective' % table_name)
        ),
    ]

    return sa.Table(
        table_name,
        local_table.metadata,
        sa.Column('id',
                  sap.UUID(as_uuid=True),
                  default=uuid.uuid4,
                  primary_key=True),
        sa.Column('effective',
                  sap.TSTZRANGE,
                  default=util.effective_now,
                  nullable=False),
        sa.Column('vclock', sap.INT4RANGE, nullable=False),
        *itertools.chain(entity_foreign_keys, columns, constraints),
        schema=schema or local_table.schema,
        # keep_existing avoids redefinition errors when called repeatedly.
        keep_existing=True
    )
# memoization ftw
def compile_zdb_query(element, compiler, **kw):
    """Compile the clauses of a zdb() function element into ZomboDB SQL.

    Collects filter fragments and the target table from the clause list,
    then renders a ``zdb('<table>', ctid) ==> '<query>'`` expression.

    Raises:
        ValueError: On an unsupported clause type, a model passed after the
            first position, no resolvable table, or clauses spanning
            multiple tables.
    """
    query = []           # compiled filter fragments, joined with " and "
    tables = set()       # table names seen across all clauses
    format_args = []     # args accumulated by compile_clause for format()
    limit = ""           # optional order/offset/limit prefix
    for i, c in enumerate(element.clauses):
        add_to_query = True
        if isinstance(c, BinaryExpression):
            # Column comparison: remember which table it belongs to.
            tables.add(c.left.table.name)
        elif isinstance(c, BindParameter):
            if isinstance(c.value, str):
                # Plain string query fragment: passed through to compile_clause.
                pass
            elif isinstance(c.value, DeclarativeMeta):
                # A model selects the table; only valid as the first param.
                if i > 0:
                    raise ValueError("Table can be specified only as first param")
                tables.add(c.value.__tablename__)
                add_to_query = False
        elif isinstance(c, BooleanClauseList):
            pass
        elif isinstance(c, Column):
            pass
        else:
            raise ValueError("Unsupported filter")
        if add_to_query:
            # compile_clause may also extend `tables` and `format_args`.
            query.append(compile_clause(c, compiler, tables, format_args))
    # Exactly one table must have been resolved from the clauses.
    if not tables:
        raise ValueError("No filters passed")
    elif len(tables) > 1:
        raise ValueError("Different tables passed")
    else:
        table = tables.pop()
    # Optional ordering/paging, attached to the element by the zdb helpers.
    if hasattr(element, "_zdb_order_by") and isinstance(element._zdb_order_by,
                                                        (UnaryExpression, ZdbScore)):
        limit = compile_limit(order_by=element._zdb_order_by,
                              offset=element._zdb_offset,
                              limit=element._zdb_limit)
    sql = "zdb(\'%s\', ctid) ==> " % table
    if format_args and isinstance(format_args, list):
        # Parameterized fragments present: wrap the query in format().
        sql += "\'%sformat(\'%s\', %s)\'" % (
            limit,
            " and ".join(query),
            ", ".join(format_args)
        )
    else:
        sql += "\'%s%s\'" % (
            limit,
            " and ".join(query))
    return sql