我们从 Python 开源项目中提取了以下 28 个代码示例,用于说明如何使用 falcon.HTTPUnauthorized()。
def process_request(self, req, resp):
    """Reject requests that do not carry a valid auth token.

    Requests whose path lies outside ``self.prefix`` (when a prefix is set)
    or is listed in ``self.noauth_routes`` are let through unchecked.

    :param req: Falcon request
    :type req: falcon.request.Request
    :param resp: Falcon response
    :type resp: falcon.response.Response
    :raises falcon.HTTPUnauthorized: when the ``X-Auth-Token`` header is
        missing or ``self._token_is_valid`` rejects it
    """
    outside_prefix = self.prefix and not req.path.startswith(self.prefix)
    if outside_prefix or req.path in self.noauth_routes:
        return

    token = req.get_header('X-Auth-Token')
    project = req.get_header('X-Project-ID')

    if token is None:
        raise falcon.HTTPUnauthorized(
            'Auth token required',
            'Please provide an auth token as part of the request.',
            None)

    if self._token_is_valid(token, project):
        return

    raise falcon.HTTPUnauthorized(
        'Authentication required',
        'The provided auth token is not valid. Please request a new token and try again.',
        None)
def process_request(self, req, resp):
    """Require a valid ``Authorization`` token on every request.

    The token is checked together with the ``Account-ID`` header through
    ``self._token_is_valid``. Both failure modes raise
    ``falcon.HTTPUnauthorized`` with the same challenge list and help link.
    """
    token = req.get_header('Authorization')
    account_id = req.get_header('Account-ID')

    challenges = ['Token type="Fernet"']
    help_link = 'http://docs.example.com/auth'

    if token is None:
        raise falcon.HTTPUnauthorized(
            'Auth token required',
            'Please provide an auth token as part of the request.',
            challenges,
            href=help_link)

    if self._token_is_valid(token, account_id):
        return

    raise falcon.HTTPUnauthorized(
        'Authentication required',
        'The provided auth token is not valid. Please request a new token and try again.',
        challenges,
        href=help_link)
def check_auth(ba_ctx, req):
    """Check request authentication based on boot action context.

    Returns silently when the ``X-Bootaction-Key`` header, decoded as hex,
    equals the identity key stored in ``ba_ctx``; raises otherwise.

    :param ba_ctx: Boot Action context from database; its ``identity_key``
        and ``node_name`` entries are read
    :param req: The falcon request object of the API call
    :raises falcon.HTTPUnauthorized: the header is missing or empty
    :raises falcon.HTTPForbidden: the header is not valid hex or does not
        match the stored identity key
    """
    import hmac

    identity_key = req.get_header('X-Bootaction-Key', default='')

    if identity_key == '':
        raise falcon.HTTPUnauthorized(
            title='Unauthorized',
            description='No X-Bootaction-Key',
            challenges=['Bootaction-Key'])

    try:
        supplied_key = bytes.fromhex(identity_key)
    except ValueError:
        # A header that is not valid hex can never match; treat it as a
        # wrong key instead of letting the ValueError escape as a 500.
        supplied_key = None

    expected_key = ba_ctx['identity_key']
    if supplied_key is None:
        keys_match = False
    elif isinstance(expected_key, (bytes, bytearray)):
        # Constant-time comparison so timing does not leak key bytes.
        keys_match = hmac.compare_digest(expected_key, supplied_key)
    else:
        keys_match = expected_key == supplied_key

    if not keys_match:
        # Key material is deliberately kept out of the log.
        logger.warning(
            "Forbidding boot action access - node: %s", ba_ctx['node_name'])
        raise falcon.HTTPForbidden(
            title='Unauthorized', description='Invalid X-Bootaction-Key')
def verify_credentials(req, resp, resource, params):
    '''
    Verify that the request data carries both an email and a password and
    that the email passes ``login_utils.is_valid_email``.

    Raises falcon.HTTPBadRequest when either key is missing and
    falcon.HTTPUnauthorized when the email is rejected.
    '''
    payload = req.stream

    # Both keys must be present; only the email value is used afterwards.
    try:
        email, _ = payload['email'], payload['password']
    except KeyError:
        raise falcon.HTTPBadRequest(
            'Incomplete credentials',
            "Either email or password is not present or the email is invalid.")

    if login_utils.is_valid_email(email):
        return

    raise falcon.HTTPUnauthorized(
        'Invalid credentials', 'This email address does not exist.', False)
