Python falcon 模块,HTTPForbidden() 实例源码

我们从Python开源项目中,提取了以下27个代码示例,用于说明如何使用falcon.HTTPForbidden()

项目:deckhand    作者:att-comdev    | 项目源码 | 文件源码
def authorize(action):
    """Verifies whether a policy action can be performed given the credentials
    found in the falcon request context.

    :param action: The policy action to enforce.
    :returns: ``True`` if policy enforcement succeeded, else ``False``.
    :raises: falcon.HTTPForbidden if policy enforcement failed or if the policy
        action isn't registered under ``deckhand.policies``.
    """
    def decorator(func):
        @functools.wraps(func)
        def handler(*args, **kwargs):
            # args[1] is always the falcon Request object.
            context = args[1].context
            _do_enforce_rbac(action, context)
            return func(*args, **kwargs)
        return handler

    return decorator
项目:drydock    作者:att-comdev    | 项目源码 | 文件源码
def check_auth(ba_ctx, req):
        """Check request authentication based on boot action context.

        Raise proper Falcon exception if authentication fails, otherwise
        silently return

        :param ba_ctx: Boot Action context from database
        :param req: The falcon request object of the API call
        """
        identity_key = req.get_header('X-Bootaction-Key', default='')

        if identity_key == '':
            raise falcon.HTTPUnauthorized(
                title='Unauthorized',
                description='No X-Bootaction-Key',
                challenges=['Bootaction-Key'])

        if ba_ctx['identity_key'] != bytes.fromhex(identity_key):
            logger.warn(
                "Forbidding boot action access - node: %s, identity_key: %s, req header: %s"
                % (ba_ctx['node_name'], str(ba_ctx['identity_key']),
                   str(bytes.fromhex(identity_key))))
            raise falcon.HTTPForbidden(
                title='Unauthorized', description='Invalid X-Bootaction-Key')
项目:iris    作者:linkedin    | 项目源码 | 文件源码
def guarded_session():
    '''
    Context manager that will automatically close session on exceptions
    '''
    try:
        session = Session()
        yield session
    except IrisValidationException as e:
        session.close()
        raise HTTPBadRequest('Validation error', str(e))
    except (HTTPForbidden, HTTPUnauthorized, HTTPNotFound, HTTPBadRequest):
        session.close()
        raise
    except Exception:
        session.close()
        logger.exception('SERVER ERROR')
        raise
项目:plex-watched-sync    作者:fiLLLip    | 项目源码 | 文件源码
def on_post(self, req, resp):
        """Check with plex web api the user id and server to have access"""
        if 'X-TOKEN' not in req.headers:
            raise falcon.HTTPForbidden('Permission Denied', 'Missing header X-token')

        token = req.headers['X-TOKEN']
        result = plex_get_request('https://plex.tv/users/account.json', token)

        if result.status_code == 401 or result.status_code == 422:
            raise falcon.HTTPForbidden('Permission Denied', 'Provided token is not valid')

        account_id = result.json()['user']['id']
        result = plex_get_request('https://plex.tv/pms/servers', token)
        media_container_element = ElementTree.fromstring(result.content)
        servers = set()
        for server_element in media_container_element:
            servers.add(server_element.attrib['machineIdentifier'])

        # The following action is idempotent, so we can run it without worrying
        for server_id in servers:
            server.Actions.add_account(server_id, account_id)
            account.Actions.add_server(account_id, server_id)

        resp.status = falcon.HTTP_200
项目:iris-relay    作者:linkedin    | 项目源码 | 文件源码
def on_get(self, req, resp):
        token = req.get_param('token', True)
        data = {}
        for key in self.data_keys:
            data[key] = req.get_param(key, True)

        if not self.validate_token(token, data):
            raise falcon.HTTPForbidden('Invalid token for these given values', '')

        endpoint = self.config['iris']['hook']['gmail_one_click']

        try:
            result = self.iclient.post(endpoint, data)
        except MaxRetryError:
            logger.exception('Hitting iris-api failed for gmail oneclick')
        else:
            if result.status == 204:
                resp.status = falcon.HTTP_204
                return
            else:
                logger.error('Unexpected status code from api %s for gmail oneclick', result.status)

        raise falcon.HTTPInternalServerError('Internal Server Error', 'Invalid response from API')
