我们从Python开源项目中,提取了以下16个代码示例,用于说明如何使用psycopg2.extras.NamedTupleCursor()。
def select(conn, query: str, params=None, name=None, itersize=5000): """Return a select statement's results as a namedtuple. Parameters ---------- conn : database connection query : select query string params : query parameters. name : server side cursor name. defaults to client side. itersize : number of records fetched by server. """ with conn.cursor(name, cursor_factory=NamedTupleCursor) as cursor: cursor.itersize = itersize cursor.execute(query, params) for result in cursor: yield result
def __fetch(self, fetch_all, sql=None, mf=MusicFilter(), cursor_factory=NamedTupleCursor): if sql is None: sql = '''select * from do_filter(%s::filter)''' if mf is None: mf = MusicFilter() if cursor_factory is None: cursor_factory = NamedTupleCursor arg = [mf.tuple()] with self.pg.cursor(cursor_factory=cursor_factory) as cursor: debug('Query: {}'.format(cursor.mogrify(sql, arg))) with benchmark("Fetching:"): cursor.execute(sql, arg) if fetch_all: s = cursor.fetchall() else: s = cursor.fetchone() # debug(s) return s
def get_records(conn, qualified_name): with conn: with conn.cursor(cursor_factory=NamedTupleCursor) as cursor: cursor.execute('select * from %s;' % qualified_name) records = cursor.fetchall() return records
def insert(conn, qualified_name: str, column_names, records): """Insert a collection of namedtuple records.""" query = create_insert_statement(qualified_name, column_names) with conn: with conn.cursor(cursor_factory=NamedTupleCursor) as cursor: for record in records: cursor.execute(query, record)
def assert_db_anonymized(db): cursor = db.cursor(cursor_factory=NamedTupleCursor) cursor.execute('SELECT * FROM customer;') customers = cursor.fetchall() assert_customer_anonymized(customers[0], 'name_1', 'fr', 'LAT', '111.111.111.111') assert_customer_anonymized(customers[1], 'name_2', 'tlh', 'KR', '111.111.111.111') cursor.execute('SELECT * FROM customer_address;') addresses = cursor.fetchall() assert_address_anonymized(addresses[0], 1, '15', 'France') assert_address_anonymized(addresses[1], 2, '6', 'Klingon Empire')
def __executefile(self, filepath): if self.dry_run: info("[DRY-RUN] Executing {}".format(filepath)) return schema_path = os.path.join(os.path.dirname(sys.argv[0]), filepath) with open(schema_path, "r") as s: with self.pg.cursor(cursor_factory=NamedTupleCursor) as cursor: cursor.execute(s.read())
def folders(self): sql = '''select name from folders''' with benchmark("DB Folders"): with self.pg.cursor(cursor_factory=NamedTupleCursor) as folder_cursor: folder_cursor.execute(sql, []) return [f.name for f in folder_cursor.fetchall()]
def filter(self, mf=MusicFilter(), cursor_factory=NamedTupleCursor): return self.__fetch(True, None, mf, cursor_factory)
def __fetchone(self, sql=None, mf=MusicFilter(), cursor_factory=NamedTupleCursor): return self.__fetch(False, sql, mf, cursor_factory)
def titles(self, mf=MusicFilter(), cursor_factory=NamedTupleCursor, fast=False): if fast: sql = """select distinct title as name from musics""" return self.__fetchfast(sql) else: if mf is None: sql = """select coalesce(array_agg(distinct title), array[]::text[]) as titles from musics""" else: sql = """select coalesce(array_agg(distinct title), array[]::text[]) as titles from do_filter(%s::filter)""" return self.__fetchone(sql, mf, cursor_factory).titles
def albums(self, mf=MusicFilter(), cursor_factory=NamedTupleCursor, fast=False): if fast: sql = """select distinct name from albums""" return self.__fetchfast(sql) else: if mf is None: sql = """select coalesce(array_agg(distinct name), array[]::text[]) as albums from albums""" else: sql = """select coalesce(array_agg(distinct album), array[]::text[]) as albums from do_filter(%s::filter)""" return self.__fetchone(sql, mf, cursor_factory).albums
def genres(self, mf=MusicFilter(), cursor_factory=NamedTupleCursor, fast=False): if fast: sql = """select distinct name from genres""" return self.__fetchfast(sql) else: if mf is None: sql = """select coalesce(array_agg(distinct name), array[]::text[]) as genres from genres""" else: sql = """select coalesce(array_agg(distinct genre), array[]::text[]) as genres from do_filter(%s::filter)""" return self.__fetchone(sql, mf, cursor_factory).genres
def keywords(self, mf=MusicFilter(), cursor_factory=NamedTupleCursor, fast=False): if fast: sql = """select distinct name from tags""" return self.__fetchfast(sql) else: if mf is None: sql = """select coalesce(array_agg(distinct name), array[]::text[]) as keywords from tags""" else: sql = """select coalesce(array_agg(distinct keywords), array[]::text[]) as keywords from (select unnest(array_cat_agg(keywords)) as keywords from do_filter(%s::filter)) k""" return self.__fetchone(sql, mf, cursor_factory).keywords
def artists(self, mf=MusicFilter(), cursor_factory=NamedTupleCursor, fast=False): if fast: sql = """select distinct name from artists""" return self.__fetchfast(sql) else: if mf is None: sql = """select coalesce(array_agg(distinct name), array[]::text[]) as artists from artists""" else: sql = """select coalesce(array_agg(distinct artist), array[]::text[]) as artists from do_filter(%s::filter)""" return self.__fetchone(sql, mf, cursor_factory).artists
def stats(self, mf=MusicFilter(), cursor_factory=NamedTupleCursor): sql = 'select * from do_stats(%s::filter)' return self.__fetchone(sql, mf, cursor_factory)
def __init__(self, dbname, username, password, host, port, cursor_type=None, verbose=False): self.sshtunnel = None self._psql_conn = None self.verbose = verbose # self.connection_string = ( # "dbname={} user={} " # "password={} " if password is not None else "{}" # "host={} " if host is not None else "{}" # "port={} " if host is not None else "{}").format( # dbname, # username, # password if password is not None else "", # host if host is not None else "", # port if port is not None else "") self.dbname = dbname self.username = username self.password = password self.host = host self.port = int(port) if port != "" else 5432 # self.connection_string = "dbname={} ".format(dbname) # self.connection_string += "user={} ".format(username) # self.connection_string += "password={} ".format(password) if password is not None else "" # self.connection_string += "host={} ".format(host) if host is not None else "" # self.connection_string += "port={}".format(port) if host is not None else "" # # print self.connection_string if cursor_type is None: self.cursor_factory = NamedTupleCursor elif cursor_type == "standard": self.cursor_factory = None elif cursor_type == "namedtuple": self.cursor_factory = NamedTupleCursor elif cursor_type == "dictlike": self.cursor_factory = DictCursor elif cursor_type == "dict": self.cursor_factory = RealDictCursor else: raise Exception("'cursor_type' unknonw: '{}'".format(cursor_type)) # self.conn = pgres.connect(connection_string, cursor_factory=NamedTupleCursor) # self._conn = pgres.connect(connection_string) self.connect(self.cursor_factory)