Python sqlalchemy.pool 模块,StaticPool() 实例源码

我们从Python开源项目中,提取了以下26个代码示例,用于说明如何使用sqlalchemy.pool.StaticPool()

项目:freqtrade    作者:gcarq    | 项目源码 | 文件源码
def init(config: dict, engine: Optional[Engine] = None) -> None:
    """
    Initializes this module with the given config,
    registers all known command handlers
    and starts polling for message updates
    :param config: config to use
    :param engine: database engine for sqlalchemy (Optional)
    :return: None
    """
    _CONF.update(config)
    if not engine:
        if _CONF.get('dry_run', False):
            engine = create_engine('sqlite://',
                                   connect_args={'check_same_thread': False},
                                   poolclass=StaticPool,
                                   echo=False)
        else:
            engine = create_engine('sqlite:///tradesv3.sqlite')

    session = scoped_session(sessionmaker(bind=engine, autoflush=True, autocommit=True))
    Trade.session = session()
    Trade.query = session.query_property()
    _DECL_BASE.metadata.create_all(engine)
项目:homeassistant    作者:NAStools    | 项目源码 | 文件源码
def _setup_connection(self):
        """Ensure database is ready to fly."""
        global Session  # pylint: disable=global-statement

        import homeassistant.components.recorder.models as models
        from sqlalchemy import create_engine
        from sqlalchemy.orm import scoped_session
        from sqlalchemy.orm import sessionmaker

        if self.db_url == 'sqlite://' or ':memory:' in self.db_url:
            from sqlalchemy.pool import StaticPool
            self.engine = create_engine(
                'sqlite://',
                connect_args={'check_same_thread': False},
                poolclass=StaticPool)
        else:
            self.engine = create_engine(self.db_url, echo=False)

        models.Base.metadata.create_all(self.engine)
        session_factory = sessionmaker(bind=self.engine)
        Session = scoped_session(session_factory)
        self.db_ready.set()
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def setup_bind(cls):
        if config.requirements.independent_connections.enabled:
            from sqlalchemy import pool
            return engines.testing_engine(
                options=dict(poolclass=pool.StaticPool))
        else:
            return config.db
项目:QXSConsolas    作者:qxsch    | 项目源码 | 文件源码
def setup_bind(cls):
        if config.requirements.independent_connections.enabled:
            from sqlalchemy import pool
            return engines.testing_engine(
                options=dict(poolclass=pool.StaticPool))
        else:
            return config.db
项目:chihu    作者:yelongyu    | 项目源码 | 文件源码
def setup_bind(cls):
        if config.requirements.independent_connections.enabled:
            from sqlalchemy import pool
            return engines.testing_engine(
                options=dict(poolclass=pool.StaticPool))
        else:
            return config.db
项目:ShelbySearch    作者:Agentscreech    | 项目源码 | 文件源码
def setup_bind(cls):
        if config.requirements.independent_connections.enabled:
            from sqlalchemy import pool
            return engines.testing_engine(
                options=dict(poolclass=pool.StaticPool))
        else:
            return config.db
项目:Price-Comparator    作者:Thejas-1    | 项目源码 | 文件源码
def setup_bind(cls):
        if config.requirements.independent_connections.enabled:
            from sqlalchemy import pool
            return engines.testing_engine(
                options=dict(poolclass=pool.StaticPool))
        else:
            return config.db
项目:Price-Comparator    作者:Thejas-1    | 项目源码 | 文件源码
def _connect(self, url):
        self.url = url
        # TODO: seems like 0.5.x branch does not work with engine.dispose and staticpool
        #self.engine = create_engine(url, echo=True, poolclass=StaticPool)
        self.engine = create_engine(url, echo=True)
        # silence the logger added by SA, nose adds its own!
        logging.getLogger('sqlalchemy').handlers=[]
        self.meta = MetaData(bind=self.engine)
        if self.level < self.CONNECT:
            return
        #self.session = create_session(bind=self.engine)
        if self.level < self.TXN: 
            return
        #self.txn = self.session.begin()
项目:webapp    作者:superchilli    | 项目源码 | 文件源码
def setup_bind(cls):
        if config.requirements.independent_connections.enabled:
            from sqlalchemy import pool
            return engines.testing_engine(
                options=dict(poolclass=pool.StaticPool))
        else:
            return config.db
