我们从 Python 开源项目中提取了以下 44 个代码示例,用于说明如何使用 django.http.response.HttpResponseForbidden()。
def task_file(request, contest_id, file_id):
    """Serve a task file of a contest as an attachment.

    Answers 404 when the contest is hidden from a non-staff user or the
    file's task is not part of the contest.  Answers 403 when the contest has
    not started, the task is closed for the participant (staff bypass both
    checks), or the file is bound to a participant whose id differs from the
    requesting user's id.
    """
    contest = get_object_or_404(models.TaskBasedContest, pk=contest_id)
    if not (contest.is_visible_in_list or request.user.is_staff):
        return HttpResponseNotFound()

    task_file_obj = get_object_or_404(tasks_models.TaskFile, pk=file_id)
    if not contest.has_task(task_file_obj.task):
        return HttpResponseNotFound()

    if not (contest.is_started() or request.user.is_staff):
        return HttpResponseForbidden('Contest is not started')

    participant = contest.get_participant_for_user(request.user)
    task_is_open = is_task_open(contest, task_file_obj.task, participant)
    if not (task_is_open or request.user.is_staff):
        return HttpResponseForbidden('Task is closed')

    # Files attached to a specific participant are private to that participant.
    bound_to = task_file_obj.participant
    if bound_to is not None and bound_to.id != request.user.id:
        return HttpResponseForbidden()

    absolute_path = task_file_obj.get_path_abspath()
    return respond_as_attachment(
        request,
        absolute_path,
        task_file_obj.name,
        task_file_obj.content_type,
    )
def get(self, request, **kwargs):
    """Render the search results page for the ``q`` query parameter.

    An invalid search form is answered with a 403 response.
    """
    search_form = SearchForm(request.GET)
    if not search_form.is_valid():
        return HttpResponseForbidden()

    query = search_form.cleaned_data.get('q')
    results = SearchPaginator(
        request,
        query=query,
        per_page=20,
        page_neighbors=1,
        side_neighbors=1,
    )

    # SEO
    seo = Seo()
    seo.title = _('Search results')
    seo.save(request)

    context = {
        'form': search_form,
        'title': _('Search by «%s»') % query,
        'paginator': results,
    }
    return self.render_to_response(context)
def tutorial_message(request, pk):
    """Post a message on a tutorial and notify its speakers and attendees.

    Only staff, or users for whom ``is_attendee_or_speaker`` is true, may use
    the page; everyone else gets a 403 response.  A valid POST saves the
    message, e-mails it to the sender with every other speaker and registrant
    in BCC, and redirects to the presentation detail page.  Any other request
    (or an invalid form) renders the message form.

    A tutorial without a matching presentation yields a 404 response instead
    of an unhandled ``Presentation.DoesNotExist``.
    """
    tutorial = get_object_or_404(PyConTutorialProposal, pk=pk)
    presentation = get_object_or_404(Presentation, proposal_base=tutorial)

    if not request.user.is_staff:
        if not is_attendee_or_speaker(request.user, presentation):
            return HttpResponseForbidden(_(u"Not authorized for this page"))

    message_form = TutorialMessageForm()
    if request.method == 'POST':
        message = PyConTutorialMessage(user=request.user, tutorial=tutorial)
        message_form = TutorialMessageForm(request.POST, instance=message)
        if message_form.is_valid():
            message = message_form.save()
            context = email_context(request, tutorial, message)

            # The sender is the visible recipient; everybody else goes in BCC
            # and the sender's own address is filtered out of that list.
            sender_email = request.user.email
            speakers = [x.email for x in tutorial.speakers()
                        if x.email != sender_email]
            attendees = [x.email for x in tutorial.registrants.all()
                         if x.email != sender_email]
            recipients = speakers + attendees

            # Send new message notice to speakers/attendees
            send_email_message("message",
                               from_=settings.DEFAULT_FROM_EMAIL,
                               to=[request.user.email],
                               bcc=recipients,
                               context=context)
            messages.add_message(request, messages.INFO, _(u"Message sent"))
            url = reverse('schedule_presentation_detail',
                          args=[presentation.pk])
            return redirect(url)

    return render(request, "tutorials/message.html", {
        'presentation': presentation,
        'form': message_form
    })
