我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用falcon.HTTPBadRequest()。
def _first_hook(req, resp, resource, params):
    """Reject requests that fail a fixed sequence of pre-conditions.

    Checks run in this order and the first failing one raises:

    * ``resource.req_ids`` is ``None`` -> ``falcon.HTTPBadRequest``
    * ``req.env['calplus.cloud']`` differs from ``'cloud1'`` or
      ``'request-id'`` is missing from ``req.env`` -> ``falcon.HTTPBadRequest``
    * ``req.client_accepts_json`` is falsy -> ``falcon.HTTPNotAcceptable``
    * POST/PUT whose ``req.content_type`` lacks ``application/json`` ->
      ``falcon.HTTPUnsupportedMediaType``

    ``resp`` and ``params`` are part of the signature but are not used.
    """
    if resource.req_ids is None:
        raise falcon.HTTPBadRequest(
            title='Append request id failed',
            description='Append request id failed')

    environ = req.env
    # The cloud key is read first, exactly like the original short-circuit.
    if not (environ['calplus.cloud'] == 'cloud1' and 'request-id' in environ):
        raise falcon.HTTPBadRequest(
            title='Process Request Error',
            description='Problem when process request')

    if not req.client_accepts_json:
        raise falcon.HTTPNotAcceptable(
            'This API only supports responses encoded as JSON.',
            href='http://docs.examples.com/api/json')

    carries_body = req.method in ('POST', 'PUT')
    if carries_body and 'application/json' not in req.content_type:
        raise falcon.HTTPUnsupportedMediaType(
            'This API only supports requests encoded as JSON.',
            href='http://docs.examples.com/api/json')
def _parse_logical_op(self, arg, value, op, prevent_expand=True):
    """Combine the sub-conditions of a logical operator into one expression.

    :param arg: condition name; only used in error messages
    :param value: a dict of conditions, or a list of such dicts
    :param op: key under which several sub-expressions are grouped
    :param prevent_expand: forwarded unchanged to
        ``_build_filter_expressions``
    :return: the combined expression, or ``None`` if no sub-expression
        was produced
    :raises HTTPBadRequest: when ``value`` is neither a dict nor a list
        of dicts
    """
    if isinstance(value, dict):
        return self._build_filter_expressions(value, op, prevent_expand=prevent_expand)
    if not isinstance(value, list):
        raise HTTPBadRequest('Invalid attribute', 'Filter attribute {} is invalid'.format(arg))

    collected = []
    for item in value:
        if not isinstance(item, dict):
            raise HTTPBadRequest('Invalid attribute', 'Filter attribute {} is invalid'.format(arg))
        built = self._build_filter_expressions(item, 'must', prevent_expand=prevent_expand)
        if built is not None:
            collected.append(built)

    # Grouping may shrink the list, so the size is re-checked afterwards.
    if len(collected) > 1:
        collected = self._group_nested(collected, op)

    remaining = len(collected)
    if remaining > 1:
        return {'bool': {op: collected}}
    if remaining == 1:
        only = collected[0]
        if op == 'must_not':
            return {'bool': {'must_not': only}}
        return only
    return None
def get_queryset(self, req, resp):
    """Build the queryset for a request from its filter parameters.

    The value of the ``PARAM_SEARCH`` parameter, when present, is decoded as
    JSON and stored in ``req.params['__raw__']``.  All of ``req.params`` is
    then passed as keyword arguments to ``self.objects_class.objects``.  A
    text query and an ordering are applied afterwards when supplied.

    :raises HTTPBadRequest: if the search parameter is not valid JSON
    """
    text_query = self.get_param_or_post(req, self.PARAM_TEXT_QUERY)
    raw_search = self.get_param_or_post(req, self.PARAM_SEARCH)
    if raw_search:
        try:
            req.params['__raw__'] = json.loads(raw_search)
        except ValueError:
            raise HTTPBadRequest('Invalid attribute',
                                 'Value of {} filter attribute is invalid'.format(self.PARAM_SEARCH))

    ordering = self.get_param_or_post(req, self.PARAM_ORDER)
    results = self.objects_class.objects(**req.params)
    if text_query is not None:
        results = results.search_text(text_query)
    if ordering:
        results = results.order_by(ordering)
    return results
def get_total_objects(self, queryset, totals):
    """
    Return total number of results in a query.

    Only a plain count is accepted: every entry of ``totals`` must be a
    single-key dict ``{'count': None}``.

    :param queryset: queryset object from :func:`get_queryset`
    :param totals: a list of dicts with aggregate function as key and column as value
    :type totals: list
    :return: dict with totals calculated in this query, ex. total_count with number of results
    :rtype: dict
    :raises falcon.HTTPBadRequest: if any entry asks for something other
        than a plain count
    """
    if not totals:
        return {}

    for spec in totals:
        plain_count = len(spec) <= 1 and 'count' in spec and spec['count'] is None
        if not plain_count:
            raise falcon.HTTPBadRequest('Invalid attribute', 'Only _count_ is supported in the _totals_ param')

    return {'total_count': queryset.count()}
def on_post(self, req, resp, revision_id, tag=None):
    """Creates a revision tag.

    The request body is parsed as YAML and handed, together with
    ``revision_id`` and ``tag``, to ``db_api.revision_tag_create``.  On
    success the response status is 201 and the body is the rendered tag.

    :raises falcon.HTTPBadRequest: if the body is not valid YAML or the tag
        has a bad format
    :raises falcon.HTTPNotFound: if the revision does not exist
    """
    body = req.stream.read(req.content_length or 0)

    try:
        tag_data = yaml.safe_load(body)
    except yaml.YAMLError as e:
        error_msg = ("Could not parse the request body into YAML data. "
                     "Details: %s." % e)
        LOG.error(error_msg)
        # Pass text, not the exception object: the description ends up in
        # the serialized error body, and an exception instance is not a
        # string.
        raise falcon.HTTPBadRequest(description=str(e))

    try:
        resp_tag = db_api.revision_tag_create(revision_id, tag, tag_data)
    except errors.RevisionNotFound as e:
        raise falcon.HTTPNotFound(description=e.format_message())
    except errors.RevisionTagBadFormat as e:
        raise falcon.HTTPBadRequest(description=e.format_message())

    resp_body = revision_tag_view.ViewBuilder().show(resp_tag)
    resp.status = falcon.HTTP_201
    resp.body = resp_body
