我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 psycopg2.extensions.STATUS_READY。
def test_tpc_commit(self):
    """Run a full two-phase commit on a single connection.

    The connection status is checked after ``tpc_begin``, ``tpc_prepare``
    and ``tpc_commit``; the values of ``count_xacts()`` and
    ``count_test_records()`` are checked after the insert, after the
    prepare and after the commit.
    """

    def check_counts(xacts, records):
        # NOTE(review): count_xacts() presumably reports prepared
        # transactions and count_test_records() committed rows -- both
        # helpers live outside this block; confirm.
        self.assertEqual(xacts, self.count_xacts())
        self.assertEqual(records, self.count_test_records())

    conn = self.connect()
    tpc_xid = conn.xid(1, "gtrid", "bqual")
    self.assertEqual(conn.status, ext.STATUS_READY)

    conn.tpc_begin(tpc_xid)
    self.assertEqual(conn.status, ext.STATUS_BEGIN)

    cursor = conn.cursor()
    cursor.execute("insert into test_tpc values ('test_tpc_commit');")
    check_counts(0, 0)

    conn.tpc_prepare()
    self.assertEqual(conn.status, ext.STATUS_PREPARED)
    check_counts(1, 0)

    conn.tpc_commit()
    self.assertEqual(conn.status, ext.STATUS_READY)
    check_counts(0, 1)
def test_tpc_commit_one_phase(self):
    """Commit a TPC transaction directly, without calling ``tpc_prepare``.

    After ``tpc_commit`` the connection must be back to ``STATUS_READY``,
    ``count_xacts()`` must be 0 and ``count_test_records()`` must be 1.
    """

    def check_counts(xacts, records):
        self.assertEqual(xacts, self.count_xacts())
        self.assertEqual(records, self.count_test_records())

    conn = self.connect()
    tpc_xid = conn.xid(1, "gtrid", "bqual")
    self.assertEqual(conn.status, ext.STATUS_READY)

    conn.tpc_begin(tpc_xid)
    self.assertEqual(conn.status, ext.STATUS_BEGIN)

    cursor = conn.cursor()
    cursor.execute("insert into test_tpc values ('test_tpc_commit_1p');")
    check_counts(0, 0)

    # No tpc_prepare() here: the commit is issued straight after the insert.
    conn.tpc_commit()
    self.assertEqual(conn.status, ext.STATUS_READY)
    check_counts(0, 1)
def test_tpc_commit_recovered(self):
    """Commit a prepared transaction from a different connection.

    The first connection prepares the transaction and is closed; a second
    connection then builds an xid from the same arguments and passes it
    explicitly to ``tpc_commit``.
    """

    def check_counts(xacts, records):
        self.assertEqual(xacts, self.count_xacts())
        self.assertEqual(records, self.count_test_records())

    conn = self.connect()
    tpc_xid = conn.xid(1, "gtrid", "bqual")
    self.assertEqual(conn.status, ext.STATUS_READY)

    conn.tpc_begin(tpc_xid)
    self.assertEqual(conn.status, ext.STATUS_BEGIN)

    cursor = conn.cursor()
    cursor.execute("insert into test_tpc values ('test_tpc_commit_rec');")
    check_counts(0, 0)

    # Prepare, then drop the connection; the counts are expected to show
    # one pending transaction and no record yet.
    conn.tpc_prepare()
    conn.close()
    check_counts(1, 0)

    # Fresh connection: the xid is rebuilt from the same arguments and
    # handed to tpc_commit() explicitly.
    conn = self.connect()
    tpc_xid = conn.xid(1, "gtrid", "bqual")
    conn.tpc_commit(tpc_xid)

    self.assertEqual(conn.status, ext.STATUS_READY)
    check_counts(0, 1)
def test_tpc_rollback(self):
    """Roll back a prepared two-phase transaction on the same connection.

    After ``tpc_rollback`` the connection must be ``STATUS_READY`` and both
    ``count_xacts()`` and ``count_test_records()`` must be 0.
    """

    def check_counts(xacts, records):
        self.assertEqual(xacts, self.count_xacts())
        self.assertEqual(records, self.count_test_records())

    conn = self.connect()
    tpc_xid = conn.xid(1, "gtrid", "bqual")
    self.assertEqual(conn.status, ext.STATUS_READY)

    conn.tpc_begin(tpc_xid)
    self.assertEqual(conn.status, ext.STATUS_BEGIN)

    cursor = conn.cursor()
    cursor.execute("insert into test_tpc values ('test_tpc_rollback');")
    check_counts(0, 0)

    conn.tpc_prepare()
    self.assertEqual(conn.status, ext.STATUS_PREPARED)
    check_counts(1, 0)

    conn.tpc_rollback()
    self.assertEqual(conn.status, ext.STATUS_READY)
    check_counts(0, 0)
