我们从Python开源项目中,提取了以下47个代码示例,用于说明如何使用rest_framework.status.HTTP_202_ACCEPTED。
def employee_activate(request, employee_id, action):
    """
    Activate employee account, action could be true or false
    ---
    response_serializer: employees.serializers.EmployeeSerializer
    responseMessages:
    - code: 401
      message: Unauthorized. Authentication credentials were not provided. Invalid token.
    - code: 403
      message: Forbidden.
    - code: 404
      message: Not found
    """
    if request.method != 'PATCH':
        # Only PATCH is handled here; any other method yields None.
        return None

    employee = get_object_or_404(Employee, pk=employee_id)
    # Any action other than 'true' / 'false' leaves the flag untouched,
    # but the employee is still saved and returned.
    new_state = {'true': True, 'false': False}.get(action)
    if new_state is not None:
        employee.is_active = new_state
    employee.save()
    payload = EmployeeSerializer(employee).data
    return Response(payload, status=status.HTTP_202_ACCEPTED)
def employee_block(request, employee_id, action):
    """
    Block employee account, action could be true or false
    ---
    response_serializer: employees.serializers.EmployeeSerializer
    responseMessages:
    - code: 401
      message: Unauthorized. Authentication credentials were not provided. Invalid token.
    - code: 403
      message: Forbidden.
    - code: 404
      message: Not found
    """
    if request.method != 'PATCH':
        # Only PATCH is handled here; any other method yields None.
        return None

    employee = get_object_or_404(Employee, pk=employee_id)
    # Unrecognised actions do not touch the flag; the record is saved anyway.
    if action in ('true', 'false'):
        employee.is_blocked = (action == 'true')
    employee.save()
    serialized = EmployeeSerializer(employee)
    return Response(serialized.data, status=status.HTTP_202_ACCEPTED)
def employee_logout(request):
    """
    Logout employee
    ---
    responseMessages:
    - code: 401
      message: Unauthorized. Authentication credentials were not provided. Invalid token.
    - code: 403
      message: Forbidden.
    - code: 404
      message: Not found
    - code: 500
      message: Internal Server Error
    """
    import logging

    if request.method == 'GET':
        employee = request.user
        try:
            # Remove every device registered for this user before logging out.
            for device in EmployeeDevice.objects.filter(username=employee):
                device.delete()
        except Exception:
            # Device cleanup is best-effort: a failure must not block the
            # logout itself, but it is recorded instead of silently dropped.
            # Catching Exception (not a bare except) lets SystemExit and
            # KeyboardInterrupt propagate.
            logging.getLogger(__name__).exception(
                'Could not delete registered devices during logout')
        logout(request)
        content = {'detail': config.USER_LOGOUT}
        return Response(content, status=status.HTTP_202_ACCEPTED)
def employee_admin(request, employee_id, action):
    """
    Set or unset admin permission to employee, action could be true or false
    ---
    response_serializer: employees.serializers.EmployeeSerializer
    responseMessages:
    - code: 401
      message: Unauthorized. Authentication credentials were not provided. Invalid token.
    - code: 403
      message: Forbidden.
    - code: 404
      message: Not found
    """
    if request.method != 'PATCH':
        # Only PATCH is handled here; any other method yields None.
        return None

    target = get_object_or_404(Employee, pk=employee_id)
    grant = action == 'true'
    revoke = action == 'false'
    # Neither 'true' nor 'false': staff flag stays as it is.
    if grant or revoke:
        target.is_staff = grant
    target.save()
    return Response(EmployeeSerializer(target).data,
                    status=status.HTTP_202_ACCEPTED)
def corpus_api(request, name=None):
    """List all corpora (no name) or fetch / delete a single corpus by name.

    Without ``name``, GET returns a JSON object mapping each corpus name to
    its status display string. With ``name``, GET returns that corpus'
    status display under ``'data'`` and DELETE removes the corpus (202).
    An unknown name yields a 404. Unhandled methods return None.
    """
    method = request.method

    if name is None:
        if method == 'GET':
            statuses = {c.name: c.get_status_display()
                        for c in Corpus.objects.all()}
            return JsonResponse(statuses, status=status.HTTP_200_OK)
        return None

    try:
        corpus = Corpus.objects.get(name=name)
    except ObjectDoesNotExist:
        return HttpResponse('Could not find the specified corpus.',
                            status=status.HTTP_404_NOT_FOUND)

    if method == 'DELETE':
        corpus.delete()
        return HttpResponse(status=status.HTTP_202_ACCEPTED)
    if method == 'GET':
        return JsonResponse({'data': corpus.get_status_display()},
                            status=status.HTTP_200_OK)
    return None
