我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用sqlalchemy.orm.session.Session()。
def test_rowmethod():
    """Exercise ``ModelStub.my_row_method`` via the class and via an instance.

    The test expects a ``ValueError`` when no session is among the
    arguments, and otherwise expects the method to hand back the row, the
    session and the extra argument unchanged.
    """
    session = Session()

    # No session in the arguments: the call is expected to be rejected.
    with pytest.raises(ValueError):
        ModelStub.my_row_method(0, 1, 2)

    # Called on the class: the first positional argument plays the row.
    c, s, a = ModelStub.my_row_method(0, session, 1)
    # Compare the integer by value; identity checks against literals rely
    # on interpreter caching and trigger a SyntaxWarning.
    assert c == 0
    assert s is session
    assert a == 1

    # Called on an instance: the instance itself plays the row.
    i = ModelStub()
    c, s, a = i.my_row_method(session, 2)
    assert c is i
    assert s is session
    assert a == 2
def create_session(self):
    """Yield a database session bound to ``self.connection``.

    After the consumer is done with the yielded session, pending changes are
    committed.  If an exception surfaces, the session is rolled back and the
    error is logged (it is not re-raised by this function).  The session is
    closed in every case.

    NOTE(review): presumably wrapped by a context-manager decorator that is
    outside this view -- confirm.
    """
    session = Session(self.connection)
    session_id = id(session)

    opened_msg = "Create session {0} with {1}".format(
        session_id, self._public_db_uri(str(self.engine.url)))
    logger.info(opened_msg)

    try:
        yield session
        session.commit()
        logger.info("Commit transactions to database")
    except Exception:
        session.rollback()
        logger.exception("Database transactions rolled back")
    finally:
        closed_msg = "Session {0} with {1} closed".format(
            session_id, self._public_db_uri(str(self.engine.url)))
        logger.info(closed_msg)
        session.close()
def setup(self):
    """Prepare a mocked session, the pay period under test and a mocked
    scheduled transaction with a mocked account and budget."""
    self.mock_sess = Mock(spec_set=Session)
    self.cls = BiweeklyPayPeriod(date(2017, 3, 7), self.mock_sess)

    # ``name=`` in the Mock constructor only labels the mock, so the
    # ``name`` attribute is assigned on the mock's own type instead.
    account_mock = Mock(name='foo')
    type(account_mock).name = 'foo'
    budget_mock = Mock(name='bar')
    type(budget_mock).name = 'bar'

    transaction_attrs = {
        'id': 123,
        'description': 'desc',
        'amount': Decimal('123.45'),
        'account_id': 2,
        'account': account_mock,
        'budget_id': 3,
        'budget': budget_mock,
    }
    self.m_st = Mock(spec_set=ScheduledTransaction, **transaction_attrs)
def init_empty_site(self, dbsession:Session, user:UserMixin):
    """Create the default admin group and add ``user`` to it.

    Does nothing when at least one group already exists, because the site
    is then considered initialised.

    :param dbsession: Session used to query for and add the group.
    :param user: User that becomes a member of the new admin group.
    """
    # Resolve the Group class through the user's "groups" relationship
    user_mapper = inspection.inspect(user.__class__)
    Group = user_mapper.relationships["groups"].mapper.entity

    # Existing groups mean the site was set up before; don't init again
    existing_groups = dbsession.query(Group).count()
    if existing_groups > 0:
        return

    admin_group = Group(name=Group.DEFAULT_ADMIN_GROUP_NAME)
    dbsession.add(admin_group)
    admin_group.users.append(user)
def filled_database(file_database) -> curse.Database:
    """Database with some mods filled in.

    Creates the addon tables on ``file_database.engine``, inserts three
    sample mods and returns the same ``file_database`` object.
    """
    # Create structure
    addon.AddonBase.metadata.create_all(file_database.engine)

    # Add few mods; always release the session, even if the insert fails,
    # so no connection stays checked out behind the returned database.
    session = SQLSession(bind=file_database.engine)
    try:
        session.add_all([
            addon.Mod(id=42, name='tested', summary="Mod under test"),
            addon.Mod(id=45, name='tester', summary="Validate tested mod"),
            addon.Mod(id=3, name='unrelated', summary="Dummy"),
        ])
        session.commit()
    finally:
        session.close()

    return file_database
