我有一组文件。文件的路径保存在文件中,例如all_files.txt。使用apache spark,我需要对所有文件进行操作并合并结果。
all_files.txt
我要执行的步骤是:
这是我为此写的代码:
def return_contents_from_file (file_name): return spark.read.text(file_name).rdd.map(lambda r: r[0]) def run_spark(): file_name = 'path_to_file' spark = SparkSession \ .builder \ .appName("PythonWordCount") \ .getOrCreate() counts = spark.read.text(file_name).rdd.map(lambda r: r[0]) \ # this line is supposed to return the paths to each file .flatMap(return_contents_from_file) \ # here i am expecting to club all the contents of all files .flatMap(do_operation_on_each_line_of_all_files) # here i am expecting do an operation on each line of all files
这引发了错误:
第323行,位于get_return_value py4j.protocol.Py4JError中:调用o25时发生错误。 getnewargs 。跟踪:py4j.Py4JException:方法 py4j.reflection.ReflectionEngine.getMethod (ReflectionEngine.java:318)的py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)处的方法 getnewargs ([])不存在。在py4j.GatewayConnection.run(GatewayConnection.java:214)在py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)在py4j.commands.CallCommand.execute(CallCommand.java:79)处调用(Gateway.java:272) )在java.lang.Thread.run(Thread.java:745)
有人可以告诉我我做错了什么,我应该怎么做。提前致谢。
不允许使用spark内部执行程序flatMap或在执行程序上发生的任何转换(spark会话仅在驱动程序上可用)。也不能创建RDD的RDD(请参阅:是否可以在Apache Spark中创建嵌套的RDD?)
spark
flatMap
但是你可以实现以另一种方式这一转变-读的所有内容all_files.txt到数据帧,使用 当地 map使他们dataframes和 当地 reduce工会所有,见例如:
map
reduce
>>> filenames = spark.read.text('all_files.txt').collect() >>> dataframes = map(lambda r: spark.read.text(r[0]), filenames) >>> all_lines_df = reduce(lambda df1, df2: df1.unionAll(df2), dataframes)