我们从 Python 开源项目中提取了以下 42 个代码示例，用于说明如何使用 flask_restful.abort()。
def schema_request(media_type_name):
    """Build a decorator that validates the request JSON against a schema.

    ``media_type_name`` is either a string, which is resolved through
    ``get_schema_for_media_type``, or any other object, which is used as the
    schema directly. When validation fails the request is aborted with
    status 400 and a generated report as the description; otherwise the
    wrapped function is called with its original arguments.
    """

    def wrapper(fn):
        @wraps(fn)
        def decorated(*args, **kwargs):
            # Strings name a media type; anything else already is the schema.
            schema = (
                get_schema_for_media_type(media_type_name)
                if isinstance(media_type_name, basestring)
                else media_type_name
            )
            payload = request.get_json()
            try:
                validate(payload, schema)
            except ValidationError as err:
                report = generate_validation_error_report(err, payload)
                abort(400, description=report)
            return fn(*args, **kwargs)

        return decorated

    return wrapper
def put(self, message_id):
    """Edit an existing message.

    Argument:
        message_id (int): ID of the message to edit.

    Returns:
        Whatever ``self.get(message_id)`` returns once the new text has been
        committed.

    Aborts with 404 when no message has that ID and with 400 when the
    authenticated user is not the message's author.
    """
    text = get_valid_json(self.SCHEMA)['text']
    result = db.session.query(models.Message).get(message_id)

    # Without this guard a missing message would fail on ``result.user``
    # below with AttributeError and surface as an HTTP 500.
    if result is None:
        flask_restful.abort(404, message="No post by id %d" % message_id)

    # we don't want people editing posts which don't
    # belong to them...
    if result.user.username == auth.username():
        result.text = text
        db.session.commit()
        return self.get(message_id)
    else:
        message = "You are not this message's author."
        flask_restful.abort(400, message=message)
def delete(self, message_id):
    """Delete a specific message.

    Argument:
        message_id (int): ID of the message to delete.

    Returns an empty dict after the deletion is committed. Aborts with 404
    when the message does not exist and with 401 when the authenticated user
    is not its author.
    """
    target = db.session.query(models.Message).get(message_id)

    if target is None:
        flask_restful.abort(404, message="No post by id %d" % message_id)

    if target.user.username != auth.username():
        flask_restful.abort(
            401,
            message="You're not the author of message %d" % message_id,
        )
    else:
        db.session.delete(target)
        db.session.commit()
        return {}
def post(self, args):
    """Exchange an email/password pair for an access token.

    ``args`` must provide ``'email'`` and ``'password'``. Aborts with 401
    when no user has that email or when the password does not verify;
    otherwise returns a dict holding a freshly created access token.
    """
    email, password = args['email'], args['password']

    # Check the info!
    account = users.User.query.filter_by(email=email).first()

    if account is None:
        # FIXME: bad message
        flask_restful.abort(401, message={'error': 'failed login'})
    elif flask_security.utils.verify_password(password, account.password):
        token = create_access_token(identity=email)
        return {'access_token': token}
    else:
        # FIXME: terrible message.
        flask_restful.abort(401, message={'error': 'incorrect email/pass'})
def post(self):
    """Register a new domain name on this resolver.

    The full name is built as ``<label>.<zone>`` from the parsed
    ``domain_name`` argument. Returns the created domain; per the original
    documentation its serialization includes a JSON Web Token that
    authorizes later updates to this mapping. Aborts with 422 when creation
    raises ``exc.IntegrityError``.
    """
    args = new_domain_parser.parse_args()
    requested = args.domain_name
    zone = requested["zone"]
    domain_name = "{}.{}".format(requested["label"], zone)

    try:
        domain = Domain.create(domain_name=domain_name,
                               zone=zone,
                               public=args.public)
    except exc.IntegrityError:
        error = {'domain_name': "Domain {} already exists".format(domain_name)}
        return abort(422, message=error)

    return domain