def corpus_enrichment_api(request, name=None):
    """Start enrichment of the named corpus on POST.

    Responds 404 if the corpus is unknown, 400 if its database status is not
    'R' or the corpus status is 'NI', and 409 if the corpus is busy.
    Otherwise the corpus status is set to 'ER' and the enrichment task is
    run inline when the payload's ``blocking`` value is truthy, or through
    ``.delay`` when it is not; the response is 202 in both cases.
    """
    if request.method != 'POST':
        return None

    payload = request.data
    try:
        corpus = Corpus.objects.get(name=name)
    except ObjectDoesNotExist:
        return HttpResponse('Could not find the specified corpus.',
                            status=status.HTTP_404_NOT_FOUND)

    # Preconditions, checked in the same order as before.
    if corpus.database.status != 'R':
        return HttpResponse("The corpus's database is not currently running.",
                            status=status.HTTP_400_BAD_REQUEST)
    if corpus.status == 'NI':
        return HttpResponse('The corpus has not been imported yet.',
                            status=status.HTTP_400_BAD_REQUEST)
    if corpus.is_busy:
        return HttpResponse('The corpus is currently busy, please try once the current process is finished.',
                            status=status.HTTP_409_CONFLICT)

    corpus.status = 'ER'
    corpus.save()

    if payload.get('blocking', False):
        enrich_corpus_task(corpus.pk, payload)
    else:
        enrich_corpus_task.delay(corpus.pk, payload)
    return HttpResponse(status=status.HTTP_202_ACCEPTED)
def email_share(self, user, email, type, id):
    """Build the e-mail payload for sharing an object externally.

    Looks up the object of the given ``type`` and ``id``; answers 404 when it
    does not exist. No e-mail is actually sent yet: the 202 response simply
    echoes the composed e-mail body.
    """
    # Formatter that renders the shared object into the e-mail body text
    formatter = self.get_email_formatter(type)

    model = apps.get_model(self.AVAILABLE_TYPES[type])
    try:
        instance = model.objects.get(id=id)
    except ObjectDoesNotExist:
        # Nothing to share
        return Response({"message": f"Object with id {id} does not exist"},
                        status=status.HTTP_404_NOT_FOUND)

    email_body = {
        "to": email,
        "subject": f"[TalentMAP] Shared {type}",
        "body": formatter(instance),
    }

    # TODO: Implement actual e-mail sending here when available e-mail servers are clarified

    response_payload = {
        "message": f"Position shared externally via email at {email}",
        "email_body": email_body,
    }
    return Response(response_payload, status=status.HTTP_202_ACCEPTED)
def internal_share(self, user, email, type, id):
    """Share an object with another user identified by e-mail address.

    Answers 404 when either the object or the receiving user cannot be
    found; otherwise creates a ``Sharable`` record linking the sharing
    user's profile, the receiving user and the object, and answers 202.
    """
    model_label = self.AVAILABLE_TYPES[type]

    # The object itself is not needed, only proof that it exists
    try:
        apps.get_model(model_label).objects.get(id=id)
    except ObjectDoesNotExist:
        return Response({"message": f"Object with id {id} does not exist"},
                        status=status.HTTP_404_NOT_FOUND)

    # Resolve the recipient from the e-mail address
    try:
        recipient = UserProfile.objects.get(user__email=email)
    except ObjectDoesNotExist:
        return Response({"message": f"User with email {email} does not exist"},
                        status=status.HTTP_404_NOT_FOUND)

    # Record the share between the two users for this object and model
    Sharable.objects.create(
        sharing_user=user.profile,
        receiving_user=recipient,
        sharable_id=id,
        sharable_model=self.AVAILABLE_TYPES[type],
    )

    return Response({"message": f"Position shared internally to user with email {email}"},
                    status=status.HTTP_202_ACCEPTED)
def card_ratings(request, deck_id, card_id):
    """
    Card ratings (state)

    GET returns the serialized rating of the requesting user's card.
    POST validates the submitted data and stores the new rating, answering
    202 on success or 400 with the serializer errors on invalid input.
    The card must belong to ``deck_id`` and to ``request.user`` (404
    otherwise).
    """
    if request.method == 'GET':
        card = get_object_or_404(Flashcard, pk=card_id, deck__id=deck_id,
                                 owner=request.user)
        serializer = RatingSeriallizer(card)
        return Response(serializer.data)

    elif request.method == 'POST':
        card = get_object_or_404(Flashcard, pk=card_id, deck__id=deck_id,
                                 owner=request.user)
        serializer = RatingSeriallizer(card, data=request.data)
        if serializer.is_valid():
            serializer.save(rating=request.data['rating'])
            return Response(serializer.data, status=status.HTTP_202_ACCEPTED)
        # rest_framework.status has no HTTP_401_BAD_REQUEST constant (401 is
        # "Unauthorized"); referencing it raised AttributeError. Validation
        # failures are reported as 400 Bad Request.
        return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
def _labels_delete(label, instance):
    """Remove ``label`` from ``instance.tags`` and report the outcome.

    :param label: the label to delete.
    :param instance: object to delete the label from.
    :returns: a two-item list ``[http_status, tag_names]``; the status is
        202 when the tag count did not change (nothing was removed) and
        200 otherwise.
    """
    tags_before = instance.tags.count()
    instance.tags.remove(label)

    if isinstance(instance, XForm):
        xform_tags_delete.send(sender=XForm, xform=instance, tag=label)

    if tags_before == instance.tags.count():
        # Accepted: the label was not present, so nothing was removed
        http_status = status.HTTP_202_ACCEPTED
    else:
        http_status = status.HTTP_200_OK

    remaining = list(instance.tags.names())
    return [http_status, remaining]