def guarded_session():
    '''
    Context manager that will automatically close session on exceptions

    Yields a new ``Session()``. An IrisValidationException raised by the
    caller is converted to HTTPBadRequest; every other exception is
    re-raised after the session is closed, and anything that is not one of
    the listed HTTP errors is logged first. The session is not closed on the
    success path.

    NOTE(review): this is a bare generator; presumably it is wrapped with
    contextlib.contextmanager outside this block - confirm.
    '''
    # Start unbound-safe: if Session() itself raises there is nothing to
    # close, and the handlers must not fail on a missing ``session``.
    session = None
    try:
        session = Session()
        yield session
    except IrisValidationException as e:
        if session is not None:
            session.close()
        raise HTTPBadRequest('Validation error', str(e))
    except (HTTPForbidden, HTTPUnauthorized, HTTPNotFound, HTTPBadRequest):
        if session is not None:
            session.close()
        raise
    except Exception:
        if session is not None:
            session.close()
        logger.exception('SERVER ERROR')
        raise
def on_delete(self, req, resp, app_name):
    """Delete the application named ``app_name``; admins only.

    Raises HTTPUnauthorized for non-admins and HTTPBadRequest when the
    delete hits an IntegrityError or removes no rows.
    """
    if not req.context['is_admin']:
        raise HTTPUnauthorized('Only admins can remove apps')

    deleted_rows = False
    with db.guarded_session() as session:
        try:
            outcome = session.execute(
                'DELETE FROM `application` WHERE `name` = :app_name',
                {'app_name': app_name})
            deleted_rows = outcome.rowcount
            session.commit()
            session.close()
        except IntegrityError:
            raise HTTPBadRequest('Cannot remove app. It has likely already in use.')

    if not deleted_rows:
        raise HTTPBadRequest('No rows changed; app name probably already deleted')

    resp.body = '[]'
def on_post(self, req, resp, app_name):
    """Replace the key of application ``app_name`` with a fresh random one.

    Admins only. Raises HTTPBadRequest when the update changes no rows.
    """
    if not req.context['is_admin']:
        raise HTTPUnauthorized('You must be an admin to rekey an app')

    # New key: SHA-256 hex digest of 32 random bytes.
    fresh_key = hashlib.sha256(os.urandom(32)).hexdigest()
    query_args = {'app_name': app_name, 'new_key': fresh_key}

    rows_changed = False
    with db.guarded_session() as session:
        outcome = session.execute(
            'UPDATE `application` SET `key` = :new_key WHERE `name` = :app_name',
            query_args)
        rows_changed = outcome.rowcount
        session.commit()
        session.close()

    if not rows_changed:
        raise HTTPBadRequest('No rows changed; app name likely incorrect')

    logger.info('Admin user %s has re-key\'d app %s', req.context['username'], app_name)
    resp.body = '[]'
def _extract_credentials(self, req): auth = req.get_header('Authorization') token = self.parse_auth_token_from_request(auth_header=auth) try: token = base64.b64decode(token).decode('utf-8') except Exception as ex: raise falcon.HTTPUnauthorized( title='401 Unauthorized', description='Invalid Authorization Header: Unable to decode credentials', challenges=None) try: username, password = token.split(':') except ValueError: raise falcon.HTTPUnauthorized( title='401 Unauthorized', description='Invalid Authorization: Unable to decode credentials', challenges=None) return username, password
def process_resource(self, req, resp, resource, params):
    """Attach a request context according to the route's authentication needs.

    A request needs authentication unless its HTTP method is listed in the
    resource's ``no_authentication_methods``; a resource without that
    attribute is treated as requiring authentication for every method.

    :param req: ``falcon`` request object that will be examined for method
    :param resource: ``falcon`` resource class that will be examined for
        authentication needs
    :raises: falcon.HTTPUnauthorized: when authentication is required, the
        'X-Identity-Status' header is not 'Confirmed' and anonymous access
        is disallowed.
    """
    try:
        authentication_required = (
            req.method not in resource.no_authentication_methods)
    except AttributeError:
        # No opt-out list on the resource: assume authentication is required.
        authentication_required = True

    confirmed = (
        not authentication_required
        or req.headers.get('X-IDENTITY-STATUS') == 'Confirmed')

    if confirmed:
        req.context = deckhand.context.RequestContext.from_environ(req.env)
    elif CONF.allow_anonymous_access:
        req.context = deckhand.context.get_context()
    else:
        raise falcon.HTTPUnauthorized()
