我们从Python开源项目中,提取了以下25个代码示例,用于说明如何使用django.http.response.HttpResponseBadRequest()。
def paste_clipboard_to_folder(request):
    """Disabled view: always answers with ``HttpResponseBadRequest``.

    The unconditional guard at the top returns before anything else runs, so
    the legacy clipboard-to-folder logic kept below it is unreachable.
    """
    # TODO: cleanly remove Clipboard code if it is no longer needed
    if True:
        return HttpResponseBadRequest('not implemented anymore')

    # Legacy implementation (never reached because of the guard above).
    if request.method == 'POST':
        target_folder = Folder.objects.get(id=request.POST.get('folder_id'))
        source_clipboard = Clipboard.objects.get(
            id=request.POST.get('clipboard_id'))
        if not target_folder.has_add_children_permission(request):
            raise PermissionDenied
        tools.move_files_from_clipboard_to_folder(
            source_clipboard, target_folder)
        tools.discard_clipboard(source_clipboard)

    # Prefer the query-string target, fall back to the POSTed one.
    destination = (request.GET.get('redirect_to', '')
                   or request.POST.get('redirect_to', ''))
    extra_params = admin_url_params_encoded(request, first_separator='&')
    return HttpResponseRedirect(
        '{0}?order_by=-modified_at{1}'.format(destination, extra_params)
    )
def get_graphql_params(request, data):
    """Collect the GraphQL parameters from the query string or the parsed body.

    For every parameter the query-string value (``request.GET``) wins when it
    is truthy; otherwise the value from ``data`` is used.

    Returns a ``(query, variables, operation_name, id)`` tuple.
    Raises ``HttpError`` wrapping a bad-request response when ``variables`` is
    a text string that cannot be decoded as JSON.
    """
    def pick(name):
        # Query-string value first, body value as fallback.
        return request.GET.get(name) or data.get(name)

    query = pick('query')
    variables = pick('variables')
    request_id = pick('id')

    # Variables may arrive as a JSON-encoded string; decode them in that case.
    if variables and isinstance(variables, six.text_type):
        try:
            variables = json.loads(variables)
        except Exception:
            raise HttpError(HttpResponseBadRequest(
                'Variables are invalid JSON.'))

    operation_name = pick('operationName')
    # The literal string "null" is treated as "no operation name".
    if operation_name == "null":
        operation_name = None

    return query, variables, operation_name, request_id
def receive_callback(request):
    """
    Handle the SSO callback: validate the request, create a token from it,
    attach the token to the matching CallbackRedirect and redirect to its url.

    Returns a bad-request response when ``code`` or ``state`` is missing or
    empty in the query string. The CallbackRedirect is looked up by ``state``
    and the current session key via ``get_object_or_404``.
    """
    # make sure request has required parameters
    code = request.GET.get('code', None)
    state = request.GET.get('state', None)
    # Explicit check rather than `assert`: assert statements are removed when
    # Python runs with -O, which would silently disable this validation.
    if not code or not state:
        return HttpResponseBadRequest()

    callback = get_object_or_404(
        CallbackRedirect,
        state=state,
        session_key=request.session.session_key,
    )
    token = Token.objects.create_from_request(request)
    callback.token = token
    callback.save()
    return redirect(callback.url)
def render_to_response(self, context, **response_kwargs):
    """Return the graph data for the requested scope as a JSON response.

    The scope name comes from ``self.kwargs['scope']``; the optional ``start``
    and ``end`` query parameters are only converted to integers for the
    ``"custom"`` scope. A custom scope without ``start``, or with a
    non-integer ``start``/``end``, yields a bad-request response.

    Any other failure is logged with its traceback and re-raised.
    """
    try:
        scope_name = self.kwargs['scope']
        # Empty strings are normalised to None.
        range_start = self.request.GET.get('start', None) or None
        range_end = self.request.GET.get('end', None) or None

        if scope_name == "custom":
            if range_start is None:
                return HttpResponseBadRequest()
            # Malformed numbers are a client error, not a server error.
            try:
                range_start = int(range_start)
                if range_end is not None:
                    range_end = int(range_end)
            except ValueError:
                return HttpResponseBadRequest()

        # Only the data element of the generated 4-tuple is sent back.
        data, _start, _end, _resolution = D3GraphDataGenerator(
            self.node, self.object, scope_name, range_start, range_end,
        ).generate()
        return JsonResponse(data)
    except Exception:
        # `Exception` instead of a bare except so that SystemExit and
        # KeyboardInterrupt are not intercepted here.
        logger.exception("Error rendering graph data for %s on %s",
                         self.object, self.node)
        raise
