小编典典

从SQL查询创建Spark数据框

sql

我确定这是一个简单的SQLContext问题,但是在Spark文档中找不到任何答案

我想从MySQL上的SQL查询创建Spark数据框

例如,我有一个复杂的MySQL查询,例如

SELECT a.X,b.Y,c.Z FROM FOO as a JOIN BAR as b ON ... JOIN ZOT as c ON ... WHERE ...

我想要一个带有X,Y和Z列的数据框

我弄清楚了如何将整个表加载到Spark中,然后可以将它们全部加载,然后在那里进行连接和选择。但是,这是非常低效的。我只想加载由SQL查询生成的表。

这是我当前的代码近似值,不起作用。Mysql-connector有一个选项“ dbtable”,可用于加载整个表。我希望有一些方法可以指定查询

  val df = sqlContext.format("jdbc").
    option("url", "jdbc:mysql://localhost:3306/local_content").
    option("driver", "com.mysql.jdbc.Driver").
    option("useUnicode", "true").
    option("continueBatchOnError","true").
    option("useSSL", "false").
    option("user", "root").
    option("password", "").
    sql(
"""
select dl.DialogLineID, dlwim.Sequence, wi.WordRootID from Dialog as d
join DialogLine as dl on dl.DialogID=d.DialogID
join DialogLineWordInstanceMatch as dlwim o n dlwim.DialogLineID=dl.DialogLineID
join WordInstance as wi on wi.WordInstanceID=dlwim.WordInstanceID
join WordRoot as wr on wr.WordRootID=wi.WordRootID
where d.InSite=1 and dl.Active=1
limit 100
"""
    ).load()

阅读 196

收藏
2021-03-17

共1个答案

小编典典

我在这里通过Spark SQL进行批量数据迁移dbname参数可以是括号中带有别名的任何查询。因此,就我而言,我需要这样做:

val query = """
  (select dl.DialogLineID, dlwim.Sequence, wi.WordRootID from Dialog as d
    join DialogLine as dl on dl.DialogID=d.DialogID
    join DialogLineWordInstanceMatch as dlwim on dlwim.DialogLineID=dl.DialogLineID
    join WordInstance as wi on wi.WordInstanceID=dlwim.WordInstanceID
    join WordRoot as wr on wr.WordRootID=wi.WordRootID
    where d.InSite=1 and dl.Active=1
    limit 100) foo
"""

val df = sqlContext.format("jdbc").
  option("url", "jdbc:mysql://localhost:3306/local_content").
  option("driver", "com.mysql.jdbc.Driver").
  option("useUnicode", "true").
  option("continueBatchOnError","true").
  option("useSSL", "false").
  option("user", "root").
  option("password", "").
  option("dbtable",query).
  load()

不出所料,将每个表作为自己的数据框加载并将其加入Spark效率很低。

2021-03-17