public static void main(String[] args) throws Exception {
  SparkConf sparkConf = new SparkConf().setAppName(APP_NAME);
  final JavaSparkContext sc = new JavaSparkContext(sparkConf);

  // Example of implementing a progress reporter for a simple job.
  JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5), 5).map(
      new IdentityWithDelay<Integer>());
  JavaFutureAction<List<Integer>> jobFuture = rdd.collectAsync();
  while (!jobFuture.isDone()) {
    Thread.sleep(1000);  // 1 second
    List<Integer> jobIds = jobFuture.jobIds();
    if (jobIds.isEmpty()) {
      continue;
    }
    int currentJobId = jobIds.get(jobIds.size() - 1);
    SparkJobInfo jobInfo = sc.statusTracker().getJobInfo(currentJobId);
    SparkStageInfo stageInfo = sc.statusTracker().getStageInfo(jobInfo.stageIds()[0]);
    System.out.println(stageInfo.numTasks() + " tasks total: " + stageInfo.numActiveTasks()
        + " active, " + stageInfo.numCompletedTasks() + " complete");
  }

  System.out.println("Job results are: " + jobFuture.get());
  sc.stop();
}
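// A minimal sketch of the supporting pieces the main method above assumes but does
// not show: the imports, the APP_NAME constant, and the IdentityWithDelay function.
// The application name string and the 2-second sleep are illustrative assumptions.
// The imports belong at the top of the enclosing file; the constant and the helper
// class belong in the same class as main.
import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.SparkJobInfo;
import org.apache.spark.SparkStageInfo;
import org.apache.spark.api.java.JavaFutureAction;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

public static final String APP_NAME = "JavaStatusAPIDemo";  // assumed name

// Identity mapper that sleeps before returning its input, so the asynchronous
// job stays running long enough for the polling loop in main to report progress.
public static final class IdentityWithDelay<T> implements Function<T, T> {
  @Override
  public T call(T x) throws Exception {
    Thread.sleep(2 * 1000);  // simulate a slow task (illustrative delay)
    return x;
  }
}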
public LocalSparkJobStatus(JavaSparkContext sparkContext, int jobId,
    JavaFutureAction<Void> future) {
  this.sparkContext = sparkContext;
  this.jobId = jobId;
  this.future = future;
}
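// A hedged sketch of how a class with the constructor above could surface progress
// through the same status-tracker calls used in the polling loop earlier. The field
// declarations and the printJobProgress method are illustrative assumptions, not the
// actual LocalSparkJobStatus implementation.
private final JavaSparkContext sparkContext;
private final int jobId;
private final JavaFutureAction<Void> future;

// Sums task counts across all stages of the tracked job and prints a one-line summary.
public void printJobProgress() {
  SparkJobInfo jobInfo = sparkContext.statusTracker().getJobInfo(jobId);
  if (jobInfo == null) {
    return;  // job info may not be available yet, or may have been cleaned up
  }
  int totalTasks = 0;
  int completedTasks = 0;
  for (int stageId : jobInfo.stageIds()) {
    SparkStageInfo stageInfo = sparkContext.statusTracker().getStageInfo(stageId);
    if (stageInfo != null) {
      totalTasks += stageInfo.numTasks();
      completedTasks += stageInfo.numCompletedTasks();
    }
  }
  System.out.println("Job " + jobId + ": " + completedTasks + "/" + totalTasks
      + " tasks complete, done=" + future.isDone());
}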