我想 在Databricks中将转换org.apache.spark.sql.DataFrame为org.apache.spark.rdd.RDD[(String, String)]。有人可以帮忙吗?
org.apache.spark.sql.DataFrame
org.apache.spark.rdd.RDD[(String, String)]
背景 (也欢迎一个更好的解决方案):我有一个Kafka流,经过一些步骤后,该流变成了2列数据帧。我想将其放入Redis缓存中,第一列作为键,第二列作为值。
更具体地说 ,输入的类型是:lastContacts: org.apache.spark.sql.DataFrame = [serialNumber: string, lastModified: bigint]。我尝试放入Redis,如下所示:
lastContacts: org.apache.spark.sql.DataFrame = [serialNumber: string, lastModified: bigint]
sc.toRedisKV(lastContacts)(redisConfig)
错误消息如下所示:
notebook:20: error: type mismatch; found : org.apache.spark.sql.DataFrame (which expands to) org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] required: org.apache.spark.rdd.RDD[(String, String)] sc.toRedisKV(lastContacts)(redisConfig)
我已经玩过一些想法(例如function .rdd),但是没有一个帮助。
.rdd
如果要将行映射到其他RDD元素,可以使用df.map(row => …)将数据帧转换为RDD。
例如:
val df = Seq(("table1",432), ("table2",567), ("table3",987), ("table1",789)). toDF("tablename", "Code").toDF() df.show() +---------+----+ |tablename|Code| +---------+----+ | table1| 432| | table2| 567| | table3| 987| | table1| 789| +---------+----+ val rddDf = df.map(r => (r(0), r(1))).rdd // Type:RDD[(Any,Any)] OR val rdd = df.map(r => (r(0).toString, r(1).toString)).rdd //Type: RDD[(String,String)]
有关 AnalysisException, 请参阅https://community.hortonworks.com/questions/106500/error-in-spark- streaming-kafka-integration-structu.html :必须使用writeStream.start()执行带有流源的查询
您需要等待使用查询终止查询。 awaitTermination() 防止查询活动时退出进程。