def test_tpc_rollback_one_phase(self):
    """Roll back a TPC transaction directly, without calling ``tpc_prepare``.

    After ``tpc_rollback`` the connection must be ``STATUS_READY`` and both
    ``count_xacts()`` and ``count_test_records()`` must be 0.
    """

    def check_counts(xacts, records):
        self.assertEqual(xacts, self.count_xacts())
        self.assertEqual(records, self.count_test_records())

    conn = self.connect()
    tpc_xid = conn.xid(1, "gtrid", "bqual")
    self.assertEqual(conn.status, ext.STATUS_READY)

    conn.tpc_begin(tpc_xid)
    self.assertEqual(conn.status, ext.STATUS_BEGIN)

    cursor = conn.cursor()
    cursor.execute("insert into test_tpc values ('test_tpc_rollback_1p');")
    check_counts(0, 0)

    # No tpc_prepare() here: the rollback is issued straight after the insert.
    conn.tpc_rollback()
    self.assertEqual(conn.status, ext.STATUS_READY)
    check_counts(0, 0)
def test_default_no_autocommit(self):
    """Check the default (non-autocommit) state transitions of ``self.conn``.

    Expected sequence: READY/IDLE initially, BEGIN/INTRANS after executing
    a query, and READY/IDLE again after ``rollback()``.
    """

    def check_state(status, txn_status):
        # Verifies both views of the connection state: the ``status``
        # attribute and the value of get_transaction_status().
        self.assertEqual(self.conn.status, status)
        self.assertEqual(self.conn.get_transaction_status(), txn_status)

    self.assertFalse(self.conn.autocommit)
    check_state(ext.STATUS_READY, ext.TRANSACTION_STATUS_IDLE)

    cursor = self.conn.cursor()
    cursor.execute("select 1;")
    check_state(ext.STATUS_BEGIN, ext.TRANSACTION_STATUS_INTRANS)

    self.conn.rollback()
    check_state(ext.STATUS_READY, ext.TRANSACTION_STATUS_IDLE)
def test_set_autocommit(self):
    """Toggle ``autocommit`` on ``self.conn`` and check the resulting states.

    With autocommit on, executing a query is expected to leave the
    connection READY/IDLE; after switching it off again, the next query is
    expected to move it to BEGIN/INTRANS.
    """

    def check_state(status, txn_status):
        self.assertEqual(self.conn.status, status)
        self.assertEqual(self.conn.get_transaction_status(), txn_status)

    # Autocommit enabled.
    self.conn.autocommit = True
    self.assertTrue(self.conn.autocommit)
    check_state(ext.STATUS_READY, ext.TRANSACTION_STATUS_IDLE)

    cursor = self.conn.cursor()
    cursor.execute("select 1;")
    check_state(ext.STATUS_READY, ext.TRANSACTION_STATUS_IDLE)

    # Autocommit disabled again; the same cursor is reused below.
    self.conn.autocommit = False
    self.assertFalse(self.conn.autocommit)
    check_state(ext.STATUS_READY, ext.TRANSACTION_STATUS_IDLE)

    cursor.execute("select 1;")
    check_state(ext.STATUS_BEGIN, ext.TRANSACTION_STATUS_INTRANS)
def test_subclass_commit(self):
    """Expect a subclass's ``commit()`` override to run when ``with`` exits.

    A connection subclass records each ``commit()`` call. After a ``with``
    block that finishes normally, the test expects at least one recorded
    call, a ``STATUS_READY`` connection, and the inserted row to be
    readable through ``self.conn``.
    """
    commit_calls = []

    class RecordingConn(ext.connection):
        def commit(self):
            # Record the call, then defer to the real implementation.
            commit_calls.append(None)
            super(RecordingConn, self).commit()

    with self.connect(connection_factory=RecordingConn) as conn:
        writer = conn.cursor()
        writer.execute("insert into test_with values (10)")

    self.assertEqual(conn.status, ext.STATUS_READY)
    self.assertTrue(commit_calls)

    # Read the row back through the fixture connection, not ``conn``.
    reader = self.conn.cursor()
    reader.execute("select * from test_with")
    self.assertEqual(reader.fetchall(), [(10,)])
