我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用flask.abort()。
def auth_jwt_project(short_name):
    """Create a JWT for a project via its secret KEY.

    The caller supplies the project's secret key in the ``Authorization``
    header. Aborts with 403 when the header is missing or empty, and with
    404 when no project is found for ``short_name`` or the key does not
    match the project's secret key.
    """
    supplied_key = request.headers.get('Authorization')
    if not supplied_key:
        return abort(403)

    project = project_repo.get_by_shortname(short_name)
    if not project or project.secret_key != supplied_key:
        return abort(404)

    # The token is signed with the project's own secret key.
    claims = {'short_name': short_name, 'project_id': project.id}
    return jwt.encode(claims, project.secret_key, algorithm='HS256')
def init_login_manager(db):
    """Init security extensions (login manager and principal).

    :param db: Database which stores user accounts and roles
    :type db: ``flask_sqlalchemy.SQLAlchemy``
    :return: Login manager and principal extensions
    :rtype: (``flask_login.LoginManager``, ``flask_principal.Principal``)
    """
    login_manager = flask_login.LoginManager()
    principals = flask_principal.Principal()
    login_manager.anonymous_user = Anonymous

    @login_manager.unauthorized_handler
    def unauthorized():
        """Reject unauthenticated access with 403."""
        flask.abort(403)

    @login_manager.user_loader
    def load_user(user_id):
        """Return the account for ``user_id``, or None if the ID is unusable."""
        # A malformed ID must resolve to "no user" instead of raising,
        # otherwise a corrupted session value turns into a server error.
        try:
            account_id = int(user_id)
        except (TypeError, ValueError):
            return None
        return db.session.query(UserAccount).get(account_id)

    return login_manager, principals
def lookup():
    """Look up data for the airport and date posted in the form.

    Reads the ``airport`` and ``date`` form fields. An empty ``date`` means
    today. Returns the JSON-encoded lookup result, or a ``(message, 400)``
    pair when the date cannot be parsed or the lookup fails.
    """
    icao_identifier = request.form['airport']
    raw_date = request.form['date']
    if raw_date:
        # Handle both parser conventions for unparseable input: raising
        # ValueError, or returning None (which previously crashed on .date()).
        try:
            parsed = dateparser.parse(raw_date)
        except ValueError:
            parsed = None
        if parsed is None:
            return "Unable to understand date %s" % raw_date, 400
        date = parsed.date()
    else:
        date = datetime.date.today()

    # The former bare ``except:`` after this handler could only catch
    # SystemExit/KeyboardInterrupt; those are now left to propagate.
    try:
        result = do_lookup(icao_identifier, date)
    except Exception as e:
        return str(e), 400
    return json.dumps(result)
def star():
    """Starring/Highlighting handler.

    Attempts to toggle a star/highlight on a particular show. The show ID
    must be passed in the ``id`` query string. If the user is
    unauthenticated, the function is aborted with a ``404`` message to hide
    the page.

    Returns:
        JSON formatted output describing success and the ID of the show
        starred.
    """
    log.debug("Entering star, trying to toggle star.")
    # Check for the key first: session['logged_in'] raises KeyError (a 500)
    # for visitors without a session, instead of the documented 404.
    if 'logged_in' in session and fe.check_login_id(escape(session['logged_in'])):
        show_id = request.args['id']
        log.debug("Sending show ID {0} to function".format(show_id))
        fe.star_show(show_id)
        log.debug("Returning to user.")
        return jsonify({"star": "success", "id": show_id})
    log.debug("User cannot be authenticated, send 404 to hide page.")
    abort(404)
def drop_show():
    """Show removal handler.

    Attempts to remove a show from the backend system. The show ID must be
    passed in the ``id`` query string. If the user is unauthenticated, the
    function is aborted with a ``404`` message to hide the page.

    Returns:
        An HTTP redirect to the home page, to refresh.
    """
    log.debug("Entering drop_show, trying to remove show from list.")
    # Check for the key first: session['logged_in'] raises KeyError (a 500)
    # for visitors without a session, instead of the documented 404.
    if 'logged_in' in session and fe.check_login_id(escape(session['logged_in'])):
        show_id = request.args['id']
        log.debug("Sending show ID {0} to function".format(show_id))
        fe.remove_show(show_id)
        log.debug("Refreshing user's page.")
        return redirect('/')
    log.debug("User cannot be authenticated, send 404 to hide page.")
    abort(404)