def _show_validation_entry(self, req, resp, revision_id, validation_name,
                           entry_id):
    """Return the rendered validation entry identified by ``entry_id``.

    ``entry_id`` is converted to ``int`` before the lookup through
    ``db_api.validation_get_entry``.

    :raises falcon.HTTPBadRequest: if ``entry_id`` cannot be parsed as an
        integer
    :raises falcon.HTTPNotFound: if the revision or validation is missing
    """
    try:
        numeric_id = int(entry_id)
    except ValueError:
        raise falcon.HTTPBadRequest(
            description='The {entry_id} parameter must be an integer.')

    not_found = (errors.RevisionNotFound, errors.ValidationNotFound)
    try:
        found = db_api.validation_get_entry(
            revision_id, validation_name, numeric_id)
    except not_found as e:
        raise falcon.HTTPNotFound(description=e.format_message())

    return self.view_builder.show_entry(found)
def on_post(self, req, resp, revision_id):
    """Roll back to ``revision_id`` and respond with the new revision.

    The latest revision is fetched first.  If any of its documents has a
    ``storagePolicy`` of ``'encrypted'``, the
    ``deckhand:create_encrypted_documents`` policy is checked once before
    the rollback is attempted.

    :raises falcon.HTTPNotFound: if no latest revision exists
    :raises falcon.HTTPBadRequest: if the rollback is invalid
    """
    try:
        latest = db_api.revision_get_latest()
    except errors.RevisionNotFound as e:
        raise falcon.HTTPNotFound(description=e.format_message())

    # any() stops at the first encrypted document, so the policy check
    # happens at most once, as with the original loop-and-break.
    has_encrypted = any(
        doc['metadata'].get('storagePolicy') == 'encrypted'
        for doc in latest['documents'])
    if has_encrypted:
        policy.conditional_authorize(
            'deckhand:create_encrypted_documents', req.context)

    try:
        rolled_back = db_api.revision_rollback(revision_id, latest)
    except errors.InvalidRollback as e:
        raise falcon.HTTPBadRequest(description=e.format_message())

    resp.status = falcon.HTTP_201
    resp.body = self.view_builder.show(rolled_back)
def process_request(self, req, resp, **kwargs):
    """Parse a multipart form body and copy its fields into ``req._params``.

    Requests whose content type does not mention ``multipart/form-data``
    are left untouched.

    :raises falcon.HTTPBadRequest: if ``self.parse`` raises ``ValueError``
    """
    content_type = req.content_type or ''
    if 'multipart/form-data' not in content_type:
        return

    # This must be done to avoid a bug in cgi.FieldStorage.
    req.env.setdefault('QUERY_STRING', '')

    # To avoid all stream consumption problem which occurs in falcon 1.0.0
    # or above.
    if hasattr(req.stream, 'stream'):
        raw_stream = req.stream.stream
    else:
        raw_stream = req.stream

    try:
        fields = self.parse(stream=raw_stream, environ=req.env)
    except ValueError as e:  # Invalid boundary?
        raise falcon.HTTPBadRequest('Error parsing file', str(e))

    for name in fields:
        # TODO: put files in req.files instead when #418 get merged.
        req._params[name] = self.parse_field(fields[name])
def on_get(self, req, resp, model_name):
    """Respond with the dependency, entity and POS types of a model.

    The three type listings are serialized as sorted, indented JSON.  Any
    exception raised while loading the model or building the listing is
    reported to the client as a 400.

    :raises falcon.HTTPBadRequest: if anything in the construction fails
    """
    try:
        model = get_model(model_name)
        output = {
            'dep_types': get_dep_types(model),
            'ent_types': get_ent_types(model),
            'pos_types': get_pos_types(model)
        }

        resp.body = json.dumps(output, sort_keys=True, indent=2)
        resp.content_type = 'text/string'
        resp.append_header('Access-Control-Allow-Origin', "*")
        resp.status = falcon.HTTP_200
    except Exception as e:
        # The HTTP_500 assignment that used to follow this raise could
        # never run, so it has been removed.
        raise falcon.HTTPBadRequest(
            'Schema construction failed',
            '{}'.format(e))
def on_post(self, req, resp):
    """Parse the posted text with the requested model and return JSON.

    The JSON request body supplies ``text`` and, optionally, ``model``
    (default ``'en'``), ``collapse_punctuation`` and ``collapse_phrases``
    (both default ``True``).

    :raises falcon.HTTPBadRequest: if loading the model or parsing fails
    """
    req_body = req.stream.read()
    json_data = json.loads(req_body.decode('utf8'))
    text = json_data.get('text')
    model_name = json_data.get('model', 'en')
    collapse_punctuation = json_data.get('collapse_punctuation', True)
    collapse_phrases = json_data.get('collapse_phrases', True)

    try:
        model = get_model(model_name)
        parse = Parse(model, text, collapse_punctuation, collapse_phrases)
        resp.body = json.dumps(parse.to_json(), sort_keys=True, indent=2)
        resp.content_type = 'text/string'
        resp.append_header('Access-Control-Allow-Origin', "*")
        resp.status = falcon.HTTP_200
    except Exception as e:
        # The HTTP_500 assignment that used to follow this raise could
        # never run, so it has been removed.
        raise falcon.HTTPBadRequest(
            'Dependency parsing failed',
            '{}'.format(e))
def test_int_neg(self, simulate_request, client, resource):
    """``get_param_as_int`` returns a negative value and enforces bounds.

    With ``pos=-7`` in the query string, bounds that include -7 return it
    and bounds that exclude it raise ``falcon.HTTPBadRequest``.
    """
    client.app.add_route('/', resource)
    simulate_request(client=client, path='/',
                     query_string='marker=deadbeef&pos=-7')
    captured = resource.captured_req

    for accepted in ({}, {'min': -10, 'max': 10}, {'max': 10}):
        assert captured.get_param_as_int('pos', **accepted) == -7

    # The last range is exercised twice, mirroring the original test.
    rejected_ranges = (
        {'min': -6, 'max': 0},
        {'min': -6},
        {'min': 0, 'max': 10},
        {'min': 0, 'max': 10},
    )
    for bounds in rejected_ranges:
        with pytest.raises(falcon.HTTPBadRequest):
            captured.get_param_as_int('pos', **bounds)
def process_request(self, req, resp):
    """Decode a JSON request body into ``req.context['doc']``.

    Requests with a content length of ``None`` or ``0`` are skipped.

    :raises falcon.HTTPBadRequest: if the body is empty despite a non-zero
        content length
    :raises falcon.HTTPError: with ``falcon.HTTP_753`` if the body is not
        valid UTF-8 encoded JSON
    """
    # req.stream corresponds to the WSGI wsgi.input environ variable,
    # and allows you to read bytes from the request body.
    #
    # See also: PEP 3333
    if req.content_length in (None, 0):
        # Nothing to do
        return

    raw = req.stream.read()
    if not raw:
        raise falcon.HTTPBadRequest('Empty request body',
                                    'A valid JSON document is required.')

    try:
        text = raw.decode('utf-8')
        req.context['doc'] = json.loads(text)
    except (ValueError, UnicodeDecodeError):
        raise falcon.HTTPError(
            falcon.HTTP_753,
            'Malformed JSON',
            'Could not decode the request body. The '
            'JSON was incorrect or not encoded as '
            'UTF-8.')
