我们从 Python 开源项目中提取了以下 24 个代码示例,用于说明如何使用 django.contrib.messages.get_messages()。
def test_updating_cart_item(self):
    """Posting a new quantity updates the cart item and flashes a success message."""
    # Tie a fresh cart to the test client's session.
    client_session = self.client.session
    cart = self._create_testing_cart()
    cart.session_key = client_session.session_key
    cart.save()
    cart_item = self._create_testing_cart_item(
        cart_instance=cart,
        product_instance=self.test_product,
    )

    update_url = reverse('cart:update', kwargs={'pk': cart_item.pk})
    response = self.client.post(
        update_url,
        data={'cart_item_quantity': '2'},
        follow=True,
    )
    flashed = list(get_messages(response.wsgi_request))

    # Copy the quantity reported by the response context onto the local
    # instance before asserting against it.
    cart_item.quantity = response.context['cart'].items.first().quantity
    cart_item.save()

    self.assertEqual(response.status_code, 200)
    self.assertEqual(cart_item.quantity, 2)
    expected_total = Decimal(cart_item.quantity * cart_item.product.price)
    self.assertEqual(cart_item.total_price, expected_total)
    self.assertEqual(
        flashed[0].tags,
        'success',
        'Message type should return success type',
    )
    self.assertEqual(
        flashed[0].message,
        'Product quantity has been updated.',
    )
def is_valid(self):
    '''
    Report the form as valid only when the parent validation passes and the
    request carries no messages that still need to be displayed.

    Messages recorded in the session under ``prior_messages`` are treated as
    already shown and are ignored; any others are stored there and make the
    form invalid for this call.
    '''
    valid = super(RegistrationContactForm, self).is_valid()
    storage = messages.get_messages(self._request)

    # Popping means a message counts as "already shown" for one check only.
    already_shown = self._session.pop('prior_messages', [])

    pending = [
        entry
        for entry in (
            {'message': m.message, 'level': m.level, 'extra_tags': m.extra_tags}
            for m in storage
        )
        if entry not in already_shown
    ]

    if not pending:
        return valid

    # Remember what is about to be shown so the next check can skip it.
    self._session['prior_messages'] = pending
    self._request.session.modified = True
    return False
def running_jobs(request):
    """Render ``running_jobs.html`` with the scheduler's current jobs.

    The response context contains the job list returned by
    ``scheduler.get_jobs()``, the requesting user's name (``None`` for an
    unauthenticated request) and the current local time.
    """
    # Bind defaults first: the response context below always reads
    # ``username``, which used to be unbound (UnboundLocalError) whenever the
    # user was not authenticated.
    username = None
    useremail = None
    if request.user.is_authenticated():
        username = request.user.username
        useremail = request.user.email
    messages.get_messages(request)

    # NOTE(review): 'strategy.html' is rendered here and the result is never
    # used. Kept unchanged in case the render has side effects; confirm and
    # remove if it is dead code. Local names must stay as they are because
    # ``locals()`` is what is pushed into the template context.
    template = get_template('strategy.html')
    request_context = RequestContext(request)
    request_context.push(locals())
    html = template.render(request_context)

    running_jobs = scheduler.get_jobs()
    today = datetime.datetime.now()
    return render_to_response(
        'running_jobs.html',
        {"running_jobs": running_jobs, "username": username, "today": today},
        context_instance=RequestContext(request),
    )
#initial apscheduler
def assert_message_contains(self, response, message, level=None):
    """Fail the test unless a message matching ``message`` (and ``level``,
    when given) is found among the response's messages."""
    available = self.get_messages(response)
    if self.filter_messages(available, message, level):
        return

    # Summarise what was actually there to make the failure actionable.
    summary = ['%s (%s)' % (msg.message, msg.level) for msg in available]
    if level:
        template = (
            'Message %r with level %r not found in request. '
            'Available messages: %r'
        )
        details = (message, level, summary)
    else:
        template = (
            'Message %r not found in request. '
            'Available messages: %r'
        )
        details = (message, summary)
    pytest.fail(template % details)
