我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 django.contrib.messages.warning()。
def index(self, request, *args, **kwargs):
    """Render the admin index, flashing one warning per failed check.

    Every ``validate_*`` helper is called in a fixed order; whatever each
    one returns is concatenated, reported through ``messages.warning`` and
    the request is then handed to the admin site's own ``index`` view.
    """
    extra_context = {}

    problems = validate_server_db1()
    remaining_checks = (
        validate_server_db2,
        validate_server_db3,
        validate_project_db1,
        validate_project_db2,
        validate_uwsgi,
        validate_server_ip,
        validate_project_dirs,
    )
    for check in remaining_checks:
        problems += check()

    for problem in problems:
        messages.warning(request, problem)

    site_class = admin.site.__class__
    return site_class.index(
        self, request, extra_context=extra_context, *args, **kwargs)
def validate_voucher(view):
    """Wrap a checkout view so a stale voucher code is dropped first.

    If the checkout carries a voucher code that does not match an active
    ``Voucher``, the code is removed, the discount is recalculated, a
    warning is flashed and the user is redirected to ``checkout:summary``
    instead of reaching the wrapped view.
    """
    @wraps(view)
    def func(request, checkout, cart):
        code = checkout.voucher_code
        if code:
            try:
                Voucher.objects.active().get(code=code)
            except Voucher.DoesNotExist:
                del checkout.voucher_code
                checkout.recalculate_discount()
                warning_text = pgettext(
                    'Checkout warning',
                    'This voucher has expired. Please review your checkout.')
                messages.warning(request, warning_text)
                return redirect('checkout:summary')
        return view(request, checkout, cart)

    return func
def scan_scan_list(request: HttpRequest, scan_list_id: int) -> HttpResponse:
    """Schedule the scan of a scan list.

    Loads the scan list (404 if absent) with its sites prefetched, asks it
    to scan, flashes a success or warning message depending on the result
    and redirects back to the scan list page.
    """
    sites = Site.objects.select_related('last_scan') \
        .annotate_most_recent_scan_start() \
        .annotate_most_recent_scan_end_or_null()
    scan_list = get_object_or_404(
        ScanList.objects.prefetch_related(Prefetch('sites', queryset=sites)),
        pk=scan_list_id)

    if scan_list.scan():
        num_scanning_sites = Scan.objects.filter(end__isnull=True).count()
        # Translate the constant template first and interpolate afterwards;
        # translating the already-formatted text can never match a catalog
        # entry because the number is part of the lookup key.
        messages.success(request, _(
            "Scans for this list have been scheduled. "
            "The total number of sites in the scanning queue "
            "is %i (including yours).") % num_scanning_sites)
    else:
        messages.warning(request, _('All sites have been scanned recently. Please wait 30 minutes and try again.'))

    return redirect(reverse('frontend:view_scan_list', args=(scan_list_id,)))
def save_related(self, request, form, *args, **kwargs):
    """Save related objects, then warn about photos that can never show.

    After the parent implementation has run, the gallery instance is asked
    for its orphaned photos; if there are any, their titles are listed in
    a pluralised warning message.
    """
    super(GalleryAdmin, self).save_related(request, form, *args, **kwargs)

    orphans = form.instance.orphaned_photos()
    if not orphans:
        return

    titles = ", ".join([photo.title for photo in orphans])
    template = ungettext(
        'The following photo does not belong to the same site(s)'
        ' as the gallery, so will never be displayed: %(photo_list)s.',
        'The following photos do not belong to the same site(s)'
        ' as the gallery, so will never be displayed: %(photo_list)s.',
        len(orphans)
    )
    messages.warning(request, template % {'photo_list': titles})
def add_slack_btn(request):
    """Finish the "Add to Slack" OAuth flow and store the resulting channel.

    The ``code`` query parameter is exchanged with Slack's ``oauth.access``
    endpoint.  On an ``ok`` reply a new ``slack`` channel holding the raw
    response text is saved for the team's user and assigned to all checks;
    otherwise Slack's error string is flashed as a warning.  Codes shorter
    than eight characters are rejected with HTTP 400.
    """
    code = request.GET.get("code", "")
    if len(code) < 8:
        return HttpResponseBadRequest()

    # Bound the outbound call: without a timeout a stalled connection to
    # Slack would block this worker indefinitely.
    result = requests.post("https://slack.com/api/oauth.access", {
        "client_id": settings.SLACK_CLIENT_ID,
        "client_secret": settings.SLACK_CLIENT_SECRET,
        "code": code
    }, timeout=10)

    doc = result.json()
    if doc.get("ok"):
        channel = Channel()
        channel.user = request.team.user
        channel.kind = "slack"
        channel.value = result.text
        channel.save()
        channel.assign_all_checks()
        messages.success(request, "The Slack integration has been added!")
    else:
        s = doc.get("error")
        messages.warning(request, "Error message from slack: %s" % s)

    return redirect("hc-channels")
def project_details(request, project_id):
    """Show a project's detail page to a user assigned to that project.

    Users not listed in the project's ``users_assigned`` get a warning and
    are sent back to the dashboard.
    """
    assigned = Project.objects.filter(
        users_assigned=request.user.id, pk=project_id)
    if not assigned:
        messages.warning(
            request, 'You are not authorized to view this project')
        return redirect('/taskManager/dashboard')

    proj = Project.objects.get(pk=project_id)
    context = {
        'proj': proj,
        'user_can_edit': request.user.has_perm('project_edit'),
    }
    return render(request, 'taskManager/project_details.html', context)

# A4: Insecure Direct Object Reference (IDOR)
def change_password(request):
    """Change the logged-in user's password after re-checking the old one.

    On POST the old password is verified via ``authenticate`` and the new
    password must equal its confirmation; the outcome is reported through
    the messages framework.  The form page is rendered in every case.
    """
    if request.method == 'POST':
        account = request.user
        submitted = request.POST
        current = submitted.get('old_password')
        wanted = submitted.get('new_password')
        repeated = submitted.get('confirm_password')

        if not authenticate(username=account.username, password=current):
            messages.warning(request, 'Invalid Password')
        elif wanted != repeated:
            messages.warning(request, 'Passwords do not match')
        else:
            account.set_password(wanted)
            account.save()
            messages.success(request, 'Password Updated')

    return render(request,
                  'taskManager/change_password.html',
                  {'user': request.user})
