我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 psycopg2.extensions.cursor()。
def test_cleanup_on_badconn_close(self):
    """Check that close() still works after the backend has been terminated.

    Regression test for ticket #148.  The session asks the server to
    terminate its own backend, tolerates the resulting error, and then
    verifies the value of ``conn.closed`` before and after ``close()``.
    """
    conn = self.conn
    cur = conn.cursor()
    try:
        cur.execute("select pg_terminate_backend(pg_backend_pid())")
    except psycopg2.OperationalError as e:
        # Only the "admin shutdown" error is the expected outcome here.
        if e.pgcode != psycopg2.errorcodes.ADMIN_SHUTDOWN:
            raise
    except psycopg2.DatabaseError as e:
        # curiously when disconnected in green mode we get a DatabaseError
        # without pgcode.
        if e.pgcode is not None:
            raise

    # The connection reports 2 before the explicit close and 1 after it.
    self.assertEqual(conn.closed, 2)
    conn.close()
    self.assertEqual(conn.closed, 1)
def test_notices_consistent_order(self):
    """Notices from two multi-statement executes are stored in order.

    Four temp tables are created across two ``execute()`` calls; the test
    expects exactly four notices, the i-th mentioning the i-th table.
    """
    conn = self.conn
    cursor = conn.cursor()
    # NOTE(review): presumably newer servers need a lower message threshold
    # to emit these notices -- confirm against the PostgreSQL release notes.
    if self.conn.server_version >= 90300:
        cursor.execute("set client_min_messages=debug1")

    batches = (
        """ create temp table table1 (id serial); create temp table table2 (id serial); """,
        """ create temp table table3 (id serial); create temp table table4 (id serial); """,
    )
    for batch in batches:
        cursor.execute(batch)

    self.assertEqual(4, len(conn.notices))
    expected_tables = ('table1', 'table2', 'table3', 'table4')
    for position, table in enumerate(expected_tables):
        self.assertTrue(table in conn.notices[position])
def test_concurrent_execution(self):
    """Two threads each running a 4-second server sleep finish in under 7s.

    If the two queries ran one after the other the total would be about
    8 seconds, so the 7-second bound shows the executions overlapped.
    """
    def sleep_on_server():
        # Each worker uses its own connection and cursor.
        connection = self.connect()
        cursor = connection.cursor()
        cursor.execute("select pg_sleep(4)")
        cursor.close()
        connection.close()

    workers = [threading.Thread(target=sleep_on_server) for _ in range(2)]
    started = time.time()
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
    self.assertTrue(time.time() - started < 7,
        "something broken in concurrency")
def test_set_isolation_level_default(self):
    """ISOLATION_LEVEL_DEFAULT falls back to the session's default level.

    The session default is set to 'read committed'; the connection is
    then switched to serializable and back to the default, checking the
    level the server reports each time.
    """
    connection = self.connect()
    cursor = connection.cursor()

    def reported_level():
        # Ask the server which isolation level the transaction uses.
        cursor.execute("show transaction_isolation")
        return cursor.fetchone()[0]

    # The session default is changed while in autocommit mode.
    connection.autocommit = True
    cursor.execute("set default_transaction_isolation to 'read committed'")
    connection.autocommit = False

    connection.set_isolation_level(ext.ISOLATION_LEVEL_SERIALIZABLE)
    self.assertEqual(connection.isolation_level,
        ext.ISOLATION_LEVEL_SERIALIZABLE)
    self.assertEqual(reported_level(), "serializable")

    connection.rollback()
    connection.set_isolation_level(ext.ISOLATION_LEVEL_DEFAULT)
    self.assertEqual(reported_level(), "read committed")
def test_isolation_level_read_committed(self):
    """Check row visibility between two connections, one READ COMMITTED.

    The statement order is significant: each count is taken at a specific
    point relative to the other connection's inserts and commits.
    """
    first_conn = self.connect()
    second_conn = self.connect()
    second_conn.set_isolation_level(ext.ISOLATION_LEVEL_READ_COMMITTED)

    def row_count(cursor):
        # Number of rows in isolevel as seen through the given cursor.
        cursor.execute("select count(*) from isolevel;")
        return cursor.fetchone()[0]

    first_cur = first_conn.cursor()
    self.assertEqual(0, row_count(first_cur))
    first_conn.commit()

    second_cur = second_conn.cursor()
    second_cur.execute("insert into isolevel values (10);")
    first_cur.execute("insert into isolevel values (20);")

    # The second connection sees only its own uncommitted row.
    self.assertEqual(1, row_count(second_cur))

    first_conn.commit()
    # After the first connection commits, the second sees both rows...
    self.assertEqual(2, row_count(second_cur))
    # ...while the first still cannot see the second's uncommitted row.
    self.assertEqual(1, row_count(first_cur))

    second_conn.commit()
    self.assertEqual(2, row_count(first_cur))
