我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用rest_framework.exceptions.NotFound()。
def suggest(request):
    """Return gene-name suggestions for the text typed so far.

    The ``d`` query parameter names the gene-annotations tileset (by UUID)
    that suggestions are taken from; ``ac`` carries the partial text.

    Raises ``rfe.NotFound`` when no tileset has the given UUID.
    """
    source_uuid = request.GET['d']
    partial_text = request.GET['ac']

    try:
        source = tm.Tileset.objects.get(uuid=source_uuid)
    except ObjectDoesNotExist:
        raise rfe.NotFound('Suggestion source file not found')

    datapath = tut.get_datapath(source.datafile.url)
    suggestions = tsu.get_gene_suggestions(datapath, partial_text)
    return JsonResponse(suggestions, safe=False)
def delete_scan_list(request: Request, token: str) -> Response:
    """Delete the scan list identified by ``token``.

    Responds with ``{'type': 'success', 'message': 'ok'}`` once the list has
    been deleted.

    Raises ``NotFound`` when no list has that token and ``ParseError`` when
    a ``KeyError`` escapes the lookup or deletion.
    """
    # TODO: Access control (Or is token sufficient)?
    success_payload = {
        'type': 'success',
        'message': 'ok',
    }
    try:
        target = ScanList.objects.get(token=token)
        # all related objects CASCADE automatically.
        target.delete()
        return Response(success_payload)
    except KeyError:
        raise ParseError
    except ScanList.DoesNotExist:
        raise NotFound


# TODO: Why POST?
# TODO: Add a filter option to get_lists and get rid of this search method
def create(self, validated_data):
    """Trigger an existing notification log for the validated recipient.

    The log is identified by the view's ``log_id`` URL kwarg and must belong
    to a notification of the requesting user's company. The log instance is
    returned after ``trigger`` completes.

    Raises ``exceptions.NotFound`` when no such log exists and
    ``serializers.ValidationError`` when triggering fails for any reason.
    """
    company = self.context['request'].user.company
    view_kwargs = self.context.get('view').kwargs
    log_id = view_kwargs.get('log_id')
    recipient = validated_data.get('recipient')

    try:
        log = NotificationLog.objects.get(
            notification__company=company,
            id=log_id,
        )
    except NotificationLog.DoesNotExist:
        raise exceptions.NotFound()

    try:
        log.trigger(recipient=recipient)
    except Exception:
        # Any failure while triggering is reported as a generic error.
        error_payload = {"non_field_errors": ["Internal server error."]}
        raise serializers.ValidationError(error_payload)

    return log
def get_build(request, package_name, build_number, version=None, format=None):
    """Return one build of the requesting account's package.

    The package is looked up by name and account (404 if missing); the build
    is then selected by ``version`` and ``build_number``. The response is the
    serialized build merged with the build's ``extra`` mapping.

    Raises ``exceptions.NotFound`` when the package has no matching build.
    """
    owner = request.user
    package = get_object_or_404(Package, name=package_name, account=owner)

    builds_for_version = package.builds.filter(version=version)
    try:
        build = builds_for_version.get(build_number=build_number)
    except Build.DoesNotExist:
        raise exceptions.NotFound("Couldn't find a build for this package name")

    # Serialize the build, then overlay its extra fields.
    payload = BuildSerializer(build).data.copy()
    payload.update(build.extra)
    return Response(payload)
def create(self, request, *args, **kwargs):
    """Toggle the requesting user's like on a famous line.

    The first POST creates the like and answers 201 with the serialized
    like. A second POST by the same user deletes the existing like and
    answers with ``status.HTTP_306_RESERVED``.

    Raises ``NotFound`` when the ``pk`` URL kwarg is missing, malformed, or
    matches no ``FamousLine``.
    """
    try:
        famous_line = FamousLine.objects.get(pk=kwargs['pk'])
    except (KeyError, TypeError, ValueError, FamousLine.DoesNotExist):
        # Only lookup failures become a 404; unrelated errors (and
        # SystemExit / KeyboardInterrupt) are no longer swallowed.
        raise NotFound('???? ???? ????.')

    serializer = self.get_serializer(data=request.data)
    serializer.is_valid(raise_exception=True)

    # An existing like by this user means the request is an "unlike".
    existing_likes = FamousLike.objects.filter(
        user=request.user,
        famous_line=famous_line,
    )
    if existing_likes.exists():
        existing_likes.delete()
        return Response(serializer.errors, status=status.HTTP_306_RESERVED)

    serializer.save(famous_line=famous_line, user=request.user)
    headers = self.get_success_headers(serializer.data)
    return Response(serializer.data, status=status.HTTP_201_CREATED, headers=headers)
