我们从 Python 开源项目中，提取了以下 50 个代码示例，用于说明如何使用 werkzeug.exceptions.HTTPException()。
def make_json_app(import_name, **kwargs):
    """Create a Flask app whose default HTTP error responses are JSON.

    An error handler is registered for every status code listed in
    ``default_exceptions``. The handler answers with a JSON body of the
    form (just an example)::

        {"message": "405: Method Not Allowed"}

    :param import_name: passed to ``Flask`` as its import name.
    :param kwargs: extra keyword arguments passed through to ``Flask``.
    :return: the configured ``Flask`` application.
    """

    def make_json_error(ex):
        response = jsonify(message=str(ex))
        # Anything that is not an HTTPException is reported as a 500.
        response.status_code = ex.code if isinstance(ex, HTTPException) else 500
        return response

    app = Flask(import_name, **kwargs)

    # Iterate the mapping directly: ``dict.iterkeys()`` only exists on
    # Python 2, while plain iteration yields the keys on both 2 and 3.
    for code in default_exceptions:
        app.register_error_handler(code, make_json_error)

    return app
def handle_user_exception(self, e):
    """Dispatch a user exception, preferring ``handle_api_exception``.

    ``APIError`` instances, and ``HTTPException`` instances that
    ``trap_http_exception`` does not trap, are handed to
    ``handle_api_exception``. Any other exception goes to the handler
    returned by ``_find_error_handler``; when there is none, the exception
    currently being handled is re-raised with its original traceback.

    :param e: the exception being handled; it must be the exception that
        ``sys.exc_info()`` currently reports.
    :return: whatever the selected handler returns.
    """
    exc_type, exc_value, tb = sys.exc_info()
    assert exc_value is e

    # API errors and untrapped HTTP errors share the same API handler.
    goes_to_api_handler = isinstance(e, APIError) or (
        isinstance(e, HTTPException) and not self.trap_http_exception(e)
    )
    if goes_to_api_handler:
        return self.handle_api_exception(e)

    handler = self._find_error_handler(e)
    if handler is None:
        reraise(exc_type, exc_value, tb)
    return handler(e)
def test(self, path_info=None, method=None):
    """Report whether a rule would match, without returning the match.

    Calls ``self.match`` and turns its outcome into a boolean: a normal
    return or a ``RequestRedirect`` counts as a match, any other
    ``HTTPException`` counts as no match.

    :param path_info: the path info to use for matching; forwarded to
        ``match``.
    :param method: the HTTP method used for matching; forwarded to
        ``match``.
    :return: ``True`` if the URL matches, ``False`` otherwise.
    """
    try:
        self.match(path_info, method)
    except RequestRedirect:
        # A redirect still means that some rule matched.
        return True
    except HTTPException:
        return False
    return True
def _process_event(self, repo, event):
    """Process a potentially new event for a repository.

    An event whose ``created_at`` is not later than ``repo.last_event`` is
    skipped. Otherwise every processor registered in ``self.hooks`` for the
    webhook type mapped from ``event['type']`` is invoked.

    :param repo: Repository related to event
    :type repo: ``repocribro.models.Repository``
    :param event: GitHub event data
    :type event: dict
    :return: ``True`` if the event was new, ``False`` if it was already
        registered before
    :rtype: bool
    """
    last_seen = pytz.utc.localize(repo.last_event)
    created = iso8601.parse_date(event['created_at'])
    if created <= last_seen:
        return False

    hook_type = self.event2webhook.get(event['type'], 'uknown')
    processors = self.hooks.get(hook_type, [])
    for process in processors:
        try:
            process(
                db=self.db,
                repo=repo,
                payload=event['payload'],
                actor=event['actor'],
            )
            report = 'Processed {} from {} event for {}'.format(
                event['type'], event['created_at'], repo.full_name
            )
            print(report)
        except HTTPException:
            # A failing processor must not stop the remaining ones.
            print('Error while processing #{}'.format(event['id']))
    return True
