我们从 Python 开源项目中，提取了以下 50 个代码示例，用于说明如何使用 cherrypy.HTTPError()。
def _setup(self):
    """Run every registered setup function against this instance.

    Does nothing when ``self.skip`` is set.  Each ``(op, arg)`` pair found in
    ``self.funcs`` is invoked as ``op(self, arg)``.  Any failure is stored in
    the conversation event log and then reported to the client as a
    ``cherrypy.HTTPError`` carrying the same text.
    """

    def _abort(text):
        # Record the failure first, then surface it to the client.
        self.conv.events.store(EV_EXCEPTION, text)
        raise cherrypy.HTTPError(message=text)

    if not self.skip:
        # Iterate over a snapshot of the mapping.
        for func, param in list(self.funcs.items()):
            try:
                func(self, param)
            except ConfigurationError as exc:
                _abort("Configuration error: {}".format(exc))
            except Exception:
                _abort("Can't do {}".format(func))

    # self.conv.events.store(EV_OP_ARGS, self.op_args)
def POST(self):
    """Deploy on remote host(s).

    Reads ``targets``, ``role``, ``extra_vars`` and ``partial`` from the JSON
    request body, validates each one, and hands the job to ``self.handle``
    in asynchronous mode.

    Returns:
        A ``CREATED`` response whose body carries the job id (``jid``).

    Raises:
        cherrypy.HTTPError: ``BAD_REQUEST`` when a field is missing or has
            the wrong type.
    """
    payload = cherrypy.request.json

    targets = payload.pop('targets', None)
    if not targets or not isinstance(targets, list):
        raise cherrypy.HTTPError(status.BAD_REQUEST, ERR_NO_TARGET)

    role = payload.pop('role', None)
    if not role or not isinstance(role, six.text_type):
        raise cherrypy.HTTPError(status.BAD_REQUEST, ERR_BAD_ROLE)

    extra_vars = payload.pop('extra_vars', dict())
    if not isinstance(extra_vars, dict):
        raise cherrypy.HTTPError(status.BAD_REQUEST, ERR_BAD_EXTRAVARS)

    partial = payload.pop('partial', None)
    if partial is not None:
        if not isinstance(partial, six.text_type) or partial.strip() == "":
            raise cherrypy.HTTPError(status.BAD_REQUEST, ERR_BAD_PARTIAL)

    # ``async`` is a reserved word since Python 3.7, so writing it as a
    # literal keyword argument is a SyntaxError there.  Passing it through a
    # dict keeps the keyword name seen by ``handle`` unchanged.
    jid = self.handle(targets, role, extra_vars, partial, **{'async': True})
    return response(status.CREATED, dict(jid=jid))
def handle(self, *args, **kwargs):
    """Handle an API request by invoking ``self._runner.handle()``.

    All arguments are forwarded unchanged.  Exceptions raised by the runner
    are caught here and converted into ``cherrypy.HTTPError`` with the
    corresponding status code and message.

    Raises:
        cherrypy.HTTPError: ``BAD_REQUEST``, ``CONFLICT``, ``NOT_FOUND`` or
            ``INTERNAL_SERVER_ERROR`` depending on the runner failure.
    """
    try:
        return self._runner.handle(*args, **kwargs)
    except JobDeleteError:
        raise cherrypy.HTTPError(status.BAD_REQUEST, excinst().message)
    except JobConflictError:
        raise cherrypy.HTTPError(status.CONFLICT, excinst().message)
    except JobNotSupportedError:
        raise cherrypy.HTTPError(status.INTERNAL_SERVER_ERROR, excinst().message)
    except (JobNotExistsError, ExecutorNoMatchError):
        raise cherrypy.HTTPError(status.NOT_FOUND, excinst().message)
    except Exception:
        # Catch ``Exception`` rather than everything: SystemExit and
        # KeyboardInterrupt must keep propagating so the server can stop,
        # instead of being reported to the client as a 500.
        cherrypy.log("error response 500", traceback=True)
        raise cherrypy.HTTPError(status.INTERNAL_SERVER_ERROR)
def GET(self, **params):
    """Manipulate a service on a remote host.

    Reads ``name``, ``state``, ``target`` and ``graceful`` from the query
    parameters and passes them to ``self.handle``.

    Returns:
        An ``OK`` response wrapping the handler result.

    Raises:
        cherrypy.HTTPError: ``BAD_REQUEST`` when ``name`` or ``state`` is
            missing, ``NOT_FOUND`` when the handler returns a falsy result.
    """
    name = params.pop('name', None)
    state = parse_params_int(params, 'state')
    target = parse_params_target(params)

    # Identity comparison is the correct test for the ``None`` singleton.
    if name is None or state is None:
        raise cherrypy.HTTPError(status.BAD_REQUEST, ERR_BAD_SERVPARAMS)

    name = name.lower()
    start, restart = _state_parse(state)
    graceful = parse_params_bool(params, 'graceful')

    result = self.handle(target, name, start, restart, graceful)
    if not result:
        raise cherrypy.HTTPError(status.NOT_FOUND, ERR_NO_MATCH)
    return response(status.OK, result)
def POST(self):
    """Manipulate a service on remote host(s).

    Reads ``targets``, ``name``, ``state`` and ``graceful`` from the JSON
    request body and hands the job to ``self.handle`` in asynchronous mode.

    Returns:
        A ``CREATED`` response whose body carries the job id (``jid``).

    Raises:
        cherrypy.HTTPError: ``BAD_REQUEST`` when ``targets`` is not a
            non-empty list or when ``name``/``state`` is missing.
    """
    payload = cherrypy.request.json

    targets = payload.pop('targets', None)
    if not targets or not isinstance(targets, list):
        raise cherrypy.HTTPError(status.BAD_REQUEST, ERR_NO_TARGET)

    name = payload.pop('name', None)
    state = payload.pop('state', None)
    if name is None or state is None:
        raise cherrypy.HTTPError(status.BAD_REQUEST, ERR_BAD_SERVPARAMS)

    name = name.lower()
    start, restart = _state_parse(state)
    graceful = payload.pop('graceful', False)

    # ``async`` is a reserved word since Python 3.7, so writing it as a
    # literal keyword argument is a SyntaxError there.  Passing it through a
    # dict keeps the keyword name seen by ``handle`` unchanged.
    jid = self.handle(targets, name, start, restart, graceful,
                      **{'async': True})
    return response(status.CREATED, dict(jid=jid))
