我们从 Python 开源项目中，提取了以下 27 个代码示例，用于说明如何使用 falcon.HTTPForbidden()。
def authorize(action):
    """Build a decorator that enforces a policy action before a handler runs.

    The decorated callable must receive the falcon request as its second
    positional argument; that request's ``context`` is handed to
    ``_do_enforce_rbac`` together with ``action`` before the wrapped
    callable is invoked.

    :param action: The policy action to enforce.
    :returns: A decorator. The decorated callable returns whatever the
        wrapped function returns once enforcement has passed.
    :raises: falcon.HTTPForbidden (propagated from ``_do_enforce_rbac``) if
        policy enforcement failed or if the policy action isn't registered
        under ``deckhand.policies``.
    """

    def wrap(func):

        @functools.wraps(func)
        def guarded(*args, **kwargs):
            # Positional slot 1 always carries the falcon Request object.
            request = args[1]
            _do_enforce_rbac(action, request.context)
            return func(*args, **kwargs)

        return guarded

    return wrap
def check_auth(ba_ctx, req):
    """Check request authentication based on boot action context.

    Raise the proper Falcon exception if authentication fails, otherwise
    return silently.

    :param ba_ctx: Boot Action context from database; ``identity_key`` and
        ``node_name`` are read from it.
    :param req: The falcon request object of the API call
    :raises falcon.HTTPUnauthorized: if no ``X-Bootaction-Key`` header is sent.
    :raises falcon.HTTPForbidden: if the header is not valid hex or does not
        match the stored identity key.
    """
    identity_key = req.get_header('X-Bootaction-Key', default='')

    if identity_key == '':
        raise falcon.HTTPUnauthorized(
            title='Unauthorized',
            description='No X-Bootaction-Key',
            challenges=['Bootaction-Key'])

    try:
        presented_key = bytes.fromhex(identity_key)
    except ValueError:
        # A header that is not valid hex can never match the stored key;
        # reject it explicitly instead of letting the ValueError escape and
        # surface as an internal server error.
        logger.warning(
            "Forbidding boot action access - node: %s, malformed X-Bootaction-Key",
            ba_ctx['node_name'])
        raise falcon.HTTPForbidden(
            title='Unauthorized', description='Invalid X-Bootaction-Key')

    if ba_ctx['identity_key'] != presented_key:
        # NOTE(review): this writes both the stored and the presented key to
        # the log -- confirm that exposing them there is acceptable.
        logger.warning(
            "Forbidding boot action access - node: %s, identity_key: %s, req header: %s",
            ba_ctx['node_name'], str(ba_ctx['identity_key']),
            str(presented_key))
        raise falcon.HTTPForbidden(
            title='Unauthorized', description='Invalid X-Bootaction-Key')
def guarded_session():
    '''
    Generator that yields a new session and closes it when an exception is
    thrown into it.

    An ``IrisValidationException`` is converted into ``HTTPBadRequest``; the
    listed HTTP errors are re-raised untouched; any other exception is logged
    as a server error and re-raised. The session is NOT closed on the success
    path -- the consumer is expected to do that.
    '''
    # Create the session before entering the ``try`` block: if ``Session()``
    # itself fails, the handlers below must not run ``session.close()`` on an
    # unbound name and mask the real error.
    session = Session()
    try:
        yield session
    except IrisValidationException as e:
        session.close()
        raise HTTPBadRequest('Validation error', str(e))
    except (HTTPForbidden, HTTPUnauthorized, HTTPNotFound, HTTPBadRequest):
        session.close()
        raise
    except Exception:
        session.close()
        logger.exception('SERVER ERROR')
        raise
