/**
 * Verify that all segments are read from disk.
 * @throws Exception might be thrown
 */
public void testReduceFromDisk() throws Exception {
  final int MAP_TASKS = 8;
  JobConf job = mrCluster.createJobConf();
  job.set(JobContext.REDUCE_INPUT_BUFFER_PERCENT, "0.0");
  job.setNumMapTasks(MAP_TASKS);
  job.set(JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS, "-Xmx128m");
  job.setLong(JobContext.REDUCE_MEMORY_TOTAL_BYTES, 128 << 20);
  job.set(JobContext.SHUFFLE_INPUT_BUFFER_PERCENT, "0.05");
  job.setInt(JobContext.IO_SORT_FACTOR, 2);
  job.setInt(JobContext.REDUCE_MERGE_INMEM_THRESHOLD, 4);
  Counters c = runJob(job);
  final long spill = c.findCounter(TaskCounter.SPILLED_RECORDS).getCounter();
  final long out = c.findCounter(TaskCounter.MAP_OUTPUT_RECORDS).getCounter();
  assertTrue("Expected all records spilled during reduce (" + spill + ")",
      spill >= 2 * out); // all records spill at map, reduce
  assertTrue("Expected intermediate merges (" + spill + ")",
      spill >= 2 * out + (out / MAP_TASKS)); // some records hit twice
}
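The bounds in the two assertions follow from the job configuration above. The following is a hedged sketch of that arithmetic, reusing the test's locals; it is not code from the original source:

  // Every map output record is spilled once on the map side, and
  // REDUCE_INPUT_BUFFER_PERCENT = 0.0 forces every shuffled record to disk
  // again on the reduce side.
  long mapAndReduceSpills = 2 * out;
  // With IO_SORT_FACTOR = 2 the reduce-side merge needs intermediate passes
  // that re-spill at least roughly one map's worth of records (a lower
  // bound, not an exact count).
  long intermediateMergeSpills = out / MAP_TASKS;
  // The assertions check spill >= mapAndReduceSpills and
  // spill >= mapAndReduceSpills + intermediateMergeSpills.
  long expectedLowerBound = mapAndReduceSpills + intermediateMergeSpills;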
private void validateFileCounters(Counters counter, long fileBytesRead,
    long fileBytesWritten, long mapOutputBytes,
    long mapOutputMaterializedBytes) {
  assertTrue(counter.findCounter(FileInputFormatCounter.BYTES_READ)
      .getValue() != 0);
  assertEquals(fileBytesRead,
      counter.findCounter(FileInputFormatCounter.BYTES_READ).getValue());
  assertTrue(counter.findCounter(FileOutputFormatCounter.BYTES_WRITTEN)
      .getValue() != 0);
  if (mapOutputBytes >= 0) {
    assertTrue(counter.findCounter(TaskCounter.MAP_OUTPUT_BYTES)
        .getValue() != 0);
  }
  if (mapOutputMaterializedBytes >= 0) {
    assertTrue(counter.findCounter(TaskCounter.MAP_OUTPUT_MATERIALIZED_BYTES)
        .getValue() != 0);
  }
}
@SuppressWarnings("deprecation")
private long getTaskCounterUsage(JobClient client, JobID id, int numReports,
    int taskId, TaskType type) throws Exception {
  TaskReport[] reports = null;
  if (TaskType.MAP.equals(type)) {
    reports = client.getMapTaskReports(id);
  } else if (TaskType.REDUCE.equals(type)) {
    reports = client.getReduceTaskReports(id);
  }

  assertNotNull("No reports found for task type '" + type.name()
      + "' in job " + id, reports);
  // make sure that the total number of reports matches the expected count
  assertEquals("Mismatch in task id", numReports, reports.length);

  Counters counters = reports[taskId].getCounters();
  return counters.getCounter(TaskCounter.COMMITTED_HEAP_BYTES);
}
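A hedged usage sketch of the helper above, not taken from the original test: `client` and `jobId` are assumed to be a JobClient and the JobID of a completed job that ran a single map task.

  long mapHeapBytes = getTaskCounterUsage(client, jobId, 1, 0, TaskType.MAP);
  assertTrue("Expected a non-zero committed heap for the map task",
      mapHeapBytes > 0);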
/** Verify that at least one segment does not hit disk */
public void testReduceFromPartialMem() throws Exception {
  final int MAP_TASKS = 7;
  JobConf job = mrCluster.createJobConf();
  job.setNumMapTasks(MAP_TASKS);
  job.setInt(JobContext.REDUCE_MERGE_INMEM_THRESHOLD, 0);
  job.set(JobContext.REDUCE_INPUT_BUFFER_PERCENT, "1.0");
  job.setInt(JobContext.SHUFFLE_PARALLEL_COPIES, 1);
  job.setInt(JobContext.IO_SORT_MB, 10);
  job.set(JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS, "-Xmx128m");
  job.setLong(JobContext.REDUCE_MEMORY_TOTAL_BYTES, 128 << 20);
  job.set(JobContext.SHUFFLE_INPUT_BUFFER_PERCENT, "0.14");
  job.set(JobContext.SHUFFLE_MERGE_PERCENT, "1.0");
  Counters c = runJob(job);
  final long out = c.findCounter(TaskCounter.MAP_OUTPUT_RECORDS).getCounter();
  final long spill = c.findCounter(TaskCounter.SPILLED_RECORDS).getCounter();
  assertTrue("Expected some records not spilled during reduce (" + spill + ")",
      spill < 2 * out); // spilled map records, some records at the reduce
}
public Task(String jobFile, TaskAttemptID taskId, int partition,
    int numSlotsRequired) {
  this.jobFile = jobFile;
  this.taskId = taskId;
  this.partition = partition;
  this.numSlotsRequired = numSlotsRequired;
  this.taskStatus = TaskStatus.createTaskStatus(isMapTask(), this.taskId,
      0.0f, numSlotsRequired, TaskStatus.State.UNASSIGNED,
      "", "", "",
      isMapTask() ? TaskStatus.Phase.MAP : TaskStatus.Phase.SHUFFLE,
      counters);
  spilledRecordsCounter = counters.findCounter(TaskCounter.SPILLED_RECORDS);
  failedShuffleCounter = counters.findCounter(TaskCounter.FAILED_SHUFFLE);
  mergedMapOutputsCounter =
      counters.findCounter(TaskCounter.MERGED_MAP_OUTPUTS);
  gcUpdater = new GcTimeUpdater();
}
TrackedRecordReader(TaskReporter reporter, JobConf job) throws IOException {
  inputRecordCounter = reporter.getCounter(TaskCounter.MAP_INPUT_RECORDS);
  fileInputByteCounter =
      reporter.getCounter(FileInputFormatCounter.BYTES_READ);
  this.reporter = reporter;

  List<Statistics> matchedStats = null;
  if (this.reporter.getInputSplit() instanceof FileSplit) {
    matchedStats = getFsStatistics(((FileSplit) this.reporter
        .getInputSplit()).getPath(), job);
  }
  fsStats = matchedStats;

  bytesInPrev = getInputBytes(fsStats);
  rawIn = job.getInputFormat().getRecordReader(reporter.getInputSplit(),
      job, reporter);
  bytesInCurr = getInputBytes(fsStats);
  fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
}
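The snapshot-before/snapshot-after idiom used here, and again in the constructors and init method below, attributes file-system bytes to a single operation. A minimal hedged sketch of the pattern, reusing the names from the constructor above:

  long before = getInputBytes(fsStats);           // snapshot FS statistics
  // ... perform the operation that does the actual I/O ...
  long after = getInputBytes(fsStats);            // snapshot again
  fileInputByteCounter.increment(after - before); // charge only the delta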
NewTrackingRecordReader(org.apache.hadoop.mapreduce.InputSplit split,
    org.apache.hadoop.mapreduce.InputFormat<K, V> inputFormat,
    TaskReporter reporter,
    org.apache.hadoop.mapreduce.TaskAttemptContext taskContext)
    throws InterruptedException, IOException {
  this.reporter = reporter;
  this.inputRecordCounter =
      reporter.getCounter(TaskCounter.MAP_INPUT_RECORDS);
  this.fileInputByteCounter =
      reporter.getCounter(FileInputFormatCounter.BYTES_READ);

  List<Statistics> matchedStats = null;
  if (split instanceof org.apache.hadoop.mapreduce.lib.input.FileSplit) {
    matchedStats = getFsStatistics(
        ((org.apache.hadoop.mapreduce.lib.input.FileSplit) split).getPath(),
        taskContext.getConfiguration());
  }
  fsStats = matchedStats;

  long bytesInPrev = getInputBytes(fsStats);
  this.real = inputFormat.createRecordReader(split, taskContext);
  long bytesInCurr = getInputBytes(fsStats);
  fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
}
@SuppressWarnings("unchecked")
NewDirectOutputCollector(MRJobConfig jobContext, JobConf job,
    TaskUmbilicalProtocol umbilical, TaskReporter reporter)
    throws IOException, ClassNotFoundException, InterruptedException {
  this.reporter = reporter;
  mapOutputRecordCounter =
      reporter.getCounter(TaskCounter.MAP_OUTPUT_RECORDS);
  fileOutputByteCounter =
      reporter.getCounter(FileOutputFormatCounter.BYTES_WRITTEN);

  List<Statistics> matchedStats = null;
  if (outputFormat
      instanceof org.apache.hadoop.mapreduce.lib.output.FileOutputFormat) {
    matchedStats = getFsStatistics(
        org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
            .getOutputPath(taskContext),
        taskContext.getConfiguration());
  }
  fsStats = matchedStats;

  long bytesOutPrev = getOutputBytes(fsStats);
  out = outputFormat.getRecordWriter(taskContext);
  long bytesOutCurr = getOutputBytes(fsStats);
  fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
}
@SuppressWarnings("unchecked")
public void init(MapOutputCollector.Context context)
    throws IOException, ClassNotFoundException {
  this.reporter = context.getReporter();
  JobConf job = context.getJobConf();
  String finalName = getOutputName(getPartition());
  FileSystem fs = FileSystem.get(job);

  OutputFormat<K, V> outputFormat = job.getOutputFormat();
  mapOutputRecordCounter =
      reporter.getCounter(TaskCounter.MAP_OUTPUT_RECORDS);
  fileOutputByteCounter =
      reporter.getCounter(FileOutputFormatCounter.BYTES_WRITTEN);

  List<Statistics> matchedStats = null;
  if (outputFormat instanceof FileOutputFormat) {
    matchedStats = getFsStatistics(FileOutputFormat.getOutputPath(job), job);
  }
  fsStats = matchedStats;

  long bytesOutPrev = getOutputBytes(fsStats);
  out = job.getOutputFormat().getRecordWriter(fs, job, finalName, reporter);
  long bytesOutCurr = getOutputBytes(fsStats);
  fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
}
public SkippingReduceValuesIterator(RawKeyValueIterator in,
    RawComparator<KEY> comparator, Class<KEY> keyClass,
    Class<VALUE> valClass, Configuration conf, TaskReporter reporter,
    TaskUmbilicalProtocol umbilical) throws IOException {
  super(in, comparator, keyClass, valClass, conf, reporter);
  this.umbilical = umbilical;
  this.skipGroupCounter =
      reporter.getCounter(TaskCounter.REDUCE_SKIPPED_GROUPS);
  this.skipRecCounter =
      reporter.getCounter(TaskCounter.REDUCE_SKIPPED_RECORDS);
  this.toWriteSkipRecs = toWriteSkipRecs()
      && SkipBadRecords.getSkipOutputPath(conf) != null;
  this.keyClass = keyClass;
  this.valClass = valClass;
  this.reporter = reporter;
  skipIt = getSkipRanges().skipRangeIterator();
  mayBeSkip();
}
@Test
public void testCounters() throws IOException {
  Enum[] keysWithResource = {TaskCounter.MAP_INPUT_RECORDS,
      TaskCounter.MAP_OUTPUT_BYTES};
  Enum[] keysWithoutResource = {myCounters.TEST1, myCounters.TEST2};
  String[] groups = {"group1", "group2", "group{}()[]"};
  String[] counters = {"counter1", "counter2", "counter{}()[]"};

  try {
    // I. Check enum counters that have a resource bundle
    testCounter(getEnumCounters(keysWithResource));

    // II. Check enum counters that don't have a resource bundle
    testCounter(getEnumCounters(keysWithoutResource));

    // III. Check string counters
    testCounter(getEnumCounters(groups, counters));
  } catch (ParseException pe) {
    throw new IOException(pe);
  }
}
@SuppressWarnings("deprecation")
private void checkLegacyNames(Counters counters) {
  assertEquals("New name", 1, counters.findCounter(
      TaskCounter.class.getName(), "MAP_INPUT_RECORDS").getValue());
  assertEquals("Legacy name", 1, counters.findCounter(
      "org.apache.hadoop.mapred.Task$Counter",
      "MAP_INPUT_RECORDS").getValue());
  assertEquals("Legacy enum", 1,
      counters.findCounter(Task.Counter.MAP_INPUT_RECORDS).getValue());

  assertEquals("New name", 1, counters.findCounter(
      JobCounter.class.getName(), "DATA_LOCAL_MAPS").getValue());
  assertEquals("Legacy name", 1, counters.findCounter(
      "org.apache.hadoop.mapred.JobInProgress$Counter",
      "DATA_LOCAL_MAPS").getValue());
  assertEquals("Legacy enum", 1,
      counters.findCounter(JobInProgress.Counter.DATA_LOCAL_MAPS).getValue());

  assertEquals("New name", 1, counters.findCounter(
      FileSystemCounter.class.getName(), "FILE_BYTES_READ").getValue());
  assertEquals("New name and method", 1, counters.findCounter("file",
      FileSystemCounter.BYTES_READ).getValue());
  assertEquals("Legacy name", 1, counters.findCounter(
      "FileSystemCounters", "FILE_BYTES_READ").getValue());
}
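The checks above rely on legacy and new counter names resolving to the same underlying counters. A minimal hedged sketch of that behavior, assuming the legacy-name mapping of org.apache.hadoop.mapred.Counters; the increment and the expected value are illustrative and not taken from the original test:

  Counters counters = new Counters();
  counters.findCounter(TaskCounter.MAP_INPUT_RECORDS).increment(1);
  // The same counter should be visible through the legacy group/name lookup.
  long viaLegacyName = counters.findCounter(
      "org.apache.hadoop.mapred.Task$Counter", "MAP_INPUT_RECORDS").getValue();
  assertEquals(1, viaLegacyName);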
public static void verifyCounters(Job normalJob, Job nativeJob,
    boolean hasCombiner) throws IOException {
  Counters normalCounters = normalJob.getCounters();
  Counters nativeCounters = nativeJob.getCounters();
  assertEquals("Counter MAP_OUTPUT_RECORDS should be equal",
      normalCounters.findCounter(TaskCounter.MAP_OUTPUT_RECORDS).getValue(),
      nativeCounters.findCounter(TaskCounter.MAP_OUTPUT_RECORDS).getValue());
  assertEquals("Counter REDUCE_INPUT_GROUPS should be equal",
      normalCounters.findCounter(TaskCounter.REDUCE_INPUT_GROUPS).getValue(),
      nativeCounters.findCounter(TaskCounter.REDUCE_INPUT_GROUPS).getValue());
  if (!hasCombiner) {
    assertEquals("Counter REDUCE_INPUT_RECORDS should be equal",
        normalCounters.findCounter(TaskCounter.REDUCE_INPUT_RECORDS)
            .getValue(),
        nativeCounters.findCounter(TaskCounter.REDUCE_INPUT_RECORDS)
            .getValue());
  }
}
private boolean bigItemCount(String output) throws IOException,
    ClassNotFoundException, InterruptedException {
  Job job = Job.getInstance(this.getConf(),
      "Counting items from " + this.input);
  job.setJarByClass(TopPIoverHadoop.class);

  job.setInputFormatClass(TextInputFormat.class);
  job.setOutputFormatClass(SequenceFileOutputFormat.class);
  job.setOutputKeyClass(IntWritable.class);
  job.setOutputValueClass(IntWritable.class);

  FileInputFormat.addInputPath(job, new Path(this.input));
  FileOutputFormat.setOutputPath(job, new Path(output));

  job.setMapperClass(ItemBigCountingMapper.class);
  job.setReducerClass(ItemBigCountingReducer.class);

  boolean success = job.waitForCompletion(true);

  if (success) {
    Counter rebasingMaxID =
        job.getCounters().findCounter(TaskCounter.REDUCE_OUTPUT_RECORDS);
    this.getConf().setInt(KEY_REBASING_MAX_ID, (int) rebasingMaxID.getValue());
  }

  return success;
}
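A hedged sketch of how a later phase might read back the value stored above: KEY_REBASING_MAX_ID is the same constant used in bigItemCount(), while the -1 default and the error message are illustrative assumptions, not taken from the original source.

  // -1 is an assumed "not set" sentinel for this sketch.
  int maxItemId = this.getConf().getInt(KEY_REBASING_MAX_ID, -1);
  if (maxItemId < 0) {
    throw new IllegalStateException("Item counting job did not record a max item ID");
  }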