项目:QualquerMerdaAPI    作者:tiagovizoto    | 项目源码 | 文件源码
def setup_bind(cls):
        if config.requirements.independent_connections.enabled:
            from sqlalchemy import pool
            return engines.testing_engine(
                options=dict(poolclass=pool.StaticPool))
        else:
            return config.db
项目:gardenbot    作者:GoestaO    | 项目源码 | 文件源码
def setup_bind(cls):
        if config.requirements.independent_connections.enabled:
            from sqlalchemy import pool
            return engines.testing_engine(
                options=dict(poolclass=pool.StaticPool))
        else:
            return config.db
项目:flask-zhenai-mongo-echarts    作者:Fretice    | 项目源码 | 文件源码
def setup_bind(cls):
        if config.requirements.independent_connections.enabled:
            from sqlalchemy import pool
            return engines.testing_engine(
                options=dict(poolclass=pool.StaticPool))
        else:
            return config.db
项目:Data-visualization    作者:insta-code1    | 项目源码 | 文件源码
def setup_bind(cls):
        if config.requirements.independent_connections.enabled:
            from sqlalchemy import pool
            return engines.testing_engine(
                options=dict(poolclass=pool.StaticPool))
        else:
            return config.db
项目:jf    作者:rolycg    | 项目源码 | 文件源码
def create_database():
    engine = create_engine('sqlite:///database.db', connect_args={'check_same_thread': False}, poolclass=StaticPool)
    Base.metadata.create_all(engine)
    return engine
项目:jf    作者:rolycg    | 项目源码 | 文件源码
def connect_database():
    engine = create_engine('sqlite:///database.db', connect_args={'check_same_thread': False}, poolclass=StaticPool)
    return engine
项目:micro-blog    作者:nickChenyx    | 项目源码 | 文件源码
def setup_bind(cls):
        if config.requirements.independent_connections.enabled:
            from sqlalchemy import pool
            return engines.testing_engine(
                options=dict(poolclass=pool.StaticPool))
        else:
            return config.db
项目:python-flask-security    作者:weinbergdavid    | 项目源码 | 文件源码
def setup_bind(cls):
        if config.requirements.independent_connections.enabled:
            from sqlalchemy import pool
            return engines.testing_engine(
                options=dict(poolclass=pool.StaticPool))
        else:
            return config.db
项目:watcher    作者:nosmokingbandit    | 项目源码 | 文件源码
def setup_bind(cls):
        if config.requirements.independent_connections.enabled:
            from sqlalchemy import pool
            return engines.testing_engine(
                options=dict(poolclass=pool.StaticPool))
        else:
            return config.db
项目:Lixiang_zhaoxin    作者:hejaxian    | 项目源码 | 文件源码
def setup_bind(cls):
        if config.requirements.independent_connections.enabled:
            from sqlalchemy import pool
            return engines.testing_engine(
                options=dict(poolclass=pool.StaticPool))
        else:
            return config.db
项目:flask    作者:bobohope    | 项目源码 | 文件源码
def setup_bind(cls):
        if config.requirements.independent_connections.enabled:
            from sqlalchemy import pool
            return engines.testing_engine(
                options=dict(poolclass=pool.StaticPool))
        else:
            return config.db
项目:Chorus    作者:DonaldBough    | 项目源码 | 文件源码
def setup_bind(cls):
        if config.requirements.independent_connections.enabled:
            from sqlalchemy import pool
            return engines.testing_engine(
                options=dict(poolclass=pool.StaticPool))
        else:
            return config.db
项目:Hawkeye    作者:tozhengxq    | 项目源码 | 文件源码
def _connect(self, url):
        self.url = url
        # TODO: seems like 0.5.x branch does not work with engine.dispose and staticpool
        #self.engine = create_engine(url, echo=True, poolclass=StaticPool)
        self.engine = create_engine(url, echo=True)
        # silence the logger added by SA, nose adds its own!
        logging.getLogger('sqlalchemy').handlers=[]
        self.meta = MetaData(bind=self.engine)
        if self.level < self.CONNECT:
            return
        #self.session = create_session(bind=self.engine)
        if self.level < self.TXN:
            return
        #self.txn = self.session.begin()