def discard_clipboard(request):
    """Disabled view: always answers with ``HttpResponseBadRequest``.

    The unconditional guard returns first, so the legacy discard logic kept
    below it is unreachable.
    """
    # TODO: cleanly remove Clipboard code if it is no longer needed
    if True:
        return HttpResponseBadRequest('not implemented anymore')

    # Legacy implementation (never reached because of the guard above).
    if request.method == 'POST':
        clipboard_id = request.POST.get('clipboard_id')
        tools.discard_clipboard(Clipboard.objects.get(id=clipboard_id))

    destination = request.POST.get('redirect_to', '')
    extra_params = admin_url_params_encoded(request, first_separator='&')
    return HttpResponseRedirect('{0}{1}'.format(destination, extra_params))
def delete_clipboard(request):
    """Disabled view: always answers with ``HttpResponseBadRequest``.

    The unconditional guard returns first, so the legacy delete logic kept
    below it is unreachable.
    """
    # TODO: cleanly remove Clipboard code if it is no longer needed
    if True:
        return HttpResponseBadRequest('not implemented anymore')

    # Legacy implementation (never reached because of the guard above).
    if request.method == 'POST':
        clipboard_id = request.POST.get('clipboard_id')
        tools.delete_clipboard(Clipboard.objects.get(id=clipboard_id))

    destination = request.POST.get('redirect_to', '')
    extra_params = admin_url_params_encoded(request, first_separator='&')
    return HttpResponseRedirect('{0}{1}'.format(destination, extra_params))
def get(self, request):
    """Process a GET request unless the configuration demands POST.

    When ``self.conf.USE_POST`` is truthy a bad-request response is returned;
    otherwise the request is handed to ``self.process_request`` together with
    its GET parameters.
    """
    if not self.conf.USE_POST:
        return self.process_request(request, request.GET)
    return HttpResponseBadRequest('Need POST')
def post(self, request):
    """Process a POST request unless the configuration demands GET.

    When ``self.conf.USE_POST`` is falsy a bad-request response is returned;
    otherwise the request is handed to ``self.process_request`` together with
    its POST parameters.
    """
    if self.conf.USE_POST:
        return self.process_request(request, request.POST)
    return HttpResponseBadRequest('Need GET')
def parse_body(self, request):
    """Extract the GraphQL payload from the request body by content type.

    * ``application/graphql``: the decoded body is returned as ``{'query': ...}``.
    * ``application/json``: the body is decoded as UTF-8 and parsed as JSON.
      With ``self.batch`` set it must be a non-empty list, otherwise a dict.
    * form-encoded / multipart: ``request.POST`` is returned.
    * anything else: an empty dict.

    Raises ``HttpError`` wrapping a bad-request response when the JSON body
    cannot be decoded, cannot be parsed, or has the wrong shape.
    """
    content_type = self.get_content_type(request)

    if content_type == 'application/graphql':
        return {'query': request.body.decode()}

    if content_type == 'application/json':
        # noinspection PyBroadException
        try:
            body = request.body.decode('utf-8')
        except Exception as e:
            raise HttpError(HttpResponseBadRequest(str(e)))

        try:
            request_json = json.loads(body)
        except (TypeError, ValueError):
            raise HttpError(HttpResponseBadRequest(
                'POST body sent invalid JSON.'))

        # Shape checks are explicit conditionals rather than `assert`
        # statements: asserts are removed under `python -O`, which would let
        # malformed payloads through unvalidated.
        if self.batch:
            if not isinstance(request_json, list):
                raise HttpError(HttpResponseBadRequest(
                    'Batch requests should receive a list, but received {}.'
                    .format(repr(request_json))))
            if not request_json:
                raise HttpError(HttpResponseBadRequest(
                    'Received an empty list in the batch request.'))
        elif not isinstance(request_json, dict):
            raise HttpError(HttpResponseBadRequest(
                'The received data is not a valid JSON query.'))
        return request_json

    if content_type in ['application/x-www-form-urlencoded',
                        'multipart/form-data']:
        return request.POST

    return {}
