我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用rest_framework.exceptions.ParseError()。
def delete_scan_list(request: Request, token: str) -> Response:
    """Delete the scan list identified by ``token``.

    Args:
        request: The incoming request; it is not inspected here.
        token: Token used to look up the list that should be deleted.

    Returns:
        A ``Response`` carrying ``{'type': 'success', 'message': 'ok'}``.

    Raises:
        ParseError: If a ``KeyError`` escapes the lookup or deletion.
        NotFound: If no list with the given token exists.
    """
    # TODO: Access control (Or is token sufficient)?
    try:
        scan_list = ScanList.objects.get(token=token)
        # all related objects CASCADE automatically.
        scan_list.delete()
        return Response({
            'type': 'success',
            'message': 'ok',
        })
    except KeyError:
        raise ParseError
    except ScanList.DoesNotExist:
        raise NotFound


# TODO: Why POST?
# TODO: Add a filter option to get_lists and get rid of this search method
def authenticate(self, request):
    """Check the registration token and sampling feature of a POST request.

    Non-POST requests are ignored. For POST requests the ``HTTP_TOKEN``
    header must belong to a site registration whose sampling feature UUID
    equals the ``sampling_feature`` value in the request data.

    Returns:
        Always ``None``; the method only validates and never returns a user.

    Raises:
        exceptions.ParseError: If the token header or the
            ``sampling_feature`` field is missing.
        exceptions.PermissionDenied: If no registration matches the token.
        exceptions.AuthenticationFailed: If the sampling feature does not
            belong to the registration found for the token.
    """
    if request.META['REQUEST_METHOD'] != 'POST':
        return None

    # Call form of print: valid on Python 3 and prints the same text on
    # Python 2, where the statement form was used originally.
    print('Request data: {}'.format(request.data))

    if 'HTTP_TOKEN' not in request.META:
        raise exceptions.ParseError("Registration Token not present in the request.")
    elif 'sampling_feature' not in request.data:
        raise exceptions.ParseError("Sampling feature UUID not present in the request.")

    # Get auth_token(uuid) from header, get registration object with
    # auth_token, then verify the sampling_feature uuid against it.
    token = request.META['HTTP_TOKEN']
    registration = SiteRegistration.objects.filter(registration_token=token).first()
    if not registration:
        raise exceptions.PermissionDenied('Invalid Security Token')

    # The request needs to carry the sampling feature uuid of the registration.
    if str(registration.sampling_feature.sampling_feature_uuid) != request.data['sampling_feature']:
        raise exceptions.AuthenticationFailed(
            'Site Identifier is not associated with this Token')  # or other related exception

    return None
def parse(self, stream, media_type=None, parser_context=None):
    """
    Decode a multipart encoded body into form fields and uploaded files.

    The request is taken from `parser_context['request']`; its META is
    copied and the copy's `CONTENT_TYPE` is set to `media_type` before the
    multipart parser is run with the request's upload handlers.

    Returns a `DataAndFiles` built from the `(data, files)` pair produced
    by the multipart parser. Raises `ParseError` when that parser reports
    a `MultiPartParserError`.
    """
    context = parser_context or {}
    request = context['request']
    charset = context.get('encoding', settings.DEFAULT_CHARSET)

    # Work on a copy so the content type override does not touch request.META.
    environ = request.META.copy()
    environ['CONTENT_TYPE'] = media_type
    handlers = request.upload_handlers

    try:
        multipart = DjangoMultiPartParser(environ, stream, handlers, charset)
        form_data, form_files = multipart.parse()
        return DataAndFiles(form_data, form_files)
    except MultiPartParserError as exc:
        raise ParseError('Multipart form parse error - %s' % six.text_type(exc))
def contains_payment(self, price, request_headers, **kwargs):
    """Redeem the payment attached to a request, if there is one.

    The first entry of ``self.allowed_methods`` whose ``should_redeem``
    accepts the headers is asked to redeem the payment; later entries are
    not consulted.

    Args:
        price (int): The price the user must pay for the resource.
        request_headers (dict): Headers sent by the client.
        **kwargs: Forwarded unchanged to ``redeem_payment``.

    Returns:
        Whatever ``redeem_payment`` returns, or ``False`` when no allowed
        method wants to redeem the headers.

    Raises:
        ParseError: If redeeming raises any ``Exception``; its text becomes
            the error detail.
    """
    for payment_method in self.allowed_methods:
        if not payment_method.should_redeem(request_headers):
            continue
        try:
            return payment_method.redeem_payment(price, request_headers, **kwargs)
        except Exception as err:
            raise ParseError(str(err))
    return False
def get_object(self):
    """Return the profile of the user named in the URL, creating it if needed.

    The URL value is treated as a primary key when it converts to ``int``
    and as a username otherwise. A missing user yields a 404 through
    ``get_object_or_404``; a missing profile is created with
    ``get_or_create`` on the filtered queryset.

    Raises:
        ParseError: If the lookup keyword argument is absent from the URL.
    """
    field_name = self.lookup_field
    raw_value = self.kwargs.get(field_name, None)
    if raw_value is None:
        raise ParseError(
            'Expected URL keyword argument `%s`.' % field_name
        )

    profiles = self.filter_queryset(self.get_queryset())

    try:
        user_filter = {'pk': int(raw_value)}
    except (TypeError, ValueError):
        user_filter = {'username': raw_value}

    # 404 when the user itself does not exist.
    user = get_object_or_404(User, **user_filter)

    # The user exists, so make sure a profile row exists for it as well.
    profile, _created = profiles.get_or_create(user=user)

    # May raise a permission denied
    self.check_object_permissions(self.request, profile)
    return profile