def test_http_unauthorized_no_title_and_desc_and_challenges():
    """A bare HTTPUnauthorized has the 401 status as title and nothing else."""
    caught = None
    try:
        raise falcon.HTTPUnauthorized()
    except falcon.HTTPUnauthorized as err:
        caught = err

    assert caught.title == status.HTTP_401
    assert caught.description is None
    assert 'WWW-Authenticate' not in caught.headers
def test_http_unauthorized_with_title_and_desc_and_challenges():
    """Explicit title, description and challenge are exposed on the error."""
    caught = None
    try:
        raise falcon.HTTPUnauthorized(
            title='Test',
            description='Testdescription',
            challenges=['Testch'])
    except falcon.HTTPUnauthorized as err:
        caught = err

    assert caught.title == 'Test'
    assert caught.description == 'Testdescription'
    assert caught.headers['WWW-Authenticate'] == 'Testch'
def on_get(self, req, resp):
    """Always answer with 401 and a single Basic challenge."""
    challenges = ['Basic realm="simple"']
    raise falcon.HTTPUnauthorized(
        'Authentication Required',
        'Missing or invalid authorization.',
        challenges)
def on_post(self, req, resp):
    """Always answer with 401, offering two challenges."""
    challenges = ['Newauth realm="apps"', 'Basic realm="simple"']
    raise falcon.HTTPUnauthorized(
        'Authentication Required',
        'Missing or invalid authorization.',
        challenges)
def process_request(self, req, resp):
    """Require a valid ``X-Auth-Token`` header on POST and PUT requests.

    Other methods pass through untouched. On success
    ``req.context['authorized']`` is set to True.
    """
    # check auth for POST or PUT methods
    if req.method not in ('POST', 'PUT'):
        return

    token = req.get_header('X-Auth-Token')
    project = req.get_header('X-Project-ID')

    def _token_is_valid(token, project):
        # The shared secret is imported lazily, only when a token is checked.
        from config.secret import secret
        if token != secret:
            return False
        req.context['authorized'] = True
        return req.context['authorized']

    if token is None:
        raise falcon.HTTPUnauthorized(
            'Auth token required',
            'Please provide an auth token as part of the request.',
            href='http://example.com/auth')

    if _token_is_valid(token, project):
        return

    raise falcon.HTTPUnauthorized(
        'Authentication required',
        'The provided auth token is not valid. Please provide a new token and try again.',
        href='http://example.com/auth',
        scheme='Token; UUID')
