/**
 * Emits a {@link JobPriorityChangeEvent} when the parsed line carries a
 * {@code JOB_PRIORITY} value.
 *
 * @param line
 *          the parsed history line to inspect
 * @param jobIDName
 *          textual job id; when {@code null} no event is produced
 * @param thatg
 *          the owning emitter (not used here)
 * @return the priority-change event, or {@code null} when the job id name or
 *         the {@code JOB_PRIORITY} value is absent
 */
HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
    HistoryEventEmitter thatg) {
  // Guard before parsing the id, exactly as the other emitters do, so that
  // JobID.forName is never handed a null name.
  if (jobIDName == null) {
    return null;
  }

  JobID jobID = JobID.forName(jobIDName);

  String priority = line.get("JOB_PRIORITY");

  if (priority != null) {
    return new JobPriorityChangeEvent(jobID, JobPriority.valueOf(priority));
  }

  return null;
}
/**
 * Emits a {@link JobStatusChangedEvent} when the parsed line carries a
 * {@code JOB_STATUS} value.
 *
 * @param line
 *          the parsed history line to inspect
 * @param jobIDName
 *          textual job id; when {@code null} no event is produced
 * @param thatg
 *          the owning emitter (not used here)
 * @return the status-change event, or {@code null} when the job id name or
 *         the {@code JOB_STATUS} value is absent
 */
HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
    HistoryEventEmitter thatg) {
  if (jobIDName == null) {
    return null;
  }

  JobID id = JobID.forName(jobIDName);
  String newStatus = line.get("JOB_STATUS");

  return (newStatus == null) ? null : new JobStatusChangedEvent(id, newStatus);
}
/**
 * Emits a {@link JobInfoChangeEvent} when the parsed line carries a
 * {@code LAUNCH_TIME} value.
 *
 * @param line
 *          the parsed history line to inspect
 * @param jobIDName
 *          textual job id; when {@code null} no event is produced
 * @param thatg
 *          the owning emitter; cast to {@link Job20LineHistoryEventEmitter}
 *          to read its {@code originalSubmitTime}
 * @return the info-change event, or {@code null} when the job id name or the
 *         {@code LAUNCH_TIME} value is absent
 * @throws NumberFormatException
 *           if {@code LAUNCH_TIME} is present but is not a parsable long
 */
HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
    HistoryEventEmitter thatg) {
  if (jobIDName == null) {
    return null;
  }

  JobID id = JobID.forName(jobIDName);
  String launched = line.get("LAUNCH_TIME");

  if (launched == null) {
    return null;
  }

  // The submit time is taken from the emitter's state, not from this line.
  Job20LineHistoryEventEmitter jobEmitter =
      (Job20LineHistoryEventEmitter) thatg;

  return new JobInfoChangeEvent(id, jobEmitter.originalSubmitTime,
      Long.parseLong(launched));
}
/**
 * Emits a {@link JobUnsuccessfulCompletionEvent} for a job whose
 * {@code JOB_STATUS} is present and is anything other than "success"
 * (compared case-insensitively).
 *
 * @param line
 *          the parsed history line to inspect
 * @param jobIDName
 *          textual job id; when {@code null} no event is produced
 * @param thatg
 *          the owning emitter (not used here)
 * @return the unsuccessful-completion event, or {@code null} when the job id
 *         name is absent, the status is absent or "success", or any of
 *         {@code FINISH_TIME}, {@code FINISHED_MAPS},
 *         {@code FINISHED_REDUCES} is missing
 * @throws NumberFormatException
 *           if one of the numeric fields is present but not parsable
 */
HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
    HistoryEventEmitter thatg) {
  if (jobIDName == null) {
    return null;
  }

  JobID id = JobID.forName(jobIDName);

  String finishTime = line.get("FINISH_TIME");
  String status = line.get("JOB_STATUS");
  String finishedMaps = line.get("FINISHED_MAPS");
  String finishedReduces = line.get("FINISHED_REDUCES");

  // Successful jobs are reported by a different emitter.
  if (status == null || status.equalsIgnoreCase("success")) {
    return null;
  }

  if (finishTime == null || finishedMaps == null || finishedReduces == null) {
    return null;
  }

  long finishedAt = Long.parseLong(finishTime);
  int mapCount = Integer.parseInt(finishedMaps);
  int reduceCount = Integer.parseInt(finishedReduces);

  return new JobUnsuccessfulCompletionEvent(id, finishedAt, mapCount,
      reduceCount, status);
}
/**
 * Emits a {@link ReduceAttemptFinishedEvent} for a successful attempt whose
 * line also carries {@code SHUFFLE_FINISHED} and {@code SORT_FINISHED}.
 *
 * @param line
 *          the parsed history line to inspect
 * @param taskAttemptIDName
 *          textual task attempt id; when {@code null} no event is produced
 * @param thatg
 *          the owning emitter; cast to
 *          {@link ReduceAttempt20LineHistoryEventEmitter} to read its
 *          {@code originalTaskType}
 * @return the finished event, or {@code null} when the attempt id name is
 *         absent, {@code FINISH_TIME} is missing, {@code TASK_STATUS} is
 *         missing or not "success" (case-insensitive), or either the shuffle
 *         or sort finish time is missing
 * @throws NumberFormatException
 *           if one of the time fields is present but not parsable
 */
HistoryEvent maybeEmitEvent(ParsedLine line, String taskAttemptIDName,
    HistoryEventEmitter thatg) {
  if (taskAttemptIDName == null) {
    return null;
  }

  TaskAttemptID attemptId = TaskAttemptID.forName(taskAttemptIDName);

  String finishTime = line.get("FINISH_TIME");
  String status = line.get("TASK_STATUS");

  boolean succeeded = finishTime != null && status != null
      && status.equalsIgnoreCase("success");
  if (!succeeded) {
    return null;
  }

  String hostName = line.get("HOSTNAME");
  String counters = line.get("COUNTERS");
  String state = line.get("STATE_STRING");
  String shuffleFinish = line.get("SHUFFLE_FINISHED");
  String sortFinish = line.get("SORT_FINISHED");

  // The status is already known to be "success" at this point, so only the
  // two reduce-specific timestamps remain to be checked.
  if (shuffleFinish == null || sortFinish == null) {
    return null;
  }

  ReduceAttempt20LineHistoryEventEmitter reduceEmitter =
      (ReduceAttempt20LineHistoryEventEmitter) thatg;

  return new ReduceAttemptFinishedEvent(attemptId,
      reduceEmitter.originalTaskType, status,
      Long.parseLong(shuffleFinish), Long.parseLong(sortFinish),
      Long.parseLong(finishTime), hostName, -1, null, state,
      maybeParseCounters(counters), null);
}
/**
 * Emits a {@link TaskStartedEvent} when the parsed line carries both
 * {@code START_TIME} and {@code TASK_TYPE}, and records those two values on
 * the owning emitter.
 *
 * @param line
 *          the parsed history line to inspect
 * @param taskIDName
 *          textual task id; when {@code null} no event is produced
 * @param thatg
 *          the owning emitter; cast to {@link Task20LineHistoryEventEmitter}
 *          and updated with the parsed start time and task type
 * @return the started event, or {@code null} when the task id name,
 *         {@code START_TIME} or {@code TASK_TYPE} is absent
 * @throws NumberFormatException
 *           if {@code START_TIME} is present but is not a parsable long
 */
HistoryEvent maybeEmitEvent(ParsedLine line, String taskIDName,
    HistoryEventEmitter thatg) {
  if (taskIDName == null) {
    return null;
  }

  TaskID id = TaskID.forName(taskIDName);

  String typeName = line.get("TASK_TYPE");
  String startTime = line.get("START_TIME");
  String splits = line.get("SPLITS");

  if (startTime == null || typeName == null) {
    return null;
  }

  Task20LineHistoryEventEmitter taskEmitter =
      (Task20LineHistoryEventEmitter) thatg;

  // Store the values on the emitter first; the event is built from the
  // stored fields.
  taskEmitter.originalStartTime = Long.parseLong(startTime);
  taskEmitter.originalTaskType =
      Version20LogInterfaceUtils.get20TaskType(typeName);

  return new TaskStartedEvent(id, taskEmitter.originalStartTime,
      taskEmitter.originalTaskType, splits);
}
/**
 * Emits a {@link TaskUpdatedEvent} when the parsed line carries a
 * {@code FINISH_TIME} value.
 *
 * @param line
 *          the parsed history line to inspect
 * @param taskIDName
 *          textual task id; when {@code null} no event is produced
 * @param thatg
 *          the owning emitter (not used here)
 * @return the updated event, or {@code null} when the task id name or the
 *         {@code FINISH_TIME} value is absent
 * @throws NumberFormatException
 *           if {@code FINISH_TIME} is present but is not a parsable long
 */