def post(self, request):
    """Store a raw inbound phone event and answer ringing voice calls.

    The request payload is persisted as a ``PhoneReceivedRaw`` row (with its
    'Body' passed through ``clean_text`` when present). An inbound call in
    the 'ringing' state gets an XML response taken from settings, or a
    built-in default; everything else gets an empty 202.
    """
    data = request.data

    # Clean emoji for now
    has_body = isinstance(data, dict) and data.get('Body')
    if has_body:
        if isinstance(data, QueryDict):
            # Work on a plain dict copy so the body can be replaced
            data = data.dict()
        data['Body'] = clean_text(data['Body'])

    PhoneReceivedRaw.objects.create(data=data)

    incoming_voice_call = (data.get('Direction') == 'inbound'
                           and data.get('CallStatus') == 'ringing')
    if incoming_voice_call:
        fallback = (
            '<?xml version="1.0" encoding="UTF-8"?>'
            '<Response>'
            '<Say>Hello, thanks for calling. '
            'To leave a message wait for the tone.</Say>'
            '<Record timeout="30" />'
            '</Response>'
        )
        text = getattr(settings,
                       'UNIVERSAL_NOTIFICATIONS_TWILIO_CALL_RESPONSE_DEFAULT',
                       fallback)
        return Response(SafeString(text), content_type='text/xml')

    return Response({}, status=status.HTTP_202_ACCEPTED)
def post(self, request, *args, **kwargs):
    """
    Override to add the product to cart.

    Answers 400 when the product is a group product or is not available in
    the requested total quantity; otherwise gets or creates the cart item
    and answers 202 with its serialized form.
    """
    cart = Cart.objects.get_or_create_from_request(request)
    context = self.get_context(request, **kwargs)
    product = context.pop('product')
    quantity = int(request.data.get('quantity', 1))

    if product.is_group:
        errors = {'variant': [_("You can't add a group product to the cart.")]}
        return Response(errors, status=status.HTTP_400_BAD_REQUEST)

    # Quantity already in the cart (0 if absent) plus the requested amount
    in_cart = getattr(product.is_in_cart(cart), 'quantity', 0)
    total_quantity = in_cart + quantity
    available, diff = product.is_available(total_quantity)

    if not available:
        errors = {
            'quantity': [_('Product not available for given quantity, there is %d left.') % (quantity + diff)],
        }
        return Response(errors, status=status.HTTP_400_BAD_REQUEST)

    item, _created = CartItem.objects.get_or_create(
        cart=cart, product=product, quantity=quantity,
        product_code=product.product_code)
    if total_quantity == 0:
        serializer_class = WatchItemSerializer
    else:
        serializer_class = CartItemSerializer
    serializer = serializer_class(item, context=context)
    return Response(serializer.data, status=status.HTTP_202_ACCEPTED)
def invite_host_to_team(request, pk):
    """Add a user to the challenge host team identified by ``pk``.

    Answers 406 when the team does not exist, 400 with the serializer errors
    when the submitted data is invalid, and 202 with a confirmation message
    once the invitation has been saved.
    """
    try:
        host_team = ChallengeHostTeam.objects.get(pk=pk)
    except ChallengeHostTeam.DoesNotExist:
        return Response({'error': 'ChallengeHostTeam does not exist'},
                        status=status.HTTP_406_NOT_ACCEPTABLE)

    serializer_context = {'challenge_host_team': host_team, 'request': request}
    serializer = InviteHostToTeamSerializer(data=request.data,
                                            context=serializer_context)
    if not serializer.is_valid():
        return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

    serializer.save()
    return Response(
        {'message': 'User has been added successfully to the host team'},
        status=status.HTTP_202_ACCEPTED)
def post(self, request):
    """
    Subscribes a user to push notifications

    Stores the submitted ``registration_id`` as the authenticated user's
    ``push_notification_id``. Answers 202 on success, 406 when saving hits
    an ``IntegrityError``, and 400 when the data is invalid or the user is
    not authenticated.
    """
    serializer = SubscribeSerializer(data=request.data)
    if serializer.is_valid() and request.user.is_authenticated:
        user = request.user
        user.push_notification_id = serializer.data['registration_id']
        try:
            user.save()
            # The status must be passed by keyword: the first positional
            # argument of Response is the body, so the previous call produced
            # a 200 response whose body was the number 202.
            return Response(status=status.HTTP_202_ACCEPTED)
        except IntegrityError:
            return Response(serializer.errors,
                            status=status.HTTP_406_NOT_ACCEPTABLE)
    else:
        return Response(
            serializer.errors,
            status=status.HTTP_400_BAD_REQUEST
        )
def post(self, request, youth_visit_id, format=None):
    """Replace the notes of a youth visit with the POSTed ``note``.

    Answers 404 when the visit does not exist and 400 when ``note`` is
    missing or empty; in both cases the explanation is set on the
    response's ``error`` item. On success answers 202 echoing the visit id
    and the note.
    """
    try:
        visit = YouthVisit.objects.get(pk=youth_visit_id)
    except YouthVisit.DoesNotExist:
        not_found = Response(status=status.HTTP_404_NOT_FOUND)
        not_found['error'] = 'Youth visit pk=%s does not exist' % youth_visit_id
        return not_found

    note = request.POST.get('note')
    if not note:
        bad_request = Response(status=status.HTTP_400_BAD_REQUEST)
        bad_request['error'] = 'Missing POST param "note"'
        return bad_request

    visit.notes = note
    visit.save()

    body = {'youth_visit_id': youth_visit_id, 'note': note}
    return Response(body, status=status.HTTP_202_ACCEPTED)