def _check_signature(self, payload, signature):
    """Verify the GPG signature of a request payload.

    The request is accepted only when the checker exits with code 0, a key
    ID was parsed, that key ID is in ``self.gpg_key_ids``, and the signature
    timestamp is within ``self.TIME_DIFF_THRESHOLD`` seconds of local time.

    :raises falcon.HTTPUnauthorized: any of the checks above fails
    """
    checker = eva.gpg.GPGSignatureChecker(payload, signature)
    result = checker.verify()

    if result.exit_code != 0:
        # stderr may be empty on failure; indexing it blindly would turn the
        # intended 401 into an unhandled IndexError.
        first_line = result.stderr[0] if result.stderr else ''
        self.logger.warning('GPG verification of request failed: %s', first_line)
        for line in result.stderr or ():
            self.logger.warning(line)
        raise falcon.HTTPUnauthorized('GPG verification of request failed.')

    if result.key_id is None:
        self.logger.warning('GPG key ID not parsed correctly from GPG output, dropping request.')
        raise falcon.HTTPUnauthorized('GPG verification of request failed.')

    self.logger.info(
        'Request is signed by %s with %s key %s at %s',
        result.signer, result.key_type, result.key_id,
        eva.strftime_iso8601(result.timestamp))

    if result.key_id not in self.gpg_key_ids:
        self.logger.warning("GPG key ID '%s' is not in whitelist, dropping request.", result.key_id)
        raise falcon.HTTPUnauthorized('Only few of mere mortals may try to enter the twilight zone.')

    # The absolute difference is used, so signatures dated in the future are
    # rejected the same way as stale ones.
    time_diff = eva.now_with_timezone() - result.timestamp
    time_diff_secs = abs(time_diff.total_seconds())
    if time_diff_secs > self.TIME_DIFF_THRESHOLD:
        self.logger.warning(
            "GPG signature differs from local time with %.1f seconds, over threshold of %.1f seconds, dropping request.",
            time_diff_secs, self.TIME_DIFF_THRESHOLD)
        raise falcon.HTTPUnauthorized('Too high time difference between server and client; is your clock correct?')

    self.logger.info(
        'Permitting access to %s with %s key %s',
        result.signer, result.key_type, result.key_id)
def debug_auth(self, req, resp, resource, params):
    """Populate ``req.context`` with the caller's username and/or app.

    Checked in order: anonymous GET on resources that allow it, a beaker
    session user, the ``X-IRIS-USERNAME`` header, frontend routes, and
    finally a third-party application named in the request.

    :raises HTTPUnauthorized: the named application is not in
        ``cache.applications``
    """
    req.context['username'] = req.env.get('beaker.session', {}).get('user', None)
    method = req.method

    if resource.allow_read_no_auth and method == 'GET':
        return

    # If we're authenticated using beaker, don't validate app as if this is an
    # API call, but set 'app' to the internal iris user as some routes (test
    # incident creation) need it.
    if req.context['username']:
        req.context['app'] = cache.applications.get('iris')
        return

    # For the purpose of e2etests, allow setting username via header, rather
    # than going through beaker
    username_header = req.get_header('X-IRIS-USERNAME')
    if username_header:
        req.context['username'] = username_header
        return

    # If this is a frontend route, and we're not logged in, don't fall through
    # to process as an app. This will allow the ACLMiddleware to force the
    # login page.
    if getattr(resource, 'frontend_route', False):
        return

    # Proceed with authenticating this route as a third party application
    try:
        # Ignore HMAC requirements for custom webhooks
        if req.env['PATH_INFO'].startswith('/v0/webhooks/'):
            app = req.get_param('application', required=True)
        else:
            # The header value is sliced past its first five characters and
            # split into "<app>:<digest>"; the digest is not checked here.
            app, _client_digest = req.get_header('AUTHORIZATION', '')[5:].split(':', 1)

        if app not in cache.applications:
            # Logger.warn is a deprecated alias; use warning().
            logger.warning('Tried authenticating with nonexistent app: "%s"', app)
            raise HTTPUnauthorized('Authentication failure', 'Application not found', [])

        req.context['app'] = cache.applications[app]
    except TypeError:
        # NOTE(review): any TypeError above (e.g. slicing a None header
        # value) ends authentication silently without setting 'app' -
        # confirm this is the intended handling of a missing header.
        return
def process_admin_acl(self, req, resource, params):
    """Set ``req.context['is_admin']`` and enforce per-user resources.

    When the resource sets ``enforce_user``, a non-admin caller may only
    access it if the ``username`` path parameter equals their own username.

    :raises HTTPUnauthorized: ``enforce_user`` is set and the caller has no
        username, or is a non-admin addressing another user's resource
    """
    req.context['is_admin'] = False

    # Quickly check the username in the path matches who's logged in
    enforce_user = getattr(resource, 'enforce_user', False)

    if not req.context['username']:
        if enforce_user:
            raise HTTPUnauthorized('Username must be specified for this action', '', [])
        return

    # Check if user is an admin. try/finally guarantees the cursor and the
    # raw connection are released even when the query raises.
    connection = db.engine.raw_connection()
    try:
        cursor = connection.cursor()
        try:
            cursor.execute(check_username_admin_query, req.context['username'])
            result = cursor.fetchone()
        finally:
            cursor.close()
    finally:
        connection.close()

    if result:
        req.context['is_admin'] = bool(result[0])

    if enforce_user and not req.context['is_admin']:
        path_username = params.get('username')
        if not equals(path_username, req.context['username']):
            raise HTTPUnauthorized('This user is not allowed to access this resource', '', [])
