我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 django.db.models.fields.DateTimeField()。
def to_dict(self):
    """Serialize this model instance into a JSON-safe dict.

    The dict is round-tripped through the serializer so every value is a
    JSON-compatible type, then augmented with the document type and id,
    and finally post-processed per field type (dates to strings, embedded
    models / references expanded via the ``to_dict_*`` helpers).
    """
    payload = model_to_dict(self)
    # Round-trip through the serializer to coerce values to JSON-safe types.
    payload = self._serializer.from_json(self._serializer.to_json(payload))
    payload[DOC_TYPE_FIELD_NAME] = self.get_doc_type()
    payload['id'] = self.get_id()
    # Internal bookkeeping keys must not leak into the stored document.
    payload.pop('cbnosync_ptr', None)
    payload.pop('csrfmiddlewaretoken', None)
    for field in self._meta.fields:
        name = field.name
        if isinstance(field, DateTimeField):
            payload[name] = self._string_from_date(name)
        if isinstance(field, ListField):
            if isinstance(field.item_field, EmbeddedModelField):
                self.to_dict_nested_list(name, payload)
            if isinstance(field.item_field, ModelReferenceField):
                self.to_dict_reference_list(name, payload)
        if isinstance(field, EmbeddedModelField):
            self.to_dict_nested(name, payload)
        if isinstance(field, ModelReferenceField):
            self.to_dict_reference(name, payload)
    return payload
def check_expression_support(self, expression):
    """Reject unsupported aggregates over date/time columns.

    SQLite stores date/time values as text, so numeric aggregates over
    them would silently produce nonsense; raise NotImplementedError
    instead when any source expression resolves to a date/time field.
    """
    time_like = (fields.DateField, fields.DateTimeField, fields.TimeField)
    unsupported = (aggregates.Sum, aggregates.Avg, aggregates.Variance,
                   aggregates.StdDev)
    if not isinstance(expression, unsupported):
        return
    for source in expression.get_source_expressions():
        try:
            resolved = source.output_field
        except FieldError:
            # Not every subexpression has an output_field which is fine
            # to ignore.
            continue
        if isinstance(resolved, time_like):
            raise NotImplementedError(
                'You cannot use Sum, Avg, StdDev, and Variance '
                'aggregations on date/time fields in sqlite3 '
                'since date/time is saved as text.'
            )
def from_dict(self, dict_payload):
    """Populate this instance's fields from a dict payload.

    Embedded models and lists of embedded models are delegated to the
    ``from_dict_nested*`` helpers; datetime and decimal values are parsed
    from their string form; everything else is assigned directly.
    """
    for field in self._meta.fields:
        name = field.name
        if name not in dict_payload:
            continue
        if isinstance(field, EmbeddedModelField):
            self.from_dict_nested(name, field.embedded_model, dict_payload)
            continue
        if isinstance(field, ListField):
            if isinstance(field.item_field, EmbeddedModelField):
                self.from_dict_nested_list(
                    name, field.item_field.embedded_model, dict_payload)
            continue
        if isinstance(field, DateTimeField):
            self._date_from_string(name, dict_payload.get(name))
        elif isinstance(field, DecimalField):
            self._decimal_from_string(name, dict_payload.get(name))
        else:
            # Presence was already checked above, so assign directly.
            setattr(self, name, dict_payload[name])
    if 'id' in dict_payload:
        self.id = dict_payload['id']
def get_django_field_map(self):
    """Return (django field class, peewee field class) translation pairs.

    DateTimeField is listed before DateField because it extends DateField;
    presumably the map is scanned in order so the subclass must match
    first.
    """
    from django.db.models import fields as djf
    field_pairs = [
        (djf.AutoField, PrimaryKeyField),
        (djf.BigIntegerField, BigIntegerField),
        # (djf.BinaryField, BlobField),
        (djf.BooleanField, BooleanField),
        (djf.CharField, CharField),
        (djf.DateTimeField, DateTimeField),  # Extends DateField.
        (djf.DateField, DateField),
        (djf.DecimalField, DecimalField),
        (djf.FilePathField, CharField),
        (djf.FloatField, FloatField),
        (djf.IntegerField, IntegerField),
        (djf.NullBooleanField, partial(BooleanField, null=True)),
        (djf.TextField, TextField),
        (djf.TimeField, TimeField),
        (djf.related.ForeignKey, ForeignKeyField),
    ]
    return field_pairs
