我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用rest_framework.status.HTTP_403_FORBIDDEN。
def user_detail(request, id, format=None):
    """Retrieve, update or delete the ``User`` identified by ``id``.

    GET returns the serialized user, PUT validates ``request.data`` and saves
    it (400 with the serializer errors on failure), DELETE removes the user
    when the caller holds ``OpsManage.delete_user`` and answers 403 otherwise.
    An unknown ``id`` yields 404.  Any other HTTP method falls through and
    returns ``None``.
    """
    try:
        account = User.objects.get(id=id)
    except User.DoesNotExist:
        return Response(status=status.HTTP_404_NOT_FOUND)

    method = request.method

    if method == 'GET':
        return Response(UserSerializer(account).data)

    if method == 'PUT':
        serializer = UserSerializer(account, data=request.data)
        if not serializer.is_valid():
            return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
        serializer.save()
        return Response(serializer.data)

    if method == 'DELETE':
        if request.user.has_perm('OpsManage.delete_user'):
            account.delete()
            return Response(status=status.HTTP_204_NO_CONTENT)
        return Response(status=status.HTTP_403_FORBIDDEN)
def assetsLog_detail(request, id, format=None):
    """Retrieve or delete a single ``Log_Assets`` record.

    GET returns the serialized record.  DELETE removes it and answers 204,
    or answers 403 when the caller lacks ``OpsManage.delete_log_assets``.
    An unknown ``id`` yields 404.
    """
    try:
        snippet = Log_Assets.objects.get(id=id)
    except Log_Assets.DoesNotExist:
        return Response(status=status.HTTP_404_NOT_FOUND)

    if request.method == 'GET':
        serializer = AssetsLogsSerializer(snippet)
        return Response(serializer.data)
    elif request.method == 'DELETE':
        # The permission test belongs inside the branch, not in the ``elif``
        # condition: otherwise an unauthorized DELETE never reaches the 403
        # and the view falls through returning None.
        if not request.user.has_perm('OpsManage.delete_log_assets'):
            return Response(status=status.HTTP_403_FORBIDDEN)
        snippet.delete()
        return Response(status=status.HTTP_204_NO_CONTENT)
def group_list(request, format=None):
    """List all groups (GET) or create a new group (POST).

    Every request requires ``Opsmanage.read_group``; POST additionally
    requires ``Opsmanage.change_group``.  A missing permission yields 403.
    A successful POST queues a ``recordAssets`` task and answers 201; invalid
    data answers 400 with the serializer errors.
    """
    forbidden = status.HTTP_403_FORBIDDEN

    if not request.user.has_perm('Opsmanage.read_group'):
        return Response(status=forbidden)

    if request.method == 'GET':
        groups = Group.objects.all()
        return Response(GroupSerializer(groups, many=True).data)

    if request.method == 'POST':
        if not request.user.has_perm('Opsmanage.change_group'):
            return Response(status=forbidden)

        serializer = GroupSerializer(data=request.data)
        if not serializer.is_valid():
            return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

        serializer.save()
        # Record the creation asynchronously once the group has been saved.
        recordAssets.delay(
            user=str(request.user),
            content="??????{group_name}".format(group_name=request.data.get("name")),
            type="group",
            id=serializer.data.get('id'),
        )
        return Response(serializer.data, status=status.HTTP_201_CREATED)
def deploy_detail(request, id, format=None):
    """Fetch (GET) or remove (DELETE) one ``Project_Config`` row.

    Answers 404 for an unknown ``id`` and 403 when a DELETE is attempted
    without ``OpsManage.delete_project_config``.  Other HTTP methods fall
    through and return ``None``.
    """
    try:
        project = Project_Config.objects.get(id=id)
    except Project_Config.DoesNotExist:
        return Response(status=status.HTTP_404_NOT_FOUND)

    verb = request.method

    if verb == 'GET':
        return Response(ProjectConfigSerializer(project).data)

    if verb == 'DELETE':
        if not request.user.has_perm('OpsManage.delete_project_config'):
            return Response(status=status.HTTP_403_FORBIDDEN)
        # Queue the audit record before deleting: it reads the project name
        # from the row that is about to go away.
        recordProject.delay(
            project_user=str(request.user),
            project_id=id,
            project_name=project.project_name,
            project_content="????",
        )
        project.delete()
        return Response(status=status.HTTP_204_NO_CONTENT)
def playbook_detail(request, id, format=None):
    """Return (GET) or delete (DELETE) the ``Ansible_Playbook`` with this id.

    Responds 404 when no such playbook exists and 403 when a DELETE is
    attempted without ``OpsManage.can_delete_ansible_playbook``.  Other HTTP
    methods fall through and return ``None``.
    """
    try:
        playbook = Ansible_Playbook.objects.get(id=id)
    except Ansible_Playbook.DoesNotExist:
        return Response(status=status.HTTP_404_NOT_FOUND)

    if request.method == 'GET':
        return Response(AnbiblePlaybookSerializer(playbook).data)

    if request.method == 'DELETE':
        allowed = request.user.has_perm('OpsManage.can_delete_ansible_playbook')
        if not allowed:
            return Response(status=status.HTTP_403_FORBIDDEN)
        playbook.delete()
        return Response(status=status.HTTP_204_NO_CONTENT)
def modelLogsdetail(request, id, format=None):
    """Show (GET) or delete (DELETE) one ``Log_Ansible_Model`` entry.

    404 is returned for an unknown ``id``; DELETE needs the
    ``OpsManage.can_delete_log_ansible_model`` permission and answers 403
    without it.  Other HTTP methods fall through and return ``None``.
    """
    try:
        log_entry = Log_Ansible_Model.objects.get(id=id)
    except Log_Ansible_Model.DoesNotExist:
        return Response(status=status.HTTP_404_NOT_FOUND)

    method = request.method

    if method == 'GET':
        payload = AnsibleModelLogsSerializer(log_entry).data
        return Response(payload)

    if method == 'DELETE':
        if request.user.has_perm('OpsManage.can_delete_log_ansible_model'):
            log_entry.delete()
            return Response(status=status.HTTP_204_NO_CONTENT)
        return Response(status=status.HTTP_403_FORBIDDEN)
