The following 50 code examples, extracted from open-source Python projects, illustrate how to use django.db().
def _ensureConnection(self):
    """Block until a Django database connection can be established.

    Any pre-existing connection is closed first; the method then retries
    once per second until ``ensure_connection()`` succeeds, and finally
    closes the probe connection again before returning.
    """
    from django.db import connection

    # Drop any half-open connection left over from a previous attempt.
    if connection.connection is not None:
        connection.close()

    # Retry forever until a connection can be made.
    while True:
        try:
            connection.ensure_connection()
        except Exception:
            log.err(_why=(
                "Error starting: "
                "Connection to database cannot be established."))
            time.sleep(1)
        else:
            # Connection made; close it — we only needed to prove it works.
            connection.close()
            break
def uploadDonations(self, donation_list):
    """Persist a list of PAC donation records, silently skipping duplicates."""
    print("database congress size:", len(Representative.objects.all()))
    for donation in donation_list:
        rep = Representative.objects.get(
            propublicaid=donation["propublica_candidate_id"])
        sup = SuperPAC.objects.get(fecid=donation["committee_id"])
        donation_dict = {
            "representative_id": rep.id,
            "superpac_id": sup.id,
            "amount": donation["amount"],
            "uid": donation["unique_id"],
            "support": donation["support_or_oppose"],
        }
        # Simple try catch block to avoid duplicate donation problems.
        # Django 1.5/1.6 transaction bug requires the explicit atomic block.
        with transaction.atomic():
            try:
                Donation.objects.create(**donation_dict)
            except django.db.utils.IntegrityError:
                pass
def test_undefined(self):
    """pyamf.Undefined round-trips through fields.NOT_PROVIDED."""
    from django.db import models
    from django.db.models import fields

    class UndefinedClass(models.Model):
        pass

    alias = adapter.DjangoClassAlias(UndefinedClass, None)
    instance = UndefinedClass()

    # Decoding an Undefined id leaves the attribute as NOT_PROVIDED.
    alias.applyAttributes(instance, {'id': pyamf.Undefined})
    self.assertEqual(instance.id, fields.NOT_PROVIDED)

    # Encoding a NOT_PROVIDED attribute yields Undefined again.
    instance.id = fields.NOT_PROVIDED
    attrs = alias.getEncodableAttributes(instance)
    self.assertEqual(attrs, {'id': pyamf.Undefined})
def test_non_field_prop(self):
    """A read-only (setter-less) property is encoded but never written."""
    from django.db import models

    class Book(models.Model):
        def _get_number_of_odd_pages(self):
            return 234

        # note the lack of a setter callable ..
        numberOfOddPages = property(_get_number_of_odd_pages)

    alias = adapter.DjangoClassAlias(Book, 'Book')
    book = Book()

    self.assertEqual(
        alias.getEncodableAttributes(book),
        {'numberOfOddPages': 234, 'id': None}
    )

    # now we test sending the numberOfOddPages attribute
    alias.applyAttributes(book, {'numberOfOddPages': 24, 'id': None})

    # test it hasn't been set
    self.assertEqual(book.numberOfOddPages, 234)
def get_fields_from_path(model, path):
    """
    Return list of Fields given path relative to model.

    e.g. (ModelX, "user__groups__name") -> [
        <django.db.models.fields.related.ForeignKey object at 0x...>,
        <django.db.models.fields.related.ManyToManyField object at 0x...>,
        <django.db.models.fields.CharField object at 0x...>,
    ]
    """
    fields = []
    for piece in path.split(LOOKUP_SEP):
        # The first segment resolves on the root model; later segments
        # resolve on the model reached through the previous relation.
        parent = get_model_from_relation(fields[-1]) if fields else model
        fields.append(parent._meta.get_field(piece))
    return fields
