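I'm sure this is a simple SQLContext question, but I can't find an answer anywhere in the Spark docs.
I want to create a Spark DataFrame from a SQL query run against MySQL.
For example, I have a complex MySQL query like: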
```sql
SELECT a.X, b.Y, c.Z FROM FOO AS a JOIN BAR AS b ON ... JOIN ZOT AS c ON ... WHERE ...
```
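I want a DataFrame with the columns X, Y and Z.
I figured out how to load entire tables into Spark: I could load them all and then do the joins and selects there. However, that is very inefficient. I only want to load the result set produced by the SQL query.
Here is an approximation of my current code, which doesn't work. Spark's JDBC data source has a `dbtable` option that loads an entire table; I'm hoping there is some way to specify a query instead.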
```scala
// Does not work: DataFrameReader has no sql(...) method for passing a query
val df = sqlContext.read.format("jdbc").
  option("url", "jdbc:mysql://localhost:3306/local_content").
  option("driver", "com.mysql.jdbc.Driver").
  option("useUnicode", "true").
  option("continueBatchOnError", "true").
  option("useSSL", "false").
  option("user", "root").
  option("password", "").
  sql("""
    select dl.DialogLineID, dlwim.Sequence, wi.WordRootID
    from Dialog as d
    join DialogLine as dl on dl.DialogID = d.DialogID
    join DialogLineWordInstanceMatch as dlwim on dlwim.DialogLineID = dl.DialogLineID
    join WordInstance as wi on wi.WordInstanceID = dlwim.WordInstanceID
    join WordRoot as wr on wr.WordRootID = wi.WordRootID
    where d.InSite = 1 and dl.Active = 1
    limit 100
  """).
  load()
```
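I found the answer here: Bulk data migration through Spark SQL. The `dbtable` parameter can be any query wrapped in parentheses and given an alias. So in my case, I need to do this: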
```scala
// The query is wrapped in parentheses and aliased ("foo") so it can stand in for a table name
val query = """
  (select dl.DialogLineID, dlwim.Sequence, wi.WordRootID
   from Dialog as d
   join DialogLine as dl on dl.DialogID = d.DialogID
   join DialogLineWordInstanceMatch as dlwim on dlwim.DialogLineID = dl.DialogLineID
   join WordInstance as wi on wi.WordInstanceID = dlwim.WordInstanceID
   join WordRoot as wr on wr.WordRootID = wi.WordRootID
   where d.InSite = 1 and dl.Active = 1
   limit 100) foo
"""

// Spark puts the dbtable value in the FROM clause of the SELECT it sends to MySQL,
// so the joins run in MySQL and Spark only receives the result rows
val df = sqlContext.read.format("jdbc").
  option("url", "jdbc:mysql://localhost:3306/local_content").
  option("driver", "com.mysql.jdbc.Driver").
  option("useUnicode", "true").
  option("continueBatchOnError", "true").
  option("useSSL", "false").
  option("user", "root").
  option("password", "").
  option("dbtable", query).
  load()
```
As expected, loading each table as its own DataFrame and joining them in Spark was very inefficient.
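The parentheses-plus-alias rule for `dbtable` is easy to get wrong when queries are built by hand (a missing alias or a trailing `;` makes MySQL reject the derived table). A small helper can enforce it. This is a minimal sketch: `JdbcSubquery` and `wrap` are hypothetical names, not part of Spark, and the alias check only allows simple unquoted identifiers. Note that on Spark 2.4 and later the JDBC source also accepts a `query` option that does this wrapping for you (it cannot be combined with `dbtable`), so the helper mainly matters when you are limited to `dbtable`.

```scala
// Hypothetical helper (not part of Spark): turns a plain SELECT into the
// "(subquery) alias" form that the JDBC `dbtable` option accepts.
object JdbcSubquery {
  // Assumption: only simple, unquoted identifiers are allowed as aliases.
  private val AliasPattern = "[A-Za-z_][A-Za-z0-9_]*"

  def wrap(sql: String, alias: String): String = {
    require(alias.matches(AliasPattern), s"invalid alias: $alias")
    // Strip surrounding whitespace and trailing semicolons; a ';' inside
    // a derived table is a syntax error in MySQL.
    val body = sql.trim.replaceAll("[;\\s]+$", "")
    require(body.nonEmpty, "query must not be empty")
    s"($body) $alias"
  }
}
```

With it, the reader call becomes `option("dbtable", JdbcSubquery.wrap(sql, "foo"))`, where `sql` is the unwrapped query text.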