我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 django.db.utils.DatabaseError()。
def schedule_changed(self):
    """Report whether a newer schedule change timestamp has been seen.

    Fetches the latest change timestamp from ``self.Changes.last_change()``
    and compares it with the value remembered in ``self._last_timestamp``.
    Once the timestamp has been fetched, the remembered value is always
    replaced by the fetched one.

    Returns:
        True when the fetched timestamp is truthy and greater than the
        previously remembered one. False otherwise, including when the
        database raises ``DatabaseError`` (the error is logged).
    """
    try:
        # With MySQL's default REPEATABLE-READ isolation level, changes
        # made by other transactions stay invisible until the current
        # transaction is committed (Issue #41), so commit before reading.
        try:
            transaction.commit()
        except transaction.TransactionManagementError:
            pass  # not in transaction management.
        previous = self._last_timestamp
        latest = self.Changes.last_change()
    except DatabaseError as exc:
        logger.exception('Database gave error: %r', exc)
        return False

    try:
        # Without a previous timestamp the latest one is compared against
        # itself, so the first observation never counts as a change.
        changed = bool(
            latest and latest > (previous if previous else latest)
        )
    finally:
        # Remember the fetched timestamp even if the comparison fails.
        self._last_timestamp = latest
    return changed
def delete_model(self, model, **kwargs):
    """Delete ``model``'s table after clearing its spatial metadata.

    Args:
        model: The model class whose table is being removed.
        **kwargs: Passed through unchanged to the parent ``delete_model``.
    """
    from django.contrib.gis.db.models.fields import GeometryField

    # Dropping the table does not remove spatial metadata automatically,
    # so clear it explicitly for each geometry field.
    for local_field in model._meta.local_fields:
        if not isinstance(local_field, GeometryField):
            continue
        self.remove_geometry_metadata(model, local_field)

    # Make sure nothing geometry-related is left behind. Failures are
    # ignored on purpose: this is best-effort cleanup.
    for geom_table in self.geometry_tables:
        try:
            substitutions = {
                "geom_table": geom_table,
                "table": self.quote_name(model._meta.db_table),
            }
            self.execute(self.sql_discard_geometry_columns % substitutions)
        except DatabaseError:
            pass

    super(SpatialiteSchemaEditor, self).delete_model(model, **kwargs)
def _nodb_connection(self):
    """Return a usable "no database" connection, with a fallback.

    Starts from the parent class's ``_nodb_connection`` and checks that it
    can actually connect. If connecting raises ``DatabaseError`` or
    ``WrappedDatabaseError``, a ``RuntimeWarning`` is issued and a new
    wrapper of the same class is built from this connection's settings
    with ``NAME`` replaced by the default alias's database name.

    Returns:
        The parent's connection when it is reachable, otherwise the
        fallback connection described above.
    """
    nodb_connection = super(DatabaseWrapper, self)._nodb_connection
    try:
        nodb_connection.ensure_connection()
    except (DatabaseError, WrappedDatabaseError):
        warnings.warn(
            "Normally Django will use a connection to the 'postgres' database "
            "to avoid running initialization queries against the production "
            "database when it's not needed (for example, when running tests). "
            "Django was unable to create a connection to the 'postgres' database "
            "and will use the default database instead.",
            RuntimeWarning
        )
        settings_dict = self.settings_dict.copy()
        settings_dict['NAME'] = settings.DATABASES[DEFAULT_DB_ALIAS]['NAME']
        # Pass the overridden copy. The original built the fallback from a
        # fresh copy of self.settings_dict, which silently discarded the
        # NAME override computed on the line above.
        nodb_connection = self.__class__(
            settings_dict,
            alias=self.alias,
            allow_thread_sharing=False)
    return nodb_connection
def supports_stddev(self):
    """Confirm support for STDDEV and related stats functions.

    SQLite supports STDDEV as an extension package, so
    connection.ops.check_expression_support() can't unilaterally rule out
    support for STDDEV. The only way to know is to try the call against a
    scratch table.

    Returns:
        True if ``SELECT STDDEV(*)`` executes, False if it raises
        ``utils.DatabaseError``.
    """
    with self.connection.cursor() as cursor:
        cursor.execute('CREATE TABLE STDDEV_TEST (X INT)')
        try:
            cursor.execute('SELECT STDDEV(*) FROM STDDEV_TEST')
        except utils.DatabaseError:
            has_support = False
        else:
            has_support = True
        finally:
            # Always drop the scratch table, even when the probe fails
            # with an unexpected exception; otherwise the next probe
            # would fail on CREATE TABLE.
            cursor.execute('DROP TABLE STDDEV_TEST')
    return has_support
