我们从Python开源项目中,提取了以下47个代码示例,用于说明如何使用rest_framework.status.HTTP_201_CREATED。
def keyword_create(request): """ Creates a new keyword --- serializer: categories.serializers.KeywordSerializer responseMessages: - code: 401 message: Unauthorized. Authentication credentials were not provided. Invalid token. - code: 403 message: Forbidden. - code: 404 message: Not found """ if request.method == 'POST': serializer = KeywordSerializer(data=request.data) if serializer.is_valid(): serializer.save() return Response(serializer.data, status=status.HTTP_201_CREATED) return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
def create(self, request, *args, **kwargs): serializer = self.get_serializer(data=request.data) serializer.is_valid(raise_exception=True) user = UserModel.objects.create_user( username=serializer.initial_data['username'], email=serializer.initial_data['email'], first_name=serializer.initial_data['first_name'], last_name=serializer.initial_data['last_name'] ) user.set_password(serializer.initial_data['password']) user.save() EmailAddress.objects.create(user=user, email=user.email, primary=True, verified=False) # send email self.active_email(request, user) headers = self.get_success_headers(serializer.data) data = serializer.data data.update({'id': user.pk}) return Response(data, status=status.HTTP_201_CREATED, headers=headers)
def test_user_create(self): """ Check that a user can be created using the POST method Required to test because the serializer overrides create. NOTE: MUST POST IN JSON TO UPDATE/CREATE PROFILE """ endpoint = reverse("api:user-list") response = self.client.post(endpoint, data=fixtures['api_user'], format='json') self.assertEqual(response.status_code, status.HTTP_201_CREATED) # Check that a profile and user exist in database user = User.objects.filter(username=fixtures['api_user']['username']) profile = Profile.objects.filter(user=user) self.assertEquals(len(user), 1) self.assertEquals(len(profile), 1) self.assertEqual(profile[0].organization, 'SETI')
def group_list(request,format=None): """ List all order, or create a server assets order. """ if not request.user.has_perm('Opsmanage.read_group'): return Response(status=status.HTTP_403_FORBIDDEN) if request.method == 'GET': snippets = Group.objects.all() serializer = GroupSerializer(snippets, many=True) return Response(serializer.data) elif request.method == 'POST': if not request.user.has_perm('Opsmanage.change_group'): return Response(status=status.HTTP_403_FORBIDDEN) serializer = GroupSerializer(data=request.data) if serializer.is_valid(): serializer.save() recordAssets.delay(user=str(request.user),content="??????{group_name}".format(group_name=request.data.get("name")),type="group",id=serializer.data.get('id')) return Response(serializer.data, status=status.HTTP_201_CREATED) return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
def zone_list(request,format=None): """ List all order, or create a server assets order. """ if request.method == 'GET': snippets = Zone_Assets.objects.all() serializer = ZoneSerializer(snippets, many=True) return Response(serializer.data) elif request.method == 'POST': serializer = ZoneSerializer(data=request.data) if serializer.is_valid(): serializer.save() recordAssets.delay(user=str(request.user),content="???????{zone_name}".format(zone_name=request.data.get("zone_name")),type="zone",id=serializer.data.get('id')) return Response(serializer.data, status=status.HTTP_201_CREATED) return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
def asset_server_list(request,format=None): """ List all order, or create a server assets order. """ if request.method == 'GET': snippets = Server_Assets.objects.all() serializer = ServerSerializer(snippets, many=True) return Response(serializer.data) elif request.method == 'POST': if(request.data.get('data')): data = request.data.get('data') else: data = request.data serializer = ServerSerializer(data=data) if serializer.is_valid(): serializer.save() recordAssets.delay(user=str(request.user),content="????????{ip}".format(ip=data.get("ip")),type="server",id=serializer.data.get('id')) return Response(serializer.data, status=status.HTTP_201_CREATED) return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
def asset_net_list(request,format=None): """ List all order, or create a new net assets. """ if request.method == 'GET': snippets = Network_Assets.objects.all() serializer = NetworkSerializer(snippets, many=True) return Response(serializer.data) elif request.method == 'POST': if(request.data.get('data')): data = request.data.get('data') else: data = request.data serializer = NetworkSerializer(data=data) if serializer.is_valid(): serializer.save() recordAssets.delay(user=str(request.user),content="?????????{ip}".format(ip=data.get("ip")),type="net",id=serializer.data.get('id')) return Response(serializer.data, status=status.HTTP_201_CREATED) return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
def deploy_list(request,format=None): """ List all order, or create a server assets order. """ if request.method == 'GET': snippets = Project_Config.objects.all() serializer = ProjectConfigSerializer(snippets, many=True) return Response(serializer.data) # elif request.method == 'POST': # serializer = ProjectConfigSerializer(data=request.data) # # if serializer.is_valid(): # serializer.save() # return Response(serializer.data, status=status.HTTP_201_CREATED) # return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
def post(self, request, *args, **kwargs): if 'voice' in request.FILES: request.data['file'] = request.FILES['voice'] if 'title' in request.data: request.data['title'] = request.data['title'].replace('"', '') if 'duration' in request.data: request.data['duration'] = request.data['duration'].replace('"', '') if 'filename' in request.data: request.data['filename'] = request.data['filename'].replace('"', '') if 'file' in request.data: request.data['is_uploaded'] = True serializer = self.get_serializer(data=request.data) serializer.is_valid(raise_exception=True) self.perform_create(serializer) headers = self.get_success_headers(serializer.data) payload = {"filename": str(request.data.get('file')), "record_id": serializer.data.get('id')} try: requests.post('http://192.168.43.180:8000/api/records/converter', data=payload, timeout=1) except: pass return Response(serializer.data, status=status.HTTP_201_CREATED, headers=headers)
def post(self, request): """ Adding a new List. """ serializer = ListSerializer(data=request.data) if not serializer.is_valid(): return Response(serializer.errors, status= status.HTTP_400_BAD_REQUEST) data = serializer.data author = request.user l = List( author=author, name=data['name'], priority=data['priority'], active=True) l.save() resp = { 'uuid':l.uuid } resp.update(request.data) return Response(resp, status=status.HTTP_201_CREATED)
def reverse(self, *args, **kwargs) -> Response: session = self.request.user.get_current_session() if not session: # noqa raise RuntimeError('This should never happen because the auth layer should handle this.') try: new_id = reverse_transaction(kwargs.get('pk'), session) Transaction.objects.get(pk=new_id).print_receipt(do_open_drawer=False) return Response({ 'success': True, 'id': new_id }, status=status.HTTP_201_CREATED) except FlowError as e: return Response({ 'success': False, 'message': e.message }, status=status.HTTP_400_BAD_REQUEST)
def create(self, request, *args, **kwargs): serializer = self.get_serializer(data=request.data) if not serializer.is_valid(): return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) game_round = self.get_round() player = Player.objects.get(game=game_round.game, user=request.user) card = serializer.validated_data.get('card') story = serializer.validated_data.get('story') try: play = Play.play_for_round(game_round, player, card, story) except GameInvalidPlay as exc: return Response({'detail': exc.msg}, status=status.HTTP_403_FORBIDDEN) play_data = PlaySerializer(play).data return Response(play_data, status=status.HTTP_201_CREATED)
def create(self, request, *args, **kwargs): serializer = self.get_serializer(data=request.data) if not serializer.is_valid(): return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) game_round = self.get_round() player = Player.objects.get(game=game_round.game, user=request.user) card = serializer.validated_data.get('card') play = Play.objects.get(game_round=game_round, player=player) try: play.vote_card(card) except GameInvalidPlay as exc: return Response({'detail': exc.msg}, status=status.HTTP_403_FORBIDDEN) game_round.refresh_from_db() if game_round.status == RoundStatus.COMPLETE: game = self.get_game() try: game.next_round() except (GameDeckExhausted, GameFinished): # treat deck exhaust as a fair finish pass play_data = PlaySerializer(play).data return Response(play_data, status=status.HTTP_201_CREATED)
def create(self, request, *args, **kwargs): serializer = self.get_serializer(data=request.data) if not serializer.is_valid(): return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) try: game = self.get_game() player = game.add_player(request.user, request.data['name']) except IntegrityError as exc: if 'user_id' in str(exc): return Response({"detail": 'You are already playing this game'}, status=status.HTTP_403_FORBIDDEN) return Response({"detail": "Username already in use"}, status=status.HTTP_403_FORBIDDEN) data = PlayerSerializer(player).data return Response(data, status=status.HTTP_201_CREATED)
def test_dictionary_api_should_return_english_word_form_korean_word(self): test_korean_word = "??" test_dictionary_url = reverse( 'api:naver:dict', kwargs={ "find_word": test_korean_word, } ) response = self.client.get( test_dictionary_url, ) self.assertEqual( response.status_code, status.HTTP_201_CREATED, ) self.assertIn( "apple", response.data.get("word_meaning"), )
def test_dictionary_api_should_return_korean_word_form_english_word(self): test_english_word = "apple" test_dictionary_url = reverse( 'api:naver:dict', kwargs={ "find_word": test_english_word, } ) response = self.client.get( test_dictionary_url, ) self.assertEqual( response.status_code, status.HTTP_201_CREATED, ) self.assertIn( "??", response.data.get("word_meaning"), )
def test_dictionary_api_should_return_nothing_when_nonexistent_korean_word(self): test_nonexistent_korean_word = "????" test_dictionary_url = reverse( "api:naver:dict", kwargs={ "find_word": test_nonexistent_korean_word, } ) response = self.client.get( test_dictionary_url, ) self.assertEqual( response.status_code, status.HTTP_201_CREATED, ) self.assertFalse( response.data.get("word_meaning"), )
def test_dictionary_api_should_return_nothing_when_nonexistent_english_word(self): test_nonexistent_english_word = "asfasdfasdf" test_dictionary_url = reverse( "api:naver:dict", kwargs={ "find_word": test_nonexistent_english_word, } ) response = self.client.get( test_dictionary_url, ) self.assertEqual( response.status_code, status.HTTP_201_CREATED, ) self.assertFalse( response.data.get("word_meaning"), )
def get(self, request, *args, **kwargs): naver_dict_url = "http://endic.naver.com/search.nhn?sLn=kr&searchOption=all&query=" find_word = kwargs.get("find_word") response = requests.get(naver_dict_url+find_word) dom = BeautifulSoup(response.content, "html.parser") searched_word_element = dom.select_one(".fnt_e30") or None word_meaning_element = dom.select_one(".fnt_k05") or None searched_word = find_word word_meaning = "" if searched_word_element and searched_word_element.select_one('strong'): searched_word = searched_word_element.select_one('strong').text.strip() word_meaning = word_meaning_element.text return Response( status=status.HTTP_201_CREATED, data={ "searched_word": searched_word, "word_meaning": word_meaning, }, )
def get(self, request, *args, **kwargs): year = kwargs.get("year") month = kwargs.get("month") day = kwargs.get("day") datetime = year+"/"+month+"/"+day diary = request.user.diary_set.get_or_none(datetime=datetime) if diary: # Diary is exist content = diary.content else: # Diary is not exist content = "" return Response( status=status.HTTP_201_CREATED, data={ "content": content, }, )
def delete(self, request, *args, **kwargs): year = kwargs.get("year") month = kwargs.get("month") day = kwargs.get("day") datetime = year + "/" + month + "/" + day diary = request.user.diary_set.get_or_none(datetime=datetime) if diary: # Diary is exist diary.delete() return Response( status=status.HTTP_201_CREATED, data={ }, )
def get(self, request, *args, **kwargs): year = kwargs.get("year") month = kwargs.get("month") monthly_words = request.user.monthly_words( year=year, month=month, ) distinct_monthly_words_count = request.user.distinct_monthly_words_count( year=year, month=month, ) return Response( status=status.HTTP_201_CREATED, data={ "words": monthly_words, "count": distinct_monthly_words_count, }, )
def test_get_changed_email_notification(self): test_user_email_notification_url = reverse( 'api:user:email_notification', ) self.user.email_notification = False self.user.save() response = self.client.get( test_user_email_notification_url, ) self.assertEqual( response.status_code, status.HTTP_201_CREATED, ) self.assertFalse( response.data.get("email_notification"), )
def test_user_set_email_notification_true(self): test_user_email_noficatoin_url = reverse('api:user:email_notification') self.user.email_notification = True self.user.save() test_data = { 'email_notification': 'off', } response = self.client.patch( test_user_email_noficatoin_url, test_data, ) self.assertEqual( response.status_code, status.HTTP_201_CREATED, ) self.assertFalse( get_user_model().objects.last().email_notification, )
def test_user_set_email_notification_false(self): test_user_email_noficatoin_url = reverse('api:user:email_notification') self.user.email_notification = False self.user.save() test_data = { 'email_notification': 'on', } response = self.client.patch( test_user_email_noficatoin_url, test_data, ) self.assertEqual( response.status_code, status.HTTP_201_CREATED, ) self.assertTrue( get_user_model().objects.last().email_notification, )
def test_delete_user(self): test_user_delete_url = reverse('api:user:delete') response = self.client.delete( test_user_delete_url, ) self.assertEqual( response.status_code, status.HTTP_201_CREATED, ) self.assertEqual( get_user_model().objects.count(), 0, )
def test_create_database(): client = APIClient() data = {'name': 'first_database'} response = client.post( reverse('pgdb:create_database_api'), data, format="json") assert response.status_code == status.HTTP_201_CREATED res = Database.objects.all() assert len(res) == 2 db = Database.objects.get(name='first_database') assert db.neo4j_http_port == 7404 assert db.influxdb_http_port == 8404 assert db.influxdb_udp_port == 8406 response = client.delete( reverse('pgdb:database_api', args=[data['name']]), format="json") assert response.status_code == status.HTTP_202_ACCEPTED
def test_activity_create_no_task(auth_client): """Should create a new activity without a task.""" data = { 'data': { 'type': 'activities', 'id': None, 'attributes': { 'date': '2017-01-01', 'comment': 'Test activity' }, 'relationships': { 'task': { 'data': None } } } } url = reverse('activity-list') response = auth_client.post(url, data) assert response.status_code == status.HTTP_201_CREATED json = response.json() assert json['data']['relationships']['task']['data'] is None
def test_attendance_create(auth_client): """Should create a new attendance and automatically set the user.""" user = auth_client.user data = { 'data': { 'type': 'attendances', 'id': None, 'attributes': { 'date': '2017-01-01', 'from-time': '08:00', 'to-time': '10:00' } } } url = reverse('attendance-list') response = auth_client.post(url, data) assert response.status_code == status.HTTP_201_CREATED json = response.json() assert ( json['data']['relationships']['user']['data']['id'] == str(user.id) )
def testCanPostReverseDomains(self): name = '0.8.0.0.0.1.c.a.2.4.6.0.c.e.e.d.4.4.0.1.a.0.1.0.8.f.4.0.1.0.a.2.ip6.arpa' httpretty.enable() httpretty.register_uri(httpretty.POST, settings.NSLORD_PDNS_API + '/zones', status=201) httpretty.register_uri(httpretty.GET, settings.NSLORD_PDNS_API + '/zones/' + name + '.', body='{"rrsets": []}', content_type="application/json") httpretty.register_uri(httpretty.GET, settings.NSLORD_PDNS_API + '/zones/' + name + './cryptokeys', body='[]', content_type="application/json") url = reverse('domain-list') data = {'name': name} response = self.client.post(url, data) self.assertEqual(response.status_code, status.HTTP_201_CREATED) self.assertEqual(len(mail.outbox), 0)
def testCantPostDomainAlreadyTakenInAPI(self): url = reverse('domain-list') data = {'name': utils.generateDomainname()} response = self.client.post(url, data) self.assertEqual(response.status_code, status.HTTP_201_CREATED) response = self.client.post(url, data) self.assertEqual(response.status_code, status.HTTP_409_CONFLICT) data = {'name': 'www.' + self.ownedDomains[0].name} response = self.client.post(url, data) self.assertEqual(response.status_code, status.HTTP_201_CREATED) data = {'name': 'www.' + self.otherDomains[0].name} response = self.client.post(url, data) self.assertEqual(response.status_code, status.HTTP_409_CONFLICT)
def testCanGetOwnRRsetsFromSubname(self): url = reverse('rrsets', args=(self.ownedDomains[1].name,)) data = {'records': ['1.2.3.4'], 'ttl': 120, 'type': 'A'} response = self.client.post(url, json.dumps(data), content_type='application/json') self.assertEqual(response.status_code, status.HTTP_201_CREATED) data = {'records': ['2.2.3.4'], 'ttl': 120, 'type': 'A', 'subname': 'test'} response = self.client.post(url, json.dumps(data), content_type='application/json') self.assertEqual(response.status_code, status.HTTP_201_CREATED) data = {'records': ['"test"'], 'ttl': 120, 'type': 'TXT', 'subname': 'test'} response = self.client.post(url, json.dumps(data), content_type='application/json') self.assertEqual(response.status_code, status.HTTP_201_CREATED) response = self.client.get(url) self.assertEqual(response.status_code, status.HTTP_200_OK) self.assertEqual(len(response.data), 3 + 1) # don't forget NS RRset response = self.client.get(url + '?subname=test') self.assertEqual(response.status_code, status.HTTP_200_OK) self.assertEqual(len(response.data), 2)
def testCanGetOwnRRsetsFromType(self): url = reverse('rrsets', args=(self.ownedDomains[1].name,)) data = {'records': ['1.2.3.4'], 'ttl': 120, 'type': 'A'} response = self.client.post(url, json.dumps(data), content_type='application/json') self.assertEqual(response.status_code, status.HTTP_201_CREATED) data = {'records': ['2.2.3.4'], 'ttl': 120, 'type': 'A', 'subname': 'test'} response = self.client.post(url, json.dumps(data), content_type='application/json') self.assertEqual(response.status_code, status.HTTP_201_CREATED) data = {'records': ['"test"'], 'ttl': 120, 'type': 'TXT', 'subname': 'test'} response = self.client.post(url, json.dumps(data), content_type='application/json') self.assertEqual(response.status_code, status.HTTP_201_CREATED) response = self.client.get(url) self.assertEqual(response.status_code, status.HTTP_200_OK) self.assertEqual(len(response.data), 3 + 1) # don't forget NS RRset response = self.client.get(url + '?type=A') self.assertEqual(response.status_code, status.HTTP_200_OK) self.assertEqual(len(response.data), 2)
def testCanPostOwnRRsets(self): url = reverse('rrsets', args=(self.ownedDomains[1].name,)) data = {'records': ['1.2.3.4'], 'ttl': 60, 'type': 'A'} response = self.client.post(url, json.dumps(data), content_type='application/json') self.assertEqual(response.status_code, status.HTTP_201_CREATED) response = self.client.get(url) self.assertEqual(response.status_code, status.HTTP_200_OK) self.assertEqual(len(response.data), 1 + 1) # don't forget NS RRset url = reverse('rrset', args=(self.ownedDomains[1].name, '', 'A',)) response = self.client.get(url) self.assertEqual(response.status_code, status.HTTP_200_OK) self.assertEqual(response.data['records'][0], '1.2.3.4') url = reverse('rrsets', args=(self.ownedDomains[1].name,)) data = {'records': ['desec.io.'], 'ttl': 900, 'type': 'PTR'} response = self.client.post(url, json.dumps(data), content_type='application/json') self.assertEqual(response.status_code, status.HTTP_201_CREATED)
def testCanGetOwnRRsetWithSubname(self): url = reverse('rrsets', args=(self.ownedDomains[1].name,)) data = {'records': ['1.2.3.4'], 'ttl': 120, 'type': 'A'} response = self.client.post(url, json.dumps(data), content_type='application/json') self.assertEqual(response.status_code, status.HTTP_201_CREATED) data = {'records': ['2.2.3.4'], 'ttl': 120, 'type': 'A', 'subname': 'test'} response = self.client.post(url, json.dumps(data), content_type='application/json') self.assertEqual(response.status_code, status.HTTP_201_CREATED) data = {'records': ['"test"'], 'ttl': 120, 'type': 'TXT', 'subname': 'test'} response = self.client.post(url, json.dumps(data), content_type='application/json') self.assertEqual(response.status_code, status.HTTP_201_CREATED) response = self.client.get(url) self.assertEqual(response.status_code, status.HTTP_200_OK) self.assertEqual(len(response.data), 3 + 1) # don't forget NS RRset url = reverse('rrset', args=(self.ownedDomains[1].name, 'test', 'A',)) response = self.client.get(url) self.assertEqual(response.status_code, status.HTTP_200_OK) self.assertEqual(response.data['records'][0], '2.2.3.4') self.assertEqual(response.data['ttl'], 120) self.assertEqual(response.data['name'], 'test.' + self.ownedDomains[1].name + '.')
def testCanGetOwnRRsetWithWildcard(self): for subname in ('*', '*.foobar'): url = reverse('rrsets', args=(self.ownedDomains[1].name,)) data = {'records': ['"barfoo"'], 'ttl': 120, 'type': 'TXT', 'subname': subname} response = self.client.post(url, json.dumps(data), content_type='application/json') self.assertEqual(response.status_code, status.HTTP_201_CREATED) response1 = self.client.get(url + '?subname=' + subname) self.assertEqual(response1.status_code, status.HTTP_200_OK) self.assertEqual(response1.data[0]['records'][0], '"barfoo"') self.assertEqual(response1.data[0]['ttl'], 120) self.assertEqual(response1.data[0]['name'], subname + '.' + self.ownedDomains[1].name + '.') url = reverse('rrset', args=(self.ownedDomains[1].name, subname, 'TXT',)) response2 = self.client.get(url) self.assertEqual(response2.data, response1.data[0])
def testCantDeleteForeignRRset(self): self.client.credentials(HTTP_AUTHORIZATION='Token ' + self.otherToken) url = reverse('rrsets', args=(self.otherDomains[0].name,)) data = {'records': ['1.2.3.4'], 'ttl': 60, 'type': 'A'} response = self.client.post(url, json.dumps(data), content_type='application/json') self.assertEqual(response.status_code, status.HTTP_201_CREATED) self.client.credentials(HTTP_AUTHORIZATION='Token ' + self.token) url = reverse('rrset', args=(self.otherDomains[0].name, '', 'A',)) # Try PATCH with empty records data = {'records': []} response = self.client.patch(url, json.dumps(data), content_type='application/json') self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT) # Try DELETE response = self.client.delete(url) self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT)
def testCantDeleteOwnRRsetWhileAccountIsLocked(self): self.owner.captcha_required = True self.owner.save() url = reverse('rrsets', args=(self.ownedDomains[1].name,)) data = {'records': ['1.2.3.4'], 'ttl': 60, 'type': 'A'} response = self.client.post(url, json.dumps(data), content_type='application/json') self.assertEqual(response.status_code, status.HTTP_201_CREATED) url = reverse('rrset', args=(self.ownedDomains[1].name, '', 'A',)) # Try PATCH with empty records data = {'records': []} response = self.client.patch(url, json.dumps(data), content_type='application/json') self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) # Try DELETE response = self.client.delete(url) self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
def setUp(self): if not hasattr(self, 'owner'): self.username = utils.generateRandomString(12) self.password = utils.generateRandomString(12) self.user = utils.createUser(self.username, self.password) self.token = utils.createToken(user=self.user) self.setCredentials(self.username, self.password) self.url = reverse('dyndns12update') self.client.credentials(HTTP_AUTHORIZATION='Token ' + self.token) self.domain = utils.generateDynDomainname() url = reverse('domain-list') data = {'name': self.domain} response = self.client.post(url, data) self.assertEqual(response.status_code, status.HTTP_201_CREATED) httpretty.enable() httpretty.register_uri(httpretty.POST, settings.NSLORD_PDNS_API + '/zones') httpretty.register_uri(httpretty.GET, settings.NSLORD_PDNS_API + '/zones/' + self.domain + '.', body='{"rrsets": []}', content_type="application/json") httpretty.register_uri(httpretty.PATCH, settings.NSLORD_PDNS_API + '/zones/' + self.domain + '.') httpretty.register_uri(httpretty.PUT, settings.NSLORD_PDNS_API + '/zones/' + self.domain + './notify')
def test_multiple_registration_no_captcha_required_different_ip(self): url = reverse('register') data = {'email': utils.generateUsername(), 'password': utils.generateRandomString(size=12)} response = self.client.post(url, data, REMOTE_ADDR="1.3.3.8") self.assertEqual(response.status_code, status.HTTP_201_CREATED) user = models.User.objects.get(email=data['email']) self.assertEqual(user.email, data['email']) self.assertEqual(user.registration_remote_ip, "1.3.3.8") self.assertEqual(user.captcha_required, False) url = reverse('register') data = {'email': utils.generateUsername(), 'password': utils.generateRandomString(size=12)} response = self.client.post(url, data, REMOTE_ADDR="1.3.3.9") self.assertEqual(response.status_code, status.HTTP_201_CREATED) user = models.User.objects.get(email=data['email']) self.assertEqual(user.email, data['email']) self.assertEqual(user.registration_remote_ip, "1.3.3.9") self.assertEqual(user.captcha_required, False)
def test_multiple_registration_no_captcha_required_same_ip_long_time(self): url = reverse('register') data = {'email': utils.generateUsername(), 'password': utils.generateRandomString(size=12)} response = self.client.post(url, data, REMOTE_ADDR="1.3.3.10") self.assertEqual(response.status_code, status.HTTP_201_CREATED) user = models.User.objects.get(email=data['email']) self.assertEqual(user.email, data['email']) self.assertEqual(user.registration_remote_ip, "1.3.3.10") self.assertEqual(user.captcha_required, False) #fake registration time user.created = timezone.now() - timedelta(hours=settings.ABUSE_BY_REMOTE_IP_PERIOD_HRS+1) user.save() url = reverse('register') data = {'email': utils.generateUsername(), 'password': utils.generateRandomString(size=12)} response = self.client.post(url, data, REMOTE_ADDR="1.3.3.10") self.assertEqual(response.status_code, status.HTTP_201_CREATED) user = models.User.objects.get(email=data['email']) self.assertEqual(user.email, data['email']) self.assertEqual(user.registration_remote_ip, "1.3.3.10") self.assertEqual(user.captcha_required, False)
def test_token_email(self): outboxlen = len(mail.outbox) url = reverse('register') data = { 'email': utils.generateRandomString() + '@test-same-email.desec.io', 'password': utils.generateRandomString(size=12), 'dyn': False, } response = self.client.post(url, data, REMOTE_ADDR=utils.generateRandomIPv4Address()) self.assertEqual(response.status_code, status.HTTP_201_CREATED) self.assertEqual(len(mail.outbox), outboxlen + 1) user = models.User.objects.get(email=data['email']) self.assertTrue(user.get_token() in mail.outbox[-1].body)
def test_scenario_post(self): # list view for POST (create new) url = reverse('scenario-list') # foo can create self.client.login(username='foo', password='secret') data = { "name": "New Scenario 1", "scene_set": [], } response = self.client.post(url, data, format='json') self.assertEqual(response.status_code, status.HTTP_201_CREATED) # bar can create self.client.login(username='bar', password='secret') data = { "name": "New Scenario 2", "scene_set": [], } response = self.client.post(url, data, format='json') self.assertEqual(response.status_code, status.HTTP_201_CREATED)
def rollback(self, request, **kwargs): """ Create a new release as a copy of the state of the compiled slug and config vars of a previous release. """ app = self.get_app() try: release = app.release_set.latest() version_to_rollback_to = release.version - 1 if request.data.get('version'): version_to_rollback_to = int(request.data['version']) new_release = release.rollback(request.user, version_to_rollback_to) response = {'version': new_release.version} return Response(response, status=status.HTTP_201_CREATED) except EnvironmentError as e: return Response({'detail': str(e)}, status=status.HTTP_400_BAD_REQUEST) except RuntimeError: new_release.delete() raise
def post(self, request, pk=None): user_voter = str(request.user) information = get_object_or_404(Information, pk=pk) try: rating = Rating.objects.get( user_voter=user_voter) rating.delete() except BaseException: rating = None rating = Rating(vote=request.data['vote'], user_voter=user_voter, information=information) try: rating.save() # logout(request) return Response({'status': 'Vote successfully done.'}, status.HTTP_201_CREATED, template_name="game/index.html") except BaseException: return Response({'status': 'The vote could not be done.'}, status.HTTP_400_BAD_REQUEST)
def decks_list(request): """ List all decks """ if request.method == 'GET': if 'name' in request.GET: decks = Deck.objects.filter(owner=request.user, name=request.GET['name']) else: decks = Deck.objects.filter(owner=request.user) serializer = DeckSerializer(decks, many=True) return Response(serializer.data) elif request.method == 'POST': serializer = DeckSerializer(data=request.data) if serializer.is_valid(): if request.user.is_anonymous: return Response(serializer.errors, status=status.HTTP_401_UNAUTHORIZED) else: serializer.save(owner=request.user) return Response(serializer.data, status=status.HTTP_201_CREATED) return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)