Python sqlite3 模块,PARSE_COLNAMES 实例源码

我们从Python开源项目中,提取了以下44个代码示例,用于说明如何使用sqlite3.PARSE_COLNAMES

项目:multiplierz    作者:BlaisProteomics    | 项目源码 | 文件源码
def __init__(self, file_name, table_name=None, sheet_name=None):
        if sheet_name and not table_name:
            table_name = sheet_name
        elif sheet_name and table_name and (sheet_name != table_name):
            raise IOError, "Doubly-specified table name: %s and %s" % (sheet_name, table_name)
        self.table_name = table_name or 'PeptideData'

        if not os.path.exists(file_name):
            raise IOError("No such file: '%s'" % os.path.basename(file_name))

        self.file_name = file_name
        self.conn = sqlite3.connect(file_name, detect_types=sqlite3.PARSE_COLNAMES)

        cursor = self.conn.execute("select name from sqlite_master where type='table' or type='view'")
        self.tables = tuple(t[0] for t in cursor.fetchall())

        if self.table_name not in self.tables:
            self.conn.close()
            raise IOError("Table %s not found in %s" % (self.table_name, self.file_name))

        cursor = self.conn.execute('select * from %s limit 1' % self.table_name)
        self.columns = [d[0] for d in cursor.description]
项目:pineapple    作者:peter765    | 项目源码 | 文件源码
def bind_roles(self, server_id):
        """
        This method will read all the roles from the server database and add them to their container
        :param name: Permission level name in config file
        :param container: Container list to add the groups to
        """
        self.servers[server_id] = None

        if not os.path.exists("cache/"):
            os.makedirs("cache")

        # Connect to SQLite file for server in cache/SERVERID.sqlite
        con = sqlite3.connect("cache/" + server_id + ".sqlite",
                              detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES)

        with con:
            cur = con.cursor()
            cur.execute("CREATE TABLE IF NOT EXISTS rank_binding(DiscordGroup TEXT PRIMARY KEY, Rank TEXT)")
            cur.execute("SELECT * FROM rank_binding")
            rows = cur.fetchall()

            rc = Ranks.RankContainer()

            rc.default.append("@everyone")

            for row in rows:
                if row[1] == "Admin":
                    rc.admin.append(row[0])
                if row[1] == "Mod":
                    rc.mod.append(row[0])
                if row[1] == "Member":
                    rc.member.append(row[0])
            self.servers[server_id] = rc
项目:pineapple    作者:peter765    | 项目源码 | 文件源码
def admin(self, message_object, group):
        if message_object.author is message_object.server.owner:
            if not os.path.exists("cache/"):
                os.makedirs("cache")

            # Connect to SQLite file for server in cache/SERVERID.sqlite
            con = sqlite3.connect("cache/" + message_object.server.id + ".sqlite",
                                  detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES)

            with con:
                cur = con.cursor()
                cur.execute("CREATE TABLE IF NOT EXISTS rank_binding(DiscordGroup TEXT PRIMARY KEY, Rank TEXT)")
                cur.execute("INSERT OR IGNORE INTO rank_binding(DiscordGroup, Rank) VALUES(?, ?)", (group, "Admin"))
项目:pineapple    作者:peter765    | 项目源码 | 文件源码
def bind(self, message_object, group, rank):
        if message_object.author is message_object.server.owner:
            if not os.path.exists("cache/"):
                os.makedirs("cache")

            # Connect to SQLite file for server in cache/SERVERID.sqlite
            con = sqlite3.connect("cache/" + message_object.server.id + ".sqlite",
                                  detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES)

            with con:
                cur = con.cursor()
                cur.execute("CREATE TABLE IF NOT EXISTS rank_binding(DiscordGroup TEXT PRIMARY KEY, Rank TEXT)")
                cur.execute("INSERT OR IGNORE INTO rank_binding(DiscordGroup, Rank) VALUES(?, ?)", (group, rank))
                await self.pm.clientWrap.send_message(self.name, message_object.channel,
                                                      "Group " + group + " was added as " + rank)
