我们从 Python 开源项目中提取了以下 33 个代码示例,用于说明如何使用 django.http 模块。
def java_to_python_keys(self):
    """Rewrite incoming request keys in place with ``self.java_to_python``.

    Nothing happens unless ``SETTINGS.CONVERT_KEYS`` is truthy and
    ``self.need_convert('request', self.method)`` returns a truthy value.

    For each of ``self.uri_params``, ``self.params`` and ``self.data``
    (a dict, or a list/tuple of dicts):

    * a key listed in ``SETTINGS.KEYWORDS_WITH_VALUE_NEED_CONVERT`` keeps
      its name but has its value (or every item of a list value) converted;
    * any other key is renamed to its converted form.

    :raises Exception: if one of the three containers is not a dict, list
        or tuple.
    """
    if not (SETTINGS.CONVERT_KEYS and self.need_convert('request', self.method)):
        return

    for container in (self.uri_params, self.params, self.data):
        if isinstance(container, dict):
            mappings = [container]
        elif isinstance(container, (tuple, list)):
            mappings = container
        else:
            raise Exception('data should be dict, list or tuple')

        for mapping in mappings:
            # Snapshot the keys: the mapping is mutated while it is walked.
            for name in list(mapping.keys()):
                if name not in SETTINGS.KEYWORDS_WITH_VALUE_NEED_CONVERT:
                    mapping[self.java_to_python(name)] = mapping.pop(name)
                    continue
                current = mapping[name]
                if isinstance(current, list):
                    for index, item in enumerate(current):
                        current[index] = self.java_to_python(item)
                else:
                    mapping[name] = self.java_to_python(current)

    # NOTE(review): the flattened source does not show whether this step sat
    # inside the conversion guard above; it is kept inside -- confirm.
    if self.method == 'get' and SETTINGS.ORDER_BY in self.params:
        self.params[SETTINGS.ORDER_BY] = self.java_to_python(self.params[SETTINGS.ORDER_BY])
# rename input keys in http request
def get_next_url(request, redirect_field_name):
    """Return the redirect target from the query string, if it is safe.

    The candidate URL is checked with ``is_safe_url`` against the request's
    host before it is returned.

    :arg HttpRequest request: the http request
    :arg str redirect_field_name: the name of the GET field holding the url

    :returns: the url when present and judged safe, otherwise ``None``
    """
    candidate = request.GET.get(redirect_field_name)
    if not candidate:
        return None

    check_args = {
        'url': candidate,
        'host': request.get_host(),
    }
    # NOTE(willkg): Django 1.11+ allows us to require https, too.
    if django.VERSION >= (1, 11):
        check_args['require_https'] = request.is_secure()

    return candidate if is_safe_url(**check_args) else None
def test_no_oidc_token_expiration_forces_renewal(self, mock_random_string):
    """A session without a stored token expiry is redirected to re-authorize.

    ``mock_random_string`` is pinned so the ``state`` and ``nonce`` query
    parameters of the redirect are predictable.
    """
    mock_random_string.return_value = 'examplestring'

    request = self.factory.get('/foo')
    request.user = self.user
    request.session = {}

    response = self.middleware.process_request(request)

    # assertEqual is used throughout: the assertEquals alias is deprecated
    # and no longer exists in Python 3.12+.
    self.assertEqual(response.status_code, 302)
    url, qs = response.url.split('?')
    self.assertEqual(url, 'http://example.com/authorize')
    expected_query = {
        'response_type': ['code'],
        'redirect_uri': ['http://testserver/callback/'],
        'client_id': ['foo'],
        'nonce': ['examplestring'],
        'prompt': ['none'],
        'scope': ['openid email'],
        'state': ['examplestring'],
    }
    self.assertEqual(expected_query, parse_qs(qs))
def get_times(request):
    """Read ``starttime`` and ``endtime`` from the query string.

    Non-empty values are parsed as ISO 8601 and stripped of their timezone
    (NAV works with naive timestamps); missing or empty values are passed
    through untouched.

    :param request: django.http.HttpRequest
    :returns: tuple ``(starttime, endtime)``
    :raises Iso8601ParseError: if either value fails to parse
    """
    def _naive(raw):
        # Falsy input (None or '') is returned unchanged.
        return iso8601.parse_date(raw).replace(tzinfo=None) if raw else raw

    raw_start = request.GET.get('starttime')
    raw_end = request.GET.get('endtime')
    try:
        starttime = _naive(raw_start)
        endtime = _naive(raw_end)
    except iso8601.ParseError:
        raise Iso8601ParseError
    return starttime, endtime
def is_authenticated(self, request):
    """
    Reject the request unless the configured authentication accepts it.

    Delegates to the ``authentication`` object on ``Resource._meta``.
    Returns nothing on success; every failure is signalled by raising
    ``ImmediateHttpResponse``.
    """
    outcome = self._meta.authentication.is_authenticated(request)

    # The backend may hand back a ready-made response; send it as-is.
    if isinstance(outcome, HttpResponse):
        raise ImmediateHttpResponse(response=outcome)

    # Only the literal ``True`` counts as authenticated.
    if outcome is not True:
        raise ImmediateHttpResponse(response=http.HttpUnauthorized())
