@Override
public RecordReader<NullWritable, ColumnAndIndex> getRecordReader(
    final InputSplit split, final JobConf job, final Reporter reporter) throws IOException {
  FileSplit fileSplit = (FileSplit) split;
  Path path = fileSplit.getPath();
  FileSystem fs = path.getFileSystem(job);
  long fileLength = fs.getLength(path);
  long start = fileSplit.getStart();
  long length = fileSplit.getLength();
  InputStream in = fs.open(path);
  IJobReporter jobReporter = new HadoopJobReporter(reporter);
  jobReporter.setStatus(String.format("Read file : %s", path.toString()));
  HiveReaderSetting hiveConfig = new HiveReaderSetting(fileSplit, job);
  if (hiveConfig.isVectorMode()) {
    IVectorizedReaderSetting vectorizedSetting =
        new HiveVectorizedReaderSetting(fileSplit, job, hiveConfig);
    return (RecordReader) new MDSHiveDirectVectorizedReader(
        in, fileLength, start, length, vectorizedSetting, jobReporter);
  } else {
    return new MDSHiveLineReader(
        in, fileLength, start, length, hiveConfig, jobReporter, spreadCounter);
  }
}
@SuppressWarnings({"rawtypes", "unchecked"})
public void initReader() throws IOException {
  try {
    Configuration conf = WorkerContext.get().getConf();
    String inputFormatClassName =
        conf.get(AngelConf.ANGEL_INPUTFORMAT_CLASS, AngelConf.DEFAULT_ANGEL_INPUTFORMAT_CLASS);
    Class<? extends org.apache.hadoop.mapred.InputFormat> inputFormatClass =
        (Class<? extends org.apache.hadoop.mapred.InputFormat>) Class.forName(inputFormatClassName);
    org.apache.hadoop.mapred.InputFormat inputFormat =
        ReflectionUtils.newInstance(inputFormatClass, new JobConf(conf));
    org.apache.hadoop.mapred.RecordReader<KEY, VALUE> recordReader =
        inputFormat.getRecordReader(split, new JobConf(conf), Reporter.NULL);
    setReader(new DFSReaderOldAPI(recordReader));
  } catch (Exception x) {
    LOG.error("init reader error ", x);
    throw new IOException(x);
  }
}
/**
 * From each split sampled, take the first numSamples / numSplits records.
 */
@SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
public K[] getSample(InputFormat<K, V> inf, JobConf job) throws IOException {
  InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
  ArrayList<K> samples = new ArrayList<K>(numSamples);
  int splitsToSample = Math.min(maxSplitsSampled, splits.length);
  int splitStep = splits.length / splitsToSample;
  int samplesPerSplit = numSamples / splitsToSample;
  long records = 0;
  for (int i = 0; i < splitsToSample; ++i) {
    RecordReader<K, V> reader = inf.getRecordReader(splits[i * splitStep], job, Reporter.NULL);
    K key = reader.createKey();
    V value = reader.createValue();
    while (reader.next(key, value)) {
      samples.add(key);
      key = reader.createKey();
      ++records;
      if ((i + 1) * samplesPerSplit <= records) {
        break;
      }
    }
    reader.close();
  }
  return (K[]) samples.toArray();
}
/**
 * For each split sampled, emit when the ratio of the number of records
 * retained to the total record count is less than the specified
 * frequency.
 */
@SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
public K[] getSample(InputFormat<K, V> inf, JobConf job) throws IOException {
  InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
  ArrayList<K> samples = new ArrayList<K>();
  int splitsToSample = Math.min(maxSplitsSampled, splits.length);
  int splitStep = splits.length / splitsToSample;
  long records = 0;
  long kept = 0;
  for (int i = 0; i < splitsToSample; ++i) {
    RecordReader<K, V> reader = inf.getRecordReader(splits[i * splitStep], job, Reporter.NULL);
    K key = reader.createKey();
    V value = reader.createValue();
    while (reader.next(key, value)) {
      ++records;
      if ((double) kept / records < freq) {
        ++kept;
        samples.add(key);
        key = reader.createKey();
      }
    }
    reader.close();
  }
  return (K[]) samples.toArray();
}
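The two samplers above follow the shape of the old-API InputSampler samplers. As a minimal sketch of how such a sampler might be driven, assuming it is (or mirrors) org.apache.hadoop.mapred.lib.InputSampler.SplitSampler; the class name, constructor arguments, and input path below are illustrative assumptions, not taken from the snippets:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.lib.InputSampler;

public class SamplerDriver {
  public static void main(String[] args) throws Exception {
    JobConf job = new JobConf();
    job.setNumMapTasks(4); // getSample() uses this hint when asking for splits
    FileInputFormat.setInputPaths(job, new Path(args[0]));

    TextInputFormat inputFormat = new TextInputFormat();
    inputFormat.configure(job);

    // Assumed constructor: keep up to 100 keys, reading at most 10 splits.
    InputSampler.SplitSampler<LongWritable, Text> sampler =
        new InputSampler.SplitSampler<LongWritable, Text>(100, 10);

    // getSample() returns an Object[] at runtime despite the K[] signature.
    Object[] sample = sampler.getSample(inputFormat, job);
    System.out.println("sampled " + sample.length + " keys");
  }
}

In a real job the sampled keys are typically written out as partition boundaries (for example via InputSampler.writePartitionFile together with a TotalOrderPartitioner) rather than printed.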
/**
 * test DBInputFormat class. Class should split result for chunks
 * @throws Exception
 */
@Test(timeout = 10000)
public void testDBInputFormat() throws Exception {
  JobConf configuration = new JobConf();
  setupDriver(configuration);

  DBInputFormat<NullDBWritable> format = new DBInputFormat<NullDBWritable>();
  format.setConf(configuration);
  format.setConf(configuration);
  DBInputFormat.DBInputSplit splitter = new DBInputFormat.DBInputSplit(1, 10);
  Reporter reporter = mock(Reporter.class);
  RecordReader<LongWritable, NullDBWritable> reader =
      format.getRecordReader(splitter, configuration, reporter);

  configuration.setInt(MRJobConfig.NUM_MAPS, 3);
  InputSplit[] lSplits = format.getSplits(configuration, 3);
  assertEquals(5, lSplits[0].getLength());
  assertEquals(3, lSplits.length);

  // test reader. Some simple tests
  assertEquals(LongWritable.class, reader.createKey().getClass());
  assertEquals(0, reader.getPos());
  assertEquals(0, reader.getProgress(), 0.001);
  reader.close();
}
public RecordReader getRecordReader(InputSplit split, JobConf job, Reporter reporter)
    throws IOException {
  FileSplit fileSplit = (FileSplit) split;
  FileSystem fs = FileSystem.get(fileSplit.getPath().toUri(), job);
  FSDataInputStream is = fs.open(fileSplit.getPath());
  byte[] header = new byte[3];
  try {
    is.readFully(header);
  } catch (EOFException eof) {
    // File is shorter than the magic number: treat it as plain text.
    return textInputFormat.getRecordReader(split, job, reporter);
  } finally {
    is.close();
  }
  // SequenceFiles start with the magic bytes 'S', 'E', 'Q'.
  if (header[0] == 'S' && header[1] == 'E' && header[2] == 'Q') {
    return seqFileInputFormat.getRecordReader(split, job, reporter);
  }
  return textInputFormat.getRecordReader(split, job, reporter);
}
@Test
public void readEthereumBlockInputFormatBlock1346406Bzip2Compressed()
    throws IOException, EthereumBlockReadException, ParseException, InterruptedException {
  JobConf job = new JobConf(defaultConf);
  ClassLoader classLoader = getClass().getClassLoader();
  String fileName = "eth1346406.bin.bz2";
  String fileNameBlock = classLoader.getResource("testdata/" + fileName).getFile();
  Path file = new Path(fileNameBlock);
  FileInputFormat.setInputPaths(job, file);
  EthereumBlockFileInputFormat format = new EthereumBlockFileInputFormat();
  format.configure(job);
  InputSplit[] inputSplits = format.getSplits(job, 1);
  assertEquals(1, inputSplits.length, "Only one split generated for block 1346406");
  RecordReader<BytesWritable, EthereumBlock> reader =
      format.getRecordReader(inputSplits[0], job, reporter);
  assertNotNull(reader, "Format returned null RecordReader");
  BytesWritable key = new BytesWritable();
  EthereumBlock block = new EthereumBlock();
  assertTrue(reader.next(key, block), "Input Split for block 1346406 contains at least one block");
  assertEquals(6, block.getEthereumTransactions().size(), "Block 1346406 must have 6 transactions");
  assertFalse(reader.next(key, block), "No further blocks in block 1346406");
  reader.close();
}
@Test
public void readExcelInputFormatExcel2013SingleSheetEncryptedNegativeLowFootprint() throws IOException {
  JobConf job = new JobConf(defaultConf);
  ClassLoader classLoader = getClass().getClassLoader();
  String fileName = "excel2013encrypt.xlsx";
  String fileNameSpreadSheet = classLoader.getResource(fileName).getFile();
  Path file = new Path(fileNameSpreadSheet);
  FileInputFormat.setInputPaths(job, file);
  // set locale to the one of the test data
  job.set("hadoopoffice.read.locale.bcp47", "de");
  // low footprint
  job.set("hadoopoffice.read.lowFootprint", "true");
  // for decryption simply set the password
  job.set("hadoopoffice.read.security.crypt.password", "test2");
  ExcelFileInputFormat format = new ExcelFileInputFormat();
  format.configure(job);
  InputSplit[] inputSplits = format.getSplits(job, 1);
  assertEquals(1, inputSplits.length, "Only one split generated for Excel file");
  RecordReader<Text, ArrayWritable> reader = format.getRecordReader(inputSplits[0], job, reporter);
  assertNull(reader, "Null record reader implies invalid password");
}
/**
 * Initializes the next reader if available; closes the previous reader if any.
 *
 * @param job map / reduce job configuration.
 * @return true if a new reader was initialized, false if no more readers are available
 * @throws ExecutionSetupException if the record reader could not be initialized
 */
protected boolean initNextReader(JobConf job) throws ExecutionSetupException {
  if (inputSplitsIterator.hasNext()) {
    if (reader != null) {
      closeReader();
    }
    InputSplit inputSplit = inputSplitsIterator.next();
    try {
      reader = (org.apache.hadoop.mapred.RecordReader<Object, Object>)
          job.getInputFormat().getRecordReader(inputSplit, job, Reporter.NULL);
      logger.trace("hive reader created: {} for inputSplit {}",
          reader.getClass().getName(), inputSplit.toString());
    } catch (Exception e) {
      throw new ExecutionSetupException(
          "Failed to get o.a.hadoop.mapred.RecordReader from Hive InputFormat", e);
    }
    return true;
  }
  return false;
}
static RecordReader<NullWritable, DynamoDBItemWritable> getRecordReader(
    InputSplit inputSplit, JobConf job, Reporter reporter) throws IOException {
  // CombineFileSplit indicates the new export format which includes a manifest file
  if (inputSplit instanceof CombineFileSplit) {
    int version = job.getInt(DynamoDBConstants.EXPORT_FORMAT_VERSION, -1);
    if (version != ExportManifestRecordWriter.FORMAT_VERSION) {
      throw new IOException("Unknown version: " + job.get(DynamoDBConstants.EXPORT_FORMAT_VERSION));
    }
    return new ImportCombineFileRecordReader((CombineFileSplit) inputSplit, job, reporter);
  } else if (inputSplit instanceof FileSplit) {
    // FileSplit indicates the old data pipeline format which doesn't include a manifest file
    Path path = ((FileSplit) inputSplit).getPath();
    return new ImportRecordReader(job, path);
  } else {
    throw new IOException("Expecting CombineFileSplit or FileSplit but the input split type is: "
        + inputSplit.getClass());
  }
}
@Test
public void readEthereumBlockInputFormatBlock1346406()
    throws IOException, EthereumBlockReadException, ParseException, InterruptedException {
  JobConf job = new JobConf(defaultConf);
  ClassLoader classLoader = getClass().getClassLoader();
  String fileName = "eth1346406.bin";
  String fileNameBlock = classLoader.getResource("testdata/" + fileName).getFile();
  Path file = new Path(fileNameBlock);
  FileInputFormat.setInputPaths(job, file);
  EthereumBlockFileInputFormat format = new EthereumBlockFileInputFormat();
  format.configure(job);
  InputSplit[] inputSplits = format.getSplits(job, 1);
  assertEquals(1, inputSplits.length, "Only one split generated for block 1346406");
  RecordReader<BytesWritable, EthereumBlock> reader =
      format.getRecordReader(inputSplits[0], job, reporter);
  assertNotNull(reader, "Format returned null RecordReader");
  BytesWritable key = new BytesWritable();
  EthereumBlock block = new EthereumBlock();
  assertTrue(reader.next(key, block), "Input Split for block 1346406 contains at least one block");
  assertEquals(6, block.getEthereumTransactions().size(), "Block 1346406 must have 6 transactions");
  assertFalse(reader.next(key, block), "No further blocks in block 1346406");
  reader.close();
}
@Test
public void readExcelInputFormatExcel2003SingleSheetEncryptedNegativeLowFootprint() throws IOException {
  JobConf job = new JobConf(defaultConf);
  ClassLoader classLoader = getClass().getClassLoader();
  String fileName = "excel2003encrypt.xls";
  String fileNameSpreadSheet = classLoader.getResource(fileName).getFile();
  Path file = new Path(fileNameSpreadSheet);
  FileInputFormat.setInputPaths(job, file);
  // set locale to the one of the test data
  job.set("hadoopoffice.read.locale.bcp47", "de");
  // low footprint
  job.set("hadoopoffice.read.lowFootprint", "true");
  // for decryption simply set the password
  job.set("hadoopoffice.read.security.crypt.password", "test2");
  ExcelFileInputFormat format = new ExcelFileInputFormat();
  format.configure(job);
  InputSplit[] inputSplits = format.getSplits(job, 1);
  assertEquals(1, inputSplits.length, "Only one split generated for Excel file");
  RecordReader<Text, ArrayWritable> reader = format.getRecordReader(inputSplits[0], job, reporter);
  assertNull(reader, "Null record reader implies invalid password");
}
@Test
public void readExcelInputFormatExcel2003Empty() throws IOException {
  JobConf job = new JobConf(defaultConf);
  ClassLoader classLoader = getClass().getClassLoader();
  String fileName = "excel2003empty.xls";
  String fileNameSpreadSheet = classLoader.getResource(fileName).getFile();
  Path file = new Path(fileNameSpreadSheet);
  FileInputFormat.setInputPaths(job, file);
  // set locale to the one of the test data
  job.set("hadoopoffice.locale.bcp47", "de");
  ExcelFileInputFormat format = new ExcelFileInputFormat();
  format.configure(job);
  InputSplit[] inputSplits = format.getSplits(job, 1);
  assertEquals(1, inputSplits.length, "Only one split generated for Excel file");
  RecordReader<Text, ArrayWritable> reader = format.getRecordReader(inputSplits[0], job, reporter);
  assertNotNull(reader, "Format returned null RecordReader");
  Text spreadSheetKey = new Text();
  ArrayWritable spreadSheetValue = new ArrayWritable(SpreadSheetCellDAO.class);
  assertTrue(reader.next(spreadSheetKey, spreadSheetValue), "Input Split for Excel file contains row 1");
  assertEquals(0, spreadSheetValue.get().length, "Input Split for Excel file contains row 1 and it is empty");
  assertFalse(reader.next(spreadSheetKey, spreadSheetValue), "Input Split for Excel file contains no further row");
}
@Test
public void readExcelInputFormatExcel2013Empty() throws IOException {
  JobConf job = new JobConf(defaultConf);
  ClassLoader classLoader = getClass().getClassLoader();
  String fileName = "excel2013empty.xlsx";
  String fileNameSpreadSheet = classLoader.getResource(fileName).getFile();
  Path file = new Path(fileNameSpreadSheet);
  FileInputFormat.setInputPaths(job, file);
  // set locale to the one of the test data
  job.set("hadoopoffice.read.locale.bcp47", "de");
  ExcelFileInputFormat format = new ExcelFileInputFormat();
  format.configure(job);
  InputSplit[] inputSplits = format.getSplits(job, 1);
  assertEquals(1, inputSplits.length, "Only one split generated for Excel file");
  RecordReader<Text, ArrayWritable> reader = format.getRecordReader(inputSplits[0], job, reporter);
  assertNotNull(reader, "Format returned null RecordReader");
  Text spreadSheetKey = new Text();
  ArrayWritable spreadSheetValue = new ArrayWritable(SpreadSheetCellDAO.class);
  assertTrue(reader.next(spreadSheetKey, spreadSheetValue), "Input Split for Excel file contains row 1");
  assertEquals(0, spreadSheetValue.get().length, "Input Split for Excel file contains row 1 and it is empty");
  assertFalse(reader.next(spreadSheetKey, spreadSheetValue), "Input Split for Excel file contains no further row");
}
@Test
public void readEthereumBlockInputFormatBlock3346406()
    throws IOException, EthereumBlockReadException, ParseException, InterruptedException {
  JobConf job = new JobConf(defaultConf);
  ClassLoader classLoader = getClass().getClassLoader();
  String fileName = "eth3346406.bin";
  String fileNameBlock = classLoader.getResource("testdata/" + fileName).getFile();
  Path file = new Path(fileNameBlock);
  FileInputFormat.setInputPaths(job, file);
  EthereumBlockFileInputFormat format = new EthereumBlockFileInputFormat();
  format.configure(job);
  InputSplit[] inputSplits = format.getSplits(job, 1);
  assertEquals(1, inputSplits.length, "Only one split generated for block 3346406");
  RecordReader<BytesWritable, EthereumBlock> reader =
      format.getRecordReader(inputSplits[0], job, reporter);
  assertNotNull(reader, "Format returned null RecordReader");
  BytesWritable key = new BytesWritable();
  EthereumBlock block = new EthereumBlock();
  assertTrue(reader.next(key, block), "Input Split for block 3346406 contains at least one block");
  assertEquals(7, block.getEthereumTransactions().size(), "Block 3346406 must have 7 transactions");
  assertFalse(reader.next(key, block), "No further blocks in block 3346406");
  reader.close();
}
public RecordReader<Object, Object> getRecordReader(InputSplit split, JobConf job,
    Reporter reporter) throws IOException {
  return new RecordReader<Object, Object>() {
    boolean once = false;

    public boolean next(Object key, Object value) throws IOException {
      if (!once) {
        once = true;
        return true;
      }
      return false;
    }

    public Object createKey() {
      return new Object();
    }

    public Object createValue() {
      return new Object();
    }

    public long getPos() throws IOException {
      return 0L;
    }

    public void close() throws IOException {
    }

    public float getProgress() throws IOException {
      return 0.0f;
    }
  };
}
public RecordReader<LongWritable, Text> getRecordReader(InputSplit ignored, JobConf conf,
    Reporter reporter) throws IOException {
  return new RecordReader<LongWritable, Text>() {
    boolean sentOneRecord = false;

    public boolean next(LongWritable key, Text value) throws IOException {
      key.set(1);
      value.set("dummy");
      if (sentOneRecord == false) { // first call
        sentOneRecord = true;
        return true;
      }
      return false; // we have sent one record - we are done
    }

    public LongWritable createKey() {
      return new LongWritable();
    }

    public Text createValue() {
      return new Text();
    }

    public long getPos() throws IOException {
      return 1;
    }

    public void close() throws IOException {
    }

    public float getProgress() throws IOException {
      return 1;
    }
  };
}
public RecordReader<K, V> getRecordReader(InputSplit ignored, JobConf conf, Reporter reporter) {
  return new RecordReader<K, V>() {
    public boolean next(K key, V value) throws IOException {
      return false;
    }

    public K createKey() {
      return ReflectionUtils.newInstance(keyclass, null);
    }

    public V createValue() {
      return ReflectionUtils.newInstance(valclass, null);
    }

    public long getPos() throws IOException {
      return 0L;
    }

    public void close() throws IOException {
    }

    public float getProgress() throws IOException {
      return 0.0f;
    }
  };
}
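The stub readers above all satisfy the same old-API contract that callers drive through createKey/createValue/next/close. As a self-contained sketch of that consumption loop (the helper class below is illustrative, not part of the snippets):

import java.io.IOException;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

public class RecordReaderDriver {
  /** Reads every record of every split and returns the record count. */
  public static <K, V> long readAll(InputFormat<K, V> inputFormat, JobConf job)
      throws IOException {
    long records = 0;
    for (InputSplit split : inputFormat.getSplits(job, job.getNumMapTasks())) {
      RecordReader<K, V> reader = inputFormat.getRecordReader(split, job, Reporter.NULL);
      try {
        K key = reader.createKey();
        V value = reader.createValue();
        // next() reuses the same key/value objects, so copy them if they must be retained.
        while (reader.next(key, value)) {
          records++;
        }
      } finally {
        reader.close();
      }
    }
    return records;
  }
}

This is essentially the loop a MapRunner performs before handing each key/value pair to the mapper.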
/**
 * Create a handler that will handle any records output from the application.
 * @param collector the "real" collector that takes the output
 * @param reporter the reporter for reporting progress
 */
public OutputHandler(OutputCollector<K, V> collector, Reporter reporter,
    RecordReader<FloatWritable, NullWritable> recordReader, String expectedDigest) {
  this.reporter = reporter;
  this.collector = collector;
  this.recordReader = recordReader;
  this.expectedDigest = expectedDigest;
}
/** {@inheritDoc} */
@SuppressWarnings("unchecked") // Explicit check for value class agreement
public V createValue() {
  if (null == valueclass) {
    final Class<?> cls = kids[0].createValue().getClass();
    for (RecordReader<K, ? extends V> rr : kids) {
      if (!cls.equals(rr.createValue().getClass())) {
        throw new ClassCastException("Child value classes fail to agree");
      }
    }
    valueclass = cls.asSubclass(Writable.class);
    ivalue = createInternalValue();
  }
  return (V) ReflectionUtils.newInstance(valueclass, null);
}
/**
 * Create a new key value common to all child RRs.
 * @throws ClassCastException if key classes differ.
 */
@SuppressWarnings("unchecked") // Explicit check for key class agreement
public K createKey() {
  if (null == keyclass) {
    final Class<?> cls = kids[0].createKey().getClass();
    for (RecordReader<K, ? extends Writable> rr : kids) {
      if (!cls.equals(rr.createKey().getClass())) {
        throw new ClassCastException("Child key classes fail to agree");
      }
    }
    keyclass = cls.asSubclass(WritableComparable.class);
  }
  return (K) ReflectionUtils.newInstance(keyclass, getConf());
}
/**
 * Close all child RRs.
 */
public void close() throws IOException {
  if (kids != null) {
    for (RecordReader<K, ? extends Writable> rr : kids) {
      rr.close();
    }
  }
  if (jc != null) {
    jc.close();
  }
}
/**
 * Report progress as the minimum of all child RR progress.
 */
public float getProgress() throws IOException {
  float ret = 1.0f;
  for (RecordReader<K, ? extends Writable> rr : kids) {
    ret = Math.min(ret, rr.getProgress());
  }
  return ret;
}
public RecordReader<LongWritable, Text> getRecordReader(InputSplit genericSplit, JobConf job,
    Reporter reporter) throws IOException {
  reporter.setStatus(genericSplit.toString());
  return new LineRecordReader(job, (FileSplit) genericSplit);
}
@SuppressWarnings("unchecked")
public RecordReader<K, V> getRecordReader(InputSplit split, JobConf conf,
    Reporter reporter) throws IOException {
  // Find the InputFormat and then the RecordReader from the
  // TaggedInputSplit.
  TaggedInputSplit taggedInputSplit = (TaggedInputSplit) split;
  InputFormat<K, V> inputFormat = (InputFormat<K, V>) ReflectionUtils
      .newInstance(taggedInputSplit.getInputFormatClass(), conf);
  return inputFormat.getRecordReader(taggedInputSplit.getInputSplit(), conf, reporter);
}
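The delegation above (recovering the real InputFormat from a TaggedInputSplit) is the mechanism behind the old-API MultipleInputs/DelegatingInputFormat pairing. Assuming that context, a typical job configuration that produces tagged splits might look like the sketch below; the paths are placeholders:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.KeyValueTextInputFormat;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.lib.MultipleInputs;

public class MultipleInputsConfig {
  public static void configure(JobConf job) {
    // Each path is registered with its own InputFormat; the resulting splits are
    // tagged with that format so the delegating record reader can recreate it.
    MultipleInputs.addInputPath(job, new Path("/data/text"), KeyValueTextInputFormat.class);
    MultipleInputs.addInputPath(job, new Path("/data/seq"), SequenceFileInputFormat.class);
  }
}

When such a job runs, each underlying split is wrapped in a TaggedInputSplit, which is exactly what the method above unwraps before delegating.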
/** {@inheritDoc} */
public RecordReader<LongWritable, T> getRecordReader(InputSplit split, JobConf job,
    Reporter reporter) throws IOException {
  // wrap the DBRR in a shim class to deal with API differences.
  return new DBRecordReaderWrapper<T>(
      (org.apache.hadoop.mapreduce.lib.db.DBRecordReader<T>) createDBRecordReader(
          (org.apache.hadoop.mapreduce.lib.db.DBInputFormat.DBInputSplit) split, job));
}
public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,
    Reporter reporter) throws IOException {
  PipeMapper pipeMapper = (PipeMapper) getMapper();
  pipeMapper.startOutputThreads(output, reporter);
  super.run(input, output, reporter);
}
/**
 * Dump given list of files to standard output as typed bytes.
 */
@SuppressWarnings("unchecked")
private int dumpTypedBytes(List<FileStatus> files) throws IOException {
  JobConf job = new JobConf(getConf());
  DataOutputStream dout = new DataOutputStream(System.out);
  AutoInputFormat autoInputFormat = new AutoInputFormat();
  for (FileStatus fileStatus : files) {
    FileSplit split = new FileSplit(fileStatus.getPath(), 0,
        fileStatus.getLen() * fileStatus.getBlockSize(), (String[]) null);
    RecordReader recReader = null;
    try {
      recReader = autoInputFormat.getRecordReader(split, job, Reporter.NULL);
      Object key = recReader.createKey();
      Object value = recReader.createValue();
      while (recReader.next(key, value)) {
        if (key instanceof Writable) {
          TypedBytesWritableOutput.get(dout).write((Writable) key);
        } else {
          TypedBytesOutput.get(dout).write(key);
        }
        if (value instanceof Writable) {
          TypedBytesWritableOutput.get(dout).write((Writable) value);
        } else {
          TypedBytesOutput.get(dout).write(value);
        }
      }
    } finally {
      if (recReader != null) {
        recReader.close();
      }
    }
  }
  dout.flush();
  return 0;
}
protected SkipRecordsInspector(Properties tableProperties, RecordReader reader) {
  this.fileFormats = new HashSet<Object>(
      Arrays.asList(org.apache.hadoop.mapred.TextInputFormat.class.getName()));
  this.headerCount = retrievePositiveIntProperty(tableProperties, serdeConstants.HEADER_COUNT, 0);
  this.footerCount = retrievePositiveIntProperty(tableProperties, serdeConstants.FOOTER_COUNT, 0);
  logger.debug("skipRecordInspector: fileFormat {}, headerCount {}, footerCount {}",
      this.fileFormats, this.headerCount, this.footerCount);
  this.footerBuffer = Lists.newLinkedList();
  this.continuance = false;
  this.holderIndex = -1;
  this.valueHolder = initializeValueHolder(reader, footerCount);
  this.actualCount = 0;
  this.tempCount = 0;
}
@Override
public int populateData() throws IOException, SerDeException {
  final RecordReader<Object, Object> reader = this.reader;
  final Converter partTblObjectInspectorConverter = this.partTblObjectInspectorConverter;
  final int numRowsPerBatch = (int) this.numRowsPerBatch;
  final StructField[] selectedStructFieldRefs = this.selectedStructFieldRefs;
  final SerDe partitionSerDe = this.partitionSerDe;
  final StructObjectInspector finalOI = this.finalOI;
  final ObjectInspector[] selectedColumnObjInspectors = this.selectedColumnObjInspectors;
  final HiveFieldConverter[] selectedColumnFieldConverters = this.selectedColumnFieldConverters;
  final ValueVector[] vectors = this.vectors;
  final Object key = this.key;
  final Object value = this.value;

  int recordCount = 0;
  while (recordCount < numRowsPerBatch && reader.next(key, value)) {
    Object deSerializedValue = partitionSerDe.deserialize((Writable) value);
    if (partTblObjectInspectorConverter != null) {
      deSerializedValue = partTblObjectInspectorConverter.convert(deSerializedValue);
    }
    for (int i = 0; i < selectedStructFieldRefs.length; i++) {
      Object hiveValue = finalOI.getStructFieldData(deSerializedValue, selectedStructFieldRefs[i]);
      if (hiveValue != null) {
        selectedColumnFieldConverters[i].setSafeValue(
            selectedColumnObjInspectors[i], hiveValue, vectors[i], recordCount);
      }
    }
    recordCount++;
  }
  return recordCount;
}
private void verifyWithMockedMapReduce(JobConf job, int numRegions, int expectedNumSplits,
    byte[] startRow, byte[] stopRow) throws IOException, InterruptedException {
  TableSnapshotInputFormat tsif = new TableSnapshotInputFormat();
  InputSplit[] splits = tsif.getSplits(job, 0);

  Assert.assertEquals(expectedNumSplits, splits.length);

  HBaseTestingUtility.SeenRowTracker rowTracker =
      new HBaseTestingUtility.SeenRowTracker(startRow, stopRow);

  for (int i = 0; i < splits.length; i++) {
    // validate input split
    InputSplit split = splits[i];
    Assert.assertTrue(split instanceof TableSnapshotInputFormat.TableSnapshotRegionSplit);

    // validate record reader
    OutputCollector collector = mock(OutputCollector.class);
    Reporter reporter = mock(Reporter.class);
    RecordReader<ImmutableBytesWritable, Result> rr = tsif.getRecordReader(split, job, reporter);

    // validate we can read all the data back
    ImmutableBytesWritable key = rr.createKey();
    Result value = rr.createValue();
    while (rr.next(key, value)) {
      verifyRowFromMap(key, value);
      rowTracker.addRow(key.copyBytes());
    }
    rr.close();
  }

  // validate all rows are seen
  rowTracker.validate();
}