我们从Python开源项目中,提取了以下48个代码示例,用于说明如何使用psycopg2.extensions.ISOLATION_LEVEL_REPEATABLE_READ。
def main(): """The main program""" args = parse_args() items = [] now = str(int(time())) with connect(database=args.dbname) as conn: conn.set_session( isolation_level=ISOLATION_LEVEL_REPEATABLE_READ, readonly=True, ) for query in args.queries: rows = execute(conn, query) if len(rows) == 0: raise Exception('No result') items.extend( get_row_data(args.key_column, rows) if args.key_column else get_column_data(rows) ) for key, value in items: print(args.prefix + '.' + key, value, now)
def _isolation_lookup(self): extensions = self._psycopg2_extensions() return { 'AUTOCOMMIT': extensions.ISOLATION_LEVEL_AUTOCOMMIT, 'READ COMMITTED': extensions.ISOLATION_LEVEL_READ_COMMITTED, 'READ UNCOMMITTED': extensions.ISOLATION_LEVEL_READ_UNCOMMITTED, 'REPEATABLE READ': extensions.ISOLATION_LEVEL_REPEATABLE_READ, 'SERIALIZABLE': extensions.ISOLATION_LEVEL_SERIALIZABLE }
def cursor(self, autocommit=False, readonly=False): if self._connpool is None: self.connect() conn = self._connpool.getconn() if autocommit: conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT) else: conn.set_isolation_level(ISOLATION_LEVEL_REPEATABLE_READ) cursor = Cursor(self._connpool, conn, self) if readonly: cursor.execute('SET TRANSACTION READ ONLY') return cursor
def test_set_isolation_level(self): conn = self.connect() curs = conn.cursor() levels = [ ('read uncommitted', ext.ISOLATION_LEVEL_READ_UNCOMMITTED), ('read committed', ext.ISOLATION_LEVEL_READ_COMMITTED), ('repeatable read', ext.ISOLATION_LEVEL_REPEATABLE_READ), ('serializable', ext.ISOLATION_LEVEL_SERIALIZABLE), ] for name, level in levels: conn.set_isolation_level(level) # the only values available on prehistoric PG versions if conn.server_version < 80000: if level in ( ext.ISOLATION_LEVEL_READ_UNCOMMITTED, ext.ISOLATION_LEVEL_REPEATABLE_READ): name, level = levels[levels.index((name, level)) + 1] self.assertEqual(conn.isolation_level, level) curs.execute('show transaction_isolation;') got_name = curs.fetchone()[0] self.assertEqual(name, got_name) conn.commit() self.assertRaises(ValueError, conn.set_isolation_level, -1) self.assertRaises(ValueError, conn.set_isolation_level, 5)
def test_set_isolation_level(self): cur = self.conn.cursor() self.conn.set_session( ext.ISOLATION_LEVEL_SERIALIZABLE) cur.execute("SHOW transaction_isolation;") self.assertEqual(cur.fetchone()[0], 'serializable') self.conn.rollback() self.conn.set_session( ext.ISOLATION_LEVEL_REPEATABLE_READ) cur.execute("SHOW transaction_isolation;") if self.conn.server_version > 80000: self.assertEqual(cur.fetchone()[0], 'repeatable read') else: self.assertEqual(cur.fetchone()[0], 'serializable') self.conn.rollback() self.conn.set_session( isolation_level=ext.ISOLATION_LEVEL_READ_COMMITTED) cur.execute("SHOW transaction_isolation;") self.assertEqual(cur.fetchone()[0], 'read committed') self.conn.rollback() self.conn.set_session( isolation_level=ext.ISOLATION_LEVEL_READ_UNCOMMITTED) cur.execute("SHOW transaction_isolation;") if self.conn.server_version > 80000: self.assertEqual(cur.fetchone()[0], 'read uncommitted') else: self.assertEqual(cur.fetchone()[0], 'read committed') self.conn.rollback()
def _isolation_lookup(self): from psycopg2 import extensions return { 'AUTOCOMMIT': extensions.ISOLATION_LEVEL_AUTOCOMMIT, 'READ COMMITTED': extensions.ISOLATION_LEVEL_READ_COMMITTED, 'READ UNCOMMITTED': extensions.ISOLATION_LEVEL_READ_UNCOMMITTED, 'REPEATABLE READ': extensions.ISOLATION_LEVEL_REPEATABLE_READ, 'SERIALIZABLE': extensions.ISOLATION_LEVEL_SERIALIZABLE }
def _isolation_lookup(self): extensions = __import__('psycopg2.extensions').extensions return { 'AUTOCOMMIT': extensions.ISOLATION_LEVEL_AUTOCOMMIT, 'READ COMMITTED': extensions.ISOLATION_LEVEL_READ_COMMITTED, 'READ UNCOMMITTED': extensions.ISOLATION_LEVEL_READ_UNCOMMITTED, 'REPEATABLE READ': extensions.ISOLATION_LEVEL_REPEATABLE_READ, 'SERIALIZABLE': extensions.ISOLATION_LEVEL_SERIALIZABLE }
def test_setattr_isolation_level_int(self): cur = self.conn.cursor() self.conn.isolation_level = ext.ISOLATION_LEVEL_SERIALIZABLE self.assertEqual(self.conn.isolation_level, ext.ISOLATION_LEVEL_SERIALIZABLE) cur.execute("SHOW transaction_isolation;") self.assertEqual(cur.fetchone()[0], 'serializable') self.conn.rollback() self.conn.isolation_level = ext.ISOLATION_LEVEL_REPEATABLE_READ cur.execute("SHOW transaction_isolation;") if self.conn.server_version > 80000: self.assertEqual(self.conn.isolation_level, ext.ISOLATION_LEVEL_REPEATABLE_READ) self.assertEqual(cur.fetchone()[0], 'repeatable read') else: self.assertEqual(self.conn.isolation_level, ext.ISOLATION_LEVEL_SERIALIZABLE) self.assertEqual(cur.fetchone()[0], 'serializable') self.conn.rollback() self.conn.isolation_level = ext.ISOLATION_LEVEL_READ_COMMITTED self.assertEqual(self.conn.isolation_level, ext.ISOLATION_LEVEL_READ_COMMITTED) cur.execute("SHOW transaction_isolation;") self.assertEqual(cur.fetchone()[0], 'read committed') self.conn.rollback() self.conn.isolation_level = ext.ISOLATION_LEVEL_READ_UNCOMMITTED cur.execute("SHOW transaction_isolation;") if self.conn.server_version > 80000: self.assertEqual(self.conn.isolation_level, ext.ISOLATION_LEVEL_READ_UNCOMMITTED) self.assertEqual(cur.fetchone()[0], 'read uncommitted') else: self.assertEqual(self.conn.isolation_level, ext.ISOLATION_LEVEL_READ_COMMITTED) self.assertEqual(cur.fetchone()[0], 'read committed') self.conn.rollback() self.assertEqual(ext.ISOLATION_LEVEL_DEFAULT, None) self.conn.isolation_level = ext.ISOLATION_LEVEL_DEFAULT self.assertEqual(self.conn.isolation_level, None) cur.execute("SHOW transaction_isolation;") isol = cur.fetchone()[0] cur.execute("SHOW default_transaction_isolation;") self.assertEqual(cur.fetchone()[0], isol)
def test_setattr_isolation_level_str(self): cur = self.conn.cursor() self.conn.isolation_level = "serializable" self.assertEqual(self.conn.isolation_level, ext.ISOLATION_LEVEL_SERIALIZABLE) cur.execute("SHOW transaction_isolation;") self.assertEqual(cur.fetchone()[0], 'serializable') self.conn.rollback() self.conn.isolation_level = "repeatable read" cur.execute("SHOW transaction_isolation;") if self.conn.server_version > 80000: self.assertEqual(self.conn.isolation_level, ext.ISOLATION_LEVEL_REPEATABLE_READ) self.assertEqual(cur.fetchone()[0], 'repeatable read') else: self.assertEqual(self.conn.isolation_level, ext.ISOLATION_LEVEL_SERIALIZABLE) self.assertEqual(cur.fetchone()[0], 'serializable') self.conn.rollback() self.conn.isolation_level = "read committed" self.assertEqual(self.conn.isolation_level, ext.ISOLATION_LEVEL_READ_COMMITTED) cur.execute("SHOW transaction_isolation;") self.assertEqual(cur.fetchone()[0], 'read committed') self.conn.rollback() self.conn.isolation_level = "read uncommitted" cur.execute("SHOW transaction_isolation;") if self.conn.server_version > 80000: self.assertEqual(self.conn.isolation_level, ext.ISOLATION_LEVEL_READ_UNCOMMITTED) self.assertEqual(cur.fetchone()[0], 'read uncommitted') else: self.assertEqual(self.conn.isolation_level, ext.ISOLATION_LEVEL_READ_COMMITTED) self.assertEqual(cur.fetchone()[0], 'read committed') self.conn.rollback() self.conn.isolation_level = "default" self.assertEqual(self.conn.isolation_level, None) cur.execute("SHOW transaction_isolation;") isol = cur.fetchone()[0] cur.execute("SHOW default_transaction_isolation;") self.assertEqual(cur.fetchone()[0], isol)
def fix_up_databases(databases): """Increase isolation level, use atomic requests. Does not modify connections to non-PostgreSQL databases. """ # Remove keys with null values from databases. databases.update({ alias: { key: value for key, value in database.items() if value is not None } for alias, database in databases.items() }) # Ensure that transactions are configured correctly. from psycopg2.extensions import ISOLATION_LEVEL_REPEATABLE_READ for _, database in databases.items(): engine = database.get("ENGINE") if engine == 'django.db.backends.postgresql_psycopg2': options = database.setdefault("OPTIONS", {}) # Explicitly set the transaction isolation level. MAAS needs a # particular transaction isolation level, and it enforces it. if "isolation_level" in options: isolation_level = options["isolation_level"] if isolation_level != ISOLATION_LEVEL_REPEATABLE_READ: warnings.warn( "isolation_level is set to %r; overriding to %r." % (isolation_level, ISOLATION_LEVEL_REPEATABLE_READ), RuntimeWarning, 2) options["isolation_level"] = ISOLATION_LEVEL_REPEATABLE_READ # Enable ATOMIC_REQUESTS: MAAS manages transactions across the # whole request/response lifecycle including middleware (Django, # in its infinite wisdom, does not). However we enable this # setting to ensure that views run within _savepoints_ so that # middleware exception handlers that suppress exceptions don't # inadvertently allow failed requests to be committed. if "ATOMIC_REQUESTS" in database: atomic_requests = database["ATOMIC_REQUESTS"] if not atomic_requests: warnings.warn( "ATOMIC_REQUESTS is set to %r; overriding to True." % (atomic_requests,), RuntimeWarning, 2) database["ATOMIC_REQUESTS"] = True