def get_detail(self, request, **kwargs):
    """
    Return one serialized resource.

    Looks the object up through ``cached_obj_get``, dehydrates it and hands
    the bundle to ``create_response``. A missing object yields
    ``HttpNotFound``; an ambiguous lookup yields ``HttpMultipleChoices``.
    """
    lookup_bundle = self.build_bundle(request=request)

    try:
        found = self.cached_obj_get(
            bundle=lookup_bundle,
            **self.remove_api_resource_names(kwargs)
        )
    except ObjectDoesNotExist:
        return http.HttpNotFound()
    except MultipleObjectsReturned:
        return http.HttpMultipleChoices("More than one resource is found at this URI.")

    detail = self.build_bundle(obj=found, request=request)
    detail = self.alter_detail_data_to_serialize(request, self.full_dehydrate(detail))
    return self.create_response(request, detail)
def delete_detail(self, request, **kwargs):
    """
    Delete a single resource/object via ``obj_delete``.

    Returns ``HttpNoContent`` (204 No Content) when the delete succeeds and
    ``HttpNotFound`` (404 Not Found) when ``obj_delete`` raises ``NotFound``.
    """
    # Build the bundle by hand so no empty instance is created just to be
    # deleted.
    bundle = Bundle(request=request)

    try:
        self.obj_delete(bundle=bundle, **self.remove_api_resource_names(kwargs))
    except NotFound:
        return http.HttpNotFound()
    else:
        return http.HttpNoContent()
def delete(self, request, id):
    """Delete a single user by id.

    The reserved id ``'current'`` is refused with a 404 response. For any
    other id the user is removed through ``api.keystone.user_delete`` and
    nothing is returned.

    This method returns HTTP 204 (no content) on success.
    NOTE(review): the 204 is presumably produced by a wrapper not visible
    here -- confirm.
    """
    if id == 'current':
        # HttpResponseNotFound is a response object, not an exception:
        # raising it fails with TypeError, so return it instead.
        return django.http.HttpResponseNotFound('current')
    api.keystone.user_delete(request, id)
def delete(self, request, id):
    """Delete a single role by id.

    The reserved id ``'default'`` is refused with a 404 response. For any
    other id the role is removed through ``api.keystone.role_delete`` and
    nothing is returned.

    This method returns HTTP 204 (no content) on success.
    NOTE(review): the 204 is presumably produced by a wrapper not visible
    here -- confirm.
    """
    if id == 'default':
        # HttpResponseNotFound is a response object, not an exception:
        # raising it fails with TypeError, so return it instead.
        return django.http.HttpResponseNotFound('default')
    api.keystone.role_delete(request, id)
def delete(self, request, id):
    """Delete a single domain by id.

    The reserved id ``'default'`` is refused with a 404 response. For any
    other id the domain is removed through ``api.keystone.domain_delete``
    and nothing is returned.

    This method returns HTTP 204 (no content) on success.
    NOTE(review): the 204 is presumably produced by a wrapper not visible
    here -- confirm.
    """
    if id == 'default':
        # HttpResponseNotFound is a response object, not an exception:
        # raising it fails with TypeError, so return it instead.
        return django.http.HttpResponseNotFound('default')
    api.keystone.domain_delete(request, id)
def __init__(self, environ):
    """Populate the request from a WSGI ``environ`` mapping.

    Sets the path attributes, ``META`` (the environ itself, with
    ``PATH_INFO`` and ``SCRIPT_NAME`` overwritten), the upper-cased method,
    an optional encoding taken from the Content-Type charset, and a
    ``LimitedStream`` over ``wsgi.input`` bounded by ``CONTENT_LENGTH``.
    """
    script_name = get_script_name(environ)
    # PATH_INFO sometimes exists but is empty (e.g. the SCRIPT_NAME URL was
    # requested without a trailing slash); behave as if '/' was requested.
    path_info = get_path_info(environ) or '/'

    self.environ = environ
    self.path_info = path_info
    # Only the first slash is replaced: http://test/something and
    # http://test//something are different URLs (RFC 2396,
    # http://www.ietf.org/rfc/rfc2396.txt).
    trimmed_script = script_name.rstrip('/')
    relative_path = path_info.replace('/', '', 1)
    self.path = '%s/%s' % (trimmed_script, relative_path)

    self.META = environ
    self.META['PATH_INFO'] = path_info
    self.META['SCRIPT_NAME'] = script_name
    self.method = environ['REQUEST_METHOD'].upper()

    _, content_params = cgi.parse_header(environ.get('CONTENT_TYPE', ''))
    if 'charset' in content_params:
        charset = content_params['charset']
        try:
            codecs.lookup(charset)
        except LookupError:
            # Unknown charset: leave the encoding untouched.
            pass
        else:
            self.encoding = charset

    self._post_parse_error = False

    raw_length = environ.get('CONTENT_LENGTH')
    try:
        content_length = int(raw_length)
    except (ValueError, TypeError):
        # Missing or malformed header: treat the body as empty.
        content_length = 0
    self._stream = LimitedStream(self.environ['wsgi.input'], content_length)
    self._read_started = False
    self.resolver_match = None
