@Test(timeout=60000) public void testDataXceiverHandlesRequestShortCircuitShmFailure() throws Exception { BlockReaderTestUtil.enableShortCircuitShmTracing(); TemporarySocketDirectory sockDir = new TemporarySocketDirectory(); Configuration conf = createShortCircuitConf( "testDataXceiverHandlesRequestShortCircuitShmFailure", sockDir); conf.setLong(DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_KEY, 1000000000L); MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); cluster.waitActive(); DistributedFileSystem fs = cluster.getFileSystem(); final Path TEST_PATH1 = new Path("/test_file1"); DFSTestUtil.createFile(fs, TEST_PATH1, 4096, (short)1, 0xFADE1); LOG.info("Setting failure injector and performing a read which " + "should fail..."); DataNodeFaultInjector failureInjector = Mockito.mock(DataNodeFaultInjector.class); Mockito.doAnswer(new Answer<Void>() { @Override public Void answer(InvocationOnMock invocation) throws Throwable { throw new IOException("injected error into sendShmResponse"); } }).when(failureInjector).sendShortCircuitShmResponse(); DataNodeFaultInjector prevInjector = DataNodeFaultInjector.instance; DataNodeFaultInjector.instance = failureInjector; try { // The first read will try to allocate a shared memory segment and slot. // The shared memory segment allocation will fail because of the failure // injector. DFSTestUtil.readFileBuffer(fs, TEST_PATH1); Assert.fail("expected readFileBuffer to fail, but it succeeded."); } catch (Throwable t) { GenericTestUtils.assertExceptionContains("TCP reads were disabled for " + "testing, but we failed to do a non-TCP read.", t); } checkNumberOfSegmentsAndSlots(0, 0, cluster.getDataNodes().get(0).getShortCircuitRegistry()); LOG.info("Clearing failure injector and performing another read..."); DataNodeFaultInjector.instance = prevInjector; fs.getClient().getClientContext().getDomainSocketFactory().clearPathMap(); // The second read should succeed. DFSTestUtil.readFileBuffer(fs, TEST_PATH1); // We should have added a new short-circuit shared memory segment and slot. checkNumberOfSegmentsAndSlots(1, 1, cluster.getDataNodes().get(0).getShortCircuitRegistry()); cluster.shutdown(); sockDir.close(); }
@Test public void testPacketTransmissionDelay() throws Exception { // Make the first datanode to not relay heartbeat packet. DataNodeFaultInjector dnFaultInjector = new DataNodeFaultInjector() { @Override public boolean dropHeartbeatPacket() { return true; } }; DataNodeFaultInjector oldDnInjector = DataNodeFaultInjector.get(); DataNodeFaultInjector.set(dnFaultInjector); // Setting the timeout to be 3 seconds. Normally heartbeat packet // would be sent every 1.5 seconds if there is no data traffic. Configuration conf = new HdfsConfiguration(); conf.set(HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, "3000"); MiniDFSCluster cluster = null; try { int numDataNodes = 2; cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build(); cluster.waitActive(); FileSystem fs = cluster.getFileSystem(); FSDataOutputStream out = fs.create(new Path("noheartbeat.dat"), (short)2); out.write(0x31); out.hflush(); DFSOutputStream dfsOut = (DFSOutputStream)out.getWrappedStream(); // original pipeline DatanodeInfo[] orgNodes = dfsOut.getPipeline(); // Cause the second datanode to timeout on reading packet Thread.sleep(3500); out.write(0x32); out.hflush(); // new pipeline DatanodeInfo[] newNodes = dfsOut.getPipeline(); out.close(); boolean contains = false; for (int i = 0; i < newNodes.length; i++) { if (orgNodes[0].getXferAddr().equals(newNodes[i].getXferAddr())) { throw new IOException("The first datanode should have been replaced."); } if (orgNodes[1].getXferAddr().equals(newNodes[i].getXferAddr())) { contains = true; } } Assert.assertTrue(contains); } finally { DataNodeFaultInjector.set(oldDnInjector); if (cluster != null) { cluster.shutdown(); } } }
@Test(timeout=60000) public void testDataXceiverHandlesRequestShortCircuitShmFailure() throws Exception { BlockReaderTestUtil.enableShortCircuitShmTracing(); TemporarySocketDirectory sockDir = new TemporarySocketDirectory(); Configuration conf = createShortCircuitConf( "testDataXceiverHandlesRequestShortCircuitShmFailure", sockDir); conf.setLong(HdfsClientConfigKeys.Read.ShortCircuit.STREAMS_CACHE_EXPIRY_MS_KEY, 1000000000L); MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); cluster.waitActive(); DistributedFileSystem fs = cluster.getFileSystem(); final Path TEST_PATH1 = new Path("/test_file1"); DFSTestUtil.createFile(fs, TEST_PATH1, 4096, (short)1, 0xFADE1); LOG.info("Setting failure injector and performing a read which " + "should fail..."); DataNodeFaultInjector failureInjector = Mockito.mock(DataNodeFaultInjector.class); Mockito.doAnswer(new Answer<Void>() { @Override public Void answer(InvocationOnMock invocation) throws Throwable { throw new IOException("injected error into sendShmResponse"); } }).when(failureInjector).sendShortCircuitShmResponse(); DataNodeFaultInjector prevInjector = DataNodeFaultInjector.instance; DataNodeFaultInjector.instance = failureInjector; try { // The first read will try to allocate a shared memory segment and slot. // The shared memory segment allocation will fail because of the failure // injector. DFSTestUtil.readFileBuffer(fs, TEST_PATH1); Assert.fail("expected readFileBuffer to fail, but it succeeded."); } catch (Throwable t) { GenericTestUtils.assertExceptionContains("TCP reads were disabled for " + "testing, but we failed to do a non-TCP read.", t); } checkNumberOfSegmentsAndSlots(0, 0, cluster.getDataNodes().get(0).getShortCircuitRegistry()); LOG.info("Clearing failure injector and performing another read..."); DataNodeFaultInjector.instance = prevInjector; fs.getClient().getClientContext().getDomainSocketFactory().clearPathMap(); // The second read should succeed. DFSTestUtil.readFileBuffer(fs, TEST_PATH1); // We should have added a new short-circuit shared memory segment and slot. checkNumberOfSegmentsAndSlots(1, 1, cluster.getDataNodes().get(0).getShortCircuitRegistry()); cluster.shutdown(); sockDir.close(); }