@Before public void setUp() throws IOException { datanode = mock(DataNode.class); storage = mock(DataStorage.class); this.conf = new Configuration(); this.conf.setLong(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, 0); final DNConf dnConf = new DNConf(conf); when(datanode.getConf()).thenReturn(conf); when(datanode.getDnConf()).thenReturn(dnConf); final BlockScanner disabledBlockScanner = new BlockScanner(datanode, conf); when(datanode.getBlockScanner()).thenReturn(disabledBlockScanner); final ShortCircuitRegistry shortCircuitRegistry = new ShortCircuitRegistry(conf); when(datanode.getShortCircuitRegistry()).thenReturn(shortCircuitRegistry); createStorageDirs(storage, conf, NUM_INIT_VOLUMES); dataset = new FsDatasetImpl(datanode, storage, conf); for (String bpid : BLOCK_POOL_IDS) { dataset.addBlockPool(bpid, conf); } assertEquals(NUM_INIT_VOLUMES, getNumVolumes()); assertEquals(0, dataset.getNumFailedVolumes()); }
static private void checkNumberOfSegmentsAndSlots(final int expectedSegments, final int expectedSlots, ShortCircuitRegistry registry) { registry.visit(new ShortCircuitRegistry.Visitor() { @Override public void accept(HashMap<ShmId, RegisteredShm> segments, HashMultimap<ExtendedBlockId, Slot> slots) { Assert.assertEquals(expectedSegments, segments.size()); Assert.assertEquals(expectedSlots, slots.size()); } }); }
@Test(timeout=60000) public void testPreReceiptVerificationDfsClientCanDoScr() throws Exception { BlockReaderTestUtil.enableShortCircuitShmTracing(); TemporarySocketDirectory sockDir = new TemporarySocketDirectory(); Configuration conf = createShortCircuitConf( "testPreReceiptVerificationDfsClientCanDoScr", sockDir); conf.setLong(DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_KEY, 1000000000L); MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); cluster.waitActive(); DistributedFileSystem fs = cluster.getFileSystem(); fs.getClient().getConf().brfFailureInjector = new TestPreReceiptVerificationFailureInjector(); final Path TEST_PATH1 = new Path("/test_file1"); DFSTestUtil.createFile(fs, TEST_PATH1, 4096, (short)1, 0xFADE2); final Path TEST_PATH2 = new Path("/test_file2"); DFSTestUtil.createFile(fs, TEST_PATH2, 4096, (short)1, 0xFADE2); DFSTestUtil.readFileBuffer(fs, TEST_PATH1); DFSTestUtil.readFileBuffer(fs, TEST_PATH2); ShortCircuitRegistry registry = cluster.getDataNodes().get(0).getShortCircuitRegistry(); registry.visit(new ShortCircuitRegistry.Visitor() { @Override public void accept(HashMap<ShmId, RegisteredShm> segments, HashMultimap<ExtendedBlockId, Slot> slots) { Assert.assertEquals(1, segments.size()); Assert.assertEquals(2, slots.size()); } }); cluster.shutdown(); sockDir.close(); }
public static void enableShortCircuitShmTracing() { LogManager.getLogger(DfsClientShmManager.class.getName()).setLevel( Level.TRACE); LogManager.getLogger(ShortCircuitRegistry.class.getName()).setLevel( Level.TRACE); LogManager.getLogger(ShortCircuitShm.class.getName()).setLevel( Level.TRACE); LogManager.getLogger(DataNode.class.getName()).setLevel( Level.TRACE); }
@Test(timeout=60000) public void testPreReceiptVerificationDfsClientCanDoScr() throws Exception { BlockReaderTestUtil.enableShortCircuitShmTracing(); TemporarySocketDirectory sockDir = new TemporarySocketDirectory(); Configuration conf = createShortCircuitConf( "testPreReceiptVerificationDfsClientCanDoScr", sockDir); conf.setLong( HdfsClientConfigKeys.Read.ShortCircuit.STREAMS_CACHE_EXPIRY_MS_KEY, 1000000000L); MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); cluster.waitActive(); DistributedFileSystem fs = cluster.getFileSystem(); BlockReaderFactory.setFailureInjectorForTesting( new TestPreReceiptVerificationFailureInjector()); final Path TEST_PATH1 = new Path("/test_file1"); DFSTestUtil.createFile(fs, TEST_PATH1, 4096, (short)1, 0xFADE2); final Path TEST_PATH2 = new Path("/test_file2"); DFSTestUtil.createFile(fs, TEST_PATH2, 4096, (short)1, 0xFADE2); DFSTestUtil.readFileBuffer(fs, TEST_PATH1); DFSTestUtil.readFileBuffer(fs, TEST_PATH2); ShortCircuitRegistry registry = cluster.getDataNodes().get(0).getShortCircuitRegistry(); registry.visit(new ShortCircuitRegistry.Visitor() { @Override public void accept(HashMap<ShmId, RegisteredShm> segments, HashMultimap<ExtendedBlockId, Slot> slots) { Assert.assertEquals(1, segments.size()); Assert.assertEquals(2, slots.size()); } }); cluster.shutdown(); sockDir.close(); }