def make_json_app(import_name, **kwargs):
    """Build a Flask app that reports errors as Docker-plugin JSON.

    One shared handler is registered for the listed client/driver
    exception types and for every status code in
    ``w_exceptions.default_exceptions``. It logs the current traceback and
    answers with ``{"Err": "<message>"}``.

    :param import_name: passed to ``flask.Flask`` as its import name.
    :param kwargs: extra keyword arguments passed through to
        ``flask.Flask``.
    :return: the configured Flask application.
    """
    app = flask.Flask(import_name, **kwargs)

    @app.errorhandler(exceptions.FuxiException)
    @app.errorhandler(cinder_exception.ClientException)
    @app.errorhandler(nova_exception.ClientException)
    @app.errorhandler(manila_exception.ClientException)
    @app.errorhandler(processutils.ProcessExecutionError)
    @app.errorhandler(brick_exception.BrickException)
    def make_json_error(ex):
        LOG.error("Unexpected error happened: %s", traceback.format_exc())
        response = flask.jsonify({"Err": str(ex)})

        # HTTP exceptions keep their own status; everything else is a 500.
        if isinstance(ex, w_exceptions.HTTPException):
            response.status_code = ex.code
        else:
            response.status_code = w_exceptions.InternalServerError.code

        content_type = 'application/vnd.docker.plugins.v1+json; charset=utf-8'
        response.headers['Content-Type'] = content_type
        return response

    for status in w_exceptions.default_exceptions:
        app.register_error_handler(status, make_json_error)

    return app
def can_handle_request(self, environ):
    """Decide whether this Flask app can serve the request.

    The WSGI ``environ`` is matched against ``self.url_map``.

    :param environ: the WSGI environ of the incoming request.
    :return: ``(True, self.app_name)`` when a route matches,
        ``(False, self.app_name)`` when matching raises an
        ``HTTPException``.
    """
    # TODO: identify matching urls as core or extension. This will depend
    # on how we setup routing in Flask
    adapter = self.url_map.bind_to_environ(environ)
    try:
        endpoint, args = adapter.match()
        message = 'Flask route match, endpoint: {0}, args: {1}'.format(
            endpoint, args)
        log.debug(message)
        return (True, self.app_name)
    except HTTPException:
        return (False, self.app_name)
def raise_exc(cls, code, message):
    """Raise ``cls`` carrying a JSON response body.

    The body sent to the client looks like::

        {"code": code, "message": message}

    :param cls: the HTTPException class to raise, for example NotFound or
        BadRequest; its ``code`` attribute becomes the response status
    :param code: a brief identifier for the error, like MISSING_PARAMETER
    :param message: a longer description of the error
    :return: never returns; always raises ``cls`` with the built response
    """
    payload = {
        "code": code,
        "message": message
    }

    response = Response()
    response.mimetype = "application/json"
    response.status_code = cls.code
    response.data = json.dumps(payload)

    log_tag = cls.__name__.upper()
    Logger.warning(log_tag, code + ": " + message)
    raise cls(response=response)
def handle(self, endpoint, route_args, request):
    """Handle a request in the derived handler.

    The request is routed to the method of this object named *endpoint*.

    :param endpoint: name of the method to call with (route_args, request);
        that method should return the data to serialize or call
        ``self.raise_exc``. *NOTE*: the method MUST be implemented in the
        derived class
    :param route_args: the parameters extracted from the matching route in
        the URL
    :param request: the Request object; ``request.args`` contains the query
        parameters of the request
    :return: a 200 JSON ``Response`` when the method returned data, an empty
        204 ``Response`` when it returned ``None``, or the ``HTTPException``
        itself if one was raised
    """
    try:
        data = BaseHandler._call(getattr(self, endpoint), route_args, request)
        response = Response()
        # The HTTP status lives in ``status_code`` (the attribute raise_exc
        # also sets); assigning ``response.code`` only created an unused
        # attribute, so the 204 below was never actually sent.
        if data is not None:
            response.status_code = 200
            response.mimetype = "application/json"
            response.data = json.dumps(data)
        else:
            response.status_code = 204
        return response
    except HTTPException as e:
        # The exception object is returned, not raised, to the caller.
        return e