def test_db_type(self):
    """
    Test simple output of the field's overridden "db_type" method.
    """
    for test_config in test_utils.FC_TEST_CONFIGS:
        kwargs_repr = test_utils.create_dict_string(test_config.kwargs_dict)
        for db_alias in test_utils.get_db_aliases():
            db_connection = django.db.connections[db_alias]
            engine = db_connection.settings_dict['ENGINE']
            with self.subTest(backend=engine, kwargs=kwargs_repr):
                field = forcedfields.FixedCharField(**test_config.kwargs_dict)
                self.assertEqual(
                    field.db_type(db_connection),
                    test_config.db_type_dict[db_alias]
                )
def test_insert(self):
    """
    Test that insert operations produce expected results.
    """
    for test_config in test_utils.FC_TEST_CONFIGS:
        kwargs_string = test_utils.create_dict_string(test_config.kwargs_dict)
        model_class = getattr(
            test_models,
            test_utils.get_fc_model_class_name(**test_config.kwargs_dict)
        )
        # Exercise every configured insert value against every database alias.
        for insert_value, expected_value in test_config.insert_values_dict.items():
            for db_alias in test_utils.get_db_aliases():
                db_backend = django.db.connections[db_alias].settings_dict['ENGINE']
                with self.subTest(
                    backend=db_backend,
                    kwargs=kwargs_string,
                    insert_value=insert_value
                ):
                    self._test_insert_dict(
                        db_alias,
                        model_class,
                        test_utils.FC_FIELD_ATTRNAME,
                        insert_value,
                        expected_value
                    )
def test_insert(self):
    """
    Test that the values saved during INSERT operations are correct.
    """
    for test_config in test_utils.TS_TEST_CONFIGS:
        kwargs_string = test_utils.create_dict_string(test_config.kwargs_dict)
        model_class = getattr(
            test_models,
            test_utils.get_ts_model_class_name(**test_config.kwargs_dict)
        )
        # Exercise every configured insert value against every database alias.
        for insert_value, expected_value in test_config.insert_values_dict.items():
            for db_alias in test_utils.get_db_aliases():
                db_backend = django.db.connections[db_alias].settings_dict['ENGINE']
                with self.subTest(
                    backend=db_backend,
                    kwargs=kwargs_string,
                    insert_value=insert_value
                ):
                    self._test_insert_dict(
                        db_alias,
                        model_class,
                        test_utils.TS_FIELD_ATTRNAME,
                        insert_value,
                        expected_value
                    )
def test_update(self):
    """
    Test that an UPDATE statement works correctly in specific cases.

    Test that the timestamp field value is unchanged when only
    auto_now_add is enabled and test that the timestamp field is
    automatically updated when only auto_now_update is enabled.

    Unfortunately, this test is impossible to isolate from any side
    effects of a broken INSERT operation in the timestamp field.
    """
    for alias in test_utils.get_db_aliases():
        engine = django.db.connections[alias].settings_dict['ENGINE']
        with self.subTest(backend=engine):
            self._test_update_no_auto(alias)
            self._test_update_auto(alias)
def test_base_manager(self):
    """Each model exposes the expected _base_manager class and model."""
    def describe(model):
        return "{0} {1}".format(
            repr(type(model._base_manager)),
            repr(model._base_manager.model)
        )

    # Table of (model, expected "<manager class> <model class>" string).
    expectations = [
        (PlainA, "<class 'django.db.models.manager.Manager'> <class 'polymorphic.tests.PlainA'>"),
        (PlainB, "<class 'django.db.models.manager.Manager'> <class 'polymorphic.tests.PlainB'>"),
        (PlainC, "<class 'django.db.models.manager.Manager'> <class 'polymorphic.tests.PlainC'>"),
        (Model2A, "<class 'polymorphic.managers.PolymorphicManager'> <class 'polymorphic.tests.Model2A'>"),
        (Model2B, "<class 'django.db.models.manager.Manager'> <class 'polymorphic.tests.Model2B'>"),
        (Model2C, "<class 'django.db.models.manager.Manager'> <class 'polymorphic.tests.Model2C'>"),
        (One2OneRelatingModel, "<class 'polymorphic.managers.PolymorphicManager'> <class 'polymorphic.tests.One2OneRelatingModel'>"),
        (One2OneRelatingModelDerived, "<class 'django.db.models.manager.Manager'> <class 'polymorphic.tests.One2OneRelatingModelDerived'>"),
    ]
    for model, expected in expectations:
        self.assertEqual(describe(model), expected)