def create(self, request, *args, **kwargs):
    """Toggle the requesting user's like on a comment.

    The first POST creates the like and answers 201 with the serialized
    like. A second POST by the same user deletes the existing like and
    answers with ``status.HTTP_306_RESERVED``.

    Raises ``NotFound`` when the ``pk`` URL kwarg is missing, malformed, or
    matches no ``Comment``.
    """
    try:
        comment = Comment.objects.get(pk=kwargs['pk'])
    except (KeyError, TypeError, ValueError, Comment.DoesNotExist):
        # Only lookup failures become a 404; unrelated errors (and
        # SystemExit / KeyboardInterrupt) are no longer swallowed.
        raise NotFound('??? ???? ????.')

    serializer = self.get_serializer(data=request.data)
    serializer.is_valid(raise_exception=True)

    # An existing like by this user means the request is an "unlike".
    existing_likes = CommentLike.objects.filter(
        user=request.user,
        comment=comment,
    )
    if existing_likes.exists():
        existing_likes.delete()
        return Response(serializer.errors, status=status.HTTP_306_RESERVED)

    serializer.save(comment=comment, user=request.user)
    headers = self.get_success_headers(serializer.data)
    return Response(serializer.data, status=status.HTTP_201_CREATED, headers=headers)
def experiment_by_id(request, exp_id):
    """Describe the visible data set whose id is ``exp_id``.

    The response carries the data set's id and name together with its
    populations, subgroups and metrics; ``description`` and ``authors`` are
    placeholders.

    Raises ``NotFound`` when no visible data set has that id.
    """
    visible_datasets = DataSet.objects.visible()
    try:
        experiment = visible_datasets.get(id=exp_id)
    except DataSet.DoesNotExist:
        raise NotFound('No experiment with given id found.')

    payload = {
        'id': experiment.id,
        'name': experiment.name,
        'description': '',  # TODO
        'authors': [],
        'populations': experiment.get_populations(),
        'subgroups': experiment.get_subgroups(),
        'metrics': experiment.get_metrics(),
    }
    return Response(payload)
def modify_certificates(self, request, pk=None):
    """Add a certificate to, or remove it from, the current calendar.

    The calendar comes from ``get_object``; the certificate id is read from
    the ``cert_id`` query parameter. ``POST`` adds the certificate and
    ``DELETE`` removes it. Cached recommendations for the calendar are
    cleared before the success response is returned.

    Raises:
        NotFound: no certificate has the given id.
        ContentError: the certificate is already selected (POST) or is not
            part of this calendar (DELETE).
    """
    calendar = self.get_object()
    certificate_id = request.query_params['cert_id']
    try:
        certificate = Certificate.objects.get(id=certificate_id)
    except Certificate.DoesNotExist:
        raise NotFound('Certificate "%s" not found.' % certificate_id)

    if request.method == 'POST':
        # check if already there, and if so, raise 409
        if calendar.certificates.filter(id=certificate.id).exists():
            raise ContentError('Certificate "%s" already selected.' % certificate_id)
        calendar.certificates.add(certificate)
    elif request.method == 'DELETE':
        if not calendar.certificates.filter(id=certificate.id).exists():
            # Was formatted with the undefined name ``course_id``, which
            # raised NameError instead of the intended ContentError.
            raise ContentError('Certificate "%s" not in this calendar.' % certificate_id)
        calendar.certificates.remove(certificate)

    genie.clear_cached_recommendations(calendar.profile_id, calendar.pk)
    return Response({'success': True})
def post(self, request, username=None):
    """Make the requesting user's profile follow the profile of ``username``.

    Responds with the serialized followee and status 201.

    Raises:
        NotFound: no profile belongs to ``username``.
        serializers.ValidationError: the user tried to follow themselves.
    """
    follower = self.request.user.profile

    try:
        followee = Profile.objects.get(user__username=username)
    except Profile.DoesNotExist:
        raise NotFound('A profile with this username was not found.')

    # Compare primary keys by value: ``is`` tests object identity, which is
    # not guaranteed for equal keys and would let a self-follow slip through.
    if follower.pk == followee.pk:
        raise serializers.ValidationError('You can not follow yourself.')

    follower.follow(followee)

    serializer = self.serializer_class(followee, context={
        'request': request
    })
    return Response(serializer.data, status=status.HTTP_201_CREATED)