def activate(request, uid, token):
    """Verify a user's e-mail address from an activation link.

    ``uid`` is a URL-safe base64 encoded primary key.  The link is only
    honoured when it decodes to an existing user who is also the user
    making the request; the token is then checked, and on success the
    user is flagged as verified and logged in.  Every path ends with a
    redirect to ``root``.
    """
    def reject():
        messages.warning(request, "User email can be verified")
        return redirect('root')

    try:
        uid = force_text(urlsafe_base64_decode(uid))
        user = User.objects.get(pk=uid)
        if request.user != user:
            return reject()
    except (TypeError, ValueError, OverflowError, User.DoesNotExist):
        return reject()

    token_ok = account_activation_token.check_token(user, token)
    if not token_ok:
        messages.error(request, "This email verification url has expired")
    else:
        messages.success(request, "Email verified!")
        user.email_verified = True
        user.save()
        auth.login(request, user)
    return redirect('root')
def index(request):
    """Render the landing page with empty sign-up and sign-in forms.

    A debug-level message is queued as a side effect while the context is
    built; the value that call returns is stored under ``message``.
    """
    signupForm = SignupForm()
    signinForm = SigninForm()
    # NOTE(review): messages.debug() presumably returns None, in which case
    # the 'message' context entry carries no information -- confirm.
    queued = messages.debug(request, 'Bleh!')
    context = dict(signupForm=signupForm, signinForm=signinForm, message=queued)

    # The other levels available from the messages framework, for reference:
    #   messages.info(request, 'Three credits remain in your account.')
    #   messages.success(request, 'Profile details updated.')
    #   messages.warning(request, 'Your account expires in three days.')
    #   messages.error(request, 'Bleh!')
    return render(request, 'user/index.html', context)
def form_valid(self, form):
    """Save the mail settings and, if requested, test the SMTP backend.

    When the POST field ``test`` equals ``'1'`` the event's custom mail
    backend is exercised; a failure is reported as a warning and the user
    is sent back to the edit page.  Otherwise (or after a successful test)
    a success message is queued and the parent ``form_valid`` finishes the
    request.
    """
    form.save()
    request = self.request

    wants_test = request.POST.get('test', '0').strip() == '1'
    if not wants_test:
        messages.success(request, _('Yay! We saved your changes.'))
    else:
        backend = request.event.get_mail_backend(force_custom=True)
        try:
            backend.test(request.event.settings.mail_from)
        except Exception as e:
            messages.warning(request, _('An error occured while contacting the SMTP server: %s') % str(e))
            edit_url = reverse('orga:settings.mail.edit', kwargs={'event': request.event.slug})
            return redirect(edit_url)
        else:
            if form.cleaned_data.get('smtp_use_custom'):
                notice = _('Yay, your changes have been saved and the connection attempt to '
                           'your SMTP server was successful.')
            else:
                notice = _('We\'ve been able to contact the SMTP server you configured. '
                           'Remember to check the "use custom SMTP server" checkbox, '
                           'otherwise your SMTP server will not be used.')
            messages.success(request, notice)

    return super().form_valid(form)
def dispatch(self, request, *args, **kwargs):
    """Attach a speaker, identified by nick or e-mail, to the submission.

    The POST field ``nick`` is matched case-insensitively against e-mail
    addresses when it contains ``@`` and against nicks otherwise; an
    unknown value is handed to ``create_user_as_orga``.  The result is
    reported via the messages framework and the request always ends with
    a redirect to the submission's speaker page.
    """
    super().dispatch(request, *args, **kwargs)
    submission = self.get_object()
    nick = request.POST.get('nick')

    speaker = None
    # A request without the field used to crash on `'@' in None`; it is now
    # answered with the regular "please provide a valid nick" error.
    if nick is not None:
        try:
            if '@' in nick:
                speaker = User.objects.get(email__iexact=nick)
            else:
                speaker = User.objects.get(nick__iexact=nick)
        except User.DoesNotExist:
            speaker = create_user_as_orga(nick, submission=submission)

    if not speaker:
        messages.error(request, _('Please provide a valid nick or email address!'))
    else:
        if submission not in speaker.submissions.all():
            speaker.submissions.add(submission)
            speaker.save(update_fields=['submissions'])
            submission.log_action('pretalx.submission.speakers.add', person=request.user, orga=True)
            messages.success(request, _('The speaker has been added to the submission.'))
        else:
            messages.warning(request, _('The speaker was already part of the submission.'))
        # Make sure the speaker has a profile for this event.
        if not speaker.profiles.filter(event=request.event).exists():
            SpeakerProfile.objects.create(user=speaker, event=request.event)

    return redirect(submission.orga_urls.speakers)
def post(self, request, pk=id):
    """Queue a bulk site upload for project ``pk`` from an uploaded file.

    A valid form hands the uploaded file to the ``bulkuploadsites`` task,
    records a ``CeleryTaskProgress`` row for it and redirects to the
    project's site list.  Any exception while doing so is attached to the
    form as a non-field error and the upload page is rendered again, as
    it is for an invalid form.
    """
    # NOTE(review): the default `pk=id` binds the builtin `id` function;
    # kept for interface compatibility, presumably callers always pass pk.
    form = UploadFileForm(request.POST, request.FILES)
    if form.is_valid():
        try:
            sitefile = request.FILES['file']
            user = request.user
            # (a leftover Python-2-only debug `print` was removed here)
            task = bulkuploadsites.delay(user, sitefile, pk)
            if CeleryTaskProgress.objects.create(task_id=task.id, user=user, task_type=0):
                messages.success(request, 'Sites are being uploaded. You will be notified in notifications list as well.')
            else:
                # This is the failure branch, so report it at error level
                # rather than as a success.
                messages.error(request, 'Sites cannot be updated a the moment.')
            return HttpResponseRedirect(reverse('fieldsight:proj-site-list', kwargs={'pk': pk}))
        except Exception as e:
            form.full_clean()
            form._errors[NON_FIELD_ERRORS] = form.error_class(['Sites Upload Failed, UnSupported Data', e])
            messages.warning(request, 'Site Upload Failed, UnSupported Data ')
    return render(request, 'fieldsight/upload_sites.html', {'form': form, 'project': pk})
