我们从Python开源项目中,提取了以下48个代码示例,用于说明如何使用rest_framework.status.HTTP_400_BAD_REQUEST。
def get(self, request, format=None):
    """
    List all badges
    ---
    serializer: administrator.serializers.BadgeSerializer
    parameters:
    - name: pagination
      required: false
      type: string
      paramType: query
    """
    # NOTE(review): the section under `---` looks like YAML consumed by an
    # API-docs generator; it is kept as-is -- confirm before editing it.
    badges = get_list_or_404(Badge)
    pagination_flag = request.GET.get('pagination')

    # Missing or empty flag: return the complete, unpaginated list.
    if not pagination_flag:
        return Response(BadgeSerializer(badges, many=True).data, status=status.HTTP_200_OK)

    # Any non-empty value other than the literal 'true' is rejected.
    if pagination_flag != 'true':
        return Response(status=status.HTTP_400_BAD_REQUEST)

    paginator = AdministratorPagination()
    page = paginator.paginate_queryset(badges, request)
    return paginator.get_paginated_response(BadgeSerializer(page, many=True).data)
def get(self, request, format=None):
    """
    List all employee positions
    ---
    serializer: administrator.serializers.PositionSerializer
    parameters:
    - name: pagination
      required: false
      type: string
      paramType: query
    """
    positions = get_list_or_404(Position)
    wants_pages = request.GET.get('pagination')

    if wants_pages:
        # Only the exact string 'true' enables pagination; anything else is
        # treated as a malformed request.
        if wants_pages != 'true':
            return Response(status=status.HTTP_400_BAD_REQUEST)
        paginator = AdministratorPagination()
        current_page = paginator.paginate_queryset(positions, request)
        page_serializer = PositionSerializer(current_page, many=True)
        return paginator.get_paginated_response(page_serializer.data)

    # Parameter absent or empty: serialize everything in one response.
    full_serializer = PositionSerializer(positions, many=True)
    return Response(full_serializer.data, status=status.HTTP_200_OK)
def post(self, request):  # noqa
    """Exchange a username/password pair for an auth token.

    Reads ``username``, ``password`` and optional ``device_id`` from
    ``request.data``. Responds with HTTP 400 when either credential is
    missing, raises ``InvalidEmailOrPasswordAPIException`` when
    ``authenticate`` returns nothing, and otherwise returns the key of the
    token fetched or created for the (user, device_id) pair.
    """
    payload = request.data
    username = payload.get('username')
    password = payload.get('password')
    device_id = payload.get('device_id') or ''

    if not (username and password):
        return Response(
            {'error': 'Missing username or password'},
            status=status.HTTP_400_BAD_REQUEST
        )

    # The username is lower-cased before being handed to authenticate().
    user = authenticate(username=username.lower(), password=password)
    if not user:
        raise InvalidEmailOrPasswordAPIException()

    token_and_created = AuthToken.objects.get_or_create(user=user, device_id=device_id)
    return Response({'token': token_and_created[0].key})
def put(self, request, id, format=None):
    """Update the process identified by ``id`` from the request payload.

    Args:
        request: Incoming request; ``request.data`` is fed to the serializer.
        id: Identifier passed to ``self.get_object`` to load the process.
        format (None, optional): Not used by this method.

    Returns:
        Response: The serialized process after saving, or the serializer
        errors with HTTP 400 when validation fails.
    """
    target = self.get_object(id)
    serializer = serializers.ProcessSerializer(target, data=request.data)

    if not serializer.is_valid():
        return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

    serializer.save()
    return Response(serializer.data)
def keyword_create(request):
    """
    Creates a new keyword
    ---
    serializer: categories.serializers.KeywordSerializer
    responseMessages:
    - code: 401
      message: Unauthorized. Authentication credentials were not provided. Invalid token.
    - code: 403
      message: Forbidden.
    - code: 404
      message: Not found
    """
    if request.method != 'POST':
        # Only POST is handled here; any other method yields no response
        # object from this function.
        return None

    serializer = KeywordSerializer(data=request.data)
    if not serializer.is_valid():
        return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

    serializer.save()
    return Response(serializer.data, status=status.HTTP_201_CREATED)
def get(self, request, format=None):
    """
    List all employees
    ---
    serializer: administrator.serializers.EmployeeSerializer
    parameters:
    - name: pagination
      required: false
      type: string
      paramType: query
    """
    employees = get_list_or_404(Employee)
    flag = request.GET.get('pagination')

    # No flag supplied (or empty): plain, complete listing.
    if not flag:
        return Response(EmployeeSerializer(employees, many=True).data, status=status.HTTP_200_OK)

    # A flag was supplied but it is not exactly 'true'.
    if flag != 'true':
        return Response(status=status.HTTP_400_BAD_REQUEST)

    paginator = AdministratorPagination()
    page_items = paginator.paginate_queryset(employees, request)
    return paginator.get_paginated_response(EmployeeSerializer(page_items, many=True).data)
