我有以下格式的数据(RDD或Spark DataFrame):
from pyspark.sql import SQLContext sqlContext = SQLContext(sc) rdd = sc.parallelize([('X01',41,'US',3), ('X01',41,'UK',1), ('X01',41,'CA',2), ('X02',72,'US',4), ('X02',72,'UK',6), ('X02',72,'CA',7), ('X02',72,'XX',8)]) # convert to a Spark DataFrame schema = StructType([StructField('ID', StringType(), True), StructField('Age', IntegerType(), True), StructField('Country', StringType(), True), StructField('Score', IntegerType(), True)]) df = sqlContext.createDataFrame(rdd, schema)
我想做的是“重塑”数据,将“国家/地区”中的某些行(特别是美国,英国和加拿大)转换为列:
ID Age US UK CA 'X01' 41 3 1 2 'X02' 72 4 6 7
本质上,我需要一些与Pythonpivot工作流程类似的东西:
pivot
categories = ['US', 'UK', 'CA'] new_df = df[df['Country'].isin(categories)].pivot(index = 'ID', columns = 'Country', values = 'Score')
我的数据集很大,因此我无法真正collect()将数据吸收到内存中以在Python本身中进行重塑。有没有办法.pivot()在映射RDD或Spark DataFrame时将Python转换为可调用函数?任何帮助,将不胜感激!
collect()
.pivot()
从Spark 1.6开始,您可以使用pivotfunctionGroupedData并提供聚合表达式。
GroupedData
pivoted = (df .groupBy("ID", "Age") .pivot( "Country", ['US', 'UK', 'CA']) # Optional list of levels .sum("Score")) # alternatively you can use .agg(expr)) pivoted.show() ## +---+---+---+---+---+ ## | ID|Age| US| UK| CA| ## +---+---+---+---+---+ ## |X01| 41| 3| 1| 2| ## |X02| 72| 4| 6| 7| ## +---+---+---+---+---+
电平可以省略,但如果提供,则可以提高性能并用作内部滤波器。
该方法仍然相对较慢,但肯定胜过了在JVM和Python之间手动传递数据。