def test_tpc_commit(self):
    """Walk a two-phase transaction through begin, prepare and commit.

    After every step the connection status is checked together with the
    values returned by ``count_xacts()`` and ``count_test_records()``
    (helpers defined elsewhere; presumably prepared transactions and
    rows in ``test_tpc`` -- confirm against their definitions).
    """
    def expect_counts(xacts, records):
        self.assertEqual(xacts, self.count_xacts())
        self.assertEqual(records, self.count_test_records())

    conn = self.connect()
    xid = conn.xid(1, "gtrid", "bqual")
    self.assertEqual(conn.status, ext.STATUS_READY)

    conn.tpc_begin(xid)
    self.assertEqual(conn.status, ext.STATUS_BEGIN)

    cursor = conn.cursor()
    cursor.execute("insert into test_tpc values ('test_tpc_commit');")
    expect_counts(0, 0)

    conn.tpc_prepare()
    self.assertEqual(conn.status, ext.STATUS_PREPARED)
    expect_counts(1, 0)

    conn.tpc_commit()
    self.assertEqual(conn.status, ext.STATUS_READY)
    expect_counts(0, 1)
def test_tpc_commit_one_phase(self):
    """Commit a TPC transaction directly, without calling tpc_prepare().

    ``count_xacts()`` is expected to stay at 0 throughout, while
    ``count_test_records()`` goes from 0 to 1 once the commit completes.
    """
    def expect_counts(xacts, records):
        self.assertEqual(xacts, self.count_xacts())
        self.assertEqual(records, self.count_test_records())

    conn = self.connect()
    xid = conn.xid(1, "gtrid", "bqual")
    self.assertEqual(conn.status, ext.STATUS_READY)

    conn.tpc_begin(xid)
    self.assertEqual(conn.status, ext.STATUS_BEGIN)

    cursor = conn.cursor()
    cursor.execute("insert into test_tpc values ('test_tpc_commit_1p');")
    expect_counts(0, 0)

    conn.tpc_commit()
    self.assertEqual(conn.status, ext.STATUS_READY)
    expect_counts(0, 1)
def test_tpc_rollback(self):
    """Walk a two-phase transaction through begin, prepare and rollback.

    ``count_xacts()`` is expected to be 1 only between prepare and
    rollback; ``count_test_records()`` is expected to stay at 0.
    """
    def expect_counts(xacts, records):
        self.assertEqual(xacts, self.count_xacts())
        self.assertEqual(records, self.count_test_records())

    conn = self.connect()
    xid = conn.xid(1, "gtrid", "bqual")
    self.assertEqual(conn.status, ext.STATUS_READY)

    conn.tpc_begin(xid)
    self.assertEqual(conn.status, ext.STATUS_BEGIN)

    cursor = conn.cursor()
    cursor.execute("insert into test_tpc values ('test_tpc_rollback');")
    expect_counts(0, 0)

    conn.tpc_prepare()
    self.assertEqual(conn.status, ext.STATUS_PREPARED)
    expect_counts(1, 0)

    conn.tpc_rollback()
    self.assertEqual(conn.status, ext.STATUS_READY)
    expect_counts(0, 0)
def test_tpc_rollback_one_phase(self):
    """Roll back a TPC transaction directly, without calling tpc_prepare().

    Both ``count_xacts()`` and ``count_test_records()`` are expected to
    stay at 0 before and after the rollback.
    """
    def expect_counts(xacts, records):
        self.assertEqual(xacts, self.count_xacts())
        self.assertEqual(records, self.count_test_records())

    conn = self.connect()
    xid = conn.xid(1, "gtrid", "bqual")
    self.assertEqual(conn.status, ext.STATUS_READY)

    conn.tpc_begin(xid)
    self.assertEqual(conn.status, ext.STATUS_BEGIN)

    cursor = conn.cursor()
    cursor.execute("insert into test_tpc values ('test_tpc_rollback_1p');")
    expect_counts(0, 0)

    conn.tpc_rollback()
    self.assertEqual(conn.status, ext.STATUS_READY)
    expect_counts(0, 0)