def get(self, request, format=None):
    """
    List all events
    ---
    serializer: administrator.serializers.EventSerializer
    parameters:
    - name: pagination
      required: false
      type: string
      paramType: query
    """
    events = get_list_or_404(Event)
    requested = request.GET.get('pagination')

    if requested:
        # The query parameter is only honoured when it is literally 'true'.
        if requested != 'true':
            return Response(status=status.HTTP_400_BAD_REQUEST)
        paginator = AdministratorPagination()
        page = paginator.paginate_queryset(events, request)
        paged = EventSerializer(page, many=True)
        return paginator.get_paginated_response(paged.data)

    everything = EventSerializer(events, many=True)
    return Response(everything.data, status=status.HTTP_200_OK)
def get(self, request, event_id, format=None):
    """
    List all activities for an event
    ---
    serializer: administrator.serializers.EventActivitySerializer
    parameters:
    - name: pagination
      required: false
      type: string
      paramType: query
    """
    # A missing event ends the request via get_object_or_404; an event with
    # no activities simply produces an empty listing.
    parent_event = get_object_or_404(Event, pk=event_id)
    activities = EventActivity.objects.filter(event=parent_event)
    pagination_flag = request.GET.get('pagination')

    if not pagination_flag:
        return Response(
            EventActivitySerializer(activities, many=True).data,
            status=status.HTTP_200_OK,
        )

    if pagination_flag != 'true':
        return Response(status=status.HTTP_400_BAD_REQUEST)

    paginator = AdministratorPagination()
    page = paginator.paginate_queryset(activities, request)
    return paginator.get_paginated_response(EventActivitySerializer(page, many=True).data)
def get(self, request, format=None):
    """
    List all messages
    ---
    serializer: administrator.serializers.MessageSerializer
    parameters:
    - name: pagination
      required: false
      type: string
      paramType: query
    """
    messages = get_list_or_404(Message)
    mode = request.GET.get('pagination')

    if mode:
        if mode != 'true':
            # Non-empty but unsupported value for the pagination switch.
            return Response(status=status.HTTP_400_BAD_REQUEST)
        paginator = AdministratorPagination()
        window = paginator.paginate_queryset(messages, request)
        return paginator.get_paginated_response(MessageSerializer(window, many=True).data)

    return Response(MessageSerializer(messages, many=True).data, status=status.HTTP_200_OK)
def get(self, request, employee_id, format=None):
    """
    List all messages from employee
    ---
    serializer: administrator.serializers.MessageSerializer
    parameters:
    - name: pagination
      required: false
      type: string
      paramType: query
    """
    # Both lookups use the *_or_404 helpers: an unknown employee or an
    # employee without any sent message ends the request there.
    sender = get_object_or_404(Employee, pk=employee_id)
    sent_messages = get_list_or_404(Message, from_user=sender)
    pagination_flag = request.GET.get('pagination')

    if not pagination_flag:
        return Response(
            MessageSerializer(sent_messages, many=True).data,
            status=status.HTTP_200_OK,
        )

    if pagination_flag != 'true':
        return Response(status=status.HTTP_400_BAD_REQUEST)

    paginator = AdministratorPagination()
    page = paginator.paginate_queryset(sent_messages, request)
    return paginator.get_paginated_response(MessageSerializer(page, many=True).data)
def get(self, request, format=None):
    """
    List all roles
    ---
    serializer: administrator.serializers.RoleSerializer
    parameters:
    - name: pagination
      required: false
      type: string
      paramType: query
    """
    roles = get_list_or_404(Role)
    switch = request.GET.get('pagination')

    # Absent/empty switch -> one response holding every role.
    if not switch:
        return Response(RoleSerializer(roles, many=True).data, status=status.HTTP_200_OK)

    # Present but not 'true' -> bad request.
    if switch != 'true':
        return Response(status=status.HTTP_400_BAD_REQUEST)

    paginator = AdministratorPagination()
    subset = paginator.paginate_queryset(roles, request)
    return paginator.get_paginated_response(RoleSerializer(subset, many=True).data)
def retrieve(self, request, parent_lookup_torrent=None, *args, **kwargs):
    """
    Retrieve a file of the user.

    When ``parent_lookup_torrent`` is given, the file is looked up inside
    that torrent, and the torrent must be finished; otherwise the parent
    class' ``retrieve`` handles the request.

    :return: Response
    """
    if not parent_lookup_torrent:
        return super(FileViewSet, self).retrieve(request, *args, **kwargs)

    # Nested route: the file must belong to the given torrent.
    torrent = get_object_or_404(Torrent, pk=parent_lookup_torrent)
    if not torrent.finished:
        unfinished_payload = {
            'detail': "The torrent hasn't finished downloading yet.",
            'progress': torrent.progress
        }
        return Response(unfinished_payload, status=status.HTTP_400_BAD_REQUEST)

    wanted_file = get_object_or_404(
        File, torrent__pk=parent_lookup_torrent, pk=kwargs.get('pk'))
    return Response(self.serializer_class(wanted_file).data)
def vote(self, request, pk=None):
    """
    Record the requesting user's vote on the current answer.

    Note that the upvotes and downvotes keys are required by the front-end
    """
    answer = self.get_object()
    serializer = VotingSerializer(data=request.data, context={'request': request})

    if not serializer.is_valid():
        return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

    _, created = Vote.objects.punch_ballot(
        content=answer,
        user=request.user,
        vote=serializer.validated_data['vote'],
    )

    # Start from the serialized input and add the bookkeeping fields.
    payload = serializer.data
    payload.update({
        'status': 'vote recorded',
        'created': created,
        'upvotes': answer.votes.upvotes().count(),
        'downvotes': answer.votes.downvotes().count(),
    })
    return Response(payload)