def validate_thread_sharing(self):
    """Ensure this connection is being used from a permitted thread.

    A connection may only be used by the thread that created it, unless
    sharing was explicitly authorized through the
    ``allow_thread_sharing`` property.

    Raises:
        DatabaseError: If sharing is not allowed and the current thread
            is not the one recorded in ``self._thread_ident``.
    """
    # Explicitly shared connections are valid from any thread.
    if self.allow_thread_sharing:
        return
    # Otherwise only the creating thread may use the connection.
    if self._thread_ident == thread.get_ident():
        return
    message = (
        "DatabaseWrapper objects created in a "
        "thread can only be used in that same thread. The object "
        "with alias '%s' was created in thread id %s and this is "
        "thread id %s."
    )
    raise DatabaseError(
        message % (self.alias, self._thread_ident, thread.get_ident())
    )

# ##### Miscellaneous #####
def _execute_wrapper(self, method, query, args): """Wrapper around execute() and executemany()""" try: return method(query, args) except (PyMysqlPool.mysql.connector.ProgrammingError) as err: six.reraise(utils.ProgrammingError, utils.ProgrammingError(err.msg), sys.exc_info()[2]) except (PyMysqlPool.mysql.connector.IntegrityError) as err: six.reraise(utils.IntegrityError, utils.IntegrityError(err.msg), sys.exc_info()[2]) except PyMysqlPool.mysql.connector.OperationalError as err: # Map some error codes to IntegrityError, since they seem to be # misclassified and Django would prefer the more logical place. if err.args[0] in self.codes_for_integrityerror: six.reraise(utils.IntegrityError, utils.IntegrityError(err.msg), sys.exc_info()[2]) else: six.reraise(utils.DatabaseError, utils.DatabaseError(err.msg), sys.exc_info()[2]) except PyMysqlPool.mysql.connector.DatabaseError as err: six.reraise(utils.DatabaseError, utils.DatabaseError(err.msg), sys.exc_info()[2])
def _execute_wrapper(self, method, query, args): """Wrapper around execute() and executemany()""" try: return method(query, args) except (mysql.connector.ProgrammingError) as err: six.reraise(utils.ProgrammingError, utils.ProgrammingError(err.msg), sys.exc_info()[2]) except (mysql.connector.IntegrityError) as err: six.reraise(utils.IntegrityError, utils.IntegrityError(err.msg), sys.exc_info()[2]) except mysql.connector.OperationalError as err: # Map some error codes to IntegrityError, since they seem to be # misclassified and Django would prefer the more logical place. if err.args[0] in self.codes_for_integrityerror: six.reraise(utils.IntegrityError, utils.IntegrityError(err.msg), sys.exc_info()[2]) else: six.reraise(utils.DatabaseError, utils.DatabaseError(err.msg), sys.exc_info()[2]) except mysql.connector.DatabaseError as err: six.reraise(utils.DatabaseError, utils.DatabaseError(err.msg), sys.exc_info()[2])
def heartbeat(request):  # pylint: disable=unused-argument
    """Report whether the database is reachable and ready for requests.

    Calls ``db_status()`` and maps the outcome to a JSON body with a
    single ``OK`` flag.

    Returns:
        A ``JsonResponse`` of ``{'OK': True}`` with HTTP 200 when
        ``db_status()`` succeeds, or ``{'OK': False}`` with HTTP 500 when
        it raises ``DatabaseError``.
    """
    try:
        db_status()
    except DatabaseError:
        healthy = False
        http_code = status.HTTP_500_INTERNAL_SERVER_ERROR
    else:
        healthy = True
        http_code = status.HTTP_200_OK
    return JsonResponse({'OK': healthy}, status=http_code)
def filter_class_ids():
    """Return the storage resource class IDs that are valid for display.

    Wrapper to avoid importing storage_plugin_manager at module scope (it
    requires the DB to construct itself), so that this module can be
    imported without a database, e.g. for building docs.

    Returns:
        A list of storage resource class IDs for which a plugin is
        available in this process, or an empty list when the database is
        unreachable or not populated.
    """
    from psycopg2 import OperationalError
    from django.db.utils import DatabaseError

    try:
        # The import stays inside the try block: it is the import itself
        # that touches the database.
        from chroma_core.lib.storage_plugin.manager import storage_plugin_manager

        # Materialize the keys so the result is always a real list, as
        # documented and as the fallback below returns. On Python 3 a bare
        # ``.keys()`` would be a view instead.
        return list(storage_plugin_manager.resource_class_id_to_class.keys())
    except (OperationalError, DatabaseError):
        # OperationalError if the DB server can't be contacted
        # DatabaseError if the DB exists but isn't populated
        return []