def test_tpc_rollback_recovered(self):
    """Roll back, from a new connection, a transaction prepared by another.

    The first connection prepares the transaction and is closed; a second
    connection rebuilds the same xid and passes it to ``tpc_rollback()``.
    """
    def expect_counts(xacts, records):
        self.assertEqual(xacts, self.count_xacts())
        self.assertEqual(records, self.count_test_records())

    conn = self.connect()
    xid = conn.xid(1, "gtrid", "bqual")
    self.assertEqual(conn.status, ext.STATUS_READY)

    conn.tpc_begin(xid)
    self.assertEqual(conn.status, ext.STATUS_BEGIN)

    cursor = conn.cursor()
    cursor.execute("insert into test_tpc values ('test_tpc_commit_rec');")
    expect_counts(0, 0)

    conn.tpc_prepare()
    conn.close()
    # The prepared transaction is still counted after the connection closed.
    expect_counts(1, 0)

    conn = self.connect()
    xid = conn.xid(1, "gtrid", "bqual")
    conn.tpc_rollback(xid)

    self.assertEqual(conn.status, ext.STATUS_READY)
    expect_counts(0, 0)
def test_set_read_only(self):
    """set_session(readonly=...) updates the attribute and the server state.

    The 'on' setting is read twice, in two successive transactions, to
    check that it persists across a rollback.
    """
    cursor = self.conn.cursor()

    def read_only_setting():
        cursor.execute("SHOW transaction_read_only;")
        return cursor.fetchone()[0]

    # Checked before set_session() has been called.
    self.assertTrue(self.conn.readonly is None)

    self.conn.set_session(readonly=True)
    self.assertTrue(self.conn.readonly is True)
    for _ in range(2):
        self.assertEqual(read_only_setting(), 'on')
        self.conn.rollback()

    self.conn.set_session(readonly=False)
    self.assertTrue(self.conn.readonly is False)
    self.assertEqual(read_only_setting(), 'off')
    self.conn.rollback()
def test_set_deferrable(self):
    """set_session(deferrable=...) updates the attribute and the server state.

    Also checks that turning deferrable off leaves the previously set
    read-only mode on.
    """
    cursor = self.conn.cursor()

    def setting(query):
        # Run a SHOW query and return its single value.
        cursor.execute(query)
        return cursor.fetchone()[0]

    # Checked before set_session() has been called.
    self.assertTrue(self.conn.deferrable is None)

    self.conn.set_session(readonly=True, deferrable=True)
    self.assertTrue(self.conn.deferrable is True)
    self.assertEqual(setting("SHOW transaction_read_only;"), 'on')
    self.assertEqual(setting("SHOW transaction_deferrable;"), 'on')
    self.conn.rollback()

    # The setting is still reported in the next transaction.
    self.assertEqual(setting("SHOW transaction_deferrable;"), 'on')
    self.conn.rollback()

    self.conn.set_session(deferrable=False)
    self.assertTrue(self.conn.deferrable is False)
    self.assertEqual(setting("SHOW transaction_read_only;"), 'on')
    self.assertEqual(setting("SHOW transaction_deferrable;"), 'off')
    self.conn.rollback()
def test_mixing_session_attribs(self):
    """Check readonly reporting when autocommit is toggled after setting it.

    With autocommit on, both the transaction and the session default are
    expected to report read-only.  After autocommit is switched off, the
    transaction still reports 'on' but the session default reports 'off'.
    """
    cursor = self.conn.cursor()

    def setting(query):
        # Run a SHOW query and return its single value.
        cursor.execute(query)
        return cursor.fetchone()[0]

    self.conn.autocommit = True
    self.conn.readonly = True
    self.assertEqual(setting("SHOW transaction_read_only;"), 'on')
    self.assertEqual(setting("SHOW default_transaction_read_only;"), 'on')

    self.conn.autocommit = False
    self.assertEqual(setting("SHOW transaction_read_only;"), 'on')
    self.assertEqual(setting("SHOW default_transaction_read_only;"), 'off')
