/**
 * Builds the info view of a task from the task itself and its current report.
 *
 * @param task the task to summarize; its type, report and id are read
 */
public TaskInfo(Task task) {
  this.type = task.getType().toString();

  TaskReport taskReport = task.getReport();
  this.startTime = taskReport.getStartTime();
  this.finishTime = taskReport.getFinishTime();
  this.state = taskReport.getTaskState();

  // The third argument tells Times.elapsed whether the task is still running.
  boolean stillRunning = (this.state == TaskState.RUNNING);
  this.elapsedTime = Times.elapsed(this.startTime, this.finishTime, stillRunning);
  if (this.elapsedTime == -1) {
    // -1 from Times.elapsed is reported as zero elapsed time.
    this.elapsedTime = 0;
  }

  // Progress is exposed scaled by 100.
  this.progress = taskReport.getProgress() * 100;
  this.status = taskReport.getStatus();
  this.id = MRApps.toString(task.getID());
  this.taskNum = task.getID().getId();

  // Empty string when the task has no successful attempt.
  this.successful = getSuccessfulAttempt(task);
  this.successfulAttempt =
      (successful == null) ? "" : MRApps.toString(successful.getID());
}
/**
 * Records a completed task on the job, dispatches to the handler matching the
 * task's reported final state, and then re-evaluates the job state.
 *
 * @param job the job that owns the completed task
 * @param event the event, which must be a {@code JobTaskEvent}
 * @return the result of {@code checkJobAfterTaskCompletion(job)}
 */