项目:Alfred    作者:jkachhadia    | 项目源码 | 文件源码
def setup_bind(cls):
        if config.requirements.independent_connections.enabled:
            from sqlalchemy import pool
            return engines.testing_engine(
                options=dict(poolclass=pool.StaticPool))
        else:
            return config.db
项目:ngx_status    作者:YoYoAdorkable    | 项目源码 | 文件源码
def setup_bind(cls):
        if config.requirements.independent_connections.enabled:
            from sqlalchemy import pool
            return engines.testing_engine(
                options=dict(poolclass=pool.StaticPool))
        else:
            return config.db
项目:Price-Comparator    作者:Thejas-1    | 项目源码 | 文件源码
def construct_engine(engine, **opts):
    """.. versionadded:: 0.5.4

    Constructs and returns SQLAlchemy engine.

    Currently, there are 2 ways to pass create_engine options to :mod:`migrate.versioning.api` functions:

    :param engine: connection string or a existing engine
    :param engine_dict: python dictionary of options to pass to `create_engine`
    :param engine_arg_*: keyword parameters to pass to `create_engine` (evaluated with :func:`migrate.versioning.util.guess_obj_type`)
    :type engine_dict: dict
    :type engine: string or Engine instance
    :type engine_arg_*: string
    :returns: SQLAlchemy Engine

    .. note::

        keyword parameters override ``engine_dict`` values.

    """
    if isinstance(engine, Engine):
        return engine
    elif not isinstance(engine, basestring):
        raise ValueError("you need to pass either an existing engine or a database uri")

    # get options for create_engine
    if opts.get('engine_dict') and isinstance(opts['engine_dict'], dict):
        kwargs = opts['engine_dict']
    else:
        kwargs = dict()

    # DEPRECATED: handle echo the old way
    echo = asbool(opts.get('echo', False))
    if echo:
        warnings.warn('echo=True parameter is deprecated, pass '
            'engine_arg_echo=True or engine_dict={"echo": True}',
            exceptions.MigrateDeprecationWarning)
        kwargs['echo'] = echo

    # parse keyword arguments
    for key, value in opts.iteritems():
        if key.startswith('engine_arg_'):
            kwargs[key[11:]] = guess_obj_type(value)

    log.debug('Constructing engine')
    # TODO: return create_engine(engine, poolclass=StaticPool, **kwargs)
    # seems like 0.5.x branch does not work with engine.dispose and staticpool
    return create_engine(engine, **kwargs)
项目:Hawkeye    作者:tozhengxq    | 项目源码 | 文件源码
def construct_engine(engine, **opts):
    """.. versionadded:: 0.5.4

    Constructs and returns SQLAlchemy engine.

    Currently, there are 2 ways to pass create_engine options to :mod:`migrate.versioning.api` functions:

    :param engine: connection string or a existing engine
    :param engine_dict: python dictionary of options to pass to `create_engine`
    :param engine_arg_*: keyword parameters to pass to `create_engine` (evaluated with :func:`migrate.versioning.util.guess_obj_type`)
    :type engine_dict: dict
    :type engine: string or Engine instance
    :type engine_arg_*: string
    :returns: SQLAlchemy Engine

    .. note::

        keyword parameters override ``engine_dict`` values.

    """
    if isinstance(engine, Engine):
        return engine
    elif not isinstance(engine, six.string_types):
        raise ValueError("you need to pass either an existing engine or a database uri")

    # get options for create_engine
    if opts.get('engine_dict') and isinstance(opts['engine_dict'], dict):
        kwargs = opts['engine_dict']
    else:
        kwargs = dict()

    # DEPRECATED: handle echo the old way
    echo = asbool(opts.get('echo', False))
    if echo:
        warnings.warn('echo=True parameter is deprecated, pass '
            'engine_arg_echo=True or engine_dict={"echo": True}',
            exceptions.MigrateDeprecationWarning)
        kwargs['echo'] = echo

    # parse keyword arguments
    for key, value in six.iteritems(opts):
        if key.startswith('engine_arg_'):
            kwargs[key[11:]] = guess_obj_type(value)

    log.debug('Constructing engine')
    # TODO: return create_engine(engine, poolclass=StaticPool, **kwargs)
    # seems like 0.5.x branch does not work with engine.dispose and staticpool
    return create_engine(engine, **kwargs)