def test_subclass_rollback(self):
    """Expect a subclass's ``rollback()`` override to run on a failing ``with``.

    A connection subclass records each ``rollback()`` call. A
    ``ZeroDivisionError`` is raised inside the ``with`` block; afterwards
    the test expects at least one recorded call, a ``STATUS_READY``
    connection, and no rows in ``test_with``.
    """
    rollbacks = []

    class MyConn(ext.connection):
        def rollback(self):
            # Record the call, then defer to the real implementation.
            rollbacks.append(None)
            super(MyConn, self).rollback()

    try:
        with self.connect(connection_factory=MyConn) as conn:
            curs = conn.cursor()
            curs.execute("insert into test_with values (11)")
            1 / 0  # deliberately raise inside the with block
    except ZeroDivisionError:
        pass
    else:
        # fail() rather than assertTrue(<message>): a non-empty string is
        # always truthy, so an assertTrue here could never fail.
        self.fail("exception not raised")

    self.assertEqual(conn.status, ext.STATUS_READY)
    self.assertTrue(rollbacks)

    curs = conn.cursor()
    curs.execute("select * from test_with")
    self.assertEqual(curs.fetchall(), [])
def test_default_no_autocommit(self):
    """Check the default (non-autocommit) state transitions of ``self.conn``.

    Expected sequence: READY/IDLE initially, BEGIN/INTRANS after executing
    a query, and READY/IDLE again after ``rollback()``.
    """
    # assertFalse instead of the ``assert_`` alias, which is deprecated and
    # no longer exists in Python 3.12+.
    self.assertFalse(self.conn.autocommit)
    self.assertEqual(self.conn.status, ext.STATUS_READY)
    self.assertEqual(
        self.conn.get_transaction_status(), ext.TRANSACTION_STATUS_IDLE
    )

    cur = self.conn.cursor()
    cur.execute('select 1;')
    self.assertEqual(self.conn.status, ext.STATUS_BEGIN)
    self.assertEqual(
        self.conn.get_transaction_status(), ext.TRANSACTION_STATUS_INTRANS
    )

    self.conn.rollback()
    self.assertEqual(self.conn.status, ext.STATUS_READY)
    self.assertEqual(
        self.conn.get_transaction_status(), ext.TRANSACTION_STATUS_IDLE
    )
def test_set_autocommit(self):
    """Toggle ``autocommit`` on ``self.conn`` and check the resulting states.

    With autocommit on, executing a query is expected to leave the
    connection READY/IDLE; after switching it off again, the next query is
    expected to move it to BEGIN/INTRANS.
    """
    # assertTrue/assertFalse replace the ``assert_`` alias, which is
    # deprecated and no longer exists in Python 3.12+.
    self.conn.autocommit = True
    self.assertTrue(self.conn.autocommit)
    self.assertEqual(self.conn.status, ext.STATUS_READY)
    self.assertEqual(
        self.conn.get_transaction_status(), ext.TRANSACTION_STATUS_IDLE
    )

    cur = self.conn.cursor()
    cur.execute('select 1;')
    self.assertEqual(self.conn.status, ext.STATUS_READY)
    self.assertEqual(
        self.conn.get_transaction_status(), ext.TRANSACTION_STATUS_IDLE
    )

    self.conn.autocommit = False
    self.assertFalse(self.conn.autocommit)
    self.assertEqual(self.conn.status, ext.STATUS_READY)
    self.assertEqual(
        self.conn.get_transaction_status(), ext.TRANSACTION_STATUS_IDLE
    )

    # Same cursor as before, now with autocommit off.
    cur.execute('select 1;')
    self.assertEqual(self.conn.status, ext.STATUS_BEGIN)
    self.assertEqual(
        self.conn.get_transaction_status(), ext.TRANSACTION_STATUS_INTRANS
    )
