我们从Python开源项目中，提取了以下50个代码示例，用于说明如何使用django.forms.models.model_to_dict()。
def to_dict(self):
    """Build a plain dictionary representation of this model instance.

    The dict produced by ``model_to_dict`` is passed through the instance
    serializer (``to_json`` followed by ``from_json``), tagged with the
    document type and id, stripped of two bookkeeping keys and finally each
    model field is post-processed according to its field class.

    Returns:
        dict: the resulting document.
    """
    serializer = self._serializer
    # Round-trip through the serializer (presumably to coerce values into
    # JSON-native types -- confirm against the serializer implementation).
    doc = serializer.from_json(serializer.to_json(model_to_dict(self)))
    doc[DOC_TYPE_FIELD_NAME] = self.get_doc_type()
    doc['id'] = self.get_id()

    for unwanted_key in ('cbnosync_ptr', 'csrfmiddlewaretoken'):
        doc.pop(unwanted_key, None)

    for model_field in self._meta.fields:
        field_name = model_field.name
        if isinstance(model_field, DateTimeField):
            doc[field_name] = self._string_from_date(field_name)
        if isinstance(model_field, ListField):
            item_field = model_field.item_field
            if isinstance(item_field, EmbeddedModelField):
                self.to_dict_nested_list(field_name, doc)
            if isinstance(item_field, ModelReferenceField):
                self.to_dict_reference_list(field_name, doc)
        if isinstance(model_field, EmbeddedModelField):
            self.to_dict_nested(field_name, doc)
        if isinstance(model_field, ModelReferenceField):
            self.to_dict_reference(field_name, doc)
    return doc
def create(body):
    """Create a new tag from the request body and return it as a dict.

    ``body`` is modified in place: ``id`` is dropped and ``codename`` is
    derived from ``name``.

    Raises:
        BadRequest: if the tag type is not in ``TAG_TYPE``, a tag with the
            same name (case-insensitive) already exists, or the body holds
            invalid fields.
    """
    try:
        body.pop('id', None)
        if body.get('tagType') not in TAG_TYPE:
            raise BadRequest('Invalid or missing tag type')

        known_names = {
            name.lower()
            for name in Tag.objects.all().values_list('name', flat=True)
        }
        requested = body['name'].lower().strip()
        if requested in known_names:
            raise BadRequest('Tag already exists')

        body['codename'] = body['name'].lower().replace(' ', '_')
        tag, _created = Tag.objects.get_or_create(**body)
    except (AttributeError, KeyError, FieldError, IntegrityError, ValueError):
        raise BadRequest('Invalid fields in body')

    return model_to_dict(tag)
def update(tag_id, body):
    """Update an existing tag and return its refreshed dict representation.

    Raises:
        NotFound: if no tag matches ``tag_id``.
        BadRequest: if another tag already uses the requested name
            (case-insensitive) or the body holds invalid fields.
    """
    try:
        tag = Tag.objects.get(id=tag_id)
    except (ObjectDoesNotExist, ValueError):
        raise NotFound('Tag not found')

    try:
        body.pop('id', None)
        other_names = Tag.objects.exclude(id=tag.id).values_list('name', flat=True)
        taken = {name.lower() for name in other_names}
        if body['name'].lower().strip() in taken:
            raise BadRequest('Tag already exists')

        Tag.objects.filter(pk=tag.pk).update(**body)
        # Re-read the row so the returned dict reflects the stored values.
        tag = Tag.objects.get(pk=tag.pk)
    except (AttributeError, KeyError, FieldError, IntegrityError, ValueError, TypeError):
        raise BadRequest('Invalid fields in body')

    return model_to_dict(tag)
def _format_ticket_response(tickets, user):
    """Expand foreign keys of each ticket dict in place.

    Every dict in ``tickets`` gets its ``defendant``, ``service``,
    ``treatedBy`` and ``tags`` entries replaced by flattened data when they
    are set, plus ``commentsCount`` and ``starredByMe`` entries. Nothing is
    returned; the dicts are mutated.
    """
    for entry in tickets:
        ticket_id = entry['id']

        defendant_id = entry.get('defendant')
        if defendant_id:
            defendant = Defendant.objects.get(id=defendant_id)
            entry['defendant'] = flat_defendant = model_to_dict(defendant)
            flat_defendant['email'] = defendant.details.email

        service_id = entry.get('service')
        if service_id:
            entry['service'] = model_to_dict(Service.objects.get(id=service_id))

        operator_id = entry.get('treatedBy')
        if operator_id:
            entry['treatedBy'] = User.objects.get(id=operator_id).username

        if entry.get('tags'):
            ticket_tags = Ticket.objects.get(id=ticket_id).tags.all()
            entry['tags'] = [model_to_dict(tag) for tag in ticket_tags]

        entry['commentsCount'] = TicketComment.objects.filter(ticket=ticket_id).count()
        entry['starredByMe'] = StarredTicket.objects.filter(
            ticket_id=ticket_id,
            user=user,
        ).exists()