def playbookLogsdetail(request, id, format=None):
    """Show (GET) or delete (DELETE) one ``Log_Ansible_Playbook`` entry.

    An unknown ``id`` answers 404.  DELETE answers 403 unless the caller has
    ``OpsManage.delete_log_ansible_playbook``, and 204 once the entry is
    removed.  Other HTTP methods fall through and return ``None``.
    """
    try:
        log_entry = Log_Ansible_Playbook.objects.get(id=id)
    except Log_Ansible_Playbook.DoesNotExist:
        return Response(status=status.HTTP_404_NOT_FOUND)

    if request.method == 'GET':
        return Response(AnsiblePlaybookLogsSerializer(log_entry).data)

    if request.method == 'DELETE':
        may_delete = request.user.has_perm('OpsManage.delete_log_ansible_playbook')
        if not may_delete:
            return Response(status=status.HTTP_403_FORBIDDEN)
        log_entry.delete()
        return Response(status=status.HTTP_204_NO_CONTENT)
def cron_detail(request, id, format=None):
    """Retrieve, update or delete the ``Cron_Config`` identified by ``id``.

    GET returns the serialized config; PUT validates and saves
    ``request.data`` (400 with errors when invalid); DELETE requires
    ``OpsManage.delete_service_assets`` and answers 403 without it.  An
    unknown ``id`` answers 404.  Other HTTP methods fall through and return
    ``None``.
    """
    try:
        cron = Cron_Config.objects.get(id=id)
    except Cron_Config.DoesNotExist:
        return Response(status=status.HTTP_404_NOT_FOUND)

    method = request.method

    if method == 'GET':
        return Response(CronSerializer(cron).data)

    if method == 'PUT':
        serializer = CronSerializer(cron, data=request.data)
        if not serializer.is_valid():
            return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
        serializer.save()
        return Response(serializer.data)

    if method == 'DELETE':
        if request.user.has_perm('OpsManage.delete_service_assets'):
            cron.delete()
            return Response(status=status.HTTP_204_NO_CONTENT)
        return Response(status=status.HTTP_403_FORBIDDEN)
def cronLogsdetail(request, id, format=None):
    """Show (GET) or delete (DELETE) one ``Log_Cron_Config`` entry.

    Answers 404 for an unknown ``id``.  DELETE is refused with 403 unless the
    caller has ``OpsManage.delete_log_cron_config``.  Other HTTP methods fall
    through and return ``None``.
    """
    try:
        cron_log = Log_Cron_Config.objects.get(id=id)
    except Log_Cron_Config.DoesNotExist:
        return Response(status=status.HTTP_404_NOT_FOUND)

    verb = request.method

    if verb == 'GET':
        return Response(CronLogsSerializer(cron_log).data)

    if verb == 'DELETE':
        if not request.user.has_perm('OpsManage.delete_log_cron_config'):
            return Response(status=status.HTTP_403_FORBIDDEN)
        cron_log.delete()
        return Response(status=status.HTTP_204_NO_CONTENT)
def create(self, request, *args, **kwargs):
    """Register a play (card + story) for the requesting user in this round.

    Answers 400 with the serializer errors when the payload is invalid, 403
    with the exception message when ``Play.play_for_round`` raises
    ``GameInvalidPlay``, and 201 with the serialized play otherwise.
    """
    serializer = self.get_serializer(data=request.data)
    if not serializer.is_valid():
        return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

    current_round = self.get_round()
    player = Player.objects.get(game=current_round.game, user=request.user)

    chosen_card = serializer.validated_data.get('card')
    told_story = serializer.validated_data.get('story')

    try:
        play = Play.play_for_round(current_round, player, chosen_card, told_story)
    except GameInvalidPlay as exc:
        return Response({'detail': exc.msg}, status=status.HTTP_403_FORBIDDEN)
    else:
        return Response(PlaySerializer(play).data, status=status.HTTP_201_CREATED)
def create(self, request, *args, **kwargs):
    """Record the requesting player's vote for a card in the current round.

    Invalid payloads answer 400; a ``GameInvalidPlay`` raised by
    ``vote_card`` answers 403 with its message.  After a successful vote the
    round is reloaded and, if it is now complete, the game is advanced to the
    next round.  The serialized play is returned with 201.
    """
    serializer = self.get_serializer(data=request.data)
    if not serializer.is_valid():
        return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

    current_round = self.get_round()
    voter = Player.objects.get(game=current_round.game, user=request.user)
    voted_card = serializer.validated_data.get('card')
    play = Play.objects.get(game_round=current_round, player=voter)

    try:
        play.vote_card(voted_card)
    except GameInvalidPlay as exc:
        return Response({'detail': exc.msg}, status=status.HTTP_403_FORBIDDEN)

    # Pick up any status change made while the vote was registered.
    current_round.refresh_from_db()
    if current_round.status == RoundStatus.COMPLETE:
        game = self.get_game()
        try:
            game.next_round()
        except (GameDeckExhausted, GameFinished):
            # treat deck exhaust as a fair finish
            pass

    return Response(PlaySerializer(play).data, status=status.HTTP_201_CREATED)
