我们从 Python 开源项目中提取了以下 17 个代码示例，用于说明如何使用 rest_framework.exceptions.NotAcceptable()。
def get(self, request, *args, **kwargs):
    """Return one randomly chosen comment among the most-liked recent ones.

    Collects every comment on the movies of the ten most recently created
    box-office entries, keeps the five with the highest ``likes_count`` and
    serializes a single random pick from those five (as a one-element list).

    Raises:
        NotAcceptable: when none of those movies has any comment.
    """
    latest_entries = BoxOfficeMovie.objects.all().order_by('-created')[:10]

    # Gather the comments of every movie currently in the box office.
    candidates = []
    for entry in latest_entries:
        candidates.extend(Comment.objects.filter(movie__pk=entry.movie.pk))

    # Keep only the five most-liked comments.
    candidates.sort(key=attrgetter('likes_count'), reverse=True)
    top_five = candidates[:5]

    if not top_five:
        raise NotAcceptable('???? ????')

    best_comment = random.sample(top_five, 1)
    serializer = CommentSerializer(best_comment, many=True)
    return Response(serializer.data)
def authenticate(self, request):
    """Authenticate a request carrying a ``token <key>`` authorization header.

    Returns:
        ``(user, None)`` where ``user`` is an ``AnonymousUser`` with the
        matching ``Player`` attached as ``user.player``, or ``None`` when no
        player owns the supplied token.

    Raises:
        NotAcceptable: the header is absent, has fewer than two
            whitespace-separated parts, or its first part is not ``token``.
    """
    # Django stores the ``Authorization`` request header in ``META`` under
    # ``HTTP_AUTHORIZATION``.  The bare ``Authorization`` key is still
    # honoured first so callers that populate ``META`` by hand keep working.
    header = request.META.get('Authorization')
    if header is None:
        header = request.META.get('HTTP_AUTHORIZATION')
    if header is None:
        raise NotAcceptable()

    parts = header.split()
    # Reject an empty or one-word header explicitly instead of failing later
    # with an IndexError.
    if len(parts) < 2 or parts[0] != "token":
        raise NotAcceptable()

    try:
        player = Player.objects.get(auth_token=parts[1])
    except Player.DoesNotExist:
        return None

    user = AnonymousUser()
    user.player = player
    return (user, None)
def employee_update(request, employee_id):
    """
    Update an employee's skype id, first name, last name and location.
    ---
    response_serializer: employees.serializers.EmployeeSerializer
    parameters:
    - name: first_name
      required: true
      paramType: string
    - name: last_name
      required: true
      paramType: string
    - name: skype_id
      required: true
      paramType: string
    - name: location
      description: location id
      required: true
      paramType: string
    responseMessages:
    - code: 401
      message: Unauthorized. Authentication credentials were not provided. Invalid token.
    - code: 403
      message: Forbidden.
    - code: 404
      message: Not found
    - code: 406
      message: Not acceptable. Required data is missing or could not be saved.
    """
    if request.method == 'PATCH':
        # Looked up outside any catch-all handler so that an unknown employee
        # produces the documented 404 instead of being rewritten to a 406.
        employee = get_object_or_404(Employee, pk=employee_id)

        try:
            skype_id = request.data['skype_id']
            first_name = request.data['first_name']
            last_name = request.data['last_name']
            # Only missing or malformed input is handled here; a "not found"
            # raised by the lookup propagates as a 404.
            location = get_object_or_404(Location, pk=request.data['location'])
        except (KeyError, TypeError, ValueError) as e:
            print(e)
            raise NotAcceptable(config.USER_DATA_IS_MISSING)

        try:
            employee.skype_id = skype_id
            employee.first_name = first_name
            employee.last_name = last_name
            employee.location = location
            employee.save()
            serializer = EmployeeSerializer(employee)
            return Response(serializer.data, status=status.HTTP_202_ACCEPTED)
        except Exception as e:
            # Any failure while persisting is still reported as a 406, as
            # before.
            print(e)
            raise NotAcceptable(config.USER_DATA_IS_MISSING)
