The following 50 code examples, extracted from open-source Python projects, illustrate how to use django.conf.settings.DATABASES.
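Before the project examples, here is a minimal orientation sketch (the alias name and the values shown are assumptions for illustration, not taken from any project below): settings.DATABASES is a plain dict that maps a connection alias to a dict of connection parameters.

from django.conf import settings

# Each alias (usually at least 'default') maps to a dict with keys such as
# ENGINE, NAME, USER, PASSWORD, HOST and PORT.
default_db = settings.DATABASES['default']
engine = default_db['ENGINE']       # e.g. 'django.db.backends.postgresql'
name = default_db['NAME']           # database name, or a file path for SQLite
host = default_db.get('HOST', '')   # empty string usually means localhost/socket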
def handle(self, *args, **options):
    self.stdout.write(self.style.SUCCESS('Starting Schema creation..'))
    dbname = settings.DATABASES['default']['NAME']
    user = settings.DATABASES['default']['USER']
    password = settings.DATABASES['default']['PASSWORD']
    host = settings.DATABASES['default']['HOST']
    con = connect(dbname=dbname, user=user, host=host, password=password)
    self.stdout.write(self.style.SUCCESS('Adding schema {schema} to database {dbname}'
                                         .format(schema=settings.SCHEMA, dbname=dbname)))
    con.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
    cur = con.cursor()
    cur.execute('CREATE SCHEMA {schema};'.format(schema=settings.SCHEMA))
    cur.close()
    con.close()
    self.stdout.write(self.style.SUCCESS('All Done!'))
def handle(self, *args, **options):
    self.stdout.write(self.style.SUCCESS('Starting Schema deletion..'))
    dbname = settings.DATABASES['default']['NAME']
    user = settings.DATABASES['default']['USER']
    password = settings.DATABASES['default']['PASSWORD']
    host = settings.DATABASES['default']['HOST']
    con = connect(dbname=dbname, user=user, host=host, password=password)
    self.stdout.write(self.style.SUCCESS('Removing schema {schema} from database {dbname}'
                                         .format(schema=settings.SCHEMA, dbname=dbname)))
    con.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
    cur = con.cursor()
    cur.execute('DROP SCHEMA {schema} CASCADE;'.format(schema=settings.SCHEMA))
    cur.close()
    con.close()
    self.stdout.write(self.style.SUCCESS('All Done.'))
def handle(self, *args, **options):
    self.stdout.write(self.style.SUCCESS('Starting DB creation..'))
    dbname = settings.DATABASES['default']['NAME']
    user = settings.DATABASES['default']['USER']
    password = settings.DATABASES['default']['PASSWORD']
    host = settings.DATABASES['default']['HOST']
    self.stdout.write(self.style.SUCCESS('Connecting to host..'))
    con = connect(dbname='postgres', user=user, host=host, password=password)
    con.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
    self.stdout.write(self.style.SUCCESS('Creating database'))
    cur = con.cursor()
    cur.execute('CREATE DATABASE ' + dbname)
    cur.close()
    con.close()
    self.stdout.write(self.style.SUCCESS('All done!'))
def get_config():
    """
    Gets engine type from Django settings
    """
    DB = settings.DATABASES['default']
    ENGINE = DB.get('ENGINE', '')
    config = {}
    if 'postgresql' in ENGINE or 'psycopg' in ENGINE:
        config['engine'] = ENGINE_POSTGRESQL
    elif 'mysql' in ENGINE:
        config['engine'] = ENGINE_MYSQL
    else:
        raise BadConfig('Django configured with unsupported database engine: '
                        '%s' % DB.get('ENGINE', ''))
    return config
def get_connection_params(self):
    settings_dict = self.settings_dict
    # None may be used to connect to the default 'postgres' db
    if settings_dict['NAME'] == '':
        raise ImproperlyConfigured(
            "settings.DATABASES is improperly configured. "
            "Please supply the NAME value.")
    conn_params = {
        'database': settings_dict['NAME'] or 'postgres',
    }
    conn_params.update(settings_dict['OPTIONS'])
    conn_params.pop('isolation_level', None)
    if settings_dict['USER']:
        conn_params['user'] = settings_dict['USER']
    if settings_dict['PASSWORD']:
        conn_params['password'] = force_str(settings_dict['PASSWORD'])
    if settings_dict['HOST']:
        conn_params['host'] = settings_dict['HOST']
    if settings_dict['PORT']:
        conn_params['port'] = settings_dict['PORT']
    return conn_params
def _nodb_connection(self):
    nodb_connection = super(DatabaseWrapper, self)._nodb_connection
    try:
        nodb_connection.ensure_connection()
    except (DatabaseError, WrappedDatabaseError):
        warnings.warn(
            "Normally Django will use a connection to the 'postgres' database "
            "to avoid running initialization queries against the production "
            "database when it's not needed (for example, when running tests). "
            "Django was unable to create a connection to the 'postgres' database "
            "and will use the default database instead.",
            RuntimeWarning
        )
        # Fall back to a connection pointed at the default database's name.
        settings_dict = self.settings_dict.copy()
        settings_dict['NAME'] = settings.DATABASES[DEFAULT_DB_ALIAS]['NAME']
        nodb_connection = self.__class__(
            settings_dict,
            alias=self.alias,
            allow_thread_sharing=False)
    return nodb_connection
def _switch_to_test_user(self, parameters):
    """
    Oracle doesn't have the concept of separate databases under the same
    user. Thus, we use a separate user (see _create_test_db). This method
    is used to switch to that user. We will need the main user again for
    clean-up when we end testing, so we keep its credentials in
    SAVED_USER/SAVED_PASSWORD entries in the settings dict.
    """
    real_settings = settings.DATABASES[self.connection.alias]
    real_settings['SAVED_USER'] = self.connection.settings_dict['SAVED_USER'] = \
        self.connection.settings_dict['USER']
    real_settings['SAVED_PASSWORD'] = self.connection.settings_dict['SAVED_PASSWORD'] = \
        self.connection.settings_dict['PASSWORD']
    real_test_settings = real_settings['TEST']
    test_settings = self.connection.settings_dict['TEST']
    real_test_settings['USER'] = real_settings['USER'] = test_settings['USER'] = \
        self.connection.settings_dict['USER'] = parameters['user']
    real_settings['PASSWORD'] = self.connection.settings_dict['PASSWORD'] = parameters['password']
def __init__(self, db_settings=None, db_backup_name=None):
    """
    Constructor

    Args:
        db_settings (dict): A dict of database settings
        db_backup_name (str): The name that will be given to the backup database
    """
    self.db_settings = db_settings or settings.DATABASES['default']
    self.db_name = self.db_settings['NAME']
    if self.db_name[0:5] != 'test_':
        raise Exception(
            "The test suite is attempting to use the database '{}'. "
            "The test database should have a name that begins with 'test_'. Exiting...".format(self.db_name)
        )
    self.db_backup_name = db_backup_name or getattr(settings, 'BACKUP_DB_NAME', self.DEFAULT_BACKUP_DB_NAME)
    self.db_cmd_args = [
        "-h", self.db_settings['HOST'],
        "-p", str(self.db_settings['PORT']),
        "-U", self.db_settings['USER'],
    ]
def pg_dump(file_location):
    env = os.environ.copy()
    env.update({
        # requires having password set to test
        "PGPASSWORD": settings.DATABASES['default']['PASSWORD']
    })
    pg_dump = [
        'pg_dump',
        '-h%s' % settings.DATABASES[settings.CLIPS_DATABASE_ALIAS]['HOST'],
        '-U%s' % settings.DATABASES['default']['USER'],
        settings.DATABASES['default']['NAME'],
    ]
    with Popen(pg_dump, env=env, stdout=PIPE, stderr=STDOUT,
               bufsize=1) as task, open(file_location, 'wb') as f:
        for line in task.stdout:
            f.write(line)
    return task.wait()
def dbs_by_environment(environment, write_only=True):
    """
    Retrieve all database aliases that contain the given environment.

    Args:
        environment (str): The environment the databases must contain.
        write_only (Optional[bool]): Exclude any read-only databases.

    Returns:
        Set of aliases.
    """
    possible = set()
    for alias in settings.DATABASES:
        if write_only and is_read_db(alias):
            continue
        if environment in settings.DATABASES[alias]['ENVIRONMENTS']:
            possible.add(alias)
    return possible
def get_connection_params(self):
    settings_dict = self.settings_dict
    # None may be used to connect to the default 'postgres' db
    if settings_dict['NAME'] == '':
        from django.core.exceptions import ImproperlyConfigured
        raise ImproperlyConfigured(
            "settings.DATABASES is improperly configured. "
            "Please supply the NAME value.")
    conn_params = {
        'database': settings_dict['NAME'] or 'postgres',
    }
    conn_params.update(settings_dict['OPTIONS'])
    conn_params.pop('isolation_level', None)
    if settings_dict['USER']:
        conn_params['user'] = settings_dict['USER']
    if settings_dict['PASSWORD']:
        conn_params['password'] = force_str(settings_dict['PASSWORD'])
    if settings_dict['HOST']:
        conn_params['host'] = settings_dict['HOST']
    if settings_dict['PORT']:
        conn_params['port'] = settings_dict['PORT']
    return conn_params
def destroy_test_db(self, old_database_name, verbosity=1, keepdb=False):
    """
    Destroy a test database, prompting the user for confirmation if the
    database already exists.
    """
    self.connection.close()
    test_database_name = self.connection.settings_dict['NAME']
    if verbosity >= 1:
        test_db_repr = ''
        action = 'Destroying'
        if verbosity >= 2:
            test_db_repr = " ('%s')" % test_database_name
        if keepdb:
            action = 'Preserving'
        print("%s test database for alias '%s'%s..." % (
            action, self.connection.alias, test_db_repr))

    # if we want to preserve the database
    # skip the actual destroying piece.
    if not keepdb:
        self._destroy_test_db(test_database_name, verbosity)

    # Restore the original database name
    settings.DATABASES[self.connection.alias]["NAME"] = old_database_name
    self.connection.settings_dict["NAME"] = old_database_name
def back_up_database(self, backup_storage, temp_backup_path):
    logger.info('Start backing up the database.')
    file_path = '{database}_{timestamp}.dump'.format(
        database=settings.DATABASES['default']['NAME'],
        timestamp=self.timestamp
    )
    temp_file_path = '{backup_path}/{file_path}'.format(backup_path=temp_backup_path, file_path=file_path)

    # Run the `pg_dump` command.
    os.system('pg_dump -h {host} -U {user} {database} > {file_path}'.format(
        host=settings.DATABASES['default']['HOST'],
        user=settings.DATABASES['default']['USER'],
        database=settings.DATABASES['default']['NAME'],
        file_path=temp_file_path
    ))

    # Store the dump file on the backup bucket.
    with open(temp_file_path, 'rb') as database_backup_file:
        target_file_path = '{timestamp}/{path}'.format(timestamp=self.timestamp, path=file_path)
        backup_storage.save(target_file_path, database_backup_file)

    logger.info('Database dump successfully copied to the target storage backend.')
def _nodb_connection(self):
    nodb_connection = super(DatabaseWrapper, self)._nodb_connection
    try:
        nodb_connection.ensure_connection()
    except (Database.DatabaseError, WrappedDatabaseError):
        warnings.warn(
            "Normally Django will use a connection to the 'postgres' database "
            "to avoid running initialization queries against the production "
            "database when it's not needed (for example, when running tests). "
            "Django was unable to create a connection to the 'postgres' database "
            "and will use the default database instead.",
            RuntimeWarning
        )
        # Fall back to a connection pointed at the default database's name.
        settings_dict = self.settings_dict.copy()
        settings_dict['NAME'] = settings.DATABASES[DEFAULT_DB_ALIAS]['NAME']
        nodb_connection = self.__class__(
            settings_dict,
            alias=self.alias,
            allow_thread_sharing=False)
    return nodb_connection
def ready(self):
    # settings.DATABASES maps aliases to settings dicts, so iterate items()
    for alias, db in settings.DATABASES.items():
        name = db['NAME']
        db_wrapper_class = import_string(db['ENGINE'] + '.base.DatabaseWrapper')
        base_model = getattr(db_wrapper_class, 'base_model', None)
        if base_model:
            models = apps.get_models()
            for model in models:
                if name == router.db_for_read(model):
                    for k, v in base_model.__dict__.items():
                        if k == 'objects':
                            model_manager = getattr(model, 'objects', None)
                            if model_manager:
                                manager_cls = model_manager.__class__
                                custom_cls = v.__class__
                                new_manager = type('AnyBackendCustomManager',
                                                   (custom_cls, manager_cls), {})
                                setattr(model, 'objects', new_manager())
                            else:
                                setattr(model, 'objects', v)
                        elif not k.startswith('__'):
                            setattr(model, k, v)
def handle(self, *args, **options):
    try:
        connection = connections[options['database']]
        cursor = connection.cursor()
        database_settings = settings.DATABASES[options['database']]
    except ConnectionDoesNotExist:
        raise CommandError('Database "%s" does not exist in settings' % options['database'])
    if connection.vendor == 'sqlite':
        print("Deleting database %s" % database_settings['NAME'])
        os.remove(database_settings['NAME'])
    elif connection.vendor == 'mysql':
        print("Dropping database %s" % database_settings['NAME'])
        cursor.execute("DROP DATABASE `%s`;" % database_settings['NAME'])
        print("Creating database %s" % database_settings['NAME'])
        cursor.execute("CREATE DATABASE `%s` CHARACTER SET utf8;" % database_settings['NAME'])
        # Should fix some "MySQL has gone away" issues
        cursor.execute("SET GLOBAL max_allowed_packet=32*1024*1024;")
    elif connection.vendor == 'postgresql':
        print("Dropping and recreating schema public")
        cursor.execute("DROP schema public CASCADE; CREATE schema public")
    else:
        raise CommandError('Database vendor not supported')
def line_strain_changed(sender, instance, action, reverse, model, pk_set, using, **kwargs):
    """
    Handles changes to the Line <-> Strain relationship caused by
    adding/removing/changing the strain associated with a single line in a
    study. Detects changes that indicate a need to push changes across to
    ICE for the (ICE part -> EDD study) link stored in ICE.
    """
    # only care about changes in the forward direction, Line -> Strain
    if reverse or check_ice_cannot_proceed():
        return

    # only execute these signals if using a non-testing database
    if using in settings.DATABASES:
        action_function = {
            'post_add': strain_added,
            'pre_remove': strain_removing,
            'post_remove': strain_removed,
        }.get(action, None)
        if action_function:
            action_function(instance, pk_set)


# ----- helper functions -----
def modify_row(self, decoder):
    modulo = importlib.import_module(self.db + ".models")
    clase_mane = decoder["tb"]
    for s in dir(modulo):
        if s.lower() == decoder["tb"]:
            clase_mane = s
    class_model = getattr(modulo, clase_mane)
    if "condition" in decoder:
        db_name = settings.DATABASES["default"]["NAME"]
        row_id = Model(db_name=db_name, table_name=self.db + "_" + decoder['tb'])
        row_id.load_first_by_query(**decoder["condition"])
        decoder["fields"]["id"] = row_id.id
    if 'id' in decoder["fields"]:
        try:
            row = class_model.objects.get(pk=decoder['fields']['id'])
        except:
            row = class_model()
    else:
        row = class_model()
    for k, v in decoder["fields"].items():
        setattr(row, k, v)
    return row
def __init__(self):
    self.database_key = 'data'
    self.database_config = settings.DATABASES['data']

    try:
        database_adapter_class = import_class(settings.ADAPTER_DATABASE)
        self.database = database_adapter_class(self.database_key, self.database_config)
    except AttributeError:
        if self.database_config['ENGINE'] == 'django.db.backends.mysql':
            self.database = MySQLAdapter(self.database_key, self.database_config)
        else:
            raise Exception('No suitable database adapter found.')

    try:
        download_adapter_class = import_class(settings.ADAPTER_DOWNLOAD)
        self.download = download_adapter_class(self.database_key, self.database_config)
    except AttributeError:
        if self.database_config['ENGINE'] == 'django.db.backends.mysql':
            self.download = MysqldumpAdapter(self.database_key, self.database_config)
        else:
            raise Exception('No suitable download adapter found.')
def _extract_django16_south_maas19(cls):
    """Extract the django16, south, and MAAS 1.9 source code in to a
    temp path."""
    path_to_tarball = cls._path_to_django16_south_maas19()
    tempdir = tempfile.mkdtemp(prefix='maas-upgrade-')
    subprocess.check_call([
        "tar", "zxf", path_to_tarball, "-C", tempdir])
    settings_json = os.path.join(tempdir, "maas19settings.json")
    with open(settings_json, "w", encoding="utf-8") as fd:
        fd.write(json.dumps({"DATABASES": settings.DATABASES}))
    script_path = os.path.join(tempdir, "migrate.py")
    with open(script_path, "wb") as fp:
        fp.write(MAAS_UPGRADE_SCRIPT.encode("utf-8"))
    return tempdir, script_path
def __init__(self, *args, **kwargs):
    """Initializes a new instance of :see:PostgresManager."""
    super(PostgresManager, self).__init__(*args, **kwargs)

    # make sure our back-end is set and refuse to proceed
    # if it's not set
    db_backend = settings.DATABASES['default']['ENGINE']
    if 'psqlextra' not in db_backend:
        raise ImproperlyConfigured((
            '\'%s\' is not the \'psqlextra.backend\'. '
            'django-postgres-extra cannot function without '
            'the \'psqlextra.backend\'. Set DATABASES.ENGINE.'
        ) % db_backend)

    # hook into django signals to then trigger our own
    django.db.models.signals.post_save.connect(
        self._on_model_save, sender=self.model, weak=False)

    django.db.models.signals.pre_delete.connect(
        self._on_model_delete, sender=self.model, weak=False)

    self._signals_connected = True
def django_db_setup():
    # Database settings keys are upper-case: 'NAME', not 'name'.
    settings.DATABASES['default']['NAME'] = os.path.join(
        settings.BASE_DIR, 'db.sqlite3')
def _maindb_connection(self):
    """
    This is analogous to other backends' `_nodb_connection` property,
    which allows access to an "administrative" connection which can
    be used to manage the test databases.
    For Oracle, the only connection that can be used for that purpose
    is the main (non-test) connection.
    """
    settings_dict = settings.DATABASES[self.connection.alias]
    user = settings_dict.get('SAVED_USER') or settings_dict['USER']
    password = settings_dict.get('SAVED_PASSWORD') or settings_dict['PASSWORD']
    settings_dict = settings_dict.copy()
    settings_dict.update(USER=user, PASSWORD=password)
    DatabaseWrapper = type(self.connection)
    return DatabaseWrapper(settings_dict, alias=self.connection.alias)
def get_connection_params(self):
    settings_dict = self.settings_dict
    if not settings_dict['NAME']:
        from django.core.exceptions import ImproperlyConfigured
        raise ImproperlyConfigured(
            "settings.DATABASES is improperly configured. "
            "Please supply the NAME value.")
    kwargs = {
        'database': settings_dict['NAME'],
        'detect_types': Database.PARSE_DECLTYPES | Database.PARSE_COLNAMES,
    }
    kwargs.update(settings_dict['OPTIONS'])
    # Always allow the underlying SQLite connection to be shareable
    # between multiple threads. The safe-guarding will be handled at a
    # higher level by the `BaseDatabaseWrapper.allow_thread_sharing`
    # property. This is necessary as the shareability is disabled by
    # default in pysqlite and it cannot be changed once a connection is
    # opened.
    if 'check_same_thread' in kwargs and kwargs['check_same_thread']:
        warnings.warn(
            'The `check_same_thread` option was provided and set to '
            'True. It will be overridden with False. Use the '
            '`DatabaseWrapper.allow_thread_sharing` property instead '
            'for controlling thread shareability.',
            RuntimeWarning
        )
    kwargs.update({'check_same_thread': False})
    if self.features.can_share_in_memory_db:
        kwargs.update({'uri': True})
    return kwargs
def destroy_test_db(self, old_database_name=None, verbosity=1, keepdb=False, number=None):
    """
    Destroy a test database, prompting the user for confirmation if the
    database already exists.
    """
    self.connection.close()
    if number is None:
        test_database_name = self.connection.settings_dict['NAME']
    else:
        test_database_name = self.get_test_db_clone_settings(number)['NAME']

    if verbosity >= 1:
        action = 'Destroying'
        if keepdb:
            action = 'Preserving'
        print("%s test database for alias %s..." % (
            action,
            self._get_database_display_str(verbosity, test_database_name),
        ))

    # if we want to preserve the database
    # skip the actual destroying piece.
    if not keepdb:
        self._destroy_test_db(test_database_name, verbosity)

    # Restore the original database name
    if old_database_name is not None:
        settings.DATABASES[self.connection.alias]["NAME"] = old_database_name
        self.connection.settings_dict["NAME"] = old_database_name
def test_db_signature(self):
    """
    Returns a tuple with elements of self.connection.settings_dict (a
    DATABASES setting value) that uniquely identify a database
    according to the RDBMS particularities.
    """
    settings_dict = self.connection.settings_dict
    return (
        settings_dict['HOST'],
        settings_dict['PORT'],
        settings_dict['ENGINE'],
        settings_dict['NAME'],
    )
def __init__(self, databases=None):
    """
    databases is an optional dictionary of database definitions (structured
    like settings.DATABASES).
    """
    self._databases = databases
    self._connections = local()
def databases(self):
    if self._databases is None:
        self._databases = settings.DATABASES
    if self._databases == {}:
        self._databases = {
            DEFAULT_DB_ALIAS: {
                'ENGINE': 'django.db.backends.dummy',
            },
        }
    if self._databases[DEFAULT_DB_ALIAS] == {}:
        self._databases[DEFAULT_DB_ALIAS]['ENGINE'] = 'django.db.backends.dummy'
    if DEFAULT_DB_ALIAS not in self._databases:
        raise ImproperlyConfigured("You must define a '%s' database" % DEFAULT_DB_ALIAS)
    return self._databases
def current_engine(self):
    try:
        return settings.DATABASES[self.db]['ENGINE']
    except AttributeError:
        return settings.DATABASE_ENGINE
def distinct(queryset, base):
    if settings.DATABASES[queryset.db]["ENGINE"] == "django.db.backends.oracle":
        # distinct analogue for Oracle users
        return base.filter(pk__in=set(queryset.values_list('pk', flat=True)))
    return queryset.distinct()


# Obtaining manager instances and names from model options differs after 1.10.
def check(self):
    for db in settings.DATABASES:
        try:
            connections[db].introspection.table_names()
        except Exception as exc:
            self.add_error('Could not connect to {}: {}'.format(db, exc))
def pg_load(file_location):
    env = os.environ.copy()
    env.update({
        # requires having password set to test
        "PGPASSWORD": settings.DATABASES['default']['PASSWORD']
    })
    load = [
        'psql',
        '-h%s' % settings.DATABASES['default']['HOST'],
        '-U%s' % settings.DATABASES['default']['USER'],
        '-d%s' % settings.DATABASES['default']['NAME'],
        '-f%s' % file_location,
    ]
    task = Popen(load, env=env)
    return task.wait()