def test_default_no_autocommit(self):
    """With autocommit off, a query opens a transaction until rollback."""
    def expect_state(status, transaction_status):
        # Check the connection status and the transaction status together.
        self.assertEqual(self.conn.status, status)
        self.assertEqual(self.conn.get_transaction_status(),
            transaction_status)

    self.assertTrue(not self.conn.autocommit)
    expect_state(ext.STATUS_READY, ext.TRANSACTION_STATUS_IDLE)

    cursor = self.conn.cursor()
    cursor.execute('select 1;')
    expect_state(ext.STATUS_BEGIN, ext.TRANSACTION_STATUS_INTRANS)

    self.conn.rollback()
    expect_state(ext.STATUS_READY, ext.TRANSACTION_STATUS_IDLE)
def test_set_autocommit(self):
    """Check status reporting as autocommit is turned on and off again.

    With autocommit on, a query leaves the connection READY/IDLE; with it
    off, the same query leaves the connection BEGIN/INTRANS.
    """
    def expect_state(status, transaction_status):
        # Check the connection status and the transaction status together.
        self.assertEqual(self.conn.status, status)
        self.assertEqual(self.conn.get_transaction_status(),
            transaction_status)

    self.conn.autocommit = True
    self.assertTrue(self.conn.autocommit)
    expect_state(ext.STATUS_READY, ext.TRANSACTION_STATUS_IDLE)

    cursor = self.conn.cursor()
    cursor.execute('select 1;')
    expect_state(ext.STATUS_READY, ext.TRANSACTION_STATUS_IDLE)

    self.conn.autocommit = False
    self.assertTrue(not self.conn.autocommit)
    expect_state(ext.STATUS_READY, ext.TRANSACTION_STATUS_IDLE)

    cursor.execute('select 1;')
    expect_state(ext.STATUS_BEGIN, ext.TRANSACTION_STATUS_INTRANS)
def test_subclass_commit(self):
    """Leaving a ``with`` block cleanly calls the subclass's commit().

    The overridden method records each call; the inserted row is then
    read back through the test's own connection.
    """
    commit_calls = []

    class MyConn(ext.connection):
        def commit(self):
            commit_calls.append(None)
            super(MyConn, self).commit()

    with self.connect(connection_factory=MyConn) as conn:
        writer = conn.cursor()
        writer.execute("insert into test_with values (10)")

    self.assertEqual(conn.status, ext.STATUS_READY)
    self.assertTrue(commit_calls)

    reader = self.conn.cursor()
    reader.execute("select * from test_with")
    self.assertEqual(reader.fetchall(), [(10,)])
def test_subclass_rollback(self):
    """Leaving a ``with`` block by exception calls the subclass's rollback().

    The overridden method records each call; afterwards the table is
    expected to contain no rows.
    """
    rollbacks = []

    class MyConn(ext.connection):
        def rollback(self):
            rollbacks.append(None)
            super(MyConn, self).rollback()

    try:
        with self.connect(connection_factory=MyConn) as conn:
            curs = conn.cursor()
            curs.execute("insert into test_with values (11)")
            1 / 0
    except ZeroDivisionError:
        pass
    else:
        # The original asserted a non-empty string, which is always true
        # and could never fail; fail explicitly if nothing was raised.
        self.fail("exception not raised")

    self.assertEqual(conn.status, ext.STATUS_READY)
    self.assertTrue(rollbacks)

    curs = conn.cursor()
    curs.execute("select * from test_with")
    self.assertEqual(curs.fetchall(), [])
def test_with_error(self):
    """An exception inside nested ``with`` blocks rolls back the insert.

    Afterwards the connection must be READY and still open, the inner
    cursor closed, and the table empty.
    """
    try:
        with self.conn as conn:
            with conn.cursor() as inner_cursor:
                inner_cursor.execute("insert into test_with values (5)")
                1 / 0
    except ZeroDivisionError:
        pass

    self.assertEqual(self.conn.status, ext.STATUS_READY)
    self.assertTrue(not self.conn.closed)
    self.assertTrue(inner_cursor.closed)

    checker = self.conn.cursor()
    checker.execute("select * from test_with")
    self.assertEqual(checker.fetchall(), [])