def form_valid(self, form):
    """Import rules from an uploaded file or from the ``rules`` field.

    The uploaded ``file_field`` (decoded as UTF-8) takes precedence over
    the ``rules`` text.  With neither present a warning is shown; an
    import failure shows an error.  In both cases the invalid-form
    response is returned; on success the user is redirected to
    ``rule-import``.
    """
    data = form.clean()
    if data.get('file_field'):
        rules = data['file_field'].read().decode('utf8')
    elif data.get('rules'):
        rules = data.get('rules')
    else:
        messages.warning(self.request, 'Missing rules')
        return self.form_invalid(form)

    try:
        counters = prometheus.import_rules(rules)
    except Exception:
        # `Exception` rather than a bare except, so KeyboardInterrupt and
        # SystemExit are no longer swallowed.
        messages.error(self.request, 'Error importing rules')
        return self.form_invalid(form)
    else:
        # Kept outside the try body so that a failure while building the
        # response is not misreported as an import error.
        messages.info(self.request, 'Imported %s' % counters)
        return redirect('rule-import')
def __call__(self, request):
    """Attach the current site and user to the request, then fire triggers.

    After the wrapped handler has produced the response, the three write
    signals are sent; any receiver that reports ``False`` results in a
    warning message being queued on the request.
    """
    # This works the same as the django middleware
    # django.contrib.sites.middleware.CurrentSiteMiddleware
    # but ensures that it uses our proxy object so that test cases
    # properly find our rule_set object
    request.site = models.Site.objects.get_current()

    # Get our logged in user to use with our audit logging plugin.
    # `is_authenticated` is read as an attribute: new-style middleware
    # (get_response) implies Django >= 1.10, where it is a property, and
    # calling it stops working in Django 2.0.
    if request.user.is_authenticated:
        _user.value = request.user

    response = self.get_response(request)

    triggers = {
        'Config': trigger_write_config.send,
        'Rules': trigger_write_rules.send,
        'URLs': trigger_write_urls.send,
    }
    for msg, func in triggers.items():
        for (receiver, status) in func(self, request=request, force=True):
            if status is False:
                messages.warning(request, 'Error queueing %s ' % msg)
    return response
def remove_member(request, team_id, user_id):
    """Remove a user from a team on behalf of staff or the team captain.

    Anyone else receives a "not found" response.  The captain cannot be
    removed, and neither can a user who is not a member; both cases flash
    a warning and redirect back to the team.
    """
    team = get_object_or_404(models.Team, pk=team_id)

    # Only staff and team's captain can edit team
    may_edit = request.user.is_staff or team.captain == request.user
    if not may_edit:
        return HttpResponseNotFound()

    member = get_object_or_404(users_models.User, pk=user_id)

    with transaction.atomic():
        if member == team.captain:
            messages.warning(request, 'Captain can\'t leave the team')
            return redirect(team)

        if member not in team.members.all():
            messages.warning(request, 'User %s not in the team' % (member.username,))
            return redirect(team)

        team.members.remove(member)
        team.save()
        messages.success(
            request,
            'User %s has left the team %s' % (member.username, team.name))

    return redirect(team)
def restart(request, app, model, object_id, instance):
    """Restart a workflow by deleting its ``Instance`` record.

    Only the user who started the instance may restart it; other users
    get a warning.  Any exception is reported as an error message.

    :param request: the current HTTP request
    :param app: app label, used to build the admin redirect URL
    :param model: model name, used to build the admin redirect URL
    :param object_id: object id, used to build the admin redirect URL
    :param instance: primary key of the workflow ``Instance`` (converted
        with ``int``)
    :return: a redirect to ``/admin/<app>/<model>/<object_id>``
    """
    try:
        inst = Instance.objects.get(id=int(instance))
        if request.user == inst.starter:
            inst.delete()
            messages.success(request, _("workflow restarted success"))
        else:
            messages.warning(request, _("you do not have the permission to restart,only the starter can restart"))
    except Exception as e:
        # `as` form: valid on Python 2.6+ and Python 3, unlike `Exception,e`.
        messages.error(request, e)
    return HttpResponseRedirect("/admin/%s/%s/%s" % (app, model, object_id))
def get_user_name(backend, details, response, is_new=False, *args, **kwargs):
    """Ask a newly created user to choose a username.

    For existing users an empty dict is returned.  For new users a GET
    request renders the selection form; a POST either cancels (redirect to
    ``home``), re-renders the form with validation errors, or returns
    ``{'username': <chosen name>}``.  A POST carrying neither button
    returns ``None``.
    """
    request = backend.strategy.request
    errors = {}

    if not is_new:
        return {}

    if request.method == 'GET':
        return render(request, 'stud_auth/select_user_name.html', {})

    if request.POST.get('cancel_button'):
        messages.warning(request, u"Entrance is canceled")
        return HttpResponseRedirect(reverse('home'))

    if request.POST.get('success_button'):
        # A missing field is treated like an empty one, so it yields the
        # "enter your username" error instead of passing through as None.
        final_username = request.POST.get('username', '')
        if final_username != '':
            # exists() asks the database a yes/no question instead of
            # loading every matching row just to count them.
            if User.objects.filter(username=final_username).exists():
                errors['username'] = _(u"Sorry, but the same username already exists")
        else:
            errors['username'] = _(u"Please, enter your username")

        if errors:
            return render(request, 'stud_auth/select_user_name.html',
                          {'errors': errors, 'user_name': final_username})
        return {'username': final_username}
def post(self, request, *args, **kwargs):
    """Delete a group unless cancelled or the group is still in use.

    Deletion is refused, with an error message, when students belong to
    the group or when any of the group's exams has results.  Every
    refusal, and a cancel, redirects to the ``groups`` page.
    """
    def to_group_list():
        return HttpResponseRedirect(reverse('groups'))

    if request.POST.get('cancel_button'):
        messages.warning(request, _(u"Deleting group calceled"))
        return to_group_list()

    group = Group.objects.get(pk=kwargs['pk'])

    if Student.objects.filter(student_group=group):
        messages.error(request, _(u"Deletion is impossible. There are students in the group"))
        return to_group_list()

    for exam in Exam.objects.filter(exam_group=group):
        if Result.objects.filter(result_exam=exam):
            messages.error(request, _(u"Deletion is impossible. There are available results for this group"))
            return to_group_list()

    return super(GroupDeleteView, self).post(request, *args, **kwargs)