def create(self, request, *args, **kwargs):
    """Add the requesting user to the game as a player named ``name``.

    Invalid payloads answer 400.  An ``IntegrityError`` raised while adding
    the player answers 403; the detail message depends on whether the error
    text mentions ``user_id``.  On success the serialized player is returned
    with 201.
    """
    serializer = self.get_serializer(data=request.data)
    if not serializer.is_valid():
        return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

    try:
        game = self.get_game()
        player = game.add_player(request.user, request.data['name'])
    except IntegrityError as exc:
        # The error text is used to tell the two uniqueness violations apart.
        if 'user_id' in str(exc):
            detail = 'You are already playing this game'
        else:
            detail = "Username already in use"
        return Response({"detail": detail}, status=status.HTTP_403_FORBIDDEN)

    return Response(PlayerSerializer(player).data, status=status.HTTP_201_CREATED)
def vmServer_detail(request, id, format=None):
    """Retrieve, update or delete the ``VmServer`` identified by ``id``.

    GET returns the serialized server; PUT validates and saves
    ``request.data`` (400 with errors when invalid); DELETE requires
    ``vmanageplatform.delete_vmserver`` and answers 403 without it.  An
    unknown ``id`` answers 404.  Other HTTP methods fall through and return
    ``None``.
    """
    try:
        vm_server = VmServer.objects.get(id=id)
    except VmServer.DoesNotExist:
        return Response(status=status.HTTP_404_NOT_FOUND)

    method = request.method

    if method == 'GET':
        return Response(VmServerSerializer(vm_server).data)

    if method == 'PUT':
        serializer = VmServerSerializer(vm_server, data=request.data)
        if not serializer.is_valid():
            return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
        serializer.save()
        return Response(serializer.data)

    if method == 'DELETE':
        if request.user.has_perm('vmanageplatform.delete_vmserver'):
            vm_server.delete()
            return Response(status=status.HTTP_204_NO_CONTENT)
        return Response(status=status.HTTP_403_FORBIDDEN)
def vmlog_detail(request, id, format=None):
    """Retrieve, update or delete the ``VmLogs`` entry identified by ``id``.

    GET returns the serialized entry; PUT validates and saves
    ``request.data`` (400 with errors when invalid); DELETE is gated on the
    ``vmanageplatform.delete_vmserver`` permission and answers 403 without
    it.  An unknown ``id`` answers 404.  Other HTTP methods fall through and
    return ``None``.
    """
    try:
        vm_log = VmLogs.objects.get(id=id)
    except VmLogs.DoesNotExist:
        return Response(status=status.HTTP_404_NOT_FOUND)

    verb = request.method

    if verb == 'GET':
        return Response(VmLogsSerializer(vm_log).data)

    if verb == 'PUT':
        serializer = VmLogsSerializer(vm_log, data=request.data)
        if serializer.is_valid():
            serializer.save()
            return Response(serializer.data)
        return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

    if verb == 'DELETE':
        if not request.user.has_perm('vmanageplatform.delete_vmserver'):
            return Response(status=status.HTTP_403_FORBIDDEN)
        vm_log.delete()
        return Response(status=status.HTTP_204_NO_CONTENT)
def testCantDeleteOwnRRsetWhileAccountIsLocked(self):
    """With ``captcha_required`` set, an owner cannot remove their own RRset.

    Creating the RRset via POST still answers 201, but both a PATCH with an
    empty ``records`` list and a DELETE must answer 403.
    """
    self.owner.captcha_required = True
    self.owner.save()

    domain_name = self.ownedDomains[1].name
    json_type = 'application/json'

    # Create an A record set to operate on.
    list_url = reverse('rrsets', args=(domain_name,))
    new_rrset = {'records': ['1.2.3.4'], 'ttl': 60, 'type': 'A'}
    response = self.client.post(list_url, json.dumps(new_rrset), content_type=json_type)
    self.assertEqual(response.status_code, status.HTTP_201_CREATED)

    detail_url = reverse('rrset', args=(domain_name, '', 'A',))

    # Try PATCH with empty records
    emptied = {'records': []}
    response = self.client.patch(detail_url, json.dumps(emptied), content_type=json_type)
    self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)

    # Try DELETE
    response = self.client.delete(detail_url)
    self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
def handle_exception(self, exc):
    """Turn ``exc`` into a response via the configured exception handler.

    For ``NotAuthenticated`` / ``AuthenticationFailed`` the exception is
    first annotated with the authenticate header when one is available, or
    has its status code coerced to 403 otherwise.  If the handler returns
    ``None`` the exception is passed to ``raise_uncaught_exception``.
    """
    auth_failures = (exceptions.NotAuthenticated, exceptions.AuthenticationFailed)
    if isinstance(exc, auth_failures):
        # WWW-Authenticate header for 401 responses, else coerce to 403
        header = self.get_authenticate_header(self.request)
        if not header:
            exc.status_code = status.HTTP_403_FORBIDDEN
        else:
            exc.auth_header = header

    handler = self.get_exception_handler()
    response = handler(exc, self.get_exception_handler_context())

    if response is None:
        self.raise_uncaught_exception(exc)

    response.exception = True
    return response
def delete(self, request, tower_uuid):
    """Deregister and delete the BTS identified by ``tower_uuid``.

    Answers 403 when the tower has a network that differs from the
    requesting user's network.  Otherwise a ``DeregisteredBTS`` row and a
    ``deregister_bts`` ``UsageEvent`` are stored, the tower is deleted and an
    empty response is returned.
    """
    user_network = get_network_from_user(request.user)
    tower = models.BTS.objects.get(uuid=tower_uuid)
    if tower.network and tower.network != user_network:
        return Response("Network is not associated with that BTS.",
                        status=status.HTTP_403_FORBIDDEN)

    # Create a DeregisteredBTS instance.
    deregistered = models.DeregisteredBTS(uuid=tower.uuid, secret=tower.secret)
    deregistered.save()

    # Create a 'deregister_bts' UsageEvent.
    timestamp = datetime.datetime.now(pytz.utc)
    label = ('tower "%s" (%s)' % (tower.nickname, tower.uuid)
             if tower.nickname else 'tower %s' % tower.uuid)
    event = models.UsageEvent.objects.create(
        date=timestamp,
        bts_uuid=tower.uuid,
        kind='deregister_bts',
        reason='deregistered %s' % label,
    )
    event.save()

    # TODO(matt): generate revocation certs
    # And finally delete the BTS.
    tower.delete()
    return Response("")