def post(self, name):
    """Make the current user follow the topic called ``name``.

    Returns the topic, both when it is newly followed and when the user was
    already following it. Aborts with 401 for unauthenticated users, 400
    when ``topics_service`` knows no such topic, and 500 (after rolling the
    session back) when persisting the change fails.
    """
    if not current_user.is_authenticated:
        return abort(401, message='permission denied')

    # Already following: hand back the existing topic untouched.
    for topic in current_user.following_topics:
        if topic.name == name:
            return topic

    topic = topics_service.get(name)
    if topic is None:
        return abort(400, message='topic not found')

    try:
        current_user.following_topics.append(topic)
        topic.followers_count += 1
        db.session.commit()
    except Exception:
        # Narrowed from a bare ``except:`` so SystemExit/KeyboardInterrupt
        # are no longer swallowed and turned into a 500 response.
        db.session.rollback()
        return abort(500, message='operation failed')

    return topic
def delete(self, name):
    """Make the current user stop following the topic called ``name``.

    Returns ``None`` on success. Aborts with 401 for unauthenticated users,
    400 when the user is not following such a topic, and 500 (after rolling
    the session back) when persisting the change fails.
    """
    if not current_user.is_authenticated:
        return abort(401, message='permission denied')

    # for/else: the else branch runs only when no followed topic matched.
    for topic in current_user.following_topics:
        if topic.name == name:
            break
    else:
        return abort(400, message='topic not found')

    try:
        current_user.following_topics.remove(topic)
        # Never let the counter go negative.
        if topic.followers_count > 0:
            topic.followers_count -= 1
        db.session.commit()
    except Exception:
        # Narrowed from a bare ``except:`` so SystemExit/KeyboardInterrupt
        # are no longer swallowed and turned into a 500 response.
        db.session.rollback()
        return abort(500, message='operation failed')
def handle_validation_error(self, error, bundle_errors):
    """Turn a parsing error into a 400 abort or a bundled error entry.

    :param error: the error that was raised
    :param bundle_errors: when truthy (or when the app config sets
        ``BUNDLE_ERRORS``), do not abort; return ``(error, msg)`` instead,
        where ``msg`` maps the argument name to its error message
    """
    error_str = six.text_type(error)
    if self.help:
        # A custom help template may embed the raw error text.
        error_msg = self.help.format(error_msg=error_str)
    else:
        error_msg = error_str
    msg = {self.name: error_msg}

    bundling = current_app.config.get("BUNDLE_ERRORS", False) or bundle_errors
    if bundling:
        return error, msg
    flask_restful.abort(400, message=msg)
def put(group_id):
    """Log a vote action on a recipe of the given group.

    Responds 400 when ``put_parser`` raises ``BadRequest`` and 403 when the
    token resolves to no user or the user has no ``groups_rel`` row for
    ``group_id``. Otherwise inserts a row into ``group_recipe_vote_log`` and
    returns ``(None, 200)``.
    """
    try:
        args = put_parser.parse_args()
    except BadRequest:
        return abort(400, message="Invalid arguments")

    with db:
        with db.cursor() as cursor:
            user_id = id_from_token(args['token'])
            if user_id is None:
                return abort(403, message="Token is not allowed to view this group")

            # The caller must be related to the group to vote in it.
            cursor.execute(
                "SELECT id FROM groups_rel WHERE user_id=%s AND groups_rel.group_id=%s",
                [user_id, group_id])
            membership = cursor.fetchone()
            if membership is None:
                return abort(403, message="Token is not allowed to view this group")

            vote_params = [group_id, args['recipe_id'], user_id, args['action']]
            cursor.execute(
                "INSERT INTO group_recipe_vote_log (grecipe_id, user_id, action) VALUES "
                "((SELECT id FROM group_recipes WHERE group_id=%s AND group_recipes.recipe_id=%s),"
                " %s, %s)",
                vote_params)
            return None, 200
def get(ean):
    """Look up a product by EAN, falling back to ``request_product``.

    Returns a dict with ``ean``, ``name`` and ``type`` when the product is
    in the ``products`` table. Otherwise asks ``request_product``; a result
    is stored through ``Product.add_product`` and returned. Aborts with 404
    when neither source knows the EAN.
    """
    with db:
        with db.cursor() as cursor:
            cursor.execute("SELECT name, type FROM products"
                           " WHERE ean=%s", (ean,))
            row = cursor.fetchone()

            if row is not None:
                return {"ean": ean, "name": row[0], "type": row[1]}

            # Not stored locally: try the external lookup and cache a hit.
            fetched = request_product(ean)
            if fetched is not None:
                Product.add_product(fetched['ean'], fetched['name'], fetched['type'])
                return fetched

            abort(404, message="Product with EAN {} does not exist.".format(ean))