def POST(self):
    """Extract one named metadata item from a posted message.

    The JSON body supplies ``message`` and ``name``; the ``Http-Auth``
    request header must equal the module-level ``token``.

    Raises:
        cherrypy.HTTPError: 401 on a token mismatch, 422 when ``message`` or
            ``name`` is missing or ``name`` is not in ``allowed_metadata``.
    """
    body = cherrypy.request.json
    message = body.get('message', None)
    name = body.get('name', None)
    auth_token = cherrypy.request.headers.get('Http-Auth', None)

    # Checks run in the same order as the failure responses are documented.
    if auth_token != token:
        raise cherrypy.HTTPError(401, "Unauthorized")
    if not message:
        raise cherrypy.HTTPError(422, "Message is missing")
    if not name:
        raise cherrypy.HTTPError(422, "Name is missing")
    if name not in allowed_metadata:
        raise cherrypy.HTTPError(422, name + " is not an allowed metadata")

    return MetadataExtractor(message, [name]).perform()
def index(self, **kwargs):
    """Verify a posted metadata statement and return it signed.

    The request body is parsed as JSON and turned into a
    ``MetadataStatement``.  After ``verify()`` succeeds the statement is
    signed by ``self.signer`` and returned as bytes with the content type
    ``application/jwt``.

    Raises:
        cherrypy.HTTPError: 400 when the request carries no body.
        cherrypy.CherryPyException: when verification fails.
    """
    if cherrypy.request.process_request_body is not True:
        raise cherrypy.HTTPError(400, 'Missing Client registration body')

    raw_body = cherrypy.request.body.read()
    if raw_body == b'':
        raise cherrypy.HTTPError(400, 'Missing Client registration body')

    statement = MetadataStatement(**json.loads(as_unicode(raw_body)))

    try:
        statement.verify()
    except (MessageException, VerificationError) as err:
        raise cherrypy.CherryPyException(str(err))

    signed = self.signer.create_signed_metadata_statement(statement,
                                                          single=True)
    cherrypy.response.headers['Content-Type'] = 'application/jwt'
    return as_bytes(signed)
def register(self, url):
    """Verify a posted metadata statement and forward it to ``url``.

    The request body is parsed as JSON into a ``MetadataStatement`` and
    verified.  The statement is then POSTed to ``url``; on a 2xx reply the
    response text is remembered in ``self.signer.metadata_statements[url]``
    and returned as bytes with the content type ``application/jwt``.

    Raises:
        cherrypy.HTTPError: 400 when the request carries no body, or with
            the remote response text when the remote reply is not 2xx.
        cherrypy.CherryPyException: when verification fails.
    """
    if cherrypy.request.process_request_body is not True:
        raise cherrypy.HTTPError(400, 'Missing Client registration body')

    raw_body = cherrypy.request.body.read()
    if raw_body == b'':
        raise cherrypy.HTTPError(400, 'Missing Client registration body')

    statement = MetadataStatement(**json.loads(as_unicode(raw_body)))

    try:
        statement.verify()
    except (MessageException, VerificationError) as err:
        raise cherrypy.CherryPyException(str(err))

    reply = requests.post(url, json=statement.to_json())
    if not (200 <= reply.status_code < 300):
        raise cherrypy.HTTPError(message=reply.text)

    self.signer.metadata_statements[url] = reply.text
    cherrypy.response.headers['Content-Type'] = 'application/jwt'
    return as_bytes(reply.text)
def index(self, resource='', rel=''):
    """Answer a WebFinger request for the OpenID Connect issuer relation.

    Args:
        resource: The resource being looked up; either an ``http...`` URL
            or an ``acct:user@domain`` identifier.
        rel: The requested link relation; only the OpenID Connect issuer
            relation is accepted.

    Returns:
        The bytes produced from ``self.srv.response(resource, base_url)``.

    Raises:
        cherrypy.HTTPError: 400 for an unknown ``rel``, for a resource that
            does not belong to the configured ``base_url``, or for an
            unsupported resource scheme.
    """
    logger.debug('webfinger request: res={}, rel={}'.format(resource, rel))

    if rel != 'http://openid.net/specs/connect/1.0/issuer':
        logger.error('unknown rel')
        raise cherrypy.HTTPError(400, "Unknown 'rel")

    cnf = cherrypy.request.config
    _base = cnf['base_url']

    # Validate with explicit checks: ``assert`` statements are removed when
    # Python runs with -O, which would silently disable these tests, and an
    # uncaught AssertionError would otherwise surface as a 500.
    if resource.startswith('http'):
        if not resource.startswith(_base):
            raise cherrypy.HTTPError(400, 'Not my domain')
    elif resource.startswith('acct:'):
        # ``partition`` tolerates a value without '@' (empty separator).
        _loc, sep, dom = resource[5:].partition('@')
        if not sep or dom != urlparse(_base).netloc:
            raise cherrypy.HTTPError(400, 'Not my domain')
    else:
        raise cherrypy.HTTPError(400, "URI type I don't support")

    return as_bytes(self.srv.response(resource, _base))
def token(self, **kwargs):
    """Token endpoint: answer CORS preflights or process a token request.

    For ``OPTIONS`` the CORS preflight helper is invoked and nothing is
    returned.  Otherwise the request parameters and the ``Authorization``
    header (``None`` when absent) are passed to ``self.op.token_endpoint``
    and its reply is converted with ``conv_response``.

    Raises:
        cherrypy.HTTPError: carrying the text of any exception raised by
            the token endpoint.
    """
    if cherrypy.request.method == "OPTIONS":
        cherrypy_cors.preflight(
            allowed_methods=["POST"], origins='*',
            allowed_headers=['Authorization', 'content-type'])
        return None

    logger.debug('AccessTokenRequest')
    authn = cherrypy.request.headers.get('Authorization')
    logger.debug('Authorization: {}'.format(authn))

    try:
        resp = self.op.token_endpoint(kwargs, authn, 'dict')
    except Exception as err:
        raise cherrypy.HTTPError(message=str(err))
    return conv_response(resp)
