Python sqlalchemy.orm.session 模块,Session() 实例源码

我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用sqlalchemy.orm.session.Session()

项目:windflow    作者:hartym    | 项目源码 | 文件源码
def test_rowmethod():
    session = Session()

    with pytest.raises(ValueError) as e:
        ModelStub.my_row_method(0, 1, 2)

    c, s, a = ModelStub.my_row_method(0, session, 1)

    assert c is 0
    assert s is session
    assert a == 1

    i = ModelStub()

    c, s, a = i.my_row_method(session, 2)
    assert c is i
    assert s is session
    assert a == 2
项目:marcotti-mls    作者:soccermetrics    | 项目源码 | 文件源码
def create_session(self):
        """
        Create a session context that communicates with the database.

        Commits all changes to the database before closing the session, and if an exception is raised,
        rollback the session.
        """
        session = Session(self.connection)
        logger.info("Create session {0} with {1}".format(
            id(session), self._public_db_uri(str(self.engine.url))))
        try:
            yield session
            session.commit()
            logger.info("Commit transactions to database")
        except Exception:
            session.rollback()
            logger.exception("Database transactions rolled back")
        finally:
            logger.info("Session {0} with {1} closed".format(
                id(session), self._public_db_uri(str(self.engine.url))))
            session.close()
项目:biweeklybudget    作者:jantman    | 项目源码 | 文件源码
def setup(self):
        self.mock_sess = Mock(spec_set=Session)
        self.cls = BiweeklyPayPeriod(date(2017, 3, 7), self.mock_sess)
        m_account = Mock(name='foo')
        type(m_account).name = 'foo'
        m_budget = Mock(name='bar')
        type(m_budget).name = 'bar'
        self.m_st = Mock(
            spec_set=ScheduledTransaction,
            id=123,
            description='desc',
            amount=Decimal('123.45'),
            account_id=2,
            account=m_account,
            budget_id=3,
            budget=m_budget
        )
项目:websauna    作者:websauna    | 项目源码 | 文件源码
def init_empty_site(self, dbsession:Session, user:UserMixin):
        """When the first user signs up build the admin groups and make the user member of it.

        Make the first member of the site to be admin and superuser.
        """

        # Try to reflect related group class based on User model
        i = inspection.inspect(user.__class__)
        Group = i.relationships["groups"].mapper.entity

        # Do we already have any groups... if we do we probably don'¨t want to init again
        if dbsession.query(Group).count() > 0:
            return

        g = Group(name=Group.DEFAULT_ADMIN_GROUP_NAME)
        dbsession.add(g)

        g.users.append(user)
项目:mccurse    作者:khardix    | 项目源码 | 文件源码
def filled_database(file_database) -> curse.Database:
    """Database with some mods filled in."""

    # Create structure
    addon.AddonBase.metadata.create_all(file_database.engine)

    # Add few mods
    session = SQLSession(bind=file_database.engine)
    session.add_all([
        addon.Mod(id=42, name='tested', summary="Mod under test"),
        addon.Mod(id=45, name='tester', summary="Validate tested mod"),
        addon.Mod(id=3, name='unrelated', summary="Dummy"),
    ])
    session.commit()

    return file_database
项目:mccurse    作者:khardix    | 项目源码 | 文件源码
def find(cls, connection: SQLSession, name: str) -> 'Mod':
        """Find exactly one Mod named NAME.

        Keyword Arguments:
            connection: Database connection to ask on.
            name: The name of the mod to search for.

        Returns:
            The requested mod.

        Raises:
            NoResultsFound: The name does not match any known mod.
            MultipleResultsFound: The name is too ambiguous,
                multiple matching mods found.
        """

        query = SQLBakery(lambda conn: conn.query(cls))
        query += lambda q: q.filter(cls.name.like(bindparam('name')))

        return query(connection).params(name='{}'.format(name)).one()
项目:mccurse    作者:khardix    | 项目源码 | 文件源码
def with_id(cls, connection: SQLSession, id: int) -> 'Mod':
        """Fetch mod with id from database.

        Keyword arguments:
            connection: Database connection to fetch from.
            id: The id field of the mod to get.

        Returns:
            The requested mod.

        Raises:
            NoResultFound: Mod with specified id does not exist.
        """

        query = SQLBakery(lambda conn: conn.query(cls))
        query += lambda q: q.filter(cls.id == bindparam('id'))

        return query(connection).params(id=id).one()
