Python sqlalchemy.orm.session 模块,sessionmaker() 实例源码

我们从Python开源项目中,提取了以下11个代码示例,用于说明如何使用sqlalchemy.orm.session.sessionmaker()

项目:freqtrade    作者:gcarq    | 项目源码 | 文件源码
def init(config: dict, engine: Optional[Engine] = None) -> None:
    """
    Set up the persistence layer of this module.

    Merges ``config`` into the module-level ``_CONF``, creates a SQLAlchemy
    engine when none is supplied, attaches a session and a query property
    to ``Trade`` and creates every table registered on ``_DECL_BASE``.

    :param config: config to use
    :param engine: database engine for sqlalchemy (Optional). When falsy,
                   an engine for ``sqlite://`` is created if ``dry_run`` is
                   set in the config, otherwise one for
                   ``sqlite:///tradesv3.sqlite``.
    :return: None
    """
    _CONF.update(config)

    if not engine:
        is_dry_run = _CONF.get('dry_run', False)
        if is_dry_run:
            # Dry runs use a path-less SQLite URL with a single pooled
            # connection that may be used from several threads.
            dry_run_options = {
                'connect_args': {'check_same_thread': False},
                'poolclass': StaticPool,
                'echo': False,
            }
            engine = create_engine('sqlite://', **dry_run_options)
        else:
            engine = create_engine('sqlite:///tradesv3.sqlite')

    session_factory = sessionmaker(bind=engine, autoflush=True, autocommit=True)
    scoped = scoped_session(session_factory)
    Trade.session = scoped()
    Trade.query = scoped.query_property()
    _DECL_BASE.metadata.create_all(engine)
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda    作者:SignalMedia    | 项目源码 | 文件源码
def test_temporary_table(self):
        """Insert one row into a TEMPORARY table and read it back via ``read_sql_query``."""
        payload = u'Hello, World!'
        declarative_base_cls = declarative.declarative_base()

        class Temporary(declarative_base_cls):
            __tablename__ = 'temp_test'
            # The TEMPORARY prefix is emitted in the CREATE TABLE statement.
            __table_args__ = {'prefixes': ['TEMPORARY']}
            id = sqlalchemy.Column(sqlalchemy.Integer, primary_key=True)
            spam = sqlalchemy.Column(sqlalchemy.Unicode(30), nullable=False)

        session = sa_session.sessionmaker(bind=self.conn)()
        with session.transaction:
            # Create, populate and query the table on the same connection.
            connection = session.connection()
            Temporary.__table__.create(connection)
            session.add(Temporary(spam=payload))
            # Flush so the pending INSERT is sent before the SELECT runs.
            session.flush()
            select_spam = sqlalchemy.select([Temporary.spam])
            result = sql.read_sql_query(sql=select_spam, con=connection)

        tm.assert_frame_equal(result, DataFrame({'spam': [payload]}))
项目:stream2segment    作者:rizac    | 项目源码 | 文件源码
def setUp(self):
        """Create the engine, the tables and a scoped session for each test.

        The database URL is read from the ``DB_URL`` environment variable and
        defaults to an in-memory SQLite database.
        """
        self.addCleanup(Test.cleanup, self)

        self.dburl = os.getenv("DB_URL", "sqlite:///:memory:")

        scoped_factory = scoped_session(sessionmaker())
        self.engine = create_engine(self.dburl, echo=False)
        # Discard any session held by the registry before (re)configuring it.
        scoped_factory.remove()
        scoped_factory.configure(
            bind=self.engine,
            autoflush=False,
            expire_on_commit=False,
        )
        Base.metadata.create_all(self.engine)

        # The scoped_session registry itself is exposed as the test session.
        self.session = scoped_factory
项目:stream2segment    作者:rizac    | 项目源码 | 文件源码
def get_session(dbpath, scoped=False):
    """
    Build an SQLAlchemy session for database IO, creating all tables first.

    :param dbpath: the path to the database, e.g. sqlite:///path_to_my_dbase.sqlite
    :param scoped: boolean (False by default). When true a ``scoped_session``
        wrapping the session factory is returned, otherwise a plain session
        instance created by that factory.
    """
    db_engine = create_engine(dbpath)
    Base.metadata.create_all(db_engine)  # @UndefinedVariable

    # Enabling foreign keys for sqlite is NOT implemented here yet (see
    # models.py). It could be added as an event listener as outlined in:
    # http://stackoverflow.com/questions/13712381/how-to-turn-on-pragma-foreign-keys-on-in-sqlalchemy-migration-script-or-conf

    # One configured factory serves both return flavours.
    make_session = sessionmaker(bind=db_engine)
    return scoped_session(make_session) if scoped else make_session()
项目:trf    作者:aistairc    | 项目源码 | 文件源码
def __init__(self, db_path: str, lang: str):
        """Open a session on the SQLite file at ``db_path`` and store ``lang``.

        :param db_path: filesystem path appended to the ``sqlite:///`` URL prefix
        :param lang: value stored unchanged on ``self.lang``
        """
        db_url = 'sqlite:///{}'.format(db_path)
        session_factory = sessionmaker(bind=create_engine(db_url))
        self.session = session_factory()
        self.lang = lang
项目:estimators    作者:fridiculous    | 项目源码 | 文件源码
def __init__(self, url='sqlite:///default.db'):
        """Create the engine and a scoped session registry.

        :param url: database URL used when the ``DATABASE_URL`` environment
            variable is not set
        """
        # The environment variable takes precedence over the ``url`` argument.
        database_url = os.environ.get('DATABASE_URL', url)
        self.engine = create_engine(database_url, echo=False)

        session_factory = sessionmaker(bind=self.engine, expire_on_commit=False)
        self.Session = scoped_session(session_factory)