def delete(self, request, bts_uuid=None, number=None, format=None):
    """Dis-associate a number from a BTS and mark it available.

    Answers 400 unless both ``bts_uuid`` and ``number`` are supplied, 403
    when the BTS does not exist on the requesting user's network, 200 once
    the first matching number has been released, and 404 when no number
    matches.
    """
    # Both identifiers are required.  The previous ``or`` test only rejected
    # requests missing *both*, letting a missing ``number`` reach the query
    # below as ``number__exact=None``.
    if not (number and bts_uuid):
        return Response("", status=status.HTTP_400_BAD_REQUEST)

    network = get_network_from_user(request.user)
    try:
        bts = models.BTS.objects.get(uuid=bts_uuid, network=network)
    except models.BTS.DoesNotExist:
        return Response("User is not associated with that BTS.",
                        status=status.HTTP_403_FORBIDDEN)

    with transaction.atomic():
        matches = models.Number.objects.filter(number__exact=number).filter(
            network=bts.network)
        # Only the first match is released; the loop is a "first or fall
        # through" idiom and does not shadow the ``number`` argument.
        for entry in matches:
            entry.state = "available"
            entry.network = None
            entry.subscriber = None
            entry.save()
            return Response(None, status=status.HTTP_200_OK)

    return Response(None, status=status.HTTP_404_NOT_FOUND)
def test_workflow_duplicate_view(self):
    """Exercise the duplicate endpoint: success, bad id, and ownership.

    Duplicating ``workflow1`` answers 201 with a fresh id and a
    ``Copy of`` name; an unknown id answers 404; another user gets 403 for a
    private workflow and 201 once it is public.
    """
    duplicate_url = '/api/workflows/%d/duplicate'
    ids_before = [wf.id for wf in Workflow.objects.all()]  # all current workflow ids

    response = self.client.get(duplicate_url % self.workflow1.id)
    self.assertEqual(response.status_code, status.HTTP_201_CREATED)
    copy_id = response.data['id']
    self.assertNotIn(copy_id, ids_before)  # created at entirely new id
    self.assertEqual(response.data['name'], 'Copy of ' + self.workflow1.name)
    Workflow.objects.get(pk=copy_id)  # will fail if no Workflow created

    # Ensure 404 with bad id
    response = self.client.get(duplicate_url % 999999)
    self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)

    # Ensure 403 when another user tries to clone private workflow
    self.assertFalse(self.workflow1.public)
    self.client.force_login(self.otheruser)
    response = self.client.get(duplicate_url % self.workflow1.id)
    self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)

    # But another user can duplicate public workflow
    self.workflow1.public = True
    self.workflow1.save()
    response = self.client.get(duplicate_url % self.workflow1.id)
    self.assertEqual(response.status_code, status.HTTP_201_CREATED)
def test_edit_property(self):
    """Anonymous users should not be able to PUT new properties.

    The PUT to property 1 must answer 403 and leave the stored price at 1000.
    """
    detail_url = reverse('property-detail', kwargs={'pk': 1})
    payload = {
        'listing_type': FOR_SALE,
        'price': 1234,
        'raw_address': '125 Fake St',
        'size_units': METRIC,
    }

    response = self.client.put(detail_url, payload, format='json')

    self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
    # The original price is still there and the submitted one was not stored.
    self.assertEqual(Property.objects.filter(price=1000).count(), 1)
    self.assertEqual(Property.objects.filter(price=1234).count(), 0)
def test_edit_property(self):
    """Push Group users should not be able to PUT new properties.

    The PUT to property 1 must answer 403, leave exactly one property in the
    table and keep its price at 1000.
    """
    detail_url = reverse('property-detail', kwargs={'pk': 1})
    payload = {
        'listing_type': FOR_SALE,
        'price': 1235,
        'raw_address': '125 Fake St',
        'size_units': METRIC,
        'listing_timestamp': timezone.now(),
    }

    response = self.client.put(detail_url, payload, format='json')

    self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
    properties = Property.objects
    self.assertEqual(properties.count(), 1)
    self.assertEqual(properties.filter(price=1000).count(), 1)
    self.assertEqual(properties.filter(price=1235).count(), 0)
def test_write_property(self):
    """Flagging Group users should not be able to POST new properties.

    The POST must answer 403 and the property count must stay at one.
    """
    list_url = reverse('property-list')
    payload = {
        'listing_type': FOR_SALE,
        'price': 1234,
        'raw_address': '125 Fake St',
        'size_units': METRIC,
        'listing_timestamp': timezone.now(),
    }

    response = self.client.post(list_url, payload, format='json')

    self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
    self.assertEqual(Property.objects.count(), 1)
def retrieve(self, request, *args, **kwargs):
    """Return the client ``kwargs['name']`` owned by a particular reseller.

    The reseller is looked up by ``kwargs['reseller_name']``, scoped to the
    requesting user's application when one exists and to the user otherwise.
    When no reseller matches, the user's admin ``ClientUser`` is consulted:
    no admin record answers 404, an admin of a different client answers 403,
    and otherwise that client's reseller is used.  A missing client answers
    404.
    """
    reseller_name = kwargs['reseller_name']
    application = Application.objects.filter(owner=request.user).first()
    if application:
        reseller = Reseller.objects.filter(name=reseller_name, application=application).first()
    else:
        reseller = Reseller.objects.filter(name=reseller_name, owner=request.user).first()

    if not reseller:
        admin = ClientUser.objects.filter(owner=request.user, admin=True).first()
        if not admin:
            # Use the ``status`` module like every other response in this
            # method; the bare constant name was inconsistent with them.
            return Response("Client does not exist", status=status.HTTP_404_NOT_FOUND)
        if admin.client.name != kwargs['name']:
            return Response("Authorization failed", status=status.HTTP_403_FORBIDDEN)
        reseller = admin.client.reseller

    client = Client.objects.filter(reseller=reseller, name=kwargs['name']).first()
    if not client:
        return Response("Client does not exist", status=status.HTTP_404_NOT_FOUND)

    # Serialized as a one-element collection, then unwrapped.
    queryset = (client, )
    serializer = ClientSerializer(queryset, many=True)
    return Response(serializer.data[0])