项目:papersummarize    作者:mrdrozdov    | 项目源码 | 文件源码
def get_tm_session(session_factory, transaction_manager):
    """
    Get a ``sqlalchemy.orm.Session`` instance backed by a transaction.

    This function will hook the session to the transaction manager which
    will take care of committing any changes.

    - When using pyramid_tm it will automatically be committed or aborted
      depending on whether an exception is raised.

    - When using scripts you should wrap the session in a manager yourself.
      For example::

          import transaction

          engine = get_engine(settings)
          session_factory = get_session_factory(engine)
          with transaction.manager:
              dbsession = get_tm_session(session_factory, transaction.manager)

    """
    dbsession = session_factory()
    zope.sqlalchemy.register(
        dbsession, transaction_manager=transaction_manager)
    return dbsession
项目:enjoliver    作者:JulienBalestra    | 项目源码 | 文件源码
def _delete_all_attached(session: Session, machine: Machine):
        """
        Delete all resources attached to a machine
        As we don't need performance we can avoid heuristics by dropping and re-creating theses needed resources
        The discovery data is the reference of the reality
        :param session:
        :param machine:
        :return:
        """
        session.query(MachineDisk) \
            .filter(MachineDisk.machine_id == machine.id) \
            .delete()

        all_mi = session.query(MachineInterface) \
            .filter(MachineInterface.machine_id == machine.id)
        for i in all_mi:
            session.query(ChassisPort) \
                .filter(ChassisPort.machine_interface == i.id) \
                .delete()
            session.delete(i)

        session.flush()
项目:windflow    作者:hartym    | 项目源码 | 文件源码
def test_modelmethod():
    session = Session()

    with pytest.raises(ValueError) as e:
        ModelStub.my_model_method(1, 2)

    c, s, a = ModelStub.my_model_method(session, 1)

    assert c is ModelStub
    assert s is session
    assert a == 1
项目:sqlalchemy-media    作者:pylover    | 项目源码 | 文件源码
def test_default_store(self):

        with StoreManager(Session) as manager1:

            # Hacking StoreManager to raise error
            StoreManager._default = None
            self.assertRaises(DefaultStoreError, manager1.get)

            # making it default again
            StoreManager.make_default('dummy')
            self.assertIsNotNone(manager1.get())

        # unregister
        StoreManager.unregister('dummy')
项目:sqlalchemy-media    作者:pylover    | 项目源码 | 文件源码
def test_context_stack(self):

        self.assertRaises(ContextError, StoreManager.get_current_store_manager)

        with StoreManager(Session) as manager1:
            store1 = manager1.get()
            self.assertIs(store1, manager1.default_store)

            with StoreManager(Session) as manager2:
                store2 = manager2.get()
                self.assertIs(store2, manager2.default_store)
                self.assertIsNot(manager1, manager2)
                self.assertIsNot(store1, store2)
项目:websauna.blog    作者:websauna    | 项目源码 | 文件源码
def test_empty_blog(web_server: str, browser: DriverAPI, dbsession: Session):
    """We can render empty blog."""

    # Direct Splinter browser to the website
    b = browser
    b.visit(web_server + "/blog/")

    # After login we see a profile link to our profile
    assert b.is_element_visible_by_css("#blog-no-posts")
项目:websauna.blog    作者:websauna    | 项目源码 | 文件源码
def test_no_unpublished_in_blog_roll(web_server: str, browser: DriverAPI, dbsession: Session, unpublished_post_id):
    """Visitors should not see unpublished posts in blog roll."""

    # Direct Splinter browser to the website
    b = browser
    b.visit(web_server + "/blog/")

    # After login we see a profile link to our profile
    assert b.is_element_visible_by_css("#blog-no-posts")
项目:websauna.blog    作者:websauna    | 项目源码 | 文件源码
def test_published_excerpt(web_server: str, browser: DriverAPI, dbsession: Session, published_post_id):
    """When posts are published they become visible in blog roll."""

    # Direct Splinter browser to the website
    b = browser
    b.visit(web_server + "/blog/")

    # After login we see a profile link to our profile
    assert b.is_element_visible_by_css(".excerpt")