def results_delete(request, rid):
    """Confirm and perform deletion of all results of one exam.

    GET renders the confirmation page, or redirects with an error message
    when the exam cannot be loaded.  POST either cancels or deletes the
    exam's results and marks the exam as not completed, then redirects to
    the ``results`` page.
    """
    if request.method == "POST":
        if request.POST.get('cancel_button'):
            messages.warning(request, _(u"Deleting results of exam canceled"))
        else:
            exam = Exam.objects.get(pk=int(rid))
            exam.is_completed = False
            results = Result.objects.filter(result_exam=exam)
            results.delete()
            exam.save()
            messages.success(
                request,
                _(u"Information about results of exam %s deleted successfully") % exam.name)
        return HttpResponseRedirect(reverse('results'))

    try:
        exam = Exam.objects.get(pk=int(rid))
    except (TypeError, ValueError, Exam.DoesNotExist):
        # The message call needs the request as its first argument; it was
        # missing, so this handler itself raised a TypeError.
        messages.error(request, _(u'There was an error on the server. Please try again later'))
        return HttpResponseRedirect(reverse('results'))
    return render(request, 'students/results_confirm_delete.html', {'exam': exam})
def get_pending_enrollment_message(cls, pending_users, enrolled_in):
    """
    Build the message shown for learners who do not have an account yet.

    Args:
        pending_users: An iterable of objects exposing ``user_email``
        enrolled_in (str): A string identifier for the course or program
            the pending users were linked to

    Returns:
        tuple: A 2-tuple containing a message type and message text
    """
    email_list = ', '.join(
        [pending_user.user_email for pending_user in pending_users])
    text = _(
        "The following learners do not have an account on "
        "{platform_name}. They have not been enrolled in "
        "{enrolled_in}. When these learners create an account, they will "
        "be enrolled automatically: {pending_email_list}"
    ).format(
        platform_name=settings.PLATFORM_NAME,
        enrolled_in=enrolled_in,
        pending_email_list=email_list,
    )
    return ('warning', text)
def save_account_group(request, account, group_form):
    """Add the group chosen in ``group_form`` to ``account``.

    The default account may not join an admin or protected group, and a
    group that is already attached is not added twice; each outcome is
    reported through the messages framework before redirecting to the
    account's detail page.
    """
    group = group_form.cleaned_data['group']
    restricted_group = group.is_admin_group() or group.is_protected_group()
    special_case = restricted_group and account.is_default_account()

    if not special_case:
        try:
            account.accountgroup_set.get(id=group.id)
            messages.warning(request,
                             'Group was not added as it has already been added.')
        except AccountGroup.DoesNotExist:
            account.accountgroup_set.add(group)
            messages.success(
                request, 'Added "%s" to group "%s"' % (account, group))
    else:
        messages.error(
            request, 'Default user may not be added to "%s" group.' % group)

    detail_url = reverse('useradmin-account_detail', args=[account.id])
    return HttpResponseRedirect(detail_url)
def custom_delete_selected(self, request, queryset):
    """Admin action: delete the selected styles one at a time.

    Until the user has confirmed (POST field ``post`` equal to ``'yes'``)
    the stock delete action is used to render the confirmation page.
    Afterwards each style is deleted individually, so one failure does not
    stop the rest; failures are collected and listed in a warning.
    """
    from django.utils.html import format_html, format_html_join

    if request.POST.get('post') != 'yes':
        # the confirm page, or user not confirmed
        return self.default_delete_action[0](self, request, queryset)

    # user confirmed the deletion, execute the custom delete logic.
    failed_objects = []
    for style in queryset:
        try:
            style.delete()
        except Exception as e:
            # Record the failure and carry on with the next object.
            reason = "".join(traceback.format_exception_only(type(e), e)).strip()
            failed_objects.append((style.identifier, reason))

    if failed_objects:
        # format_html* escape the interpolated identifiers and exception
        # text, which may contain arbitrary characters.
        items = format_html_join("", "<li>{0} : {1}</li>", failed_objects)
        messages.warning(request, format_html(
            "Some selected styles are deleted failed:<ul>{0}</ul>", items))
    else:
        messages.success(request, "All selected styles are deleted successfully")
def publish(self, request, queryset):
    """Admin action: submit the selected records to the metajobs API.

    The identifiers of the selected records are POSTed as JSON to
    ``<BORG_URL>/api/metajobs/``.  A truthy top-level ``status`` in the
    reply means everything succeeded; otherwise every per-layer entry with
    a falsy ``status`` is reported together with its ``message``.  Any
    exception is printed and shown as a warning.
    """
    from django.utils.html import format_html, format_html_join

    data = {"layers": [record.identifier for record in queryset]}
    try:
        # The endpoint URL is the first argument of requests.post(); the
        # Django request object was being passed in that position.
        # NOTE(review): no timeout is set because the expected duration of
        # this call is unknown -- consider adding one.
        res = requests.post("{}/api/metajobs/".format(settings.BORG_URL), json=data)
        res.raise_for_status()
        result = res.json()

        failed_objects = []
        if not result["status"]:
            failed_objects = [
                (layer, status["message"])
                for layer, status in result.items()
                if layer != "status" and not status["status"]
            ]

        if failed_objects:
            # Escape the layer names and remote messages before embedding
            # them in HTML.
            items = format_html_join("", "<li>{0} : {1}</li>", failed_objects)
            messages.warning(request, format_html(
                "Some selected records are published failed:<ul>{0}</ul>", items))
        else:
            messages.success(request, "All selected records are published successfully")
    except Exception as e:
        traceback.print_exc()
        messages.warning(request, str(e))
