private void initialize(Configuration job, long splitStart, long splitLength,
    Path file) throws IOException {
  start = splitStart;
  end = start + splitLength;
  pos = start;

  // Open the file and seek to the start of the split.
  final FileSystem fs = file.getFileSystem(job);
  fileIn = fs.open(file);
  this.readStats = new ReadStatistics();
  this.bufferPool = new ElasticByteBufferPool();

  // The buffer pool and read options are used by the zero-copy read API;
  // skipping checksum verification can be requested via job configuration.
  boolean skipChecksums = job.getBoolean("bytecount.skipChecksums", false);
  this.readOption = skipChecksums ?
      EnumSet.of(ReadOption.SKIP_CHECKSUMS) :
      EnumSet.noneOf(ReadOption.class);

  CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
  if (null != codec) {
    // Compressed input: read through the codec stream, and track the
    // position of the underlying compressed file rather than our own.
    isCompressedInput = true;
    decompressor = CodecPool.getDecompressor(codec);
    CompressionInputStream cIn =
        codec.createInputStream(fileIn, decompressor);
    filePosition = cIn;
    inputStream = cIn;
    LOG.info("Compressed input; cannot compute number of records in the split");
  } else {
    fileIn.seek(start);
    filePosition = fileIn;
    inputStream = fileIn;
    LOG.info("Split pos = " + start + " length " + splitLength);
  }
}
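// A hedged sketch (not part of the original class) of how a reader built on
// initialize() can report where it is in the split. getFilePosition() and
// getProgress() are assumed helpers modeled on Hadoop's LineRecordReader:
// for compressed input only the codec stream knows the real file offset.
private long getFilePosition() throws IOException {
  return (isCompressedInput && null != filePosition) ?
      filePosition.getPos() : pos;
}

private float getProgress() throws IOException {
  if (start == end) {
    return 0.0f;
  }
  // Fraction of the split consumed so far, clamped to [0, 1].
  return Math.min(1.0f,
      (getFilePosition() - start) / (float) (end - start));
}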
private void updateStats(ReadStatistics newStats) {
  // ReadStatistics values are cumulative over the life of the stream, so
  // each counter is incremented by the delta since the previous snapshot.
  context.getCounter(READ_COUNTER.BYTES_READ).increment(
      newStats.getTotalBytesRead() - readStats.getTotalBytesRead());
  context.getCounter(READ_COUNTER.LOCAL_BYTES_READ).increment(
      newStats.getTotalLocalBytesRead() - readStats.getTotalLocalBytesRead());
  context.getCounter(READ_COUNTER.SCR_BYTES_READ).increment(
      newStats.getTotalShortCircuitBytesRead() -
          readStats.getTotalShortCircuitBytesRead());
  context.getCounter(READ_COUNTER.ZCR_BYTES_READ).increment(
      newStats.getTotalZeroCopyBytesRead() -
          readStats.getTotalZeroCopyBytesRead());
  // Keep a copy as the baseline for the next update.
  this.readStats = new ReadStatistics(newStats);
}
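// A hedged usage sketch (assumed, not shown in this excerpt): since
// updateStats() expects cumulative statistics, a caller would periodically
// snapshot the underlying HDFS stream, e.g. once per record batch and once
// in close(). HdfsDataInputStream.getReadStatistics() is the real HDFS
// client API; refreshCounters() itself is a hypothetical helper.
private void refreshCounters() {
  if (fileIn instanceof HdfsDataInputStream) {
    // updateStats() turns the cumulative snapshot into per-counter deltas.
    updateStats(((HdfsDataInputStream) fileIn).getReadStatistics());
  }
}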
@Test
public void testExternalBlockReader() throws Exception {
  Configuration conf = new Configuration();
  conf.set(HdfsClientConfigKeys.REPLICA_ACCESSOR_BUILDER_CLASSES_KEY,
      SyntheticReplicaAccessorBuilder.class.getName());
  conf.setLong(HdfsClientConfigKeys.DFS_BLOCK_SIZE_KEY, 1024);
  conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0);
  String uuid = UUID.randomUUID().toString();
  conf.set(SYNTHETIC_BLOCK_READER_TEST_UUID_KEY, uuid);
  MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
      .numDataNodes(1)
      .build();
  final int TEST_LENGTH = 2047;
  DistributedFileSystem dfs = cluster.getFileSystem();
  try {
    DFSTestUtil.createFile(dfs, new Path("/a"), TEST_LENGTH, (short)1, SEED);
    HdfsDataInputStream stream =
        (HdfsDataInputStream)dfs.open(new Path("/a"));
    byte[] buf = new byte[TEST_LENGTH];
    stream.seek(1000);
    IOUtils.readFully(stream, buf, 1000, TEST_LENGTH - 1000);
    stream.seek(0);
    IOUtils.readFully(stream, buf, 0, 1000);
    byte[] expected = DFSTestUtil.
        calculateFileContentsFromSeed(SEED, TEST_LENGTH);
    ReadStatistics stats = stream.getReadStatistics();
    Assert.assertEquals(1024, stats.getTotalShortCircuitBytesRead());
    Assert.assertEquals(2047, stats.getTotalLocalBytesRead());
    Assert.assertEquals(2047, stats.getTotalBytesRead());
    Assert.assertArrayEquals(expected, buf);
    stream.close();
    ExtendedBlock block = DFSTestUtil.getFirstBlock(dfs, new Path("/a"));
    Assert.assertNotNull(block);
    LinkedList<SyntheticReplicaAccessor> accessorList = accessors.get(uuid);
    Assert.assertNotNull(accessorList);
    Assert.assertEquals(3, accessorList.size());
    SyntheticReplicaAccessor accessor = accessorList.get(0);
    Assert.assertTrue(accessor.builder.allowShortCircuit);
    Assert.assertEquals(block.getBlockPoolId(),
        accessor.builder.blockPoolId);
    Assert.assertEquals(block.getBlockId(),
        accessor.builder.blockId);
    Assert.assertEquals(dfs.getClient().clientName,
        accessor.builder.clientName);
    Assert.assertEquals("/a", accessor.builder.fileName);
    Assert.assertEquals(block.getGenerationStamp(),
        accessor.getGenerationStamp());
    Assert.assertTrue(accessor.builder.verifyChecksum);
    Assert.assertEquals(1024L, accessor.builder.visibleLength);
    Assert.assertEquals(24L, accessor.totalRead);
    Assert.assertEquals("", accessor.getError());
    Assert.assertEquals(1, accessor.numCloses);
    byte[] tempBuf = new byte[5];
    Assert.assertEquals(-1, accessor.read(TEST_LENGTH,
        tempBuf, 0, 0));
    Assert.assertEquals(-1, accessor.read(TEST_LENGTH,
        tempBuf, 0, tempBuf.length));
    accessors.remove(uuid);
  } finally {
    dfs.close();
    cluster.shutdown();
  }
}
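// The SyntheticReplicaAccessor and its builder are defined elsewhere in the
// test class. Below is a hedged reconstruction of the accessor's read
// contract, inferred only from the assertions above: read() is positional,
// a read starting at or beyond the replica's visible length returns -1
// (EOF), and bytes actually served are tallied in totalRead. contentAt() is
// a hypothetical helper standing in for the seed-derived data that
// DFSTestUtil.calculateFileContentsFromSeed() expects.
public synchronized int read(long pos, byte[] buf, int off, int len) {
  if (pos >= builder.visibleLength) {
    return -1;  // past EOF, matching the two -1 assertions above
  }
  int nread = (int) Math.min((long) len, builder.visibleLength - pos);
  for (int i = 0; i < nread; i++) {
    buf[off + i] = contentAt(pos + i);  // hypothetical seed-derived byte
  }
  totalRead += nread;
  return nread;
}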