def get_object(self):
    """Return the parent object, or the ``Instance`` when a data id is given.

    The object from the superclass is returned unless both lookup fields
    are present in the URL; in that case the ``Instance`` with the given
    data id and form pk is fetched with ``get_object_or_404``.

    Raises:
        ParseError: If the data id is not an integer.
    """
    obj = super(DataViewSet, self).get_object()
    pk_lookup, dataid_lookup = self.lookup_fields
    pk = self.kwargs.get(pk_lookup)
    dataid = self.kwargs.get(dataid_lookup)

    if pk is not None and dataid is not None:
        try:
            int(dataid)
        except ValueError:
            # Translate the template first and interpolate afterwards;
            # interpolating first produces a msgid no catalogue contains.
            raise ParseError(
                _(u"Invalid dataid %(dataid)s") % {'dataid': dataid})

        obj = get_object_or_404(Instance, pk=dataid, xform__pk=pk)

    return obj
def filter_queryset(self, queryset, view=None):
    """Filter the queryset by the ``tags`` query parameter and the URL pk.

    A comma separated ``tags`` parameter restricts the result to objects
    carrying any of those tag names. An integer pk is passed to
    ``_filtered_or_shared_qs``; the ``public_data_endpoint`` value selects
    the public forms queryset instead.

    Raises:
        ParseError: If the pk is neither an integer nor the public
            data endpoint name.
    """
    qs = super(DataViewSet, self).filter_queryset(queryset)
    pk = self.kwargs.get(self.lookup_field)
    tags = self.request.query_params.get('tags', None)

    if tags and isinstance(tags, six.string_types):
        tags = tags.split(',')
        qs = qs.filter(tags__name__in=tags).distinct()

    if pk:
        try:
            int(pk)
        except ValueError:
            if pk == self.public_data_endpoint:
                qs = self._get_public_forms_queryset()
            else:
                # Translate the template first and interpolate afterwards;
                # interpolating first produces a msgid no catalogue contains.
                raise ParseError(_(u"Invalid pk %(pk)s") % {'pk': pk})
        else:
            qs = self._filtered_or_shared_qs(qs, pk)

    return qs
def enketo(self, request, *args, **kwargs):
    """Return the Enketo edit URL for the submission addressed by the URL.

    The response data holds ``url`` on success, or ``detail`` with the
    error text when building the URL raises ``EnketoError``.

    Raises:
        ParseError: If the URL addresses a form rather than a submission,
            or the ``return_url`` query parameter is missing.
        PermissionDenied: If the user lacks ``change_xform`` on the form.
    """
    self.object = self.get_object()
    target = self.object
    payload = {}

    if isinstance(target, XForm):
        raise ParseError(_(u"Data id not provided."))

    if isinstance(target, Instance):
        if not request.user.has_perm("change_xform", target.xform):
            raise PermissionDenied(_(u"You do not have edit permissions."))

        return_url = request.query_params.get('return_url')
        if not return_url:
            raise ParseError(_(u"return_url not provided."))

        try:
            payload["url"] = get_enketo_edit_url(request, target, return_url)
        except EnketoError as exc:
            payload['detail'] = "{}".format(exc)

    return Response(data=payload)
def to_representation(self, obj):
    """Serialize a form as the list of its submissions matching the query.

    Objects that are not ``XForm`` instances are handed to the superclass.
    For a form, the ``query`` parameter (JSON) is merged into a query that
    pins the user/form id, and the ``fields`` and ``sort`` parameters are
    passed along to ``ParsedInstance.query_mongo_minimal``.

    Raises:
        ParseError: If the ``query`` parameter is not valid JSON.
    """
    request = self.context.get('request')

    if not isinstance(obj, XForm):
        return super(DataListSerializer, self).to_representation(obj)

    query_params = (request and request.query_params) or {}
    query = {
        ParsedInstance.USERFORM_ID:
        u'%s_%s' % (obj.user.username, obj.id_string)
    }

    try:
        query.update(json.loads(query_params.get('query', '{}')))
    except ValueError:
        # Translate the template first and interpolate afterwards;
        # interpolating first produces a msgid no catalogue contains.
        raise ParseError(
            _("Invalid query: %(query)s") %
            {'query': query_params.get('query')})

    query_kwargs = {
        'query': json.dumps(query),
        'fields': query_params.get('fields'),
        'sort': query_params.get('sort')
    }
    cursor = ParsedInstance.query_mongo_minimal(**query_kwargs)

    return list(cursor)