项目:commissaire-mvp    作者:projectatomic    | 项目源码 | 文件源码
def test_authenticate_with_invalid_password(self):
        """
        Verify authenticate denies with a proper JSON file, Authorization header, and the wrong password.
        """
        with mock.patch('cherrypy.engine.publish') as _publish:
            return_value = mock.MagicMock(etcd.EtcdResult)
            with open(self.user_config, 'r') as users_file:
                return_value.value = users_file.read()

            manager = mock.MagicMock(StoreHandlerManager)
            _publish.return_value = [manager]

            manager.get.return_value = return_value

            # Reload with the data from the mock'd Etcd
            http_basic_auth = httpbasicauth.HTTPBasicAuth()

            req = falcon.Request(
                create_environ(headers={'Authorization': 'basic YTpiCg=='}))
            resp = falcon.Response()
            self.assertRaises(
                falcon.HTTPForbidden,
                http_basic_auth.authenticate,
                req, resp)
项目:commissaire-mvp    作者:projectatomic    | 项目源码 | 文件源码
def authenticate(self, req, resp):
        """
        Implements the authentication logic.

        :param req: Request instance that will be passed through.
        :type req: falcon.Request
        :param resp: Response instance that will be passed through.
        :type resp: falcon.Response
        :raises: falcon.HTTPForbidden
        """
        cert = req.env.get(SSL_CLIENT_VERIFY, {})
        if cert:
            for obj in cert.get('subject', ()):
                for key, value in obj:
                    if key == 'commonName' and \
                            (not self.cn or value == self.cn):
                        return

        # Forbid by default
        raise falcon.HTTPForbidden('Forbidden', 'Forbidden')
项目:commissaire-mvp    作者:projectatomic    | 项目源码 | 文件源码
def authenticate(self, req, resp):
        """
        Implements the authentication logic.

        :param req: Request instance that will be passed through.
        :type req: falcon.Request
        :param resp: Response instance that will be passed through.
        :type resp: falcon.Response
        :raises: falcon.HTTPForbidden
        """
        user, passwd = self._decode_basic_auth(req)
        if user is not None and passwd is not None:
            if user in self._data.keys():
                self.logger.debug('User {0} found in datastore.'.format(user))
                if self.check_authentication(user, passwd):
                    return  # Authentication is good

        # Forbid by default
        raise falcon.HTTPForbidden('Forbidden', 'Forbidden')
项目:deckhand    作者:att-comdev    | 项目源码 | 文件源码
def test_enforce_disallowed_action(self):
        action = "deckhand:list_cleartext_documents"
        error_re = "Policy doesn't allow %s to be performed." % action
        e = self.assertRaises(
            falcon.HTTPForbidden, self._enforce_policy, action)
        self.assertRegexpMatches(error_re, e.description)
项目:deckhand    作者:att-comdev    | 项目源码 | 文件源码
def test_enforce_nonexistent_action(self):
        action = "example:undefined"
        error_re = "Policy %s has not been registered" % action
        e = self.assertRaises(
            falcon.HTTPForbidden, self._enforce_policy, action)
        self.assertRegexpMatches(error_re, e.description)