def on_get(self, req, resp, app_name):
    """Return the key of application ``app_name`` as JSON.

    The caller must be logged in; non-admins must additionally be listed as
    an owner of the application.

    Raises HTTPUnauthorized (not logged in), HTTPForbidden (not an owner)
    or HTTPBadRequest (no key found).
    """
    username = req.context['username']
    if not username:
        raise HTTPUnauthorized('You must be a logged in user to view this app\'s key')

    ownership_query = (
        'SELECT 1 FROM `application_owner` '
        'JOIN `target` on `target`.`id` = `application_owner`.`user_id` '
        'JOIN `application` on `application`.`id` = `application_owner`.`application_id` '
        'WHERE `target`.`name` = :username AND `application`.`name` = :app_name')

    with db.guarded_session() as session:
        if not req.context['is_admin']:
            has_permission = session.execute(
                ownership_query,
                {'app_name': app_name, 'username': req.context['username']}).scalar()
            if not has_permission:
                raise HTTPForbidden('You don\'t have permissions to view this app\'s key.')

        key = session.execute(
            'SELECT `key` FROM `application` WHERE `name` = :app_name LIMIT 1',
            {'app_name': app_name}).scalar()
        if not key:
            raise HTTPBadRequest('Key for this application not found')

        session.close()

    resp.body = ujson.dumps({'key': key})
def on_put(self, req, resp, app_name):
    """Rename application ``app_name`` to the ``new_name`` in the JSON body.

    Admins only. Raises HTTPBadRequest for invalid JSON, a missing or
    unchanged name, an IntegrityError on update, or when no row changes.
    """
    if not req.context['is_admin']:
        raise HTTPUnauthorized('Only admins can rename apps')

    try:
        body = ujson.loads(req.context['body'])
    except ValueError:
        raise HTTPBadRequest('Invalid json in post body')

    new_name = body.get('new_name', '').strip()
    if new_name == '':
        raise HTTPBadRequest('Missing new_name from post body')
    if new_name == app_name:
        raise HTTPBadRequest('New and old app name are identical')

    query_args = {'new_name': new_name, 'old_name': app_name}

    rows_changed = False
    with db.guarded_session() as session:
        try:
            outcome = session.execute(
                'UPDATE `application` SET `name` = :new_name WHERE `name` = :old_name',
                query_args)
            rows_changed = outcome.rowcount
            session.commit()
        except IntegrityError:
            raise HTTPBadRequest('Destination app name likely already exists')
        finally:
            session.close()

    if not rows_changed:
        raise HTTPBadRequest('No rows changed; old app name incorrect')

    resp.body = '[]'
