@SuppressWarnings("deprecation") private void checkLegacyNames(Counters counters) { assertEquals("New name", 1, counters.findCounter( TaskCounter.class.getName(), "MAP_INPUT_RECORDS").getValue()); assertEquals("Legacy name", 1, counters.findCounter( "org.apache.hadoop.mapred.Task$Counter", "MAP_INPUT_RECORDS").getValue()); assertEquals("Legacy enum", 1, counters.findCounter(Task.Counter.MAP_INPUT_RECORDS).getValue()); assertEquals("New name", 1, counters.findCounter( JobCounter.class.getName(), "DATA_LOCAL_MAPS").getValue()); assertEquals("Legacy name", 1, counters.findCounter( "org.apache.hadoop.mapred.JobInProgress$Counter", "DATA_LOCAL_MAPS").getValue()); assertEquals("Legacy enum", 1, counters.findCounter(JobInProgress.Counter.DATA_LOCAL_MAPS).getValue()); assertEquals("New name", 1, counters.findCounter( FileSystemCounter.class.getName(), "FILE_BYTES_READ").getValue()); assertEquals("New name and method", 1, counters.findCounter("file", FileSystemCounter.BYTES_READ).getValue()); assertEquals("Legacy name", 1, counters.findCounter( "FileSystemCounters", "FILE_BYTES_READ").getValue()); }
@Test public void testFileSystemGroupIteratorConcurrency() { Counters counters = new Counters(); // create 2 filesystem counter groups counters.findCounter("fs1", FileSystemCounter.BYTES_READ).increment(1); counters.findCounter("fs2", FileSystemCounter.BYTES_READ).increment(1); // Iterate over the counters in this group while updating counters in // the group Group group = counters.getGroup(FileSystemCounter.class.getName()); Iterator<Counter> iterator = group.iterator(); counters.findCounter("fs3", FileSystemCounter.BYTES_READ).increment(1); assertTrue(iterator.hasNext()); iterator.next(); counters.findCounter("fs3", FileSystemCounter.BYTES_READ).increment(1); assertTrue(iterator.hasNext()); iterator.next(); }
@Test public void testLegacyGetGroupNames() { Counters counters = new Counters(); // create 2 filesystem counter groups counters.findCounter("fs1", FileSystemCounter.BYTES_READ).increment(1); counters.findCounter("fs2", FileSystemCounter.BYTES_READ).increment(1); counters.incrCounter("group1", "counter1", 1); HashSet<String> groups = new HashSet<String>(counters.getGroupNames()); HashSet<String> expectedGroups = new HashSet<String>(); expectedGroups.add("group1"); expectedGroups.add("FileSystemCounters"); //Legacy Name expectedGroups.add("org.apache.hadoop.mapreduce.FileSystemCounter"); assertEquals(expectedGroups, groups); }
/** * check the counters to see whether the task has exceeded any configured * limits. * @throws TaskLimitException */ protected void checkTaskLimits() throws TaskLimitException { // check the limit for writing to local file system long limit = conf.getLong(MRJobConfig.TASK_LOCAL_WRITE_LIMIT_BYTES, MRJobConfig.DEFAULT_TASK_LOCAL_WRITE_LIMIT_BYTES); if (limit >= 0) { Counters.Counter localWritesCounter = null; try { LocalFileSystem localFS = FileSystem.getLocal(conf); localWritesCounter = counters.findCounter(localFS.getScheme(), FileSystemCounter.BYTES_WRITTEN); } catch (IOException e) { LOG.warn("Could not get LocalFileSystem BYTES_WRITTEN counter"); } if (localWritesCounter != null && localWritesCounter.getCounter() > limit) { throw new TaskLimitException("too much write to local file system." + " current value is " + localWritesCounter.getCounter() + " the limit is " + limit); } } }
private static void runSort(JobConf job, Path sortInput, Path sortOutput) throws Exception { job.setInt("mapred.job.reuse.jvm.num.tasks", -1); job.setInt("io.sort.mb", 1); job.setNumMapTasks(12); // Setup command-line arguments to 'sort' String[] sortArgs = {sortInput.toString(), sortOutput.toString()}; // Run Sort Sort sort = new Sort(); assertEquals(ToolRunner.run(job, sort, sortArgs), 0); Counters counters = sort.getResult().getCounters(); long mapInput = counters.findCounter(Task.Counter.MAP_INPUT_BYTES ).getValue(); long hdfsRead = counters.findCounter("hdfs", FileSystemCounter.BYTES_READ) .getValue(); // the hdfs read should be between 100% and 110% of the map input bytes assertTrue("map input = " + mapInput + ", hdfs read = " + hdfsRead, (hdfsRead < (mapInput * 1.1)) && (hdfsRead > mapInput)); }
@Override public Metrics copy() throws CircusTrainException { LOG.info("Copying table data."); LOG.debug("Invoking DistCp: {} -> {}", sourceDataBaseLocation, replicaDataLocation); DistCpOptions distCpOptions = parseCopierOptions(copierOptions); LOG.debug("Invoking DistCp with options: {}", distCpOptions); CircusTrainCopyListing.setAsCopyListingClass(conf); CircusTrainCopyListing.setRootPath(conf, sourceDataBaseLocation); try { loadHComS3AFileSystem(); distCpOptions.setBlocking(false); Job job = executor.exec(conf, distCpOptions); String counter = String.format("%s_BYTES_WRITTEN", replicaDataLocation.toUri().getScheme().toUpperCase()); registerRunningJobMetrics(job, counter); if (!job.waitForCompletion(true)) { throw new IOException( "DistCp failure: Job " + job.getJobID() + " has failed: " + job.getStatus().getFailureInfo()); } return new JobMetrics(job, FileSystemCounter.class.getName(), counter); } catch (Exception e) { cleanUpReplicaDataLocation(); throw new CircusTrainException("Unable to copy file(s)", e); } }
private void registerRunningJobMetrics(final Job job, final String counter) { registry.remove(RunningMetrics.DIST_CP_BYTES_REPLICATED.name()); registry.register(RunningMetrics.DIST_CP_BYTES_REPLICATED.name(), new Gauge<Long>() { @Override public Long getValue() { try { return job.getCounters().findCounter(FileSystemCounter.class.getName(), counter).getValue(); } catch (IOException e) { LOG.warn("Could not get value for counter " + counter, e); } return 0L; } }); }
void updateCounters() { if (readBytesCounter == null) { readBytesCounter = counters.findCounter(scheme, FileSystemCounter.BYTES_READ); } if (writeBytesCounter == null) { writeBytesCounter = counters.findCounter(scheme, FileSystemCounter.BYTES_WRITTEN); } if (readOpsCounter == null) { readOpsCounter = counters.findCounter(scheme, FileSystemCounter.READ_OPS); } if (largeReadOpsCounter == null) { largeReadOpsCounter = counters.findCounter(scheme, FileSystemCounter.LARGE_READ_OPS); } if (writeOpsCounter == null) { writeOpsCounter = counters.findCounter(scheme, FileSystemCounter.WRITE_OPS); } long readBytes = 0; long writeBytes = 0; long readOps = 0; long largeReadOps = 0; long writeOps = 0; for (FileSystem.Statistics stat: stats) { readBytes = readBytes + stat.getBytesRead(); writeBytes = writeBytes + stat.getBytesWritten(); readOps = readOps + stat.getReadOps(); largeReadOps = largeReadOps + stat.getLargeReadOps(); writeOps = writeOps + stat.getWriteOps(); } readBytesCounter.setValue(readBytes); writeBytesCounter.setValue(writeBytes); readOpsCounter.setValue(readOps); largeReadOpsCounter.setValue(largeReadOps); writeOpsCounter.setValue(writeOps); }
/** * Find the file system counter for the given scheme and enum. * @param scheme of the file system * @param key the enum of the counter * @return the file system counter */ @InterfaceAudience.Private public synchronized C findCounter(String scheme, FileSystemCounter key) { return ((FileSystemCounterGroup<C>) getGroup( FileSystemCounter.class.getName()).getUnderlyingGroup()). findCounter(scheme, key); }