def get_db_converters(self, expression):
    """Append the type-specific value converter for the expression's field.

    Extends the base converters with one keyed on the output field's
    internal type; boolean and null-boolean share the same converter.
    """
    converters = super(DatabaseOperations, self).get_db_converters(expression)
    by_internal_type = {
        'DateTimeField': self.convert_datetimefield_value,
        'DateField': self.convert_datefield_value,
        'TimeField': self.convert_timefield_value,
        'DecimalField': self.convert_decimalfield_value,
        'UUIDField': self.convert_uuidfield_value,
        'NullBooleanField': self.convert_booleanfield_value,
        'BooleanField': self.convert_booleanfield_value,
    }
    converter = by_internal_type.get(
        expression.output_field.get_internal_type())
    if converter is not None:
        converters.append(converter)
    return converters
def results_iter(self):
    """Yield the date column of each result row as a ``datetime.date``.

    Oracle returns datetimes that must be resolved through field
    converters; backends needing string casts go through
    ``typecast_date``. Any datetime is truncated to its date part.
    """
    on_oracle = self.connection.ops.oracle
    if on_oracle:
        from django.db.models.fields import DateTimeField
        fields = [DateTimeField()]
    else:
        needs_string_cast = self.connection.features.needs_datetime_string_cast
    offset = len(self.query.extra_select)
    for rows in self.execute_sql(MULTI):
        for row in rows:
            value = row[offset]
            if on_oracle:
                value = self.resolve_columns(row, fields)[offset]
            elif needs_string_cast:
                value = typecast_date(str(value))
            if isinstance(value, datetime.datetime):
                value = value.date()
            yield value
def results_iter(self):
    """Yield the datetime column of each result row.

    Oracle values are resolved through field converters; backends needing
    string casts go through ``typecast_timestamp``. Under ``USE_TZ`` the
    value is re-anchored to the query's timezone.

    Fix: the original bound each row value to a local named ``datetime``,
    shadowing the ``datetime`` module for the rest of the loop body; the
    local is renamed ``value``.
    """
    if self.connection.ops.oracle:
        from django.db.models.fields import DateTimeField
        fields = [DateTimeField()]
    else:
        needs_string_cast = self.connection.features.needs_datetime_string_cast
    offset = len(self.query.extra_select)
    for rows in self.execute_sql(MULTI):
        for row in rows:
            value = row[offset]
            if self.connection.ops.oracle:
                value = self.resolve_columns(row, fields)[offset]
            elif needs_string_cast:
                value = typecast_timestamp(str(value))
            # Datetimes are artificially returned in UTC on databases that
            # don't support time zone. Restore the zone used in the query.
            if settings.USE_TZ:
                value = value.replace(tzinfo=None)
                value = timezone.make_aware(value, self.query.tzinfo)
            yield value
def annotate_channel_queryset_with_latest_activity_at(queryset, user):
    """Annotate each channel with ``latest_activity_at``.

    The value is the most recent action-target timestamp when it is later
    than the channel's creation time, otherwise ``created_at``. The
    ``user`` parameter is accepted but not referenced here.
    """
    with_timestamp = queryset.annotate(
        latest_activity_timestamp=Max('action_targets__timestamp'),
    )
    latest_activity = Case(
        When(latest_activity_timestamp__isnull=True, then='created_at'),
        When(
            latest_activity_timestamp__gt=F('created_at'),
            then='latest_activity_timestamp',
        ),
        default='created_at',
        output_field=DateTimeField(),
    )
    return with_timestamp.annotate(latest_activity_at=latest_activity)
def _encodeValue(self, field, value):
    """Convert a Django field value into its AMF-encodable form.

    NOT_PROVIDED maps to ``pyamf.Undefined``; dates and times are widened
    to datetimes (AMF has a single date type); file fields are reduced to
    their name; everything else passes through unchanged.
    """
    if value is fields.NOT_PROVIDED:
        return pyamf.Undefined
    if value is None:
        return value
    # deal with dates ..
    if isinstance(field, fields.DateTimeField):
        return value
    if isinstance(field, fields.DateField):
        # Midnight at the stored date.
        return datetime.datetime(value.year, value.month, value.day, 0, 0, 0)
    if isinstance(field, fields.TimeField):
        # Time-of-day anchored to the Unix epoch date (1970-01-01).
        return datetime.datetime(
            1970, 1, 1,
            value.hour, value.minute, value.second, value.microsecond,
        )
    if isinstance(value, files.FieldFile):
        return value.name
    return value