def form_invalid(self, form):
    """Answer an invalid form submission.

    AJAX requests receive a bad-request response whose body is the form's
    errors serialized as JSON; every other request falls through to the
    parent class' ``form_invalid``.
    """
    if not self.request.is_ajax():
        return super(SetView, self).form_invalid(form)
    serialized_errors = json.dumps(form.errors)
    return HttpResponseBadRequest(serialized_errors)
def main_data(request):
    """Return the session's ``tables`` value as a JSON document.

    Only GET is accepted; any other method gets a 405 response advertising
    GET. If either ``tables`` or ``list`` is absent from the session a
    bad-request response is returned.
    """
    if request.method != 'GET':
        # The permitted methods must be passed as a list: a bare string is
        # iterated character by character when the Allow header is built.
        return HttpResponseNotAllowed(['GET'])

    tables = request.session.get('tables', None)
    # Only checked for presence; the value itself is not used in the response.
    sugang_list = request.session.get('list', None)
    if tables is None or sugang_list is None:
        return HttpResponseBadRequest()

    json_dict = {
        'total_simdata': tables,
    }
    # TODO: remove 'indent' option when frontend development is done
    return HttpResponse(
        json.dumps(json_dict, indent=2, ensure_ascii=False),
        content_type='application/json',
    )
def post(self, request):
    """Filter courses by the submitted criteria and return them as JSON.

    Requires a ``tree`` in the session and a ``node_title`` in the request
    data that ``tree.find`` can resolve; otherwise a bad-request response is
    returned. The matching courses are passed to ``node.filter_true`` and
    then serialized under the ``data`` key.
    """
    tree = request.session.get('tree', None)
    node_title = request.data.get('node_title', None)
    if tree is None or node_title is None:
        return HttpResponseBadRequest()

    node = tree.find(node_title)
    if node is None:
        return HttpResponseBadRequest()

    # Maps ORM lookup expression -> request parameter / output field name.
    convert_list = {
        'title__icontains': 'title',
        'code__contains': 'code',
        'credit': 'credit',
        'category': 'category',
        'area': 'area',
        'subarea': 'subarea',
        'college': 'college',
        'dept': 'dept',
    }

    # Build the filter from supplied parameters only. Iterating `.items()`
    # and skipping absent values up front avoids unpacking bare dict keys and
    # deleting from a dict while iterating over it.
    query_list = {}
    for lookup, param in convert_list.items():
        value = request.data.get(param, None)
        if value is not None:
            query_list[lookup] = value

    # TODO: add functions to get these variables
    query_list['year'] = '2017'
    query_list['semester'] = '1'

    retrieved = Course.objects.filter(**query_list)
    node.filter_true(retrieved)

    # Query results are objects, so fields are read with getattr rather than
    # a dict-style `.get`.
    retrieved_list = [
        {field: getattr(item, field, None) for field in convert_list.values()}
        for item in retrieved
    ]

    # JsonResponse serializes the dict itself; passing an already-dumped
    # string would be rejected (non-dict payload) or double-encoded.
    return JsonResponse({'data': retrieved_list})
def get(self, request, *args, **kwargs):
    """Using given `statistic`, return the result JSON.

    The ``statistic`` query parameter selects a function named
    ``stats_<statistic>`` on the ``stats`` module; it is called with the GET
    parameters and its result is returned as JSON. A missing parameter or an
    unknown / non-function attribute yields a bad-request response.
    """
    statistic = request.GET.get("statistic")
    if statistic:
        # Call the stats function with request.GET parameters
        handler = getattr(stats, "stats_%s" % statistic, None)
        if handler and inspect.isfunction(handler):
            return JsonResponse(handler(request.GET))
    return HttpResponseBadRequest()