def perform_create(self, serializer):
    """
    Save the new object with its movie, actor, author and cleaned content.

    1. Load the movie from the URL `pk`, the requesting user and the actor
       named in the request data.
    2. Reject the request when that actor is not among the movie's actors.
    3. Run the content through the profanity filter before saving.
    """
    movie_id = self.kwargs['pk']
    movie = Movie.objects.get(pk=movie_id)
    author = MyUser.objects.get(pk=self.request.user.id)
    movie_actors = Actor.objects.filter(movie=movie_id)
    chosen_actor = Actor.objects.get(pk=self.request.data['actor'])

    # The chosen actor has to belong to this movie.
    if chosen_actor not in list(movie_actors):
        raise ParseError('?? ??? ?? ? ????.')

    # Clean the submitted text before it is stored.
    raw_content = self.request.data['content']
    cleaned = ProfanitiesFilter().clean(raw_content)

    serializer.save(movie=movie, actor=chosen_actor, author=author, content=cleaned)
def perform_update(self, serializer):
    """
    Save the update after validating the actor and cleaning the content.

    1. Check that the actor named in the request belongs to the movie of
       the famous line being updated.
    2. Run the new content through the profanity filter; when that fails
       (for example because no content was sent) the stored content is kept.

    Raises:
        ParseError: If the actor is not one of the movie's actors.
    """
    # The pk in the URL is the famous line's pk; the movie pk comes from it.
    famous_pk = self.kwargs['pk']
    famous_line = FamousLine.objects.get(pk=famous_pk)
    movie_pk = famous_line.movie.pk
    actors = Actor.objects.filter(movie=movie_pk)
    actor = Actor.objects.get(pk=self.request.data['actor'])

    try:
        content = self.request.data['content']
        clean_content = ProfanitiesFilter().clean(content)
    except Exception:
        # Deliberate best effort, but `Exception` instead of a bare except
        # so KeyboardInterrupt and SystemExit still propagate.
        clean_content = serializer.instance.content

    # The chosen actor has to belong to this movie.
    if actor not in list(actors):
        raise ParseError('?? ??? ?? ? ????.')

    serializer.save(content=clean_content)
def validate_data(self, stream_data):
    """Flatten a ``{template: {data: [{name, value}, ...]}}`` payload.

    Returns:
        A dict mapping each entry's ``name`` to its ``value``.

    Raises:
        ParseError: If ``stream_data`` is not a dict, a required key is
            missing, or the nested data cannot be indexed as expected.
    """
    expected_shape = "Valid format: {template:{data:[{name: ,value: },...]}}"

    if not isinstance(stream_data, dict):
        raise ParseError(detail="Template is not a dictionary. " + expected_shape)

    flattened = {}
    try:
        entries = stream_data['template']['data']
        for entry in entries:
            flattened[entry['name']] = entry['value']
    except KeyError as missing:
        raise ParseError(detail="%s field required. " % missing + expected_shape)
    except TypeError:
        raise ParseError(detail="Invalid data provided. " + expected_shape)

    return flattened
def get(self, request, string, format=None):
    """
    request: GET /biocircuit/ID
    response: json api_circuit

    `string` must contain at least two `0`/`1` characters. Any failure,
    including that check, is reported as a 400 response whose body holds
    `status: failed` and the error text in `detail`.
    """
    try:
        digit_count = sum(1 for c in string if c in ('0', '1'))
        if digit_count < 2:
            raise ParseError(detail="At least two digits are required.")

        expr = biocircuit.string2expr(string)
        circuit = biocircuit.create_circuit(expr)
        scores = biocircuit.circuit_score(circuit, biogate.d_gate)
        response_dict = biocircuit.api_circuit(circuit, scores)
        return Response(response_dict)
    except Exception as error:
        # `Exception`, not `BaseException`: KeyboardInterrupt and SystemExit
        # must not be converted into an HTTP 400 response.
        response = {
            "status": "failed",
            "detail": str(error),
        }
        return Response(response, status=status.HTTP_400_BAD_REQUEST)
def get_times(request):
    """Read ``starttime`` and ``endtime`` from the request's GET parameters.

    Non-empty values are parsed as ISO 8601 and returned with ``tzinfo``
    removed; missing or empty values are returned unchanged.

    :param request: django.http.HttpRequest
    :returns: ``(starttime, endtime)`` tuple
    :raises Iso8601ParseError: if a value is not valid ISO 8601
    """
    raw_values = (request.GET.get('starttime'), request.GET.get('endtime'))
    try:
        # Falsy values (None, '') are passed through untouched.
        return tuple(
            iso8601.parse_date(value).replace(tzinfo=None) if value else value
            for value in raw_values
        )
    except iso8601.ParseError:
        raise Iso8601ParseError
def get_object(self, queryset=None):
    """Return the parent object, or the ``Instance`` when a data id is given.

    ``queryset`` is forwarded to the superclass lookup. When both lookup
    fields are present in the URL, the ``Instance`` with the given data id
    and form pk is fetched with ``get_object_or_404`` instead.

    Raises:
        ParseError: If the data id is not an integer.
    """
    obj = super(DataViewSet, self).get_object(queryset)
    pk_lookup, dataid_lookup = self.lookup_fields
    pk = self.kwargs.get(pk_lookup)
    dataid = self.kwargs.get(dataid_lookup)

    if pk is not None and dataid is not None:
        try:
            int(dataid)
        except ValueError:
            # Translate the template first and interpolate afterwards;
            # interpolating first produces a msgid no catalogue contains.
            raise ParseError(
                _(u"Invalid dataid %(dataid)s") % {'dataid': dataid})

        obj = get_object_or_404(Instance, pk=dataid, xform__pk=pk)

    return obj