def get_process_single(self, id):
    """Retrieve a single object and store its dict form as the result.

    Args:
        id: Raw identifier; it must be convertible with ``int()``.

    Returns:
        An HTTP status constant: 400 when ``id`` is not an integer, 404 when
        no object with that id exists, 200 after ``self.setResult`` has been
        called with ``get_as_dict()`` of the object found.
    """
    try:
        clean_id = int(id)
    except (TypeError, ValueError, OverflowError):
        # Only conversion failures mean "bad request"; a bare except would
        # also swallow KeyboardInterrupt/SystemExit.
        return status.HTTP_400_BAD_REQUEST

    try:
        instance = self.Meta.Model.objects.get(id=clean_id)
    except ObjectDoesNotExist:
        return status.HTTP_404_NOT_FOUND

    self.setResult(instance.get_as_dict())
    return status.HTTP_200_OK
def put_process_single(self, id):
    """Partially update a single object from ``self.getInput()``.

    Args:
        id: Raw identifier; it must be convertible with ``int()``.

    Returns:
        An HTTP status constant: 400 when ``id`` is not an integer or the
        input fails validation, 404 when the object does not exist, 202
        after the update was saved and ``self.setResult`` received the
        serialized data.
    """
    try:
        clean_id = int(id)
    except (TypeError, ValueError, OverflowError):
        return status.HTTP_400_BAD_REQUEST

    model = self.Meta.Model
    try:
        instance = model.objects.get(id=clean_id)
    except model.DoesNotExist:
        # Only "no such row" maps to 404; other failures must propagate
        # instead of being reported as a missing object.
        return status.HTTP_404_NOT_FOUND

    serializer = self.Meta.Serializer(instance, data=self.getInput(), partial=True)
    if not serializer.is_valid():
        # Previously invalid input fell through and was reported as 202
        # even though nothing had been saved.
        return status.HTTP_400_BAD_REQUEST

    serializer.save()
    self.setResult(serializer.data)
    return status.HTTP_202_ACCEPTED
def delete_process_single(self, id):
    """Delete a single object.

    Args:
        id: Raw identifier; it must be convertible with ``int()``.

    Returns:
        An HTTP status constant: 400 when ``id`` is not an integer, 404 when
        no object with that id exists, 200 after the object was deleted.
    """
    try:
        clean_id = int(id)
    except (TypeError, ValueError, OverflowError):
        # Narrowed from a bare except so that only conversion failures are
        # reported as a bad request.
        return status.HTTP_400_BAD_REQUEST

    try:
        instance = self.Meta.Model.objects.get(id=clean_id)
    except ObjectDoesNotExist:
        return status.HTTP_404_NOT_FOUND

    instance.delete()
    return status.HTTP_200_OK
def user_detail(request, id, format=None):
    """
    Retrieve, update or delete a single user.

    GET returns the serialized user, PUT validates and saves the payload,
    DELETE requires the 'OpsManage.delete_user' permission. An unknown id
    yields HTTP 404.
    """
    try:
        target_user = User.objects.get(id=id)
    except User.DoesNotExist:
        return Response(status=status.HTTP_404_NOT_FOUND)

    method = request.method

    if method == 'GET':
        return Response(UserSerializer(target_user).data)

    if method == 'PUT':
        serializer = UserSerializer(target_user, data=request.data)
        if not serializer.is_valid():
            return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
        serializer.save()
        return Response(serializer.data)

    if method == 'DELETE':
        if not request.user.has_perm('OpsManage.delete_user'):
            return Response(status=status.HTTP_403_FORBIDDEN)
        target_user.delete()
        return Response(status=status.HTTP_204_NO_CONTENT)
    # Any other method falls through without a response object.
def group_list(request, format=None):
    """
    List all groups, or create a new group.

    Every request requires 'Opsmanage.read_group'; POST additionally
    requires 'Opsmanage.change_group'. A successful POST also queues an
    audit record through ``recordAssets.delay``.
    """
    # NOTE(review): other permission checks in this collection spell the app
    # label 'OpsManage' -- confirm that 'Opsmanage' is the intended label.
    requester = request.user
    if not requester.has_perm('Opsmanage.read_group'):
        return Response(status=status.HTTP_403_FORBIDDEN)

    if request.method == 'GET':
        return Response(GroupSerializer(Group.objects.all(), many=True).data)

    if request.method == 'POST':
        if not requester.has_perm('Opsmanage.change_group'):
            return Response(status=status.HTTP_403_FORBIDDEN)
        serializer = GroupSerializer(data=request.data)
        if not serializer.is_valid():
            return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
        serializer.save()
        recordAssets.delay(
            user=str(request.user),
            content="??????{group_name}".format(group_name=request.data.get("name")),
            type="group",
            id=serializer.data.get('id'),
        )
        return Response(serializer.data, status=status.HTTP_201_CREATED)
    # Any other method falls through without a response object.
