我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用django.http.Http404()。
def flatpage(request, url):
    """
    Public entry point for rendering a flat page.

    Models: `flatpages.flatpages`
    Templates: Uses the template defined by the ``template_name`` field,
        or :template:`flatpages/default.html` if template_name is not defined.
    Context:
        flatpage
            `flatpages.flatpages` object
    """
    # Flat page URLs are looked up with a leading slash.
    page_url = url if url.startswith('/') else '/' + url
    current_site_id = get_current_site(request).id
    try:
        flat_page = get_object_or_404(FlatPage, url=page_url, sites=current_site_id)
    except Http404:
        slash_missing = not page_url.endswith('/')
        if not (slash_missing and settings.APPEND_SLASH):
            raise
        # Redirect only if the slash-terminated page exists; this lookup
        # raises Http404 itself otherwise.
        get_object_or_404(FlatPage, url=page_url + '/', sites=current_site_id)
        return HttpResponsePermanentRedirect('%s/' % request.path)
    return render_flatpage(request, flat_page)
def private_document(request, document_key):
    """
    Serve the private file attached to the ``documents`` row ``document_key``.

    The file location is built from ``settings.PRIVATE_MEDIA_ROOT`` and the
    stored document name, then handed to ``server.serve``.

    Raises:
        Http404: if no ``documents`` row has primary key ``document_key``.

    NOTE(review): the read-permission check below is still disabled, so this
    view performs no access control of its own.
    """
    # Imported locally so this block does not rely on a module-level import.
    from django.http import Http404

    try:
        document_results = documents.objects.get(pk=document_key)
    except documents.DoesNotExist:
        # An unknown key used to escape as an unhandled exception (HTTP 500).
        raise Http404('File not found')

    path = settings.PRIVATE_MEDIA_ROOT + '/' + document_results.document.name

    # TODO: re-enable the permission check before serving:
    # if not permissions.has_read_permission(request, path):
    #     if settings.DEBUG:
    #         raise PermissionDenied
    #     else:
    #         raise Http404('File not found')
    return server.serve(request, path=path)
def serve(self, request, path):
    """
    Return the file at ``path`` under ``settings.PRIVATE_MEDIA_ROOT``.

    Answers with HttpResponseNotModified when the request's
    If-Modified-Since header shows the client copy is current; otherwise
    returns the file contents with a guessed content type and a
    Last-Modified header.

    Raises:
        Http404: if the resolved path does not exist.
    """
    # the following code is largely borrowed from `django.views.static.serve`
    # and django-filetransfers: filetransfers.backends.default
    # NOTE(review): os.path.join() does not confine ``path`` to the media
    # root (an absolute path or "../" segments escape it) -- confirm that
    # callers only pass trusted paths.
    fullpath = os.path.join(settings.PRIVATE_MEDIA_ROOT, path)
    if not os.path.exists(fullpath):
        raise Http404('"{0}" does not exist'.format(fullpath))
    # Respect the If-Modified-Since header.
    statobj = os.stat(fullpath)
    content_type = mimetypes.guess_type(fullpath)[0] or 'application/octet-stream'
    if not was_modified_since(request.META.get('HTTP_IF_MODIFIED_SINCE'),
                              statobj[stat.ST_MTIME], statobj[stat.ST_SIZE]):
        return HttpResponseNotModified(content_type=content_type)
    # Close the handle deterministically; the bare open(...).read() left it
    # open until garbage collection.
    with open(fullpath, 'rb') as private_file:
        response = HttpResponse(private_file.read(), content_type=content_type)
    response["Last-Modified"] = http_date(statobj[stat.ST_MTIME])
    # filename = os.path.basename(path)
    # response['Content-Disposition'] = smart_str(u'attachment; filename={0}'.format(filename))
    return response
def find_random_image(request):
    """
    Pick one active kiosk item at random and describe it as JSON.

    The response body is a JSON object holding the item's ``id`` and the
    ``url`` of its image. Raises Http404 when no active item exists.
    """
    random_item = KioskItem.objects.filter(active=True).order_by('?').first()
    if random_item is None:
        raise Http404("No active kiosk items found")

    payload = json.dumps({
        "id": random_item.id,
        "url": random_item.image.url,
    })
    return HttpResponse(payload, content_type="application/json")
