我们从Python开源项目中,提取了以下12个代码示例,用于说明如何使用sqlalchemy.orm.session.object_session()。
def get(self, *columns): # DEPRECATED: used for testing '''Gets the values in the relative columns''' qry = object_session(self).query(*columns) # .select_from(self.__class__) jointables = set(c.class_ for c in columns) if jointables: joins = [] model = self.__class__ for r in self.__mapper__.relationships.values(): if r.mapper.class_ in jointables: joins.append(getattr(model, r.key)) if joins: qry = qry.join(*joins) metadata = qry.filter(Segment.id == self.id).all() if len(metadata) == 1: return metadata[0] else: return metadata
def ensure_unique_name(self, key, name): session = object_session(self) if session is not None: model_class = self.__class__ name = make_unique_name(session, model_class, name) return name else: return name
def save_bunch(self, bunch): session = object_session(self) return serialize_scan_bunch(session, bunch, self.id)
def save(self, commit=True): """Save the record.""" db = object_session(self) db.add(self) if commit: db.commit() return self
def delete(self, commit=True, *args, **kwargs): """Remove the record from the database.""" db = object_session(self) db.add(self) db.delete(self) return commit and db.commit()
def get_bind(obj): """ Return the bind for given SQLAlchemy Engine / Connection / declarative model object. :param obj: SQLAlchemy Engine / Connection / declarative model object :: from sqlalchemy_utils import get_bind get_bind(session) # Connection object get_bind(user) """ if hasattr(obj, 'bind'): conn = obj.bind else: try: conn = object_session(obj).bind except UnmappedInstanceError: conn = obj if not hasattr(conn, 'execute'): raise TypeError( 'This method accepts only Session, Engine, Connection and ' 'declarative model objects.' ) return conn
def is_deleted(obj): return obj in sa.orm.object_session(obj).deleted
def save_inventory(station, downloaded_bytes_data): """Saves the inventory. Raises SqlAlchemyError if station's session is None, or (most likely IntegrityError) on failure """ station.inventory_xml = dumps_inv(downloaded_bytes_data) try: object_session(station).commit() except UnmappedInstanceError: raise except SQLAlchemyError: object_session(station).rollback() raise
def object_session(self): return object_session(self)
def orm_update_listener(mapper, connection, target): if getattr(target, '__history_table__', None): return session = object_session(target) if session.is_modified(target, include_collections=False): target.send_to_changes(connection, CrudOperation.UPDATE)
def convert(self, fitted=True, deconvoluted=True): precursor_information = self.precursor_information.convert( ) if self.precursor_information is not None else None session = object_session(self) conn = session.connection() if fitted: q = conn.execute(select([FittedPeak.__table__]).where( FittedPeak.__table__.c.scan_id == self.id)).fetchall() peak_set_items = list( map(make_memory_fitted_peak, q)) peak_set = PeakSet(peak_set_items) peak_set._index() peak_index = PeakIndex(np.array([], dtype=np.float64), np.array( [], dtype=np.float64), peak_set) else: peak_index = PeakIndex(np.array([], dtype=np.float64), np.array( [], dtype=np.float64), PeakSet([])) if deconvoluted: q = conn.execute(select([DeconvolutedPeak.__table__]).where( DeconvolutedPeak.__table__.c.scan_id == self.id)).fetchall() deconvoluted_peak_set_items = list( map(make_memory_deconvoluted_peak, q)) deconvoluted_peak_set = DeconvolutedPeakSet( deconvoluted_peak_set_items) deconvoluted_peak_set._reindex() else: deconvoluted_peak_set = DeconvolutedPeakSet([]) info = self.info or {} scan = ProcessedScan( self.scan_id, self.title, precursor_information, int(self.ms_level), float(self.scan_time), self.index, peak_index, deconvoluted_peak_set, activation=info.get('activation')) return scan
def edit_classes(self, *ids_or_labels, mode, auto_commit=True, annotator=None): """:param mode: either 'add' 'set' or 'del'""" sess = object_session(self) needs_commit = False ids = set(ids_or_labels) labels = set(_ for _ in ids if type(_) in (bytes, str)) ids -= labels if mode == 'set': self.classes[:] = [] else: classes = list(self.classes) if mode == 'del': for c in classes: if c.id in ids or c.label in labels: self.classes.remove(c) needs_commit = True ids = labels = set() # do not add anything elif mode == 'add': for c in classes: if c.id in ids: ids.remove(c.id) # already set, remove it and don't add it again if c.label in labels: labels.remove(c.label) # already set, remove it and don't add it again elif mode != 'set': raise TypeError("`mode` argument needs to be in ('add', 'del', 'set'), " "'%s' supplied" % str(mode)) if ids or labels: flt1 = None if not ids else Class.id.in_(ids) # filter on ids, or None flt2 = None if not labels else Class.label.in_(labels) # filter on labels, or None flt = flt1 if flt2 is None else flt2 if flt1 is None else (flt1 | flt2) classids2add = [_[0] for _ in sess.query(Class.id).filter(flt)] if classids2add: needs_commit = True sess.add_all((ClassLabelling(class_id=cid, segment_id=self.id, annotator=annotator, is_hand_labelled=annotator or False) for cid in classids2add)) if needs_commit and auto_commit: try: sess.commit() except SQLAlchemyError as _: sess.rollback() raise