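I saw in this DataBricks post that Spark SQL supports window functions, and in particular I'm trying to use the lag() window function.
I have rows of credit card transactions, and I've sorted them. Now I want to go through the rows and, for each row, display the transaction amount along with the difference between the current row's amount and the preceding row's amount.
Following the DataBricks post, I came up with this query, but it throws an exception and I can't understand why.
This is in PySpark. tx is my dataframe, already registered as a temp table.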
    test = sqlContext.sql("SELECT tx.cc_num,tx.trans_date,tx.trans_time,tx.amt, (lag(tx.amt) OVER (PARTITION BY tx.cc_num ORDER BY tx.trans_date,tx.trans_time ROW BETWEEN PRECEDING AND CURRENT ROW)) as prev_amt from tx")
And the exception (truncated):
    py4j.protocol.Py4JJavaError: An error occurred while calling o76.sql.
    : java.lang.RuntimeException: [1.67] failure: ``)'' expected but identifier OVER found
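I'd really appreciate any insight. This functionality is fairly new, and there isn't much to go on in terms of existing examples or other related posts.
Edit
I've also tried doing this without a SQL statement, as below, but I keep getting an error. I've tried it with both HiveContext and SQLContext and receive the same error.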
    windowSpec = \
        Window \
            .partitionBy(h_tx_df_ordered['cc_num']) \
            .orderBy(h_tx_df_ordered['cc_num'],h_tx_df_ordered['trans_date'],h_tx_df_ordered['trans_time'])

    windowSpec.rowsBetween(-1, 0)

    lag_amt = \
        (lag(h_tx_df_ordered['amt']).over(windowSpec) - h_tx_df_ordered['amt'])

    tx_df_ordered.select(
        h_tx_df_ordered['cc_num'],
        h_tx_df_ordered['trans_date'],
        h_tx_df_ordered['trans_time'],
        h_tx_df_ordered['amt'],
        lag_amt.alias("prev_amt")).show()
    Traceback (most recent call last):
      File "rdd_raw_data.py", line 116, in <module>
        lag_amt.alias("prev_amt")).show()
      File "/opt/spark/python/pyspark/sql/dataframe.py", line 721, in select
        jdf = self._jdf.select(self._jcols(*cols))
      File "/home/brandon/anaconda/lib/python2.7/site-packages/py4j/java_gateway.py", line 813, in __call__
        answer, self.gateway_client, self.target_id, self.name)
      File "/home/brandon/anaconda/lib/python2.7/site-packages/py4j/protocol.py", line 308, in get_return_value
        format(target_id, ".", name), value)
    py4j.protocol.Py4JJavaError: An error occurred while calling o152.select.
    : org.apache.spark.sql.AnalysisException: Could not resolve window function 'lag'. Note that, using window functions currently requires a HiveContext;
        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38)
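The frame specification should start with the keyword ROWS, not ROW.
The frame specification also requires either a lower bound value:
    ROWS BETWEEN 1 PRECEDING AND CURRENT ROW
or the UNBOUNDED keyword:
    ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
The LAG function does not accept a frame at all, so a correct SQL query with LAG can look like this: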
    SELECT tx.cc_num, tx.trans_date, tx.trans_time, tx.amt,
           LAG(tx.amt) OVER (
               PARTITION BY tx.cc_num ORDER BY tx.trans_date, tx.trans_time
           ) AS prev_amt
    FROM tx
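To see what a frame-less LAG query of this shape returns, here is a minimal sketch that runs it against an in-memory SQLite database through Python's standard sqlite3 module. SQLite is only a stand-in engine here, not Spark, and it needs SQLite 3.25 or newer for window functions. The sample rows and the extra amt_diff column are assumptions made for illustration; only the table and column names come from the question.

```python
import sqlite3

# Hypothetical sample transactions; column names follow the question's tx table.
rows = [
    ("1111", "2015-07-01", "09:00:00", 10.0),
    ("1111", "2015-07-01", "12:30:00", 25.0),
    ("1111", "2015-07-02", "08:15:00", 5.0),
    ("2222", "2015-07-01", "10:00:00", 40.0),
    ("2222", "2015-07-03", "11:45:00", 15.0),
]

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE tx (cc_num TEXT, trans_date TEXT, trans_time TEXT, amt REAL)"
)
conn.executemany("INSERT INTO tx VALUES (?, ?, ?, ?)", rows)

# LAG with PARTITION BY and ORDER BY only, no ROWS frame.
# amt_diff is the current amount minus the previous amount on the same card.
query = """
SELECT tx.cc_num, tx.trans_date, tx.trans_time, tx.amt,
       LAG(tx.amt) OVER (
           PARTITION BY tx.cc_num ORDER BY tx.trans_date, tx.trans_time
       ) AS prev_amt,
       tx.amt - LAG(tx.amt) OVER (
           PARTITION BY tx.cc_num ORDER BY tx.trans_date, tx.trans_time
       ) AS amt_diff
FROM tx
ORDER BY tx.cc_num, tx.trans_date, tx.trans_time
"""
result = conn.execute(query).fetchall()

for row in result:
    print(row)
```

The first transaction of each card has no preceding row, so prev_amt and amt_diff come back as NULL (None in Python) for it.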
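Edit:
Regarding the SQL DSL usage:
As the error message says: "Note that, using window functions currently requires a HiveContext". Be sure to initialize sqlContext using HiveContext, not SQLContext.
The call windowSpec.rowsBetween(-1, 0) should be dropped as well, since once again a frame specification is not supported by the lag function.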
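Putting the DSL points together, a version without the frame might look like the sketch below. The helper name with_prev_amt and the amt_diff column are hypothetical, introduced only for illustration; the column names are taken from the question. It assumes the DataFrame passed in was created through a HiveContext, as the error message requires on this Spark version.

```python
def with_prev_amt(df):
    """Sketch: add prev_amt and amt_diff columns to a transactions DataFrame.

    Hypothetical helper; assumes df has cc_num, trans_date, trans_time and amt
    columns and, on Spark 1.x, was created through a HiveContext.
    """
    # Imported inside the function so this file can be loaded on a machine
    # where Spark is not installed.
    from pyspark.sql.functions import col, lag
    from pyspark.sql.window import Window

    # Partition and order only. No rowsBetween(): lag takes an offset
    # argument rather than a frame.
    window_spec = Window.partitionBy("cc_num").orderBy("trans_date", "trans_time")

    # Amount of the preceding transaction on the same card (null for the first).
    prev_amt = lag(col("amt"), 1).over(window_spec)

    return (df
            .withColumn("prev_amt", prev_amt)
            .withColumn("amt_diff", col("amt") - prev_amt))
```

With the question's data this would be called as with_prev_amt(h_tx_df_ordered).show(). Note also that rowsBetween returns a new window specification rather than modifying windowSpec in place, so a bare windowSpec.rowsBetween(-1, 0) statement has no effect on the later over(windowSpec) call.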