def page(request, path):
    """Look up the Page stored for ``path`` and let its processor answer."""
    # Lookups use a leading slash.
    if not path.startswith("/"):
        path = "/" + path

    # A final segment without a dot is treated as a page rather than a file
    # name, so the client is sent to the slash-terminated equivalent.
    final_segment = path.split('/')[-1]
    if not path.endswith("/") and '.' not in final_segment:
        return http.HttpResponsePermanentRedirect(path + "/")

    candidates = Page.objects.all().filter(url=path)
    try:
        page_obj = candidates[0]
    except IndexError:
        raise http.Http404
    return page_obj.get_page_processor().process_request(request)
def render(self, context):
    """
    Paginate the resolved queryset variable and publish the page in ``context``.

    On success the page's object list is stored under ``self.context_var``
    (or, when that is None, under the queryset variable's own name) together
    with ``paginator`` and ``page_obj``. On an invalid page either Http404 is
    raised (when INVALID_PAGE_RAISES_404 is set) or an empty list and an
    ``invalid_page`` flag are stored instead. Always returns ''.
    """
    var_name = self.queryset_var.var
    objects = self.queryset_var.resolve(context)

    per_page = self.paginate_by
    if not isinstance(per_page, int):
        # Not a literal number: resolve it against the context.
        per_page = per_page.resolve(context)

    paginator = Paginator(objects, per_page, self.orphans)
    try:
        page_obj = paginator.page(context['request'].page)
    except InvalidPage:
        if INVALID_PAGE_RAISES_404:
            raise Http404('Invalid page requested. If DEBUG were set to ' +
                          'False, an HTTP 404 page would have been shown instead.')
        context[var_name] = []
        context['invalid_page'] = True
        return ''

    target_name = var_name if self.context_var is None else self.context_var
    context[target_name] = page_obj.object_list
    context['paginator'] = paginator
    context['page_obj'] = page_obj
    return ''
def author(self, request, author=None):
    """Render the post listing limited to posts owned by ``author``."""
    if not author:
        # Invalid author filter
        raise Http404('Invalid Author')

    # Match the username as given or in its unslugified form.
    owner_matches = (models.Q(owner__username=author) |
                     models.Q(owner__username=unslugify(author)))
    author_posts = self.posts.filter(owner_matches)

    template = self.get_template(request)
    context = {
        'self': self,
        'posts': self._paginate(request, author_posts),
        'filter_type': 'author',
        'filter': author,
    }
    return render(request, template, context)
def tag(self, request, tag=None):
    """Render the post listing limited to posts carrying ``tag``."""
    if not tag:
        # Invalid tag filter
        raise Http404('Invalid Tag')

    # Match the tag name as given or in its unslugified form.
    tag_matches = (models.Q(tags__name=tag) |
                   models.Q(tags__name=unslugify(tag)))
    tagged_posts = self.posts.filter(tag_matches)

    template = self.get_template(request)
    context = {
        'self': self,
        'posts': self._paginate(request, tagged_posts),
        'filter_type': 'tag',
        'filter': tag,
    }
    return render(request, template, context)
def serve(self, request, file_obj, **kwargs):
    """
    Return the contents of ``file_obj.path`` as an HTTP response.

    Answers with HttpResponseNotModified when the request's
    If-Modified-Since header shows the client copy is current. Otherwise the
    file is read into an HttpResponse, a Last-Modified header is set and
    ``self.default_headers`` is given the chance to add more headers
    (``kwargs`` are forwarded to it).

    Raises:
        Http404: if ``file_obj.path`` does not exist.
    """
    fullpath = file_obj.path
    # the following code is largely borrowed from `django.views.static.serve`
    # and django-filetransfers: filetransfers.backends.default
    if not os.path.exists(fullpath):
        raise Http404('"%s" does not exist' % fullpath)
    # Respect the If-Modified-Since header.
    statobj = os.stat(fullpath)
    # The response keyword for the content type depends on the Django version.
    content_type_key = 'mimetype' if LTE_DJANGO_1_4 else 'content_type'
    response_params = {content_type_key: self.get_mimetype(fullpath)}
    if not was_modified_since(request.META.get('HTTP_IF_MODIFIED_SINCE'),
                              statobj[stat.ST_MTIME], statobj[stat.ST_SIZE]):
        return HttpResponseNotModified(**response_params)
    # Close the handle deterministically; the bare open(...).read() left it
    # open until garbage collection.
    with open(fullpath, 'rb') as served_file:
        response = HttpResponse(served_file.read(), **response_params)
    response["Last-Modified"] = http_date(statobj[stat.ST_MTIME])
    self.default_headers(request=request, response=response, file_obj=file_obj, **kwargs)
    return response