def _process_individual_form(self, form_name, form_classes): forms = self.get_forms(form_classes, (form_name,)) form = forms.get(form_name) if not form: return HttpResponseForbidden() elif form.is_valid(): return self.forms_valid(forms, form_name) else: return self.forms_invalid(forms)
def dispatch(self, *args, **kwargs):
    """Answer 403 unless the current user holds ``clips.add_clip``."""
    may_add = self.request.user.has_perm('clips.add_clip')
    if may_add:
        return super(ClipCreateView, self).dispatch(*args, **kwargs)
    return HttpResponseForbidden()
def dispatch(self, *args, **kwargs):
    """Answer 403 unless the current user holds ``clips.change_clip``."""
    may_change = self.request.user.has_perm('clips.change_clip')
    if may_change:
        return super(ClipUpdateView, self).dispatch(*args, **kwargs)
    return HttpResponseForbidden()
def dispatch(self, *args, **kwargs):
    """Answer 403 unless the current user holds ``clips.delete_clip``."""
    may_delete = self.request.user.has_perm('clips.delete_clip')
    if may_delete:
        return super(ClipDeleteView, self).dispatch(*args, **kwargs)
    return HttpResponseForbidden()
def get(self, request):
    """Redirect to the CloudFront URL of ``filename`` after a signature check.

    The ``filename`` and ``signature`` query parameters are verified with
    ``check_signature`` together with the requesting user's username.  A
    missing parameter or a failed check yields a 403 response.
    """
    username = request.user.get_username()
    filename = request.GET.get('filename')
    signature = request.GET.get('signature')

    # A request lacking either parameter cannot carry a valid signature;
    # refuse it explicitly instead of failing with a key error (HTTP 500).
    if filename is None or signature is None:
        return HttpResponseForbidden()

    if not check_signature(signature, filename, username):
        return HttpResponseForbidden()

    return HttpResponseRedirect(
        redirect_to=self.storage.cloud_front_url(filename)
    )
def post(self, request, *args, **kwargs):
    """Store a ``DataPoint`` posted by a client that knows the shared secret.

    Returns 403 when the ``HTTP_AUTH_SECRET`` header is absent or wrong, 400
    when the posted data does not validate, and an empty 200 response once
    the data point has been saved.
    """
    import hmac

    # NOTE(review): the shared secret is hard-coded in source; it should be
    # loaded from configuration. Left in place to keep existing clients working.
    expected_secret = 'supersecretkey'
    provided_secret = request.META.get('HTTP_AUTH_SECRET') or ''

    # Check if the secret key matches. compare_digest avoids leaking, through
    # response timing, how many leading characters of a guess were correct.
    secrets_match = hmac.compare_digest(
        provided_secret.encode('utf-8'),
        expected_secret.encode('utf-8'),
    )
    if not secrets_match:
        return HttpResponseForbidden('Auth key incorrect')

    form_class = modelform_factory(
        DataPoint, fields=['node_name', 'data_type', 'data_value'])
    form = form_class(request.POST)
    if form.is_valid():
        form.save()
        return HttpResponse()
    return HttpResponseBadRequest()
def dispatch(self, request, *args, **kwargs):
    """Refuse the request with 403 when the user already owns a blog."""
    owns_a_blog = Blog.objects.filter(owner=request.user).exists()
    if not owns_a_blog:
        return super(NewBlogView, self).dispatch(request, *args, **kwargs)
    return HttpResponseForbidden('You can not create more than one blogs per account')
def get(self, request, post_pk, blog_pk):
    """Share a blog post with another blog, then redirect to ``home``.

    Only the owner of the post's blog may share it (403 otherwise).  Unknown
    post or blog ids yield a 404 response instead of an unhandled
    ``DoesNotExist``.
    """
    # Imported locally so this view does not rely on the module's import list.
    from django.shortcuts import get_object_or_404

    blog_post = get_object_or_404(BlogPost, pk=post_pk)
    if blog_post.blog.owner != request.user:
        return HttpResponseForbidden('You can only share posts that you created')

    blog = get_object_or_404(Blog, pk=blog_pk)
    # NOTE(review): this modifies data in response to a GET request.
    blog_post.shared_to.add(blog)
    return HttpResponseRedirect(reverse('home'))