def GET(self):
    """Return a ``QueryDict`` built from the environ's query string."""
    # The WSGI spec says 'QUERY_STRING' may be absent.
    query_bytes = get_bytes_from_wsgi(self.environ, 'QUERY_STRING', '')
    query_dict = http.QueryDict(query_bytes, encoding=self._encoding)
    return query_dict
def COOKIES(self):
    """Return the cookies parsed from the environ's ``HTTP_COOKIE`` value."""
    # A missing header is read as an empty string.
    header_value = get_str_from_wsgi(self.environ, 'HTTP_COOKIE', '')
    parsed = http.parse_cookie(header_value)
    return parsed
def COOKIES(self):
    """Return the cookies exposed by the wrapped Sanic request."""
    # The HTTP_COOKIE header is not re-parsed here (the header-based parsing
    # was disabled); the Sanic request's own ``cookies`` are used directly.
    cookies = self.sanic_request.cookies
    return cookies
def JsonResponse(responseData):
    """Return ``responseData`` as an ``application/json`` HTTP response.

    On Django 1.7 and later this delegates to ``django.http.JsonResponse``;
    on older versions it builds an ``HttpResponse`` around ``json.dumps``.

    :param responseData: the JSON-serializable payload
    """
    import django

    # Compare the version tuple. float(django.get_version()) raises
    # ValueError for three-part versions such as "1.11.2" and reads "1.10"
    # as 1.1, which sorts below 1.7.
    if django.VERSION[:2] >= (1, 7):
        from django.http import JsonResponse as DjangoJsonResponse
        return DjangoJsonResponse(responseData)
    return HttpResponse(json.dumps(responseData), content_type="application/json")
def preview_page(self, request, object_id, language):
    """Redirect to the page's preview URL, looked up by draft id.

    The target is the page's absolute URL for ``language`` plus the
    toolbar "edit on" flag and a ``language`` query parameter. When the page
    belongs to a site other than the current one, the URL is made absolute
    with that site's domain and the request's scheme.
    """
    page = get_object_or_404(self.model, id=object_id)

    query = "?%s" % get_cms_setting('CMS_TOOLBAR_URL__EDIT_ON') + "&language=" + language
    with force_language(language):
        target = page.get_absolute_url(language) + query

    current_site = get_current_site(request)
    if not current_site == page.site:
        scheme_suffix = 's' if request.is_secure() else ''
        target = "http%s://%s%s" % (scheme_suffix, page.site.domain, target)

    return HttpResponseRedirect(target)
def test_expired_token_forces_renewal(self, mock_random_string):
    """A session whose token expiry lies in the past is sent to re-authorize.

    ``mock_random_string`` is pinned so the ``state`` and ``nonce`` query
    parameters of the redirect are predictable.
    """
    mock_random_string.return_value = 'examplestring'

    request = self.factory.get('/foo')
    request.user = self.user
    request.session = {
        # Ten seconds ago, i.e. already expired.
        'oidc_id_token_expiration': time.time() - 10
    }

    response = self.middleware.process_request(request)

    # assertEqual is used throughout: the assertEquals alias is deprecated
    # and no longer exists in Python 3.12+.
    self.assertEqual(response.status_code, 302)
    url, qs = response.url.split('?')
    self.assertEqual(url, 'http://example.com/authorize')
    expected_query = {
        'response_type': ['code'],
        'redirect_uri': ['http://testserver/callback/'],
        'client_id': ['foo'],
        'nonce': ['examplestring'],
        'prompt': ['none'],
        'scope': ['openid email'],
        'state': ['examplestring'],
    }
    self.assertEqual(expected_query, parse_qs(qs))
# This adds a "home page" we can test against.
def test_expired_token_redirects_to_sso(self, mock_random_string):
    """A logged-in client with an expired token is redirected to authorize.

    ``mock_random_string`` is pinned so the ``state`` and ``nonce`` query
    parameters of the redirect are predictable.
    """
    mock_random_string.return_value = 'examplestring'

    client = ClientWithUser()
    client.login(username=self.user.username, password='password')

    # Set expiration to some time in the past
    session = client.session
    session['oidc_id_token_expiration'] = time.time() - 100
    session.save()

    resp = client.get('/mdo_fake_view/')

    # assertEqual is used consistently: the assertEquals alias is deprecated
    # and no longer exists in Python 3.12+.
    self.assertEqual(resp.status_code, 302)
    url, qs = resp.url.split('?')
    self.assertEqual(url, 'http://example.com/authorize')
    expected_query = {
        'response_type': ['code'],
        'redirect_uri': ['http://testserver/callback/'],
        'client_id': ['foo'],
        'nonce': ['examplestring'],
        'prompt': ['none'],
        'scope': ['openid email'],
        'state': ['examplestring'],
    }
    self.assertEqual(expected_query, parse_qs(qs))