def custom_delete_selected(self, request, queryset):
    """Admin action: delete the selected records one at a time.

    Until the user has confirmed (POST field ``post`` equal to ``'yes'``)
    the stock delete action is used to render the confirmation page.
    Afterwards each record is deleted individually, so one failure does
    not stop the rest; failures are collected and listed in a warning.
    """
    from django.utils.html import format_html, format_html_join

    if request.POST.get('post') != 'yes':
        # the confirm page, or user not confirmed
        return self.default_delete_action[0](self, request, queryset)

    # user confirmed the deletion, execute the custom delete logic.
    failed_objects = []
    for record in queryset:
        try:
            record.delete()
        except Exception as e:
            # Record the failure and carry on with the next object.
            reason = "".join(traceback.format_exception_only(type(e), e)).strip()
            failed_objects.append((record.identifier, reason))

    if failed_objects:
        # format_html* escape the interpolated identifiers and exception
        # text, which may contain arbitrary characters.
        items = format_html_join("", "<li>{0} : {1}</li>", failed_objects)
        messages.warning(request, format_html(
            "Some selected records are deleted failed:<ul>{0}</ul>", items))
    else:
        messages.success(request, "All selected records are deleted successfully")
def handle_line_bulk(self, request, ids):
    """Apply the posted line form to every line named in ``ids``.

    Lines that cannot be found are skipped.  Validation errors are flashed
    as warnings, and a summary of how many lines were saved is flashed at
    the end.  Always returns ``True``.
    """
    study = self.get_object()
    total = len(ids)
    saved = 0

    for line_id in ids:
        line = self._get_line(line_id)
        if not line:
            continue

        form = LineForm(request.POST, instance=line, prefix='line', study=study)
        form.check_bulk_edit()  # removes fields having disabled bulk edit checkbox
        if not form.is_valid():
            for error in form.errors.values():
                messages.warning(request, error)
            continue

        form.save()
        saved += 1

    summary = _('Saved %(saved)s of %(total)s Lines') % {
        'saved': saved,
        'total': total,
    }
    messages.success(request, summary)
    return True
def _handle_permission_update(self, permission_def):
    """Create, update or delete one permission on ``self.object``.

    ``permission_def`` is a dict with a ``type`` entry plus one of the keys
    ``group``, ``user`` or ``public`` selecting which permission set is
    affected.  A type of ``StudyPermission.NONE`` deletes the matching
    permission; any other type is written with ``update_or_create``.  A
    definition without a type or without a recognised target only logs a
    warning.
    """
    ptype = permission_def.get('type', None)
    kwargs = dict(study=self.object)
    defaults = dict(permission_type=ptype)

    # Start from None so that a definition with none of the known keys
    # reaches the "invalid" branch instead of raising UnboundLocalError.
    manager = None
    if 'group' in permission_def:
        kwargs.update(group_id=permission_def['group'].get('id', 0))
        manager = self.object.grouppermission_set
    elif 'user' in permission_def:
        kwargs.update(user_id=permission_def['user'].get('id', 0))
        manager = self.object.userpermission_set
    elif 'public' in permission_def:
        manager = self.object.everyonepermission_set

    if manager is None or ptype is None:
        logger.warning('Invalid permission type for add')
    elif ptype == StudyPermission.NONE:
        manager.filter(**kwargs).delete()
    else:
        kwargs.update(defaults=defaults)
        manager.update_or_create(**kwargs)

# /study/<study_id>/import/
def password_change(request):
    """
    Render Pontifex password change scheme

    GET shows an unbound form.  A valid POST saves the new password and
    redirects to the profile page; an invalid POST flashes every form
    error as a warning and shows the bound form again.
    """
    td = {
        "user": request.user,
    }
    if request.method == "POST":
        form = PasswordChangeForm(user=request.user, data=request.POST)
        td["form"] = form
        if form.is_valid():
            form.save()
            messages.success(request, "Password successfully changed.")
            return redirect("/users/profile/")
        # values() instead of the Python-2-only iteritems(); the field
        # names were never used.
        for error_list in form.errors.values():
            for msg in error_list:
                messages.warning(request, msg)
        return render(request, "users/password_change.html", td)

    form = PasswordChangeForm(request.user)
    td["form"] = form
    return render(request, "users/password_change.html", td)
def password_reset(request):
    """
    Render Pontifex password reset scheme

    GET shows an empty form.  A POST with an invalid form or an invalid
    e-mail address re-renders the page; otherwise the form is saved, a
    confirmation is flashed and the user is redirected to ``/``.
    """
    td = {}
    if request.method == "POST":
        form = PasswordResetForm(request.POST)
        # Expose the bound form to the template so the page re-rendered on
        # an error path still has a form to show.
        td["form"] = form
        if not form.is_valid():
            show_form_field_errors(request, form)
            return render(request, "users/password_reset.html", td)
        elif not is_email(form.cleaned_data.get("email")):
            messages.warning(request, "Please provide a valid email address.")
            return render(request, "users/password_reset.html", td)
        else:
            form.save()
            messages.success(request, "Success! We sent an email with a new password to the Pontifex user at %s." % form.cleaned_data.get("email"))
            return redirect("/")
    else:
        td["form"] = PasswordResetForm()
        return render(request, "users/password_reset.html", td)
def userkey_required():
    """
    Decorator for views which require that the user has an active UserKey
    (typically for encryption/decryption of Secrets).

    Users without a key, or with a key that is not active, get a warning
    and are redirected to ``user:userkey``.
    """
    from functools import wraps

    def _decorator(view):
        # wraps() keeps the view's __name__, __doc__ and __wrapped__ so the
        # decorated view still identifies as the original in tracebacks
        # and introspection.
        @wraps(view)
        def wrapped_view(request, *args, **kwargs):
            try:
                uk = UserKey.objects.get(user=request.user)
            except UserKey.DoesNotExist:
                messages.warning(request, "This operation requires an active user key, but you don't have one.")
                return redirect('user:userkey')
            if not uk.is_active():
                messages.warning(request, "This operation is not available. Your user key has not been activated.")
                return redirect('user:userkey')
            return view(request, *args, **kwargs)
        return wrapped_view
    return _decorator
def search_result(request):
    """Dispatch a book search according to which keyword field was sent.

    ``keyword_list`` searches stored books by title; ``keyword_wish`` and
    ``keyword_register`` both query ``get_book_info`` and differ only in
    the template rendered.  With no keyword a warning is flashed and the
    user is redirected to the book list.
    """
    params = request.GET

    # .get() is used so that an absent key is simply falsy.
    keyword = params.get('keyword_list')
    if keyword:
        books = Book.objects.filter(title__icontains=keyword)
        return render(request, 'books/book_list.html', {'books': books})

    lookups = (
        ('keyword_wish', 'books/wish_book.html'),
        ('keyword_register', 'books/register.html'),
    )
    for field, template in lookups:
        keyword = params.get(field)
        if keyword:
            books = get_book_info(keyword, display='5')
            return render(request, template, {'books': books})

    messages.warning(request, '???? ??????.')
    return redirect('books:list')