def post(self, request, *args, **kwargs):
    """Store a DataPoint from the POSTed form fields.

    Responds with forbidden when the ``HTTP_AUTH_SECRET`` entry of
    ``request.META`` does not match the expected key, with an empty success
    response when the form validates and is saved, and with bad-request when
    the form is invalid.
    """
    # Check if the secret key matches
    # NOTE(review): the secret is hard-coded in source; consider moving it to
    # configuration.
    supplied_secret = request.META.get('HTTP_AUTH_SECRET')
    if supplied_secret != 'supersecretkey':
        return HttpResponseForbidden('Auth key incorrect')

    data_point_form = modelform_factory(
        DataPoint,
        fields=['node_name', 'data_type', 'data_value'],
    )
    bound_form = data_point_form(request.POST)
    if not bound_form.is_valid():
        return HttpResponseBadRequest()

    bound_form.save()
    return HttpResponse()
def post(self, request, **kwargs):
    """Register or refresh a RaspberryPi identified by its MAC address.

    The POSTed ``secret_key`` must be present and equal to
    ``settings.RPI_SECRET_KEY``; otherwise a 401 response is returned. A
    device not yet known by MAC is created with a suggested name. The IP and
    last-seen time are updated, validated with ``full_clean`` and saved.

    Returns a JSON description of the device with status 201 for a newly
    created record and 200 for an existing one.
    """
    secret = request.POST.get('secret_key')
    if not secret:
        return HttpResponseBadRequest('secret_key not provided.', status=401)
    if secret != getattr(settings, 'RPI_SECRET_KEY'):
        return HttpResponseBadRequest('secret_key not valid.', status=401)

    mac_address = request.POST.get('mac_address')
    client_ip = get_client_ip(request)
    seen_at = datetime.datetime.now()

    try:
        pi = RaspberryPi.objects.get(mac=mac_address)
        created = False
    except RaspberryPi.DoesNotExist:
        pi = RaspberryPi(
            mac=mac_address,
            name=RaspberryPi.suggest_name(),
        )
        created = True

    pi.ip = client_ip
    pi.last_seen = seen_at
    pi.full_clean()
    pi.save()

    payload = {
        'name': pi.name,
        'mac': pi.mac,
        'ip': pi.ip,
        'last_seen': pi.last_seen,
    }
    status_code = 201 if created else 200
    return JsonResponse(data=payload, status=status_code)
def index(request):
    """Entry point for the WeChat callback.

    GET: checks the ``signature`` / ``timestamp`` / ``nonce`` query parameters
    and, when they verify, echoes ``echostr`` back as plain text; otherwise a
    bad-request response is returned.

    Any other method: parses the request body with the module-level
    ``wechat_instance`` and dispatches on the message type. Bodies that fail
    to parse yield a bad-request response.
    """
    if request.method == 'GET':
        # Read the verification parameters from the query string.
        signature = request.GET.get('signature')
        timestamp = request.GET.get('timestamp')
        nonce = request.GET.get('nonce')

        # Reject requests whose signature does not verify.
        if not wechat_instance.check_signature(
                signature=signature, timestamp=timestamp, nonce=nonce):
            return HttpResponseBadRequest('Verify Failed')

        return HttpResponse(
            request.GET.get('echostr', ''), content_type="text/plain")

    # POST handling: parse the XML payload sent in the request body.
    # NOTE(review): the signature is only checked on GET — confirm whether
    # POSTed messages should be verified as well.
    try:
        wechat_instance.parse_data(data=request.body)
    except ParseError:
        return HttpResponseBadRequest('Invalid XML Data')

    # Dispatch on the concrete message type.
    message = wechat_instance.message
    if isinstance(message, TextMessage):
        response = text_message_deal(wechat_instance)
    elif isinstance(message, ShortVideoMessage):
        response = video_message_deal(wechat_instance)
    elif isinstance(message, VoiceMessage):
        response = voice_message_deal(wechat_instance)
    elif isinstance(message, EventMessage):
        response = event_message_deal(wechat_instance)
    else:
        # Unhandled message types previously left `response` unbound and
        # crashed with UnboundLocalError; answer with an empty body instead.
        # NOTE(review): presumably an empty reply means "no response" to
        # WeChat — confirm against the platform documentation.
        response = ''

    return HttpResponse(response, content_type="application/xml")
