private List<AMInfo> readJustAMInfos() { List<AMInfo> amInfos = new ArrayList<AMInfo>(); FSDataInputStream inputStream = null; try { inputStream = getPreviousJobHistoryStream(getConfig(), appAttemptID); EventReader jobHistoryEventReader = new EventReader(inputStream); // All AMInfos are contiguous. Track when the first AMStartedEvent // appears. boolean amStartedEventsBegan = false; HistoryEvent event; while ((event = jobHistoryEventReader.getNextEvent()) != null) { if (event.getEventType() == EventType.AM_STARTED) { if (!amStartedEventsBegan) { // First AMStartedEvent. amStartedEventsBegan = true; } AMStartedEvent amStartedEvent = (AMStartedEvent) event; amInfos.add(MRBuilderUtils.newAMInfo( amStartedEvent.getAppAttemptId(), amStartedEvent.getStartTime(), amStartedEvent.getContainerId(), StringInterner.weakIntern(amStartedEvent.getNodeManagerHost()), amStartedEvent.getNodeManagerPort(), amStartedEvent.getNodeManagerHttpPort())); } else if (amStartedEventsBegan) { // This means AMStartedEvents began and this event is a // non-AMStarted event. // No need to continue reading all the other events. break; } } } catch (IOException e) { LOG.warn("Could not parse the old history file. " + "Will not have old AMinfos ", e); } finally { if (inputStream != null) { IOUtils.closeQuietly(inputStream); } } return amInfos; }
@Test public void testMultipleFailedTasks() throws Exception { JobHistoryParser parser = new JobHistoryParser(Mockito.mock(FSDataInputStream.class)); EventReader reader = Mockito.mock(EventReader.class); final AtomicInteger numEventsRead = new AtomicInteger(0); // Hack! final org.apache.hadoop.mapreduce.TaskType taskType = org.apache.hadoop.mapreduce.TaskType.MAP; final TaskID[] tids = new TaskID[2]; final JobID jid = new JobID("1", 1); tids[0] = new TaskID(jid, taskType, 0); tids[1] = new TaskID(jid, taskType, 1); Mockito.when(reader.getNextEvent()).thenAnswer( new Answer<HistoryEvent>() { public HistoryEvent answer(InvocationOnMock invocation) throws IOException { // send two task start and two task fail events for tasks 0 and 1 int eventId = numEventsRead.getAndIncrement(); TaskID tid = tids[eventId & 0x1]; if (eventId < 2) { return new TaskStartedEvent(tid, 0, taskType, ""); } if (eventId < 4) { TaskFailedEvent tfe = new TaskFailedEvent(tid, 0, taskType, "failed", "FAILED", null, new Counters()); tfe.setDatum(tfe.getDatum()); return tfe; } if (eventId < 5) { JobUnsuccessfulCompletionEvent juce = new JobUnsuccessfulCompletionEvent(jid, 100L, 2, 0, "JOB_FAILED", Collections.singletonList( "Task failed: " + tids[0].toString())); return juce; } return null; } }); JobInfo info = parser.parse(reader); assertTrue("Task 0 not implicated", info.getErrorInfo().contains(tids[0].toString())); }
/**
 * Creates a parser whose {@code reader} is an {@link EventReader} over the
 * given stream, wrapped in a {@link DataInputStream}.
 *
 * @param input the stream to read events from
 * @throws IOException declared by this constructor; creating the
 *     {@link EventReader} is the only operation performed here
 */
public CurrentJHParser(InputStream input) throws IOException {
  DataInputStream dataInput = new DataInputStream(input);
  reader = new EventReader(dataInput);
}