我有一个很大的 pyspark.sql.dataframe.DataFrame 名为df。我需要某种枚举记录的方式- 因此,能够访问具有特定索引的记录。(或选择具有索引范围的记录组)
在大熊猫中,我可以
indexes=[2,3,6,7] df[indexes]
在这里我想要类似的东西 (并且不将数据框转换为熊猫)
我最接近的是:
indexes=np.arange(df.count()) df_indexed=df.withColumn('index', indexes)
* 使用where()函数搜索所需的值。
问题:
indexes=[2,3,6,7] df1.where("index in indexes").collect()
它不起作用,因为:
withColumn
Column
np.array
"index in indexes"
where
indexes
PySpark > = 1.4.0
您可以使用相应的窗口函数添加行号,并使用Column.isin方法或格式正确的查询字符串进行查询:
Column.isin
from pyspark.sql.functions import col, rowNumber from pyspark.sql.window import Window w = Window.orderBy() indexed = df.withColumn("index", rowNumber().over(w)) # Using DSL indexed.where(col("index").isin(set(indexes))) # Using SQL expression indexed.where("index in ({0})".format(",".join(str(x) for x in indexes)))
看起来调用无PARTITION BY子句的窗口函数会将所有数据移动到单个分区,因此上述毕竟不是最佳解决方案。
PARTITION BY
有没有更快,更简单的处理方法?
并不是的。Spark DataFrames不支持随机行访问。
PairedRDD``lookup如果使用进行分区,则可以使用相对较快的方法进行访问HashPartitioner。还有一个index- rdd项目,它支持有效的查找。
PairedRDD``lookup
HashPartitioner
编辑 :
与PySpark版本无关,您可以尝试执行以下操作:
from pyspark.sql import Row from pyspark.sql.types import StructType, StructField, LongType row = Row("char") row_with_index = Row("char", "index") df = sc.parallelize(row(chr(x)) for x in range(97, 112)).toDF() df.show(5) ## +----+ ## |char| ## +----+ ## | a| ## | b| ## | c| ## | d| ## | e| ## +----+ ## only showing top 5 rows # This part is not tested but should work and save some work later schema = StructType( df.schema.fields[:] + [StructField("index", LongType(), False)]) indexed = (df.rdd # Extract rdd .zipWithIndex() # Add index .map(lambda ri: row_with_index(*list(ri[0]) + [ri[1]])) # Map to rows .toDF(schema)) # It will work without schema but will be more expensive # inSet in Spark < 1.3 indexed.where(col("index").isin(indexes))