我们从Python开源项目中,提取了以下31个代码示例,用于说明如何使用django.http.request.HttpRequest()。
def viewer_login(request: HttpRequest) -> HttpResponse: if request.method == 'POST': username = request.POST['username'] password = request.POST['password'] user = authenticate(username=username, password=password) if user is not None: if user.is_active: login(request, user) next_url = request.POST.get('next', 'viewer:main-page') return redirect(next_url) else: return render_error(request, "This account has been disabled.") else: return render_error(request, "Invalid login credentials.") else: next_url = request.GET.get('next', 'viewer:main-page') d = {'next': next_url} return render(request, 'viewer/login.html', d)
def image_viewer(request: HttpRequest, archive: int, page: int) -> HttpResponse: images = Image.objects.filter(archive=archive, extracted=True) if not images: raise Http404("Archive " + str(archive) + " has no extracted images") paginator = Paginator(images, 1) try: image = paginator.page(page) except (InvalidPage, EmptyPage): image = paginator.page(paginator.num_pages) image_object = image.object_list[0] if image_object.image_width / image_object.image_height > 1: image_object.is_horizontal = True d = {'image': image, 'backurl': redirect(image.object_list[0].archive).url, 'images_range': range(1, images.count() + 1), 'image_object': image_object} return render(request, "viewer/image_viewer.html", d)
def gallery_thumb(request: HttpRequest, pk: int) -> HttpResponse: try: gallery = Gallery.objects.get(pk=pk) except Gallery.DoesNotExist: raise Http404("Gallery does not exist") if not gallery.public and not request.user.is_authenticated: raise Http404("Gallery is not public") if 'HTTP_X_FORWARDED_HOST' in request.META: response = HttpResponse() response["Content-Type"] = "image/jpeg" # response["Content-Disposition"] = 'attachment; filename*=UTF-8\'\'{0}'.format( # archive.pretty_name) response['X-Accel-Redirect'] = "/image/{0}".format(gallery.thumbnail.name) return response else: return HttpResponseRedirect(gallery.thumbnail.url)
def public_stats(request: HttpRequest) -> HttpResponse: """Display public galleries and archives stats.""" if not crawler_settings.urls.enable_public_stats: if not request.user.is_staff: raise Http404("Page not found") else: return render_error(request, "Page disabled by settings (urls: enable_public_stats).") stats_dict = { "n_archives": Archive.objects.filter(public=True).count(), "archive": Archive.objects.filter(public=True).filter(filesize__gt=0).aggregate( Avg('filesize'), Max('filesize'), Min('filesize'), Sum('filesize')), "n_tags": Tag.objects.filter(gallery_tags__public=True).distinct().count(), "top_10_tags": Tag.objects.filter(gallery_tags__public=True).distinct().annotate( num_archive=Count('gallery_tags')).order_by('-num_archive')[:10] } d = {'stats': stats_dict} return render(request, "viewer/public_stats.html", d)
def no_route_found(self, request): """ Default callback for route not found :param request HttpRequest :rtype: Response """ response_obj = OrderedDict() response_obj["status"] = False response_obj["exceptions"] = { "message": "No route found for {0} {1}".format(self.__method, self.__uri), } response_obj["request"] = { "method": self.__method, "path_info": self.__uri, "content": request.body.decode("utf-8") } response_obj["message"] = "We are sorry, but something went terribly wrong." return Response(response_obj, content_type="application/json", status=404, charset="utf-8")
def get_dummy_request(dc, method=None, user=None, system_user=False): """ Return dummy request object. """ request = HttpRequest() request.csrf_processing_done = True request.dc = dc if method: request = set_request_method(request, method, copy_request=False) if system_user: from api.task.utils import get_system_task_user request.user = get_system_task_user() elif user: request.user = user return request
def get_form(self, req: HttpRequest, obj: Domain=None, **kwargs: Any) -> type: if req.GET.get("_prefill_key", "0") == "1": def formfield_callback(field: _ModelField, request: HttpRequest=None, **kwargs: Any) -> Type[_FormField]: f = self.formfield_for_dbfield(field, request=request, **kwargs) # type: _FormField # f can be None if the dbfield does not get a FormField (like hidden fields # or auto increment IDs). Only the dbfield has a .name attribute. if f and field.name == "dkimkey": if obj: obj.dkimkey = RSA.generate(2048).exportKey("PEM").decode("utf-8") else: f.initial = RSA.generate(2048).exportKey("PEM").decode("utf-8") return f kwargs["formfield_callback"] = functools.partial(formfield_callback, request=req) form_t = super().get_form(req, obj, **kwargs) return form_t
def _make_access_token(self, request: HttpRequest, tokenreq: _TokenRequest, rightnow: datetime.datetime, perms: TokenPermissions, for_user: MNUser) -> Dict[str, Any]: # TODO: figure out if we need to support more than one access scope # This implementation is based around this article, that, among other things, # describes the "kid" field required by Docker. The JWK implementation provided # by jwcrypto doesn't seem to work. # https://umbrella.cisco.com/blog/blog/2016/02/23/implementing-oauth-for-registry-v2/ _x = [] # type: List[str] jwtobj = { 'exp': int((rightnow + datetime.timedelta(minutes=2)).timestamp()), 'nbf': int((rightnow - datetime.timedelta(seconds=1)).timestamp()), 'iat': int(rightnow.timestamp()), 'iss': request.get_host(), 'aud': tokenreq.service, 'sub': str(for_user.pk), 'access': [{ "type": perms.type, "name": perms.path, "actions": _x + (["push"] if perms.push else []) + (["pull"] if perms.pull else []) + (["login"] if perms.type == "login" else []) }] } # type: Dict[str, Union[str, int, List[Dict[str, Union[str, List[str]]]]]] return jwtobj
def setUp(self): super().setUp() self.request = HttpRequest()
def viewer_logout(request: HttpRequest) -> HttpResponse: logout(request) return HttpResponseRedirect(reverse('viewer:main-page'))
def session_settings(request: HttpRequest) -> HttpResponse: if request.method == 'POST': data = json.loads(request.body.decode("utf-8")) if 'viewer_parameters' in data: if "viewer_parameters" not in request.session: request.session["viewer_parameters"] = {} for k, v in data.items(): if not k == 'viewer_parameters': request.session["viewer_parameters"][k] = v request.session.modified = True return HttpResponse(json.dumps({'result': "ok"}), content_type="application/json; charset=utf-8") elif request.method == 'GET': data = json.loads(request.body.decode("utf-8")) if 'viewer_parameters' in data: if "viewer_parameters" not in request.session: request.session["viewer_parameters"] = {} request.session["viewer_parameters"]["image_width"] = 900 request.session.modified = True return HttpResponse( json.dumps(request.session["viewer_parameters"]), content_type="application/json; charset=utf-8" ) return HttpResponse(json.dumps({'result': "error"}), content_type="application/json; charset=utf-8") else: return HttpResponse(json.dumps({'result': "error"}), content_type="application/json; charset=utf-8") # TODO: Generalize this script for several providers.
def panda_userscript(request: HttpRequest) -> HttpResponse: return render( request, 'viewer/panda.user.js', { "api_key": crawler_settings.api_key, "img_url": request.build_absolute_uri(crawler_settings.urls.static_url + 'favicon-160.png'), "server_url": request.build_absolute_uri(reverse('viewer:json-parser')) }, content_type='application/javascript' )
def user_archive_preferences(request: HttpRequest, archive_pk: int, setting: str) -> HttpResponse: """Archive user favorite toggle.""" try: Archive.objects.get(pk=archive_pk) except Archive.DoesNotExist: raise Http404("Archive does not exist") if setting == 'favorite': current_user_archive_preferences, created = UserArchivePrefs.objects.get_or_create( user=User.objects.get(pk=request.user.id), archive=Archive.objects.get(pk=archive_pk), defaults={'favorite_group': 1} ) if not created: current_user_archive_preferences.favorite_group = 1 current_user_archive_preferences.save() elif setting == 'unfavorite': current_user_archive_preferences, created = UserArchivePrefs.objects.get_or_create( user=User.objects.get(pk=request.user.id), archive=Archive.objects.get(pk=archive_pk), defaults={'favorite_group': 0} ) if not created: current_user_archive_preferences.favorite_group = 0 current_user_archive_preferences.save() else: return render_error(request, "Unknown user preference.") return HttpResponseRedirect(request.META["HTTP_REFERER"], {'user_archive_preferences': current_user_archive_preferences})
def render_error(request: HttpRequest, message: str) -> HttpResponseRedirect: messages.error(request, message, extra_tags='danger') if 'HTTP_REFERER' in request.META: return HttpResponseRedirect(request.META["HTTP_REFERER"]) else: return HttpResponseRedirect(reverse('viewer:main-page'))
def about(request: HttpRequest) -> HttpResponse: return render( request, 'viewer/about.html' )
def default_route_options(request: HttpRequest): """ Default callback for OPTIONS request :param request HttpRequest :rtype: Response """ response_obj = OrderedDict() response_obj["status"] = True response_obj["data"] = "Ok" return Response(response_obj, content_type="application/json", charset="utf-8")
def test_http_basic_get_user(): # Basic django request, without authentication info request = HttpRequest() # Standard middlewares were not applied on this request user = http_basic_auth_get_user(request) assert user is not None assert user_is_anonymous(user)
def test_model(self): req = HttpRequest() self.article.publish() self.article_2.publish() # test the listing contains the published article self.assertTrue(self.article.get_published() in self.listing.get_items_to_list(req)) # ...not the draft one self.assertTrue(self.article not in self.listing.get_items_to_list(req)) # ...not an article that isn't associated with the listing self.assertTrue(self.article_2 not in self.listing.get_items_to_list(req)) self.assertTrue(self.article_2.get_published() not in self.listing.get_items_to_list(req)) self.article.unpublish() self.article_2.unpublish()
def init_request(self): request = HttpRequest() request.data = QueryDict().copy() request.session = DummySession() return request
def detail(request: HttpRequest, question_id: int) -> HttpResponse: question = get_object_or_404(Question, pk=question_id) if not question.is_active: return 'invalid request' return render(request, 'polls/detail.html', {'question': question})
def health(request: HttpRequest) -> HttpResponse: c = connection.cursor() # type: CursorWrapper try: c.execute("SELECT 1") res = c.fetchone() except DatabaseError as e: return HttpResponseServerError(("Health check failed: %s" % str(e)).encode("utf-8"), content_type="text/plain; charset=utf-8") else: return HttpResponse(b'All green', status=200, content_type="text/plain; charset=utf-8")
def nothing(request: HttpRequest) -> HttpResponse: return HttpResponse(b'nothing to see here', status=200, content_type="text/plain; charset=utf-8")
def test_error(request: HttpRequest) -> HttpResponse: if settings.DEBUG: raise Exception("This is a test") else: return HttpResponseRedirect("/")
def branding(request: HttpRequest) -> Dict[str, str]: return { "company_name": settings.COMPANY_NAME, "company_logo": settings.COMPANY_LOGO_URL, }
def validate_refresh_token(self, refresh_token: str, client: MNApplication, request: HttpRequest, *args: Any, **kwargs: Any) -> bool: res = super().validate_refresh_token(refresh_token, client, request, *args, **kwargs) if res: # our base validated the refresh token, let's check if the client or user permissions # changed missing_permissions = find_missing_permissions(client, request.user) return len(missing_permissions) == 0 else: return False
def _make_refresh_token(self, request: HttpRequest, tokenreq: _TokenRequest, rightnow: datetime.datetime, for_user: MNUser) -> Dict[str, Any]: jwtobj = { 'exp': int((rightnow + datetime.timedelta(hours=2)).timestamp()), 'nbf': int((rightnow - datetime.timedelta(seconds=1)).timestamp()), 'iat': int(rightnow.timestamp()), 'iss': request.get_host(), 'aud': tokenreq.service, 'sub': str(for_user.pk), 'malleable': True, 'client_id': tokenreq.client_id, } return jwtobj
def dispatch(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse: return super().dispatch(request, *args, **kwargs)
def render(self): if not self.template_name: return None try: logger.debug("Template name: %s" % self.template_name) template = get_template(self.template_name) except TemplateDoesNotExist: logger.debug("Template not found: %s" % self.template_name) return None # TODO: Avoid using a null HttRequest to context processors ctx = RequestContext(HttpRequest(), self.ctx) return template.render(ctx)
def gallery_details(request: HttpRequest, pk: int, tool: str=None) -> HttpResponse: try: gallery = Gallery.objects.get(pk=pk) except Gallery.DoesNotExist: raise Http404("Gallery does not exist") if not (gallery.public or request.user.is_authenticated): raise Http404("Gallery does not exist") if request.user.is_staff and tool == "download": if 'downloader' in request.GET: current_settings = Settings(load_from_config=crawler_settings.config) current_settings.allow_downloaders_only([request.GET['downloader']], True, True, True) current_settings.workers.web_queue.enqueue_args_list((gallery.get_link(),), override_options=current_settings) else: # Since this is used from the gallery page mainly to download an already added gallery using # downloader settings, force replace_metadata current_settings = Settings(load_from_config=crawler_settings.config) current_settings.replace_metadata = True current_settings.workers.web_queue.enqueue_args_list((gallery.get_link(),), override_options=current_settings) return HttpResponseRedirect(request.META["HTTP_REFERER"]) if request.user.is_staff and tool == "toggle-hidden": gallery.hidden = not gallery.hidden gallery.save() return HttpResponseRedirect(request.META["HTTP_REFERER"]) if request.user.is_staff and tool == "toggle-public": gallery.public_toggle() return HttpResponseRedirect(request.META["HTTP_REFERER"]) if request.user.is_staff and tool == "mark-deleted": gallery.mark_as_deleted() return HttpResponseRedirect(request.META["HTTP_REFERER"]) if request.user.is_staff and tool == "recall-api": current_settings = Settings(load_from_config=crawler_settings.config) current_settings.set_update_metadata_options(providers=(gallery.provider,)) current_settings.workers.web_queue.enqueue_args_list((gallery.get_link(),), override_options=current_settings) frontend_logger.info( 'Updating gallery API data for gallery: {} and related archives'.format( gallery.get_absolute_url() ) ) return HttpResponseRedirect(request.META["HTTP_REFERER"]) tag_lists = sort_tags(gallery.tags.all()) d = {'gallery': gallery, 'tag_lists': tag_lists, 'settings': crawler_settings} return render(request, "viewer/gallery.html", d)
def route(self, request: HttpRequest): """ Prepares for the CallBackResolver and handles the response and exceptions :param request HttpRequest :rtype: HttpResponse """ self.flush() self.__request = request self.__uri = request.path[1:] self.__method = request.method self.__bound_routes = dict() self.register('log', logging.getLogger(os.urandom(3).hex().upper())) self.register('router', RouteMapping()) self.__app['router'].flush_routes() routes = self.__callable().connect(self.__app) self.__bound_routes = routes['router'].get__routes() request_headers = request.META if 'HTTP_USER_AGENT' in request_headers: indent = 2 if re.match("[Mozilla]{7}", request_headers['HTTP_USER_AGENT']) else 0 else: indent = 0 if self.set_end_point_uri() is False: return self.set_response_headers(self.no_route_found(self.__request).render(indent)) acutal_params = self.get_url_params(self.get_end_point_uri()) try: response = self.exec_route_callback(acutal_params) if type(response) == Response: return self.set_response_headers(response.render(indent)) else: return self.set_response_headers(response) except InvalidInputException: self.__app['log'].error("< 400", exc_info=True) return self.set_response_headers(Response(None, status=400).render(indent)) except AuthException as e: self.__app['log'].error("< 403", exc_info=True) return self.set_response_headers(Response(None, status=403).render(indent)) except NotFoundException: self.__app['log'].error("< 404", exc_info=True) return self.set_response_headers(Response(None, status=404).render(indent)) except RequestDataTooBig: self.__app['log'].error("< 413", exc_info=True) return self.set_response_headers(Response(None, status=413).render(indent)) except BaseException as e: self.__app['log'].error("< 500", exc_info=True) return self.set_response_headers(Response(None, status=500).render(indent)) finally: del self
def post(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse: # POST received: # <QueryDict: { # 'client_id': ['docker'], # 'refresh_token': ['boink'], # 'service': ['registry.maurusnet.test'], # 'scope': ['repository:dev/destalinator:push,pull'], # 'grant_type': ['refresh_token']}> if "refresh_token" in request.POST and request.POST["grant_type"] == "refresh_token": tr = _tkr_parse(request.POST) if tr.scope: tp = TokenPermissions.parse_scope(tr.scope) else: return HttpResponseBadRequest("Can't issue access token without valid scope (scope=%s)", tr.scope) try: client = DockerRegistry.objects.get(client_id=tr.service) # type: DockerRegistry except DockerRegistry.DoesNotExist: return HttpResponseNotFound("No such registry/client from refresh token(%s)" % str(tr)) user = self._user_from_refresh_token(request.POST["refresh_token"], client.public_key_pem(), expected_issuer=request.get_host(), expected_audience=tr.service) if user: try: drepo = DockerRepo.objects.get(name=tp.path, registry_id=client.id) except DockerRepo.DoesNotExist: if settings.DOCKERAUTH_ALLOW_UNCONFIGURED_REPOS: drepo = DockerRepo() drepo.name = tp.path drepo.registry = client drepo.unauthenticated_read = True drepo.unauthenticated_write = True else: return HttpResponseNotFound("No such repo '%s'" % tp.path) if drepo.registry.has_access(user, tp) or drepo.has_access(user, tp): rightnow = datetime.datetime.now(tz=pytz.UTC) return HttpResponse(content=json.dumps({ "access_token": self._create_jwt( self._make_access_token(request, tr, rightnow, tp, user), client.private_key_pem(), ), "scope": tr.scope, "expires_in": 119, "refresh_token": self._create_jwt( self._make_refresh_token(request, tr, rightnow, user), client.private_key_pem(), ) }), status=200, content_type="application/json") else: return HttpResponseForbidden("User %s doesn't have access to repo %s" % (user.pk, tp.path)) else: return HttpResponse("Unauthorized", status=401) else: return HttpResponseBadRequest("POSTing to this endpoint requires a refresh_token")