private long computeFinishedMaps(JobInfo jobInfo, int numMaps,
    int numSuccessfulMaps) {
  if (numMaps == numSuccessfulMaps) {
    return jobInfo.getFinishedMaps();
  }
  // Fall back to counting successful tasks from the parsed job history.
  long numFinishedMaps = 0;
  Map<org.apache.hadoop.mapreduce.TaskID, TaskInfo> taskInfos =
      jobInfo.getAllTasks();
  for (TaskInfo taskInfo : taskInfos.values()) {
    if (TaskState.SUCCEEDED.toString().equals(taskInfo.getTaskStatus())) {
      ++numFinishedMaps;
    }
  }
  return numFinishedMaps;
}
@Test
public void testFailedJobHistoryWithoutDiagnostics() throws Exception {
  final Path histPath = new Path(getClass().getClassLoader().getResource(
      "job_1393307629410_0001-1393307687476-user-Sleep+job-1393307723835-0-0-FAILED-default-1393307693920.jhist")
      .getFile());
  final FileSystem lfs = FileSystem.getLocal(new Configuration());
  final FSDataInputStream fsdis = lfs.open(histPath);
  try {
    JobHistoryParser parser = new JobHistoryParser(fsdis);
    JobInfo info = parser.parse();
    assertEquals("History parsed jobId incorrectly",
        info.getJobId(), JobID.forName("job_1393307629410_0001"));
    assertEquals("Default diagnostics incorrect ", "", info.getErrorInfo());
  } finally {
    fsdis.close();
  }
}
/**
 * Creates job, task, and task attempt entities based on the job history info
 * and configuration.
 *
 * Note: currently these are plain timeline entities created for mapreduce
 * types. They are not meant to be the complete and accurate entity set-up
 * for mapreduce jobs. We do not leverage hierarchical timeline entities. If
 * we create canonical mapreduce hierarchical timeline entities with a proper
 * parent-child relationship, we could modify this to use those instead.
 *
 * Note that we also do not add info to the YARN application entity, which
 * would be needed for aggregation.
 */
public Set<TimelineEntity> createTimelineEntities(JobInfo jobInfo,
    Configuration conf) {
  Set<TimelineEntity> entities = new HashSet<>();

  // create the job entity
  TimelineEntity job = createJobEntity(jobInfo, conf);
  entities.add(job);

  // create the task and task attempt entities
  Set<TimelineEntity> tasksAndAttempts =
      createTaskAndTaskAttemptEntities(jobInfo);
  entities.addAll(tasksAndAttempts);

  return entities;
}
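For context, a minimal sketch of how this converter might be driven, assuming the helper below sits in the same class as createTimelineEntities and that a local .jhist file is available at a placeholder path (not from the original source):

// A minimal usage sketch (not part of the original source): parse a job
// history file and feed the parsed JobInfo into createTimelineEntities.
// The file location below is a placeholder/assumption for illustration.
private Set<TimelineEntity> convertHistoryFileSketch(Configuration conf)
    throws IOException {
  Path histPath = new Path("/tmp/sample.jhist"); // assumed .jhist location
  JobHistoryParser parser =
      new JobHistoryParser(FileSystem.getLocal(conf), histPath);
  JobInfo jobInfo = parser.parse();
  return createTimelineEntities(jobInfo, conf);
}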
private TimelineEntity createJobEntity(JobInfo jobInfo, Configuration conf) {
  TimelineEntity job = new TimelineEntity();
  job.setEntityType(JOB);
  job.setEntityId(jobInfo.getJobId().toString());
  job.setStartTime(jobInfo.getSubmitTime());

  job.addPrimaryFilter("JOBNAME", jobInfo.getJobname());
  job.addPrimaryFilter("USERNAME", jobInfo.getUsername());
  job.addOtherInfo("JOB_QUEUE_NAME", jobInfo.getJobQueueName());
  job.addOtherInfo("SUBMIT_TIME", jobInfo.getSubmitTime());
  job.addOtherInfo("LAUNCH_TIME", jobInfo.getLaunchTime());
  job.addOtherInfo("FINISH_TIME", jobInfo.getFinishTime());
  job.addOtherInfo("JOB_STATUS", jobInfo.getJobStatus());
  job.addOtherInfo("PRIORITY", jobInfo.getPriority());
  job.addOtherInfo("TOTAL_MAPS", jobInfo.getTotalMaps());
  job.addOtherInfo("TOTAL_REDUCES", jobInfo.getTotalReduces());
  job.addOtherInfo("UBERIZED", jobInfo.getUberized());
  job.addOtherInfo("ERROR_INFO", jobInfo.getErrorInfo());

  LOG.info("converted job " + jobInfo.getJobId() + " to a timeline entity");
  return job;
}
@Test(timeout = 30000)
public void testCompletedJobWithDiagnostics() throws Exception {
  final String jobError = "Job Diagnostics";
  JobInfo jobInfo = spy(new JobInfo());
  when(jobInfo.getErrorInfo()).thenReturn(jobError);
  when(jobInfo.getJobStatus()).thenReturn(JobState.FAILED.toString());
  when(jobInfo.getAMInfos())
      .thenReturn(Collections.<JobHistoryParser.AMInfo>emptyList());
  final JobHistoryParser mockParser = mock(JobHistoryParser.class);
  when(mockParser.parse()).thenReturn(jobInfo);

  HistoryFileInfo info = mock(HistoryFileInfo.class);
  when(info.getConfFile()).thenReturn(fullConfPath);
  when(info.getHistoryFile()).thenReturn(fullHistoryPath);

  CompletedJob job = new CompletedJob(conf, jobId, fullHistoryPath, loadTasks,
      "user", info, jobAclsManager) {
    @Override
    protected JobHistoryParser createJobHistoryParser(Path historyFileAbsolute)
        throws IOException {
      return mockParser;
    }
  };
  assertEquals(jobError, job.getReport().getDiagnostics());
}
/**
 * This method is responsible for populating the setup phase details.
 * @param jobInfo job history information for the job
 * @param tasks map of task attempt info keyed by attempt id
 * @return PhaseDetails containing the details of the setup phase.
 */
private PhaseDetails prepareSetupDetails(JobInfo jobInfo,
    Map<TaskAttemptID, TaskAttemptInfo> tasks) {
  PhaseDetails phaseDetails = new PhaseDetails();
  List<TaskOutputDetails> taskOutputDetails =
      new ArrayList<TaskOutputDetails>();
  TaskOutputDetails tod = new TaskOutputDetails();
  tod.setTaskType("SETUP");
  tod.setTaskID("Setup");
  for (Map.Entry<TaskAttemptID, TaskAttemptInfo> task : tasks.entrySet()) {
    TaskAttemptInfo taskAttemptInfo = task.getValue();
    tod.setLocation(taskAttemptInfo.getHostname());
  }
  long startPoint = jobInfo.getSubmitTime();
  tod.setStartPoint(0);
  long endPoint = (jobInfo.getLaunchTime() - startPoint)
      / CONVERSION_FACTOR_MILLISECS_TO_SECS;
  tod.setEndPoint(endPoint);
  tod.setDataFlowRate(0);
  taskOutputDetails.add(tod);
  phaseDetails.setTaskOutputDetails(taskOutputDetails);
  phaseDetails.setAvgDataFlowRate(0);
  return phaseDetails;
}
/**
 * This method is responsible for populating the cleanup phase details.
 * @param jobInfo job history information for the job
 * @param tasks map of task attempt info keyed by attempt id
 * @return PhaseDetails containing the details of the cleanup phase.
 */
private PhaseDetails prepareCleanupDetails(JobInfo jobInfo,
    Map<TaskAttemptID, TaskAttemptInfo> tasks) {
  PhaseDetails phaseDetails = new PhaseDetails();
  List<TaskOutputDetails> cleanupTaskOutputDetails =
      new ArrayList<TaskOutputDetails>();
  TaskOutputDetails taskOutputDetails = new TaskOutputDetails();
  taskOutputDetails.setTaskType("CLEANUP");
  taskOutputDetails.setTaskID("Cleanup");
  for (Map.Entry<TaskAttemptID, TaskAttemptInfo> task : tasks.entrySet()) {
    TaskAttemptInfo taskAttemptInfo = task.getValue();
    taskOutputDetails.setLocation(taskAttemptInfo.getHostname());
  }
  long startPoint = getMaxReduceTime(tasks, jobInfo.getSubmitTime());
  taskOutputDetails.setStartPoint(startPoint);
  long endPoint = (jobInfo.getFinishTime() - jobInfo.getSubmitTime())
      / CONVERSION_FACTOR_MILLISECS_TO_SECS;
  taskOutputDetails.setEndPoint(endPoint);
  taskOutputDetails.setDataFlowRate(0);
  cleanupTaskOutputDetails.add(taskOutputDetails);
  phaseDetails.setTaskOutputDetails(cleanupTaskOutputDetails);
  phaseDetails.setAvgDataFlowRate(0);
  return phaseDetails;
}
private static void validateJobLevelKeyValuesFormat(JobInfo jobInfo,
    String status) {
  long submitTime = jobInfo.getSubmitTime();
  long launchTime = jobInfo.getLaunchTime();
  long finishTime = jobInfo.getFinishTime();

  assertTrue("Invalid submit time", submitTime > 0);
  assertTrue("SubmitTime > LaunchTime", submitTime <= launchTime);
  assertTrue("LaunchTime > FinishTime", launchTime <= finishTime);

  String stat = jobInfo.getJobStatus();
  assertTrue("Unexpected JOB_STATUS \"" + stat + "\" is seen in"
      + " history file", status.equals(stat));

  String priority = jobInfo.getPriority();
  assertNotNull(priority);
  assertTrue("Unknown priority for the job in history file",
      priority.equals("HIGH") || priority.equals("LOW")
          || priority.equals("NORMAL") || priority.equals("VERY_HIGH")
          || priority.equals("VERY_LOW"));
}
private void populate_Job(Hashtable<Enum, String> job, JobInfo jobInfo)
    throws ParseException {
  job.put(JobKeys.FINISH_TIME, String.valueOf(jobInfo.getFinishTime()));
  job.put(JobKeys.JOBID, jobInfo.getJobId().toString());
  job.put(JobKeys.JOBNAME, jobInfo.getJobname());
  job.put(JobKeys.USER, jobInfo.getUsername());
  job.put(JobKeys.JOBCONF, jobInfo.getJobConfPath());
  job.put(JobKeys.SUBMIT_TIME, String.valueOf(jobInfo.getSubmitTime()));
  job.put(JobKeys.LAUNCH_TIME, String.valueOf(jobInfo.getLaunchTime()));
  job.put(JobKeys.TOTAL_MAPS, String.valueOf(jobInfo.getTotalMaps()));
  job.put(JobKeys.TOTAL_REDUCES, String.valueOf(jobInfo.getTotalReduces()));
  job.put(JobKeys.FAILED_MAPS, String.valueOf(jobInfo.getFailedMaps()));
  job.put(JobKeys.FAILED_REDUCES, String.valueOf(jobInfo.getFailedReduces()));
  job.put(JobKeys.FINISHED_MAPS, String.valueOf(jobInfo.getFinishedMaps()));
  job.put(JobKeys.FINISHED_REDUCES,
      String.valueOf(jobInfo.getFinishedReduces()));
  job.put(JobKeys.STATUS, jobInfo.getJobStatus().toString());
  job.put(JobKeys.JOB_PRIORITY, jobInfo.getPriority());
  parseAndAddJobCounters(job, jobInfo.getTotalCounters().toString());
}
/**
 * @param jobConfFile - URL pointing to the job configuration (job_conf.xml) file
 * @param jobHistoryFile - URL pointing to the job history log file
 * @param testsConfFileIs - input stream for the test configuration file
 *        (optional). If not specified, the default path is:
 *        $HADOOP_PREFIX/contrib/vaidya/pxpd_tests_config.xml
 * @param reportFile - file path for storing the report (optional)
 */
public PostExPerformanceDiagnoser(String jobConfFile, String jobHistoryFile,
    InputStream testsConfFileIs, String reportFile) throws Exception {
  this._jobHistoryFile = jobHistoryFile;
  this._testsConfFileIs = testsConfFileIs;
  this._reportFile = reportFile;
  this._jobConfFile = jobConfFile;

  /*
   * Read the job information necessary for post performance analysis.
   */
  JobConf jobConf = new JobConf();
  JobInfo jobInfo = readJobInformation(jobConf);
  this._jobExecutionStatistics = new JobStatistics(jobConf, jobInfo);
}
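For orientation, a minimal sketch of constructing the diagnoser, assuming placeholder file locations and that a null tests-config stream falls back to the default configuration (none of these values come from the original source):

// A minimal usage sketch (not from the original source). The file locations
// are placeholders, and passing null for the tests-config stream is assumed
// to select the default configuration.
public static void sketchDiagnoserUsage() throws Exception {
  PostExPerformanceDiagnoser diagnoser = new PostExPerformanceDiagnoser(
      "file:///tmp/job_conf.xml",     // assumed job configuration URL
      "file:///tmp/job_history.log",  // assumed job history log URL
      null,                           // assumed: fall back to default tests config
      "/tmp/vaidya_report.xml");      // assumed report output path
}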
/** Apply the filter (status) on the parsed job and generate summary */
public FilteredJob(JobInfo job, String status) {
  filter = status;
  Map<TaskID, JobHistoryParser.TaskInfo> tasks = job.getAllTasks();
  for (JobHistoryParser.TaskInfo task : tasks.values()) {
    Map<TaskAttemptID, JobHistoryParser.TaskAttemptInfo> attempts =
        task.getAllTaskAttempts();
    for (JobHistoryParser.TaskAttemptInfo attempt : attempts.values()) {
      if (attempt.getTaskStatus().equals(status)) {
        String hostname = attempt.getHostname();
        TaskID id = attempt.getAttemptId().getTaskID();
        // Group the ids of matching tasks by the host they ran on.
        Set<TaskID> set = badNodesToFilteredTasks.get(hostname);
        if (set == null) {
          set = new TreeSet<TaskID>();
          set.add(id);
          badNodesToFilteredTasks.put(hostname, set);
        } else {
          set.add(id);
        }
      }
    }
  }
}
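For orientation, a minimal sketch of building a FilteredJob from a parsed history file, assuming a placeholder .jhist path and using the FAILED task status as the filter:

// A minimal sketch (assumed history file location; FAILED used as the filter).
JobHistoryParser parser = new JobHistoryParser(
    FileSystem.getLocal(new Configuration()),
    new Path("/tmp/sample.jhist"));          // placeholder .jhist path
JobInfo jobInfo = parser.parse();
// Groups the ids of FAILED task attempts by the host they ran on.
FilteredJob failedByHost =
    new FilteredJob(jobInfo, TaskStatus.State.FAILED.toString());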
/**
 * Test compatibility of JobHistoryParser with 2.0.3-alpha history files.
 * @throws IOException
 */
@Test
public void testTaskAttemptUnsuccessfulCompletionWithoutCounters203()
    throws IOException {
  Path histPath = new Path(getClass().getClassLoader().getResource(
      "job_2.0.3-alpha-FAILED.jhist").getFile());
  JobHistoryParser parser =
      new JobHistoryParser(FileSystem.getLocal(new Configuration()), histPath);
  JobInfo jobInfo = parser.parse();
  LOG.info(" job info: " + jobInfo.getJobname() + " "
      + jobInfo.getFinishedMaps() + " " + jobInfo.getTotalMaps() + " "
      + jobInfo.getJobId());
}
/**
 * Test compatibility of JobHistoryParser with 2.4.0 history files.
 * @throws IOException
 */
@Test
public void testTaskAttemptUnsuccessfulCompletionWithoutCounters240()
    throws IOException {
  Path histPath = new Path(getClass().getClassLoader().getResource(
      "job_2.4.0-FAILED.jhist").getFile());
  JobHistoryParser parser =
      new JobHistoryParser(FileSystem.getLocal(new Configuration()), histPath);
  JobInfo jobInfo = parser.parse();
  LOG.info(" job info: " + jobInfo.getJobname() + " "
      + jobInfo.getFinishedMaps() + " " + jobInfo.getTotalMaps() + " "
      + jobInfo.getJobId());
}
/**
 * Test compatibility of JobHistoryParser with 0.23.9 history files.
 * @throws IOException
 */
@Test
public void testTaskAttemptUnsuccessfulCompletionWithoutCounters0239()
    throws IOException {
  Path histPath = new Path(getClass().getClassLoader().getResource(
      "job_0.23.9-FAILED.jhist").getFile());
  JobHistoryParser parser =
      new JobHistoryParser(FileSystem.getLocal(new Configuration()), histPath);
  JobInfo jobInfo = parser.parse();
  LOG.info(" job info: " + jobInfo.getJobname() + " "
      + jobInfo.getFinishedMaps() + " " + jobInfo.getTotalMaps() + " "
      + jobInfo.getJobId());
}
private Set<TimelineEntity> createTaskAndTaskAttemptEntities(JobInfo jobInfo) {
  Set<TimelineEntity> entities = new HashSet<>();
  Map<TaskID, TaskInfo> taskInfoMap = jobInfo.getAllTasks();
  LOG.info("job " + jobInfo.getJobId() + " has " + taskInfoMap.size()
      + " tasks");
  for (TaskInfo taskInfo : taskInfoMap.values()) {
    TimelineEntity task = createTaskEntity(taskInfo);
    entities.add(task);
    // add the task attempts from this task
    Set<TimelineEntity> taskAttempts = createTaskAttemptEntities(taskInfo);
    entities.addAll(taskAttempts);
  }
  return entities;
}