def index(self, uid='', iss=''):
    """Start a login flow for an issuer, or serve the entry page.

    Args:
        uid: A user identifier; resolved to an issuer through
            ``self.rph.find_srv_discovery_url`` when ``iss`` is empty.
        iss: An issuer URL; takes precedence over ``uid``.

    Returns:
        The bytes of ``opbyuid.html`` when neither argument is given.
        Returns ``None`` when a ``uid`` lookup yields no link.

    Raises:
        cherrypy.HTTPRedirect: to the ``Location`` returned by
            ``self.rph.begin`` once an issuer link is known.
        cherrypy.HTTPError: when the lookup fails with a connection error.
    """
    link = ''
    if iss:
        link = iss
    elif uid:
        try:
            link = self.rph.find_srv_discovery_url(
                resource="acct:{}".format(uid))
        except requests.ConnectionError:
            raise cherrypy.HTTPError(
                message="Webfinger lookup failed, connection error")
    else:
        fname = os.path.join(self.html_home, 'opbyuid.html')
        # Close the file deterministically instead of leaking the handle.
        with open(fname, 'r') as page:
            return as_bytes(page.read())

    if link:
        resp_headers = self.rph.begin(link)
        raise cherrypy.HTTPRedirect(resp_headers['Location'])
def acb(self, op_hash='', **kwargs):
    """Handle the authorization callback for the issuer behind ``op_hash``.

    Args:
        op_hash: Key into ``self.rph.hash2issuer`` identifying the issuer.
        **kwargs: The callback parameters, passed to ``self.rph.phaseN``.

    Returns:
        The rendered ``opresult.html`` page as bytes when ``phaseN``
        reports success.

    Raises:
        cherrypy.HTTPError: 400 when ``op_hash`` is unknown or when
            ``phaseN`` reports a failure (its second element is the message).
    """
    try:
        rp = self.rph.issuer2rp[self.rph.hash2issuer[op_hash]]
    except KeyError:
        raise cherrypy.HTTPError(400,
                                 "Response to something I hadn't asked for")

    res = self.rph.phaseN(rp, kwargs)

    if res[0] is True:
        fname = os.path.join(self.html_home, 'opresult.html')
        # Close the template file deterministically instead of leaking it.
        with open(fname, 'r') as template:
            _pre_html = template.read()
        _html = _pre_html.format(result=create_result_page(*res[1:]))
        return as_bytes(_html)
    else:
        raise cherrypy.HTTPError(400, res[1])
def run(self):
    """Route the request by the ending of its path.

    ``login_screen`` renders the login form into the response body;
    ``do_login`` and ``do_logout`` require POST and are dispatched with the
    request parameters; every other path goes to ``do_check``.

    Raises:
        cherrypy.HTTPError: 405 (with an ``Allow: POST`` header) when
            ``do_login`` or ``do_logout`` is requested with another method.
    """
    request = cherrypy.serving.request
    response = cherrypy.serving.response
    path = request.path_info

    if path.endswith('login_screen'):
        self._debug_message('routing %(path)r to login_screen', locals())
        response.body = self.login_screen()
        return True

    if path.endswith('do_login'):
        if request.method != 'POST':
            response.headers['Allow'] = 'POST'
            self._debug_message('do_login requires POST')
            raise cherrypy.HTTPError(405)
        self._debug_message('routing %(path)r to do_login', locals())
        return self.do_login(**request.params)

    if path.endswith('do_logout'):
        if request.method != 'POST':
            response.headers['Allow'] = 'POST'
            raise cherrypy.HTTPError(405)
        self._debug_message('routing %(path)r to do_logout', locals())
        return self.do_logout(**request.params)

    self._debug_message('No special path, running do_check')
    return self.do_check()
def run(self, point):
    """Execute all registered Hooks (callbacks) for the given point.

    The hook list stored under ``point`` is sorted in place and run in that
    order.  After the first failure only hooks marked ``failsafe`` still
    run.  The most recently captured exception is re-raised at the end.
    """
    pending = None
    callbacks = self[point]
    callbacks.sort()

    for callback in callbacks:
        # Some hooks are guaranteed to run even if others at
        # the same hookpoint fail. We will still log the failure,
        # but proceed on to the next hook. The only way
        # to stop all processing from one of these hooks is
        # to raise SystemExit and stop the whole server.
        if pending is not None and not callback.failsafe:
            continue
        try:
            callback()
        except (KeyboardInterrupt, SystemExit):
            raise
        except (cherrypy.HTTPError, cherrypy.HTTPRedirect,
                cherrypy.InternalRedirect) as control_flow:
            # Expected control-flow exceptions are kept without logging.
            pending = control_flow
        except:
            pending = sys.exc_info()[1]
            cherrypy.log(traceback=True, severity=40)

    if pending:
        raise pending
def process_query_string(self):
    """Parse the query string into Python structures. (Core)

    The parsed parameters are merged into ``self.params``.

    Raises:
        cherrypy.HTTPError: 404 when the query string cannot be decoded
            with ``self.query_string_encoding``.
    """
    encoding = self.query_string_encoding
    try:
        parsed = httputil.parse_query_string(
            self.query_string, encoding=encoding)
    except UnicodeDecodeError:
        raise cherrypy.HTTPError(
            404, 'The given query string could not be processed. Query '
            'strings for this resource must be encoded with %r.' %
            self.query_string_encoding)

    # Python 2 only: keyword arguments must be byte strings (type 'str').
    if six.PY2:
        for name, val in parsed.items():
            if isinstance(name, six.text_type):
                del parsed[name]
                parsed[name.encode(self.query_string_encoding)] = val

    self.params.update(parsed)
