我们从Python开源项目中,提取了以下19个代码示例,用于说明如何使用peewee.SQL。
def channel_top_list(self, count, user, server): """ Gets top <count> channels for a user :param count: length of top list :param user: the user :param server: the server :return: a SelectQuery with Message.channel with the top channels """ messages = (user.messages .select(self.models.Message.channel) .join(self.models.Server, on=(self.models.Server.id == self.models.Channel.server)) .switch(self.models.Message) # peewee needs the == .where(self.models.Message.is_command == False, self.models.Server.id == server) .annotate(self.models.Channel) .order_by(SQL('count').desc()) .limit(count)) return messages
def user_top_list(self, count, server): """ Gets top <count> users on the server :param count: length of the top list :param server: the server :return: a SelectQuery with users on the top list """ users = (self.models.User .select() .join(self.models.Channel, on=(self.models.Channel.id == self.models.Message.channel)) .join(self.models.Server, on=(self.models.Server.id == self.models.Channel.server)) .switch(self.models.User) # peewee needs the == .where(self.models.Message.is_command == False, self.models.User.is_bot == False, self.models.Server.id == server) .annotate(self.models.Message) # annotate alias to 'count' .order_by(SQL('count').desc()) .limit(count)) return users
def column(self, coltype, name, **kwargs): """ Generic method to add a column of any type. :param coltype: Column type (from FIELD_TO_PEEWEE). :param name: Name of column. :param kwargs: Arguments for the given column type. """ constraints = kwargs.pop('constraints', []) new_constraints = [] for const in constraints: if isinstance(const, str): const = peewee.SQL(const) new_constraints.append(const) kwargs['constraints'] = new_constraints field_class = FIELD_TO_PEEWEE.get(coltype, peewee.CharField) field_class(**kwargs).add_to_class(self.model, name)
def _execute(db, to_run, interactive=True, commit=True): if interactive: print() try: with db.atomic() as txn: for sql, params in to_run: if interactive or DEBUG: print_sql(' %s; %s' % (sql, params or '')) if sql.strip().startswith('--'): continue db.execute_sql(sql, params) if interactive: print() print( (colorama.Style.BRIGHT + 'SUCCESS!' + colorama.Style.RESET_ALL) if commit else 'TEST PASSED - ROLLING BACK', colorama.Style.DIM + '-', 'https://github.com/keredson/peewee-db-evolve' + colorama.Style.RESET_ALL ) print() if not commit: txn.rollback() except Exception as e: print() print('------------------------------------------') print(colorama.Style.BRIGHT + colorama.Fore.RED + ' SQL EXCEPTION - ROLLING BACK ALL CHANGES' + colorama.Style.RESET_ALL) print('------------------------------------------') print() raise e
def sort(self, collection, *sorting, **Kwargs): """Sort resources.""" logger.debug('Sort collection: %r', sorting) sorting_ = [] for name, desc in sorting: field = self.meta.model._meta.fields.get(name) or SQL(name) if desc: field = field.desc() sorting_.append(field) if sorting_: collection = collection.order_by(*sorting_) return collection
def __ddl_column__(self, ctype): return SQL(self.enum_name)
def test_constraint(): tc = TableCreator('awesome') tc.column('char', 'fname') const = peewee.SQL('fname not null') tc.add_constraint(const) assert tc.model._meta.constraints == [const]
def add_constraint(self, value): """ Add a constraint to the model. :param name: String value of constraint. :return: None """ self.model._meta.constraints.append(peewee.SQL(value))
def execute_sql(self, sql, params=None): """ Run the given sql and return a cursor. :param sql: SQL string. :param params: Parameters for the given SQL (deafult None). :return: SQL cursor :rtype: cursor """ return self.database.execute_sql(sql, params=params, require_commit=False)
def _get_last_users(): return list( User.select(User.username, fn.MAX(LastUsers.id).alias("last_id")) .join(LastUsers) .where(User.username != "") .group_by(User.id, User.username) .order_by(SQL("last_id").desc()) .limit(10))
def drop_foreign_key(db, migrator, table_name, fk_name): drop_stmt = 'drop foreign key' if is_mysql(db) else 'DROP CONSTRAINT' op = pw.Clause(pw.SQL('ALTER TABLE'), pw.Entity(table_name), pw.SQL(drop_stmt), pw.Entity(fk_name)) return normalize_whatever_junk_peewee_migrations_gives_you(db, migrator, op)
def drop_default(db, migrator, table_name, column_name, field): op = pw.Clause(pw.SQL('ALTER TABLE'), pw.Entity(table_name), pw.SQL('ALTER COLUMN'), pw.Entity(column_name), pw.SQL('DROP DEFAULT')) return normalize_whatever_junk_peewee_migrations_gives_you(db, migrator, op)
def set_default(db, migrator, table_name, column_name, field): default = field.default if callable(default): default = default() param = pw.Param(field.db_value(default)) op = pw.Clause(pw.SQL('ALTER TABLE'), pw.Entity(table_name), pw.SQL('ALTER COLUMN'), pw.Entity(column_name), pw.SQL('SET DEFAULT'), param) return normalize_whatever_junk_peewee_migrations_gives_you(db, migrator, op)
def rename_column(db, migrator, ntn, ocn, ncn, field): qc = db.compiler() if is_mysql(db): junk = pw.Clause( pw.SQL('ALTER TABLE'), pw.Entity(ntn), pw.SQL('CHANGE'), pw.Entity(ocn), qc.field_definition(field) ) else: junk = migrator.rename_column(ntn, ocn, ncn, generate=True) return normalize_whatever_junk_peewee_migrations_gives_you(db, migrator, junk)
def change_column_type(db, migrator, table_name, column_name, field): qc = db.compiler() column_type = qc.get_column_type(field.get_db_field()) if is_postgres(db): op = pw.Clause(pw.SQL('ALTER TABLE'), pw.Entity(table_name), pw.SQL('ALTER'), field.as_entity(), pw.SQL('TYPE'), field.__ddl_column__(column_type)) elif is_mysql(db): op = pw.Clause(*[pw.SQL('ALTER TABLE'), pw.Entity(table_name), pw.SQL('MODIFY')] + field.__ddl__(column_type)) else: raise Exception('how do i change a column type for %s?' % db) return normalize_whatever_junk_peewee_migrations_gives_you(db, migrator, op)
def exists(self): clone = self.paginate(1, 1) clone._select = [SQL('1')] return bool(await clone.scalar())
def default_insert_clause(self, model_class): return SQL('DEFAULT VALUES')
def test_select_all(flushdb): await create_users_blogs(2, 2) all_cols = SQL('*') query = Blog.select(all_cols) blogs = [blog async for blog in query.order_by(Blog.pk)] assert [b.title for b in blogs] == ['b-0-0', 'b-0-1', 'b-1-0', 'b-1-1'] assert [(await b.user).username for b in blogs] == ['u0', 'u0', 'u1', 'u1']
def exists(self): clone = self.paginate(1, 1) clone._select = [SQL('1')] raise gen.Return(bool((yield clone.scalar())))