The following 50 code examples, extracted from open-source Python projects, illustrate how to use django.core.cache.cache.set().
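For reference, cache.set() takes a key, a value, and an optional timeout in seconds; a timeout of None caches the value with no expiration, while omitting the timeout falls back to the backend's default TIMEOUT setting. A minimal sketch before the extracted examples (the key names here are illustrative):

from django.core.cache import cache

# Cache a value for five minutes (the timeout is in seconds).
cache.set('greeting', 'hello', 300)
assert cache.get('greeting') == 'hello'

# timeout=None means the entry never expires; omitting the timeout
# uses the backend's default TIMEOUT setting.
cache.set('answer', 42, None)

# Unlike set(), add() only writes when the key is absent.
cache.add('greeting', 'ignored')  # returns False: 'greeting' already exists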
def onduty_members(self):
    OnDuty = []
    if 'OnDuty' in cache.keys('OnDuty'):
        OnDuty = cache.get('OnDuty')
    else:
        try:
            event_start, event_end, instance = ScheduledOccurrence.objects.filter(
                event__in=ScheduledEvent.objects.filter(event=0)).next_occurrence()
            NOW = datetime.datetime.now(datetime.timezone.utc).timestamp()
            if NOW >= event_start.timestamp() and NOW <= event_end.timestamp():
                for user in instance.event.members_list():
                    OnDuty.append(user.pk)
                logger.debug('onduty_members found: %s' % OnDuty)
                #cache.set('OnDuty', OnDuty, timeout=event_end.timestamp())
                cache.set('OnDuty', OnDuty, timeout=settings.ON_DUTY_CACHE_MEMBERS)
            else:
                logger.debug('onduty_members can not find onduty_members')
        except Exception:
            logger.error('onduty_members failed finding onduty_members')
    return OnDuty
def user_dnd(self, user_pk):
    if 'DnD_' + str(user_pk) in cache.keys("DnD_*"):
        #DnD = cache.get('DnD_' + str(user_pk))
        DnD = True
    else:
        DnD = False
        try:
            event_start, event_end, instance = ScheduledOccurrence.objects.filter(
                event__in=ScheduledEvent.objects.filter(event=1, members__in=[user_pk])).next_occurrence()
            NOW = datetime.datetime.now(datetime.timezone.utc).timestamp()
            if NOW >= event_start.timestamp() and NOW <= event_end.timestamp():
                DnD = True
                # timeout is a relative number of seconds, so cache until the occurrence ends
                cache.set('DnD_' + str(user_pk), DnD, timeout=event_end.timestamp() - NOW)
        except Exception:
            pass
    return DnD
def test_binary_string(self):
    # Binary strings should be cacheable
    cache = self.cache
    from zlib import compress, decompress
    value = 'value_to_be_compressed'
    compressed_value = compress(value.encode())

    # Test set
    cache.set('binary1', compressed_value)
    compressed_result = cache.get('binary1')
    self.assertEqual(compressed_value, compressed_result)
    self.assertEqual(value, decompress(compressed_result).decode())

    # Test add
    cache.add('binary1-add', compressed_value)
    compressed_result = cache.get('binary1-add')
    self.assertEqual(compressed_value, compressed_result)
    self.assertEqual(value, decompress(compressed_result).decode())

    # Test set_many
    cache.set_many({'binary1-set_many': compressed_value})
    compressed_result = cache.get('binary1-set_many')
    self.assertEqual(compressed_value, compressed_result)
    self.assertEqual(value, decompress(compressed_result).decode())
def test_long_timeout(self):
    """
    Follow memcached's convention, where a timeout greater than 30 days is
    treated as an absolute expiration timestamp instead of a relative
    offset (#12399).
    """
    cache = self.cache
    cache.set('key1', 'eggs', 60 * 60 * 24 * 30 + 1)  # 30 days + 1 second
    self.assertEqual(cache.get('key1'), 'eggs')

    cache.add('key2', 'ham', 60 * 60 * 24 * 30 + 1)
    self.assertEqual(cache.get('key2'), 'ham')

    cache.set_many({'key3': 'sausage', 'key4': 'lobster bisque'}, 60 * 60 * 24 * 30 + 1)
    self.assertEqual(cache.get('key3'), 'sausage')
    self.assertEqual(cache.get('key4'), 'lobster bisque')
def test_forever_timeout(self):
    """
    Passing None as the timeout caches the value forever.
    """
    cache = self.cache
    cache.set('key1', 'eggs', None)
    self.assertEqual(cache.get('key1'), 'eggs')

    cache.add('key2', 'ham', None)
    self.assertEqual(cache.get('key2'), 'ham')
    added = cache.add('key1', 'new eggs', None)
    self.assertIs(added, False)
    self.assertEqual(cache.get('key1'), 'eggs')

    cache.set_many({'key3': 'sausage', 'key4': 'lobster bisque'}, None)
    self.assertEqual(cache.get('key3'), 'sausage')
    self.assertEqual(cache.get('key4'), 'lobster bisque')
def generate_dashboard():
    logger = logging.getLogger("dashboard.jobs.generate_dashboard")
    logger.info("Start")
    sheet_id = settings.GOOGLE_SPREADSHEET_ID
    data = sheets.load_sheet(sheet_id, settings.GOOGLE_SPREADSHEET_AUTH_FILE)
    for row in data:
        row.xtras = _add_target_date(row.xtras, row.xtras.get('_target_date'))
        if row.xtras.get('_jira_filter'):
            row.xtras = _add_current_jira_summary(row.xtras, row.xtras['_jira_filter'], logger)
        if row.xtras.get('jira_summary'):
            row.xtras = _add_week_ago_summary(row.xtras, row.xtras['jira_summary'], logger)
            row.xtras = _add_forecasts(row.xtras, row.xtras['jira_summary'], logger)
    cache.set('dashboard_data', data, None)
    cache.set('dashboard_data_updated', datetime.datetime.now(get_default_timezone()), None)
    logger.info("End")
    return True
def ready(self):
    """Do this when the app is ready."""
    cache_key = 'dashboard.config.jobs_scheduled'
    scheduled = cache.get(cache_key, False)
    if not scheduled and settings.RQ_QUEUES.get('default', {}).get('ASYNC', True):
        logger.info("Scheduling jobs")
        scheduler = django_rq.get_scheduler('default')
        scheduler.schedule(
            scheduled_time=datetime.utcnow(),
            func='dashboard.jobs.generate_dashboard',
            interval=INTERVAL,
            result_ttl=INTERVAL + 30
        )
        cache.set(cache_key, True, INTERVAL)
    return True
def test_client_task_tester(client, clear_redis_store):
    url = reverse('task_tester')

    def fake_task(key, value, expires):
        cache.set(key, value, expires)

    _mock_function = 'tecken.views.sample_task.delay'
    with mock.patch(_mock_function, new=fake_task):
        response = client.get(url)
        assert response.status_code == 400
        assert b'Make a POST request to this URL first' in response.content

        response = client.post(url)
        assert response.status_code == 201
        assert b'Now make a GET request to this URL' in response.content

        response = client.get(url)
        assert response.status_code == 200
        assert b'It works!' in response.content
def task_tester(request):
    if request.method == 'POST':
        cache.set('marco', 'ping', 100)
        sample_task.delay('marco', 'polo', 10)
        return http.HttpResponse(
            'Now make a GET request to this URL\n',
            status=201,
        )
    else:
        if not cache.get('marco'):
            return http.HttpResponseBadRequest(
                'Make a POST request to this URL first\n'
            )
        for i in range(3):
            value = cache.get('marco')
            if value == 'polo':
                return http.HttpResponse('It works!\n')
            time.sleep(1)
        return http.HttpResponseServerError(
            'Tried 3 times (3 seconds) and no luck :(\n'
        )
def resources(self):
    """Returns a list of :cls:`~pootle_app.models.Directory` and
    :cls:`~pootle_store.models.Store` resource paths available for
    this :cls:`~pootle_project.models.Project` across all languages.
    """
    cache_key = make_method_key(self, 'resources', self.code)
    resources = cache.get(cache_key, None)
    if resources is not None:
        return resources

    stores = Store.objects.live().order_by().filter(
        translation_project__project__pk=self.pk)
    dirs = Directory.objects.live().order_by().filter(
        pootle_path__regex=r"^/[^/]*/%s/" % self.code)

    resources = sorted(
        {to_tp_relative_path(pootle_path)
         for pootle_path
         in (set(stores.values_list("pootle_path", flat=True))
             | set(dirs.values_list("pootle_path", flat=True)))},
        key=get_path_sortkey)

    cache.set(cache_key, resources, settings.POOTLE_CACHE_TIMEOUT)
    return resources
def get(self, request):
    try:
        last_query_time = parse(request.query_params.get('last_query_time'))
    except (TypeError, ValueError):
        last_query_time = cache.get('isDirty_query_time', (timezone.now() - timedelta(days=7)))

    url = settings.POND_URL + '/pond/pond/blocks/new/?since={}&using=default'.format(
        last_query_time.strftime('%Y-%m-%dT%H:%M:%S'))
    now = timezone.now()
    try:
        response = requests.get(url)
        response.raise_for_status()
    except Exception as e:
        return HttpResponseServerError({'error': repr(e)})

    pond_blocks = response.json()
    is_dirty = update_request_states_from_pond_blocks(pond_blocks)
    cache.set('isDirty_query_time', now, None)

    # also factor in if a change in requests (added, updated, cancelled) has occurred since we last checked
    last_update_time = max(Request.objects.latest('modified').modified,
                           UserRequest.objects.latest('modified').modified)
    is_dirty |= last_update_time >= last_query_time

    return Response({'isDirty': is_dirty})
def __init__(self, start, end, telescopes=None, sites=None, instrument_types=None):
    try:
        self.es = Elasticsearch([settings.ELASTICSEARCH_URL])
    except LocationValueError:
        logger.error('Could not find host. Make sure ELASTICSEARCH_URL is set.')
        raise ImproperlyConfigured('ELASTICSEARCH_URL')
    self.instrument_types = instrument_types
    self.available_telescopes = self._get_available_telescopes()

    sites = list({tk.site for tk in self.available_telescopes}) if not sites else sites
    telescopes = list({tk.telescope for tk in self.available_telescopes if tk.site in sites}) \
        if not telescopes else telescopes

    self.start = start.replace(tzinfo=timezone.utc).replace(microsecond=0)
    self.end = end.replace(tzinfo=timezone.utc).replace(microsecond=0)
    cached_event_data = cache.get('tel_event_data')
    if cached_event_data:
        self.event_data = cached_event_data
    else:
        self.event_data = self._get_es_data(sites, telescopes)
        cache.set('tel_event_data', self.event_data, 1800)
def set_mcqs_in_cache():
    """
    Set MCQs in cache if they have changed or have not been set.
    """
    languages = {
        'C': 'c_mcqs',
        'J': 'java_mcqs',
    }

    # If MCQs have been changed or have not been created
    if not cache.get('mcqs_flag', False):
        for lang_code, cache_key in languages.items():
            mcqs_json = extract_mcqs(lang_code)
            cache.set(cache_key, mcqs_json)

        # Mark MCQs as unchanged
        cache.set('mcqs_flag', True)
def get_token(self):
    """Get a WeChat access token, storing it in the cache."""
    access_token = cache.get('wx_access_token')
    if access_token:
        return access_token
    else:
        param = {
            'grant_type': 'client_credential',
            'appid': self.appid,
            'secret': self.appsecret,
        }
        url = self.get_url('token', param)
        data = self.get_data(url)
        cache.set('wx_access_token', data['access_token'],
                  int(data['expires_in']))
        return data['access_token']
def search_endorsers(request):
    query = request.GET.get('q')
    endorsers = []
    endorser_pks = set()
    if query:
        # First find the endorsers whose names start with this query.
        results = Endorser.objects.filter(name__istartswith=query)
        for endorser in results[:5]:
            endorser_pks.add(endorser.pk)
            endorsers.append(endorser)

        # If fewer than five matched, fall back to a substring search.
        if results.count() < 5:
            results = Endorser.objects.filter(name__icontains=query)
            for endorser in results:
                if endorser.pk in endorser_pks:
                    continue
                endorsers.append(endorser)
                if len(endorsers) == 5:
                    break

    return JsonResponse({
        'endorsers': [{'pk': e.pk, 'name': e.name} for e in endorsers],
    })
def render(self, region, context, timeout=None):
    """render(self, region, context, *, timeout=None)

    Render a single region using the context passed.

    If ``timeout`` is ``None`` caching is disabled.

    .. note::
       You should treat anything except for the ``region`` and
       ``context`` argument as keyword-only.
    """
    if timeout is not None:
        key = self.cache_key(region)
        html = cache.get(key)
        if html is not None:
            return html

    html = mark_safe(''.join(
        self._renderer.render_plugin_in_context(plugin, context)
        for plugin in self._contents[region]
    ))

    if timeout is not None:
        cache.set(key, html, timeout=timeout)

    return html
def general_image(self, image_format='PNG'):
    fm_width = self.cleaned_data['width']
    fm_height = self.cleaned_data['height']
    key = '{}.{}.{}'.format(fm_width, fm_height, image_format)
    content = cache.get(key)
    if content is None:
        image = Image.new('RGB', (fm_width, fm_height), color=122)
        draw = ImageDraw.Draw(image)
        text = '{}x{}'.format(fm_width, fm_height)
        text_width, text_height = draw.textsize(text)
        if text_width < fm_width and text_height < fm_height:
            text_top = (fm_height - text_height) // 2
            text_left = (fm_width - text_width) // 2
            # PIL expects an (x, y) anchor, i.e. (left, top)
            draw.text((text_left, text_top), text, fill=(255, 255, 255))
        content = BytesIO()
        image.save(content, image_format)
        content.seek(0)
        cache.set(key, content, 60 * 60)
    return content
def test_custom_key_function(self):
    for key in ["foo-aa", "foo-ab", "foo-bb", "foo-bc"]:
        self.cache.set(key, "foo")

    res = self.cache.delete_pattern("*foo-a*")
    self.assertTrue(bool(res))

    keys = self.cache.keys("foo*")
    self.assertEqual(set(keys), set(["foo-bb", "foo-bc"]))

    # ensure our custom function was actually called
    try:
        self.assertEqual(set(k.decode('utf-8') for k in self.cache.raw_client.keys('*')),
                         set(['#1#foo-bc', '#1#foo-bb']))
    except (NotImplementedError, AttributeError):
        # not all clients support .keys()
        pass
def test_setnx(self):
    # we should ensure there is no test_key_nx in redis
    self.cache.delete("test_key_nx")
    res = self.cache.get("test_key_nx", None)
    self.assertEqual(res, None)

    res = self.cache.set("test_key_nx", 1, nx=True)
    self.assertTrue(res)
    # test that a second set with nx=True fails and keeps the old value
    res = self.cache.set("test_key_nx", 2, nx=True)
    self.assertFalse(res)
    res = self.cache.get("test_key_nx")
    self.assertEqual(res, 1)

    self.cache.delete("test_key_nx")
    res = self.cache.get("test_key_nx", None)
    self.assertEqual(res, None)
def test_setnx_timeout(self):
    # test that timeout still works for nx=True
    res = self.cache.set("test_key_nx", 1, timeout=2, nx=True)
    self.assertTrue(res)
    time.sleep(3)
    res = self.cache.get("test_key_nx", None)
    self.assertEqual(res, None)

    # test that timeout will not affect key, if it was there
    self.cache.set("test_key_nx", 1)
    res = self.cache.set("test_key_nx", 2, timeout=2, nx=True)
    self.assertFalse(res)
    time.sleep(3)
    res = self.cache.get("test_key_nx", None)
    self.assertEqual(res, 1)

    self.cache.delete("test_key_nx")
    res = self.cache.get("test_key_nx", None)
    self.assertEqual(res, None)
def test_save_dict(self):
    if isinstance(self.cache.client._serializer, json_serializer.JSONSerializer):
        self.skipTest("Datetimes are not JSON serializable")

    if isinstance(self.cache.client._serializer, msgpack_serializer.MSGPackSerializer):
        # MSGPackSerializer serializers use the isoformat for datetimes
        # https://github.com/msgpack/msgpack-python/issues/12
        now_dt = datetime.datetime.now().isoformat()
    else:
        now_dt = datetime.datetime.now()

    test_dict = {"id": 1, "date": now_dt, "name": "Foo"}

    self.cache.set("test_key", test_dict)
    res = self.cache.get("test_key")

    self.assertIsInstance(res, dict)
    self.assertEqual(res["id"], 1)
    self.assertEqual(res["name"], "Foo")
    self.assertEqual(res["date"], now_dt)
def test_timeout_parameter_as_positional_argument(self):
    self.cache.set("test_key", 222, -1)
    res = self.cache.get("test_key", None)
    self.assertIsNone(res)

    self.cache.set("test_key", 222, 1)
    res1 = self.cache.get("test_key", None)
    time.sleep(2)
    res2 = self.cache.get("test_key", None)
    self.assertEqual(res1, 222)
    self.assertEqual(res2, None)

    # nx=True should not overwrite expire of key already in db
    self.cache.set("test_key", 222, 0)
    self.cache.set("test_key", 222, -1, nx=True)
    res = self.cache.get("test_key", None)
    self.assertEqual(res, 222)
def test_iter_keys(self):
    cache = caches["default"]
    _params = cache._params
    _is_shard = (_params["OPTIONS"]["CLIENT_CLASS"] == "django_redis.client.ShardClient")
    if _is_shard:
        return

    cache.set("foo1", 1)
    cache.set("foo2", 1)
    cache.set("foo3", 1)

    # Test simple result
    result = set(cache.iter_keys("foo*"))
    self.assertEqual(result, set(["foo1", "foo2", "foo3"]))

    # Test limited result
    result = list(cache.iter_keys("foo*", itersize=2))
    self.assertEqual(len(result), 3)

    # Test generator object
    result = cache.iter_keys("foo*")
    self.assertNotEqual(next(result), None)
def test_sentinel_switching(self):
    if not isinstance(self.cache.client, SentinelClient):
        self.skipTest("Only relevant for the Sentinel client; other clients use the default master-slave setup")

    try:
        cache = caches["sample"]
        client = cache.client
        master = client.get_client(write=True)
        slave = client.get_client(write=False)

        master.set("Foo", "Bar")
        self.assertEqual(slave.get("Foo"), "Bar")
        self.assertEqual(master.info()['role'], "master")
        self.assertEqual(slave.info()['role'], "slave")
    except NotImplementedError:
        pass
def query_user_by_id(user_id=0, use_cache=True):
    """
    Query a user account by ID.
    :param user_id: the user's ID
    :param use_cache: whether to read from the cache
    """
    key = CACHE_KEY + str(user_id)
    if use_cache:
        account = cache.get(key)
        if account:
            return account
    try:
        account = UserAccount.objects.get(id=user_id)
        cache.set(key, account, CACHE_TIME)
        return account
    except UserAccount.DoesNotExist:
        return None
def query_token_by_user_id(user_id, use_cache=True):
    """
    Query the latest valid access token for a given user ID.
    :param user_id: the user's ID
    :param use_cache: whether to read from the cache
    """
    key = CACHE_TOKEN_ID + str(user_id)
    if use_cache:
        token = cache.get(key)
        if token:
            return token
    try:
        token = AccessToken.objects.order_by("-id").filter(status=1).get(user_id=user_id)
        cache.set(key, token, CACHE_TIME)
        return token
    except AccessToken.DoesNotExist:
        return None
def query_token(token, use_cache=True):
    """
    Query the details of an access token by its value.
    """
    key = CACHE_TOKEN + token
    if use_cache:
        token_result = cache.get(key)
        if token_result:
            return token_result
    try:
        token = AccessToken.objects.order_by("-id").filter(status=1).get(access_token=token)
        cache.set(key, token, CACHE_TIME)
        return token
    except AccessToken.DoesNotExist:
        return None
def query_user_meta_count(user_id, is_follow=True, use_cache=True):
    """
    Query a user's follow or follower count.
    :param user_id: the user's ID
    :param is_follow: if True, count the users this user follows; otherwise count their followers
    :param use_cache: whether to read from the cache
    """
    cache_key = CACHE_FANS_COUNT_KEY + str(user_id)
    if is_follow:
        cache_key = CACHE_FOLLOW_COUNT_KEY + str(user_id)
    if use_cache:
        count = cache.get(cache_key)
        if count:
            return count

    if is_follow:
        count = UserFollow.objects.filter(user_id=user_id, status=1).aggregate(Count("id"))
    else:
        count = UserFollow.objects.filter(follow_user=user_id, status=1).aggregate(Count("id"))
    count = count["id__count"]
    cache.set(cache_key, count, CACHE_COUNT_TIME)
    return count
def query_format_info_by_user_id(user_id, use_cache=True):
    """
    Query formatted user info by user ID.
    :param user_id: the user's ID
    :param use_cache: whether to read from the cache
    """
    key = CACHE_KEY + str(user_id)
    if use_cache:
        result = cache.get(key)
        if result:
            return result
    try:
        user_info = UserInfo.objects.get(user_id=user_id)
        format_user_info = UserInfo.format_user_info(user_info)
        cache.set(key, format_user_info, CACHE_TIME)
        return format_user_info
    except UserInfo.DoesNotExist:
        return None
def query_published_article_count(user_id, use_cache=True):
    """
    Query the number of articles a user has published.
    :param user_id: the user's ID
    :param use_cache: whether to read from the cache
    """
    cache_key = CACHE_ARTICLE_COUNT + str(user_id)
    if use_cache:
        count = cache.get(cache_key)
        if count:
            return count

    count = BlogArticle.objects.filter(user_id=user_id, status=1).aggregate(Count("id"))
    count = count["id__count"]
    if count:
        cache.set(cache_key, count, CACHE_TIME)
    return count
def query_article_by_id(article_id=0, use_cache=True):
    """
    Query article details by article ID.
    :param article_id: the article's ID
    :param use_cache: whether to read from the cache
    """
    key = CACHE_KEY_ID + str(article_id)
    if use_cache:
        cache_result = cache.get(key)
        if cache_result:
            return cache_result
    try:
        article = BlogArticle.objects.get(id=article_id)
        article = BlogArticle.format_article(article)
        cache.set(key, article, CACHE_TIME)
        return article
    except BlogArticle.DoesNotExist:
        return None
def query_top_list(order="hit", use_cache=True):
    """Query the top articles, ordered by the given field."""
    key = CACHE_TOP_LIST_KEY + order
    if use_cache:
        top_list = cache.get(key)
        if top_list:
            return top_list
    try:
        # take the ten highest-ranked published articles
        top_list = BlogArticleMeta.objects.filter(status=1).order_by("-" + order)[:10]
        cache.set(key, top_list, CACHE_TOP_LIST_TIME)
        return top_list
    except BlogArticleMeta.DoesNotExist:
        return []
def commits_over_52(self):
    cache_name = self.cache_namer(self.commits_over_52)
    value = cache.get(cache_name)
    if value is not None:
        return value

    now = datetime.now()
    commits = self.commit_set.filter(
        commit_date__gt=now - timedelta(weeks=52),
    ).values_list('commit_date', flat=True)

    weeks = [0] * 52
    for cdate in commits:
        age_weeks = (now - cdate).days // 7
        if age_weeks < 52:
            weeks[age_weeks] += 1

    value = ','.join(map(str, reversed(weeks)))
    cache.set(cache_name, value)
    return value
def prices_get(exchange, pair, num_prices=None, price_ratio=1, cached=True):
    filename = get_filename(exchange, pair)
    if cached:
        from django.core.cache import cache
        text = cache.get(filename)
    else:
        text = None

    if text is None:
        try:
            text = get_s3_text(filename)
            if cached:
                cache.set(filename, text, MAX_CACHE_TIME)
        except Exception as e:
            print(str(e))
            return None

    result = json.loads(text)
    result = result[0::price_ratio]
    if num_prices is not None:
        return result[:num_prices]
    else:
        return result
def setUp(self):
    # Run mocked testrpc for reorgs
    print('Starting httpd...')
    self.p = Process(target=start_mock_server)
    self.p.start()
    cache.set('block_number', '0x0')
    sleep(1)
    print('served')
    self.rpc = RPCProvider(
        host='127.0.0.1',
        port='8545',
        ssl=0
    )
    web3_service = Web3Service(self.rpc)
    self.web3 = web3_service.web3

    # Mock web3
    self.daemon = DaemonFactory()
def test_reorg_ok(self):
    # Last block hash hasn't changed
    block_hash_0 = '{:040d}'.format(0)
    cache.set('0x0', block_hash_0)
    cache.set('block_number', '0x1')
    Block.objects.create(block_hash=block_hash_0, block_number=0, timestamp=0)
    Daemon.objects.all().update(block_number=0)
    (had_reorg, _) = check_reorg()
    self.assertFalse(had_reorg)

    block_hash_1 = '{:040d}'.format(1)
    cache.set('0x1', block_hash_1)
    cache.set('block_number', '0x2')
    Block.objects.create(block_hash=block_hash_1, block_number=1, timestamp=0)
    Daemon.objects.all().update(block_number=1)
    (had_reorg, _) = check_reorg()
    self.assertFalse(had_reorg)
def test_reorg_happened(self):
    # Last block hash hasn't changed
    block_hash_0 = '{:040d}'.format(0)
    cache.set('0x0', block_hash_0)
    cache.set('block_number', '0x1')
    Block.objects.create(block_hash=block_hash_0, block_number=0, timestamp=0)
    Daemon.objects.all().update(block_number=0)
    (had_reorg, _) = check_reorg()
    self.assertFalse(had_reorg)

    # Last block hash changed
    block_hash_1 = '{:040d}'.format(1)
    cache.set('0x1', block_hash_1)
    cache.set('block_number', '0x2')
    block_hash_reorg = '{:040d}'.format(1313)
    Block.objects.create(block_hash=block_hash_reorg, block_number=1, timestamp=0)
    Daemon.objects.all().update(block_number=1)
    (had_reorg, block_number) = check_reorg()
    self.assertTrue(had_reorg)
    self.assertEqual(block_number, 0)

    Block.objects.filter(block_number=1).update(block_hash=block_hash_1, timestamp=0)
    (had_reorg, _) = check_reorg()
    self.assertFalse(had_reorg)
def test_reorg_mined_multiple_blocks_ok(self):
    # Last block hash hasn't changed
    block_hash_0 = '{:040d}'.format(0)
    cache.set('0x0', block_hash_0)
    cache.set('block_number', '0x1')
    Block.objects.create(block_hash=block_hash_0, block_number=0, timestamp=0)
    Daemon.objects.all().update(block_number=0)
    (had_reorg, _) = check_reorg()
    self.assertFalse(had_reorg)

    # new block number changed more than one unit
    block_hash_1 = '{:040d}'.format(1)
    cache.set('0x1', block_hash_1)  # set_mocked_testrpc_block_hash
    cache.set('block_number', '0x9')
    Block.objects.create(block_hash=block_hash_1, block_number=1, timestamp=0)
    Daemon.objects.all().update(block_number=1)
    (had_reorg, _) = check_reorg()
    self.assertFalse(had_reorg)
def ajax_tag_autocomplete(request):
    """Offers a list of existing tags that match the specified query"""
    if 'q' in request.GET:
        q = request.GET['q']
        key = 'ajax_tag_auto_%s' % q
        response = cache.get(key)
        if response is not None:
            return response

        tags = list(Tag.objects.filter(name__istartswith=q)[:10])
        response = HttpResponse(u'\n'.join(tag.name for tag in tags))
        cache.set(key, response, 300)
        return response
    return HttpResponse()
def cached(func):
    """
    Used to decorate .get_queryset method on search viewsets.
    Caches a queryset.
    """
    @wraps(func)
    def _impl(self, *args, **kwargs):
        key = self.get_cache_key()
        ttl = 120
        result = None
        if key:
            result = cache.get(key)
        if result:
            return result
        result = func(self, *args, **kwargs)
        cache.set(key, result, ttl)
        return result
    return _impl
def about(request):
    """Information about the current site, its goals, and what content is loaded"""
    # Provider counts
    providers = cache.get_or_set(CACHE_STATS_NAME, [], CACHE_STATS_DURATION)
    if not providers:
        for provider in sorted(settings.PROVIDERS.keys()):
            s = Search()
            q = Q('term', provider=provider)
            s = s.query(q)
            response = s.execute()
            if response.hits.total > 0:
                data = settings.PROVIDERS[provider]
                total = intcomma(response.hits.total)
                data.update({'hits': total})
                providers.append(data)

        # All results
        s = Search()
        response = s.execute()
        total = intcomma(response.hits.total)
        providers.append({'display_name': 'Total', 'hits': total})
        cache.set(CACHE_STATS_NAME, providers)

    return render(request, "about.html", {'providers': providers})
def get_sf_session(force=False):
    if force:
        session_info = None
    else:
        session_info = cache.get(SFDC_SESSION_CACHE_KEY)

    if session_info is None:
        statsd.incr('news.backends.sfdc.session_refresh')
        session_id, sf_instance = sfapi.SalesforceLogin(**settings.SFDC_SETTINGS)
        session_info = {
            'id': session_id,
            'instance': sf_instance,
            'expires': time() + settings.SFDC_SESSION_TIMEOUT,
        }
        cache.set(SFDC_SESSION_CACHE_KEY, session_info, settings.SFDC_SESSION_TIMEOUT)

    return session_info
def refresh_auth_tokens_from_cache(self):
    """Refresh the auth token and other values from cache"""
    if self.authToken is not None and time() + MAX_BUFFER < self.authTokenExpiration:
        # no need to refresh if the current tokens are still good
        return

    tokens = cache.get(self.token_cache_key)
    if tokens:
        if not isinstance(tokens, dict):
            # something wrong was cached
            cache.delete(self.token_cache_key)
            return

        for prop, value in tokens.items():
            if prop in self.token_property_names:
                setattr(self, prop, value)

        # set the value so we can detect if it changed later
        self._old_authToken = self.authToken
        self.build_soap_client()
def _cache_kolibri_studio_channel_request(self, identifier=None):
    cache_key = get_channel_lookup_url(identifier=identifier)

    # cache channel lookup values
    if cache.get(cache_key):
        return Response(cache.get(cache_key))

    resp = requests.get(cache_key)

    # always check response code of request and set cache
    if resp.status_code == 404:
        raise Http404(
            _("The requested channel does not exist on the content server")
        )

    kolibri_mapped_response = []
    for channel in resp.json():
        kolibri_mapped_response.append(self._studio_response_to_kolibri_response(channel))

    cache.set(cache_key, kolibri_mapped_response, 5)
    return Response(kolibri_mapped_response)
def get_authentication_url(self):
    oauth = OAuth1Session(
        self.client_id,
        client_secret=self.client_secret,
    )
    token = oauth.fetch_request_token(self.request_token_url)
    cache.set(
        'oa-token-%s' % token['oauth_token'],
        token,
        timeout=3600)
    self._request.session['oa_token'] = token['oauth_token']
    authorization_url = oauth.authorization_url(
        self.authorization_base_url,
    )
    return authorization_url
def timetable(self):
    duration = timedelta(seconds=0)

    users_by_entry_id = {}
    users_by_id = {}
    for user in self.users_with_constraints:
        users_by_id[user.id] = user

    entries = list()
    participations = (Participation.objects
                      .filter(entry__meeting=self)
                      .select_related('user')
                      .order_by('user__username'))
    for participation in participations:
        users_by_entry_id.setdefault(participation.entry_id, set()).add(
            (users_by_id.get(participation.user_id), participation.ignored_for_optimization))

    for entry in self.timetable_entries.filter(timetable_index__isnull=False).select_related('submission').order_by('timetable_index'):
        entry.users = users_by_entry_id.get(entry.id, set())
        entry.has_ignored_participations = any(ignored for user, ignored in entry.users)
        entry.start = self.start + duration
        duration += entry.duration
        entry.end = self.start + duration
        entries.append(entry)

    return tuple(entries), set(users_by_id.values())
def _login_cookies():
    login_cookies = cache.get('lichess_login_cookies')
    if login_cookies is None:
        # Read the credentials
        with open(settings.LICHESS_CREDS_FILE_PATH) as creds_file:
            lines = creds_file.readlines()
            creds = {'username': lines[0].strip(), 'password': lines[1].strip()}

        # Send a login request
        login_response = requests.post(settings.LICHESS_DOMAIN + 'login', data=creds, headers=_headers)
        if login_response.status_code != 200:
            logger.error('Received status %s when trying to log in to lichess' % login_response.status_code)
            return None

        # Save the cookies
        login_cookies = dict(login_response.cookies)
        cache.set('lichess_login_cookies', login_cookies, 60)  # Cache cookies for 1 minute
    return login_cookies
def get_token(self, token_only=True, scopes=None):
    if scopes is None:
        scopes = ['send_notification', 'view_room']
    cache_key = 'hipchat-tokens:%s:%s' % (self.id, ','.join(scopes))

    def gen_token():
        data = {
            'grant_type': 'client_credentials',
            'scope': ' '.join(scopes),
        }
        resp = requests.post(
            self.token_url,
            data=data,
            auth=HTTPBasicAuth(self.id, self.secret),
            timeout=10
        )
        if resp.status_code == 200:
            return resp.json()
        elif resp.status_code == 401:
            raise OauthClientInvalidError(self)
        else:
            raise Exception('Invalid token: %s' % resp.text)

    if token_only:
        token = cache.get(cache_key)
        if not token:
            data = gen_token()
            token = data['access_token']
            cache.set(cache_key, token, data['expires_in'] - 20)
        return token
    return gen_token()
def process_request(self, request, extra_context=None):
    """Main page processing logic"""
    context = self.get_rendering_context(request)
    if extra_context:
        context.update(extra_context)

    cache_key, seconds = self.get_cache_settings(request)
    if cache_key:
        content = cache.get(cache_key)
        if content is None:
            content = self.render(context)
            cache.set(cache_key, content, seconds)
    else:
        content = self.render(context)

    return self.create_response(request, content)