我们从Python开源项目中,提取了以下31个代码示例,用于说明如何使用rest_framework.status.HTTP_409_CONFLICT。
def corpus_enrichment_api(request, name=None): if request.method == 'POST': data = request.data try: corpus = Corpus.objects.get(name=name) except ObjectDoesNotExist: return HttpResponse('Could not find the specified corpus.', status=status.HTTP_404_NOT_FOUND) if corpus.database.status != 'R': return HttpResponse("The corpus's database is not currently running.", status=status.HTTP_400_BAD_REQUEST) if corpus.status == 'NI': return HttpResponse('The corpus has not been imported yet.', status=status.HTTP_400_BAD_REQUEST) if corpus.is_busy: return HttpResponse('The corpus is currently busy, please try once the current process is finished.', status=status.HTTP_409_CONFLICT) corpus.status = 'ER' corpus.save() blocking = data.get('blocking', False) if blocking: enrich_corpus_task(corpus.pk, data) else: t = enrich_corpus_task.delay(corpus.pk, data) return HttpResponse(status=status.HTTP_202_ACCEPTED)
def create(request) -> Response: """Create a user :param request: HTTP Request :return: Response """ name = request.data.get('name') mail_address = request.data.get('mail_address') if name is None: return Response(data={'msg': "No name provided"}, status=status.HTTP_400_BAD_REQUEST) user = User(name=name, mail_address=mail_address) try: user.save() except django.db.utils.IntegrityError: return Response(data={'msg': "user {} already exists".format(name)}, status=status.HTTP_409_CONFLICT) return Response(data=user.to_full_dict(), status=status.HTTP_201_CREATED)
def post(self, request, format=None): if not request.user.is_authenticated(): email = request.data['email'] password = request.data['password'] u = User.objects.get(email = email) user = authenticate(username = u.username, password = password) if user is not None: if user.is_active: login(request, user) request.session['member_id'] = email return Response({"info": "sucessfully logged in"}, status=status.HTTP_200_OK) else: return Response({"info": "user dosent exists"}, status=status.HTTP_409_CONFLICT) else: return Response({"info": "user dosent exists"}, status=status.HTTP_409_CONFLICT) else: return Response({"info": "user authenticated"}, status=status.HTTP_200_OK)
def destroy(self, request, pk=None): """ Delete the email address with the provided ID. Args: request: The request being made. pk (int): The primary key of the email address to delete. Returns: A ``204`` status code if the object was successfully deleted. If the email is the user's primary email, a ``409`` status is returned. If there is no email address with the given primary key, a ``404`` status is returned. """ instance = self.get_object() if instance.primary: return Response( {'non_field_errors': self.generic_messages['delete_primary']}, status=status.HTTP_409_CONFLICT) return super().destroy(request, pk=pk)
def test_terminate_while_saving(self): # Create upload upload = UploadFactory( filename='test_data.txt', upload_metadata=json.dumps({'filename': 'test_data.txt'}), upload_length=100, state=states.SAVING) # Prepare headers headers = { 'Tus-Resumable': tus_api_version, } # Perform request result = self.client.delete( reverse('rest_framework_tus:api:upload-detail', kwargs={'guid': upload.guid}), headers=headers) # Check result assert result.status_code == status.HTTP_409_CONFLICT # Verify existence assert get_upload_model().objects.filter(guid=upload.guid).exists()
def test_user_cant_signup_with_same_username(self): get_user_model().objects.create_user('test', email='test', password='test') client = APIClient() response = client.post(reverse('auth-signup'), {'username': 'test', 'password': 'test'}) self.assertEqual(response.status_code, status.HTTP_409_CONFLICT) data = json.loads(response.content.decode('utf-8')) self.assertEqual(data, {u'detail': u'Username already taken'})
def post(self, request, connection_id): ip = self.get_client_ip(request) if ip is None: return Response(status=http_status.HTTP_403_FORBIDDEN) connection = get_object_or_404(Connection, pk=connection_id) vote = Vote.objects.filter(ip=ip, connection=connection).first() if vote is not None: return Response(status=http_status.HTTP_409_CONFLICT) Vote.objects.create(ip=ip, connection=connection) return Response(status=http_status.HTTP_201_CREATED)
def post(self, request, *args, **kwargs): """Create a new channel""" serializer = ChannelSerializer(data=request.data, context={'request': request}) serializer.is_valid(raise_exception=True) try: serializer.save() except ChannelAlreadyExistsException: return Response(status=status.HTTP_409_CONFLICT) return Response( serializer.data, status=status.HTTP_201_CREATED, )
def test_add_channel_channel_already_exists(mock_staff_client, patched_users_api): """Channel already exists with that channel name""" response_409 = Response() response_409.status_code = statuses.HTTP_409_CONFLICT mock_staff_client.channels.create.return_value = response_409 title = "title" name = "name" public_description = "public description" channel_type = "private" input_search = Search.from_dict({"unmodified": "search"}) role = RoleFactory.create() mod = UserFactory.create() with pytest.raises(ChannelAlreadyExistsException): api.add_channel( original_search=input_search, title=title, name=name, public_description=public_description, channel_type=channel_type, program_id=role.program.id, creator_id=mod.id, ) mock_staff_client.channels.create.assert_called_once_with( title=title, name=name, public_description=public_description, channel_type=channel_type, )
def corpus_query_api(request, name=None): if request.method == 'POST': data = request.data try: corpus = Corpus.objects.get(name=name) except ObjectDoesNotExist: return HttpResponse('Could not find the specified corpus.', status=status.HTTP_404_NOT_FOUND) if corpus.database.status != 'R': return HttpResponse("The corpus's database is not currently running.", status=status.HTTP_400_BAD_REQUEST) if corpus.status == 'NI': return HttpResponse('The corpus has not been imported yet.', status=status.HTTP_400_BAD_REQUEST) if corpus.is_busy: return HttpResponse('The corpus is currently busy, please try once the current process is finished.', status=status.HTTP_409_CONFLICT) corpus.status = corpus.QUERY_RUNNING corpus.save() blocking = data.get('blocking', False) if blocking: results = query_corpus_task(corpus.pk, data) results = list(results.to_json()) return JsonResponse(data=results, status=status.HTTP_200_OK, safe=False) else: t = query_corpus_task.delay(corpus.pk, data) corpus.current_task_id = t.task_id return HttpResponse(status=status.HTTP_202_ACCEPTED)
def update(self, request, *args, **kwargs): try: return super().update(request, *args, **kwargs) except django.core.exceptions.ValidationError as e: ex = ValidationError(detail={"detail": str(e)}) ex.status_code = status.HTTP_409_CONFLICT raise ex
def update(self, request, *args, **kwargs): if request.data.get('records') == []: return self.delete(request, *args, **kwargs) try: return super().update(request, *args, **kwargs) except django.core.exceptions.ValidationError as e: ex = ValidationError(detail=e.message_dict) ex.status_code = status.HTTP_409_CONFLICT raise ex
def findDomain(self, request): def findDomainname(request): # 1. hostname parameter if 'hostname' in request.query_params and request.query_params['hostname'] != 'YES': return request.query_params['hostname'] # 2. host_id parameter if 'host_id' in request.query_params: return request.query_params['host_id'] # 3. http basic auth username try: domainname = base64.b64decode(get_authorization_header(request).decode().split(' ')[1].encode()).decode().split(':')[0] if domainname: return domainname except IndexError: pass except UnicodeDecodeError: pass # 4. username parameter if 'username' in request.query_params: return request.query_params['username'] # 5. only domain associated with this user account if len(request.user.domains.all()) == 1: return request.user.domains.all()[0].name if len(request.user.domains.all()) > 1: ex = ValidationError(detail={"detail": "Request does not specify domain unambiguously.", "code": "domain-ambiguous"}) ex.status_code = status.HTTP_409_CONFLICT raise ex return None name = findDomainname(request) try: return self.request.user.domains.get(name=name) except Domain.DoesNotExist: return None
def testCantChangeDomainName(self): url = reverse('domain-detail', args=(self.ownedDomains[1].pk,)) response = self.client.get(url) newname = utils.generateDomainname() response.data['name'] = newname response = self.client.put(url, json.dumps(response.data), content_type='application/json') self.assertEqual(response.status_code, status.HTTP_409_CONFLICT) response = self.client.get(url) self.assertEqual(response.status_code, status.HTTP_200_OK) self.assertEqual(response.data['name'], self.ownedDomains[1].name)
def testCantPostDomainAlreadyTakenInPdns(self): name = utils.generateDomainname() httpretty.enable() httpretty.register_uri(httpretty.POST, settings.NSLORD_PDNS_API + '/zones', body='{"error": "Domain \'' + name + '.\' already exists"}', status=422) url = reverse('domain-list') data = {'name': name} response = self.client.post(url, data) self.assertEqual(response.status_code, status.HTTP_409_CONFLICT)
def testCantPostNonDynDomains(self): url = reverse('domain-list') data = {'name': utils.generateDomainname()} response = self.client.post(url, data) self.assertEqual(response.status_code, status.HTTP_409_CONFLICT) self.assertEqual(response.data['code'], 'domain-illformed') data = {'name': 'very.long.domain.' + utils.generateDynDomainname()} response = self.client.post(url, data) self.assertEqual(response.status_code, status.HTTP_409_CONFLICT) self.assertEqual(response.data['code'], 'domain-illformed')
def testCantChangeEssentialProperties(self): url = reverse('rrsets', args=(self.ownedDomains[1].name,)) data = {'records': ['1.2.3.4'], 'ttl': 60, 'type': 'A', 'subname': 'test1'} response = self.client.post(url, json.dumps(data), content_type='application/json') self.assertEqual(response.status_code, status.HTTP_201_CREATED) # Changing the type is expected to cause an error url = reverse('rrset', args=(self.ownedDomains[1].name, 'test1', 'A',)) data = {'records': ['3.2.3.4'], 'ttl': 120, 'subname': 'test2'} response = self.client.patch(url, json.dumps(data), content_type='application/json') self.assertEqual(response.status_code, status.HTTP_409_CONFLICT) # Changing the subname is expected to cause an error data = {'records': ['3.2.3.4'], 'ttl': 120, 'type': 'TXT'} response = self.client.patch(url, json.dumps(data), content_type='application/json') self.assertEqual(response.status_code, status.HTTP_409_CONFLICT) # Check that nothing changed response = self.client.get(url) self.assertEqual(response.status_code, status.HTTP_200_OK) self.assertEqual(response.data['records'][0], '1.2.3.4') self.assertEqual(response.data['ttl'], 60) self.assertEqual(response.data['name'], 'test1.' + self.ownedDomains[1].name + '.') self.assertEqual(response.data['subname'], 'test1') self.assertEqual(response.data['type'], 'A') # This is expected to work, but the fields are ignored data = {'records': ['3.2.3.4'], 'name': 'example.com.', 'domain': 'example.com'} response = self.client.patch(url, json.dumps(data), content_type='application/json') self.assertEqual(response.status_code, status.HTTP_200_OK) response = self.client.get(url) self.assertEqual(response.status_code, status.HTTP_200_OK) self.assertEqual(response.data['records'][0], '3.2.3.4') self.assertEqual(response.data['domain'], self.ownedDomains[1].name) self.assertEqual(response.data['name'], 'test1.' + self.ownedDomains[1].name + '.')
def testIdentificationByTokenWithEmptyUser(self): self.client.credentials(HTTP_AUTHORIZATION='Basic ' + base64.b64encode((':' + self.password).encode()).decode()) url = reverse('dyndns12update') response = self.client.get(url, REMOTE_ADDR='10.5.5.6') self.assertEqual(response.status_code, status.HTTP_200_OK) self.assertEqual(response.data, 'good') self.assertIP(ipv4='10.5.5.6') # Now make sure we get a conflict when the user has multiple domains. Thus, # we add a second domain for the current user. name = 'second-' + self.domain httpretty.register_uri(httpretty.GET, settings.NSLORD_PDNS_API + '/zones/' + name + '.', body='{"rrsets": []}', content_type="application/json") httpretty.register_uri(httpretty.GET, settings.NSLORD_PDNS_API + '/zones/' + name + './cryptokeys', body='[]', content_type="application/json") self.client.credentials(HTTP_AUTHORIZATION='Token ' + self.token) url = reverse('domain-list') response = self.client.post(url, {'name': name}) self.assertEqual(response.status_code, status.HTTP_201_CREATED) url = reverse('dyndns12update') response = self.client.get(url, REMOTE_ADDR='10.5.5.7') self.assertEqual(response.status_code, status.HTTP_409_CONFLICT)
def post(self, request): if not ( request.POST.get('uuid') and request.POST.get('lat') and request.POST.get('long') and request.POST.get('band') and request.POST.get('channel') and request.POST.get('power_level') ): return Response("Missing Arguments", status=status.HTTP_406_NOT_ACCEPTABLE) if not ( request.POST.get('band') in BTS.bands and request.POST.get('channel').isdigit() and request.POST.get('power_level').isdigit() ): return Response("Invalid Arguments", status=status.HTTP_400_BAD_REQUEST) pnt = GEOSGeometry(Point(float(request.POST.get('long')), float(request.POST.get('lat')))) with transaction.atomic(): tower = BTS.objects.get(uuid=request.POST.get('uuid')) nearby_towers = BTS.objects.filter( location__distance_lt=(pnt,D(km=RANGE))).filter( band=request.POST.get('band')).exclude(uuid=request.POST.get('uuid')) for t in nearby_towers: if (int(request.POST.get('channel')) == t.channel): return Response("Channel In Use", status=status.HTTP_409_CONFLICT) #no one interfered tower.channel = int(request.POST.get('channel')) tower.location = pnt tower.band = request.POST.get('band') tower.save() return Response("Success", status=status.HTTP_200_OK)
def create_account(request, format=None): serializer = AccountSerializer(data=request.data) serializer.is_valid(raise_exception=True) email = serializer.validated_data.get('email') # Make sure no account exists with this email try: account = AccountEmail.objects.get(email=email) return Response({'error': "An account with the same email already exists. Please check your inbox to get your API token."}, status=status.HTTP_409_CONFLICT) except AccountEmail.DoesNotExist: pass # Create a new account account = Account() account.save() # Create a new email account_email = AccountEmail(account=account) account_email.email = email account_email.save() # Create a new API Key api_key = ApiKey(account=account) api_key.save() is_testing = 'test' in sys.argv if not is_testing: send_welcome_email(account, account_email, api_key) return Response({}, status=status.HTTP_201_CREATED)
def get(self, request, format=None): if request.user.is_authenticated(): return Response({"info": "user authenticated"}, status=status.HTTP_200_OK) else: return Response({"info": " user is not authenticated"}, status=status.HTTP_409_CONFLICT)
def post(self, request, format=None): if not request.user.is_authenticated(): email = request.data['email'] isactive = 1 user = User.objects.create_user(username = email, email = email, is_active = isactive, password = 'qwerty') user.save() return Response({"info": "Sucessfully Registered"}, status=status.HTTP_201_CREATED) else: return Response({"info": "user authenticated"}, status=status.HTTP_409_CONFLICT)
def post(self, request, format=None): serializer = self.serializer_class(data=request.data) if serializer.is_valid(): serializer.save() return Response(serializer.data, status=status.HTTP_201_CREATED) else: return Response(serializer.errors, status=status.HTTP_409_CONFLICT)
def test_delete_primary_email(api_rf, email_factory): """ Sending a DELETE request to the view with the ID of a primary email address should fail to delete the email. """ email = email_factory(primary=True) user = email.user api_rf.user = user request = api_rf.delete(email.get_absolute_url()) response = email_detail_view(request, pk=email.pk) assert response.status_code == status.HTTP_409_CONFLICT assert user.email_addresses.get() == email
def create(self, request, journal_uid=None): queryset = self.get_queryset(use_last=False) last_in_db = queryset.last() last = request.query_params.get('last', None) last_entry = None if last is not None: last_entry = get_object_or_404(queryset, uid=last) if last_entry != last_in_db: return Response({}, status=status.HTTP_409_CONFLICT) journal_object = self.get_journal_queryset(Journal.objects).get(uid=journal_uid) many = isinstance(request.data, list) serializer = self.serializer_class(data=request.data, many=many) if serializer.is_valid(): try: with transaction.atomic(): serializer.save(journal=journal_object) except IntegrityError: content = {'code': 'integrity_error'} return Response(content, status=status.HTTP_400_BAD_REQUEST) return Response({}, status=status.HTTP_201_CREATED) return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
def test_errors_basic(self): """Test basic validation errors""" # Not saved on purpose entry = models.Entry(content=b'test') self.client.force_authenticate(user=self.user1) # Put bad/empty uid response = self.client.post(reverse(self.LIST, kwargs={'journal_uid': self.journal.uid}), self.serializer(entry).data) self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) entry.uid = "12" response = self.client.post(reverse(self.LIST, kwargs={'journal_uid': self.journal.uid}), self.serializer(entry).data) self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) # Put none/empty content entry.content = b'' response = self.client.post(reverse(self.LIST, kwargs={'journal_uid': self.journal.uid}), self.serializer(entry).data) # FIXME self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) entry.content = None response = self.client.post(reverse(self.LIST, kwargs={'journal_uid': self.journal.uid}), self.serializer(entry).data) # FIXME self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) # Put existing uid entry.uid = self.get_random_hash() entry.content = b'test' response = self.client.post(reverse(self.LIST, kwargs={'journal_uid': self.journal.uid}), self.serializer(entry).data) self.assertEqual(response.status_code, status.HTTP_201_CREATED) response = self.client.post(reverse(self.LIST, kwargs={'journal_uid': self.journal.uid}) + '?last={}'.format(models.Entry.objects.last().uid), self.serializer(entry).data) self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) # Add multiple with one existing. No update to nothing. multi = [models.Entry(uid=self.get_random_hash(), content=b'test'), entry, models.Entry(uid=self.get_random_hash(), content=b'test')] response = self.client.post(reverse(self.LIST, kwargs={'journal_uid': self.journal.uid}), json.dumps(self.serializer(multi, many=True).data), content_type='application/json') self.assertEqual(response.status_code, status.HTTP_409_CONFLICT) ## Verify we got as many we expected (none) response = self.client.get(reverse(self.LIST, kwargs={'journal_uid': self.journal.uid})) self.assertEqual(response.status_code, status.HTTP_200_OK) self.assertEqual(len(response.data), 1)
def destroy(self, request, *args, **kwargs): # Retrieve object upload = self.get_object() # When the upload is still saving, we're not able to destroy the entity if upload.state == states.SAVING: return Response(_('Unable to terminate upload while in state "{}".'.format(upload.state)), status=status.HTTP_409_CONFLICT) # Destroy object self.perform_destroy(upload) return Response(status=status.HTTP_204_NO_CONTENT)
def perform_create(self, serializer): pattern = patternDyn if self.request.user.dyn else patternNonDyn if pattern.match(serializer.validated_data['name']) is None: ex = ValidationError(detail={"detail": "This domain name is not well-formed, by policy.", "code": "domain-illformed"}) ex.status_code = status.HTTP_409_CONFLICT raise ex # Generate a list containing this and all higher-level domain names domain_name = serializer.validated_data['name'] domain_parts = domain_name.split('.') domain_list = {'.'.join(domain_parts[i:]) for i in range(1, len(domain_parts))} # Remove public suffixes and then use this list to control registration public_suffixes = {'dedyn.io'} domain_list = domain_list - public_suffixes queryset = Domain.objects.filter(Q(name=domain_name) | (Q(name__in=domain_list) & ~Q(owner=self.request.user))) if queryset.exists(): ex = ValidationError(detail={"detail": "This domain name is unavailable.", "code": "domain-unavailable"}) ex.status_code = status.HTTP_409_CONFLICT raise ex if self.request.user.limit_domains is not None and self.request.user.domains.count() >= self.request.user.limit_domains: ex = ValidationError(detail={"detail": "You reached the maximum number of domains allowed for your account.", "code": "domain-limit"}) ex.status_code = status.HTTP_403_FORBIDDEN raise ex try: obj = serializer.save(owner=self.request.user) except Exception as e: if str(e).endswith(' already exists'): ex = ValidationError(detail={"detail": "This domain name is unavailable.", "code": "domain-unavailable"}) ex.status_code = status.HTTP_409_CONFLICT raise ex else: raise e def sendDynDnsEmail(domain): content_tmpl = get_template('emails/domain-dyndns/content.txt') subject_tmpl = get_template('emails/domain-dyndns/subject.txt') from_tmpl = get_template('emails/from.txt') context = { 'domain': domain.name, 'url': 'https://update.dedyn.io/', 'username': domain.name, 'password': self.request.auth.key } email = EmailMessage(subject_tmpl.render(context), content_tmpl.render(context), from_tmpl.render(context), [self.request.user.email]) email.send() if obj.name.endswith('.dedyn.io'): sendDynDnsEmail(obj)
def post(self, request, format=None): """ Request a number and associate with a subscriber. """ if not ("bts_uuid" in request.POST or "imsi" in request.POST): return Response("", status=status.HTTP_400_BAD_REQUEST) bts_uuid = str(request.POST['bts_uuid']) imsi = str(request.POST['imsi']) if not re.match('^IMSI\d{14,15}$', imsi): return Response("Invalid IMSI", status=status.HTTP_400_BAD_REQUEST) network = get_network_from_user(request.user) try: bts = models.BTS.objects.get(uuid=bts_uuid, network=network) except models.BTS.DoesNotExist: return Response("User is not associated with that BTS.", status=status.HTTP_403_FORBIDDEN) # If the IMSI is already in use, and associated with another network, # prevent the registration of a new number. If it's associated with # this network, simply return the currently-associated number. N.B., # this effectively enforces a 1-1 mapping of subscriber to number # currently. try: subscriber = models.Subscriber.objects.get(imsi=imsi) if subscriber.network != network: return Response("IMSI already registered to another network", status=status.HTTP_409_CONFLICT) except models.Subscriber.DoesNotExist: # Create a new subscriber if one matching this IMSI didn't already # exist. subscriber = models.Subscriber(network=network, imsi=imsi, balance=0, bts=bts) subscriber.save() # If the subscriber already exists, we should return the associated # phone number and update the BTS to match what is being used. n = models.Number.objects.filter(subscriber=subscriber).first() if not n: # Otherwise, pick a random available number and associate it. with transaction.atomic(): n = models.Number.objects.filter(state="available", country_id=network.number_country).first() if not n: return Response("No number available", status=status.HTTP_404_NOT_FOUND) n.state = "inuse" n.network = network n.subscriber = subscriber n.save() n.charge() return Response({'number': n.number, 'subscriber': subscriber.imsi, 'balance': subscriber.balance}, status=status.HTTP_200_OK)
def get(self, request, bts_uuid=None, number=None, format=None): """ Associate a number to a BTS. DEPRECATED (shasan 2016jan5) -- use the POST endpoint instead """ if not (number or bts_uuid or "imsi" in request.GET): return Response("", status=status.HTTP_400_BAD_REQUEST) network = get_network_from_user(request.user) try: bts = models.BTS.objects.get(uuid=bts_uuid, network=network) except models.BTS.DoesNotExist: return Response("User is not associated with that BTS.", status=status.HTTP_403_FORBIDDEN) # If the IMSI is already in use, and associated with another BTS, # prevent the registration of a new number. However, we allow IMSIs # to register a second number on the IMSI's original BTS. imsi = request.GET['imsi'] try: subscriber = models.Subscriber.objects.get(imsi=imsi) if subscriber.network != network: return Response("IMSI already registered", status=status.HTTP_409_CONFLICT) except models.Subscriber.DoesNotExist: # Create a new subscriber if one matching this IMSI didn't already # exist. subscriber = models.Subscriber(network=network, imsi=imsi, balance=0, bts=bts) subscriber.save() with transaction.atomic(): q = models.Number.objects.filter(number__exact="%s" % number) if len(q) > 0: n = q[0] # This is tricky. Numbers that get marked 'pending' will have # the network id already set, so this check fails and we set # the number as in-use. This is an artifact of the two-step # number registration process. So don't remove the network ID # check! if n.state != "available" and n.network != bts.network: return Response("Number already in use.", status=status.HTTP_400_BAD_REQUEST) n.network = bts.network n.state = "inuse" else: # FIXME this should never happen -- all numbers should already # be in the system, unless we're associating an old BTS for the # first time (like w/ Bok) n = models.Number(number=number, state="inuse", network=bts.network) # Associate it with the subscriber and save. n.subscriber = subscriber n.save() return Response(None, status=status.HTTP_200_OK)
def test_fetch_with_last(self): """Test using the 'last' query param""" # Not saved on purpose entry = models.Entry(journal=self.journal, uid=self.get_random_hash(), content=b'1') entry.save() entry2 = models.Entry(journal=self.journal, uid=self.get_random_hash(), content=b'2') entry2.save() entry = models.Entry(journal=self.journal, uid=self.get_random_hash(), content=b'3') entry.save() self.client.force_authenticate(user=self.user1) # List response = self.client.get(reverse(self.LIST, kwargs={'journal_uid': self.journal.uid})) self.assertEqual(response.status_code, status.HTTP_200_OK) self.assertEqual(len(response.data), 3) response = self.client.get(reverse(self.LIST, kwargs={'journal_uid': self.journal.uid}) + '?last={}'.format(response.data[0]['uid'])) self.assertEqual(response.status_code, status.HTTP_200_OK) self.assertEqual(len(response.data), 2) response = self.client.get(reverse(self.LIST, kwargs={'journal_uid': self.journal.uid}) + '?last={}'.format(response.data[0]['uid'])) self.assertEqual(response.status_code, status.HTTP_200_OK) self.assertEqual(len(response.data), 1) ## Also verify it's really the one we expect to be last self.assertEqual(response.data[0]['uid'], entry.uid) response = self.client.get(reverse(self.LIST, kwargs={'journal_uid': self.journal.uid}) + '?last={}'.format(response.data[0]['uid'])) self.assertEqual(response.status_code, status.HTTP_200_OK) self.assertEqual(len(response.data), 0) # Non-existent last response = self.client.get(reverse(self.LIST, kwargs={'journal_uid': self.journal.uid}) + '?last={}'.format(self.get_random_hash())) self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND) # Add ## With correct last entry = models.Entry(journal=self.journal, uid=self.get_random_hash(), content=b'3') response = self.client.post(reverse(self.LIST, kwargs={'journal_uid': self.journal.uid}) + '?last={}'.format(models.Entry.objects.last().uid), self.serializer(entry).data) self.assertEqual(response.status_code, status.HTTP_201_CREATED) ## With incorrect last entry = models.Entry(journal=self.journal, uid=self.get_random_hash(), content=b'3') response = self.client.post(reverse(self.LIST, kwargs={'journal_uid': self.journal.uid}) + '?last={}'.format(entry2.uid), self.serializer(entry).data) self.assertEqual(response.status_code, status.HTTP_409_CONFLICT) ## With non-existing last response = self.client.post(reverse(self.LIST, kwargs={'journal_uid': self.journal.uid}) + '?last={}'.format(self.get_random_hash()), self.serializer(entry).data) self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND) ## Missing a last response = self.client.post(reverse(self.LIST, kwargs={'journal_uid': self.journal.uid}), self.serializer(entry).data) self.assertEqual(response.status_code, status.HTTP_409_CONFLICT)