def find(cls, connection: SQLSession, name: str) -> 'Mod':
    """Return the single Mod whose name matches NAME.

    The name is bound into a SQL ``LIKE`` comparison, so any wildcard
    characters it contains are interpreted by the database.

    Keyword Arguments:
        connection: Database session to run the query on.
        name: The name of the mod to look for.

    Returns:
        The one matching mod.

    Raises:
        NoResultFound: No mod matches the name.
        MultipleResultsFound: More than one mod matches the name.
    """
    baked = SQLBakery(lambda session: session.query(cls))
    baked += lambda query: query.filter(cls.name.like(bindparam('name')))

    bound_query = baked(connection).params(name='{}'.format(name))
    return bound_query.one()
def with_id(cls, connection: SQLSession, id: int) -> 'Mod':
    """Return the Mod whose ``id`` column equals ID.

    Keyword arguments:
        connection: Database session to run the query on.
        id: The id field of the wanted mod.

    Returns:
        The matching mod.

    Raises:
        NoResultFound: No mod with the given id exists.
    """
    baked = SQLBakery(lambda session: session.query(cls))
    baked += lambda query: query.filter(cls.id == bindparam('id'))

    bound_query = baked(connection).params(id=id)
    return bound_query.one()
def get_tm_session(session_factory, transaction_manager):
    """
    Build a session from ``session_factory`` and register it with
    ``transaction_manager`` through ``zope.sqlalchemy``.

    The transaction manager then takes care of committing any changes.

    - With pyramid_tm the transaction is committed or aborted
      automatically, depending on whether an exception is raised.
    - In scripts you should wrap the session in a manager yourself.
      For example::

          import transaction

          engine = get_engine(settings)
          session_factory = get_session_factory(engine)
          with transaction.manager:
              dbsession = get_tm_session(session_factory, transaction.manager)

    """
    session = session_factory()
    zope.sqlalchemy.register(session, transaction_manager=transaction_manager)
    return session
def _delete_all_attached(session: Session, machine: Machine):
    """
    Remove every disk, interface and chassis port attached to a machine.

    Performance is not a concern here, so the attached resources are simply
    dropped and meant to be re-created afterwards; the discovery data is
    the reference for what really exists.

    :param session: session used for the deletes; it is flushed at the end
    :param machine: machine whose attached resources are removed
    :return: None
    """
    disks = session.query(MachineDisk).filter(
        MachineDisk.machine_id == machine.id)
    disks.delete()

    interfaces = session.query(MachineInterface).filter(
        MachineInterface.machine_id == machine.id)
    for interface in interfaces:
        # Ports reference the interface, so they are removed first.
        ports = session.query(ChassisPort).filter(
            ChassisPort.machine_interface == interface.id)
        ports.delete()
        session.delete(interface)

    session.flush()
def test_modelmethod():
    """Exercise ``ModelStub.my_model_method``.

    The test expects a ``ValueError`` when no session is among the
    arguments, and otherwise expects the method to hand back the model
    class, the session and the extra argument unchanged.
    """
    session = Session()

    # No session in the arguments: the call is expected to be rejected.
    # The raised exception itself is not inspected, so it is not bound.
    with pytest.raises(ValueError):
        ModelStub.my_model_method(1, 2)

    c, s, a = ModelStub.my_model_method(session, 1)
    assert c is ModelStub
    assert s is session
    assert a == 1
def test_default_store(self):
    """Fetching the default store fails without a default and works again
    once ``'dummy'`` is made the default."""
    with StoreManager(Session) as manager:
        # Hacking StoreManager to raise error
        StoreManager._default = None
        with self.assertRaises(DefaultStoreError):
            manager.get()

        # Making it default again
        StoreManager.make_default('dummy')
        default_store = manager.get()
        self.assertIsNotNone(default_store)

        # Unregister
        StoreManager.unregister('dummy')