def test_instance_default_manager(self):
    """Instances expose the expected _default_manager class and model."""
    def describe(instance):
        return "{0} {1}".format(
            repr(type(instance._default_manager)),
            repr(instance._default_manager.model)
        )

    # Table of (instance, expected "<manager class> <model class>" string).
    cases = [
        (PlainA(field1='C1'), "<class 'django.db.models.manager.Manager'> <class 'polymorphic.tests.PlainA'>"),
        (PlainB(field2='C1'), "<class 'django.db.models.manager.Manager'> <class 'polymorphic.tests.PlainB'>"),
        (PlainC(field3='C1'), "<class 'django.db.models.manager.Manager'> <class 'polymorphic.tests.PlainC'>"),
        (Model2A(field1='C1'), "<class 'polymorphic.managers.PolymorphicManager'> <class 'polymorphic.tests.Model2A'>"),
        (Model2B(field2='C1'), "<class 'polymorphic.managers.PolymorphicManager'> <class 'polymorphic.tests.Model2B'>"),
        (Model2C(field3='C1'), "<class 'polymorphic.managers.PolymorphicManager'> <class 'polymorphic.tests.Model2C'>"),
    ]
    for instance, expected in cases:
        self.assertEqual(describe(instance), expected)
def django_debug_cleanup():
    """Resets Django's list of logged queries.

    When DJANGO_DEBUG is set to true, Django will log all generated SQL
    queries in a list, which grows indefinitely.  This is ok for
    short-lived processes; not so much for daemons.  We may want those
    queries in the short-term, but in the long-term the ever-growing
    list is uninteresting and also bad.

    This should be called once-in-a-while from every thread that has
    Django database access, as the queries list is stored in
    thread-local data.
    """
    query_count = len(django.db.connection.queries)
    if not query_count:
        return
    runtime = sum_django_queries_runtime()
    current = threading.current_thread()
    _logger.debug("Thread %s/%s: Removing %d logged Django queries "
                  "(total time %.03f):\n%s",
                  current.ident, current.name, query_count, runtime,
                  pformat(django.db.connection.queries))
    django.db.reset_queries()
    # The dropped query list can be large; reclaim the memory eagerly.
    gc.collect()