def update(self, request, slug):
    """Partially update the article identified by ``slug``.

    The new field values are read from the ``article`` key of the request
    body (an empty mapping when absent). Responds with the serialized
    article and status 200.

    Raises ``NotFound`` when the view's queryset has no article with that
    slug; validation errors propagate from the serializer.
    """
    context = {'request': request}

    try:
        article = self.queryset.get(slug=slug)
    except Article.DoesNotExist:
        raise NotFound('An article with this slug does not exist.')

    changes = request.data.get('article', {})
    serializer = self.serializer_class(
        article,
        context=context,
        data=changes,
        partial=True,
    )
    serializer.is_valid(raise_exception=True)
    serializer.save()

    return Response(serializer.data, status=status.HTTP_200_OK)
def post(self, request, join_id):
    """Add a player named in the request body to the game ``join_id``.

    Joining is only possible while the game is in the ``'starting'`` state.
    When the new player fills the last open seat, the game moves to the
    ``'pick_wc'`` state and its first round is started.

    Responds with the new player's private view and the game's state.

    Raises:
        NotFound: no game has the given ``join_id``.
        ValidationError: the game is no longer accepting players.
    """
    try:
        game = Game.objects.get(join_id=join_id)
    except Game.DoesNotExist:
        # The HTTP status comes from the exception class; the second
        # positional argument is the error *code*, so 404 is not passed.
        raise NotFound("Invalid join_id")

    if game.game_state != 'starting':
        raise ValidationError("You can no longer join this game")

    data = request.data
    enforce_required_params(['name'], data)
    player = Player.objects.create(name=data['name'], game=game)

    if game.num_players == game.players.count():
        game.game_state = 'pick_wc'
        game.save()
        game.start_new_round()

    return Response({
        'player': player.get_dict(show_private=True),
        'game': game.get_dict(),
    })
def history(self, request, data, error_message, filter_lookup, order_function, *args, **kwargs):
    """Send a neighbouring diff version of a composition over the reply channel.

    ``filter_lookup`` is the queryset lookup applied to the version id taken
    from the validated ``data``; ``order_function`` names the queryset method
    that is called to pick one result.

    Raises ``PermissionDenied`` when the channel session has no user and
    ``NotFound(error_message)`` when the selection yields ``None``.
    """
    if request.channel_session.get('user') is None:
        raise PermissionDenied

    composition_id = int(kwargs.get('composition_id'))
    serializer = DiffHistorySerializer(
        data=data,
        context={'composition_id': composition_id},
    )
    serializer.is_valid(raise_exception=True)

    reference_version_id = serializer.data['diff_composition_version']
    lookup = {filter_lookup: reference_version_id}
    candidates = DiffCompositionVersion.objects.filter(
        composition_id=composition_id,
        **lookup
    )
    pick_one = getattr(candidates, order_function)
    diff_version = pick_one()
    if diff_version is None:
        raise NotFound(error_message)

    self.route_send(
        request.reply_channel,
        DiffCompositionVersionSerializer(diff_version).data,
        status.HTTP_200_OK,
    )
def retrieve(self, request, key=None, *args, **kwargs):
    """Serialize the location returned by ``get_object``.

    The presence of the ``show_redirects``, ``detailed`` and ``geometry``
    query parameters switches the matching behaviour on. A
    ``LocationRedirect`` is answered with an HTTP redirect to its target's
    slug unless ``show_redirects`` is given.

    Raises ``NotFound`` when ``get_object`` returns ``None``.
    """
    params = request.GET
    show_redirects = 'show_redirects' in params
    detailed = 'detailed' in params
    geometry = 'geometry' in params

    location = self.get_object()
    if location is None:
        raise NotFound

    if isinstance(location, LocationRedirect) and not show_redirects:
        # todo: why does redirect/reverse not work here?
        return redirect('../' + str(location.target.slug))

    serialized = location.serialize(
        include_type=True,
        detailed=detailed,
        geometry=geometry,
        simple_geometry=True,
    )
    return Response(serialized)
