我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 django.db.router.db_for_read()。
def has_key(self, key, version=None):
    """Return True when the cache table holds an unexpired row for ``key``.

    The key is first passed through ``self.make_key`` (together with
    ``version``) and ``self.validate_key``. The lookup runs on the database
    alias that ``router.db_for_read`` returns for ``self.cache_model_class``.
    """
    cache_key = self.make_key(key, version=version)
    self.validate_key(cache_key)

    alias = router.db_for_read(self.cache_model_class)
    conn = connections[alias]
    quoted_table = conn.ops.quote_name(self._table)

    # UTC wall-clock time when USE_TZ is on, local time otherwise; the
    # microseconds are dropped before comparing against ``expires``.
    moment = datetime.utcnow() if settings.USE_TZ else datetime.now()
    moment = moment.replace(microsecond=0)

    # Only the quoted table name is interpolated into the statement; the key
    # and the timestamp remain %s placeholders and travel as parameters.
    sql = (
        "SELECT cache_key FROM %s "
        "WHERE cache_key = %%s and expires > %%s" % quoted_table
    )
    with conn.cursor() as cursor:
        params = [cache_key, conn.ops.adapt_datetimefield_value(moment)]
        cursor.execute(sql, params)
        return cursor.fetchone() is not None
def validate(self, value, model_instance):
    """Check that ``value`` refers to an existing row of the related model.

    Nothing is checked when the relation is a parent link. Otherwise the
    parent class's ``validate`` runs first, and a ``None`` value is accepted
    without a database query.

    Args:
        value: The raw value to look up via ``self.remote_field.field_name``.
        model_instance: The instance being validated; forwarded to the
            database router as the ``instance`` hint. May be ``None``.

    Raises:
        exceptions.ValidationError: With code ``'invalid'`` when no row of
            the related model matches ``value`` within the restrictions
            returned by ``self.get_limit_choices_to()``.
    """
    if self.remote_field.parent_link:
        return
    super(ForeignKey, self).validate(value, model_instance)
    if value is None:
        return

    # Route by the model that is actually queried below. Routing by
    # ``model_instance.__class__`` picks the database of the *referencing*
    # model, and hands ``NoneType`` to the routers when ``model_instance``
    # is None.
    using = router.db_for_read(self.remote_field.model, instance=model_instance)
    qs = self.remote_field.model._default_manager.using(using).filter(
        **{self.remote_field.field_name: value}
    )
    qs = qs.complex_filter(self.get_limit_choices_to())
    if not qs.exists():
        raise exceptions.ValidationError(
            self.error_messages['invalid'],
            code='invalid',
            params={
                'model': self.remote_field.model._meta.verbose_name,
                'pk': value,
                'field': self.remote_field.field_name,
                'value': value,
            },  # 'pk' is included for backwards compatibility
        )
def has_key(self, key, version=None):
    """Tell whether ``key`` currently has an unexpired entry in the cache table.

    ``self.make_key`` and ``self.validate_key`` are applied to the key before
    the query. The database alias comes from ``router.db_for_read`` for
    ``self.cache_model_class``.
    """
    full_key = self.make_key(key, version=version)
    self.validate_key(full_key)

    read_conn = connections[router.db_for_read(self.cache_model_class)]
    table_name = read_conn.ops.quote_name(self._table)

    # Reference time: UTC when USE_TZ is enabled, local time otherwise,
    # truncated to whole seconds.
    clock = datetime.utcnow if settings.USE_TZ else datetime.now
    cutoff = clock().replace(microsecond=0)

    with read_conn.cursor() as cursor:
        cursor.execute(
            "SELECT cache_key FROM %s "
            "WHERE cache_key = %%s and expires > %%s" % table_name,
            [full_key, read_conn.ops.value_to_db_datetime(cutoff)],
        )
        found = cursor.fetchone()
        return found is not None