def on_post(self, req, resp):
    """Link the Plex account behind ``X-TOKEN`` with every server it lists.

    The token is used to fetch the account id and the server list from
    plex.tv; each (server, account) pair is then registered in both
    directions and the response status is set to 200.

    :raises falcon.HTTPForbidden: if the ``X-TOKEN`` header is missing or
        plex.tv answers the account lookup with 401 or 422.
    """
    if 'X-TOKEN' not in req.headers:
        raise falcon.HTTPForbidden('Permission Denied',
                                   'Missing header X-token')
    token = req.headers['X-TOKEN']

    account_response = plex_get_request(
        'https://plex.tv/users/account.json', token)
    if account_response.status_code in (401, 422):
        raise falcon.HTTPForbidden('Permission Denied',
                                   'Provided token is not valid')
    account_id = account_response.json()['user']['id']

    servers_response = plex_get_request('https://plex.tv/pms/servers', token)
    container = ElementTree.fromstring(servers_response.content)
    server_ids = {child.attrib['machineIdentifier'] for child in container}

    # Registering is idempotent, so repeating it for known pairs is harmless.
    for server_id in server_ids:
        server.Actions.add_account(server_id, account_id)
        account.Actions.add_server(account_id, server_id)

    resp.status = falcon.HTTP_200
def on_get(self, req, resp):
    """Validate a one-click token and forward the request data to the API.

    All parameters named in ``self.data_keys`` plus ``token`` are required
    query parameters. On a 204 answer from the API the response status is set
    to 204; any other outcome ends in an internal server error.

    :raises falcon.HTTPForbidden: if the token does not validate for the data.
    :raises falcon.HTTPInternalServerError: if the API call fails or returns
        a status other than 204.
    """
    token = req.get_param('token', True)
    data = {key: req.get_param(key, True) for key in self.data_keys}

    if not self.validate_token(token, data):
        raise falcon.HTTPForbidden('Invalid token for these given values', '')

    endpoint = self.config['iris']['hook']['gmail_one_click']

    try:
        result = self.iclient.post(endpoint, data)
    except MaxRetryError:
        logger.exception('Hitting iris-api failed for gmail oneclick')
    else:
        if result.status == 204:
            resp.status = falcon.HTTP_204
            return
        logger.error('Unexpected status code from api %s for gmail oneclick',
                     result.status)

    # Reached both after a failed call and after an unexpected status.
    raise falcon.HTTPInternalServerError('Internal Server Error',
                                         'Invalid response from API')
def test_authenticate_with_invalid_password(self):
    """
    Verify authenticate denies with a proper JSON file, Authorization
    header, and the wrong password.
    """
    with mock.patch('cherrypy.engine.publish') as publish_mock:
        # Fake Etcd result whose value is the content of the user config.
        etcd_result = mock.MagicMock(etcd.EtcdResult)
        with open(self.user_config, 'r') as users_file:
            etcd_result.value = users_file.read()

        store_manager = mock.MagicMock(StoreHandlerManager)
        publish_mock.return_value = [store_manager]
        store_manager.get.return_value = etcd_result

        # Reload with the data from the mock'd Etcd
        authenticator = httpbasicauth.HTTPBasicAuth()

        environ = create_environ(headers={'Authorization': 'basic YTpiCg=='})
        request = falcon.Request(environ)
        response = falcon.Response()
        self.assertRaises(
            falcon.HTTPForbidden,
            authenticator.authenticate,
            request, response)
def authenticate(self, req, resp):
    """
    Accept the request when its client certificate carries a matching
    ``commonName``; reject it otherwise.

    The certificate is read from ``req.env[SSL_CLIENT_VERIFY]``. Any
    ``commonName`` is accepted when ``self.cn`` is falsy; otherwise the
    value must equal ``self.cn``.

    :param req: Request instance that will be passed through.
    :type req: falcon.Request
    :param resp: Response instance that will be passed through.
    :type resp: falcon.Response
    :raises: falcon.HTTPForbidden
    """
    cert = req.env.get(SSL_CLIENT_VERIFY, {})
    subject = cert.get('subject', ()) if cert else ()

    for entry in subject:
        for key, value in entry:
            if key != 'commonName':
                continue
            if not self.cn or value == self.cn:
                return

    # Forbid by default
    raise falcon.HTTPForbidden('Forbidden', 'Forbidden')