def get_or_create_token(request):
    """Return the caller's API token, creating one when none is still valid.

    Only tokens whose ``expires`` is not in the past are reused; otherwise a
    new token expiring ``EXPIRE_DELTA`` from now is created. Accounts that
    are not admins get a 403 response instead.

    :type request: django.http.HttpRequest
    """
    if not request.account.is_admin():
        return HttpResponse('You must log in to get a token',
                            status=status.HTTP_403_FORBIDDEN)

    token, _created = APIToken.objects.get_or_create(
        client=request.account,
        expires__gte=datetime.now(),
        defaults={
            'token': auth_token(),
            'expires': datetime.now() + EXPIRE_DELTA,
        })
    return HttpResponse(str(token))
def getBaseURL(req):
    """
    Given a Django web request object, returns the OpenID 'trust root'
    for that request; namely, the absolute URL to the site root which
    is serving the Django request. The trust root will include the
    proper scheme and authority. It will lack a port if the port is
    standard (80, 443).

    The host name comes from ``META['HTTP_HOST']`` with any port stripped;
    a bracketed IPv6 literal such as ``[::1]:8000`` keeps its brackets.
    The port comes from ``META['SERVER_PORT']`` and defaults to 80 when that
    entry is missing or not an integer.

    :raises KeyError: if ``HTTP_HOST`` or ``SERVER_PROTOCOL`` is absent.
    """
    host = req.META['HTTP_HOST']
    if host.startswith('['):
        # IPv6 literal: the colons inside the brackets are part of the
        # address, so cut after the closing bracket rather than at the
        # first colon.
        closing = host.find(']')
        name = host[:closing + 1] if closing != -1 else host
    else:
        name = host.partition(':')[0]

    try:
        port = int(req.META['SERVER_PORT'])
    except (KeyError, TypeError, ValueError):
        port = 80

    # NOTE(review): SERVER_PROTOCOL is usually "HTTP/1.1" even for TLS
    # requests, so this check may rarely select https -- confirm.
    proto = 'https' if 'HTTPS' in req.META['SERVER_PROTOCOL'] else 'http'

    if port in (80, 443) or not port:
        port_suffix = ''
    else:
        port_suffix = ':%s' % (port,)

    return "%s://%s%s/" % (proto, name, port_suffix)
def dispatch(self, request_type, request, **kwargs):
    """
    Run the shared pipeline around a CRUD handler: method override,
    allowed-method check, handler lookup, authentication, throttling,
    the handler call itself and throttle logging.

    Returns the handler's response, or ``HttpNoContent`` when the handler
    returned something that is not an ``HttpResponse``.
    """
    allowed_methods = getattr(self._meta, "%s_allowed_methods" % request_type, None)

    # Honour the X-HTTP-Method-Override header when the client sent one.
    if 'HTTP_X_HTTP_METHOD_OVERRIDE' in request.META:
        request.method = request.META['HTTP_X_HTTP_METHOD_OVERRIDE']

    verb = self.method_check(request, allowed=allowed_methods)
    handler_name = "%s_%s" % (verb, request_type)
    handler = getattr(self, handler_name, None)
    if handler is None:
        raise ImmediateHttpResponse(response=http.HttpNotImplemented())

    self.is_authenticated(request)
    self.throttle_check(request)

    # All clear. Process the request.
    request = convert_post_to_put(request)
    response = handler(request, **kwargs)

    # Record this request against the throttle.
    self.log_throttled_access(request)

    if isinstance(response, HttpResponse):
        return response
    # Anything else is taken to mean the request was accepted and acted on;
    # returning a real response also keeps Django from failing.
    return http.HttpNoContent()
def method_check(self, request, allowed=None):
    """
    Verify that the request's HTTP method may be handled by the resource.

    ``allowed`` is a list of lowercase HTTP method names; ``None`` means
    nothing is allowed. Typical calls::

        # The most generic lookup.
        self.method_check(request, self._meta.allowed_methods)

        # A lookup against what's allowed for list-type methods.
        self.method_check(request, self._meta.list_allowed_methods)

        # A useful check when creating a new endpoint that only handles
        # GET.
        self.method_check(request, ['get'])

    Returns the lowercased method name. An ``OPTIONS`` request, or a method
    not in ``allowed``, raises ``ImmediateHttpResponse`` carrying an
    ``Allow`` header.
    """
    permitted = [] if allowed is None else allowed
    verb = request.method.lower()
    allow_header = ','.join(name.upper() for name in permitted)

    if verb == "options":
        response = HttpResponse(allow_header)
        response['Allow'] = allow_header
        raise ImmediateHttpResponse(response=response)

    if verb not in permitted:
        response = http.HttpMethodNotAllowed(allow_header)
        response['Allow'] = allow_header
        raise ImmediateHttpResponse(response=response)

    return verb
