我想用类似SQL的IN子句过滤Pyspark DataFrame ,如
IN
sc = SparkContext() sqlc = SQLContext(sc) df = sqlc.sql('SELECT * from my_df WHERE field1 IN a')
a元组在哪儿(1, 2, 3)?我收到此错误:
a
(1, 2, 3)
java.lang.RuntimeException:[1.67]错误:预期“(”,但找到了标识符
这基本上是说它期待的是 ‘(1,2,3)’ 而不是a。问题是由于从另一个作业中提取了值,因此我无法手动将其写入。
在这种情况下,我该如何过滤?
您传递给SQLContext它的字符串是在SQL环境范围内评估的。它没有捕获闭包。如果要传递变量,则必须使用字符串格式显式地进行操作:
SQLContext
df = sc.parallelize([(1, "foo"), (2, "x"), (3, "bar")]).toDF(("k", "v")) df.registerTempTable("df") sqlContext.sql("SELECT * FROM df WHERE v IN {0}".format(("foo", "bar"))).count() ## 2
显然,出于安全考虑,这不是您在“实际” SQL环境中要使用的东西,但在这里并不重要。
在实践中,DataFrame当您要创建动态查询时,DSL是很多选择:
DataFrame
from pyspark.sql.functions import col df.where(col("v").isin({"foo", "bar"})).count() ## 2
它很容易为您构建,组成和处理HiveQL / Spark SQL的所有细节。