def validate(self, value, model_instance):
    """Check that ``value`` refers to an existing row of the related model.

    Nothing is checked when the relation is a parent link. A ``None`` value
    is accepted without a database query.

    Args:
        value: The raw value to look up via ``self.remote_field.field_name``.
        model_instance: The instance being validated; forwarded to the
            database router as the ``instance`` hint.

    Raises:
        exceptions.ValidationError: With code ``'invalid'`` when no row of
            the related model matches ``value`` within the restrictions
            returned by ``self.get_limit_choices_to()``.
    """
    if self.remote_field.parent_link:
        return
    # super() is anchored at models.ForeignKey, so the method lookup starts
    # *after* ForeignKey in the MRO and ForeignKey's own validate is skipped.
    super(models.ForeignKey, self).validate(value, model_instance)
    if value is None:
        return

    # Use ``self.remote_field.model`` like the rest of this method instead of
    # the deprecated ``self.rel.to`` alias for the same model.
    using = router.db_for_read(self.remote_field.model, instance=model_instance)
    qs = self.remote_field.model._default_manager.using(using).filter(
        **{self.remote_field.field_name: value}
    )
    qs = qs.complex_filter(self.get_limit_choices_to())
    if not qs.exists():
        raise exceptions.ValidationError(
            self.error_messages['invalid'],
            code='invalid',
            params={
                'model': self.remote_field.model._meta.verbose_name,
                'pk': value,
                'field': self.remote_field.field_name,
                'value': value,
            },  # 'pk' is included for backwards compatibility
        )
def validate(self, value, model_instance):
    """Ensure ``value`` matches an existing row of the related model.

    Parent-link relations are not checked at all. For other relations the
    parent class's ``validate`` runs first; a ``None`` value then passes
    without touching the database.

    Raises:
        exceptions.ValidationError: With code ``'invalid'`` if no related
            row matches ``value`` under ``self.get_limit_choices_to()``.
    """
    remote = self.remote_field
    if remote.parent_link:
        return
    super(ForeignKey, self).validate(value, model_instance)
    if value is None:
        return

    # The read is routed by the related model, with the instance as a hint.
    alias = router.db_for_read(self.remote_field.model, instance=model_instance)
    lookup = {self.remote_field.field_name: value}
    candidates = self.remote_field.model._default_manager.using(alias).filter(**lookup)
    candidates = candidates.complex_filter(self.get_limit_choices_to())
    if candidates.exists():
        return

    error_params = {
        'model': self.remote_field.model._meta.verbose_name,
        'pk': value,  # 'pk' is included for backwards compatibility
        'field': self.remote_field.field_name,
        'value': value,
    }
    raise exceptions.ValidationError(
        self.error_messages['invalid'],
        code='invalid',
        params=error_params,
    )
def ready(self):
    """Copy backend-provided ``base_model`` attributes onto routed models.

    For every entry in ``settings.DATABASES`` whose
    ``<ENGINE>.base.DatabaseWrapper`` class exposes a truthy ``base_model``
    attribute, each model returned by ``apps.get_models()`` that the router
    reads from that database receives the attributes of ``base_model``:

    * ``objects``: if the model already has a truthy ``objects``, it is
      replaced by an instance of a new ``AnyBackendCustomManager`` class
      deriving from both the backend manager's class and the model's
      current manager class; otherwise the backend's ``objects`` is
      assigned as-is.
    * any other attribute whose name does not start with ``__`` is set
      directly on the model.
    """
    # DATABASES is a mapping of alias -> settings dict. Iterating it directly
    # yields only the alias strings, so the settings must come from .items().
    for alias, db_settings in settings.DATABASES.items():
        db_wrapper_class = import_string(db_settings['ENGINE'] + '.base.DatabaseWrapper')
        base_model = getattr(db_wrapper_class, 'base_model', None)
        if not base_model:
            continue

        models = apps.get_models()
        for model in models:
            # The router answers with a database *alias*, so compare against
            # the alias rather than the database's NAME setting.
            if router.db_for_read(model) != alias:
                continue

            # vars() gives the class namespace; .items() is required to get
            # (name, value) pairs instead of bare attribute names.
            for k, v in vars(base_model).items():
                if k == 'objects':
                    model_manager = getattr(model, 'objects', None)
                    if model_manager:
                        manager_cls = model_manager.__class__
                        custom_cls = v.__class__
                        # Backend manager class first so that its methods
                        # take precedence in the MRO.
                        new_manager = type(
                            'AnyBackendCustomManager',
                            (custom_cls, manager_cls),
                            {},
                        )
                        setattr(model, 'objects', new_manager())
                    else:
                        setattr(model, 'objects', v)
                elif not k.startswith('__'):
                    setattr(model, k, v)