def test_context_stack(self):
    """Nested store managers are distinct and each hands out its own
    default store; outside any manager the lookup raises."""
    with self.assertRaises(ContextError):
        StoreManager.get_current_store_manager()

    with StoreManager(Session) as outer:
        outer_store = outer.get()
        self.assertIs(outer_store, outer.default_store)

        with StoreManager(Session) as inner:
            inner_store = inner.get()
            self.assertIs(inner_store, inner.default_store)

            self.assertIsNot(outer, inner)
            self.assertIsNot(outer_store, inner_store)
def test_empty_blog(web_server: str, browser: DriverAPI, dbsession: Session):
    """The blog roll renders when there are no posts."""
    # Point the Splinter browser at the blog roll
    blog_url = web_server + "/blog/"
    browser.visit(blog_url)

    # The "no posts" placeholder must be shown
    assert browser.is_element_visible_by_css("#blog-no-posts")
def test_no_unpublished_in_blog_roll(web_server: str, browser: DriverAPI, dbsession: Session, unpublished_post_id):
    """Unpublished posts stay hidden from visitors in the blog roll."""
    # Point the Splinter browser at the blog roll
    blog_url = web_server + "/blog/"
    browser.visit(blog_url)

    # Only an unpublished post exists, so the "no posts" placeholder is expected
    assert browser.is_element_visible_by_css("#blog-no-posts")
def test_published_excerpt(web_server: str, browser: DriverAPI, dbsession: Session, published_post_id):
    """A published post shows up with an excerpt in the blog roll."""
    # Point the Splinter browser at the blog roll
    blog_url = web_server + "/blog/"
    browser.visit(blog_url)

    # The published post's excerpt must be visible
    assert browser.is_element_visible_by_css(".excerpt")
def test_published_post(web_server: str, browser: DriverAPI, dbsession: Session, published_post_id):
    """A visitor can open a published post from the blog roll."""
    blog_url = web_server + "/blog/"
    browser.visit(blog_url)
    assert browser.is_element_present_by_css("#heading-blog")

    # Follow the post link from the roll to the post page
    post_link = browser.find_by_css(".post-link")
    post_link.click()
    assert browser.is_element_present_by_css("#heading-post")
def test_empty_tag_roll(web_server: str, browser: DriverAPI, dbsession: Session):
    """The tag roll renders for a tag that has no posts."""
    # Point the Splinter browser at a tag page
    tag_url = web_server + "/blog/tag/xxx"
    browser.visit(tag_url)

    assert browser.is_element_present_by_css("#blog-no-posts")
def create_session(self, bind, options=None, session_class=Session, query_class=SQLQuery):
    """Factory function to create a scoped session using `bind`.

    Args:
        bind (Engine|Connection): Database engine or connection instance.
        options (dict, optional): Session configuration options. A
            ``'scopefunc'`` entry, if present, is passed to the scoped
            session instead of the session factory. The dict is copied and
            never modified.
        session_class (obj, optional): Session class to use when creating
            new session instances. Defaults to :class:`.Session`.
        query_class (obj, optional): Query class used for ``session.query``
            instances. Defaults to :class:`.SQLQuery`.

    Returns:
        Session: SQLAlchemy session instance bound to `bind`.
    """
    # Work on a private copy: the function adds and removes keys below, and
    # that must not leak into a dict the caller may reuse.
    options = {} if options is None else dict(options)

    if query_class:
        options['query_cls'] = query_class

    # ``scopefunc`` configures the scoped session, not the sessionmaker.
    scopefunc = options.pop('scopefunc', None)

    session_factory = orm.sessionmaker(bind=bind,
                                       class_=session_class,
                                       **options)

    return orm.scoped_session(session_factory, scopefunc=scopefunc)
def session(self, Session):
    """Store ``Session`` on the private :attr:`_Session` attribute.

    NOTE(review): presumably the setter half of a ``session`` property; the
    decorator is outside this view -- confirm.
    """
    self._Session = Session
def setup(self):
    """Create a mocked session and the pay period (2017-03-17) under test."""
    session_mock = Mock(spec_set=Session)
    self.mock_sess = session_mock

    period_start = date(2017, 3, 17)
    self.cls = BiweeklyPayPeriod(period_start, session_mock)
