Python peewee 模块,Database() 实例源码

我们从Python开源项目中,提取了以下20个代码示例,用于说明如何使用peewee.Database()

项目:wsgicli    作者:kobinpy    | 项目源码 | 文件源码
def _peewee_model():
    from peewee import BaseModel, Database
    return [BaseModel, Database]
项目:peewee-moves    作者:timster    | 项目源码 | 文件源码
def load_database(self, database):
        """
        Load the given database, whatever it might be.

        A connection string: ``sqlite:///database.sqlite``

        A dictionary: ``{'engine': 'SqliteDatabase', 'name': 'database.sqlite'}``

        A peewee.Database instance: ``peewee.SqliteDatabase('database.sqlite')``

        :param database: Connection string, dict, or peewee.Database instance to use.
        :raises: peewee.DatabaseError if database connection cannot be established.
        :return: Database connection.
        :rtype: peewee.Database instance.
        """
        # It could be an actual instance...
        if isinstance(database, (peewee.Proxy, peewee.Database)):
            return database

        # It could be a dictionary...
        if isinstance(database, dict):
            try:
                name = database.pop('name')
                engine = database.pop('engine')
            except KeyError:
                error_msg = 'Configuration dict must specify "name" and "engine" keys.'
                raise peewee.DatabaseError(error_msg)

            db_class = pydoc.locate(engine)
            if not db_class:
                raise peewee.DatabaseError('Unable to import engine class: {}'.format(engine))
            return db_class(name, **database)

        # Or it could be a database URL.
        return url_connect(database)
项目:mmpdb    作者:rdkit    | 项目源码 | 文件源码
def _load_from_config_dict(self, config_dict):
        try:
            name = config_dict.pop('name')
            engine = config_dict.pop('engine')
        except KeyError:
            raise RuntimeError('DATABASE configuration must specify a '
                               '`name` and `engine`.')

        if '.' in engine:
            path, class_name = engine.rsplit('.', 1)
        else:
            path, class_name = 'peewee', engine

        try:
            __import__(path)
            module = sys.modules[path]
            database_class = getattr(module, class_name)
            assert issubclass(database_class, _Database)
        except ImportError:
            raise RuntimeError('Unable to import %s' % engine)
        except AttributeError:
            raise RuntimeError('Database engine not found %s' % engine)
        except AssertionError:
            raise RuntimeError('Database engine not a subclass of '
                               'peewee.Database: %s' % engine)

        return database_class(name, **config_dict)
项目:Quiver-alfred    作者:danielecook    | 项目源码 | 文件源码
def __init__(self, app=None, database=None):
        self.database = None  # Reference to actual Peewee database instance.
        self._app = app
        self._db = database  # dict, url, Database, or None (default).
        if app is not None:
            self.init_app(app)
项目:Quiver-alfred    作者:danielecook    | 项目源码 | 文件源码
def _load_database(self, app, config_value):
        if isinstance(config_value, Database):
            database = config_value
        elif isinstance(config_value, dict):
            database = self._load_from_config_dict(dict(config_value))
        else:
            # Assume a database connection URL.
            database = db_url_connect(config_value)

        if isinstance(self.database, Proxy):
            self.database.initialize(database)
        else:
            self.database = database
项目:Quiver-alfred    作者:danielecook    | 项目源码 | 文件源码
def _load_from_config_dict(self, config_dict):
        try:
            name = config_dict.pop('name')
            engine = config_dict.pop('engine')
        except KeyError:
            raise RuntimeError('DATABASE configuration must specify a '
                               '`name` and `engine`.')

        if '.' in engine:
            path, class_name = engine.rsplit('.', 1)
        else:
            path, class_name = 'peewee', engine

        try:
            __import__(path)
            module = sys.modules[path]
            database_class = getattr(module, class_name)
            assert issubclass(database_class, Database)
        except ImportError:
            raise RuntimeError('Unable to import %s' % engine)
        except AttributeError:
            raise RuntimeError('Database engine not found %s' % engine)
        except AssertionError:
            raise RuntimeError('Database engine not a subclass of '
                               'peewee.Database: %s' % engine)

        return database_class(name, **config_dict)
项目:Quiver-alfred    作者:danielecook    | 项目源码 | 文件源码
def get_model_class(self):
        if self.database is None:
            raise RuntimeError('Database must be initialized.')

        class BaseModel(Model):
            class Meta:
                database = self.database

        return BaseModel