def get_ticket_attachments(ticket_id):
    """Return the de-duplicated attachments of a ticket as a list of dicts.

    Attachments linked to the ticket's reports and attachments linked to
    the ticket itself are merged.

    Raises:
        NotFound: if no ticket matches ``ticket_id``.
    """
    try:
        ticket = Ticket.objects.get(id=ticket_id)
    except (IndexError, ObjectDoesNotExist, ValueError):
        raise NotFound('Ticket not found')

    report_ids = ticket.reportTicket.all().values_list('id', flat=True).distinct()
    from_reports = AttachedDocument.objects.filter(report__id__in=report_ids).distinct()

    merged = list(from_reports) + list(ticket.attachments.all())
    # A set removes documents reachable through both paths.
    unique_documents = set(merged)
    return [model_to_dict(document) for document in unique_documents]
def get_jobs_status(ticket_id):
    """Return the ticket's jobs, ordered by ``creationDate``, as dicts.

    When a job dict carries an ``action`` id it is replaced by the dict of
    the matching ``ServiceAction``.

    Raises:
        NotFound: if no ticket matches ``ticket_id``.
    """
    try:
        ticket = Ticket.objects.get(id=ticket_id)
    except (ObjectDoesNotExist, ValueError):
        raise NotFound('Ticket not found')

    def describe(job):
        # Flatten one job, expanding its action foreign key when set.
        details = model_to_dict(job)
        action_id = details.get('action')
        if action_id:
            details['action'] = model_to_dict(ServiceAction.objects.get(id=action_id))
        return details

    return [describe(job) for job in ticket.jobs.all().order_by('creationDate')]
def update(news_id, body, user):
    """Update a news entry and return its refreshed dict representation.

    Superusers may update any entry; other users only entries they
    authored. The ``author``, ``date`` and ``tags`` keys of ``body`` are
    ignored.

    Raises:
        NotFound: if no matching news entry exists for this user.
        BadRequest: if the body holds invalid fields.
    """
    try:
        if user.is_superuser:
            news = News.objects.get(id=news_id)
        else:
            news = News.objects.get(id=news_id, author__id=user.id)
    except (ObjectDoesNotExist, ValueError):
        # The exception must be raised: returning the instance would hand
        # an exception object back to the caller as if it were the result.
        raise NotFound('News not found')

    try:
        body = {k: v for k, v in body.iteritems() if k not in ['author', 'date', 'tags']}
        News.objects.filter(pk=news.pk).update(**body)
        # Re-read the row so the returned dict reflects the stored values.
        news = News.objects.get(pk=news.pk)
    except (KeyError, FieldError, IntegrityError):
        raise BadRequest('Invalid fields in body')

    return model_to_dict(news)
def index(**kwargs):
    """Return every mail template matching the optional filters, by name.

    ``kwargs['filters']``, when given, is a doubly URL-quoted JSON string
    that is decoded and turned into a query condition by
    ``generate_request_filter``.

    Raises:
        BadRequest: if the filters cannot be decoded or used in a query.
    """
    query_errors = (
        AttributeError, KeyError, IndexError, FieldError,
        SyntaxError, TypeError, ValueError,
    )

    filters = {}
    raw_filters = kwargs.get('filters')
    if raw_filters:
        try:
            filters = json.loads(unquote(unquote(kwargs['filters'])))
        except (ValueError, SyntaxError, TypeError) as ex:
            raise BadRequest(str(ex.message))

    # Both steps report failures identically, so they share one handler.
    try:
        where = generate_request_filter(filters)
        templates = MailTemplate.objects.filter(where).order_by('name')
    except query_errors as ex:
        raise BadRequest(str(ex.message))

    return [model_to_dict(template) for template in templates]
def create(body):
    """Create a provider from the request body and return it as a dict.

    ``defaultCategory`` is given as a category name and resolved to a
    ``Category``; the remaining keys are restricted to ``PROVIDER_FIELDS``.
    ``body`` has its ``defaultCategory`` key removed in place.

    Raises:
        BadRequest: if ``email`` is missing, a provider with that email
            already exists, the category is unknown or the fields are
            invalid.
    """
    if 'email' not in body:
        raise BadRequest('Email field required')
    # A single existing row is already a duplicate; the previous
    # ``len(...) > 1`` test only fired once two rows were present.
    if Provider.objects.filter(email=body['email']).exists():
        raise BadRequest('Provider already exists')

    try:
        cat = None
        if body.get('defaultCategory'):
            cat = Category.objects.get(name=body['defaultCategory'])
        # Always drop the raw name so it cannot clash with the keyword below.
        body.pop('defaultCategory', None)
        body = {k: v for k, v in body.iteritems() if k in PROVIDER_FIELDS}
        provider = Provider.objects.create(defaultCategory=cat, **body)
        return model_to_dict(provider)
    except (FieldError, IntegrityError, ObjectDoesNotExist) as ex:
        raise BadRequest(str(ex.message))
