我们从 Python 开源项目中提取了以下 48 个代码示例，用于说明如何使用 django.core.cache.cache。
def test_django_cache(self):
    """Check that a Django cache passed to ``Congress`` is shared by its clients.

    The test returns early, without asserting anything, when Django cannot
    be imported.
    """
    try:
        from django.conf import settings
        settings.configure(CACHE_BACKEND='locmem://')
        from django.core.cache import cache
    except ImportError:
        # No Django available, so there is nothing to test.
        return

    congress = Congress(API_KEY, cache)

    # The top-level client and each sub-client must expose the same cache.
    self.assertEqual(congress.http.cache, cache)
    for section in ('members', 'bills', 'votes'):
        self.assertEqual(getattr(congress, section).http.cache, cache)

    # Only the absence of an exception matters here; the result is unused.
    try:
        congress.bills.introduced('house')
    except Exception as e:
        self.fail(e)
def release(self):
    """
    Release the lock.

    Returns ``False`` (after logging a warning) when the lock is not held,
    and ``True`` otherwise. A failure while deleting the cache key is logged
    rather than raised, and the acquisition marker is cleared either way.
    """
    # If we went over the lock duration (timeout), we need to exit to avoid
    # accidentally releasing a lock that was acquired by another process.
    if self.held:
        try:
            # XXX: There is a possible race condition here -- this could be
            # actually past the timeout due to clock skew or the delete
            # operation could reach the server after the timeout for a
            # variety of reasons. The only real fix for this would be to use
            # a check and delete operation, but that is backend dependent
            # and not supported by the cache API.
            self.cache.delete(self.lock_key)
        except Exception as e:
            logger.exception(e)
        finally:
            self.__acquired_at = None
        return True

    logger.warning('Tried to release unheld lock: %r', self)
    return False
def handle(self, sender, signal, **kwargs):
    """Record an activity-stream action for a create or modify signal.

    Nothing happens unless ``kwargs`` carries a non-``None`` ``user``.
    Otherwise an activity stream update is queued for every follower of the
    target object and for the acting user, and an action is sent with the
    verb matching ``signal``.

    :param sender: not used by this handler.
    :param signal: compared against ``signals.create`` and ``signals.modify``.
    :param kwargs: must contain ``instance`` when the signal matches; is also
        forwarded to ``self.getTargetObj``.
    """
    # only add things to the activity stream if there's a user
    actor = kwargs.get("user", None)
    if actor is None:
        return

    target = self.getTargetObj(**kwargs)

    # invalidate cache for target followers
    for follower in followers(target):
        update_activity_stream_for_user.delay(follower.username)
    update_activity_stream_for_user.delay(actor.username, actor=True)

    if signal == signals.create:
        action.send(
            actor,
            verb=self.createVerb,
            action_object=kwargs['instance'],
            target=target,
        )
    if signal == signals.modify:
        action.send(
            actor,
            verb=self.modifyVerb,
            action_object=kwargs['instance'],
            target=target,
        )
def test_page_view_ok_with_cache(self):
    """Check that requesting a cache-enabled page stores its rendered source.

    The cache key must be absent before the request and present after a
    successful (HTTP 200) response containing the page template.
    """
    markup = '<h1>Hello world!</h1>'
    page = Page.objects.create(
        url='/test/',
        template=markup,
        page_processor_config={'cache': 15},  # cache for 15 seconds
    )
    rendered_key = cachekeys.rendered_source_for_lang(page.pk, 'en')

    # Nothing has been rendered yet, so the key must not be cached.
    self.assertNotIn(rendered_key, cache)

    resp = self.client.get('/test/')
    self.assertEqual(resp.status_code, 200)
    self.assertContains(resp, markup)

    # Serving the page must have populated the cache.
    self.assertIn(rendered_key, cache)

# Not Found Processor:
def get_cached_choices(self):
    """Return the cached choices for this field.

    :return: ``None`` when ``self.cache_config['enabled']`` is falsy;
        otherwise whatever the configured cache backend returns for the key
        built from ``self.cache_config['key'] % self.field_path``.
    """
    config = self.cache_config
    if not config['enabled']:
        return None
    # Assuming ``caches`` is ``django.core.cache.caches``: it is a mapping of
    # configured aliases, so the backend is selected by subscription. Calling
    # it (``caches(alias)``) raises ``TypeError``.
    backend = caches[config['cache']]
    return backend.get(config['key'] % self.field_path)