def put(self, ean):
    """Create or update the product identified by ``ean``.

    Responds 400 when the arguments cannot be parsed, when ``type`` is not
    convertible to ``int``, or when the product type does not exist. A
    missing or empty name is stored as ``"Unknown"``. Returns ``(None, 201)``
    when the product was added and ``(None, 200)`` when it was updated.
    """
    try:
        args = parser.parse_args()
        args['type'] = int(args['type'])
    except Exception:
        return abort(400, message="Invalid arguments")

    if args['name'] is None or len(args['name']) == 0:
        args['name'] = "Unknown"

    name = args['name']
    product_type = args['type']

    with db:
        with db.cursor() as cursor:
            cursor.execute("SELECT id FROM product_types WHERE id=%s", [product_type])
            if cursor.fetchone() is None:
                abort(400, message="Invalid arguments, product type does not exist.")

            cursor.execute("SELECT name, type FROM products WHERE ean=%s", [ean, ])
            already_known = cursor.fetchone() is not None

            if not already_known:
                self.add_product(ean, name, product_type)
                return None, 201

            cursor.execute("UPDATE products SET name=%s, type=%s WHERE ean=%s",
                           (name, product_type, ean))
            db.commit()
            return None, 200
def put(self):
    """Store the caller's firebase token on their user row.

    Responds 400 when parsing raises ``BadRequest``. The user is resolved
    through ``get_or_create_id``; element 0 of its result is used as the user
    id and a truthy element 1 selects the 201 response (presumably meaning
    "newly created" - confirm against ``get_or_create_id``). Returns
    ``(None, 201)`` or ``(None, 200)``.
    """
    try:
        args = parser.parse_args()
    except BadRequest:
        return abort(400, message="Invalid arguments")

    with db:
        with db.cursor() as cursor:
            lookup = get_or_create_id(args['token'])
            cursor.execute(u"UPDATE users SET firebase_token=%s WHERE id=%s",
                           [args['firebase_token'], lookup[0]])
            status = 201 if lookup[1] else 200
            return None, status
def get(self):
    """List the products in the caller's fridge.

    Responds 400 when argument parsing raises. Otherwise returns
    ``(items, 200)`` where each item is a dict with ``ean``, ``name`` and
    ``type`` taken from the joined ``fridge_items``/``products`` rows.
    """
    try:
        args = parser.parse_args()
    except Exception:
        return abort(400, message="Invalid arguments")

    with db:
        with db.cursor() as cursor:
            owner = get_or_create_id(args['token'])
            cursor.execute(
                "SELECT fridge_items.ean, products.name, products.type FROM fridge_items"
                " JOIN products ON products.ean=fridge_items.ean"
                " WHERE fridge_items.user_id=%s",
                [owner[0]])
            items = [
                {'ean': row[0], 'name': row[1], 'type': row[2]}
                for row in cursor.fetchall()
            ]
            return items, 200
def put(self, ean):
    """Put the product ``ean`` into the caller's fridge.

    Responds 400 when argument parsing raises. When the item is already in
    the fridge, returns its ``name``/``type`` with status 200. Otherwise the
    product is fetched with ``Product.get`` (which, per the original comment,
    produces a 404 for unknown products), a ``fridge_items`` row is inserted
    and ``(product, 201)`` is returned.
    """
    try:
        args = parser.parse_args()
    except Exception:
        return abort(400, message="Invalid arguments")

    with db:
        with db.cursor() as cursor:
            owner = get_or_create_id(args['token'])
            cursor.execute(
                "SELECT products.name, products.type FROM fridge_items"
                " JOIN products ON products.ean=fridge_items.ean"
                " WHERE fridge_items.user_id=%s AND fridge_items.ean=%s",
                [owner[0], ean])
            existing = cursor.fetchone()

            if existing is not None:
                return {"name": existing[0], "type": existing[1]}, 200

            product = Product.get(ean)  # Produces 404 if not exists
            cursor.execute("INSERT INTO fridge_items (ean, user_id) VALUES (%s, %s)",
                           [ean, owner[0]])
            return product, 201
