我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用bottle.abort()。
def login_post_handler():
    """Handle a submitted login form.

    Aborts with 400 when either form field is missing. Any authentication
    failure redirects back to ``/login?msg=fail``. On success the username
    is stored via ``handler_util.set_current_username`` and the client is
    redirected to ``/``.
    """
    forms = bottle.request.forms
    try:
        username = forms['username']
        password = forms['password']
    except KeyError:
        bottle.abort(400, 'Invalid form.')
    try:
        user = model.get_user(username)
    except KeyError:
        bottle.redirect('/login?msg=fail')
    stored_hash = user['password_hash']
    if stored_hash == model.PASSWORDLESS_HASH:
        # Accounts marked password-less are only accepted for admins.
        accepted = handler_util.is_admin()
    else:
        accepted = sha256_crypt.verify(password, stored_hash)
    if not accepted:
        bottle.redirect('/login?msg=fail')
    handler_util.set_current_username(username)
    bottle.redirect('/')
def problem_view_handler(problem_id):
    """Render the public page for one problem.

    Args:
        problem_id: Problem ID from the URL; converted with ``int``.

    Returns:
        The rendered ``problem_view.html`` template.

    Raises:
        bottle.HTTPError: 404 when the ID is not an integer or the public
            problem lookup raises ``KeyError``.
    """
    not_found = 'Problem not found.'
    try:
        problem_id = int(problem_id)
    except ValueError:
        bottle.abort(404, not_found)
    try:
        problem = model.get_public_problem(problem_id=problem_id)
    except KeyError:
        bottle.abort(404, not_found)
    owner = model.get_user(problem['owner'])
    problem_spec = model.load_blob(problem['problem_spec_hash'])
    snapshot = model.get_last_problem_ranking_snapshot(
        problem_id=problem_id, public_only=True)
    snapshot_time, ranked_solutions = snapshot
    handler_util.inject_ranks_to_ranked_solutions(ranked_solutions)
    return handler_util.render('problem_view.html', dict(
        problem=problem,
        problem_spec=problem_spec,
        owner=owner,
        ranked_solutions=ranked_solutions,
        snapshot_time=snapshot_time,
    ))
def admin_problem_view_handler(problem_id):
    """Render the admin page for one problem.

    Args:
        problem_id: Problem ID from the URL; converted with ``int``.

    Returns:
        The rendered ``admin/problem_view.html`` template.

    Raises:
        bottle.HTTPError: 404 when the ID is not an integer or the admin
            problem lookup raises ``KeyError``.
    """
    try:
        problem_id = int(problem_id)
    except ValueError:
        # Same treatment as problem_view_handler: a malformed ID is reported
        # as "not found" instead of surfacing as an unhandled error.
        bottle.abort(404, 'Problem not found.')
    try:
        problem = model.get_problem_for_admin(problem_id=problem_id)
    except KeyError:
        bottle.abort(404, 'Problem not found.')
    owner = model.get_user(problem['owner'])
    problem_spec = model.load_blob(problem['problem_spec_hash'])
    snapshot_time, ranked_solutions = model.get_last_problem_ranking_snapshot(
        problem_id=problem_id, public_only=False)
    handler_util.inject_ranks_to_ranked_solutions(ranked_solutions)
    team_display_name_map = handler_util.compute_team_display_name_map(
        solution['owner'] for solution in ranked_solutions)
    template_dict = {
        'problem': problem,
        'problem_spec': problem_spec,
        'owner': owner,
        'ranked_solutions': ranked_solutions,
        'team_display_name_map': team_display_name_map,
        'snapshot_time': snapshot_time,
    }
    return handler_util.render('admin/problem_view.html', template_dict)
def admin_solution_view_handler(solution_id):
    """Render the admin page for one solution.

    Args:
        solution_id: Solution ID from the URL; converted with ``int``.

    Returns:
        The rendered ``admin/solution_view.html`` template.

    Raises:
        bottle.HTTPError: 404 when the ID is not an integer, or when the
            solution or its problem lookup raises ``KeyError``.
    """
    try:
        solution_id = int(solution_id)
    except ValueError:
        # A malformed ID cannot match any solution; answer 404 rather than
        # letting the ValueError escape as an unhandled error.
        bottle.abort(404, 'Solution not found.')
    try:
        solution = model.get_solution_for_admin(solution_id=solution_id)
        problem = model.get_problem_for_admin(problem_id=solution['problem_id'])
    except KeyError:
        bottle.abort(404, 'Solution not found.')
    problem_owner = model.get_user(problem['owner'])
    solution_owner = model.get_user(solution['owner'])
    solution_spec = model.load_blob(solution['solution_spec_hash'])
    template_dict = {
        'solution': solution,
        'solution_spec': solution_spec,
        'solution_owner': solution_owner,
        'problem_owner': problem_owner,
    }
    return handler_util.render('admin/solution_view.html', template_dict)