def on_post(self, req, resp):
    """
    Accept slack's message from interactive buttons

    The form-encoded body carries a JSON ``payload``. Its token is
    validated, the callback id and first action are forwarded to the
    configured iris slack hook, and the processed API response is written
    back via ``self.return_slack_message``.

    Raises falcon.HTTPUnauthorized for an invalid token,
    falcon.HTTPBadRequest for a bad callback id, a 400 from the API or an
    unreadable payload, and falcon.HTTPInternalServerError for any other
    non-200 API status.
    """
    try:
        form_post = falcon.uri.parse_query_string(req.context['body'])
        payload = ujson.loads(form_post['payload'])

        if not self.valid_token(payload['token']):
            logger.error('Invalid token sent in the request.')
            raise falcon.HTTPUnauthorized('Access denied', 'Not a valid auth token')

        try:
            msg_id = int(payload['callback_id'])
        except KeyError:
            logger.error('callback_id not found in the json payload.')
            raise falcon.HTTPBadRequest('Bad Request', 'Callback id not found')
        except ValueError:
            logger.error('Callback ID not an integer: %s', payload['callback_id'])
            raise falcon.HTTPBadRequest('Bad Request', 'Callback id must be int')

        data = {'msg_id': msg_id,
                'source': payload['user']['name'],
                'content': payload['actions'][0]['name']}
        endpoint = self.config['iris']['hook']['slack']

        try:
            result = self.iclient.post(endpoint, data)
        except MaxRetryError as e:
            # Upstream unreachable: log and return without a slack message.
            logger.error(e.reason)
            return

        if result.status == 400:
            raise falcon.HTTPBadRequest('Bad Request', '')
        elif result.status != 200:
            # Compare by value; identity comparison of ints is not reliable.
            raise falcon.HTTPInternalServerError('Internal Server Error', 'Unknown response from the api')
        else:
            content = process_api_response(result.data)
            self.return_slack_message(resp, content)
        return
    except falcon.HTTPError:
        # HTTP errors raised on purpose above must reach the client as-is,
        # not be rewritten into the generic 400 below.
        raise
    except Exception:
        logger.exception('Unable to read payload from slack. Our post body: %s', req.context['body'])
        raise falcon.HTTPBadRequest('Bad Request', 'Unable to read the payload from slack')
def parse_auth_token_from_request(self, auth_header):
    """
    Parses and returns Auth token from the request header. Raises
    `falcon.HTTPUnauthorized` exception with proper error message

    The header must consist of exactly two whitespace-separated parts: the
    prefix configured in ``self.auth_header_prefix`` (compared
    case-insensitively) followed by the token.
    """
    # A whitespace-only header splits into no parts; treat it like a missing
    # header instead of failing on parts[0].
    parts = auth_header.split() if auth_header else []
    if not parts:
        raise falcon.HTTPUnauthorized(
            title='401 Unauthorized',
            description='Missing Authorization Header',
            challenges=None)

    if parts[0].lower() != self.auth_header_prefix.lower():
        raise falcon.HTTPUnauthorized(
            title='401 Unauthorized',
            description='Invalid Authorization Header: '
                        'Must start with {0}'.format(self.auth_header_prefix),
            challenges=None)
    elif len(parts) == 1:
        raise falcon.HTTPUnauthorized(
            title='401 Unauthorized',
            description='Invalid Authorization Header: Token Missing',
            challenges=None)
    elif len(parts) > 2:
        raise falcon.HTTPUnauthorized(
            title='401 Unauthorized',
            description='Invalid Authorization Header: Contains extra content',
            challenges=None)

    return parts[1]
def _decode_jwt_token(self, req):
    """Decode the JWT from the Authorization header and return its payload.

    Every claim in ``self.verify_claims`` becomes a ``verify_<claim>``
    option and every claim in ``self.required_claims`` a ``require_<claim>``
    option passed to ``jwt.decode``.

    :raises falcon.HTTPUnauthorized: ``jwt.decode`` raised InvalidTokenError
    """
    auth_header = req.get_header('Authorization')
    token = self.parse_auth_token_from_request(auth_header=auth_header)

    options = {'verify_' + claim: True for claim in self.verify_claims}
    for claim in self.required_claims:
        options['require_' + claim] = True

    try:
        return jwt.decode(
            jwt=token,
            key=self.secret_key,
            options=options,
            algorithms=[self.algorithm],
            issuer=self.issuer,
            audience=self.audience,
            leeway=self.leeway)
    except InvalidTokenError as ex:
        raise falcon.HTTPUnauthorized(
            title='401 Unauthorized',
            description=str(ex),
            challenges=None)
def authenticate(self, req, resp, resource):
    """
    Decode the JWT carried by the request and hand its payload to
    ``self.user_loader``. Returns the loaded user, or raises
    `falcon.HTTPUnauthorized` when the loader returns a falsy value.
    """
    user = self.user_loader(self._decode_jwt_token(req))
    if user:
        return user

    raise falcon.HTTPUnauthorized(
        title='401 Unauthorized',
        description='Invalid JWT Credentials',
        challenges=None)
