Python sqlalchemy.pool 模块,QueuePool() 实例源码

我们从Python开源项目中,提取了以下7个代码示例,用于说明如何使用sqlalchemy.pool.QueuePool()

项目:girlfriend    作者:chihongze    | 项目源码 | 文件源码
def setUp(self):
        """Build the test configuration and seed the on-disk SQLite database.

        ``self.config`` describes two databases and one pool policy:

        * ``db_test``   - in-memory SQLite, using the ``test`` pool policy
        * ``db_test2``  - the SQLite file at ``self.test_db_file``
        * ``dbpool_test`` - a ``QueuePool`` policy (size 10, recycle 3600,
          timeout 20)

        The SQLite file is then given a ``user`` table holding a single row.
        """
        self.test_db_file = "/tmp/gftest.db"
        connection_url = "sqlite:///{}".format(self.test_db_file)
        self.config = Config({
            "db_test": {
                "connect_url": "sqlite:///:memory:",
                "pool_policy": "test"
            },
            "db_test2": {
                "connect_url": connection_url
            },
            "dbpool_test": {
                "poolclass": "QueuePool",
                "pool_size": 10,
                "pool_recycle": 3600,
                "pool_timeout": 20
            }
        })
        create_table_sql = """
        create table user (
            id integer primary key,
            name varchar(10) unique,
            grade int not null,
            description text not null
        )
        """
        conn = sqlite3.connect(self.test_db_file)
        try:
            conn.execute(create_table_sql)
            conn.execute((
                "insert into user (id, name, grade, description) values "
                "(1, 'SamChi', 1, 'I am SamChi')"
            ))
            conn.commit()
        finally:
            # Release the file handle even when seeding fails, so the
            # database file is not left open by the fixture.
            conn.close()
项目:girlfriend    作者:chihongze    | 项目源码 | 文件源码
def test_init_all(self):
        """Check that ``EngineManager`` builds working engines from ``self.config``.

        For each round the config is validated and all engines are
        initialised, then:

        * the ``test`` engine must use a ``QueuePool`` with the size,
          recycle and timeout values given by the pool policy in the config;
        * both the ``test`` and ``test2`` engines must execute a query and
          return the expected rows.
        """
        engine_manager = EngineManager()
        # The same manager is validated and initialised 100 times in a row
        # (presumably to show that repeated initialisation is harmless).
        for _ in xrange(0, 100):
            engine_manager.validate_config(self.config)
            engine_manager.init_all(self.config)

            test_engine = engine_manager.engine("test")
            self.assertIsNotNone(test_engine)

            # Pool settings must match the "dbpool_test" section.
            pool = test_engine.engine.pool
            self.assertIsInstance(pool, QueuePool)
            self.assertEqual(pool.size(), 10)
            self.assertEqual(pool._recycle, 3600)
            self.assertEqual(pool._timeout, 20)

            # The engine must be able to run a trivial query.
            connection = test_engine.engine.connect()
            try:
                result = connection.execute("select 1 + 1")
                self.assertEqual(tuple(result)[0][0], 2)
            finally:
                # Hand the connection back to the pool on every iteration.
                connection.close()

            test_engine2 = engine_manager.engine("test2")
            self.assertIsNotNone(test_engine2)

            # The file-backed engine must see the row inserted by setUp.
            connection = test_engine2.engine.connect()
            try:
                result = connection.execute("select * from user")
                self.assertEqual(tuple(result)[0],
                                 (1, "SamChi", 1, "I am SamChi"))
            finally:
                connection.close()
项目:girlfriend    作者:chihongze    | 项目源码 | 文件源码
def setUp(self):
        """Initialise the ``test`` engine and load the ORM fixtures.

        The module-level ``_engine_manager`` is validated against and
        initialised with a config that defines an in-memory SQLite database
        and a ``QueuePool`` policy of size 10.  All tables of
        ``Base.metadata`` are created on that engine, three students and
        two grades are committed through a session, and the config is kept
        on ``self.config``.
        """
        settings = Config({
            "db_test": {
                "connect_url": "sqlite:///:memory:",
                "pool_policy": "test"
            },
            "dbpool_test": {
                "poolclass": "QueuePool",
                "pool_size": 10,
            }
        })
        _engine_manager.validate_config(settings)
        _engine_manager.init_all(settings)

        container = _engine_manager.engine("test")
        Base.metadata.create_all(container.engine)

        # Fixture rows, built in the order in which they are added below.
        students = [
            Student(id=1, name="Sam", fullname="SamChi", grade=1),
            Student(id=2, name="Jack", fullname="JackMa", grade=2),
            Student(id=3, name="Betty", fullname="Betty Smith", grade=1),
        ]
        grades = [
            Grade(id=1, name="Grade One"),
            Grade(id=2, name="Grade Two"),
        ]

        db_session = container.session()
        db_session.add_all(students + grades)
        db_session.commit()

        self.config = settings
