我们从Python开源项目中,提取了以下11个代码示例,用于说明如何使用sqlite3.version。注意:sqlite3.version 是模块属性(字符串),不是函数,使用时不加括号;它自 Python 3.12 起已弃用,并在 Python 3.14 中被移除。
def connect(self):
    """Open the SQLite connection for this database and tune it for speed.

    Adds this instance to ``opened_dbs`` (only once), creates ``self.db_dir``
    when it is missing, opens ``self.db_path`` and stores the connection in
    ``self.conn`` and a cursor obtained from ``self.getCursor()`` in
    ``self.cur``.  Rows are produced as ``sqlite3.Row`` and the connection's
    ``isolation_level`` is set to ``None``.
    """
    if self not in opened_dbs:
        opened_dbs.append(self)

    # sqlite3.version was deprecated in Python 3.12 and removed in 3.14; a
    # debug message must not make connecting fail on those interpreters.
    module_version = getattr(sqlite3, "version", "unknown")
    self.log.debug("Connecting to %s (sqlite version: %s)..." % (self.db_path, module_version))

    if not os.path.isdir(self.db_dir):  # Directory does not exist yet
        os.makedirs(self.db_dir)
        self.log.debug("Created Db path: %s" % self.db_dir)
    if not os.path.isfile(self.db_path):
        # Only logged: sqlite3.connect() below is still called on this path.
        self.log.debug("Db file not exist yet: %s" % self.db_path)

    self.conn = sqlite3.connect(self.db_path)
    self.conn.row_factory = sqlite3.Row
    self.conn.isolation_level = None
    self.cur = self.getCursor()

    # Speed is preferred over safety for this database.
    # NOTE(review): journal_mode is set twice in a row, so MEMORY is
    # presumably the mode that ends up in effect - confirm the WAL statement
    # is still intended before removing either line.
    self.cur.execute("PRAGMA journal_mode = WAL")
    self.cur.execute("PRAGMA journal_mode = MEMORY")
    self.cur.execute("PRAGMA synchronous = OFF")
    if self.foreign_keys:
        self.execute("PRAGMA foreign_keys = ON")

# Execute query using dbcursor
def getTableVersion(self, table_name):
    """Return the stored schema version of ``table_name``.

    Versions are read from the internal ``keyvalue`` table (rows with
    ``json_id = 0``) under the key ``table.NAME.version`` and cached in
    ``self.db_keyvalues``; the query is only issued while that cache is
    empty.

    Returns:
        The stored version, ``0`` when no version is recorded for the table,
        or ``False`` when querying ``keyvalue`` raises
        ``sqlite3.OperationalError`` (e.g. the table does not exist yet).
    """
    if not self.db_keyvalues:  # Cache is empty: load the internal keyvalues
        try:
            # json_id = 0 marks the internal keyvalues
            res = self.cur.execute("SELECT * FROM keyvalue WHERE json_id=0")
        except sqlite3.OperationalError as err:  # Table does not exist
            self.log.debug("Query error: %s" % err)
            return False

        for row in res:
            self.db_keyvalues[row["key"]] = row["value"]

    return self.db_keyvalues.get("table.%s.version" % table_name, 0)

# Check Db tables
# Return: list of changed table names
def __init__(self, schema, db_path):
    """Set up the database wrapper without opening a connection.

    Args:
        schema: Schema mapping; must contain ``"db_name"``.  A ``"version"``
            entry of ``1`` is stored into it when none is present.
        db_path: Path of the SQLite database file.
    """
    # Location of the database file; db_dir always ends with "/"
    self.db_path = db_path
    self.db_dir = "%s/" % os.path.dirname(db_path)

    # Schema, defaulting to version 1
    self.schema = schema
    self.schema.setdefault("version", 1)

    # Connection and cursor stay unset until a connection is made
    self.conn = None
    self.cur = None

    self.log = logging.getLogger("Db:%s" % schema["db_name"])

    # Feature switches
    self.collect_stats = False
    self.foreign_keys = False

    # Caches and bookkeeping
    self.table_names = None
    self.query_stats = {}
    self.db_keyvalues = {}
    self.last_query_time = time.time()
def getCursor(self):
    """Return a new ``DbCursor`` for this database, connecting first if needed."""
    has_connection = bool(self.conn)
    if not has_connection:
        # connect() is responsible for filling in self.conn
        self.connect()
    cursor = DbCursor(self.conn, self)
    return cursor

# Get the table version
# Return: Table version or None if not exist
def get_summary(cls):
    """Return a dictionary of information about this database backend.

    The dictionary always has the same six keys.  ``"Database SQL Python
    module version"`` is ``sqlite3.version`` where that attribute exists and
    ``"unknown"`` otherwise (the attribute was deprecated in Python 3.12 and
    removed in Python 3.14).
    """
    # Read defensively so the summary can still be built on Python 3.14+.
    python_module_version = getattr(sqlite3, "version", "unknown")
    return {
        "DB-API version": "2.0",
        "Database SQL type": cls.__name__,
        "Database SQL module": "sqlite3",
        "Database SQL Python module version": python_module_version,
        "Database SQL module version": sqlite3.sqlite_version,
        "Database SQL module location": sqlite3.__file__,
    }
