我们从 Python 开源项目中提取了以下 40 个代码示例,用于说明如何使用 rest_framework.renderers.JSONRenderer()。
def publish(user, item=None, additional_data=None):
    """Render a payload as JSON and publish it to ``user`` through Redis.

    The payload is ``item.as_dict()`` (an empty dict when ``item`` is
    ``None``) updated with ``additional_data``.  The ``RedisMessage`` is
    always constructed; it is only published when ``settings.TESTING`` is
    not truthy.
    """
    extras = {} if additional_data is None else additional_data
    redis_publisher = RedisPublisher(facility='all', users=[user.email])

    payload = {} if item is None else item.as_dict()
    payload.update(extras)
    message = RedisMessage(JSONRenderer().render(payload))

    # Do not send in tests
    if not getattr(settings, 'TESTING', False):
        redis_publisher.publish_message(message)
def test_publish(self):
    """``publish`` must wrap the JSON-rendered payload in a ``RedisMessage``."""
    render = JSONRenderer().render
    target = "universal_notifications.backends.websockets.RedisMessage"

    with mock.patch(target) as mocked_message:
        # No item, no extras: an empty payload is rendered.
        publish(self.user)
        mocked_message.assert_called_with(render({}))
        mocked_message.reset_mock()

        # Item only: the payload is the item's dict form.
        publish(self.user, self.item)
        mocked_message.assert_called_with(render(self.item.as_dict()))
        mocked_message.reset_mock()

        # Item plus extras: the extras are merged over the item's dict form.
        extras = {"additional": True}
        publish(self.user, self.item, extras)
        expected = self.item.as_dict()
        expected.update(extras)
        mocked_message.assert_called_with(render(expected))
def __init__(self, data, **kwargs):
    """Build the response body from ``data`` based on the Accept header.

    ``data`` must carry ``status`` and ``request`` entries; both are removed
    from it (the caller's mapping is modified in place).  When the request's
    ``HTTP_ACCEPT`` is exactly ``application/json`` the ``detail`` entry is
    renamed to ``error`` and the rest is rendered with ``JSONRenderer``;
    otherwise ``CollectionJsonRenderer`` is used and ``self.exception`` is
    set to ``True``.
    """
    kwargs['status'] = data.pop('status')
    request = data.pop('request')

    if request.META.get('HTTP_ACCEPT') == 'application/json':
        kwargs['content_type'] = 'application/json'
        data['error'] = data.pop('detail')
        content = JSONRenderer().render(data)
    else:
        kwargs['content_type'] = 'application/vnd.collection+json'
        self.exception = True
        renderer_context = {
            'request': request,
            'view': None,
            'response': self,
        }
        content = CollectionJsonRenderer().render(
            data, renderer_context=renderer_context)

    super(RenderedResponse, self).__init__(content, **kwargs)
def render(self, data, media_type=None, renderer_context=None):
    """Serialize ``data`` to JSON under this renderer's configured labels.

    * ``data['results']`` present: emit the results under
      ``pagination_object_label`` and ``data['count']`` under
      ``pagination_count_label``.
    * ``data['errors']`` present: defer to the parent renderer.
    * otherwise: wrap ``data`` under ``object_label``.
    """
    if data.get('results', None) is not None:
        paginated = {
            self.pagination_object_label: data['results'],
            self.pagination_count_label: data['count'],
        }
        return json.dumps(paginated)

    # If the view throws an error (such as the user can't be authenticated
    # or something similar), `data` will contain an `errors` key. We want
    # the default JSONRenderer to handle rendering errors, so we need to
    # check for this case.
    if data.get('errors', None) is not None:
        return super(ConduitJSONRenderer, self).render(data)

    return json.dumps({self.object_label: data})