def throttle_check(self, request):
    """
    Raise ``ImmediateHttpResponse`` (``HttpTooManyRequests``) when the
    caller is over the throttle limit.

    The caller is identified by the ``authentication`` object and judged by
    the ``throttle`` object, both taken from ``Resource._meta``.
    """
    identifier = self._meta.authentication.get_identifier(request)

    if not self._meta.throttle.should_be_throttled(identifier):
        return

    # Throttle limit exceeded.
    raise ImmediateHttpResponse(response=http.HttpTooManyRequests())
def error_response(self, request, errors, response_class=None):
    """
    Serialize ``errors`` in the best available format and wrap them in an
    error response. Should be used as much as possible to return errors.

    ``response_class`` defaults to ``http.HttpBadRequest``. The format is
    taken from ``determine_format`` unless the request carries a
    ``callback`` GET parameter (JSONP), which forces ``application/json``;
    ``self._meta.default_format`` is the fallback. If serialization itself
    fails with ``BadRequest``, a plain-text response is returned instead.
    """
    if response_class is None:
        response_class = http.HttpBadRequest

    desired_format = None

    if request:
        if request.GET.get('callback', None) is None:
            try:
                desired_format = self.determine_format(request)
            except BadRequest:
                pass  # Fall through to default handler below
        else:
            # JSONP can cause extra breakage.
            desired_format = 'application/json'

    if not desired_format:
        desired_format = self._meta.default_format

    try:
        serialized = self.serialize(request, errors, desired_format)
    # "except X as e" is valid on Python 2.6+ and Python 3; the former
    # "except X, e" spelling is a SyntaxError on Python 3.
    except BadRequest as e:
        error = "Additional errors occurred, but serialization of those errors failed."

        # Only expose the underlying failure in debug mode.
        if settings.DEBUG:
            error += " %s" % e

        return response_class(content=error, content_type='text/plain')

    return response_class(content=serialized, content_type=build_content_type(desired_format))
def post_list(self, request, **kwargs):
    """
    Create a new resource/object from the request body.

    Deserializes the body, builds a bundle, calls ``obj_create`` and
    answers with ``HttpCreated`` (201 Created) carrying the new resource's
    location. When ``Meta.always_return_data = True`` the response body
    also holds the serialized new resource.
    """
    # Django 1.4 renamed raw_post_data to body.
    body = request.body if django.VERSION >= (1, 4) else request.raw_post_data

    payload = self.deserialize(
        request, body,
        format=request.META.get('CONTENT_TYPE', 'application/json'))
    payload = self.alter_deserialized_detail_data(request, payload)

    new_bundle = self.build_bundle(data=dict_strip_unicode_keys(payload), request=request)
    created = self.obj_create(new_bundle, **self.remove_api_resource_names(kwargs))
    location = self.get_resource_uri(created)

    if self._meta.always_return_data:
        created = self.full_dehydrate(created)
        created = self.alter_detail_data_to_serialize(request, created)
        return self.create_response(request, created,
                                    response_class=http.HttpCreated,
                                    location=location)
    return http.HttpCreated(location=location)
def post_detail(self, request, **kwargs):
    """
    Placeholder for creating a subcollection under a resource.

    Not implemented by default, because most data models are not
    self-referential; always answers ``HttpNotImplemented``. The docstring
    contract for an implementation is ``HttpCreated`` (201 Created) when a
    new resource is created.
    """
    not_implemented = http.HttpNotImplemented()
    return not_implemented
def delete_list(self, request, **kwargs):
    """
    Delete a collection of resources/objects via ``obj_delete_list``.

    Always answers ``HttpNoContent`` (204 No Content) once the call
    returns.
    """
    list_bundle = self.build_bundle(request=request)
    self.obj_delete_list(
        bundle=list_bundle,
        request=request,
        **self.remove_api_resource_names(kwargs)
    )
    no_content = http.HttpNoContent()
    return no_content