def as_string(self, context):
    """Return the quoted form of the wrapped value for *context*.

    :param context: a connection, or a cursor whose ``connection``
        attribute is used.
    :return: the result of the adapter's ``getquoted()``; on Python 3 a
        ``bytes`` result is decoded using the connection's encoding.
    :raises TypeError: if *context* is neither a connection nor a cursor.
    """
    if not isinstance(context, (ext.connection, ext.cursor)):
        raise TypeError("context must be a connection or a cursor")

    # A connection is used as is; a cursor contributes its connection.
    if isinstance(context, ext.connection):
        connection = context
    else:
        connection = context.connection

    adapted = ext.adapt(self._wrapped)
    # Only some adapters expose prepare(); call it when present.
    if hasattr(adapted, 'prepare'):
        adapted.prepare(connection)

    quoted = adapted.getquoted()
    needs_decoding = sys.version_info[0] >= 3 and isinstance(quoted, bytes)
    if not needs_decoding:
        return quoted
    return quoted.decode(ext.encodings[connection.encoding])
def test_notices_consistent_order(self):
    """Notices from two multi-statement executes are stored in order.

    Four temp tables are created across two ``execute()`` calls; the test
    expects exactly four notices, the i-th mentioning the i-th table.
    """
    conn = self.conn
    cur = conn.cursor()
    # NOTE(review): presumably newer servers need a lower message threshold
    # to emit these notices -- confirm against the PostgreSQL release notes.
    if self.conn.server_version >= 90300:
        cur.execute("set client_min_messages=debug1")
    cur.execute(""" create temp table table1 (id serial); create temp table table2 (id serial); """)
    cur.execute(""" create temp table table3 (id serial); create temp table table4 (id serial); """)
    self.assertEqual(4, len(conn.notices))
    # assertTrue replaces the deprecated assert_ alias (removed in 3.12).
    self.assertTrue('table1' in conn.notices[0])
    self.assertTrue('table2' in conn.notices[1])
    self.assertTrue('table3' in conn.notices[2])
    self.assertTrue('table4' in conn.notices[3])
def test_concurrent_execution(self):
    """Two threads each running a 4-second server sleep finish in under 7s.

    If the two queries ran one after the other the total would be about
    8 seconds, so the 7-second bound shows the executions overlapped.
    """
    def slave():
        # Each worker uses its own connection and cursor.
        cnn = self.connect()
        cur = cnn.cursor()
        cur.execute("select pg_sleep(4)")
        cur.close()
        cnn.close()

    t1 = threading.Thread(target=slave)
    t2 = threading.Thread(target=slave)
    t0 = time.time()
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    # assertTrue replaces the deprecated assert_ alias (removed in 3.12).
    self.assertTrue(time.time() - t0 < 7,
        "something broken in concurrency")
def test_cursor_factory(self):
    """conn.cursor_factory selects the cursor class used by conn.cursor().

    With no factory, a row cannot be indexed by column name (TypeError);
    with DictCursor it can; resetting to None restores the first behaviour.
    """
    def column_a(row):
        return row['a']

    def fetch_row():
        # Use a fresh cursor so the current factory takes effect.
        cursor = self.conn.cursor()
        cursor.execute("select 1 as a")
        return cursor.fetchone()

    self.assertEqual(self.conn.cursor_factory, None)
    self.assertRaises(TypeError, column_a, fetch_row())

    self.conn.cursor_factory = psycopg2.extras.DictCursor
    self.assertEqual(self.conn.cursor_factory, psycopg2.extras.DictCursor)
    self.assertEqual(fetch_row()['a'], 1)

    self.conn.cursor_factory = None
    self.assertEqual(self.conn.cursor_factory, None)
    self.assertRaises(TypeError, column_a, fetch_row())
def clear_test_xacts(self):
    """Rollback all the prepared transaction in the testing db.

    Prepared transactions are listed from ``pg_prepared_xacts`` for the
    module-level ``dbname`` and each one is rolled back.  If the listing
    query raises ``ProgrammingError`` the method rolls back and returns
    without doing anything else.  The connection is closed on every path,
    including when a ``rollback prepared`` statement fails.
    """
    cnn = self.connect()
    try:
        # Level 0 is autocommit in psycopg2.
        cnn.set_isolation_level(0)
        cur = cnn.cursor()
        try:
            cur.execute(
                "select gid from pg_prepared_xacts where database = %s",
                (dbname,))
        except psycopg2.ProgrammingError:
            # NOTE(review): presumably the view is unavailable on this
            # server -- confirm; nothing to clean up in that case.
            cnn.rollback()
            return

        # Read all gids first: the cursor is reused for the rollbacks.
        gids = [r[0] for r in cur]
        for gid in gids:
            cur.execute("rollback prepared %s;", (gid,))
    finally:
        cnn.close()