def filter_queryset(self, queryset, view=None):
    """Filter the queryset by the ``tags`` query parameter and the URL pk.

    A comma separated ``tags`` parameter restricts the result to objects
    carrying any of those tag names. An integer pk is passed to
    ``_filtered_or_shared_qs``; the ``public_data_endpoint`` value selects
    the public forms queryset instead.

    Raises:
        ParseError: If the pk is neither an integer nor the public
            data endpoint name.
    """
    qs = super(DataViewSet, self).filter_queryset(queryset)
    pk = self.kwargs.get(self.lookup_field)
    tags = self.request.QUERY_PARAMS.get('tags', None)

    if tags and isinstance(tags, six.string_types):
        tags = tags.split(',')
        qs = qs.filter(tags__name__in=tags).distinct()

    if pk:
        try:
            int(pk)
        except ValueError:
            if pk == self.public_data_endpoint:
                qs = self._get_public_forms_queryset()
            else:
                # Translate the template first and interpolate afterwards;
                # interpolating first produces a msgid no catalogue contains.
                raise ParseError(_(u"Invalid pk %(pk)s") % {'pk': pk})
        else:
            qs = self._filtered_or_shared_qs(qs, pk)

    return qs
def enketo(self, request, *args, **kwargs):
    """Return the Enketo edit URL for the submission addressed by the URL.

    The response data holds ``url`` on success, or ``detail`` with the
    error text when building the URL raises ``EnketoError``.

    Raises:
        ParseError: If the URL addresses a form rather than a submission,
            or the ``return_url`` query parameter is missing.
        PermissionDenied: If the user lacks ``change_xform`` on the form.
    """
    self.object = self.get_object()
    target = self.object
    payload = {}

    if isinstance(target, XForm):
        raise ParseError(_(u"Data id not provided."))

    if isinstance(target, Instance):
        if not request.user.has_perm("change_xform", target.xform):
            raise PermissionDenied(_(u"You do not have edit permissions."))

        return_url = request.QUERY_PARAMS.get('return_url')
        if not return_url:
            raise ParseError(_(u"return_url not provided."))

        try:
            payload["url"] = get_enketo_edit_url(request, target, return_url)
        except EnketoError as exc:
            payload['detail'] = "{}".format(exc)

    return Response(data=payload)
def to_native(self, obj):
    """Return statistics for ``obj`` computed by the requested method.

    The ``method`` query parameter (lower-cased) selects a function from
    ``STATS_FUNCTIONS``, falling back to ``get_all_stats``; the ``field``
    parameter is passed to it. A ``None`` object goes to the superclass.

    Raises:
        exceptions.ParseError: If ``field`` is not one of the form's keys,
            or the statistics function raises ``ValueError``.
    """
    if obj is None:
        return super(StatsInstanceSerializer, self).to_native(obj)

    request = self.context.get('request')
    method = request.QUERY_PARAMS.get('method', None)
    field = request.QUERY_PARAMS.get('field', None)

    if field and field not in obj.data_dictionary().get_keys():
        raise exceptions.ParseError(detail=_("Field not in XForm."))

    stats_function = STATS_FUNCTIONS.get(method and method.lower(),
                                         get_all_stats)

    try:
        data = stats_function(obj, field)
    except ValueError as e:
        # Exceptions have no `.message` attribute on Python 3; format the
        # exception itself, which works on both Python 2 and 3.
        raise exceptions.ParseError(detail=u"%s" % e)

    return data
def get_queryset(self):
    """Return the actions selected by the ``start``/``end`` GET parameters.

    Both parameters are required and are parsed with ``self._parse_date``.
    The action type and in-charge lookups are applied before the period
    filter. An empty queryset is returned when the end of the period lies
    before its start.

    Raises:
        ParseError: If a parameter is missing or cannot be parsed.
    """
    params = self.request.GET
    raw_start = params.get('start')
    raw_end = params.get('end')
    if not (raw_start and raw_end):
        raise ParseError("Period frame missing")

    actions = self.model.objects.all()
    actions = self._apply_in_action_type_lookup(actions)
    actions = self._apply_in_charge_lookup(actions)

    try:
        first_day = self._parse_date(raw_start)
        last_day = self._parse_date(raw_end)
    except ValueError:
        raise ParseError("Invalid period frame")

    # The period runs from the first instant of the start day to the last
    # instant of the end day.
    start_datetime = datetime.combine(first_day, time.min)
    end_datetime = datetime.combine(last_day, time.max)
    if start_datetime > end_datetime:
        return self.model.objects.none()

    in_period = (
        # starts before, ends after period
        Q(planned_date__lte=start_datetime, end_datetime__gte=end_datetime) |
        # starts and ends during period
        Q(planned_date__gte=start_datetime, end_datetime__lte=end_datetime) |
        # starts before the period start, ends at or after it
        Q(planned_date__lte=start_datetime, end_datetime__gte=start_datetime) |
        # starts before the period end, ends at or after it
        Q(planned_date__lte=end_datetime, end_datetime__gte=end_datetime) |
        # no end, starts during period
        Q(
            planned_date__gte=start_datetime,
            end_datetime__isnull=True,
            planned_date__lte=end_datetime
        )
    )
    return actions.filter(in_period)