HistoryEvent maybeEmitEvent(ParsedLine line, String taskIDName,
    HistoryEventEmitter thatg) {
  if (taskIDName == null) {
    return null;
  }

  TaskID id = TaskID.forName(taskIDName);
  String finished = line.get("FINISH_TIME");

  return (finished == null)
      ? null
      : new TaskUpdatedEvent(id, Long.parseLong(finished));
}
/** * Process one {@link HistoryEvent} * * @param event * The {@link HistoryEvent} to be processed. */ public void process(HistoryEvent event) { if (event instanceof TaskAttemptFinishedEvent) { processTaskAttemptFinishedEvent((TaskAttemptFinishedEvent) event); } else if (event instanceof TaskAttemptUnsuccessfulCompletionEvent) { processTaskAttemptUnsuccessfulCompletionEvent((TaskAttemptUnsuccessfulCompletionEvent) event); } else if (event instanceof TaskStartedEvent) { processTaskStartedEvent((TaskStartedEvent) event); } else if (event instanceof MapAttemptFinishedEvent) { processMapAttemptFinishedEvent((MapAttemptFinishedEvent) event); } else if (event instanceof ReduceAttemptFinishedEvent) { processReduceAttemptFinishedEvent((ReduceAttemptFinishedEvent) event); } // I do NOT expect these if statements to be exhaustive. }
/**
 * Emits a {@link MapAttemptFinishedEvent} for an attempt whose
 * {@code TASK_STATUS} is "success" (case-insensitive) and whose
 * {@code FINISH_TIME} is present.
 *
 * @param line
 *          the parsed history line to inspect
 * @param taskAttemptIDName
 *          textual task attempt id; when {@code null} no event is produced
 * @param thatg
 *          the owning emitter; cast to
 *          {@link MapAttempt20LineHistoryEventEmitter} to read its
 *          {@code originalTaskType}
 * @return the finished event, or {@code null} when the attempt id name is
 *         absent, {@code FINISH_TIME} is missing, or {@code TASK_STATUS} is
 *         missing or not "success"
 * @throws NumberFormatException
 *           if {@code FINISH_TIME} is present but is not a parsable long
 */
HistoryEvent maybeEmitEvent(ParsedLine line, String taskAttemptIDName,
    HistoryEventEmitter thatg) {
  if (taskAttemptIDName == null) {
    return null;
  }

  TaskAttemptID attemptId = TaskAttemptID.forName(taskAttemptIDName);

  String finishTime = line.get("FINISH_TIME");
  String status = line.get("TASK_STATUS");

  boolean succeeded = finishTime != null && status != null
      && status.equalsIgnoreCase("success");
  if (!succeeded) {
    return null;
  }

  String hostName = line.get("HOSTNAME");
  String counters = line.get("COUNTERS");
  String state = line.get("STATE_STRING");

  MapAttempt20LineHistoryEventEmitter mapEmitter =
      (MapAttempt20LineHistoryEventEmitter) thatg;

  // FINISH_TIME is parsed and passed for both time arguments of the event.
  return new MapAttemptFinishedEvent(attemptId, mapEmitter.originalTaskType,
      status, Long.parseLong(finishTime), Long.parseLong(finishTime),
      hostName, -1, null, state, maybeParseCounters(counters), null);
}
/**
 * Emits a {@link JobInitedEvent} when the parsed line carries
 * {@code LAUNCH_TIME}, {@code TOTAL_MAPS} and {@code TOTAL_REDUCES}.
 *
 * @param line
 *          the parsed history line to inspect
 * @param jobIDName
 *          textual job id; when {@code null} no event is produced
 * @param thatg
 *          the owning emitter (not used here)
 * @return the inited event, or {@code null} when the job id name or any of
 *         the three required fields is absent. {@code JOB_STATUS} is passed
 *         through as read (possibly {@code null}); a missing
 *         {@code UBERIZED} value parses to {@code false}.
 * @throws NumberFormatException
 *           if one of the numeric fields is present but not parsable
 */
HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
    HistoryEventEmitter thatg) {
  if (jobIDName == null) {
    return null;
  }

  JobID id = JobID.forName(jobIDName);

  String launchTime = line.get("LAUNCH_TIME");
  String status = line.get("JOB_STATUS");
  String totalMaps = line.get("TOTAL_MAPS");
  String totalReduces = line.get("TOTAL_REDUCES");
  String uberized = line.get("UBERIZED");

  if (launchTime == null || totalMaps == null || totalReduces == null) {
    return null;
  }

  long launchedAt = Long.parseLong(launchTime);
  int mapTotal = Integer.parseInt(totalMaps);
  int reduceTotal = Integer.parseInt(totalReduces);
  boolean uber = Boolean.parseBoolean(uberized);

  return new JobInitedEvent(id, launchedAt, mapTotal, reduceTotal, status,
      uber);
}
/**
 * Emits a {@link JobFinishedEvent} for a job whose {@code JOB_STATUS} is
 * "success" (case-insensitive) and whose line carries {@code FINISH_TIME},
 * {@code FINISHED_MAPS} and {@code FINISHED_REDUCES}.
 *
 * <p>{@code FAILED_MAPS} and {@code FAILED_REDUCES} are optional: when a
 * line does not carry them they are reported as 0 rather than being handed
 * to {@link Integer#parseInt(String)} as {@code null}.
 *
 * @param line
 *          the parsed history line to inspect
 * @param jobIDName
 *          textual job id; when {@code null} no event is produced
 * @param thatg
 *          the owning emitter (not used here)
 * @return the finished event, or {@code null} when the job id name is
 *         absent, the status is absent or not "success", or any of the three
 *         required fields is missing
 * @throws NumberFormatException
 *           if one of the numeric fields is present but not parsable
 */
HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
    HistoryEventEmitter thatg) {
  if (jobIDName == null) {
    return null;
  }

  JobID jobID = JobID.forName(jobIDName);

  String finishTime = line.get("FINISH_TIME");
  String status = line.get("JOB_STATUS");
  String finishedMaps = line.get("FINISHED_MAPS");
  String finishedReduces = line.get("FINISHED_REDUCES");
  String failedMaps = line.get("FAILED_MAPS");
  String failedReduces = line.get("FAILED_REDUCES");
  String counters = line.get("COUNTERS");

  if (status != null && status.equalsIgnoreCase("success")
      && finishTime != null && finishedMaps != null
      && finishedReduces != null) {
    long finishedAt = Long.parseLong(finishTime);
    int finishedMapCount = Integer.parseInt(finishedMaps);
    int finishedReduceCount = Integer.parseInt(finishedReduces);

    // The guard above does not require the failure counts, so a line
    // without them must not reach Integer.parseInt(null).
    int failedMapCount =
        (failedMaps == null) ? 0 : Integer.parseInt(failedMaps);
    int failedReduceCount =
        (failedReduces == null) ? 0 : Integer.parseInt(failedReduces);

    return new JobFinishedEvent(jobID, finishedAt, finishedMapCount,
        finishedReduceCount, failedMapCount, failedReduceCount, null, null,
        maybeParseCounters(counters));
  }

  return null;
}
/**
 * Emits a {@link TaskAttemptStartedEvent} when the parsed line carries both
 * {@code START_TIME} and {@code TASK_TYPE}, and records those two values on
 * the owning emitter.
 *
 * <p>Missing {@code LOCALITY} and {@code AVATAAR} values are replaced by the
 * empty string. A missing or empty {@code HTTP_PORT} falls back to
 * {@code DEFAULT_HTTP_PORT}.
 *
 * @param line
 *          the parsed history line to inspect
 * @param taskAttemptIDName
 *          textual task attempt id; when {@code null} no event is produced
 * @param thatg
 *          the owning emitter; cast to {@link TaskAttempt20LineEventEmitter}
 *          and updated with the parsed start time and task type
 * @return the started event, or {@code null} when the attempt id name,
 *         {@code START_TIME} or {@code TASK_TYPE} is absent
 * @throws NumberFormatException
 *           if {@code START_TIME} or a non-empty {@code HTTP_PORT} is not
 *           parsable
 */