def _nodb_connection(self):
    """Return a usable "no database" connection, with a fallback.

    Starts from the parent class's ``_nodb_connection`` and checks that it
    can actually connect. If connecting raises ``Database.DatabaseError``
    or ``WrappedDatabaseError``, a ``RuntimeWarning`` is issued and a new
    wrapper of the same class is built from this connection's settings
    with ``NAME`` replaced by the default alias's database name.

    Returns:
        The parent's connection when it is reachable, otherwise the
        fallback connection described above.
    """
    nodb_connection = super(DatabaseWrapper, self)._nodb_connection
    try:
        nodb_connection.ensure_connection()
    except (Database.DatabaseError, WrappedDatabaseError):
        warnings.warn(
            "Normally Django will use a connection to the 'postgres' database "
            "to avoid running initialization queries against the production "
            "database when it's not needed (for example, when running tests). "
            "Django was unable to create a connection to the 'postgres' database "
            "and will use the default database instead.",
            RuntimeWarning
        )
        settings_dict = self.settings_dict.copy()
        settings_dict['NAME'] = settings.DATABASES[DEFAULT_DB_ALIAS]['NAME']
        # Pass the overridden copy. The original built the fallback from a
        # fresh copy of self.settings_dict, which silently discarded the
        # NAME override computed on the line above.
        nodb_connection = self.__class__(
            settings_dict,
            alias=self.alias,
            allow_thread_sharing=False)
    return nodb_connection
def _execute_allow_fail_statements(self, cursor, statements, parameters, verbosity, acceptable_ora_err): """ Execute statements which are allowed to fail silently if the Oracle error code given by `acceptable_ora_err` is raised. Return True if the statements execute without an exception, or False otherwise. """ try: # Statement can fail when acceptable_ora_err is not None allow_quiet_fail = acceptable_ora_err is not None and len(acceptable_ora_err) > 0 self._execute_statements(cursor, statements, parameters, verbosity, allow_quiet_fail=allow_quiet_fail) return True except DatabaseError as err: description = str(err) if acceptable_ora_err is None or acceptable_ora_err not in description: raise return False
def validate_thread_sharing(self):
    """Ensure this connection is being used from a permitted thread.

    A connection may only be used by the thread that created it, unless
    sharing was explicitly authorized through the
    ``allow_thread_sharing`` property.

    Raises:
        DatabaseError: If sharing is not allowed and the current thread
            is not the one recorded in ``self._thread_ident``.
    """
    # Explicitly shared connections are valid from any thread.
    if self.allow_thread_sharing:
        return
    # Otherwise only the creating thread may use the connection.
    if self._thread_ident == thread.get_ident():
        return
    details = (self.alias, self._thread_ident, thread.get_ident())
    raise DatabaseError(
        "DatabaseWrapper objects created in a "
        "thread can only be used in that same thread. The object "
        "with alias '%s' was created in thread id %s and this is "
        "thread id %s." % details
    )

# ##### Miscellaneous #####
def get_combinator_sql(self, combinator, all):
    """Build the SQL and parameters for a compound (combined) query.

    Args:
        combinator: Key into ``self.connection.ops.set_operators``
            selecting the set operator (e.g. ``'union'``).
        all: When truthy and the combinator is ``'union'``, ``' ALL'`` is
            appended to the operator.

    Returns:
        A ``(result, params)`` pair: ``result`` is a one-element list
        holding the joined SQL, ``params`` is the flat list of all
        sub-query parameters in order.

    Raises:
        DatabaseError: If the backend does not support slicing/ordering
            in compound statements and a sub-query uses LIMIT/OFFSET or
            ORDER BY.
    """
    features = self.connection.features
    sub_queries = self.query.combined_queries
    compilers = [
        sub_query.get_compiler(self.using, self.connection)
        for sub_query in sub_queries
    ]

    if not features.supports_slicing_ordering_in_compound:
        for sub_query, sub_compiler in zip(sub_queries, compilers):
            if sub_query.low_mark or sub_query.high_mark:
                raise DatabaseError('LIMIT/OFFSET not allowed in subqueries of compound statements.')
            if sub_compiler.get_order_by():
                raise DatabaseError('ORDER BY not allowed in subqueries of compound statements.')

    # Resolve the operator before compiling any sub-query.
    operator_sql = self.connection.ops.set_operators[combinator]
    if all and combinator == 'union':
        operator_sql += ' ALL'

    # Sub-queries are parenthesized only when the backend supports
    # slicing/ordering inside compound statements.
    if features.supports_slicing_ordering_in_compound:
        template = '({})'
    else:
        template = '{}'

    compiled = (sub_compiler.as_sql() for sub_compiler in compilers)
    rendered = [(template.format(sql), args) for sql, args in compiled]
    sql_parts, args_parts = zip(*rendered)

    separator = ' {} '.format(operator_sql)
    result = [separator.join(sql_parts)]
    params = [value for part in args_parts for value in part]
    return result, params