def setup(self):
    """Create a mock restricted to the ``Session`` interface for the tests."""
    # spec_set makes the mock reject attributes that ``Session`` lacks.
    self.mock_sess = Mock(spec_set=Session)
def setup(self):
    """Create a mocked session and the pay period (2017-03-07) under test."""
    session_mock = Mock(spec_set=Session)
    self.mock_sess = session_mock

    period_start = date(2017, 3, 7)
    self.cls = BiweeklyPayPeriod(period_start, session_mock)
def check_empty_site_init(self, dbsession:Session, user:UserMixin):
    """Call after user creation to see if this user is the first user and
    should get initial admin rights.

    :param dbsession: Session used to count the existing groups.
    :param user: Freshly created user; must already have an ``id``.
    :raises AssertionError: If ``user.id`` is not set yet (the session has
        not been flushed).
    """
    # Raised explicitly instead of using ``assert`` so the check still runs
    # when Python is started with -O; type and message are unchanged.
    if not user.id:
        raise AssertionError("Please flush your db")

    # Try to reflect related group class based on User model
    i = inspection.inspect(user.__class__)
    Group = i.relationships["groups"].mapper.entity

    # If we already have groups admin group must be there
    if dbsession.query(Group).count() > 0:
        return

    self.init_empty_site(dbsession, user)
def create_test_dbsession(request, registry: Registry, transaction_manager=transaction.manager) -> Session:
    """Create a test database session and setup database.

    Drops and re-creates all tables when called, and registers a py.test
    finalizer that drops all tables again and closes the session. On
    PostgreSQL the ``uuid-ossp`` extension is created as well, so it does
    not have to be added by hand every time.

    :param request: py.test test request
    :param registry: registry passed on to ``create_dbsession``
    :param transaction_manager: transaction manager the session is created with
    :return: New database session
    """
    from websauna.system.model.meta import Base

    dbsession = create_dbsession(registry, manager=transaction_manager)
    engine = dbsession.get_bind()

    # The connection is only needed for the optional extension statement.
    # Close it right away so it cannot linger and block the DROP/CREATE
    # statements below or in the teardown.
    connection = engine.connect()
    try:
        # Support native PSQL UUID types
        if engine.dialect.name == "postgresql":
            connection.execute('create extension if not exists "uuid-ossp";')
    finally:
        connection.close()

    with transaction.manager:
        Base.metadata.drop_all(engine)
        Base.metadata.create_all(engine)

    def teardown():
        # Open transactions would block DROP ALL and deadlock the tests, so
        # the tables are dropped in their own transaction and then the
        # session is closed.
        # XXX: find a cleaner way to release outstanding connections.
        with transaction.manager:
            Base.metadata.drop_all(engine)

        dbsession.close()

    request.addfinalizer(teardown)

    return dbsession
def test_mod_search(filled_database):
    """Searching for 'Tested' yields exactly the mods with ids 42 and 45."""
    expected_ids = {42, 45}

    session = SQLSession(bind=filled_database.engine)
    found = addon.Mod.search(session, 'Tested')
    found_ids = {int(mod.id) for mod in found}

    assert found_ids == expected_ids
def test_mod_find(filled_database):
    """``Mod.find`` returns the matching mod, or raises when nothing matches."""
    session = SQLSession(bind=filled_database.engine)

    match = addon.Mod.find(session, 'Tested')
    assert match.id == 42

    with pytest.raises(addon.NoResultFound):
        addon.Mod.find(session, 'nonsense')
def test_mod_with_id(filled_database):
    """``Mod.with_id`` returns the mod for a known id and raises otherwise."""
    session = SQLSession(bind=filled_database.engine)

    tested = addon.Mod.with_id(session, 42)
    assert tested.name == 'tested'

    tester = addon.Mod.with_id(session, 45)
    assert tester.name == 'tester'

    # Id 44 is not part of the fixture data
    with pytest.raises(addon.NoResultFound):
        addon.Mod.with_id(session, 44)


# Release tests
def session(self) -> SQLSession:
    """Create new session for batch database communication.

    Each call builds a fresh ``SQLSession`` bound to ``self.engine``; the
    session is not closed here, so that is left to the caller.
    """
    return SQLSession(bind=self.engine)
