I'm trying to run a simple SQL query over S3 events with Spark. I'm loading about 30GB of JSON files like this:
val d2 = spark.read.json("s3n://myData/2017/02/01/1234")
d2.persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK)
d2.registerTempTable("d2")
Then I try to write the query result to a file:
val users_count = sql("select count(distinct data.user_id) from d2")
users_count.write.format("com.databricks.spark.csv").option("header", "true").save("s3n://myfolder/UsersCount.csv")
But Spark throws the following exception:
java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
    at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:869)
    at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:103)
    at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:91)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1287)
    at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:105)
    at org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:439)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:672)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
    at org.apache.spark.scheduler.Task.run(Task.scala:85)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Note that the same query works fine on smaller amounts of data. What is the problem here?
No Spark shuffle block can be larger than 2GB (Integer.MAX_VALUE bytes), so you need more / smaller partitions.
You should tune spark.default.parallelism and spark.sql.shuffle.partitions (the default is 200) so that the number of partitions fits your data without hitting the 2GB limit (you could aim for roughly 256MB per partition, so 200GB of data would give you about 800 partitions). Thousands of partitions are very common, so don't be afraid to repartition to 1000 as suggested below.
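As a minimal sketch (assuming a Spark 2.x SparkSession; the app name and the 800/1000 figures are just the ballpark values discussed above, not tuned settings), you can either set the two knobs when building the session or repartition the DataFrame explicitly:

import org.apache.spark.sql.SparkSession

// Set both partition counts up front (these can also be passed via --conf on spark-submit).
val spark = SparkSession.builder()
  .appName("UsersCount")
  .config("spark.sql.shuffle.partitions", "800")  // partitions used for SQL shuffles (default 200)
  .config("spark.default.parallelism", "800")     // default partitions for RDD shuffles
  .getOrCreate()

// Or repartition the loaded DataFrame explicitly before the heavy aggregation:
val d2 = spark.read.json("s3n://myData/2017/02/01/1234").repartition(1000)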
FYI, you can check the number of partitions of an RDD with something like rdd.getNumPartitions (i.e. d2.rdd.getNumPartitions).
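For example (reusing the d2 DataFrame from the question; the 1000 is the same ballpark figure as above):

// How many partitions did the JSON load produce?
println(d2.rdd.getNumPartitions)

// If that is too few for ~30GB, repartition and re-register the table before querying.
// createOrReplaceTempView is the non-deprecated equivalent of registerTempTable in Spark 2.x.
val d2Repart = d2.repartition(1000)
d2Repart.createOrReplaceTempView("d2")
println(d2Repart.rdd.getNumPartitions)  // now reports 1000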
There is a JIRA ticket tracking the effort to address the various 2GB limits (it has been open for a while now): https://issues.apache.org/jira/browse/SPARK-6235