def list(self, request, *args, **kwargs):
    """List the columns of the table named in the query string.

    ``database`` and ``table`` select the table; any number of ``column``
    parameters restrict the result to columns with those names.

    Raises ``NotFound`` when ``get_columns`` yields nothing for the user.
    """
    params = self.request.GET
    database_name = params.get('database')
    table_name = params.get('table')
    wanted_names = params.getlist('column')

    # get the columns using the utils function
    columns = get_columns(self.request.user, database_name, table_name)

    # if nothing worked, return 404
    if not columns:
        raise NotFound()

    if wanted_names:
        columns = [col for col in columns if col['name'] in wanted_names]
    return Response(ColumnSerializer(columns, many=True).data)
def retrieve_organization_scan_config(request, pk=None):
    """
    Retrieve the ScanConfig associated with the referenced organization.

    Superusers may look up any organization; other users only those where
    they belong to an auth group named ``org_read``.

    :param request: The request received by this API handler.
    :param pk: The primary key of the organization to retrieve the ScanConfig for.
    :return: A response containing the ScanConfig associated with the given Organization.
    :raises NotFound: If the organization is not reachable for this user or
        has no ScanConfig.
    """
    organizations = rest.models.Organization.objects
    if not request.user.is_superuser:
        organizations = organizations.filter(
            auth_groups__users=request.user,
            auth_groups__name="org_read",
        )

    try:
        organization = organizations.get(pk=pk)
    except rest.models.Organization.DoesNotExist:
        raise NotFound()

    if not organization.scan_config:
        raise NotFound()

    serializer = rest.serializers.ScanConfigSerializer(organization.scan_config)
    return Response(serializer.data)
def check_scan_config_validity(request, pk=None):
    """
    Check to see if a given ScanConfig is valid for an order to be placed with it.

    Superusers may check any ScanConfig; other users only their own or the
    default ones.

    :param request: The request that invoked this handler.
    :param pk: The primary key of the ScanConfig to check.
    :return: An HTTP response.
    :raises NotFound: If no ScanConfig reachable for this user has that key.
    """
    if request.user.is_superuser:
        candidates = rest.models.ScanConfig.objects
    else:
        own_or_default = Q(user=request.user) | Q(is_default=True)
        candidates = rest.models.ScanConfig.objects.filter(own_or_default)

    try:
        scan_config = candidates.get(pk=pk)
    except rest.models.ScanConfig.DoesNotExist:
        raise NotFound()

    return rest.responses.WsScanConfigValidityResponse(
        scan_config=scan_config,
        status=200,
    )
def try_to_get(obj, pk):
    """
    Fetch a model instance by its primary key, raising a 404 if it is missing.

    Method version : ``obj`` is a ``SigmaViewSet`` instance; the queryset
    declared on its class is used.
    Static version : ``obj`` is the queryset to search.
    """
    if isinstance(obj, SigmaViewSet):
        # Re-enter with the queryset declared on the viewset's class.
        declared_queryset = obj.__class__.queryset
        return SigmaViewSet.try_to_get(declared_queryset, pk)

    try:
        return obj.get(pk=pk)
    except obj.model.DoesNotExist:
        raise NotFound()
def get_object(self):
    """Return the first record of ``self.zone`` whose id is the ``record_id`` URL kwarg.

    Raises ``NotFound`` when no record matches.
    """
    records = self.zone.records
    wanted_id = self.kwargs['record_id']

    missing = object()
    found = next((rec for rec in records if rec.id == wanted_id), missing)
    if found is missing:
        raise NotFound(detail='Record not found.')
    return found
def get(self, request, format=None, *args, **kwargs):
    """Return one neighborhood as a GeoJSON FeatureCollection.

    The neighborhood UUID is taken from the ``neighborhood`` URL kwarg and
    passed to the query as a bound parameter. An empty object is returned
    when no UUID is given or the query yields no row.

    Raises ``NotFound`` when the database rejects the UUID with a
    ``DataError``.
    """
    # Adjacent literals keep the SQL text unchanged while staying readable.
    query = (
        " SELECT row_to_json(fc) FROM ( "
        "SELECT 'FeatureCollection' AS type, "
        "array_to_json(array_agg(f)) AS features "
        "FROM (SELECT 'Feature' AS type, "
        "ST_AsGeoJSON(g.geom_simple)::json AS geometry, "
        "g.uuid AS id, "
        "row_to_json((SELECT p FROM ( "
        "SELECT uuid AS id, name, label, state_abbrev, organization_id) AS p)) "
        "AS properties "
        "FROM pfb_analysis_neighborhood AS g "
        "WHERE g.uuid = %s) AS f) AS fc; "
    )

    # get the neighborhood ID from the request
    neighborhood_uuid = kwargs.get('neighborhood', '')
    if not neighborhood_uuid:
        return Response({})

    try:
        with connection.cursor() as cursor:
            cursor.execute(query, [neighborhood_uuid])
            row = cursor.fetchone()
            if not row or len(row) == 0:
                return Response({})
    except DataError:
        raise NotFound(detail='{} is not a valid neighborhood UUID.'.format(neighborhood_uuid))

    return Response(row[0])