HistoryEvent maybeEmitEvent(ParsedLine line, String taskAttemptIDName,
    HistoryEventEmitter thatg) {
  if (taskAttemptIDName == null) {
    return null;
  }

  TaskAttemptID taskAttemptID = TaskAttemptID.forName(taskAttemptIDName);

  String startTime = line.get("START_TIME");
  String taskType = line.get("TASK_TYPE");
  String trackerName = line.get("TRACKER_NAME");
  String httpPort = line.get("HTTP_PORT");

  String locality = line.get("LOCALITY");
  if (locality == null) {
    locality = "";
  }

  String avataar = line.get("AVATAAR");
  if (avataar == null) {
    avataar = "";
  }

  if (startTime != null && taskType != null) {
    TaskAttempt20LineEventEmitter that =
        (TaskAttempt20LineEventEmitter) thatg;

    that.originalStartTime = Long.parseLong(startTime);
    that.originalTaskType =
        Version20LogInterfaceUtils.get20TaskType(taskType);

    // HTTP_PORT is not part of the guard above, so it may be absent; treat
    // a missing value like an empty one instead of dereferencing null.
    int port = (httpPort == null || httpPort.equals(""))
        ? DEFAULT_HTTP_PORT
        : Integer.parseInt(httpPort);

    return new TaskAttemptStartedEvent(taskAttemptID,
        that.originalTaskType, that.originalStartTime, trackerName, port,
        -1, locality, avataar);
  }

  return null;
}
/**
 * Emits a {@link TaskAttemptFinishedEvent} for an attempt whose
 * {@code TASK_STATUS} is "success" (case-insensitive) and whose
 * {@code FINISH_TIME} is present.
 *
 * <p>The {@code HOSTNAME} value is run through {@link ParsedHost#parse}.
 * When parsing yields no result, the rack name is reported as {@code null}
 * and the raw {@code HOSTNAME} value is used as the node name.
 *
 * @param line
 *          the parsed history line to inspect
 * @param taskAttemptIDName
 *          textual task attempt id; when {@code null} no event is produced
 * @param thatg
 *          the owning emitter; cast to {@link TaskAttempt20LineEventEmitter}
 *          to read its {@code originalTaskType}
 * @return the finished event, or {@code null} when the attempt id name is
 *         absent, {@code FINISH_TIME} is missing, or {@code TASK_STATUS} is
 *         missing or not "success"
 * @throws NumberFormatException
 *           if {@code FINISH_TIME} is present but is not a parsable long
 */
HistoryEvent maybeEmitEvent(ParsedLine line, String taskAttemptIDName,
    HistoryEventEmitter thatg) {
  if (taskAttemptIDName == null) {
    return null;
  }

  TaskAttemptID taskAttemptID = TaskAttemptID.forName(taskAttemptIDName);

  String finishTime = line.get("FINISH_TIME");
  String status = line.get("TASK_STATUS");

  if (finishTime != null && status != null
      && status.equalsIgnoreCase("success")) {
    String hostName = line.get("HOSTNAME");
    String counters = line.get("COUNTERS");
    String state = line.get("STATE_STRING");

    TaskAttempt20LineEventEmitter that =
        (TaskAttempt20LineEventEmitter) thatg;

    ParsedHost pHost = ParsedHost.parse(hostName);

    // The emitter for unsuccessful attempts null-checks the parse result,
    // so do the same here instead of dereferencing it blindly.
    // NOTE(review): presumably parse returns null for hostnames logged
    // without a rack component - confirm against ParsedHost.
    String rackName = null;
    String nodeName = hostName;
    if (pHost != null) {
      rackName = pHost.getRackName();
      nodeName = pHost.getNodeName();
    }

    return new TaskAttemptFinishedEvent(taskAttemptID,
        that.originalTaskType, status, Long.parseLong(finishTime),
        rackName, nodeName, state, maybeParseCounters(counters));
  }

  return null;
}
HistoryEvent maybeEmitEvent(ParsedLine line, String taskAttemptIDName, HistoryEventEmitter thatg) { if (taskAttemptIDName == null) { return null; } TaskAttemptID taskAttemptID = TaskAttemptID.forName(taskAttemptIDName); String finishTime = line.get("FINISH_TIME"); String status = line.get("TASK_STATUS"); if (finishTime != null && status != null && !status.equalsIgnoreCase("success")) { String hostName = line.get("HOSTNAME"); String error = line.get("ERROR"); TaskAttempt20LineEventEmitter that = (TaskAttempt20LineEventEmitter) thatg; ParsedHost pHost = ParsedHost.parse(hostName); String rackName = null; // Earlier versions of MR logged on hostnames (without rackname) for // unsuccessful attempts if (pHost != null) { rackName = pHost.getRackName(); hostName = pHost.getNodeName(); } return new TaskAttemptUnsuccessfulCompletionEvent (taskAttemptID, that.originalTaskType, status, Long.parseLong(finishTime), hostName, -1, rackName, error, null); } return null; }