def enforce_api_rate_limit(action, limit_in_window):
    """Reject the current API request if the caller is over its rate limit.

    Load-test requests (when the load-test flag is enabled) and organizer
    accounts are exempt.

    Args:
        action: Action name.
        limit_in_window: Maximum number of requests of this action in a window.

    Raises:
        bottle.HTTPError: 429 if either rate limit is exceeded.
    """
    is_load_test = FLAGS.enable_load_test_hacks and (
        bottle.request.headers.get('X-Load-Test', '') == 'yes')
    if is_load_test or get_current_user()['organizer']:
        return
    username = get_current_username()
    # Presumably the time elapsed since this user's previous API access;
    # confirm against model.record_last_api_access_time.
    last_access = model.record_last_api_access_time(username)
    if last_access < FLAGS.api_rate_limit_request_interval:
        bottle.abort(429, 'Rate limit exceeded (per-second limit).')
    if model.increment_api_rate_limit_counter(username, action) > limit_in_window:
        bottle.abort(429, 'Rate limit exceeded (per-hour limit).')


# http://flask.pocoo.org/snippets/44/
def post_batch_entityset(entitySetName):
    """Insert a batch of entities into ``entitySetName`` from the JSON body.

    Returns the provider result serialized as JSON. When the provider raises
    ``dalUtils.StatusCodeError`` the response status is set to ``err.value``
    and nothing is returned; any other error becomes a 400 response.
    """
    try:
        result = dataProviderDto.apiProvider.handleInsertEntityBatch(entitySetName, request.json, dataService)
        response.content_type = "application/json; charset=utf-8"
        return json.dumps(result, cls=CustomEncoder)
    except dalUtils.StatusCodeError as err:
        response.status = err.value
    except Exception:
        # ``Exception`` instead of a bare ``except`` so SystemExit and
        # KeyboardInterrupt are not converted into a 400 response.
        abort(400, 'Bad Request')


## DELETE: api/datasource/crud/batch/:entitySetName
#@delete('/api/datasource/crud/batch/<entitySetName>')
#def delete_batch_entityset(entitySetName):
#    try:
#        result = dataProviderDto.apiProvider.handleDeleteEntityBatch(entitySetName, request.json, dataService)
#        response.content_type = "application/json; charset=utf-8"
#        return json.dumps(result, cls=CustomEncoder)
#    except dalUtils.StatusCodeError as err:
#        response.status = err.value
#    except:
#        abort(400, 'Bad Request')

# DELETE: api/datasource/crud/batch/:entitySetName?keys=key1:1,2,3,4;key2:4,5,6,7
def v1_random_fc_get(response, dbid_to_visid, store):
    '''Retrieves a random feature collection from the database.

    The route for this endpoint is:
    ``GET /dossier/v1/random/feature-collection``.

    When the store holds at least one feature collection, the result is
    a two-element array: the content id followed by the feature
    collection (same format as :func:`dossier.web.routes.v1_fc_get`).

    An empty store produces a 404 error.

    Note that currently, this may not be a uniformly random sample.
    '''
    # Sample over ids only: `store.scan()` would be obscenely slow here.
    picked = streaming_sample(store.scan_ids(), 1, 1000)
    if len(picked) < 1:
        bottle.abort(404, 'The feature collection store is empty.')
    content_id = picked[0]
    visible_id = dbid_to_visid(content_id)
    return [visible_id, util.fc_to_json(store.get(content_id))]
def verify_user_exists(user):
    """Abort with 404 unless ``user`` is a key of ``pubsub.user_info``."""
    if user not in pubsub.user_info:
        abort(404, f'Unknown user: {user!r}')
def register_handler():
    """Render the registration form with every profile field blank.

    Raises:
        bottle.HTTPError: 403 once the contest has finished.
    """
    if settings.has_contest_finished():
        bottle.abort(403, 'The contest is over!')
    profile_fields = (
        'display_name',
        'contact_email',
        'member_names',
        'nationalities',
        'languages',
        'source_url',
    )
    blank_user = dict.fromkeys(profile_fields, '')
    return handler_util.render('register.html', {'user': blank_user})
def register_post_handler():
    """Register a new user from the submitted profile form.

    Returns:
        The rendered ``registered.html`` template, given the username and
        password returned by ``model.register_user``.

    Raises:
        bottle.HTTPError: 403 once the contest has finished; 400 when the
            profile form fails to parse.
    """
    if settings.has_contest_finished():
        bottle.abort(403, 'The contest is over!')
    try:
        profile = handler_util.parse_basic_profile_forms()
    except ValueError:
        bottle.abort(400, 'Invalid form.')
    remote_host = bottle.request.remote_addr
    username, password = model.register_user(remote_host=remote_host, **profile)
    return handler_util.render(
        'registered.html', {'username': username, 'password': password})
