/** * TaskAttempt is reporting that it is in commit_pending and it is waiting for * the commit Response * * <br> * Commit it a two-phased protocol. First the attempt informs the * ApplicationMaster that it is * {@link #commitPending(TaskAttemptID, TaskStatus)}. Then it repeatedly polls * the ApplicationMaster whether it {@link #canCommit(TaskAttemptID)} This is * a legacy from the centralized commit protocol handling by the JobTracker. */ @Override public void commitPending(TaskAttemptID taskAttemptID, TaskStatus taskStatsu) throws IOException, InterruptedException { LOG.info("Commit-pending state update from " + taskAttemptID.toString()); // An attempt is asking if it can commit its output. This can be decided // only by the task which is managing the multiple attempts. So redirect the // request there. org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID = TypeConverter.toYarn(taskAttemptID); taskHeartbeatHandler.progressing(attemptID); //Ignorable TaskStatus? - since a task will send a LastStatusUpdate context.getEventHandler().handle( new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_COMMIT_PENDING)); }
@SuppressWarnings("unchecked") @Override public KillTaskAttemptResponse killTaskAttempt( KillTaskAttemptRequest request) throws IOException { TaskAttemptId taskAttemptId = request.getTaskAttemptId(); UserGroupInformation callerUGI = UserGroupInformation.getCurrentUser(); String message = "Kill task attempt " + taskAttemptId + " received from " + callerUGI + " at " + Server.getRemoteAddress(); LOG.info(message); verifyAndGetAttempt(taskAttemptId, JobACL.MODIFY_JOB); appContext.getEventHandler().handle( new TaskAttemptDiagnosticsUpdateEvent(taskAttemptId, message)); appContext.getEventHandler().handle( new TaskAttemptEvent(taskAttemptId, TaskAttemptEventType.TA_KILL)); KillTaskAttemptResponse response = recordFactory.newRecordInstance(KillTaskAttemptResponse.class); return response; }
@SuppressWarnings("unchecked") @Override public FailTaskAttemptResponse failTaskAttempt( FailTaskAttemptRequest request) throws IOException { TaskAttemptId taskAttemptId = request.getTaskAttemptId(); UserGroupInformation callerUGI = UserGroupInformation.getCurrentUser(); String message = "Fail task attempt " + taskAttemptId + " received from " + callerUGI + " at " + Server.getRemoteAddress(); LOG.info(message); verifyAndGetAttempt(taskAttemptId, JobACL.MODIFY_JOB); appContext.getEventHandler().handle( new TaskAttemptDiagnosticsUpdateEvent(taskAttemptId, message)); appContext.getEventHandler().handle( new TaskAttemptEvent(taskAttemptId, TaskAttemptEventType.TA_FAILMSG)); FailTaskAttemptResponse response = recordFactory. newRecordInstance(FailTaskAttemptResponse.class); return response; }
@Override protected void attemptLaunched(TaskAttemptId attemptID) { if (attemptID.getTaskId().getId() == 0 && attemptID.getId() == 0) { //this blocks the first task's first attempt //the subsequent ones are completed try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } } else { getContext().getEventHandler().handle( new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_DONE)); } }
@Override public void handle(ContainerLauncherEvent event) { switch (event.getType()) { case CONTAINER_REMOTE_LAUNCH: getContext().getEventHandler().handle( new TaskAttemptContainerLaunchedEvent(event.getTaskAttemptID(), shufflePort)); attemptLaunched(event.getTaskAttemptID()); break; case CONTAINER_REMOTE_CLEANUP: getContext().getEventHandler().handle( new TaskAttemptEvent(event.getTaskAttemptID(), TaskAttemptEventType.TA_CONTAINER_CLEANED)); break; } }
@Override public void fatalError(TaskAttemptID taskAttemptID, String msg) throws IOException { // This happens only in Child and in the Task. LOG.fatal("Task: " + taskAttemptID + " - exited : " + msg); reportDiagnosticInfo(taskAttemptID, "Error: " + msg); org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID = TypeConverter.toYarn(taskAttemptID); // handling checkpoints preemptionPolicy.handleFailedContainer(attemptID); context.getEventHandler().handle( new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILMSG)); }
@Override public void fsError(TaskAttemptID taskAttemptID, String message) throws IOException { // This happens only in Child. LOG.fatal("Task: " + taskAttemptID + " - failed due to FSError: " + message); reportDiagnosticInfo(taskAttemptID, "FSError: " + message); org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID = TypeConverter.toYarn(taskAttemptID); // handling checkpoints preemptionPolicy.handleFailedContainer(attemptID); context.getEventHandler().handle( new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILMSG)); }
@SuppressWarnings("unchecked") @Override public FailTaskAttemptResponse failTaskAttempt( FailTaskAttemptRequest request) throws IOException { TaskAttemptId taskAttemptId = request.getTaskAttemptId(); UserGroupInformation callerUGI = UserGroupInformation.getCurrentUser(); String message = "Fail task attempt " + taskAttemptId + " received from " + callerUGI + " at " + Server.getRemoteAddress(); LOG.info(message); verifyAndGetAttempt(taskAttemptId, JobACL.MODIFY_JOB); appContext.getEventHandler().handle( new TaskAttemptDiagnosticsUpdateEvent(taskAttemptId, message)); appContext.getEventHandler().handle( new TaskAttemptEvent(taskAttemptId, TaskAttemptEventType.TA_FAILMSG_BY_CLIENT)); FailTaskAttemptResponse response = recordFactory. newRecordInstance(FailTaskAttemptResponse.class); return response; }
@Test public void testTaskAttemptDiagnosticEventOnFinishing() throws Exception { MockEventHandler eventHandler = new MockEventHandler(); TaskAttemptImpl taImpl = createTaskAttemptImpl(eventHandler); taImpl.handle(new TaskAttemptEvent(taImpl.getID(), TaskAttemptEventType.TA_DONE)); assertEquals("Task attempt is not in RUNNING state", taImpl.getState(), TaskAttemptState.SUCCEEDED); assertEquals("Task attempt's internal state is not " + "SUCCESS_FINISHING_CONTAINER", taImpl.getInternalState(), TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER); // TA_DIAGNOSTICS_UPDATE doesn't change state taImpl.handle(new TaskAttemptDiagnosticsUpdateEvent(taImpl.getID(), "Task got updated")); assertEquals("Task attempt is not in RUNNING state", taImpl.getState(), TaskAttemptState.SUCCEEDED); assertEquals("Task attempt's internal state is not " + "SUCCESS_FINISHING_CONTAINER", taImpl.getInternalState(), TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER); assertFalse("InternalError occurred", eventHandler.internalError); }
@Test public void testTimeoutWhileSuccessFinishing() throws Exception { MockEventHandler eventHandler = new MockEventHandler(); TaskAttemptImpl taImpl = createTaskAttemptImpl(eventHandler); taImpl.handle(new TaskAttemptEvent(taImpl.getID(), TaskAttemptEventType.TA_DONE)); assertEquals("Task attempt is not in RUNNING state", taImpl.getState(), TaskAttemptState.SUCCEEDED); assertEquals("Task attempt's internal state is not " + "SUCCESS_FINISHING_CONTAINER", taImpl.getInternalState(), TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER); // If the task stays in SUCCESS_FINISHING_CONTAINER for too long, // TaskAttemptListenerImpl will time out the attempt. taImpl.handle(new TaskAttemptEvent(taImpl.getID(), TaskAttemptEventType.TA_TIMED_OUT)); assertEquals("Task attempt is not in RUNNING state", taImpl.getState(), TaskAttemptState.SUCCEEDED); assertEquals("Task attempt's internal state is not " + "SUCCESS_CONTAINER_CLEANUP", taImpl.getInternalState(), TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP); assertFalse("InternalError occurred", eventHandler.internalError); }
@Test public void testTimeoutWhileFailFinishing() throws Exception { MockEventHandler eventHandler = new MockEventHandler(); TaskAttemptImpl taImpl = createTaskAttemptImpl(eventHandler); taImpl.handle(new TaskAttemptEvent(taImpl.getID(), TaskAttemptEventType.TA_FAILMSG)); assertEquals("Task attempt is not in RUNNING state", taImpl.getState(), TaskAttemptState.FAILED); assertEquals("Task attempt's internal state is not " + "FAIL_FINISHING_CONTAINER", taImpl.getInternalState(), TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER); // If the task stays in FAIL_FINISHING_CONTAINER for too long, // TaskAttemptListenerImpl will time out the attempt. taImpl.handle(new TaskAttemptEvent(taImpl.getID(), TaskAttemptEventType.TA_TIMED_OUT)); assertEquals("Task attempt's internal state is not FAIL_CONTAINER_CLEANUP", taImpl.getInternalState(), TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP); assertFalse("InternalError occurred", eventHandler.internalError); }
@Override public void done(TaskAttemptID taskAttemptID) throws IOException { LOG.info("Done acknowledgement from " + taskAttemptID.toString()); org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID = TypeConverter.toYarn(taskAttemptID); taskHeartbeatHandler.progressing(attemptID); context.getEventHandler().handle( new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_DONE)); }
@Override public void fatalError(TaskAttemptID taskAttemptID, String msg) throws IOException { // This happens only in Child and in the Task. LOG.fatal("Task: " + taskAttemptID + " - exited : " + msg); reportDiagnosticInfo(taskAttemptID, "Error: " + msg); org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID = TypeConverter.toYarn(taskAttemptID); context.getEventHandler().handle( new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILMSG)); }
@Override public void fsError(TaskAttemptID taskAttemptID, String message) throws IOException { // This happens only in Child. LOG.fatal("Task: " + taskAttemptID + " - failed due to FSError: " + message); reportDiagnosticInfo(taskAttemptID, "FSError: " + message); org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID = TypeConverter.toYarn(taskAttemptID); context.getEventHandler().handle( new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILMSG)); }
@SuppressWarnings("unchecked") protected void handleTaskAbort(CommitterTaskAbortEvent event) { try { committer.abortTask(event.getAttemptContext()); } catch (Exception e) { LOG.warn("Task cleanup failed for attempt " + event.getAttemptID(), e); } context.getEventHandler().handle( new TaskAttemptEvent(event.getAttemptID(), TaskAttemptEventType.TA_CLEANUP_DONE)); }
@Override public void transition(JobImpl job, JobEvent event) { //get number of shuffling reduces int shufflingReduceTasks = 0; for (TaskId taskId : job.reduceTasks) { Task task = job.tasks.get(taskId); if (TaskState.RUNNING.equals(task.getState())) { for(TaskAttempt attempt : task.getAttempts().values()) { if(attempt.getPhase() == Phase.SHUFFLE) { shufflingReduceTasks++; break; } } } } JobTaskAttemptFetchFailureEvent fetchfailureEvent = (JobTaskAttemptFetchFailureEvent) event; for (org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId mapId : fetchfailureEvent.getMaps()) { Integer fetchFailures = job.fetchFailuresMapping.get(mapId); fetchFailures = (fetchFailures == null) ? 1 : (fetchFailures+1); job.fetchFailuresMapping.put(mapId, fetchFailures); float failureRate = shufflingReduceTasks == 0 ? 1.0f : (float) fetchFailures / shufflingReduceTasks; // declare faulty if fetch-failures >= max-allowed-failures if (fetchFailures >= job.getMaxFetchFailuresNotifications() && failureRate >= job.getMaxAllowedFetchFailuresFraction()) { LOG.info("Too many fetch-failures for output of task attempt: " + mapId + " ... raising fetch failure to map"); job.eventHandler.handle(new TaskAttemptEvent(mapId, TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE)); job.fetchFailuresMapping.remove(mapId); } } }
private void addAndScheduleAttempt(Avataar avataar) { TaskAttempt attempt = addAttempt(avataar); inProgressAttempts.add(attempt.getID()); //schedule the nextAttemptNumber if (failedAttempts.size() > 0) { eventHandler.handle(new TaskAttemptEvent(attempt.getID(), TaskAttemptEventType.TA_RESCHEDULE)); } else { eventHandler.handle(new TaskAttemptEvent(attempt.getID(), TaskAttemptEventType.TA_SCHEDULE)); } }
@SuppressWarnings("unchecked") void sendContainerLaunchFailedMsg(TaskAttemptId taskAttemptID, String message) { LOG.error(message); context.getEventHandler().handle( new TaskAttemptDiagnosticsUpdateEvent(taskAttemptID, message)); context.getEventHandler().handle( new TaskAttemptEvent(taskAttemptID, TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED)); }
@Override public void run() { while (!stopped && !Thread.currentThread().isInterrupted()) { Iterator<Map.Entry<TaskAttemptId, ReportTime>> iterator = runningAttempts.entrySet().iterator(); // avoid calculating current time everytime in loop long currentTime = clock.getTime(); while (iterator.hasNext()) { Map.Entry<TaskAttemptId, ReportTime> entry = iterator.next(); boolean taskTimedOut = (taskTimeOut > 0) && (currentTime > (entry.getValue().getLastProgress() + taskTimeOut)); if(taskTimedOut) { // task is lost, remove from the list and raise lost event iterator.remove(); eventHandler.handle(new TaskAttemptDiagnosticsUpdateEvent(entry .getKey(), "AttemptID:" + entry.getKey().toString() + " Timed out after " + taskTimeOut / 1000 + " secs")); eventHandler.handle(new TaskAttemptEvent(entry.getKey(), TaskAttemptEventType.TA_TIMED_OUT)); } } try { Thread.sleep(taskTimeOutCheckInterval); } catch (InterruptedException e) { LOG.info("TaskHeartbeatHandler thread interrupted"); break; } } }
@VisibleForTesting public TaskAttemptEvent createContainerFinishedEvent(ContainerStatus cont, TaskAttemptId attemptID) { if (cont.getExitStatus() == ContainerExitStatus.ABORTED || cont.getExitStatus() == ContainerExitStatus.PREEMPTED) { // killed by framework return new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_KILL); } else { return new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_CONTAINER_COMPLETED); } }
@Test public void testCommitPending() throws Exception { MRApp app = new MRApp(1, 0, false, this.getClass().getName(), true); Job job = app.submit(new Configuration()); app.waitForState(job, JobState.RUNNING); Assert.assertEquals("Num tasks not correct", 1, job.getTasks().size()); Iterator<Task> it = job.getTasks().values().iterator(); Task task = it.next(); app.waitForState(task, TaskState.RUNNING); TaskAttempt attempt = task.getAttempts().values().iterator().next(); app.waitForState(attempt, TaskAttemptState.RUNNING); //send the commit pending signal to the task app.getContext().getEventHandler().handle( new TaskAttemptEvent( attempt.getID(), TaskAttemptEventType.TA_COMMIT_PENDING)); //wait for first attempt to commit pending app.waitForState(attempt, TaskAttemptState.COMMIT_PENDING); //re-send the commit pending signal to the task app.getContext().getEventHandler().handle( new TaskAttemptEvent( attempt.getID(), TaskAttemptEventType.TA_COMMIT_PENDING)); //the task attempt should be still at COMMIT_PENDING app.waitForState(attempt, TaskAttemptState.COMMIT_PENDING); //send the done signal to the task app.getContext().getEventHandler().handle( new TaskAttemptEvent( task.getAttempts().values().iterator().next().getID(), TaskAttemptEventType.TA_DONE)); app.waitForState(job, JobState.SUCCEEDED); }
@Override protected void attemptLaunched(TaskAttemptId attemptID) { getContext().getEventHandler().handle( new TaskAttemptDiagnosticsUpdateEvent(attemptID, "Test Diagnostic Event")); getContext().getEventHandler().handle( new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILMSG)); }
protected void attemptLaunched(TaskAttemptId attemptID) { if (autoComplete) { // send the done event getContext().getEventHandler().handle( new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_DONE)); } }
@Test public void testTaskFailWithUnusedContainer() throws Exception { MRApp app = new MRAppWithFailingTaskAndUnusedContainer(); Configuration conf = new Configuration(); int maxAttempts = 1; conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, maxAttempts); // disable uberization (requires entire job to be reattempted, so max for // subtask attempts is overridden to 1) conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false); Job job = app.submit(conf); app.waitForState(job, JobState.RUNNING); Map<TaskId, Task> tasks = job.getTasks(); Assert.assertEquals("Num tasks is not correct", 1, tasks.size()); Task task = tasks.values().iterator().next(); app.waitForState(task, TaskState.SCHEDULED); Map<TaskAttemptId, TaskAttempt> attempts = tasks.values().iterator() .next().getAttempts(); Assert.assertEquals("Num attempts is not correct", maxAttempts, attempts .size()); TaskAttempt attempt = attempts.values().iterator().next(); app.waitForInternalState((TaskAttemptImpl) attempt, TaskAttemptStateInternal.ASSIGNED); app.getDispatcher().getEventHandler().handle( new TaskAttemptEvent(attempt.getID(), TaskAttemptEventType.TA_CONTAINER_COMPLETED)); app.waitForState(job, JobState.FAILED); }
@Override protected ContainerLauncher createContainerLauncher(AppContext context) { return new ContainerLauncherImpl(context) { @Override public void handle(ContainerLauncherEvent event) { switch (event.getType()) { case CONTAINER_REMOTE_LAUNCH: super.handle(event); // Unused event and container. break; case CONTAINER_REMOTE_CLEANUP: getContext().getEventHandler().handle( new TaskAttemptEvent(event.getTaskAttemptID(), TaskAttemptEventType.TA_CONTAINER_CLEANED)); break; } } @Override public ContainerManagementProtocolProxyData getCMProxy( String containerMgrBindAddr, ContainerId containerId) throws IOException { try { synchronized (this) { wait(); // Just hang the thread simulating a very slow NM. } } catch (InterruptedException e) { e.printStackTrace(); } return null; } }; }
@Override protected void attemptLaunched(TaskAttemptId attemptID) { if (attemptID.getTaskId().getId() == 0) {//check if it is first task // send the Fail event getContext().getEventHandler().handle( new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILMSG)); } else { getContext().getEventHandler().handle( new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_DONE)); } }
@Override protected void attemptLaunched(TaskAttemptId attemptID) { if (attemptID.getTaskId().getId() == 0 && attemptID.getId() == 0) { //check if it is first task's first attempt // send the Fail event getContext().getEventHandler().handle( new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILMSG)); } else { getContext().getEventHandler().handle( new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_DONE)); } }
private void finishTask(DrainDispatcher rmDispatcher, MockNM node, MRApp mrApp, Task task) throws Exception { TaskAttempt attempt = task.getAttempts().values().iterator().next(); List<ContainerStatus> contStatus = new ArrayList<ContainerStatus>(1); contStatus.add(ContainerStatus.newInstance(attempt.getAssignedContainerID(), ContainerState.COMPLETE, "", 0)); Map<ApplicationId,List<ContainerStatus>> statusUpdate = new HashMap<ApplicationId,List<ContainerStatus>>(1); statusUpdate.put(mrApp.getAppID(), contStatus); node.nodeHeartbeat(statusUpdate, true); rmDispatcher.await(); mrApp.getContext().getEventHandler().handle( new TaskAttemptEvent(attempt.getID(), TaskAttemptEventType.TA_DONE)); mrApp.waitForState(task, TaskState.SUCCEEDED); }
private static TaskAttempt[] makeFirstAttemptWin( EventHandler appEventHandler, Task speculatedTask) { // finish 1st TA, 2nd will be killed Collection<TaskAttempt> attempts = speculatedTask.getAttempts().values(); TaskAttempt[] ta = new TaskAttempt[attempts.size()]; attempts.toArray(ta); appEventHandler.handle( new TaskAttemptEvent(ta[0].getID(), TaskAttemptEventType.TA_DONE)); appEventHandler.handle(new TaskAttemptEvent(ta[0].getID(), TaskAttemptEventType.TA_CONTAINER_CLEANED)); return ta; }
@SuppressWarnings("unchecked") @Override protected void attemptLaunched(TaskAttemptId attemptID) { if (attemptID.getTaskId().getId() == 0 && attemptID.getId() == 0) { getContext().getEventHandler().handle( new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILMSG)); } else { getContext().getEventHandler().handle( new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_DONE)); } }
@SuppressWarnings("unchecked") @Override protected void attemptLaunched(TaskAttemptId attemptID) { if (attemptID.getTaskId().getId() == 0) { getContext().getEventHandler().handle( new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILMSG)); } else { getContext().getEventHandler().handle( new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_DONE)); } }
@SuppressWarnings("unchecked") @Override protected void attemptLaunched(TaskAttemptId attemptID) { if (attemptID.getTaskId().getId() == 0) { getContext().getEventHandler().handle( new JobEvent(attemptID.getTaskId().getJobId(), JobEventType.JOB_KILL)); } else { getContext().getEventHandler().handle( new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_DONE)); } }
@Override public void preempted(TaskAttemptID taskAttemptID, TaskStatus taskStatus) throws IOException, InterruptedException { LOG.info("Preempted state update from " + taskAttemptID.toString()); // An attempt is telling us that it got preempted. org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID = TypeConverter.toYarn(taskAttemptID); preemptionPolicy.reportSuccessfulPreemption(attemptID); taskHeartbeatHandler.progressing(attemptID); context.getEventHandler().handle( new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_PREEMPTED)); }