我们从 Python 开源项目中，提取了以下 48 个代码示例，用于说明如何使用 werkzeug.exceptions.Forbidden()。
def test_permission_admin(app, empty_db_session):
    """Exercise the ``admin_role`` permission guard inside a request context.

    A view guarded by ``permissions.admin_role.require(403)`` is expected to
    raise ``Forbidden`` when called before ``login``. Afterwards an account
    carrying an ``admin`` role is stored through the session and logged in.
    """
    with app.test_request_context('/'):

        @permissions.admin_role.require(403)
        def test():
            return 200

        # Called before login(): the guard is expected to refuse the call.
        with pytest.raises(Forbidden):
            assert test() == 200

        # Build an account that holds the 'admin' role.
        admin_role = Role('admin', '')
        admin_account = UserAccount()
        admin_account.id = 666
        admin_account.roles.append(admin_role)

        # Persist the role first, then the account, and commit both.
        empty_db_session.add(admin_role)
        empty_db_session.add(admin_account)
        empty_db_session.commit()

        login(admin_account)
def toolbar(**kwargs):
    """Return the report/ticket counters shown in the toolbar.

    Reads the requesting user from ``kwargs['user']``.

    Raises:
        Forbidden: when no ``AbusePermission`` row exists for the user.
    """
    current_user = kwargs['user']

    permission_count = AbusePermission.objects.filter(user=current_user.id).count()
    if not permission_count:
        raise Forbidden('You are not allowed to see any category')

    # An empty Q() followed by the OR-combination of the user-specific filters.
    filters = [
        Q(),
        reduce(operator.or_, _get_user_specific_where(current_user)),
    ]

    # Aggregate all filters with AND.
    combined = reduce(operator.and_, filters)
    return _get_toolbar_count(combined, current_user)
def test_sumbit_not_matching(self, g_f_s_mock, g_i_mock):
    """Submitting an output and a source that belong to different inputs is refused.

    Two inputs are generated (one per token); the stored output row points at
    ``inputid`` while the stored source row points at ``inputid2``. The submit
    call must raise ``Forbidden`` with an explanatory message.
    """
    Utils.start_contest()
    self._insert_data(token="token", task="poldo")
    self._insert_data(token="token2", task="poldo")

    backup = Logger.LOG_LEVEL
    # NOTE(review): presumably a level high enough to mute the logger -- confirm.
    Logger.LOG_LEVEL = 9001
    try:
        self.handler.generate_input(token='token', task='poldo', _ip='1.1.1.1')
        g_i_mock.return_value = ("inputid2", "/path")
        self.handler.generate_input(token='token2', task='poldo', _ip='1.1.1.1')

        Database.c.execute("INSERT INTO outputs (id, input, path, size, result) "
                           "VALUES ('outputid', 'inputid', '/output', 42, '{}')")
        Database.c.execute("INSERT INTO sources (id, input, path, size) "
                           "VALUES ('sourceid', 'inputid2', '/source', 42)")

        with self.assertRaises(Forbidden) as ex:
            self.handler.submit(output_id='outputid', source_id='sourceid',
                                _ip='1.1.1.1')

        self.assertIn("The provided pair of source-output is invalid",
                      ex.exception.response.data.decode())
    finally:
        # Restore the global log level even when an assertion above fails,
        # so the overridden level does not leak into other tests.
        Logger.LOG_LEVEL = backup
def validate_id(param, name, getter, required=True):
    """Build a decorator that swaps a request parameter for the object it names.

    When ``param`` is present in the handler's keyword arguments, ``getter`` is
    called with its value. A ``None`` result is reported through
    ``BaseHandler.raise_exc`` as a ``Forbidden`` error; otherwise ``param`` is
    removed from the keyword arguments. In every case the handler receives a
    ``name`` keyword argument holding the getter's result (``None`` when
    ``param`` was absent).
    """

    def closure(handler):
        def handle(*args, **kwargs):
            if param not in kwargs:
                # Parameter absent: the handler still receives `name`, as None.
                kwargs[name] = None
                return handler(*args, **kwargs)

            resolved = getter(kwargs[param])
            if resolved is None:
                BaseHandler.raise_exc(Forbidden, "FORBIDDEN", "No such " + name)
            del kwargs[param]
            kwargs[name] = resolved
            return handler(*args, **kwargs)

        HandlerParams.initialize_handler_params(handle, handler)
        HandlerParams.add_handler_param(handle, param, str, required=required)
        # The removal is skipped when the model name equals the parameter name.
        if name != param:
            HandlerParams.remove_handler_param(handle, name)
        return handle

    return closure