def get(self, request):
    """List all commodities for a caller holding auth code '1' or '11'.

    Returns the response prepared by ``check_token`` when the token check
    fails, 403 with an alert when neither auth code is present, an alert
    payload when there are no commodities, and the serialized list otherwise.
    """
    result = check_token(request)
    if result['permission'] is False:
        return result['response']
    if all(auth not in result['auth'] for auth in ['1', '11', ]):
        alert = {'alert': '??????????????'}
        return Response(alert, status=status.HTTP_403_FORBIDDEN)

    commodities = Commodity.objects.all()
    # A QuerySet is never the object ``False``; test for emptiness so the
    # "nothing found" alert can actually be returned.
    if not commodities:
        alert = {'alert': '?????'}
        return Response(alert)

    serializer = CommoditySerializer(commodities, many=True)
    return Response(serializer.data)
def get(self, request, commodity_id):
    """Return one commodity for a caller holding auth code '1' or '11'.

    Returns the response prepared by ``check_token`` when the token check
    fails, 403 with an alert when neither auth code is present, and 404 with
    an alert when ``get_object`` finds nothing.
    """
    token_info = check_token(request)
    if token_info['permission'] is False:
        return token_info['response']

    authorized = any(code in token_info['auth'] for code in ('1', '11'))
    if not authorized:
        return Response({'alert': '??????????????'}, status=status.HTTP_403_FORBIDDEN)

    commodity = self.get_object(commodity_id)
    if commodity is not None:
        return Response(CommoditySerializer(commodity).data)

    return Response({'alert': '?????????'}, status=status.HTTP_404_NOT_FOUND)
def get(self, request):
    """List all ``InEntrepot`` rows for a caller with auth code '1' or '11'.

    Returns the response prepared by ``check_token`` when the token check
    fails, 403 with an alert when neither auth code is present, an alert
    payload when there are no rows, and the serialized list otherwise.
    """
    result = check_token(request)
    if result['permission'] is False:
        return result['response']
    if all(auth not in result['auth'] for auth in ['1', '11', ]):
        alert = {'alert': '??????????????'}
        return Response(alert, status=status.HTTP_403_FORBIDDEN)

    in_entrepots = InEntrepot.objects.all()
    # A QuerySet is never the object ``False``; test for emptiness so the
    # "nothing found" alert can actually be returned.
    if not in_entrepots:
        alert = {'alert': '?????'}
        return Response(alert)

    serializer = InEntrepotSerializer(in_entrepots, many=True)
    return Response(serializer.data)
def get(self, request, ie_id):
    """Return the detail rows for ``ie_id``, or the bare ``InEntrepot``.

    Callers need auth code '1' or '11' (403 otherwise).  When
    ``get_object_list`` yields detail rows they are serialized as a list;
    when it yields nothing the ``InEntrepot`` itself is returned, or 404 with
    an alert if that does not exist either.
    """
    token_info = check_token(request)
    if token_info['permission'] is False:
        return token_info['response']

    if not any(code in token_info['auth'] for code in ('1', '11')):
        return Response({'alert': '??????????????'}, status=status.HTTP_403_FORBIDDEN)

    details = self.get_object_list(ie_id)
    if details:
        return Response(InEntrepotDetailSerializer(details, many=True).data)

    # No detail rows: fall back to the parent record.
    try:
        parent = InEntrepot.objects.get(ie_id=ie_id)
    except InEntrepot.DoesNotExist:
        return Response({'alert': '????????'}, status=status.HTTP_404_NOT_FOUND)
    return Response(InEntrepotSerializer(parent).data)
def get(self, request):
    """List all ``OutEntrepot`` rows for a caller with auth code '1' or '11'.

    Returns the response prepared by ``check_token`` when the token check
    fails, 403 with an alert when neither auth code is present, an alert
    payload when there are no rows, and the serialized list otherwise.
    """
    result = check_token(request)
    if result['permission'] is False:
        return result['response']
    if all(auth not in result['auth'] for auth in ['1', '11', ]):
        alert = {'alert': '??????????????'}
        return Response(alert, status=status.HTTP_403_FORBIDDEN)

    out_entrepots = OutEntrepot.objects.all()
    # A QuerySet is never the object ``False``; test for emptiness so the
    # "nothing found" alert can actually be returned.
    if not out_entrepots:
        alert = {'alert': '????'}
        return Response(alert)

    # NOTE(review): OutEntrepot rows are serialized with InEntrepotSerializer,
    # exactly as before — confirm this is intended and not a copy/paste slip.
    serializer = InEntrepotSerializer(out_entrepots, many=True)
    return Response(serializer.data)
def get(self, request, emp_id):
    """Return one employee to a privileged caller or to that employee.

    Access is refused with 403 only when the token carries neither auth code
    '1' nor '2' *and* its ``emp_id`` differs from the requested one.  A
    missing employee answers 404 with an alert.
    """
    token_info = check_token(request)
    if token_info['permission'] is False:
        return token_info['response']

    privileged = any(code in token_info['auth'] for code in ('1', '2'))
    if not privileged and token_info['emp_id'] != emp_id:
        return Response({'alert': '??????????????'}, status=status.HTTP_403_FORBIDDEN)

    employee = self.get_object(emp_id)
    if employee is not None:
        return Response(EmployeeSerializer(employee).data)

    return Response({'alert': '?????????'}, status=status.HTTP_404_NOT_FOUND)
