我有一个 Spark DataFrame(使用 PySpark 1.5.1)并且想添加一个新列。
我尝试了以下方法但没有成功:
type(randomed_hours) # => list # Create in Python and transform to RDD new_col = pd.DataFrame(randomed_hours, columns=['new_col']) spark_new_col = sqlContext.createDataFrame(new_col) my_df_spark.withColumn("hours", spark_new_col["new_col"])
使用这个也有错误:
my_df_spark.withColumn("hours", sc.parallelize(randomed_hours))
那么如何使用 PySpark 向现有 DataFrame 添加新列(基于 Python 向量)?
您不能将任意列添加到DataFrameSpark 中的 a。只能使用字面量创建新列
DataFrame
from pyspark.sql.functions import lit df = sqlContext.createDataFrame( [(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3")) df_with_x4 = df.withColumn("x4", lit(0)) df_with_x4.show() ## +---+---+-----+---+ ## | x1| x2| x3| x4| ## +---+---+-----+---+ ## | 1| a| 23.0| 0| ## | 3| B|-23.0| 0| ## +---+---+-----+---+
转换现有列:
from pyspark.sql.functions import exp df_with_x5 = df_with_x4.withColumn("x5", exp("x3")) df_with_x5.show() ## +---+---+-----+---+--------------------+ ## | x1| x2| x3| x4| x5| ## +---+---+-----+---+--------------------+ ## | 1| a| 23.0| 0| 9.744803446248903E9| ## | 3| B|-23.0| 0|1.026187963170189...| ## +---+---+-----+---+--------------------+
包括使用join:
join
from pyspark.sql.functions import exp lookup = sqlContext.createDataFrame([(1, "foo"), (2, "bar")], ("k", "v")) df_with_x6 = (df_with_x5 .join(lookup, col("x1") == col("k"), "leftouter") .drop("k") .withColumnRenamed("v", "x6")) ## +---+---+-----+---+--------------------+----+ ## | x1| x2| x3| x4| x5| x6| ## +---+---+-----+---+--------------------+----+ ## | 1| a| 23.0| 0| 9.744803446248903E9| foo| ## | 3| B|-23.0| 0|1.026187963170189...|null| ## +---+---+-----+---+--------------------+----+
或使用函数 / udf 生成:
from pyspark.sql.functions import rand df_with_x7 = df_with_x6.withColumn("x7", rand()) df_with_x7.show() ## +---+---+-----+---+--------------------+----+-------------------+ ## | x1| x2| x3| x4| x5| x6| x7| ## +---+---+-----+---+--------------------+----+-------------------+ ## | 1| a| 23.0| 0| 9.744803446248903E9| foo|0.41930610446846617| ## | 3| B|-23.0| 0|1.026187963170189...|null|0.37801881545497873| ## +---+---+-----+---+--------------------+----+-------------------+
在性能方面,映射到 Catalyst 表达式的内置函数 ( pyspark.sql.functions) 通常比 Python 用户定义函数更受欢迎。
pyspark.sql.functions
如果您想将任意 RDD 的内容添加为列,您可以
zipWithIndex