def test_deleting_cart_item(self):
    """Removing a product empties the cart and flashes a success message."""
    # Tie a fresh cart to the test client's session.
    client_session = self.client.session
    cart = self._create_testing_cart()
    cart.session_key = client_session.session_key
    cart.save()
    cart_item = self._create_testing_cart_item(
        cart_instance=cart,
        product_instance=self.test_product,
    )

    remove_url = reverse(
        'cart:remove',
        kwargs={'product_id': cart_item.product_id},
    )
    response = self.client.post(
        remove_url,
        data={'product_id': cart_item.product_id},
        follow=True,
    )
    flashed = list(get_messages(response.wsgi_request))

    self.assertEqual(response.status_code, 200)
    self.assertEqual(
        flashed[0].tags,
        'success',
        'Message type should return success type',
    )
    self.assertEqual(
        flashed[0].message,
        'The item has been deleted from your cart.',
        'Message text should be equal to: The item has been deleted from '
        'your cart',
    )
    self.assertEqual(cart.items.count(), 0, 'Cart should have zero items.')
def horizon_message_already_queued(request, message):
    """Return True when ``message`` is already waiting to be delivered.

    AJAX requests are checked against ``request.horizon['async_messages']``;
    all other requests against the message storage's queued messages.
    """
    wanted = force_text(message)
    if request.is_ajax():
        return any(
            wanted == queued_text
            for _tag, queued_text, _extra in request.horizon['async_messages']
        )
    return any(
        queued.message == wanted
        for queued in _messages.get_messages(request)._queued_messages
    )
def error(request):
    """Render ``bom/error.html`` with the request and its messages."""
    # Same two entries the function's local namespace held: ``request`` and
    # ``msgs``.
    template_context = {
        'request': request,
        'msgs': messages.get_messages(request),
    }
    return TemplateResponse(request, 'bom/error.html', template_context)
def billing_view(request):
    """Render the billing dashboard: card details plus a paginated list of
    credit/adjustment transactions for the user's network."""
    user_profile = UserProfile.objects.get(user=request.user)
    network = user_profile.network

    ledger_entries = network.ledger.transaction_set.filter(
        kind__in=["credit", "adjustment"]
    ).order_by('-created')
    paginator = Paginator(ledger_entries, 10)
    requested_page = request.GET.get('page', 1)
    try:
        page_of_transactions = paginator.page(requested_page)
    except PageNotAnInteger:
        # Non-numeric page parameter: fall back to the first page.
        page_of_transactions = paginator.page(1)
    except EmptyPage:
        # Page number past the end (e.g. 9999): show the last page instead.
        page_of_transactions = paginator.page(paginator.num_pages)

    context = {
        'networks': get_objects_for_user(
            request.user, 'view_network', klass=Network
        ),
        'user_profile': user_profile,
        'transactions': page_of_transactions,
    }

    # The template handles messages explicitly: each message tagged
    # "billing_resp_code" becomes a truthy context flag keyed by its text.
    context.update({
        m.message: True
        for m in messages.get_messages(request)
        if "billing_resp_code" in m.tags
    })

    card_type = network.stripe_card_type
    context['card_type'] = (
        'AmEx' if card_type == "American Express" else card_type
    )
    # Card types that have icons.
    context['cards_with_icons'] = ['Visa', 'AmEx', 'MasterCard', 'Discover']

    template = get_template("dashboard/billing.html")
    return HttpResponse(template.render(context, request))