def get(self, request, post_pk, blog_pk):
    """Stop sharing a blog post with another blog, then redirect to ``home``.

    Only the owner of the post's blog may do this (403 otherwise).  Unknown
    post or blog ids yield a 404 response instead of an unhandled
    ``DoesNotExist``.
    """
    # Imported locally so this view does not rely on the module's import list.
    from django.shortcuts import get_object_or_404

    blog_post = get_object_or_404(BlogPost, pk=post_pk)
    if blog_post.blog.owner != request.user:
        return HttpResponseForbidden('You can only stop sharing posts that you created')

    blog = get_object_or_404(Blog, pk=blog_pk)
    # NOTE(review): this modifies data in response to a GET request.
    blog_post.shared_to.remove(blog)
    return HttpResponseRedirect(reverse('home'))
def dispatch(self, request, *args, **kwargs):
    """Let the request through only when the user is the object's author."""
    author = self.get_object().author
    if author != self.request.user:
        # TODO: add a message here (see DeleteView)
        # TODO: redirect instead of answering with HttpResponseForbidden
        # NOTE: this mixin only works with objects that expose ``author``
        return HttpResponseForbidden()

    parent = super(UserRequiredMixin, self)
    return parent.dispatch(request, *args, **kwargs)
def detail(request, pk):
    """Show a mail, ask for its secret code, or refuse access.

    ``mail.can_read(request)`` decides the outcome: ``(True, None)`` marks the
    mail as read and renders it; a refusal whose only reason is the secret
    code renders the secret-code form; anything else is a 403 response.
    """
    mail = get_object_or_404(Mail, pk=pk)
    verdict = mail.can_read(request)

    if verdict == (True, None):
        mail.read()
        template_name = 'web/detail.html'
    elif verdict == (False, {CannotReadReasons.secret_code}):
        template_name = 'web/secretcode_form.html'
    else:
        return HttpResponseForbidden()

    return render(request, template_name, {
        'mail': mail,
        'recipient': mail.recipient
    })
def dispatch(self, request, *args, **kwargs):
    '''
    Reject anonymous users and users whose gym is not the ``gym_pk`` of the URL
    '''
    user = request.user
    denied = not user.is_authenticated()
    if not denied:
        denied = user.userprofile.gym_id != int(self.kwargs['gym_pk'])

    if denied:
        return HttpResponseForbidden()
    return super(AddView, self).dispatch(request, *args, **kwargs)
def dispatch(self, request, *args, **kwargs):
    '''
    Reject anonymous users and users whose gym differs from the edited
    object's gym
    '''
    user = request.user
    denied = not user.is_authenticated()
    if not denied:
        edited = self.get_object()
        denied = user.userprofile.gym_id != edited.gym_id

    if denied:
        return HttpResponseForbidden()
    return super(UpdateView, self).dispatch(request, *args, **kwargs)
def dispatch(self, request, *args, **kwargs):
    '''
    Reject anonymous users and users whose gym differs from the gym of the
    object about to be deleted
    '''
    user = request.user
    denied = not user.is_authenticated()
    if not denied:
        doomed = self.get_object()
        denied = user.userprofile.gym_id != doomed.gym_id

    if denied:
        return HttpResponseForbidden()
    return super(DeleteView, self).dispatch(request, *args, **kwargs)
def dispatch(self, request, *args, **kwargs):
    '''
    Reject anonymous users and users who do not belong to the listed gym.

    The gym named by ``gym_pk`` is stored on ``self.gym`` (404 if unknown).
    '''
    user = request.user
    if not user.is_authenticated():
        return HttpResponseForbidden()

    listed_gym = get_object_or_404(Gym, id=self.kwargs['gym_pk'])
    self.gym = listed_gym
    if user.userprofile.gym_id != self.gym.id:
        return HttpResponseForbidden()

    parent = super(ListView, self)
    return parent.dispatch(request, *args, **kwargs)
