@Test public void testInMemoryMerger() throws IOException { JobID jobId = new JobID("a", 0); TaskAttemptID reduceId = new TaskAttemptID( new TaskID(jobId, TaskType.REDUCE, 0), 0); TaskAttemptID mapId1 = new TaskAttemptID( new TaskID(jobId, TaskType.MAP, 1), 0); TaskAttemptID mapId2 = new TaskAttemptID( new TaskID(jobId, TaskType.MAP, 2), 0); LocalDirAllocator lda = new LocalDirAllocator(MRConfig.LOCAL_DIR); MergeManagerImpl<Text, Text> mergeManager = new MergeManagerImpl<Text, Text>( reduceId, jobConf, fs, lda, Reporter.NULL, null, null, null, null, null, null, null, new Progress(), new MROutputFiles()); // write map outputs Map<String, String> map1 = new TreeMap<String, String>(); map1.put("apple", "disgusting"); map1.put("carrot", "delicious"); Map<String, String> map2 = new TreeMap<String, String>(); map1.put("banana", "pretty good"); byte[] mapOutputBytes1 = writeMapOutput(conf, map1); byte[] mapOutputBytes2 = writeMapOutput(conf, map2); InMemoryMapOutput<Text, Text> mapOutput1 = new InMemoryMapOutput<Text, Text>( conf, mapId1, mergeManager, mapOutputBytes1.length, null, true); InMemoryMapOutput<Text, Text> mapOutput2 = new InMemoryMapOutput<Text, Text>( conf, mapId2, mergeManager, mapOutputBytes2.length, null, true); System.arraycopy(mapOutputBytes1, 0, mapOutput1.getMemory(), 0, mapOutputBytes1.length); System.arraycopy(mapOutputBytes2, 0, mapOutput2.getMemory(), 0, mapOutputBytes2.length); // create merger and run merge MergeThread<InMemoryMapOutput<Text, Text>, Text, Text> inMemoryMerger = mergeManager.createInMemoryMerger(); List<InMemoryMapOutput<Text, Text>> mapOutputs = new ArrayList<InMemoryMapOutput<Text, Text>>(); mapOutputs.add(mapOutput1); mapOutputs.add(mapOutput2); inMemoryMerger.merge(mapOutputs); Assert.assertEquals(1, mergeManager.onDiskMapOutputs.size()); Path outPath = mergeManager.onDiskMapOutputs.iterator().next(); List<String> keys = new ArrayList<String>(); List<String> values = new ArrayList<String>(); readOnDiskMapOutput(conf, fs, outPath, keys, values); Assert.assertEquals(keys, Arrays.asList("apple", "banana", "carrot")); Assert.assertEquals(values, Arrays.asList("disgusting", "pretty good", "delicious")); }
@Test public void testInMemoryMerger() throws Throwable { JobID jobId = new JobID("a", 0); TaskAttemptID reduceId = new TaskAttemptID( new TaskID(jobId, TaskType.REDUCE, 0), 0); TaskAttemptID mapId1 = new TaskAttemptID( new TaskID(jobId, TaskType.MAP, 1), 0); TaskAttemptID mapId2 = new TaskAttemptID( new TaskID(jobId, TaskType.MAP, 2), 0); LocalDirAllocator lda = new LocalDirAllocator(MRConfig.LOCAL_DIR); MergeManagerImpl<Text, Text> mergeManager = new MergeManagerImpl<Text, Text>( reduceId, jobConf, fs, lda, Reporter.NULL, null, null, null, null, null, null, null, new Progress(), new MROutputFiles()); // write map outputs Map<String, String> map1 = new TreeMap<String, String>(); map1.put("apple", "disgusting"); map1.put("carrot", "delicious"); Map<String, String> map2 = new TreeMap<String, String>(); map1.put("banana", "pretty good"); byte[] mapOutputBytes1 = writeMapOutput(conf, map1); byte[] mapOutputBytes2 = writeMapOutput(conf, map2); InMemoryMapOutput<Text, Text> mapOutput1 = new InMemoryMapOutput<Text, Text>( conf, mapId1, mergeManager, mapOutputBytes1.length, null, true); InMemoryMapOutput<Text, Text> mapOutput2 = new InMemoryMapOutput<Text, Text>( conf, mapId2, mergeManager, mapOutputBytes2.length, null, true); System.arraycopy(mapOutputBytes1, 0, mapOutput1.getMemory(), 0, mapOutputBytes1.length); System.arraycopy(mapOutputBytes2, 0, mapOutput2.getMemory(), 0, mapOutputBytes2.length); // create merger and run merge MergeThread<InMemoryMapOutput<Text, Text>, Text, Text> inMemoryMerger = mergeManager.createInMemoryMerger(); List<InMemoryMapOutput<Text, Text>> mapOutputs = new ArrayList<InMemoryMapOutput<Text, Text>>(); mapOutputs.add(mapOutput1); mapOutputs.add(mapOutput2); inMemoryMerger.merge(mapOutputs); Assert.assertEquals(1, mergeManager.onDiskMapOutputs.size()); Path outPath = mergeManager.onDiskMapOutputs.iterator().next(); List<String> keys = new ArrayList<String>(); List<String> values = new ArrayList<String>(); readOnDiskMapOutput(conf, fs, outPath, keys, values); Assert.assertEquals(keys, Arrays.asList("apple", "banana", "carrot")); Assert.assertEquals(values, Arrays.asList("disgusting", "pretty good", "delicious")); mergeManager.close(); Assert.assertEquals(0, mergeManager.inMemoryMapOutputs.size()); Assert.assertEquals(0, mergeManager.inMemoryMergedMapOutputs.size()); Assert.assertEquals(0, mergeManager.onDiskMapOutputs.size()); }