我们从Python开源项目中,提取了以下42个代码示例,用于说明如何使用rest_framework.status.HTTP_401_UNAUTHORIZED。
def post(self, request, *args, **kwargs): """Login a user given a username password combination Args: request (rest_framework.request.Request) """ email = request.data.get('email', None) password = request.data.get('password', None) if not all([email, password]): raise serializers.ValidationError({'error': 'email and/or password not provided'}) user = authenticate(email=email, password=password) if user is not None: login(request, user) return Response(PFBUserSerializer(user).data) else: return Response({ 'detail': 'Unable to login with provided username/password' }, status=status.HTTP_401_UNAUTHORIZED)
def post(self, request, *args, **kwargs): serializer = self.get_serializer(data=request.data) serializer.is_valid(raise_exception=True) if hasattr(request.auth, 'get') and request.auth.get(claims.SESSION_ID): try: session_token = SessionToken.objects.active().\ get(pk=request.auth.get(claims.SESSION_ID), user=request.user) except SessionToken.DoesNotExist: return Response({'detail': 'Invalid token.'}, status=status.HTTP_401_UNAUTHORIZED) else: session_token, created = SessionToken.objects.active().\ first_or_create(user=request.user, request_meta=request.META) session_token.update_attributes(request=request) session_token.save() payload = create_authorization_payload( session_token=session_token, user=request.user, **serializer.validated_data ) jwt_token = encode_jwt_token(payload=payload) return Response({'token': jwt_token})
def test_delegate_jwti_inactive_user(self): data = { 'client_id': 'gandolf', 'grant_type': 'urn:ietf:params:oauth:grant-type:jwt-bearer', 'refresh_token': self.token1.key, 'api_type': 'app', } self.user1.is_active = False self.user1.save() response = self.client.post(self.delegate_url, data=data, format='json') self.assertEqual( response.status_code, status.HTTP_401_UNAUTHORIZED, (response.status_code, response.content) )
def test_transcript_credentials_unauthorized(self): """ Tests that if user is not logged in we get Unauthorized response. """ # Logout client if previously logged in. self.client.logout() # Try to send post without being authorized / logged in. response = self.client.post( self.url, data=json.dumps({'org': 'test'}), content_type='application/json' ) response_status_code = response.status_code self.assertEqual(response_status_code, status.HTTP_401_UNAUTHORIZED)
def decks_list(request): """ List all decks """ if request.method == 'GET': if 'name' in request.GET: decks = Deck.objects.filter(owner=request.user, name=request.GET['name']) else: decks = Deck.objects.filter(owner=request.user) serializer = DeckSerializer(decks, many=True) return Response(serializer.data) elif request.method == 'POST': serializer = DeckSerializer(data=request.data) if serializer.is_valid(): if request.user.is_anonymous: return Response(serializer.errors, status=status.HTTP_401_UNAUTHORIZED) else: serializer.save(owner=request.user) return Response(serializer.data, status=status.HTTP_201_CREATED) return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
def create(self, request): username = request.data.get('username', '') password = request.data.get('password', '') facility_id = request.data.get('facility', None) user = authenticate(username=username, password=password, facility=facility_id) if user is not None and user.is_active: # Correct password, and the user is marked "active" login(request, user) # Success! return Response(self.get_session(request)) elif not password and FacilityUser.objects.filter(username=username, facility=facility_id).exists(): # Password was missing, but username is valid, prompt to give password return Response({ "message": "Please provide password for user", "missing_field": "password" }, status=status.HTTP_400_BAD_REQUEST) else: # Respond with error return Response("User credentials invalid!", status=status.HTTP_401_UNAUTHORIZED)
def respective_clients(self, request, pk=None): """ Return the pk from financial planning and pk goal manager.\ Permissions: see_own_client_data and see_financial_adviser_data """ if FinancialPlanningPermission.has_permission_to_see_chart( request.user): current_financial_adviser = FinancialAdviser.objects.filter( id=self.request.user.id).get() # fp = Financial Planning and gm = Goal Manager fp_and_gm_pks = () if current_financial_adviser.clients.filter(pk=pk).exists(): try: goal_manager_pk = current_financial_adviser.clients.filter( id=pk).get().financialplanning.goal_manager.pk fp_and_gm_pks = ( {'fp': pk, 'gm': goal_manager_pk}) except BaseException: fp_and_gm_pks = ({'fp': '', 'gm': ''}) return Response(fp_and_gm_pks) else: return Response(status=status.HTTP_401_UNAUTHORIZED)
def test_replay_attack_multiple(self): ''' Authenticate with a valid header multiple times. The following authentication attempts should be detected as replay attacks. ''' header = self.make_header() with self.http_auth(header): response = self.client.get(self.base_url) self.assertEqual(response.status_code, status.HTTP_200_OK) for _ in range(10): with self.http_auth(header): new_resp = self.client.get(self.base_url) self.assertEqual(new_resp.status_code, status.HTTP_401_UNAUTHORIZED)
def test_logout_old_token(self): """ Here we prove that a client can't use an expired token to logout even if logged in again with a different one :return: """ token1 = self.get_token().data['token'] self.with_token(token1).logout_current() token2 = self.get_token().data['token'] self.assertNotEqual(token1, token2) response2 = self.with_token(token1).logout_current() self.assertEqual(response2.status_code, status.HTTP_401_UNAUTHORIZED) # We can eventually go on and verify if we're still logged # and if we can finally log out using the second token response3 = self.with_token(token2).logout_current() self.assertEqual(response3.status_code, status.HTTP_204_NO_CONTENT)
def test_close_other_access(self): """ During this test a client obtains 2 tokens and we prove that authenticating with just one of them he's able to invalidate the other one using the 'logout_other' endpoint :return: """ # Warning, in the next 3 lines we're using 'magic numbers' response_list = self.get_n_tokens(2) token1 = response_list[0].data['token'] token2 = response_list[1].data['token'] response3 = self.with_token(token1).logout_other() self.assertEqual(response3.status_code, status.HTTP_200_OK) response4 = self.verify_token(token1) self.assertEqual(response4.status_code, status.HTTP_204_NO_CONTENT) response5 = self.verify_token(token2) self.assertEqual(response5.status_code, status.HTTP_401_UNAUTHORIZED)
def post(self, request, format=None): UserModel = get_user_model() uidb64 = request.data['uid'] token = request.data['token'] new_password = request.data['new_password'] token_generator = MunchUserTokenGenerator() try: # urlsafe_base64_decode() decodes to bytestring on Python 3 uid = force_text(urlsafe_base64_decode(uidb64)) user = MunchUser.objects.get(pk=uid) except (TypeError, ValueError, OverflowError, UserModel.DoesNotExist): user = None if token_generator.check_token(user, token): user.set_password(new_password) return Response(status=status.HTTP_200_OK) else: return Response(status=status.HTTP_401_UNAUTHORIZED)
def test_delete_view(self): if self.api_is_read_only: self.assertTrue(True) else: to_delete = self.model_factory_class(**self.factory_delete_kwargs) to_delete.save() url = self.get_detail_url(to_delete) response = self.client.delete(url, format='json') if self.delete_requires_login: self.assertIn(response.status_code, [status.HTTP_403_FORBIDDEN, status.HTTP_401_UNAUTHORIZED]) self.login() response = self.client.delete(url, format='json') self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT)
def post(self, request, format=None): data = request.data email = data.get('email', None) password = data.get('password', None) account = authenticate(email=email, password=password) # Generate token and add it to the response object if account is not None: login(request, account) return Response({ 'status': 'Successful', 'message': 'You have successfully been logged into your account.' }, status=status.HTTP_200_OK) return Response({ 'status': 'Unauthorized', 'message': 'Username/password combination invalid.' }, status=status.HTTP_401_UNAUTHORIZED)
def remove_self_from_participant_team(request, participant_team_pk): """ A user can remove himself from the participant team. """ try: participant_team = ParticipantTeam.objects.get(pk=participant_team_pk) except ParticipantTeam.DoesNotExist: response_data = {'error': 'ParticipantTeam does not exist!'} return Response(response_data, status=status.HTTP_406_NOT_ACCEPTABLE) try: participant = Participant.objects.get(user=request.user, team__pk=participant_team_pk) except: response_data = {'error': 'Sorry, you do not belong to this team!'} return Response(response_data, status=status.HTTP_401_UNAUTHORIZED) if get_list_of_challenges_for_participant_team([participant_team]).exists(): response_data = {'error': 'Sorry, you cannot delete this team since it has taken part in challenge(s)!'} return Response(response_data, status=status.HTTP_403_FORBIDDEN) else: participant.delete() return Response(status=status.HTTP_204_NO_CONTENT)
def remove_self_from_challenge_host_team(request, challenge_host_team_pk): """ A user can remove himself from the challenge host team. """ try: ChallengeHostTeam.objects.get(pk=challenge_host_team_pk) except ChallengeHostTeam.DoesNotExist: response_data = {'error': 'ChallengeHostTeam does not exist'} return Response(response_data, status=status.HTTP_406_NOT_ACCEPTABLE) try: challenge_host = ChallengeHost.objects.filter(user=request.user.id, team_name__pk=challenge_host_team_pk) challenge_host.delete() return Response(status=status.HTTP_204_NO_CONTENT) except: response_data = {'error': 'Sorry, you do not belong to this team.'} return Response(response_data, status=status.HTTP_401_UNAUTHORIZED)
def post(self, request): if request.data['user_source']!=request.data['user_dest']: if str(request.user.uuid) != request.data['user_source']: return Response("Source wallet needs to be owned by requester", status=status.HTTP_401_UNAUTHORIZED) if request.data['value'] == "": return Response(_("Value can't be empty"), status=status.HTTP_400_BAD_REQUEST) wallet_src = Wallet.objects.get(owner__uuid=request.data['user_source']) destination_wallet = Wallet.objects.get(owner__uuid=request.data['user_dest']) try: trans = wallet_src.transfer(destination_wallet, Decimal(request.data['value'])) if trans: return Response(_("Transaction created correctly!"), status=status.HTTP_201_CREATED) return Response(_("The transaction was not created correctly!"), status=status.HTTP_400_BAD_REQUEST) except InsufficientBalance: return Response(_("Not enough balance to make that transaction"), status=status.HTTP_400_BAD_REQUEST) return Response(_("Source and destination wallets can not be the same!"), status=status.HTTP_400_BAD_REQUEST)
def post(self, request, format=None): data = request.data username = data.get('username', None) password = data.get('password', None) user = authenticate(username=username, password=password) if user is not None: if user.is_active: login(request, user) serializer = UserSerializer(user, context={'request': request}) return Response(serializer.data) else: return Response({ 'status': 'Unauthorized', 'message': 'This accout is not active.' }, status=status.HTTP_401_UNAUTHORIZED) return Response({ 'status': 'Unauthorized', 'message': 'Username or password is invalid' }, status=status.HTTP_401_UNAUTHORIZED)
def test_list_users(self): """ Test list users Permission : admin only """ # GET /users/ url = reverse('user-list') # 1. No authentication response = self.client.get(url) self.assertIn(response.status_code, [status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN]) # 2. Authentication with normal user user = User.objects.get(username=NORMAL_USER_USERNAME) self.client.force_authenticate(user=user) response = self.client.get(url) self.assertIn(response.status_code, [status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN]) self.client.force_authenticate(user=None) # 3. Authentication with admin user = User.objects.get(username=ADMIN_USERNAME) self.client.force_authenticate(user=user) response = self.client.get(url) self.assertEqual(response.status_code, status.HTTP_200_OK) self.client.force_authenticate(user=None)
def test_destroy_user(self): """ Test destroy user Permission : admin only """ # DELETE /users/{pk}/ url = '/users/' + str(User.objects.get(username=SELF_USERNAME).id) + '/' # 1. No authentication response = self.client.delete(url) self.assertIn(response.status_code, [status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN]) # 2. Authentication with own self user = User.objects.get(username=SELF_USERNAME) self.client.force_authenticate(user=user) response = self.client.delete(url) self.assertIn(response.status_code, [status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN]) self.client.force_authenticate(user=None) # 3. Authentication with admin user = User.objects.get(username=ADMIN_USERNAME) self.client.force_authenticate(user=user) response = self.client.delete(url) self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT) self.client.force_authenticate(user=None)
def test_invalid_credentials(self): self.assertEqual(OAuthAccessToken.objects.count(), 0) self.assertEqual(OAuthRefreshToken.objects.count(), 0) response = self.api_client.post( path='/api/v1/tokens/', data={'grant_type': 'client_credentials'}, HTTP_AUTHORIZATION='Basic:{}'.format(base64.encodestring('bogus:bogus')),) self.assertEqual(OAuthAccessToken.objects.count(), 0) self.assertEqual(OAuthRefreshToken.objects.count(), 0) self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED) #self.assertEqual(response.data['error'], u'invalid_client') self.assertEqual(response.data['detail'], u'Invalid client credentials') #self.assertEqual(response.data['detail'], u'Invalid client credentials')
def test_missing_client_credentials(self): self.assertEqual(OAuthAccessToken.objects.count(), 1) self.assertEqual(OAuthRefreshToken.objects.count(), 1) response = self.api_client.post( path='/api/v1/tokens/', data={ 'grant_type': 'refresh_token', 'refresh_token': '6fd8d272-375a-4d8a-8d0f-43367dc8b791', }, ) self.assertEqual(OAuthAccessToken.objects.count(), 1) self.assertEqual(OAuthRefreshToken.objects.count(), 1) self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED) self.assertEqual(response.data['detail'], u'Client credentials were not found in the headers or body')
def test_user_cant_login_with_invalid_username(self): get_user_model().objects.create_user('test', email='test', password='test') client = APIClient() response = client.post(reverse('auth-login'), {'username': 'invalid', 'password': 'test'}) self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED) data = json.loads(response.content.decode('utf-8')) self.assertEqual(data, {u'detail': u'Invalid email or password. Please try logging in again.'})
def test_user_cant_login_with_invalid_password(self): get_user_model().objects.create_user('test', email='test', password='test') client = APIClient() response = client.post(reverse('auth-login'), {'username': 'test', 'password': 'invalid'}) self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED) data = json.loads(response.content.decode('utf-8')) self.assertEqual(data, {u'detail': u'Invalid email or password. Please try logging in again.'})
def test_user_can_access_authenticated_endpoint(self): client = APIClient() response = client.get(reverse('token-authenticated-view')) self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED) user = get_user_model().objects.create_user('test', email='test', password='test') token = AuthToken.objects.create(user=user) client.credentials(HTTP_AUTHORIZATION='DToken ' + token.key) response = client.get(reverse('token-authenticated-view')) self.assertEqual(response.status_code, status.HTTP_200_OK)
def test_authorization_is_enforced(self): """Test that the api has user authorization. """ my_client = APIClient() res = my_client.get( reverse('api.rasterbuckets'), kwargs={'pk': 3}, format="json") self.assertEqual(res.status_code, status.HTTP_401_UNAUTHORIZED)
def test_permission_checks(api_client, operator_api_client, operator, url_kind): url = get_url(url_kind, operator) check_method_status_codes( api_client, [url], ALL_METHODS, HTTP_401_UNAUTHORIZED) check_method_status_codes( operator_api_client, [url], ALL_METHODS, HTTP_403_FORBIDDEN, error_code='permission_denied')
def test_permission_checks(api_client, operator_api_client, parking, url_kind): url = get_url(url_kind, parking) check_method_status_codes( api_client, [url], ALL_METHODS, HTTP_401_UNAUTHORIZED) check_method_status_codes( operator_api_client, [url], ALL_METHODS, HTTP_403_FORBIDDEN, error_code='permission_denied')
def test_logout_with_invalid_token(self): """ Tries to log out with an invalid token """ token = self.login_and_obtain_token("user1", "secret1") # there should be one token self.assertEqual(MultiToken.objects.all().count(), 1) # logout with an invalid token response = self.rest_do_logout(token + "a") self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED) # there should be one token self.assertEqual(MultiToken.objects.all().count(), 1)
def test_logout_without_token(self): """ Try to logout without a token """ self.reset_client_credentials() response = self.rest_do_logout(None) self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)
def post(self, request, *args, **kwargs): # ToDo: Remove Support For Django 1.8 and 1.9 and use request.user.is_authenticated if user_is_authenticated_helper(request.user): # delete this users auth token auth_header = get_authorization_header(request) token = auth_header.split()[1].decode() tokens = MultiToken.objects.filter(key=token, user=request.user) if len(tokens) == 1: tokens.delete() return Response({'status': 'logged out'}) else: return Response({'error': 'invalid token'}, status=status.HTTP_400_BAD_REQUEST) return Response({'error': 'not logged in'}, status=status.HTTP_401_UNAUTHORIZED)
def post(self, request, *args, **kwargs): serializer = self.serializer_class(data=request.data) serializer.is_valid(raise_exception=True) # fire pre_auth signal pre_auth.send( sender=self.__class__, username=serializer.data['username'], password=serializer.data['password'] ) user = serializer.validated_data['user'] # ToDo: Remove Support For Django 1.8 and 1.9 and use user.is_authenticated if user_is_authenticated_helper(user): update_last_login(None, user) token = MultiToken.objects.create( user=user, user_agent=request.META.get('HTTP_USER_AGENT', ''), last_known_ip=request.META.get('REMOTE_ADDR', '') ) # fire post_auth signal post_auth.send(sender=self.__class__, user=user) return Response({'token': token.key}) # else: return Response({'error': 'not logged in'}, status=status.HTTP_401_UNAUTHORIZED)
def _mailgun_request( # pylint: disable=too-many-arguments cls, request_func, endpoint, params, sender_name=None, raise_for_status=True ): """ Sends a request to the Mailgun API Args: request_func (function): requests library HTTP function (get/post/etc.) endpoint (str): Mailgun endpoint (eg: 'messages', 'events') params (dict): Dict of params to add to the request as 'data' raise_for_status (bool): If true, check the status and raise for non-2xx statuses Returns: requests.Response: HTTP response """ mailgun_url = '{}/{}'.format(settings.MAILGUN_URL, endpoint) email_params = cls.default_params() email_params.update(params) # Update 'from' address if sender_name was specified if sender_name is not None: email_params['from'] = "{sender_name} <{email}>".format( sender_name=sender_name, email=email_params['from'] ) response = request_func( mailgun_url, auth=cls._basic_auth_credentials, data=email_params ) if response.status_code == status.HTTP_401_UNAUTHORIZED: message = "Mailgun API keys not properly configured." log.error(message) raise ImproperlyConfigured(message) if raise_for_status: response.raise_for_status() return response
def test_requires_auth(self): response = self.client.get(self.list_url) self.assertEqual( response.status_code, status.HTTP_401_UNAUTHORIZED, (response.status_code, response.content) ) response = self.client.get(self.detail_url) self.assertEqual( response.status_code, status.HTTP_401_UNAUTHORIZED, (response.status_code, response.content) ) response = self.client.delete(self.detail_url) self.assertEqual( response.status_code, status.HTTP_401_UNAUTHORIZED, (response.status_code, response.content) ) response = self.client.post(self.list_url) self.assertEqual( response.status_code, status.HTTP_401_UNAUTHORIZED, (response.status_code, response.content) )
def test_delegate_jwti_wrong_token(self): data = { 'client_id': 'gandolf', 'grant_type': 'urn:ietf:params:oauth:grant-type:jwt-bearer', 'refresh_token': 'nope', 'api_type': 'app', } response = self.client.post(self.delegate_url, data=data, format='json') self.assertEqual( response.status_code, status.HTTP_401_UNAUTHORIZED, (response.status_code, response.content) )
def test_post_repos_non_authenticated_fails(self): """ Check posting to repos without authentication return 401 """ platform = create_platform() data = make_repo_data_with_all_fields(platform.id) url = reverse('repos') response = self.client.post(url, data, format='json') self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)
def test_post_apps_non_authenticated_fails(self): """ Check posting apps without authentication return 401 """ platform = create_platform() data = make_app_data_with_all_fields(platform.id) url = reverse('apps') response = self.client.post(url, data, format='json') self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)
def test_post_articles_non_authenticated_fails(self): """ Check posting articles without authentication return 401 """ platform = create_platform() data = make_article_data_with_all_fields(platform.id) url = reverse('articles') response = self.client.post(url, data, format='json') self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)
def testExpectUnauthorizedOnGet(self): url = reverse('domain-list') response = self.client.get(url, format='json') self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)
def testExpectUnauthorizedOnPost(self): url = reverse('domain-list') response = self.client.post(url, format='json') self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)
def testExpectUnauthorizedOnPut(self): url = reverse('domain-detail', args=(1,)) response = self.client.put(url, format='json') self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)
def testExpectUnauthorizedOnDelete(self): url = reverse('domain-detail', args=(1,)) response = self.client.delete(url, format='json') self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)
def testExpectUnauthorizedOnGet(self): url = reverse('rrsets', args=('example.com',)) response = self.client.get(url, format='json') self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)