@SuppressWarnings("unchecked") @Override public void transition(TaskAttemptImpl taskAttempt, TaskAttemptEvent event) { // unregister it to TaskAttemptListener so that it stops listening // for it taskAttempt.taskAttemptListener.unregister( taskAttempt.attemptId, taskAttempt.jvmID); if (event instanceof TaskAttemptKillEvent) { taskAttempt.addDiagnosticInfo( ((TaskAttemptKillEvent) event).getMessage()); } taskAttempt.reportedStatus.progress = 1.0f; taskAttempt.updateProgressSplits(); //send the cleanup event to containerLauncher taskAttempt.eventHandler.handle(new ContainerLauncherEvent( taskAttempt.attemptId, taskAttempt.container.getId(), StringInterner .weakIntern(taskAttempt.container.getNodeId().toString()), taskAttempt.container.getContainerToken(), ContainerLauncher.EventType.CONTAINER_REMOTE_CLEANUP)); }
private void actOnUnusableNode(NodeId nodeId, NodeState nodeState) { // rerun previously successful map tasks List<TaskAttemptId> taskAttemptIdList = nodesToSucceededTaskAttempts.get(nodeId); if(taskAttemptIdList != null) { String mesg = "TaskAttempt killed because it ran on unusable node " + nodeId; for(TaskAttemptId id : taskAttemptIdList) { if(TaskType.MAP == id.getTaskId().getTaskType()) { // reschedule only map tasks because their outputs maybe unusable LOG.info(mesg + ". AttemptId:" + id); eventHandler.handle(new TaskAttemptKillEvent(id, mesg)); } } } // currently running task attempts on unusable nodes are handled in // RMContainerAllocator }
@Override public void transition(TaskImpl task, TaskEvent event) { TaskTAttemptEvent ev = (TaskTAttemptEvent) event; // The nextAttemptNumber is commit pending, decide on set the commitAttempt TaskAttemptId attemptID = ev.getTaskAttemptID(); if (task.commitAttempt == null) { // TODO: validate attemptID task.commitAttempt = attemptID; LOG.info(attemptID + " given a go for committing the task output."); } else { // Don't think this can be a pluggable decision, so simply raise an // event for the TaskAttempt to delete its output. LOG.info(task.commitAttempt + " already given a go for committing the task output, so killing " + attemptID); task.eventHandler.handle(new TaskAttemptKillEvent(attemptID, SPECULATION + task.commitAttempt + " committed first!")); } }
@Override public void transition(TaskImpl task, TaskEvent event) { TaskTAttemptEvent taskTAttemptEvent = (TaskTAttemptEvent) event; TaskAttemptId taskAttemptId = taskTAttemptEvent.getTaskAttemptID(); task.handleTaskAttemptCompletion( taskAttemptId, TaskAttemptCompletionEventStatus.SUCCEEDED); task.finishedAttempts.add(taskAttemptId); task.inProgressAttempts.remove(taskAttemptId); task.successfulAttempt = taskAttemptId; task.sendTaskSucceededEvents(); for (TaskAttempt attempt : task.attempts.values()) { if (attempt.getID() != task.successfulAttempt && // This is okay because it can only talk us out of sending a // TA_KILL message to an attempt that doesn't need one for // other reasons. !attempt.isFinished()) { LOG.info("Issuing kill to other attempt " + attempt.getID()); task.eventHandler.handle(new TaskAttemptKillEvent(attempt.getID(), SPECULATION + task.successfulAttempt + " succeeded first!")); } } task.finished(TaskStateInternal.SUCCEEDED); }
@SuppressWarnings("unchecked") void preemptReduce(int toPreempt) { List<TaskAttemptId> reduceList = new ArrayList<TaskAttemptId> (reduces.keySet()); //sort reduces on progress Collections.sort(reduceList, new Comparator<TaskAttemptId>() { @Override public int compare(TaskAttemptId o1, TaskAttemptId o2) { return Float.compare( getJob().getTask(o1.getTaskId()).getAttempt(o1).getProgress(), getJob().getTask(o2.getTaskId()).getAttempt(o2).getProgress()); } }); for (int i = 0; i < toPreempt && reduceList.size() > 0; i++) { TaskAttemptId id = reduceList.remove(0);//remove the one on top LOG.info("Preempting " + id); preemptionWaitingReduces.add(id); eventHandler.handle(new TaskAttemptKillEvent(id, RAMPDOWN_DIAGNOSTIC)); } }
private static AppContext createAppContext( ApplicationAttemptId appAttemptId, Job job) { AppContext context = mock(AppContext.class); ApplicationId appId = appAttemptId.getApplicationId(); when(context.getApplicationID()).thenReturn(appId); when(context.getApplicationAttemptId()).thenReturn(appAttemptId); when(context.getJob(isA(JobId.class))).thenReturn(job); when(context.getClusterInfo()).thenReturn( new ClusterInfo(Resource.newInstance(10240, 1))); when(context.getEventHandler()).thenReturn(new EventHandler() { @Override public void handle(Event event) { // Only capture interesting events. if (event instanceof TaskAttemptContainerAssignedEvent) { events.add((TaskAttemptContainerAssignedEvent) event); } else if (event instanceof TaskAttemptKillEvent) { taskAttemptKillEvents.add((TaskAttemptKillEvent)event); } else if (event instanceof JobUpdatedNodesEvent) { jobUpdatedNodeEvents.add((JobUpdatedNodesEvent)event); } else if (event instanceof JobEvent) { jobEvents.add((JobEvent)event); } } }); return context; }
private void actOnUnusableNode(NodeId nodeId, NodeState nodeState) { // rerun previously successful map tasks // do this only if the job is still in the running state and there are // running reducers if (getInternalState() == JobStateInternal.RUNNING && !allReducersComplete()) { List<TaskAttemptId> taskAttemptIdList = nodesToSucceededTaskAttempts.get(nodeId); if (taskAttemptIdList != null) { String mesg = "TaskAttempt killed because it ran on unusable node " + nodeId; for (TaskAttemptId id : taskAttemptIdList) { if (TaskType.MAP == id.getTaskId().getTaskType()) { // reschedule only map tasks because their outputs maybe unusable LOG.info(mesg + ". AttemptId:" + id); eventHandler.handle(new TaskAttemptKillEvent(id, mesg)); } } } } // currently running task attempts on unusable nodes are handled in // RMContainerAllocator }
private static AppContext createAppContext( ApplicationAttemptId appAttemptId, Job job) { AppContext context = mock(AppContext.class); ApplicationId appId = appAttemptId.getApplicationId(); when(context.getApplicationID()).thenReturn(appId); when(context.getApplicationAttemptId()).thenReturn(appAttemptId); when(context.getJob(isA(JobId.class))).thenReturn(job); when(context.getClusterInfo()).thenReturn( new ClusterInfo(Resource.newInstance(10240, 1))); when(context.getEventHandler()).thenReturn(new EventHandler() { @Override public void handle(Event event) { // Only capture interesting events. if (event instanceof TaskAttemptContainerAssignedEvent) { events.add((TaskAttemptContainerAssignedEvent) event); } else if (event instanceof TaskAttemptKillEvent) { taskAttemptKillEvents.add((TaskAttemptKillEvent)event); } else if (event instanceof JobUpdatedNodesEvent) { jobUpdatedNodeEvents.add((JobUpdatedNodesEvent)event); } } }); return context; }
@Override public void transition(TaskImpl task, TaskEvent event) { TaskTAttemptEvent taskTAttemptEvent = (TaskTAttemptEvent) event; TaskAttemptId taskAttemptId = taskTAttemptEvent.getTaskAttemptID(); task.handleTaskAttemptCompletion( taskAttemptId, TaskAttemptCompletionEventStatus.SUCCEEDED); task.finishedAttempts.add(taskAttemptId); task.inProgressAttempts.remove(taskAttemptId); task.successfulAttempt = taskAttemptId; for (TaskAttempt attempt : task.attempts.values()) { if (attempt.getID() != task.successfulAttempt && // This is okay because it can only talk us out of sending a // TA_KILL message to an attempt that doesn't need one for // other reasons. !attempt.isFinished()) { LOG.info("Issuing kill to other attempt " + attempt.getID()); task.eventHandler.handle(new TaskAttemptKillEvent(attempt.getID(), SPECULATION + task.successfulAttempt + " succeeded first!")); } } task.finished(TaskStateInternal.SUCCEEDED); task.sendTaskSucceededEvents(); }
@SuppressWarnings("unchecked") void preemptReduce(int toPreempt) { List<TaskAttemptId> reduceList = new ArrayList<TaskAttemptId> (reduces.keySet()); //sort reduces on progress Collections.sort(reduceList, new Comparator<TaskAttemptId>() { @Override public int compare(TaskAttemptId o1, TaskAttemptId o2) { return Float.compare( getJob().getTask(o1.getTaskId()).getAttempt(o1).getProgress(), getJob().getTask(o2.getTaskId()).getAttempt(o2).getProgress()); } }); for (int i = 0; i < toPreempt && reduceList.size() > 0; i++) { TaskAttemptId id = reduceList.remove(0);//remove the one on top LOG.info("Preempting " + id); preemptionWaitingReduces.add(id); //record the id of preemption task eventHandler.handle(new TaskAttemptKillEvent(id, RAMPDOWN_DIAGNOSTIC)); //kill this task } }
@SuppressWarnings("unchecked") @Override public void transition(TaskAttemptImpl taskAttempt, TaskAttemptEvent event) { if (taskAttempt.getLaunchTime() == 0) { sendJHStartEventForAssignedFailTask(taskAttempt); } //set the finish time taskAttempt.setFinishTime(); taskAttempt.eventHandler .handle(createJobCounterUpdateEventTAKilled(taskAttempt, false)); TaskAttemptUnsuccessfulCompletionEvent tauce = createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt, TaskAttemptStateInternal.KILLED); taskAttempt.eventHandler.handle(new JobHistoryEvent( taskAttempt.attemptId.getTaskId().getJobId(), tauce)); if (event instanceof TaskAttemptKillEvent) { taskAttempt.addDiagnosticInfo( ((TaskAttemptKillEvent) event).getMessage()); } taskAttempt.eventHandler.handle(new TaskTAttemptKilledEvent( taskAttempt.attemptId, taskAttempt.getRescheduleNextAttempt())); }
@SuppressWarnings("unchecked") private static void sendContainerCleanup(TaskAttemptImpl taskAttempt, TaskAttemptEvent event) { if (event instanceof TaskAttemptKillEvent) { taskAttempt.addDiagnosticInfo( ((TaskAttemptKillEvent) event).getMessage()); } //send the cleanup event to containerLauncher taskAttempt.eventHandler.handle(new ContainerLauncherEvent( taskAttempt.attemptId, taskAttempt.container.getId(), StringInterner .weakIntern(taskAttempt.container.getNodeId().toString()), taskAttempt.container.getContainerToken(), ContainerLauncher.EventType.CONTAINER_REMOTE_CLEANUP, event.getType() == TaskAttemptEventType.TA_TIMED_OUT)); }
private void actOnUnusableNode(NodeId nodeId, NodeState nodeState) { // rerun previously successful map tasks // do this only if the job is still in the running state and there are // running reducers if (getInternalState() == JobStateInternal.RUNNING && !allReducersComplete()) { List<TaskAttemptId> taskAttemptIdList = nodesToSucceededTaskAttempts.get(nodeId); if (taskAttemptIdList != null) { String mesg = "TaskAttempt killed because it ran on unusable node " + nodeId; for (TaskAttemptId id : taskAttemptIdList) { if (TaskType.MAP == id.getTaskId().getTaskType()) { // reschedule only map tasks because their outputs maybe unusable LOG.info(mesg + ". AttemptId:" + id); // Kill the attempt and indicate that next map attempt should be // rescheduled (i.e. considered as a fast fail map). eventHandler.handle(new TaskAttemptKillEvent(id, mesg, true)); } } } } // currently running task attempts on unusable nodes are handled in // RMContainerAllocator }