def dispatch(self, request, *args, **kwargs):
    '''
    Reject anonymous users and users whose gym differs from the gym of the
    object handled by this view
    '''
    requester = request.user
    if not requester.is_authenticated():
        return HttpResponseForbidden()

    target = self.get_object()
    wrong_gym = requester.userprofile.gym_id != target.gym_id
    if wrong_gym:
        return HttpResponseForbidden()

    return super(DeleteView, self).dispatch(request, *args, **kwargs)
def dispatch(self, request, *args, **kwargs):
    '''
    Reject anonymous users and requests that target a member of another gym.

    The user named by ``user_pk`` is stored on ``self.member`` (404 if unknown).
    '''
    if not request.user.is_authenticated():
        return HttpResponseForbidden()

    member = get_object_or_404(User, pk=self.kwargs['user_pk'])
    self.member = member
    different_gym = member.userprofile.gym_id != request.user.userprofile.gym_id
    if different_gym:
        return HttpResponseForbidden()

    return super(AddView, self).dispatch(request, *args, **kwargs)
def dispatch(self, request, *args, **kwargs):
    '''
    Reject anonymous users and users whose gym differs from the gym of the
    contract's member
    '''
    if not request.user.is_authenticated():
        return HttpResponseForbidden()

    shown = self.get_object()
    different_gym = shown.member.userprofile.gym_id != request.user.userprofile.gym_id
    if different_gym:
        return HttpResponseForbidden()

    parent = super(DetailView, self)
    return parent.dispatch(request, *args, **kwargs)
def dispatch(self, request, *args, **kwargs):
    '''
    Reject anonymous users and users whose gym differs from the gym of the
    edited object's member
    '''
    if not request.user.is_authenticated():
        return HttpResponseForbidden()

    edited = self.get_object()
    different_gym = edited.member.userprofile.gym_id != request.user.userprofile.gym_id
    if different_gym:
        return HttpResponseForbidden()

    parent = super(UpdateView, self)
    return parent.dispatch(request, *args, **kwargs)
def dispatch(self, request, *args, **kwargs):
    '''
    Users holding ``gym.change_gymconfig`` may only touch the configuration
    of their own gym
    '''
    user = request.user
    # NOTE(review): users without the permission are not rejected here; they
    # fall through to the parent dispatch - confirm it enforces the permission.
    if user.has_perm('gym.change_gymconfig') \
            and user.userprofile.gym_id != int(self.kwargs['pk']):
        return HttpResponseForbidden()

    return super(GymConfigUpdateView, self).dispatch(request, *args, **kwargs)
def dispatch(self, request, *args, **kwargs):
    '''
    Reject anonymous users and requests that target a member of another gym.

    The user named by ``user_pk`` is stored on ``self.member``.  An unknown
    ``user_pk`` yields a 404 response instead of an unhandled
    ``User.DoesNotExist``.
    '''
    # Imported locally so this view does not rely on the module's import list.
    from django.shortcuts import get_object_or_404

    if not request.user.is_authenticated():
        return HttpResponseForbidden()

    user = get_object_or_404(User, pk=self.kwargs['user_pk'])
    self.member = user
    if user.userprofile.gym_id != request.user.userprofile.gym_id:
        return HttpResponseForbidden()

    return super(ListView, self).dispatch(request, *args, **kwargs)
def dispatch(self, request, *args, **kwargs):
    '''
    Reject anonymous users and requests that target a member of another gym.

    The user named by ``user_pk`` is stored on ``self.member``.  An unknown
    ``user_pk`` yields a 404 response instead of an unhandled
    ``User.DoesNotExist``.
    '''
    # Imported locally so this view does not rely on the module's import list.
    from django.shortcuts import get_object_or_404

    if not request.user.is_authenticated():
        return HttpResponseForbidden()

    user = get_object_or_404(User, pk=self.kwargs['user_pk'])
    self.member = user
    gym_id = user.userprofile.gym_id
    if gym_id != request.user.userprofile.gym_id:
        return HttpResponseForbidden()

    return super(AddView, self).dispatch(request, *args, **kwargs)
def dispatch(self, request, *args, **kwargs):
    '''
    Reject anonymous users and users whose gym differs from the gym of the
    edited note's member
    '''
    if not request.user.is_authenticated():
        return HttpResponseForbidden()

    edited_note = self.get_object()
    member_gym_id = edited_note.member.userprofile.gym_id
    if member_gym_id != request.user.userprofile.gym_id:
        return HttpResponseForbidden()

    parent = super(UpdateView, self)
    return parent.dispatch(request, *args, **kwargs)