def has_key(self, key, version=None):
    """Return True when the cache table holds an unexpired row for ``key``.

    The key is first passed through ``self.make_key`` (together with
    ``version``) and ``self.validate_key``. The lookup runs on the database
    alias that ``router.db_for_read`` returns for ``self.cache_model_class``.
    """
    key = self.make_key(key, version=version)
    self.validate_key(key)
    db = router.db_for_read(self.cache_model_class)
    connection = connections[db]
    table = connection.ops.quote_name(self._table)

    # UTC wall-clock time when USE_TZ is on, local time otherwise; the
    # microseconds are dropped before comparing against ``expires``.
    if settings.USE_TZ:
        now = datetime.utcnow()
    else:
        now = datetime.now()
    now = now.replace(microsecond=0)

    cursor = connection.cursor()
    # Close the cursor explicitly instead of leaving it to garbage
    # collection. try/finally is used rather than ``with`` so this does not
    # depend on the cursor wrapper being a context manager.
    try:
        cursor.execute("SELECT cache_key FROM %s "
                       "WHERE cache_key = %%s and expires > %%s" % table,
                       [key, connection.ops.value_to_db_datetime(now)])
        return cursor.fetchone() is not None
    finally:
        cursor.close()
def validate(self, value, model_instance):
    """Check that ``value`` refers to an existing row of the related model.

    Nothing is checked when the relation is a parent link. Otherwise the
    parent class's ``validate`` runs first, and a ``None`` value is accepted
    without a database query.

    Args:
        value: The raw value to look up via ``self.rel.field_name``.
        model_instance: The instance being validated; forwarded to the
            database router as the ``instance`` hint. May be ``None``.

    Raises:
        exceptions.ValidationError: With code ``'invalid'`` when no row of
            ``self.rel.to`` matches ``value`` within the restrictions
            returned by ``self.get_limit_choices_to()``.
    """
    if self.rel.parent_link:
        return
    super(ForeignKey, self).validate(value, model_instance)
    if value is None:
        return

    # Route by the model that is actually queried below (``self.rel.to``).
    # Routing by ``model_instance.__class__`` picks the database of the
    # *referencing* model, and hands ``NoneType`` to the routers when
    # ``model_instance`` is None.
    using = router.db_for_read(self.rel.to, instance=model_instance)
    qs = self.rel.to._default_manager.using(using).filter(
        **{self.rel.field_name: value}
    )
    qs = qs.complex_filter(self.get_limit_choices_to())
    if not qs.exists():
        raise exceptions.ValidationError(
            self.error_messages['invalid'],
            code='invalid',
            params={
                'model': self.rel.to._meta.verbose_name,
                'pk': value,
                'field': self.rel.field_name,
                'value': value,
            },  # 'pk' is included for backwards compatibility
        )
