The following 50 code examples, extracted from open source Python projects, illustrate how to use psycopg2.extensions.AsIs().
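AsIs wraps a value so that psycopg2 splices its str() representation into the SQL statement verbatim, with no quoting or escaping; it is the usual way to pass identifiers such as table or schema names through the parameter machinery. A minimal sketch of the idea, with a hypothetical table name and connection string:

import psycopg2
from psycopg2.extensions import AsIs

# hypothetical connection string, for illustration only
conn = psycopg2.connect("dbname=test")
cur = conn.cursor()

# AsIs('users') is inserted into the SQL verbatim (identifiers cannot be
# passed as ordinary bind parameters); %(min_age)s is escaped normally.
cur.execute(
    "SELECT * FROM %(table)s WHERE age > %(min_age)s",
    {'table': AsIs('users'), 'min_age': 21},
)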
def test_adapt_most_specific(self):
    from psycopg2.extensions import adapt, register_adapter, AsIs

    class A(object):
        pass

    class B(A):
        pass

    class C(B):
        pass

    register_adapter(A, lambda a: AsIs("a"))
    register_adapter(B, lambda b: AsIs("b"))
    try:
        self.assertEqual(b'b', adapt(C()).getquoted())
    finally:
        del psycopg2.extensions.adapters[A, psycopg2.extensions.ISQLQuote]
        del psycopg2.extensions.adapters[B, psycopg2.extensions.ISQLQuote]
def getfeatures(self, sourcebinding):
    """Yield features as dicts. Throws exceptions.

    May want to switch to a server-side cursor.
    """
    cur = self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
    SQL = "select * from %s "
    if sourcebinding.filter or sourcebinding.config.filter:
        print("Need to apply filter!")
        # filterclause = 'WHERE '
        # process clauses in an injection-safe way and append to SQL
    try:
        cur.execute(SQL, (AsIs(sourcebinding.source), ))
    except Exception as e:
        # import pdb; pdb.set_trace()
        raise e
    for r in cur:
        yield r
        # break
        # refactor if necessary to force connection close on failure
def total_unread(user_id=None):
    if user_id is None:
        user_id = current_user_id()
    with db.conn() as conn:
        cur = conn.execute('''
            SELECT COUNT(*), SUM("unread_count")
            FROM %(schema_name)s.user_conversation
            WHERE "unread_count" > 0 AND "user" = %(user_id)s
            ''', {
                'schema_name': AsIs(_get_schema_name()),
                'user_id': user_id
            }
        )
        r = cur.first()
        conversation_count = r[0]
        message_count = r[1]
        return {
            'conversation': conversation_count,
            'message': message_count
        }
def __validate_conversation(participants):
    first_row = None
    with db.conn() as conn:
        result = conn.execute("""
            SELECT c._id
            FROM %(schema_name)s.conversation AS c
            WHERE c.distinct_by_participants = TRUE
            AND (
                SELECT COUNT(DISTINCT uc.user)
                FROM %(schema_name)s.user_conversation AS uc
                WHERE uc.conversation = c._id AND uc.user IN %(user_ids)s
            ) = %(count)s
            """, {
                'schema_name': AsIs(_get_schema_name()),
                'user_ids': tuple(participants),
                'count': len(participants)
            })
        first_row = result.first()
    valid = first_row is None
    if not valid:
        raise ConversationAlreadyExistsException(first_row[0])
def getReceiptList(self):
    """
    Returns a list of message receipt statuses.
    """
    receipts = list()
    with db.conn() as conn:
        cur = conn.execute('''
            SELECT receipt.user, read_at, delivered_at
            FROM %(schema_name)s.receipt
            WHERE "message" = %(message_id)s
            AND (read_at IS NOT NULL OR delivered_at IS NOT NULL)
            ''', {
                'schema_name': AsIs(_get_schema_name()),
                'message_id': self.id.key
            }
        )
        for row in cur:
            receipts.append({
                'user': row['user'],
                'read_at': to_rfc3339_or_none(row['read_at']),
                'delivered_at': to_rfc3339_or_none(row['delivered_at'])
            })
    return receipts
def patch(self, id=None):
    try:
        content = validate_update_columns(request.get_json(), UPDATEABLE_COLUMNS)
        logger.info("Request Body: {content}".format(content=content))
        conn = db_conn()
        for key, value in content.items():
            arguments = (AsIs(key), value, id)
            execute(conn, UPDATE_CATEGORY, arguments)
        conn.close()
        return status_ok.modified()
    except KeyError as error:
        logger.info(error)
        return invalid_fields(error.fields)
    except Exception as error:
        logger.info(error)
        return unexpected_error()
def addapt_numpy_float64(numpy_float64):
    return AsIs(numpy_float64)
def addapt_numpy_int64(numpy_int64):
    return AsIs(numpy_int64)
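The two addapt_numpy_* functions above only take effect once they are registered for the corresponding numpy scalar types; a typical registration, assuming numpy is installed, looks like:

import numpy
from psycopg2.extensions import register_adapter

# let psycopg2 pass numpy scalars through as plain SQL literals
register_adapter(numpy.float64, addapt_numpy_float64)
register_adapter(numpy.int64, addapt_numpy_int64)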
def test_no_mro_no_joy(self):
    from psycopg2.extensions import adapt, register_adapter, AsIs

    class A:
        pass

    class B(A):
        pass

    register_adapter(A, lambda a: AsIs("a"))
    try:
        self.assertRaises(psycopg2.ProgrammingError, adapt, B())
    finally:
        del psycopg2.extensions.adapters[A, psycopg2.extensions.ISQLQuote]
def test_adapt_subtype_3(self):
    from psycopg2.extensions import adapt, register_adapter, AsIs

    class A:
        pass

    class B(A):
        pass

    register_adapter(A, lambda a: AsIs("a"))
    try:
        self.assertEqual(b"a", adapt(B()).getquoted())
    finally:
        del psycopg2.extensions.adapters[A, psycopg2.extensions.ISQLQuote]
def adapt_path(path):
    return AsIs('%s::ltree' % adapt(path.value))
def get_min_max(data_table, boundary_table, stat_field, num_classes, min_val, map_type, pg_cur, settings):
    # query to get min and max values (filter small populations that overly influence the map visualisation)
    try:
        # if map_type == "values":
        sql = "SELECT MIN(%s) AS min, MAX(%s) AS max FROM %s AS tab " \
              "INNER JOIN %s AS bdy ON tab.{0} = bdy.id " \
              "WHERE %s > 0 " \
              "AND bdy.population > {1}" \
            .format(settings['region_id_field'], float(min_val))

        sql_string = pg_cur.mogrify(sql, (AsIs(stat_field), AsIs(stat_field), AsIs(data_table),
                                          AsIs(boundary_table), AsIs(stat_field)))
        pg_cur.execute(sql_string)
        row = pg_cur.fetchone()
    except Exception as ex:
        print("{0} - {1} Failed: {2}".format(data_table, stat_field, ex))
        return list()

    output_dict = {
        "min": row["min"],
        "max": row["max"]
    }

    return output_dict
def compile_create(self):
    """Create a materialized view."""
    return 'CREATE MATERIALIZED VIEW {}'.format(AsIs(self.name))
def refresh(self):
    """Refresh a materialized view."""
    return 'REFRESH MATERIALIZED VIEW {}'.format(AsIs(self.name))
def drop(self):
    """Drop a materialized view."""
    return 'DROP MATERIALIZED VIEW {}'.format(AsIs(self.name))
def test_adapt_most_specific(self): from psycopg2.extensions import adapt, register_adapter, AsIs class A(object): pass class B(A): pass class C(B): pass register_adapter(A, lambda a: AsIs("a")) register_adapter(B, lambda b: AsIs("b")) try: self.assertEqual(b('b'), adapt(C()).getquoted()) finally: del psycopg2.extensions.adapters[A, psycopg2.extensions.ISQLQuote] del psycopg2.extensions.adapters[B, psycopg2.extensions.ISQLQuote]
def test_adapt_subtype_3(self): from psycopg2.extensions import adapt, register_adapter, AsIs class A: pass class B(A): pass register_adapter(A, lambda a: AsIs("a")) try: self.assertEqual(b("a"), adapt(B()).getquoted()) finally: del psycopg2.extensions.adapters[A, psycopg2.extensions.ISQLQuote]
def adapt_array(arr):
    conn = arr.field.model_class._meta.database.get_conn()
    items = adapt(arr.items)
    items.prepare(conn)
    return AsIs('%s::%s%s' % (
        items,
        arr.field.get_column_type(),
        '[]' * arr.field.dimensions))
def adapt_decimal(decimal_value):
    # render the Decimal as an unquoted numeric literal
    return AsIs(float(decimal_value))
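As with the numpy adapters above, this adapter is inert until it is registered; a minimal sketch of wiring it up (registering it overrides psycopg2's built-in Decimal handling):

import decimal
from psycopg2.extensions import register_adapter

# route decimal.Decimal values through the custom adapter
register_adapter(decimal.Decimal, adapt_decimal)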