def test_subclass_commit(self):
    """Expect a subclass's ``commit()`` override to run when ``with`` exits.

    A connection subclass records each ``commit()`` call. After a ``with``
    block that finishes normally, the test expects at least one recorded
    call, a ``STATUS_READY`` connection, and the inserted row to be
    readable through ``self.conn``.
    """
    commits = []

    class MyConn(ext.connection):
        def commit(self):
            # Record the call, then defer to the real implementation.
            commits.append(None)
            super(MyConn, self).commit()

    with self.connect(connection_factory=MyConn) as conn:
        curs = conn.cursor()
        curs.execute("insert into test_with values (10)")

    self.assertEqual(conn.status, ext.STATUS_READY)
    # assertTrue instead of the ``assert_`` alias, which is deprecated and
    # no longer exists in Python 3.12+.
    self.assertTrue(commits)

    # Read the row back through the fixture connection, not ``conn``.
    curs = self.conn.cursor()
    curs.execute("select * from test_with")
    self.assertEqual(curs.fetchall(), [(10,)])
def test_subclass_rollback(self):
    """Expect a subclass's ``rollback()`` override to run on a failing ``with``.

    A connection subclass records each ``rollback()`` call. A
    ``ZeroDivisionError`` is raised inside the ``with`` block; afterwards
    the test expects at least one recorded call, a ``STATUS_READY``
    connection, and no rows in ``test_with``.
    """
    rollbacks = []

    class MyConn(ext.connection):
        def rollback(self):
            # Record the call, then defer to the real implementation.
            rollbacks.append(None)
            super(MyConn, self).rollback()

    try:
        with self.connect(connection_factory=MyConn) as conn:
            curs = conn.cursor()
            curs.execute("insert into test_with values (11)")
            1 / 0  # deliberately raise inside the with block
    except ZeroDivisionError:
        pass
    else:
        # fail() rather than an assertion on the message: a non-empty
        # string is always truthy, so asserting on it could never fail.
        self.fail("exception not raised")

    self.assertEqual(conn.status, ext.STATUS_READY)
    # assertTrue instead of the ``assert_`` alias, which is deprecated and
    # no longer exists in Python 3.12+.
    self.assertTrue(rollbacks)

    curs = conn.cursor()
    curs.execute("select * from test_with")
    self.assertEqual(curs.fetchall(), [])
def test_with_error(self):
    """Check connection and cursor state after an error inside nested ``with``.

    A ``ZeroDivisionError`` is raised inside ``with self.conn`` /
    ``with conn.cursor()``. Afterwards the test expects the connection to
    be ``STATUS_READY`` and still open, the cursor to be closed, and no
    rows in ``test_with``.
    """
    try:
        with self.conn as conn:
            with conn.cursor() as curs:
                curs.execute("insert into test_with values (5)")
                1 / 0  # deliberately raise inside both with blocks
    except ZeroDivisionError:
        pass

    self.assertEqual(self.conn.status, ext.STATUS_READY)
    # assertFalse/assertTrue replace the ``assert_`` alias, which is
    # deprecated and no longer exists in Python 3.12+.
    self.assertFalse(self.conn.closed)
    self.assertTrue(curs.closed)

    # The cursor from the with block is closed, so open a new one to read.
    curs = self.conn.cursor()
    curs.execute("select * from test_with")
    self.assertEqual(curs.fetchall(), [])
def test_subclass_rollback(self):
    """Expect a subclass's ``rollback()`` override to run on a failing ``with``.

    A connection subclass records each ``rollback()`` call. A
    ``ZeroDivisionError`` is raised inside the ``with`` block; afterwards
    the test expects at least one recorded call, a ``STATUS_READY``
    connection, and no rows in ``test_with``.
    """
    rollbacks = []

    class MyConn(ext.connection):
        def rollback(self):
            # Record the call, then defer to the real implementation.
            rollbacks.append(None)
            super(MyConn, self).rollback()

    try:
        with self.connect(connection_factory=MyConn) as conn:
            curs = conn.cursor()
            curs.execute("insert into test_with values (11)")
            1 / 0  # deliberately raise inside the with block
    except ZeroDivisionError:
        pass
    else:
        # fail() rather than an assertion on the message: a non-empty
        # string is always truthy, so asserting on it could never fail.
        self.fail("exception not raised")

    self.assertEqual(conn.status, ext.STATUS_READY)
    # assertTrue instead of the ``assert_`` alias, which is deprecated and
    # no longer exists in Python 3.12+.
    self.assertTrue(rollbacks)

    curs = conn.cursor()
    curs.execute("select * from test_with")
    self.assertEqual(curs.fetchall(), [])