def authenticate(self, req, resp, resource):
    """
    Extract the username and password from the request and hand them to
    ``self.user_loader``. Returns the loaded user, or raises
    `falcon.HTTPUnauthorized` when the loader returns a falsy value.
    """
    credentials = self._extract_credentials(req)
    username, password = credentials

    user = self.user_loader(username, password)
    if user:
        return user

    raise falcon.HTTPUnauthorized(
        title='401 Unauthorized',
        description='Invalid Username/Password',
        challenges=None)
def authenticate(self, req, resp, resource):
    """Return the user from the first backend in ``self.backends`` that yields one.

    A backend that raises falcon.HTTPUnauthorized or returns a falsy value
    is skipped. If no backend produces a user, falcon.HTTPUnauthorized is
    raised.
    """
    for backend in self.backends:
        try:
            user = backend.authenticate(req, resp, resource)
        except falcon.HTTPUnauthorized:
            continue
        if user:
            return user

    raise falcon.HTTPUnauthorized(
        title='401 Unauthorized',
        description='Authorization Failed',
        challenges=None)
def on_post(self, req, resp, app_name):
    """Create or replace the quota settings of application ``app_name``.

    The JSON body must contain exactly the keys in ``required_quota_keys``;
    every key in ``quota_int_keys`` must parse as an integer of at least 1.
    Only admins and owners of the application may change its quota.

    Raises HTTPBadRequest for invalid input or unknown application, plan or
    target, and HTTPUnauthorized when the caller is neither admin nor owner.
    Responds with HTTP 201 on success.
    """
    try:
        data = ujson.loads(req.context['body'])
    except ValueError:
        raise HTTPBadRequest('Invalid json in post body')

    # set(data) replaces dict.viewkeys(), which exists only on Python 2.
    if set(data) != required_quota_keys:
        raise HTTPBadRequest('Missing required keys in post body')

    try:
        for key in quota_int_keys:
            if int(data[key]) < 1:
                raise HTTPBadRequest('All int keys must be over 0')
    except ValueError:
        raise HTTPBadRequest('Some int keys are not integers')

    if data['hard_quota_threshold'] <= data['soft_quota_threshold']:
        raise HTTPBadRequest('Hard threshold must be bigger than soft threshold')

    with db.guarded_session() as session:
        application_id = session.execute(
            'SELECT `id` FROM `application` WHERE `name` = :app_name',
            {'app_name': app_name}).scalar()
        if not application_id:
            raise HTTPBadRequest('No ID found for that application')

        # Only admins and application owners can change quota settings
        if not req.context['is_admin']:
            has_ownership = session.execute(
                check_application_ownership_query,
                {'application_id': application_id,
                 'username': req.context['username']}).scalar()
            if not has_ownership:
                raise HTTPUnauthorized(
                    'You don\'t have permissions to update this app\'s quota.')

        is_active = session.execute(
            'SELECT 1 FROM `plan_active` WHERE `name` = :plan_name',
            data).scalar()
        if not is_active:
            raise HTTPBadRequest('No active ID found for that plan')

        target_id = session.execute(
            'SELECT `id` FROM `target` WHERE `name` = :target_name',
            data).scalar()
        if not target_id:
            raise HTTPBadRequest('No ID found for that target')

        data['application_id'] = application_id
        data['target_id'] = target_id

        session.execute(insert_application_quota_query, data)
        session.commit()
        session.close()

    resp.status = HTTP_201
    resp.body = '[]'