def authenticate(self, req, resp):
    """
    Accept the request when its basic-auth credentials belong to a known
    user and pass ``check_authentication``; reject it otherwise.

    :param req: Request instance that will be passed through.
    :type req: falcon.Request
    :param resp: Response instance that will be passed through.
    :type resp: falcon.Response
    :raises: falcon.HTTPForbidden
    """
    user, passwd = self._decode_basic_auth(req)
    has_credentials = user is not None and passwd is not None

    if has_credentials and user in self._data.keys():
        self.logger.debug('User {0} found in datastore.'.format(user))
        if self.check_authentication(user, passwd):
            # Authentication is good
            return

    # Forbid by default
    raise falcon.HTTPForbidden('Forbidden', 'Forbidden')
def test_enforce_disallowed_action(self):
    """Verify that enforcing a disallowed action raises HTTPForbidden.

    Relies on ``self.assertRaises`` returning the raised exception so that
    its ``description`` can be checked against the expected message.
    """
    action = "deckhand:list_cleartext_documents"
    error_re = "Policy doesn't allow %s to be performed." % action
    e = self.assertRaises(
        falcon.HTTPForbidden, self._enforce_policy, action)
    # assertRegexpMatches takes (text, regexp): the exception description is
    # the text under test and ``error_re`` is the pattern, not the reverse.
    self.assertRegexpMatches(e.description, error_re)
def test_enforce_nonexistent_action(self):
    """Verify that enforcing an unregistered action raises HTTPForbidden.

    Relies on ``self.assertRaises`` returning the raised exception so that
    its ``description`` can be checked against the expected message.
    """
    action = "example:undefined"
    error_re = "Policy %s has not been registered" % action
    e = self.assertRaises(
        falcon.HTTPForbidden, self._enforce_policy, action)
    # assertRegexpMatches takes (text, regexp): the exception description is
    # the text under test and ``error_re`` is the pattern, not the reverse.
    self.assertRegexpMatches(e.description, error_re)
def test_falcon_exception_formatting(self):
    """Verify formatting for an exception class that inherits from
    :class:`falcon.HTTPError`.

    ``policy._do_enforce_rbac`` is patched to raise ``falcon.HTTPForbidden``
    and the YAML body of the resulting 403 response is compared with the
    expected status document.
    """
    expected_msg = (
        'deckhand:create_cleartext_documents is disallowed by policy')

    patcher = mock.patch.object(
        policy, '_do_enforce_rbac', spec_set=policy._do_enforce_rbac)
    with patcher as m_enforce_rbac:
        m_enforce_rbac.side_effect = falcon.HTTPForbidden(
            description=expected_msg)
        resp = self.app.simulate_put(
            '/api/v1.0/buckets/test/documents',
            headers={'Content-Type': 'application/x-yaml'},
            body=None)

    message_entry = {'message': expected_msg, 'error': True}
    details = {
        'errorType': 'HTTPForbidden',
        'errorCount': 1,
        'messageList': [message_entry],
    }
    expected = {
        'status': 'Failure',
        'kind': 'status',
        'code': '403 Forbidden',
        'apiVersion': 'v1.0',
        'reason': '403 Forbidden',
        'retry': False,
        'details': details,
        'message': expected_msg,
        'metadata': {},
    }

    body = yaml.safe_load(resp.text)

    self.assertEqual(403, resp.status_code)
    self.assertEqual(expected, body)
def _do_enforce_rbac(action, context, do_raise=True):
    """Authorize ``action`` for ``context`` through the module enforcer.

    :param action: The policy action to authorize.
    :param context: Request context; supplies the policy values, the
        ``project_id``/``user_id`` target and the credentials dict.
    :param do_raise: Forwarded to ``_ENFORCER.authorize``.
    :returns: Whatever ``_ENFORCER.authorize`` returns.
    :raises falcon.HTTPForbidden: if the action is not registered or any
        other exception is raised while authorizing.
    """
    init()
    credentials = context.to_policy_values()
    target = dict(project_id=context.project_id, user_id=context.user_id)

    try:
        # `oslo.policy` offers both enforce and authorize. authorize is the
        # stricter one: it raises when the policy action is missing from the
        # registered rules, so anything not found in ``deckhand.policies``
        # errors out with a 'Policy not registered' message.
        return _ENFORCER.authorize(
            action,
            target,
            context.to_dict(),
            do_raise=do_raise,
            exc=errors.PolicyNotAuthorized,
            action=action)
    except policy.PolicyNotRegistered as not_registered:
        LOG.exception('Policy not registered.')
        raise falcon.HTTPForbidden(
            description=six.text_type(not_registered))
    except Exception as failure:
        log_values = {'action': action, 'credentials': credentials}
        LOG.debug(
            'Policy check for %(action)s failed with credentials '
            '%(credentials)s', log_values)
        raise falcon.HTTPForbidden(description=six.text_type(failure))