def prefix_add_tags(request, prefix_id):
    """Synchronize a prefix's usages with the ``usages`` list in POST data.

    Usages currently attached to the prefix but missing from the POST
    data are deleted; usages present in the POST data but not yet
    attached are added. A ``DatabaseError`` while saving a new usage is
    ignored.

    Returns:
        An empty ``HttpResponse``.
    """
    prefix = Prefix.objects.get(pk=prefix_id)

    # First column of each row -- presumably the usage key; verify
    # against the PrefixUsage/Usage models.
    current_keys = set(row[0] for row in prefix.usages.values_list())
    requested_keys = set(request.POST.getlist('usages'))

    stale_keys = list(current_keys - requested_keys)
    missing_keys = list(requested_keys - current_keys)

    PrefixUsage.objects.filter(prefix=prefix, usage__in=stale_keys).delete()

    for usage_key in missing_keys:
        usage = Usage.objects.get(pk=usage_key)
        link = PrefixUsage(prefix=prefix, usage=usage)
        try:
            link.save()
        except DatabaseError:
            pass

    return HttpResponse()
def spatial_version(self):
    """Determine the version of the PostGIS library.

    Function signatures depend on the PostGIS version in use. Finding it
    normally costs a database query; that cost can be avoided by setting
    ``POSTGIS_VERSION`` in settings to a 3-tuple of the major, minor and
    subminor revision of PostGIS.

    Returns:
        ``settings.POSTGIS_VERSION`` when defined, otherwise everything
        after the first element of ``self.postgis_version_tuple()``.

    Raises:
        ImproperlyConfigured: If querying the version raises
            ``DatabaseError``.
    """
    # A user-supplied version short-circuits the database query.
    if hasattr(settings, 'POSTGIS_VERSION'):
        return settings.POSTGIS_VERSION

    try:
        version_info = self.postgis_version_tuple()
    except DatabaseError:
        database_name = self.connection.settings_dict['NAME']
        raise ImproperlyConfigured(
            'Cannot determine PostGIS version for database "%s". '
            'GeoDjango requires at least PostGIS version 1.3. '
            'Was the database created from a spatial database '
            'template?' % database_name
        )
    return version_info[1:]
def test_postgres_trigger_sum_zero(self):
    """Check that the database enforces leg amounts summing to zero.

    The rule is enforced by a postgres trigger applied in migration 0005.
    The test case must extend TransactionTestCase: the trigger only runs
    when changes are committed to the DB, which the normal TestCase will
    not do.
    """
    acct = self.account()
    txn = Transaction.objects.create()

    # A single unbalanced leg must be rejected.
    with self.assertRaises(DatabaseError):
        Leg.objects.create(transaction=txn, account=acct, amount=100)

    # Legs that only cancel out across different currencies must be
    # rejected as well.
    with self.assertRaises(DatabaseError):
        with db_transaction.atomic():
            Leg.objects.create(transaction=txn, account=acct, amount=Money(100, 'EUR'))
            Leg.objects.create(transaction=txn, account=acct, amount=Money(-100, 'GBP'))
def can_reconnect(exc):
    """Decide whether ``exc`` indicates a connection worth re-establishing.

    Returns:
        True for any ``psycopg2.InterfaceError``, and for a
        ``DatabaseError`` whose message mentions an unexpectedly closed
        server connection or a client idle timeout. False otherwise.
    """
    if isinstance(exc, psycopg2.InterfaceError):
        return True

    # NOTE: a psycopg2.OperationalError branch existed here only as
    # commented-out code in the original and remains disabled.

    if isinstance(exc, DatabaseError):
        message = str(exc)
        reconnect_markers = (
            'server closed the connection unexpectedly',
            'client_idle_timeout',
        )
        return any(marker in message for marker in reconnect_markers)

    return False