def profile_post_handler():
    """Apply the submitted profile form to the current user.

    Redirects to ``/profile?msg=updated`` afterwards.

    Raises:
        bottle.HTTPError: 400 when the profile form fails to parse.
    """
    try:
        profile = handler_util.parse_basic_profile_forms()
    except ValueError:
        bottle.abort(400, 'Invalid form.')
    model.update_user(handler_util.get_current_username(), **profile)
    bottle.redirect('/profile?msg=updated')
def solution_submit_handler():
    """Render the solution submission form.

    The optional ``problem_id`` query parameter is passed to the template
    (empty string when absent).

    Raises:
        bottle.HTTPError: 400 when the current user is an organizer.
    """
    current_user = handler_util.get_current_user()
    if current_user['organizer']:
        bottle.abort(400, 'You are organizer, not allowed to submit solutions.')
    template_dict = {'problem_id': bottle.request.query.get('problem_id', '')}
    return handler_util.render('solution_submit.html', template_dict)
def admin_visualize_problem_handler(problem_id):
    """Return a PNG visualization of a problem for admins.

    A truthy ``thumbnail`` query parameter is passed to the visualizer as
    its thumbnail flag.

    Args:
        problem_id: Problem ID from the URL; converted with ``int``.

    Returns:
        The image bytes from ``visualize.visualize_problem``; the response
        content type is set to ``image/png``.

    Raises:
        bottle.HTTPError: 404 when the ID is not an integer or the problem
            lookup raises ``KeyError``.
    """
    thumbnail = bool(bottle.request.query.get('thumbnail'))
    try:
        problem_id = int(problem_id)
    except ValueError:
        # A malformed ID cannot match any problem; answer 404 rather than
        # letting the ValueError escape as an unhandled error.
        bottle.abort(404, 'Problem not found.')
    try:
        problem = model.get_problem_for_admin(problem_id=problem_id)
    except KeyError:
        bottle.abort(404, 'Problem not found.')
    problem_spec = model.load_blob(problem['problem_spec_hash'])
    image_binary = visualize.visualize_problem(problem_spec, thumbnail)
    bottle.response.content_type = 'image/png'
    return image_binary
def admin_visualize_solution_handler(solution_id):
    """Return a PNG visualization of a solution for admins.

    A truthy ``thumbnail`` query parameter is passed to the visualizer as
    its thumbnail flag.

    Args:
        solution_id: Solution ID from the URL; converted with ``int``.

    Returns:
        The image bytes from ``visualize.visualize_solution``; the response
        content type is set to ``image/png``.

    Raises:
        bottle.HTTPError: 404 when the ID is not an integer or the solution
            lookup raises ``KeyError``.
    """
    thumbnail = bool(bottle.request.query.get('thumbnail'))
    try:
        solution_id = int(solution_id)
    except ValueError:
        # A malformed ID cannot match any solution; answer 404 rather than
        # letting the ValueError escape as an unhandled error.
        bottle.abort(404, 'Solution not found.')
    try:
        solution = model.get_solution_for_admin(solution_id=solution_id)
    except KeyError:
        bottle.abort(404, 'Solution not found.')
    solution_spec = model.load_blob(solution['solution_spec_hash'])
    problem_spec = model.load_blob(solution['problem_spec_hash'])
    image_binary = visualize.visualize_solution(
        problem_spec, solution_spec, thumbnail)
    bottle.response.content_type = 'image/png'
    return image_binary
def testing_cron_snapshot_job_handler():
    """Run ``cron_jobs.snapshot_job`` on demand and return ``'done'``.

    Raises:
        bottle.HTTPError: 403 unless ``FLAGS.enable_testing_handlers`` is set.
    """
    testing_enabled = FLAGS.enable_testing_handlers
    if not testing_enabled:
        bottle.abort(403, 'Disabled')
    cron_jobs.snapshot_job()
    return 'done'
def _protect_before_contest_hook():
    """Before-request hook that blocks the site until the contest starts.

    Health, ping, API and admin paths are exempt, as are admins.
    """
    exempt_prefixes = ('/health', '/ping', '/api/', '/admin/')
    if bottle.request.path.startswith(exempt_prefixes):
        return
    if is_admin() or settings.has_contest_started():
        return
    bottle.abort(403, 'The contest has not started yet.')
def _enforce_web_rate_limit_hook():
    """Before-request hook that applies the web rate limit.

    Skipped for load-test requests (when the load-test flag is enabled),
    for health/ping/static/API/admin paths, for requests with no current
    user, and for organizers.
    """
    if FLAGS.enable_load_test_hacks and (
            bottle.request.headers.get('X-Load-Test', '') == 'yes'):
        return
    exempt_prefixes = ('/health', '/ping', '/static/', '/api/', '/admin/')
    if bottle.request.path.startswith(exempt_prefixes):
        return
    user = get_current_user()
    if not user or user['organizer']:
        return
    if model.decrement_web_rate_limit_counter(user['_id']):
        return
    bottle.abort(429, 'Rate limit exceeded.')