def _get_user_from_sso(jwt_token, token):
    """Resolve the user described by an SSO JWT, creating the user if missing.

    The JWT is decoded with ``Config.jwt_secret`` (HS256). Its ``username``
    claim must equal ``token``; a mismatch is reported as ``Forbidden``. When
    the database has no such user, the user and one user-task row per known
    task are added and committed together. A token that fails to decode is
    reported as ``Forbidden`` with a pointer to the SSO login URL.
    """
    try:
        claims = jwt.decode(jwt_token, Config.jwt_secret, algorithms=['HS256'])
        username = claims["username"]
        # The first name falls back to the username; the last name to "".
        first_name = claims.get("firstName", username)
        last_name = claims.get("lastName", "")

        if username != token:
            BaseHandler.raise_exc(Forbidden, "FORBIDDEN",
                                  "Use the same username from the SSO")

        if Database.get_user(username) is None:
            Database.begin()
            Database.add_user(username, first_name, last_name,
                              sso_user=True, autocommit=False)
            for known_task in Database.get_tasks():
                Database.add_user_task(username, known_task["name"],
                                       autocommit=False)
            Database.commit()
            Logger.info("NEW_USER", "User %s created from SSO" % username)

        return Database.get_user(username)
    except jwt.exceptions.DecodeError:
        BaseHandler.raise_exc(Forbidden, "FORBIDDEN",
                              "Please login at %s" % Config.sso_url)
def generate_input(self, task, user):
    """
    POST /generate_input

    Generate the next input of ``task`` for ``user`` and return its database
    row passed through ``BaseHandler.format_dates``. Reports ``Forbidden`` when
    the user's row for this task already has a ``current_attempt``.
    """
    token = user["token"]
    task_name = task["name"]

    user_task = Database.get_user_task(token, task_name)
    if user_task["current_attempt"] is not None:
        self.raise_exc(Forbidden, "FORBIDDEN", "You already have a ready input!")

    attempt = Database.get_next_attempt(token, task_name)
    input_id, input_path = ContestManager.get_input(task_name, attempt)
    input_size = StorageManager.get_file_size(input_path)

    # Register the input and the user's attempt in a single transaction.
    Database.begin()
    try:
        Database.add_input(input_id, token, task_name, attempt, input_path,
                           input_size, autocommit=False)
        Database.set_user_attempt(token, task_name, attempt, autocommit=False)
        Database.commit()
    except:
        # Undo the partial transaction, then propagate whatever was raised.
        Database.rollback()
        raise

    Logger.info("CONTEST", "Generated input %s for user %s on task %s"
                % (input_id, token, task_name))
    return BaseHandler.format_dates(Database.get_input(id=input_id))
def _checkout_form_save(self, mode, checkout, all_values):
    """Create or update the partner described by the checkout form.

    ``mode[0]`` selects the action: ``'new'`` creates a partner from
    ``checkout``; ``'edit'`` writes ``checkout`` onto the partner whose id is
    read from ``all_values['partner_id']``. In edit mode the id must belong to
    the order's commercial partner tree or be the order's own partner;
    otherwise a ``Forbidden`` instance is *returned* (not raised).

    Returns the value produced by ``create`` in 'new' mode, the integer
    partner id in 'edit' mode, or a ``Forbidden`` instance.
    """
    Partner = request.env['res.partner']
    action = mode[0]

    if action == 'new':
        partner_id = Partner.sudo().create(checkout)
    elif action == 'edit':
        partner_id = int(all_values.get('partner_id', 0))
        if partner_id:
            # Double check that the partner is related to the current order.
            order = request.website.sale_get_order()
            domain = [("id", "child_of",
                       order.partner_id.commercial_partner_id.ids)]
            shippings = Partner.sudo().search(domain)
            is_allowed = (partner_id in shippings.mapped('id')
                          or partner_id == order.partner_id.id)
            if not is_allowed:
                return Forbidden()
            Partner.browse(partner_id).sudo().write(checkout)
    return partner_id
