我们从 Python 开源项目中提取了以下 44 个代码示例，用于说明如何使用 sqlite3.PARSE_COLNAMES。
def __init__(self, file_name, table_name=None, sheet_name=None):
    """Open an SQLite file and bind this object to one of its tables.

    ``sheet_name`` is accepted as an alias for ``table_name``; when both
    are given they must be equal.  When neither is given the table
    ``'PeptideData'`` is used.

    After construction the instance exposes ``file_name``, ``table_name``,
    ``conn`` (the open connection), ``tables`` (names of every table and
    view in the file) and ``columns`` (column names of the chosen table).

    Raises:
        IOError: if two different table names are given, if ``file_name``
            does not exist, or if the chosen table is not in the file.
    """
    if sheet_name and not table_name:
        table_name = sheet_name
    elif sheet_name and table_name and (sheet_name != table_name):
        # Call-style raise: the old ``raise IOError, "..."`` statement form
        # is a syntax error on Python 3.
        raise IOError("Doubly-specified table name: %s and %s"
                      % (sheet_name, table_name))
    self.table_name = table_name or 'PeptideData'

    # sqlite3.connect() would silently create a missing file, so check first.
    if not os.path.exists(file_name):
        raise IOError("No such file: '%s'" % os.path.basename(file_name))
    self.file_name = file_name

    self.conn = sqlite3.connect(file_name,
                                detect_types=sqlite3.PARSE_COLNAMES)
    cursor = self.conn.execute(
        "select name from sqlite_master where type='table' or type='view'")
    self.tables = tuple(row[0] for row in cursor.fetchall())
    if self.table_name not in self.tables:
        self.conn.close()
        raise IOError("Table %s not found in %s"
                      % (self.table_name, self.file_name))

    # Identifiers cannot be bound as parameters; quote the (already
    # validated) name so tables containing spaces or keywords still work.
    quoted_name = '"%s"' % self.table_name.replace('"', '""')
    cursor = self.conn.execute('select * from %s limit 1' % quoted_name)
    self.columns = [description[0] for description in cursor.description]
def bind_roles(self, server_id):
    """Load the stored rank bindings of one server into ``self.servers``.

    Reads every ``(DiscordGroup, Rank)`` row from the ``rank_binding``
    table in ``cache/<server_id>.sqlite`` (creating the directory, file
    and table when missing) and sorts the group names into a fresh
    ``Ranks.RankContainer``: rows ranked "Admin", "Mod" and "Member" go to
    its ``admin``, ``mod`` and ``member`` lists; other ranks are ignored.
    ``"@everyone"`` is always appended to the container's ``default`` list.

    :param server_id: Server identifier; used as the cache file name and
        as the key in ``self.servers``.
    """
    # Cleared first, so the entry stays None if loading fails below.
    self.servers[server_id] = None

    if not os.path.exists("cache/"):
        os.makedirs("cache")

    # Connect to SQLite file for server in cache/SERVERID.sqlite
    con = sqlite3.connect("cache/" + server_id + ".sqlite",
                          detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES)
    try:
        # ``with con`` only commits / rolls back the transaction; the
        # connection itself is released in the ``finally`` clause.
        with con:
            cur = con.cursor()
            cur.execute("CREATE TABLE IF NOT EXISTS rank_binding(DiscordGroup TEXT PRIMARY KEY, Rank TEXT)")
            cur.execute("SELECT * FROM rank_binding")
            rows = cur.fetchall()
    finally:
        con.close()

    rc = Ranks.RankContainer()
    rc.default.append("@everyone")
    for row in rows:
        group, rank = row[0], row[1]
        if rank == "Admin":
            rc.admin.append(group)
        elif rank == "Mod":
            rc.mod.append(group)
        elif rank == "Member":
            rc.member.append(group)

    self.servers[server_id] = rc
def admin(self, message_object, group):
    """Bind a group to the "Admin" rank for the message's server.

    Nothing happens unless the message author *is* the server owner
    (identity comparison).  For the owner, the binding is stored in the
    ``rank_binding`` table of ``cache/<server id>.sqlite``; the directory,
    file and table are created when missing, and an existing binding for
    the same group is left untouched (``INSERT OR IGNORE``).

    :param message_object: Message whose ``author`` and ``server``
        (with ``owner`` and ``id``) are inspected.
    :param group: Name of the group to register as admin.
    """
    if message_object.author is not message_object.server.owner:
        return

    if not os.path.exists("cache/"):
        os.makedirs("cache")

    # Connect to SQLite file for server in cache/SERVERID.sqlite
    con = sqlite3.connect("cache/" + message_object.server.id + ".sqlite",
                          detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES)
    try:
        # ``with con`` commits on success and rolls back on error, but does
        # not close the connection; that happens in ``finally``.
        with con:
            cur = con.cursor()
            cur.execute("CREATE TABLE IF NOT EXISTS rank_binding(DiscordGroup TEXT PRIMARY KEY, Rank TEXT)")
            cur.execute("INSERT OR IGNORE INTO rank_binding(DiscordGroup, Rank) VALUES(?, ?)",
                        (group, "Admin"))
    finally:
        con.close()