def _protect_xsrf_hook():
    """Before-request hook that checks the XSRF token on non-GET/HEAD requests."""
    req = bottle.request
    # API calls are not protected by this hook.
    if req.path.startswith('/api/'):
        return
    if req.method in ('GET', 'HEAD'):
        return
    submitted_token = req.forms.get('xsrf_token', 'N/A')
    if submitted_token != get_xsrf_token():
        bottle.abort(400, 'XSRF token is incorrect or not set.')
def _require_gzip_hook():
    """Before-request hook that requires gzip support on API requests.

    ``X-Accept-Encoding`` takes precedence over ``Accept-Encoding`` when
    both headers are present.
    """
    if FLAGS.enable_load_test_hacks and (
            bottle.request.headers.get('X-Load-Test', '') == 'yes'):
        return
    if not bottle.request.path.startswith('/api/'):
        return
    standard_value = bottle.request.headers.get('Accept-Encoding', '')
    accept_encoding = bottle.request.headers.get(
        'X-Accept-Encoding', standard_value)
    if 'gzip' in accept_encoding:
        return
    bottle.abort(400, 'Accept-Encoding: gzip is required for API requests')
def checkserial(func):
    """Decorator to call function only when machine connected.

    When ``driveboard.connected()`` is false the wrapped call aborts with
    400 instead of invoking ``func``. The wrapper keeps ``func``'s name and
    docstring via ``functools.wraps``.
    """
    import functools

    @functools.wraps(func)
    def _decorator(*args, **kwargs):
        if not driveboard.connected():
            bottle.abort(400, "No machine.")
        return func(*args, **kwargs)
    return _decorator


### STATIC FILES
def offset(x, y, z):
    """Define a custom offset at (x, y, z) and select it.

    Returns the string ``'{}'``; aborts with 400 if the machine status is
    not ready.
    """
    machine_status = driveboard.status()
    if not machine_status['ready']:
        bottle.abort(400, "Machine not ready.")
    driveboard.def_offset_custom(x, y, z)
    driveboard.sel_offset_custom()
    return '{}'
def clear_offset():
    """Zero the custom offset and select the table offset.

    Returns the string ``'{}'``; aborts with 400 if the machine status is
    not ready.
    """
    machine_status = driveboard.status()
    if not machine_status['ready']:
        bottle.abort(400, "Machine not ready.")
    driveboard.def_offset_custom(0, 0, 0)
    driveboard.sel_offset_table()
    return '{}'


### JOBS QUEUE
def _get(jobname, library=False):
    """Return the contents of a stored job file as a string.

    Args:
        jobname: Job name without extension.
        library: Look in the library directory instead of the storage
            directory.

    Raises:
        bottle.HTTPError: 400 when neither a ``.dba`` nor a ``.dba.starred``
            file exists for the job.
    """
    # _get_path does the same '.dba' / '.dba.starred' resolution (and the
    # same 400 abort) that used to be duplicated here.
    jobpath = _get_path(jobname, library)
    with open(jobpath) as fp:
        return fp.read()
def _get_path(jobname, library=False):
    """Return the on-disk path of a job file.

    The ``.dba`` file is preferred over the ``.dba.starred`` one. Leading
    and trailing slashes/backslashes are stripped from ``jobname``.

    Raises:
        bottle.HTTPError: 400 when neither file exists.
    """
    stripped_name = jobname.strip('/\\')
    if library:
        base = os.path.join(conf['rootdir'], 'library', stripped_name)
    else:
        base = os.path.join(conf['stordir'], stripped_name)
    for suffix in ('.dba', '.dba.starred'):
        candidate = base + suffix
        if os.path.exists(candidate):
            return candidate
    bottle.abort(400, "No such file.")