项目:websauna.blog    作者:websauna    | 项目源码 | 文件源码
def test_published_post(web_server: str, browser: DriverAPI, dbsession: Session, published_post_id):
    """User can view blog posts."""

    b = browser
    b.visit(web_server + "/blog/")

    assert b.is_element_present_by_css("#heading-blog")

    # Go to the published post from the roll
    b.find_by_css(".post-link").click()

    assert b.is_element_present_by_css("#heading-post")
项目:websauna.blog    作者:websauna    | 项目源码 | 文件源码
def test_empty_tag_roll(web_server: str, browser: DriverAPI, dbsession: Session):
    """We can render empty tag list."""

    # Direct Splinter browser to the website
    b = browser
    b.visit(web_server + "/blog/tag/xxx")

    assert b.is_element_present_by_css("#blog-no-posts")
项目:sqlservice    作者:dgilland    | 项目源码 | 文件源码
def create_session(self,
                       bind,
                       options=None,
                       session_class=Session,
                       query_class=SQLQuery):
        """Factory function to create a scoped session using `bind`.

        Args:
            bind (Engine|Connection): Database engine or connection instance.
            options (dict, optional): Session configuration options.
            session_class (obj, optional): Session class to use when creating
                new session instances. Defaults to :class:`.Session`.
            query_class (obj, optional): Query class used for ``session.query``
                instances. Defaults to :class:`.SQLQuery`.

        Returns:
            Session: SQLAlchemy session instance bound to `bind`.
        """
        if options is None:  # pragma: no cover
            options = {}

        if query_class:
            options['query_cls'] = query_class

        scopefunc = options.pop('scopefunc', None)
        session_factory = orm.sessionmaker(bind=bind,
                                           class_=session_class,
                                           **options)

        return orm.scoped_session(session_factory, scopefunc=scopefunc)
项目:sqlservice    作者:dgilland    | 项目源码 | 文件源码
def session(self, Session):
        """Set private :attr:`_Session`."""
        self._Session = Session
项目:biweeklybudget    作者:jantman    | 项目源码 | 文件源码
def setup(self):
        self.mock_sess = Mock(spec_set=Session)
        self.cls = BiweeklyPayPeriod(date(2017, 3, 17), self.mock_sess)
项目:biweeklybudget    作者:jantman    | 项目源码 | 文件源码
def setup(self):
        self.mock_sess = Mock(spec_set=Session)
        self.cls = BiweeklyPayPeriod(date(2017, 3, 17), self.mock_sess)
项目:biweeklybudget    作者:jantman    | 项目源码 | 文件源码
def setup(self):
        self.mock_sess = Mock(spec_set=Session)
        self.cls = BiweeklyPayPeriod(date(2017, 3, 17), self.mock_sess)
项目:biweeklybudget    作者:jantman    | 项目源码 | 文件源码
def setup(self):
        self.mock_sess = Mock(spec_set=Session)
        self.cls = BiweeklyPayPeriod(date(2017, 3, 17), self.mock_sess)
项目:biweeklybudget    作者:jantman    | 项目源码 | 文件源码
def setup(self):
        self.mock_sess = Mock(spec_set=Session)
        self.cls = BiweeklyPayPeriod(date(2017, 3, 17), self.mock_sess)
项目:biweeklybudget    作者:jantman    | 项目源码 | 文件源码
def setup(self):
        self.mock_sess = Mock(spec_set=Session)
        self.cls = BiweeklyPayPeriod(date(2017, 3, 17), self.mock_sess)
项目:biweeklybudget    作者:jantman    | 项目源码 | 文件源码
def setup(self):
        self.mock_sess = Mock(spec_set=Session)
        self.cls = BiweeklyPayPeriod(date(2017, 3, 17), self.mock_sess)
项目:biweeklybudget    作者:jantman    | 项目源码 | 文件源码
def setup(self):
        self.mock_sess = Mock(spec_set=Session)
项目:biweeklybudget    作者:jantman    | 项目源码 | 文件源码
def setup(self):
        self.mock_sess = Mock(spec_set=Session)
        self.cls = BiweeklyPayPeriod(date(2017, 3, 17), self.mock_sess)