def zone_list(request, format=None):
    """
    List all zones, or create a new zone.

    A successful POST also queues an audit record through
    ``recordAssets.delay``.
    """
    if request.method == 'GET':
        return Response(ZoneSerializer(Zone_Assets.objects.all(), many=True).data)

    if request.method == 'POST':
        serializer = ZoneSerializer(data=request.data)
        if not serializer.is_valid():
            return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
        serializer.save()
        recordAssets.delay(
            user=str(request.user),
            content="???????{zone_name}".format(zone_name=request.data.get("zone_name")),
            type="zone",
            id=serializer.data.get('id'),
        )
        return Response(serializer.data, status=status.HTTP_201_CREATED)
    # Any other method falls through without a response object.
def asset_server_list(request, format=None):
    """
    List all server assets, or create a new server asset.

    For POST the payload is taken from the ``data`` key of the request body
    when that key holds a truthy value, otherwise from the body itself.
    """
    if request.method == 'GET':
        return Response(ServerSerializer(Server_Assets.objects.all(), many=True).data)

    if request.method == 'POST':
        # Accept both a wrapped ({"data": {...}}) and a flat payload.
        data = request.data.get('data') or request.data
        serializer = ServerSerializer(data=data)
        if not serializer.is_valid():
            return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
        serializer.save()
        recordAssets.delay(
            user=str(request.user),
            content="????????{ip}".format(ip=data.get("ip")),
            type="server",
            id=serializer.data.get('id'),
        )
        return Response(serializer.data, status=status.HTTP_201_CREATED)
    # Any other method falls through without a response object.
def asset_net_list(request, format=None):
    """
    List all network assets, or create a new network asset.

    For POST the payload is taken from the ``data`` key of the request body
    when that key holds a truthy value, otherwise from the body itself.
    """
    if request.method == 'GET':
        all_assets = Network_Assets.objects.all()
        return Response(NetworkSerializer(all_assets, many=True).data)

    if request.method == 'POST':
        wrapped = request.data.get('data')
        data = wrapped if wrapped else request.data
        serializer = NetworkSerializer(data=data)
        if not serializer.is_valid():
            return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
        serializer.save()
        recordAssets.delay(
            user=str(request.user),
            content="?????????{ip}".format(ip=data.get("ip")),
            type="net",
            id=serializer.data.get('id'),
        )
        return Response(serializer.data, status=status.HTTP_201_CREATED)
    # Any other method falls through without a response object.
def cron_detail(request, id, format=None):
    """
    Retrieve, update or delete a cron configuration.

    GET returns the serialized entry, PUT validates and saves the payload,
    DELETE requires the 'OpsManage.delete_service_assets' permission. An
    unknown id yields HTTP 404.
    """
    try:
        cron_entry = Cron_Config.objects.get(id=id)
    except Cron_Config.DoesNotExist:
        return Response(status=status.HTTP_404_NOT_FOUND)

    method = request.method

    if method == 'GET':
        return Response(CronSerializer(cron_entry).data)

    if method == 'PUT':
        serializer = CronSerializer(cron_entry, data=request.data)
        if not serializer.is_valid():
            return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
        serializer.save()
        return Response(serializer.data)

    if method == 'DELETE':
        if not request.user.has_perm('OpsManage.delete_service_assets'):
            return Response(status=status.HTTP_403_FORBIDDEN)
        cron_entry.delete()
        return Response(status=status.HTTP_204_NO_CONTENT)
    # Any other method falls through without a response object.
def post(self, request):
    """Send a password-reset e-mail to the account matching ``email``.

    Responds with HTTP 400 when the payload has no ``email`` key. In every
    other case the response is ``{'status': 'Success'}``, including when no
    user has that address.
    """
    payload = request.data
    if 'email' not in payload:
        return Response({'errors': ['No email provided']}, status.HTTP_400_BAD_REQUEST)

    email = payload['email']
    try:
        account = PFBUser.objects.get(email=email)
        reset_url = get_password_reset_url(request, account)
        if account.email:
            subject = settings.USER_EMAIL_SUBJECT
            body = password_reset_txt.format(username=account.get_short_name(), url=reset_url)
            sender = settings.RESET_EMAIL_FROM
            send_mail(subject, body, sender, [email])
    except PFBUser.DoesNotExist:
        # Unknown address: deliberately ignored so the reply is the same as
        # for a known one (presumably to avoid revealing which addresses
        # are registered -- confirm intent).
        pass

    return Response({'status': 'Success'})
def post(self, request):
    """Set a new password using a signed, time-limited reset token.

    Reads ``token`` and ``password`` from ``request.data``.

    Returns:
        ``{'status': 'Success'}`` once the password has been changed;
        otherwise HTTP 400 with ``errors`` (list of messages) and ``fatal``
        (True when the token is missing, invalid, or refers to no user).
    """
    invalid_link_msg = 'Can not reset password because the reset link used was invalid.'
    errors = []
    fatal = False
    token = request.data.get('token')
    password = request.data.get('password')

    if not token:
        errors.append('Invalid reset token.')
        fatal = True
    if not password:
        errors.append('No password provided.')

    signer = TimestampSigner(salt=settings.RESET_SALT)
    user_uuid = None
    if token:
        try:
            user_uuid = signer.unsign(token, max_age=settings.RESET_TOKEN_LENGTH)
        except BadSignature:
            errors.append(invalid_link_msg)
            fatal = True

    if not errors:
        try:
            user = PFBUser.objects.get(uuid=user_uuid)
        except PFBUser.DoesNotExist:
            # A correctly signed token whose user no longer exists used to
            # escape as an unhandled exception; report it like a bad link.
            errors.append(invalid_link_msg)
            fatal = True
        else:
            user.set_password(password)
            user.save()
            return Response({'status': 'Success'})

    return Response({'errors': errors, 'fatal': fatal}, status.HTTP_400_BAD_REQUEST)