def scan_result(request: Request, scan_id: int) -> Response:
    """Return the stored result of the scan with primary key ``scan_id``.

    Raises ``NotFound`` when the scan does not exist, and
    ``NotFound('scan not finished')`` when the scan has no result yet.
    """
    try:
        scan = Scan.objects.get(pk=scan_id)
        payload = scan.result.result
        return Response(payload)
    except Scan.DoesNotExist:
        raise NotFound
    except ScanResult.DoesNotExist:
        raise NotFound('scan not finished')
def not_found(request):
    """Unconditionally raise ``NotFound`` with the detail ``'NotFound'``."""
    detail = 'NotFound'
    raise NotFound(detail)
def get_game(self):
    """Return the game named by the ``game_pk`` URL kwarg.

    Raises ``NotFound`` when no such game exists.
    """
    game_pk = self.kwargs['game_pk']
    try:
        game = Game.objects.get(pk=game_pk)
    except Game.DoesNotExist:
        raise NotFound('game not found')
    return game
def get_round(self):
    """Return the round of the current game numbered by the ``round_number`` URL kwarg.

    Raises ``NotFound`` when the game or the round does not exist.
    """
    try:
        current_game = self.get_game()
        number = self.kwargs['round_number']
        found = Round.objects.get(game=current_game, number=number)
    except Game.DoesNotExist:
        raise NotFound('game not found')
    except Round.DoesNotExist:
        raise NotFound('round not found')
    return found
def get_object(self):
    """Return the play for the game and round number given in the URL kwargs.

    Raises ``NotFound`` when no such play exists.
    """
    game_pk = self.kwargs['game_pk']
    round_number = self.kwargs['round_number']
    lookup = {'game': game_pk, 'round__number': round_number}
    try:
        play = Play.objects.get(**lookup)
    except Play.DoesNotExist:
        raise NotFound('play not found')
    return play
def get_object(self):
    """Return the player with the given number in the given game.

    Both values come from the URL kwargs (``game_pk``, ``player_number``).

    Raises ``NotFound('player not found')`` when no such player exists.
    """
    game_pk = self.kwargs['game_pk']
    number = self.kwargs['player_number']
    try:
        # ``get_object_or_404`` raises Http404 rather than
        # ``Player.DoesNotExist``, which left this handler and its message
        # unreachable; query the manager directly instead.
        return Player.objects.get(game=game_pk, number=number)
    except Player.DoesNotExist:
        raise NotFound('player not found')
def has_permission(self, request, view):
    """Grant access when the requesting user has a player in the game.

    The game is identified by the view's ``game_pk`` kwarg; the matching
    ``Game`` instance itself is returned as the (truthy) permission result.

    Raises ``NotFound`` when no game with that key includes the user.
    """
    game_pk = view.kwargs['game_pk']
    try:
        game = Game.objects.get(pk=game_pk, players__user=request.user)
    except Game.DoesNotExist:
        raise NotFound('game not found')
    return game