def http_error_handler(error, without_code=False):
    """Render the error page template for an error.

    :param error: an ``HTTPException`` (its ``code`` is used), an integer
        status code, or any other object (treated as 500).
    :param without_code: when true, the response is built without an
        explicit status code.
    :return: the response produced by ``make_response`` for the rendered
        ``errors/<code>.html`` template.
    """
    if isinstance(error, HTTPException):
        status = error.code
    elif isinstance(error, int):
        status = error
    else:
        status = 500

    page = render_template('errors/{}.html'.format(status))
    if without_code:
        return make_response(page)
    return make_response(page, status)


# -----------------------------------------------------------
# Hooks
def test_SupervisorRolePermission_authenticated_user_with_password_with_check_supervisor(
        authenticated_user_instance
):
    """Supervisor permission passes with the right password only.

    The target object's ``check_supervisor`` accepts the authenticated
    user, so entering the permission with the correct password succeeds,
    while a wrong password raises ``HTTPException``.
    """
    authenticated_user_instance.password = "correct_password"

    def _is_supervisor(user):
        return user == authenticated_user_instance

    target = Mock()
    target.check_supervisor = _is_supervisor

    granted = permissions.SupervisorRolePermission(
        obj=target,
        password_required=True,
        password="correct_password"
    )
    with granted:
        pass

    with pytest.raises(HTTPException), permissions.SupervisorRolePermission(
        obj=target,
        password_required=True,
        password="wrong_password"
    ):
        pass
def test_SupervisorRolePermission_authenticated_user_with_password_without_check_supervisor(
        authenticated_user_instance
):
    """Supervisor permission is denied when ``check_supervisor`` is missing.

    With the attribute deleted from the target object, both the correct
    and the wrong password must raise ``HTTPException``.
    """
    authenticated_user_instance.password = "correct_password"
    target = Mock()
    del target.check_supervisor

    for candidate in ("correct_password", "wrong_password"):
        with pytest.raises(HTTPException):
            with permissions.SupervisorRolePermission(
                obj=target,
                password_required=True,
                password=candidate
            ):
                pass
def test_OwnerRolePermission_authenticated_user_with_password_without_check_owner(
        authenticated_user_instance
):
    """Owner permission is denied when ``check_owner`` is missing.

    With the attribute deleted from the target object, both the correct
    and the wrong password must raise ``HTTPException``.
    """
    authenticated_user_instance.password = "correct_password"
    target = Mock()
    del target.check_owner

    for candidate in ("correct_password", "wrong_password"):
        with pytest.raises(HTTPException):
            with permissions.OwnerRolePermission(
                obj=target,
                password_required=True,
                password=candidate
            ):
                pass
def _handle_error(self, e):
    """Return the handled exception.

    Detects the blueprint through ``_detect_blueprint`` and passes the
    exception object to the error handler registered for it (the handler
    stored under ``None`` when no blueprint is detected).

    A non-HTTP exception has its traceback printed and is replaced by a
    plain ``HTTPException``; an HTTP exception is passed through the error
    loader callback when one is set.

    :param e: the exception to handle.
    :return: the result of the registered handler, or the (possibly
        replaced) exception when no handler is registered.
    """
    blueprint = self._detect_blueprint()

    if not isinstance(e, HTTPException):
        print(traceback.format_exc())
        e = HTTPException()
    elif self._error_loader_callback is not None:
        e = self._error_loader_callback(e)  # load custom exception

    # Handlers are keyed by blueprint name; None is the key used when no
    # blueprint was detected.
    handler_key = None if blueprint is None else blueprint.name
    if handler_key in self._errorhandlers:
        return self._errorhandlers[handler_key](e)
    return e
def handle_exception(error):
    """Convert an exception into a JSON error response.

    The status defaults to 500 and the message to ``None``. An error's
    ``status_code`` and ``message`` attributes are used when present, and
    for an ``HTTPException`` its ``code`` and ``str(error)`` take
    precedence. An optional ``extra`` attribute is forwarded to
    ``format_exception``.

    :param error: the exception to report.
    :return: the JSON response with its status code set.
    """
    code = getattr(error, 'status_code', 500)
    message = str(error.message) if hasattr(error, 'message') else None

    if isinstance(error, HTTPException):
        code, message = error.code, str(error)

    extra = getattr(error, 'extra', None)

    response = jsonify(format_exception(message, code=code, extra=extra))
    response.status_code = code
    return response
