Python sqlalchemy.orm.session 模块,object_session() 实例源码

我们从Python开源项目中,提取了以下12个代码示例,用于说明如何使用sqlalchemy.orm.session.object_session()

项目:stream2segment    作者:rizac    | 项目源码 | 文件源码
def get(self, *columns):  # DEPRECATED: used for testing
        '''Gets the values in the relative columns'''
        qry = object_session(self).query(*columns)  # .select_from(self.__class__)
        jointables = set(c.class_ for c in columns)
        if jointables:
            joins = []
            model = self.__class__
            for r in self.__mapper__.relationships.values():
                if r.mapper.class_ in jointables:
                    joins.append(getattr(model, r.key))
            if joins:
                qry = qry.join(*joins)
        metadata = qry.filter(Segment.id == self.id).all()
        if len(metadata) == 1:
            return metadata[0]
        else:
            return metadata
项目:ms_deisotope    作者:mobiusklein    | 项目源码 | 文件源码
def ensure_unique_name(self, key, name):
        session = object_session(self)
        if session is not None:
            model_class = self.__class__
            name = make_unique_name(session, model_class, name)
            return name
        else:
            return name
项目:ms_deisotope    作者:mobiusklein    | 项目源码 | 文件源码
def save_bunch(self, bunch):
        session = object_session(self)
        return serialize_scan_bunch(session, bunch, self.id)
项目:kivy_rpg    作者:spinningD20    | 项目源码 | 文件源码
def save(self, commit=True):
        """Save the record."""
        db = object_session(self)
        db.add(self)
        if commit:
            db.commit()
        return self
项目:kivy_rpg    作者:spinningD20    | 项目源码 | 文件源码
def delete(self, commit=True, *args, **kwargs):
        """Remove the record from the database."""
        db = object_session(self)
        db.add(self)
        db.delete(self)
        return commit and db.commit()
项目:deb-python-sqlalchemy-utils    作者:openstack    | 项目源码 | 文件源码
def get_bind(obj):
    """
    Return the bind for given SQLAlchemy Engine / Connection / declarative
    model object.

    :param obj: SQLAlchemy Engine / Connection / declarative model object

    ::

        from sqlalchemy_utils import get_bind


        get_bind(session)  # Connection object

        get_bind(user)

    """
    if hasattr(obj, 'bind'):
        conn = obj.bind
    else:
        try:
            conn = object_session(obj).bind
        except UnmappedInstanceError:
            conn = obj

    if not hasattr(conn, 'execute'):
        raise TypeError(
            'This method accepts only Session, Engine, Connection and '
            'declarative model objects.'
        )
    return conn
项目:deb-python-sqlalchemy-utils    作者:openstack    | 项目源码 | 文件源码
def is_deleted(obj):
    return obj in sa.orm.object_session(obj).deleted
项目:stream2segment    作者:rizac    | 项目源码 | 文件源码
def save_inventory(station, downloaded_bytes_data):
    """Saves the inventory. Raises SqlAlchemyError if station's session is None,
    or (most likely IntegrityError) on failure
    """
    station.inventory_xml = dumps_inv(downloaded_bytes_data)
    try:
        object_session(station).commit()
    except UnmappedInstanceError:
        raise
    except SQLAlchemyError:
        object_session(station).rollback()
        raise
项目:idealoom    作者:conversence    | 项目源码 | 文件源码
def object_session(self):
        return object_session(self)
项目:idealoom    作者:conversence    | 项目源码 | 文件源码
def orm_update_listener(mapper, connection, target):
    if getattr(target, '__history_table__', None):
        return
    session = object_session(target)
    if session.is_modified(target, include_collections=False):
        target.send_to_changes(connection, CrudOperation.UPDATE)
项目:ms_deisotope    作者:mobiusklein    | 项目源码 | 文件源码
def convert(self, fitted=True, deconvoluted=True):
        precursor_information = self.precursor_information.convert(
        ) if self.precursor_information is not None else None

        session = object_session(self)
        conn = session.connection()

        if fitted:
            q = conn.execute(select([FittedPeak.__table__]).where(
                FittedPeak.__table__.c.scan_id == self.id)).fetchall()

            peak_set_items = list(
                map(make_memory_fitted_peak, q))

            peak_set = PeakSet(peak_set_items)
            peak_set._index()
            peak_index = PeakIndex(np.array([], dtype=np.float64), np.array(
                [], dtype=np.float64), peak_set)
        else:
            peak_index = PeakIndex(np.array([], dtype=np.float64), np.array(
                [], dtype=np.float64), PeakSet([]))

        if deconvoluted:
            q = conn.execute(select([DeconvolutedPeak.__table__]).where(
                DeconvolutedPeak.__table__.c.scan_id == self.id)).fetchall()

            deconvoluted_peak_set_items = list(
                map(make_memory_deconvoluted_peak, q))

            deconvoluted_peak_set = DeconvolutedPeakSet(
                deconvoluted_peak_set_items)
            deconvoluted_peak_set._reindex()
        else:
            deconvoluted_peak_set = DeconvolutedPeakSet([])

        info = self.info or {}

        scan = ProcessedScan(
            self.scan_id, self.title, precursor_information, int(self.ms_level),
            float(self.scan_time), self.index, peak_index, deconvoluted_peak_set,
            activation=info.get('activation'))
        return scan
项目:stream2segment    作者:rizac    | 项目源码 | 文件源码
def edit_classes(self, *ids_or_labels, mode, auto_commit=True, annotator=None):
        """:param mode: either 'add' 'set' or 'del'"""
        sess = object_session(self)
        needs_commit = False
        ids = set(ids_or_labels)
        labels = set(_ for _ in ids if type(_) in (bytes, str))
        ids -= labels
        if mode == 'set':
            self.classes[:] = []
        else:
            classes = list(self.classes)

        if mode == 'del':
            for c in classes:
                if c.id in ids or c.label in labels:
                    self.classes.remove(c)
                    needs_commit = True
            ids = labels = set()  # do not add anything
        elif mode == 'add':
            for c in classes:
                if c.id in ids:
                    ids.remove(c.id)  # already set, remove it and don't add it again
                if c.label in labels:
                    labels.remove(c.label)  # already set, remove it and don't add it again
        elif mode != 'set':
            raise TypeError("`mode` argument needs to be in ('add', 'del', 'set'), "
                            "'%s' supplied" % str(mode))

        if ids or labels:
            flt1 = None if not ids else Class.id.in_(ids)  # filter on ids, or None
            flt2 = None if not labels else Class.label.in_(labels)  # filter on labels, or None
            flt = flt1 if flt2 is None else flt2 if flt1 is None else (flt1 | flt2)
            classids2add = [_[0] for _ in sess.query(Class.id).filter(flt)]
            if classids2add:
                needs_commit = True
                sess.add_all((ClassLabelling(class_id=cid,
                                             segment_id=self.id, annotator=annotator,
                                             is_hand_labelled=annotator or False)
                              for cid in classids2add))

        if needs_commit and auto_commit:
            try:
                sess.commit()
            except SQLAlchemyError as _:
                sess.rollback()
                raise