def check_data(self, request):
    """Store the requested contacts and validate the type/status pair.

    ``self.contacts`` is set from the ``contacts`` entry of the request
    data (default: empty list). When both ``type`` and ``status`` are
    given, the status must be one of the type's allowed statuses.

    Raises:
        ParseError: If the status is not allowed for the type, or either
            id is invalid or unknown.
    """
    self.contacts = request.data.get("contacts", [])

    action_type_id = request.data.get('type', None)
    action_status_id = request.data.get('status', None)

    if action_type_id and action_status_id:
        try:
            action_type = ActionType.objects.get(id=action_type_id)
            action_status = ActionStatus.objects.get(id=action_status_id)
            if action_status not in action_type.allowed_status.all():
                # {0} is the status and {1} the type; the arguments used to
                # be passed the other way round.
                raise ParseError(
                    "status {0} is not allowed for type {1}".format(
                        action_status.name, action_type.name)
                )
        except (ValueError, ActionType.DoesNotExist, ActionStatus.DoesNotExist):
            raise ParseError("Invalid parameters")
def save_object(self, serializer):
    """Save the serializer and replace the object's contacts.

    Every id in ``self.contacts`` is resolved before anything is saved, so
    an invalid id leaves the object untouched.

    Returns:
        The saved object.

    Raises:
        ParseError: If a contact id is invalid or unknown.
    """
    try:
        resolved = [
            Contact.objects.get(id=contact_id) for contact_id in self.contacts
        ]
    except (ValueError, Contact.DoesNotExist):
        raise ParseError("Invalid contacts")

    saved = serializer.save()

    # Drop the previous contacts, then attach the resolved ones one by one.
    related = saved.contacts
    related.clear()
    for contact in resolved:
        related.add(contact)

    saved.save()
    return saved
def update_scan_list(request: Request, scan_list_id: int) -> Response:
    """Overwrite the fields, tags and columns of an existing scan list.

    The list is looked up by ``scan_list_id`` together with the ``token``
    from the request data.

    Raises:
        ParseError: If a required key is missing from the request data.
        NotFound: If no list matches the id and token.
    """
    # Model attribute -> key in the request data.
    simple_fields = (
        ('name', 'listname'),
        ('description', 'description'),
        ('private', 'isprivate'),
    )
    try:
        # TODO: Check if list is editable (and by current user)
        scan_list = ScanList.objects.get(
            pk=scan_list_id, token=request.data['token'])

        for attribute, key in simple_fields:
            setattr(scan_list, attribute, request.data[key])

        # save tags
        scan_list.save_tags(request.data['tags'])
        # save columns
        scan_list.save_columns(request.data['columns'])

        scan_list.save()
        return Response({
            'type': 'success',
            'message': 'ok',
        })
    except KeyError:
        raise ParseError
    except ScanList.DoesNotExist:
        raise NotFound
def _extract_date(self): """ Extract date from request. In detail route extract it from pk and it list from query params. """ pk = self.request.parser_context['kwargs'].get('pk') # detail case if pk is not None: try: return datetime.datetime.strptime( pk.split('_')[1], '%Y-%m-%d' ) except (ValueError, TypeError, IndexError): raise exceptions.NotFound() # list case query_params = self.request.query_params try: return datetime.datetime.strptime( query_params.get('date'), '%Y-%m-%d' ).date() except ValueError: raise exceptions.ParseError(_('Date is invalid')) except TypeError: if query_params.get('last_reported_date', '0') == '0': raise exceptions.ParseError(_('Date filter needs to be set')) return None
def _extract_date(self): """ Extract date from request. In detail route extract it from pk and it list from query params. """ pk = self.request.parser_context['kwargs'].get('pk') # detail case if pk is not None: try: return datetime.datetime.strptime( pk.split('_')[2], '%Y-%m-%d' ) except (ValueError, TypeError, IndexError): raise exceptions.NotFound() # list case try: return datetime.datetime.strptime( self.request.query_params.get('date'), '%Y-%m-%d' ).date() except ValueError: raise exceptions.ParseError(_('Date is invalid')) except TypeError: raise exceptions.ParseError(_('Date filter needs to be set'))
def _extract_user(self): """ Extract user from request. In detail route extract it from pk and it list from query params. """ pk = self.request.parser_context['kwargs'].get('pk') # detail case if pk is not None: try: user_id = int(pk.split('_')[0]) # avoid query if user is self if self.request.user.id == user_id: return self.request.user return get_user_model().objects.get(pk=pk.split('_')[0]) except (ValueError, get_user_model().DoesNotExist): raise exceptions.NotFound() # list case try: user_id = self.request.query_params.get('user') if user_id is None: raise exceptions.ParseError(_('User filter needs to be set')) # avoid query if user is self if self.request.user.id == int(user_id): return self.request.user return get_user_model().objects.get(pk=user_id) except (ValueError, get_user_model().DoesNotExist): raise exceptions.ParseError(_('User is invalid'))
def parse(self, stream, media_type=None, parser_context=None):
    """
    Read the whole stream, decode it and load it as JSON.

    The encoding comes from `parser_context['encoding']` and falls back to
    `settings.DEFAULT_CHARSET`. Raises `ParseError` when decoding or JSON
    loading fails with a `ValueError`.
    """
    context = parser_context or {}
    charset = context.get('encoding', settings.DEFAULT_CHARSET)

    try:
        text = stream.read().decode(charset)
        return json.loads(text)
    except ValueError as exc:
        raise ParseError('JSON parse error - %s' % six.text_type(exc))