def update(prov, body):
    """Update the provider identified by email ``prov`` and return it as a dict.

    Only keys listed in ``PROVIDER_FIELDS`` are applied. ``defaultCategory``
    is resolved from a category name and is set to ``None`` when no name is
    supplied.

    Raises:
        NotFound: if no provider has that email.
        BadRequest: if the category is unknown or the fields are invalid.
    """
    try:
        provider = Provider.objects.get(email=prov)
    except (ObjectDoesNotExist, ValueError):
        raise NotFound('Provider does not exist')

    try:
        changes = dict(
            (key, value) for key, value in body.iteritems() if key in PROVIDER_FIELDS
        )
        category_name = changes.pop('defaultCategory', None)
        category = Category.objects.get(name=category_name) if category_name else None
        Provider.objects.filter(pk=provider.pk).update(defaultCategory=category, **changes)
        # Re-read the row so the returned dict reflects the stored values.
        provider = Provider.objects.get(pk=provider.pk)
    except (FieldError, IntegrityError, ObjectDoesNotExist) as ex:
        raise BadRequest(str(ex.message))

    return model_to_dict(provider)
def add_tag(provider_email, body):
    """Attach the tag described by ``body`` to a provider.

    ``body`` is used as lookup keyword arguments for the tag. The tag's
    ``tagType`` must equal the provider's class name.

    Returns:
        dict: the provider after the tag was added.

    Raises:
        BadRequest: if the tag type does not match the provider.
        NotFound: if the provider or the tag cannot be looked up.
    """
    try:
        tag = Tag.objects.get(**body)
        provider = Provider.objects.get(email=provider_email)
        type_matches = provider.__class__.__name__ == tag.tagType
        if not type_matches:
            raise BadRequest('Invalid tag for provider')
        provider.tags.add(tag)
        provider.save()
    except (KeyError, FieldError, IntegrityError, ObjectDoesNotExist, ValueError):
        raise NotFound('Provider or tag not found')

    return model_to_dict(provider)
def remove_tag(provider_email, tag_id):
    """Detach the tag ``tag_id`` from a provider.

    The tag's ``tagType`` must equal the provider's class name.

    Returns:
        dict: the provider after the tag was removed.

    Raises:
        BadRequest: if the tag type does not match the provider.
        NotFound: if the provider or the tag cannot be looked up.
    """
    try:
        tag = Tag.objects.get(id=tag_id)
        provider = Provider.objects.get(email=provider_email)
        type_matches = provider.__class__.__name__ == tag.tagType
        if not type_matches:
            raise BadRequest('Invalid tag for provider')
        provider.tags.remove(tag)
        provider.save()
    except (ObjectDoesNotExist, FieldError, IntegrityError, ValueError):
        raise NotFound('Provider or tag not found')

    return model_to_dict(provider)
def __format_report_response(reports):
    """Expand foreign keys of each report dict in place.

    ``defendant``, ``plaintiff``, ``service``, ``provider`` and ``tags``
    entries are replaced by flattened data when they are set. Nothing is
    returned; the dicts are mutated.
    """
    for report in reports:
        defendant_id = report.get('defendant')
        if defendant_id:
            defendant = Defendant.objects.get(id=defendant_id)
            report['defendant'] = flat_defendant = model_to_dict(defendant)
            flat_defendant['email'] = defendant.details.email

        plaintiff_id = report.get('plaintiff')
        if plaintiff_id:
            report['plaintiff'] = model_to_dict(Plaintiff.objects.get(id=plaintiff_id))

        service_id = report.get('service')
        if service_id:
            report['service'] = model_to_dict(Service.objects.get(id=service_id))

        provider_key = report.get('provider')
        if provider_key:
            report['provider'] = ProvidersController.show(provider_key)

        if report.get('tags'):
            report_tags = Report.objects.get(id=report['id']).tags.all()
            report['tags'] = [model_to_dict(tag) for tag in report_tags]
def post(self, request, *args, **kwargs):
    """Create a team and its weekly report from a JSON request body.

    The body is validated against ``team_schema``. Send times are truncated
    to the minute. If the survey send time is earlier than the current
    time, today's date is added to the recurrence exclusions.

    Returns:
        JsonResponse: ``{'team': ...}`` on success, ``{'error': ...}`` when
        validation fails or the team name is already taken.
    """
    def to_minute(raw):
        # Parse a date/time string and drop seconds and microseconds.
        return parser.parse(raw).replace(second=0, microsecond=0)

    creator = request.user
    payload = json.loads(request.body.decode("utf-8"))
    schema_check = Validator(team_schema)
    if not schema_check.validate(payload):
        return JsonResponse({'error': schema_check.errors})

    survey_send_time = to_minute(payload['send_time'])
    summary_send_time = to_minute(payload['summary_time'])
    weekly_rule = recurrence.Rule(recurrence.WEEKLY, byday=payload['days_of_week'])

    current = datetime.datetime.now()
    if survey_send_time < current:
        excluded = [current.replace(hour=0, minute=0, second=0, microsecond=0)]
    else:
        excluded = []
    schedule = recurrence.Recurrence(rrules=[weekly_rule], exdates=excluded)

    try:
        team = Team.objects.create(admin=creator, name=payload['name'])
    except IntegrityError:
        return JsonResponse({'error': {"name": _("team with this name already exists")}})

    Report.objects.create(
        team=team,
        recurrences=schedule,
        survey_send_time=survey_send_time,
        summary_send_time=summary_send_time,
    )

    team_payload = model_to_dict(team, exclude=['users'])
    team_payload['report'] = self.report_dict(team)
    return JsonResponse({'team': team_payload})