def receive_after_insert(mapper, connection, target):
    """Handle the 'after_insert' event.

    Registers a one-shot 'before_commit' listener on the ``Session`` class
    which adds a ``PaperRating`` row for the inserted target's ``id``.
    """
    @event.listens_for(Session, "before_commit", once=True)
    def _add_paper_rating(session):
        new_rating = PaperRating(paper_id=target.id)
        session.add(new_rating)
def receive_after_insert(mapper, connection, target):
    """Handle the 'after_insert' event.

    Registers a one-shot 'before_commit' listener on the ``Session`` class
    which loads the paper's rating and all of its user ratings, then passes
    both to ``update_value``.
    """
    @event.listens_for(Session, "before_commit", once=True)
    def _refresh_paper_rating(session):
        paper_id = target.paper_id
        rating = session.query(PaperRating).filter_by(paper_id=paper_id).first()
        user_ratings = session.query(UserPaperRating).filter_by(paper_id=paper_id).all()
        update_value(rating, user_ratings)
def receive_after_update(mapper, connection, target):
    """Handle the 'after_update' event.

    Registers a one-shot 'before_commit' listener on the ``Session`` class
    which loads the paper's rating and all of its user ratings, then passes
    both to ``update_value``.
    """
    @event.listens_for(Session, "before_commit", once=True)
    def _refresh_paper_rating(session):
        paper_id = target.paper_id
        rating = session.query(PaperRating).filter_by(paper_id=paper_id).first()
        user_ratings = session.query(UserPaperRating).filter_by(paper_id=paper_id).all()
        update_value(rating, user_ratings)
def receive_after_delete(mapper, connection, target):
    """Handle the 'after_delete' event.

    Registers a one-shot 'before_commit' listener on the ``Session`` class
    which loads the paper's rating and all of its user ratings, then passes
    both to ``update_value``.
    """
    @event.listens_for(Session, "before_commit", once=True)
    def _refresh_paper_rating(session):
        paper_id = target.paper_id
        rating = session.query(PaperRating).filter_by(paper_id=paper_id).first()
        user_ratings = session.query(UserPaperRating).filter_by(paper_id=paper_id).all()
        update_value(rating, user_ratings)
def test_db_essentials(dbsession):
    """The fixture is a real ``Session`` and a ``ZdbQuery`` can be built on it."""
    assert isinstance(dbsession, Session)

    query = ZdbQuery(Products, session=dbsession)
    assert isinstance(query, ZdbQuery)
def setup(self):
    """Open a session on ``self.connection``, start a nested transaction
    and reset the counters used by the tests."""
    # Fresh session on the shared connection
    connection = self.connection
    self._db = Session(connection)
    self.transaction = connection.begin_nested()

    # The counter starts high so it does not collide with tests that look
    # for small numbers.
    self.counter = 2000

    self.time_counter = datetime(2014, 1, 1)
def set_session(application, DB_SESSION):
    """Set the database session for the app.

    While the generator is suspended at its ``yield``, every application
    context pushed for ``application`` gets ``g.dbsession`` set to
    ``DB_SESSION``.

    :param application: Application whose pushed contexts get the session.
    :param DB_SESSION: Must be of type <sqlalchemy.orm.session.Session>.
    :raises TypeError: If ``DB_SESSION`` is not a ``Session`` instance.

    NOTE(review): presumably wrapped by a context-manager decorator that is
    outside this view -- confirm.
    """
    if not isinstance(DB_SESSION, Session):
        # The message used to talk about "The API Doc", copied from a
        # sibling setter; it now names the object that is validated.
        raise TypeError("The database session is not of type <sqlalchemy.orm.session.Session>")

    def handler(sender, **kwargs):
        g.dbsession = DB_SESSION

    with appcontext_pushed.connected_to(handler, application):
        yield
def get_session():
    """Return the database session for the server.

    Uses the session stored on ``g.dbsession`` when there is one;
    otherwise creates a session bound to ``engine``, stores it on ``g``
    and returns it.
    """
    existing = getattr(g, 'dbsession', None)
    if existing is not None:
        return existing

    factory = sessionmaker(bind=engine)
    created = factory()
    g.dbsession = created
    return created