@Test(timeout=60000) public void testDataXceiverCleansUpSlotsOnFailure() throws Exception { BlockReaderTestUtil.enableShortCircuitShmTracing(); TemporarySocketDirectory sockDir = new TemporarySocketDirectory(); Configuration conf = createShortCircuitConf( "testDataXceiverCleansUpSlotsOnFailure", sockDir); conf.setLong(DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_KEY, 1000000000L); MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); cluster.waitActive(); DistributedFileSystem fs = cluster.getFileSystem(); final Path TEST_PATH1 = new Path("/test_file1"); final Path TEST_PATH2 = new Path("/test_file2"); final int TEST_FILE_LEN = 4096; final int SEED = 0xFADE1; DFSTestUtil.createFile(fs, TEST_PATH1, TEST_FILE_LEN, (short)1, SEED); DFSTestUtil.createFile(fs, TEST_PATH2, TEST_FILE_LEN, (short)1, SEED); // The first read should allocate one shared memory segment and slot. DFSTestUtil.readFileBuffer(fs, TEST_PATH1); // The second read should fail, and we should only have 1 segment and 1 slot // left. fs.getClient().getConf().brfFailureInjector = new TestCleanupFailureInjector(); try { DFSTestUtil.readFileBuffer(fs, TEST_PATH2); } catch (Throwable t) { GenericTestUtils.assertExceptionContains("TCP reads were disabled for " + "testing, but we failed to do a non-TCP read.", t); } checkNumberOfSegmentsAndSlots(1, 1, cluster.getDataNodes().get(0).getShortCircuitRegistry()); cluster.shutdown(); sockDir.close(); }
@Test(timeout=60000) public void testPreReceiptVerificationDfsClientCanDoScr() throws Exception { BlockReaderTestUtil.enableShortCircuitShmTracing(); TemporarySocketDirectory sockDir = new TemporarySocketDirectory(); Configuration conf = createShortCircuitConf( "testPreReceiptVerificationDfsClientCanDoScr", sockDir); conf.setLong(DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_KEY, 1000000000L); MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); cluster.waitActive(); DistributedFileSystem fs = cluster.getFileSystem(); fs.getClient().getConf().brfFailureInjector = new TestPreReceiptVerificationFailureInjector(); final Path TEST_PATH1 = new Path("/test_file1"); DFSTestUtil.createFile(fs, TEST_PATH1, 4096, (short)1, 0xFADE2); final Path TEST_PATH2 = new Path("/test_file2"); DFSTestUtil.createFile(fs, TEST_PATH2, 4096, (short)1, 0xFADE2); DFSTestUtil.readFileBuffer(fs, TEST_PATH1); DFSTestUtil.readFileBuffer(fs, TEST_PATH2); ShortCircuitRegistry registry = cluster.getDataNodes().get(0).getShortCircuitRegistry(); registry.visit(new ShortCircuitRegistry.Visitor() { @Override public void accept(HashMap<ShmId, RegisteredShm> segments, HashMultimap<ExtendedBlockId, Slot> slots) { Assert.assertEquals(1, segments.size()); Assert.assertEquals(2, slots.size()); } }); cluster.shutdown(); sockDir.close(); }
@Before public void setup() throws Exception { conf = createCachingConf(); cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build(); cluster.waitActive(); dfs = cluster.getFileSystem(); proto = cluster.getNameNodeRpc(); namenode = cluster.getNameNode(); prevCacheManipulator = NativeIO.POSIX.getCacheManipulator(); NativeIO.POSIX.setCacheManipulator(new NoMlockCacheManipulator()); BlockReaderTestUtil.enableHdfsCachingTracing(); }
@Test(timeout=60000) public void testDataXceiverCleansUpSlotsOnFailure() throws Exception { BlockReaderTestUtil.enableShortCircuitShmTracing(); TemporarySocketDirectory sockDir = new TemporarySocketDirectory(); Configuration conf = createShortCircuitConf( "testDataXceiverCleansUpSlotsOnFailure", sockDir); conf.setLong( HdfsClientConfigKeys.Read.ShortCircuit.STREAMS_CACHE_EXPIRY_MS_KEY, 1000000000L); MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); cluster.waitActive(); DistributedFileSystem fs = cluster.getFileSystem(); final Path TEST_PATH1 = new Path("/test_file1"); final Path TEST_PATH2 = new Path("/test_file2"); final int TEST_FILE_LEN = 4096; final int SEED = 0xFADE1; DFSTestUtil.createFile(fs, TEST_PATH1, TEST_FILE_LEN, (short)1, SEED); DFSTestUtil.createFile(fs, TEST_PATH2, TEST_FILE_LEN, (short)1, SEED); // The first read should allocate one shared memory segment and slot. DFSTestUtil.readFileBuffer(fs, TEST_PATH1); // The second read should fail, and we should only have 1 segment and 1 slot // left. BlockReaderFactory.setFailureInjectorForTesting( new TestCleanupFailureInjector()); try { DFSTestUtil.readFileBuffer(fs, TEST_PATH2); } catch (Throwable t) { GenericTestUtils.assertExceptionContains("TCP reads were disabled for " + "testing, but we failed to do a non-TCP read.", t); } checkNumberOfSegmentsAndSlots(1, 1, cluster.getDataNodes().get(0).getShortCircuitRegistry()); cluster.shutdown(); sockDir.close(); }
@Test(timeout=60000) public void testPreReceiptVerificationDfsClientCanDoScr() throws Exception { BlockReaderTestUtil.enableShortCircuitShmTracing(); TemporarySocketDirectory sockDir = new TemporarySocketDirectory(); Configuration conf = createShortCircuitConf( "testPreReceiptVerificationDfsClientCanDoScr", sockDir); conf.setLong( HdfsClientConfigKeys.Read.ShortCircuit.STREAMS_CACHE_EXPIRY_MS_KEY, 1000000000L); MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); cluster.waitActive(); DistributedFileSystem fs = cluster.getFileSystem(); BlockReaderFactory.setFailureInjectorForTesting( new TestPreReceiptVerificationFailureInjector()); final Path TEST_PATH1 = new Path("/test_file1"); DFSTestUtil.createFile(fs, TEST_PATH1, 4096, (short)1, 0xFADE2); final Path TEST_PATH2 = new Path("/test_file2"); DFSTestUtil.createFile(fs, TEST_PATH2, 4096, (short)1, 0xFADE2); DFSTestUtil.readFileBuffer(fs, TEST_PATH1); DFSTestUtil.readFileBuffer(fs, TEST_PATH2); ShortCircuitRegistry registry = cluster.getDataNodes().get(0).getShortCircuitRegistry(); registry.visit(new ShortCircuitRegistry.Visitor() { @Override public void accept(HashMap<ShmId, RegisteredShm> segments, HashMultimap<ExtendedBlockId, Slot> slots) { Assert.assertEquals(1, segments.size()); Assert.assertEquals(2, slots.size()); } }); cluster.shutdown(); sockDir.close(); }
@BeforeClass public static void setupCluster() throws Exception { final int REPLICATION_FACTOR = 1; util = new BlockReaderTestUtil(REPLICATION_FACTOR); // Create file. util.writeFile(TEST_FILE, FILE_SIZE_K); // Now get its blocks. List<LocatedBlock> blkList = util.getFileBlocks(TEST_FILE, FILE_SIZE_K); testBlock = blkList.get(0); // Use the first block to test }
@Test public void test2GBMmapLimit() throws Exception { Assume.assumeTrue(BlockReaderTestUtil.shouldTestLargeFiles()); HdfsConfiguration conf = initZeroCopyTest(); final long TEST_FILE_LENGTH = 2469605888L; conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "NULL"); conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, TEST_FILE_LENGTH); MiniDFSCluster cluster = null; final Path TEST_PATH = new Path("/a"); final String CONTEXT = "test2GBMmapLimit"; conf.set(DFSConfigKeys.DFS_CLIENT_CONTEXT, CONTEXT); FSDataInputStream fsIn = null, fsIn2 = null; ByteBuffer buf1 = null, buf2 = null; try { cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); cluster.waitActive(); DistributedFileSystem fs = cluster.getFileSystem(); DFSTestUtil.createFile(fs, TEST_PATH, TEST_FILE_LENGTH, (short)1, 0xB); DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1); fsIn = fs.open(TEST_PATH); buf1 = fsIn.read(null, 1, EnumSet.of(ReadOption.SKIP_CHECKSUMS)); Assert.assertEquals(1, buf1.remaining()); fsIn.releaseBuffer(buf1); buf1 = null; fsIn.seek(2147483640L); buf1 = fsIn.read(null, 1024, EnumSet.of(ReadOption.SKIP_CHECKSUMS)); Assert.assertEquals(7, buf1.remaining()); Assert.assertEquals(Integer.MAX_VALUE, buf1.limit()); fsIn.releaseBuffer(buf1); buf1 = null; Assert.assertEquals(2147483647L, fsIn.getPos()); try { buf1 = fsIn.read(null, 1024, EnumSet.of(ReadOption.SKIP_CHECKSUMS)); Assert.fail("expected UnsupportedOperationException"); } catch (UnsupportedOperationException e) { // expected; can't read past 2GB boundary. } fsIn.close(); fsIn = null; // Now create another file with normal-sized blocks, and verify we // can read past 2GB final Path TEST_PATH2 = new Path("/b"); conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 268435456L); DFSTestUtil.createFile(fs, TEST_PATH2, 1024 * 1024, TEST_FILE_LENGTH, 268435456L, (short)1, 0xA); fsIn2 = fs.open(TEST_PATH2); fsIn2.seek(2147483640L); buf2 = fsIn2.read(null, 1024, EnumSet.of(ReadOption.SKIP_CHECKSUMS)); Assert.assertEquals(8, buf2.remaining()); Assert.assertEquals(2147483648L, fsIn2.getPos()); fsIn2.releaseBuffer(buf2); buf2 = null; buf2 = fsIn2.read(null, 1024, EnumSet.of(ReadOption.SKIP_CHECKSUMS)); Assert.assertEquals(1024, buf2.remaining()); Assert.assertEquals(2147484672L, fsIn2.getPos()); fsIn2.releaseBuffer(buf2); buf2 = null; } finally { if (buf1 != null) { fsIn.releaseBuffer(buf1); } if (buf2 != null) { fsIn2.releaseBuffer(buf2); } IOUtils.cleanup(null, fsIn, fsIn2); if (cluster != null) { cluster.shutdown(); } } }
@Test(timeout=60000) public void testDataXceiverHandlesRequestShortCircuitShmFailure() throws Exception { BlockReaderTestUtil.enableShortCircuitShmTracing(); TemporarySocketDirectory sockDir = new TemporarySocketDirectory(); Configuration conf = createShortCircuitConf( "testDataXceiverHandlesRequestShortCircuitShmFailure", sockDir); conf.setLong(DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_KEY, 1000000000L); MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); cluster.waitActive(); DistributedFileSystem fs = cluster.getFileSystem(); final Path TEST_PATH1 = new Path("/test_file1"); DFSTestUtil.createFile(fs, TEST_PATH1, 4096, (short)1, 0xFADE1); LOG.info("Setting failure injector and performing a read which " + "should fail..."); DataNodeFaultInjector failureInjector = Mockito.mock(DataNodeFaultInjector.class); Mockito.doAnswer(new Answer<Void>() { @Override public Void answer(InvocationOnMock invocation) throws Throwable { throw new IOException("injected error into sendShmResponse"); } }).when(failureInjector).sendShortCircuitShmResponse(); DataNodeFaultInjector prevInjector = DataNodeFaultInjector.instance; DataNodeFaultInjector.instance = failureInjector; try { // The first read will try to allocate a shared memory segment and slot. // The shared memory segment allocation will fail because of the failure // injector. DFSTestUtil.readFileBuffer(fs, TEST_PATH1); Assert.fail("expected readFileBuffer to fail, but it succeeded."); } catch (Throwable t) { GenericTestUtils.assertExceptionContains("TCP reads were disabled for " + "testing, but we failed to do a non-TCP read.", t); } checkNumberOfSegmentsAndSlots(0, 0, cluster.getDataNodes().get(0).getShortCircuitRegistry()); LOG.info("Clearing failure injector and performing another read..."); DataNodeFaultInjector.instance = prevInjector; fs.getClient().getClientContext().getDomainSocketFactory().clearPathMap(); // The second read should succeed. DFSTestUtil.readFileBuffer(fs, TEST_PATH1); // We should have added a new short-circuit shared memory segment and slot. checkNumberOfSegmentsAndSlots(1, 1, cluster.getDataNodes().get(0).getShortCircuitRegistry()); cluster.shutdown(); sockDir.close(); }
/** * Test that when we have an uncache request, and the client refuses to release * the replica for a long time, we will un-mlock it. */ @Test(timeout=120000) public void testRevocation() throws Exception { assumeTrue(NativeCodeLoader.isNativeCodeLoaded() && !Path.WINDOWS); BlockReaderTestUtil.enableHdfsCachingTracing(); BlockReaderTestUtil.enableShortCircuitShmTracing(); Configuration conf = getDefaultConf(); // Set a really short revocation timeout. conf.setLong(DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION_TIMEOUT_MS, 250L); // Poll very often conf.setLong(DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION_POLLING_MS, 2L); MiniDFSCluster cluster = null; cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); cluster.waitActive(); DistributedFileSystem dfs = cluster.getFileSystem(); // Create and cache a file. final String TEST_FILE = "/test_file2"; DFSTestUtil.createFile(dfs, new Path(TEST_FILE), BLOCK_SIZE, (short)1, 0xcafe); dfs.addCachePool(new CachePoolInfo("pool")); long cacheDirectiveId = dfs.addCacheDirective(new CacheDirectiveInfo.Builder(). setPool("pool").setPath(new Path(TEST_FILE)). setReplication((short) 1).build()); FsDatasetSpi<?> fsd = cluster.getDataNodes().get(0).getFSDataset(); DFSTestUtil.verifyExpectedCacheUsage(BLOCK_SIZE, 1, fsd); // Mmap the file. FSDataInputStream in = dfs.open(new Path(TEST_FILE)); ByteBuffer buf = in.read(null, BLOCK_SIZE, EnumSet.noneOf(ReadOption.class)); // Attempt to uncache file. The file should get uncached. LOG.info("removing cache directive {}", cacheDirectiveId); dfs.removeCacheDirective(cacheDirectiveId); LOG.info("finished removing cache directive {}", cacheDirectiveId); Thread.sleep(1000); DFSTestUtil.verifyExpectedCacheUsage(0, 0, fsd); // Cleanup in.releaseBuffer(buf); in.close(); cluster.shutdown(); }
@Test public void test2GBMmapLimit() throws Exception { Assume.assumeTrue(BlockReaderTestUtil.shouldTestLargeFiles()); HdfsConfiguration conf = initZeroCopyTest(); final long TEST_FILE_LENGTH = 2469605888L; conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "NULL"); conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, TEST_FILE_LENGTH); MiniDFSCluster cluster = null; final Path TEST_PATH = new Path("/a"); final String CONTEXT = "test2GBMmapLimit"; conf.set(HdfsClientConfigKeys.DFS_CLIENT_CONTEXT, CONTEXT); FSDataInputStream fsIn = null, fsIn2 = null; ByteBuffer buf1 = null, buf2 = null; try { cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); cluster.waitActive(); DistributedFileSystem fs = cluster.getFileSystem(); DFSTestUtil.createFile(fs, TEST_PATH, TEST_FILE_LENGTH, (short)1, 0xB); DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1); fsIn = fs.open(TEST_PATH); buf1 = fsIn.read(null, 1, EnumSet.of(ReadOption.SKIP_CHECKSUMS)); Assert.assertEquals(1, buf1.remaining()); fsIn.releaseBuffer(buf1); buf1 = null; fsIn.seek(2147483640L); buf1 = fsIn.read(null, 1024, EnumSet.of(ReadOption.SKIP_CHECKSUMS)); Assert.assertEquals(7, buf1.remaining()); Assert.assertEquals(Integer.MAX_VALUE, buf1.limit()); fsIn.releaseBuffer(buf1); buf1 = null; Assert.assertEquals(2147483647L, fsIn.getPos()); try { buf1 = fsIn.read(null, 1024, EnumSet.of(ReadOption.SKIP_CHECKSUMS)); Assert.fail("expected UnsupportedOperationException"); } catch (UnsupportedOperationException e) { // expected; can't read past 2GB boundary. } fsIn.close(); fsIn = null; // Now create another file with normal-sized blocks, and verify we // can read past 2GB final Path TEST_PATH2 = new Path("/b"); conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 268435456L); DFSTestUtil.createFile(fs, TEST_PATH2, 1024 * 1024, TEST_FILE_LENGTH, 268435456L, (short)1, 0xA); fsIn2 = fs.open(TEST_PATH2); fsIn2.seek(2147483640L); buf2 = fsIn2.read(null, 1024, EnumSet.of(ReadOption.SKIP_CHECKSUMS)); Assert.assertEquals(8, buf2.remaining()); Assert.assertEquals(2147483648L, fsIn2.getPos()); fsIn2.releaseBuffer(buf2); buf2 = null; buf2 = fsIn2.read(null, 1024, EnumSet.of(ReadOption.SKIP_CHECKSUMS)); Assert.assertEquals(1024, buf2.remaining()); Assert.assertEquals(2147484672L, fsIn2.getPos()); fsIn2.releaseBuffer(buf2); buf2 = null; } finally { if (buf1 != null) { fsIn.releaseBuffer(buf1); } if (buf2 != null) { fsIn2.releaseBuffer(buf2); } IOUtils.cleanup(null, fsIn, fsIn2); if (cluster != null) { cluster.shutdown(); } } }
@Test(timeout=60000) public void testDataXceiverHandlesRequestShortCircuitShmFailure() throws Exception { BlockReaderTestUtil.enableShortCircuitShmTracing(); TemporarySocketDirectory sockDir = new TemporarySocketDirectory(); Configuration conf = createShortCircuitConf( "testDataXceiverHandlesRequestShortCircuitShmFailure", sockDir); conf.setLong(HdfsClientConfigKeys.Read.ShortCircuit.STREAMS_CACHE_EXPIRY_MS_KEY, 1000000000L); MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); cluster.waitActive(); DistributedFileSystem fs = cluster.getFileSystem(); final Path TEST_PATH1 = new Path("/test_file1"); DFSTestUtil.createFile(fs, TEST_PATH1, 4096, (short)1, 0xFADE1); LOG.info("Setting failure injector and performing a read which " + "should fail..."); DataNodeFaultInjector failureInjector = Mockito.mock(DataNodeFaultInjector.class); Mockito.doAnswer(new Answer<Void>() { @Override public Void answer(InvocationOnMock invocation) throws Throwable { throw new IOException("injected error into sendShmResponse"); } }).when(failureInjector).sendShortCircuitShmResponse(); DataNodeFaultInjector prevInjector = DataNodeFaultInjector.instance; DataNodeFaultInjector.instance = failureInjector; try { // The first read will try to allocate a shared memory segment and slot. // The shared memory segment allocation will fail because of the failure // injector. DFSTestUtil.readFileBuffer(fs, TEST_PATH1); Assert.fail("expected readFileBuffer to fail, but it succeeded."); } catch (Throwable t) { GenericTestUtils.assertExceptionContains("TCP reads were disabled for " + "testing, but we failed to do a non-TCP read.", t); } checkNumberOfSegmentsAndSlots(0, 0, cluster.getDataNodes().get(0).getShortCircuitRegistry()); LOG.info("Clearing failure injector and performing another read..."); DataNodeFaultInjector.instance = prevInjector; fs.getClient().getClientContext().getDomainSocketFactory().clearPathMap(); // The second read should succeed. DFSTestUtil.readFileBuffer(fs, TEST_PATH1); // We should have added a new short-circuit shared memory segment and slot. checkNumberOfSegmentsAndSlots(1, 1, cluster.getDataNodes().get(0).getShortCircuitRegistry()); cluster.shutdown(); sockDir.close(); }