@SuppressWarnings("unchecked") private void sendLaunchedEvents() { JobCounterUpdateEvent jce = new JobCounterUpdateEvent(attemptId.getTaskId() .getJobId()); jce.addCounterUpdate(attemptId.getTaskId().getTaskType() == TaskType.MAP ? JobCounter.TOTAL_LAUNCHED_MAPS : JobCounter.TOTAL_LAUNCHED_REDUCES, 1); eventHandler.handle(jce); LOG.info("TaskAttempt: [" + attemptId + "] using containerId: [" + container.getId() + " on NM: [" + StringInterner.weakIntern(container.getNodeId().toString()) + "]"); TaskAttemptStartedEvent tase = new TaskAttemptStartedEvent(TypeConverter.fromYarn(attemptId), TypeConverter.fromYarn(attemptId.getTaskId().getTaskType()), launchTime, trackerName, httpPort, shufflePort, container.getId(), locality.toString(), avataar.toString()); eventHandler.handle( new JobHistoryEvent(attemptId.getTaskId().getJobId(), tase)); }
@SuppressWarnings("unchecked") private ContainerRequest assignToFailedMap(Container allocated) { //try to assign to earlierFailedMaps if present ContainerRequest assigned = null; while (assigned == null && earlierFailedMaps.size() > 0 && canAssignMaps()) { TaskAttemptId tId = earlierFailedMaps.removeFirst(); if (maps.containsKey(tId)) { assigned = maps.remove(tId); JobCounterUpdateEvent jce = new JobCounterUpdateEvent(assigned.attemptID.getTaskId().getJobId()); jce.addCounterUpdate(JobCounter.OTHER_LOCAL_MAPS, 1); eventHandler.handle(jce); LOG.info("Assigned from earlierFailedMaps"); break; } } return assigned; }
/** * Launches a MR job and tests the job counters against the expected values. * @param testName The name for the job * @param mr The MR cluster * @param fileSys The FileSystem * @param in Input path * @param out Output path * @param numMaps Number of maps * @param otherLocalMaps Expected value of other local maps * @param datalocalMaps Expected value of data(node) local maps * @param racklocalMaps Expected value of rack local maps */ static void launchJobAndTestCounters(String jobName, MiniMRCluster mr, FileSystem fileSys, Path in, Path out, int numMaps, int otherLocalMaps, int dataLocalMaps, int rackLocalMaps) throws IOException { JobConf jobConf = mr.createJobConf(); if (fileSys.exists(out)) { fileSys.delete(out, true); } RunningJob job = launchJob(jobConf, in, out, numMaps, jobName); Counters counters = job.getCounters(); assertEquals("Number of local maps", counters.getCounter(JobCounter.OTHER_LOCAL_MAPS), otherLocalMaps); assertEquals("Number of Data-local maps", counters.getCounter(JobCounter.DATA_LOCAL_MAPS), dataLocalMaps); assertEquals("Number of Rack-local maps", counters.getCounter(JobCounter.RACK_LOCAL_MAPS), rackLocalMaps); mr.waitUntilIdle(); mr.shutdown(); }
protected void verifySleepJobCounters(Job job) throws InterruptedException, IOException { Counters counters = job.getCounters(); Assert.assertEquals(3, counters.findCounter(JobCounter.OTHER_LOCAL_MAPS) .getValue()); Assert.assertEquals(3, counters.findCounter(JobCounter.TOTAL_LAUNCHED_MAPS) .getValue()); Assert.assertEquals(numSleepReducers, counters.findCounter(JobCounter.TOTAL_LAUNCHED_REDUCES).getValue()); Assert .assertTrue(counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS) != null && counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS).getValue() != 0); Assert .assertTrue(counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS) != null && counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS).getValue() != 0); }
@SuppressWarnings("deprecation") private void checkLegacyNames(Counters counters) { assertEquals("New name", 1, counters.findCounter( TaskCounter.class.getName(), "MAP_INPUT_RECORDS").getValue()); assertEquals("Legacy name", 1, counters.findCounter( "org.apache.hadoop.mapred.Task$Counter", "MAP_INPUT_RECORDS").getValue()); assertEquals("Legacy enum", 1, counters.findCounter(Task.Counter.MAP_INPUT_RECORDS).getValue()); assertEquals("New name", 1, counters.findCounter( JobCounter.class.getName(), "DATA_LOCAL_MAPS").getValue()); assertEquals("Legacy name", 1, counters.findCounter( "org.apache.hadoop.mapred.JobInProgress$Counter", "DATA_LOCAL_MAPS").getValue()); assertEquals("Legacy enum", 1, counters.findCounter(JobInProgress.Counter.DATA_LOCAL_MAPS).getValue()); assertEquals("New name", 1, counters.findCounter( FileSystemCounter.class.getName(), "FILE_BYTES_READ").getValue()); assertEquals("New name and method", 1, counters.findCounter("file", FileSystemCounter.BYTES_READ).getValue()); assertEquals("Legacy name", 1, counters.findCounter( "FileSystemCounters", "FILE_BYTES_READ").getValue()); }
@SuppressWarnings("rawtypes") @Test public void testFrameworkCounter() { GroupFactory groupFactory = new GroupFactoryForTest(); FrameworkGroupFactory frameworkGroupFactory = groupFactory.newFrameworkGroupFactory(JobCounter.class); Group group = (Group) frameworkGroupFactory.newGroup("JobCounter"); FrameworkCounterGroup counterGroup = (FrameworkCounterGroup) group.getUnderlyingGroup(); org.apache.hadoop.mapreduce.Counter count1 = counterGroup.findCounter(JobCounter.NUM_FAILED_MAPS.toString()); Assert.assertNotNull(count1); // Verify no exception get thrown when finding an unknown counter org.apache.hadoop.mapreduce.Counter count2 = counterGroup.findCounter("Unknown"); Assert.assertNull(count2); }
@SuppressWarnings("unchecked") private ContainerRequest assignToFailedMap(Container allocated) { //try to assign to earlierFailedMaps if present ContainerRequest assigned = null; while (assigned == null && earlierFailedMaps.size() > 0) { TaskAttemptId tId = earlierFailedMaps.removeFirst(); if (maps.containsKey(tId)) { assigned = maps.remove(tId); JobCounterUpdateEvent jce = new JobCounterUpdateEvent(assigned.attemptID.getTaskId().getJobId()); jce.addCounterUpdate(JobCounter.OTHER_LOCAL_MAPS, 1); eventHandler.handle(jce); LOG.info("Assigned from earlierFailedMaps"); break; } } return assigned; }
private static JobCounterUpdateEvent createJobCounterUpdateEventTAFailed( TaskAttemptImpl taskAttempt, boolean taskAlreadyCompleted) { TaskType taskType = taskAttempt.getID().getTaskId().getTaskType(); JobCounterUpdateEvent jce = new JobCounterUpdateEvent(taskAttempt.getID().getTaskId().getJobId()); long slotMillisIncrement = computeSlotMillis(taskAttempt); if (taskType == TaskType.MAP) { jce.addCounterUpdate(JobCounter.NUM_FAILED_MAPS, 1); if(!taskAlreadyCompleted) { // dont double count the elapsed time jce.addCounterUpdate(JobCounter.SLOTS_MILLIS_MAPS, slotMillisIncrement); } } else { jce.addCounterUpdate(JobCounter.NUM_FAILED_REDUCES, 1); if(!taskAlreadyCompleted) { // dont double count the elapsed time jce.addCounterUpdate(JobCounter.SLOTS_MILLIS_REDUCES, slotMillisIncrement); } } return jce; }
private static JobCounterUpdateEvent createJobCounterUpdateEventTAKilled( TaskAttemptImpl taskAttempt, boolean taskAlreadyCompleted) { TaskType taskType = taskAttempt.getID().getTaskId().getTaskType(); JobCounterUpdateEvent jce = new JobCounterUpdateEvent(taskAttempt.getID().getTaskId().getJobId()); long slotMillisIncrement = computeSlotMillis(taskAttempt); if (taskType == TaskType.MAP) { jce.addCounterUpdate(JobCounter.NUM_KILLED_MAPS, 1); if(!taskAlreadyCompleted) { // dont double count the elapsed time jce.addCounterUpdate(JobCounter.SLOTS_MILLIS_MAPS, slotMillisIncrement); } } else { jce.addCounterUpdate(JobCounter.NUM_KILLED_REDUCES, 1); if(!taskAlreadyCompleted) { // dont double count the elapsed time jce.addCounterUpdate(JobCounter.SLOTS_MILLIS_REDUCES, slotMillisIncrement); } } return jce; }
protected void verifySleepJobCounters(Job job) throws InterruptedException, IOException { Counters counters = job.getCounters(); Assert.assertEquals(3, counters.findCounter(JobCounter.OTHER_LOCAL_MAPS) .getValue()); Assert.assertEquals(3, counters.findCounter(JobCounter.TOTAL_LAUNCHED_MAPS) .getValue()); Assert.assertEquals(2, counters.findCounter(JobCounter.TOTAL_LAUNCHED_REDUCES).getValue()); Assert .assertTrue(counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS) != null && counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS).getValue() != 0); Assert .assertTrue(counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS) != null && counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS).getValue() != 0); }
@Override protected void verifySleepJobCounters(Job job) throws InterruptedException, IOException { Counters counters = job.getCounters(); Assert.assertEquals(3, counters.findCounter(JobCounter.OTHER_LOCAL_MAPS) .getValue()); Assert.assertEquals(3, counters.findCounter(JobCounter.TOTAL_LAUNCHED_MAPS) .getValue()); Assert.assertEquals(1, counters.findCounter(JobCounter.TOTAL_LAUNCHED_REDUCES).getValue()); Assert .assertTrue(counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS) != null && counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS).getValue() != 0); Assert .assertTrue(counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS) != null && counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS).getValue() != 0); Assert.assertEquals(3, counters.findCounter(JobCounter.NUM_UBER_SUBMAPS) .getValue()); Assert.assertEquals(1, counters.findCounter(JobCounter.NUM_UBER_SUBREDUCES) .getValue()); Assert.assertEquals(4, counters.findCounter(JobCounter.TOTAL_LAUNCHED_UBERTASKS).getValue()); }
@Override protected void verifyFailingMapperCounters(Job job) throws InterruptedException, IOException { Counters counters = job.getCounters(); Assert.assertEquals(2, counters.findCounter(JobCounter.OTHER_LOCAL_MAPS) .getValue()); Assert.assertEquals(2, counters.findCounter(JobCounter.TOTAL_LAUNCHED_MAPS) .getValue()); Assert.assertEquals(2, counters.findCounter(JobCounter.NUM_FAILED_MAPS) .getValue()); Assert .assertTrue(counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS) != null && counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS).getValue() != 0); Assert.assertEquals(2, counters.findCounter(JobCounter.TOTAL_LAUNCHED_UBERTASKS).getValue()); Assert.assertEquals(2, counters.findCounter(JobCounter.NUM_UBER_SUBMAPS) .getValue()); Assert.assertEquals(2, counters .findCounter(JobCounter.NUM_FAILED_UBERTASKS).getValue()); }