def react_documents(context, module, reload_on_success=False):
    """Return the template context for the documents widget of ``module``.

    The incoming ``context`` is not read; a fresh dict is returned holding
    the module's chapters and the CKEditor ``image-editor`` config, both
    rendered with ``JSONRenderer``.
    """
    # NOTE(review): JSONRenderer.render() returns bytes; confirm the template
    # consuming 'chapters' and 'config' handles bytes as intended.
    chapter_data = ChapterSerializer(
        Chapter.objects.filter(module=module), many=True).data
    chapters_json = JSONRenderer().render(chapter_data)

    widget = CKEditorUploadingWidget(config_name='image-editor')
    widget._set_config()
    config_json = JSONRenderer().render(widget.config)

    return {
        'chapters': chapters_json,
        'module': module.pk,
        'config': config_json,
        'id': 'document-' + str(module.id),
        'reload_on_success': json.dumps(reload_on_success),
        'ckeditor_media': widget.media,
    }
def delete(self, request):
    """
    Delete the user attached to the request and report the outcome.

    :param request: request whose ``user`` is deleted
    :return: Response carrying the JSON-rendered ``account`` / ``deleted``
        payload, sent with status ``HTTP_201_CREATED``
    :raises DatabaseFailureException: when the delete raises a database error
    """
    stdlogger.debug("Hitting HTTP DELETE account view")

    # Capture the email before the user object is deleted.
    user_email = request.user.email
    try:
        stdlogger.debug("Deleting this user object")
        request.user.delete()
    except (IntegrityError, InternalError, DataError, DatabaseError):
        # Should practically never happen; tell the calling system that the
        # operation could not be completed.
        raise DatabaseFailureException

    # NOTE(review): the payload is rendered to JSON here and then handed to
    # Response, which presumably renders it again; a 201 status on delete
    # also looks unusual. Confirm clients rely on both before changing.
    payload = JSONRenderer().render(
        {'account': user_email, 'deleted': 'success'})
    return Response(data=payload, status=status.HTTP_201_CREATED)
def force_evaluate(val):
    """Round-trip ``val`` through ``JSONRenderer`` and ``json.loads``.

    Returns the plain Python structure obtained by parsing the rendered JSON.
    """
    return json.loads(JSONRenderer().render(val))
def home(request, bd=None):
    """Home view.

    When ``bd`` is given, the matching ``BlockDiagram`` (404 if absent) is
    serialized and rendered to JSON for the template; otherwise the template
    receives the literal string ``"None"``.
    """
    if bd is None:
        bd_serial = "None"
    else:
        diagram = get_object_or_404(BlockDiagram, id=bd)
        bd_serial = JSONRenderer().render(BlockDiagramSerializer(diagram).data)
    return render(request, 'home.html', {'bd': bd_serial})
def to_json(data):
    """Render ``data`` to a UTF-8 decoded JSON string marked safe for templates."""
    # NOTE(review): mark_safe disables auto-escaping; confirm ``data`` never
    # carries untrusted text into an HTML/script context.
    rendered = JSONRenderer().render(data)
    return mark_safe(rendered.decode('utf-8'))
def test_is_json_serializable(self):
    """Serialized request data renders to JSON containing every expected chunk."""
    user_agent_string = str(
        'Mozilla/5.0 (Linux; Android 6.0.1; Z831 Build/MMB29M) '
        'AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.83 '
        'Mobile Safari/537.36')
    response = self.client.get(
        reverse('intake-home'), HTTP_USER_AGENT=user_agent_string)
    request = response.wsgi_request

    serialized = RequestSerializer(request).data
    rendered = JSONRenderer().render(
        serialized, renderer_context={'indent': 2})

    for chunk in self.json_chunks:
        self.assertIn(chunk, rendered)
def get_response(self, data):
    """Return an ``HttpResponse`` whose body is ``data`` rendered as JSON."""
    body = JSONRenderer().render(data)
    return HttpResponse(body, content_type="application/json")
