我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用django.db.models.expressions.Value()。
def as_postgresql(self, compiler, connection): geo_field = GeometryField(srid=self.srid) # Fake field to get SRID info src_field = self.get_source_fields()[0] geography = src_field.geography and self.srid == 4326 if geography: # Set parameters as geography if base field is geography for pos, expr in enumerate( self.source_expressions[self.geom_param_pos + 1:], start=self.geom_param_pos + 1): if isinstance(expr, GeomValue): expr.geography = True elif geo_field.geodetic(connection): # Geometry fields with geodetic (lon/lat) coordinates need special distance functions if self.spheroid: self.function = 'ST_Distance_Spheroid' # More accurate, resource intensive # Replace boolean param by the real spheroid of the base field self.source_expressions[2] = Value(geo_field._spheroid) else: self.function = 'ST_Distance_Sphere' return super(Distance, self).as_sql(compiler, connection)
def batch_process_rhs(self, compiler, connection, rhs=None): if rhs is None: rhs = self.rhs if self.bilateral_transforms: sqls, sqls_params = [], [] for p in rhs: value = Value(p, output_field=self.lhs.output_field) value = self.apply_bilateral_transforms(value) value = value.resolve_expression(compiler.query) sql, sql_params = compiler.compile(value) sqls.append(sql) sqls_params.extend(sql_params) else: params = self.lhs.output_field.get_db_prep_lookup( self.lookup_name, rhs, connection, prepared=True) sqls, sqls_params = ['%s'] * len(params), params return sqls, sqls_params
def process_rhs(self, compiler, connection): value = self.rhs if self.bilateral_transforms: if self.rhs_is_direct_value(): # Do not call get_db_prep_lookup here as the value will be # transformed before being used for lookup value = Value(value, output_field=self.lhs.output_field) value = self.apply_bilateral_transforms(value) value = value.resolve_expression(compiler.query) # Due to historical reasons there are a couple of different # ways to produce sql here. get_compiler is likely a Query # instance, _as_sql QuerySet and as_sql just something with # as_sql. Finally the value can of course be just plain # Python value. if hasattr(value, 'get_compiler'): value = value.get_compiler(connection=connection) if hasattr(value, 'as_sql'): sql, params = compiler.compile(value) return '(' + sql + ')', params if hasattr(value, '_as_sql'): sql, params = value._as_sql(connection=connection) return '(' + sql + ')', params else: return self.get_db_prep_lookup(value, connection)
def as_postgresql(self, compiler, connection): geo_field = GeometryField(srid=self.srid) # Fake field to get SRID info if self.source_is_geography(): # Set parameters as geography if base field is geography for pos, expr in enumerate( self.source_expressions[self.geom_param_pos + 1:], start=self.geom_param_pos + 1): if isinstance(expr, GeomValue): expr.geography = True elif geo_field.geodetic(connection): # Geometry fields with geodetic (lon/lat) coordinates need special distance functions if self.spheroid: self.function = 'ST_Distance_Spheroid' # More accurate, resource intensive # Replace boolean param by the real spheroid of the base field self.source_expressions[2] = Value(geo_field._spheroid) else: self.function = 'ST_Distance_Sphere' return super(Distance, self).as_sql(compiler, connection)
def as_postgresql(self, compiler, connection): geo_field = GeometryField(srid=self.srid) # Fake field to get SRID info if self.source_is_geography(): # Set parameters as geography if base field is geography for pos, expr in enumerate( self.source_expressions[self.geom_param_pos + 1:], start=self.geom_param_pos + 1): if isinstance(expr, GeomValue): expr.geography = True elif geo_field.geodetic(connection): # Geometry fields with geodetic (lon/lat) coordinates need special distance functions if self.spheroid: # DistanceSpheroid is more accurate and resource intensive than DistanceSphere self.function = connection.ops.spatial_function_name('DistanceSpheroid') # Replace boolean param by the real spheroid of the base field self.source_expressions[2] = Value(geo_field._spheroid) else: self.function = connection.ops.spatial_function_name('DistanceSphere') return super(Distance, self).as_sql(compiler, connection)
def batch_process_rhs(self, compiler, connection, rhs=None): if rhs is None: rhs = self.rhs if self.bilateral_transforms: sqls, sqls_params = [], [] for p in rhs: value = Value(p, output_field=self.lhs.output_field) value = self.apply_bilateral_transforms(value) value = value.resolve_expression(compiler.query) sql, sql_params = compiler.compile(value) sqls.append(sql) sqls_params.extend(sql_params) else: _, params = self.get_db_prep_lookup(rhs, connection) sqls, sqls_params = ['%s'] * len(params), params return sqls, sqls_params
def process_rhs(self, compiler, connection): value = self.rhs if self.bilateral_transforms: if self.rhs_is_direct_value(): # Do not call get_db_prep_lookup here as the value will be # transformed before being used for lookup value = Value(value, output_field=self.lhs.output_field) value = self.apply_bilateral_transforms(value) value = value.resolve_expression(compiler.query) # Due to historical reasons there are a couple of different # ways to produce sql here. get_compiler is likely a Query # instance and as_sql just something with as_sql. Finally the value # can of course be just plain Python value. if hasattr(value, 'get_compiler'): value = value.get_compiler(connection=connection) if hasattr(value, 'as_sql'): sql, params = compiler.compile(value) return '(' + sql + ')', params else: return self.get_db_prep_lookup(value, connection)
def __init__(self, expression, pos, length=None, **extra): """ expression: the name of a field, or an expression returning a string pos: an integer > 0, or an expression returning an integer length: an optional number of characters to return """ if not hasattr(pos, 'resolve_expression'): if pos < 1: raise ValueError("'pos' must be greater than 0") pos = Value(pos) expressions = [expression, pos] if length is not None: if not hasattr(length, 'resolve_expression'): length = Value(length) expressions.append(length) super(Substr, self).__init__(*expressions, **extra)
def with_status_name(self): case = Case(output_field=models.CharField()) for status_value in self.model.STATUS._db_values: case.cases.append( When(status=status_value, then=Value(str(self.model.STATUS[status_value]))), ) return self.annotate(status_name=case)
def as_postgresql(self, compiler, connection): geo_field = GeometryField(srid=self.srid) # Fake field to get SRID info src_field = self.get_source_fields()[0] geography = src_field.geography and self.srid == 4326 if geography: self.source_expressions.append(Value(self.spheroid)) elif geo_field.geodetic(connection): # Geometry fields with geodetic (lon/lat) coordinates need length_spheroid self.function = 'ST_Length_Spheroid' self.source_expressions.append(Value(geo_field._spheroid)) else: dim = min(f.dim for f in self.get_source_fields() if f) if dim > 2: self.function = connection.ops.length3d return super(Length, self).as_sql(compiler, connection)
def as_sqlite(self, compiler, connection): func_name = connection.ops.spatial_function_name(self.name) if func_name == 'ST_Translate' and len(self.source_expressions) < 4: # Always provide the z parameter for ST_Translate (Spatialite >= 3.1) self.source_expressions.append(Value(0)) elif func_name == 'ShiftCoords' and len(self.source_expressions) > 3: raise ValueError("This version of Spatialite doesn't support 3D") return super(Translate, self).as_sqlite(compiler, connection)
def as_postgresql(self, compiler, connection): geo_field = GeometryField(srid=self.srid) # Fake field to get SRID info if self.source_is_geography(): self.source_expressions.append(Value(self.spheroid)) elif geo_field.geodetic(connection): # Geometry fields with geodetic (lon/lat) coordinates need length_spheroid self.function = 'ST_Length_Spheroid' self.source_expressions.append(Value(geo_field._spheroid)) else: dim = min(f.dim for f in self.get_source_fields() if f) if dim > 2: self.function = connection.ops.length3d return super(Length, self).as_sql(compiler, connection)
def __init__(self, *expressions, **extra): super(SearchVector, self).__init__(*expressions, **extra) self.source_expressions = [ Coalesce(expression, Value('')) for expression in self.source_expressions ] self.config = self.extra.get('config', self.config) weight = self.extra.get('weight') if weight is not None and not hasattr(weight, 'resolve_expression'): weight = Value(weight) self.weight = weight
def resolve_expression(self, query=None, allow_joins=True, reuse=None, summarize=False, for_save=False): resolved = super(SearchVector, self).resolve_expression(query, allow_joins, reuse, summarize, for_save) if self.config: if not hasattr(self.config, 'resolve_expression'): resolved.config = Value(self.config).resolve_expression(query, allow_joins, reuse, summarize, for_save) else: resolved.config = self.config.resolve_expression(query, allow_joins, reuse, summarize, for_save) return resolved
def __init__(self, vector, query, **extra): if not hasattr(vector, 'resolve_expression'): vector = SearchVector(vector) if not hasattr(query, 'resolve_expression'): query = SearchQuery(query) weights = extra.get('weights') if weights is not None and not hasattr(weights, 'resolve_expression'): weights = Value(weights) self.weights = weights super(SearchRank, self).__init__(vector, query, **extra)
def __init__(self, expression, string, **extra): if not hasattr(string, 'resolve_expression'): string = Value(string) super(TrigramBase, self).__init__(expression, string, output_field=FloatField(), **extra)
def as_postgresql(self, compiler, connection): geo_field = GeometryField(srid=self.srid) # Fake field to get SRID info if self.source_is_geography(): self.source_expressions.append(Value(self.spheroid)) elif geo_field.geodetic(connection): # Geometry fields with geodetic (lon/lat) coordinates need length_spheroid self.function = connection.ops.spatial_function_name('LengthSpheroid') self.source_expressions.append(Value(geo_field._spheroid)) else: dim = min(f.dim for f in self.get_source_fields() if f) if dim > 2: self.function = connection.ops.length3d return super(Length, self).as_sql(compiler, connection)
def as_sqlite(self, compiler, connection): if len(self.source_expressions) < 4: # Always provide the z parameter for ST_Translate self.source_expressions.append(Value(0)) return super(Translate, self).as_sqlite(compiler, connection)