项目:health-mosconi    作者:GNUHealth-Mosconi    | 项目源码 | 文件源码
def connect(self):
        if self.database_name == ':memory:':
            path = ':memory:'
        else:
            db_filename = self.database_name + '.sqlite'
            path = os.path.join(config.get('database', 'path'), db_filename)
            if not os.path.isfile(path):
                raise IOError('Database "%s" doesn\'t exist!' % db_filename)
        if self._conn is not None:
            return self
        self._conn = sqlite.connect(path,
            detect_types=sqlite.PARSE_DECLTYPES | sqlite.PARSE_COLNAMES)
        self._conn.create_function('extract', 2, SQLiteExtract.extract)
        self._conn.create_function('date_trunc', 2, date_trunc)
        self._conn.create_function('split_part', 3, split_part)
        self._conn.create_function('position', 2, SQLitePosition.position)
        self._conn.create_function('overlay', 3, SQLiteOverlay.overlay)
        self._conn.create_function('overlay', 4, SQLiteOverlay.overlay)
        if sqlite.sqlite_version_info < (3, 3, 14):
            self._conn.create_function('replace', 3, replace)
        self._conn.create_function('now', 0, now)
        self._conn.create_function('sign', 1, sign)
        self._conn.create_function('greatest', -1, max)
        self._conn.create_function('least', -1, min)
        self._conn.execute('PRAGMA foreign_keys = ON')
        return self
项目:tweetfeels    作者:uclatommy    | 项目源码 | 文件源码
def start(self):
        conn = sqlite3.connect(self._db, detect_types=sqlite3.PARSE_COLNAMES)
        c = conn.cursor()
        c.execute("SELECT MIN(created_at) as 'ts [timestamp]' from tweets")
        earliest = c.fetchone()
        if earliest[0] is None:
            earliest = datetime.now()
        else:
            earliest = earliest[0]
        c.close()
        return earliest
项目:tweetfeels    作者:uclatommy    | 项目源码 | 文件源码
def end(self):
        conn = sqlite3.connect(self._db, detect_types=sqlite3.PARSE_COLNAMES)
        c = conn.cursor()
        c.execute("SELECT MAX(created_at) as 'ts [timestamp]' from tweets")
        latest = c.fetchone()
        if latest[0] is None:
            latest = datetime.now()
        else:
            latest = latest[0]
        c.close()
        return latest
项目:tweetfeels    作者:uclatommy    | 项目源码 | 文件源码
def tweet_dates(self):
        conn = sqlite3.connect(self._db, detect_types=sqlite3.PARSE_COLNAMES)
        df = pd.read_sql_query(
            'SELECT created_at FROM tweets', conn, parse_dates=['created_at'],
            index_col=['created_at']
            )
        return df
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def setUp(self):
        self.con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES)
        self.cur = self.con.cursor()
        self.cur.execute("create table test(x foo)")

        sqlite.converters["FOO"] = lambda x: "[%s]" % x
        sqlite.converters["BAR"] = lambda x: "<%s>" % x
        sqlite.converters["EXC"] = lambda x: 5 // 0
        sqlite.converters["B1B1"] = lambda x: "MARKER"
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def setUp(self):
        self.con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES)
        sqlite.register_converter("bin", BinaryConverterTests.convert)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def CheckPragmaSchemaVersion(self):
        # This still crashed pysqlite <= 2.2.1
        con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES)
        try:
            cur = self.con.cursor()
            cur.execute("pragma schema_version")
        finally:
            cur.close()
            con.close()
项目:vsphere-storage-for-docker    作者:vmware    | 项目源码 | 文件源码
def setUp(self):
        self.con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES)
        self.cur = self.con.cursor()
        self.cur.execute("create table test(x foo)")

        sqlite.converters["FOO"] = lambda x: "[%s]" % x
        sqlite.converters["BAR"] = lambda x: "<%s>" % x
        sqlite.converters["EXC"] = lambda x: 5 // 0
        sqlite.converters["B1B1"] = lambda x: "MARKER"
