private static StubbedJob createStubbedJob(Configuration conf, Dispatcher dispatcher, int numSplits, AppContext appContext) { JobID jobID = JobID.forName("job_1234567890000_0001"); JobId jobId = TypeConverter.toYarn(jobID); if (appContext == null) { appContext = mock(AppContext.class); when(appContext.hasSuccessfullyUnregistered()).thenReturn(true); } StubbedJob job = new StubbedJob(jobId, ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 0), 0), conf,dispatcher.getEventHandler(), true, "somebody", numSplits, appContext); dispatcher.register(JobEventType.class, job); EventHandler mockHandler = mock(EventHandler.class); dispatcher.register(TaskEventType.class, mockHandler); dispatcher.register(org.apache.hadoop.mapreduce.jobhistory.EventType.class, mockHandler); dispatcher.register(JobFinishEvent.Type.class, mockHandler); return job; }
@Override public void handleEvent(HistoryEvent event) { EventType type = event.getEventType(); switch (type) { case MAP_ATTEMPT_FINISHED: handleMapAttemptFinishedEvent((MapAttemptFinishedEvent) event); super.handleEvent(event); break; case REDUCE_ATTEMPT_FINISHED: handleReduceAttemptFinishedEvent((ReduceAttemptFinishedEvent) event); super.handleEvent(event); break; default: super.handleEvent(event); break; } }
@Test public void testJobNoTasks() { Configuration conf = new Configuration(); conf.setInt(MRJobConfig.NUM_REDUCES, 0); conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir); conf.set(MRJobConfig.WORKFLOW_ID, "testId"); conf.set(MRJobConfig.WORKFLOW_NAME, "testName"); conf.set(MRJobConfig.WORKFLOW_NODE_NAME, "testNodeName"); conf.set(MRJobConfig.WORKFLOW_ADJACENCY_PREFIX_STRING + "key1", "value1"); conf.set(MRJobConfig.WORKFLOW_ADJACENCY_PREFIX_STRING + "key2", "value2"); conf.set(MRJobConfig.WORKFLOW_TAGS, "tag1,tag2"); AsyncDispatcher dispatcher = new AsyncDispatcher(); dispatcher.init(conf); dispatcher.start(); OutputCommitter committer = mock(OutputCommitter.class); CommitterEventHandler commitHandler = createCommitterEventHandler(dispatcher, committer); commitHandler.init(conf); commitHandler.start(); JobSubmittedEventHandler jseHandler = new JobSubmittedEventHandler("testId", "testName", "testNodeName", "\"key2\"=\"value2\" \"key1\"=\"value1\" ", "tag1,tag2"); dispatcher.register(EventType.class, jseHandler); JobImpl job = createStubbedJob(conf, dispatcher, 0, null); job.handle(new JobEvent(job.getID(), JobEventType.JOB_INIT)); assertJobState(job, JobStateInternal.INITED); job.handle(new JobStartEvent(job.getID())); assertJobState(job, JobStateInternal.SUCCEEDED); dispatcher.stop(); commitHandler.stop(); try { Assert.assertTrue(jseHandler.getAssertValue()); } catch (InterruptedException e) { Assert.fail("Workflow related attributes are not tested properly"); } }
@Override public void handle(JobHistoryEvent jhEvent) { if (jhEvent.getType() != EventType.JOB_SUBMITTED) { return; } JobSubmittedEvent jsEvent = (JobSubmittedEvent) jhEvent.getHistoryEvent(); if (!workflowId.equals(jsEvent.getWorkflowId())) { setAssertValue(false); return; } if (!workflowName.equals(jsEvent.getWorkflowName())) { setAssertValue(false); return; } if (!workflowNodeName.equals(jsEvent.getWorkflowNodeName())) { setAssertValue(false); return; } String[] wrkflowAdj = workflowAdjacencies.split(" "); String[] jswrkflowAdj = jsEvent.getWorkflowAdjacencies().split(" "); Arrays.sort(wrkflowAdj); Arrays.sort(jswrkflowAdj); if (!Arrays.equals(wrkflowAdj, jswrkflowAdj)) { setAssertValue(false); return; } if (!workflowTags.equals(jsEvent.getWorkflowTags())) { setAssertValue(false); return; } setAssertValue(true); }
@Override protected EventHandler<JobHistoryEvent> createJobHistoryHandler( AppContext context) { return new JobHistoryEventHandler(context, getStartCount()) { @Override public void handle(JobHistoryEvent event) { if (event.getHistoryEvent().getEventType() == EventType.JOB_INITED) { JobInitedEvent jie = (JobInitedEvent) event.getHistoryEvent(); jobLaunchTime.set(jie.getLaunchTime()); } super.handle(event); } }; }
@Test public void testJobNoTasks() { Configuration conf = new Configuration(); conf.setInt(MRJobConfig.NUM_REDUCES, 0); conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir); conf.set(MRJobConfig.WORKFLOW_ID, "testId"); conf.set(MRJobConfig.WORKFLOW_NAME, "testName"); conf.set(MRJobConfig.WORKFLOW_NODE_NAME, "testNodeName"); conf.set(MRJobConfig.WORKFLOW_ADJACENCY_PREFIX_STRING + "key1", "value1"); conf.set(MRJobConfig.WORKFLOW_ADJACENCY_PREFIX_STRING + "key2", "value2"); conf.set(MRJobConfig.WORKFLOW_TAGS, "tag1,tag2"); AsyncDispatcher dispatcher = new AsyncDispatcher(); dispatcher.init(conf); dispatcher.start(); OutputCommitter committer = mock(OutputCommitter.class); CommitterEventHandler commitHandler = createCommitterEventHandler(dispatcher, committer); commitHandler.init(conf); commitHandler.start(); JobSubmittedEventHandler jseHandler = new JobSubmittedEventHandler("testId", "testName", "testNodeName", "\"key2\"=\"value2\" \"key1\"=\"value1\" ", "tag1,tag2"); dispatcher.register(EventType.class, jseHandler); JobImpl job = createStubbedJob(conf, dispatcher, 0); job.handle(new JobEvent(job.getID(), JobEventType.JOB_INIT)); assertJobState(job, JobStateInternal.INITED); job.handle(new JobStartEvent(job.getID())); assertJobState(job, JobStateInternal.SUCCEEDED); dispatcher.stop(); commitHandler.stop(); try { Assert.assertTrue(jseHandler.getAssertValue()); } catch (InterruptedException e) { Assert.fail("Workflow related attributes are not tested properly"); } }
private static StubbedJob createStubbedJob(Configuration conf, Dispatcher dispatcher, int numSplits) { JobID jobID = JobID.forName("job_1234567890000_0001"); JobId jobId = TypeConverter.toYarn(jobID); StubbedJob job = new StubbedJob(jobId, ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 0), 0), conf,dispatcher.getEventHandler(), true, "somebody", numSplits); dispatcher.register(JobEventType.class, job); EventHandler mockHandler = mock(EventHandler.class); dispatcher.register(TaskEventType.class, mockHandler); dispatcher.register(org.apache.hadoop.mapreduce.jobhistory.EventType.class, mockHandler); dispatcher.register(JobFinishEvent.Type.class, mockHandler); return job; }
private List<AMInfo> readJustAMInfos() { List<AMInfo> amInfos = new ArrayList<AMInfo>(); FSDataInputStream inputStream = null; try { inputStream = getPreviousJobHistoryStream(getConfig(), appAttemptID); EventReader jobHistoryEventReader = new EventReader(inputStream); // All AMInfos are contiguous. Track when the first AMStartedEvent // appears. boolean amStartedEventsBegan = false; HistoryEvent event; while ((event = jobHistoryEventReader.getNextEvent()) != null) { if (event.getEventType() == EventType.AM_STARTED) { if (!amStartedEventsBegan) { // First AMStartedEvent. amStartedEventsBegan = true; } AMStartedEvent amStartedEvent = (AMStartedEvent) event; amInfos.add(MRBuilderUtils.newAMInfo( amStartedEvent.getAppAttemptId(), amStartedEvent.getStartTime(), amStartedEvent.getContainerId(), StringInterner.weakIntern(amStartedEvent.getNodeManagerHost()), amStartedEvent.getNodeManagerPort(), amStartedEvent.getNodeManagerHttpPort())); } else if (amStartedEventsBegan) { // This means AMStartedEvents began and this event is a // non-AMStarted event. // No need to continue reading all the other events. break; } } } catch (IOException e) { LOG.warn("Could not parse the old history file. " + "Will not have old AMinfos ", e); } finally { if (inputStream != null) { IOUtils.closeQuietly(inputStream); } } return amInfos; }
@Test public void testRecoverySuccessAttempt() { LOG.info("--- START: testRecoverySuccessAttempt ---"); long clusterTimestamp = System.currentTimeMillis(); EventHandler mockEventHandler = mock(EventHandler.class); MapTaskImpl recoverMapTask = getMockMapTask(clusterTimestamp, mockEventHandler); TaskId taskId = recoverMapTask.getID(); JobID jobID = new JobID(Long.toString(clusterTimestamp), 1); TaskID taskID = new TaskID(jobID, org.apache.hadoop.mapreduce.TaskType.MAP, taskId.getId()); //Mock up the TaskAttempts Map<TaskAttemptID, TaskAttemptInfo> mockTaskAttempts = new HashMap<TaskAttemptID, TaskAttemptInfo>(); TaskAttemptID taId1 = new TaskAttemptID(taskID, 2); TaskAttemptInfo mockTAinfo1 = getMockTaskAttemptInfo(taId1, TaskAttemptState.SUCCEEDED); mockTaskAttempts.put(taId1, mockTAinfo1); TaskAttemptID taId2 = new TaskAttemptID(taskID, 1); TaskAttemptInfo mockTAinfo2 = getMockTaskAttemptInfo(taId2, TaskAttemptState.FAILED); mockTaskAttempts.put(taId2, mockTAinfo2); OutputCommitter mockCommitter = mock (OutputCommitter.class); TaskInfo mockTaskInfo = mock(TaskInfo.class); when(mockTaskInfo.getTaskStatus()).thenReturn("SUCCEEDED"); when(mockTaskInfo.getTaskId()).thenReturn(taskID); when(mockTaskInfo.getAllTaskAttempts()).thenReturn(mockTaskAttempts); recoverMapTask.handle( new TaskRecoverEvent(taskId, mockTaskInfo,mockCommitter, true)); ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class); verify(mockEventHandler,atLeast(1)).handle( (org.apache.hadoop.yarn.event.Event) arg.capture()); Map<TaskAttemptID, TaskAttemptState> finalAttemptStates = new HashMap<TaskAttemptID, TaskAttemptState>(); finalAttemptStates.put(taId1, TaskAttemptState.SUCCEEDED); finalAttemptStates.put(taId2, TaskAttemptState.FAILED); List<EventType> jobHistoryEvents = new ArrayList<EventType>(); jobHistoryEvents.add(EventType.TASK_STARTED); jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED); jobHistoryEvents.add(EventType.MAP_ATTEMPT_FINISHED); jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED); jobHistoryEvents.add(EventType.MAP_ATTEMPT_FAILED); jobHistoryEvents.add(EventType.TASK_FINISHED); recoveryChecker(recoverMapTask, TaskState.SUCCEEDED, finalAttemptStates, arg, jobHistoryEvents, 2L, 1L); }
@Test public void testRecoveryAllFailAttempts() { LOG.info("--- START: testRecoveryAllFailAttempts ---"); long clusterTimestamp = System.currentTimeMillis(); EventHandler mockEventHandler = mock(EventHandler.class); MapTaskImpl recoverMapTask = getMockMapTask(clusterTimestamp, mockEventHandler); TaskId taskId = recoverMapTask.getID(); JobID jobID = new JobID(Long.toString(clusterTimestamp), 1); TaskID taskID = new TaskID(jobID, org.apache.hadoop.mapreduce.TaskType.MAP, taskId.getId()); //Mock up the TaskAttempts Map<TaskAttemptID, TaskAttemptInfo> mockTaskAttempts = new HashMap<TaskAttemptID, TaskAttemptInfo>(); TaskAttemptID taId1 = new TaskAttemptID(taskID, 2); TaskAttemptInfo mockTAinfo1 = getMockTaskAttemptInfo(taId1, TaskAttemptState.FAILED); mockTaskAttempts.put(taId1, mockTAinfo1); TaskAttemptID taId2 = new TaskAttemptID(taskID, 1); TaskAttemptInfo mockTAinfo2 = getMockTaskAttemptInfo(taId2, TaskAttemptState.FAILED); mockTaskAttempts.put(taId2, mockTAinfo2); OutputCommitter mockCommitter = mock (OutputCommitter.class); TaskInfo mockTaskInfo = mock(TaskInfo.class); when(mockTaskInfo.getTaskStatus()).thenReturn("FAILED"); when(mockTaskInfo.getTaskId()).thenReturn(taskID); when(mockTaskInfo.getAllTaskAttempts()).thenReturn(mockTaskAttempts); recoverMapTask.handle( new TaskRecoverEvent(taskId, mockTaskInfo, mockCommitter, true)); ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class); verify(mockEventHandler,atLeast(1)).handle( (org.apache.hadoop.yarn.event.Event) arg.capture()); Map<TaskAttemptID, TaskAttemptState> finalAttemptStates = new HashMap<TaskAttemptID, TaskAttemptState>(); finalAttemptStates.put(taId1, TaskAttemptState.FAILED); finalAttemptStates.put(taId2, TaskAttemptState.FAILED); List<EventType> jobHistoryEvents = new ArrayList<EventType>(); jobHistoryEvents.add(EventType.TASK_STARTED); jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED); jobHistoryEvents.add(EventType.MAP_ATTEMPT_FAILED); jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED); jobHistoryEvents.add(EventType.MAP_ATTEMPT_FAILED); jobHistoryEvents.add(EventType.TASK_FAILED); recoveryChecker(recoverMapTask, TaskState.FAILED, finalAttemptStates, arg, jobHistoryEvents, 2L, 2L); }
@Test public void testRecoveryTaskSuccessAllAttemptsFail() { LOG.info("--- START: testRecoveryTaskSuccessAllAttemptsFail ---"); long clusterTimestamp = System.currentTimeMillis(); EventHandler mockEventHandler = mock(EventHandler.class); MapTaskImpl recoverMapTask = getMockMapTask(clusterTimestamp, mockEventHandler); TaskId taskId = recoverMapTask.getID(); JobID jobID = new JobID(Long.toString(clusterTimestamp), 1); TaskID taskID = new TaskID(jobID, org.apache.hadoop.mapreduce.TaskType.MAP, taskId.getId()); //Mock up the TaskAttempts Map<TaskAttemptID, TaskAttemptInfo> mockTaskAttempts = new HashMap<TaskAttemptID, TaskAttemptInfo>(); TaskAttemptID taId1 = new TaskAttemptID(taskID, 2); TaskAttemptInfo mockTAinfo1 = getMockTaskAttemptInfo(taId1, TaskAttemptState.FAILED); mockTaskAttempts.put(taId1, mockTAinfo1); TaskAttemptID taId2 = new TaskAttemptID(taskID, 1); TaskAttemptInfo mockTAinfo2 = getMockTaskAttemptInfo(taId2, TaskAttemptState.FAILED); mockTaskAttempts.put(taId2, mockTAinfo2); OutputCommitter mockCommitter = mock (OutputCommitter.class); TaskInfo mockTaskInfo = mock(TaskInfo.class); when(mockTaskInfo.getTaskStatus()).thenReturn("SUCCEEDED"); when(mockTaskInfo.getTaskId()).thenReturn(taskID); when(mockTaskInfo.getAllTaskAttempts()).thenReturn(mockTaskAttempts); recoverMapTask.handle( new TaskRecoverEvent(taskId, mockTaskInfo, mockCommitter, true)); ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class); verify(mockEventHandler,atLeast(1)).handle( (org.apache.hadoop.yarn.event.Event) arg.capture()); Map<TaskAttemptID, TaskAttemptState> finalAttemptStates = new HashMap<TaskAttemptID, TaskAttemptState>(); finalAttemptStates.put(taId1, TaskAttemptState.FAILED); finalAttemptStates.put(taId2, TaskAttemptState.FAILED); // check for one new attempt launched since successful attempt not found TaskAttemptID taId3 = new TaskAttemptID(taskID, 2000); finalAttemptStates.put(taId3, TaskAttemptState.NEW); List<EventType> jobHistoryEvents = new ArrayList<EventType>(); jobHistoryEvents.add(EventType.TASK_STARTED); jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED); jobHistoryEvents.add(EventType.MAP_ATTEMPT_FAILED); jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED); jobHistoryEvents.add(EventType.MAP_ATTEMPT_FAILED); recoveryChecker(recoverMapTask, TaskState.RUNNING, finalAttemptStates, arg, jobHistoryEvents, 2L, 2L); }
@Test public void testRecoveryTaskSuccessAllAttemptsSucceed() { LOG.info("--- START: testRecoveryTaskSuccessAllAttemptsFail ---"); long clusterTimestamp = System.currentTimeMillis(); EventHandler mockEventHandler = mock(EventHandler.class); MapTaskImpl recoverMapTask = getMockMapTask(clusterTimestamp, mockEventHandler); TaskId taskId = recoverMapTask.getID(); JobID jobID = new JobID(Long.toString(clusterTimestamp), 1); TaskID taskID = new TaskID(jobID, org.apache.hadoop.mapreduce.TaskType.MAP, taskId.getId()); //Mock up the TaskAttempts Map<TaskAttemptID, TaskAttemptInfo> mockTaskAttempts = new HashMap<TaskAttemptID, TaskAttemptInfo>(); TaskAttemptID taId1 = new TaskAttemptID(taskID, 2); TaskAttemptInfo mockTAinfo1 = getMockTaskAttemptInfo(taId1, TaskAttemptState.SUCCEEDED); mockTaskAttempts.put(taId1, mockTAinfo1); TaskAttemptID taId2 = new TaskAttemptID(taskID, 1); TaskAttemptInfo mockTAinfo2 = getMockTaskAttemptInfo(taId2, TaskAttemptState.SUCCEEDED); mockTaskAttempts.put(taId2, mockTAinfo2); OutputCommitter mockCommitter = mock (OutputCommitter.class); TaskInfo mockTaskInfo = mock(TaskInfo.class); when(mockTaskInfo.getTaskStatus()).thenReturn("SUCCEEDED"); when(mockTaskInfo.getTaskId()).thenReturn(taskID); when(mockTaskInfo.getAllTaskAttempts()).thenReturn(mockTaskAttempts); recoverMapTask.handle( new TaskRecoverEvent(taskId, mockTaskInfo, mockCommitter, true)); ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class); verify(mockEventHandler,atLeast(1)).handle( (org.apache.hadoop.yarn.event.Event) arg.capture()); Map<TaskAttemptID, TaskAttemptState> finalAttemptStates = new HashMap<TaskAttemptID, TaskAttemptState>(); finalAttemptStates.put(taId1, TaskAttemptState.SUCCEEDED); finalAttemptStates.put(taId2, TaskAttemptState.SUCCEEDED); List<EventType> jobHistoryEvents = new ArrayList<EventType>(); jobHistoryEvents.add(EventType.TASK_STARTED); jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED); jobHistoryEvents.add(EventType.MAP_ATTEMPT_FINISHED); jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED); jobHistoryEvents.add(EventType.MAP_ATTEMPT_FINISHED); jobHistoryEvents.add(EventType.TASK_FINISHED); recoveryChecker(recoverMapTask, TaskState.SUCCEEDED, finalAttemptStates, arg, jobHistoryEvents, 2L, 0L); }
@Test public void testRecoveryAllAttemptsKilled() { LOG.info("--- START: testRecoveryAllAttemptsKilled ---"); long clusterTimestamp = System.currentTimeMillis(); EventHandler mockEventHandler = mock(EventHandler.class); MapTaskImpl recoverMapTask = getMockMapTask(clusterTimestamp, mockEventHandler); TaskId taskId = recoverMapTask.getID(); JobID jobID = new JobID(Long.toString(clusterTimestamp), 1); TaskID taskID = new TaskID(jobID, org.apache.hadoop.mapreduce.TaskType.MAP, taskId.getId()); //Mock up the TaskAttempts Map<TaskAttemptID, TaskAttemptInfo> mockTaskAttempts = new HashMap<TaskAttemptID, TaskAttemptInfo>(); TaskAttemptID taId1 = new TaskAttemptID(taskID, 2); TaskAttemptInfo mockTAinfo1 = getMockTaskAttemptInfo(taId1, TaskAttemptState.KILLED); mockTaskAttempts.put(taId1, mockTAinfo1); TaskAttemptID taId2 = new TaskAttemptID(taskID, 1); TaskAttemptInfo mockTAinfo2 = getMockTaskAttemptInfo(taId2, TaskAttemptState.KILLED); mockTaskAttempts.put(taId2, mockTAinfo2); OutputCommitter mockCommitter = mock (OutputCommitter.class); TaskInfo mockTaskInfo = mock(TaskInfo.class); when(mockTaskInfo.getTaskStatus()).thenReturn("KILLED"); when(mockTaskInfo.getTaskId()).thenReturn(taskID); when(mockTaskInfo.getAllTaskAttempts()).thenReturn(mockTaskAttempts); recoverMapTask.handle( new TaskRecoverEvent(taskId, mockTaskInfo, mockCommitter, true)); ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class); verify(mockEventHandler,atLeast(1)).handle( (org.apache.hadoop.yarn.event.Event) arg.capture()); Map<TaskAttemptID, TaskAttemptState> finalAttemptStates = new HashMap<TaskAttemptID, TaskAttemptState>(); finalAttemptStates.put(taId1, TaskAttemptState.KILLED); finalAttemptStates.put(taId2, TaskAttemptState.KILLED); List<EventType> jobHistoryEvents = new ArrayList<EventType>(); jobHistoryEvents.add(EventType.TASK_STARTED); jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED); jobHistoryEvents.add(EventType.MAP_ATTEMPT_KILLED); jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED); jobHistoryEvents.add(EventType.MAP_ATTEMPT_KILLED); jobHistoryEvents.add(EventType.TASK_FAILED); recoveryChecker(recoverMapTask, TaskState.KILLED, finalAttemptStates, arg, jobHistoryEvents, 2L, 0L); }
private void recoveryChecker(MapTaskImpl checkTask, TaskState finalState, Map<TaskAttemptID, TaskAttemptState> finalAttemptStates, ArgumentCaptor<Event> arg, List<EventType> expectedJobHistoryEvents, long expectedMapLaunches, long expectedFailedMaps) { assertEquals("Final State of Task", finalState, checkTask.getState()); Map<TaskAttemptId, TaskAttempt> recoveredAttempts = checkTask.getAttempts(); assertEquals("Expected Number of Task Attempts", finalAttemptStates.size(), recoveredAttempts.size()); for (TaskAttemptID taID : finalAttemptStates.keySet()) { assertEquals("Expected Task Attempt State", finalAttemptStates.get(taID), recoveredAttempts.get(TypeConverter.toYarn(taID)).getState()); } Iterator<Event> ie = arg.getAllValues().iterator(); int eventNum = 0; long totalLaunchedMaps = 0; long totalFailedMaps = 0; boolean jobTaskEventReceived = false; while (ie.hasNext()) { Object current = ie.next(); ++eventNum; LOG.info(eventNum + " " + current.getClass().getName()); if (current instanceof JobHistoryEvent) { JobHistoryEvent jhe = (JobHistoryEvent) current; LOG.info(expectedJobHistoryEvents.get(0).toString() + " " + jhe.getHistoryEvent().getEventType().toString() + " " + jhe.getJobID()); assertEquals(expectedJobHistoryEvents.get(0), jhe.getHistoryEvent().getEventType()); expectedJobHistoryEvents.remove(0); } else if (current instanceof JobCounterUpdateEvent) { JobCounterUpdateEvent jcue = (JobCounterUpdateEvent) current; LOG.info("JobCounterUpdateEvent " + jcue.getCounterUpdates().get(0).getCounterKey() + " " + jcue.getCounterUpdates().get(0).getIncrementValue()); if (jcue.getCounterUpdates().get(0).getCounterKey() == JobCounter.NUM_FAILED_MAPS) { totalFailedMaps += jcue.getCounterUpdates().get(0) .getIncrementValue(); } else if (jcue.getCounterUpdates().get(0).getCounterKey() == JobCounter.TOTAL_LAUNCHED_MAPS) { totalLaunchedMaps += jcue.getCounterUpdates().get(0) .getIncrementValue(); } } else if (current instanceof JobTaskEvent) { JobTaskEvent jte = (JobTaskEvent) current; assertEquals(jte.getState(), finalState); jobTaskEventReceived = true; } } assertTrue(jobTaskEventReceived || (finalState == TaskState.RUNNING)); assertEquals("Did not process all expected JobHistoryEvents", 0, expectedJobHistoryEvents.size()); assertEquals("Expected Map Launches", expectedMapLaunches, totalLaunchedMaps); assertEquals("Expected Failed Maps", expectedFailedMaps, totalFailedMaps); }