df1具有字段id和json;df2具有字段id和json
df1.count()=> 1200;df2.count()=> 20
df1具有所有行。df2的增量更新只有20行。
我的目标是使用中的值更新df1 df2。的所有IDdf2在df1中。但是df2json为这些相同的ID更新了值(在该字段中)。
结果df应该具有的所有值df1和的更新的值df2。
做这个的最好方式是什么?-具有最少数量的联接和过滤器。
谢谢!
您可以使用一个左连接来实现。
创建示例数据框
使用@Shankar Koirala在其答案中提供的样本数据。 data1 = [ (1, "a"), (2, "b"), (3, "c") ] df1 = sqlCtx.createDataFrame(data1, ["id", "value"]) data2 = [ (1, "x"), (2, "y") ] df2 = sqlCtx.createDataFrame(data2, ["id", "value"])
左加入
使用id列上的左联接将两个DataFrame联接起来。这会将所有行保留在左侧的DataFrame中。对于右侧DataFrame中没有匹配项的行id,其值为null。
import pyspark.sql.functions as f df1.alias('l').join(df2.alias('r'), on='id', how='left')\ .select( 'id', f.col('l.value').alias('left_value'), f.col('r.value').alias('right_value') )\ .show() #+---+----------+-----------+ #| id|left_value|right_value| #+---+----------+-----------+ #| 1| a| x| #| 3| c| null| #| 2| b| y| #+---+----------+-----------+
选择所需的数据
我们将利用不匹配的idsnull选择最后一列的事实。使用pyspark.sql.functions.when()使用权价值,如果它不为空,否则保持左值。
df1.alias('l').join(df2.alias('r'), on='id', how='left')\ .select( 'id', f.when( ~f.isnull(f.col('r.value')), f.col('r.value') ).otherwise(f.col('l.value')).alias('value') )\ .show() #+---+-----+ #| id|value| #+---+-----+ #| 1| x| #| 3| c| #| 2| y| #+---+-----+
您可以id按顺序排序此输出。
使用pyspark-sql
您可以使用pyspark-sql查询执行相同的操作:
df1.registerTempTable('df1') df2.registerTempTable('df2') query = """SELECT l.id, CASE WHEN r.value IS NOT NULL THEN r.value ELSE l.value END AS value FROM df1 l LEFT JOIN df2 r ON l.id = r.id""" sqlCtx.sql(query.replace("\n", "")).show() #+---+-----+ #| id|value| #+---+-----+ #| 1| x| #| 3| c| #| 2| y| #+---+-----+