项目:deckhand    作者:att-comdev    | 项目源码 | 文件源码
def test_falcon_exception_formatting(self):
        """Verify formatting for an exception class that inherits from
        :class:`falcon.HTTPError`.
        """
        expected_msg = (
            'deckhand:create_cleartext_documents is disallowed by policy')

        with mock.patch.object(
                policy, '_do_enforce_rbac',
                spec_set=policy._do_enforce_rbac) as m_enforce_rbac:
            m_enforce_rbac.side_effect = falcon.HTTPForbidden(
                description=expected_msg)
            resp = self.app.simulate_put(
                '/api/v1.0/buckets/test/documents',
                headers={'Content-Type': 'application/x-yaml'}, body=None)

        expected = {
            'status': 'Failure',
            'kind': 'status',
            'code': '403 Forbidden',
            'apiVersion': 'v1.0',
            'reason': '403 Forbidden',
            'retry': False,
            'details': {
                'errorType': 'HTTPForbidden',
                'errorCount': 1,
                'messageList': [
                    {
                        'message': expected_msg,
                        'error': True
                    }
                ]
            },
            'message': expected_msg,
            'metadata': {}
        }
        body = yaml.safe_load(resp.text)

        self.assertEqual(403, resp.status_code)
        self.assertEqual(expected, body)
项目:deckhand    作者:att-comdev    | 项目源码 | 文件源码
def _do_enforce_rbac(action, context, do_raise=True):
    init()

    credentials = context.to_policy_values()
    target = {'project_id': context.project_id,
              'user_id': context.user_id}
    exc = errors.PolicyNotAuthorized

    try:
        # `oslo.policy` supports both enforce and authorize. authorize is
        # stricter because it'll raise an exception if the policy action is
        # not found in the list of registered rules. This means that attempting
        # to enforce anything not found in ``deckhand.policies`` will error out
        # with a 'Policy not registered' message.
        return _ENFORCER.authorize(
            action, target, context.to_dict(), do_raise=do_raise,
            exc=exc, action=action)
    except policy.PolicyNotRegistered as e:
        LOG.exception('Policy not registered.')
        raise falcon.HTTPForbidden(description=six.text_type(e))
    except Exception as e:
        LOG.debug(
            'Policy check for %(action)s failed with credentials '
            '%(credentials)s',
            {'action': action, 'credentials': credentials})
        raise falcon.HTTPForbidden(description=six.text_type(e))
项目:clark    作者:discovery-corps    | 项目源码 | 文件源码
def __init__(self, title=None, description=None, **kwargs):
        super(HTTPForbidden, self).__init__(title or self.title,
                                            description or self.description,
                                            **kwargs)
项目:deb-python-falcon    作者:openstack    | 项目源码 | 文件源码
def on_post(self, req, resp):
        raise falcon.HTTPForbidden(falcon.HTTP_403, 'Setec Astronomy')
项目:deb-python-falcon    作者:openstack    | 项目源码 | 文件源码
def on_post(self, req, resp):
        raise falcon.HTTPForbidden(
            'Request denied',
            'You do not have write permissions for this queue.',
            href='http://example.com/api/rbac')
项目:kapsel    作者:conda    | 项目源码 | 文件源码
def process_request(self, req, resp):
        # req.host has the port stripped from what the browser
        # sent us, even when it isn't 80, which is probably a bug
        # in Falcon. We deal with that in __init__ by removing
        # ports from self.hosts.
        if req.host not in self.hosts:
            print("Attempted request with Host header '%s' denied" % req.host)
            raise falcon.HTTPForbidden("Bad Host header", "Cannot connect via the provided hostname")


# the gunicorn application
项目:iris    作者:linkedin    | 项目源码 | 文件源码
def on_get(self, req, resp, app_name):
        if not req.context['username']:
            raise HTTPUnauthorized('You must be a logged in user to view this app\'s key')

        with db.guarded_session() as session:
            if not req.context['is_admin']:
                has_permission = session.execute(
                    '''SELECT 1
                       FROM `application_owner`
                       JOIN `target` on `target`.`id` = `application_owner`.`user_id`
                       JOIN `application` on `application`.`id` = `application_owner`.`application_id`
                       WHERE `target`.`name` = :username
                       AND `application`.`name` = :app_name''',
                    {'app_name': app_name, 'username': req.context['username']}).scalar()
                if not has_permission:
                    raise HTTPForbidden('You don\'t have permissions to view this app\'s key.')

            key = session.execute(
                'SELECT `key` FROM `application` WHERE `name` = :app_name LIMIT 1',
                {'app_name': app_name}).scalar()

            if not key:
                raise HTTPBadRequest('Key for this application not found')

            session.close()

        resp.body = ujson.dumps({'key': key})
