/**
 * Verifies that shuffle memory limits above 2GB are usable: with an 8GB
 * reducer heap and buffer fractions at/near 1.0, both the total shuffle
 * memory limit and the in-memory reduce limit must exceed Integer.MAX_VALUE
 * (i.e. they are tracked as longs, not clipped to int).
 */
@Test public void testLargeMemoryLimits() throws Exception {
  final JobConf conf = new JobConf();
  // Xmx in production
  conf.setLong(MRJobConfig.REDUCE_MEMORY_TOTAL_BYTES,
      8L * 1024 * 1024 * 1024);
  // M1 = Xmx fraction for map outputs
  conf.setFloat(MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT, 1.0f);
  // M2 = max M1 fraction for a single map output
  conf.setFloat(MRJobConfig.SHUFFLE_MEMORY_LIMIT_PERCENT, 0.95f);
  // M3 = M1 fraction at which in-memory merge is triggered
  conf.setFloat(MRJobConfig.SHUFFLE_MERGE_PERCENT, 1.0f);
  // M4 = M1 fraction of map outputs remaining in memory for a reduce
  conf.setFloat(MRJobConfig.REDUCE_INPUT_BUFFER_PERCENT, 1.0f);
  final MergeManagerImpl<Text, Text> mgr = new MergeManagerImpl<Text, Text>(
      null, conf, mock(LocalFileSystem.class), null, null, null, null, null,
      null, null, null, null, null, new MROutputFiles());
  // Total shuffle memory must not have been truncated to an int.
  assertTrue("Large shuffle area unusable: " + mgr.memoryLimit,
      mgr.memoryLimit > Integer.MAX_VALUE);
  final long maxInMemReduce = mgr.getMaxInMemReduceLimit();
  // Same for the in-memory reduce area derived from M4.
  assertTrue("Large in-memory reduce area unusable: " + maxInMemReduce,
      maxInMemReduce > Integer.MAX_VALUE);
}
@Test public void testLargeMemoryLimits() throws Exception { final JobConf conf = new JobConf(); // Xmx in production conf.setLong(MRJobConfig.REDUCE_MEMORY_TOTAL_BYTES, 8L * 1024 * 1024 * 1024); // M1 = Xmx fraction for map outputs conf.setFloat(MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT, 1.0f); // M2 = max M1 fraction for a single maple output conf.setFloat(MRJobConfig.SHUFFLE_MEMORY_LIMIT_PERCENT, 0.95f); // M3 = M1 fraction at which in memory merge is triggered conf.setFloat(MRJobConfig.SHUFFLE_MERGE_PERCENT, 1.0f); // M4 = M1 fraction of map outputs remaining in memory for a reduce conf.setFloat(MRJobConfig.REDUCE_INPUT_BUFFER_PERCENT, 1.0f); final MergeManagerImpl<Text, Text> mgr = new MergeManagerImpl<Text, Text>( null, conf, mock(LocalFileSystem.class), null, null, null, null, null, null, null, null, null, null, new MROutputFiles()); assertTrue("Large shuffle area unusable: " + mgr.memoryLimit, mgr.memoryLimit > Integer.MAX_VALUE); final long maxInMemReduce = mgr.getMaxInMemReduceLimit(); assertTrue("Large in-memory reduce area unusable: " + maxInMemReduce, maxInMemReduce > Integer.MAX_VALUE); assertEquals("maxSingleShuffleLimit to be capped at Integer.MAX_VALUE", Integer.MAX_VALUE, mgr.maxSingleShuffleLimit); verifyReservedMapOutputType(mgr, 10L, "MEMORY"); verifyReservedMapOutputType(mgr, 1L + Integer.MAX_VALUE, "DISK"); }
/**
 * Verifies the on-disk merger: its mergeFactor reflects io.sort.factor, and
 * files queued while the merger thread is suspended are batched into groups
 * of at most SORT_FACTOR entries, each batch sorted by ascending compressed
 * size.
 */
@SuppressWarnings({ "unchecked", "deprecation" })
@Test(timeout=10000)
public void testOnDiskMerger()
    throws IOException, URISyntaxException, InterruptedException {
  JobConf jobConf = new JobConf();
  final int SORT_FACTOR = 5;
  jobConf.setInt(MRJobConfig.IO_SORT_FACTOR, SORT_FACTOR);

  MapOutputFile mapOutputFile = new MROutputFiles();
  FileSystem fs = FileSystem.getLocal(jobConf);
  MergeManagerImpl<IntWritable, IntWritable> manager =
      new MergeManagerImpl<IntWritable, IntWritable>(null, jobConf, fs, null,
          null, null, null, null, null, null, null, null, null, mapOutputFile);

  MergeThread<MapOutput<IntWritable, IntWritable>, IntWritable, IntWritable>
      onDiskMerger = (MergeThread<MapOutput<IntWritable, IntWritable>,
          IntWritable, IntWritable>) Whitebox.getInternalState(manager,
              "onDiskMerger");
  int mergeFactor =
      (Integer) Whitebox.getInternalState(onDiskMerger, "mergeFactor");

  // make sure the io.sort.factor is set properly
  // (fixed: JUnit's contract is assertEquals(expected, actual))
  assertEquals(SORT_FACTOR, mergeFactor);

  // Stop the onDiskMerger thread so that we can intercept the list of files
  // waiting to be merged.
  onDiskMerger.suspend();

  // Send the list of fake files waiting to be merged
  Random rand = new Random();
  for (int i = 0; i < 2 * SORT_FACTOR; ++i) {
    Path path = new Path("somePath");
    // 1L, not lowercase 'l', which reads as the digit 1
    CompressAwarePath cap = new CompressAwarePath(path, 1L, rand.nextInt());
    manager.closeOnDiskFile(cap);
  }

  // Check that the files pending to be merged are in sorted order.
  LinkedList<List<CompressAwarePath>> pendingToBeMerged =
      (LinkedList<List<CompressAwarePath>>) Whitebox.getInternalState(
          onDiskMerger, "pendingToBeMerged");
  assertTrue("No inputs were added to list pending to merge",
      pendingToBeMerged.size() > 0);
  for (List<CompressAwarePath> inputs : pendingToBeMerged) {
    // Hoisted out of the element loop so it also covers batches of size <= 1,
    // which the original j=1 loop skipped entirely.
    assertTrue("Not enough / too many inputs were going to be merged",
        inputs.size() > 0 && inputs.size() <= SORT_FACTOR);
    for (int j = 1; j < inputs.size(); ++j) {
      assertTrue("Inputs to be merged were not sorted according to size: ",
          inputs.get(j).getCompressedSize()
              >= inputs.get(j - 1).getCompressedSize());
    }
  }
}
@Test public void testInMemoryMerger() throws IOException { JobID jobId = new JobID("a", 0); TaskAttemptID reduceId = new TaskAttemptID( new TaskID(jobId, TaskType.REDUCE, 0), 0); TaskAttemptID mapId1 = new TaskAttemptID( new TaskID(jobId, TaskType.MAP, 1), 0); TaskAttemptID mapId2 = new TaskAttemptID( new TaskID(jobId, TaskType.MAP, 2), 0); LocalDirAllocator lda = new LocalDirAllocator(MRConfig.LOCAL_DIR); MergeManagerImpl<Text, Text> mergeManager = new MergeManagerImpl<Text, Text>( reduceId, jobConf, fs, lda, Reporter.NULL, null, null, null, null, null, null, null, new Progress(), new MROutputFiles()); // write map outputs Map<String, String> map1 = new TreeMap<String, String>(); map1.put("apple", "disgusting"); map1.put("carrot", "delicious"); Map<String, String> map2 = new TreeMap<String, String>(); map1.put("banana", "pretty good"); byte[] mapOutputBytes1 = writeMapOutput(conf, map1); byte[] mapOutputBytes2 = writeMapOutput(conf, map2); InMemoryMapOutput<Text, Text> mapOutput1 = new InMemoryMapOutput<Text, Text>( conf, mapId1, mergeManager, mapOutputBytes1.length, null, true); InMemoryMapOutput<Text, Text> mapOutput2 = new InMemoryMapOutput<Text, Text>( conf, mapId2, mergeManager, mapOutputBytes2.length, null, true); System.arraycopy(mapOutputBytes1, 0, mapOutput1.getMemory(), 0, mapOutputBytes1.length); System.arraycopy(mapOutputBytes2, 0, mapOutput2.getMemory(), 0, mapOutputBytes2.length); // create merger and run merge MergeThread<InMemoryMapOutput<Text, Text>, Text, Text> inMemoryMerger = mergeManager.createInMemoryMerger(); List<InMemoryMapOutput<Text, Text>> mapOutputs = new ArrayList<InMemoryMapOutput<Text, Text>>(); mapOutputs.add(mapOutput1); mapOutputs.add(mapOutput2); inMemoryMerger.merge(mapOutputs); Assert.assertEquals(1, mergeManager.onDiskMapOutputs.size()); Path outPath = mergeManager.onDiskMapOutputs.iterator().next(); List<String> keys = new ArrayList<String>(); List<String> values = new ArrayList<String>(); readOnDiskMapOutput(conf, fs, 
outPath, keys, values); Assert.assertEquals(keys, Arrays.asList("apple", "banana", "carrot")); Assert.assertEquals(values, Arrays.asList("disgusting", "pretty good", "delicious")); }
@Test public void testInMemoryMerger() throws Throwable { JobID jobId = new JobID("a", 0); TaskAttemptID reduceId = new TaskAttemptID( new TaskID(jobId, TaskType.REDUCE, 0), 0); TaskAttemptID mapId1 = new TaskAttemptID( new TaskID(jobId, TaskType.MAP, 1), 0); TaskAttemptID mapId2 = new TaskAttemptID( new TaskID(jobId, TaskType.MAP, 2), 0); LocalDirAllocator lda = new LocalDirAllocator(MRConfig.LOCAL_DIR); MergeManagerImpl<Text, Text> mergeManager = new MergeManagerImpl<Text, Text>( reduceId, jobConf, fs, lda, Reporter.NULL, null, null, null, null, null, null, null, new Progress(), new MROutputFiles()); // write map outputs Map<String, String> map1 = new TreeMap<String, String>(); map1.put("apple", "disgusting"); map1.put("carrot", "delicious"); Map<String, String> map2 = new TreeMap<String, String>(); map1.put("banana", "pretty good"); byte[] mapOutputBytes1 = writeMapOutput(conf, map1); byte[] mapOutputBytes2 = writeMapOutput(conf, map2); InMemoryMapOutput<Text, Text> mapOutput1 = new InMemoryMapOutput<Text, Text>( conf, mapId1, mergeManager, mapOutputBytes1.length, null, true); InMemoryMapOutput<Text, Text> mapOutput2 = new InMemoryMapOutput<Text, Text>( conf, mapId2, mergeManager, mapOutputBytes2.length, null, true); System.arraycopy(mapOutputBytes1, 0, mapOutput1.getMemory(), 0, mapOutputBytes1.length); System.arraycopy(mapOutputBytes2, 0, mapOutput2.getMemory(), 0, mapOutputBytes2.length); // create merger and run merge MergeThread<InMemoryMapOutput<Text, Text>, Text, Text> inMemoryMerger = mergeManager.createInMemoryMerger(); List<InMemoryMapOutput<Text, Text>> mapOutputs = new ArrayList<InMemoryMapOutput<Text, Text>>(); mapOutputs.add(mapOutput1); mapOutputs.add(mapOutput2); inMemoryMerger.merge(mapOutputs); Assert.assertEquals(1, mergeManager.onDiskMapOutputs.size()); Path outPath = mergeManager.onDiskMapOutputs.iterator().next(); List<String> keys = new ArrayList<String>(); List<String> values = new ArrayList<String>(); readOnDiskMapOutput(conf, fs, 
outPath, keys, values); Assert.assertEquals(keys, Arrays.asList("apple", "banana", "carrot")); Assert.assertEquals(values, Arrays.asList("disgusting", "pretty good", "delicious")); mergeManager.close(); Assert.assertEquals(0, mergeManager.inMemoryMapOutputs.size()); Assert.assertEquals(0, mergeManager.inMemoryMergedMapOutputs.size()); Assert.assertEquals(0, mergeManager.onDiskMapOutputs.size()); }