def reset_connection_on_interface_error(func):
    """Decorates function to reset the current thread's Django database
    connection on exceptions that appear to come from connection resets.
    """
    @wraps(func)
    def _reset(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except (InterfaceError, OperationalError,
                DjangoInterfaceError, DjangoOperationalError) as error:
            thread = threading.current_thread()
            _logger.warning("it appears this thread's database connection was "
                            "dropped, resetting it now - you may see further "
                            "errors about this until the situation is fully "
                            "resolved for all threads "
                            "(this thread is '%s', error was '%s')",
                            thread.name, error)
            # Drop the dead low-level connection object so Django will
            # transparently reconnect on the next database access.
            django.db.connection.connection = None
            raise ResetDBConnectionError("The database connection was reset",
                                         error)
    return _reset
def purge_old_job_log_entries():
    """
    Purges old job log entries from the ipdevpoll_job_log db table
    """
    cursor = django.db.connection.cursor()
    # Delete all but the last 100 entries of each netbox/job_name combination,
    # ordered by timestamp
    cursor.execute(
        """
        WITH ranked AS (SELECT id,
                               rank() OVER (PARTITION BY netboxid, job_name
                                            ORDER BY end_time DESC)
                        FROM ipdevpoll_job_log)
        DELETE FROM ipdevpoll_job_log
        USING ranked
        WHERE ipdevpoll_job_log.id = ranked.id AND rank > 100;
        """)
def validate_folder(self):
    """Validates whether a folder can be created.

    Performs two types of validation:
    1. Checks if a DB entry is present.
    2. Checks if a physical folder exists in the system."""
    # ASCII-safe version of the title, used as the on-disk directory name.
    unicoded_title = "".join(
        (i if ord(i) < 128 else '_') for i in unidecode(self.title)
    )
    parent_folder = self.folder

    # Root-level folders and nested folders differ only in the duplicate
    # query and the base path; the validation order (DB, then OS) is the same.
    if parent_folder:
        duplicates = ImageFolder.objects.filter(
            folder=parent_folder, title=self.title)
        base_path = parent_folder.path
    else:
        duplicates = ImageFolder.objects.filter(
            folder__isnull=True, title=self.title)
        base_path = IMAGES_FOLDER_NAME

    if duplicates.count() > 0:
        raise ValidationError("Folder exists in the DB!", code='db')
    folder_path = os.path.join(settings.MEDIA_ROOT, base_path, unicoded_title)
    if os.path.isdir(folder_path):
        raise ValidationError("Folder exists in the OS!", code='os')
def database_exists():
    """Detect if the database exists"""
    # can't be imported in global scope as they already require
    # the settings module during import
    from django.db import DatabaseError
    from django.core.exceptions import ImproperlyConfigured
    from wger.manager.models import User

    try:
        # TODO: Use another model, the User could be deactivated
        User.objects.count()
    except DatabaseError:
        # The query failed, so the database is assumed not to exist yet.
        return False
    except ImproperlyConfigured:
        print("Your settings file seems broken")
        sys.exit(0)
    else:
        return True
def safe_move(battle_contubernium_in_turn: BattleContuberniumInTurn, target_distance_function):
    """Move a contubernium one step along a path toward its target.

    A tile counts as available only when no other contubernium occupies
    it in the current turn.  The unit only moves when it is not already
    at the target (distance > 0) and a path with at least one further
    step exists; in that case its position is advanced to the next path
    node and the move is persisted.

    Fixes over the original: the redundant ``True if ... else False``
    ternary is replaced by the boolean expression itself, and the unused
    ``as e`` binding on the swallowed IntegrityError is removed.
    """
    turn = battle_contubernium_in_turn.battle_turn

    def tile_availability_test(coords: Coordinates):
        # A tile is free when nothing already occupies it this turn.
        return turn.get_contubernium_in_position(coords) is None

    if target_distance_function(battle_contubernium_in_turn.coordinates()) > 0:
        path = find_path(battle_contubernium_in_turn, target_distance_function,
                         tile_availability_test)
        # path[0] is the current tile; path[1] is the next step, if any.
        if len(path) > 1:
            battle_contubernium_in_turn.moved_this_turn = True
            battle_contubernium_in_turn.x_pos = path[1].x
            battle_contubernium_in_turn.z_pos = path[1].z
            # TODO WARNING: HORRIBLE HACK STARTS HERE
            # (to avoid unique constraint errors when contubs overlap for some reason)
            with transaction.atomic():
                try:
                    battle_contubernium_in_turn.save()
                except django.db.utils.IntegrityError:
                    pass
def get_fields_from_path(model, path):
    """
    Return list of Fields given path relative to model.

    e.g. (ModelX, "user__groups__name") -> [
        <django.db.models.fields.related.ForeignKey object at 0x...>,
        <django.db.models.fields.related.ManyToManyField object at 0x...>,
        <django.db.models.fields.CharField object at 0x...>,
    ]
    """
    fields = []
    for piece in path.split(LOOKUP_SEP):
        # The first segment resolves on the root model; later segments
        # resolve on the model reached through the previous relation.
        parent = model if not fields else get_model_from_relation(fields[-1])
        # get_field_by_name() returns a (field, model, direct, m2m) tuple;
        # only the field itself is kept.
        # NOTE(review): get_field_by_name() was removed in Django 1.10 —
        # presumably this copy targets older Django; confirm before modernising.
        fields.append(parent._meta.get_field_by_name(piece)[0])
    return fields
def update(self, **fields):
    """Updates all rows that match the filter."""
    # Build up the query to execute.
    self._for_write = True
    query = self.query.clone(UpdateQuery)
    query._annotations = None
    query.add_update_values(fields)

    # Build the compiler for the query.
    connection = django.db.connections[self.db]
    compiler = PostgresReturningUpdateCompiler(query, connection, self.db)

    # Execute the query inside a transaction and drop the stale cache.
    with transaction.atomic(using=self.db, savepoint=False):
        rows = compiler.execute_sql(CURSOR)
    self._result_cache = None

    # Send out a signal for each updated row (pk is the first column).
    for row in rows:
        signals.update.send(self.model, pk=row[0])

    # The original update(..) returns the amount of rows affected;
    # let's do the same.
    return len(rows)
def __init__(self, *args, **kwargs):
    """Initializes a new instance of :see:PostgresManager."""
    super(PostgresManager, self).__init__(*args, **kwargs)

    # Refuse to proceed unless the psqlextra database back-end is in use.
    db_backend = settings.DATABASES['default']['ENGINE']
    if 'psqlextra' not in db_backend:
        raise ImproperlyConfigured((
            '\'%s\' is not the \'psqlextra.backend\'. '
            'django-postgres-extra cannot function without '
            'the \'psqlextra.backend\'. Set DATABASES.ENGINE.'
        ) % db_backend)

    # Hook into Django signals so model saves/deletes trigger our own.
    django.db.models.signals.post_save.connect(
        self._on_model_save, sender=self.model, weak=False)
    django.db.models.signals.pre_delete.connect(
        self._on_model_delete, sender=self.model, weak=False)
    self._signals_connected = True
def to_fields(qs, fieldnames):
    # Yield the terminal Field object for each "a__b__c" lookup path in
    # *fieldnames*, walking relations starting from the queryset's model.
    for fieldname in fieldnames:
        model = qs.model
        for fieldname_part in fieldname.split('__'):
            try:
                field = model._meta.get_field(fieldname_part)
            except django.db.models.fields.FieldDoesNotExist:
                # Not a direct field: search the reverse relations for an
                # accessor matching this path segment.
                rels = model._meta.get_all_related_objects_with_model()
                for relobj, _ in rels:
                    if relobj.get_accessor_name() == fieldname_part:
                        field = relobj.field
                        model = field.model
                        break
            else:
                # Direct field found: step into the related model so the
                # next path segment resolves against it.
                if (hasattr(field, "one_to_many") and field.one_to_many) or (hasattr(field, "one_to_one") and field.one_to_one):
                    model = field.related_model
                elif field.get_internal_type() in ('ForeignKey', 'OneToOneField', 'ManyToManyField'):
                    # NOTE(review): field.rel.to is the pre-1.9 spelling of
                    # field.related_model — presumably kept for old Django.
                    model = field.rel.to
        yield field
def uploadRepresentatives(self, congress_list):
    """Insert House and Senate members from a ProPublica response,
    silently skipping members that already exist.

    The duplicated create/atomic/except block from the original is
    extracted into a single local helper shared by both chambers.
    """

    def _create(member_dict):
        # Simple try catch block to avoid duplicate congressman problems.
        # Django 1.5/1.6 transaction bug requires the explicit atomic block.
        with transaction.atomic():
            try:
                Representative.objects.create(**member_dict)
            except django.db.utils.IntegrityError:
                pass

    for congressman in congress_list["house"]['results'][0]['members']:
        _create({
            # personal details
            "propublicaid": congressman['id'],
            "first_name": congressman['first_name'],
            "last_name": congressman['last_name'],
            # office details
            "district": congressman['district'],
            "state": congressman['state'],
            "party": congressman['party'],
            "chamber": "H",
        })

    for senator in congress_list["senate"]['results'][0]['members']:
        _create({
            # personal details
            "propublicaid": senator['id'],
            "first_name": senator['first_name'],
            "last_name": senator['last_name'],
            # office details (senators have no district)
            "state": senator['state'],
            "party": senator['party'],
            "chamber": "S",
        })
def uploadSuperPACs(self, superpac_list):
    """Insert SuperPAC rows, silently skipping entries that already exist."""
    for superpac in superpac_list:
        superpac_dict = {
            "name": superpac["name"],
            "fecid": superpac["committee_id"],
        }
        # Simple try catch block to avoid duplicate superpac problems.
        # Django 1.5/1.6 transaction bug requires the explicit atomic block.
        with transaction.atomic():
            try:
                SuperPAC.objects.create(**superpac_dict)
            except django.db.utils.IntegrityError:
                pass
def check_migrations():
    """Exit with an error message if any model change lacks a migration file."""
    from django.db.migrations.autodetector import MigrationAutodetector
    from django.db.migrations.executor import MigrationExecutor
    from django.db.migrations.state import ProjectState

    changed = set()
    print("Checking {} migrations...".format(APP_NAME))
    for db in settings.DATABASES.keys():
        try:
            executor = MigrationExecutor(connections[db])
        except OperationalError as e:
            sys.exit(
                "Unable to check migrations due to database: {}".format(e)
            )
        # Compare migrations on disk against the current model state and
        # collect every app label with pending changes.
        autodetector = MigrationAutodetector(
            executor.loader.project_state(),
            ProjectState.from_apps(apps),
        )
        changed.update(
            autodetector.changes(graph=executor.loader.graph).keys()
        )
    if changed and APP_NAME in changed:
        sys.exit(
            "A migration file is missing. Please run "
            "`python makemigrations.py` to generate it."
        )
    else:
        print("All migration files present.")
def test_NOT_PROVIDED(self):
    """fields.NOT_PROVIDED encodes as Undefined in both AMF0 and AMF3."""
    from django.db.models import fields

    # AMF0: Undefined marker is 0x06.
    amf0_bytes = pyamf.encode(
        fields.NOT_PROVIDED, encoding=pyamf.AMF0).getvalue()
    self.assertEqual(amf0_bytes, '\x06')

    # AMF3: Undefined marker is 0x00.
    encoder = pyamf.get_encoder(pyamf.AMF3)
    encoder.writeElement(fields.NOT_PROVIDED)
    self.assertEqual(encoder.stream.getvalue(), '\x00')
def test_properties(self):
    """
    See #764
    """
    from django.db import models

    class Foob(models.Model):
        def _get_days(self):
            return 1

        def _set_days(self, val):
            assert 1 == val

        days = property(_get_days, _set_days)

    alias = adapter.DjangoClassAlias(Foob, 'Bar')
    foob = Foob()

    self.assertEqual(foob.days, 1)
    self.assertEqual(
        alias.getEncodableAttributes(foob),
        {'days': 1, 'id': None}
    )

    # Applying attributes without 'days' must not disturb the property.
    alias.applyAttributes(foob, {'id': None})
def check_migrations(self):
    """
    Print a warning if the set of migrations on disk don't match the
    migrations in the database.
    """
    from django.db.migrations.executor import MigrationExecutor
    try:
        executor = MigrationExecutor(connections[DEFAULT_DB_ALIAS])
    except ImproperlyConfigured:
        # No databases are configured (or the dummy one)
        return
    except MigrationSchemaMissing:
        self.stdout.write(self.style.NOTICE(
            "\nNot checking migrations as it is not possible to access/create the django_migrations table."
        ))
        return

    plan = executor.migration_plan(executor.loader.graph.leaf_nodes())
    if plan:
        apps_waiting_migration = sorted(set(migration.app_label for migration, backwards in plan))
        # Fixed misspelled placeholder key ("unpplied" -> "unapplied") in
        # both the format string and the mapping; rendered output is
        # unchanged since the key itself never appears in the message.
        self.stdout.write(
            self.style.NOTICE(
                "\nYou have %(unapplied_migration_count)s unapplied migration(s). "
                "Your project may not work properly until you apply the "
                "migrations for app(s): %(apps_waiting_migration)s." % {
                    "unapplied_migration_count": len(plan),
                    "apps_waiting_migration": ", ".join(apps_waiting_migration),
                }
            )
        )
        self.stdout.write(self.style.NOTICE("Run 'python manage.py migrate' to apply them.\n"))
def test_order_by_lower(self):
    # Ordering with Lower() on translated fields: two blogs whose titles
    # differ only by case, with distinct Dutch translations.
    from django.db.models.functions import Lower
    c = Category.objects.create(name='test')
    Blog.objects.create(title='A', title_nl='c', category=c)
    Blog.objects.create(title='a', title_nl='b', category=c)
    filtered = Blog.objects.filter(category=c)

    # order by title is case sensitive, so 'a' sorts before 'A'.
    qs = filtered.order_by('title', 'title_nl')
    self.assertEquals(key(qs, 'title'), ['a', 'A'])

    # order by Lower('title') makes the titles tie (lower('A') == lower('a')),
    # so the title_nl field ('b' < 'c') determines the sorting.
    qs = filtered.order_by(Lower('title'), 'title_nl')
    self.assertEquals(key(qs, 'title'), ['a', 'A'])

    # applying lower to title_nl should not matter since it is not the same letter
    qs = filtered.order_by(Lower('title_nl'))
    self.assertEquals(key(qs, 'title'), ['a', 'A'])

    # should be the same as previous when the i18n field resolves to Dutch
    with override('nl'):
        qs = filtered.order_by(Lower('title_i18n'))
        self.assertEquals(key(qs, 'title'), ['a', 'A'])
def test_values_kwarg_lower(self):
    """Lower() over the base and the English field yield identical values."""
    from django.db.models.functions import Lower

    base_qs = Blog.objects.values(lower_name=Lower('category__name'))
    translated_qs = Blog.objects.values(lower_name=Lower('category__name_en'))
    self.assertEquals(list(base_qs), list(translated_qs))
def aggregate(self, *args, **kwargs):
    """
    Returns a dictionary containing the calculations (aggregation)
    over the current queryset

    If args is present the expression is passed as a kwarg using
    the Aggregate object's default alias.
    """
    if self.query.distinct_fields:
        raise NotImplementedError("aggregate() + distinct(fields) not implemented.")
    for arg in args:
        # The default_alias property may raise a TypeError, so we use
        # a try/except construct rather than hasattr in order to remain
        # consistent between PY2 and PY3 (hasattr would swallow
        # the TypeError on PY2).
        try:
            arg.default_alias
        except (AttributeError, TypeError):
            raise TypeError("Complex aggregates require an alias")
        kwargs[arg.default_alias] = arg

    # Query.clone(klass) was split into chain(klass) in Django 2.0.
    if django.VERSION < (2, 0):
        query = self.query.clone(CTEAggregateQuery)
    else:
        query = self.query.chain(CTEAggregateQuery)

    # Register every aggregate as a summary annotation, rejecting
    # expressions that are not actually aggregates.
    for (alias, aggregate_expr) in kwargs.items():
        query.add_annotation(aggregate_expr, alias, is_summary=True)
        if not query.annotations[alias].contains_aggregate:
            raise TypeError("%s is not an aggregate expression" % alias)
    return query.get_aggregation(self.db, kwargs.keys())
def distinct(queryset, base):
    """Return a distinct queryset, with a workaround for the Oracle back-end."""
    engine = settings.DATABASES[queryset.db]["ENGINE"]
    if engine == "django.db.backends.oracle":
        # distinct analogue for Oracle users
        return base.filter(pk__in=set(queryset.values_list('pk', flat=True)))
    return queryset.distinct()


# Obtaining manager instances and names from model options differs after 1.10.
def managed_transaction(func):
    """
    This decorator wraps a function so that all sql executions in the
    function are atomic

    It's used instead of django.db.transaction.commit_on_success in cases
    where reporting exceptions is necessary as commit_on_success swallows
    exceptions
    """
    @wraps(func)
    @transaction.commit_manually
    def _inner(*args, **kwargs):
        try:
            result = func(*args, **kwargs)
        except Exception:
            # Roll back, then re-raise so the caller still sees the error.
            transaction.rollback()
            raise
        else:
            transaction.commit()
            return result
    return _inner