def serve_protected_thumbnail(request, path):
    """
    Serve a thumbnail of a non-public file to a user allowed to read it.

    Every failure is reported as Http404('File not found'), except that a
    missing read permission raises PermissionDenied while settings.DEBUG is
    on.
    """
    original_name = thumbnail_to_original_filename(path)
    if not original_name:
        raise Http404('File not found')

    try:
        file_obj = File.objects.get(file=original_name, is_public=False)
    except File.DoesNotExist:
        raise Http404('File not found')

    readable = file_obj.has_read_permission(request)
    if not readable and settings.DEBUG:
        raise PermissionDenied
    if not readable:
        raise Http404('File not found')

    try:
        thumbnail = ThumbnailFile(name=path, storage=file_obj.file.thumbnail_storage)
        return thumbnail_server.serve(request, thumbnail, save_as=False)
    except Exception:
        # Any problem while building or serving the thumbnail is hidden
        # behind a plain 404.
        raise Http404('File not found')
def get_context_data(self, **kwargs):
    """
    Add the documentation parsed from a view's docstring to the context.

    The dotted view name comes from the URL kwargs; Http404 is raised when
    the resolver does not know it as a callback. The context gains ``name``,
    ``summary``, ``body`` and ``meta``.
    """
    view = self.kwargs['view']
    resolver = urlresolvers.get_resolver(urlresolvers.get_urlconf())
    if not resolver._is_callback(view):
        raise Http404

    mod, func = urlresolvers.get_mod_func(view)
    view_func = getattr(import_module(mod), func)

    title, body, metadata = utils.parse_docstring(view_func.__doc__)
    rst_reference = _('view:') + view
    if title:
        title = utils.parse_rst(title, 'view', rst_reference)
    if body:
        body = utils.parse_rst(body, 'view', rst_reference)
    for key, raw_value in list(metadata.items()):
        metadata[key] = utils.parse_rst(raw_value, 'model', rst_reference)

    kwargs.update(name=view, summary=title, body=body, meta=metadata)
    return super(ViewDetailView, self).get_context_data(**kwargs)
def serve(request, path, insecure=False, **kwargs):
    """
    Serve a static file located through the staticfiles finders.

    Hook it into a URLconf with a pattern such as::

        from django.contrib.staticfiles import views

        url(r'^(?P<path>.*)$', views.serve)

    The file itself is delivered by the django.views.static.serve() view.
    Unless ``settings.DEBUG`` is on or ``insecure`` is passed, every request
    ends in Http404.
    """
    if not (settings.DEBUG or insecure):
        raise Http404

    lookup_path = posixpath.normpath(unquote(path)).lstrip('/')
    located = finders.find(lookup_path)
    if located:
        document_root, path = os.path.split(located)
        return static.serve(request, path, document_root=document_root, **kwargs)

    if path.endswith('/') or path == '':
        raise Http404("Directory indexes are not allowed here.")
    raise Http404("'%s' could not be found" % path)
def feed(request, url, feed_dict=None):
    """
    Backwards-compatible entry point that dispatches to a registered feed.

    The text before the first '/' in ``url`` is the slug used to pick the
    feed class from ``feed_dict``. Raises Http404 when no feeds are
    registered or the slug is unknown.
    """
    if not feed_dict:
        raise Http404(_("No feeds are registered."))

    slug = url.split('/', 1)[0]
    try:
        feed_class = feed_dict[slug]
    except KeyError:
        raise Http404(_("Slug %r isn't registered.") % slug)

    feed_instance = feed_class()
    # Fill in defaults for anything the feed class leaves unset.
    feed_instance.feed_url = getattr(feed_class, 'feed_url', None) or request.path
    feed_instance.title_template = (
        feed_class.title_template or ('feeds/%s_title.html' % slug))
    feed_instance.description_template = (
        feed_class.description_template or ('feeds/%s_description.html' % slug))
    return feed_instance(request)