def post(self, request, *args, **kwargs):
    """
    Receive a webhook POST request.

    Sends the generic ``webhook_received`` signal and then the signal named
    after the request's topic, both with the webhook domain, topic and data.
    An ``AttributeError`` raised while doing so produces a bad-request
    response; otherwise a plain 'OK' response is returned.
    """
    # Convert the topic to a signal name and trigger it.
    signal_name = get_signal_name_for_topic(request.webhook_topic)

    try:
        # Built inside the try block so a missing request attribute is
        # handled the same way as a missing signal.
        signal_kwargs = {
            'domain': request.webhook_domain,
            'topic': request.webhook_topic,
            'data': request.webhook_data,
        }
        signals.webhook_received.send_robust(self, **signal_kwargs)
        topic_signal = getattr(signals, signal_name)
        topic_signal.send_robust(self, **signal_kwargs)
    except AttributeError:
        return HttpResponseBadRequest()

    # All good, return a 200.
    return HttpResponse('OK')
def wechat_home(request):
    """Verify a WeChat request and answer it.

    Requests whose signature does not verify get a bad-request response. A
    verified GET echoes ``echostr`` (or ``'error'`` when absent). For any
    other method the body is parsed and a text reply naming the kind of the
    received message is produced; a ``ParseError`` yields a bad-request
    response.
    """
    # get signature, timestamp and nonce
    signature = request.GET.get('signature')
    timestamp = request.GET.get('timestamp')
    nonce = request.GET.get('nonce')

    # create a new instance
    wechat_instance = WechatBasic(conf=conf)

    # check_signature tells whether the request was sent by wechat
    verified = wechat_instance.check_signature(
        signature=signature, timestamp=timestamp, nonce=nonce)
    if not verified:
        return HttpResponseBadRequest('Verify Failed')

    if request.method == 'GET':
        # GET means wechat sent verification information
        response = request.GET.get('echostr', 'error')
    else:
        # any other method carries a user's message
        try:
            wechat_instance.parse_data(request.body)
            message = wechat_instance.get_message()

            # classify the message; the first matching type wins
            reply_text = 'other'
            known_types = (
                (TextMessage, 'text'),
                (VoiceMessage, 'voice'),
                (ImageMessage, 'image'),
                (LinkMessage, 'link'),
                (LocationMessage, 'location'),
                (VideoMessage, 'video'),
                (ShortVideoMessage, 'shortvideo'),
            )
            for message_type, label in known_types:
                if isinstance(message, message_type):
                    reply_text = label
                    break

            response = wechat_instance.response_text(content=reply_text)
        except ParseError:
            # error while parsing the body
            return HttpResponseBadRequest('Invalid XML Data')

    # reply with our defined message
    return HttpResponse(response, content_type="application/xml")