def kolibri_bootstrap_model(context, base_name, api_resource, **kwargs):
    """Return a safe ``<script>`` snippet that bootstraps a client-side model.

    The ``detail`` response from ``_kolibri_bootstrap_helper`` is rendered to
    JSON, embedded as a JavaScript string literal and parsed client-side with
    ``JSON.parse``.
    """
    response, kwargs, url_params = _kolibri_bootstrap_helper(
        context, base_name, api_resource, 'detail', **kwargs)
    model_json = JSONRenderer().render(response.data).decode('utf-8')

    template = (
        "<script type='text/javascript'>"
        "var model = {0}.resources.{1}.createModel(JSON.parse({2}), {3});"
        "model.synced = true;"
        "</script>"
    )
    # NOTE(review): json.dumps does not escape "</script>"; confirm the
    # response data cannot contain untrusted markup.
    html = template.format(
        settings.KOLIBRI_CORE_JS_NAME,
        api_resource,
        json.dumps(model_json),
        json.dumps(url_params),
    )
    return mark_safe(html)
def kolibri_bootstrap_collection(context, base_name, api_resource, **kwargs):
    """Return a safe ``<script>`` snippet that bootstraps a client-side collection.

    The ``list`` response from ``_kolibri_bootstrap_helper`` is rendered to
    JSON, embedded as a JavaScript string literal and parsed client-side with
    ``JSON.parse``.
    """
    response, kwargs, url_params = _kolibri_bootstrap_helper(
        context, base_name, api_resource, 'list', **kwargs)
    collection_json = JSONRenderer().render(response.data).decode('utf-8')

    template = (
        "<script type='text/javascript'>"
        "var collection = {0}.resources.{1}.createCollection({2}, {3}, JSON.parse({4}));"
        "collection.synced = true;"
        "</script>"
    )
    # NOTE(review): json.dumps does not escape "</script>"; confirm the
    # response data cannot contain untrusted markup.
    html = template.format(
        settings.KOLIBRI_CORE_JS_NAME,
        api_resource,
        json.dumps(url_params),
        json.dumps(kwargs),
        json.dumps(collection_json),
    )
    return mark_safe(html)
def post(self, request):
    ''' Returns a list of food with calories '''
    # A request without a 'foods' entry is answered with a 404.
    if 'foods' not in request.data:
        raise Http404

    matches = []
    for food in request.data.get('foods'):
        needle = food.upper()
        lookup = Q(name__icontains=needle) | Q(name__fuzzy=needle)
        # Keep at most five candidates per requested food name.
        matches.append(Food.objects.filter(lookup)[:5])

    serializer = FoodListSerializer({"foods": matches})
    return Response(JSONRenderer().render(serializer.data))
def __init__(self, data, **kwargs):
    """Render ``data`` to JSON and initialise the parent response with it.

    Any caller-supplied ``content_type`` is replaced by ``application/json``.
    """
    kwargs.update(content_type='application/json')
    body = JSONRenderer().render(data)
    super(JSONResponse, self).__init__(body, **kwargs)
def render(self, data, accepted_media_type=None, renderer_context=None):
    """Encode ``data`` with ``VersionedOpenAPICodec`` on a 200 response.

    Any other response status is rendered as plain JSON instead.
    """
    response = renderer_context["response"]
    if response.status_code == status.HTTP_200_OK:
        customizations = self.get_customizations()
        return VersionedOpenAPICodec().encode(data, extra=customizations)
    return JSONRenderer().render(data)
def test_dictionary_to_json(self):
    """The serializer's data renders to the same JSON as the expected dict."""
    serializer = StatusReportSerializer(self.new_status)
    content = JSONRenderer().render(serializer.data)
    expected_json = JSONRenderer().render(self.expected_dict)
    # assertEquals is a deprecated alias that was removed in Python 3.12.
    self.assertEqual(expected_json, content)
def test_json_to_StatusReport(self):
    """Parsed JSON applied to an existing report saves back to an equal report."""
    rendered = JSONRenderer().render(self.expected_dict)
    parsed = JSONParser().parse(BytesIO(rendered))

    serializer = StatusReportSerializer(self.new_status, data=parsed)
    self.assertTrue(serializer.is_valid())

    saved = serializer.save()
    self.assertEqual(self.new_status, saved)
    self.assertEqual(self.new_status.id, saved.id)
    self.assertEqual(self.new_status.status, saved.status)
    self.assertEqual(self.new_status.when, saved.when)
    self.assertEqual(self.new_status.user, saved.user)
