/** * Test that calling Unbuffer closes sockets. */ @Test public void testUnbufferClosesSockets() throws Exception { Configuration conf = new Configuration(); // Set a new ClientContext. This way, we will have our own PeerCache, // rather than sharing one with other unit tests. conf.set(DFSConfigKeys.DFS_CLIENT_CONTEXT, "testUnbufferClosesSocketsContext"); // Disable short-circuit reads. With short-circuit, we wouldn't hold open a // TCP socket. conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, false); // Set a really long socket timeout to avoid test timing issues. conf.setLong(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 100000000L); conf.setLong(DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY, 100000000L); MiniDFSCluster cluster = null; FSDataInputStream stream = null; try { cluster = new MiniDFSCluster.Builder(conf).build(); DistributedFileSystem dfs = (DistributedFileSystem) FileSystem.newInstance(conf); final Path TEST_PATH = new Path("/test1"); DFSTestUtil.createFile(dfs, TEST_PATH, 128, (short)1, 1); stream = dfs.open(TEST_PATH); // Read a byte. This will trigger the creation of a block reader. stream.seek(2); int b = stream.read(); Assert.assertTrue(-1 != b); // The Peer cache should start off empty. PeerCache cache = dfs.getClient().getClientContext().getPeerCache(); Assert.assertEquals(0, cache.size()); // Unbuffer should clear the block reader and return the socket to the // cache. stream.unbuffer(); stream.seek(2); Assert.assertEquals(1, cache.size()); int b2 = stream.read(); Assert.assertEquals(b, b2); } finally { if (stream != null) { IOUtils.cleanup(null, stream); } if (cluster != null) { cluster.shutdown(); } } }
/** * Test that calling Unbuffer closes sockets. */ @Test public void testUnbufferClosesSockets() throws Exception { Configuration conf = new Configuration(); // Set a new ClientContext. This way, we will have our own PeerCache, // rather than sharing one with other unit tests. conf.set(HdfsClientConfigKeys.DFS_CLIENT_CONTEXT, "testUnbufferClosesSocketsContext"); // Disable short-circuit reads. With short-circuit, we wouldn't hold open a // TCP socket. conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, false); // Set a really long socket timeout to avoid test timing issues. conf.setLong(HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 100000000L); conf.setLong(HdfsClientConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY, 100000000L); MiniDFSCluster cluster = null; FSDataInputStream stream = null; try { cluster = new MiniDFSCluster.Builder(conf).build(); DistributedFileSystem dfs = (DistributedFileSystem) FileSystem.newInstance(conf); final Path TEST_PATH = new Path("/test1"); DFSTestUtil.createFile(dfs, TEST_PATH, 128, (short)1, 1); stream = dfs.open(TEST_PATH); // Read a byte. This will trigger the creation of a block reader. stream.seek(2); int b = stream.read(); Assert.assertTrue(-1 != b); // The Peer cache should start off empty. PeerCache cache = dfs.getClient().getClientContext().getPeerCache(); Assert.assertEquals(0, cache.size()); // Unbuffer should clear the block reader and return the socket to the // cache. stream.unbuffer(); stream.seek(2); Assert.assertEquals(1, cache.size()); int b2 = stream.read(); Assert.assertEquals(b, b2); } finally { if (stream != null) { IOUtils.cleanup(null, stream); } if (cluster != null) { cluster.shutdown(); } } }