项目:triage    作者:dssg    | 项目源码 | 文件源码
def connect(poolclass=QueuePool):
    """Create a SQLAlchemy engine from the settings in ``database.yaml``.

    The file is opened relative to the current working directory and must
    provide the keys ``host``, ``user``, ``db``, ``pass`` and ``port``.

    :param poolclass: connection pool class handed to ``create_engine``
    :return: the engine returned by ``create_engine``
    """
    with open('database.yaml') as fd:
        # safe_load replaces yaml.load: a plain call to yaml.load can build
        # arbitrary Python objects from the file and is rejected without an
        # explicit Loader by current PyYAML releases.
        config = yaml.safe_load(fd)

    # The file is closed at this point; only the parsed mapping is needed.
    dburl = URL(
        # 'postgresql' is the canonical dialect name; the old 'postgres'
        # alias is no longer accepted by SQLAlchemy 1.4 and later.
        'postgresql',
        host=config['host'],
        username=config['user'],
        database=config['db'],
        password=config['pass'],
        port=config['port'],
    )
    return create_engine(dburl, poolclass=poolclass)
项目:mors    作者:openstack    | 项目源码 | 文件源码
def __init__(self, db_conn_string):
        """Create the engine, bound metadata and the two lease tables.

        :param db_conn_string: connection string passed to ``create_engine``
        """
        engine = create_engine(db_conn_string, poolclass=QueuePool)
        self.engine = engine

        metadata = MetaData(bind=engine)
        self.metadata = metadata

        # Each table is stored on the instance under its own table name;
        # autoload=True asks SQLAlchemy to read the columns from the database.
        for table_name in ('tenant_lease', 'instance_lease'):
            setattr(self, table_name,
                    Table(table_name, metadata, autoload=True))
项目:catwalk    作者:dssg    | 项目源码 | 文件源码
def connect(poolclass=QueuePool):
    """Create a SQLAlchemy engine from the profile in ``database.yaml``.

    The file is opened relative to the current working directory and must
    provide the keys ``host``, ``user``, ``db``, ``pass`` and ``port``.

    :param poolclass: connection pool class handed to ``create_engine``
    :return: the engine returned by ``create_engine``
    """
    with open('database.yaml') as f:
        # safe_load replaces yaml.load: a plain call to yaml.load can build
        # arbitrary Python objects from the file and is rejected without an
        # explicit Loader by current PyYAML releases.
        profile = yaml.safe_load(f)

    # The file is closed at this point; only the parsed mapping is needed.
    dbconfig = {
        'host': profile['host'],
        'username': profile['user'],
        'database': profile['db'],
        'password': profile['pass'],
        'port': profile['port'],
    }
    # 'postgresql' is the canonical dialect name; the old 'postgres' alias
    # is no longer accepted by SQLAlchemy 1.4 and later.
    dburl = URL('postgresql', **dbconfig)
    return create_engine(dburl, poolclass=poolclass)
项目:pythoners    作者:mlyangyue    | 项目源码 | 文件源码
def ConnPool(index, suffix=None):
    """
    Return a pooled database connection that answered a ping.

    Pools are created lazily and cached in ``_DB_POOL_`` under the key
    ``str(index)`` followed by ``suffix`` when one is given, so each
    index/suffix combination gets its own pool.

    :param index: key of the database settings in the running config's
        ``dbconfig``; an unknown key falls back to ``'bkwdb'``
    :param suffix: optional pool discriminator, e.g. ``read`` or ``write``
    :return: a connection checked out of the pool.  NOTE(review): when every
        ping attempt fails, the last connection (already invalidated) is
        returned, as before.
    """

    index_suffix = str(index)
    # Append the suffix only when one was supplied.  The previous check was
    # inverted: it appended "None" for a missing suffix and ignored a real
    # one, so "read" and "write" callers ended up sharing a single pool.
    if suffix:
        index_suffix += str(suffix)
    if index_suffix not in _DB_POOL_:
        conf = SETTINGS[RUNING_CONFIG].dbconfig
        if index not in conf:
            index = 'bkwdb'
        dbconf = conf.get(index)
        # pool_size: connections kept in the pool; max_overflow=0: none may
        # be opened beyond that; timeout: seconds to wait for a free one.
        _DB_POOL_[index_suffix] = QueuePool(lambda: createconn(**dbconf), pool_size=2, max_overflow=0, timeout=10)

    pool = _DB_POOL_[index_suffix]
    conn = None
    # Try one more time than the pool size, so a fresh connection can be
    # reached even if every pooled one has gone stale.
    for _ in range(pool.size() + 1):
        conn = pool.connect()
        try:
            conn.ping()
            break
        except pymysql.OperationalError:
            # Drop the dead connection so the pool does not reuse it.
            conn.invalidate()

    return conn