我有一个具有以下结构的Spark数据框。bodyText_token具有标记(已处理/单词集)。而且我有一个嵌套的已定义关键字列表
root |-- id: string (nullable = true) |-- body: string (nullable = true) |-- bodyText_token: array (nullable = true) keyword_list=[['union','workers','strike','pay','rally','free','immigration',], ['farmer','plants','fruits','workers'],['outside','field','party','clothes','fashions']]
我需要检查每个关键字列表下有多少标记,并将结果添加为现有数据框的新列。例如:如果tokens =["become", "farmer","rally","workers","student"] 结果为->[1,2,0]
tokens =["become", "farmer","rally","workers","student"]
[1,2,0]
以下功能按预期工作。
def label_maker_topic(tokens,topic_words): twt_list = [] for i in range(0, len(topic_words)): count = 0 #print(topic_words[i]) for tkn in tokens: if tkn in topic_words[i]: count += 1 twt_list.append(count) return twt_list
我在下面使用udfwithColumn访问该函数,但出现错误。我认为这与将外部列表传递给udf有关。有没有一种方法可以将外部列表和dataframe列传递给udf,并向dataframe中添加新列?
withColumn
topicWord = udf(label_maker_topic,StringType()) myDF=myDF.withColumn("topic_word_count",topicWord(myDF.bodyText_token,keyword_list))
最干净的解决方案是使用闭包传递其他参数:
def make_topic_word(topic_words): return udf(lambda c: label_maker_topic(c, topic_words)) df = sc.parallelize([(["union"], )]).toDF(["tokens"]) (df.withColumn("topics", make_topic_word(keyword_list)(col("tokens"))) .show())
这不需要更改keyword_list或使用UDF包装功能。您也可以使用此方法传递任意对象。例如,这可以用于传递sets有效查找的列表。
keyword_list
sets
如果要使用当前的UDF并topic_words直接传递,则必须先将其转换为列文字:
topic_words
from pyspark.sql.functions import array, lit ks_lit = array(*[array(*[lit(k) for k in ks]) for ks in keyword_list]) df.withColumn("ad", topicWord(col("tokens"), ks_lit)).show()
根据您的数据和要求,可以选择其他更有效的解决方案,这些解决方案不需要UDF(爆炸+聚合+折叠)或查找(散列+向量运算)。