def post(self, request, *args, **kwargs):
    """
    Returns a token and user_id, for credentials provided.
    ---
    response_serializer: employees.serializers.EmployeeAuthenticationResponse
    responseMessages:
    - code: 404
      message: Not found
    - code: 500
      message: Unable to log in with provided credentials.
    parameters:
    - name: username
      required: true
      paramType: string
    - name: password
      required: true
      paramType: string
    """
    try:
        # Let the parent view validate the credentials and issue the token.
        upstream = super(CustomObtainAuthToken, self).post(request, *args, **kwargs)
        auth_token = Token.objects.get(key=upstream.data['token'])
        staff = get_object_or_404(Employee, pk=auth_token.user_id)

        # Extend the plain token response with the employee's profile flags.
        payload = {
            'token': auth_token.key,
            'user_id': auth_token.user_id,
            'reset_password_code': staff.reset_password_code,
            'is_base_profile_complete': staff.is_base_profile_complete,
            'is_password_reset_required': staff.is_password_reset_required,
            'is_staff': staff.is_staff,
        }
        return Response(payload)
    except Exception as e:
        # Every failure is reported to the client with the same message.
        print(e)
        raise NotAcceptable(config.USER_UNABLE_TO_LOG)
def determine_version(self, request, *args, **kwargs):
    """Return the API version named in the request's accepted media type.

    The version is read from the media-type parameter called
    ``self.version_param``; ``self.default_version`` is used when the
    parameter is absent.

    Raises:
        exceptions.NotAcceptable: ``self.is_allowed_version`` rejects the
            resulting version.
    """
    accepted = _MediaType(request.accepted_media_type)
    raw_version = accepted.params.get(self.version_param, self.default_version)
    version = unicode_http_header(raw_version)
    if self.is_allowed_version(version):
        return version
    raise exceptions.NotAcceptable(self.invalid_version_message)

# We don't need to implement `reverse`, as the versioning is based
# on the `Accept` header, not on the request URL.
def perform_create(self, serializer):
    """Save a new comment for a movie and update the movie's rating totals.

    1. The submitted ``content`` is passed through ``ProfanitiesFilter``;
       if it is missing or filtering fails, empty content is stored.
    2. The comment is attached to the movie named by the URL ``pk`` and to
       the requesting user.
    3. A user may comment on a given movie only once.

    Raises:
        NotAcceptable: the user already has a comment on this movie.
    """
    movie = Movie.objects.get(pk=self.kwargs['pk'])
    author = MyUser.objects.get(pk=self.request.user.pk)

    # Best-effort cleaning of the comment text.  ``Exception`` (rather than a
    # bare ``except``) keeps SystemExit/KeyboardInterrupt from being swallowed.
    try:
        content = self.request.data['content']
        clean_content = ProfanitiesFilter().clean(content)
    except Exception:
        clean_content = ''

    if Comment.objects.filter(movie=movie, author=author).exists():
        raise NotAcceptable('?? ???? ??????')

    # Parse the rating before anything is written so that a missing or
    # malformed value cannot leave a saved comment behind with the movie's
    # aggregates not updated.
    new_star = float(self.request.data['star'])

    serializer.save(movie=movie, author=author, content=clean_content)

    # Fold the new rating into the movie's running totals.
    movie.comment_count += 1
    movie.star_sum += new_star
    movie.star_average = (
        movie.star_average * (movie.comment_count - 1) + new_star
    ) / movie.comment_count
    movie.save()
def get(self, request):
    """Return every magazine, serialized.

    Raises:
        NotAcceptable: any error occurred while loading or serializing the
            magazines.
    """
    try:
        magazines = Magazine.objects.all()
        serializer = MagazineSerializer(magazines, many=True)
        return Response(serializer.data)
    except Exception:
        # ``Exception`` instead of a bare ``except`` so that SystemExit and
        # KeyboardInterrupt are not converted into an HTTP error.
        raise NotAcceptable('not reachable')
def get(self, request, *args, **kwargs):
    """Return four randomly chosen magazines, serialized.

    Raises:
        NotAcceptable: fewer than four magazines exist, or any other error
            occurred while loading or serializing them.
    """
    try:
        # ``random.sample`` requires a sequence: passing a set raises
        # TypeError on Python 3.11+, which previously made this endpoint
        # fail on every request.
        magazines = list(Magazine.objects.all())
        magazine_samples = random.sample(magazines, 4)
        serializer = MagazineSerializer(magazine_samples, many=True)
        return Response(serializer.data)
    except Exception:
        # ``Exception`` instead of a bare ``except`` so that SystemExit and
        # KeyboardInterrupt are not converted into an HTTP error.
        raise NotAcceptable('not reachable')