def __init__(self, title=None, description=None, **kwargs):
    """Initialise the error, falling back to the ``title`` and
    ``description`` attributes found on ``self`` when the corresponding
    argument is falsy.

    :param title: Error title; ``self.title`` is used when falsy.
    :param description: Error description; ``self.description`` is used
        when falsy.
    :param kwargs: Passed through unchanged to the parent initialiser.
    """
    effective_title = title or self.title
    effective_description = description or self.description
    super(HTTPForbidden, self).__init__(
        effective_title, effective_description, **kwargs)
def on_post(self, req, resp):
    """Reject every POST with ``falcon.HTTPForbidden``.

    ``falcon.HTTP_403`` is passed as the first positional argument and
    ``'Setec Astronomy'`` as the second.
    """
    title, description = falcon.HTTP_403, 'Setec Astronomy'
    raise falcon.HTTPForbidden(title, description)
def on_post(self, req, resp):
    """Reject every POST with ``falcon.HTTPForbidden``, attaching an
    ``href`` that points at the RBAC documentation URL.
    """
    error_kwargs = {'href': 'http://example.com/api/rbac'}
    raise falcon.HTTPForbidden(
        'Request denied',
        'You do not have write permissions for this queue.',
        **error_kwargs)
def process_request(self, req, resp):
    """Reject requests whose host is not listed in ``self.hosts``.

    :raises falcon.HTTPForbidden: if ``req.host`` is not in ``self.hosts``.
    """
    # req.host arrives with the port already stripped from what the browser
    # sent, even for ports other than 80 (probably a Falcon bug). __init__
    # compensates by removing ports from self.hosts.
    if req.host in self.hosts:
        return

    print("Attempted request with Host header '%s' denied" % req.host)
    raise falcon.HTTPForbidden(
        "Bad Host header", "Cannot connect via the provided hostname")


# the gunicorn application
def on_get(self, req, resp, app_name):
    """Return the key of application ``app_name`` as a JSON body.

    Non-admin users must be listed as an owner of the application.

    :raises HTTPUnauthorized: if the request context has no username.
    :raises HTTPForbidden: if a non-admin user does not own the application.
    :raises HTTPBadRequest: if no key is found for the application.
    """
    username = req.context['username']
    if not username:
        raise HTTPUnauthorized(
            "You must be a logged in user to view this app's key")

    ownership_sql = (
        'SELECT 1 FROM `application_owner` '
        'JOIN `target` on `target`.`id` = `application_owner`.`user_id` '
        'JOIN `application` on `application`.`id` = `application_owner`.`application_id` '
        'WHERE `target`.`name` = :username AND `application`.`name` = :app_name'
    )
    key_sql = 'SELECT `key` FROM `application` WHERE `name` = :app_name LIMIT 1'

    with db.guarded_session() as session:
        if not req.context['is_admin']:
            owner_params = {'app_name': app_name,
                            'username': req.context['username']}
            if not session.execute(ownership_sql, owner_params).scalar():
                raise HTTPForbidden(
                    "You don't have permissions to view this app's key.")

        key = session.execute(key_sql, {'app_name': app_name}).scalar()
        if not key:
            raise HTTPBadRequest('Key for this application not found')

        # The session is closed explicitly here on the success path.
        session.close()

    resp.body = ujson.dumps({'key': key})
