/** * Process one {@link HistoryEvent} * * @param event * The {@link HistoryEvent} to be processed. */ public void process(HistoryEvent event) { if (event instanceof TaskAttemptFinishedEvent) { processTaskAttemptFinishedEvent((TaskAttemptFinishedEvent) event); } else if (event instanceof TaskAttemptUnsuccessfulCompletionEvent) { processTaskAttemptUnsuccessfulCompletionEvent((TaskAttemptUnsuccessfulCompletionEvent) event); } else if (event instanceof TaskStartedEvent) { processTaskStartedEvent((TaskStartedEvent) event); } else if (event instanceof MapAttemptFinishedEvent) { processMapAttemptFinishedEvent((MapAttemptFinishedEvent) event); } else if (event instanceof ReduceAttemptFinishedEvent) { processReduceAttemptFinishedEvent((ReduceAttemptFinishedEvent) event); } // I do NOT expect these if statements to be exhaustive. }
HistoryEvent maybeEmitEvent(ParsedLine line, String taskAttemptIDName, HistoryEventEmitter thatg) { if (taskAttemptIDName == null) { return null; } TaskAttemptID taskAttemptID = TaskAttemptID.forName(taskAttemptIDName); String finishTime = line.get("FINISH_TIME"); String status = line.get("TASK_STATUS"); if (finishTime != null && status != null && status.equalsIgnoreCase("success")) { String hostName = line.get("HOSTNAME"); String counters = line.get("COUNTERS"); String state = line.get("STATE_STRING"); MapAttempt20LineHistoryEventEmitter that = (MapAttempt20LineHistoryEventEmitter) thatg; if ("success".equalsIgnoreCase(status)) { return new MapAttemptFinishedEvent (taskAttemptID, that.originalTaskType, status, Long.parseLong(finishTime), Long.parseLong(finishTime), hostName, -1, null, state, maybeParseCounters(counters), null); } } return null; }
@Override public void handleEvent(HistoryEvent event) { EventType type = event.getEventType(); switch (type) { case MAP_ATTEMPT_FINISHED: handleMapAttemptFinishedEvent((MapAttemptFinishedEvent) event); super.handleEvent(event); break; case REDUCE_ATTEMPT_FINISHED: handleReduceAttemptFinishedEvent((ReduceAttemptFinishedEvent) event); super.handleEvent(event); break; default: super.handleEvent(event); break; } }
private void processMapAttemptFinishedEvent(MapAttemptFinishedEvent event) { LoggedTaskAttempt attempt = getOrMakeTaskAttempt(event.getTaskType(), event.getTaskId().toString(), event.getAttemptId().toString()); if (attempt == null) { return; } attempt.setResult(getPre21Value(event.getTaskStatus())); attempt.setHostName(event.getHostname()); // XXX There may be redundant location info available in the event. // We might consider extracting it from this event. Currently this // is redundant, but making this will add future-proofing. attempt.setFinishTime(event.getFinishTime()); attempt .incorporateCounters(((MapAttemptFinished) event.getDatum()).counters); }
private void processMapAttemptFinishedEvent(MapAttemptFinishedEvent event) { ParsedTaskAttempt attempt = getOrMakeTaskAttempt(event.getTaskType(), event.getTaskId().toString(), event.getAttemptId().toString()); if (attempt == null) { return; } attempt.setResult(getPre21Value(event.getTaskStatus())); attempt.setHostName(event.getHostname(), event.getRackName()); ParsedHost pHost = getAndRecordParsedHost(event.getRackName(), event.getHostname()); if (pHost != null) { attempt.setLocation(pHost.makeLoggedLocation()); } // XXX There may be redundant location info available in the event. // We might consider extracting it from this event. Currently this // is redundant, but making this will add future-proofing. attempt.setFinishTime(event.getFinishTime()); attempt .incorporateCounters(((MapAttemptFinished) event.getDatum()).counters); attempt.arraySetClockSplits(event.getClockSplits()); attempt.arraySetCpuUsages(event.getCpuUsages()); attempt.arraySetGpuUsages(event.getGpuUsages()); attempt.arraySetVMemKbytes(event.getVMemKbytes()); attempt.arraySetPhysMemKbytes(event.getPhysMemKbytes()); }
private void processMapAttemptFinishedEvent(MapAttemptFinishedEvent event) { ParsedTaskAttempt attempt = getOrMakeTaskAttempt(event.getTaskType(), event.getTaskId().toString(), event.getAttemptId().toString()); if (attempt == null) { return; } attempt.setResult(getPre21Value(event.getTaskStatus())); attempt.setHostName(event.getHostname(), event.getRackName()); ParsedHost pHost = getAndRecordParsedHost(event.getRackName(), event.getHostname()); if (pHost != null) { attempt.setLocation(pHost.makeLoggedLocation()); } // XXX There may be redundant location info available in the event. // We might consider extracting it from this event. Currently this // is redundant, but making this will add future-proofing. attempt.setFinishTime(event.getFinishTime()); attempt .incorporateCounters(((MapAttemptFinished) event.getDatum()).counters); attempt.arraySetClockSplits(event.getClockSplits()); attempt.arraySetCpuUsages(event.getCpuUsages()); attempt.arraySetVMemKbytes(event.getVMemKbytes()); attempt.arraySetPhysMemKbytes(event.getPhysMemKbytes()); }
HistoryEvent maybeEmitEvent(ParsedLine line, String taskAttemptIDName, HistoryEventEmitter thatg) { if (taskAttemptIDName == null) { return null; } TaskAttemptID taskAttemptID = TaskAttemptID.forName(taskAttemptIDName); String finishTime = line.get("FINISH_TIME"); String status = line.get("TASK_STATUS"); if (finishTime != null && status != null && status.equalsIgnoreCase("success")) { String hostName = line.get("HOSTNAME"); String counters = line.get("COUNTERS"); String state = line.get("STATE_STRING"); MapAttempt20LineHistoryEventEmitter that = (MapAttempt20LineHistoryEventEmitter) thatg; if (finishTime != null && "success".equalsIgnoreCase(status)) { return new MapAttemptFinishedEvent (taskAttemptID, that.originalTaskType, status, Long.parseLong(finishTime), Long.parseLong(finishTime), hostName, -1, null, state, maybeParseCounters(counters), null); } } return null; }
HistoryEvent maybeEmitEvent(ParsedLine line, String taskAttemptIDName, HistoryEventEmitter thatg) { if (taskAttemptIDName == null) { return null; } TaskAttemptID taskAttemptID = TaskAttemptID.forName(taskAttemptIDName); String finishTime = line.get("FINISH_TIME"); String status = line.get("TASK_STATUS"); if (finishTime != null && status != null && status.equalsIgnoreCase("success")) { String hostName = line.get("RPC_ADDRESSES"); String counters = line.get("COUNTERS"); String state = line.get("STATE_STRING"); MapAttempt20LineHistoryEventEmitter that = (MapAttempt20LineHistoryEventEmitter) thatg; if ("success".equalsIgnoreCase(status)) { return new MapAttemptFinishedEvent (taskAttemptID, that.originalTaskType, status, Long.parseLong(finishTime), Long.parseLong(finishTime), hostName, -1, null, state, maybeParseCounters(counters), null); } } return null; }
/** * Customized methods for handling MapAttemptFinishedEvents * @param event */ private void handleMapAttemptFinishedEvent(MapAttemptFinishedEvent event) { Map<TaskAttemptID, AdditionalTaskInfo> additionalJobInfoMap = additionalJobInfo .getAdditionalTasksMap(); if (!additionalJobInfoMap.containsKey(event.getAttemptId())) { AdditionalTaskInfo additionalTaskInfo = new AdditionalTaskInfo(); additionalTaskInfo.taskType = event.getTaskType(); additionalTaskInfo.cpuUsages = event.getCpuUsages(); additionalTaskInfo.physicalMemInKBs = event.getPhysMemKbytes(); additionalJobInfoMap.put(event.getAttemptId(), additionalTaskInfo); } }
HistoryEvent maybeEmitEvent(ParsedLine line, String taskAttemptIDName, HistoryEventEmitter thatg) { if (taskAttemptIDName == null) { return null; } TaskAttemptID taskAttemptID = TaskAttemptID.forName(taskAttemptIDName); String finishTime = line.get("FINISH_TIME"); String status = line.get("TASK_STATUS"); if (finishTime != null && status != null && status.equalsIgnoreCase("success")) { String hostName = line.get("HOSTNAME"); String counters = line.get("COUNTERS"); String state = line.get("STATE_STRING"); MapAttempt20LineHistoryEventEmitter that = (MapAttempt20LineHistoryEventEmitter) thatg; if (finishTime != null && "success".equalsIgnoreCase(status)) { return new MapAttemptFinishedEvent(taskAttemptID, that.originalTaskType, status, Long.parseLong(finishTime), Long .parseLong(finishTime), hostName, state, maybeParseCounters(counters)); } } return null; }
@SuppressWarnings({ "unchecked" }) private void logAttemptFinishedEvent(TaskAttemptStateInternal state) { //Log finished events only if an attempt started. if (getLaunchTime() == 0) return; String containerHostName = this.container == null ? "UNKNOWN" : this.container.getNodeId().getHost(); int containerNodePort = this.container == null ? -1 : this.container.getNodeId().getPort(); if (attemptId.getTaskId().getTaskType() == TaskType.MAP) { MapAttemptFinishedEvent mfe = new MapAttemptFinishedEvent(TypeConverter.fromYarn(attemptId), TypeConverter.fromYarn(attemptId.getTaskId().getTaskType()), state.toString(), this.reportedStatus.mapFinishTime, finishTime, containerHostName, containerNodePort, this.nodeRackName == null ? "UNKNOWN" : this.nodeRackName, this.reportedStatus.stateString, getCounters(), getProgressSplitBlock().burst()); eventHandler.handle( new JobHistoryEvent(attemptId.getTaskId().getJobId(), mfe)); } else { ReduceAttemptFinishedEvent rfe = new ReduceAttemptFinishedEvent(TypeConverter.fromYarn(attemptId), TypeConverter.fromYarn(attemptId.getTaskId().getTaskType()), state.toString(), this.reportedStatus.shuffleFinishTime, this.reportedStatus.sortFinishTime, finishTime, containerHostName, containerNodePort, this.nodeRackName == null ? "UNKNOWN" : this.nodeRackName, this.reportedStatus.stateString, getCounters(), getProgressSplitBlock().burst()); eventHandler.handle( new JobHistoryEvent(attemptId.getTaskId().getJobId(), rfe)); } }
private void processMapAttemptFinishedEvent(MapAttemptFinishedEvent event) { recordParsedHost(event.getHostname(), event.getRackName()); }
/** * Process one {@link HistoryEvent} * * @param event * The {@link HistoryEvent} to be processed. */ public void process(HistoryEvent event) { if (finalized) { throw new IllegalStateException( "JobBuilder.process(HistoryEvent event) called after ParsedJob built"); } // these are in lexicographical order by class name. if (event instanceof AMStartedEvent) { // ignore this event as Rumen currently doesnt need this event //TODO Enhance Rumen to process this event and capture restarts return; } else if (event instanceof NormalizedResourceEvent) { // Log an warn message as NormalizedResourceEvent shouldn't be written. LOG.warn("NormalizedResourceEvent should be ignored in history server."); } else if (event instanceof JobFinishedEvent) { processJobFinishedEvent((JobFinishedEvent) event); } else if (event instanceof JobInfoChangeEvent) { processJobInfoChangeEvent((JobInfoChangeEvent) event); } else if (event instanceof JobInitedEvent) { processJobInitedEvent((JobInitedEvent) event); } else if (event instanceof JobPriorityChangeEvent) { processJobPriorityChangeEvent((JobPriorityChangeEvent) event); } else if (event instanceof JobQueueChangeEvent) { processJobQueueChangeEvent((JobQueueChangeEvent) event); } else if (event instanceof JobStatusChangedEvent) { processJobStatusChangedEvent((JobStatusChangedEvent) event); } else if (event instanceof JobSubmittedEvent) { processJobSubmittedEvent((JobSubmittedEvent) event); } else if (event instanceof JobUnsuccessfulCompletionEvent) { processJobUnsuccessfulCompletionEvent((JobUnsuccessfulCompletionEvent) event); } else if (event instanceof MapAttemptFinishedEvent) { processMapAttemptFinishedEvent((MapAttemptFinishedEvent) event); } else if (event instanceof ReduceAttemptFinishedEvent) { processReduceAttemptFinishedEvent((ReduceAttemptFinishedEvent) event); } else if (event instanceof TaskAttemptFinishedEvent) { processTaskAttemptFinishedEvent((TaskAttemptFinishedEvent) event); } else if (event instanceof TaskAttemptStartedEvent) { processTaskAttemptStartedEvent((TaskAttemptStartedEvent) event); } else if (event instanceof TaskAttemptUnsuccessfulCompletionEvent) { processTaskAttemptUnsuccessfulCompletionEvent((TaskAttemptUnsuccessfulCompletionEvent) event); } else if (event instanceof TaskFailedEvent) { processTaskFailedEvent((TaskFailedEvent) event); } else if (event instanceof TaskFinishedEvent) { processTaskFinishedEvent((TaskFinishedEvent) event); } else if (event instanceof TaskStartedEvent) { processTaskStartedEvent((TaskStartedEvent) event); } else if (event instanceof TaskUpdatedEvent) { processTaskUpdatedEvent((TaskUpdatedEvent) event); } else throw new IllegalArgumentException( "JobBuilder.process(HistoryEvent): unknown event type:" + event.getEventType() + " for event:" + event); }