def read_json_msg_body(req):
    """Read the request body and decode it with ``rest_utils.from_json``.

    :param req: HTTP request object.
    :return: the decoded JSON message.
    :raises falcon.HTTPBadRequest: if decoding raises
        ``exceptions.DataConversionException`` or ``ValueError``
    """
    try:
        raw = req.stream.read()
        return rest_utils.from_json(raw)
    except (exceptions.DataConversionException, ValueError) as ex:
        # Both failure kinds were handled identically, so they share a
        # single handler.
        LOG.debug(ex)
        raise falcon.HTTPBadRequest('Bad request',
                                    'Request body is not valid JSON')
def verify_credentials(req, resp, resource, params):
    '''
    Check that the request data carries an email and a password and that
    the email passes ``login_utils.is_valid_email``.

    The data is read from ``req.stream`` by key lookup.
    NOTE(review): this presumes something upstream replaced ``req.stream``
    with a mapping -- confirm.

    Raises ``falcon.HTTPBadRequest`` when either key is missing and
    ``falcon.HTTPUnauthorized`` when the email is rejected.
    '''
    credentials = req.stream

    # extract required parameters from the data.
    try:
        email, _ = credentials['email'], credentials['password']
    except KeyError:
        msg = "Either email or password is not present or the email is invalid."
        raise falcon.HTTPBadRequest('Incomplete credentials', msg)

    if login_utils.is_valid_email(email):
        return

    msg = 'This email address does not exist.'
    raise falcon.HTTPUnauthorized('Invalid credentials', msg, False)
def on_post(self, req, resp, method=None):
    """Run the product/data instance functions for a posted resource.

    Both the ``uuid`` and ``adapter`` parameters must be present.  On
    success the response status is 202.

    :raises falcon.HTTPBadRequest: if a required parameter is missing or
        the Productstatus resource cannot be found
    :raises falcon.HTTPServiceUnavailable: if Productstatus is unavailable
    """
    required = (
        ('uuid', "Please provide the 'uuid' parameter, specifying which Productstatus resource to process."),
        ('adapter', "Please provide the 'adapter' parameter, specifying which adapter should process the resource."),
    )
    for param_name, complaint in required:
        if not self.has_param(req, param_name):
            raise falcon.HTTPBadRequest(complaint)

    try:
        self.exec_functions(req, resp, method, ['productinstance', 'datainstance'])
        resp.status = falcon.HTTP_202
    except productstatus.exceptions.NotFoundException as e:
        raise falcon.HTTPBadRequest('The Productstatus resource could not be found: %s' % e)
    except productstatus.exceptions.ServiceUnavailableException as e:
        raise falcon.HTTPServiceUnavailable('An error occurred when retrieving Productstatus resources: %s' % e)
def process_request(self, req, resp):
    """Store the decoded body text and its JSON document on the request.

    ``req.context['body']`` always starts as an empty string.  When the
    request has a non-zero content length, the body is decoded as UTF-8
    into ``req.context['body']`` and parsed as JSON into
    ``req.context['doc']``.

    :raises falcon.HTTPBadRequest: if the body is empty despite a non-zero
        content length
    :raises falcon.HTTPError: with ``falcon.HTTP_753`` if the body is not
        valid UTF-8 encoded JSON
    """
    context = req.context
    context['body'] = ''
    if req.content_length in (None, 0):
        return

    raw = req.stream.read()
    if not raw:
        raise falcon.HTTPBadRequest('Empty request body', 'A valid JSON document is required.')

    try:
        # The text is stored before parsing, so it stays available even
        # when the JSON turns out to be malformed.
        context['body'] = raw.decode('utf-8')
        context['doc'] = json.loads(context['body'])
    except (ValueError, UnicodeDecodeError):
        raise falcon.HTTPError(
            falcon.HTTP_753,
            'Malformed JSON',
            'Could not decode the request body. The JSON was incorrect or not encoded as UTF-8.',
        )
def read_body_as_json(req):
    """Reads the request body and returns its decoded JSON content

    If body is empty, returns an empty python dictionary

    :return: A dictionary or similar python object (list)
    :rtype: Object
    :raises falcon.HTTPBadRequest: if the body is not valid UTF-8 or not
        valid JSON
    """
    try:
        body_req = req.stream.read().decode('utf-8')
        if body_req == "":
            return {}
        return json.loads(body_req)
    # A body that is not valid UTF-8 is as much a client error as broken
    # JSON, so both are reported as a 400 rather than escaping.
    except (UnicodeDecodeError, json.decoder.JSONDecodeError):
        msg = ("Please, read the documentation carefully and try again. "
               "Couldn't decode the input stream (body).")
        raise falcon.HTTPBadRequest(
            title="Couldn't read body correctly from HTTP request",
            description=msg)
def on_post(self, req, resp):
    """Run a prediction for the ``url`` / ``modelId`` given in the payload.

    The JSON body must contain both ``url`` and ``modelId``.  The
    prediction is delegated to ``self.caffe_worker_client.predict``.  A
    ``'Success'`` status yields a 200 with the stringified values as body.

    :raises falcon.HTTPBadRequest: if ``url`` or ``modelId`` is missing
    :raises falcon.HTTPInternalServerError: if the prediction reports
        ``'Failure'``
    """
    payload = json.loads(req.stream.read())
    prediction_input = PredictionInput()

    supplied = payload.keys()
    if not (('url' in supplied) and ('modelId' in supplied)):
        resp.status = falcon.HTTP_400
        raise falcon.HTTPBadRequest("Bad Request", "Url and(or) modelId missing in the payload")

    prediction_input.url = payload['url']
    prediction_input.model_id = payload['modelId']

    outcome = self.caffe_worker_client.predict(prediction_input)
    verdict = outcome.bo.status
    if verdict == 'Success':
        resp.status = falcon.HTTP_200
        resp.body = (str(outcome.values))
    elif verdict == 'Failure':
        resp.body = json.dumps({'status': 'Failure', 'message' : 'Error occurred'})
        resp.status = falcon.HTTP_500
        raise falcon.HTTPInternalServerError('Internal Server Error', 'Predict failed! ')
