我们从 Python 开源项目中,提取了以下 21 个代码示例,用于说明如何使用 psycopg2.extras.RealDictCursor()。
def select_dict(conn, query: str, params=None, name=None, itersize=5000):
    """Yield the rows of a select statement, one at a time.

    This is a generator: the query is only executed once iteration starts,
    and the cursor is released when the generator finishes or is closed.

    Parameters
    ----------
    conn : database connection
    query : select query string
    params : query parameters, passed straight to ``cursor.execute``.
    name : server side cursor name. ``None`` (the default) requests a
        client side cursor.
    itersize : value assigned to ``cursor.itersize`` before executing.

    Yields
    ------
    Each row produced by iterating the ``RealDictCursor``.
    """
    with conn.cursor(name, cursor_factory=RealDictCursor) as dict_cursor:
        dict_cursor.itersize = itersize
        dict_cursor.execute(query, params)
        # An explicit loop (rather than ``yield from``) keeps generator
        # close()/throw() from being forwarded to the cursor object.
        for row in dict_cursor:
            yield row
def get_task_status(self, task_id):
    # type: (str)->Union[str,Dict[str,Any]]
    """Fetch the status record stored for ``task_id``.

    Runs a single SELECT against ``self.table_task_status`` and returns the
    first matching row.

    :param task_id: value bound to the ``task_id=%s`` placeholder.
    :return: the row returned by ``fetchone()`` (``None`` when nothing
        matches), or the formatted traceback string when the query raises.
    """
    # NOTE(review): there is no comma between "description" and "updated_at",
    # so SQL treats "updated_at" as an alias for the description column.
    # Confirm whether two separate columns were intended.
    sql_select = "SELECT task_status, description updated_at FROM {} WHERE task_id=%s".format(self.table_task_status)
    cur = self.db_connection.cursor(cursor_factory=RealDictCursor)  # type: cursor
    try:
        cur.execute(sql_select, (task_id, ))
        return cur.fetchone()
    except Exception:
        # Callers receive the traceback text instead of an exception.
        # KeyboardInterrupt/SystemExit are deliberately no longer swallowed.
        return traceback.format_exc()
    finally:
        # Release the cursor on the failure path too (it used to leak there).
        cur.close()
def get_music(artist, album, title):
    '''Get a track tags or download it

    The three arguments are URL-unquoted and used to build a ``MusicFilter``.
    Exactly one track must match, otherwise a 404 tuple is returned. The
    ``format`` query argument (default ``html``) selects the representation;
    any other value than ``html`` or ``json`` yields a 400 tuple.
    '''
    page_format = request.args.get('format', 'html')
    track_filter = MusicFilter(
        artists=[unquote(artist)],
        albums=[unquote(album)],
        titles=[unquote(title)],
    )
    collection = app.config['COLLECTION']
    fetch_tracks = partial(collection.filter, cursor_factory=RealDictCursor)
    matches = webfilter(fetch_tracks, track_filter)

    # The lookup is validated before the format, so a bad track wins over a
    # bad format.
    if len(matches) != 1:
        return ('Music not found', 404)
    music = matches[0]

    if page_format == 'json':
        return dumps(music, sort_keys=True, indent=4, separators=(',', ': '))
    if page_format == 'html':
        return render_template("music.html", music=music)
    return ('Invalid format, available: json,html', 400)
def remove_redundant_delays(conn, delays):
    """Drop delays that would not change what ``realtime_updates`` holds.

    The rows whose ``id`` is one of the keys of ``delays`` are loaded and
    re-keyed with ``get_delay_id``. A delay is kept when its key is not yet
    stored, or when it comes from a newer file (greater ``s3_path``) and its
    ``arrival_delay`` or ``departure_delay`` differs from the stored row.

    :param conn: database connection used to read ``realtime_updates``.
    :param delays: mapping of delay id to delay record.
    :return: ``delays`` itself when it is empty, otherwise a new dict with
        only the delays worth storing.
    """
    if not delays:
        return delays  # no new delays to add

    query = """
        SELECT * FROM realtime_updates
        WHERE id IN %(ids)s
    """
    cur = conn.cursor(cursor_factory=RealDictCursor)
    try:
        cur.execute(query, {'ids': tuple(delays)})
        results = cur.fetchall()
    finally:
        # Close the cursor even when the query fails.
        cur.close()

    existing_delays = {get_delay_id(r): r for r in results}
    new_delays = {}
    # .items() works on both Python 2 and 3; .iteritems() is Python 2 only.
    for key, delay in delays.items():
        if key not in existing_delays:
            # stop not registered in the database yet
            new_delays[key] = delay
            continue
        current = existing_delays[key]
        # only files newer than the stored one may override it
        if current['s3_path'] >= delay['s3_path']:
            continue
        # ... and only when the timings actually changed
        if (current['arrival_delay'] != delay['arrival_delay']
                or current['departure_delay'] != delay['departure_delay']):
            new_delays[key] = delay
    return new_delays