项目:biweeklybudget    作者:jantman    | 项目源码 | 文件源码
def setup(self):
        self.mock_sess = Mock(spec_set=Session)
        self.cls = BiweeklyPayPeriod(date(2017, 3, 17), self.mock_sess)
项目:biweeklybudget    作者:jantman    | 项目源码 | 文件源码
def setup(self):
        self.mock_sess = Mock(spec_set=Session)
        self.cls = BiweeklyPayPeriod(date(2017, 3, 7), self.mock_sess)
项目:biweeklybudget    作者:jantman    | 项目源码 | 文件源码
def setup(self):
        self.mock_sess = Mock(spec_set=Session)
        self.cls = BiweeklyPayPeriod(date(2017, 3, 17), self.mock_sess)
项目:biweeklybudget    作者:jantman    | 项目源码 | 文件源码
def setup(self):
        self.mock_sess = Mock(spec_set=Session)
        self.cls = BiweeklyPayPeriod(date(2017, 3, 17), self.mock_sess)
项目:biweeklybudget    作者:jantman    | 项目源码 | 文件源码
def setup(self):
        self.mock_sess = Mock(spec_set=Session)
        self.cls = BiweeklyPayPeriod(date(2017, 3, 7), self.mock_sess)
项目:biweeklybudget    作者:jantman    | 项目源码 | 文件源码
def setup(self):
        self.mock_sess = Mock(spec_set=Session)
        self.cls = BiweeklyPayPeriod(date(2017, 3, 17), self.mock_sess)
项目:biweeklybudget    作者:jantman    | 项目源码 | 文件源码
def setup(self):
        self.mock_sess = Mock(spec_set=Session)
        self.cls = BiweeklyPayPeriod(date(2017, 3, 17), self.mock_sess)
项目:websauna    作者:websauna    | 项目源码 | 文件源码
def check_empty_site_init(self, dbsession:Session, user:UserMixin):
        """Call after user creation to see if this user is the first user and should get initial admin rights."""

        assert user.id, "Please flush your db"

        # Try to reflect related group class based on User model
        i = inspection.inspect(user.__class__)
        Group = i.relationships["groups"].mapper.entity

        # If we already have groups admin group must be there
        if dbsession.query(Group).count() > 0:
            return

        self.init_empty_site(dbsession, user)
项目:websauna    作者:websauna    | 项目源码 | 文件源码
def create_test_dbsession(request, registry: Registry, transaction_manager=transaction.manager) -> Session:
    """Create a test database session and setup database.

    Create and drop all tables when called. Add teardown function py.test to drop all tables during teardown.
    Also add implicit UUID extension on the database, so we don't need to add by hand every time.

    :param request: py.test test request
    :param settings: test.ini app settings
    :param transaction_manager:
    :return: New database session
    """
    from websauna.system.model.meta import Base

    dbsession = create_dbsession(registry, manager=transaction_manager)
    engine = dbsession.get_bind()

    connection = engine.connect()

    # Support native PSQL UUID types
    if engine.dialect.name == "postgresql":
        connection.execute('create extension if not exists "uuid-ossp";')

    with transaction.manager:
        Base.metadata.drop_all(engine)
        Base.metadata.create_all(engine)

    def teardown():
        # There might be open transactions in the database. They will block DROP ALL and thus the tests would end up in a deadlock. Thus, we clean up all connections we know about.
        # XXX: Fix this shit

        with transaction.manager:
            Base.metadata.drop_all(engine)

        dbsession.close()

    request.addfinalizer(teardown)

    return dbsession
项目:mccurse    作者:khardix    | 项目源码 | 文件源码
def test_mod_search(filled_database):
    """Does the search return expected results?"""

    EXPECT_IDS = {42, 45}

    session = SQLSession(bind=filled_database.engine)
    selected = addon.Mod.search(session, 'Tested')

    assert {int(m.id) for m in selected} == EXPECT_IDS