项目:Quiver-alfred    作者:danielecook    | 项目源码 | 文件源码
def __init__(self, db_or_model, file_or_name, fields=None,
                 field_names=None, has_header=True, sample_size=10,
                 converter=None, db_table=None, pk_in_csv=False,
                 **reader_kwargs):
        self.file_or_name = file_or_name
        self.fields = fields
        self.field_names = field_names
        self.has_header = has_header
        self.sample_size = sample_size
        self.converter = converter
        self.reader_kwargs = reader_kwargs

        if isinstance(file_or_name, basestring):
            self.filename = file_or_name
        elif isinstance(file_or_name, StringIO):
            self.filename = 'data.csv'
        else:
            self.filename = file_or_name.name

        if isinstance(db_or_model, Database):
            self.database = db_or_model
            self.model = None
            self.db_table = (
                db_table or
                os.path.splitext(os.path.basename(self.filename))[0])
        else:
            self.model = db_or_model
            self.database = self.model._meta.database
            self.db_table = self.model._meta.db_table
            self.fields = self.model._meta.sorted_fields
            self.field_names = self.model._meta.sorted_field_names
            # If using an auto-incrementing primary key, ignore it unless we
            # are told the primary key is included in the CSV.
            if self.model._meta.auto_increment and not pk_in_csv:
                self.fields = self.fields[1:]
                self.field_names = self.field_names[1:]
项目:metrics    作者:Jeremy-Friedman    | 项目源码 | 文件源码
def __init__(self, app=None, database=None):
        self.database = None  # Reference to actual Peewee database instance.
        self._app = app
        self._db = database  # dict, url, Database, or None (default).
        if app is not None:
            self.init_app(app)
项目:metrics    作者:Jeremy-Friedman    | 项目源码 | 文件源码
def _load_database(self, app, config_value):
        if isinstance(config_value, Database):
            database = config_value
        elif isinstance(config_value, dict):
            database = self._load_from_config_dict(dict(config_value))
        else:
            # Assume a database connection URL.
            database = db_url_connect(config_value)

        if isinstance(self.database, Proxy):
            self.database.initialize(database)
        else:
            self.database = database
项目:metrics    作者:Jeremy-Friedman    | 项目源码 | 文件源码
def _load_from_config_dict(self, config_dict):
        try:
            name = config_dict.pop('name')
            engine = config_dict.pop('engine')
        except KeyError:
            raise RuntimeError('DATABASE configuration must specify a '
                               '`name` and `engine`.')

        if '.' in engine:
            path, class_name = engine.rsplit('.', 1)
        else:
            path, class_name = 'peewee', engine

        try:
            __import__(path)
            module = sys.modules[path]
            database_class = getattr(module, class_name)
            assert issubclass(database_class, Database)
        except ImportError:
            raise RuntimeError('Unable to import %s' % engine)
        except AttributeError:
            raise RuntimeError('Database engine not found %s' % engine)
        except AssertionError:
            raise RuntimeError('Database engine not a subclass of '
                               'peewee.Database: %s' % engine)

        return database_class(name, **config_dict)
项目:metrics    作者:Jeremy-Friedman    | 项目源码 | 文件源码
def get_model_class(self):
        if self.database is None:
            raise RuntimeError('Database must be initialized.')

        class BaseModel(Model):
            class Meta:
                database = self.database

        return BaseModel
项目:metrics    作者:Jeremy-Friedman    | 项目源码 | 文件源码
def __init__(self, db_or_model, file_or_name, fields=None,
                 field_names=None, has_header=True, sample_size=10,
                 converter=None, db_table=None, pk_in_csv=False,
                 **reader_kwargs):
        self.file_or_name = file_or_name
        self.fields = fields
        self.field_names = field_names
        self.has_header = has_header
        self.sample_size = sample_size
        self.converter = converter
        self.reader_kwargs = reader_kwargs

        if isinstance(file_or_name, basestring):
            self.filename = file_or_name
        elif isinstance(file_or_name, StringIO):
            self.filename = 'data.csv'
        else:
            self.filename = file_or_name.name

        if isinstance(db_or_model, Database):
            self.database = db_or_model
            self.model = None
            self.db_table = (
                db_table or
                os.path.splitext(os.path.basename(self.filename))[0])
        else:
            self.model = db_or_model
            self.database = self.model._meta.database
            self.db_table = self.model._meta.db_table
            self.fields = self.model._meta.sorted_fields
            self.field_names = self.model._meta.sorted_field_names
            # If using an auto-incrementing primary key, ignore it unless we
            # are told the primary key is included in the CSV.
            if self.model._meta.auto_increment and not pk_in_csv:
                self.fields = self.fields[1:]
                self.field_names = self.field_names[1:]