def set_cached_choices(self, choices):
    """Store ``choices`` in the configured cache backend.

    :param choices: value stored under the key built from
        ``self.cache_config['key'] % self.field_path``.
    :return: ``None`` when ``self.cache_config['enabled']`` is falsy;
        otherwise the result of the backend's ``set`` call.
    """
    config = self.cache_config
    if not config['enabled']:
        return
    # Assuming ``caches`` is ``django.core.cache.caches``: it is a mapping of
    # configured aliases, so the backend is selected by subscription. Calling
    # it (``caches(alias)``) raises ``TypeError``.
    backend = caches[config['cache']]
    return backend.set(config['key'] % self.field_path, choices)
def apply(self, envelope):
    """Run every configured ephemeral queue policy against ``envelope``.

    Each path listed in ``settings.TRANSACTIONAL['EXEC_QUEUE_POLICIES']`` is
    read and executed, and the ``apply`` callable it defines is invoked with
    ``envelope``. A missing file, or any exception raised while executing a
    policy, is logged as a warning and the next policy is tried.

    :param envelope: object passed to each policy; its ``headers`` mapping
        is also consulted to build the log prefix.
    """
    def message_id():
        # Log prefix looked up from the envelope headers.
        return envelope.headers.get(
            settings.TRANSACTIONAL.get(
                'X_MESSAGE_ID_HEADER', 'NO-MESSAGE-ID'))

    for path in settings.TRANSACTIONAL.get('EXEC_QUEUE_POLICIES'):
        if not os.path.exists(path):
            logger.warning(
                "[{}] Following ephemeral policy doesn't exists: {}".format(
                    message_id(), path))
            continue

        with open(path) as policy_file:
            policy_locals = {}
            # The policy only sees these names as its builtins, plus the
            # modules explicitly listed in the settings.
            policy_builtins = {
                'settings': settings,
                'cache': cache,
                'logger': logger,
                'print': print,
            }
            policy_globals = {'__builtins__': policy_builtins}
            for mod in settings.TRANSACTIONAL.get(
                    'EXEC_QUEUE_POLICIES_CONTEXT_BUILTINS'):
                policy_builtins[mod] = __import__(mod)

            try:
                # NOTE(review): ``exec`` runs arbitrary code from the policy
                # file; the configured paths must be trusted.
                exec(policy_file.read(), policy_globals, policy_locals)
                policy_locals.get('apply')(envelope)
            except Exception as err:
                logger.warning(
                    '[{}] Failed to execute "{}" ephemeral policy: {}'.format(
                        message_id(), path, err))
def key(cache, attr):
    """Build the name ``cache.<backend>.<attr>`` for an attribute of ``cache``.

    ``<backend>`` is the last dotted component of ``cache.__module__``.
    """
    backend = cache.__module__.rpartition('.')[2]
    return 'cache.%s.%s' % (backend, attr)
def __init__(self, cache):
    """Store ``cache`` on the instance.

    :param cache: object kept as ``self.cache``.
    """
    self.cache = cache
def __getattribute__(self, attr):
    """Resolve attribute access on this instance.

    ``cache`` is looked up normally through ``BaseCache``. Every other name
    is fetched from ``self.cache`` and passed through ``wrap`` together with
    the name produced by ``key``.
    """
    if attr != 'cache':
        target = self.cache
        return wrap(getattr(target, attr), key(target, attr))
    # Plain lookup for the wrapped object itself; avoids infinite recursion.
    return BaseCache.__getattribute__(self, attr)
def patch():
    """Replace ``cache.cache`` with an ``XRayTracker`` built from its current value."""
    current = cache.cache
    cache.cache = XRayTracker(current)
def __init__(self):
    """Set up the validator's endpoint, consumer slot and cache reference."""
    super(SignatureValidator, self).__init__()
    # The endpoint is handed this validator instance.
    self.endpoint = SignatureOnlyEndpoint(self)
    self.cache = cache
    self.lti_consumer = None