def on_post(self, req, resp, parsed):
    """Create a ``UserScores`` row from the parsed payload.

    ``username``, ``company`` and ``score`` are taken from ``parsed``
    (missing keys become ``None``).  On success the response is a 201 whose
    body carries the new row's id.

    :raises falcon.HTTPBadRequest: if saving raises ``IntegrityError``
    """
    columns = ('username', 'company', 'score')
    record = models.UserScores(**{name: parsed.get(name) for name in columns})

    try:
        record.save(self.db.session)
    except IntegrityError:
        raise falcon.HTTPBadRequest(
            'Username exists',
            'Could not create user due to username already existing'
        )

    resp.status = falcon.HTTP_201
    resp.body = self.format_body({'id': record.id})
def validate(schema):
    """Decorator factory that parses and validates a JSON request body.

    The wrapped responder is called with an extra ``parsed`` keyword
    argument holding the decoded body.

    :param schema: JSON Schema dictionary handed to ``jsonschema.validate``
    :raises falcon.HTTPBadRequest: (from the wrapper) if the body cannot be
        read and parsed as UTF-8 JSON, or fails schema validation
    """
    # Local import so this block stays self-contained.
    import functools

    def decorator(func):
        # Keep the responder's name and docstring visible through the
        # wrapper.
        @functools.wraps(func)
        def wrapper(self, req, resp, *args, **kwargs):
            try:
                raw_json = req.stream.read()
                obj = json.loads(raw_json.decode('utf-8'))
            except Exception:
                raise falcon.HTTPBadRequest(
                    'Invalid data',
                    'Could not properly parse the provided data as JSON'
                )

            try:
                jsonschema.validate(obj, schema)
            except jsonschema.ValidationError as e:
                raise falcon.HTTPBadRequest(
                    'Failed data validation',
                    e.message
                )

            return func(self, req, resp, *args, parsed=obj, **kwargs)
        return wrapper
    return decorator
def guarded_session():
    '''
    Context manager that will automatically close session on exceptions

    Yields a new ``Session``.  If the body using it raises, the session is
    closed before the error propagates; an ``IrisValidationException`` is
    converted to ``HTTPBadRequest``, and anything other than the listed
    HTTP errors is logged first.  The session is not closed on success.

    NOTE(review): this is a generator; presumably it is wrapped with
    ``contextlib.contextmanager`` where it is defined -- confirm.
    '''
    # Create the session outside the ``try``: if ``Session()`` itself
    # fails there is nothing to close, and the handlers below must not
    # touch an unbound ``session`` (which would mask the real error).
    session = Session()
    try:
        yield session
    except IrisValidationException as e:
        session.close()
        raise HTTPBadRequest('Validation error', str(e))
    except (HTTPForbidden, HTTPUnauthorized, HTTPNotFound, HTTPBadRequest):
        session.close()
        raise
    except Exception:
        session.close()
        logger.exception('SERVER ERROR')
        raise
def on_post(self, req, resp, plan_id):
    """Activate or deactivate the plan identified by ``plan_id``.

    The JSON body must contain ``active``; a truthy integer upserts the
    ``plan_active`` row and a zero deletes it.  The integer is echoed back
    in the response body.

    :raises HTTPBadRequest: if ``active`` is missing or ``int()`` rejects it
        with ``ValueError``
    """
    payload = ujson.loads(req.context['body'])
    try:
        active = int(payload['active'])
    except ValueError:
        raise HTTPBadRequest('Invalid active field')
    except KeyError:
        raise HTTPBadRequest('"active" field required')

    if active:
        statement = '''INSERT INTO `plan_active` (`name`, `plan_id`) VALUES ((SELECT `name` FROM `plan` WHERE `id` = :plan_id), :plan_id) ON DUPLICATE KEY UPDATE `plan_id`=:plan_id'''
    else:
        statement = 'DELETE FROM `plan_active` WHERE `plan_id`=:plan_id'

    with db.guarded_session() as session:
        session.execute(statement, {'plan_id': plan_id})
        session.commit()
        session.close()

    resp.status = HTTP_200
    resp.body = ujson.dumps(active)
def on_post(self, req, resp, template_id):
    """Activate or deactivate the template identified by ``template_id``.

    The JSON body must contain ``active``; a truthy integer upserts the
    ``template_active`` row and a zero deletes it.  The integer is echoed
    back in the response body.

    :raises HTTPBadRequest: if ``active`` is missing or ``int()`` rejects it
        with ``ValueError``
    """
    payload = ujson.loads(req.context['body'])
    try:
        active = int(payload['active'])
    except KeyError:
        raise HTTPBadRequest('Missing active argument')
    except ValueError:
        raise HTTPBadRequest('Invalid active argument', 'active must be an int')

    if active:
        statement = '''INSERT INTO `template_active` (`name`, `template_id`) VALUES ((SELECT `name` FROM `template` WHERE `id` = :template_id), :template_id) ON DUPLICATE KEY UPDATE `template_id`=:template_id'''
    else:
        statement = 'DELETE FROM `template_active` WHERE `template_id`=:template_id'

    with db.guarded_session() as session:
        session.execute(statement, {'template_id': template_id})
        session.commit()
        session.close()

    resp.status = HTTP_200
    resp.body = ujson.dumps(active)
def on_delete(self, req, resp, app_name):
    """Delete the application named ``app_name``.

    Only callers whose request context has a truthy ``is_admin`` may do so.

    :raises HTTPUnauthorized: if the caller is not an admin
    :raises HTTPBadRequest: if the delete hits an ``IntegrityError`` or
        affects no rows
    """
    if not req.context['is_admin']:
        raise HTTPUnauthorized('Only admins can remove apps')

    deleted_rows = False
    with db.guarded_session() as session:
        try:
            outcome = session.execute('DELETE FROM `application` WHERE `name` = :app_name',
                                      {'app_name': app_name})
            deleted_rows = outcome.rowcount
            session.commit()
            session.close()
        except IntegrityError:
            raise HTTPBadRequest('Cannot remove app. It has likely already in use.')

    if not deleted_rows:
        raise HTTPBadRequest('No rows changed; app name probably already deleted')
    resp.body = '[]'
def on_post(self, req, resp, app_name):
    """Replace the key of application ``app_name`` with a fresh one.

    The new key is the SHA-256 hex digest of 32 bytes from ``os.urandom``.
    Only callers whose request context has a truthy ``is_admin`` may rekey.

    :raises HTTPUnauthorized: if the caller is not an admin
    :raises HTTPBadRequest: if the update affects no rows
    """
    if not req.context['is_admin']:
        raise HTTPUnauthorized('You must be an admin to rekey an app')

    fresh_key = hashlib.sha256(os.urandom(32)).hexdigest()
    bindings = {'app_name': app_name, 'new_key': fresh_key}

    updated_rows = False
    with db.guarded_session() as session:
        outcome = session.execute(
            'UPDATE `application` SET `key` = :new_key WHERE `name` = :app_name',
            bindings)
        updated_rows = outcome.rowcount
        session.commit()
        session.close()

    if not updated_rows:
        raise HTTPBadRequest('No rows changed; app name likely incorrect')

    logger.info('Admin user %s has re-key\'d app %s', req.context['username'], app_name)
    resp.body = '[]'