def dispatch(self, request, *args, **kwargs):
    """Return the current user's contact details and pending messages as JSON.

    Unauthenticated requests get an empty JSON object.
    """
    user = request.user
    if not user.is_authenticated:
        return JsonResponse({})

    customer = getattr(user, 'customer', None)
    if customer:
        # Customer fields win; fall back to the user record when blank.
        context = {
            'customer': True,
            'first_name': customer.first_name or user.first_name,
            'last_name': customer.last_name or user.last_name,
            'email': customer.email or user.email,
            'phone': customer.phone,
        }
    else:
        context = {
            'customer': False,
            'first_name': user.first_name,
            'last_name': user.last_name,
            'email': user.email,
        }

    # Relay any outstanding messages (e.g. a login-successful notice) along
    # with the user information.
    context['messages'] = [
        {
            "level": pending.level,
            "message": pending.message,
            "extra_tags": pending.tags,
        }
        for pending in messages.get_messages(request)
    ]
    return JsonResponse(context)
def index(request, pid=None, del_pass=None):
    """Render ``index.html`` using this function's local variables as context.

    ``pid`` and ``del_pass`` are not read by the body itself, but they reach
    the template because ``locals()`` is pushed into the context.
    """
    # ``username`` / ``useremail`` are only bound for authenticated users.
    if request.user.is_authenticated():
        username = request.user.username
        useremail = request.user.email
    # The returned storage is not kept.
    messages.get_messages(request)
    template = get_template('index.html')
    request_context = RequestContext(request)
    # Every local bound so far becomes a template variable, so renaming or
    # removing a local above changes what the template can see.
    request_context.push(locals())
    html = template.render(request_context)
    return HttpResponse(html)
def strategy(request):
    """Render ``strategy.html`` using this function's local variables as context."""
    # Not read again in the body; it reaches the template through
    # ``locals()`` below.
    Strategies = Strategy.objects.all()
    # ``username`` / ``useremail`` are only bound for authenticated users.
    if request.user.is_authenticated():
        username = request.user.username
        useremail = request.user.email
    # The returned storage is not kept.
    messages.get_messages(request)
    template = get_template('strategy.html')
    request_context = RequestContext(request)
    # Every local bound so far becomes a template variable, so renaming or
    # removing a local above changes what the template can see.
    request_context.push(locals())
    html = template.render(request_context)
    return HttpResponse(html)
def get_messages(self, response):
    """Return the messages for ``response``, loading them on first use and
    reusing the cached value afterwards."""
    cached = self._messages
    if cached is not None:
        return cached
    self._messages = get_messages(response.wsgi_request)
    return self._messages
def assert_message_count(self, response, expected):
    """Fail the test unless the response carries exactly ``expected`` messages."""
    actual_num = len(self.get_messages(response))
    if actual_num == expected:
        return
    pytest.fail(
        'Message count was %d, expected %d' % (actual_num, expected)
    )
def assert_message_misses(self, response, message, level=None):
    """Fail the test if a message matching ``message`` (and ``level``, when
    given) is present among the response's messages."""
    available = self.get_messages(response)
    if not self.filter_messages(available, message, level):
        return
    if level:
        reason = 'Message %r with level %r found in request' % (message, level)
    else:
        reason = 'Message %r found in request' % message
    pytest.fail(reason)
def messages_to_json(request):
    """Return the request's messages both as plain dicts and as rendered HTML.

    The result has two keys: ``messages`` (a list of level / level_tag /
    message dicts) and ``messages_html`` (the output of the
    ``home/includes/messages.html`` template).
    """
    payload = {
        'messages': [
            {
                "level": entry.level,
                "level_tag": entry.level_tag,
                "message": entry.message,
            }
            for entry in messages.get_messages(request)
        ],
    }
    payload['messages_html'] = render_to_string(
        'home/includes/messages.html',
        {'messages': messages.get_messages(request)},
    )
    return payload
def test_alert_messages(self, alert_messages, expected_messages):
    """
    The ``alert_messages`` template tag renders the expected icon and text
    for each message that was added to the request.
    """
    request = self._get_mock_request()
    self._add_messages(request, alert_messages)

    template = Template("{% load enterprise %} {% alert_messages messages %}")
    context = Context({'messages': messages.get_messages(request)})
    output = template.render(context).strip()

    for icon, text in expected_messages:
        assert icon in output
        assert text in output
