我正在使用monotonically_increasing_id()使用以下语法将行号分配给pyspark数据帧:
df1 = df1.withColumn("idx", monotonically_increasing_id())
现在df1有26,572,528条记录。因此,我期望idx值为0-26,572,527。
但是当我选择max(idx)时,它的值非常大:335,008,054,165。
这个功能是怎么回事?使用此功能与具有相似记录数量的另一个数据集合并是否可靠?
我有大约300个数据框,我想合并为一个数据框。因此,一个数据框包含ID,其他数据框包含与之对应的不同记录
从文档中
生成单调递增的64位整数的列。 保证生成的ID是单调递增且唯一的,但不是连续的。当前实现将分区ID放在高31位中,将记录号放在每个分区的低33位中。假设数据帧的分区少于10亿,每个分区的记录少于80亿。
生成单调递增的64位整数的列。
保证生成的ID是单调递增且唯一的,但不是连续的。当前实现将分区ID放在高31位中,将记录号放在每个分区的低33位中。假设数据帧的分区少于10亿,每个分区的记录少于80亿。
因此,它不像RDB中的自动增量ID,并且对于合并 也不 可靠。
如果您需要像RDB中那样的自动递增行为,并且您的数据是可排序的,则可以使用 row_number
row_number
df.createOrReplaceTempView('df') spark.sql('select row_number() over (order by "some_column") as num, * from df') +---+-----------+ |num|some_column| +---+-----------+ | 1| ....... | | 2| ....... | | 3| ..........| +---+-----------+
如果您的数据无法排序,并且您不介意使用rdds创建索引然后又退回到数据框,则可以使用 rdd.zipWithIndex()
rdd.zipWithIndex()
可以在这里找到一个例子 简而言之:
# since you have a dataframe, use the rdd interface to create indexes with zipWithIndex() df = df.rdd.zipWithIndex() # return back to dataframe df = df.toDF() df.show() # your data | indexes +---------------------+---+ | _1 | _2| +-----------=---------+---+ |[data col1,data col2]| 0| |[data col1,data col2]| 1| |[data col1,data col2]| 2| +---------------------+---+
之后,您可能需要更多的转换才能使数据框达到所需的状态。注意:这不是一个非常有效的解决方案。
希望这可以帮助。祝好运!
编辑: 考虑一下,您可以结合monotonically_increasing_id使用row_number:
monotonically_increasing_id
# create a monotonically increasing id df = df.withColumn("idx", monotonically_increasing_id()) # then since the id is increasing but not consecutive, it means you can sort by it, so you can use the `row_number` df.createOrReplaceTempView('df') new_df = spark.sql('select row_number() over (order by "idx") as num, * from df')
虽然不确定性能。
有关执行此操作的方式和风险的完整示例,请参见此处