def test_json_to_new_StatusReport(self):
    """Parsed JSON with no instance saves a new report with matching fields."""
    rendered = JSONRenderer().render(self.expected_dict)
    parsed = JSONParser().parse(BytesIO(rendered))

    serializer = StatusReportSerializer(data=parsed)
    self.assertTrue(serializer.is_valid())

    saved = serializer.save()
    self.assertEqual(self.new_status.status, saved.status)
    self.assertIsNotNone(saved.when)
    self.assertEqual(self.new_status.user, saved.user)
def render(self, data):
    """Render ``data`` with a fresh DRF ``JSONRenderer`` and return the result."""
    # Imported locally, as in the original, rather than at module import time.
    from rest_framework.renderers import JSONRenderer
    return JSONRenderer().render(data)
def __init__(self, data, **kwargs):
    """Render ``data`` to JSON and initialise the parent response with it.

    Any caller-supplied ``content_type`` is replaced by ``application/json``.
    """
    kwargs.update(content_type='application/json')
    body = JSONRenderer().render(data)
    super(JSONResponse, self).__init__(body, **kwargs)


# helper functions
def __init__(self, data, errorcode=200, errormsg="success", **kwargs):
    """Wrap ``data`` in an errorcode/errormsg envelope and render it as JSON.

    :param data: payload; a list is nested under a ``"result"`` key, any
        other value is used as-is
    :param errorcode: value stored under ``"errorcode"``
    :param errormsg: value stored under ``"errormsg"``
    :param kwargs: forwarded to the parent initialiser; ``content_type`` is
        always replaced by ``application/json``
    """
    # The leftover debug ``print(kwargs)`` and the dead ``composite = {}``
    # initialisation were removed; the rendered output is unchanged.
    payload = {"result": data} if isinstance(data, list) else data
    composite = {"errorcode": errorcode, "errormsg": errormsg, "data": payload}
    content = JSONRenderer().render(composite)
    kwargs['content_type'] = 'application/json'
    super(JSONResponse, self).__init__(content, **kwargs)
def parse_json_response(json):
    """Render ``json`` with ``JSONRenderer`` and parse it back with ``JSONParser``.

    Returns whatever ``JSONParser().parse`` yields for the rendered bytes.
    """
    stream = BytesIO(JSONRenderer().render(json))
    parser = JSONParser()
    return parser.parse(stream)
def render(self, data, accepted_media_type=None, renderer_context=None):
    """Encode ``data`` as an OpenAPI document extended by ``append_schemas``.

    A non-200 response is rendered as plain JSON instead.
    """
    if renderer_context['response'].status_code != status.HTTP_200_OK:
        return JSONRenderer().render(data)

    encoded = OpenAPICodec().encode(data, extra=self.get_customizations())
    swagger_doc = json.loads(encoded)
    # Append more information to documentation
    return append_schemas(swagger_doc)
def add_to_queue(self, obj):
    """Dispatch ``obj`` through the Redis proxy, or send it directly.

    With ``UNIVERSAL_NOTIFICATIONS_TWILIO_ENABLE_PROXY`` enabled, the sender's
    number is published as JSON on the dispatcher channel.  Otherwise the
    message is sent immediately and then saved.
    """
    proxy_enabled = getattr(
        settings, 'UNIVERSAL_NOTIFICATIONS_TWILIO_ENABLE_PROXY', False)

    if not proxy_enabled:
        self.send(obj.message)
        obj.message.save()
        return

    connection = StrictRedis(**private_settings.WS4REDIS_CONNECTION)
    json_data = JSONRenderer().render({'number': obj.from_phone})
    channel = getattr(
        settings,
        'UNIVERSAL_NOTIFICATIONS_TWILIO_DISPATCHER_CHANNEL',
        '__un_twilio_dispatcher')
    connection.publish(channel, RedisMessage(json_data))
