/**
 * Handles the kill of a task attempt.
 *
 * <p>Reports the attempt as {@code KILLED}, moves it from the in-progress set
 * to the finished set and, when the task has no successful attempt yet,
 * adds and schedules a fresh {@code VIRGIN} attempt. If the killed attempt
 * was the task's commit attempt, the commit attempt is cleared.
 *
 * @param task  the task whose attempt was killed
 * @param event the triggering event; cast to {@code TaskTAttemptEvent}
 */
@Override
public void transition(TaskImpl task, TaskEvent event) {
  TaskAttemptId taskAttemptId =
      ((TaskTAttemptEvent) event).getTaskAttemptID();
  task.handleTaskAttemptCompletion(
      taskAttemptId, TaskAttemptCompletionEventStatus.KILLED);
  task.finishedAttempts.add(taskAttemptId);
  task.inProgressAttempts.remove(taskAttemptId);
  if (task.successfulAttempt == null) {
    task.addAndScheduleAttempt(Avataar.VIRGIN);
  }
  // Compare attempt ids by value, not by reference: an equal id carried by a
  // different object instance must still clear the commit attempt.
  if (task.commitAttempt != null
      && task.commitAttempt.equals(taskAttemptId)) {
    task.commitAttempt = null;
  }
}
/**
 * Handles the kill of a task attempt, optionally rescheduling its replacement.
 *
 * <p>Reports the attempt as {@code KILLED} and moves it from the in-progress
 * set to the finished set. When the task has no successful attempt yet, a new
 * {@code VIRGIN} attempt is added and scheduled; the reschedule flag passed
 * along is taken from the event when it is a {@code TaskTAttemptKilledEvent}
 * and is {@code false} otherwise. If the killed attempt was the task's commit
 * attempt, the commit attempt is cleared.
 *
 * @param task  the task whose attempt was killed
 * @param event the triggering event; cast to {@code TaskTAttemptEvent}
 */