def load():
    """Load a dba, svg, dxf, or gcode job.

    Args:
        (Args come in through the POST request, as a JSON object in the
        ``load_request`` form field.)
        job: Parsed dba or job string (dba, svg, dxf, or ngc).
        name: name of the job (string)
        optimize: flag whether to optimize (bool, defaults to True)
        overwrite: flag whether to overwrite file if present
            (bool, defaults to False)

    Returns:
        The name the job was stored under, JSON-encoded.

    Raises:
        bottle.HTTPError: 400 when the request data is missing or malformed,
            or when the job cannot be converted.
    """
    raw_request = bottle.request.forms.get('load_request')
    try:
        load_request = json.loads(raw_request)
    except (TypeError, ValueError):
        # TypeError: the form field is absent (None); ValueError: bad JSON.
        bottle.abort(400, "Invalid request data.")
    if not isinstance(load_request, dict):
        # Valid JSON that is not an object has no job/name fields to read.
        bottle.abort(400, "Invalid request data.")
    job = load_request.get('job')  # always a string
    name = load_request.get('name')
    optimize = load_request.get('optimize', True)
    overwrite = load_request.get('overwrite', False)
    # sanity check
    if job is None or name is None:
        bottle.abort(400, "Invalid request data.")
    # convert
    try:
        job = jobimport.convert(job, optimize=optimize)
    except TypeError:
        if DEBUG:
            traceback.print_exc()
        bottle.abort(400, "Invalid file type.")
    if not overwrite:
        name = _unique_name(name)
    _add(json.dumps(job), name)
    return json.dumps(name)
def listing(kind=None):
    """List all queue jobs by name.

    Args:
        kind: ``None`` for every job, ``'starred'`` or ``'unstarred'`` to
            restrict the listing.

    Returns:
        The job names (extensions stripped) as a JSON-encoded list.

    Raises:
        bottle.HTTPError: 400 for any other ``kind``.
    """
    if kind is None:
        files = _get_sorted('*.dba*', stripext=True)
    elif kind == 'starred':
        # The leftover debug ``print files`` statement was removed here; it
        # is also a syntax error on Python 3.
        files = _get_sorted('*.dba.starred', stripext=True)
    elif kind == 'unstarred':
        files = _get_sorted('*.dba', stripext=True)
    else:
        bottle.abort(400, "Invalid kind.")
    return json.dumps(files)
def star(jobname):
    """Star a job by renaming its ``.dba`` file to ``.dba.starred``.

    Returns the string ``'{}'``; aborts with 400 when the resolved path
    does not end in ``.dba`` (i.e. it is already starred).
    """
    path = _get_path(jobname)
    if not path.endswith('.dba'):
        bottle.abort(400, "No such file.")
    os.rename(path, path + '.starred')
    return '{}'
def unstar(jobname):
    """Unstar a job by dropping the ``.starred`` suffix from its file name.

    Returns the string ``'{}'``; aborts with 400 when the resolved path
    does not end in ``.starred``.
    """
    suffix = '.starred'
    path = _get_path(jobname)
    if not path.endswith(suffix):
        bottle.abort(400, "No such file.")
    os.rename(path, path[:-len(suffix)])
    return '{}'
def run(jobname):
    """Send job from queue to the machine.

    The job file is read first, so a missing job is reported before the
    machine status is checked. Returns the string ``'{}'``.
    """
    job_text = _get(jobname)
    machine_status = driveboard.status()
    if not machine_status['ready']:
        bottle.abort(400, "Machine not ready.")
    parsed_job = json.loads(job_text)
    driveboard.job(parsed_job)
    return '{}'
def build():
    """Build firmware from firmware/src files (for all config files).

    Returns the string ``'{}'``; aborts with 400 when ``driveboard.build``
    returns a non-zero code.
    """
    if driveboard.build() != 0:
        bottle.abort(400, "Build failed.")
    return '{}'
def flash(firmware=None):
    """Flash firmware to MCU.

    ``firmware`` is forwarded to ``driveboard.flash`` only when given.
    Returns the string ``'{}'``; aborts with 400 on a non-zero return code.
    """
    flash_kwargs = {} if firmware is None else {'firmware': firmware}
    if driveboard.flash(**flash_kwargs) != 0:
        bottle.abort(400, "Flashing failed.")
    return '{}'
def reset():
    """Reset MCU.

    Returns the string ``'{}'``; aborts with 400 when ``driveboard.reset``
    raises ``IOError``.
    """
    try:
        driveboard.reset()
    except IOError:
        bottle.abort(400, "Reset failed.")
    else:
        return '{}'
def error_translation(func):
    """Decorator that turns known exceptions into HTTP error responses.

    Every handled exception has its traceback printed before the request
    is aborted: "not found" exceptions and ``FileNotFoundError`` map to
    404, ``ValueError`` and validation/booking errors to 400, and
    ``RpcFailedCall`` to 500. The wrapper keeps ``func``'s name and
    docstring via ``functools.wraps``.
    """
    import functools

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except ValueError as e:
            traceback.print_exc()
            # Send the message text; ``e.args`` is a tuple and would be
            # rendered as "('...',)" in the error page.
            bottle.abort(400, str(e))
        except exceptions.ExperimentNotFound as e:
            traceback.print_exc()
            bottle.abort(404, e.message)
        except exceptions.ExperimentValidationError as e:
            traceback.print_exc()
            bottle.abort(400, e.message)
        except exceptions.ManagerNotFound as e:
            traceback.print_exc()
            bottle.abort(404, e.message)
        except exceptions.ResourceAlreadyBooked as e:
            traceback.print_exc()
            bottle.abort(400, e.message)
        except exceptions.ResourceNotFound as e:
            traceback.print_exc()
            bottle.abort(404, e.message)
        except exceptions.RpcFailedCall:
            traceback.print_exc()
            bottle.abort(500, "Ups, an internal error occurred, please report to us the procedure and we will fix it")
        except FileNotFoundError:
            traceback.print_exc()
            bottle.abort(404, "File not found in your request")
    return wrapper