项目:antares    作者:CONABIO    | 项目源码 | 文件源码
def get_session(self):
        """Return the cached session, creating it on first use.

        When ``self.session`` is ``None`` a new session bound to an engine
        for ``self.database`` is created and stored on the instance.
        """
        if self.session is not None:
            return self.session

        engine = create_engine(self.database)
        self.session = sessionmaker(bind=engine)()
        return self.session
项目:antares    作者:CONABIO    | 项目源码 | 文件源码
def persist_bundle(self):
        """Persist a ``DummyBundle`` and verify the database row and files.

        After persisting, the ``product`` table is queried for the bundle's
        uuid and the count is asserted to be greater than zero. The database
        object is then deleted and every file reported by the bundle is
        checked for existence in the output directory and removed.
        On any failure the session is rolled back and the error re-raised;
        the session is always closed.
        """
        from madmex.persistence.driver import persist_bundle
        from sqlalchemy import create_engine
        from sqlalchemy.orm.session import sessionmaker
        from madmex.util import remove_file
        dummy = DummyBundle()
        persist_bundle(dummy)


        my_database = getattr(SETTINGS, 'ANTARES_TEST_DATABASE')
        klass = sessionmaker(bind=create_engine(my_database))
        session = klass()
        query = 'SELECT count(*) FROM product WHERE uuid=\'%s\';' % dummy.uuid_id
        # Parenthesised form prints the same on Python 2 and is valid on Python 3.
        print(query)
        try:
            result_set = session.execute(query)
            for row in result_set:
                self.assertGreater(row['count'], 0)
            # Delete object from database.
            session.delete(dummy.get_database_object())
            session.commit()
            for file_name in dummy.get_files():
                full_path = os.path.join(dummy.get_output_directory(), os.path.basename(file_name))
                self.assertTrue(os.path.isfile(full_path))
                # Remove file from filesystem.
                remove_file(full_path)
        except:
            # Deliberately broad: roll back on any failure, then re-raise it.
            session.rollback()
            raise
        finally:
            session.close()
项目:antares    作者:CONABIO    | 项目源码 | 文件源码
def persist_bundle_sensor(self):
        """Persist a SPOT5 ``Bundle`` and verify the database row and files.

        The bundle is built from a fixed staging folder and its target is
        overridden with the ``TEST_FOLDER`` setting before persisting. The
        ``product`` table is then queried for the bundle's uuid, the database
        object is deleted, and every file reported by the bundle is checked
        for existence under the target folder and removed.
        On any failure the session is rolled back and the error re-raised;
        the session is always closed.
        """
        from madmex.persistence.driver import persist_bundle
        folder = '/LUSTRE/MADMEX/staging/madmex_antares/test_ingest/556_297_041114_dim_img_spot'
        from sqlalchemy import create_engine
        from sqlalchemy.orm.session import sessionmaker
        from madmex.mapper.bundle.spot5 import Bundle

        dummy = Bundle(folder)
        target_url = getattr(SETTINGS, 'TEST_FOLDER')
        # Parenthesised form prints the same on Python 2 and is valid on Python 3.
        print(target_url)
        #TODO please fix me, horrible hack
        dummy.target = target_url
        persist_bundle(dummy)
        my_database = getattr(SETTINGS, 'ANTARES_TEST_DATABASE')
        klass = sessionmaker(bind=create_engine(my_database))
        session = klass()
        query = 'SELECT count(*) FROM product WHERE uuid=\'%s\';' % dummy.uuid_id

        try:
            result_set = session.execute(query)
            for row in result_set:
                self.assertGreater(row['count'], 0)
            # Delete object from database.
            session.delete(dummy.get_database_object())
            session.commit()
            for file_name in dummy.get_files():
                full_path = os.path.join(target_url, os.path.basename(file_name))
                self.assertTrue(os.path.isfile(full_path))
                # Remove file from filesystem.
                os.remove(full_path)
        except:
            # Deliberately broad: roll back on any failure, then re-raise it.
            session.rollback()
            raise
        finally:
            session.close()
项目:antares    作者:CONABIO    | 项目源码 | 文件源码
def create_vector_tables(path_query):
    """Execute the SQL stored in the file at ``path_query``.

    The file's lines are joined with a single space and the resulting
    string is executed on a new session bound to ``ENGINE``.

    :param path_query: path of the text file containing the SQL to run
    """
    klass = sessionmaker(bind=ENGINE)
    session = klass()
    # The context manager closes the file handle even if reading fails;
    # previously the handle was never closed.
    with open(path_query, 'r') as file_open:
        sql = " ".join(file_open.readlines())
    # NOTE(review): the session is neither committed nor closed here, as in
    # the original code -- confirm whether callers rely on that.
    session.execute(sql)
项目:CA-NEAT    作者:mathiasose    | 项目源码 | 文件源码
def __init__(self, path, echo=True):
        """Create the engine, the database if missing, the tables and a session registry.

        :param path: database URL handed to ``create_engine``
        :param echo: forwarded to ``create_engine`` as its ``echo`` flag
        """
        from sqlalchemy.orm import scoped_session

        self.engine = create_engine(path, echo=echo)

        db_url = self.engine.url
        if not database_exists(db_url):
            create_database(db_url)

        Base.metadata.create_all(self.engine)
        session_factory = sessionmaker(bind=self.engine)
        self.Session = scoped_session(session_factory)