def scan_scrapers():
    """On demand scraper scanning handler.

    For some reason the scheduler doesn't always work, this endpoint allows
    for instant scanning, assuming it's not already occurring. The function
    is aborted with a ``404`` message to hide the page if the user is not
    authenticated. Scanning can take a long time - 20 to 30 minutes - so
    it's recommended this endpoint be called asynchronously.

    Returns:
        JSON formatted output to identify that scanning has completed or is
        already ongoing.
    """
    log.debug("Entering scan_scrapers.")
    # Check for the key first: session['logged_in'] raises KeyError (a 500)
    # for visitors without a session, instead of the documented 404.
    if 'logged_in' in session and fe.check_login_id(escape(session['logged_in'])):
        log.debug("User is logged in, attempting to begin scan.")
        if not fe.scrape_shows():
            log.debug("scrape_shows returned false, either the lockfile exists incorrectly or scraping is ongoing.")
            return jsonify({"scan": "failure", "reason": "A scan is ongoing"})
        log.debug("scrape_shows just returned. Returning success.")
        return jsonify({"scan": "success"})
    log.debug("User cannot be authenticated, send 404 to hide page.")
    abort(404)
def test_error_handling(self):
    """Error handlers registered through a module apply to the whole app."""
    app = flask.Flask(__name__)
    admin = flask.Module(__name__, 'admin')

    @admin.app_errorhandler(404)
    def not_found(e):
        return 'not found', 404

    @admin.app_errorhandler(500)
    def internal_server_error(e):
        return 'internal server error', 500

    @admin.route('/')
    def index():
        flask.abort(404)

    @admin.route('/error')
    def error():
        # Deliberate ZeroDivisionError to trigger the 500 handler.
        1 // 0

    app.register_module(admin)
    client = app.test_client()

    missing = client.get('/')
    self.assert_equal(missing.status_code, 404)
    self.assert_equal(missing.data, b'not found')

    crashed = client.get('/error')
    self.assert_equal(crashed.status_code, 500)
    self.assert_equal(b'internal server error', crashed.data)
def test_aborting(self):
    """Aborting with a redirect response works, and custom exceptions are handled."""
    class Foo(Exception):
        whatever = 42

    app = flask.Flask(__name__)
    app.testing = True

    @app.errorhandler(Foo)
    def handle_foo(e):
        return str(e.whatever)

    @app.route('/')
    def index():
        target = flask.url_for('test')
        raise flask.abort(flask.redirect(target))

    @app.route('/test')
    def test():
        raise Foo()

    with app.test_client() as client:
        redirected = client.get('/')
        self.assertEqual(redirected.headers['Location'], 'http://localhost/test')
        handled = client.get('/test')
        self.assertEqual(handled.data, b'42')
def index(page=1):
    """Index page for all PYBOSSA registered users.

    Shows 24 accounts per page and aborts with 404 when a page other than
    the first one has no accounts.
    """
    update_feed = get_update_feed()
    per_page = 24
    count = cached_users.get_total_users()
    accounts = cached_users.get_users_page(page, per_page)
    if page != 1 and not accounts:
        abort(404)

    pagination = Pagination(page, per_page, count)
    # Anonymous visitors are passed to the leaderboard as ``None``.
    user_id = current_user.id if current_user.is_authenticated() else None
    top_users = cached_users.get_leaderboard(
        current_app.config['LEADERBOARD'], user_id)

    response = {
        'template': 'account/index.html',
        'accounts': accounts,
        'total': count,
        'top_users': top_users,
        'title': "Community",
        'pagination': pagination,
        'update_feed': update_feed,
    }
    return handle_content_type(response)
def confirm_email():
    """Send email to confirm user email.

    Aborts with 404 when ``ACCOUNT_CONFIRMATION_DISABLED`` is set. When the
    current user's e-mail is not yet valid, a validation e-mail is queued
    and the user is flagged as having been sent one. Always ends with a
    redirect to the current user's profile.
    """
    if current_app.config.get('ACCOUNT_CONFIRMATION_DISABLED'):
        return abort(404)

    if current_user.valid_email is False:
        user = user_repo.get(current_user.id)
        account = dict(fullname=current_user.fullname,
                       name=current_user.name,
                       email_addr=current_user.email_addr)
        confirm_url = get_email_confirmation_url(account)
        subject = ('Verify your email in %s'
                   % current_app.config.get('BRAND'))
        email = dict(subject=subject,
                     recipients=[current_user.email_addr],
                     body=render_template('/account/email/validate_email.md',
                                          user=account,
                                          confirm_url=confirm_url))
        email['html'] = render_template('/account/email/validate_email.html',
                                        user=account,
                                        confirm_url=confirm_url)
        mail_queue.enqueue(send_mail, email)

        # Implicit string concatenation replaces the backslash continuation
        # inside the literal, which leaked stray characters into the message.
        # NOTE(review): this changes the gettext msgid; translation catalogs
        # keyed on the old text need to be regenerated.
        notice = gettext("An e-mail has been sent to "
                         "validate your e-mail address.")
        flash(notice, 'info')
        user.confirmation_email_sent = True
        user_repo.update(user)

    return redirect_content_type(url_for('.profile', name=current_user.name))