def build_ajax_messages(request):
    """Return the request's messages as a list of JSON-friendly dicts."""
    return [
        {
            'level': entry.level,
            'message': entry.message,
            'extra_tags': entry.tags,
        }
        for entry in messages.get_messages(request)
    ]
def cache_if_anon(timeout):
    """Decorator factory: serve the view through ``cache_page(timeout)`` only
    when the user is not authenticated and has no messages pending."""
    # https://stackoverflow.com/questions/11661503/django-caching-for-authenticated-users-only
    def _decorator(view_func):
        @wraps(view_func, assigned=available_attrs(view_func))
        def _wrapped_view(request, *args, **kwargs):
            bypass_cache = (
                request.user.is_authenticated()
                or messages.get_messages(request)
            )
            if bypass_cache:
                handler = view_func
            else:
                handler = cache_page(timeout)(view_func)
            return handler(request, *args, **kwargs)
        return _wrapped_view
    return _decorator
def test_delete_message_mixin(self):
    """A POST to ``DeleteMessageView`` leaves exactly one message on the request."""
    request = self.factory.post('/fake-path', {})
    # Attach message storage by hand, as the test does not do it any other way.
    request._messages = default_storage(request)

    view = DeleteMessageView.as_view()
    view(request)

    queued = get_messages(request)
    self.assertEqual(len(queued), 1)
def test_extra_forms_and_formsets_and_update_message_mixin(self):
    """Posting the form plus a four-row permission formset makes the new
    permission reachable from the response and leaves one message queued."""
    content_type = ContentType.objects.get(
        app_label='contenttypes',
        model='contenttype',
    )
    post_data = {
        # Main form.
        'app_label': content_type.app_label,
        'model': content_type.model,
        # Formset management fields.
        'permission_set-TOTAL_FORMS': 4,
        'permission_set-INITIAL_FORMS': 3,
        'permission_set-MIN_NUM_FORMS': 0,
        'permission_set-MAX_NUM_FORMS': 1000,
        # Three rows that carry an id.
        'permission_set-0-name': "Can add content type",
        'permission_set-0-codename': "add_contenttype",
        'permission_set-0-id': 13,
        'permission_set-0-content_type': 5,
        'permission_set-1-name': "Can change content type",
        'permission_set-1-codename': "change_contenttype",
        'permission_set-1-id': 14,
        'permission_set-1-content_type': 5,
        'permission_set-2-name': "Can delete content type",
        'permission_set-2-codename': "delete_contenttype",
        'permission_set-2-id': 15,
        'permission_set-2-content_type': 5,
        # One extra row without an id.
        'permission_set-3-name': "Can test content type",
        'permission_set-3-codename': "test_contenttype",
        'permission_set-3-content_type': 5,
    }
    request = self.factory.post('/fake-path', post_data)
    request._messages = default_storage(request)

    view = ExtraFormsAndFormsetsAndUpdateView.as_view()
    response = view(request, pk=content_type.pk)

    created = response.permission_set.get(codename='test_contenttype')
    self.assertEqual(created.codename, 'test_contenttype')
    self.assertEqual(len(get_messages(request)), 1)
def clearMsg(self, request):
    """Iterate over the request's message storage, removing a
    ``_loaded_messages`` attribute from each message that has one.

    NOTE(review): ``_loaded_messages`` looks like an attribute of the storage
    rather than of individual messages, so the ``del`` is probably a no-op
    and the iteration itself is what clears the messages. Confirm the intent.
    """
    storage = messages.get_messages(request)
    for msg in storage:
        try:
            del msg._loaded_messages
        except AttributeError:
            # Best effort: nothing to remove on this message. Only the
            # missing-attribute case is ignored, so KeyboardInterrupt,
            # SystemExit and real errors are no longer swallowed.
            pass
