我们从Python开源项目中,提取了以下15个代码示例,用于说明如何使用bottle.error()。
def send_ws(self,event): try: _data = json.dumps(event.dict_,ensure_ascii=False) _l = cache['msg']+[_data] cache['msg'] = _l[-1*cache['len']:] if event.type_ == EVENT_LOG: print(event.dict_['log']) logger.error(event.dict_['log']) elif event.type_ == EVENT_REBOOT: reboot_ctp({}) for _ws in cs: _ws.send(_data) except Exception,e: print('ws.send_ws.error,',str(e)) finally: pass
def bridge_set(a,b,c): if c not in ['int','str','float']:return 'error type' _d = bg.get_instrument() _l = a.split('.') b = eval(c)(b) _tmp = _d _out = [] for one in _l: _out.append((one,_tmp)) if one in _tmp: _tmp = _tmp[one] else: return '%s not in correct place'%one _tmp = b _n = {} for k,d in _out[::-1]: d[k] = _tmp _tmp = d bg.set_instrument(_tmp) return str(_tmp)
def error404(error): msg = """Non-existent page""" return template('honeyd/utilities/http/index.html', data=msg) # css declaration
def raise_error(): abort(404, "error...")
def error404(error): return '404 error !!!!!'
def reboot_ctp(act): global cache global logger _now = int(time.time()/3600) print('ws,reboot_ctp,reboot') if cache.get('reboot',0)!=_now: logger.error('ws.reboot') print('reboot') cache['reboot'] = _now start_accounts(get_accounts()) logger = otherLogger().get_logger()
def test_401(self): """ WSGI: abort(401, '') (HTTP 401) """ @bottle.route('/') def test(): bottle.abort(401) self.assertStatus(401, '/') @bottle.error(401) def err(e): bottle.response.status = 200 return str(type(e)) self.assertStatus(200, '/') self.assertBody("<class 'bottle.HTTPError'>",'/')
def test_module_shortcuts(self): for name in '''route get post put delete error mount hook install uninstall'''.split(): short = getattr(bottle, name) original = getattr(bottle.app(), name) self.assertWraps(short, original)
def exception_plugin(func): def wrapper(*args, **kwargs): try: return func(*args, **kwargs) except TrackParseError as e: exception = ''.join( traceback.format_exception(*e.orig_exception_info) ) return template('error', errormsg=e.msg, exception=exception) return wrapper # Provide custom, simple error responses for 400 and 500 errors, # suitable for direct inclusion in alert models on the client.
def customerror(error): from bottle import DEBUG return template("error_simple", DEBUG=DEBUG, e=error)
def error404(error): return "Error 404: Nothing here, sorry."
def handle_500_error(http_error): if http_error.exception is not None: logger.error(http_error.exception) if http_error.traceback is not None: logger.error(http_error.traceback) if http_error.body is not None and len(http_error.body) > 0: logger.error(http_error.body) return json.dumps( { 'status': 1, 'error': "The server encountered an internal error (see server logs for details)", "details": http_error.body } )
def check_auth(): request.account = None # Allow preflight requests if request.method == 'OPTIONS': return # FIXME: this is legacy authentification, remove this token_protected_prefix = default_app().config.get("general", "token_protected_paths") or [] if 'X-Auth-Token' in request.headers and \ any(request.path.startswith(prefix) for prefix in token_protected_prefix): auth_token = request.headers['X-Auth-Token'] expected_token = default_app().config.get("general", "auth_token") if auth_token != expected_token: logger.error("Invalid Token:[%s]" % (auth_token,)) abort(401, "Invalid Token.") return # Get session token from request headers or cookies token = None if 'X-Session-Token' in request.headers: token = request.headers['X-Session-Token'] if token is None: # Allow unprotected routes if no token is provided return use_default_user() # If a token is provided, check it with database.session_scope() as session: user = session.query(m.User).filter(m.User.session_token == token).one_or_none() if user is None: logger.info("Unauthorized access attempt with token: {}".format(token)) abort(403) if user.token_issued_at + datetime.timedelta(minutes=30) < datetime.datetime.utcnow(): logger.info("Token expired: {}".format(token)) abort(403) request.account = user # Load roles, then remove them from the session so we can use them anywhere _expunge_user(request.account, session)
def error404(error): return "nothing here sorry"
def ccqHubSub(): VARS = request.json jobScriptLocation = VARS['jobScriptLocation'] jobScriptText = VARS['jobScriptFile'] ccOptionsParsed = VARS['ccOptionsCommandLine'] jobName = VARS['jobName'] jobMD5Hash = VARS["jobMD5Hash"] userName = VARS["userName"] password = VARS["password"] valKey = VARS['valKey'] dateExpires = VARS['dateExpires'] certLength = VARS['certLength'] ccAccessKey = VARS['ccAccessKey'] targetName = VARS['targetName'] remoteUserName = VARS['remoteUserName'] additionalActionsAndPermissionsRequired = VARS['additionalActionsAndPermissionsRequired'] # Since we are not calculating the instance type here we need to set it to None. It will be updated later on when we get the info from the ccq scheduler. # If the job stays for a local scheduler then this argument is never needed. ccOptionsParsed['instanceType'] = ccOptionsParsed['requestedInstanceType'] values = validateCreds(userName, password, dateExpires, valKey, certLength, ccAccessKey, remoteUserName, additionalActionsAndPermissionsRequired) if values['status'] != "success": #Credentials failed to validate on the server send back error message and failure return {"status": "failure", "payload": {"message": str(values['payload']), "cert": str(None)}} else: identity = values['payload']['identity'] userName = values['payload']['userName'] password = values['payload']['password'] cert = values['payload']['cert'] #TODO implement pricing calls to the instances # The unique identification of the user who submitted the job is the combination of the identity object and the username that they provide for the job # They are required to provide a username for the job when submitting through ccqHubsub. obj = {"jobScriptLocation": str(jobScriptLocation), "jobScriptText": str(jobScriptText), "jobName": str(jobName), "ccOptionsCommandLine": ccOptionsParsed, "jobMD5Hash": jobMD5Hash, "userName": str(userName), "targetName": str(targetName), "identity": str(identity)} values = ccqHubMethods.saveJobScript(**obj) if values['status'] != "success": return {"status": "error", "payload": {"message": values['payload'], "cert": str(None)}} else: obj = {"jobScriptLocation": str(jobScriptLocation), "jobScriptText": str(jobScriptText), "jobName": str(jobName), "ccOptionsParsed": ccOptionsParsed, "userName": str(userName), "isRemoteSubmit": "True", "identity": str(identity), "targetName": str(targetName)} results = ccqHubMethods.saveJob(**obj) if results['status'] != "success": return {"status": "error", "payload": {"message": values['payload'], "cert": str(None)}} else: generatedJobId = results['payload']['jobId'] return {"status": "success", "payload": {"message": "The job has successfully been submitted to ccqHub. The job id is: " + str(generatedJobId) + ".\n You may use this job id to lookup the job's status using the ccqstat utility.", "cert": str(cert)}}