def post(self, request, *args, **kwargs):
    """Validate the alias in the request and send a callback token to it.

    Returns:
        HTTP 404 when ``self.alias_type`` is not enabled in
        ``api_settings.PASSWORDLESS_AUTH_TYPES``; HTTP 200 with
        ``self.success_response`` when the token was sent; HTTP 400 with
        ``self.failure_response`` when sending failed.

    Raises:
        Whatever ``serializer.is_valid(raise_exception=True)`` raises for
        invalid input.
    """
    if self.alias_type.upper() not in api_settings.PASSWORDLESS_AUTH_TYPES:
        # Only allow auth types allowed in settings.
        return Response(status=status.HTTP_404_NOT_FOUND)

    serializer = self.serializer_class(data=request.data, context={'request': request})
    if not serializer.is_valid(raise_exception=True):
        # With raise_exception=True invalid input raises instead of
        # returning False, so this is purely defensive. It now reports the
        # validation errors rather than `error_messages` (the serializer's
        # static message templates).
        return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

    user = serializer.validated_data['user']
    # Create and send callback token
    success = TokenService.send_token(user, self.alias_type, **self.message_payload)

    # Respond with success or failure of the send.
    if success:
        status_code = status.HTTP_200_OK
        response_detail = self.success_response
    else:
        status_code = status.HTTP_400_BAD_REQUEST
        response_detail = self.failure_response
    return Response({'detail': response_detail}, status=status_code)
def post(self, request, *args, **kwargs):
    """Exchange validated credentials for an auth token key.

    Returns:
        HTTP 200 with ``{'token': <key>}`` when a token exists or was
        created for the validated user; HTTP 400 with a generic message
        otherwise.

    Raises:
        Whatever ``serializer.is_valid(raise_exception=True)`` raises for
        invalid input.
    """
    serializer = self.serializer_class(data=request.data)
    if serializer.is_valid(raise_exception=True):
        user = serializer.validated_data['user']
        token, created = Token.objects.get_or_create(user=user)

        if created:
            # Initially set an unusable password if a user is created through this.
            # NOTE(review): `created` is the flag for the token row, not the
            # user -- confirm this cannot reset an existing user's password.
            user.set_unusable_password()
            user.save()

        if token:
            # Return our key for consumption.
            return Response({'token': token.key}, status=status.HTTP_200_OK)
    else:
        # Log the actual validation errors (not the static `error_messages`
        # templates), with lazy %-formatting.
        log.error(
            "Couldn't log in unknown user. Errors on serializer: %s", serializer.errors)
    return Response({'detail': 'Couldn\'t log you in. Try again later.'},
                    status=status.HTTP_400_BAD_REQUEST)
def test_mobile_signup_disabled(self):
    """Posting an unknown mobile number must not register a user when
    ``PASSWORDLESS_REGISTER_NEW_USERS`` is disabled."""
    # NOTE(review): this mutates a shared setting and does not restore it
    # here -- confirm that tearDown (or similar) resets it.
    api_settings.PASSWORDLESS_REGISTER_NEW_USERS = False

    mobile = '+15557654321'
    lookup = {self.mobile_field_name: mobile}

    # Precondition: the user does not exist yet.
    self.assertIsNone(User.objects.filter(**lookup).first())

    # The request is rejected ...
    response = self.client.post(self.url, {'mobile': mobile})
    self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)

    # ... and no user was created as a side effect.
    user = User.objects.filter(**lookup).first()
    self.assertIsNone(user)

    # Verify no token was created for the user
    self.assertFalse(CallbackToken.objects.filter(user=user, is_active=True).exists())
def test_post_should_not_login_if_user_is_not_active(self):
    """An inactive user gets HTTP 400 with the inactive-account error and
    the login signal is not sent."""
    user = create_user()
    data = dict(username=user.username, password=user.raw_password)

    # Deactivate the account before attempting to log in.
    user.is_active = False
    user.save()
    user_logged_in.connect(self.signal_receiver)

    response = self.view(self.factory.post(data=data))

    self.assert_status_equal(response, status.HTTP_400_BAD_REQUEST)
    expected_errors = [djoser.constants.INACTIVE_ACCOUNT_ERROR]
    self.assertEqual(response.data['non_field_errors'], expected_errors)
    self.assertFalse(self.signal_sent)
def test_post_should_not_login_if_user_is_not_active(self):
    """An inactive user gets HTTP 400 with the invalid-credentials error
    and the login signal is not sent."""
    user = create_user()
    data = dict(username=user.username, password=user.raw_password)

    # Deactivate the account before attempting to log in.
    user.is_active = False
    user.save()
    user_logged_in.connect(self.signal_receiver)

    response = self.view(self.factory.post(data=data))

    self.assert_status_equal(response, status.HTTP_400_BAD_REQUEST)
    expected_errors = [djoser.constants.INVALID_CREDENTIALS_ERROR]
    self.assertEqual(response.data['non_field_errors'], expected_errors)
    self.assertFalse(self.signal_sent)