def projects(name):
    """List user's project list.

    Aborts with 404 when no user is called ``name`` and with 403 when
    ``name`` is not the current user's name. Otherwise returns the
    ``account/projects.html`` response with the current user's published
    and draft projects.
    """
    if not user_repo.get_by_name(name):
        return abort(404)
    if current_user.name != name:
        return abort(403)

    user = user_repo.get(current_user.id)
    published, drafts = _get_user_projects(user.id)
    response = {
        'template': 'account/projects.html',
        'title': gettext("Projects"),
        'projects_published': published,
        'projects_draft': drafts,
    }
    return handle_content_type(response)
def reset_api_key(name):
    """Reset API-KEY for user.

    Non-POST requests receive a JSON payload carrying a fresh CSRF token.
    A POST regenerates the API key of user ``name`` (404 if there is no such
    user), clears the cached user summary and redirects to the profile.
    """
    if request.method != 'POST':
        return jsonify(dict(form=dict(csrf=generate_csrf())))

    user = user_repo.get_by_name(name)
    if not user:
        return abort(404)

    ensure_authorized_to('update', user)
    user.api_key = model.make_uuid()
    user_repo.update(user)
    cached_users.delete_user_summary(user.name)
    flash(gettext('New API-KEY generated'), 'success')
    return redirect_content_type(url_for('account.profile', name=name))
def project_by_shortname(short_name):
    """Return a project and its cached statistics, or abort with 404.

    On success the result is the tuple ``(project, owner, n_tasks,
    n_task_runs, overall_progress, last_activity, n_results)``. When the
    project is not found its cache entry is deleted before aborting.
    """
    project = cached_projects.get_project(short_name)
    if not project:
        cached_projects.delete_project(short_name)
        return abort(404)

    owner = user_repo.get(project.owner_id)
    project_id = project.id
    # Each call below also populates the cache with the project's data.
    return (
        project,
        owner,
        cached_projects.n_tasks(project_id),
        cached_projects.n_task_runs(project_id),
        cached_projects.overall_progress(project_id),
        cached_projects.last_activity(project_id),
        cached_projects.n_results(project_id),
    )
def delete_autoimporter(short_name):
    """Remove the autoimporter of a project and redirect to its tasks page.

    Aborts with 403 when the ``autoimporter_enabled`` pro feature is off.
    A removal is recorded in the audit log.
    """
    if not pro_features()['autoimporter_enabled']:
        raise abort(403)

    project = project_by_shortname(short_name)[0]
    ensure_authorized_to('read', project)
    ensure_authorized_to('update', project)

    if project.has_autoimporter():
        # Capture the old settings before deletion so they can be logged.
        previous = project.get_autoimporter()
        project.delete_autoimporter()
        project_repo.save(project)
        auditlogger.log_event(project, current_user, 'delete',
                              'autoimporter', json.dumps(previous),
                              'Nothing')

    return redirect(url_for('.tasks', short_name=project.short_name))
def auditlog(short_name):
    """Render the audit log page of a project.

    Aborts with 403 when the ``auditlog_enabled`` pro feature is off.
    """
    pro = pro_features()
    if not pro['auditlog_enabled']:
        raise abort(403)

    (project, owner, n_tasks, n_task_runs,
     overall_progress, _last_activity, _n_results) = project_by_shortname(short_name)

    ensure_authorized_to('read', Auditlog, project_id=project.id)
    logs = auditlogger.get_project_logs(project.id)
    project = add_custom_contrib_button_to(project, get_user_id_or_ip())

    # After the call above ``project`` is read with ``.get('id')``.
    n_completed_tasks = cached_projects.n_completed_tasks(project.get('id'))
    n_volunteers = cached_projects.n_volunteers(project.get('id'))
    return render_template('projects/auditlog.html',
                           project=project,
                           owner=owner,
                           logs=logs,
                           overall_progress=overall_progress,
                           n_tasks=n_tasks,
                           n_task_runs=n_task_runs,
                           n_completed_tasks=n_completed_tasks,
                           n_volunteers=n_volunteers,
                           pro_features=pro)