项目:vsphere-storage-for-docker    作者:vmware    | 项目源码 | 文件源码
def setUp(self):
        self.con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES)
        sqlite.register_converter("bin", BinaryConverterTests.convert)
项目:vsphere-storage-for-docker    作者:vmware    | 项目源码 | 文件源码
def CheckPragmaSchemaVersion(self):
        # This still crashed pysqlite <= 2.2.1
        con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES)
        try:
            cur = self.con.cursor()
            cur.execute("pragma schema_version")
        finally:
            cur.close()
            con.close()
项目:OSPTF    作者:xSploited    | 项目源码 | 文件源码
def setUp(self):
        self.con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES)
        self.cur = self.con.cursor()
        self.cur.execute("create table test(x foo)")

        sqlite.converters["FOO"] = lambda x: "[%s]" % x
        sqlite.converters["BAR"] = lambda x: "<%s>" % x
        sqlite.converters["EXC"] = lambda x: 5 // 0
        sqlite.converters["B1B1"] = lambda x: "MARKER"
项目:OSPTF    作者:xSploited    | 项目源码 | 文件源码
def setUp(self):
        self.con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES)
        sqlite.register_converter("bin", BinaryConverterTests.convert)
项目:OSPTF    作者:xSploited    | 项目源码 | 文件源码
def CheckPragmaSchemaVersion(self):
        # This still crashed pysqlite <= 2.2.1
        con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES)
        try:
            cur = self.con.cursor()
            cur.execute("pragma schema_version")
        finally:
            cur.close()
            con.close()
项目:pupy    作者:ru-faraon    | 项目源码 | 文件源码
def setUp(self):
        self.con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES)
        self.cur = self.con.cursor()
        self.cur.execute("create table test(x foo)")

        sqlite.converters["FOO"] = lambda x: "[%s]" % x
        sqlite.converters["BAR"] = lambda x: "<%s>" % x
        sqlite.converters["EXC"] = lambda x: 5 // 0
        sqlite.converters["B1B1"] = lambda x: "MARKER"
项目:pupy    作者:ru-faraon    | 项目源码 | 文件源码
def setUp(self):
        self.con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES)
        sqlite.register_converter("bin", BinaryConverterTests.convert)