def put(self, group_id):
    """Set the caller's ``accepted`` flag for the given group.

    Responds 400 when parsing raises ``BadRequest``, 403 when the token
    resolves to no user, and 404 when the user has no ``groups_rel`` row for
    ``group_id``. On success the row's ``accepted`` column is updated from
    the ``accept`` argument and the bare value ``200`` is returned.
    """
    try:
        args = putParser.parse_args()
    except BadRequest:
        return abort(400, message="Invalid arguments")

    with db:
        with db.cursor() as cursor:
            user_id = id_from_token(args['token'])
            if user_id is None:
                return abort(403, message="Token is not allowed to view this group")

            cursor.execute(
                "SELECT id FROM groups_rel WHERE user_id=%s AND groups_rel.group_id=%s",
                [user_id, group_id])
            relation = cursor.fetchone()
            if relation is None:
                return abort(404, message="Group not found")

            relation_id = relation[0]
            cursor.execute("UPDATE groups_rel SET accepted=%s WHERE id=%s",
                           [args['accept'], relation_id])
            return 200
def get(self):
    """Return the name of the user identified by the request token.

    Responds 400 when parsing raises ``BadRequest``. Returns ``(None, 404)``
    when no ``users`` row matches the id derived from the token, otherwise a
    dict with the user's ``name``.
    """
    try:
        args = parser.parse_args()
    except BadRequest:
        return abort(400, message="Invalid arguments")

    with db:
        with db.cursor() as cursor:
            user_id = id_from_token(args['token'])
            cursor.execute("SELECT name FROM users WHERE id=%s", [user_id])
            row = cursor.fetchone()

            if row is not None:
                return {"name": row[0]}
            return None, 404
def put(self):
    """Optionally reset today's invitation for the caller and resend invitations.

    Responds 400 when parsing raises ``BadRequest`` or when the token
    resolves to no user. When ``resend_all`` is absent from the parsed
    arguments, responds 200 with an empty message. When it is truthy, the
    caller's ``groups_rel`` row for today's group is marked as not invited
    and ``send_invitations`` is called. Returns the bare value ``200``.
    """
    try:
        args = parser.parse_args()
    except BadRequest:
        return abort(400, message="Invalid arguments")

    if 'resend_all' not in args:
        # abort() only knows error statuses; abort(200, ...) fails inside
        # werkzeug and produces a 500. Return the intended payload instead.
        return {'message': ""}, 200

    if args['resend_all']:
        with db:
            with db.cursor() as cursor:
                user_id = id_from_token(args['token'])
                if user_id is None:
                    return abort(400, message="Invalid arguments")
                cursor.execute(
                    "UPDATE groups_rel SET invited=FALSE WHERE id = (SELECT groups_rel.id FROM groups_rel "
                    "JOIN groups ON groups_rel.group_id = groups.id "
                    "WHERE groups_rel.user_id=%s AND groups.day = CURRENT_DATE)",
                    [user_id])
        # NOTE(review): the original nesting was lost in extraction; this
        # assumes invitations are sent only on resend_all, after the update
        # block has been left - confirm against the upstream source.
        send_invitations()
    return 200
def handle_validation_error(self, error, bundle_errors):
    """Turn a parsing error into a 400 abort or a bundled error entry.

    :param error: the error that was raised
    :param bundle_errors: when truthy (or when the app config sets
        ``BUNDLE_ERRORS``), do not abort; return ``(error, msg)`` instead,
        where ``msg`` maps the argument name to its formatted error message
    """
    error_str = six.text_type(error)
    if self.help:
        # A custom help template may embed the raw error text.
        error_msg = self.help.format(error_msg=error_str)
    else:
        error_msg = error_str
    formatted = "{0}".format(error_msg)
    msg = {self.name: formatted}

    bundling = current_app.config.get("BUNDLE_ERRORS", False) or bundle_errors
    if bundling:
        return error, msg
    flask_restful.abort(400, message=msg)
def check_schema(json_object, schema, title=None):
    """Validate ``json_object`` against ``schema``; abort with 400 on failure.

    On a ``ValidationError`` a report is generated, prefixed with ``title``
    when one is given, logged at info level and used as the description of
    the 400 abort. Returns ``None`` when validation succeeds.
    """
    try:
        validate(json_object, schema)
    except ValidationError as err:
        details = generate_validation_error_report(err, json_object)
        report = (
            "Schema check failed: %s\n%s" % (title, details)
            if title
            else details
        )
        log.info("Schema validation failure\n%s", report)
        abort(400, description=report)