def handle(self, *args, **options):
    """Publish every distinct pending ``from_phone`` on the dispatcher channel.

    Each number is rendered as ``{"number": <from_phone>}`` JSON, wrapped in
    a ``RedisMessage`` and published over the WS4REDIS connection.
    """
    connection = StrictRedis(**private_settings.WS4REDIS_CONNECTION)
    # The renderer and the channel name do not change between numbers, so
    # they are created once instead of on every loop iteration.
    renderer = JSONRenderer()
    channel = getattr(
        settings,
        "UNIVERSAL_NOTIFICATIONS_TWILIO_DISPATCHER_CHANNEL",
        "__un_twilio_dispatcher")

    numbers = PhonePendingMessages.objects.all().values_list(
        "from_phone", flat=True).distinct()
    for number in numbers:
        json_data = renderer.render({"number": number})
        connection.publish(channel, RedisMessage(json_data))
def render(self, data, accepted_media_type=None, renderer_context=None):
    """Encode ``data`` with ``BetterOpenAPICodec``, adding definitions and base path.

    A non-200 response is rendered as plain JSON instead.
    """
    response = renderer_context['response']
    if response.status_code != status.HTTP_200_OK:
        return JSONRenderer().render(data)

    extra = self.get_customizations()
    extra['definitions'] = encode_schemas(data._definitions)
    # if the base path auto detection doesn't work well for you it may be
    # overridden through settings.API_BASE_PATH
    extra['basePath'] = getattr(settings, 'API_BASE_PATH', data._base_path)

    codec = BetterOpenAPICodec()
    return codec.encode(data, extra=extra)
def render(self, data, accepted_media_type=None, renderer_context=None):
    """Encode ``data`` with ``OpenAPICodec`` on a 200 response.

    Any other response status is rendered as plain JSON instead.
    """
    response = renderer_context['response']
    if response.status_code == status.HTTP_200_OK:
        customizations = self.get_customizations()
        return OpenAPICodec().encode(data, extra=customizations)
    return JSONRenderer().render(data)
def quote_list(request, limit=263):
    """
    API endpoint that lists all quotes or a specific number of quotes
    limit -- the number of quotes to be returned
    """
    # ``limit`` is converted with int() and used as the slice bound.
    selected = Quote.objects.all()[:int(limit)]
    body = JSONRenderer().render(QuoteSerializer(selected, many=True).data)

    response = HttpResponse(body, content_type='application/json')
    response["Access-Control-Allow-Origin"] = "*"
    return response
def random_quote_detail(request):
    """
    API endpoint that generates a random quote
    """
    # NOTE(review): this assumes a quote exists for every id in 1..263;
    # Quote.objects.get raises otherwise. Confirm against the data set.
    quote_id = randrange(1, 264)
    quote = Quote.objects.get(id=quote_id)
    body = JSONRenderer().render(QuoteSerializer(quote).data)

    response = HttpResponse(body, content_type='application/json')
    response["Access-Control-Allow-Origin"] = "*"
    return response
def render(self, data, media_type=None, renderer_context=None):
    """Render a cutout as JPEG bytes, or a JSON error body when unsupported.

    A JSON error with status 400 is returned when ``data["time_request"]`` is
    truthy (code 2005) or when the view's ``bit_depth`` is not 8 (code 2001).
    Otherwise the leading axis is squeezed out, the first two remaining axes
    are flattened together and the result is encoded as JPEG at quality 85.
    """
    if data["time_request"]:
        # This appears to contain time data. Error out
        return self._render_json_error(
            renderer_context,
            "The cutout service JPEG interface does not support 4D cutouts",
            2005)

    # Return data, squeezing time dimension as this only works with 3D data
    self.render_style = 'binary'
    data["data"].data = np.squeeze(data["data"].data, axis=(0,))

    if renderer_context['view'].bit_depth != 8:
        # This renderer only works on uint8 data
        return self._render_json_error(
            renderer_context,
            "The cutout service JPEG interface only supports uint8 image data",
            2001)

    # Reshape matrix
    d_shape = data["data"].data.shape
    data["data"].data = np.reshape(
        data["data"].data, (d_shape[0] * d_shape[1], d_shape[2]), order="C")

    # Save to Image
    jpeg_image = Image.fromarray(data["data"].data)
    img_file = io.BytesIO()
    jpeg_image.save(img_file, "JPEG", quality=85)

    # Send file
    img_file.seek(0)
    return img_file.read()


