小编典典

DataFrame到RDD [(String,String)]转换

redis

我想
在Databricks中将转换org.apache.spark.sql.DataFrameorg.apache.spark.rdd.RDD[(String, String)]
有人可以帮忙吗?

背景
(也欢迎一个更好的解决方案):我有一个Kafka流,经过一些步骤后,该流变成了2列数据帧。我想将其放入Redis缓存中,第一列作为键,第二列作为值。

更具体地说 ,输入的类型是:lastContacts: org.apache.spark.sql.DataFrame = [serialNumber: string, lastModified: bigint]。我尝试放入Redis,如下所示:

sc.toRedisKV(lastContacts)(redisConfig)

错误消息如下所示:

notebook:20: error: type mismatch;
 found   : org.apache.spark.sql.DataFrame
    (which expands to)  org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]
 required: org.apache.spark.rdd.RDD[(String, String)]
sc.toRedisKV(lastContacts)(redisConfig)

我已经玩过一些想法(例如function .rdd),但是没有一个帮助。


阅读 514

收藏
2020-06-20

共1个答案

小编典典

如果要将行映射到其他RDD元素,可以使用df.map(row => …)将数据帧转换为RDD。

例如:

val df = Seq(("table1",432),
      ("table2",567),
      ("table3",987),
      ("table1",789)).
      toDF("tablename", "Code").toDF()

    df.show()

    +---------+----+
|tablename|Code|
+---------+----+
|   table1| 432|
|   table2| 567|
|   table3| 987|
|   table1| 789|
+---------+----+

    val rddDf = df.map(r => (r(0), r(1))).rdd // Type:RDD[(Any,Any)]

    OR

    val rdd = df.map(r => (r(0).toString, r(1).toString)).rdd  //Type: RDD[(String,String)]

有关 AnalysisException,
请参阅https://community.hortonworks.com/questions/106500/error-in-spark-
streaming-kafka-integration-structu.html
:必须使用writeStream.start()执行带有流源的查询

您需要等待使用查询终止查询。 awaitTermination() 防止查询活动时退出进程。

2020-06-20