def add_admin(user_id=None):
    """Add admin flag for user_id.

    Returns a redirect to the users page on success, a 404 error response
    when the user does not exist, and a 415 error response when ``user_id``
    is missing. Any unexpected exception is logged and turned into a 500.
    """
    try:
        if not user_id:  # pragma: no cover
            # Previously this case fell through and returned None, which is
            # not a valid view result; report it the way del_admin does.
            msg = "User.id is missing for method add_admin"
            return format_error(msg, 415)
        user = user_repo.get(user_id)
        if not user:
            msg = "User not found"
            return format_error(msg, 404)
        ensure_authorized_to('update', user)
        user.admin = True
        user_repo.update(user)
        return redirect_content_type(url_for(".users"))
    except Exception as e:  # pragma: no cover
        current_app.logger.error(e)
        return abort(500)
def del_admin(user_id=None):
    """Del admin flag for user_id.

    Returns a redirect to the users page on success, a 404 error response
    when the user does not exist, and a 415 error response when ``user_id``
    is missing. Any exception is logged and turned into a 500.
    """
    try:
        if not user_id:  # pragma: no cover
            return format_error("User.id is missing for method del_admin", 415)

        user = user_repo.get(user_id)
        if not user:
            return format_error("User.id not found", 404)

        ensure_authorized_to('update', user)
        user.admin = False
        user_repo.update(user)
        return redirect_content_type(url_for('.users'))
    except Exception as e:  # pragma: no cover
        current_app.logger.error(e)
        return abort(500)
def run_cmd_host(host_id):
    """Execute host specific command.

    Aborts with 404 when the host cannot be selected and with 400 when the
    ``command`` query argument is missing or excluded. Returns the plugin
    result as JSON.
    """
    # Select host on the server.
    res = RSPET_API.select([host_id])
    # Check if host selected correctly (if not probably host_id is invalid).
    if res["code"] != rspet_server.ReturnCodes.OK:
        abort(404)
    try:
        # Read 'command' argument from query string.
        comm = request.args.get('command')
        if not comm or comm in EXCLUDED_FUNCTIONS:
            abort(400)
        # Read 'args' from the query string, cast to str. getlist() returns
        # an empty list for a missing key, so no KeyError handling is needed.
        args = [str(val) for val in request.args.getlist('args')]
        # Execute command.
        res = RSPET_API.call_plugin(comm, args)
    finally:
        # Unselect host even when the request is rejected or the plugin
        # raises, so no selection leaks into the next request.
        RSPET_API.select()
    return jsonify(res)
def run_cmd():
    """Execute general (non-host specific) command.

    Aborts with 400 when the ``command`` query argument is missing or
    excluded. The plugin result is returned as JSON with status 200 when its
    code is OK and 404 otherwise.
    """
    # Read 'command' argument from query string.
    comm = request.args.get('command')
    if not comm or comm in EXCLUDED_FUNCTIONS:
        abort(400)
    # Read 'args' from the query string, cast to str. getlist() returns an
    # empty list for a missing key, so the old KeyError handler was dead code.
    args = [str(val) for val in request.args.getlist('args')]
    # Execute command.
    ret = RSPET_API.call_plugin(comm, args)
    http_code = 200 if ret['code'] == rspet_server.ReturnCodes.OK else 404
    return make_response(jsonify(ret), http_code)
def post(self):
    """Promote a temporary user to a full user attached to a school.

    Expects JSON fields ``schoolID`` and ``userID``. Aborts with 400 when
    either the temporary user or the school is not found. On success the
    user is inserted into ``db.user``, stored in the session and returned
    as JSON.
    """
    parser = reqparse.RequestParser()
    parser.add_argument("schoolID", type=str, required=True, location="json")
    parser.add_argument("userID", type=str, required=True, location="json")
    args = parser.parse_args()

    user = db.tempUser.find_one({"_id": args["userID"]})
    if user is None:
        abort(400)
    school = db.school.find_one({"_id": args["schoolID"]})
    if school is None:
        abort(400)

    user["schoolID"] = args["schoolID"]
    db.user.insert_one(user)
    session['userID'] = user["_id"]

    # jsonify(user, status=201) mixed positional and keyword arguments,
    # which current Flask rejects with TypeError. Merging explicitly keeps
    # the payload older Flask produced (user fields plus a "status" field).
    # NOTE(review): if an HTTP 201 status code was intended instead, set it
    # on the response object - confirm with API consumers.
    return jsonify(dict(user, status=201))
