/**
 * Sorts the collected record pointers using {@code MergeSort} and wraps the
 * outcome in an iterator. This overrides the sort method inherited from
 * BasicTypeSorterBase.
 *
 * @return an {@code MRSortResultIterator} built over the key/value buffer and
 *         the pointer array handed to the merge sort as its second argument,
 *         or {@code null} when no records have been collected
 */
public RawKeyValueIterator sort() {
  MergeSort sorter = new MergeSort(this);
  final int numRecords = super.count;
  if (numRecords == 0) {
    return null;
  }

  // Duplicate the first numRecords pointers; the merge sort is given both
  // the original array and this duplicate, and the duplicate is what the
  // result iterator walks.
  int[] original = super.pointers;
  int[] duplicate = new int[numRecords];
  System.arraycopy(original, 0, duplicate, 0, numRecords);
  sorter.mergeSort(original, duplicate, 0, numRecords);

  return new MRSortResultIterator(
      super.keyValBuffer,
      duplicate,
      super.startOffsets,
      super.keyLengths,
      super.valueLengths);
}
/**
 * Writes RECORDS identical key/value pairs to a local sequence file with the
 * given compression type, merges that file through {@code SequenceFile.Sorter},
 * and verifies that the merge iterator yields every record and reports a
 * progress of 1.0 once it is exhausted.
 *
 * @param compressionType compression type used when writing the sequence file
 * @throws IOException if a file system or sequence file operation fails
 */
public void runTest(CompressionType compressionType) throws IOException {
  JobConf job = new JobConf();
  FileSystem fs = FileSystem.getLocal(job);
  Path dir = new Path(System.getProperty("test.build.data", ".") + "/mapred");
  Path file = new Path(dir, "test.seq");
  Path tempDir = new Path(dir, "tmp");

  // Start from a clean directory so leftovers from earlier runs cannot
  // influence the record count.
  fs.delete(dir, true);
  FileInputFormat.setInputPaths(job, dir);
  fs.mkdirs(tempDir);

  LongWritable tkey = new LongWritable();
  Text tval = new Text();

  SequenceFile.Writer writer =
      SequenceFile.createWriter(fs, job, file, LongWritable.class, Text.class,
          compressionType, new DefaultCodec());
  try {
    for (int i = 0; i < RECORDS; ++i) {
      tkey.set(1234);
      tval.set("valuevaluevaluevaluevaluevaluevaluevaluevaluevaluevalue");
      writer.append(tkey, tval);
    }
  } finally {
    writer.close();
  }

  long fileLength = fs.getFileStatus(file).getLen();
  LOG.info("With compression = " + compressionType + ": "
      + "compressed length = " + fileLength);

  SequenceFile.Sorter sorter =
      new SequenceFile.Sorter(fs, job.getOutputKeyComparator(),
          job.getMapOutputKeyClass(), job.getMapOutputValueClass(), job);
  Path[] paths = new Path[] {file};
  RawKeyValueIterator rIter = sorter.merge(paths, tempDir, false);

  int count = 0;
  while (rIter.next()) {
    count++;
  }
  assertEquals(RECORDS, count);
  // Floating-point comparison must use the delta overload: without it the
  // call resolves (under JUnit 4) to the deprecated two-argument double
  // overload, which fails unconditionally. A zero delta keeps the check exact.
  assertEquals(1.0f, rIter.getProgress().get(), 0.0f);
}
/**
 * Round-trips a sequence file through the merge path of
 * {@code SequenceFile.Sorter}: RECORDS copies of one key/value pair are
 * written with the requested compression type, the file is merged, and the
 * resulting iterator is checked for the record count and a final progress
 * value of 1.0.
 *
 * @param compressionType compression type used when writing the sequence file
 * @throws IOException if a file system or sequence file operation fails
 */
public void runTest(CompressionType compressionType) throws IOException {
  JobConf conf = new JobConf();
  FileSystem localFs = FileSystem.getLocal(conf);

  Path baseDir = new Path(System.getProperty("test.build.data", ".") + "/mapred");
  Path seqFile = new Path(baseDir, "test.seq");
  Path mergeTmpDir = new Path(baseDir, "tmp");

  // Wipe the working directory before registering it as the job input and
  // recreating the temporary merge directory beneath it.
  localFs.delete(baseDir, true);
  FileInputFormat.setInputPaths(conf, baseDir);
  localFs.mkdirs(mergeTmpDir);

  LongWritable recordKey = new LongWritable();
  Text recordValue = new Text();

  SequenceFile.Writer out = SequenceFile.createWriter(
      localFs, conf, seqFile, LongWritable.class, Text.class,
      compressionType, new DefaultCodec());
  try {
    int written = 0;
    while (written < RECORDS) {
      recordKey.set(1234);
      recordValue.set("valuevaluevaluevaluevaluevaluevaluevaluevaluevaluevalue");
      out.append(recordKey, recordValue);
      ++written;
    }
  } finally {
    out.close();
  }

  long onDiskLength = localFs.getFileStatus(seqFile).getLen();
  LOG.info("With compression = " + compressionType + ": "
      + "compressed length = " + onDiskLength);

  SequenceFile.Sorter merger = new SequenceFile.Sorter(
      localFs,
      conf.getOutputKeyComparator(),
      conf.getMapOutputKeyClass(),
      conf.getMapOutputValueClass(),
      conf);
  RawKeyValueIterator merged =
      merger.merge(new Path[] {seqFile}, mergeTmpDir, false);

  // Drain the iterator, counting how many records it hands back.
  int seen = 0;
  while (merged.next()) {
    seen += 1;
  }

  assertEquals(RECORDS, seen);
  assertEquals(1.0f, merged.getProgress().get(), 0.0000);
}