def paginate_queryset(self, queryset, page_size):
    """
    Return ``(paginator, page, object_list, has_other_pages)`` for the request.

    The page is taken from the URL kwargs, then the query string, and
    defaults to 1; the literal 'last' selects the final page. Raises Http404
    for a page that is neither 'last' nor an integer, or that the paginator
    rejects.
    """
    paginator = self.get_paginator(
        queryset,
        page_size,
        orphans=self.get_paginate_orphans(),
        allow_empty_first_page=self.get_allow_empty(),
    )
    page_kwarg = self.page_kwarg
    requested = self.kwargs.get(page_kwarg) or self.request.GET.get(page_kwarg) or 1
    try:
        page_number = int(requested)
    except ValueError:
        if requested != 'last':
            raise Http404(_("Page is not 'last', nor can it be converted to an int."))
        page_number = paginator.num_pages
    try:
        page = paginator.page(page_number)
        return (paginator, page, page.object_list, page.has_other_pages())
    except InvalidPage as e:
        details = {'page_number': page_number, 'message': str(e)}
        raise Http404(_('Invalid page (%(page_number)s): %(message)s') % details)
def get(self, request, *args, **kwargs):
    """
    Render the object list, or raise Http404 for a disallowed empty list.

    The queryset is stored on ``self.object_list`` before rendering.
    """
    self.object_list = self.get_queryset()

    if not self.get_allow_empty():
        # When pagination is enabled and object_list is a queryset,
        # it's better to do a cheap query than to load the unpaginated
        # queryset in memory.
        use_exists_query = (
            self.get_paginate_by(self.object_list) is not None
            and hasattr(self.object_list, 'exists'))
        if use_exists_query:
            is_empty = not self.object_list.exists()
        else:
            is_empty = len(self.object_list) == 0
        if is_empty:
            raise Http404(
                _("Empty list and '%(class_name)s.allow_empty' is False.")
                % {'class_name': self.__class__.__name__})

    return self.render_to_response(self.get_context_data())
def get_dated_queryset(self, **lookup):
    """
    Return the view's queryset narrowed by ``lookup`` and the date policy.

    Future-dated objects are filtered out unless `allow_future` is set, and
    Http404 is raised for an empty result unless `allow_empty` is set.
    """
    filtered = self.get_queryset().filter(**lookup)
    date_field = self.get_date_field()
    allow_future = self.get_allow_future()
    allow_empty = self.get_allow_empty()
    paginate_by = self.get_paginate_by(filtered)

    if not allow_future:
        if self.uses_datetime_field:
            cutoff = timezone.now()
        else:
            cutoff = timezone_today()
        filtered = filtered.filter(**{'%s__lte' % date_field: cutoff})

    if allow_empty:
        return filtered

    # When pagination is enabled, it's better to do a cheap query
    # than to load the unpaginated queryset in memory.
    if paginate_by is None:
        is_empty = len(filtered) == 0
    else:
        is_empty = not filtered.exists()
    if is_empty:
        raise Http404(_("No %(verbose_name_plural)s available") % {
            'verbose_name_plural': force_text(filtered.model._meta.verbose_name_plural)
        })
    return filtered
def get_date_list(self, queryset, date_type=None, ordering='ASC'):
    """
    Return the result of `queryset.dates/datetimes()` for the date field.

    ``date_type`` defaults to the view's date list period. Raises Http404
    when the list comes back empty and empty lists are not allowed.
    """
    date_field = self.get_date_field()
    allow_empty = self.get_allow_empty()
    period = date_type if date_type is not None else self.get_date_list_period()

    fetch = queryset.datetimes if self.uses_datetime_field else queryset.dates
    date_list = fetch(date_field, period, ordering)

    came_back_empty = date_list is not None and not date_list
    if came_back_empty and not allow_empty:
        name = force_text(queryset.model._meta.verbose_name_plural)
        raise Http404(_("No %(verbose_name_plural)s available") %
                      {'verbose_name_plural': name})
    return date_list
def convert_exception_to_response(get_response):
    """
    Return ``get_response`` wrapped so that exceptions become responses.

    Any exception raised by the wrapped callable is handed to
    ``response_for_exception``: the known 4xx exceptions (Http404,
    PermissionDenied, MultiPartParserError, SuspiciousOperation) map to their
    matching response and everything else to a 500 response.

    The decorator is applied to every middleware automatically, so no
    middleware leaks an exception and the next one in the stack can count on
    receiving a response.
    """
    @wraps(get_response, assigned=available_attrs(get_response))
    def inner(request):
        try:
            return get_response(request)
        except Exception as exc:
            return response_for_exception(request, exc)
    return inner