async def bind(self, message_object, group, rank):
    """Bind a group to a rank for the message's server and announce it.

    Nothing happens unless the message author *is* the server owner
    (identity comparison).  For the owner, the binding is stored in the
    ``rank_binding`` table of ``cache/<server id>.sqlite`` (directory,
    file and table are created when missing; an existing binding for the
    same group is kept because of ``INSERT OR IGNORE``) and a confirmation
    is sent to the message's channel through ``self.pm.clientWrap``.

    Declared ``async`` because the body awaits ``send_message``; ``await``
    inside a plain ``def`` is a syntax error.

    :param message_object: Message whose ``author``, ``server`` and
        ``channel`` are used.
    :param group: Name of the group to bind.
    :param rank: Rank name to store for the group.
    """
    if message_object.author is not message_object.server.owner:
        return

    if not os.path.exists("cache/"):
        os.makedirs("cache")

    # Connect to SQLite file for server in cache/SERVERID.sqlite
    con = sqlite3.connect("cache/" + message_object.server.id + ".sqlite",
                          detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES)
    try:
        # ``with con`` handles commit / rollback only; close explicitly.
        with con:
            cur = con.cursor()
            cur.execute("CREATE TABLE IF NOT EXISTS rank_binding(DiscordGroup TEXT PRIMARY KEY, Rank TEXT)")
            cur.execute("INSERT OR IGNORE INTO rank_binding(DiscordGroup, Rank) VALUES(?, ?)",
                        (group, rank))
    finally:
        con.close()

    await self.pm.clientWrap.send_message(
        self.name, message_object.channel,
        "Group " + group + " was added as " + rank)
def connect(self):
    """Open the SQLite connection for this database and return ``self``.

    ``':memory:'`` is passed to SQLite as-is; any other database name is
    resolved to ``<database path from config>/<name>.sqlite`` and must
    already exist, otherwise ``IOError`` is raised.  When a connection is
    already held, nothing else is done.  A new connection gets a set of
    helper SQL functions registered and foreign-key enforcement enabled.
    """
    path = ':memory:'
    if self.database_name != ':memory:':
        db_filename = self.database_name + '.sqlite'
        path = os.path.join(config.get('database', 'path'), db_filename)
        if not os.path.isfile(path):
            raise IOError('Database "%s" doesn\'t exist!' % db_filename)

    # The existence check above runs even when already connected.
    if self._conn is not None:
        return self

    detect = sqlite.PARSE_DECLTYPES | sqlite.PARSE_COLNAMES
    self._conn = sqlite.connect(path, detect_types=detect)

    # (SQL name, argument count, Python implementation); -1 = variadic.
    sql_functions = [
        ('extract', 2, SQLiteExtract.extract),
        ('date_trunc', 2, date_trunc),
        ('split_part', 3, split_part),
        ('position', 2, SQLitePosition.position),
        ('overlay', 3, SQLiteOverlay.overlay),
        ('overlay', 4, SQLiteOverlay.overlay),
    ]
    # 'replace' is only supplied for SQLite releases older than 3.3.14.
    if sqlite.sqlite_version_info < (3, 3, 14):
        sql_functions.append(('replace', 3, replace))
    sql_functions.extend([
        ('now', 0, now),
        ('sign', 1, sign),
        ('greatest', -1, max),
        ('least', -1, min),
    ])
    for sql_name, arg_count, implementation in sql_functions:
        self._conn.create_function(sql_name, arg_count, implementation)

    self._conn.execute('PRAGMA foreign_keys = ON')
    return self
def start(self):
    """Return the earliest ``created_at`` value in the ``tweets`` table.

    The ``[timestamp]`` suffix in the column alias, together with
    ``PARSE_COLNAMES``, makes sqlite3 run the stored value through the
    converter registered under the name "timestamp".  When the query
    yields NULL (e.g. the table is empty) the current local time from
    ``datetime.now()`` is returned instead.
    """
    conn = sqlite3.connect(self._db, detect_types=sqlite3.PARSE_COLNAMES)
    try:
        row = conn.execute(
            "SELECT MIN(created_at) as 'ts [timestamp]' from tweets"
        ).fetchone()
    finally:
        # Always release the connection (and its cursor), even on error.
        conn.close()

    earliest = row[0]
    if earliest is None:
        return datetime.now()
    return earliest
