The Spark documentation makes it reasonably clear how to create a Parquet file from an RDD of your own case classes (from the docs):
val people: RDD[Person] = ??? // An RDD of case class objects, from the previous example.

// The RDD is implicitly converted to a SchemaRDD by createSchemaRDD,
// allowing it to be stored using Parquet.
people.saveAsParquetFile("people.parquet")
But it is not clear how to convert back. What we really want is a readParquetFile method that can do this:

val people: RDD[Person] = sc.readParquetFile[Person](path)
where the fields defined on the case class determine which values the method reads.
The best solution I've come up with requires a minimal amount of copy-and-paste for new classes, as follows (I'd still like to see another solution, though).
First, you have to define your case class, and a (partially) reusable factory method:
import org.apache.spark.sql.catalyst.expressions

case class MyClass(fooBar: Long, fred: Long)

// Here you want to auto gen these functions using macros or something
object Factories extends java.io.Serializable {
  def longLong[T](fac: (Long, Long) => T)(row: expressions.Row): T =
    fac(row(0).asInstanceOf[Long], row(1).asInstanceOf[Long])
}
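To see what the factory is doing without spinning up Spark, here is a minimal sketch in which a plain IndexedSeq[Any] stands in for Spark's Row (a stand-in chosen for illustration only; in this old API a Row is essentially positional access to untyped values):

```scala
// Stand-in for expressions.Row: positional, untyped access (illustration only).
type StubRow = IndexedSeq[Any]

case class MyClass(fooBar: Long, fred: Long)

// Same shape as Factories.longLong, but written against the stub row type:
// pull out the two Long columns and feed them to the case class constructor.
def longLong[T](fac: (Long, Long) => T)(row: StubRow): T =
  fac(row(0).asInstanceOf[Long], row(1).asInstanceOf[Long])

val row: StubRow = IndexedSeq(1L, 2L)
val mc = longLong(MyClass.apply)(row) // MyClass(1, 2)
```

The curried second parameter list is what lets you partially apply the factory (Factories.longLong[MyClass](MyClass.apply)) and pass the result around as a Row => MyClass function.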
Some boilerplate which will already be available:
import scala.reflect.runtime.universe._

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.createSchemaRDD
The magic:
import scala.reflect.ClassTag
import org.apache.spark.sql.SchemaRDD

def camelToUnderscores(name: String) =
  "[A-Z]".r.replaceAllIn(name, "_" + _.group(0).toLowerCase())

def getCaseMethods[T: TypeTag]: List[String] = typeOf[T].members.sorted.collect {
  case m: MethodSymbol if m.isCaseAccessor => m
}.toList.map(_.toString)

def caseClassToSQLCols[T: TypeTag]: List[String] =
  getCaseMethods[T].map(_.split(" ")(1)).map(camelToUnderscores)

def schemaRDDToRDD[T: TypeTag: ClassTag](schemaRDD: SchemaRDD, fac: expressions.Row => T) = {
  val tmpName = "tmpTableName" // Maybe should use a random string
  schemaRDD.registerAsTable(tmpName)
  sqlContext.sql("SELECT " + caseClassToSQLCols[T].mkString(", ") + " FROM " + tmpName)
    .map(fac)
}
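As a sanity check on the column naming, camelToUnderscores is pure Scala and can be exercised on its own, no Spark required:

```scala
// camelToUnderscores, copied from above: rewrites camelCase accessor names
// into the underscore-separated column names used in the generated SELECT.
def camelToUnderscores(name: String) =
  "[A-Z]".r.replaceAllIn(name, "_" + _.group(0).toLowerCase())

println(camelToUnderscores("fooBar")) // foo_bar
println(camelToUnderscores("fred"))   // fred
```

Note one quirk: a name starting with an uppercase letter ("FooBar") comes out with a leading underscore ("_foo_bar"), so this scheme assumes the usual lowerCamelCase field names of a case class.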
Example use:
val parquetFile = sqlContext.parquetFile(path)

val normalRDD: RDD[MyClass] =
  schemaRDDToRDD[MyClass](parquetFile, Factories.longLong[MyClass](MyClass.apply))
See also:
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-Convert-SchemaRDD-back-to-RDD-td9071.html
Though I failed to find any example or documentation by following the JIRA link.