def modify_sandbox(self, request, pk=None):
    """Add a course to, or remove a course from, the calendar's sandbox.

    The course is identified by the ``course_id`` query parameter or,
    failing that, by ``short_name``, which must match ``COURSE_RE``.
    ``POST`` adds the course and ``DELETE`` removes it.  Once the request has
    been handled the cached recommendations are cleared and the serialized
    course is returned.

    Raises:
        NotAcceptable: neither parameter was supplied, or ``short_name``
            does not match the pattern.
        NotFound: no course matches the lookup.
        ContentError: the course is already in the sandbox (``POST``) or is
            not in it (``DELETE``).
    """
    calendar = self.get_object()
    params = request.query_params
    lookup = {}
    course_name = ''

    if 'course_id' in params:
        course_name = params['course_id']
        lookup['course_id'] = course_name
    elif 'short_name' in params:
        course_name = params['short_name']
        parsed = COURSE_RE.match(course_name)
        if not parsed:
            raise NotAcceptable(detail='short_name field does not match pattern.')
        lookup.update(number=parsed.group('num'), department=parsed.group('dept'))
        letter = parsed.group('letter')
        if letter:
            lookup['letter'] = letter
    else:
        raise NotAcceptable(detail='short_name or course_id parameter required.')

    try:
        course = Course.objects.get(**lookup)
    except Course.DoesNotExist:
        raise NotFound('Course "%s" not found.' % course_name)

    method = request.method
    if method == 'POST':
        # check if already there, and if so, raise 409
        if calendar.sandbox.filter(id=course.id).exists():
            raise ContentError('Course "%s" already in sandbox.' % course_name)
        calendar.sandbox.add(course)
    elif method == 'DELETE':
        if not calendar.sandbox.filter(id=course.id).exists():
            raise ContentError('Course "%s" not in sandbox.' % course_name)
        calendar.sandbox.remove(course)

    genie.clear_cached_recommendations(calendar.profile_id, calendar.pk)
    return Response(CourseSerializer(course).data)
def modify_course(self, request, pk=None):
    """Add a course to, or remove a course from, the blacklisted courses.

    The course is identified by the ``course_id`` query parameter or,
    failing that, by ``short_name``, which must match ``COURSE_RE``.
    ``POST`` adds the course and ``DELETE`` removes it.  Once the request has
    been handled the cached recommendations are cleared and
    ``{'success': True}`` is returned.

    Raises:
        NotAcceptable: neither parameter was supplied, or ``short_name``
            does not match the pattern.
        NotFound: no course matches the lookup.
        ContentError: the course is already blacklisted (``POST``) or is
            not blacklisted (``DELETE``).
    """
    pref = self.get_object()
    search_kwargs = {}
    course_name = ''
    if 'course_id' in request.query_params:
        course_name = request.query_params['course_id']
        search_kwargs['course_id'] = course_name
    elif 'short_name' in request.query_params:
        course_name = request.query_params['short_name']
        short_name = COURSE_RE.match(course_name)
        if not short_name:
            raise NotAcceptable(detail='short_name field does not match pattern.')
        search_kwargs['number'] = short_name.group('num')
        search_kwargs['department'] = short_name.group('dept')
        if short_name.group('letter'):
            search_kwargs['letter'] = short_name.group('letter')
    else:
        # Report a missing identifier explicitly instead of letting the
        # query-parameter lookup fail with an unhandled KeyError.
        raise NotAcceptable(detail='short_name or course_id parameter required.')

    try:
        course = Course.objects.get(**search_kwargs)
    except Course.DoesNotExist:
        raise NotFound('Course "%s" not found.' % course_name)

    if request.method == 'POST':
        # check if already there, and if so, raise 409
        if pref.bl_courses.filter(id=course.id).exists():
            raise ContentError('Course "%s" already in blacklist.' % course_name)
        pref.bl_courses.add(course)
    elif request.method == 'DELETE':
        if not pref.bl_courses.filter(id=course.id).exists():
            raise ContentError('Course "%s" not in blacklist.' % course_name)
        pref.bl_courses.remove(course)

    genie.clear_cached_recommendations(pref.profile_id)
    return Response({'success': True})