def get(self, key, default=None, version=None):
    """Fetch the cached value stored under ``key``.

    Returns ``default`` when no row exists for the key, or when the row's
    ``expires`` value lies before ``timezone.now()``. In the latter case the
    row is also deleted, using the alias from ``router.db_for_write``.
    Otherwise the stored value is base64-decoded and unpickled.
    """
    cache_key = self.make_key(key, version=version)
    self.validate_key(cache_key)

    read_alias = router.db_for_read(self.cache_model_class)
    read_conn = connections[read_alias]
    table = read_conn.ops.quote_name(self._table)

    with read_conn.cursor() as cursor:
        cursor.execute(
            "SELECT cache_key, value, expires FROM %s "
            "WHERE cache_key = %%s" % table,
            [cache_key],
        )
        row = cursor.fetchone()
    if row is None:
        return default

    # The raw ``expires`` column is run through the converters the backend
    # and the expression report for a DateTimeField, in that order.
    expires = row[2]
    expression = models.Expression(output_field=models.DateTimeField())
    converters = (
        read_conn.ops.get_db_converters(expression)
        + expression.get_db_converters(read_conn)
    )
    for convert in converters:
        expires = convert(expires, expression, read_conn, {})

    if expires < timezone.now():
        # Expired: remove the row through the write database. The table name
        # quoted for the read connection is reused here.
        write_conn = connections[router.db_for_write(self.cache_model_class)]
        with write_conn.cursor() as cursor:
            cursor.execute(
                "DELETE FROM %s "
                "WHERE cache_key = %%s" % table,
                [cache_key],
            )
        return default

    # NOTE(review): pickle.loads trusts whatever bytes are stored in the
    # cache table; confirm the table is not writable by untrusted parties.
    payload = read_conn.ops.process_clob(row[1])
    raw = base64.b64decode(force_bytes(payload))
    return pickle.loads(raw)
def db(self):
    """Return the database alias to use.

    ``self._db`` wins when it is truthy; otherwise the alias is obtained from
    ``router.db_for_read`` for ``self.model``, with ``self._hints`` passed as
    keyword hints.
    """
    pinned = self._db
    if pinned:
        return pinned
    return router.db_for_read(self.model, **self._hints)

#######################
# PROXIES TO QUERYSET #
#######################
def get(self, key, default=None, version=None):
    """Fetch the cached value stored under ``key``.

    Returns ``default`` when no row exists for the key, or when the row's
    ``expires`` value lies before ``timezone.now()``. In the latter case the
    row is also deleted, using the alias from ``router.db_for_write``.
    Otherwise the stored value is base64-decoded and unpickled.
    """
    cache_key = self.make_key(key, version=version)
    self.validate_key(cache_key)

    read_conn = connections[router.db_for_read(self.cache_model_class)]
    table = read_conn.ops.quote_name(self._table)

    with read_conn.cursor() as cursor:
        cursor.execute(
            "SELECT cache_key, value, expires FROM %s "
            "WHERE cache_key = %%s" % table,
            [cache_key],
        )
        row = cursor.fetchone()
    if row is None:
        return default

    now = timezone.now()
    expires = row[2]
    must_cast = (
        read_conn.features.needs_datetime_string_cast
        and not isinstance(expires, datetime)
    )
    if must_cast:
        # Note: typecasting is needed by some 3rd party database backends.
        # All core backends work without typecasting, so be careful about
        # changes here - test suite will NOT pick regressions here.
        expires = typecast_timestamp(str(expires))

    if expires < now:
        # Expired: remove the row through the write database. The table name
        # quoted for the read connection is reused here.
        write_conn = connections[router.db_for_write(self.cache_model_class)]
        with write_conn.cursor() as cursor:
            cursor.execute(
                "DELETE FROM %s "
                "WHERE cache_key = %%s" % table,
                [cache_key],
            )
        return default

    # NOTE(review): pickle.loads trusts whatever bytes are stored in the
    # cache table; confirm the table is not writable by untrusted parties.
    payload = read_conn.ops.process_clob(row[1])
    raw = base64.b64decode(force_bytes(payload))
    return pickle.loads(raw)