def get_queryset(self):
    """Return the online events selected by ``since`` and ``until``.

    Both query parameters use the ``%Y-%m-%d`` format. With neither given,
    the window defaults to today through seven days from now.

    * ``since`` only: events running on that date or beginning after it.
    * ``until`` only: events running on that date or ending before it.
    * both: events that begin in, end in, or span the whole window.

    Raises:
        ParseError: If a supplied date does not match the expected format.
    """
    date_format = '%Y-%m-%d'
    queryset = Event.objects.get_online()
    since = self.request.query_params.get('since', None)
    until = self.request.query_params.get('until', None)

    if since is None and until is None:
        # No window requested: default to the next seven days.
        now = datetime.datetime.now()
        since = now.strftime(date_format)
        until = (now + datetime.timedelta(days=7)).strftime(date_format)

    if since is not None and until is None:
        try:
            since_date = datetime.datetime.strptime(since, date_format)
        except ValueError:
            raise ParseError("Bad format for since parameter. Accepted format : %Y-%m-%d.")
        # The first clause used to test `begins_at` twice, which made it
        # redundant; `ends_at__gte` mirrors the `until` branch below.
        queryset = queryset.filter(
            (Q(begins_at__lte=since_date) & Q(ends_at__gte=since_date)) |
            (Q(begins_at__gte=since_date))
        )
    elif since is None and until is not None:
        try:
            until_date = datetime.datetime.strptime(until, date_format)
        except ValueError:
            raise ParseError("Bad format for until parameter. Accepted format : %Y-%m-%d.")
        queryset = queryset.filter(
            (Q(begins_at__lte=until_date) & Q(ends_at__gte=until_date)) |
            (Q(ends_at__lte=until_date))
        )
    else:
        try:
            since_date = datetime.datetime.strptime(since, date_format)
            until_date = datetime.datetime.strptime(until, date_format)
        except ValueError:
            raise ParseError("Bad format for since/until parameters. Accepted format : %Y-%m-%d.")
        queryset = queryset.filter(
            (Q(begins_at__gte=since_date) & Q(begins_at__lte=until_date)) |
            (Q(ends_at__gte=since_date) & Q(ends_at__lte=until_date)) |
            (Q(begins_at__lte=since_date) & Q(ends_at__gte=until_date))
        )

    return queryset
def destroy(self, request, *args, **kwargs):
    """Delete the submission addressed by the URL and answer with 204.

    Raises:
        ParseError: If the URL addresses a form rather than a submission.
        PermissionDenied: If the user lacks ``delete_xform`` on the form.
    """
    self.object = self.get_object()
    target = self.object

    if isinstance(target, XForm):
        raise ParseError(_(u"Data id not provided."))

    if isinstance(target, Instance):
        if not request.user.has_perm("delete_xform", target.xform):
            raise PermissionDenied(_(u"You do not have delete permissions."))
        target.delete()

    return Response(status=status.HTTP_204_NO_CONTENT)
def _get_export_type(export_type):
    """Map a requested export format to its entry in ``EXPORT_EXT``.

    Raises:
        exceptions.ParseError: If the format is not a key of ``EXPORT_EXT``.
    """
    if export_type not in EXPORT_EXT:
        # Translate the template first and interpolate afterwards;
        # interpolating first produces a msgid no catalogue contains.
        raise exceptions.ParseError(
            _(u"'%(export_type)s' format not known or not implemented!")
            % {'export_type': export_type}
        )

    return EXPORT_EXT[export_type]
def _set_start_end_params(request, query): format_date_for_mongo = lambda x, datetime: datetime.strptime( x, '%y_%m_%d_%H_%M_%S').strftime('%Y-%m-%dT%H:%M:%S') # check for start and end params if 'start' in request.GET or 'end' in request.GET: query = json.loads(query) \ if isinstance(query, six.string_types) else query query[SUBMISSION_TIME] = {} try: if request.GET.get('start'): query[SUBMISSION_TIME]['$gte'] = format_date_for_mongo( request.GET['start'], datetime) if request.GET.get('end'): query[SUBMISSION_TIME]['$lte'] = format_date_for_mongo( request.GET['end'], datetime) except ValueError: raise exceptions.ParseError( _("Dates must be in the format YY_MM_DD_hh_mm_ss") ) else: query = json.dumps(query) return query
def update(self, request, pk, *args, **kwargs):
    """Update a form, first republishing its XLSForm when one is supplied.

    An upload in ``xls_file`` or a ``text_xls_form`` entry in the request
    data replaces the existing form's contents; only the form's owner may
    do that. All other field updates are handled by the superclass.

    Raises:
        exceptions.PermissionDenied: If a non-owner tries to replace the form.
        exceptions.ParseError: If publishing does not return an ``XForm``.
    """
    replaces_form = (
        'xls_file' in request.FILES or 'text_xls_form' in request.data
    )
    if replaces_form:
        # A new XLSForm has been uploaded and will replace the existing form
        current_form = get_object_or_404(XForm, pk=pk)

        # Behave like `onadata.apps.main.views.update_xform`: only allow
        # the update to proceed if the user is the owner
        form_owner = current_form.user
        if request.user.pk != form_owner.pk:
            raise exceptions.PermissionDenied(
                detail=_("Only a form's owner can overwrite its contents"))

        published = utils.publish_xlsform(request, form_owner, current_form)
        if not isinstance(published, XForm):
            if isinstance(published, dict) and 'text' in published:
                # Typical error text; pass it along
                detail = published['text']
            else:
                # Something odd; hopefully it can be coerced into a string
                detail = published
            raise exceptions.ParseError(detail=detail)

    # Let the superclass handle updates to the other fields
    # noti = existing_xform.logs.create(source=request.user, type=7, title="Kobo form Updated",
    #                                  organization=request.organization,
    #                                  description="new kobo form {0} Updated by {1}".
    #                                  format(existing_xform.title, request.user.username))
    # result = {}
    # result['description'] = noti.description
    # result['url'] = noti.get_absolute_url()
    # ChannelGroup("notify-0").send({"text":json.dumps(result)})
    # if noti.organization:
    #     ChannelGroup("notify-{}".format(noti.organization.id)).send({"text":json.dumps(result)})
    return super(XFormViewSet, self).update(request, pk, *args, **kwargs)