def get(self, request, repair_id):
    """Return one repair record, subject to a per-record access check.

    The record is loaded first (404 with an alert when missing).  Access is
    then refused with 403 when the token carries neither auth code '1' nor
    '10' and the record's ``part_id`` differs from the token's ``part_id``.
    """
    token_info = check_token(request)
    if not token_info['permission']:
        return token_info['response']

    repair = self.get_object(repair_id)
    if repair is None:
        return Response({'alert': '????????'}, status=status.HTTP_404_NOT_FOUND)

    privileged = any(code in token_info['auth'] for code in ('1', '10'))
    if not privileged and repair.part_id != token_info['part_id']:
        return Response({'alert': '?????????????'}, status=status.HTTP_403_FORBIDDEN)

    return Response(RepairSerializer(repair).data)
def get(self, request, room_id):
    """Return one room for a caller holding auth code '1' or '8'.

    Returns the response prepared by ``check_token`` when the token check
    fails, 403 with an alert when neither auth code is present, and 404 with
    an alert when ``get_object`` finds nothing.
    """
    token_info = check_token(request)
    if not token_info['permission']:
        return token_info['response']

    if not any(code in token_info['auth'] for code in ('1', '8')):
        return Response({'alert': '??????????????'}, status=status.HTTP_403_FORBIDDEN)

    room = self.get_object(room_id)
    if room is not None:
        return Response(RoomSerializer(room).data)

    return Response({'alert': '?????????'}, status=status.HTTP_404_NOT_FOUND)
def get(self, request):
    """List all cars for a caller holding auth code '1' or '8'.

    Returns the response prepared by ``check_token`` when the token check
    fails, 403 with an alert when neither auth code is present, and an alert
    payload (with the default status) when there are no cars.
    """
    token_info = check_token(request)
    if not token_info['permission']:
        return token_info['response']

    if not any(code in token_info['auth'] for code in ('1', '8')):
        return Response({'alert': '??????????????'}, status=status.HTTP_403_FORBIDDEN)

    cars = Car.objects.all()
    if cars:
        return Response(CarSerializer(cars, many=True).data)

    return Response({'alert': '????????'})
def get(self, request, car_id):
    """Return one car for a caller holding auth code '1' or '8'.

    Returns the response prepared by ``check_token`` when the token check
    fails, 403 with an alert when neither auth code is present, and 404 with
    an alert when ``get_object`` finds nothing.
    """
    token_info = check_token(request)
    if not token_info['permission']:
        return token_info['response']

    has_role = any(code in token_info['auth'] for code in ('1', '8'))
    if not has_role:
        return Response({'alert': '??????????????'}, status=status.HTTP_403_FORBIDDEN)

    car = self.get_object(car_id)
    if car is not None:
        return Response(CarSerializer(car).data)

    return Response({'alert': '????????'}, status=status.HTTP_404_NOT_FOUND)
def get(self, request):
    """List all temporary employees for a caller with auth code '1' or '8'.

    Returns the response prepared by ``check_token`` when the token check
    fails, 403 with an alert when neither auth code is present, and an alert
    payload (with the default status) when the list is empty.
    """
    token_info = check_token(request)
    if not token_info['permission']:
        return token_info['response']

    if not any(code in token_info['auth'] for code in ('1', '8')):
        return Response({'alert': '??????????????'}, status=status.HTTP_403_FORBIDDEN)

    temp_employees = TempEmployee.objects.all()
    if temp_employees:
        return Response(TempEmployeeSerializer(temp_employees, many=True).data)

    return Response({'alert': '????????'})
def get(self, request, temp_emp_id):
    """Return one temporary employee for a caller with auth code '1' or '8'.

    Returns the response prepared by ``check_token`` when the token check
    fails, 403 with an alert when neither auth code is present, and 404 with
    an alert when ``get_object`` finds nothing.
    """
    token_info = check_token(request)
    if not token_info['permission']:
        return token_info['response']

    has_role = any(code in token_info['auth'] for code in ('1', '8'))
    if not has_role:
        return Response({'alert': '??????????????'}, status=status.HTTP_403_FORBIDDEN)

    temp_employee = self.get_object(temp_emp_id)
    if temp_employee is not None:
        return Response(TempEmployeeSerializer(temp_employee).data)

    return Response({'alert': '?????????'}, status=status.HTTP_404_NOT_FOUND)
def create(request, user_pk=None) -> Response:
    """Create a new transaction for a user

    The transaction is validated and saved, then ``value`` is added to the
    user's balance.

    :param request: Request send from the client
    :param user_pk: Primary key to identify a user
    :return: 201 with the serialized transaction; 400 when ``value`` is
        missing or ``TransactionValueZero`` is raised; 403 on
        ``TransactionValueError``; 404 when the user does not exist
    """
    value = request.data.get('value')
    if value is None:
        return Response(data={'msg': 'Value missing'}, status=status.HTTP_400_BAD_REQUEST)

    try:
        serializer = TransactionSerializer(data={'user': user_pk, 'value': value})
        user = User.objects.get(pk=user_pk)
        serializer.is_valid(raise_exception=True)
        serializer.save()
        user.balance += value
        user.save()
        return Response(serializer.data, status=status.HTTP_201_CREATED)
    except (KeyError, User.DoesNotExist) as e:
        # ``User.objects.get`` signals a missing row with ``DoesNotExist``,
        # not ``KeyError``; catch both so an unknown user really yields 404.
        # The message is stringified because an exception object cannot be
        # rendered in the response body.
        return Response(data={'msg': str(e)}, status=status.HTTP_404_NOT_FOUND)
    except TransactionValueZero as e:
        return Response(data={'msg': str(e)}, status=status.HTTP_400_BAD_REQUEST)
    except TransactionValueError as e:
        return Response(data={'msg': str(e)}, status=status.HTTP_403_FORBIDDEN)