def __init__(self, sanic_request):
    """
    Wrap a Sanic request in Django-request-style attributes.

    Sets ``path_info``/``path`` (an empty path becomes ``'/'``), ``META``,
    the upper-cased ``method`` and ``_body``.

    ``META`` holds every request header under ``HTTP_<UPPERCASED NAME>``.
    Each header is also stored under the Django spelling with hyphens
    turned into underscores (``HTTP_USER_AGENT``), so Django-style lookups
    work; the hyphenated keys are kept for existing callers.

    :param SanicRequest sanic_request: the request to wrap
    """
    script_name = "/"
    # An empty path is treated as a request for '/'.
    path_info = sanic_request.path or '/'

    self.sanic_request = sanic_request
    self.path_info = path_info
    # Only the first slash is replaced: http://test/something and
    # http://test//something are different URLs (RFC 2396,
    # http://www.ietf.org/rfc/rfc2396.txt).
    self.path = '%s/%s' % (script_name.rstrip('/'), path_info.replace('/', '', 1))

    self.META = {"HTTP_{:s}".format(str(k).upper()): v
                 for (k, v) in sanic_request.headers.items()}
    # Add the underscore spelling without overriding any key already set.
    for legacy_key, value in list(self.META.items()):
        self.META.setdefault(legacy_key.replace('-', '_'), value)

    # NOTE(review): assumes ``sanic_request.ip`` is an (address, port)
    # sequence -- confirm for the Sanic version in use.
    self.META['REMOTE_ADDR'] = sanic_request.ip[0]
    self.META['PATH_INFO'] = path_info
    self.META['SCRIPT_NAME'] = script_name
    self.method = str(sanic_request.method).upper()

    self._post_parse_error = False
    # The body is taken directly from the Sanic request, so no
    # content-length-limited stream is set up.
    self._body = sanic_request.body
    self._read_started = False
    self.resolver_match = None
def process_response(self, res):
    """Normalize a view result into an HTTP response object.

    When ``res`` is a dict, its ``SETTINGS.DATA`` entry is reshaped (for the
    'dict' data style), keys are processed via ``self.process_keys``,
    ``self.additional_data`` is merged in, and the dict is wrapped in the
    response class named by ``SETTINGS.JSON_RESPONSE_CLASS``. A
    ``StreamingHttpResponse`` or ``HttpResponse`` is returned as it is.

    :raises Exception: if ``JSON_RESPONSE_CLASS`` is not one of the two
        supported names, or if ``res`` ends up as an unsupported type.

    NOTE(review): the source was flattened onto one line, so the nesting
    below is reconstructed from syntax -- confirm against the original.
    """
    # NOTE(review): ``.get`` is called before the dict check below, so any
    # ``res`` without a ``get`` method fails here rather than with the
    # 'unknown response type' error -- confirm this is intended.
    data = res.get(SETTINGS.DATA, None)
    if isinstance(res, dict):  # dict payload; presumably rendered as JSON (original comment was mis-encoded)
        if SETTINGS.DATA_STYLE == 'dict':
            if data is not None:
                # Empty list/dict collapses to None.
                if isinstance(res[SETTINGS.DATA], (list, dict)) and len(res[SETTINGS.DATA]) == 0:
                    res[SETTINGS.DATA] = None
                # A bare list is wrapped as {RESULT: list, COUNT: len(list)}.
                elif isinstance(res[SETTINGS.DATA], list):
                    res[SETTINGS.DATA] = {
                        SETTINGS.RESULT: res[SETTINGS.DATA],
                        SETTINGS.COUNT: len(res[SETTINGS.DATA])
                    }
                # A dict whose RESULT entry is empty gets RESULT set to None.
                elif isinstance(res[SETTINGS.DATA].get(SETTINGS.RESULT, None), (list, dict)) and len(res[SETTINGS.DATA][SETTINGS.RESULT]) == 0:
                    res[SETTINGS.DATA][SETTINGS.RESULT] = None
        # ``data`` is the value captured before the reshaping above.
        if data is not None and len(data) > 0:
            if self.method == 'get':
                path = '/'
                if SETTINGS.RESULT in res[SETTINGS.DATA]:
                    has_result_field = True
                else:
                    has_result_field = False
            else:
                path = None
                has_result_field = None
            self.process_keys(res, path, has_result_field)
        # additional data
        additional_data = getattr(self, 'additional_data', None)
        if isinstance(additional_data, dict):
            for key, value in additional_data.items():
                # NOTE(review): this subscript assumes res[SETTINGS.DATA] is
                # a dict at this point -- confirm it cannot be None here.
                res[SETTINGS.DATA][self.python_to_java(key, self.omit_underlines)] = value
        # process json response class
        json_response_class = getattr(SETTINGS, 'JSON_RESPONSE_CLASS', None)
        if json_response_class == 'rest_framework.response.Response':
            res = Response(res)
        elif json_response_class == 'django.http.JsonResponse':
            res = JsonResponse(res, json_dumps_params={"indent": 2})
        else:
            raise Exception('JSON_RESPONSE_CLASS in the benchmark_settings is not defined or not correct. The value of it should be "rest_framework.response.Response", or "django.http.JsonResponse"')
    if isinstance(res, (StreamingHttpResponse, django.http.response.HttpResponse)):  # already an HTTP response (original comment was mis-encoded)
        return res
    raise Exception('unknown response type: %s' % type(res))