def on_put(self, req, resp, username):
    """Update the timezone setting of the authenticated user.

    The timezone is written only when the JSON body supplies one that is in
    ``self.supported_timezones``.  The user is taken from
    ``req.context['username']``; the ``username`` route argument is not
    read.  A failing update is logged and otherwise ignored, so the
    response is a 204 either way.

    :raises HTTPBadRequest: if the body is not valid JSON
    """
    try:
        settings = ujson.loads(req.context['body'])
    except ValueError:
        raise HTTPBadRequest('Invalid json in post body')

    connection = db.engine.raw_connection()
    cursor = connection.cursor()

    timezone = settings.get('timezone')
    acceptable = timezone and timezone in self.supported_timezones
    if acceptable:
        current_user = req.context['username']
        try:
            cursor.execute(update_username_settings_query,
                           {'name': 'timezone', 'value': timezone, 'username': current_user})
            connection.commit()
        except Exception:
            # Best effort: record the failure but still answer the request.
            logger.exception('Failed setting timezone to %s for user %s',
                             timezone, req.context['username'])

    cursor.close()
    connection.close()

    resp.body = '[]'
    resp.status = HTTP_204
def on_post(self, req, resp):
    """Handle a Gmail one-click claim and e-mail the outcome to the user.

    The JSON body must provide ``msg_id`` (integer), ``email_address`` and
    ``cmd``; only the ``claim`` command is accepted.  On success the
    response status is 204.

    :raises HTTPBadRequest: if a key is missing, ``msg_id`` is not an
        integer, ``cmd`` is not ``claim``, or the reply e-mail cannot be
        sent
    """
    gmail_params = ujson.loads(req.context['body'])

    try:
        msg_id = int(gmail_params['msg_id'])
        email_address = gmail_params['email_address']
        cmd = gmail_params['cmd']
    except (ValueError, KeyError):
        raise HTTPBadRequest('Post body missing required key or key of wrong type')

    if cmd != 'claim':
        raise HTTPBadRequest('GmailOneClick only supports claiming individual messages')

    try:
        app, response = self.handle_user_response('email', msg_id, email_address, cmd)
    except Exception:
        logger.exception('Failed to handle gmail one click response: %s' % gmail_params)
        raise

    sent, failure = self.create_email_message(app, email_address, response, response)
    if not sent:
        logger.error('Failed to send user response email: %s' % failure)
        raise HTTPBadRequest('Failed to send user response email', failure)

    resp.status = HTTP_204
def on_post(self, req, resp):
    """Handle the digits a user entered during a phone call.

    The form-encoded body must contain ``Digits`` and ``To``; the message
    id comes from the required ``message_id`` request parameter.  The
    handler's reply is returned as ``{'app_response': ...}``.

    :raises HTTPBadRequest: if ``Digits`` or ``To`` is absent
    """
    form = parse_qs(req.context['body'])
    msg_id = req.get_param('message_id', required=True)

    if 'Digits' not in form:
        raise HTTPBadRequest('Digits argument not found')
    # For phone call callbacks, To argument is the target and From is the
    # twilio number
    if 'To' not in form:
        raise HTTPBadRequest('To argument not found')

    digits = form['Digits'][0]
    source = form['To'][0]

    try:
        _, response = self.handle_user_response('call', msg_id, source, digits)
    except Exception:
        logger.exception('Failed to handle call response: %s' % digits)
        raise

    # Reached only when the handler succeeded; the except branch re-raises.
    resp.status = HTTP_200
    resp.body = ujson.dumps({'app_response': response})
def on_post(self, req, resp):
    """Handle an SMS reply from a user.

    The form-encoded body must contain ``Body`` and ``From``.  The stripped
    text is parsed by ``utils.parse_response`` into a message id and
    content, which are passed to ``handle_user_response``.  The handler's
    reply is returned as ``{'app_response': ...}``.

    :raises HTTPBadRequest: if ``Body`` or ``From`` is absent, or parsing
        raises ``ValueError`` / ``IndexError``
    """
    form = parse_qs(req.context['body'])

    if 'Body' not in form:
        raise HTTPBadRequest('SMS body not found', 'Missing Body argument in post body')
    if 'From' not in form:
        raise HTTPBadRequest('From argument not found', 'Missing From in post body')

    source = form['From'][0]
    body = form['Body'][0]

    try:
        msg_id, content = utils.parse_response(body.strip(), 'sms', source)
    except (ValueError, IndexError):
        raise HTTPBadRequest('Invalid response', 'failed to parse response')

    try:
        _, response = self.handle_user_response('sms', msg_id, source, content)
    except Exception:
        logger.exception('Failed to handle sms response: %s' % body)
        raise

    # Reached only when the handler succeeded; the except branch re-raises.
    resp.status = HTTP_200
    resp.body = ujson.dumps({'app_response': response})
def on_post(self, req, resp):
    """Handle a response that a user submitted through Slack.

    The JSON body must provide ``msg_id`` (integer), ``source`` and
    ``content``.  The handler's reply is returned as
    ``{'app_response': ...}``.

    :raises HTTPBadRequest: if a key is missing, ``msg_id`` is not an
        integer, or handling the response fails
    """
    slack_params = ujson.loads(req.context['body'])
    try:
        msg_id = int(slack_params['msg_id'])
        source = slack_params['source']
        content = slack_params['content']
    except KeyError:
        raise HTTPBadRequest('Post body missing required key')
    except (TypeError, ValueError):
        # A non-integer msg_id is a client error too; previously it
        # escaped as an unhandled exception.
        raise HTTPBadRequest('Invalid msg_id', 'msg_id must be an integer')

    # Process claim all with parse_response. Not needed for claim, since message id is
    # already known in this case.
    if content == 'claim all':
        msg_id, content = utils.parse_response(content, 'slack', source)

    try:
        _, response = self.handle_user_response('slack', msg_id, source, content)
    except Exception:
        logger.exception('Failed to handle slack response: %s' % req.context['body'])
        raise HTTPBadRequest('Bad Request', 'Failed to handle slack response')
    else:
        resp.status = HTTP_200
        resp.body = ujson.dumps({'app_response': response})