项目:pupy    作者:ru-faraon    | 项目源码 | 文件源码
def CheckPragmaSchemaVersion(self):
        # This still crashed pysqlite <= 2.2.1
        con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES)
        try:
            cur = self.con.cursor()
            cur.execute("pragma schema_version")
        finally:
            cur.close()
            con.close()
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def setUp(self):
        self.con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES)
        self.cur = self.con.cursor()
        self.cur.execute("create table test(x foo)")

        sqlite.converters["FOO"] = lambda x: "[%s]" % x.decode("ascii")
        sqlite.converters["BAR"] = lambda x: "<%s>" % x.decode("ascii")
        sqlite.converters["EXC"] = lambda x: 5/0
        sqlite.converters["B1B1"] = lambda x: "MARKER"
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def setUp(self):
        self.con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES)
        sqlite.register_converter("bin", BinaryConverterTests.convert)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def CheckPragmaSchemaVersion(self):
        # This still crashed pysqlite <= 2.2.1
        con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES)
        try:
            cur = self.con.cursor()
            cur.execute("pragma schema_version")
        finally:
            cur.close()
            con.close()
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def setUp(self):
        self.con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES)
        self.cur = self.con.cursor()
        self.cur.execute("create table test(x foo)")

        sqlite.converters["FOO"] = lambda x: "[%s]" % x
        sqlite.converters["BAR"] = lambda x: "<%s>" % x
        sqlite.converters["EXC"] = lambda x: 5 // 0
        sqlite.converters["B1B1"] = lambda x: "MARKER"
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def setUp(self):
        self.con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES)
        sqlite.register_converter("bin", BinaryConverterTests.convert)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def CheckPragmaSchemaVersion(self):
        # This still crashed pysqlite <= 2.2.1
        con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES)
        try:
            cur = self.con.cursor()
            cur.execute("pragma schema_version")
        finally:
            cur.close()
            con.close()
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def setUp(self):
        self.con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES)
        self.cur = self.con.cursor()
        self.cur.execute("create table test(x foo)")

        sqlite.converters["FOO"] = lambda x: "[%s]" % x
        sqlite.converters["BAR"] = lambda x: "<%s>" % x
        sqlite.converters["EXC"] = lambda x: 5 // 0
        sqlite.converters["B1B1"] = lambda x: "MARKER"
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def setUp(self):
        self.con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES)
        sqlite.register_converter("bin", BinaryConverterTests.convert)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def CheckPragmaSchemaVersion(self):
        # This still crashed pysqlite <= 2.2.1
        con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES)
        try:
            cur = self.con.cursor()
            cur.execute("pragma schema_version")
        finally:
            cur.close()
            con.close()
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def setUp(self):
        self.con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES)
        self.cur = self.con.cursor()
        self.cur.execute("create table test(x foo)")

        sqlite.converters["FOO"] = lambda x: "[%s]" % x
        sqlite.converters["BAR"] = lambda x: "<%s>" % x
        sqlite.converters["EXC"] = lambda x: 5 // 0
        sqlite.converters["B1B1"] = lambda x: "MARKER"
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def setUp(self):
        self.con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES)
        sqlite.register_converter("bin", BinaryConverterTests.convert)
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def CheckPragmaSchemaVersion(self):
        # This still crashed pysqlite <= 2.2.1
        con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES)
        try:
            cur = self.con.cursor()
            cur.execute("pragma schema_version")
        finally:
            cur.close()
            con.close()
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def setUp(self):
        self.con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES)
        self.cur = self.con.cursor()
        self.cur.execute("create table test(x foo)")

        sqlite.converters["FOO"] = lambda x: "[%s]" % x.decode("ascii")
        sqlite.converters["BAR"] = lambda x: "<%s>" % x.decode("ascii")
        sqlite.converters["EXC"] = lambda x: 5/0
        sqlite.converters["B1B1"] = lambda x: "MARKER"
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def setUp(self):
        self.con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES)
        sqlite.register_converter("bin", BinaryConverterTests.convert)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def CheckPragmaSchemaVersion(self):
        # This still crashed pysqlite <= 2.2.1
        con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES)
        try:
            cur = self.con.cursor()
            cur.execute("pragma schema_version")
        finally:
            cur.close()
            con.close()
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def setUp(self):
        self.con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES)
        self.cur = self.con.cursor()
        self.cur.execute("create table test(x foo)")

        sqlite.converters["FOO"] = lambda x: "[%s]" % x
        sqlite.converters["BAR"] = lambda x: "<%s>" % x
        sqlite.converters["EXC"] = lambda x: 5 // 0
        sqlite.converters["B1B1"] = lambda x: "MARKER"
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def setUp(self):
        self.con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES)
        sqlite.register_converter("bin", BinaryConverterTests.convert)
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def CheckPragmaSchemaVersion(self):
        # This still crashed pysqlite <= 2.2.1
        con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES)
        try:
            cur = self.con.cursor()
            cur.execute("pragma schema_version")
        finally:
            cur.close()
            con.close()
项目:TweetSets    作者:justinlittman    | 项目源码 | 文件源码
def _get_conn(self):
        id = get_ident()
        if id not in self._connection_cache:
            self._connection_cache[id] = sqlite3.connect(self.db_filepath,
                                                         detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES)

        return self._connection_cache[id]
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def setUp(self):
        self.con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES)
        self.cur = self.con.cursor()
        self.cur.execute("create table test(x foo)")

        sqlite.converters["FOO"] = lambda x: "[%s]" % x.decode("ascii")
        sqlite.converters["BAR"] = lambda x: "<%s>" % x.decode("ascii")
        sqlite.converters["EXC"] = lambda x: 5/0
        sqlite.converters["B1B1"] = lambda x: "MARKER"
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def setUp(self):
        self.con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES)
        sqlite.register_converter("bin", BinaryConverterTests.convert)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def CheckPragmaSchemaVersion(self):
        # This still crashed pysqlite <= 2.2.1
        con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES)
        try:
            cur = self.con.cursor()
            cur.execute("pragma schema_version")
        finally:
            cur.close()
            con.close()