def get_metadata():
    """Return the database metadata client serialized as indented JSON.

    Any error while building or serializing it becomes a 500 response.
    """
    try:
        response.content_type = "application/json; charset=utf-8"
        metadataClient = databaseInfo.getMetadataClient()
        return json.dumps(metadataClient, cls=MetadataEncoder, indent=2)
    except Exception:
        # ``Exception`` instead of a bare ``except`` so SystemExit and
        # KeyboardInterrupt are not converted into a 500 response.
        abort(500, 'Internal server error')


# GET: api/datasource/crud/:entitySetName?skip=20&top=10
def get_entityset(entitySetName):
    """Return the entities of ``entitySetName`` selected by the query string.

    The provider result is serialized as indented JSON; any error becomes a
    400 response.
    """
    try:
        result = dataProviderDto.apiProvider.handleGet(entitySetName, request.query, dataService)
        response.content_type = "application/json; charset=utf-8"
        return json.dumps(result, cls=CustomEncoder, indent=2)
    except Exception:
        # ``Exception`` instead of a bare ``except`` so SystemExit and
        # KeyboardInterrupt are not converted into a 400 response.
        abort(400, 'Bad Request')


# GET: api/datasource/crud/single/:entitySetName?keys=key1:{key1}
def get_single_entityset(entitySetName):
    """Return a single entity of ``entitySetName`` selected by the query string.

    The provider result is serialized as indented JSON; any error becomes a
    400 response.
    """
    try:
        result = dataProviderDto.apiProvider.handleGetSingle(entitySetName, request.query, dataService)
        response.content_type = "application/json; charset=utf-8"
        return json.dumps(result, cls=CustomEncoder, indent=2)
    except Exception:
        # ``Exception`` instead of a bare ``except`` so SystemExit and
        # KeyboardInterrupt are not converted into a 400 response.
        abort(400, 'Bad Request')


# GET: api/datasource/crud/many/:entitySetName?keys=key1:1,2,3,4;key2:4,5,6,7
def get_many_entityset(entitySetName):
    """Return several entities of ``entitySetName`` selected by the query string.

    The provider result is serialized as indented JSON; any error becomes a
    400 response.
    """
    try:
        result = dataProviderDto.apiProvider.handleGetMany(entitySetName, request.query, dataService)
        response.content_type = "application/json; charset=utf-8"
        return json.dumps(result, cls=CustomEncoder, indent=2)
    except Exception:
        # ``Exception`` instead of a bare ``except`` so SystemExit and
        # KeyboardInterrupt are not converted into a 400 response.
        abort(400, 'Bad Request')


# PUT: api/datasource/crud/:entitySetName?keys=key1:{key1}
def put_entityset(entitySetName):
    """Update an entity of ``entitySetName`` from the query string and JSON body.

    Returns the provider result serialized as JSON. When the provider raises
    ``dalUtils.StatusCodeError`` the response status is set to ``err.value``
    and nothing is returned; any other error becomes a 400 response.
    """
    try:
        result = dataProviderDto.apiProvider.handleUpdateEntity(entitySetName, request.query, request.json, dataService)
        response.content_type = "application/json; charset=utf-8"
        return json.dumps(result, cls=CustomEncoder)
    except dalUtils.StatusCodeError as err:
        response.status = err.value
    except Exception:
        # ``Exception`` instead of a bare ``except`` so SystemExit and
        # KeyboardInterrupt are not converted into a 400 response.
        abort(400, 'Bad Request')


# PATCH: api/datasource/crud/:entitySetName?keys=key1:{key1}
#@patch('/api/datasource/crud/<entitySetName>')
def post_entityset(entitySetName):
    """Insert an entity into ``entitySetName`` from the JSON body.

    Returns the provider result serialized as JSON. When the provider raises
    ``dalUtils.StatusCodeError`` the response status is set to ``err.value``
    and nothing is returned; any other error becomes a 400 response.
    """
    # test1 = json.loads(request.body.read())
    try:
        result = dataProviderDto.apiProvider.handleInsertEntity(entitySetName, request.json, dataService)
        response.content_type = "application/json; charset=utf-8"
        return json.dumps(result, cls=CustomEncoder)
    except dalUtils.StatusCodeError as err:
        response.status = err.value
    except Exception:
        # ``Exception`` instead of a bare ``except`` so SystemExit and
        # KeyboardInterrupt are not converted into a 400 response.
        abort(400, 'Bad Request')