def post(self):
    """Create a new user.

    If a user already exists with the provided username, an HTTP 400 is
    sent.

    Returns:
        dict: If the user was successfully created, return a dictionary
            describing that created user.
        None: If aborted.
    """
    json_data = get_valid_json(self.SCHEMA_POST)
    bio = json_data.get('bio')
    username = json_data['username']
    password = json_data['password']

    new_user = models.User(username, password, bio=bio)
    db.session.add(new_user)

    try:
        db.session.commit()
    except sqlalchemy.exc.IntegrityError:
        # A failed commit leaves the session unusable until it is rolled
        # back; do that before reporting the conflict.
        db.session.rollback()
        message = "A user already exists with username: %s" % username
        flask_restful.abort(400, message=message)
    else:
        # Reached only when the commit raised no error.
        return new_user.to_dict()
def get(self):
    """Get a range of messages using a "limit" and an "offset."

    Aborts with 400 when ``limit`` exceeds
    ``config.LIMITS_MESSAGES_GET_LIMIT`` and with 404 when the query matches
    nothing.

    Returns:
        list: list of dictionaries describing messages, if successful.
        None: If aborted.
    """
    json_data = get_valid_json(self.SCHEMA_GET)
    offset = int(json_data['offset'])
    limit = int(json_data['limit'])

    # Just make sure not requesting too many at once!
    max_limit = config.LIMITS_MESSAGES_GET_LIMIT
    if limit > max_limit:
        flask_restful.abort(
            400,
            message="You may only request %d messages at once." % max_limit)

    # The request is well-formed: run the query and return the matching
    # messages, or 404 when there are none.
    page = (db.session.query(models.Message)
            .limit(limit)
            .offset(offset)
            .all())

    if not page:
        flask_restful.abort(
            404,
            message="No messages found at offset %d limit %d" % (offset, limit))
    else:
        return [entry.to_dict() for entry in page]
def auth_error():
    """Abort the current request with HTTP 401 and an "Unauthorized" message."""
    # The message previously read "Unathorized" (typo).
    flask_restful.abort(401, message="Unauthorized")
def get_valid_json(schema):
    """Return the request's JSON body after validating it against ``schema``.

    The body is parsed with ``force=True``. A ``jsonschema.ValidationError``
    aborts the request with 400 and the validation message.
    """
    payload = flask.request.get_json(force=True)

    try:
        jsonschema.validate(payload, schema)
    except jsonschema.ValidationError as err:
        flask_restful.abort(400, message=err.message)
    else:
        return payload
def handle_arg_schema_error(error):
    """RESTful error regarding webarg schema mismatch (invalidation).

    Aborts the request with HTTP 400, passing ``error.messages`` along as the
    ``validationError`` keyword of ``flask_restful.abort``.

    Arguments:
        error: An error object from webargs (exact type not visible here),
            passed to this function when a request's arguments are not valid
            according to the specified webargs schema (a dict, see: use_args
            decorator). Only its ``messages`` attribute is read.
    """
    flask_restful.abort(400, validationError=error.messages)
def get(self):
    """Placeholder handler: always aborts with HTTP 501 (Not Implemented)."""
    return abort(501)
def get(self, storageid):
    """Placeholder handler: always aborts with HTTP 501 (Not Implemented).

    ``storageid`` is accepted but not used.
    """
    return abort(501)
def get(self, nodeid):
    """Placeholder handler: always aborts with ``http_client.NOT_IMPLEMENTED``.

    ``nodeid`` is accepted but not used.
    """
    return abort(http_client.NOT_IMPLEMENTED)
def post(self):
    """Add a new proxy to the proxy list.

    The parsed ``ip_address`` argument supplies both the address (its string
    form) and the IP type (the string form of its ``version``). Returns the
    created proxy; aborts with 422 when creation raises
    ``exc.IntegrityError``.
    """
    ip_address = new_proxy_parser.parse_args().ip_address

    try:
        proxy = Proxy.create(ip_address=str(ip_address),
                             ip_type=str(ip_address.version))
    except exc.IntegrityError:
        conflict = "Proxy {} already exists".format(str(ip_address))
        return abort(422, message=conflict)

    return proxy