def employee_image(request, employee_id):
    """
    Returns employee avatar
    ---
    response_serializer: employees.serializers.EmployeeAvatarSerializer
    parameters:
    - name: avatar
      required: true
      type: file
    responseMessages:
    - code: 401
      message: Unauthorized. Authentication credentials were not provided. Invalid token.
    - code: 403
      message: Forbidden.
    - code: 404
      message: Not found
    """
    method = request.method

    if method == 'GET':
        employee = get_object_or_404(Employee, pk=employee_id)
        return Response(EmployeeAvatarSerializer(employee).data,
                        status=status.HTTP_200_OK)

    if method == 'POST':
        employee = get_object_or_404(Employee, pk=employee_id)
        # NOTE(review): the upload is read from the 'image' field although
        # the docstring names the parameter 'avatar' - confirm which is right.
        new_avatar = request.FILES['image']
        # Drop the previous file before attaching the new one
        employee.avatar.delete()
        employee.avatar = new_avatar
        employee.save()
        return Response(EmployeeSerializer(employee).data,
                        status=status.HTTP_202_ACCEPTED)
def employee_register_device(request, employee_id):
    """
    Register employee device
    ---
    response_serializer: employees.serializers.EmployeeDeviceSerializer
    parameters:
    - name: android_device
      type: string
    - name: ios_device
      type: string
    responseMessages:
    - code: 401
      message: Unauthorized. Authentication credentials were not provided. Invalid token.
    - code: 403
      message: Forbidden.
    - code: 404
      message: Not found
    - code: 500
      message: Internal Server Error
    """
    if request.method != 'POST':
        # Only POST is handled here; any other method yields None.
        return None

    employee = get_object_or_404(Employee, pk=employee_id)
    device, _created = EmployeeDevice.objects.get_or_create(username=employee)

    # Only the device fields present in the payload are overwritten
    for field in ('android_device', 'ios_device'):
        if field in request.data:
            setattr(device, field, request.data[field])
    device.save()

    return Response(EmployeeDeviceSerializer(device).data,
                    status=status.HTTP_202_ACCEPTED)
def delete(self, request, badge_id, format=None):
    """
    Delete badge (inactive badge, you should edit is_active attribute to revert this change)
    ---
    serializer: administrator.serializers.BadgeSerializer
    """
    # Soft delete: the row is kept and only flagged as inactive
    target = get_object_or_404(Badge, pk=badge_id)
    target.is_active = False
    target.save()
    return Response(BadgeSerializer(target).data,
                    status=status.HTTP_202_ACCEPTED)
def delete(self, request, category_id, format=None):
    """
    Delete category (inactive category, you should edit is_active attribute to revert this change)
    ---
    serializer: administrator.serializers.CategorySerializer
    """
    # Soft delete: the row is kept and only flagged as inactive
    target = get_object_or_404(Category, pk=category_id)
    target.is_active = False
    target.save()
    return Response(CategorySerializer(target).data,
                    status=status.HTTP_202_ACCEPTED)
def delete(self, request, keyword_id, format=None):
    """
    Delete keyword (inactive keyword, you should edit is_active attribute to revert this change)
    ---
    serializer: administrator.serializers.KeywordSerializer
    """
    # Soft delete: the row is kept and only flagged as inactive
    target = get_object_or_404(Keyword, pk=keyword_id)
    target.is_active = False
    target.save()
    return Response(KeywordSerializer(target).data,
                    status=status.HTTP_202_ACCEPTED)
def delete(self, request, location_id, format=None):
    """
    Deactivate location, you should edit is_active attribute to revert this change
    ---
    serializer: administrator.serializers.LocationSerializer
    """
    # Soft delete: the row is kept and only flagged as inactive
    target = get_object_or_404(Location, pk=location_id)
    target.is_active = False
    target.save()
    return Response(LocationSerializer(target).data,
                    status=status.HTTP_202_ACCEPTED)
def delete(self, request, position_id, format=None):
    """
    Deactivate position, you should edit is_active attribute to revert this change
    ---
    serializer: administrator.serializers.PositionSerializer
    """
    # Soft delete: the row is kept and only flagged as inactive
    target = get_object_or_404(Position, pk=position_id)
    target.is_active = False
    target.save()
    return Response(PositionSerializer(target).data,
                    status=status.HTTP_202_ACCEPTED)
def delete(self, request, role_id, format=None):
    """
    Delete role, you should edit is_active to revert this change.
    ---
    serializer: administrator.serializers.RoleSerializer
    """
    # Soft delete: the row is kept and only flagged as inactive
    target = get_object_or_404(Role, pk=role_id)
    target.is_active = False
    target.save()
    return Response(RoleSerializer(target).data,
                    status=status.HTTP_202_ACCEPTED)
def update(self, request, pk=None):
    """
    Update single object via Firebase ID.

    Replaces the profile's recipes with those whose ids are listed under
    ``favourites`` in the request data and answers 202 with the serialized
    profile.
    """
    profile = UserProfile.objects.get(auth_id=pk)
    favourite_ids = request.data.get('favourites')
    profile.recipes = Recipe.objects.filter(id__in=favourite_ids)
    profile.save()
    body = profile.get_serialized()
    return Response(body, status=status.HTTP_202_ACCEPTED)