def end(self):
    """Return the latest ``created_at`` value in the ``tweets`` table.

    The ``[timestamp]`` suffix in the column alias, together with
    ``PARSE_COLNAMES``, makes sqlite3 run the stored value through the
    converter registered under the name "timestamp".  When the query
    yields NULL (e.g. the table is empty) the current local time from
    ``datetime.now()`` is returned instead.
    """
    conn = sqlite3.connect(self._db, detect_types=sqlite3.PARSE_COLNAMES)
    try:
        row = conn.execute(
            "SELECT MAX(created_at) as 'ts [timestamp]' from tweets"
        ).fetchone()
    finally:
        # Always release the connection (and its cursor), even on error.
        conn.close()

    latest = row[0]
    if latest is None:
        return datetime.now()
    return latest
def tweet_dates(self):
    """Return a DataFrame indexed by the ``created_at`` of every tweet.

    The single selected column is parsed as dates by pandas and used as
    the index, so the resulting frame has no data columns.
    """
    conn = sqlite3.connect(self._db, detect_types=sqlite3.PARSE_COLNAMES)
    try:
        return pd.read_sql_query(
            'SELECT created_at FROM tweets',
            conn,
            parse_dates=['created_at'],
            index_col=['created_at'],
        )
    finally:
        # The frame is fully materialised by now; release the connection.
        conn.close()
def setUp(self):
    """Create an in-memory database and install the test converters.

    The connection uses ``PARSE_COLNAMES`` only; a one-column table
    ``test(x foo)`` is created and four converters are written directly
    into the module-level ``sqlite.converters`` mapping.
    """
    self.con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES)
    self.cur = self.con.cursor()
    self.cur.execute("create table test(x foo)")

    def square_brackets(value):
        return "[%s]" % value

    def angle_brackets(value):
        return "<%s>" % value

    def always_fails(value):
        # Deliberately raises ZeroDivisionError.
        return 5 // 0

    def marker(value):
        return "MARKER"

    installed = (
        ("FOO", square_brackets),
        ("BAR", angle_brackets),
        ("EXC", always_fails),
        ("B1B1", marker),
    )
    for type_name, converter in installed:
        sqlite.converters[type_name] = converter
def setUp(self):
    """Open an in-memory ``PARSE_COLNAMES`` connection and register the
    ``"bin"`` converter (``BinaryConverterTests.convert``)."""
    memory_db = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES)
    self.con = memory_db
    converter = BinaryConverterTests.convert
    sqlite.register_converter("bin", converter)
def CheckPragmaSchemaVersion(self):
    """Run ``pragma schema_version`` on a ``PARSE_COLNAMES`` connection.

    The pragma is executed on the connection opened here (which has
    column-name type detection enabled), not on ``self.con``.
    """
    # This still crashed pysqlite <= 2.2.1
    con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES)
    try:
        cur = con.cursor()
        try:
            cur.execute("pragma schema_version")
        finally:
            cur.close()
    finally:
        # Nested try blocks: ``cur`` is only closed if it was created, and
        # the connection is closed in every case.
        con.close()
def setUp(self):
    """Create an in-memory database and install the test converters.

    The connection uses ``PARSE_COLNAMES`` only; a one-column table
    ``test(x foo)`` is created and four converters are written directly
    into the module-level ``sqlite.converters`` mapping.  The "FOO" and
    "BAR" converters decode their argument as ASCII before wrapping it.
    """
    self.con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES)
    self.cur = self.con.cursor()
    self.cur.execute("create table test(x foo)")

    def square_brackets(raw):
        return "[%s]" % raw.decode("ascii")

    def angle_brackets(raw):
        return "<%s>" % raw.decode("ascii")

    def always_fails(raw):
        # Deliberately raises ZeroDivisionError.
        return 5/0

    def marker(raw):
        return "MARKER"

    installed = (
        ("FOO", square_brackets),
        ("BAR", angle_brackets),
        ("EXC", always_fails),
        ("B1B1", marker),
    )
    for type_name, converter in installed:
        sqlite.converters[type_name] = converter