def decode_entity(self, value):
    """Return a given byte encoded value as a string.

    Each charset in ``self.attempt_charsets`` is tried in order.  The first
    one that decodes ``value`` is stored on ``self.charset`` and the decoded
    text is returned.

    Raises:
        cherrypy.HTTPError: 400 when none of the charsets can decode it.
    """
    for candidate in self.attempt_charsets:
        try:
            decoded = value.decode(candidate)
        except UnicodeDecodeError:
            continue
        self.charset = candidate
        return decoded

    raise cherrypy.HTTPError(
        400,
        'The request entity could not be decoded. The following '
        'charsets were attempted: %s' % repr(self.attempt_charsets)
    )
def run(self):
    """Route the request by the ending of its path.

    ``login_screen`` renders the login form into the response body;
    ``do_login`` and ``do_logout`` require POST and are dispatched with the
    request parameters; every other path goes to ``do_check``.

    Raises:
        cherrypy.HTTPError: 405 (with an ``Allow: POST`` header) when
            ``do_login`` or ``do_logout`` is requested with another method.
    """
    request = cherrypy.serving.request
    response = cherrypy.serving.response
    path = request.path_info

    if path.endswith('login_screen'):
        self._debug_message('routing %(path)r to login_screen', locals())
        response.body = self.login_screen()
        return True

    if path.endswith('do_login'):
        if request.method != 'POST':
            response.headers['Allow'] = 'POST'
            self._debug_message('do_login requires POST')
            raise cherrypy.HTTPError(405)
        self._debug_message('routing %(path)r to do_login', locals())
        return self.do_login(**request.params)

    if path.endswith('do_logout'):
        if request.method != 'POST':
            response.headers['Allow'] = 'POST'
            raise cherrypy.HTTPError(405)
        self._debug_message('routing %(path)r to do_logout', locals())
        return self.do_logout(**request.params)

    self._debug_message('No special path, running do_check')
    return self.do_check()
def process_query_string(self):
    """Parse the query string into Python structures. (Core)

    The parsed parameters are merged into ``self.params``.

    Raises:
        cherrypy.HTTPError: 404 when the query string cannot be decoded
            with ``self.query_string_encoding``.
    """
    encoding = self.query_string_encoding
    try:
        parsed = httputil.parse_query_string(
            self.query_string, encoding=encoding)
    except UnicodeDecodeError:
        raise cherrypy.HTTPError(
            404, "The given query string could not be processed. Query "
            "strings for this resource must be encoded with %r." %
            self.query_string_encoding)

    # Python 2 only: keyword arguments must be byte strings (type 'str').
    if six.PY2:
        for name, val in parsed.items():
            if isinstance(name, six.text_type):
                del parsed[name]
                parsed[name.encode(self.query_string_encoding)] = val

    self.params.update(parsed)
def setup_server():
    """Mount a small application with automatic ETag generation enabled.

    The application exposes ``resource`` (fixed text), ``fail`` (raises a
    redirect for 3xx codes, an HTTP error otherwise) and ``unicoded``
    (a non-ASCII string with the encode tool switched on).
    """

    class Root:

        @cherrypy.expose
        def resource(self):
            return "Oh wah ta goo Siam."

        @cherrypy.expose
        def fail(self, code):
            status_code = int(code)
            if 300 <= status_code <= 399:
                raise cherrypy.HTTPRedirect([], status_code)
            raise cherrypy.HTTPError(status_code)

        @cherrypy.expose
        # In Python 3, tools.encode is on by default
        @cherrypy.config(**{'tools.encode.on': True})
        def unicoded(self):
            return ntou('I am a \u1ee4nicode string.', 'escape')

    etag_settings = {
        'tools.etags.on': True,
        'tools.etags.autotags': True,
    }
    cherrypy.tree.mount(Root(), config={'/': etag_settings})
def decode_entity(self, value):
    """Return a given byte encoded value as a string.

    Each charset in ``self.attempt_charsets`` is tried in order.  The first
    one that decodes ``value`` is stored on ``self.charset`` and the decoded
    text is returned.

    Raises:
        cherrypy.HTTPError: 400 when none of the charsets can decode it.
    """
    for candidate in self.attempt_charsets:
        try:
            decoded = value.decode(candidate)
        except UnicodeDecodeError:
            continue
        self.charset = candidate
        return decoded

    raise cherrypy.HTTPError(
        400,
        "The request entity could not be decoded. The following "
        "charsets were attempted: %s" % repr(self.attempt_charsets)
    )
def catalog_1(self, *tokens):
    """Serve the catalog file named by the first path token.

    The file path is obtained from ``self.repo.catalog_1`` for the request's
    publisher and sent to the client as UTF-8 plain text.

    Raises:
        cherrypy.HTTPError: ``FORBIDDEN`` when no name is given,
            ``NOT_FOUND`` when the repository reports an error.
    """
    if not tokens:
        raise cherrypy.HTTPError(http_client.FORBIDDEN,
                                 _("Directory listing not allowed."))
    name = tokens[0]

    try:
        fpath = self.repo.catalog_1(name, pub=self._get_req_pub())
    except srepo.RepositoryError as e:
        # Treat any remaining repository error as a 404, but
        # log the error and include the real failure
        # information.
        failure = str(e)
        cherrypy.log("Request failed: {0}".format(failure))
        raise cherrypy.HTTPError(http_client.NOT_FOUND, str(e))

    self.__set_response_expires("catalog", 86400, 86400)
    return serve_file(fpath, "text/plain; charset=utf-8")
def manifest_1(self, *tokens):
    """Output the contents of a manifest, or upload a manifest.

    ``GET`` is delegated to ``manifest_0``; ``POST`` and ``PUT`` are
    delegated to the upload helper.

    Raises:
        cherrypy.HTTPError: ``METHOD_NOT_ALLOWED`` for any other method.
    """
    method = cherrypy.request.method

    if method in ("POST", "PUT"):
        return self.__upload_manifest(*tokens)
    if method != "GET":
        raise cherrypy.HTTPError(http_client.METHOD_NOT_ALLOWED,
                                 "{0} is not allowed".format(method))
    return self.manifest_0(*tokens)

