我们从Python开源项目中,提取了以下27个代码示例,用于说明如何使用django.db.models.fields.DurationField()。
def as_sql(self, compiler, connection): try: lhs_output = self.lhs.output_field except FieldError: lhs_output = None try: rhs_output = self.rhs.output_field except FieldError: rhs_output = None if (not connection.features.has_native_duration_field and ((lhs_output and lhs_output.get_internal_type() == 'DurationField') or (rhs_output and rhs_output.get_internal_type() == 'DurationField'))): return DurationExpression(self.lhs, self.connector, self.rhs).as_sql(compiler, connection) expressions = [] expression_params = [] sql, params = compiler.compile(self.lhs) expressions.append(sql) expression_params.extend(params) sql, params = compiler.compile(self.rhs) expressions.append(sql) expression_params.extend(params) # order of precedence expression_wrapper = '(%s)' sql = connection.ops.combine_expression(self.connector, expressions) return expression_wrapper % sql, expression_params
def _combine(self, other, connector, reversed, node=None): if not hasattr(other, 'resolve_expression'): # everything must be resolvable to an expression if isinstance(other, datetime.timedelta): other = DurationValue(other, output_field=fields.DurationField()) else: other = Value(other) if reversed: return CombinedExpression(other, connector, self) return CombinedExpression(self, connector, other) ############# # OPERATORS # #############
def compile(self, side, compiler, connection): if not isinstance(side, DurationValue): try: output = side.output_field except FieldError: pass else: if output.get_internal_type() == 'DurationField': sql, params = compiler.compile(side) return connection.ops.format_for_duration_arithmetic(sql), params return compiler.compile(side)
def as_sql(self, compiler, connection): try: lhs_output = self.lhs.output_field except FieldError: lhs_output = None try: rhs_output = self.rhs.output_field except FieldError: rhs_output = None if (not connection.features.has_native_duration_field and ((lhs_output and lhs_output.get_internal_type() == 'DurationField') or (rhs_output and rhs_output.get_internal_type() == 'DurationField'))): return DurationExpression(self.lhs, self.connector, self.rhs).as_sql(compiler, connection) if (lhs_output and rhs_output and self.connector == self.SUB and lhs_output.get_internal_type() in {'DateField', 'DateTimeField', 'TimeField'} and lhs_output.get_internal_type() == lhs_output.get_internal_type()): return TemporalSubtraction(self.lhs, self.rhs).as_sql(compiler, connection) expressions = [] expression_params = [] sql, params = compiler.compile(self.lhs) expressions.append(sql) expression_params.extend(params) sql, params = compiler.compile(self.rhs) expressions.append(sql) expression_params.extend(params) # order of precedence expression_wrapper = '(%s)' sql = connection.ops.combine_expression(self.connector, expressions) return expression_wrapper % sql, expression_params
def __init__(self, lhs, rhs): super(TemporalSubtraction, self).__init__(lhs, self.SUB, rhs, output_field=fields.DurationField())
def as_sql(self, compiler, connection): try: lhs_output = self.lhs.output_field except FieldError: lhs_output = None try: rhs_output = self.rhs.output_field except FieldError: rhs_output = None if (not connection.features.has_native_duration_field and ((lhs_output and lhs_output.get_internal_type() == 'DurationField') or (rhs_output and rhs_output.get_internal_type() == 'DurationField'))): return DurationExpression(self.lhs, self.connector, self.rhs).as_sql(compiler, connection) if (lhs_output and rhs_output and self.connector == self.SUB and lhs_output.get_internal_type() in {'DateField', 'DateTimeField', 'TimeField'} and lhs_output.get_internal_type() == rhs_output.get_internal_type()): return TemporalSubtraction(self.lhs, self.rhs).as_sql(compiler, connection) expressions = [] expression_params = [] sql, params = compiler.compile(self.lhs) expressions.append(sql) expression_params.extend(params) sql, params = compiler.compile(self.rhs) expressions.append(sql) expression_params.extend(params) # order of precedence expression_wrapper = '(%s)' sql = connection.ops.combine_expression(self.connector, expressions) return expression_wrapper % sql, expression_params
def get_avg_duration(query_set): duration = ExpressionWrapper(F('updated_datetime') - F('requested_datetime'), output_field=fields.DurationField()) duration_list = query_set.annotate(duration=duration).values_list("duration", flat=True) if not duration_list: return datetime.timedelta(0) return sum(duration_list, datetime.timedelta(0)) / len(duration_list) # Returns median duration of closed issues (updated_datetime - requested_datetime) # from given category. Returns a tuple (days, hours)
def get_median_duration(query_set): duration = ExpressionWrapper(F('updated_datetime') - F('requested_datetime'), output_field=fields.DurationField()) duration_list = sorted(query_set.annotate(duration=duration).values_list("duration", flat=True)) if not duration_list: return datetime.timedelta(0) return duration_list[(len(duration_list) - 1) // 2] # Concerts timedelta into millisoconds
def overview(request, event_url_name): event = get_object_or_404(Event, url_name=event_url_name) # permission if not event.is_admin(request.user): return nopermission(request) num_helpers = event.helper_set.count() num_coordinators = 0 timeline = {} for helper in event.helper_set.all(): if helper.is_coordinator: num_coordinators += 1 else: day = helper.timestamp.strftime('%Y-%m-%d') if day in timeline: timeline[day] += 1 else: timeline[day] = 1 num_vegetarians = event.helper_set.filter(vegetarian=True).count() num_shift_slots = Shift.objects.filter(job__event=event).aggregate( Sum('number'))['number__sum'] empty_slots_expr = ExpressionWrapper(F('number') - F('num_helpers'), output_field=fields.IntegerField()) num_empty_shift_slots = Shift.objects.filter(job__event=event) \ .annotate(num_helpers=Count('helper')) \ .annotate(empty_slots=empty_slots_expr) \ .aggregate(Sum('empty_slots'))['empty_slots__sum'] total_duration = ExpressionWrapper((F('end') - F('begin')) * F('number'), output_field=fields.DurationField()) hours_total = Shift.objects.filter(job__event=event) \ .annotate(duration=total_duration) \ .aggregate(Sum('duration'))['duration__sum'] # sum up timeline timeline = OrderedDict(sorted(timeline.items())) timeline_sum = OrderedDict() tmp = 0 for day in timeline: tmp += timeline[day] timeline_sum[day] = tmp # render context = {'event': event, 'num_helpers': num_helpers, 'num_coordinators': num_coordinators, 'num_vegetarians': num_vegetarians, 'num_shift_slots': num_shift_slots, 'num_empty_shift_slots': num_empty_shift_slots, 'hours_total': hours_total, 'timeline': timeline_sum} return render(request, 'statistic/overview.html', context)