def settings(request, setting):
    """Render the account settings page for one settings area.

    Known areas (``profile``, ``changePassword``, ``removeProfile``) get
    their form and a partial template named after the area; an unknown
    area keeps the default partial and flashes an error.
    """
    template_name = "accounts/settings.html"
    template = "accounts/security/_default.html"
    context = {}

    form_factories = {
        "profile": lambda: EditUserForm(instance=request.user),
        "changePassword": lambda: PasswordChangeForm(request.user),
        "removeProfile": lambda: RemoveProfileForm(),
    }
    make_form = form_factories.get(setting)

    if make_form is None:
        messages.error(request, "Error 404, area does not exist")
    else:
        context['form'] = make_form()
        if setting == "removeProfile":
            messages.warning(
                request,
                "Are you sure that you want to delete your account? Deleting the account is irreversible!"
            )
        template = "accounts/security/_" + setting + ".html"

    context['template'] = template
    context['setting'] = setting
    return render(request, template_name, context)
def get_context_data(self, *args, **kwargs):
    """Add the event list for one object to the template context.

    The API is queried with the ``object_type`` and ``object_id`` keyword
    arguments using the auth token stored in the session.  ``api_ok``
    records whether the call succeeded, ``has_results`` whether any events
    came back, and ``human_identifier`` falls back to the object id when
    there are none.
    """
    context_data = super(DashboardDetailView, self).get_context_data(*args, **kwargs)
    client = get_dlogr_client(auth_token=self.request.session['user']['auth_token'])
    object_type = kwargs['object_type']
    object_id = kwargs['object_id']

    try:
        listing = client.Event.list(object_type=object_type, object_id=object_id)
        payload = listing.json()
        context_data['events'] = payload
        context_data['api_ok'] = True

        if not payload['count']:
            messages.warning(self.request, 'No events found for this object.')
            context_data['has_results'] = False
            context_data['human_identifier'] = object_id
        else:
            context_data['has_results'] = True
            context_data['human_identifier'] = payload['results'][0]['human_identifier']
    except DlogrAPIError:
        messages.error(self.request, APIInternalError.message)
        context_data['api_ok'] = False

    return context_data
def handle(self):
    """Terminate an IKE SA or child SA and store the returned log lines.

    Which one is terminated depends on whether ``sa_id`` or
    ``child_sa_id`` is set on the handler; with neither set nothing is
    terminated.  A ``ViciException`` is shown as a warning.  Always
    redirects to the connection index.
    """
    try:
        connection = Connection.objects.get(id=self.conn_id).subclass()
        vici_wrapper = ViciWrapper()

        logs = None
        notice = None
        if hasattr(self, 'sa_id'):
            logs = vici_wrapper.terminate_ike_sa(self.sa_id)
            notice = "Ike SA terminated."
        elif hasattr(self, 'child_sa_id'):
            logs = vici_wrapper.terminate_child_sa(self.child_sa_id)
            notice = "Child SA terminated."

        # `logs` stays None when neither id is present; iterating it
        # directly raised a TypeError.
        for log in logs or ():
            LogMessage(connection=connection, message=log['message']).save()

        if notice is not None:
            messages.info(self.request, notice)
    except ViciException as e:
        messages.warning(self.request, str(e))
    return HttpResponseRedirect(reverse("server_connections:index"))
def auth_login(request):
    """Log a user in with e-mail and password.

    A successful login greets the user and redirects to the ``next``
    query parameter (``/`` when absent); a failed one flashes a warning
    and shows the form again.
    """
    form = LoginForm(request.POST or None)

    # NOTE(review): `next` comes straight from the query string and is
    # redirected to without validation -- looks like an open redirect;
    # confirm and validate against the allowed hosts.
    next_url = request.GET.get('next')
    if next_url is None:
        next_url = "/"

    if form.is_valid():
        credentials = form.cleaned_data
        email = credentials['email']
        password = credentials['password']
        user = authenticate(email=email, password=password)
        if user is None:
            messages.warning(request, 'Invalid username or password.')
        else:
            messages.success(request, 'Success! Welcome, '+(user.first_name or ""))
            login(request, user)
            return redirect(next_url)

    context = {
        "form": form,
        "page_name": "Login",
        "button_value": "Login",
        "links": ["register"],
    }
    return render(request, 'auth_form.html', context)
def invitation_send(request, poll_url):
    """Invite the users and groups named in the POST field ``invite``.

    The field is a whitespace separated list.  Each entry is tried as a
    username first and as a group name second (inviting every member of
    the group).  Users that were already invited produce a warning, names
    that match nothing are collected into one error message.  Always
    redirects to the poll's invitation page.
    """
    current_poll = get_object_or_404(Poll, url=poll_url)
    user_error = ""

    if not current_poll.can_edit(request.user, request):
        messages.error(
            request, _("You are not allowed to edit this Poll")
        )
    else:
        # Default to '' so a request without the field invites nobody
        # instead of failing on None.split().
        receivers = request.POST.get('invite', '').split()
        for receiver in receivers:
            try:
                user = BitpollUser.objects.get(username=receiver)
                invitation = Invitation(user=user, poll=current_poll, date_created=now(),
                                        creator=request.user, vote=None)
                invitation.save()
                invitation.send(request)
            except IntegrityError:
                # Translate the template, then fill in the name: a string
                # that already contains the name has no catalog entry.
                messages.warning(request, _("The user {} was already invited").format(receiver))
            except ObjectDoesNotExist:
                try:
                    group = Group.objects.get(name=receiver)
                    # user_set is a related manager; .all() yields the
                    # members to iterate over.
                    for group_user in group.user_set.all():
                        try:
                            invitation = Invitation(user=group_user, poll=current_poll,
                                                    date_created=now(),
                                                    creator=request.user, vote=None)
                            invitation.save()
                            invitation.send(request)
                        except IntegrityError:
                            # One user is already invited, ignore it
                            pass
                except ObjectDoesNotExist:
                    user_error += " '{}'".format(receiver)
        if user_error:
            messages.error(
                request,
                _("The following User/Groups could not be found: {}").format(user_error)
            )
    return redirect('invitations', current_poll.url)