# We need to prevent cherrypy from processing the request body so that
# manifest can parse the request body itself. In addition, we also need
# to set the timeout higher since the default is five minutes; not
# really enough for a slow connection to upload content.
def file_0(self, *tokens):
    """Serve the file named by the hash in the first path token.

    The path is resolved through ``self.repo.file`` for the request's
    publisher (a missing token is passed on as ``None``) and the file is
    sent to the client as ``application/data``.

    Raises:
        cherrypy.HTTPError: ``NOT_FOUND`` for any repository error.
    """
    fhash = tokens[0] if tokens else None

    try:
        fpath = self.repo.file(fhash, pub=self._get_req_pub())
    except srepo.RepositoryFileNotFoundError as e:
        raise cherrypy.HTTPError(http_client.NOT_FOUND, str(e))
    except srepo.RepositoryError as e:
        # Treat any remaining repository error as a 404, but
        # log the error and include the real failure
        # information.
        cherrypy.log("Request failed: {0}".format(str(e)))
        raise cherrypy.HTTPError(http_client.NOT_FOUND, str(e))

    one_year = 86400*365
    self.__set_response_expires("file", one_year, one_year)
    return serve_file(fpath, "application/data")
def file_1(self, *tokens):
    """Output the contents of a file named by its hash, or upload a file.

    ``GET`` is delegated to ``file_0``; ``POST`` and ``PUT`` are delegated
    to the upload helper.

    Raises:
        cherrypy.HTTPError: ``METHOD_NOT_ALLOWED`` for any other method.
    """
    method = cherrypy.request.method

    if method in ("POST", "PUT"):
        return self.__upload_file(*tokens)
    if method != "GET":
        raise cherrypy.HTTPError(http_client.METHOD_NOT_ALLOWED,
                                 "{0} is not allowed".format(method))
    return self.file_0(*tokens)

# We need to prevent cherrypy from processing the request body so that
# file can parse the request body itself. In addition, we also need to
# set the timeout higher since the default is five minutes; not really
# enough for a slow connection to upload content.
def abandon_0(self, *tokens):
    """Abort the in-flight transaction named in the request path.

    The first path token is the Transaction ID; ``None`` is passed to the
    repository when it is missing.  Returns no output.

    Raises:
        cherrypy.HTTPError: ``BAD_REQUEST`` when the repository rejects the
            request.
    """
    # cherrypy decoded it, but we actually need it encoded.
    trans_id = quote(tokens[0], "") if tokens else None

    try:
        self.repo.abandon(trans_id)
    except srepo.RepositoryError as e:
        # Assume a bad request was made.  A 404 can't be
        # returned here as misc.versioned_urlopen will interpret
        # that to mean that the server doesn't support this
        # operation.
        raise cherrypy.HTTPError(http_client.BAD_REQUEST, str(e))
def make(self, *args, **kwargs):
    """Run ``make`` in the session's directory and return its output.

    The session id comes from the ``sesh_id`` cookie (reduced to its base
    name).  The combined stdout/stderr of ``make`` is written to
    ``build.log`` in the session directory.

    Returns:
        The captured output of ``make`` when it succeeds.

    Raises:
        cherrypy.HTTPError: 401 when not logged in, 422 when ``make`` exits
            with a non-zero status.
    """
    # NOTE(review): ``is_logged_in`` is tested without being called; if it
    # is a function this check can never fail -- confirm against its
    # definition.
    if not is_logged_in:
        raise cherrypy.HTTPError(401)

    # cd into the target directory
    sesh_id = os.path.basename(cherrypy.request.cookie["sesh_id"].value)
    rate_limited_long(sesh_id)

    errored = False
    try:
        output = subprocess.check_output(
            ["make"], cwd=HOME+sesh_id, stderr=subprocess.STDOUT)
    except subprocess.CalledProcessError as e:
        output = e.output
        errored = True

    # The context manager closes the log file even if the write fails.
    with open(HOME+sesh_id+"/build.log", "w") as fl:
        fl.write(output)

    if errored:
        raise cherrypy.HTTPError(422)
    return output
def _check_request(self):
    """Validate the request method and the requesting user agent.

    Returns:
        The upper-cased request method (``"GET"`` or ``"HEAD"``).

    Raises:
        cherrypy.HTTPError: 405 for any method other than GET or HEAD,
            403 when the ``User-Agent`` header is missing or mentions
            neither "kodi" nor "osmc".
    """
    method = cherrypy.request.method.upper()
    headers = cherrypy.request.headers

    # Fail for other methods than get or head
    if method not in ("GET", "HEAD"):
        raise cherrypy.HTTPError(405)

    # Error if the requester is not allowed
    # for now this is a simple check just checking if the useragent matches Kodi
    # A request without a User-Agent header is treated as not allowed
    # instead of failing with an unhandled KeyError.
    user_agent = headers.get('User-Agent', '').lower()
    if not ("kodi" in user_agent or "osmc" in user_agent):
        raise cherrypy.HTTPError(403)

    return method
def GET(self, **params):
    """Work on a remote host (blocking).

    The ``target`` query parameter is passed to ``self.handle``.

    Returns:
        An ``OK`` response wrapping the handler result.

    Raises:
        cherrypy.HTTPError: ``NOT_FOUND`` when the handler returns a falsy
            result.
    """
    outcome = self.handle(parse_params_target(params))
    if outcome:
        return response(status.OK, outcome)
    raise cherrypy.HTTPError(status.NOT_FOUND, ERR_NO_MATCH)
def GET(self, **params):
    """Execute a command on a remote host.

    Reads ``target`` and the URL-quoted ``cmd`` from the query parameters
    and passes them to ``self.handle``.

    Returns:
        An ``OK`` response wrapping the handler result.

    Raises:
        cherrypy.HTTPError: ``BAD_REQUEST`` when ``cmd`` is missing,
            ``NOT_FOUND`` when the handler returns a falsy result.
    """
    target = parse_params_target(params)

    cmd = params.pop('cmd', None)
    # Identity comparison is the correct test for the ``None`` singleton.
    if cmd is None:
        raise cherrypy.HTTPError(status.BAD_REQUEST, ERR_BAD_SERVPARAMS)
    cmd = unquote(cmd)

    result = self.handle(target, cmd)
    if not result:
        raise cherrypy.HTTPError(status.NOT_FOUND, ERR_NO_MATCH)
    return response(status.OK, result)