def validate_account_server_access(req, resp, resource, params):
    """Ensure the server named in ``X-SERVER-ID`` may access the account.

    The account id is read from ``params['account_id']`` and the servers
    allowed for it are obtained from ``Actions.get_servers``.

    :param req: Request whose headers carry ``X-SERVER-ID``.
    :param resp: Unused.
    :param resource: Unused.
    :param params: Mapping holding ``account_id``.
    :raises falcon.HTTPForbidden: if the header is missing or the server is
        not among the account's servers.
    """
    if 'X-SERVER-ID' not in req.headers:
        raise falcon.HTTPForbidden('Permission Denied',
                                   'Missing header X-server-id')

    server_id = req.headers['X-SERVER-ID']
    account_id = params['account_id']
    servers = Actions.get_servers(account_id)

    # Plain membership test: the position of the server was never used, and
    # ``format`` keeps the message working when ``account_id`` is not a str.
    if server_id not in servers:
        raise falcon.HTTPForbidden(
            'Permission Denied',
            'Server {0} has no access to account id {1}'.format(
                server_id, account_id))
def test_authenticator_process_request(self):
    """
    Verify that process_request on the authenticator under test raises
    falcon.HTTPForbidden for a default request.
    """
    request = falcon.Request(create_environ())
    response = falcon.Response()
    hook = self.authenticator.process_request

    self.assertRaises(falcon.HTTPForbidden, hook, request, response)
def test_authenticate_with_invalid_user(self):
    """
    Verify authenticate denies with a proper JSON file, Authorization
    header, and no matching user.
    """
    self.http_basic_auth = httpbasicauth.HTTPBasicAuth(self.user_config)

    environ = create_environ(headers={'Authorization': 'basic Yjpi'})
    request = falcon.Request(environ)
    response = falcon.Response()

    self.assertRaises(
        falcon.HTTPForbidden,
        self.http_basic_auth.authenticate,
        request, response)
def test_authenticate_with_invalid_password(self):
    """
    Verify authenticate denies with a proper JSON file, Authorization
    header, and the wrong password.
    """
    self.http_basic_auth = httpbasicauth.HTTPBasicAuth(self.user_config)

    environ = create_environ(headers={'Authorization': 'basic YTpiCg=='})
    request = falcon.Request(environ)
    response = falcon.Response()

    self.assertRaises(
        falcon.HTTPForbidden,
        self.http_basic_auth.authenticate,
        request, response)
def test_authenticate_with_invalid_user(self):
    """
    Verify authenticate denies with a proper JSON in Etcd, Authorization
    header, and no matching user.
    """
    with mock.patch('cherrypy.engine.publish') as publish_mock:
        # Mock the return of the Etcd get result
        etcd_result = mock.MagicMock(etcd.EtcdResult)
        with open(self.user_config, 'r') as users_file:
            etcd_result.value = users_file.read()

        store_manager = mock.MagicMock(StoreHandlerManager)
        publish_mock.return_value = [store_manager]
        store_manager.get.return_value = etcd_result

        # Reload with the data from the mock'd Etcd
        authenticator = httpbasicauth.HTTPBasicAuth()

        # Test the call
        environ = create_environ(headers={'Authorization': 'basic Yjpi'})
        request = falcon.Request(environ)
        response = falcon.Response()
        self.assertRaises(
            falcon.HTTPForbidden,
            authenticator.authenticate,
            request, response)
