我们从 Python 开源项目中提取了以下 11 个代码示例，用于说明如何使用 sqlalchemy.orm.session.sessionmaker()。
def init(config: dict, engine: Optional[Engine] = None) -> None:
    """
    Set up the persistence layer of this module.

    Merges ``config`` into the module-level ``_CONF``, creates a database
    engine when none is supplied, attaches a session and a query property to
    ``Trade`` and creates all tables registered on ``_DECL_BASE``.

    :param config: config to use
    :param engine: database engine for sqlalchemy (Optional)
    :return: None
    """
    _CONF.update(config)

    if not engine:
        dry_run = _CONF.get('dry_run', False)
        if dry_run:
            # Dry runs use a path-less sqlite URL with a StaticPool and
            # check_same_thread disabled.
            # NOTE(review): presumably so one in-memory database can be shared
            # between threads -- confirm.
            engine = create_engine(
                'sqlite://',
                connect_args={'check_same_thread': False},
                poolclass=StaticPool,
                echo=False,
            )
        else:
            engine = create_engine('sqlite:///tradesv3.sqlite')

    session_factory = sessionmaker(bind=engine, autoflush=True, autocommit=True)
    session = scoped_session(session_factory)
    Trade.session = session()
    Trade.query = session.query_property()
    _DECL_BASE.metadata.create_all(engine)
def test_temporary_table(self):
    """Insert one row into a TEMPORARY table and read it back via read_sql_query.

    The table is created, filled and queried on the connection obtained from
    the session, all inside the session's transaction block.
    """
    test_data = u'Hello, World!'
    Base = declarative.declarative_base()

    class Temporary(Base):
        __tablename__ = 'temp_test'
        # The TEMPORARY prefix is passed through to the table definition.
        __table_args__ = {'prefixes': ['TEMPORARY']}
        id = sqlalchemy.Column(sqlalchemy.Integer, primary_key=True)
        spam = sqlalchemy.Column(sqlalchemy.Unicode(30), nullable=False)

    session = sa_session.sessionmaker(bind=self.conn)()
    with session.transaction:
        connection = session.connection()
        Temporary.__table__.create(connection)
        session.add(Temporary(spam=test_data))
        # Flush so the pending row is sent to the database before querying.
        session.flush()
        query = sqlalchemy.select([Temporary.spam])
        result = sql.read_sql_query(sql=query, con=connection)

    expected = DataFrame({'spam': [test_data]})
    tm.assert_frame_equal(result, expected)
def setUp(self):
    """Create the engine, the schema and a scoped session for a test.

    The database URL is read from the ``DB_URL`` environment variable and
    falls back to an in-memory SQLite URL. ``Test.cleanup`` is registered as
    a cleanup callback before anything else is set up.
    """
    self.addCleanup(Test.cleanup, self)

    self.dburl = os.getenv("DB_URL", "sqlite:///:memory:")
    self.engine = create_engine(self.dburl, echo=False)

    session_registry = scoped_session(sessionmaker())
    # Discard any current session before reconfiguring the registry.
    session_registry.remove()
    session_registry.configure(
        bind=self.engine,
        autoflush=False,
        expire_on_commit=False,
    )

    Base.metadata.create_all(self.engine)
    self.session = session_registry
def get_session(dbpath, scoped=False):
    """
    Build a SQLAlchemy session for IO operations on the database at ``dbpath``.

    An engine is created for ``dbpath`` and all tables known to
    ``Base.metadata`` are created on it before the session is returned.

    :param dbpath: the path to the database, e.g.
        sqlite:///path_to_my_dbase.sqlite
    :param scoped: boolean (False by default). When true, a ``scoped_session``
        wrapping the session factory is returned instead of a plain session
        instance
    """
    engine = create_engine(dbpath)
    Base.metadata.create_all(engine)  # @UndefinedVariable

    # Enabling foreign keys for sqlite is NOT implemented yet (see models.py).
    # It could be added as an event listener, as outlined here:
    # http://stackoverflow.com/questions/13712381/how-to-turn-on-pragma-foreign-keys-on-in-sqlalchemy-migration-script-or-conf

    session_factory = sessionmaker(bind=engine)
    if scoped:
        return scoped_session(session_factory)
    return session_factory()
def __init__(self, db_path: str, lang: str):
    """Open a session on the SQLite file at ``db_path`` and remember ``lang``.

    :param db_path: filesystem path appended to the ``sqlite:///`` URL prefix
    :param lang: value stored unchanged on ``self.lang``
    """
    database_url = 'sqlite:///{}'.format(db_path)
    session_factory = sessionmaker(bind=create_engine(database_url))
    self.session = session_factory()
    self.lang = lang
def __init__(self, url='sqlite:///default.db'):
    """Create the engine and a scoped session registry.

    :param url: database URL used when the ``DATABASE_URL`` environment
        variable is not set
    """
    # The environment variable takes precedence over the ``url`` argument.
    database_url = os.environ.get('DATABASE_URL', url)
    self.engine = create_engine(database_url, echo=False)

    session_factory = sessionmaker(bind=self.engine, expire_on_commit=False)
    self.Session = scoped_session(session_factory)