def get_context_data(self, **kwargs):
    """
    Add the documentation parsed from a view's docstring to the context.

    The view name comes from the URL kwargs; Http404 is raised when
    ``self._get_view_func`` returns None for it. The context gains ``name``,
    ``summary``, ``body`` and ``meta``.
    """
    view = self.kwargs['view']
    view_func = self._get_view_func(view)
    if view_func is None:
        raise Http404

    title, body, metadata = utils.parse_docstring(view_func.__doc__)
    rst_reference = _('view:') + view
    if title:
        title = utils.parse_rst(title, 'view', rst_reference)
    if body:
        body = utils.parse_rst(body, 'view', rst_reference)
    for key, raw_value in list(metadata.items()):
        metadata[key] = utils.parse_rst(raw_value, 'model', rst_reference)

    kwargs.update(name=view, summary=title, body=body, meta=metadata)
    return super(ViewDetailView, self).get_context_data(**kwargs)
def app_index(self, request, app_label, extra_context=None):
    """
    Render the index page for one application of this admin site.

    Raises Http404 when ``_build_app_dict`` yields nothing for ``app_label``.
    """
    app_dict = self._build_app_dict(request, app_label)
    if not app_dict:
        raise Http404('The requested admin page does not exist.')

    # Sort the models alphabetically within each app.
    app_dict['models'].sort(key=lambda model_entry: model_entry['name'])
    app_name = apps.get_app_config(app_label).verbose_name

    context = dict(self.each_context(request))
    context.update(
        title=_('%(app)s administration') % {'app': app_name},
        app_list=[app_dict],
        app_label=app_label,
    )
    if extra_context:
        context.update(extra_context)

    request.current_app = self.name

    templates = self.app_index_template or [
        'admin/%s/app_index.html' % app_label,
        'admin/app_index.html'
    ]
    return TemplateResponse(request, templates, context)

# This global object represents the default admin site, for the common case.
# You can instantiate AdminSite in your own code to create a custom admin site.
def sitemap(request, **kwargs):
    """
    Return a plain-text sitemap (one course URL per line) for a term.

    The term is read from the ``index`` keyword argument and the course ids
    are fetched from the course-list API. For the term 'current' the URLs
    carry only the course id; for any other term the term is appended.

    Raises:
        Http404: if the API reports no course ids for the term.
        requests.exceptions.RequestException: if the API call fails or
            exceeds the timeout (propagated unchanged).
    """
    course_index = kwargs.get("index")
    # Bound the upstream call: without a timeout a stalled API would block
    # this worker indefinitely.
    r = requests.get(
        'https://api.cmucoursefind.xyz/course/v1/list-all-courses/term/{}/'.format(course_index),
        timeout=10)
    courseids = r.json().get('courseids')
    if not courseids:
        raise Http404("No sitemap for /{}".format(course_index))

    if course_index == 'current':
        lines = ["https://www.cmucoursefind.xyz/courses/{}\n".format(courseid.strip())
                 for courseid in courseids]
    else:
        # The term is the same for every line, so strip it once.
        term = course_index.strip()
        lines = ["https://www.cmucoursefind.xyz/courses/{}/{}\n".format(courseid.strip(), term)
                 for courseid in courseids]
    return HttpResponse("".join(lines), content_type="text/plain")
def team_apply(request, slug):
    """
    Record the current user's application to a team, then show the team.

    Invitation-only teams answer Http404 to users who have no state on the
    team and are not staff. An application is stored only for a POST from a
    user that ``can_apply``; every other case just redirects to the team
    detail page.
    """
    team = get_object_or_404(Team, slug=slug)
    applicant = request.user
    state = team.get_state_for_user(applicant)
    if team.access == "invitation" and state is None and not applicant.is_staff:
        raise Http404()

    if can_apply(team, request.user) and request.method == "POST":
        membership, _created = Membership.objects.get_or_create(team=team, user=request.user)
        membership.state = "applied"
        membership.save()
        # Let every team manager know about the new application.
        manager_emails = [manager.user.email for manager in team.managers()]
        send_email(manager_emails, "teams_user_applied", context={
            "team": team,
            "user": request.user
        })
        messages.success(request, "Applied to join team.")

    return redirect("team_detail", slug=slug)