def modify_course(self, request, pk=None):
    """Add a course to, or remove a course from, the semester.

    The course is identified by the ``course_id`` query parameter or,
    failing that, by ``short_name``, which must match ``COURSE_RE``.
    ``POST`` adds the course and ``DELETE`` removes it.  Once the request has
    been handled the cached recommendations are cleared and the serialized
    course is returned.

    Raises:
        NotAcceptable: neither parameter was supplied, or ``short_name``
            does not match the pattern.
        NotFound: no course matches the lookup.
        ContentError: the course is already in the semester (``POST``) or
            is not in it (``DELETE``).
    """
    semester = self.get_object()
    params = request.query_params
    lookup = {}
    course_name = ''

    if 'course_id' in params:
        course_name = params['course_id']
        lookup['course_id'] = course_name
    elif 'short_name' in params:
        course_name = params['short_name']
        parsed = COURSE_RE.match(course_name)
        if not parsed:
            raise NotAcceptable(detail='short_name field does not match pattern.')
        lookup.update(number=parsed.group('num'), department=parsed.group('dept'))
        letter = parsed.group('letter')
        if letter:
            lookup['letter'] = letter
    else:
        raise NotAcceptable(detail='short_name or course_id parameter required.')

    try:
        course = Course.objects.get(**lookup)
    except Course.DoesNotExist:
        raise NotFound('Course "%s" not found.' % course_name)

    method = request.method
    if method == 'POST':
        # check if already there, and if so, raise 409
        if semester.courses.filter(id=course.id).exists():
            raise ContentError('Course "%s" already in semester.' % course_name)
        semester.courses.add(course)
    elif method == 'DELETE':
        if not semester.courses.filter(id=course.id).exists():
            raise ContentError('Course "%s" not in semester.' % course_name)
        semester.courses.remove(course)

    genie.clear_cached_recommendations(semester.calendar.profile_id)
    return Response(CourseSerializer(course).data)
def create(self, validated_data):
    """Place (or update) the order described by ``validated_data``.

    An order number is generated for the basket, the optional
    ``billing_address`` / ``shipping_address`` entries are turned into
    address objects, and everything is handed to ``self._insupd_order``.

    Returns:
        Whatever ``self._insupd_order`` returns.

    Raises:
        exceptions.NotAcceptable: ``self._insupd_order`` raised ValueError;
            the error text becomes the exception detail.
    """
    basket = validated_data.get('basket')
    order_number = self.generate_order_number(basket)

    billing_address = (
        BillingAddress(**validated_data['billing_address'])
        if 'billing_address' in validated_data
        else None
    )
    shipping_address = (
        ShippingAddress(**validated_data['shipping_address'])
        if 'shipping_address' in validated_data
        else None
    )

    # Place the order and hand it back to the caller.
    try:
        return self._insupd_order(
            basket=basket,
            user=validated_data.get('user') or AnonymousUser(),
            order_number=order_number,
            billing_address=billing_address,
            shipping_address=shipping_address,
            order_total=validated_data.get('total'),
            shipping_method=validated_data.get('shipping_method'),
            shipping_charge=validated_data.get('shipping_charge'),
            guest_email=validated_data.get('guest_email') or '',
        )
    except ValueError as e:
        raise exceptions.NotAcceptable(str(e))
def employee_update_password(request, employee_id):
    """
    Change an employee's password after checking the current one.
    ---
    response_serializer: employees.serializers.EmployeeSerializer
    parameters:
    - name: current_password
      required: true
      paramType: string
    - name: new_password
      required: true
      paramType: string
    responseMessages:
    - code: 400
      message: Bad request.
    - code: 401
      message: Unauthorized. Authentication credentials were not provided. Invalid token.
    - code: 403
      message: Forbidden.
    - code: 404
      message: Not found
    """
    if request.method != 'POST':
        return None

    try:
        old_secret = request.data['current_password']
        new_secret = request.data['new_password']
    except Exception as e:
        print(e)
        raise NotAcceptable(config.USER_DATA_IS_MISSING)

    employee = get_object_or_404(Employee, pk=employee_id)

    # The new password must differ from the current one.
    if old_secret == new_secret:
        content = {'detail': config.PASSWORD_EQUAL}
        return Response(content, status=status.HTTP_400_BAD_REQUEST)

    # The supplied current password must be correct.
    if not employee.check_password(old_secret):
        content = {'detail': config.WRONG_CURRENT_PASSWORD}
        return Response(content, status=status.HTTP_400_BAD_REQUEST)

    # Store the new password and clear any pending reset state.
    employee.set_password(new_secret)
    employee.reset_password_code = None
    employee.is_password_reset_required = False
    employee.save()
    serializer = EmployeeSerializer(employee)
    return Response(serializer.data, status=status.HTTP_202_ACCEPTED)
