我们从Python开源项目中,提取了以下20个代码示例,用于说明如何使用peewee.Database()。
def _peewee_model(): from peewee import BaseModel, Database return [BaseModel, Database]
def load_database(self, database): """ Load the given database, whatever it might be. A connection string: ``sqlite:///database.sqlite`` A dictionary: ``{'engine': 'SqliteDatabase', 'name': 'database.sqlite'}`` A peewee.Database instance: ``peewee.SqliteDatabase('database.sqlite')`` :param database: Connection string, dict, or peewee.Database instance to use. :raises: peewee.DatabaseError if database connection cannot be established. :return: Database connection. :rtype: peewee.Database instance. """ # It could be an actual instance... if isinstance(database, (peewee.Proxy, peewee.Database)): return database # It could be a dictionary... if isinstance(database, dict): try: name = database.pop('name') engine = database.pop('engine') except KeyError: error_msg = 'Configuration dict must specify "name" and "engine" keys.' raise peewee.DatabaseError(error_msg) db_class = pydoc.locate(engine) if not db_class: raise peewee.DatabaseError('Unable to import engine class: {}'.format(engine)) return db_class(name, **database) # Or it could be a database URL. return url_connect(database)
def _load_from_config_dict(self, config_dict): try: name = config_dict.pop('name') engine = config_dict.pop('engine') except KeyError: raise RuntimeError('DATABASE configuration must specify a ' '`name` and `engine`.') if '.' in engine: path, class_name = engine.rsplit('.', 1) else: path, class_name = 'peewee', engine try: __import__(path) module = sys.modules[path] database_class = getattr(module, class_name) assert issubclass(database_class, _Database) except ImportError: raise RuntimeError('Unable to import %s' % engine) except AttributeError: raise RuntimeError('Database engine not found %s' % engine) except AssertionError: raise RuntimeError('Database engine not a subclass of ' 'peewee.Database: %s' % engine) return database_class(name, **config_dict)
def __init__(self, app=None, database=None): self.database = None # Reference to actual Peewee database instance. self._app = app self._db = database # dict, url, Database, or None (default). if app is not None: self.init_app(app)
def _load_database(self, app, config_value): if isinstance(config_value, Database): database = config_value elif isinstance(config_value, dict): database = self._load_from_config_dict(dict(config_value)) else: # Assume a database connection URL. database = db_url_connect(config_value) if isinstance(self.database, Proxy): self.database.initialize(database) else: self.database = database
def _load_from_config_dict(self, config_dict): try: name = config_dict.pop('name') engine = config_dict.pop('engine') except KeyError: raise RuntimeError('DATABASE configuration must specify a ' '`name` and `engine`.') if '.' in engine: path, class_name = engine.rsplit('.', 1) else: path, class_name = 'peewee', engine try: __import__(path) module = sys.modules[path] database_class = getattr(module, class_name) assert issubclass(database_class, Database) except ImportError: raise RuntimeError('Unable to import %s' % engine) except AttributeError: raise RuntimeError('Database engine not found %s' % engine) except AssertionError: raise RuntimeError('Database engine not a subclass of ' 'peewee.Database: %s' % engine) return database_class(name, **config_dict)
def get_model_class(self): if self.database is None: raise RuntimeError('Database must be initialized.') class BaseModel(Model): class Meta: database = self.database return BaseModel
def __init__(self, db_or_model, file_or_name, fields=None, field_names=None, has_header=True, sample_size=10, converter=None, db_table=None, pk_in_csv=False, **reader_kwargs): self.file_or_name = file_or_name self.fields = fields self.field_names = field_names self.has_header = has_header self.sample_size = sample_size self.converter = converter self.reader_kwargs = reader_kwargs if isinstance(file_or_name, basestring): self.filename = file_or_name elif isinstance(file_or_name, StringIO): self.filename = 'data.csv' else: self.filename = file_or_name.name if isinstance(db_or_model, Database): self.database = db_or_model self.model = None self.db_table = ( db_table or os.path.splitext(os.path.basename(self.filename))[0]) else: self.model = db_or_model self.database = self.model._meta.database self.db_table = self.model._meta.db_table self.fields = self.model._meta.sorted_fields self.field_names = self.model._meta.sorted_field_names # If using an auto-incrementing primary key, ignore it unless we # are told the primary key is included in the CSV. if self.model._meta.auto_increment and not pk_in_csv: self.fields = self.fields[1:] self.field_names = self.field_names[1:]
def get_conn(self): if self.closed: raise OperationalError('Database pool has not been initialized') return AioConnection(self.pool.acquire(), autocommit=self.autocommit, autorollback=self.autorollback, exception_wrapper=self.exception_wrapper)
def connect(self, safe=True): if self.deferred: raise OperationalError('Database has not been initialized') if not self.closed: if safe: return raise OperationalError('Connection already open') with self.exception_wrapper: self.pool = await self._connect(self.database, **self.connect_kwargs) self.closed = False
def drop_table(self, model_class, fail_silently=False, cascade=False): qc = self.compiler() if cascade and not self.drop_cascade: raise ValueError('Database does not support DROP TABLE..CASCADE.') async with self.get_conn() as conn: args = qc.drop_table(model_class, fail_silently, cascade) return await conn.execute_sql(*args)