def on_post(self, req, resp):
    """
    Accept twilio POST that has message delivery status, and pass it
    to iris-api

    The raw request body is forwarded to ``self.endpoint`` through
    ``self.iclient``.  A 204 from the API is relayed as a 204.

    :raises falcon.HTTPInternalServerError: if the API call fails with
        ``MaxRetryError``
    :raises falcon.HTTPBadRequest: if the API answers with any status
        other than 204
    """
    try:
        re = self.iclient.post(self.endpoint, req.context['body'], raw=True)
    except MaxRetryError:
        logger.exception('Failed posting data to iris-api')
        raise falcon.HTTPInternalServerError('Internal Server Error', 'API call failed')

    # Compare by value: ``is not`` tests object identity, which is not a
    # reliable way to compare integers.
    if re.status != 204:
        logger.error('Invalid response from API for delivery status update: %s', re.status)
        raise falcon.HTTPBadRequest('Likely bad params passed', 'Invalid response from API')

    resp.status = falcon.HTTP_204
def on_post(self, req, resp):
    """Convert the posted JSON schema with the external converter tool.

    Only ``output=parquet-mr`` is supported.  The request body is written
    to a temporary ``.json`` file, the tool at ``JSONSCHEMA_PARQUET_PATH``
    is run on it, and its decoded output is returned as
    ``{'parquet-mr': ...}``.  The temporary file is always removed.

    :raises falcon.HTTPBadRequest: if ``output`` is missing or has any
        other value
    """
    if req.get_param('output') != 'parquet-mr':
        raise falcon.HTTPBadRequest(
            'Bad request',
            'Invalid or missing output parameter.')

    json_schema = req.bounded_stream.read()
    # delete=False: the file must outlive the handle so the subprocess can
    # open it by name; it is removed explicitly below.
    schema_file = tempfile.NamedTemporaryFile(delete=False, suffix='.json')
    try:
        # The context manager closes the handle even if the write fails,
        # so the descriptor is not leaked.
        with schema_file:
            schema_file.write(json_schema)
        output = subprocess.check_output(
            [JSONSCHEMA_PARQUET_PATH, 'parquet', '--deref', schema_file.name],
        )
        resp.body = json.dumps({'parquet-mr': output.decode()})
    finally:
        os.remove(schema_file.name)
def from_json(self, datastring):
    """Decode ``datastring`` as JSON, passing each object through
    ``self._sanitizer`` as the ``object_hook``.

    :raises falcon.HTTPBadRequest: if ``datastring`` is not valid JSON
    """
    try:
        decoded = json.loads(datastring, object_hook=self._sanitizer)
    except ValueError:
        msg = 'Malformed JSON in request body.'
        raise falcon.HTTPBadRequest(
            title='Malformed JSON', description=msg)
    return decoded
def default(self, request):
    """Return ``{'body': <decoded JSON>}`` for a request that has a body.

    Exactly ``request.content_length`` bytes are read, decoded as UTF-8 and
    handed to ``self.from_json``.

    :raises falcon.HTTPBadRequest: if ``self.has_body(request)`` is falsy
    """
    if not self.has_body(request):
        raise falcon.HTTPBadRequest('Empty request body',
                                    'A valid JSON doc is required')

    raw = request.stream.read(request.content_length)
    text = raw.decode("utf-8")
    return {'body': self.from_json(text)}
def get_queryset(self, req, resp):
    """Build the filtered (and optionally sorted) query for a request.

    Filter conditions start from ``req.context['doc']`` (minus the special
    params) and are then overlaid with ``req.params``.  The value of
    ``PARAM_SEARCH`` is merged in, decoded from JSON first when it is a
    string.  ``PARAM_ORDER`` may be a JSON-encoded list/dict, a list, a
    dict, or a single attribute.

    :raises HTTPBadRequest: if the search value is a string that is not
        valid JSON
    """
    base_query = self.get_base_query(req, resp)

    filters = {}
    if 'doc' in req.context:
        filters = dict(req.context['doc'])
        # ignore any special params except SEARCH and ORDER
        for special in self.get_special_params():
            filters.pop(special, None)
    filters.update(req.params)

    if self.PARAM_SEARCH in filters:
        raw_search = filters.pop(self.PARAM_SEARCH)
        try:
            filters.update(json.loads(raw_search) if isinstance(raw_search, str) else raw_search)
        except ValueError:
            raise HTTPBadRequest('Invalid attribute',
                                 'Value of {} filter attribute is invalid'.format(self.PARAM_SEARCH))

    ordering = filters.pop(self.PARAM_ORDER, None)
    if not ordering:
        return self.filter_by(base_query, filters)

    if isinstance(ordering, str):
        first, last = ordering[0], ordering[-1]
        looks_like_json = (first == '{' and last == '}') or (first == '[' and last == ']')
        if looks_like_json:
            try:
                ordering = json.loads(ordering)
            except ValueError:
                # not valid json, ignore and try to parse as an ordinary list of attributes
                pass

    if not isinstance(ordering, list) and not isinstance(ordering, dict):
        ordering = [ordering]

    order_expressions = self._build_order_expressions(ordering)
    if order_expressions:
        base_query = base_query.sort(*order_expressions)
    return self.filter_by(base_query, filters, order_criteria=order_expressions)
def _parse_logical_op(self, arg, value, default_op, relationships):
    """
    Combine the sub-conditions of a logical operator into one expression.

    :param arg: condition name
    :type arg: str

    :param value: condition value
    :type value: dict | list

    :param default_op: a default operator to join all filter expressions
    :type default_op: function

    :param relationships: a dict with all joins to apply, describes current state in recurrent calls
    :type relationships: dict

    :return: the combined expression, or ``None`` when no sub-expression
        was produced
    :raises HTTPBadRequest: when ``value`` is neither a dict nor a list of
        dicts
    """
    if isinstance(value, dict):
        return self._build_filter_expressions(value, default_op, relationships)
    if not isinstance(value, list):
        raise HTTPBadRequest('Invalid attribute', 'Filter attribute {} is invalid'.format(arg))

    collected = []
    for item in value:
        if not isinstance(item, dict):
            raise HTTPBadRequest('Invalid attribute', 'Filter attribute {} is invalid'.format(arg))
        built = self._build_filter_expressions(item, and_, relationships)
        if built is not None:
            collected.append(built)

    if len(collected) > 1:
        # Several negated conditions are AND-ed first and negated as a
        # whole.
        if default_op != not_:
            return default_op(*collected)
        return not_(and_(*collected))
    if len(collected) == 1:
        only = collected[0]
        if default_op != not_:
            return only
        return not_(only)
    return None