def dispatch(self, request, *args, **kwargs):
    '''
    Reject anonymous users and users whose gym differs from the gym of the
    member whose note is being deleted
    '''
    if not request.user.is_authenticated():
        return HttpResponseForbidden()

    doomed_note = self.get_object()
    member_gym_id = doomed_note.member.userprofile.gym_id
    if member_gym_id != request.user.userprofile.gym_id:
        return HttpResponseForbidden()

    parent = super(DeleteView, self)
    return parent.dispatch(request, *args, **kwargs)
def dispatch(self, request, *args, **kwargs):
    '''
    Reject anonymous users and requests that target a member of another gym.

    The user named by ``user_pk`` is stored on ``self.member``.  An unknown
    ``user_pk`` yields a 404 response instead of an unhandled
    ``User.DoesNotExist``.
    '''
    # Imported locally so this view does not rely on the module's import list.
    from django.shortcuts import get_object_or_404

    if not request.user.is_authenticated():
        return HttpResponseForbidden()

    user = get_object_or_404(User, pk=self.kwargs['user_pk'])
    self.member = user
    if user.userprofile.gym_id != request.user.userprofile.gym_id:
        return HttpResponseForbidden()

    return super(AddView, self).dispatch(request, *args, **kwargs)
def dispatch(self, request, *args, **kwargs):
    '''
    Reject anonymous users and users whose gym differs from the gym of the
    member owning the object being deleted
    '''
    if not request.user.is_authenticated():
        return HttpResponseForbidden()

    doomed = self.get_object()
    member_gym_id = doomed.member.userprofile.gym_id
    if member_gym_id != request.user.userprofile.gym_id:
        return HttpResponseForbidden()

    parent = super(DeleteView, self)
    return parent.dispatch(request, *args, **kwargs)
def _process_individual_form(self, form_name, form_classes): """ Perform the is_valid() check for a single form. """ forms = self.get_forms(form_classes, (form_name,)) form = forms.get(form_name) if not form: # there is no form with this name registered in the form_classes dictionary return HttpResponseForbidden() elif form.is_valid(): # the form is valid -> call the from valid method return self.forms_valid(forms, form_name) else: # call the form invalid method return self.forms_invalid(forms, form_name)
def tutorial_email(request, pk, pks):
    """E-mail a selection of users about a tutorial presentation.

    ``pks`` is a comma-separated list of user primary keys.  Non-staff users
    must pass ``is_attendee_or_speaker`` or they get a 403 response.  A valid
    POST sends one message with all recipients in BCC and redirects to the
    presentation detail page; otherwise the e-mail form is rendered.
    """
    presentation = get_object_or_404(Presentation, pk=pk)

    if not request.user.is_staff and not is_attendee_or_speaker(request.user, presentation):
        return HttpResponseForbidden(_(u"Not authorized for this page"))

    pks = pks.split(",")
    recipients = get_user_model().objects.filter(pk__in=pks)
    emails = recipients.values_list('email', flat=True)

    from_speaker = (
        hasattr(request.user, 'speaker_profile')
        and request.user.speaker_profile in presentation.speakers()
    )

    form = BulkEmailForm()
    if request.method == 'POST':
        form = BulkEmailForm(request.POST)
        if form.is_valid():
            cleaned = form.cleaned_data
            subject = cleaned['subject']
            body = cleaned['body']
            context = email_context(
                request, presentation.proposal, body, subject=subject)

            try:
                # Everyone is addressed through BCC; replies go to the sender.
                send_email_message(
                    "direct_email",
                    from_=settings.DEFAULT_FROM_EMAIL,
                    to=[],
                    bcc=emails,
                    context=context,
                    headers={'Reply-To': request.user.email},
                )
            except SMTPException:
                log.exception("ERROR sending Tutorial emails")
                messages.add_message(request, messages.ERROR,
                                     _(u"There was some error sending emails, "
                                       u"not all of them might have made it"))
            else:
                messages.add_message(request, messages.INFO,
                                     _(u"Email(s) sent"))

            return redirect(reverse(
                'schedule_presentation_detail',
                args=[presentation.pk]
            ))

    return render(request, "tutorials/email.html", {
        'presentation': presentation,
        'form': form,
        'users': recipients,
        'from_speaker': from_speaker
    })