def start_database(request):
    """Start the database named in the POST payload.

    Removes the ``~/.neo4j`` directory first when it exists. Answers 404 for
    an unknown database, 202 when ``database.start()`` reports success and
    423 otherwise. Methods other than POST return None.
    """
    if request.method != 'POST':
        return None

    neo4j_dir = os.path.expanduser('~/.neo4j')
    try:
        shutil.rmtree(neo4j_dir)
    except FileNotFoundError:
        # Nothing to clean up
        pass

    requested_name = request.data.get('name')
    try:
        database = Database.objects.get(name=requested_name)
    except ObjectDoesNotExist:
        return HttpResponse('Could not find the specified database.',
                            status=status.HTTP_404_NOT_FOUND)

    if database.start():
        return HttpResponse(status=status.HTTP_202_ACCEPTED)
    return HttpResponse(status=status.HTTP_423_LOCKED)
def stop_database(request):
    """Stop the database named in the POST payload.

    Answers 404 for an unknown database, 202 when ``database.stop()``
    reports success, and 423 when stopping fails - either because
    ``stop()`` raised (the exception text becomes the response body) or
    because it returned a falsy value. Methods other than POST return None.
    """
    if request.method == 'POST':
        try:
            database = Database.objects.get(name=request.data.get('name'))
        except ObjectDoesNotExist:
            return HttpResponse('Could not find the specified database.',
                                status=status.HTTP_404_NOT_FOUND)
        try:
            success = database.stop()
        except Exception as e:
            return HttpResponse(content=str(e), status=status.HTTP_423_LOCKED)
        if success:
            return HttpResponse(status=status.HTTP_202_ACCEPTED)
        # Previously a falsy result without an exception fell off the end of
        # the view and returned None; report it as locked, matching what
        # start_database does for an unsuccessful start.
        return HttpResponse(status=status.HTTP_423_LOCKED)
def database_api(request, name):
    """Fetch the status of, or delete, the database called ``name``.

    Answers 404 when no such database exists. DELETE removes it (202); GET
    returns its status display string under ``'data'`` (200). Other methods
    return None.
    """
    try:
        db = Database.objects.get(name=name)
    except ObjectDoesNotExist:
        return HttpResponse(status=status.HTTP_404_NOT_FOUND)

    method = request.method
    if method == 'DELETE':
        db.delete()
        return HttpResponse(status=status.HTTP_202_ACCEPTED)
    if method == 'GET':
        return JsonResponse({'data': db.get_status_display()},
                            status=status.HTTP_200_OK)
    return None
def import_corpus_api(request):
    """Start importing a corpus on POST, creating the corpus row if needed.

    When no corpus with the given name exists, one is created against the
    named database, which must exist (404 otherwise) and have status 'R'
    (400 otherwise). A corpus whose status is not 'NI' is rejected with 400.
    Otherwise the status becomes 'IR' and the import task runs inline when
    ``blocking`` is truthy, or through ``.delay`` when it is not; the
    response is 202.
    """
    if request.method != 'POST':
        return None

    payload = request.data
    try:
        corpus = Corpus.objects.get(name=payload.get('name'))
    except ObjectDoesNotExist:
        # Unknown corpus: register it against the requested database
        try:
            database = Database.objects.get(name=payload.get('database_name'))
        except ObjectDoesNotExist:
            return HttpResponse('Could not find the specified database.',
                                status=status.HTTP_404_NOT_FOUND)
        if database.status != 'R':
            return HttpResponse('The specified database is not currently running.',
                                status=status.HTTP_400_BAD_REQUEST)
        source_directory = os.path.join(settings.SOURCE_DATA_DIRECTORY,
                                        payload['source_directory'])
        corpus = Corpus(name=payload['name'],
                        database=database,
                        source_directory=source_directory,
                        input_format=payload['format'])
        corpus.save()

    if corpus.status != 'NI':
        return HttpResponse('The corpus has already been imported.',
                            status=status.HTTP_400_BAD_REQUEST)

    corpus.status = 'IR'
    corpus.save()

    if payload.get('blocking', False):
        import_corpus_task(corpus.pk)
    else:
        import_corpus_task.delay(corpus.pk)
    return HttpResponse(status=status.HTTP_202_ACCEPTED)
def test_hierarchy():
    """The hierarchy endpoint returns JSON loadable into a Hierarchy."""
    from polyglotdb.structure import Hierarchy

    client = APIClient()
    db_payload = {'name': 'test_database'}

    # Set up
    started = client.post(reverse('pgdb:start_database_api'),
                          db_payload, format="json")
    assert started.status_code == status.HTTP_202_ACCEPTED

    try:
        url = reverse('pgdb:hierarchy_api', args=['test_corpus'])
        response = client.get(url, format='json')
        assert response.status_code == status.HTTP_200_OK

        hierarchy = Hierarchy()
        hierarchy.from_json(json.loads(response.content))
        assert hierarchy['phone'] == 'word'
    finally:
        # Clean up, whether or not the assertions above passed
        stopped = client.post(reverse('pgdb:stop_database_api'),
                              {'name': 'test_database'}, format="json")
        assert stopped.status_code == status.HTTP_202_ACCEPTED