def get_user(user_name: str):
    """Returns a user entry from PostgreSQL table.

    Parameters
    ----------
    user_name : str
        Value matched against the ``user_name`` column. It is sent as a bound
        query parameter, never interpolated into the SQL text.

    Returns
    -------
    dict_user : dict
        A dictionary with user information (the result of ``fetchone()``,
        so ``None`` when no row matches).
    """
    # Only the table name (a module constant) is formatted into the SQL.
    # The user-supplied value goes through the %s placeholder so the driver
    # escapes it.
    query = "SELECT * FROM {} WHERE user_name = %s;".format(PSQL_TABLE)
    conn = psycopg2.connect(cursor_factory=RealDictCursor, **PSQL_CONN)
    try:
        # The connection's ``with`` block only ends the transaction, so the
        # connection is closed explicitly in ``finally``, after that block.
        with conn:
            dict_cur = conn.cursor()
            dict_cur.execute(query, (user_name,))
            dict_user = dict_cur.fetchone()
    finally:
        conn.close()
    return dict_user
def execute(conn, query, query_vars=()):
    """Run ``query`` with ``query_vars`` and return every fetched row.

    A ``RealDictCursor`` is opened for the call and released when the
    ``with`` block exits.
    """
    with conn.cursor(cursor_factory=RealDictCursor) as dict_cursor:
        dict_cursor.execute(query, query_vars)
        rows = dict_cursor.fetchall()
    return rows
def execute(conn, query):
    """Run ``query`` (no parameters) and return every fetched row.

    A ``RealDictCursor`` is opened for the call and released when the
    ``with`` block exits.
    """
    with conn.cursor(cursor_factory=RealDictCursor) as dict_cursor:
        dict_cursor.execute(query)
        rows = dict_cursor.fetchall()
    return rows
def connect_and_execute(query, database='postgres'):
    """Open a connection to ``database``, run ``query`` and return all rows.

    The connection is switched to ``ISOLATION_LEVEL_AUTOCOMMIT`` before the
    query runs and is always closed afterwards, whether or not the query
    succeeds.
    """
    connection = connect(database=database)
    try:
        connection.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
        dict_cursor = connection.cursor(cursor_factory=RealDictCursor)
        dict_cursor.execute(query)
        rows = dict_cursor.fetchall()
    finally:
        connection.close()
    return rows
def get_processed_documents(self, task_id):
    # type: (str)->Dict[str,Any]
    """Fetch every processed-document row stored for ``task_id``.

    Runs a single SELECT against ``self.table_knp_result``.

    :param task_id: value bound to the ``task_id=%s`` placeholder.
    :return: the list returned by ``fetchall()``, or the formatted traceback
        string when the query raises.
    """
    # NOTE(review): the type comment above advertises a single Dict, but the
    # code returns the ``fetchall()`` result or a traceback string. Confirm
    # and update the annotation.
    sql_select = "SELECT text_id,sentence_index,task_id,knp_result,status FROM {} WHERE task_id=%s".format(self.table_knp_result)
    cur = self.db_connection.cursor(cursor_factory=RealDictCursor)  # type: cursor
    try:
        cur.execute(sql_select, (task_id, ))
        return cur.fetchall()
    except Exception:
        # Callers receive the traceback text instead of an exception.
        # KeyboardInterrupt/SystemExit are deliberately no longer swallowed.
        return traceback.format_exc()
    finally:
        # Release the cursor on the failure path too (it used to leak there).
        cur.close()