def receive_product_viewed(sender, product, user, request, response, **kwargs):
    """Signal receiver: dispatch hook tasks registered for hook event ``1``.

    Does nothing when ``filer_hookevent`` returns a falsy result for the
    product. Otherwise the hook tasks receive the user's name (plus email
    when the user object has one) and the product as a dict.
    """
    hooks = filer_hookevent(product, 1)
    if not hooks:
        return

    user_info = {"user": user.username}
    if hasattr(user, 'email'):
        user_info["email"] = user.email

    payload = {
        "user": user_info,
        "product": model_to_dict(product),
    }
    run_hook_tasks_job(hooks, payload)
def receive_basket_addition(sender, request, product, user, **kwargs):
    """Signal receiver: dispatch hook tasks registered for hook event ``4``.

    Does nothing when ``filer_hookevent`` returns a falsy result for the
    product. Otherwise the hook tasks receive the user's name and email and
    the product as a dict.
    """
    hooks = filer_hookevent(product, 4)
    if not hooks:
        return

    payload = {
        "user": {"user": user.username, "email": user.email},
        "product": model_to_dict(product),
    }
    run_hook_tasks_job(hooks, payload)
def receive_order_placed(sender, order, user, **kwargs):
    """Signal receiver: dispatch hook tasks (hook event ``9``) per basket line.

    For every line of the order's basket whose product has hooks, the hook
    tasks receive the user, the order number, the product as a dict and the
    line's prices and quantity.
    """
    for basket_line in order.basket.lines.all():
        item = basket_line.product
        hooks = filer_hookevent(item, 9)
        if not hooks:
            continue

        payload = {
            "user": {"user": user.username, "email": user.email},
            "order": order.number,
            "product": model_to_dict(item),
            "price_excl_tax": basket_line.price_excl_tax,
            "price_incl_tax": basket_line.price_incl_tax,
            "quantity": basket_line.quantity,
        }
        run_hook_tasks_job(hooks, payload)
def get_submission_formsets_initial(instance):
    """Return initial formset data for a submission form instance.

    Args:
        instance: the object whose related rows seed the formsets, or a
            falsy value (e.g. ``None``) when there is nothing to copy.

    Returns:
        dict: maps each formset name to a list of dicts (one per related
        row, ordered by ``id``, without primary keys). The
        ``investigatoremployee`` entries additionally carry an
        ``investigator_index`` giving the position of their investigator.
        Every list is empty when ``instance`` is falsy.
    """
    related_getters = [
        ('measure', lambda sf: sf.measures.filter(category='6.1')),
        ('routinemeasure', lambda sf: sf.measures.filter(category='6.2')),
        ('nontesteduseddrug', lambda sf: sf.nontesteduseddrug_set.all()),
        ('participatingcenternonsubject', lambda sf: sf.participatingcenternonsubject_set.all()),
        ('foreignparticipatingcenter', lambda sf: sf.foreignparticipatingcenter_set.all()),
        ('investigator', lambda sf: sf.investigators.all()),
    ]

    formsets = {}
    for name, get_related in related_getters:
        # Guard like the employee section below: without an instance there
        # are no related rows, and dereferencing it would raise.
        if instance:
            formsets[name] = [
                model_to_dict(obj, exclude=('id',))
                for obj in get_related(instance).order_by('id')
            ]
        else:
            formsets[name] = []

    employees = []
    if instance:
        for index, investigator in enumerate(instance.investigators.order_by('id')):
            for employee in investigator.employees.order_by('id'):
                employee_dict = model_to_dict(employee, exclude=('id', 'investigator'))
                # Link by position because the investigator ids are excluded.
                employee_dict['investigator_index'] = index
                employees.append(employee_dict)
    formsets['investigatoremployee'] = employees
    return formsets
def post(self, request):
    """Authenticate a user from a JSON body holding username and password.

    Returns:
        HttpResponse with status 200 and a ``role`` cookie carrying the
        user's ``is_superuser`` value when authentication succeeds;
        HttpResponseBadRequest with status 401 when the form is invalid or
        the credentials are rejected.
    """
    credentials = json.loads(request.body)
    if not LoginForm(credentials).is_valid():
        return HttpResponseBadRequest('Invalid input data', status=401)

    username = credentials.get('username', None)
    password = credentials.get('password', None)
    user = auth.authenticate(username=username, password=password)
    if not user:
        return HttpResponseBadRequest("Incorrect email or password", status=401)

    account = model_to_dict(User.objects.get(username=username))
    response = HttpResponse('is_supeuser', status=200)
    response.set_cookie('role', value=account['is_superuser'])
    auth.login(request, user)
    return response