def test_post_readable_error_message_when_uid_is_broken(self):
    r"""
    Regression test for https://github.com/sunscrapers/djoser/issues/122

    When uid is not a correct unicode string, the error message used to
    look like:
        'utf-8' codec can't decode byte 0xd3 in position 0: invalid
        continuation byte. You passed in b'\xd3\x10\xb4' (<class 'bytes'>)

    Now a human readable message is provided.
    """
    user = create_user()
    data = dict(
        uid=b'\xd3\x10\xb4',
        token=default_token_generator.make_token(user),
        new_password='new password',
    )

    response = self.view(self.factory.post(data=data))

    self.assert_status_equal(response, status.HTTP_400_BAD_REQUEST)
    # Exactly one error on the `uid` field, and it is the readable one.
    self.assertIn('uid', response.data)
    self.assertEqual(len(response.data['uid']), 1)
    self.assertEqual(response.data['uid'][0], djoser.constants.INVALID_UID_ERROR)
def test_post_should_not_set_new_password_if_mismatch(self):
    """A mismatching ``re_new_password`` yields HTTP 400 and leaves the
    stored password unchanged."""
    user = create_user()
    data = dict(
        uid=djoser.utils.encode_uid(user.pk),
        token=default_token_generator.make_token(user),
        new_password='new password',
        re_new_password='wrong',
    )

    response = self.view(self.factory.post(data=data))

    self.assert_status_equal(response, status.HTTP_400_BAD_REQUEST)
    # Re-read the user before checking the password.
    reloaded = utils.refresh(user)
    self.assertFalse(reloaded.check_password(data['new_password']))
def post(self, request):
    """Update the current user's attributes from the request payload.

    Returns the re-serialized user on success, or the serializer errors
    with HTTP 400 when validation fails.
    """
    serializer = serializers.AuthUserSerializer(data=request.data)
    if not serializer.is_valid():
        return Response(serializer.errors, status.HTTP_400_BAD_REQUEST)

    # Fetch current user data..
    user = chino_model.User.fetch(request.user)

    # Copy all updated attributes..
    for name, value in serializer.validated_data.items():
        setattr(user, name, value)
    user.update(request.user)

    return Response(serializers.AuthUserSerializer(user).data)
def post(self, request):
    """
    Adding a new List.

    Validates the payload, stores a new active ``List`` owned by the
    requesting user and answers HTTP 201 with the request data plus the
    uuid of the new list. Invalid payloads get HTTP 400 with the
    serializer errors.
    """
    serializer = ListSerializer(data=request.data)
    if not serializer.is_valid():
        return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

    data = serializer.data
    new_list = List(
        author=request.user,
        name=data['name'],
        priority=data['priority'],
        active=True)
    new_list.save()

    resp = {'uuid': new_list.uuid}
    resp.update(request.data)
    # Re-assert the server-side value: a client-supplied 'uuid' key in the
    # request body must not replace the uuid of the list just created.
    resp['uuid'] = new_list.uuid
    return Response(resp, status=status.HTTP_201_CREATED)
def reverse(self, *args, **kwargs) -> Response:
    """Reverse the transaction given by ``kwargs['pk']`` in the user's
    current session and print a receipt for the new transaction.

    Returns HTTP 201 with the new id on success, HTTP 400 with the error
    message when a ``FlowError`` is raised.
    """
    session = self.request.user.get_current_session()
    if not session:  # noqa
        raise RuntimeError('This should never happen because the auth layer should handle this.')

    try:
        new_id = reverse_transaction(kwargs.get('pk'), session)
        Transaction.objects.get(pk=new_id).print_receipt(do_open_drawer=False)
    except FlowError as exc:
        body = {'success': False, 'message': exc.message}
        code = status.HTTP_400_BAD_REQUEST
    else:
        body = {'success': True, 'id': new_id}
        code = status.HTTP_201_CREATED
    return Response(body, status=code)
def handle_request(self, data):
    """Validate and store a received SMS message, then build the reply.

    Invalid data is logged and answered with HTTP 400; otherwise the saved
    object is kept on ``self.object``, ``post_save`` runs and the result of
    ``get_response`` is returned.
    """
    logger.debug("Received SMS message: %r", data)
    serializer = SMSRequestSerializer(data=data)

    if not serializer.is_valid():
        logger.error(
            "Failed validation of received SMS message: %s",
            serializer.errors,
            extra={"request": self.request}
        )
        return HttpResponse(status=status.HTTP_400_BAD_REQUEST)

    self.object = serializer.save()
    self.post_save()
    return self.get_response(message=self.object, data=data)