def speaker_create_staff(request, pk):
    """
    Let a staff member create a speaker profile for the user ``pk``.

    Non-staff callers get Http404. When the user already has a speaker
    profile the caller is redirected to it instead of seeing the form.
    """
    user = get_object_or_404(User, pk=pk)
    if not request.user.is_staff:
        raise Http404

    try:
        return redirect(user.speaker_profile)
    except ObjectDoesNotExist:
        # No profile yet: continue to the creation form.
        pass

    if request.method != "POST":
        form = SpeakerForm(initial={"name": user.get_full_name()})
    else:
        form = SpeakerForm(request.POST, request.FILES)
        if form.is_valid():
            speaker = form.save(commit=False)
            speaker.user = user
            speaker.save()
            messages.success(request, "Speaker profile created.")
            return redirect("user_list")

    context = {"form": form}
    return render(request, "speakers/speaker_create.html", context)
def speaker_edit(request, pk=None):
    """
    Edit a speaker profile.

    Without ``pk`` the current user's own profile is edited (redirecting to
    profile creation when none exists). With ``pk`` the profile is looked up
    for staff; everyone else gets Http404.
    """
    if pk is not None:
        if not request.user.is_staff:
            raise Http404()
        speaker = get_object_or_404(Speaker, pk=pk)
    else:
        try:
            speaker = request.user.speaker_profile
        except Speaker.DoesNotExist:
            return redirect("speaker_create")

    if request.method != "POST":
        form = SpeakerForm(instance=speaker)
    else:
        form = SpeakerForm(request.POST, request.FILES, instance=speaker)
        if form.is_valid():
            form.save()
            messages.success(request, "Speaker profile updated.")
            return redirect("dashboard")

    context = {"form": form}
    return render(request, "speakers/speaker_edit.html", context)
def test_model(db):
    """Check the ekey lookups on Foo and Foo2 and the 404 helpers for a miss."""
    assert db is db

    # Both models must support the same set of ekey lookups.
    for model in (Foo, Foo2):
        foo = model.objects.create(text="hello")
        manager = model.objects
        assert foo.ekey
        assert foo == manager.get_by_ekey(foo.ekey)
        assert foo == manager.get_by_ekey_or_404(foo.ekey)
        assert foo == manager.get(ekey=foo.ekey)
        assert foo == manager.filter(ekey=foo.ekey).get()

    # An ekey that matches nothing must surface as Http404.
    with pytest.raises(Http404):
        Foo.objects.get_by_ekey_or_404("123123")
    with pytest.raises(Http404):
        get_list_or_404(Foo, ekey="123123")
    with pytest.raises(Http404):
        get_object_or_404(Foo, ekey="123123")
def dispatch(self, request, *args, **kwargs):
    """
    Validate the form, split its path and hand over to the matching view.

    The view kwargs are extended with the path components and with the
    form's cleaned data. Raises Http404 when the form is invalid.
    """
    form = self.get_form()
    if not form.is_valid():
        raise Http404(ValidationError(form.errors))

    cleaned = form.cleaned_data
    lang_code, proj_code, dir_path, filename = split_pootle_path(cleaned['path'])

    kwargs.update(
        language_code=lang_code,
        project_code=proj_code,
        dir_path=dir_path,
        filename=filename,
    )
    kwargs.update(**form.cleaned_data)

    target_view = self.get_view_class(lang_code, proj_code, dir_path, filename).as_view()
    return target_view(request, *args, **kwargs)
def get_units(request):
    """Return lightweight view-row results for the requested units.

    The request's GET data is validated with ``UnitViewRowsForm``. An
    'invalid' or 'required' error on ``uids`` raises Http400; any other
    validation failure raises Http404.

    :return: A JSON response built from the view-row results.
    """
    form = UnitViewRowsForm(request.GET, user=request.user)

    if not form.is_valid():
        errors = form.errors.as_data()
        for error in errors.get('uids', ()):
            if error.code in ('invalid', 'required'):
                raise Http400(error.message)
        raise Http404(forms.ValidationError(form.errors).messages)

    backend = search_backend.get(Unit)
    units = backend(request.user, **form.cleaned_data).get_units()
    results = ViewRowResults(units, form.cleaned_data['headers'])
    return JsonResponse(results.data)
