/**
 * Asserts that the segment map and slot multimap which the registry hands
 * to its visitor have the expected sizes.
 *
 * @param expectedSegments expected size of the registry's segment map
 * @param expectedSlots expected size of the registry's slot multimap
 * @param registry the registry to inspect via {@code visit}
 */
private static void checkNumberOfSegmentsAndSlots(final int expectedSegments,
    final int expectedSlots, ShortCircuitRegistry registry) {
  // Build the size-checking visitor first, then hand it to the registry.
  final ShortCircuitRegistry.Visitor sizeChecker =
      new ShortCircuitRegistry.Visitor() {
        @Override
        public void accept(HashMap<ShmId, RegisteredShm> shmById,
            HashMultimap<ExtendedBlockId, Slot> slotsByBlock) {
          final int actualSegments = shmById.size();
          Assert.assertEquals(expectedSegments, actualSegments);
          final int actualSlots = slotsByBlock.size();
          Assert.assertEquals(expectedSlots, actualSlots);
        }
      };
  registry.visit(sizeChecker);
}
/**
 * Creates and reads two files through a client whose configuration has a
 * {@code TestPreReceiptVerificationFailureInjector} installed, then checks
 * that the DataNode's short-circuit registry reports exactly one segment
 * and two slots.
 *
 * The mini cluster and the temporary socket directory are released in
 * {@code finally} blocks so that a failed assertion or an I/O error does
 * not leak them into subsequent tests.
 */
@Test(timeout=60000)
public void testPreReceiptVerificationDfsClientCanDoScr() throws Exception {
  BlockReaderTestUtil.enableShortCircuitShmTracing();
  TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
  try {
    Configuration conf = createShortCircuitConf(
        "testPreReceiptVerificationDfsClientCanDoScr", sockDir);
    // Very large expiry value; presumably keeps cached streams from
    // expiring while the test runs -- confirm against the key's docs.
    conf.setLong(DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_KEY,
        1000000000L);
    MiniDFSCluster cluster =
        new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
    try {
      cluster.waitActive();
      DistributedFileSystem fs = cluster.getFileSystem();
      fs.getClient().getConf().brfFailureInjector =
          new TestPreReceiptVerificationFailureInjector();

      final Path TEST_PATH1 = new Path("/test_file1");
      DFSTestUtil.createFile(fs, TEST_PATH1, 4096, (short)1, 0xFADE2);
      final Path TEST_PATH2 = new Path("/test_file2");
      DFSTestUtil.createFile(fs, TEST_PATH2, 4096, (short)1, 0xFADE2);
      DFSTestUtil.readFileBuffer(fs, TEST_PATH1);
      DFSTestUtil.readFileBuffer(fs, TEST_PATH2);

      ShortCircuitRegistry registry =
          cluster.getDataNodes().get(0).getShortCircuitRegistry();
      registry.visit(new ShortCircuitRegistry.Visitor() {
        @Override
        public void accept(HashMap<ShmId, RegisteredShm> segments,
            HashMultimap<ExtendedBlockId, Slot> slots) {
          // Both reads are expected to share one segment, one slot each.
          Assert.assertEquals(1, segments.size());
          Assert.assertEquals(2, slots.size());
        }
      });
    } finally {
      // Always stop the cluster, even when an assertion above fails.
      cluster.shutdown();
    }
  } finally {
    // Closed last, matching the original shutdown-then-close order.
    sockDir.close();
  }
}
/**
 * Creates and reads two files after installing a
 * {@code TestPreReceiptVerificationFailureInjector} through
 * {@code BlockReaderFactory.setFailureInjectorForTesting}, then checks
 * that the DataNode's short-circuit registry reports exactly one segment
 * and two slots.
 *
 * The mini cluster and the temporary socket directory are released in
 * {@code finally} blocks so that a failed assertion or an I/O error does
 * not leak them into subsequent tests.
 */