def post(self, domain_name):
    """Create a new DNS record on this domain.

    The request must be validated with the domain token. The record is
    created on ``g.domain``; ``domain_name`` itself is not read here.

    Aborts with 403 when the domain already holds ``MAX_RECORDS`` records
    and with 422 when creation raises ``exc.IntegrityError``. When the record
    is a TXT record that looks like an onion mapping, the domain's
    ``onion_address`` is updated as well. Returns the created record.
    """
    probable_onion_mapping = None
    args = new_record_parser.parse_args()

    num_records = Record.query.filter_by(domain=g.domain).count()
    if num_records >= current_app.config["MAX_RECORDS"]:
        # Message grammar fixed (was "has had reach").
        return abort(403, message="This domain has reached the DNS record limit. You cannot "
                                  "add more without removing some existing records")

    # Record address on domain if this looks like an onion mapping
    if is_onion_record(args.value) and args.type == "TXT":
        probable_onion_mapping = args.value.split("=")[1]

    try:
        record = Record.create(domain=g.domain,
                               label=args.label,
                               ttl=args.ttl or current_app.config["TXT_RECORD_TTL"],
                               record_type=args.type,
                               value=args.value,
                               is_onion_mapping=probable_onion_mapping)
    except exc.IntegrityError:
        return abort(422, message="An unknown error occurred when trying to create "
                                  "this record")

    # Guess that the user is updating their onion address if it's a valid
    # onion=theonionaddress.onion type TXT record
    if probable_onion_mapping:
        # Update the convenience onion_address wrapper on the domain
        g.domain.update(onion_address=probable_onion_mapping,
                        date_updated=datetime.datetime.utcnow(),
                        service_online=True,
                        updated_since_synced=True)

    return record
def get(self, id):
    """Return the formatted run with the given id, or abort with 404."""
    run = self.backend_store.get_run(id)
    if run:
        return run_model.format_response(run)
    return abort(404, message="Run {} doesn't exist".format(id))
def delete(self, id):
    """Delete the run with the given id.

    Aborts with 404 when the backend store has no such run and with 400 when
    ``self.manager.delete_run`` reports failure. Returns ``None`` otherwise.
    """
    run = self.backend_store.get_run(id)
    if not run:
        return abort(404, message="Run {} doesn't exist".format(id))

    deleted = self.manager.delete_run(run)
    if not deleted:
        failure = "Failed to find the task queue manager of run {}.".format(id)
        return abort(400, message=failure)
def get(self, task_id):
    """Return information about task status.

    Fetches the job from the rq queue and returns its ``id``, ``status`` and
    ``result`` (as ``result_url``). Aborts with 404 when no job is found.
    """
    task = rq.get_queue().fetch_job(task_id)
    if not task:
        return abort(404, message="Unknown task_id")

    return {
        "id": task.id,
        "status": task.status,
        "result_url": task.result,
    }
def put(self, issue_id, username):
    """Cast or change a vote of ``username`` on issue ``issue_id``.

    The parsed ``action`` argument defaults to ``'upvote'``; ``'downvote'``
    counts as -1, anything else as +1. A first vote costs ``points.VOTE``
    points from ``current_user`` and is logged with a ``PointLog``; a changed
    vote adjusts the issue's points; an unchanged vote returns the existing
    voter without committing.

    Aborts with 401 for unauthenticated users, 400 when the issue or user is
    unknown, 403 when the current user lacks points, and 500 (after rolling
    the session back) when the commit fails. Returns the voter otherwise.
    """
    if not current_user.is_authenticated:
        return abort(401, message='permission denied')

    issue = issues_service.get(issue_id)
    if issue is None:
        return abort(400, message='issue not found')

    user = users_service.get_by_username(username)
    if user is None:
        return abort(400, message='user not found')

    args = self.parser.parse_args()
    action = args.get('action')
    action = 'upvote' if action is None else action
    value = -1 if action == 'downvote' else 1

    voter = voters_service.get(user, issue)
    if voter is None:
        # First vote on this issue: it costs points.
        if current_user.points - points.VOTE < 0:
            return abort(403, message='points are not enough')
        current_user.points -= points.VOTE
        log = PointLog(action, points.VOTE, user, issue)
        voter = Voter(user, issue)
        db.session.add(voter)
        db.session.add(log)
        issue.points += value
    elif voter.value != value:
        # Existing vote flipped to the other direction.
        issue.points += value
        voter.value = value
    else:
        # Same vote again: nothing to persist.
        return voter

    try:
        db.session.commit()
    except Exception:
        # Narrowed from a bare ``except:`` so SystemExit/KeyboardInterrupt
        # are no longer swallowed and turned into a 500 response.
        db.session.rollback()
        return abort(500, message='operation failed')

    return voter