def test_tpc_commit_recovered(self):
    """Commit, from a new connection, a transaction prepared by another.

    The first connection prepares the transaction and is closed; a second
    connection rebuilds the same xid and passes it to ``tpc_commit()``.
    """
    def expect_counts(xacts, records):
        self.assertEqual(xacts, self.count_xacts())
        self.assertEqual(records, self.count_test_records())

    conn = self.connect()
    xid = conn.xid(1, "gtrid", "bqual")
    self.assertEqual(conn.status, ext.STATUS_READY)

    conn.tpc_begin(xid)
    self.assertEqual(conn.status, ext.STATUS_BEGIN)

    cursor = conn.cursor()
    cursor.execute("insert into test_tpc values ('test_tpc_commit_rec');")
    expect_counts(0, 0)

    conn.tpc_prepare()
    conn.close()
    # The prepared transaction is still counted after the connection closed.
    expect_counts(1, 0)

    conn = self.connect()
    xid = conn.xid(1, "gtrid", "bqual")
    conn.tpc_commit(xid)

    self.assertEqual(conn.status, ext.STATUS_READY)
    expect_counts(0, 1)
def test_set_read_only(self):
    """set_session(readonly=...) updates the attribute and the server state.

    The 'on' setting is read twice, in two successive transactions, to
    check that it persists across a rollback.
    """
    # assertTrue replaces the deprecated assert_ alias (removed in 3.12).
    self.assertTrue(self.conn.readonly is None)

    cur = self.conn.cursor()
    self.conn.set_session(readonly=True)
    self.assertTrue(self.conn.readonly is True)
    cur.execute("SHOW transaction_read_only;")
    self.assertEqual(cur.fetchone()[0], 'on')
    self.conn.rollback()
    cur.execute("SHOW transaction_read_only;")
    self.assertEqual(cur.fetchone()[0], 'on')
    self.conn.rollback()

    self.conn.set_session(readonly=False)
    self.assertTrue(self.conn.readonly is False)
    cur.execute("SHOW transaction_read_only;")
    self.assertEqual(cur.fetchone()[0], 'off')
    self.conn.rollback()
def test_set_deferrable(self):
    """set_session(deferrable=...) updates the attribute and the server state.

    Also checks that turning deferrable off leaves the previously set
    read-only mode on.
    """
    # assertTrue replaces the deprecated assert_ alias (removed in 3.12).
    self.assertTrue(self.conn.deferrable is None)

    cur = self.conn.cursor()
    self.conn.set_session(readonly=True, deferrable=True)
    self.assertTrue(self.conn.deferrable is True)
    cur.execute("SHOW transaction_read_only;")
    self.assertEqual(cur.fetchone()[0], 'on')
    cur.execute("SHOW transaction_deferrable;")
    self.assertEqual(cur.fetchone()[0], 'on')
    self.conn.rollback()
    # The setting is still reported in the next transaction.
    cur.execute("SHOW transaction_deferrable;")
    self.assertEqual(cur.fetchone()[0], 'on')
    self.conn.rollback()

    self.conn.set_session(deferrable=False)
    self.assertTrue(self.conn.deferrable is False)
    cur.execute("SHOW transaction_read_only;")
    self.assertEqual(cur.fetchone()[0], 'on')
    cur.execute("SHOW transaction_deferrable;")
    self.assertEqual(cur.fetchone()[0], 'off')
    self.conn.rollback()