def _extract_date(self): """ Extract date from request. In detail route extract it from pk and it list from query params. """ pk = self.request.parser_context['kwargs'].get('pk') # detail case if pk is not None: try: return datetime.datetime.strptime( pk.split('_')[1], '%Y-%m-%d' ) except (ValueError, TypeError, IndexError): raise exceptions.NotFound() # list case query_params = self.request.query_params try: return datetime.datetime.strptime( query_params.get('date'), '%Y-%m-%d' ).date() except ValueError: raise exceptions.ParseError(_('Date is invalid')) except TypeError: if query_params.get('last_reported_date', '0') == '0': raise exceptions.ParseError(_('Date filter needs to be set')) return None
def _extract_date(self): """ Extract date from request. In detail route extract it from pk and it list from query params. """ pk = self.request.parser_context['kwargs'].get('pk') # detail case if pk is not None: try: return datetime.datetime.strptime( pk.split('_')[2], '%Y-%m-%d' ) except (ValueError, TypeError, IndexError): raise exceptions.NotFound() # list case try: return datetime.datetime.strptime( self.request.query_params.get('date'), '%Y-%m-%d' ).date() except ValueError: raise exceptions.ParseError(_('Date is invalid')) except TypeError: raise exceptions.ParseError(_('Date filter needs to be set'))
def _extract_user(self): """ Extract user from request. In detail route extract it from pk and it list from query params. """ pk = self.request.parser_context['kwargs'].get('pk') # detail case if pk is not None: try: user_id = int(pk.split('_')[0]) # avoid query if user is self if self.request.user.id == user_id: return self.request.user return get_user_model().objects.get(pk=pk.split('_')[0]) except (ValueError, get_user_model().DoesNotExist): raise exceptions.NotFound() # list case try: user_id = self.request.query_params.get('user') if user_id is None: raise exceptions.ParseError(_('User filter needs to be set')) # avoid query if user is self if self.request.user.id == int(user_id): return self.request.user return get_user_model().objects.get(pk=user_id) except (ValueError, get_user_model().DoesNotExist): raise exceptions.ParseError(_('User is invalid'))
def determine_version(self, request, *args, **kwargs):
    """Read the API version from the URL keyword named ``self.version_param``.

    Falls back to ``self.default_version`` when the keyword is absent.

    Raises ``exceptions.NotFound`` when the version is not allowed.
    """
    requested = kwargs.get(self.version_param, self.default_version)
    if self.is_allowed_version(requested):
        return requested
    raise exceptions.NotFound(self.invalid_version_message)
def determine_version(self, request, *args, **kwargs):
    """Read the API version from the request's URL namespace.

    The namespace may be nested (``a:b:c``); the first segment that is an
    allowed version wins. ``self.default_version`` is returned when the
    request has no resolver match or an empty namespace.

    Raises ``exceptions.NotFound`` when no segment is an allowed version.
    """
    match = getattr(request, 'resolver_match', None)
    if match is None or not match.namespace:
        return self.default_version

    # Allow for possibly nested namespaces.
    for candidate in match.namespace.split(':'):
        if self.is_allowed_version(candidate):
            return candidate

    raise exceptions.NotFound(self.invalid_version_message)
def determine_version(self, request, *args, **kwargs):
    """Read the API version from the request's hostname.

    The port, if any, is stripped before ``self.hostname_regex`` is applied;
    its first group is the version. ``self.default_version`` is returned
    when the hostname does not match.

    Raises ``exceptions.NotFound`` when the version is not allowed.
    """
    hostname = request.get_host().partition(':')[0]
    match = self.hostname_regex.match(hostname)
    if not match:
        return self.default_version

    requested = match.group(1)
    if self.is_allowed_version(requested):
        return requested
    raise exceptions.NotFound(self.invalid_version_message)


# We don't need to implement `reverse`, as the hostname will already be
# preserved as part of the REST framework `reverse` implementation.
def determine_version(self, request, *args, **kwargs):
    """Read the API version from the query parameter named ``self.version_param``.

    Falls back to ``self.default_version`` when the parameter is absent.

    Raises ``exceptions.NotFound`` when the version is not allowed.
    """
    requested = request.query_params.get(self.version_param, self.default_version)
    if self.is_allowed_version(requested):
        return requested
    raise exceptions.NotFound(self.invalid_version_message)
def paginate_queryset(self, queryset, request, view=None):
    """
    Return the requested page of ``queryset`` as a list, or ``None`` when
    no page size is configured for this request.

    The page number comes from the ``self.page_query_param`` query parameter
    (default 1); any value listed in ``self.last_page_strings`` selects the
    final page. The page and the request are stored on the paginator.

    Raises ``NotFound`` when the page number is invalid.
    """
    page_size = self.get_page_size(request)
    if not page_size:
        return None

    paginator = self.django_paginator_class(queryset, page_size)
    page_number = request.query_params.get(self.page_query_param, 1)
    if page_number in self.last_page_strings:
        page_number = paginator.num_pages

    try:
        self.page = paginator.page(page_number)
    except InvalidPage as exc:
        raise NotFound(self.invalid_page_message.format(
            page_number=page_number,
            message=six.text_type(exc),
        ))

    has_several_pages = paginator.num_pages > 1
    if has_several_pages and self.template is not None:
        # The browsable API should display pagination controls.
        self.display_page_controls = True

    self.request = request
    return list(self.page)