# The OAuth signature uses the endpoint URL as part of the request to be
# hashed. By default, the oauthlib library rejects any URLs that do not
# use HTTPS. We turn this behavior off in order to allow edX to run without
# SSL in development mode. When the platform is deployed and running with
# SSL enabled, the URL passed to the signature verifier must start with
# 'https', otherwise the message signature would not match the one generated
# on the platform.
def validate_timestamp_and_nonce(self, client_key, timestamp, nonce, request):
    """
    Verify that the request is not too old (according to the timestamp), and
    that the nonce value is unique. Nonce value should not have been used
    already within the period of time in which the timestamp marks a request
    as valid. This method signature is required by the oauthlib library.

    :param client_key: used to build the per-client timestamp cache key.
    :param timestamp: request timestamp; converted with ``int()``.
    :param nonce: request nonce; used as a cache key.
    :param request: not used by this implementation.
    :return: True if the OAuth nonce and timestamp are valid, False if they are not.
    :raises ValueError: if ``timestamp`` cannot be converted to ``int``.
    """
    msg = "LTI request's {} is not valid."
    # NOTE(idegtiarov) cache data with timestamp and nonce lives for 10 seconds
    lifetime = 10

    log.debug('Timestamp validating is started.')
    ts = int(timestamp)
    ts_key = '{}_ts'.format(client_key)
    # With no cached value the request's own timestamp is the default, so
    # the comparison below passes.
    cache_ts = self.cache.get(ts_key, ts)
    if cache_ts > ts:
        log.debug(msg.format('timestamp'))
        return False
    self.cache.set(ts_key, ts, lifetime)
    log.debug('Timestamp is valid.')

    log.debug('Nonce validating is started.')
    # ``add`` stores the value only when the key is absent and reports
    # whether it did, so the check and the write are a single cache
    # operation. A separate ``get`` followed by ``set`` lets two concurrent
    # replays of the same nonce both pass.
    # NOTE(review): the nonce key is not namespaced by client_key, so equal
    # nonces from different clients collide -- confirm whether intended.
    if not self.cache.add(nonce, 1, lifetime):
        log.debug(msg.format('nonce'))
        return False
    log.debug('Nonce is valid.')
    return True
def enable(self):
    """Start a patch that makes the default cache's ``get`` always miss.

    The started patcher is kept on ``self.patch``.
    """
    # Just patch default cache to always find nothing
    # since drf-extensions stores the instance of the cache everywhere,
    # which makes it impossible to replace with DummyCache
    def mocked_cache_get(key, default=None, version=None):
        # Behave as if the key were never cached.
        return default

    patcher = patch.object(cache_module.cache, 'get', mocked_cache_get)
    self.patch = patcher
    patcher.start()
def __init__(self, lock_key, timeout=3, cache=None, nowait=False):
    """Configure the lock; nothing is acquired yet.

    :param lock_key: stored as ``self.lock_key``.
    :param timeout: stored as ``self.timeout``.
    :param cache: cache object to use; ``default_cache`` when ``None``.
    :param nowait: stored as ``self.nowait``.
    """
    self.cache = default_cache if cache is None else cache
    self.lock_key = lock_key
    self.timeout = timeout
    self.nowait = nowait
    # No acquisition has happened yet.
    self.__acquired_at = None
def acquire(self):
    """
    Attempt to acquire the lock, returning a boolean that represents if the
    lock is held.

    The cache key is claimed with ``cache.add``; failed attempts are retried
    after a randomized delay, for at most ``timeout // delay`` attempts, or
    only once when ``nowait`` is set.

    :return: ``True`` if the lock was acquired, ``False`` otherwise.
    :raises LockAlreadyHeld: if ``self.seconds_remaining`` is truthy.
    """
    # NOTE: This isn't API compatible with the standard Python
    # ``Lock.acquire`` method signature. It may make sense to make these
    # compatible in the future, but that would also require changes to the
    # the constructor: https://docs.python.org/2/library/threading.html#lock-objects
    time_remaining = self.seconds_remaining
    if time_remaining:
        raise LockAlreadyHeld(
            'Tried to acquire lock that is already held, %.3fs remaining: %r'
            % (time_remaining, self))

    self.__acquired_at = None

    # Delay between attempts: 0.01s plus a random amount below 0.1s.
    delay = 0.01 + random.random() / 10

    # ``range`` rather than ``xrange``: the latter exists only on Python 2,
    # and the attempt count is small enough that a list is harmless there.
    for i in range(int(self.timeout // delay)):
        if i != 0:
            sleep(delay)

        attempt_started_at = time()
        if self.cache.add(self.lock_key, '', self.timeout):
            # Record the time taken *before* the add call as the start.
            self.__acquired_at = attempt_started_at
            break

        if self.nowait:
            break

    return self.__acquired_at is not None