我们从Python开源项目中,提取了以下7个代码示例,用于说明如何使用sqlalchemy.pool.QueuePool()。
def setUp(self): self.test_db_file = "/tmp/gftest.db" connection_url = "sqlite:///{}".format(self.test_db_file) self.config = Config({ "db_test": { "connect_url": "sqlite:///:memory:", "pool_policy": "test" }, "db_test2": { "connect_url": connection_url }, "dbpool_test": { "poolclass": "QueuePool", "pool_size": 10, "pool_recycle": 3600, "pool_timeout": 20 } }) conn = sqlite3.connect(self.test_db_file) create_table_sql = """ create table user ( id integer primary key, name varchar(10) unique, grade int not null, description text not null ) """ conn.execute(create_table_sql) conn.execute(( "insert into user (id, name, grade, description) values " "(1, 'SamChi', 1, 'I am SamChi')" )) conn.commit()
def test_init_all(self): engine_manager = EngineManager() # ???manager???????????????? for i in xrange(0, 100): engine_manager.validate_config(self.config) engine_manager.init_all(self.config) test_engine = engine_manager.engine("test") self.assertIsNotNone(test_engine) # ??????? pool = test_engine.engine.pool self.assertIsInstance(pool, QueuePool) self.assertEquals(pool.size(), 10) self.assertEquals(pool._recycle, 3600) self.assertEquals(pool._timeout, 20) # ?????? connection = test_engine.engine.connect() result = connection.execute("select 1 + 1") self.assertEquals(tuple(result)[0][0], 2) test_engine2 = engine_manager.engine("test2") self.assertIsNotNone(test_engine2) connection = test_engine2.engine.connect() result = connection.execute("select * from user") self.assertEquals(tuple(result)[0], (1, "SamChi", 1, "I am SamChi"))
def setUp(self): config = Config({ "db_test": { "connect_url": "sqlite:///:memory:", "pool_policy": "test" }, "dbpool_test": { "poolclass": "QueuePool", "pool_size": 10, } }) _engine_manager.validate_config(config) _engine_manager.init_all(config) global Base engine_container = _engine_manager.engine("test") Base.metadata.create_all(engine_container.engine) student_sam = Student(id=1, name="Sam", fullname="SamChi", grade=1) student_jack = Student(id=2, name="Jack", fullname="JackMa", grade=2) student_betty = Student( id=3, name="Betty", fullname="Betty Smith", grade=1) grade1 = Grade(id=1, name="Grade One") grade2 = Grade(id=2, name="Grade Two") session = engine_container.session() session.add_all([ student_sam, student_jack, student_betty, grade1, grade2 ]) session.commit() self.config = config
def connect(poolclass=QueuePool): with open('database.yaml') as fd: config = yaml.load(fd) dburl = URL( 'postgres', host=config['host'], username=config['user'], database=config['db'], password=config['pass'], port=config['port'], ) return create_engine(dburl, poolclass=poolclass)
def __init__(self, db_conn_string): self.engine = create_engine(db_conn_string, poolclass=QueuePool) self.metadata = MetaData(bind=self.engine) self.tenant_lease = Table('tenant_lease', self.metadata, autoload=True) self.instance_lease = Table('instance_lease', self.metadata, autoload=True)
def connect(poolclass=QueuePool): with open('database.yaml') as f: profile = yaml.load(f) dbconfig = { 'host': profile['host'], 'username': profile['user'], 'database': profile['db'], 'password': profile['pass'], 'port': profile['port'], } dburl = URL('postgres', **dbconfig) return create_engine(dburl, poolclass=poolclass)
def ConnPool(index, suffix=None): """ ?????? :param index:?????? :param suffix:???? read,write """ index_suffix = str(index) if not suffix: index_suffix += str(suffix) if not index_suffix in _DB_POOL_: conf = SETTINGS[RUNING_CONFIG].dbconfig if not index in conf: index = 'bkwdb' dbconf = conf.get(index) # pool_size ?????,max_overflow ??????????????,timeout ???????? _DB_POOL_[index_suffix] = QueuePool(lambda: createconn(**dbconf), pool_size=2, max_overflow=0, timeout=10) conn = None for i in range(_DB_POOL_[index_suffix].size() + 1): conn = _DB_POOL_[index_suffix].connect() try: conn.ping() break except pymysql.OperationalError: conn.invalidate() return conn