我们从 Python 开源项目中，提取了以下 7 个代码示例，用于说明如何使用 django.core.cache.get_cache()。
def get_cache(cache_name):
    """Return the cache for ``cache_name`` from the module-level ``caches``.

    ``caches`` is called with the name when it exposes ``__call__``;
    otherwise it is indexed with the name.
    """
    is_factory = hasattr(caches, '__call__')
    return caches(cache_name) if is_factory else caches[cache_name]
def get_cache(k):
    """Look up ``k`` in ``django.core.cache.caches`` and return the result."""
    # Imported at call time, as in the original snippet.
    from django.core.cache import caches as cache_registry

    return cache_registry[k]
def __init__(self, session_key=None):
    """Bind the cache named by ``settings.SESSION_CACHE_ALIAS``, then
    run the parent initializer with ``session_key``.
    """
    cache_alias = settings.SESSION_CACHE_ALIAS
    self._cache = get_cache(cache_alias)
    super(SessionStore, self).__init__(session_key)
def test_default_cache(self):
    """After saving, the session's cache key is present in the 'default' cache."""
    self.session.save()
    cached_value = get_cache('default').get(self.session.cache_key)
    self.assertNotEqual(cached_value, None)
def test_non_default_cache(self):
    """After saving, the session's cache key is absent from the 'default'
    cache and present in the 'sessions' cache.
    """
    # Re-initialize the session backend to make use of overridden settings.
    self.session = self.backend()
    self.session.save()
    session_cache_key = self.session.cache_key

    in_default = get_cache('default').get(session_cache_key)
    self.assertEqual(in_default, None)

    in_sessions = get_cache('sessions').get(session_cache_key)
    self.assertNotEqual(in_sessions, None)
def get_redis_connection(config, use_strict_redis=False):
    """
    Return a Redis connection described by the ``config`` mapping.

    The keys of ``config`` are consulted in this order:

    * ``URL`` -- connect with ``from_url``, passing ``DB`` when present.
    * ``USE_REDIS_CACHE`` -- reuse the connection held by the Django cache
      with that alias.
    * ``UNIX_SOCKET_PATH`` -- connect over a unix socket, using ``DB``.
    * otherwise -- connect with ``HOST``, ``PORT``, ``DB`` and the optional
      ``PASSWORD``.

    ``use_strict_redis`` selects ``redis.StrictRedis`` instead of
    ``redis.Redis`` as the connection class.
    """
    if use_strict_redis:
        connection_cls = redis.StrictRedis
    else:
        connection_cls = redis.Redis

    if 'URL' in config:
        return connection_cls.from_url(config['URL'], db=config.get('DB'))

    if 'USE_REDIS_CACHE' in config:
        cache_alias = config['USE_REDIS_CACHE']

        # Prefer the `caches` handler; fall back to `get_cache` when the
        # import fails.
        try:
            from django.core.cache import caches
            backend = caches[cache_alias]
        except ImportError:
            from django.core.cache import get_cache
            backend = get_cache(cache_alias)

        if hasattr(backend, 'client'):
            # django-redis: the cache's `client` attribute is a pluggable
            # backend that exposes the actual Redis connection.
            try:
                # django-redis >= 3.4.0 provides get_client(); older
                # releases expose the connection as `client.client`.
                try:
                    return backend.client.get_client()
                except AttributeError:
                    return backend.client.client
            except NotImplementedError:
                # Fall through to the explicit connection settings below.
                pass
        else:
            # django-redis-cache
            try:
                return backend._client
            except AttributeError:
                # django-redis-cache > 0.13.1
                return backend.get_master_client()

    if 'UNIX_SOCKET_PATH' in config:
        return connection_cls(
            unix_socket_path=config['UNIX_SOCKET_PATH'],
            db=config['DB'],
        )

    return connection_cls(
        host=config['HOST'],
        port=config['PORT'],
        db=config['DB'],
        password=config.get('PASSWORD'),
    )
def create_test_db(self, verbosity=1, autoclobber=False):
    """
    Create the test database and return its name.

    The user is prompted for confirmation if the database already exists.
    This override exists so that the SpatiaLite initialization SQL is
    loaded before the `syncdb` command runs.
    """
    # Imported lazily so django.core.management is only loaded when needed.
    from django.core.management import call_command

    test_database_name = self._get_test_db_name()

    if verbosity >= 1:
        # The database name is only shown at verbosity 2 and above.
        test_db_repr = (" ('%s')" % test_database_name) if verbosity >= 2 else ''
        print("Creating test database for alias '%s'%s..." % (
            self.connection.alias, test_db_repr))

    self._create_test_db(verbosity, autoclobber)

    # Point the connection at the freshly created test database.
    self.connection.close()
    self.connection.settings_dict["NAME"] = test_database_name

    # The SpatiaLite initialization SQL must be in place before `syncdb`.
    self.load_spatialite_sql()

    # Run the management commands one verbosity level below the requested
    # one, so testing is not flooded with messages unless explicitly asked.
    command_verbosity = max(verbosity - 1, 0)

    call_command(
        'syncdb',
        verbosity=command_verbosity,
        interactive=False,
        database=self.connection.alias,
        load_initial_data=False,
    )

    # Flush so that data installed by custom SQL is removed; the only test
    # data should come from fixtures or post_syncdb triggers. As a side
    # effect this loads the initial data that `syncdb` skipped above.
    call_command(
        'flush',
        verbosity=command_verbosity,
        interactive=False,
        database=self.connection.alias,
    )

    from django.core.cache import get_cache
    from django.core.cache.backends.db import BaseDatabaseCache

    # Create the backing table for every database-backed cache.
    for cache_alias in settings.CACHES:
        cache = get_cache(cache_alias)
        if not isinstance(cache, BaseDatabaseCache):
            continue
        call_command('createcachetable', cache._table,
                     database=self.connection.alias)

    # The cursor itself is not used; requesting one has the side effect of
    # initializing the test database.
    unused_cursor = self.connection.cursor()

    return test_database_name