def process_request(self, req, resp):
    """
    Converts request input data from JSON to a dict.

    The result is stored in ``req.context['doc']``.  Requests with a
    content length of ``None`` or ``0`` are skipped.

    :param req: Falcon request
    :type req: falcon.request.Request

    :param resp: Falcon response
    :type resp: falcon.response.Response

    :raises falcon.HTTPBadRequest: if the body is empty despite a non-zero
        content length
    :raises falcon.HTTPError: with ``falcon.HTTP_753`` if the body is not
        valid UTF-8 encoded JSON
    """
    # req.stream corresponds to the WSGI wsgi.input environ variable,
    # and allows you to read bytes from the request body.
    #
    # See also: PEP 3333
    if req.content_length in (None, 0):
        # Nothing to do
        return

    payload = req.stream.read()
    if not payload:
        raise falcon.HTTPBadRequest('Empty request body',
                                    'A valid JSON document is required.')

    try:
        decoded_text = payload.decode('utf-8')
        req.context['doc'] = json.loads(decoded_text)
    except (ValueError, UnicodeDecodeError):
        raise falcon.HTTPError(
            falcon.HTTP_753,
            'Malformed JSON',
            'Could not decode the request body. The '
            'JSON was incorrect or not encoded as '
            'UTF-8.')
def process_resource(self, req, resp, resource, params):
    """
    Parse a form-encoded request body into ``req.context['doc']``.

    Nothing happens when no resource was matched or the method is not
    POST, PUT or PATCH.

    :param req: Falcon request
    :type req: falcon.request.Request

    :param resp: Falcon response
    :type resp: falcon.response.Response

    :param resource:
    :type resource: falcon_dbapi.resources.base.BaseCollectionResource|
                    falcon_dbapi.resources.base.BaseSingleResource

    :param params: parameters dict
    :type params: dict

    :raises falcon.HTTPBadRequest: if the body is empty or is not valid
        UTF-8
    """
    if resource is None or req.method not in ['POST', 'PUT', 'PATCH']:
        return

    raw = req.stream.read()
    if not raw:
        raise falcon.HTTPBadRequest('Empty request body',
                                    'At least one value is required')

    try:
        text = raw.decode('utf-8')
    except UnicodeDecodeError:
        raise falcon.HTTPBadRequest('Invalid request body',
                                    'A valid UTF-8 encoded document is required')

    req.context['doc'] = parse_query_string(
        text, keep_blank_qs_values=True, parse_qs_csv=False)
def on_post(self, req, resp, revision_id, validation_name):
    """Create a validation entry for a revision from a YAML payload.

    The payload must be non-empty and carry truthy ``status`` and
    ``validator`` values.  On success the response is a 201 with an
    ``application/x-yaml`` content type header and the rendered entry as
    body.

    :raises falcon.HTTPBadRequest: if the payload is not valid YAML, is
        empty, or lacks a required key
    :raises falcon.HTTPNotFound: if the revision does not exist
    """
    raw_payload = req.stream.read(req.content_length or 0)
    try:
        validation_data = yaml.safe_load(raw_payload)
    except yaml.YAMLError as e:
        error_msg = ("Could not parse the validation into YAML data. "
                     "Details: %s." % e)
        LOG.error(error_msg)
        raise falcon.HTTPBadRequest(description=six.text_type(e))

    if not validation_data:
        error_msg = 'Validation payload must be provided.'
        LOG.error(error_msg)
        raise falcon.HTTPBadRequest(description=error_msg)

    # Both keys are looked up before deciding, as in the original.
    absent = [key for key in ('status', 'validator')
              if not validation_data.get(key)]
    if absent:
        error_msg = 'Validation payload must contain keys: %s.' % (
            ', '.join(['"status"', '"validator"']))
        LOG.error(error_msg)
        raise falcon.HTTPBadRequest(description=error_msg)

    try:
        created = db_api.validation_create(
            revision_id, validation_name, validation_data)
    except errors.RevisionNotFound as e:
        raise falcon.HTTPNotFound(description=e.format_message())

    resp.status = falcon.HTTP_201
    resp.append_header('Content-Type', 'application/x-yaml')
    resp.body = self.view_builder.show(created)
def on_put(self, req, resp, bucket_name=None):
    """Validate the posted YAML documents and store them as a revision.

    The body may hold several YAML documents.  They are validated, checked
    against the encrypted-documents policy when any of them has a
    ``storagePolicy`` of ``'encrypted'``, passed to
    ``_prepare_secret_documents``, and stored through
    ``_create_revision_documents``.  The response is a 200 with the
    rendered list of created documents.

    :raises falcon.HTTPBadRequest: if the body is not valid YAML or a
        document fails format/schema validation
    """
    raw_documents = req.stream.read(req.content_length or 0)
    try:
        documents = list(yaml.safe_load_all(raw_documents))
    except yaml.YAMLError as e:
        error_msg = ("Could not parse the document into YAML data. "
                     "Details: %s." % e)
        LOG.error(error_msg)
        raise falcon.HTTPBadRequest(description=six.text_type(e))

    # NOTE: Must validate documents before doing policy enforcement,
    # because we expect certain formatting of the documents while doing
    # policy enforcement. If any documents fail basic schema validation
    # raise an exception immediately.
    invalid_document = (deckhand_errors.InvalidDocumentFormat,
                        deckhand_errors.InvalidDocumentSchema)
    try:
        validator = document_validation.DocumentValidation(documents)
        validations = validator.validate_all()
    except invalid_document as e:
        LOG.exception(e.format_message())
        raise falcon.HTTPBadRequest(description=e.format_message())

    # any() stops at the first encrypted document, so the policy check
    # happens at most once, as with the original loop-and-break.
    if any(doc['metadata'].get('storagePolicy') == 'encrypted'
           for doc in documents):
        policy.conditional_authorize(
            'deckhand:create_encrypted_documents', req.context)

    self._prepare_secret_documents(documents)

    created = self._create_revision_documents(
        bucket_name, documents, validations)

    resp.body = self.view_builder.list(created)
    resp.status = falcon.HTTP_200
def test_required(self, simulate_request, client, resource, method_name):
    """A missing required parameter raises ``falcon.HTTPMissingParam``.

    The accessor named by ``method_name`` is called on a request with an
    empty query string; the raised error must also be an
    ``HTTPBadRequest`` with the expected title and description.
    """
    client.app.add_route('/', resource)
    simulate_request(client=client, path='/', query_string='')
    captured = resource.captured_req
    accessor = getattr(captured, method_name)

    try:
        accessor('marker', required=True)
    except falcon.HTTPMissingParam as ex:
        assert isinstance(ex, falcon.HTTPBadRequest)
        assert ex.title == 'Missing parameter'
        expected_desc = 'The "marker" parameter is required.'
        assert ex.description == expected_desc
    else:
        pytest.fail('falcon.HTTPMissingParam not raised')