def parse_args(self, req=None, strict=False):
    """Parse all arguments from the provided request and return the results
    as a Namespace.

    :param req: request to parse; defaults to the current ``request``
    :param strict: if req includes args not in parser, throw 400 BadRequest
        exception
    """
    if req is None:
        req = request

    namespace = self.namespace_class()

    # A record of arguments not yet parsed; as each is found
    # among self.args, it will be popped out
    if strict:
        req.unparsed_arguments = dict(self.argument_class('').source(req))
    else:
        req.unparsed_arguments = {}

    errors = {}
    for arg in self.args:
        parsed, present = arg.parse(req, self.bundle_errors)
        if isinstance(parsed, ValueError):
            # In this case the second element carries the error mapping.
            errors.update(present)
            present = None
        if present or arg.store_missing:
            key = arg.dest or arg.name
            namespace[key] = parsed

    if errors:
        flask_restful.abort(400, message=errors)

    if strict and req.unparsed_arguments:
        unknown = ', '.join(req.unparsed_arguments.keys())
        raise exceptions.BadRequest('Unknown arguments: %s' % unknown)

    return namespace
def get(group_id):
    """List the recipes of the given group.

    Responds 400 when parsing raises ``BadRequest`` and 403 when the token
    resolves to no user or the user has no ``groups_rel`` row for
    ``group_id``. Otherwise returns a list of recipe dicts; ``desc`` is
    always ``None``.
    """
    try:
        args = parser.parse_args()
    except BadRequest:
        return abort(400, message="Invalid arguments")

    with db:
        with db.cursor() as cursor:
            user_id = id_from_token(args['token'])
            if user_id is None:
                return abort(403, message="Token is not allowed to view this group")

            cursor.execute(
                "SELECT id FROM groups_rel WHERE user_id=%s AND groups_rel.group_id=%s",
                [user_id, group_id])
            membership = cursor.fetchone()
            if membership is None:
                return abort(403, message="Token is not allowed to view this group")

            cursor.execute(
                "SELECT recipe_id, title, upvotes, veto, image, cheap, vegan, vegetarian, duration FROM group_recipes "
                "LEFT JOIN rec_recipes ON group_recipes.recipe_id = rec_recipes.id "
                "WHERE group_id=%s",
                [group_id])

            # Column order follows the SELECT list above.
            return [
                {
                    'id': row[0],
                    'title': row[1],
                    'desc': None,
                    'upvotes': row[2],
                    'veto': row[3],
                    'image': row[4],
                    'cheap': row[5],
                    'vegan': row[6],
                    'vegetarian': row[7],
                    'duration': row[8],
                }
                for row in cursor.fetchall()
            ]
def delete(self, ean):
    """Remove the product ``ean`` from the caller's fridge.

    Responds 400 when argument parsing raises. Returns ``(None, 200)`` when
    exactly one row was deleted; otherwise aborts with 404.
    """
    try:
        args = parser.parse_args()
    except Exception:
        return abort(400, message="Invalid arguments")

    with db:
        with db.cursor() as cursor:
            owner_id = id_from_token(args['token'])
            cursor.execute(
                "DELETE FROM fridge_items WHERE ean=%s AND fridge_items.user_id=%s",
                [ean, owner_id])

            if cursor.rowcount != 1:
                abort(404, message="Fridge item does not exist")
            else:
                return None, 200
def get(group_id):
    """Return the day and invited/accepted counts of the given group.

    Responds 400 when parsing raises ``BadRequest``, 403 when the token
    resolves to no user or the user is not counted as a member of the group,
    and 404 when the group query returns no row.
    """
    try:
        args = parser.parse_args()
    except BadRequest:
        return abort(400, message="Invalid arguments")

    group_query = (
        "SELECT day, COUNT(CASE WHEN invited THEN 1 END) AS invited, "
        "COUNT(CASE WHEN accepted THEN 1 END) AS accepted, "
        # Only if user is Group Member this will be 1
        "COUNT(CASE WHEN user_id=%s THEN 1 END) = 1 AS allowed "
        "FROM groups "
        "JOIN groups_rel ON groups.id = groups_rel.group_id "
        "WHERE groups.id=%s "
        "GROUP BY groups.id "
    )

    with db:
        with db.cursor() as cursor:
            user_id = id_from_token(args['token'])
            if user_id is None:
                return abort(403, message="Token is not allowed to view this group")

            cursor.execute(group_query, [user_id, group_id])
            row = cursor.fetchone()
            if row is None:
                return abort(404, message="Group not found")

            # Index 3 is the "allowed" column of the query above.
            if not row[3]:
                return abort(403, message="Token is not allowed to view this group")

            return {
                'invited': row[1],
                'accepted': row[2],
                'day': str(row[0]),
            }