def test_error_handling(self):
    """Application-level error handlers serve custom 404 and 500 bodies."""
    app = flask.Flask(__name__)

    @app.errorhandler(404)
    def not_found(e):
        return 'not found', 404

    @app.errorhandler(500)
    def internal_server_error(e):
        return 'internal server error', 500

    @app.route('/')
    def index():
        flask.abort(404)

    @app.route('/error')
    def error():
        # Deliberate ZeroDivisionError to trigger the 500 handler.
        1 // 0

    client = app.test_client()

    missing = client.get('/')
    self.assert_equal(missing.status_code, 404)
    self.assert_equal(missing.data, b'not found')

    crashed = client.get('/error')
    self.assert_equal(crashed.status_code, 500)
    self.assert_equal(b'internal server error', crashed.data)
def post_recipe():
    """Create a failure recipe from the posted JSON document.

    The body must be a JSON object with ``topology`` and ``scenarios``;
    ``headers`` is optional and must be a dictionary when given. Invalid
    input aborts with 400. On success responds 201 with the recipe ID and a
    ``location`` header pointing at the recipe results.
    """
    payload = request.get_json()
    # A missing body or a non-object JSON value (list, string, ...) would
    # otherwise crash on .get() and surface as a 500.
    if not isinstance(payload, dict):
        abort(400, "JSON object required")

    topology = payload.get("topology")
    scenarios = payload.get("scenarios")
    headers = payload.get("headers")

    if not topology:
        abort(400, "Topology required")
    if not scenarios:
        abort(400, "Failure scenarios required")
    if headers and not isinstance(headers, dict):
        abort(400, "Headers must be a dictionary")

    appgraph = ApplicationGraph(topology)
    fg = A8FailureGenerator(
        appgraph,
        a8_controller_url='{0}/v1/rules'.format(a8_controller_url),
        a8_controller_token=a8_controller_token,
        headers=headers,
        debug=debug)
    fg.setup_failures(scenarios)
    return make_response(
        jsonify(recipe_id=fg.get_id()),
        201,
        {'location': url_for('get_recipe_results', recipe_id=fg.get_id())})
def delete_recipe(recipe_id):
    """Clear the fault injection rules tagged with ``recipe_id``.

    Sends a DELETE to the controller. Aborts with 500 when the request
    cannot be made, and with the controller's status code when it answers
    with anything other than 200 or 204. Returns an empty body on success.
    """
    # clear fault injection rules
    url = '{0}/v1/rules?tag={1}'.format(a8_controller_url, recipe_id)
    headers = {}
    if a8_controller_token != "":
        # The original referenced an undefined name ``token`` here, raising
        # NameError whenever a controller token was configured.
        headers['Authorization'] = "Bearer " + a8_controller_token
    try:
        r = requests.delete(url, headers=headers)
    except Exception as e:  # ``as`` form is valid on Python 2.6+ and 3
        sys.stderr.write("Could not DELETE {0}".format(url))
        sys.stderr.write("\n")
        sys.stderr.write(str(e))
        sys.stderr.write("\n")
        abort(500, "Could not DELETE {0}".format(url))
    if r.status_code not in (200, 204):
        abort(r.status_code)
    return ""
def serve_static_files(p, index_on_error=True):
    """Securely serve static files for the given path using send_file.

    :param p: requested path, relative to the application's static folder
    :param index_on_error: not used by this function; kept for callers
    :return: the requested file, or the ``STATIC_FILE_ON_404`` fallback file
        when the request is invalid and a fallback is configured
    :raises: 404 when the request is invalid and no fallback is configured
    """
    # Canonicalise both sides so symlinks and ``..`` segments are resolved
    # before comparing.
    static_root = os.path.realpath(app.static_folder)
    full_path = os.path.realpath(os.path.join(static_root, p))

    # os.path.commonprefix compares character by character, so a sibling
    # such as "<static>_private/x" passed the old check. Requiring the root
    # plus a trailing separator as prefix restricts matches to real
    # descendants. join(root, '') appends the separator without doubling it.
    root_prefix = os.path.join(static_root, '')
    inside_root = full_path.startswith(root_prefix)

    # We have a problem if either:
    #  - the path is not a sub-path of app.static_folder; or
    #  - the path does not refer to a real file.
    if not inside_root or not os.path.isfile(full_path):
        file_to_return = app.config.get('STATIC_FILE_ON_404', None)
        if file_to_return is None:
            return abort(404)
        full_path = os.path.realpath(os.path.join(static_root, file_to_return))
    return send_file(full_path)
