/**
 * TaskAttempt is reporting that it is in commit_pending and it is waiting for
 * the commit Response.
 *
 * <br>
 * Commit is a two-phase protocol. First the attempt informs the
 * ApplicationMaster that it is
 * {@link #commitPending(TaskAttemptID, TaskStatus)}. Then it repeatedly polls
 * the ApplicationMaster whether it {@link #canCommit(TaskAttemptID)}. This is
 * a legacy from the centralized commit protocol handling by the JobTracker.
 */
@Override
public void commitPending(TaskAttemptID taskAttemptID, TaskStatus taskStatus)
    throws IOException, InterruptedException {
  LOG.info("Commit-pending state update from " + taskAttemptID.toString());
  // An attempt is asking if it can commit its output. This can be decided
  // only by the task which is managing the multiple attempts. So redirect the
  // request there.
  org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID =
      TypeConverter.toYarn(taskAttemptID);

  taskHeartbeatHandler.progressing(attemptID);
  // Ignorable TaskStatus? - since a task will send a LastStatusUpdate
  context.getEventHandler().handle(
      new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_COMMIT_PENDING));
}
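For context, here is a minimal task-side sketch of the two-phase protocol described above, using the TaskUmbilicalProtocol calls the javadoc links to. The helper name, loop shape, and poll interval are illustrative assumptions; the real polling loop lives inside the task runtime, not in this listener:

import java.io.IOException;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapred.TaskStatus;
import org.apache.hadoop.mapred.TaskUmbilicalProtocol;

// Illustrative only: phase 1 announces commit-pending, phase 2 polls the AM.
static void awaitCommitApproval(TaskUmbilicalProtocol umbilical,
    TaskAttemptID id, TaskStatus status)
    throws IOException, InterruptedException {
  umbilical.commitPending(id, status);   // phase 1: announce commit-pending
  while (!umbilical.canCommit(id)) {     // phase 2: poll the AM for approval
    Thread.sleep(1000);                  // assumed poll interval
  }
  // approved: the attempt may now promote its output
}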
@SuppressWarnings("unchecked") @Override public void transition(TaskAttemptImpl taskAttempt, TaskAttemptEvent event) { // Tell any speculator that we're requesting a container taskAttempt.eventHandler.handle (new SpeculatorEvent(taskAttempt.getID().getTaskId(), +1)); //request for container if (rescheduled) { taskAttempt.eventHandler.handle( ContainerRequestEvent.createContainerRequestEventForFailedContainer( taskAttempt.attemptId, taskAttempt.resourceCapability)); } else { taskAttempt.eventHandler.handle(new ContainerRequestEvent( taskAttempt.attemptId, taskAttempt.resourceCapability, taskAttempt.dataLocalHosts.toArray( new String[taskAttempt.dataLocalHosts.size()]), taskAttempt.dataLocalRacks.toArray( new String[taskAttempt.dataLocalRacks.size()]))); } }
@SuppressWarnings("unchecked") @Override public void transition(TaskAttemptImpl taskAttempt, TaskAttemptEvent event) { //set the finish time taskAttempt.setFinishTime(); taskAttempt.eventHandler.handle( createJobCounterUpdateEventTASucceeded(taskAttempt)); taskAttempt.logAttemptFinishedEvent(TaskAttemptStateInternal.SUCCEEDED); taskAttempt.eventHandler.handle(new TaskTAttemptEvent( taskAttempt.attemptId, TaskEventType.T_ATTEMPT_SUCCEEDED)); taskAttempt.eventHandler.handle (new SpeculatorEvent (taskAttempt.reportedStatus, taskAttempt.clock.getTime())); }
@SuppressWarnings("unchecked") @Override public void transition(TaskAttemptImpl taskAttempt, TaskAttemptEvent event) { // set the finish time taskAttempt.setFinishTime(); if (taskAttempt.getLaunchTime() != 0) { taskAttempt.eventHandler .handle(createJobCounterUpdateEventTAFailed(taskAttempt, false)); TaskAttemptUnsuccessfulCompletionEvent tauce = createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt, TaskAttemptStateInternal.FAILED); taskAttempt.eventHandler.handle(new JobHistoryEvent( taskAttempt.attemptId.getTaskId().getJobId(), tauce)); // taskAttempt.logAttemptFinishedEvent(TaskAttemptStateInternal.FAILED); Not // handling failed map/reduce events. }else { LOG.debug("Not generating HistoryFinish event since start event not " + "generated for taskAttempt: " + taskAttempt.getID()); } taskAttempt.eventHandler.handle(new TaskTAttemptEvent( taskAttempt.attemptId, TaskEventType.T_ATTEMPT_FAILED)); }
@SuppressWarnings("unchecked") @Override public void transition(TaskAttemptImpl taskAttempt, TaskAttemptEvent event) { // too many fetch failure can only happen for map tasks Preconditions .checkArgument(taskAttempt.getID().getTaskId().getTaskType() == TaskType.MAP); //add to diagnostic taskAttempt.addDiagnosticInfo("Too Many fetch failures.Failing the attempt"); if (taskAttempt.getLaunchTime() != 0) { taskAttempt.eventHandler .handle(createJobCounterUpdateEventTAFailed(taskAttempt, true)); TaskAttemptUnsuccessfulCompletionEvent tauce = createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt, TaskAttemptStateInternal.FAILED); taskAttempt.eventHandler.handle(new JobHistoryEvent( taskAttempt.attemptId.getTaskId().getJobId(), tauce)); }else { LOG.debug("Not generating HistoryFinish event since start event not " + "generated for taskAttempt: " + taskAttempt.getID()); } taskAttempt.eventHandler.handle(new TaskTAttemptEvent( taskAttempt.attemptId, TaskEventType.T_ATTEMPT_FAILED)); }
@SuppressWarnings("unchecked") @Override public void transition(TaskAttemptImpl taskAttempt, TaskAttemptEvent event) { // unregister it to TaskAttemptListener so that it stops listening // for it taskAttempt.taskAttemptListener.unregister( taskAttempt.attemptId, taskAttempt.jvmID); if (event instanceof TaskAttemptKillEvent) { taskAttempt.addDiagnosticInfo( ((TaskAttemptKillEvent) event).getMessage()); } taskAttempt.reportedStatus.progress = 1.0f; taskAttempt.updateProgressSplits(); //send the cleanup event to containerLauncher taskAttempt.eventHandler.handle(new ContainerLauncherEvent( taskAttempt.attemptId, taskAttempt.container.getId(), StringInterner .weakIntern(taskAttempt.container.getNodeId().toString()), taskAttempt.container.getContainerToken(), ContainerLauncher.EventType.CONTAINER_REMOTE_CLEANUP)); }
@SuppressWarnings("unchecked") @Override public KillTaskAttemptResponse killTaskAttempt( KillTaskAttemptRequest request) throws IOException { TaskAttemptId taskAttemptId = request.getTaskAttemptId(); UserGroupInformation callerUGI = UserGroupInformation.getCurrentUser(); String message = "Kill task attempt " + taskAttemptId + " received from " + callerUGI + " at " + Server.getRemoteAddress(); LOG.info(message); verifyAndGetAttempt(taskAttemptId, JobACL.MODIFY_JOB); appContext.getEventHandler().handle( new TaskAttemptDiagnosticsUpdateEvent(taskAttemptId, message)); appContext.getEventHandler().handle( new TaskAttemptEvent(taskAttemptId, TaskAttemptEventType.TA_KILL)); KillTaskAttemptResponse response = recordFactory.newRecordInstance(KillTaskAttemptResponse.class); return response; }
@SuppressWarnings("unchecked") @Override public FailTaskAttemptResponse failTaskAttempt( FailTaskAttemptRequest request) throws IOException { TaskAttemptId taskAttemptId = request.getTaskAttemptId(); UserGroupInformation callerUGI = UserGroupInformation.getCurrentUser(); String message = "Fail task attempt " + taskAttemptId + " received from " + callerUGI + " at " + Server.getRemoteAddress(); LOG.info(message); verifyAndGetAttempt(taskAttemptId, JobACL.MODIFY_JOB); appContext.getEventHandler().handle( new TaskAttemptDiagnosticsUpdateEvent(taskAttemptId, message)); appContext.getEventHandler().handle( new TaskAttemptEvent(taskAttemptId, TaskAttemptEventType.TA_FAILMSG)); FailTaskAttemptResponse response = recordFactory. newRecordInstance(FailTaskAttemptResponse.class); return response; }
@Override
protected void dispatch(Event event) {
  if (event instanceof TaskAttemptEvent) {
    TaskAttemptEvent attemptEvent = (TaskAttemptEvent) event;
    TaskAttemptId attemptID = attemptEvent.getTaskAttemptID();
    if (attemptEvent.getType() == this.attemptEventTypeToWait
        && attemptID.getTaskId().getId() == 0 && attemptID.getId() == 0) {
      try {
        latch.await();
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
  }
  super.dispatch(event);
}
@Override
protected void attemptLaunched(TaskAttemptId attemptID) {
  if (attemptID.getTaskId().getId() == 0 && attemptID.getId() == 0) {
    // this blocks the first task's first attempt;
    // the subsequent ones are completed
    try {
      latch.await();
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  } else {
    getContext().getEventHandler().handle(
        new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_DONE));
  }
}
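A sketch of how such a latch-gated MRApp is typically driven from a test; the subclass name and task counts are assumptions for illustration:

import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.app.job.Job;

// 'BlockingMRApp' is a hypothetical subclass wiring the latch into
// attemptLaunched() as shown above.
CountDownLatch latch = new CountDownLatch(1);
MRApp app = new BlockingMRApp(2, 0, latch);
Job job = app.submit(new Configuration());
// ... assert on intermediate job state while the first attempt is blocked ...
latch.countDown();                      // release the blocked attempt
app.waitForState(job, JobState.SUCCEEDED);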
@Override
public void handle(ContainerLauncherEvent event) {
  switch (event.getType()) {
  case CONTAINER_REMOTE_LAUNCH:
    getContext().getEventHandler().handle(
        new TaskAttemptContainerLaunchedEvent(event.getTaskAttemptID(),
            shufflePort));
    attemptLaunched(event.getTaskAttemptID());
    break;
  case CONTAINER_REMOTE_CLEANUP:
    getContext().getEventHandler().handle(
        new TaskAttemptEvent(event.getTaskAttemptID(),
            TaskAttemptEventType.TA_CONTAINER_CLEANED));
    break;
  }
}
@Override
public void fatalError(TaskAttemptID taskAttemptID, String msg)
    throws IOException {
  // This happens only in Child and in the Task.
  LOG.fatal("Task: " + taskAttemptID + " - exited : " + msg);
  reportDiagnosticInfo(taskAttemptID, "Error: " + msg);

  org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID =
      TypeConverter.toYarn(taskAttemptID);

  // handling checkpoints
  preemptionPolicy.handleFailedContainer(attemptID);

  context.getEventHandler().handle(
      new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILMSG));
}
@SuppressWarnings("unchecked") @Override public TaskAttemptStateInternal transition(TaskAttemptImpl taskAttempt, TaskAttemptEvent event) { taskAttempt.appContext.getTaskAttemptFinishingMonitor().unregister( taskAttempt.attemptId); sendContainerCleanup(taskAttempt, event); if(taskAttempt.getID().getTaskId().getTaskType() == TaskType.REDUCE) { // after a reduce task has succeeded, its outputs are in safe in HDFS. // logically such a task should not be killed. we only come here when // there is a race condition in the event queue. E.g. some logic sends // a kill request to this attempt when the successful completion event // for this task is already in the event queue. so the kill event will // get executed immediately after the attempt is marked successful and // result in this transition being exercised. // ignore this for reduce tasks LOG.info("Ignoring killed event for successful reduce task attempt" + taskAttempt.getID().toString()); return TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP; } else { return TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP; } }
@SuppressWarnings("unchecked") @Override public void transition(TaskAttemptImpl taskAttempt, TaskAttemptEvent event) { taskAttempt.setFinishTime(); taskAttempt.taskAttemptListener.unregister( taskAttempt.attemptId, taskAttempt.jvmID); taskAttempt.eventHandler.handle(new ContainerLauncherEvent( taskAttempt.attemptId, taskAttempt.getAssignedContainerID(), taskAttempt.getAssignedContainerMgrAddress(), taskAttempt.container.getContainerToken(), ContainerLauncher.EventType.CONTAINER_REMOTE_CLEANUP)); taskAttempt.eventHandler.handle(new TaskTAttemptEvent( taskAttempt.attemptId, TaskEventType.T_ATTEMPT_KILLED)); }
@SuppressWarnings("unchecked") @Override public FailTaskAttemptResponse failTaskAttempt( FailTaskAttemptRequest request) throws IOException { TaskAttemptId taskAttemptId = request.getTaskAttemptId(); UserGroupInformation callerUGI = UserGroupInformation.getCurrentUser(); String message = "Fail task attempt " + taskAttemptId + " received from " + callerUGI + " at " + Server.getRemoteAddress(); LOG.info(message); verifyAndGetAttempt(taskAttemptId, JobACL.MODIFY_JOB); appContext.getEventHandler().handle( new TaskAttemptDiagnosticsUpdateEvent(taskAttemptId, message)); appContext.getEventHandler().handle( new TaskAttemptEvent(taskAttemptId, TaskAttemptEventType.TA_FAILMSG_BY_CLIENT)); FailTaskAttemptResponse response = recordFactory. newRecordInstance(FailTaskAttemptResponse.class); return response; }
@Test
public void testTaskAttemptDiagnosticEventOnFinishing() throws Exception {
  MockEventHandler eventHandler = new MockEventHandler();
  TaskAttemptImpl taImpl = createTaskAttemptImpl(eventHandler);

  taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
      TaskAttemptEventType.TA_DONE));

  assertEquals("Task attempt is not in SUCCEEDED state",
      TaskAttemptState.SUCCEEDED, taImpl.getState());
  assertEquals("Task attempt's internal state is not "
      + "SUCCESS_FINISHING_CONTAINER",
      TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER,
      taImpl.getInternalState());

  // TA_DIAGNOSTICS_UPDATE doesn't change the state
  taImpl.handle(new TaskAttemptDiagnosticsUpdateEvent(taImpl.getID(),
      "Task got updated"));
  assertEquals("Task attempt is not in SUCCEEDED state",
      TaskAttemptState.SUCCEEDED, taImpl.getState());
  assertEquals("Task attempt's internal state is not "
      + "SUCCESS_FINISHING_CONTAINER",
      TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER,
      taImpl.getInternalState());

  assertFalse("InternalError occurred", eventHandler.internalError);
}
@Test
public void testTimeoutWhileSuccessFinishing() throws Exception {
  MockEventHandler eventHandler = new MockEventHandler();
  TaskAttemptImpl taImpl = createTaskAttemptImpl(eventHandler);

  taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
      TaskAttemptEventType.TA_DONE));

  assertEquals("Task attempt is not in SUCCEEDED state",
      TaskAttemptState.SUCCEEDED, taImpl.getState());
  assertEquals("Task attempt's internal state is not "
      + "SUCCESS_FINISHING_CONTAINER",
      TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER,
      taImpl.getInternalState());

  // If the task stays in SUCCESS_FINISHING_CONTAINER for too long,
  // TaskAttemptListenerImpl will time out the attempt.
  taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
      TaskAttemptEventType.TA_TIMED_OUT));
  assertEquals("Task attempt is not in SUCCEEDED state",
      TaskAttemptState.SUCCEEDED, taImpl.getState());
  assertEquals("Task attempt's internal state is not "
      + "SUCCESS_CONTAINER_CLEANUP",
      TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP,
      taImpl.getInternalState());

  assertFalse("InternalError occurred", eventHandler.internalError);
}
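In production, the TA_TIMED_OUT event injected manually above is produced by the AM's finishing-attempt monitor. Below is a hedged sketch built on YARN's AbstractLivelinessMonitor; the class name and constructor wiring are illustrative, not the actual TaskAttemptFinishingMonitor source:

import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.AbstractLivelinessMonitor;
import org.apache.hadoop.yarn.util.SystemClock;

// Illustrative monitor: attempts registered on entering a
// *_FINISHING_CONTAINER state get TA_TIMED_OUT if they linger too long.
@SuppressWarnings("rawtypes")
public class FinishingAttemptMonitor
    extends AbstractLivelinessMonitor<TaskAttemptId> {
  private final EventHandler eventHandler;

  public FinishingAttemptMonitor(EventHandler eventHandler) {
    super("FinishingAttemptMonitor", new SystemClock());
    this.eventHandler = eventHandler;
  }

  @SuppressWarnings("unchecked")
  @Override
  protected void expire(TaskAttemptId id) {
    eventHandler.handle(
        new TaskAttemptEvent(id, TaskAttemptEventType.TA_TIMED_OUT));
  }
}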
@Override
public void handle(ContainerLauncherEvent event) {
  switch (event.getType()) {
  case CONTAINER_REMOTE_LAUNCH:
    containerLaunched(event.getTaskAttemptID(), shufflePort);
    attemptLaunched(event.getTaskAttemptID());
    break;
  case CONTAINER_REMOTE_CLEANUP:
    getContext().getEventHandler().handle(
        new TaskAttemptEvent(event.getTaskAttemptID(),
            TaskAttemptEventType.TA_CONTAINER_CLEANED));
    break;
  case CONTAINER_COMPLETED:
    break;
  }
}
@Override
public void done(TaskAttemptID taskAttemptID) throws IOException {
  LOG.info("Done acknowledgement from " + taskAttemptID.toString());

  org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID =
      TypeConverter.toYarn(taskAttemptID);

  taskHeartbeatHandler.progressing(attemptID);

  context.getEventHandler().handle(
      new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_DONE));
}
@Override
public void fatalError(TaskAttemptID taskAttemptID, String msg)
    throws IOException {
  // This happens only in Child and in the Task.
  LOG.fatal("Task: " + taskAttemptID + " - exited : " + msg);
  reportDiagnosticInfo(taskAttemptID, "Error: " + msg);

  org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID =
      TypeConverter.toYarn(taskAttemptID);
  context.getEventHandler().handle(
      new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILMSG));
}
@Override
public void fsError(TaskAttemptID taskAttemptID, String message)
    throws IOException {
  // This happens only in Child.
  LOG.fatal("Task: " + taskAttemptID + " - failed due to FSError: "
      + message);
  reportDiagnosticInfo(taskAttemptID, "FSError: " + message);

  org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID =
      TypeConverter.toYarn(taskAttemptID);
  context.getEventHandler().handle(
      new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILMSG));
}
@SuppressWarnings("unchecked") protected void handleTaskAbort(CommitterTaskAbortEvent event) { try { committer.abortTask(event.getAttemptContext()); } catch (Exception e) { LOG.warn("Task cleanup failed for attempt " + event.getAttemptID(), e); } context.getEventHandler().handle( new TaskAttemptEvent(event.getAttemptID(), TaskAttemptEventType.TA_CLEANUP_DONE)); }
@SuppressWarnings("unchecked") @Override public void handle(TaskAttemptEvent event) { if (LOG.isDebugEnabled()) { LOG.debug("Processing " + event.getTaskAttemptID() + " of type " + event.getType()); } writeLock.lock(); try { final TaskAttemptStateInternal oldState = getInternalState() ; try { stateMachine.doTransition(event.getType(), event); } catch (InvalidStateTransitonException e) { LOG.error("Can't handle this event at current state for " + this.attemptId, e); eventHandler.handle(new JobDiagnosticsUpdateEvent( this.attemptId.getTaskId().getJobId(), "Invalid event " + event.getType() + " on TaskAttempt " + this.attemptId)); eventHandler.handle(new JobEvent(this.attemptId.getTaskId().getJobId(), JobEventType.INTERNAL_ERROR)); } if (oldState != getInternalState()) { LOG.info(attemptId + " TaskAttempt Transitioned from " + oldState + " to " + getInternalState()); } } finally { writeLock.unlock(); } }
@SuppressWarnings({ "unchecked" }) @Override public void transition(final TaskAttemptImpl taskAttempt, TaskAttemptEvent event) { final TaskAttemptContainerAssignedEvent cEvent = (TaskAttemptContainerAssignedEvent) event; Container container = cEvent.getContainer(); taskAttempt.container = container; // this is a _real_ Task (classic Hadoop mapred flavor): taskAttempt.remoteTask = taskAttempt.createRemoteTask(); taskAttempt.jvmID = new WrappedJvmID(taskAttempt.remoteTask.getTaskID().getJobID(), taskAttempt.remoteTask.isMapTask(), taskAttempt.container.getId().getContainerId()); taskAttempt.taskAttemptListener.registerPendingTask( taskAttempt.remoteTask, taskAttempt.jvmID); taskAttempt.computeRackAndLocality(); //launch the container //create the container object to be launched for a given Task attempt ContainerLaunchContext launchContext = createContainerLaunchContext( cEvent.getApplicationACLs(), taskAttempt.conf, taskAttempt.jobToken, taskAttempt.remoteTask, taskAttempt.oldJobId, taskAttempt.jvmID, taskAttempt.taskAttemptListener, taskAttempt.credentials); taskAttempt.eventHandler .handle(new ContainerRemoteLaunchEvent(taskAttempt.attemptId, launchContext, container, taskAttempt.remoteTask)); // send event to speculator that our container needs are satisfied taskAttempt.eventHandler.handle (new SpeculatorEvent(taskAttempt.getID().getTaskId(), -1)); }
@SuppressWarnings("unchecked") @Override public void transition(TaskAttemptImpl taskAttempt, TaskAttemptEvent evnt) { TaskAttemptContainerLaunchedEvent event = (TaskAttemptContainerLaunchedEvent) evnt; //set the launch time taskAttempt.launchTime = taskAttempt.clock.getTime(); taskAttempt.shufflePort = event.getShufflePort(); // register it to TaskAttemptListener so that it can start monitoring it. taskAttempt.taskAttemptListener .registerLaunchedTask(taskAttempt.attemptId, taskAttempt.jvmID); //TODO Resolve to host / IP in case of a local address. InetSocketAddress nodeHttpInetAddr = // TODO: Costly to create sock-addr? NetUtils.createSocketAddr(taskAttempt.container.getNodeHttpAddress()); taskAttempt.trackerName = nodeHttpInetAddr.getHostName(); taskAttempt.httpPort = nodeHttpInetAddr.getPort(); taskAttempt.sendLaunchedEvents(); taskAttempt.eventHandler.handle (new SpeculatorEvent (taskAttempt.attemptId, true, taskAttempt.clock.getTime())); //make remoteTask reference as null as it is no more needed //and free up the memory taskAttempt.remoteTask = null; //tell the Task that attempt has started taskAttempt.eventHandler.handle(new TaskTAttemptEvent( taskAttempt.attemptId, TaskEventType.T_ATTEMPT_LAUNCHED)); }
@SuppressWarnings("unchecked") @Override public void transition(TaskAttemptImpl taskAttempt, TaskAttemptEvent event) { taskAttempt.eventHandler.handle(new TaskTAttemptEvent( taskAttempt.attemptId, TaskEventType.T_ATTEMPT_COMMIT_PENDING)); }
@SuppressWarnings("unchecked") @Override public void transition(TaskAttemptImpl taskAttempt, TaskAttemptEvent event) { TaskAttemptContext taskContext = new TaskAttemptContextImpl(taskAttempt.conf, TypeConverter.fromYarn(taskAttempt.attemptId)); taskAttempt.eventHandler.handle(new CommitterTaskAbortEvent( taskAttempt.attemptId, taskContext)); }
@Override
public TaskAttemptStateInternal transition(TaskAttemptImpl taskAttempt,
    TaskAttemptEvent event) {
  TaskAttemptRecoverEvent tare = (TaskAttemptRecoverEvent) event;
  return taskAttempt.recover(tare.getTaskAttemptInfo(), tare.getCommitter(),
      tare.getRecoverOutput());
}
@SuppressWarnings("unchecked") @Override public TaskAttemptStateInternal transition(TaskAttemptImpl taskAttempt, TaskAttemptEvent event) { if(taskAttempt.getID().getTaskId().getTaskType() == TaskType.REDUCE) { // after a reduce task has succeeded, its outputs are in safe in HDFS. // logically such a task should not be killed. we only come here when // there is a race condition in the event queue. E.g. some logic sends // a kill request to this attempt when the successful completion event // for this task is already in the event queue. so the kill event will // get executed immediately after the attempt is marked successful and // result in this transition being exercised. // ignore this for reduce tasks LOG.info("Ignoring killed event for successful reduce task attempt" + taskAttempt.getID().toString()); return TaskAttemptStateInternal.SUCCEEDED; } if(event instanceof TaskAttemptKillEvent) { TaskAttemptKillEvent msgEvent = (TaskAttemptKillEvent) event; //add to diagnostic taskAttempt.addDiagnosticInfo(msgEvent.getMessage()); } // not setting a finish time since it was set on success assert (taskAttempt.getFinishTime() != 0); assert (taskAttempt.getLaunchTime() != 0); taskAttempt.eventHandler .handle(createJobCounterUpdateEventTAKilled(taskAttempt, true)); TaskAttemptUnsuccessfulCompletionEvent tauce = createTaskAttemptUnsuccessfulCompletionEvent( taskAttempt, TaskAttemptStateInternal.KILLED); taskAttempt.eventHandler.handle(new JobHistoryEvent(taskAttempt.attemptId .getTaskId().getJobId(), tauce)); taskAttempt.eventHandler.handle(new TaskTAttemptEvent( taskAttempt.attemptId, TaskEventType.T_ATTEMPT_KILLED)); return TaskAttemptStateInternal.KILLED; }
@SuppressWarnings("unchecked") @Override public void transition(TaskAttemptImpl taskAttempt, TaskAttemptEvent event) { //set the finish time taskAttempt.setFinishTime(); if (taskAttempt.getLaunchTime() != 0) { taskAttempt.eventHandler .handle(createJobCounterUpdateEventTAKilled(taskAttempt, false)); TaskAttemptUnsuccessfulCompletionEvent tauce = createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt, TaskAttemptStateInternal.KILLED); taskAttempt.eventHandler.handle(new JobHistoryEvent( taskAttempt.attemptId.getTaskId().getJobId(), tauce)); }else { LOG.debug("Not generating HistoryFinish event since start event not " + "generated for taskAttempt: " + taskAttempt.getID()); } if (event instanceof TaskAttemptKillEvent) { taskAttempt.addDiagnosticInfo( ((TaskAttemptKillEvent) event).getMessage()); } // taskAttempt.logAttemptFinishedEvent(TaskAttemptStateInternal.KILLED); Not logging Map/Reduce attempts in case of failure. taskAttempt.eventHandler.handle(new TaskTAttemptEvent( taskAttempt.attemptId, TaskEventType.T_ATTEMPT_KILLED)); }
@SuppressWarnings("unchecked") @Override public void transition(TaskAttemptImpl taskAttempt, TaskAttemptEvent event) { // Status update calls don't really change the state of the attempt. TaskAttemptStatus newReportedStatus = ((TaskAttemptStatusUpdateEvent) event) .getReportedTaskAttemptStatus(); // Now switch the information in the reportedStatus taskAttempt.reportedStatus = newReportedStatus; taskAttempt.reportedStatus.taskState = taskAttempt.getState(); // send event to speculator about the reported status taskAttempt.eventHandler.handle (new SpeculatorEvent (taskAttempt.reportedStatus, taskAttempt.clock.getTime())); taskAttempt.updateProgressSplits(); //if fetch failures are present, send the fetch failure event to job //this only will happen in reduce attempt type if (taskAttempt.reportedStatus.fetchFailedMaps != null && taskAttempt.reportedStatus.fetchFailedMaps.size() > 0) { taskAttempt.eventHandler.handle(new JobTaskAttemptFetchFailureEvent( taskAttempt.attemptId, taskAttempt.reportedStatus.fetchFailedMaps)); } }