def test_import(celery_app):
    """A blocking corpus import leaves the corpus with status 'I'."""
    client = APIClient()
    db_payload = {'name': 'test_database'}

    started = client.post(reverse('pgdb:start_database_api'),
                          db_payload, format="json")
    print(started.content)
    assert started.status_code == status.HTTP_202_ACCEPTED

    import_payload = {
        'name': 'test',
        'database_name': 'test_database',
        'source_directory': "acoustic",
        'format': 'M',
        'blocking': True,
    }
    imported = client.post(reverse('pgdb:import_corpus_api'),
                           import_payload, format="json")
    print(imported.content)
    assert imported.status_code == status.HTTP_202_ACCEPTED

    corpus = Corpus.objects.get(name='test')
    assert corpus.status == 'I'

    stopped = client.post(reverse('pgdb:stop_database_api'),
                          {'name': 'test_database'}, format="json")
    assert stopped.status_code == status.HTTP_202_ACCEPTED
def test_corpus_api():
    """The corpus endpoint reports status per corpus and for all corpora."""
    client = APIClient()

    started = client.post(reverse('pgdb:start_database_api'),
                          {'name': 'test_database'}, format="json")
    assert started.status_code == status.HTTP_202_ACCEPTED

    try:
        corpus = Corpus.objects.get(name='test_corpus')
        assert corpus.status == 'I'

        # Single corpus
        single = client.get(reverse('pgdb:corpus_api', args=['test_corpus']),
                            format="json")
        assert single.status_code == status.HTTP_200_OK
        assert json.loads(single.content) == {'data': 'Imported'}

        # All corpora
        listing = client.get(reverse('pgdb:corpus_api'), format="json")
        assert listing.status_code == status.HTTP_200_OK
        assert json.loads(listing.content) == {'test_corpus': 'Imported'}
    finally:
        # Clean up
        stopped = client.post(reverse('pgdb:stop_database_api'),
                              {'name': 'test_database'}, format="json")
        assert stopped.status_code == status.HTTP_202_ACCEPTED
def post(self, request, format=None):
    """POST handler.

    Sends an SMS built from the ``from``, ``to`` and ``body`` POST fields
    through the provider registered for the sending number's kind, then
    bills the operator. Answers 400 when billing is enabled and the ledger
    balance is not positive, or when the number's kind has no handler;
    answers 202 once the message has been handed to the provider.
    """
    sender = str(request.POST['from'])
    recipient = str(request.POST['to'])
    body = str(request.POST['body'])

    network = get_network_from_user(request.user)
    out_of_credit = network.billing_enabled and network.ledger.balance <= 0
    if out_of_credit:
        # Open question left by the author: should this be a 402? -kurtis
        return Response("operator has no credit in Endaga account",
                        status=status.HTTP_400_BAD_REQUEST)

    number = models.Number.objects.get(number=sender)
    if number.kind not in SendSMS.HANDLERS:
        # Open question left by the author: should this be a 404? -kurtis
        return Response("Invalid sending number",
                        status=status.HTTP_400_BAD_REQUEST)

    handler = SendSMS.HANDLERS[number.kind]
    provider_cls = handler[0]
    provider = provider_cls(
        handler[1],  # username
        handler[2],  # password
        handler[3],  # inbound_sms
        handler[4],  # outbound_sms
        handler[5],  # inbound_voice
    )

    try:
        provider.send(recipient, sender, body)
    except Exception:
        # Re-raise with routing details, but only the body length
        message = '%s to: %s, from: %s, body len: %s' % (
            provider, recipient, sender, len(body))
        raise Exception(message)

    # Bill the operator.
    cost_to_operator = network.calculate_operator_cost(
        'off_network_send', 'sms', destination_number=recipient)
    network.bill_for_sms(cost_to_operator, 'outside_sms')
    return Response("", status=status.HTTP_202_ACCEPTED)
def delete(self, request):
    """Delete the requesting user and confirm with a 202 message."""
    account = request.user
    account.delete()
    message = 'User {} was deleted'.format(account)
    return Response(message, status=HTTP_202_ACCEPTED)
def test_upload_dropbox_videos(logged_in_apiclient, mocker):
    """
    Tests for UploadVideosFromDropbox happy path
    """
    client, user = logged_in_apiclient
    collection = CollectionFactory(owner=user)
    mocked_api = mocker.patch('ui.api.process_dropbox_data',
                              return_value={'foo': 'bar'})

    dropbox_file = {
        "isDir": False,
        "link": "http://foo.bar/hoo.mp4",
        "thumbnailLink": "http://foo.bar.link/hoo.mp4",
        "bytes": 80633422,
        "name": "foo file",
        "icon": "https://foo.bar/static/images/icons64/page_white_film.png",
    }
    input_data = {"collection": collection.hexkey, "files": [dropbox_file]}

    response = client.post(reverse('upload-videos'), input_data, format='json')
    assert response.status_code == status.HTTP_202_ACCEPTED
    assert response.data == {'foo': 'bar'}

    # The view must hand the validated payload to the (patched) API
    serializer = DropboxUploadSerializer(data=input_data)
    serializer.is_valid()
    mocked_api.assert_called_once_with(serializer.validated_data)
