/** * Collect a bunch of random terasort-like records into the map output * collector. */ @SuppressWarnings("unchecked") private void collectRecords(MapOutputCollector collector) throws IOException, InterruptedException { // Use a consistent random seed so that different implementations are // sorting the exact same data. XorshiftRandom r = new XorshiftRandom(1); // The 10-90 key-value split is the same as terasort. BytesWritable key = new BytesWritable(); key.setSize(10); BytesWritable val = new BytesWritable(); val.setSize(90); r.nextBytes(val.getBytes()); byte[] keyBytes = key.getBytes(); for (int i = 0; i < NUM_RECORDS; i++) { int partition = i % NUM_PARTITIONS; r.nextBytes(keyBytes); collector.collect(key, val, partition); } }
public void runTest(Class<? extends MapOutputCollector> collector) throws Exception { for (int i = 0; i < 30; i++) { // GC a few times first so we're really just testing the collection, nothing else. System.gc(); System.gc(); System.gc(); doBenchmark(collector); } }
/**
 * Command-line entry point. Expects exactly one argument: the fully
 * qualified name of the collector class to benchmark.
 *
 * @param args command-line arguments; must contain exactly one element
 * @throws Exception if the class cannot be loaded, is not a
 *     {@code MapOutputCollector} subtype, or the benchmark itself fails
 */
public static void main(String[] args) throws Exception {
  String usage = "Usage: " + MapOutputCollectorBenchmark.class.getName()
      + " <collector class name>";
  Preconditions.checkArgument(args.length == 1, usage);

  // Load the named class and narrow it to a collector type in one step.
  Class<? extends MapOutputCollector> collectorClazz =
      Class.forName(args[0]).asSubclass(MapOutputCollector.class);

  MapOutputCollectorBenchmark benchmark = new MapOutputCollectorBenchmark();
  benchmark.runTest(collectorClazz);
}
private void doBenchmark(Class<? extends MapOutputCollector> collectorClazz) throws Exception { JobConf jobConf = new JobConf(); jobConf.setInt("io.sort.mb", 600); jobConf.setInt("io.file.buffer.size", 128*1024); jobConf.setMapOutputKeyClass(BytesWritable.class); jobConf.setMapOutputValueClass(BytesWritable.class); jobConf.setNumReduceTasks(NUM_PARTITIONS); jobConf.set(JobContext.TASK_ATTEMPT_ID, "test_attempt"); // Fake out a bunch of stuff to make a task context. MapOutputFile output = new YarnOutputFiles(); output.setConf(jobConf); Progress mapProgress = new Progress(); mapProgress.addPhase("map"); mapProgress.addPhase("sort"); MapTask mapTask = Mockito.mock(MapTask.class); Mockito.doReturn(output).when(mapTask).getMapOutputFile(); Mockito.doReturn(true).when(mapTask).isMapTask(); Mockito.doReturn(new TaskAttemptID("fake-jt", 12345, TaskType.MAP, 1, 1)).when(mapTask).getTaskID(); Mockito.doReturn(mapProgress).when(mapTask).getSortPhase(); MapTask t = new MapTask(); Constructor<TaskReporter> constructor = TaskReporter.class.getDeclaredConstructor(Task.class, Progress.class, TaskUmbilicalProtocol.class); constructor.setAccessible(true); TaskReporter reporter = constructor.newInstance(t, mapProgress, null); reporter.setProgress(0.0f); Context context = new MapOutputCollector.Context(mapTask, jobConf, reporter); // Actually run the map sort. 
ResourceTimer timer = new ResourceTimer(); MapOutputCollector<?,?> collector = ReflectionUtils.newInstance(collectorClazz, jobConf); collector.init(context); collectRecords(collector); collector.flush(); collector.close(); // Print results System.out.println("---------------------"); System.out.println("Results for " + collectorClazz.getName() + ":"); System.out.println("CPU time: " + timer.elapsedCpu() + "ms"); System.out.println("CPU time (only this thread): " + timer.elapsedCpuThisThread() + "ms"); System.out.println("Compilation time: " + timer.elapsedCompilation()); System.out.println("GC time: " + timer.elapsedGC()); System.out.println("Wall time: " + timer.elapsedWall()); System.out.println("---------------------"); }