def _get_conn(self):
    """Return the SQLite connection cached for the calling thread.

    Connections are kept in ``self._connection_cache`` keyed by the value
    of ``get_ident()``; the first call from a thread opens a connection to
    ``self.db_filepath`` with declared-type and column-name detection.
    """
    thread_key = get_ident()
    if thread_key not in self._connection_cache:
        detect = sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES
        fresh = sqlite3.connect(self.db_filepath, detect_types=detect)
        self._connection_cache[thread_key] = fresh
    return self._connection_cache[thread_key]
def __init__(self, dbfile=None, scan_changes=True, silent=False):
    """Open the track database, creating the schema when it is missing.

    Args:
        dbfile: Path of the SQLite file.  When falsy, ``tracks.sqlite``
            inside the per-user data directory that ``appdirs`` reports
            for "PythonJukebox" / "Razorvine" is used; that directory is
            created with mode 0o700 if needed.
        scan_changes: When true and the ``tracks`` table already exists,
            ``self.scan_changes()`` is called after connecting.
        silent: When true, no status messages are printed.

    The absolute path is stored in ``self.dbfile`` and the connection in
    ``self.dbconn``.
    """
    if not dbfile:
        dblocation = appdirs.user_data_dir("PythonJukebox", "Razorvine")
        os.makedirs(dblocation, mode=0o700, exist_ok=True)
        dbfile = os.path.join(dblocation, "tracks.sqlite")
    dbfile = os.path.abspath(dbfile)
    self.dbfile = dbfile
    self.dbconn = sqlite3.connect(
        dbfile, detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES)
    self.dbconn.row_factory = sqlite3.Row   # make sure we can get results by column name
    self.dbconn.execute("PRAGMA foreign_keys=ON")

    # Only the probe query sits inside ``try``: an OperationalError raised
    # later by scan_changes() must not be mistaken for a missing table.
    try:
        self.dbconn.execute("SELECT COUNT(*) FROM tracks").fetchone()
    except sqlite3.OperationalError:
        # the table does not yet exist, create the schema
        if not silent:
            print("Creating new database.")
            print("Database file:", dbfile)
        self.dbconn.execute("""CREATE TABLE tracks
            (
                id integer PRIMARY KEY,
                title nvarchar(260),
                artist nvarchar(260),
                album nvarchar(260),
                year int,
                genre nvarchar(100),
                duration real NOT NULL,
                modified timestamp NOT NULL,
                location nvarchar(500) NOT NULL,
                hash char(40) NOT NULL UNIQUE
            );""")
    else:
        if not silent:
            print("Connected to database.")
            print("Database file:", dbfile)
        if scan_changes:
            self.scan_changes()
def __init__(self,name,recarray,cachesize=5,connection=None,is_tmp=False):
    """Build the SQL table from a numpy record array.

       T = SQLarray(<name>,<recarray>,cachesize=5,connection=None)

    :Arguments:
       name
          table name (can be referred to as __self__ in SQL queries)
       recarray
          numpy record array that describes the layout and initializes
          the table
       cachesize
          number of (query, result) pairs that are cached
       connection
          if not None, reuse this connection; this adds a new table to
          the same database, which allows more complicated queries with
          cross-joins. The table's connection is available as the
          attribute T.connection.
       is_tmp
          False: default, True: create a tmp table

    :Raises: ValueError if *name* equals the reserved temporary-table
             name while *is_tmp* is false.
    """
    self.name = str(name)
    if self.name == self.tmp_table_name and not is_tmp:
        raise ValueError('name = %s is reserved, choose another one' % name)

    if connection is not None:
        # use existing connection
        self.connection = connection
    else:
        detect = sqlite.PARSE_DECLTYPES | sqlite.PARSE_COLNAMES
        self.connection = sqlite.connect(':memory:', detect_types=detect)
        # add additional functions to database
        self._init_sqlite_functions()
    self.cursor = self.connection.cursor()

    self.columns = recarray.dtype.names
    self.ncol = len(self.columns)
    column_list = ",".join(self.columns)

    # initialize table
    # * input is NOT sanitized and is NOT safe, don't use as CGI...
    # * this can overwrite an existing table (name is not checked)
    create_prefix = "CREATE TEMPORARY TABLE " if is_tmp else "CREATE TABLE "
    self.cursor.execute(create_prefix + self.name + " (" + column_list + ")")

    # one '?' placeholder per column, filled row by row from the recarray
    placeholders = ",".join(["?"] * self.ncol)
    insert_sql = ("INSERT INTO " + self.name + " (" + column_list + ") "
                  + "VALUES (" + placeholders + ")")
    self.cursor.executemany(insert_sql, recarray)

    self.__cache = KRingbuffer(cachesize)