我们从Python开源项目中,提取了以下33个代码示例,用于说明如何使用rest_framework.exceptions.APIException()。
def run_action(self, action, pk, data): try: if not self.has_permission(self.user, action, pk): self.reply(action, errors=['Permission Denied'], status=401) if not action in self.available_actions: self.reply(action, errors=['Invalid Action'], status=400) elif action in ('create', 'list'): data, status = getattr(self, action)(data) elif action in ('retrieve', 'delete'): data, status = getattr(self, action)(pk) elif action in ('update', 'subscribe'): data, status = getattr(self, action)(pk, data) self.reply(action, data=data, status=status, request_id=self.request_id) except APIException as ex: self.reply(action, errors=self._format_errors(ex.detail), status=ex.status_code, request_id=self.request_id)
def friendly_exception_handler(exc, context): response = exception_handler(exc, context) if not response and settings.CATCH_ALL_EXCEPTIONS: exc = APIException(exc) response = exception_handler(exc, context) if response is not None: if is_pretty(response): return response error_message = response.data['detail'] error_code = settings.FRIENDLY_EXCEPTION_DICT.get( exc.__class__.__name__) response.data.pop('detail', {}) response.data['code'] = error_code response.data['message'] = error_message response.data['status_code'] = response.status_code # response.data['exception'] = exc.__class__.__name__ return response
def run_action(self, action, pk, data): try: if not self.has_permission(self.user, action, pk): self.reply(action, errors=['Permission Denied'], status=401, request_id=self.request_id) elif action not in self.available_actions: self.reply(action, errors=['Invalid Action'], status=400, request_id=self.request_id) else: methodname = self.available_actions[action] method = getattr(self, methodname) detail = getattr(method, 'detail', True) if detail: rv = method(pk, data=data) else: rv = method(data=data) data, status = rv self.reply(action, data=data, status=status, request_id=self.request_id) except APIException as ex: self.reply(action, errors=self._format_errors(ex.detail), status=ex.status_code, request_id=self.request_id)
def _authenticate(self): """ Attempt to authenticate the request using each authentication instance in turn. Returns a three-tuple of (authenticator, user, authtoken). """ for authenticator in self.authenticators: try: user_auth_tuple = authenticator.authenticate(self) except exceptions.APIException: self._not_authenticated() raise if user_auth_tuple is not None: self._authenticator = authenticator self.user, self.auth = user_auth_tuple return self._not_authenticated()
def web_sight_exception_handler(exc, context): """ This is a custom Django exception handler that handles all exceptions thrown by the Web Sight REST API. :param exc: The exception that was thrown. :param context: The context in which the exception was thrown. :return: A response to return to the requesting user. """ if isinstance(exc, IntegrityError): return handle_integrity_error(exc, context) elif isinstance(exc, ValidationError): return handle_validation_error(exc, context) elif isinstance(exc, WsRestNonFieldException): return handle_non_field_error(exc, context) elif isinstance(exc, APIException): return handle_api_exception(exc, context) else: return None
def weather(request, city): """ Returns weather information from a particular city """ if city is None: return APIException("Missing city parameter") try: weather_provider_class = utils.get_setting('WEATHER_PROVIDER') weather_provider = utils.module_member(weather_provider_class) provider_instance = weather_provider() res = provider_instance.get_weather_data(city) except ApiCallFailed: return Response(status=status.HTTP_404_NOT_FOUND) return Response(res)
def get(self, request, *args, **kwargs): try: provided_token = request.META['HTTP_AUTHORIZATION'] provided_token = provided_token.replace('Token ', '') token = Token.objects.select_related('user').get(key=provided_token) except Exception as e: # modify the original exception response raise exceptions.APIException("Token not valid.") if not token.user.is_active: # can also modify this exception message return Response(error_response("User inactive or deleted.")) sport = int(request.query_params.get('sport', '')) if sport == 0: swimmer = Swimmer.objects.get(type=request.user.id) user_serialized = SwimmerSerializer(swimmer) elif sport == 1: runner = Runner.objects.get(type=request.user.id) user_serialized = RunnerSerializer(runner) return Response(success_response(user_serialized.data))
def _share(self): content = self.get_object() try: share = content.share(self.request.user.profile) except ValidationError as e: raise exceptions.ValidationError(e.message) except Exception: raise exceptions.APIException("Unknown error when creating share.") return Response({"status": "ok", "content_id": share.id}, status=HTTP_201_CREATED)
def _unshare(self): content = self.get_object() try: content.unshare(self.request.user.profile) except ValidationError as e: raise exceptions.ValidationError(e.message) except Exception: raise exceptions.APIException("Unknown error when creating share.") return Response({"status": "ok"}, status=HTTP_204_NO_CONTENT)
def stars_top_employee_lists(request, top_number, kind, id): """ Returns stars top {top_number} list according to {kind} (category, keyword) {id} (kind_id) --- serializer: stars.serializers.StarTopEmployeeLists responseMessages: - code: 401 message: Unauthorized. Authentication credentials were not provided. Invalid token. - code: 403 message: Forbidden, authentication credentials were not provided - code: 404 message: Not found - code: 412 message: Precondition failed, kind should be category or subcategory """ try: if request.method == 'GET': if kind == 'category': top_list = Star.objects.filter(category__id=id).values( 'to_user__pk', 'to_user__username', 'to_user__first_name', 'to_user__last_name', 'to_user__level' 'to_user__avatar').annotate(num_stars=Count('to_user')).order_by('-num_stars')[:top_number] elif kind == 'keyword': top_list = Star.objects.filter(keyword__id=id).values( 'to_user__pk', 'to_user__username', 'to_user__first_name', 'to_user__last_name', 'to_user__level', 'to_user__avatar').annotate(num_stars=Count('to_user')).order_by('-num_stars')[:top_number] else: return Response(status=status.HTTP_412_PRECONDITION_FAILED) serializer = StarTopEmployeeLists(top_list, many=True) return Response(serializer.data, status=status.HTTP_200_OK) except Exception as e: raise APIException(e)
def get(self, request, kind, format=None): """ List all employees --- serializer: administrator.serializers.EmployeeSerializer parameters: - name: pagination required: false type: string paramType: query - name: quantity required: false type: string paramType: query """ employee_list = Employee.objects.filter(is_active=True, is_base_profile_complete=True).order_by('-' + kind) if request.GET.get('quantity'): try: quantity = request.GET.get('quantity') employee_list = employee_list[:quantity] except Exception as e: raise APIException(e) if request.GET.get('pagination'): pagination = request.GET.get('pagination') if pagination == 'true': paginator = AdministratorPagination() results = paginator.paginate_queryset(employee_list, request) serializer = EmployeeTopSerializer(results, many=True) return paginator.get_paginated_response(serializer.data) else: return Response(status=status.HTTP_400_BAD_REQUEST) else: serializer = EmployeeTopSerializer(employee_list, many=True) return Response(serializer.data, status=status.HTTP_200_OK)
def perform_create(self, serializer): '''Add a new tileset When adding a new dataset, we need to enforce permissions as well as other rules like the uniqueness of uuids. Args: serializer (tilsets.serializer.TilesetSerializer): The serializer to use to save the request. ''' if 'uid' in self.request.data: try: self.queryset.get(uuid=self.request.data['uid']) # this uid already exists, return an error raise rfe.APIException("UID already exists") except tm.Tileset.DoesNotExist: uid = self.request.data['uid'] else: uid = slugid.nice().decode('utf-8') if 'filetype' not in self.request.data: raise rfe.APIException('Missing filetype') datafile_name = self.request.data.get('datafile').name if 'name' in self.request.data: name = self.request.data['name'] else: name = op.split(datafile_name)[1] if self.request.user.is_anonymous: # can't create a private dataset as an anonymous user serializer.save( owner=gu.get_anonymous_user(), private=False, name=name, uuid=uid ) else: serializer.save(owner=self.request.user, name=name, uuid=uid)
def server_error(request): raise APIException('APIException')
def get_object_or_404(self, pk): queryset = self.filter_queryset(self.get_queryset()) filter_kwargs = {self.lookup_field: pk} try: return get_object_or_404(queryset, **filter_kwargs) except Http404: # transform Http404 into an APIException raise NotFound
def has_view_permissions(self, path, method, view): """ Return `True` if the incoming request has the correct view permissions. """ if view.request is None: return True try: view.check_permissions(view.request) except (exceptions.APIException, Http404, PermissionDenied): return False return True
def show_form_for_method(self, view, method, request, obj): """ Returns True if a form should be shown for this method. """ if method not in view.allowed_methods: return # Not a valid method try: view.check_permissions(request) if obj is not None: view.check_object_permissions(request, obj) except exceptions.APIException: return False # Doesn't have permissions return True
def exception_handler(exc, context): """ Returns the response that should be used for any given exception. By default we handle the REST framework `APIException`, and also Django's built-in `Http404` and `PermissionDenied` exceptions. Any unhandled exceptions may return `None`, which will cause a 500 error to be raised. """ if isinstance(exc, exceptions.APIException): headers = {} if getattr(exc, 'auth_header', None): headers['WWW-Authenticate'] = exc.auth_header if getattr(exc, 'wait', None): headers['Retry-After'] = '%d' % exc.wait if isinstance(exc.detail, (list, dict)): data = exc.detail else: data = {'detail': exc.detail} set_rollback() return Response(data, status=exc.status_code, headers=headers) elif isinstance(exc, Http404): msg = _('Not found.') data = {'detail': six.text_type(msg)} set_rollback() return Response(data, status=status.HTTP_404_NOT_FOUND) elif isinstance(exc, PermissionDenied): msg = _('Permission denied.') data = {'detail': six.text_type(msg)} set_rollback() return Response(data, status=status.HTTP_403_FORBIDDEN) return None
def determine_actions(self, request, view): """ For generic class based views we return information about the fields that are accepted for 'PUT' and 'POST' methods. """ actions = {} for method in {'PUT', 'POST'} & set(view.allowed_methods): view.request = clone_request(request, method) try: # Test global permissions if hasattr(view, 'check_permissions'): view.check_permissions(view.request) # Test object permissions if method == 'PUT' and hasattr(view, 'get_object'): view.get_object() except (exceptions.APIException, PermissionDenied, Http404): pass else: # If user has appropriate permissions for the view, include # appropriate metadata about the fields that should be supplied. serializer = view.get_serializer() actions[method] = self.get_serializer_info(serializer) finally: view.request = request return actions
def add_player(self, request, pk=None): game = get_object_or_404(Game, pk=pk) body = request.data player = get_object_or_404(Player, pk=body["id"]) with transaction.atomic(): relation = GamePlayerRelation(game=game, player=player) relation.save() if GamePlayerRelation.objects.filter(game=game).count() > 6: raise APIException(detail="Game is already full", code=400) return redirect(reverse("frisbeer:games-detail", args=[pk]))
def create_teams(self, request, pk=None): game = get_object_or_404(Game, pk=pk) if GamePlayerRelation.objects.filter(game=game).count() != 6: raise APIException("Game needs 6 players before teams can be created", code=400) force = request.data.get("re_create", False) game.create_teams() game.state = Game.READY game.save() print("Created") return redirect(reverse("frisbeer:games-detail", args=[pk]))
def exception_handler(exc, context): """ Returns the response that should be used for any given exception. By default we handle the REST framework `APIException`, and also Django's built-in `Http404` and `PermissionDenied` exceptions. Any unhandled exceptions may return `None`, which will cause a 500 error to be raised. """ if isinstance(exc, exceptions.APIException): headers = {} if getattr(exc, 'auth_header', None): headers['WWW-Authenticate'] = exc.auth_header if getattr(exc, 'wait', None): headers['Retry-After'] = '%d' % exc.wait if isinstance(exc.detail, (list, dict)): data = exc.detail else: data = {'detail': exc.detail} set_rollback() return Response(data, status=exc.status_code, headers=headers) elif isinstance(exc, Http404): msg = _('Not found.') data = {'detail': six.text_type(msg)} set_rollback() return Response(data, status=status.HTTP_404_NOT_FOUND) elif isinstance(exc, PermissionDenied): msg = _('Permission denied.') data = {'detail': six.text_type(msg)} set_rollback() return Response(data, status=status.HTTP_403_FORBIDDEN) # Note: Unhandled exceptions will raise a 500 error. return None
def authenticate(self, request): api_settings = APISettings.get_solo() if not api_settings.allow: raise exceptions.PermissionDenied('API is disabled') if request.META.get('HTTP_X_AUTHKEY') != api_settings.auth_key: raise exceptions.AuthenticationFailed('Invalid auth key') try: user = User.objects.get(username=settings.DSMRREADER_REST_FRAMEWORK_API_USER) except User.DoesNotExist: raise exceptions.APIException('API user not found') return (user, None)
def exception_handler(exc, context=None): """Returns the response that should be used for any given exception. By default we handle the REST framework `APIException`, and also Django's builtin `Http404` and `PermissionDenied` exceptions. Any unhandled exceptions may return `None`, which will cause a 500 error to be raised. """ if isinstance(exc, exceptions.APIException): headers = {} if getattr(exc, 'auth_header', None): headers['WWW-Authenticate'] = exc.auth_header if getattr(exc, 'wait', None): headers['X-Throttle-Wait-Seconds'] = '%d' % exc.wait detail = format_exception(exc) return Response(detail, status=exc.status_code, headers=headers) elif isinstance(exc, Http404): return Response({'error_type': exc.__class__.__name__, 'errors': [{'message': str(exc)}]}, status=status.HTTP_404_NOT_FOUND) elif isinstance(exc, DjangoPermissionDenied): return Response({'error_type': exc.__class__.__name__, 'errors': [{'message': str(exc)}]}, status=status.HTTP_403_FORBIDDEN) # Note: Unhandled exceptions will raise a 500 error. return None
def _apply_geo_filters(self, request, queryset): # Strictly speaking these are not queries that should be possible with a GeoReport v2 # core implementation, but as they do not require extra data in the models, it's worth it # to have them available "for free". bbox = request.query_params.get('bbox') if bbox: bbox = parse_bbox(bbox) (long1, lat1), (long2, lat2) = bbox queryset = queryset.filter(lat__range=(lat1, lat2)) queryset = queryset.filter(long__range=(long1, long2)) lat = request.query_params.get('lat') lon = request.query_params.get('long') radius = request.query_params.get('radius') if lat and lon and radius: try: lat = float(lat) lon = float(lon) radius = float(radius) except ValueError: raise APIException('lat/lon/radius must all be valid decimal numbers') if not determine_gissiness(): raise APIException('this installation is not capable of lat/lon/radius queries') from django.contrib.gis.db.models.functions import Distance from django.contrib.gis.geos import Point from django.contrib.gis.measure import D point = Point(x=lon, y=lat, srid=4326) queryset = queryset.annotate(distance=Distance('location', point)) queryset = queryset.filter(location__distance_lte=(point, D(m=radius))) return queryset
def create(self, validated_data, *args, **kwargs): city = self.Meta.model.look_up_city(latitude=validated_data['latitude'], longitude=validated_data['longitude'] , ) validated_data['city'] = city place_qs = self.Meta.model.objects.filter(city=validated_data['city'], name=validated_data['name'] ) if place_qs: raise APIException('city: "{}"??????"{}"???,?????'.format( validated_data['city'], validated_data['name'])) return super(PlaceSerializer, self).create(validated_data, *args, **kwargs)
def create(self, validated_data): validated_data = { 'email': validated_data.get('email', ''), 'password': validated_data.get('password1', ''), 'gender': validated_data.get('gender'), 'age': validated_data.get('age'), 'latitude': validated_data.get('latitude'), 'hardness': validated_data.get('hardness'), 'registration_id': validated_data.get('registration_id'), } try: return CustomUser.objects.create_user(**validated_data) except Exception as e: raise APIException({"detail": e.args})
def perform_create(self, serializer): serializer.save() post = Post.objects.get(pk=self.kwargs.get('post_pk')) try: Alarm.objects.create(post=post, comment_author=self.request.user) post_pk = self.kwargs.get('post_pk') post = get_object_or_404(Post, pk=post_pk) registration_id = post.author.registration_id is_logined = Token.objects.filter(user=post.author) if registration_id and is_logined: message_body = "??? ? ? '{}'...? ??? ?????".format(post.content) messaging(message_body, post.pk, registration_id) except Exception as e: raise APIException({"detail": e.args})
def exception_handler(exc, context): if isinstance(exc, exceptions.APIException): if isinstance(exc.detail, dict): data = exc.detail else: data = {'detail': exc.detail} set_rollback() return data # Note: Unhandled exceptions will raise a 500 error. return None
def handle_api_exception(exc, context): """ Handle the given APIException. :param exc: The exception that was thrown. :param context: The context in which the exception was thrown. :return: A Django Response object. """ response = exception_handler(exc, context) response.data = { "status_code": response.status_code, "message": "Exception thrown", "detail": exc.detail, "error_fields": [], } return response
def top(request, kind, quantity): """ Returns top {quantity} list, {kind} (total_score, level, last_month_score, current_month_score, last_year_score, current_year_score) --- serializer: employees.serializers.EmployeeTopListSerializer responseMessages: - code: 401 message: Unauthorized. Authentication credentials were not provided. Invalid token. - code: 403 message: Forbidden, authentication credentials were not provided - code: 404 message: Not found - code: 500 message: Internal server error, cannot resolve keyword into field. """ try: employee_list_filtered = [] if request.method == 'GET': employee_list = Employee.objects.filter(is_active=True, is_base_profile_complete=True).order_by('-' + kind)[:quantity] if kind == 'total_score': for employee in employee_list: if employee.total_score > 0: employee_list_filtered.append(employee) serializer = EmployeeTopTotalScoreList(employee_list_filtered, many=True) elif kind == 'level': for employee in employee_list: if employee.level > 0: employee_list_filtered.append(employee) serializer = EmployeeTopLevelList(employee_list_filtered, many=True) elif kind == 'current_month_score': for employee in employee_list: if employee.current_month_score > 0: employee_list_filtered.append(employee) serializer = EmployeeTopCurrentMonthList(employee_list_filtered, many=True) elif kind == 'current_year_score': for employee in employee_list: if employee.current_year_score > 0: employee_list_filtered.append(employee) serializer = EmployeeTopCurrentYearList(employee_list_filtered, many=True) elif kind == 'last_month_score': for employee in employee_list: if employee.last_month_score > 0: employee_list_filtered.append(employee) serializer = EmployeeTopLastMonthList(employee_list_filtered, many=True) elif kind == 'last_year_score': for employee in employee_list: if employee.last_year_score > 0: employee_list_filtered.append(employee) serializer = EmployeeTopLastYearList(employee_list_filtered, many=True) return Response(serializer.data, status=status.HTTP_200_OK) except Exception as e: raise APIException(e)