我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用peewee.Model()。
def __call__(self, form, field): if isinstance(field.data, Model): return if isinstance(field.data, string_types) and UUID_REGEXP.search(field.data): try: obj = self.model.get(uuid=field.data) except self.model.DoesNotExist: raise ValidationError(self.message) field.data = obj return try: obj = self.model.get(id=field.data) except self.model.DoesNotExist: raise ValidationError(self.message) field.data = obj return
def build_fake_model(self, name): """ Build a fake model with some defaults and the given table name. We need this so we can perform operations that actually require a model class. :param name: Name of database table. :return: A new model class. :rtype: peewee.Model """ class FakeModel(peewee.Model): class Meta: database = peewee.Proxy() primary_key = False indexes = [] constraints = [] db_table = name return FakeModel
def _get_meta_db_class(db): """creating a declartive class model for db """ class _BlockedMeta(BaseModel): def __new__(cls, name, bases, attrs): _instance = super(_BlockedMeta, cls).__new__(cls, name, bases, attrs) _instance.objects = AsyncManager(_instance, db) return _instance class _Base(Model, metaclass=_BlockedMeta): def to_dict(self): return self._data class Meta: database = db return _Base
def models(self): """Return self.application models.""" Model_ = self.app.config['PEEWEE_MODELS_CLASS'] ignore = self.app.config['PEEWEE_MODELS_IGNORE'] models = [] if Model_ is not Model: try: mod = import_module(self.app.config['PEEWEE_MODELS_MODULE']) for model in dir(mod): models = getattr(mod, model) if not isinstance(model, pw.Model): continue models.append(models) except ImportError: return models elif isinstance(Model_, BaseSignalModel): models = BaseSignalModel.models return [m for m in models if m._meta.name not in ignore]
def test_add_fk_column(self): class Person(pw.Model): class Meta: database = self.db class Car(pw.Model): class Meta: database = self.db self.evolve_and_check_noop() peeweedbevolve.unregister(Car) class Car(pw.Model): owner = pw.ForeignKeyField(rel_model=Person, null=False) class Meta: database = self.db self.evolve_and_check_noop() person = Person.create() car = Car.create(owner=person)
def test_drop_fk_column(self): class Person(pw.Model): class Meta: database = self.db class Car(pw.Model): owner = pw.ForeignKeyField(rel_model=Person, null=False) class Meta: database = self.db self.evolve_and_check_noop() person = Person.create() car = Car.create(owner=person) peeweedbevolve.unregister(Car) class Car(pw.Model): class Meta: database = self.db self.evolve_and_check_noop()
def test_change_fk_column_to_int(self): class Person(pw.Model): class Meta: database = self.db class Car(pw.Model): owner = pw.ForeignKeyField(rel_model=Person, null=False) class Meta: database = self.db self.evolve_and_check_noop() person = Person.create() car = Car.create(owner=person) peeweedbevolve.unregister(Car) class Car(pw.Model): owner_id = pw.IntegerField(null=False) class Meta: database = self.db self.evolve_and_check_noop() self.assertEqual(Car.select().first().owner_id, person.id) Car.create(owner_id=-1) # this should not fail
def test_add_not_null_constraint_with_records_and_default_which_is_function(self): class SomeModel(pw.Model): some_field = pw.CharField(null=True) class Meta: database = self.db self.evolve_and_check_noop() SomeModel.create(some_field=None) peeweedbevolve.clear() def woot(): return 'woot' class SomeModel(pw.Model): some_field = pw.CharField(null=False, default=woot) class Meta: database = self.db self.evolve_and_check_noop() self.assertEqual(SomeModel.select().first().some_field, 'woot')
def test_reorder_multi_index(self): class SomeModel(pw.Model): some_field = pw.CharField(null=True) class Meta: database = self.db indexes = ( (('id', 'some_field'), False), ) self.evolve_and_check_noop() self.assertEqual(sorted(peeweedbevolve.normalize_indexes(peeweedbevolve.get_indexes_by_table(self.db,'somemodel'))), [(u'somemodel', (u'id',), True), (u'somemodel', (u'id',u'some_field'), False)]) peeweedbevolve.clear() class SomeModel(pw.Model): some_field = pw.CharField(null=True) class Meta: database = self.db indexes = ( (('some_field', 'id'), False), ) self.evolve_and_check_noop() self.assertEqual(sorted(peeweedbevolve.normalize_indexes(peeweedbevolve.get_indexes_by_table(self.db,'somemodel'))), [(u'somemodel', (u'id',), True), (u'somemodel', (u'some_field',u'id'), False)])
def test_change_integer_to_fake_fk_column(self): class Person(pw.Model): class Meta: database = self.db class Car(pw.Model): owner_id = pw.IntegerField(null=False) class Meta: database = self.db self.evolve_and_check_noop() car = Car.create(owner_id=-1) peeweedbevolve.unregister(Car) class Car(pw.Model): owner = pw.ForeignKeyField(rel_model=Person, null=False, fake=True) class Meta: database = self.db self.evolve_and_check_noop() person = Person.create() car = Car.create(owner=-2) self.assertEqual(Car.select().count(), 2)
def test_drop_column_default(self): class SomeModel(pw.Model): some_field = pw.CharField(null=True, default='woot2') class Meta: database = self.db self.evolve_and_check_noop() model = SomeModel.create() self.assertEqual(model.some_field, 'woot2') peeweedbevolve.clear() class SomeModel(pw.Model): some_field = pw.CharField(null=True) class Meta: database = self.db self.evolve_and_check_noop() model = SomeModel.create() self.assertEqual(model.some_field, None) self.assertEqual(SomeModel.get(SomeModel.id==model.id).some_field, None)
def test_dont_drop_table(self): class SomeModel(pw.Model): some_field = pw.CharField(null=True) class Meta: database = self.db self.evolve_and_check_noop() SomeModel.create(some_field='woot') peeweedbevolve.clear() class SomeModel(pw.Model): some_field = pw.CharField(null=True) class Meta: database = self.db evolve = False self.evolve_and_check_noop() # doesn't fail because table is still there SomeModel.create(some_field='woot2')
def test_dont_add_column(self): class SomeModel(pw.Model): some_field = pw.CharField(null=True) class Meta: database = self.db self.evolve_and_check_noop() peeweedbevolve.clear() class SomeModel(pw.Model): some_field = pw.CharField(null=True) some_other_field = pw.CharField(null=False) class Meta: database = self.db evolve = False self.evolve_and_check_noop() # should not fail because the not-null column wasn't added SomeModel.create(some_field='woot')
def test_change_decimal_precision(self): class SomeModel(pw.Model): some_field = pw.DecimalField(max_digits=4, decimal_places=0) class Meta: database = self.db self.evolve_and_check_noop() model = SomeModel.create(some_field=1234) self.assertEqual(model.some_field, 1234) peeweedbevolve.clear() class SomeModel(pw.Model): some_field = pw.DecimalField(max_digits=8, decimal_places=2) class Meta: database = self.db self.evolve_and_check_noop() self.assertEqual(SomeModel.select().first().some_field, 1234) model.some_field = 123456.78 model.save() self.assertEqual(SomeModel.select().first().some_field, decimal.Decimal('123456.78'))
def coerce_single_instance(lookup_field, value): """ Convert from whatever value is given to a scalar value for lookup_field. If value is a dict, then lookup_field.name is used to get the value from the dict. Example: lookup_field.name = 'id' value = {'id': 123, 'name': 'tim'} returns = 123 If value is a model, then lookup_field.name is extracted from the model. Example: lookup_field.name = 'id' value = <User id=123 name='tim'> returns = 123 Otherwise the value is returned as-is. :param lookup_field: Peewee model field used for getting name from value. :param value: Some kind of value (usually a dict, Model instance, or scalar). """ if isinstance(value, dict): return value.get(lookup_field.name) if isinstance(value, peewee.Model): return getattr(value, lookup_field.name) return value
def test_json_field(database, value, kwargs, dict_type): """Ensure that the JSONField can load and dump dicts.""" class JSONModel(peewee.Model): data = JSONField(**kwargs) JSONModel._meta.database = database.database JSONModel.create_table(True) instance = JSONModel(data=value) instance.save() # Should be the same dict upon save. assert instance.data is value # Should be the same dict when queried. queried_instance = JSONModel.select().first() assert isinstance(queried_instance.data, dict_type) assert queried_instance.data == value
def __init__(self, formdata=None, obj=None, data=None, *args, **kwargs): super(PeeweeForm, self).__init__(formdata=formdata, obj=obj, data=data, *args, **kwargs) if obj is not None and not isinstance(obj, Model): raise PeeweeFormException(u'obj must None or peewee model instance.') self.obj = obj self._validate = False
def validate(model, field): """Decorator to add validation methods to validation_fields dictionary. Populate validation_fields dictionary. The key/value pair is made up of the model (table) name and tuples field name and the method to run. For example, {'table_1': [('field_1', 'method_1'), ('field_2', 'method_2'), ...], 'table_A': [('field_a', 'method_a'), ('field_b', 'method_b'), ...], } Parameters ---------- field : str model : str """ def _validate(validation_method): try: validate.validation_fields[model].append( (field, validation_method.__name__)) except AttributeError: validate.validation_fields = defaultdict(list) validate.validation_fields[model].append( (field, validation_method.__name__)) @wraps(validation_method) def wrapped(self): return validation_method(self) return wrapped return _validate ##################### # Extend peewee Model #####################
def addSong(self, fid, filename, pid=None,artist=None, name=None): if pid == '': pid = 'None' # query = 'INSERT INTO `tracks` (`id`,`filename`,`playlistID`) VALUES ("{}","{}","{}");'.format(fid, filename, pid) # self.CURSOR.execute("""INSERT INTO tracks (id,filename,playlistID,artist,name) VALUES(%s,%s,%s,%s,%s)""", (fid,filename,pid,artist,name)) self.db_conn.commit() result = self.CURSOR.fetchone() print("Adding new track to tracks table...") #print(query) print(result) #self.CURSOR.close() return 1 # db = MySQLDatabase('jonhydb', user='john',passwd='megajonhy') # class Book(peewee.Model): # author = peewee.CharField() # title = peewee.TextField() # class Meta: # database = db # Book.create_table() # book = Book(author="me", title='Peewee is cool') # book.save() # for book in Book.filter(author="me"): # print book.title
def test_initialize(): tc = TableCreator('awesome') assert issubclass(tc.model, peewee.Model)
def foreign_key(self, coltype, name, references, **kwargs): """ Add a foreign key to the model. This has some special cases, which is why it's not handled like all the other column types. :param name: Name of the foreign key. :param references: Table name in the format of "table.column" or just "table" (and id will be default column). :param kwargs: Additional kwargs to pass to the column instance. You can also provide "on_delete" and "on_update" to add constraints. :return: None """ try: rel_table, rel_column = references.split('.', 1) except ValueError: rel_table, rel_column = references, 'id' # Create a dummy model that we can relate this field to. # Add the foreign key as a local field on the dummy model. # We only do this so that Peewee can generate the nice foreign key constraint for us. class DummyRelated(peewee.Model): class Meta: primary_key = False database = peewee.Proxy() db_table = rel_table rel_field_class = FIELD_TO_PEEWEE.get(coltype, peewee.IntegerField) rel_field = rel_field_class() rel_field.add_to_class(DummyRelated, rel_column) field = peewee.ForeignKeyField(DummyRelated, db_column=name, to_field=rel_column, **kwargs) field.add_to_class(self.model, name)
def startup_library_service(): wamp = autobahn_sync.AutobahnSync() wamp.run() db = pw.SqliteDatabase('books.db') class Book(pw.Model): title = pw.CharField() author = pw.CharField() class Meta: database = db try: db.create_table(Book) except pw.OperationalError: pass @wamp.register('com.library.get_book') def get_book(id): try: b = Book.get(id=id) except pw.DoesNotExist: return {'_error': "Doesn't exist"} return {'id': id, 'title': b.title, 'author': b.author} @wamp.register('com.library.new_book') def new_book(title, author): book = Book(title=title, author=author) book.save() wamp.session.publish('com.library.book_created', book.id) return {'id': book.id}
def get_model_class(self): if self.database is None: raise RuntimeError('Database must be initialized.') class BaseModel(Model): class Meta: database = self.database return BaseModel
def Model(self): if self._app is None: database = getattr(self, 'database', None) if database is None: self.database = Proxy() if not hasattr(self, '_model_class'): self._model_class = self.get_model_class() return self._model_class
def _create_table(model, fail_silently=True): """Utility method for creating a database table and indexes that is a work around for unexpected behavior by the peewee ORM module. Specifically, peewee create_table doesn't create compound indexes as expected. Args: db: an active peewee database connection model: subclass of peewee.Model indexes: a tuple of indexes that would normally be specified in the peewee.Mode's class Meta. Example: indexes = ( (('chrom', 'start', 'end'), True), # True means unique index ) fail_silently: if True, no error will be raised if the table already exists """ # create table as a compressed TokuDB table db = model._meta.database indexes = model._meta.indexes raw_query = db.compiler().create_table(model, safe=fail_silently) raw_query = list(raw_query) raw_query[0] = raw_query[0] + " engine=TokuDB, compression='tokudb_zlib', charset=latin1" db.execute_sql(*raw_query) logging.debug(raw_query[0]) # create indexes safe_str = "IF NOT EXISTS" if fail_silently else "" model_name = model.__name__.lower() for i, (columns, unique) in enumerate(indexes): columns_str = ",".join(map(lambda c: "`"+c+"`", columns)) unique_str = "UNIQUE" if unique else "" q = "ALTER TABLE `%(model_name)s` ADD %(unique_str)s KEY %(safe_str)s `index%(i)s`(%(columns_str)s);" % locals() logging.debug(q) db.execute_sql(q)
def run(self, result=None): model_classes = [ m[1] for m in inspect.getmembers(sys.modules['models'], inspect.isclass) if issubclass(m[1], Model) and m[1] != Model ] with test_database(test_db, model_classes): super(TestCaseDB, self).run(result)
def test_pd_field(): db = SqliteDatabase(":memory:") class TestModel(Model): pf = PartialDateField(null=True) class Meta: database = db TestModel.create_table() TestModel(pf=PartialDate()).save() TestModel(pf=None).save() res = [r[0] for r in db.execute_sql("SELECT pf FROM testmodel").fetchall()] assert res[0] is None and res[1] is None TestModel(pf=PartialDate(1997)).save() TestModel(pf=PartialDate(1996, 4)).save() TestModel(pf=PartialDate(1995, 5, 13)).save() res = [r[0] for r in db.execute_sql("SELECT pf FROM testmodel").fetchall()] assert '1995-05-13' in res assert '1996-04-**' in res assert '1997-**-**' in res res = [r.pf for r in TestModel.select().order_by(TestModel.pf)] assert res[0] is None assert res[1] is None assert res[2] == PartialDate(1995, 5, 13) assert res[3] == PartialDate(1996, 4) assert res[4] == PartialDate(1997)
def _get_model_handler(self, model): if peewee and issubclass(model, peewee.Model): return PeeweeHandler(model) if django and issubclass(model, DjangoModel): return DjangoHandler(model)
def save(self, *args, **kwargs): # unsure how self.updated_at refers to class variable BaseModel.updated_at self.updated_at = datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S") peewee.Model.save(self)
def select(cls, *args, **kwargs): """Support read slaves.""" query = super(Model, cls).select(*args, **kwargs) query.database = cls._get_read_database() return query
def raw(cls, *args, **kwargs): query = super(Model, cls).raw(*args, **kwargs) if query._sql.lower().startswith('select'): query.database = cls._get_read_database() return query
def save(self, force_insert=False, **kwargs): """Send signals.""" created = force_insert or not bool(self.pk) self.pre_save.send(self, created=created) super(Model, self).save(force_insert=force_insert, **kwargs) self.post_save.send(self, created=created)
def delete_instance(self, *args, **kwargs): """Send signals.""" self.pre_delete.send(self) super(Model, self).delete_instance(*args, **kwargs) self.post_delete.send(self)
def init_app(self, app, database=None): """Initialize application.""" # Register application if not app: raise RuntimeError('Invalid application.') self.app = app if not hasattr(app, 'extensions'): app.extensions = {} app.extensions['peewee'] = self app.config.setdefault('PEEWEE_CONNECTION_PARAMS', {}) app.config.setdefault('PEEWEE_DATABASE_URI', 'sqlite:///peewee.sqlite') app.config.setdefault('PEEWEE_MANUAL', False) app.config.setdefault('PEEWEE_MIGRATE_DIR', 'migrations') app.config.setdefault('PEEWEE_MIGRATE_TABLE', 'migratehistory') app.config.setdefault('PEEWEE_MODELS_CLASS', Model) app.config.setdefault('PEEWEE_MODELS_IGNORE', []) app.config.setdefault('PEEWEE_MODELS_MODULE', '') app.config.setdefault('PEEWEE_READ_SLAVES', '') app.config.setdefault('PEEWEE_USE_READ_SLAVES', True) # Initialize database params = app.config['PEEWEE_CONNECTION_PARAMS'] database = database or app.config.get('PEEWEE_DATABASE_URI') if not database: raise RuntimeError('Invalid database.') database = get_database(database, **params) slaves = app.config['PEEWEE_READ_SLAVES'] if isinstance(slaves, string_types): slaves = slaves.split(',') self.slaves = [get_database(slave, **params) for slave in slaves if slave] self.database.initialize(database) if self.database.database == ':memory:': app.config['PEEWEE_MANUAL'] = True if not app.config['PEEWEE_MANUAL']: app.before_request(self.connect) app.teardown_request(self.close)
def test_create_table(self): class SomeModel(pw.Model): some_field = pw.CharField(null=True) class Meta: database = self.db self.evolve_and_check_noop() SomeModel.create(some_field='woot') self.assertEqual(SomeModel.select().first().some_field, 'woot')
def test_drop_table(self): class SomeModel(pw.Model): some_field = pw.CharField(null=True) class Meta: database = self.db self.evolve_and_check_noop() SomeModel.create(some_field='woot') peeweedbevolve.clear() self.evolve_and_check_noop() with self.assertRaises(pw.ProgrammingError): SomeModel.create(some_field='woot2') # fails because table isn't there
def test_create_table_with_fk(self): class SomeModel(pw.Model): some_field = pw.CharField(null=True) class Meta: database = self.db class SomeModel2(pw.Model): some_field2 = pw.CharField(null=True) some_model = pw.ForeignKeyField(rel_model=SomeModel) class Meta: database = self.db self.evolve_and_check_noop() sm = SomeModel.create(some_field='woot') sm2 = SomeModel2.create(some_field2='woot2', some_model=sm)
def test_rename_table_aka_string(self): self.test_create_table() peeweedbevolve.clear() class SomeOtherModel(pw.Model): some_field = pw.CharField(null=True) class Meta: database = self.db aka = 'somemodel' self.evolve_and_check_noop() self.assertEqual(SomeOtherModel.select().first().some_field, 'woot')
def test_rename_table_aka_list(self): self.test_create_table() peeweedbevolve.clear() class SomeOtherModel(pw.Model): some_field = pw.CharField(null=True) class Meta: database = self.db aka = ['somemodel'] self.evolve_and_check_noop() self.assertEqual(SomeOtherModel.select().first().some_field, 'woot')
def test_add_column(self): self.test_create_table() peeweedbevolve.clear() class SomeModel(pw.Model): some_field = pw.CharField(null=True) another_field = pw.CharField(null=True) class Meta: database = self.db self.evolve_and_check_noop() self.assertEqual(SomeModel.select().first().another_field, None)
def test_rename_column_aka_string(self): self.test_create_table() peeweedbevolve.clear() class SomeModel(pw.Model): some_other_field = pw.CharField(null=True, aka='some_field') class Meta: database = self.db self.evolve_and_check_noop() self.assertEqual(SomeModel.select().first().some_other_field, 'woot')
def test_drop_column(self): self.test_create_table() peeweedbevolve.clear() class SomeModel(pw.Model): class Meta: database = self.db self.evolve_and_check_noop() self.assertFalse(hasattr(SomeModel.select().first(), 'some_field'))
def test_rename_table_add_column(self): self.test_create_table() peeweedbevolve.clear() class SomeOtherModel(pw.Model): some_field = pw.CharField(null=True) another_field = pw.CharField(null=True) class Meta: database = self.db aka = 'somemodel' self.evolve_and_check_noop() o = SomeOtherModel.select().first() self.assertEqual(o.some_field, 'woot') self.assertEqual(o.another_field, None)