This question is directed at people familiar with py4j, who may be able to help resolve a pickling error. I am trying to add a method to the pyspark PythonMLLibAPI that accepts an RDD of a namedtuple, does some work, and returns a result in the form of an RDD.

This method is modeled after the PythonMLLibAPI.trainALSModel() method, whose analogous existing relevant portion is:
    def trainALSModel(ratingsJRDD: JavaRDD[Rating], .. )
The existing python Rating class that the new code is modeled on is:
    class Rating(namedtuple("Rating", ["user", "product", "rating"])):
        def __reduce__(self):
            return Rating, (int(self.user), int(self.product), float(self.rating))
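For context, the contract being relied on here: pickle calls __reduce__ when dumping and, on load, invokes the returned callable with the returned argument tuple. A minimal sketch (plain CPython, no Spark involved):

    import pickle
    from collections import namedtuple

    class Rating(namedtuple("Rating", ["user", "product", "rating"])):
        def __reduce__(self):
            return Rating, (int(self.user), int(self.product), float(self.rating))

    r = Rating(1, 2, 5.0)
    payload = pickle.dumps(r)     # records: call Rating(1, 2, 5.0) on load
    print(pickle.loads(payload))  # Rating(user=1, product=2, rating=5.0)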
Here is the attempt. These are the relevant classes:

New python class pyspark.mllib.clustering.MatrixEntry:
    from collections import namedtuple

    class MatrixEntry(namedtuple("MatrixEntry", ["x","y","weight"])):
        def __reduce__(self):
            return MatrixEntry, (long(self.x), long(self.y), float(self.weight))
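A side note: long() exists only in Python 2. Under Python 3, a sketch of the equivalent class would cast with int(), which is unbounded there:

    from collections import namedtuple

    # Python 3 variant (sketch): int replaces the removed long builtin.
    class MatrixEntry(namedtuple("MatrixEntry", ["x","y","weight"])):
        def __reduce__(self):
            return MatrixEntry, (int(self.x), int(self.y), float(self.weight))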
New method foobarRdd in PythonMLLibAPI:
    def foobarRdd(data: JavaRDD[MatrixEntry]): RDD[FooBarResult] = {
      val rdd = data.rdd.map { d =>
        FooBarResult(d.i, d.j, d.value, d.i * 100 + d.j * 10 + d.value)
      }
      rdd
    }
Now let's try it out:
    from pyspark.mllib.clustering import MatrixEntry

    def convert_to_MatrixEntry(tuple):
        return MatrixEntry(*tuple)

    from pyspark.mllib.clustering import *
    pic = PowerIterationClusteringModel(2)
    tups = [(1,2,3),(4,5,6),(12,13,14),(15,7,8),(16,17,16.5)]
    trdd = sc.parallelize(map(convert_to_MatrixEntry,tups))

    # print out the RDD on python side just for validation
    print "%s" %(repr(trdd.collect()))

    from pyspark.mllib.common import callMLlibFunc
    pic = callMLlibFunc("foobar", trdd)
Here is the relevant portion of the result:
    [(1,2)=3.0, (4,5)=6.0, (12,13)=14.0, (15,7)=8.0, (16,17)=16.5]
This shows that the input RDD arrived "whole". However, the pickling is unhappy:
    5/04/27 21:15:44 ERROR Executor: Exception in task 6.0 in stage 1.0 (TID 14)
    net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for pyspark.mllib.clustering.MatrixEntry)
        at net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
        at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:617)
        at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:170)
        at net.razorvine.pickle.Unpickler.load(Unpickler.java:84)
        at net.razorvine.pickle.Unpickler.loads(Unpickler.java:97)
        at org.apache.spark.mllib.api.python.SerDe$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(PythonMLLibAPI.scala:1167)
        at org.apache.spark.mllib.api.python.SerDe$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(PythonMLLibAPI.scala:1166)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
        at scala.collection.AbstractIterator.to(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
        at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
        at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
        at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:819)
        at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:819)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1523)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1523)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
        at org.apache.spark.scheduler.Task.run(Task.scala:64)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:212)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:724)
Below is a visual of the python call stack trace: (screenshot not reproduced here)
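Note that the trace fails inside Unpickler.load_reduce, i.e. while the JVM-side unpickler (net.razorvine.pickle) replays the REDUCE opcode emitted by MatrixEntry.__reduce__. A quick sketch for localizing the problem: the same bytes round-trip fine through CPython's own pickle on the driver, and pickletools shows the opcode in question:

    import pickle, pickletools
    from pyspark.mllib.clustering import MatrixEntry

    e = MatrixEntry(1, 2, 3.0)
    payload = pickle.dumps(e)
    assert pickle.loads(payload) == e   # CPython is happy with the round-trip
    pickletools.dis(payload)            # shows GLOBAL ... MatrixEntry, its args, then REDUCE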
I hit the same error using MLlib, and it turned out that I was returning an incorrect datatype in one of my functions. It now works after a simple cast on the returned value. This may not be the answer you are looking for, but it is at least a hint for the direction to follow.
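To make that hint concrete, here is a hypothetical sketch (the helper names are invented, not from the answer) of the kind of datatype slip and cast meant above:

    import numpy as np

    # Hypothetical mapper that leaks a numpy scalar into the RDD; the
    # JVM-side unpickler may reject it where a plain float is expected.
    def weight_of(row):
        return np.float64(row[2])

    # The "simple cast" fix: return a plain Python float instead.
    def weight_of_fixed(row):
        return float(row[2])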