def get(self, request, template_id=None):
    """Return XML templates as JSON.

    Args:
        request: the incoming request (unused).
        template_id: id of the template to return; when falsy, all
            templates are returned.

    Returns:
        HttpResponse: JSON list of all templates, or a JSON object for the
        single requested template.
    """
    if template_id:
        payload = model_to_dict(XmlTemplate.get_by_id(template_id))
    else:
        payload = [model_to_dict(template) for template in XmlTemplate.get_all()]
    return HttpResponse(json.dumps(payload))
def get(self, request, company_id):
    """Return a company and its users as JSON.

    Args:
        request: the incoming request; its user must be a superuser or
            belong to the requested company.
        company_id: id of the company to return (converted with ``int``).

    Returns:
        HttpResponse: JSON object with ``company`` and ``users`` keys, or
        HttpResponseBadRequest with 'Permission denied' when a
        non-superuser asks for a company other than their own.
    """
    company_id = int(company_id)
    user = request.user
    if (not user.is_superuser) and (company_id != user.adviseruser.id_company.id):
        return HttpResponseBadRequest("Permission denied")

    # Fetch once and reuse; the same company was previously looked up twice.
    company = Company.get_company(company_id)
    data = {
        "company": model_to_dict(company),
        "users": company.get_users(),
    }
    return HttpResponse(json.dumps(data))
def test_initialization(self):
    """Check that the fixture flow cell converts to the expected dict."""
    # NOTE: the literal previously listed 'status' twice. In a dict literal
    # the last occurrence wins, so only the effective value is kept here
    # and the assertion is unchanged.
    EXPECTED = {
        'id': self.flow_cell.pk,
        'demux_operator': None,
        'owner': self.user.pk,
        'description': 'Description',
        'sequencing_machine': self.machine.pk,
        'num_lanes': 8,
        'operator': 'John Doe',
        'is_paired': True,
        'index_read_count': 1,
        'rta_version': models.RTA_VERSION_V2,
        'read_length': 151,
        'label': 'LABEL',
        'run_date': datetime.date(2016, 3, 3),
        'run_number': 815,
        'slot': 'A',
        'status': 'seq_complete',
        'vendor_id': 'BCDEFGHIXX',
    }
    self.assertEqual(model_to_dict(self.flow_cell), EXPECTED)
def test_render(self):
    """Check that POSTing to the delete view removes the flow cell."""
    # Precondition: exactly one flow cell exists.
    self.assertEqual(FlowCell.objects.all().count(), 1)

    # Perform the POST as the logged-in user.
    delete_url = reverse('flowcell_delete', kwargs={'pk': self.flow_cell.pk})
    with self.login(self.user):
        response = self.client.post(delete_url)

    # The flow cell is gone from the database.
    self.assertEqual(FlowCell.objects.all().count(), 0)

    # The email hook was called once; compare the flow cell it received
    # with the fixture, ignoring primary keys.
    self.email_mock.assert_called_once_with(self.user, ANY)
    reported = model_to_dict(self.arg_flowcell)
    reported.pop('id')
    original = model_to_dict(self.flow_cell)
    original.pop('id')
    self.assertEqual(reported, original)

    # The response redirects to the list view.
    with self.login(self.user):
        self.assertRedirects(response, reverse('flowcell_list'))
def api_deploys(request):
    """Return the ten most recently created deploys as JSON.

    Each deploy row is enriched with ``host``, ``user`` and ``project``
    dicts looked up from its ``*_id`` columns; only a fixed subset of user
    fields is exposed.
    """
    latest = Deploy.objects.all().order_by('-created_at')[:10]
    user_fields = ["created_at", "email", "id", "username"]

    rows = []
    for row in latest.values():
        host = model_to_dict(Host.objects.get(id=row["host_id"]))
        project = model_to_dict(Project.objects.get(id=row["project_id"]))
        account = model_to_dict(User.objects.get(id=row["user_id"]), fields=user_fields)
        row["host"] = host
        row["user"] = account
        row["project"] = project
        rows.append(row)

    payload = {
        "rc": 0,
        "data": {"deploys": rows, "count": latest.count()},
    }
    return JsonResponse(payload)