def post(self):
    """Set a configuration value from the ``key`` and ``value`` form fields.

    Users who are not logged in are redirected to the ``authorized``
    endpoint; non-faculty users get 403. ``value`` is decoded as JSON. A
    ``ValueError`` while decoding or storing gives 404, any other failure
    gives 400. Returns ``{"status": "OK"}`` on success.
    """
    user = current_user()
    if user is None:
        # The redirect was previously built but never returned, so
        # execution fell through and crashed on ``user.faculty``.
        return redirect(url_for('authorized'))
    if not user.faculty:
        return abort(403)

    key = request.form.get('key', None)
    value = request.form.get('value', None)
    # Sibling handlers instead of a nested try under a bare ``except:``. The
    # old outer handler caught the exception raised by abort(404) itself and
    # turned every failure into a 400.
    try:
        config.set(key, config.from_frontend_value(key, json.loads(value)))
    except ValueError:
        return abort(404)
    except Exception:
        return abort(400)
    return jsonify({'status': 'OK'})
def configure_views(app):
    """Register the demo routes ``/``, ``/mistake`` and ``/secret`` on ``app``."""

    @app.route('/')
    def home():
        """Render the OK page."""
        return render_template('ok.html')

    @app.route('/mistake')
    def mistake():
        """Always fail with Internal Server Error."""
        # abort() raises, so nothing is ever returned from here.
        abort(500)

    @app.route('/secret')
    def secret():
        """Always fail with Forbidden (user not authorized)."""
        abort(403)
def validate_room_jwt(function=None, requires_jamf_configured=True):
    """Decorator that authenticates a request by the JWT in ``Authorization``.

    Usable bare (``@validate_room_jwt``) or with arguments
    (``@validate_room_jwt(requires_jamf_configured=False)``). The wrapped
    view receives the verified room as its first positional argument.

    :param function: the view, when the decorator is applied without
        arguments
    :param requires_jamf_configured: when true, rooms without
        ``jamf_configured`` are notified and the request is aborted with 400
    :raises: 401 when the header is missing or has no second token
    """
    def decorator(f):
        @functools.wraps(f)
        def wrapper(*args, **kwargs):
            auth_header = flask.request.headers.get('Authorization')
            try:
                # The token is the second whitespace-separated field.
                token = auth_header.split()[1]
            except (AttributeError, IndexError):
                # Header missing (None) or without a second field. Narrowed
                # from a bare ``except:`` so unrelated errors stay visible.
                flask.abort(401)

            hipchat_room = verify_jwt(token)
            if requires_jamf_configured and not hipchat_room.jamf_configured:
                message = "You must first configure a Jamf Pro service account to use this feature."
                send_notification(hipchat_room.hipchat_token,
                                  hipchat_room.hipchat_room_id,
                                  message, color='yellow')
                flask.abort(400)
            return f(hipchat_room, *args, **kwargs)
        return wrapper
    return decorator(function) if function else decorator
def _dispatch(self, route, **kwargs):
    """Route a request to the session's user and return the page result.

    The user is found through the ``shared_id`` stored in the session; when
    none is known, a new user is connected and its ID is stored. The
    controller's ``special_return_handler``, if set, supplies the return
    value instead of the rendered page.
    """
    user = None
    if 'shared_id' in session and session['shared_id'] in self.user_tracker.user_refs:
        user = self.user_tracker.users[session['shared_id']]
    if user is None:
        user = self.user_tracker.connect_user()
        session['shared_id'] = user.id_

    user.open_page(route, kwargs)

    # Give the controller a context carrying ``abort`` for the pre-connect
    # hook only; it is deactivated right afterwards.
    call_ctx = CallCTX(abort=abort)
    user.active_controller.before_connect(call_ctx)
    call_ctx.deactivate()

    # The page is always rendered, even if a special handler takes over.
    rendered = user.active_controller.render_page()
    if user.active_controller.special_return_handler is None:
        return rendered
    return user.active_controller.special_return_handler()