def test_default_no_autocommit(self):
    """With autocommit off, a query opens a transaction until rollback."""
    # assertTrue replaces the deprecated assert_ alias (removed in 3.12).
    self.assertTrue(not self.conn.autocommit)
    self.assertEqual(self.conn.status, ext.STATUS_READY)
    self.assertEqual(self.conn.get_transaction_status(),
        ext.TRANSACTION_STATUS_IDLE)

    cur = self.conn.cursor()
    cur.execute('select 1;')
    self.assertEqual(self.conn.status, ext.STATUS_BEGIN)
    self.assertEqual(self.conn.get_transaction_status(),
        ext.TRANSACTION_STATUS_INTRANS)

    self.conn.rollback()
    self.assertEqual(self.conn.status, ext.STATUS_READY)
    self.assertEqual(self.conn.get_transaction_status(),
        ext.TRANSACTION_STATUS_IDLE)
def test_set_autocommit(self):
    """Check status reporting as autocommit is turned on and off again.

    With autocommit on, a query leaves the connection READY/IDLE; with it
    off, the same query leaves the connection BEGIN/INTRANS.
    """
    self.conn.autocommit = True
    # assertTrue replaces the deprecated assert_ alias (removed in 3.12).
    self.assertTrue(self.conn.autocommit)
    self.assertEqual(self.conn.status, ext.STATUS_READY)
    self.assertEqual(self.conn.get_transaction_status(),
        ext.TRANSACTION_STATUS_IDLE)

    cur = self.conn.cursor()
    cur.execute('select 1;')
    self.assertEqual(self.conn.status, ext.STATUS_READY)
    self.assertEqual(self.conn.get_transaction_status(),
        ext.TRANSACTION_STATUS_IDLE)

    self.conn.autocommit = False
    self.assertTrue(not self.conn.autocommit)
    self.assertEqual(self.conn.status, ext.STATUS_READY)
    self.assertEqual(self.conn.get_transaction_status(),
        ext.TRANSACTION_STATUS_IDLE)

    cur.execute('select 1;')
    self.assertEqual(self.conn.status, ext.STATUS_BEGIN)
    self.assertEqual(self.conn.get_transaction_status(),
        ext.TRANSACTION_STATUS_INTRANS)
def test_subclass_commit(self):
    """Leaving a ``with`` block cleanly calls the subclass's commit().

    The overridden method records each call; the inserted row is then
    read back through the test's own connection.
    """
    commits = []

    class MyConn(ext.connection):
        def commit(self):
            commits.append(None)
            super(MyConn, self).commit()

    with self.connect(connection_factory=MyConn) as conn:
        curs = conn.cursor()
        curs.execute("insert into test_with values (10)")

    self.assertEqual(conn.status, ext.STATUS_READY)
    # assertTrue replaces the deprecated assert_ alias (removed in 3.12).
    self.assertTrue(commits)

    curs = self.conn.cursor()
    curs.execute("select * from test_with")
    self.assertEqual(curs.fetchall(), [(10,)])
def test_subclass_rollback(self):
    """Leaving a ``with`` block by exception calls the subclass's rollback().

    The overridden method records each call; afterwards the table is
    expected to contain no rows.
    """
    rollbacks = []

    class MyConn(ext.connection):
        def rollback(self):
            rollbacks.append(None)
            super(MyConn, self).rollback()

    try:
        with self.connect(connection_factory=MyConn) as conn:
            curs = conn.cursor()
            curs.execute("insert into test_with values (11)")
            1 / 0
    except ZeroDivisionError:
        pass
    else:
        # The original asserted a non-empty string, which is always true
        # and could never fail; fail explicitly if nothing was raised.
        self.fail("exception not raised")

    self.assertEqual(conn.status, ext.STATUS_READY)
    # assertTrue replaces the deprecated assert_ alias (removed in 3.12).
    self.assertTrue(rollbacks)

    curs = conn.cursor()
    curs.execute("select * from test_with")
    self.assertEqual(curs.fetchall(), [])
def test_with_error(self):
    """An exception inside nested ``with`` blocks rolls back the insert.

    Afterwards the connection must be READY and still open, the inner
    cursor closed, and the table empty.
    """
    try:
        with self.conn as conn:
            with conn.cursor() as curs:
                curs.execute("insert into test_with values (5)")
                1 / 0
    except ZeroDivisionError:
        pass

    self.assertEqual(self.conn.status, ext.STATUS_READY)
    # assertTrue replaces the deprecated assert_ alias (removed in 3.12).
    self.assertTrue(not self.conn.closed)
    self.assertTrue(curs.closed)

    curs = self.conn.cursor()
    curs.execute("select * from test_with")
    self.assertEqual(curs.fetchall(), [])