# NOTE(review): a mis-encoded comment followed here in the source ("?? style 2 ? get ????"); its text is unrecoverable.
def paginate(self, res):
    """Reshape ``res[SETTINGS.DATA]`` in place for a GET result.

    Only acts when ``res[SETTINGS.CODE]`` equals ``SETTINGS.SUCCESS_CODE``
    and ``SETTINGS.DATA_STYLE`` is 'dict'. Three shapes are produced:

    * "get one": the first item of the data, or ``None`` when it is empty;
    * "get many in pages" (``self.page`` is set): a dict with the result,
      ``self.count`` and next/previous page URLs;
    * "get many not in pages": a dict with the result and its length.

    Nothing is returned; ``self.page`` may be clamped as a side effect.

    NOTE(review): the source was flattened onto one line, so the nesting
    below is reconstructed from syntax -- confirm against the original.
    """
    if res[SETTINGS.CODE] == SETTINGS.SUCCESS_CODE and SETTINGS.DATA_STYLE == 'dict':
        # A single dict is treated as a one-item list.
        if isinstance(res[SETTINGS.DATA], dict):
            res[SETTINGS.DATA] = [res[SETTINGS.DATA]]
        # get one
        # Precedence: (get_one is None and 'pk' in uri_params) or get_one.
        if self.get_one is None and 'pk' in self.uri_params.keys() or self.get_one:
            if len(res[SETTINGS.DATA]) == 0:
                res[SETTINGS.DATA] = None
            else:
                res[SETTINGS.DATA] = res[SETTINGS.DATA][0]
        # get many in pages
        elif self.page is not None:
            if self.limit == 0:
                page_count = 0 if self.count == 0 else 1
            else:
                # NOTE(review): under Python 2 ``/`` on two ints floors
                # before ceil is applied -- confirm the target version.
                page_count = math.ceil(self.count / self.limit)
            # A page outside 1..page_count yields no result.
            if 1 <= self.page <= page_count:
                result = res[SETTINGS.DATA]
            else:
                result = None
            # Clamp the page to just outside the valid range.
            if self.page < 1:
                self.page = 0
            elif self.page > page_count:
                self.page = page_count + 1
            # NOTE(review): the scheme is hard-coded to http -- confirm.
            basic_url = 'http://' + self.host + self.path
            previous_param_url = None
            next_param_url = None
            params = copy.deepcopy(self.params)
            params[SETTINGS.LIMIT] = self.limit
            params[SETTINGS.PAGE] = self.page
            if self.page <= 1:
                previous_url = None
            else:
                for key, value in params.items():
                    # NOTE(review): compares against the literal 'page',
                    # not SETTINGS.PAGE -- confirm they are the same.
                    if key == 'page':
                        value = str(self.page - 1)
                    # NOTE(review): values are concatenated without URL
                    # encoding.
                    if previous_param_url is None:
                        previous_param_url = '?' + key + '=' + str(value)
                    else:
                        previous_param_url += '&' + key + '=' + str(value)
                previous_url = basic_url + previous_param_url
            if self.page >= page_count:
                next_url = None
            else:
                for key, value in params.items():
                    if key == 'page':
                        value = str(self.page + 1)
                    if next_param_url is None:
                        next_param_url = '?' + key + '=' + str(value)
                    else:
                        next_param_url += '&' + key + '=' + str(value)
                next_url = basic_url + next_param_url
            res[SETTINGS.DATA] = {SETTINGS.RESULT: result, SETTINGS.COUNT: self.count, SETTINGS.NEXT: next_url, SETTINGS.PREVIOUS: previous_url}
        # get many not in pages
        else:
            res[SETTINGS.DATA] = {SETTINGS.RESULT: res[SETTINGS.DATA], SETTINGS.COUNT: len(res[SETTINGS.DATA])}
# NOTE(review): a mis-encoded comment followed here in the source ("?? get ??"); its text is unrecoverable.
def _handle_500(self, request, exception):
    """Turn an unhandled exception into a serialized error response.

    Not-found style exceptions (``NotFound``, ``ObjectDoesNotExist``,
    ``Http404``) map to a 404 response class; everything else maps to
    ``http.HttpApplicationError`` (500).

    With ``settings.DEBUG`` on, the response carries the exception text and
    traceback. Otherwise the error is logged (404s only when
    ``SEND_BROKEN_LINK_EMAILS`` is set), admins are mailed on Django older
    than 1.3, ``got_request_exception`` is sent, and a canned message is
    returned.
    """
    import traceback
    import sys

    # ``unicode`` only exists on Python 2; fall back to ``str`` elsewhere.
    try:
        text_type = unicode
    except NameError:
        text_type = str

    the_trace = '\n'.join(traceback.format_exception(*(sys.exc_info())))
    response_class = http.HttpApplicationError
    response_code = 500

    NOT_FOUND_EXCEPTIONS = (NotFound, ObjectDoesNotExist, Http404)

    if isinstance(exception, NOT_FOUND_EXCEPTIONS):
        response_class = HttpResponseNotFound
        response_code = 404

    if settings.DEBUG:
        data = {
            "error_message": text_type(exception),
            "traceback": the_trace,
        }
        return self.error_response(request, data, response_class=response_class)

    # When DEBUG is False, send an error message to the admins (unless it's
    # a 404, in which case we check the setting).
    send_broken_links = getattr(settings, 'SEND_BROKEN_LINK_EMAILS', False)

    if response_code != 404 or send_broken_links:
        log = logging.getLogger('django.request.tastypie')
        log.error('Internal Server Error: %s' % request.path, exc_info=True,
                  extra={'status_code': response_code, 'request': request})

        if django.VERSION < (1, 3, 0):
            from django.core.mail import mail_admins

            is_internal = request.META.get('REMOTE_ADDR') in settings.INTERNAL_IPS
            origin = 'internal' if is_internal else 'EXTERNAL'
            subject = 'Error (%s IP): %s' % (origin, request.path)

            try:
                request_repr = repr(request)
            except Exception:
                # Best effort only; a bare ``except`` would also swallow
                # KeyboardInterrupt and SystemExit.
                request_repr = "Request repr() unavailable"

            message = "%s\n\n%s" % (the_trace, request_repr)
            mail_admins(subject, message, fail_silently=True)

    # Send the signal so other apps are aware of the exception.
    got_request_exception.send(self.__class__, request=request)

    # Prep the data going out.
    data = {
        "error_message": getattr(settings, 'TASTYPIE_CANNED_ERROR', "Sorry, this request could not be processed. Please try again later."),
    }
    return self.error_response(request, data, response_class=response_class)