def on_post(self, req, resp):
    """Create a new application with a random key and default modes.

    Admins only. After the application row is inserted, every mode except
    'drop' is enabled for it; a failure of that second step is logged but
    does not fail the request. Responds with HTTP 201 and the new id.

    Raises HTTPBadRequest for invalid JSON, a missing name or an existing
    application, and HTTPUnauthorized for non-admins.
    """
    try:
        body = ujson.loads(req.context['body'])
    except ValueError:
        raise HTTPBadRequest('Invalid json in post body')

    app_name = body.get('name', '').strip()
    if app_name == '':
        raise HTTPBadRequest('Missing app name')

    # Only iris super admins can create apps
    if not req.context['is_admin']:
        raise HTTPUnauthorized('Only admins can create apps')

    new_app_data = {
        'name': app_name,
        'key': hashlib.sha256(os.urandom(32)).hexdigest(),
    }

    enable_modes_query = (
        "INSERT INTO `application_mode` (`application_id`, `mode_id`) "
        "SELECT :app_id, `mode`.`id` FROM `mode` "
        "WHERE `mode`.`name` != 'drop' ")

    with db.guarded_session() as session:
        try:
            inserted = session.execute(
                'INSERT INTO `application` (`name`, `key`) VALUES (:name, :key)',
                new_app_data)
            app_id = inserted.lastrowid
            session.commit()
        except IntegrityError:
            raise HTTPBadRequest('This app already exists')

        # Enable all modes for this app except for "drop" by default
        try:
            session.execute(enable_modes_query, {'app_id': app_id})
            session.commit()
        except IntegrityError:
            logger.error('Failed configuring supported modes for newly created app %s', app_name)
        finally:
            session.close()

    logger.info('Created application "%s" with id %s', app_name, app_id)

    resp.status = HTTP_201
    resp.body = ujson.dumps({'id': app_id})
def process_request(self, req, resp):
    """Authenticate an incoming relay request or reject it with 401.

    Optional HTTP basic auth (``enable_basic_auth`` in the server config) is
    checked first. After that a request is accepted only if it is

    * ``/api/<x>/twilio...`` with an ``X_TWILIO_SIGNATURE`` header matching
      a signature computed from one of ``self.twilio_auth_token``,
    * ``/api/<x>/gmail``, ``gmail-oneclick`` or ``slack``,
    * the single-segment path ``health`` or ``healthcheck``, or
    * the single-segment path equal to the gmail ``verification_code``.

    :raises falcon.HTTPUnauthorized: in every other case
    """
    import binascii

    # http basic auth
    if self.config['server'].get('enable_basic_auth'):
        hdr_auth = req.get_header('AUTHORIZATION')
        if not hdr_auth:
            raise falcon.HTTPUnauthorized('Access denied', 'No auth header', [])

        auth = re.sub('^Basic ', '', hdr_auth)
        try:
            # Split on the first ':' only: the password may contain ':'.
            usr, pwd = decodestring(auth).split(':', 1)
        except (binascii.Error, ValueError):
            # Undecodable or separator-less credentials are an auth failure,
            # not a server error.
            logger.warning('basic auth failure: malformed credentials')
            raise falcon.HTTPUnauthorized('Access denied', 'Basic auth failure', [])

        if not equals(self.basic_auth.get(usr, ''), pwd):
            logger.warning('basic auth failure: %s', usr)
            raise falcon.HTTPUnauthorized('Access denied', 'Basic auth failure', [])

    segments = req.path.strip('/').split('/')
    if segments[0] == 'api':
        if len(segments) >= 3:
            # twilio validation
            if segments[2] == 'twilio':
                sig = req.get_header('X_TWILIO_SIGNATURE')
                if sig is None:
                    logger.warning("no twilio signature found!")
                    raise falcon.HTTPUnauthorized('Access denied', 'No Twilio signature', [])

                # Rebuild the full request URI that the signature covers.
                uri = [req.protocol, '://',
                       req.get_header('HOST'),
                       self.config['server'].get('lb_routing_path', ''),
                       req.path]
                if req.query_string:
                    uri.append('?')
                    uri.append(req.query_string)
                full_uri = ''.join(uri)

                post_body = req.context['body']
                expected_sigs = [compute_signature(t, full_uri, post_body)
                                 for t in self.twilio_auth_token]
                if sig not in expected_sigs:
                    logger.warning('twilio validation failure: %s not in possible sigs: %s',
                                   sig, expected_sigs)
                    raise falcon.HTTPUnauthorized('Access denied', 'Twilio auth failure', [])
                return
            elif segments[2] in ('gmail', 'gmail-oneclick', 'slack'):
                return
    elif len(segments) == 1:
        if segments[0] == 'health' or segments[0] == 'healthcheck':
            return
        elif segments[0] == self.config['gmail'].get('verification_code'):
            return

    raise falcon.HTTPUnauthorized('Access denied', 'Authentication failed', [])