def add_view(self, request, form_url='', extra_context=None):
    """Render the admin "add" view, optionally pre-filled from an existing row.

    When the query string carries ``source``, the object with that primary
    key is loaded (404 if missing), its field values are copied into a
    mutable copy of ``request.GET`` and file values are placed into
    ``request.FILES`` unless cleared or already uploaded. The request is
    then handed to the ``add_view`` found after ``ConfigurationModelAdmin``
    in the method resolution order.

    NOTE(review): statement nesting was reconstructed from a flattened
    source; confirm that ``get[field_name] = field_value`` belongs to the
    loop body rather than to the file-handling branch.
    """
    # Prepopulate new configuration entries with the value of the current config, if given:
    if 'source' in request.GET:
        get = request.GET.copy()
        source_id = int(get.pop('source')[0])
        source = get_object_or_404(self.model, pk=source_id)
        source_dict = models.model_to_dict(source)
        for field_name, field_value in source_dict.items():
            # read files into request.FILES, if:
            # * user hasn't ticked the "clear" checkbox
            # * user hasn't uploaded a new file
            if field_value and isinstance(field_value, File):
                clear_checkbox_name = '{0}-clear'.format(field_name)
                if request.POST.get(clear_checkbox_name) != 'on':
                    request.FILES.setdefault(field_name, field_value)
            get[field_name] = field_value
        request.GET = get
    # Call our grandparent's add_view, skipping the parent code
    # because the parent code has a different way to prepopulate new configuration entries
    # with the value of the latest config, which doesn't make sense for keyed models.
    # pylint: disable=bad-super-call
    return super(ConfigurationModelAdmin, self).add_view(request, form_url, extra_context)
def get_changeform_initial_data(self, request):
    """Return the initial values for the "create FOIA" admin form.

    The reporter defaults to the current user and the tags to the user's
    default project, when one is set. If the query string carries
    ``duplicatefoia``, selected attributes of that FOIA are copied in.
    """
    initial = super(FoiaAdmin, self).get_changeform_initial_data(request)
    initial['reporter'] = request.user.pk

    try:
        default_project = request.user.specialperson.default_project
        if default_project:
            initial['tags'] = [default_project]
    except SpecialPerson.DoesNotExist:
        pass

    if 'duplicatefoia' in request.GET:
        source_pk = request.GET['duplicatefoia']
        source = model_to_dict(Foia.objects.get(pk=source_pk))
        copied_keys = ('filed_date', 'is_state_foia', 'request_subject', 'request_notes')
        initial.update({key: source[key] for key in copied_keys})
        # Not used anywhere yet; kept so duplicates can be tracked later.
        initial["duplicate_of"] = source_pk

    return initial
def form_args(self, bundle):
    """Build the keyword arguments used to instantiate the form.

    The form data starts from the dict of ``bundle.obj`` (when the object
    has a ``pk`` attribute) and is then overlaid with ``bundle.data``. For
    ``ModelForm`` subclasses the object is also passed as ``instance``.

    Returns:
        dict: always contains a ``data`` dict, so the form is bound even
        when the bundle carries no data.
    """
    submitted = {} if bundle.data is None else bundle.data
    form_kwargs = {'data': {}}

    target = bundle.obj
    if hasattr(target, 'pk'):
        if issubclass(self.form_class, ModelForm):
            form_kwargs['instance'] = target
        form_kwargs['data'] = model_to_dict(target)

    form_kwargs['data'].update(submitted)
    return form_kwargs
def _copy(self, id):
    """Duplicate a ``BKPPic`` row together with its ``BKPTarget`` rows.

    The new picture is created from every field of the source except
    ``id`` and ``status``. Each target of the source is re-saved as a new
    row pointing at the copy.

    Returns:
        tuple: a ``('msg', <text>)`` pair.
    """
    from django.forms import model_to_dict

    source = BKPPic.objects.get(id=id)
    field_values = model_to_dict(source, exclude=['id', 'status'])
    duplicate = BKPPic.objects.create(**field_values)

    # copy targets
    for target in BKPTarget.objects.filter(bkp_pic_id=id).all():
        # Clearing the primary key makes save() insert a new row.
        target.id = None
        target.bkp_pic_id = duplicate.id
        target.save()

    return ('msg', '????')
def raw_data(self):
    """Return this object's fields plus ``card_code`` as a dict.

    The result is meant for use in payment and billing forms.
    """
    payload = model_to_dict(self)
    payload['card_code'] = self.card_code
    return payload
def model_to_dict(instance, **kwargs):
    """Wrap Django's ``model_to_dict`` with an optional ``all`` switch.

    Args:
        instance: the model instance to convert.
        **kwargs: forwarded to ``model_to_dict_django``. The extra keyword
            ``all`` is consumed here: when truthy, ``fields`` is set to the
            name of every entry in ``instance._meta.fields``.

    Returns:
        dict: whatever ``model_to_dict_django`` returns.
    """
    # Default to False so callers that omit ``all`` no longer hit KeyError.
    if kwargs.pop('all', False):
        kwargs['fields'] = [field.name for field in instance._meta.fields]
    return model_to_dict_django(instance, **kwargs)