def vote_assign(request, poll_url, vote_id):
    """Assign an existing vote to the user named in the POST data.

    Only authenticated users allowed to edit the vote may do this; others
    get HTTP 403.  On success the browser is redirected to the poll;
    otherwise a message is flashed and the assignment page is rendered,
    as it is for non-POST requests.
    """
    current_poll = get_object_or_404(Poll, url=poll_url)
    current_vote = get_object_or_404(Vote, id=vote_id)

    if request.method == 'POST':
        if request.user.is_authenticated and current_vote.can_edit(request.user):
            # Default to '' so a missing field takes the "user does not
            # exist" path instead of failing on None.strip().
            username = request.POST.get('username', '').strip()
            try:
                user = BitpollUser.objects.get(username=username)
                if not current_poll.vote_set.filter(Q(user=user)) and current_poll.one_vote_per_user:
                    current_vote.user = user
                    current_vote.name = user.get_displayname()
                    current_vote.assigned_by = request.user
                    current_vote.save()
                    return redirect('poll', poll_url)
                else:
                    messages.info(request, _("This user has already voted and only one vote per user is permitted"))
            except ObjectDoesNotExist:
                # Translate the template first, interpolate afterwards.
                messages.warning(request, _("The user {} does not exists").format(username))
        else:
            return HttpResponseForbidden()

    return TemplateResponse(request, 'poll/vote_assign.html', {
        'poll': current_poll,
        'vote': current_vote
    })
def require_login(request):
    """Tell an unauthenticated user to log in and send them to the index.

    Meant for the case where a lendable view is triggered from an inactive
    session: a warning is queued and the user is redirected to
    ``library:index``.
    """
    messages.warning(request, 'Permission denied: You must log in.')
    index_url = reverse('library:index')
    return redirect(index_url)
def check_product_availability_and_warn(request, cart):
    """Warn about, then remove, unavailable variants in the cart.

    Does nothing when ``contains_unavailable_variants`` reports the cart
    as fine.
    """
    if not contains_unavailable_variants(cart):
        return
    notice = pgettext_lazy(
        'Cart warning message',
        'Sorry. We don\'t have that many items in stock. '
        'Quantity was set to maximum available for now.')
    messages.warning(request, notice)
    remove_unavailable_variants(cart)
def handle_order_placement(request, checkout):
    """Try to create an order and return the redirect to follow.

    ``create_order`` yields the order together with a redirect; when no
    order was created a warning asking the user to review the checkout is
    queued first.  This is a helper function.
    """
    order, response = create_order(checkout)
    if order:
        return response
    messages.warning(
        request,
        pgettext('Checkout warning', 'Please review your checkout.'))
    return response
def delete_semantic_domain(request, domain):
    """Delete the semantic domain called ``domain`` and list the rest.

    A warning-level message confirms the deletion before redirecting to
    ``view-domains``.
    """
    SemanticDomain.objects.get(name=domain).delete()
    messages.warning(request, "Semantic domain '%s' deleted" % domain)
    target = reverse("view-domains")
    return HttpResponseRedirect(target)
def redirect_lexeme_citation(request, lexeme_id):
    """From a lexeme, redirect to the first citation.

    When the lexeme has no citations a warning is queued and the user is
    redirected to the lexeme's own page instead.
    """
    lexeme = Lexeme.objects.get(id=lexeme_id)
    try:
        first_citation = lexeme.lexemecitation_set.all()[0]
    except IndexError:
        msg = "Operation failed: this lexeme has no citations"
        messages.warning(request, msg)
        return HttpResponseRedirect(lexeme.get_absolute_url())
    # redirect() resolves the URL name with positional arguments and
    # already returns a redirect response; it must not be wrapped in
    # HttpResponseRedirect, nor be given an `args=` keyword.
    return redirect("lexeme-citation-detail", first_citation.id)

# -- /cognate/ ------------------------------------------------------------
def _calculate_ratings_count(sites):
    """Count sites per rating, always listing the five standard ratings.

    Ratings are read from ``site.evaluated.rating.rating``.  Ratings seen
    in ``sites`` come first in the returned dict, in order of first
    appearance; standard ratings that did not occur follow with a count
    of zero.
    """
    # TODO: use ordered dict and sort by rating ordering
    # for now, frontend template can just use static ordering of all available ratings
    tally = Counter()
    for site in sites:
        tally[site.evaluated.rating.rating] += 1

    ratings_count = dict(tally)
    for rating in ('good', 'bad', 'warning', 'critical', 'neutral'):
        ratings_count.setdefault(rating, 0)
    return ratings_count
def scan_site(request: HttpRequest, site_id: Union[int, None] = None) -> HttpResponse:
    """Schedule the scan of a site.

    With ``site_id`` the existing site is loaded (404 if absent).  Without
    it the posted ``SingleSiteForm`` supplies a URL and the site is fetched
    or created; an invalid form re-renders the creation page.  The outcome
    of ``site.scan()`` decides the message shown and where the user is
    redirected.
    """
    if site_id:
        site = get_object_or_404(
            Site.objects.annotate_most_recent_scan_start()
            .annotate_most_recent_scan_end_or_null(),
            pk=site_id)
    else:
        # no site_id supplied
        form = SingleSiteForm(request.POST)
        if form.is_valid():
            site, created = Site.objects.annotate_most_recent_scan_start() \
                .annotate_most_recent_scan_end_or_null().get_or_create(
                    url=form.cleaned_data.get('url'))
            if created:
                site.last_scan__end_or_null = None
                site.last_scan__start = None
        else:
            return render(request, 'frontend/create_site.html', {
                'form': form,
            })

    status_code = site.scan()
    if status_code == Site.SCAN_OK:
        if not site_id:
            # if the site is new we want to show the dog
            return redirect(reverse('frontend:scan_site_created', args=(site.pk,)))
        num_scanning_sites = Scan.objects.filter(end__isnull=True).count()
        # Translate the constant template first and interpolate afterwards;
        # translating the already-formatted text can never match a catalog
        # entry because the number is part of the lookup key.
        messages.success(request, _(
            "A scan of the site has been scheduled. "
            "The total number of sites in the scanning queue "
            "is %i (including yours).") % num_scanning_sites)
    elif status_code == Site.SCAN_COOLDOWN:
        messages.warning(request, _('The site is already scheduled for scanning or it has been scanned recently. No scan was scheduled.'))
    elif status_code == Site.SCAN_BLACKLISTED:
        messages.warning(request, _('The operator of this website requested to be blacklisted, scanning this website is not possible, sorry.'))

    return redirect(reverse('frontend:view_site', args=(site.pk,)))