# DELETE: api/datasource/crud/:entitySetName?keys=key1:{key1}
def delete_entityset(entitySetName):
    """Delete an entity of ``entitySetName`` selected by the query string.

    Returns the provider result serialized as JSON. When the provider raises
    ``dalUtils.StatusCodeError`` the response status is set to ``err.value``
    and nothing is returned; any other error becomes a 400 response.
    """
    try:
        result = dataProviderDto.apiProvider.handleDeleteEntity(entitySetName, request.query, dataService)
        response.content_type = "application/json; charset=utf-8"
        return json.dumps(result, cls=CustomEncoder)
    except dalUtils.StatusCodeError as err:
        response.status = err.value
    except Exception:
        # ``Exception`` instead of a bare ``except`` so SystemExit and
        # KeyboardInterrupt are not converted into a 400 response.
        abort(400, 'Bad Request')


# PUT: api/datasource/crud/batch/:entitySetName
def put_batch_entityset(entitySetName):
    """Update a batch of entities of ``entitySetName`` from the JSON body.

    Returns the provider result serialized as JSON. When the provider raises
    ``dalUtils.StatusCodeError`` the response status is set to ``err.value``
    and nothing is returned; any other error becomes a 400 response.
    """
    try:
        result = dataProviderDto.apiProvider.handleUpdateEntityBatch(entitySetName, request.json, dataService)
        response.content_type = "application/json; charset=utf-8"
        return json.dumps(result, cls=CustomEncoder)
    except dalUtils.StatusCodeError as err:
        response.status = err.value
    except Exception:
        # ``Exception`` instead of a bare ``except`` so SystemExit and
        # KeyboardInterrupt are not converted into a 400 response.
        abort(400, 'Bad Request')


# PATCH: api/datasource/crud/batch/:entitySetName
#@patch('/api/datasource/crud/batch/<entitySetName>')
def patch_batch_entityset(entitySetName):
    """Update a batch of entities of ``entitySetName`` from the JSON body.

    Calls the same provider method as the PUT batch handler
    (``handleUpdateEntityBatch``). Returns the provider result serialized as
    JSON. When the provider raises ``dalUtils.StatusCodeError`` the response
    status is set to ``err.value`` and nothing is returned; any other error
    becomes a 400 response.
    """
    try:
        result = dataProviderDto.apiProvider.handleUpdateEntityBatch(entitySetName, request.json, dataService)
        response.content_type = "application/json; charset=utf-8"
        return json.dumps(result, cls=CustomEncoder)
    except dalUtils.StatusCodeError as err:
        response.status = err.value
    except Exception:
        # ``Exception`` instead of a bare ``except`` so SystemExit and
        # KeyboardInterrupt are not converted into a 400 response.
        abort(400, 'Bad Request')


# POST: api/datasource/crud/batch/:entitySetName
def charity(regno, filetype='html'):
    """Return one charity record, as an HTML page or as the raw document.

    Args:
        regno: Document id to look up in the configured index.
        filetype: ``'html'`` renders the ``charity`` template; any other
            value returns the document source directly.

    Raises:
        bottle.HTTPError: 404 when the lookup result has no ``_source``.
    """
    settings = app.config
    res = settings["es"].get(
        index=settings["es_index"],
        doc_type=settings["es_type"],
        id=regno,
        ignore=[404],
    )
    if "_source" not in res:
        bottle.abort(404, bottle.template('Charity {{regno}} not found.', regno=regno))
    if filetype != "html":
        return res["_source"]
    return bottle.template('charity', charity=res["_source"], charity_id=res["_id"])
def charity(regno):
    """Render the ``preview`` template for one charity record.

    Raises:
        bottle.HTTPError: 404 when the lookup result has no ``_source``.
    """
    settings = app.config
    res = settings["es"].get(
        index=settings["es_index"],
        doc_type=settings["es_type"],
        id=regno,
        ignore=[404],
    )
    if "_source" not in res:
        bottle.abort(404, bottle.template('Charity {{regno}} not found.', regno=regno))
    return bottle.template('preview', charity=res["_source"], charity_id=res["_id"])
def test_httperror_in_generator_callback(self):
    """abort() inside a generator callback produces the 404 error response."""
    def test():
        yield
        bottle.abort(404, 'teststring')

    # Explicit registration; equivalent to decorating with @self.app.route('/').
    self.app.route('/')(test)

    self.assertInBody('teststring')
    self.assertInBody('404 Not Found')
    self.assertStatus(404)