def get_stats():
    '''Music library statistics

    Computes the collection statistics, then renders them as HTML (default)
    or JSON depending on the ``format`` query argument. Any other format
    yields a 400 tuple.
    '''
    def bytesToHuman(b):
        # Template helper: byte count -> human-readable size.
        return humanfriendly.format_size(b)

    def secondsToHuman(s):
        # Template helper: seconds -> str(timedelta).
        import datetime
        return str(datetime.timedelta(seconds=s))

    page_format = request.args.get('format', 'html')
    collection = app.config['COLLECTION']
    # Stats are computed before the format is validated.
    stats = webfilter(partial(collection.stats, cursor_factory=RealDictCursor))

    if page_format == 'json':
        return dumps(stats)
    if page_format == 'html':
        return render_template(
            "stats.html",
            stats=stats,
            bytesToHuman=bytesToHuman,
            secondsToHuman=secondsToHuman,
        )
    return ('Invalid format, available: json,html', 400)
def test_stats(self):
    """Unfiltered collection stats must equal the ``teststats`` fixture."""
    self.assertEqual(
        self.collection.stats(cursor_factory=RealDictCursor),
        teststats,
    )
def test_filtered_stats(self):
    """Stats filtered on the 'rock' keyword must equal ``filtered_teststats``."""
    rock_filter = lib.MusicFilter()
    rock_filter.keywords = ['rock']
    self.assertEqual(
        self.collection.stats(rock_filter, cursor_factory=RealDictCursor),
        filtered_teststats,
    )
def get_next(self):
    '''Gets the next pending device.

    Selects the row of ``pending`` with the lowest ``pending_id`` whose
    ``working`` flag is FALSE, flags it as being worked on, and returns it.

    Returns:
        Dict: The next pending device as a dictionary object with the
        names of the rows as keys, or None when nothing is pending.
    '''
    # NOTE(review): ``proc`` is not referenced anywhere in this method.
    proc = 'main_db.get_next'

    # Use a special cursor which returns results as dicts. Both statements
    # run inside the same ``with self.conn`` block.
    with self.conn, self.conn.cursor(cursor_factory=RealDictCursor) as cur:
        cur.execute('''
            SELECT * FROM pending
            WHERE working= FALSE
            ORDER BY pending_id ASC
            LIMIT 1
            ''')
        pending_row = cur.fetchone()
        if not pending_row:
            return None

        # Mark the new entry as being worked on
        cur.execute('''
            UPDATE pending
            SET working= TRUE
            WHERE pending_id= %s
            ''', (pending_row['pending_id'],))

        # Return the next device as a plain dict
        return dict(pending_row)
def get_dir_db_connection(*, dict_cursor: bool = False):
    """Connect to the Directory DB and return ``(connection, cursor)``.

    :param dict_cursor: when true the cursor is created with
        ``cursor_factory=RealDictCursor``; otherwise a default cursor is used.
    :raises psycopg2.OperationalError: re-raised after being logged when the
        connection cannot be established.
    """
    try:
        connection = psycopg2.connect(
            dbname=DIR_DB_NAME,
            user=DIR_DB_USER,
            password=DIR_DB_PASSWORD,
            host=DIR_DB_HOST,
            port=DIR_DB_PORT,
        )
    except psycopg2.OperationalError as e:
        logging.error('Unable to connect to Directory DB!\n%s', e)
        raise

    # Only reached on success: the except branch always re-raises.
    logging.debug('Connected to Directory DB: %s!', DIR_DB_NAME)
    if not dict_cursor:
        return connection, connection.cursor()
    return connection, connection.cursor(cursor_factory=RealDictCursor)
def get_sso_db_connection(*, dict_cursor: bool = False):
    """Connect to the SSO DB and return ``(connection, cursor)``.

    :param dict_cursor: when true the cursor is created with
        ``cursor_factory=RealDictCursor``; otherwise a default cursor is used.
    :raises psycopg2.OperationalError: re-raised after being logged when the
        connection cannot be established.
    """
    try:
        connection = psycopg2.connect(
            dbname=SSO_DB_NAME,
            user=SSO_DB_USER,
            password=SSO_DB_PASSWORD,
            host=SSO_DB_HOST,
            port=SSO_DB_PORT)
    except psycopg2.OperationalError as e:
        logging.error('Unable to connect to SSO DB!\n%s', e)
        raise
    else:
        # The success message used to name the Directory DB and
        # DIR_DB_NAME; report the database that was actually opened.
        logging.debug('Connected to SSO DB: %s!', SSO_DB_NAME)
    if dict_cursor:
        cursor = connection.cursor(cursor_factory=RealDictCursor)
    else:
        cursor = connection.cursor()
    return connection, cursor