def _decodeValue(self, field, value):
    """Convert an AMF-decoded value back into a Django field value.

    ``pyamf.Undefined`` maps to NOT_PROVIDED; an AutoField value of 0
    means "no primary key"; AMF datetimes are narrowed back to date/time
    objects for DateField/TimeField; other values pass through unchanged.
    """
    if value is pyamf.Undefined:
        return fields.NOT_PROVIDED
    if isinstance(field, fields.AutoField) and value == 0:
        return None
    # deal with dates
    if isinstance(field, fields.DateTimeField):
        return value
    if isinstance(field, fields.DateField):
        if not value:
            return None
        return datetime.date(value.year, value.month, value.day)
    if isinstance(field, fields.TimeField):
        if not value:
            return None
        return datetime.time(
            value.hour, value.minute, value.second, value.microsecond,
        )
    return value
def get_db_converters(self, expression):
    """Append the type-specific value converter for the expression's field.

    Extends the base converters with one chosen by the output field's
    internal type; unlisted types get no extra converter.
    """
    converters = super(DatabaseOperations, self).get_db_converters(expression)
    mapping = {
        'DateTimeField': self.convert_datetimefield_value,
        'DateField': self.convert_datefield_value,
        'TimeField': self.convert_timefield_value,
        'DecimalField': self.convert_decimalfield_value,
        'UUIDField': self.convert_uuidfield_value,
    }
    internal = expression.output_field.get_internal_type()
    if internal in mapping:
        converters.append(mapping[internal])
    return converters
def resolve_expression(self, query=None, allow_joins=True, reuse=None, summarize=False, for_save=False):
    """Resolve the lookup against the query and validate the field type.

    Asserts the referenced column is a DateField, and under ``USE_TZ``
    that it is not the DateTimeField subclass.
    """
    clone = self.copy()
    clone.col = query.resolve_ref(self.lookup, allow_joins, reuse, summarize)
    resolved_field = clone.col.output_field
    assert isinstance(resolved_field, fields.DateField), "%r isn't a DateField." % resolved_field.name
    if settings.USE_TZ:
        # A DateTimeField would need timezone conversion, which this
        # expression does not perform.
        assert not isinstance(resolved_field, fields.DateTimeField), (
            "%r is a DateTimeField, not a DateField." % resolved_field.name
        )
    return clone
def __init__(self, lookup, lookup_type, tzinfo):
    """Initialize the datetime transform.

    ``tzname`` is precomputed from ``tzinfo`` (or left as None for naive
    operation); ``col`` is filled in later by resolve_expression.
    """
    super(DateTime, self).__init__(output_field=fields.DateTimeField())
    self.lookup = lookup
    self.lookup_type = lookup_type
    self.col = None
    self.tzinfo = tzinfo
    self.tzname = None if tzinfo is None else timezone._get_timezone_name(tzinfo)
def resolve_expression(self, query=None, allow_joins=True, reuse=None, summarize=False, for_save=False):
    """Resolve the lookup against the query and validate the field type.

    Asserts the referenced column is a DateTimeField.
    """
    clone = self.copy()
    clone.col = query.resolve_ref(self.lookup, allow_joins, reuse, summarize)
    resolved_field = clone.col.output_field
    assert isinstance(resolved_field, fields.DateTimeField), (
        "%r isn't a DateTimeField." % resolved_field.name
    )
    return clone
def check_expression_support(self, expression):
    """Reject unsupported aggregates over date/time columns.

    SQLite stores date/time values as text, so Sum/Avg/StdDev/Variance
    over them cannot be computed; raise NotImplementedError when the
    aggregate's input resolves to a date/time field.
    """
    time_like = (fields.DateField, fields.DateTimeField, fields.TimeField)
    unsupported = (aggregates.Sum, aggregates.Avg, aggregates.Variance,
                   aggregates.StdDev)
    if not isinstance(expression, unsupported):
        return
    try:
        resolved = expression.input_field.output_field
    except FieldError:
        # not every sub-expression has an output_field which is fine to
        # ignore
        return
    if isinstance(resolved, time_like):
        raise NotImplementedError(
            'You cannot use Sum, Avg, StdDev and Variance aggregations '
            'on date/time fields in sqlite3 '
            'since date/time is saved as text.')
def __init__(self, output_field=None, **extra):
    """Initialize Now, defaulting the output field to DateTimeField.

    The default is only applied when ``output_field`` is exactly None so
    any caller-supplied field object is respected.
    """
    resolved = fields.DateTimeField() if output_field is None else output_field
    super(Now, self).__init__(output_field=resolved, **extra)