def get_session(self):
    """Return ``self.session``, creating it from ``self.database`` on first use."""
    if self.session is not None:
        return self.session

    # First call: build an engine for the configured database and cache a
    # session bound to it.
    engine = create_engine(self.database)
    self.session = sessionmaker(bind=engine)()
    return self.session
def persist_bundle(self):
    """Persist a ``DummyBundle`` and verify its database row and files.

    After persisting, the test counts the ``product`` rows with the bundle's
    uuid in the ``ANTARES_TEST_DATABASE`` database and asserts the count is
    greater than zero. It then deletes the bundle's database object and
    checks that every file reported by the bundle exists in its output
    directory before removing it.
    """
    from madmex.persistence.driver import persist_bundle
    from sqlalchemy import create_engine
    from sqlalchemy.orm.session import sessionmaker
    from madmex.util import remove_file

    dummy = DummyBundle()
    persist_bundle(dummy)

    my_database = getattr(SETTINGS, 'ANTARES_TEST_DATABASE')
    session = sessionmaker(bind=create_engine(my_database))()
    query = 'SELECT count(*) FROM product WHERE uuid=\'%s\';' % dummy.uuid_id
    # Call form of print: valid on Python 3 and, for a single argument,
    # prints the same text on Python 2.
    print(query)
    try:
        for row in session.execute(query):
            self.assertGreater(row['count'], 0)
        # Delete object from database.
        session.delete(dummy.get_database_object())
        session.commit()
        output_directory_of = dummy.get_output_directory
        for file_name in dummy.get_files():
            full_path = os.path.join(output_directory_of(),
                                     os.path.basename(file_name))
            self.assertTrue(os.path.isfile(full_path))
            # Remove file from filesystem.
            remove_file(full_path)
    except:
        # Roll back on any failure (assertion errors included), then let
        # the original exception propagate.
        session.rollback()
        raise
    finally:
        session.close()
def persist_bundle_sensor(self):
    """Persist a spot5 ``Bundle`` and verify its database row and files.

    The bundle is built from a fixed staging folder and its target is
    overridden with ``SETTINGS.TEST_FOLDER``. After persisting, the test
    asserts that at least one ``product`` row carries the bundle's uuid,
    deletes the bundle's database object, and checks that every file
    reported by the bundle exists in the target folder before removing it.
    """
    from madmex.persistence.driver import persist_bundle
    from sqlalchemy import create_engine
    from sqlalchemy.orm.session import sessionmaker
    from madmex.mapper.bundle.spot5 import Bundle

    folder = '/LUSTRE/MADMEX/staging/madmex_antares/test_ingest/556_297_041114_dim_img_spot'
    dummy = Bundle(folder)

    target_url = getattr(SETTINGS, 'TEST_FOLDER')
    # Call form of print: valid on Python 3 and, for a single argument,
    # prints the same text on Python 2.
    print(target_url)
    # TODO: please fix me, horrible hack
    dummy.target = target_url
    persist_bundle(dummy)

    my_database = getattr(SETTINGS, 'ANTARES_TEST_DATABASE')
    session = sessionmaker(bind=create_engine(my_database))()
    query = 'SELECT count(*) FROM product WHERE uuid=\'%s\';' % dummy.uuid_id
    try:
        for row in session.execute(query):
            self.assertGreater(row['count'], 0)
        session.delete(dummy.get_database_object())
        session.commit()
        for file_name in dummy.get_files():
            full_path = os.path.join(target_url, os.path.basename(file_name))
            self.assertTrue(os.path.isfile(full_path))
            os.remove(full_path)
    except:
        # Roll back on any failure (assertion errors included), then let
        # the original exception propagate.
        session.rollback()
        raise
    finally:
        session.close()
def create_vector_tables(path_query):
    """Execute the SQL stored in the file at ``path_query`` on ``ENGINE``.

    The lines of the file are joined with single spaces and sent to the
    database as one statement string.

    :param path_query: path of the text file holding the SQL to execute
    """
    # The context manager closes the file handle even if reading fails.
    with open(path_query, 'r') as query_file:
        sql = " ".join(query_file.readlines())

    session = sessionmaker(bind=ENGINE)()
    try:
        session.execute(sql)
    finally:
        # Always release the session's connection.
        # NOTE(review): nothing is committed explicitly here -- confirm
        # that this is intended.
        session.close()
def __init__(self, path, echo=True):
    """Create the engine, the database if missing, the schema and a session registry.

    :param path: database URL passed to ``create_engine``
    :param echo: forwarded to ``create_engine`` as its ``echo`` flag
    """
    from sqlalchemy.orm import scoped_session

    self.engine = create_engine(path, echo=echo)

    database_url = self.engine.url
    if not database_exists(database_url):
        create_database(database_url)

    Base.metadata.create_all(self.engine)
    session_factory = sessionmaker(bind=self.engine)
    self.Session = scoped_session(session_factory)