def POST(self, **params):
    """Execute a command on remote host(s).

    Reads ``targets`` and ``cmd`` from the JSON request body and hands the
    job to ``self.handle`` in asynchronous mode.

    Returns:
        A ``CREATED`` response whose body carries the job id (``jid``).

    Raises:
        cherrypy.HTTPError: ``BAD_REQUEST`` when ``targets`` is not a
            non-empty list or ``cmd`` is not a non-empty text string.
    """
    payload = cherrypy.request.json

    targets = payload.pop('targets', None)
    if not targets or not isinstance(targets, list):
        raise cherrypy.HTTPError(status.BAD_REQUEST, ERR_NO_TARGET)

    cmd = payload.pop('cmd', None)
    if not cmd or not isinstance(cmd, six.text_type):
        # NOTE(review): the message constant is the "bad role" one although
        # the rejected field is ``cmd`` -- confirm whether a dedicated
        # constant should be used.
        raise cherrypy.HTTPError(status.BAD_REQUEST, ERR_BAD_ROLE)

    # ``async`` is a reserved word since Python 3.7, so writing it as a
    # literal keyword argument is a SyntaxError there.  Passing it through a
    # dict keeps the keyword name seen by ``handle`` unchanged.
    jid = self.handle(targets, cmd, **{'async': True})
    return response(status.CREATED, dict(jid=jid))
def _state_parse(state):
    """Translate a service state code into a ``(start, restart)`` pair.

    Raises:
        cherrypy.HTTPError: ``BAD_REQUEST`` for an unrecognised state.
    """
    if state == STATE_STARTED:
        return True, False
    if state == STATE_STOPED:
        return False, False
    if state == STATE_RESTARTED:
        return True, True
    raise cherrypy.HTTPError(status.BAD_REQUEST, ERR_BAD_SERVPARAMS)
def params_check_target():
    """Check that the request params contain 'target'.

    Raises:
        cherrypy.HTTPError: ``BAD_REQUEST`` when 'target' is absent.
    """
    # ``dict.has_key`` was removed in Python 3; the ``in`` operator works
    # on both Python 2 and Python 3.
    if 'target' not in cherrypy.request.params:
        raise cherrypy.HTTPError(status.BAD_REQUEST, ERR_NO_TARGET)
def parse_params_target(params):
    """Remove and return ``target`` from the request params.

    Raises:
        cherrypy.HTTPError: ``BAD_REQUEST`` when 'target' is absent.
    """
    try:
        target = params.pop('target')
    except KeyError:
        raise cherrypy.HTTPError(status.BAD_REQUEST, ERR_NO_TARGET)
    return target
def index(self):
    """Receive a webhook call and feed the update to the bot.

    The request must carry ``content-length`` and a ``content-type`` of
    exactly ``application/json``.  The body is decoded as UTF-8, turned into
    an ``Update`` and passed to ``bot.process_new_updates``.

    Returns:
        An empty string.

    Raises:
        cherrypy.HTTPError: 403 when the header requirements are not met.
    """
    headers = cherrypy.request.headers
    is_json_post = (
        "content-length" in headers
        and "content-type" in headers
        and headers["content-type"] == "application/json"
    )
    if not is_json_post:
        raise cherrypy.HTTPError(403)

    length = int(headers["content-length"])
    json_string = cherrypy.request.body.read(length).decode("utf-8")
    update = telebot.types.Update.de_json(json_string)
    bot.process_new_updates([update])
    return ''
def login(self, username=None, password=None):
    """Authenticate a user and record the outcome in the session.

    Returns:
        A dict with an ``ok`` flag; on invalid user or credentials it also
        carries an ``error`` message.

    Raises:
        cherrypy.HTTPError: 400 when ``username`` or ``password`` is missing.
    """
    if username is None or password is None:
        raise cherrypy.HTTPError(400, 'Bad Request')

    session = cherrypy.session
    try:
        session['user'] = User(username)
        session['auth'] = session['user'].authenticate(password)
        return {'ok': session['user'].auth}
    except (InvalidUser, InvalidCredentials):
        failure = {
            'ok': False,
            'error': 'Invalid credentials. Try again.'
        }
        return failure
    except UserModelException:
        return {'ok': False}
def forgot(self, emailaddr, email_type):
    """Record a "forgot password/username" e-mail request on the request.

    Args:
        emailaddr: A full address, or just a local part; in the latter case
            the domain comes from the application's ``email`` config section
            (``localhost`` when unset).
        email_type: Either ``'password'`` or ``'username'``.

    Returns:
        ``{'ok': True}``.

    Raises:
        cherrypy.HTTPError: 400 when ``email_type`` is not supported.
    """
    if '@' in emailaddr:
        # Split on the last '@' only, so an address containing more than
        # one '@' no longer fails with an unpacking ValueError.
        localpart, domain = emailaddr.rsplit('@', 1)
    else:
        localpart = emailaddr
        domain = cherrypy.request.app.config.get('email', {}).get(
            'domain', 'localhost')

    cherrypy.request.email_address = '@'.join([localpart, domain])

    if email_type not in ['password', 'username']:
        raise cherrypy.HTTPError(400)

    cherrypy.request.email_type = email_type
    return {'ok': True}
def sshkey(self, sshpubkey=None):
    """Add, delete or list the SSH public keys of the logged-in user.

    The action is chosen by the HTTP method: POST adds ``sshpubkey``,
    DELETE removes it, GET returns ``user.sshPublicKey``.

    Raises:
        cherrypy.HTTPError: 403 (see the review note on the final ``else``).
    """
    # NOTE(review): the indentation of this block was lost in the source;
    # the nesting below is a reconstruction -- confirm against the original.
    http_method = cherrypy.request.method.upper()
    if cherrypy.session.get('auth', False):
        user = cherrypy.session['user']
        if http_method == 'POST':
            try:
                newkey = SSHKey(sshpubkey)
                user.add_key(newkey.key)
                return {
                    'ok': True,
                    'fingerprint': newkey.fingerprint,
                    'comment': newkey.comment
                }
            except UserModelException:
                return {'ok': False}
            except InvalidKey:
                return {
                    'ok': False,
                    'error': 'Not a valid SSH Public Key!'
                }
            else:
                # Reached only if the ``try`` body finishes without
                # returning or raising.
                return {'ok': False}
        if http_method == 'DELETE':
            try:
                user.delete_key(sshpubkey)
                return {'ok': True}
            except UserModelException:
                return {'ok': False}
        if http_method == 'GET':
            return user.sshPublicKey
    else:
        # NOTE(review): presumably pairs with the session 'auth' check
        # (unauthenticated -> 403) -- confirm.
        raise cherrypy.HTTPError(403)