@Override
public void transition(TaskImpl task, TaskEvent event) {
  TaskAttemptId taskAttemptId =
      ((TaskTAttemptEvent) event).getTaskAttemptID();
  task.handleTaskAttemptCompletion(
      taskAttemptId, TaskAttemptCompletionEventStatus.KILLED);
  task.finishedAttempts.add(taskAttemptId);
  task.inProgressAttempts.remove(taskAttemptId);
  if (task.successfulAttempt == null) {
    boolean rescheduleNewAttempt = false;
    if (event instanceof TaskTAttemptKilledEvent) {
      rescheduleNewAttempt =
          ((TaskTAttemptKilledEvent) event).getRescheduleAttempt();
    }
    task.addAndScheduleAttempt(Avataar.VIRGIN, rescheduleNewAttempt);
  }
  // Compare attempt ids by value, not by reference: an equal id carried by a
  // different object instance must still clear the commit attempt.
  if (task.commitAttempt != null
      && task.commitAttempt.equals(taskAttemptId)) {
    task.commitAttempt = null;
  }
}
private void addAndScheduleAttempt(Avataar avataar) { TaskAttempt attempt = addAttempt(avataar); inProgressAttempts.add(attempt.getID()); //schedule the nextAttemptNumber if (failedAttempts.size() > 0) { eventHandler.handle(new TaskAttemptEvent(attempt.getID(), TaskAttemptEventType.TA_RESCHEDULE)); } else { eventHandler.handle(new TaskAttemptEvent(attempt.getID(), TaskAttemptEventType.TA_SCHEDULE)); } }
/**
 * Creates a new attempt, tags it with the given avataar, registers it in
 * {@code attempts} and advances {@code nextAttemptNumber}.
 *
 * <p>The attempts map is grown lazily: the first attempt is stored in a
 * singleton map, the second attempt triggers a copy into a
 * {@link LinkedHashMap} created with {@code maxAttempts} as its initial
 * capacity, and later attempts are simply added to that map.
 *
 * @param avataar the avataar to assign to the new attempt
 * @return the newly created attempt
 */
private TaskAttemptImpl addAttempt(Avataar avataar) {
  TaskAttemptImpl created = createAttempt();
  created.setAvataar(avataar);
  if (LOG.isDebugEnabled()) {
    LOG.debug("Created attempt " + created.getID());
  }

  int existingCount = attempts.size();
  if (existingCount == 0) {
    attempts =
        Collections.singletonMap(created.getID(), (TaskAttempt) created);
  } else {
    if (existingCount == 1) {
      // The current map may be the unmodifiable singleton; switch to a
      // mutable map before inserting.
      Map<TaskAttemptId, TaskAttempt> grown =
          new LinkedHashMap<TaskAttemptId, TaskAttempt>(maxAttempts);
      grown.putAll(attempts);
      attempts = grown;
    }
    attempts.put(created.getID(), created);
  }

  nextAttemptNumber++;
  return created;
}
@Override public TaskStateInternal transition(TaskImpl task, TaskEvent event) { TaskAttemptId attemptId = null; if (event instanceof TaskTAttemptEvent) { TaskTAttemptEvent castEvent = (TaskTAttemptEvent) event; attemptId = castEvent.getTaskAttemptID(); if (task.getInternalState() == TaskStateInternal.SUCCEEDED && !attemptId.equals(task.successfulAttempt)) { // don't allow a different task attempt to override a previous // succeeded state task.finishedAttempts.add(castEvent.getTaskAttemptID()); task.inProgressAttempts.remove(castEvent.getTaskAttemptID()); return TaskStateInternal.SUCCEEDED; } } // a successful REDUCE task should not be overridden // TODO: consider moving it to MapTaskImpl if (!TaskType.MAP.equals(task.getType())) { LOG.error("Unexpected event for REDUCE task " + event.getType()); task.internalError(event.getType()); } // successful attempt is now killed. reschedule // tell the job about the rescheduling unSucceed(task); task.handleTaskAttemptCompletion(attemptId, TaskAttemptCompletionEventStatus.KILLED); task.eventHandler.handle(new JobMapTaskRescheduledEvent(task.taskId)); // typically we are here because this map task was run on a bad node and // we want to reschedule it on a different node. // Depending on whether there are previous failed attempts or not this // can SCHEDULE or RESCHEDULE the container allocate request. If this // SCHEDULE's then the dataLocal hosts of this taskAttempt will be used // from the map splitInfo. So the bad node might be sent as a location // to the RM. But the RM would ignore that just like it would ignore // currently pending container requests affinitized to bad nodes. task.addAndScheduleAttempt(Avataar.VIRGIN); return TaskStateInternal.SCHEDULED; }
/**
 * Asserts that at least one attempt of {@code mockTask} carries the given
 * {@link Avataar}, and fails the test otherwise.
 *
 * @param avataar the avataar that some attempt is expected to have
 */
private void assertTaskAttemptAvataar(Avataar avataar) {
  for (TaskAttempt taskAttempt : mockTask.getAttempts().values()) {
    if (((TaskAttemptImpl) taskAttempt).getAvataar() == avataar) {
      return;
    }
  }
  // Leading space keeps the message readable ("... virgin task attempt").
  fail("There is no " + (avataar == Avataar.VIRGIN ? "virgin" : "speculative")
      + " task attempt");
}
private void runSpeculativeTaskAttemptSucceeds( TaskEventType firstAttemptFinishEvent) { TaskId taskId = getNewTaskID(); scheduleTaskAttempt(taskId); launchTaskAttempt(getLastAttempt().getAttemptId()); updateLastAttemptState(TaskAttemptState.RUNNING); // Add a speculative task attempt that succeeds mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(), TaskEventType.T_ADD_SPEC_ATTEMPT)); launchTaskAttempt(getLastAttempt().getAttemptId()); commitTaskAttempt(getLastAttempt().getAttemptId()); mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(), TaskEventType.T_ATTEMPT_SUCCEEDED)); // The task should now have succeeded assertTaskSucceededState(); // Now complete the first task attempt, after the second has succeeded mockTask.handle(new TaskTAttemptEvent(taskAttempts.get(0).getAttemptId(), firstAttemptFinishEvent)); // The task should still be in the succeeded state assertTaskSucceededState(); // The task should contain speculative a task attempt assertTaskAttemptAvataar(Avataar.SPECULATIVE); }