private void handleTaskAttemptCompletion(TaskAttemptId attemptId, TaskAttemptCompletionEventStatus status) { TaskAttempt attempt = attempts.get(attemptId); //raise the completion event only if the container is assigned // to nextAttemptNumber if (attempt.getNodeHttpAddress() != null) { TaskAttemptCompletionEvent tce = recordFactory .newRecordInstance(TaskAttemptCompletionEvent.class); tce.setEventId(-1); String scheme = (encryptedShuffle) ? "https://" : "http://"; tce.setMapOutputServerAddress(StringInterner.weakIntern(scheme + attempt.getNodeHttpAddress().split(":")[0] + ":" + attempt.getShufflePort())); tce.setStatus(status); tce.setAttemptId(attempt.getID()); int runTime = 0; if (attempt.getFinishTime() != 0 && attempt.getLaunchTime() !=0) runTime = (int)(attempt.getFinishTime() - attempt.getLaunchTime()); tce.setAttemptRunTime(runTime); //raise the event to job so that it adds the completion event to its //data structures eventHandler.handle(new JobTaskAttemptCompletedEvent(tce)); } }
private void handleTaskAttemptCompletion(TaskAttemptId attemptId, TaskAttemptCompletionEventStatus status) { TaskAttempt attempt = attempts.get(attemptId); //raise the completion event only if the container is assigned // to nextAttemptNumber //if (attempt.getNodeHttpAddress() != null) { TaskAttemptCompletionEvent tce = recordFactory .newRecordInstance(TaskAttemptCompletionEvent.class); tce.setEventId(-1); String scheme = (encryptedShuffle) ? "https://" : "http://"; if(attempt.getNodeHttpAddress()!=null){ tce.setMapOutputServerAddress(StringInterner.weakIntern(scheme + attempt.getNodeHttpAddress().split(":")[0] + ":" + attempt.getShufflePort())); }else{ tce.setMapOutputServerAddress("https://"); } tce.setStatus(status); tce.setAttemptId(attempt.getID()); int runTime = 0; if (attempt.getFinishTime() != 0 && attempt.getLaunchTime() !=0) runTime = (int)(attempt.getFinishTime() - attempt.getLaunchTime()); tce.setAttemptRunTime(runTime); //raise the event to job so that it adds the completion event to its //data structures eventHandler.handle(new JobTaskAttemptCompletedEvent(tce)); } }
@Override public void transition(JobImpl job, JobEvent event) { TaskAttemptCompletionEvent tce = ((JobTaskAttemptCompletedEvent) event).getCompletionEvent(); // Add the TaskAttemptCompletionEvent //eventId is equal to index in the arraylist tce.setEventId(job.taskAttemptCompletionEvents.size()); job.taskAttemptCompletionEvents.add(tce); int mapEventIdx = -1; if (TaskType.MAP.equals(tce.getAttemptId().getTaskId().getTaskType())) { // we track map completions separately from task completions because // - getMapAttemptCompletionEvents uses index ranges specific to maps // - type converting the same events over and over is expensive mapEventIdx = job.mapAttemptCompletionEvents.size(); job.mapAttemptCompletionEvents.add(TypeConverter.fromYarn(tce)); } job.taskCompletionIdxToMapCompletionIdx.add(mapEventIdx); TaskAttemptId attemptId = tce.getAttemptId(); TaskId taskId = attemptId.getTaskId(); //make the previous completion event as obsolete if it exists Integer successEventNo = job.successAttemptCompletionEventNoMap.remove(taskId); if (successEventNo != null) { TaskAttemptCompletionEvent successEvent = job.taskAttemptCompletionEvents.get(successEventNo); successEvent.setStatus(TaskAttemptCompletionEventStatus.OBSOLETE); int mapCompletionIdx = job.taskCompletionIdxToMapCompletionIdx.get(successEventNo); if (mapCompletionIdx >= 0) { // update the corresponding TaskCompletionEvent for the map TaskCompletionEvent mapEvent = job.mapAttemptCompletionEvents.get(mapCompletionIdx); job.mapAttemptCompletionEvents.set(mapCompletionIdx, new TaskCompletionEvent(mapEvent.getEventId(), mapEvent.getTaskAttemptId(), mapEvent.idWithinJob(), mapEvent.isMapTask(), TaskCompletionEvent.Status.OBSOLETE, mapEvent.getTaskTrackerHttp())); } } // if this attempt is not successful then why is the previous successful // attempt being removed above - MAPREDUCE-4330 if (TaskAttemptCompletionEventStatus.SUCCEEDED.equals(tce.getStatus())) { job.successAttemptCompletionEventNoMap.put(taskId, tce.getEventId()); // here we could have simply called Task.getSuccessfulAttempt() but // the event that triggers this code is sent before // Task.successfulAttempt is set and so there is no guarantee that it // will be available now Task task = job.tasks.get(taskId); TaskAttempt attempt = task.getAttempt(attemptId); NodeId nodeId = attempt.getNodeId(); assert (nodeId != null); // node must exist for a successful event List<TaskAttemptId> taskAttemptIdList = job.nodesToSucceededTaskAttempts .get(nodeId); if (taskAttemptIdList == null) { taskAttemptIdList = new ArrayList<TaskAttemptId>(); job.nodesToSucceededTaskAttempts.put(nodeId, taskAttemptIdList); } taskAttemptIdList.add(attempt.getID()); } }
@Override public void transition(JobImpl job, JobEvent event) { TaskAttemptCompletionEvent tce = ((JobTaskAttemptCompletedEvent) event).getCompletionEvent(); // Add the TaskAttemptCompletionEvent //eventId is equal to index in the arraylist tce.setEventId(job.taskAttemptCompletionEvents.size()); job.taskAttemptCompletionEvents.add(tce); int mapEventIdx = -1; if (TaskType.MAP.equals(tce.getAttemptId().getTaskId().getTaskType())) { // we track map completions separately from task completions because // - getMapAttemptCompletionEvents uses index ranges specific to maps // - type converting the same ev+ents over and over is expensive LOG.info("add map completion event"+tce.getAttemptId().getTaskId().toString()+"event output Addr:"+tce.getMapOutputServerAddress()); mapEventIdx = job.mapAttemptCompletionEvents.size(); job.mapAttemptCompletionEvents.add(TypeConverter.fromYarn(tce)); } job.taskCompletionIdxToMapCompletionIdx.add(mapEventIdx); TaskAttemptId attemptId = tce.getAttemptId(); TaskId taskId = attemptId.getTaskId(); //make the previous completion event as obsolete if it exists Integer successEventNo = job.successAttemptCompletionEventNoMap.remove(taskId); if (successEventNo != null) { TaskAttemptCompletionEvent successEvent = job.taskAttemptCompletionEvents.get(successEventNo); successEvent.setStatus(TaskAttemptCompletionEventStatus.OBSOLETE); int mapCompletionIdx = job.taskCompletionIdxToMapCompletionIdx.get(successEventNo); if (mapCompletionIdx >= 0) { // update the corresponding TaskCompletionEvent for the map TaskCompletionEvent mapEvent = job.mapAttemptCompletionEvents.get(mapCompletionIdx); job.mapAttemptCompletionEvents.set(mapCompletionIdx, new TaskCompletionEvent(mapEvent.getEventId(), mapEvent.getTaskAttemptId(), mapEvent.idWithinJob(), mapEvent.isMapTask(), TaskCompletionEvent.Status.OBSOLETE, mapEvent.getTaskTrackerHttp())); } } // if this attempt is not successful then why is the previous successful // attempt being removed above - MAPREDUCE-4330 if (TaskAttemptCompletionEventStatus.SUCCEEDED.equals(tce.getStatus())) { job.successAttemptCompletionEventNoMap.put(taskId, tce.getEventId()); // here we could have simply called Task.getSuccessfulAttempt() but // the event that triggers this code is sent before // Task.successfulAttempt is set and so there is no guarantee that it // will be available now Task task = job.tasks.get(taskId); TaskAttempt attempt = task.getAttempt(attemptId); NodeId nodeId = attempt.getNodeId(); assert (nodeId != null); // node must exist for a successful event List<TaskAttemptId> taskAttemptIdList = job.nodesToSucceededTaskAttempts .get(nodeId); if (taskAttemptIdList == null) { taskAttemptIdList = new ArrayList<TaskAttemptId>(); job.nodesToSucceededTaskAttempts.put(nodeId, taskAttemptIdList); } taskAttemptIdList.add(attempt.getID()); } }