def put_list(self, request, **kwargs):
    """
    Replace a collection of resources with the one in the request body.

    The existing collection is cleared with ``obj_delete_list_for_update``
    and each submitted object is created with ``obj_create``. If a create
    raises ``ImmediateHttpResponse``, the objects created so far are rolled
    back and the exception propagates.

    Returns ``HttpNoContent`` (204 No Content) when
    ``Meta.always_return_data = False`` (default); otherwise the result of
    ``create_response`` with the serialized new collection.

    Raises ``BadRequest`` when the body lacks the collection key.
    """
    # Django 1.4 renamed raw_post_data to body.
    body = request.body if django.VERSION >= (1, 4) else request.raw_post_data

    payload = self.deserialize(
        request, body,
        format=request.META.get('CONTENT_TYPE', 'application/json'))
    payload = self.alter_deserialized_list_data(request, payload)

    if self._meta.collection_name not in payload:
        raise BadRequest("Invalid data sent.")

    clearing_bundle = self.build_bundle(request=request)
    self.obj_delete_list_for_update(bundle=clearing_bundle,
                                    **self.remove_api_resource_names(kwargs))

    created = []
    for item in payload[self._meta.collection_name]:
        bundle = self.build_bundle(data=dict_strip_unicode_keys(item), request=request)

        # Attempt to be transactional: undo the objects created so far when
        # validation fails.
        try:
            self.obj_create(bundle=bundle, **self.remove_api_resource_names(kwargs))
        except ImmediateHttpResponse:
            self.rollback(created)
            raise
        created.append(bundle)

    if self._meta.always_return_data:
        dehydrated = [self.full_dehydrate(item_bundle, for_list=True)
                      for item_bundle in created]
        to_be_serialized = {self._meta.collection_name: dehydrated}
        to_be_serialized = self.alter_list_data_to_serialize(request, to_be_serialized)
        return self.create_response(request, to_be_serialized)
    return http.HttpNoContent()
def patch_detail(self, request, **kwargs):
    """
    Update a resource in place via ``update_in_place``.

    Returns ``HttpAccepted`` (202 Accepted) on success, with the serialized
    resource in the body when ``Meta.always_return_data`` is set. A missing
    object yields ``HttpNotFound`` (404 Not Found); an ambiguous lookup
    yields ``HttpMultipleChoices``.
    """
    request = convert_post_to_patch(request)
    lookup_bundle = self.build_bundle(request=request)

    # The partial data cannot be validated alone, because validation needs
    # every field. So a PUT is simulated: load the original object (as
    # ``get_detail`` does), dehydrate it, then apply the changes on top.
    try:
        existing = self.cached_obj_get(
            bundle=lookup_bundle,
            **self.remove_api_resource_names(kwargs)
        )
    except ObjectDoesNotExist:
        return http.HttpNotFound()
    except MultipleObjectsReturned:
        return http.HttpMultipleChoices("More than one resource is found at this URI.")

    bundle = self.build_bundle(obj=existing, request=request)
    bundle = self.full_dehydrate(bundle)
    bundle = self.alter_detail_data_to_serialize(request, bundle)

    # Now update the bundle in-place. Django 1.4 renamed raw_post_data to
    # body.
    body = request.body if django.VERSION >= (1, 4) else request.raw_post_data
    changes = self.deserialize(
        request, body,
        format=request.META.get('CONTENT_TYPE', 'application/json'))
    self.update_in_place(request, bundle, changes)

    if self._meta.always_return_data:
        bundle = self.full_dehydrate(bundle)
        bundle = self.alter_detail_data_to_serialize(request, bundle)
        return self.create_response(request, bundle, response_class=http.HttpAccepted)
    return http.HttpAccepted()