def to_representation(self, obj):
    """Return the form's submissions grouped by the ``group`` field.

    ``name`` defaults to the value of ``group``. When the grouped field is
    a select field, each record's value is replaced by its choice label.
    A ``None`` object goes to the superclass.

    Raises:
        exceptions.ParseError: If ``group`` is missing, or grouping raises
            ``ValueError``.
    """
    if obj is None:
        return \
            super(SubmissionStatsInstanceSerializer, self).to_representation(obj)

    request = self.context.get('request')
    field = request.query_params.get('group')
    name = request.query_params.get('name', field)

    if field is None:
        raise exceptions.ParseError(_(u"Expecting `group` and `name`"
                                      u" query parameters."))

    try:
        data = get_form_submissions_grouped_by_field(
            obj, field, name)
    except ValueError as e:
        # Exceptions have no `.message` attribute on Python 3; format the
        # exception itself, which works on both Python 2 and 3.
        raise exceptions.ParseError(detail=u"%s" % e)
    else:
        if data:
            dd = obj.data_dictionary()
            element = dd.get_survey_element(field)

            if element and element.type in SELECT_FIELDS:
                for record in data:
                    label = dd.get_choice_label(element, record[name])
                    record[name] = label

    return data
def to_internal_value(self, data):
    """Return ``data`` unchanged after checking that it is a list.

    Subclasses of ``list`` are accepted as well.

    Raises:
        ParseError: If ``data`` is not a list.
    """
    if not isinstance(data, list):
        raise ParseError("expected a list of data")
    return data
def get_object(self):
    """
    Look up the object named by the URL keyword argument.

    When the lookup field is a hyperlinked related field on the
    serializer, the filter follows the relation using that field's own
    `lookup_field`.
    """
    url_field = self.lookup_field
    if self.kwargs.get(url_field, None) is None:
        raise ParseError(
            'Expected URL keyword argument `%s`.' % url_field
        )

    queryset = self.filter_queryset(self.get_queryset())
    serializer_fields = self.get_serializer().get_fields()

    filter_key = url_field
    if url_field in serializer_fields:
        related = serializer_fields[url_field]
        if isinstance(related, serializers.HyperlinkedRelatedField):
            filter_key = '%s__%s' % (url_field, related.lookup_field)

    obj = get_object_or_404(queryset, **{filter_key: self.kwargs[url_field]})

    # May raise a permission denied
    self.check_object_permissions(self.request, obj)

    return obj
def get_object(self):
    """Look up the object using every field listed in ``lookup_fields``.

    A field that is a hyperlinked related field on the serializer is
    filtered through its ``source`` (when set) and its own
    ``lookup_field``.

    Raises:
        ParseError: If a lookup field is missing from the URL.
    """
    queryset = self.filter_queryset(self.get_queryset())
    serializer = self.get_serializer()
    criteria = {}

    for field in getattr(self, 'lookup_fields', []):
        filter_key = field

        if filter_key in serializer.get_fields():
            related = serializer.get_fields()[filter_key]
            if isinstance(related, serializers.HyperlinkedRelatedField):
                if related.source:
                    filter_key = related.source
                filter_key = '%s__%s' % (filter_key, related.lookup_field)

        if self.kwargs.get(field, None) is None:
            raise ParseError(
                'Expected URL keyword argument `%s`.' % field
            )
        criteria[filter_key] = self.kwargs[field]

    obj = get_object_or_404(queryset, **criteria)

    # May raise a permission denied
    self.check_object_permissions(self.request, obj)

    return obj
def filter_queryset(self, request, queryset, view):
    """
    Restrict anonymous users to shared projects and expose shared ones by id.

    Anonymous users get the queryset filtered to `shared=True`. For other
    users, a project id in the URL that names a shared project returns just
    that project; everything else is left to the parent filter.

    Raises `ParseError` for a non-integer project id and `Http404` when the
    project is not in the queryset.
    """
    user = request.user
    project_id = view.kwargs.get(view.lookup_field)

    if user.is_anonymous():
        return queryset.filter(Q(shared=True))

    if project_id:
        try:
            int(project_id)
        except ValueError:
            message = (u"Invalid value for project_id '%s' must be a positive "
                       "integer." % project_id)
            raise ParseError(message)

        # check if project is public and return it
        try:
            project = queryset.get(id=project_id)
        except ObjectDoesNotExist:
            raise Http404

        if project.shared:
            return queryset.filter(Q(id=project_id))

    parent = super(AnonUserProjectFilter, self)
    return parent.filter_queryset(request, queryset, view)
