private List<AMInfo> readJustAMInfos() { List<AMInfo> amInfos = new ArrayList<AMInfo>(); FSDataInputStream inputStream = null; try { inputStream = getPreviousJobHistoryStream(getConfig(), appAttemptID); EventReader jobHistoryEventReader = new EventReader(inputStream); // All AMInfos are contiguous. Track when the first AMStartedEvent // appears. boolean amStartedEventsBegan = false; HistoryEvent event; while ((event = jobHistoryEventReader.getNextEvent()) != null) { if (event.getEventType() == EventType.AM_STARTED) { if (!amStartedEventsBegan) { // First AMStartedEvent. amStartedEventsBegan = true; } AMStartedEvent amStartedEvent = (AMStartedEvent) event; amInfos.add(MRBuilderUtils.newAMInfo( amStartedEvent.getAppAttemptId(), amStartedEvent.getStartTime(), amStartedEvent.getContainerId(), StringInterner.weakIntern(amStartedEvent.getNodeManagerHost()), amStartedEvent.getNodeManagerPort(), amStartedEvent.getNodeManagerHttpPort())); } else if (amStartedEventsBegan) { // This means AMStartedEvents began and this event is a // non-AMStarted event. // No need to continue reading all the other events. break; } } } catch (IOException e) { LOG.warn("Could not parse the old history file. " + "Will not have old AMinfos ", e); } finally { if (inputStream != null) { IOUtils.closeQuietly(inputStream); } } return amInfos; }
/** * Process one {@link HistoryEvent} * * @param event * The {@link HistoryEvent} to be processed. */ public void process(HistoryEvent event) { if (finalized) { throw new IllegalStateException( "JobBuilder.process(HistoryEvent event) called after ParsedJob built"); } // these are in lexicographical order by class name. if (event instanceof AMStartedEvent) { // ignore this event as Rumen currently doesnt need this event //TODO Enhance Rumen to process this event and capture restarts return; } else if (event instanceof NormalizedResourceEvent) { // Log an warn message as NormalizedResourceEvent shouldn't be written. LOG.warn("NormalizedResourceEvent should be ignored in history server."); } else if (event instanceof JobFinishedEvent) { processJobFinishedEvent((JobFinishedEvent) event); } else if (event instanceof JobInfoChangeEvent) { processJobInfoChangeEvent((JobInfoChangeEvent) event); } else if (event instanceof JobInitedEvent) { processJobInitedEvent((JobInitedEvent) event); } else if (event instanceof JobPriorityChangeEvent) { processJobPriorityChangeEvent((JobPriorityChangeEvent) event); } else if (event instanceof JobQueueChangeEvent) { processJobQueueChangeEvent((JobQueueChangeEvent) event); } else if (event instanceof JobStatusChangedEvent) { processJobStatusChangedEvent((JobStatusChangedEvent) event); } else if (event instanceof JobSubmittedEvent) { processJobSubmittedEvent((JobSubmittedEvent) event); } else if (event instanceof JobUnsuccessfulCompletionEvent) { processJobUnsuccessfulCompletionEvent((JobUnsuccessfulCompletionEvent) event); } else if (event instanceof MapAttemptFinishedEvent) { processMapAttemptFinishedEvent((MapAttemptFinishedEvent) event); } else if (event instanceof ReduceAttemptFinishedEvent) { processReduceAttemptFinishedEvent((ReduceAttemptFinishedEvent) event); } else if (event instanceof TaskAttemptFinishedEvent) { processTaskAttemptFinishedEvent((TaskAttemptFinishedEvent) event); } else if (event instanceof TaskAttemptStartedEvent) { processTaskAttemptStartedEvent((TaskAttemptStartedEvent) event); } else if (event instanceof TaskAttemptUnsuccessfulCompletionEvent) { processTaskAttemptUnsuccessfulCompletionEvent((TaskAttemptUnsuccessfulCompletionEvent) event); } else if (event instanceof TaskFailedEvent) { processTaskFailedEvent((TaskFailedEvent) event); } else if (event instanceof TaskFinishedEvent) { processTaskFinishedEvent((TaskFinishedEvent) event); } else if (event instanceof TaskStartedEvent) { processTaskStartedEvent((TaskStartedEvent) event); } else if (event instanceof TaskUpdatedEvent) { processTaskUpdatedEvent((TaskUpdatedEvent) event); } else throw new IllegalArgumentException( "JobBuilder.process(HistoryEvent): unknown event type:" + event.getEventType() + " for event:" + event); }
@SuppressWarnings("unchecked") @Override protected void serviceStart() throws Exception { amInfos = new LinkedList<AMInfo>(); completedTasksFromPreviousRun = new HashMap<TaskId, TaskInfo>(); processRecovery(); // Current an AMInfo for the current AM generation. AMInfo amInfo = MRBuilderUtils.newAMInfo(appAttemptID, startTime, containerID, nmHost, nmPort, nmHttpPort); amInfos.add(amInfo); // /////////////////// Create the job itself. job = createJob(getConfig(), forcedState, shutDownMessage); // End of creating the job. // Send out an MR AM inited event for this AM and all previous AMs. for (AMInfo info : amInfos) { dispatcher.getEventHandler().handle( new JobHistoryEvent(job.getID(), new AMStartedEvent(info .getAppAttemptId(), info.getStartTime(), info.getContainerId(), info.getNodeManagerHost(), info.getNodeManagerPort(), info .getNodeManagerHttpPort()))); } // metrics system init is really init & start. // It's more test friendly to put it here. DefaultMetricsSystem.initialize("MRAppMaster"); if (!errorHappenedShutDown) { // create a job event for job intialization JobEvent initJobEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT); // Send init to the job (this does NOT trigger job execution) // This is a synchronous call, not an event through dispatcher. We want // job-init to be done completely here. jobEventDispatcher.handle(initJobEvent); // JobImpl's InitTransition is done (call above is synchronous), so the // "uber-decision" (MR-1220) has been made. Query job and switch to // ubermode if appropriate (by registering different container-allocator // and container-launcher services/event-handlers). if (job.isUber()) { speculatorEventDispatcher.disableSpeculation(); LOG.info("MRAppMaster uberizing job " + job.getID() + " in local container (\"uber-AM\") on node " + nmHost + ":" + nmPort + "."); } else { // send init to speculator only for non-uber jobs. // This won't yet start as dispatcher isn't started yet. dispatcher.getEventHandler().handle( new SpeculatorEvent(job.getID(), clock.getTime())); LOG.info("MRAppMaster launching normal, non-uberized, multi-container " + "job " + job.getID() + "."); } } //start all the components super.serviceStart(); // All components have started, start the job. startJobs(); }
/** * Process one {@link HistoryEvent} * * @param event * The {@link HistoryEvent} to be processed. */ public void process(HistoryEvent event) { if (finalized) { throw new IllegalStateException( "JobBuilder.process(HistoryEvent event) called after ParsedJob built"); } // these are in lexicographical order by class name. if (event instanceof AMStartedEvent) { // ignore this event as Rumen currently doesnt need this event //TODO Enhance Rumen to process this event and capture restarts return; } else if (event instanceof JobFinishedEvent) { processJobFinishedEvent((JobFinishedEvent) event); } else if (event instanceof JobInfoChangeEvent) { processJobInfoChangeEvent((JobInfoChangeEvent) event); } else if (event instanceof JobInitedEvent) { processJobInitedEvent((JobInitedEvent) event); } else if (event instanceof JobPriorityChangeEvent) { processJobPriorityChangeEvent((JobPriorityChangeEvent) event); } else if (event instanceof JobStatusChangedEvent) { processJobStatusChangedEvent((JobStatusChangedEvent) event); } else if (event instanceof JobSubmittedEvent) { processJobSubmittedEvent((JobSubmittedEvent) event); } else if (event instanceof JobUnsuccessfulCompletionEvent) { processJobUnsuccessfulCompletionEvent((JobUnsuccessfulCompletionEvent) event); } else if (event instanceof MapAttemptFinishedEvent) { processMapAttemptFinishedEvent((MapAttemptFinishedEvent) event); } else if (event instanceof ReduceAttemptFinishedEvent) { processReduceAttemptFinishedEvent((ReduceAttemptFinishedEvent) event); } else if (event instanceof TaskAttemptFinishedEvent) { processTaskAttemptFinishedEvent((TaskAttemptFinishedEvent) event); } else if (event instanceof TaskAttemptStartedEvent) { processTaskAttemptStartedEvent((TaskAttemptStartedEvent) event); } else if (event instanceof TaskAttemptUnsuccessfulCompletionEvent) { processTaskAttemptUnsuccessfulCompletionEvent((TaskAttemptUnsuccessfulCompletionEvent) event); } else if (event instanceof TaskFailedEvent) { processTaskFailedEvent((TaskFailedEvent) event); } else if (event instanceof TaskFinishedEvent) { processTaskFinishedEvent((TaskFinishedEvent) event); } else if (event instanceof TaskStartedEvent) { processTaskStartedEvent((TaskStartedEvent) event); } else if (event instanceof TaskUpdatedEvent) { processTaskUpdatedEvent((TaskUpdatedEvent) event); } else throw new IllegalArgumentException( "JobBuilder.process(HistoryEvent): unknown event type"); }
@SuppressWarnings("unchecked") @Override protected void serviceStart() throws Exception { amInfos = new LinkedList<AMInfo>(); completedTasksFromPreviousRun = new HashMap<TaskId, TaskInfo>(); processRecovery(); // Current an AMInfo for the current AM generation. AMInfo amInfo = MRBuilderUtils.newAMInfo(appAttemptID, startTime, containerID, nmHost, nmPort, nmHttpPort); amInfos.add(amInfo); // /////////////////// Create the job itself. job = createJob(getConfig(), forcedState, shutDownMessage); // End of creating the job. // Send out an MR AM inited event for this AM and all previous AMs. for (AMInfo info : amInfos) { dispatcher.getEventHandler().handle( new JobHistoryEvent(job.getID(), new AMStartedEvent(info .getAppAttemptId(), info.getStartTime(), info.getContainerId(), info.getNodeManagerHost(), info.getNodeManagerPort(), info .getNodeManagerHttpPort()))); } // metrics system init is really init & start. // It's more test friendly to put it here. DefaultMetricsSystem.initialize("MRAppMaster"); if (!errorHappenedShutDown) { // create a job event for job intialization JobEvent initJobEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT); // Send init to the job (this does NOT trigger job execution) // This is a synchronous call, not an event through dispatcher. We want // job-init to be done completely here. jobEventDispatcher.handle(initJobEvent); // JobImpl's InitTransition is done (call above is synchronous), so the // "uber-decision" (MR-1220) has been made. Query job and switch to // ubermode if appropriate (by registering different container-allocator // and container-launcher services/event-handlers). if (job.isUber()) { speculatorEventDispatcher.disableSpeculation(); LOG.info("MRAppMaster uberizing job " + job.getID() + " in local container (\"uber-AM\") on node " + nmHost + ":" + nmPort + "."); } else { // send init to speculator only for non-uber jobs. // This won't yet start as dispatcher isn't started yet. dispatcher.getEventHandler().handle( new SpeculatorEvent(job.getID(), clock.getTime())); LOG.info("MRAppMaster launching normal, non-uberized, multi-container " + "job " + job.getID() + "."); } // Start ClientService here, since it's not initialized if // errorHappenedShutDown is true clientService.start(); } //start all the components super.serviceStart(); // All components have started, start the job. startJobs(); }
@SuppressWarnings("unchecked") @Override protected void serviceStart() throws Exception { amInfos = new LinkedList<AMInfo>(); completedTasksFromPreviousRun = new HashMap<TaskId, TaskInfo>(); processRecovery(); // Current an AMInfo for the current AM generation. AMInfo amInfo = MRBuilderUtils.newAMInfo(appAttemptID, startTime, containerID, nmHost, nmPort, nmHttpPort); // /////////////////// Create the job itself. job = createJob(getConfig(), forcedState, shutDownMessage); // End of creating the job. // Send out an MR AM inited event for all previous AMs. for (AMInfo info : amInfos) { dispatcher.getEventHandler().handle( new JobHistoryEvent(job.getID(), new AMStartedEvent(info .getAppAttemptId(), info.getStartTime(), info.getContainerId(), info.getNodeManagerHost(), info.getNodeManagerPort(), info .getNodeManagerHttpPort()))); } // Send out an MR AM inited event for this AM. dispatcher.getEventHandler().handle( new JobHistoryEvent(job.getID(), new AMStartedEvent(amInfo .getAppAttemptId(), amInfo.getStartTime(), amInfo.getContainerId(), amInfo.getNodeManagerHost(), amInfo.getNodeManagerPort(), amInfo .getNodeManagerHttpPort(), this.forcedState == null ? null : this.forcedState.toString()))); amInfos.add(amInfo); // metrics system init is really init & start. // It's more test friendly to put it here. DefaultMetricsSystem.initialize("MRAppMaster"); if (!errorHappenedShutDown) { // create a job event for job intialization JobEvent initJobEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT); // Send init to the job (this does NOT trigger job execution) // This is a synchronous call, not an event through dispatcher. We want // job-init to be done completely here. jobEventDispatcher.handle(initJobEvent); // JobImpl's InitTransition is done (call above is synchronous), so the // "uber-decision" (MR-1220) has been made. Query job and switch to // ubermode if appropriate (by registering different container-allocator // and container-launcher services/event-handlers). if (job.isUber()) { speculatorEventDispatcher.disableSpeculation(); LOG.info("MRAppMaster uberizing job " + job.getID() + " in local container (\"uber-AM\") on node " + nmHost + ":" + nmPort + "."); } else { // send init to speculator only for non-uber jobs. // This won't yet start as dispatcher isn't started yet. dispatcher.getEventHandler().handle( new SpeculatorEvent(job.getID(), clock.getTime())); LOG.info("MRAppMaster launching normal, non-uberized, multi-container " + "job " + job.getID() + "."); } // Start ClientService here, since it's not initialized if // errorHappenedShutDown is true clientService.start(); } //start all the components super.serviceStart(); // set job classloader if configured MRApps.setJobClassLoader(getConfig()); // All components have started, start the job. startJobs(); }