I have a dataframe with one row and several columns. Some of the columns hold single values, others hold lists. All the list columns have the same length. I want to split each list column into separate rows while keeping the non-list columns as they are.
Sample DF:
from pyspark import Row
from pyspark.sql import SQLContext
from pyspark.sql.functions import explode

sqlc = SQLContext(sc)

df = sqlc.createDataFrame([Row(a=1, b=[1, 2, 3], c=[7, 8, 9], d='foo')])
# +---+---------+---------+---+
# |  a|        b|        c|  d|
# +---+---------+---------+---+
# |  1|[1, 2, 3]|[7, 8, 9]|foo|
# +---+---------+---------+---+
What I want:
+---+---+----+------+
|  a|  b|  c |  d   |
+---+---+----+------+
|  1|  1|  7 | foo  |
|  1|  2|  8 | foo  |
|  1|  3|  9 | foo  |
+---+---+----+------+
If I only had one list column, this would be easy by just doing an explode:
df_exploded = df.withColumn('b', explode('b'))

# >>> df_exploded.show()
# +---+---+---------+---+
# |  a|  b|        c|  d|
# +---+---+---------+---+
# |  1|  1|[7, 8, 9]|foo|
# |  1|  2|[7, 8, 9]|foo|
# |  1|  3|[7, 8, 9]|foo|
# +---+---+---------+---+
However, if I also try to explode the c column, I end up with a dataframe whose length is the square of what I want:
df_exploded_again = df_exploded.withColumn('c', explode('c'))

# >>> df_exploded_again.show()
# +---+---+---+---+
# |  a|  b|  c|  d|
# +---+---+---+---+
# |  1|  1|  7|foo|
# |  1|  1|  8|foo|
# |  1|  1|  9|foo|
# |  1|  2|  7|foo|
# |  1|  2|  8|foo|
# |  1|  2|  9|foo|
# |  1|  3|  7|foo|
# |  1|  3|  8|foo|
# |  1|  3|  9|foo|
# +---+---+---+---+
What I want is, for each column, to take the nth element of the array in that column and add it to a new row. I've tried mapping an explode across all columns in the dataframe, but that doesn't seem to work either:
df_split = df.rdd.map(lambda col: df.withColumn(col, explode(col))).toDF()
Spark >= 2.4
You can replace the zip_ udf with the arrays_zip function:
from pyspark.sql.functions import arrays_zip, col, explode

(df
    .withColumn("tmp", arrays_zip("b", "c"))
    .withColumn("tmp", explode("tmp"))
    .select("a", col("tmp.b"), col("tmp.c"), "d"))
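The zipped array can also be unpacked into columns in a single step with the SQL inline generator, called through expr so it works on any version that already has arrays_zip (a minimal sketch using the same sample df):

from pyspark.sql.functions import expr

# inline() expands an array of structs into one row per element,
# with one output column per struct field (here: b and c)
df.select("a", expr("inline(arrays_zip(b, c))"), "d")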
Spark < 2.4
With DataFrames and UDF:
from pyspark.sql.types import ArrayType, StructType, StructField, IntegerType
from pyspark.sql.functions import col, udf, explode

zip_ = udf(
    lambda x, y: list(zip(x, y)),
    ArrayType(StructType([
        # Adjust types to reflect data types
        StructField("first", IntegerType()),
        StructField("second", IntegerType())
    ]))
)

(df
    .withColumn("tmp", zip_("b", "c"))
    # UDF output cannot be directly passed to explode
    .withColumn("tmp", explode("tmp"))
    .select("a", col("tmp.first").alias("b"), col("tmp.second").alias("c"), "d"))
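If you have more than two list columns, the same UDF pattern can be generalized; a sketch with a hypothetical make_zip_udf helper (the field types still have to be adjusted to the actual data):

from pyspark.sql.types import ArrayType, StructType, StructField, IntegerType
from pyspark.sql.functions import col, udf, explode

# Hypothetical helper: builds a UDF that zips any number of integer array columns
def make_zip_udf(*names):
    return udf(
        lambda *arrays: list(zip(*arrays)),
        ArrayType(StructType([StructField(name, IntegerType()) for name in names]))
    )

zip_bc = make_zip_udf("b", "c")

(df
    .withColumn("tmp", zip_bc("b", "c"))
    .withColumn("tmp", explode("tmp"))
    .select("a", col("tmp.b"), col("tmp.c"), "d"))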
With RDDs:
(df
    .rdd
    .flatMap(lambda row: [(row.a, b, c, row.d) for b, c in zip(row.b, row.c)])
    .toDF(["a", "b", "c", "d"]))
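The flatMap can also be written once for an arbitrary set of columns; a sketch where the list/scalar column names are assumptions about which columns hold the equal-length arrays:

list_cols = ["b", "c"]    # equal-length array columns to unzip
scalar_cols = ["a", "d"]  # columns repeated on every output row

(df
    .rdd
    .flatMap(lambda row: [
        tuple(row[c] for c in scalar_cols) + elems
        for elems in zip(*(row[c] for c in list_cols))
    ])
    .toDF(scalar_cols + list_cols)
    .select("a", "b", "c", "d"))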
Neither solution is efficient due to Python communication overhead. If the data size is fixed you can do something like this:
from functools import reduce
from pyspark.sql import DataFrame

# Length of array
n = 3

# For legacy Python you'll need a separate function
# in place of method accessor
reduce(
    DataFrame.unionAll,
    (df.select("a", col("b").getItem(i), col("c").getItem(i), "d")
        for i in range(n))
).toDF("a", "b", "c", "d")
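If n is not known up front, it can be read from the data first; a small sketch assuming every row carries arrays of the same length:

from pyspark.sql.functions import size

# All arrays are assumed to have the same length, so one row is enough
n = df.select(size("b").alias("n")).first()["n"]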
or even:
from pyspark.sql.functions import array, struct

# SQL level zip of arrays of known size
# followed by explode
tmp = explode(array(*[
    struct(col("b").getItem(i).alias("b"), col("c").getItem(i).alias("c"))
    for i in range(n)
]))

(df
    .withColumn("tmp", tmp)
    .select("a", col("tmp").getItem("b"), col("tmp").getItem("c"), "d"))
This should be significantly faster than the UDF or RDD versions. Generalized to support an arbitrary number of columns:
# This uses keyword only arguments
# If you use legacy Python you'll have to change signature
# Body of the function can stay the same
def zip_and_explode(*colnames, n):
    return explode(array(*[
        struct(*[col(c).getItem(i).alias(c) for c in colnames])
        for i in range(n)
    ]))

df.withColumn("tmp", zip_and_explode("b", "c", n=3))
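Applied to the sample df, the helper produces a single struct column that can be unpacked the same way as before (a usage sketch):

(df
    .withColumn("tmp", zip_and_explode("b", "c", n=3))
    .select("a", col("tmp").getItem("b").alias("b"),
            col("tmp").getItem("c").alias("c"), "d"))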