def year_lookup_bounds(self, connection, year):
    """Return the backend's (start, end) bounds for a year lookup.

    Datetime columns get datetime bounds; plain date columns get date
    bounds.
    """
    ops = connection.ops
    if isinstance(self.lhs.lhs.output_field, DateTimeField):
        return ops.year_lookup_bounds_for_datetime_field(year)
    return ops.year_lookup_bounds_for_date_field(year)
def korben_view(request, model):
    """View for Korben.

    Updates the object identified by ``data['id']`` (or creates one from
    the payload), parses incoming datetime field values, and saves with
    validation disabled since the data originates from Korben.
    """
    data = request.data
    try:
        obj = model.objects.get(pk=data['id'])
        for key, value in data.items():
            setattr(obj, key, value)
    except model.DoesNotExist:
        obj = model(**data)
    # create datetime objects for datetime fields
    for field in obj._meta.fields:
        if not isinstance(field, DateTimeField):
            continue
        try:
            setattr(obj, field.name, parse_date(getattr(obj, field.name, None)))
        except (ValueError, AttributeError):
            # An unparseable value is only acceptable on a nullable column.
            if not field.null:
                return Response(data=data, status=HTTP_400_BAD_REQUEST)
    obj.save(as_korben=True)  # data comes from Korben, kill validation
    return Response(data={'message': 'OK'})
def _prepare_data(self, data):
    """
    Prepare data for addition to the tree. If the data is a list or tuple,
    it is expected to be of the form (obj, lookup_type, value), where obj
    is a Constraint object, and is then slightly munged before being
    stored (to avoid storing any reference to field objects). Otherwise,
    the 'data' is stored unchanged and can be any class with an 'as_sql()'
    method.
    """
    if not isinstance(data, (list, tuple)):
        # Pre-prepared node (anything with as_sql()): store unchanged.
        return data
    obj, lookup_type, value = data
    # NOTE(review): ``collections.Iterator`` was deprecated in Python 3.3
    # and removed in 3.10; modern code should use
    # ``collections.abc.Iterator`` — confirm the target Python version
    # before changing.
    if isinstance(value, collections.Iterator):
        # Consume any generators immediately, so that we can determine
        # emptiness and transform any non-empty values correctly.
        value = list(value)
    # The "value_annotation" parameter is used to pass auxiliary information
    # about the value(s) to the query construction. Specifically, datetime
    # and empty values need special handling. Other types could be used
    # here in the future (using Python types is suggested for consistency).
    if (isinstance(value, datetime.datetime)
            or (isinstance(obj.field, DateTimeField) and lookup_type != 'isnull')):
        value_annotation = datetime.datetime
    elif hasattr(value, 'value_annotation'):
        value_annotation = value.value_annotation
    else:
        value_annotation = bool(value)
    if hasattr(obj, 'prepare'):
        # Let the constraint munge the value (e.g. to drop field refs).
        value = obj.prepare(lookup_type, value)
    return (obj, lookup_type, value_annotation, value)
def export_selected_data(self, request, queryset):
    """Admin action: export the selected rows to an Excel (.xls) file.

    Builds a header row from each field's ``verbose_name`` (or a
    callable's ``short_description``), then one row per object with
    dates, choice fields and foreign keys rendered as text. The filename
    is URL-quoted for MSIE user agents.

    Fixes: the original used Python-2-only ``except Exception, e``
    syntax (a SyntaxError on Python 3) with an unused ``e``, and the
    redundant ``hasattr(v, '__call__') or callable(v)`` test.
    """
    ops = self.model._meta
    workbook = xlwt.Workbook(encoding='utf-8')
    dd = datetime.date.today().strftime('%Y%m%d')
    file_name = force_text(ops.verbose_name + dd)
    sheet = workbook.add_sheet(force_text(ops.verbose_name))
    obj_fields = getattr(self, 'export_fields', None) or self.list_display or self.fields
    # Header row.
    head_col_index = 0
    for field in obj_fields:
        col_name = field
        try:
            f = ops.get_field(field)
            col_name = f.verbose_name
        except Exception:
            # Not a model field: fall back to a model attribute/callable.
            f = getattr(self.model, field)
            if hasattr(f, 'short_description'):
                col_name = f.short_description
        sheet.write(0, head_col_index, force_text(col_name))
        head_col_index += 1
    # Data rows. Exact ``type(f) ==`` comparisons are kept deliberately:
    # DateTimeField extends DateField, so isinstance would format
    # datetimes with the date-only pattern.
    row_index = 1
    for obj in queryset:
        col_index = 0
        for field in obj_fields:
            f = field
            try:
                f = ops.get_field(field)
            except Exception:
                pass
            v = getattr(obj, field, '')
            if callable(v):
                v = v()
            elif type(f) == fields.DateField:
                v = v.strftime('%Y-%m-%d')
            elif type(f) == fields.DateTimeField:
                v = v.strftime('%Y-%m-%d %H:%M')
            elif type(f) == fields.CharField and f.choices:
                # Render the human-readable choice label.
                v = getattr(obj, 'get_' + field + '_display')()
            elif type(f) == related.ForeignKey:
                v = str(v)
            sheet.write(row_index, col_index, v)
            col_index += 1
        row_index += 1
    response = HttpResponse(content_type='application/vnd.ms-excel')
    agent = request.META.get('HTTP_USER_AGENT')
    nn = smart_str(file_name)
    if agent and re.search('MSIE', agent):
        # MSIE expects a URL-quoted filename.
        nn = urlquote(file_name)
    response['Content-Disposition'] = 'attachment; filename=%s.xls' % nn
    workbook.save(response)
    return response
    # self.message_user(request,'SUCCESS')