@Override
public JobStateInternal transition(JobImpl job, JobEvent event) {
  job.completedTaskCount++;
  LOG.info("Num completed Tasks: " + job.completedTaskCount);

  JobTaskEvent completion = (JobTaskEvent) event;
  Task finishedTask = job.tasks.get(completion.getTaskID());
  TaskState reportedState = completion.getState();

  // Any state other than SUCCEEDED / FAILED / KILLED gets no per-task handling.
  if (reportedState == TaskState.SUCCEEDED) {
    taskSucceeded(job, finishedTask);
  } else if (reportedState == TaskState.FAILED) {
    taskFailed(job, finishedTask);
  } else if (reportedState == TaskState.KILLED) {
    taskKilled(job, finishedTask);
  }

  return checkJobAfterTaskCompletion(job);
}
@Test public void testJobError() throws Exception { MRApp app = new MRApp(1, 0, false, this.getClass().getName(), true); Job job = app.submit(new Configuration()); app.waitForState(job, JobState.RUNNING); Assert.assertEquals("Num tasks not correct", 1, job.getTasks().size()); Iterator<Task> it = job.getTasks().values().iterator(); Task task = it.next(); app.waitForState(task, TaskState.RUNNING); //send an invalid event on task at current state app.getContext().getEventHandler().handle( new TaskEvent( task.getID(), TaskEventType.T_SCHEDULE)); //this must lead to job error app.waitForState(job, JobState.ERROR); }
@Test public void testJobRebootNotLastRetryOnUnregistrationFailure() throws Exception { MRApp app = new MRApp(1, 0, false, this.getClass().getName(), true); Job job = app.submit(new Configuration()); app.waitForState(job, JobState.RUNNING); Assert.assertEquals("Num tasks not correct", 1, job.getTasks().size()); Iterator<Task> it = job.getTasks().values().iterator(); Task task = it.next(); app.waitForState(task, TaskState.RUNNING); //send an reboot event app.getContext().getEventHandler().handle(new JobEvent(job.getID(), JobEventType.JOB_AM_REBOOT)); // return exteranl state as RUNNING since otherwise the JobClient will // prematurely exit. app.waitForState(job, JobState.RUNNING); }
@Test public void testJobRebootOnLastRetryOnUnregistrationFailure() throws Exception { // make startCount as 2 since this is last retry which equals to // DEFAULT_MAX_AM_RETRY // The last param mocks the unregistration failure MRApp app = new MRApp(1, 0, false, this.getClass().getName(), true, 2, false); Configuration conf = new Configuration(); Job job = app.submit(conf); app.waitForState(job, JobState.RUNNING); Assert.assertEquals("Num tasks not correct", 1, job.getTasks().size()); Iterator<Task> it = job.getTasks().values().iterator(); Task task = it.next(); app.waitForState(task, TaskState.RUNNING); //send an reboot event app.getContext().getEventHandler().handle(new JobEvent(job.getID(), JobEventType.JOB_AM_REBOOT)); app.waitForInternalState((JobImpl) job, JobStateInternal.REBOOT); // return exteranl state as RUNNING if this is the last retry while // unregistration fails app.waitForState(job, JobState.RUNNING); }
/**
 * Runs a job on the supplied app, waits for it to fail, and checks the task
 * state, the number of attempts, and the state and diagnostics of the first
 * two attempts.
 *
 * @param app the app to submit the job to
 */
private void testMRAppHistory(MRApp app) throws Exception {
  Job job = app.submit(new Configuration());
  app.waitForState(job, JobState.FAILED);

  Map<TaskId, Task> tasks = job.getTasks();
  Assert.assertEquals("Num tasks is not correct", 1, tasks.size());

  Task task = tasks.values().iterator().next();
  TaskState taskState = task.getReport().getTaskState();
  Assert.assertEquals("Task state not correct", TaskState.FAILED, taskState);

  Map<TaskAttemptId, TaskAttempt> attempts = task.getAttempts();
  Assert.assertEquals("Num attempts is not correct", 4, attempts.size());

  Iterator<TaskAttempt> attemptIter = attempts.values().iterator();

  // First attempt: must be FAILED and carry the expected diagnostic text.
  TaskAttemptReport firstReport = attemptIter.next().getReport();
  Assert.assertEquals("Attempt state not correct", TaskAttemptState.FAILED,
      firstReport.getTaskAttemptState());
  Assert.assertEquals("Diagnostic Information is not Correct",
      "Test Diagnostic Event", firstReport.getDiagnosticInfo());

  // Second attempt: only the state is checked.
  TaskAttemptReport secondReport = attemptIter.next().getReport();
  Assert.assertEquals("Attempt state not correct", TaskAttemptState.FAILED,
      secondReport.getTaskAttemptState());
}
private static void completeJobTasks(JobImpl job) { // complete the map tasks and the reduce tasks so we start committing int numMaps = job.getTotalMaps(); for (int i = 0; i < numMaps; ++i) { job.handle(new JobTaskEvent( MRBuilderUtils.newTaskId(job.getID(), 1, TaskType.MAP), TaskState.SUCCEEDED)); Assert.assertEquals(JobState.RUNNING, job.getState()); } int numReduces = job.getTotalReduces(); for (int i = 0; i < numReduces; ++i) { job.handle(new JobTaskEvent( MRBuilderUtils.newTaskId(job.getID(), 1, TaskType.MAP), TaskState.SUCCEEDED)); Assert.assertEquals(JobState.RUNNING, job.getState()); } }
@Test //First attempt is failed and second attempt is passed //The job succeeds. public void testFailTask() throws Exception { MRApp app = new MockFirstFailingAttemptMRApp(1, 0); Configuration conf = new Configuration(); // this test requires two task attempts, but uberization overrides max to 1 conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false); Job job = app.submit(conf); app.waitForState(job, JobState.SUCCEEDED); Map<TaskId,Task> tasks = job.getTasks(); Assert.assertEquals("Num tasks is not correct", 1, tasks.size()); Task task = tasks.values().iterator().next(); Assert.assertEquals("Task state not correct", TaskState.SUCCEEDED, task.getReport().getTaskState()); Map<TaskAttemptId, TaskAttempt> attempts = tasks.values().iterator().next().getAttempts(); Assert.assertEquals("Num attempts is not correct", 2, attempts.size()); //one attempt must be failed //and another must have succeeded Iterator<TaskAttempt> it = attempts.values().iterator(); Assert.assertEquals("Attempt state not correct", TaskAttemptState.FAILED, it.next().getReport().getTaskAttemptState()); Assert.assertEquals("Attempt state not correct", TaskAttemptState.SUCCEEDED, it.next().getReport().getTaskAttemptState()); }
@Test //All Task attempts are timed out, leading to Job failure public void testTimedOutTask() throws Exception { MRApp app = new TimeOutTaskMRApp(1, 0); Configuration conf = new Configuration(); int maxAttempts = 2; conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, maxAttempts); // disable uberization (requires entire job to be reattempted, so max for // subtask attempts is overridden to 1) conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false); Job job = app.submit(conf); app.waitForState(job, JobState.FAILED); Map<TaskId,Task> tasks = job.getTasks(); Assert.assertEquals("Num tasks is not correct", 1, tasks.size()); Task task = tasks.values().iterator().next(); Assert.assertEquals("Task state not correct", TaskState.FAILED, task.getReport().getTaskState()); Map<TaskAttemptId, TaskAttempt> attempts = tasks.values().iterator().next().getAttempts(); Assert.assertEquals("Num attempts is not correct", maxAttempts, attempts.size()); for (TaskAttempt attempt : attempts.values()) { Assert.assertEquals("Attempt state not correct", TaskAttemptState.FAILED, attempt.getReport().getTaskAttemptState()); } }
/**
 * Maps a YARN-side {@code TaskState} to the corresponding
 * {@code org.apache.hadoop.mapred.TIPStatus}.
 *
 * <p>NEW and SCHEDULED both map to PENDING; SUCCEEDED maps to COMPLETE; the
 * remaining handled states map to the constant of the same name.
 *
 * @param state the task state to convert
 * @return the matching TIP status
 * @throws YarnRuntimeException if {@code state} is not one of the handled
 *     constants
 */
public static org.apache.hadoop.mapred.TIPStatus fromYarn(
    TaskState state) {
  final org.apache.hadoop.mapred.TIPStatus tipStatus;
  switch (state) {
    case NEW:
    case SCHEDULED:
      tipStatus = org.apache.hadoop.mapred.TIPStatus.PENDING;
      break;
    case RUNNING:
      tipStatus = org.apache.hadoop.mapred.TIPStatus.RUNNING;
      break;
    case KILLED:
      tipStatus = org.apache.hadoop.mapred.TIPStatus.KILLED;
      break;
    case SUCCEEDED:
      tipStatus = org.apache.hadoop.mapred.TIPStatus.COMPLETE;
      break;
    case FAILED:
      tipStatus = org.apache.hadoop.mapred.TIPStatus.FAILED;
      break;
    default:
      throw new YarnRuntimeException("Unrecognized task state: " + state);
  }
  return tipStatus;
}
@Test public void testEnums() throws Exception { for (YarnApplicationState applicationState : YarnApplicationState.values()) { TypeConverter.fromYarn(applicationState, FinalApplicationStatus.FAILED); } // ad hoc test of NEW_SAVING, which is newly added Assert.assertEquals(State.PREP, TypeConverter.fromYarn( YarnApplicationState.NEW_SAVING, FinalApplicationStatus.FAILED)); for (TaskType taskType : TaskType.values()) { TypeConverter.fromYarn(taskType); } for (JobState jobState : JobState.values()) { TypeConverter.fromYarn(jobState); } for (QueueState queueState : QueueState.values()) { TypeConverter.fromYarn(queueState); } for (TaskState taskState : TaskState.values()) { TypeConverter.fromYarn(taskState); } }
/**
 * Computes the number of finished maps for a job.
 *
 * <p>When every map succeeded the value recorded on the job info is returned
 * directly; otherwise the tasks whose status string equals SUCCEEDED are
 * counted.
 *
 * @param jobInfo the job info to inspect
 * @param numMaps total number of maps
 * @param numSuccessfulMaps number of maps that succeeded
 * @return the finished-map count
 */
private long computeFinishedMaps(JobInfo jobInfo, int numMaps,
    int numSuccessfulMaps) {
  if (numMaps != numSuccessfulMaps) {
    final String succeeded = TaskState.SUCCEEDED.toString();
    Map<org.apache.hadoop.mapreduce.TaskID, TaskInfo> allTasks =
        jobInfo.getAllTasks();

    // NOTE(review): every entry of getAllTasks() is considered here, with no
    // filter on task type -- confirm that is intended for a "maps" count.
    long succeededCount = 0;
    for (TaskInfo info : allTasks.values()) {
      if (succeeded.equals(info.getTaskStatus())) {
        succeededCount++;
      }
    }
    return succeededCount;
  }
  return jobInfo.getFinishedMaps();
}
/**
 * Creates a mocked REDUCE task with id 0 whose report says SUCCEEDED with
 * progress 0.7, start time 100001 and finish time 100011.
 *
 * @param timestamp value passed to {@code ApplicationIdPBImpl.newInstance}
 *     for the application id of the owning job
 * @return the mocked task
 */
private Task getTask(long timestamp) {
  // Job id 0 under an application id built from the given timestamp.
  JobId owningJob = new JobIdPBImpl();
  owningJob.setId(0);
  owningJob.setAppId(ApplicationIdPBImpl.newInstance(timestamp, 1));

  TaskId reduceId = new TaskIdPBImpl();
  reduceId.setId(0);
  reduceId.setTaskType(TaskType.REDUCE);
  reduceId.setJobId(owningJob);

  // Stub the report first, then the task that hands it out.
  TaskReport mockedReport = mock(TaskReport.class);
  when(mockedReport.getProgress()).thenReturn(0.7f);
  when(mockedReport.getTaskState()).thenReturn(TaskState.SUCCEEDED);
  when(mockedReport.getStartTime()).thenReturn(100001L);
  when(mockedReport.getFinishTime()).thenReturn(100011L);

  Task mockedTask = mock(Task.class);
  when(mockedTask.getID()).thenReturn(reduceId);
  when(mockedTask.getReport()).thenReturn(mockedReport);
  when(mockedTask.getType()).thenReturn(TaskType.REDUCE);
  return mockedTask;
}
@Override public void transition(JobImpl job, JobEvent event) { //get number of shuffling reduces int shufflingReduceTasks = 0; for (TaskId taskId : job.reduceTasks) { Task task = job.tasks.get(taskId); if (TaskState.RUNNING.equals(task.getState())) { for(TaskAttempt attempt : task.getAttempts().values()) { if(attempt.getPhase() == Phase.SHUFFLE) { shufflingReduceTasks++; break; } } } } JobTaskAttemptFetchFailureEvent fetchfailureEvent = (JobTaskAttemptFetchFailureEvent) event; for (org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId mapId : fetchfailureEvent.getMaps()) { Integer fetchFailures = job.fetchFailuresMapping.get(mapId); fetchFailures = (fetchFailures == null) ? 1 : (fetchFailures+1); job.fetchFailuresMapping.put(mapId, fetchFailures); float failureRate = shufflingReduceTasks == 0 ? 1.0f : (float) fetchFailures / shufflingReduceTasks; // declare faulty if fetch-failures >= max-allowed-failures if (fetchFailures >= job.getMaxFetchFailuresNotifications() && failureRate >= job.getMaxAllowedFetchFailuresFraction()) { LOG.info("Too many fetch-failures for output of task attempt: " + mapId + " ... raising fetch failure to map"); job.eventHandler.handle(new TaskAttemptEvent(mapId, TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE)); job.fetchFailuresMapping.remove(mapId); } } }
/**
 * Returns the externally visible state of this task, derived from the
 * internal state while holding the read lock.
 *
 * @return the external task state
 */
@Override
public TaskState getState() {
  readLock.lock();
  try {
    final TaskState externalState = getExternalState(getInternalState());
    return externalState;
  } finally {
    // Always release the lock, even if the conversion throws.
    readLock.unlock();
  }
}
/**
 * Converts an internal task state to its external counterpart.
 *
 * <p>KILL_WAIT is reported as KILLED; every other internal state is looked up
 * by name among the external states.
 *
 * @param smState the internal state
 * @return the external state
 */
private static TaskState getExternalState(TaskStateInternal smState) {
  return (smState == TaskStateInternal.KILL_WAIT)
      ? TaskState.KILLED
      : TaskState.valueOf(smState.name());
}
/**
 * Sends a SUCCEEDED {@code JobTaskEvent} for this task, logs the successful
 * attempt, and, if {@code historyTaskStartGenerated} is set, also emits a
 * task-finished job-history event.
 */
private void sendTaskSucceededEvents() {
  eventHandler.handle(new JobTaskEvent(taskId, TaskState.SUCCEEDED));
  LOG.info("Task succeeded with attempt " + successfulAttempt);

  if (!historyTaskStartGenerated) {
    return;
  }

  TaskFinishedEvent finished =
      createTaskFinishedEvent(this, TaskStateInternal.SUCCEEDED);
  eventHandler.handle(new JobHistoryEvent(taskId.getJobId(), finished));
}
@Test public void testKillJob() throws Exception { final CountDownLatch latch = new CountDownLatch(1); MRApp app = new BlockingMRApp(1, 0, latch); //this will start the job but job won't complete as task is //blocked Job job = app.submit(new Configuration()); //wait and vailidate for Job to become RUNNING app.waitForState(job, JobState.RUNNING); //send the kill signal to Job app.getContext().getEventHandler().handle( new JobEvent(job.getID(), JobEventType.JOB_KILL)); //unblock Task latch.countDown(); //wait and validate for Job to be KILLED app.waitForState(job, JobState.KILLED); Map<TaskId,Task> tasks = job.getTasks(); Assert.assertEquals("No of tasks is not correct", 1, tasks.size()); Task task = tasks.values().iterator().next(); Assert.assertEquals("Task state not correct", TaskState.KILLED, task.getReport().getTaskState()); Map<TaskAttemptId, TaskAttempt> attempts = tasks.values().iterator().next().getAttempts(); Assert.assertEquals("No of attempts is not correct", 1, attempts.size()); Iterator<TaskAttempt> it = attempts.values().iterator(); Assert.assertEquals("Attempt state not correct", TaskAttemptState.KILLED, it.next().getReport().getTaskAttemptState()); }
@Test public void testCommitPending() throws Exception { MRApp app = new MRApp(1, 0, false, this.getClass().getName(), true); Job job = app.submit(new Configuration()); app.waitForState(job, JobState.RUNNING); Assert.assertEquals("Num tasks not correct", 1, job.getTasks().size()); Iterator<Task> it = job.getTasks().values().iterator(); Task task = it.next(); app.waitForState(task, TaskState.RUNNING); TaskAttempt attempt = task.getAttempts().values().iterator().next(); app.waitForState(attempt, TaskAttemptState.RUNNING); //send the commit pending signal to the task app.getContext().getEventHandler().handle( new TaskAttemptEvent( attempt.getID(), TaskAttemptEventType.TA_COMMIT_PENDING)); //wait for first attempt to commit pending app.waitForState(attempt, TaskAttemptState.COMMIT_PENDING); //re-send the commit pending signal to the task app.getContext().getEventHandler().handle( new TaskAttemptEvent( attempt.getID(), TaskAttemptEventType.TA_COMMIT_PENDING)); //the task attempt should be still at COMMIT_PENDING app.waitForState(attempt, TaskAttemptState.COMMIT_PENDING); //send the done signal to the task app.getContext().getEventHandler().handle( new TaskAttemptEvent( task.getAttempts().values().iterator().next().getID(), TaskAttemptEventType.TA_DONE)); app.waitForState(job, JobState.SUCCEEDED); }
@Test public void checkTaskStateTypeConversion() { //verify that all states can be converted without // throwing an exception for (TaskState state : TaskState.values()) { TypeConverter.fromYarn(state); } }
@Test public void testAbortJobCalledAfterKillingTasks() throws IOException { Configuration conf = new Configuration(); conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir); conf.set(MRJobConfig.MR_AM_COMMITTER_CANCEL_TIMEOUT_MS, "1000"); InlineDispatcher dispatcher = new InlineDispatcher(); dispatcher.init(conf); dispatcher.start(); OutputCommitter committer = Mockito.mock(OutputCommitter.class); CommitterEventHandler commitHandler = createCommitterEventHandler(dispatcher, committer); commitHandler.init(conf); commitHandler.start(); JobImpl job = createRunningStubbedJob(conf, dispatcher, 2, null); //Fail one task. This should land the JobImpl in the FAIL_WAIT state job.handle(new JobTaskEvent( MRBuilderUtils.newTaskId(job.getID(), 1, TaskType.MAP), TaskState.FAILED)); //Verify abort job hasn't been called Mockito.verify(committer, Mockito.never()) .abortJob((JobContext) Mockito.any(), (State) Mockito.any()); assertJobState(job, JobStateInternal.FAIL_WAIT); //Verify abortJob is called once and the job failed Mockito.verify(committer, Mockito.timeout(2000).times(1)) .abortJob((JobContext) Mockito.any(), (State) Mockito.any()); assertJobState(job, JobStateInternal.FAILED); dispatcher.stop(); }