def test_some_roles_required(self):
    """``require_login(require_roles={'admin'})`` admits only the admin role.

    A user logged in with role 'succubus' gets ``Forbidden`` and the wrapped
    function is not run; a user with role 'admin' runs it.
    """
    from pillar.api.utils.authorization import require_login

    # Mutable cell so the nested function can record that it ran.
    invoked = [False]

    @require_login(require_roles={'admin'})
    def call_me():
        invoked[0] = True

    user_id = 24 * 'a'

    with self.app.test_request_context():
        self.login_api_as(ObjectId(user_id), ['succubus'])
        with self.assertRaises(Forbidden):
            call_me()
    self.assertFalse(invoked[0])

    with self.app.test_request_context():
        self.login_api_as(ObjectId(user_id), ['admin'])
        call_me()
    self.assertTrue(invoked[0])
def test_cap_required(self):
    """``require_login(require_cap='subscriber')`` enforces the capability.

    A user logged in with role 'succubus' gets ``Forbidden`` and the wrapped
    function is not run; a user logged in with role 'demo' runs it.
    """
    from pillar.api.utils.authorization import require_login

    # Mutable cell so the nested function can record that it ran.
    invoked = [False]

    @require_login(require_cap='subscriber')
    def call_me():
        invoked[0] = True

    user_id = 24 * 'a'

    with self.app.test_request_context():
        self.login_api_as(ObjectId(user_id), ['succubus'])
        with self.assertRaises(Forbidden):
            call_me()
    self.assertFalse(invoked[0])

    with self.app.test_request_context():
        self.login_api_as(ObjectId(user_id), ['demo'])
        call_me()
    self.assertTrue(invoked[0])
def assert_is_valid_patch(node_id, patch):
    """Raise an exception when the patch isn't valid.

    :param node_id: id of the comment node being patched; used for logging only.
    :param patch: mapping that must contain an 'op' key.
    :raises wz_exceptions.BadRequest: when 'op' is missing or is not one of
        VALID_COMMENT_OPERATIONS.
    :raises wz_exceptions.Forbidden: when 'op' is a voting operation and the
        current user does not match ROLES_FOR_COMMENT_VOTING.
    """
    try:
        op = patch['op']
    except KeyError:
        raise wz_exceptions.BadRequest("PATCH should have a key 'op' indicating the operation.")

    if op not in VALID_COMMENT_OPERATIONS:
        # Format the message here: the exception constructor does not perform
        # logging-style '%s' interpolation, and a second positional argument
        # would be taken as something other than a format argument.
        raise wz_exceptions.BadRequest(
            'Operation should be one of %s' % ', '.join(VALID_COMMENT_OPERATIONS))

    if op not in COMMENT_VOTING_OPS:
        # We can't check here, we need the node owner for that.
        return

    # See whether the user is allowed to patch.
    if authorization.user_matches_roles(current_app.config['ROLES_FOR_COMMENT_VOTING']):
        log.debug('User is allowed to upvote/downvote comment')
        return

    # Access denied.
    log.info('User %s wants to PATCH comment node %s, but is not allowed.',
             authentication.current_user_id(), node_id)
    raise wz_exceptions.Forbidden()
def user_has(ability, get_user=import_user):
    """Decorator factory: run the wrapped function only if the user has ``ability``.

    :param ability: name used to look up an ``Ability`` row.
    :param get_user: zero-argument callable returning the current identity;
        the identity must expose ``_groups`` and ``admin``.
    :raises Forbidden: when the identity is not an admin and none of its
        groups' abilities contains the looked-up ability's id (including the
        case where no ability with that name exists).
    """
    def wrapper(func):
        @wraps(func)
        def inner(*args, **kwargs):
            desired_ability = Ability.query.filter_by(name=ability).first()
            current_identity = get_user()

            user_abilities = []
            for group in current_identity._groups:
                user_abilities += group.abilities

            # NOTE(review): .first() presumably yields None when no row
            # matches -- confirm. Guard so an unknown ability name becomes a
            # denial for non-admins rather than an AttributeError on `.id`.
            has_ability = (desired_ability is not None
                           and desired_ability.id in user_abilities)

            if has_ability or current_identity.admin:
                return func(*args, **kwargs)
            raise Forbidden("You do not have access")
        return inner
    return wrapper
