我在pyspark中使用sqlContext.sql函数读取了一个数据框。它包含4个数字列,每个客户都有信息(这是键ID)。我需要计算每个客户端的最大值并将此值加入数据框:
+--------+-------+-------+-------+-------+ |ClientId|m_ant21|m_ant22|m_ant23|m_ant24| +--------+-------+-------+-------+-------+ | 0| null| null| null| null| | 1| null| null| null| null| | 2| null| null| null| null| | 3| null| null| null| null| | 4| null| null| null| null| | 5| null| null| null| null| | 6| 23| 13| 17| 8| | 7| null| null| null| null| | 8| null| null| null| null| | 9| null| null| null| null| | 10| 34| 2| 4| 0| | 11| 0| 0| 0| 0| | 12| 0| 0| 0| 0| | 13| 0| 0| 30| 0| | 14| null| null| null| null| | 15| null| null| null| null| | 16| 37| 29| 29| 29| | 17| 0| 0| 16| 0| | 18| 0| 0| 0| 0| | 19| null| null| null| null| +--------+-------+-------+-------+-------+
在这种情况下,客户端“ six”的最大值为23,而客户端“ ten”的最大值为30。“ null”在新列中自然为null。
请帮助我显示如何执行此操作。
我认为将值组合到列表中而不是找到最大值将是最简单的方法。
from pyspark.sql.types import * schema = StructType([ StructField("ClientId", IntegerType(), True), StructField("m_ant21", IntegerType(), True), StructField("m_ant22", IntegerType(), True), StructField("m_ant23", IntegerType(), True), StructField("m_ant24", IntegerType(), True) ]) df = spark\ .createDataFrame( data=[(0, None, None, None, None), (1, 23, 13, 17, 99), (2, 0, 0, 0, 1), (3, 0, None, 1, 0)], schema=schema) import pyspark.sql.functions as F def agg_to_list(m21,m22,m23,m24): return [m21,m22,m23,m24] u_agg_to_list = F.udf(agg_to_list, ArrayType(IntegerType())) df2 = df.withColumn('all_values', u_agg_to_list('m_ant21', 'm_ant22', 'm_ant23', 'm_ant24'))\ .withColumn('max', F.sort_array("all_values", False)[0])\ .select('ClientId', 'max') df2.show()
输出:
+--------+----+ |ClientId|max | +--------+----+ |0 |null| |1 |99 | |2 |1 | |3 |1 | +--------+----+