def reject_suggestion(request, unit, suggid):
    """
    Reject suggestion ``suggid`` of ``unit`` and report the result as JSON.

    Raises Http404 when the unit has no such suggestion and PermissionDenied
    when the user may not reject it. A non-empty ``comment`` in the request
    body is passed on to ``handle_suggestion_comment``.
    """
    try:
        sugg = unit.suggestion_set.get(id=suggid)
    except ObjectDoesNotExist:
        raise Http404

    # In order to be able to reject a suggestion, users have to either:
    # 1. Have `review` rights, or
    # 2. Be the author of the suggestion being rejected
    if not check_permission('review', request):
        not_the_author = (request.user.is_anonymous or
                          request.user != sugg.user)
        if not_the_author:
            raise PermissionDenied(_('Insufficient rights to access review mode.'))

    unit.reject_suggestion(sugg, request.translation_project, request.user)

    r_data = QueryDict(request.body)
    comment = r_data["comment"] if "comment" in r_data else None
    if comment:
        handle_suggestion_comment(request, sugg, unit, comment, "rejected")

    payload = {
        'udbid': unit.id,
        'sugid': suggid,
        'user_score': request.user.public_score,
    }
    return JsonResponse(payload)
def accept_suggestion(request, unit, suggid):
    """
    Accept suggestion ``suggid`` of ``unit`` and report the result as JSON.

    Raises Http404 when the unit has no such suggestion. A non-empty
    ``comment`` in the POST data is passed on to
    ``handle_suggestion_comment``.
    """
    try:
        suggestion = unit.suggestion_set.get(id=suggid)
    except ObjectDoesNotExist:
        raise Http404

    unit.accept_suggestion(suggestion, request.translation_project, request.user)

    comment = request.POST["comment"] if "comment" in request.POST else None
    if comment:
        handle_suggestion_comment(request, suggestion, unit, comment, "accepted")

    payload = {
        'udbid': unit.id,
        'sugid': suggid,
        'user_score': request.user.public_score,
        'newtargets': list(unit.target.strings),
        'checks': _get_critical_checks_snippet(request, unit),
    }
    return JsonResponse(payload)
def get_object(self):
    """
    Look the variant up by primary key or by a custom filter.

    An all-digit ``pk`` is handled by the parent class. Anything else is
    turned into filter kwargs by ``_custom_variant_filter_kwargs`` (per the
    original notes, this supports build 37 lookups by chromosome, position,
    reference and variant). Raises Http404 when no filter can be derived or
    nothing matches.
    """
    lookup_value = self.kwargs['pk']
    if lookup_value.isdigit():
        return super(VariantViewSet, self).get_object()

    queryset = self.filter_queryset(self.get_queryset())
    filter_kwargs = self._custom_variant_filter_kwargs(self.kwargs['pk'])
    if not filter_kwargs:
        model_name = queryset.model._meta.object_name
        raise Http404('No {} matches the given query.'.format(model_name))

    variant = get_object_or_404(queryset, **filter_kwargs)
    self.check_object_permissions(self.request, variant)
    return variant
def init_request(self, object_id, *args, **kwargs):
    """
    Prepare the 'delete' admin view for this model.

    Loads the object, checks the delete permission (PermissionDenied), then
    the object's existence (Http404), and finally collects the related
    objects that would be deleted with it.
    """
    self.obj = self.get_object(unquote(object_id))

    # The permission is checked before existence, as in the original.
    if not self.has_delete_permission(self.obj):
        raise PermissionDenied

    if self.obj is None:
        message = _('%(name)s object with primary key %(key)r does not exist.')
        raise Http404(message % {
            'name': force_text(self.opts.verbose_name),
            'key': escape(object_id),
        })

    using = router.db_for_write(self.model)

    # Populate deleted_objects, a data structure of all related objects that
    # will also be deleted.
    deletion_info = get_deleted_objects(
        [self.obj], self.opts, self.request.user, self.admin_site, using)
    (self.deleted_objects, _model_count,
     self.perms_needed, self.protected) = deletion_info