def get_validated_user_info():
    """Returns a valid (user email, user info), or raises Unauthorized or Forbidden.

    Raises:
        Unauthorized: when no OAuth user email is available.
        Forbidden: when the lookup for the email yields no (truthy) user info.
    The IP and app-id enforcement helpers are called for known users and may
    raise on their own.
    """
    user_email = get_oauth_id()

    # Allow clients to simulate an unauthenticated request (for testing)
    # because we haven't found another way to create an unauthenticated request
    # when using dev_appserver. When client tests are checking to ensure that an
    # unauthenticated request gets rejected, they helpfully add this header.
    # The `application_id` check ensures this feature only works in dev_appserver.
    if request.headers.get('unauthenticated') and app_identity.get_application_id() == 'None':
        user_email = None
    if user_email is None:
        raise Unauthorized('No OAuth user found.')

    user_info = lookup_user_info(user_email)
    if user_info:
        enforce_ip_whitelisted(request.remote_addr, get_whitelisted_ips(user_info))
        enforce_appid_whitelisted(request.headers.get('X-Appengine-Inbound-Appid'),
                                  get_whitelisted_appids(user_info))
        logging.info('User %r ALLOWED', user_email)
        return (user_email, user_info)

    # Pass the argument to the logger instead of pre-formatting with '%',
    # matching the ALLOWED branch above.
    logging.info('User %r NOT ALLOWED', user_email)
    raise Forbidden()
def test_update_withdrawn_status_fails(self):
    """Updating a participant stored with NO_USE back to NOT_WITHDRAWN is refused.

    The participant is inserted with ``WithdrawalStatus.NO_USE`` under fixed
    ids and a fixed clock, the stored values are verified, and the update that
    sets ``NOT_WITHDRAWN`` must raise ``Forbidden``.
    """
    p = Participant(withdrawalStatus=WithdrawalStatus.NO_USE)
    time = datetime.datetime(2016, 1, 1)
    with random_ids([1, 2]):
        with FakeClock(time):
            self.dao.insert(p)

    expected_participant = self._participant_with_defaults(
        participantId=1,
        version=1,
        biobankId=2,
        lastModified=time,
        signUpTime=time,
        withdrawalStatus=WithdrawalStatus.NO_USE)
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(expected_participant.asdict(), p.asdict())

    p2 = self.dao.get(1)
    self.assertEqual(p.asdict(), p2.asdict())

    p.version = 1
    p.withdrawalStatus = WithdrawalStatus.NOT_WITHDRAWN
    with self.assertRaises(Forbidden):
        self.dao.update(p)
def _ensure_task_was_requested(self, task, guard):
    """Raise ``Forbidden`` unless ``guard`` reports the task as stamped.

    The stamp is checked for the value returned by ``get_user_id_or_ip()``.
    """
    requester = get_user_id_or_ip()
    if guard.check_task_stamped(task, requester):
        return
    raise Forbidden('You must request a task first!')
def _create_json_response(self, query_result, oid):
    """Serialise the readable items of ``query_result`` to a JSON string.

    :param query_result: sequence of results; an entry that is not a
        ``DomainObject`` is replaced by its first element, and an entry whose
        class differs from ``self.__class__`` is unpacked as
        ``(item, headline, rank)``.
    :param oid: when not None, read access to the first result is enforced
        and the single item (not a list) is serialised.
    :returns: JSON text produced by ``json.dumps``.

    Items for which ``ensure_authorized_to('read', item)`` raises
    ``Forbidden`` or ``Unauthorized`` are left out of the output.
    """
    if len(query_result) == 1 and query_result[0] is None:
        raise abort(404)

    items = []
    for result in query_result:
        # This is for n_favs orderby case
        if not isinstance(result, DomainObject):
            result = result[0]
        try:
            if result.__class__ != self.__class__:
                (item, headline, rank) = result
            else:
                item = result
                headline = None
                rank = None
            datum = self._create_dict_from_model(item)
            if headline:
                datum['headline'] = headline
            if rank:
                datum['rank'] = rank
            ensure_authorized_to('read', item)
            items.append(datum)
        except (Forbidden, Unauthorized):
            # The rejected datum was never appended (the append is the last
            # statement of the try body), so there is nothing to remove.
            # Popping here would discard a previously authorised item.
            continue

    if oid is not None:
        ensure_authorized_to('read', query_result[0])
        items = items[0]
    return json.dumps(items)