def profile(request):
    """Render the profile page of the current user.

    The context holds the user's completed and skipped grammars (each
    sorted by ``updateTime``), a percentile, basic user information and
    JSON-encoded statistics for the pie charts.
    """
    current_user_id = request.user.id
    user_histories = UserHistory.objects.all().filter(user_id=current_user_id)
    context = {
        "skipped_grammars": [],
        "completed_grammars": [],
        "percentile": 0,
        "user_info": {},
    }
    available_score = 0
    earned_score = 0

    # Collect the statistics of every grammar in the user's history.
    for user_history in user_histories:
        grammar = Grammar.objects.get(pk=user_history.grammar_id)
        grammar_dict = model_to_dict(grammar, fields=["gid", "prods", "nonTerminals"])
        stats_dict = model_to_dict(user_history, fields=["complete", "score", "updateTime"])
        stats_dict.update(grammar_dict)

        nQuestions = (2 * len(stats_dict['nonTerminals']) + 2)
        # Skipped grammars count towards the available score as well.
        available_score += nQuestions
        if stats_dict["complete"]:
            stats_dict["grammar_avg"] = get_grammar_avg(user_history.grammar_id)
            stats_dict["total_score"] = nQuestions
            context["completed_grammars"].append(stats_dict)
            earned_score += stats_dict["score"]
        else:
            context["skipped_grammars"].append(stats_dict)

    # Sort both histories by time.
    context["completed_grammars"] = sorted(context["completed_grammars"], key=lambda k: k['updateTime'])
    context["skipped_grammars"] = sorted(context["skipped_grammars"], key=lambda k: k['updateTime'])

    # User information. The join date field is named ``date_joined``; the
    # previous spelling "data_joined" matched no field, so the value was
    # silently left out of the result.
    user_info = model_to_dict(
        User.objects.get(pk=current_user_id),
        ["first_name", "last_name", "date_joined", "email", "last_login"],
    )
    context["percentile"] = round(get_user_performance(current_user_id), 2)
    context["user_info"] = user_info

    # Pack up the numbers for the pie charts.
    chart_stats = {"correct": earned_score, "wrong": available_score - earned_score}
    chart_stats.update(get_complete_rate(current_user_id))
    context["chart_stats"] = json.dumps(chart_stats)

    return render(request, 'LL1_Academy/profile.html', context)
def shipping_address(self, address):
    """Store ``address`` as the shipping address.

    A dict copy of the address, with its country converted to text, is
    saved under ``storage['shipping_address']``; the object is flagged as
    modified and the address itself is kept on ``_shipping_address``.
    """
    serialized = model_to_dict(address)
    serialized['country'] = smart_text(serialized['country'])
    self.storage['shipping_address'] = serialized
    self.modified = True
    self._shipping_address = address
def billing_address(self, address):
    """Store ``address`` as the billing address.

    A dict copy of the address, with its country converted to text, is
    saved under ``storage['billing_address']`` and the object is flagged as
    modified.
    """
    serialized = model_to_dict(address)
    serialized['country'] = smart_text(serialized['country'])
    self.storage['billing_address'] = serialized
    self.modified = True
def as_data(self, address):
    """Return ``address`` as a dict suitable for passing as kwargs.

    The primary key and the associated user are left out. A ``Country``
    value is replaced by its ``code``.
    """
    fields = model_to_dict(address, exclude=['id', 'user'])
    country = fields['country']
    if isinstance(country, Country):
        fields['country'] = country.code
    return fields
def field_values(self):
    """Return this user's field values as a dict, without the password.

    The result can be encoded as e.g. JSON.
    """
    hidden_fields = ['password']
    values = model_to_dict(self, exclude=hidden_fields)
    return values
def as_dict(self):
    """Return this object's fields plus its related requests as a dict.

    ``as_dict`` is read from each request as an attribute (not called);
    presumably it is exposed as a property there -- TODO confirm.
    """
    result = model_to_dict(self)
    nested_requests = []
    for child in self.requests.all():
        nested_requests.append(child.as_dict)
    result['requests'] = nested_requests
    return result
def as_dict(self):
    """Return this object's fields plus its related objects as a dict.

    ``target``, ``location`` and ``constraints`` are embedded as single
    values, ``molecules`` and ``windows`` as lists. ``as_dict`` is read
    from each related object as an attribute (not called); presumably it
    is exposed as a property there -- TODO confirm.
    """
    result = model_to_dict(self)
    result['target'] = self.target.as_dict

    molecule_dicts = []
    for molecule in self.molecules.all():
        molecule_dicts.append(molecule.as_dict)
    result['molecules'] = molecule_dicts

    result['location'] = self.location.as_dict
    result['constraints'] = self.constraints.as_dict

    window_dicts = []
    for window in self.windows.all():
        window_dicts.append(window.as_dict)
    result['windows'] = window_dicts
    return result
def as_dict(self):
    """Return this object's fields as a dict."""
    result = model_to_dict(self)
    return result
def get_all():
    """Return every report threshold as a list of dicts."""
    thresholds = []
    for record in ReportThreshold.objects.all():
        thresholds.append(model_to_dict(record))
    return thresholds
def show(threshold_id):
    """Return the report threshold ``threshold_id`` as a dict.

    Raises:
        NotFound: if no threshold has that id.
        BadRequest: if the id is not a valid value.
    """
    try:
        record = ReportThreshold.objects.get(id=threshold_id)
        return model_to_dict(record)
    except ObjectDoesNotExist:
        raise NotFound('Threshold not found')
    except ValueError:
        raise BadRequest('Not a valid threshold id')