@Test(timeout=60000)
public void testPreReceiptVerificationDfsClientCanDoScr() throws Exception {
  BlockReaderTestUtil.enableShortCircuitShmTracing();
  TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
  try {
    Configuration conf = createShortCircuitConf(
        "testPreReceiptVerificationDfsClientCanDoScr", sockDir);
    // Very large expiry value; presumably keeps cached streams from
    // expiring while the test runs -- confirm against the key's docs.
    conf.setLong(
        HdfsClientConfigKeys.Read.ShortCircuit.STREAMS_CACHE_EXPIRY_MS_KEY,
        1000000000L);
    MiniDFSCluster cluster =
        new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
    try {
      cluster.waitActive();
      DistributedFileSystem fs = cluster.getFileSystem();
      // NOTE(review): the injector is installed through a static setter and
      // is not reset here; no accessor for the previous value is visible
      // from this test, so confirm whether other tests need it restored.
      BlockReaderFactory.setFailureInjectorForTesting(
          new TestPreReceiptVerificationFailureInjector());

      final Path TEST_PATH1 = new Path("/test_file1");
      DFSTestUtil.createFile(fs, TEST_PATH1, 4096, (short)1, 0xFADE2);
      final Path TEST_PATH2 = new Path("/test_file2");
      DFSTestUtil.createFile(fs, TEST_PATH2, 4096, (short)1, 0xFADE2);
      DFSTestUtil.readFileBuffer(fs, TEST_PATH1);
      DFSTestUtil.readFileBuffer(fs, TEST_PATH2);

      ShortCircuitRegistry registry =
          cluster.getDataNodes().get(0).getShortCircuitRegistry();
      registry.visit(new ShortCircuitRegistry.Visitor() {
        @Override
        public void accept(HashMap<ShmId, RegisteredShm> segments,
            HashMultimap<ExtendedBlockId, Slot> slots) {
          // Both reads are expected to share one segment, one slot each.
          Assert.assertEquals(1, segments.size());
          Assert.assertEquals(2, slots.size());
        }
      });
    } finally {
      // Always stop the cluster, even when an assertion above fails.
      cluster.shutdown();
    }
  } finally {
    // Closed last, matching the original shutdown-then-close order.
    sockDir.close();
  }
}
/**
 * Test that a client which supports short-circuit reads using
 * shared memory can fall back to not using shared memory when
 * the server doesn't support it.
 *
 * The test writes a file, reads it back with TCP reads disabled, verifies
 * the contents, and then checks that the client's shared memory manager
 * has marked the single DataNode as disabled with no segments allocated.
 *
 * The cluster, the temporary socket directory and the static
 * {@code DFSInputStream.tcpReadsDisabledForTesting} flag are all restored
 * in {@code finally} blocks so a failure here does not affect other tests.
 */
@Test
public void testShortCircuitReadFromServerWithoutShm() throws Exception {
  TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
  // Remember the global flag so it can be put back after the test.
  final boolean prevTcpReadsDisabled =
      DFSInputStream.tcpReadsDisabledForTesting;
  try {
    Configuration clientConf = createShortCircuitConf(
        "testShortCircuitReadFromServerWithoutShm", sockDir);
    // The server config is a copy of the client config with the watcher
    // interrupt-check interval set to 0; presumably this is what turns
    // shared memory off on the DataNode -- confirm against the key's docs.
    Configuration serverConf = new Configuration(clientConf);
    serverConf.setInt(
        DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS, 0);
    DFSInputStream.tcpReadsDisabledForTesting = true;
    final MiniDFSCluster cluster =
        new MiniDFSCluster.Builder(serverConf).numDataNodes(1).build();
    try {
      cluster.waitActive();
      clientConf.set(DFS_CLIENT_CONTEXT,
          "testShortCircuitReadFromServerWithoutShm_clientContext");
      final DistributedFileSystem fs =
          (DistributedFileSystem)FileSystem.get(cluster.getURI(0), clientConf);
      final String TEST_FILE = "/test_file";
      final int TEST_FILE_LEN = 4000;
      final int SEED = 0xFADEC;
      DFSTestUtil.createFile(fs, new Path(TEST_FILE), TEST_FILE_LEN,
          (short)1, SEED);
      byte contents[] = DFSTestUtil.readFileBuffer(fs, new Path(TEST_FILE));
      byte expected[] = DFSTestUtil.
          calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
      Assert.assertTrue(Arrays.equals(contents, expected));

      final ShortCircuitCache cache =
          fs.dfs.getClientContext().getShortCircuitCache();
      final DatanodeInfo datanode =
          new DatanodeInfo(cluster.getDataNodes().get(0).getDatanodeId());
      cache.getDfsClientShmManager().visit(new Visitor() {
        @Override
        public void visit(HashMap<DatanodeInfo, PerDatanodeVisitorInfo> info)
            throws IOException {
          Assert.assertEquals(1, info.size());
          PerDatanodeVisitorInfo vinfo = info.get(datanode);
          // Fail with a clear message instead of an NPE on the next line.
          Assert.assertNotNull("no visitor info for the DataNode", vinfo);
          Assert.assertTrue(vinfo.disabled);
          Assert.assertEquals(0, vinfo.full.size());
          Assert.assertEquals(0, vinfo.notFull.size());
        }
      });
    } finally {
      // Always stop the cluster, even when an assertion above fails.
      cluster.shutdown();
    }
  } finally {
    DFSInputStream.tcpReadsDisabledForTesting = prevTcpReadsDisabled;
    sockDir.close();
  }
}
/**
 * Test that a client which supports short-circuit reads using
 * shared memory can fall back to not using shared memory when
 * the server doesn't support it.
 *
 * The test writes a file, reads it back with TCP reads disabled, verifies
 * the contents, and then checks that the client's shared memory manager
 * has marked the single DataNode as disabled with no segments allocated.
 *
 * The cluster, the temporary socket directory (which was previously never
 * closed) and the static {@code DFSInputStream.tcpReadsDisabledForTesting}
 * flag are all restored in {@code finally} blocks so a failure here does
 * not affect other tests.
 */