def _forbidden_attributes(self, data):
    """Reject payloads that contain any key listed in ``self.reserved_keys``.

    Raises:
        Forbidden: when the first reserved key found is 'published'.
        BadRequest: when the first reserved key found is any other key.
    """
    for key in data.keys():
        if key not in self.reserved_keys:
            continue
        if key == 'published':
            raise Forbidden('You cannot publish a project via the API')
        raise BadRequest("Reserved keys in payload")
def test_non_owner_authenticated_user_create_given_helpingmaterial(self):
    """An authenticated user who is not the project owner cannot create
    a helping material for that project."""
    project = ProjectFactory.create()
    material = HelpingMaterialFactory.build(project_id=project.id)

    # Precondition: the authenticated user is not the project owner.
    assert self.mock_authenticated.id != project.owner_id

    assert_raises(Forbidden, ensure_authorized_to, 'create', material)
def test_non_owner_authenticated_user_read_given_helpingmaterial_draft_project(self):
    """An authenticated user who is not the project owner cannot read
    a helping material that belongs to a draft project."""
    draft_project = ProjectFactory.create(published=False)
    material = HelpingMaterialFactory.create(project_id=draft_project.id)

    # Precondition: the authenticated user is not the project owner.
    assert self.mock_authenticated.id != draft_project.owner.id

    assert_raises(Forbidden, ensure_authorized_to, 'read', material)
def test_non_owner_authenticated_user_read_helpingmaterials_for_given_draft_project(self):
    """An authenticated user who is not the project owner cannot read
    the helping materials of a draft project."""
    draft_project = ProjectFactory.create(published=False)

    # Precondition: the authenticated user is not the project owner.
    assert self.mock_authenticated.id != draft_project.owner.id

    assert_raises(Forbidden, ensure_authorized_to, 'read',
                  HelpingMaterial, project_id=draft_project.id)
def test_non_owner_authenticated_user_delete_helpingmaterial(self):
    """An authenticated user who is neither the owner nor an admin cannot
    delete a helping material."""
    project_owner = UserFactory.create(id=5)
    project = ProjectFactory.create(owner=project_owner, published=True)
    material = HelpingMaterialFactory.create(project_id=project.id)

    # Preconditions: the acting user is a different, non-admin user.
    assert self.mock_authenticated.id != project_owner.id
    assert not self.mock_authenticated.admin

    assert_raises(Forbidden, ensure_authorized_to, 'delete', material)
def test_authenticated_user_cannot_crud_auditlog(self):
    """Authenticated users cannot create, update or delete auditlogs."""
    auditlog = Auditlog()
    for action in ('create', 'update', 'delete'):
        assert_raises(Forbidden, ensure_authorized_to, action, auditlog)
def test_admin_user_cannot_crud_auditlog(self):
    """Admin users cannot create, update or delete auditlogs."""
    auditlog = Auditlog()
    for action in ('create', 'update', 'delete'):
        assert_raises(Forbidden, ensure_authorized_to, action, auditlog)
def test_anonymous_user_can_create_taskrun_for_draft_project(self):
    """Anonymous users can create a taskrun for a task of a draft project."""
    draft_project = ProjectFactory.create(published=False)
    task = TaskFactory.create(project=draft_project)
    anonymous_run = AnonymousTaskRunFactory.build(task_id=task.id,
                                                  project_id=draft_project.id)

    assert_not_raises(Forbidden, ensure_authorized_to, 'create', anonymous_run)
