我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用sqlalchemy.orm.relationship()。
def task_association(cls): name = cls.__name__ discriminator = name.lower() assoc_cls = type('%sTaskAssociation' % name, (TaskAssociation, ), dict( __tablename__=None, __mapper_args__ = { 'polymorphic_identity' : discriminator } ) ) cls.tasks = association_proxy( 'task_association', 'tasks', creator = lambda tasks: assoc_cls(tasks=tasks) ) return relationship(assoc_cls, backref=backref('parent', uselist=False))
def test_secondary_joins_with_custom_primary_join_conditions_are_not_supported(self): mapper(self.classes.A, self.tables.a) mapper(self.classes.B, self.tables.b, properties={ 'a': relationship( self.classes.A, secondary=self.tables.a_to_b, primaryjoin=and_(self.tables.b.c.id == self.tables.a_to_b.c.b_id, self.tables.b.c.id > 10), lazy="bulk", ) }) exception = None try: configure_mappers() except UnsupportedRelationError as e: exception = e assert exception.args[0] == ( 'BulkLazyLoader B.a: ' 'Only simple relations on 1 primary key and without custom joins are supported' )
def test_secondary_joins_with_custom_secondary_join_conditions_are_not_supported(self): mapper(self.classes.A, self.tables.a) mapper(self.classes.B, self.tables.b, properties={ 'a': relationship( self.classes.A, secondary=self.tables.a_to_b, secondaryjoin=and_(self.tables.a.c.id == self.tables.a_to_b.c.a_id, self.tables.a.c.id > 10), lazy="bulk", ) }) exception = None try: configure_mappers() except UnsupportedRelationError as e: exception = e assert exception.args[0] == ( 'BulkLazyLoader B.a: ' 'Only simple relations on 1 primary key and without custom joins are supported' )
def setup_mappers(cls): User, users = cls.classes.User, cls.tables.users UserInfo, user_infos = cls.classes.UserInfo, cls.tables.user_infos Address, addresses = cls.classes.Address, cls.tables.addresses Thing, things = cls.classes.Thing, cls.tables.things mapper(User, users, properties={ 'addresses': relationship( Address, backref=backref('user', lazy="bulk"), lazy="bulk", order_by=[desc(addresses.c.email_address)] ), 'children': relationship(User, backref=backref('parent', remote_side=[users.c.id], lazy="bulk"), lazy="bulk"), 'user_info': relationship(UserInfo, lazy="bulk", backref=backref('user', lazy="bulk"), uselist=False), 'things': relationship(Thing, secondary=cls.tables.user_to_things, lazy="bulk"), }) mapper(Address, addresses) mapper(UserInfo, user_infos) mapper(Thing, things, properties={ 'users': relationship(User, secondary=cls.tables.user_to_things, lazy="bulk"), }) configure_mappers()
def __new__(cls, mapper_to_bookmark): class_attr = {} class_attr['id'] = Column(Integer, primary_key=True) class_attr['tagwords'] = relationship("TagWord", backref='tag') # list of TagWord instances class_attr['parents'] = relationship('Tag', secondary=tag_relationship, primaryjoin=tag_relationship.c.tag_id == class_attr['id'], secondaryjoin=tag_relationship.c.tag_parent_id == class_attr['id'], backref="children") target_class_name = mapper_to_bookmark.__name__ target_name = target_class_name.lower().split('.')[-1] # 'filename' usually class_attr['target_class_name'] = target_class_name class_attr['target_name'] = target_name class_attr['__init__'] = init class_attr['__repr__'] = display class_attr['construct'] = construct # @classmethod class_attr['tag'] = build_tag # @property class_attr[target_name+'s'] = tag_targets # @hybrid_property class_attr['words'] = words # @hybrid_property return type('Tag', (BASE,), class_attr)
def _memoized_attr_info(self): """Info dictionary associated with the object, allowing user-defined data to be associated with this :class:`.InspectionAttr`. The dictionary is generated when first accessed. Alternatively, it can be specified as a constructor argument to the :func:`.column_property`, :func:`.relationship`, or :func:`.composite` functions. .. versionadded:: 0.8 Added support for .info to all :class:`.MapperProperty` subclasses. .. versionchanged:: 1.0.0 :attr:`.MapperProperty.info` is also available on extension types via the :attr:`.InspectionAttrInfo.info` attribute, so that it can apply to a wider variety of ORM and extension constructs. .. seealso:: :attr:`.QueryableAttribute.info` :attr:`.SchemaItem.info` """ return {}
def url(self): return '{}/{}/{}'.format(osm_api_base, self.osm_type, self.osm_id) # class ItemCandidateTag(Base): # __tablename__ = 'item_candidate_tag' # __table_args__ = ( # ForeignKeyConstraint(['item_id', 'osm_id', 'osm_type'], # [ItemCandidate.item_id, # ItemCandidate.osm_id, # ItemCandidate.osm_type]), # ) # # item_id = Column(Integer, primary_key=True) # osm_id = Column(BigInteger, primary_key=True) # osm_type = Column(osm_type_enum, primary_key=True) # k = Column(String, primary_key=True) # v = Column(String, primary_key=True) # # item_candidate = relationship(ItemCandidate, # backref=backref('tag_table', lazy='dynamic'))
def info(self): """Info dictionary associated with the object, allowing user-defined data to be associated with this :class:`.MapperProperty`. The dictionary is generated when first accessed. Alternatively, it can be specified as a constructor argument to the :func:`.column_property`, :func:`.relationship`, or :func:`.composite` functions. .. versionadded:: 0.8 Added support for .info to all :class:`.MapperProperty` subclasses. .. seealso:: :attr:`.QueryableAttribute.info` :attr:`.SchemaItem.info` """ return {}
def serialize(self): return { 'id' : self.id, 'email': self.email, 'first_name': self.first_name, 'last_name': self.last_name, 'sex': self.sex, 'residence': self.residence, 'similarity': self.similarity, 'maternal_side': self.maternal_side, 'paternal_side': self.paternal_side, 'picture_url': self.picture_url, 'birth_year': self.birth_year, 'relationship': self.relationship, 'birthplace': self.birthplace, 'ancestry': self.ancestry } #Snp data schema
def foreign_key(other_table, nullable=False): """ Declare a foreign key property, which will also create a foreign key column in the table with the name of the property. By convention the property name should end in "_fk". You are required to explicitly create foreign keys in order to allow for one-to-one, one-to-many, and many-to-one relationships (but not for many-to-many relationships). If you do not do so, SQLAlchemy will fail to create the relationship property and raise an exception with a clear error message. You should normally not have to access this property directly, but instead use the associated relationship properties. *This utility method should only be used during class creation.* :param other_table: other table name :type other_table: basestring :param nullable: ``True`` to allow null values (meaning that there is no relationship) :type nullable: bool """ return Column(Integer, ForeignKey('{table}.id'.format(table=other_table), ondelete='CASCADE'), nullable=nullable)
def setup_relationships(self): """Set up the relations between the different SQL tables""" Probe.scripts = relationship('Script', order_by=Script.id, back_populates='probe', cascade='all, delete, delete-orphan') Probe.network_configs = relationship('NetworkConfig', order_by=NetworkConfig.id, back_populates='probe', cascade='all, delete, delete-orphan') User.probes = relationship('Probe', order_by=Probe.id, back_populates='user', cascade='all, delete, delete-orphan') User.databases = relationship('Database', order_by=Database.id, back_populates='user', cascade='all, delete, delete-orphan')
def gen_relation_models(): Base = declarative_base() class RelationTestModel(Base): __tablename__ = "sanity_check_test_2" id = Column(Integer, primary_key=True) class RelationTestModel2(Base): __tablename__ = "sanity_check_test_3" id = Column(Integer, primary_key=True) test_relationship_id = Column(ForeignKey("sanity_check_test_2.id")) test_relationship = relationship(RelationTestModel, primaryjoin=test_relationship_id == RelationTestModel.id) return Base, RelationTestModel, RelationTestModel2
def session_scoped(cls): cls.user_session_id = Column(Integer, ForeignKey('usersession.id', ondelete='CASCADE'), index=True) cls.user_session = relationship('UserSession', cascade='all, delete') @classmethod def for_current_session(cls, **kwargs): user_session = ExecutionContext.get_context().session return cls.for_session(user_session, **kwargs) cls.for_current_session = for_current_session @classmethod def for_session(cls, user_session, **kwargs): found = Session.query(cls).filter_by(user_session=user_session) if found.count() >= 1: return found.one() instance = cls(user_session=user_session, **kwargs) Session.add(instance) return instance cls.for_session = for_session return cls
def __init__(self, name): self.name = name # class UserWord(Base): # __tablename__ = 'user_word' # user_id = Column(Integer, ForeignKey('users.id'), primary_key=True) # word_id = Column(Integer, ForeignKey('words.id'), primary_key=True) # level = Column(String(50)) # # # bidirectional attribute/collection of "user"/"user_words" # user = relationship(User, # backref=backref("user_words", # cascade="all, delete-orphan") # ) # # # reference to the "Word" object # content = relationship("Word") # # def __init__(self, content=None, user=None, level=None): # self.user = user # self.content = content # self.level = level
def __repr__(self): return 'Word(%s)' % repr(self.content) # # # # class Word(Base): # __tablename__ = 'words' # id = Column(Integer, primary_key=True) # content = Column(String(64), unique=True, index=True) # rank = Column(Integer, index=True) # phonetic_symbol = Column(String(128)) # # words??notes??????? # notes = relationship('Note', backref='word', lazy='dynamic') # # # words??users????????user??? # # # ??????????????? # # synonym = Column(Text()) # def __init__(self, content): # self.content = content
def seed_identifier(cls): # @NoSelf '''returns data_identifier if the latter is not None, else net.sta.loc.cha by querying the relative channel and station''' # Needed note: To know what we are doing in 'sel' below, please look: # http://docs.sqlalchemy.org/en/latest/orm/extensions/hybrid.html#correlated-subquery-relationship-hybrid # Notes # - we use limit(1) cause we might get more than one # result. Regardless of why it happens (because we don't join or apply a distinct?) # it is relevant for us to get the first result which has the requested # network+station and location + channel strings # - the label(...) at the end makes all the difference. The doc is, as always, unclear # http://docs.sqlalchemy.org/en/latest/core/sqlelement.html#sqlalchemy.sql.expression.label dot = text("'.'") sel = select([concat(Station.network, dot, Station.station, dot, Channel.location, dot, Channel.channel)]).\ where((Channel.id == cls.channel_id) & (Station.id == Channel.station_id)).limit(1).\ label('seedidentifier') return case([(cls.data_identifier.isnot(None), cls.data_identifier)], else_=sel)
def _variable_mixin_aware_constructor(self, **kwargs): # The standard default for the underlying relationship for # variables sets it to None, which means it cannot directly be # used as a mappable collection. Cure the problem accordingly with # a different default. if isinstance(self, VariableMixin): kwargs.setdefault('variables', {}) return _declarative_constructor(self, **kwargs)
def gloss_reference(cls): return relationship( "GlossolaliaReference", foreign_keys=[getattr(cls, "gloss_reference_id")] )
def ad(cls): return relationship('Ad')
def test_multi_foreign_key_relations_are_not_supported(self): mapper(self.classes.MultiPk, self.tables.multi_pk) mapper(self.classes.MultiFk, self.tables.multi_fk, properties={ 'multi_pks': relationship(self.classes.MultiPk, lazy="bulk") }) exception = None try: configure_mappers() except UnsupportedRelationError as e: exception = e assert exception.args[0] == ( 'BulkLazyLoader MultiFk.multi_pks: ' 'Only simple relations on 1 primary key and without custom joins are supported' )
def __new__(cls, mapper_to_bookmark, mapper_to_bookmark_placeholder=False): future_class_attr = {} future_class_attr['id'] = Column(Integer, primary_key=True) future_class_attr['tag_rel'] = relationship("Tag", secondary=lambda: tagbookmarks_table, collection_class=set, backref=backref('bookmarks')) future_class_attr['tags'] = association_proxy('tag_rel', 'tag') target_class_name = mapper_to_bookmark.__name__ target_name = target_class_name.lower().split('.')[-1] # 'filename' usually future_class_attr[target_name+'_id'] = Column(Integer, ForeignKey(target_name+'.id'), unique=False, nullable=False) future_class_attr[target_name] = relationship(target_class_name, backref='bookmarks') future_class_attr['target_class_name'] = target_class_name future_class_attr['target_name'] = target_name if mapper_to_bookmark_placeholder: target_class_name_placeholder = mapper_to_bookmark_placeholder.__name__ target_name_placeholder = target_class_name_placeholder.lower().split('.')[-1] # byteoffset in the filename case future_class_attr[target_name_placeholder+'_id'] = Column(Integer, ForeignKey(target_name_placeholder+'.id'), unique=False, nullable=True) future_class_attr[target_name_placeholder] = relationship(target_class_name_placeholder, backref='bookmarks') future_class_attr['target_class_name_placeholder'] = target_class_name_placeholder future_class_attr['target_name_placeholder'] = target_name_placeholder future_class_attr['construct'] = construct future_class_attr['__repr__'] = bookmark_repr return type('Bookmark', (BASE,), future_class_attr)
def reference_col(tablename, nullable=False, pk_name='id', **kwargs): """Column that adds primary key foreign key reference. Usage: :: category_id = reference_col('category') category = relationship('Category', backref='categories') """ return db.Column( db.ForeignKey('{0}.{1}'.format(tablename, pk_name)), nullable=nullable, **kwargs)
def transformations(self, relationship="all"): """Get all the transformations of this info. Return a list of transformations involving this info. ``relationship`` can be "parent" (in which case only transformations where the info is the ``info_in`` are returned), "child" (in which case only transformations where the info is the ``info_out`` are returned) or ``all`` (in which case any transformations where the info is the ``info_out`` or the ``info_in`` are returned). The default is ``all`` """ if relationship not in ["all", "parent", "child"]: raise(ValueError( "You cannot get transformations of relationship {}" .format(relationship) + "Relationship can only be parent, child or all.")) if relationship == "all": return Transformation\ .query\ .filter(and_(Transformation.failed == false(), or_(Transformation.info_in == self, Transformation.info_out == self)))\ .all() if relationship == "parent": return Transformation\ .query\ .filter_by(info_in_id=self.id, failed=False)\ .all() if relationship == "child": return Transformation\ .query\ .filter_by(info_out_id=self.id, failed=False)\ .all()