def _check_method(cls, allowed_methods=VALID_METHODS):
    """Validate that the request method is one of the allowed methods.

    When it is not, the ``Allow`` response header is set to the
    comma-separated allowed methods and a 405 (Method Not Allowed) is raised.
    """
    method = cherrypy.request.method.upper()
    if method in allowed_methods:
        return

    cherrypy.response.headers['Allow'] = ', '.join(allowed_methods)
    raise cherrypy.HTTPError(405)
def post(self):
    """Stem the posted message.

    The JSON body supplies ``message``; the ``Http-Auth`` request header
    must equal the module-level ``token``.

    Raises:
        cherrypy.HTTPError: 401 on a token mismatch, 422 when ``message`` is
            missing.
    """
    message = cherrypy.request.json.get('message', None)
    auth_token = cherrypy.request.headers.get('Http-Auth', None)

    if auth_token != token:
        raise cherrypy.HTTPError(401, "Unauthorized")
    if not message:
        raise cherrypy.HTTPError(422, "Message is missing")

    return MessageStemmer(message).perform()

# ==================================
# Cherrypy configuration and routing
# ==================================
def index(self):
    """Receive a webhook call and feed the update's message to the bot.

    The request must carry ``content-length`` and a ``content-type`` of
    exactly ``application/json``.  The body is decoded as UTF-8, turned into
    an ``Update`` and its ``message`` is passed to
    ``bot.process_new_messages``.

    Returns:
        An empty string.

    Raises:
        cherrypy.HTTPError: 403 when the header requirements are not met.
    """
    headers = cherrypy.request.headers
    is_json_post = (
        'content-length' in headers
        and 'content-type' in headers
        and headers['content-type'] == 'application/json'
    )
    if not is_json_post:
        raise cherrypy.HTTPError(403)

    length = int(headers['content-length'])
    json_string = cherrypy.request.body.read(length).decode("utf-8")
    update = telebot.types.Update.de_json(json_string)
    bot.process_new_messages([update.message])
    return ''

# Handle '/start' and '/help'
def conv_response(resp):
    """Convert a ``Response`` object into a CherryPy reply.

    Anything that is not a ``Response`` is returned as bytes unchanged.
    Otherwise ``Set-Cookie`` headers are copied into the CherryPy response
    cookie, and the outcome depends on the numeric status: below 300 the
    status and headers are copied and the message returned as bytes;
    300-399 raises a redirect; anything else raises an HTTP error.
    """
    if not isinstance(resp, Response):
        return as_bytes(resp)

    copied_attrs = ('expires', 'path', 'comment', 'domain', 'max-age',
                    'secure', 'version')
    cookie = cherrypy.response.cookie
    for header, value in resp.headers:
        if header != 'Set-Cookie':
            continue
        for name, morsel in SimpleCookie(value).items():
            cookie[name] = morsel.value
            for key in copied_attrs:
                # Only attributes that are actually set are carried over.
                if morsel[key]:
                    cookie[name][key] = morsel[key]

    # The status text starts with the numeric code.
    _stat = int(resp._status.partition(' ')[0])

    if _stat >= 400:
        raise cherrypy.HTTPError(_stat, message=resp.message)
    if _stat >= 300:
        raise cherrypy.HTTPRedirect(resp.message, status=_stat)

    cherrypy.response.status = _stat
    for key, val in resp.headers:
        cherrypy.response.headers[key] = val
    return as_bytes(resp.message)
def registration(self, **kwargs):
    """Client registration endpoint.

    ``OPTIONS`` answers the CORS preflight.  ``GET`` returns the stored
    registration of ``kwargs['client_id']`` as JSON.  Any other method
    registers a client from the request body (or, without a body, from the
    JSON-encoded keyword arguments) and converts the endpoint's reply with
    ``conv_response``.

    Raises:
        cherrypy.HTTPError: carrying the text of any exception raised by
            the registration endpoint.
    """
    logger.debug('Request headers: {}'.format(cherrypy.request.headers))
    method = cherrypy.request.method

    if method == "OPTIONS":
        cherrypy_cors.preflight(
            allowed_methods=["POST", "GET"], origins='*',
            allowed_headers=['Authorization', 'content-type'])
        return None

    if method == "GET":
        _cinfo = self.op.cdb[kwargs['client_id']]
        for attr in ['redirect_uris', 'post_logout_redirect_uris']:
            try:
                _cinfo[attr] = unpack_redirect_uri(_cinfo[attr])
            except KeyError:
                # The attribute is simply absent from this registration.
                pass
        rr = RegistrationResponse(**_cinfo)
        cherrypy.response.headers['Content-Type'] = 'application/json'
        return as_bytes(json.dumps(rr.to_dict()))

    logger.debug('ClientRegistration kwargs: {}'.format(kwargs))
    _request = None
    if cherrypy.request.process_request_body is True:
        _request = as_unicode(cherrypy.request.body.read())
        logger.debug('request_body: {}'.format(_request))

    try:
        resp = self.op.registration_endpoint(
            _request if _request else json.dumps(kwargs))
    except Exception as err:
        logger.error(err)
        raise cherrypy.HTTPError(message=str(err))

    return conv_response(resp)