def authenticate(self, req, resp):
    """
    Accept the request when its bearer token is accepted by Kubernetes.

    The token is sent to ``self._kubernetes.base_uri + self.resource_check``;
    a 200 answer authenticates the request, anything else is rejected.

    :param req: Request instance that will be passed through.
    :type req: falcon.Request
    :param resp: Response instance that will be passed through.
    :type resp: falcon.Response
    :raises: falcon.HTTPForbidden
    """
    token = self._decode_bearer_auth(req)
    if token is not None:
        # The token is a live credential, so its value is never logged.
        self.logger.debug('Bearer token found in request.')
        try:
            # NOTE: We are assuming that if the user has access to
            # the resource they should be granted access to commissaire
            endpoint = self._kubernetes.base_uri + self.resource_check
            self.logger.debug('Checking against {0}.'.format(endpoint))
            # Bearer tokens travel in the standard ``Authorization`` header.
            # A separate local name keeps the ``resp`` parameter intact.
            k8s_resp = requests.get(
                endpoint, headers={'Authorization': 'Bearer ' + token})
            # Log only the status: decoding the body as JSON could raise and
            # abort authentication merely for the sake of a debug message.
            self.logger.debug('Kubernetes response status: {0}'.format(
                k8s_resp.status_code))
            # If we get a 200 then the user is valid. Anything else is
            # a failure
            if k8s_resp.status_code == 200:
                self.logger.info(
                    'Accepted Kubernetes token for {0}'.format(
                        req.remote_addr))
                return
            self.logger.debug('Rejecting Kubernetes token for {0}'.format(
                req.remote_addr))
        except Exception as error:
            self.logger.warning(
                'Encountered {0} while attempting to '
                'authenticate. {1}'.format(type(error), error))
            # Bare raise keeps the original traceback.
            raise

    # Forbid by default
    raise falcon.HTTPForbidden('Forbidden', 'Forbidden')
def process_request(self, req, resp):
    """
    Falcon hook that runs authentication before the request is processed
    by delegating to ``self.authenticate``.

    :param req: Request instance that will be passed through.
    :type req: falcon.Request
    :param resp: Response instance that will be passed through.
    :type resp: falcon.Response
    :raises: falcon.HTTPForbidden
    """
    authenticate = self.authenticate
    authenticate(req, resp)
def authenticate(self, req, resp):
    """
    Default authentication: always deny. Implementations should override
    this method with their specific authentication call.

    :param req: Request instance that will be passed through.
    :type req: falcon.Request
    :param resp: Response instance that will be passed through.
    :type resp: falcon.Response
    :raises: falcon.HTTPForbidden
    """
    denial = falcon.HTTPForbidden('Forbidden', 'Forbidden')
    raise denial
def do_log_history(req, resp):
    """Enforce per-path access rules, then record the request in OperatorLog.

    Every entry of ``advance_path`` is read as a mapping whose first key is
    a regex matched against ``req.path`` and whose first value is the
    collection of allowed ``LOGIN-USER`` header values. Requests without a
    ``REQUEST-ID`` header are not logged.

    :param req: The falcon request.
    :param resp: Unused.
    :raises falcon.HTTPForbidden: if the path matches a rule and the
        ``LOGIN-USER`` header is not among the allowed values.
    """
    for item in advance_path:
        # First (key, value) pair, taken in a way that also works where dict
        # views are not indexable.
        path, groups = next(iter(item.items()))
        if not re.search(path, req.path):
            continue
        user_group = req.get_header('LOGIN-USER')
        if user_group not in groups:
            raise falcon.HTTPForbidden(
                title='You not allow access it',
                description='Please connect the manager')

    request_id = req.get_header(name='REQUEST-ID')
    if request_id is None:
        return
    # Only the last 20 characters of the request id are stored.
    request_id = request_id[-20:]

    # A real list, so len() works even where filter() would be lazy.
    paths = [segment for segment in req.path.split('/') if segment != '']

    # Default both names so that short paths (fewer than two segments) no
    # longer fail with an unbound local.
    # NOTE(review): assumes the log columns accept None -- confirm.
    ctrl_name = None
    func_name = None
    if len(paths) >= 2:
        ctrl_name, func_name = paths[0], paths[1]
    elif len(paths) == 1:
        func_name = paths[0]

    create_timestamp = datetime.now()
    OperatorLog.insert(
        request=request_id,
        controller=ctrl_name,
        exec_path=req.get_header(name='EXEC-PATH'),
        func=func_name,
        login_gid=req.get_header(name='LOGIN-GID'),
        login_uid=req.get_header(name='LOGIN-UID'),
        login_user=req.get_header(name='LOGIN-USER'),
        post_data=json.dumps(req._params),
        server_ip=req.get_header(name='SERVER-IP'),
        create_timestamp=create_timestamp).execute()  #????