def test_upload_subtitles(logged_in_apiclient, mocker, enable_video_permissions, settings):
    """
    Tests for UploadVideoSubtitle
    """
    mocker.patch('ui.views.cloudapi.boto3')
    settings.ENABLE_VIDEO_PERMISSIONS = enable_video_permissions
    client, user = logged_in_apiclient
    video = VideoFactory(collection=CollectionFactory(owner=user))
    yt_video = YouTubeVideoFactory(video=video)
    filename = 'subtitles.vtt'
    youtube_task = mocker.patch('ui.views.upload_youtube_caption.delay')

    input_data = {
        "collection": video.collection.hexkey,
        "video": video.hexkey,
        "language": "en",
        "filename": filename,
        "file": SimpleUploadedFile(filename, bytes(1024)),
    }
    response = client.post(reverse('upload-subtitles'), input_data,
                           format='multipart')
    assert response.status_code == status.HTTP_202_ACCEPTED

    expected_data = {
        'language': 'en',
        'filename': filename,
        's3_object_key': 'subtitles/{}/subtitle_en.vtt'.format(video.hexkey),
        'language_name': 'English',
    }
    for key, expected in expected_data.items():
        assert expected == response.data[key]

    if enable_video_permissions:
        subtitle = VideoSubtitle.objects.get(id=response.data['id'])
        assert subtitle.video.youtube_id == yt_video.id
        # NOTE(review): the source formatting was lost; this assertion is
        # assumed to belong to the permissions-enabled branch - confirm.
        youtube_task.assert_called_once_with(response.data['id'])
def test_invite_participant_to_team_with_all_data(self):
    """Posting complete data yields the success message with a 202."""
    response = self.client.post(self.url, self.data)
    expected = {'message': 'User has been successfully added to the team!'}
    self.assertEqual(response.data, expected)
    self.assertEqual(response.status_code, status.HTTP_202_ACCEPTED)
def test_invite_host_to_team_with_all_data(self):
    """Posting complete data yields the success message with a 202."""
    response = self.client.post(self.url, self.data)
    expected = {'message': 'User has been added successfully to the host team'}
    self.assertEqual(response.data, expected)
    self.assertEqual(response.status_code, status.HTTP_202_ACCEPTED)
def invite_participant_to_team(request, pk):
    """Add the user with the submitted e-mail to participant team ``pk``.

    Answers 406 when the team or the user does not exist, or when the user
    already participates in a challenge the team participates in; 400 with
    the serializer errors when the data is invalid; 202 with a confirmation
    message once the invitation has been saved.
    """
    def not_acceptable(message):
        # All rejection paths share the same shape and status code
        return Response({'error': message},
                        status=status.HTTP_406_NOT_ACCEPTABLE)

    try:
        participant_team = ParticipantTeam.objects.get(pk=pk)
    except ParticipantTeam.DoesNotExist:
        return not_acceptable('ParticipantTeam does not exist')

    email = request.data.get('email')
    try:
        user = User.objects.get(email=email)
    except User.DoesNotExist:
        return not_acceptable('User does not exist with this email address!')

    user_challenge_ids = get_list_of_challenges_participated_by_a_user(
        user).values_list("id", flat=True)
    team_challenge_ids = get_list_of_challenges_for_participant_team(
        [participant_team]).values_list("id", flat=True)

    # A user cannot take part in the same challenge through two teams, so an
    # overlap between the user's and the team's challenges blocks the invite.
    if set(user_challenge_ids) & set(team_challenge_ids):
        return not_acceptable('Sorry, cannot invite user to the team!')

    serializer = InviteParticipantToTeamSerializer(
        data=request.data,
        context={'participant_team': participant_team, 'request': request})
    if not serializer.is_valid():
        return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

    serializer.save()
    return Response({'message': 'User has been successfully added to the team!'},
                    status=status.HTTP_202_ACCEPTED)
def put_test(self, data, urlname, urlkwargs=None, **kwargs):
    """PUT ``data`` to the named URL, assert a 202 and return the response.

    Extra keyword arguments are forwarded to the test client's ``put``.
    """
    url = reverse(urlname, kwargs=urlkwargs)
    response = self.client.put(url, data, **kwargs)
    self.assertEqual(response.status_code, status.HTTP_202_ACCEPTED)
    return response
def post(self, request, *args, **kwargs):
    """Add the item at the submitted ``item_url``.

    Invalid data raises through ``is_valid(raise_exception=True)``. Answers
    202 with ``error_code`` 0 when ``add_item`` reports success, otherwise
    400 with ``error_code`` 1.
    """
    serializer = AddItemSerializer(data=request.data)
    added = (serializer.is_valid(raise_exception=True)
             and add_item(serializer.validated_data['item_url']))
    if added:
        return Response({'error_code': 0}, status=status.HTTP_202_ACCEPTED)
    return Response({'error_code': 1}, status=status.HTTP_400_BAD_REQUEST)
def post(self, request, youth_visit_id, format=None):
    """Add the POSTed ``extension`` to a youth visit's extension days.

    Answers 404 when the visit does not exist, 400 when ``extension`` is
    missing or empty, and 406 when it is not an integer; in each case the
    explanation is set on the response's ``error`` item. On success
    ``current_placement_extension_days`` is increased by the given amount
    and an empty 202 is returned.
    """
    try:
        youth_visit = YouthVisit.objects.get(pk=youth_visit_id)
    except YouthVisit.DoesNotExist:
        response = Response(status=status.HTTP_404_NOT_FOUND)
        response['error'] = 'Youth visit pk=%s does not exist' % youth_visit_id
        return response

    extension = request.POST.get('extension', None)
    if not extension:
        response = Response(status=status.HTTP_400_BAD_REQUEST)
        response['error'] = 'Missing POST param "extension"'
        return response

    try:
        extension = int(extension)
    except ValueError:
        msg = 'Non int POST param passed to add extension endpoint'
        # Logger.warn is a deprecated alias; warning() is the supported name.
        logger.warning(msg)
        response = Response(status=status.HTTP_406_NOT_ACCEPTABLE)
        response['error'] = msg
        return response

    # extension is already an int here, so no second conversion is needed
    youth_visit.current_placement_extension_days += extension
    youth_visit.save()
    return Response({}, status=status.HTTP_202_ACCEPTED)