def verify_step2(request, uidb64, token):
    """
    Complete e-mail verification for the logged-in user.

    ``uidb64`` is the URL-safe base64 encoding of the user id and ``token``
    is checked with ``verify_token_generator``. On success the user's
    ``email_verified`` flag is set (if it was not already) and the request
    is redirected to 'index'.

    Raises:
        SuspiciousOperation: if ``uidb64`` is not valid base64 or does not
            decode to an integer.
        PermissionDenied: if the decoded id is not the requesting user's id.
        Http404: if the user does not exist or the token is invalid.
    """
    bytes_uid = None
    try:
        # Decode inside the try: urlsafe_base64_decode() itself raises
        # ValueError on malformed input, which used to escape as HTTP 500.
        bytes_uid = urlsafe_base64_decode(uidb64)
        uid = int(bytes_uid)
    except ValueError:
        # Report the decoded bytes when there are any, else the raw input.
        rejected_value = uidb64 if bytes_uid is None else bytes_uid
        raise SuspiciousOperation('verify_step2 received invalid base64 user ID: {}'.format(
            rejected_value))

    if uid != request.user.id:
        raise PermissionDenied('UID mismatch - user is {}, request was for {}'.format(
            request.user.id, uid))

    user = get_object_or_404(models.User, pk=uid)
    if not verify_token_generator.check_token(user, token):
        raise Http404('token invalid')

    if not user.email_verified:
        user.email_verified = True
        user.save()
        messages.success(request, _('Your email has been verified successfully. Thanks!'))
    else:
        messages.info(request, _('Your email address has already been verified.'))
    return redirect('index')
def job_detail(request, queue_index, job_id):
    """
    Render the detail page of job ``job_id`` on the queue at ``queue_index``.

    Raises Http404 when the job cannot be fetched from the queue's
    connection.
    """
    index = int(queue_index)
    queue = get_queue_by_index(index)

    try:
        job = Job.fetch(job_id, connection=queue.connection)
    except NoSuchJobError:
        raise Http404(_("Couldn't find job with this ID: %s") % job_id)

    context_data = admin.site.each_context(request)
    context_data.update(
        title=_("Job Detail"),
        queue_index=index,
        job=job,
        queue=queue,
    )
    return render(request, 'django_rq/job_detail.html', context_data)
def click(request):
    """
    Count a click from an Intercooler.js request and report the total.

    The module-level ``clicktrack`` counter is incremented on every accepted
    request. A request triggered by the element 'intro-btn2' whose current
    URL has a match resets the counter to 0 instead.

    Raises:
        Http404: if the request is not an Intercooler.js request.
    """
    global clicktrack

    # Reject foreign requests before touching the counter; previously they
    # incremented it and only then received the 404.
    if not request.is_intercooler():
        raise Http404("Not allowed to come here outside of an Intercooler.js request!")

    clicktrack += 1
    do_reset = (request.intercooler_data.element.id == 'intro-btn2' and
                request.intercooler_data.current_url.match is not None)
    if do_reset:
        clicktrack = 0
        # NOTE(review): the trigger id comes from the client and is placed
        # in HTML unescaped -- confirm whether it needs escaping.
        text = "<span>You reset the counter!, via {}</span>".format(request.intercooler_data.trigger.id)
    else:
        text = "<span>You clicked me {} time{}...</span>".format(clicktrack, pluralize(clicktrack))
    return HttpResponse(text)
def init_request(self, object_id, *args, **kwargs):
    """
    Prepare the 'delete' admin view for this model.

    Loads the object, checks the delete permission (PermissionDenied), then
    the object's existence (Http404), and finally collects the related
    objects that would be deleted with it.
    """
    self.obj = self.get_object(unquote(object_id))

    # The permission is checked before existence, as in the original.
    if not self.has_delete_permission(self.obj):
        raise PermissionDenied

    if self.obj is None:
        message = _('%(name)s object with primary key %(key)r does not exist.')
        raise Http404(message % {
            'name': force_unicode(self.opts.verbose_name),
            'key': escape(object_id),
        })

    using = router.db_for_write(self.model)

    # Populate deleted_objects, a data structure of all related objects that
    # will also be deleted.
    deletion_info = get_deleted_objects(
        [self.obj], self.opts, self.request.user, self.admin_site, using)
    (self.deleted_objects, _model_count,
     self.perms_needed, self.protected) = deletion_info