项目:mccurse    作者:khardix    | 项目源码 | 文件源码
def test_mod_find(filled_database):
    """Does the search find the correct mod or report correct error?"""

    session = SQLSession(bind=filled_database.engine)

    assert addon.Mod.find(session, 'Tested').id == 42

    with pytest.raises(addon.NoResultFound):
        addon.Mod.find(session, 'nonsense')
项目:mccurse    作者:khardix    | 项目源码 | 文件源码
def test_mod_with_id(filled_database):
    """Does the with_id find the correct mod?"""

    session = SQLSession(bind=filled_database.engine)

    assert addon.Mod.with_id(session, 42).name == 'tested'
    assert addon.Mod.with_id(session, 45).name == 'tester'

    with pytest.raises(addon.NoResultFound):
        addon.Mod.with_id(session, 44)


# Release tests
项目:mccurse    作者:khardix    | 项目源码 | 文件源码
def session(self) -> SQLSession:
        """Create new session for batch database communication."""

        return SQLSession(bind=self.engine)
项目:papersummarize    作者:mrdrozdov    | 项目源码 | 文件源码
def receive_after_insert(mapper, connection, target):
    """ listen for the 'after_insert' event """
    @event.listens_for(Session, "before_commit", once=True)
    def receive_before_commit(session):
        paper_rating = PaperRating(paper_id=target.id)
        session.add(paper_rating)
项目:papersummarize    作者:mrdrozdov    | 项目源码 | 文件源码
def receive_after_insert(mapper, connection, target):
    """ listen for the 'after_insert' event """
    @event.listens_for(Session, "before_commit", once=True)
    def receive_before_commit(session):
        paper_rating = session.query(PaperRating).filter_by(paper_id=target.paper_id).first()
        user_paper_ratings = session.query(UserPaperRating).filter_by(paper_id=target.paper_id).all()
        update_value(paper_rating, user_paper_ratings)
项目:papersummarize    作者:mrdrozdov    | 项目源码 | 文件源码
def receive_after_update(mapper, connection, target):
    """ listen for the 'after_update' event """
    @event.listens_for(Session, "before_commit", once=True)
    def receive_before_commit(session):
        paper_rating = session.query(PaperRating).filter_by(paper_id=target.paper_id).first()
        user_paper_ratings = session.query(UserPaperRating).filter_by(paper_id=target.paper_id).all()
        update_value(paper_rating, user_paper_ratings)
项目:papersummarize    作者:mrdrozdov    | 项目源码 | 文件源码
def receive_after_delete(mapper, connection, target):
    """ listen for the 'after_delete' event """
    @event.listens_for(Session, "before_commit", once=True)
    def receive_before_commit(session):
        paper_rating = session.query(PaperRating).filter_by(paper_id=target.paper_id).first()
        user_paper_ratings = session.query(UserPaperRating).filter_by(paper_id=target.paper_id).all()
        update_value(paper_rating, user_paper_ratings)
项目:sqlalchemy_zdb    作者:skftn    | 项目源码 | 文件源码
def test_db_essentials(dbsession):
    assert isinstance(dbsession, Session)
    q = ZdbQuery(Products, session=dbsession)
    assert isinstance(q, ZdbQuery)
项目:botfriend    作者:leonardr    | 项目源码 | 文件源码
def setup(self):
        # Create a new connection to the database.
        self._db = Session(self.connection)
        self.transaction = self.connection.begin_nested()

        # Start with a high number so it won't interfere with tests that
        # search for a small number.
        self.counter = 2000

        self.time_counter = datetime(2014, 1, 1)
项目:hydrus    作者:HTTP-APIs    | 项目源码 | 文件源码
def set_session(application, DB_SESSION):
    """Set the database session for the app. Must be of type <hydrus.hydraspec.doc_writer.HydraDoc>."""
    if not isinstance(DB_SESSION, Session):
        raise TypeError("The API Doc is not of type <sqlalchemy.orm.session.Session>")

    def handler(sender, **kwargs):
        g.dbsession = DB_SESSION
    with appcontext_pushed.connected_to(handler, application):
        yield
项目:hydrus    作者:HTTP-APIs    | 项目源码 | 文件源码
def get_session():
    """Get the Database Session for the server."""
    session = getattr(g, 'dbsession', None)
    if session is None:
        session = sessionmaker(bind=engine)()
        g.dbsession = session
    return session