/**
 * Finalizes this output: renames {@code tmpOutputPath} to {@code outputPath}
 * and then passes a {@link CompressAwarePath} describing the renamed file to
 * {@code merger.closeOnDiskFile}.
 *
 * @throws IOException if the rename throws, or reports that it did not
 *     take place
 */
@Override
public void commit() throws IOException {
  // NOTE(review): assumes fs is a Hadoop FileSystem, whose rename() can
  // signal failure by returning false instead of throwing -- confirm.
  // Without this check a failed rename would still register outputPath
  // with the merger.
  if (!fs.rename(tmpOutputPath, outputPath)) {
    throw new IOException(
        "Failed to rename " + tmpOutputPath + " to " + outputPath);
  }
  CompressAwarePath compressAwarePath =
      new CompressAwarePath(outputPath, getSize(), this.compressedSize);
  merger.closeOnDiskFile(compressAwarePath);
}
/**
 * Finalizes this output: renames {@code tmpOutputPath} to {@code outputPath}
 * and then passes a {@link CompressAwarePath} describing the renamed file to
 * {@code getMerger().closeOnDiskFile}.
 *
 * @throws IOException if the rename throws, or reports that it did not
 *     take place
 */
@Override
public void commit() throws IOException {
  // NOTE(review): assumes fs is a Hadoop FileSystem, whose rename() can
  // signal failure by returning false instead of throwing -- confirm.
  // Without this check a failed rename would still register outputPath
  // with the merger.
  if (!fs.rename(tmpOutputPath, outputPath)) {
    throw new IOException(
        "Failed to rename " + tmpOutputPath + " to " + outputPath);
  }
  CompressAwarePath compressAwarePath =
      new CompressAwarePath(outputPath, getSize(), this.compressedSize);
  getMerger().closeOnDiskFile(compressAwarePath);
}
/**
 * Checks that the on-disk merger picks up the configured
 * {@code IO_SORT_FACTOR}, and that every batch queued in its
 * {@code pendingToBeMerged} list holds between 1 and {@code SORT_FACTOR}
 * entries ordered by non-decreasing compressed size.
 */
@SuppressWarnings({ "unchecked", "deprecation" })
@Test(timeout=10000)
public void testOnDiskMerger() throws IOException, URISyntaxException,
    InterruptedException {
  JobConf jobConf = new JobConf();
  final int SORT_FACTOR = 5;
  jobConf.setInt(MRJobConfig.IO_SORT_FACTOR, SORT_FACTOR);

  MapOutputFile mapOutputFile = new MROutputFiles();
  FileSystem fs = FileSystem.getLocal(jobConf);
  MergeManagerImpl<IntWritable, IntWritable> manager =
      new MergeManagerImpl<IntWritable, IntWritable>(null, jobConf, fs, null,
          null, null, null, null, null, null, null, null, null, mapOutputFile);

  // Read the merger thread and its merge factor out of the manager via
  // Whitebox.
  MergeThread<MapOutput<IntWritable, IntWritable>, IntWritable, IntWritable>
      onDiskMerger =
          (MergeThread<MapOutput<IntWritable, IntWritable>, IntWritable,
              IntWritable>) Whitebox.getInternalState(manager, "onDiskMerger");
  int mergeFactor =
      (Integer) Whitebox.getInternalState(onDiskMerger, "mergeFactor");

  // make sure the io.sort.factor is set properly
  // (expected value first, so a failure message reads the right way round)
  assertEquals(SORT_FACTOR, mergeFactor);

  // Stop the onDiskMerger thread so that we can intercept the list of files
  // waiting to be merged.
  // NOTE(review): presumably this is the deprecated Thread.suspend() that
  // the "deprecation" suppression above is for -- confirm.
  onDiskMerger.suspend();

  // Send the list of fake files waiting to be merged
  Random rand = new Random();
  for (int i = 0; i < 2 * SORT_FACTOR; ++i) {
    Path path = new Path("somePath");
    CompressAwarePath cap = new CompressAwarePath(path, 1L, rand.nextInt());
    manager.closeOnDiskFile(cap);
  }

  // Check that the files pending to be merged are in sorted order.
  LinkedList<List<CompressAwarePath>> pendingToBeMerged =
      (LinkedList<List<CompressAwarePath>>) Whitebox.getInternalState(
          onDiskMerger, "pendingToBeMerged");
  assertTrue("No inputs were added to list pending to merge",
      pendingToBeMerged.size() > 0);

  for (List<CompressAwarePath> inputs : pendingToBeMerged) {
    // Validate the batch size once per batch, outside the pairwise loop, so
    // that empty and single-element batches are checked as well.
    assertTrue("Not enough / too many inputs were going to be merged",
        inputs.size() > 0 && inputs.size() <= SORT_FACTOR);

    // Each entry must be at least as large as the one before it.
    for (int j = 1; j < inputs.size(); ++j) {
      assertTrue("Inputs to be merged were not sorted according to size: ",
          inputs.get(j).getCompressedSize()
              >= inputs.get(j - 1).getCompressedSize());
    }
  }
}