我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用django.core.serializers.json.DjangoJSONEncoder()。
def portfolioDataPush(): for userid in redis_conn.hkeys("online-users"): userid = userid.decode("utf-8") try: portfolioData = portfolio(userid) Channel( redis_conn.hget("online-users",userid)).send( { 'text': json.dumps(portfolioData,cls=DjangoJSONEncoder) }) except: print("Error in portfolioPush") #======================= SELL CHANNEL ==========================#
def history(request): username = auth.get_user(request).username if (username): ?t = city.objects.all().values() co = country.objects.all().values() city_json = json.dumps(list(?t), cls=DjangoJSONEncoder,ensure_ascii=False) country_json = json.dumps(list(co), cls=DjangoJSONEncoder,ensure_ascii=False) args={} args['city']=city_json args['country'] = country_json args['max_date'] = [] for i in ?t: args['max_date'].append((temperature.objects.filter(city_id__exact=i['city_id']).latest('date').date)) return render_to_response("history.html",args) else: return redirect("/login")
def forecast(request): username = auth.get_user(request).username if (username): ?t = city.objects.all().values() co = country.objects.all().values() city_json = json.dumps(list(?t), cls=DjangoJSONEncoder,ensure_ascii=False) country_json = json.dumps(list(co), cls=DjangoJSONEncoder,ensure_ascii=False) args={} args['city']=city_json args['country'] = country_json args['max_date']=[] for i in ?t: args['max_date'].append((temperature.objects.filter(city_id__exact = i['city_id']).latest('date').date)) #args['max_date'] = (temperature.objects.filter(city_id__exact=city).latest('date').date) return render_to_response("forecast.html",args) else: return redirect("/login")
def messages(request, room_id): if request.method == 'POST': try: room = ChatRoom.objects.get(eid=room_id) except ChatRoom.DoesNotExist: try: room = ChatRoom(eid=room_id) room.save() except IntegrityError: # someone else made the room. no problem room = ChatRoom.objects.get(eid=room_id) mfrom = request.POST['from'] text = request.POST['text'] with transaction.atomic(): msg = ChatMessage(room=room, user=mfrom, text=text) msg.save() send_event('room-%s' % room_id, 'message', msg.to_data()) body = json.dumps(msg.to_data(), cls=DjangoJSONEncoder) return HttpResponse(body, content_type='application/json') else: return HttpResponseNotAllowed(['POST'])
def append_event(self, channel, event_type, data): from . import models db_event = models.Event( channel=channel, type=event_type, data=json.dumps(data, cls=DjangoJSONEncoder)) db_event.save() self.trim_event_log() e = Event( db_event.channel, db_event.type, data, id=db_event.eid) return e
def test_jsonrpc_invalid_request_1(live_server): # Missing 'method' in payload headers = {'content-type': 'application/json'} payload = { # "method": 'add', "params": [5, 6], "jsonrpc": "2.0", "id": random.randint(1, 1000), } req_data = json.dumps(payload, cls=DjangoJSONEncoder) response = requests.post(live_server.url + '/all-rpc/', data=req_data, headers=headers).json() assert 'Missing parameter "method"' in response['error']['message'] assert RPC_INVALID_REQUEST == response['error']['code']
def test_jsonrpc_invalid_request_2(live_server): # Missing 'jsonrpc' in payload headers = {'content-type': 'application/json'} payload = { "method": 'add', "params": [5, 6], # "jsonrpc": "2.0", "id": random.randint(1, 1000), } req_data = json.dumps(payload, cls=DjangoJSONEncoder) response = requests.post(live_server.url + '/all-rpc/', data=req_data, headers=headers).json() assert 'Missing parameter "jsonrpc"' in response['error']['message'] assert RPC_INVALID_REQUEST == response['error']['code']
def test_jsonrpc_invalid_request_3(live_server): # Bad value for payload member 'jsonrpc' headers = {'content-type': 'application/json'} payload = { "method": 'add', "params": [5, 6], "jsonrpc": "1.0", "id": random.randint(1, 1000), } req_data = json.dumps(payload, cls=DjangoJSONEncoder) response = requests.post(live_server.url + '/all-rpc/', data=req_data, headers=headers).json() assert 'The attribute "jsonrpc" must contain "2.0"' in response['error']['message'] assert RPC_INVALID_REQUEST == response['error']['code']
def serialize_instance(instance): """ Since Django 1.6 items added to the session are no longer pickled, but JSON encoded by default. We are storing partially complete models in the session (user, account, token, ...). We cannot use standard Django serialization, as these are models are not "complete" yet. Serialization will start complaining about missing relations et al. """ data = {} for k, v in instance.__dict__.items(): if k.startswith('_') or callable(v): continue try: if isinstance(instance._meta.get_field(k), BinaryField): v = force_text(base64.b64encode(v)) except FieldDoesNotExist: pass data[k] = v return json.loads(json.dumps(data, cls=DjangoJSONEncoder))
def common_well_search(request): """ Returns json and array data for a well search. Used by both the map and text search. """ well_results = None well_results_json = '[]' form = SearchForm(request.GET) if form.is_valid(): well_results = form.process() if well_results and not len(well_results) > SearchForm.WELL_RESULTS_LIMIT: well_results_json = json.dumps( [well.as_dict() for well in well_results], cls=DjangoJSONEncoder) return form, well_results, well_results_json
def __init__(self, *args, **kwargs): """ :param dump_kwargs: Keyword arguments which will be passed to `json.dumps()` :param load_kwargs: Keyword arguments which will be passed to `json.loads()` """ self.dump_kwargs = kwargs.pop('dump_kwargs', { 'cls': DjangoJSONEncoder, 'ensure_ascii': False, 'sort_keys': False, 'separators': (',', ':') }) self.load_kwargs = kwargs.pop('load_kwargs', { 'object_pairs_hook': OrderedDict }) if not kwargs.get('null', False): kwargs['default'] = kwargs.get('default', defaultdict(OrderedDict)) super(JSONField, self).__init__(*args, **kwargs)
def _preencode(data, magic_marker, in_coords=False, in_groups=False): if isinstance(data, dict): data = data.copy() for name, value in tuple(data.items()): if name in ('bounds', 'point', 'locations') and isinstance(value, (tuple, list)): data[name] = magic_marker+json.dumps(value, cls=DjangoJSONEncoder)+magic_marker else: data[name] = _preencode(value, magic_marker, in_coords=(name == 'coordinates'), in_groups=(name == 'groups')) return data elif isinstance(data, (tuple, list)): if (in_coords and data and isinstance(data[0], (int, float))) or in_groups: return magic_marker+json.dumps(data, cls=DjangoJSONEncoder)+magic_marker else: return tuple(_preencode(value, magic_marker, in_coords) for value in data) else: return data
def export_action(self, request): response = HttpResponse(content_type='application/json') response['Content-Disposition'] = 'attachment; filename=templatemodel_export.json' items = [{ 'name': t.name, 'layout': t.layout, 'subject': t.subject, 'body': t.body, 'version': t.version, 'enabled': t.enabled, 'has_errors': t.has_errors, 'created_at': t.created_at, 'updated_at': t.updated_at, 'history': [{ 'layout': h.layout, 'subject': h.subject, 'body': h.body, 'version': h.version, 'archived_at': h.archived_at, } for h in t.history.all()] } for t in TemplateModel.objects.all()] json.dump(items, response, cls=DjangoJSONEncoder, indent=2, sort_keys=True) return response
def calc_frequency(areaNum, word_counts, curr_timeslot_word_counts): timeslot_area_percentage = {} for i in word_counts: for j in curr_timeslot_word_counts: #on same word (key) calculate actual percent if i == j: timeslot_area_percentage[i] = (word_counts[i]/(curr_timeslot_word_counts[j] + 0.00)) * 100 timeslot_area_percentage[i] = round(timeslot_area_percentage[i], 2) return timeslot_area_percentage # if areaNum == 1: # return {"area1" : timeslot_area_percentage} # TODO: convert to django friendly using below?? # elif areaNum == 2: # return {"area2" : timeslot_area_percentage} # elif areaNum == 3: # return {"area3" : timeslot_area_percentage} #convert site percents to django friendly JSON # site1_percentage_json = json.dumps(dict(site1_percentage), cls=DjangoJSONEncoder) # site2_percentage_json = json.dumps(dict(site2_percentage), cls=DjangoJSONEncoder) # takes a url as a string and returns a SET of TUPLES of all of the words # that are used on that webpage in order of frequency
def get(self, request, nnid): """ Common Network Info Get Method --- # Class Name : CommonNNInfoList # Description: Structure : <nninfo> - version - batch version Search deinfed list of neural networks """ try: condition = {} condition['nn_id'] = nnid if str(nnid).lower() == 'all': condition['nn_id'] = '%' elif str(nnid).lower() == 'seq': condition['nn_id'] = 'seq' return_data = NNCommonManager().get_nn_info(condition) logging.info(return_data) return Response(json.dumps(return_data, cls=DjangoJSONEncoder)) except Exception as e: return_data = {"status": "404", "result": str(e)} return Response(json.dumps(return_data, cls=DjangoJSONEncoder))
def test_serialize_and_deserialize_1(): transcoder = UnifiedTranscoder(json_encoder_cls=DjangoJSONEncoder) # test serialize created = SampleAggregate.Created(entity_id='b089a0a6-e0b3-480d-9382-c47f99103b3d', attr1='val1', attr2='val2', metadata={'command_id': 123}) created_stored_event = transcoder.serialize(created) assert created_stored_event.event_type == 'sample_aggregate_created' assert created_stored_event.event_version == 1 assert created_stored_event.event_data == '{"attr1":"val1","attr2":"val2"}' assert created_stored_event.aggregate_id == 'b089a0a6-e0b3-480d-9382-c47f99103b3d' assert created_stored_event.aggregate_version == 0 assert created_stored_event.aggregate_type == 'SampleAggregate' assert created_stored_event.module_name == 'djangoevents.tests.test_unifiedtranscoder' assert created_stored_event.class_name == 'SampleAggregate.Created' assert created_stored_event.metadata == '{"command_id":123}' # test deserialize created_copy = transcoder.deserialize(created_stored_event) assert 'metadata' not in created_copy.__dict__ created.__dict__.pop('metadata') # metadata is not included in deserialization assert created.__dict__ == created_copy.__dict__
def test_serialize_and_deserialize_2(): transcoder = UnifiedTranscoder(json_encoder_cls=DjangoJSONEncoder) # test serialize updated = SampleAggregate.Updated(entity_id='b089a0a6-e0b3-480d-9382-c47f99103b3d', entity_version=10, attr1='val1', attr2='val2', metadata={'command_id': 123}) updated_stored_event = transcoder.serialize(updated) assert updated_stored_event.event_type == 'overridden_event_type' assert updated_stored_event.event_version == 1 assert updated_stored_event.event_data == '{"attr1":"val1","attr2":"val2"}' assert updated_stored_event.aggregate_id == 'b089a0a6-e0b3-480d-9382-c47f99103b3d' assert updated_stored_event.aggregate_version == 10 assert updated_stored_event.aggregate_type == 'SampleAggregate' assert updated_stored_event.module_name == 'djangoevents.tests.test_unifiedtranscoder' assert updated_stored_event.class_name == 'SampleAggregate.Updated' assert updated_stored_event.metadata == '{"command_id":123}' # test deserialize updated_copy = transcoder.deserialize(updated_stored_event) assert 'metadata' not in updated_copy.__dict__ updated.__dict__.pop('metadata') # metadata is not included in deserialization assert updated.__dict__ == updated_copy.__dict__
def test_transcoder_passes_all_attributes_to_event_constructor(): attributes = { 'entity_id': 'b089a0a6-e0b3-480d-9382-c47f99103b3d', 'foo': 0, 'bar': 1, } event = SampleAggregate.Created(**attributes) transcoder = UnifiedTranscoder(json_encoder_cls=DjangoJSONEncoder) serialized_event = transcoder.serialize(event) init_call_args = None def init(self, *args, **kwargs): nonlocal init_call_args init_call_args = (args, kwargs) with mock.patch.object(SampleAggregate.Created, '__init__', init): transcoder.deserialize(serialized_event) args, kwargs = init_call_args assert args == tuple() for key, value in attributes.items(): assert key in kwargs assert kwargs[key] == value
def set_service(self, service_name, service_config): try: service = Service.objects.get(name=service_name) service.json = json.dumps(service_config.config) except Service.DoesNotExist: service = Service( name=service_name, json=json.dumps(service_config.config, cls=DjangoJSONEncoder), ) service.save()
def _init_scratch_org(self, org_config): if not org_config.scratch: # Only run against scratch orgs return # Set up the logger to output to the build.log field init_logger(self.build) # Create the scratch org and get its info info = org_config.scratch_info # Get the contents of the org's json file from Salesforce DX json_path = os.path.join(os.path.expanduser('~'), '.sfdx', info['username'] + '.json') if not os.path.isfile(json_path): json_path = os.path.join(os.path.expanduser('~'), '.local', '.sfdx', info['username'] + '.json') with open(json_path, 'r') as json_file: dx_json = json_file.read() org_json = json.dumps(org_config.config, cls=DjangoJSONEncoder) # Create a ScratchOrgInstance to store the org info instance = ScratchOrgInstance( org=org_config.org, build=self.build, sf_org_id=info['org_id'], username=info['username'], json_dx=dx_json, json=org_json, ) instance.save() org_config.org_instance = instance return org_config
def set_org(self, org_config): org_json = json.dumps(org_config.config, cls=DjangoJSONEncoder) try: org = Org.objects.get(repo=self.build.repo, name=org_config.name) org.json = org_json except Org.DoesNotExist: org = Org( name=org_config.name, json=org_json, repo=self.build.repo, ) org.scratch = isinstance(org_config, ScratchOrgConfig) if not org.scratch: org.save()
def __init__(self, height=None, width=None, html_id=None, json_encoder_class=DjangoJSONEncoder): self.height = height self.width = width self.json_encoder_class = json_encoder_class self.html_id = html_id if ((height is not None or width is not None) and (self.responsive is None or self.responsive)): raise ImproperlyConfigured( 'Using the height/width parameter will have no ' 'effect if the chart is in responsive mode. ' 'Disable responsive mode by setting chart.responsive ' 'to False')
def __init__(self, data, encoder=DjangoJSONEncoder, safe=True, json_dumps_params=None, **kwargs): if safe and not isinstance(data, dict): raise TypeError('In order to allow non-dict objects to be ' 'serialized set the safe parameter to False') if json_dumps_params is None: json_dumps_params = {} kwargs.setdefault('content_type', 'application/json') data = json.dumps(data, cls=encoder, **json_dumps_params) super(JsonResponse, self).__init__(content=data, **kwargs)
def api_detail(self, request, *args, **kwargs): obj = self.get_object(request, object_id=kwargs.get("pk")) ModelForm = self.get_form(request, obj=obj) form = ModelForm(instance=obj) data = self.obj_as_dict(request, form.instance) return HttpResponse(json.dumps(data, cls=DjangoJSONEncoder), content_type='application/json')
def api_create(self, request, *args, **kwargs): data = json.loads(request.body) ModelForm = self.get_form(request, obj=None) form = ModelForm(data=data, files=request.FILES) if form.is_valid(): obj = form.save() data = self.obj_as_dict(request, obj) return HttpResponse(json.dumps(data, cls=DjangoJSONEncoder), content_type="application/json") else: errors = { "errors": json.loads(form.errors.as_json()) } return HttpResponse(json.dumps(errors), status=400, content_type="application/json")
def niftyChannelDataPush(): nifty_data = niftyData() Group('nifty-channel').send( { 'text': json.dumps(nifty_data,cls=DjangoJSONEncoder) }) #================ Leaderboard channel =======================# # @http_session_user # @isLoggedInCh # def connect_to_leaderboard_channel(message): # Group('leaderboard-channel').add(message.reply_channel) # message.reply_channel.send({ # 'text' : json.dumps({"accept": True}) #{ "close" : True } # }) # print("New leaderboard listener added!") # def disconnect_from_leaderboard_channel(message): # Group('leaderboard-channel').discard(message.reply_channel) # def leaderboardChannelDataPush(): # leaderboard_data = leaderboardData() # Group('leaderboard-channel').send( # { # 'text': json.dumps(leaderboard_data,cls=DjangoJSONEncoder) # }) #=============== Graph Channel =======================#
def graphDataPush(): graphData = graph('NIFTY 50') Group('NIFTY-50').send({ 'text': json.dumps(graphData,cls=DjangoJSONEncoder), }) #==================== portfolio channel ========================#
def sellDataPush(): for userid in redis_conn.hkeys("online-sellers"): userid = userid.decode("utf-8") try: sellData = sell_data(userid) Channel( redis_conn.hget("online-sellers",userid) ).send( { 'text' : json.dumps(sellData,cls=DjangoJSONEncoder) }) except: print("sellDataPush failed!") #================ Ticker Channel =======================#
def tickerDataPush(): tickerData = ticker_data() Group('Ticker').send({ 'text': json.dumps(tickerData,cls=DjangoJSONEncoder), })
def kryptos_leader_channel_push(data): Group('kryptos-leader-channel').send( { 'text': json.dumps(data,cls=DjangoJSONEncoder) })
def echo_leader_channel_push(data): Group('echo-leader-channel').send( { 'text': json.dumps(data,cls=DjangoJSONEncoder) })
def userDataPush(userid,data): Channel( redis_conn.hget("online-users",userid) ).send( { 'text' : json.dumps(sellData,cls=DjangoJSONEncoder) } )
def hashinclude_channel_push(data): Group('hashinclude-channel').send( { 'text': json.dumps(data,cls=DjangoJSONEncoder) })
def test_no_pond_blocks_no_state_changed(self, modify_mock): pond_blocks = [] now = timezone.now() mixer.cycle(3).blend(Window, request=(r for r in self.requests), start=now - timedelta(days=2), end=now + timedelta(days=1)) responses.add(responses.GET, settings.POND_URL + '/pond/pond/blocks/new/', body=json.dumps(pond_blocks, cls=DjangoJSONEncoder), status=200, content_type='application/json') one_week_ahead = timezone.now() + timedelta(weeks=1) response = self.client.get(reverse('api:isDirty') + '?last_query_time=' + parse.quote(one_week_ahead.isoformat())) response_json = response.json() self.assertFalse(response_json['isDirty'])
def test_pond_blocks_no_state_changed(self, modify_mock): now = timezone.now() mixer.cycle(3).blend(Window, request=(r for r in self.requests), start=now - timedelta(days=2), end=now + timedelta(days=1)) molecules1 = basic_mixer.cycle(3).blend(PondMolecule, completed=False, failed=False, request_num=self.requests[0].id, tracking_num=self.ur.id, event=[]) molecules2 = basic_mixer.cycle(3).blend(PondMolecule, completed=False, failed=False, request_num=self.requests[1].id, tracking_num=self.ur.id, event=[]) molecules3 = basic_mixer.cycle(3).blend(PondMolecule, completed=False, failed=False, request_num=self.requests[2].id, tracking_num=self.ur.id, event=[]) pond_blocks = basic_mixer.cycle(3).blend(PondBlock, molecules=(m for m in [molecules1, molecules2, molecules3]), start=now + timedelta(minutes=30), end=now + timedelta(minutes=40)) pond_blocks = [pb._to_dict() for pb in pond_blocks] responses.add(responses.GET, settings.POND_URL + '/pond/pond/blocks/new/', body=json.dumps(pond_blocks, cls=DjangoJSONEncoder), status=200, content_type='application/json') one_week_ahead = timezone.now() + timedelta(weeks=1) response = self.client.get(reverse('api:isDirty') + '?last_query_time=' + parse.quote(one_week_ahead.isoformat())) response_json = response.json() self.assertFalse(response_json['isDirty']) for i, req in enumerate(self.requests): req.refresh_from_db() self.assertEqual(req.state, 'PENDING') self.ur.refresh_from_db() self.assertEqual(self.ur.state, 'PENDING')
def test_pond_blocks_state_change_completed(self, modify_mock): now = timezone.now() mixer.cycle(3).blend(Window, request=(r for r in self.requests), start=now - timedelta(days=2), end=now - timedelta(days=1)) molecules1 = basic_mixer.cycle(3).blend(PondMolecule, completed=True, failed=False, request_num=self.requests[0].id, tracking_num=self.ur.id, event=[]) molecules2 = basic_mixer.cycle(3).blend(PondMolecule, completed=False, failed=False, request_num=self.requests[1].id, tracking_num=self.ur.id, event=[]) molecules3 = basic_mixer.cycle(3).blend(PondMolecule, completed=False, failed=False, request_num=self.requests[2].id, tracking_num=self.ur.id, event=[]) pond_blocks = basic_mixer.cycle(3).blend(PondBlock, molecules=(m for m in [molecules1, molecules2, molecules3]), start=now - timedelta(minutes=30), end=now - timedelta(minutes=20)) pond_blocks = [pb._to_dict() for pb in pond_blocks] responses.add(responses.GET, settings.POND_URL + '/pond/pond/blocks/new/', body=json.dumps(pond_blocks, cls=DjangoJSONEncoder), status=200, content_type='application/json') response = self.client.get(reverse('api:isDirty')) response_json = response.json() self.assertTrue(response_json['isDirty']) request_states = ['COMPLETED', 'WINDOW_EXPIRED', 'WINDOW_EXPIRED'] for i, req in enumerate(self.requests): req.refresh_from_db() self.assertEqual(req.state, request_states[i]) self.ur.refresh_from_db() self.assertEqual(self.ur.state, 'COMPLETED')
def get_prep_value(self, value): return json.dumps(value, cls=DjangoJSONEncoder)
def sse_encode_event(event_type, data, event_id=None, escape=False): data_str = json.dumps(data, cls=DjangoJSONEncoder) if escape: event_type = build_id_escape(event_type) data_str = build_id_escape(data_str) out = 'event: %s\n' % event_type if event_id: out += 'id: %s\n' % event_id out += 'data: %s\n\n' % data_str return out
def get_filter_info(self, request, **kwargs): self.setup_filters(**kwargs) search = request.GET.get("search", None) if search: self.apply_search(search) name = request.GET.get("name", None) table_filter = self.filter_map.get_filter(name) return json.dumps(table_filter.to_json(self.queryset), indent=2, cls=DjangoJSONEncoder)
def get(self, request, *args, **kwargs): def response(data): return HttpResponse(json.dumps(data, indent=2, cls=DjangoJSONEncoder), content_type="application/json") error = "ok" search_term = request.GET.get("search", None) if search_term is None: # We got no search value so return empty reponse return response({'error': error, 'results': []}) try: prj = Project.objects.get(pk=kwargs['pid']) except KeyError: prj = None results = self.apply_search(search_term, prj, request)[:ToasterTypeAhead.MAX_RESULTS] if len(results) > 0: try: self.validate_fields(results[0]) except self.MissingFieldsException as e: error = e data = {'results': results, 'error': error} return response(data)
def __init__(self, content, status=200, *args, **kwargs): super(UploadResponse, self).__init__( content=DjangoJSONEncoder().encode(content), content_type='application/json', status=status, *args, **kwargs )
def process_form_submission(self, form): self.get_submission_class().objects.create( form_data=json.dumps(form.cleaned_data, cls=DjangoJSONEncoder), page=self, user=form.user )
def process_form_submission(self, form): """ Accepts form instance with submitted data, user and page. Creates submission instance. You can override this method if you want to have custom creation logic. For example, if you want to save reference to a user. """ self.get_submission_class().objects.create( form_data=json.dumps(form.cleaned_data, cls=DjangoJSONEncoder), page=self, )
def serialized_facility_factory(identifier): facility = Facility(name="Facility {}".format(identifier), id=identifier) return DjangoJSONEncoder().encode(facility.serialize())
def __get__(self, instance, owner): from restframeworkclient.models import Model def callable(**kwargs): def get_value(value): if isinstance(value, dict): return json.dumps(value, cls=DjangoJSONEncoder) if value in (datetime.datetime.now, timezone.now): return value() if isinstance(value, Model): return value.pk return value kwargs = {k: get_value(v) for k, v in kwargs.items()} arg_name = 'data' if self.method == 'POST' else 'params' if self.static: url = owner._resources_url() + self.subresource + '/' data = owner._rest_call(url, method=self.method, **{arg_name: kwargs}) else: url = instance._resource_url(instance.pk) + self.subresource + '/' data = instance._rest_call(url, method=self.method, **{arg_name: kwargs}) if self.unwrapping_key: data = data[self.unwrapping_key] if self.model: obj = self.model(**data) obj._persisted = True return obj return data return callable() if self.as_property else callable
def kolibri_set_server_time(): html = ("<script type='text/javascript'>" "{0}.utils.serverClock.setServerTime({1});" "</script>".format(settings.KOLIBRI_CORE_JS_NAME, json.dumps(now(), cls=DjangoJSONEncoder))) return mark_safe(html)
def _store_form_kwargs(self, form): session_data = self.request.session.setdefault(self.wizard_name, {}) # Adjust kwargs to avoid trying to save the instances form_data = form.cleaned_data.copy() for prop in ('range', 'benefit', 'condition', 'offer_group'): obj = form_data.get(prop, None) if obj is not None: form_data[prop] = obj.id form_kwargs = {'data': form_data} json_data = json.dumps(form_kwargs, cls=DjangoJSONEncoder) session_data[self._key()] = json_data self.request.session.save()
def __init__(self, data, encoder=DjangoJSONEncoder, safe=True, **kwargs): if safe and not isinstance(data, dict): raise TypeError('In order to allow non-dict objects to be ' 'serialized set the safe parameter to False') kwargs.setdefault('content_type', 'application/json') data = json.dumps(data, cls=encoder) super(JsonResponse, self).__init__(content=data, **kwargs)