小编典典

如何在Spark 2 Scala中将Row转换为json

json

有一种简单的方法可以将给定的Row对象转换为json吗?

但是我只想将一个Row转换为json。这是我要尝试执行的伪代码。

更准确地说,我正在读取json作为Dataframe中的输入。我正在产生主要基于列的新输出,但对于所有不适合列的信息都具有一个json字段。

我的问题是编写此函数的最简单方法是什么:convertRowToJson()

def convertRowToJson(row: Row): String = ???

def transformVenueTry(row: Row): Try[Venue] = {
  Try({
    val name = row.getString(row.fieldIndex("name"))
    val metadataRow = row.getStruct(row.fieldIndex("meta"))
    val score: Double = calcScore(row)
    val combinedRow: Row = metadataRow ++ ("score" -> score)
    val jsonString: String = convertRowToJson(combinedRow)
    Venue(name = name, json = jsonString)
  })
}

Psidom解决方案:

def convertRowToJSON(row: Row): String = {
    val m = row.getValuesMap(row.schema.fieldNames)
    JSONObject(m).toString()
}

仅当行只有一个级别而不具有嵌套行时才起作用。这是模式:

StructType(
    StructField(indicator,StringType,true),   
    StructField(range,
    StructType(
        StructField(currency_code,StringType,true),
        StructField(maxrate,LongType,true), 
        StructField(minrate,LongType,true)),true))

还尝试了Artem建议,但未编译:

def row2DataFrame(row: Row, sqlContext: SQLContext): DataFrame = {
  val sparkContext = sqlContext.sparkContext
  import sparkContext._
  import sqlContext.implicits._
  import sqlContext._
  val rowRDD: RDD[Row] = sqlContext.sparkContext.makeRDD(row :: Nil)
  val dataFrame = rowRDD.toDF() //XXX does not compile
  dataFrame
}

阅读 905

收藏
2020-07-27

共1个答案

小编典典

我需要阅读json输入并产生json输出。大多数字段是单独处理的,但是只需保留几个json子对象。

当Spark读取数据帧时,它将记录转换为行。行是一个类似于json的结构。可以将其转换并写到json中。

但是我需要将一些子json结构分解为字符串以用作新字段。

可以这样完成:

dataFrameWithJsonField = dataFrame.withColumn("address_json", to_json($"location.address"))

location.address是传入的基于json的数据框的子json对象的路径。address_json是该对象的列名,该列名转换为json的字符串版本。

to_json 在Spark 2.1中实现。

如果使用json4s生成输出json,则应将address_json解析为AST表示,否则输出json将转义address_json部分。

2020-07-27