Python psycopg2.extras 模块，NamedTupleCursor() 实例源码

我们从 Python 开源项目中，提取了以下 16 个代码示例，用于说明如何使用 psycopg2.extras.NamedTupleCursor()。

项目:postpy    作者:portfoliome    | 项目源码 | 文件源码
def select(conn, query: str, params=None, name=None, itersize=5000):
    """Lazily yield the rows of a select query, one namedtuple at a time.

    This is a generator: the cursor is opened and the query executed only
    once iteration starts, and the cursor stays open until the generator
    is exhausted or closed.

    Parameters
    ----------
    conn : database connection
    query : select query string
    params : query parameters, forwarded to ``cursor.execute``.
    name : server side cursor name, forwarded to ``conn.cursor``.
        The default ``None`` means a client side cursor.
    itersize : value assigned to the cursor's ``itersize`` attribute
        (number of records fetched from the server per batch).
    """

    with conn.cursor(name, cursor_factory=NamedTupleCursor) as row_cursor:
        row_cursor.itersize = itersize
        row_cursor.execute(query, params)

        # Explicit loop (rather than delegating) so that closing this
        # generator only triggers the ``with`` clean-up above.
        for row in row_cursor:
            yield row
项目:musicbot    作者:AdrienPensart    | 项目源码 | 文件源码
def __fetch(self, fetch_all, sql=None, mf=MusicFilter(), cursor_factory=NamedTupleCursor):
        """Execute a filter-parameterised query and return its result.

        Arguments passed as ``None`` fall back to defaults: ``sql`` becomes
        ``select * from do_filter(%s::filter)``, ``mf`` becomes a fresh
        ``MusicFilter()`` and ``cursor_factory`` becomes ``NamedTupleCursor``.
        The query is always executed with the single argument ``mf.tuple()``.

        Returns ``cursor.fetchall()`` when ``fetch_all`` is truthy, otherwise
        ``cursor.fetchone()``.
        """
        # NOTE(review): the argument list is supplied even if ``sql`` contains
        # no ``%s`` placeholder -- confirm the driver tolerates that.
        statement = '''select * from do_filter(%s::filter)''' if sql is None else sql
        music_filter = MusicFilter() if mf is None else mf
        factory = NamedTupleCursor if cursor_factory is None else cursor_factory
        params = [music_filter.tuple()]
        with self.pg.cursor(cursor_factory=factory) as cursor:
            debug('Query: {}'.format(cursor.mogrify(statement, params)))
            # Both the execution and the fetch are timed by the benchmark block.
            with benchmark("Fetching:"):
                cursor.execute(statement, params)
                fetched = cursor.fetchall() if fetch_all else cursor.fetchone()
            return fetched
项目:postpy    作者:portfoliome    | 项目源码 | 文件源码
def get_records(conn, qualified_name):
    """Return every row of ``qualified_name`` as a list of namedtuples.

    ``qualified_name`` is interpolated directly into the SQL text (it is
    not sent as a bound parameter), so it must come from a trusted source.
    The query runs inside ``with conn:``.
    """
    statement = 'select * from %s;' % qualified_name
    with conn, conn.cursor(cursor_factory=NamedTupleCursor) as cursor:
        cursor.execute(statement)
        rows = cursor.fetchall()
    return rows
项目:postpy    作者:portfoliome    | 项目源码 | 文件源码
def insert(conn, qualified_name: str, column_names, records):
    """Insert a collection of namedtuple records into ``qualified_name``.

    The statement is built once by ``create_insert_statement`` and then
    executed once per record, all inside a single ``with conn:`` block.
    """

    statement = create_insert_statement(qualified_name, column_names)

    with conn, conn.cursor(cursor_factory=NamedTupleCursor) as cursor:
        for row in records:
            cursor.execute(statement, row)
项目:pgantomizer    作者:asgeirrr    | 项目源码 | 文件源码
def assert_db_anonymized(db):
    """Assert that the first two customers and addresses are anonymized.

    Reads all rows of ``customer`` and ``customer_address`` through a
    ``NamedTupleCursor`` and checks the first two rows of each against the
    expected anonymized values.

    The cursor is opened in a ``with`` block so it is closed even when one
    of the assertions fails.
    """
    # NOTE(review): the queries have no ORDER BY, so indexing rows [0]/[1]
    # relies on the order the database happens to return -- confirm.
    with db.cursor(cursor_factory=NamedTupleCursor) as cursor:
        cursor.execute('SELECT * FROM customer;')
        customers = cursor.fetchall()
        assert_customer_anonymized(customers[0], 'name_1', 'fr', 'LAT', '111.111.111.111')
        assert_customer_anonymized(customers[1], 'name_2', 'tlh', 'KR', '111.111.111.111')

        cursor.execute('SELECT * FROM customer_address;')
        addresses = cursor.fetchall()
        assert_address_anonymized(addresses[0], 1, '15', 'France')
        assert_address_anonymized(addresses[1], 2, '6', 'Klingon Empire')