def claims(self, **kwargs):
    """Claims endpoint: return the claims tied to a one-time bearer token.

    ``OPTIONS`` answers the CORS preflight.  Otherwise the ``Authorization``
    header must start with ``Bearer``; the text after the first seven
    characters is looked up in ``self.op.claim_access_token``, removed from
    it (one-time use), and the claims are returned as a signed JWT.

    Returns:
        The JWT as bytes, with the content type ``application/jwt``.

    Raises:
        cherrypy.HTTPError: 400 when the header is missing, is not a Bearer
            token, or the token is unknown.
    """
    if cherrypy.request.method == "OPTIONS":
        cherrypy_cors.preflight(
            allowed_methods=["GET"], origins='*',
            allowed_headers='Authorization')
        return None

    authz = cherrypy.request.headers.get('Authorization')

    # Explicit check: a missing header used to fail with AttributeError,
    # and the error object was built but never raised, so bad requests
    # fell through.
    if not authz or not authz.startswith("Bearer"):
        logger.error("Bad authorization token")
        raise cherrypy.HTTPError(400, "Bad authorization token")

    tok = authz[7:]
    try:
        _claims = self.op.claim_access_token[tok]
    except KeyError:
        logger.error("Bad authorization token")
        raise cherrypy.HTTPError(400, "Bad authorization token")

    # one time token
    del self.op.claim_access_token[tok]

    _info = Message(**_claims)
    jwt_key = self.op.keyjar.get_signing_key()
    logger.error(_info.to_dict())
    cherrypy.response.headers["content-type"] = 'application/jwt'
    return as_bytes(_info.to_jwt(key=jwt_key, algorithm="RS256"))
def json_processor(entity):
    """Read application/json data into request.json.

    Raises:
        cherrypy.HTTPError: 411 when the entity has no ``Content-Length``,
            400 when the body is not valid JSON.
    """
    content_length = entity.headers.get(ntou('Content-Length'), ntou(''))
    if not content_length:
        raise cherrypy.HTTPError(411)

    raw = entity.fp.read()
    # ValueError raised inside this block is reported as a 400.
    with cherrypy.HTTPError.handle(ValueError, 400, 'Invalid JSON document'):
        text = raw.decode('utf-8')
        cherrypy.serving.request.json = json_decode(text)
def _get_file_path(self): f = os.path.join(self.storage_path, self.SESSION_PREFIX + self.id) if not os.path.abspath(f).startswith(self.storage_path): raise cherrypy.HTTPError(400, 'Invalid session id in cookie.') return f
def basic_auth(realm, users, encrypt=None, debug=False):
    """If auth fails, raise 401 with a basic authentication header.

    realm
        A string containing the authentication realm.

    users
        A dict of the form: {username: password} or a callable returning
        a dict.

    encrypt
        callable used to encrypt the password returned from the user-agent.
        if None it defaults to a md5 encryption.

    """
    authenticated = check_auth(users, encrypt)
    if not authenticated:
        # inform the user-agent this path is protected
        challenge = httpauth.basicAuth(realm)
        cherrypy.serving.response.headers['www-authenticate'] = challenge
        raise cherrypy.HTTPError(
            401, 'You are not authorized to access that resource')

    if debug:
        cherrypy.log('Auth successful', 'TOOLS.BASIC_AUTH')
def validate_since():
    """Validate the current Last-Modified against If-Modified-Since headers.

    If no code has set the Last-Modified response header, then no validation
    will be performed.

    Raises:
        cherrypy.HTTPError: 412 when a precondition fails.
        cherrypy.HTTPRedirect: 304 when a GET/HEAD resource is unmodified.
    """
    response = cherrypy.serving.response
    lastmod = response.headers.get('Last-Modified')
    if not lastmod:
        return

    status, reason, msg = _httputil.valid_status(response.status)
    is_success = status >= 200 and status <= 299
    request = cherrypy.serving.request

    unmodified_since = request.headers.get('If-Unmodified-Since')
    if unmodified_since and unmodified_since != lastmod:
        if is_success or status == 412:
            raise cherrypy.HTTPError(412)

    modified_since = request.headers.get('If-Modified-Since')
    if modified_since and modified_since == lastmod:
        if is_success or status == 304:
            if request.method in ('GET', 'HEAD'):
                raise cherrypy.HTTPRedirect([], 304)
            raise cherrypy.HTTPError(412)

# Tool code #
def allow(methods=None, debug=False):
    """Raise 405 if request.method not in methods (default ['GET', 'HEAD']).

    The given methods are case-insensitive, and may be in any order.
    If only one method is allowed, you may supply a single string;
    if more than one, supply a list of strings.

    Regardless of whether the current method is allowed or not, this
    also emits an 'Allow' response header, containing the given methods.
    """
    if not isinstance(methods, (tuple, list)):
        methods = [methods]

    allowed = [m.upper() for m in methods if m]
    if not allowed:
        allowed = ['GET', 'HEAD']
    elif 'GET' in allowed and 'HEAD' not in allowed:
        # Allowing GET implies allowing HEAD.
        allowed.append('HEAD')

    cherrypy.response.headers['Allow'] = ', '.join(allowed)

    permitted = cherrypy.request.method in allowed
    if debug:
        if permitted:
            cherrypy.log('request.method %r in methods %r' %
                         (cherrypy.request.method, allowed), 'TOOLS.ALLOW')
        else:
            cherrypy.log('request.method %r not in methods %r' %
                         (cherrypy.request.method, allowed), 'TOOLS.ALLOW')
    if not permitted:
        raise cherrypy.HTTPError(405)
def referer(pattern, accept=True, accept_missing=False, error=403,
            message='Forbidden Referer header.', debug=False):
    """Raise HTTPError if Referer header does/does not match the given pattern.

    pattern
        A regular expression pattern to test against the Referer.

    accept
        If True, the Referer must match the pattern; if False,
        the Referer must NOT match the pattern.

    accept_missing
        If True, permit requests with no Referer header.

    error
        The HTTP error code to return to the client on failure.

    message
        A string to include in the response body on failure.

    """
    request_headers = cherrypy.serving.request.headers
    try:
        ref = request_headers['Referer']
        matched = bool(re.match(pattern, ref))
        if debug:
            cherrypy.log('Referer %r matches %r' % (ref, pattern),
                         'TOOLS.REFERER')
        if accept == matched:
            return
    except KeyError:
        # No Referer header was sent at all.
        if debug:
            cherrypy.log('No Referer header', 'TOOLS.REFERER')
        if accept_missing:
            return

    raise cherrypy.HTTPError(error, message)