The following 50 code examples, extracted from open source Python projects, demonstrate how to use django.core.cache.cache.get().
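As a primer before the extracted examples, here is a minimal sketch of the get/set round trip that all of the snippets below build on. It assumes a cache backend is already configured in settings.py; the key names and the 300-second timeout are arbitrary placeholders:

from django.core.cache import cache

# Store a value for five minutes, then read it back.
cache.set('greeting', 'hello', 300)
value = cache.get('greeting')            # 'hello'

# get() returns None for a missing key, or a caller-supplied default.
missing = cache.get('no_such_key')       # None
fallback = cache.get('no_such_key', 0)   # 0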
def for_request(self, request, body=None):
    if body and 'oauth_client_id' in body:
        rv = Tenant.objects.get(pk=body['oauth_client_id'])
        if rv is not None:
            return rv, {}

    jwt_data = request.GET.get('signed_request')
    if not jwt_data:
        header = request.META.get('HTTP_AUTHORIZATION', '')
        jwt_data = header[4:] if header.startswith('JWT ') else None

    if not jwt_data:
        raise BadTenantError('Could not find JWT')

    try:
        oauth_id = jwt.decode(jwt_data, verify=False)['iss']
        client = Tenant.objects.get(pk=oauth_id)
        if client is not None:
            data = jwt.decode(jwt_data, client.secret)
            return client, data
    except jwt.exceptions.DecodeError:
        pass

    raise BadTenantError('Could not find tenant')
def get_event_from_url_params(self, group_id, event_id=None, slug_vars=None):
    if event_id is not None:
        try:
            event = Event.objects.get(pk=int(event_id))
        except (ValueError, Event.DoesNotExist):
            return None
        group = event.group
        if six.text_type(group.id) != group_id:
            return None
    else:
        try:
            group = Group.objects.get(pk=int(group_id))
        except (ValueError, Group.DoesNotExist):
            return None
        event = group.get_latest_event()

    event = self._ensure_and_bind_event(event)
    if event is None:
        return None

    if slug_vars is not None:
        if slug_vars['org_slug'] != group.organization.slug or \
                slug_vars['proj_slug'] != group.project.slug:
            return None

    return event
def url_by_alias(page_alias):
    """
    Try to read the page URL from the cache. If the URL list is missing
    from the cache, refresh it from the matching pages, since the cached
    copy is too old. If no matching page is found, return None so the
    caller can raise whatever exception it wants.
    """
    if page_alias:
        url_to_alias = cache.get(cachekeys.URL_LIST_CACHE)
        if url_to_alias is None:
            url_to_alias = PageURLCache.refresh()
        url = url_to_alias.get(page_alias)
    else:
        url = None
    return url
def clients(request):
    data = {}
    for word in cache.keys("client_*"):
        client = re.sub(r'^client_', '', word)
        try:
            client_data = cache.get(word)
            data[client] = client_data
        except:
            raise
    profile_form = ContactForm(instance=Contact.objects.get(user=request.user.id))
    return render(request, 'isubscribe/clients.html',
                  {'DATA': data, 'profile_form': profile_form})
def subscriptions(request):
    data = {}
    for word in r.keys("subscription_*"):
        subscription = re.sub(r'^subscription_', '', str(word.decode('utf-8')))
        try:
            subscription_data = r.lrange(word, 0, -1)
            data[subscription] = subscription_data
        except:
            raise
    profile_form = ContactForm(instance=Contact.objects.get(user=request.user.id))
    return render(request, 'isubscribe/subscriptions.html',
                  {'DATA': data, 'profile_form': profile_form})
def user_settings(request):
    logger.debug('settings view triggered by %s' % (request.user.username))
    form = ContactForm(request.POST, instance=Contact.objects.get(user=request.user.id))
    if form.is_valid():
        try:
            form.save()
            return HttpResponse('Done', status=200)
        except:
            return HttpResponse(json.dumps(form.errors), status=409)
    else:
        return HttpResponse(json.dumps(form.errors), status=409)
def check_config(request):
    mimetype = 'application/json'
    data = {}
    if request.method == 'POST' and 'entity' in request.POST and request.POST['entity'] != '':
        client_name, check_name = request.POST['entity'].split(':')
        #check_name = 'check_gw_tomcat_errors_1h'
        #data = cache.get('check_' + check_name)
        data = cache.get('check_' + request.POST['entity'])
    return HttpResponse(json.dumps(data), mimetype)
def onduty_members(self):
    OnDuty = []
    if 'OnDuty' in cache.keys('OnDuty'):
        OnDuty = cache.get('OnDuty')
    else:
        try:
            event_start, event_end, instance = ScheduledOccurrence.objects.filter(
                event__in=ScheduledEvent.objects.filter(event=0)).next_occurrence()
            NOW = datetime.datetime.now(datetime.timezone.utc).timestamp()
            if NOW >= event_start.timestamp() and NOW <= event_end.timestamp():
                for user in instance.event.members_list():
                    OnDuty.append(user.pk)
                logger.debug('onduty_members found: %s' % OnDuty)
                #cache.set('OnDuty', OnDuty, timeout=event_end.timestamp())
                cache.set('OnDuty', OnDuty, timeout=settings.ON_DUTY_CACHE_MEMBERS)
            else:
                logger.debug('onduty_members can not find onduty_members')
        except:
            logger.error('onduty_members failed finding onduty_members')
            pass
    return OnDuty
def user_dnd(self, user_pk):
    if 'DnD_' + str(user_pk) in cache.keys("DnD_*"):
        #DnD = cache.get('DnD_' + str(user_pk))
        DnD = True
    else:
        DnD = False
        try:
            event_start, event_end, instance = ScheduledOccurrence.objects.filter(
                event__in=ScheduledEvent.objects.filter(event=1, members__in=[user_pk])).next_occurrence()
            NOW = datetime.datetime.now(datetime.timezone.utc).timestamp()
            if NOW >= event_start.timestamp() and NOW <= event_end.timestamp():
                DnD = True
                cache.set('DnD_' + str(user_pk), DnD, timeout=event_end.timestamp())
        except:
            pass
    return DnD
def test_binary_string(self):
    # Binary strings should be cacheable
    cache = self.cache
    from zlib import compress, decompress
    value = 'value_to_be_compressed'
    compressed_value = compress(value.encode())

    # Test set
    cache.set('binary1', compressed_value)
    compressed_result = cache.get('binary1')
    self.assertEqual(compressed_value, compressed_result)
    self.assertEqual(value, decompress(compressed_result).decode())

    # Test add
    cache.add('binary1-add', compressed_value)
    compressed_result = cache.get('binary1-add')
    self.assertEqual(compressed_value, compressed_result)
    self.assertEqual(value, decompress(compressed_result).decode())

    # Test set_many
    cache.set_many({'binary1-set_many': compressed_value})
    compressed_result = cache.get('binary1-set_many')
    self.assertEqual(compressed_value, compressed_result)
    self.assertEqual(value, decompress(compressed_result).decode())
def test_forever_timeout(self):
    """
    Passing None as the timeout results in a value that is cached forever
    """
    cache = self.cache
    cache.set('key1', 'eggs', None)
    self.assertEqual(cache.get('key1'), 'eggs')

    cache.add('key2', 'ham', None)
    self.assertEqual(cache.get('key2'), 'ham')
    added = cache.add('key1', 'new eggs', None)
    self.assertIs(added, False)
    self.assertEqual(cache.get('key1'), 'eggs')

    cache.set_many({'key3': 'sausage', 'key4': 'lobster bisque'}, None)
    self.assertEqual(cache.get('key3'), 'sausage')
    self.assertEqual(cache.get('key4'), 'lobster bisque')
def test_client_task_tester(client, clear_redis_store):
    url = reverse('task_tester')

    def fake_task(key, value, expires):
        cache.set(key, value, expires)

    _mock_function = 'tecken.views.sample_task.delay'
    with mock.patch(_mock_function, new=fake_task):
        response = client.get(url)
        assert response.status_code == 400
        assert b'Make a POST request to this URL first' in response.content
        response = client.post(url)
        assert response.status_code == 201
        assert b'Now make a GET request to this URL' in response.content
        response = client.get(url)
        assert response.status_code == 200
        assert b'It works!' in response.content
def task_tester(request):
    if request.method == 'POST':
        cache.set('marco', 'ping', 100)
        sample_task.delay('marco', 'polo', 10)
        return http.HttpResponse(
            'Now make a GET request to this URL\n',
            status=201,
        )
    else:
        if not cache.get('marco'):
            return http.HttpResponseBadRequest(
                'Make a POST request to this URL first\n'
            )
        for i in range(3):
            value = cache.get('marco')
            if value == 'polo':
                return http.HttpResponse('It works!\n')
            time.sleep(1)
        return http.HttpResponseServerError(
            'Tried 4 times (4 seconds) and no luck :(\n'
        )
def get_canonical(cls, language_code):
    """Returns the canonical `Language` object matching `language_code`.

    If no language can be matched, `None` will be returned.

    :param language_code: the code of the language to retrieve.
    """
    try:
        return cls.objects.get(code__iexact=language_code)
    except cls.DoesNotExist:
        _lang_code = language_code
        if "-" in language_code:
            _lang_code = language_code.replace("-", "_")
        elif "_" in language_code:
            _lang_code = language_code.replace("_", "-")
        try:
            return cls.objects.get(code__iexact=_lang_code)
        except cls.DoesNotExist:
            return None
def validate_email(self):
    """Ensure emails are unique across the models tracking emails.

    Since it's essential to keep email addresses unique to support our
    workflows, a `ValidationError` will be raised if the email trying
    to be saved is already assigned to some other user.
    """
    lookup = Q(email__iexact=self.email)
    if self.pk is not None:
        # When there's an update, ensure no one else has this address
        lookup &= ~Q(user=self)
    try:
        EmailAddress.objects.get(lookup)
    except EmailAddress.DoesNotExist:
        pass
    else:
        raise ValidationError({
            'email': [_('This email address already exists.')]
        })
def get(self, request):
    try:
        start, end = get_start_end_paramters(request, default_days_back=1)
    except ValueError as e:
        return HttpResponseBadRequest(str(e))
    combine = request.query_params.get('combine')
    sites = request.query_params.getlist('site')
    telescopes = request.query_params.getlist('telescope')
    try:
        telescope_availability = get_telescope_availability_per_day(
            start, end, sites=sites, telescopes=telescopes
        )
    except ElasticSearchException:
        logger.warning('Error connecting to ElasticSearch. Is SBA reachable?')
        return Response('ConnectionError')
    if combine:
        telescope_availability = combine_telescope_availabilities_by_site_and_class(telescope_availability)
    str_telescope_availability = {str(k): v for k, v in telescope_availability.items()}
    return Response(str_telescope_availability)
def archive_bearer_token(self):
    # During testing, you will probably have to copy access tokens from prod for this to work
    try:
        app = Application.objects.get(name='Archive')
    except Application.DoesNotExist:
        logger.error('Archive application not found. Oauth applications need to be populated.')
        return ''
    access_token = AccessToken.objects.filter(
        user=self.user, application=app, expires__gt=timezone.now()
    ).last()
    if not access_token:
        access_token = AccessToken(
            user=self.user,
            application=app,
            token=uuid.uuid4().hex,
            expires=timezone.now() + timedelta(days=30)
        )
        access_token.save()
    return access_token.token
def get_rise_set_intervals(request_dict, site=''):
    intervals = []
    site = site if site else request_dict['location'].get('site', '')
    telescope_details = configdb.get_telescopes_with_instrument_type_and_location(
        request_dict['molecules'][0]['instrument_name'],
        site,
        request_dict['location'].get('observatory', ''),
        request_dict['location'].get('telescope', '')
    )
    if not telescope_details:
        return intervals

    intervals_by_site = get_rise_set_intervals_by_site(request_dict)
    intervalsets_by_telescope = intervals_by_site_to_intervalsets_by_telescope(
        intervals_by_site, telescope_details.keys()
    )
    filtered_intervalsets_by_telescope = filter_out_downtime_from_intervalsets(intervalsets_by_telescope)
    filtered_intervalset = Intervals().union(filtered_intervalsets_by_telescope.values())
    filtered_intervals = filtered_intervalset.toTupleList()
    return filtered_intervals
def __init__(self, start, end, telescopes=None, sites=None, instrument_types=None):
    try:
        self.es = Elasticsearch([settings.ELASTICSEARCH_URL])
    except LocationValueError:
        logger.error('Could not find host. Make sure ELASTICSEARCH_URL is set.')
        raise ImproperlyConfigured('ELASTICSEARCH_URL')
    self.instrument_types = instrument_types
    self.available_telescopes = self._get_available_telescopes()

    sites = list({tk.site for tk in self.available_telescopes}) if not sites else sites
    telescopes = list({tk.telescope for tk in self.available_telescopes if tk.site in sites}) \
        if not telescopes else telescopes

    self.start = start.replace(tzinfo=timezone.utc).replace(microsecond=0)
    self.end = end.replace(tzinfo=timezone.utc).replace(microsecond=0)
    cached_event_data = cache.get('tel_event_data')
    if cached_event_data:
        self.event_data = cached_event_data
    else:
        self.event_data = self._get_es_data(sites, telescopes)
        cache.set('tel_event_data', self.event_data, 1800)
def get(self):
    telescope_states = {}
    current_lump = dict(reasons=None, types=None, start=None)
    for event in self.event_data:
        if self._telescope(event['_source']) not in self.available_telescopes:
            continue
        if current_lump['start'] is None:
            current_lump = self._set_lump(event)
            continue
        if self._belongs_in_lump(event['_source'], current_lump):
            current_lump = self._update_lump(current_lump, event)
        else:
            lump_end = self._lump_end(current_lump, event['_source'])
            if lump_end >= self.start:
                telescope_states = self._update_states(telescope_states, current_lump, lump_end)
            current_lump = self._set_lump(event)
    if current_lump['start']:
        lump_end = self._lump_end(current_lump)
        telescope_states = self._update_states(telescope_states, current_lump, lump_end)
    return telescope_states
def set_mcqs_in_cache():
    """
    Set MCQs in cache if they have changed or have not been set.
    """
    languages = {
        'C': 'c_mcqs',
        'J': 'java_mcqs',
    }
    # If MCQs have been changed or have not been created
    if not cache.get('mcqs_flag', False):
        for lang_code, cache_key in languages.items():
            mcqs_json = extract_mcqs(lang_code)
            cache.set(cache_key, mcqs_json)
        # Mark MCQs as unchanged
        cache.set('mcqs_flag', True)
def _get_question_statuses(team):
    """
    Returns a dictionary of Question numbers and statuses as key-value pairs.

    Status could be:
        'S': Solved
        'U': Unattempted
    """
    status_dict = {}
    for ques in Question.objects.filter(language=team.lang_pref):
        try:
            team.teammcqanswer_set.get(question_no=ques.question_no)
            status = 'S'
        except TeamMcqAnswer.DoesNotExist:
            status = 'U'
        status_dict[ques.question_no] = status
    return status_dict
def get_token(self):
    """Get the wechat access token. Store it in the cache."""
    access_token = cache.get('wx_access_token')
    if access_token:
        return access_token
    else:
        param = {
            'grant_type': 'client_credential',
            'appid': self.appid,
            'secret': self.appsecret,
        }
        url = self.get_url('token', param)
        data = self.get_data(url)
        cache.set('wx_access_token', data['access_token'],
                  int(data['expires_in']))
        return data['access_token']
def search_endorsers(request):
    query = request.GET.get('q')
    endorsers = []
    endorser_pks = set()
    if query:
        # First find the endorsers whose names start with this query.
        results = Endorser.objects.filter(name__istartswith=query)
        for endorser in results[:5]:
            endorser_pks.add(endorser.pk)
            endorsers.append(endorser)
        if results.count() < 5:
            results = Endorser.objects.filter(name__icontains=query)
            for endorser in results:
                if endorser.pk in endorser_pks:
                    continue
                endorsers.append(endorser)
                if len(endorsers) == 5:
                    break
    return JsonResponse({
        'endorsers': [{'pk': e.pk, 'name': e.name} for e in endorsers],
    })
def progress_wikipedia_missing(request, slug):
    if slug == NEWSPAPER_SLUG:
        endorsements = Endorsement.objects.filter(
            endorser__importednewspaper=None,
            endorser__tags=Tag.objects.get(name='Publication')
        )
    else:
        position = Position.objects.get(slug=SLUG_MAPPING[slug])
        endorsements = Endorsement.objects.filter(
            position=position,
            endorser__importedendorsement=None
        )
    context = {
        'slug': slug,
        'endorsements': endorsements,
    }
    return render(request, 'progress/wikipedia_missing.html', context)
def render(self, region, context, timeout=None):
    """render(self, region, context, *, timeout=None)

    Render a single region using the context passed

    If ``timeout`` is ``None`` caching is disabled.

    .. note::
       You should treat anything except for the ``region`` and
       ``context`` argument as keyword-only.
    """
    if timeout is not None:
        key = self.cache_key(region)
        html = cache.get(key)
        if html is not None:
            return html
    html = mark_safe(''.join(
        self._renderer.render_plugin_in_context(plugin, context)
        for plugin in self._contents[region]
    ))
    if timeout is not None:
        cache.set(key, html, timeout=timeout)
    return html
def general_image(self, image_format='PNG'):
    fm_width = self.cleaned_data['width']
    fm_height = self.cleaned_data['height']
    key = '{}.{}.{}'.format(fm_width, fm_height, image_format)
    content = cache.get(key)
    if content is None:
        image = Image.new('RGB', (fm_width, fm_height), color=122)
        draw = ImageDraw.Draw(image)
        text = '{}x{}'.format(fm_width, fm_height)
        text_width, text_height = draw.textsize(text)
        if text_width < fm_width and text_height < fm_height:
            text_top = (fm_height - text_height) // 2
            text_left = (fm_width - text_width) // 2
            draw.text((text_top, text_left), text, fill=(255, 255, 255))
        content = BytesIO()
        image.save(content, image_format)
        content.seek(0)
        cache.set(key, content, 60 * 60)
    return content
def test_setnx(self):
    # we should ensure there is no test_key_nx in redis
    self.cache.delete("test_key_nx")
    res = self.cache.get("test_key_nx", None)
    self.assertEqual(res, None)

    res = self.cache.set("test_key_nx", 1, nx=True)
    self.assertTrue(res)
    # test that a second set with nx=True has no effect
    res = self.cache.set("test_key_nx", 2, nx=True)
    self.assertFalse(res)
    res = self.cache.get("test_key_nx")
    self.assertEqual(res, 1)

    self.cache.delete("test_key_nx")
    res = self.cache.get("test_key_nx", None)
    self.assertEqual(res, None)
def test_setnx_timeout(self):
    # test that timeout still works for nx=True
    res = self.cache.set("test_key_nx", 1, timeout=2, nx=True)
    self.assertTrue(res)
    time.sleep(3)
    res = self.cache.get("test_key_nx", None)
    self.assertEqual(res, None)

    # test that timeout will not affect key, if it was there
    self.cache.set("test_key_nx", 1)
    res = self.cache.set("test_key_nx", 2, timeout=2, nx=True)
    self.assertFalse(res)
    time.sleep(3)
    res = self.cache.get("test_key_nx", None)
    self.assertEqual(res, 1)

    self.cache.delete("test_key_nx")
    res = self.cache.get("test_key_nx", None)
    self.assertEqual(res, None)
def test_save_dict(self):
    if isinstance(self.cache.client._serializer, json_serializer.JSONSerializer):
        self.skipTest("Datetimes are not JSON serializable")

    if isinstance(self.cache.client._serializer, msgpack_serializer.MSGPackSerializer):
        # MSGPackSerializer serializers use the isoformat for datetimes
        # https://github.com/msgpack/msgpack-python/issues/12
        now_dt = datetime.datetime.now().isoformat()
    else:
        now_dt = datetime.datetime.now()

    test_dict = {"id": 1, "date": now_dt, "name": "Foo"}

    self.cache.set("test_key", test_dict)
    res = self.cache.get("test_key")

    self.assertIsInstance(res, dict)
    self.assertEqual(res["id"], 1)
    self.assertEqual(res["name"], "Foo")
    self.assertEqual(res["date"], now_dt)
def test_timeout_parameter_as_positional_argument(self):
    self.cache.set("test_key", 222, -1)
    res = self.cache.get("test_key", None)
    self.assertIsNone(res)

    self.cache.set("test_key", 222, 1)
    res1 = self.cache.get("test_key", None)
    time.sleep(2)
    res2 = self.cache.get("test_key", None)
    self.assertEqual(res1, 222)
    self.assertEqual(res2, None)

    # nx=True should not overwrite expire of key already in db
    self.cache.set("test_key", 222, 0)
    self.cache.set("test_key", 222, -1, nx=True)
    res = self.cache.get("test_key", None)
    self.assertEqual(res, 222)
def test_sentinel_switching(self):
    if not isinstance(self.cache.client, SentinelClient):
        self.skipTest("Not Sentinel clients use default master-slave setup")

    try:
        cache = caches["sample"]
        client = cache.client
        master = client.get_client(write=True)
        slave = client.get_client(write=False)

        master.set("Foo", "Bar")
        self.assertEqual(slave.get("Foo"), "Bar")
        self.assertEqual(master.info()['role'], "master")
        self.assertEqual(slave.info()['role'], "slave")
    except NotImplementedError:
        pass
def test_invalid_key(self):
    # Submitting an invalid session key (either by guessing, or if the db has
    # removed the key) results in a new key being generated.
    try:
        session = self.backend('1')
        try:
            session.save()
        except AttributeError:
            self.fail(
                "The session object did not save properly. "
                "Middleware may be saving cache items without namespaces."
            )
        self.assertNotEqual(session.session_key, '1')
        self.assertEqual(session.get('cat'), None)
        session.delete()
    finally:
        # Some backends leave a stale cache entry for the invalid
        # session key; make sure that entry is manually deleted
        session.delete('1')
def query_user_by_id(user_id=0, use_cache=True):
    """
    Query a user account by ID.

    :param user_id: user ID
    :param use_cache: whether to read from the cache
    """
    key = CACHE_KEY + str(user_id)
    if use_cache:
        account = cache.get(key)
        if account:
            return account
    try:
        account = UserAccount.objects.get(id=user_id)
        cache.set(key, account, CACHE_TIME)
        return account
    except UserAccount.DoesNotExist:
        return None
def query_token_by_user_id(user_id, use_cache=True):
    """
    Query a user's access token by user ID.

    :param user_id: user ID
    :param use_cache: whether to read from the cache
    """
    key = CACHE_TOKEN_ID + str(user_id)
    if use_cache:
        token = cache.get(key)
        if token:
            return token
    try:
        token = AccessToken.objects.order_by("-id").filter(status=1).get(user_id=user_id)
        cache.set(key, token, CACHE_TIME)
        return token
    except AccessToken.DoesNotExist:
        return None
def query_token(token, use_cache=True):
    """
    Query token details by token string.
    """
    key = CACHE_TOKEN + token
    if use_cache:
        token_result = cache.get(key)
        if token_result:
            return token_result
    try:
        token = AccessToken.objects.order_by("-id").filter(status=1).get(access_token=token)
        cache.set(key, token, CACHE_TIME)
        return token
    except AccessToken.DoesNotExist:
        return None
def query_user_meta_count(user_id, is_follow=True, use_cache=True):
    """
    Query a user's follow count or fan count.

    :param user_id: the user ID to query
    :param is_follow: if True, count the users this user follows; otherwise count fans
    :param use_cache: whether to read from the cache
    """
    cache_key = CACHE_FANS_COUNT_KEY + str(user_id)
    if is_follow:
        cache_key = CACHE_FOLLOW_COUNT_KEY + str(user_id)
    if use_cache:
        count = cache.get(cache_key)
        if count:
            return count
    if is_follow:
        count = UserFollow.objects.filter(user_id=user_id, status=1).aggregate(Count("id"))
    else:
        count = UserFollow.objects.filter(follow_user=user_id, status=1).aggregate(Count("id"))
    count = count["id__count"]
    cache.set(cache_key, count, CACHE_COUNT_TIME)
    return count
def query_format_info_by_ease_mob(ease_mob, use_cache=True):
    """
    Query formatted user info by ease_mob ID.
    """
    key = CACHE_EASE_MOB_KEY + ease_mob
    if use_cache:
        result = cache.get(key)
        if result:
            return result
    try:
        user_info = UserInfo.objects.get(ease_mob=ease_mob)
        format_user_info = UserInfo.format_user_info(user_info)
        cache.set(key, format_user_info, CACHE_TIME)
        return format_user_info
    except UserInfo.DoesNotExist:
        return None
def query_format_info_by_user_id(user_id, use_cache=True):
    """
    Query formatted user info by user ID.

    :param user_id: user ID
    :param use_cache: whether to read from the cache
    """
    key = CACHE_KEY + str(user_id)
    if use_cache:
        result = cache.get(key)
        if result:
            return result
    try:
        user_info = UserInfo.objects.get(user_id=user_id)
        format_user_info = UserInfo.format_user_info(user_info)
        cache.set(key, format_user_info, CACHE_TIME)
        return format_user_info
    except UserInfo.DoesNotExist:
        return None
def query_published_article_count(user_id, use_cache=True):
    """
    Query the number of articles a user has published.

    :param user_id: user ID
    :param use_cache: whether to read from the cache
    """
    cache_key = CACHE_ARTICLE_COUNT + str(user_id)
    if use_cache:
        count = cache.get(cache_key)
        if count:
            return count
    count = BlogArticle.objects.filter(user_id=user_id, status=1).aggregate(Count("id"))
    count = count["id__count"]
    if count:
        cache.set(cache_key, count, CACHE_TIME)
    return count
def query_article_by_id(article_id=0, use_cache=True):
    """
    Query article details by article ID.

    :param article_id: article ID
    :param use_cache: whether to read from the cache
    """
    key = CACHE_KEY_ID + str(article_id)
    if use_cache:
        cache_result = cache.get(key)
        if cache_result:
            return cache_result
    try:
        article = BlogArticle.objects.get(id=article_id)
        article = BlogArticle.format_article(article)
        cache.set(key, article, CACHE_TIME)
        return article
    except BlogArticle.DoesNotExist:
        return None
def commits_over_52(self):
    cache_name = self.cache_namer(self.commits_over_52)
    value = cache.get(cache_name)
    if value is not None:
        return value
    now = datetime.now()
    commits = self.commit_set.filter(
        commit_date__gt=now - timedelta(weeks=52),
    ).values_list('commit_date', flat=True)
    weeks = [0] * 52
    for cdate in commits:
        age_weeks = (now - cdate).days // 7
        if age_weeks < 52:
            weeks[age_weeks] += 1
    value = ','.join(map(str, reversed(weeks)))
    cache.set(cache_name, value)
    return value
def record_plugin_history(self, sender, instance, **kwargs):
    """When a plugin is created or edited"""
    from cms.models import CMSPlugin, Page
    from .models import EditHistory
    if not isinstance(instance, CMSPlugin):
        return
    user_id = cache.get('cms-user-id')
    comment = cache.get('cms-comment')
    content = generate_content(instance)
    if content is None:
        return
    # Don't record a history of change if nothing changed.
    history = EditHistory.objects.filter(plugin_id=instance.id)
    if history.count() > 0:
        # Temporary history object for uuid
        this = EditHistory(content=content)
        latest = history.latest()
        if latest.content == content or this.uuid == latest.uuid:
            return
    EditHistory.objects.record(instance, user_id, comment, content)
def get_token(self, token_only=True, scopes=None):
    if scopes is None:
        scopes = ['send_notification', 'view_room']
    cache_key = 'hipchat-tokens:%s:%s' % (self.id, ','.join(scopes))

    def gen_token():
        data = {
            'grant_type': 'client_credentials',
            'scope': ' '.join(scopes),
        }
        resp = requests.post(
            self.token_url,
            data=data,
            auth=HTTPBasicAuth(self.id, self.secret),
            timeout=10
        )
        if resp.status_code == 200:
            return resp.json()
        elif resp.status_code == 401:
            raise OauthClientInvalidError(self)
        else:
            raise Exception('Invalid token: %s' % resp.text)

    if token_only:
        token = cache.get(cache_key)
        if not token:
            data = gen_token()
            token = data['access_token']
            cache.set(cache_key, token, data['expires_in'] - 20)
        return token
    return gen_token()
def update_room_info(self, commit=True):
    headers = {
        'Authorization': 'Bearer %s' % self.get_token(),
        'Content-Type': 'application/json'
    }
    room = requests.get(
        urljoin(self.api_base_url, 'room/%s') % self.room_id,
        headers=headers,
        timeout=5
    ).json()
    self.room_name = room['name']
    self.room_owner_id = six.text_type(room['owner']['id'])
    self.room_owner_name = room['owner']['name']
    if commit:
        self.save()
def __exit__(self, exc_type, exc_value, tb):
    # If we get an invalid oauth client we better clean up the tenant
    # and swallow the error.
    if isinstance(exc_value, OauthClientInvalidError):
        self.tenant.delete()
        return True
def for_request(request, body=None):
    """Creates the context for a specific request."""
    tenant, jwt_data = Tenant.objects.for_request(request, body)
    webhook_sender_id = jwt_data.get('sub')
    sender_data = None

    if body and 'item' in body:
        if 'sender' in body['item']:
            sender_data = body['item']['sender']
        elif 'message' in body['item'] and 'from' in body['item']['message']:
            sender_data = body['item']['message']['from']

    if sender_data is None:
        if webhook_sender_id is None:
            raise BadTenantError('Cannot identify sender in tenant')
        sender_data = {'id': webhook_sender_id}

    return Context(
        tenant=tenant,
        sender=HipchatUser(
            id=sender_data.get('id'),
            name=sender_data.get('name'),
            mention_name=sender_data.get('mention_name'),
        ),
        signed_request=request.GET.get('signed_request'),
        context=jwt_data.get('context') or {},
    )
def get_event(self, event_id):
    try:
        event = Event.objects.get(pk=int(event_id))
    except (ValueError, Event.DoesNotExist):
        return None
    return self._ensure_and_bind_event(event)
def process_request(self, request, extra_context=None):
    """Main page processing logic"""
    context = self.get_rendering_context(request)
    if extra_context:
        context.update(extra_context)
    cache_key, seconds = self.get_cache_settings(request)
    if cache_key:
        content = cache.get(cache_key)
        if content is None:
            content = self.render(context)
            cache.set(cache_key, content, seconds)
    else:
        content = self.render(context)
    return self.create_response(request, content)