def finaid_review(request, pks=None): """Starting view for reviewers - list the applications""" # On a POST the pks are in the form. # On a GET there might be pks in the URL. if not is_reviewer(request.user): return HttpResponseForbidden(_(u"Not authorized for this page")) if request.method == 'POST': # They want to do something to bulk applicants # Find the checkboxes they checked regex = re.compile(r'^finaid_application_(.*)$') pk_list = [] for field_name in request.POST: m = regex.match(field_name) if m: pk_list.append(m.group(1)) if not pk_list: messages.add_message( request, messages.ERROR, _(u"Please select at least one application")) return redirect(request.path) if 'email_action' in request.POST: # They want to email applicants pks = ",".join(pk_list) return redirect('finaid_email', pks=pks) elif 'message_action' in request.POST: # They want to attach a message to applications pks = ",".join(pk_list) return redirect('finaid_message', pks=pks) elif 'status_action' in request.POST: # They want to change applications' statuses applications = FinancialAidApplication.objects.filter(pk__in=pk_list)\ .select_related('review') status = int(request.POST['status']) count = 0 for application in applications: try: review = application.review except FinancialAidReviewData.DoesNotExist: review = FinancialAidReviewData(application=application) if review.status != status: review.status = status review.save() count += 1 messages.info(request, "Updated %d application status%s" % (count, "" if count == 1 else "es")) pks = ",".join(pk_list) return redirect(reverse('finaid_review', kwargs=dict(pks=pks))) else: messages.error(request, "WHAT?") else: # GET - pks are in the URL. maybe. pk_list = pks.split(",") if pks else [] return render(request, "finaid/application_list.html", { "applications": FinancialAidApplication.objects.all().select_related('review'), "status_options": STATUS_CHOICES, "pks": [int(pk) for pk in pk_list], })
def finaid_message(request, pks):
    """Add a message to some applications

    ``pks`` is a comma-separated string of application primary keys.
    Non-reviewers get a 403 response; an empty selection redirects back to
    ``finaid_review`` with an error message.  On POST one message is created
    per application from the same posted form data and e-mail notices are
    sent; on other methods the empty message form is rendered.
    """
    if not is_reviewer(request.user):
        return HttpResponseForbidden(_(u"Not authorized for this page"))

    applications = FinancialAidApplication.objects.filter(pk__in=pks.split(","))\
        .select_related('user')
    if not applications.exists():
        messages.add_message(request, messages.ERROR, _(u"No applications selected"))
        return redirect('finaid_review')

    if request.method == 'POST':
        for application in applications:
            message = FinancialAidMessage(user=request.user, application=application)
            message_form = ReviewerMessageForm(request.POST, instance=message)
            if message_form.is_valid():
                message = message_form.save()
                # Send notice to reviewers/pycon-aid alias, and the applicant if visible
                context = email_context(request, application, message)
                send_email_message("reviewer/message",
                                   # From whoever is logged in clicking the buttons
                                   from_=request.user.email,
                                   to=[email_address()],
                                   context=context,
                                   headers={'Reply-To': email_address()}
                                   )
                # If visible to applicant, notify them as well
                if message.visible:
                    send_email_message("applicant/message",
                                       from_=request.user.email,
                                       to=[application.user.email],
                                       context=context,
                                       headers={'Reply-To': email_address()}
                                       )
        # NOTE(review): nesting reconstructed from a collapsed source line; as
        # laid out here the success notice and redirect happen even when the
        # posted form was invalid - confirm against the original file.
        messages.add_message(request, messages.INFO, _(u"Messages sent"))
        return redirect(reverse('finaid_review', kwargs=dict(pks=pks)))
    else:
        message_form = ReviewerMessageForm()

    return render(request, "finaid/reviewer_message.html", {
        'applications': applications,
        'form': message_form,
    })