def create(body):
    """Create a report threshold from the request body and return it as a dict.

    ``body['category']`` is a category name; it is replaced in place by the
    matching ``Category`` before the threshold is created.

    Raises:
        BadRequest: if a threshold already exists (for this category or
            with these exact values), or fields are missing or invalid.
    """
    try:
        category = Category.objects.get(name=body['category'])
        already_defined = ReportThreshold.objects.filter(category=category).exists()
        if already_defined:
            raise BadRequest('Threshold already exists for this category')
        body['category'] = category
        threshold, created = ReportThreshold.objects.get_or_create(**body)
    except (KeyError, FieldError, IntegrityError, ObjectDoesNotExist):
        raise BadRequest('Missing or invalid fields in body')
    else:
        if not created:
            raise BadRequest('Threshold already exists')

    return model_to_dict(threshold)
def update(threshold_id, body):
    """Update a report threshold and return its refreshed dict representation.

    Only the ``threshold`` and ``interval`` keys of ``body`` are applied.

    Raises:
        NotFound: if no threshold has that id.
        BadRequest: if the fields are missing or invalid.
    """
    try:
        record = ReportThreshold.objects.get(id=threshold_id)
    except (ObjectDoesNotExist, ValueError):
        raise NotFound('Threshold not found')

    editable = ['threshold', 'interval']
    try:
        changes = dict(
            (key, value) for key, value in body.iteritems() if key in editable
        )
        ReportThreshold.objects.filter(pk=record.pk).update(**changes)
        # Re-read the row so the returned dict reflects the stored values.
        record = ReportThreshold.objects.get(pk=record.pk)
    except (KeyError, FieldError, IntegrityError):
        raise BadRequest('Missing or invalid fields in body')

    return model_to_dict(record)
def index(**kwargs):
    """Return all tags as dicts, optionally restricted by ``tagType``.

    When ``tagType`` is passed, only tags whose type matches it
    case-insensitively are returned.
    """
    condition = Q()
    if 'tagType' in kwargs:
        condition = condition & Q(tagType__iexact=kwargs['tagType'])

    matching = Tag.objects.filter(condition)
    return [model_to_dict(tag) for tag in matching]
def get_prefetch_preset(user, ticket_id, preset_id, lang=None):
    """Return a workflow preset with its templates prefetched for a ticket.

    The preset must be available to the role of ``user.operator``. The
    result is the preset as a dict, extended with ``templates`` (one
    prefetched template per distinct codename) and ``action`` (the action
    of the preset's config, with ``params`` when both are present).

    Raises:
        NotFound: if the ticket or the preset cannot be found.
    """
    action = params = None
    try:
        ticket = Ticket.objects.get(id=ticket_id)
        record = TicketWorkflowPreset.objects.get(id=preset_id, roles=user.operator.role)
        config = record.config
        if config:
            action = model_to_dict(config.action)
            if config.params:
                params = dict((p.codename, p.value) for p in config.params.all())
        preset = model_to_dict(record)
    except (ObjectDoesNotExist, ValueError):
        raise NotFound('Ticket or preset not found')

    codenames = MailTemplate.objects.filter(
        ticketworkflowpreset=preset['id']
    ).values_list('codename', flat=True)

    prefetched = preset['templates'] = []
    # A set drops repeated codenames before the per-template lookups.
    for codename in list(set(codenames)):
        template = MailTemplate.objects.get(codename=codename)
        prefetched.append(
            TemplatesController.get_prefetch_template(ticket.id, template.id, lang=lang)
        )

    preset['action'] = action
    if action and params:
        action['params'] = params
    return preset
def show(user, preset_id):
    """Return the workflow preset ``preset_id`` as a dict.

    The preset must be available to the role of ``user.operator``. The
    dict is extended with ``templates`` (dicts of the linked mail
    templates), ``action`` (the action of the preset's config, with
    ``params`` when both are present) and ``roles`` (distinct role
    codenames).

    Raises:
        NotFound: if the preset cannot be found.
    """
    try:
        record = TicketWorkflowPreset.objects.get(id=preset_id, roles=user.operator.role)
    except (IndexError, ObjectDoesNotExist, ValueError, TypeError):
        raise NotFound('Preset not found')

    action = params = None
    config = record.config
    if config:
        action = model_to_dict(config.action)
        if config.params:
            params = dict((p.codename, p.value) for p in config.params.all())

    preset = model_to_dict(record)
    linked_templates = MailTemplate.objects.filter(ticketworkflowpreset=preset['id'])
    preset['templates'] = [model_to_dict(template) for template in linked_templates]

    preset['action'] = action
    if action and params:
        action['params'] = params

    role_codenames = TicketWorkflowPreset.objects.get(
        id=preset['id']
    ).roles.all().values_list('codename', flat=True).distinct()
    preset['roles'] = list(role_codenames)
    return preset