def execute_graphql_request(self, request, data, query, variables,
                            operation_name, show_graphiql=False):
    """Parse, validate and execute a GraphQL query.

    Returns ``None`` when ``show_graphiql`` is set and either no query was
    given or a non-query operation arrived over GET. In the same situations
    without ``show_graphiql`` an ``HttpError`` is raised (bad request, or
    method-not-allowed advertising POST).

    Parse, validation and execution failures are returned as an invalid
    ``ExecutionResult`` carrying the errors.
    """
    if not query:
        if not show_graphiql:
            raise HttpError(HttpResponseBadRequest(
                'Must provide query string.'))
        return None

    source = Source(query, name='GraphQL request')

    try:
        document = parse(source)
        problems = validate(self.schema, document)
        if problems:
            return ExecutionResult(errors=problems, invalid=True)
    except Exception as error:
        return ExecutionResult(errors=[error], invalid=True)

    # Over GET only `query` operations may be executed.
    if request.method.lower() == 'get':
        operation = get_operation_ast(document, operation_name)
        if operation and operation.operation != 'query':
            if show_graphiql:
                return None
            message = (
                'Can only perform a {} operation from a POST request.'
                .format(operation.operation)
            )
            raise HttpError(HttpResponseNotAllowed(['POST'], message))

    try:
        execute_options = {
            'root_value': self.get_root_value(request),
            'variable_values': variables,
            'operation_name': operation_name,
            'context_value': self.get_context(request),
            'middleware': self.get_middleware(request),
            'executor': self.executor,
        }
        return self.execute(document, **execute_options)
    except Exception as error:
        return ExecutionResult(errors=[error], invalid=True)
def post(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse:
    """Exchange a refresh token for a new access/refresh token pair.

    Expects a form-encoded POST, for example::

        client_id=docker
        refresh_token=boink
        service=registry.maurusnet.test
        scope=repository:dev/destalinator:push,pull
        grant_type=refresh_token

    Responses:
      * 400 when no ``refresh_token`` is posted, ``grant_type`` is not
        ``refresh_token``, or the parsed request has no scope;
      * 404 when the registry, or the repository (unless unconfigured repos
        are allowed by settings), does not exist;
      * 401 when no user can be derived from the refresh token;
      * 403 when the user has access to neither the registry nor the repo;
      * 200 with a JSON body holding the new tokens otherwise.
    """
    # `.get` so that a missing grant_type is a client error (400) rather
    # than an unhandled KeyError.
    if ("refresh_token" not in request.POST
            or request.POST.get("grant_type") != "refresh_token"):
        return HttpResponseBadRequest(
            "POSTing to this endpoint requires a refresh_token")

    tr = _tkr_parse(request.POST)
    if not tr.scope:
        # The scope is interpolated into the message; passing it as a second
        # positional argument would hand it to the response constructor
        # instead of formatting it.
        return HttpResponseBadRequest(
            "Can't issue access token without valid scope (scope=%s)"
            % (tr.scope,))
    tp = TokenPermissions.parse_scope(tr.scope)

    try:
        client = DockerRegistry.objects.get(client_id=tr.service)  # type: DockerRegistry
    except DockerRegistry.DoesNotExist:
        return HttpResponseNotFound(
            "No such registry/client from refresh token(%s)" % str(tr))

    user = self._user_from_refresh_token(
        request.POST["refresh_token"],
        client.public_key_pem(),
        expected_issuer=request.get_host(),
        expected_audience=tr.service,
    )
    if not user:
        return HttpResponse("Unauthorized", status=401)

    try:
        drepo = DockerRepo.objects.get(name=tp.path, registry_id=client.id)
    except DockerRepo.DoesNotExist:
        if not settings.DOCKERAUTH_ALLOW_UNCONFIGURED_REPOS:
            return HttpResponseNotFound("No such repo '%s'" % tp.path)
        # Unsaved stand-in repo that is open for unauthenticated read and
        # write, used only for the access check below.
        drepo = DockerRepo()
        drepo.name = tp.path
        drepo.registry = client
        drepo.unauthenticated_read = True
        drepo.unauthenticated_write = True

    if not (drepo.registry.has_access(user, tp) or drepo.has_access(user, tp)):
        return HttpResponseForbidden(
            "User %s doesn't have access to repo %s" % (user.pk, tp.path))

    rightnow = datetime.datetime.now(tz=pytz.UTC)
    token_payload = {
        "access_token": self._create_jwt(
            self._make_access_token(request, tr, rightnow, tp, user),
            client.private_key_pem(),
        ),
        "scope": tr.scope,
        "expires_in": 119,
        "refresh_token": self._create_jwt(
            self._make_refresh_token(request, tr, rightnow, user),
            client.private_key_pem(),
        ),
    }
    return HttpResponse(content=json.dumps(token_payload), status=200,
                        content_type="application/json")