def finaid_email(request, pks):
    """Send a templated bulk e-mail to the applicants of some applications.

    ``pks`` is a comma-separated string of application primary keys.
    Non-reviewers get a 403 response.  A valid POST renders the chosen
    template once per application (with ``application`` and ``review`` in the
    context), sends the messages with ``send_mass_mail`` and redirects to
    ``finaid_review``.  Otherwise the e-mail form is rendered.
    """
    if not is_reviewer(request.user):
        return HttpResponseForbidden(_(u"Not authorized for this page"))

    applications = FinancialAidApplication.objects.filter(pk__in=pks.split(","))\
        .select_related('user')

    form = None
    if request.method == 'POST':
        form = BulkEmailForm(request.POST)
        if form.is_valid():
            subject = form.cleaned_data['subject']
            from_email = email_address()
            template_text = form.cleaned_data['template'].template
            template = Template(template_text)

            # One (subject, body, sender, recipients) tuple per application:
            # the datatuple argument passed to send_mass_mail.
            emails = []
            for application in applications:
                try:
                    review = application.review
                except FinancialAidReviewData.DoesNotExist:
                    review = None

                ctx = {
                    'application': application,
                    'review': review,
                }
                text = template.render(Context(ctx))
                emails.append((subject, text, from_email,
                               [application.user.email]))

            try:
                send_mass_mail(emails)
            except SMTPException:
                log.exception("ERROR sending financial aid emails")
                messages.add_message(request, messages.ERROR,
                                     _(u"There was some error sending emails, "
                                       u"not all of them might have made it"))
            else:
                messages.add_message(request, messages.INFO, _(u"Emails sent"))
            return redirect(reverse('finaid_review', kwargs=dict(pks=pks)))

    ctx = {
        'form': form or BulkEmailForm(),
        'users': [app.user for app in applications]
    }
    return render(request, "finaid/email.html", ctx)
def get_blocks(request):
    """Return rendered attachable blocks as JSON, keyed by block id.

    Only AJAX requests are served (403 otherwise).  ``block_ids`` is a
    comma-separated list of block ids.  ``cid`` and ``oid`` optionally name a
    content type and object id; the resolved object is passed to every block
    view as ``instance``.  When they are missing or non-numeric, ``instance``
    is ``None``; when they are numeric but cannot be resolved, an empty JSON
    object is returned.  Invalid, unknown and invisible blocks are skipped.
    """
    if not request.is_ajax():
        return HttpResponseForbidden()

    block_ids = request.GET.get('block_ids')
    if not block_ids:
        return JsonResponse({})

    try:
        cid = int(request.GET.get('cid'))
        oid = int(request.GET.get('oid'))
    except (TypeError, ValueError):
        instance = None
    else:
        try:
            ct = ContentType.objects.get(pk=cid)
        except ContentType.DoesNotExist:
            return JsonResponse({})

        ct_model = ct.model_class()
        if ct_model is None:
            # Stale content type whose model no longer exists.
            return JsonResponse({})

        try:
            instance = ct_model.objects.get(pk=oid)
        except ct_model.DoesNotExist:
            return JsonResponse({})

    result = {}
    for block_id in block_ids.split(','):
        try:
            block_id = int(block_id)
        except (TypeError, ValueError):
            continue

        block_ct_id = AttachableBlock.objects.filter(pk=block_id).values_list(
            'content_type', flat=True).first()
        if block_ct_id is None:
            # No block with this id: skip it like any other invalid id.
            continue

        block_model = get_model_by_ct(block_ct_id)
        block = block_model.objects.get(pk=block_id)
        if not block.visible:
            continue

        block_view = get_block_view(block)
        if not block_view:
            continue

        result[block_id] = block_view(RequestContext(request, {
            'request': request,
        }), block, instance=instance)

    return JsonResponse(result)