项目:plex-watched-sync    作者:fiLLLip    | 项目源码 | 文件源码
def validate_account_server_access(req, resp, resource, params):
        if 'X-SERVER-ID' not in req.headers:
            raise falcon.HTTPForbidden('Permission Denied', 'Missing header X-server-id')
        server_id = req.headers['X-SERVER-ID']
        account_id = params['account_id']
        servers = Actions.get_servers(account_id)
        try:
            b = servers.index(server_id)
        except ValueError:
            raise falcon.HTTPForbidden('Permission Denied',
                                       'Server ' + server_id + ' has no access to account id ' + account_id)
项目:anaconda-project    作者:Anaconda-Platform    | 项目源码 | 文件源码
def process_request(self, req, resp):
        # req.host has the port stripped from what the browser
        # sent us, even when it isn't 80, which is probably a bug
        # in Falcon. We deal with that in __init__ by removing
        # ports from self.hosts.
        if req.host not in self.hosts:
            print("Attempted request with Host header '%s' denied" % req.host)
            raise falcon.HTTPForbidden("Bad Host header", "Cannot connect via the provided hostname")


# the gunicorn application
项目:commissaire-mvp    作者:projectatomic    | 项目源码 | 文件源码
def test_authenticator_process_request(self):
        """
        Verify Authenticator's process_request calls authenticate.
        """
        self.assertRaises(
            falcon.HTTPForbidden,
            self.authenticator.process_request,
            falcon.Request(create_environ()),
            falcon.Response())
项目:commissaire-mvp    作者:projectatomic    | 项目源码 | 文件源码
def test_authenticate_with_invalid_user(self):
        """
        Verify authenticate denies with a proper JSON file, Authorization header, and no matching user.
        """
        self.http_basic_auth = httpbasicauth.HTTPBasicAuth(self.user_config)
        req = falcon.Request(
            create_environ(headers={'Authorization': 'basic Yjpi'}))
        resp = falcon.Response()
        self.assertRaises(
            falcon.HTTPForbidden,
            self.http_basic_auth.authenticate,
            req, resp)
项目:commissaire-mvp    作者:projectatomic    | 项目源码 | 文件源码
def test_authenticate_with_invalid_password(self):
        """
        Verify authenticate denies with a proper JSON file, Authorization header, and the wrong password.
        """
        self.http_basic_auth= httpbasicauth.HTTPBasicAuth(self.user_config)
        req = falcon.Request(
            create_environ(headers={'Authorization': 'basic YTpiCg=='}))
        resp = falcon.Response()
        self.assertRaises(
            falcon.HTTPForbidden,
            self.http_basic_auth.authenticate,
            req, resp)
项目:commissaire-mvp    作者:projectatomic    | 项目源码 | 文件源码
def test_authenticate_with_invalid_user(self):
        """
        Verify authenticate denies with a proper JSON in Etcd, Authorization header, and no matching user.
        """
        with mock.patch('cherrypy.engine.publish') as _publish:
            # Mock the return of the Etcd get result
            return_value = mock.MagicMock(etcd.EtcdResult)
            with open(self.user_config, 'r') as users_file:
                return_value.value = users_file.read()

            manager = mock.MagicMock(StoreHandlerManager)
            _publish.return_value = [manager]

            manager.get.return_value = return_value

            # Reload with the data from the mock'd Etcd
            http_basic_auth = httpbasicauth.HTTPBasicAuth()

            # Test the call
            req = falcon.Request(
                create_environ(headers={'Authorization': 'basic Yjpi'}))
            resp = falcon.Response()
            self.assertRaises(
                falcon.HTTPForbidden,
                http_basic_auth.authenticate,
                req, resp)