def role_assignment_add(name):
    """Assign role to user (POST handler).

    Reads the ``login`` form field. Aborts with 404 when either the user or
    the role ``name`` does not exist. Flashes the outcome and redirects to
    the role detail page.
    """
    db = flask.current_app.container.get('db')
    login = flask.request.form.get('login', '')
    user = db.session.query(User).filter_by(login=login).first()
    role = db.session.query(Role).filter_by(name=name).first()
    if user is None or role is None:
        flask.abort(404)

    account = user.user_account
    if account not in role.user_accounts:
        role.user_accounts.append(account)
        db.session.commit()
        flask.flash('Role {} assigned to user {}'.format(name, login),
                    'success')
    else:
        flask.flash('User {} already has role {}'.format(login, name),
                    'error')
    return flask.redirect(flask.url_for('admin.role_detail', name=name))
def role_assignment_remove(name):
    """Remove assignment of role to user (POST handler).

    Reads the ``login`` form field. Aborts with 404 when either the user or
    the role ``name`` does not exist. Flashes the outcome and redirects to
    the role detail page.
    """
    db = flask.current_app.container.get('db')
    login = flask.request.form.get('login', '')
    user = db.session.query(User).filter_by(login=login).first()
    role = db.session.query(Role).filter_by(name=name).first()
    if user is None or role is None:
        flask.abort(404)

    account = user.user_account
    if account in role.user_accounts:
        role.user_accounts.remove(account)
        db.session.commit()
        flask.flash('Role {} removed from user {}'.format(name, login),
                    'success')
    else:
        flask.flash("User {} doesn't have role {}".format(login, name),
                    'error')
    return flask.redirect(flask.url_for('admin.role_detail', name=name))
def org_detail(login):
    """Organization detail (GET handler).

    When ``login`` belongs to a user rather than an organization, the
    visitor is redirected to the user detail page with a notice. Aborts
    with 404 when ``login`` matches neither.

    .. todo:: implement 410 (org deleted/archived/renamed)
    """
    db = flask.current_app.container.get('db')
    ext_master = flask.current_app.container.get('ext_master')

    org = db.session.query(Organization).filter_by(login=login).first()
    if org is None:
        user = db.session.query(User).filter_by(login=login).first()
        if user is None:
            flask.abort(404)
        # The message used to read "it's auser.We redirected" because the
        # spaces were missing inside and between the two literals.
        flask.flash('Oy! You wanted to access organization, but it\'s a user. '
                    'We redirected you but be careful next time!', 'notice')
        return flask.redirect(flask.url_for('core.user_detail', login=login))

    tabs = {}
    ext_master.call('view_core_org_detail_tabs', org=org, tabs_dict=tabs)
    tabs = sorted(tabs.values())
    # The default is computed eagerly, so guard against no registered tabs;
    # tabs[0] would raise IndexError and break the whole page.
    default_tab = tabs[0].id if tabs else None
    active_tab = flask.request.args.get('tab', default_tab)
    return flask.render_template(
        'core/org.html', org=org, tabs=tabs, active_tab=active_tab
    )
def post(self):
    """Sign a user in with username or e-mail plus password.

    Aborts with 400 when the login name or password is missing and with
    401 when no user matches or the password hash differs. On success the
    user is signed in and returned.
    """
    args = parser.parse({
        'username': wf.Str(missing=None),
        'email': wf.Str(missing=None),
        'password': wf.Str(missing=None),
    })
    username = args['username'] or args['email']
    password = args['password']
    if not (username and password):
        return flask.abort(400)

    # An "@" anywhere after the first character means an e-mail address.
    lookup_field = 'email' if username.find('@') > 0 else 'username'
    user_db = model.User.get_by(lookup_field, username.lower())

    if not user_db or user_db.password_hash != util.password_hash(user_db, password):
        return flask.abort(401)

    auth.signin_user_db(user_db)
    return helpers.make_response(user_db, model.User.FIELDS)
def project_view(project_id):
    """Render a project page with its crashes.

    Aborts with 404 when the project does not exist or its ``user_key``
    differs from the current user's key.
    """
    project_db = model.Project.get_by_id(project_id)
    if not project_db or project_db.user_key != auth.current_user_key():
        flask.abort(404)

    # Ordering comes from the ``order`` request parameter, newest first
    # by default.
    sort_order = util.param('order') or '-created'
    crash_dbs, crash_cursor = project_db.get_crash_dbs(order=sort_order)

    next_url = util.generate_next_url(crash_cursor)
    project_key = project_db.key.urlsafe() if project_db.key else ''
    api_url = flask.url_for('api.project', project_key=project_key)
    return flask.render_template(
        'project/project_view.html',
        html_class='project-view',
        title=project_db.name,
        project_db=project_db,
        crash_dbs=crash_dbs,
        next_url=next_url,
        api_url=api_url,
    )