def test_authenticated_user_create_repeated_taskrun(self):
    """An authenticated user cannot create a second taskrun for the same task."""
    task = TaskFactory.create()
    first_run = TaskRunFactory.create(task=task)
    # Built (not created) for the same task and the same user as the first run.
    repeated_run = TaskRunFactory.build(task=task, user=first_run.user)

    assert self.mock_authenticated.id == first_run.user.id
    assert_raises(Forbidden, ensure_authorized_to, 'create', repeated_run)
def test_authenticated_user_can_create_taskrun_for_draft_project(self):
    """Authenticated users can create a taskrun for a task of a draft project."""
    draft_project = ProjectFactory.create(published=False)
    task = TaskFactory.create(project=draft_project)
    user_run = TaskRunFactory.build(task_id=task.id,
                                    project_id=draft_project.id)

    assert_not_raises(Forbidden, ensure_authorized_to, 'create', user_run)
def test_authenticated_user_update_anonymous_taskrun(self):
    """Authenticated users cannot update a taskrun that was posted anonymously."""
    posted_anonymously = AnonymousTaskRunFactory.create()

    assert_raises(Forbidden, ensure_authorized_to, 'update', posted_anonymously)
def test_admin_update_anonymous_taskrun(self):
    """Admins cannot update a taskrun that was posted anonymously."""
    posted_anonymously = AnonymousTaskRunFactory.create()

    assert_raises(Forbidden, ensure_authorized_to, 'update', posted_anonymously)
def test_admin_update_user_taskrun(self):
    """Admins cannot update taskruns posted by authenticated users."""
    posted_by_user = TaskRunFactory.create()

    # Precondition: the admin is not the author of the taskrun.
    assert self.mock_admin.id != posted_by_user.user.id

    assert_raises(Forbidden, ensure_authorized_to, 'update', posted_by_user)
def test_authenticated_user_delete_anonymous_taskrun(self):
    """Authenticated users cannot delete a taskrun that was posted anonymously."""
    posted_anonymously = AnonymousTaskRunFactory.create()

    assert_raises(Forbidden, ensure_authorized_to, 'delete', posted_anonymously)
def test_authenticated_user_delete_other_users_taskrun(self):
    """An authenticated user can delete their own taskrun but not a taskrun
    created by another authenticated user."""
    mine = TaskRunFactory.create()
    someone_elses = TaskRunFactory.create()

    # Preconditions: the first run belongs to the acting user, the second does not.
    assert self.mock_authenticated.id == mine.user.id
    assert self.mock_authenticated.id != someone_elses.user.id

    assert_not_raises(Exception, ensure_authorized_to, 'delete', mine)
    assert_raises(Forbidden, ensure_authorized_to, 'delete', someone_elses)
def test_authenticated_user_cannot_save_results(self):
    """Authenticated users cannot create a result."""
    new_result = Result()

    assert_raises(Forbidden, ensure_authorized_to, 'create', new_result)
def test_admin_user_cannot_save_results(self):
    """Admin users cannot create a result."""
    new_result = Result()

    assert_raises(Forbidden, ensure_authorized_to, 'create', new_result)
def test_admin_user_cannot_delete_results(self):
    """Admin users cannot delete a result."""
    some_result = Result()

    assert_raises(Forbidden, ensure_authorized_to, 'delete', some_result)
def test_auth_user_cannot_update_results(self):
    """An authenticated user who is not the owner cannot update a result."""
    existing_result = self.create_result()

    assert_raises(Forbidden, ensure_authorized_to, 'update', existing_result)
def test_authenticated_user_cannot_crud_webhook(self):
    """Authenticated users cannot create, update or delete webhooks."""
    hook = Webhook()
    for action in ('create', 'update', 'delete'):
        assert_raises(Forbidden, ensure_authorized_to, action, hook)
def test_admin_user_cannot_crud_webhook(self):
    """Admin users cannot create, update or delete webhooks."""
    hook = Webhook()
    for action in ('create', 'update', 'delete'):
        assert_raises(Forbidden, ensure_authorized_to, action, hook)
def test_authenticated_user_can_crud(self):
    """Authenticated users can read categories but cannot create, update
    or delete them."""
    built_category = CategoryFactory.build()

    assert_raises(Forbidden, ensure_authorized_to, 'create', built_category)

    # Reading is allowed both for an instance and for the class itself.
    for readable in (built_category, Category):
        assert_not_raises(Exception, ensure_authorized_to, 'read', readable)

    for action in ('update', 'delete'):
        assert_raises(Forbidden, ensure_authorized_to, action, built_category)