项目:commissaire-mvp    作者:projectatomic    | 项目源码 | 文件源码
def authenticate(self, req, resp):
        """
        Implements the authentication logic.

        :param req: Request instance that will be passed through.
        :type req: falcon.Request
        :param resp: Response instance that will be passed through.
        :type resp: falcon.Response
        :raises: falcon.HTTPForbidden
        """
        token = self._decode_bearer_auth(req)
        if token is not None:
            self.logger.debug('Token found: {0}'.format(token))
            try:
                # NOTE: We are assuming that if the user has access to
                # the resource they should be granted access to commissaire
                endpoint = self._kubernetes.base_uri + self.resource_check
                self.logger.debug('Checking against {0}.'.format(endpoint))
                resp = requests.get(
                    endpoint, headers={'Authentication': 'Bearer ' + token})
                self.logger.debug('Kubernetes response: {0}'.format(
                    resp.json()))
                # If we get a 200 then the user is valid. Anything else is
                # a failure
                if resp.status_code == 200:
                    self.logger.info(
                        'Accepted Kubernetes token for {0}'.format(
                            req.remote_addr))
                    return
                self.logger.debug('Rejecting Kubernetes token for {0}'.format(
                    req.remote_addr))
            except Exception as error:
                self.logger.warn(
                    'Encountered {0} while attempting to '
                    'authenticate. {1}'.format(type(error), error))
                raise error

        # Forbid by default
        raise falcon.HTTPForbidden('Forbidden', 'Forbidden')
项目:commissaire-mvp    作者:projectatomic    | 项目源码 | 文件源码
def process_request(self, req, resp):
        """
        Falcon hook to inject authentication before the requests is
        processed.

        :param req: Request instance that will be passed through.
        :type req: falcon.Request
        :param resp: Response instance that will be passed through.
        :type resp: falcon.Response
        :raises: falcon.HTTPForbidden
        """
        self.authenticate(req, resp)
项目:commissaire-mvp    作者:projectatomic    | 项目源码 | 文件源码
def authenticate(self, req, resp):
        """
        Method should be overriden with specific a specific authentication
        call for an implementation.

        :param req: Request instance that will be passed through.
        :type req: falcon.Request
        :param resp: Response instance that will be passed through.
        :type resp: falcon.Response
        :raises: falcon.HTTPForbidden
        """
        raise falcon.HTTPForbidden('Forbidden', 'Forbidden')
项目:ops    作者:xiaomatech    | 项目源码 | 文件源码
def do_log_history(req, resp):
    for item in advance_path:
        path = item.keys()[0]
        if not re.search(path, req.path):
            continue
        else:
            user_group = req.get_header('LOGIN-USER')
            groups = item.values()[0]
            if user_group not in groups:
                raise falcon.HTTPForbidden(
                    title='You not allow access it',
                    description='Please connect the manager')

    request_id = req.get_header(name='REQUEST-ID')
    if request_id is None:
        return
    request_id = request_id[-20:]
    paths = filter(lambda x: x != '', req.path.split('/'))
    if len(paths) >= 2:
        ctrl_name = paths[0]
        func_name = paths[1]
    elif len(paths) == 1:
        func_name = paths[0]
    create_timestamp = datetime.now()
    OperatorLog.insert(
        request=request_id,
        controller=ctrl_name,
        exec_path=req.get_header(name='EXEC-PATH'),
        func=func_name,
        login_gid=req.get_header(name='LOGIN-GID'),
        login_uid=req.get_header(name='LOGIN-UID'),
        login_user=req.get_header(name='LOGIN-USER'),
        post_data=json.dumps(req._params),
        server_ip=req.get_header(name='SERVER-IP'),
        create_timestamp=create_timestamp).execute()


#????