def handle_request(self, data):
    """Apply a status callback to the already-sent message ``self.object``.

    Invalid data is logged and answered with HTTP 400; otherwise the
    message is saved with ``force_update=True``, ``post_save`` runs and the
    result of ``get_response`` is returned.
    """
    logger.debug("Callback for sent SMS message status %s: %r", self.object.pk, data)
    serializer = SMSStatusSerializer(instance=self.object, data=data)

    if not serializer.is_valid():
        logger.error(
            "Failed SMS status callback: %s",
            serializer.errors,
            extra={"request": self.request}
        )
        return HttpResponse(status=status.HTTP_400_BAD_REQUEST)

    self.object = serializer.save(force_update=True)
    self.post_save()
    logger.debug(
        "Updated message status %s, %s: %s",
        self.object.pk, self.object.sms_sid, self.object.status
    )
    return self.get_response(message=self.object)
def assign(self, request, pk):
    """Assign the unassigned task ``pk`` to the requesting user.

    Responds with HTTP 400 ("Already taken") when no task with that pk and
    no assignee exists. Otherwise a ``TaskExpense`` is fetched or created;
    only when it was created is the user's balance updated and the task
    assigned, both inside one atomic transaction. The reply is HTTP 200.
    """
    # NOTE(review): both responses pass an already JSON-encoded string to
    # Response -- confirm clients expect that shape before changing it.
    try:
        task = Task.objects.get(pk=pk, assignee=None)
    except Task.DoesNotExist:
        already_taken = json.dumps({"message": "Already taken"})
        return Response(already_taken, status=status.HTTP_400_BAD_REQUEST)

    _expense, created = TaskExpense.objects.get_or_create(
        task=task, executor_id=request.user.pk, money=task.money)

    if created:
        with transaction.atomic():
            request.user.update_balance(u"???? ??????", task.money, task=task)
            # Re-filter on assignee=None so a task taken in the meantime is
            # not overwritten.
            Task.objects.filter(pk=pk, assignee=None).update(assignee=request.user)

    taken = json.dumps({'message': "Taken"})
    return Response(taken, status=status.HTTP_200_OK)
def create(self, request, *args, **kwargs):
    """Play a card (with its story) for the requesting user's player in
    the current round.

    Returns HTTP 400 for invalid input, HTTP 403 with the reason when the
    play is rejected with ``GameInvalidPlay``, HTTP 201 with the serialized
    play otherwise.
    """
    serializer = self.get_serializer(data=request.data)
    if not serializer.is_valid():
        return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

    game_round = self.get_round()
    player = Player.objects.get(game=game_round.game, user=request.user)
    validated = serializer.validated_data
    card = validated.get('card')
    story = validated.get('story')

    try:
        play = Play.play_for_round(game_round, player, card, story)
    except GameInvalidPlay as exc:
        return Response({'detail': exc.msg}, status=status.HTTP_403_FORBIDDEN)

    return Response(PlaySerializer(play).data, status=status.HTTP_201_CREATED)
def create(self, request, *args, **kwargs):
    """Register the requesting player's vote for a card in the current
    round and advance the game when the round completes.

    Returns HTTP 400 for invalid input, HTTP 403 with the reason when the
    vote is rejected with ``GameInvalidPlay``, HTTP 201 with the serialized
    play otherwise.
    """
    serializer = self.get_serializer(data=request.data)
    if not serializer.is_valid():
        return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

    game_round = self.get_round()
    player = Player.objects.get(game=game_round.game, user=request.user)
    card = serializer.validated_data.get('card')
    play = Play.objects.get(game_round=game_round, player=player)

    try:
        play.vote_card(card)
    except GameInvalidPlay as exc:
        return Response({'detail': exc.msg}, status=status.HTTP_403_FORBIDDEN)

    # Reload the round to read its status after the vote.
    game_round.refresh_from_db()
    round_done = game_round.status == RoundStatus.COMPLETE
    if round_done:
        game = self.get_game()
        try:
            game.next_round()
        except (GameDeckExhausted, GameFinished):
            # treat deck exhaust as a fair finish
            pass

    return Response(PlaySerializer(play).data, status=status.HTTP_201_CREATED)
def create(self, request, *args, **kwargs):
    """Add the requesting user to the game as a player named
    ``request.data['name']``.

    Returns HTTP 400 for invalid input, HTTP 403 when adding the player
    raises ``IntegrityError``, HTTP 201 with the serialized player
    otherwise.
    """
    serializer = self.get_serializer(data=request.data)
    if not serializer.is_valid():
        return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

    try:
        game = self.get_game()
        player = game.add_player(request.user, request.data['name'])
    except IntegrityError as exc:
        # The failing constraint is told apart by looking for 'user_id' in
        # the error text.
        if 'user_id' in str(exc):
            detail = 'You are already playing this game'
        else:
            detail = "Username already in use"
        return Response({"detail": detail}, status=status.HTTP_403_FORBIDDEN)

    return Response(PlayerSerializer(player).data, status=status.HTTP_201_CREATED)
def vmServer_detail(request, id, format=None):
    """
    Retrieve, update or delete a VmServer instance.

    GET returns the serialized server, PUT validates and saves the payload,
    DELETE requires the 'vmanageplatform.delete_vmserver' permission. An
    unknown id yields HTTP 404.
    """
    try:
        vm_server = VmServer.objects.get(id=id)
    except VmServer.DoesNotExist:
        return Response(status=status.HTTP_404_NOT_FOUND)

    method = request.method

    if method == 'GET':
        return Response(VmServerSerializer(vm_server).data)

    if method == 'PUT':
        serializer = VmServerSerializer(vm_server, data=request.data)
        if not serializer.is_valid():
            return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
        serializer.save()
        return Response(serializer.data)

    if method == 'DELETE':
        if not request.user.has_perm('vmanageplatform.delete_vmserver'):
            return Response(status=status.HTTP_403_FORBIDDEN)
        vm_server.delete()
        return Response(status=status.HTTP_204_NO_CONTENT)
    # Any other method falls through without a response object.