def _render_json_error(self, renderer_context, message, code):
    """Switch this renderer to JSON and return a rendered HTTP 400 error body."""
    renderer_context["response"].status_code = 400
    renderer_context["accepted_media_type"] = 'application/json'
    self.media_type = 'application/json'
    self.format = 'json'
    err_msg = {"status": 400, "message": message, "code": code}
    return JSONRenderer().render(err_msg, 'application/json', renderer_context)
def __init__(self, data, **kwargs):
    """Render ``data`` as indented, non-ASCII-escaped JSON for the response.

    Any caller-supplied ``content_type`` is replaced by ``application/json``.
    """
    renderer = JSONRenderer()
    # The original passed a misspelled 'ensure_asci' key in renderer_context,
    # which JSONRenderer never reads. ASCII escaping is controlled by the
    # renderer's ``ensure_ascii`` attribute, so set that instead.
    renderer.ensure_ascii = False
    content = renderer.render(data, renderer_context={'indent': 2})
    kwargs['content_type'] = 'application/json'
    super(JSONResponse, self).__init__(content, **kwargs)
def react_polls(context, question):
    """Return the polls widget ``<div>`` with the question embedded as JSON.

    :param context: template context; its ``request`` is handed to the serializer
    :param question: object serialized with ``QuestionSerializer``
    :return: safe HTML string produced by ``format_html``
    """
    question_serializer = serializers.QuestionSerializer(
        question,
        context={'request': context['request']}
    )
    # JSONRenderer.render() returns bytes. format_html() stringifies its
    # arguments, so on Python 3 raw bytes would be emitted as "b'...'".
    # Decode first so the attribute holds the JSON text itself.
    question_json = JSONRenderer().render(
        question_serializer.data).decode('utf-8')
    return format_html(
        '<div data-mb-widget="polls" data-question="{question}"></div>',
        question=question_json
    )
def react_poll_form(poll, reload_on_success=False):
    """Return the poll-management widget ``<div>`` with the poll embedded as JSON.

    :param poll: object serialized with ``PollSerializer``; its ``module.pk``
        is written to ``data-module``
    :param reload_on_success: JSON-encoded into ``data-reloadOnSuccess``
    :return: safe HTML string produced by ``format_html``
    """
    serializer = serializers.PollSerializer(poll)
    # JSONRenderer.render() returns bytes. format_html() stringifies its
    # arguments, so on Python 3 raw bytes would be emitted as "b'...'".
    # Decode first so the attribute holds the JSON text itself.
    data_poll = JSONRenderer().render(serializer.data).decode('utf-8')
    reload_on_success = json.dumps(reload_on_success)
    return format_html(
        (
            '<div data-mb-widget="poll-management" data-module="{module}" '
            ' data-poll="{poll}" data-reloadOnSuccess="{reload_on_success}">'
            '</div>'
        ),
        module=poll.module.pk,
        poll=data_poll,
        reload_on_success=reload_on_success,
    )
def post(self, request, *args, **kwargs):
    """
    Persist the user attached to the request and report the outcome.

    :param request: request whose ``user`` is saved
    :param args: unused
    :param kwargs: unused
    :return: Response carrying the JSON-rendered ``account`` / ``created``
        payload, sent with status ``HTTP_201_CREATED``
    :raises DatabaseFailureException: when the save raises a database error
    """
    stdlogger.debug("Hitting HTTP POST account view")

    # The save can still fail a data integrity check if another system
    # stored this user before this line runs.
    try:
        request.user.save()
    except (IntegrityError, InternalError, DataError, DatabaseError):
        # Should practically never happen; tell the calling system that the
        # resource could not be created.
        raise DatabaseFailureException

    # NOTE(review): the payload is rendered to JSON here and then handed to
    # Response, which presumably renders it again; confirm clients rely on it.
    payload = JSONRenderer().render(
        {'account': request.user.email, 'created': 'success'})
    return Response(data=payload, status=status.HTTP_201_CREATED)