def save_model(self, request, obj, form, change):
    """Save the release and warn when no release is marked active.

    The warning contains a link to the ``set_default_release`` view for
    the release that was just saved.
    """
    from django.utils.html import format_html

    super(ReleaseAdmin, self).save_model(request, obj, form, change)
    if not preferences.Setting.active_release:
        # The two sentences are translated separately and the URL is
        # interpolated (and escaped) by format_html(); a message id that
        # contains the per-object URL cannot be found in a catalog.
        messages.warning(request, format_html(
            '{} <a href="{}">{}</a>',
            _("There is no active release."),
            reverse("set_default_release", args=[obj.id]),
            _("Set current release as active release."),
        ))
def delete(self, request, *args, **kwargs):
    """Delete the object unless it is an owner that may not be deleted.

    Non-owners are always deleted through the parent implementation.
    Owners are deleted only when the queryset's ``can_delete()`` allows
    it; otherwise a warning is queued and the JSON response is returned.
    """
    self.object = self.get_object()

    blocked = self.object.is_owner and not self.get_queryset().can_delete()
    if blocked:
        messages.warning(self.request, self.owner_needed_message)
        return self.json_to_response()

    return super().delete(request, *args, **kwargs)
def reset_password(request):
    """Set a new password for the profile matching a posted reset token.

    Non-POST requests just render the form.  An unknown token or a
    password/confirmation mismatch re-renders it with a warning.  An
    expired token is cleared and the user is redirected to
    ``/taskManager/``.  On success the token is cleared and the user is
    redirected to the login page.
    """
    form_template = 'taskManager/reset_password.html'

    if request.method != 'POST':
        return render(request, form_template)

    reset_token = request.POST.get('reset_token')
    try:
        userprofile = UserProfile.objects.get(reset_token=reset_token)
        if timezone.now() > userprofile.reset_token_expiration:
            # Reset the token and move on
            userprofile.reset_token_expiration = timezone.now()
            userprofile.reset_token = ''
            userprofile.save()
            return redirect('/taskManager/')
    except UserProfile.DoesNotExist:
        messages.warning(request, 'Invalid password reset token')
        return render(request, form_template)

    new_password = request.POST.get('new_password')
    confirm_password = request.POST.get('confirm_password')
    if new_password != confirm_password:
        messages.warning(request, 'Passwords do not match')
        return render(request, form_template)

    # Reset the user's password + remove the tokens
    userprofile.user.set_password(new_password)
    userprofile.reset_token = ''
    userprofile.reset_token_expiration = timezone.now()
    userprofile.user.save()
    userprofile.save()

    messages.success(request, 'Password has been successfully reset')
    return redirect('/taskManager/login')

# Vuln: Username Enumeration
def forgot_password(request):
    """E-mail a six-digit password reset token valid for ten minutes.

    The same "check your email" text is shown whether or not the address
    belongs to a user; a known address redirects to the reset page, an
    unknown one re-renders the form.
    """
    import random

    if request.method == 'POST':
        t_email = request.POST.get('email')
        try:
            reset_user = User.objects.get(email=t_email)

            # Draw the token uniformly from 000000-999999 using the OS
            # random source.  Concatenating the decimal renderings of
            # random bytes skews the digits, and on Python 2 does not
            # produce digits at all.
            reset_token = '{:06d}'.format(random.SystemRandom().randrange(10 ** 6))

            reset_user.userprofile.reset_token = reset_token
            reset_user.userprofile.reset_token_expiration = timezone.now() + datetime.timedelta(minutes=10)
            reset_user.userprofile.save()
            reset_user.save()

            reset_user.email_user(
                "Reset your password",
                "You can reset your password at /taskManager/reset_password/. Use \"{}\" as your token. This link will only work for 10 minutes.".format(reset_token))

            messages.success(request, 'Check your email for a reset token')
            return redirect('/taskManager/reset_password')
        except User.DoesNotExist:
            messages.warning(request, 'Check your email for a reset token')

    return render(request, 'taskManager/forgot_password.html')

# A8: Cross Site Request Forgery (CSRF)
def clear_logs(request):
    """Clear admin activity logs if user has permissions

    Anonymous users are redirected to ``login``.  Users holding
    ``admin.delete_logentry`` have their own log entries removed; others
    get a warning.  Both messages are sent with ``fail_silently=True``.
    """
    if not request.user.is_authenticated():  # should be applied to anything under /console
        return redirect('login')

    allowed = request.user.has_perm('admin.delete_logentry')
    if not allowed:
        messages.warning(request, 'Unable to clear the admin activity logs.', fail_silently=True)
    else:
        own_entries = LogEntry.objects.all().filter(user__pk=request.user.id)
        own_entries.delete()
        messages.info(request, 'Successfully cleared admin activity logs.', fail_silently=True)

    return redirect('admin:index')
def flash_warnings(request, *warning_messages):
    """Queue each of ``warning_messages`` as a warning on ``request``."""
    for text in warning_messages:
        messages.warning(request, text)
def dispatch(self, request, *args, **kwargs):
    """Detach the speaker named by the ``nick`` query parameter.

    The speaker is looked up case-insensitively by nick.  If they belong
    to the submission they are removed and the action is logged; otherwise
    a warning is shown.  Always redirects to the submission's speaker
    page.
    """
    super().dispatch(request, *args, **kwargs)
    submission = self.get_object()
    wanted_nick = request.GET.get('nick')
    speaker = User.objects.get(nick__iexact=wanted_nick)

    if submission not in speaker.submissions.all():
        messages.warning(request, _('The speaker was not part of this submission.'))
    else:
        speaker.submissions.remove(submission)
        speaker.save(update_fields=['submissions'])
        submission.log_action('pretalx.submission.speakers.remove', person=request.user, orga=True)
        messages.success(request, _('The speaker has been removed from the submission.'))

    return redirect(submission.orga_urls.speakers)