def test_401(self):
    """WSGI: abort(401) gives HTTP 401 until a 401 error handler overrides it."""
    def test():
        bottle.abort(401)

    # Explicit registration; equivalent to decorating with @bottle.route('/').
    bottle.route('/')(test)
    self.assertStatus(401, '/')

    def err(e):
        bottle.response.status = 200
        return str(type(e))

    # Installing a handler for 401 changes both status and body.
    bottle.error(401)(err)
    self.assertStatus(200, '/')
    self.assertBody("<class 'bottle.HTTPError'>", '/')
def view_post(user, slug):
    """Return the template context for the post with the given slug.

    Aborts with 404 when no post matches.
    """
    found = Post.query().where(Post.slug == slug).get()
    if found is not None:
        return {"post": found, "embedded": False}
    return abort(404)
# [END post]
# [START create]
def user_required(*permissions):
    """Decorator factory requiring a logged-in user with given permissions.

    The wrapped handler is called with the user as its first positional
    argument. Requests without a ``sid`` cookie, a matching session, or a
    user are redirected to ``/login``; a user lacking any of
    ``permissions`` gets a 403. The wrapper keeps the handler's name and
    docstring via ``functools.wraps``.
    """
    import functools

    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            session_id = request.get_cookie("sid")
            if not session_id:
                return redirect("/login")

            session = Session.get(session_id)
            if not session:
                return redirect("/login")

            user = session.user.get()
            if user is None:
                return redirect("/login")

            if any(permission not in user.permissions
                   for permission in permissions):
                return abort(403)

            return fn(user, *args, **kwargs)
        return wrapper
    return decorator
# [END user-required]
# [START create-session]
def create_injector(param_name, fun_param_value):
    '''Dependency injection with Bottle.

    This creates a simple dependency injector that will map
    ``param_name`` in routes to the value ``fun_param_value()``
    each time the route is invoked.

    ``fun_param_value`` is a closure so that it is lazily evaluated.
    This is useful for handling thread local services like database
    connections.

    Routes whose callback does not declare ``param_name`` are left
    unwrapped. If ``fun_param_value()`` returns ``None`` the request is
    aborted with a 503.

    :param str param_name: name of function parameter to inject into
    :param fun_param_value: the value to insert
    :type fun_param_value: a closure that can be applied with zero arguments
    '''
    # ``inspect.getargspec`` was removed in Python 3.11; use
    # ``getfullargspec`` where it exists and fall back on Python 2.
    # Index 0 is the positional argument names in both results.
    get_argspec = getattr(inspect, 'getfullargspec', None) or inspect.getargspec

    class _Injector(object):
        api = 2

        def apply(self, callback, route):
            if param_name not in get_argspec(route.callback)[0]:
                return callback

            def _inject(*args, **kwargs):
                pval = fun_param_value()
                if pval is None:
                    logger.error('service "%s" unavailable', param_name)
                    # abort() raises, so nothing below runs in this case.
                    bottle.abort(503, 'service "%s" unavailable' % param_name)
                kwargs[param_name] = pval
                return callback(*args, **kwargs)
            return _inject
    return _Injector()
def v1_search(request, response, visid_to_dbid, config,
              search_engines, filters, cid, engine_name):
    '''Search feature collections.

    The route for this endpoint is:
    ``/dossier/v1/<content_id>/search/<search_engine_name>``.

    ``content_id`` can be any *profile* content identifier. (This
    restriction may be lifted at some point.) Namely, it must start
    with ``p|``.

    ``engine_name`` corresponds to the search strategy to
    use. The list of available search engines can be retrieved with the
    :func:`v1_search_engines` endpoint.

    This endpoint returns a JSON payload which is an object with a
    single key, ``results``. ``results`` is a list of objects, where
    the objects each have ``content_id`` and ``fc`` attributes.
    ``content_id`` is the unique identifier for the result returned,
    and ``fc`` is a JSON serialization of a feature collection.

    There are also two query parameters:

    * **limit** limits the number of results to the number given.
    * **filter** sets the filtering function. The default
      filter function, ``already_labeled``, will filter out any
      feature collections that have already been labeled with the
      query ``content_id``.

    An unknown ``engine_name`` produces a 404. Query parameters are read
    from the query string for GET requests and from the form data
    otherwise.
    '''
    db_cid = visid_to_dbid(cid)
    try:
        search_engine = search_engines[engine_name]
    except KeyError:
        # Format the looked-up name directly: exceptions have no
        # ``.message`` attribute on Python 3.
        bottle.abort(404, 'Search engine "%s" does not exist.' % engine_name)
    query = request.query if request.method == 'GET' else request.forms
    search_engine = (config.create(search_engine)
                     .set_query_id(db_cid)
                     .set_query_params(query))
    # ``filter_factory`` rather than ``filter`` to avoid shadowing the builtin.
    for name, filter_factory in filters.items():
        search_engine.add_filter(name, config.create(filter_factory))
    return search_engine.respond(response)