###############################################################################
# Admin List
###############################################################################
def admin_project_update(project_id=0):
    """Create or edit a project through the admin form.

    A falsy ``project_id`` starts a new project owned by the current user;
    otherwise the project is loaded, aborting with 404 if it is missing.
    A valid submission saves the project and redirects to the admin list.
    """
    if not project_id:
        project_db = model.Project(user_key=auth.current_user_key())
    else:
        project_db = model.Project.get_by_id(project_id)
    if not project_db:
        flask.abort(404)

    form = ProjectUpdateAdminForm(obj=project_db)
    if form.validate_on_submit():
        form.populate_obj(project_db)
        project_db.put()
        list_url = flask.url_for('admin_project_list', order='-modified')
        return flask.redirect(list_url)

    project_key = project_db.key.urlsafe() if project_db.key else ''
    api_url = flask.url_for('api.admin.project', project_key=project_key)
    return flask.render_template(
        'project/admin_project_update.html',
        title=project_db.name,
        html_class='admin-project-update',
        form=form,
        project_db=project_db,
        back_url_for='admin_project_list',
        api_url=api_url,
    )
def admin_test(test=None):
    """Render the test overview, or a single test page when ``test`` is given.

    Aborts with 404 when ``test`` is given but is not one of ``TESTS``.
    """
    if test and test not in TESTS:
        flask.abort(404)

    form = TestForm()
    # Validation is still run, but a valid submission needs no further
    # action here.
    form.validate_on_submit()

    if test:
        template = 'test/test_one.html'
        title = 'Test: %s' % test.title()
        back_url_for = 'admin_test'
    else:
        template = 'test/test.html'
        title = 'Test'
        back_url_for = None

    return flask.render_template(
        template,
        title=title,
        html_class='test',
        form=form,
        test=test,
        tests=TESTS,
        back_url_for=back_url_for,
    )
def backup(username: str, timeframe: str, backup_date: str):
    """
    Route: /backup/username/timeframe/backup_date
    This route returns the requested backup.

    :param username: the server username of the user needing their backup.
        Must consist of lowercase ASCII letters only.
    :param timeframe: the timeframe of the requested backup. Can be either
        "weekly", or "monthly".
    :param backup_date: the backup-date of the requested backup. Must be in
        the form YYYY-MM-DD.
    :return: the ``<backup_date>.tgz`` file from the user's backup
        directory; 400 for non-GET requests or invalid arguments.
    """
    if flask.request.method != "GET":
        return flask.abort(400)

    # make sure the arguments are valid
    # Both patterns end in \Z so the whole value must match. The date
    # pattern previously had no end anchor and accepted any suffix after a
    # valid date; "$" would also tolerate a trailing newline.
    valid_username = re.match(r"^[a-z]+\Z", username)
    valid_date = re.match(r"^[0-9]{4}-[0-9]{2}-[0-9]{2}\Z", backup_date)
    if not valid_username or not valid_date or \
            timeframe not in ["weekly", "monthly"]:
        app.logger.debug("backups(%s, %s, %s): invalid arguments" % (
            username, timeframe, backup_date))
        return flask.abort(400)

    backups_base_dir = os.path.join(b.BACKUPS_DIR, username, timeframe)
    return flask.send_from_directory(backups_base_dir, backup_date + ".tgz")
def tutorials():
    """
    Route: /tutorials
    This route will render the tutorials page. Note that the markdown
    tutorial files are read when the application starts-up; when ``DEBUG``
    is set they are re-read on every request.

    :return: the rendered tutorials page, with an error message when there
        are no tutorials; 400 for non-GET requests.
    """
    global TUTORIALS
    if flask.request.method != "GET":
        return flask.abort(400)

    # Reload before the emptiness check. The check used to come first, so
    # in DEBUG mode an empty list was never refreshed and newly added
    # tutorials stayed invisible until restart.
    if DEBUG:
        TUTORIALS = []
        populate_tutorials()

    if not TUTORIALS:
        return flask.render_template("tutorials.html",
                                     show_logout_button=l.is_logged_in(),
                                     error="No tutorials to show")

    return flask.render_template("tutorials.html",
                                 show_logout_button=l.is_logged_in(),
                                 tutorials=TUTORIALS)