def decode_cursor(self, request):
    """
    Given a request with a cursor, return a `Cursor` instance.

    Returns ``None`` when the request carries no cursor parameter. Differs
    from the standard CursorPagination in that the position field keeps all
    of its values (a list) instead of only the first one.

    Raises ``NotFound`` when the cursor cannot be decoded.
    """
    # Determine if we have a cursor, and if so then decode it.
    encoded = request.query_params.get(self.cursor_query_param)
    if encoded is None:
        return None

    try:
        decoded = b64decode(encoded.encode('ascii')).decode('ascii')
        tokens = urlparse.parse_qs(decoded, keep_blank_values=True)

        # This was hard-coded until Django REST Framework 3.4.0.
        raw_offset = tokens.get('o', ['0'])[0]
        try:
            cutoff = self.offset_cutoff
        except AttributeError:
            cutoff = 1000
        offset = _positive_int(raw_offset, cutoff=cutoff)

        raw_reverse = tokens.get('r', ['0'])[0]
        reverse = bool(int(raw_reverse))

        # The difference. Don't get just the 0th entry: get all entries.
        position = tokens.get('p', None)
    except (TypeError, ValueError):
        raise NotFound(self.invalid_cursor_message)

    return Cursor(offset=offset, reverse=reverse, position=position)
def test_invalid_cursor(self):
    """Paginating a request whose ``cursor`` is ``'123'`` must raise ``NotFound``."""
    bad_cursor_request = Request(factory.get('/', {'cursor': '123'}))
    with self.assertRaises(exceptions.NotFound):
        self.pagination.paginate_queryset(self.queryset, bad_cursor_request)
def get(self, request, *args, **kwargs):
    """Return one notification of the requesting user's company.

    The notification is identified by the ``notification_id`` URL kwarg.

    Raises ``exceptions.NotFound`` when the company has no such notification.
    """
    lookup = {
        'company': request.user.company,
        'id': kwargs['notification_id'],
    }
    try:
        note = Notification.objects.get(**lookup)
    except Notification.DoesNotExist:
        raise exceptions.NotFound()

    payload = self.get_serializer(note).data
    return Response({'status': 'success', 'data': payload})
def patch(self, request, *args, **kwargs):
    """Partially update one notification of the requesting user's company.

    The notification is identified by the ``notification_id`` URL kwarg and
    updated from the request body.

    Raises ``exceptions.NotFound`` when the company has no such
    notification; validation errors propagate from the serializer.
    """
    owner_company = request.user.company
    wanted_id = kwargs['notification_id']
    try:
        note = Notification.objects.get(company=owner_company, id=wanted_id)
    except Notification.DoesNotExist:
        raise exceptions.NotFound()

    serializer = self.get_serializer(note, data=request.data, partial=True)
    serializer.is_valid(raise_exception=True)
    serializer.save()
    return Response({'status': 'success', 'data': serializer.data})
def delete(self, request, *args, **kwargs):
    """Delete one notification of the requesting user's company.

    The notification is identified by the ``notification_id`` URL kwarg;
    the deletion itself is delegated to the serializer.

    Raises ``exceptions.NotFound`` when the company has no such notification.
    """
    owner_company = request.user.company
    wanted_id = kwargs['notification_id']
    try:
        note = Notification.objects.get(company=owner_company, id=wanted_id)
    except Notification.DoesNotExist:
        raise exceptions.NotFound()

    # NOTE(review): assumes the view's serializer class defines ``delete()``;
    # the stock DRF serializers do not -- confirm.
    self.get_serializer(note).delete()
    return Response({'status': 'success'})
def get(self, request, *args, **kwargs):
    """Return one notification log belonging to the requesting user's company.

    The log is identified by the ``log_id`` URL kwarg and must hang off a
    notification of that company.

    Raises ``exceptions.NotFound`` when no such log exists.
    """
    lookup = {
        'notification__company': request.user.company,
        'id': kwargs['log_id'],
    }
    try:
        log = NotificationLog.objects.get(**lookup)
    except NotificationLog.DoesNotExist:
        raise exceptions.NotFound()

    payload = self.get_serializer(log).data
    return Response({'status': 'success', 'data': payload})