def toggle_featured(request, entryid):
    """Flip the ``featured`` flag of the entry identified by ``entryid``.

    Requires the ``entries.can_change_entry`` permission (403 otherwise).
    An unknown entry answers 404; a successful toggle answers 204.
    """
    if not request.user.has_perm('entries.can_change_entry'):
        return Response(
            "You donot have permission to change entry featured status.",
            status=status.HTTP_403_FORBIDDEN)

    # find the entry for this id
    try:
        entry = Entry.objects.get(id=entryid)
    except Entry.DoesNotExist:
        return Response("No such entry", status=status.HTTP_404_NOT_FOUND)

    entry.featured = not entry.featured
    entry.save()
    return Response("Toggled featured status.", status=status.HTTP_204_NO_CONTENT)
def check_permission(self, perm, url, method):
    """Assert ``url`` is forbidden without ``perm`` and succeeds with it.

    ``method`` is the name of the test-client method to call (it is looked
    up with ``getattr``).  Both temporary users are deleted after use.
    """
    # A user without the permission must be rejected.
    plain_user = UserFactory()
    self.client.force_authenticate(plain_user)
    denied = getattr(self.client, method)(url)
    self.assertEqual(
        denied.status_code,
        status.HTTP_403_FORBIDDEN,
        prettify_response('Request should be forbidden', denied),
    )
    plain_user.delete()

    # A user holding the permission must get a success status.
    privileged_user = UserFactory(permissions=[perm])
    self.client.force_authenticate(privileged_user)
    granted = getattr(self.client, method)(url)
    self.assertTrue(
        status.is_success(granted.status_code),
        prettify_response('Request should be successful', granted),
    )
    privileged_user.delete()
def test_successful_sign_out(self):
    """A token obtained at sign-in stops authenticating after sign-out.

    Sign-in must answer 200, sign-out with the returned token 204, and a
    later authentication attempt with that token 403.
    """
    sign_in = self.client.sign_in(self.user.username)
    self.assertEqual(
        sign_in.status_code,
        status.HTTP_200_OK,
        prettify_response('Sign in failed', sign_in),
    )
    token = sign_in.data.get('token')

    sign_out = self.client.sign_out(token)
    self.assertEqual(
        sign_out.status_code,
        status.HTTP_204_NO_CONTENT,
        prettify_response('Sign out failed', sign_out),
    )

    reuse = self.client.authenticate(token)
    self.assertEqual(
        reuse.status_code,
        status.HTTP_403_FORBIDDEN,
        prettify_response('Authentication should be failed', reuse),
    )
def auth(self, request):
    """Authenticate a signed request and return a serialized token.

    Invalid input raises ``CustomAPIException`` carrying the serializer
    errors.  A falsy result from ``Authenticator.authenticate`` answers 403
    with ``{'result': False}``.  Any exception raised inside the
    authentication step is re-raised wrapped in ``CustomAPIException``.
    """
    serializer = AuthSerializer(data=request.DATA)
    if not serializer.is_valid():
        raise CustomAPIException(detail=serializer.errors)

    try:
        credentials = serializer.data
        token = Authenticator.authenticate(
            email=credentials.get('email'),
            date=credentials.get('date'),
            signature=credentials.get('signature'),
        )
        if not token:
            return Response({'result': False}, status=status.HTTP_403_FORBIDDEN)
        return Response(TokenSerializer(token).data)
    except Exception as e:
        raise CustomAPIException(exception=e)
def test_whitelist_block_request(self):
    """Ensure we block requests from groups not included in whitelist.

    An ``acl`` plugin whitelisting only ``nogroup`` is posted (200), then a
    proxied GET sent with this consumer's id must answer 403.
    """
    self.client.login(username='john', password='john123john')

    plugin = {
        'name': 'acl',
        'config': {
            'whitelist': ['nogroup'],
        },
    }
    created = self.client.post(self.url, plugin, format='json')
    self.assertEqual(created.status_code, status.HTTP_200_OK)

    self.client.credentials(
        HTTP_HOST='httpbin.org',
        HTTP_CONSUMER_ID=str(self.consumer.id),
    )
    proxied = self.client.get('/get')
    self.assertEqual(proxied.status_code, status.HTTP_403_FORBIDDEN)
def test_blacklist_block_request(self):
    """Ensure we block requests from groups included in blacklist.

    An ``acl`` plugin blacklisting ``group1`` and ``anyone`` is posted (200),
    then a proxied GET sent with this consumer's id must answer 403.
    """
    self.client.login(username='john', password='john123john')

    plugin = {
        'name': 'acl',
        'config': {
            'blacklist': ['group1', 'anyone'],
        },
    }
    created = self.client.post(self.url, plugin, format='json')
    self.assertEqual(created.status_code, status.HTTP_200_OK)

    self.client.credentials(
        HTTP_HOST='httpbin.org',
        HTTP_CONSUMER_ID=str(self.consumer.id),
    )
    proxied = self.client.get('/get')
    self.assertEqual(proxied.status_code, status.HTTP_403_FORBIDDEN)
def test_block_request_without_consumer(self):
    """Ensure a request that carries no consumer id is blocked.

    An ``acl`` plugin with a whitelist is posted (200); the proxied GET is
    then sent with only ``HTTP_HOST`` set and must answer 403.
    """
    self.client.login(username='john', password='john123john')

    plugin = {
        'name': 'acl',
        'config': {
            'whitelist': ['group1', 'anyone'],
        },
    }
    created = self.client.post(self.url, plugin, format='json')
    self.assertEqual(created.status_code, status.HTTP_200_OK)

    # No HTTP_CONSUMER_ID here, unlike the sibling ACL tests.
    self.client.credentials(HTTP_HOST='httpbin.org')
    proxied = self.client.get('/get')
    self.assertEqual(proxied.status_code, status.HTTP_403_FORBIDDEN)