def put(self, request):
    """
    Update the user attached to the request and report the outcome.

    When ``request.new_username`` is truthy, the user's email is replaced by
    it before the user is saved.

    :param request: request whose ``user`` is updated
    :return: Response carrying the JSON-rendered ``account`` / ``updated``
        payload, sent with status ``HTTP_201_CREATED``
    :raises DatabaseFailureException: when the save raises a database error
    """
    stdlogger.debug("Hitting HTTP PUT account view")

    # The save can still fail a data integrity check if another system
    # stored this user before this line runs.
    try:
        # A PUT may change the user ID: keep the same user object (and its
        # primary key) and only swap the email for the new one.
        new_username = request.new_username
        if new_username:
            stdlogger.info("Admin is updating a user ID to a new value")
            request.user.email = new_username
        request.user.save()
    except (IntegrityError, InternalError, DataError, DatabaseError):
        # Should practically never happen; tell the calling system that the
        # resource could not be updated.
        raise DatabaseFailureException

    # NOTE(review): the payload is rendered to JSON here and then handed to
    # Response, which presumably renders it again; a 201 status on update
    # also looks unusual. Confirm clients rely on both before changing.
    payload = JSONRenderer().render(
        {'account': request.user.email, 'updated': 'success'})
    return Response(data=payload, status=status.HTTP_201_CREATED)
def course_table(request):
    """Return the caller's course table plus the currently active schedule.

    Responds 401 when HTTP basic auth fails, 405 for non-GET requests and 403
    when the user is neither a student nor a teacher.  Otherwise the body is
    a JSON object with ``count``, ``schedule_items`` and ``course_items``.
    """
    if not http_basic_auth(request):
        return HttpResponse('Unauthorized.', status=401)
    if request.method != 'GET':
        return HttpResponse('Method Not Allowed', status=405)

    # Pick the courses according to the caller's role.
    auth_number = request.user.authority.auth
    if has_auth(auth_number, AuthorityName.Student):
        courses = models.Course.objects.filter(
            as_student_set=request.user.as_student)
    elif has_auth(auth_number, AuthorityName.Teacher):
        courses = models.Course.objects.filter(teacher=request.user.as_teacher)
    else:
        return HttpResponse('Permission Denied.', status=403)

    # Optional query-string filters.
    params = request.GET
    if 'year' in params:
        courses = courses.filter(year=params['year'])
    if 'term' in params:
        courses = courses.filter(term=params['term'])

    ser = serializer.CourseTableSerializer(courses, many=True)
    course_table_json = json.loads(JSONRenderer().render(ser.data))

    # Look up the system schedule whose begin/end range contains today.
    now = time.localtime(time.time())
    date_now = utils.date_to_str(now[0], now[1], now[2])
    schedules = models.SystemSchedule.objects.filter(
        begin__lte=date_now).filter(end__gte=date_now)

    if schedules:
        schedule = schedules.first()
        items = models.SystemScheduleItem.objects.filter(
            system_schedule=schedule).order_by('no')
        schedule_count = items.count()
        schedule_items = [
            {
                'begin': utils.time_to_str(
                    item.begin.hour, item.begin.minute, item.begin.second),
                'end': utils.time_to_str(
                    item.end.hour, item.end.minute, item.end.second),
                'no': item.no,
            }
            for item in items
        ]
    else:
        schedule_count = 0
        schedule_items = []

    # Assemble the JSON response body.
    data = {
        'count': schedule_count,
        'schedule_items': schedule_items,
        'course_items': course_table_json,
    }
    return HttpResponse(json.dumps(data), status=200)