def filter_queryset(self, request, queryset, view):
    """Apply the xform filter, then narrow by the ``instance`` parameter.

    Raises:
        ParseError: If the ``instance`` parameter is not an integer.
    """
    filtered = self._xform_filter_queryset(
        request, queryset, view, 'instance__xform')

    instance_id = request.query_params.get('instance')
    if not instance_id:
        return filtered

    try:
        int(instance_id)
    except ValueError:
        raise ParseError(
            u"Invalid value for instance %s." % instance_id)

    instance = get_object_or_404(Instance, pk=instance_id)
    return filtered.filter(instance=instance)
def _serialize_data(self, data, serializer_class=None): """ Function to serialize data with the serializer given in the serializer_class attribute. Also validates the data and responds with a HTTP_400_BAD_REQUEST when validation failed. Due to being an open api we do not want to give away the required fields and their requirements. Args: data(dict): Dictonary with that data that need to be serialized. serializer_class(Serializer): Class to use for serialization. Returns: dict: Dictionary with the validated data. Raises: NotImplementedError: When serializer_class attribute is not set. ParseError: When validation fails but because it's an open API we do not want to give away what failed. ParseError returns a HTTP_400_BAD_REQUEST. """ if serializer_class is None: if self.serializer_class is None: raise NotImplementedError('serializer_class Attribute should be set') else: self.serializer_class = serializer_class serializer = self.serializer_class(data=data) if not serializer.is_valid(raise_exception=False): # Log errors. logger.info('BAD REQUEST! Serialization failed with following errors:\n\n{0}\n\nData:\n\n{1}'.format( serializer.errors, data, )) # This raises a bad request response. raise ParseError(detail=None) return serializer.validated_data
def parse(self, stream, media_type=None, parser_context=None):
    """Parse a CSV stream into an ``OrderedRows`` of per-row dicts.

    The first row supplies the header; each following row is zipped with
    it into a dict. Delimiter and encoding come from ``parser_context``
    (defaults: ``','`` and ``settings.DEFAULT_CHARSET``).

    Raises:
        ParseError: If anything goes wrong while reading the stream.
    """
    context = parser_context or {}
    delimiter = context.get('delimiter', ',')

    try:
        charset = context.get('encoding', settings.DEFAULT_CHARSET)
        reader = unicode_csv_reader(universal_newlines(stream),
                                    delimiter=delimiter,
                                    charset=charset)
        table = OrderedRows(next(reader))
        for values in reader:
            table.append(dict(zip(table.header, values)))
        return table
    except Exception as exc:
        raise ParseError('CSV parse error - %s' % str(exc))
def get(self, request):
    """Always fail with a ``ParseError``; no response is ever returned."""
    message = 'lol nice one'
    raise ParseError(message)
def _get_page(self): try: return int(self.request.GET.get('page', '1')) except ValueError: raise ParseError('page must be an integer')
def _get_page_size(self): try: return int(self.request.GET.get('page_size', '10')) except ValueError: raise ParseError('page_size must be an integer')
def parse(self, data):
    """Load ``data`` as JSON with ``ujson``.

    Raises:
        ParseError: If ``ujson`` rejects the input with a ``ValueError``.
    """
    try:
        parsed = ujson.loads(data)
    except ValueError as exc:
        raise ParseError('JSON parse error - %s' % six.text_type(exc))
    return parsed
def get_object(self, queryset=None):
    """Look up a user profile by user pk or by the lookup field.

    A URL value that converts to ``int`` is matched against ``user__pk``.
    Any other value is matched against the lookup field, following the
    relation when that field is a hyperlinked related field on the
    serializer.

    Raises:
        ParseError: If the lookup keyword argument is absent from the URL.
    """
    url_field = self.lookup_field
    if self.kwargs.get(url_field, None) is None:
        raise ParseError(
            'Expected URL keyword argument `%s`.' % url_field
        )

    if queryset is None:
        queryset = self.filter_queryset(self.get_queryset())

    serializer_fields = self.get_serializer().get_fields()
    filter_key = url_field
    if url_field in serializer_fields:
        related = serializer_fields[url_field]
        if isinstance(related, serializers.HyperlinkedRelatedField):
            filter_key = '%s__%s' % (url_field, related.lookup_field)

    lookup = self.kwargs[url_field]
    try:
        criteria = {'user__pk': int(lookup)}
    except (TypeError, ValueError):
        # Not numeric: fall back to the lookup field.
        criteria = {filter_key: lookup}

    obj = get_object_or_404(queryset, **criteria)

    # May raise a permission denied
    self.check_object_permissions(self.request, obj)

    return obj
def expire(self, request, *args, **kwargs):
    """Delete the requesting user's temporary token and answer with 204.

    Raises:
        ParseError: If the user has no temporary token.
    """
    try:
        token = TempToken.objects.get(user=request.user)
        token.delete()
    except TempToken.DoesNotExist:
        raise ParseError(_(u"Temporary token not found!"))

    return Response(status=status.HTTP_204_NO_CONTENT)