def test_whitelist_block_request(self):
    """Ensure we block requests from clients not included in whitelist.

    An ``ip-restriction`` plugin whitelisting two networks is posted (200),
    then a proxied GET sent without a forwarded address must answer 403.
    """
    self.client.login(username='john', password='john123john')

    plugin = {
        'name': 'ip-restriction',
        'config': {
            'whitelist': ['192.168.1.0/24', '2001:db00::0/24'],
            'blacklist': [],
        },
    }
    created = self.client.post(self.key_auth_url, plugin, format='json')
    self.assertEqual(created.status_code, status.HTTP_200_OK)

    self.client.credentials(HTTP_HOST='httpbin.org')
    proxied = self.client.get('/get')
    self.assertEqual(proxied.status_code, status.HTTP_403_FORBIDDEN)
def test_blacklist_block_request(self):
    """Ensure we block requests from clients included in blacklist.

    An ``ip-restriction`` plugin blacklisting two networks is posted (200),
    then a proxied GET forwarded for ``192.168.1.10`` must answer 403.
    """
    self.client.login(username='john', password='john123john')

    plugin = {
        'name': 'ip-restriction',
        'config': {
            'blacklist': ['192.168.1.0/24', '2001:db00::0/24'],
            'whitelist': [],
        },
    }
    created = self.client.post(self.key_auth_url, plugin, format='json')
    self.assertEqual(created.status_code, status.HTTP_200_OK)

    self.client.credentials(
        HTTP_HOST='httpbin.org',
        HTTP_X_FORWARDED_FOR='192.168.1.10',
    )
    proxied = self.client.get('/get')
    self.assertEqual(proxied.status_code, status.HTTP_403_FORBIDDEN)
def test_details_api_unauthenticated_403(self):
    """Ensure the details of an api object are refused after logging out.

    The api is created while logged in (201); after ``logout`` a GET on its
    detail URL must answer 403.
    """
    api_name = 'example-api'
    payload = {
        'name': api_name,
        'hosts': ['example.com'],
        'upstream_url': 'https://httpbin.org',
    }

    # Log as superuser and create an api and then logout
    self.client.login(username='john', password='john123john')
    created = self.client.post(self.url, payload, format='json')
    self.assertEqual(created.status_code, status.HTTP_201_CREATED)
    self.client.logout()

    detail_url = '{}{}/'.format(self.url, api_name)
    fetched = self.client.get(detail_url)
    self.assertEqual(fetched.status_code, status.HTTP_403_FORBIDDEN)
def test_delete_view(self):
    """Check DELETE on the detail URL of a freshly created object.

    Does nothing meaningful for read-only APIs.  When
    ``delete_requires_login`` is set, the first DELETE must answer 403 or
    401 and is retried after ``login()``; the final DELETE must answer 204.
    """
    if self.api_is_read_only:
        self.assertTrue(True)
        return

    target = self.model_factory_class(**self.factory_delete_kwargs)
    target.save()
    detail_url = self.get_detail_url(target)

    response = self.client.delete(detail_url, format='json')
    if self.delete_requires_login:
        rejected = [status.HTTP_403_FORBIDDEN, status.HTTP_401_UNAUTHORIZED]
        self.assertIn(response.status_code, rejected)
        self.login()
        response = self.client.delete(detail_url, format='json')

    self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT)
def test_delete_subtitles_authentication(mock_moira_client, logged_in_apiclient, mocker):
    """
    Tests that only authenticated users with collection admin permissions can delete VideoSubtitles

    Anonymous and non-admin users must get 403; a user whose moira lists
    include the collection's admin list must get 204.
    """
    mocker.patch('ui.views.VideoSubtitle.delete_from_s3')
    api_client, _ = logged_in_apiclient
    api_client.logout()

    admin_list = factories.MoiraListFactory()
    collection = CollectionFactory(admin_lists=[admin_list])
    subtitle = VideoSubtitleFactory(video=VideoFactory(collection=collection))
    detail_url = reverse('models-api:subtitle-detail', kwargs={'id': subtitle.id})
    user_lists = mock_moira_client.return_value.user_lists

    # call with anonymous user
    assert api_client.delete(detail_url).status_code == status.HTTP_403_FORBIDDEN

    # call with another user not on admin list
    api_client.force_login(UserFactory())
    user_lists.return_value = []
    assert api_client.delete(detail_url).status_code == status.HTTP_403_FORBIDDEN

    # call with user on admin list
    user_lists.return_value = [admin_list.name]
    assert api_client.delete(detail_url).status_code == status.HTTP_204_NO_CONTENT
def post(self, request, format=None):
    """Create a one-time-password token for the submitted phone number.

    Invalid input answers 406 with the serializer errors.  When
    ``PhoneToken.create_otp_for_number`` returns nothing, 403 is answered
    with a message quoting the ``PHONE_LOGIN_ATTEMPTS`` setting (default 10).
    Otherwise the serialized token is returned.
    """
    context = {'request': request}

    ser = self.serializer_class(data=request.data, context=context)
    if not ser.is_valid():
        return Response(
            {'reason': ser.errors}, status=status.HTTP_406_NOT_ACCEPTABLE)

    token = PhoneToken.create_otp_for_number(request.data.get('phone_number'))
    if not token:
        max_attempts = getattr(settings, 'PHONE_LOGIN_ATTEMPTS', 10)
        return Response({
            'reason': "you can not have more than {n} attempts per day, please try again tomorrow".format(
                n=max_attempts)}, status=status.HTTP_403_FORBIDDEN)

    phone_token = self.serializer_class(token, context=context)
    return Response(phone_token.data)