def error_handler(error):
    """Standard error handler.

    :param error: the exception being handled.
    :return: a ``(json_response, status)`` tuple. An ``HTTPException``
        reports its own code, name and description; anything else is
        reported as a generic 500.
    """
    if isinstance(error, HTTPException):
        status = error.code
        payload = {
            'statusCode': error.code,
            'name': error.name,
            'description': error.description
        }
    else:
        status = 500
        payload = {
            'statusCode': 500,
            'name': 'Internal Server Error',
            'description': 'An unknown error has occurred'
        }
    return jsonify(payload), status


# common errors - add others as needed
def render(self, exception):
    """Render the exception as an HTTP response.

    A non-HTTP exception is rendered as an ``InternalServerError``. In
    debug mode a 5xx error is re-raised instead of rendered; in testing
    mode the description of a 5xx error is replaced by the exception text.

    :param exception: The exception
    :type exception: Exception

    :return: The response
    """
    if isinstance(exception, HTTPException):
        http_exception = exception
    else:
        http_exception = InternalServerError()

    # True for every status code in the 500-599 range.
    is_server_error = http_exception.code // 100 == 5

    if self.app.debug and is_server_error:
        if sys.version_info < (3, 0):
            # On Python 2, re-raise with the original traceback when the
            # exception is the one currently being handled.
            exc_type, exc_value, tb = sys.exc_info()
            if exc_value is exception:
                reraise(exc_type, exc_value, tb)
        raise exception

    show_details = (
        self.app.testing
        and is_server_error
        and isinstance(exception, Exception)
    )
    if show_details:
        http_exception.description = '%s' % exception
    return http_exception.get_response()
def abort(self, code=500, message=None, **kwargs):
    """Properly abort the current request.

    Raises the ``HTTPException`` that ``flask.abort`` produces for the
    given status ``code``, after attaching the keyword arguments (plus a
    JSON API style ``errors`` list) to it as ``data``.

    :param int code: The associated HTTP status code
    :param str message: An optional details message
    :param kwargs: Any additional data to pass to the error payload
    :raise HTTPException:
    """
    try:
        flask.abort(code)
    except HTTPException as e:
        # JSON API specs: the title is the text that follows the first
        # colon of the exception's string form.
        title = str(e).split(':')[1].lstrip(' ')
        kwargs['errors'] = [{
            'detail': message,
            'status': str(code),
            'title': title,
        }]
        e.data = kwargs
        raise
def __init__(self, app, environ):
    """Set up the per-request state for ``app`` and the WSGI ``environ``.

    Binds the URL map to the environ (using the configured
    ``SERVER_NAME``), builds the request object, opens the session
    (falling back to ``_NullSession`` when none is returned) and tries to
    match the URL.

    :param app: the application object providing ``url_map``, ``config``,
        ``request_class`` and ``open_session``.
    :param environ: the WSGI environ of the request.
    """
    self.app = app
    self.url_adapter = app.url_map.bind_to_environ(
        environ, server_name=app.config['SERVER_NAME'])
    self.request = app.request_class(environ)

    self.session = app.open_session(self.request)
    if self.session is None:
        self.session = _NullSession()

    self.g = _RequestGlobals()
    self.flashes = None

    try:
        self.request.endpoint, self.request.view_args = \
            self.url_adapter.match()
    except HTTPException as e:
        # ``except X, e`` is Python 2 only syntax; ``as`` works on 2.6+
        # and 3. The routing failure is stored on the request, not raised.
        self.request.routing_exception = e