def test_main():
    """Run every sqlite3 test suite, printing version details in verbose mode."""
    if test.support.verbose:
        # sqlite3.version was deprecated in Python 3.12 and removed in 3.14;
        # the banner must not abort the run when the attribute is missing.
        module_version = getattr(sqlite3, "version", "unknown")
        print("test_sqlite: testing with version",
              "{!r}, sqlite_version {!r}".format(module_version,
                                                 sqlite3.sqlite_version))
    test.support.run_unittest(dbapi.suite(), types.suite(),
                              userfunctions.suite(),
                              factory.suite(), transactions.suite(),
                              hooks.suite(), regression.suite(),
                              dump.suite())
def load_tests(*args):
    """Build the combined sqlite3 test suite (unittest ``load_tests`` hook).

    The positional arguments are accepted and ignored.  In verbose mode the
    module and library versions are printed first.

    Returns:
        A ``unittest.TestSuite`` containing the suites of all listed
        sqlite3 test modules.
    """
    if test.support.verbose:
        # sqlite3.version was deprecated in Python 3.12 and removed in 3.14;
        # the banner must not break test loading when it is missing.
        module_version = getattr(sqlite3, "version", "unknown")
        print("test_sqlite: testing with version",
              "{!r}, sqlite_version {!r}".format(module_version,
                                                 sqlite3.sqlite_version))
    return unittest.TestSuite([dbapi.suite(), types.suite(),
                               userfunctions.suite(),
                               factory.suite(), transactions.suite(),
                               hooks.suite(), regression.suite(),
                               dump.suite()])
def copytree(src, dst, symlinks=False):
    """Recursively copy a directory tree using copy2().

    This is a modified version of shutil's copytree: ``dst`` is only created
    when it is not already a directory, so copying into an existing
    directory is allowed.

    Args:
        src: Directory to copy from.
        dst: Directory to copy into.
        symlinks: When true, symbolic links in ``src`` are recreated as
            links in ``dst`` instead of being followed.

    Raises:
        EnvironmentError: After every entry has been attempted, if any of
            them failed.  ``args[0]`` is a flat list of
            ``(srcname, dstname, exception)`` tuples, including the failures
            collected by recursive calls.
    """
    from shutil import copy2

    names = os.listdir(src)
    if not os.path.isdir(dst):
        os.mkdir(dst)

    errors = []
    for name in names:
        srcname = os.path.join(src, name)
        dstname = os.path.join(dst, name)
        try:
            if symlinks and os.path.islink(srcname):
                os.symlink(os.readlink(srcname), dstname)
            elif os.path.isdir(srcname):
                copytree(srcname, dstname, symlinks)
            else:
                copy2(srcname, dstname)
        except EnvironmentError as err:
            # On Python 3 IOError, OSError and EnvironmentError are the same
            # class, so the aggregated error of a recursive call cannot be
            # told apart by type; recognise it by its list payload instead.
            nested = err.args[0] if err.args else None
            if isinstance(nested, list):
                # Merge the recursive call's failures and keep copying
                errors.extend(nested)
            else:
                errors.append((srcname, dstname, err))
    if errors:
        raise EnvironmentError(errors)
def checkTables(self):
    """Make sure the internal and schema-defined tables are present and current.

    Inside one ``BEGIN``/``COMMIT`` pair, ``needTable`` is called on a cursor
    for the internal ``keyvalue`` and ``json`` tables and then for every
    table listed in ``self.schema["tables"]``.  A truthy ``needTable`` result
    is treated as "this table changed".

    Returns:
        List of the names of the tables reported as changed.
    """
    started = time.time()
    changed_tables = []
    cur = self.getCursor()

    cur.execute("BEGIN")

    # Internal table: keyvalue
    keyvalue_cols = [
        ["keyvalue_id", "INTEGER PRIMARY KEY AUTOINCREMENT"],
        ["key", "TEXT"],
        ["value", "INTEGER"],
        ["json_id", "INTEGER REFERENCES json (json_id)"],
    ]
    keyvalue_indexes = [
        "CREATE UNIQUE INDEX key_id ON keyvalue(json_id, key)"
    ]
    if cur.needTable("keyvalue", keyvalue_cols, keyvalue_indexes, version=self.schema["version"]):
        changed_tables.append("keyvalue")

    # Internal table: json (its layout depends on the schema version)
    if self.schema["version"] == 1:
        json_cols = [
            ["json_id", "INTEGER PRIMARY KEY AUTOINCREMENT"],
            ["path", "VARCHAR(255)"]
        ]
        json_indexes = [
            "CREATE UNIQUE INDEX path ON json(path)"
        ]
    else:
        json_cols = [
            ["json_id", "INTEGER PRIMARY KEY AUTOINCREMENT"],
            ["directory", "VARCHAR(255)"],
            ["file_name", "VARCHAR(255)"]
        ]
        json_indexes = [
            "CREATE UNIQUE INDEX path ON json(directory, file_name)"
        ]
    if cur.needTable("json", json_cols, json_indexes, version=self.schema["version"]):
        changed_tables.append("json")

    # Tables declared by the schema
    for name, settings in self.schema["tables"].items():
        table_changed = cur.needTable(
            name, settings["cols"], settings["indexes"],
            version=settings["schema_changed"]
        )
        if table_changed:
            changed_tables.append(name)

    cur.execute("COMMIT")

    elapsed = time.time() - started
    self.log.debug("Db check done in %.3fs, changed tables: %s" % (elapsed, changed_tables))
    if changed_tables:
        # Drop the cached keyvalues so table versions are re-read
        self.db_keyvalues = {}

    return changed_tables

# Load json file to db
# Return: True if matched