我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用rest_framework.exceptions.PermissionDenied()。
def authenticate(self, request): if request.META['REQUEST_METHOD'] != 'POST': return None print 'Request data: {}'.format(request.data) if 'HTTP_TOKEN' not in request.META: raise exceptions.ParseError("Registration Token not present in the request.") elif 'sampling_feature' not in request.data: raise exceptions.ParseError("Sampling feature UUID not present in the request.") # Get auth_token(uuid) from header, get registration object with auth_token, get the user from that registration, verify sampling_feature uuid is registered by this user, be happy. token = request.META['HTTP_TOKEN'] registration = SiteRegistration.objects.filter(registration_token=token).first() if not registration: raise exceptions.PermissionDenied('Invalid Security Token') # request needs to have the sampling feature uuid of the registration - if str(registration.sampling_feature.sampling_feature_uuid) != request.data['sampling_feature']: raise exceptions.AuthenticationFailed( 'Site Identifier is not associated with this Token') # or other related exception return None
def test_publish_release(self, mock_client): self.client = DockerClient() self.client.publish_release('ozzy/embryo:git-f2a8020', {'POWERED_BY': 'Deis'}, 'ozzy/embryo:v4', True) self.assertTrue(self.client.client.pull.called) self.assertTrue(self.client.client.tag.called) self.assertTrue(self.client.client.build.called) self.assertTrue(self.client.client.push.called) # Test that a registry host prefix is replaced with deis-registry for the target self.client.publish_release('ozzy/embryo:git-f2a8020', {'POWERED_BY': 'Deis'}, 'quay.io/ozzy/embryo:v4', True) docker_push = self.client.client.push docker_push.assert_called_with( 'localhost:5000/ozzy/embryo', tag='v4', insecure_registry=True, stream=True) # Test that blacklisted image names can't be published with self.assertRaises(PermissionDenied): self.client.publish_release( 'deis/controller:v1.11.1', {}, 'deis/controller:v1.11.1', True) with self.assertRaises(PermissionDenied): self.client.publish_release( 'localhost:5000/deis/controller:v1.11.1', {}, 'deis/controller:v1.11.1', True)
def test_build(self, mock_client): # test that self.client.build was called with proper arguments self.client = DockerClient() self.client.build('ozzy/embryo:git-f3a8020', {'POWERED_BY': 'Deis'}, 'ozzy/embryo', 'v4') docker_build = self.client.client.build self.assertTrue(docker_build.called) args = {"rm": True, "tag": u'localhost:5000/ozzy/embryo:v4', "stream": True} kwargs = docker_build.call_args[1] self.assertDictContainsSubset(args, kwargs) # test that the fileobj arg to "docker build" contains a correct Dockerfile f = kwargs['fileobj'] self.assertEqual(f.read(), "FROM ozzy/embryo:git-f3a8020\nENV POWERED_BY='Deis'") # Test that blacklisted image names can't be built with self.assertRaises(PermissionDenied): self.client.build('deis/controller:v1.11.1', {}, 'deis/controller', 'v1.11.1') with self.assertRaises(PermissionDenied): self.client.build( 'localhost:5000/deis/controller:v1.11.1', {}, 'deis/controller', 'v1.11.1')
def passwd(self, request, **kwargs): caller_obj = self.get_object() target_obj = self.get_object() if request.data.get('username'): # if you "accidentally" target yourself, that should be fine if caller_obj.username == request.data['username'] or caller_obj.is_superuser: target_obj = get_object_or_404(User, username=request.data['username']) else: raise PermissionDenied() if request.data.get('password') or not caller_obj.is_superuser: if not target_obj.check_password(request.data['password']): return Response({'detail': 'Current password does not match'}, status=status.HTTP_400_BAD_REQUEST) target_obj.set_password(request.data['new_password']) target_obj.save() return Response({'status': 'password set'})
def has_permission(self, request, view): """ If settings.REGISTRATION_MODE does not exist, such as during a test, return True Return `True` if permission is granted, `False` otherwise. """ try: if settings.REGISTRATION_MODE == 'disabled': raise exceptions.PermissionDenied('Registration is disabled') if settings.REGISTRATION_MODE == 'enabled': return True elif settings.REGISTRATION_MODE == 'admin_only': return request.user.is_superuser else: raise Exception("{} is not a valid registation mode" .format(settings.REGISTRATION_MODE)) except AttributeError: return True
def test_check_status_code(self): """ Test status codes and exceptions raised. """ # Step 1: Status code 200. self.authentication._check_status_code(200) # Step 2: Status code 401. with self.assertRaises(AuthenticationFailed): self.authentication._check_status_code(401) # Step 3: Status code 403. with self.assertRaises(PermissionDenied): self.authentication._check_status_code(403) # Step 4: Status code other than tested. with self.assertRaises(UnavailableException): self.authentication._check_status_code(500)
def post(self, request, *args, **kwargs): user = authenticate(userid=request.data['username'], password=request.data['password']) if user: is_active = user.is_active if is_active: token, _ = Token.objects.get_or_create(user=user) response = Response({"token": token.key, "user_pk": token.user_id, "created": token.created}, status=status.HTTP_200_OK) return response else: detail = "?? ??? ??????." raise PermissionDenied(detail=detail) else: detail = "???? ?? ? ????. username? password? ?? ??????." raise ValidationError(detail=detail)
def run(self, request, pk): """ Run a Report and create a new ReportResult, overwriting any previous result for the Report. """ # Check that the user has permission to run reports. if not request.user.has_perm('extras.add_reportresult'): raise PermissionDenied("This user does not have permission to run reports.") # Retrieve and run the Report. This will create a new ReportResult. report = self._retrieve_report(pk) report.run() serializer = serializers.ReportDetailSerializer(report) return Response(serializer.data) # # User activity #
def delete(self, request, group, note_id): if not request.user.is_authenticated(): raise PermissionDenied(detail="Key doesn't have permission to delete Note") try: note = Activity.objects.get( group=group, type=Activity.NOTE, user=request.user, id=note_id, ) except Activity.DoesNotExist: raise ResourceDoesNotExist note.delete() return Response(status=204)
def put(self, request, group, note_id): if not request.user.is_authenticated(): raise PermissionDenied(detail="Key doesn't have permission to edit Note") try: note = Activity.objects.get( group=group, type=Activity.NOTE, user=request.user, id=note_id, ) except Activity.DoesNotExist: raise ResourceDoesNotExist serializer = NoteSerializer(data=request.DATA) if serializer.is_valid(): # Would be nice to have a last_modified timestamp we could bump here note.data = dict(serializer.object) note.save() return Response(serialize(note, request.user), status=200) return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
def clone(self, request, *args, **kwargs): self.object = self.get_object() data = {'xform': self.object.pk, 'username': request.DATA['username']} serializer = CloneXFormSerializer(data=data) if serializer.is_valid(): clone_to_user = User.objects.get(username=data['username']) if not request.user.has_perm('can_add_xform', clone_to_user.profile): raise exceptions.PermissionDenied( detail=_(u"User %(user)s has no permission to add " "xforms to account %(account)s" % {'user': request.user.username, 'account': data['username']})) xform = serializer.save() serializer = XFormSerializer( xform.cloned_form, context={'request': request}) return Response(data=serializer.data, status=status.HTTP_201_CREATED) return Response(data=serializer.errors, status=status.HTTP_400_BAD_REQUEST)
def authenticate_credentials(self, key): token_cache = 'token_' + key cache_user = cache.get(token_cache) if cache_user: return (cache_user, key) try: token = self.model.objects.get(key=key) except self.model.DoesNotExist: raise exceptions.AuthenticationFailed('User does not exist.') if not token.user.is_active: raise exceptions.PermissionDenied('The user is forbidden.') utc_now = timezone.now() if token.created < utc_now - timezone.timedelta(hours=24 * 30): raise exceptions.AuthenticationFailed('Token has been expired.') if token: token_cache = 'token_' + key cache.set(token_cache, token.user, 24 * 7 * 60 * 60) return (token.user, token)
def my_exception_handler(exc, context): # Call REST framework's default exception handler first, # to get the standard error response. response = exception_handler(exc, context) # Now add the HTTP status code to the response. # print(exc) # print(context) if response is not None: if isinstance(exc, exceptions.AuthenticationFailed): response.data['error_code'] = 2 elif isinstance(exc, exceptions.PermissionDenied): response.data['error_code'] = 3 else: response.data['error_code'] = 1 return response
def place_order(request, pk=None): """ Place a specific order. """ order = get_object_or_404(rest.models.Order, pk=pk) if not request.user.is_superuser: if not order.organization.can_user_scan(request.user): raise PermissionDenied("You do not have sufficient privileges to start scans for that organization.") if not order.is_ready_to_place: raise PermissionDenied(order.get_ready_errors()) order.place_order() order.save() send_emails_for_placed_order.delay( order_uuid=unicode(order.uuid), receipt_description=order.get_receipt_description(), ) handle_placed_order.delay(order_uuid=unicode(order.uuid)) return Response(status=204)
def change_rights(self, request, pk): """ Used to change a member's rights. Only administrators can change member's rights. Only super-administrator can change admin's rights. If the super-administrator right is requested, then it must come from the current super-administrator whose thus, losing his status. If succeeded, returns HTTP_200_OK with the updated GroupMember object """ user = request.user member = self.get_or_404(pk) rights_serializer, rights = SigmaViewSet.get_deserialized(GroupMemberRightsSerializer, request.data) if not GroupMember.model.can_change_rights(user, member, rights): raise PermissionDenied() if rights.is_super_administrator: pass # TODO : de-superadminer le gars qui file ses droits member_serializer, member = self.get_deserialized(rights, member, partial=True) member_serializer.save() return Response(member_serializer.data, status=status.HTTP_200_OK)
def add_follower(self, request, pk=None): guid = request.data.get("guid") try: target_profile = Profile.objects.get(guid=guid) except Profile.DoesNotExist: raise PermissionDenied("Profile given does not exist.") profile = self.get_object() if profile.guid == guid: raise ValidationError("Cannot follow self!") profile.following.add(target_profile) return Response({"status": "Follower added."})
def remove_follower(self, request, pk=None): guid = request.data.get("guid") try: target_profile = Profile.objects.get(guid=guid) except Profile.DoesNotExist: raise PermissionDenied("Profile given does not exist.") profile = self.get_object() if profile.guid == guid: raise ValidationError("Cannot unfollow self!") profile.following.remove(target_profile) return Response({"status": "Follower removed."})
def log_pending(self, request, pk=None, **kwargs): """ Create event log with status == "mk42.apps.core.constants.STATUS_PENDING". Actually NO. :param request: django request instance. :type request: django.http.request.HttpRequest. :param pk: event object primary key. :type pk: unicode. :param kwargs: additional args. :type kwargs: dict. :return: django rest framework response. :rtype: rest_framework.response.Response. """ raise PermissionDenied(detail=_("Okay move along, move along people, there's nothing to see here!"))
def log_canceled(self, request, pk=None, **kwargs): """ Create event log with status == "mk42.apps.core.constants.STATUS_CANCELED". :param request: django request instance. :type request: django.http.request.HttpRequest. :param pk: event object primary key. :type pk: unicode. :param kwargs: additional args. :type kwargs: dict. :return: django rest framework response. :rtype: rest_framework.response.Response. """ obj = self.get_object() if request.user != obj.group.owner: # only group owner can change event status raise PermissionDenied(detail=_("You must be owner of this group to perform this action.")) log = obj.log_canceled(**kwargs) if not log: # can't create event logs with status == "mk42.apps.core.constants.STATUS_CANCELED" # if log with status == "mk42.apps.core.constants.STATUS_PENDING" does not exist # or log with status == "mk42.apps.core.constants.STATUS_ONGOING" exist raise PermissionDenied(detail=_("Can't change status to '{status}'.").format(**{"status": dict(obj.STATUS_CHOICES).get(obj.STATUS_CANCELED), })) return Response({"detail": EventLogSerializer(instance=log, context={"request": request, }).data if log else None, })
def log_ongoing(self, request, pk=None, **kwargs): """ Create event log with status == "mk42.apps.core.constants.STATUS_ONGOING". :param request: django request instance. :type request: django.http.request.HttpRequest. :param pk: event object primary key. :type pk: unicode. :param kwargs: additional args. :type kwargs: dict. :return: django rest framework response. :rtype: rest_framework.response.Response. """ obj = self.get_object() if request.user != obj.group.owner: # only group owner can change event status raise PermissionDenied(detail=_("You must be owner of this group to perform this action.")) log = obj.log_ongoing(**kwargs) if not log: # can't create event logs with status == "mk42.apps.core.constants.STATUS_ONGOING" # if log with status == "mk42.apps.core.constants.STATUS_FINISHED" exist # if log with status == "mk42.apps.core.constants.STATUS_CANCELED" exist # or log with status == "mk42.apps.core.constants.STATUS_PENDING" does not exist raise PermissionDenied(detail=_("Can't change status to '{status}'.").format(**{"status": dict(obj.STATUS_CHOICES).get(obj.STATUS_ONGOING), })) return Response({"detail": EventLogSerializer(instance=log, context={"request": request, }).data if log else None, })
def log_finished(self, request, pk=None, **kwargs): """ Create event log with status == "mk42.apps.core.constants.STATUS_FINISHED". :param request: django request instance. :type request: django.http.request.HttpRequest. :param pk: event object primary key. :type pk: unicode. :param kwargs: additional args. :type kwargs: dict. :return: django rest framework response. :rtype: rest_framework.response.Response. """ obj = self.get_object() if request.user != obj.group.owner: # only group owner can change event status raise PermissionDenied(detail=_("You must be owner of this group to perform this action.")) log = obj.log_finished(**kwargs) if not log: # can't create event logs with status == "mk42.apps.core.constants.STATUS_FINISHED" # if log with status == "mk42.apps.core.constants.STATUS_FINISHED" exist # if log with status == "mk42.apps.core.constants.STATUS_CANCELED" exist # or log with status == "mk42.apps.core.constants.STATUS_ONGOING" does not exist raise PermissionDenied(detail=_("Can't change status to '{status}'.").format(**{"status": dict(obj.STATUS_CHOICES).get(obj.STATUS_FINISHED), })) return Response({"detail": EventLogSerializer(instance=log, context={"request": request, }).data if log else None, })
def parkings_exception_handler(exc, context): response = exception_handler(exc, context) if response is not None: if isinstance(exc, ParkingException): response.data['code'] = exc.get_codes() elif isinstance(exc, PermissionDenied): response.data['code'] = 'permission_denied' return response
def get_profile(self): try: profile = self.request.user.profile except exceptions.ObjectDoesNotExist: raise PermissionDenied(detail='Role incorrect') return profile
def get_queryset(self): only_valid = self.request.query_params.get('only_valid', '') only_valid = only_valid == 'true' user = self.request.user try: queryset = user.parent.coupon_set.all() except exceptions.ObjectDoesNotExist: raise PermissionDenied(detail='Role incorrect') now = timezone.now() out_time = models.Coupon.OUT_OF_DATE_TIME if only_valid: # ?????????? # ????, ????? => ?????, ??????????? queryset = queryset.filter( expired_at__gt=now, used=False, ).order_by('-amount', 'expired_at') else: # ??????????, ??????? # ????, ????? => ????? queryset = queryset.filter( expired_at__gt=now - out_time, ).extra( # ??????????????? select={'date_diff': 'abs(extract(epoch from (now()-expired_at)))'} ).order_by('date_diff', '-amount') # ??????? if self.action == 'list': return sorted(queryset, key=lambda x: x.sort_key()) return queryset
def get_parent(self): try: parent = self.request.user.parent except (AttributeError, exceptions.ObjectDoesNotExist): raise PermissionDenied(detail='Role incorrect') return parent
def _enforce_csrf(self, request): """Make sure that we have a valid CSRF token. Django restframework does validate this when using the SessionAuthentication but since that also checks if the user is authenticated we can't really use that """ reason = CSRFCheck().process_view(request, None, (), {}) if reason: # CSRF failed, bail with explicit error message raise PermissionDenied('CSRF Failed: %s' % reason)
def has_view_permissions(self, path, method, view): """ Return `True` if the incoming request has the correct view permissions. """ if view.request is None: return True try: view.check_permissions(view.request) except (exceptions.APIException, Http404, PermissionDenied): return False return True
def get(self, request, *args, **kwargs): schema = self.schema_generator.get_schema(request, self.public) if schema is None: raise exceptions.PermissionDenied() return Response(schema)
def exception_handler(exc, context): """ Returns the response that should be used for any given exception. By default we handle the REST framework `APIException`, and also Django's built-in `Http404` and `PermissionDenied` exceptions. Any unhandled exceptions may return `None`, which will cause a 500 error to be raised. """ if isinstance(exc, exceptions.APIException): headers = {} if getattr(exc, 'auth_header', None): headers['WWW-Authenticate'] = exc.auth_header if getattr(exc, 'wait', None): headers['Retry-After'] = '%d' % exc.wait if isinstance(exc.detail, (list, dict)): data = exc.detail else: data = {'detail': exc.detail} set_rollback() return Response(data, status=exc.status_code, headers=headers) elif isinstance(exc, Http404): msg = _('Not found.') data = {'detail': six.text_type(msg)} set_rollback() return Response(data, status=status.HTTP_404_NOT_FOUND) elif isinstance(exc, PermissionDenied): msg = _('Permission denied.') data = {'detail': six.text_type(msg)} set_rollback() return Response(data, status=status.HTTP_403_FORBIDDEN) return None
def permission_denied(self, request, message=None): """ If request is not permitted, determine what kind of exception to raise. """ if request.authenticators and not request.successful_authenticator: raise exceptions.NotAuthenticated() raise exceptions.PermissionDenied(detail=message)
def enforce_csrf(self, request): """ Enforce CSRF validation for session based authentication. """ reason = CSRFCheck().process_view(request, None, (), {}) if reason: # CSRF failed, bail with explicit error message raise exceptions.PermissionDenied('CSRF Failed: %s' % reason)
def check_blacklist(repo): """Check a Docker repository name for collision with deis/* components.""" blacklisted = [ # NOTE: keep this list up to date! 'builder', 'cache', 'controller', 'database', 'logger', 'logspout', 'publisher', 'registry', 'router', 'store-admin', 'store-daemon', 'store-gateway', 'store-metadata', 'store-monitor', 'swarm', 'mesos-master', 'mesos-marathon', 'mesos-slave', 'zookeeper', ] if any("deis/{}".format(c) in repo for c in blacklisted): raise PermissionDenied("Repository name {} is not allowed".format(repo))
def test_pull(self, mock_client): self.client = DockerClient() self.client.pull('alpine', '3.2') docker_pull = self.client.client.pull docker_pull.assert_called_once_with( 'alpine', tag='3.2', insecure_registry=True, stream=True) # Test that blacklisted image names can't be pulled with self.assertRaises(PermissionDenied): self.client.pull('deis/controller', 'v1.11.1') with self.assertRaises(PermissionDenied): self.client.pull('localhost:5000/deis/controller', 'v1.11.1')
def test_tag(self, mock_client): self.client = DockerClient() self.client.tag('ozzy/embryo:git-f2a8020', 'ozzy/embryo', 'v4') docker_tag = self.client.client.tag docker_tag.assert_called_once_with( 'ozzy/embryo:git-f2a8020', 'ozzy/embryo', tag='v4', force=True) # Test that blacklisted image names can't be tagged with self.assertRaises(PermissionDenied): self.client.tag('deis/controller:v1.11.1', 'deis/controller', 'v1.11.1') with self.assertRaises(PermissionDenied): self.client.tag('localhost:5000/deis/controller:v1.11.1', 'deis/controller', 'v1.11.1')
def update(self, request, **kwargs): app = self.get_object() if request.data.get('owner'): if self.request.user != app.owner and not self.request.user.is_superuser: raise PermissionDenied() new_owner = get_object_or_404(User, username=request.data['owner']) app.owner = new_owner app.save() return Response(status=status.HTTP_200_OK)
def create(self, request, *args, **kwargs): app = get_object_or_404(models.App, id=request.data['receive_repo']) request.user = get_object_or_404(User, username=request.data['receive_user']) # check the user is authorized for this app if not permissions.is_app_user(request, app): raise PermissionDenied() request.data['app'] = app request.data['owner'] = request.user return super(PushHookViewSet, self).create(request, *args, **kwargs)
def create(self, request, *args, **kwargs): app = get_object_or_404(models.App, id=request.data['receive_repo']) self.user = request.user = get_object_or_404(User, username=request.data['receive_user']) # check the user is authorized for this app if not permissions.is_app_user(request, app): raise PermissionDenied() request.data['app'] = app request.data['owner'] = self.user super(BuildHookViewSet, self).create(request, *args, **kwargs) # return the application databag response = {'release': {'version': app.release_set.latest().version}, 'domains': ['.'.join([app.id, settings.DEIS_DOMAIN])]} return Response(response, status=status.HTTP_200_OK)
def create(self, request, *args, **kwargs): app = get_object_or_404(models.App, id=request.data['receive_repo']) request.user = get_object_or_404(User, username=request.data['receive_user']) # check the user is authorized for this app if not permissions.is_app_user(request, app): raise PermissionDenied() config = app.release_set.latest().config serializer = self.get_serializer(config) return Response(serializer.data, status=status.HTTP_200_OK)
def destroy(self, request, **kwargs): app = get_object_or_404(models.App, id=self.kwargs['id']) user = get_object_or_404(User, username=kwargs['username']) perm_name = "api.{}".format(self.perm) if not user.has_perm(perm_name, app): raise PermissionDenied() if (user != request.user and not permissions.IsOwnerOrAdmin.has_object_permission(permissions.IsOwnerOrAdmin(), request, self, app)): raise PermissionDenied() remove_perm(self.perm, user, app) models.log_event(app, "User {} was revoked access to {}".format(user, app)) return Response(status=status.HTTP_204_NO_CONTENT)
def destroy(self, request, **kwargs): calling_obj = self.get_object() target_obj = calling_obj if request.data.get('username'): # if you "accidentally" target yourself, that should be fine if calling_obj.username == request.data['username'] or calling_obj.is_superuser: target_obj = get_object_or_404(User, username=request.data['username']) else: raise PermissionDenied() target_obj.delete() return Response(status=status.HTTP_204_NO_CONTENT)
def create(self, request, **kwargs): app = self.get_object() if not permissions.IsOwnerOrAdmin.has_object_permission(permissions.IsOwnerOrAdmin(), request, self, app): raise PermissionDenied() user = get_object_or_404(User, username=request.data['username']) assign_perm(self.perm, user, app) models.log_event(app, "User {} was granted access to {}".format(user, app)) return Response(status=status.HTTP_201_CREATED)