# Check permissions for model and object both
def render_messages(request):
    """Render the request's messages with the pinax bootstrap messages template."""
    storage = messages.get_messages(request)
    return render_to_string(
        "pinax_theme_bootstrap:_messages.html",
        {"messages": storage},
    )
def check_server_configs(request, server_id):
    """Checks server's configuration.

    When server has some installs it should have particular configs for
    these installs. Every problem found is reported as an error message on
    the request; if the message storage is empty at the end, a success
    message is added instead. Always redirects to the server's admin page.
    """
    server = get_object_or_404(models.Server, id=server_id)
    installs = models.Install.objects.filter(server_id=server.id)
    for i in installs:
        if i.item in ['mysql', 'postgresql']:
            type_db = 'M' if i.item == 'mysql' else 'P'
            message_01 = '{}: {}: Db not found'.format(server.code, i.item)
            message_02 = '{}: {}: Db version not correct (It should be x.x)'\
                .format(server.code, i.item)
            message_03 = '{}: {}: root user not found'\
                .format(server.code, i.item)
            try:
                db = models.Db.objects.get(
                    server_id=server.id, type='S', type_db=type_db)
                # Raw string: the pattern is unchanged, but '\d' in a plain
                # literal is an invalid escape sequence and triggers a warning.
                if not re.match(r'\d\.\d$', db.version):
                    messages.error(request, message_02)
                models.User.objects.get(type='db', db_id=db.id, name='root')
            except models.Db.DoesNotExist:
                # Without a Db row the root user cannot be looked up either,
                # so both problems are reported.
                messages.error(request, message_01)
                messages.error(request, message_03)
            except models.User.DoesNotExist:
                messages.error(request, message_03)
        for filename in i.required_files():
            dic = dict(server_id=server.id, item=i.item, filename=filename)
            if not models.Conf.objects.filter(**dic).exists():
                message = '{}: {}: {} not found'\
                    .format(server.code, i.item, filename)
                messages.error(request, message)
    storage = messages.get_messages(request)
    if len(storage) == 0:
        messages.success(request, "Server is ready for install.")
    return HttpResponseRedirect('/admin/api/server/{}/'.format(server_id))
def get_react_config(context):
    """Serialise the front-end configuration to a JSON string marked safe.

    Reads the request (when ``context`` has one) for the user, pending
    messages and superuser status, then adds feature flags, version
    information and other settings.
    """
    request = context['request'] if 'request' in context else None
    if request is None:
        user = None
        messages = []
        is_superuser = False
    else:
        user = request.user
        messages = get_messages(request)
        try:
            is_superuser = request.is_superuser()
        except AttributeError:
            is_superuser = False

    if user:
        user = extract_lazy_object(user)

    enabled_features = [
        flag
        for flag in ('organizations:create', 'auth:register')
        if features.has(flag, actor=user)
    ]

    version_info = _get_version_info()
    # The upgrade check only runs for superusers.
    needs_upgrade = _needs_upgrade() if is_superuser else False

    config = {
        'singleOrganization': settings.SENTRY_SINGLE_ORGANIZATION,
        'urlPrefix': options.get('system.url-prefix'),
        'version': version_info,
        'features': enabled_features,
        'mediaUrl': get_asset_url('sentry', ''),
        'needsUpgrade': needs_upgrade,
        'dsn': _get_public_dsn(),
        'messages': [
            {'message': msg.message, 'level': msg.tags}
            for msg in messages
        ],
    }

    if user and user.is_authenticated():
        serialized_user = serialize(user, user)
        config['isAuthenticated'] = True
        config['user'] = serialized_user
        config['user']['isSuperuser'] = is_superuser
    else:
        config['isAuthenticated'] = False
        config['user'] = None

    return mark_safe(json.dumps(config))