def get_sso_db_connection(*, dict_cursor: bool = False):
    """Connect to the SSO DB and return ``(connection, cursor)``.

    :param dict_cursor: when true the cursor is created with
        ``cursor_factory=RealDictCursor``; otherwise a default cursor is used.
    :raises psycopg2.OperationalError: re-raised after being logged when the
        connection cannot be established.
    """
    try:
        connection = psycopg2.connect(
            dbname=SSO_DB_NAME,
            user=SSO_DB_USER,
            password=SSO_DB_PASSWORD,
            host=SSO_DB_HOST,
            port=SSO_DB_PORT,
        )
    except psycopg2.OperationalError as e:
        logging.error('Unable to connect to SSO DB!\n%s', e)
        raise

    if not dict_cursor:
        return connection, connection.cursor()
    return connection, connection.cursor(cursor_factory=RealDictCursor)
def query_cursor(self, q, lazy_fetch=False, commit=True):
    """Execute a query and yield a cursor.

    All execution performed by the Postgres object uses this method.

    When ``self.debug`` is set nothing is executed: a stub object whose
    ``fetchmany``/``fetchall`` return empty lists is yielded instead.

    Args:
        q (str): SQL query
        lazy_fetch (bool): whether to use a server-side cursor
            (lazily fetches results).
        commit (bool): whether to call ``self.commit()`` once the caller
            is done with the cursor.
    """
    # Counted first: the counter also makes server-side cursor names unique.
    self.cursors_opened += 1
    if self.verbose:
        logging.debug(q)

    if self.debug:
        stub_cursor = Bunch()
        stub_cursor.fetchmany = lambda size: []
        stub_cursor.fetchall = lambda: []
        yield stub_cursor
        return

    # A named cursor is only requested for lazy fetching.
    if lazy_fetch:
        cursor_name = 'server_side_{}'.format(self.cursors_opened)
    else:
        cursor_name = None

    with self.connection.cursor(cursor_name, cursor_factory=RealDictCursor) as cursor:
        cursor.execute(q)
        yield cursor

    if commit:
        self.commit()
def zupc_show_temp():
    """Render the temporary ZUPC rows that are their own parent.

    Reads ``id``, ``nom`` and ``insee`` from ``zupc_temp`` through the raw
    DB-API connection behind the SQLAlchemy session, using a
    ``RealDictCursor``, and passes the rows to ``zupc_show_temp.html``.
    """
    raw_connection = current_app.extensions['sqlalchemy'].db.session.connection().\
        connection
    cur = raw_connection.cursor(cursor_factory=RealDictCursor)
    try:
        cur.execute("""SELECT id, nom, insee FROM zupc_temp
                    WHERE multiple=true AND parent_id = id;""")
        list_zupc = cur.fetchall()
    finally:
        # The cursor used to be left open; release it once rows are fetched.
        cur.close()
    return render_template("zupc_show_temp.html",
                           list_zupc=list_zupc,
                           apikey=current_user.apikey,
                           mapbox_token=current_app.config['MAPBOX_TOKEN'])
def execute_to_json(conn, query, params=None):
    """Run ``query`` and return its rows as a list of plain dicts.

    Each fetched row is passed through
    ``StringConverter().snake_case_to_camel_case`` and the result is copied
    into a new ``dict``.
    """
    with conn.cursor(cursor_factory=RealDictCursor) as cursor:
        cursor.execute(query, params)
        fetched = cursor.fetchall()

    def _to_plain_dict(raw_row):
        # A fresh converter per row, as in the original loop.
        converted = StringConverter().snake_case_to_camel_case(raw_row)
        return dict(zip(converted.keys(), converted.values()))

    return [_to_plain_dict(raw_row) for raw_row in fetched]
def read_pg(sql, conn=None, **kwargs):
    '''
    Read a SQL query and return it as a Table

    :param sql: query text handed to ``cursor.execute``.
    :param conn: database connection. The default ``None`` is not usable
        (``conn.cursor`` is called unconditionally), so pass a connection.
    :param kwargs: accepted but not used by this function.
    :return: a ``Table`` named 'SQL Query' (postgres dialect) holding the rows.
    '''
    cur = conn.cursor(cursor_factory=extras.RealDictCursor)
    try:
        cur.execute(sql)
        # Materialize the rows so the cursor can be released right away.
        rows = list(cur)
    finally:
        # The cursor used to be left open; always close it.
        cur.close()
    # Error occurs if a function is used in SQL query
    # and column name is not explicitly provided
    new_table = Table(name='SQL Query', dialect='postgres')
    new_table.add_dicts(rows)
    return new_table