我有两个Spark数据框:
数据框A:
|col_1 | col_2 | ... | col_n | |val_1 | val_2 | ... | val_n |
和数据框B:
|col_1 | col_2 | ... | col_m | |val_1 | val_2 | ... | val_m |
数据框B可以包含来自数据框A的重复行,更新行和新行。我想在spark中编写操作,在其中可以创建一个新数据框,其中包含数据框A的行以及数据框B的更新行和新行。
我首先创建一个仅包含不可更新列的哈希列。这是唯一的ID。所以我们可以说col1,并col2可以改变值(可更新),但是col3,..,coln是唯一的。我创建了一个哈希函数为hash(col3,..,coln):
col1
col2
col3,..,coln
hash(col3,..,coln)
A=A.withColumn("hash", hash(*[col(colname) for colname in unique_cols_A])) B=B.withColumn("hash", hash(*[col(colname) for colname in unique_cols_B]))
现在,我想编写一些火花代码,基本上从B中选择哈希值不在A中的 行(因此,新行和更新后的行) ,并将它们与A中的行一起加入新的数据帧中。 pyspark?
编辑:数据框B可以有来自数据框A的额外列,因此无法进行联合。
样例
+-----+-----+ |col_1|col_2| +-----+-----+ | a| www| | b| eee| | c| rrr| +-----+-----+
数据框B:
+-----+-----+-----+ |col_1|col_2|col_3| +-----+-----+-----+ | a| wew| 1| | d| yyy| 2| | c| rer| 3| +-----+-----+-----+
结果:数据框C:
+-----+-----+-----+ |col_1|col_2|col_3| +-----+-----+-----+ | a| wew| 1| | b| eee| null| | c| rer| 3| | d| yyy| 2| +-----+-----+-----+
这与用新值更新数据框列密切相关,除了您还想添加数据框B中的行。一种方法是首先执行链接的问题中概述的操作,然后将结果与数据框B合并并删除重复。
例如:
dfA.alias('a').join(dfB.alias('b'), on=['col_1'], how='left')\ .select( 'col_1', f.when( ~f.isnull(f.col('b.col_2')), f.col('b.col_2') ).otherwise(f.col('a.col_2')).alias('col_2'), 'b.col_3' )\ .union(dfB)\ .dropDuplicates()\ .sort('col_1')\ .show() #+-----+-----+-----+ #|col_1|col_2|col_3| #+-----+-----+-----+ #| a| wew| 1| #| b| eee| null| #| c| rer| 3| #| d| yyy| 2| #+-----+-----+-----+
如果您有很多要替换的列并且不想对它们全部进行硬编码,则可以更一般地使用列表推导:
cols_to_update = ['col_2'] dfA.alias('a').join(dfB.alias('b'), on=['col_1'], how='left')\ .select( *[ ['col_1'] + [ f.when( ~f.isnull(f.col('b.{}'.format(c))), f.col('b.{}'.format(c)) ).otherwise(f.col('a.{}'.format(c))).alias(c) for c in cols_to_update ] + ['b.col_3'] ] )\ .union(dfB)\ .dropDuplicates()\ .sort('col_1')\ .show()