def update_local_issue(
    gr_issue,
    id_namespace='',
    service_namespace='',
):
    """
    :param gr_issue: GeoReportv2 Issue structure (as a dict)
    :param id_namespace: String to prepend to request identifiers
    :param service_namespace: String to prepend to service codes
    :return: The created/updated Issue and a `created` flag
    """
    gr_issue = deepcopy(gr_issue)
    identifier = gr_issue.pop('service_request_id')
    if id_namespace:
        identifier = '%s:%s' % (id_namespace, identifier)
    issue = Issue.objects.filter(identifier=identifier).first()
    created = issue is None
    if created:
        issue = Issue(identifier=identifier)
    # Direct field-by-field mapping; consumed keys are popped so that
    # only unmapped leftovers remain in gr_issue afterwards.
    for field in Issue._meta.get_fields():
        if field.name not in gr_issue:
            continue
        value = gr_issue.pop(field.name)
        if isinstance(field, DateTimeField):
            value = parse_date(value)
        setattr(issue, field.attname, value)
    if "long" in gr_issue and "lat" in gr_issue:
        issue.location = GEOSGeometry(
            'SRID=4326;POINT(%s %s)' % (gr_issue.pop('long'), gr_issue.pop('lat'))
        )
    if 'service_code' in gr_issue:
        gr_issue['service_code'] = '%s%s' % (service_namespace, gr_issue['service_code'])
    # This has no direct mapping in our schema, but it can be used by implicit autocreation of services
    issue.service_name = gr_issue.pop('service_name', None)
    issue._cache_data()
    issue.full_clean()
    issue.save()
    extended_attributes = gr_issue.pop('extended_attributes', {})
    for ex_class in get_extensions():
        ex_class().parse_extended_attributes(issue, extended_attributes)
    if gr_issue:
        # Leftover, unmapped keys — printed for visibility.
        print(gr_issue)
    # assumes the leftover dict is attached unconditionally — the
    # collapsed source is ambiguous here; TODO confirm against upstream.
    issue.source = gr_issue
    return (issue, created)
def handle(self, *args, **options): """ Update periodic update events and send notifications for upcoming update events. """ # command to run: python manage.py tunga_manage_task_status # Choose tasks that aren't closed or under review already tasks_filter = Task.objects.filter( scope=TASK_SCOPE_TASK, closed=False, review=False ).annotate( activated_at=Case( When( approved_at__isnull=True, then='created_at' ), default='approved_at', output_field=DateTimeField() ) ) utc_now = datetime.datetime.utcnow() # Remind admins and devs about approved tasks with no applications 2 days after creation or approval min_date_no_applications = utc_now - relativedelta(days=2) min_date_no_developer_selected = utc_now - relativedelta(days=10) tasks_no_applications = tasks_filter.filter( approved=True, participants__isnull=False, activated_at__range=[ min_date_no_developer_selected, min_date_no_applications ] ) for task in tasks_no_applications: # Remind admins remind_no_task_applications.delay(task.id, admin=True) # Remind devs remind_no_task_applications.delay(task.id, admin=False) # Remind admins to take action on tasks with no accepted applications 10 days after creation or approval tasks_no_developers_selected = tasks_filter.filter( participants__isnull=True, created_at__lte=min_date_no_developer_selected ) for task in tasks_no_developers_selected: # Put task in review task.review = True task.save() # Notify admins to take action notify_review_task_admin.delay(task.id)