def __init__(self, app, environ):
    """Set up the per-request state for ``app`` and the WSGI ``environ``.

    Binds the URL map to the environ, builds the request object, opens the
    session (falling back to ``_NullSession`` when none is returned) and
    tries to match the URL.

    :param app: the application object providing ``url_map``,
        ``request_class`` and ``open_session``.
    :param environ: the WSGI environ of the request.
    """
    self.app = app
    self.url_adapter = app.url_map.bind_to_environ(environ)
    self.request = app.request_class(environ)

    # Session bound to this request; a null session is used when the app
    # does not provide one.
    self.session = app.open_session(self.request)
    if self.session is None:
        self.session = _NullSession()

    # Fresh ``g`` object for this request.
    self.g = _RequestGlobals()
    self.flashes = None

    try:
        self.request.endpoint, self.request.view_args = \
            self.url_adapter.match()
    except HTTPException as e:
        # ``except X, e`` is Python 2 only syntax; ``as`` works on 2.6+
        # and 3. The routing failure is stored on the request, not raised.
        self.request.routing_exception = e
def dispatch_request(self):
    """Does the request dispatching.

    Matches the URL and returns the return value of the view or error
    handler. This does not have to be a response object. In order to
    convert the return value to a proper response object, call
    :func:`make_response`.

    :return: the view's return value; for an ``HTTPException`` the result
        of the handler registered for its code, or the exception itself
        when there is none; for any other exception the result of the 500
        handler.
    :raises Exception: re-raised when in debug mode or when no 500 handler
        is registered.
    """
    try:
        endpoint, values = self.match_request()
        # Call the view registered for the matched endpoint.
        return self.view_functions[endpoint](**values)
    except HTTPException as e:
        # ``except X, e`` is Python 2 only syntax; ``as`` works on 2.6+
        # and 3.
        handler = self.error_handlers.get(e.code)
        if handler is None:
            return e
        return handler(e)
    except Exception as e:
        handler = self.error_handlers.get(500)
        if self.debug or handler is None:
            raise
        return handler(e)
def with_error_response(logger):
    """Build a decorator that logs exceptions and re-raises them.

    Every exception raised by the decorated function is logged through
    ``logger.exception`` and then propagated unchanged. Only the log
    message differs by exception type (HTTP, SQLAlchemy, anything else).

    :param logger: object whose ``exception`` method is used for logging.
    :return: a decorator applying that behaviour to a function.
    """
    def decorate(func):
        @functools.wraps(func)
        def guarded(*args, **kwargs):
            try:
                result = func(*args, **kwargs)
            except HTTPException as exc:
                logger.exception("Tolerated error: %s", exc)
                raise
            except SQLAlchemyError as exc:
                logger.exception("SQLAlchemy error: %s", exc)
                raise
            except Exception as exc:
                logger.exception("Unknown error: %s", exc)
                raise
            return result
        return guarded
    return decorate
