I have a DataFrame df in Spark:

    |-- array_field: array (nullable = true)
    |    |-- element: struct (containsNull = true)
    |    |    |-- a: string (nullable = true)
    |    |    |-- b: long (nullable = true)
    |    |    |-- c: long (nullable = true)
How can I rename the field array_field.a to array_field.a_renamed?
[Update]:
.withColumnRenamed() does not work with nested fields, so I tried this hacky and unsafe approach:
    # First alter the schema:
    schema = df.schema
    schema['array_field'].dataType.elementType['a'].name = 'a_renamed'
    ind = schema['array_field'].dataType.elementType.names.index('a')
    schema['array_field'].dataType.elementType.names[ind] = 'a_renamed'

    # Then set dataframe's schema with altered schema
    df._schema = schema
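I know that setting a private attribute is not good practice, but I don't know of another way to set the schema for df.
I think I am on the right track, but df.printSchema() still shows the old name array_field.a, even though df.schema == schema is True.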
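Python
It is not possible to modify a single nested field. You have to recreate the whole structure. In this particular case the simplest solution is to use cast.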
First, a bunch of imports:
    from collections import namedtuple
    from pyspark.sql.functions import col
    from pyspark.sql.types import (
        ArrayType, LongType, StringType, StructField, StructType)
and example data:
    Record = namedtuple("Record", ["a", "b", "c"])

    df = sc.parallelize([([Record("foo", 1, 3)], )]).toDF(["array_field"])
Let's confirm that the schema is the same as in your case:
    df.printSchema()

    root
     |-- array_field: array (nullable = true)
     |    |-- element: struct (containsNull = true)
     |    |    |-- a: string (nullable = true)
     |    |    |-- b: long (nullable = true)
     |    |    |-- c: long (nullable = true)
You can define the new schema, for example, as a string:
    str_schema = "array<struct<a_renamed:string,b:bigint,c:bigint>>"

    df.select(col("array_field").cast(str_schema)).printSchema()

    root
     |-- array_field: array (nullable = true)
     |    |-- element: struct (containsNull = true)
     |    |    |-- a_renamed: string (nullable = true)
     |    |    |-- b: long (nullable = true)
     |    |    |-- c: long (nullable = true)
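Writing the schema string by hand gets tedious for wide structs. As a minimal sketch, the string can be generated from a list of (name, type) pairs. The helper name renamed_array_struct_ddl and its arguments are made up for illustration and are not part of PySpark. The field order is kept unchanged, on the assumption that the cast matches struct fields by position rather than by name:

```python
def renamed_array_struct_ddl(fields, renames):
    """Build an "array<struct<...>>" type string with some fields renamed.

    fields  -- list of (name, type) pairs in their original order
    renames -- dict mapping old field names to new ones
    (Hypothetical helper, not a PySpark API.)
    """
    # Keep the original order; only the names listed in `renames` change.
    parts = [f"{renames.get(name, name)}:{dtype}" for name, dtype in fields]
    return "array<struct<" + ",".join(parts) + ">>"


str_schema = renamed_array_struct_ddl(
    [("a", "string"), ("b", "bigint"), ("c", "bigint")],
    {"a": "a_renamed"},
)
print(str_schema)
```

The resulting string can be passed to col("array_field").cast(...) in the same way as a hand-written one.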
or as a DataType:
    struct_schema = ArrayType(StructType([
        StructField("a_renamed", StringType()),
        StructField("b", LongType()),
        StructField("c", LongType())
    ]))

    df.select(col("array_field").cast(struct_schema)).printSchema()

    root
     |-- array_field: array (nullable = true)
     |    |-- element: struct (containsNull = true)
     |    |    |-- a_renamed: string (nullable = true)
     |    |    |-- b: long (nullable = true)
     |    |    |-- c: long (nullable = true)
Scala
The same technique can be used in Scala:
    case class Record(a: String, b: Long, c: Long)

    val df = Seq(Tuple1(Seq(Record("foo", 1, 3)))).toDF("array_field")

    val strSchema = "array<struct<a_renamed:string,b:bigint,c:bigint>>"

    df.select($"array_field".cast(strSchema))
or
    import org.apache.spark.sql.types._

    val structSchema = ArrayType(StructType(Seq(
      StructField("a_renamed", StringType),
      StructField("b", LongType),
      StructField("c", LongType)
    )))

    df.select($"array_field".cast(structSchema))
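Possible improvements:
If you use an expressive data manipulation or JSON processing library, it can be easier to dump the data types to a dict or JSON string and take it from there, for example (Python / toolz):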
    from toolz.curried import pipe, assoc_in, update_in, map
    from operator import attrgetter

    # Update name to "a_updated" if name is "a"
    rename_field = update_in(
        keys=["name"], func=lambda x: "a_updated" if x == "a" else x)

    updated_schema = pipe(
        # Get schema of the field as a dict
        df.schema["array_field"].jsonValue(),
        # Update fields with rename
        update_in(
            keys=["type", "elementType", "fields"],
            func=lambda x: pipe(x, map(rename_field), list)),
        # Load schema from dict
        StructField.fromJson,
        # Get data type
        attrgetter("dataType"))

    df.select(col("array_field").cast(updated_schema)).printSchema()
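If toolz is not available, the same idea can be sketched with plain dictionaries. The function below is a hypothetical helper (not part of PySpark) and assumes its input has the shape returned by df.schema["array_field"].jsonValue(), that is, a field whose type is an array of structs. The sample dict is a hand-written stand-in so that the snippet runs without Spark:

```python
import copy


def rename_in_array_struct(field_json, old, new):
    """Return a copy of a StructField JSON dict with one nested field renamed.

    Hypothetical helper; assumes the field's type is an array of structs.
    """
    result = copy.deepcopy(field_json)  # leave the caller's dict untouched
    for nested in result["type"]["elementType"]["fields"]:
        if nested["name"] == old:
            nested["name"] = new
    return result


# Hand-written stand-in for df.schema["array_field"].jsonValue()
field_json = {
    "name": "array_field",
    "type": {
        "type": "array",
        "containsNull": True,
        "elementType": {
            "type": "struct",
            "fields": [
                {"name": "a", "type": "string", "nullable": True, "metadata": {}},
                {"name": "b", "type": "long", "nullable": True, "metadata": {}},
                {"name": "c", "type": "long", "nullable": True, "metadata": {}},
            ],
        },
    },
    "nullable": True,
    "metadata": {},
}

renamed = rename_in_array_struct(field_json, "a", "a_renamed")
print([f["name"] for f in renamed["type"]["elementType"]["fields"]])
```

With Spark available, StructField.fromJson(renamed).dataType should give a DataType that can be passed to cast, as in the toolz version.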