@Test
public void testShortCircuitReadFromServerWithoutShm() throws Exception {
  TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
  // Remember the global flag so it can be put back after the test.
  final boolean prevTcpReadsDisabled =
      DFSInputStream.tcpReadsDisabledForTesting;
  try {
    Configuration clientConf = createShortCircuitConf(
        "testShortCircuitReadFromServerWithoutShm", sockDir);
    // The server config is a copy of the client config with the watcher
    // interrupt-check interval set to 0; presumably this is what turns
    // shared memory off on the DataNode -- confirm against the key's docs.
    Configuration serverConf = new Configuration(clientConf);
    serverConf.setInt(
        DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS, 0);
    DFSInputStream.tcpReadsDisabledForTesting = true;
    final MiniDFSCluster cluster =
        new MiniDFSCluster.Builder(serverConf).numDataNodes(1).build();
    try {
      cluster.waitActive();
      clientConf.set(DFS_CLIENT_CONTEXT,
          "testShortCircuitReadFromServerWithoutShm_clientContext");
      final DistributedFileSystem fs =
          (DistributedFileSystem)FileSystem.get(cluster.getURI(0), clientConf);
      final String TEST_FILE = "/test_file";
      final int TEST_FILE_LEN = 4000;
      final int SEED = 0xFADEC;
      DFSTestUtil.createFile(fs, new Path(TEST_FILE), TEST_FILE_LEN,
          (short)1, SEED);
      byte contents[] = DFSTestUtil.readFileBuffer(fs, new Path(TEST_FILE));
      byte expected[] = DFSTestUtil.
          calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
      Assert.assertTrue(Arrays.equals(contents, expected));

      final ShortCircuitCache cache =
          fs.dfs.getClientContext().getShortCircuitCache();
      final DatanodeInfo datanode =
          new DatanodeInfo(cluster.getDataNodes().get(0).getDatanodeId());
      cache.getDfsClientShmManager().visit(new Visitor() {
        @Override
        public void visit(HashMap<DatanodeInfo, PerDatanodeVisitorInfo> info)
            throws IOException {
          Assert.assertEquals(1, info.size());
          PerDatanodeVisitorInfo vinfo = info.get(datanode);
          // Fail with a clear message instead of an NPE on the next line.
          Assert.assertNotNull("no visitor info for the DataNode", vinfo);
          Assert.assertTrue(vinfo.disabled);
          Assert.assertEquals(0, vinfo.full.size());
          Assert.assertEquals(0, vinfo.notFull.size());
        }
      });
    } finally {
      // Always stop the cluster, even when an assertion above fails.
      cluster.shutdown();
    }
  } finally {
    DFSInputStream.tcpReadsDisabledForTesting = prevTcpReadsDisabled;
    // The original never closed the socket directory; release it here.
    sockDir.close();
  }
}