def post(self, request, *args, **kwargs):
    """
    Process an incoming XML-RPC or JSON-RPC call.

    Each handler class is tried in turn; the first one able to handle the
    request produces the response.

    :param request: Incoming request
    :param args: Additional arguments
    :param kwargs: Additional named arguments
    :return: A HttpResponse containing XML-RPC or JSON-RPC response, depending on the incoming request
    """
    logger.debug('RPC request received...')

    for handler_cls in self.get_handler_classes():
        handler = handler_cls(request, self.entry_point)

        try:
            if handler.can_handle():
                logger.debug('Request will be handled by {}'.format(handler_cls.__name__))
                return handler.result_success(handler.process_request())

        except AuthenticationFailed as exc:
            # Authentication failures are answered with a 403-type response
            logger.warning(exc)
            return handler.result_error(exc, HttpResponseForbidden)

        except RPCException as exc:
            logger.warning('RPC exception: {}'.format(exc), exc_info=settings.MODERNRPC_LOG_EXCEPTIONS)
            return handler.result_error(exc)

        except Exception as exc:
            logger.error('Exception raised from a RPC method: "{}"'.format(exc),
                         exc_info=settings.MODERNRPC_LOG_EXCEPTIONS)
            return handler.result_error(RPCInternalError(str(exc)))

    # No handler accepted the request.
    logger.error('Unable to handle incoming request.')
    fallback_text = ('Unable to handle your request. Please ensure you called the right entry point. If not, '
                     'this could be a server error.')
    return HttpResponse(fallback_text)
def post(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse:
    """Exchange a refresh token for a new access/refresh token pair.

    Responses:
      * 400 - the POST is not a ``refresh_token`` grant, or carries no scope
      * 404 - the registry named by ``service`` is unknown, or the repository
        is unknown and unconfigured repositories are not allowed
      * 401 - no user could be derived from the refresh token
      * 403 - the user has no access to the repository
      * 200 - JSON body with ``access_token``, ``scope``, ``expires_in`` and
        ``refresh_token``
    """
    # POST received:
    # <QueryDict: {
    #     'client_id': ['docker'],
    #     'refresh_token': ['boink'],
    #     'service': ['registry.maurusnet.test'],
    #     'scope': ['repository:dev/destalinator:push,pull'],
    #     'grant_type': ['refresh_token']}>
    # .get() keeps a request without grant_type on the 400 path instead of
    # raising a key error.
    if "refresh_token" not in request.POST or request.POST.get("grant_type") != "refresh_token":
        return HttpResponseBadRequest("POSTing to this endpoint requires a refresh_token")

    tr = _tkr_parse(request.POST)
    if not tr.scope:
        # The scope must be %-formatted into the message; passed as a second
        # positional argument it would be taken as the response content type.
        return HttpResponseBadRequest(
            "Can't issue access token without valid scope (scope=%s)" % (tr.scope,)
        )
    tp = TokenPermissions.parse_scope(tr.scope)

    try:
        client = DockerRegistry.objects.get(client_id=tr.service)  # type: DockerRegistry
    except DockerRegistry.DoesNotExist:
        return HttpResponseNotFound("No such registry/client from refresh token(%s)" % str(tr))

    user = self._user_from_refresh_token(request.POST["refresh_token"], client.public_key_pem(),
                                         expected_issuer=request.get_host(),
                                         expected_audience=tr.service)
    if not user:
        return HttpResponse("Unauthorized", status=401)

    try:
        drepo = DockerRepo.objects.get(name=tp.path, registry_id=client.id)
    except DockerRepo.DoesNotExist:
        if not settings.DOCKERAUTH_ALLOW_UNCONFIGURED_REPOS:
            return HttpResponseNotFound("No such repo '%s'" % tp.path)
        # Unsaved, fully open placeholder for a repository that is not configured.
        drepo = DockerRepo()
        drepo.name = tp.path
        drepo.registry = client
        drepo.unauthenticated_read = True
        drepo.unauthenticated_write = True

    if not (drepo.registry.has_access(user, tp) or drepo.has_access(user, tp)):
        return HttpResponseForbidden("User %s doesn't have access to repo %s" % (user.pk, tp.path))

    rightnow = datetime.datetime.now(tz=pytz.UTC)
    return HttpResponse(content=json.dumps({
        "access_token": self._create_jwt(
            self._make_access_token(request, tr, rightnow, tp, user),
            client.private_key_pem(),
        ),
        "scope": tr.scope,
        "expires_in": 119,
        "refresh_token": self._create_jwt(
            self._make_refresh_token(request, tr, rightnow, user),
            client.private_key_pem(),
        )
    }), status=200, content_type="application/json")