def test_authenticated_user_cannot_create(self):
    """Authenticated users are refused the 'create' action on ``User``."""
    action = 'create'
    assert_raises(Forbidden, ensure_authorized_to, action, User)
def test_authenticated_user_cannot_update_another_user(self):
    """Authenticated users cannot update a user other than themselves."""
    other_user = UserFactory.create()

    # Precondition: the created user is not the acting user.
    assert other_user.id != self.mock_authenticated.id, other_user.id

    assert_raises(Forbidden, ensure_authorized_to, 'update', other_user)
def test_authenticated_user_cannot_delete_another_user(self):
    """Authenticated users cannot delete a user other than themselves."""
    other_user = UserFactory.create()

    # Precondition: the created user is not the acting user.
    assert other_user.id != self.mock_authenticated.id, other_user.id

    assert_raises(Forbidden, ensure_authorized_to, 'delete', other_user)
def test_anonymous_user_cannot_crud(self):
    """Anonymous users can read tasks; create, update and delete raise
    ``Unauthorized``."""
    project_owner = UserFactory.create()
    project = ProjectFactory.create(owner=project_owner)
    task = TaskFactory.create(project=project)

    assert_raises(Unauthorized, ensure_authorized_to, 'create', task)

    # Reading is allowed both for an instance and for the class itself.
    for readable in (task, Task):
        assert_not_raises(Forbidden, ensure_authorized_to, 'read', readable)

    for action in ('update', 'delete'):
        assert_raises(Unauthorized, ensure_authorized_to, action, task)
def test_project_owner_can_crud(self):
    """The project owner can create, read, update and delete tasks."""
    # Result unused, but the call is kept.
    # NOTE(review): presumably this occupies an id so that the owner created
    # next matches mock_authenticated -- confirm.
    UserFactory.create()
    project_owner = UserFactory.create()
    project = ProjectFactory.create(owner=project_owner)
    task = TaskFactory.create(project=project)

    # Precondition: the acting user is the project owner.
    assert self.mock_authenticated.id == project_owner.id

    checks = (
        ('create', task),
        ('read', task),
        ('read', Task),
        ('update', task),
        ('delete', task),
    )
    for action, target in checks:
        assert_not_raises(Forbidden, ensure_authorized_to, action, target)
def test_admin_can_crud(self):
    """An admin who is not the project owner can create, read, update and
    delete tasks."""
    # Result unused, but the call is kept.
    # NOTE(review): presumably this occupies an id so that the owner created
    # next differs from mock_admin -- confirm.
    UserFactory.create()
    project_owner = UserFactory.create()
    project = ProjectFactory.create(owner=project_owner)
    task = TaskFactory.create(project=project)

    # Precondition: the admin is not the project owner.
    assert self.mock_admin.id != project_owner.id

    checks = (
        ('create', task),
        ('read', task),
        ('read', Task),
        ('update', task),
        ('delete', task),
    )
    for action, target in checks:
        assert_not_raises(Forbidden, ensure_authorized_to, action, target)
def test_authenticated_user_delete(self):
    """An authenticated user may not delete an oauth token of any provider."""
    for provider in self.auth_providers:
        assert_raises(Forbidden, ensure_authorized_to, 'delete', 'token',
                      token=provider)
def test_authenticated_user_create(self):
    """An authenticated user may not create an oauth token of any provider."""
    for provider in self.auth_providers:
        assert_raises(Forbidden, ensure_authorized_to, 'create', 'token',
                      token=provider)
def test_authenticated_user_update(self):
    """An authenticated user may not update an oauth token of any provider."""
    for provider in self.auth_providers:
        assert_raises(Forbidden, ensure_authorized_to, 'update', 'token',
                      token=provider)
def test_a_project_cannot_be_created_as_published(self):
    """Creating a project that is already marked as published is refused."""
    already_published = ProjectFactory.build(published=True)

    assert_raises(Forbidden, ensure_authorized_to, 'create', already_published)