项目:metrics    作者:Jeremy-Friedman    | 项目源码 | 文件源码
def _load_database(self, app, config_value):
        if isinstance(config_value, Database):
            database = config_value
        elif isinstance(config_value, dict):
            database = self._load_from_config_dict(dict(config_value))
        else:
            # Assume a database connection URL.
            database = db_url_connect(config_value)

        if isinstance(self.database, Proxy):
            self.database.initialize(database)
        else:
            self.database = database
项目:metrics    作者:Jeremy-Friedman    | 项目源码 | 文件源码
def _load_from_config_dict(self, config_dict):
        try:
            name = config_dict.pop('name')
            engine = config_dict.pop('engine')
        except KeyError:
            raise RuntimeError('DATABASE configuration must specify a '
                               '`name` and `engine`.')

        if '.' in engine:
            path, class_name = engine.rsplit('.', 1)
        else:
            path, class_name = 'peewee', engine

        try:
            __import__(path)
            module = sys.modules[path]
            database_class = getattr(module, class_name)
            assert issubclass(database_class, Database)
        except ImportError:
            raise RuntimeError('Unable to import %s' % engine)
        except AttributeError:
            raise RuntimeError('Database engine not found %s' % engine)
        except AssertionError:
            raise RuntimeError('Database engine not a subclass of '
                               'peewee.Database: %s' % engine)

        return database_class(name, **config_dict)
项目:metrics    作者:Jeremy-Friedman    | 项目源码 | 文件源码
def get_model_class(self):
        if self.database is None:
            raise RuntimeError('Database must be initialized.')

        class BaseModel(Model):
            class Meta:
                database = self.database

        return BaseModel
项目:metrics    作者:Jeremy-Friedman    | 项目源码 | 文件源码
def __init__(self, db_or_model, file_or_name, fields=None,
                 field_names=None, has_header=True, sample_size=10,
                 converter=None, db_table=None, pk_in_csv=False,
                 **reader_kwargs):
        self.file_or_name = file_or_name
        self.fields = fields
        self.field_names = field_names
        self.has_header = has_header
        self.sample_size = sample_size
        self.converter = converter
        self.reader_kwargs = reader_kwargs

        if isinstance(file_or_name, basestring):
            self.filename = file_or_name
        elif isinstance(file_or_name, StringIO):
            self.filename = 'data.csv'
        else:
            self.filename = file_or_name.name

        if isinstance(db_or_model, Database):
            self.database = db_or_model
            self.model = None
            self.db_table = (
                db_table or
                os.path.splitext(os.path.basename(self.filename))[0])
        else:
            self.model = db_or_model
            self.database = self.model._meta.database
            self.db_table = self.model._meta.db_table
            self.fields = self.model._meta.sorted_fields
            self.field_names = self.model._meta.sorted_field_names
            # If using an auto-incrementing primary key, ignore it unless we
            # are told the primary key is included in the CSV.
            if self.model._meta.auto_increment and not pk_in_csv:
                self.fields = self.fields[1:]
                self.field_names = self.field_names[1:]
项目:aiopeewee    作者:kszucs    | 项目源码 | 文件源码
def get_conn(self):
        if self.closed:
            raise OperationalError('Database pool has not been initialized')

        return AioConnection(self.pool.acquire(),
                             autocommit=self.autocommit,
                             autorollback=self.autorollback,
                             exception_wrapper=self.exception_wrapper)
项目:aiopeewee    作者:kszucs    | 项目源码 | 文件源码
def connect(self, safe=True):
        if self.deferred:
            raise OperationalError('Database has not been initialized')
        if not self.closed:
            if safe:
                return
            raise OperationalError('Connection already open')

        with self.exception_wrapper:
            self.pool = await self._connect(self.database,
                                            **self.connect_kwargs)
            self.closed = False
项目:aiopeewee    作者:kszucs    | 项目源码 | 文件源码
def drop_table(self, model_class, fail_silently=False, cascade=False):
        qc = self.compiler()
        if cascade and not self.drop_cascade:
            raise ValueError('Database does not support DROP TABLE..CASCADE.')
        async with self.get_conn() as conn:
            args = qc.drop_table(model_class, fail_silently, cascade)
            return await conn.execute_sql(*args)