项目:musicbot    作者:AdrienPensart    | 项目源码 | 文件源码
def __executefile(self, filepath):
        """Execute the contents of an SQL file as one statement batch.

        ``filepath`` is resolved relative to the directory of ``sys.argv[0]``.
        When ``self.dry_run`` is set, the file is not opened; only an info
        message is logged.
        """
        if self.dry_run:
            info("[DRY-RUN] Executing {}".format(filepath))
            return
        script_dir = os.path.dirname(sys.argv[0])
        schema_path = os.path.join(script_dir, filepath)
        # The file is opened before the cursor, matching the nesting order
        # of the two separate ``with`` statements this replaces.
        with open(schema_path, "r") as sql_file, \
                self.pg.cursor(cursor_factory=NamedTupleCursor) as cursor:
            cursor.execute(sql_file.read())
项目:musicbot    作者:AdrienPensart    | 项目源码 | 文件源码
def folders(self):
        """Return the ``name`` column of every row in ``folders`` as a list."""
        query = '''select name from folders'''
        with benchmark("DB Folders"), \
                self.pg.cursor(cursor_factory=NamedTupleCursor) as cursor:
            cursor.execute(query, [])
            names = []
            for row in cursor.fetchall():
                names.append(row.name)
            return names
项目:musicbot    作者:AdrienPensart    | 项目源码 | 文件源码
def filter(self, mf=MusicFilter(), cursor_factory=NamedTupleCursor):
        """Return all rows of the default filter query for ``mf``.

        Delegates to ``__fetch`` with ``fetch_all=True`` and no custom SQL.
        """
        return self.__fetch(fetch_all=True, sql=None, mf=mf, cursor_factory=cursor_factory)
项目:musicbot    作者:AdrienPensart    | 项目源码 | 文件源码
def __fetchone(self, sql=None, mf=MusicFilter(), cursor_factory=NamedTupleCursor):
        """Return only the first row of ``sql`` evaluated for filter ``mf``.

        Delegates to ``__fetch`` with ``fetch_all=False``.
        """
        return self.__fetch(fetch_all=False, sql=sql, mf=mf, cursor_factory=cursor_factory)
项目:musicbot    作者:AdrienPensart    | 项目源码 | 文件源码
def titles(self, mf=MusicFilter(), cursor_factory=NamedTupleCursor, fast=False):
        """Return the distinct titles.

        With ``fast`` set, the result of ``__fetchfast`` over the ``musics``
        table is returned as-is. Otherwise the ``titles`` array of a single
        aggregated row is returned: aggregated over ``do_filter`` for ``mf``,
        or over the whole ``musics`` table when ``mf`` is ``None``.
        """
        if fast:
            return self.__fetchfast("""select distinct title as name from musics""")

        if mf is not None:
            query = """select coalesce(array_agg(distinct title), array[]::text[]) as titles from do_filter(%s::filter)"""
        else:
            # NOTE(review): no %s placeholder here, but __fetch still passes
            # one argument -- confirm the driver accepts that.
            query = """select coalesce(array_agg(distinct title), array[]::text[]) as titles from musics"""
        row = self.__fetchone(query, mf, cursor_factory)
        return row.titles
项目:musicbot    作者:AdrienPensart    | 项目源码 | 文件源码
def albums(self, mf=MusicFilter(), cursor_factory=NamedTupleCursor, fast=False):
        """Return the distinct album names.

        With ``fast`` set, the result of ``__fetchfast`` over the ``albums``
        table is returned as-is. Otherwise the ``albums`` array of a single
        aggregated row is returned: aggregated over ``do_filter`` for ``mf``,
        or over the whole ``albums`` table when ``mf`` is ``None``.
        """
        if fast:
            return self.__fetchfast("""select distinct name from albums""")

        if mf is not None:
            query = """select coalesce(array_agg(distinct album), array[]::text[]) as albums from do_filter(%s::filter)"""
        else:
            # NOTE(review): no %s placeholder here, but __fetch still passes
            # one argument -- confirm the driver accepts that.
            query = """select coalesce(array_agg(distinct name), array[]::text[]) as albums from albums"""
        row = self.__fetchone(query, mf, cursor_factory)
        return row.albums
项目:musicbot    作者:AdrienPensart    | 项目源码 | 文件源码
def genres(self, mf=MusicFilter(), cursor_factory=NamedTupleCursor, fast=False):
        """Return the distinct genre names.

        With ``fast`` set, the result of ``__fetchfast`` over the ``genres``
        table is returned as-is. Otherwise the ``genres`` array of a single
        aggregated row is returned: aggregated over ``do_filter`` for ``mf``,
        or over the whole ``genres`` table when ``mf`` is ``None``.
        """
        if fast:
            return self.__fetchfast("""select distinct name from genres""")

        if mf is not None:
            query = """select coalesce(array_agg(distinct genre), array[]::text[]) as genres from do_filter(%s::filter)"""
        else:
            # NOTE(review): no %s placeholder here, but __fetch still passes
            # one argument -- confirm the driver accepts that.
            query = """select coalesce(array_agg(distinct name), array[]::text[]) as genres from genres"""
        row = self.__fetchone(query, mf, cursor_factory)
        return row.genres