def test_boolean(self, simulate_request, client, resource):
    """``get_param_as_bool`` accepts the known spellings and rejects others.

    Covers invalid values, each accepted true/false spelling, a blank
    parameter, checkbox values and the ``store`` argument.
    """
    client.app.add_route('/', resource)
    query_string = ('echo=true&doit=false&bogus=bar&bogus2=foo&'
                    't1=True&f1=False&t2=yes&f2=no&blank&one=1&zero=0&'
                    'checkbox1=on&checkbox2=off')
    simulate_request(client=client, path='/', query_string=query_string)
    captured = resource.captured_req

    with pytest.raises(falcon.HTTPBadRequest):
        captured.get_param_as_bool('bogus')

    # As in the original, the details are only checked if an error is
    # actually raised here.
    try:
        captured.get_param_as_bool('bogus2')
    except Exception as ex:
        assert isinstance(ex, falcon.HTTPInvalidParam)
        assert ex.title == 'Invalid parameter'
        expected_desc = ('The "bogus2" parameter is invalid. '
                         'The value of the parameter must be "true" '
                         'or "false".')
        assert ex.description == expected_desc

    expectations = (
        ('echo', True), ('doit', False),
        ('t1', True), ('t2', True),
        ('f1', False), ('f2', False),
        ('one', True), ('zero', False),
    )
    for name, expected in expectations:
        assert captured.get_param_as_bool(name) is expected

    assert captured.get_param('blank') is None

    assert captured.get_param_as_bool('checkbox1') is True
    assert captured.get_param_as_bool('checkbox2') is False

    store = {}
    assert captured.get_param_as_bool('echo', store=store) is True
    assert store['echo'] is True
def test_http_error_repr():
    """The repr of an HTTP error is ``<ClassName: status>``."""
    error = falcon.HTTPBadRequest()
    class_name = error.__class__.__name__
    expected = '<%s: %s>' % (class_name, error.status)
    assert error.__repr__() == expected
def test_misc(self, client):
    """Run ``_misc_test`` for each error class and its expected status.

    Classes flagged with ``needs_title=False`` are passed that keyword;
    the others rely on ``_misc_test``'s default.
    """
    cases = (
        (falcon.HTTPBadRequest, falcon.HTTP_400, {}),
        (falcon.HTTPNotAcceptable, falcon.HTTP_406, {'needs_title': False}),
        (falcon.HTTPConflict, falcon.HTTP_409, {}),
        (falcon.HTTPPreconditionFailed, falcon.HTTP_412, {}),
        (falcon.HTTPUnsupportedMediaType, falcon.HTTP_415, {'needs_title': False}),
        (falcon.HTTPUnprocessableEntity, falcon.HTTP_422, {}),
        (falcon.HTTPUnavailableForLegalReasons, falcon.HTTP_451, {'needs_title': False}),
        (falcon.HTTPInternalServerError, falcon.HTTP_500, {}),
        (falcon.HTTPBadGateway, falcon.HTTP_502, {}),
    )
    for error_class, status_line, options in cases:
        self._misc_test(client, error_class, status_line, **options)
def test_required_header(self, client):
    """A missing required header raises ``falcon.HTTPMissingHeader``.

    The raised error must also be an ``HTTPBadRequest`` with the expected
    title and description.
    """
    resource = testing.SimpleTestResource(body=SAMPLE_BODY)
    client.app.add_route('/', resource)
    client.simulate_get()

    try:
        captured = resource.captured_req
        captured.get_header('X-Not-Found', required=True)
    except falcon.HTTPMissingHeader as ex:
        assert isinstance(ex, falcon.HTTPBadRequest)
        assert ex.title == 'Missing header value'
        expected_desc = 'The X-Not-Found header is required.'
        assert ex.description == expected_desc
    else:
        pytest.fail('falcon.HTTPMissingHeader not raised')
def test_jsonschema_validation_failure():
    """Empty media fails validation with a 'required property' message."""
    request = RequestStub()
    request.media = {}
    resource = SampleResource()

    with pytest.raises(falcon.HTTPBadRequest) as raised:
        resource.on_get(request, None)

    # Checked after the block: nothing following the raising call inside
    # it would run.
    assert raised.value.description == '\'message\' is a required property'
def validate(req, resp, params):
    """Hook that unconditionally rejects the request.

    :raises falcon.HTTPBadRequest: always
    """
    title = 'Invalid thing'
    description = 'Your thing was not ' 'formatted correctly.'
    raise falcon.HTTPBadRequest(title, description)
def validate_field(req, resp, params):
    """Hook that converts ``params['id']`` to ``int`` in place.

    :raises falcon.HTTPBadRequest: if ``int()`` rejects the value with
        ``ValueError``
    """
    raw_id = params['id']
    try:
        numeric_id = int(raw_id)
    except ValueError:
        raise falcon.HTTPBadRequest('Invalid ID', 'ID was not valid.')
    params['id'] = numeric_id
def __init__(self, title=None, description=None, **kwargs):
    """Initialize the error with ``status.HTTP_400``.

    ``title``, ``description`` and any extra keyword arguments are passed
    unchanged to the parent initializer.
    """
    parent = super(HTTPBadRequest, self)
    parent.__init__(status.HTTP_400, title, description, **kwargs)
def validate(schema):
    """Decorator that validates ``req.media`` using JSON Schema

    Args:
        schema (dict): A dictionary that follows the JSON Schema
            specification. See `json-schema.org <http://json-schema.org/>`_
            for more information on defining a compatible dictionary.

    Raises:
        falcon.HTTPBadRequest: Raised by the wrapped responder when
            ``req.media`` fails validation against ``schema``.

    Example:
        .. code:: python

            from falcon.media.validators import jsonschema

            # -- snip --

            @jsonschema.validate(my_post_schema)
            def on_post(self, req, resp):

            # -- snip --

    Note:
        This validator requires the ``jsonschema`` library available via
        PyPI. The library also requires Python 2.7+.
    """
    # Local import so this block stays self-contained.
    import functools

    def decorator(func):
        # Keep the responder's name and docstring visible through the
        # wrapper.
        @functools.wraps(func)
        def wrapper(self, req, resp, *args, **kwargs):
            try:
                jsonschema.validate(req.media, schema)
            except jsonschema.ValidationError as e:
                raise falcon.HTTPBadRequest(
                    'Failed data validation',
                    description=e.message
                )

            return func(self, req, resp, *args, **kwargs)
        return wrapper
    return decorator