def select_renderer(self, request, renderers, format_suffix=None):
    """
    Given a request and a list of renderers, return a two-tuple of:
    (renderer, media type).

    Raises ``exceptions.NotAcceptable`` when no renderer matches any of the
    media types the client accepts.
    """
    # Allow URL style format override.  eg. "?format=json
    format_query_param = self.settings.URL_FORMAT_OVERRIDE
    requested_format = format_suffix or request.query_params.get(format_query_param)
    if requested_format:
        renderers = self.filter_renderers(renderers, requested_format)

    accepts = self.get_accept_list(request)

    # Check the acceptable media types against each renderer,
    # attempting more specific media types first.
    # Worst case is looping over len(accept_list) * len(self.renderers).
    for media_type_set in order_by_precedence(accepts):
        for renderer in renderers:
            for media_type in media_type_set:
                if not media_type_matches(renderer.media_type, media_type):
                    continue

                requested = _MediaType(media_type)
                if _MediaType(renderer.media_type).precedence > requested.precedence:
                    # Eg client requests '*/*'
                    # Accepted media type is 'application/json'
                    # The renderer's more specific type is returned, keeping
                    # the parameters the client sent.
                    params = [
                        '{0}={1}'.format(key, value.decode(HTTP_HEADER_ENCODING))
                        for key, value in requested.params.items()
                    ]
                    full_media_type = ';'.join([renderer.media_type] + params)
                    return renderer, full_media_type

                # Eg client requests 'application/json; indent=8'
                # Accepted media type is 'application/json; indent=8'
                return renderer, media_type

    raise exceptions.NotAcceptable(available_renderers=renderers)
def _insupd_order(self, basket, user=None, shipping_address=None, billing_address=None, **kwargs):
    """Create the basket's order, or update its single payment-declined one.

    With no order attached to the basket, a new one is placed through
    ``self.place_order``.  With exactly one attached order whose status is
    ``settings.ORDER_STATUS_PAYMENT_DECLINED``, that order is updated in
    place through ``utils.OrderUpdater`` and keeps its order number.

    Raises:
        exceptions.NotAcceptable: the basket has an order that is not
            payment-declined, or has more than one order.
    """
    prior_orders = basket.order_set.all()

    has_live_order = prior_orders.exclude(
        status=settings.ORDER_STATUS_PAYMENT_DECLINED).exists()
    if has_live_order:
        raise exceptions.NotAcceptable(_("An non-declined order already exists for this basket."))

    prior_count = prior_orders.count()
    if prior_count > 1:
        raise exceptions.NotAcceptable(_("Multiple order exist for this basket! This should never happen and we don't know what to do."))

    # Get request object from context
    request = self.context.get('request', None)

    # If no orders were pre-existing, make a new one.
    if prior_count == 0:
        return self.place_order(
            basket=basket,
            user=user,
            shipping_address=shipping_address,
            billing_address=billing_address,
            request=request,
            **kwargs)

    # Update this order instead of making a new one.
    order = prior_orders.first()
    kwargs['order_number'] = order.number
    status = self.get_initial_order_status(basket)
    shipping_address = self.create_shipping_address(
        user=user,
        shipping_address=shipping_address)
    billing_address = self.create_billing_address(
        user=user,
        billing_address=billing_address,
        shipping_address=shipping_address,
        **kwargs)

    updater = utils.OrderUpdater()
    return updater.update_order(
        order=order,
        basket=basket,
        user=user,
        shipping_address=shipping_address,
        billing_address=billing_address,
        status=status,
        request=request,
        **kwargs)