def get(self, user_id=None, username=None):
    """Get a specific user's info by user ID *or* username.

    Aborts with 400 when neither ``user_id`` nor ``username`` is given and
    with 404 when no user matches the one that was given.

    Arguments:
        user_id (int|None): ID to look the user up by.
        username (str|None): Username to look the user up by.

    Warning:
        If both `user_id` and `username` are specified, user_id will be
        used.

    Returns:
        None: If aborted.
        dict: If such a user is found, the return value is a dictionary
            describing the user.
    """
    users_query = db.session.query(models.User)

    if user_id is not None:
        user = users_query.get(user_id)
        if user is None:
            flask_restful.abort(
                404, message="No user matching ID: %s" % user_id)
    elif username is not None:
        user = users_query.filter(models.User.username == username).first()
        if user is None:
            flask_restful.abort(
                404, message="No user matching username: %s" % username)
    else:
        flask_restful.abort(400, message="Must specify user_id or username.")

    return user.to_dict()
def post(self):
    """Receive a file and options, validate both, and queue a converting task.

    The uploaded ``file`` is saved to a temporary file (scheduled for removal
    after ``ORIGINAL_FILE_TTL`` seconds), its mimetype is detected and
    checked against ``SUPPORTED_MIMETYPES``, and the optional JSON
    ``options`` form field is validated. Without options, PDFs default to
    ``{"formats": ["html"]}`` and everything else to ``DEFAULT_OPTIONS``.

    Aborts with 400 when the file is missing, the mimetype is unsupported,
    or the options are malformed. Returns a dict with the queued task's
    ``id`` and ``status``.
    """
    if "file" not in request.files:
        return abort(400, message="file field is required")

    with NamedTemporaryFile(delete=False, prefix=app.config["MEDIA_PATH"]) as tmp_file:
        request.files["file"].save(tmp_file)
        remove_file.schedule(
            datetime.timedelta(seconds=app.config["ORIGINAL_FILE_TTL"]),
            tmp_file.name)
    # The temp file is closed (and therefore flushed) before it is read back
    # by path below; ``delete=False`` keeps it on disk.

    with Magic() as magic:  # detect mimetype
        mimetype = magic.from_file(tmp_file.name)
    if mimetype not in app.config["SUPPORTED_MIMETYPES"]:
        return abort(400, message="Not supported mimetype: '{0}'".format(mimetype))

    options = request.form.get("options", None)
    if options:  # options validation
        try:
            options = ujson.loads(options)
        except ValueError:
            # Malformed JSON is a client error, not a server crash.
            return abort(400, message="Invalid 'options' value")
        if not isinstance(options, dict):
            return abort(400, message="Invalid 'options' value")

        formats = options.get("formats", None)
        if not isinstance(formats, list) or not formats:
            return abort(400, message="Invalid 'formats' value")
        supported_formats = app.config["SUPPORTED_MIMETYPES"][mimetype]["formats"]
        for fmt in formats:
            if fmt not in supported_formats:
                message = "'{0}' mimetype can't be converted to '{1}'"
                return abort(400, message=message.format(mimetype, fmt))

        thumbnails = options.get("thumbnails", None)
        if thumbnails:
            if not isinstance(thumbnails, dict):
                return abort(400, message="Invalid 'thumbnails' value")
            thumbnails_size = thumbnails.get("size", None)
            if not isinstance(thumbnails_size, str) or not thumbnails_size:
                return abort(400, message="Invalid 'size' value")
            try:
                # Expected form is "<width>x<height>".
                (width, height) = map(int, thumbnails_size.split("x"))
            except ValueError:
                return abort(400, message="Invalid 'size' value")
            options["thumbnails"]["size"] = (width, height)
    elif mimetype == "application/pdf":
        options = {
            "formats": ["html"]
        }
    else:
        options = app.config["DEFAULT_OPTIONS"]

    task = process_document.queue(tmp_file.name, options, {
        "mimetype": mimetype,
    })
    return {
        "id": task.id,
        "status": task.status,
    }