def get(self, request, *args, **kwargs):
    """Return one preference-enabled notification of the requesting user's company.

    The notification is identified by the ``notification_id`` URL kwarg and
    must have ``preference_enabled`` set.

    Raises ``exceptions.NotFound`` when the company has no such notification.
    """
    lookup = {
        'company': request.user.company,
        'id': kwargs['notification_id'],
        'preference_enabled': True,
    }
    try:
        note = Notification.objects.get(**lookup)
    except Notification.DoesNotExist:
        raise exceptions.NotFound()

    payload = self.get_serializer(note).data
    return Response({'status': 'success', 'data': payload})
def get_last_build(request, package_name, version=None):
    """Return the build with the highest build number for a package version.

    The package is looked up by name and the requesting account (404 if
    missing). The response is the serialized build merged with the build's
    ``extra`` mapping.

    Raises ``exceptions.NotFound`` when the package has no build for that
    version.
    """
    owner = request.user
    package = get_object_or_404(Package, name=package_name, account=owner)

    newest_first = package.builds.filter(version=version).order_by('-build_number')
    build = newest_first.first()
    if not build:
        raise exceptions.NotFound("Couldn't find a build for this package name")

    # Serialize the build, then overlay its extra fields.
    payload = BuildSerializer(build).data.copy()
    payload.update(build.extra)
    return Response(payload)
def determine_version(self, request, *args, **kwargs):
    """Use the request's whole URL namespace as the API version.

    ``self.default_version`` is returned when the request has no resolver
    match or an empty namespace.

    Raises ``exceptions.NotFound`` when the namespace is not an allowed
    version.
    """
    match = getattr(request, 'resolver_match', None)
    namespace = match.namespace if match is not None else None
    if not namespace:
        return self.default_version

    if self.is_allowed_version(namespace):
        return namespace
    raise exceptions.NotFound(self.invalid_version_message)
def determine_version(self, request, *args, **kwargs):
    """Read the API version from the request's hostname.

    The port, if any, is stripped before ``self.hostname_regex`` is applied;
    its first group is the version. ``self.default_version`` is returned
    when the hostname does not match.

    Raises ``exceptions.NotFound`` when the version is not allowed.
    """
    hostname = request.get_host().partition(':')[0]
    match = self.hostname_regex.match(hostname)
    if not match:
        return self.default_version

    requested = match.group(1)
    if self.is_allowed_version(requested):
        return requested
    raise exceptions.NotFound(self.invalid_version_message)


# We don't need to implement `reverse`, as the hostname will already be
# preserved as part of the REST framework `reverse` implementation.
def determine_version(self, request, *args, **kwargs):
    """Use the first segment of the request's URL namespace as the API version.

    ``self.default_version`` is returned when the request has no resolver
    match or an empty namespace.

    Raises ``exceptions.NotFound`` when that segment is not an allowed
    version.
    """
    match = getattr(request, 'resolver_match', None)
    namespace = match.namespace if match is not None else None
    if not namespace:
        return self.default_version

    requested = namespace.split(':')[0]
    if self.is_allowed_version(requested):
        return requested
    raise exceptions.NotFound(self.invalid_version_message)
def get_queryset(self):
    """Return the queryset holding the movie named by the ``pk`` URL kwarg.

    Raises ``NotFound`` when that queryset is empty.
    """
    matches = Movie.objects.filter(pk=self.kwargs['pk'])
    if not matches:
        raise NotFound('?? ??? ????.')
    return matches
def metrics(request):
    """List all metrics ordered by name, optionally limited to one data set.

    When the ``ds`` query parameter is given, only metrics whose collection
    belongs to the data set with that slug are returned.

    Raises ``NotFound`` when the ``ds`` filter leaves no metrics.
    """
    selected = Metric.objects.all().order_by('name')

    dataset_slug = request.query_params.get('ds')
    if dataset_slug:
        # Further filter metrics by dataset.
        selected = selected.filter(collection__dataset__slug=dataset_slug)
        selected = selected.distinct('id', 'name')
        if not selected:
            raise NotFound('No data set with given dataset found.')

    return Response([MetricSerializer(metric).data for metric in selected])