def assertAccepted(self, response):
    """Check ``response`` against HTTP 202 via ``_assertStatus``."""
    expected_status = status.HTTP_202_ACCEPTED
    return self._assertStatus(response, expected_status)
def retrieve(self, request, *args, **kwargs):
    """Return the serialized order with a status code reflecting its state.

    202 while the order is 'queued' or 'running', 201 for a 'completed'
    order whose action is 'create', and 200 in every other case.
    """
    order = self.get_object()

    if order.status in ('queued', 'running'):
        status_code = status.HTTP_202_ACCEPTED
    elif order.status == 'completed' and order.action == 'create':
        status_code = status.HTTP_201_CREATED
    else:
        status_code = status.HTTP_200_OK

    payload = self.get_serializer(order).data
    return response.Response(payload, status=status_code)
def employee_update_password(request, employee_id):
    """
    This endpoint update employee password
    ---
    response_serializer: employees.serializers.EmployeeSerializer
    parameters:
    - name: current_password
      required: true
      paramType: string
    - name: new_password
      required: true
      paramType: string
    responseMessages:
    - code: 400
      message: Bad request.
    - code: 401
      message: Unauthorized. Authentication credentials were not provided. Invalid token.
    - code: 403
      message: Forbidden.
    - code: 404
      message: Not found
    """
    if request.method != 'POST':
        # Only POST is handled here; any other method yields None.
        return None

    try:
        current_password = request.data['current_password']
        new_password = request.data['new_password']
    except Exception as e:
        print(e)
        raise NotAcceptable(config.USER_DATA_IS_MISSING)

    employee = get_object_or_404(Employee, pk=employee_id)

    # Reject a no-op change before verifying the current password
    if current_password == new_password:
        return Response({'detail': config.PASSWORD_EQUAL},
                        status=status.HTTP_400_BAD_REQUEST)

    if not employee.check_password(current_password):
        return Response({'detail': config.WRONG_CURRENT_PASSWORD},
                        status=status.HTTP_400_BAD_REQUEST)

    employee.set_password(new_password)
    # A successful change also clears any pending password-reset state
    employee.reset_password_code = None
    employee.is_password_reset_required = False
    employee.save()
    return Response(EmployeeSerializer(employee).data,
                    status=status.HTTP_202_ACCEPTED)
def employee_set_list(request, employee_id):
    """
    Endpoint to set list of positions to employee, type can be position or role
    ---
    parameters:
    - name: body
      required: true
      paramType: body
      pytype: employees.serializers.EmployeeSetListSerializer
    responseMessages:
    - code: 400
      message: Bad request
    - code: 401
      message: Unauthorized. Authentication credentials were not provided. Invalid token.
    - code: 403
      message: Forbidden.
    - code: 404
      message: Not found
    - code: 406
      message: Request not acceptable
    """
    if request.method != 'PATCH':
        # Only PATCH is handled here; any other method yields None.
        return None

    employee = get_object_or_404(Employee, pk=employee_id)
    request_serializer = EmployeeSetListSerializer(data=request.data)
    errors = []

    if not request_serializer.is_valid():
        errors.append(request_serializer.errors)
    else:
        list_type = request.data['type']
        elements_list = request.data['set_id_list']
        if list_type == 'position':
            employee.position = elements_list
            employee.save()
        elif list_type == 'role':
            employee.role = elements_list
            employee.save()
        else:
            errors.append(config.SET_LIST_TYPE_UNKNOWN)

    if errors:
        return Response({'detail': errors},
                        status=status.HTTP_406_NOT_ACCEPTABLE)
    return Response(EmployeeSerializer(employee).data,
                    status=status.HTTP_202_ACCEPTED)
def test_start_stop_database():
    """Starting and stopping are accepted once and locked when repeated."""
    client = APIClient()
    payload = {'name': 'test_database'}
    start_url = reverse('pgdb:start_database_api')

    # First start succeeds and the database reports 'R'
    response = client.post(start_url, payload, format="json")
    print(response.content)
    assert response.status_code == status.HTTP_202_ACCEPTED
    assert Database.objects.get(name='test_database').status == 'R'

    # Starting again is refused; the status is unchanged
    response = client.post(reverse('pgdb:start_database_api'), payload,
                           format="json")
    print(response.content)
    assert response.status_code == status.HTTP_423_LOCKED
    assert Database.objects.get(name='test_database').status == 'R'

    # First stop succeeds and the database reports 'S'
    response = client.post(reverse('pgdb:stop_database_api'), payload,
                           format="json")
    print(response.content)
    assert response.status_code == status.HTTP_202_ACCEPTED
    assert Database.objects.get(name='test_database').status == 'S'

    # Stopping again is refused; the status is unchanged
    response = client.post(reverse('pgdb:stop_database_api'), payload,
                           format="json")
    assert response.status_code == status.HTTP_423_LOCKED
    assert Database.objects.get(name='test_database').status == 'S'