def vmlog_detail(request, id, format=None):
    """
    Retrieve, update or delete a VmLogs instance.

    GET returns the serialized log entry, PUT validates and saves the
    payload, DELETE requires the 'vmanageplatform.delete_vmserver'
    permission. An unknown id yields HTTP 404.
    """
    try:
        log_entry = VmLogs.objects.get(id=id)
    except VmLogs.DoesNotExist:
        return Response(status=status.HTTP_404_NOT_FOUND)

    method = request.method

    if method == 'GET':
        return Response(VmLogsSerializer(log_entry).data)

    if method == 'PUT':
        serializer = VmLogsSerializer(log_entry, data=request.data)
        if not serializer.is_valid():
            return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
        serializer.save()
        return Response(serializer.data)

    if method == 'DELETE':
        # NOTE(review): this checks the vmserver delete permission for a
        # log entry -- confirm that this is intended.
        if not request.user.has_perm('vmanageplatform.delete_vmserver'):
            return Response(status=status.HTTP_403_FORBIDDEN)
        log_entry.delete()
        return Response(status=status.HTTP_204_NO_CONTENT)
    # Any other method falls through without a response object.
def generate_pitch_track(request, corpus, utterance_id):
    """Run pitch analysis on one utterance and return the track as JSON.

    The POSTed form supplies the analysis keyword arguments. The response
    holds ``pitch_track`` (``{'x': time, 'y': F0}`` points, skipping F0
    values that are None or below 1) and ``pulses`` (sorted, as
    ``{'x': pulse}``). An invalid form yields its errors with HTTP 400.
    """
    form = PitchAnalysisForm(request.POST)
    if not form.is_valid():
        return JsonResponse(data=form.errors, status=status.HTTP_400_BAD_REQUEST)

    print(form.cleaned_data)
    corpus = Corpus.objects.get(name=corpus)
    with CorpusContext(corpus.config) as c:
        results, pulses = c.analyze_utterance_pitch(
            utterance_id, with_pulses=True, **form.cleaned_data)

    track = []
    for point in results:
        f0 = point['F0']
        time = point['time']
        # Keep only points that carry a pitch value of at least 1.
        if not (f0 is None or f0 < 1):
            track.append({'x': time, 'y': f0})

    pitch_data = {
        'pitch_track': track,
        'pulses': [{'x': pulse} for pulse in sorted(pulses)],
    }
    return JsonResponse(pitch_data, safe=False)
def corpus_enrichment_api(request, name=None):
    """Start enrichment of the corpus called ``name`` (POST only).

    Responds 404 for an unknown corpus, 400 when its database is not
    running or it has not been imported, 409 when it is busy. Otherwise the
    corpus status is set to 'ER' and the enrichment task runs inline when
    the payload's ``blocking`` flag is truthy, or is queued otherwise; the
    reply is HTTP 202.
    """
    # NOTE(review): the original was collapsed onto one line; the final 202
    # return is assumed to belong to the POST branch -- confirm.
    if request.method == 'POST':
        data = request.data
        try:
            corpus = Corpus.objects.get(name=name)
        except ObjectDoesNotExist:
            return HttpResponse('Could not find the specified corpus.',
                                status=status.HTTP_404_NOT_FOUND)

        # Preconditions, checked in order.
        if corpus.database.status != 'R':
            return HttpResponse("The corpus's database is not currently running.",
                                status=status.HTTP_400_BAD_REQUEST)
        if corpus.status == 'NI':
            return HttpResponse('The corpus has not been imported yet.',
                                status=status.HTTP_400_BAD_REQUEST)
        if corpus.is_busy:
            return HttpResponse(
                'The corpus is currently busy, please try once the current process is finished.',
                status=status.HTTP_409_CONFLICT)

        corpus.status = 'ER'
        corpus.save()

        if data.get('blocking', False):
            enrich_corpus_task(corpus.pk, data)
        else:
            enrich_corpus_task.delay(corpus.pk, data)
        return HttpResponse(status=status.HTTP_202_ACCEPTED)
def post(self, request, format=None):
    '''
    Shares a TalentMAP element

    Post method accepts an object with at minimum the following parameters:
    * type - the type of object to be shared (position)
    * id - the id of the object to be shared
    * email - the email to send the share to

    Addresses containing "@state.gov" go through ``internal_share``; all
    others go through ``email_share``. A request rejected by
    ``self.validate`` gets HTTP 400 with the validation message.
    '''
    valid, error = self.validate(request)
    user = self.request.user

    if not valid:
        return Response({"message": error}, status=status.HTTP_400_BAD_REQUEST)

    # Pick the delivery channel from the recipient's address.
    if "@state.gov" in request.data.get("email"):
        share = self.internal_share
    else:
        share = self.email_share
    return share(user,
                 request.data.get("email"),
                 request.data.get("type"),
                 request.data.get("id"))