项目:synthesizer    作者:irmen    | 项目源码 | 文件源码
def __init__(self, dbfile=None, scan_changes=True, silent=False):
        if not dbfile:
            dblocation = appdirs.user_data_dir("PythonJukebox", "Razorvine")
            os.makedirs(dblocation, mode=0o700, exist_ok=True)
            dbfile = os.path.join(dblocation, "tracks.sqlite")
        dbfile = os.path.abspath(dbfile)
        self.dbfile = dbfile
        self.dbconn = sqlite3.connect(dbfile, detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES)
        self.dbconn.row_factory = sqlite3.Row   # make sure we can get results by column name
        self.dbconn.execute("PRAGMA foreign_keys=ON")
        try:
            self.dbconn.execute("SELECT COUNT(*) FROM tracks").fetchone()
            if not silent:
                print("Connected to database.")
                print("Database file:", dbfile)
            if scan_changes:
                self.scan_changes()
        except sqlite3.OperationalError:
            # the table does not yet exist, create the schema
            if not silent:
                print("Creating new database.")
                print("Database file:", dbfile)
            self.dbconn.execute("""CREATE TABLE tracks
                (
                    id integer PRIMARY KEY,
                    title nvarchar(260),
                    artist nvarchar(260),
                    album nvarchar(260),
                    year int,
                    genre nvarchar(100),
                    duration real NOT NULL,
                    modified timestamp NOT NULL,
                    location nvarchar(500) NOT NULL,
                    hash char(40) NOT NULL UNIQUE
                );""")
项目:AdK_analysis    作者:orbeckst    | 项目源码 | 文件源码
def __init__(self,name,recarray,cachesize=5,connection=None,is_tmp=False):
        """Build the SQL table from a numpy record array.

        T = SQLarray(<name>,<recarray>,cachesize=5,connection=None)

        :Arguments:
        name        table name (can be referred to as __self__ in SQL queries)
        recarray    numpy record array that describes the layout and initializes the
                    table
        cachesize   number of (query, result) pairs that are chached
        connection  if not None, reuse this connection; this adds a new table to the same
                    database, which allows more complicated queries with cross-joins. The
                    table's connection is available as the attribute T.connection.
        is_tmp      False: default, True: create a tmp table
        """
        self.name = str(name)
        if self.name == self.tmp_table_name and not is_tmp:
            raise ValueError('name = %s is reserved, choose another one' % name)
        if connection is None:
            self.connection = sqlite.connect(':memory:', detect_types=sqlite.PARSE_DECLTYPES | sqlite.PARSE_COLNAMES)
            self._init_sqlite_functions()   # add additional functions to database
        else:
            self.connection = connection    # use existing connection
        self.cursor = self.connection.cursor()
        self.columns = recarray.dtype.names
        self.ncol = len(self.columns)

        # initialize table
        # * input is NOT sanitized and is NOT safe, don't use as CGI...
        # * this can overwrite an existing table (name is not checked)
        if not is_tmp:
            SQL = "CREATE TABLE "+self.name+" ("+",".join(self.columns)+")"
        else:
            # temporary table
            SQL = "CREATE TEMPORARY TABLE "+self.name+" ("+",".join(self.columns)+")"
        self.cursor.execute(SQL)
        SQL = "INSERT INTO "+self.name+" ("+ ",".join(self.columns)+") "\
            +"VALUES "+"("+",".join(self.ncol*['?'])+")"
        self.cursor.executemany(SQL,recarray)
        self.__cache = KRingbuffer(cachesize)