def returns_json(f):
    """Decorator to add the JSON content type to responses.

    A normal return value is wrapped in a ``Response`` with content type
    ``application/json``; a ``(body, status)`` tuple also sets the status.
    An ``HTTPException`` raised by ``f`` is patched so that its headers and
    body are JSON, then re-raised.

    :param f: the view function to wrap.
    :return: the wrapped view function.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        try:
            r = f(*args, **kwargs)
        except HTTPException as e:
            # Build a new list instead of removing items from the list
            # being iterated, which silently skips the following element.
            headers = [
                header for header in e.get_headers()
                if header[0].lower() != 'content-type'
            ]
            headers.append(('Content-Type', 'application/json'))

            # Capture the body now: Python 3 unbinds ``e`` when the except
            # block ends, so a lambda reading ``e`` later would fail.
            body = json.dumps({"message": e.description})

            # Monkey-patch the exception; accept any arguments so the
            # patches work whatever the caller passes.
            e.get_headers = lambda *a, **kw: headers
            e.get_body = lambda *a, **kw: body
            raise

        if isinstance(r, tuple):
            return Response(r[0], status=r[1], content_type='application/json')
        else:
            return Response(r, content_type='application/json')
    return decorated_function
def dispatch_request(self):
    """Does the request dispatching.

    Matches the URL and returns the return value of the view or error
    handler. This does not have to be a response object. In order to
    convert the return value to a proper response object, call
    :func:`make_response`.

    :return: the view's return value; for an ``HTTPException`` the result
        of the handler registered for its code, or the exception itself
        when there is none; for any other exception the result of the 500
        handler.
    :raises Exception: re-raised when in debug mode or when no 500 handler
        is registered.
    """
    try:
        endpoint, values = self.match_request()
        return self.view_functions[endpoint](**values)
    except HTTPException as e:
        # ``except X, e`` is Python 2 only syntax; ``as`` works on 2.6+
        # and 3.
        handler = self.error_handlers.get(e.code)
        if handler is None:
            return e
        return handler(e)
    except Exception as e:
        handler = self.error_handlers.get(500)
        if self.debug or handler is None:
            raise
        return handler(e)
def dispatch_request(self):
    """Dispatch the request to the matching route function.

    Matches the request through ``match_request`` and calls the function
    stored in ``self.route_functions`` for the matched endpoint.

    :return: the route function's return value; for an ``HTTPException``
        the result of the handler registered for its code, or the
        exception itself when there is none; for any other exception the
        result of the 500 handler.
    :raises Exception: re-raised when in debug mode or when no 500 handler
        is registered.
    """
    try:
        endpoint, values = self.match_request()
        return self.route_functions[endpoint](**values)
    except HTTPException as e:
        # ``except X, e`` is Python 2 only syntax; ``as`` works on 2.6+
        # and 3.
        handler = self.error_handlers.get(e.code)
        if handler is None:
            return e
        return handler(e)
    except Exception as e:
        handler = self.error_handlers.get(500)
        if self.debug or handler is None:
            raise
        return handler(e)
def match_request(self):
    """Match the request URL and record the outcome on ``self.request``.

    Can be overridden by a subclass to hook into the matching of the
    request. On success ``url_rule`` and ``view_args`` are set on the
    request; on an ``HTTPException`` the exception is stored as
    ``routing_exception`` instead of being raised.
    """
    try:
        matched = self.url_adapter.match(return_rule=True)
        rule, self.request.view_args = matched
        self.request.url_rule = rule
    except HTTPException as routing_error:
        self.request.routing_exception = routing_error
def del_category(id):
    """Delete a category.

    Aborts with 404 when the category does not exist. The only remaining
    category cannot be deleted: a warning is flashed and the user is
    redirected to the category list. Otherwise, after the authorization
    check, GET shows the confirmation page and POST deletes the category.

    :param id: identifier passed to ``project_repo.get_category``.
    :return: the response for the confirmation page or the redirect.
    """
    try:
        category = project_repo.get_category(id)
        if not category:
            abort(404)
        elif len(cached_cat.get_all()) <= 1:
            msg = gettext('Sorry, it is not possible to delete the only'
                          ' available category. You can modify it, '
                          ' click the edit button')
            flash(msg, 'warning')
            return redirect_content_type(url_for('.categories'))
        else:
            ensure_authorized_to('delete', category)
            if request.method == 'GET':
                response = dict(
                    template='admin/del_category.html',
                    title=gettext('Delete Category'),
                    category=category,
                    form=dict(csrf=generate_csrf()),
                )
                return handle_content_type(response)
            if request.method == 'POST':
                project_repo.delete_category(category)
                flash(gettext("Category deleted"), 'success')
                cached_cat.reset()
                return redirect_content_type(url_for(".categories"))
    except HTTPException:
        # HTTP errors (404, authorization failures) pass through untouched.
        raise
    except Exception as e:  # pragma: no cover
        current_app.logger.error(e)
        return abort(500)
def update_category(id):
    """Update a category.

    Aborts with 404 when the category does not exist. After the
    authorization check, GET shows the edit form; POST validates the
    submitted form and either stores the updated category and redirects to
    the category list, or re-displays the form.

    :param id: identifier passed to ``project_repo.get_category``.
    :return: the response for the form page or the redirect.
    """
    try:
        category = project_repo.get_category(id)
        if not category:
            abort(404)
        else:
            ensure_authorized_to('update', category)
            form = CategoryForm(obj=category)
            form.populate_obj(category)

            def form_page(current_form):
                # Shared payload for showing the update form.
                return dict(template='admin/update_category.html',
                            title=gettext('Update Category'),
                            category=category,
                            form=current_form)

            if request.method == 'GET':
                return handle_content_type(form_page(form))

            if request.method == 'POST':
                form = CategoryForm(request.body)
                if not form.validate():
                    msg = gettext("Please correct the errors")
                    flash(msg, 'success')
                    return handle_content_type(form_page(form))

                slug = form.name.data.lower().replace(" ", "")
                new_category = Category(id=form.id.data,
                                        name=form.name.data,
                                        short_name=slug)
                project_repo.update_category(new_category)
                cached_cat.reset()
                msg = gettext("Category updated")
                flash(msg, 'success')
                return redirect_content_type(url_for(".categories"))
    except HTTPException:
        # HTTP errors (404, authorization failures) pass through untouched.
        raise
    except Exception as e:  # pragma: no cover
        current_app.logger.error(e)
        return abort(500)
def dispatch_request(self, request):
    """Dispatch ``request`` to the handler class matched by the URL map.

    Steps: bind ``self.url_map`` to ``request.environ``; match the URL to
    an endpoint; pick from ``url_map._caller_rule`` the class whose
    ``__name__`` equals the endpoint; instantiate it with the URL values;
    call the method named after the lower-cased HTTP method on that
    instance; wrap the result with ``make_response``.

    :param request: the request object; ``environ`` and ``method`` are
        read from it.
    :return: the response built by ``make_response``; the result of
        ``error_404`` on ``NotFound``; the exception itself for any other
        ``HTTPException``.
    """
    adapter = self.url_map.bind_to_environ(request.environ)
    try:
        endpoint, values = adapter.match()
        # The first registered class named like the endpoint handles it.
        caller = [
            caller_rule[0]
            for caller_rule in self.url_map._caller_rule
            if endpoint == caller_rule[0].__name__
        ][0]
        ins = caller(app_name=self.app_name, **values)

        current_method = getattr(caller, request.method.lower(), None)
        if not current_method:
            return self.make_response("Method not allowed")

        res = current_method(ins)
        return self.make_response(res, request) or \
            self.make_response("rv is NoneType...")
    except NotFound:
        # ``except X, e`` is Python 2 only syntax; the unused binding is
        # dropped here and ``as`` is used below.
        return self.error_404()
    except HTTPException as e:
        return e
def dispatch_request(self, request):
    """Dispatch ``request`` to the ``endpoint_<name>`` method it matches.

    :param request: the request object; its ``environ`` is used for URL
        matching.
    :return: the return value of the matched ``endpoint_<name>`` method,
        which is called with the URL adapter, the request and the URL
        values; or the ``HTTPException`` itself if one was raised.
    """
    adapter = self.url_map.bind_to_environ(request.environ)
    try:
        endpoint, values = adapter.match()
        method = getattr(self, 'endpoint_{}'.format(endpoint))
        return method(adapter, request, **values)
    except HTTPException as e:
        # ``except X, e`` is Python 2 only syntax; ``as`` works on 2.6+
        # and 3.
        return e
def allowed_methods(self, path_info=None):
    """Return the valid methods that match for a given path.

    The path is matched with the placeholder method ``'--'``; the
    resulting ``MethodNotAllowed`` error carries the valid methods.

    .. versionadded:: 0.7

    :param path_info: the path info forwarded to ``match``.
    :return: ``valid_methods`` of the ``MethodNotAllowed`` error, or an
        empty list when matching succeeds or fails with any other
        ``HTTPException``.
    """
    try:
        self.match(path_info, method='--')
    except MethodNotAllowed as not_allowed:
        return not_allowed.valid_methods
    except HTTPException:
        return []
    return []
def dispatch_request(self, request):
    """Dispatch ``request`` to the ``on_<endpoint>`` method it matches.

    :param request: the request object; its ``environ`` is used for URL
        matching.
    :return: the return value of the matched ``on_<endpoint>`` method,
        which is called with the request and the URL values; or the
        ``HTTPException`` itself if one was raised.
    """
    url_adapter = self.url_map.bind_to_environ(request.environ)
    try:
        endpoint, url_values = url_adapter.match()
        view = getattr(self, 'on_' + endpoint)
        return view(request, **url_values)
    except HTTPException as http_error:
        return http_error