项目:musicbot    作者:AdrienPensart    | 项目源码 | 文件源码
def keywords(self, mf=MusicFilter(), cursor_factory=NamedTupleCursor, fast=False):
        """Return the distinct keywords.

        With ``fast`` set, the result of ``__fetchfast`` over the ``tags``
        table is returned as-is. Otherwise the ``keywords`` array of a single
        aggregated row is returned: unnested and aggregated over ``do_filter``
        for ``mf``, or taken from the whole ``tags`` table when ``mf`` is
        ``None``.
        """
        if fast:
            return self.__fetchfast("""select distinct name from tags""")

        if mf is not None:
            query = """select coalesce(array_agg(distinct keywords), array[]::text[]) as keywords from (select unnest(array_cat_agg(keywords)) as keywords from do_filter(%s::filter)) k"""
        else:
            # NOTE(review): no %s placeholder here, but __fetch still passes
            # one argument -- confirm the driver accepts that.
            query = """select coalesce(array_agg(distinct name), array[]::text[]) as keywords from tags"""
        row = self.__fetchone(query, mf, cursor_factory)
        return row.keywords
项目:musicbot    作者:AdrienPensart    | 项目源码 | 文件源码
def artists(self, mf=MusicFilter(), cursor_factory=NamedTupleCursor, fast=False):
        """Return the distinct artist names.

        With ``fast`` set, the result of ``__fetchfast`` over the ``artists``
        table is returned as-is. Otherwise the ``artists`` array of a single
        aggregated row is returned: aggregated over ``do_filter`` for ``mf``,
        or over the whole ``artists`` table when ``mf`` is ``None``.
        """
        if fast:
            return self.__fetchfast("""select distinct name from artists""")

        if mf is not None:
            query = """select coalesce(array_agg(distinct artist), array[]::text[]) as artists from do_filter(%s::filter)"""
        else:
            # NOTE(review): no %s placeholder here, but __fetch still passes
            # one argument -- confirm the driver accepts that.
            query = """select coalesce(array_agg(distinct name), array[]::text[]) as artists from artists"""
        row = self.__fetchone(query, mf, cursor_factory)
        return row.artists
项目:musicbot    作者:AdrienPensart    | 项目源码 | 文件源码
def stats(self, mf=MusicFilter(), cursor_factory=NamedTupleCursor):
        """Return the first row produced by ``do_stats`` for filter ``mf``."""
        return self.__fetchone(
            sql='select * from do_stats(%s::filter)',
            mf=mf,
            cursor_factory=cursor_factory,
        )
项目:Scalable-PaQL-Queries    作者:mattfeel    | 项目源码 | 文件源码
def __init__(self, dbname, username, password, host, port, cursor_type=None, verbose=False):
        """Store the connection settings, pick a cursor factory and connect.

        Parameters
        ----------
        dbname, username, password, host : stored unchanged on the instance.
        port : converted with ``int()``; ``None`` or the empty string selects
            the default port 5432.
        cursor_type : selects the cursor factory:
            ``None`` or ``"namedtuple"`` -> ``NamedTupleCursor``,
            ``"standard"`` -> ``None`` (no custom factory),
            ``"dictlike"`` -> ``DictCursor``,
            ``"dict"`` -> ``RealDictCursor``.
        verbose : stored on the instance as ``self.verbose``.

        Raises
        ------
        ValueError
            If ``cursor_type`` is not one of the values listed above.
            (``ValueError`` is an ``Exception`` subclass, so callers that
            catch ``Exception`` keep working.)
        """
        self.sshtunnel = None
        self._psql_conn = None
        self.verbose = verbose

        self.dbname = dbname
        self.username = username
        self.password = password
        self.host = host
        # A missing port (None) is treated like an empty one instead of
        # crashing inside int().
        if port is None or port == "":
            self.port = 5432
        else:
            self.port = int(port)

        cursor_factories = {
            None: NamedTupleCursor,
            "standard": None,
            "namedtuple": NamedTupleCursor,
            "dictlike": DictCursor,
            "dict": RealDictCursor,
        }
        try:
            self.cursor_factory = cursor_factories[cursor_type]
        except (KeyError, TypeError):
            # TypeError covers unhashable values, which are equally unknown.
            raise ValueError("'cursor_type' unknown: '{}'".format(cursor_type))

        self.connect(self.cursor_factory)